opentelemetry_sdk/trace/span_processor.rs

//! # OpenTelemetry Span Processor Interface
//!
//! A span processor is an interface that provides hooks for span start and end
//! method invocations. Span processors are invoked only when
//! [`is_recording`] is true.
//!
//! Built-in span processors are responsible for batching and conversion of spans to
//! exportable representation and passing batches to exporters.
//!
//! Span processors can be registered directly on SDK [`TracerProvider`] and they are
//! invoked in the same order as they were registered.
//!
//! All `Tracer` instances created by a `TracerProvider` share the same span processors.
//! Changes to this collection reflect in all `Tracer` instances.
//!
//! The following diagram shows `SpanProcessor`'s relationship to other components
//! in the SDK:
//!
//! ```ascii
//!   +-----+--------------+   +-----------------------+   +-------------------+
//!   |     |              |   |                       |   |                   |
//!   |     |              |   | (Batch)SpanProcessor  |   |    SpanExporter   |
//!   |     |              +---> (Simple)SpanProcessor +--->  (OTLPExporter)   |
//!   |     |              |   |                       |   |                   |
//!   | SDK | Tracer.span()|   +-----------------------+   +-------------------+
//!   |     | Span.end()   |
//!   |     |              |
//!   |     |              |
//!   |     |              |
//!   |     |              |
//!   +-----+--------------+
//! ```
//!
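//! A minimal sketch of wiring a processor into a `TracerProvider` is shown
//! below; the no-op exporter from the `testing` feature stands in for a real
//! exporter such as the OTLP one:
//!
//! ```rust
//! use opentelemetry_sdk::testing::trace::NoopSpanExporter;
//! use opentelemetry_sdk::trace::{SimpleSpanProcessor, TracerProvider};
//!
//! // The exporter is boxed because SimpleSpanProcessor owns it as a trait object.
//! let processor = SimpleSpanProcessor::new(Box::new(NoopSpanExporter::new()));
//! let provider = TracerProvider::builder()
//!     .with_span_processor(processor)
//!     .build();
//! // Spans created by tracers from this provider now flow through the processor.
//! let _ = provider.shutdown();
//! ```
//!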
//! [`is_recording`]: opentelemetry::trace::Span::is_recording()
//! [`TracerProvider`]: opentelemetry::trace::TracerProvider

use crate::error::{OTelSdkError, OTelSdkResult};
use crate::resource::Resource;
use crate::trace::Span;
use crate::trace::{SpanData, SpanExporter};
use opentelemetry::Context;
use opentelemetry::{otel_debug, otel_warn};
use opentelemetry::{otel_error, otel_info};
use std::cmp::min;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::{env, str::FromStr, time::Duration};

use std::sync::atomic::AtomicBool;
use std::thread;
use std::time::Instant;

/// Delay interval between two consecutive exports.
pub(crate) const OTEL_BSP_SCHEDULE_DELAY: &str = "OTEL_BSP_SCHEDULE_DELAY";
/// Default delay interval between two consecutive exports.
pub(crate) const OTEL_BSP_SCHEDULE_DELAY_DEFAULT: u64 = 5_000;
/// Maximum queue size
pub(crate) const OTEL_BSP_MAX_QUEUE_SIZE: &str = "OTEL_BSP_MAX_QUEUE_SIZE";
/// Default maximum queue size
pub(crate) const OTEL_BSP_MAX_QUEUE_SIZE_DEFAULT: usize = 2_048;
/// Maximum batch size, must be less than or equal to OTEL_BSP_MAX_QUEUE_SIZE
pub(crate) const OTEL_BSP_MAX_EXPORT_BATCH_SIZE: &str = "OTEL_BSP_MAX_EXPORT_BATCH_SIZE";
/// Default maximum batch size
pub(crate) const OTEL_BSP_MAX_EXPORT_BATCH_SIZE_DEFAULT: usize = 512;
/// Maximum allowed time to export data.
pub(crate) const OTEL_BSP_EXPORT_TIMEOUT: &str = "OTEL_BSP_EXPORT_TIMEOUT";
/// Default maximum allowed time to export data.
pub(crate) const OTEL_BSP_EXPORT_TIMEOUT_DEFAULT: u64 = 30_000;
/// Environment variable to configure max concurrent exports for batch span
/// processor.
pub(crate) const OTEL_BSP_MAX_CONCURRENT_EXPORTS: &str = "OTEL_BSP_MAX_CONCURRENT_EXPORTS";
/// Default max concurrent exports for BSP
pub(crate) const OTEL_BSP_MAX_CONCURRENT_EXPORTS_DEFAULT: usize = 1;
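
// Example (illustrative values): the batch span processor can be tuned at
// process startup through the environment variables above, e.g.
//
//   OTEL_BSP_SCHEDULE_DELAY=2000 \
//   OTEL_BSP_MAX_QUEUE_SIZE=4096 \
//   OTEL_BSP_MAX_EXPORT_BATCH_SIZE=512 \
//   my-app
//
// Values set programmatically through `BatchConfigBuilder` methods override
// the values read from these variables; see `BatchConfigBuilder::default`.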

/// `SpanProcessor` is an interface which allows hooks for span start and end
/// method invocations. The span processors are invoked only when is_recording
/// is true.
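///
/// The sketch below shows the shape of a minimal (no-op) implementation; it is
/// illustrative only and assumes the crate's public re-exports for the paths:
///
/// ```rust
/// use opentelemetry::Context;
/// use opentelemetry_sdk::error::OTelSdkResult;
/// use opentelemetry_sdk::trace::{Span, SpanData, SpanProcessor};
///
/// #[derive(Debug)]
/// struct NoopProcessor;
///
/// impl SpanProcessor for NoopProcessor {
///     fn on_start(&self, _span: &mut Span, _cx: &Context) {
///         // Called synchronously when a span starts; must not block.
///     }
///     fn on_end(&self, _span: SpanData) {
///         // Called synchronously when a span ends; must not block.
///     }
///     fn force_flush(&self) -> OTelSdkResult {
///         Ok(())
///     }
///     fn shutdown(&self) -> OTelSdkResult {
///         Ok(())
///     }
/// }
/// ```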
pub trait SpanProcessor: Send + Sync + std::fmt::Debug {
    /// `on_start` is called when a `Span` is started.  This method is called
    /// synchronously on the thread that started the span, therefore it should
    /// not block or throw exceptions.
    fn on_start(&self, span: &mut Span, cx: &Context);
    /// `on_end` is called after a `Span` is ended (i.e., the end timestamp is
    /// already set). This method is called synchronously within the `Span::end`
    /// API, therefore it should not block or throw an exception.
    /// TODO - This method should take reference to `SpanData`
    fn on_end(&self, span: SpanData);
    /// Force the spans lying in the cache to be exported.
    fn force_flush(&self) -> OTelSdkResult;
    /// Shuts down the processor. Called when SDK is shut down. This is an
    /// opportunity for processors to do any cleanup required.
    ///
    /// Implementation should make sure shutdown can be called multiple times.
    fn shutdown(&self) -> OTelSdkResult;
    /// Set the resource for the span processor.
    fn set_resource(&mut self, _resource: &Resource) {}
}

/// A [SpanProcessor] that passes finished spans to the configured
/// `SpanExporter`, as soon as they are finished, without any batching. This is
/// typically useful for debugging and testing. For scenarios requiring higher
/// performance/throughput, consider using [BatchSpanProcessor].
/// Spans are exported synchronously
/// in the same thread that ends the span.
/// When using this processor with the OTLP Exporter, the following exporter
/// features are supported:
/// - `grpc-tonic`: This requires TracerProvider to be created within a tokio
///   runtime. Spans can be emitted from any thread, including tokio runtime
///   threads.
/// - `reqwest-blocking-client`: TracerProvider may be created anywhere, but
///   spans must be emitted from a non-tokio runtime thread.
/// - `reqwest-client`: TracerProvider may be created anywhere, but spans must be
///   emitted from a tokio runtime thread.
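///
/// A minimal construction sketch (the no-op exporter from the `testing`
/// feature stands in for a real exporter):
///
/// ```rust
/// use opentelemetry_sdk::testing::trace::NoopSpanExporter;
/// use opentelemetry_sdk::trace::SimpleSpanProcessor;
///
/// // The exporter is boxed because the processor stores it as a trait object.
/// let processor = SimpleSpanProcessor::new(Box::new(NoopSpanExporter::new()));
/// ```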
#[derive(Debug)]
pub struct SimpleSpanProcessor {
    exporter: Mutex<Box<dyn SpanExporter>>,
}

impl SimpleSpanProcessor {
    /// Create a new [SimpleSpanProcessor] using the provided exporter.
    pub fn new(exporter: Box<dyn SpanExporter>) -> Self {
        Self {
            exporter: Mutex::new(exporter),
        }
    }
}

impl SpanProcessor for SimpleSpanProcessor {
    fn on_start(&self, _span: &mut Span, _cx: &Context) {
        // Ignored
    }

    fn on_end(&self, span: SpanData) {
        if !span.span_context.is_sampled() {
            return;
        }

        let result = self
            .exporter
            .lock()
            .map_err(|_| OTelSdkError::InternalFailure("SimpleSpanProcessor mutex poison".into()))
            .and_then(|mut exporter| futures_executor::block_on(exporter.export(vec![span])));

        if let Err(err) = result {
            // TODO: check error type, and log `error` only if the error is user-actionable, else log `debug`
            otel_debug!(
                name: "SimpleProcessor.OnEnd.Error",
                reason = format!("{:?}", err)
            );
        }
    }

    fn force_flush(&self) -> OTelSdkResult {
        // Nothing to flush for simple span processor.
        Ok(())
    }

    fn shutdown(&self) -> OTelSdkResult {
        if let Ok(mut exporter) = self.exporter.lock() {
            exporter.shutdown()
        } else {
            Err(OTelSdkError::InternalFailure(
                "SimpleSpanProcessor mutex poison at shutdown".into(),
            ))
        }
    }

    fn set_resource(&mut self, resource: &Resource) {
        if let Ok(mut exporter) = self.exporter.lock() {
            exporter.set_resource(resource);
        }
    }
}

/// The `BatchSpanProcessor` collects finished spans in a buffer and exports them
/// in batches to the configured `SpanExporter`. This processor is ideal for
/// high-throughput environments, as it minimizes the overhead of exporting spans
/// individually. It uses a **dedicated background thread** to manage and export spans
/// asynchronously, ensuring that the application's main execution flow is not blocked.
///
/// When using this processor with the OTLP Exporter, the following exporter
/// features are supported:
/// - `grpc-tonic`: This requires `TracerProvider` to be created within a tokio
///   runtime.
/// - `reqwest-blocking-client`: Works with a regular `main` or `tokio::main`.
///
/// In other words, other clients like `reqwest` and `hyper` are not supported.
///
/// # Example
///
/// This example demonstrates how to configure and use the `BatchSpanProcessor`
/// with a custom configuration. Note that a dedicated thread is used internally
/// to manage the export process.
///
/// ```rust
/// use opentelemetry::global;
/// use opentelemetry_sdk::{
///     trace::{BatchSpanProcessor, BatchConfigBuilder, TracerProvider},
///     testing::trace::NoopSpanExporter,
/// };
/// use opentelemetry::trace::Tracer as _;
/// use opentelemetry::trace::Span;
/// use std::time::Duration;
///
/// fn main() {
///     // Step 1: Create an exporter (e.g., a No-Op Exporter for demonstration).
///     let exporter = NoopSpanExporter::new();
///
///     // Step 2: Configure the BatchSpanProcessor.
///     let batch_processor = BatchSpanProcessor::builder(exporter)
///         .with_batch_config(
///             BatchConfigBuilder::default()
///                 .with_max_queue_size(1024) // Buffer up to 1024 spans.
///                 .with_max_export_batch_size(256) // Export in batches of up to 256 spans.
///                 .with_scheduled_delay(Duration::from_secs(5)) // Export every 5 seconds.
///                 .build(),
///         )
///         .build();
///
///     // Step 3: Set up a TracerProvider with the configured processor.
///     let provider = TracerProvider::builder()
///         .with_span_processor(batch_processor)
///         .build();
///     global::set_tracer_provider(provider.clone());
///
///     // Step 4: Create spans and record operations.
///     let tracer = global::tracer("example-tracer");
///     let mut span = tracer.start("example-span");
///     span.end(); // Mark the span as completed.
///
///     // Step 5: Ensure all spans are flushed before exiting.
///     provider.shutdown();
/// }
/// ```
use std::sync::mpsc::sync_channel;
use std::sync::mpsc::Receiver;
use std::sync::mpsc::RecvTimeoutError;
use std::sync::mpsc::SyncSender;

/// Messages exchanged between the main thread and the background thread.
#[allow(clippy::large_enum_variant)]
#[derive(Debug)]
enum BatchMessage {
    //ExportSpan(SpanData),
    ExportSpan(Arc<AtomicBool>),
    ForceFlush(SyncSender<OTelSdkResult>),
    Shutdown(SyncSender<OTelSdkResult>),
    SetResource(Arc<Resource>),
}

/// The `BatchSpanProcessor` collects finished spans in a buffer and exports them
/// in batches to the configured `SpanExporter`. This processor is ideal for
/// high-throughput environments, as it minimizes the overhead of exporting spans
/// individually. It uses a **dedicated background thread** to manage and export spans
/// asynchronously, ensuring that the application's main execution flow is not blocked.
///
/// This processor supports the following configurations:
/// - **Queue size**: Maximum number of spans that can be buffered.
/// - **Batch size**: Maximum number of spans to include in a single export.
/// - **Scheduled delay**: Frequency at which the batch is exported.
///
/// When using this processor with the OTLP Exporter, the following exporter
/// features are supported:
/// - `grpc-tonic`: Requires `TracerProvider` to be created within a tokio runtime.
/// - `reqwest-blocking-client`: Works with a regular `main` or `tokio::main`.
///
/// In other words, other clients like `reqwest` and `hyper` are not supported.
///
/// `BatchSpanProcessor` buffers spans in memory and exports them in batches. An
/// export is triggered when `max_export_batch_size` is reached or every
/// `scheduled_delay` milliseconds. Users can explicitly trigger an export using
/// the `force_flush` method. Shutdown also triggers an export of all buffered
/// spans and is recommended to be called before the application exits to ensure
/// all buffered spans are exported.
///
/// **Warning**: When using tokio's current-thread runtime, `shutdown()`, which
/// is a blocking call, should not be called from your main thread. This can
/// cause a deadlock. Instead, call `shutdown()` from a separate thread or use
/// tokio's `spawn_blocking`.
///
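/// A sketch of that pattern (assuming a tokio current-thread runtime and the
/// `testing` feature for the no-op exporter):
///
/// ```rust,no_run
/// # use opentelemetry_sdk::testing::trace::NoopSpanExporter;
/// # use opentelemetry_sdk::trace::{BatchSpanProcessor, TracerProvider};
/// # #[tokio::main(flavor = "current_thread")]
/// # async fn main() {
/// let provider = TracerProvider::builder()
///     .with_span_processor(BatchSpanProcessor::builder(NoopSpanExporter::new()).build())
///     .build();
/// // ... application work ...
/// // Run the blocking shutdown off the async thread to avoid a deadlock.
/// let handle = tokio::task::spawn_blocking(move || provider.shutdown());
/// let _ = handle.await;
/// # }
/// ```
///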
/// [`shutdown()`]: crate::trace::TracerProvider::shutdown
/// [`force_flush()`]: crate::trace::TracerProvider::force_flush
#[derive(Debug)]
pub struct BatchSpanProcessor {
    span_sender: SyncSender<SpanData>, // Data channel to store spans
    message_sender: SyncSender<BatchMessage>, // Control channel to store control messages.
    handle: Mutex<Option<thread::JoinHandle<()>>>,
    forceflush_timeout: Duration,
    shutdown_timeout: Duration,
    is_shutdown: AtomicBool,
    dropped_span_count: Arc<AtomicUsize>,
    export_span_message_sent: Arc<AtomicBool>,
    current_batch_size: Arc<AtomicUsize>,
    max_export_batch_size: usize,
    max_queue_size: usize,
}

impl BatchSpanProcessor {
    /// Creates a new instance of `BatchSpanProcessor`.
    pub fn new<E>(
        mut exporter: E,
        config: BatchConfig,
        //max_queue_size: usize,
        //scheduled_delay: Duration,
        //shutdown_timeout: Duration,
    ) -> Self
    where
        E: SpanExporter + Send + 'static,
    {
        let (span_sender, span_receiver) = sync_channel::<SpanData>(config.max_queue_size);
        let (message_sender, message_receiver) = sync_channel::<BatchMessage>(64); // Is this a reasonable bound?
        let max_queue_size = config.max_queue_size;
        let max_export_batch_size = config.max_export_batch_size;
        let current_batch_size = Arc::new(AtomicUsize::new(0));
        let current_batch_size_for_thread = current_batch_size.clone();

        let handle = thread::Builder::new()
            .name("OpenTelemetry.Traces.BatchProcessor".to_string())
            .spawn(move || {
                otel_info!(
                    name: "BatchSpanProcessor.ThreadStarted",
                    interval_in_millisecs = config.scheduled_delay.as_millis(),
                    max_export_batch_size = config.max_export_batch_size,
                    max_queue_size = config.max_queue_size,
                );
                let mut spans = Vec::with_capacity(config.max_export_batch_size);
                let mut last_export_time = Instant::now();
                let current_batch_size = current_batch_size_for_thread;
                loop {
                    let remaining_time_option = config
                        .scheduled_delay
                        .checked_sub(last_export_time.elapsed());
                    let remaining_time = match remaining_time_option {
                        Some(remaining_time) => remaining_time,
                        None => config.scheduled_delay,
                    };
                    match message_receiver.recv_timeout(remaining_time) {
                        Ok(message) => match message {
                            BatchMessage::ExportSpan(export_span_message_sent) => {
                                // Reset the export span message sent flag now that it has been processed.
                                export_span_message_sent.store(false, Ordering::Relaxed);
                                otel_debug!(
                                    name: "BatchSpanProcessor.ExportingDueToBatchSize",
                                );
                                let _ = Self::get_spans_and_export(
                                    &span_receiver,
                                    &mut exporter,
                                    &mut spans,
                                    &mut last_export_time,
                                    &current_batch_size,
                                    &config,
                                );
                            }
                            BatchMessage::ForceFlush(sender) => {
                                otel_debug!(name: "BatchSpanProcessor.ExportingDueToForceFlush");
                                let result = Self::get_spans_and_export(
                                    &span_receiver,
                                    &mut exporter,
                                    &mut spans,
                                    &mut last_export_time,
                                    &current_batch_size,
                                    &config,
                                );
                                let _ = sender.send(result);
                            }
                            BatchMessage::Shutdown(sender) => {
                                otel_debug!(name: "BatchSpanProcessor.ExportingDueToShutdown");
                                let result = Self::get_spans_and_export(
                                    &span_receiver,
                                    &mut exporter,
                                    &mut spans,
                                    &mut last_export_time,
                                    &current_batch_size,
                                    &config,
                                );
                                let _ = sender.send(result);

                                otel_debug!(
                                    name: "BatchSpanProcessor.ThreadExiting",
                                    reason = "ShutdownRequested"
                                );
                                // Break out of the loop and return from the
                                // current background thread.
                                break;
                            }
                            BatchMessage::SetResource(resource) => {
                                exporter.set_resource(&resource);
                            }
                        },
                        Err(RecvTimeoutError::Timeout) => {
                            otel_debug!(
                                name: "BatchSpanProcessor.ExportingDueToTimer",
                            );

                            let _ = Self::get_spans_and_export(
                                &span_receiver,
                                &mut exporter,
                                &mut spans,
                                &mut last_export_time,
                                &current_batch_size,
                                &config,
                            );
                        }
                        Err(RecvTimeoutError::Disconnected) => {
                            // Channel disconnected; the only thing to do is
                            // break out (i.e., exit the thread).
                            otel_debug!(
                                name: "BatchSpanProcessor.ThreadExiting",
                                reason = "MessageSenderDisconnected"
                            );
                            break;
                        }
                    }
                }
                otel_info!(
                    name: "BatchSpanProcessor.ThreadStopped"
                );
            })
            .expect("Failed to spawn thread"); //TODO: Handle thread spawn failure

        Self {
            span_sender,
            message_sender,
            handle: Mutex::new(Some(handle)),
            forceflush_timeout: Duration::from_secs(5), // TODO: make this configurable
            shutdown_timeout: Duration::from_secs(5),   // TODO: make this configurable
            is_shutdown: AtomicBool::new(false),
            dropped_span_count: Arc::new(AtomicUsize::new(0)),
            max_queue_size,
            export_span_message_sent: Arc::new(AtomicBool::new(false)),
            current_batch_size,
            max_export_batch_size,
        }
    }

    /// Returns a builder for configuring and constructing a `BatchSpanProcessor`.
    pub fn builder<E>(exporter: E) -> BatchSpanProcessorBuilder<E>
    where
        E: SpanExporter + Send + 'static,
    {
        BatchSpanProcessorBuilder {
            exporter,
            config: BatchConfig::default(),
        }
    }

    // This method gets up to `max_export_batch_size` spans from the channel and exports them.
    // It returns the result of the export operation.
    // It expects the span vec to be empty when it's called.
    #[inline]
    fn get_spans_and_export<E>(
        spans_receiver: &Receiver<SpanData>,
        exporter: &mut E,
        spans: &mut Vec<SpanData>,
        last_export_time: &mut Instant,
        current_batch_size: &AtomicUsize,
        config: &BatchConfig,
    ) -> OTelSdkResult
    where
        E: SpanExporter + Send + Sync + 'static,
    {
        // Get up to `max_export_batch_size` spans from the channel and push them to the span vec.
        while let Ok(span) = spans_receiver.try_recv() {
            spans.push(span);
            if spans.len() == config.max_export_batch_size {
                break;
            }
        }

        let count_of_spans = spans.len(); // Count of spans that will be exported
        let result = Self::export_batch_sync(exporter, spans, last_export_time); // This method clears the spans vec after exporting

        current_batch_size.fetch_sub(count_of_spans, Ordering::Relaxed);
        result
    }

    #[allow(clippy::vec_box)]
    fn export_batch_sync<E>(
        exporter: &mut E,
        batch: &mut Vec<SpanData>,
        last_export_time: &mut Instant,
    ) -> OTelSdkResult
    where
        E: SpanExporter + Send + Sync + 'static,
    {
        *last_export_time = Instant::now();

        if batch.is_empty() {
            return OTelSdkResult::Ok(());
        }

        let export = exporter.export(batch.split_off(0));
        let export_result = futures_executor::block_on(export);

        match export_result {
            Ok(_) => OTelSdkResult::Ok(()),
            Err(err) => {
                otel_error!(
                    name: "BatchSpanProcessor.ExportError",
                    error = format!("{}", err)
                );
                Err(OTelSdkError::InternalFailure(err.to_string()))
            }
        }
    }
}

impl SpanProcessor for BatchSpanProcessor {
    /// Handles span start.
    fn on_start(&self, _span: &mut Span, _cx: &Context) {
        // Ignored
    }

    /// Handles span end.
    fn on_end(&self, span: SpanData) {
        if self.is_shutdown.load(Ordering::Relaxed) {
            // This is a warning, as the user is trying to emit after the processor has been shut down.
            otel_warn!(
                name: "BatchSpanProcessor.Emit.ProcessorShutdown",
                message = "BatchSpanProcessor has been shutdown. No further spans will be emitted."
            );
            return;
        }
        let result = self.span_sender.try_send(span);

        if result.is_err() {
            // Increment dropped span count. The first time we have to drop a span,
            // emit a warning.
            if self.dropped_span_count.fetch_add(1, Ordering::Relaxed) == 0 {
                otel_warn!(name: "BatchSpanProcessor.SpanDroppingStarted",
                    message = "BatchSpanProcessor dropped a Span due to queue full/internal errors. No further internal log will be emitted for further drops until Shutdown. During Shutdown time, a log will be emitted with exact count of total Spans dropped.");
            }
            // The span was dropped, so skip the batch-size accounting below.
            return;
        }
        // At this point, sending the span to the data channel was successful.
        // Increment the current batch size and check if it has reached the max export batch size.
        if self.current_batch_size.fetch_add(1, Ordering::Relaxed) + 1 >= self.max_export_batch_size
        {
            // Check if a control message for exporting spans has already been sent to the worker thread.
            // If not, send a control message to export spans.
            // `export_span_message_sent` is set to false ONLY when the worker thread has processed the control message.

            if !self.export_span_message_sent.load(Ordering::Relaxed) {
                // This is a cost-efficient check as atomic load operations do not require exclusive access to cache line.
                // Perform atomic swap to `export_span_message_sent` ONLY when the atomic load operation above returns false.
                // Atomic swap/compare_exchange operations require exclusive access to cache line on most processor architectures.
                // We could have used compare_exchange as well here, but it's more verbose than swap.
                if !self.export_span_message_sent.swap(true, Ordering::Relaxed) {
                    match self.message_sender.try_send(BatchMessage::ExportSpan(
                        self.export_span_message_sent.clone(),
                    )) {
                        Ok(_) => {
                            // Control message sent successfully.
                        }
                        Err(_err) => {
                            // TODO: Log error
                            // If the control message could not be sent, reset the `export_span_message_sent` flag.
                            self.export_span_message_sent
                                .store(false, Ordering::Relaxed);
                        }
                    }
                }
            }
        }
    }

    /// Flushes all pending spans.
    fn force_flush(&self) -> OTelSdkResult {
        if self.is_shutdown.load(Ordering::Relaxed) {
            return Err(OTelSdkError::AlreadyShutdown);
        }
        let (sender, receiver) = sync_channel(1);
        self.message_sender
            .try_send(BatchMessage::ForceFlush(sender))
            .map_err(|e| OTelSdkError::InternalFailure(e.to_string()))?;

        receiver
            .recv_timeout(self.forceflush_timeout)
            .map_err(|_| OTelSdkError::Timeout(self.forceflush_timeout))?
    }

    /// Shuts down the processor.
    fn shutdown(&self) -> OTelSdkResult {
        if self.is_shutdown.swap(true, Ordering::Relaxed) {
            return Err(OTelSdkError::AlreadyShutdown);
        }
        let dropped_spans = self.dropped_span_count.load(Ordering::Relaxed);
        let max_queue_size = self.max_queue_size;
        if dropped_spans > 0 {
            otel_warn!(
                name: "BatchSpanProcessor.SpansDropped",
                dropped_span_count = dropped_spans,
                max_queue_size = max_queue_size,
                message = "Spans were dropped due to the queue being full or other errors. The count represents the total count of spans dropped in the lifetime of this BatchSpanProcessor. Consider increasing the queue size and/or decreasing the delay between exports."
            );
        }

        let (sender, receiver) = sync_channel(1);
        self.message_sender
            .try_send(BatchMessage::Shutdown(sender))
            .map_err(|e| OTelSdkError::InternalFailure(e.to_string()))?;

        let result = receiver
            .recv_timeout(self.shutdown_timeout)
            .map_err(|_| OTelSdkError::Timeout(self.shutdown_timeout))?;
        if let Some(handle) = self.handle.lock().unwrap().take() {
            if let Err(err) = handle.join() {
                return Err(OTelSdkError::InternalFailure(format!(
                    "Background thread failed to join during shutdown. This may indicate a panic or unexpected termination: {:?}",
                    err
                )));
            }
        }
        result
    }

    /// Set the resource for the processor.
    fn set_resource(&mut self, resource: &Resource) {
        let resource = Arc::new(resource.clone());
        let _ = self
            .message_sender
            .try_send(BatchMessage::SetResource(resource));
    }
}

/// Builder for [`BatchSpanProcessor`].
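///
/// A short sketch of the builder flow (the no-op exporter from the `testing`
/// feature stands in for a real exporter):
///
/// ```rust
/// use opentelemetry_sdk::testing::trace::NoopSpanExporter;
/// use opentelemetry_sdk::trace::{BatchConfigBuilder, BatchSpanProcessor, SpanProcessor};
///
/// let processor = BatchSpanProcessor::builder(NoopSpanExporter::new())
///     .with_batch_config(BatchConfigBuilder::default().build())
///     .build();
/// // Shut down the processor to stop its background thread.
/// let _ = processor.shutdown();
/// ```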
#[derive(Debug, Default)]
pub struct BatchSpanProcessorBuilder<E>
where
    E: SpanExporter + Send + 'static,
{
    exporter: E,
    config: BatchConfig,
}

impl<E> BatchSpanProcessorBuilder<E>
where
    E: SpanExporter + Send + 'static,
{
    /// Set the BatchConfig for [BatchSpanProcessorBuilder]
    pub fn with_batch_config(self, config: BatchConfig) -> Self {
        BatchSpanProcessorBuilder { config, ..self }
    }

    /// Build a new instance of `BatchSpanProcessor`.
    pub fn build(self) -> BatchSpanProcessor {
        BatchSpanProcessor::new(self.exporter, self.config)
    }
}

/// Batch span processor configuration.
/// Use [`BatchConfigBuilder`] to configure your own instance of [`BatchConfig`].
#[derive(Debug)]
pub struct BatchConfig {
    /// The maximum queue size to buffer spans for delayed processing. If the
    /// queue gets full, it drops the spans. The default value is 2048.
    pub(crate) max_queue_size: usize,

    /// The delay interval between two consecutive processing of batches. The
    /// default value is 5 seconds.
    pub(crate) scheduled_delay: Duration,

    #[allow(dead_code)]
    /// The maximum number of spans to process in a single batch. If there is
    /// more than one batch's worth of spans then it processes multiple batches
    /// of spans one batch after the other without any delay. The default value
    /// is 512.
    pub(crate) max_export_batch_size: usize,

    #[allow(dead_code)]
    /// The maximum duration to export a batch of data.
    pub(crate) max_export_timeout: Duration,

    #[allow(dead_code)]
    /// Maximum number of concurrent exports
    ///
    /// Limits the number of spawned tasks for exports and thus memory consumed
    /// by an exporter. A value of 1 will cause exports to be performed
    /// synchronously on the BatchSpanProcessor task.
    pub(crate) max_concurrent_exports: usize,
}

impl Default for BatchConfig {
    fn default() -> Self {
        BatchConfigBuilder::default().build()
    }
}

/// A builder for creating [`BatchConfig`] instances.
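///
/// For example, a configuration with illustrative values can be built as
/// follows:
///
/// ```rust
/// use opentelemetry_sdk::trace::BatchConfigBuilder;
/// use std::time::Duration;
///
/// let config = BatchConfigBuilder::default()
///     .with_max_queue_size(4096)
///     .with_max_export_batch_size(512)
///     .with_scheduled_delay(Duration::from_secs(2))
///     .build();
/// ```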
#[derive(Debug)]
pub struct BatchConfigBuilder {
    max_queue_size: usize,
    scheduled_delay: Duration,
    max_export_batch_size: usize,
    max_export_timeout: Duration,
    max_concurrent_exports: usize,
}

impl Default for BatchConfigBuilder {
    /// Create a new [`BatchConfigBuilder`] initialized with default batch config values as per the specs.
    /// The values are overridden by environment variables if set.
    /// The supported environment variables are:
    /// * `OTEL_BSP_MAX_QUEUE_SIZE`
    /// * `OTEL_BSP_SCHEDULE_DELAY`
    /// * `OTEL_BSP_MAX_EXPORT_BATCH_SIZE`
    /// * `OTEL_BSP_EXPORT_TIMEOUT`
    /// * `OTEL_BSP_MAX_CONCURRENT_EXPORTS`
    fn default() -> Self {
        BatchConfigBuilder {
            max_queue_size: OTEL_BSP_MAX_QUEUE_SIZE_DEFAULT,
            scheduled_delay: Duration::from_millis(OTEL_BSP_SCHEDULE_DELAY_DEFAULT),
            max_export_batch_size: OTEL_BSP_MAX_EXPORT_BATCH_SIZE_DEFAULT,
            max_export_timeout: Duration::from_millis(OTEL_BSP_EXPORT_TIMEOUT_DEFAULT),
            max_concurrent_exports: OTEL_BSP_MAX_CONCURRENT_EXPORTS_DEFAULT,
        }
        .init_from_env_vars()
    }
}

impl BatchConfigBuilder {
    /// Set max_queue_size for [`BatchConfigBuilder`].
    /// It's the maximum queue size to buffer spans for delayed processing.
    /// If the queue gets full, it will drop the spans.
    /// The default value is 2048.
    pub fn with_max_queue_size(mut self, max_queue_size: usize) -> Self {
        self.max_queue_size = max_queue_size;
        self
    }

    /// Set max_export_batch_size for [`BatchConfigBuilder`].
    /// It's the maximum number of spans to process in a single batch. If there is
    /// more than one batch's worth of spans then it processes multiple batches
    /// of spans one batch after the other without any delay. The default value
    /// is 512.
    pub fn with_max_export_batch_size(mut self, max_export_batch_size: usize) -> Self {
        self.max_export_batch_size = max_export_batch_size;
        self
    }

    #[cfg(feature = "experimental_trace_batch_span_processor_with_async_runtime")]
    /// Set max_concurrent_exports for [`BatchConfigBuilder`].
    /// It's the maximum number of concurrent exports.
    /// Limits the number of spawned tasks for exports and thus memory consumed by an exporter.
    /// The default value is 1.
    /// With the default value of 1, exports are performed synchronously on the
    /// BatchSpanProcessor task.
    pub fn with_max_concurrent_exports(mut self, max_concurrent_exports: usize) -> Self {
        self.max_concurrent_exports = max_concurrent_exports;
        self
    }

    /// Set scheduled_delay for [`BatchConfigBuilder`].
    /// It's the delay interval between two consecutive processing of batches.
    /// The default value is 5000 milliseconds.
    pub fn with_scheduled_delay(mut self, scheduled_delay: Duration) -> Self {
        self.scheduled_delay = scheduled_delay;
        self
    }

    /// Set max_export_timeout for [`BatchConfigBuilder`].
    /// It's the maximum duration to export a batch of data.
    /// The default value is 30000 milliseconds.
    #[cfg(feature = "experimental_trace_batch_span_processor_with_async_runtime")]
    pub fn with_max_export_timeout(mut self, max_export_timeout: Duration) -> Self {
        self.max_export_timeout = max_export_timeout;
        self
    }

    /// Builds a `BatchConfig` enforcing the following invariants:
    /// * `max_export_batch_size` must be less than or equal to `max_queue_size`.
    pub fn build(self) -> BatchConfig {
        // max export batch size must be less or equal to max queue size.
        // we set max export batch size to max queue size if it's larger than max queue size.
        let max_export_batch_size = min(self.max_export_batch_size, self.max_queue_size);

        BatchConfig {
            max_queue_size: self.max_queue_size,
            scheduled_delay: self.scheduled_delay,
            max_export_timeout: self.max_export_timeout,
            max_concurrent_exports: self.max_concurrent_exports,
            max_export_batch_size,
        }
    }

    fn init_from_env_vars(mut self) -> Self {
        if let Some(max_concurrent_exports) = env::var(OTEL_BSP_MAX_CONCURRENT_EXPORTS)
            .ok()
            .and_then(|max_concurrent_exports| usize::from_str(&max_concurrent_exports).ok())
        {
            self.max_concurrent_exports = max_concurrent_exports;
        }

        if let Some(max_queue_size) = env::var(OTEL_BSP_MAX_QUEUE_SIZE)
            .ok()
            .and_then(|queue_size| usize::from_str(&queue_size).ok())
        {
            self.max_queue_size = max_queue_size;
        }

        if let Some(scheduled_delay) = env::var(OTEL_BSP_SCHEDULE_DELAY)
            .ok()
            .and_then(|delay| u64::from_str(&delay).ok())
        {
            self.scheduled_delay = Duration::from_millis(scheduled_delay);
        }

        if let Some(max_export_batch_size) = env::var(OTEL_BSP_MAX_EXPORT_BATCH_SIZE)
            .ok()
            .and_then(|batch_size| usize::from_str(&batch_size).ok())
        {
            self.max_export_batch_size = max_export_batch_size;
        }

        // max export batch size must be less or equal to max queue size.
        // we set max export batch size to max queue size if it's larger than max queue size.
        if self.max_export_batch_size > self.max_queue_size {
            self.max_export_batch_size = self.max_queue_size;
        }

        if let Some(max_export_timeout) = env::var(OTEL_BSP_EXPORT_TIMEOUT)
            .ok()
            .and_then(|timeout| u64::from_str(&timeout).ok())
        {
            self.max_export_timeout = Duration::from_millis(max_export_timeout);
        }

        self
    }
}

#[cfg(all(test, feature = "testing", feature = "trace"))]
mod tests {
    // cargo test trace::span_processor::tests:: --features=testing
    use super::{
        BatchSpanProcessor, SimpleSpanProcessor, SpanProcessor, OTEL_BSP_EXPORT_TIMEOUT,
        OTEL_BSP_MAX_EXPORT_BATCH_SIZE, OTEL_BSP_MAX_QUEUE_SIZE, OTEL_BSP_MAX_QUEUE_SIZE_DEFAULT,
        OTEL_BSP_SCHEDULE_DELAY, OTEL_BSP_SCHEDULE_DELAY_DEFAULT,
    };
    use crate::error::OTelSdkResult;
    use crate::testing::trace::new_test_export_span_data;
    use crate::trace::span_processor::{
        OTEL_BSP_EXPORT_TIMEOUT_DEFAULT, OTEL_BSP_MAX_CONCURRENT_EXPORTS,
        OTEL_BSP_MAX_CONCURRENT_EXPORTS_DEFAULT, OTEL_BSP_MAX_EXPORT_BATCH_SIZE_DEFAULT,
    };
    use crate::trace::InMemorySpanExporterBuilder;
    use crate::trace::{BatchConfig, BatchConfigBuilder, SpanEvents, SpanLinks};
    use crate::trace::{SpanData, SpanExporter};
    use opentelemetry::trace::{SpanContext, SpanId, SpanKind, Status};
    use std::fmt::Debug;
    use std::time::Duration;

    #[test]
    fn simple_span_processor_on_end_calls_export() {
        let exporter = InMemorySpanExporterBuilder::new().build();
        let processor = SimpleSpanProcessor::new(Box::new(exporter.clone()));
        let span_data = new_test_export_span_data();
        processor.on_end(span_data.clone());
        assert_eq!(exporter.get_finished_spans().unwrap()[0], span_data);
        let _result = processor.shutdown();
    }

    #[test]
    fn simple_span_processor_on_end_skips_export_if_not_sampled() {
        let exporter = InMemorySpanExporterBuilder::new().build();
        let processor = SimpleSpanProcessor::new(Box::new(exporter.clone()));
        let unsampled = SpanData {
            span_context: SpanContext::empty_context(),
            parent_span_id: SpanId::INVALID,
            span_kind: SpanKind::Internal,
            name: "opentelemetry".into(),
            start_time: opentelemetry::time::now(),
            end_time: opentelemetry::time::now(),
            attributes: Vec::new(),
            dropped_attributes_count: 0,
            events: SpanEvents::default(),
            links: SpanLinks::default(),
            status: Status::Unset,
            instrumentation_scope: Default::default(),
        };
        processor.on_end(unsampled);
        assert!(exporter.get_finished_spans().unwrap().is_empty());
    }

    #[test]
    fn simple_span_processor_shutdown_calls_shutdown() {
        let exporter = InMemorySpanExporterBuilder::new().build();
        let processor = SimpleSpanProcessor::new(Box::new(exporter.clone()));
        let span_data = new_test_export_span_data();
        processor.on_end(span_data.clone());
        assert!(!exporter.get_finished_spans().unwrap().is_empty());
        let _result = processor.shutdown();
        // Verify shutdown was called by checking that the exporter's spans were cleared.
        assert!(exporter.get_finished_spans().unwrap().is_empty());
    }

    #[test]
    fn test_default_const_values() {
        assert_eq!(OTEL_BSP_MAX_QUEUE_SIZE, "OTEL_BSP_MAX_QUEUE_SIZE");
        assert_eq!(OTEL_BSP_MAX_QUEUE_SIZE_DEFAULT, 2048);
        assert_eq!(OTEL_BSP_SCHEDULE_DELAY, "OTEL_BSP_SCHEDULE_DELAY");
        assert_eq!(OTEL_BSP_SCHEDULE_DELAY_DEFAULT, 5000);
        assert_eq!(
            OTEL_BSP_MAX_EXPORT_BATCH_SIZE,
            "OTEL_BSP_MAX_EXPORT_BATCH_SIZE"
        );
        assert_eq!(OTEL_BSP_MAX_EXPORT_BATCH_SIZE_DEFAULT, 512);
        assert_eq!(OTEL_BSP_EXPORT_TIMEOUT, "OTEL_BSP_EXPORT_TIMEOUT");
        assert_eq!(OTEL_BSP_EXPORT_TIMEOUT_DEFAULT, 30000);
    }

    #[test]
    fn test_default_batch_config_adheres_to_specification() {
        let env_vars = vec![
            OTEL_BSP_SCHEDULE_DELAY,
            OTEL_BSP_EXPORT_TIMEOUT,
            OTEL_BSP_MAX_QUEUE_SIZE,
            OTEL_BSP_MAX_EXPORT_BATCH_SIZE,
            OTEL_BSP_MAX_CONCURRENT_EXPORTS,
        ];

        let config = temp_env::with_vars_unset(env_vars, BatchConfig::default);

        assert_eq!(
            config.max_concurrent_exports,
            OTEL_BSP_MAX_CONCURRENT_EXPORTS_DEFAULT
        );
        assert_eq!(
            config.scheduled_delay,
            Duration::from_millis(OTEL_BSP_SCHEDULE_DELAY_DEFAULT)
        );
        assert_eq!(
            config.max_export_timeout,
            Duration::from_millis(OTEL_BSP_EXPORT_TIMEOUT_DEFAULT)
        );
        assert_eq!(config.max_queue_size, OTEL_BSP_MAX_QUEUE_SIZE_DEFAULT);
        assert_eq!(
            config.max_export_batch_size,
            OTEL_BSP_MAX_EXPORT_BATCH_SIZE_DEFAULT
        );
    }

    #[test]
    fn test_batch_config_configurable_by_env_vars() {
        let env_vars = vec![
            (OTEL_BSP_SCHEDULE_DELAY, Some("2000")),
            (OTEL_BSP_EXPORT_TIMEOUT, Some("60000")),
            (OTEL_BSP_MAX_QUEUE_SIZE, Some("4096")),
            (OTEL_BSP_MAX_EXPORT_BATCH_SIZE, Some("1024")),
        ];

        let config = temp_env::with_vars(env_vars, BatchConfig::default);

        assert_eq!(config.scheduled_delay, Duration::from_millis(2000));
        assert_eq!(config.max_export_timeout, Duration::from_millis(60000));
        assert_eq!(config.max_queue_size, 4096);
        assert_eq!(config.max_export_batch_size, 1024);
    }

    #[test]
    fn test_batch_config_max_export_batch_size_validation() {
        let env_vars = vec![
            (OTEL_BSP_MAX_QUEUE_SIZE, Some("256")),
            (OTEL_BSP_MAX_EXPORT_BATCH_SIZE, Some("1024")),
        ];

        let config = temp_env::with_vars(env_vars, BatchConfig::default);

        assert_eq!(config.max_queue_size, 256);
        assert_eq!(config.max_export_batch_size, 256);
        assert_eq!(
            config.scheduled_delay,
            Duration::from_millis(OTEL_BSP_SCHEDULE_DELAY_DEFAULT)
        );
        assert_eq!(
            config.max_export_timeout,
            Duration::from_millis(OTEL_BSP_EXPORT_TIMEOUT_DEFAULT)
        );
    }

    #[test]
    fn test_batch_config_with_fields() {
        let batch = BatchConfigBuilder::default()
            .with_max_export_batch_size(10)
            .with_scheduled_delay(Duration::from_millis(10))
            .with_max_queue_size(10);
        #[cfg(feature = "experimental_trace_batch_span_processor_with_async_runtime")]
        let batch = batch.with_max_concurrent_exports(10);
        #[cfg(feature = "experimental_trace_batch_span_processor_with_async_runtime")]
        let batch = batch.with_max_export_timeout(Duration::from_millis(10));
        let batch = batch.build();
        assert_eq!(batch.max_export_batch_size, 10);
        assert_eq!(batch.scheduled_delay, Duration::from_millis(10));
        assert_eq!(batch.max_export_timeout, Duration::from_millis(10));
        assert_eq!(batch.max_concurrent_exports, 10);
        assert_eq!(batch.max_queue_size, 10);
    }

    // Helper function to create a default test span
    fn create_test_span(name: &str) -> SpanData {
        SpanData {
            span_context: SpanContext::empty_context(),
            parent_span_id: SpanId::INVALID,
            span_kind: SpanKind::Internal,
            name: name.to_string().into(),
            start_time: opentelemetry::time::now(),
            end_time: opentelemetry::time::now(),
            attributes: Vec::new(),
            dropped_attributes_count: 0,
            events: SpanEvents::default(),
            links: SpanLinks::default(),
            status: Status::Unset,
            instrumentation_scope: Default::default(),
        }
    }

    use crate::Resource;
    use futures_util::future::BoxFuture;
    use futures_util::FutureExt;
    use opentelemetry::{Key, KeyValue, Value};
    use std::sync::{atomic::Ordering, Arc, Mutex};

    // Mock exporter to test functionality
    #[derive(Debug)]
    struct MockSpanExporter {
        exported_spans: Arc<Mutex<Vec<SpanData>>>,
        exported_resource: Arc<Mutex<Option<Resource>>>,
    }

    impl MockSpanExporter {
        fn new() -> Self {
            Self {
                exported_spans: Arc::new(Mutex::new(Vec::new())),
                exported_resource: Arc::new(Mutex::new(None)),
            }
        }
    }

    impl SpanExporter for MockSpanExporter {
        fn export(&mut self, batch: Vec<SpanData>) -> BoxFuture<'static, OTelSdkResult> {
            let exported_spans = self.exported_spans.clone();
            async move {
                exported_spans.lock().unwrap().extend(batch);
                Ok(())
            }
            .boxed()
        }

        fn shutdown(&mut self) -> OTelSdkResult {
            Ok(())
        }
        fn set_resource(&mut self, resource: &Resource) {
            let mut exported_resource = self.exported_resource.lock().unwrap();
            *exported_resource = Some(resource.clone());
        }
    }

    #[test]
    fn batchspanprocessor_handles_on_end() {
        let exporter = MockSpanExporter::new();
        let exporter_shared = exporter.exported_spans.clone();
        let config = BatchConfigBuilder::default()
            .with_max_queue_size(10)
            .with_max_export_batch_size(10)
            .with_scheduled_delay(Duration::from_secs(5))
            .build();
        let processor = BatchSpanProcessor::new(exporter, config);

        let test_span = create_test_span("test_span");
        processor.on_end(test_span.clone());

        // Wait for flush interval to ensure the span is processed
        std::thread::sleep(Duration::from_secs(6));

        let exported_spans = exporter_shared.lock().unwrap();
        assert_eq!(exported_spans.len(), 1);
        assert_eq!(exported_spans[0].name, "test_span");
    }

    #[test]
    fn batchspanprocessor_force_flush() {
        let exporter = MockSpanExporter::new();
        let exporter_shared = exporter.exported_spans.clone(); // Shared access to verify exported spans
        let config = BatchConfigBuilder::default()
            .with_max_queue_size(10)
            .with_max_export_batch_size(10)
            .with_scheduled_delay(Duration::from_secs(5))
            .build();
        let processor = BatchSpanProcessor::new(exporter, config);

        // Create a test span and send it to the processor
        let test_span = create_test_span("force_flush_span");
        processor.on_end(test_span.clone());

        // Call force_flush to immediately export the spans
        let flush_result = processor.force_flush();
        assert!(flush_result.is_ok(), "Force flush failed unexpectedly");

        // Verify the exported spans in the mock exporter
        let exported_spans = exporter_shared.lock().unwrap();
        assert_eq!(
            exported_spans.len(),
            1,
            "Unexpected number of exported spans"
        );
        assert_eq!(exported_spans[0].name, "force_flush_span");
    }

    #[test]
    fn batchspanprocessor_shutdown() {
        let exporter = MockSpanExporter::new();
        let exporter_shared = exporter.exported_spans.clone(); // Shared access to verify exported spans
        let config = BatchConfigBuilder::default()
            .with_max_queue_size(10)
            .with_max_export_batch_size(10)
            .with_scheduled_delay(Duration::from_secs(5))
            .build();
        let processor = BatchSpanProcessor::new(exporter, config);

        // Create a test span and send it to the processor
        let test_span = create_test_span("shutdown_span");
        processor.on_end(test_span.clone());

        // Call shutdown to flush and export all pending spans
        let shutdown_result = processor.shutdown();
        assert!(shutdown_result.is_ok(), "Shutdown failed unexpectedly");

        // Verify the exported spans in the mock exporter
        let exported_spans = exporter_shared.lock().unwrap();
        assert_eq!(
            exported_spans.len(),
            1,
            "Unexpected number of exported spans"
        );
        assert_eq!(exported_spans[0].name, "shutdown_span");

        // A second call to shutdown should report that the processor is already shut down.
        let second_shutdown_result = processor.shutdown();
        assert!(
            second_shutdown_result.is_err(),
            "Shutdown should fail when called a second time"
        );
    }

    #[test]
    fn batchspanprocessor_handles_dropped_spans() {
        let exporter = MockSpanExporter::new();
        let exporter_shared = exporter.exported_spans.clone(); // Shared access to verify exported spans
        let config = BatchConfigBuilder::default()
            .with_max_queue_size(2) // Small queue size to test span dropping
            .with_scheduled_delay(Duration::from_secs(5))
            .build();
        let processor = BatchSpanProcessor::new(exporter, config);

        // Create test spans and send them to the processor
        let span1 = create_test_span("span1");
        let span2 = create_test_span("span2");
        let span3 = create_test_span("span3"); // This span should be dropped

        processor.on_end(span1.clone());
        processor.on_end(span2.clone());
        processor.on_end(span3.clone()); // This span exceeds the queue size

        // Give the background thread time to export; with the queue full, the
        // batch-size trigger fires well before the scheduled delay.
        std::thread::sleep(Duration::from_secs(3));

        let exported_spans = exporter_shared.lock().unwrap();

        // Verify that only the first two spans are exported
        assert_eq!(
            exported_spans.len(),
            2,
            "Unexpected number of exported spans"
        );
        assert!(exported_spans.iter().any(|s| s.name == "span1"));
        assert!(exported_spans.iter().any(|s| s.name == "span2"));

        // Ensure the third span is dropped
        assert!(
            !exported_spans.iter().any(|s| s.name == "span3"),
            "Span3 should have been dropped"
        );

        // Verify the dropped span count tracked by the processor.
        let dropped_count = processor.dropped_span_count.load(Ordering::Relaxed);
        assert_eq!(dropped_count, 1, "Unexpected number of dropped spans");
    }

    #[test]
    fn validate_span_attributes_exported_correctly() {
        let exporter = MockSpanExporter::new();
        let exporter_shared = exporter.exported_spans.clone();
        let config = BatchConfigBuilder::default().build();
        let processor = BatchSpanProcessor::new(exporter, config);

        // Create a span with attributes
        let mut span_data = create_test_span("attribute_validation");
        span_data.attributes = vec![
            KeyValue::new("key1", "value1"),
            KeyValue::new("key2", "value2"),
        ];
        processor.on_end(span_data.clone());

        // Force flush to export the span
        let _ = processor.force_flush();

        // Validate the exported attributes
        let exported_spans = exporter_shared.lock().unwrap();
        assert_eq!(exported_spans.len(), 1);
        let exported_span = &exported_spans[0];
        assert!(exported_span
            .attributes
            .contains(&KeyValue::new("key1", "value1")));
        assert!(exported_span
            .attributes
            .contains(&KeyValue::new("key2", "value2")));
    }

    #[test]
    fn batchspanprocessor_sets_and_exports_with_resource() {
        let exporter = MockSpanExporter::new();
        let exporter_shared = exporter.exported_spans.clone();
        let resource_shared = exporter.exported_resource.clone();
        let config = BatchConfigBuilder::default().build();
        let mut processor = BatchSpanProcessor::new(exporter, config);

        // Set a resource for the processor
        let resource = Resource::new(vec![KeyValue::new("service.name", "test_service")]);
        processor.set_resource(&resource);

        // Create a span and send it to the processor
        let test_span = create_test_span("resource_test");
        processor.on_end(test_span.clone());

        // Force flush to ensure the span is exported
        let _ = processor.force_flush();

        // Validate spans are exported
        let exported_spans = exporter_shared.lock().unwrap();
        assert_eq!(exported_spans.len(), 1);

        // Validate the resource is correctly set in the exporter
        let exported_resource = resource_shared.lock().unwrap();
        assert!(exported_resource.is_some());
        assert_eq!(
            exported_resource
                .as_ref()
                .unwrap()
                .get(&Key::new("service.name")),
            Some(Value::from("test_service"))
        );
    }

    #[tokio::test(flavor = "current_thread")]
    async fn test_batch_processor_current_thread_runtime() {
        let exporter = MockSpanExporter::new();
        let exporter_shared = exporter.exported_spans.clone();

        let config = BatchConfigBuilder::default()
            .with_max_queue_size(5)
            .with_max_export_batch_size(3)
            .with_scheduled_delay(Duration::from_millis(50))
            .build();

        let processor = BatchSpanProcessor::new(exporter, config);

        for _ in 0..4 {
            let span = new_test_export_span_data();
            processor.on_end(span);
        }

        tokio::time::sleep(Duration::from_millis(200)).await;

        let exported_spans = exporter_shared.lock().unwrap();
        assert_eq!(exported_spans.len(), 4);
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
    async fn test_batch_processor_multi_thread_count_1_runtime() {
        let exporter = MockSpanExporter::new();
        let exporter_shared = exporter.exported_spans.clone();

        let config = BatchConfigBuilder::default()
            .with_max_queue_size(5)
            .with_max_export_batch_size(3)
            .with_scheduled_delay(Duration::from_millis(50))
            .build();

        let processor = BatchSpanProcessor::new(exporter, config);

        for _ in 0..4 {
            let span = new_test_export_span_data();
            processor.on_end(span);
        }

        tokio::time::sleep(Duration::from_millis(200)).await;

        let exported_spans = exporter_shared.lock().unwrap();
        assert_eq!(exported_spans.len(), 4);
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
    async fn test_batch_processor_multi_thread() {
        let exporter = MockSpanExporter::new();
        let exporter_shared = exporter.exported_spans.clone();

        let config = BatchConfigBuilder::default()
            .with_max_queue_size(20)
            .with_max_export_batch_size(5)
            .with_scheduled_delay(Duration::from_millis(50))
            .build();

        // Create the processor with the thread-safe exporter
        let processor = Arc::new(BatchSpanProcessor::new(exporter, config));

        let mut handles = vec![];
        for _ in 0..10 {
            let processor_clone = Arc::clone(&processor);
            let handle = tokio::spawn(async move {
                let span = new_test_export_span_data();
                processor_clone.on_end(span);
            });
            handles.push(handle);
        }

        for handle in handles {
            handle.await.unwrap();
        }

        // Allow time for batching and export
        tokio::time::sleep(Duration::from_millis(200)).await;

        // Verify exported spans
        let exported_spans = exporter_shared.lock().unwrap();
        assert_eq!(exported_spans.len(), 10);
    }
}