1use std::{
2 marker,
3 mem::replace,
4 ops::DerefMut,
5 sync::{Arc, Mutex},
6 time::SystemTime,
7};
8
9use crate::metrics::{data::Aggregation, Temporality};
10use opentelemetry::time::now;
11use opentelemetry::KeyValue;
12
13use super::{
14 exponential_histogram::ExpoHistogram, histogram::Histogram, last_value::LastValue,
15 precomputed_sum::PrecomputedSum, sum::Sum, Number,
16};
17
/// Maximum number of attribute sets (streams) a single instrument may
/// produce before additional sets are considered over the limit.
pub(crate) const STREAM_CARDINALITY_LIMIT: usize = 2000;

/// Returns `true` while `size` is within [`STREAM_CARDINALITY_LIMIT`].
///
/// Previously this ignored its argument and always returned `true`, which
/// left the declared limit unenforced (and the constant unused); it now
/// compares against the limit so callers can react to streams that would
/// exceed it.
pub(crate) fn is_under_cardinality_limit(size: usize) -> bool {
    size <= STREAM_CARDINALITY_LIMIT
}
27
/// Receives measurements to be aggregated.
pub(crate) trait Measure<T>: Send + Sync + 'static {
    /// Records a single `measurement` together with its attribute set.
    fn call(&self, measurement: T, attrs: &[KeyValue]);
}
32
/// Produces the currently aggregated data for export.
pub(crate) trait ComputeAggregation: Send + Sync + 'static {
    /// Writes the aggregated data into `dest` when one is supplied, returning
    /// the number of data points produced. The second tuple element may carry
    /// a freshly boxed [`Aggregation`] — presumably when `dest` is absent or
    /// of a mismatched concrete type (TODO: confirm against implementors).
    fn call(&self, dest: Option<&mut dyn Aggregation>) -> (usize, Option<Box<dyn Aggregation>>);
}
43
/// The pair of functions that together implement one aggregation: one half
/// records measurements, the other collects the aggregated data.
pub(crate) struct AggregateFns<T> {
    /// Records measurements as they arrive.
    pub(crate) measure: Arc<dyn Measure<T>>,
    /// Collects the aggregated data on demand.
    pub(crate) collect: Arc<dyn ComputeAggregation>,
}
49
50impl<A, T> From<A> for AggregateFns<T>
52where
53 A: Measure<T> + ComputeAggregation,
54{
55 fn from(value: A) -> Self {
56 let inst = Arc::new(value);
57 Self {
58 measure: inst.clone(),
59 collect: inst,
60 }
61 }
62}
63
/// The start and current times of one collection window.
pub(crate) struct AggregateTime {
    /// Beginning of the window covered by the aggregation.
    pub start: SystemTime,
    /// Time at which the collection was taken.
    pub current: SystemTime,
}
68
/// Tracks the start timestamp used to build [`AggregateTime`] windows; the
/// stored time is replaced on each delta collection and left untouched by
/// cumulative collections.
pub(crate) struct AggregateTimeInitiator(Mutex<SystemTime>);
71
72impl AggregateTimeInitiator {
73 pub(crate) fn delta(&self) -> AggregateTime {
74 let current_time = now();
75 let start_time = self
76 .0
77 .lock()
78 .map(|mut start| replace(start.deref_mut(), current_time))
79 .unwrap_or(current_time);
80 AggregateTime {
81 start: start_time,
82 current: current_time,
83 }
84 }
85
86 pub(crate) fn cumulative(&self) -> AggregateTime {
87 let current_time = now();
88 let start_time = self.0.lock().map(|start| *start).unwrap_or(current_time);
89 AggregateTime {
90 start: start_time,
91 current: current_time,
92 }
93 }
94}
95
impl Default for AggregateTimeInitiator {
    /// Starts the time window at the moment the initiator is created.
    fn default() -> Self {
        Self(Mutex::new(now()))
    }
}
101
/// Predicate deciding whether a given attribute is kept during aggregation.
type Filter = Arc<dyn Fn(&KeyValue) -> bool + Send + Sync>;
103
/// Applies an optional [`Filter`] to an attribute set before it is handed to
/// an aggregator.
#[derive(Clone)]
pub(crate) struct AttributeSetFilter {
    // When `None`, attribute sets pass through unchanged and unallocated.
    filter: Option<Filter>,
}
110
111impl AttributeSetFilter {
112 pub(crate) fn new(filter: Option<Filter>) -> Self {
113 Self { filter }
114 }
115
116 pub(crate) fn apply(&self, attrs: &[KeyValue], run: impl FnOnce(&[KeyValue])) {
117 if let Some(filter) = &self.filter {
118 let filtered_attrs: Vec<KeyValue> =
119 attrs.iter().filter(|kv| filter(kv)).cloned().collect();
120 run(&filtered_attrs);
121 } else {
122 run(attrs);
123 };
124 }
125}
126
/// Builds the [`AggregateFns`] for every supported aggregation kind, sharing
/// the configured temporality and attribute filter between them.
pub(crate) struct AggregateBuilder<T> {
    // Temporality passed to each aggregator constructor.
    temporality: Temporality,

