// opentelemetry_otlp/exporter/tonic/metrics.rs
use core::fmt;
2use std::sync::Mutex;
3
4use async_trait::async_trait;
5use opentelemetry::otel_debug;
6use opentelemetry_proto::tonic::collector::metrics::v1::{
7 metrics_service_client::MetricsServiceClient, ExportMetricsServiceRequest,
8};
9use opentelemetry_sdk::error::{OTelSdkError, OTelSdkResult};
10use opentelemetry_sdk::metrics::data::ResourceMetrics;
11use tonic::{codegen::CompressionEncoding, service::Interceptor, transport::Channel, Request};
12
13use super::BoxInterceptor;
14use crate::metric::MetricsClient;
15
16pub(crate) struct TonicMetricsClient {
17 inner: Mutex<Option<ClientInner>>,
18}
19
20struct ClientInner {
21 client: MetricsServiceClient<Channel>,
22 interceptor: BoxInterceptor,
23}
24
25impl fmt::Debug for TonicMetricsClient {
26 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
27 f.write_str("TonicMetricsClient")
28 }
29}
30
31impl TonicMetricsClient {
32 pub(super) fn new(
33 channel: Channel,
34 interceptor: BoxInterceptor,
35 compression: Option<CompressionEncoding>,
36 ) -> Self {
37 let mut client = MetricsServiceClient::new(channel);
38 if let Some(compression) = compression {
39 client = client
40 .send_compressed(compression)
41 .accept_compressed(compression);
42 }
43
44 otel_debug!(name: "TonicsMetricsClientBuilt");
45
46 TonicMetricsClient {
47 inner: Mutex::new(Some(ClientInner {
48 client,
49 interceptor,
50 })),
51 }
52 }
53}
54
55#[async_trait]
56impl MetricsClient for TonicMetricsClient {
57 async fn export(&self, metrics: &mut ResourceMetrics) -> OTelSdkResult {
58 let (mut client, metadata, extensions) = self
59 .inner
60 .lock()
61 .map_err(|e| OTelSdkError::InternalFailure(format!("Failed to acquire lock: {e:?}")))
62 .and_then(|mut inner| match &mut *inner {
63 Some(inner) => {
64 let (m, e, _) = inner
65 .interceptor
66 .call(Request::new(()))
67 .map_err(|e| {
68 OTelSdkError::InternalFailure(format!(
69 "unexpected status while exporting {e:?}"
70 ))
71 })?
72 .into_parts();
73 Ok((inner.client.clone(), m, e))
74 }
75 None => Err(OTelSdkError::InternalFailure(
76 "exporter is already shut down".into(),
77 )),
78 })?;
79
80 otel_debug!(name: "TonicsMetricsClient.CallingExport");
81
82 client
83 .export(Request::from_parts(
84 metadata,
85 extensions,
86 ExportMetricsServiceRequest::from(&*metrics),
87 ))
88 .await
89 .map_err(|e| OTelSdkError::InternalFailure(format!("{e:?}")))?;
90
91 Ok(())
92 }
93
94 fn shutdown(&self) -> OTelSdkResult {
95 self.inner
96 .lock()
97 .map_err(|e| OTelSdkError::InternalFailure(format!("Failed to acquire lock: {}", e)))?
98 .take();
99
100 Ok(())
101 }
102}