opentelemetry_sdk/metrics/
meter_provider.rs1use core::fmt;
2use std::{
3 collections::HashMap,
4 sync::{
5 atomic::{AtomicBool, Ordering},
6 Arc, Mutex,
7 },
8};
9
10use opentelemetry::{
11 metrics::{Meter, MeterProvider},
12 otel_debug, otel_error, otel_info, InstrumentationScope,
13};
14
15use crate::error::OTelSdkResult;
16use crate::Resource;
17
18use super::{
19 exporter::PushMetricExporter, meter::SdkMeter, noop::NoopMeter, pipeline::Pipelines,
20 reader::MetricReader, view::View, PeriodicReader,
21};
22
/// Handle to the SDK's [`MeterProvider`] implementation.
///
/// The handle is a thin wrapper around an [`Arc`], so cloning it is cheap and
/// every clone shares the same pipelines, meter cache and shutdown flag.
/// When the last clone goes away the shared state is dropped, and its `Drop`
/// impl initiates a shutdown unless one was already requested.
#[derive(Clone, Debug)]
pub struct SdkMeterProvider {
    /// State shared by all clones of this provider.
    inner: Arc<SdkMeterProviderInner>,
}
37
/// Shared state behind every clone of [`SdkMeterProvider`].
#[derive(Debug)]
struct SdkMeterProviderInner {
    /// Pipelines assembled by the builder from the resource, readers and
    /// views; a clone of this `Arc` is handed to every `SdkMeter` created here.
    pipes: Arc<Pipelines>,
    /// Meters already created, keyed by instrumentation scope, so that asking
    /// for the same scope again returns the same `SdkMeter`.
    meters: Mutex<HashMap<InstrumentationScope, Arc<SdkMeter>>>,
    /// Set to `true` by the first `shutdown` call. Once set, `force_flush` and
    /// further `shutdown` calls fail and new meters are no-ops.
    shutdown_invoked: AtomicBool,
}
44
45impl Default for SdkMeterProvider {
46 fn default() -> Self {
47 SdkMeterProvider::builder().build()
48 }
49}
50
51impl SdkMeterProvider {
52 pub fn builder() -> MeterProviderBuilder {
54 MeterProviderBuilder::default()
55 }
56
57 pub fn force_flush(&self) -> OTelSdkResult {
97 self.inner.force_flush()
98 }
99
100 pub fn shutdown(&self) -> OTelSdkResult {
113 otel_info!(
114 name: "MeterProvider.Shutdown",
115 message = "User initiated shutdown of MeterProvider."
116 );
117 self.inner.shutdown()
118 }
119}
120
121impl SdkMeterProviderInner {
122 fn force_flush(&self) -> OTelSdkResult {
123 if self
124 .shutdown_invoked
125 .load(std::sync::atomic::Ordering::Relaxed)
126 {
127 Err(crate::error::OTelSdkError::AlreadyShutdown)
128 } else {
129 self.pipes.force_flush()
130 }
131 }
132
133 fn shutdown(&self) -> OTelSdkResult {
134 if self
135 .shutdown_invoked
136 .swap(true, std::sync::atomic::Ordering::SeqCst)
137 {
138 Err(crate::error::OTelSdkError::AlreadyShutdown)
140 } else {
141 self.pipes.shutdown()
142 }
143 }
144}
145
146impl Drop for SdkMeterProviderInner {
147 fn drop(&mut self) {
148 if self.shutdown_invoked.load(Ordering::Relaxed) {
151 otel_debug!(
152 name: "MeterProvider.Drop.AlreadyShutdown",
153 message = "MeterProvider was already shut down; drop will not attempt shutdown again."
154 );
155 } else {
156 otel_info!(
157 name: "MeterProvider.Drop",
158 message = "Last reference of MeterProvider dropped, initiating shutdown."
159 );
160 if let Err(err) = self.shutdown() {
161 otel_error!(
162 name: "MeterProvider.Drop.ShutdownFailed",
163 message = "Shutdown attempt failed during drop of MeterProvider.",
164 reason = format!("{}", err)
165 );
166 } else {
167 otel_info!(
168 name: "MeterProvider.Drop.ShutdownCompleted",
169 );
170 }
171 }
172 }
173}
174
175impl MeterProvider for SdkMeterProvider {
176 fn meter(&self, name: &'static str) -> Meter {
177 let scope = InstrumentationScope::builder(name).build();
178 self.meter_with_scope(scope)
179 }
180
181 fn meter_with_scope(&self, scope: InstrumentationScope) -> Meter {
182 if self.inner.shutdown_invoked.load(Ordering::Relaxed) {
183 otel_debug!(
184 name: "MeterProvider.NoOpMeterReturned",
185 meter_name = scope.name(),
186 );
187 return Meter::new(Arc::new(NoopMeter::new()));
188 }
189
190 if scope.name().is_empty() {
191 otel_info!(name: "MeterNameEmpty", message = "Meter name is empty; consider providing a meaningful name. Meter will function normally and the provided name will be used as-is.");
192 };
193
194 if let Ok(mut meters) = self.inner.meters.lock() {
195 if let Some(existing_meter) = meters.get(&scope) {
196 otel_debug!(
197 name: "MeterProvider.ExistingMeterReturned",
198 meter_name = scope.name(),
199 );
200 Meter::new(existing_meter.clone())
201 } else {
202 let new_meter = Arc::new(SdkMeter::new(scope.clone(), self.inner.pipes.clone()));
203 meters.insert(scope.clone(), new_meter.clone());
204 otel_debug!(
205 name: "MeterProvider.NewMeterCreated",
206 meter_name = scope.name(),
207 );
208 Meter::new(new_meter)
209 }
210 } else {
211 otel_debug!(
212 name: "MeterProvider.NoOpMeterReturned",
213 meter_name = scope.name(),
214 );
215 Meter::new(Arc::new(NoopMeter::new()))
216 }
217 }
218}
219
/// Collects the configuration for an [`SdkMeterProvider`].
///
/// Obtained from `SdkMeterProvider::builder()` and consumed by `build`.
#[derive(Default)]
pub struct MeterProviderBuilder {
    /// Resource set through `with_resource`; when `None`, `build` falls back
    /// to `Resource::builder().build()`.
    resource: Option<Resource>,
    /// Readers registered through `with_reader` / `with_periodic_exporter`,
    /// in registration order.
    readers: Vec<Box<dyn MetricReader>>,
    /// Views registered through `with_view`, in registration order.
    views: Vec<Arc<dyn View>>,
}
227
228impl MeterProviderBuilder {
229 pub fn with_resource(mut self, resource: Resource) -> Self {
238 self.resource = Some(resource);
239 self
240 }
241
242 pub fn with_reader<T: MetricReader>(mut self, reader: T) -> Self {
249 self.readers.push(Box::new(reader));
250 self
251 }
252
253 pub fn with_periodic_exporter<T>(mut self, exporter: T) -> Self
266 where
267 T: PushMetricExporter,
268 {
269 let reader = PeriodicReader::builder(exporter).build();
270 self.readers.push(Box::new(reader));
271 self
272 }
273
274 #[cfg(feature = "spec_unstable_metrics_views")]
275 pub fn with_view<T: View>(mut self, view: T) -> Self {
283 self.views.push(Arc::new(view));
284 self
285 }
286
287 pub fn build(self) -> SdkMeterProvider {
289 otel_debug!(
290 name: "MeterProvider.Building",
291 builder = format!("{:?}", &self),
292 );
293
294 let meter_provider = SdkMeterProvider {
295 inner: Arc::new(SdkMeterProviderInner {
296 pipes: Arc::new(Pipelines::new(
297 self.resource.unwrap_or(Resource::builder().build()),
298 self.readers,
299 self.views,
300 )),
301 meters: Default::default(),
302 shutdown_invoked: AtomicBool::new(false),
303 }),
304 };
305
306 otel_info!(
307 name: "MeterProvider.Built",
308 );
309 meter_provider
310 }
311}
312
313impl fmt::Debug for MeterProviderBuilder {
314 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
315 f.debug_struct("MeterProviderBuilder")
316 .field("resource", &self.resource)
317 .field("readers", &self.readers)
318 .field("views", &self.views.len())
319 .finish()
320 }
321}
// Unit tests for the meter provider; compiled only for test builds with the
// `testing` feature, which provides `TestMetricReader`.
#[cfg(all(test, feature = "testing"))]
mod tests {
    use crate::error::OTelSdkError;
    use crate::resource::{
        SERVICE_NAME, TELEMETRY_SDK_LANGUAGE, TELEMETRY_SDK_NAME, TELEMETRY_SDK_VERSION,
    };
    use crate::testing::metrics::metric_reader::TestMetricReader;
    use crate::Resource;
    use opentelemetry::metrics::MeterProvider;
    use opentelemetry::{global, InstrumentationScope};
    use opentelemetry::{Key, KeyValue, Value};
    use std::env;