    // Attribute filter cloned into each produced aggregator.
    filter: AttributeSetFilter,

    // Ties the measurement type `T` to the builder without storing a value.
    _marker: marker::PhantomData<T>,
}
138
139impl<T: Number> AggregateBuilder<T> {
140 pub(crate) fn new(temporality: Temporality, filter: Option<Filter>) -> Self {
141 AggregateBuilder {
142 temporality,
143 filter: AttributeSetFilter::new(filter),
144 _marker: marker::PhantomData,
145 }
146 }
147
148 pub(crate) fn last_value(&self, overwrite_temporality: Option<Temporality>) -> AggregateFns<T> {
150 LastValue::new(
151 overwrite_temporality.unwrap_or(self.temporality),
152 self.filter.clone(),
153 )
154 .into()
155 }
156
157 pub(crate) fn precomputed_sum(&self, monotonic: bool) -> AggregateFns<T> {
159 PrecomputedSum::new(self.temporality, self.filter.clone(), monotonic).into()
160 }
161
162 pub(crate) fn sum(&self, monotonic: bool) -> AggregateFns<T> {
164 Sum::new(self.temporality, self.filter.clone(), monotonic).into()
165 }
166
167 pub(crate) fn explicit_bucket_histogram(
169 &self,
170 boundaries: Vec<f64>,
171 record_min_max: bool,
172 record_sum: bool,
173 ) -> AggregateFns<T> {
174 Histogram::new(
175 self.temporality,
176 self.filter.clone(),
177 boundaries,
178 record_min_max,
179 record_sum,
180 )
181 .into()
182 }
183
184 pub(crate) fn exponential_bucket_histogram(
186 &self,
187 max_size: u32,
188 max_scale: i8,
189 record_min_max: bool,
190 record_sum: bool,
191 ) -> AggregateFns<T> {
192 ExpoHistogram::new(
193 self.temporality,
194 self.filter.clone(),
195 max_size,
196 max_scale,
197 record_min_max,
198 record_sum,
199 )
200 .into()
201 }
202}
203
/// Unit tests exercising each aggregation produced by [`AggregateBuilder`].
///
/// The common pattern: seed the destination aggregation with stale data
/// points (and, where applicable, the *opposite* temporality), record one
/// measurement, collect, and assert the destination was fully overwritten.
#[cfg(test)]
mod tests {
    use crate::metrics::data::{
        ExponentialBucket, ExponentialHistogram, ExponentialHistogramDataPoint, Gauge,
        GaugeDataPoint, Histogram, HistogramDataPoint, Sum, SumDataPoint,
    };
    use std::vec;

    use super::*;

