opentelemetry_sdk/metrics/internal/
mod.rs

1mod aggregate;
2mod exponential_histogram;
3mod histogram;
4mod last_value;
5mod precomputed_sum;
6mod sum;
7
8use core::fmt;
9use std::collections::{HashMap, HashSet};
10use std::mem::swap;
11use std::ops::{Add, AddAssign, DerefMut, Sub};
12use std::sync::atomic::{AtomicBool, AtomicI64, AtomicU64, AtomicUsize, Ordering};
13use std::sync::{Arc, OnceLock, RwLock};
14
15use aggregate::{is_under_cardinality_limit, STREAM_CARDINALITY_LIMIT};
16pub(crate) use aggregate::{AggregateBuilder, AggregateFns, ComputeAggregation, Measure};
17pub(crate) use exponential_histogram::{EXPO_MAX_SCALE, EXPO_MIN_SCALE};
18use opentelemetry::{otel_warn, KeyValue};
19
20// TODO Replace it with LazyLock once it is stable
21pub(crate) static STREAM_OVERFLOW_ATTRIBUTES: OnceLock<Vec<KeyValue>> = OnceLock::new();
22
23#[inline]
24fn stream_overflow_attributes() -> &'static Vec<KeyValue> {
25    STREAM_OVERFLOW_ATTRIBUTES.get_or_init(|| vec![KeyValue::new("otel.metric.overflow", "true")])
26}
27
28pub(crate) trait Aggregator {
29    /// A static configuration that is needed in order to initialize aggregator.
30    /// E.g. bucket_size at creation time .
31    type InitConfig;
32
33    /// Some aggregators can do some computations before updating aggregator.
34    /// This helps to reduce contention for aggregators because it makes
35    /// [`Aggregator::update`] as short as possible.
36    type PreComputedValue;
37
38    /// Called everytime a new attribute-set is stored.
39    fn create(init: &Self::InitConfig) -> Self;
40
41    /// Called for each measurement.
42    fn update(&self, value: Self::PreComputedValue);
43
44    /// Return current value and reset this instance
45    fn clone_and_reset(&self, init: &Self::InitConfig) -> Self;
46}
47
48/// The storage for sums.
49///
50/// This structure is parametrized by an `Operation` that indicates how
51/// updates to the underlying value trackers should be performed.
52pub(crate) struct ValueMap<A>
53where
54    A: Aggregator,
55{
56    /// Trackers store the values associated with different attribute sets.
57    trackers: RwLock<HashMap<Vec<KeyValue>, Arc<A>>>,
58
59    /// Used ONLY by Delta collect. The data type must match the one used in
60    /// `trackers` to allow mem::swap. Wrapping the type in `OnceLock` to
61    /// avoid this allocation for Cumulative aggregation.
62    trackers_for_collect: OnceLock<RwLock<HashMap<Vec<KeyValue>, Arc<A>>>>,
63
64    /// Number of different attribute set stored in the `trackers` map.
65    count: AtomicUsize,
66    /// Indicates whether a value with no attributes has been stored.
67    has_no_attribute_value: AtomicBool,
68    /// Tracker for values with no attributes attached.
69    no_attribute_tracker: A,
70    /// Configuration for an Aggregator
71    config: A::InitConfig,
72}
73
74impl<A> ValueMap<A>
75where
76    A: Aggregator,
77{
78    fn new(config: A::InitConfig) -> Self {
79        ValueMap {
80            trackers: RwLock::new(HashMap::with_capacity(1 + STREAM_CARDINALITY_LIMIT)),
81            trackers_for_collect: OnceLock::new(),
82            has_no_attribute_value: AtomicBool::new(false),
83            no_attribute_tracker: A::create(&config),
84            count: AtomicUsize::new(0),
85            config,
86        }
87    }
88
89    #[inline]
90    fn trackers_for_collect(&self) -> &RwLock<HashMap<Vec<KeyValue>, Arc<A>>> {
91        self.trackers_for_collect
92            .get_or_init(|| RwLock::new(HashMap::with_capacity(1 + STREAM_CARDINALITY_LIMIT)))
93    }
94
95    fn measure(&self, value: A::PreComputedValue, attributes: &[KeyValue]) {
96        if attributes.is_empty() {
97            self.no_attribute_tracker.update(value);
98            self.has_no_attribute_value.store(true, Ordering::Release);
99            return;
100        }
101
102        let Ok(trackers) = self.trackers.read() else {
103            return;
104        };
105
106        // Try to retrieve and update the tracker with the attributes in the provided order first
107        if let Some(tracker) = trackers.get(attributes) {
108            tracker.update(value);
109            return;
110        }
111
112        // Try to retrieve and update the tracker with the attributes sorted.
113        let sorted_attrs = sort_and_dedup(attributes);
114        if let Some(tracker) = trackers.get(sorted_attrs.as_slice()) {
115            tracker.update(value);
116            return;
117        }
118
119        // Give up the read lock before acquiring the write lock.
120        drop(trackers);
121
122        let Ok(mut trackers) = self.trackers.write() else {
123            return;
124        };
125
126        // Recheck both the provided and sorted orders after acquiring the write lock
127        // in case another thread has pushed an update in the meantime.
128        if let Some(tracker) = trackers.get(attributes) {
129            tracker.update(value);
130        } else if let Some(tracker) = trackers.get(sorted_attrs.as_slice()) {
131            tracker.update(value);
132        } else if is_under_cardinality_limit(self.count.load(Ordering::SeqCst)) {
133            let new_tracker = Arc::new(A::create(&self.config));
134            new_tracker.update(value);
135
136            // Insert tracker with the attributes in the provided and sorted orders
137            trackers.insert(attributes.to_vec(), new_tracker.clone());
138            trackers.insert(sorted_attrs, new_tracker);
139
140            self.count.fetch_add(1, Ordering::SeqCst);
141        } else if let Some(overflow_value) = trackers.get(stream_overflow_attributes().as_slice()) {
142            overflow_value.update(value);
143        } else {
144            let new_tracker = A::create(&self.config);
145            new_tracker.update(value);
146            trackers.insert(stream_overflow_attributes().clone(), Arc::new(new_tracker));
147            otel_warn!( name: "ValueMap.measure",
148                message = "Maximum data points for metric stream exceeded. Entry added to overflow. Subsequent overflows to same metric until next collect will not be logged."
149            );
150        }
151    }
152
153    /// Iterate through all attribute sets and populate `DataPoints` in readonly mode.
154    /// This is used in Cumulative temporality mode, where [`ValueMap`] is not cleared.
155    pub(crate) fn collect_readonly<Res, MapFn>(&self, dest: &mut Vec<Res>, mut map_fn: MapFn)
156    where
157        MapFn: FnMut(Vec<KeyValue>, &A) -> Res,
158    {
159        prepare_data(dest, self.count.load(Ordering::SeqCst));
160        if self.has_no_attribute_value.load(Ordering::Acquire) {
161            dest.push(map_fn(vec![], &self.no_attribute_tracker));
162        }
163
164        let Ok(trackers) = self.trackers.read() else {
165            return;
166        };
167
168        let mut seen = HashSet::new();
169        for (attrs, tracker) in trackers.iter() {
170            if seen.insert(Arc::as_ptr(tracker)) {
171                dest.push(map_fn(attrs.clone(), tracker));
172            }
173        }
174    }
175
176    /// Iterate through all attribute sets, populate `DataPoints` and reset.
177    /// This is used in Delta temporality mode, where [`ValueMap`] is reset after collection.
178    pub(crate) fn collect_and_reset<Res, MapFn>(&self, dest: &mut Vec<Res>, mut map_fn: MapFn)
179    where
180        MapFn: FnMut(Vec<KeyValue>, A) -> Res,
181    {
182        prepare_data(dest, self.count.load(Ordering::SeqCst));
183        if self.has_no_attribute_value.swap(false, Ordering::AcqRel) {
184            dest.push(map_fn(
185                vec![],
186                self.no_attribute_tracker.clone_and_reset(&self.config),
187            ));
188        }
189
190        if let Ok(mut trackers_collect) = self.trackers_for_collect().write() {
191            if let Ok(mut trackers_current) = self.trackers.write() {
192                swap(trackers_collect.deref_mut(), trackers_current.deref_mut());
193                self.count.store(0, Ordering::SeqCst);
194            } else {
195                otel_warn!(name: "MeterProvider.InternalError", message = "Metric collection failed. Report this issue in OpenTelemetry repo.", details ="ValueMap trackers lock poisoned");
196                return;
197            }
198
199            let mut seen = HashSet::new();
200            for (attrs, tracker) in trackers_collect.drain() {
201                if seen.insert(Arc::as_ptr(&tracker)) {
202                    dest.push(map_fn(attrs, tracker.clone_and_reset(&self.config)));
203                }
204            }
205        } else {
206            otel_warn!(name: "MeterProvider.InternalError", message = "Metric collection failed. Report this issue in OpenTelemetry repo.", details ="ValueMap trackers for collect lock poisoned");
207        }
208    }
209}
210
211/// Clear and allocate exactly required amount of space for all attribute-sets
212fn prepare_data<T>(data: &mut Vec<T>, list_len: usize) {
213    data.clear();
214    let total_len = list_len + 2; // to account for no_attributes case + overflow state
215    if total_len > data.capacity() {
216        data.reserve_exact(total_len - data.capacity());
217    }
218}
219
220fn sort_and_dedup(attributes: &[KeyValue]) -> Vec<KeyValue> {
221    // Use newly allocated vec here as incoming attributes are immutable so
222    // cannot sort/de-dup in-place. TODO: This allocation can be avoided by
223    // leveraging a ThreadLocal vec.
224    let mut sorted = attributes.to_vec();
225    sorted.sort_unstable_by(|a, b| a.key.cmp(&b.key));
226    sorted.dedup_by(|a, b| a.key == b.key);
227    sorted
228}
229
230/// Marks a type that can have a value added and retrieved atomically. Required since
231/// different types have different backing atomic mechanisms
232pub(crate) trait AtomicTracker<T>: Sync + Send + 'static {
233    fn store(&self, _value: T);
234    fn add(&self, _value: T);
235    fn get_value(&self) -> T;
236    fn get_and_reset_value(&self) -> T;
237}
238
239/// Marks a type that can have an atomic tracker generated for it
240pub(crate) trait AtomicallyUpdate<T> {
241    type AtomicTracker: AtomicTracker<T>;
242    fn new_atomic_tracker(init: T) -> Self::AtomicTracker;
243}
244
245pub(crate) trait Number:
246    Add<Output = Self>
247    + AddAssign
248    + Sub<Output = Self>
249    + PartialOrd
250    + fmt::Debug
251    + Clone
252    + Copy
253    + PartialEq
254    + Default
255    + Send
256    + Sync
257    + 'static
258    + AtomicallyUpdate<Self>
259{
260    fn min() -> Self;
261    fn max() -> Self;
262
263    fn into_float(self) -> f64;
264}
265
266impl Number for i64 {
267    fn min() -> Self {
268        i64::MIN
269    }
270
271    fn max() -> Self {
272        i64::MAX
273    }
274
275    fn into_float(self) -> f64 {
276        // May have precision loss at high values
277        self as f64
278    }
279}
280impl Number for u64 {
281    fn min() -> Self {
282        u64::MIN
283    }
284
285    fn max() -> Self {
286        u64::MAX
287    }
288
289    fn into_float(self) -> f64 {
290        // May have precision loss at high values
291        self as f64
292    }
293}
294impl Number for f64 {
295    fn min() -> Self {
296        f64::MIN
297    }
298
299    fn max() -> Self {
300        f64::MAX
301    }
302
303    fn into_float(self) -> f64 {
304        self
305    }
306}
307
308impl AtomicTracker<u64> for AtomicU64 {
309    fn store(&self, value: u64) {
310        self.store(value, Ordering::Relaxed);
311    }
312
313    fn add(&self, value: u64) {
314        self.fetch_add(value, Ordering::Relaxed);
315    }
316
317    fn get_value(&self) -> u64 {
318        self.load(Ordering::Relaxed)
319    }
320
321    fn get_and_reset_value(&self) -> u64 {
322        self.swap(0, Ordering::Relaxed)
323    }
324}
325
326impl AtomicallyUpdate<u64> for u64 {
327    type AtomicTracker = AtomicU64;
328
329    fn new_atomic_tracker(init: u64) -> Self::AtomicTracker {
330        AtomicU64::new(init)
331    }
332}
333
334impl AtomicTracker<i64> for AtomicI64 {
335    fn store(&self, value: i64) {
336        self.store(value, Ordering::Relaxed);
337    }
338
339    fn add(&self, value: i64) {
340        self.fetch_add(value, Ordering::Relaxed);
341    }
342
343    fn get_value(&self) -> i64 {
344        self.load(Ordering::Relaxed)
345    }
346
347    fn get_and_reset_value(&self) -> i64 {
348        self.swap(0, Ordering::Relaxed)
349    }
350}
351
352impl AtomicallyUpdate<i64> for i64 {
353    type AtomicTracker = AtomicI64;
354
355    fn new_atomic_tracker(init: i64) -> Self::AtomicTracker {
356        AtomicI64::new(init)
357    }
358}
359
360pub(crate) struct F64AtomicTracker {
361    inner: AtomicU64, // Floating points don't have true atomics, so we need to use the their binary representation to perform atomic operations
362}
363
364impl F64AtomicTracker {
365    fn new(init: f64) -> Self {
366        let value_as_u64 = init.to_bits();
367        F64AtomicTracker {
368            inner: AtomicU64::new(value_as_u64),
369        }
370    }
371}
372
373impl AtomicTracker<f64> for F64AtomicTracker {
374    fn store(&self, value: f64) {
375        let value_as_u64 = value.to_bits();
376        self.inner.store(value_as_u64, Ordering::Relaxed);
377    }
378
379    fn add(&self, value: f64) {
380        let mut current_value_as_u64 = self.inner.load(Ordering::Relaxed);
381
382        loop {
383            let current_value = f64::from_bits(current_value_as_u64);
384            let new_value = current_value + value;
385            let new_value_as_u64 = new_value.to_bits();
386            match self.inner.compare_exchange(
387                current_value_as_u64,
388                new_value_as_u64,
389                Ordering::Relaxed,
390                Ordering::Relaxed,
391            ) {
392                // Succeeded in updating the value
393                Ok(_) => return,
394
395                // Some other thread changed the value before this thread could update it.
396                // Read the latest value again and try to swap it with the recomputed `new_value_as_u64`.
397                Err(v) => current_value_as_u64 = v,
398            }
399        }
400    }
401
402    fn get_value(&self) -> f64 {
403        let value_as_u64 = self.inner.load(Ordering::Relaxed);
404        f64::from_bits(value_as_u64)
405    }
406
407    fn get_and_reset_value(&self) -> f64 {
408        let zero_as_u64 = 0.0_f64.to_bits();
409        let value = self.inner.swap(zero_as_u64, Ordering::Relaxed);
410        f64::from_bits(value)
411    }
412}
413
414impl AtomicallyUpdate<f64> for f64 {
415    type AtomicTracker = F64AtomicTracker;
416
417    fn new_atomic_tracker(init: f64) -> Self::AtomicTracker {
418        F64AtomicTracker::new(init)
419    }
420}
421
422#[cfg(test)]
423mod tests {
424    use super::*;
425
426    #[test]
427    fn can_store_u64_atomic_value() {
428        let atomic = u64::new_atomic_tracker(0);
429        let atomic_tracker = &atomic as &dyn AtomicTracker<u64>;
430
431        let value = atomic.get_value();
432        assert_eq!(value, 0);
433
434        atomic_tracker.store(25);
435        let value = atomic.get_value();
436        assert_eq!(value, 25);
437    }
438
439    #[test]
440    fn can_add_and_get_u64_atomic_value() {
441        let atomic = u64::new_atomic_tracker(0);
442        atomic.add(15);
443        atomic.add(10);
444
445        let value = atomic.get_value();
446        assert_eq!(value, 25);
447    }
448
449    #[test]
450    fn can_reset_u64_atomic_value() {
451        let atomic = u64::new_atomic_tracker(0);
452        atomic.add(15);
453
454        let value = atomic.get_and_reset_value();
455        let value2 = atomic.get_value();
456
457        assert_eq!(value, 15, "Incorrect first value");
458        assert_eq!(value2, 0, "Incorrect second value");
459    }
460
461    #[test]
462    fn can_store_i64_atomic_value() {
463        let atomic = i64::new_atomic_tracker(0);
464        let atomic_tracker = &atomic as &dyn AtomicTracker<i64>;
465
466        let value = atomic.get_value();
467        assert_eq!(value, 0);
468
469        atomic_tracker.store(-25);
470        let value = atomic.get_value();
471        assert_eq!(value, -25);
472
473        atomic_tracker.store(25);
474        let value = atomic.get_value();
475        assert_eq!(value, 25);
476    }
477
478    #[test]
479    fn can_add_and_get_i64_atomic_value() {
480        let atomic = i64::new_atomic_tracker(0);
481        atomic.add(15);
482        atomic.add(-10);
483
484        let value = atomic.get_value();
485        assert_eq!(value, 5);
486    }
487
488    #[test]
489    fn can_reset_i64_atomic_value() {
490        let atomic = i64::new_atomic_tracker(0);
491        atomic.add(15);
492
493        let value = atomic.get_and_reset_value();
494        let value2 = atomic.get_value();
495
496        assert_eq!(value, 15, "Incorrect first value");
497        assert_eq!(value2, 0, "Incorrect second value");
498    }
499
500    #[test]
501    fn can_store_f64_atomic_value() {
502        let atomic = f64::new_atomic_tracker(0.0);
503        let atomic_tracker = &atomic as &dyn AtomicTracker<f64>;
504
505        let value = atomic.get_value();
506        assert_eq!(value, 0.0);
507
508        atomic_tracker.store(-15.5);
509        let value = atomic.get_value();
510        assert!(f64::abs(-15.5 - value) < 0.0001);
511
512        atomic_tracker.store(25.7);
513        let value = atomic.get_value();
514        assert!(f64::abs(25.7 - value) < 0.0001);
515    }
516
517    #[test]
518    fn can_add_and_get_f64_atomic_value() {
519        let atomic = f64::new_atomic_tracker(0.0);
520        atomic.add(15.3);
521        atomic.add(10.4);
522
523        let value = atomic.get_value();
524
525        assert!(f64::abs(25.7 - value) < 0.0001);
526    }
527
528    #[test]
529    fn can_reset_f64_atomic_value() {
530        let atomic = f64::new_atomic_tracker(0.0);
531        atomic.add(15.5);
532
533        let value = atomic.get_and_reset_value();
534        let value2 = atomic.get_value();
535
536        assert!(f64::abs(15.5 - value) < 0.0001, "Incorrect first value");
537        assert!(f64::abs(0.0 - value2) < 0.0001, "Incorrect second value");
538    }
539}