opentelemetry_otlp/exporter/tonic/
mod.rs

1use std::env;
2use std::fmt::{Debug, Formatter};
3use std::str::FromStr;
4use std::time::Duration;
5
6use http::{HeaderMap, HeaderName, HeaderValue};
7use opentelemetry::otel_debug;
8use tonic::codec::CompressionEncoding;
9use tonic::metadata::{KeyAndValueRef, MetadataMap};
10use tonic::service::Interceptor;
11use tonic::transport::Channel;
12#[cfg(feature = "tls")]
13use tonic::transport::ClientTlsConfig;
14
15use super::{default_headers, parse_header_string, OTEL_EXPORTER_OTLP_GRPC_ENDPOINT_DEFAULT};
16use crate::exporter::Compression;
17use crate::{
18    ExportConfig, OTEL_EXPORTER_OTLP_COMPRESSION, OTEL_EXPORTER_OTLP_ENDPOINT,
19    OTEL_EXPORTER_OTLP_HEADERS, OTEL_EXPORTER_OTLP_TIMEOUT,
20};
21
22#[cfg(feature = "logs")]
23pub(crate) mod logs;
24
25#[cfg(feature = "metrics")]
26mod metrics;
27
28#[cfg(feature = "trace")]
29pub(crate) mod trace;
30
31/// Configuration for [tonic]
32///
33/// [tonic]: https://github.com/hyperium/tonic
34#[derive(Debug, Default)]
35#[non_exhaustive]
36pub struct TonicConfig {
37    /// Custom metadata entries to send to the collector.
38    pub(crate) metadata: Option<MetadataMap>,
39    /// TLS settings for the collector endpoint.
40    #[cfg(feature = "tls")]
41    pub(crate) tls_config: Option<ClientTlsConfig>,
42    /// The compression algorithm to use when communicating with the collector.
43    pub(crate) compression: Option<Compression>,
44    pub(crate) channel: Option<tonic::transport::Channel>,
45    pub(crate) interceptor: Option<BoxInterceptor>,
46}
47
48impl TryFrom<Compression> for tonic::codec::CompressionEncoding {
49    type Error = crate::Error;
50
51    fn try_from(value: Compression) -> Result<Self, Self::Error> {
52        match value {
53            #[cfg(feature = "gzip-tonic")]
54            Compression::Gzip => Ok(tonic::codec::CompressionEncoding::Gzip),
55            #[cfg(not(feature = "gzip-tonic"))]
56            Compression::Gzip => Err(crate::Error::FeatureRequiredForCompressionAlgorithm(
57                "gzip-tonic",
58                Compression::Gzip,
59            )),
60            #[cfg(feature = "zstd-tonic")]
61            Compression::Zstd => Ok(tonic::codec::CompressionEncoding::Zstd),
62            #[cfg(not(feature = "zstd-tonic"))]
63            Compression::Zstd => Err(crate::Error::FeatureRequiredForCompressionAlgorithm(
64                "zstd-tonic",
65                Compression::Zstd,
66            )),
67        }
68    }
69}
70
71/// Configuration for the [tonic] OTLP GRPC exporter.
72///
73/// It allows you to
74/// - add additional metadata
75/// - set tls config (via the  `tls` feature)
76/// - specify custom [channel]s
77///
78/// [tonic]: <https://github.com/hyperium/tonic>
79/// [channel]: tonic::transport::Channel
80///
81/// ## Examples
82///
83/// ```no_run
84/// # #[cfg(feature="metrics")]
85/// use opentelemetry_sdk::metrics::Temporality;
86///
87/// # fn main() -> Result<(), Box<dyn std::error::Error>> {
88/// // Create a span exporter you can use to when configuring tracer providers
89/// # #[cfg(feature="trace")]
90/// let span_exporter = opentelemetry_otlp::SpanExporter::builder().with_tonic().build()?;
91///
92/// // Create a metric exporter you can use when configuring meter providers
93/// # #[cfg(feature="metrics")]
94/// let metric_exporter = opentelemetry_otlp::MetricExporter::builder()
95///     .with_tonic()
96///     .with_temporality(Temporality::default())
97///     .build()?;
98///
99/// // Create a log exporter you can use when configuring logger providers
100/// # #[cfg(feature="logs")]
101/// let log_exporter = opentelemetry_otlp::LogExporter::builder().with_tonic().build()?;
102/// # Ok(())
103/// # }
104/// ```
105#[derive(Debug)]
106pub struct TonicExporterBuilder {
107    pub(crate) tonic_config: TonicConfig,
108    pub(crate) exporter_config: ExportConfig,
109}
110
111pub(crate) struct BoxInterceptor(Box<dyn Interceptor + Send + Sync>);
112impl tonic::service::Interceptor for BoxInterceptor {
113    fn call(&mut self, request: tonic::Request<()>) -> Result<tonic::Request<()>, tonic::Status> {
114        self.0.call(request)
115    }
116}
117
118impl Debug for BoxInterceptor {
119    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
120        write!(f, "BoxInterceptor(..)")
121    }
122}
123
124impl Default for TonicExporterBuilder {
125    fn default() -> Self {
126        TonicExporterBuilder {
127            tonic_config: TonicConfig {
128                metadata: Some(MetadataMap::from_headers(
129                    (&default_headers())
130                        .try_into()
131                        .expect("Invalid tonic headers"),
132                )),
133                #[cfg(feature = "tls")]
134                tls_config: None,
135                compression: None,
136                channel: Option::default(),
137                interceptor: Option::default(),
138            },
139            exporter_config: ExportConfig {
140                protocol: crate::Protocol::Grpc,
141                ..Default::default()
142            },
143        }
144    }
145}
146
147impl TonicExporterBuilder {
148    fn build_channel(
149        self,
150        signal_endpoint_var: &str,
151        signal_timeout_var: &str,
152        signal_compression_var: &str,
153        signal_headers_var: &str,
154    ) -> Result<(Channel, BoxInterceptor, Option<CompressionEncoding>), crate::Error> {
155        let compression = self.resolve_compression(signal_compression_var)?;
156
157        let (headers_from_env, headers_for_logging) = parse_headers_from_env(signal_headers_var);
158        let metadata = merge_metadata_with_headers_from_env(
159            self.tonic_config.metadata.unwrap_or_default(),
160            headers_from_env,
161        );
162
163        let add_metadata = move |mut req: tonic::Request<()>| {
164            for key_and_value in metadata.iter() {
165                match key_and_value {
166                    KeyAndValueRef::Ascii(key, value) => {
167                        req.metadata_mut().append(key, value.to_owned())
168                    }
169                    KeyAndValueRef::Binary(key, value) => {
170                        req.metadata_mut().append_bin(key, value.to_owned())
171                    }
172                };
173            }
174
175            Ok(req)
176        };
177
178        let interceptor = match self.tonic_config.interceptor {
179            Some(mut interceptor) => {
180                BoxInterceptor(Box::new(move |req| interceptor.call(add_metadata(req)?)))
181            }
182            None => BoxInterceptor(Box::new(add_metadata)),
183        };
184
185        // If a custom channel was provided, use that channel instead of creating one
186        if let Some(channel) = self.tonic_config.channel {
187            return Ok((channel, interceptor, compression));
188        }
189
190        let config = self.exporter_config;
191
192        let endpoint = Self::resolve_endpoint(signal_endpoint_var, config.endpoint);
193
194        // Used for logging the endpoint
195        let endpoint_clone = endpoint.clone();
196
197        let endpoint = Channel::from_shared(endpoint).map_err(crate::Error::from)?;
198        let timeout = match env::var(signal_timeout_var)
199            .ok()
200            .or(env::var(OTEL_EXPORTER_OTLP_TIMEOUT).ok())
201        {
202            Some(val) => match val.parse() {
203                Ok(seconds) => Duration::from_secs(seconds),
204                Err(_) => config.timeout,
205            },
206            None => config.timeout,
207        };
208
209        #[cfg(feature = "tls")]
210        let channel = match self.tonic_config.tls_config {
211            Some(tls_config) => endpoint
212                .tls_config(tls_config)
213                .map_err(crate::Error::from)?,
214            None => endpoint,
215        }
216        .timeout(timeout)
217        .connect_lazy();
218
219        #[cfg(not(feature = "tls"))]
220        let channel = endpoint.timeout(timeout).connect_lazy();
221
222        otel_debug!(name: "TonicChannelBuilt", endpoint = endpoint_clone, timeout_in_millisecs = timeout.as_millis(), compression = format!("{:?}", compression), headers = format!("{:?}", headers_for_logging));
223        Ok((channel, interceptor, compression))
224    }
225
226    fn resolve_endpoint(default_endpoint_var: &str, provided_endpoint: Option<String>) -> String {
227        // resolving endpoint string
228        // grpc doesn't have a "path" like http(See https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md)
229        // the path of grpc calls are based on the protobuf service definition
230        // so we won't append one for default grpc endpoints
231        // If users for some reason want to use a custom path, they can use env var or builder to pass it
232        match env::var(default_endpoint_var)
233            .ok()
234            .or(env::var(OTEL_EXPORTER_OTLP_ENDPOINT).ok())
235        {
236            Some(val) => val,
237            None => {
238                provided_endpoint.unwrap_or(OTEL_EXPORTER_OTLP_GRPC_ENDPOINT_DEFAULT.to_string())
239            }
240        }
241    }
242
243    fn resolve_compression(
244        &self,
245        env_override: &str,
246    ) -> Result<Option<CompressionEncoding>, crate::Error> {
247        if let Some(compression) = self.tonic_config.compression {
248            Ok(Some(compression.try_into()?))
249        } else if let Ok(compression) = env::var(env_override) {
250            Ok(Some(compression.parse::<Compression>()?.try_into()?))
251        } else if let Ok(compression) = env::var(OTEL_EXPORTER_OTLP_COMPRESSION) {
252            Ok(Some(compression.parse::<Compression>()?.try_into()?))
253        } else {
254            Ok(None)
255        }
256    }
257
258    /// Build a new tonic log exporter
259    #[cfg(feature = "logs")]
260    pub(crate) fn build_log_exporter(
261        self,
262    ) -> Result<crate::logs::LogExporter, opentelemetry_sdk::logs::LogError> {
263        use crate::exporter::tonic::logs::TonicLogsClient;
264
265        otel_debug!(name: "LogsTonicChannelBuilding");
266
267        let (channel, interceptor, compression) = self.build_channel(
268            crate::logs::OTEL_EXPORTER_OTLP_LOGS_ENDPOINT,
269            crate::logs::OTEL_EXPORTER_OTLP_LOGS_TIMEOUT,
270            crate::logs::OTEL_EXPORTER_OTLP_LOGS_COMPRESSION,
271            crate::logs::OTEL_EXPORTER_OTLP_LOGS_HEADERS,
272        )?;
273
274        let client = TonicLogsClient::new(channel, interceptor, compression);
275
276        Ok(crate::logs::LogExporter::from_tonic(client))
277    }
278
279    /// Build a new tonic metrics exporter
280    #[cfg(feature = "metrics")]
281    pub(crate) fn build_metrics_exporter(
282        self,
283        temporality: opentelemetry_sdk::metrics::Temporality,
284    ) -> opentelemetry_sdk::metrics::MetricResult<crate::MetricExporter> {
285        use crate::MetricExporter;
286        use metrics::TonicMetricsClient;
287
288        otel_debug!(name: "MetricsTonicChannelBuilding");
289
290        let (channel, interceptor, compression) = self.build_channel(
291            crate::metric::OTEL_EXPORTER_OTLP_METRICS_ENDPOINT,
292            crate::metric::OTEL_EXPORTER_OTLP_METRICS_TIMEOUT,
293            crate::metric::OTEL_EXPORTER_OTLP_METRICS_COMPRESSION,
294            crate::metric::OTEL_EXPORTER_OTLP_METRICS_HEADERS,
295        )?;
296
297        let client = TonicMetricsClient::new(channel, interceptor, compression);
298
299        Ok(MetricExporter::new(client, temporality))
300    }
301
302    /// Build a new tonic span exporter
303    #[cfg(feature = "trace")]
304    pub(crate) fn build_span_exporter(
305        self,
306    ) -> Result<crate::SpanExporter, opentelemetry::trace::TraceError> {
307        use crate::exporter::tonic::trace::TonicTracesClient;
308
309        otel_debug!(name: "TracesTonicChannelBuilding");
310
311        let (channel, interceptor, compression) = self.build_channel(
312            crate::span::OTEL_EXPORTER_OTLP_TRACES_ENDPOINT,
313            crate::span::OTEL_EXPORTER_OTLP_TRACES_TIMEOUT,
314            crate::span::OTEL_EXPORTER_OTLP_TRACES_COMPRESSION,
315            crate::span::OTEL_EXPORTER_OTLP_TRACES_HEADERS,
316        )?;
317
318        let client = TonicTracesClient::new(channel, interceptor, compression);
319
320        Ok(crate::SpanExporter::new(client))
321    }
322}
323
324fn merge_metadata_with_headers_from_env(
325    metadata: MetadataMap,
326    headers_from_env: HeaderMap,
327) -> MetadataMap {
328    if headers_from_env.is_empty() {
329        metadata
330    } else {
331        let mut existing_headers: HeaderMap = metadata.into_headers();
332        existing_headers.extend(headers_from_env);
333
334        MetadataMap::from_headers(existing_headers)
335    }
336}
337
338fn parse_headers_from_env(signal_headers_var: &str) -> (HeaderMap, Vec<(String, String)>) {
339    let mut headers = Vec::new();
340
341    (
342        env::var(signal_headers_var)
343            .or_else(|_| env::var(OTEL_EXPORTER_OTLP_HEADERS))
344            .map(|input| {
345                parse_header_string(&input)
346                    .filter_map(|(key, value)| {
347                        headers.push((key.to_owned(), value.clone()));
348                        Some((
349                            HeaderName::from_str(key).ok()?,
350                            HeaderValue::from_str(&value).ok()?,
351                        ))
352                    })
353                    .collect::<HeaderMap>()
354            })
355            .unwrap_or_default(),
356        headers,
357    )
358}
359
360/// Expose interface for modifying [TonicConfig] fields within the exporter builders.
361pub trait HasTonicConfig {
362    /// Return a mutable reference to the export config within the exporter builders.
363    fn tonic_config(&mut self) -> &mut TonicConfig;
364}
365
366/// Expose interface for modifying [TonicConfig] fields within the [TonicExporterBuilder].
367impl HasTonicConfig for TonicExporterBuilder {
368    fn tonic_config(&mut self) -> &mut TonicConfig {
369        &mut self.tonic_config
370    }
371}
372
373/// Expose methods to override [TonicConfig].
374///
375/// This trait will be implemented for every struct that implemented [`HasTonicConfig`] trait.
376///
377/// ## Examples
378/// ```
379/// # #[cfg(all(feature = "trace", feature = "grpc-tonic"))]
380/// # {
381/// use crate::opentelemetry_otlp::{WithExportConfig, WithTonicConfig};
382/// let exporter_builder = opentelemetry_otlp::SpanExporter::builder()
383///     .with_tonic()
384///     .with_compression(opentelemetry_otlp::Compression::Gzip);
385/// # }
386/// ```
387pub trait WithTonicConfig {
388    /// Set the TLS settings for the collector endpoint.
389    #[cfg(feature = "tls")]
390    fn with_tls_config(self, tls_config: ClientTlsConfig) -> Self;
391
392    /// Set custom metadata entries to send to the collector.
393    fn with_metadata(self, metadata: MetadataMap) -> Self;
394
395    /// Set the compression algorithm to use when communicating with the collector.
396    fn with_compression(self, compression: Compression) -> Self;
397
398    /// Use `channel` as tonic's transport channel.
399    /// this will override tls config and should only be used
400    /// when working with non-HTTP transports.
401    ///
402    /// Users MUST make sure the [`ExportConfig::timeout`] is
403    /// the same as the channel's timeout.
404    fn with_channel(self, channel: tonic::transport::Channel) -> Self;
405
406    /// Use a custom `interceptor` to modify each outbound request.
407    /// this can be used to modify the grpc metadata, for example
408    /// to inject auth tokens.
409    fn with_interceptor<I>(self, interceptor: I) -> Self
410    where
411        I: tonic::service::Interceptor + Clone + Send + Sync + 'static;
412}
413
414impl<B: HasTonicConfig> WithTonicConfig for B {
415    #[cfg(feature = "tls")]
416    fn with_tls_config(mut self, tls_config: ClientTlsConfig) -> Self {
417        self.tonic_config().tls_config = Some(tls_config);
418        self
419    }
420
421    /// Set custom metadata entries to send to the collector.
422    fn with_metadata(mut self, metadata: MetadataMap) -> Self {
423        // extending metadata maps is harder than just casting back/forth
424        let mut existing_headers = self
425            .tonic_config()
426            .metadata
427            .clone()
428            .unwrap_or_default()
429            .into_headers();
430        existing_headers.extend(metadata.into_headers());
431
432        self.tonic_config().metadata = Some(MetadataMap::from_headers(existing_headers));
433        self
434    }
435
436    fn with_compression(mut self, compression: Compression) -> Self {
437        self.tonic_config().compression = Some(compression);
438        self
439    }
440
441    fn with_channel(mut self, channel: tonic::transport::Channel) -> Self {
442        self.tonic_config().channel = Some(channel);
443        self
444    }
445
446    fn with_interceptor<I>(mut self, interceptor: I) -> Self
447    where
448        I: tonic::service::Interceptor + Clone + Send + Sync + 'static,
449    {
450        self.tonic_config().interceptor = Some(BoxInterceptor(Box::new(interceptor)));
451        self
452    }
453}
454
#[cfg(test)]
mod tests {
    use crate::exporter::tests::run_env_test;
    use crate::exporter::tonic::WithTonicConfig;
    #[cfg(feature = "grpc-tonic")]
    use crate::exporter::Compression;
    use crate::{TonicExporterBuilder, WithExportConfig, OTEL_EXPORTER_OTLP_TRACES_ENDPOINT};
    use crate::{OTEL_EXPORTER_OTLP_HEADERS, OTEL_EXPORTER_OTLP_TRACES_HEADERS};
    use http::{HeaderMap, HeaderName, HeaderValue};
    use tonic::metadata::{MetadataMap, MetadataValue};