    #[test]
    fn last_value_aggregation() {
        let AggregateFns { measure, collect } =
            AggregateBuilder::<u64>::new(Temporality::Cumulative, None).last_value(None);
        // Stale data point that collect should replace in place.
        let mut a = Gauge {
            data_points: vec![GaugeDataPoint {
                attributes: vec![KeyValue::new("a", 1)],
                value: 1u64,
                exemplars: vec![],
            }],
            start_time: Some(now()),
            time: now(),
        };
        let new_attributes = [KeyValue::new("b", 2)];
        measure.call(2, &new_attributes[..]);

        let (count, new_agg) = collect.call(Some(&mut a));

        // `dest` matched the aggregation type, so no new boxed aggregation is
        // returned and only the freshly measured point remains.
        assert_eq!(count, 1);
        assert!(new_agg.is_none());
        assert_eq!(a.data_points.len(), 1);
        assert_eq!(a.data_points[0].attributes, new_attributes.to_vec());
        assert_eq!(a.data_points[0].value, 2);
    }

    #[test]
    fn precomputed_sum_aggregation() {
        for temporality in [Temporality::Delta, Temporality::Cumulative] {
            let AggregateFns { measure, collect } =
                AggregateBuilder::<u64>::new(temporality, None).precomputed_sum(true);
            // Two stale data points; both should be dropped on collect.
            let mut a = Sum {
                data_points: vec![
                    SumDataPoint {
                        attributes: vec![KeyValue::new("a1", 1)],
                        value: 1u64,
                        exemplars: vec![],
                    },
                    SumDataPoint {
                        attributes: vec![KeyValue::new("a2", 1)],
                        value: 2u64,
                        exemplars: vec![],
                    },
                ],
                start_time: now(),
                time: now(),
                // Deliberately the opposite temporality, to verify collect
                // overwrites it with the configured one.
                temporality: if temporality == Temporality::Delta {
                    Temporality::Cumulative
                } else {
                    Temporality::Delta
                },
                is_monotonic: false,
            };
            let new_attributes = [KeyValue::new("b", 2)];
            measure.call(3, &new_attributes[..]);

            let (count, new_agg) = collect.call(Some(&mut a));

            assert_eq!(count, 1);
            assert!(new_agg.is_none());
            assert_eq!(a.temporality, temporality);
            assert!(a.is_monotonic);
            assert_eq!(a.data_points.len(), 1);
            assert_eq!(a.data_points[0].attributes, new_attributes.to_vec());
            assert_eq!(a.data_points[0].value, 3);
        }
    }

    #[test]
    fn sum_aggregation() {
        for temporality in [Temporality::Delta, Temporality::Cumulative] {
            let AggregateFns { measure, collect } =
                AggregateBuilder::<u64>::new(temporality, None).sum(true);
            // Two stale data points; both should be dropped on collect.
            let mut a = Sum {
                data_points: vec![
                    SumDataPoint {
                        attributes: vec![KeyValue::new("a1", 1)],
                        value: 1u64,
                        exemplars: vec![],
                    },
                    SumDataPoint {
                        attributes: vec![KeyValue::new("a2", 1)],
                        value: 2u64,
                        exemplars: vec![],
                    },
                ],
                start_time: now(),
                time: now(),
                // Deliberately the opposite temporality, to verify collect
                // overwrites it with the configured one.
                temporality: if temporality == Temporality::Delta {
                    Temporality::Cumulative
                } else {
                    Temporality::Delta
                },
                is_monotonic: false,
            };
            let new_attributes = [KeyValue::new("b", 2)];
            measure.call(3, &new_attributes[..]);

            let (count, new_agg) = collect.call(Some(&mut a));

            assert_eq!(count, 1);
            assert!(new_agg.is_none());
            assert_eq!(a.temporality, temporality);
            assert!(a.is_monotonic);
            assert_eq!(a.data_points.len(), 1);
            assert_eq!(a.data_points[0].attributes, new_attributes.to_vec());
            assert_eq!(a.data_points[0].value, 3);
        }
    }

