opentelemetry_sdk/metrics/
meter_provider.rs

1use core::fmt;
2use std::{
3    collections::HashMap,
4    sync::{
5        atomic::{AtomicBool, Ordering},
6        Arc, Mutex,
7    },
8};
9
10use opentelemetry::{
11    metrics::{Meter, MeterProvider},
12    otel_debug, otel_error, otel_info, InstrumentationScope,
13};
14
15use crate::error::OTelSdkResult;
16use crate::Resource;
17
18use super::{
19    exporter::PushMetricExporter, meter::SdkMeter, noop::NoopMeter, pipeline::Pipelines,
20    reader::MetricReader, view::View, PeriodicReader,
21};
22
/// Handles the creation and coordination of [Meter]s.
///
/// All `Meter`s created by a `MeterProvider` will be associated with the same
/// [Resource], have the same [View]s applied to them, and have their produced
/// metric telemetry passed to the configured [MetricReader]s.
///
/// This is a clonable handle to the MeterProvider implementation itself, and
/// cloning it will create a new reference, not a new instance of a
/// MeterProvider. Dropping the last reference to it will trigger shutdown of
/// the provider. Shutdown can also be triggered manually by calling the
/// `shutdown` method.
///
/// [Meter]: opentelemetry::metrics::Meter
#[derive(Clone, Debug)]
pub struct SdkMeterProvider {
    // Shared state; every clone of this handle points at the same inner value.
    inner: Arc<SdkMeterProviderInner>,
}
37
/// State shared by every clone of an [`SdkMeterProvider`] handle.
#[derive(Debug)]
struct SdkMeterProviderInner {
    // Pipelines handed to every `SdkMeter` this provider creates; also the
    // target of `force_flush` and `shutdown`.
    pipes: Arc<Pipelines>,
    // Meters already created, keyed by instrumentation scope, so that a
    // repeated request for the same scope returns the existing meter.
    meters: Mutex<HashMap<InstrumentationScope, Arc<SdkMeter>>>,
    // Set to `true` by the first call to `shutdown`.
    shutdown_invoked: AtomicBool,
}
44
45impl Default for SdkMeterProvider {
46    fn default() -> Self {
47        SdkMeterProvider::builder().build()
48    }
49}
50
51impl SdkMeterProvider {
52    /// Return default [MeterProviderBuilder]
53    pub fn builder() -> MeterProviderBuilder {
54        MeterProviderBuilder::default()
55    }
56
57    /// Flushes all pending telemetry.
58    ///
59    /// There is no guaranteed that all telemetry be flushed or all resources have
60    /// been released on error.
61    ///
62    /// # Examples
63    ///
64    /// ```
65    /// use opentelemetry::{global, Context};
66    /// use opentelemetry_sdk::metrics::SdkMeterProvider;
67    ///
68    /// fn init_metrics() -> SdkMeterProvider {
69    ///     // Setup metric pipelines with readers + views, default has no
70    ///     // readers so nothing is exported.
71    ///     let provider = SdkMeterProvider::default();
72    ///
73    ///     // Set provider to be used as global meter provider
74    ///     let _ = global::set_meter_provider(provider.clone());
75    ///
76    ///     provider
77    /// }
78    ///
79    /// fn main() -> Result<(), Box<dyn std::error::Error>> {
80    ///     let provider = init_metrics();
81    ///
82    ///     // create instruments + record measurements
83    ///
84    ///     // force all instruments to flush
85    ///     provider.force_flush()?;
86    ///
87    ///     // record more measurements..
88    ///
89    ///     // shutdown ensures any cleanup required by the provider is done,
90    ///     // and also invokes shutdown on the readers.
91    ///     provider.shutdown()?;
92    ///
93    ///     Ok(())
94    /// }
95    /// ```
96    pub fn force_flush(&self) -> OTelSdkResult {
97        self.inner.force_flush()
98    }
99
100    /// Shuts down the meter provider flushing all pending telemetry and releasing
101    /// any held computational resources.
102    ///
103    /// This call is idempotent. The first call will perform all flush and releasing
104    /// operations. Subsequent calls will perform no action and will return an error
105    /// stating this.
106    ///
107    /// Measurements made by instruments from meters this MeterProvider created will
108    /// not be exported after Shutdown is called.
109    ///
110    /// There is no guaranteed that all telemetry be flushed or all resources have
111    /// been released on error.
112    pub fn shutdown(&self) -> OTelSdkResult {
113        otel_info!(
114            name: "MeterProvider.Shutdown",
115            message = "User initiated shutdown of MeterProvider."
116        );
117        self.inner.shutdown()
118    }
119}
120
121impl SdkMeterProviderInner {
122    fn force_flush(&self) -> OTelSdkResult {
123        if self
124            .shutdown_invoked
125            .load(std::sync::atomic::Ordering::Relaxed)
126        {
127            Err(crate::error::OTelSdkError::AlreadyShutdown)
128        } else {
129            self.pipes.force_flush()
130        }
131    }
132
133    fn shutdown(&self) -> OTelSdkResult {
134        if self
135            .shutdown_invoked
136            .swap(true, std::sync::atomic::Ordering::SeqCst)
137        {
138            // If the previous value was true, shutdown was already invoked.
139            Err(crate::error::OTelSdkError::AlreadyShutdown)
140        } else {
141            self.pipes.shutdown()
142        }
143    }
144}
145
146impl Drop for SdkMeterProviderInner {
147    fn drop(&mut self) {
148        // If user has already shutdown the provider manually by calling
149        // shutdown(), then we don't need to call shutdown again.
150        if self.shutdown_invoked.load(Ordering::Relaxed) {
151            otel_debug!(
152                name: "MeterProvider.Drop.AlreadyShutdown",
153                message = "MeterProvider was already shut down; drop will not attempt shutdown again."
154            );
155        } else {
156            otel_info!(
157                name: "MeterProvider.Drop",
158                message = "Last reference of MeterProvider dropped, initiating shutdown."
159            );
160            if let Err(err) = self.shutdown() {
161                otel_error!(
162                    name: "MeterProvider.Drop.ShutdownFailed",
163                    message = "Shutdown attempt failed during drop of MeterProvider.",
164                    reason = format!("{}", err)
165                );
166            } else {
167                otel_info!(
168                    name: "MeterProvider.Drop.ShutdownCompleted",
169                );
170            }
171        }
172    }
173}
174
175impl MeterProvider for SdkMeterProvider {
176    fn meter(&self, name: &'static str) -> Meter {
177        let scope = InstrumentationScope::builder(name).build();
178        self.meter_with_scope(scope)
179    }
180
181    fn meter_with_scope(&self, scope: InstrumentationScope) -> Meter {
182        if self.inner.shutdown_invoked.load(Ordering::Relaxed) {
183            otel_debug!(
184                name: "MeterProvider.NoOpMeterReturned",
185                meter_name = scope.name(),
186            );
187            return Meter::new(Arc::new(NoopMeter::new()));
188        }
189
190        if scope.name().is_empty() {
191            otel_info!(name: "MeterNameEmpty", message = "Meter name is empty; consider providing a meaningful name. Meter will function normally and the provided name will be used as-is.");
192        };
193
194        if let Ok(mut meters) = self.inner.meters.lock() {
195            if let Some(existing_meter) = meters.get(&scope) {
196                otel_debug!(
197                    name: "MeterProvider.ExistingMeterReturned",
198                    meter_name = scope.name(),
199                );
200                Meter::new(existing_meter.clone())
201            } else {
202                let new_meter = Arc::new(SdkMeter::new(scope.clone(), self.inner.pipes.clone()));
203                meters.insert(scope.clone(), new_meter.clone());
204                otel_debug!(
205                    name: "MeterProvider.NewMeterCreated",
206                    meter_name = scope.name(),
207                );
208                Meter::new(new_meter)
209            }
210        } else {
211            otel_debug!(
212                name: "MeterProvider.NoOpMeterReturned",
213                meter_name = scope.name(),
214            );
215            Meter::new(Arc::new(NoopMeter::new()))
216        }
217    }
218}
219
/// Configuration options for a [MeterProvider].
#[derive(Default)]
pub struct MeterProviderBuilder {
    // Resource supplied through `with_resource`; `None` means `build` falls
    // back to `Resource::builder().build()`.
    resource: Option<Resource>,
    // Readers added through `with_reader` / `with_periodic_exporter`.
    readers: Vec<Box<dyn MetricReader>>,
    // Views added through `with_view`.
    views: Vec<Arc<dyn View>>,
}
227
228impl MeterProviderBuilder {
229    /// Associates a [Resource] with a [MeterProvider].
230    ///
231    /// This [Resource] represents the entity producing telemetry and is associated
232    /// with all [Meter]s the [MeterProvider] will create.
233    ///
234    /// By default, if this option is not used, the default [Resource] will be used.
235    ///
236    /// [Meter]: opentelemetry::metrics::Meter
237    pub fn with_resource(mut self, resource: Resource) -> Self {
238        self.resource = Some(resource);
239        self
240    }
241
242    /// Associates a [MetricReader] with a [MeterProvider].
243    /// [`MeterProviderBuilder::with_periodic_exporter()] can be used to add a PeriodicReader which is
244    /// the most common use case.
245    ///
246    /// A [MeterProvider] will export no metrics without [MetricReader]
247    /// added.
248    pub fn with_reader<T: MetricReader>(mut self, reader: T) -> Self {
249        self.readers.push(Box::new(reader));
250        self
251    }
252
253    /// Adds a [`PushMetricExporter`] to the [`MeterProvider`] and configures it
254    /// to export metrics at **fixed** intervals (60 seconds) using a
255    /// [`PeriodicReader`].
256    ///
257    /// To customize the export interval, set the
258    /// **"OTEL_METRIC_EXPORT_INTERVAL"** environment variable (in
259    /// milliseconds).
260    ///
261    /// Most users should use this method to attach an exporter. Advanced users
262    /// who need finer control over the export process can use
263    /// [`crate::metrics::PeriodicReaderBuilder`] to configure a custom reader and attach it
264    /// using [`MeterProviderBuilder::with_reader()`].
265    pub fn with_periodic_exporter<T>(mut self, exporter: T) -> Self
266    where
267        T: PushMetricExporter,
268    {
269        let reader = PeriodicReader::builder(exporter).build();
270        self.readers.push(Box::new(reader));
271        self
272    }
273
274    #[cfg(feature = "spec_unstable_metrics_views")]
275    /// Associates a [View] with a [MeterProvider].
276    ///
277    /// [View]s are appended to existing ones in a [MeterProvider] if this option is
278    /// used multiple times.
279    ///
280    /// By default, if this option is not used, the [MeterProvider] will use the
281    /// default view.
282    pub fn with_view<T: View>(mut self, view: T) -> Self {
283        self.views.push(Arc::new(view));
284        self
285    }
286
287    /// Construct a new [MeterProvider] with this configuration.
288    pub fn build(self) -> SdkMeterProvider {
289        otel_debug!(
290            name: "MeterProvider.Building",
291            builder = format!("{:?}", &self),
292        );
293
294        let meter_provider = SdkMeterProvider {
295            inner: Arc::new(SdkMeterProviderInner {
296                pipes: Arc::new(Pipelines::new(
297                    self.resource.unwrap_or(Resource::builder().build()),
298                    self.readers,
299                    self.views,
300                )),
301                meters: Default::default(),
302                shutdown_invoked: AtomicBool::new(false),
303            }),
304        };
305
306        otel_info!(
307            name: "MeterProvider.Built",
308        );
309        meter_provider
310    }
311}
312
313impl fmt::Debug for MeterProviderBuilder {
314    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
315        f.debug_struct("MeterProviderBuilder")
316            .field("resource", &self.resource)
317            .field("readers", &self.readers)
318            .field("views", &self.views.len())
319            .finish()
320    }
321}
#[cfg(all(test, feature = "testing"))]
mod tests {
    use super::SdkMeterProvider;
    use crate::error::OTelSdkError;
    use crate::resource::{
        SERVICE_NAME, TELEMETRY_SDK_LANGUAGE, TELEMETRY_SDK_NAME, TELEMETRY_SDK_VERSION,
    };
    use crate::testing::metrics::metric_reader::TestMetricReader;
    use crate::Resource;
    use opentelemetry::metrics::MeterProvider;
    use opentelemetry::{global, InstrumentationScope};
    use opentelemetry::{Key, KeyValue, Value};
    use std::env;

