opentelemetry_proto/transform/
metrics.rs

1// The prost currently will generate a non optional deprecated field for labels.
2// We cannot assign value to it otherwise clippy will complain.
3// We cannot ignore it as it's not an optional field.
4// We can remove this after we removed the labels field from proto.
5#[allow(deprecated)]
6#[cfg(feature = "gen-tonic-messages")]
7pub mod tonic {
8    use std::any::Any;
9    use std::fmt;
10
11    use opentelemetry::{otel_debug, Key, Value};
12    use opentelemetry_sdk::metrics::data::{
13        Exemplar as SdkExemplar, ExponentialHistogram as SdkExponentialHistogram,
14        Gauge as SdkGauge, Histogram as SdkHistogram, Metric as SdkMetric, ResourceMetrics,
15        ScopeMetrics as SdkScopeMetrics, Sum as SdkSum,
16    };
17    use opentelemetry_sdk::metrics::Temporality;
18    use opentelemetry_sdk::Resource as SdkResource;
19
20    use crate::proto::tonic::{
21        collector::metrics::v1::ExportMetricsServiceRequest,
22        common::v1::KeyValue,
23        metrics::v1::{
24            exemplar, exemplar::Value as TonicExemplarValue,
25            exponential_histogram_data_point::Buckets as TonicBuckets,
26            metric::Data as TonicMetricData, number_data_point,
27            number_data_point::Value as TonicDataPointValue,
28            AggregationTemporality as TonicTemporality, AggregationTemporality,
29            DataPointFlags as TonicDataPointFlags, Exemplar as TonicExemplar,
30            ExponentialHistogram as TonicExponentialHistogram,
31            ExponentialHistogramDataPoint as TonicExponentialHistogramDataPoint,
32            Gauge as TonicGauge, Histogram as TonicHistogram,
33            HistogramDataPoint as TonicHistogramDataPoint, Metric as TonicMetric,
34            NumberDataPoint as TonicNumberDataPoint, ResourceMetrics as TonicResourceMetrics,
35            ScopeMetrics as TonicScopeMetrics, Sum as TonicSum,
36        },
37        resource::v1::Resource as TonicResource,
38    };
39    use crate::transform::common::to_nanos;
40
41    impl From<u64> for exemplar::Value {
42        fn from(value: u64) -> Self {
43            exemplar::Value::AsInt(i64::try_from(value).unwrap_or_default())
44        }
45    }
46
47    impl From<i64> for exemplar::Value {
48        fn from(value: i64) -> Self {
49            exemplar::Value::AsInt(value)
50        }
51    }
52
53    impl From<f64> for exemplar::Value {
54        fn from(value: f64) -> Self {
55            exemplar::Value::AsDouble(value)
56        }
57    }
58
59    impl From<u64> for number_data_point::Value {
60        fn from(value: u64) -> Self {
61            number_data_point::Value::AsInt(i64::try_from(value).unwrap_or_default())
62        }
63    }
64
65    impl From<i64> for number_data_point::Value {
66        fn from(value: i64) -> Self {
67            number_data_point::Value::AsInt(value)
68        }
69    }
70
71    impl From<f64> for number_data_point::Value {
72        fn from(value: f64) -> Self {
73            number_data_point::Value::AsDouble(value)
74        }
75    }
76
77    impl From<(&Key, &Value)> for KeyValue {
78        fn from(kv: (&Key, &Value)) -> Self {
79            KeyValue {
80                key: kv.0.to_string(),
81                value: Some(kv.1.clone().into()),
82            }
83        }
84    }
85
86    impl From<&opentelemetry::KeyValue> for KeyValue {
87        fn from(kv: &opentelemetry::KeyValue) -> Self {
88            KeyValue {
89                key: kv.key.to_string(),
90                value: Some(kv.value.clone().into()),
91            }
92        }
93    }
94
95    impl From<Temporality> for AggregationTemporality {
96        fn from(temporality: Temporality) -> Self {
97            match temporality {
98                Temporality::Cumulative => AggregationTemporality::Cumulative,
99                Temporality::Delta => AggregationTemporality::Delta,
100                other => {
101                    otel_debug!(
102                        name: "AggregationTemporality::Unknown",
103                        message = "Unknown temporality,using default instead.",
104                        unknown_temporality = format!("{:?}", other),
105                        default_temporality = format!("{:?}", Temporality::Cumulative)
106                    );
107                    AggregationTemporality::Cumulative
108                }
109            }
110        }
111    }
112
113    impl From<&ResourceMetrics> for ExportMetricsServiceRequest {
114        fn from(rm: &ResourceMetrics) -> Self {
115            ExportMetricsServiceRequest {
116                resource_metrics: vec![TonicResourceMetrics {
117                    resource: Some((&rm.resource).into()),
118                    scope_metrics: rm.scope_metrics.iter().map(Into::into).collect(),
119                    schema_url: rm.resource.schema_url().map(Into::into).unwrap_or_default(),
120                }],
121            }
122        }
123    }
124
125    impl From<&SdkResource> for TonicResource {
126        fn from(resource: &SdkResource) -> Self {
127            TonicResource {
128                attributes: resource.iter().map(Into::into).collect(),
129                dropped_attributes_count: 0,
130            }
131        }
132    }
133
134    impl From<&SdkScopeMetrics> for TonicScopeMetrics {
135        fn from(sm: &SdkScopeMetrics) -> Self {
136            TonicScopeMetrics {
137                scope: Some((&sm.scope, None).into()),
138                metrics: sm.metrics.iter().map(Into::into).collect(),
139                schema_url: sm
140                    .scope
141                    .schema_url()
142                    .map(ToOwned::to_owned)
143                    .unwrap_or_default(),
144            }
145        }
146    }
147
148    impl From<&SdkMetric> for TonicMetric {
149        fn from(metric: &SdkMetric) -> Self {
150            TonicMetric {
151                name: metric.name.to_string(),
152                description: metric.description.to_string(),
153                unit: metric.unit.to_string(),
154                metadata: vec![], // internal and currently unused
155                data: metric.data.as_any().try_into().ok(),
156            }
157        }
158    }
159
160    impl TryFrom<&dyn Any> for TonicMetricData {
161        type Error = ();
162
163        fn try_from(data: &dyn Any) -> Result<Self, Self::Error> {
164            if let Some(hist) = data.downcast_ref::<SdkHistogram<i64>>() {
165                Ok(TonicMetricData::Histogram(hist.into()))
166            } else if let Some(hist) = data.downcast_ref::<SdkHistogram<u64>>() {
167                Ok(TonicMetricData::Histogram(hist.into()))
168            } else if let Some(hist) = data.downcast_ref::<SdkHistogram<f64>>() {
169                Ok(TonicMetricData::Histogram(hist.into()))
170            } else if let Some(hist) = data.downcast_ref::<SdkExponentialHistogram<i64>>() {
171                Ok(TonicMetricData::ExponentialHistogram(hist.into()))
172            } else if let Some(hist) = data.downcast_ref::<SdkExponentialHistogram<u64>>() {
173                Ok(TonicMetricData::ExponentialHistogram(hist.into()))
174            } else if let Some(hist) = data.downcast_ref::<SdkExponentialHistogram<f64>>() {
175                Ok(TonicMetricData::ExponentialHistogram(hist.into()))
176            } else if let Some(sum) = data.downcast_ref::<SdkSum<u64>>() {
177                Ok(TonicMetricData::Sum(sum.into()))
178            } else if let Some(sum) = data.downcast_ref::<SdkSum<i64>>() {
179                Ok(TonicMetricData::Sum(sum.into()))
180            } else if let Some(sum) = data.downcast_ref::<SdkSum<f64>>() {
181                Ok(TonicMetricData::Sum(sum.into()))
182            } else if let Some(gauge) = data.downcast_ref::<SdkGauge<u64>>() {
183                Ok(TonicMetricData::Gauge(gauge.into()))
184            } else if let Some(gauge) = data.downcast_ref::<SdkGauge<i64>>() {
185                Ok(TonicMetricData::Gauge(gauge.into()))
186            } else if let Some(gauge) = data.downcast_ref::<SdkGauge<f64>>() {
187                Ok(TonicMetricData::Gauge(gauge.into()))
188            } else {
189                otel_debug!(
190                    name: "TonicMetricData::UnknownAggregator",
191                    message= "Unknown aggregator type",
192                    unknown_type= format!("{:?}", data),
193                );
194                Err(())
195            }
196        }
197    }
198
199    trait Numeric: Into<TonicExemplarValue> + Into<TonicDataPointValue> + Copy {
200        // lossy at large values for u64 and i64 but otlp histograms only handle float values
201        fn into_f64(self) -> f64;
202    }
203
204    impl Numeric for u64 {
205        fn into_f64(self) -> f64 {
206            self as f64
207        }
208    }
209
210    impl Numeric for i64 {
211        fn into_f64(self) -> f64 {
212            self as f64
213        }
214    }
215
216    impl Numeric for f64 {
217        fn into_f64(self) -> f64 {
218            self
219        }
220    }
221
222    impl<T> From<&SdkHistogram<T>> for TonicHistogram
223    where
224        T: Numeric,
225    {
226        fn from(hist: &SdkHistogram<T>) -> Self {
227            TonicHistogram {
228                data_points: hist
229                    .data_points
230                    .iter()
231                    .map(|dp| TonicHistogramDataPoint {
232                        attributes: dp.attributes.iter().map(Into::into).collect(),
233                        start_time_unix_nano: to_nanos(hist.start_time),
234                        time_unix_nano: to_nanos(hist.time),
235                        count: dp.count,
236                        sum: Some(dp.sum.into_f64()),
237                        bucket_counts: dp.bucket_counts.clone(),
238                        explicit_bounds: dp.bounds.clone(),
239                        exemplars: dp.exemplars.iter().map(Into::into).collect(),
240                        flags: TonicDataPointFlags::default() as u32,
241                        min: dp.min.map(Numeric::into_f64),
242                        max: dp.max.map(Numeric::into_f64),
243                    })
244                    .collect(),
245                aggregation_temporality: TonicTemporality::from(hist.temporality).into(),
246            }
247        }
248    }
249
250    impl<T> From<&SdkExponentialHistogram<T>> for TonicExponentialHistogram
251    where
252        T: Numeric,
253    {
254        fn from(hist: &SdkExponentialHistogram<T>) -> Self {
255            TonicExponentialHistogram {
256                data_points: hist
257                    .data_points
258                    .iter()
259                    .map(|dp| TonicExponentialHistogramDataPoint {
260                        attributes: dp.attributes.iter().map(Into::into).collect(),
261                        start_time_unix_nano: to_nanos(hist.start_time),
262                        time_unix_nano: to_nanos(hist.time),
263                        count: dp.count as u64,
264                        sum: Some(dp.sum.into_f64()),
265                        scale: dp.scale.into(),
266                        zero_count: dp.zero_count,
267                        positive: Some(TonicBuckets {
268                            offset: dp.positive_bucket.offset,
269                            bucket_counts: dp.positive_bucket.counts.clone(),
270                        }),
271                        negative: Some(TonicBuckets {
272                            offset: dp.negative_bucket.offset,
273                            bucket_counts: dp.negative_bucket.counts.clone(),
274                        }),
275                        flags: TonicDataPointFlags::default() as u32,
276                        exemplars: dp.exemplars.iter().map(Into::into).collect(),
277                        min: dp.min.map(Numeric::into_f64),
278                        max: dp.max.map(Numeric::into_f64),
279                        zero_threshold: dp.zero_threshold,
280                    })
281                    .collect(),
282                aggregation_temporality: TonicTemporality::from(hist.temporality).into(),
283            }
284        }
285    }
286
287    impl<T> From<&SdkSum<T>> for TonicSum
288    where
289        T: fmt::Debug + Into<TonicExemplarValue> + Into<TonicDataPointValue> + Copy,
290    {
291        fn from(sum: &SdkSum<T>) -> Self {
292            TonicSum {
293                data_points: sum
294                    .data_points
295                    .iter()
296                    .map(|dp| TonicNumberDataPoint {
297                        attributes: dp.attributes.iter().map(Into::into).collect(),
298                        start_time_unix_nano: to_nanos(sum.start_time),
299                        time_unix_nano: to_nanos(sum.time),
300                        exemplars: dp.exemplars.iter().map(Into::into).collect(),
301                        flags: TonicDataPointFlags::default() as u32,
302                        value: Some(dp.value.into()),
303                    })
304                    .collect(),
305                aggregation_temporality: TonicTemporality::from(sum.temporality).into(),
306                is_monotonic: sum.is_monotonic,
307            }
308        }
309    }
310
311    impl<T> From<&SdkGauge<T>> for TonicGauge
312    where
313        T: fmt::Debug + Into<TonicExemplarValue> + Into<TonicDataPointValue> + Copy,
314    {
315        fn from(gauge: &SdkGauge<T>) -> Self {
316            TonicGauge {
317                data_points: gauge
318                    .data_points
319                    .iter()
320                    .map(|dp| TonicNumberDataPoint {
321                        attributes: dp.attributes.iter().map(Into::into).collect(),
322                        start_time_unix_nano: gauge.start_time.map(to_nanos).unwrap_or_default(),
323                        time_unix_nano: to_nanos(gauge.time),
324                        exemplars: dp.exemplars.iter().map(Into::into).collect(),
325                        flags: TonicDataPointFlags::default() as u32,
326                        value: Some(dp.value.into()),
327                    })
328                    .collect(),
329            }
330        }
331    }
332
333    impl<T> From<&SdkExemplar<T>> for TonicExemplar
334    where
335        T: Into<TonicExemplarValue> + Copy,
336    {
337        fn from(ex: &SdkExemplar<T>) -> Self {
338            TonicExemplar {
339                filtered_attributes: ex
340                    .filtered_attributes
341                    .iter()
342                    .map(|kv| (&kv.key, &kv.value).into())
343                    .collect(),
344                time_unix_nano: to_nanos(ex.time),
345                span_id: ex.span_id.into(),
346                trace_id: ex.trace_id.into(),
347                value: Some(ex.value.into()),
348            }
349        }
350    }
351}