1use core::fmt;
2use std::{
3 borrow::Cow,
4 collections::{HashMap, HashSet},
5 sync::{Arc, Mutex},
6};
7
8use opentelemetry::{otel_debug, InstrumentationScope, KeyValue};
9
10use crate::{
11 error::{OTelSdkError, OTelSdkResult},
12 metrics::{
13 aggregation,
14 data::{Metric, ResourceMetrics, ScopeMetrics},
15 instrument::{Instrument, InstrumentId, InstrumentKind, Stream},
16 internal::{self, AggregateBuilder, Number},
17 reader::{MetricReader, SdkProducer},
18 view::View,
19 MetricError, MetricResult,
20 },
21 Resource,
22};
23
24use self::internal::AggregateFns;
25
26use super::{Aggregation, Temporality};
27
/// Ties one metric reader to the instruments and callbacks registered for it.
///
/// A `Pipeline` is what the reader calls back into (through `SdkProducer`) to
/// collect the current aggregated metrics.
#[doc(hidden)]
pub struct Pipeline {
    /// Resource cloned into every `ResourceMetrics` filled by `produce`.
    pub(crate) resource: Resource,
    /// Reader this pipeline serves; flush/shutdown are delegated to it and its
    /// temporality is queried when aggregators are built.
    reader: Box<dyn MetricReader>,
    /// Views consulted when an instrument is resolved into metric streams.
    views: Vec<Arc<dyn View>>,
    /// Registered aggregations and callbacks, guarded by a mutex.
    inner: Mutex<PipelineInner>,
}
43
44impl fmt::Debug for Pipeline {
45 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
46 f.write_str("Pipeline")
47 }
48}
49
/// Shareable, thread-safe callback taking no arguments and returning nothing;
/// every registered callback is invoked at the start of `Pipeline::produce`.
type GenericCallback = Arc<dyn Fn() + Send + Sync>;
52
/// Mutable state of a `Pipeline`, kept behind the pipeline's mutex.
#[derive(Default)]
struct PipelineInner {
    /// Registered instrument aggregations, grouped by instrumentation scope.
    aggregations: HashMap<InstrumentationScope, Vec<InstrumentSync>>,
    /// Callbacks run on every collection before aggregations are read.
    callbacks: Vec<GenericCallback>,
}
58
59impl fmt::Debug for PipelineInner {
60 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
61 f.debug_struct("PipelineInner")
62 .field("aggregations", &self.aggregations)
63 .field("callbacks", &self.callbacks.len())
64 .finish()
65 }
66}
67
68impl Pipeline {
69 fn add_sync(&self, scope: InstrumentationScope, i_sync: InstrumentSync) {
75 let _ = self.inner.lock().map(|mut inner| {
76 otel_debug!(
77 name : "InstrumentCreated",
78 instrument_name = i_sync.name.as_ref(),
79 );
80 inner.aggregations.entry(scope).or_default().push(i_sync);
81 });
82 }
83
84 fn add_callback(&self, callback: GenericCallback) {
86 let _ = self
87 .inner
88 .lock()
89 .map(|mut inner| inner.callbacks.push(callback));
90 }
91
92 fn force_flush(&self) -> OTelSdkResult {
94 self.reader.force_flush()
95 }
96
97 fn shutdown(&self) -> OTelSdkResult {
99 self.reader.shutdown()
100 }
101}
102
103impl SdkProducer for Pipeline {
104 fn produce(&self, rm: &mut ResourceMetrics) -> MetricResult<()> {
106 let inner = self.inner.lock()?;
107 otel_debug!(
108 name: "MeterProviderInvokingObservableCallbacks",
109 count = inner.callbacks.len(),
110 );
111 for cb in &inner.callbacks {
112 cb();
114 }
115
116 rm.resource = self.resource.clone();
117 if inner.aggregations.len() > rm.scope_metrics.len() {
118 rm.scope_metrics
119 .reserve(inner.aggregations.len() - rm.scope_metrics.len());
120 }
121
122 let mut i = 0;
123 for (scope, instruments) in inner.aggregations.iter() {
124 let sm = match rm.scope_metrics.get_mut(i) {
125 Some(sm) => sm,
126 None => {
127 rm.scope_metrics.push(ScopeMetrics::default());
128 rm.scope_metrics.last_mut().unwrap()
129 }
130 };
131 if instruments.len() > sm.metrics.len() {
132 sm.metrics.reserve(instruments.len() - sm.metrics.len());
133 }
134
135 let mut j = 0;
136 for inst in instruments {
137 let mut m = sm.metrics.get_mut(j);
138 match (inst.comp_agg.call(m.as_mut().map(|m| m.data.as_mut())), m) {
139 ((len, Some(initial_agg)), None) if len > 0 => sm.metrics.push(Metric {
141 name: inst.name.clone(),
142 description: inst.description.clone(),
143 unit: inst.unit.clone(),
144 data: initial_agg,
145 }),
146 ((len, data), Some(prev_agg)) if len > 0 => {
148 if let Some(data) = data {
149 prev_agg.data = data;
151 }
152 prev_agg.name.clone_from(&inst.name);
153 prev_agg.description.clone_from(&inst.description);
154 prev_agg.unit.clone_from(&inst.unit);
155 }
156 _ => continue,
157 }
158
159 j += 1;
160 }
161
162 sm.metrics.truncate(j);
163 if !sm.metrics.is_empty() {
164 sm.scope = scope.clone();
165 i += 1;
166 }
167 }
168
169 rm.scope_metrics.truncate(i);
170
171 Ok(())
172 }
173}
174
/// A registered metric stream: its descriptive metadata plus the function
/// used to compute its aggregated data at collection time.
struct InstrumentSync {
    /// Name copied into the produced `Metric`.
    name: Cow<'static, str>,
    /// Description copied into the produced `Metric`.
    description: Cow<'static, str>,
    /// Unit copied into the produced `Metric`.
    unit: Cow<'static, str>,
    /// Computes the aggregation; called once per collection in `produce`.
    comp_agg: Arc<dyn internal::ComputeAggregation>,
}
182
183impl fmt::Debug for InstrumentSync {
184 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
185 f.debug_struct("InstrumentSync")
186 .field("name", &self.name)
187 .field("description", &self.description)
188 .field("unit", &self.unit)
189 .finish()
190 }
191}
192
/// Mutex-guarded map from instrument id to the stored outcome of building its
/// aggregator: `Ok(Some(measure))`, `Ok(None)` when no measure was produced
/// (e.g. a drop aggregation), or the construction error.
type Cache<T> = Mutex<HashMap<InstrumentId, MetricResult<Option<Arc<dyn internal::Measure<T>>>>>>;
194
/// Resolves instruments into measures for a single `Pipeline`, creating and
/// caching aggregators on demand.
struct Inserter<T> {
    /// Cache of aggregator construction results, keyed by normalized
    /// instrument id, so each stream is built (and registered) only once.
    aggregators: Cache<T>,

