opentelemetry_sdk/metrics/
manual_reader.rs

1use std::{
2    fmt,
3    sync::{Mutex, Weak},
4};
5
6use opentelemetry::otel_debug;
7
8use crate::{
9    error::{OTelSdkError, OTelSdkResult},
10    metrics::{MetricError, MetricResult, Temporality},
11};
12
13use super::{
14    data::ResourceMetrics,
15    pipeline::Pipeline,
16    reader::{MetricReader, SdkProducer},
17};
18
19/// A simple [MetricReader] that allows an application to read metrics on demand.
20///
21/// See [ManualReaderBuilder] for configuration options.
22///
23/// # Example
24///
25/// ```
26/// use opentelemetry_sdk::metrics::ManualReader;
27///
28/// // can specify additional reader configuration
29/// let reader = ManualReader::builder().build();
30/// # drop(reader)
31/// ```
32pub struct ManualReader {
33    inner: Mutex<ManualReaderInner>,
34    temporality: Temporality,
35}
36
37impl Default for ManualReader {
38    fn default() -> Self {
39        ManualReader::builder().build()
40    }
41}
42
43impl fmt::Debug for ManualReader {
44    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
45        f.write_str("ManualReader")
46    }
47}
48
49#[derive(Debug)]
50struct ManualReaderInner {
51    sdk_producer: Option<Weak<dyn SdkProducer>>,
52    is_shutdown: bool,
53}
54
55impl ManualReader {
56    /// Configuration for this reader
57    pub fn builder() -> ManualReaderBuilder {
58        ManualReaderBuilder::default()
59    }
60
61    /// A [MetricReader] which is directly called to collect metrics.
62    pub(crate) fn new(temporality: Temporality) -> Self {
63        ManualReader {
64            inner: Mutex::new(ManualReaderInner {
65                sdk_producer: None,
66                is_shutdown: false,
67            }),
68            temporality,
69        }
70    }
71}
72
73impl MetricReader for ManualReader {
74    ///  Register a pipeline which enables the caller to read metrics from the SDK
75    ///  on demand.
76    fn register_pipeline(&self, pipeline: Weak<Pipeline>) {
77        let _ = self.inner.lock().map(|mut inner| {
78            // Only register once. If producer is already set, do nothing.
79            if inner.sdk_producer.is_none() {
80                inner.sdk_producer = Some(pipeline);
81            } else {
82                otel_debug!(
83                    name: "ManualReader.DuplicateRegistration",
84                    message = "The pipeline is already registered to the Reader. Registering pipeline multiple times is not allowed.");
85            }
86        });
87    }
88
89    /// Gathers all metrics from the SDK, calling any
90    /// callbacks necessary and returning the results.
91    ///
92    /// Returns an error if called after shutdown.
93    fn collect(&self, rm: &mut ResourceMetrics) -> MetricResult<()> {
94        let inner = self.inner.lock()?;
95        match &inner.sdk_producer.as_ref().and_then(|w| w.upgrade()) {
96            Some(producer) => producer.produce(rm)?,
97            None => {
98                return Err(MetricError::Other(
99                    "reader is shut down or not registered".into(),
100                ))
101            }
102        };
103
104        Ok(())
105    }
106
107    /// ForceFlush is a no-op, it always returns nil.
108    fn force_flush(&self) -> OTelSdkResult {
109        Ok(())
110    }
111
112    /// Closes any connections and frees any resources used by the reader.
113    fn shutdown(&self) -> OTelSdkResult {
114        let mut inner = self
115            .inner
116            .lock()
117            .map_err(|e| OTelSdkError::InternalFailure(format!("Failed to acquire lock: {}", e)))?;
118
119        // Any future call to collect will now return an error.
120        inner.sdk_producer = None;
121        inner.is_shutdown = true;
122
123        Ok(())
124    }
125
126    fn temporality(&self, kind: super::InstrumentKind) -> Temporality {
127        kind.temporality_preference(self.temporality)
128    }
129}
130
131/// Configuration for a [ManualReader]
132#[derive(Default)]
133pub struct ManualReaderBuilder {
134    temporality: Temporality,
135}
136
137impl fmt::Debug for ManualReaderBuilder {
138    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
139        f.write_str("ManualReaderBuilder")
140    }
141}
142
143impl ManualReaderBuilder {
144    /// New manual builder configuration
145    pub fn new() -> Self {
146        Default::default()
147    }
148
149    /// Set the [Temporality] of the exporter.
150    pub fn with_temporality(mut self, temporality: Temporality) -> Self {
151        self.temporality = temporality;
152        self
153    }
154
155    /// Create a new [ManualReader] from this configuration.
156    pub fn build(self) -> ManualReader {
157        ManualReader::new(self.temporality)
158    }
159}