1#![doc = include_str!("../README.md")]
2use std::borrow::Cow;
8use std::future::Future;
9use std::pin::Pin;
10use std::string::String;
11use std::sync::Arc;
12use std::task::Poll::Ready;
13use std::task::{Context, Poll};
14use std::time::Instant;
15use std::{fmt, result};
16
17#[cfg(feature = "axum")]
18use axum::extract::MatchedPath;
19use futures_util::ready;
20use opentelemetry::metrics::{Histogram, Meter, UpDownCounter};
21use opentelemetry::{global, KeyValue};
22use pin_project_lite::pin_project;
23use tower_layer::Layer;
24use tower_service::Service;
25
26const HTTP_SERVER_DURATION_METRIC: &str = "http.server.request.duration";
27const HTTP_SERVER_DURATION_UNIT: &str = "s";
28
29const HTTP_SERVER_DURATION_BOUNDARIES: [f64; 14] = [
30 0.005, 0.01, 0.025, 0.05, 0.075, 0.1, 0.25, 0.5, 0.75, 1.0, 2.5, 5.0, 7.5, 10.0,
31];
32const HTTP_SERVER_ACTIVE_REQUESTS_METRIC: &str = "http.server.active_requests";
33const HTTP_SERVER_ACTIVE_REQUESTS_UNIT: &str = "{request}";
34
35const HTTP_SERVER_REQUEST_BODY_SIZE_METRIC: &str = "http.server.request.body.size";
36const HTTP_SERVER_REQUEST_BODY_SIZE_UNIT: &str = "By";
37
38const NETWORK_PROTOCOL_NAME_LABEL: &str = "network.protocol.name";
39const NETWORK_PROTOCOL_VERSION_LABEL: &str = "network.protocol.version";
40const URL_SCHEME_LABEL: &str = "url.scheme";
41
42const HTTP_REQUEST_METHOD_LABEL: &str = "http.request.method";
43#[allow(dead_code)] const HTTP_ROUTE_LABEL: &str = "http.route";
45const HTTP_RESPONSE_STATUS_CODE_LABEL: &str = "http.response.status_code";
46
47struct HTTPMetricsLayerState {
53 pub server_request_duration: Histogram<f64>,
54 pub server_active_requests: UpDownCounter<i64>,
55 pub server_request_body_size: Histogram<u64>,
56}
57
58#[derive(Clone)]
59pub struct HTTPMetricsService<S> {
61 pub(crate) state: Arc<HTTPMetricsLayerState>,
62 inner_service: S,
63}
64
65#[derive(Clone)]
66pub struct HTTPMetricsLayer {
68 state: Arc<HTTPMetricsLayerState>,
69}
70
71pub struct HTTPMetricsLayerBuilder {
72 meter: Option<Meter>,
73}
74
75pub struct Error {
77 #[allow(dead_code)]
78 inner: ErrorKind,
79}
80
81pub type Result<T> = result::Result<T, Error>;
83
84enum ErrorKind {
85 #[allow(dead_code)]
86 Other(String),
88 #[allow(dead_code)]
89 Config(String),
91}
92
93impl fmt::Debug for Error {
94 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
95 f.debug_tuple("tower_otel_http_metrics::Error").finish()
96 }
97}
98
99impl Default for HTTPMetricsLayerBuilder {
100 fn default() -> Self {
101 let meter = global::meter("");
102 HTTPMetricsLayerBuilder { meter: Some(meter) }
103 }
104}
105
106impl HTTPMetricsLayerBuilder {
107 pub fn new() -> Self {
108 HTTPMetricsLayerBuilder { meter: None }
109 }
110
111 pub fn build(self) -> Result<HTTPMetricsLayer> {
112 match self.meter {
113 Some(meter) => Ok(HTTPMetricsLayer {
114 state: Arc::from(HTTPMetricsLayerBuilder::make_state(meter)),
115 }),
116 None => Err(Error {
117 inner: ErrorKind::Config(String::from("no meter provided")),
118 }),
119 }
120 }
121
122 pub fn with_meter(self, meter: Meter) -> Self {
123 HTTPMetricsLayerBuilder { meter: Some(meter) }
124 }
125
126 fn make_state(meter: Meter) -> HTTPMetricsLayerState {
127 HTTPMetricsLayerState {
128 server_request_duration: meter
129 .f64_histogram(Cow::from(HTTP_SERVER_DURATION_METRIC))
130 .with_description("Duration of HTTP server requests.")
131 .with_unit(Cow::from(HTTP_SERVER_DURATION_UNIT))
132 .with_boundaries(HTTP_SERVER_DURATION_BOUNDARIES.to_vec())
133 .build(),
134 server_active_requests: meter
135 .i64_up_down_counter(Cow::from(HTTP_SERVER_ACTIVE_REQUESTS_METRIC))
136 .with_description("Number of active HTTP server requests.")
137 .with_unit(Cow::from(HTTP_SERVER_ACTIVE_REQUESTS_UNIT))
138 .build(),
139 server_request_body_size: meter
140 .u64_histogram(HTTP_SERVER_REQUEST_BODY_SIZE_METRIC)
141 .with_description("Size of HTTP server request bodies.")
142 .with_unit(HTTP_SERVER_REQUEST_BODY_SIZE_UNIT)
143 .build(),
144 }
145 }
146}
147
148impl<S> Layer<S> for HTTPMetricsLayer {
149 type Service = HTTPMetricsService<S>;
150
151 fn layer(&self, service: S) -> Self::Service {
152 HTTPMetricsService {
153 state: self.state.clone(),
154 inner_service: service,
155 }
156 }
157}
158
159#[derive(Clone)]
166struct ResponseFutureMetricsState {
167 duration_start: Instant,
170 body_size: Option<u64>,
172
173 protocol_name_kv: KeyValue,
175 protocol_version_kv: KeyValue,
176 url_scheme_kv: KeyValue,
177 method_kv: KeyValue,
178 route_kv_opt: Option<KeyValue>,
179}
180
181pin_project! {
182 pub struct HTTPMetricsResponseFuture<F> {
184 #[pin]
185 inner_response_future: F,
186 layer_state: Arc<HTTPMetricsLayerState>,
187 metrics_state: ResponseFutureMetricsState,
188 }
189}
190
191impl<S, ReqBody, ResBody> Service<http::Request<ReqBody>> for HTTPMetricsService<S>
192where
193 S: Service<http::Request<ReqBody>, Response = http::Response<ResBody>>,
194{
195 type Response = S::Response;
196 type Error = S::Error;
197 type Future = HTTPMetricsResponseFuture<S::Future>;
198
199 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<result::Result<(), Self::Error>> {
200 self.inner_service.poll_ready(cx)
201 }
202
203 fn call(&mut self, req: http::Request<ReqBody>) -> Self::Future {
204 let duration_start = Instant::now();
205
206 let headers = req.headers();
207 let content_length = headers
208 .get(http::header::CONTENT_LENGTH)
209 .and_then(|value| value.to_str().ok()?.parse::<u64>().ok());
210
211 let (protocol, version) = split_and_format_protocol_version(req.version());
212 let protocol_name_kv = KeyValue::new(NETWORK_PROTOCOL_NAME_LABEL, protocol);
213 let protocol_version_kv = KeyValue::new(NETWORK_PROTOCOL_VERSION_LABEL, version);
214
215 let scheme = req.uri().scheme_str().unwrap_or("").to_string();
216 let url_scheme_kv = KeyValue::new(URL_SCHEME_LABEL, scheme);
217
218 let method = req.method().as_str().to_owned();
219 let method_kv = KeyValue::new(HTTP_REQUEST_METHOD_LABEL, method);
220
221 #[allow(unused_mut)]
222 let mut route_kv_opt = None;
223 #[cfg(feature = "axum")]
224 if let Some(matched_path) = req.extensions().get::<MatchedPath>() {
225 route_kv_opt = Some(KeyValue::new(
226 HTTP_ROUTE_LABEL,
227 matched_path.as_str().to_owned(),
228 ));
229 };
230
231 self.state
232 .server_active_requests
233 .add(1, &[url_scheme_kv.clone(), method_kv.clone()]);
234
235 HTTPMetricsResponseFuture {
236 inner_response_future: self.inner_service.call(req),
237 layer_state: self.state.clone(),
238 metrics_state: ResponseFutureMetricsState {
239 duration_start,
240 body_size: content_length,
241
242 protocol_name_kv,
243 protocol_version_kv,
244 url_scheme_kv,
245 method_kv,
246 route_kv_opt,
247 },
248 }
249 }
250}
251
252impl<F, ResBody, E> Future for HTTPMetricsResponseFuture<F>
253where
254 F: Future<Output = result::Result<http::Response<ResBody>, E>>,
255{
256 type Output = F::Output;
257
258 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
259 let this = self.project();
260 let response = ready!(this.inner_response_future.poll(cx))?;
261 let status = response.status();
262
263 let mut label_superset = vec![
266 this.metrics_state.protocol_name_kv.clone(),
267 this.metrics_state.protocol_version_kv.clone(),
268 this.metrics_state.url_scheme_kv.clone(),
269 this.metrics_state.method_kv.clone(),
270 KeyValue::new(HTTP_RESPONSE_STATUS_CODE_LABEL, i64::from(status.as_u16())),
271 ];
272 if let Some(route_kv) = this.metrics_state.route_kv_opt.clone() {
273 label_superset.push(route_kv);
274 }
275
276 this.layer_state.server_request_duration.record(
277 this.metrics_state.duration_start.elapsed().as_secs_f64(),
278 &label_superset,
279 );
280
281 if let Some(content_length) = this.metrics_state.body_size {
282 this.layer_state
283 .server_request_body_size
284 .record(content_length, &label_superset);
285 }
286
287 this.layer_state.server_active_requests.add(
288 -1,
289 &[
290 this.metrics_state.url_scheme_kv.clone(),
291 this.metrics_state.method_kv.clone(),
292 ],
293 );
294
295 Ready(Ok(response))
296 }
297}
298
299fn split_and_format_protocol_version(http_version: http::Version) -> (String, String) {
300 let version_str = match http_version {
301 http::Version::HTTP_09 => "0.9",
302 http::Version::HTTP_10 => "1.0",
303 http::Version::HTTP_11 => "1.1",
304 http::Version::HTTP_2 => "2.0",
305 http::Version::HTTP_3 => "3.0",
306 _ => "",
307 };
308 (String::from("http"), String::from(version_str))
309}