1pub(crate) mod aggregation;
43pub mod data;
44mod error;
45pub mod exporter;
46pub(crate) mod instrument;
47pub(crate) mod internal;
48pub(crate) mod manual_reader;
49pub(crate) mod meter;
50mod meter_provider;
51pub(crate) mod noop;
52pub(crate) mod periodic_reader;
53#[cfg(feature = "experimental_metrics_periodicreader_with_async_runtime")]
54pub mod periodic_reader_with_async_runtime;
56pub(crate) mod pipeline;
57pub mod reader;
58pub(crate) mod view;
59
60#[cfg(any(feature = "testing", test))]
62#[cfg_attr(docsrs, doc(cfg(any(feature = "testing", test))))]
63pub mod in_memory_exporter;
64#[cfg(any(feature = "testing", test))]
65#[cfg_attr(docsrs, doc(cfg(any(feature = "testing", test))))]
66pub use in_memory_exporter::{InMemoryMetricExporter, InMemoryMetricExporterBuilder};
67
68pub use aggregation::*;
69pub use error::{MetricError, MetricResult};
70pub use manual_reader::*;
71pub use meter_provider::*;
72pub use periodic_reader::*;
73pub use pipeline::Pipeline;
74
75pub use instrument::InstrumentKind;
76
77#[cfg(feature = "spec_unstable_metrics_views")]
78pub use instrument::*;
79#[cfg(feature = "spec_unstable_metrics_views")]
83pub use view::*;
84use std::hash::Hash;
88
/// Aggregation temporality of exported metric data.
///
/// [`Temporality::Cumulative`] is the [`Default`] variant. The enum is marked
/// `#[non_exhaustive]`, so code outside this crate that matches on it needs a
/// wildcard arm.
#[derive(Debug, Copy, Clone, Default, PartialEq, Eq, Hash)]
#[non_exhaustive]
pub enum Temporality {
    /// The default. The tests in this module expect a counter exported with
    /// this temporality to keep accumulating its value across exports.
    #[default]
    Cumulative,

    /// The tests in this module expect a counter exported with this
    /// temporality to report only what was recorded since the previous export,
    /// while an up-down counter is still reported as `Cumulative`.
    Delta,

