1use crate::error::{OTelSdkError, OTelSdkResult};
38use crate::resource::Resource;
39use crate::trace::Span;
40use crate::trace::{SpanData, SpanExporter};
41use opentelemetry::Context;
42use opentelemetry::{otel_debug, otel_warn};
43use opentelemetry::{otel_error, otel_info};
44use std::cmp::min;
45use std::sync::atomic::{AtomicUsize, Ordering};
46use std::sync::{Arc, Mutex};
47use std::{env, str::FromStr, time::Duration};
48
49use std::sync::atomic::AtomicBool;
50use std::thread;
51use std::time::Instant;
52
/// Name of the environment variable that overrides the delay between two
/// consecutive exports. The value is parsed as a `u64` number of milliseconds.
pub(crate) const OTEL_BSP_SCHEDULE_DELAY: &str = "OTEL_BSP_SCHEDULE_DELAY";
/// Schedule delay, in milliseconds, used when the environment variable is
/// absent or cannot be parsed.
pub(crate) const OTEL_BSP_SCHEDULE_DELAY_DEFAULT: u64 = 5_000;
/// Name of the environment variable that overrides the maximum queue size.
/// The value is parsed as a `usize`.
pub(crate) const OTEL_BSP_MAX_QUEUE_SIZE: &str = "OTEL_BSP_MAX_QUEUE_SIZE";
/// Maximum queue size used when the environment variable is absent or cannot
/// be parsed.
pub(crate) const OTEL_BSP_MAX_QUEUE_SIZE_DEFAULT: usize = 2_048;
/// Name of the environment variable that overrides the maximum export batch
/// size. The value is parsed as a `usize` and clamped to the queue size.
pub(crate) const OTEL_BSP_MAX_EXPORT_BATCH_SIZE: &str = "OTEL_BSP_MAX_EXPORT_BATCH_SIZE";
/// Maximum export batch size used when the environment variable is absent or
/// cannot be parsed.
pub(crate) const OTEL_BSP_MAX_EXPORT_BATCH_SIZE_DEFAULT: usize = 512;
/// Name of the environment variable that overrides the export timeout. The
/// value is parsed as a `u64` number of milliseconds.
pub(crate) const OTEL_BSP_EXPORT_TIMEOUT: &str = "OTEL_BSP_EXPORT_TIMEOUT";
/// Export timeout, in milliseconds, used when the environment variable is
/// absent or cannot be parsed.
pub(crate) const OTEL_BSP_EXPORT_TIMEOUT_DEFAULT: u64 = 30_000;
/// Name of the environment variable that overrides the maximum number of
/// concurrent exports. The value is parsed as a `usize`.
pub(crate) const OTEL_BSP_MAX_CONCURRENT_EXPORTS: &str = "OTEL_BSP_MAX_CONCURRENT_EXPORTS";
/// Maximum number of concurrent exports used when the environment variable is
/// absent or cannot be parsed.
pub(crate) const OTEL_BSP_MAX_CONCURRENT_EXPORTS_DEFAULT: usize = 1;
74
/// Interface shared by the span processors defined in this module.
///
/// Implementors must be `Send + Sync` and implement `Debug`. Every method
/// except [`SpanProcessor::set_resource`] takes `&self`, so implementations
/// that need mutation use interior mutability (mutexes, atomics, channels).
pub trait SpanProcessor: Send + Sync + std::fmt::Debug {
    /// Hook that receives a mutable span together with a [`Context`].
    ///
    /// NOTE(review): presumably invoked when a span starts; the call site is
    /// not part of this file. Both processors in this file implement it as a
    /// no-op.
    fn on_start(&self, span: &mut Span, cx: &Context);
    /// Hook that receives the data of a span by value.
    ///
    /// NOTE(review): presumably invoked when a span ends; the call site is not
    /// part of this file.
    fn on_end(&self, span: SpanData);
    /// Asks the processor to export whatever it has buffered and reports the
    /// outcome.
    fn force_flush(&self) -> OTelSdkResult;
    /// Shuts the processor down and reports the outcome.
    fn shutdown(&self) -> OTelSdkResult;
    /// Hands a [`Resource`] to the processor. The default implementation
    /// ignores it.
    fn set_resource(&mut self, _resource: &Resource) {}
}
98
99#[derive(Debug)]
115pub struct SimpleSpanProcessor {
116 exporter: Mutex<Box<dyn SpanExporter>>,
117}
118
119impl SimpleSpanProcessor {
120 pub fn new(exporter: Box<dyn SpanExporter>) -> Self {
122 Self {
123 exporter: Mutex::new(exporter),
124 }
125 }
126}
127
128impl SpanProcessor for SimpleSpanProcessor {
129 fn on_start(&self, _span: &mut Span, _cx: &Context) {
130 }
132
133 fn on_end(&self, span: SpanData) {
134 if !span.span_context.is_sampled() {
135 return;
136 }
137
138 let result = self
139 .exporter
140 .lock()
141 .map_err(|_| OTelSdkError::InternalFailure("SimpleSpanProcessor mutex poison".into()))
142 .and_then(|mut exporter| futures_executor::block_on(exporter.export(vec![span])));
143
144 if let Err(err) = result {
145 otel_debug!(
147 name: "SimpleProcessor.OnEnd.Error",
148 reason = format!("{:?}", err)
149 );
150 }
151 }
152
153 fn force_flush(&self) -> OTelSdkResult {
154 Ok(())
156 }
157
158 fn shutdown(&self) -> OTelSdkResult {
159 if let Ok(mut exporter) = self.exporter.lock() {
160 exporter.shutdown()
161 } else {
162 Err(OTelSdkError::InternalFailure(
163 "SimpleSpanProcessor mutex poison at shutdown".into(),
164 ))
165 }
166 }
167
168 fn set_resource(&mut self, resource: &Resource) {
169 if let Ok(mut exporter) = self.exporter.lock() {
170 exporter.set_resource(resource);
171 }
172 }
173}
174
175use std::sync::mpsc::sync_channel;
236use std::sync::mpsc::Receiver;
237use std::sync::mpsc::RecvTimeoutError;
238use std::sync::mpsc::SyncSender;
239
/// Control messages sent from a `BatchSpanProcessor` to its background thread.
#[allow(clippy::large_enum_variant)]
#[derive(Debug)]
enum BatchMessage {
    /// Request an export because enough spans have been queued. The carried
    /// flag is reset to `false` by the background thread when it handles the
    /// message, which allows the next such message to be sent.
    ExportSpan(Arc<AtomicBool>),
    /// Request an export; the result is sent back through the carried sender.
    ForceFlush(SyncSender<OTelSdkResult>),
    /// Request a final export, send the result back through the carried
    /// sender, then stop the background thread.
    Shutdown(SyncSender<OTelSdkResult>),
    /// Pass the resource on to the exporter owned by the background thread.
    SetResource(Arc<Resource>),
}
250
/// A [`SpanProcessor`] that queues spans in a bounded channel and exports
/// them in batches from a dedicated background thread.
#[derive(Debug)]
pub struct BatchSpanProcessor {
    /// Bounded channel carrying spans to the background thread; its capacity
    /// is the configured maximum queue size.
    span_sender: SyncSender<SpanData>,
    /// Bounded channel carrying control messages to the background thread.
    message_sender: SyncSender<BatchMessage>,
    /// Join handle of the background thread; taken out and joined on shutdown.
    handle: Mutex<Option<thread::JoinHandle<()>>>,
    /// How long `force_flush` waits for the background thread's reply.
    forceflush_timeout: Duration,
    /// How long `shutdown` waits for the background thread's reply.
    shutdown_timeout: Duration,
    /// Set once `shutdown` has been called.
    is_shutdown: AtomicBool,
    /// Number of spans that could not be placed on the span channel.
    dropped_span_count: Arc<AtomicUsize>,
    /// `true` while an `ExportSpan` message is pending, so that only one is
    /// in flight at a time.
    export_span_message_sent: Arc<AtomicBool>,
    /// Counter incremented in `on_end` and decremented by the background
    /// thread after each export.
    current_batch_size: Arc<AtomicUsize>,
    /// Maximum number of spans handed to the exporter in one export call.
    max_export_batch_size: usize,
    /// Capacity of the span channel; reported in the shutdown warning.
    max_queue_size: usize,
}
297
298impl BatchSpanProcessor {
299 pub fn new<E>(
301 mut exporter: E,
302 config: BatchConfig,
303 ) -> Self
307 where
308 E: SpanExporter + Send + 'static,
309 {
310 let (span_sender, span_receiver) = sync_channel::<SpanData>(config.max_queue_size);
311 let (message_sender, message_receiver) = sync_channel::<BatchMessage>(64); let max_queue_size = config.max_queue_size;
313 let max_export_batch_size = config.max_export_batch_size;
314 let current_batch_size = Arc::new(AtomicUsize::new(0));
315 let current_batch_size_for_thread = current_batch_size.clone();
316
317 let handle = thread::Builder::new()
318 .name("OpenTelemetry.Traces.BatchProcessor".to_string())
319 .spawn(move || {
320 otel_info!(
321 name: "BatchSpanProcessor.ThreadStarted",
322 interval_in_millisecs = config.scheduled_delay.as_millis(),
323 max_export_batch_size = config.max_export_batch_size,
324 max_queue_size = config.max_queue_size,
325 );
326 let mut spans = Vec::with_capacity(config.max_export_batch_size);
327 let mut last_export_time = Instant::now();
328 let current_batch_size = current_batch_size_for_thread;
329 loop {
330 let remaining_time_option = config
331 .scheduled_delay
332 .checked_sub(last_export_time.elapsed());
333 let remaining_time = match remaining_time_option {
334 Some(remaining_time) => remaining_time,
335 None => config.scheduled_delay,
336 };
337 match message_receiver.recv_timeout(remaining_time) {
338 Ok(message) => match message {
339 BatchMessage::ExportSpan(export_span_message_sent) => {
340 export_span_message_sent.store(false, Ordering::Relaxed);
342 otel_debug!(
343 name: "BatchSpanProcessor.ExportingDueToBatchSize",
344 );
345 let _ = Self::get_spans_and_export(
346 &span_receiver,
347 &mut exporter,
348 &mut spans,
349 &mut last_export_time,
350 ¤t_batch_size,
351 &config,
352 );
353 }
354 BatchMessage::ForceFlush(sender) => {
355 otel_debug!(name: "BatchSpanProcessor.ExportingDueToForceFlush");
356 let result = Self::get_spans_and_export(
357 &span_receiver,
358 &mut exporter,
359 &mut spans,
360 &mut last_export_time,
361 ¤t_batch_size,
362 &config,
363 );
364 let _ = sender.send(result);
365 }
366 BatchMessage::Shutdown(sender) => {
367 otel_debug!(name: "BatchSpanProcessor.ExportingDueToShutdown");
368 let result = Self::get_spans_and_export(
369 &span_receiver,
370 &mut exporter,
371 &mut spans,
372 &mut last_export_time,
373 ¤t_batch_size,
374 &config,
375 );
376 let _ = sender.send(result);
377
378 otel_debug!(
379 name: "BatchSpanProcessor.ThreadExiting",
380 reason = "ShutdownRequested"
381 );
382 break;
386 }
387 BatchMessage::SetResource(resource) => {
388 exporter.set_resource(&resource);
389 }
390 },
391 Err(RecvTimeoutError::Timeout) => {
392 otel_debug!(
393 name: "BatchSpanProcessor.ExportingDueToTimer",
394 );
395
396 let _ = Self::get_spans_and_export(
397 &span_receiver,
398 &mut exporter,
399 &mut spans,
400 &mut last_export_time,
401 ¤t_batch_size,
402 &config,
403 );
404 }
405 Err(RecvTimeoutError::Disconnected) => {
406 otel_debug!(
409 name: "BatchSpanProcessor.ThreadExiting",
410 reason = "MessageSenderDisconnected"
411 );
412 break;
413 }
414 }
415 }
416 otel_info!(
417 name: "BatchSpanProcessor.ThreadStopped"
418 );
419 })
420 .expect("Failed to spawn thread"); Self {
423 span_sender,
424 message_sender,
425 handle: Mutex::new(Some(handle)),
426 forceflush_timeout: Duration::from_secs(5), shutdown_timeout: Duration::from_secs(5), is_shutdown: AtomicBool::new(false),
429 dropped_span_count: Arc::new(AtomicUsize::new(0)),
430 max_queue_size,
431 export_span_message_sent: Arc::new(AtomicBool::new(false)),
432 current_batch_size,
433 max_export_batch_size,
434 }
435 }
436
437 pub fn builder<E>(exporter: E) -> BatchSpanProcessorBuilder<E>
439 where
440 E: SpanExporter + Send + 'static,
441 {
442 BatchSpanProcessorBuilder {
443 exporter,
444 config: BatchConfig::default(),
445 }
446 }
447
448 #[inline]
452 fn get_spans_and_export<E>(
453 spans_receiver: &Receiver<SpanData>,
454 exporter: &mut E,
455 spans: &mut Vec<SpanData>,
456 last_export_time: &mut Instant,
457 current_batch_size: &AtomicUsize,
458 config: &BatchConfig,
459 ) -> OTelSdkResult
460 where
461 E: SpanExporter + Send + Sync + 'static,
462 {
463 while let Ok(span) = spans_receiver.try_recv() {
465 spans.push(span);
466 if spans.len() == config.max_export_batch_size {
467 break;
468 }
469 }
470
471 let count_of_spans = spans.len(); let result = Self::export_batch_sync(exporter, spans, last_export_time); current_batch_size.fetch_sub(count_of_spans, Ordering::Relaxed);
475 result
476 }
477
478 #[allow(clippy::vec_box)]
479 fn export_batch_sync<E>(
480 exporter: &mut E,
481 batch: &mut Vec<SpanData>,
482 last_export_time: &mut Instant,
483 ) -> OTelSdkResult
484 where
485 E: SpanExporter + Send + Sync + 'static,
486 {
487 *last_export_time = Instant::now();
488
489 if batch.is_empty() {
490 return OTelSdkResult::Ok(());
491 }
492
493 let export = exporter.export(batch.split_off(0));
494 let export_result = futures_executor::block_on(export);
495
496 match export_result {
497 Ok(_) => OTelSdkResult::Ok(()),
498 Err(err) => {
499 otel_error!(
500 name: "BatchSpanProcessor.ExportError",
501 error = format!("{}", err)
502 );
503 Err(OTelSdkError::InternalFailure(err.to_string()))
504 }
505 }
506 }
507}
508
509impl SpanProcessor for BatchSpanProcessor {
    /// No-op: the batch processor does nothing with started spans.
    fn on_start(&self, _span: &mut Span, _cx: &Context) {
    }
514
515 fn on_end(&self, span: SpanData) {
517 if self.is_shutdown.load(Ordering::Relaxed) {
518 otel_warn!(
520 name: "BatchSpanProcessor.Emit.ProcessorShutdown",
521 message = "BatchSpanProcessor has been shutdown. No further spans will be emitted."
522 );
523 return;
524 }
525 let result = self.span_sender.try_send(span);
526
527 if result.is_err() {
528 if self.dropped_span_count.fetch_add(1, Ordering::Relaxed) == 0 {
531 otel_warn!(name: "BatchSpanProcessor.SpanDroppingStarted",
532 message = "BatchSpanProcessor dropped a Span due to queue full/internal errors. No further internal log will be emitted for further drops until Shutdown. During Shutdown time, a log will be emitted with exact count of total Spans dropped.");
533 }
534 }
535 if self.current_batch_size.fetch_add(1, Ordering::Relaxed) + 1 >= self.max_export_batch_size
538 {
539 if !self.export_span_message_sent.load(Ordering::Relaxed) {
544 if !self.export_span_message_sent.swap(true, Ordering::Relaxed) {
549 match self.message_sender.try_send(BatchMessage::ExportSpan(
550 self.export_span_message_sent.clone(),
551 )) {
552 Ok(_) => {
553 }
555 Err(_err) => {
556 self.export_span_message_sent
559 .store(false, Ordering::Relaxed);
560 }
561 }
562 }
563 }
564 }
565 }
566
567 fn force_flush(&self) -> OTelSdkResult {
569 if self.is_shutdown.load(Ordering::Relaxed) {
570 return Err(OTelSdkError::AlreadyShutdown);
571 }
572 let (sender, receiver) = sync_channel(1);
573 self.message_sender
574 .try_send(BatchMessage::ForceFlush(sender))
575 .map_err(|e| OTelSdkError::InternalFailure(e.to_string()))?;
576
577 receiver
578 .recv_timeout(self.forceflush_timeout)
579 .map_err(|_| OTelSdkError::Timeout(self.forceflush_timeout))?
580 }
581
582 fn shutdown(&self) -> OTelSdkResult {
584 if self.is_shutdown.swap(true, Ordering::Relaxed) {
585 return Err(OTelSdkError::AlreadyShutdown);
586 }
587 let dropped_spans = self.dropped_span_count.load(Ordering::Relaxed);
588 let max_queue_size = self.max_queue_size;
589 if dropped_spans > 0 {
590 otel_warn!(
591 name: "BatchSpanProcessor.SpansDropped",
592 dropped_span_count = dropped_spans,
593 max_queue_size = max_queue_size,
594 message = "Spans were dropped due to a queue being full or other error. The count represents the total count of spans dropped in the lifetime of this BatchSpanProcessor. Consider increasing the queue size and/or decrease delay between intervals."
595 );
596 }
597
598 let (sender, receiver) = sync_channel(1);
599 self.message_sender
600 .try_send(BatchMessage::Shutdown(sender))
601 .map_err(|e| OTelSdkError::InternalFailure(e.to_string()))?;
602
603 let result = receiver
604 .recv_timeout(self.shutdown_timeout)
605 .map_err(|_| OTelSdkError::Timeout(self.shutdown_timeout))?;
606 if let Some(handle) = self.handle.lock().unwrap().take() {
607 if let Err(err) = handle.join() {
608 return Err(OTelSdkError::InternalFailure(format!(
609 "Background thread failed to join during shutdown. This may indicate a panic or unexpected termination: {:?}",
610 err
611 )));
612 }
613 }
614 result
615 }
616
617 fn set_resource(&mut self, resource: &Resource) {
619 let resource = Arc::new(resource.clone());
620 let _ = self
621 .message_sender
622 .try_send(BatchMessage::SetResource(resource));
623 }
624}
625
626#[derive(Debug, Default)]
628pub struct BatchSpanProcessorBuilder<E>
629where
630 E: SpanExporter + Send + 'static,
631{
632 exporter: E,
633 config: BatchConfig,
634}
635
636impl<E> BatchSpanProcessorBuilder<E>
637where
638 E: SpanExporter + Send + 'static,
639{
640 pub fn with_batch_config(self, config: BatchConfig) -> Self {
642 BatchSpanProcessorBuilder { config, ..self }
643 }
644
645 pub fn build(self) -> BatchSpanProcessor {
647 BatchSpanProcessor::new(self.exporter, self.config)
648 }
649}
650
/// Settings for a `BatchSpanProcessor`. Created through
/// [`BatchConfigBuilder`], or through `Default`, which builds from the default
/// builder.
#[derive(Debug)]
pub struct BatchConfig {
    /// Capacity of the queue that holds spans waiting to be exported.
    pub(crate) max_queue_size: usize,

