// opentelemetry_sdk/metrics/internal/mod.rs
mod aggregate;
2mod exponential_histogram;
3mod histogram;
4mod last_value;
5mod precomputed_sum;
6mod sum;
7
8use core::fmt;
9use std::collections::{HashMap, HashSet};
10use std::mem::swap;
11use std::ops::{Add, AddAssign, DerefMut, Sub};
12use std::sync::atomic::{AtomicBool, AtomicI64, AtomicU64, AtomicUsize, Ordering};
13use std::sync::{Arc, OnceLock, RwLock};
14
15use aggregate::{is_under_cardinality_limit, STREAM_CARDINALITY_LIMIT};
16pub(crate) use aggregate::{AggregateBuilder, AggregateFns, ComputeAggregation, Measure};
17pub(crate) use exponential_histogram::{EXPO_MAX_SCALE, EXPO_MIN_SCALE};
18use opentelemetry::{otel_warn, KeyValue};
19
/// Attribute set attached to the synthetic "overflow" data point once the
/// stream cardinality limit is hit; initialized lazily on first overflow
/// via `stream_overflow_attributes`.
pub(crate) static STREAM_OVERFLOW_ATTRIBUTES: OnceLock<Vec<KeyValue>> = OnceLock::new();
22
23#[inline]
24fn stream_overflow_attributes() -> &'static Vec<KeyValue> {
25 STREAM_OVERFLOW_ATTRIBUTES.get_or_init(|| vec![KeyValue::new("otel.metric.overflow", "true")])
26}
27
/// Abstraction over the per-attribute-set state a `ValueMap` stores
/// (sum, last value, histogram, ...).
pub(crate) trait Aggregator {
    /// Configuration needed to build a new tracker (e.g. histogram setup).
    type InitConfig;

    /// Value passed to `update`; may be pre-processed by the caller before
    /// any tracker lookup happens.
    type PreComputedValue;

    /// Builds a fresh tracker from `init`.
    fn create(init: &Self::InitConfig) -> Self;

    /// Folds `value` into this tracker; takes `&self` because trackers are
    /// updated concurrently from multiple threads.
    fn update(&self, value: Self::PreComputedValue);

