opentelemetry_sdk/
runtime.rs1use futures_util::{future::BoxFuture, stream::Stream};
10use std::{fmt::Debug, future::Future, time::Duration};
11use thiserror::Error;
12
/// Abstraction over an async executor.
///
/// It gives the SDK one uniform way to obtain a stream tied to a duration,
/// a one-shot delay future, and a fire-and-forget task spawner.
///
/// Implementors must be `Clone + Send + Sync + 'static`.
#[cfg(feature = "experimental_async_runtime")]
pub trait Runtime: Clone + Send + Sync + 'static {
    /// Stream type handed out by [`Runtime::interval`].
    ///
    /// No bound is placed on the stream's item type.
    type Interval: Stream + Send;

    /// Future type handed out by [`Runtime::delay`].
    ///
    /// No bound is placed on the future's output type.
    type Delay: Future + Send + Unpin;

    /// Creates a stream parameterised by `duration`.
    ///
    /// NOTE(review): presumably the stream yields one item per `duration`;
    /// the exact tick semantics are up to each implementation.
    fn interval(&self, duration: Duration) -> Self::Interval;

    /// Creates a future parameterised by `duration`.
    ///
    /// NOTE(review): presumably it resolves once `duration` has elapsed;
    /// confirm against each implementation.
    fn delay(&self, duration: Duration) -> Self::Delay;

    /// Hands `future` over to the runtime for execution.
    ///
    /// Nothing is returned, so the caller cannot join or cancel the spawned
    /// work through this API.
    fn spawn(&self, future: BoxFuture<'static, ()>);
}
46
/// Unit marker type for the Tokio-backed runtime.
///
/// Compiled only when both the `experimental_async_runtime` and `rt-tokio`
/// features are enabled.
#[cfg(all(feature = "experimental_async_runtime", feature = "rt-tokio"))]
#[cfg_attr(
    docsrs,
    doc(cfg(all(feature = "experimental_async_runtime", feature = "rt-tokio")))
)]
#[derive(Clone, Debug)]
pub struct Tokio;
55
/// [`Runtime`] implementation that delegates to the ambient Tokio runtime.
#[cfg(all(feature = "experimental_async_runtime", feature = "rt-tokio"))]
#[cfg_attr(
    docsrs,
    doc(cfg(all(feature = "experimental_async_runtime", feature = "rt-tokio")))
)]
impl Runtime for Tokio {
    type Interval = tokio_stream::wrappers::IntervalStream;
    type Delay = ::std::pin::Pin<Box<tokio::time::Sleep>>;

    /// Builds the interval stream through the crate-internal Tokio helper.
    fn interval(&self, duration: Duration) -> Self::Interval {
        crate::util::tokio_interval_stream(duration)
    }

    /// Spawns `future` with `tokio::spawn` and discards the join handle.
    fn spawn(&self, future: BoxFuture<'static, ()>) {
        // The handle is released without being awaited. Per Tokio's
        // `JoinHandle` documentation, dropping it detaches the task rather
        // than cancelling it.
        drop(tokio::spawn(future));
    }

    /// Returns a pinned, boxed `tokio::time::sleep` for `duration`.
    fn delay(&self, duration: Duration) -> Self::Delay {
        let sleep = tokio::time::sleep(duration);
        Box::pin(sleep)
    }
}
79
/// Unit marker type for the Tokio current-thread runtime flavour.
///
/// Compiled only when both the `experimental_async_runtime` and
/// `rt-tokio-current-thread` features are enabled.
#[cfg(all(
    feature = "experimental_async_runtime",
    feature = "rt-tokio-current-thread"
))]
#[cfg_attr(
    docsrs,
    doc(cfg(all(
        feature = "experimental_async_runtime",
        feature = "rt-tokio-current-thread"
    )))
)]
#[derive(Clone, Debug)]
pub struct TokioCurrentThread;
94
/// [`Runtime`] implementation for applications that run a single-threaded
/// (current-thread) Tokio scheduler.
#[cfg(all(
    feature = "experimental_async_runtime",
    feature = "rt-tokio-current-thread"
))]
#[cfg_attr(
    docsrs,
    doc(cfg(all(
        feature = "experimental_async_runtime",
        feature = "rt-tokio-current-thread"
    )))
)]
impl Runtime for TokioCurrentThread {
    type Interval = tokio_stream::wrappers::IntervalStream;
    type Delay = ::std::pin::Pin<Box<tokio::time::Sleep>>;

    /// Builds the interval stream through the crate-internal Tokio helper.
    fn interval(&self, duration: Duration) -> Self::Interval {
        crate::util::tokio_interval_stream(duration)
    }

