opentelemetry_sdk/logs/
log_processor.rs

1//! # OpenTelemetry Log Processor Interface
2//!
3//! The `LogProcessor` interface provides hooks for log record processing and
4//! exporting. Log processors receive `LogRecord`s emitted by the SDK's
5//! `Logger` and determine how these records are handled.
6//!
7//! Built-in log processors are responsible for converting logs to exportable
8//! representations and passing them to configured exporters. They can be
9//! registered directly with a `LoggerProvider`.
10//!
11//! ## Types of Log Processors
12//!
13//! - **SimpleLogProcessor**: Forwards log records to the exporter immediately
14//!   after they are emitted. This processor is **synchronous** and is designed
15//!   for debugging or testing purposes. It is **not suitable for production**
16//!   environments due to its lack of batching, performance optimizations, or support
17//!   for high-throughput scenarios.
18//!
19//! - **BatchLogProcessor**: Buffers log records and sends them to the exporter
20//!   in batches. This processor is designed for **production use** in high-throughput
21//!   applications and reduces the overhead of frequent exports by using a background
22//!   thread for batch processing.
23//!
24//! ## Diagram
25//!
26//! ```ascii
27//!   +-----+---------------+   +-----------------------+   +-------------------+
28//!   |     |               |   |                       |   |                   |
29//!   | SDK | Logger.emit() +---> (Simple)LogProcessor  +--->  LogExporter      |
30//!   |     |               |   | (Batch)LogProcessor   +--->  (OTLPExporter)   |
31//!   +-----+---------------+   +-----------------------+   +-------------------+
32//! ```
33
34use crate::error::{OTelSdkError, OTelSdkResult};
35use crate::{
36    logs::{LogBatch, LogExporter, SdkLogRecord},
37    Resource,
38};
39use std::sync::mpsc::{self, RecvTimeoutError, SyncSender};
40
41#[cfg(feature = "spec_unstable_logs_enabled")]
42use opentelemetry::logs::Severity;
43use opentelemetry::{otel_debug, otel_error, otel_info, otel_warn, InstrumentationScope};
44
45use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
46use std::{cmp::min, env, sync::Mutex};
47use std::{
48    fmt::{self, Debug, Formatter},
49    str::FromStr,
50    sync::Arc,
51    thread,
52    time::Duration,
53    time::Instant,
54};
55
/// Name of the environment variable holding the delay interval between two
/// consecutive exports.
pub(crate) const OTEL_BLRP_SCHEDULE_DELAY: &str = "OTEL_BLRP_SCHEDULE_DELAY";
/// Default delay interval between two consecutive exports.
// NOTE(review): presumably milliseconds (the `BatchLogProcessor` docs describe
// `scheduled_delay` in milliseconds) - confirm where this value is consumed.
pub(crate) const OTEL_BLRP_SCHEDULE_DELAY_DEFAULT: u64 = 1_000;
/// Name of the environment variable holding the maximum allowed time to export data.
#[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")]
pub(crate) const OTEL_BLRP_EXPORT_TIMEOUT: &str = "OTEL_BLRP_EXPORT_TIMEOUT";
/// Default maximum allowed time to export data.
// NOTE(review): unit is presumably milliseconds as well - confirm.
#[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")]
pub(crate) const OTEL_BLRP_EXPORT_TIMEOUT_DEFAULT: u64 = 30_000;
/// Name of the environment variable holding the maximum queue size.
pub(crate) const OTEL_BLRP_MAX_QUEUE_SIZE: &str = "OTEL_BLRP_MAX_QUEUE_SIZE";
/// Default maximum queue size (number of log records).
pub(crate) const OTEL_BLRP_MAX_QUEUE_SIZE_DEFAULT: usize = 2_048;
/// Name of the environment variable holding the maximum batch size, which must
/// be less than or equal to OTEL_BLRP_MAX_QUEUE_SIZE.
pub(crate) const OTEL_BLRP_MAX_EXPORT_BATCH_SIZE: &str = "OTEL_BLRP_MAX_EXPORT_BATCH_SIZE";
/// Default maximum batch size (number of log records per export).
pub(crate) const OTEL_BLRP_MAX_EXPORT_BATCH_SIZE_DEFAULT: usize = 512;
74
/// The interface for plugging into a [`SdkLogger`].
///
/// [`SdkLogger`]: crate::logs::SdkLogger
pub trait LogProcessor: Send + Sync + Debug {
    /// Called when a log record is ready to be processed and exported.
    ///
    /// This method receives a mutable reference to the `SdkLogRecord`. If the
    /// processor needs to handle the export asynchronously, it should clone the
    /// data to ensure it can be safely processed without lifetime issues. Any
    /// changes made to the log data in this method will be reflected in the
    /// next log processor in the chain.
    ///
    /// # Parameters
    /// - `data`: A mutable reference to the `SdkLogRecord` representing the log record.
    /// - `instrumentation`: The instrumentation scope associated with the log record.
    fn emit(&self, data: &mut SdkLogRecord, instrumentation: &InstrumentationScope);
    /// Force the logs lying in the cache to be exported.
    ///
    /// Returns the outcome of the flush as an `OTelSdkResult`.
    fn force_flush(&self) -> OTelSdkResult;
    /// Shuts down the processor.
    /// After shutdown returns the log processor should stop processing any logs.
    /// It's up to the implementation on when to drop the LogProcessor.
    fn shutdown(&self) -> OTelSdkResult;
    #[cfg(feature = "spec_unstable_logs_enabled")]
    /// Check if logging is enabled for the given severity level, target and
    /// event name.
    ///
    /// The default implementation ignores all three arguments and reports
    /// every event as enabled.
    fn event_enabled(&self, _level: Severity, _target: &str, _name: &str) -> bool {
        // By default, all logs are enabled
        true
    }

