opentelemetry_sdk/metrics/
periodic_reader.rs

1use std::{
2    env, fmt,
3    sync::{
4        mpsc::{self, Receiver, Sender},
5        Arc, Mutex, Weak,
6    },
7    thread,
8    time::{Duration, Instant},
9};
10
11use opentelemetry::{otel_debug, otel_error, otel_info, otel_warn};
12
13use crate::{
14    error::{OTelSdkError, OTelSdkResult},
15    metrics::{exporter::PushMetricExporter, reader::SdkProducer, MetricError, MetricResult},
16    Resource,
17};
18
19use super::{
20    data::ResourceMetrics, instrument::InstrumentKind, reader::MetricReader, Pipeline, Temporality,
21};
22
/// Fallback export interval, used when the interval environment variable is
/// unset or cannot be parsed as a number.
const DEFAULT_INTERVAL: Duration = Duration::from_secs(60);

/// Name of the environment variable that supplies the export interval; its
/// value is parsed as a whole number of milliseconds.
const METRIC_EXPORT_INTERVAL_NAME: &str = "OTEL_METRIC_EXPORT_INTERVAL";
26
/// Configuration options for [PeriodicReader].
#[derive(Debug)]
pub struct PeriodicReaderBuilder<E> {
    /// Time between exports that is handed to the reader when it is built.
    interval: Duration,
    /// Push exporter that is handed to the reader when it is built.
    exporter: E,
}
33
34impl<E> PeriodicReaderBuilder<E>
35where
36    E: PushMetricExporter,
37{
38    fn new(exporter: E) -> Self {
39        let interval = env::var(METRIC_EXPORT_INTERVAL_NAME)
40            .ok()
41            .and_then(|v| v.parse().map(Duration::from_millis).ok())
42            .unwrap_or(DEFAULT_INTERVAL);
43
44        PeriodicReaderBuilder { interval, exporter }
45    }
46
47    /// Configures the intervening time between exports for a [PeriodicReader].
48    ///
49    /// This option overrides any value set for the `OTEL_METRIC_EXPORT_INTERVAL`
50    /// environment variable.
51    ///
52    /// If this option is not used or `interval` is equal to zero, 60 seconds is
53    /// used as the default.
54    pub fn with_interval(mut self, interval: Duration) -> Self {
55        if !interval.is_zero() {
56            self.interval = interval;
57        }
58        self
59    }
60
61    /// Create a [PeriodicReader] with the given config.
62    pub fn build(self) -> PeriodicReader {
63        PeriodicReader::new(self.exporter, self.interval)
64    }
65}
66
/// A `MetricReader` that periodically collects and exports metrics at a configurable interval.
///
/// By default, [`PeriodicReader`] collects and exports metrics every **60 seconds**.
/// After a timer-triggered export, the wait before the next one is shortened by
/// the time that export took; if the export took longer than the interval, the
/// next export is attempted immediately. Use [`PeriodicReaderBuilder`] to
/// customize the interval.
///
/// [`PeriodicReader`] spawns a background thread to handle metric collection and export.
/// This thread remains active until [`shutdown()`] is called.
///
/// ## Collection Process
/// "Collection" refers to gathering aggregated metrics from the SDK's internal storage.
/// During this phase, callbacks from observable instruments are also triggered.
///
/// [`PeriodicReader`] does **not** enforce a timeout for collection. If an
/// observable callback takes too long, it may delay the next collection cycle.
/// If a callback never returns, it **will stall** all metric collection (and exports)
/// indefinitely.
///
/// ## Exporter Compatibility
/// When used with the [`OTLP Exporter`](https://docs.rs/opentelemetry-otlp), the following
/// transport options are supported:
///
/// - **`grpc-tonic`**: Requires [`MeterProvider`] to be initialized within a `tokio` runtime.
/// - **`reqwest-blocking-client`**: Works with both a standard (`main`) function and `tokio::main`.
///
/// [`PeriodicReader`] does **not** enforce a timeout for exports either. Instead,
/// the configured exporter is responsible for enforcing timeouts. If an export operation
/// never returns, [`PeriodicReader`] will **stop exporting new metrics**, stalling
/// metric collection.
///
/// ## Manual Export & Shutdown
/// Users can manually trigger an export via [`force_flush()`]. Calling [`shutdown()`]
/// exports any remaining metrics and should be done before application exit to ensure
/// all data is sent.
///
/// **Warning**: If using **tokio’s current-thread runtime**, calling [`shutdown()`]
/// from the main thread may cause a deadlock. To prevent this, call [`shutdown()`]
/// from a separate thread or use tokio's `spawn_blocking`.
///
/// [`PeriodicReader`]: crate::metrics::PeriodicReader
/// [`PeriodicReaderBuilder`]: crate::metrics::PeriodicReaderBuilder
/// [`MeterProvider`]: crate::metrics::SdkMeterProvider
/// [`shutdown()`]: crate::metrics::SdkMeterProvider::shutdown
/// [`force_flush()`]: crate::metrics::SdkMeterProvider::force_flush
///
/// # Example
///
/// ```no_run
/// use opentelemetry_sdk::metrics::PeriodicReader;
/// # fn example<E>(get_exporter: impl Fn() -> E)
/// # where
/// #     E: opentelemetry_sdk::metrics::exporter::PushMetricExporter,
/// # {
///
/// let exporter = get_exporter(); // set up a push exporter
///
/// let reader = PeriodicReader::builder(exporter).build();
/// # drop(reader);
/// # }
/// ```
#[derive(Clone)]
pub struct PeriodicReader {
    /// Shared state. Cloning the reader only clones this `Arc`, so every
    /// clone (including the one moved into the background thread) refers to
    /// the same inner value.
    inner: Arc<PeriodicReaderInner>,
}
131
132impl PeriodicReader {
133    /// Configuration options for a periodic reader with own thread
134    pub fn builder<E>(exporter: E) -> PeriodicReaderBuilder<E>
135    where
136        E: PushMetricExporter,
137    {
138        PeriodicReaderBuilder::new(exporter)
139    }
140
141    fn new<E>(exporter: E, interval: Duration) -> Self
142    where
143        E: PushMetricExporter,
144    {
145        let (message_sender, message_receiver): (Sender<Message>, Receiver<Message>) =
146            mpsc::channel();
147        let exporter_arc = Arc::new(exporter);
148        let reader = PeriodicReader {
149            inner: Arc::new(PeriodicReaderInner {
150                message_sender,
151                producer: Mutex::new(None),
152                exporter: exporter_arc.clone(),
153            }),
154        };
155        let cloned_reader = reader.clone();
156
157        let result_thread_creation = thread::Builder::new()
158            .name("OpenTelemetry.Metrics.PeriodicReader".to_string())
159            .spawn(move || {
160                let mut interval_start = Instant::now();
161                let mut remaining_interval = interval;
162                otel_info!(
163                    name: "PeriodReaderThreadStarted",
164                    interval_in_millisecs = interval.as_millis(),
165                );
166                loop {
167                    otel_debug!(
168                        name: "PeriodReaderThreadLoopAlive", message = "Next export will happen after interval, unless flush or shutdown is triggered.", interval_in_millisecs = remaining_interval.as_millis()
169                    );
170                    match message_receiver.recv_timeout(remaining_interval) {
171                        Ok(Message::Flush(response_sender)) => {
172                            otel_debug!(
173                                name: "PeriodReaderThreadExportingDueToFlush"
174                            );
175                            let export_result = cloned_reader.collect_and_export();
176                            otel_debug!(
177                                name: "PeriodReaderInvokedExport",
178                                export_result = format!("{:?}", export_result)
179                            );
180
181                            // If response_sender is disconnected, we can't send
182                            // the result back. This occurs when the thread that
183                            // initiated flush gave up due to timeout.
184                            // Gracefully handle that with internal logs. The
185                            // internal errors are of Info level, as this is
186                            // useful for user to know whether the flush was
187                            // successful or not, when flush() itself merely
188                            // tells that it timed out.
189
190                            if export_result.is_err() {
191                                if response_sender.send(false).is_err() {
192                                    otel_info!(
193                                        name: "PeriodReader.Flush.ResponseSendError",
194                                        message = "PeriodicReader's flush has failed, but unable to send this info back to caller. 
195                                        This occurs when the caller has timed out waiting for the response. If you see this occuring frequently, consider increasing the flush timeout."
196                                    );
197                                }
198                            } else if response_sender.send(true).is_err() {
199                                otel_info!(
200                                    name: "PeriodReader.Flush.ResponseSendError",
201                                    message = "PeriodicReader's flush has completed successfully, but unable to send this info back to caller. 
202                                    This occurs when the caller has timed out waiting for the response. If you see this occuring frequently, consider increasing the flush timeout."
203                                );
204                            }
205
206                            // Adjust the remaining interval after the flush
207                            let elapsed = interval_start.elapsed();
208                            if elapsed < interval {
209                                remaining_interval = interval - elapsed;
210                                otel_debug!(
211                                    name: "PeriodReaderThreadAdjustingRemainingIntervalAfterFlush",
212                                    remaining_interval = remaining_interval.as_secs()
213                                );
214                            } else {
215                                otel_debug!(
216                                    name: "PeriodReaderThreadAdjustingExportAfterFlush",
217                                );
218                                // Reset the interval if the flush finishes after the expected export time
219                                // effectively missing the normal export.
220                                // Should we attempt to do the missed export immediately?
221                                // Or do the next export at the next interval?
222                                // Currently this attempts the next export immediately.
223                                // i.e calling Flush can affect the regularity.
224                                interval_start = Instant::now();
225                                remaining_interval = Duration::ZERO;
226                            }
227                        }
228                        Ok(Message::Shutdown(response_sender)) => {
229                            // Perform final export and break out of loop and exit the thread
230                            otel_debug!(name: "PeriodReaderThreadExportingDueToShutdown");
231                            let export_result = cloned_reader.collect_and_export();
232                            otel_debug!(
233                                name: "PeriodReaderInvokedExport",
234                                export_result = format!("{:?}", export_result)
235                            );
236                            let shutdown_result = exporter_arc.shutdown();
237                            otel_debug!(
238                                name: "PeriodReaderInvokedExporterShutdown",
239                                shutdown_result = format!("{:?}", shutdown_result)
240                            );
241
242                            // If response_sender is disconnected, we can't send
243                            // the result back. This occurs when the thread that
244                            // initiated shutdown gave up due to timeout.
245                            // Gracefully handle that with internal logs and
246                            // continue with shutdown (i.e exit thread) The
247                            // internal errors are of Info level, as this is
248                            // useful for user to know whether the shutdown was
249                            // successful or not, when shutdown() itself merely
250                            // tells that it timed out.
251                            if export_result.is_err() || shutdown_result.is_err() {
252                                if response_sender.send(false).is_err() {
253                                    otel_info!(
254                                        name: "PeriodReaderThreadShutdown.ResponseSendError",
255                                        message = "PeriodicReader's shutdown has failed, but unable to send this info back to caller. 
256                                        This occurs when the caller has timed out waiting for the response. If you see this occuring frequently, consider increasing the shutdown timeout."
257                                    );
258                                }
259                            } else if response_sender.send(true).is_err() {
260                                otel_info!(
261                                    name: "PeriodReaderThreadShutdown.ResponseSendError",
262                                    message = "PeriodicReader completed its shutdown, but unable to send this info back to caller. 
263                                    This occurs when the caller has timed out waiting for the response. If you see this occuring frequently, consider increasing the shutdown timeout."
264                                );
265                            }
266
267                            otel_debug!(
268                                name: "PeriodReaderThreadExiting",
269                                reason = "ShutdownRequested"
270                            );
271                            break;
272                        }
273                        Err(mpsc::RecvTimeoutError::Timeout) => {
274                            let export_start = Instant::now();
275                            otel_debug!(
276                                name: "PeriodReaderThreadExportingDueToTimer"
277                            );
278
279                            let export_result = cloned_reader.collect_and_export();
280                            otel_debug!(
281                                name: "PeriodReaderInvokedExport",
282                                export_result = format!("{:?}", export_result)
283                            );
284
285                            let time_taken_for_export = export_start.elapsed();
286                            if time_taken_for_export > interval {
287                                otel_debug!(
288                                    name: "PeriodReaderThreadExportTookLongerThanInterval"
289                                );
290                                // if export took longer than interval, do the
291                                // next export immediately.
292                                // Alternatively, we could skip the next export
293                                // and wait for the next interval.
294                                // Or enforce that export timeout is less than interval.
295                                // What is the desired behavior?
296                                interval_start = Instant::now();
297                                remaining_interval = Duration::ZERO;
298                            } else {
299                                remaining_interval = interval - time_taken_for_export;
300                                interval_start = Instant::now();
301                            }
302                        }
303                        Err(mpsc::RecvTimeoutError::Disconnected) => {
304                            // Channel disconnected, only thing to do is break
305                            // out (i.e exit the thread)
306                            otel_debug!(
307                                name: "PeriodReaderThreadExiting",
308                                reason = "MessageSenderDisconnected"
309                            );
310                            break;
311                        }
312                    }
313                }
314                otel_info!(
315                    name: "PeriodReaderThreadStopped"
316                );
317            });
318
319        // TODO: Should we fail-fast here and bubble up the error to user?
320        #[allow(unused_variables)]
321        if let Err(e) = result_thread_creation {
322            otel_error!(
323                name: "PeriodReaderThreadStartError",
324                message = "Failed to start PeriodicReader thread. Metrics will not be exported.",
325                error = format!("{:?}", e)
326            );
327        }
328        reader
329    }
330
331    fn collect_and_export(&self) -> OTelSdkResult {
332        self.inner.collect_and_export()
333    }
334}
335
336impl fmt::Debug for PeriodicReader {
337    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
338        f.debug_struct("PeriodicReader").finish()
339    }
340}
341
/// State shared between every clone of a `PeriodicReader`, including the one
/// owned by the background thread.
struct PeriodicReaderInner {
    /// Exporter that collected metrics are sent to; also the source of the
    /// reader's temporality.
    exporter: Arc<dyn PushMetricExporter>,
    /// Sending half of the channel used to ask the background thread to
    /// flush or shut down.
    message_sender: mpsc::Sender<Message>,
    /// Producer to collect from; `None` until `register_pipeline` is called.
    producer: Mutex<Option<Weak<dyn SdkProducer>>>,
}
347
348impl PeriodicReaderInner {
349    fn register_pipeline(&self, producer: Weak<dyn SdkProducer>) {
350        let mut inner = self.producer.lock().expect("lock poisoned");
351        *inner = Some(producer);
352    }
353
354    fn temporality(&self, _kind: InstrumentKind) -> Temporality {
355        self.exporter.temporality()
356    }
357
358    fn collect(&self, rm: &mut ResourceMetrics) -> MetricResult<()> {
359        let producer = self.producer.lock().expect("lock poisoned");
360        if let Some(p) = producer.as_ref() {
361            p.upgrade()
362                .ok_or_else(|| MetricError::Other("pipeline is dropped".into()))?
363                .produce(rm)?;
364            Ok(())
365        } else {
366            otel_warn!(
367            name: "PeriodReader.MeterProviderNotRegistered",
368            message = "PeriodicReader is not registered with MeterProvider. Metrics will not be collected. \
369                   This occurs when a periodic reader is created but not associated with a MeterProvider \
370                   by calling `.with_reader(reader)` on MeterProviderBuilder."
371            );
372            Err(MetricError::Other("MeterProvider is not registered".into()))
373        }
374    }
375
376    fn collect_and_export(&self) -> OTelSdkResult {
377        // TODO: Reuse the internal vectors. Or refactor to avoid needing any
378        // owned data structures to be passed to exporters.
379        let mut rm = ResourceMetrics {
380            resource: Resource::empty(),
381            scope_metrics: Vec::new(),
382        };
383
384        let current_time = Instant::now();
385        let collect_result = self.collect(&mut rm);
386        let time_taken_for_collect = current_time.elapsed();
387
388        #[allow(clippy::question_mark)]
389        if let Err(e) = collect_result {
390            otel_warn!(
391                name: "PeriodReaderCollectError",
392                error = format!("{:?}", e)
393            );
394            return Err(OTelSdkError::InternalFailure(e.to_string()));
395        }
396
397        if rm.scope_metrics.is_empty() {
398            otel_debug!(name: "NoMetricsCollected");
399            return Ok(());
400        }
401
402        let metrics_count = rm.scope_metrics.iter().fold(0, |count, scope_metrics| {
403            count + scope_metrics.metrics.len()
404        });
405        otel_debug!(name: "PeriodicReaderMetricsCollected", count = metrics_count, time_taken_in_millis = time_taken_for_collect.as_millis());
406
407        // Relying on futures executor to execute async call.
408        // TODO: Pass timeout to exporter
409        futures_executor::block_on(self.exporter.export(&mut rm))
410    }
411
412    fn force_flush(&self) -> OTelSdkResult {
413        // TODO: Better message for this scenario.
414        // Flush and Shutdown called from 2 threads Flush check shutdown
415        // flag before shutdown thread sets it. Both threads attempt to send
416        // message to the same channel. Case1: Flush thread sends message first,
417        // shutdown thread sends message next. Flush would succeed, as
418        // background thread won't process shutdown message until flush
419        // triggered export is done. Case2: Shutdown thread sends message first,
420        // flush thread sends message next. Shutdown would succeed, as
421        // background thread would process shutdown message first. The
422        // background exits so it won't receive the flush message. ForceFlush
423        // returns Failure, but we could indicate specifically that shutdown has
424        // completed. TODO is to see if this message can be improved.
425
426        let (response_tx, response_rx) = mpsc::channel();
427        self.message_sender
428            .send(Message::Flush(response_tx))
429            .map_err(|e| OTelSdkError::InternalFailure(e.to_string()))?;
430
431        if let Ok(response) = response_rx.recv() {
432            // TODO: call exporter's force_flush method.
433            if response {
434                Ok(())
435            } else {
436                Err(OTelSdkError::InternalFailure("Failed to flush".into()))
437            }
438        } else {
439            Err(OTelSdkError::InternalFailure("Failed to flush".into()))
440        }
441    }
442
443    fn shutdown(&self) -> OTelSdkResult {
444        // TODO: See if this is better to be created upfront.
445        let (response_tx, response_rx) = mpsc::channel();
446        self.message_sender
447            .send(Message::Shutdown(response_tx))
448            .map_err(|e| OTelSdkError::InternalFailure(e.to_string()))?;
449
450        // TODO: Make this timeout configurable.
451        match response_rx.recv_timeout(Duration::from_secs(5)) {
452            Ok(response) => {
453                if response {
454                    Ok(())
455                } else {
456                    Err(OTelSdkError::InternalFailure("Failed to shutdown".into()))
457                }
458            }
459            Err(mpsc::RecvTimeoutError::Timeout) => {
460                Err(OTelSdkError::Timeout(Duration::from_secs(5)))
461            }
462            Err(mpsc::RecvTimeoutError::Disconnected) => {
463                Err(OTelSdkError::InternalFailure("Failed to shutdown".into()))
464            }
465        }
466    }
467}
468
/// Requests sent to the reader's background thread. Each variant carries the
/// sender on which the thread reports `true` for success or `false` for
/// failure.
#[derive(Debug)]
enum Message {
    /// Collect and export now, then keep running.
    Flush(Sender<bool>),
    /// Collect and export one last time, shut the exporter down, then exit.
    Shutdown(Sender<bool>),
}
474
475impl MetricReader for PeriodicReader {
476    fn register_pipeline(&self, pipeline: Weak<Pipeline>) {
477        self.inner.register_pipeline(pipeline);
478    }
479
480    fn collect(&self, rm: &mut ResourceMetrics) -> MetricResult<()> {
481        self.inner.collect(rm)
482    }
483
484    fn force_flush(&self) -> OTelSdkResult {
485        self.inner.force_flush()
486    }
487
488    // TODO: Offer an async version of shutdown so users can await the shutdown
489    // completion, and avoid blocking the thread. The default shutdown on drop
490    // can still use blocking call. If user already explicitly called shutdown,
491    // drop won't call shutdown again.
492    fn shutdown(&self) -> OTelSdkResult {
493        self.inner.shutdown()
494    }
495
496    /// To construct a [MetricReader][metric-reader] when setting up an SDK,
497    /// The output temporality (optional), a function of instrument kind.
498    /// This function SHOULD be obtained from the exporter.
499    ///
500    /// If not configured, the Cumulative temporality SHOULD be used.
501    ///  
502    /// [metric-reader]: https://github.com/open-telemetry/opentelemetry-specification/blob/0a78571045ca1dca48621c9648ec3c832c3c541c/specification/metrics/sdk.md#metricreader
503    fn temporality(&self, kind: InstrumentKind) -> Temporality {
504        kind.temporality_preference(self.inner.temporality(kind))
505    }
506}
507
508#[cfg(all(test, feature = "testing"))]
509mod tests {
510    use super::PeriodicReader;
511    use crate::{
512        error::{OTelSdkError, OTelSdkResult},
513        metrics::{
514            data::ResourceMetrics, exporter::PushMetricExporter, reader::MetricReader,
515            InMemoryMetricExporter, SdkMeterProvider, Temporality,
516        },
517        Resource,
518    };
519    use async_trait::async_trait;
520    use opentelemetry::metrics::MeterProvider;
521    use std::{
522        sync::{
523            atomic::{AtomicBool, AtomicUsize, Ordering},
524            mpsc, Arc,
525        },
526        time::Duration,
527    };
528
529    // use below command to run all tests
530    // cargo test metrics::periodic_reader::tests --features=testing,spec_unstable_metrics_views -- --nocapture
531
    /// Test exporter whose first `export` call fails and every later call
    /// succeeds.
    #[derive(Debug, Clone)]
    struct MetricExporterThatFailsOnlyOnFirst {
        /// Number of `export` calls made so far; shared between clones.
        count: Arc<AtomicUsize>,
    }
536
537    impl Default for MetricExporterThatFailsOnlyOnFirst {
538        fn default() -> Self {
539            MetricExporterThatFailsOnlyOnFirst {
540                count: Arc::new(AtomicUsize::new(0)),
541            }
542        }
543    }
544
545    impl MetricExporterThatFailsOnlyOnFirst {
546        fn get_count(&self) -> usize {
547            self.count.load(Ordering::Relaxed)
548        }
549    }
550
551    #[async_trait]
552    impl PushMetricExporter for MetricExporterThatFailsOnlyOnFirst {
553        async fn export(&self, _metrics: &mut ResourceMetrics) -> OTelSdkResult {
554            if self.count.fetch_add(1, Ordering::Relaxed) == 0 {
555                Err(OTelSdkError::InternalFailure("export failed".into()))
556            } else {
557                Ok(())
558            }
559        }
560
561        async fn force_flush(&self) -> OTelSdkResult {
562            Ok(())
563        }
564
565        fn shutdown(&self) -> OTelSdkResult {
566            Ok(())
567        }
568
569        fn temporality(&self) -> Temporality {
570            Temporality::Cumulative
571        }
572    }
573
    /// Test exporter that always succeeds and records whether `shutdown` was
    /// called on it.
    #[derive(Debug, Clone, Default)]
    struct MockMetricExporter {
        /// Set to `true` by `shutdown`; shared between clones.
        is_shutdown: Arc<AtomicBool>,
    }
578
    #[async_trait]
    impl PushMetricExporter for MockMetricExporter {
        /// Accepts and discards the metrics.
        async fn export(&self, _metrics: &mut ResourceMetrics) -> OTelSdkResult {
            Ok(())
        }

        /// Always succeeds; there is nothing to flush.
        async fn force_flush(&self) -> OTelSdkResult {
            Ok(())
        }

        /// Records that shutdown was requested so tests can assert on it.
        fn shutdown(&self) -> OTelSdkResult {
            self.is_shutdown.store(true, Ordering::Relaxed);
            Ok(())
        }

        /// Always reports cumulative temporality.
        fn temporality(&self) -> Temporality {
            Temporality::Cumulative
        }
    }
598
599    #[test]
600    fn collection_triggered_by_interval_multiple() {
601        // Arrange
602        let interval = std::time::Duration::from_millis(1);
603        let exporter = InMemoryMetricExporter::default();
604        let reader = PeriodicReader::builder(exporter.clone())
605            .with_interval(interval)
606            .build();
607        let i = Arc::new(AtomicUsize::new(0));
608        let i_clone = i.clone();
609
610        // Act
611        let meter_provider = SdkMeterProvider::builder().with_reader(reader).build();
612        let meter = meter_provider.meter("test");
613        let _counter = meter
614            .u64_observable_counter("testcounter")
615            .with_callback(move |_| {
616                i_clone.fetch_add(1, Ordering::Relaxed);
617            })
618            .build();
619
620        // Sleep for a duration 5X (plus liberal buffer to account for potential
621        // CI slowness) the interval to ensure multiple collection.
622        // Not a fan of such tests, but this seems to be the only way to test
623        // if periodic reader is doing its job.
624        // TODO: Decide if this should be ignored in CI
625        std::thread::sleep(interval * 5 * 20);
626
627        // Assert
628        assert!(i.load(Ordering::Relaxed) >= 5);
629    }
630
631    #[test]
632    fn shutdown_repeat() {
633        // Arrange
634        let exporter = InMemoryMetricExporter::default();
635        let reader = PeriodicReader::builder(exporter.clone()).build();
636
637        let meter_provider = SdkMeterProvider::builder().with_reader(reader).build();
638        let result = meter_provider.shutdown();
639        assert!(result.is_ok());
640
641        // calling shutdown again should return Err
642        let result = meter_provider.shutdown();
643        assert!(result.is_err());
644        assert!(matches!(result, Err(OTelSdkError::AlreadyShutdown)));
645
646        // calling shutdown again should return Err
647        let result = meter_provider.shutdown();
648        assert!(result.is_err());
649        assert!(matches!(result, Err(OTelSdkError::AlreadyShutdown)));
650    }
651
652    #[test]
653    fn flush_after_shutdown() {
654        // Arrange
655        let exporter = InMemoryMetricExporter::default();
656        let reader = PeriodicReader::builder(exporter.clone()).build();
657
658        let meter_provider = SdkMeterProvider::builder().with_reader(reader).build();
659        let result = meter_provider.force_flush();
660        assert!(result.is_ok());
661
662        let result = meter_provider.shutdown();
663        assert!(result.is_ok());
664
665        // calling force_flush after shutdown should return Err
666        let result = meter_provider.force_flush();
667        assert!(result.is_err());
668    }
669
670    #[test]
671    fn flush_repeat() {
672        // Arrange
673        let exporter = InMemoryMetricExporter::default();
674        let reader = PeriodicReader::builder(exporter.clone()).build();
675
676        let meter_provider = SdkMeterProvider::builder().with_reader(reader).build();
677        let result = meter_provider.force_flush();
678        assert!(result.is_ok());
679
680        // calling force_flush again should return Ok
681        let result = meter_provider.force_flush();
682        assert!(result.is_ok());
683    }
684
685    #[test]
686    fn periodic_reader_without_pipeline() {
687        // Arrange
688        let exporter = InMemoryMetricExporter::default();
689        let reader = PeriodicReader::builder(exporter.clone()).build();
690
691        let rm = &mut ResourceMetrics {
692            resource: Resource::empty(),
693            scope_metrics: Vec::new(),
694        };
695        // Pipeline is not registered, so collect should return an error
696        let result = reader.collect(rm);
697        assert!(result.is_err());
698
699        // Pipeline is not registered, so flush should return an error
700        let result = reader.force_flush();
701        assert!(result.is_err());
702
703        // Adding reader to meter provider should register the pipeline
704        // TODO: This part might benefit from a different design.
705        let meter_provider = SdkMeterProvider::builder()
706            .with_reader(reader.clone())
707            .build();
708
709        // Now collect and flush should succeed
710        let result = reader.collect(rm);
711        assert!(result.is_ok());
712
713        let result = meter_provider.force_flush();
714        assert!(result.is_ok());
715    }
716
/// Verifies that a failing export does not stop the periodic reader.
///
/// The mock exporter fails its first export and succeeds afterwards, so
/// observing at least two export attempts proves the reader kept running
/// after the failure.
#[test]
fn exporter_failures_are_handled() {
    // Arrange
    let interval = std::time::Duration::from_millis(10);
    let exporter = MetricExporterThatFailsOnlyOnFirst::default();
    let reader = PeriodicReader::builder(exporter.clone())
        .with_interval(interval)
        .build();

    let meter_provider = SdkMeterProvider::builder().with_reader(reader).build();
    let meter = meter_provider.meter("test");
    let counter = meter.u64_counter("sync_counter").build();
    counter.add(1, &[]);
    let _obs_counter = meter
        .u64_observable_counter("testcounter")
        .with_callback(move |observer| {
            observer.observe(1, &[]);
        })
        .build();

    // Act
    // Wait for the background exports instead of sleeping for a fixed time:
    // the test finishes as soon as the second attempt is seen, and a slow
    // machine gets a generous deadline rather than a spurious failure.
    let deadline = std::time::Instant::now() + Duration::from_secs(5);
    while exporter.get_count() < 2 && std::time::Instant::now() < deadline {
        std::thread::sleep(interval);
    }

    // Assert that at least 2 exports are attempted given the 1st one fails.
    assert!(
        exporter.get_count() >= 2,
        "expected at least two export attempts before the deadline"
    );
}
750
/// Shutting down the meter provider must reach the exporter: the mock
/// exporter's `is_shutdown` flag is expected to be set afterwards.
#[test]
fn shutdown_passed_to_exporter() {
    // Arrange
    let exporter = MockMetricExporter::default();
    let meter_provider = SdkMeterProvider::builder()
        .with_reader(PeriodicReader::builder(exporter.clone()).build())
        .build();
    let meter = meter_provider.meter("test");
    let sync_counter = meter.u64_counter("sync_counter").build();
    sync_counter.add(1, &[]);

    // Act: provider shutdown -> periodic reader shutdown -> exporter shutdown.
    let outcome = meter_provider.shutdown();

    // Assert
    assert!(outcome.is_ok());
    assert!(exporter.is_shutdown.load(Ordering::Relaxed));
}
768
/// Runs every collection scenario (interval, flush, shutdown, drop) from a
/// plain test thread, i.e. without any async runtime around it.
#[test]
fn collection() {
    let scenarios: [fn(); 4] = [
        collection_triggered_by_interval_helper,
        collection_triggered_by_flush_helper,
        collection_triggered_by_shutdown_helper,
        collection_triggered_by_drop_helper,
    ];
    for scenario in scenarios {
        scenario();
    }
}
776
777    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
778    async fn collection_from_tokio_multi_with_one_worker() {
779        collection_triggered_by_interval_helper();
780        collection_triggered_by_flush_helper();
781        collection_triggered_by_shutdown_helper();
782        collection_triggered_by_drop_helper();
783    }
784
785    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
786    async fn collection_from_tokio_with_two_worker() {
787        collection_triggered_by_interval_helper();
788        collection_triggered_by_flush_helper();
789        collection_triggered_by_shutdown_helper();
790        collection_triggered_by_drop_helper();
791    }
792
793    #[tokio::test(flavor = "current_thread")]
794    async fn collection_from_tokio_current() {
795        collection_triggered_by_interval_helper();
796        collection_triggered_by_flush_helper();
797        collection_triggered_by_shutdown_helper();
798        collection_triggered_by_drop_helper();
799    }
800
801    fn collection_triggered_by_interval_helper() {
802        collection_helper(|_| {
803            // Sleep for a duration longer than the interval to ensure at least one collection
804            // Not a fan of such tests, but this seems to be the only way to test
805            // if periodic reader is doing its job.
806            // TODO: Decide if this should be ignored in CI
807            std::thread::sleep(Duration::from_millis(500));
808        });
809    }
810
811    fn collection_triggered_by_flush_helper() {
812        collection_helper(|meter_provider| {
813            meter_provider.force_flush().expect("flush should succeed");
814        });
815    }
816
817    fn collection_triggered_by_shutdown_helper() {
818        collection_helper(|meter_provider| {
819            meter_provider.shutdown().expect("shutdown should succeed");
820        });
821    }
822
823    fn collection_triggered_by_drop_helper() {
824        collection_helper(|meter_provider| {
825            drop(meter_provider);
826        });
827    }
828
829    fn collection_helper(trigger: fn(SdkMeterProvider)) {
830        // Arrange
831        let exporter = InMemoryMetricExporter::default();
832        let reader = PeriodicReader::builder(exporter.clone()).build();
833        let (sender, receiver) = mpsc::channel();
834
835        let meter_provider = SdkMeterProvider::builder().with_reader(reader).build();
836        let meter = meter_provider.meter("test");
837        let _counter = meter
838            .u64_observable_counter("testcounter")
839            .with_callback(move |observer| {
840                observer.observe(1, &[]);
841                sender.send(()).expect("channel should still be open");
842            })
843            .build();
844
845        // Act
846        trigger(meter_provider);
847
848        // Assert
849        receiver
850            .recv_timeout(Duration::ZERO)
851            .expect("message should be available in channel, indicating a collection occurred, which should trigger observable callback");
852
853        let exported_metrics = exporter
854            .get_finished_metrics()
855            .expect("this should not fail");
856        assert!(
857            !exported_metrics.is_empty(),
858            "Metrics should be available in exporter."
859        );
860    }
861
/// Runtime-agnostic async function: blocks the calling thread for one
/// millisecond with `std::thread::sleep` and then resolves to `1`.
async fn some_async_function() -> u64 {
    // No dependency on any particular async runtime.
    let pause = std::time::Duration::from_millis(1);
    std::thread::sleep(pause);
    1
}
867
868    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
869    async fn async_inside_observable_callback_from_tokio_multi_with_one_worker() {
870        async_inside_observable_callback_helper();
871    }
872
873    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
874    async fn async_inside_observable_callback_from_tokio_multi_with_two_worker() {
875        async_inside_observable_callback_helper();
876    }
877
878    #[tokio::test(flavor = "current_thread")]
879    async fn async_inside_observable_callback_from_tokio_current_thread() {
880        async_inside_observable_callback_helper();
881    }
882
/// Runs the runtime-agnostic async-in-callback scenario from a plain test
/// thread, with no async runtime around it.
#[test]
fn async_inside_observable_callback_from_regular_main() {
    async_inside_observable_callback_helper();
}
887
888    fn async_inside_observable_callback_helper() {
889        let interval = std::time::Duration::from_millis(10);
890        let exporter = InMemoryMetricExporter::default();
891        let reader = PeriodicReader::builder(exporter.clone())
892            .with_interval(interval)
893            .build();
894
895        let meter_provider = SdkMeterProvider::builder().with_reader(reader).build();
896        let meter = meter_provider.meter("test");
897        let _gauge = meter
898            .u64_observable_gauge("my_observable_gauge")
899            .with_callback(|observer| {
900                // using futures_executor::block_on intentionally and avoiding
901                // any particular async runtime.
902                let value = futures_executor::block_on(some_async_function());
903                observer.observe(value, &[]);
904            })
905            .build();
906
907        meter_provider.force_flush().expect("flush should succeed");
908        let exported_metrics = exporter
909            .get_finished_metrics()
910            .expect("this should not fail");
911        assert!(
912            !exported_metrics.is_empty(),
913            "Metrics should be available in exporter."
914        );
915    }
916
917    async fn some_tokio_async_function() -> u64 {
918        // Tokio specific async function
919        tokio::time::sleep(Duration::from_millis(1)).await;
920        1
921    }
922
923    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
924
925    async fn tokio_async_inside_observable_callback_from_tokio_multi_with_one_worker() {
926        tokio_async_inside_observable_callback_helper(true);
927    }
928
929    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
930    async fn tokio_async_inside_observable_callback_from_tokio_multi_with_two_worker() {
931        tokio_async_inside_observable_callback_helper(true);
932    }
933
934    #[tokio::test(flavor = "current_thread")]
935    #[ignore] //TODO: Investigate if this can be fixed.
936    async fn tokio_async_inside_observable_callback_from_tokio_current_thread() {
937        tokio_async_inside_observable_callback_helper(true);
938    }
939
/// Runs the Tokio-specific async-in-callback scenario from a plain test
/// thread; the helper creates its own Tokio runtime for the callback.
#[test]
fn tokio_async_inside_observable_callback_from_regular_main() {
    tokio_async_inside_observable_callback_helper(false);
}
944
945    fn tokio_async_inside_observable_callback_helper(use_current_tokio_runtime: bool) {
946        let exporter = InMemoryMetricExporter::default();
947        let reader = PeriodicReader::builder(exporter.clone()).build();
948
949        let meter_provider = SdkMeterProvider::builder().with_reader(reader).build();
950        let meter = meter_provider.meter("test");
951
952        if use_current_tokio_runtime {
953            let rt = tokio::runtime::Handle::current().clone();
954            let _gauge = meter
955                .u64_observable_gauge("my_observable_gauge")
956                .with_callback(move |observer| {
957                    // call tokio specific async function from here
958                    let value = rt.block_on(some_tokio_async_function());
959                    observer.observe(value, &[]);
960                })
961                .build();
962            // rt here is a reference to the current tokio runtime.
963            // Dropping it occurs when the tokio::main itself ends.
964        } else {
965            let rt = tokio::runtime::Runtime::new().unwrap();
966            let _gauge = meter
967                .u64_observable_gauge("my_observable_gauge")
968                .with_callback(move |observer| {
969                    // call tokio specific async function from here
970                    let value = rt.block_on(some_tokio_async_function());
971                    observer.observe(value, &[]);
972                })
973                .build();
974            // rt is not dropped here as it is moved to the closure,
975            // and is dropped only when MeterProvider itself is dropped.
976            // This works when called from normal main.
977        };
978
979        meter_provider.force_flush().expect("flush should succeed");
980        let exported_metrics = exporter
981            .get_finished_metrics()
982            .expect("this should not fail");
983        assert!(
984            !exported_metrics.is_empty(),
985            "Metrics should be available in exporter."
986        );
987    }
988}