opentelemetry_otlp/exporter/tonic/
metrics.rs

1use core::fmt;
2use std::sync::Mutex;
3
4use async_trait::async_trait;
5use opentelemetry::otel_debug;
6use opentelemetry_proto::tonic::collector::metrics::v1::{
7    metrics_service_client::MetricsServiceClient, ExportMetricsServiceRequest,
8};
9use opentelemetry_sdk::error::{OTelSdkError, OTelSdkResult};
10use opentelemetry_sdk::metrics::data::ResourceMetrics;
11use tonic::{codegen::CompressionEncoding, service::Interceptor, transport::Channel, Request};
12
13use super::BoxInterceptor;
14use crate::metric::MetricsClient;
15
16pub(crate) struct TonicMetricsClient {
17    inner: Mutex<Option<ClientInner>>,
18}
19
20struct ClientInner {
21    client: MetricsServiceClient<Channel>,
22    interceptor: BoxInterceptor,
23}
24
25impl fmt::Debug for TonicMetricsClient {
26    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
27        f.write_str("TonicMetricsClient")
28    }
29}
30
31impl TonicMetricsClient {
32    pub(super) fn new(
33        channel: Channel,
34        interceptor: BoxInterceptor,
35        compression: Option<CompressionEncoding>,
36    ) -> Self {
37        let mut client = MetricsServiceClient::new(channel);
38        if let Some(compression) = compression {
39            client = client
40                .send_compressed(compression)
41                .accept_compressed(compression);
42        }
43
44        otel_debug!(name: "TonicsMetricsClientBuilt");
45
46        TonicMetricsClient {
47            inner: Mutex::new(Some(ClientInner {
48                client,
49                interceptor,
50            })),
51        }
52    }
53}
54
55#[async_trait]
56impl MetricsClient for TonicMetricsClient {
57    async fn export(&self, metrics: &mut ResourceMetrics) -> OTelSdkResult {
58        let (mut client, metadata, extensions) = self
59            .inner
60            .lock()
61            .map_err(|e| OTelSdkError::InternalFailure(format!("Failed to acquire lock: {e:?}")))
62            .and_then(|mut inner| match &mut *inner {
63                Some(inner) => {
64                    let (m, e, _) = inner
65                        .interceptor
66                        .call(Request::new(()))
67                        .map_err(|e| {
68                            OTelSdkError::InternalFailure(format!(
69                                "unexpected status while exporting {e:?}"
70                            ))
71                        })?
72                        .into_parts();
73                    Ok((inner.client.clone(), m, e))
74                }
75                None => Err(OTelSdkError::InternalFailure(
76                    "exporter is already shut down".into(),
77                )),
78            })?;
79
80        otel_debug!(name: "TonicsMetricsClient.CallingExport");
81
82        client
83            .export(Request::from_parts(
84                metadata,
85                extensions,
86                ExportMetricsServiceRequest::from(&*metrics),
87            ))
88            .await
89            .map_err(|e| OTelSdkError::InternalFailure(format!("{e:?}")))?;
90
91        Ok(())
92    }
93
94    fn shutdown(&self) -> OTelSdkResult {
95        self.inner
96            .lock()
97            .map_err(|e| OTelSdkError::InternalFailure(format!("Failed to acquire lock: {}", e)))?
98            .take();
99
100        Ok(())
101    }
102}