opentelemetry/trace/
context.rs

1//! Context extensions for tracing
2use crate::{
3    global, otel_debug,
4    trace::{Span, SpanContext, Status},
5    Context, ContextGuard, KeyValue,
6};
7use futures_core::stream::Stream;
8use futures_sink::Sink;
9use pin_project_lite::pin_project;
10use std::{
11    borrow::Cow,
12    error::Error,
13    pin::Pin,
14    sync::Mutex,
15    task::{Context as TaskContext, Poll},
16};
17
18const NOOP_SPAN: SynchronizedSpan = SynchronizedSpan {
19    span_context: SpanContext::NONE,
20    inner: None,
21};
22
23/// A reference to the currently active span in this context.
24#[derive(Debug)]
25pub struct SpanRef<'a>(&'a SynchronizedSpan);
26
27#[derive(Debug)]
28pub(crate) struct SynchronizedSpan {
29    /// Immutable span context
30    span_context: SpanContext,
31    /// Mutable span inner that requires synchronization
32    inner: Option<Mutex<global::BoxedSpan>>,
33}
34
35impl SynchronizedSpan {
36    pub(crate) fn span_context(&self) -> &SpanContext {
37        &self.span_context
38    }
39}
40
41impl From<SpanContext> for SynchronizedSpan {
42    fn from(value: SpanContext) -> Self {
43        Self {
44            span_context: value,
45            inner: None,
46        }
47    }
48}
49
50impl<T: Span + Send + Sync + 'static> From<T> for SynchronizedSpan {
51    fn from(value: T) -> Self {
52        Self {
53            span_context: value.span_context().clone(),
54            inner: Some(Mutex::new(global::BoxedSpan::new(value))),
55        }
56    }
57}
58
59impl SpanRef<'_> {
60    fn with_inner_mut<F: FnOnce(&mut global::BoxedSpan)>(&self, f: F) {
61        if let Some(ref inner) = self.0.inner {
62            match inner.lock() {
63                Ok(mut locked) => f(&mut locked),
64                Err(err) => {
65                    otel_debug!(
66                        name: "SpanRef.LockFailed",
67                        message = "Failed to acquire lock for SpanRef: {:?}",
68                        reason = format!("{:?}", err),
69                        span_context = format!("{:?}", self.0.span_context));
70                }
71            }
72        }
73    }
74}
75
76impl SpanRef<'_> {
77    /// Record an event in the context this span.
78    ///
79    /// Note that the OpenTelemetry project documents certain "[standard
80    /// attributes]" that have prescribed semantic meanings and are available via
81    /// the [opentelemetry_semantic_conventions] crate.
82    ///
83    /// [standard attributes]: https://github.com/open-telemetry/opentelemetry-specification/blob/v1.9.0/specification/trace/semantic_conventions/README.md
84    /// [opentelemetry_semantic_conventions]: https://docs.rs/opentelemetry-semantic-conventions
85    pub fn add_event<T>(&self, name: T, attributes: Vec<KeyValue>)
86    where
87        T: Into<Cow<'static, str>>,
88    {
89        self.with_inner_mut(|inner| inner.add_event(name, attributes))
90    }
91
92    /// Record an error as an event for this span.
93    ///
94    /// An additional call to [Span::set_status] is required if the status of the
95    /// span should be set to error, as this method does not change the span status.
96    ///
97    /// If this span is not being recorded then this method does nothing.
98    pub fn record_error(&self, err: &dyn Error) {
99        self.with_inner_mut(|inner| inner.record_error(err))
100    }
101
102    /// Record an event with a timestamp in the context this span.
103    ///
104    /// Note that the OpenTelemetry project documents certain "[standard
105    /// attributes]" that have prescribed semantic meanings and are available via
106    /// the [opentelemetry_semantic_conventions] crate.
107    ///
108    /// [standard attributes]: https://github.com/open-telemetry/opentelemetry-specification/blob/v1.9.0/specification/trace/semantic_conventions/README.md
109    /// [opentelemetry_semantic_conventions]: https://docs.rs/opentelemetry-semantic-conventions
110    pub fn add_event_with_timestamp<T>(
111        &self,
112        name: T,
113        timestamp: std::time::SystemTime,
114        attributes: Vec<crate::KeyValue>,
115    ) where
116        T: Into<Cow<'static, str>>,
117    {
118        self.with_inner_mut(move |inner| {
119            inner.add_event_with_timestamp(name, timestamp, attributes)
120        })
121    }
122
123    /// A reference to the [`SpanContext`] for this span.
124    pub fn span_context(&self) -> &SpanContext {
125        &self.0.span_context
126    }
127
128    /// Returns `true` if this span is recording information.
129    ///
130    /// Spans will not be recording information after they have ended.
131    ///
132    /// This flag may be `true` despite the entire trace being sampled out. This
133    /// allows recording and processing of information about the individual
134    /// spans without sending it to the backend. An example of this scenario may
135    /// be recording and processing of all incoming requests for the processing
136    /// and building of SLA/SLO latency charts while sending only a subset -
137    /// sampled spans - to the backend.
138    pub fn is_recording(&self) -> bool {
139        self.0
140            .inner
141            .as_ref()
142            .and_then(|inner| inner.lock().ok().map(|active| active.is_recording()))
143            .unwrap_or(false)
144    }
145
146    /// Set an attribute of this span.
147    ///
148    /// Setting an attribute with the same key as an existing attribute
149    /// generally overwrites the existing attribute's value.
150    ///
151    /// Note that the OpenTelemetry project documents certain "[standard
152    /// attributes]" that have prescribed semantic meanings and are available via
153    /// the [opentelemetry_semantic_conventions] crate.
154    ///
155    /// [standard attributes]: https://github.com/open-telemetry/opentelemetry-specification/blob/v1.9.0/specification/trace/semantic_conventions/README.md
156    /// [opentelemetry_semantic_conventions]: https://docs.rs/opentelemetry-semantic-conventions
157    pub fn set_attribute(&self, attribute: crate::KeyValue) {
158        self.with_inner_mut(move |inner| inner.set_attribute(attribute))
159    }
160
161    /// Set multiple attributes of this span.
162    ///
163    /// Setting an attribute with the same key as an existing attribute
164    /// generally overwrites the existing attribute's value.
165    ///
166    /// Note that the OpenTelemetry project documents certain "[standard
167    /// attributes]" that have prescribed semantic meanings and are available via
168    /// the [opentelemetry_semantic_conventions] crate.
169    ///
170    /// [standard attributes]: https://github.com/open-telemetry/opentelemetry-specification/blob/v1.9.0/specification/trace/semantic_conventions/README.md
171    /// [opentelemetry_semantic_conventions]: https://docs.rs/opentelemetry-semantic-conventions
172    pub fn set_attributes(&self, attributes: impl IntoIterator<Item = KeyValue>) {
173        self.with_inner_mut(move |inner| inner.set_attributes(attributes))
174    }
175
176    /// Sets the status of this `Span`.
177    ///
178    /// If used, this will override the default span status, which is [`Status::Unset`].
179    pub fn set_status(&self, status: Status) {
180        self.with_inner_mut(move |inner| inner.set_status(status))
181    }
182
183    /// Updates the span's name.
184    ///
185    /// After this update, any sampling behavior based on the name will depend on
186    /// the implementation.
187    pub fn update_name<T>(&self, new_name: T)
188    where
189        T: Into<Cow<'static, str>>,
190    {
191        self.with_inner_mut(move |inner| inner.update_name(new_name))
192    }
193
194    /// Signals that the operation described by this span has now ended.
195    pub fn end(&self) {
196        self.end_with_timestamp(crate::time::now());
197    }
198
199    /// Signals that the operation described by this span ended at the given time.
200    pub fn end_with_timestamp(&self, timestamp: std::time::SystemTime) {
201        self.with_inner_mut(move |inner| inner.end_with_timestamp(timestamp))
202    }
203}
204
205/// Methods for storing and retrieving trace data in a [`Context`].
206///
207/// See [`Context`] for examples of setting and retrieving the current context.
208pub trait TraceContextExt {
209    /// Returns a clone of the current context with the included [`Span`].
210    ///
211    /// # Examples
212    ///
213    /// ```
214    /// use opentelemetry::{global, trace::{TraceContextExt, Tracer}, Context};
215    ///
216    /// let tracer = global::tracer("example");
217    ///
218    /// // build a span
219    /// let span = tracer.start("parent_span");
220    ///
221    /// // create a new context from the currently active context that includes this span
222    /// let cx = Context::current_with_span(span);
223    ///
224    /// // create a child span by explicitly specifying the parent context
225    /// let child = tracer.start_with_context("child_span", &cx);
226    /// # drop(child)
227    /// ```
228    fn current_with_span<T: crate::trace::Span + Send + Sync + 'static>(span: T) -> Self;
229
230    /// Returns a clone of this context with the included span.
231    ///
232    /// # Examples
233    ///
234    /// ```
235    /// use opentelemetry::{global, trace::{TraceContextExt, Tracer}, Context};
236    ///
237    /// fn fn_with_passed_in_context(cx: &Context) {
238    ///     let tracer = global::tracer("example");
239    ///
240    ///     // build a span
241    ///     let span = tracer.start("parent_span");
242    ///
243    ///     // create a new context from the given context that includes the span
244    ///     let cx_with_parent = cx.with_span(span);
245    ///
246    ///     // create a child span by explicitly specifying the parent context
247    ///     let child = tracer.start_with_context("child_span", &cx_with_parent);
248    ///     # drop(child)
249    /// }
250    ///
251    fn with_span<T: crate::trace::Span + Send + Sync + 'static>(&self, span: T) -> Self;
252
253    /// Returns a reference to this context's span, or the default no-op span if
254    /// none has been set.
255    ///
256    /// # Examples
257    ///
258    /// ```
259    /// use opentelemetry::{trace::TraceContextExt, Context};
260    ///
261    /// // Add an event to the currently active span
262    /// Context::map_current(|cx| cx.span().add_event("An event!", vec![]));
263    /// ```
264    fn span(&self) -> SpanRef<'_>;
265
266    /// Returns whether or not an active span has been set.
267    ///
268    /// # Examples
269    ///
270    /// ```
271    /// use opentelemetry::{trace::TraceContextExt, Context};
272    ///
273    /// assert!(!Context::map_current(|cx| cx.has_active_span()));
274    /// ```
275    fn has_active_span(&self) -> bool;
276
277    /// Returns a copy of this context with the span context included.
278    ///
279    /// This is useful for building propagators.
280    fn with_remote_span_context(&self, span_context: crate::trace::SpanContext) -> Self;
281}
282
283impl TraceContextExt for Context {
284    fn current_with_span<T: crate::trace::Span + Send + Sync + 'static>(span: T) -> Self {
285        Context::current_with_synchronized_span(span.into())
286    }
287
288    fn with_span<T: crate::trace::Span + Send + Sync + 'static>(&self, span: T) -> Self {
289        self.with_synchronized_span(span.into())
290    }
291
292    fn span(&self) -> SpanRef<'_> {
293        if let Some(span) = self.span.as_ref() {
294            SpanRef(span)
295        } else {
296            SpanRef(&NOOP_SPAN)
297        }
298    }
299
300    fn has_active_span(&self) -> bool {
301        self.span.is_some()
302    }
303
304    fn with_remote_span_context(&self, span_context: crate::trace::SpanContext) -> Self {
305        self.with_synchronized_span(span_context.into())
306    }
307}
308
309/// Mark a given `Span` as active.
310///
311/// The `Tracer` MUST provide a way to update its active `Span`, and MAY provide convenience
312/// methods to manage a `Span`'s lifetime and the scope in which a `Span` is active. When an
313/// active `Span` is made inactive, the previously-active `Span` SHOULD be made active. A `Span`
314/// maybe finished (i.e. have a non-null end time) but still be active. A `Span` may be active
315/// on one thread after it has been made inactive on another.
316///
317/// # Examples
318///
319/// ```
320/// use opentelemetry::{global, trace::{Span, Tracer}, KeyValue};
321/// use opentelemetry::trace::{get_active_span, mark_span_as_active};
322///
323/// fn my_function() {
324///     let tracer = global::tracer("my-component-a");
325///     // start an active span in one function
326///     let span = tracer.start("span-name");
327///     let _guard = mark_span_as_active(span);
328///     // anything happening in functions we call can still access the active span...
329///     my_other_function();
330/// }
331///
332/// fn my_other_function() {
333///     // call methods on the current span from
334///     get_active_span(|span| {
335///         span.add_event("An event!".to_string(), vec![KeyValue::new("happened", true)]);
336///     });
337/// }
338/// ```
339#[must_use = "Dropping the guard detaches the context."]
340pub fn mark_span_as_active<T: crate::trace::Span + Send + Sync + 'static>(span: T) -> ContextGuard {
341    let cx = Context::current_with_span(span);
342    cx.attach()
343}
344
345/// Executes a closure with a reference to this thread's current span.
346///
347/// # Examples
348///
349/// ```
350/// use opentelemetry::{global, trace::{Span, Tracer}, KeyValue};
351/// use opentelemetry::trace::get_active_span;
352///
353/// fn my_function() {
354///     // start an active span in one function
355///     global::tracer("my-component").in_span("span-name", |_cx| {
356///         // anything happening in functions we call can still access the active span...
357///         my_other_function();
358///     })
359/// }
360///
361/// fn my_other_function() {
362///     // call methods on the current span from
363///     get_active_span(|span| {
364///         span.add_event("An event!", vec![KeyValue::new("happened", true)]);
365///     })
366/// }
367/// ```
368pub fn get_active_span<F, T>(f: F) -> T
369where
370    F: FnOnce(SpanRef<'_>) -> T,
371{
372    Context::map_current(|cx| f(cx.span()))
373}
374
375pin_project! {
376    /// A future, stream, or sink that has an associated context.
377    #[derive(Clone, Debug)]
378    pub struct WithContext<T> {
379        #[pin]
380        inner: T,
381        otel_cx: Context,
382    }
383}
384
385impl<T: Sized> FutureExt for T {}
386
387impl<T: std::future::Future> std::future::Future for WithContext<T> {
388    type Output = T::Output;
389
390    fn poll(self: Pin<&mut Self>, task_cx: &mut TaskContext<'_>) -> Poll<Self::Output> {
391        let this = self.project();
392        let _guard = this.otel_cx.clone().attach();
393
394        this.inner.poll(task_cx)
395    }
396}
397
398impl<T: Stream> Stream for WithContext<T> {
399    type Item = T::Item;
400
401    fn poll_next(self: Pin<&mut Self>, task_cx: &mut TaskContext<'_>) -> Poll<Option<Self::Item>> {
402        let this = self.project();
403        let _guard = this.otel_cx.clone().attach();
404        T::poll_next(this.inner, task_cx)
405    }
406}
407
408impl<I, T: Sink<I>> Sink<I> for WithContext<T>
409where
410    T: Sink<I>,
411{
412    type Error = T::Error;
413
414    fn poll_ready(
415        self: Pin<&mut Self>,
416        task_cx: &mut TaskContext<'_>,
417    ) -> Poll<Result<(), Self::Error>> {
418        let this = self.project();
419        let _guard = this.otel_cx.clone().attach();
420        T::poll_ready(this.inner, task_cx)
421    }
422
423    fn start_send(self: Pin<&mut Self>, item: I) -> Result<(), Self::Error> {
424        let this = self.project();
425        let _guard = this.otel_cx.clone().attach();
426        T::start_send(this.inner, item)
427    }
428
429    fn poll_flush(
430        self: Pin<&mut Self>,
431        task_cx: &mut TaskContext<'_>,
432    ) -> Poll<Result<(), Self::Error>> {
433        let this = self.project();
434        let _guard = this.otel_cx.clone().attach();
435        T::poll_flush(this.inner, task_cx)
436    }
437
438    fn poll_close(
439        self: Pin<&mut Self>,
440        task_cx: &mut TaskContext<'_>,
441    ) -> Poll<Result<(), Self::Error>> {
442        let this = self.project();
443        let _enter = this.otel_cx.clone().attach();
444        T::poll_close(this.inner, task_cx)
445    }
446}
447
448/// Extension trait allowing futures, streams, and sinks to be traced with a span.
449pub trait FutureExt: Sized {
450    /// Attaches the provided [`Context`] to this type, returning a `WithContext`
451    /// wrapper.
452    ///
453    /// When the wrapped type is a future, stream, or sink, the attached context
454    /// will be set as current while it is being polled.
455    ///
456    /// [`Context`]: crate::Context
457    fn with_context(self, otel_cx: Context) -> WithContext<Self> {
458        WithContext {
459            inner: self,
460            otel_cx,
461        }
462    }
463
464    /// Attaches the current [`Context`] to this type, returning a `WithContext`
465    /// wrapper.
466    ///
467    /// When the wrapped type is a future, stream, or sink, the attached context
468    /// will be set as the default while it is being polled.
469    ///
470    /// [`Context`]: crate::Context
471    fn with_current_context(self) -> WithContext<Self> {
472        let otel_cx = Context::current();
473        self.with_context(otel_cx)
474    }
475}