opentelemetry_sdk/metrics/
manual_reader.rs1use std::{
2 fmt,
3 sync::{Mutex, Weak},
4};
5
6use opentelemetry::otel_debug;
7
8use crate::{
9 error::{OTelSdkError, OTelSdkResult},
10 metrics::{MetricError, MetricResult, Temporality},
11};
12
13use super::{
14 data::ResourceMetrics,
15 pipeline::Pipeline,
16 reader::{MetricReader, SdkProducer},
17};
18
19pub struct ManualReader {
33 inner: Mutex<ManualReaderInner>,
34 temporality: Temporality,
35}
36
37impl Default for ManualReader {
38 fn default() -> Self {
39 ManualReader::builder().build()
40 }
41}
42
43impl fmt::Debug for ManualReader {
44 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
45 f.write_str("ManualReader")
46 }
47}
48
49#[derive(Debug)]
50struct ManualReaderInner {
51 sdk_producer: Option<Weak<dyn SdkProducer>>,
52 is_shutdown: bool,
53}
54
55impl ManualReader {
56 pub fn builder() -> ManualReaderBuilder {
58 ManualReaderBuilder::default()
59 }
60
61 pub(crate) fn new(temporality: Temporality) -> Self {
63 ManualReader {
64 inner: Mutex::new(ManualReaderInner {
65 sdk_producer: None,
66 is_shutdown: false,
67 }),
68 temporality,
69 }
70 }
71}
72
73impl MetricReader for ManualReader {
74 fn register_pipeline(&self, pipeline: Weak<Pipeline>) {
77 let _ = self.inner.lock().map(|mut inner| {
78 if inner.sdk_producer.is_none() {
80 inner.sdk_producer = Some(pipeline);
81 } else {
82 otel_debug!(
83 name: "ManualReader.DuplicateRegistration",
84 message = "The pipeline is already registered to the Reader. Registering pipeline multiple times is not allowed.");
85 }
86 });
87 }
88
89 fn collect(&self, rm: &mut ResourceMetrics) -> MetricResult<()> {
94 let inner = self.inner.lock()?;
95 match &inner.sdk_producer.as_ref().and_then(|w| w.upgrade()) {
96 Some(producer) => producer.produce(rm)?,
97 None => {
98 return Err(MetricError::Other(
99 "reader is shut down or not registered".into(),
100 ))
101 }
102 };
103
104 Ok(())
105 }
106
107 fn force_flush(&self) -> OTelSdkResult {
109 Ok(())
110 }
111
112 fn shutdown(&self) -> OTelSdkResult {
114 let mut inner = self
115 .inner
116 .lock()
117 .map_err(|e| OTelSdkError::InternalFailure(format!("Failed to acquire lock: {}", e)))?;
118
119 inner.sdk_producer = None;
121 inner.is_shutdown = true;
122
123 Ok(())
124 }
125
126 fn temporality(&self, kind: super::InstrumentKind) -> Temporality {
127 kind.temporality_preference(self.temporality)
128 }
129}
130
131#[derive(Default)]
133pub struct ManualReaderBuilder {
134 temporality: Temporality,
135}
136
137impl fmt::Debug for ManualReaderBuilder {
138 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
139 f.write_str("ManualReaderBuilder")
140 }
141}
142
143impl ManualReaderBuilder {
144 pub fn new() -> Self {
146 Default::default()
147 }
148
149 pub fn with_temporality(mut self, temporality: Temporality) -> Self {
151 self.temporality = temporality;
152 self
153 }
154
155 pub fn build(self) -> ManualReader {
157 ManualReader::new(self.temporality)
158 }
159}