1use std::env;
2use std::fmt::{Debug, Formatter};
3use std::str::FromStr;
4use std::time::Duration;
5
6use http::{HeaderMap, HeaderName, HeaderValue};
7use opentelemetry::otel_debug;
8use tonic::codec::CompressionEncoding;
9use tonic::metadata::{KeyAndValueRef, MetadataMap};
10use tonic::service::Interceptor;
11use tonic::transport::Channel;
12#[cfg(feature = "tls")]
13use tonic::transport::ClientTlsConfig;
14
15use super::{default_headers, parse_header_string, OTEL_EXPORTER_OTLP_GRPC_ENDPOINT_DEFAULT};
16use crate::exporter::Compression;
17use crate::{
18 ExportConfig, OTEL_EXPORTER_OTLP_COMPRESSION, OTEL_EXPORTER_OTLP_ENDPOINT,
19 OTEL_EXPORTER_OTLP_HEADERS, OTEL_EXPORTER_OTLP_TIMEOUT,
20};
21
22#[cfg(feature = "logs")]
23pub(crate) mod logs;
24
25#[cfg(feature = "metrics")]
26mod metrics;
27
28#[cfg(feature = "trace")]
29pub(crate) mod trace;
30
31#[derive(Debug, Default)]
35#[non_exhaustive]
36pub struct TonicConfig {
37 pub(crate) metadata: Option<MetadataMap>,
39 #[cfg(feature = "tls")]
41 pub(crate) tls_config: Option<ClientTlsConfig>,
42 pub(crate) compression: Option<Compression>,
44 pub(crate) channel: Option<tonic::transport::Channel>,
45 pub(crate) interceptor: Option<BoxInterceptor>,
46}
47
48impl TryFrom<Compression> for tonic::codec::CompressionEncoding {
49 type Error = crate::Error;
50
51 fn try_from(value: Compression) -> Result<Self, Self::Error> {
52 match value {
53 #[cfg(feature = "gzip-tonic")]
54 Compression::Gzip => Ok(tonic::codec::CompressionEncoding::Gzip),
55 #[cfg(not(feature = "gzip-tonic"))]
56 Compression::Gzip => Err(crate::Error::FeatureRequiredForCompressionAlgorithm(
57 "gzip-tonic",
58 Compression::Gzip,
59 )),
60 #[cfg(feature = "zstd-tonic")]
61 Compression::Zstd => Ok(tonic::codec::CompressionEncoding::Zstd),
62 #[cfg(not(feature = "zstd-tonic"))]
63 Compression::Zstd => Err(crate::Error::FeatureRequiredForCompressionAlgorithm(
64 "zstd-tonic",
65 Compression::Zstd,
66 )),
67 }
68 }
69}
70
71#[derive(Debug)]
106pub struct TonicExporterBuilder {
107 pub(crate) tonic_config: TonicConfig,
108 pub(crate) exporter_config: ExportConfig,
109}
110
111pub(crate) struct BoxInterceptor(Box<dyn Interceptor + Send + Sync>);
112impl tonic::service::Interceptor for BoxInterceptor {
113 fn call(&mut self, request: tonic::Request<()>) -> Result<tonic::Request<()>, tonic::Status> {
114 self.0.call(request)
115 }
116}
117
118impl Debug for BoxInterceptor {
119 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
120 write!(f, "BoxInterceptor(..)")
121 }
122}
123
124impl Default for TonicExporterBuilder {
125 fn default() -> Self {
126 TonicExporterBuilder {
127 tonic_config: TonicConfig {
128 metadata: Some(MetadataMap::from_headers(
129 (&default_headers())
130 .try_into()
131 .expect("Invalid tonic headers"),
132 )),
133 #[cfg(feature = "tls")]
134 tls_config: None,
135 compression: None,
136 channel: Option::default(),
137 interceptor: Option::default(),
138 },
139 exporter_config: ExportConfig {
140 protocol: crate::Protocol::Grpc,
141 ..Default::default()
142 },
143 }
144 }
145}
146
147impl TonicExporterBuilder {
148 fn build_channel(
149 self,
150 signal_endpoint_var: &str,
151 signal_timeout_var: &str,
152 signal_compression_var: &str,
153 signal_headers_var: &str,
154 ) -> Result<(Channel, BoxInterceptor, Option<CompressionEncoding>), crate::Error> {
155 let compression = self.resolve_compression(signal_compression_var)?;
156
157 let (headers_from_env, headers_for_logging) = parse_headers_from_env(signal_headers_var);
158 let metadata = merge_metadata_with_headers_from_env(
159 self.tonic_config.metadata.unwrap_or_default(),
160 headers_from_env,
161 );
162
163 let add_metadata = move |mut req: tonic::Request<()>| {
164 for key_and_value in metadata.iter() {
165 match key_and_value {
166 KeyAndValueRef::Ascii(key, value) => {
167 req.metadata_mut().append(key, value.to_owned())
168 }
169 KeyAndValueRef::Binary(key, value) => {
170 req.metadata_mut().append_bin(key, value.to_owned())
171 }
172 };
173 }
174
175 Ok(req)
176 };
177
178 let interceptor = match self.tonic_config.interceptor {
179 Some(mut interceptor) => {
180 BoxInterceptor(Box::new(move |req| interceptor.call(add_metadata(req)?)))
181 }
182 None => BoxInterceptor(Box::new(add_metadata)),
183 };
184
185 if let Some(channel) = self.tonic_config.channel {
187 return Ok((channel, interceptor, compression));
188 }
189
190 let config = self.exporter_config;
191
192 let endpoint = Self::resolve_endpoint(signal_endpoint_var, config.endpoint);
193
194 let endpoint_clone = endpoint.clone();
196
197 let endpoint = Channel::from_shared(endpoint).map_err(crate::Error::from)?;
198 let timeout = match env::var(signal_timeout_var)
199 .ok()
200 .or(env::var(OTEL_EXPORTER_OTLP_TIMEOUT).ok())
201 {
202 Some(val) => match val.parse() {
203 Ok(seconds) => Duration::from_secs(seconds),
204 Err(_) => config.timeout,
205 },
206 None => config.timeout,
207 };
208
209 #[cfg(feature = "tls")]
210 let channel = match self.tonic_config.tls_config {
211 Some(tls_config) => endpoint
212 .tls_config(tls_config)
213 .map_err(crate::Error::from)?,
214 None => endpoint,
215 }
216 .timeout(timeout)
217 .connect_lazy();
218
219 #[cfg(not(feature = "tls"))]
220 let channel = endpoint.timeout(timeout).connect_lazy();
221
222 otel_debug!(name: "TonicChannelBuilt", endpoint = endpoint_clone, timeout_in_millisecs = timeout.as_millis(), compression = format!("{:?}", compression), headers = format!("{:?}", headers_for_logging));
223 Ok((channel, interceptor, compression))
224 }
225
226 fn resolve_endpoint(default_endpoint_var: &str, provided_endpoint: Option<String>) -> String {
227 match env::var(default_endpoint_var)
233 .ok()
234 .or(env::var(OTEL_EXPORTER_OTLP_ENDPOINT).ok())
235 {
236 Some(val) => val,
237 None => {
238 provided_endpoint.unwrap_or(OTEL_EXPORTER_OTLP_GRPC_ENDPOINT_DEFAULT.to_string())
239 }
240 }
241 }
242
243 fn resolve_compression(
244 &self,
245 env_override: &str,
246 ) -> Result<Option<CompressionEncoding>, crate::Error> {
247 if let Some(compression) = self.tonic_config.compression {
248 Ok(Some(compression.try_into()?))
249 } else if let Ok(compression) = env::var(env_override) {
250 Ok(Some(compression.parse::<Compression>()?.try_into()?))
251 } else if let Ok(compression) = env::var(OTEL_EXPORTER_OTLP_COMPRESSION) {
252 Ok(Some(compression.parse::<Compression>()?.try_into()?))
253 } else {
254 Ok(None)
255 }
256 }
257
258 #[cfg(feature = "logs")]
260 pub(crate) fn build_log_exporter(
261 self,
262 ) -> Result<crate::logs::LogExporter, opentelemetry_sdk::logs::LogError> {
263 use crate::exporter::tonic::logs::TonicLogsClient;
264
265 otel_debug!(name: "LogsTonicChannelBuilding");
266
267 let (channel, interceptor, compression) = self.build_channel(
268 crate::logs::OTEL_EXPORTER_OTLP_LOGS_ENDPOINT,
269 crate::logs::OTEL_EXPORTER_OTLP_LOGS_TIMEOUT,
270 crate::logs::OTEL_EXPORTER_OTLP_LOGS_COMPRESSION,
271 crate::logs::OTEL_EXPORTER_OTLP_LOGS_HEADERS,
272 )?;
273
274 let client = TonicLogsClient::new(channel, interceptor, compression);
275
276 Ok(crate::logs::LogExporter::from_tonic(client))
277 }
278
279 #[cfg(feature = "metrics")]
281 pub(crate) fn build_metrics_exporter(
282 self,
283 temporality: opentelemetry_sdk::metrics::Temporality,
284 ) -> opentelemetry_sdk::metrics::MetricResult<crate::MetricExporter> {
285 use crate::MetricExporter;
286 use metrics::TonicMetricsClient;
287
288 otel_debug!(name: "MetricsTonicChannelBuilding");
289
290 let (channel, interceptor, compression) = self.build_channel(
291 crate::metric::OTEL_EXPORTER_OTLP_METRICS_ENDPOINT,
292 crate::metric::OTEL_EXPORTER_OTLP_METRICS_TIMEOUT,
293 crate::metric::OTEL_EXPORTER_OTLP_METRICS_COMPRESSION,
294 crate::metric::OTEL_EXPORTER_OTLP_METRICS_HEADERS,
295 )?;
296
297 let client = TonicMetricsClient::new(channel, interceptor, compression);
298
299 Ok(MetricExporter::new(client, temporality))
300 }
301
302 #[cfg(feature = "trace")]
304 pub(crate) fn build_span_exporter(
305 self,
306 ) -> Result<crate::SpanExporter, opentelemetry::trace::TraceError> {
307 use crate::exporter::tonic::trace::TonicTracesClient;
308
309 otel_debug!(name: "TracesTonicChannelBuilding");
310
311 let (channel, interceptor, compression) = self.build_channel(
312 crate::span::OTEL_EXPORTER_OTLP_TRACES_ENDPOINT,
313 crate::span::OTEL_EXPORTER_OTLP_TRACES_TIMEOUT,
314 crate::span::OTEL_EXPORTER_OTLP_TRACES_COMPRESSION,
315 crate::span::OTEL_EXPORTER_OTLP_TRACES_HEADERS,
316 )?;
317
318 let client = TonicTracesClient::new(channel, interceptor, compression);
319
320 Ok(crate::SpanExporter::new(client))
321 }
322}
323
324fn merge_metadata_with_headers_from_env(
325 metadata: MetadataMap,
326 headers_from_env: HeaderMap,
327) -> MetadataMap {
328 if headers_from_env.is_empty() {
329 metadata
330 } else {
331 let mut existing_headers: HeaderMap = metadata.into_headers();
332 existing_headers.extend(headers_from_env);
333
334 MetadataMap::from_headers(existing_headers)
335 }
336}
337
338fn parse_headers_from_env(signal_headers_var: &str) -> (HeaderMap, Vec<(String, String)>) {
339 let mut headers = Vec::new();
340
341 (
342 env::var(signal_headers_var)
343 .or_else(|_| env::var(OTEL_EXPORTER_OTLP_HEADERS))
344 .map(|input| {
345 parse_header_string(&input)
346 .filter_map(|(key, value)| {
347 headers.push((key.to_owned(), value.clone()));
348 Some((
349 HeaderName::from_str(key).ok()?,
350 HeaderValue::from_str(&value).ok()?,
351 ))
352 })
353 .collect::<HeaderMap>()
354 })
355 .unwrap_or_default(),
356 headers,
357 )
358}
359
360pub trait HasTonicConfig {
362 fn tonic_config(&mut self) -> &mut TonicConfig;
364}
365
366impl HasTonicConfig for TonicExporterBuilder {
368 fn tonic_config(&mut self) -> &mut TonicConfig {
369 &mut self.tonic_config
370 }
371}
372
373pub trait WithTonicConfig {
388 #[cfg(feature = "tls")]
390 fn with_tls_config(self, tls_config: ClientTlsConfig) -> Self;
391
392 fn with_metadata(self, metadata: MetadataMap) -> Self;
394
395 fn with_compression(self, compression: Compression) -> Self;
397
398 fn with_channel(self, channel: tonic::transport::Channel) -> Self;
405
406 fn with_interceptor<I>(self, interceptor: I) -> Self
410 where
411 I: tonic::service::Interceptor + Clone + Send + Sync + 'static;
412}
413
414impl<B: HasTonicConfig> WithTonicConfig for B {
415 #[cfg(feature = "tls")]
416 fn with_tls_config(mut self, tls_config: ClientTlsConfig) -> Self {
417 self.tonic_config().tls_config = Some(tls_config);
418 self
419 }
420
421 fn with_metadata(mut self, metadata: MetadataMap) -> Self {
423 let mut existing_headers = self
425 .tonic_config()
426 .metadata
427 .clone()
428 .unwrap_or_default()
429 .into_headers();
430 existing_headers.extend(metadata.into_headers());
431
432 self.tonic_config().metadata = Some(MetadataMap::from_headers(existing_headers));
433 self
434 }
435
436 fn with_compression(mut self, compression: Compression) -> Self {
437 self.tonic_config().compression = Some(compression);
438 self
439 }
440
441 fn with_channel(mut self, channel: tonic::transport::Channel) -> Self {
442 self.tonic_config().channel = Some(channel);
443 self
444 }
445
446 fn with_interceptor<I>(mut self, interceptor: I) -> Self
447 where
448 I: tonic::service::Interceptor + Clone + Send + Sync + 'static,
449 {
450 self.tonic_config().interceptor = Some(BoxInterceptor(Box::new(interceptor)));
451 self
452 }
453}
454
#[cfg(test)]
mod tests {
    use crate::exporter::tests::run_env_test;
    use crate::exporter::tonic::WithTonicConfig;
    #[cfg(feature = "grpc-tonic")]
    use crate::exporter::Compression;
    use crate::{TonicExporterBuilder, WithExportConfig, OTEL_EXPORTER_OTLP_TRACES_ENDPOINT};
    use crate::{OTEL_EXPORTER_OTLP_HEADERS, OTEL_EXPORTER_OTLP_TRACES_HEADERS};
    use http::{HeaderMap, HeaderName, HeaderValue};
    use tonic::metadata::{MetadataMap, MetadataValue};