    /// Checks which resource ends up on the first pipeline for different
    /// combinations of builder configuration and `OTEL_RESOURCE_ATTRIBUTES`.
    #[test]
    fn test_meter_provider_resource() {
        // Asserts that the first pipeline's resource maps `resource_key` to
        // `expect` (or has no such key when `expect` is `None`).
        let assert_resource = |provider: &super::SdkMeterProvider,
                               resource_key: &'static str,
                               expect: Option<&'static str>| {
            assert_eq!(
                provider.inner.pipes.0[0]
                    .resource
                    .get(&Key::from_static_str(resource_key))
                    .map(|v| v.to_string()),
                expect.map(|s| s.to_string())
            );
        };
        // Asserts that the three `telemetry.sdk.*` attributes are present with
        // the values expected for this crate.
        let assert_telemetry_resource = |provider: &super::SdkMeterProvider| {
            assert_eq!(
                provider.inner.pipes.0[0]
                    .resource
                    .get(&TELEMETRY_SDK_LANGUAGE.into()),
                Some(Value::from("rust"))
            );
            assert_eq!(
                provider.inner.pipes.0[0]
                    .resource
                    .get(&TELEMETRY_SDK_NAME.into()),
                Some(Value::from("opentelemetry"))
            );
            assert_eq!(
                provider.inner.pipes.0[0]
                    .resource
                    .get(&TELEMETRY_SDK_VERSION.into()),
                Some(Value::from(env!("CARGO_PKG_VERSION")))
            );
        };