    /// NOTE(review): the exact semantics of this variant are not visible in
    /// this part of the file; presumably it selects whichever temporality
    /// keeps less state per instrument — confirm against the reader/exporter
    /// implementation before relying on it.
    LowMemory,
}
111
112#[cfg(all(test, feature = "testing"))]
113mod tests {
114 use self::data::{HistogramDataPoint, ScopeMetrics, SumDataPoint};
115 use super::*;
116 use crate::metrics::data::Aggregation;
117 use crate::metrics::data::ResourceMetrics;
118 use crate::metrics::InMemoryMetricExporter;
119 use crate::metrics::InMemoryMetricExporterBuilder;
120 use data::Gauge;
121 use data::GaugeDataPoint;
122 use data::Histogram;
123 use data::Sum;
124 use opentelemetry::metrics::{Counter, Meter, UpDownCounter};
125 use opentelemetry::InstrumentationScope;
126 use opentelemetry::{metrics::MeterProvider as _, KeyValue};
127 use rand::{rngs, Rng, SeedableRng};
128 use std::cmp::{max, min};
129 use std::sync::atomic::{AtomicBool, Ordering};
130 use std::sync::{Arc, Mutex};
131 use std::thread;
132 use std::time::Duration;
133
134 #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
141 #[cfg(not(feature = "experimental_metrics_disable_name_validation"))]
142 async fn invalid_instrument_config_noops() {
143 let invalid_instrument_names = vec![
146 "_startWithNoneAlphabet",
147 "utf8char锈",
148 "a".repeat(256).leak(),
149 "invalid name",
150 ];
151 for name in invalid_instrument_names {
152 let test_context = TestContext::new(Temporality::Cumulative);
153 let counter = test_context.meter().u64_counter(name).build();
154 counter.add(1, &[]);
155
156 let up_down_counter = test_context.meter().i64_up_down_counter(name).build();
157 up_down_counter.add(1, &[]);
158
159 let gauge = test_context.meter().f64_gauge(name).build();
160 gauge.record(1.9, &[]);
161
162 let histogram = test_context.meter().f64_histogram(name).build();
163 histogram.record(1.0, &[]);
164
165 let _observable_counter = test_context
166 .meter()
167 .u64_observable_counter(name)
168 .with_callback(move |observer| {
169 observer.observe(1, &[]);
170 })
171 .build();
172
173 let _observable_gauge = test_context
174 .meter()
175 .f64_observable_gauge(name)
176 .with_callback(move |observer| {
177 observer.observe(1.0, &[]);
178 })
179 .build();
180
181 let _observable_up_down_counter = test_context
182 .meter()
183 .i64_observable_up_down_counter(name)
184 .with_callback(move |observer| {
185 observer.observe(1, &[]);
186 })
187 .build();
188
189 test_context.flush_metrics();
190
191 test_context.check_no_metrics();
193 }
194
195 let invalid_bucket_boundaries = vec![
196 vec![1.0, 1.0], vec![1.0, 2.0, 3.0, 2.0], vec![1.0, 2.0, 3.0, 4.0, 2.5], vec![1.0, 2.0, 3.0, f64::INFINITY, 4.0], vec![1.0, 2.0, 3.0, f64::NAN], vec![f64::NEG_INFINITY, 2.0, 3.0], ];
203 for bucket_boundaries in invalid_bucket_boundaries {
204 let test_context = TestContext::new(Temporality::Cumulative);
205 let histogram = test_context
206 .meter()
207 .f64_histogram("test")
208 .with_boundaries(bucket_boundaries)
209 .build();
210 histogram.record(1.9, &[]);
211 test_context.flush_metrics();
212
213 test_context.check_no_metrics();
216 }
217 }
218
219 #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
220 #[cfg(feature = "experimental_metrics_disable_name_validation")]
221 async fn valid_instrument_config_with_feature_experimental_metrics_disable_name_validation() {
222 let invalid_instrument_names = vec![
225 "_startWithNoneAlphabet",
226 "utf8char锈",
227 "",
228 "a".repeat(256).leak(),
229 "\\allow\\slash /sec",
230 "\\allow\\$$slash /sec",
231 "Total $ Count",
232 "\\test\\UsagePercent(Total) > 80%",
233 "invalid name",
234 ];
235 for name in invalid_instrument_names {
236 let test_context = TestContext::new(Temporality::Cumulative);
237 let counter = test_context.meter().u64_counter(name).build();
238 counter.add(1, &[]);
239
240 let up_down_counter = test_context.meter().i64_up_down_counter(name).build();
241 up_down_counter.add(1, &[]);
242
243 let gauge = test_context.meter().f64_gauge(name).build();
244 gauge.record(1.9, &[]);
245
246 let histogram = test_context.meter().f64_histogram(name).build();
247 histogram.record(1.0, &[]);
248
249 let _observable_counter = test_context
250 .meter()
251 .u64_observable_counter(name)
252 .with_callback(move |observer| {
253 observer.observe(1, &[]);
254 })
255 .build();
256
257 let _observable_gauge = test_context
258 .meter()
259 .f64_observable_gauge(name)
260 .with_callback(move |observer| {
261 observer.observe(1.0, &[]);
262 })
263 .build();
264
265 let _observable_up_down_counter = test_context
266 .meter()
267 .i64_observable_up_down_counter(name)
268 .with_callback(move |observer| {
269 observer.observe(1, &[]);
270 })
271 .build();
272
273 test_context.flush_metrics();
274
275 let resource_metrics = test_context
277 .exporter
278 .get_finished_metrics()
279 .expect("metrics expected to be exported");
280
281 assert!(!resource_metrics.is_empty(), "metrics should be exported");
282 }
283
284 let invalid_bucket_boundaries = vec![
287 vec![1.0, 1.0], vec![1.0, 2.0, 3.0, 2.0], vec![1.0, 2.0, 3.0, 4.0, 2.5], vec![1.0, 2.0, 3.0, f64::INFINITY, 4.0], vec![1.0, 2.0, 3.0, f64::NAN], vec![f64::NEG_INFINITY, 2.0, 3.0], ];
294 for bucket_boundaries in invalid_bucket_boundaries {
295 let test_context = TestContext::new(Temporality::Cumulative);
296 let histogram = test_context
297 .meter()
298 .f64_histogram("test")
299 .with_boundaries(bucket_boundaries)
300 .build();
301 histogram.record(1.9, &[]);
302 test_context.flush_metrics();
303
304 test_context.check_no_metrics();
307 }
308 }
309
310 #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
311 async fn counter_aggregation_delta() {
312 counter_aggregation_helper(Temporality::Delta);
315 }
316
317 #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
318 async fn counter_aggregation_cumulative() {
319 counter_aggregation_helper(Temporality::Cumulative);
322 }
323
324 #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
325 async fn counter_aggregation_no_attributes_cumulative() {
326 let mut test_context = TestContext::new(Temporality::Cumulative);
327 let counter = test_context.u64_counter("test", "my_counter", None);
328
329 counter.add(50, &[]);
330 test_context.flush_metrics();
331
332 let sum = test_context.get_aggregation::<Sum<u64>>("my_counter", None);
333
334 assert_eq!(sum.data_points.len(), 1, "Expected only one data point");
335 assert!(sum.is_monotonic, "Should produce monotonic.");
336 assert_eq!(
337 sum.temporality,
338 Temporality::Cumulative,
339 "Should produce cumulative"
340 );
341
342 let data_point = &sum.data_points[0];
343 assert!(data_point.attributes.is_empty(), "Non-empty attribute set");
344 assert_eq!(data_point.value, 50, "Unexpected data point value");
345 }
346
347 #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
348 async fn counter_aggregation_no_attributes_delta() {
349 let mut test_context = TestContext::new(Temporality::Delta);
350 let counter = test_context.u64_counter("test", "my_counter", None);
351
352 counter.add(50, &[]);
353 test_context.flush_metrics();
354
355 let sum = test_context.get_aggregation::<Sum<u64>>("my_counter", None);
356
357 assert_eq!(sum.data_points.len(), 1, "Expected only one data point");
358 assert!(sum.is_monotonic, "Should produce monotonic.");
359 assert_eq!(sum.temporality, Temporality::Delta, "Should produce delta");
360
361 let data_point = &sum.data_points[0];
362 assert!(data_point.attributes.is_empty(), "Non-empty attribute set");
363 assert_eq!(data_point.value, 50, "Unexpected data point value");
364 }
365
366 #[ignore = "https://github.com/open-telemetry/opentelemetry-rust/issues/1065"]
367 #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
368 async fn counter_aggregation_overflow_delta() {
369 counter_aggregation_overflow_helper(Temporality::Delta);
370 }
371
372 #[ignore = "https://github.com/open-telemetry/opentelemetry-rust/issues/1065"]
373 #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
374 async fn counter_aggregation_overflow_cumulative() {
375 counter_aggregation_overflow_helper(Temporality::Cumulative);
376 }
377
378 #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
379 async fn counter_aggregation_attribute_order_sorted_first_delta() {
380 counter_aggregation_attribute_order_helper(Temporality::Delta, true);
383 }
384
385 #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
386 async fn counter_aggregation_attribute_order_sorted_first_cumulative() {
387 counter_aggregation_attribute_order_helper(Temporality::Cumulative, true);
390 }
391
392 #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
393 async fn counter_aggregation_attribute_order_unsorted_first_delta() {
394 counter_aggregation_attribute_order_helper(Temporality::Delta, false);
398 }
399
400 #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
401 async fn counter_aggregation_attribute_order_unsorted_first_cumulative() {
402 counter_aggregation_attribute_order_helper(Temporality::Cumulative, false);
406 }
407
408 #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
409 async fn histogram_aggregation_cumulative() {
410 histogram_aggregation_helper(Temporality::Cumulative);
413 }
414
415 #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
416 async fn histogram_aggregation_delta() {
417 histogram_aggregation_helper(Temporality::Delta);
420 }
421
422 #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
423 async fn histogram_aggregation_with_custom_bounds() {
424 histogram_aggregation_with_custom_bounds_helper(Temporality::Delta);
427 histogram_aggregation_with_custom_bounds_helper(Temporality::Cumulative);
428 }
429
430 #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
431 async fn updown_counter_aggregation_cumulative() {
432 updown_counter_aggregation_helper(Temporality::Cumulative);
435 }
436
437 #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
438 async fn updown_counter_aggregation_delta() {
439 updown_counter_aggregation_helper(Temporality::Delta);
442 }
443
444 #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
445 async fn gauge_aggregation() {
446 gauge_aggregation_helper(Temporality::Delta);
451 gauge_aggregation_helper(Temporality::Cumulative);
452 }
453
454 #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
455 async fn observable_gauge_aggregation() {
456 observable_gauge_aggregation_helper(Temporality::Delta, false);
461 observable_gauge_aggregation_helper(Temporality::Delta, true);
462 observable_gauge_aggregation_helper(Temporality::Cumulative, false);
463 observable_gauge_aggregation_helper(Temporality::Cumulative, true);
464 }
465
466 #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
467 async fn observable_counter_aggregation_cumulative_non_zero_increment() {
468 observable_counter_aggregation_helper(Temporality::Cumulative, 100, 10, 4, false);
471 }
472
473 #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
474 async fn observable_counter_aggregation_cumulative_non_zero_increment_no_attrs() {
475 observable_counter_aggregation_helper(Temporality::Cumulative, 100, 10, 4, true);
478 }
479
480 #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
481 async fn observable_counter_aggregation_delta_non_zero_increment() {
482 observable_counter_aggregation_helper(Temporality::Delta, 100, 10, 4, false);
485 }
486
487 #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
488 async fn observable_counter_aggregation_delta_non_zero_increment_no_attrs() {
489 observable_counter_aggregation_helper(Temporality::Delta, 100, 10, 4, true);
492 }
493
494 #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
495 async fn observable_counter_aggregation_cumulative_zero_increment() {
496 observable_counter_aggregation_helper(Temporality::Cumulative, 100, 0, 4, false);
499 }
500
501 #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
502 async fn observable_counter_aggregation_cumulative_zero_increment_no_attrs() {
503 observable_counter_aggregation_helper(Temporality::Cumulative, 100, 0, 4, true);
506 }
507
508 #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
509 async fn observable_counter_aggregation_delta_zero_increment() {
510 observable_counter_aggregation_helper(Temporality::Delta, 100, 0, 4, false);
513 }
514
515 #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
516 async fn observable_counter_aggregation_delta_zero_increment_no_attrs() {
517 observable_counter_aggregation_helper(Temporality::Delta, 100, 0, 4, true);
520 }
521
522 fn observable_counter_aggregation_helper(
523 temporality: Temporality,
524 start: u64,
525 increment: u64,
526 length: u64,
527 is_empty_attributes: bool,
528 ) {
529 let mut test_context = TestContext::new(temporality);
531 let attributes = if is_empty_attributes {
532 vec![]
533 } else {
534 vec![KeyValue::new("key1", "value1")]
535 };
536 let values: Vec<u64> = (0..length).map(|i| start + i * increment).collect();
538 println!("Testing with observable values: {:?}", values);
539 let values = Arc::new(values);
540 let values_clone = values.clone();
541 let i = Arc::new(Mutex::new(0));
542 let _observable_counter = test_context
543 .meter()
544 .u64_observable_counter("my_observable_counter")
545 .with_unit("my_unit")
546 .with_callback(move |observer| {
547 let mut index = i.lock().unwrap();
548 if *index < values.len() {
549 observer.observe(values[*index], &attributes);
550 *index += 1;
551 }
552 })
553 .build();
554
555 for (iter, v) in values_clone.iter().enumerate() {
556 test_context.flush_metrics();
557 let sum = test_context.get_aggregation::<Sum<u64>>("my_observable_counter", None);
558 assert_eq!(sum.data_points.len(), 1);
559 assert!(sum.is_monotonic, "Counter should produce monotonic.");
560 if let Temporality::Cumulative = temporality {
561 assert_eq!(
562 sum.temporality,
563 Temporality::Cumulative,
564 "Should produce cumulative"
565 );
566 } else {
567 assert_eq!(sum.temporality, Temporality::Delta, "Should produce delta");
568 }
569
570 let data_point = if is_empty_attributes {
572 &sum.data_points[0]
573 } else {
574 find_sum_datapoint_with_key_value(&sum.data_points, "key1", "value1")
575 .expect("datapoint with key1=value1 expected")
576 };
577
578 if let Temporality::Cumulative = temporality {
579 assert_eq!(data_point.value, *v);
581 } else {
582 if iter == 0 {
585 assert_eq!(data_point.value, start);
586 } else {
587 assert_eq!(data_point.value, increment);
588 }
589 }
590
591 test_context.reset_metrics();
592 }
593 }
594
595 #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
596 async fn empty_meter_name_retained() {
597 async fn meter_name_retained_helper(
598 meter: Meter,
599 provider: SdkMeterProvider,
600 exporter: InMemoryMetricExporter,
601 ) {
602 let counter = meter.u64_counter("my_counter").build();
604
605 counter.add(10, &[]);
606 provider.force_flush().unwrap();
607
608 let resource_metrics = exporter
610 .get_finished_metrics()
611 .expect("metrics are expected to be exported.");
612 assert!(
613 resource_metrics[0].scope_metrics[0].metrics.len() == 1,
614 "There should be a single metric"
615 );
616 let meter_name = resource_metrics[0].scope_metrics[0].scope.name();
617 assert_eq!(meter_name, "");
618 }
619
620 let exporter = InMemoryMetricExporter::default();
621 let meter_provider = SdkMeterProvider::builder()
622 .with_periodic_exporter(exporter.clone())
623 .build();
624
625 let meter1 = meter_provider.meter("");
627 meter_name_retained_helper(meter1, meter_provider.clone(), exporter.clone()).await;
628
629 let meter_scope = InstrumentationScope::builder("").build();
630 let meter2 = meter_provider.meter_with_scope(meter_scope);
631 meter_name_retained_helper(meter2, meter_provider, exporter).await;
632 }
633
634 #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
635 async fn counter_duplicate_instrument_merge() {
636 let exporter = InMemoryMetricExporter::default();
638 let meter_provider = SdkMeterProvider::builder()
639 .with_periodic_exporter(exporter.clone())
640 .build();
641
642 let meter = meter_provider.meter("test");
644 let counter = meter
645 .u64_counter("my_counter")
646 .with_unit("my_unit")
647 .with_description("my_description")
648 .build();
649
650 let counter_duplicated = meter
651 .u64_counter("my_counter")
652 .with_unit("my_unit")
653 .with_description("my_description")
654 .build();
655
656 let attribute = vec![KeyValue::new("key1", "value1")];
657 counter.add(10, &attribute);
658 counter_duplicated.add(5, &attribute);
659
660 meter_provider.force_flush().unwrap();
661
662 let resource_metrics = exporter
664 .get_finished_metrics()
665 .expect("metrics are expected to be exported.");
666 assert!(
667 resource_metrics[0].scope_metrics[0].metrics.len() == 1,
668 "There should be single metric merging duplicate instruments"
669 );
670 let metric = &resource_metrics[0].scope_metrics[0].metrics[0];
671 assert_eq!(metric.name, "my_counter");
672 assert_eq!(metric.unit, "my_unit");
673 let sum = metric
674 .data
675 .as_any()
676 .downcast_ref::<Sum<u64>>()
677 .expect("Sum aggregation expected for Counter instruments by default");
678
679 assert_eq!(sum.data_points.len(), 1);
681
682 let datapoint = &sum.data_points[0];
683 assert_eq!(datapoint.value, 15);
684 }
685
686 #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
687 async fn counter_duplicate_instrument_different_meter_no_merge() {
688 let exporter = InMemoryMetricExporter::default();
690 let meter_provider = SdkMeterProvider::builder()
691 .with_periodic_exporter(exporter.clone())
692 .build();
693
694 let meter1 = meter_provider.meter("test.meter1");
696 let meter2 = meter_provider.meter("test.meter2");
697 let counter1 = meter1
698 .u64_counter("my_counter")
699 .with_unit("my_unit")
700 .with_description("my_description")
701 .build();
702
703 let counter2 = meter2
704 .u64_counter("my_counter")
705 .with_unit("my_unit")
706 .with_description("my_description")
707 .build();
708
709 let attribute = vec![KeyValue::new("key1", "value1")];
710 counter1.add(10, &attribute);
711 counter2.add(5, &attribute);
712
713 meter_provider.force_flush().unwrap();
714
715 let resource_metrics = exporter
717 .get_finished_metrics()
718 .expect("metrics are expected to be exported.");
719 assert!(
720 resource_metrics[0].scope_metrics.len() == 2,
721 "There should be 2 separate scope"
722 );
723 assert!(
724 resource_metrics[0].scope_metrics[0].metrics.len() == 1,
725 "There should be single metric for the scope"
726 );
727 assert!(
728 resource_metrics[0].scope_metrics[1].metrics.len() == 1,
729 "There should be single metric for the scope"
730 );
731
732 let scope1 = find_scope_metric(&resource_metrics[0].scope_metrics, "test.meter1");
733 let scope2 = find_scope_metric(&resource_metrics[0].scope_metrics, "test.meter2");
734
735 if let Some(scope1) = scope1 {
736 let metric1 = &scope1.metrics[0];
737 assert_eq!(metric1.name, "my_counter");
738 assert_eq!(metric1.unit, "my_unit");
739 assert_eq!(metric1.description, "my_description");
740 let sum1 = metric1
741 .data
742 .as_any()
743 .downcast_ref::<Sum<u64>>()
744 .expect("Sum aggregation expected for Counter instruments by default");
745
746 assert_eq!(sum1.data_points.len(), 1);
748
749 let datapoint1 = &sum1.data_points[0];
750 assert_eq!(datapoint1.value, 10);
751 } else {
752 panic!("No MetricScope found for 'test.meter1'");
753 }
754
755 if let Some(scope2) = scope2 {
756 let metric2 = &scope2.metrics[0];
757 assert_eq!(metric2.name, "my_counter");
758 assert_eq!(metric2.unit, "my_unit");
759 assert_eq!(metric2.description, "my_description");
760 let sum2 = metric2
761 .data
762 .as_any()
763 .downcast_ref::<Sum<u64>>()
764 .expect("Sum aggregation expected for Counter instruments by default");
765
766 assert_eq!(sum2.data_points.len(), 1);
768
769 let datapoint2 = &sum2.data_points[0];
770 assert_eq!(datapoint2.value, 5);
771 } else {
772 panic!("No MetricScope found for 'test.meter2'");
773 }
774 }
775
776 #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
777 async fn instrumentation_scope_identity_test() {
778 let exporter = InMemoryMetricExporter::default();
780 let meter_provider = SdkMeterProvider::builder()
781 .with_periodic_exporter(exporter.clone())
782 .build();
783
784 let make_scope = |attributes| {
788 InstrumentationScope::builder("test.meter")
789 .with_version("v0.1.0")
790 .with_schema_url("http://example.com")
791 .with_attributes(attributes)
792 .build()
793 };
794
795 let meter1 =
796 meter_provider.meter_with_scope(make_scope(vec![KeyValue::new("key", "value1")]));
797 let meter2 =
798 meter_provider.meter_with_scope(make_scope(vec![KeyValue::new("key", "value2")]));
799
800 let counter1 = meter1
801 .u64_counter("my_counter")
802 .with_unit("my_unit")
803 .with_description("my_description")
804 .build();
805
806 let counter2 = meter2
807 .u64_counter("my_counter")
808 .with_unit("my_unit")
809 .with_description("my_description")
810 .build();
811
812 let attribute = vec![KeyValue::new("key1", "value1")];
813 counter1.add(10, &attribute);
814 counter2.add(5, &attribute);
815
816 meter_provider.force_flush().unwrap();
817
818 let resource_metrics = exporter
820 .get_finished_metrics()
821 .expect("metrics are expected to be exported.");
822 println!("resource_metrics: {:?}", resource_metrics);
823 assert!(
824 resource_metrics[0].scope_metrics.len() == 1,
825 "There should be a single scope as the meters are identical"
826 );
827 assert!(
828 resource_metrics[0].scope_metrics[0].metrics.len() == 1,
829 "There should be single metric for the scope as instruments are identical"
830 );
831
832 let scope = &resource_metrics[0].scope_metrics[0].scope;
833 assert_eq!(scope.name(), "test.meter");
834 assert_eq!(scope.version(), Some("v0.1.0"));
835 assert_eq!(scope.schema_url(), Some("http://example.com"));
836
837 assert!(scope.attributes().eq(&[KeyValue::new("key", "value1")]));
840
841 let metric = &resource_metrics[0].scope_metrics[0].metrics[0];
842 assert_eq!(metric.name, "my_counter");
843 assert_eq!(metric.unit, "my_unit");
844 assert_eq!(metric.description, "my_description");
845 let sum = metric
846 .data
847 .as_any()
848 .downcast_ref::<Sum<u64>>()
849 .expect("Sum aggregation expected for Counter instruments by default");
850
851 assert_eq!(sum.data_points.len(), 1);
853
854 let datapoint = &sum.data_points[0];
855 assert_eq!(datapoint.value, 15);
856 }
857
858 #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
859 async fn histogram_aggregation_with_invalid_aggregation_should_proceed_as_if_view_not_exist() {
860 let exporter = InMemoryMetricExporter::default();
865 let criteria = Instrument::new().name("test_histogram");
866 let stream_invalid_aggregation = Stream::new()
867 .aggregation(aggregation::Aggregation::ExplicitBucketHistogram {
868 boundaries: vec![0.9, 1.9, 1.2, 1.3, 1.4, 1.5], record_min_max: false,
870 })
871 .name("test_histogram_renamed")
872 .unit("test_unit_renamed");
873
874 let view =
875 new_view(criteria, stream_invalid_aggregation).expect("Expected to create a new view");
876 let meter_provider = SdkMeterProvider::builder()
877 .with_periodic_exporter(exporter.clone())
878 .with_view(view)
879 .build();
880
881 let meter = meter_provider.meter("test");
883 let histogram = meter
884 .f64_histogram("test_histogram")
885 .with_unit("test_unit")
886 .build();
887
888 histogram.record(1.5, &[KeyValue::new("key1", "value1")]);
889 meter_provider.force_flush().unwrap();
890
891 let resource_metrics = exporter
893 .get_finished_metrics()
894 .expect("metrics are expected to be exported.");
895 assert!(!resource_metrics.is_empty());
896 let metric = &resource_metrics[0].scope_metrics[0].metrics[0];
897 assert_eq!(
898 metric.name, "test_histogram",
899 "View rename should be ignored and original name retained."
900 );
901 assert_eq!(
902 metric.unit, "test_unit",
903 "View rename of unit should be ignored and original unit retained."
904 );
905 }
906
907 #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
908 #[ignore = "Spatial aggregation is not yet implemented."]
909 async fn spatial_aggregation_when_view_drops_attributes_observable_counter() {
910 let exporter = InMemoryMetricExporter::default();
914 let criteria = Instrument::new().name("my_observable_counter");
915 let stream_invalid_aggregation = Stream::new().allowed_attribute_keys(vec![]);
917
918 let view =
919 new_view(criteria, stream_invalid_aggregation).expect("Expected to create a new view");
920 let meter_provider = SdkMeterProvider::builder()
921 .with_periodic_exporter(exporter.clone())
922 .with_view(view)
923 .build();
924
925 let meter = meter_provider.meter("test");
927 let _observable_counter = meter
928 .u64_observable_counter("my_observable_counter")
929 .with_callback(|observer| {
930 observer.observe(
931 100,
932 &[
933 KeyValue::new("statusCode", "200"),
934 KeyValue::new("verb", "get"),
935 ],
936 );
937
938 observer.observe(
939 100,
940 &[
941 KeyValue::new("statusCode", "200"),
942 KeyValue::new("verb", "post"),
943 ],
944 );
945
946 observer.observe(
947 100,
948 &[
949 KeyValue::new("statusCode", "500"),
950 KeyValue::new("verb", "get"),
951 ],
952 );
953 })
954 .build();
955
956 meter_provider.force_flush().unwrap();
957
958 let resource_metrics = exporter
960 .get_finished_metrics()
961 .expect("metrics are expected to be exported.");
962 assert!(!resource_metrics.is_empty());
963 let metric = &resource_metrics[0].scope_metrics[0].metrics[0];
964 assert_eq!(metric.name, "my_observable_counter",);
965
966 let sum = metric
967 .data
968 .as_any()
969 .downcast_ref::<Sum<u64>>()
970 .expect("Sum aggregation expected for ObservableCounter instruments by default");
971
972 assert_eq!(sum.data_points.len(), 1);
976
977 let data_point = &sum.data_points[0];
979 assert_eq!(data_point.value, 300);
980 }
981
982 #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
983 async fn spatial_aggregation_when_view_drops_attributes_counter() {
984 let exporter = InMemoryMetricExporter::default();
988 let criteria = Instrument::new().name("my_counter");
989 let stream_invalid_aggregation = Stream::new().allowed_attribute_keys(vec![]);
991
992 let view =
993 new_view(criteria, stream_invalid_aggregation).expect("Expected to create a new view");
994 let meter_provider = SdkMeterProvider::builder()
995 .with_periodic_exporter(exporter.clone())
996 .with_view(view)
997 .build();
998
999 let meter = meter_provider.meter("test");
1001 let counter = meter.u64_counter("my_counter").build();
1002
1003 counter.add(
1006 10,
1007 [
1008 KeyValue::new("statusCode", "200"),
1009 KeyValue::new("verb", "Get"),
1010 ]
1011 .as_ref(),
1012 );
1013
1014 counter.add(
1015 10,
1016 [
1017 KeyValue::new("statusCode", "500"),
1018 KeyValue::new("verb", "Get"),
1019 ]
1020 .as_ref(),
1021 );
1022
1023 counter.add(
1024 10,
1025 [
1026 KeyValue::new("statusCode", "200"),
1027 KeyValue::new("verb", "Post"),
1028 ]
1029 .as_ref(),
1030 );
1031
1032 meter_provider.force_flush().unwrap();
1033
1034 let resource_metrics = exporter
1036 .get_finished_metrics()
1037 .expect("metrics are expected to be exported.");
1038 assert!(!resource_metrics.is_empty());
1039 let metric = &resource_metrics[0].scope_metrics[0].metrics[0];
1040 assert_eq!(metric.name, "my_counter",);
1041
1042 let sum = metric
1043 .data
1044 .as_any()
1045 .downcast_ref::<Sum<u64>>()
1046 .expect("Sum aggregation expected for Counter instruments by default");
1047
1048 assert_eq!(sum.data_points.len(), 1);
1052 let data_point = &sum.data_points[0];
1054 assert_eq!(data_point.value, 30);
1055 }
1056
1057 #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
1058 async fn no_attr_cumulative_up_down_counter() {
1059 let mut test_context = TestContext::new(Temporality::Cumulative);
1060 let counter = test_context.i64_up_down_counter("test", "my_counter", Some("my_unit"));
1061
1062 counter.add(50, &[]);
1063 test_context.flush_metrics();
1064
1065 let sum = test_context.get_aggregation::<Sum<i64>>("my_counter", Some("my_unit"));
1066
1067 assert_eq!(sum.data_points.len(), 1, "Expected only one data point");
1068 assert!(!sum.is_monotonic, "Should not produce monotonic.");
1069 assert_eq!(
1070 sum.temporality,
1071 Temporality::Cumulative,
1072 "Should produce cumulative"
1073 );
1074
1075 let data_point = &sum.data_points[0];
1076 assert!(data_point.attributes.is_empty(), "Non-empty attribute set");
1077 assert_eq!(data_point.value, 50, "Unexpected data point value");
1078 }
1079
1080 #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
1081 async fn no_attr_up_down_counter_always_cumulative() {
1082 let mut test_context = TestContext::new(Temporality::Delta);
1083 let counter = test_context.i64_up_down_counter("test", "my_counter", Some("my_unit"));
1084
1085 counter.add(50, &[]);
1086 test_context.flush_metrics();
1087
1088 let sum = test_context.get_aggregation::<Sum<i64>>("my_counter", Some("my_unit"));
1089
1090 assert_eq!(sum.data_points.len(), 1, "Expected only one data point");
1091 assert!(!sum.is_monotonic, "Should not produce monotonic.");
1092 assert_eq!(
1093 sum.temporality,
1094 Temporality::Cumulative,
1095 "Should produce Cumulative due to UpDownCounter temporality_preference"
1096 );
1097
1098 let data_point = &sum.data_points[0];
1099 assert!(data_point.attributes.is_empty(), "Non-empty attribute set");
1100 assert_eq!(data_point.value, 50, "Unexpected data point value");
1101 }
1102
1103 #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
1104 async fn no_attr_cumulative_counter_value_added_after_export() {
1105 let mut test_context = TestContext::new(Temporality::Cumulative);
1106 let counter = test_context.u64_counter("test", "my_counter", None);
1107
1108 counter.add(50, &[]);
1109 test_context.flush_metrics();
1110 let _ = test_context.get_aggregation::<Sum<u64>>("my_counter", None);
1111 test_context.reset_metrics();
1112
1113 counter.add(5, &[]);
1114 test_context.flush_metrics();
1115 let sum = test_context.get_aggregation::<Sum<u64>>("my_counter", None);
1116
1117 assert_eq!(sum.data_points.len(), 1, "Expected only one data point");
1118 assert!(sum.is_monotonic, "Should produce monotonic.");
1119 assert_eq!(
1120 sum.temporality,
1121 Temporality::Cumulative,
1122 "Should produce cumulative"
1123 );
1124
1125 let data_point = &sum.data_points[0];
1126 assert!(data_point.attributes.is_empty(), "Non-empty attribute set");
1127 assert_eq!(data_point.value, 55, "Unexpected data point value");
1128 }
1129
1130 #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
1131 async fn no_attr_delta_counter_value_reset_after_export() {
1132 let mut test_context = TestContext::new(Temporality::Delta);
1133 let counter = test_context.u64_counter("test", "my_counter", None);
1134
1135 counter.add(50, &[]);
1136 test_context.flush_metrics();
1137 let _ = test_context.get_aggregation::<Sum<u64>>("my_counter", None);
1138 test_context.reset_metrics();
1139
1140 counter.add(5, &[]);
1141 test_context.flush_metrics();
1142 let sum = test_context.get_aggregation::<Sum<u64>>("my_counter", None);
1143
1144 assert_eq!(sum.data_points.len(), 1, "Expected only one data point");
1145 assert!(sum.is_monotonic, "Should produce monotonic.");
1146 assert_eq!(
1147 sum.temporality,
1148 Temporality::Delta,
1149 "Should produce cumulative"
1150 );
1151
1152 let data_point = &sum.data_points[0];
1153 assert!(data_point.attributes.is_empty(), "Non-empty attribute set");
1154 assert_eq!(data_point.value, 5, "Unexpected data point value");
1155 }
1156
1157 #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
1158 async fn second_delta_export_does_not_give_no_attr_value_if_add_not_called() {
1159 let mut test_context = TestContext::new(Temporality::Delta);
1160 let counter = test_context.u64_counter("test", "my_counter", None);
1161
1162 counter.add(50, &[]);
1163 test_context.flush_metrics();
1164 let _ = test_context.get_aggregation::<Sum<u64>>("my_counter", None);
1165 test_context.reset_metrics();
1166
1167 counter.add(50, &[KeyValue::new("a", "b")]);
1168 test_context.flush_metrics();
1169 let sum = test_context.get_aggregation::<Sum<u64>>("my_counter", None);
1170
1171 let no_attr_data_point = sum.data_points.iter().find(|x| x.attributes.is_empty());
1172
1173 assert!(
1174 no_attr_data_point.is_none(),
1175 "Expected no data points with no attributes"
1176 );
1177 }
1178
1179 #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
1180 async fn delta_memory_efficiency_test() {
1181 let mut test_context = TestContext::new(Temporality::Delta);
1186 let counter = test_context.u64_counter("test", "my_counter", None);
1187
1188 counter.add(1, &[KeyValue::new("key1", "value1")]);
1190 counter.add(1, &[KeyValue::new("key1", "value1")]);
1191 counter.add(1, &[KeyValue::new("key1", "value1")]);
1192 counter.add(1, &[KeyValue::new("key1", "value1")]);
1193 counter.add(1, &[KeyValue::new("key1", "value1")]);
1194
1195 counter.add(1, &[KeyValue::new("key1", "value2")]);
1196 counter.add(1, &[KeyValue::new("key1", "value2")]);
1197 counter.add(1, &[KeyValue::new("key1", "value2")]);
1198 test_context.flush_metrics();
1199
1200 let sum = test_context.get_aggregation::<Sum<u64>>("my_counter", None);
1201
1202 assert_eq!(sum.data_points.len(), 2);
1204
1205 let data_point1 = find_sum_datapoint_with_key_value(&sum.data_points, "key1", "value1")
1207 .expect("datapoint with key1=value1 expected");
1208 assert_eq!(data_point1.value, 5);
1209
1210 let data_point1 = find_sum_datapoint_with_key_value(&sum.data_points, "key1", "value2")
1212 .expect("datapoint with key1=value2 expected");
1213 assert_eq!(data_point1.value, 3);
1214
1215 test_context.exporter.reset();
1216 test_context.flush_metrics();
1219
1220 let resource_metrics = test_context
1221 .exporter
1222 .get_finished_metrics()
1223 .expect("metrics are expected to be exported.");
1224 println!("resource_metrics: {:?}", resource_metrics);
1225 assert!(resource_metrics.is_empty(), "No metrics should be exported as no new measurements were recorded since last collect.");
1226 }
1227
1228 #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
1229 async fn counter_multithreaded() {
1230 counter_multithreaded_aggregation_helper(Temporality::Delta);
1234 counter_multithreaded_aggregation_helper(Temporality::Cumulative);
1235 }
1236
1237 #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
1238 async fn counter_f64_multithreaded() {
1239 counter_f64_multithreaded_aggregation_helper(Temporality::Delta);
1243 counter_f64_multithreaded_aggregation_helper(Temporality::Cumulative);
1244 }
1245
1246 #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
1247 async fn histogram_multithreaded() {
1248 histogram_multithreaded_aggregation_helper(Temporality::Delta);
1252 histogram_multithreaded_aggregation_helper(Temporality::Cumulative);
1253 }
1254
1255 #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
1256 async fn histogram_f64_multithreaded() {
1257 histogram_f64_multithreaded_aggregation_helper(Temporality::Delta);
1261 histogram_f64_multithreaded_aggregation_helper(Temporality::Cumulative);
1262 }
1263 #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
1264 async fn synchronous_instruments_cumulative_with_gap_in_measurements() {
1265 synchronous_instruments_cumulative_with_gap_in_measurements_helper("counter");
1269 synchronous_instruments_cumulative_with_gap_in_measurements_helper("updown_counter");
1270 synchronous_instruments_cumulative_with_gap_in_measurements_helper("histogram");
1271 synchronous_instruments_cumulative_with_gap_in_measurements_helper("gauge");
1272 }
1273
1274 fn synchronous_instruments_cumulative_with_gap_in_measurements_helper(
1275 instrument_name: &'static str,
1276 ) {
1277 let mut test_context = TestContext::new(Temporality::Cumulative);
1278 let attributes = &[KeyValue::new("key1", "value1")];
1279
1280 match instrument_name {
1282 "counter" => {
1283 let counter = test_context.meter().u64_counter("test_counter").build();
1284 counter.add(5, &[]);
1285 counter.add(10, attributes);
1286 }
1287 "updown_counter" => {
1288 let updown_counter = test_context
1289 .meter()
1290 .i64_up_down_counter("test_updowncounter")
1291 .build();
1292 updown_counter.add(15, &[]);
1293 updown_counter.add(20, attributes);
1294 }
1295 "histogram" => {
1296 let histogram = test_context.meter().u64_histogram("test_histogram").build();
1297 histogram.record(25, &[]);
1298 histogram.record(30, attributes);
1299 }
1300 "gauge" => {
1301 let gauge = test_context.meter().u64_gauge("test_gauge").build();
1302 gauge.record(35, &[]);
1303 gauge.record(40, attributes);
1304 }
1305 _ => panic!("Incorrect instrument kind provided"),
1306 };
1307
1308 test_context.flush_metrics();
1309
1310 assert_correct_export(&mut test_context, instrument_name);
1312
1313 test_context.reset_metrics();
1315
1316 test_context.flush_metrics();
1317
1318 assert_correct_export(&mut test_context, instrument_name);
1320
1321 fn assert_correct_export(test_context: &mut TestContext, instrument_name: &'static str) {
1322 match instrument_name {
1323 "counter" => {
1324 let counter_data =
1325 test_context.get_aggregation::<Sum<u64>>("test_counter", None);
1326 assert_eq!(counter_data.data_points.len(), 2);
1327 let zero_attribute_datapoint =
1328 find_sum_datapoint_with_no_attributes(&counter_data.data_points)
1329 .expect("datapoint with no attributes expected");
1330 assert_eq!(zero_attribute_datapoint.value, 5);
1331 let data_point1 = find_sum_datapoint_with_key_value(
1332 &counter_data.data_points,
1333 "key1",
1334 "value1",
1335 )
1336 .expect("datapoint with key1=value1 expected");
1337 assert_eq!(data_point1.value, 10);
1338 }
1339 "updown_counter" => {
1340 let updown_counter_data =
1341 test_context.get_aggregation::<Sum<i64>>("test_updowncounter", None);
1342 assert_eq!(updown_counter_data.data_points.len(), 2);
1343 let zero_attribute_datapoint =
1344 find_sum_datapoint_with_no_attributes(&updown_counter_data.data_points)
1345 .expect("datapoint with no attributes expected");
1346 assert_eq!(zero_attribute_datapoint.value, 15);
1347 let data_point1 = find_sum_datapoint_with_key_value(
1348 &updown_counter_data.data_points,
1349 "key1",
1350 "value1",
1351 )
1352 .expect("datapoint with key1=value1 expected");
1353 assert_eq!(data_point1.value, 20);
1354 }
1355 "histogram" => {
1356 let histogram_data =
1357 test_context.get_aggregation::<Histogram<u64>>("test_histogram", None);
1358 assert_eq!(histogram_data.data_points.len(), 2);
1359 let zero_attribute_datapoint =
1360 find_histogram_datapoint_with_no_attributes(&histogram_data.data_points)
1361 .expect("datapoint with no attributes expected");
1362 assert_eq!(zero_attribute_datapoint.count, 1);
1363 assert_eq!(zero_attribute_datapoint.sum, 25);
1364 assert_eq!(zero_attribute_datapoint.min, Some(25));
1365 assert_eq!(zero_attribute_datapoint.max, Some(25));
1366 let data_point1 = find_histogram_datapoint_with_key_value(
1367 &histogram_data.data_points,
1368 "key1",
1369 "value1",
1370 )
1371 .expect("datapoint with key1=value1 expected");
1372 assert_eq!(data_point1.count, 1);
1373 assert_eq!(data_point1.sum, 30);
1374 assert_eq!(data_point1.min, Some(30));
1375 assert_eq!(data_point1.max, Some(30));
1376 }
1377 "gauge" => {
1378 let gauge_data = test_context.get_aggregation::<Gauge<u64>>("test_gauge", None);
1379 assert_eq!(gauge_data.data_points.len(), 2);
1380 let zero_attribute_datapoint =
1381 find_gauge_datapoint_with_no_attributes(&gauge_data.data_points)
1382 .expect("datapoint with no attributes expected");
1383 assert_eq!(zero_attribute_datapoint.value, 35);
1384 let data_point1 = find_gauge_datapoint_with_key_value(
1385 &gauge_data.data_points,
1386 "key1",
1387 "value1",
1388 )
1389 .expect("datapoint with key1=value1 expected");
1390 assert_eq!(data_point1.value, 40);
1391 }
1392 _ => panic!("Incorrect instrument kind provided"),
1393 }
1394 }
1395 }
1396
1397 #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
1398 async fn asynchronous_instruments_cumulative_data_points_only_from_last_measurement() {
1399 asynchronous_instruments_cumulative_data_points_only_from_last_measurement_helper(
1403 "gauge", true,
1404 );
1405 asynchronous_instruments_cumulative_data_points_only_from_last_measurement_helper(
1408 "counter", false,
1409 );
1410 asynchronous_instruments_cumulative_data_points_only_from_last_measurement_helper(
1411 "updown_counter",
1412 false,
1413 );
1414 }
1415
1416 fn asynchronous_instruments_cumulative_data_points_only_from_last_measurement_helper(
1417 instrument_name: &'static str,
1418 should_not_emit: bool,
1419 ) {
1420 let mut test_context = TestContext::new(Temporality::Cumulative);
1421 let attributes = Arc::new([KeyValue::new("key1", "value1")]);
1422
1423 match instrument_name {
1425 "counter" => {
1426 let has_run = AtomicBool::new(false);
1427 let _observable_counter = test_context
1428 .meter()
1429 .u64_observable_counter("test_counter")
1430 .with_callback(move |observer| {
1431 if !has_run.load(Ordering::SeqCst) {
1432 observer.observe(5, &[]);
1433 observer.observe(10, &*attributes.clone());
1434 has_run.store(true, Ordering::SeqCst);
1435 }
1436 })
1437 .build();
1438 }
1439 "updown_counter" => {
1440 let has_run = AtomicBool::new(false);
1441 let _observable_up_down_counter = test_context
1442 .meter()
1443 .i64_observable_up_down_counter("test_updowncounter")
1444 .with_callback(move |observer| {
1445 if !has_run.load(Ordering::SeqCst) {
1446 observer.observe(15, &[]);
1447 observer.observe(20, &*attributes.clone());
1448 has_run.store(true, Ordering::SeqCst);
1449 }
1450 })
1451 .build();
1452 }
1453 "gauge" => {
1454 let has_run = AtomicBool::new(false);
1455 let _observable_gauge = test_context
1456 .meter()
1457 .u64_observable_gauge("test_gauge")
1458 .with_callback(move |observer| {
1459 if !has_run.load(Ordering::SeqCst) {
1460 observer.observe(25, &[]);
1461 observer.observe(30, &*attributes.clone());
1462 has_run.store(true, Ordering::SeqCst);
1463 }
1464 })
1465 .build();
1466 }
1467 _ => panic!("Incorrect instrument kind provided"),
1468 };
1469
1470 test_context.flush_metrics();
1471
1472 assert_correct_export(&mut test_context, instrument_name);
1474
1475 test_context.reset_metrics();
1477
1478 test_context.flush_metrics();
1479
1480 if should_not_emit {
1481 test_context.check_no_metrics();
1482 } else {
1483 assert_correct_export(&mut test_context, instrument_name);
1485 }
1486
1487 fn assert_correct_export(test_context: &mut TestContext, instrument_name: &'static str) {
1488 match instrument_name {
1489 "counter" => {
1490 let counter_data =
1491 test_context.get_aggregation::<Sum<u64>>("test_counter", None);
1492 assert_eq!(counter_data.data_points.len(), 2);
1493 assert!(counter_data.is_monotonic);
1494 let zero_attribute_datapoint =
1495 find_sum_datapoint_with_no_attributes(&counter_data.data_points)
1496 .expect("datapoint with no attributes expected");
1497 assert_eq!(zero_attribute_datapoint.value, 5);
1498 let data_point1 = find_sum_datapoint_with_key_value(
1499 &counter_data.data_points,
1500 "key1",
1501 "value1",
1502 )
1503 .expect("datapoint with key1=value1 expected");
1504 assert_eq!(data_point1.value, 10);
1505 }
1506 "updown_counter" => {
1507 let updown_counter_data =
1508 test_context.get_aggregation::<Sum<i64>>("test_updowncounter", None);
1509 assert_eq!(updown_counter_data.data_points.len(), 2);
1510 assert!(!updown_counter_data.is_monotonic);
1511 let zero_attribute_datapoint =
1512 find_sum_datapoint_with_no_attributes(&updown_counter_data.data_points)
1513 .expect("datapoint with no attributes expected");
1514 assert_eq!(zero_attribute_datapoint.value, 15);
1515 let data_point1 = find_sum_datapoint_with_key_value(
1516 &updown_counter_data.data_points,
1517 "key1",
1518 "value1",
1519 )
1520 .expect("datapoint with key1=value1 expected");
1521 assert_eq!(data_point1.value, 20);
1522 }
1523 "gauge" => {
1524 let gauge_data = test_context.get_aggregation::<Gauge<u64>>("test_gauge", None);
1525 assert_eq!(gauge_data.data_points.len(), 2);
1526 let zero_attribute_datapoint =
1527 find_gauge_datapoint_with_no_attributes(&gauge_data.data_points)
1528 .expect("datapoint with no attributes expected");
1529 assert_eq!(zero_attribute_datapoint.value, 25);
1530 let data_point1 = find_gauge_datapoint_with_key_value(
1531 &gauge_data.data_points,
1532 "key1",
1533 "value1",
1534 )
1535 .expect("datapoint with key1=value1 expected");
1536 assert_eq!(data_point1.value, 30);
1537 }
1538 _ => panic!("Incorrect instrument kind provided"),
1539 }
1540 }
1541 }
1542
1543 fn counter_multithreaded_aggregation_helper(temporality: Temporality) {
1544 let mut test_context = TestContext::new(temporality);
1546 let counter = Arc::new(test_context.u64_counter("test", "my_counter", None));
1547
1548 for i in 0..10 {
1549 thread::scope(|s| {
1550 s.spawn(|| {
1551 counter.add(1, &[]);
1552
1553 counter.add(1, &[KeyValue::new("key1", "value1")]);
1554 counter.add(1, &[KeyValue::new("key1", "value1")]);
1555 counter.add(1, &[KeyValue::new("key1", "value1")]);
1556
1557 if i % 2 == 0 {
1559 test_context.flush_metrics();
1560 thread::sleep(Duration::from_millis(i)); }
1562
1563 counter.add(1, &[KeyValue::new("key1", "value1")]);
1564 counter.add(1, &[KeyValue::new("key1", "value1")]);
1565 });
1566 });
1567 }
1568
1569 test_context.flush_metrics();
1570
1571 let sums = test_context.get_from_multiple_aggregations::<Sum<u64>>("my_counter", None, 6);
1574
1575 let mut sum_zero_attributes = 0;
1576 let mut sum_key1_value1 = 0;
1577 sums.iter().for_each(|sum| {
1578 assert_eq!(sum.data_points.len(), 2); assert!(sum.is_monotonic, "Counter should produce monotonic.");
1580 assert_eq!(sum.temporality, temporality);
1581
1582 if temporality == Temporality::Delta {
1583 sum_zero_attributes += sum.data_points[0].value;
1584 sum_key1_value1 += sum.data_points[1].value;
1585 } else {
1586 sum_zero_attributes = sum.data_points[0].value;
1587 sum_key1_value1 = sum.data_points[1].value;
1588 };
1589 });
1590
1591 assert_eq!(sum_zero_attributes, 10);
1592 assert_eq!(sum_key1_value1, 50); }
1594
1595 fn counter_f64_multithreaded_aggregation_helper(temporality: Temporality) {
1596 let mut test_context = TestContext::new(temporality);
1598 let counter = Arc::new(test_context.meter().f64_counter("test_counter").build());
1599
1600 for i in 0..10 {
1601 thread::scope(|s| {
1602 s.spawn(|| {
1603 counter.add(1.23, &[]);
1604
1605 counter.add(1.23, &[KeyValue::new("key1", "value1")]);
1606 counter.add(1.23, &[KeyValue::new("key1", "value1")]);
1607 counter.add(1.23, &[KeyValue::new("key1", "value1")]);
1608
1609 if i % 2 == 0 {
1611 test_context.flush_metrics();
1612 thread::sleep(Duration::from_millis(i)); }
1614
1615 counter.add(1.23, &[KeyValue::new("key1", "value1")]);
1616 counter.add(1.23, &[KeyValue::new("key1", "value1")]);
1617 });
1618 });
1619 }
1620
1621 test_context.flush_metrics();
1622
1623 let sums = test_context.get_from_multiple_aggregations::<Sum<f64>>("test_counter", None, 6);
1626
1627 let mut sum_zero_attributes = 0.0;
1628 let mut sum_key1_value1 = 0.0;
1629 sums.iter().for_each(|sum| {
1630 assert_eq!(sum.data_points.len(), 2); assert!(sum.is_monotonic, "Counter should produce monotonic.");
1632 assert_eq!(sum.temporality, temporality);
1633
1634 if temporality == Temporality::Delta {
1635 sum_zero_attributes += sum.data_points[0].value;
1636 sum_key1_value1 += sum.data_points[1].value;
1637 } else {
1638 sum_zero_attributes = sum.data_points[0].value;
1639 sum_key1_value1 = sum.data_points[1].value;
1640 };
1641 });
1642
1643 assert!(f64::abs(12.3 - sum_zero_attributes) < 0.0001);
1644 assert!(f64::abs(61.5 - sum_key1_value1) < 0.0001); }
1646
1647 fn histogram_multithreaded_aggregation_helper(temporality: Temporality) {
1648 let mut test_context = TestContext::new(temporality);
1650 let histogram = Arc::new(test_context.meter().u64_histogram("test_histogram").build());
1651
1652 for i in 0..10 {
1653 thread::scope(|s| {
1654 s.spawn(|| {
1655 histogram.record(1, &[]);
1656 histogram.record(4, &[]);
1657
1658 histogram.record(5, &[KeyValue::new("key1", "value1")]);
1659 histogram.record(7, &[KeyValue::new("key1", "value1")]);
1660 histogram.record(18, &[KeyValue::new("key1", "value1")]);
1661
1662 if i % 2 == 0 {
1664 test_context.flush_metrics();
1665 thread::sleep(Duration::from_millis(i)); }
1667
1668 histogram.record(35, &[KeyValue::new("key1", "value1")]);
1669 histogram.record(35, &[KeyValue::new("key1", "value1")]);
1670 });
1671 });
1672 }
1673
1674 test_context.flush_metrics();
1675
1676 let histograms = test_context.get_from_multiple_aggregations::<Histogram<u64>>(
1679 "test_histogram",
1680 None,
1681 6,
1682 );
1683
1684 let (
1685 mut sum_zero_attributes,
1686 mut count_zero_attributes,
1687 mut min_zero_attributes,
1688 mut max_zero_attributes,
1689 ) = (0, 0, u64::MAX, u64::MIN);
1690 let (mut sum_key1_value1, mut count_key1_value1, mut min_key1_value1, mut max_key1_value1) =
1691 (0, 0, u64::MAX, u64::MIN);
1692
1693 let mut bucket_counts_zero_attributes = vec![0; 16]; let mut bucket_counts_key1_value1 = vec![0; 16];
1695
1696 histograms.iter().for_each(|histogram| {
1697 assert_eq!(histogram.data_points.len(), 2); assert_eq!(histogram.temporality, temporality);
1699
1700 let data_point_zero_attributes =
1701 find_histogram_datapoint_with_no_attributes(&histogram.data_points).unwrap();
1702 let data_point_key1_value1 =
1703 find_histogram_datapoint_with_key_value(&histogram.data_points, "key1", "value1")
1704 .unwrap();
1705
1706 if temporality == Temporality::Delta {
1707 sum_zero_attributes += data_point_zero_attributes.sum;
1708 sum_key1_value1 += data_point_key1_value1.sum;
1709
1710 count_zero_attributes += data_point_zero_attributes.count;
1711 count_key1_value1 += data_point_key1_value1.count;
1712
1713 min_zero_attributes =
1714 min(min_zero_attributes, data_point_zero_attributes.min.unwrap());
1715 min_key1_value1 = min(min_key1_value1, data_point_key1_value1.min.unwrap());
1716
1717 max_zero_attributes =
1718 max(max_zero_attributes, data_point_zero_attributes.max.unwrap());
1719 max_key1_value1 = max(max_key1_value1, data_point_key1_value1.max.unwrap());
1720
1721 assert_eq!(data_point_zero_attributes.bucket_counts.len(), 16);
1722 assert_eq!(data_point_key1_value1.bucket_counts.len(), 16);
1723
1724 for (i, _) in data_point_zero_attributes.bucket_counts.iter().enumerate() {
1725 bucket_counts_zero_attributes[i] += data_point_zero_attributes.bucket_counts[i];
1726 }
1727
1728 for (i, _) in data_point_key1_value1.bucket_counts.iter().enumerate() {
1729 bucket_counts_key1_value1[i] += data_point_key1_value1.bucket_counts[i];
1730 }
1731 } else {
1732 sum_zero_attributes = data_point_zero_attributes.sum;
1733 sum_key1_value1 = data_point_key1_value1.sum;
1734
1735 count_zero_attributes = data_point_zero_attributes.count;
1736 count_key1_value1 = data_point_key1_value1.count;
1737
1738 min_zero_attributes = data_point_zero_attributes.min.unwrap();
1739 min_key1_value1 = data_point_key1_value1.min.unwrap();
1740
1741 max_zero_attributes = data_point_zero_attributes.max.unwrap();
1742 max_key1_value1 = data_point_key1_value1.max.unwrap();
1743
1744 assert_eq!(data_point_zero_attributes.bucket_counts.len(), 16);
1745 assert_eq!(data_point_key1_value1.bucket_counts.len(), 16);
1746
1747 bucket_counts_zero_attributes.clone_from(&data_point_zero_attributes.bucket_counts);
1748 bucket_counts_key1_value1.clone_from(&data_point_key1_value1.bucket_counts);
1749 };
1750 });
1751
1752 assert_eq!(count_zero_attributes, 20); assert_eq!(sum_zero_attributes, 50); assert_eq!(min_zero_attributes, 1);
1759 assert_eq!(max_zero_attributes, 4);
1760
1761 for (i, count) in bucket_counts_zero_attributes.iter().enumerate() {
1762 match i {
1763 1 => assert_eq!(*count, 20), _ => assert_eq!(*count, 0),
1765 }
1766 }
1767
1768 assert_eq!(count_key1_value1, 50); assert_eq!(sum_key1_value1, 1000); assert_eq!(min_key1_value1, 5);
1771 assert_eq!(max_key1_value1, 35);
1772
1773 for (i, count) in bucket_counts_key1_value1.iter().enumerate() {
1774 match i {
1775 1 => assert_eq!(*count, 10), 2 => assert_eq!(*count, 10), 3 => assert_eq!(*count, 10), 4 => assert_eq!(*count, 20), _ => assert_eq!(*count, 0),
1780 }
1781 }
1782 }
1783
1784 fn histogram_f64_multithreaded_aggregation_helper(temporality: Temporality) {
1785 let mut test_context = TestContext::new(temporality);
1787 let histogram = Arc::new(test_context.meter().f64_histogram("test_histogram").build());
1788
1789 for i in 0..10 {
1790 thread::scope(|s| {
1791 s.spawn(|| {
1792 histogram.record(1.5, &[]);
1793 histogram.record(4.6, &[]);
1794
1795 histogram.record(5.0, &[KeyValue::new("key1", "value1")]);
1796 histogram.record(7.3, &[KeyValue::new("key1", "value1")]);
1797 histogram.record(18.1, &[KeyValue::new("key1", "value1")]);
1798
1799 if i % 2 == 0 {
1801 test_context.flush_metrics();
1802 thread::sleep(Duration::from_millis(i)); }
1804
1805 histogram.record(35.1, &[KeyValue::new("key1", "value1")]);
1806 histogram.record(35.1, &[KeyValue::new("key1", "value1")]);
1807 });
1808 });
1809 }
1810
1811 test_context.flush_metrics();
1812
1813 let histograms = test_context.get_from_multiple_aggregations::<Histogram<f64>>(
1816 "test_histogram",
1817 None,
1818 6,
1819 );
1820
1821 let (
1822 mut sum_zero_attributes,
1823 mut count_zero_attributes,
1824 mut min_zero_attributes,
1825 mut max_zero_attributes,
1826 ) = (0.0, 0, f64::MAX, f64::MIN);
1827 let (mut sum_key1_value1, mut count_key1_value1, mut min_key1_value1, mut max_key1_value1) =
1828 (0.0, 0, f64::MAX, f64::MIN);
1829
1830 let mut bucket_counts_zero_attributes = vec![0; 16]; let mut bucket_counts_key1_value1 = vec![0; 16];
1832
1833 histograms.iter().for_each(|histogram| {
1834 assert_eq!(histogram.data_points.len(), 2); assert_eq!(histogram.temporality, temporality);
1836
1837 let data_point_zero_attributes =
1838 find_histogram_datapoint_with_no_attributes(&histogram.data_points).unwrap();
1839 let data_point_key1_value1 =
1840 find_histogram_datapoint_with_key_value(&histogram.data_points, "key1", "value1")
1841 .unwrap();
1842
1843 if temporality == Temporality::Delta {
1844 sum_zero_attributes += data_point_zero_attributes.sum;
1845 sum_key1_value1 += data_point_key1_value1.sum;
1846
1847 count_zero_attributes += data_point_zero_attributes.count;
1848 count_key1_value1 += data_point_key1_value1.count;
1849
1850 min_zero_attributes =
1851 min_zero_attributes.min(data_point_zero_attributes.min.unwrap());
1852 min_key1_value1 = min_key1_value1.min(data_point_key1_value1.min.unwrap());
1853
1854 max_zero_attributes =
1855 max_zero_attributes.max(data_point_zero_attributes.max.unwrap());
1856 max_key1_value1 = max_key1_value1.max(data_point_key1_value1.max.unwrap());
1857
1858 assert_eq!(data_point_zero_attributes.bucket_counts.len(), 16);
1859 assert_eq!(data_point_key1_value1.bucket_counts.len(), 16);
1860
1861 for (i, _) in data_point_zero_attributes.bucket_counts.iter().enumerate() {
1862 bucket_counts_zero_attributes[i] += data_point_zero_attributes.bucket_counts[i];
1863 }
1864
1865 for (i, _) in data_point_key1_value1.bucket_counts.iter().enumerate() {
1866 bucket_counts_key1_value1[i] += data_point_key1_value1.bucket_counts[i];
1867 }
1868 } else {
1869 sum_zero_attributes = data_point_zero_attributes.sum;
1870 sum_key1_value1 = data_point_key1_value1.sum;
1871
1872 count_zero_attributes = data_point_zero_attributes.count;
1873 count_key1_value1 = data_point_key1_value1.count;
1874
1875 min_zero_attributes = data_point_zero_attributes.min.unwrap();
1876 min_key1_value1 = data_point_key1_value1.min.unwrap();
1877
1878 max_zero_attributes = data_point_zero_attributes.max.unwrap();
1879 max_key1_value1 = data_point_key1_value1.max.unwrap();
1880
1881 assert_eq!(data_point_zero_attributes.bucket_counts.len(), 16);
1882 assert_eq!(data_point_key1_value1.bucket_counts.len(), 16);
1883
1884 bucket_counts_zero_attributes.clone_from(&data_point_zero_attributes.bucket_counts);
1885 bucket_counts_key1_value1.clone_from(&data_point_key1_value1.bucket_counts);
1886 };
1887 });
1888
1889 assert_eq!(count_zero_attributes, 20); assert!(f64::abs(61.0 - sum_zero_attributes) < 0.0001); assert_eq!(min_zero_attributes, 1.5);
1896 assert_eq!(max_zero_attributes, 4.6);
1897
1898 for (i, count) in bucket_counts_zero_attributes.iter().enumerate() {
1899 match i {
1900 1 => assert_eq!(*count, 20), _ => assert_eq!(*count, 0),
1902 }
1903 }
1904
1905 assert_eq!(count_key1_value1, 50); assert!(f64::abs(1006.0 - sum_key1_value1) < 0.0001); assert_eq!(min_key1_value1, 5.0);
1908 assert_eq!(max_key1_value1, 35.1);
1909
1910 for (i, count) in bucket_counts_key1_value1.iter().enumerate() {
1911 match i {
1912 1 => assert_eq!(*count, 10), 2 => assert_eq!(*count, 10), 3 => assert_eq!(*count, 10), 4 => assert_eq!(*count, 20), _ => assert_eq!(*count, 0),
1917 }
1918 }
1919 }
1920
1921 fn histogram_aggregation_helper(temporality: Temporality) {
1922 let mut test_context = TestContext::new(temporality);
1924 let histogram = test_context.meter().u64_histogram("my_histogram").build();
1925
1926 let mut rand = rngs::SmallRng::from_entropy();
1928 let values_kv1 = (0..50)
1929 .map(|_| rand.gen_range(0..100))
1930 .collect::<Vec<u64>>();
1931 for value in values_kv1.iter() {
1932 histogram.record(*value, &[KeyValue::new("key1", "value1")]);
1933 }
1934
1935 let values_kv2 = (0..30)
1936 .map(|_| rand.gen_range(0..100))
1937 .collect::<Vec<u64>>();
1938 for value in values_kv2.iter() {
1939 histogram.record(*value, &[KeyValue::new("key1", "value2")]);
1940 }
1941
1942 test_context.flush_metrics();
1943
1944 let histogram_data = test_context.get_aggregation::<Histogram<u64>>("my_histogram", None);
1946 assert_eq!(histogram_data.data_points.len(), 2);
1948 if let Temporality::Cumulative = temporality {
1949 assert_eq!(
1950 histogram_data.temporality,
1951 Temporality::Cumulative,
1952 "Should produce cumulative"
1953 );
1954 } else {
1955 assert_eq!(
1956 histogram_data.temporality,
1957 Temporality::Delta,
1958 "Should produce delta"
1959 );
1960 }
1961
1962 let data_point1 =
1964 find_histogram_datapoint_with_key_value(&histogram_data.data_points, "key1", "value1")
1965 .expect("datapoint with key1=value1 expected");
1966 assert_eq!(data_point1.count, values_kv1.len() as u64);
1967 assert_eq!(data_point1.sum, values_kv1.iter().sum::<u64>());
1968 assert_eq!(data_point1.min.unwrap(), *values_kv1.iter().min().unwrap());
1969 assert_eq!(data_point1.max.unwrap(), *values_kv1.iter().max().unwrap());
1970
1971 let data_point2 =
1972 find_histogram_datapoint_with_key_value(&histogram_data.data_points, "key1", "value2")
1973 .expect("datapoint with key1=value2 expected");
1974 assert_eq!(data_point2.count, values_kv2.len() as u64);
1975 assert_eq!(data_point2.sum, values_kv2.iter().sum::<u64>());
1976 assert_eq!(data_point2.min.unwrap(), *values_kv2.iter().min().unwrap());
1977 assert_eq!(data_point2.max.unwrap(), *values_kv2.iter().max().unwrap());
1978
1979 test_context.reset_metrics();
1981 for value in values_kv1.iter() {
1982 histogram.record(*value, &[KeyValue::new("key1", "value1")]);
1983 }
1984
1985 for value in values_kv2.iter() {
1986 histogram.record(*value, &[KeyValue::new("key1", "value2")]);
1987 }
1988
1989 test_context.flush_metrics();
1990
1991 let histogram_data = test_context.get_aggregation::<Histogram<u64>>("my_histogram", None);
1992 assert_eq!(histogram_data.data_points.len(), 2);
1993 let data_point1 =
1994 find_histogram_datapoint_with_key_value(&histogram_data.data_points, "key1", "value1")
1995 .expect("datapoint with key1=value1 expected");
1996 if temporality == Temporality::Cumulative {
1997 assert_eq!(data_point1.count, 2 * (values_kv1.len() as u64));
1998 assert_eq!(data_point1.sum, 2 * (values_kv1.iter().sum::<u64>()));
1999 assert_eq!(data_point1.min.unwrap(), *values_kv1.iter().min().unwrap());
2000 assert_eq!(data_point1.max.unwrap(), *values_kv1.iter().max().unwrap());
2001 } else {
2002 assert_eq!(data_point1.count, values_kv1.len() as u64);
2003 assert_eq!(data_point1.sum, values_kv1.iter().sum::<u64>());
2004 assert_eq!(data_point1.min.unwrap(), *values_kv1.iter().min().unwrap());
2005 assert_eq!(data_point1.max.unwrap(), *values_kv1.iter().max().unwrap());
2006 }
2007
2008 let data_point1 =
2009 find_histogram_datapoint_with_key_value(&histogram_data.data_points, "key1", "value2")
2010 .expect("datapoint with key1=value1 expected");
2011 if temporality == Temporality::Cumulative {
2012 assert_eq!(data_point1.count, 2 * (values_kv2.len() as u64));
2013 assert_eq!(data_point1.sum, 2 * (values_kv2.iter().sum::<u64>()));
2014 assert_eq!(data_point1.min.unwrap(), *values_kv2.iter().min().unwrap());
2015 assert_eq!(data_point1.max.unwrap(), *values_kv2.iter().max().unwrap());
2016 } else {
2017 assert_eq!(data_point1.count, values_kv2.len() as u64);
2018 assert_eq!(data_point1.sum, values_kv2.iter().sum::<u64>());
2019 assert_eq!(data_point1.min.unwrap(), *values_kv2.iter().min().unwrap());
2020 assert_eq!(data_point1.max.unwrap(), *values_kv2.iter().max().unwrap());
2021 }
2022 }
2023
2024 fn histogram_aggregation_with_custom_bounds_helper(temporality: Temporality) {
2025 let mut test_context = TestContext::new(temporality);
2026 let histogram = test_context
2027 .meter()
2028 .u64_histogram("test_histogram")
2029 .with_boundaries(vec![1.0, 2.5, 5.5])
2030 .build();
2031 histogram.record(1, &[KeyValue::new("key1", "value1")]);
2032 histogram.record(2, &[KeyValue::new("key1", "value1")]);
2033 histogram.record(3, &[KeyValue::new("key1", "value1")]);
2034 histogram.record(4, &[KeyValue::new("key1", "value1")]);
2035 histogram.record(5, &[KeyValue::new("key1", "value1")]);
2036
2037 test_context.flush_metrics();
2038
2039 let histogram_data = test_context.get_aggregation::<Histogram<u64>>("test_histogram", None);
2041 assert_eq!(histogram_data.data_points.len(), 1);
2043 if let Temporality::Cumulative = temporality {
2044 assert_eq!(
2045 histogram_data.temporality,
2046 Temporality::Cumulative,
2047 "Should produce cumulative"
2048 );
2049 } else {
2050 assert_eq!(
2051 histogram_data.temporality,
2052 Temporality::Delta,
2053 "Should produce delta"
2054 );
2055 }
2056
2057 let data_point =
2059 find_histogram_datapoint_with_key_value(&histogram_data.data_points, "key1", "value1")
2060 .expect("datapoint with key1=value1 expected");
2061
2062 assert_eq!(data_point.count, 5);
2063 assert_eq!(data_point.sum, 15);
2064
2065 assert_eq!(vec![1.0, 2.5, 5.5], data_point.bounds);
2072 assert_eq!(vec![1, 1, 3, 0], data_point.bucket_counts);
2073 }
2074 fn gauge_aggregation_helper(temporality: Temporality) {
2075 let mut test_context = TestContext::new(temporality);
2077 let gauge = test_context.meter().i64_gauge("my_gauge").build();
2078
2079 gauge.record(1, &[KeyValue::new("key1", "value1")]);
2081 gauge.record(2, &[KeyValue::new("key1", "value1")]);
2082 gauge.record(1, &[KeyValue::new("key1", "value1")]);
2083 gauge.record(3, &[KeyValue::new("key1", "value1")]);
2084 gauge.record(4, &[KeyValue::new("key1", "value1")]);
2085
2086 gauge.record(11, &[KeyValue::new("key1", "value2")]);
2087 gauge.record(13, &[KeyValue::new("key1", "value2")]);
2088 gauge.record(6, &[KeyValue::new("key1", "value2")]);
2089
2090 test_context.flush_metrics();
2091
2092 let gauge_data_point = test_context.get_aggregation::<Gauge<i64>>("my_gauge", None);
2094 assert_eq!(gauge_data_point.data_points.len(), 2);
2096
2097 let data_point1 =
2099 find_gauge_datapoint_with_key_value(&gauge_data_point.data_points, "key1", "value1")
2100 .expect("datapoint with key1=value1 expected");
2101 assert_eq!(data_point1.value, 4);
2102
2103 let data_point1 =
2104 find_gauge_datapoint_with_key_value(&gauge_data_point.data_points, "key1", "value2")
2105 .expect("datapoint with key1=value2 expected");
2106 assert_eq!(data_point1.value, 6);
2107
2108 test_context.reset_metrics();
2110 gauge.record(1, &[KeyValue::new("key1", "value1")]);
2111 gauge.record(2, &[KeyValue::new("key1", "value1")]);
2112 gauge.record(11, &[KeyValue::new("key1", "value1")]);
2113 gauge.record(3, &[KeyValue::new("key1", "value1")]);
2114 gauge.record(41, &[KeyValue::new("key1", "value1")]);
2115
2116 gauge.record(34, &[KeyValue::new("key1", "value2")]);
2117 gauge.record(12, &[KeyValue::new("key1", "value2")]);
2118 gauge.record(54, &[KeyValue::new("key1", "value2")]);
2119
2120 test_context.flush_metrics();
2121
2122 let gauge = test_context.get_aggregation::<Gauge<i64>>("my_gauge", None);
2123 assert_eq!(gauge.data_points.len(), 2);
2124 let data_point1 = find_gauge_datapoint_with_key_value(&gauge.data_points, "key1", "value1")
2125 .expect("datapoint with key1=value1 expected");
2126 assert_eq!(data_point1.value, 41);
2127
2128 let data_point1 = find_gauge_datapoint_with_key_value(&gauge.data_points, "key1", "value2")
2129 .expect("datapoint with key1=value2 expected");
2130 assert_eq!(data_point1.value, 54);
2131 }
2132
2133 fn observable_gauge_aggregation_helper(temporality: Temporality, use_empty_attributes: bool) {
2134 let mut test_context = TestContext::new(temporality);
2136 let _observable_gauge = test_context
2137 .meter()
2138 .i64_observable_gauge("test_observable_gauge")
2139 .with_callback(move |observer| {
2140 if use_empty_attributes {
2141 observer.observe(1, &[]);
2142 }
2143 observer.observe(4, &[KeyValue::new("key1", "value1")]);
2144 observer.observe(5, &[KeyValue::new("key2", "value2")]);
2145 })
2146 .build();
2147
2148 test_context.flush_metrics();
2149
2150 let gauge = test_context.get_aggregation::<Gauge<i64>>("test_observable_gauge", None);
2152 let expected_time_series_count = if use_empty_attributes { 3 } else { 2 };
2154 assert_eq!(gauge.data_points.len(), expected_time_series_count);
2155
2156 if use_empty_attributes {
2157 let zero_attribute_datapoint =
2159 find_gauge_datapoint_with_no_attributes(&gauge.data_points)
2160 .expect("datapoint with no attributes expected");
2161 assert_eq!(zero_attribute_datapoint.value, 1);
2162 }
2163
2164 let data_point1 = find_gauge_datapoint_with_key_value(&gauge.data_points, "key1", "value1")
2166 .expect("datapoint with key1=value1 expected");
2167 assert_eq!(data_point1.value, 4);
2168
2169 let data_point2 = find_gauge_datapoint_with_key_value(&gauge.data_points, "key2", "value2")
2171 .expect("datapoint with key2=value2 expected");
2172 assert_eq!(data_point2.value, 5);
2173
2174 test_context.reset_metrics();
2176
2177 test_context.flush_metrics();
2178
2179 let gauge = test_context.get_aggregation::<Gauge<i64>>("test_observable_gauge", None);
2180 assert_eq!(gauge.data_points.len(), expected_time_series_count);
2181
2182 if use_empty_attributes {
2183 let zero_attribute_datapoint =
2184 find_gauge_datapoint_with_no_attributes(&gauge.data_points)
2185 .expect("datapoint with no attributes expected");
2186 assert_eq!(zero_attribute_datapoint.value, 1);
2187 }
2188
2189 let data_point1 = find_gauge_datapoint_with_key_value(&gauge.data_points, "key1", "value1")
2190 .expect("datapoint with key1=value1 expected");
2191 assert_eq!(data_point1.value, 4);
2192
2193 let data_point2 = find_gauge_datapoint_with_key_value(&gauge.data_points, "key2", "value2")
2194 .expect("datapoint with key2=value2 expected");
2195 assert_eq!(data_point2.value, 5);
2196 }
2197
2198 fn counter_aggregation_helper(temporality: Temporality) {
2199 let mut test_context = TestContext::new(temporality);
2201 let counter = test_context.u64_counter("test", "my_counter", None);
2202
2203 counter.add(1, &[KeyValue::new("key1", "value1")]);
2205 counter.add(1, &[KeyValue::new("key1", "value1")]);
2206 counter.add(1, &[KeyValue::new("key1", "value1")]);
2207 counter.add(1, &[KeyValue::new("key1", "value1")]);
2208 counter.add(1, &[KeyValue::new("key1", "value1")]);
2209
2210 counter.add(1, &[KeyValue::new("key1", "value2")]);
2211 counter.add(1, &[KeyValue::new("key1", "value2")]);
2212 counter.add(1, &[KeyValue::new("key1", "value2")]);
2213
2214 test_context.flush_metrics();
2215
2216 let sum = test_context.get_aggregation::<Sum<u64>>("my_counter", None);
2218 assert_eq!(sum.data_points.len(), 2);
2220 assert!(sum.is_monotonic, "Counter should produce monotonic.");
2221 if let Temporality::Cumulative = temporality {
2222 assert_eq!(
2223 sum.temporality,
2224 Temporality::Cumulative,
2225 "Should produce cumulative"
2226 );
2227 } else {
2228 assert_eq!(sum.temporality, Temporality::Delta, "Should produce delta");
2229 }
2230
2231 let data_point1 = find_sum_datapoint_with_key_value(&sum.data_points, "key1", "value1")
2233 .expect("datapoint with key1=value1 expected");
2234 assert_eq!(data_point1.value, 5);
2235
2236 let data_point1 = find_sum_datapoint_with_key_value(&sum.data_points, "key1", "value2")
2237 .expect("datapoint with key1=value2 expected");
2238 assert_eq!(data_point1.value, 3);
2239
2240 test_context.reset_metrics();
2242 counter.add(1, &[KeyValue::new("key1", "value1")]);
2243 counter.add(1, &[KeyValue::new("key1", "value1")]);
2244 counter.add(1, &[KeyValue::new("key1", "value1")]);
2245 counter.add(1, &[KeyValue::new("key1", "value1")]);
2246 counter.add(1, &[KeyValue::new("key1", "value1")]);
2247
2248 counter.add(1, &[KeyValue::new("key1", "value2")]);
2249 counter.add(1, &[KeyValue::new("key1", "value2")]);
2250 counter.add(1, &[KeyValue::new("key1", "value2")]);
2251
2252 test_context.flush_metrics();
2253
2254 let sum = test_context.get_aggregation::<Sum<u64>>("my_counter", None);
2255 assert_eq!(sum.data_points.len(), 2);
2256 let data_point1 = find_sum_datapoint_with_key_value(&sum.data_points, "key1", "value1")
2257 .expect("datapoint with key1=value1 expected");
2258 if temporality == Temporality::Cumulative {
2259 assert_eq!(data_point1.value, 10);
2260 } else {
2261 assert_eq!(data_point1.value, 5);
2262 }
2263
2264 let data_point1 = find_sum_datapoint_with_key_value(&sum.data_points, "key1", "value2")
2265 .expect("datapoint with key1=value2 expected");
2266 if temporality == Temporality::Cumulative {
2267 assert_eq!(data_point1.value, 6);
2268 } else {
2269 assert_eq!(data_point1.value, 3);
2270 }
2271 }
2272
2273 fn counter_aggregation_overflow_helper(temporality: Temporality) {
2274 let mut test_context = TestContext::new(temporality);
2276 let counter = test_context.u64_counter("test", "my_counter", None);
2277
2278 for v in 0..2000 {
2281 counter.add(100, &[KeyValue::new("A", v.to_string())]);
2282 }
2283
2284 counter.add(3, &[]);
2286 counter.add(3, &[]);
2287
2288 counter.add(100, &[KeyValue::new("A", "foo")]);
2290 counter.add(100, &[KeyValue::new("A", "another")]);
2291 counter.add(100, &[KeyValue::new("A", "yet_another")]);
2292 test_context.flush_metrics();
2293
2294 let sum = test_context.get_aggregation::<Sum<u64>>("my_counter", None);
2295
2296 assert_eq!(sum.data_points.len(), 2002);
2298
2299 let data_point =
2300 find_sum_datapoint_with_key_value(&sum.data_points, "otel.metric.overflow", "true")
2301 .expect("overflow point expected");
2302 assert_eq!(data_point.value, 300);
2303
2304 let empty_attrs_data_point = find_sum_datapoint_with_no_attributes(&sum.data_points)
2306 .expect("Empty attributes point expected");
2307 assert!(
2308 empty_attrs_data_point.attributes.is_empty(),
2309 "Non-empty attribute set"
2310 );
2311 assert_eq!(
2312 empty_attrs_data_point.value, 6,
2313 "Empty attributes value should be 3+3=6"
2314 );
2315 }
2316
2317 fn counter_aggregation_attribute_order_helper(temporality: Temporality, start_sorted: bool) {
2318 let mut test_context = TestContext::new(temporality);
2320 let counter = test_context.u64_counter("test", "my_counter", None);
2321
2322 if start_sorted {
2327 counter.add(
2328 1,
2329 &[
2330 KeyValue::new("A", "a"),
2331 KeyValue::new("B", "b"),
2332 KeyValue::new("C", "c"),
2333 ],
2334 );
2335 } else {
2336 counter.add(
2337 1,
2338 &[
2339 KeyValue::new("A", "a"),
2340 KeyValue::new("C", "c"),
2341 KeyValue::new("B", "b"),
2342 ],
2343 );
2344 }
2345
2346 counter.add(
2347 1,
2348 &[
2349 KeyValue::new("A", "a"),
2350 KeyValue::new("C", "c"),
2351 KeyValue::new("B", "b"),
2352 ],
2353 );
2354 counter.add(
2355 1,
2356 &[
2357 KeyValue::new("B", "b"),
2358 KeyValue::new("A", "a"),
2359 KeyValue::new("C", "c"),
2360 ],
2361 );
2362 counter.add(
2363 1,
2364 &[
2365 KeyValue::new("B", "b"),
2366 KeyValue::new("C", "c"),
2367 KeyValue::new("A", "a"),
2368 ],
2369 );
2370 counter.add(
2371 1,
2372 &[
2373 KeyValue::new("C", "c"),
2374 KeyValue::new("B", "b"),
2375 KeyValue::new("A", "a"),
2376 ],
2377 );
2378 counter.add(
2379 1,
2380 &[
2381 KeyValue::new("C", "c"),
2382 KeyValue::new("A", "a"),
2383 KeyValue::new("B", "b"),
2384 ],
2385 );
2386 test_context.flush_metrics();
2387
2388 let sum = test_context.get_aggregation::<Sum<u64>>("my_counter", None);
2389
2390 assert_eq!(sum.data_points.len(), 1);
2392
2393 let data_point1 = &sum.data_points[0];
2395 assert_eq!(data_point1.value, 6);
2396 }
2397
2398 fn updown_counter_aggregation_helper(temporality: Temporality) {
2399 let mut test_context = TestContext::new(temporality);
2401 let counter = test_context.i64_up_down_counter("test", "my_updown_counter", None);
2402
2403 counter.add(10, &[KeyValue::new("key1", "value1")]);
2405 counter.add(-1, &[KeyValue::new("key1", "value1")]);
2406 counter.add(-5, &[KeyValue::new("key1", "value1")]);
2407 counter.add(0, &[KeyValue::new("key1", "value1")]);
2408 counter.add(1, &[KeyValue::new("key1", "value1")]);
2409
2410 counter.add(10, &[KeyValue::new("key1", "value2")]);
2411 counter.add(0, &[KeyValue::new("key1", "value2")]);
2412 counter.add(-3, &[KeyValue::new("key1", "value2")]);
2413
2414 test_context.flush_metrics();
2415
2416 let sum = test_context.get_aggregation::<Sum<i64>>("my_updown_counter", None);
2418 assert_eq!(sum.data_points.len(), 2);
2420 assert!(
2421 !sum.is_monotonic,
2422 "UpDownCounter should produce non-monotonic."
2423 );
2424 assert_eq!(
2425 sum.temporality,
2426 Temporality::Cumulative,
2427 "Should produce Cumulative for UpDownCounter"
2428 );
2429
2430 let data_point1 = find_sum_datapoint_with_key_value(&sum.data_points, "key1", "value1")
2432 .expect("datapoint with key1=value1 expected");
2433 assert_eq!(data_point1.value, 5);
2434
2435 let data_point1 = find_sum_datapoint_with_key_value(&sum.data_points, "key1", "value2")
2436 .expect("datapoint with key1=value2 expected");
2437 assert_eq!(data_point1.value, 7);
2438
2439 test_context.reset_metrics();
2441 counter.add(10, &[KeyValue::new("key1", "value1")]);
2442 counter.add(-1, &[KeyValue::new("key1", "value1")]);
2443 counter.add(-5, &[KeyValue::new("key1", "value1")]);
2444 counter.add(0, &[KeyValue::new("key1", "value1")]);
2445 counter.add(1, &[KeyValue::new("key1", "value1")]);
2446
2447 counter.add(10, &[KeyValue::new("key1", "value2")]);
2448 counter.add(0, &[KeyValue::new("key1", "value2")]);
2449 counter.add(-3, &[KeyValue::new("key1", "value2")]);
2450
2451 test_context.flush_metrics();
2452
2453 let sum = test_context.get_aggregation::<Sum<i64>>("my_updown_counter", None);
2454 assert_eq!(sum.data_points.len(), 2);
2455 let data_point1 = find_sum_datapoint_with_key_value(&sum.data_points, "key1", "value1")
2456 .expect("datapoint with key1=value1 expected");
2457 assert_eq!(data_point1.value, 10);
2458
2459 let data_point1 = find_sum_datapoint_with_key_value(&sum.data_points, "key1", "value2")
2460 .expect("datapoint with key1=value2 expected");
2461 assert_eq!(data_point1.value, 14);
2462 }
2463
2464 fn find_sum_datapoint_with_key_value<'a, T>(
2465 data_points: &'a [SumDataPoint<T>],
2466 key: &str,
2467 value: &str,
2468 ) -> Option<&'a SumDataPoint<T>> {
2469 data_points.iter().find(|&datapoint| {
2470 datapoint
2471 .attributes
2472 .iter()
2473 .any(|kv| kv.key.as_str() == key && kv.value.as_str() == value)
2474 })
2475 }
2476
2477 fn find_gauge_datapoint_with_key_value<'a, T>(
2478 data_points: &'a [GaugeDataPoint<T>],
2479 key: &str,
2480 value: &str,
2481 ) -> Option<&'a GaugeDataPoint<T>> {
2482 data_points.iter().find(|&datapoint| {
2483 datapoint
2484 .attributes
2485 .iter()
2486 .any(|kv| kv.key.as_str() == key && kv.value.as_str() == value)
2487 })
2488 }
2489
2490 fn find_sum_datapoint_with_no_attributes<T>(
2491 data_points: &[SumDataPoint<T>],
2492 ) -> Option<&SumDataPoint<T>> {
2493 data_points
2494 .iter()
2495 .find(|&datapoint| datapoint.attributes.is_empty())
2496 }
2497
2498 fn find_gauge_datapoint_with_no_attributes<T>(
2499 data_points: &[GaugeDataPoint<T>],
2500 ) -> Option<&GaugeDataPoint<T>> {
2501 data_points
2502 .iter()
2503 .find(|&datapoint| datapoint.attributes.is_empty())
2504 }
2505
2506 fn find_histogram_datapoint_with_key_value<'a, T>(
2507 data_points: &'a [HistogramDataPoint<T>],
2508 key: &str,
2509 value: &str,
2510 ) -> Option<&'a HistogramDataPoint<T>> {
2511 data_points.iter().find(|&datapoint| {
2512 datapoint
2513 .attributes
2514 .iter()
2515 .any(|kv| kv.key.as_str() == key && kv.value.as_str() == value)
2516 })
2517 }
2518
2519 fn find_histogram_datapoint_with_no_attributes<T>(
2520 data_points: &[HistogramDataPoint<T>],
2521 ) -> Option<&HistogramDataPoint<T>> {
2522 data_points
2523 .iter()
2524 .find(|&datapoint| datapoint.attributes.is_empty())
2525 }
2526
2527 fn find_scope_metric<'a>(
2528 metrics: &'a [ScopeMetrics],
2529 name: &'a str,
2530 ) -> Option<&'a ScopeMetrics> {
2531 metrics
2532 .iter()
2533 .find(|&scope_metric| scope_metric.scope.name() == name)
2534 }
2535
/// Shared fixture for the aggregation tests: bundles an in-memory exporter with
/// the meter provider that feeds it, plus storage for the most recently fetched
/// export.
struct TestContext {
    /// In-memory exporter the tests read finished metrics from and reset between rounds.
    exporter: InMemoryMetricExporter,
    /// Provider used to create meters/instruments and to force a flush.
    meter_provider: SdkMeterProvider,