    /// Returns a snapshot of the current state and resets this tracker
    /// back to its initial state (used by delta collection).
    fn clone_and_reset(&self, init: &Self::InitConfig) -> Self;
}
47
/// Concurrent map from attribute sets to aggregation trackers.
///
/// Enforces the stream cardinality limit and keeps a dedicated lock-free
/// path for measurements recorded without attributes.
pub(crate) struct ValueMap<A>
where
    A: Aggregator,
{
    /// Active trackers keyed by attribute set. Both the caller-provided
    /// ordering and the sorted/deduped ordering may appear as keys,
    /// pointing at the same `Arc`, so map-entry count can exceed `count`.
    trackers: RwLock<HashMap<Vec<KeyValue>, Arc<A>>>,

    /// Spare map swapped with `trackers` during `collect_and_reset`, so the
    /// measurement hot path gets an empty pre-allocated map back quickly.
    /// Lazily initialized because delta collection may never run.
    trackers_for_collect: OnceLock<RwLock<HashMap<Vec<KeyValue>, Arc<A>>>>,

    /// Number of distinct attribute sets tracked (cardinality-limit check),
    /// not the number of map entries.
    count: AtomicUsize,
    /// True once any measurement with empty attributes has been recorded.
    has_no_attribute_value: AtomicBool,
    /// Tracker for measurements recorded with no attributes.
    no_attribute_tracker: A,
    /// Aggregator configuration used whenever a new tracker is created.
    config: A::InitConfig,
}
73
impl<A> ValueMap<A>
where
    A: Aggregator,
{
    /// Creates an empty `ValueMap`; the hot-path map is pre-sized for the
    /// cardinality limit plus one overflow entry.
    fn new(config: A::InitConfig) -> Self {
        ValueMap {
            trackers: RwLock::new(HashMap::with_capacity(1 + STREAM_CARDINALITY_LIMIT)),
            trackers_for_collect: OnceLock::new(),
            has_no_attribute_value: AtomicBool::new(false),
            no_attribute_tracker: A::create(&config),
            count: AtomicUsize::new(0),
            config,
        }
    }

    /// Lazily creates (on first delta collection) and returns the spare map
    /// that `collect_and_reset` swaps with `trackers`.
    #[inline]
    fn trackers_for_collect(&self) -> &RwLock<HashMap<Vec<KeyValue>, Arc<A>>> {
        self.trackers_for_collect
            .get_or_init(|| RwLock::new(HashMap::with_capacity(1 + STREAM_CARDINALITY_LIMIT)))
    }

    /// Records `value` against `attributes`.
    ///
    /// Fast paths: the dedicated no-attribute tracker, then read-locked
    /// lookups by the caller's attribute ordering and by the sorted/deduped
    /// ordering. Only a genuinely new attribute set takes the write lock,
    /// where both lookups are repeated in case another thread inserted the
    /// set while no lock was held. If a lock is poisoned the measurement is
    /// silently dropped.
    fn measure(&self, value: A::PreComputedValue, attributes: &[KeyValue]) {
        if attributes.is_empty() {
            self.no_attribute_tracker.update(value);
            // Release pairs with the Acquire/AcqRel reads in the collect paths.
            self.has_no_attribute_value.store(true, Ordering::Release);
            return;
        }

        let Ok(trackers) = self.trackers.read() else {
            return;
        };

        // Exact match on the attribute ordering the caller provided.
        if let Some(tracker) = trackers.get(attributes) {
            tracker.update(value);
            return;
        }

        // Retry with the canonical (sorted, key-deduped) ordering.
        let sorted_attrs = sort_and_dedup(attributes);
        if let Some(tracker) = trackers.get(sorted_attrs.as_slice()) {
            tracker.update(value);
            return;
        }

        // New attribute set: release the read lock and take the write lock.
        drop(trackers);

        let Ok(mut trackers) = self.trackers.write() else {
            return;
        };

        // Re-check both keys; another thread may have inserted them between
        // the read unlock and the write lock.
        if let Some(tracker) = trackers.get(attributes) {
            tracker.update(value);
        } else if let Some(tracker) = trackers.get(sorted_attrs.as_slice()) {
            tracker.update(value);
        } else if is_under_cardinality_limit(self.count.load(Ordering::SeqCst)) {
            let new_tracker = Arc::new(A::create(&self.config));
            new_tracker.update(value);

            // Insert under both orderings so future lookups hit the fast
            // path; both keys share one tracker, so `count` grows by one.
            trackers.insert(attributes.to_vec(), new_tracker.clone());
            trackers.insert(sorted_attrs, new_tracker);

            self.count.fetch_add(1, Ordering::SeqCst);
        } else if let Some(overflow_value) = trackers.get(stream_overflow_attributes().as_slice()) {
            // Over the limit: fold the measurement into the overflow entry.
            overflow_value.update(value);
        } else {
            // First overflow for this stream: create the overflow entry and
            // warn once (until the next collect empties the map).
            let new_tracker = A::create(&self.config);
            new_tracker.update(value);
            trackers.insert(stream_overflow_attributes().clone(), Arc::new(new_tracker));
            otel_warn!( name: "ValueMap.measure",
                message = "Maximum data points for metric stream exceeded. Entry added to overflow. Subsequent overflows to same metric until next collect will not be logged."
            );
        }
    }

    /// Snapshots every tracker into `dest` without resetting any state
    /// (cumulative-temporality collection).
    ///
    /// One tracker can be stored under two keys (caller order and sorted
    /// order), so entries are deduplicated by `Arc` pointer identity.
    pub(crate) fn collect_readonly<Res, MapFn>(&self, dest: &mut Vec<Res>, mut map_fn: MapFn)
    where
        MapFn: FnMut(Vec<KeyValue>, &A) -> Res,
    {
        prepare_data(dest, self.count.load(Ordering::SeqCst));
        if self.has_no_attribute_value.load(Ordering::Acquire) {
            dest.push(map_fn(vec![], &self.no_attribute_tracker));
        }

        let Ok(trackers) = self.trackers.read() else {
            return;
        };

        let mut seen = HashSet::new();
        for (attrs, tracker) in trackers.iter() {
            if seen.insert(Arc::as_ptr(tracker)) {
                dest.push(map_fn(attrs.clone(), tracker));
            }
        }
    }

    /// Drains every tracker into `dest` and resets the map
    /// (delta-temporality collection).
    ///
    /// Swaps `trackers` with the spare collect map so the measurement hot
    /// path regains an empty, pre-allocated map quickly, then drains the
    /// swapped-out map. Entries are deduplicated by `Arc` pointer identity,
    /// as in `collect_readonly`.
    pub(crate) fn collect_and_reset<Res, MapFn>(&self, dest: &mut Vec<Res>, mut map_fn: MapFn)
    where
        MapFn: FnMut(Vec<KeyValue>, A) -> Res,
    {
        prepare_data(dest, self.count.load(Ordering::SeqCst));
        if self.has_no_attribute_value.swap(false, Ordering::AcqRel) {
            dest.push(map_fn(
                vec![],
                self.no_attribute_tracker.clone_and_reset(&self.config),
            ));
        }

        if let Ok(mut trackers_collect) = self.trackers_for_collect().write() {
            if let Ok(mut trackers_current) = self.trackers.write() {
                swap(trackers_collect.deref_mut(), trackers_current.deref_mut());
                self.count.store(0, Ordering::SeqCst);
            } else {
                otel_warn!(name: "MeterProvider.InternalError", message = "Metric collection failed. Report this issue in OpenTelemetry repo.", details ="ValueMap trackers lock poisoned");
                return;
            }

            let mut seen = HashSet::new();
            for (attrs, tracker) in trackers_collect.drain() {
                if seen.insert(Arc::as_ptr(&tracker)) {
                    dest.push(map_fn(attrs, tracker.clone_and_reset(&self.config)));
                }
            }
        } else {
            otel_warn!(name: "MeterProvider.InternalError", message = "Metric collection failed. Report this issue in OpenTelemetry repo.", details ="ValueMap trackers for collect lock poisoned");
        }
    }
}
210
/// Clears `data` from the previous collection cycle and ensures it has
/// capacity for `list_len` trackers plus two extra entries (the
/// no-attribute data point and the overflow data point).
fn prepare_data<T>(data: &mut Vec<T>, list_len: usize) {
    // Drop stale results from the previous collection cycle.
    data.clear();
    let total_len = list_len + 2;
    if total_len > data.capacity() {
        // `data` is empty here, so this guarantees `capacity() >= total_len`.
        // (The previous `reserve_exact(total_len - data.capacity())`
        // under-reserved: with len == 0, `reserve_exact(n)` only guarantees
        // capacity >= n, so subtracting the existing capacity left the
        // vector short of `total_len` whenever capacity was non-zero.)
        data.reserve_exact(total_len);
    }
}
219
220fn sort_and_dedup(attributes: &[KeyValue]) -> Vec<KeyValue> {
221 let mut sorted = attributes.to_vec();
225 sorted.sort_unstable_by(|a, b| a.key.cmp(&b.key));
226 sorted.dedup_by(|a, b| a.key == b.key);
227 sorted
228}
229
/// Lock-free storage for a single numeric value of type `T`.
pub(crate) trait AtomicTracker<T>: Sync + Send + 'static {
    /// Overwrites the stored value.
    fn store(&self, _value: T);
    /// Adds `_value` to the stored value.
    fn add(&self, _value: T);
    /// Returns the stored value.
    fn get_value(&self) -> T;
    /// Returns the stored value and resets the storage to zero.
    fn get_and_reset_value(&self) -> T;
}
238
/// Associates a scalar type `T` with the atomic tracker that can hold it.
pub(crate) trait AtomicallyUpdate<T> {
    /// Atomic storage type capable of holding a `T`.
    type AtomicTracker: AtomicTracker<T>;
    /// Creates a tracker initialized to `init`.
    fn new_atomic_tracker(init: T) -> Self::AtomicTracker;
}
244
/// Numeric scalar types measurements can carry (`i64`, `u64`, `f64`),
/// bundling the arithmetic, comparison, and atomic-storage capabilities
/// the aggregators rely on.
pub(crate) trait Number:
    Add<Output = Self>
    + AddAssign
    + Sub<Output = Self>
    + PartialOrd
    + fmt::Debug
    + Clone
    + Copy
    + PartialEq
    + Default
    + Send
    + Sync
    + 'static
    + AtomicallyUpdate<Self>
{
    /// Smallest value of the type (the type's `MIN`).
    fn min() -> Self;
    /// Largest value of the type (the type's `MAX`).
    fn max() -> Self;

    /// Converts the value to `f64`; lossy for integers beyond 2^53.
    fn into_float(self) -> f64;
}
265
266impl Number for i64 {
267 fn min() -> Self {
268 i64::MIN
269 }
270
271 fn max() -> Self {
272 i64::MAX
273 }
274
275 fn into_float(self) -> f64 {
276 self as f64
278 }
279}
280impl Number for u64 {
281 fn min() -> Self {
282 u64::MIN
283 }
284
285 fn max() -> Self {
286 u64::MAX
287 }
288
289 fn into_float(self) -> f64 {
290 self as f64
292 }
293}
294impl Number for f64 {
295 fn min() -> Self {
296 f64::MIN
297 }
298
299 fn max() -> Self {
300 f64::MAX
301 }
302
303 fn into_float(self) -> f64 {
304 self
305 }
306}
307
308impl AtomicTracker<u64> for AtomicU64 {
309 fn store(&self, value: u64) {
310 self.store(value, Ordering::Relaxed);
311 }
312
313 fn add(&self, value: u64) {
314 self.fetch_add(value, Ordering::Relaxed);
315 }
316
317 fn get_value(&self) -> u64 {
318 self.load(Ordering::Relaxed)
319 }
320
321 fn get_and_reset_value(&self) -> u64 {
322 self.swap(0, Ordering::Relaxed)
323 }
324}
325
326impl AtomicallyUpdate<u64> for u64 {
327 type AtomicTracker = AtomicU64;
328
329 fn new_atomic_tracker(init: u64) -> Self::AtomicTracker {
330 AtomicU64::new(init)
331 }
332}
333
334impl AtomicTracker<i64> for AtomicI64 {
335 fn store(&self, value: i64) {
336 self.store(value, Ordering::Relaxed);
337 }
338
339 fn add(&self, value: i64) {
340 self.fetch_add(value, Ordering::Relaxed);
341 }
342
343 fn get_value(&self) -> i64 {
344 self.load(Ordering::Relaxed)
345 }
346
347 fn get_and_reset_value(&self) -> i64 {
348 self.swap(0, Ordering::Relaxed)
349 }
350}
351
352impl AtomicallyUpdate<i64> for i64 {
353 type AtomicTracker = AtomicI64;
354
355 fn new_atomic_tracker(init: i64) -> Self::AtomicTracker {
356 AtomicI64::new(init)
357 }
358}
359
/// Atomic cell for an `f64`.
///
/// `std` has no `AtomicF64`, so the value is kept as its IEEE-754 bit
/// pattern inside an `AtomicU64`.
pub(crate) struct F64AtomicTracker {
    inner: AtomicU64,
}
363
364impl F64AtomicTracker {
365 fn new(init: f64) -> Self {
366 let value_as_u64 = init.to_bits();
367 F64AtomicTracker {
368 inner: AtomicU64::new(value_as_u64),
369 }
370 }
371}
372
373impl AtomicTracker<f64> for F64AtomicTracker {
374 fn store(&self, value: f64) {
375 let value_as_u64 = value.to_bits();
376 self.inner.store(value_as_u64, Ordering::Relaxed);
377 }
378
379 fn add(&self, value: f64) {
380 let mut current_value_as_u64 = self.inner.load(Ordering::Relaxed);
381
382 loop {
383 let current_value = f64::from_bits(current_value_as_u64);
384 let new_value = current_value + value;
385 let new_value_as_u64 = new_value.to_bits();
386 match self.inner.compare_exchange(
387 current_value_as_u64,
388 new_value_as_u64,
389 Ordering::Relaxed,
390 Ordering::Relaxed,
391 ) {
392 Ok(_) => return,
394
395 Err(v) => current_value_as_u64 = v,
398 }
399 }
400 }
401
402 fn get_value(&self) -> f64 {
403 let value_as_u64 = self.inner.load(Ordering::Relaxed);
404 f64::from_bits(value_as_u64)
405 }
406
407 fn get_and_reset_value(&self) -> f64 {
408 let zero_as_u64 = 0.0_f64.to_bits();
409 let value = self.inner.swap(zero_as_u64, Ordering::Relaxed);
410 f64::from_bits(value)
411 }
412}
413
414impl AtomicallyUpdate<f64> for f64 {
415 type AtomicTracker = F64AtomicTracker;
416
417 fn new_atomic_tracker(init: f64) -> Self::AtomicTracker {
418 F64AtomicTracker::new(init)
419 }
420}
421
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn can_store_u64_atomic_value() {
        let tracker = u64::new_atomic_tracker(0);
        // Go through the trait object to exercise dynamic dispatch (and to
        // avoid resolving to the inherent two-argument `AtomicU64::store`).
        let as_dyn: &dyn AtomicTracker<u64> = &tracker;

        assert_eq!(tracker.get_value(), 0);

        as_dyn.store(25);
        assert_eq!(tracker.get_value(), 25);
    }

    #[test]
    fn can_add_and_get_u64_atomic_value() {
        let tracker = u64::new_atomic_tracker(0);
        for delta in [15u64, 10] {
            tracker.add(delta);
        }

        assert_eq!(tracker.get_value(), 25);
    }

    #[test]
    fn can_reset_u64_atomic_value() {
        let tracker = u64::new_atomic_tracker(0);
        tracker.add(15);

        let before_reset = tracker.get_and_reset_value();
        let after_reset = tracker.get_value();

        assert_eq!(before_reset, 15, "Incorrect first value");
        assert_eq!(after_reset, 0, "Incorrect second value");
    }

    #[test]
    fn can_store_i64_atomic_value() {
        let tracker = i64::new_atomic_tracker(0);
        let as_dyn: &dyn AtomicTracker<i64> = &tracker;

        assert_eq!(tracker.get_value(), 0);

        as_dyn.store(-25);
        assert_eq!(tracker.get_value(), -25);

        as_dyn.store(25);
        assert_eq!(tracker.get_value(), 25);
    }

    #[test]
    fn can_add_and_get_i64_atomic_value() {
        let tracker = i64::new_atomic_tracker(0);
        for delta in [15i64, -10] {
            tracker.add(delta);
        }

        assert_eq!(tracker.get_value(), 5);
    }

    #[test]
    fn can_reset_i64_atomic_value() {
        let tracker = i64::new_atomic_tracker(0);
        tracker.add(15);

        let before_reset = tracker.get_and_reset_value();
        let after_reset = tracker.get_value();

        assert_eq!(before_reset, 15, "Incorrect first value");
        assert_eq!(after_reset, 0, "Incorrect second value");
    }

    #[test]
    fn can_store_f64_atomic_value() {
        let tracker = f64::new_atomic_tracker(0.0);
        let as_dyn: &dyn AtomicTracker<f64> = &tracker;

        assert_eq!(tracker.get_value(), 0.0);

        as_dyn.store(-15.5);
        assert!((tracker.get_value() - (-15.5)).abs() < 0.0001);

        as_dyn.store(25.7);
        assert!((tracker.get_value() - 25.7).abs() < 0.0001);
    }

    #[test]
    fn can_add_and_get_f64_atomic_value() {
        let tracker = f64::new_atomic_tracker(0.0);
        tracker.add(15.3);
        tracker.add(10.4);

        // Float comparison with a tolerance: two adds accumulate rounding.
        assert!((tracker.get_value() - 25.7).abs() < 0.0001);
    }

    #[test]
    fn can_reset_f64_atomic_value() {
        let tracker = f64::new_atomic_tracker(0.0);
        tracker.add(15.5);

        let before_reset = tracker.get_and_reset_value();
        let after_reset = tracker.get_value();

        assert!((before_reset - 15.5).abs() < 0.0001, "Incorrect first value");
        assert!((after_reset - 0.0).abs() < 0.0001, "Incorrect second value");
    }
}