1use std::{f64::consts::LOG2_E, mem::replace, ops::DerefMut, sync::Mutex};
2
3use opentelemetry::{otel_debug, KeyValue};
4use std::sync::OnceLock;
5
6use crate::metrics::{
7 data::{self, Aggregation, ExponentialHistogram},
8 Temporality,
9};
10
11use super::{
12 aggregate::{AggregateTimeInitiator, AttributeSetFilter},
13 Aggregator, ComputeAggregation, Measure, Number, ValueMap,
14};
15
16pub(crate) const EXPO_MAX_SCALE: i8 = 20;
17pub(crate) const EXPO_MIN_SCALE: i8 = -10;
18
/// Running aggregation state for a single exponential histogram data point.
///
/// Values are fed in through `record`, which maintains the summary fields and
/// distributes non-zero values into the positive or negative bucket set.
#[derive(Debug, PartialEq)]
struct ExpoHistogramDataPoint<T> {
    /// Bucket-count limit handed to `scale_change` when deciding whether to downscale.
    max_size: i32,
    /// Number of values passed to `record`.
    count: usize,
    /// Smallest value seen so far (starts at `T::max()`).
    min: T,
    /// Largest value seen so far (starts at `T::min()`).
    max: T,
    /// Sum of all values passed to `record`.
    sum: T,
    /// Current scale; starts at the configured maximum and is lowered by `record`.
    scale: i8,
    /// Buckets for values greater than zero.
    pos_buckets: ExpoBuckets,
    /// Buckets for values less than zero (indexed by absolute value).
    neg_buckets: ExpoBuckets,
    /// Number of recorded values whose absolute value is exactly zero.
    zero_count: u64,
}
32
33impl<T: Number> ExpoHistogramDataPoint<T> {
34 fn new(config: &BucketConfig) -> Self {
35 ExpoHistogramDataPoint {
36 max_size: config.max_size,
37 count: 0,
38 min: T::max(),
39 max: T::min(),
40 sum: T::default(),
41 scale: config.max_scale,
42 pos_buckets: ExpoBuckets::default(),
43 neg_buckets: ExpoBuckets::default(),
44 zero_count: 0,
45 }
46 }
47}
48
49impl<T: Number> ExpoHistogramDataPoint<T> {
50 fn record(&mut self, v: T) {
54 self.count += 1;
55
56 if v < self.min {
57 self.min = v;
58 }
59 if v > self.max {
60 self.max = v;
61 }
62 self.sum += v;
63
64 let abs_v = v.into_float().abs();
65
66 if abs_v == 0.0 {
67 self.zero_count += 1;
68 return;
69 }
70
71 let mut bin = self.get_bin(abs_v);
72
73 let v_is_negative = v < T::default();
74
75 let scale_delta = {
78 let bucket = if v_is_negative {
79 &self.neg_buckets
80 } else {
81 &self.pos_buckets
82 };
83
84 scale_change(
85 self.max_size,
86 bin,
87 bucket.start_bin,
88 bucket.counts.len() as i32,
89 )
90 };
91 if scale_delta > 0 {
92 if (self.scale - scale_delta as i8) < EXPO_MIN_SCALE {
93 otel_debug!(
98 name: "ExponentialHistogramDataPoint.Scale.Underflow",
99 current_scale = self.scale,
100 scale_delta = scale_delta,
101 max_size = self.max_size,
102 min_scale = EXPO_MIN_SCALE,
103 value = format!("{:?}", v),
104 message = "The measurement will be dropped due to scale underflow. Check the histogram configuration"
105 );
106
107 return;
108 }
109 self.scale -= scale_delta as i8;
111 self.pos_buckets.downscale(scale_delta);
112 self.neg_buckets.downscale(scale_delta);
113
114 bin = self.get_bin(abs_v);
115 }
116
117 if v_is_negative {
118 self.neg_buckets.record(bin)
119 } else {
120 self.pos_buckets.record(bin)
121 }
122 }
123
124 fn get_bin(&self, v: f64) -> i32 {
126 let (frac, exp) = frexp(v);
127 if self.scale <= 0 {
128 let mut correction = 1;
130 if frac == 0.5 {
131 correction = 2;
134 }
135 return (exp - correction) >> -self.scale;
136 }
137 (exp << self.scale) + (frac.ln() * scale_factors()[self.scale as usize]) as i32 - 1
138 }
139}
140
141fn scale_change(max_size: i32, bin: i32, start_bin: i32, length: i32) -> u32 {
145 if length == 0 {
146 return 0;
148 }
149
150 let mut low = start_bin;
151 let mut high = bin;
152 if start_bin >= bin {
153 low = bin;
154 high = start_bin + length - 1;
155 }
156
157 let mut count = 0u32;
158 while high - low >= max_size {
159 low >>= 1;
160 high >>= 1;
161 count += 1;
162
163 if count > (EXPO_MAX_SCALE - EXPO_MIN_SCALE) as u32 {
164 return count;
165 }
166 }
167
168 count
169}
170
/// Lazily initialised table backing `scale_factors`.
static SCALE_FACTORS: OnceLock<[f64; 21]> = OnceLock::new();

/// Returns the table whose entry `i` is `LOG2_E * 2^i`, for `i` in `0..=20`.
///
/// The table is computed on first use and shared afterwards.
#[inline]
fn scale_factors() -> &'static [f64; 21] {
    SCALE_FACTORS.get_or_init(|| std::array::from_fn(|scale| LOG2_E * 2f64.powi(scale as i32)))
}
203
/// Splits `x` into a fraction and a power-of-two exponent such that
/// `x == frac * 2^exp`, with `0.5 <= |frac| < 1` for finite non-zero input.
///
/// Zero, infinities and NaN are returned unchanged with an exponent of 0.
#[inline(always)]
fn frexp(x: f64) -> (f64, i32) {
    let bits = x.to_bits();

    match ((bits >> 52) & 0x7ff) as i32 {
        // Zero (either sign): nothing to normalise.
        0 if x == 0.0 => (x, 0),
        // Subnormal: scale up by 2^64 so it becomes normal, then correct the exponent.
        0 => {
            let two_pow_64 = f64::from_bits(0x43f0000000000000);
            let (frac, exp) = frexp(x * two_pow_64);
            (frac, exp - 64)
        }
        // Infinity or NaN.
        0x7ff => (x, 0),
        biased_exp => {
            // Keep sign and mantissa, and force the exponent field to that of 0.5.
            let frac_bits = (bits & 0x800fffffffffffff) | 0x3fe0000000000000;
            (f64::from_bits(frac_bits), biased_exp - 0x3fe)
        }
    }
}
230
/// A contiguous run of bucket counts for one sign of an exponential histogram.
#[derive(Default, Debug, PartialEq)]
struct ExpoBuckets {
    /// Bin index that `counts[0]` corresponds to.
    start_bin: i32,
    /// Counts per bin; `counts[i]` belongs to bin `start_bin + i`.
    counts: Vec<u64>,
}