    /// Metrics most recently fetched from the exporter. They are stored on the
    /// context so that the references returned by the aggregation accessors can
    /// borrow from `self`.
    resource_metrics: Vec<ResourceMetrics>,
}
2543
2544 impl TestContext {
2545 fn new(temporality: Temporality) -> Self {
2546 let exporter = InMemoryMetricExporterBuilder::new().with_temporality(temporality);
2547 let exporter = exporter.build();
2548 let meter_provider = SdkMeterProvider::builder()
2549 .with_periodic_exporter(exporter.clone())
2550 .build();
2551
2552 TestContext {
2553 exporter,
2554 meter_provider,
2555 resource_metrics: vec![],
2556 }
2557 }
2558
2559 fn u64_counter(
2560 &self,
2561 meter_name: &'static str,
2562 counter_name: &'static str,
2563 unit: Option<&'static str>,
2564 ) -> Counter<u64> {
2565 let meter = self.meter_provider.meter(meter_name);
2566 let mut counter_builder = meter.u64_counter(counter_name);
2567 if let Some(unit_name) = unit {
2568 counter_builder = counter_builder.with_unit(unit_name);
2569 }
2570 counter_builder.build()
2571 }
2572
2573 fn i64_up_down_counter(
2574 &self,
2575 meter_name: &'static str,
2576 counter_name: &'static str,
2577 unit: Option<&'static str>,
2578 ) -> UpDownCounter<i64> {
2579 let meter = self.meter_provider.meter(meter_name);
2580 let mut updown_counter_builder = meter.i64_up_down_counter(counter_name);
2581 if let Some(unit_name) = unit {
2582 updown_counter_builder = updown_counter_builder.with_unit(unit_name);
2583 }
2584 updown_counter_builder.build()
2585 }
2586
2587 fn meter(&self) -> Meter {
2588 self.meter_provider.meter("test")
2589 }
2590
2591 fn flush_metrics(&self) {
2592 self.meter_provider.force_flush().unwrap();
2593 }
2594
2595 fn reset_metrics(&self) {
2596 self.exporter.reset();
2597 }
2598
2599 fn check_no_metrics(&self) {
2600 let resource_metrics = self
2601 .exporter
2602 .get_finished_metrics()
2603 .expect("metrics expected to be exported"); assert!(resource_metrics.is_empty(), "no metrics should be exported");
2606 }
2607
2608 fn get_aggregation<T: Aggregation>(
2609 &mut self,
2610 counter_name: &str,
2611 unit_name: Option<&str>,
2612 ) -> &T {
2613 self.resource_metrics = self
2614 .exporter
2615 .get_finished_metrics()
2616 .expect("metrics expected to be exported");
2617
2618 assert!(
2619 !self.resource_metrics.is_empty(),
2620 "no metrics were exported"
2621 );
2622
2623 assert!(
2624 self.resource_metrics.len() == 1,
2625 "Expected single resource metrics."
2626 );
2627 let resource_metric = self
2628 .resource_metrics
2629 .first()
2630 .expect("This should contain exactly one resource metric, as validated above.");
2631
2632 assert!(
2633 !resource_metric.scope_metrics.is_empty(),
2634 "No scope metrics in latest export"
2635 );
2636 assert!(!resource_metric.scope_metrics[0].metrics.is_empty());
2637
2638 let metric = &resource_metric.scope_metrics[0].metrics[0];
2639 assert_eq!(metric.name, counter_name);
2640 if let Some(expected_unit) = unit_name {
2641 assert_eq!(metric.unit, expected_unit);
2642 }
2643
2644 metric
2645 .data
2646 .as_any()
2647 .downcast_ref::<T>()
2648 .expect("Failed to cast aggregation to expected type")
2649 }
2650
2651 fn get_from_multiple_aggregations<T: Aggregation>(
2652 &mut self,
2653 counter_name: &str,
2654 unit_name: Option<&str>,
2655 invocation_count: usize,
2656 ) -> Vec<&T> {
2657 self.resource_metrics = self
2658 .exporter
2659 .get_finished_metrics()
2660 .expect("metrics expected to be exported");
2661
2662 assert!(
2663 !self.resource_metrics.is_empty(),
2664 "no metrics were exported"
2665 );
2666
2667 assert_eq!(
2668 self.resource_metrics.len(),
2669 invocation_count,
2670 "Expected collect to be called {} times",
2671 invocation_count
2672 );
2673
2674 let result = self
2675 .resource_metrics
2676 .iter()
2677 .map(|resource_metric| {
2678 assert!(
2679 !resource_metric.scope_metrics.is_empty(),
2680 "An export with no scope metrics occurred"
2681 );
2682
2683 assert!(!resource_metric.scope_metrics[0].metrics.is_empty());
2684
2685 let metric = &resource_metric.scope_metrics[0].metrics[0];
2686 assert_eq!(metric.name, counter_name);
2687
2688 if let Some(expected_unit) = unit_name {
2689 assert_eq!(metric.unit, expected_unit);
2690 }
2691
2692 let aggregation = metric
2693 .data
2694 .as_any()
2695 .downcast_ref::<T>()
2696 .expect("Failed to cast aggregation to expected type");
2697 aggregation
2698 })
2699 .collect::<Vec<_>>();
2700
2701 result
2702 }
2703 }
2704}