opentelemetry_proto/transform/
metrics.rs1#[allow(deprecated)]
6#[cfg(feature = "gen-tonic-messages")]
7pub mod tonic {
8 use std::any::Any;
9 use std::fmt;
10
11 use opentelemetry::{otel_debug, Key, Value};
12 use opentelemetry_sdk::metrics::data::{
13 Exemplar as SdkExemplar, ExponentialHistogram as SdkExponentialHistogram,
14 Gauge as SdkGauge, Histogram as SdkHistogram, Metric as SdkMetric, ResourceMetrics,
15 ScopeMetrics as SdkScopeMetrics, Sum as SdkSum,
16 };
17 use opentelemetry_sdk::metrics::Temporality;
18 use opentelemetry_sdk::Resource as SdkResource;
19
20 use crate::proto::tonic::{
21 collector::metrics::v1::ExportMetricsServiceRequest,
22 common::v1::KeyValue,
23 metrics::v1::{
24 exemplar, exemplar::Value as TonicExemplarValue,
25 exponential_histogram_data_point::Buckets as TonicBuckets,
26 metric::Data as TonicMetricData, number_data_point,
27 number_data_point::Value as TonicDataPointValue,
28 AggregationTemporality as TonicTemporality, AggregationTemporality,
29 DataPointFlags as TonicDataPointFlags, Exemplar as TonicExemplar,
30 ExponentialHistogram as TonicExponentialHistogram,
31 ExponentialHistogramDataPoint as TonicExponentialHistogramDataPoint,
32 Gauge as TonicGauge, Histogram as TonicHistogram,
33 HistogramDataPoint as TonicHistogramDataPoint, Metric as TonicMetric,
34 NumberDataPoint as TonicNumberDataPoint, ResourceMetrics as TonicResourceMetrics,
35 ScopeMetrics as TonicScopeMetrics, Sum as TonicSum,
36 },
37 resource::v1::Resource as TonicResource,
38 };
39 use crate::transform::common::to_nanos;
40
41 impl From<u64> for exemplar::Value {
42 fn from(value: u64) -> Self {
43 exemplar::Value::AsInt(i64::try_from(value).unwrap_or_default())
44 }
45 }
46
47 impl From<i64> for exemplar::Value {
48 fn from(value: i64) -> Self {
49 exemplar::Value::AsInt(value)
50 }
51 }
52
53 impl From<f64> for exemplar::Value {
54 fn from(value: f64) -> Self {
55 exemplar::Value::AsDouble(value)
56 }
57 }
58
59 impl From<u64> for number_data_point::Value {
60 fn from(value: u64) -> Self {
61 number_data_point::Value::AsInt(i64::try_from(value).unwrap_or_default())
62 }
63 }
64
65 impl From<i64> for number_data_point::Value {
66 fn from(value: i64) -> Self {
67 number_data_point::Value::AsInt(value)
68 }
69 }
70
71 impl From<f64> for number_data_point::Value {
72 fn from(value: f64) -> Self {
73 number_data_point::Value::AsDouble(value)
74 }
75 }
76
77 impl From<(&Key, &Value)> for KeyValue {
78 fn from(kv: (&Key, &Value)) -> Self {
79 KeyValue {
80 key: kv.0.to_string(),
81 value: Some(kv.1.clone().into()),
82 }
83 }
84 }
85
86 impl From<&opentelemetry::KeyValue> for KeyValue {
87 fn from(kv: &opentelemetry::KeyValue) -> Self {
88 KeyValue {
89 key: kv.key.to_string(),
90 value: Some(kv.value.clone().into()),
91 }
92 }
93 }
94
95 impl From<Temporality> for AggregationTemporality {
96 fn from(temporality: Temporality) -> Self {
97 match temporality {
98 Temporality::Cumulative => AggregationTemporality::Cumulative,
99 Temporality::Delta => AggregationTemporality::Delta,
100 other => {
101 otel_debug!(
102 name: "AggregationTemporality::Unknown",
103 message = "Unknown temporality,using default instead.",
104 unknown_temporality = format!("{:?}", other),
105 default_temporality = format!("{:?}", Temporality::Cumulative)
106 );
107 AggregationTemporality::Cumulative
108 }
109 }
110 }
111 }
112
113 impl From<&ResourceMetrics> for ExportMetricsServiceRequest {
114 fn from(rm: &ResourceMetrics) -> Self {
115 ExportMetricsServiceRequest {
116 resource_metrics: vec![TonicResourceMetrics {
117 resource: Some((&rm.resource).into()),
118 scope_metrics: rm.scope_metrics.iter().map(Into::into).collect(),
119 schema_url: rm.resource.schema_url().map(Into::into).unwrap_or_default(),
120 }],
121 }
122 }
123 }
124
125 impl From<&SdkResource> for TonicResource {
126 fn from(resource: &SdkResource) -> Self {
127 TonicResource {
128 attributes: resource.iter().map(Into::into).collect(),
129 dropped_attributes_count: 0,
130 }
131 }
132 }
133
134 impl From<&SdkScopeMetrics> for TonicScopeMetrics {
135 fn from(sm: &SdkScopeMetrics) -> Self {
136 TonicScopeMetrics {
137 scope: Some((&sm.scope, None).into()),
138 metrics: sm.metrics.iter().map(Into::into).collect(),
139 schema_url: sm
140 .scope
141 .schema_url()
142 .map(ToOwned::to_owned)
143 .unwrap_or_default(),
144 }
145 }
146 }
147
148 impl From<&SdkMetric> for TonicMetric {
149 fn from(metric: &SdkMetric) -> Self {
150 TonicMetric {
151 name: metric.name.to_string(),
152 description: metric.description.to_string(),
153 unit: metric.unit.to_string(),
154 metadata: vec![], data: metric.data.as_any().try_into().ok(),
156 }
157 }
158 }
159
160 impl TryFrom<&dyn Any> for TonicMetricData {
161 type Error = ();
162
163 fn try_from(data: &dyn Any) -> Result<Self, Self::Error> {
164 if let Some(hist) = data.downcast_ref::<SdkHistogram<i64>>() {
165 Ok(TonicMetricData::Histogram(hist.into()))
166 } else if let Some(hist) = data.downcast_ref::<SdkHistogram<u64>>() {
167 Ok(TonicMetricData::Histogram(hist.into()))
168 } else if let Some(hist) = data.downcast_ref::<SdkHistogram<f64>>() {
169 Ok(TonicMetricData::Histogram(hist.into()))
170 } else if let Some(hist) = data.downcast_ref::<SdkExponentialHistogram<i64>>() {
171 Ok(TonicMetricData::ExponentialHistogram(hist.into()))
172 } else if let Some(hist) = data.downcast_ref::<SdkExponentialHistogram<u64>>() {
173 Ok(TonicMetricData::ExponentialHistogram(hist.into()))
174 } else if let Some(hist) = data.downcast_ref::<SdkExponentialHistogram<f64>>() {
175 Ok(TonicMetricData::ExponentialHistogram(hist.into()))
176 } else if let Some(sum) = data.downcast_ref::<SdkSum<u64>>() {
177 Ok(TonicMetricData::Sum(sum.into()))
178 } else if let Some(sum) = data.downcast_ref::<SdkSum<i64>>() {
179 Ok(TonicMetricData::Sum(sum.into()))
180 } else if let Some(sum) = data.downcast_ref::<SdkSum<f64>>() {
181 Ok(TonicMetricData::Sum(sum.into()))
182 } else if let Some(gauge) = data.downcast_ref::<SdkGauge<u64>>() {
183 Ok(TonicMetricData::Gauge(gauge.into()))
184 } else if let Some(gauge) = data.downcast_ref::<SdkGauge<i64>>() {
185 Ok(TonicMetricData::Gauge(gauge.into()))
186 } else if let Some(gauge) = data.downcast_ref::<SdkGauge<f64>>() {
187 Ok(TonicMetricData::Gauge(gauge.into()))
188 } else {
189 otel_debug!(
190 name: "TonicMetricData::UnknownAggregator",
191 message= "Unknown aggregator type",
192 unknown_type= format!("{:?}", data),
193 );
194 Err(())
195 }
196 }
197 }
198
199 trait Numeric: Into<TonicExemplarValue> + Into<TonicDataPointValue> + Copy {
200 fn into_f64(self) -> f64;
202 }
203
204 impl Numeric for u64 {
205 fn into_f64(self) -> f64 {
206 self as f64
207 }
208 }
209
210 impl Numeric for i64 {
211 fn into_f64(self) -> f64 {
212 self as f64
213 }
214 }
215
216 impl Numeric for f64 {
217 fn into_f64(self) -> f64 {
218 self
219 }
220 }
221
222 impl<T> From<&SdkHistogram<T>> for TonicHistogram
223 where
224 T: Numeric,
225 {
226 fn from(hist: &SdkHistogram<T>) -> Self {
227 TonicHistogram {
228 data_points: hist
229 .data_points
230 .iter()
231 .map(|dp| TonicHistogramDataPoint {
232 attributes: dp.attributes.iter().map(Into::into).collect(),
233 start_time_unix_nano: to_nanos(hist.start_time),
234 time_unix_nano: to_nanos(hist.time),
235 count: dp.count,
236 sum: Some(dp.sum.into_f64()),
237 bucket_counts: dp.bucket_counts.clone(),
238 explicit_bounds: dp.bounds.clone(),
239 exemplars: dp.exemplars.iter().map(Into::into).collect(),
240 flags: TonicDataPointFlags::default() as u32,
241 min: dp.min.map(Numeric::into_f64),
242 max: dp.max.map(Numeric::into_f64),
243 })
244 .collect(),
245 aggregation_temporality: TonicTemporality::from(hist.temporality).into(),
246 }
247 }
248 }
249
250 impl<T> From<&SdkExponentialHistogram<T>> for TonicExponentialHistogram
251 where
252 T: Numeric,
253 {
254 fn from(hist: &SdkExponentialHistogram<T>) -> Self {
255 TonicExponentialHistogram {
256 data_points: hist
257 .data_points
258 .iter()
259 .map(|dp| TonicExponentialHistogramDataPoint {
260 attributes: dp.attributes.iter().map(Into::into).collect(),
261 start_time_unix_nano: to_nanos(hist.start_time),
262 time_unix_nano: to_nanos(hist.time),
263 count: dp.count as u64,
264 sum: Some(dp.sum.into_f64()),
265 scale: dp.scale.into(),
266 zero_count: dp.zero_count,
267 positive: Some(TonicBuckets {
268 offset: dp.positive_bucket.offset,
269 bucket_counts: dp.positive_bucket.counts.clone(),
270 }),
271 negative: Some(TonicBuckets {
272 offset: dp.negative_bucket.offset,
273 bucket_counts: dp.negative_bucket.counts.clone(),
274 }),
275 flags: TonicDataPointFlags::default() as u32,
276 exemplars: dp.exemplars.iter().map(Into::into).collect(),
277 min: dp.min.map(Numeric::into_f64),
278 max: dp.max.map(Numeric::into_f64),
279 zero_threshold: dp.zero_threshold,
280 })
281 .collect(),
282 aggregation_temporality: TonicTemporality::from(hist.temporality).into(),
283 }
284 }
285 }
286
287 impl<T> From<&SdkSum<T>> for TonicSum
288 where
289 T: fmt::Debug + Into<TonicExemplarValue> + Into<TonicDataPointValue> + Copy,
290 {
291 fn from(sum: &SdkSum<T>) -> Self {
292 TonicSum {
293 data_points: sum
294 .data_points
295 .iter()
296 .map(|dp| TonicNumberDataPoint {
297 attributes: dp.attributes.iter().map(Into::into).collect(),
298 start_time_unix_nano: to_nanos(sum.start_time),
299 time_unix_nano: to_nanos(sum.time),
300 exemplars: dp.exemplars.iter().map(Into::into).collect(),
301 flags: TonicDataPointFlags::default() as u32,
302 value: Some(dp.value.into()),
303 })
304 .collect(),
305 aggregation_temporality: TonicTemporality::from(sum.temporality).into(),
306 is_monotonic: sum.is_monotonic,
307 }
308 }
309 }
310
311 impl<T> From<&SdkGauge<T>> for TonicGauge
312 where
313 T: fmt::Debug + Into<TonicExemplarValue> + Into<TonicDataPointValue> + Copy,
314 {
315 fn from(gauge: &SdkGauge<T>) -> Self {
316 TonicGauge {
317 data_points: gauge
318 .data_points
319 .iter()
320 .map(|dp| TonicNumberDataPoint {
321 attributes: dp.attributes.iter().map(Into::into).collect(),
322 start_time_unix_nano: gauge.start_time.map(to_nanos).unwrap_or_default(),
323 time_unix_nano: to_nanos(gauge.time),
324 exemplars: dp.exemplars.iter().map(Into::into).collect(),
325 flags: TonicDataPointFlags::default() as u32,
326 value: Some(dp.value.into()),
327 })
328 .collect(),
329 }
330 }
331 }
332
333 impl<T> From<&SdkExemplar<T>> for TonicExemplar
334 where
335 T: Into<TonicExemplarValue> + Copy,
336 {
337 fn from(ex: &SdkExemplar<T>) -> Self {
338 TonicExemplar {
339 filtered_attributes: ex
340 .filtered_attributes
341 .iter()
342 .map(|kv| (&kv.key, &kv.value).into())
343 .collect(),
344 time_unix_nano: to_nanos(ex.time),
345 span_id: ex.span_id.into(),
346 trace_id: ex.trace_id.into(),
347 value: Some(ex.value.into()),
348 }
349 }
350 }
351}