opentelemetry_otlp/exporter/tonic/
trace.rs1use core::fmt;
2
3use futures_core::future::BoxFuture;
4use opentelemetry::otel_debug;
5use opentelemetry_proto::tonic::collector::trace::v1::{
6 trace_service_client::TraceServiceClient, ExportTraceServiceRequest,
7};
8use opentelemetry_proto::transform::trace::tonic::group_spans_by_resource_and_scope;
9use opentelemetry_sdk::error::OTelSdkError;
10use opentelemetry_sdk::{
11 error::OTelSdkResult,
12 trace::{SpanData, SpanExporter},
13};
14use tonic::{codegen::CompressionEncoding, service::Interceptor, transport::Channel, Request};
15
16use super::BoxInterceptor;
17
18pub(crate) struct TonicTracesClient {
19 inner: Option<ClientInner>,
20 #[allow(dead_code)]
21 resource: opentelemetry_proto::transform::common::tonic::ResourceAttributesWithSchema,
23}
24
25struct ClientInner {
26 client: TraceServiceClient<Channel>,
27 interceptor: BoxInterceptor,
28}
29
30impl fmt::Debug for TonicTracesClient {
31 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
32 f.write_str("TonicTracesClient")
33 }
34}
35
36impl TonicTracesClient {
37 pub(super) fn new(
38 channel: Channel,
39 interceptor: BoxInterceptor,
40 compression: Option<CompressionEncoding>,
41 ) -> Self {
42 let mut client = TraceServiceClient::new(channel);
43 if let Some(compression) = compression {
44 client = client
45 .send_compressed(compression)
46 .accept_compressed(compression);
47 }
48
49 otel_debug!(name: "TonicsTracesClientBuilt");
50
51 TonicTracesClient {
52 inner: Some(ClientInner {
53 client,
54 interceptor,
55 }),
56 resource: Default::default(),
57 }
58 }
59}
60
61impl SpanExporter for TonicTracesClient {
62 fn export(&mut self, batch: Vec<SpanData>) -> BoxFuture<'static, OTelSdkResult> {
63 let (mut client, metadata, extensions) = match &mut self.inner {
64 Some(inner) => {
65 let (m, e, _) = match inner.interceptor.call(Request::new(())) {
66 Ok(res) => res.into_parts(),
67 Err(e) => {
68 return Box::pin(std::future::ready(Err(OTelSdkError::InternalFailure(
69 e.to_string(),
70 ))))
71 }
72 };
73 (inner.client.clone(), m, e)
74 }
75 None => {
76 return Box::pin(std::future::ready(Err(OTelSdkError::AlreadyShutdown)));
77 }
78 };
79
80 let resource_spans = group_spans_by_resource_and_scope(batch, &self.resource);
81
82 otel_debug!(name: "TonicsTracesClient.CallingExport");
83
84 Box::pin(async move {
85 client
86 .export(Request::from_parts(
87 metadata,
88 extensions,
89 ExportTraceServiceRequest { resource_spans },
90 ))
91 .await
92 .map_err(|e| OTelSdkError::InternalFailure(e.to_string()))?;
93 Ok(())
94 })
95 }
96
97 fn shutdown(&mut self) -> OTelSdkResult {
98 match self.inner.take() {
99 Some(_) => Ok(()), None => Err(OTelSdkError::AlreadyShutdown), }
102 }
103
104 fn set_resource(&mut self, resource: &opentelemetry_sdk::Resource) {
105 self.resource = resource.into();
106 }
107}