opentelemetry_sdk/metrics/internal/
histogram.rs1use std::mem::replace;
2use std::ops::DerefMut;
3use std::sync::Mutex;
4
5use crate::metrics::data::HistogramDataPoint;
6use crate::metrics::data::{self, Aggregation};
7use crate::metrics::Temporality;
8use opentelemetry::KeyValue;
9
10use super::aggregate::AggregateTimeInitiator;
11use super::aggregate::AttributeSetFilter;
12use super::ComputeAggregation;
13use super::Measure;
14use super::ValueMap;
15use super::{Aggregator, Number};
16
17impl<T> Aggregator for Mutex<Buckets<T>>
18where
19 T: Number,
20{
21 type InitConfig = usize;
22 type PreComputedValue = (T, usize);
24
25 fn update(&self, (value, index): (T, usize)) {
26 let mut buckets = self.lock().unwrap_or_else(|err| err.into_inner());
27
28 buckets.total += value;
29 buckets.count += 1;
30 buckets.counts[index] += 1;
31 if value < buckets.min {
32 buckets.min = value;
33 }
34 if value > buckets.max {
35 buckets.max = value
36 }
37 }
38
39 fn create(count: &usize) -> Self {
40 Mutex::new(Buckets::<T>::new(*count))
41 }
42
43 fn clone_and_reset(&self, count: &usize) -> Self {
44 let mut current = self.lock().unwrap_or_else(|err| err.into_inner());
45 Mutex::new(replace(current.deref_mut(), Buckets::new(*count)))
46 }
47}
48
/// Accumulated histogram state for a single attribute set.
#[derive(Default)]
struct Buckets<T> {
    /// Number of measurements recorded in each bucket, indexed by bucket.
    counts: Vec<u64>,
    /// Total number of measurements recorded across all buckets.
    count: u64,
    /// Running sum of every recorded value.
    total: T,
    /// Smallest value recorded so far.
    min: T,
    /// Largest value recorded so far.
    max: T,
}
57
58impl<T: Number> Buckets<T> {
59 fn new(n: usize) -> Buckets<T> {
61 Buckets {
62 counts: vec![0; n],
63 min: T::max(),
64 max: T::min(),
65 ..Default::default()
66 }
67 }
68}
69
70pub(crate) struct Histogram<T: Number> {
73 value_map: ValueMap<Mutex<Buckets<T>>>,
74 init_time: AggregateTimeInitiator,
75 temporality: Temporality,
76 filter: AttributeSetFilter,
77 bounds: Vec<f64>,
78 record_min_max: bool,
79 record_sum: bool,
80}
81
82impl<T: Number> Histogram<T> {
83 #[allow(unused_mut)]
84 pub(crate) fn new(
85 temporality: Temporality,
86 filter: AttributeSetFilter,
87 mut bounds: Vec<f64>,
88 record_min_max: bool,
89 record_sum: bool,
90 ) -> Self {
91 #[cfg(feature = "spec_unstable_metrics_views")]
92 {
93 bounds.retain(|v| !v.is_nan());
95 bounds.sort_by(|a, b| a.partial_cmp(b).expect("NaNs filtered out"));
96 }
97
98 let buckets_count = bounds.len() + 1;
99 Histogram {
100 value_map: ValueMap::new(buckets_count),
101 init_time: AggregateTimeInitiator::default(),
102 temporality,
103 filter,
104 bounds,
105 record_min_max,
106 record_sum,
107 }
108 }
109
110 fn delta(&self, dest: Option<&mut dyn Aggregation>) -> (usize, Option<Box<dyn Aggregation>>) {
111 let time = self.init_time.delta();
112
113 let h = dest.and_then(|d| d.as_mut().downcast_mut::<data::Histogram<T>>());
114 let mut new_agg = if h.is_none() {
115 Some(data::Histogram {
116 data_points: vec![],
117 start_time: time.start,
118 time: time.current,
119 temporality: Temporality::Delta,
120 })
121 } else {
122 None
123 };
124 let h = h.unwrap_or_else(|| new_agg.as_mut().expect("present if h is none"));
125 h.temporality = Temporality::Delta;
126 h.start_time = time.start;
127 h.time = time.current;
128
129 self.value_map
130 .collect_and_reset(&mut h.data_points, |attributes, aggr| {
131 let b = aggr.into_inner().unwrap_or_else(|err| err.into_inner());
132 HistogramDataPoint {
133 attributes,
134 count: b.count,
135 bounds: self.bounds.clone(),
136 bucket_counts: b.counts,
137 sum: if self.record_sum {
138 b.total
139 } else {
140 T::default()
141 },
142 min: if self.record_min_max {
143 Some(b.min)
144 } else {
145 None
146 },
147 max: if self.record_min_max {
148 Some(b.max)
149 } else {
150 None
151 },
152 exemplars: vec![],
153 }
154 });
155
156 (h.data_points.len(), new_agg.map(|a| Box::new(a) as Box<_>))
157 }
158
159 fn cumulative(
160 &self,
161 dest: Option<&mut dyn Aggregation>,
162 ) -> (usize, Option<Box<dyn Aggregation>>) {
163 let time = self.init_time.cumulative();
164 let h = dest.and_then(|d| d.as_mut().downcast_mut::<data::Histogram<T>>());
165 let mut new_agg = if h.is_none() {
166 Some(data::Histogram {
167 data_points: vec![],
168 start_time: time.start,
169 time: time.current,
170 temporality: Temporality::Cumulative,
171 })
172 } else {
173 None
174 };
175 let h = h.unwrap_or_else(|| new_agg.as_mut().expect("present if h is none"));
176 h.temporality = Temporality::Cumulative;
177 h.start_time = time.start;
178 h.time = time.current;
179
180 self.value_map
181 .collect_readonly(&mut h.data_points, |attributes, aggr| {
182 let b = aggr.lock().unwrap_or_else(|err| err.into_inner());
183 HistogramDataPoint {
184 attributes,
185 count: b.count,
186 bounds: self.bounds.clone(),
187 bucket_counts: b.counts.clone(),
188 sum: if self.record_sum {
189 b.total
190 } else {
191 T::default()
192 },
193 min: if self.record_min_max {
194 Some(b.min)
195 } else {
196 None
197 },
198 max: if self.record_min_max {
199 Some(b.max)
200 } else {
201 None
202 },
203 exemplars: vec![],
204 }
205 });
206
207 (h.data_points.len(), new_agg.map(|a| Box::new(a) as Box<_>))
208 }
209}
210
211impl<T> Measure<T> for Histogram<T>
212where
213 T: Number,
214{
215 fn call(&self, measurement: T, attrs: &[KeyValue]) {
216 let f = measurement.into_float();
217 let index = self.bounds.partition_point(|&x| x < f);
223
224 self.filter.apply(attrs, |filtered| {
225 self.value_map.measure((measurement, index), filtered);
226 })
227 }
228}
229
230impl<T> ComputeAggregation for Histogram<T>
231where
232 T: Number,
233{
234 fn call(&self, dest: Option<&mut dyn Aggregation>) -> (usize, Option<Box<dyn Aggregation>>) {
235 match self.temporality {
236 Temporality::Delta => self.delta(dest),
237 _ => self.cumulative(dest),
238 }
239 }
240}
241
#[cfg(test)]
mod tests {
    use super::*;

    /// Values 1 through 10 recorded against bounds `[1, 3, 6]` must land in
    /// upper-inclusive buckets.
    #[test]
    fn check_buckets_are_selected_correctly() {
        let hist = Histogram::<i64>::new(
            Temporality::Cumulative,
            AttributeSetFilter::new(None),
            vec![1.0, 3.0, 6.0],
            false,
            false,
        );
        for value in 1..=10 {
            Measure::call(&hist, value, &[]);
        }

        let (count, aggregation) = ComputeAggregation::call(&hist, None);
        let aggregation = aggregation.unwrap();
        let collected = aggregation
            .as_any()
            .downcast_ref::<data::Histogram<i64>>()
            .unwrap();
        assert_eq!(count, 1);

        let point = &collected.data_points[0];
        assert_eq!(point.count, 10);
        assert_eq!(point.bucket_counts.len(), 4);
        // Expected distribution:
        //   (-inf, 1] -> 1          (1 value)
        //   (1, 3]    -> 2, 3       (2 values)
        //   (3, 6]    -> 4, 5, 6    (3 values)
        //   (6, +inf) -> 7..=10     (4 values)
        assert_eq!(point.bucket_counts, vec![1, 2, 3, 4]);
    }
}