opentelemetry_sdk/metrics/internal/
precomputed_sum.rs

1use opentelemetry::KeyValue;
2
3use crate::metrics::data::{self, Aggregation, Sum, SumDataPoint};
4use crate::metrics::Temporality;
5
6use super::aggregate::{AggregateTimeInitiator, AttributeSetFilter};
7use super::{last_value::Assign, AtomicTracker, Number, ValueMap};
8use super::{ComputeAggregation, Measure};
9use std::{collections::HashMap, sync::Mutex};
10
11/// Summarizes a set of pre-computed sums as their arithmetic sum.
12pub(crate) struct PrecomputedSum<T: Number> {
13    value_map: ValueMap<Assign<T>>,
14    init_time: AggregateTimeInitiator,
15    temporality: Temporality,
16    filter: AttributeSetFilter,
17    monotonic: bool,
18    reported: Mutex<HashMap<Vec<KeyValue>, T>>,
19}
20
21impl<T: Number> PrecomputedSum<T> {
22    pub(crate) fn new(
23        temporality: Temporality,
24        filter: AttributeSetFilter,
25        monotonic: bool,
26    ) -> Self {
27        PrecomputedSum {
28            value_map: ValueMap::new(()),
29            init_time: AggregateTimeInitiator::default(),
30            temporality,
31            filter,
32            monotonic,
33            reported: Mutex::new(Default::default()),
34        }
35    }
36
37    pub(crate) fn delta(
38        &self,
39        dest: Option<&mut dyn Aggregation>,
40    ) -> (usize, Option<Box<dyn Aggregation>>) {
41        let time = self.init_time.delta();
42
43        let s_data = dest.and_then(|d| d.as_mut().downcast_mut::<Sum<T>>());
44        let mut new_agg = if s_data.is_none() {
45            Some(data::Sum {
46                data_points: vec![],
47                start_time: time.start,
48                time: time.current,
49                temporality: Temporality::Delta,
50                is_monotonic: self.monotonic,
51            })
52        } else {
53            None
54        };
55        let s_data = s_data.unwrap_or_else(|| new_agg.as_mut().expect("present if s_data is none"));
56        s_data.start_time = time.start;
57        s_data.time = time.current;
58        s_data.temporality = Temporality::Delta;
59        s_data.is_monotonic = self.monotonic;
60
61        let mut reported = match self.reported.lock() {
62            Ok(r) => r,
63            Err(_) => return (0, None),
64        };
65        let mut new_reported = HashMap::with_capacity(reported.len());
66
67        self.value_map
68            .collect_and_reset(&mut s_data.data_points, |attributes, aggr| {
69                let value = aggr.value.get_value();
70                new_reported.insert(attributes.clone(), value);
71                let delta = value - *reported.get(&attributes).unwrap_or(&T::default());
72                SumDataPoint {
73                    attributes,
74                    value: delta,
75                    exemplars: vec![],
76                }
77            });
78
79        *reported = new_reported;
80        drop(reported); // drop before values guard is dropped
81
82        (
83            s_data.data_points.len(),
84            new_agg.map(|a| Box::new(a) as Box<_>),
85        )
86    }
87
88    pub(crate) fn cumulative(
89        &self,
90        dest: Option<&mut dyn Aggregation>,
91    ) -> (usize, Option<Box<dyn Aggregation>>) {
92        let time = self.init_time.cumulative();
93
94        let s_data = dest.and_then(|d| d.as_mut().downcast_mut::<Sum<T>>());
95        let mut new_agg = if s_data.is_none() {
96            Some(data::Sum {
97                data_points: vec![],
98                start_time: time.start,
99                time: time.current,
100                temporality: Temporality::Cumulative,
101                is_monotonic: self.monotonic,
102            })
103        } else {
104            None
105        };
106        let s_data = s_data.unwrap_or_else(|| new_agg.as_mut().expect("present if s_data is none"));
107        s_data.start_time = time.start;
108        s_data.time = time.current;
109        s_data.temporality = Temporality::Cumulative;
110        s_data.is_monotonic = self.monotonic;
111
112        self.value_map
113            .collect_readonly(&mut s_data.data_points, |attributes, aggr| SumDataPoint {
114                attributes,
115                value: aggr.value.get_value(),
116                exemplars: vec![],
117            });
118
119        (
120            s_data.data_points.len(),
121            new_agg.map(|a| Box::new(a) as Box<_>),
122        )
123    }
124}
125
126impl<T> Measure<T> for PrecomputedSum<T>
127where
128    T: Number,
129{
130    fn call(&self, measurement: T, attrs: &[KeyValue]) {
131        self.filter.apply(attrs, |filtered| {
132            self.value_map.measure(measurement, filtered);
133        })
134    }
135}
136
137impl<T> ComputeAggregation for PrecomputedSum<T>
138where
139    T: Number,
140{
141    fn call(&self, dest: Option<&mut dyn Aggregation>) -> (usize, Option<Box<dyn Aggregation>>) {
142        match self.temporality {
143            Temporality::Delta => self.delta(dest),
144            _ => self.cumulative(dest),
145        }
146    }
147}