tower_otel_http_metrics/
lib.rs

1#![doc = include_str!("../README.md")]
2//!
3//! [`Layer`]: tower_layer::Layer
4//! [`Service`]: tower_service::Service
5//! [`Future`]: tower_service::Future
6
7use std::borrow::Cow;
8use std::future::Future;
9use std::pin::Pin;
10use std::string::String;
11use std::sync::Arc;
12use std::task::Poll::Ready;
13use std::task::{Context, Poll};
14use std::time::Instant;
15use std::{fmt, result};
16
17#[cfg(feature = "axum")]
18use axum::extract::MatchedPath;
19use futures_util::ready;
20use opentelemetry::metrics::{Histogram, Meter, UpDownCounter};
21use opentelemetry::{global, KeyValue};
22use pin_project_lite::pin_project;
23use tower_layer::Layer;
24use tower_service::Service;
25
// Metric names, units and attribute keys follow the OpenTelemetry HTTP semantic
// conventions: https://opentelemetry.io/docs/specs/semconv/http/http-metrics/
const HTTP_SERVER_DURATION_METRIC: &str = "http.server.request.duration";
const HTTP_SERVER_DURATION_UNIT: &str = "s";

/// Explicit histogram bucket boundaries, in seconds, for `http.server.request.duration`.
const HTTP_SERVER_DURATION_BOUNDARIES: [f64; 14] = [
    0.005, 0.01, 0.025, 0.05, 0.075, 0.1, 0.25, 0.5, 0.75, 1.0, 2.5, 5.0, 7.5, 10.0,
];
const HTTP_SERVER_ACTIVE_REQUESTS_METRIC: &str = "http.server.active_requests";
const HTTP_SERVER_ACTIVE_REQUESTS_UNIT: &str = "{request}";

const HTTP_SERVER_REQUEST_BODY_SIZE_METRIC: &str = "http.server.request.body.size";
const HTTP_SERVER_REQUEST_BODY_SIZE_UNIT: &str = "By";

const NETWORK_PROTOCOL_NAME_LABEL: &str = "network.protocol.name";
const NETWORK_PROTOCOL_VERSION_LABEL: &str = "network.protocol.version";
const URL_SCHEME_LABEL: &str = "url.scheme";

const HTTP_REQUEST_METHOD_LABEL: &str = "http.request.method";
// Only read by the `axum` integration (route taken from `MatchedPath`), so it is
// genuinely unused when that feature is disabled. Scope the lint allowance to
// exactly that configuration instead of silencing it everywhere.
#[cfg_attr(not(feature = "axum"), allow(dead_code))]
const HTTP_ROUTE_LABEL: &str = "http.route";
const HTTP_RESPONSE_STATUS_CODE_LABEL: &str = "http.response.status_code";
46
47/// State scoped to the entire middleware Layer.
48///
49/// For now the only global state we hold onto is the metrics instruments.
50/// The OTEL SDKs do support calling for the global meter provider instead of holding a reference
51/// but it seems ideal to avoid extra access to the global meter, which sits behind a RWLock.
52struct HTTPMetricsLayerState {
53    pub server_request_duration: Histogram<f64>,
54    pub server_active_requests: UpDownCounter<i64>,
55    pub server_request_body_size: Histogram<u64>,
56}
57
58#[derive(Clone)]
59/// [`Service`] used by [`HTTPMetricsLayer`]
60pub struct HTTPMetricsService<S> {
61    pub(crate) state: Arc<HTTPMetricsLayerState>,
62    inner_service: S,
63}
64
65#[derive(Clone)]
66/// [`Layer`] which applies the OTEL HTTP server metrics middleware
67pub struct HTTPMetricsLayer {
68    state: Arc<HTTPMetricsLayerState>,
69}
70
71pub struct HTTPMetricsLayerBuilder {
72    meter: Option<Meter>,
73}
74
/// Error type returned by `tower_otel_http_metrics`.
///
/// Implements [`fmt::Display`] and [`std::error::Error`], so it can be reported to
/// users, boxed as `Box<dyn std::error::Error>`, or converted with `?`.
pub struct Error {
    inner: ErrorKind,
}

/// `Result` typedef to use with the `tower_otel_http_metrics::Error` type
pub type Result<T> = result::Result<T, Error>;

/// Internal classification of [`Error`]s; not part of the public API.
#[derive(Debug)]
enum ErrorKind {
    /// Uncategorized error.
    // Never constructed in this file; kept as a catch-all for future error sources.
    #[allow(dead_code)]
    Other(String),
    /// Invalid configuration, e.g. building a layer without a meter.
    Config(String),
}

