opentelemetry_sdk/metrics/
pipeline.rs

1use core::fmt;
2use std::{
3    borrow::Cow,
4    collections::{HashMap, HashSet},
5    sync::{Arc, Mutex},
6};
7
8use opentelemetry::{otel_debug, InstrumentationScope, KeyValue};
9
10use crate::{
11    error::{OTelSdkError, OTelSdkResult},
12    metrics::{
13        aggregation,
14        data::{Metric, ResourceMetrics, ScopeMetrics},
15        instrument::{Instrument, InstrumentId, InstrumentKind, Stream},
16        internal::{self, AggregateBuilder, Number},
17        reader::{MetricReader, SdkProducer},
18        view::View,
19        MetricError, MetricResult,
20    },
21    Resource,
22};
23
24use self::internal::AggregateFns;
25
26use super::{Aggregation, Temporality};
27
28/// Connects all of the instruments created by a meter provider to a [MetricReader].
29///
30/// This is the object that will be registered when a meter provider is
31/// created.
32///
33/// As instruments are created the instrument should be checked if it exists in
34/// the views of a the reader, and if so each aggregate function should be added
35/// to the pipeline.
36#[doc(hidden)]
37pub struct Pipeline {
38    pub(crate) resource: Resource,
39    reader: Box<dyn MetricReader>,
40    views: Vec<Arc<dyn View>>,
41    inner: Mutex<PipelineInner>,
42}
43
44impl fmt::Debug for Pipeline {
45    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
46        f.write_str("Pipeline")
47    }
48}
49
50/// Single or multi-instrument callbacks
51type GenericCallback = Arc<dyn Fn() + Send + Sync>;
52
53#[derive(Default)]
54struct PipelineInner {
55    aggregations: HashMap<InstrumentationScope, Vec<InstrumentSync>>,
56    callbacks: Vec<GenericCallback>,
57}
58
59impl fmt::Debug for PipelineInner {
60    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
61        f.debug_struct("PipelineInner")
62            .field("aggregations", &self.aggregations)
63            .field("callbacks", &self.callbacks.len())
64            .finish()
65    }
66}
67
68impl Pipeline {
69    /// Adds the [InstrumentSync] to pipeline with scope.
70    ///
71    /// This method is not idempotent. Duplicate calls will result in duplicate
72    /// additions, it is the callers responsibility to ensure this is called with
73    /// unique values.
74    fn add_sync(&self, scope: InstrumentationScope, i_sync: InstrumentSync) {
75        let _ = self.inner.lock().map(|mut inner| {
76            otel_debug!(
77                name : "InstrumentCreated",
78                instrument_name = i_sync.name.as_ref(),
79            );
80            inner.aggregations.entry(scope).or_default().push(i_sync);
81        });
82    }
83
84    /// Registers a single instrument callback to be run when `produce` is called.
85    fn add_callback(&self, callback: GenericCallback) {
86        let _ = self
87            .inner
88            .lock()
89            .map(|mut inner| inner.callbacks.push(callback));
90    }
91
92    /// Send accumulated telemetry
93    fn force_flush(&self) -> OTelSdkResult {
94        self.reader.force_flush()
95    }
96
97    /// Shut down pipeline
98    fn shutdown(&self) -> OTelSdkResult {
99        self.reader.shutdown()
100    }
101}
102
103impl SdkProducer for Pipeline {
104    /// Returns aggregated metrics from a single collection.
105    fn produce(&self, rm: &mut ResourceMetrics) -> MetricResult<()> {
106        let inner = self.inner.lock()?;
107        otel_debug!(
108            name: "MeterProviderInvokingObservableCallbacks",
109            count =  inner.callbacks.len(),
110        );
111        for cb in &inner.callbacks {
112            // TODO consider parallel callbacks.
113            cb();
114        }
115
116        rm.resource = self.resource.clone();
117        if inner.aggregations.len() > rm.scope_metrics.len() {
118            rm.scope_metrics
119                .reserve(inner.aggregations.len() - rm.scope_metrics.len());
120        }
121
122        let mut i = 0;
123        for (scope, instruments) in inner.aggregations.iter() {
124            let sm = match rm.scope_metrics.get_mut(i) {
125                Some(sm) => sm,
126                None => {
127                    rm.scope_metrics.push(ScopeMetrics::default());
128                    rm.scope_metrics.last_mut().unwrap()
129                }
130            };
131            if instruments.len() > sm.metrics.len() {
132                sm.metrics.reserve(instruments.len() - sm.metrics.len());
133            }
134
135            let mut j = 0;
136            for inst in instruments {
137                let mut m = sm.metrics.get_mut(j);
138                match (inst.comp_agg.call(m.as_mut().map(|m| m.data.as_mut())), m) {
139                    // No metric to re-use, expect agg to create new metric data
140                    ((len, Some(initial_agg)), None) if len > 0 => sm.metrics.push(Metric {
141                        name: inst.name.clone(),
142                        description: inst.description.clone(),
143                        unit: inst.unit.clone(),
144                        data: initial_agg,
145                    }),
146                    // Existing metric can be re-used, update its values
147                    ((len, data), Some(prev_agg)) if len > 0 => {
148                        if let Some(data) = data {
149                            // previous aggregation was of a different type
150                            prev_agg.data = data;
151                        }
152                        prev_agg.name.clone_from(&inst.name);
153                        prev_agg.description.clone_from(&inst.description);
154                        prev_agg.unit.clone_from(&inst.unit);
155                    }
156                    _ => continue,
157                }
158
159                j += 1;
160            }
161
162            sm.metrics.truncate(j);
163            if !sm.metrics.is_empty() {
164                sm.scope = scope.clone();
165                i += 1;
166            }
167        }
168
169        rm.scope_metrics.truncate(i);
170
171        Ok(())
172    }
173}
174
175/// A synchronization point between a [Pipeline] and an instrument's aggregate function.
176struct InstrumentSync {
177    name: Cow<'static, str>,
178    description: Cow<'static, str>,
179    unit: Cow<'static, str>,
180    comp_agg: Arc<dyn internal::ComputeAggregation>,
181}
182
183impl fmt::Debug for InstrumentSync {
184    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
185        f.debug_struct("InstrumentSync")
186            .field("name", &self.name)
187            .field("description", &self.description)
188            .field("unit", &self.unit)
189            .finish()
190    }
191}
192
193type Cache<T> = Mutex<HashMap<InstrumentId, MetricResult<Option<Arc<dyn internal::Measure<T>>>>>>;
194
195/// Facilitates inserting of new instruments from a single scope into a pipeline.
196struct Inserter<T> {
197    /// A cache that holds aggregate function inputs whose
198    /// outputs have been inserted into the underlying reader pipeline.
199    ///
200    /// This cache ensures no duplicate aggregate functions are inserted into
201    /// the reader pipeline and if a new request during an instrument creation
202    /// asks for the same aggregate function input the same instance is
203    /// returned.
204    aggregators: Cache<T>,
205
206    /// A cache that holds instrument identifiers for all the instruments a [Meter] has
207    /// created.
208    ///
209    /// It is provided from the `Meter` that owns this inserter. This cache ensures
210    /// that during the creation of instruments with the same name but different
211    /// options (e.g. description, unit) a warning message is logged.
212    views: Arc<Mutex<HashMap<Cow<'static, str>, InstrumentId>>>,
213
214    pipeline: Arc<Pipeline>,
215}
216
217impl<T> Inserter<T>
218where
219    T: Number,
220{
221    fn new(p: Arc<Pipeline>, vc: Arc<Mutex<HashMap<Cow<'static, str>, InstrumentId>>>) -> Self {
222        Inserter {
223            aggregators: Default::default(),
224            views: vc,
225            pipeline: Arc::clone(&p),
226        }
227    }
228
229    /// Inserts the provided instrument into a pipeline.
230    ///
231    /// All views the pipeline contains are matched against, and any matching view
232    /// that creates a unique [Aggregator] will be inserted into the pipeline and
233    /// included in the returned list.
234    ///
235    /// The returned aggregate functions are ensured to be deduplicated and unique.
236    /// If another view in another pipeline that is cached by this inserter's cache
237    /// has already inserted the same aggregate function for the same instrument,
238    /// that function's instance is returned.
239    ///
240    /// If another instrument has already been inserted by this inserter, or any
241    /// other using the same cache, and it conflicts with the instrument being
242    /// inserted in this call, an aggregate function matching the arguments will
243    /// still be returned but a log message will also be logged to the OTel global
244    /// logger.
245    ///
246    /// If the passed instrument would result in an incompatible aggregate function,
247    /// an error is returned and that aggregate function is not inserted or
248    /// returned.
249    ///
250    /// If an instrument is determined to use a [aggregation::Aggregation::Drop],
251    /// that instrument is not inserted nor returned.
252    fn instrument(
253        &self,
254        inst: Instrument,
255        boundaries: Option<&[f64]>,
256    ) -> MetricResult<Vec<Arc<dyn internal::Measure<T>>>> {
257        let mut matched = false;
258        let mut measures = vec![];
259        let mut errs = vec![];
260        let kind = match inst.kind {
261            Some(kind) => kind,
262            None => return Err(MetricError::Other("instrument must have a kind".into())),
263        };
264
265        // The cache will return the same Aggregator instance. Use stream ids to de duplicate.
266        let mut seen = HashSet::new();
267        for v in &self.pipeline.views {
268            let stream = match v.match_inst(&inst) {
269                Some(stream) => stream,
270                None => continue,
271            };
272            matched = true;
273
274            let id = self.inst_id(kind, &stream);
275            if seen.contains(&id) {
276                continue; // This aggregator has already been added
277            }
278
279            let agg = match self.cached_aggregator(&inst.scope, kind, stream) {
280                Ok(Some(agg)) => agg,
281                Ok(None) => continue, // Drop aggregator.
282                Err(err) => {
283                    errs.push(err);
284                    continue;
285                }
286            };
287            seen.insert(id);
288            measures.push(agg);
289        }
290
291        if matched {
292            if errs.is_empty() {
293                return Ok(measures);
294            } else {
295                return Err(MetricError::Other(format!("{errs:?}")));
296            }
297        }
298
299        // Apply implicit default view if no explicit matched.
300        let mut stream = Stream {
301            name: inst.name,
302            description: inst.description,
303            unit: inst.unit,
304            aggregation: None,
305            allowed_attribute_keys: None,
306        };
307
308        // Override default histogram boundaries if provided.
309        if let Some(boundaries) = boundaries {
310            stream.aggregation = Some(Aggregation::ExplicitBucketHistogram {
311                boundaries: boundaries.to_vec(),
312                record_min_max: true,
313            });
314        }
315
316        match self.cached_aggregator(&inst.scope, kind, stream) {
317            Ok(agg) => {
318                if errs.is_empty() {
319                    if let Some(agg) = agg {
320                        measures.push(agg);
321                    }
322                    Ok(measures)
323                } else {
324                    Err(MetricError::Other(format!("{errs:?}")))
325                }
326            }
327            Err(err) => {
328                errs.push(err);
329                Err(MetricError::Other(format!("{errs:?}")))
330            }
331        }
332    }
333
334    /// Returns the appropriate aggregate functions for an instrument configuration.
335    ///
336    /// If the exact instrument has been created within the [Scope], that
337    /// aggregate function instance will be returned. Otherwise, a new computed
338    /// aggregate function will be cached and returned.
339    ///
340    /// If the instrument configuration conflicts with an instrument that has
341    /// already been created (e.g. description, unit, data type) a warning will be
342    /// logged with the global OTel logger. A valid new aggregate function for the
343    /// instrument configuration will still be returned without an error.
344    ///
345    /// If the instrument defines an unknown or incompatible aggregation, an error
346    /// is returned.
347    fn cached_aggregator(
348        &self,
349        scope: &InstrumentationScope,
350        kind: InstrumentKind,
351        mut stream: Stream,
352    ) -> MetricResult<Option<Arc<dyn internal::Measure<T>>>> {
353        let mut agg = stream
354            .aggregation
355            .take()
356            .unwrap_or_else(|| default_aggregation_selector(kind));
357
358        // Apply default if stream or reader aggregation returns default
359        if matches!(agg, aggregation::Aggregation::Default) {
360            agg = default_aggregation_selector(kind);
361        }
362
363        if let Err(err) = is_aggregator_compatible(&kind, &agg) {
364            return Err(MetricError::Other(format!(
365                "creating aggregator with instrumentKind: {:?}, aggregation {:?}: {:?}",
366                kind, stream.aggregation, err,
367            )));
368        }
369
370        let mut id = self.inst_id(kind, &stream);
371        // If there is a conflict, the specification says the view should
372        // still be applied and a warning should be logged.
373        self.log_conflict(&id);
374
375        // If there are requests for the same instrument with different name
376        // casing, the first-seen needs to be returned. Use a normalize ID for the
377        // cache lookup to ensure the correct comparison.
378        id.normalize();
379
380        let mut cache = self.aggregators.lock()?;
381
382        let cached = cache.entry(id).or_insert_with(|| {
383            let filter = stream
384                .allowed_attribute_keys
385                .clone()
386                .map(|allowed| Arc::new(move |kv: &KeyValue| allowed.contains(&kv.key)) as Arc<_>);
387
388            let b = AggregateBuilder::new(self.pipeline.reader.temporality(kind), filter);
389            let AggregateFns { measure, collect } = match aggregate_fn(b, &agg, kind) {
390                Ok(Some(inst)) => inst,
391                other => return other.map(|fs| fs.map(|inst| inst.measure)), // Drop aggregator or error
392            };
393
394            self.pipeline.add_sync(
395                scope.clone(),
396                InstrumentSync {
397                    name: stream.name,
398                    description: stream.description,
399                    unit: stream.unit,
400                    comp_agg: collect,
401                },
402            );
403
404            Ok(Some(measure))
405        });
406
407        match cached {
408            Ok(opt) => Ok(opt.clone()),
409            Err(err) => Err(MetricError::Other(err.to_string())),
410        }
411    }
412
413    /// Validates if an instrument with the same name as id has already been created.
414    ///
415    /// If that instrument conflicts with id, a warning is logged.
416    fn log_conflict(&self, id: &InstrumentId) {
417        if let Ok(views) = self.views.lock() {
418            if let Some(existing) = views.get(id.name.to_lowercase().as_str()) {
419                if existing == id {
420                    return;
421                }
422                // If an existing instrument with the same name but different attributes is found,
423                // log a warning with details about the conflicting metric stream definitions.
424                otel_debug!(
425                    name: "Instrument.DuplicateMetricStreamDefinitions",
426                    message = "duplicate metric stream definitions",
427                    reason = format!("names: ({} and {}), descriptions: ({} and {}), kinds: ({:?} and {:?}), units: ({:?} and {:?}), and numbers: ({} and {})",
428                    existing.name, id.name,
429                    existing.description, id.description,
430                    existing.kind, id.kind,
431                    existing.unit, id.unit,
432                    existing.number, id.number,)
433                );
434            }
435        }
436    }
437
438    fn inst_id(&self, kind: InstrumentKind, stream: &Stream) -> InstrumentId {
439        InstrumentId {
440            name: stream.name.clone(),
441            description: stream.description.clone(),
442            kind,
443            unit: stream.unit.clone(),
444            number: Cow::Borrowed(std::any::type_name::<T>()),
445        }
446    }
447}
448
449/// The default aggregation and parameters for an instrument of [InstrumentKind].
450///
451/// This aggregation selector uses the following selection mapping per [the spec]:
452///
453/// * Counter ⇨ Sum
454/// * Observable Counter ⇨ Sum
455/// * UpDownCounter ⇨ Sum
456/// * Observable UpDownCounter ⇨ Sum
457/// * Gauge ⇨ LastValue
458/// * Observable Gauge ⇨ LastValue
459/// * Histogram ⇨ ExplicitBucketHistogram
460///
461/// [the spec]: https://github.com/open-telemetry/opentelemetry-specification/blob/v1.19.0/specification/metrics/sdk.md#default-aggregation
462fn default_aggregation_selector(kind: InstrumentKind) -> Aggregation {
463    match kind {
464        InstrumentKind::Counter
465        | InstrumentKind::UpDownCounter
466        | InstrumentKind::ObservableCounter
467        | InstrumentKind::ObservableUpDownCounter => Aggregation::Sum,
468        InstrumentKind::Gauge => Aggregation::LastValue,
469        InstrumentKind::ObservableGauge => Aggregation::LastValue,
470        InstrumentKind::Histogram => Aggregation::ExplicitBucketHistogram {
471            boundaries: vec![
472                0.0, 5.0, 10.0, 25.0, 50.0, 75.0, 100.0, 250.0, 500.0, 750.0, 1000.0, 2500.0,
473                5000.0, 7500.0, 10000.0,
474            ],
475            record_min_max: true,
476        },
477    }
478}
479
480/// Returns new aggregate functions for the given params.
481///
482/// If the aggregation is unknown or temporality is invalid, an error is returned.
483fn aggregate_fn<T: Number>(
484    b: AggregateBuilder<T>,
485    agg: &aggregation::Aggregation,
486    kind: InstrumentKind,
487) -> MetricResult<Option<AggregateFns<T>>> {
488    match agg {
489        Aggregation::Default => aggregate_fn(b, &default_aggregation_selector(kind), kind),
490        Aggregation::Drop => Ok(None),
491        Aggregation::LastValue => {
492            match kind {
493                InstrumentKind::Gauge => Ok(Some(b.last_value(None))),
494                // temporality for LastValue only affects how data points are reported, so we can always use
495                // delta temporality, because observable instruments should report data points only since previous collection
496                InstrumentKind::ObservableGauge => Ok(Some(b.last_value(Some(Temporality::Delta)))),
497                _ => Err(MetricError::Other(format!("LastValue aggregation is only available for Gauge or ObservableGauge, but not for {kind:?}")))
498            }
499        }
500        Aggregation::Sum => {
501            let fns = match kind {
502                // TODO implement: observable instruments should not report data points on every collect
503                // from SDK: For asynchronous instruments with Delta or Cumulative aggregation temporality,
504                // MetricReader.Collect MUST only receive data points with measurements recorded since the previous collection
505                InstrumentKind::ObservableCounter => b.precomputed_sum(true),
506                InstrumentKind::ObservableUpDownCounter => b.precomputed_sum(false),
507                InstrumentKind::Counter | InstrumentKind::Histogram => b.sum(true),
508                _ => b.sum(false),
509            };
510            Ok(Some(fns))
511        }
512        Aggregation::ExplicitBucketHistogram {
513            boundaries,
514            record_min_max,
515        } => {
516            let record_sum = !matches!(
517                kind,
518                InstrumentKind::UpDownCounter
519                    | InstrumentKind::ObservableUpDownCounter
520                    | InstrumentKind::ObservableGauge
521            );
522            // TODO implement: observable instruments should not report data points on every collect
523            // from SDK: For asynchronous instruments with Delta or Cumulative aggregation temporality,
524            // MetricReader.Collect MUST only receive data points with measurements recorded since the previous collection
525            Ok(Some(b.explicit_bucket_histogram(
526                boundaries.to_vec(),
527                *record_min_max,
528                record_sum,
529            )))
530        }
531        Aggregation::Base2ExponentialHistogram {
532            max_size,
533            max_scale,
534            record_min_max,
535        } => {
536            let record_sum = !matches!(
537                kind,
538                InstrumentKind::UpDownCounter
539                    | InstrumentKind::ObservableUpDownCounter
540                    | InstrumentKind::ObservableGauge
541            );
542            Ok(Some(b.exponential_bucket_histogram(
543                *max_size,
544                *max_scale,
545                *record_min_max,
546                record_sum,
547            )))
548        }
549    }
550}
551
552/// Checks if the aggregation can be used by the instrument.
553///
554/// Current compatibility:
555///
556/// | Instrument Kind          | Drop | LastValue | Sum | Histogram | Exponential Histogram |
557/// |--------------------------|------|-----------|-----|-----------|-----------------------|
558/// | Counter                  | ✓    |           | ✓   | ✓         | ✓                     |
559/// | UpDownCounter            | ✓    |           | ✓   | ✓         | ✓                     |
560/// | Histogram                | ✓    |           | ✓   | ✓         | ✓                     |
561/// | Observable Counter       | ✓    |           | ✓   | ✓         | ✓                     |
562/// | Observable UpDownCounter | ✓    |           | ✓   | ✓         | ✓                     |
563/// | Gauge                    | ✓    | ✓         |     | ✓         | ✓                     |
564/// | Observable Gauge         | ✓    | ✓         |     | ✓         | ✓                     |
565fn is_aggregator_compatible(
566    kind: &InstrumentKind,
567    agg: &aggregation::Aggregation,
568) -> MetricResult<()> {
569    match agg {
570        Aggregation::Default => Ok(()),
571        Aggregation::ExplicitBucketHistogram { .. }
572        | Aggregation::Base2ExponentialHistogram { .. } => {
573            if matches!(
574                kind,
575                InstrumentKind::Counter
576                    | InstrumentKind::UpDownCounter
577                    | InstrumentKind::Gauge
578                    | InstrumentKind::Histogram
579                    | InstrumentKind::ObservableCounter
580                    | InstrumentKind::ObservableUpDownCounter
581                    | InstrumentKind::ObservableGauge
582            ) {
583                return Ok(());
584            }
585            Err(MetricError::Other("incompatible aggregation".into()))
586        }
587        Aggregation::Sum => {
588            match kind {
589                InstrumentKind::ObservableCounter
590                | InstrumentKind::ObservableUpDownCounter
591                | InstrumentKind::Counter
592                | InstrumentKind::Histogram
593                | InstrumentKind::UpDownCounter => Ok(()),
594                _ => {
595                    // TODO: review need for aggregation check after
596                    // https://github.com/open-telemetry/opentelemetry-specification/issues/2710
597                    Err(MetricError::Other("incompatible aggregation".into()))
598                }
599            }
600        }
601        Aggregation::LastValue => {
602            match kind {
603                InstrumentKind::Gauge | InstrumentKind::ObservableGauge => Ok(()),
604                _ => {
605                    // TODO: review need for aggregation check after
606                    // https://github.com/open-telemetry/opentelemetry-specification/issues/2710
607                    Err(MetricError::Other("incompatible aggregation".into()))
608                }
609            }
610        }
611        Aggregation::Drop => Ok(()),
612    }
613}
614
615/// The group of pipelines connecting Readers with instrument measurement.
616#[derive(Clone, Debug)]
617pub(crate) struct Pipelines(pub(crate) Vec<Arc<Pipeline>>);
618
619impl Pipelines {
620    pub(crate) fn new(
621        res: Resource,
622        readers: Vec<Box<dyn MetricReader>>,
623        views: Vec<Arc<dyn View>>,
624    ) -> Self {
625        let mut pipes = Vec::with_capacity(readers.len());
626        for r in readers {
627            let p = Arc::new(Pipeline {
628                resource: res.clone(),
629                reader: r,
630                views: views.clone(),
631                inner: Default::default(),
632            });
633            p.reader.register_pipeline(Arc::downgrade(&p));
634            pipes.push(p);
635        }
636
637        Pipelines(pipes)
638    }
639
640    pub(crate) fn register_callback<F>(&self, callback: F)
641    where
642        F: Fn() + Send + Sync + 'static,
643    {
644        let cb = Arc::new(callback);
645        for pipe in &self.0 {
646            pipe.add_callback(cb.clone())
647        }
648    }
649
650    /// Force flush all pipelines
651    pub(crate) fn force_flush(&self) -> OTelSdkResult {
652        let mut errs = vec![];
653        for pipeline in &self.0 {
654            if let Err(err) = pipeline.force_flush() {
655                errs.push(err);
656            }
657        }
658
659        if errs.is_empty() {
660            Ok(())
661        } else {
662            Err(OTelSdkError::InternalFailure(format!("{errs:?}")))
663        }
664    }
665
666    /// Shut down all pipelines
667    pub(crate) fn shutdown(&self) -> OTelSdkResult {
668        let mut errs = vec![];
669        for pipeline in &self.0 {
670            if let Err(err) = pipeline.shutdown() {
671                errs.push(err);
672            }
673        }
674
675        if errs.is_empty() {
676            Ok(())
677        } else {
678            Err(crate::error::OTelSdkError::InternalFailure(format!(
679                "{errs:?}"
680            )))
681        }
682    }
683}
684
685/// resolver facilitates resolving aggregate functions an instrument calls to
686/// aggregate measurements with while updating all pipelines that need to pull from
687/// those aggregations.
688pub(crate) struct Resolver<T> {
689    inserters: Vec<Inserter<T>>,
690}
691
692impl<T> Resolver<T>
693where
694    T: Number,
695{
696    pub(crate) fn new(
697        pipelines: Arc<Pipelines>,
698        view_cache: Arc<Mutex<HashMap<Cow<'static, str>, InstrumentId>>>,
699    ) -> Self {
700        let inserters = pipelines
701            .0
702            .iter()
703            .map(|pipe| Inserter::new(Arc::clone(pipe), Arc::clone(&view_cache)))
704            .collect();
705
706        Resolver { inserters }
707    }
708
709    /// The measures that must be updated by the instrument defined by key.
710    pub(crate) fn measures(
711        &self,
712        id: Instrument,
713        boundaries: Option<Vec<f64>>,
714    ) -> MetricResult<Vec<Arc<dyn internal::Measure<T>>>> {
715        let (mut measures, mut errs) = (vec![], vec![]);
716
717        for inserter in &self.inserters {
718            match inserter.instrument(id.clone(), boundaries.as_deref()) {
719                Ok(ms) => measures.extend(ms),
720                Err(err) => errs.push(err),
721            }
722        }
723
724        if errs.is_empty() {
725            if measures.is_empty() {
726                // TODO: Emit internal log that measurements from the instrument
727                // are being dropped due to view configuration
728            }
729            Ok(measures)
730        } else {
731            Err(MetricError::Other(format!("{errs:?}")))
732        }
733    }
734}