    #[test]
    fn explicit_bucket_histogram_aggregation() {
        for temporality in [Temporality::Delta, Temporality::Cumulative] {
            let AggregateFns { measure, collect } = AggregateBuilder::<u64>::new(temporality, None)
                .explicit_bucket_histogram(vec![1.0], true, true);
            // Stale data point with different bounds; collect should replace
            // it with one shaped by the configured single boundary (1.0).
            let mut a = Histogram {
                data_points: vec![HistogramDataPoint {
                    attributes: vec![KeyValue::new("a1", 1)],
                    count: 2,
                    bounds: vec![1.0, 2.0],
                    bucket_counts: vec![0, 1, 1],
                    min: None,
                    max: None,
                    sum: 3u64,
                    exemplars: vec![],
                }],
                start_time: now(),
                time: now(),
                // Deliberately the opposite temporality, to verify collect
                // overwrites it with the configured one.
                temporality: if temporality == Temporality::Delta {
                    Temporality::Cumulative
                } else {
                    Temporality::Delta
                },
            };
            let new_attributes = [KeyValue::new("b", 2)];
            measure.call(3, &new_attributes[..]);

            let (count, new_agg) = collect.call(Some(&mut a));

            assert_eq!(count, 1);
            assert!(new_agg.is_none());
            assert_eq!(a.temporality, temporality);
            assert_eq!(a.data_points.len(), 1);
            assert_eq!(a.data_points[0].attributes, new_attributes.to_vec());
            assert_eq!(a.data_points[0].count, 1);
            // One configured boundary yields two buckets; 3 > 1.0 lands in
            // the overflow bucket.
            assert_eq!(a.data_points[0].bounds, vec![1.0]);
            assert_eq!(a.data_points[0].bucket_counts, vec![0, 1]);
            assert_eq!(a.data_points[0].min, Some(3));
            assert_eq!(a.data_points[0].max, Some(3));
            assert_eq!(a.data_points[0].sum, 3);
        }
    }

    #[test]
    fn exponential_histogram_aggregation() {
        for temporality in [Temporality::Delta, Temporality::Cumulative] {
            let AggregateFns { measure, collect } = AggregateBuilder::<u64>::new(temporality, None)
                .exponential_bucket_histogram(4, 20, true, true);
            // Stale data point with nonzero zero-count/threshold and both
            // bucket signs populated; collect should replace all of it.
            let mut a = ExponentialHistogram {
                data_points: vec![ExponentialHistogramDataPoint {
                    attributes: vec![KeyValue::new("a1", 1)],
                    count: 2,
                    min: None,
                    max: None,
                    sum: 3u64,
                    scale: 10,
                    zero_count: 1,
                    positive_bucket: ExponentialBucket {
                        offset: 1,
                        counts: vec![1],
                    },
                    negative_bucket: ExponentialBucket {
                        offset: 1,
                        counts: vec![1],
                    },
                    zero_threshold: 1.0,
                    exemplars: vec![],
                }],
                start_time: now(),
                time: now(),
                // Deliberately the opposite temporality, to verify collect
                // overwrites it with the configured one.
                temporality: if temporality == Temporality::Delta {
                    Temporality::Cumulative
                } else {
                    Temporality::Delta
                },
            };
            let new_attributes = [KeyValue::new("b", 2)];
            measure.call(3, &new_attributes[..]);

            let (count, new_agg) = collect.call(Some(&mut a));

            assert_eq!(count, 1);
            assert!(new_agg.is_none());
            assert_eq!(a.temporality, temporality);
            assert_eq!(a.data_points.len(), 1);
            assert_eq!(a.data_points[0].attributes, new_attributes.to_vec());
            assert_eq!(a.data_points[0].count, 1);
            assert_eq!(a.data_points[0].min, Some(3));
            assert_eq!(a.data_points[0].max, Some(3));
            assert_eq!(a.data_points[0].sum, 3);
            assert_eq!(a.data_points[0].zero_count, 0);
            assert_eq!(a.data_points[0].zero_threshold, 0.0);
            // Expected bucket index of value 3 at max_scale 20 — presumably
            // ⌊log2(3)·2^20⌋ = 1661953; TODO confirm against ExpoHistogram's
            // index mapping.
            assert_eq!(a.data_points[0].positive_bucket.offset, 1661953);
            assert_eq!(a.data_points[0].positive_bucket.counts, vec![1]);
            assert_eq!(a.data_points[0].negative_bucket.offset, 0);
            assert!(a.data_points[0].negative_bucket.counts.is_empty());
        }
    }
}