opentelemetry_otlp/exporter/tonic/
logs.rs1use core::fmt;
2use opentelemetry::otel_debug;
3use opentelemetry_proto::tonic::collector::logs::v1::{
4 logs_service_client::LogsServiceClient, ExportLogsServiceRequest,
5};
6use opentelemetry_sdk::error::{OTelSdkError, OTelSdkResult};
7use opentelemetry_sdk::logs::{LogBatch, LogExporter};
8use tonic::{codegen::CompressionEncoding, service::Interceptor, transport::Channel, Request};
9
10use opentelemetry_proto::transform::logs::tonic::group_logs_by_resource_and_scope;
11
12use super::BoxInterceptor;
13use tokio::sync::Mutex;
14
15pub(crate) struct TonicLogsClient {
16 inner: Option<ClientInner>,
17 #[allow(dead_code)]
18 resource: opentelemetry_proto::transform::common::tonic::ResourceAttributesWithSchema,
20}
21
22struct ClientInner {
23 client: LogsServiceClient<Channel>,
24 interceptor: Mutex<BoxInterceptor>,
25}
26
27impl fmt::Debug for TonicLogsClient {
28 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
29 f.write_str("TonicLogsClient")
30 }
31}
32
33impl TonicLogsClient {
34 pub(super) fn new(
35 channel: Channel,
36 interceptor: BoxInterceptor,
37 compression: Option<CompressionEncoding>,
38 ) -> Self {
39 let mut client = LogsServiceClient::new(channel);
40 if let Some(compression) = compression {
41 client = client
42 .send_compressed(compression)
43 .accept_compressed(compression);
44 }
45
46 otel_debug!(name: "TonicsLogsClientBuilt");
47
48 TonicLogsClient {
49 inner: Some(ClientInner {
50 client,
51 interceptor: Mutex::new(interceptor),
52 }),
53 resource: Default::default(),
54 }
55 }
56}
57
58impl LogExporter for TonicLogsClient {
59 #[allow(clippy::manual_async_fn)]
60 fn export(
61 &self,
62 batch: LogBatch<'_>,
63 ) -> impl std::future::Future<Output = OTelSdkResult> + Send {
64 async move {
65 let (mut client, metadata, extensions) = match &self.inner {
66 Some(inner) => {
67 let (m, e, _) = inner
68 .interceptor
69 .lock()
70 .await .call(Request::new(()))
72 .map_err(|e| OTelSdkError::InternalFailure(format!("error: {:?}", e)))?
73 .into_parts();
74 (inner.client.clone(), m, e)
75 }
76 None => return Err(OTelSdkError::AlreadyShutdown),
77 };
78
79 let resource_logs = group_logs_by_resource_and_scope(batch, &self.resource);
80
81 otel_debug!(name: "TonicsLogsClient.CallingExport");
82
83 client
84 .export(Request::from_parts(
85 metadata,
86 extensions,
87 ExportLogsServiceRequest { resource_logs },
88 ))
89 .await
90 .map_err(|e| OTelSdkError::InternalFailure(format!("export error: {:?}", e)))?;
91 Ok(())
92 }
93 }
94
95 fn shutdown(&mut self) -> OTelSdkResult {
96 match self.inner.take() {
97 Some(_) => Ok(()), None => Err(OTelSdkError::AlreadyShutdown), }
100 }
101
102 fn set_resource(&mut self, resource: &opentelemetry_sdk::Resource) {
103 self.resource = resource.into();
104 }
105}