    /// Resource attached to the provider's first pipeline. The provider must
    /// have been built with at least one reader.
    fn first_pipeline_resource(provider: &SdkMeterProvider) -> &Resource {
        &provider.inner.pipes.0[0].resource
    }

    /// Asserts that `resource_key` maps to `expect` in the provider's resource.
    fn assert_resource(
        provider: &SdkMeterProvider,
        resource_key: &'static str,
        expect: Option<&'static str>,
    ) {
        let actual = first_pipeline_resource(provider)
            .get(&Key::from_static_str(resource_key))
            .map(|v| v.to_string());
        assert_eq!(actual, expect.map(|s| s.to_string()));
    }

    /// Asserts that the telemetry SDK attributes are present with the
    /// expected values.
    fn assert_telemetry_resource(provider: &SdkMeterProvider) {
        let resource = first_pipeline_resource(provider);
        assert_eq!(
            resource.get(&TELEMETRY_SDK_LANGUAGE.into()),
            Some(Value::from("rust"))
        );
        assert_eq!(
            resource.get(&TELEMETRY_SDK_NAME.into()),
            Some(Value::from("opentelemetry"))
        );
        assert_eq!(
            resource.get(&TELEMETRY_SDK_VERSION.into()),
            Some(Value::from(env!("CARGO_PKG_VERSION")))
        );
    }

