opentelemetry_sdk/metrics/internal/
precomputed_sum.rs1use opentelemetry::KeyValue;
2
3use crate::metrics::data::{self, Aggregation, Sum, SumDataPoint};
4use crate::metrics::Temporality;
5
6use super::aggregate::{AggregateTimeInitiator, AttributeSetFilter};
7use super::{last_value::Assign, AtomicTracker, Number, ValueMap};
8use super::{ComputeAggregation, Measure};
9use std::{collections::HashMap, sync::Mutex};
10
11pub(crate) struct PrecomputedSum<T: Number> {
13 value_map: ValueMap<Assign<T>>,
14 init_time: AggregateTimeInitiator,
15 temporality: Temporality,
16 filter: AttributeSetFilter,
17 monotonic: bool,
18 reported: Mutex<HashMap<Vec<KeyValue>, T>>,
19}
20
21impl<T: Number> PrecomputedSum<T> {
22 pub(crate) fn new(
23 temporality: Temporality,
24 filter: AttributeSetFilter,
25 monotonic: bool,
26 ) -> Self {
27 PrecomputedSum {
28 value_map: ValueMap::new(()),
29 init_time: AggregateTimeInitiator::default(),
30 temporality,
31 filter,
32 monotonic,
33 reported: Mutex::new(Default::default()),
34 }
35 }
36
37 pub(crate) fn delta(
38 &self,
39 dest: Option<&mut dyn Aggregation>,
40 ) -> (usize, Option<Box<dyn Aggregation>>) {
41 let time = self.init_time.delta();
42
43 let s_data = dest.and_then(|d| d.as_mut().downcast_mut::<Sum<T>>());
44 let mut new_agg = if s_data.is_none() {
45 Some(data::Sum {
46 data_points: vec![],
47 start_time: time.start,
48 time: time.current,
49 temporality: Temporality::Delta,
50 is_monotonic: self.monotonic,
51 })
52 } else {
53 None
54 };
55 let s_data = s_data.unwrap_or_else(|| new_agg.as_mut().expect("present if s_data is none"));
56 s_data.start_time = time.start;
57 s_data.time = time.current;
58 s_data.temporality = Temporality::Delta;
59 s_data.is_monotonic = self.monotonic;
60
61 let mut reported = match self.reported.lock() {
62 Ok(r) => r,
63 Err(_) => return (0, None),
64 };
65 let mut new_reported = HashMap::with_capacity(reported.len());
66
67 self.value_map
68 .collect_and_reset(&mut s_data.data_points, |attributes, aggr| {
69 let value = aggr.value.get_value();
70 new_reported.insert(attributes.clone(), value);
71 let delta = value - *reported.get(&attributes).unwrap_or(&T::default());
72 SumDataPoint {
73 attributes,
74 value: delta,
75 exemplars: vec![],
76 }
77 });
78
79 *reported = new_reported;
80 drop(reported); (
83 s_data.data_points.len(),
84 new_agg.map(|a| Box::new(a) as Box<_>),
85 )
86 }
87
88 pub(crate) fn cumulative(
89 &self,
90 dest: Option<&mut dyn Aggregation>,
91 ) -> (usize, Option<Box<dyn Aggregation>>) {
92 let time = self.init_time.cumulative();
93
94 let s_data = dest.and_then(|d| d.as_mut().downcast_mut::<Sum<T>>());
95 let mut new_agg = if s_data.is_none() {
96 Some(data::Sum {
97 data_points: vec![],
98 start_time: time.start,
99 time: time.current,
100 temporality: Temporality::Cumulative,
101 is_monotonic: self.monotonic,
102 })
103 } else {
104 None
105 };
106 let s_data = s_data.unwrap_or_else(|| new_agg.as_mut().expect("present if s_data is none"));
107 s_data.start_time = time.start;
108 s_data.time = time.current;
109 s_data.temporality = Temporality::Cumulative;
110 s_data.is_monotonic = self.monotonic;
111
112 self.value_map
113 .collect_readonly(&mut s_data.data_points, |attributes, aggr| SumDataPoint {
114 attributes,
115 value: aggr.value.get_value(),
116 exemplars: vec![],
117 });
118
119 (
120 s_data.data_points.len(),
121 new_agg.map(|a| Box::new(a) as Box<_>),
122 )
123 }
124}
125
126impl<T> Measure<T> for PrecomputedSum<T>
127where
128 T: Number,
129{
130 fn call(&self, measurement: T, attrs: &[KeyValue]) {
131 self.filter.apply(attrs, |filtered| {
132 self.value_map.measure(measurement, filtered);
133 })
134 }
135}
136
137impl<T> ComputeAggregation for PrecomputedSum<T>
138where
139 T: Number,
140{
141 fn call(&self, dest: Option<&mut dyn Aggregation>) -> (usize, Option<Box<dyn Aggregation>>) {
142 match self.temporality {
143 Temporality::Delta => self.delta(dest),
144 _ => self.cumulative(dest),
145 }
146 }
147}