opentelemetry_otlp/exporter/tonic/
logs.rs

1use core::fmt;
2use opentelemetry::otel_debug;
3use opentelemetry_proto::tonic::collector::logs::v1::{
4    logs_service_client::LogsServiceClient, ExportLogsServiceRequest,
5};
6use opentelemetry_sdk::error::{OTelSdkError, OTelSdkResult};
7use opentelemetry_sdk::logs::{LogBatch, LogExporter};
8use tonic::{codegen::CompressionEncoding, service::Interceptor, transport::Channel, Request};
9
10use opentelemetry_proto::transform::logs::tonic::group_logs_by_resource_and_scope;
11
12use super::BoxInterceptor;
13use tokio::sync::Mutex;
14
15pub(crate) struct TonicLogsClient {
16    inner: Option<ClientInner>,
17    #[allow(dead_code)]
18    // <allow dead> would be removed once we support set_resource for metrics.
19    resource: opentelemetry_proto::transform::common::tonic::ResourceAttributesWithSchema,
20}
21
22struct ClientInner {
23    client: LogsServiceClient<Channel>,
24    interceptor: Mutex<BoxInterceptor>,
25}
26
27impl fmt::Debug for TonicLogsClient {
28    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
29        f.write_str("TonicLogsClient")
30    }
31}
32
33impl TonicLogsClient {
34    pub(super) fn new(
35        channel: Channel,
36        interceptor: BoxInterceptor,
37        compression: Option<CompressionEncoding>,
38    ) -> Self {
39        let mut client = LogsServiceClient::new(channel);
40        if let Some(compression) = compression {
41            client = client
42                .send_compressed(compression)
43                .accept_compressed(compression);
44        }
45
46        otel_debug!(name: "TonicsLogsClientBuilt");
47
48        TonicLogsClient {
49            inner: Some(ClientInner {
50                client,
51                interceptor: Mutex::new(interceptor),
52            }),
53            resource: Default::default(),
54        }
55    }
56}
57
58impl LogExporter for TonicLogsClient {
59    #[allow(clippy::manual_async_fn)]
60    fn export(
61        &self,
62        batch: LogBatch<'_>,
63    ) -> impl std::future::Future<Output = OTelSdkResult> + Send {
64        async move {
65            let (mut client, metadata, extensions) = match &self.inner {
66                Some(inner) => {
67                    let (m, e, _) = inner
68                        .interceptor
69                        .lock()
70                        .await // tokio::sync::Mutex doesn't return a poisoned error, so we can safely use the interceptor here
71                        .call(Request::new(()))
72                        .map_err(|e| OTelSdkError::InternalFailure(format!("error: {:?}", e)))?
73                        .into_parts();
74                    (inner.client.clone(), m, e)
75                }
76                None => return Err(OTelSdkError::AlreadyShutdown),
77            };
78
79            let resource_logs = group_logs_by_resource_and_scope(batch, &self.resource);
80
81            otel_debug!(name: "TonicsLogsClient.CallingExport");
82
83            client
84                .export(Request::from_parts(
85                    metadata,
86                    extensions,
87                    ExportLogsServiceRequest { resource_logs },
88                ))
89                .await
90                .map_err(|e| OTelSdkError::InternalFailure(format!("export error: {:?}", e)))?;
91            Ok(())
92        }
93    }
94
95    fn shutdown(&mut self) -> OTelSdkResult {
96        match self.inner.take() {
97            Some(_) => Ok(()), // Successfully took `inner`, indicating a successful shutdown.
98            None => Err(OTelSdkError::AlreadyShutdown), // `inner` was already `None`, meaning it's already shut down.
99        }
100    }
101
102    fn set_resource(&mut self, resource: &opentelemetry_sdk::Resource) {
103        self.resource = resource.into();
104    }
105}