    #[test]
    fn test_meter_provider_resource() {
        // No user resource and no env var: the default resource is used.
        temp_env::with_var_unset("OTEL_RESOURCE_ATTRIBUTES", || {
            let default_meter_provider = SdkMeterProvider::builder()
                .with_reader(TestMetricReader::new())
                .build();
            assert_resource(
                &default_meter_provider,
                SERVICE_NAME,
                Some("unknown_service"),
            );
            assert_telemetry_resource(&default_meter_provider);
        });

        // A user-provided resource is used as given.
        let custom_resource = Resource::builder_empty()
            .with_service_name("test_service")
            .build();
        let custom_meter_provider = SdkMeterProvider::builder()
            .with_reader(TestMetricReader::new())
            .with_resource(custom_resource)
            .build();
        assert_resource(&custom_meter_provider, SERVICE_NAME, Some("test_service"));
        assert_eq!(first_pipeline_resource(&custom_meter_provider).len(), 1);

        // `OTEL_RESOURCE_ATTRIBUTES` is picked up automatically when set.
        temp_env::with_var(
            "OTEL_RESOURCE_ATTRIBUTES",
            Some("key1=value1, k2, k3=value2"),
            || {
                let env_resource_provider = SdkMeterProvider::builder()
                    .with_reader(TestMetricReader::new())
                    .build();
                assert_resource(
                    &env_resource_provider,
                    SERVICE_NAME,
                    Some("unknown_service"),
                );
                assert_resource(&env_resource_provider, "key1", Some("value1"));
                assert_resource(&env_resource_provider, "k3", Some("value2"));
                assert_telemetry_resource(&env_resource_provider);
                assert_eq!(first_pipeline_resource(&env_resource_provider).len(), 6);
            },
        );

        // `OTEL_RESOURCE_ATTRIBUTES` is set and the user also provides
        // attributes, one of which uses the same key as the env var.
        temp_env::with_var(
            "OTEL_RESOURCE_ATTRIBUTES",
            Some("my-custom-key=env-val,k2=value2"),
            || {
                let user_resource = Resource::builder()
                    .with_attributes([
                        KeyValue::new("my-custom-key", "my-custom-value"),
                        KeyValue::new("my-custom-key2", "my-custom-value2"),
                    ])
                    .build();
                let user_provided_resource_config_provider = SdkMeterProvider::builder()
                    .with_reader(TestMetricReader::new())
                    .with_resource(user_resource)
                    .build();

                let provider = &user_provided_resource_config_provider;
                assert_resource(provider, SERVICE_NAME, Some("unknown_service"));
                assert_resource(provider, "my-custom-key", Some("my-custom-value"));
                assert_resource(provider, "my-custom-key2", Some("my-custom-value2"));
                assert_resource(provider, "k2", Some("value2"));
                assert_telemetry_resource(provider);
                assert_eq!(first_pipeline_resource(provider).len(), 7);
            },
        );

        // An explicitly empty user resource is used as-is: no attributes are
        // added to it.
        let no_service_name = SdkMeterProvider::builder()
            .with_reader(TestMetricReader::new())
            .with_resource(Resource::empty())
            .build();

        assert_eq!(first_pipeline_resource(&no_service_name).len(), 0)
    }