        // Case 1: no resource configured and no env attributes -> the default
        // resource with `unknown_service` and the telemetry SDK attributes.
        temp_env::with_var_unset("OTEL_RESOURCE_ATTRIBUTES", || {
            let reader = TestMetricReader::new();
            let default_meter_provider = super::SdkMeterProvider::builder()
                .with_reader(reader)
                .build();
            assert_resource(
                &default_meter_provider,
                SERVICE_NAME,
                Some("unknown_service"),
            );
            assert_telemetry_resource(&default_meter_provider);
        });

        // Case 2: a resource built from an empty builder contains exactly the
        // one attribute that was set (the service name).
        let reader2 = TestMetricReader::new();
        let custom_meter_provider = super::SdkMeterProvider::builder()
            .with_reader(reader2)
            .with_resource(
                Resource::builder_empty()
                    .with_service_name("test_service")
                    .build(),
            )
            .build();
        assert_resource(&custom_meter_provider, SERVICE_NAME, Some("test_service"));
        assert_eq!(custom_meter_provider.inner.pipes.0[0].resource.len(), 1);

        // Case 3: no resource configured, attributes supplied via the
        // environment. Expected total is 6: service name, three telemetry SDK
        // attributes, `key1` and `k3` -- which leaves no room for the
        // value-less `k2` entry.
        temp_env::with_var(
            "OTEL_RESOURCE_ATTRIBUTES",
            Some("key1=value1, k2, k3=value2"),
            || {
                let reader3 = TestMetricReader::new();
                let env_resource_provider = super::SdkMeterProvider::builder()
                    .with_reader(reader3)
                    .build();
                assert_resource(
                    &env_resource_provider,
                    SERVICE_NAME,
                    Some("unknown_service"),
                );
                assert_resource(&env_resource_provider, "key1", Some("value1"));
                assert_resource(&env_resource_provider, "k3", Some("value2"));
                assert_telemetry_resource(&env_resource_provider);
                assert_eq!(env_resource_provider.inner.pipes.0[0].resource.len(), 6);
            },
        );

        // Case 4: user attributes combined with env attributes. The user's
        // value for `my-custom-key` is expected to win over the env value,
        // while `k2` from the environment is still picked up. Expected total
        // is 7: service name, three telemetry SDK attributes, the two custom
        // keys and `k2`.
        temp_env::with_var(
            "OTEL_RESOURCE_ATTRIBUTES",
            Some("my-custom-key=env-val,k2=value2"),
            || {
                let reader4 = TestMetricReader::new();
                let user_provided_resource_config_provider = super::SdkMeterProvider::builder()
                    .with_reader(reader4)
                    .with_resource(
                        Resource::builder()
                            .with_attributes([
                                KeyValue::new("my-custom-key", "my-custom-value"),
                                KeyValue::new("my-custom-key2", "my-custom-value2"),
                            ])
                            .build(),
                    )
                    .build();
                assert_resource(
                    &user_provided_resource_config_provider,
                    SERVICE_NAME,
                    Some("unknown_service"),
                );
                assert_resource(
                    &user_provided_resource_config_provider,
                    "my-custom-key",
                    Some("my-custom-value"),
                );
                assert_resource(
                    &user_provided_resource_config_provider,
                    "my-custom-key2",
                    Some("my-custom-value2"),
                );
                assert_resource(
                    &user_provided_resource_config_provider,
                    "k2",
                    Some("value2"),
                );
                assert_telemetry_resource(&user_provided_resource_config_provider);
                assert_eq!(
                    user_provided_resource_config_provider.inner.pipes.0[0]
                        .resource
                        .len(),
                    7
                );
            },
        );