    /// Runs `future` to completion on a freshly spawned OS thread that owns
    /// its own current-thread Tokio runtime.
    ///
    /// The thread's join handle is discarded, so the thread is detached.
    ///
    /// # Panics
    ///
    /// The spawned thread panics if the dedicated Tokio runtime cannot be
    /// built.
    fn spawn(&self, future: BoxFuture<'static, ()>) {
        // NOTE(review): presumably a dedicated thread and runtime are used so
        // the background task keeps making progress even while the caller's
        // single-threaded scheduler is blocked (e.g. during shutdown); confirm
        // against the batch processor's shutdown path.
        std::thread::spawn(move || {
            let rt = tokio::runtime::Builder::new_current_thread()
                .enable_all()
                .build()
                .expect("failed to create Tokio current thread runtime for OpenTelemetry batch processing");
            rt.block_on(future);
        });
    }

    /// Returns a pinned, boxed `tokio::time::sleep` for `duration`.
    fn delay(&self, duration: Duration) -> Self::Delay {
        Box::pin(tokio::time::sleep(duration))
    }
}
134
/// Unit marker type for the async-std-backed runtime.
///
/// Compiled only when both the `experimental_async_runtime` and
/// `rt-async-std` features are enabled.
#[cfg(all(feature = "experimental_async_runtime", feature = "rt-async-std"))]
#[cfg_attr(
    docsrs,
    doc(cfg(all(feature = "experimental_async_runtime", feature = "rt-async-std")))
)]
#[derive(Clone, Debug)]
pub struct AsyncStd;
143
/// [`Runtime`] implementation that delegates to async-std.
#[cfg(all(feature = "experimental_async_runtime", feature = "rt-async-std"))]
#[cfg_attr(
    docsrs,
    doc(cfg(all(feature = "experimental_async_runtime", feature = "rt-async-std")))
)]
impl Runtime for AsyncStd {
    type Interval = async_std::stream::Interval;
    type Delay = BoxFuture<'static, ()>;

    /// Returns `async_std::stream::interval` for `duration`.
    fn interval(&self, duration: Duration) -> Self::Interval {
        async_std::stream::interval(duration)
    }

    /// Spawns `future` with `async_std::task::spawn` and discards the handle.
    fn spawn(&self, future: BoxFuture<'static, ()>) {
        // The handle is released without being awaited. Per async-std's
        // `JoinHandle` documentation, the task keeps running in the
        // background after the handle is dropped.
        drop(async_std::task::spawn(future));
    }

    /// Returns a boxed `async_std::task::sleep` future for `duration`.
    fn delay(&self, duration: Duration) -> Self::Delay {
        let sleep = async_std::task::sleep(duration);
        Box::pin(sleep)
    }
}
166
/// Extension of [`Runtime`] that can also create a message channel made of a
/// sender half and a stream-based receiver half.
#[cfg(feature = "experimental_async_runtime")]
pub trait RuntimeChannel: Runtime {
    /// Receiving half of the channel: a `Send` stream yielding `T` values.
    type Receiver<T: Debug + Send>: Stream<Item = T> + Send;

    /// Sending half of the channel: a [`TrySend`] whose message type is `T`.
    type Sender<T: Debug + Send>: TrySend<Message = T> + Debug;

    /// Creates a channel for messages of type `T`, returned as
    /// `(sender, receiver)`.
    ///
    /// `capacity` is forwarded to the implementation's channel constructor.
    ///
    /// NOTE(review): presumably it bounds the number of buffered messages;
    /// confirm per implementation.
    fn batch_message_channel<T: Debug + Send>(
        &self,
        capacity: usize,
    ) -> (Self::Sender<T>, Self::Receiver<T>);
}
185
/// Reasons a [`TrySend::try_send`] call can fail.
#[cfg(feature = "experimental_async_runtime")]
#[derive(Debug, Error)]
pub enum TrySendError {
    /// The channel had no free capacity for the message.
    #[error("cannot send message to batch processor as the channel is full")]
    ChannelFull,
    /// The channel has been closed.
    #[error("cannot send message to batch processor as the channel is closed")]
    ChannelClosed,
    /// Any other failure, wrapping the underlying error.
    ///
    /// Its `Display` output and `source` are forwarded from the wrapped error
    /// (`transparent`).
    #[error(transparent)]
    Other(#[from] Box<dyn std::error::Error + Send + Sync + 'static>),
}
200
/// Sending half of a channel that reports a failed send as an error value.
#[cfg(feature = "experimental_async_runtime")]
pub trait TrySend: Sync + Send {
    /// Type of the values carried by the channel.
    type Message;

    /// Attempts to send `item` through the channel.
    ///
    /// # Errors
    ///
    /// Returns a [`TrySendError`] describing why the message could not be
    /// sent.
    fn try_send(&self, item: Self::Message) -> Result<(), TrySendError>;
}
212
/// Adapts Tokio's bounded mpsc sender to the crate's [`TrySend`] trait.
#[cfg(all(
    feature = "experimental_async_runtime",
    any(feature = "rt-tokio", feature = "rt-tokio-current-thread")
))]
impl<T: Send> TrySend for tokio::sync::mpsc::Sender<T> {
    type Message = T;