    /// Set the resource for the log processor.
    ///
    /// The default implementation does nothing.
    fn set_resource(&self, _resource: &Resource) {}
}
107
/// A [`LogProcessor`] designed for testing and debugging purpose, that immediately
/// exports log records as they are emitted. Log records are exported synchronously
/// in the same thread that emits the log record.
/// When using this processor with the OTLP Exporter, the following exporter
/// features are supported:
/// - `grpc-tonic`: This requires LoggerProvider to be created within a tokio
///   runtime. Logs can be emitted from any thread, including tokio runtime
///   threads.
/// - `reqwest-blocking-client`: LoggerProvider may be created anywhere, but
///   logs must be emitted from a non-tokio runtime thread.
/// - `reqwest-client`: LoggerProvider may be created anywhere, but logs must be
///   emitted from a tokio runtime thread.
///
/// ## Example
///
/// ### Using a SimpleLogProcessor
///
/// ```rust
/// use opentelemetry_sdk::logs::{SimpleLogProcessor, LoggerProvider, LogExporter};
/// use opentelemetry::global;
/// use opentelemetry_sdk::logs::InMemoryLogExporter;
///
/// let exporter = InMemoryLogExporter::default(); // Replace with an actual exporter
/// let provider = LoggerProvider::builder()
///     .with_simple_exporter(exporter)
///     .build();
///
/// ```
#[derive(Debug)]
pub struct SimpleLogProcessor<T: LogExporter> {
    // The exporter, guarded by a mutex; `emit`, `shutdown` and `set_resource`
    // each lock it before touching the exporter.
    exporter: Mutex<T>,
    // Set to `true` by `shutdown`; `emit` checks it and becomes a no-op afterwards.
    is_shutdown: AtomicBool,
}
141
142impl<T: LogExporter> SimpleLogProcessor<T> {
143    pub(crate) fn new(exporter: T) -> Self {
144        SimpleLogProcessor {
145            exporter: Mutex::new(exporter),
146            is_shutdown: AtomicBool::new(false),
147        }
148    }
149}
150
151impl<T: LogExporter> LogProcessor for SimpleLogProcessor<T> {
152    fn emit(&self, record: &mut SdkLogRecord, instrumentation: &InstrumentationScope) {
153        // noop after shutdown
154        if self.is_shutdown.load(std::sync::atomic::Ordering::Relaxed) {
155            // this is a warning, as the user is trying to log after the processor has been shutdown
156            otel_warn!(
157                name: "SimpleLogProcessor.Emit.ProcessorShutdown",
158            );
159            return;
160        }
161
162        let result = self
163            .exporter
164            .lock()
165            .map_err(|_| OTelSdkError::InternalFailure("SimpleLogProcessor mutex poison".into()))
166            .and_then(|exporter| {
167                let log_tuple = &[(record as &SdkLogRecord, instrumentation)];
168                futures_executor::block_on(exporter.export(LogBatch::new(log_tuple)))
169            });
170        // Handle errors with specific static names
171        match result {
172            Err(OTelSdkError::InternalFailure(_)) => {
173                // logging as debug as this is not a user error
174                otel_debug!(
175                    name: "SimpleLogProcessor.Emit.MutexPoisoning",
176                );
177            }
178            Err(err) => {
179                otel_error!(
180                    name: "SimpleLogProcessor.Emit.ExportError",
181                    error = format!("{}",err)
182                );
183            }
184            _ => {}
185        }
186    }
187
188    fn force_flush(&self) -> OTelSdkResult {
189        Ok(())
190    }
191
192    fn shutdown(&self) -> OTelSdkResult {
193        self.is_shutdown
194            .store(true, std::sync::atomic::Ordering::Relaxed);
195        if let Ok(mut exporter) = self.exporter.lock() {
196            exporter.shutdown()
197        } else {
198            Err(OTelSdkError::InternalFailure(
199                "SimpleLogProcessor mutex poison at shutdown".into(),
200            ))
201        }
202    }
203
204    fn set_resource(&self, resource: &Resource) {
205        if let Ok(mut exporter) = self.exporter.lock() {
206            exporter.set_resource(resource);
207        }
208    }
209}
210
/// Messages sent between application thread and batch log processor's work thread.
// NOTE(review): every variant visible here only carries an `Arc` or a
// `SyncSender`, so the `large_enum_variant` suppression looks like a leftover -
// confirm before removing it.
#[allow(clippy::large_enum_variant)]
#[derive(Debug)]
enum BatchMessage {
    /// This is ONLY sent when the number of log records in the data channel has reached `max_export_batch_size`.
    /// Carries the shared "export message already sent" flag so that the
    /// worker thread can reset it once the message has been processed.
    ExportLog(Arc<AtomicBool>),
    /// ForceFlush flushes the current buffer to the exporter.
    /// The export result is reported back through the enclosed sender.
    ForceFlush(mpsc::SyncSender<OTelSdkResult>),
    /// Shut down the worker thread, push all logs in buffer to the exporter.
    /// The result is reported back through the enclosed sender.
    Shutdown(mpsc::SyncSender<OTelSdkResult>),
    /// Set the resource for the exporter.
    SetResource(Arc<Resource>),
}
224
/// Element type of the data channel: one log record together with its
/// instrumentation scope, boxed as a single heap allocation.
// NOTE(review): presumably boxed to keep the channel slots pointer-sized - confirm.
type LogsData = Box<(SdkLogRecord, InstrumentationScope)>;
226
/// The `BatchLogProcessor` collects finished logs in a buffer and exports them
/// in batches to the configured `LogExporter`. This processor is ideal for
/// high-throughput environments, as it minimizes the overhead of exporting logs
/// individually. It uses a **dedicated background thread** to manage and export logs
/// asynchronously, ensuring that the application's main execution flow is not blocked.
///
/// This processor supports the following configurations:
/// - **Queue size**: Maximum number of log records that can be buffered.
/// - **Batch size**: Maximum number of log records to include in a single export.
/// - **Scheduled delay**: Frequency at which the batch is exported.
///
/// When using this processor with the OTLP Exporter, the following exporter
/// features are supported:
/// - `grpc-tonic`: Requires `LoggerProvider` to be created within a tokio runtime.
/// - `reqwest-blocking-client`: Works with a regular `main` or `tokio::main`.
///
/// In other words, other clients like `reqwest` and `hyper` are not supported.
///
/// `BatchLogProcessor` buffers logs in memory and exports them in batches. An
/// export is triggered when `max_export_batch_size` is reached or every
/// `scheduled_delay` milliseconds. Users can explicitly trigger an export using
/// the [`force_flush()`] method. Shutdown also triggers an export of all buffered
/// logs and is recommended to be called before the application exits to ensure
/// all buffered logs are exported.
///
/// **Warning**: When using tokio's current-thread runtime, [`shutdown()`], which
/// is a blocking call, should not be called from your main thread. This can
/// cause deadlock. Instead, call [`shutdown()`] from a separate thread or use
/// tokio's `spawn_blocking`.
///
/// [`shutdown()`]: crate::logs::LoggerProvider::shutdown
/// [`force_flush()`]: crate::logs::LoggerProvider::force_flush
///
/// ### Using a BatchLogProcessor:
///
/// ```rust
/// use opentelemetry_sdk::logs::{BatchLogProcessor, BatchConfigBuilder, LoggerProvider};
/// use opentelemetry::global;
/// use std::time::Duration;
/// use opentelemetry_sdk::logs::InMemoryLogExporter;
///
/// let exporter = InMemoryLogExporter::default(); // Replace with an actual exporter
/// let processor = BatchLogProcessor::builder(exporter)
///     .with_batch_config(
///         BatchConfigBuilder::default()
///             .with_max_queue_size(2048)
///             .with_max_export_batch_size(512)
///             .with_scheduled_delay(Duration::from_secs(5))
///             .build(),
///     )
///     .build();
///
/// let provider = LoggerProvider::builder()
///     .with_log_processor(processor)
///     .build();
/// ```
pub struct BatchLogProcessor {
    logs_sender: SyncSender<LogsData>, // Data channel to store log records and instrumentation scopes
    message_sender: SyncSender<BatchMessage>, // Control channel to store control messages for the worker thread
    // Join handle of the worker thread; taken and joined by `shutdown`.
    handle: Mutex<Option<thread::JoinHandle<()>>>,
    // How long `force_flush` waits for the worker thread to report back.
    forceflush_timeout: Duration,
    // How long `shutdown` waits for the worker thread to report back.
    shutdown_timeout: Duration,
    // `true` while an `ExportLog` control message is pending; the worker
    // thread resets it once the message has been processed.
    export_log_message_sent: Arc<AtomicBool>,
    // Number of log records currently accounted for in the data channel;
    // incremented by `emit` and decremented by the worker thread after export.
    current_batch_size: Arc<AtomicUsize>,
    // Threshold of `current_batch_size` at which `emit` requests an export.
    max_export_batch_size: usize,

    // Track dropped logs - we'll log this at shutdown
    dropped_logs_count: AtomicUsize,