impl fmt::Debug for Error {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        // Include the kind and message; the previous output discarded both, which made
        // `unwrap()`/`expect()` failures undiagnosable.
        f.debug_tuple("tower_otel_http_metrics::Error")
            .field(&self.inner)
            .finish()
    }
}

impl fmt::Display for Error {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        match &self.inner {
            ErrorKind::Other(message) => f.write_str(message),
            ErrorKind::Config(message) => write!(f, "invalid configuration: {message}"),
        }
    }
}

impl std::error::Error for Error {}
98
99impl Default for HTTPMetricsLayerBuilder {
100    fn default() -> Self {
101        let meter = global::meter("");
102        HTTPMetricsLayerBuilder { meter: Some(meter) }
103    }
104}
105
106impl HTTPMetricsLayerBuilder {
107    pub fn new() -> Self {
108        HTTPMetricsLayerBuilder { meter: None }
109    }
110
111    pub fn build(self) -> Result<HTTPMetricsLayer> {
112        match self.meter {
113            Some(meter) => Ok(HTTPMetricsLayer {
114                state: Arc::from(HTTPMetricsLayerBuilder::make_state(meter)),
115            }),
116            None => Err(Error {
117                inner: ErrorKind::Config(String::from("no meter provided")),
118            }),
119        }
120    }
121
122    pub fn with_meter(self, meter: Meter) -> Self {
123        HTTPMetricsLayerBuilder { meter: Some(meter) }
124    }
125
126    fn make_state(meter: Meter) -> HTTPMetricsLayerState {
127        HTTPMetricsLayerState {
128            server_request_duration: meter
129                .f64_histogram(Cow::from(HTTP_SERVER_DURATION_METRIC))
130                .with_description("Duration of HTTP server requests.")
131                .with_unit(Cow::from(HTTP_SERVER_DURATION_UNIT))
132                .with_boundaries(HTTP_SERVER_DURATION_BOUNDARIES.to_vec())
133                .build(),
134            server_active_requests: meter
135                .i64_up_down_counter(Cow::from(HTTP_SERVER_ACTIVE_REQUESTS_METRIC))
136                .with_description("Number of active HTTP server requests.")
137                .with_unit(Cow::from(HTTP_SERVER_ACTIVE_REQUESTS_UNIT))
138                .build(),
139            server_request_body_size: meter
140                .u64_histogram(HTTP_SERVER_REQUEST_BODY_SIZE_METRIC)
141                .with_description("Size of HTTP server request bodies.")
142                .with_unit(HTTP_SERVER_REQUEST_BODY_SIZE_UNIT)
143                .build(),
144        }
145    }
146}
147
148impl<S> Layer<S> for HTTPMetricsLayer {
149    type Service = HTTPMetricsService<S>;
150
151    fn layer(&self, service: S) -> Self::Service {
152        HTTPMetricsService {
153            state: self.state.clone(),
154            inner_service: service,
155        }
156    }
157}
158
159/// ResponseFutureMetricsState holds request-scoped data for metrics and their attributes.
160///
161/// ResponseFutureMetricsState lives inside the response future, as it needs to hold data
162/// initialized or extracted from the request before it is forwarded to the inner Service.
163/// The rest of the data (e.g. status code, error) can be extracted from the response
164/// or calculated with respect to the data held here (e.g., duration = now - duration start).
165#[derive(Clone)]
166struct ResponseFutureMetricsState {
167    // fields for the metric values
168    // https://opentelemetry.io/docs/specs/semconv/http/http-metrics/#metric-httpserverrequestduration
169    duration_start: Instant,
170    // https://opentelemetry.io/docs/specs/semconv/http/http-metrics/#metric-httpserverrequestbodysize
171    body_size: Option<u64>,
172
173    // fields for metric labels
174    protocol_name_kv: KeyValue,
175    protocol_version_kv: KeyValue,
176    url_scheme_kv: KeyValue,
177    method_kv: KeyValue,
178    route_kv_opt: Option<KeyValue>,
179}
180
181pin_project! {
182    /// Response [`Future`] for [`HTTPMetricsService`].
183    pub struct HTTPMetricsResponseFuture<F> {
184        #[pin]
185        inner_response_future: F,
186        layer_state: Arc<HTTPMetricsLayerState>,
187        metrics_state: ResponseFutureMetricsState,
188    }
189}
190
191impl<S, ReqBody, ResBody> Service<http::Request<ReqBody>> for HTTPMetricsService<S>
192where
193    S: Service<http::Request<ReqBody>, Response = http::Response<ResBody>>,
194{
195    type Response = S::Response;
196    type Error = S::Error;
197    type Future = HTTPMetricsResponseFuture<S::Future>;
198
199    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<result::Result<(), Self::Error>> {
200        self.inner_service.poll_ready(cx)
201    }
202
203    fn call(&mut self, req: http::Request<ReqBody>) -> Self::Future {
204        let duration_start = Instant::now();
205
206        let headers = req.headers();
207        let content_length = headers
208            .get(http::header::CONTENT_LENGTH)
209            .and_then(|value| value.to_str().ok()?.parse::<u64>().ok());
210
211        let (protocol, version) = split_and_format_protocol_version(req.version());
212        let protocol_name_kv = KeyValue::new(NETWORK_PROTOCOL_NAME_LABEL, protocol);
213        let protocol_version_kv = KeyValue::new(NETWORK_PROTOCOL_VERSION_LABEL, version);
214
215        let scheme = req.uri().scheme_str().unwrap_or("").to_string();
216        let url_scheme_kv = KeyValue::new(URL_SCHEME_LABEL, scheme);
217
218        let method = req.method().as_str().to_owned();
219        let method_kv = KeyValue::new(HTTP_REQUEST_METHOD_LABEL, method);
220
221        #[allow(unused_mut)]
222        let mut route_kv_opt = None;
223        #[cfg(feature = "axum")]
224        if let Some(matched_path) = req.extensions().get::<MatchedPath>() {
225            route_kv_opt = Some(KeyValue::new(
226                HTTP_ROUTE_LABEL,
227                matched_path.as_str().to_owned(),
228            ));
229        };
230
231        self.state
232            .server_active_requests
233            .add(1, &[url_scheme_kv.clone(), method_kv.clone()]);
234
235        HTTPMetricsResponseFuture {
236            inner_response_future: self.inner_service.call(req),
237            layer_state: self.state.clone(),
238            metrics_state: ResponseFutureMetricsState {
239                duration_start,
240                body_size: content_length,
241
242                protocol_name_kv,
243                protocol_version_kv,
244                url_scheme_kv,
245                method_kv,
246                route_kv_opt,
247            },
248        }
249    }
250}
251
252impl<F, ResBody, E> Future for HTTPMetricsResponseFuture<F>
253where
254    F: Future<Output = result::Result<http::Response<ResBody>, E>>,
255{
256    type Output = F::Output;
257
258    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
259        let this = self.project();
260        let response = ready!(this.inner_response_future.poll(cx))?;
261        let status = response.status();
262
263        // all OTEL `http.server...` metrics use this entire label set,
264        // except `http.server.active_requests`, which only uses a subset
265        let mut label_superset = vec![
266            this.metrics_state.protocol_name_kv.clone(),
267            this.metrics_state.protocol_version_kv.clone(),
268            this.metrics_state.url_scheme_kv.clone(),
269            this.metrics_state.method_kv.clone(),
270            KeyValue::new(HTTP_RESPONSE_STATUS_CODE_LABEL, i64::from(status.as_u16())),
271        ];
272        if let Some(route_kv) = this.metrics_state.route_kv_opt.clone() {
273            label_superset.push(route_kv);
274        }
275
276        this.layer_state.server_request_duration.record(
277            this.metrics_state.duration_start.elapsed().as_secs_f64(),
278            &label_superset,
279        );
280
281        if let Some(content_length) = this.metrics_state.body_size {
282            this.layer_state
283                .server_request_body_size
284                .record(content_length, &label_superset);
285        }
286
287        this.layer_state.server_active_requests.add(
288            -1,
289            &[
290                this.metrics_state.url_scheme_kv.clone(),
291                this.metrics_state.method_kv.clone(),
292            ],
293        );
294
295        Ready(Ok(response))
296    }
297}
298
299fn split_and_format_protocol_version(http_version: http::Version) -> (String, String) {
300    let version_str = match http_version {
301        http::Version::HTTP_09 => "0.9",
302        http::Version::HTTP_10 => "1.0",
303        http::Version::HTTP_11 => "1.1",
304        http::Version::HTTP_2 => "2.0",
305        http::Version::HTTP_3 => "3.0",
306        _ => "",
307    };
308    (String::from("http"), String::from(version_str))
309}