    /// `with_metadata` merges into the default metadata and overrides matching keys.
    #[test]
    fn test_with_metadata() {
        // metadata should merge with the current one with priority instead of just replacing it
        let mut metadata = MetadataMap::new();
        metadata.insert("foo", "bar".parse().unwrap());
        let builder = TonicExporterBuilder::default().with_metadata(metadata);
        let result = builder.tonic_config.metadata.unwrap();
        let foo = result
            .get("foo")
            .expect("there to always be an entry for foo");
        assert_eq!(foo, &MetadataValue::try_from("bar").unwrap());
        assert!(result.get("User-Agent").is_some());

        // metadata should override entries with the same key in the default one
        let mut metadata = MetadataMap::new();
        metadata.insert("user-agent", "baz".parse().unwrap());
        let builder = TonicExporterBuilder::default().with_metadata(metadata);
        let result = builder.tonic_config.metadata.unwrap();
        assert_eq!(
            result.get("User-Agent").unwrap(),
            &MetadataValue::try_from("baz").unwrap()
        );
        assert_eq!(
            result.len(),
            TonicExporterBuilder::default()
                .tonic_config
                .metadata
                .unwrap()
                .len()
        );
    }

    /// `with_compression` stores the gzip setting on the builder.
    #[test]
    #[cfg(feature = "gzip-tonic")]
    fn test_with_gzip_compression() {
        let builder = TonicExporterBuilder::default().with_compression(Compression::Gzip);
        assert_eq!(builder.tonic_config.compression.unwrap(), Compression::Gzip);
    }

