use crate::error::{OTelSdkError, OTelSdkResult};
use crate::{
    logs::{LogBatch, LogExporter, SdkLogRecord},
    Resource,
};
use std::sync::mpsc::{self, RecvTimeoutError, SyncSender};

#[cfg(feature = "spec_unstable_logs_enabled")]
use opentelemetry::logs::Severity;
use opentelemetry::{otel_debug, otel_error, otel_info, otel_warn, InstrumentationScope};

use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::{cmp::min, env, sync::Mutex};
use std::{
    fmt::{self, Debug, Formatter},
    str::FromStr,
    sync::Arc,
    thread,
    time::Duration,
    time::Instant,
};

/// Environment variable for the delay interval, in milliseconds, between two consecutive exports.
pub(crate) const OTEL_BLRP_SCHEDULE_DELAY: &str = "OTEL_BLRP_SCHEDULE_DELAY";
/// Default delay interval between two consecutive exports: 1 second.
pub(crate) const OTEL_BLRP_SCHEDULE_DELAY_DEFAULT: u64 = 1_000;
/// Environment variable for the maximum allowed time, in milliseconds, to export data.
#[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")]
pub(crate) const OTEL_BLRP_EXPORT_TIMEOUT: &str = "OTEL_BLRP_EXPORT_TIMEOUT";
/// Default maximum allowed time to export data: 30 seconds.
#[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")]
pub(crate) const OTEL_BLRP_EXPORT_TIMEOUT_DEFAULT: u64 = 30_000;
/// Environment variable for the maximum queue size.
pub(crate) const OTEL_BLRP_MAX_QUEUE_SIZE: &str = "OTEL_BLRP_MAX_QUEUE_SIZE";
/// Default maximum queue size.
pub(crate) const OTEL_BLRP_MAX_QUEUE_SIZE_DEFAULT: usize = 2_048;
/// Environment variable for the maximum batch size; must be less than or equal to the maximum queue size.
pub(crate) const OTEL_BLRP_MAX_EXPORT_BATCH_SIZE: &str = "OTEL_BLRP_MAX_EXPORT_BATCH_SIZE";
/// Default maximum batch size.
pub(crate) const OTEL_BLRP_MAX_EXPORT_BATCH_SIZE_DEFAULT: usize = 512;

/// The interface for hooking log record processing and exporting stages into the SDK's
/// logger provider pipeline.
pub trait LogProcessor: Send + Sync + Debug {
    /// Called when a log record is emitted. The record may be mutated before it is passed
    /// to the next processor.
    fn emit(&self, data: &mut SdkLogRecord, instrumentation: &InstrumentationScope);
    /// Force the processor to export any buffered log records.
    fn force_flush(&self) -> OTelSdkResult;
    /// Shut down the processor, flushing remaining records and releasing resources.
    fn shutdown(&self) -> OTelSdkResult;
    /// Check whether a log with the given severity, target and event name would be
    /// processed. The default implementation returns `true`.
    #[cfg(feature = "spec_unstable_logs_enabled")]
    fn event_enabled(&self, _level: Severity, _target: &str, _name: &str) -> bool {
        true
    }

    /// Called when the SDK's `Resource` is set; forwards it to the exporter if needed.
    /// The default implementation is a no-op.
    fn set_resource(&self, _resource: &Resource) {}
}
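
// Editor's sketch (illustrative only, not part of the upstream source): a minimal custom
// `LogProcessor` that prints each record. The `StdoutProcessor` name is hypothetical; a
// real processor would usually forward records to a `LogExporter`.
//
//     #[derive(Debug)]
//     struct StdoutProcessor;
//
//     impl LogProcessor for StdoutProcessor {
//         fn emit(&self, record: &mut SdkLogRecord, _scope: &InstrumentationScope) {
//             println!("{:?}", record);
//         }
//         fn force_flush(&self) -> OTelSdkResult {
//             Ok(())
//         }
//         fn shutdown(&self) -> OTelSdkResult {
//             Ok(())
//         }
//     }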

/// A [`LogProcessor`] that exports each log record synchronously, on the calling thread,
/// as soon as it is emitted. Primarily useful for testing and debugging, where batching
/// is not desirable.
#[derive(Debug)]
pub struct SimpleLogProcessor<T: LogExporter> {
    exporter: Mutex<T>,
    is_shutdown: AtomicBool,
}

impl<T: LogExporter> SimpleLogProcessor<T> {
    pub(crate) fn new(exporter: T) -> Self {
        SimpleLogProcessor {
            exporter: Mutex::new(exporter),
            is_shutdown: AtomicBool::new(false),
        }
    }
}

impl<T: LogExporter> LogProcessor for SimpleLogProcessor<T> {
    fn emit(&self, record: &mut SdkLogRecord, instrumentation: &InstrumentationScope) {
        // Noop after shutdown.
        if self.is_shutdown.load(std::sync::atomic::Ordering::Relaxed) {
            otel_warn!(
                name: "SimpleLogProcessor.Emit.ProcessorShutdown",
            );
            return;
        }

        let result = self
            .exporter
            .lock()
            .map_err(|_| OTelSdkError::InternalFailure("SimpleLogProcessor mutex poison".into()))
            .and_then(|exporter| {
                let log_tuple = &[(record as &SdkLogRecord, instrumentation)];
                futures_executor::block_on(exporter.export(LogBatch::new(log_tuple)))
            });
        match result {
            Err(OTelSdkError::InternalFailure(_)) => {
                otel_debug!(
                    name: "SimpleLogProcessor.Emit.MutexPoisoning",
                );
            }
            Err(err) => {
                otel_error!(
                    name: "SimpleLogProcessor.Emit.ExportError",
                    error = format!("{}", err)
                );
            }
            _ => {}
        }
    }

    fn force_flush(&self) -> OTelSdkResult {
        Ok(())
    }

    fn shutdown(&self) -> OTelSdkResult {
        self.is_shutdown
            .store(true, std::sync::atomic::Ordering::Relaxed);
        if let Ok(mut exporter) = self.exporter.lock() {
            exporter.shutdown()
        } else {
            Err(OTelSdkError::InternalFailure(
                "SimpleLogProcessor mutex poison at shutdown".into(),
            ))
        }
    }

    fn set_resource(&self, resource: &Resource) {
        if let Ok(mut exporter) = self.exporter.lock() {
            exporter.set_resource(resource);
        }
    }
}
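
// Editor's sketch (illustrative only): wiring a SimpleLogProcessor into a provider from
// within this crate, mirroring what the tests below do. `InMemoryLogExporter` is the
// crate's testing exporter; any `LogExporter` implementation works.
//
//     let exporter = InMemoryLogExporter::default();
//     let provider = SdkLoggerProvider::builder()
//         .with_log_processor(SimpleLogProcessor::new(exporter))
//         .build();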

#[allow(clippy::large_enum_variant)]
#[derive(Debug)]
enum BatchMessage {
    /// Sent (at most one in flight) when the queued record count reaches
    /// `max_export_batch_size`; the flag is reset by the worker once processed.
    ExportLog(Arc<AtomicBool>),
    /// Export all currently queued records and report the result on the channel.
    ForceFlush(mpsc::SyncSender<OTelSdkResult>),
    /// Export remaining records, report the result, and exit the worker thread.
    Shutdown(mpsc::SyncSender<OTelSdkResult>),
    /// Forward a new resource to the exporter.
    SetResource(Arc<Resource>),
}

type LogsData = Box<(SdkLogRecord, InstrumentationScope)>;

/// A [`LogProcessor`] that buffers emitted log records and exports them on a dedicated
/// background thread, either when `max_export_batch_size` records have accumulated or
/// when `scheduled_delay` elapses, whichever comes first.
pub struct BatchLogProcessor {
    logs_sender: SyncSender<LogsData>,
    message_sender: SyncSender<BatchMessage>,
    handle: Mutex<Option<thread::JoinHandle<()>>>,
    forceflush_timeout: Duration,
    shutdown_timeout: Duration,
    export_log_message_sent: Arc<AtomicBool>,
    current_batch_size: Arc<AtomicUsize>,
    max_export_batch_size: usize,

    // Total number of records dropped because the queue was full; reported at shutdown.
    dropped_logs_count: AtomicUsize,

    // The configured queue size, kept for the shutdown warning message.
    max_queue_size: usize,
}

impl Debug for BatchLogProcessor {
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
        f.debug_struct("BatchLogProcessor")
            .field("message_sender", &self.message_sender)
            .finish()
    }
}

impl LogProcessor for BatchLogProcessor {
    fn emit(&self, record: &mut SdkLogRecord, instrumentation: &InstrumentationScope) {
        let result = self
            .logs_sender
            .try_send(Box::new((record.clone(), instrumentation.clone())));

        match result {
            Ok(_) => {
                // If the queue has reached the batch-size threshold, wake the worker thread
                // so it can export without waiting for the scheduled delay.
                if self.current_batch_size.fetch_add(1, Ordering::Relaxed) + 1
                    >= self.max_export_batch_size
                {
                    // Ensure that at most one ExportLog control message is in flight.
                    if !self.export_log_message_sent.load(Ordering::Relaxed) {
                        if !self.export_log_message_sent.swap(true, Ordering::Relaxed) {
                            match self.message_sender.try_send(BatchMessage::ExportLog(
                                self.export_log_message_sent.clone(),
                            )) {
                                Ok(_) => {
                                    // ExportLog message sent successfully.
                                }
                                Err(_err) => {
                                    // The control channel is full; reset the flag so a later
                                    // emit can retry.
                                    self.export_log_message_sent.store(false, Ordering::Relaxed);
                                }
                            }
                        }
                    }
                }
            }
            Err(mpsc::TrySendError::Full(_)) => {
                // Increment the dropped-logs count and warn only on the first drop.
                if self.dropped_logs_count.fetch_add(1, Ordering::Relaxed) == 0 {
                    otel_warn!(name: "BatchLogProcessor.LogDroppingStarted",
                        message = "BatchLogProcessor dropped a LogRecord because the queue is full. No further internal log will be emitted for subsequent drops until Shutdown, at which point a log with the exact count of dropped records is emitted.");
                }
            }
            Err(mpsc::TrySendError::Disconnected(_)) => {
                otel_warn!(
                    name: "BatchLogProcessor.Emit.AfterShutdown",
                    message = "Logs are being emitted even after Shutdown. This indicates incorrect lifecycle management of the LoggerProvider in the application. Logs will not be exported."
                );
            }
        }
    }

    fn force_flush(&self) -> OTelSdkResult {
        let (sender, receiver) = mpsc::sync_channel(1);
        match self
            .message_sender
            .try_send(BatchMessage::ForceFlush(sender))
        {
            Ok(_) => receiver
                .recv_timeout(self.forceflush_timeout)
                .map_err(|err| {
                    if err == RecvTimeoutError::Timeout {
                        OTelSdkError::Timeout(self.forceflush_timeout)
                    } else {
                        OTelSdkError::InternalFailure(format!("{}", err))
                    }
                })?,
            Err(mpsc::TrySendError::Full(_)) => {
                otel_debug!(
                    name: "BatchLogProcessor.ForceFlush.ControlChannelFull",
                    message = "Control message to flush the worker thread could not be sent as the control channel is full. This can occur if the user repeatedly calls force_flush/shutdown without waiting for the previous call to finish."
                );
                Err(OTelSdkError::InternalFailure("ForceFlush cannot be performed as the control channel is full. This can occur if the user repeatedly calls force_flush/shutdown without waiting for the previous call to finish.".into()))
            }
            Err(mpsc::TrySendError::Disconnected(_)) => {
                otel_debug!(
                    name: "BatchLogProcessor.ForceFlush.AlreadyShutdown",
                    message = "ForceFlush invoked after Shutdown. This will not perform a flush and indicates incorrect lifecycle management in the application."
                );

                Err(OTelSdkError::AlreadyShutdown)
            }
        }
    }

    fn shutdown(&self) -> OTelSdkResult {
        let dropped_logs = self.dropped_logs_count.load(Ordering::Relaxed);
        let max_queue_size = self.max_queue_size;
        if dropped_logs > 0 {
            otel_warn!(
                name: "BatchLogProcessor.LogsDropped",
                dropped_logs_count = dropped_logs,
                max_queue_size = max_queue_size,
                message = "Logs were dropped due to the queue being full. The count represents the total number of log records dropped over the lifetime of this BatchLogProcessor. Consider increasing the queue size and/or decreasing the scheduled delay."
            );
        }

        let (sender, receiver) = mpsc::sync_channel(1);
        match self.message_sender.try_send(BatchMessage::Shutdown(sender)) {
            Ok(_) => {
                receiver
                    .recv_timeout(self.shutdown_timeout)
                    .map(|_| {
                        // Join the background thread after it acknowledges shutdown, to make
                        // sure it has fully exited before returning.
                        if let Some(handle) = self.handle.lock().unwrap().take() {
                            handle.join().unwrap();
                        }
                        OTelSdkResult::Ok(())
                    })
                    .map_err(|err| match err {
                        RecvTimeoutError::Timeout => {
                            otel_error!(
                                name: "BatchLogProcessor.Shutdown.Timeout",
                                message = "BatchLogProcessor shutdown timed out."
                            );
                            OTelSdkError::Timeout(self.shutdown_timeout)
                        }
                        _ => {
                            otel_error!(
                                name: "BatchLogProcessor.Shutdown.Error",
                                error = format!("{}", err)
                            );
                            OTelSdkError::InternalFailure(format!("{}", err))
                        }
                    })?
            }
            Err(mpsc::TrySendError::Full(_)) => {
                otel_debug!(
                    name: "BatchLogProcessor.Shutdown.ControlChannelFull",
                    message = "Control message to shutdown the worker thread could not be sent as the control channel is full. This can occur if the user repeatedly calls force_flush/shutdown without waiting for the previous call to finish."
                );
                Err(OTelSdkError::InternalFailure("Shutdown cannot be performed as the control channel is full. This can occur if the user repeatedly calls force_flush/shutdown without waiting for the previous call to finish.".into()))
            }
            Err(mpsc::TrySendError::Disconnected(_)) => {
                otel_debug!(
                    name: "BatchLogProcessor.Shutdown.AlreadyShutdown",
                    message = "Shutdown is being invoked more than once. This is a no-op, but indicates a potential issue in the application's lifecycle management."
                );

                Err(OTelSdkError::AlreadyShutdown)
            }
        }
    }

    fn set_resource(&self, resource: &Resource) {
        let resource = Arc::new(resource.clone());
        let _ = self
            .message_sender
            .try_send(BatchMessage::SetResource(resource));
    }
}

impl BatchLogProcessor {
    pub(crate) fn new<E>(mut exporter: E, config: BatchConfig) -> Self
    where
        E: LogExporter + Send + Sync + 'static,
    {
        let (logs_sender, logs_receiver) = mpsc::sync_channel::<LogsData>(config.max_queue_size);
        // Control channel for flush/shutdown/resource messages; capacity is intentionally small.
        let (message_sender, message_receiver) = mpsc::sync_channel::<BatchMessage>(64);
        let max_queue_size = config.max_queue_size;
        let max_export_batch_size = config.max_export_batch_size;
        let current_batch_size = Arc::new(AtomicUsize::new(0));
        let current_batch_size_for_thread = current_batch_size.clone();

        let handle = thread::Builder::new()
            .name("OpenTelemetry.Logs.BatchProcessor".to_string())
            .spawn(move || {
                otel_info!(
                    name: "BatchLogProcessor.ThreadStarted",
                    interval_in_millisecs = config.scheduled_delay.as_millis(),
                    max_export_batch_size = config.max_export_batch_size,
                    max_queue_size = max_queue_size,
                );
                let mut last_export_time = Instant::now();
                let mut logs = Vec::with_capacity(config.max_export_batch_size);
                let current_batch_size = current_batch_size_for_thread;

                // Drains queued records in batches of at most `max_export_batch_size` and
                // exports them, returning the result of the last export. Expects `logs` to
                // be empty when called.
                #[inline]
                fn get_logs_and_export<E>(
                    logs_receiver: &mpsc::Receiver<LogsData>,
                    exporter: &E,
                    logs: &mut Vec<LogsData>,
                    last_export_time: &mut Instant,
                    current_batch_size: &AtomicUsize,
                    config: &BatchConfig,
                ) -> OTelSdkResult
                where
                    E: LogExporter + Send + Sync + 'static,
                {
                    // `target` is the number of records queued at the time of the call and
                    // serves as the stopping criterion for this export round.
                    let target = current_batch_size.load(Ordering::Relaxed);
                    let mut result = OTelSdkResult::Ok(());
                    let mut total_exported_logs: usize = 0;

                    while target > 0 && total_exported_logs < target {
                        // Move up to `max_export_batch_size` records from the channel into the buffer.
                        while let Ok(log) = logs_receiver.try_recv() {
                            logs.push(log);
                            if logs.len() == config.max_export_batch_size {
                                break;
                            }
                        }

                        let count_of_logs = logs.len();
                        total_exported_logs += count_of_logs;

                        // `export_batch_sync` clears the buffer after exporting.
                        result = export_batch_sync(exporter, logs, last_export_time);
                        current_batch_size.fetch_sub(count_of_logs, Ordering::Relaxed);
                    }
                    result
                }

                loop {
                    let remaining_time = config
                        .scheduled_delay
                        .checked_sub(last_export_time.elapsed())
                        .unwrap_or(config.scheduled_delay);

                    match message_receiver.recv_timeout(remaining_time) {
                        Ok(BatchMessage::ExportLog(export_log_message_sent)) => {
                            // Reset the flag so emit() can send another ExportLog message
                            // once the threshold is reached again.
                            export_log_message_sent.store(false, Ordering::Relaxed);

                            otel_debug!(
                                name: "BatchLogProcessor.ExportingDueToBatchSize",
                            );

                            let _ = get_logs_and_export(
                                &logs_receiver,
                                &exporter,
                                &mut logs,
                                &mut last_export_time,
                                &current_batch_size,
                                &config,
                            );
                        }
                        Ok(BatchMessage::ForceFlush(sender)) => {
                            otel_debug!(name: "BatchLogProcessor.ExportingDueToForceFlush");
                            let result = get_logs_and_export(
                                &logs_receiver,
                                &exporter,
                                &mut logs,
                                &mut last_export_time,
                                &current_batch_size,
                                &config,
                            );
                            let _ = sender.send(result);
                        }
                        Ok(BatchMessage::Shutdown(sender)) => {
                            otel_debug!(name: "BatchLogProcessor.ExportingDueToShutdown");
                            let result = get_logs_and_export(
                                &logs_receiver,
                                &exporter,
                                &mut logs,
                                &mut last_export_time,
                                &current_batch_size,
                                &config,
                            );
                            let _ = sender.send(result);

                            otel_debug!(
                                name: "BatchLogProcessor.ThreadExiting",
                                reason = "ShutdownRequested"
                            );
                            // Break out of the loop and let the background thread exit.
                            break;
                        }
                        Ok(BatchMessage::SetResource(resource)) => {
                            exporter.set_resource(&resource);
                        }
                        Err(RecvTimeoutError::Timeout) => {
                            otel_debug!(
                                name: "BatchLogProcessor.ExportingDueToTimer",
                            );

                            let _ = get_logs_and_export(
                                &logs_receiver,
                                &exporter,
                                &mut logs,
                                &mut last_export_time,
                                &current_batch_size,
                                &config,
                            );
                        }
                        Err(RecvTimeoutError::Disconnected) => {
                            // The sender side is gone; the only thing left to do is exit.
                            otel_debug!(
                                name: "BatchLogProcessor.ThreadExiting",
                                reason = "MessageSenderDisconnected"
                            );
                            break;
                        }
                    }
                }
                otel_info!(
                    name: "BatchLogProcessor.ThreadStopped"
                );
            })
            .expect("Thread spawn failed.");

        BatchLogProcessor {
            logs_sender,
            message_sender,
            handle: Mutex::new(Some(handle)),
            forceflush_timeout: Duration::from_secs(5),
            shutdown_timeout: Duration::from_secs(5),
            dropped_logs_count: AtomicUsize::new(0),
            max_queue_size,
            export_log_message_sent: Arc::new(AtomicBool::new(false)),
            current_batch_size,
            max_export_batch_size,
        }
    }

    /// Create a [`BatchLogProcessorBuilder`] for the given exporter, starting from the
    /// default batch configuration.
    pub fn builder<E>(exporter: E) -> BatchLogProcessorBuilder<E>
    where
        E: LogExporter,
    {
        BatchLogProcessorBuilder {
            exporter,
            config: Default::default(),
        }
    }
}
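
// Editor's sketch (illustrative only): building a BatchLogProcessor with a custom batch
// configuration and installing it on a provider. `InMemoryLogExporter` is the crate's
// testing exporter; any `LogExporter` works.
//
//     let processor = BatchLogProcessor::builder(InMemoryLogExporter::default())
//         .with_batch_config(
//             BatchConfigBuilder::default()
//                 .with_max_queue_size(4096)
//                 .with_max_export_batch_size(512)
//                 .with_scheduled_delay(Duration::from_millis(500))
//                 .build(),
//         )
//         .build();
//     let provider = SdkLoggerProvider::builder()
//         .with_log_processor(processor)
//         .build();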

#[allow(clippy::vec_box)]
fn export_batch_sync<E>(
    exporter: &E,
    batch: &mut Vec<Box<(SdkLogRecord, InstrumentationScope)>>,
    last_export_time: &mut Instant,
) -> OTelSdkResult
where
    E: LogExporter + ?Sized,
{
    *last_export_time = Instant::now();

    if batch.is_empty() {
        return OTelSdkResult::Ok(());
    }

    let export = exporter.export(LogBatch::new_with_owned_data(batch.as_slice()));
    let export_result = futures_executor::block_on(export);

    // Clear the buffer so it can be reused for the next batch.
    batch.clear();

    match export_result {
        Ok(_) => OTelSdkResult::Ok(()),
        Err(err) => {
            otel_error!(
                name: "BatchLogProcessor.ExportError",
                error = format!("{}", err)
            );
            OTelSdkResult::Err(err)
        }
    }
}

/// A builder for creating [`BatchLogProcessor`] instances.
#[derive(Debug)]
pub struct BatchLogProcessorBuilder<E> {
    exporter: E,
    config: BatchConfig,
}

impl<E> BatchLogProcessorBuilder<E>
where
    E: LogExporter + 'static,
{
    /// Set the [`BatchConfig`] for the processor being built.
    pub fn with_batch_config(self, config: BatchConfig) -> Self {
        BatchLogProcessorBuilder { config, ..self }
    }

    /// Build a [`BatchLogProcessor`] from the configured exporter and batch config.
    pub fn build(self) -> BatchLogProcessor {
        BatchLogProcessor::new(self.exporter, self.config)
    }
}

/// Configuration for the batch log processor.
#[derive(Debug)]
#[allow(dead_code)]
pub struct BatchConfig {
    /// The maximum queue size to buffer log records for delayed processing. If the queue
    /// is full, log records are dropped. The default value is 2048.
    pub(crate) max_queue_size: usize,

    /// The delay interval between two consecutive exports. The default value is 1 second.
    pub(crate) scheduled_delay: Duration,

    /// The maximum number of log records to export in a single batch. If there is more
    /// than one batch worth of records queued, batches are exported back to back without
    /// delay. The default value is 512.
    pub(crate) max_export_batch_size: usize,

    /// The maximum duration allowed for exporting a single batch of data.
    #[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")]
    pub(crate) max_export_timeout: Duration,
}

impl Default for BatchConfig {
    fn default() -> Self {
        BatchConfigBuilder::default().build()
    }
}

/// A builder for creating [`BatchConfig`] instances.
#[derive(Debug)]
pub struct BatchConfigBuilder {
    max_queue_size: usize,
    scheduled_delay: Duration,
    max_export_batch_size: usize,
    #[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")]
    max_export_timeout: Duration,
}

impl Default for BatchConfigBuilder {
    /// Start from the spec-defined defaults, then override them with any `OTEL_BLRP_*`
    /// environment variables that are set.
    fn default() -> Self {
        BatchConfigBuilder {
            max_queue_size: OTEL_BLRP_MAX_QUEUE_SIZE_DEFAULT,
            scheduled_delay: Duration::from_millis(OTEL_BLRP_SCHEDULE_DELAY_DEFAULT),
            max_export_batch_size: OTEL_BLRP_MAX_EXPORT_BATCH_SIZE_DEFAULT,
            #[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")]
            max_export_timeout: Duration::from_millis(OTEL_BLRP_EXPORT_TIMEOUT_DEFAULT),
        }
        .init_from_env_vars()
    }
}

impl BatchConfigBuilder {
    /// Set the maximum queue size. After the queue is full, log records are dropped.
    pub fn with_max_queue_size(mut self, max_queue_size: usize) -> Self {
        self.max_queue_size = max_queue_size;
        self
    }

    /// Set the delay interval between two consecutive exports.
    pub fn with_scheduled_delay(mut self, scheduled_delay: Duration) -> Self {
        self.scheduled_delay = scheduled_delay;
        self
    }

    /// Set the maximum duration allowed for exporting a single batch of data.
    #[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")]
    pub fn with_max_export_timeout(mut self, max_export_timeout: Duration) -> Self {
        self.max_export_timeout = max_export_timeout;
        self
    }

    /// Set the maximum number of log records to export in a single batch.
    pub fn with_max_export_batch_size(mut self, max_export_batch_size: usize) -> Self {
        self.max_export_batch_size = max_export_batch_size;
        self
    }

    /// Build a [`BatchConfig`] from the current settings.
    pub fn build(self) -> BatchConfig {
        // `max_export_batch_size` must not exceed `max_queue_size`; clamp it if necessary.
        let max_export_batch_size = min(self.max_export_batch_size, self.max_queue_size);

        BatchConfig {
            max_queue_size: self.max_queue_size,
            scheduled_delay: self.scheduled_delay,
            #[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")]
            max_export_timeout: self.max_export_timeout,
            max_export_batch_size,
        }
    }

    fn init_from_env_vars(mut self) -> Self {
        if let Some(max_queue_size) = env::var(OTEL_BLRP_MAX_QUEUE_SIZE)
            .ok()
            .and_then(|queue_size| usize::from_str(&queue_size).ok())
        {
            self.max_queue_size = max_queue_size;
        }

        if let Some(max_export_batch_size) = env::var(OTEL_BLRP_MAX_EXPORT_BATCH_SIZE)
            .ok()
            .and_then(|batch_size| usize::from_str(&batch_size).ok())
        {
            self.max_export_batch_size = max_export_batch_size;
        }

        if let Some(scheduled_delay) = env::var(OTEL_BLRP_SCHEDULE_DELAY)
            .ok()
            .and_then(|delay| u64::from_str(&delay).ok())
        {
            self.scheduled_delay = Duration::from_millis(scheduled_delay);
        }

        #[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")]
        if let Some(max_export_timeout) = env::var(OTEL_BLRP_EXPORT_TIMEOUT)
            .ok()
            .and_then(|s| u64::from_str(&s).ok())
        {
            self.max_export_timeout = Duration::from_millis(max_export_timeout);
        }

        self
    }
}
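
// Editor's sketch (illustrative only) of the configuration precedence implemented above:
// `BatchConfigBuilder::default()` starts from the spec defaults, then applies any
// `OTEL_BLRP_*` environment variables, and explicit `with_*` calls override both.
// `build()` clamps `max_export_batch_size` to `max_queue_size`.
//
//     // With OTEL_BLRP_MAX_QUEUE_SIZE=256 set in the environment:
//     let config = BatchConfigBuilder::default()  // max_queue_size = 256 (from env)
//         .with_max_export_batch_size(1024)       // requested 1024...
//         .build();                               // ...clamped to 256 by build()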

#[cfg(all(test, feature = "testing", feature = "logs"))]
mod tests {
    use super::{
        BatchLogProcessor, OTEL_BLRP_MAX_EXPORT_BATCH_SIZE, OTEL_BLRP_MAX_QUEUE_SIZE,
        OTEL_BLRP_SCHEDULE_DELAY,
    };
    #[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")]
    use super::{OTEL_BLRP_EXPORT_TIMEOUT, OTEL_BLRP_EXPORT_TIMEOUT_DEFAULT};
    use crate::logs::{LogBatch, LogExporter, SdkLogRecord};
    use crate::{
        error::OTelSdkResult,
        logs::{
            log_processor::{
                OTEL_BLRP_MAX_EXPORT_BATCH_SIZE_DEFAULT, OTEL_BLRP_MAX_QUEUE_SIZE_DEFAULT,
                OTEL_BLRP_SCHEDULE_DELAY_DEFAULT,
            },
            BatchConfig, BatchConfigBuilder, InMemoryLogExporter, InMemoryLogExporterBuilder,
            LogProcessor, SdkLoggerProvider, SimpleLogProcessor,
        },
        Resource,
    };
    use opentelemetry::logs::AnyValue;
    use opentelemetry::logs::LogRecord as _;
    use opentelemetry::logs::{Logger, LoggerProvider};
    use opentelemetry::KeyValue;
    use opentelemetry::{InstrumentationScope, Key};
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::sync::{Arc, Mutex};
    use std::time::Duration;

    #[derive(Debug, Clone)]
    struct MockLogExporter {
        resource: Arc<Mutex<Option<Resource>>>,
    }

    impl LogExporter for MockLogExporter {
        #[allow(clippy::manual_async_fn)]
        fn export(
            &self,
            _batch: LogBatch<'_>,
        ) -> impl std::future::Future<Output = OTelSdkResult> + Send {
            async { Ok(()) }
        }

        fn shutdown(&mut self) -> OTelSdkResult {
            Ok(())
        }

        fn set_resource(&mut self, resource: &Resource) {
            self.resource
                .lock()
                .map(|mut res_opt| {
                    res_opt.replace(resource.clone());
                })
                .expect("mock log exporter shouldn't error when setting resource");
        }
    }

    // Helper specific to MockLogExporter, not part of the LogExporter trait.
    impl MockLogExporter {
        fn get_resource(&self) -> Option<Resource> {
            (*self.resource).lock().unwrap().clone()
        }
    }

    #[test]
    fn test_default_const_values() {
        assert_eq!(OTEL_BLRP_SCHEDULE_DELAY, "OTEL_BLRP_SCHEDULE_DELAY");
        assert_eq!(OTEL_BLRP_SCHEDULE_DELAY_DEFAULT, 1_000);
        #[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")]
        assert_eq!(OTEL_BLRP_EXPORT_TIMEOUT, "OTEL_BLRP_EXPORT_TIMEOUT");
        #[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")]
        assert_eq!(OTEL_BLRP_EXPORT_TIMEOUT_DEFAULT, 30_000);
        assert_eq!(OTEL_BLRP_MAX_QUEUE_SIZE, "OTEL_BLRP_MAX_QUEUE_SIZE");
        assert_eq!(OTEL_BLRP_MAX_QUEUE_SIZE_DEFAULT, 2_048);
        assert_eq!(
            OTEL_BLRP_MAX_EXPORT_BATCH_SIZE,
            "OTEL_BLRP_MAX_EXPORT_BATCH_SIZE"
        );
        assert_eq!(OTEL_BLRP_MAX_EXPORT_BATCH_SIZE_DEFAULT, 512);
    }

    #[test]
    fn test_default_batch_config_adheres_to_specification() {
        let env_vars = vec![
            OTEL_BLRP_SCHEDULE_DELAY,
            #[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")]
            OTEL_BLRP_EXPORT_TIMEOUT,
            OTEL_BLRP_MAX_QUEUE_SIZE,
            OTEL_BLRP_MAX_EXPORT_BATCH_SIZE,
        ];

        let config = temp_env::with_vars_unset(env_vars, BatchConfig::default);

        assert_eq!(
            config.scheduled_delay,
            Duration::from_millis(OTEL_BLRP_SCHEDULE_DELAY_DEFAULT)
        );
        #[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")]
        assert_eq!(
            config.max_export_timeout,
            Duration::from_millis(OTEL_BLRP_EXPORT_TIMEOUT_DEFAULT)
        );
        assert_eq!(config.max_queue_size, OTEL_BLRP_MAX_QUEUE_SIZE_DEFAULT);
        assert_eq!(
            config.max_export_batch_size,
            OTEL_BLRP_MAX_EXPORT_BATCH_SIZE_DEFAULT
        );
    }

    #[test]
    fn test_batch_config_configurable_by_env_vars() {
        let env_vars = vec![
            (OTEL_BLRP_SCHEDULE_DELAY, Some("2000")),
            #[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")]
            (OTEL_BLRP_EXPORT_TIMEOUT, Some("60000")),
            (OTEL_BLRP_MAX_QUEUE_SIZE, Some("4096")),
            (OTEL_BLRP_MAX_EXPORT_BATCH_SIZE, Some("1024")),
        ];

        let config = temp_env::with_vars(env_vars, BatchConfig::default);

        assert_eq!(config.scheduled_delay, Duration::from_millis(2000));
        #[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")]
        assert_eq!(config.max_export_timeout, Duration::from_millis(60000));
        assert_eq!(config.max_queue_size, 4096);
        assert_eq!(config.max_export_batch_size, 1024);
    }

    #[test]
    fn test_batch_config_max_export_batch_size_validation() {
        let env_vars = vec![
            (OTEL_BLRP_MAX_QUEUE_SIZE, Some("256")),
            (OTEL_BLRP_MAX_EXPORT_BATCH_SIZE, Some("1024")),
        ];

        let config = temp_env::with_vars(env_vars, BatchConfig::default);

        assert_eq!(config.max_queue_size, 256);
        assert_eq!(config.max_export_batch_size, 256);
        assert_eq!(
            config.scheduled_delay,
            Duration::from_millis(OTEL_BLRP_SCHEDULE_DELAY_DEFAULT)
        );
        #[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")]
        assert_eq!(
            config.max_export_timeout,
            Duration::from_millis(OTEL_BLRP_EXPORT_TIMEOUT_DEFAULT)
        );
    }

    #[test]
    fn test_batch_config_with_fields() {
        let batch_builder = BatchConfigBuilder::default()
            .with_max_export_batch_size(1)
            .with_scheduled_delay(Duration::from_millis(2))
            .with_max_queue_size(4);

        #[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")]
        let batch_builder = batch_builder.with_max_export_timeout(Duration::from_millis(3));
        let batch = batch_builder.build();

        assert_eq!(batch.max_export_batch_size, 1);
        assert_eq!(batch.scheduled_delay, Duration::from_millis(2));
        #[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")]
        assert_eq!(batch.max_export_timeout, Duration::from_millis(3));
        assert_eq!(batch.max_queue_size, 4);
    }

    #[test]
    fn test_build_batch_log_processor_builder() {
        let mut env_vars = vec![
            (OTEL_BLRP_MAX_EXPORT_BATCH_SIZE, Some("500")),
            (OTEL_BLRP_SCHEDULE_DELAY, Some("I am not number")),
            #[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")]
            (OTEL_BLRP_EXPORT_TIMEOUT, Some("2046")),
        ];
        temp_env::with_vars(env_vars.clone(), || {
            let builder = BatchLogProcessor::builder(InMemoryLogExporter::default());

            assert_eq!(builder.config.max_export_batch_size, 500);
            assert_eq!(
                builder.config.scheduled_delay,
                Duration::from_millis(OTEL_BLRP_SCHEDULE_DELAY_DEFAULT)
            );
            assert_eq!(
                builder.config.max_queue_size,
                OTEL_BLRP_MAX_QUEUE_SIZE_DEFAULT
            );

            #[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")]
            assert_eq!(
                builder.config.max_export_timeout,
                Duration::from_millis(2046)
            );
        });

        env_vars.push((OTEL_BLRP_MAX_QUEUE_SIZE, Some("120")));

        temp_env::with_vars(env_vars, || {
            let builder = BatchLogProcessor::builder(InMemoryLogExporter::default());
            assert_eq!(builder.config.max_export_batch_size, 120);
            assert_eq!(builder.config.max_queue_size, 120);
        });
    }

    #[test]
    fn test_build_batch_log_processor_builder_with_custom_config() {
        let expected = BatchConfigBuilder::default()
            .with_max_export_batch_size(1)
            .with_scheduled_delay(Duration::from_millis(2))
            .with_max_queue_size(4)
            .build();

        let builder =
            BatchLogProcessor::builder(InMemoryLogExporter::default()).with_batch_config(expected);

        let actual = &builder.config;
        assert_eq!(actual.max_export_batch_size, 1);
        assert_eq!(actual.scheduled_delay, Duration::from_millis(2));
        assert_eq!(actual.max_queue_size, 4);
    }

    #[test]
    fn test_set_resource_simple_processor() {
        let exporter = MockLogExporter {
            resource: Arc::new(Mutex::new(None)),
        };
        let processor = SimpleLogProcessor::new(exporter.clone());
        let _ = SdkLoggerProvider::builder()
            .with_log_processor(processor)
            .with_resource(
                Resource::builder_empty()
                    .with_attributes([
                        KeyValue::new("k1", "v1"),
                        KeyValue::new("k2", "v3"),
                        KeyValue::new("k3", "v3"),
                        KeyValue::new("k4", "v4"),
                        KeyValue::new("k5", "v5"),
                    ])
                    .build(),
            )
            .build();
        assert_eq!(exporter.get_resource().unwrap().into_iter().count(), 5);
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
    async fn test_set_resource_batch_processor() {
        let exporter = MockLogExporter {
            resource: Arc::new(Mutex::new(None)),
        };
        let processor = BatchLogProcessor::new(exporter.clone(), BatchConfig::default());
        let provider = SdkLoggerProvider::builder()
            .with_log_processor(processor)
            .with_resource(
                Resource::builder_empty()
                    .with_attributes([
                        KeyValue::new("k1", "v1"),
                        KeyValue::new("k2", "v3"),
                        KeyValue::new("k3", "v3"),
                        KeyValue::new("k4", "v4"),
                        KeyValue::new("k5", "v5"),
                    ])
                    .build(),
            )
            .build();

        // Give the background thread a moment to process the SetResource message.
        tokio::time::sleep(Duration::from_millis(100)).await;

        assert_eq!(exporter.get_resource().unwrap().into_iter().count(), 5);
        let _ = provider.shutdown();
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_batch_shutdown() {
        let exporter = InMemoryLogExporterBuilder::default()
            .keep_records_on_shutdown()
            .build();
        let processor = BatchLogProcessor::new(exporter.clone(), BatchConfig::default());

        let mut record = SdkLogRecord::new();
        let instrumentation = InstrumentationScope::default();

        processor.emit(&mut record, &instrumentation);
        processor.force_flush().unwrap();
        processor.shutdown().unwrap();
        // This emit should be a no-op since the processor has been shut down.
        processor.emit(&mut record, &instrumentation);
        assert_eq!(1, exporter.get_emitted_logs().unwrap().len())
    }

    #[test]
    fn test_simple_shutdown() {
        let exporter = InMemoryLogExporterBuilder::default()
            .keep_records_on_shutdown()
            .build();
        let processor = SimpleLogProcessor::new(exporter.clone());

        let mut record: SdkLogRecord = SdkLogRecord::new();
        let instrumentation: InstrumentationScope = Default::default();

        processor.emit(&mut record, &instrumentation);

        processor.shutdown().unwrap();

        let is_shutdown = processor
            .is_shutdown
            .load(std::sync::atomic::Ordering::Relaxed);
        assert!(is_shutdown);

        processor.emit(&mut record, &instrumentation);

        assert_eq!(1, exporter.get_emitted_logs().unwrap().len())
    }

    #[tokio::test(flavor = "current_thread")]
    async fn test_batch_log_processor_shutdown_under_async_runtime_current_flavor_multi_thread() {
        let exporter = InMemoryLogExporterBuilder::default().build();
        let processor = BatchLogProcessor::new(exporter.clone(), BatchConfig::default());

        processor.shutdown().unwrap();
    }

    #[tokio::test(flavor = "current_thread")]
    async fn test_batch_log_processor_shutdown_with_async_runtime_current_flavor_current_thread() {
        let exporter = InMemoryLogExporterBuilder::default().build();
        let processor = BatchLogProcessor::new(exporter.clone(), BatchConfig::default());
        processor.shutdown().unwrap();
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_batch_log_processor_shutdown_with_async_runtime_multi_flavor_multi_thread() {
        let exporter = InMemoryLogExporterBuilder::default().build();
        let processor = BatchLogProcessor::new(exporter.clone(), BatchConfig::default());
        processor.shutdown().unwrap();
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_batch_log_processor_shutdown_with_async_runtime_multi_flavor_current_thread() {
        let exporter = InMemoryLogExporterBuilder::default().build();
        let processor = BatchLogProcessor::new(exporter.clone(), BatchConfig::default());
        processor.shutdown().unwrap();
    }

    #[derive(Debug)]
    struct FirstProcessor {
        pub(crate) logs: Arc<Mutex<Vec<(SdkLogRecord, InstrumentationScope)>>>,
    }

    impl LogProcessor for FirstProcessor {
        fn emit(&self, record: &mut SdkLogRecord, instrumentation: &InstrumentationScope) {
            // Add an attribute and update the body so the next processor can observe the mutation.
            record.add_attribute(
                Key::from_static_str("processed_by"),
                AnyValue::String("FirstProcessor".into()),
            );
            record.body = Some("Updated by FirstProcessor".into());

            self.logs
                .lock()
                .unwrap()
                .push((record.clone(), instrumentation.clone()));
        }

        fn force_flush(&self) -> OTelSdkResult {
            Ok(())
        }

        fn shutdown(&self) -> OTelSdkResult {
            Ok(())
        }
    }

    #[derive(Debug)]
    struct SecondProcessor {
        pub(crate) logs: Arc<Mutex<Vec<(SdkLogRecord, InstrumentationScope)>>>,
    }

    impl LogProcessor for SecondProcessor {
        fn emit(&self, record: &mut SdkLogRecord, instrumentation: &InstrumentationScope) {
            assert!(record.attributes_contains(
                &Key::from_static_str("processed_by"),
                &AnyValue::String("FirstProcessor".into())
            ));
            assert!(
                record.body.clone().unwrap()
                    == AnyValue::String("Updated by FirstProcessor".into())
            );
            self.logs
                .lock()
                .unwrap()
                .push((record.clone(), instrumentation.clone()));
        }

        fn force_flush(&self) -> OTelSdkResult {
            Ok(())
        }

        fn shutdown(&self) -> OTelSdkResult {
            Ok(())
        }
    }

    #[test]
    fn test_log_data_modification_by_multiple_processors() {
        let first_processor_logs = Arc::new(Mutex::new(Vec::new()));
        let second_processor_logs = Arc::new(Mutex::new(Vec::new()));

        let first_processor = FirstProcessor {
            logs: Arc::clone(&first_processor_logs),
        };
        let second_processor = SecondProcessor {
            logs: Arc::clone(&second_processor_logs),
        };

        let logger_provider = SdkLoggerProvider::builder()
            .with_log_processor(first_processor)
            .with_log_processor(second_processor)
            .build();

        let logger = logger_provider.logger("test-logger");
        let mut log_record = logger.create_log_record();
        log_record.body = Some(AnyValue::String("Test log".into()));

        logger.emit(log_record);

        assert_eq!(first_processor_logs.lock().unwrap().len(), 1);
        assert_eq!(second_processor_logs.lock().unwrap().len(), 1);

        let first_log = &first_processor_logs.lock().unwrap()[0];
        let second_log = &second_processor_logs.lock().unwrap()[0];

        assert!(first_log.0.attributes_contains(
            &Key::from_static_str("processed_by"),
            &AnyValue::String("FirstProcessor".into())
        ));
        assert!(second_log.0.attributes_contains(
            &Key::from_static_str("processed_by"),
            &AnyValue::String("FirstProcessor".into())
        ));

        assert!(
            first_log.0.body.clone().unwrap()
                == AnyValue::String("Updated by FirstProcessor".into())
        );
        assert!(
            second_log.0.body.clone().unwrap()
                == AnyValue::String("Updated by FirstProcessor".into())
        );
    }

    #[test]
    fn test_simple_processor_sync_exporter_without_runtime() {
        let exporter = InMemoryLogExporterBuilder::default().build();
        let processor = SimpleLogProcessor::new(exporter.clone());

        let mut record: SdkLogRecord = SdkLogRecord::new();
        let instrumentation: InstrumentationScope = Default::default();

        processor.emit(&mut record, &instrumentation);

        assert_eq!(exporter.get_emitted_logs().unwrap().len(), 1);
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
    async fn test_simple_processor_sync_exporter_with_runtime() {
        let exporter = InMemoryLogExporterBuilder::default().build();
        let processor = SimpleLogProcessor::new(exporter.clone());

        let mut record: SdkLogRecord = SdkLogRecord::new();
        let instrumentation: InstrumentationScope = Default::default();

        processor.emit(&mut record, &instrumentation);

        assert_eq!(exporter.get_emitted_logs().unwrap().len(), 1);
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_simple_processor_sync_exporter_with_multi_thread_runtime() {
        let exporter = InMemoryLogExporterBuilder::default().build();
        let processor = Arc::new(SimpleLogProcessor::new(exporter.clone()));

        let mut handles = vec![];
        for _ in 0..10 {
            let processor_clone = Arc::clone(&processor);
            let handle = tokio::spawn(async move {
                let mut record: SdkLogRecord = SdkLogRecord::new();
                let instrumentation: InstrumentationScope = Default::default();
                processor_clone.emit(&mut record, &instrumentation);
            });
            handles.push(handle);
        }

        for handle in handles {
            handle.await.unwrap();
        }

        assert_eq!(exporter.get_emitted_logs().unwrap().len(), 10);
    }

    #[tokio::test(flavor = "current_thread")]
    async fn test_simple_processor_sync_exporter_with_current_thread_runtime() {
        let exporter = InMemoryLogExporterBuilder::default().build();
        let processor = SimpleLogProcessor::new(exporter.clone());

        let mut record: SdkLogRecord = SdkLogRecord::new();
        let instrumentation: InstrumentationScope = Default::default();

        processor.emit(&mut record, &instrumentation);

        assert_eq!(exporter.get_emitted_logs().unwrap().len(), 1);
    }

    #[derive(Debug, Clone)]
    struct LogExporterThatRequiresTokio {
        export_count: Arc<AtomicUsize>,
    }

    impl LogExporterThatRequiresTokio {
        /// Creates a new instance of `LogExporterThatRequiresTokio`.
        fn new() -> Self {
            LogExporterThatRequiresTokio {
                export_count: Arc::new(AtomicUsize::new(0)),
            }
        }

        /// Returns the number of log records exported so far.
        fn len(&self) -> usize {
            self.export_count.load(Ordering::Acquire)
        }
    }

    impl LogExporter for LogExporterThatRequiresTokio {
        #[allow(clippy::manual_async_fn)]
        fn export(
            &self,
            batch: LogBatch<'_>,
        ) -> impl std::future::Future<Output = OTelSdkResult> + Send {
            async move {
                // Simulate a dependency on the Tokio runtime: `tokio::time::sleep` panics
                // if no reactor is running.
                tokio::time::sleep(Duration::from_millis(50)).await;

                for _ in batch.iter() {
                    self.export_count.fetch_add(1, Ordering::Acquire);
                }
                Ok(())
            }
        }
    }

    #[test]
    fn test_simple_processor_async_exporter_without_runtime() {
        // Use `catch_unwind` to capture the panic caused by the missing Tokio runtime.
        let result = std::panic::catch_unwind(|| {
            let exporter = LogExporterThatRequiresTokio::new();
            let processor = SimpleLogProcessor::new(exporter.clone());

            let mut record: SdkLogRecord = SdkLogRecord::new();
            let instrumentation: InstrumentationScope = Default::default();

            // This panics because the exporter's future is driven by `block_on`
            // outside of any Tokio runtime.
            processor.emit(&mut record, &instrumentation);
        });

        // Verify that the panic occurred and that its message mentions the missing runtime.
        assert!(
            result.is_err(),
            "The test should fail due to missing Tokio runtime, but it did not."
        );
        let panic_payload = result.unwrap_err();
        let panic_message = panic_payload
            .downcast_ref::<String>()
            .map(|s| s.as_str())
            .or_else(|| panic_payload.downcast_ref::<&str>().copied())
            .unwrap_or("No panic message");

        assert!(
            panic_message.contains("no reactor running")
                || panic_message.contains("must be called from the context of a Tokio 1.x runtime"),
            "Expected panic message about missing Tokio runtime, but got: {}",
            panic_message
        );
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
    #[ignore]
    async fn test_simple_processor_async_exporter_with_all_runtime_worker_threads_blocked() {
        let exporter = LogExporterThatRequiresTokio::new();
        let processor = Arc::new(SimpleLogProcessor::new(exporter.clone()));

        // Matches the number of Tokio worker threads, so every worker can be occupied at once.
        let concurrent_emit = 4;
        let mut handles = vec![];
        for _ in 0..concurrent_emit {
            let processor_clone = Arc::clone(&processor);
            let handle = tokio::spawn(async move {
                let mut record: SdkLogRecord = SdkLogRecord::new();
                let instrumentation: InstrumentationScope = Default::default();
                processor_clone.emit(&mut record, &instrumentation);
            });
            handles.push(handle);
        }

        for handle in handles {
            handle.await.unwrap();
        }
        assert_eq!(exporter.len(), concurrent_emit);
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
    async fn test_simple_processor_async_exporter_with_runtime() {
        let exporter = LogExporterThatRequiresTokio::new();
        let processor = SimpleLogProcessor::new(exporter.clone());

        let mut record: SdkLogRecord = SdkLogRecord::new();
        let instrumentation: InstrumentationScope = Default::default();

        processor.emit(&mut record, &instrumentation);

        assert_eq!(exporter.len(), 1);
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_simple_processor_async_exporter_with_multi_thread_runtime() {
        let exporter = LogExporterThatRequiresTokio::new();

        let processor = SimpleLogProcessor::new(exporter.clone());

        let mut record: SdkLogRecord = SdkLogRecord::new();
        let instrumentation: InstrumentationScope = Default::default();

        processor.emit(&mut record, &instrumentation);

        assert_eq!(exporter.len(), 1);
    }

    #[tokio::test(flavor = "current_thread")]
    #[ignore]
    async fn test_simple_processor_async_exporter_with_current_thread_runtime() {
        let exporter = LogExporterThatRequiresTokio::new();

        let processor = SimpleLogProcessor::new(exporter.clone());

        let mut record: SdkLogRecord = SdkLogRecord::new();
        let instrumentation: InstrumentationScope = Default::default();

        processor.emit(&mut record, &instrumentation);

        assert_eq!(exporter.len(), 1);
    }
}