    /// Forwards to Tokio's own `Sender::try_send` and maps its error onto
    /// [`TrySendError`]; the rejected message is discarded.
    fn try_send(&self, item: Self::Message) -> Result<(), TrySendError> {
        use tokio::sync::mpsc::error::TrySendError as TokioTrySendError;

        // Method-call syntax picks Tokio's inherent `try_send` ahead of this
        // trait method, so this call does not recurse.
        match self.try_send(item) {
            Ok(()) => Ok(()),
            Err(TokioTrySendError::Full(_)) => Err(TrySendError::ChannelFull),
            Err(TokioTrySendError::Closed(_)) => Err(TrySendError::ChannelClosed),
        }
    }
}
227
/// Channel support for [`Tokio`], built on `tokio::sync::mpsc`.
#[cfg(all(feature = "experimental_async_runtime", feature = "rt-tokio"))]
#[cfg_attr(
    docsrs,
    doc(cfg(all(feature = "experimental_async_runtime", feature = "rt-tokio")))
)]
impl RuntimeChannel for Tokio {
    type Receiver<T: Debug + Send> = tokio_stream::wrappers::ReceiverStream<T>;
    type Sender<T: Debug + Send> = tokio::sync::mpsc::Sender<T>;

    /// Creates a Tokio mpsc channel with the given `capacity` and wraps the
    /// receiving half in a `ReceiverStream`.
    ///
    /// # Panics
    ///
    /// Per Tokio's documentation, `mpsc::channel` panics when `capacity` is 0.
    fn batch_message_channel<T: Debug + Send>(
        &self,
        capacity: usize,
    ) -> (Self::Sender<T>, Self::Receiver<T>) {
        let (tx, rx) = tokio::sync::mpsc::channel::<T>(capacity);
        let rx_stream = tokio_stream::wrappers::ReceiverStream::new(rx);
        (tx, rx_stream)
    }
}
248
/// Channel support for [`TokioCurrentThread`], built on `tokio::sync::mpsc`.
#[cfg(all(
    feature = "experimental_async_runtime",
    feature = "rt-tokio-current-thread"
))]
#[cfg_attr(
    docsrs,
    doc(cfg(all(
        feature = "experimental_async_runtime",
        feature = "rt-tokio-current-thread"
    )))
)]
impl RuntimeChannel for TokioCurrentThread {
    type Receiver<T: Debug + Send> = tokio_stream::wrappers::ReceiverStream<T>;
    type Sender<T: Debug + Send> = tokio::sync::mpsc::Sender<T>;

    /// Creates a Tokio mpsc channel with the given `capacity` and wraps the
    /// receiving half in a `ReceiverStream`.
    ///
    /// # Panics
    ///
    /// Per Tokio's documentation, `mpsc::channel` panics when `capacity` is 0.
    fn batch_message_channel<T: Debug + Send>(
        &self,
        capacity: usize,
    ) -> (Self::Sender<T>, Self::Receiver<T>) {
        let (tx, rx) = tokio::sync::mpsc::channel::<T>(capacity);
        let rx_stream = tokio_stream::wrappers::ReceiverStream::new(rx);
        (tx, rx_stream)
    }
}
275
/// Adapts async-std's channel sender to the crate's [`TrySend`] trait.
#[cfg(all(feature = "experimental_async_runtime", feature = "rt-async-std"))]
impl<T: Send> TrySend for async_std::channel::Sender<T> {
    type Message = T;

    /// Forwards to async-std's own `Sender::try_send` and maps its error onto
    /// [`TrySendError`]; the rejected message is discarded.
    fn try_send(&self, item: Self::Message) -> Result<(), TrySendError> {
        use async_std::channel::TrySendError as AsyncStdTrySendError;

        // Method-call syntax picks the sender's inherent `try_send` ahead of
        // this trait method, so this call does not recurse.
        match self.try_send(item) {
            Ok(()) => Ok(()),
            Err(AsyncStdTrySendError::Full(_)) => Err(TrySendError::ChannelFull),
            Err(AsyncStdTrySendError::Closed(_)) => Err(TrySendError::ChannelClosed),
        }
    }
}
287
/// Channel support for [`AsyncStd`], built on `async_std::channel`.
#[cfg(all(feature = "experimental_async_runtime", feature = "rt-async-std"))]
#[cfg_attr(
    docsrs,
    doc(cfg(all(feature = "experimental_async_runtime", feature = "rt-async-std")))
)]
impl RuntimeChannel for AsyncStd {
    type Receiver<T: Debug + Send> = async_std::channel::Receiver<T>;
    type Sender<T: Debug + Send> = async_std::channel::Sender<T>;

    /// Creates a bounded async-std channel with the given `capacity`.
    ///
    /// NOTE(review): the underlying `bounded` constructor is expected to panic
    /// on a zero `capacity`; verify against the async-std version in use.
    fn batch_message_channel<T: Debug + Send>(
        &self,
        capacity: usize,
    ) -> (Self::Sender<T>, Self::Receiver<T>) {
        let (tx, rx) = async_std::channel::bounded::<T>(capacity);
        (tx, rx)
    }
}