    /// `with_compression` stores the zstd setting on the builder.
    #[test]
    #[cfg(feature = "zstd-tonic")]
    fn test_with_zstd_compression() {
        let builder = TonicExporterBuilder::default().with_compression(Compression::Zstd);
        assert_eq!(builder.tonic_config.compression.unwrap(), Compression::Zstd);
    }

    /// Conversion to tonic's encoding succeeds exactly when the matching feature is on.
    #[test]
    fn test_convert_compression() {
        #[cfg(feature = "gzip-tonic")]
        assert!(tonic::codec::CompressionEncoding::try_from(Compression::Gzip).is_ok());
        #[cfg(not(feature = "gzip-tonic"))]
        assert!(tonic::codec::CompressionEncoding::try_from(Compression::Gzip).is_err());
        #[cfg(feature = "zstd-tonic")]
        assert!(tonic::codec::CompressionEncoding::try_from(Compression::Zstd).is_ok());
        #[cfg(not(feature = "zstd-tonic"))]
        assert!(tonic::codec::CompressionEncoding::try_from(Compression::Zstd).is_err());
    }

    /// The signal-specific headers variable wins; the generic one is the fallback.
    #[test]
    fn test_parse_headers_from_env() {
        run_env_test(
            vec![
                (OTEL_EXPORTER_OTLP_TRACES_HEADERS, "k1=v1,k2=v2"),
                (OTEL_EXPORTER_OTLP_HEADERS, "k3=v3"),
            ],
            || {
                assert_eq!(
                    super::parse_headers_from_env(OTEL_EXPORTER_OTLP_TRACES_HEADERS).0,
                    HeaderMap::from_iter([
                        (
                            HeaderName::from_static("k1"),
                            HeaderValue::from_static("v1")
                        ),
                        (
                            HeaderName::from_static("k2"),
                            HeaderValue::from_static("v2")
                        ),
                    ])
                );

                assert_eq!(
                    super::parse_headers_from_env("EMPTY_ENV").0,
                    HeaderMap::from_iter([(
                        HeaderName::from_static("k3"),
                        HeaderValue::from_static("v3")
                    )])
                );
            },
        )
    }

