opentelemetry_sdk/metrics/internal/
aggregate.rs

1use std::{
2    marker,
3    mem::replace,
4    ops::DerefMut,
5    sync::{Arc, Mutex},
6    time::SystemTime,
7};
8
9use crate::metrics::{data::Aggregation, Temporality};
10use opentelemetry::time::now;
11use opentelemetry::KeyValue;
12
13use super::{
14    exponential_histogram::ExpoHistogram, histogram::Histogram, last_value::LastValue,
15    precomputed_sum::PrecomputedSum, sum::Sum, Number,
16};
17
18pub(crate) const STREAM_CARDINALITY_LIMIT: usize = 2000;
19
20/// Checks whether aggregator has hit cardinality limit for metric streams
21pub(crate) fn is_under_cardinality_limit(_size: usize) -> bool {
22    true
23
24    // TODO: Implement this feature, after allowing the ability to customize the cardinality limit.
25    // size < STREAM_CARDINALITY_LIMIT
26}
27
28/// Receives measurements to be aggregated.
29pub(crate) trait Measure<T>: Send + Sync + 'static {
30    fn call(&self, measurement: T, attrs: &[KeyValue]);
31}
32
33/// Stores the aggregate of measurements into the aggregation and returns the number
34/// of aggregate data-points output.
35pub(crate) trait ComputeAggregation: Send + Sync + 'static {
36    /// Compute the new aggregation and store in `dest`.
37    ///
38    /// If no initial aggregation exists, `dest` will be `None`, in which case the
39    /// returned option is expected to contain a new aggregation with the data from
40    /// the current collection cycle.
41    fn call(&self, dest: Option<&mut dyn Aggregation>) -> (usize, Option<Box<dyn Aggregation>>);
42}
43
44/// Separate `measure` and `collect` functions for an aggregate.
45pub(crate) struct AggregateFns<T> {
46    pub(crate) measure: Arc<dyn Measure<T>>,
47    pub(crate) collect: Arc<dyn ComputeAggregation>,
48}
49
50/// Creates aggregate functions out of aggregate instance
51impl<A, T> From<A> for AggregateFns<T>
52where
53    A: Measure<T> + ComputeAggregation,
54{
55    fn from(value: A) -> Self {
56        let inst = Arc::new(value);
57        Self {
58            measure: inst.clone(),
59            collect: inst,
60        }
61    }
62}
63
64pub(crate) struct AggregateTime {
65    pub start: SystemTime,
66    pub current: SystemTime,
67}
68
69/// Initialized [`AggregateTime`] for specific [`Temporality`]
70pub(crate) struct AggregateTimeInitiator(Mutex<SystemTime>);
71
72impl AggregateTimeInitiator {
73    pub(crate) fn delta(&self) -> AggregateTime {
74        let current_time = now();
75        let start_time = self
76            .0
77            .lock()
78            .map(|mut start| replace(start.deref_mut(), current_time))
79            .unwrap_or(current_time);
80        AggregateTime {
81            start: start_time,
82            current: current_time,
83        }
84    }
85
86    pub(crate) fn cumulative(&self) -> AggregateTime {
87        let current_time = now();
88        let start_time = self.0.lock().map(|start| *start).unwrap_or(current_time);
89        AggregateTime {
90            start: start_time,
91            current: current_time,
92        }
93    }
94}
95
96impl Default for AggregateTimeInitiator {
97    fn default() -> Self {
98        Self(Mutex::new(now()))
99    }
100}
101
102type Filter = Arc<dyn Fn(&KeyValue) -> bool + Send + Sync>;
103
104/// Applies filter on provided attribute set
105/// No-op, if filter is not set
106#[derive(Clone)]
107pub(crate) struct AttributeSetFilter {
108    filter: Option<Filter>,
109}
110
111impl AttributeSetFilter {
112    pub(crate) fn new(filter: Option<Filter>) -> Self {
113        Self { filter }
114    }
115
116    pub(crate) fn apply(&self, attrs: &[KeyValue], run: impl FnOnce(&[KeyValue])) {
117        if let Some(filter) = &self.filter {
118            let filtered_attrs: Vec<KeyValue> =
119                attrs.iter().filter(|kv| filter(kv)).cloned().collect();
120            run(&filtered_attrs);
121        } else {
122            run(attrs);
123        };
124    }
125}
126
127/// Builds aggregate functions
128pub(crate) struct AggregateBuilder<T> {
129    /// The temporality used for the returned aggregate functions.
130    temporality: Temporality,
131
132    /// The attribute filter the aggregate function will use on the input of
133    /// measurements.
134    filter: AttributeSetFilter,
135
136    _marker: marker::PhantomData<T>,
137}
138
139impl<T: Number> AggregateBuilder<T> {
140    pub(crate) fn new(temporality: Temporality, filter: Option<Filter>) -> Self {
141        AggregateBuilder {
142            temporality,
143            filter: AttributeSetFilter::new(filter),
144            _marker: marker::PhantomData,
145        }
146    }
147
148    /// Builds a last-value aggregate function input and output.
149    pub(crate) fn last_value(&self, overwrite_temporality: Option<Temporality>) -> AggregateFns<T> {
150        LastValue::new(
151            overwrite_temporality.unwrap_or(self.temporality),
152            self.filter.clone(),
153        )
154        .into()
155    }
156
157    /// Builds a precomputed sum aggregate function input and output.
158    pub(crate) fn precomputed_sum(&self, monotonic: bool) -> AggregateFns<T> {
159        PrecomputedSum::new(self.temporality, self.filter.clone(), monotonic).into()
160    }
161
162    /// Builds a sum aggregate function input and output.
163    pub(crate) fn sum(&self, monotonic: bool) -> AggregateFns<T> {
164        Sum::new(self.temporality, self.filter.clone(), monotonic).into()
165    }
166
167    /// Builds a histogram aggregate function input and output.
168    pub(crate) fn explicit_bucket_histogram(
169        &self,
170        boundaries: Vec<f64>,
171        record_min_max: bool,
172        record_sum: bool,
173    ) -> AggregateFns<T> {
174        Histogram::new(
175            self.temporality,
176            self.filter.clone(),
177            boundaries,
178            record_min_max,
179            record_sum,
180        )
181        .into()
182    }
183
184    /// Builds an exponential histogram aggregate function input and output.
185    pub(crate) fn exponential_bucket_histogram(
186        &self,
187        max_size: u32,
188        max_scale: i8,
189        record_min_max: bool,
190        record_sum: bool,
191    ) -> AggregateFns<T> {
192        ExpoHistogram::new(
193            self.temporality,
194            self.filter.clone(),
195            max_size,
196            max_scale,
197            record_min_max,
198            record_sum,
199        )
200        .into()
201    }
202}
203
204#[cfg(test)]
205mod tests {
206    use crate::metrics::data::{
207        ExponentialBucket, ExponentialHistogram, ExponentialHistogramDataPoint, Gauge,
208        GaugeDataPoint, Histogram, HistogramDataPoint, Sum, SumDataPoint,
209    };
210    use std::vec;
211
212    use super::*;
213
214    #[test]
215    fn last_value_aggregation() {
216        let AggregateFns { measure, collect } =
217            AggregateBuilder::<u64>::new(Temporality::Cumulative, None).last_value(None);
218        let mut a = Gauge {
219            data_points: vec![GaugeDataPoint {
220                attributes: vec![KeyValue::new("a", 1)],
221                value: 1u64,
222                exemplars: vec![],
223            }],
224            start_time: Some(now()),
225            time: now(),
226        };
227        let new_attributes = [KeyValue::new("b", 2)];
228        measure.call(2, &new_attributes[..]);
229
230        let (count, new_agg) = collect.call(Some(&mut a));
231
232        assert_eq!(count, 1);
233        assert!(new_agg.is_none());
234        assert_eq!(a.data_points.len(), 1);
235        assert_eq!(a.data_points[0].attributes, new_attributes.to_vec());
236        assert_eq!(a.data_points[0].value, 2);
237    }
238
239    #[test]
240    fn precomputed_sum_aggregation() {
241        for temporality in [Temporality::Delta, Temporality::Cumulative] {
242            let AggregateFns { measure, collect } =
243                AggregateBuilder::<u64>::new(temporality, None).precomputed_sum(true);
244            let mut a = Sum {
245                data_points: vec![
246                    SumDataPoint {
247                        attributes: vec![KeyValue::new("a1", 1)],
248                        value: 1u64,
249                        exemplars: vec![],
250                    },
251                    SumDataPoint {
252                        attributes: vec![KeyValue::new("a2", 1)],
253                        value: 2u64,
254                        exemplars: vec![],
255                    },
256                ],
257                start_time: now(),
258                time: now(),
259                temporality: if temporality == Temporality::Delta {
260                    Temporality::Cumulative
261                } else {
262                    Temporality::Delta
263                },
264                is_monotonic: false,
265            };
266            let new_attributes = [KeyValue::new("b", 2)];
267            measure.call(3, &new_attributes[..]);
268
269            let (count, new_agg) = collect.call(Some(&mut a));
270
271            assert_eq!(count, 1);
272            assert!(new_agg.is_none());
273            assert_eq!(a.temporality, temporality);
274            assert!(a.is_monotonic);
275            assert_eq!(a.data_points.len(), 1);
276            assert_eq!(a.data_points[0].attributes, new_attributes.to_vec());
277            assert_eq!(a.data_points[0].value, 3);
278        }
279    }
280
281    #[test]
282    fn sum_aggregation() {
283        for temporality in [Temporality::Delta, Temporality::Cumulative] {
284            let AggregateFns { measure, collect } =
285                AggregateBuilder::<u64>::new(temporality, None).sum(true);
286            let mut a = Sum {
287                data_points: vec![
288                    SumDataPoint {
289                        attributes: vec![KeyValue::new("a1", 1)],
290                        value: 1u64,
291                        exemplars: vec![],
292                    },
293                    SumDataPoint {
294                        attributes: vec![KeyValue::new("a2", 1)],
295                        value: 2u64,
296                        exemplars: vec![],
297                    },
298                ],
299                start_time: now(),
300                time: now(),
301                temporality: if temporality == Temporality::Delta {
302                    Temporality::Cumulative
303                } else {
304                    Temporality::Delta
305                },
306                is_monotonic: false,
307            };
308            let new_attributes = [KeyValue::new("b", 2)];
309            measure.call(3, &new_attributes[..]);
310
311            let (count, new_agg) = collect.call(Some(&mut a));
312
313            assert_eq!(count, 1);
314            assert!(new_agg.is_none());
315            assert_eq!(a.temporality, temporality);
316            assert!(a.is_monotonic);
317            assert_eq!(a.data_points.len(), 1);
318            assert_eq!(a.data_points[0].attributes, new_attributes.to_vec());
319            assert_eq!(a.data_points[0].value, 3);
320        }
321    }
322
323    #[test]
324    fn explicit_bucket_histogram_aggregation() {
325        for temporality in [Temporality::Delta, Temporality::Cumulative] {
326            let AggregateFns { measure, collect } = AggregateBuilder::<u64>::new(temporality, None)
327                .explicit_bucket_histogram(vec![1.0], true, true);
328            let mut a = Histogram {
329                data_points: vec![HistogramDataPoint {
330                    attributes: vec![KeyValue::new("a1", 1)],
331                    count: 2,
332                    bounds: vec![1.0, 2.0],
333                    bucket_counts: vec![0, 1, 1],
334                    min: None,
335                    max: None,
336                    sum: 3u64,
337                    exemplars: vec![],
338                }],
339                start_time: now(),
340                time: now(),
341                temporality: if temporality == Temporality::Delta {
342                    Temporality::Cumulative
343                } else {
344                    Temporality::Delta
345                },
346            };
347            let new_attributes = [KeyValue::new("b", 2)];
348            measure.call(3, &new_attributes[..]);
349
350            let (count, new_agg) = collect.call(Some(&mut a));
351
352            assert_eq!(count, 1);
353            assert!(new_agg.is_none());
354            assert_eq!(a.temporality, temporality);
355            assert_eq!(a.data_points.len(), 1);
356            assert_eq!(a.data_points[0].attributes, new_attributes.to_vec());
357            assert_eq!(a.data_points[0].count, 1);
358            assert_eq!(a.data_points[0].bounds, vec![1.0]);
359            assert_eq!(a.data_points[0].bucket_counts, vec![0, 1]);
360            assert_eq!(a.data_points[0].min, Some(3));
361            assert_eq!(a.data_points[0].max, Some(3));
362            assert_eq!(a.data_points[0].sum, 3);
363        }
364    }
365
366    #[test]
367    fn exponential_histogram_aggregation() {
368        for temporality in [Temporality::Delta, Temporality::Cumulative] {
369            let AggregateFns { measure, collect } = AggregateBuilder::<u64>::new(temporality, None)
370                .exponential_bucket_histogram(4, 20, true, true);
371            let mut a = ExponentialHistogram {
372                data_points: vec![ExponentialHistogramDataPoint {
373                    attributes: vec![KeyValue::new("a1", 1)],
374                    count: 2,
375                    min: None,
376                    max: None,
377                    sum: 3u64,
378                    scale: 10,
379                    zero_count: 1,
380                    positive_bucket: ExponentialBucket {
381                        offset: 1,
382                        counts: vec![1],
383                    },
384                    negative_bucket: ExponentialBucket {
385                        offset: 1,
386                        counts: vec![1],
387                    },
388                    zero_threshold: 1.0,
389                    exemplars: vec![],
390                }],
391                start_time: now(),
392                time: now(),
393                temporality: if temporality == Temporality::Delta {
394                    Temporality::Cumulative
395                } else {
396                    Temporality::Delta
397                },
398            };
399            let new_attributes = [KeyValue::new("b", 2)];
400            measure.call(3, &new_attributes[..]);
401
402            let (count, new_agg) = collect.call(Some(&mut a));
403
404            assert_eq!(count, 1);
405            assert!(new_agg.is_none());
406            assert_eq!(a.temporality, temporality);
407            assert_eq!(a.data_points.len(), 1);
408            assert_eq!(a.data_points[0].attributes, new_attributes.to_vec());
409            assert_eq!(a.data_points[0].count, 1);
410            assert_eq!(a.data_points[0].min, Some(3));
411            assert_eq!(a.data_points[0].max, Some(3));
412            assert_eq!(a.data_points[0].sum, 3);
413            assert_eq!(a.data_points[0].zero_count, 0);
414            assert_eq!(a.data_points[0].zero_threshold, 0.0);
415            assert_eq!(a.data_points[0].positive_bucket.offset, 1661953);
416            assert_eq!(a.data_points[0].positive_bucket.counts, vec![1]);
417            assert_eq!(a.data_points[0].negative_bucket.offset, 0);
418            assert!(a.data_points[0].negative_bucket.counts.is_empty());
419        }
420    }
421}