    /// Longest time the background thread waits between two exports.
    pub(crate) scheduled_delay: Duration,

    /// Largest number of spans handed to the exporter in a single export
    /// call. The builder never lets this exceed `max_queue_size`.
    #[allow(dead_code)]
    pub(crate) max_export_batch_size: usize,

    /// Export timeout. Not read by the processor code in this file.
    #[allow(dead_code)]
    pub(crate) max_export_timeout: Duration,

    /// Maximum number of concurrent exports. Not read by the processor code
    /// in this file.
    #[allow(dead_code)]
    pub(crate) max_concurrent_exports: usize,
}

impl Default for BatchConfig {
    /// Builds the configuration from `BatchConfigBuilder::default()`, which
    /// applies the built-in defaults and then any environment overrides.
    fn default() -> Self {
        BatchConfigBuilder::default().build()
    }
}
688
689#[derive(Debug)]
691pub struct BatchConfigBuilder {
692 max_queue_size: usize,
693 scheduled_delay: Duration,
694 max_export_batch_size: usize,
695 max_export_timeout: Duration,
696 max_concurrent_exports: usize,
697}
698
699impl Default for BatchConfigBuilder {
700 fn default() -> Self {
709 BatchConfigBuilder {
710 max_queue_size: OTEL_BSP_MAX_QUEUE_SIZE_DEFAULT,
711 scheduled_delay: Duration::from_millis(OTEL_BSP_SCHEDULE_DELAY_DEFAULT),
712 max_export_batch_size: OTEL_BSP_MAX_EXPORT_BATCH_SIZE_DEFAULT,
713 max_export_timeout: Duration::from_millis(OTEL_BSP_EXPORT_TIMEOUT_DEFAULT),
714 max_concurrent_exports: OTEL_BSP_MAX_CONCURRENT_EXPORTS_DEFAULT,
715 }
716 .init_from_env_vars()
717 }
718}
719
720impl BatchConfigBuilder {
721 pub fn with_max_queue_size(mut self, max_queue_size: usize) -> Self {
726 self.max_queue_size = max_queue_size;
727 self
728 }
729
730 pub fn with_max_export_batch_size(mut self, max_export_batch_size: usize) -> Self {
736 self.max_export_batch_size = max_export_batch_size;
737 self
738 }
739
740 #[cfg(feature = "experimental_trace_batch_span_processor_with_async_runtime")]
741 pub fn with_max_concurrent_exports(mut self, max_concurrent_exports: usize) -> Self {
748 self.max_concurrent_exports = max_concurrent_exports;
749 self
750 }
751
752 pub fn with_scheduled_delay(mut self, scheduled_delay: Duration) -> Self {
756 self.scheduled_delay = scheduled_delay;
757 self
758 }
759
760 #[cfg(feature = "experimental_trace_batch_span_processor_with_async_runtime")]
764 pub fn with_max_export_timeout(mut self, max_export_timeout: Duration) -> Self {
765 self.max_export_timeout = max_export_timeout;
766 self
767 }
768
769 pub fn build(self) -> BatchConfig {
772 let max_export_batch_size = min(self.max_export_batch_size, self.max_queue_size);
775
776 BatchConfig {
777 max_queue_size: self.max_queue_size,
778 scheduled_delay: self.scheduled_delay,
779 max_export_timeout: self.max_export_timeout,
780 max_concurrent_exports: self.max_concurrent_exports,
781 max_export_batch_size,
782 }
783 }
784
785 fn init_from_env_vars(mut self) -> Self {
786 if let Some(max_concurrent_exports) = env::var(OTEL_BSP_MAX_CONCURRENT_EXPORTS)
787 .ok()
788 .and_then(|max_concurrent_exports| usize::from_str(&max_concurrent_exports).ok())
789 {
790 self.max_concurrent_exports = max_concurrent_exports;
791 }
792
793 if let Some(max_queue_size) = env::var(OTEL_BSP_MAX_QUEUE_SIZE)
794 .ok()
795 .and_then(|queue_size| usize::from_str(&queue_size).ok())
796 {
797 self.max_queue_size = max_queue_size;
798 }
799
800 if let Some(scheduled_delay) = env::var(OTEL_BSP_SCHEDULE_DELAY)
801 .ok()
802 .and_then(|delay| u64::from_str(&delay).ok())
803 {
804 self.scheduled_delay = Duration::from_millis(scheduled_delay);
805 }
806
807 if let Some(max_export_batch_size) = env::var(OTEL_BSP_MAX_EXPORT_BATCH_SIZE)
808 .ok()
809 .and_then(|batch_size| usize::from_str(&batch_size).ok())
810 {
811 self.max_export_batch_size = max_export_batch_size;
812 }
813
814 if self.max_export_batch_size > self.max_queue_size {
817 self.max_export_batch_size = self.max_queue_size;
818 }
819
820 if let Some(max_export_timeout) = env::var(OTEL_BSP_EXPORT_TIMEOUT)
821 .ok()
822 .and_then(|timeout| u64::from_str(&timeout).ok())
823 {
824 self.max_export_timeout = Duration::from_millis(max_export_timeout);
825 }
826
827 self
828 }
829}
830
831#[cfg(all(test, feature = "testing", feature = "trace"))]
832mod tests {
833 use super::{
835 BatchSpanProcessor, SimpleSpanProcessor, SpanProcessor, OTEL_BSP_EXPORT_TIMEOUT,
836 OTEL_BSP_MAX_EXPORT_BATCH_SIZE, OTEL_BSP_MAX_QUEUE_SIZE, OTEL_BSP_MAX_QUEUE_SIZE_DEFAULT,
837 OTEL_BSP_SCHEDULE_DELAY, OTEL_BSP_SCHEDULE_DELAY_DEFAULT,
838 };
839 use crate::error::OTelSdkResult;
840 use crate::testing::trace::new_test_export_span_data;
841 use crate::trace::span_processor::{
842 OTEL_BSP_EXPORT_TIMEOUT_DEFAULT, OTEL_BSP_MAX_CONCURRENT_EXPORTS,
843 OTEL_BSP_MAX_CONCURRENT_EXPORTS_DEFAULT, OTEL_BSP_MAX_EXPORT_BATCH_SIZE_DEFAULT,
844 };
845 use crate::trace::InMemorySpanExporterBuilder;
846 use crate::trace::{BatchConfig, BatchConfigBuilder, SpanEvents, SpanLinks};
847 use crate::trace::{SpanData, SpanExporter};
848 use opentelemetry::trace::{SpanContext, SpanId, SpanKind, Status};
849 use std::fmt::Debug;
850 use std::time::Duration;
851
852 #[test]
853 fn simple_span_processor_on_end_calls_export() {
854 let exporter = InMemorySpanExporterBuilder::new().build();
855 let processor = SimpleSpanProcessor::new(Box::new(exporter.clone()));
856 let span_data = new_test_export_span_data();
857 processor.on_end(span_data.clone());
858 assert_eq!(exporter.get_finished_spans().unwrap()[0], span_data);
859 let _result = processor.shutdown();
860 }
861
862 #[test]
863 fn simple_span_processor_on_end_skips_export_if_not_sampled() {
864 let exporter = InMemorySpanExporterBuilder::new().build();
865 let processor = SimpleSpanProcessor::new(Box::new(exporter.clone()));
866 let unsampled = SpanData {
867 span_context: SpanContext::empty_context(),
868 parent_span_id: SpanId::INVALID,
869 span_kind: SpanKind::Internal,
870 name: "opentelemetry".into(),
871 start_time: opentelemetry::time::now(),
872 end_time: opentelemetry::time::now(),
873 attributes: Vec::new(),
874 dropped_attributes_count: 0,
875 events: SpanEvents::default(),
876 links: SpanLinks::default(),
877 status: Status::Unset,
878 instrumentation_scope: Default::default(),
879 };
880 processor.on_end(unsampled);
881 assert!(exporter.get_finished_spans().unwrap().is_empty());
882 }
883
884 #[test]
885 fn simple_span_processor_shutdown_calls_shutdown() {
886 let exporter = InMemorySpanExporterBuilder::new().build();
887 let processor = SimpleSpanProcessor::new(Box::new(exporter.clone()));
888 let span_data = new_test_export_span_data();
889 processor.on_end(span_data.clone());
890 assert!(!exporter.get_finished_spans().unwrap().is_empty());
891 let _result = processor.shutdown();
892 assert!(exporter.get_finished_spans().unwrap().is_empty());
894 }
895
896 #[test]
897 fn test_default_const_values() {
898 assert_eq!(OTEL_BSP_MAX_QUEUE_SIZE, "OTEL_BSP_MAX_QUEUE_SIZE");
899 assert_eq!(OTEL_BSP_MAX_QUEUE_SIZE_DEFAULT, 2048);
900 assert_eq!(OTEL_BSP_SCHEDULE_DELAY, "OTEL_BSP_SCHEDULE_DELAY");
901 assert_eq!(OTEL_BSP_SCHEDULE_DELAY_DEFAULT, 5000);
902 assert_eq!(
903 OTEL_BSP_MAX_EXPORT_BATCH_SIZE,
904 "OTEL_BSP_MAX_EXPORT_BATCH_SIZE"
905 );
906 assert_eq!(OTEL_BSP_MAX_EXPORT_BATCH_SIZE_DEFAULT, 512);
907 assert_eq!(OTEL_BSP_EXPORT_TIMEOUT, "OTEL_BSP_EXPORT_TIMEOUT");
908 assert_eq!(OTEL_BSP_EXPORT_TIMEOUT_DEFAULT, 30000);
909 }
910
911 #[test]
912 fn test_default_batch_config_adheres_to_specification() {
913 let env_vars = vec![
914 OTEL_BSP_SCHEDULE_DELAY,
915 OTEL_BSP_EXPORT_TIMEOUT,
916 OTEL_BSP_MAX_QUEUE_SIZE,
917 OTEL_BSP_MAX_EXPORT_BATCH_SIZE,
918 OTEL_BSP_MAX_CONCURRENT_EXPORTS,
919 ];
920
921 let config = temp_env::with_vars_unset(env_vars, BatchConfig::default);
922
923 assert_eq!(
924 config.max_concurrent_exports,
925 OTEL_BSP_MAX_CONCURRENT_EXPORTS_DEFAULT
926 );
927 assert_eq!(
928 config.scheduled_delay,
929 Duration::from_millis(OTEL_BSP_SCHEDULE_DELAY_DEFAULT)
930 );
931 assert_eq!(
932 config.max_export_timeout,
933 Duration::from_millis(OTEL_BSP_EXPORT_TIMEOUT_DEFAULT)
934 );
935 assert_eq!(config.max_queue_size, OTEL_BSP_MAX_QUEUE_SIZE_DEFAULT);
936 assert_eq!(
937 config.max_export_batch_size,
938 OTEL_BSP_MAX_EXPORT_BATCH_SIZE_DEFAULT
939 );
940 }
941
942 #[test]
943 fn test_batch_config_configurable_by_env_vars() {
944 let env_vars = vec![
945 (OTEL_BSP_SCHEDULE_DELAY, Some("2000")),
946 (OTEL_BSP_EXPORT_TIMEOUT, Some("60000")),
947 (OTEL_BSP_MAX_QUEUE_SIZE, Some("4096")),
948 (OTEL_BSP_MAX_EXPORT_BATCH_SIZE, Some("1024")),
949 ];
950
951 let config = temp_env::with_vars(env_vars, BatchConfig::default);
952
953 assert_eq!(config.scheduled_delay, Duration::from_millis(2000));
954 assert_eq!(config.max_export_timeout, Duration::from_millis(60000));
955 assert_eq!(config.max_queue_size, 4096);
956 assert_eq!(config.max_export_batch_size, 1024);
957 }
958
959 #[test]
960 fn test_batch_config_max_export_batch_size_validation() {
961 let env_vars = vec![
962 (OTEL_BSP_MAX_QUEUE_SIZE, Some("256")),
963 (OTEL_BSP_MAX_EXPORT_BATCH_SIZE, Some("1024")),
964 ];
965
966 let config = temp_env::with_vars(env_vars, BatchConfig::default);
967
968 assert_eq!(config.max_queue_size, 256);
969 assert_eq!(config.max_export_batch_size, 256);
970 assert_eq!(
971 config.scheduled_delay,
972 Duration::from_millis(OTEL_BSP_SCHEDULE_DELAY_DEFAULT)
973 );
974 assert_eq!(
975 config.max_export_timeout,
976 Duration::from_millis(OTEL_BSP_EXPORT_TIMEOUT_DEFAULT)
977 );
978 }
979
980 #[test]
981 fn test_batch_config_with_fields() {
982 let batch = BatchConfigBuilder::default()
983 .with_max_export_batch_size(10)
984 .with_scheduled_delay(Duration::from_millis(10))
985 .with_max_queue_size(10);
986 #[cfg(feature = "experimental_trace_batch_span_processor_with_async_runtime")]
987 let batch = batch.with_max_concurrent_exports(10);
988 #[cfg(feature = "experimental_trace_batch_span_processor_with_async_runtime")]
989 let batch = batch.with_max_export_timeout(Duration::from_millis(10));
990 let batch = batch.build();
991 assert_eq!(batch.max_export_batch_size, 10);
992 assert_eq!(batch.scheduled_delay, Duration::from_millis(10));
993 assert_eq!(batch.max_export_timeout, Duration::from_millis(10));
994 assert_eq!(batch.max_concurrent_exports, 10);
995 assert_eq!(batch.max_queue_size, 10);
996 }
997
998 fn create_test_span(name: &str) -> SpanData {
1000 SpanData {
1001 span_context: SpanContext::empty_context(),
1002 parent_span_id: SpanId::INVALID,
1003 span_kind: SpanKind::Internal,
1004 name: name.to_string().into(),
1005 start_time: opentelemetry::time::now(),
1006 end_time: opentelemetry::time::now(),
1007 attributes: Vec::new(),
1008 dropped_attributes_count: 0,
1009 events: SpanEvents::default(),
1010 links: SpanLinks::default(),
1011 status: Status::Unset,
1012 instrumentation_scope: Default::default(),
1013 }
1014 }
1015
1016 use crate::Resource;
1017 use futures_util::future::BoxFuture;
1018 use futures_util::FutureExt;
1019 use opentelemetry::{Key, KeyValue, Value};
1020 use std::sync::{atomic::Ordering, Arc, Mutex};
1021
1022 #[derive(Debug)]
1024 struct MockSpanExporter {
1025 exported_spans: Arc<Mutex<Vec<SpanData>>>,
1026 exported_resource: Arc<Mutex<Option<Resource>>>,
1027 }
1028
1029 impl MockSpanExporter {
1030 fn new() -> Self {
1031 Self {
1032 exported_spans: Arc::new(Mutex::new(Vec::new())),
1033 exported_resource: Arc::new(Mutex::new(None)),
1034 }
1035 }
1036 }
1037
1038 impl SpanExporter for MockSpanExporter {
1039 fn export(&mut self, batch: Vec<SpanData>) -> BoxFuture<'static, OTelSdkResult> {
1040 let exported_spans = self.exported_spans.clone();
1041 async move {
1042 exported_spans.lock().unwrap().extend(batch);
1043 Ok(())
1044 }
1045 .boxed()
1046 }
1047
1048 fn shutdown(&mut self) -> OTelSdkResult {
1049 Ok(())
1050 }
1051 fn set_resource(&mut self, resource: &Resource) {
1052 let mut exported_resource = self.exported_resource.lock().unwrap();
1053 *exported_resource = Some(resource.clone());
1054 }
1055 }
1056
1057 #[test]
1058 fn batchspanprocessor_handles_on_end() {
1059 let exporter = MockSpanExporter::new();
1060 let exporter_shared = exporter.exported_spans.clone();
1061 let config = BatchConfigBuilder::default()
1062 .with_max_queue_size(10)
1063 .with_max_export_batch_size(10)
1064 .with_scheduled_delay(Duration::from_secs(5))
1065 .build();
1066 let processor = BatchSpanProcessor::new(exporter, config);
1067
1068 let test_span = create_test_span("test_span");
1069 processor.on_end(test_span.clone());
1070
1071 std::thread::sleep(Duration::from_secs(6));
1073
1074 let exported_spans = exporter_shared.lock().unwrap();
1075 assert_eq!(exported_spans.len(), 1);
1076 assert_eq!(exported_spans[0].name, "test_span");
1077 }
1078
1079 #[test]
1080 fn batchspanprocessor_force_flush() {
1081 let exporter = MockSpanExporter::new();
1082 let exporter_shared = exporter.exported_spans.clone(); let config = BatchConfigBuilder::default()
1084 .with_max_queue_size(10)
1085 .with_max_export_batch_size(10)
1086 .with_scheduled_delay(Duration::from_secs(5))
1087 .build();
1088 let processor = BatchSpanProcessor::new(exporter, config);
1089
1090 let test_span = create_test_span("force_flush_span");
1092 processor.on_end(test_span.clone());
1093
1094 let flush_result = processor.force_flush();
1096 assert!(flush_result.is_ok(), "Force flush failed unexpectedly");
1097
1098 let exported_spans = exporter_shared.lock().unwrap();
1100 assert_eq!(
1101 exported_spans.len(),
1102 1,
1103 "Unexpected number of exported spans"
1104 );
1105 assert_eq!(exported_spans[0].name, "force_flush_span");
1106 }
1107
1108 #[test]
1109 fn batchspanprocessor_shutdown() {
1110 let exporter = MockSpanExporter::new();
1111 let exporter_shared = exporter.exported_spans.clone(); let config = BatchConfigBuilder::default()
1113 .with_max_queue_size(10)
1114 .with_max_export_batch_size(10)
1115 .with_scheduled_delay(Duration::from_secs(5))
1116 .build();
1117 let processor = BatchSpanProcessor::new(exporter, config);
1118
1119 let test_span = create_test_span("shutdown_span");
1121 processor.on_end(test_span.clone());
1122
1123 let shutdown_result = processor.shutdown();
1125 assert!(shutdown_result.is_ok(), "Shutdown failed unexpectedly");
1126
1127 let exported_spans = exporter_shared.lock().unwrap();
1129 assert_eq!(
1130 exported_spans.len(),
1131 1,
1132 "Unexpected number of exported spans"
1133 );
1134 assert_eq!(exported_spans[0].name, "shutdown_span");
1135
1136 let second_shutdown_result = processor.shutdown();
1138 assert!(
1139 second_shutdown_result.is_err(),
1140 "Shutdown should fail when called a second time"
1141 );
1142 }
1143
1144 #[test]
1145 fn batchspanprocessor_handles_dropped_spans() {
1146 let exporter = MockSpanExporter::new();
1147 let exporter_shared = exporter.exported_spans.clone(); let config = BatchConfigBuilder::default()
1149 .with_max_queue_size(2) .with_scheduled_delay(Duration::from_secs(5))
1151 .build();
1152 let processor = BatchSpanProcessor::new(exporter, config);
1153
1154 let span1 = create_test_span("span1");
1156 let span2 = create_test_span("span2");
1157 let span3 = create_test_span("span3"); processor.on_end(span1.clone());
1160 processor.on_end(span2.clone());
1161 processor.on_end(span3.clone()); std::thread::sleep(Duration::from_secs(3));
1165
1166 let exported_spans = exporter_shared.lock().unwrap();
1167
1168 assert_eq!(
1170 exported_spans.len(),
1171 2,
1172 "Unexpected number of exported spans"
1173 );
1174 assert!(exported_spans.iter().any(|s| s.name == "span1"));
1175 assert!(exported_spans.iter().any(|s| s.name == "span2"));
1176
1177 assert!(
1179 !exported_spans.iter().any(|s| s.name == "span3"),
1180 "Span3 should have been dropped"
1181 );
1182
1183 let dropped_count = processor.dropped_span_count.load(Ordering::Relaxed);
1185 assert_eq!(dropped_count, 1, "Unexpected number of dropped spans");
1186 }
1187
1188 #[test]
1189 fn validate_span_attributes_exported_correctly() {
1190 let exporter = MockSpanExporter::new();
1191 let exporter_shared = exporter.exported_spans.clone();
1192 let config = BatchConfigBuilder::default().build();
1193 let processor = BatchSpanProcessor::new(exporter, config);
1194
1195 let mut span_data = create_test_span("attribute_validation");
1197 span_data.attributes = vec![
1198 KeyValue::new("key1", "value1"),
1199 KeyValue::new("key2", "value2"),
1200 ];
1201 processor.on_end(span_data.clone());
1202
1203 let _ = processor.force_flush();
1205
1206 let exported_spans = exporter_shared.lock().unwrap();
1208 assert_eq!(exported_spans.len(), 1);
1209 let exported_span = &exported_spans[0];
1210 assert!(exported_span
1211 .attributes
1212 .contains(&KeyValue::new("key1", "value1")));
1213 assert!(exported_span
1214 .attributes
1215 .contains(&KeyValue::new("key2", "value2")));
1216 }
1217
1218 #[test]
1219 fn batchspanprocessor_sets_and_exports_with_resource() {
1220 let exporter = MockSpanExporter::new();
1221 let exporter_shared = exporter.exported_spans.clone();
1222 let resource_shared = exporter.exported_resource.clone();
1223 let config = BatchConfigBuilder::default().build();
1224 let mut processor = BatchSpanProcessor::new(exporter, config);
1225
1226 let resource = Resource::new(vec![KeyValue::new("service.name", "test_service")]);
1228 processor.set_resource(&resource);
1229
1230 let test_span = create_test_span("resource_test");
1232 processor.on_end(test_span.clone());
1233
1234 let _ = processor.force_flush();
1236
1237 let exported_spans = exporter_shared.lock().unwrap();
1239 assert_eq!(exported_spans.len(), 1);
1240
1241 let exported_resource = resource_shared.lock().unwrap();
1243 assert!(exported_resource.is_some());
1244 assert_eq!(
1245 exported_resource
1246 .as_ref()
1247 .unwrap()
1248 .get(&Key::new("service.name")),
1249 Some(Value::from("test_service"))
1250 );
1251 }
1252
1253 #[tokio::test(flavor = "current_thread")]
1254 async fn test_batch_processor_current_thread_runtime() {
1255 let exporter = MockSpanExporter::new();
1256 let exporter_shared = exporter.exported_spans.clone();
1257
1258 let config = BatchConfigBuilder::default()
1259 .with_max_queue_size(5)
1260 .with_max_export_batch_size(3)
1261 .with_scheduled_delay(Duration::from_millis(50))
1262 .build();
1263
1264 let processor = BatchSpanProcessor::new(exporter, config);
1265
1266 for _ in 0..4 {
1267 let span = new_test_export_span_data();
1268 processor.on_end(span);
1269 }
1270
1271 tokio::time::sleep(Duration::from_millis(200)).await;
1272
1273 let exported_spans = exporter_shared.lock().unwrap();
1274 assert_eq!(exported_spans.len(), 4);
1275 }
1276
1277 #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
1278 async fn test_batch_processor_multi_thread_count_1_runtime() {
1279 let exporter = MockSpanExporter::new();
1280 let exporter_shared = exporter.exported_spans.clone();
1281
1282 let config = BatchConfigBuilder::default()
1283 .with_max_queue_size(5)
1284 .with_max_export_batch_size(3)
1285 .with_scheduled_delay(Duration::from_millis(50))
1286 .build();
1287
1288 let processor = BatchSpanProcessor::new(exporter, config);
1289
1290 for _ in 0..4 {
1291 let span = new_test_export_span_data();
1292 processor.on_end(span);
1293 }
1294
1295 tokio::time::sleep(Duration::from_millis(200)).await;
1296
1297 let exported_spans = exporter_shared.lock().unwrap();
1298 assert_eq!(exported_spans.len(), 4);
1299 }
1300
1301 #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
1302 async fn test_batch_processor_multi_thread() {
1303 let exporter = MockSpanExporter::new();
1304 let exporter_shared = exporter.exported_spans.clone();
1305
1306 let config = BatchConfigBuilder::default()
1307 .with_max_queue_size(20)
1308 .with_max_export_batch_size(5)
1309 .with_scheduled_delay(Duration::from_millis(50))
1310 .build();
1311
1312 let processor = Arc::new(BatchSpanProcessor::new(exporter, config));
1314
1315 let mut handles = vec![];
1316 for _ in 0..10 {
1317 let processor_clone = Arc::clone(&processor);
1318 let handle = tokio::spawn(async move {
1319 let span = new_test_export_span_data();
1320 processor_clone.on_end(span);
1321 });
1322 handles.push(handle);
1323 }
1324
1325 for handle in handles {
1326 handle.await.unwrap();
1327 }
1328
1329 tokio::time::sleep(Duration::from_millis(200)).await;
1331
1332 let exported_spans = exporter_shared.lock().unwrap();
1334 assert_eq!(exported_spans.len(), 10);
1335 }
1336}