opentelemetry_sdk/metrics/internal/
last_value.rs1use crate::metrics::{
2 data::{self, Aggregation, Gauge, GaugeDataPoint},
3 Temporality,
4};
5use opentelemetry::KeyValue;
6
7use super::{
8 aggregate::{AggregateTimeInitiator, AttributeSetFilter},
9 Aggregator, AtomicTracker, AtomicallyUpdate, ComputeAggregation, Measure, Number, ValueMap,
10};
11
12pub(crate) struct Assign<T>
14where
15 T: AtomicallyUpdate<T>,
16{
17 pub(crate) value: T::AtomicTracker,
18}
19
20impl<T> Aggregator for Assign<T>
21where
22 T: Number,
23{
24 type InitConfig = ();
25 type PreComputedValue = T;
26
27 fn create(_init: &()) -> Self {
28 Self {
29 value: T::new_atomic_tracker(T::default()),
30 }
31 }
32
33 fn update(&self, value: T) {
34 self.value.store(value)
35 }
36
37 fn clone_and_reset(&self, _: &()) -> Self {
38 Self {
39 value: T::new_atomic_tracker(self.value.get_and_reset_value()),
40 }
41 }
42}
43
44pub(crate) struct LastValue<T: Number> {
46 value_map: ValueMap<Assign<T>>,
47 init_time: AggregateTimeInitiator,
48 temporality: Temporality,
49 filter: AttributeSetFilter,
50}
51
52impl<T: Number> LastValue<T> {
53 pub(crate) fn new(temporality: Temporality, filter: AttributeSetFilter) -> Self {
54 LastValue {
55 value_map: ValueMap::new(()),
56 init_time: AggregateTimeInitiator::default(),
57 temporality,
58 filter,
59 }
60 }
61
62 pub(crate) fn delta(
63 &self,
64 dest: Option<&mut dyn Aggregation>,
65 ) -> (usize, Option<Box<dyn Aggregation>>) {
66 let time = self.init_time.delta();
67
68 let s_data = dest.and_then(|d| d.as_mut().downcast_mut::<Gauge<T>>());
69 let mut new_agg = if s_data.is_none() {
70 Some(data::Gauge {
71 data_points: vec![],
72 start_time: Some(time.start),
73 time: time.current,
74 })
75 } else {
76 None
77 };
78 let s_data = s_data.unwrap_or_else(|| new_agg.as_mut().expect("present if s_data is none"));
79 s_data.start_time = Some(time.start);
80 s_data.time = time.current;
81
82 self.value_map
83 .collect_and_reset(&mut s_data.data_points, |attributes, aggr| GaugeDataPoint {
84 attributes,
85 value: aggr.value.get_value(),
86 exemplars: vec![],
87 });
88
89 (
90 s_data.data_points.len(),
91 new_agg.map(|a| Box::new(a) as Box<_>),
92 )
93 }
94
95 pub(crate) fn cumulative(
96 &self,
97 dest: Option<&mut dyn Aggregation>,
98 ) -> (usize, Option<Box<dyn Aggregation>>) {
99 let time = self.init_time.cumulative();
100 let s_data = dest.and_then(|d| d.as_mut().downcast_mut::<Gauge<T>>());
101 let mut new_agg = if s_data.is_none() {
102 Some(data::Gauge {
103 data_points: vec![],
104 start_time: Some(time.start),
105 time: time.current,
106 })
107 } else {
108 None
109 };
110 let s_data = s_data.unwrap_or_else(|| new_agg.as_mut().expect("present if s_data is none"));
111
112 s_data.start_time = Some(time.start);
113 s_data.time = time.current;
114
115 self.value_map
116 .collect_readonly(&mut s_data.data_points, |attributes, aggr| GaugeDataPoint {
117 attributes,
118 value: aggr.value.get_value(),
119 exemplars: vec![],
120 });
121
122 (
123 s_data.data_points.len(),
124 new_agg.map(|a| Box::new(a) as Box<_>),
125 )
126 }
127}
128
129impl<T> Measure<T> for LastValue<T>
130where
131 T: Number,
132{
133 fn call(&self, measurement: T, attrs: &[KeyValue]) {
134 self.filter.apply(attrs, |filtered| {
135 self.value_map.measure(measurement, filtered);
136 })
137 }
138}
139
140impl<T> ComputeAggregation for LastValue<T>
141where
142 T: Number,
143{
144 fn call(&self, dest: Option<&mut dyn Aggregation>) -> (usize, Option<Box<dyn Aggregation>>) {
145 match self.temporality {
146 Temporality::Delta => self.delta(dest),
147 _ => self.cumulative(dest),
148 }
149 }
150}