1use super::{BatchLogProcessor, LogProcessor, SdkLogger, SimpleLogProcessor};
2use crate::error::{OTelSdkError, OTelSdkResult};
3use crate::logs::LogExporter;
4use crate::Resource;
5use opentelemetry::{otel_debug, otel_info, InstrumentationScope};
6use std::{
7 borrow::Cow,
8 sync::{
9 atomic::{AtomicBool, Ordering},
10 Arc, OnceLock,
11 },
12};
13
14static NOOP_LOGGER_PROVIDER: OnceLock<SdkLoggerProvider> = OnceLock::new();
17
18#[inline]
19fn noop_logger_provider() -> &'static SdkLoggerProvider {
20 NOOP_LOGGER_PROVIDER.get_or_init(|| SdkLoggerProvider {
21 inner: Arc::new(LoggerProviderInner {
22 processors: Vec::new(),
23 resource: Resource::empty(),
24 is_shutdown: AtomicBool::new(true),
25 }),
26 })
27}
28
29#[derive(Debug, Clone)]
30pub struct SdkLoggerProvider {
44 inner: Arc<LoggerProviderInner>,
45}
46
47impl opentelemetry::logs::LoggerProvider for SdkLoggerProvider {
48 type Logger = SdkLogger;
49
50 fn logger(&self, name: impl Into<Cow<'static, str>>) -> Self::Logger {
51 let scope = InstrumentationScope::builder(name).build();
52 self.logger_with_scope(scope)
53 }
54
55 fn logger_with_scope(&self, scope: InstrumentationScope) -> Self::Logger {
56 if self.inner.is_shutdown.load(Ordering::Relaxed) {
58 otel_debug!(
59 name: "LoggerProvider.NoOpLoggerReturned",
60 logger_name = scope.name(),
61 );
62 return SdkLogger::new(scope, noop_logger_provider().clone());
63 }
64 if scope.name().is_empty() {
65 otel_info!(name: "LoggerNameEmpty", message = "Logger name is empty; consider providing a meaningful name. Logger will function normally and the provided name will be used as-is.");
66 };
67 otel_debug!(
68 name: "LoggerProvider.NewLoggerReturned",
69 logger_name = scope.name(),
70 );
71 SdkLogger::new(scope, self.clone())
72 }
73}
74
75impl SdkLoggerProvider {
76 pub fn builder() -> LoggerProviderBuilder {
78 LoggerProviderBuilder::default()
79 }
80
81 pub(crate) fn log_processors(&self) -> &[Box<dyn LogProcessor>] {
82 &self.inner.processors
83 }
84
85 pub(crate) fn resource(&self) -> &Resource {
86 &self.inner.resource
87 }
88
89 pub fn force_flush(&self) -> OTelSdkResult {
91 let result: Vec<_> = self
92 .log_processors()
93 .iter()
94 .map(|processor| processor.force_flush())
95 .collect();
96 if result.iter().all(|r| r.is_ok()) {
97 Ok(())
98 } else {
99 Err(OTelSdkError::InternalFailure(format!("errs: {:?}", result)))
100 }
101 }
102
103 pub fn shutdown(&self) -> OTelSdkResult {
105 otel_debug!(
106 name: "LoggerProvider.ShutdownInvokedByUser",
107 );
108 if self
109 .inner
110 .is_shutdown
111 .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
112 .is_ok()
113 {
114 let result = self.inner.shutdown();
116 if result.iter().all(|res| res.is_ok()) {
117 Ok(())
118 } else {
119 Err(OTelSdkError::InternalFailure(format!(
120 "Shutdown errors: {:?}",
121 result
122 .into_iter()
123 .filter_map(Result::err)
124 .collect::<Vec<_>>()
125 )))
126 }
127 } else {
128 Err(OTelSdkError::AlreadyShutdown)
129 }
130 }
131}
132
133#[derive(Debug)]
134struct LoggerProviderInner {
135 processors: Vec<Box<dyn LogProcessor>>,
136 resource: Resource,
137 is_shutdown: AtomicBool,
138}
139
140impl LoggerProviderInner {
141 pub(crate) fn shutdown(&self) -> Vec<OTelSdkResult> {
143 let mut results = vec![];
144 for processor in &self.processors {
145 let result = processor.shutdown();
146 if let Err(err) = &result {
147 otel_debug!(name: "LoggerProvider.ShutdownError",
152 error = format!("{err}"));
153 }
154 results.push(result);
155 }
156 results
157 }
158}
159
160impl Drop for LoggerProviderInner {
161 fn drop(&mut self) {
162 if !self.is_shutdown.load(Ordering::Relaxed) {
163 otel_info!(
164 name: "LoggerProvider.Drop",
165 message = "Last reference of LoggerProvider dropped, initiating shutdown."
166 );
167 let _ = self.shutdown(); } else {
169 otel_debug!(
170 name: "LoggerProvider.Drop.AlreadyShutdown",
171 message = "LoggerProvider was already shut down; drop will not attempt shutdown again."
172 );
173 }
174 }
175}
176
177#[derive(Debug, Default)]
178pub struct LoggerProviderBuilder {
180 processors: Vec<Box<dyn LogProcessor>>,
181 resource: Option<Resource>,
182}
183
184impl LoggerProviderBuilder {
185 pub fn with_simple_exporter<T: LogExporter + 'static>(self, exporter: T) -> Self {
197 let mut processors = self.processors;
198 processors.push(Box::new(SimpleLogProcessor::new(exporter)));
199
200 LoggerProviderBuilder { processors, ..self }
201 }
202
203 pub fn with_batch_exporter<T: LogExporter + 'static>(self, exporter: T) -> Self {
215 let batch = BatchLogProcessor::builder(exporter).build();
216 self.with_log_processor(batch)
217 }
218
219 pub fn with_log_processor<T: LogProcessor + 'static>(self, processor: T) -> Self {
231 let mut processors = self.processors;
232 processors.push(Box::new(processor));
233
234 LoggerProviderBuilder { processors, ..self }
235 }
236
237 pub fn with_resource(self, resource: Resource) -> Self {
239 LoggerProviderBuilder {
240 resource: Some(resource),
241 ..self
242 }
243 }
244
245 pub fn build(self) -> SdkLoggerProvider {
247 let resource = self.resource.unwrap_or(Resource::builder().build());
248
249 let logger_provider = SdkLoggerProvider {
250 inner: Arc::new(LoggerProviderInner {
251 processors: self.processors,
252 resource,
253 is_shutdown: AtomicBool::new(false),
254 }),
255 };
256
257 for processor in logger_provider.log_processors() {
259 processor.set_resource(logger_provider.resource());
260 }
261
262 otel_debug!(
263 name: "LoggerProvider.Built",
264 );
265 logger_provider
266 }
267}
268
269#[cfg(test)]
270mod tests {
271 use crate::{
272 logs::{InMemoryLogExporter, SdkLogRecord, TraceContext},
273 resource::{
274 SERVICE_NAME, TELEMETRY_SDK_LANGUAGE, TELEMETRY_SDK_NAME, TELEMETRY_SDK_VERSION,
275 },
276 trace::SdkTracerProvider,
277 Resource,
278 };
279
280 use super::*;
281 use opentelemetry::trace::{SpanId, TraceId, Tracer as _, TracerProvider};
282 use opentelemetry::{
283 logs::{AnyValue, LogRecord as _, Logger, LoggerProvider},
284 trace::TraceContextExt,
285 };
286 use opentelemetry::{Key, KeyValue, Value};
287 use std::fmt::{Debug, Formatter};
288 use std::sync::atomic::AtomicU64;
289 use std::sync::Mutex;
290 use std::thread;
291
292 struct ShutdownTestLogProcessor {
293 is_shutdown: Arc<Mutex<bool>>,
294 counter: Arc<AtomicU64>,
295 }
296
297 impl Debug for ShutdownTestLogProcessor {
298 fn fmt(&self, _f: &mut Formatter<'_>) -> std::fmt::Result {
299 todo!()
300 }
301 }
302
303 impl ShutdownTestLogProcessor {
304 pub(crate) fn new(counter: Arc<AtomicU64>) -> Self {
305 ShutdownTestLogProcessor {
306 is_shutdown: Arc::new(Mutex::new(false)),
307 counter,
308 }
309 }
310 }
311
312 impl LogProcessor for ShutdownTestLogProcessor {
313 fn emit(&self, _data: &mut SdkLogRecord, _scope: &InstrumentationScope) {
314 self.is_shutdown
315 .lock()
316 .map(|is_shutdown| {
317 if !*is_shutdown {
318 self.counter
319 .fetch_add(1, std::sync::atomic::Ordering::SeqCst);
320 }
321 })
322 .expect("lock poisoned");
323 }
324
325 fn force_flush(&self) -> OTelSdkResult {
326 Ok(())
327 }
328
329 fn shutdown(&self) -> OTelSdkResult {
330 self.is_shutdown
331 .lock()
332 .map(|mut is_shutdown| *is_shutdown = true)
333 .expect("lock poisoned");
334 Ok(())
335 }
336 }
337 #[test]
338 fn test_logger_provider_default_resource() {
339 let assert_resource = |provider: &super::SdkLoggerProvider,
340 resource_key: &'static str,
341 expect: Option<&'static str>| {
342 assert_eq!(
343 provider
344 .resource()
345 .get(&Key::from_static_str(resource_key))
346 .map(|v| v.to_string()),
347 expect.map(|s| s.to_string())
348 );
349 };
350 let assert_telemetry_resource = |provider: &super::SdkLoggerProvider| {
351 assert_eq!(
352 provider.resource().get(&TELEMETRY_SDK_LANGUAGE.into()),
353 Some(Value::from("rust"))
354 );
355 assert_eq!(
356 provider.resource().get(&TELEMETRY_SDK_NAME.into()),
357 Some(Value::from("opentelemetry"))
358 );
359 assert_eq!(
360 provider.resource().get(&TELEMETRY_SDK_VERSION.into()),
361 Some(Value::from(env!("CARGO_PKG_VERSION")))
362 );
363 };
364
365 temp_env::with_var_unset("OTEL_RESOURCE_ATTRIBUTES", || {
367 let default_config_provider = super::SdkLoggerProvider::builder().build();
368 assert_resource(
369 &default_config_provider,
370 SERVICE_NAME,
371 Some("unknown_service"),
372 );
373 assert_telemetry_resource(&default_config_provider);
374 });
375
376 let custom_config_provider = super::SdkLoggerProvider::builder()
378 .with_resource(
379 Resource::builder_empty()
380 .with_service_name("test_service")
381 .build(),
382 )
383 .build();
384 assert_resource(&custom_config_provider, SERVICE_NAME, Some("test_service"));
385 assert_eq!(custom_config_provider.resource().len(), 1);
386
387 temp_env::with_var(
389 "OTEL_RESOURCE_ATTRIBUTES",
390 Some("key1=value1, k2, k3=value2"),
391 || {
392 let env_resource_provider = super::SdkLoggerProvider::builder().build();
393 assert_resource(
394 &env_resource_provider,
395 SERVICE_NAME,
396 Some("unknown_service"),
397 );
398 assert_resource(&env_resource_provider, "key1", Some("value1"));
399 assert_resource(&env_resource_provider, "k3", Some("value2"));
400 assert_telemetry_resource(&env_resource_provider);
401 assert_eq!(env_resource_provider.resource().len(), 6);
402 },
403 );
404
405 temp_env::with_var(
407 "OTEL_RESOURCE_ATTRIBUTES",
408 Some("my-custom-key=env-val,k2=value2"),
409 || {
410 let user_provided_resource_config_provider = super::SdkLoggerProvider::builder()
411 .with_resource(
412 Resource::builder()
413 .with_attributes([
414 KeyValue::new("my-custom-key", "my-custom-value"),
415 KeyValue::new("my-custom-key2", "my-custom-value2"),
416 ])
417 .build(),
418 )
419 .build();
420 assert_resource(
421 &user_provided_resource_config_provider,
422 SERVICE_NAME,
423 Some("unknown_service"),
424 );
425 assert_resource(
426 &user_provided_resource_config_provider,
427 "my-custom-key",
428 Some("my-custom-value"),
429 );
430 assert_resource(
431 &user_provided_resource_config_provider,
432 "my-custom-key2",
433 Some("my-custom-value2"),
434 );
435 assert_resource(
436 &user_provided_resource_config_provider,
437 "k2",
438 Some("value2"),
439 );
440 assert_telemetry_resource(&user_provided_resource_config_provider);
441 assert_eq!(user_provided_resource_config_provider.resource().len(), 7);
442 },
443 );
444
445 let no_service_name = super::SdkLoggerProvider::builder()
447 .with_resource(Resource::empty())
448 .build();
449 assert_eq!(no_service_name.resource().len(), 0);
450 }
451
452 #[test]
453 fn trace_context_test() {
454 let exporter = InMemoryLogExporter::default();
455
456 let logger_provider = SdkLoggerProvider::builder()
457 .with_simple_exporter(exporter.clone())
458 .build();
459
460 let logger = logger_provider.logger("test-logger");
461
462 let tracer_provider = SdkTracerProvider::builder().build();
463
464 let tracer = tracer_provider.tracer("test-tracer");
465
466 tracer.in_span("test-span", |cx| {
467 let ambient_ctxt = cx.span().span_context().clone();
468 let explicit_ctxt = TraceContext {
469 trace_id: TraceId::from_u128(13),
470 span_id: SpanId::from_u64(14),
471 trace_flags: None,
472 };
473
474 let mut ambient_ctxt_record = logger.create_log_record();
475 ambient_ctxt_record.set_body(AnyValue::String("ambient".into()));
476
477 let mut explicit_ctxt_record = logger.create_log_record();
478 explicit_ctxt_record.set_body(AnyValue::String("explicit".into()));
479 explicit_ctxt_record.set_trace_context(
480 explicit_ctxt.trace_id,
481 explicit_ctxt.span_id,
482 explicit_ctxt.trace_flags,
483 );
484
485 logger.emit(ambient_ctxt_record);
486 logger.emit(explicit_ctxt_record);
487
488 let emitted = exporter.get_emitted_logs().unwrap();
489
490 assert_eq!(
491 Some(AnyValue::String("ambient".into())),
492 emitted[0].record.body
493 );
494 assert_eq!(
495 ambient_ctxt.trace_id(),
496 emitted[0].record.trace_context.as_ref().unwrap().trace_id
497 );
498 assert_eq!(
499 ambient_ctxt.span_id(),
500 emitted[0].record.trace_context.as_ref().unwrap().span_id
501 );
502
503 assert_eq!(
504 Some(AnyValue::String("explicit".into())),
505 emitted[1].record.body
506 );
507 assert_eq!(
508 explicit_ctxt.trace_id,
509 emitted[1].record.trace_context.as_ref().unwrap().trace_id
510 );
511 assert_eq!(
512 explicit_ctxt.span_id,
513 emitted[1].record.trace_context.as_ref().unwrap().span_id
514 );
515 });
516 }
517
518 #[test]
519 fn shutdown_test() {
520 let counter = Arc::new(AtomicU64::new(0));
521 let logger_provider = SdkLoggerProvider::builder()
522 .with_log_processor(ShutdownTestLogProcessor::new(counter.clone()))
523 .build();
524
525 let logger1 = logger_provider.logger("test-logger1");
526 let logger2 = logger_provider.logger("test-logger2");
527 logger1.emit(logger1.create_log_record());
528 logger2.emit(logger1.create_log_record());
529
530 let logger3 = logger_provider.logger("test-logger3");
531 let handle = thread::spawn(move || {
532 logger3.emit(logger3.create_log_record());
533 });
534 handle.join().expect("thread panicked");
535
536 let _ = logger_provider.shutdown();
537 logger1.emit(logger1.create_log_record());
538
539 assert_eq!(counter.load(std::sync::atomic::Ordering::SeqCst), 3);
540 }
541
542 #[test]
543 fn shutdown_idempotent_test() {
544 let counter = Arc::new(AtomicU64::new(0));
545 let logger_provider = SdkLoggerProvider::builder()
546 .with_log_processor(ShutdownTestLogProcessor::new(counter.clone()))
547 .build();
548
549 let shutdown_res = logger_provider.shutdown();
550 assert!(shutdown_res.is_ok());
551
552 let shutdown_res = logger_provider.shutdown();
554 assert!(shutdown_res.is_err());
555
556 let shutdown_res = logger_provider.shutdown();
558 assert!(shutdown_res.is_err());
559 }
560
561 #[test]
562 fn global_shutdown_test() {
563 let shutdown_called = Arc::new(Mutex::new(false));
567 let flush_called = Arc::new(Mutex::new(false));
568 let logger_provider = SdkLoggerProvider::builder()
569 .with_log_processor(LazyLogProcessor::new(
570 shutdown_called.clone(),
571 flush_called.clone(),
572 ))
573 .build();
574 let logger1 = logger_provider.logger("test-logger1");
576 let logger2 = logger_provider.logger("test-logger2");
577
578 logger1.emit(logger1.create_log_record());
580 logger2.emit(logger1.create_log_record());
581
582 let _ = logger_provider.shutdown();
585
586 assert!(*shutdown_called.lock().unwrap());
590
591 assert!(!*flush_called.lock().unwrap());
593 }
594
595 #[test]
596 fn drop_test_with_multiple_providers() {
597 let shutdown_called = Arc::new(Mutex::new(false));
598 let flush_called = Arc::new(Mutex::new(false));
599 {
600 let shared_inner = Arc::new(LoggerProviderInner {
602 processors: vec![Box::new(LazyLogProcessor::new(
603 shutdown_called.clone(),
604 flush_called.clone(),
605 ))],
606 resource: Resource::empty(),
607 is_shutdown: AtomicBool::new(false),
608 });
609
610 {
611 let logger_provider1 = SdkLoggerProvider {
612 inner: shared_inner.clone(),
613 };
614 let logger_provider2 = SdkLoggerProvider {
615 inner: shared_inner.clone(),
616 };
617
618 let logger1 = logger_provider1.logger("test-logger1");
619 let logger2 = logger_provider2.logger("test-logger2");
620
621 logger1.emit(logger1.create_log_record());
622 logger2.emit(logger1.create_log_record());
623
624 }
627 }
630 assert!(*shutdown_called.lock().unwrap());
632 assert!(!*flush_called.lock().unwrap());
634 }
635
636 #[test]
637 fn drop_after_shutdown_test_with_multiple_providers() {
638 let shutdown_called = Arc::new(Mutex::new(0)); let flush_called = Arc::new(Mutex::new(false));
640
641 let shared_inner = Arc::new(LoggerProviderInner {
643 processors: vec![Box::new(CountingShutdownProcessor::new(
644 shutdown_called.clone(),
645 flush_called.clone(),
646 ))],
647 resource: Resource::empty(),
648 is_shutdown: AtomicBool::new(false),
649 });
650
651 {
653 let logger_provider1 = SdkLoggerProvider {
654 inner: shared_inner.clone(),
655 };
656 let logger_provider2 = SdkLoggerProvider {
657 inner: shared_inner.clone(),
658 };
659
660 let shutdown_result = logger_provider1.shutdown();
662 println!("---->Result: {:?}", shutdown_result);
663 assert!(shutdown_result.is_ok());
664
665 assert_eq!(*shutdown_called.lock().unwrap(), 1);
667
668 let shutdown_result2 = logger_provider2.shutdown();
670 assert!(shutdown_result2.is_err());
671
672 }
674
675 assert_eq!(*shutdown_called.lock().unwrap(), 1);
677 }
678
679 #[test]
680 fn test_empty_logger_name() {
681 let exporter = InMemoryLogExporter::default();
682 let logger_provider = SdkLoggerProvider::builder()
683 .with_simple_exporter(exporter.clone())
684 .build();
685 let logger = logger_provider.logger("");
686 let mut record = logger.create_log_record();
687 record.set_body("Testing empty logger name".into());
688 logger.emit(record);
689
690 let scope = InstrumentationScope::builder("").build();
692 let scoped_logger = logger_provider.logger_with_scope(scope);
693 let mut scoped_record = scoped_logger.create_log_record();
694 scoped_record.set_body("Testing empty logger scope name".into());
695 scoped_logger.emit(scoped_record);
696
697 let emitted_logs = exporter.get_emitted_logs().unwrap();
699 assert_eq!(emitted_logs.len(), 2);
700 assert_eq!(
702 emitted_logs[0].clone().record.body,
703 Some(AnyValue::String("Testing empty logger name".into()))
704 );
705 assert_eq!(logger.instrumentation_scope().name(), "");
706
707 assert_eq!(
709 emitted_logs[1].clone().record.body,
710 Some(AnyValue::String("Testing empty logger scope name".into()))
711 );
712 assert_eq!(scoped_logger.instrumentation_scope().name(), "");
713 }
714
715 #[derive(Debug)]
716 pub(crate) struct LazyLogProcessor {
717 shutdown_called: Arc<Mutex<bool>>,
718 flush_called: Arc<Mutex<bool>>,
719 }
720
721 impl LazyLogProcessor {
722 pub(crate) fn new(
723 shutdown_called: Arc<Mutex<bool>>,
724 flush_called: Arc<Mutex<bool>>,
725 ) -> Self {
726 LazyLogProcessor {
727 shutdown_called,
728 flush_called,
729 }
730 }
731 }
732
733 impl LogProcessor for LazyLogProcessor {
734 fn emit(&self, _data: &mut SdkLogRecord, _scope: &InstrumentationScope) {
735 }
737
738 fn force_flush(&self) -> OTelSdkResult {
739 *self.flush_called.lock().unwrap() = true;
740 Ok(())
741 }
742
743 fn shutdown(&self) -> OTelSdkResult {
744 *self.shutdown_called.lock().unwrap() = true;
745 Ok(())
746 }
747 }
748
749 #[derive(Debug)]
750 struct CountingShutdownProcessor {
751 shutdown_count: Arc<Mutex<i32>>,
752 flush_called: Arc<Mutex<bool>>,
753 }
754
755 impl CountingShutdownProcessor {
756 fn new(shutdown_count: Arc<Mutex<i32>>, flush_called: Arc<Mutex<bool>>) -> Self {
757 CountingShutdownProcessor {
758 shutdown_count,
759 flush_called,
760 }
761 }
762 }
763
764 impl LogProcessor for CountingShutdownProcessor {
765 fn emit(&self, _data: &mut SdkLogRecord, _scope: &InstrumentationScope) {
766 }
768
769 fn force_flush(&self) -> OTelSdkResult {
770 *self.flush_called.lock().unwrap() = true;
771 Ok(())
772 }
773
774 fn shutdown(&self) -> OTelSdkResult {
775 let mut count = self.shutdown_count.lock().unwrap();
776 *count += 1;
777 Ok(())
778 }
779 }
780}