impl ExpoBuckets {
    /// Increments the count for `bin`, growing `counts` towards lower or
    /// higher bins when `bin` lies outside the current range.
    ///
    /// No size limit is enforced here.
    fn record(&mut self, bin: i32) {
        if self.counts.is_empty() {
            self.counts = vec![1];
            self.start_bin = bin;
            return;
        }

        let end_bin = self.start_bin + self.counts.len() as i32 - 1;

        if bin < self.start_bin {
            // Grow downwards: existing counts move right by `shift` slots.
            let shift = (self.start_bin - bin) as usize;
            let mut grown = vec![0; (end_bin - bin + 1) as usize];
            grown[shift..].copy_from_slice(&self.counts);
            grown[0] = 1;
            self.counts = grown;
            self.start_bin = bin;
        } else if bin > end_bin {
            // Grow upwards: `resize` zero-fills the gap and reuses spare
            // capacity when there is any.
            let idx = (bin - self.start_bin) as usize;
            self.counts.resize(idx + 1, 0);
            self.counts[idx] = 1;
        } else {
            self.counts[(bin - self.start_bin) as usize] += 1;
        }
    }

    /// Lowers the resolution by `delta` scale steps, merging every `2^delta`
    /// adjacent bins (aligned to multiples of `2^delta`) into one.
    fn downscale(&mut self, delta: u32) {
        // With at most one bucket (or no change) only the start index moves.
        if self.counts.len() <= 1 || delta < 1 {
            self.start_bin >>= delta;
            return;
        }

        let steps: i32 = 1 << delta;
        // Position of `start_bin` inside its merged bin, normalised to be
        // non-negative for negative start bins.
        let offset = ((self.start_bin % steps + steps) % steps) as usize;
        let steps = steps as usize;

        // Merge in place. The destination index never exceeds the source
        // index, so no unread count is overwritten. `counts[0]` already sits
        // in destination slot 0.
        for i in 1..self.counts.len() {
            let idx = i + offset;
            let dest = idx / steps;
            if idx % steps == 0 {
                // First source bin of a new merged bin: overwrite stale data.
                self.counts[dest] = self.counts[i];
            } else {
                self.counts[dest] += self.counts[i];
            }
        }

        let last_idx = (self.counts.len() - 1 + offset) / steps;
        // Drop the now-unused tail without reallocating.
        self.counts.truncate(last_idx + 1);
        self.start_bin >>= delta;
    }
}
315
316impl<T> Aggregator for Mutex<ExpoHistogramDataPoint<T>>
317where
318 T: Number,
319{
320 type InitConfig = BucketConfig;
321
322 type PreComputedValue = T;
323
324 fn create(init: &BucketConfig) -> Self {
325 Mutex::new(ExpoHistogramDataPoint::new(init))
326 }
327
328 fn update(&self, value: T) {
329 let mut this = match self.lock() {
330 Ok(guard) => guard,
331 Err(_) => return,
332 };
333 this.record(value);
334 }
335
336 fn clone_and_reset(&self, init: &BucketConfig) -> Self {
337 let mut current = self.lock().unwrap_or_else(|err| err.into_inner());
338 let cloned = replace(current.deref_mut(), ExpoHistogramDataPoint::new(init));
339 Mutex::new(cloned)
340 }
341}
342
/// Construction parameters for an `ExpoHistogramDataPoint`.
#[derive(Debug, Clone, Copy, PartialEq)]
struct BucketConfig {
    /// Bucket-count limit copied into each new data point's `max_size`.
    max_size: i32,
    /// Scale every new data point starts at.
    max_scale: i8,
}
348
/// Exponential histogram aggregator: receives measurements through `Measure`
/// and produces `ExponentialHistogram` data through `ComputeAggregation`.
pub(crate) struct ExpoHistogram<T: Number> {
    /// Storage of the data points; fed by `Measure::call` and read (or drained)
    /// during collection.
    value_map: ValueMap<Mutex<ExpoHistogramDataPoint<T>>>,
    /// Source of the start/current timestamps written into collected data.
    init_time: AggregateTimeInitiator,
    /// Selects delta or cumulative collection in `ComputeAggregation::call`.
    temporality: Temporality,
    /// Applied to the attributes of every measurement before it is stored.
    filter: AttributeSetFilter,
    /// When false, collected data points report `T::default()` as their sum.
    record_sum: bool,
    /// When false, collected data points report `None` for min and max.
    record_min_max: bool,
}
362
363impl<T: Number> ExpoHistogram<T> {
364 pub(crate) fn new(
366 temporality: Temporality,
367 filter: AttributeSetFilter,
368 max_size: u32,
369 max_scale: i8,
370 record_min_max: bool,
371 record_sum: bool,
372 ) -> Self {
373 ExpoHistogram {
374 value_map: ValueMap::new(BucketConfig {
375 max_size: max_size as i32,
376 max_scale,
377 }),
378 init_time: AggregateTimeInitiator::default(),
379 temporality,
380 filter,
381 record_sum,
382 record_min_max,
383 }
384 }
385
386 fn delta(&self, dest: Option<&mut dyn Aggregation>) -> (usize, Option<Box<dyn Aggregation>>) {
387 let time = self.init_time.delta();
388
389 let h = dest.and_then(|d| d.as_mut().downcast_mut::<ExponentialHistogram<T>>());
390 let mut new_agg = if h.is_none() {
391 Some(data::ExponentialHistogram {
392 data_points: vec![],
393 start_time: time.start,
394 time: time.current,
395 temporality: Temporality::Delta,
396 })
397 } else {
398 None
399 };
400 let h = h.unwrap_or_else(|| new_agg.as_mut().expect("present if h is none"));
401 h.temporality = Temporality::Delta;
402 h.start_time = time.start;
403 h.time = time.current;
404
405 self.value_map
406 .collect_and_reset(&mut h.data_points, |attributes, attr| {
407 let b = attr.into_inner().unwrap_or_else(|err| err.into_inner());
408 data::ExponentialHistogramDataPoint {
409 attributes,
410 count: b.count,
411 min: if self.record_min_max {
412 Some(b.min)
413 } else {
414 None
415 },
416 max: if self.record_min_max {
417 Some(b.max)
418 } else {
419 None
420 },
421 sum: if self.record_sum { b.sum } else { T::default() },
422 scale: b.scale,
423 zero_count: b.zero_count,
424 positive_bucket: data::ExponentialBucket {
425 offset: b.pos_buckets.start_bin,
426 counts: b.pos_buckets.counts,
427 },
428 negative_bucket: data::ExponentialBucket {
429 offset: b.neg_buckets.start_bin,
430 counts: b.neg_buckets.counts,
431 },
432 zero_threshold: 0.0,
433 exemplars: vec![],
434 }
435 });
436
437 (h.data_points.len(), new_agg.map(|a| Box::new(a) as Box<_>))
438 }
439
440 fn cumulative(
441 &self,
442 dest: Option<&mut dyn Aggregation>,
443 ) -> (usize, Option<Box<dyn Aggregation>>) {
444 let time = self.init_time.cumulative();
445
446 let h = dest.and_then(|d| d.as_mut().downcast_mut::<ExponentialHistogram<T>>());
447 let mut new_agg = if h.is_none() {
448 Some(data::ExponentialHistogram {
449 data_points: vec![],
450 start_time: time.start,
451 time: time.current,
452 temporality: Temporality::Cumulative,
453 })
454 } else {
455 None
456 };
457 let h = h.unwrap_or_else(|| new_agg.as_mut().expect("present if h is none"));
458 h.temporality = Temporality::Cumulative;
459 h.start_time = time.start;
460 h.time = time.current;
461
462 self.value_map
463 .collect_readonly(&mut h.data_points, |attributes, attr| {
464 let b = attr.lock().unwrap_or_else(|err| err.into_inner());
465 data::ExponentialHistogramDataPoint {
466 attributes,
467 count: b.count,
468 min: if self.record_min_max {
469 Some(b.min)
470 } else {
471 None
472 },
473 max: if self.record_min_max {
474 Some(b.max)
475 } else {
476 None
477 },
478 sum: if self.record_sum { b.sum } else { T::default() },
479 scale: b.scale,
480 zero_count: b.zero_count,
481 positive_bucket: data::ExponentialBucket {
482 offset: b.pos_buckets.start_bin,
483 counts: b.pos_buckets.counts.clone(),
484 },
485 negative_bucket: data::ExponentialBucket {
486 offset: b.neg_buckets.start_bin,
487 counts: b.neg_buckets.counts.clone(),
488 },
489 zero_threshold: 0.0,
490 exemplars: vec![],
491 }
492 });
493
494 (h.data_points.len(), new_agg.map(|a| Box::new(a) as Box<_>))
495 }
496}
497
498impl<T> Measure<T> for ExpoHistogram<T>
499where
500 T: Number,
501{
502 fn call(&self, measurement: T, attrs: &[KeyValue]) {
503 let f_value = measurement.into_float();
504 if !f_value.is_finite() {
507 return;
508 }
509
510 self.filter.apply(attrs, |filtered| {
511 self.value_map.measure(measurement, filtered);
512 })
513 }
514}
515
516impl<T> ComputeAggregation for ExpoHistogram<T>
517where
518 T: Number,
519{
520 fn call(&self, dest: Option<&mut dyn Aggregation>) -> (usize, Option<Box<dyn Aggregation>>) {
521 match self.temporality {
522 Temporality::Delta => self.delta(dest),
523 _ => self.cumulative(dest),
524 }
525 }
526}
527#[cfg(test)]
528mod tests {
529 use data::{ExponentialHistogram, Gauge, Histogram, Sum};
530 use opentelemetry::time::now;
531 use std::ops::Neg;
532 use tests::internal::AggregateFns;
533
534 use crate::metrics::internal::{self, AggregateBuilder};
535
536 use super::*;
537
    // Entry point that drives the generic and f64-specific recording helpers
    // below. The helpers are plain functions, so they only run through here.
    #[test]
    fn test_expo_histogram_data_point_record() {
        run_data_point_record::<f64>();
        run_data_point_record_f64();
        run_min_max_sum_f64();
        run_min_max_sum::<i64>();
        run_min_max_sum::<u64>();
        run_data_point_record::<i64>();
    }
547
548 fn run_data_point_record<T: Number + Neg<Output = T> + From<u32>>() {
549 struct TestCase<T> {
550 max_size: i32,
551 values: Vec<T>,
552 expected_buckets: ExpoBuckets,
553 expected_scale: i8,
554 }
555 let test_cases: Vec<TestCase<T>> = vec![
556 TestCase {
557 max_size: 4,
558 values: vec![2, 4, 1].into_iter().map(Into::into).collect(),
559 expected_buckets: ExpoBuckets {
560 start_bin: -1,
561 counts: vec![1, 1, 1],
562 },
563 expected_scale: 0,
564 },
565 TestCase {
566 max_size: 4,
567 values: vec![4, 4, 4, 2, 16, 1]
568 .into_iter()
569 .map(Into::into)
570 .collect(),
571 expected_buckets: ExpoBuckets {
572 start_bin: -1,
573 counts: vec![1, 4, 1],
574 },
575 expected_scale: -1,
576 },
577 TestCase {
578 max_size: 2,
579 values: vec![1, 2, 4].into_iter().map(Into::into).collect(),
580 expected_buckets: ExpoBuckets {
581 start_bin: -1,
582 counts: vec![1, 2],
583 },
584 expected_scale: -1,
585 },
586 TestCase {
587 max_size: 2,
588 values: vec![1, 4, 2].into_iter().map(Into::into).collect(),
589 expected_buckets: ExpoBuckets {
590 start_bin: -1,
591 counts: vec![1, 2],
592 },
593 expected_scale: -1,
594 },
595 TestCase {
596 max_size: 2,
597 values: vec![2, 4, 1].into_iter().map(Into::into).collect(),
598 expected_buckets: ExpoBuckets {
599 start_bin: -1,
600 counts: vec![1, 2],
601 },
602 expected_scale: -1,
603 },
604 TestCase {
605 max_size: 2,
606 values: vec![2, 1, 4].into_iter().map(Into::into).collect(),
607 expected_buckets: ExpoBuckets {
608 start_bin: -1,
609 counts: vec![1, 2],
610 },
611 expected_scale: -1,
612 },
613 TestCase {
614 max_size: 2,
615 values: vec![4, 1, 2].into_iter().map(Into::into).collect(),
616 expected_buckets: ExpoBuckets {
617 start_bin: -1,
618 counts: vec![1, 2],
619 },
620 expected_scale: -1,
621 },
622 TestCase {
623 max_size: 2,
624 values: vec![4, 2, 1].into_iter().map(Into::into).collect(),
625 expected_buckets: ExpoBuckets {
626 start_bin: -1,
627 counts: vec![1, 2],
628 },
629 expected_scale: -1,
630 },
631 ];
632
633 for test in test_cases {
634 let mut dp = ExpoHistogramDataPoint::<T>::new(&BucketConfig {
635 max_size: test.max_size,
636 max_scale: 20,
637 });
638 for v in test.values {
639 dp.record(v);
640 dp.record(-v);
641 }
642
643 assert_eq!(test.expected_buckets, dp.pos_buckets, "positive buckets");
644 assert_eq!(test.expected_buckets, dp.neg_buckets, "negative buckets");
645 assert_eq!(test.expected_scale, dp.scale, "scale");
646 }
647 }
648
649 fn run_min_max_sum_f64() {
650 struct Expected {
651 min: f64,
652 max: f64,
653 sum: f64,
654 count: usize,
655 }
656 impl Expected {
657 fn new(min: f64, max: f64, sum: f64, count: usize) -> Self {
658 Expected {
659 min,
660 max,
661 sum,
662 count,
663 }
664 }
665 }
666 struct TestCase {
667 values: Vec<f64>,
668 expected: Expected,
669 }
670
671 let test_cases = vec![
672 TestCase {
673 values: vec![2.0, 4.0, 1.0],
674 expected: Expected::new(1.0, 4.0, 7.0, 3),
675 },
676 TestCase {
677 values: vec![2.0, 4.0, 1.0, f64::INFINITY],
678 expected: Expected::new(1.0, 4.0, 7.0, 3),
679 },
680 TestCase {
681 values: vec![2.0, 4.0, 1.0, -f64::INFINITY],
682 expected: Expected::new(1.0, 4.0, 7.0, 3),
683 },
684 TestCase {
685 values: vec![2.0, 4.0, 1.0, f64::NAN],
686 expected: Expected::new(1.0, 4.0, 7.0, 3),
687 },
688 TestCase {
689 values: vec![4.0, 4.0, 4.0, 2.0, 16.0, 1.0],
690 expected: Expected::new(1.0, 16.0, 31.0, 6),
691 },
692 ];
693
694 for test in test_cases {
695 let h = ExpoHistogram::new(
696 Temporality::Cumulative,
697 AttributeSetFilter::new(None),
698 4,
699 20,
700 true,
701 true,
702 );
703 for v in test.values {
704 Measure::call(&h, v, &[]);
705 }
706 let dp = h.value_map.no_attribute_tracker.lock().unwrap();
707
708 assert_eq!(test.expected.max, dp.max);
709 assert_eq!(test.expected.min, dp.min);
710 assert_eq!(test.expected.sum, dp.sum);
711 assert_eq!(test.expected.count, dp.count);
712 }
713 }
714
715 fn run_min_max_sum<T: Number + From<u32>>() {
716 struct Expected<T> {
717 min: T,
718 max: T,
719 sum: T,
720 count: usize,
721 }
722 impl<T: Number> Expected<T> {
723 fn new(min: T, max: T, sum: T, count: usize) -> Self {
724 Expected {
725 min,
726 max,
727 sum,
728 count,
729 }
730 }
731 }
732 struct TestCase<T> {
733 values: Vec<T>,
734 expected: Expected<T>,
735 }
736 let test_cases: Vec<TestCase<T>> = vec![
737 TestCase {
738 values: vec![2, 4, 1].into_iter().map(Into::into).collect(),
739 expected: Expected::new(1.into(), 4.into(), 7.into(), 3),
740 },
741 TestCase {
742 values: vec![4, 4, 4, 2, 16, 1]
743 .into_iter()
744 .map(Into::into)
745 .collect(),
746 expected: Expected::new(1.into(), 16.into(), 31.into(), 6),
747 },
748 ];
749
750 for test in test_cases {
751 let h = ExpoHistogram::new(
752 Temporality::Cumulative,
753 AttributeSetFilter::new(None),
754 4,
755 20,
756 true,
757 true,
758 );
759 for v in test.values {
760 Measure::call(&h, v, &[]);
761 }
762 let dp = h.value_map.no_attribute_tracker.lock().unwrap();
763
764 assert_eq!(test.expected.max, dp.max);
765 assert_eq!(test.expected.min, dp.min);
766 assert_eq!(test.expected.sum, dp.sum);
767 assert_eq!(test.expected.count, dp.count);
768 }
769 }
770
771 fn run_data_point_record_f64() {
772 struct TestCase {
773 max_size: i32,
774 values: Vec<f64>,
775 expected_buckets: ExpoBuckets,
776 expected_scale: i8,
777 }
778
779 let test_cases = vec![
780 TestCase {
781 max_size: 4,
782 values: vec![2.0, 2.0, 2.0, 1.0, 8.0, 0.5],
783 expected_buckets: ExpoBuckets {
784 start_bin: -1,
785 counts: vec![2, 3, 1],
786 },
787 expected_scale: -1,
788 },
789 TestCase {
790 max_size: 2,
791 values: vec![1.0, 0.5, 2.0],
792 expected_buckets: ExpoBuckets {
793 start_bin: -1,
794 counts: vec![2, 1],
795 },
796 expected_scale: -1,
797 },
798 TestCase {
799 max_size: 2,
800 values: vec![1.0, 2.0, 0.5],
801 expected_buckets: ExpoBuckets {
802 start_bin: -1,
803 counts: vec![2, 1],
804 },
805 expected_scale: -1,
806 },
807 TestCase {
808 max_size: 2,
809 values: vec![2.0, 0.5, 1.0],
810 expected_buckets: ExpoBuckets {
811 start_bin: -1,
812 counts: vec![2, 1],
813 },
814 expected_scale: -1,
815 },
816 TestCase {
817 max_size: 2,
818 values: vec![2.0, 1.0, 0.5],
819 expected_buckets: ExpoBuckets {
820 start_bin: -1,
821 counts: vec![2, 1],
822 },
823 expected_scale: -1,
824 },
825 TestCase {
826 max_size: 2,
827 values: vec![0.5, 1.0, 2.0],
828 expected_buckets: ExpoBuckets {
829 start_bin: -1,
830 counts: vec![2, 1],
831 },
832 expected_scale: -1,
833 },
834 TestCase {
835 max_size: 2,
836 values: vec![0.5, 2.0, 1.0],
837 expected_buckets: ExpoBuckets {
838 start_bin: -1,
839 counts: vec![2, 1],
840 },
841 expected_scale: -1,
842 },
843 ];
844 for test in test_cases {
845 let mut dp = ExpoHistogramDataPoint::new(&BucketConfig {
846 max_size: test.max_size,
847 max_scale: 20,
848 });
849 for v in test.values {
850 dp.record(v);
851 dp.record(-v);
852 }
853
854 assert_eq!(test.expected_buckets, dp.pos_buckets);
855 assert_eq!(test.expected_buckets, dp.neg_buckets);
856 assert_eq!(test.expected_scale, dp.scale);
857 }
858 }
859
860 #[test]
861 fn data_point_record_limits() {
862 let cfg = BucketConfig {
866 max_size: 4,
867 max_scale: 20,
868 };
869 let mut fdp = ExpoHistogramDataPoint::new(&cfg);
870 fdp.record(f64::MAX);
871
872 assert_eq!(
873 fdp.pos_buckets.start_bin, 1073741823,
874 "start bin does not match for large f64 values",
875 );
876
877 let mut fdp = ExpoHistogramDataPoint::new(&cfg);
878 fdp.record(f64::MIN_POSITIVE);
879
880 assert_eq!(
881 fdp.pos_buckets.start_bin, -1071644673,
882 "start bin does not match for small positive values",
883 );
884
885 let mut idp = ExpoHistogramDataPoint::new(&cfg);
886 idp.record(i64::MAX);
887
888 assert_eq!(
889 idp.pos_buckets.start_bin, 66060287,
890 "start bin does not match for max i64 values",
891 );
892 }
893
894 #[test]
895 fn expo_bucket_downscale() {
896 struct TestCase {
897 name: &'static str,
898 bucket: ExpoBuckets,
899 scale: i8,
900 want: ExpoBuckets,
901 }
902
903 let test_cases = vec![
904 TestCase {
905 name: "Empty bucket",
906 bucket: ExpoBuckets {
907 start_bin: 0,
908 counts: vec![],
909 },
910 scale: 3,
911 want: ExpoBuckets {
912 start_bin: 0,
913 counts: vec![],
914 },
915 },
916 TestCase {
917 name: "1 size bucket",
918 bucket: ExpoBuckets {
919 start_bin: 50,
920 counts: vec![7],
921 },
922 scale: 4,
923 want: ExpoBuckets {
924 start_bin: 3,
925 counts: vec![7],
926 },
927 },
928 TestCase {
929 name: "zero scale",
930 bucket: ExpoBuckets {
931 start_bin: 50,
932 counts: vec![7, 5],
933 },
934 scale: 0,
935 want: ExpoBuckets {
936 start_bin: 50,
937 counts: vec![7, 5],
938 },
939 },
940 TestCase {
941 name: "aligned bucket scale 1",
942 bucket: ExpoBuckets {
943 start_bin: 0,
944 counts: vec![1, 2, 3, 4, 5, 6],
945 },
946 scale: 1,
947 want: ExpoBuckets {
948 start_bin: 0,
949 counts: vec![3, 7, 11],
950 },
951 },
952 TestCase {
953 name: "aligned bucket scale 2",
954 bucket: ExpoBuckets {
955 start_bin: 0,
956 counts: vec![1, 2, 3, 4, 5, 6],
957 },
958 scale: 2,
959 want: ExpoBuckets {
960 start_bin: 0,
961 counts: vec![10, 11],
962 },
963 },
964 TestCase {
965 name: "aligned bucket scale 3",
966 bucket: ExpoBuckets {
967 start_bin: 0,
968 counts: vec![1, 2, 3, 4, 5, 6],
969 },
970 scale: 3,
971 want: ExpoBuckets {
972 start_bin: 0,
973 counts: vec![21],
974 },
975 },
976 TestCase {
977 name: "unaligned bucket scale 1",
978 bucket: ExpoBuckets {
979 start_bin: 5,
980 counts: vec![1, 2, 3, 4, 5, 6],
981 }, scale: 1,
983 want: ExpoBuckets {
984 start_bin: 2,
985 counts: vec![1, 5, 9, 6],
986 }, },
988 TestCase {
989 name: "unaligned bucket scale 2",
990 bucket: ExpoBuckets {
991 start_bin: 7,
992 counts: vec![1, 2, 3, 4, 5, 6],
993 }, scale: 2,
995 want: ExpoBuckets {
996 start_bin: 1,
997 counts: vec![1, 14, 6],
998 }, },
1000 TestCase {
1001 name: "unaligned bucket scale 3",
1002 bucket: ExpoBuckets {
1003 start_bin: 3,
1004 counts: vec![1, 2, 3, 4, 5, 6],
1005 }, scale: 3,
1007 want: ExpoBuckets {
1008 start_bin: 0,
1009 counts: vec![15, 6],
1010 }, },
1012 TestCase {
1013 name: "unaligned bucket scale 1",
1014 bucket: ExpoBuckets {
1015 start_bin: 1,
1016 counts: vec![1, 0, 1],
1017 },
1018 scale: 1,
1019 want: ExpoBuckets {
1020 start_bin: 0,
1021 counts: vec![1, 1],
1022 },
1023 },
1024 TestCase {
1025 name: "negative start_bin",
1026 bucket: ExpoBuckets {
1027 start_bin: -1,
1028 counts: vec![1, 0, 3],
1029 },
1030 scale: 1,
1031 want: ExpoBuckets {
1032 start_bin: -1,
1033 counts: vec![1, 3],
1034 },
1035 },
1036 TestCase {
1037 name: "negative start_bin 2",
1038 bucket: ExpoBuckets {
1039 start_bin: -4,
1040 counts: vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10],
1041 },
1042 scale: 1,
1043 want: ExpoBuckets {
1044 start_bin: -2,
1045 counts: vec![3, 7, 11, 15, 19],
1046 },
1047 },
1048 ];
1049 for mut test in test_cases {
1050 test.bucket.downscale(test.scale as u32);
1051 assert_eq!(test.want, test.bucket, "{}", test.name);
1052 }
1053 }
1054
1055 #[test]
1056 fn expo_bucket_record() {
1057 struct TestCase {
1058 name: &'static str,
1059 bucket: ExpoBuckets,
1060 bin: i32,
1061 want: ExpoBuckets,
1062 }
1063
1064 let test_cases = vec![
1065 TestCase {
1066 name: "Empty bucket creates first count",
1067 bucket: ExpoBuckets {
1068 start_bin: 0,
1069 counts: vec![],
1070 },
1071 bin: -5,
1072 want: ExpoBuckets {
1073 start_bin: -5,
1074 counts: vec![1],
1075 },
1076 },
1077 TestCase {
1078 name: "Bin is in the bucket",
1079 bucket: ExpoBuckets {
1080 start_bin: 3,
1081 counts: vec![1, 2, 3, 4, 5, 6],
1082 },
1083 bin: 5,
1084 want: ExpoBuckets {
1085 start_bin: 3,
1086 counts: vec![1, 2, 4, 4, 5, 6],
1087 },
1088 },
1089 TestCase {
1090 name: "Bin is before the start of the bucket",
1091 bucket: ExpoBuckets {
1092 start_bin: 1,
1093 counts: vec![1, 2, 3, 4, 5, 6],
1094 },
1095 bin: -2,
1096 want: ExpoBuckets {
1097 start_bin: -2,
1098 counts: vec![1, 0, 0, 1, 2, 3, 4, 5, 6],
1099 },
1100 },
1101 TestCase {
1102 name: "Bin is after the end of the bucket",
1103 bucket: ExpoBuckets {
1104 start_bin: -2,
1105 counts: vec![1, 2, 3, 4, 5, 6],
1106 },
1107 bin: 4,
1108 want: ExpoBuckets {
1109 start_bin: -2,
1110 counts: vec![1, 2, 3, 4, 5, 6, 1],
1111 },
1112 },
1113 ];
1114
1115 for mut test in test_cases {
1116 test.bucket.record(test.bin);
1117 assert_eq!(test.want, test.bucket, "{}", test.name);
1118 }
1119 }
1120
1121 #[test]
1122 fn scale_change_rescaling() {
1123 struct Args {
1124 bin: i32,
1125 start_bin: i32,
1126 length: i32,
1127 max_size: i32,
1128 }
1129 struct TestCase {
1130 name: &'static str,
1131 args: Args,
1132 want: u32,
1133 }
1134 let test_cases = vec![
1135 TestCase {
1136 name: "if length is 0, no rescale is needed",
1137 args: Args {
1139 bin: 5,
1140 start_bin: 0,
1141 length: 0,
1142 max_size: 4,
1143 },
1144 want: 0,
1145 },
1146 TestCase {
1147 name: "if bin is between start, and the end, no rescale needed",
1148 args: Args {
1150 bin: 5,
1151 start_bin: -1,
1152 length: 10,
1153 max_size: 20,
1154 },
1155 want: 0,
1156 },
1157 TestCase {
1158 name: "if [bin,... end].len() > max_size, rescale needed",
1159 args: Args {
1161 bin: 5,
1162 start_bin: 8,
1163 length: 3,
1164 max_size: 5,
1165 },
1166 want: 1,
1167 },
1168 TestCase {
1169 name: "if [start, ..., bin].len() > max_size, rescale needed",
1170 args: Args {
1172 bin: 7,
1173 start_bin: 2,
1174 length: 3,
1175 max_size: 5,
1176 },
1177 want: 1,
1178 },
1179 TestCase {
1180 name: "if [start, ..., bin].len() > max_size, rescale needed",
1181 args: Args {
1183 bin: 13,
1184 start_bin: 2,
1185 length: 3,
1186 max_size: 5,
1187 },
1188 want: 2,
1189 },
1190 TestCase {
1191 name: "It should not hang if it will never be able to rescale",
1192 args: Args {
1193 bin: 1,
1194 start_bin: -1,
1195 length: 1,
1196 max_size: 1,
1197 },
1198 want: 31,
1199 },
1200 ];
1201
1202 for test in test_cases {
1203 let got = scale_change(
1204 test.args.max_size,
1205 test.args.bin,
1206 test.args.start_bin,
1207 test.args.length,
1208 );
1209 assert_eq!(got, test.want, "incorrect scale change, {}", test.name);
1210 }
1211 }
1212
1213 #[test]
1214 fn sub_normal() {
1215 let want = ExpoHistogramDataPoint {
1216 max_size: 4,
1217 count: 3,
1218 min: f64::MIN_POSITIVE,
1219 max: f64::MIN_POSITIVE,
1220 sum: 3.0 * f64::MIN_POSITIVE,
1221
1222 scale: 20,
1223 pos_buckets: ExpoBuckets {
1224 start_bin: -1071644673,
1225 counts: vec![3],
1226 },
1227 neg_buckets: ExpoBuckets {
1228 start_bin: 0,
1229 counts: vec![],
1230 },
1231 zero_count: 0,
1232 };
1233
1234 let mut ehdp = ExpoHistogramDataPoint::new(&BucketConfig {
1235 max_size: 4,
1236 max_scale: 20,
1237 });
1238 ehdp.record(f64::MIN_POSITIVE);
1239 ehdp.record(f64::MIN_POSITIVE);
1240 ehdp.record(f64::MIN_POSITIVE);
1241
1242 assert_eq!(want, ehdp);
1243 }
1244
    // Runs the end-to-end aggregation scenarios once per supported number type.
    #[test]
    fn hist_aggregations() {
        hist_aggregation::<i64>();
        hist_aggregation::<u64>();
        hist_aggregation::<f64>();
    }
