// opentelemetry_sdk/metrics/mod.rs

//! The core of the OpenTelemetry metrics SDK.
//!
//! ## Configuration
//!
//! The metrics SDK configuration is stored with each [SdkMeterProvider].
//! Configuration for [Resource]s, [View]s, and [ManualReader] or
//! [PeriodicReader] instances can be specified.
//!
//! ### Example
//!
//! ```
//! use opentelemetry::global;
//! use opentelemetry::KeyValue;
//! use opentelemetry_sdk::{metrics::SdkMeterProvider, Resource};
//!
//! // Generate SDK configuration, resource, views, etc.
//! let resource = Resource::builder().build(); // default attributes about the current process
//!
//! // Create a meter provider with the desired config
//! let meter_provider = SdkMeterProvider::builder().with_resource(resource).build();
//! global::set_meter_provider(meter_provider.clone());
//!
//! // Use the meter provider to create meter instances
//! let meter = global::meter("my_app");
//!
//! // Create instruments scoped to the meter
//! let counter = meter
//!     .u64_counter("power_consumption")
//!     .with_unit("kWh")
//!     .build();
//!
//! // Use instruments to record measurements
//! counter.add(10, &[KeyValue::new("rate", "standard")]);
//!
//! // Shut down the provider at the end of the application to ensure any metrics not yet
//! // exported are flushed.
//! meter_provider.shutdown().unwrap();
//! ```
//!
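//! ### Example: manual collection
//!
//! Collection is driven by the configured reader. A minimal sketch using a
//! [ManualReader], which collects only when explicitly asked to (assuming the
//! reader and provider builder APIs re-exported from this module):
//!
//! ```
//! use opentelemetry_sdk::metrics::{ManualReader, SdkMeterProvider};
//!
//! // Build a reader that collects on demand rather than on a periodic interval
//! let reader = ManualReader::builder().build();
//! let provider = SdkMeterProvider::builder().with_reader(reader).build();
//! ```
//!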
//! [Resource]: crate::Resource

pub(crate) mod aggregation;
pub mod data;
mod error;
pub mod exporter;
pub(crate) mod instrument;
pub(crate) mod internal;
pub(crate) mod manual_reader;
pub(crate) mod meter;
mod meter_provider;
pub(crate) mod noop;
pub(crate) mod periodic_reader;
#[cfg(feature = "experimental_metrics_periodicreader_with_async_runtime")]
/// Module for periodic reader with async runtime.
pub mod periodic_reader_with_async_runtime;
pub(crate) mod pipeline;
pub mod reader;
pub(crate) mod view;

/// In-memory metric exporter for testing purposes.
#[cfg(any(feature = "testing", test))]
#[cfg_attr(docsrs, doc(cfg(any(feature = "testing", test))))]
pub mod in_memory_exporter;
#[cfg(any(feature = "testing", test))]
#[cfg_attr(docsrs, doc(cfg(any(feature = "testing", test))))]
pub use in_memory_exporter::{InMemoryMetricExporter, InMemoryMetricExporterBuilder};

pub use aggregation::*;
pub use error::{MetricError, MetricResult};
pub use manual_reader::*;
pub use meter_provider::*;
pub use periodic_reader::*;
pub use pipeline::Pipeline;

pub use instrument::InstrumentKind;

#[cfg(feature = "spec_unstable_metrics_views")]
pub use instrument::*;
// #[cfg(not(feature = "spec_unstable_metrics_views"))]
// pub(crate) use instrument::*;

#[cfg(feature = "spec_unstable_metrics_views")]
pub use view::*;
// #[cfg(not(feature = "spec_unstable_metrics_views"))]
// pub(crate) use view::*;

use std::hash::Hash;

/// Defines the window that an aggregation was calculated over.
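///
/// For example, a counter incremented by 10 in each of three collection
/// cycles reports totals of 10, 20, and 30 under [`Temporality::Cumulative`],
/// but 10, 10, and 10 under [`Temporality::Delta`]. A sketch of selecting a
/// temporality on the in-memory test exporter (assuming the `testing`
/// feature's [`InMemoryMetricExporterBuilder`]):
///
/// ```ignore
/// use opentelemetry_sdk::metrics::{InMemoryMetricExporterBuilder, Temporality};
///
/// // Export deltas between collections instead of running totals
/// let exporter = InMemoryMetricExporterBuilder::new()
///     .with_temporality(Temporality::Delta)
///     .build();
/// ```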
#[derive(Debug, Copy, Clone, Default, PartialEq, Eq, Hash)]
#[non_exhaustive]
pub enum Temporality {
    /// A measurement interval that continues to expand forward in time from a
    /// starting point.
    ///
    /// New measurements are added to all previous measurements since a start time.
    #[default]
    Cumulative,

    /// A measurement interval that resets each cycle.
    ///
    /// Measurements from one cycle are recorded independently; measurements
    /// from other cycles do not affect them.
    Delta,

    /// Configures synchronous Counter and Histogram instruments to use
    /// Delta aggregation temporality, which allows them to shed memory
    /// following a cardinality explosion and thus use less memory.
    LowMemory,
}

#[cfg(all(test, feature = "testing"))]
mod tests {
    use self::data::{HistogramDataPoint, ScopeMetrics, SumDataPoint};
    use super::*;
    use crate::metrics::data::Aggregation;
    use crate::metrics::data::ResourceMetrics;
    use crate::metrics::InMemoryMetricExporter;
    use crate::metrics::InMemoryMetricExporterBuilder;
    use data::Gauge;
    use data::GaugeDataPoint;
    use data::Histogram;
    use data::Sum;
    use opentelemetry::metrics::{Counter, Meter, UpDownCounter};
    use opentelemetry::InstrumentationScope;
    use opentelemetry::{metrics::MeterProvider as _, KeyValue};
    use rand::{rngs, Rng, SeedableRng};
    use std::cmp::{max, min};
    use std::sync::atomic::{AtomicBool, Ordering};
    use std::sync::{Arc, Mutex};
    use std::thread;
    use std::time::Duration;

    // Run all tests in this mod with:
    // cargo test metrics::tests --features=testing,spec_unstable_metrics_views
    // Note for all tests in this mod: the "multi_thread" tokio flavor
    // must be used, else flush won't be able to make progress!

    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
    #[cfg(not(feature = "experimental_metrics_disable_name_validation"))]
    async fn invalid_instrument_config_noops() {
        // Run this test with stdout enabled to see output.
        // cargo test invalid_instrument_config_noops --features=testing,spec_unstable_metrics_views -- --nocapture
        let invalid_instrument_names = vec![
            "_startWithNoneAlphabet",
            "utf8char锈",
            "a".repeat(256).leak(),
            "invalid name",
        ];
        for name in invalid_instrument_names {
            let test_context = TestContext::new(Temporality::Cumulative);
            let counter = test_context.meter().u64_counter(name).build();
            counter.add(1, &[]);

            let up_down_counter = test_context.meter().i64_up_down_counter(name).build();
            up_down_counter.add(1, &[]);

            let gauge = test_context.meter().f64_gauge(name).build();
            gauge.record(1.9, &[]);

            let histogram = test_context.meter().f64_histogram(name).build();
            histogram.record(1.0, &[]);

            let _observable_counter = test_context
                .meter()
                .u64_observable_counter(name)
                .with_callback(move |observer| {
                    observer.observe(1, &[]);
                })
                .build();

            let _observable_gauge = test_context
                .meter()
                .f64_observable_gauge(name)
                .with_callback(move |observer| {
                    observer.observe(1.0, &[]);
                })
                .build();

            let _observable_up_down_counter = test_context
                .meter()
                .i64_observable_up_down_counter(name)
                .with_callback(move |observer| {
                    observer.observe(1, &[]);
                })
                .build();

            test_context.flush_metrics();

            // As the instrument name is invalid, no metrics should be exported
            test_context.check_no_metrics();
        }

        let invalid_bucket_boundaries = vec![
            vec![1.0, 1.0],                          // duplicate boundaries
            vec![1.0, 2.0, 3.0, 2.0],                // duplicate non-consecutive boundaries
            vec![1.0, 2.0, 3.0, 4.0, 2.5],           // unsorted boundaries
            vec![1.0, 2.0, 3.0, f64::INFINITY, 4.0], // boundaries with positive infinity
            vec![1.0, 2.0, 3.0, f64::NAN],           // boundaries with NaNs
            vec![f64::NEG_INFINITY, 2.0, 3.0],       // boundaries with negative infinity
        ];
        for bucket_boundaries in invalid_bucket_boundaries {
            let test_context = TestContext::new(Temporality::Cumulative);
            let histogram = test_context
                .meter()
                .f64_histogram("test")
                .with_boundaries(bucket_boundaries)
                .build();
            histogram.record(1.9, &[]);
            test_context.flush_metrics();

            // As bucket boundaries provided via advisory params are invalid,
            // no metrics should be exported
            test_context.check_no_metrics();
        }
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
    #[cfg(feature = "experimental_metrics_disable_name_validation")]
    async fn valid_instrument_config_with_feature_experimental_metrics_disable_name_validation() {
        // Run this test with stdout enabled to see output.
        // cargo test valid_instrument_config_with_feature_experimental_metrics_disable_name_validation --all-features -- --nocapture
        let invalid_instrument_names = vec![
            "_startWithNoneAlphabet",
            "utf8char锈",
            "",
            "a".repeat(256).leak(),
            "\\allow\\slash /sec",
            "\\allow\\$$slash /sec",
            "Total $ Count",
            "\\test\\UsagePercent(Total) > 80%",
            "invalid name",
        ];
        for name in invalid_instrument_names {
            let test_context = TestContext::new(Temporality::Cumulative);
            let counter = test_context.meter().u64_counter(name).build();
            counter.add(1, &[]);

            let up_down_counter = test_context.meter().i64_up_down_counter(name).build();
            up_down_counter.add(1, &[]);

            let gauge = test_context.meter().f64_gauge(name).build();
            gauge.record(1.9, &[]);

            let histogram = test_context.meter().f64_histogram(name).build();
            histogram.record(1.0, &[]);

            let _observable_counter = test_context
                .meter()
                .u64_observable_counter(name)
                .with_callback(move |observer| {
                    observer.observe(1, &[]);
                })
                .build();

            let _observable_gauge = test_context
                .meter()
                .f64_observable_gauge(name)
                .with_callback(move |observer| {
                    observer.observe(1.0, &[]);
                })
                .build();

            let _observable_up_down_counter = test_context
                .meter()
                .i64_observable_up_down_counter(name)
                .with_callback(move |observer| {
                    observer.observe(1, &[]);
                })
                .build();

            test_context.flush_metrics();

            // As instrument names are valid because of the feature flag, metrics should be exported
            let resource_metrics = test_context
                .exporter
                .get_finished_metrics()
                .expect("metrics expected to be exported");

            assert!(!resource_metrics.is_empty(), "metrics should be exported");
        }

        // Ensure that histograms with invalid bucket boundaries are still not
        // exported when the feature flag is enabled
        let invalid_bucket_boundaries = vec![
            vec![1.0, 1.0],                          // duplicate boundaries
            vec![1.0, 2.0, 3.0, 2.0],                // duplicate non-consecutive boundaries
            vec![1.0, 2.0, 3.0, 4.0, 2.5],           // unsorted boundaries
            vec![1.0, 2.0, 3.0, f64::INFINITY, 4.0], // boundaries with positive infinity
            vec![1.0, 2.0, 3.0, f64::NAN],           // boundaries with NaNs
            vec![f64::NEG_INFINITY, 2.0, 3.0],       // boundaries with negative infinity
        ];
        for bucket_boundaries in invalid_bucket_boundaries {
            let test_context = TestContext::new(Temporality::Cumulative);
            let histogram = test_context
                .meter()
                .f64_histogram("test")
                .with_boundaries(bucket_boundaries)
                .build();
            histogram.record(1.9, &[]);
            test_context.flush_metrics();

            // As bucket boundaries provided via advisory params are invalid,
            // no metrics should be exported
            test_context.check_no_metrics();
        }
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
    async fn counter_aggregation_delta() {
        // Run this test with stdout enabled to see output.
        // cargo test counter_aggregation_delta --features=testing -- --nocapture
        counter_aggregation_helper(Temporality::Delta);
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
    async fn counter_aggregation_cumulative() {
        // Run this test with stdout enabled to see output.
        // cargo test counter_aggregation_cumulative --features=testing -- --nocapture
        counter_aggregation_helper(Temporality::Cumulative);
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
    async fn counter_aggregation_no_attributes_cumulative() {
        let mut test_context = TestContext::new(Temporality::Cumulative);
        let counter = test_context.u64_counter("test", "my_counter", None);

        counter.add(50, &[]);
        test_context.flush_metrics();

        let sum = test_context.get_aggregation::<Sum<u64>>("my_counter", None);

        assert_eq!(sum.data_points.len(), 1, "Expected only one data point");
        assert!(sum.is_monotonic, "Should produce monotonic.");
        assert_eq!(
            sum.temporality,
            Temporality::Cumulative,
            "Should produce cumulative"
        );

        let data_point = &sum.data_points[0];
        assert!(data_point.attributes.is_empty(), "Non-empty attribute set");
        assert_eq!(data_point.value, 50, "Unexpected data point value");
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
    async fn counter_aggregation_no_attributes_delta() {
        let mut test_context = TestContext::new(Temporality::Delta);
        let counter = test_context.u64_counter("test", "my_counter", None);

        counter.add(50, &[]);
        test_context.flush_metrics();

        let sum = test_context.get_aggregation::<Sum<u64>>("my_counter", None);

        assert_eq!(sum.data_points.len(), 1, "Expected only one data point");
        assert!(sum.is_monotonic, "Should produce monotonic.");
        assert_eq!(sum.temporality, Temporality::Delta, "Should produce delta");

        let data_point = &sum.data_points[0];
        assert!(data_point.attributes.is_empty(), "Non-empty attribute set");
        assert_eq!(data_point.value, 50, "Unexpected data point value");
    }

    #[ignore = "https://github.com/open-telemetry/opentelemetry-rust/issues/1065"]
    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
    async fn counter_aggregation_overflow_delta() {
        counter_aggregation_overflow_helper(Temporality::Delta);
    }

    #[ignore = "https://github.com/open-telemetry/opentelemetry-rust/issues/1065"]
    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
    async fn counter_aggregation_overflow_cumulative() {
        counter_aggregation_overflow_helper(Temporality::Cumulative);
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
    async fn counter_aggregation_attribute_order_sorted_first_delta() {
        // Run this test with stdout enabled to see output.
        // cargo test counter_aggregation_attribute_order_sorted_first_delta --features=testing -- --nocapture
        counter_aggregation_attribute_order_helper(Temporality::Delta, true);
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
    async fn counter_aggregation_attribute_order_sorted_first_cumulative() {
        // Run this test with stdout enabled to see output.
        // cargo test counter_aggregation_attribute_order_sorted_first_cumulative --features=testing -- --nocapture
        counter_aggregation_attribute_order_helper(Temporality::Cumulative, true);
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
    async fn counter_aggregation_attribute_order_unsorted_first_delta() {
        // Run this test with stdout enabled to see output.
        // cargo test counter_aggregation_attribute_order_unsorted_first_delta --features=testing -- --nocapture

        counter_aggregation_attribute_order_helper(Temporality::Delta, false);
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
    async fn counter_aggregation_attribute_order_unsorted_first_cumulative() {
        // Run this test with stdout enabled to see output.
        // cargo test counter_aggregation_attribute_order_unsorted_first_cumulative --features=testing -- --nocapture

        counter_aggregation_attribute_order_helper(Temporality::Cumulative, false);
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
    async fn histogram_aggregation_cumulative() {
        // Run this test with stdout enabled to see output.
        // cargo test histogram_aggregation_cumulative --features=testing -- --nocapture
        histogram_aggregation_helper(Temporality::Cumulative);
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
    async fn histogram_aggregation_delta() {
        // Run this test with stdout enabled to see output.
        // cargo test histogram_aggregation_delta --features=testing -- --nocapture
        histogram_aggregation_helper(Temporality::Delta);
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
    async fn histogram_aggregation_with_custom_bounds() {
        // Run this test with stdout enabled to see output.
        // cargo test histogram_aggregation_with_custom_bounds --features=testing -- --nocapture
        histogram_aggregation_with_custom_bounds_helper(Temporality::Delta);
        histogram_aggregation_with_custom_bounds_helper(Temporality::Cumulative);
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
    async fn updown_counter_aggregation_cumulative() {
        // Run this test with stdout enabled to see output.
        // cargo test updown_counter_aggregation_cumulative --features=testing -- --nocapture
        updown_counter_aggregation_helper(Temporality::Cumulative);
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
    async fn updown_counter_aggregation_delta() {
        // Run this test with stdout enabled to see output.
        // cargo test updown_counter_aggregation_delta --features=testing -- --nocapture
        updown_counter_aggregation_helper(Temporality::Delta);
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
    async fn gauge_aggregation() {
        // Run this test with stdout enabled to see output.
        // cargo test gauge_aggregation --features=testing -- --nocapture

        // Gauge should use last value aggregation regardless of the aggregation temporality used.
        gauge_aggregation_helper(Temporality::Delta);
        gauge_aggregation_helper(Temporality::Cumulative);
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
    async fn observable_gauge_aggregation() {
        // Run this test with stdout enabled to see output.
        // cargo test observable_gauge_aggregation --features=testing -- --nocapture

        // Gauge should use last value aggregation regardless of the aggregation temporality used.
        observable_gauge_aggregation_helper(Temporality::Delta, false);
        observable_gauge_aggregation_helper(Temporality::Delta, true);
        observable_gauge_aggregation_helper(Temporality::Cumulative, false);
        observable_gauge_aggregation_helper(Temporality::Cumulative, true);
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
    async fn observable_counter_aggregation_cumulative_non_zero_increment() {
        // Run this test with stdout enabled to see output.
        // cargo test observable_counter_aggregation_cumulative_non_zero_increment --features=testing -- --nocapture
        observable_counter_aggregation_helper(Temporality::Cumulative, 100, 10, 4, false);
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
    async fn observable_counter_aggregation_cumulative_non_zero_increment_no_attrs() {
        // Run this test with stdout enabled to see output.
        // cargo test observable_counter_aggregation_cumulative_non_zero_increment_no_attrs --features=testing -- --nocapture
        observable_counter_aggregation_helper(Temporality::Cumulative, 100, 10, 4, true);
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
    async fn observable_counter_aggregation_delta_non_zero_increment() {
        // Run this test with stdout enabled to see output.
        // cargo test observable_counter_aggregation_delta_non_zero_increment --features=testing -- --nocapture
        observable_counter_aggregation_helper(Temporality::Delta, 100, 10, 4, false);
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
    async fn observable_counter_aggregation_delta_non_zero_increment_no_attrs() {
        // Run this test with stdout enabled to see output.
        // cargo test observable_counter_aggregation_delta_non_zero_increment_no_attrs --features=testing -- --nocapture
        observable_counter_aggregation_helper(Temporality::Delta, 100, 10, 4, true);
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
    async fn observable_counter_aggregation_cumulative_zero_increment() {
        // Run this test with stdout enabled to see output.
        // cargo test observable_counter_aggregation_cumulative_zero_increment --features=testing -- --nocapture
        observable_counter_aggregation_helper(Temporality::Cumulative, 100, 0, 4, false);
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
    async fn observable_counter_aggregation_cumulative_zero_increment_no_attrs() {
        // Run this test with stdout enabled to see output.
        // cargo test observable_counter_aggregation_cumulative_zero_increment_no_attrs --features=testing -- --nocapture
        observable_counter_aggregation_helper(Temporality::Cumulative, 100, 0, 4, true);
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
    async fn observable_counter_aggregation_delta_zero_increment() {
        // Run this test with stdout enabled to see output.
        // cargo test observable_counter_aggregation_delta_zero_increment --features=testing -- --nocapture
        observable_counter_aggregation_helper(Temporality::Delta, 100, 0, 4, false);
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
    async fn observable_counter_aggregation_delta_zero_increment_no_attrs() {
        // Run this test with stdout enabled to see output.
        // cargo test observable_counter_aggregation_delta_zero_increment_no_attrs --features=testing -- --nocapture
        observable_counter_aggregation_helper(Temporality::Delta, 100, 0, 4, true);
    }

    fn observable_counter_aggregation_helper(
        temporality: Temporality,
        start: u64,
        increment: u64,
        length: u64,
        is_empty_attributes: bool,
    ) {
        // Arrange
        let mut test_context = TestContext::new(temporality);
        let attributes = if is_empty_attributes {
            vec![]
        } else {
            vec![KeyValue::new("key1", "value1")]
        };
        // The observable counter reports values[0], values[1], ..., values[n] on successive flushes.
        let values: Vec<u64> = (0..length).map(|i| start + i * increment).collect();
        println!("Testing with observable values: {:?}", values);
        let values = Arc::new(values);
        let values_clone = values.clone();
        let i = Arc::new(Mutex::new(0));
        let _observable_counter = test_context
            .meter()
            .u64_observable_counter("my_observable_counter")
            .with_unit("my_unit")
            .with_callback(move |observer| {
                let mut index = i.lock().unwrap();
                if *index < values.len() {
                    observer.observe(values[*index], &attributes);
                    *index += 1;
                }
            })
            .build();

        for (iter, v) in values_clone.iter().enumerate() {
            test_context.flush_metrics();
            let sum = test_context.get_aggregation::<Sum<u64>>("my_observable_counter", None);
            assert_eq!(sum.data_points.len(), 1);
            assert!(sum.is_monotonic, "Counter should produce monotonic.");
            if let Temporality::Cumulative = temporality {
                assert_eq!(
                    sum.temporality,
                    Temporality::Cumulative,
                    "Should produce cumulative"
                );
            } else {
                assert_eq!(sum.temporality, Temporality::Delta, "Should produce delta");
            }

            // find and validate datapoint
            let data_point = if is_empty_attributes {
                &sum.data_points[0]
            } else {
                find_sum_datapoint_with_key_value(&sum.data_points, "key1", "value1")
                    .expect("datapoint with key1=value1 expected")
            };

            if let Temporality::Cumulative = temporality {
                // A cumulative counter should have the value as is.
                assert_eq!(data_point.value, *v);
            } else {
                // A delta counter should have the increment value,
                // except for the first value, which should be the start value.
                if iter == 0 {
                    assert_eq!(data_point.value, start);
                } else {
                    assert_eq!(data_point.value, increment);
                }
            }

            test_context.reset_metrics();
        }
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
    async fn empty_meter_name_retained() {
        async fn meter_name_retained_helper(
            meter: Meter,
            provider: SdkMeterProvider,
            exporter: InMemoryMetricExporter,
        ) {
            // Act
            let counter = meter.u64_counter("my_counter").build();

            counter.add(10, &[]);
            provider.force_flush().unwrap();

            // Assert
            let resource_metrics = exporter
                .get_finished_metrics()
                .expect("metrics are expected to be exported.");
            assert!(
                resource_metrics[0].scope_metrics[0].metrics.len() == 1,
                "There should be a single metric"
            );
            let meter_name = resource_metrics[0].scope_metrics[0].scope.name();
            assert_eq!(meter_name, "");
        }

        let exporter = InMemoryMetricExporter::default();
        let meter_provider = SdkMeterProvider::builder()
            .with_periodic_exporter(exporter.clone())
            .build();

        // Test meter creation in two ways, both with an empty string as the meter name
        let meter1 = meter_provider.meter("");
        meter_name_retained_helper(meter1, meter_provider.clone(), exporter.clone()).await;

        let meter_scope = InstrumentationScope::builder("").build();
        let meter2 = meter_provider.meter_with_scope(meter_scope);
        meter_name_retained_helper(meter2, meter_provider, exporter).await;
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
    async fn counter_duplicate_instrument_merge() {
        // Arrange
        let exporter = InMemoryMetricExporter::default();
        let meter_provider = SdkMeterProvider::builder()
            .with_periodic_exporter(exporter.clone())
            .build();

        // Act
        let meter = meter_provider.meter("test");
        let counter = meter
            .u64_counter("my_counter")
            .with_unit("my_unit")
            .with_description("my_description")
            .build();

        let counter_duplicated = meter
            .u64_counter("my_counter")
            .with_unit("my_unit")
            .with_description("my_description")
            .build();

        let attribute = vec![KeyValue::new("key1", "value1")];
        counter.add(10, &attribute);
        counter_duplicated.add(5, &attribute);

        meter_provider.force_flush().unwrap();

        // Assert
        let resource_metrics = exporter
            .get_finished_metrics()
            .expect("metrics are expected to be exported.");
        assert!(
            resource_metrics[0].scope_metrics[0].metrics.len() == 1,
            "There should be a single metric merging duplicate instruments"
        );
        let metric = &resource_metrics[0].scope_metrics[0].metrics[0];
        assert_eq!(metric.name, "my_counter");
        assert_eq!(metric.unit, "my_unit");
        let sum = metric
            .data
            .as_any()
            .downcast_ref::<Sum<u64>>()
            .expect("Sum aggregation expected for Counter instruments by default");

        // Expecting 1 time-series.
        assert_eq!(sum.data_points.len(), 1);

        let datapoint = &sum.data_points[0];
        assert_eq!(datapoint.value, 15);
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
    async fn counter_duplicate_instrument_different_meter_no_merge() {
        // Arrange
        let exporter = InMemoryMetricExporter::default();
        let meter_provider = SdkMeterProvider::builder()
            .with_periodic_exporter(exporter.clone())
            .build();

        // Act
        let meter1 = meter_provider.meter("test.meter1");
        let meter2 = meter_provider.meter("test.meter2");
        let counter1 = meter1
            .u64_counter("my_counter")
            .with_unit("my_unit")
            .with_description("my_description")
            .build();

        let counter2 = meter2
            .u64_counter("my_counter")
            .with_unit("my_unit")
            .with_description("my_description")
            .build();

        let attribute = vec![KeyValue::new("key1", "value1")];
        counter1.add(10, &attribute);
        counter2.add(5, &attribute);

        meter_provider.force_flush().unwrap();

        // Assert
        let resource_metrics = exporter
            .get_finished_metrics()
            .expect("metrics are expected to be exported.");
        assert!(
            resource_metrics[0].scope_metrics.len() == 2,
            "There should be 2 separate scopes"
        );
        assert!(
            resource_metrics[0].scope_metrics[0].metrics.len() == 1,
            "There should be a single metric for the scope"
        );
        assert!(
            resource_metrics[0].scope_metrics[1].metrics.len() == 1,
            "There should be a single metric for the scope"
        );

        let scope1 = find_scope_metric(&resource_metrics[0].scope_metrics, "test.meter1");
        let scope2 = find_scope_metric(&resource_metrics[0].scope_metrics, "test.meter2");

        if let Some(scope1) = scope1 {
            let metric1 = &scope1.metrics[0];
            assert_eq!(metric1.name, "my_counter");
            assert_eq!(metric1.unit, "my_unit");
            assert_eq!(metric1.description, "my_description");
            let sum1 = metric1
                .data
                .as_any()
                .downcast_ref::<Sum<u64>>()
                .expect("Sum aggregation expected for Counter instruments by default");

            // Expecting 1 time-series.
            assert_eq!(sum1.data_points.len(), 1);

            let datapoint1 = &sum1.data_points[0];
            assert_eq!(datapoint1.value, 10);
        } else {
            panic!("No MetricScope found for 'test.meter1'");
        }

        if let Some(scope2) = scope2 {
            let metric2 = &scope2.metrics[0];
            assert_eq!(metric2.name, "my_counter");
            assert_eq!(metric2.unit, "my_unit");
            assert_eq!(metric2.description, "my_description");
            let sum2 = metric2
                .data
                .as_any()
                .downcast_ref::<Sum<u64>>()
                .expect("Sum aggregation expected for Counter instruments by default");

            // Expecting 1 time-series.
            assert_eq!(sum2.data_points.len(), 1);

            let datapoint2 = &sum2.data_points[0];
            assert_eq!(datapoint2.value, 5);
        } else {
            panic!("No MetricScope found for 'test.meter2'");
        }
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
    async fn instrumentation_scope_identity_test() {
        // Arrange
        let exporter = InMemoryMetricExporter::default();
        let meter_provider = SdkMeterProvider::builder()
            .with_periodic_exporter(exporter.clone())
            .build();

        // Act
        // Meters are identical except for scope attributes, but scope attributes are not an identifying property.
        // Hence there should be a single metric stream output for this test.
        let make_scope = |attributes| {
            InstrumentationScope::builder("test.meter")
                .with_version("v0.1.0")
                .with_schema_url("http://example.com")
                .with_attributes(attributes)
                .build()
        };

        let meter1 =
            meter_provider.meter_with_scope(make_scope(vec![KeyValue::new("key", "value1")]));
        let meter2 =
            meter_provider.meter_with_scope(make_scope(vec![KeyValue::new("key", "value2")]));

        let counter1 = meter1
            .u64_counter("my_counter")
            .with_unit("my_unit")
            .with_description("my_description")
            .build();

        let counter2 = meter2
            .u64_counter("my_counter")
            .with_unit("my_unit")
            .with_description("my_description")
            .build();

        let attribute = vec![KeyValue::new("key1", "value1")];
        counter1.add(10, &attribute);
        counter2.add(5, &attribute);

        meter_provider.force_flush().unwrap();

        // Assert
        let resource_metrics = exporter
            .get_finished_metrics()
            .expect("metrics are expected to be exported.");
        println!("resource_metrics: {:?}", resource_metrics);
        assert!(
            resource_metrics[0].scope_metrics.len() == 1,
            "There should be a single scope as the meters are identical"
        );
        assert!(
            resource_metrics[0].scope_metrics[0].metrics.len() == 1,
            "There should be a single metric for the scope as instruments are identical"
        );

        let scope = &resource_metrics[0].scope_metrics[0].scope;
        assert_eq!(scope.name(), "test.meter");
        assert_eq!(scope.version(), Some("v0.1.0"));
        assert_eq!(scope.schema_url(), Some("http://example.com"));

        // This is validating current behavior, but it is not guaranteed to be the case in the future,
        // as this is a user error and the SDK reserves the right to change this behavior.
        assert!(scope.attributes().eq(&[KeyValue::new("key", "value1")]));

        let metric = &resource_metrics[0].scope_metrics[0].metrics[0];
        assert_eq!(metric.name, "my_counter");
        assert_eq!(metric.unit, "my_unit");
        assert_eq!(metric.description, "my_description");
        let sum = metric
            .data
            .as_any()
            .downcast_ref::<Sum<u64>>()
            .expect("Sum aggregation expected for Counter instruments by default");

        // Expecting 1 time-series.
        assert_eq!(sum.data_points.len(), 1);

        let datapoint = &sum.data_points[0];
        assert_eq!(datapoint.value, 15);
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
    async fn histogram_aggregation_with_invalid_aggregation_should_proceed_as_if_view_not_exist() {
        // Run this test with stdout enabled to see output.
        // cargo test histogram_aggregation_with_invalid_aggregation_should_proceed_as_if_view_not_exist --features=testing -- --nocapture

        // Arrange
        let exporter = InMemoryMetricExporter::default();
        let criteria = Instrument::new().name("test_histogram");
        let stream_invalid_aggregation = Stream::new()
            .aggregation(aggregation::Aggregation::ExplicitBucketHistogram {
                boundaries: vec![0.9, 1.9, 1.2, 1.3, 1.4, 1.5], // invalid boundaries
                record_min_max: false,
            })
            .name("test_histogram_renamed")
            .unit("test_unit_renamed");

        let view =
            new_view(criteria, stream_invalid_aggregation).expect("Expected to create a new view");
        let meter_provider = SdkMeterProvider::builder()
            .with_periodic_exporter(exporter.clone())
            .with_view(view)
            .build();

        // Act
        let meter = meter_provider.meter("test");
        let histogram = meter
            .f64_histogram("test_histogram")
            .with_unit("test_unit")
            .build();

        histogram.record(1.5, &[KeyValue::new("key1", "value1")]);
        meter_provider.force_flush().unwrap();

        // Assert
        let resource_metrics = exporter
            .get_finished_metrics()
            .expect("metrics are expected to be exported.");
        assert!(!resource_metrics.is_empty());
        let metric = &resource_metrics[0].scope_metrics[0].metrics[0];
        assert_eq!(
            metric.name, "test_histogram",
            "View rename should be ignored and original name retained."
        );
        assert_eq!(
            metric.unit, "test_unit",
            "View rename of unit should be ignored and original unit retained."
        );
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
    #[ignore = "Spatial aggregation is not yet implemented."]
    async fn spatial_aggregation_when_view_drops_attributes_observable_counter() {
        // cargo test metrics::tests::spatial_aggregation_when_view_drops_attributes_observable_counter --features=testing

        // Arrange
        let exporter = InMemoryMetricExporter::default();
        let criteria = Instrument::new().name("my_observable_counter");
        // View drops all attributes.
        let stream_invalid_aggregation = Stream::new().allowed_attribute_keys(vec![]);

        let view =
            new_view(criteria, stream_invalid_aggregation).expect("Expected to create a new view");
        let meter_provider = SdkMeterProvider::builder()
            .with_periodic_exporter(exporter.clone())
            .with_view(view)
            .build();

        // Act
        let meter = meter_provider.meter("test");
        let _observable_counter = meter
            .u64_observable_counter("my_observable_counter")
            .with_callback(|observer| {
                observer.observe(
                    100,
                    &[
                        KeyValue::new("statusCode", "200"),
                        KeyValue::new("verb", "get"),
                    ],
                );

                observer.observe(
                    100,
                    &[
                        KeyValue::new("statusCode", "200"),
                        KeyValue::new("verb", "post"),
                    ],
                );

                observer.observe(
                    100,
                    &[
                        KeyValue::new("statusCode", "500"),
                        KeyValue::new("verb", "get"),
                    ],
                );
            })
            .build();

        meter_provider.force_flush().unwrap();

        // Assert
        let resource_metrics = exporter
            .get_finished_metrics()
            .expect("metrics are expected to be exported.");
        assert!(!resource_metrics.is_empty());
        let metric = &resource_metrics[0].scope_metrics[0].metrics[0];
        assert_eq!(metric.name, "my_observable_counter");

        let sum = metric
            .data
            .as_any()
            .downcast_ref::<Sum<u64>>()
            .expect("Sum aggregation expected for ObservableCounter instruments by default");

        // Expecting 1 time-series only, as the view drops all attributes resulting
        // in a single time-series.
        // This is failing today, due to lack of support for spatial aggregation.
        assert_eq!(sum.data_points.len(), 1);

        // find and validate the single datapoint
        let data_point = &sum.data_points[0];
        assert_eq!(data_point.value, 300);
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
    async fn spatial_aggregation_when_view_drops_attributes_counter() {
        // cargo test spatial_aggregation_when_view_drops_attributes_counter --features=testing

        // Arrange
        let exporter = InMemoryMetricExporter::default();
        let criteria = Instrument::new().name("my_counter");
        // View drops all attributes.
        let stream_invalid_aggregation = Stream::new().allowed_attribute_keys(vec![]);

        let view =
            new_view(criteria, stream_invalid_aggregation).expect("Expected to create a new view");
        let meter_provider = SdkMeterProvider::builder()
            .with_periodic_exporter(exporter.clone())
            .with_view(view)
            .build();

        // Act
        let meter = meter_provider.meter("test");
        let counter = meter.u64_counter("my_counter").build();

        // Normally, this would generate 3 time-series, but since the view
        // drops all attributes, we expect only 1 time-series.
        counter.add(
            10,
            [
                KeyValue::new("statusCode", "200"),
                KeyValue::new("verb", "Get"),
            ]
            .as_ref(),
        );

        counter.add(
            10,
            [
                KeyValue::new("statusCode", "500"),
                KeyValue::new("verb", "Get"),
            ]
            .as_ref(),
        );

        counter.add(
            10,
            [
                KeyValue::new("statusCode", "200"),
                KeyValue::new("verb", "Post"),
            ]
            .as_ref(),
        );

        meter_provider.force_flush().unwrap();

        // Assert
        let resource_metrics = exporter
            .get_finished_metrics()
            .expect("metrics are expected to be exported.");
        assert!(!resource_metrics.is_empty());
        let metric = &resource_metrics[0].scope_metrics[0].metrics[0];
        assert_eq!(metric.name, "my_counter");

        let sum = metric
            .data
            .as_any()
            .downcast_ref::<Sum<u64>>()
            .expect("Sum aggregation expected for Counter instruments by default");

        // Expecting 1 time-series only, as the view drops all attributes resulting
        // in a single time-series.
        assert_eq!(sum.data_points.len(), 1);
        // find and validate the single datapoint
        let data_point = &sum.data_points[0];
        assert_eq!(data_point.value, 30);
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
    async fn no_attr_cumulative_up_down_counter() {
        let mut test_context = TestContext::new(Temporality::Cumulative);
        let counter = test_context.i64_up_down_counter("test", "my_counter", Some("my_unit"));

        counter.add(50, &[]);
        test_context.flush_metrics();

        let sum = test_context.get_aggregation::<Sum<i64>>("my_counter", Some("my_unit"));

        assert_eq!(sum.data_points.len(), 1, "Expected only one data point");
        assert!(!sum.is_monotonic, "Should not produce monotonic.");
        assert_eq!(
            sum.temporality,
            Temporality::Cumulative,
            "Should produce cumulative"
        );

        let data_point = &sum.data_points[0];
        assert!(data_point.attributes.is_empty(), "Non-empty attribute set");
        assert_eq!(data_point.value, 50, "Unexpected data point value");
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
    async fn no_attr_up_down_counter_always_cumulative() {
        let mut test_context = TestContext::new(Temporality::Delta);
        let counter = test_context.i64_up_down_counter("test", "my_counter", Some("my_unit"));

        counter.add(50, &[]);
        test_context.flush_metrics();

        let sum = test_context.get_aggregation::<Sum<i64>>("my_counter", Some("my_unit"));

        assert_eq!(sum.data_points.len(), 1, "Expected only one data point");
        assert!(!sum.is_monotonic, "Should not produce monotonic.");
        assert_eq!(
            sum.temporality,
            Temporality::Cumulative,
            "Should produce Cumulative due to UpDownCounter temporality_preference"
        );

        let data_point = &sum.data_points[0];
        assert!(data_point.attributes.is_empty(), "Non-empty attribute set");
        assert_eq!(data_point.value, 50, "Unexpected data point value");
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
    async fn no_attr_cumulative_counter_value_added_after_export() {
        let mut test_context = TestContext::new(Temporality::Cumulative);
        let counter = test_context.u64_counter("test", "my_counter", None);

        counter.add(50, &[]);
        test_context.flush_metrics();
        let _ = test_context.get_aggregation::<Sum<u64>>("my_counter", None);
        test_context.reset_metrics();

        counter.add(5, &[]);
        test_context.flush_metrics();
        let sum = test_context.get_aggregation::<Sum<u64>>("my_counter", None);

        assert_eq!(sum.data_points.len(), 1, "Expected only one data point");
        assert!(sum.is_monotonic, "Should produce monotonic.");
        assert_eq!(
            sum.temporality,
            Temporality::Cumulative,
            "Should produce cumulative"
        );

        let data_point = &sum.data_points[0];
        assert!(data_point.attributes.is_empty(), "Non-empty attribute set");
        assert_eq!(data_point.value, 55, "Unexpected data point value");
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
    async fn no_attr_delta_counter_value_reset_after_export() {
        let mut test_context = TestContext::new(Temporality::Delta);
        let counter = test_context.u64_counter("test", "my_counter", None);

        counter.add(50, &[]);
        test_context.flush_metrics();
        let _ = test_context.get_aggregation::<Sum<u64>>("my_counter", None);
        test_context.reset_metrics();

        counter.add(5, &[]);
        test_context.flush_metrics();
        let sum = test_context.get_aggregation::<Sum<u64>>("my_counter", None);

        assert_eq!(sum.data_points.len(), 1, "Expected only one data point");
        assert!(sum.is_monotonic, "Should produce monotonic.");
        assert_eq!(
            sum.temporality,
            Temporality::Delta,
            "Should produce delta"
        );

        let data_point = &sum.data_points[0];
        assert!(data_point.attributes.is_empty(), "Non-empty attribute set");
        assert_eq!(data_point.value, 5, "Unexpected data point value");
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
    async fn second_delta_export_does_not_give_no_attr_value_if_add_not_called() {
        let mut test_context = TestContext::new(Temporality::Delta);
        let counter = test_context.u64_counter("test", "my_counter", None);

        counter.add(50, &[]);
        test_context.flush_metrics();
        let _ = test_context.get_aggregation::<Sum<u64>>("my_counter", None);
        test_context.reset_metrics();

        counter.add(50, &[KeyValue::new("a", "b")]);
        test_context.flush_metrics();
        let sum = test_context.get_aggregation::<Sum<u64>>("my_counter", None);

        let no_attr_data_point = sum.data_points.iter().find(|x| x.attributes.is_empty());

        assert!(
            no_attr_data_point.is_none(),
            "Expected no data points with no attributes"
        );
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
    async fn delta_memory_efficiency_test() {
        // Run this test with stdout enabled to see output.
        // cargo test delta_memory_efficiency_test --features=testing -- --nocapture

        // Arrange
        let mut test_context = TestContext::new(Temporality::Delta);
        let counter = test_context.u64_counter("test", "my_counter", None);

        // Act
        counter.add(1, &[KeyValue::new("key1", "value1")]);
        counter.add(1, &[KeyValue::new("key1", "value1")]);
        counter.add(1, &[KeyValue::new("key1", "value1")]);
        counter.add(1, &[KeyValue::new("key1", "value1")]);
        counter.add(1, &[KeyValue::new("key1", "value1")]);

        counter.add(1, &[KeyValue::new("key1", "value2")]);
        counter.add(1, &[KeyValue::new("key1", "value2")]);
        counter.add(1, &[KeyValue::new("key1", "value2")]);
        test_context.flush_metrics();

        let sum = test_context.get_aggregation::<Sum<u64>>("my_counter", None);

        // Expecting 2 time-series.
        assert_eq!(sum.data_points.len(), 2);

        // find and validate key1=value1 datapoint
        let data_point1 = find_sum_datapoint_with_key_value(&sum.data_points, "key1", "value1")
            .expect("datapoint with key1=value1 expected");
        assert_eq!(data_point1.value, 5);

        // find and validate key1=value2 datapoint
        let data_point2 = find_sum_datapoint_with_key_value(&sum.data_points, "key1", "value2")
            .expect("datapoint with key1=value2 expected");
        assert_eq!(data_point2.value, 3);

        test_context.exporter.reset();
        // Flush again, and validate that nothing is flushed,
        // as temporality is delta.
        test_context.flush_metrics();

        let resource_metrics = test_context
            .exporter
            .get_finished_metrics()
            .expect("metrics are expected to be exported.");
        println!("resource_metrics: {:?}", resource_metrics);
        assert!(resource_metrics.is_empty(), "No metrics should be exported as no new measurements were recorded since last collect.");
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
    async fn counter_multithreaded() {
        // Run this test with stdout enabled to see output.
        // cargo test counter_multithreaded --features=testing -- --nocapture

        counter_multithreaded_aggregation_helper(Temporality::Delta);
        counter_multithreaded_aggregation_helper(Temporality::Cumulative);
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
    async fn counter_f64_multithreaded() {
        // Run this test with stdout enabled to see output.
        // cargo test counter_f64_multithreaded --features=testing -- --nocapture

        counter_f64_multithreaded_aggregation_helper(Temporality::Delta);
        counter_f64_multithreaded_aggregation_helper(Temporality::Cumulative);
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
    async fn histogram_multithreaded() {
        // Run this test with stdout enabled to see output.
        // cargo test histogram_multithreaded --features=testing -- --nocapture

        histogram_multithreaded_aggregation_helper(Temporality::Delta);
        histogram_multithreaded_aggregation_helper(Temporality::Cumulative);
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
    async fn histogram_f64_multithreaded() {
        // Run this test with stdout enabled to see output.
        // cargo test histogram_f64_multithreaded --features=testing -- --nocapture

        histogram_f64_multithreaded_aggregation_helper(Temporality::Delta);
        histogram_f64_multithreaded_aggregation_helper(Temporality::Cumulative);
    }

1263    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
1264    async fn synchronous_instruments_cumulative_with_gap_in_measurements() {
1265        // Run this test with stdout enabled to see output.
1266        // cargo test synchronous_instruments_cumulative_with_gap_in_measurements --features=testing -- --nocapture
1267
1268        synchronous_instruments_cumulative_with_gap_in_measurements_helper("counter");
1269        synchronous_instruments_cumulative_with_gap_in_measurements_helper("updown_counter");
1270        synchronous_instruments_cumulative_with_gap_in_measurements_helper("histogram");
1271        synchronous_instruments_cumulative_with_gap_in_measurements_helper("gauge");
1272    }
1273
    fn synchronous_instruments_cumulative_with_gap_in_measurements_helper(
        instrument_name: &'static str,
    ) {
        let mut test_context = TestContext::new(Temporality::Cumulative);
        let attributes = &[KeyValue::new("key1", "value1")];

        // Create instrument and emit measurements
        match instrument_name {
            "counter" => {
                let counter = test_context.meter().u64_counter("test_counter").build();
                counter.add(5, &[]);
                counter.add(10, attributes);
            }
            "updown_counter" => {
                let updown_counter = test_context
                    .meter()
                    .i64_up_down_counter("test_updowncounter")
                    .build();
                updown_counter.add(15, &[]);
                updown_counter.add(20, attributes);
            }
            "histogram" => {
                let histogram = test_context.meter().u64_histogram("test_histogram").build();
                histogram.record(25, &[]);
                histogram.record(30, attributes);
            }
            "gauge" => {
                let gauge = test_context.meter().u64_gauge("test_gauge").build();
                gauge.record(35, &[]);
                gauge.record(40, attributes);
            }
            _ => panic!("Incorrect instrument kind provided"),
        };

        test_context.flush_metrics();

        // Test the first export
        assert_correct_export(&mut test_context, instrument_name);

        // Reset and export again without making any measurements
        test_context.reset_metrics();

        test_context.flush_metrics();

        // Test that the latest export has the same data as the previous one
        assert_correct_export(&mut test_context, instrument_name);

        fn assert_correct_export(test_context: &mut TestContext, instrument_name: &'static str) {
            match instrument_name {
                "counter" => {
                    let counter_data =
                        test_context.get_aggregation::<Sum<u64>>("test_counter", None);
                    assert_eq!(counter_data.data_points.len(), 2);
                    let zero_attribute_datapoint =
                        find_sum_datapoint_with_no_attributes(&counter_data.data_points)
                            .expect("datapoint with no attributes expected");
                    assert_eq!(zero_attribute_datapoint.value, 5);
                    let data_point1 = find_sum_datapoint_with_key_value(
                        &counter_data.data_points,
                        "key1",
                        "value1",
                    )
                    .expect("datapoint with key1=value1 expected");
                    assert_eq!(data_point1.value, 10);
                }
                "updown_counter" => {
                    let updown_counter_data =
                        test_context.get_aggregation::<Sum<i64>>("test_updowncounter", None);
                    assert_eq!(updown_counter_data.data_points.len(), 2);
                    let zero_attribute_datapoint =
                        find_sum_datapoint_with_no_attributes(&updown_counter_data.data_points)
                            .expect("datapoint with no attributes expected");
                    assert_eq!(zero_attribute_datapoint.value, 15);
                    let data_point1 = find_sum_datapoint_with_key_value(
                        &updown_counter_data.data_points,
                        "key1",
                        "value1",
                    )
                    .expect("datapoint with key1=value1 expected");
                    assert_eq!(data_point1.value, 20);
                }
                "histogram" => {
                    let histogram_data =
                        test_context.get_aggregation::<Histogram<u64>>("test_histogram", None);
                    assert_eq!(histogram_data.data_points.len(), 2);
                    let zero_attribute_datapoint =
                        find_histogram_datapoint_with_no_attributes(&histogram_data.data_points)
                            .expect("datapoint with no attributes expected");
                    assert_eq!(zero_attribute_datapoint.count, 1);
                    assert_eq!(zero_attribute_datapoint.sum, 25);
                    assert_eq!(zero_attribute_datapoint.min, Some(25));
                    assert_eq!(zero_attribute_datapoint.max, Some(25));
                    let data_point1 = find_histogram_datapoint_with_key_value(
                        &histogram_data.data_points,
                        "key1",
                        "value1",
                    )
                    .expect("datapoint with key1=value1 expected");
                    assert_eq!(data_point1.count, 1);
                    assert_eq!(data_point1.sum, 30);
                    assert_eq!(data_point1.min, Some(30));
                    assert_eq!(data_point1.max, Some(30));
                }
                "gauge" => {
                    let gauge_data = test_context.get_aggregation::<Gauge<u64>>("test_gauge", None);
                    assert_eq!(gauge_data.data_points.len(), 2);
                    let zero_attribute_datapoint =
                        find_gauge_datapoint_with_no_attributes(&gauge_data.data_points)
                            .expect("datapoint with no attributes expected");
                    assert_eq!(zero_attribute_datapoint.value, 35);
                    let data_point1 = find_gauge_datapoint_with_key_value(
                        &gauge_data.data_points,
                        "key1",
                        "value1",
                    )
                    .expect("datapoint with key1=value1 expected");
                    assert_eq!(data_point1.value, 40);
                }
                _ => panic!("Incorrect instrument kind provided"),
            }
        }
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
    async fn asynchronous_instruments_cumulative_data_points_only_from_last_measurement() {
        // Run this test with stdout enabled to see output.
        // cargo test asynchronous_instruments_cumulative_data_points_only_from_last_measurement --features=testing -- --nocapture

        asynchronous_instruments_cumulative_data_points_only_from_last_measurement_helper(
            "gauge", true,
        );
        // TODO fix: no asynchronous instrument should emit data points when it is not
        // measured, but these implementations are still buggy.
        asynchronous_instruments_cumulative_data_points_only_from_last_measurement_helper(
            "counter", false,
        );
        asynchronous_instruments_cumulative_data_points_only_from_last_measurement_helper(
            "updown_counter",
            false,
        );
    }

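    // Helper: registers an observable instrument whose callback reports values only
    // on the first collection, then collects twice. `should_not_emit` selects whether
    // the second, measurement-free collection is expected to produce no data points
    // or to repeat the previously observed ones.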
    fn asynchronous_instruments_cumulative_data_points_only_from_last_measurement_helper(
        instrument_name: &'static str,
        should_not_emit: bool,
    ) {
        let mut test_context = TestContext::new(Temporality::Cumulative);
        let attributes = Arc::new([KeyValue::new("key1", "value1")]);

        // Create instrument and emit measurements once
        match instrument_name {
            "counter" => {
                let has_run = AtomicBool::new(false);
                let _observable_counter = test_context
                    .meter()
                    .u64_observable_counter("test_counter")
                    .with_callback(move |observer| {
                        if !has_run.load(Ordering::SeqCst) {
                            observer.observe(5, &[]);
                            observer.observe(10, &*attributes.clone());
                            has_run.store(true, Ordering::SeqCst);
                        }
                    })
                    .build();
            }
            "updown_counter" => {
                let has_run = AtomicBool::new(false);
                let _observable_up_down_counter = test_context
                    .meter()
                    .i64_observable_up_down_counter("test_updowncounter")
                    .with_callback(move |observer| {
                        if !has_run.load(Ordering::SeqCst) {
                            observer.observe(15, &[]);
                            observer.observe(20, &*attributes.clone());
                            has_run.store(true, Ordering::SeqCst);
                        }
                    })
                    .build();
            }
            "gauge" => {
                let has_run = AtomicBool::new(false);
                let _observable_gauge = test_context
                    .meter()
                    .u64_observable_gauge("test_gauge")
                    .with_callback(move |observer| {
                        if !has_run.load(Ordering::SeqCst) {
                            observer.observe(25, &[]);
                            observer.observe(30, &*attributes.clone());
                            has_run.store(true, Ordering::SeqCst);
                        }
                    })
                    .build();
            }
            _ => panic!("Incorrect instrument kind provided"),
        };

        test_context.flush_metrics();

        // Test the first export
        assert_correct_export(&mut test_context, instrument_name);

        // Reset and export again without making any measurements
        test_context.reset_metrics();

        test_context.flush_metrics();

        if should_not_emit {
            test_context.check_no_metrics();
        } else {
            // Test that the latest export has the same data as the previous one
            assert_correct_export(&mut test_context, instrument_name);
        }

        fn assert_correct_export(test_context: &mut TestContext, instrument_name: &'static str) {
            match instrument_name {
                "counter" => {
                    let counter_data =
                        test_context.get_aggregation::<Sum<u64>>("test_counter", None);
                    assert_eq!(counter_data.data_points.len(), 2);
                    assert!(counter_data.is_monotonic);
                    let zero_attribute_datapoint =
                        find_sum_datapoint_with_no_attributes(&counter_data.data_points)
                            .expect("datapoint with no attributes expected");
                    assert_eq!(zero_attribute_datapoint.value, 5);
                    let data_point1 = find_sum_datapoint_with_key_value(
                        &counter_data.data_points,
                        "key1",
                        "value1",
                    )
                    .expect("datapoint with key1=value1 expected");
                    assert_eq!(data_point1.value, 10);
                }
                "updown_counter" => {
                    let updown_counter_data =
                        test_context.get_aggregation::<Sum<i64>>("test_updowncounter", None);
                    assert_eq!(updown_counter_data.data_points.len(), 2);
                    assert!(!updown_counter_data.is_monotonic);
                    let zero_attribute_datapoint =
                        find_sum_datapoint_with_no_attributes(&updown_counter_data.data_points)
                            .expect("datapoint with no attributes expected");
                    assert_eq!(zero_attribute_datapoint.value, 15);
                    let data_point1 = find_sum_datapoint_with_key_value(
                        &updown_counter_data.data_points,
                        "key1",
                        "value1",
                    )
                    .expect("datapoint with key1=value1 expected");
                    assert_eq!(data_point1.value, 20);
                }
                "gauge" => {
                    let gauge_data = test_context.get_aggregation::<Gauge<u64>>("test_gauge", None);
                    assert_eq!(gauge_data.data_points.len(), 2);
                    let zero_attribute_datapoint =
                        find_gauge_datapoint_with_no_attributes(&gauge_data.data_points)
                            .expect("datapoint with no attributes expected");
                    assert_eq!(zero_attribute_datapoint.value, 25);
                    let data_point1 = find_gauge_datapoint_with_key_value(
                        &gauge_data.data_points,
                        "key1",
                        "value1",
                    )
                    .expect("datapoint with key1=value1 expected");
                    assert_eq!(data_point1.value, 30);
                }
                _ => panic!("Incorrect instrument kind provided"),
            }
        }
    }

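    // Helper: drives a u64 counter from 10 short-lived threads while collections run
    // concurrently, then verifies the per-stream totals for both delta and cumulative
    // temporality.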
    fn counter_multithreaded_aggregation_helper(temporality: Temporality) {
        // Arrange
        let mut test_context = TestContext::new(temporality);
        let counter = Arc::new(test_context.u64_counter("test", "my_counter", None));

        for i in 0..10 {
            thread::scope(|s| {
                s.spawn(|| {
                    counter.add(1, &[]);

                    counter.add(1, &[KeyValue::new("key1", "value1")]);
                    counter.add(1, &[KeyValue::new("key1", "value1")]);
                    counter.add(1, &[KeyValue::new("key1", "value1")]);

                    // Test concurrent collection by forcing half of the update threads
                    // to `force_flush` metrics and then sleep, increasing the
                    // interleaving between updates and collection.
                    if i % 2 == 0 {
                        test_context.flush_metrics();
                        thread::sleep(Duration::from_millis(i));
                    }

                    counter.add(1, &[KeyValue::new("key1", "value1")]);
                    counter.add(1, &[KeyValue::new("key1", "value1")]);
                });
            });
        }

        test_context.flush_metrics();

        // Assert
        // `test_context.flush_metrics()` is invoked six times in total: once for each
        // even loop iteration and once after the loop.
        let sums = test_context.get_from_multiple_aggregations::<Sum<u64>>("my_counter", None, 6);

        let mut sum_zero_attributes = 0;
        let mut sum_key1_value1 = 0;
        sums.iter().for_each(|sum| {
            assert_eq!(sum.data_points.len(), 2); // Expecting 2 time-series.
            assert!(sum.is_monotonic, "Counter should produce monotonic sums.");
            assert_eq!(sum.temporality, temporality);

            if temporality == Temporality::Delta {
                sum_zero_attributes += sum.data_points[0].value;
                sum_key1_value1 += sum.data_points[1].value;
            } else {
                sum_zero_attributes = sum.data_points[0].value;
                sum_key1_value1 = sum.data_points[1].value;
            };
        });

        assert_eq!(sum_zero_attributes, 10);
        assert_eq!(sum_key1_value1, 50); // Each of the 10 update threads records measurements summing to 5.
    }

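    // Helper: same as `counter_multithreaded_aggregation_helper`, but exercises the
    // f64 counter and compares the accumulated sums with a small tolerance.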
    fn counter_f64_multithreaded_aggregation_helper(temporality: Temporality) {
        // Arrange
        let mut test_context = TestContext::new(temporality);
        let counter = Arc::new(test_context.meter().f64_counter("test_counter").build());

        for i in 0..10 {
            thread::scope(|s| {
                s.spawn(|| {
                    counter.add(1.23, &[]);

                    counter.add(1.23, &[KeyValue::new("key1", "value1")]);
                    counter.add(1.23, &[KeyValue::new("key1", "value1")]);
                    counter.add(1.23, &[KeyValue::new("key1", "value1")]);

                    // Test concurrent collection by forcing half of the update threads
                    // to `force_flush` metrics and then sleep, increasing the
                    // interleaving between updates and collection.
                    if i % 2 == 0 {
                        test_context.flush_metrics();
                        thread::sleep(Duration::from_millis(i));
                    }

                    counter.add(1.23, &[KeyValue::new("key1", "value1")]);
                    counter.add(1.23, &[KeyValue::new("key1", "value1")]);
                });
            });
        }

        test_context.flush_metrics();

        // Assert
        // `test_context.flush_metrics()` is invoked six times in total: once for each
        // even loop iteration and once after the loop.
        let sums = test_context.get_from_multiple_aggregations::<Sum<f64>>("test_counter", None, 6);

        let mut sum_zero_attributes = 0.0;
        let mut sum_key1_value1 = 0.0;
        sums.iter().for_each(|sum| {
            assert_eq!(sum.data_points.len(), 2); // Expecting 2 time-series.
            assert!(sum.is_monotonic, "Counter should produce monotonic sums.");
            assert_eq!(sum.temporality, temporality);

            if temporality == Temporality::Delta {
                sum_zero_attributes += sum.data_points[0].value;
                sum_key1_value1 += sum.data_points[1].value;
            } else {
                sum_zero_attributes = sum.data_points[0].value;
                sum_key1_value1 = sum.data_points[1].value;
            };
        });

        assert!(f64::abs(12.3 - sum_zero_attributes) < 0.0001);
        assert!(f64::abs(61.5 - sum_key1_value1) < 0.0001); // Each of the 10 update threads records 5 measurements: 10 * 5 * 1.23 = 61.5.
    }

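    // Helper: drives a u64 histogram from 10 threads with concurrent collections and
    // verifies count, sum, min, max, and per-bucket counts across the exported batches.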
    fn histogram_multithreaded_aggregation_helper(temporality: Temporality) {
        // Arrange
        let mut test_context = TestContext::new(temporality);
        let histogram = Arc::new(test_context.meter().u64_histogram("test_histogram").build());

        for i in 0..10 {
            thread::scope(|s| {
                s.spawn(|| {
                    histogram.record(1, &[]);
                    histogram.record(4, &[]);

                    histogram.record(5, &[KeyValue::new("key1", "value1")]);
                    histogram.record(7, &[KeyValue::new("key1", "value1")]);
                    histogram.record(18, &[KeyValue::new("key1", "value1")]);

                    // Test concurrent collection by forcing half of the update threads
                    // to `force_flush` metrics and then sleep, increasing the
                    // interleaving between updates and collection.
                    if i % 2 == 0 {
                        test_context.flush_metrics();
                        thread::sleep(Duration::from_millis(i));
                    }

                    histogram.record(35, &[KeyValue::new("key1", "value1")]);
                    histogram.record(35, &[KeyValue::new("key1", "value1")]);
                });
            });
        }

        test_context.flush_metrics();

        // Assert
        // `test_context.flush_metrics()` is invoked six times in total: once for each
        // even loop iteration and once after the loop.
        let histograms = test_context.get_from_multiple_aggregations::<Histogram<u64>>(
            "test_histogram",
            None,
            6,
        );

        let (
            mut sum_zero_attributes,
            mut count_zero_attributes,
            mut min_zero_attributes,
            mut max_zero_attributes,
        ) = (0, 0, u64::MAX, u64::MIN);
        let (mut sum_key1_value1, mut count_key1_value1, mut min_key1_value1, mut max_key1_value1) =
            (0, 0, u64::MAX, u64::MIN);

        let mut bucket_counts_zero_attributes = vec![0; 16]; // There are 16 buckets in the default configuration.
        let mut bucket_counts_key1_value1 = vec![0; 16];

        histograms.iter().for_each(|histogram| {
            assert_eq!(histogram.data_points.len(), 2); // Expecting 2 time-series.
            assert_eq!(histogram.temporality, temporality);

            let data_point_zero_attributes =
                find_histogram_datapoint_with_no_attributes(&histogram.data_points).unwrap();
            let data_point_key1_value1 =
                find_histogram_datapoint_with_key_value(&histogram.data_points, "key1", "value1")
                    .unwrap();

            if temporality == Temporality::Delta {
                sum_zero_attributes += data_point_zero_attributes.sum;
                sum_key1_value1 += data_point_key1_value1.sum;

                count_zero_attributes += data_point_zero_attributes.count;
                count_key1_value1 += data_point_key1_value1.count;

                min_zero_attributes =
                    min(min_zero_attributes, data_point_zero_attributes.min.unwrap());
                min_key1_value1 = min(min_key1_value1, data_point_key1_value1.min.unwrap());

                max_zero_attributes =
                    max(max_zero_attributes, data_point_zero_attributes.max.unwrap());
                max_key1_value1 = max(max_key1_value1, data_point_key1_value1.max.unwrap());

                assert_eq!(data_point_zero_attributes.bucket_counts.len(), 16);
                assert_eq!(data_point_key1_value1.bucket_counts.len(), 16);

                for (i, _) in data_point_zero_attributes.bucket_counts.iter().enumerate() {
                    bucket_counts_zero_attributes[i] += data_point_zero_attributes.bucket_counts[i];
                }

                for (i, _) in data_point_key1_value1.bucket_counts.iter().enumerate() {
                    bucket_counts_key1_value1[i] += data_point_key1_value1.bucket_counts[i];
                }
            } else {
                sum_zero_attributes = data_point_zero_attributes.sum;
                sum_key1_value1 = data_point_key1_value1.sum;

                count_zero_attributes = data_point_zero_attributes.count;
                count_key1_value1 = data_point_key1_value1.count;

                min_zero_attributes = data_point_zero_attributes.min.unwrap();
                min_key1_value1 = data_point_key1_value1.min.unwrap();

                max_zero_attributes = data_point_zero_attributes.max.unwrap();
                max_key1_value1 = data_point_key1_value1.max.unwrap();

                assert_eq!(data_point_zero_attributes.bucket_counts.len(), 16);
                assert_eq!(data_point_key1_value1.bucket_counts.len(), 16);

                bucket_counts_zero_attributes.clone_from(&data_point_zero_attributes.bucket_counts);
                bucket_counts_key1_value1.clone_from(&data_point_key1_value1.bucket_counts);
            };
        });

        // Default buckets:
        // (-∞, 0], (0, 5.0], (5.0, 10.0], (10.0, 25.0], (25.0, 50.0], (50.0, 75.0], (75.0, 100.0], (100.0, 250.0], (250.0, 500.0],
        // (500.0, 750.0], (750.0, 1000.0], (1000.0, 2500.0], (2500.0, 5000.0], (5000.0, 7500.0], (7500.0, 10000.0], (10000.0, +∞).

        assert_eq!(count_zero_attributes, 20); // Each of the 10 update threads records 2 measurements.
        assert_eq!(sum_zero_attributes, 50); // Each of the 10 update threads records measurements summing to 5 (1 + 4).
        assert_eq!(min_zero_attributes, 1);
        assert_eq!(max_zero_attributes, 4);

        for (i, count) in bucket_counts_zero_attributes.iter().enumerate() {
            match i {
                1 => assert_eq!(*count, 20), // For each of the 10 update threads, both recorded values 1 and 4 fall in the bucket (0, 5.0].
                _ => assert_eq!(*count, 0),
            }
        }

        assert_eq!(count_key1_value1, 50); // Each of the 10 update threads records 5 measurements.
        assert_eq!(sum_key1_value1, 1000); // Each of the 10 update threads records measurements summing to 100 (5 + 7 + 18 + 35 + 35).
        assert_eq!(min_key1_value1, 5);
        assert_eq!(max_key1_value1, 35);

        for (i, count) in bucket_counts_key1_value1.iter().enumerate() {
            match i {
                1 => assert_eq!(*count, 10), // For each of the 10 update threads, the recorded value 5 falls in the bucket (0, 5.0].
                2 => assert_eq!(*count, 10), // For each of the 10 update threads, the recorded value 7 falls in the bucket (5.0, 10.0].
                3 => assert_eq!(*count, 10), // For each of the 10 update threads, the recorded value 18 falls in the bucket (10.0, 25.0].
                4 => assert_eq!(*count, 20), // For each of the 10 update threads, the value 35 (recorded twice) falls in the bucket (25.0, 50.0].
                _ => assert_eq!(*count, 0),
            }
        }
    }

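    // Helper: f64 variant of the multithreaded histogram test; sums are compared with
    // a small tolerance, while counts and bucket tallies remain exact.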
    fn histogram_f64_multithreaded_aggregation_helper(temporality: Temporality) {
        // Arrange
        let mut test_context = TestContext::new(temporality);
        let histogram = Arc::new(test_context.meter().f64_histogram("test_histogram").build());

        for i in 0..10 {
            thread::scope(|s| {
                s.spawn(|| {
                    histogram.record(1.5, &[]);
                    histogram.record(4.6, &[]);

                    histogram.record(5.0, &[KeyValue::new("key1", "value1")]);
                    histogram.record(7.3, &[KeyValue::new("key1", "value1")]);
                    histogram.record(18.1, &[KeyValue::new("key1", "value1")]);

                    // Test concurrent collection by forcing half of the update threads
                    // to `force_flush` metrics and then sleep, increasing the
                    // interleaving between updates and collection.
                    if i % 2 == 0 {
                        test_context.flush_metrics();
                        thread::sleep(Duration::from_millis(i));
                    }

                    histogram.record(35.1, &[KeyValue::new("key1", "value1")]);
                    histogram.record(35.1, &[KeyValue::new("key1", "value1")]);
                });
            });
        }

        test_context.flush_metrics();

        // Assert
        // `test_context.flush_metrics()` is invoked six times in total: once for each
        // even loop iteration and once after the loop.
        let histograms = test_context.get_from_multiple_aggregations::<Histogram<f64>>(
            "test_histogram",
            None,
            6,
        );

        let (
            mut sum_zero_attributes,
            mut count_zero_attributes,
            mut min_zero_attributes,
            mut max_zero_attributes,
        ) = (0.0, 0, f64::MAX, f64::MIN);
        let (mut sum_key1_value1, mut count_key1_value1, mut min_key1_value1, mut max_key1_value1) =
            (0.0, 0, f64::MAX, f64::MIN);

        let mut bucket_counts_zero_attributes = vec![0; 16]; // There are 16 buckets in the default configuration.
        let mut bucket_counts_key1_value1 = vec![0; 16];

        histograms.iter().for_each(|histogram| {
            assert_eq!(histogram.data_points.len(), 2); // Expecting 2 time-series.
            assert_eq!(histogram.temporality, temporality);

            let data_point_zero_attributes =
                find_histogram_datapoint_with_no_attributes(&histogram.data_points).unwrap();
            let data_point_key1_value1 =
                find_histogram_datapoint_with_key_value(&histogram.data_points, "key1", "value1")
                    .unwrap();

            if temporality == Temporality::Delta {
                sum_zero_attributes += data_point_zero_attributes.sum;
                sum_key1_value1 += data_point_key1_value1.sum;

                count_zero_attributes += data_point_zero_attributes.count;
                count_key1_value1 += data_point_key1_value1.count;

                min_zero_attributes =
                    min_zero_attributes.min(data_point_zero_attributes.min.unwrap());
                min_key1_value1 = min_key1_value1.min(data_point_key1_value1.min.unwrap());

                max_zero_attributes =
                    max_zero_attributes.max(data_point_zero_attributes.max.unwrap());
                max_key1_value1 = max_key1_value1.max(data_point_key1_value1.max.unwrap());

                assert_eq!(data_point_zero_attributes.bucket_counts.len(), 16);
                assert_eq!(data_point_key1_value1.bucket_counts.len(), 16);

                for (i, _) in data_point_zero_attributes.bucket_counts.iter().enumerate() {
                    bucket_counts_zero_attributes[i] += data_point_zero_attributes.bucket_counts[i];
                }

                for (i, _) in data_point_key1_value1.bucket_counts.iter().enumerate() {
                    bucket_counts_key1_value1[i] += data_point_key1_value1.bucket_counts[i];
                }
            } else {
                sum_zero_attributes = data_point_zero_attributes.sum;
                sum_key1_value1 = data_point_key1_value1.sum;

                count_zero_attributes = data_point_zero_attributes.count;
                count_key1_value1 = data_point_key1_value1.count;

                min_zero_attributes = data_point_zero_attributes.min.unwrap();
                min_key1_value1 = data_point_key1_value1.min.unwrap();

                max_zero_attributes = data_point_zero_attributes.max.unwrap();
                max_key1_value1 = data_point_key1_value1.max.unwrap();

                assert_eq!(data_point_zero_attributes.bucket_counts.len(), 16);
                assert_eq!(data_point_key1_value1.bucket_counts.len(), 16);

                bucket_counts_zero_attributes.clone_from(&data_point_zero_attributes.bucket_counts);
                bucket_counts_key1_value1.clone_from(&data_point_key1_value1.bucket_counts);
            };
        });

        // Default buckets:
        // (-∞, 0], (0, 5.0], (5.0, 10.0], (10.0, 25.0], (25.0, 50.0], (50.0, 75.0], (75.0, 100.0], (100.0, 250.0], (250.0, 500.0],
        // (500.0, 750.0], (750.0, 1000.0], (1000.0, 2500.0], (2500.0, 5000.0], (5000.0, 7500.0], (7500.0, 10000.0], (10000.0, +∞).

        assert_eq!(count_zero_attributes, 20); // Each of the 10 update threads records 2 measurements.
        assert!(f64::abs(61.0 - sum_zero_attributes) < 0.0001); // Each of the 10 update threads records measurements summing to 6.1 (1.5 + 4.6).
        assert_eq!(min_zero_attributes, 1.5);
        assert_eq!(max_zero_attributes, 4.6);

        for (i, count) in bucket_counts_zero_attributes.iter().enumerate() {
            match i {
                1 => assert_eq!(*count, 20), // For each of the 10 update threads, both recorded values 1.5 and 4.6 fall in the bucket (0, 5.0].
                _ => assert_eq!(*count, 0),
            }
        }

        assert_eq!(count_key1_value1, 50); // Each of the 10 update threads records 5 measurements.
        assert!(f64::abs(1006.0 - sum_key1_value1) < 0.0001); // Each of the 10 update threads records measurements summing to 100.6 (5.0 + 7.3 + 18.1 + 35.1 + 35.1).
        assert_eq!(min_key1_value1, 5.0);
        assert_eq!(max_key1_value1, 35.1);

        for (i, count) in bucket_counts_key1_value1.iter().enumerate() {
            match i {
                1 => assert_eq!(*count, 10), // For each of the 10 update threads, the recorded value 5.0 falls in the bucket (0, 5.0].
                2 => assert_eq!(*count, 10), // For each of the 10 update threads, the recorded value 7.3 falls in the bucket (5.0, 10.0].
                3 => assert_eq!(*count, 10), // For each of the 10 update threads, the recorded value 18.1 falls in the bucket (10.0, 25.0].
                4 => assert_eq!(*count, 20), // For each of the 10 update threads, the value 35.1 (recorded twice) falls in the bucket (25.0, 50.0].
                _ => assert_eq!(*count, 0),
            }
        }
    }

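    // Helper: records two batches of random values against two attribute sets and
    // verifies count/sum/min/max per time-series, including how a second round of
    // measurements accumulates under cumulative vs. delta temporality.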
    fn histogram_aggregation_helper(temporality: Temporality) {
        // Arrange
        let mut test_context = TestContext::new(temporality);
        let histogram = test_context.meter().u64_histogram("my_histogram").build();

        // Act
        let mut rand = rngs::SmallRng::from_entropy();
        let values_kv1 = (0..50)
            .map(|_| rand.gen_range(0..100))
            .collect::<Vec<u64>>();
        for value in values_kv1.iter() {
            histogram.record(*value, &[KeyValue::new("key1", "value1")]);
        }

        let values_kv2 = (0..30)
            .map(|_| rand.gen_range(0..100))
            .collect::<Vec<u64>>();
        for value in values_kv2.iter() {
            histogram.record(*value, &[KeyValue::new("key1", "value2")]);
        }

        test_context.flush_metrics();

        // Assert
        let histogram_data = test_context.get_aggregation::<Histogram<u64>>("my_histogram", None);
        // Expecting 2 time-series.
        assert_eq!(histogram_data.data_points.len(), 2);
        if let Temporality::Cumulative = temporality {
            assert_eq!(
                histogram_data.temporality,
                Temporality::Cumulative,
                "Should produce cumulative"
            );
        } else {
            assert_eq!(
                histogram_data.temporality,
                Temporality::Delta,
                "Should produce delta"
            );
        }

        // find and validate the key1=value1 and key1=value2 datapoints
        let data_point1 =
            find_histogram_datapoint_with_key_value(&histogram_data.data_points, "key1", "value1")
                .expect("datapoint with key1=value1 expected");
        assert_eq!(data_point1.count, values_kv1.len() as u64);
        assert_eq!(data_point1.sum, values_kv1.iter().sum::<u64>());
        assert_eq!(data_point1.min.unwrap(), *values_kv1.iter().min().unwrap());
        assert_eq!(data_point1.max.unwrap(), *values_kv1.iter().max().unwrap());

        let data_point2 =
            find_histogram_datapoint_with_key_value(&histogram_data.data_points, "key1", "value2")
                .expect("datapoint with key1=value2 expected");
        assert_eq!(data_point2.count, values_kv2.len() as u64);
        assert_eq!(data_point2.sum, values_kv2.iter().sum::<u64>());
        assert_eq!(data_point2.min.unwrap(), *values_kv2.iter().min().unwrap());
        assert_eq!(data_point2.max.unwrap(), *values_kv2.iter().max().unwrap());

        // Reset and report more measurements
        test_context.reset_metrics();
        for value in values_kv1.iter() {
            histogram.record(*value, &[KeyValue::new("key1", "value1")]);
        }

        for value in values_kv2.iter() {
            histogram.record(*value, &[KeyValue::new("key1", "value2")]);
        }

        test_context.flush_metrics();

        let histogram_data = test_context.get_aggregation::<Histogram<u64>>("my_histogram", None);
        assert_eq!(histogram_data.data_points.len(), 2);
        let data_point1 =
            find_histogram_datapoint_with_key_value(&histogram_data.data_points, "key1", "value1")
                .expect("datapoint with key1=value1 expected");
        if temporality == Temporality::Cumulative {
            assert_eq!(data_point1.count, 2 * (values_kv1.len() as u64));
            assert_eq!(data_point1.sum, 2 * (values_kv1.iter().sum::<u64>()));
            assert_eq!(data_point1.min.unwrap(), *values_kv1.iter().min().unwrap());
            assert_eq!(data_point1.max.unwrap(), *values_kv1.iter().max().unwrap());
        } else {
            assert_eq!(data_point1.count, values_kv1.len() as u64);
            assert_eq!(data_point1.sum, values_kv1.iter().sum::<u64>());
            assert_eq!(data_point1.min.unwrap(), *values_kv1.iter().min().unwrap());
            assert_eq!(data_point1.max.unwrap(), *values_kv1.iter().max().unwrap());
        }

        let data_point2 =
            find_histogram_datapoint_with_key_value(&histogram_data.data_points, "key1", "value2")
                .expect("datapoint with key1=value2 expected");
        if temporality == Temporality::Cumulative {
            assert_eq!(data_point2.count, 2 * (values_kv2.len() as u64));
            assert_eq!(data_point2.sum, 2 * (values_kv2.iter().sum::<u64>()));
            assert_eq!(data_point2.min.unwrap(), *values_kv2.iter().min().unwrap());
            assert_eq!(data_point2.max.unwrap(), *values_kv2.iter().max().unwrap());
        } else {
            assert_eq!(data_point2.count, values_kv2.len() as u64);
            assert_eq!(data_point2.sum, values_kv2.iter().sum::<u64>());
            assert_eq!(data_point2.min.unwrap(), *values_kv2.iter().min().unwrap());
            assert_eq!(data_point2.max.unwrap(), *values_kv2.iter().max().unwrap());
        }
    }

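    // Helper: verifies that explicit bucket boundaries provided via `with_boundaries`
    // replace the default ones and that values are tallied into the right buckets.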
    fn histogram_aggregation_with_custom_bounds_helper(temporality: Temporality) {
        let mut test_context = TestContext::new(temporality);
        let histogram = test_context
            .meter()
            .u64_histogram("test_histogram")
            .with_boundaries(vec![1.0, 2.5, 5.5])
            .build();
        histogram.record(1, &[KeyValue::new("key1", "value1")]);
        histogram.record(2, &[KeyValue::new("key1", "value1")]);
        histogram.record(3, &[KeyValue::new("key1", "value1")]);
        histogram.record(4, &[KeyValue::new("key1", "value1")]);
        histogram.record(5, &[KeyValue::new("key1", "value1")]);

        test_context.flush_metrics();

        // Assert
        let histogram_data = test_context.get_aggregation::<Histogram<u64>>("test_histogram", None);
        // Expecting 1 time-series.
        assert_eq!(histogram_data.data_points.len(), 1);
        if let Temporality::Cumulative = temporality {
            assert_eq!(
                histogram_data.temporality,
                Temporality::Cumulative,
                "Should produce cumulative"
            );
        } else {
            assert_eq!(
                histogram_data.temporality,
                Temporality::Delta,
                "Should produce delta"
            );
        }

        // find and validate key1=value1 datapoint
        let data_point =
            find_histogram_datapoint_with_key_value(&histogram_data.data_points, "key1", "value1")
                .expect("datapoint with key1=value1 expected");

        assert_eq!(data_point.count, 5);
        assert_eq!(data_point.sum, 15);

        // Check the bucket counts:
        // (-∞, 1.0]: 1
        // (1.0, 2.5]: 1
        // (2.5, 5.5]: 3
        // (5.5, +∞): 0

        assert_eq!(vec![1.0, 2.5, 5.5], data_point.bounds);
        assert_eq!(vec![1, 1, 3, 0], data_point.bucket_counts);
    }
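
    // Helper: verifies that a gauge exports the last recorded value per attribute set.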
    fn gauge_aggregation_helper(temporality: Temporality) {
        // Arrange
        let mut test_context = TestContext::new(temporality);
        let gauge = test_context.meter().i64_gauge("my_gauge").build();

        // Act
        gauge.record(1, &[KeyValue::new("key1", "value1")]);
        gauge.record(2, &[KeyValue::new("key1", "value1")]);
        gauge.record(1, &[KeyValue::new("key1", "value1")]);
        gauge.record(3, &[KeyValue::new("key1", "value1")]);
        gauge.record(4, &[KeyValue::new("key1", "value1")]);

        gauge.record(11, &[KeyValue::new("key1", "value2")]);
        gauge.record(13, &[KeyValue::new("key1", "value2")]);
        gauge.record(6, &[KeyValue::new("key1", "value2")]);

        test_context.flush_metrics();

        // Assert
        let gauge_data_point = test_context.get_aggregation::<Gauge<i64>>("my_gauge", None);
        // Expecting 2 time-series.
        assert_eq!(gauge_data_point.data_points.len(), 2);

        // find and validate the key1=value1 and key1=value2 datapoints
        let data_point1 =
            find_gauge_datapoint_with_key_value(&gauge_data_point.data_points, "key1", "value1")
                .expect("datapoint with key1=value1 expected");
        assert_eq!(data_point1.value, 4);

        let data_point2 =
            find_gauge_datapoint_with_key_value(&gauge_data_point.data_points, "key1", "value2")
                .expect("datapoint with key1=value2 expected");
        assert_eq!(data_point2.value, 6);

        // Reset and report more measurements
        test_context.reset_metrics();
        gauge.record(1, &[KeyValue::new("key1", "value1")]);
        gauge.record(2, &[KeyValue::new("key1", "value1")]);
        gauge.record(11, &[KeyValue::new("key1", "value1")]);
        gauge.record(3, &[KeyValue::new("key1", "value1")]);
        gauge.record(41, &[KeyValue::new("key1", "value1")]);

        gauge.record(34, &[KeyValue::new("key1", "value2")]);
        gauge.record(12, &[KeyValue::new("key1", "value2")]);
        gauge.record(54, &[KeyValue::new("key1", "value2")]);

        test_context.flush_metrics();

        let gauge = test_context.get_aggregation::<Gauge<i64>>("my_gauge", None);
        assert_eq!(gauge.data_points.len(), 2);
        let data_point1 = find_gauge_datapoint_with_key_value(&gauge.data_points, "key1", "value1")
            .expect("datapoint with key1=value1 expected");
        assert_eq!(data_point1.value, 41);

        let data_point2 = find_gauge_datapoint_with_key_value(&gauge.data_points, "key1", "value2")
            .expect("datapoint with key1=value2 expected");
        assert_eq!(data_point2.value, 54);
    }

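    // Helper: verifies that an observable gauge exports the values observed by its
    // callback, optionally including the empty attribute set, and that repeated
    // collections re-run the callback and reproduce the same data points.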
    fn observable_gauge_aggregation_helper(temporality: Temporality, use_empty_attributes: bool) {
        // Arrange
        let mut test_context = TestContext::new(temporality);
        let _observable_gauge = test_context
            .meter()
            .i64_observable_gauge("test_observable_gauge")
            .with_callback(move |observer| {
                if use_empty_attributes {
                    observer.observe(1, &[]);
                }
                observer.observe(4, &[KeyValue::new("key1", "value1")]);
                observer.observe(5, &[KeyValue::new("key2", "value2")]);
            })
            .build();

        test_context.flush_metrics();

        // Assert
        let gauge = test_context.get_aggregation::<Gauge<i64>>("test_observable_gauge", None);
        // Expecting 2 time-series (3 when the empty attribute set is also observed).
        let expected_time_series_count = if use_empty_attributes { 3 } else { 2 };
        assert_eq!(gauge.data_points.len(), expected_time_series_count);

        if use_empty_attributes {
            // find and validate zero attribute datapoint
            let zero_attribute_datapoint =
                find_gauge_datapoint_with_no_attributes(&gauge.data_points)
                    .expect("datapoint with no attributes expected");
            assert_eq!(zero_attribute_datapoint.value, 1);
        }

        // find and validate key1=value1 datapoint
        let data_point1 = find_gauge_datapoint_with_key_value(&gauge.data_points, "key1", "value1")
            .expect("datapoint with key1=value1 expected");
        assert_eq!(data_point1.value, 4);

        // find and validate key2=value2 datapoint
        let data_point2 = find_gauge_datapoint_with_key_value(&gauge.data_points, "key2", "value2")
            .expect("datapoint with key2=value2 expected");
        assert_eq!(data_point2.value, 5);

        // Reset and collect again; the callback observes the same values on each collection
        test_context.reset_metrics();

        test_context.flush_metrics();

        let gauge = test_context.get_aggregation::<Gauge<i64>>("test_observable_gauge", None);
        assert_eq!(gauge.data_points.len(), expected_time_series_count);

        if use_empty_attributes {
            let zero_attribute_datapoint =
                find_gauge_datapoint_with_no_attributes(&gauge.data_points)
                    .expect("datapoint with no attributes expected");
            assert_eq!(zero_attribute_datapoint.value, 1);
        }

        let data_point1 = find_gauge_datapoint_with_key_value(&gauge.data_points, "key1", "value1")
            .expect("datapoint with key1=value1 expected");
        assert_eq!(data_point1.value, 4);

        let data_point2 = find_gauge_datapoint_with_key_value(&gauge.data_points, "key2", "value2")
            .expect("datapoint with key2=value2 expected");
        assert_eq!(data_point2.value, 5);
    }

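    // Helper: verifies counter sums per attribute set and how a second round of
    // measurements accumulates under cumulative vs. delta temporality.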
    fn counter_aggregation_helper(temporality: Temporality) {
        // Arrange
        let mut test_context = TestContext::new(temporality);
        let counter = test_context.u64_counter("test", "my_counter", None);

        // Act
        counter.add(1, &[KeyValue::new("key1", "value1")]);
        counter.add(1, &[KeyValue::new("key1", "value1")]);
        counter.add(1, &[KeyValue::new("key1", "value1")]);
        counter.add(1, &[KeyValue::new("key1", "value1")]);
        counter.add(1, &[KeyValue::new("key1", "value1")]);

        counter.add(1, &[KeyValue::new("key1", "value2")]);
        counter.add(1, &[KeyValue::new("key1", "value2")]);
        counter.add(1, &[KeyValue::new("key1", "value2")]);

        test_context.flush_metrics();

        // Assert
        let sum = test_context.get_aggregation::<Sum<u64>>("my_counter", None);
        // Expecting 2 time-series.
        assert_eq!(sum.data_points.len(), 2);
        assert!(sum.is_monotonic, "Counter should produce monotonic sums.");
        if let Temporality::Cumulative = temporality {
            assert_eq!(
                sum.temporality,
                Temporality::Cumulative,
                "Should produce cumulative"
            );
        } else {
            assert_eq!(sum.temporality, Temporality::Delta, "Should produce delta");
        }

        // find and validate the key1=value1 and key1=value2 datapoints
        let data_point1 = find_sum_datapoint_with_key_value(&sum.data_points, "key1", "value1")
            .expect("datapoint with key1=value1 expected");
        assert_eq!(data_point1.value, 5);

        let data_point2 = find_sum_datapoint_with_key_value(&sum.data_points, "key1", "value2")
            .expect("datapoint with key1=value2 expected");
        assert_eq!(data_point2.value, 3);

        // Reset and report more measurements
        test_context.reset_metrics();
        counter.add(1, &[KeyValue::new("key1", "value1")]);
        counter.add(1, &[KeyValue::new("key1", "value1")]);
        counter.add(1, &[KeyValue::new("key1", "value1")]);
        counter.add(1, &[KeyValue::new("key1", "value1")]);
        counter.add(1, &[KeyValue::new("key1", "value1")]);

        counter.add(1, &[KeyValue::new("key1", "value2")]);
        counter.add(1, &[KeyValue::new("key1", "value2")]);
        counter.add(1, &[KeyValue::new("key1", "value2")]);

        test_context.flush_metrics();

        let sum = test_context.get_aggregation::<Sum<u64>>("my_counter", None);
        assert_eq!(sum.data_points.len(), 2);
        let data_point1 = find_sum_datapoint_with_key_value(&sum.data_points, "key1", "value1")
            .expect("datapoint with key1=value1 expected");
        if temporality == Temporality::Cumulative {
            assert_eq!(data_point1.value, 10);
        } else {
            assert_eq!(data_point1.value, 5);
        }

        let data_point2 = find_sum_datapoint_with_key_value(&sum.data_points, "key1", "value2")
            .expect("datapoint with key1=value2 expected");
        if temporality == Temporality::Cumulative {
            assert_eq!(data_point2.value, 6);
        } else {
            assert_eq!(data_point2.value, 3);
        }
    }

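    // Helper: verifies the cardinality-overflow behavior: once the per-instrument
    // stream limit is reached, further attribute sets are folded into a single data
    // point marked with the `otel.metric.overflow` attribute.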
    fn counter_aggregation_overflow_helper(temporality: Temporality) {
        // Arrange
        let mut test_context = TestContext::new(temporality);
        let counter = test_context.u64_counter("test", "my_counter", None);

        // Act
        // Record measurements with A:0, A:1, ..., A:1999, which just fits within the
        // 2000-stream limit.
        for v in 0..2000 {
            counter.add(100, &[KeyValue::new("A", v.to_string())]);
        }

        // The empty attribute set is treated specially and does not count towards the limit.
        counter.add(3, &[]);
        counter.add(3, &[]);

        // All of the measurements below will now go into the overflow data point.
        counter.add(100, &[KeyValue::new("A", "foo")]);
        counter.add(100, &[KeyValue::new("A", "another")]);
        counter.add(100, &[KeyValue::new("A", "yet_another")]);
        test_context.flush_metrics();

        let sum = test_context.get_aggregation::<Sum<u64>>("my_counter", None);

        // Expecting 2002 metric points (2000 + 1 overflow + 1 empty-attribute).
        assert_eq!(sum.data_points.len(), 2002);

        let data_point =
            find_sum_datapoint_with_key_value(&sum.data_points, "otel.metric.overflow", "true")
                .expect("overflow point expected");
        assert_eq!(data_point.value, 300);

        let empty_attrs_data_point = find_sum_datapoint_with_no_attributes(&sum.data_points)
            .expect("Empty attributes point expected");
        assert!(
            empty_attrs_data_point.attributes.is_empty(),
            "Non-empty attribute set"
        );
        assert_eq!(
            empty_attrs_data_point.value, 6,
            "Empty attributes value should be 3+3=6"
        );
    }

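    // Helper: verifies that attribute ordering does not matter: all permutations of
    // the same key-value set must aggregate into a single time-series.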
    fn counter_aggregation_attribute_order_helper(temporality: Temporality, start_sorted: bool) {
        // Arrange
        let mut test_context = TestContext::new(temporality);
        let counter = test_context.u64_counter("test", "my_counter", None);

        // Act
        // Add the same set of attributes in different orders; they are expected to be
        // treated as the same attribute set. Start with sorted order if requested.
        if start_sorted {
            counter.add(
                1,
                &[
                    KeyValue::new("A", "a"),
                    KeyValue::new("B", "b"),
                    KeyValue::new("C", "c"),
                ],
            );
        } else {
            counter.add(
                1,
                &[
                    KeyValue::new("A", "a"),
                    KeyValue::new("C", "c"),
                    KeyValue::new("B", "b"),
                ],
            );
        }

        counter.add(
            1,
            &[
                KeyValue::new("A", "a"),
                KeyValue::new("C", "c"),
                KeyValue::new("B", "b"),
            ],
        );
        counter.add(
            1,
            &[
                KeyValue::new("B", "b"),
                KeyValue::new("A", "a"),
                KeyValue::new("C", "c"),
            ],
        );
        counter.add(
            1,
            &[
                KeyValue::new("B", "b"),
                KeyValue::new("C", "c"),
                KeyValue::new("A", "a"),
            ],
        );
        counter.add(
            1,
            &[
                KeyValue::new("C", "c"),
                KeyValue::new("B", "b"),
                KeyValue::new("A", "a"),
            ],
        );
        counter.add(
            1,
            &[
                KeyValue::new("C", "c"),
                KeyValue::new("A", "a"),
                KeyValue::new("B", "b"),
            ],
        );
        test_context.flush_metrics();

        let sum = test_context.get_aggregation::<Sum<u64>>("my_counter", None);

        // Expecting 1 time-series.
        assert_eq!(sum.data_points.len(), 1);

        // validate the sole datapoint
        let data_point1 = &sum.data_points[0];
        assert_eq!(data_point1.value, 6);
    }

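    // Helper: verifies UpDownCounter sums, which are non-monotonic and always exported
    // with cumulative temporality.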
    fn updown_counter_aggregation_helper(temporality: Temporality) {
        // Arrange
        let mut test_context = TestContext::new(temporality);
        let counter = test_context.i64_up_down_counter("test", "my_updown_counter", None);

        // Act
        counter.add(10, &[KeyValue::new("key1", "value1")]);
        counter.add(-1, &[KeyValue::new("key1", "value1")]);
        counter.add(-5, &[KeyValue::new("key1", "value1")]);
        counter.add(0, &[KeyValue::new("key1", "value1")]);
        counter.add(1, &[KeyValue::new("key1", "value1")]);

        counter.add(10, &[KeyValue::new("key1", "value2")]);
        counter.add(0, &[KeyValue::new("key1", "value2")]);
        counter.add(-3, &[KeyValue::new("key1", "value2")]);

        test_context.flush_metrics();

        // Assert
        let sum = test_context.get_aggregation::<Sum<i64>>("my_updown_counter", None);
        // Expecting 2 time-series.
        assert_eq!(sum.data_points.len(), 2);
        assert!(
            !sum.is_monotonic,
            "UpDownCounter should produce a non-monotonic Sum"
        );
        assert_eq!(
            sum.temporality,
            Temporality::Cumulative,
            "UpDownCounter should always produce Cumulative, regardless of the requested temporality"
        );

        // Find and validate the key1=value1 and key1=value2 data points.
        let data_point1 = find_sum_datapoint_with_key_value(&sum.data_points, "key1", "value1")
            .expect("datapoint with key1=value1 expected");
        assert_eq!(data_point1.value, 5);

        let data_point2 = find_sum_datapoint_with_key_value(&sum.data_points, "key1", "value2")
            .expect("datapoint with key1=value2 expected");
        assert_eq!(data_point2.value, 7);

        // Reset and report more measurements
        test_context.reset_metrics();
        counter.add(10, &[KeyValue::new("key1", "value1")]);
        counter.add(-1, &[KeyValue::new("key1", "value1")]);
        counter.add(-5, &[KeyValue::new("key1", "value1")]);
        counter.add(0, &[KeyValue::new("key1", "value1")]);
        counter.add(1, &[KeyValue::new("key1", "value1")]);

        counter.add(10, &[KeyValue::new("key1", "value2")]);
        counter.add(0, &[KeyValue::new("key1", "value2")]);
        counter.add(-3, &[KeyValue::new("key1", "value2")]);

        test_context.flush_metrics();

        // Cumulative aggregation keeps accumulating across the exporter
        // reset, so each series doubles its total (5+5 and 7+7).
        let sum = test_context.get_aggregation::<Sum<i64>>("my_updown_counter", None);
        assert_eq!(sum.data_points.len(), 2);
        let data_point1 = find_sum_datapoint_with_key_value(&sum.data_points, "key1", "value1")
            .expect("datapoint with key1=value1 expected");
        assert_eq!(data_point1.value, 10);

        let data_point2 = find_sum_datapoint_with_key_value(&sum.data_points, "key1", "value2")
            .expect("datapoint with key1=value2 expected");
        assert_eq!(data_point2.value, 14);
    }

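    // Lookup helpers: tests locate exported data points by attribute value
    // (or by the absence of attributes) rather than relying on export order.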
    fn find_sum_datapoint_with_key_value<'a, T>(
        data_points: &'a [SumDataPoint<T>],
        key: &str,
        value: &str,
    ) -> Option<&'a SumDataPoint<T>> {
        data_points.iter().find(|&datapoint| {
            datapoint
                .attributes
                .iter()
                .any(|kv| kv.key.as_str() == key && kv.value.as_str() == value)
        })
    }

    fn find_gauge_datapoint_with_key_value<'a, T>(
        data_points: &'a [GaugeDataPoint<T>],
        key: &str,
        value: &str,
    ) -> Option<&'a GaugeDataPoint<T>> {
        data_points.iter().find(|&datapoint| {
            datapoint
                .attributes
                .iter()
                .any(|kv| kv.key.as_str() == key && kv.value.as_str() == value)
        })
    }

    fn find_sum_datapoint_with_no_attributes<T>(
        data_points: &[SumDataPoint<T>],
    ) -> Option<&SumDataPoint<T>> {
        data_points
            .iter()
            .find(|&datapoint| datapoint.attributes.is_empty())
    }

    fn find_gauge_datapoint_with_no_attributes<T>(
        data_points: &[GaugeDataPoint<T>],
    ) -> Option<&GaugeDataPoint<T>> {
        data_points
            .iter()
            .find(|&datapoint| datapoint.attributes.is_empty())
    }

    fn find_histogram_datapoint_with_key_value<'a, T>(
        data_points: &'a [HistogramDataPoint<T>],
        key: &str,
        value: &str,
    ) -> Option<&'a HistogramDataPoint<T>> {
        data_points.iter().find(|&datapoint| {
            datapoint
                .attributes
                .iter()
                .any(|kv| kv.key.as_str() == key && kv.value.as_str() == value)
        })
    }

    fn find_histogram_datapoint_with_no_attributes<T>(
        data_points: &[HistogramDataPoint<T>],
    ) -> Option<&HistogramDataPoint<T>> {
        data_points
            .iter()
            .find(|&datapoint| datapoint.attributes.is_empty())
    }

    fn find_scope_metric<'a>(
        metrics: &'a [ScopeMetrics],
        name: &str,
    ) -> Option<&'a ScopeMetrics> {
        metrics
            .iter()
            .find(|&scope_metric| scope_metric.scope.name() == name)
    }

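    /// Test harness pairing an `InMemoryMetricExporter` with a
    /// periodic-reader-backed `SdkMeterProvider`. The last collected
    /// `ResourceMetrics` are kept on the context so `get_aggregation` can
    /// return borrowed aggregations. Typical flow (illustrative):
    ///
    /// ```ignore
    /// let mut ctx = TestContext::new(Temporality::Delta);
    /// let counter = ctx.u64_counter("test", "my_counter", None);
    /// counter.add(1, &[]);
    /// ctx.flush_metrics();
    /// let sum = ctx.get_aggregation::<Sum<u64>>("my_counter", None);
    /// ```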
    struct TestContext {
        exporter: InMemoryMetricExporter,
        meter_provider: SdkMeterProvider,

        // Saving this on the test context for lifetime simplicity
        resource_metrics: Vec<ResourceMetrics>,
    }

    impl TestContext {
        fn new(temporality: Temporality) -> Self {
            // Tests drive collection deterministically via `flush_metrics`
            // (force_flush) rather than waiting on the periodic interval.
            let exporter = InMemoryMetricExporterBuilder::new()
                .with_temporality(temporality)
                .build();
            let meter_provider = SdkMeterProvider::builder()
                .with_periodic_exporter(exporter.clone())
                .build();

            TestContext {
                exporter,
                meter_provider,
                resource_metrics: vec![],
            }
        }

        fn u64_counter(
            &self,
            meter_name: &'static str,
            counter_name: &'static str,
            unit: Option<&'static str>,
        ) -> Counter<u64> {
            let meter = self.meter_provider.meter(meter_name);
            let mut counter_builder = meter.u64_counter(counter_name);
            if let Some(unit_name) = unit {
                counter_builder = counter_builder.with_unit(unit_name);
            }
            counter_builder.build()
        }

        fn i64_up_down_counter(
            &self,
            meter_name: &'static str,
            counter_name: &'static str,
            unit: Option<&'static str>,
        ) -> UpDownCounter<i64> {
            let meter = self.meter_provider.meter(meter_name);
            let mut updown_counter_builder = meter.i64_up_down_counter(counter_name);
            if let Some(unit_name) = unit {
                updown_counter_builder = updown_counter_builder.with_unit(unit_name);
            }
            updown_counter_builder.build()
        }

        fn meter(&self) -> Meter {
            self.meter_provider.meter("test")
        }

        fn flush_metrics(&self) {
            self.meter_provider.force_flush().unwrap();
        }

        fn reset_metrics(&self) {
            self.exporter.reset();
        }

        fn check_no_metrics(&self) {
            let resource_metrics = self
                .exporter
                .get_finished_metrics()
                .expect("get_finished_metrics should succeed"); // TODO: Need to fix InMemoryMetricExporter to return None.

            assert!(resource_metrics.is_empty(), "no metrics should be exported");
        }

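        /// Collects the finished metrics, asserts exactly one exported
        /// `ResourceMetrics` batch, and downcasts the first metric of the
        /// first scope (checking its name and, optionally, unit) to the
        /// concrete aggregation type `T`, e.g. `Sum<u64>`.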
        fn get_aggregation<T: Aggregation>(
            &mut self,
            counter_name: &str,
            unit_name: Option<&str>,
        ) -> &T {
            self.resource_metrics = self
                .exporter
                .get_finished_metrics()
                .expect("metrics expected to be exported");

            assert_eq!(
                self.resource_metrics.len(),
                1,
                "expected exactly one ResourceMetrics batch"
            );
            let resource_metric = self
                .resource_metrics
                .first()
                .expect("This should contain exactly one resource metric, as validated above.");

            assert!(
                !resource_metric.scope_metrics.is_empty(),
                "No scope metrics in latest export"
            );
            assert!(!resource_metric.scope_metrics[0].metrics.is_empty());

            let metric = &resource_metric.scope_metrics[0].metrics[0];
            assert_eq!(metric.name, counter_name);
            if let Some(expected_unit) = unit_name {
                assert_eq!(metric.unit, expected_unit);
            }

            metric
                .data
                .as_any()
                .downcast_ref::<T>()
                .expect("Failed to cast aggregation to expected type")
        }

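        /// Like `get_aggregation`, but expects one `ResourceMetrics` batch per
        /// collection cycle (`invocation_count` in total) and returns the
        /// named metric's aggregation from each batch.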
        fn get_from_multiple_aggregations<T: Aggregation>(
            &mut self,
            counter_name: &str,
            unit_name: Option<&str>,
            invocation_count: usize,
        ) -> Vec<&T> {
            self.resource_metrics = self
                .exporter
                .get_finished_metrics()
                .expect("metrics expected to be exported");

            assert!(
                !self.resource_metrics.is_empty(),
                "no metrics were exported"
            );

            assert_eq!(
                self.resource_metrics.len(),
                invocation_count,
                "Expected collect to be called {} times",
                invocation_count
            );

            self.resource_metrics
                .iter()
                .map(|resource_metric| {
                    assert!(
                        !resource_metric.scope_metrics.is_empty(),
                        "An export with no scope metrics occurred"
                    );
                    assert!(!resource_metric.scope_metrics[0].metrics.is_empty());

                    let metric = &resource_metric.scope_metrics[0].metrics[0];
                    assert_eq!(metric.name, counter_name);
                    if let Some(expected_unit) = unit_name {
                        assert_eq!(metric.unit, expected_unit);
                    }

                    metric
                        .data
                        .as_any()
                        .downcast_ref::<T>()
                        .expect("Failed to cast aggregation to expected type")
                })
                .collect()
        }
    }
}