opentelemetry_sdk/metrics/
periodic_reader.rs1use std::{
2 env, fmt,
3 sync::{
4 mpsc::{self, Receiver, Sender},
5 Arc, Mutex, Weak,
6 },
7 thread,
8 time::{Duration, Instant},
9};
10
11use opentelemetry::{otel_debug, otel_error, otel_info, otel_warn};
12
13use crate::{
14 error::{OTelSdkError, OTelSdkResult},
15 metrics::{exporter::PushMetricExporter, reader::SdkProducer, MetricError, MetricResult},
16 Resource,
17};
18
19use super::{
20 data::ResourceMetrics, instrument::InstrumentKind, reader::MetricReader, Pipeline, Temporality,
21};
22
/// Export interval used when neither the environment nor the builder supplies
/// one: 60 seconds, written in milliseconds to match the unit in which the
/// environment variable is parsed.
const DEFAULT_INTERVAL: Duration = Duration::from_millis(60_000);

/// Name of the environment variable read for the export interval; its value is
/// parsed as a whole number of milliseconds.
const METRIC_EXPORT_INTERVAL_NAME: &str = "OTEL_METRIC_EXPORT_INTERVAL";
26
27#[derive(Debug)]
29pub struct PeriodicReaderBuilder<E> {
30 interval: Duration,
31 exporter: E,
32}
33
34impl<E> PeriodicReaderBuilder<E>
35where
36 E: PushMetricExporter,
37{
38 fn new(exporter: E) -> Self {
39 let interval = env::var(METRIC_EXPORT_INTERVAL_NAME)
40 .ok()
41 .and_then(|v| v.parse().map(Duration::from_millis).ok())
42 .unwrap_or(DEFAULT_INTERVAL);
43
44 PeriodicReaderBuilder { interval, exporter }
45 }
46
47 pub fn with_interval(mut self, interval: Duration) -> Self {
55 if !interval.is_zero() {
56 self.interval = interval;
57 }
58 self
59 }
60
61 pub fn build(self) -> PeriodicReader {
63 PeriodicReader::new(self.exporter, self.interval)
64 }
65}
66
67#[derive(Clone)]
128pub struct PeriodicReader {
129 inner: Arc<PeriodicReaderInner>,
130}
131
132impl PeriodicReader {
133 pub fn builder<E>(exporter: E) -> PeriodicReaderBuilder<E>
135 where
136 E: PushMetricExporter,
137 {
138 PeriodicReaderBuilder::new(exporter)
139 }
140
141 fn new<E>(exporter: E, interval: Duration) -> Self
142 where
143 E: PushMetricExporter,
144 {
145 let (message_sender, message_receiver): (Sender<Message>, Receiver<Message>) =
146 mpsc::channel();
147 let exporter_arc = Arc::new(exporter);
148 let reader = PeriodicReader {
149 inner: Arc::new(PeriodicReaderInner {
150 message_sender,
151 producer: Mutex::new(None),
152 exporter: exporter_arc.clone(),
153 }),
154 };
155 let cloned_reader = reader.clone();
156
157 let result_thread_creation = thread::Builder::new()
158 .name("OpenTelemetry.Metrics.PeriodicReader".to_string())
159 .spawn(move || {
160 let mut interval_start = Instant::now();
161 let mut remaining_interval = interval;
162 otel_info!(
163 name: "PeriodReaderThreadStarted",
164 interval_in_millisecs = interval.as_millis(),
165 );
166 loop {
167 otel_debug!(
168 name: "PeriodReaderThreadLoopAlive", message = "Next export will happen after interval, unless flush or shutdown is triggered.", interval_in_millisecs = remaining_interval.as_millis()
169 );
170 match message_receiver.recv_timeout(remaining_interval) {
171 Ok(Message::Flush(response_sender)) => {
172 otel_debug!(
173 name: "PeriodReaderThreadExportingDueToFlush"
174 );
175 let export_result = cloned_reader.collect_and_export();
176 otel_debug!(
177 name: "PeriodReaderInvokedExport",
178 export_result = format!("{:?}", export_result)
179 );
180
181 if export_result.is_err() {
191 if response_sender.send(false).is_err() {
192 otel_info!(
193 name: "PeriodReader.Flush.ResponseSendError",
194 message = "PeriodicReader's flush has failed, but unable to send this info back to caller.
195 This occurs when the caller has timed out waiting for the response. If you see this occuring frequently, consider increasing the flush timeout."
196 );
197 }
198 } else if response_sender.send(true).is_err() {
199 otel_info!(
200 name: "PeriodReader.Flush.ResponseSendError",
201 message = "PeriodicReader's flush has completed successfully, but unable to send this info back to caller.
202 This occurs when the caller has timed out waiting for the response. If you see this occuring frequently, consider increasing the flush timeout."
203 );
204 }
205
206 let elapsed = interval_start.elapsed();
208 if elapsed < interval {
209 remaining_interval = interval - elapsed;
210 otel_debug!(
211 name: "PeriodReaderThreadAdjustingRemainingIntervalAfterFlush",
212 remaining_interval = remaining_interval.as_secs()
213 );
214 } else {
215 otel_debug!(
216 name: "PeriodReaderThreadAdjustingExportAfterFlush",
217 );
218 interval_start = Instant::now();
225 remaining_interval = Duration::ZERO;
226 }
227 }
228 Ok(Message::Shutdown(response_sender)) => {
229 otel_debug!(name: "PeriodReaderThreadExportingDueToShutdown");
231 let export_result = cloned_reader.collect_and_export();
232 otel_debug!(
233 name: "PeriodReaderInvokedExport",
234 export_result = format!("{:?}", export_result)
235 );
236 let shutdown_result = exporter_arc.shutdown();
237 otel_debug!(
238 name: "PeriodReaderInvokedExporterShutdown",
239 shutdown_result = format!("{:?}", shutdown_result)
240 );
241
242 if export_result.is_err() || shutdown_result.is_err() {
252 if response_sender.send(false).is_err() {
253 otel_info!(
254 name: "PeriodReaderThreadShutdown.ResponseSendError",
255 message = "PeriodicReader's shutdown has failed, but unable to send this info back to caller.
256 This occurs when the caller has timed out waiting for the response. If you see this occuring frequently, consider increasing the shutdown timeout."
257 );
258 }
259 } else if response_sender.send(true).is_err() {
260 otel_info!(
261 name: "PeriodReaderThreadShutdown.ResponseSendError",
262 message = "PeriodicReader completed its shutdown, but unable to send this info back to caller.
263 This occurs when the caller has timed out waiting for the response. If you see this occuring frequently, consider increasing the shutdown timeout."
264 );
265 }
266
267 otel_debug!(
268 name: "PeriodReaderThreadExiting",
269 reason = "ShutdownRequested"
270 );
271 break;
272 }
273 Err(mpsc::RecvTimeoutError::Timeout) => {
274 let export_start = Instant::now();
275 otel_debug!(
276 name: "PeriodReaderThreadExportingDueToTimer"
277 );
278
279 let export_result = cloned_reader.collect_and_export();
280 otel_debug!(
281 name: "PeriodReaderInvokedExport",
282 export_result = format!("{:?}", export_result)
283 );
284
285 let time_taken_for_export = export_start.elapsed();
286 if time_taken_for_export > interval {
287 otel_debug!(
288 name: "PeriodReaderThreadExportTookLongerThanInterval"
289 );
290 interval_start = Instant::now();
297 remaining_interval = Duration::ZERO;
298 } else {
299 remaining_interval = interval - time_taken_for_export;
300 interval_start = Instant::now();
301 }
302 }
303 Err(mpsc::RecvTimeoutError::Disconnected) => {
304 otel_debug!(
307 name: "PeriodReaderThreadExiting",
308 reason = "MessageSenderDisconnected"
309 );
310 break;
311 }
312 }
313 }
314 otel_info!(
315 name: "PeriodReaderThreadStopped"
316 );
317 });
318
319 #[allow(unused_variables)]
321 if let Err(e) = result_thread_creation {
322 otel_error!(
323 name: "PeriodReaderThreadStartError",
324 message = "Failed to start PeriodicReader thread. Metrics will not be exported.",
325 error = format!("{:?}", e)
326 );
327 }
328 reader
329 }
330
331 fn collect_and_export(&self) -> OTelSdkResult {
332 self.inner.collect_and_export()
333 }
334}
335
336impl fmt::Debug for PeriodicReader {
337 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
338 f.debug_struct("PeriodicReader").finish()
339 }
340}
341
342struct PeriodicReaderInner {
343 exporter: Arc<dyn PushMetricExporter>,
344 message_sender: mpsc::Sender<Message>,
345 producer: Mutex<Option<Weak<dyn SdkProducer>>>,
346}
347
348impl PeriodicReaderInner {
349 fn register_pipeline(&self, producer: Weak<dyn SdkProducer>) {
350 let mut inner = self.producer.lock().expect("lock poisoned");
351 *inner = Some(producer);
352 }
353
354 fn temporality(&self, _kind: InstrumentKind) -> Temporality {
355 self.exporter.temporality()
356 }
357
358 fn collect(&self, rm: &mut ResourceMetrics) -> MetricResult<()> {
359 let producer = self.producer.lock().expect("lock poisoned");
360 if let Some(p) = producer.as_ref() {
361 p.upgrade()
362 .ok_or_else(|| MetricError::Other("pipeline is dropped".into()))?
363 .produce(rm)?;
364 Ok(())
365 } else {
366 otel_warn!(
367 name: "PeriodReader.MeterProviderNotRegistered",
368 message = "PeriodicReader is not registered with MeterProvider. Metrics will not be collected. \
369 This occurs when a periodic reader is created but not associated with a MeterProvider \
370 by calling `.with_reader(reader)` on MeterProviderBuilder."
371 );
372 Err(MetricError::Other("MeterProvider is not registered".into()))
373 }
374 }
375
376 fn collect_and_export(&self) -> OTelSdkResult {
377 let mut rm = ResourceMetrics {
380 resource: Resource::empty(),
381 scope_metrics: Vec::new(),
382 };
383
384 let current_time = Instant::now();
385 let collect_result = self.collect(&mut rm);
386 let time_taken_for_collect = current_time.elapsed();
387
388 #[allow(clippy::question_mark)]
389 if let Err(e) = collect_result {
390 otel_warn!(
391 name: "PeriodReaderCollectError",
392 error = format!("{:?}", e)
393 );
394 return Err(OTelSdkError::InternalFailure(e.to_string()));
395 }
396
397 if rm.scope_metrics.is_empty() {
398 otel_debug!(name: "NoMetricsCollected");
399 return Ok(());
400 }
401
402 let metrics_count = rm.scope_metrics.iter().fold(0, |count, scope_metrics| {
403 count + scope_metrics.metrics.len()
404 });
405 otel_debug!(name: "PeriodicReaderMetricsCollected", count = metrics_count, time_taken_in_millis = time_taken_for_collect.as_millis());
406
407 futures_executor::block_on(self.exporter.export(&mut rm))
410 }
411
412 fn force_flush(&self) -> OTelSdkResult {
413 let (response_tx, response_rx) = mpsc::channel();
427 self.message_sender
428 .send(Message::Flush(response_tx))
429 .map_err(|e| OTelSdkError::InternalFailure(e.to_string()))?;
430
431 if let Ok(response) = response_rx.recv() {
432 if response {
434 Ok(())
435 } else {
436 Err(OTelSdkError::InternalFailure("Failed to flush".into()))
437 }
438 } else {
439 Err(OTelSdkError::InternalFailure("Failed to flush".into()))
440 }
441 }
442
443 fn shutdown(&self) -> OTelSdkResult {
444 let (response_tx, response_rx) = mpsc::channel();
446 self.message_sender
447 .send(Message::Shutdown(response_tx))
448 .map_err(|e| OTelSdkError::InternalFailure(e.to_string()))?;
449
450 match response_rx.recv_timeout(Duration::from_secs(5)) {
452 Ok(response) => {
453 if response {
454 Ok(())
455 } else {
456 Err(OTelSdkError::InternalFailure("Failed to shutdown".into()))
457 }
458 }
459 Err(mpsc::RecvTimeoutError::Timeout) => {
460 Err(OTelSdkError::Timeout(Duration::from_secs(5)))
461 }
462 Err(mpsc::RecvTimeoutError::Disconnected) => {
463 Err(OTelSdkError::InternalFailure("Failed to shutdown".into()))
464 }
465 }
466 }
467}
468
/// Requests sent to the background export thread. Each variant carries the
/// sender on which the thread reports success (`true`) or failure (`false`).
#[derive(Debug)]
enum Message {
    /// Collect and export right away.
    Flush(Sender<bool>),
    /// Collect and export one last time, shut the exporter down and stop the thread.
    Shutdown(Sender<bool>),
}
474
475impl MetricReader for PeriodicReader {
476 fn register_pipeline(&self, pipeline: Weak<Pipeline>) {
477 self.inner.register_pipeline(pipeline);
478 }
479
480 fn collect(&self, rm: &mut ResourceMetrics) -> MetricResult<()> {
481 self.inner.collect(rm)
482 }
483
484 fn force_flush(&self) -> OTelSdkResult {
485 self.inner.force_flush()
486 }
487
488 fn shutdown(&self) -> OTelSdkResult {
493 self.inner.shutdown()
494 }
495
496 fn temporality(&self, kind: InstrumentKind) -> Temporality {
504 kind.temporality_preference(self.inner.temporality(kind))
505 }
506}
507
508#[cfg(all(test, feature = "testing"))]
509mod tests {
510 use super::PeriodicReader;
511 use crate::{
512 error::{OTelSdkError, OTelSdkResult},
513 metrics::{
514 data::ResourceMetrics, exporter::PushMetricExporter, reader::MetricReader,
515 InMemoryMetricExporter, SdkMeterProvider, Temporality,
516 },
517 Resource,
518 };
519 use async_trait::async_trait;
520 use opentelemetry::metrics::MeterProvider;
521 use std::{
522 sync::{
523 atomic::{AtomicBool, AtomicUsize, Ordering},
524 mpsc, Arc,
525 },
526 time::Duration,
527 };
528
529 #[derive(Debug, Clone)]
533 struct MetricExporterThatFailsOnlyOnFirst {
534 count: Arc<AtomicUsize>,
535 }
536
537 impl Default for MetricExporterThatFailsOnlyOnFirst {
538 fn default() -> Self {
539 MetricExporterThatFailsOnlyOnFirst {
540 count: Arc::new(AtomicUsize::new(0)),
541 }
542 }
543 }
544
545 impl MetricExporterThatFailsOnlyOnFirst {
546 fn get_count(&self) -> usize {
547 self.count.load(Ordering::Relaxed)
548 }
549 }
550
551 #[async_trait]
552 impl PushMetricExporter for MetricExporterThatFailsOnlyOnFirst {
553 async fn export(&self, _metrics: &mut ResourceMetrics) -> OTelSdkResult {
554 if self.count.fetch_add(1, Ordering::Relaxed) == 0 {
555 Err(OTelSdkError::InternalFailure("export failed".into()))
556 } else {
557 Ok(())
558 }
559 }
560
561 async fn force_flush(&self) -> OTelSdkResult {
562 Ok(())
563 }
564
565 fn shutdown(&self) -> OTelSdkResult {
566 Ok(())
567 }
568
569 fn temporality(&self) -> Temporality {
570 Temporality::Cumulative
571 }
572 }
573
574 #[derive(Debug, Clone, Default)]
575 struct MockMetricExporter {
576 is_shutdown: Arc<AtomicBool>,
577 }
578
579 #[async_trait]
580 impl PushMetricExporter for MockMetricExporter {
581 async fn export(&self, _metrics: &mut ResourceMetrics) -> OTelSdkResult {
582 Ok(())
583 }
584
585 async fn force_flush(&self) -> OTelSdkResult {
586 Ok(())
587 }
588
589 fn shutdown(&self) -> OTelSdkResult {
590 self.is_shutdown.store(true, Ordering::Relaxed);
591 Ok(())
592 }
593
594 fn temporality(&self) -> Temporality {
595 Temporality::Cumulative
596 }
597 }
598
/// With a 1 ms interval the observable callback must have run at least five
/// times after waiting 100 intervals.
#[test]
fn collection_triggered_by_interval_multiple() {
    let interval = Duration::from_millis(1);
    let exporter = InMemoryMetricExporter::default();
    let reader = PeriodicReader::builder(exporter.clone())
        .with_interval(interval)
        .build();
    let callback_runs = Arc::new(AtomicUsize::new(0));
    let callback_runs_for_callback = Arc::clone(&callback_runs);

    let meter_provider = SdkMeterProvider::builder().with_reader(reader).build();
    let meter = meter_provider.meter("test");
    let _counter = meter
        .u64_observable_counter("testcounter")
        .with_callback(move |_| {
            callback_runs_for_callback.fetch_add(1, Ordering::Relaxed);
        })
        .build();

    // Five expected intervals multiplied by a factor of twenty.
    std::thread::sleep(interval * 100);

    let observed_runs = callback_runs.load(Ordering::Relaxed);
    assert!(observed_runs >= 5);
}
630
/// The first shutdown succeeds; every later one reports `AlreadyShutdown`.
#[test]
fn shutdown_repeat() {
    let exporter = InMemoryMetricExporter::default();
    let reader = PeriodicReader::builder(exporter.clone()).build();

    let meter_provider = SdkMeterProvider::builder().with_reader(reader).build();
    assert!(meter_provider.shutdown().is_ok());

    // Two further attempts must both be rejected in the same way.
    for _ in 0..2 {
        let repeated = meter_provider.shutdown();
        assert!(repeated.is_err());
        assert!(matches!(repeated, Err(OTelSdkError::AlreadyShutdown)));
    }
}
651
/// Flushing works before shutdown and fails afterwards.
#[test]
fn flush_after_shutdown() {
    let exporter = InMemoryMetricExporter::default();
    let reader = PeriodicReader::builder(exporter.clone()).build();

    let meter_provider = SdkMeterProvider::builder().with_reader(reader).build();
    assert!(meter_provider.force_flush().is_ok());
    assert!(meter_provider.shutdown().is_ok());

    let flush_after = meter_provider.force_flush();
    assert!(flush_after.is_err());
}
669
/// Flushing twice in a row succeeds both times.
#[test]
fn flush_repeat() {
    let exporter = InMemoryMetricExporter::default();
    let reader = PeriodicReader::builder(exporter.clone()).build();

    let meter_provider = SdkMeterProvider::builder().with_reader(reader).build();
    for _ in 0..2 {
        let flushed = meter_provider.force_flush();
        assert!(flushed.is_ok());
    }
}
684
/// A reader that is not attached to a meter provider fails to collect and to
/// flush; once attached, both succeed.
#[test]
fn periodic_reader_without_pipeline() {
    let exporter = InMemoryMetricExporter::default();
    let reader = PeriodicReader::builder(exporter.clone()).build();

    let mut rm = ResourceMetrics {
        resource: Resource::empty(),
        scope_metrics: Vec::new(),
    };

    // No pipeline registered yet.
    assert!(reader.collect(&mut rm).is_err());
    assert!(reader.force_flush().is_err());

    // Attaching the reader to a provider registers the pipeline.
    let meter_provider = SdkMeterProvider::builder()
        .with_reader(reader.clone())
        .build();

    assert!(reader.collect(&mut rm).is_ok());
    assert!(meter_provider.force_flush().is_ok());
}
716
/// A failing export must not stop the reader: with an exporter that fails only
/// on its first call, at least two export attempts are expected within 500 ms
/// at a 10 ms interval.
#[test]
fn exporter_failures_are_handled() {
    let interval = Duration::from_millis(10);
    let exporter = MetricExporterThatFailsOnlyOnFirst::default();
    let reader = PeriodicReader::builder(exporter.clone())
        .with_interval(interval)
        .build();

    let meter_provider = SdkMeterProvider::builder().with_reader(reader).build();
    let meter = meter_provider.meter("test");
    let counter = meter.u64_counter("sync_counter").build();
    counter.add(1, &[]);
    let _obs_counter = meter
        .u64_observable_counter("testcounter")
        .with_callback(move |observer| {
            observer.observe(1, &[]);
        })
        .build();

    let wait = Duration::from_millis(500);
    std::thread::sleep(wait);

    let export_attempts = exporter.get_count();
    assert!(export_attempts >= 2);
}
750
/// Shutting the provider down must reach the exporter's `shutdown`.
#[test]
fn shutdown_passed_to_exporter() {
    let exporter = MockMetricExporter::default();
    let reader = PeriodicReader::builder(exporter.clone()).build();

    let meter_provider = SdkMeterProvider::builder().with_reader(reader).build();
    let meter = meter_provider.meter("test");
    let counter = meter.u64_counter("sync_counter").build();
    counter.add(1, &[]);

    let shutdown_outcome = meter_provider.shutdown();
    assert!(shutdown_outcome.is_ok());

    let exporter_was_shut_down = exporter.is_shutdown.load(Ordering::Relaxed);
    assert!(exporter_was_shut_down);
}
768
769 #[test]
770 fn collection() {
771 collection_triggered_by_interval_helper();
772 collection_triggered_by_flush_helper();
773 collection_triggered_by_shutdown_helper();
774 collection_triggered_by_drop_helper();
775 }
776
777 #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
778 async fn collection_from_tokio_multi_with_one_worker() {
779 collection_triggered_by_interval_helper();
780 collection_triggered_by_flush_helper();
781 collection_triggered_by_shutdown_helper();
782 collection_triggered_by_drop_helper();
783 }
784
785 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
786 async fn collection_from_tokio_with_two_worker() {
787 collection_triggered_by_interval_helper();
788 collection_triggered_by_flush_helper();
789 collection_triggered_by_shutdown_helper();
790 collection_triggered_by_drop_helper();
791 }
792
793 #[tokio::test(flavor = "current_thread")]
794 async fn collection_from_tokio_current() {
795 collection_triggered_by_interval_helper();
796 collection_triggered_by_flush_helper();
797 collection_triggered_by_shutdown_helper();
798 collection_triggered_by_drop_helper();
799 }
800
801 fn collection_triggered_by_interval_helper() {
802 collection_helper(|_| {
803 std::thread::sleep(Duration::from_millis(500));
808 });
809 }
810
811 fn collection_triggered_by_flush_helper() {
812 collection_helper(|meter_provider| {
813 meter_provider.force_flush().expect("flush should succeed");
814 });
815 }
816
817 fn collection_triggered_by_shutdown_helper() {
818 collection_helper(|meter_provider| {
819 meter_provider.shutdown().expect("shutdown should succeed");
820 });
821 }
822
823 fn collection_triggered_by_drop_helper() {
824 collection_helper(|meter_provider| {
825 drop(meter_provider);
826 });
827 }
828
829 fn collection_helper(trigger: fn(SdkMeterProvider)) {
830 let exporter = InMemoryMetricExporter::default();
832 let reader = PeriodicReader::builder(exporter.clone()).build();
833 let (sender, receiver) = mpsc::channel();
834
835 let meter_provider = SdkMeterProvider::builder().with_reader(reader).build();
836 let meter = meter_provider.meter("test");
837 let _counter = meter
838 .u64_observable_counter("testcounter")
839 .with_callback(move |observer| {
840 observer.observe(1, &[]);
841 sender.send(()).expect("channel should still be open");
842 })
843 .build();
844
845 trigger(meter_provider);
847
848 receiver
850 .recv_timeout(Duration::ZERO)
851 .expect("message should be available in channel, indicating a collection occurred, which should trigger observable callback");
852
853 let exported_metrics = exporter
854 .get_finished_metrics()
855 .expect("this should not fail");
856 assert!(
857 !exported_metrics.is_empty(),
858 "Metrics should be available in exporter."
859 );
860 }
861
/// Stand-in for async work in the callback tests: blocks the calling thread
/// for one millisecond (it never awaits) and then yields `1`.
async fn some_async_function() -> u64 {
    let pause = Duration::from_millis(1);
    std::thread::sleep(pause);
    1
}
867
868 #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
869 async fn async_inside_observable_callback_from_tokio_multi_with_one_worker() {
870 async_inside_observable_callback_helper();
871 }
872
873 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
874 async fn async_inside_observable_callback_from_tokio_multi_with_two_worker() {
875 async_inside_observable_callback_helper();
876 }
877
878 #[tokio::test(flavor = "current_thread")]
879 async fn async_inside_observable_callback_from_tokio_current_thread() {
880 async_inside_observable_callback_helper();
881 }
882
883 #[test]
884 fn async_inside_observable_callback_from_regular_main() {
885 async_inside_observable_callback_helper();
886 }
887
888 fn async_inside_observable_callback_helper() {
889 let interval = std::time::Duration::from_millis(10);
890 let exporter = InMemoryMetricExporter::default();
891 let reader = PeriodicReader::builder(exporter.clone())
892 .with_interval(interval)
893 .build();
894
895 let meter_provider = SdkMeterProvider::builder().with_reader(reader).build();
896 let meter = meter_provider.meter("test");
897 let _gauge = meter
898 .u64_observable_gauge("my_observable_gauge")
899 .with_callback(|observer| {
900 let value = futures_executor::block_on(some_async_function());
903 observer.observe(value, &[]);
904 })
905 .build();
906
907 meter_provider.force_flush().expect("flush should succeed");
908 let exported_metrics = exporter
909 .get_finished_metrics()
910 .expect("this should not fail");
911 assert!(
912 !exported_metrics.is_empty(),
913 "Metrics should be available in exporter."
914 );
915 }
916
917 async fn some_tokio_async_function() -> u64 {
918 tokio::time::sleep(Duration::from_millis(1)).await;
920 1
921 }
922
923 #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
924
925 async fn tokio_async_inside_observable_callback_from_tokio_multi_with_one_worker() {
926 tokio_async_inside_observable_callback_helper(true);
927 }
928
929 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
930 async fn tokio_async_inside_observable_callback_from_tokio_multi_with_two_worker() {
931 tokio_async_inside_observable_callback_helper(true);
932 }
933
934 #[tokio::test(flavor = "current_thread")]
935 #[ignore] async fn tokio_async_inside_observable_callback_from_tokio_current_thread() {
937 tokio_async_inside_observable_callback_helper(true);
938 }
939
940 #[test]
941 fn tokio_async_inside_observable_callback_from_regular_main() {
942 tokio_async_inside_observable_callback_helper(false);
943 }
944
945 fn tokio_async_inside_observable_callback_helper(use_current_tokio_runtime: bool) {
946 let exporter = InMemoryMetricExporter::default();
947 let reader = PeriodicReader::builder(exporter.clone()).build();
948
949 let meter_provider = SdkMeterProvider::builder().with_reader(reader).build();
950 let meter = meter_provider.meter("test");
951
952 if use_current_tokio_runtime {
953 let rt = tokio::runtime::Handle::current().clone();
954 let _gauge = meter
955 .u64_observable_gauge("my_observable_gauge")
956 .with_callback(move |observer| {
957 let value = rt.block_on(some_tokio_async_function());
959 observer.observe(value, &[]);
960 })
961 .build();
962 } else {
965 let rt = tokio::runtime::Runtime::new().unwrap();
966 let _gauge = meter
967 .u64_observable_gauge("my_observable_gauge")
968 .with_callback(move |observer| {
969 let value = rt.block_on(some_tokio_async_function());
971 observer.observe(value, &[]);
972 })
973 .build();
974 };
978
979 meter_provider.force_flush().expect("flush should succeed");
980 let exported_metrics = exporter
981 .get_finished_metrics()
982 .expect("this should not fail");
983 assert!(
984 !exported_metrics.is_empty(),
985 "Metrics should be available in exporter."
986 );
987 }
988}