opentelemetry_sdk/
runtime.rs

//! Provides an abstraction of several async runtimes.
//!
//! This allows OpenTelemetry to work with any current or future runtime. There are currently
//! built-in implementations for [Tokio] and [async-std].
//!
//! [Tokio]: https://crates.io/crates/tokio
//! [async-std]: https://crates.io/crates/async-std
8
9use futures_util::{future::BoxFuture, stream::Stream};
10use std::{fmt::Debug, future::Future, time::Duration};
11use thiserror::Error;
12
/// Abstraction over an async runtime such as [Tokio] or [async-std].
///
/// Implementing this trait is what lets OpenTelemetry run on top of any current (and hopefully
/// any future) runtime: it only asks the runtime for periodic ticks, one-shot delays, and the
/// ability to run a task in the background.
///
/// [Tokio]: https://crates.io/crates/tokio
/// [async-std]: https://crates.io/crates/async-std
#[cfg(feature = "experimental_async_runtime")]
pub trait Runtime: Send + Sync + Clone + 'static {
    /// Stream type produced by [`Runtime::interval`]. It yields one item per elapsed interval;
    /// the item type itself is irrelevant.
    type Interval: Stream + Send;

    /// Future type produced by [`Runtime::delay`]. It resolves once the requested time has
    /// passed; the output type itself is irrelevant.
    type Delay: Future + Send + Unpin;

    /// Build a [`Stream`] that yields a new item every `duration`.
    fn interval(&self, duration: Duration) -> Self::Interval;

    /// Build a future that resolves after `duration` has elapsed.
    fn delay(&self, duration: Duration) -> Self::Delay;

    /// Run `future` on a new task or thread.
    ///
    /// # Note
    ///
    /// The main use of this method is running batch span processing in the background. No
    /// handle is returned: OpenTelemetry waits for the future through other means when the
    /// TracerProvider is shut down, which at the moment means blocking the current thread.
    /// Implementations must therefore be able to keep driving `future` even while the main
    /// thread is blocked.
    fn spawn(&self, future: BoxFuture<'static, ()>);
}
46
/// [`Runtime`] implementation for Tokio's multi thread runtime.
///
/// This is a stateless marker type; cloning it is free.
#[cfg(all(feature = "experimental_async_runtime", feature = "rt-tokio"))]
#[cfg_attr(
    docsrs,
    doc(cfg(all(feature = "experimental_async_runtime", feature = "rt-tokio")))
)]
#[derive(Clone, Debug)]
pub struct Tokio;
55
#[cfg(all(feature = "experimental_async_runtime", feature = "rt-tokio"))]
#[cfg_attr(
    docsrs,
    doc(cfg(all(feature = "experimental_async_runtime", feature = "rt-tokio")))
)]
impl Runtime for Tokio {
    type Interval = tokio_stream::wrappers::IntervalStream;
    type Delay = ::std::pin::Pin<Box<tokio::time::Sleep>>;

    /// Delegates to the crate-local `tokio_interval_stream` helper.
    fn interval(&self, duration: Duration) -> Self::Interval {
        crate::util::tokio_interval_stream(duration)
    }

    /// Hands `future` to `tokio::spawn` and detaches it.
    fn spawn(&self, future: BoxFuture<'static, ()>) {
        // The join handle does not need to be awaited for the task to execute, so it is
        // discarded right away.
        drop(tokio::spawn(future));
    }

    /// Returns a heap-pinned `tokio::time::Sleep`; boxing is what makes it `Unpin`.
    fn delay(&self, duration: Duration) -> Self::Delay {
        let sleep = tokio::time::sleep(duration);
        Box::pin(sleep)
    }
}
79
/// [`Runtime`] implementation for Tokio's current thread runtime.
///
/// This is a stateless marker type; cloning it is free.
#[cfg(all(
    feature = "experimental_async_runtime",
    feature = "rt-tokio-current-thread"
))]
#[cfg_attr(
    docsrs,
    doc(cfg(all(
        feature = "experimental_async_runtime",
        feature = "rt-tokio-current-thread"
    )))
)]
#[derive(Clone, Debug)]
pub struct TokioCurrentThread;
94
#[cfg(all(
    feature = "experimental_async_runtime",
    feature = "rt-tokio-current-thread"
))]
#[cfg_attr(
    docsrs,
    doc(cfg(all(
        feature = "experimental_async_runtime",
        feature = "rt-tokio-current-thread"
    )))
)]
impl Runtime for TokioCurrentThread {
    type Interval = tokio_stream::wrappers::IntervalStream;
    type Delay = ::std::pin::Pin<Box<tokio::time::Sleep>>;

    /// Delegates to the crate-local `tokio_interval_stream` helper.
    fn interval(&self, duration: Duration) -> Self::Interval {
        crate::util::tokio_interval_stream(duration)
    }