    #[test]
    fn test_meter_provider_shutdown() {
        let reader = TestMetricReader::new();
        let provider = SdkMeterProvider::builder()
            .with_reader(reader.clone())
            .build();
        global::set_meter_provider(provider.clone());
        assert!(!reader.is_shutdown());

        // Create a meter and an instrument through the global provider.
        let meter = global::meter("test");
        let counter = meter.u64_counter("test_counter").build();

        // The meter does not have to be dropped for shutdown to succeed.
        let first_attempt = provider.shutdown();
        assert!(first_attempt.is_ok());

        // A second shutdown must report that it already happened.
        let second_attempt = provider.shutdown();
        assert!(matches!(second_attempt, Err(OTelSdkError::AlreadyShutdown)));

        assert!(second_attempt.is_err());
        assert!(reader.is_shutdown());
        // TODO Fix: the instrument is still available, and can be used.
        // While the reader is shutdown, and no collect is happening
        counter.add(1, &[]);
    }

    #[test]
    fn test_shutdown_invoked_on_last_drop() {
        let reader = TestMetricReader::new();
        let provider = SdkMeterProvider::builder()
            .with_reader(reader.clone())
            .build();
        let clone1 = provider.clone();
        let clone2 = provider.clone();

        // Nothing has been dropped yet, so no shutdown.
        assert!(!reader.is_shutdown());

        // Dropping clones must not shut down while the original is alive.
        for handle in [clone1, clone2] {
            drop(handle);
            assert!(!reader.is_shutdown());
        }

        // Dropping the last handle triggers shutdown.
        drop(provider);
        assert!(reader.is_shutdown());
    }

    #[test]
    fn same_meter_reused_same_scope() {
        /// Scope with a fixed version and schema URL and the given name.
        fn make_scope(name: &'static str) -> InstrumentationScope {
            InstrumentationScope::builder(name)
                .with_version("1.0.0")
                .with_schema_url("http://example.com")
                .build()
        }

        let provider = SdkMeterProvider::builder().build();
        let registered = || provider.inner.meters.lock().unwrap().len();

        let _meter1 = provider.meter("test");
        let _meter2 = provider.meter("test");
        assert_eq!(registered(), 1);

        // Same name as above, but with version and schema URL set.
        let scope = make_scope("test");

        let _meter3 = provider.meter_with_scope(scope.clone());
        let _meter4 = provider.meter_with_scope(scope.clone());
        let _meter5 = provider.meter_with_scope(scope);
        assert_eq!(registered(), 2);

        // these are different meters because meter names are case sensitive
        let _case_variants: Vec<_> = ["ABC", "Abc", "abc"]
            .into_iter()
            .map(|name| provider.meter_with_scope(make_scope(name)))
            .collect();

        assert_eq!(registered(), 5);
    }
}