opentelemetry_sdk/metrics/internal/
sum.rs1use crate::metrics::data::{self, Aggregation, SumDataPoint};
2use crate::metrics::Temporality;
3use opentelemetry::KeyValue;
4
5use super::aggregate::{AggregateTimeInitiator, AttributeSetFilter};
6use super::{Aggregator, AtomicTracker, ComputeAggregation, Measure, Number};
7use super::{AtomicallyUpdate, ValueMap};
8
9struct Increment<T>
10where
11 T: AtomicallyUpdate<T>,
12{
13 value: T::AtomicTracker,
14}
15
16impl<T> Aggregator for Increment<T>
17where
18 T: Number,
19{
20 type InitConfig = ();
21 type PreComputedValue = T;
22
23 fn create(_init: &()) -> Self {
24 Self {
25 value: T::new_atomic_tracker(T::default()),
26 }
27 }
28
29 fn update(&self, value: T) {
30 self.value.add(value)
31 }
32
33 fn clone_and_reset(&self, _: &()) -> Self {
34 Self {
35 value: T::new_atomic_tracker(self.value.get_and_reset_value()),
36 }
37 }
38}
39
40pub(crate) struct Sum<T: Number> {
42 value_map: ValueMap<Increment<T>>,
43 init_time: AggregateTimeInitiator,
44 temporality: Temporality,
45 filter: AttributeSetFilter,
46 monotonic: bool,
47}
48
49impl<T: Number> Sum<T> {
50 pub(crate) fn new(
56 temporality: Temporality,
57 filter: AttributeSetFilter,
58 monotonic: bool,
59 ) -> Self {
60 Sum {
61 value_map: ValueMap::new(()),
62 init_time: AggregateTimeInitiator::default(),
63 temporality,
64 filter,
65 monotonic,
66 }
67 }
68
69 pub(crate) fn delta(
70 &self,
71 dest: Option<&mut dyn Aggregation>,
72 ) -> (usize, Option<Box<dyn Aggregation>>) {
73 let time = self.init_time.delta();
74 let s_data = dest.and_then(|d| d.as_mut().downcast_mut::<data::Sum<T>>());
75 let mut new_agg = if s_data.is_none() {
76 Some(data::Sum {
77 data_points: vec![],
78 start_time: time.start,
79 time: time.current,
80 temporality: Temporality::Delta,
81 is_monotonic: self.monotonic,
82 })
83 } else {
84 None
85 };
86 let s_data = s_data.unwrap_or_else(|| new_agg.as_mut().expect("present if s_data is none"));
87 s_data.start_time = time.start;
88 s_data.time = time.current;
89 s_data.temporality = Temporality::Delta;
90 s_data.is_monotonic = self.monotonic;
91
92 self.value_map
93 .collect_and_reset(&mut s_data.data_points, |attributes, aggr| SumDataPoint {
94 attributes,
95 value: aggr.value.get_value(),
96 exemplars: vec![],
97 });
98
99 (
100 s_data.data_points.len(),
101 new_agg.map(|a| Box::new(a) as Box<_>),
102 )
103 }
104
105 pub(crate) fn cumulative(
106 &self,
107 dest: Option<&mut dyn Aggregation>,
108 ) -> (usize, Option<Box<dyn Aggregation>>) {
109 let time = self.init_time.cumulative();
110 let s_data = dest.and_then(|d| d.as_mut().downcast_mut::<data::Sum<T>>());
111 let mut new_agg = if s_data.is_none() {
112 Some(data::Sum {
113 data_points: vec![],
114 start_time: time.start,
115 time: time.current,
116 temporality: Temporality::Cumulative,
117 is_monotonic: self.monotonic,
118 })
119 } else {
120 None
121 };
122 let s_data = s_data.unwrap_or_else(|| new_agg.as_mut().expect("present if s_data is none"));
123
124 s_data.start_time = time.start;
125 s_data.time = time.current;
126 s_data.temporality = Temporality::Cumulative;
127 s_data.is_monotonic = self.monotonic;
128
129 self.value_map
130 .collect_readonly(&mut s_data.data_points, |attributes, aggr| SumDataPoint {
131 attributes,
132 value: aggr.value.get_value(),
133 exemplars: vec![],
134 });
135
136 (
137 s_data.data_points.len(),
138 new_agg.map(|a| Box::new(a) as Box<_>),
139 )
140 }
141}
142
143impl<T> Measure<T> for Sum<T>
144where
145 T: Number,
146{
147 fn call(&self, measurement: T, attrs: &[KeyValue]) {
148 self.filter.apply(attrs, |filtered| {
149 self.value_map.measure(measurement, filtered);
150 })
151 }
152}
153
154impl<T> ComputeAggregation for Sum<T>
155where
156 T: Number,
157{
158 fn call(&self, dest: Option<&mut dyn Aggregation>) -> (usize, Option<Box<dyn Aggregation>>) {
159 match self.temporality {
160 Temporality::Delta => self.delta(dest),
161 _ => self.cumulative(dest),
162 }
163 }
164}