    /// Runs `future` to completion on a dedicated OS thread that owns its own current-thread
    /// Tokio runtime. The thread is detached; no handle is returned.
    ///
    /// # Panics
    ///
    /// The spawned thread panics if the Tokio runtime cannot be built.
    fn spawn(&self, future: BoxFuture<'static, ()>) {
        // The background task cannot be driven by the caller's current-thread scheduler:
        // BatchSpanProcessor exports spans from a background task, while shutdown has to block
        // so the runtime does not finish and kill any remaining tasks. With only one thread
        // available to run tasks, blocking it would deadlock.
        //
        // Thus, the background task is spawned on a separate thread.
        std::thread::spawn(move || {
            let rt = tokio::runtime::Builder::new_current_thread()
                .enable_all()
                .build()
                .expect("failed to create Tokio current thread runtime for OpenTelemetry batch processing");
            rt.block_on(future);
        });
    }

    /// Returns a heap-pinned `tokio::time::Sleep`; boxing is what makes it `Unpin`.
    fn delay(&self, duration: Duration) -> Self::Delay {
        Box::pin(tokio::time::sleep(duration))
    }
}
134
/// [`Runtime`] implementation for async-std.
///
/// This is a stateless marker type; cloning it is free.
#[cfg(all(feature = "experimental_async_runtime", feature = "rt-async-std"))]
#[cfg_attr(
    docsrs,
    doc(cfg(all(feature = "experimental_async_runtime", feature = "rt-async-std")))
)]
#[derive(Clone, Debug)]
pub struct AsyncStd;
143
#[cfg(all(feature = "experimental_async_runtime", feature = "rt-async-std"))]
#[cfg_attr(
    docsrs,
    doc(cfg(all(feature = "experimental_async_runtime", feature = "rt-async-std")))
)]
impl Runtime for AsyncStd {
    type Interval = async_std::stream::Interval;
    type Delay = BoxFuture<'static, ()>;

    /// Returns async-std's own interval stream for `duration`.
    fn interval(&self, duration: Duration) -> Self::Interval {
        async_std::stream::interval(duration)
    }

    /// Hands `future` to `async_std::task::spawn` and discards the returned join handle.
    fn spawn(&self, future: BoxFuture<'static, ()>) {
        drop(async_std::task::spawn(future));
    }

    /// Returns a boxed `async_std::task::sleep` future for `duration`.
    fn delay(&self, duration: Duration) -> Self::Delay {
        let sleep = async_std::task::sleep(duration);
        Box::pin(sleep)
    }
}
166
/// Extension of [`Runtime`] that additionally supplies a message channel.
///
/// At present the channel is what the [log] and [span] batch processors use.
///
/// [log]: crate::logs::BatchLogProcessor
/// [span]: crate::trace::BatchSpanProcessor
#[cfg(feature = "experimental_async_runtime")]
pub trait RuntimeChannel: Runtime {
    /// Sending half of the channel; it can be sent across threads safely.
    type Sender<T: Debug + Send>: TrySend<Message = T> + Debug;
    /// Receiving half of the channel, exposed as a stream of batch messages.
    type Receiver<T: Debug + Send>: Stream<Item = T> + Send;

    /// Create the sender/receiver pair used to pass batch messages, with the given `capacity`.
    fn batch_message_channel<T: Debug + Send>(
        &self,
        capacity: usize,
    ) -> (Self::Sender<T>, Self::Receiver<T>);
}
185
/// Failure reported by a [`TrySend`] implementation.
#[cfg(feature = "experimental_async_runtime")]
#[derive(Error, Debug)]
pub enum TrySendError {
    /// The channel had no free capacity for the message.
    #[error("cannot send message to batch processor as the channel is full")]
    ChannelFull,
    /// The channel was already closed.
    #[error("cannot send message to batch processor as the channel is closed")]
    ChannelClosed,
    /// Any other send error that isn't covered by the variants above.
    #[error(transparent)]
    Other(#[from] Box<dyn std::error::Error + Send + Sync + 'static>),
}
200
/// Abstraction over a channel `Sender` that can send messages through a shared reference.
#[cfg(feature = "experimental_async_runtime")]
pub trait TrySend: Send + Sync {
    /// Type of the message carried by the channel.
    type Message;

    /// Attempt to send a message batch to a worker thread.
    ///
    /// # Errors
    ///
    /// Fails when the receiver has been closed or when the buffer is depleted.
    fn try_send(&self, item: Self::Message) -> Result<(), TrySendError>;
}
212
#[cfg(all(
    feature = "experimental_async_runtime",
    any(feature = "rt-tokio", feature = "rt-tokio-current-thread")
))]
impl<T: Send> TrySend for tokio::sync::mpsc::Sender<T> {
    type Message = T;