    /// `with_metadata` adds new entries next to the defaults and lets the
    /// caller override a default entry without growing the map.
    #[test]
    fn test_with_metadata() {
        // A new key is added alongside the default headers.
        let mut metadata = MetadataMap::new();
        metadata.insert("foo", "bar".parse().unwrap());
        let builder = TonicExporterBuilder::default().with_metadata(metadata);
        let result = builder.tonic_config.metadata.unwrap();
        let foo = result
            .get("foo")
            .expect("there to always be an entry for foo");
        assert_eq!(foo, &MetadataValue::try_from("bar").unwrap());
        assert!(result.get("User-Agent").is_some());

        // An existing default key is overridden rather than duplicated.
        let mut metadata = MetadataMap::new();
        metadata.insert("user-agent", "baz".parse().unwrap());
        let builder = TonicExporterBuilder::default().with_metadata(metadata);
        let result = builder.tonic_config.metadata.unwrap();
        assert_eq!(
            result.get("User-Agent").unwrap(),
            &MetadataValue::try_from("baz").unwrap()
        );
        assert_eq!(
            result.len(),
            TonicExporterBuilder::default()
                .tonic_config
                .metadata
                .unwrap()
                .len()
        );
    }

    /// `with_compression(Gzip)` is stored on the builder.
    #[test]
    #[cfg(feature = "gzip-tonic")]
    fn test_with_gzip_compression() {
        let builder = TonicExporterBuilder::default().with_compression(Compression::Gzip);
        assert_eq!(builder.tonic_config.compression.unwrap(), Compression::Gzip);
    }

