1use super::{
2 default_headers, default_protocol, parse_header_string,
3 OTEL_EXPORTER_OTLP_HTTP_ENDPOINT_DEFAULT,
4};
5use crate::{
6 ExportConfig, Protocol, OTEL_EXPORTER_OTLP_ENDPOINT, OTEL_EXPORTER_OTLP_HEADERS,
7 OTEL_EXPORTER_OTLP_TIMEOUT,
8};
9use http::{HeaderName, HeaderValue, Uri};
10use opentelemetry_http::HttpClient;
11use opentelemetry_proto::transform::common::tonic::ResourceAttributesWithSchema;
12#[cfg(feature = "logs")]
13use opentelemetry_proto::transform::logs::tonic::group_logs_by_resource_and_scope;
14#[cfg(feature = "trace")]
15use opentelemetry_proto::transform::trace::tonic::group_spans_by_resource_and_scope;
16#[cfg(feature = "logs")]
17use opentelemetry_sdk::logs::LogBatch;
18#[cfg(feature = "trace")]
19use opentelemetry_sdk::trace::SpanData;
20use prost::Message;
21use std::collections::HashMap;
22use std::env;
23use std::str::FromStr;
24use std::sync::{Arc, Mutex};
25use std::time::Duration;
26
27#[cfg(feature = "metrics")]
28mod metrics;
29
30#[cfg(feature = "metrics")]
31use opentelemetry_sdk::metrics::data::ResourceMetrics;
32
33#[cfg(feature = "logs")]
34pub(crate) mod logs;
35
36#[cfg(feature = "trace")]
37mod trace;
38
39#[cfg(all(
40 not(feature = "reqwest-client"),
41 not(feature = "reqwest-blocking-client"),
42 feature = "hyper-client"
43))]
44use opentelemetry_http::hyper::HyperClient;
45
46#[derive(Debug, Default)]
48pub struct HttpConfig {
49 client: Option<Arc<dyn HttpClient>>,
51
52 headers: Option<HashMap<String, String>>,
54}
55
56#[derive(Debug)]
84pub struct HttpExporterBuilder {
85 pub(crate) exporter_config: ExportConfig,
86 pub(crate) http_config: HttpConfig,
87}
88
89impl Default for HttpExporterBuilder {
90 fn default() -> Self {
91 HttpExporterBuilder {
92 exporter_config: ExportConfig {
93 protocol: default_protocol(),
94 ..ExportConfig::default()
95 },
96 http_config: HttpConfig {
97 headers: Some(default_headers()),
98 ..HttpConfig::default()
99 },
100 }
101 }
102}
103
104impl HttpExporterBuilder {
105 fn build_client(
106 &mut self,
107 signal_endpoint_var: &str,
108 signal_endpoint_path: &str,
109 signal_timeout_var: &str,
110 signal_http_headers_var: &str,
111 ) -> Result<OtlpHttpClient, crate::Error> {
112 let endpoint = resolve_http_endpoint(
113 signal_endpoint_var,
114 signal_endpoint_path,
115 self.exporter_config.endpoint.clone(),
116 )?;
117
118 let timeout = match env::var(signal_timeout_var)
119 .ok()
120 .or(env::var(OTEL_EXPORTER_OTLP_TIMEOUT).ok())
121 {
122 Some(val) => match val.parse() {
123 Ok(seconds) => Duration::from_secs(seconds),
124 Err(_) => self.exporter_config.timeout,
125 },
126 None => self.exporter_config.timeout,
127 };
128
129 #[allow(unused_mut)] let mut http_client = self.http_config.client.take();
131
132 if http_client.is_none() {
133 #[cfg(all(
134 not(feature = "reqwest-client"),
135 not(feature = "reqwest-blocking-client"),
136 feature = "hyper-client"
137 ))]
138 {
139 http_client = Some(Arc::new(HyperClient::with_default_connector(timeout, None))
141 as Arc<dyn HttpClient>);
142 }
143 #[cfg(all(
144 not(feature = "hyper-client"),
145 not(feature = "reqwest-blocking-client"),
146 feature = "reqwest-client"
147 ))]
148 {
149 http_client = Some(Arc::new(
150 reqwest::Client::builder()
151 .timeout(timeout)
152 .build()
153 .unwrap_or_default(),
154 ) as Arc<dyn HttpClient>);
155 }
156 #[cfg(all(
157 not(feature = "hyper-client"),
158 not(feature = "reqwest-client"),
159 feature = "reqwest-blocking-client"
160 ))]
161 {
162 let timeout_clone = timeout;
163 http_client = Some(Arc::new(
164 std::thread::spawn(move || {
165 reqwest::blocking::Client::builder()
166 .timeout(timeout_clone)
167 .build()
168 .unwrap_or_else(|_| reqwest::blocking::Client::new())
169 })
170 .join()
171 .unwrap(), ) as Arc<dyn HttpClient>);
173 }
174 }
175
176 let http_client = http_client.ok_or(crate::Error::NoHttpClient)?;
177
178 #[allow(clippy::mutable_key_type)] let mut headers: HashMap<HeaderName, HeaderValue> = self
180 .http_config
181 .headers
182 .take()
183 .unwrap_or_default()
184 .into_iter()
185 .filter_map(|(k, v)| {
186 Some((
187 HeaderName::from_str(&k).ok()?,
188 HeaderValue::from_str(&v).ok()?,
189 ))
190 })
191 .collect();
192
193 if let Ok(input) =
195 env::var(signal_http_headers_var).or_else(|_| env::var(OTEL_EXPORTER_OTLP_HEADERS))
196 {
197 add_header_from_string(&input, &mut headers);
198 }
199
200 Ok(OtlpHttpClient::new(
201 http_client,
202 endpoint,
203 headers,
204 self.exporter_config.protocol,
205 timeout,
206 ))
207 }
208
209 #[cfg(feature = "trace")]
211 pub fn build_span_exporter(
212 mut self,
213 ) -> Result<crate::SpanExporter, opentelemetry::trace::TraceError> {
214 use crate::{
215 OTEL_EXPORTER_OTLP_TRACES_ENDPOINT, OTEL_EXPORTER_OTLP_TRACES_HEADERS,
216 OTEL_EXPORTER_OTLP_TRACES_TIMEOUT,
217 };
218
219 let client = self.build_client(
220 OTEL_EXPORTER_OTLP_TRACES_ENDPOINT,
221 "/v1/traces",
222 OTEL_EXPORTER_OTLP_TRACES_TIMEOUT,
223 OTEL_EXPORTER_OTLP_TRACES_HEADERS,
224 )?;
225
226 Ok(crate::SpanExporter::new(client))
227 }
228
229 #[cfg(feature = "logs")]
231 pub fn build_log_exporter(mut self) -> opentelemetry_sdk::logs::LogResult<crate::LogExporter> {
232 use crate::{
233 OTEL_EXPORTER_OTLP_LOGS_ENDPOINT, OTEL_EXPORTER_OTLP_LOGS_HEADERS,
234 OTEL_EXPORTER_OTLP_LOGS_TIMEOUT,
235 };
236
237 let client = self.build_client(
238 OTEL_EXPORTER_OTLP_LOGS_ENDPOINT,
239 "/v1/logs",
240 OTEL_EXPORTER_OTLP_LOGS_TIMEOUT,
241 OTEL_EXPORTER_OTLP_LOGS_HEADERS,
242 )?;
243
244 Ok(crate::LogExporter::from_http(client))
245 }
246
247 #[cfg(feature = "metrics")]
249 pub fn build_metrics_exporter(
250 mut self,
251 temporality: opentelemetry_sdk::metrics::Temporality,
252 ) -> opentelemetry_sdk::metrics::MetricResult<crate::MetricExporter> {
253 use crate::{
254 OTEL_EXPORTER_OTLP_METRICS_ENDPOINT, OTEL_EXPORTER_OTLP_METRICS_HEADERS,
255 OTEL_EXPORTER_OTLP_METRICS_TIMEOUT,
256 };
257
258 let client = self.build_client(
259 OTEL_EXPORTER_OTLP_METRICS_ENDPOINT,
260 "/v1/metrics",
261 OTEL_EXPORTER_OTLP_METRICS_TIMEOUT,
262 OTEL_EXPORTER_OTLP_METRICS_HEADERS,
263 )?;
264
265 Ok(crate::MetricExporter::new(client, temporality))
266 }
267}
268
269#[derive(Debug)]
270pub(crate) struct OtlpHttpClient {
271 client: Mutex<Option<Arc<dyn HttpClient>>>,
272 collector_endpoint: Uri,
273 headers: HashMap<HeaderName, HeaderValue>,
274 protocol: Protocol,
275 _timeout: Duration,
276 #[allow(dead_code)]
277 resource: opentelemetry_proto::transform::common::tonic::ResourceAttributesWithSchema,
279}
280
281impl OtlpHttpClient {
282 #[allow(clippy::mutable_key_type)] fn new(
284 client: Arc<dyn HttpClient>,
285 collector_endpoint: Uri,
286 headers: HashMap<HeaderName, HeaderValue>,
287 protocol: Protocol,
288 timeout: Duration,
289 ) -> Self {
290 OtlpHttpClient {
291 client: Mutex::new(Some(client)),
292 collector_endpoint,
293 headers,
294 protocol,
295 _timeout: timeout,
296 resource: ResourceAttributesWithSchema::default(),
297 }
298 }
299
300 #[cfg(feature = "trace")]
301 fn build_trace_export_body(
302 &self,
303 spans: Vec<SpanData>,
304 ) -> opentelemetry::trace::TraceResult<(Vec<u8>, &'static str)> {
305 use opentelemetry_proto::tonic::collector::trace::v1::ExportTraceServiceRequest;
306 let resource_spans = group_spans_by_resource_and_scope(spans, &self.resource);
307
308 let req = ExportTraceServiceRequest { resource_spans };
309 match self.protocol {
310 #[cfg(feature = "http-json")]
311 Protocol::HttpJson => match serde_json::to_string_pretty(&req) {
312 Ok(json) => Ok((json.into(), "application/json")),
313 Err(e) => Err(opentelemetry::trace::TraceError::from(e.to_string())),
314 },
315 _ => Ok((req.encode_to_vec(), "application/x-protobuf")),
316 }
317 }
318
319 #[cfg(feature = "logs")]
320 fn build_logs_export_body(
321 &self,
322 logs: LogBatch<'_>,
323 ) -> opentelemetry_sdk::logs::LogResult<(Vec<u8>, &'static str)> {
324 use opentelemetry_proto::tonic::collector::logs::v1::ExportLogsServiceRequest;
325 let resource_logs = group_logs_by_resource_and_scope(logs, &self.resource);
326 let req = ExportLogsServiceRequest { resource_logs };
327
328 match self.protocol {
329 #[cfg(feature = "http-json")]
330 Protocol::HttpJson => match serde_json::to_string_pretty(&req) {
331 Ok(json) => Ok((json.into(), "application/json")),
332 Err(e) => Err(opentelemetry_sdk::logs::LogError::from(e.to_string())),
333 },
334 _ => Ok((req.encode_to_vec(), "application/x-protobuf")),
335 }
336 }
337
338 #[cfg(feature = "metrics")]
339 fn build_metrics_export_body(
340 &self,
341 metrics: &mut ResourceMetrics,
342 ) -> opentelemetry_sdk::metrics::MetricResult<(Vec<u8>, &'static str)> {
343 use opentelemetry_proto::tonic::collector::metrics::v1::ExportMetricsServiceRequest;
344
345 let req: ExportMetricsServiceRequest = (&*metrics).into();
346
347 match self.protocol {
348 #[cfg(feature = "http-json")]
349 Protocol::HttpJson => match serde_json::to_string_pretty(&req) {
350 Ok(json) => Ok((json.into(), "application/json")),
351 Err(e) => Err(opentelemetry_sdk::metrics::MetricError::Other(
352 e.to_string(),
353 )),
354 },
355 _ => Ok((req.encode_to_vec(), "application/x-protobuf")),
356 }
357 }
358}
359
360fn build_endpoint_uri(endpoint: &str, path: &str) -> Result<Uri, crate::Error> {
361 let path = if endpoint.ends_with('/') && path.starts_with('/') {
362 path.strip_prefix('/').unwrap()
363 } else {
364 path
365 };
366 format!("{endpoint}{path}").parse().map_err(From::from)
367}
368
369fn resolve_http_endpoint(
371 signal_endpoint_var: &str,
372 signal_endpoint_path: &str,
373 provided_endpoint: Option<String>,
374) -> Result<Uri, crate::Error> {
375 if let Some(endpoint) = env::var(signal_endpoint_var)
377 .ok()
378 .and_then(|s| s.parse().ok())
379 {
380 return Ok(endpoint);
381 }
382
383 if let Some(endpoint) = env::var(OTEL_EXPORTER_OTLP_ENDPOINT)
385 .ok()
386 .and_then(|s| build_endpoint_uri(&s, signal_endpoint_path).ok())
387 {
388 return Ok(endpoint);
389 }
390
391 provided_endpoint
392 .map(|e| e.parse().map_err(From::from))
393 .unwrap_or_else(|| {
394 build_endpoint_uri(
395 OTEL_EXPORTER_OTLP_HTTP_ENDPOINT_DEFAULT,
396 signal_endpoint_path,
397 )
398 })
399}
400
401#[allow(clippy::mutable_key_type)] fn add_header_from_string(input: &str, headers: &mut HashMap<HeaderName, HeaderValue>) {
403 headers.extend(parse_header_string(input).filter_map(|(key, value)| {
404 Some((
405 HeaderName::from_str(key).ok()?,
406 HeaderValue::from_str(&value).ok()?,
407 ))
408 }));
409}
410
411pub trait HasHttpConfig {
413 fn http_client_config(&mut self) -> &mut HttpConfig;
415}
416
417impl HasHttpConfig for HttpExporterBuilder {
419 fn http_client_config(&mut self) -> &mut HttpConfig {
420 &mut self.http_config
421 }
422}
423
424pub trait WithHttpConfig {
437 fn with_http_client<T: HttpClient + 'static>(self, client: T) -> Self;
439
440 fn with_headers(self, headers: HashMap<String, String>) -> Self;
442}
443
444impl<B: HasHttpConfig> WithHttpConfig for B {
445 fn with_http_client<T: HttpClient + 'static>(mut self, client: T) -> Self {
446 self.http_client_config().client = Some(Arc::new(client));
447 self
448 }
449
450 fn with_headers(mut self, headers: HashMap<String, String>) -> Self {
451 self.http_client_config()
453 .headers
454 .iter_mut()
455 .zip(headers)
456 .for_each(|(http_client_headers, (key, value))| {
457 http_client_headers.insert(key, super::url_decode(&value).unwrap_or(value));
458 });
459 self
460 }
461}
462
463#[cfg(test)]
464mod tests {
465 use crate::exporter::http::HttpConfig;
466 use crate::exporter::tests::run_env_test;
467 use crate::{
468 HttpExporterBuilder, WithExportConfig, WithHttpConfig, OTEL_EXPORTER_OTLP_ENDPOINT,
469 OTEL_EXPORTER_OTLP_TRACES_ENDPOINT,
470 };
471
472 use super::{build_endpoint_uri, resolve_http_endpoint};
473
474 #[test]
475 fn test_append_signal_path_to_generic_env() {
476 run_env_test(
477 vec![(OTEL_EXPORTER_OTLP_ENDPOINT, "http://example.com")],
478 || {
479 let endpoint = resolve_http_endpoint(
480 OTEL_EXPORTER_OTLP_TRACES_ENDPOINT,
481 "/v1/traces",
482 Some("http://localhost:4317".to_string()),
483 )
484 .unwrap();
485 assert_eq!(endpoint, "http://example.com/v1/traces");
486 },
487 )
488 }
489
490 #[test]
491 fn test_not_append_signal_path_to_signal_env() {
492 run_env_test(
493 vec![(OTEL_EXPORTER_OTLP_TRACES_ENDPOINT, "http://example.com")],
494 || {
495 let endpoint = super::resolve_http_endpoint(
496 OTEL_EXPORTER_OTLP_TRACES_ENDPOINT,
497 "/v1/traces",
498 Some("http://localhost:4317".to_string()),
499 )
500 .unwrap();
501 assert_eq!(endpoint, "http://example.com");
502 },
503 )
504 }
505
506 #[test]
507 fn test_priority_of_signal_env_over_generic_env() {
508 run_env_test(
509 vec![
510 (OTEL_EXPORTER_OTLP_TRACES_ENDPOINT, "http://example.com"),
511 (OTEL_EXPORTER_OTLP_ENDPOINT, "http://wrong.com"),
512 ],
513 || {
514 let endpoint = super::resolve_http_endpoint(
515 OTEL_EXPORTER_OTLP_TRACES_ENDPOINT,
516 "/v1/traces",
517 Some("http://localhost:4317".to_string()),
518 )
519 .unwrap();
520 assert_eq!(endpoint, "http://example.com");
521 },
522 );
523 }
524
525 #[test]
526 fn test_use_provided_or_default_when_others_missing() {
527 run_env_test(vec![], || {
528 let endpoint = super::resolve_http_endpoint(
529 "NON_EXISTENT_VAR",
530 "/v1/traces",
531 Some("http://localhost:4317".to_string()),
532 )
533 .unwrap();
534 assert_eq!(endpoint, "http://localhost:4317/");
535 });
536 }
537
538 #[test]
539 fn test_build_endpoint_uri() {
540 let uri = build_endpoint_uri("https://example.com", "/v1/traces").unwrap();
541 assert_eq!(uri, "https://example.com/v1/traces");
542
543 let uri = build_endpoint_uri("https://example.com/", "/v1/traces").unwrap();
545 assert_eq!(uri, "https://example.com/v1/traces");
546
547 let uri = build_endpoint_uri("https://example.com/additional/path/", "/v1/traces").unwrap();
549 assert_eq!(uri, "https://example.com/additional/path/v1/traces");
550 }
551
552 #[test]
553 fn test_invalid_uri_in_signal_env_falls_back_to_generic_env() {
554 run_env_test(
555 vec![
556 (
557 OTEL_EXPORTER_OTLP_TRACES_ENDPOINT,
558 "-*/*-/*-//-/-/invalid-uri",
559 ),
560 (OTEL_EXPORTER_OTLP_ENDPOINT, "http://example.com"),
561 ],
562 || {
563 let endpoint = super::resolve_http_endpoint(
564 OTEL_EXPORTER_OTLP_TRACES_ENDPOINT,
565 "/v1/traces",
566 Some("http://localhost:4317".to_string()),
567 )
568 .unwrap();
569 assert_eq!(endpoint, "http://example.com/v1/traces");
570 },
571 );
572 }
573
574 #[test]
575 fn test_all_invalid_urls_falls_back_to_error() {
576 run_env_test(vec![], || {
577 let result = super::resolve_http_endpoint(
578 OTEL_EXPORTER_OTLP_TRACES_ENDPOINT,
579 "/v1/traces",
580 Some("-*/*-/*-//-/-/yet-another-invalid-uri".to_string()),
581 );
582 assert!(result.is_err());
583 });
585 }
586
587 #[test]
588 fn test_add_header_from_string() {
589 use http::{HeaderName, HeaderValue};
590 use std::collections::HashMap;
591 let test_cases = vec![
592 ("k1=v1", vec![("k1", "v1")]),
594 ("k1=v1,k2=v2", vec![("k1", "v1"), ("k2", "v2")]),
595 ("k1=v1=10,k2,k3", vec![("k1", "v1=10")]),
596 ("k1=v1,,,k2,k3=10", vec![("k1", "v1"), ("k3", "10")]),
597 ];
598
599 for (input_str, expected_headers) in test_cases {
600 #[allow(clippy::mutable_key_type)] let mut headers: HashMap<HeaderName, HeaderValue> = HashMap::new();
602 super::add_header_from_string(input_str, &mut headers);
603
604 assert_eq!(
605 headers.len(),
606 expected_headers.len(),
607 "Failed on input: {}",
608 input_str
609 );
610
611 for (expected_key, expected_value) in expected_headers {
612 assert_eq!(
613 headers.get(&HeaderName::from_static(expected_key)),
614 Some(&HeaderValue::from_static(expected_value)),
615 "Failed on key: {} with input: {}",
616 expected_key,
617 input_str
618 );
619 }
620 }
621 }
622
623 #[test]
624 fn test_merge_header_from_string() {
625 use http::{HeaderName, HeaderValue};
626 use std::collections::HashMap;
627 #[allow(clippy::mutable_key_type)] let mut headers: HashMap<HeaderName, HeaderValue> = std::collections::HashMap::new();
629 headers.insert(
630 HeaderName::from_static("k1"),
631 HeaderValue::from_static("v1"),
632 );
633 headers.insert(
634 HeaderName::from_static("k2"),
635 HeaderValue::from_static("v2"),
636 );
637 let test_cases = vec![
638 ("k1=v1_new", vec![("k1", "v1_new"), ("k2", "v2")]),
640 (
641 "k3=val=10,22,34,k4=,k5=10",
642 vec![
643 ("k1", "v1_new"),
644 ("k2", "v2"),
645 ("k3", "val=10"),
646 ("k5", "10"),
647 ],
648 ),
649 ];
650
651 for (input_str, expected_headers) in test_cases {
652 super::add_header_from_string(input_str, &mut headers);
653
654 assert_eq!(
655 headers.len(),
656 expected_headers.len(),
657 "Failed on input: {}",
658 input_str
659 );
660
661 for (expected_key, expected_value) in expected_headers {
662 assert_eq!(
663 headers.get(&HeaderName::from_static(expected_key)),
664 Some(&HeaderValue::from_static(expected_value)),
665 "Failed on key: {} with input: {}",
666 expected_key,
667 input_str
668 );
669 }
670 }
671 }
672
673 #[test]
674 fn test_http_exporter_builder_with_header() {
675 use std::collections::HashMap;
676 let initial_headers = HashMap::from([("k1".to_string(), "v1".to_string())]);
678 let extra_headers = HashMap::from([("k2".to_string(), "v2".to_string())]);
679 let expected_headers = initial_headers.iter().chain(extra_headers.iter()).fold(
680 HashMap::new(),
681 |mut acc, (k, v)| {
682 acc.insert(k.clone(), v.clone());
683 acc
684 },
685 );
686 let builder = HttpExporterBuilder {
687 http_config: HttpConfig {
688 client: None,
689 headers: Some(initial_headers),
690 },
691 exporter_config: crate::ExportConfig::default(),
692 };
693
694 let builder = builder.with_headers(extra_headers);
696
697 assert_eq!(
699 builder
700 .http_config
701 .headers
702 .clone()
703 .expect("headers should always be Some"),
704 expected_headers,
705 );
706 }
707
708 #[test]
709 fn test_http_exporter_endpoint() {
710 run_env_test(vec![], || {
712 let exporter = HttpExporterBuilder::default();
713
714 let url = resolve_http_endpoint(
715 OTEL_EXPORTER_OTLP_TRACES_ENDPOINT,
716 "/v1/traces",
717 exporter.exporter_config.endpoint,
718 )
719 .unwrap();
720
721 assert_eq!(url, "http://localhost:4318/v1/traces");
722 });
723
724 run_env_test(vec![], || {
726 let exporter = HttpExporterBuilder::default()
727 .with_endpoint("http://localhost:4318/v1/tracesbutnotreally");
728
729 let url = resolve_http_endpoint(
730 OTEL_EXPORTER_OTLP_TRACES_ENDPOINT,
731 "/v1/traces",
732 exporter.exporter_config.endpoint,
733 )
734 .unwrap();
735
736 assert_eq!(url, "http://localhost:4318/v1/tracesbutnotreally");
737 });
738 }
739}