    /// Forwards to Tokio's inherent `Sender::try_send` and translates its error into
    /// [`TrySendError`]; the rejected message carried by the Tokio error is dropped.
    fn try_send(&self, item: Self::Message) -> Result<(), TrySendError> {
        use tokio::sync::mpsc::error::TrySendError as TokioTrySendError;

        // Fully qualified so it is obvious this is the inherent method, not a recursive call.
        match tokio::sync::mpsc::Sender::try_send(self, item) {
            Ok(()) => Ok(()),
            Err(TokioTrySendError::Full(_)) => Err(TrySendError::ChannelFull),
            Err(TokioTrySendError::Closed(_)) => Err(TrySendError::ChannelClosed),
        }
    }
}
227
#[cfg(all(feature = "experimental_async_runtime", feature = "rt-tokio"))]
#[cfg_attr(
    docsrs,
    doc(cfg(all(feature = "experimental_async_runtime", feature = "rt-tokio")))
)]
impl RuntimeChannel for Tokio {
    type Receiver<T: Debug + Send> = tokio_stream::wrappers::ReceiverStream<T>;
    type Sender<T: Debug + Send> = tokio::sync::mpsc::Sender<T>;

    /// Creates a bounded Tokio mpsc channel and wraps its receiving half in a `ReceiverStream`.
    ///
    /// NOTE(review): Tokio documents `mpsc::channel` as panicking on a zero capacity; confirm
    /// callers never pass `0`.
    fn batch_message_channel<T: Debug + Send>(
        &self,
        capacity: usize,
    ) -> (Self::Sender<T>, Self::Receiver<T>) {
        let (tx, rx) = tokio::sync::mpsc::channel(capacity);
        let rx_stream = tokio_stream::wrappers::ReceiverStream::new(rx);
        (tx, rx_stream)
    }
}
248
#[cfg(all(
    feature = "experimental_async_runtime",
    feature = "rt-tokio-current-thread"
))]
#[cfg_attr(
    docsrs,
    doc(cfg(all(
        feature = "experimental_async_runtime",
        feature = "rt-tokio-current-thread"
    )))
)]
impl RuntimeChannel for TokioCurrentThread {
    type Receiver<T: Debug + Send> = tokio_stream::wrappers::ReceiverStream<T>;
    type Sender<T: Debug + Send> = tokio::sync::mpsc::Sender<T>;

    /// Creates a bounded Tokio mpsc channel and wraps its receiving half in a `ReceiverStream`.
    ///
    /// NOTE(review): Tokio documents `mpsc::channel` as panicking on a zero capacity; confirm
    /// callers never pass `0`.
    fn batch_message_channel<T: Debug + Send>(
        &self,
        capacity: usize,
    ) -> (Self::Sender<T>, Self::Receiver<T>) {
        let (tx, rx) = tokio::sync::mpsc::channel(capacity);
        let rx_stream = tokio_stream::wrappers::ReceiverStream::new(rx);
        (tx, rx_stream)
    }
}
275
#[cfg(all(feature = "experimental_async_runtime", feature = "rt-async-std"))]
impl<T: Send> TrySend for async_std::channel::Sender<T> {
    type Message = T;

    /// Forwards to async-std's inherent `Sender::try_send` and translates its error into
    /// [`TrySendError`]; the rejected message carried by the async-std error is dropped.
    fn try_send(&self, item: Self::Message) -> Result<(), TrySendError> {
        use async_std::channel::TrySendError as AsyncStdTrySendError;

        // Fully qualified so it is obvious this is the inherent method, not a recursive call.
        match async_std::channel::Sender::try_send(self, item) {
            Ok(()) => Ok(()),
            Err(AsyncStdTrySendError::Full(_)) => Err(TrySendError::ChannelFull),
            Err(AsyncStdTrySendError::Closed(_)) => Err(TrySendError::ChannelClosed),
        }
    }
}
287
#[cfg(all(feature = "experimental_async_runtime", feature = "rt-async-std"))]
#[cfg_attr(
    docsrs,
    doc(cfg(all(feature = "experimental_async_runtime", feature = "rt-async-std")))
)]
impl RuntimeChannel for AsyncStd {
    type Receiver<T: Debug + Send> = async_std::channel::Receiver<T>;
    type Sender<T: Debug + Send> = async_std::channel::Sender<T>;

    /// Creates a bounded async-std channel and returns both halves unchanged.
    ///
    /// NOTE(review): async-std documents `channel::bounded` as panicking on a zero capacity;
    /// confirm callers never pass `0`.
    fn batch_message_channel<T: Debug + Send>(
        &self,
        capacity: usize,
    ) -> (Self::Sender<T>, Self::Receiver<T>) {
        let (tx, rx) = async_std::channel::bounded(capacity);
        (tx, rx)
    }
}
303}