1251
1252 fn hist_aggregation<T: Number + From<u32>>() {
1253 let max_size = 4;
1254 let max_scale = 20;
1255 let record_min_max = true;
1256 let record_sum = true;
1257
1258 #[allow(clippy::type_complexity)]
1259 struct TestCase<T> {
1260 name: &'static str,
1261 build: Box<dyn Fn() -> AggregateFns<T>>,
1262 input: Vec<Vec<T>>,
1263 want: data::ExponentialHistogram<T>,
1264 want_count: usize,
1265 }
1266 let test_cases: Vec<TestCase<T>> = vec![
1267 TestCase {
1268 name: "Delta Single",
1269 build: Box::new(move || {
1270 AggregateBuilder::new(Temporality::Delta, None).exponential_bucket_histogram(
1271 max_size,
1272 max_scale,
1273 record_min_max,
1274 record_sum,
1275 )
1276 }),
1277 input: vec![vec![4, 4, 4, 2, 16, 1]
1278 .into_iter()
1279 .map(Into::into)
1280 .collect()],
1281 want: data::ExponentialHistogram {
1282 temporality: Temporality::Delta,
1283 data_points: vec![data::ExponentialHistogramDataPoint {
1284 attributes: vec![],
1285 count: 6,
1286 min: Some(1.into()),
1287 max: Some(16.into()),
1288 sum: 31.into(),
1289 scale: -1,
1290 positive_bucket: data::ExponentialBucket {
1291 offset: -1,
1292 counts: vec![1, 4, 1],
1293 },
1294 negative_bucket: data::ExponentialBucket {
1295 offset: 0,
1296 counts: vec![],
1297 },
1298 exemplars: vec![],
1299 zero_threshold: 0.0,
1300 zero_count: 0,
1301 }],
1302 start_time: now(),
1303 time: now(),
1304 },
1305 want_count: 1,
1306 },
1307 TestCase {
1308 name: "Cumulative Single",
1309 build: Box::new(move || {
1310 internal::AggregateBuilder::new(Temporality::Cumulative, None)
1311 .exponential_bucket_histogram(
1312 max_size,
1313 max_scale,
1314 record_min_max,
1315 record_sum,
1316 )
1317 }),
1318 input: vec![vec![4, 4, 4, 2, 16, 1]
1319 .into_iter()
1320 .map(Into::into)
1321 .collect()],
1322 want: data::ExponentialHistogram {
1323 temporality: Temporality::Cumulative,
1324 data_points: vec![data::ExponentialHistogramDataPoint {
1325 attributes: vec![],
1326 count: 6,
1327 min: Some(1.into()),
1328 max: Some(16.into()),
1329 sum: 31.into(),
1330 scale: -1,
1331 positive_bucket: data::ExponentialBucket {
1332 offset: -1,
1333 counts: vec![1, 4, 1],
1334 },
1335 negative_bucket: data::ExponentialBucket {
1336 offset: 0,
1337 counts: vec![],
1338 },
1339 exemplars: vec![],
1340 zero_threshold: 0.0,
1341 zero_count: 0,
1342 }],
1343 start_time: now(),
1344 time: now(),
1345 },
1346 want_count: 1,
1347 },
1348 TestCase {
1349 name: "Delta Multiple",
1350 build: Box::new(move || {
1351 internal::AggregateBuilder::new(Temporality::Delta, None)
1352 .exponential_bucket_histogram(
1353 max_size,
1354 max_scale,
1355 record_min_max,
1356 record_sum,
1357 )
1358 }),
1359 input: vec![
1360 vec![2, 3, 8].into_iter().map(Into::into).collect(),
1361 vec![4, 4, 4, 2, 16, 1]
1362 .into_iter()
1363 .map(Into::into)
1364 .collect(),
1365 ],
1366 want: data::ExponentialHistogram {
1367 temporality: Temporality::Delta,
1368 data_points: vec![data::ExponentialHistogramDataPoint {
1369 attributes: vec![],
1370 count: 6,
1371 min: Some(1.into()),
1372 max: Some(16.into()),
1373 sum: 31.into(),
1374 scale: -1,
1375 positive_bucket: data::ExponentialBucket {
1376 offset: -1,
1377 counts: vec![1, 4, 1],
1378 },
1379 negative_bucket: data::ExponentialBucket {
1380 offset: 0,
1381 counts: vec![],
1382 },
1383 exemplars: vec![],
1384 zero_threshold: 0.0,
1385 zero_count: 0,
1386 }],
1387 start_time: now(),
1388 time: now(),
1389 },
1390 want_count: 1,
1391 },
1392 TestCase {
1393 name: "Cumulative Multiple ",
1394 build: Box::new(move || {
1395 internal::AggregateBuilder::new(Temporality::Cumulative, None)
1396 .exponential_bucket_histogram(
1397 max_size,
1398 max_scale,
1399 record_min_max,
1400 record_sum,
1401 )
1402 }),
1403 input: vec![
1404 vec![2, 3, 8].into_iter().map(Into::into).collect(),
1405 vec![4, 4, 4, 2, 16, 1]
1406 .into_iter()
1407 .map(Into::into)
1408 .collect(),
1409 ],
1410 want: data::ExponentialHistogram {
1411 temporality: Temporality::Cumulative,
1412 data_points: vec![data::ExponentialHistogramDataPoint {
1413 count: 9,
1414 min: Some(1.into()),
1415 max: Some(16.into()),
1416 sum: 44.into(),
1417 scale: -1,
1418 positive_bucket: data::ExponentialBucket {
1419 offset: -1,
1420 counts: vec![1, 6, 2],
1421 },
1422 attributes: vec![],
1423 negative_bucket: data::ExponentialBucket {
1424 offset: 0,
1425 counts: vec![],
1426 },
1427 exemplars: vec![],
1428 zero_threshold: 0.0,
1429 zero_count: 0,
1430 }],
1431 start_time: now(),
1432 time: now(),
1433 },
1434 want_count: 1,
1435 },
1436 ];
1437
1438 for test in test_cases {
1439 let AggregateFns { measure, collect } = (test.build)();
1440
1441 let mut got: Box<dyn data::Aggregation> = Box::new(data::ExponentialHistogram::<T> {
1442 data_points: vec![],
1443 start_time: now(),
1444 time: now(),
1445 temporality: Temporality::Delta,
1446 });
1447 let mut count = 0;
1448 for n in test.input {
1449 for v in n {
1450 measure.call(v, &[])
1451 }
1452 count = collect.call(Some(got.as_mut())).0
1453 }
1454
1455 assert_aggregation_eq::<T>(Box::new(test.want), got, test.name);
1456 assert_eq!(test.want_count, count, "{}", test.name);
1457 }
1458 }
1459
1460 fn assert_aggregation_eq<T: Number + PartialEq>(
1461 a: Box<dyn Aggregation>,
1462 b: Box<dyn Aggregation>,
1463 test_name: &'static str,
1464 ) {
1465 assert_eq!(
1466 a.as_any().type_id(),
1467 b.as_any().type_id(),
1468 "{} Aggregation types not equal",
1469 test_name
1470 );
1471
1472 if let Some(a) = a.as_any().downcast_ref::<Gauge<T>>() {
1473 let b = b.as_any().downcast_ref::<Gauge<T>>().unwrap();
1474 assert_eq!(
1475 a.data_points.len(),
1476 b.data_points.len(),
1477 "{} gauge counts",
1478 test_name
1479 );
1480 for (a, b) in a.data_points.iter().zip(b.data_points.iter()) {
1481 assert_gauge_data_points_eq(a, b, "mismatching gauge data points", test_name);
1482 }
1483 } else if let Some(a) = a.as_any().downcast_ref::<Sum<T>>() {
1484 let b = b.as_any().downcast_ref::<Sum<T>>().unwrap();
1485 assert_eq!(
1486 a.temporality, b.temporality,
1487 "{} mismatching sum temporality",
1488 test_name
1489 );
1490 assert_eq!(
1491 a.is_monotonic, b.is_monotonic,
1492 "{} mismatching sum monotonicity",
1493 test_name,
1494 );
1495 assert_eq!(
1496 a.data_points.len(),
1497 b.data_points.len(),
1498 "{} sum counts",
1499 test_name
1500 );
1501 for (a, b) in a.data_points.iter().zip(b.data_points.iter()) {
1502 assert_sum_data_points_eq(a, b, "mismatching sum data points", test_name);
1503 }
1504 } else if let Some(a) = a.as_any().downcast_ref::<Histogram<T>>() {
1505 let b = b.as_any().downcast_ref::<Histogram<T>>().unwrap();
1506 assert_eq!(
1507 a.temporality, b.temporality,
1508 "{}: mismatching hist temporality",
1509 test_name
1510 );
1511 assert_eq!(
1512 a.data_points.len(),
1513 b.data_points.len(),
1514 "{} hist counts",
1515 test_name
1516 );
1517 for (a, b) in a.data_points.iter().zip(b.data_points.iter()) {
1518 assert_hist_data_points_eq(a, b, "mismatching hist data points", test_name);
1519 }
1520 } else if let Some(a) = a.as_any().downcast_ref::<ExponentialHistogram<T>>() {
1521 let b = b
1522 .as_any()
1523 .downcast_ref::<ExponentialHistogram<T>>()
1524 .unwrap();
1525 assert_eq!(
1526 a.temporality, b.temporality,
1527 "{} mismatching hist temporality",
1528 test_name
1529 );
1530 assert_eq!(
1531 a.data_points.len(),
1532 b.data_points.len(),
1533 "{} hist counts",
1534 test_name
1535 );
1536 for (a, b) in a.data_points.iter().zip(b.data_points.iter()) {
1537 assert_exponential_hist_data_points_eq(
1538 a,
1539 b,
1540 "mismatching hist data points",
1541 test_name,
1542 );
1543 }
1544 } else {
1545 panic!("Aggregation of unknown types")
1546 }
1547 }
1548
1549 fn assert_sum_data_points_eq<T: Number>(
1550 a: &data::SumDataPoint<T>,
1551 b: &data::SumDataPoint<T>,
1552 message: &'static str,
1553 test_name: &'static str,
1554 ) {
1555 assert_eq!(
1556 a.attributes, b.attributes,
1557 "{}: {} attributes",
1558 test_name, message
1559 );
1560 assert_eq!(a.value, b.value, "{}: {} value", test_name, message);
1561 }
1562
1563 fn assert_gauge_data_points_eq<T: Number>(
1564 a: &data::GaugeDataPoint<T>,
1565 b: &data::GaugeDataPoint<T>,
1566 message: &'static str,
1567 test_name: &'static str,
1568 ) {
1569 assert_eq!(
1570 a.attributes, b.attributes,
1571 "{}: {} attributes",
1572 test_name, message
1573 );
1574 assert_eq!(a.value, b.value, "{}: {} value", test_name, message);
1575 }
1576
1577 fn assert_hist_data_points_eq<T: Number>(
1578 a: &data::HistogramDataPoint<T>,
1579 b: &data::HistogramDataPoint<T>,
1580 message: &'static str,
1581 test_name: &'static str,
1582 ) {
1583 assert_eq!(
1584 a.attributes, b.attributes,
1585 "{}: {} attributes",
1586 test_name, message
1587 );
1588 assert_eq!(a.count, b.count, "{}: {} count", test_name, message);
1589 assert_eq!(a.bounds, b.bounds, "{}: {} bounds", test_name, message);
1590 assert_eq!(
1591 a.bucket_counts, b.bucket_counts,
1592 "{}: {} bucket counts",
1593 test_name, message
1594 );
1595 assert_eq!(a.min, b.min, "{}: {} min", test_name, message);
1596 assert_eq!(a.max, b.max, "{}: {} max", test_name, message);
1597 assert_eq!(a.sum, b.sum, "{}: {} sum", test_name, message);
1598 }
1599
1600 fn assert_exponential_hist_data_points_eq<T: Number>(
1601 a: &data::ExponentialHistogramDataPoint<T>,
1602 b: &data::ExponentialHistogramDataPoint<T>,
1603 message: &'static str,
1604 test_name: &'static str,
1605 ) {
1606 assert_eq!(
1607 a.attributes, b.attributes,
1608 "{}: {} attributes",
1609 test_name, message
1610 );
1611 assert_eq!(a.count, b.count, "{}: {} count", test_name, message);
1612 assert_eq!(a.min, b.min, "{}: {} min", test_name, message);
1613 assert_eq!(a.max, b.max, "{}: {} max", test_name, message);
1614 assert_eq!(a.sum, b.sum, "{}: {} sum", test_name, message);
1615
1616 assert_eq!(a.scale, b.scale, "{}: {} scale", test_name, message);
1617 assert_eq!(
1618 a.zero_count, b.zero_count,
1619 "{}: {} zeros",
1620 test_name, message
1621 );
1622
1623 assert_eq!(
1624 a.positive_bucket, b.positive_bucket,
1625 "{}: {} pos",
1626 test_name, message
1627 );
1628 assert_eq!(
1629 a.negative_bucket, b.negative_bucket,
1630 "{}: {} neg",
1631 test_name, message
1632 );
1633 }
1634}