opentelemetry_sdk/metrics/internal/histogram.rs

use std::mem::replace;
use std::ops::DerefMut;
use std::sync::Mutex;

use crate::metrics::data::HistogramDataPoint;
use crate::metrics::data::{self, Aggregation};
use crate::metrics::Temporality;
use opentelemetry::KeyValue;

use super::aggregate::AggregateTimeInitiator;
use super::aggregate::AttributeSetFilter;
use super::ComputeAggregation;
use super::Measure;
use super::ValueMap;
use super::{Aggregator, Number};

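// Each attribute set tracked by the `ValueMap` owns one `Mutex<Buckets<T>>`;
// implementing `Aggregator` here lets the map create, update, and snapshot
// that per-attribute-set bucket state.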
impl<T> Aggregator for Mutex<Buckets<T>>
where
    T: Number,
{
    type InitConfig = usize;
    /// Value and bucket index
    type PreComputedValue = (T, usize);

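    // The bucket `index` has already been computed against the configured
    // bounds by `Measure::call`, so `update` only has to record the value.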
    fn update(&self, (value, index): (T, usize)) {
        let mut buckets = self.lock().unwrap_or_else(|err| err.into_inner());

        buckets.total += value;
        buckets.count += 1;
        buckets.counts[index] += 1;
        if value < buckets.min {
            buckets.min = value;
        }
        if value > buckets.max {
            buckets.max = value;
        }
    }

    fn create(count: &usize) -> Self {
        Mutex::new(Buckets::<T>::new(*count))
    }

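    // Used for delta collection: swap the live buckets with a fresh, empty
    // set and hand the previous contents back as the snapshot to export.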
    fn clone_and_reset(&self, count: &usize) -> Self {
        let mut current = self.lock().unwrap_or_else(|err| err.into_inner());
        Mutex::new(replace(current.deref_mut(), Buckets::new(*count)))
    }
}

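/// Running state for a single attribute set: per-bucket counts plus the
/// overall count, sum, minimum, and maximum of recorded values.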
#[derive(Default)]
struct Buckets<T> {
    counts: Vec<u64>,
    count: u64,
    total: T,
    min: T,
    max: T,
}

impl<T: Number> Buckets<T> {
    /// Returns buckets with `n` bins.
    fn new(n: usize) -> Buckets<T> {
        Buckets {
            counts: vec![0; n],
            min: T::max(),
            max: T::min(),
            ..Default::default()
        }
    }
}

/// Summarizes a set of measurements as a histogram with explicitly defined
/// buckets.
pub(crate) struct Histogram<T: Number> {
    value_map: ValueMap<Mutex<Buckets<T>>>,
    init_time: AggregateTimeInitiator,
    temporality: Temporality,
    filter: AttributeSetFilter,
    bounds: Vec<f64>,
    record_min_max: bool,
    record_sum: bool,
}

impl<T: Number> Histogram<T> {
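    // `bounds` is only mutated when the `spec_unstable_metrics_views` feature
    // is enabled, hence the `allow(unused_mut)` on this constructor.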
    #[allow(unused_mut)]
    pub(crate) fn new(
        temporality: Temporality,
        filter: AttributeSetFilter,
        mut bounds: Vec<f64>,
        record_min_max: bool,
        record_sum: bool,
    ) -> Self {
        #[cfg(feature = "spec_unstable_metrics_views")]
        {
            // TODO: When views are used, validate this upfront
            bounds.retain(|v| !v.is_nan());
            bounds.sort_by(|a, b| a.partial_cmp(b).expect("NaNs filtered out"));
        }

        let buckets_count = bounds.len() + 1;
        Histogram {
            value_map: ValueMap::new(buckets_count),
            init_time: AggregateTimeInitiator::default(),
            temporality,
            filter,
            bounds,
            record_min_max,
            record_sum,
        }
    }

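    // Collects and resets the accumulated state, producing data points that
    // cover only the interval since the previous collection (delta
    // temporality).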
    fn delta(&self, dest: Option<&mut dyn Aggregation>) -> (usize, Option<Box<dyn Aggregation>>) {
        let time = self.init_time.delta();

        let h = dest.and_then(|d| d.as_mut().downcast_mut::<data::Histogram<T>>());
        let mut new_agg = if h.is_none() {
            Some(data::Histogram {
                data_points: vec![],
                start_time: time.start,
                time: time.current,
                temporality: Temporality::Delta,
            })
        } else {
            None
        };
        let h = h.unwrap_or_else(|| new_agg.as_mut().expect("present if h is none"));
        h.temporality = Temporality::Delta;
        h.start_time = time.start;
        h.time = time.current;

        self.value_map
            .collect_and_reset(&mut h.data_points, |attributes, aggr| {
                let b = aggr.into_inner().unwrap_or_else(|err| err.into_inner());
                HistogramDataPoint {
                    attributes,
                    count: b.count,
                    bounds: self.bounds.clone(),
                    bucket_counts: b.counts,
                    sum: if self.record_sum {
                        b.total
                    } else {
                        T::default()
                    },
                    min: if self.record_min_max {
                        Some(b.min)
                    } else {
                        None
                    },
                    max: if self.record_min_max {
                        Some(b.max)
                    } else {
                        None
                    },
                    exemplars: vec![],
                }
            });

        (h.data_points.len(), new_agg.map(|a| Box::new(a) as Box<_>))
    }

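    // Collects without resetting: data points always cover the interval from
    // the first observation to now (cumulative temporality), so the bucket
    // state is read in place and cloned.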
    fn cumulative(
        &self,
        dest: Option<&mut dyn Aggregation>,
    ) -> (usize, Option<Box<dyn Aggregation>>) {
        let time = self.init_time.cumulative();
        let h = dest.and_then(|d| d.as_mut().downcast_mut::<data::Histogram<T>>());
        let mut new_agg = if h.is_none() {
            Some(data::Histogram {
                data_points: vec![],
                start_time: time.start,
                time: time.current,
                temporality: Temporality::Cumulative,
            })
        } else {
            None
        };
        let h = h.unwrap_or_else(|| new_agg.as_mut().expect("present if h is none"));
        h.temporality = Temporality::Cumulative;
        h.start_time = time.start;
        h.time = time.current;

        self.value_map
            .collect_readonly(&mut h.data_points, |attributes, aggr| {
                let b = aggr.lock().unwrap_or_else(|err| err.into_inner());
                HistogramDataPoint {
                    attributes,
                    count: b.count,
                    bounds: self.bounds.clone(),
                    bucket_counts: b.counts.clone(),
                    sum: if self.record_sum {
                        b.total
                    } else {
                        T::default()
                    },
                    min: if self.record_min_max {
                        Some(b.min)
                    } else {
                        None
                    },
                    max: if self.record_min_max {
                        Some(b.max)
                    } else {
                        None
                    },
                    exemplars: vec![],
                }
            });

        (h.data_points.len(), new_agg.map(|a| Box::new(a) as Box<_>))
    }
}

impl<T> Measure<T> for Histogram<T>
where
    T: Number,
{
    fn call(&self, measurement: T, attrs: &[KeyValue]) {
        let f = measurement.into_float();
        // This search will return an index in the range `[0, bounds.len()]`, where
        // it will return `bounds.len()` if value is greater than the last element
        // of `bounds`. This aligns with the buckets in that the length of buckets
        // is `bounds.len()+1`, with the last bucket representing:
        // `(bounds[bounds.len()-1], +∞)`.
        let index = self.bounds.partition_point(|&x| x < f);

        self.filter.apply(attrs, |filtered| {
            self.value_map.measure((measurement, index), filtered);
        })
    }
}

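// Dispatches collection according to the configured temporality; any
// temporality other than Delta is collected as Cumulative.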
impl<T> ComputeAggregation for Histogram<T>
where
    T: Number,
{
    fn call(&self, dest: Option<&mut dyn Aggregation>) -> (usize, Option<Box<dyn Aggregation>>) {
        match self.temporality {
            Temporality::Delta => self.delta(dest),
            _ => self.cumulative(dest),
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn check_buckets_are_selected_correctly() {
        let hist = Histogram::<i64>::new(
            Temporality::Cumulative,
            AttributeSetFilter::new(None),
            vec![1.0, 3.0, 6.0],
            false,
            false,
        );
        for v in 1..11 {
            Measure::call(&hist, v, &[]);
        }
        let (count, dp) = ComputeAggregation::call(&hist, None);
        let dp = dp.unwrap();
        let dp = dp.as_any().downcast_ref::<data::Histogram<i64>>().unwrap();
        assert_eq!(count, 1);
        assert_eq!(dp.data_points[0].count, 10);
        assert_eq!(dp.data_points[0].bucket_counts.len(), 4);
        assert_eq!(dp.data_points[0].bucket_counts[0], 1); // 1
        assert_eq!(dp.data_points[0].bucket_counts[1], 2); // 2, 3
        assert_eq!(dp.data_points[0].bucket_counts[2], 3); // 4, 5, 6
        assert_eq!(dp.data_points[0].bucket_counts[3], 4); // 7, 8, 9, 10
    }
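
    // Illustrative sketch, not part of the original suite: values equal to a
    // boundary land in the bucket whose upper edge is that boundary, because
    // `partition_point(|&x| x < f)` stops at the first bound that is not less
    // than the measurement. It mirrors the test above and only relies on APIs
    // already exercised there.
    #[test]
    fn boundary_values_fall_into_lower_bucket() {
        let hist = Histogram::<i64>::new(
            Temporality::Cumulative,
            AttributeSetFilter::new(None),
            vec![5.0],
            false,
            false,
        );
        Measure::call(&hist, 5, &[]); // equal to the bound -> bucket 0
        Measure::call(&hist, 6, &[]); // above the bound -> bucket 1
        let (_, dp) = ComputeAggregation::call(&hist, None);
        let dp = dp.unwrap();
        let dp = dp.as_any().downcast_ref::<data::Histogram<i64>>().unwrap();
        assert_eq!(dp.data_points[0].count, 2);
        assert_eq!(dp.data_points[0].bucket_counts.len(), 2);
        assert_eq!(dp.data_points[0].bucket_counts[0], 1); // 5 (== bound)
        assert_eq!(dp.data_points[0].bucket_counts[1], 1); // 6
    }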
}