    /// Shared map looked up by lower-cased instrument name when checking for
    /// conflicting stream definitions.
    views: Arc<Mutex<HashMap<Cow<'static, str>, InstrumentId>>>,

    /// Pipeline whose views are matched and into which new aggregations are
    /// registered.
    pipeline: Arc<Pipeline>,
}
216
217impl<T> Inserter<T>
218where
219 T: Number,
220{
221 fn new(p: Arc<Pipeline>, vc: Arc<Mutex<HashMap<Cow<'static, str>, InstrumentId>>>) -> Self {
222 Inserter {
223 aggregators: Default::default(),
224 views: vc,
225 pipeline: Arc::clone(&p),
226 }
227 }
228
229 fn instrument(
253 &self,
254 inst: Instrument,
255 boundaries: Option<&[f64]>,
256 ) -> MetricResult<Vec<Arc<dyn internal::Measure<T>>>> {
257 let mut matched = false;
258 let mut measures = vec![];
259 let mut errs = vec![];
260 let kind = match inst.kind {
261 Some(kind) => kind,
262 None => return Err(MetricError::Other("instrument must have a kind".into())),
263 };
264
265 let mut seen = HashSet::new();
267 for v in &self.pipeline.views {
268 let stream = match v.match_inst(&inst) {
269 Some(stream) => stream,
270 None => continue,
271 };
272 matched = true;
273
274 let id = self.inst_id(kind, &stream);
275 if seen.contains(&id) {
276 continue; }
278
279 let agg = match self.cached_aggregator(&inst.scope, kind, stream) {
280 Ok(Some(agg)) => agg,
281 Ok(None) => continue, Err(err) => {
283 errs.push(err);
284 continue;
285 }
286 };
287 seen.insert(id);
288 measures.push(agg);
289 }
290
291 if matched {
292 if errs.is_empty() {
293 return Ok(measures);
294 } else {
295 return Err(MetricError::Other(format!("{errs:?}")));
296 }
297 }
298
299 let mut stream = Stream {
301 name: inst.name,
302 description: inst.description,
303 unit: inst.unit,
304 aggregation: None,
305 allowed_attribute_keys: None,
306 };
307
308 if let Some(boundaries) = boundaries {
310 stream.aggregation = Some(Aggregation::ExplicitBucketHistogram {
311 boundaries: boundaries.to_vec(),
312 record_min_max: true,
313 });
314 }
315
316 match self.cached_aggregator(&inst.scope, kind, stream) {
317 Ok(agg) => {
318 if errs.is_empty() {
319 if let Some(agg) = agg {
320 measures.push(agg);
321 }
322 Ok(measures)
323 } else {
324 Err(MetricError::Other(format!("{errs:?}")))
325 }
326 }
327 Err(err) => {
328 errs.push(err);
329 Err(MetricError::Other(format!("{errs:?}")))
330 }
331 }
332 }
333
334 fn cached_aggregator(
348 &self,
349 scope: &InstrumentationScope,
350 kind: InstrumentKind,
351 mut stream: Stream,
352 ) -> MetricResult<Option<Arc<dyn internal::Measure<T>>>> {
353 let mut agg = stream
354 .aggregation
355 .take()
356 .unwrap_or_else(|| default_aggregation_selector(kind));
357
358 if matches!(agg, aggregation::Aggregation::Default) {
360 agg = default_aggregation_selector(kind);
361 }
362
363 if let Err(err) = is_aggregator_compatible(&kind, &agg) {
364 return Err(MetricError::Other(format!(
365 "creating aggregator with instrumentKind: {:?}, aggregation {:?}: {:?}",
366 kind, stream.aggregation, err,
367 )));
368 }
369
370 let mut id = self.inst_id(kind, &stream);
371 self.log_conflict(&id);
374
375 id.normalize();
379
380 let mut cache = self.aggregators.lock()?;
381
382 let cached = cache.entry(id).or_insert_with(|| {
383 let filter = stream
384 .allowed_attribute_keys
385 .clone()
386 .map(|allowed| Arc::new(move |kv: &KeyValue| allowed.contains(&kv.key)) as Arc<_>);
387
388 let b = AggregateBuilder::new(self.pipeline.reader.temporality(kind), filter);
389 let AggregateFns { measure, collect } = match aggregate_fn(b, &agg, kind) {
390 Ok(Some(inst)) => inst,
391 other => return other.map(|fs| fs.map(|inst| inst.measure)), };
393
394 self.pipeline.add_sync(
395 scope.clone(),
396 InstrumentSync {
397 name: stream.name,
398 description: stream.description,
399 unit: stream.unit,
400 comp_agg: collect,
401 },
402 );
403
404 Ok(Some(measure))
405 });
406
407 match cached {
408 Ok(opt) => Ok(opt.clone()),
409 Err(err) => Err(MetricError::Other(err.to_string())),
410 }
411 }
412
413 fn log_conflict(&self, id: &InstrumentId) {
417 if let Ok(views) = self.views.lock() {
418 if let Some(existing) = views.get(id.name.to_lowercase().as_str()) {
419 if existing == id {
420 return;
421 }
422 otel_debug!(
425 name: "Instrument.DuplicateMetricStreamDefinitions",
426 message = "duplicate metric stream definitions",
427 reason = format!("names: ({} and {}), descriptions: ({} and {}), kinds: ({:?} and {:?}), units: ({:?} and {:?}), and numbers: ({} and {})",
428 existing.name, id.name,
429 existing.description, id.description,
430 existing.kind, id.kind,
431 existing.unit, id.unit,
432 existing.number, id.number,)
433 );
434 }
435 }
436 }
437
438 fn inst_id(&self, kind: InstrumentKind, stream: &Stream) -> InstrumentId {
439 InstrumentId {
440 name: stream.name.clone(),
441 description: stream.description.clone(),
442 kind,
443 unit: stream.unit.clone(),
444 number: Cow::Borrowed(std::any::type_name::<T>()),
445 }
446 }
447}
448
449fn default_aggregation_selector(kind: InstrumentKind) -> Aggregation {
463 match kind {
464 InstrumentKind::Counter
465 | InstrumentKind::UpDownCounter
466 | InstrumentKind::ObservableCounter
467 | InstrumentKind::ObservableUpDownCounter => Aggregation::Sum,
468 InstrumentKind::Gauge => Aggregation::LastValue,
469 InstrumentKind::ObservableGauge => Aggregation::LastValue,
470 InstrumentKind::Histogram => Aggregation::ExplicitBucketHistogram {
471 boundaries: vec![
472 0.0, 5.0, 10.0, 25.0, 50.0, 75.0, 100.0, 250.0, 500.0, 750.0, 1000.0, 2500.0,
473 5000.0, 7500.0, 10000.0,
474 ],
475 record_min_max: true,
476 },
477 }
478}
479
480fn aggregate_fn<T: Number>(
484 b: AggregateBuilder<T>,
485 agg: &aggregation::Aggregation,
486 kind: InstrumentKind,
487) -> MetricResult<Option<AggregateFns<T>>> {
488 match agg {
489 Aggregation::Default => aggregate_fn(b, &default_aggregation_selector(kind), kind),
490 Aggregation::Drop => Ok(None),
491 Aggregation::LastValue => {
492 match kind {
493 InstrumentKind::Gauge => Ok(Some(b.last_value(None))),
494 InstrumentKind::ObservableGauge => Ok(Some(b.last_value(Some(Temporality::Delta)))),
497 _ => Err(MetricError::Other(format!("LastValue aggregation is only available for Gauge or ObservableGauge, but not for {kind:?}")))
498 }
499 }
500 Aggregation::Sum => {
501 let fns = match kind {
502 InstrumentKind::ObservableCounter => b.precomputed_sum(true),
506 InstrumentKind::ObservableUpDownCounter => b.precomputed_sum(false),
507 InstrumentKind::Counter | InstrumentKind::Histogram => b.sum(true),
508 _ => b.sum(false),
509 };
510 Ok(Some(fns))
511 }
512 Aggregation::ExplicitBucketHistogram {
513 boundaries,
514 record_min_max,
515 } => {
516 let record_sum = !matches!(
517 kind,
518 InstrumentKind::UpDownCounter
519 | InstrumentKind::ObservableUpDownCounter
520 | InstrumentKind::ObservableGauge
521 );
522 Ok(Some(b.explicit_bucket_histogram(
526 boundaries.to_vec(),
527 *record_min_max,
528 record_sum,
529 )))
530 }
531 Aggregation::Base2ExponentialHistogram {
532 max_size,
533 max_scale,
534 record_min_max,
535 } => {
536 let record_sum = !matches!(
537 kind,
538 InstrumentKind::UpDownCounter
539 | InstrumentKind::ObservableUpDownCounter
540 | InstrumentKind::ObservableGauge
541 );
542 Ok(Some(b.exponential_bucket_histogram(
543 *max_size,
544 *max_scale,
545 *record_min_max,
546 record_sum,
547 )))
548 }
549 }
550}
551
552fn is_aggregator_compatible(
566 kind: &InstrumentKind,
567 agg: &aggregation::Aggregation,
568) -> MetricResult<()> {
569 match agg {
570 Aggregation::Default => Ok(()),
571 Aggregation::ExplicitBucketHistogram { .. }
572 | Aggregation::Base2ExponentialHistogram { .. } => {
573 if matches!(
574 kind,
575 InstrumentKind::Counter
576 | InstrumentKind::UpDownCounter
577 | InstrumentKind::Gauge
578 | InstrumentKind::Histogram
579 | InstrumentKind::ObservableCounter
580 | InstrumentKind::ObservableUpDownCounter
581 | InstrumentKind::ObservableGauge
582 ) {
583 return Ok(());
584 }
585 Err(MetricError::Other("incompatible aggregation".into()))
586 }
587 Aggregation::Sum => {
588 match kind {
589 InstrumentKind::ObservableCounter
590 | InstrumentKind::ObservableUpDownCounter
591 | InstrumentKind::Counter
592 | InstrumentKind::Histogram
593 | InstrumentKind::UpDownCounter => Ok(()),
594 _ => {
595 Err(MetricError::Other("incompatible aggregation".into()))
598 }
599 }
600 }
601 Aggregation::LastValue => {
602 match kind {
603 InstrumentKind::Gauge | InstrumentKind::ObservableGauge => Ok(()),
604 _ => {
605 Err(MetricError::Other("incompatible aggregation".into()))
608 }
609 }
610 }
611 Aggregation::Drop => Ok(()),
612 }
613}
614
/// The full set of pipelines of a provider, one per configured reader.
#[derive(Clone, Debug)]
pub(crate) struct Pipelines(pub(crate) Vec<Arc<Pipeline>>);
618
619impl Pipelines {
620 pub(crate) fn new(
621 res: Resource,
622 readers: Vec<Box<dyn MetricReader>>,
623 views: Vec<Arc<dyn View>>,
624 ) -> Self {
625 let mut pipes = Vec::with_capacity(readers.len());
626 for r in readers {
627 let p = Arc::new(Pipeline {
628 resource: res.clone(),
629 reader: r,
630 views: views.clone(),
631 inner: Default::default(),
632 });
633 p.reader.register_pipeline(Arc::downgrade(&p));
634 pipes.push(p);
635 }
636
637 Pipelines(pipes)
638 }
639
640 pub(crate) fn register_callback<F>(&self, callback: F)
641 where
642 F: Fn() + Send + Sync + 'static,
643 {
644 let cb = Arc::new(callback);
645 for pipe in &self.0 {
646 pipe.add_callback(cb.clone())
647 }
648 }
649
650 pub(crate) fn force_flush(&self) -> OTelSdkResult {
652 let mut errs = vec![];
653 for pipeline in &self.0 {
654 if let Err(err) = pipeline.force_flush() {
655 errs.push(err);
656 }
657 }
658
659 if errs.is_empty() {
660 Ok(())
661 } else {
662 Err(OTelSdkError::InternalFailure(format!("{errs:?}")))
663 }
664 }
665
666 pub(crate) fn shutdown(&self) -> OTelSdkResult {
668 let mut errs = vec![];
669 for pipeline in &self.0 {
670 if let Err(err) = pipeline.shutdown() {
671 errs.push(err);
672 }
673 }
674
675 if errs.is_empty() {
676 Ok(())
677 } else {
678 Err(crate::error::OTelSdkError::InternalFailure(format!(
679 "{errs:?}"
680 )))
681 }
682 }
683}
684
/// Resolves instruments into measures across all pipelines, for values of
/// number type `T`.
pub(crate) struct Resolver<T> {
    /// One inserter per pipeline.
    inserters: Vec<Inserter<T>>,
}
691
692impl<T> Resolver<T>
693where
694 T: Number,
695{
696 pub(crate) fn new(
697 pipelines: Arc<Pipelines>,
698 view_cache: Arc<Mutex<HashMap<Cow<'static, str>, InstrumentId>>>,
699 ) -> Self {
700 let inserters = pipelines
701 .0
702 .iter()
703 .map(|pipe| Inserter::new(Arc::clone(pipe), Arc::clone(&view_cache)))
704 .collect();
705
706 Resolver { inserters }
707 }
708
709 pub(crate) fn measures(
711 &self,
712 id: Instrument,
713 boundaries: Option<Vec<f64>>,
714 ) -> MetricResult<Vec<Arc<dyn internal::Measure<T>>>> {
715 let (mut measures, mut errs) = (vec![], vec![]);
716
717 for inserter in &self.inserters {
718 match inserter.instrument(id.clone(), boundaries.as_deref()) {
719 Ok(ms) => measures.extend(ms),
720 Err(err) => errs.push(err),
721 }
722 }
723
724 if errs.is_empty() {
725 if measures.is_empty() {
726 }
729 Ok(measures)
730 } else {
731 Err(MetricError::Other(format!("{errs:?}")))
732 }
733 }
734}