    // Track the maximum queue size that was configured for this processor
    max_queue_size: usize,
}
299
300impl Debug for BatchLogProcessor {
301    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
302        f.debug_struct("BatchLogProcessor")
303            .field("message_sender", &self.message_sender)
304            .finish()
305    }
306}
307
308impl LogProcessor for BatchLogProcessor {
309    fn emit(&self, record: &mut SdkLogRecord, instrumentation: &InstrumentationScope) {
310        let result = self
311            .logs_sender
312            .try_send(Box::new((record.clone(), instrumentation.clone())));
313
314        // match for result and handle each separately
315        match result {
316            Ok(_) => {
317                // Successfully sent the log record to the data channel.
318                // Increment the current batch size and check if it has reached
319                // the max export batch size.
320                if self.current_batch_size.fetch_add(1, Ordering::Relaxed) + 1
321                    >= self.max_export_batch_size
322                {
323                    // Check if the a control message for exporting logs is
324                    // already sent to the worker thread. If not, send a control
325                    // message to export logs. `export_log_message_sent` is set
326                    // to false ONLY when the worker thread has processed the
327                    // control message.
328
329                    if !self.export_log_message_sent.load(Ordering::Relaxed) {
330                        // This is a cost-efficient check as atomic load
331                        // operations do not require exclusive access to cache
332                        // line. Perform atomic swap to
333                        // `export_log_message_sent` ONLY when the atomic load
334                        // operation above returns false. Atomic
335                        // swap/compare_exchange operations require exclusive
336                        // access to cache line on most processor architectures.
337                        // We could have used compare_exchange as well here, but
338                        // it's more verbose than swap.
339                        if !self.export_log_message_sent.swap(true, Ordering::Relaxed) {
340                            match self.message_sender.try_send(BatchMessage::ExportLog(
341                                self.export_log_message_sent.clone(),
342                            )) {
343                                Ok(_) => {
344                                    // Control message sent successfully.
345                                }
346                                Err(_err) => {
347                                    // TODO: Log error If the control message
348                                    // could not be sent, reset the
349                                    // `export_log_message_sent` flag.
350                                    self.export_log_message_sent.store(false, Ordering::Relaxed);
351                                }
352                            }
353                        }
354                    }
355                }
356            }
357            Err(mpsc::TrySendError::Full(_)) => {
358                // Increment dropped logs count. The first time we have to drop
359                // a log, emit a warning.
360                if self.dropped_logs_count.fetch_add(1, Ordering::Relaxed) == 0 {
361                    otel_warn!(name: "BatchLogProcessor.LogDroppingStarted",
362                        message = "BatchLogProcessor dropped a LogRecord due to queue full. No further log will be emitted for further drops until Shutdown. During Shutdown time, a log will be emitted with exact count of total logs dropped.");
363                }
364            }
365            Err(mpsc::TrySendError::Disconnected(_)) => {
366                // Given background thread is the only receiver, and it's
367                // disconnected, it indicates the thread is shutdown
368                otel_warn!(
369                    name: "BatchLogProcessor.Emit.AfterShutdown",
370                    message = "Logs are being emitted even after Shutdown. This indicates incorrect lifecycle management of OTelLoggerProvider in application. Logs will not be exported."
371                );
372            }
373        }
374    }
375
376    fn force_flush(&self) -> OTelSdkResult {
377        let (sender, receiver) = mpsc::sync_channel(1);
378        match self
379            .message_sender
380            .try_send(BatchMessage::ForceFlush(sender))
381        {
382            Ok(_) => receiver
383                .recv_timeout(self.forceflush_timeout)
384                .map_err(|err| {
385                    if err == RecvTimeoutError::Timeout {
386                        OTelSdkError::Timeout(self.forceflush_timeout)
387                    } else {
388                        OTelSdkError::InternalFailure(format!("{}", err))
389                    }
390                })?,
391            Err(mpsc::TrySendError::Full(_)) => {
392                // If the control message could not be sent, emit a warning.
393                otel_debug!(
394                    name: "BatchLogProcessor.ForceFlush.ControlChannelFull",
395                    message = "Control message to flush the worker thread could not be sent as the control channel is full. This can occur if user repeatedily calls force_flush/shutdown without finishing the previous call."
396                );
397                Err(OTelSdkError::InternalFailure("ForceFlush cannot be performed as Control channel is full. This can occur if user repeatedily calls force_flush/shutdown without finishing the previous call.".into()))
398            }
399            Err(mpsc::TrySendError::Disconnected(_)) => {
400                // Given background thread is the only receiver, and it's
401                // disconnected, it indicates the thread is shutdown
402                otel_debug!(
403                    name: "BatchLogProcessor.ForceFlush.AlreadyShutdown",
404                    message = "ForceFlush invoked after Shutdown. This will not perform Flush and indicates a incorrect lifecycle management in Application."
405                );
406
407                Err(OTelSdkError::AlreadyShutdown)
408            }
409        }
410    }
411
412    fn shutdown(&self) -> OTelSdkResult {
413        let dropped_logs = self.dropped_logs_count.load(Ordering::Relaxed);
414        let max_queue_size = self.max_queue_size;
415        if dropped_logs > 0 {
416            otel_warn!(
417                name: "BatchLogProcessor.LogsDropped",
418                dropped_logs_count = dropped_logs,
419                max_queue_size = max_queue_size,
420                message = "Logs were dropped due to a queue being full. The count represents the total count of log records dropped in the lifetime of this BatchLogProcessor. Consider increasing the queue size and/or decrease delay between intervals."
421            );
422        }
423
424        let (sender, receiver) = mpsc::sync_channel(1);
425        match self.message_sender.try_send(BatchMessage::Shutdown(sender)) {
426            Ok(_) => {
427                receiver
428                    .recv_timeout(self.shutdown_timeout)
429                    .map(|_| {
430                        // join the background thread after receiving back the
431                        // shutdown signal
432                        if let Some(handle) = self.handle.lock().unwrap().take() {
433                            handle.join().unwrap();
434                        }
435                        OTelSdkResult::Ok(())
436                    })
437                    .map_err(|err| match err {
438                        RecvTimeoutError::Timeout => {
439                            otel_error!(
440                                name: "BatchLogProcessor.Shutdown.Timeout",
441                                message = "BatchLogProcessor shutdown timing out."
442                            );
443                            OTelSdkError::Timeout(self.shutdown_timeout)
444                        }
445                        _ => {
446                            otel_error!(
447                                name: "BatchLogProcessor.Shutdown.Error",
448                                error = format!("{}", err)
449                            );
450                            OTelSdkError::InternalFailure(format!("{}", err))
451                        }
452                    })?
453            }
454            Err(mpsc::TrySendError::Full(_)) => {
455                // If the control message could not be sent, emit a warning.
456                otel_debug!(
457                    name: "BatchLogProcessor.Shutdown.ControlChannelFull",
458                    message = "Control message to shutdown the worker thread could not be sent as the control channel is full. This can occur if user repeatedily calls force_flush/shutdown without finishing the previous call."
459                );
460                Err(OTelSdkError::InternalFailure("Shutdown cannot be performed as Control channel is full. This can occur if user repeatedily calls force_flush/shutdown without finishing the previous call.".into()))
461            }
462            Err(mpsc::TrySendError::Disconnected(_)) => {
463                // Given background thread is the only receiver, and it's
464                // disconnected, it indicates the thread is shutdown
465                otel_debug!(
466                    name: "BatchLogProcessor.Shutdown.AlreadyShutdown",
467                    message = "Shutdown is being invoked more than once. This is noop, but indicates a potential issue in the application's lifecycle management."
468                );
469
470                Err(OTelSdkError::AlreadyShutdown)
471            }
472        }
473    }
474
475    fn set_resource(&self, resource: &Resource) {
476        let resource = Arc::new(resource.clone());
477        let _ = self
478            .message_sender
479            .try_send(BatchMessage::SetResource(resource));
480    }
481}
482
483impl BatchLogProcessor {
484    pub(crate) fn new<E>(mut exporter: E, config: BatchConfig) -> Self
485    where
486        E: LogExporter + Send + Sync + 'static,
487    {
488        let (logs_sender, logs_receiver) = mpsc::sync_channel::<LogsData>(config.max_queue_size);
489        let (message_sender, message_receiver) = mpsc::sync_channel::<BatchMessage>(64); // Is this a reasonable bound?
490        let max_queue_size = config.max_queue_size;
491        let max_export_batch_size = config.max_export_batch_size;
492        let current_batch_size = Arc::new(AtomicUsize::new(0));
493        let current_batch_size_for_thread = current_batch_size.clone();
494
495        let handle = thread::Builder::new()
496            .name("OpenTelemetry.Logs.BatchProcessor".to_string())
497            .spawn(move || {
498                otel_info!(
499                    name: "BatchLogProcessor.ThreadStarted",
500                    interval_in_millisecs = config.scheduled_delay.as_millis(),
501                    max_export_batch_size = config.max_export_batch_size,
502                    max_queue_size = max_queue_size,
503                );
504                let mut last_export_time = Instant::now();
505                let mut logs = Vec::with_capacity(config.max_export_batch_size);
506                let current_batch_size = current_batch_size_for_thread;
507
508                // This method gets up to `max_export_batch_size` amount of logs from the channel and exports them.
509                // It returns the result of the export operation.
510                // It expects the logs vec to be empty when it's called.
511                #[inline]
512                fn get_logs_and_export<E>(
513                    logs_receiver: &mpsc::Receiver<LogsData>,
514                    exporter: &E,
515                    logs: &mut Vec<LogsData>,
516                    last_export_time: &mut Instant,
517                    current_batch_size: &AtomicUsize,
518                    config: &BatchConfig,
519                ) -> OTelSdkResult
520                where
521                    E: LogExporter + Send + Sync + 'static,
522                {
523                    let target = current_batch_size.load(Ordering::Relaxed); // `target` is used to determine the stopping criteria for exporting logs.
524                    let mut result = OTelSdkResult::Ok(());
525                    let mut total_exported_logs: usize = 0;
526
527                    while target > 0 && total_exported_logs < target {
528                        // Get upto `max_export_batch_size` amount of logs log records from the channel and push them to the logs vec
529                        while let Ok(log) = logs_receiver.try_recv() {
530                            logs.push(log);
531                            if logs.len() == config.max_export_batch_size {
532                                break;
533                            }
534                        }
535
536                        let count_of_logs = logs.len(); // Count of logs that will be exported
537                        total_exported_logs += count_of_logs;
538
539                        result = export_batch_sync(exporter, logs, last_export_time); // This method clears the logs vec after exporting
540
541                        current_batch_size.fetch_sub(count_of_logs, Ordering::Relaxed);
542                    }
543                    result
544                }
545
546                loop {
547                    let remaining_time = config
548                        .scheduled_delay
549                        .checked_sub(last_export_time.elapsed())
550                        .unwrap_or(config.scheduled_delay);
551
552                    match message_receiver.recv_timeout(remaining_time) {
553                        Ok(BatchMessage::ExportLog(export_log_message_sent)) => {
554                            // Reset the export log message sent flag now it has has been processed.
555                            export_log_message_sent.store(false, Ordering::Relaxed);
556
557                            otel_debug!(
558                                name: "BatchLogProcessor.ExportingDueToBatchSize",
559                            );
560
561                            let _ = get_logs_and_export(
562                                &logs_receiver,
563                                &exporter,
564                                &mut logs,
565                                &mut last_export_time,
566                                &current_batch_size,
567                                &config,
568                            );
569                        }
570                        Ok(BatchMessage::ForceFlush(sender)) => {
571                            otel_debug!(name: "BatchLogProcessor.ExportingDueToForceFlush");
572                            let result = get_logs_and_export(
573                                &logs_receiver,
574                                &exporter,
575                                &mut logs,
576                                &mut last_export_time,
577                                &current_batch_size,
578                                &config,
579                            );
580                            let _ = sender.send(result);
581                        }
582                        Ok(BatchMessage::Shutdown(sender)) => {
583                            otel_debug!(name: "BatchLogProcessor.ExportingDueToShutdown");
584                            let result = get_logs_and_export(
585                                &logs_receiver,
586                                &exporter,
587                                &mut logs,
588                                &mut last_export_time,
589                                &current_batch_size,
590                                &config,
591                            );
592                            let _ = sender.send(result);
593
594                            otel_debug!(
595                                name: "BatchLogProcessor.ThreadExiting",
596                                reason = "ShutdownRequested"
597                            );
598                            //
599                            // break out the loop and return from the current background thread.
600                            //
601                            break;
602                        }
603                        Ok(BatchMessage::SetResource(resource)) => {
604                            exporter.set_resource(&resource);
605                        }
606                        Err(RecvTimeoutError::Timeout) => {
607                            otel_debug!(
608                                name: "BatchLogProcessor.ExportingDueToTimer",
609                            );
610
611                            let _ = get_logs_and_export(
612                                &logs_receiver,
613                                &exporter,
614                                &mut logs,
615                                &mut last_export_time,
616                                &current_batch_size,
617                                &config,
618                            );
619                        }
620                        Err(RecvTimeoutError::Disconnected) => {
621                            // Channel disconnected, only thing to do is break
622                            // out (i.e exit the thread)
623                            otel_debug!(
624                                name: "BatchLogProcessor.ThreadExiting",
625                                reason = "MessageSenderDisconnected"
626                            );
627                            break;
628                        }
629                    }
630                }
631                otel_info!(
632                    name: "BatchLogProcessor.ThreadStopped"
633                );
634            })
635            .expect("Thread spawn failed."); //TODO: Handle thread spawn failure
636
637        // Return batch processor with link to worker
638        BatchLogProcessor {
639            logs_sender,
640            message_sender,
641            handle: Mutex::new(Some(handle)),
642            forceflush_timeout: Duration::from_secs(5), // TODO: make this configurable
643            shutdown_timeout: Duration::from_secs(5),   // TODO: make this configurable
644            dropped_logs_count: AtomicUsize::new(0),
645            max_queue_size,
646            export_log_message_sent: Arc::new(AtomicBool::new(false)),
647            current_batch_size,
648            max_export_batch_size,
649        }
650    }
651
652    /// Create a new batch processor builder
653    pub fn builder<E>(exporter: E) -> BatchLogProcessorBuilder<E>
654    where
655        E: LogExporter,
656    {
657        BatchLogProcessorBuilder {
658            exporter,
659            config: Default::default(),
660        }
661    }
662}
663
664#[allow(clippy::vec_box)]
665fn export_batch_sync<E>(
666    exporter: &E,
667    batch: &mut Vec<Box<(SdkLogRecord, InstrumentationScope)>>,
668    last_export_time: &mut Instant,
669) -> OTelSdkResult
670where
671    E: LogExporter + ?Sized,
672{
673    *last_export_time = Instant::now();
674
675    if batch.is_empty() {
676        return OTelSdkResult::Ok(());
677    }
678
679    let export = exporter.export(LogBatch::new_with_owned_data(batch.as_slice()));
680    let export_result = futures_executor::block_on(export);
681
682    // Clear the batch vec after exporting
683    batch.clear();
684
685    match export_result {
686        Ok(_) => OTelSdkResult::Ok(()),
687        Err(err) => {
688            otel_error!(
689                name: "BatchLogProcessor.ExportError",
690                error = format!("{}", err)
691            );
692            OTelSdkResult::Err(err)
693        }
694    }
695}
696
///
/// A builder for creating [`BatchLogProcessor`] instances.
///
/// Holds the exporter and the [`BatchConfig`] that are passed to
/// `BatchLogProcessor::new` when `build` is called.
///
#[derive(Debug)]
pub struct BatchLogProcessorBuilder<E> {
    // Exporter handed to the processor on `build`.
    exporter: E,
    // Batching configuration; replaceable through `with_batch_config`.
    config: BatchConfig,
}
705
706impl<E> BatchLogProcessorBuilder<E>
707where
708    E: LogExporter + 'static,
709{
710    /// Set the BatchConfig for [`BatchLogProcessorBuilder`]
711    pub fn with_batch_config(self, config: BatchConfig) -> Self {
712        BatchLogProcessorBuilder { config, ..self }
713    }
714
715    /// Build a batch processor
716    pub fn build(self) -> BatchLogProcessor {
717        BatchLogProcessor::new(self.exporter, self.config)
718    }
719}
720
/// Batch log processor configuration.
/// Use [`BatchConfigBuilder`] to configure your own instance of [`BatchConfig`].
#[derive(Debug)]
#[allow(dead_code)]
pub struct BatchConfig {
    /// The maximum queue size to buffer logs for delayed processing. If the
    /// queue gets full, logs are dropped. The default value is 2048.
    pub(crate) max_queue_size: usize,