    /// `with_compression(Zstd)` is stored on the builder.
    #[test]
    #[cfg(feature = "zstd-tonic")]
    fn test_with_zstd_compression() {
        let builder = TonicExporterBuilder::default().with_compression(Compression::Zstd);
        assert_eq!(builder.tonic_config.compression.unwrap(), Compression::Zstd);
    }

    /// Conversion to tonic's encoding succeeds exactly when the matching
    /// cargo feature is enabled.
    #[test]
    fn test_convert_compression() {
        #[cfg(feature = "gzip-tonic")]
        assert!(tonic::codec::CompressionEncoding::try_from(Compression::Gzip).is_ok());
        #[cfg(not(feature = "gzip-tonic"))]
        assert!(tonic::codec::CompressionEncoding::try_from(Compression::Gzip).is_err());
        #[cfg(feature = "zstd-tonic")]
        assert!(tonic::codec::CompressionEncoding::try_from(Compression::Zstd).is_ok());
        #[cfg(not(feature = "zstd-tonic"))]
        assert!(tonic::codec::CompressionEncoding::try_from(Compression::Zstd).is_err());
    }

    /// The signal-specific header variable wins; the generic one is only
    /// used when the signal-specific variable is unset.
    #[test]
    fn test_parse_headers_from_env() {
        run_env_test(
            vec![
                (OTEL_EXPORTER_OTLP_TRACES_HEADERS, "k1=v1,k2=v2"),
                (OTEL_EXPORTER_OTLP_HEADERS, "k3=v3"),
            ],
            || {
                assert_eq!(
                    super::parse_headers_from_env(OTEL_EXPORTER_OTLP_TRACES_HEADERS).0,
                    HeaderMap::from_iter([
                        (
                            HeaderName::from_static("k1"),
                            HeaderValue::from_static("v1")
                        ),
                        (
                            HeaderName::from_static("k2"),
                            HeaderValue::from_static("v2")
                        ),
                    ])
                );

                assert_eq!(
                    super::parse_headers_from_env("EMPTY_ENV").0,
                    HeaderMap::from_iter([(
                        HeaderName::from_static("k3"),
                        HeaderValue::from_static("v3")
                    )])
                );
            },
        )
    }