        // Case 5: an explicitly empty resource stays empty; no defaults are
        // added on top of it.
        let reader5 = TestMetricReader::new();
        let no_service_name = super::SdkMeterProvider::builder()
            .with_reader(reader5)
            .with_resource(Resource::empty())
            .build();

        assert_eq!(no_service_name.inner.pipes.0[0].resource.len(), 0)
    }

    /// Shutdown succeeds once, reports `AlreadyShutdown` afterwards, and
    /// reaches the registered reader.
    #[test]
    fn test_meter_provider_shutdown() {
        let reader = TestMetricReader::new();
        let provider = super::SdkMeterProvider::builder()
            .with_reader(reader.clone())
            .build();
        // Install a clone globally so the instrument below is created through
        // the global API rather than directly on `provider`.
        global::set_meter_provider(provider.clone());
        assert!(!reader.is_shutdown());
        let meter = global::meter("test");
        let counter = meter.u64_counter("test_counter").build();
        // First shutdown must succeed.
        let shutdown_res = provider.shutdown();
        assert!(shutdown_res.is_ok());

        // Second shutdown must be rejected with `AlreadyShutdown`.
        let shutdown_res = provider.shutdown();
        assert!(matches!(shutdown_res, Err(OTelSdkError::AlreadyShutdown)));

        assert!(shutdown_res.is_err());
        assert!(reader.is_shutdown());
        // No assertion follows: recording on an instrument after shutdown only
        // has to return without panicking.
        counter.add(1, &[]);
    }
    /// The reader is shut down only when the last clone of the provider is
    /// dropped, not when earlier clones go away.
    #[test]
    fn test_shutdown_invoked_on_last_drop() {
        let reader = TestMetricReader::new();
        let provider = super::SdkMeterProvider::builder()
            .with_reader(reader.clone())
            .build();
        let clone1 = provider.clone();
        let clone2 = provider.clone();

        // Nothing dropped yet.
        assert!(!reader.is_shutdown());

        // Dropping clones while other handles remain must not shut down.
        drop(clone1);
        assert!(!reader.is_shutdown());

        drop(clone2);
        assert!(!reader.is_shutdown());

        // Dropping the final handle triggers the shutdown.
        drop(provider);
        assert!(reader.is_shutdown());
    }

    /// Meters are cached per instrumentation scope: identical scopes share one
    /// entry, and scopes differing in any compared part get their own.
    #[test]
    fn same_meter_reused_same_scope() {
        let provider = super::SdkMeterProvider::builder().build();
        // Same name twice -> a single cached meter.
        let _meter1 = provider.meter("test");
        let _meter2 = provider.meter("test");
        assert_eq!(provider.inner.meters.lock().unwrap().len(), 1);

        // Same name but with version and schema URL -> a different scope, so
        // one additional cached meter regardless of how often it is requested.
        let scope = InstrumentationScope::builder("test")
            .with_version("1.0.0")
            .with_schema_url("http://example.com")
            .build();

        let _meter3 = provider.meter_with_scope(scope.clone());
        let _meter4 = provider.meter_with_scope(scope.clone());
        let _meter5 = provider.meter_with_scope(scope);
        assert_eq!(provider.inner.meters.lock().unwrap().len(), 2);

        // Builds scopes that differ only in the name passed in.
        let make_scope = |name| {
            InstrumentationScope::builder(name)
                .with_version("1.0.0")
                .with_schema_url("http://example.com")
                .build()
        };

        // Names differing only by letter case are distinct scopes: three more
        // cached meters, five in total.
        let _meter6 = provider.meter_with_scope(make_scope("ABC"));
        let _meter7 = provider.meter_with_scope(make_scope("Abc"));
        let _meter8 = provider.meter_with_scope(make_scope("abc"));

        assert_eq!(provider.inner.meters.lock().unwrap().len(), 5);
    }
}