    /// Environment headers are merged into the metadata and override matching keys.
    #[test]
    fn test_merge_metadata_with_headers_from_env() {
        run_env_test(
            vec![(OTEL_EXPORTER_OTLP_TRACES_HEADERS, "k1=v1,k2=v2")],
            || {
                let headers_from_env =
                    super::parse_headers_from_env(OTEL_EXPORTER_OTLP_TRACES_HEADERS);

                let mut metadata = MetadataMap::new();
                metadata.insert("foo", "bar".parse().unwrap());
                metadata.insert("k1", "v0".parse().unwrap());

                let result =
                    super::merge_metadata_with_headers_from_env(metadata, headers_from_env.0);

                assert_eq!(
                    result.get("foo").unwrap(),
                    MetadataValue::from_static("bar")
                );
                assert_eq!(result.get("k1").unwrap(), MetadataValue::from_static("v1"));
                assert_eq!(result.get("k2").unwrap(), MetadataValue::from_static("v2"));
            },
        );
    }

    /// Endpoint resolution without any environment variables set.
    #[test]
    fn test_tonic_exporter_endpoint() {
        // default endpoint for grpc should not add signal path.
        run_env_test(vec![], || {
            let exporter = TonicExporterBuilder::default();

            let url = TonicExporterBuilder::resolve_endpoint(
                OTEL_EXPORTER_OTLP_TRACES_ENDPOINT,
                exporter.exporter_config.endpoint,
            );

            assert_eq!(url, "http://localhost:4317");
        });

        // if builder endpoint is set, it should not use default.
        run_env_test(vec![], || {
            let exporter = TonicExporterBuilder::default().with_endpoint("http://localhost:1234");

            let url = TonicExporterBuilder::resolve_endpoint(
                OTEL_EXPORTER_OTLP_TRACES_ENDPOINT,
                exporter.exporter_config.endpoint,
            );

            assert_eq!(url, "http://localhost:1234");
        });
    }
}