    /// The delay interval between two consecutive processing of batches.
    /// The default value is 1 second.
    pub(crate) scheduled_delay: Duration,

    /// The maximum number of logs to process in a single batch. If there is
    /// more than one batch worth of logs then it processes multiple batches
    /// of logs one batch after the other without any delay. The default value
    /// is 512.
    pub(crate) max_export_batch_size: usize,

    /// The maximum duration to export a batch of data.
    /// Only present when the
    /// `experimental_logs_batch_log_processor_with_async_runtime` feature is enabled.
    #[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")]
    pub(crate) max_export_timeout: Duration,
}
744
745impl Default for BatchConfig {
746    fn default() -> Self {
747        BatchConfigBuilder::default().build()
748    }
749}
750
/// A builder for creating [`BatchConfig`] instances.
///
/// Start from [`BatchConfigBuilder::default()`], adjust values with the
/// `with_*` methods and finish with [`BatchConfigBuilder::build`].
#[derive(Debug)]
pub struct BatchConfigBuilder {
    // Maximum number of logs buffered for delayed processing.
    max_queue_size: usize,
    // Delay between two consecutive batch processing runs.
    scheduled_delay: Duration,
    // Upper bound on the number of logs in a single batch; `build` caps it at `max_queue_size`.
    max_export_batch_size: usize,
    // Maximum duration to export a batch of data (feature-gated).
    #[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")]
    max_export_timeout: Duration,
}
760
761impl Default for BatchConfigBuilder {
762    /// Create a new [`BatchConfigBuilder`] initialized with default batch config values as per the specs.
763    /// The values are overridden by environment variables if set.
764    /// The supported environment variables are:
765    /// * `OTEL_BLRP_MAX_QUEUE_SIZE`
766    /// * `OTEL_BLRP_SCHEDULE_DELAY`
767    /// * `OTEL_BLRP_MAX_EXPORT_BATCH_SIZE`
768    /// * `OTEL_BLRP_EXPORT_TIMEOUT`
769    fn default() -> Self {
770        BatchConfigBuilder {
771            max_queue_size: OTEL_BLRP_MAX_QUEUE_SIZE_DEFAULT,
772            scheduled_delay: Duration::from_millis(OTEL_BLRP_SCHEDULE_DELAY_DEFAULT),
773            max_export_batch_size: OTEL_BLRP_MAX_EXPORT_BATCH_SIZE_DEFAULT,
774            #[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")]
775            max_export_timeout: Duration::from_millis(OTEL_BLRP_EXPORT_TIMEOUT_DEFAULT),
776        }
777        .init_from_env_vars()
778    }
779}
780
781impl BatchConfigBuilder {
782    /// Set max_queue_size for [`BatchConfigBuilder`].
783    /// It's the maximum queue size to buffer logs for delayed processing.
784    /// If the queue gets full it will drop the logs.
785    /// The default value of is 2048.
786    pub fn with_max_queue_size(mut self, max_queue_size: usize) -> Self {
787        self.max_queue_size = max_queue_size;
788        self
789    }
790
791    /// Set scheduled_delay for [`BatchConfigBuilder`].
792    /// It's the delay interval in milliseconds between two consecutive processing of batches.
793    /// The default value is 1000 milliseconds.
794    pub fn with_scheduled_delay(mut self, scheduled_delay: Duration) -> Self {
795        self.scheduled_delay = scheduled_delay;
796        self
797    }
798
799    /// Set max_export_timeout for [`BatchConfigBuilder`].
800    /// It's the maximum duration to export a batch of data.
801    /// The default value is 30000 milliseconds.
802    #[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")]
803    pub fn with_max_export_timeout(mut self, max_export_timeout: Duration) -> Self {
804        self.max_export_timeout = max_export_timeout;
805        self
806    }
807
808    /// Set max_export_batch_size for [`BatchConfigBuilder`].
809    /// It's the maximum number of logs to process in a single batch. If there are
810    /// more than one batch worth of logs then it processes multiple batches
811    /// of logs one batch after the other without any delay.
812    /// The default value is 512.
813    pub fn with_max_export_batch_size(mut self, max_export_batch_size: usize) -> Self {
814        self.max_export_batch_size = max_export_batch_size;
815        self
816    }
817
818    /// Builds a `BatchConfig` enforcing the following invariants:
819    /// * `max_export_batch_size` must be less than or equal to `max_queue_size`.
820    pub fn build(self) -> BatchConfig {
821        // max export batch size must be less or equal to max queue size.
822        // we set max export batch size to max queue size if it's larger than max queue size.
823        let max_export_batch_size = min(self.max_export_batch_size, self.max_queue_size);
824
825        BatchConfig {
826            max_queue_size: self.max_queue_size,
827            scheduled_delay: self.scheduled_delay,
828            #[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")]
829            max_export_timeout: self.max_export_timeout,
830            max_export_batch_size,
831        }
832    }
833
834    fn init_from_env_vars(mut self) -> Self {
835        if let Some(max_queue_size) = env::var(OTEL_BLRP_MAX_QUEUE_SIZE)
836            .ok()
837            .and_then(|queue_size| usize::from_str(&queue_size).ok())
838        {
839            self.max_queue_size = max_queue_size;
840        }
841
842        if let Some(max_export_batch_size) = env::var(OTEL_BLRP_MAX_EXPORT_BATCH_SIZE)
843            .ok()
844            .and_then(|batch_size| usize::from_str(&batch_size).ok())
845        {
846            self.max_export_batch_size = max_export_batch_size;
847        }
848
849        if let Some(scheduled_delay) = env::var(OTEL_BLRP_SCHEDULE_DELAY)
850            .ok()
851            .and_then(|delay| u64::from_str(&delay).ok())
852        {
853            self.scheduled_delay = Duration::from_millis(scheduled_delay);
854        }
855
856        #[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")]
857        if let Some(max_export_timeout) = env::var(OTEL_BLRP_EXPORT_TIMEOUT)
858            .ok()
859            .and_then(|s| u64::from_str(&s).ok())
860        {
861            self.max_export_timeout = Duration::from_millis(max_export_timeout);
862        }
863
864        self
865    }
866}
867
868#[cfg(all(test, feature = "testing", feature = "logs"))]
869mod tests {
870    use super::{
871        BatchLogProcessor, OTEL_BLRP_MAX_EXPORT_BATCH_SIZE, OTEL_BLRP_MAX_QUEUE_SIZE,
872        OTEL_BLRP_SCHEDULE_DELAY,
873    };
874    #[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")]
875    use super::{OTEL_BLRP_EXPORT_TIMEOUT, OTEL_BLRP_EXPORT_TIMEOUT_DEFAULT};
876    use crate::logs::{LogBatch, LogExporter, SdkLogRecord};
877    use crate::{
878        error::OTelSdkResult,
879        logs::{
880            log_processor::{
881                OTEL_BLRP_MAX_EXPORT_BATCH_SIZE_DEFAULT, OTEL_BLRP_MAX_QUEUE_SIZE_DEFAULT,
882                OTEL_BLRP_SCHEDULE_DELAY_DEFAULT,
883            },
884            BatchConfig, BatchConfigBuilder, InMemoryLogExporter, InMemoryLogExporterBuilder,
885            LogProcessor, SdkLoggerProvider, SimpleLogProcessor,
886        },
887        Resource,
888    };
889    use opentelemetry::logs::AnyValue;
890    use opentelemetry::logs::LogRecord as _;
891    use opentelemetry::logs::{Logger, LoggerProvider};
892    use opentelemetry::KeyValue;
893    use opentelemetry::{InstrumentationScope, Key};
894    use std::sync::atomic::{AtomicUsize, Ordering};
895    use std::sync::{Arc, Mutex};
896    use std::time::Duration;
897
    /// Test exporter whose only state is the last [`Resource`] passed to
    /// `set_resource`.
    #[derive(Debug, Clone)]
    struct MockLogExporter {
        // Behind an `Arc`, so clones of the exporter observe the same stored resource.
        resource: Arc<Mutex<Option<Resource>>>,
    }
902
903    impl LogExporter for MockLogExporter {
904        #[allow(clippy::manual_async_fn)]
905        fn export(
906            &self,
907            _batch: LogBatch<'_>,
908        ) -> impl std::future::Future<Output = OTelSdkResult> + Send {
909            async { Ok(()) }
910        }
911
912        fn shutdown(&mut self) -> OTelSdkResult {
913            Ok(())
914        }
915
916        fn set_resource(&mut self, resource: &Resource) {
917            self.resource
918                .lock()
919                .map(|mut res_opt| {
920                    res_opt.replace(resource.clone());
921                })
922                .expect("mock log exporter shouldn't error when setting resource");
923        }
924    }
925
926    // Implementation specific to the MockLogExporter, not part of the LogExporter trait
927    impl MockLogExporter {
928        fn get_resource(&self) -> Option<Resource> {
929            (*self.resource).lock().unwrap().clone()
930        }
931    }
932
933    #[test]
934    fn test_default_const_values() {
935        assert_eq!(OTEL_BLRP_SCHEDULE_DELAY, "OTEL_BLRP_SCHEDULE_DELAY");
936        assert_eq!(OTEL_BLRP_SCHEDULE_DELAY_DEFAULT, 1_000);
937        #[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")]
938        assert_eq!(OTEL_BLRP_EXPORT_TIMEOUT, "OTEL_BLRP_EXPORT_TIMEOUT");
939        #[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")]
940        assert_eq!(OTEL_BLRP_EXPORT_TIMEOUT_DEFAULT, 30_000);
941        assert_eq!(OTEL_BLRP_MAX_QUEUE_SIZE, "OTEL_BLRP_MAX_QUEUE_SIZE");
942        assert_eq!(OTEL_BLRP_MAX_QUEUE_SIZE_DEFAULT, 2_048);
943        assert_eq!(
944            OTEL_BLRP_MAX_EXPORT_BATCH_SIZE,
945            "OTEL_BLRP_MAX_EXPORT_BATCH_SIZE"
946        );
947        assert_eq!(OTEL_BLRP_MAX_EXPORT_BATCH_SIZE_DEFAULT, 512);
948    }
949
950    #[test]
951    fn test_default_batch_config_adheres_to_specification() {
952        // The following environment variables are expected to be unset so that their default values are used.
953        let env_vars = vec![
954            OTEL_BLRP_SCHEDULE_DELAY,
955            #[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")]
956            OTEL_BLRP_EXPORT_TIMEOUT,
957            OTEL_BLRP_MAX_QUEUE_SIZE,
958            OTEL_BLRP_MAX_EXPORT_BATCH_SIZE,
959        ];
960
961        let config = temp_env::with_vars_unset(env_vars, BatchConfig::default);
962
963        assert_eq!(
964            config.scheduled_delay,
965            Duration::from_millis(OTEL_BLRP_SCHEDULE_DELAY_DEFAULT)
966        );
967        #[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")]
968        assert_eq!(
969            config.max_export_timeout,
970            Duration::from_millis(OTEL_BLRP_EXPORT_TIMEOUT_DEFAULT)
971        );
972        assert_eq!(config.max_queue_size, OTEL_BLRP_MAX_QUEUE_SIZE_DEFAULT);
973        assert_eq!(
974            config.max_export_batch_size,
975            OTEL_BLRP_MAX_EXPORT_BATCH_SIZE_DEFAULT
976        );
977    }
978
979    #[test]
980    fn test_batch_config_configurable_by_env_vars() {
981        let env_vars = vec![
982            (OTEL_BLRP_SCHEDULE_DELAY, Some("2000")),
983            #[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")]
984            (OTEL_BLRP_EXPORT_TIMEOUT, Some("60000")),
985            (OTEL_BLRP_MAX_QUEUE_SIZE, Some("4096")),
986            (OTEL_BLRP_MAX_EXPORT_BATCH_SIZE, Some("1024")),
987        ];
988
989        let config = temp_env::with_vars(env_vars, BatchConfig::default);
990
991        assert_eq!(config.scheduled_delay, Duration::from_millis(2000));
992        #[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")]
993        assert_eq!(config.max_export_timeout, Duration::from_millis(60000));
994        assert_eq!(config.max_queue_size, 4096);
995        assert_eq!(config.max_export_batch_size, 1024);
996    }
997
998    #[test]
999    fn test_batch_config_max_export_batch_size_validation() {
1000        let env_vars = vec![
1001            (OTEL_BLRP_MAX_QUEUE_SIZE, Some("256")),
1002            (OTEL_BLRP_MAX_EXPORT_BATCH_SIZE, Some("1024")),
1003        ];
1004
1005        let config = temp_env::with_vars(env_vars, BatchConfig::default);
1006
1007        assert_eq!(config.max_queue_size, 256);
1008        assert_eq!(config.max_export_batch_size, 256);
1009        assert_eq!(
1010            config.scheduled_delay,
1011            Duration::from_millis(OTEL_BLRP_SCHEDULE_DELAY_DEFAULT)
1012        );
1013        #[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")]
1014        assert_eq!(
1015            config.max_export_timeout,
1016            Duration::from_millis(OTEL_BLRP_EXPORT_TIMEOUT_DEFAULT)
1017        );
1018    }
1019
1020    #[test]
1021    fn test_batch_config_with_fields() {
1022        let batch_builder = BatchConfigBuilder::default()
1023            .with_max_export_batch_size(1)
1024            .with_scheduled_delay(Duration::from_millis(2))
1025            .with_max_queue_size(4);
1026
1027        #[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")]
1028        let batch_builder = batch_builder.with_max_export_timeout(Duration::from_millis(3));
1029        let batch = batch_builder.build();
1030
1031        assert_eq!(batch.max_export_batch_size, 1);
1032        assert_eq!(batch.scheduled_delay, Duration::from_millis(2));
1033        #[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")]
1034        assert_eq!(batch.max_export_timeout, Duration::from_millis(3));
1035        assert_eq!(batch.max_queue_size, 4);
1036    }
1037
1038    #[test]
1039    fn test_build_batch_log_processor_builder() {
1040        let mut env_vars = vec![
1041            (OTEL_BLRP_MAX_EXPORT_BATCH_SIZE, Some("500")),
1042            (OTEL_BLRP_SCHEDULE_DELAY, Some("I am not number")),
1043            #[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")]
1044            (OTEL_BLRP_EXPORT_TIMEOUT, Some("2046")),
1045        ];
1046        temp_env::with_vars(env_vars.clone(), || {
1047            let builder = BatchLogProcessor::builder(InMemoryLogExporter::default());
1048
1049            assert_eq!(builder.config.max_export_batch_size, 500);
1050            assert_eq!(
1051                builder.config.scheduled_delay,
1052                Duration::from_millis(OTEL_BLRP_SCHEDULE_DELAY_DEFAULT)
1053            );
1054            assert_eq!(
1055                builder.config.max_queue_size,
1056                OTEL_BLRP_MAX_QUEUE_SIZE_DEFAULT
1057            );
1058
1059            #[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")]
1060            assert_eq!(
1061                builder.config.max_export_timeout,
1062                Duration::from_millis(2046)
1063            );
1064        });
1065
1066        env_vars.push((OTEL_BLRP_MAX_QUEUE_SIZE, Some("120")));
1067
1068        temp_env::with_vars(env_vars, || {
1069            let builder = BatchLogProcessor::builder(InMemoryLogExporter::default());
1070            assert_eq!(builder.config.max_export_batch_size, 120);
1071            assert_eq!(builder.config.max_queue_size, 120);
1072        });
1073    }
1074
1075    #[test]
1076    fn test_build_batch_log_processor_builder_with_custom_config() {
1077        let expected = BatchConfigBuilder::default()
1078            .with_max_export_batch_size(1)
1079            .with_scheduled_delay(Duration::from_millis(2))
1080            .with_max_queue_size(4)
1081            .build();
1082
1083        let builder =
1084            BatchLogProcessor::builder(InMemoryLogExporter::default()).with_batch_config(expected);
1085
1086        let actual = &builder.config;
1087        assert_eq!(actual.max_export_batch_size, 1);
1088        assert_eq!(actual.scheduled_delay, Duration::from_millis(2));
1089        assert_eq!(actual.max_queue_size, 4);
1090    }
1091
1092    #[test]
1093    fn test_set_resource_simple_processor() {
1094        let exporter = MockLogExporter {
1095            resource: Arc::new(Mutex::new(None)),
1096        };
1097        let processor = SimpleLogProcessor::new(exporter.clone());
1098        let _ = SdkLoggerProvider::builder()
1099            .with_log_processor(processor)
1100            .with_resource(
1101                Resource::builder_empty()
1102                    .with_attributes([
1103                        KeyValue::new("k1", "v1"),
1104                        KeyValue::new("k2", "v3"),
1105                        KeyValue::new("k3", "v3"),
1106                        KeyValue::new("k4", "v4"),
1107                        KeyValue::new("k5", "v5"),
1108                    ])
1109                    .build(),
1110            )
1111            .build();
1112        assert_eq!(exporter.get_resource().unwrap().into_iter().count(), 5);
1113    }
1114
1115    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
1116    async fn test_set_resource_batch_processor() {
1117        let exporter = MockLogExporter {
1118            resource: Arc::new(Mutex::new(None)),
1119        };
1120        let processor = BatchLogProcessor::new(exporter.clone(), BatchConfig::default());
1121        let provider = SdkLoggerProvider::builder()
1122            .with_log_processor(processor)
1123            .with_resource(
1124                Resource::builder_empty()
1125                    .with_attributes([
1126                        KeyValue::new("k1", "v1"),
1127                        KeyValue::new("k2", "v3"),
1128                        KeyValue::new("k3", "v3"),
1129                        KeyValue::new("k4", "v4"),
1130                        KeyValue::new("k5", "v5"),
1131                    ])
1132                    .build(),
1133            )
1134            .build();
1135
1136        // wait for the batch processor to process the resource.
1137        tokio::time::sleep(Duration::from_millis(100)).await;
1138
1139        assert_eq!(exporter.get_resource().unwrap().into_iter().count(), 5);
1140        let _ = provider.shutdown();
1141    }
1142
1143    #[tokio::test(flavor = "multi_thread")]
1144    async fn test_batch_shutdown() {
1145        // assert we will receive an error
1146        // setup
1147        let exporter = InMemoryLogExporterBuilder::default()
1148            .keep_records_on_shutdown()
1149            .build();
1150        let processor = BatchLogProcessor::new(exporter.clone(), BatchConfig::default());
1151
1152        let mut record = SdkLogRecord::new();
1153        let instrumentation = InstrumentationScope::default();
1154
1155        processor.emit(&mut record, &instrumentation);
1156        processor.force_flush().unwrap();
1157        processor.shutdown().unwrap();
1158        // todo: expect to see errors here. How should we assert this?
1159        processor.emit(&mut record, &instrumentation);
1160        assert_eq!(1, exporter.get_emitted_logs().unwrap().len())
1161    }
1162
1163    #[test]
1164    fn test_simple_shutdown() {
1165        let exporter = InMemoryLogExporterBuilder::default()
1166            .keep_records_on_shutdown()
1167            .build();
1168        let processor = SimpleLogProcessor::new(exporter.clone());
1169
1170        let mut record: SdkLogRecord = SdkLogRecord::new();
1171        let instrumentation: InstrumentationScope = Default::default();
1172
1173        processor.emit(&mut record, &instrumentation);
1174
1175        processor.shutdown().unwrap();
1176
1177        let is_shutdown = processor
1178            .is_shutdown
1179            .load(std::sync::atomic::Ordering::Relaxed);
1180        assert!(is_shutdown);
1181
1182        processor.emit(&mut record, &instrumentation);
1183
1184        assert_eq!(1, exporter.get_emitted_logs().unwrap().len())
1185    }
1186
1187    #[tokio::test(flavor = "current_thread")]
1188    async fn test_batch_log_processor_shutdown_under_async_runtime_current_flavor_multi_thread() {
1189        let exporter = InMemoryLogExporterBuilder::default().build();
1190        let processor = BatchLogProcessor::new(exporter.clone(), BatchConfig::default());
1191
1192        processor.shutdown().unwrap();
1193    }
1194
1195    #[tokio::test(flavor = "current_thread")]
1196    async fn test_batch_log_processor_shutdown_with_async_runtime_current_flavor_current_thread() {
1197        let exporter = InMemoryLogExporterBuilder::default().build();
1198        let processor = BatchLogProcessor::new(exporter.clone(), BatchConfig::default());
1199        processor.shutdown().unwrap();
1200    }
1201
1202    #[tokio::test(flavor = "multi_thread")]
1203    async fn test_batch_log_processor_shutdown_with_async_runtime_multi_flavor_multi_thread() {
1204        let exporter = InMemoryLogExporterBuilder::default().build();
1205        let processor = BatchLogProcessor::new(exporter.clone(), BatchConfig::default());
1206        processor.shutdown().unwrap();
1207    }
1208
1209    #[tokio::test(flavor = "multi_thread")]
1210    async fn test_batch_log_processor_shutdown_with_async_runtime_multi_flavor_current_thread() {
1211        let exporter = InMemoryLogExporterBuilder::default().build();
1212        let processor = BatchLogProcessor::new(exporter.clone(), BatchConfig::default());
1213        processor.shutdown().unwrap();
1214    }
1215
    /// Test processor that modifies each record (adds a `processed_by`
    /// attribute and rewrites the body) and keeps a copy of it.
    #[derive(Debug)]
    struct FirstProcessor {
        // Copies of every record seen by `emit`, shared with the test through the `Arc`.
        pub(crate) logs: Arc<Mutex<Vec<(SdkLogRecord, InstrumentationScope)>>>,
    }
1220
1221    impl LogProcessor for FirstProcessor {
1222        fn emit(&self, record: &mut SdkLogRecord, instrumentation: &InstrumentationScope) {
1223            // add attribute
1224            record.add_attribute(
1225                Key::from_static_str("processed_by"),
1226                AnyValue::String("FirstProcessor".into()),
1227            );
1228            // update body
1229            record.body = Some("Updated by FirstProcessor".into());
1230
1231            self.logs
1232                .lock()
1233                .unwrap()
1234                .push((record.clone(), instrumentation.clone())); //clone as the LogProcessor is storing the data.
1235        }
1236
1237        fn force_flush(&self) -> OTelSdkResult {
1238            Ok(())
1239        }
1240
1241        fn shutdown(&self) -> OTelSdkResult {
1242            Ok(())
1243        }
1244    }
1245
    /// Test processor that asserts each record already carries the changes
    /// made by `FirstProcessor` and keeps a copy of it.
    #[derive(Debug)]
    struct SecondProcessor {
        // Copies of every record seen by `emit`, shared with the test through the `Arc`.
        pub(crate) logs: Arc<Mutex<Vec<(SdkLogRecord, InstrumentationScope)>>>,
    }
1250
1251    impl LogProcessor for SecondProcessor {
1252        fn emit(&self, record: &mut SdkLogRecord, instrumentation: &InstrumentationScope) {
1253            assert!(record.attributes_contains(
1254                &Key::from_static_str("processed_by"),
1255                &AnyValue::String("FirstProcessor".into())
1256            ));
1257            assert!(
1258                record.body.clone().unwrap()
1259                    == AnyValue::String("Updated by FirstProcessor".into())
1260            );
1261            self.logs
1262                .lock()
1263                .unwrap()
1264                .push((record.clone(), instrumentation.clone()));
1265        }
1266
1267        fn force_flush(&self) -> OTelSdkResult {
1268            Ok(())
1269        }
1270
1271        fn shutdown(&self) -> OTelSdkResult {
1272            Ok(())
1273        }
1274    }
1275    #[test]
1276    fn test_log_data_modification_by_multiple_processors() {
1277        let first_processor_logs = Arc::new(Mutex::new(Vec::new()));
1278        let second_processor_logs = Arc::new(Mutex::new(Vec::new()));
1279
1280        let first_processor = FirstProcessor {
1281            logs: Arc::clone(&first_processor_logs),
1282        };
1283        let second_processor = SecondProcessor {
1284            logs: Arc::clone(&second_processor_logs),
1285        };
1286
1287        let logger_provider = SdkLoggerProvider::builder()
1288            .with_log_processor(first_processor)
1289            .with_log_processor(second_processor)
1290            .build();
1291
1292        let logger = logger_provider.logger("test-logger");
1293        let mut log_record = logger.create_log_record();
1294        log_record.body = Some(AnyValue::String("Test log".into()));
1295
1296        logger.emit(log_record);
1297
1298        assert_eq!(first_processor_logs.lock().unwrap().len(), 1);
1299        assert_eq!(second_processor_logs.lock().unwrap().len(), 1);
1300
1301        let first_log = &first_processor_logs.lock().unwrap()[0];
1302        let second_log = &second_processor_logs.lock().unwrap()[0];
1303
1304        assert!(first_log.0.attributes_contains(
1305            &Key::from_static_str("processed_by"),
1306            &AnyValue::String("FirstProcessor".into())
1307        ));
1308        assert!(second_log.0.attributes_contains(
1309            &Key::from_static_str("processed_by"),
1310            &AnyValue::String("FirstProcessor".into())
1311        ));
1312
1313        assert!(
1314            first_log.0.body.clone().unwrap()
1315                == AnyValue::String("Updated by FirstProcessor".into())
1316        );
1317        assert!(
1318            second_log.0.body.clone().unwrap()
1319                == AnyValue::String("Updated by FirstProcessor".into())
1320        );
1321    }
1322
1323    #[test]
1324    fn test_simple_processor_sync_exporter_without_runtime() {
1325        let exporter = InMemoryLogExporterBuilder::default().build();
1326        let processor = SimpleLogProcessor::new(exporter.clone());
1327
1328        let mut record: SdkLogRecord = SdkLogRecord::new();
1329        let instrumentation: InstrumentationScope = Default::default();
1330
1331        processor.emit(&mut record, &instrumentation);
1332
1333        assert_eq!(exporter.get_emitted_logs().unwrap().len(), 1);
1334    }
1335
1336    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
1337    async fn test_simple_processor_sync_exporter_with_runtime() {
1338        let exporter = InMemoryLogExporterBuilder::default().build();
1339        let processor = SimpleLogProcessor::new(exporter.clone());
1340
1341        let mut record: SdkLogRecord = SdkLogRecord::new();
1342        let instrumentation: InstrumentationScope = Default::default();
1343
1344        processor.emit(&mut record, &instrumentation);
1345
1346        assert_eq!(exporter.get_emitted_logs().unwrap().len(), 1);
1347    }
1348
1349    #[tokio::test(flavor = "multi_thread")]
1350    async fn test_simple_processor_sync_exporter_with_multi_thread_runtime() {
1351        let exporter = InMemoryLogExporterBuilder::default().build();
1352        let processor = Arc::new(SimpleLogProcessor::new(exporter.clone()));
1353
1354        let mut handles = vec![];
1355        for _ in 0..10 {
1356            let processor_clone = Arc::clone(&processor);
1357            let handle = tokio::spawn(async move {
1358                let mut record: SdkLogRecord = SdkLogRecord::new();
1359                let instrumentation: InstrumentationScope = Default::default();
1360                processor_clone.emit(&mut record, &instrumentation);
1361            });
1362            handles.push(handle);
1363        }
1364
1365        for handle in handles {
1366            handle.await.unwrap();
1367        }
1368
1369        assert_eq!(exporter.get_emitted_logs().unwrap().len(), 10);
1370    }
1371
1372    #[tokio::test(flavor = "current_thread")]
1373    async fn test_simple_processor_sync_exporter_with_current_thread_runtime() {
1374        let exporter = InMemoryLogExporterBuilder::default().build();
1375        let processor = SimpleLogProcessor::new(exporter.clone());
1376
1377        let mut record: SdkLogRecord = SdkLogRecord::new();
1378        let instrumentation: InstrumentationScope = Default::default();
1379
1380        processor.emit(&mut record, &instrumentation);
1381
1382        assert_eq!(exporter.get_emitted_logs().unwrap().len(), 1);
1383    }
1384
    /// Test exporter whose `export` awaits `tokio::time::sleep` and then
    /// counts the records in the batch.
    #[derive(Debug, Clone)]
    struct LogExporterThatRequiresTokio {
        // Running total of exported records; shared between clones through the `Arc`.
        export_count: Arc<AtomicUsize>,
    }
1389
1390    impl LogExporterThatRequiresTokio {
1391        /// Creates a new instance of `LogExporterThatRequiresTokio`.
1392        fn new() -> Self {
1393            LogExporterThatRequiresTokio {
1394                export_count: Arc::new(AtomicUsize::new(0)),
1395            }
1396        }
1397
1398        /// Returns the number of logs stored in the exporter.
1399        fn len(&self) -> usize {
1400            self.export_count.load(Ordering::Acquire)
1401        }
1402    }
1403
1404    impl LogExporter for LogExporterThatRequiresTokio {
1405        #[allow(clippy::manual_async_fn)]
1406        fn export(
1407            &self,
1408            batch: LogBatch<'_>,
1409        ) -> impl std::future::Future<Output = OTelSdkResult> + Send {
1410            // Simulate minimal dependency on tokio by sleeping asynchronously for a short duration
1411            async move {
1412                tokio::time::sleep(Duration::from_millis(50)).await;
1413
1414                for _ in batch.iter() {
1415                    self.export_count.fetch_add(1, Ordering::Acquire);
1416                }
1417                Ok(())
1418            }
1419        }
1420    }
1421
1422    #[test]
1423    fn test_simple_processor_async_exporter_without_runtime() {
1424        // Use `catch_unwind` to catch the panic caused by missing Tokio runtime
1425        let result = std::panic::catch_unwind(|| {
1426            let exporter = LogExporterThatRequiresTokio::new();
1427            let processor = SimpleLogProcessor::new(exporter.clone());
1428
1429            let mut record: SdkLogRecord = SdkLogRecord::new();
1430            let instrumentation: InstrumentationScope = Default::default();
1431
1432            // This will panic because an tokio async operation within exporter without a runtime.
1433            processor.emit(&mut record, &instrumentation);
1434        });
1435
1436        // Verify that the panic occurred and check the panic message for the absence of a Tokio runtime
1437        assert!(
1438            result.is_err(),
1439            "The test should fail due to missing Tokio runtime, but it did not."
1440        );
1441        let panic_payload = result.unwrap_err();
1442        let panic_message = panic_payload
1443            .downcast_ref::<String>()
1444            .map(|s| s.as_str())
1445            .or_else(|| panic_payload.downcast_ref::<&str>().copied())
1446            .unwrap_or("No panic message");
1447
1448        assert!(
1449            panic_message.contains("no reactor running")
1450                || panic_message.contains("must be called from the context of a Tokio 1.x runtime"),
1451            "Expected panic message about missing Tokio runtime, but got: {}",
1452            panic_message
1453        );
1454    }
1455
1456    #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
1457    #[ignore]
1458    // This test demonstrates a potential deadlock scenario in a multi-threaded Tokio runtime.
1459    // It spawns Tokio tasks equal to the number of runtime worker threads (4) to emit log events.
1460    // Each task attempts to acquire a mutex on the exporter in `SimpleLogProcessor::emit`.
1461    // Only one task obtains the lock, while the others are blocked, waiting for its release.
1462    //
1463    // The task holding the lock invokes the LogExporterThatRequiresTokio, which performs an
1464    // asynchronous operation (e.g., network I/O simulated by `tokio::sleep`). This operation
1465    // requires yielding control back to the Tokio runtime to make progress.
1466    //
1467    // However, all worker threads are occupied:
1468    // - One thread is executing the async exporter operation
1469    // - Three threads are blocked waiting for the mutex
1470    //
1471    // This leads to a deadlock as there are no available threads to drive the async operation
1472    // to completion, preventing the mutex from being released. Consequently, neither the blocked
1473    // tasks nor the exporter can proceed.
1474    async fn test_simple_processor_async_exporter_with_all_runtime_worker_threads_blocked() {
1475        let exporter = LogExporterThatRequiresTokio::new();
1476        let processor = Arc::new(SimpleLogProcessor::new(exporter.clone()));
1477
1478        let concurrent_emit = 4; // number of worker threads
1479
1480        let mut handles = vec![];
1481        // try send `concurrent_emit` events concurrently
1482        for _ in 0..concurrent_emit {
1483            let processor_clone = Arc::clone(&processor);
1484            let handle = tokio::spawn(async move {
1485                let mut record: SdkLogRecord = SdkLogRecord::new();
1486                let instrumentation: InstrumentationScope = Default::default();
1487                processor_clone.emit(&mut record, &instrumentation);
1488            });
1489            handles.push(handle);
1490        }
1491
1492        // below code won't get executed
1493        for handle in handles {
1494            handle.await.unwrap();
1495        }
1496        assert_eq!(exporter.len(), concurrent_emit);
1497    }
1498
1499    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
1500    // This test uses a multi-threaded runtime setup with a single worker thread. Note that even
1501    // though only one worker thread is created, it is distinct from the main thread. The processor
1502    // emits a log event, and the exporter performs an async operation that requires the runtime.
1503    // The single worker thread handles this operation without deadlocking, as long as no other
1504    // tasks occupy the runtime.
1505    async fn test_simple_processor_async_exporter_with_runtime() {
1506        let exporter = LogExporterThatRequiresTokio::new();
1507        let processor = SimpleLogProcessor::new(exporter.clone());
1508
1509        let mut record: SdkLogRecord = SdkLogRecord::new();
1510        let instrumentation: InstrumentationScope = Default::default();
1511
1512        processor.emit(&mut record, &instrumentation);
1513
1514        assert_eq!(exporter.len(), 1);
1515    }
1516
1517    #[tokio::test(flavor = "multi_thread")]
1518    // This test uses a multi-threaded runtime setup with the default number of worker threads.
1519    // The processor emits a log event, and the exporter, which requires the runtime for its async
1520    // operations, can access one of the available worker threads to complete its task. As there
1521    // are multiple threads, the exporter can proceed without blocking other tasks, ensuring the
1522    // test completes successfully.
1523    async fn test_simple_processor_async_exporter_with_multi_thread_runtime() {
1524        let exporter = LogExporterThatRequiresTokio::new();
1525
1526        let processor = SimpleLogProcessor::new(exporter.clone());
1527
1528        let mut record: SdkLogRecord = SdkLogRecord::new();
1529        let instrumentation: InstrumentationScope = Default::default();
1530
1531        processor.emit(&mut record, &instrumentation);
1532
1533        assert_eq!(exporter.len(), 1);
1534    }
1535
1536    #[tokio::test(flavor = "current_thread")]
1537    #[ignore]
1538    // This test uses a current-thread runtime, where all operations run on the main thread.
1539    // The processor emits a log event while the runtime is blocked using `futures::block_on`
1540    // to complete the export operation. The exporter, which performs an async operation and
1541    // requires the runtime, cannot progress because the main thread is already blocked.
1542    // This results in a deadlock, as the runtime cannot move forward.
1543    async fn test_simple_processor_async_exporter_with_current_thread_runtime() {
1544        let exporter = LogExporterThatRequiresTokio::new();
1545
1546        let processor = SimpleLogProcessor::new(exporter.clone());
1547
1548        let mut record: SdkLogRecord = SdkLogRecord::new();
1549        let instrumentation: InstrumentationScope = Default::default();
1550
1551        processor.emit(&mut record, &instrumentation);
1552
1553        assert_eq!(exporter.len(), 1);
1554    }
1555}