opentelemetry_otlp/exporter/http/
logs.rs

1use super::OtlpHttpClient;
2use http::{header::CONTENT_TYPE, Method};
3use opentelemetry::otel_debug;
4use opentelemetry_sdk::error::{OTelSdkError, OTelSdkResult};
5use opentelemetry_sdk::logs::{LogBatch, LogExporter};
6
7impl LogExporter for OtlpHttpClient {
8    #[allow(clippy::manual_async_fn)]
9    fn export(
10        &self,
11        batch: LogBatch<'_>,
12    ) -> impl std::future::Future<Output = OTelSdkResult> + Send {
13        async move {
14            let client = self
15                .client
16                .lock()
17                .map_err(|e| OTelSdkError::InternalFailure(format!("Mutex lock failed: {}", e)))?
18                .clone()
19                .ok_or(OTelSdkError::AlreadyShutdown)?;
20
21            let (body, content_type) = self
22                .build_logs_export_body(batch)
23                .map_err(|e| OTelSdkError::InternalFailure(e.to_string()))?;
24
25            let mut request = http::Request::builder()
26                .method(Method::POST)
27                .uri(&self.collector_endpoint)
28                .header(CONTENT_TYPE, content_type)
29                .body(body.into())
30                .map_err(|e| OTelSdkError::InternalFailure(e.to_string()))?;
31
32            for (k, v) in &self.headers {
33                request.headers_mut().insert(k.clone(), v.clone());
34            }
35
36            let request_uri = request.uri().to_string();
37            otel_debug!(name: "HttpLogsClient.CallingExport");
38            let response = client
39                .send_bytes(request)
40                .await
41                .map_err(|e| OTelSdkError::InternalFailure(format!("{e:?}")))?;
42            if !response.status().is_success() {
43                let error = format!(
44                    "OpenTelemetry logs export failed. Url: {}, Status Code: {}, Response: {:?}",
45                    request_uri,
46                    response.status().as_u16(),
47                    response.body()
48                );
49                return Err(OTelSdkError::InternalFailure(error));
50            }
51            Ok(())
52        }
53    }
54
55    fn shutdown(&mut self) -> OTelSdkResult {
56        let mut client_guard = self.client.lock().map_err(|e| {
57            OTelSdkError::InternalFailure(format!("Failed to acquire client lock: {}", e))
58        })?;
59
60        if client_guard.take().is_none() {
61            return Err(OTelSdkError::AlreadyShutdown);
62        }
63
64        Ok(())
65    }
66
67    fn set_resource(&mut self, resource: &opentelemetry_sdk::Resource) {
68        self.resource = resource.into();
69    }
70}