opentelemetry_sdk/metrics/internal/
sum.rs

1use crate::metrics::data::{self, Aggregation, SumDataPoint};
2use crate::metrics::Temporality;
3use opentelemetry::KeyValue;
4
5use super::aggregate::{AggregateTimeInitiator, AttributeSetFilter};
6use super::{Aggregator, AtomicTracker, ComputeAggregation, Measure, Number};
7use super::{AtomicallyUpdate, ValueMap};
8
9struct Increment<T>
10where
11    T: AtomicallyUpdate<T>,
12{
13    value: T::AtomicTracker,
14}
15
16impl<T> Aggregator for Increment<T>
17where
18    T: Number,
19{
20    type InitConfig = ();
21    type PreComputedValue = T;
22
23    fn create(_init: &()) -> Self {
24        Self {
25            value: T::new_atomic_tracker(T::default()),
26        }
27    }
28
29    fn update(&self, value: T) {
30        self.value.add(value)
31    }
32
33    fn clone_and_reset(&self, _: &()) -> Self {
34        Self {
35            value: T::new_atomic_tracker(self.value.get_and_reset_value()),
36        }
37    }
38}
39
40/// Summarizes a set of measurements made as their arithmetic sum.
41pub(crate) struct Sum<T: Number> {
42    value_map: ValueMap<Increment<T>>,
43    init_time: AggregateTimeInitiator,
44    temporality: Temporality,
45    filter: AttributeSetFilter,
46    monotonic: bool,
47}
48
49impl<T: Number> Sum<T> {
50    /// Returns an aggregator that summarizes a set of measurements as their
51    /// arithmetic sum.
52    ///
53    /// Each sum is scoped by attributes and the aggregation cycle the measurements
54    /// were made in.
55    pub(crate) fn new(
56        temporality: Temporality,
57        filter: AttributeSetFilter,
58        monotonic: bool,
59    ) -> Self {
60        Sum {
61            value_map: ValueMap::new(()),
62            init_time: AggregateTimeInitiator::default(),
63            temporality,
64            filter,
65            monotonic,
66        }
67    }
68
69    pub(crate) fn delta(
70        &self,
71        dest: Option<&mut dyn Aggregation>,
72    ) -> (usize, Option<Box<dyn Aggregation>>) {
73        let time = self.init_time.delta();
74        let s_data = dest.and_then(|d| d.as_mut().downcast_mut::<data::Sum<T>>());
75        let mut new_agg = if s_data.is_none() {
76            Some(data::Sum {
77                data_points: vec![],
78                start_time: time.start,
79                time: time.current,
80                temporality: Temporality::Delta,
81                is_monotonic: self.monotonic,
82            })
83        } else {
84            None
85        };
86        let s_data = s_data.unwrap_or_else(|| new_agg.as_mut().expect("present if s_data is none"));
87        s_data.start_time = time.start;
88        s_data.time = time.current;
89        s_data.temporality = Temporality::Delta;
90        s_data.is_monotonic = self.monotonic;
91
92        self.value_map
93            .collect_and_reset(&mut s_data.data_points, |attributes, aggr| SumDataPoint {
94                attributes,
95                value: aggr.value.get_value(),
96                exemplars: vec![],
97            });
98
99        (
100            s_data.data_points.len(),
101            new_agg.map(|a| Box::new(a) as Box<_>),
102        )
103    }
104
105    pub(crate) fn cumulative(
106        &self,
107        dest: Option<&mut dyn Aggregation>,
108    ) -> (usize, Option<Box<dyn Aggregation>>) {
109        let time = self.init_time.cumulative();
110        let s_data = dest.and_then(|d| d.as_mut().downcast_mut::<data::Sum<T>>());
111        let mut new_agg = if s_data.is_none() {
112            Some(data::Sum {
113                data_points: vec![],
114                start_time: time.start,
115                time: time.current,
116                temporality: Temporality::Cumulative,
117                is_monotonic: self.monotonic,
118            })
119        } else {
120            None
121        };
122        let s_data = s_data.unwrap_or_else(|| new_agg.as_mut().expect("present if s_data is none"));
123
124        s_data.start_time = time.start;
125        s_data.time = time.current;
126        s_data.temporality = Temporality::Cumulative;
127        s_data.is_monotonic = self.monotonic;
128
129        self.value_map
130            .collect_readonly(&mut s_data.data_points, |attributes, aggr| SumDataPoint {
131                attributes,
132                value: aggr.value.get_value(),
133                exemplars: vec![],
134            });
135
136        (
137            s_data.data_points.len(),
138            new_agg.map(|a| Box::new(a) as Box<_>),
139        )
140    }
141}
142
143impl<T> Measure<T> for Sum<T>
144where
145    T: Number,
146{
147    fn call(&self, measurement: T, attrs: &[KeyValue]) {
148        self.filter.apply(attrs, |filtered| {
149            self.value_map.measure(measurement, filtered);
150        })
151    }
152}
153
154impl<T> ComputeAggregation for Sum<T>
155where
156    T: Number,
157{
158    fn call(&self, dest: Option<&mut dyn Aggregation>) -> (usize, Option<Box<dyn Aggregation>>) {
159        match self.temporality {
160            Temporality::Delta => self.delta(dest),
161            _ => self.cumulative(dest),
162        }
163    }
164}