opentelemetry_sdk/metrics/internal/
last_value.rs

1use crate::metrics::{
2    data::{self, Aggregation, Gauge, GaugeDataPoint},
3    Temporality,
4};
5use opentelemetry::KeyValue;
6
7use super::{
8    aggregate::{AggregateTimeInitiator, AttributeSetFilter},
9    Aggregator, AtomicTracker, AtomicallyUpdate, ComputeAggregation, Measure, Number, ValueMap,
10};
11
12/// this is reused by PrecomputedSum
13pub(crate) struct Assign<T>
14where
15    T: AtomicallyUpdate<T>,
16{
17    pub(crate) value: T::AtomicTracker,
18}
19
20impl<T> Aggregator for Assign<T>
21where
22    T: Number,
23{
24    type InitConfig = ();
25    type PreComputedValue = T;
26
27    fn create(_init: &()) -> Self {
28        Self {
29            value: T::new_atomic_tracker(T::default()),
30        }
31    }
32
33    fn update(&self, value: T) {
34        self.value.store(value)
35    }
36
37    fn clone_and_reset(&self, _: &()) -> Self {
38        Self {
39            value: T::new_atomic_tracker(self.value.get_and_reset_value()),
40        }
41    }
42}
43
44/// Summarizes a set of measurements as the last one made.
45pub(crate) struct LastValue<T: Number> {
46    value_map: ValueMap<Assign<T>>,
47    init_time: AggregateTimeInitiator,
48    temporality: Temporality,
49    filter: AttributeSetFilter,
50}
51
52impl<T: Number> LastValue<T> {
53    pub(crate) fn new(temporality: Temporality, filter: AttributeSetFilter) -> Self {
54        LastValue {
55            value_map: ValueMap::new(()),
56            init_time: AggregateTimeInitiator::default(),
57            temporality,
58            filter,
59        }
60    }
61
62    pub(crate) fn delta(
63        &self,
64        dest: Option<&mut dyn Aggregation>,
65    ) -> (usize, Option<Box<dyn Aggregation>>) {
66        let time = self.init_time.delta();
67
68        let s_data = dest.and_then(|d| d.as_mut().downcast_mut::<Gauge<T>>());
69        let mut new_agg = if s_data.is_none() {
70            Some(data::Gauge {
71                data_points: vec![],
72                start_time: Some(time.start),
73                time: time.current,
74            })
75        } else {
76            None
77        };
78        let s_data = s_data.unwrap_or_else(|| new_agg.as_mut().expect("present if s_data is none"));
79        s_data.start_time = Some(time.start);
80        s_data.time = time.current;
81
82        self.value_map
83            .collect_and_reset(&mut s_data.data_points, |attributes, aggr| GaugeDataPoint {
84                attributes,
85                value: aggr.value.get_value(),
86                exemplars: vec![],
87            });
88
89        (
90            s_data.data_points.len(),
91            new_agg.map(|a| Box::new(a) as Box<_>),
92        )
93    }
94
95    pub(crate) fn cumulative(
96        &self,
97        dest: Option<&mut dyn Aggregation>,
98    ) -> (usize, Option<Box<dyn Aggregation>>) {
99        let time = self.init_time.cumulative();
100        let s_data = dest.and_then(|d| d.as_mut().downcast_mut::<Gauge<T>>());
101        let mut new_agg = if s_data.is_none() {
102            Some(data::Gauge {
103                data_points: vec![],
104                start_time: Some(time.start),
105                time: time.current,
106            })
107        } else {
108            None
109        };
110        let s_data = s_data.unwrap_or_else(|| new_agg.as_mut().expect("present if s_data is none"));
111
112        s_data.start_time = Some(time.start);
113        s_data.time = time.current;
114
115        self.value_map
116            .collect_readonly(&mut s_data.data_points, |attributes, aggr| GaugeDataPoint {
117                attributes,
118                value: aggr.value.get_value(),
119                exemplars: vec![],
120            });
121
122        (
123            s_data.data_points.len(),
124            new_agg.map(|a| Box::new(a) as Box<_>),
125        )
126    }
127}
128
129impl<T> Measure<T> for LastValue<T>
130where
131    T: Number,
132{
133    fn call(&self, measurement: T, attrs: &[KeyValue]) {
134        self.filter.apply(attrs, |filtered| {
135            self.value_map.measure(measurement, filtered);
136        })
137    }
138}
139
140impl<T> ComputeAggregation for LastValue<T>
141where
142    T: Number,
143{
144    fn call(&self, dest: Option<&mut dyn Aggregation>) -> (usize, Option<Box<dyn Aggregation>>) {
145        match self.temporality {
146            Temporality::Delta => self.delta(dest),
147            _ => self.cumulative(dest),
148        }
149    }
150}