    /// Environment headers are merged into existing metadata and override
    /// entries with the same name.
    #[test]
    fn test_merge_metadata_with_headers_from_env() {
        run_env_test(
            vec![(OTEL_EXPORTER_OTLP_TRACES_HEADERS, "k1=v1,k2=v2")],
            || {
                let headers_from_env =
                    super::parse_headers_from_env(OTEL_EXPORTER_OTLP_TRACES_HEADERS);

                let mut metadata = MetadataMap::new();
                metadata.insert("foo", "bar".parse().unwrap());
                metadata.insert("k1", "v0".parse().unwrap());

                let result =
                    super::merge_metadata_with_headers_from_env(metadata, headers_from_env.0);

                assert_eq!(
                    result.get("foo").unwrap(),
                    MetadataValue::from_static("bar")
                );
                assert_eq!(result.get("k1").unwrap(), MetadataValue::from_static("v1"));
                assert_eq!(result.get("k2").unwrap(), MetadataValue::from_static("v2"));
            },
        );
    }

    /// With no environment variables set, the endpoint is the gRPC default
    /// unless one was configured on the builder.
    #[test]
    fn test_tonic_exporter_endpoint() {
        // Default endpoint.
        run_env_test(vec![], || {
            let exporter = TonicExporterBuilder::default();

            let url = TonicExporterBuilder::resolve_endpoint(
                OTEL_EXPORTER_OTLP_TRACES_ENDPOINT,
                exporter.exporter_config.endpoint,
            );

            assert_eq!(url, "http://localhost:4317");
        });

        // Endpoint configured programmatically.
        run_env_test(vec![], || {
            let exporter = TonicExporterBuilder::default().with_endpoint("http://localhost:1234");

            let url = TonicExporterBuilder::resolve_endpoint(
                OTEL_EXPORTER_OTLP_TRACES_ENDPOINT,
                exporter.exporter_config.endpoint,
            );

            assert_eq!(url, "http://localhost:1234");
        });
    }
}