opentelemetry/trace/context.rs
1//! Context extensions for tracing
2use crate::{
3 global, otel_debug,
4 trace::{Span, SpanContext, Status},
5 Context, ContextGuard, KeyValue,
6};
7use futures_core::stream::Stream;
8use futures_sink::Sink;
9use pin_project_lite::pin_project;
10use std::{
11 borrow::Cow,
12 error::Error,
13 pin::Pin,
14 sync::Mutex,
15 task::{Context as TaskContext, Poll},
16};
17
18const NOOP_SPAN: SynchronizedSpan = SynchronizedSpan {
19 span_context: SpanContext::NONE,
20 inner: None,
21};
22
23/// A reference to the currently active span in this context.
24#[derive(Debug)]
25pub struct SpanRef<'a>(&'a SynchronizedSpan);
26
27#[derive(Debug)]
28pub(crate) struct SynchronizedSpan {
29 /// Immutable span context
30 span_context: SpanContext,
31 /// Mutable span inner that requires synchronization
32 inner: Option<Mutex<global::BoxedSpan>>,
33}
34
35impl SynchronizedSpan {
36 pub(crate) fn span_context(&self) -> &SpanContext {
37 &self.span_context
38 }
39}
40
41impl From<SpanContext> for SynchronizedSpan {
42 fn from(value: SpanContext) -> Self {
43 Self {
44 span_context: value,
45 inner: None,
46 }
47 }
48}
49
50impl<T: Span + Send + Sync + 'static> From<T> for SynchronizedSpan {
51 fn from(value: T) -> Self {
52 Self {
53 span_context: value.span_context().clone(),
54 inner: Some(Mutex::new(global::BoxedSpan::new(value))),
55 }
56 }
57}
58
59impl SpanRef<'_> {
60 fn with_inner_mut<F: FnOnce(&mut global::BoxedSpan)>(&self, f: F) {
61 if let Some(ref inner) = self.0.inner {
62 match inner.lock() {
63 Ok(mut locked) => f(&mut locked),
64 Err(err) => {
65 otel_debug!(
66 name: "SpanRef.LockFailed",
67 message = "Failed to acquire lock for SpanRef: {:?}",
68 reason = format!("{:?}", err),
69 span_context = format!("{:?}", self.0.span_context));
70 }
71 }
72 }
73 }
74}
75
76impl SpanRef<'_> {
77 /// Record an event in the context this span.
78 ///
79 /// Note that the OpenTelemetry project documents certain "[standard
80 /// attributes]" that have prescribed semantic meanings and are available via
81 /// the [opentelemetry_semantic_conventions] crate.
82 ///
83 /// [standard attributes]: https://github.com/open-telemetry/opentelemetry-specification/blob/v1.9.0/specification/trace/semantic_conventions/README.md
84 /// [opentelemetry_semantic_conventions]: https://docs.rs/opentelemetry-semantic-conventions
85 pub fn add_event<T>(&self, name: T, attributes: Vec<KeyValue>)
86 where
87 T: Into<Cow<'static, str>>,
88 {
89 self.with_inner_mut(|inner| inner.add_event(name, attributes))
90 }
91
92 /// Record an error as an event for this span.
93 ///
94 /// An additional call to [Span::set_status] is required if the status of the
95 /// span should be set to error, as this method does not change the span status.
96 ///
97 /// If this span is not being recorded then this method does nothing.
98 pub fn record_error(&self, err: &dyn Error) {
99 self.with_inner_mut(|inner| inner.record_error(err))
100 }
101
102 /// Record an event with a timestamp in the context this span.
103 ///
104 /// Note that the OpenTelemetry project documents certain "[standard
105 /// attributes]" that have prescribed semantic meanings and are available via
106 /// the [opentelemetry_semantic_conventions] crate.
107 ///
108 /// [standard attributes]: https://github.com/open-telemetry/opentelemetry-specification/blob/v1.9.0/specification/trace/semantic_conventions/README.md
109 /// [opentelemetry_semantic_conventions]: https://docs.rs/opentelemetry-semantic-conventions
110 pub fn add_event_with_timestamp<T>(
111 &self,
112 name: T,
113 timestamp: std::time::SystemTime,
114 attributes: Vec<crate::KeyValue>,
115 ) where
116 T: Into<Cow<'static, str>>,
117 {
118 self.with_inner_mut(move |inner| {
119 inner.add_event_with_timestamp(name, timestamp, attributes)
120 })
121 }
122
123 /// A reference to the [`SpanContext`] for this span.
124 pub fn span_context(&self) -> &SpanContext {
125 &self.0.span_context
126 }
127
128 /// Returns `true` if this span is recording information.
129 ///
130 /// Spans will not be recording information after they have ended.
131 ///
132 /// This flag may be `true` despite the entire trace being sampled out. This
133 /// allows recording and processing of information about the individual
134 /// spans without sending it to the backend. An example of this scenario may
135 /// be recording and processing of all incoming requests for the processing
136 /// and building of SLA/SLO latency charts while sending only a subset -
137 /// sampled spans - to the backend.
138 pub fn is_recording(&self) -> bool {
139 self.0
140 .inner
141 .as_ref()
142 .and_then(|inner| inner.lock().ok().map(|active| active.is_recording()))
143 .unwrap_or(false)
144 }
145
146 /// Set an attribute of this span.
147 ///
148 /// Setting an attribute with the same key as an existing attribute
149 /// generally overwrites the existing attribute's value.
150 ///
151 /// Note that the OpenTelemetry project documents certain "[standard
152 /// attributes]" that have prescribed semantic meanings and are available via
153 /// the [opentelemetry_semantic_conventions] crate.
154 ///
155 /// [standard attributes]: https://github.com/open-telemetry/opentelemetry-specification/blob/v1.9.0/specification/trace/semantic_conventions/README.md
156 /// [opentelemetry_semantic_conventions]: https://docs.rs/opentelemetry-semantic-conventions
157 pub fn set_attribute(&self, attribute: crate::KeyValue) {
158 self.with_inner_mut(move |inner| inner.set_attribute(attribute))
159 }
160
161 /// Set multiple attributes of this span.
162 ///
163 /// Setting an attribute with the same key as an existing attribute
164 /// generally overwrites the existing attribute's value.
165 ///
166 /// Note that the OpenTelemetry project documents certain "[standard
167 /// attributes]" that have prescribed semantic meanings and are available via
168 /// the [opentelemetry_semantic_conventions] crate.
169 ///
170 /// [standard attributes]: https://github.com/open-telemetry/opentelemetry-specification/blob/v1.9.0/specification/trace/semantic_conventions/README.md
171 /// [opentelemetry_semantic_conventions]: https://docs.rs/opentelemetry-semantic-conventions
172 pub fn set_attributes(&self, attributes: impl IntoIterator<Item = KeyValue>) {
173 self.with_inner_mut(move |inner| inner.set_attributes(attributes))
174 }
175
176 /// Sets the status of this `Span`.
177 ///
178 /// If used, this will override the default span status, which is [`Status::Unset`].
179 pub fn set_status(&self, status: Status) {
180 self.with_inner_mut(move |inner| inner.set_status(status))
181 }
182
183 /// Updates the span's name.
184 ///
185 /// After this update, any sampling behavior based on the name will depend on
186 /// the implementation.
187 pub fn update_name<T>(&self, new_name: T)
188 where
189 T: Into<Cow<'static, str>>,
190 {
191 self.with_inner_mut(move |inner| inner.update_name(new_name))
192 }
193
194 /// Signals that the operation described by this span has now ended.
195 pub fn end(&self) {
196 self.end_with_timestamp(crate::time::now());
197 }
198
199 /// Signals that the operation described by this span ended at the given time.
200 pub fn end_with_timestamp(&self, timestamp: std::time::SystemTime) {
201 self.with_inner_mut(move |inner| inner.end_with_timestamp(timestamp))
202 }
203}
204
205/// Methods for storing and retrieving trace data in a [`Context`].
206///
207/// See [`Context`] for examples of setting and retrieving the current context.
208pub trait TraceContextExt {
209 /// Returns a clone of the current context with the included [`Span`].
210 ///
211 /// # Examples
212 ///
213 /// ```
214 /// use opentelemetry::{global, trace::{TraceContextExt, Tracer}, Context};
215 ///
216 /// let tracer = global::tracer("example");
217 ///
218 /// // build a span
219 /// let span = tracer.start("parent_span");
220 ///
221 /// // create a new context from the currently active context that includes this span
222 /// let cx = Context::current_with_span(span);
223 ///
224 /// // create a child span by explicitly specifying the parent context
225 /// let child = tracer.start_with_context("child_span", &cx);
226 /// # drop(child)
227 /// ```
228 fn current_with_span<T: crate::trace::Span + Send + Sync + 'static>(span: T) -> Self;
229
230 /// Returns a clone of this context with the included span.
231 ///
232 /// # Examples
233 ///
234 /// ```
235 /// use opentelemetry::{global, trace::{TraceContextExt, Tracer}, Context};
236 ///
237 /// fn fn_with_passed_in_context(cx: &Context) {
238 /// let tracer = global::tracer("example");
239 ///
240 /// // build a span
241 /// let span = tracer.start("parent_span");
242 ///
243 /// // create a new context from the given context that includes the span
244 /// let cx_with_parent = cx.with_span(span);
245 ///
246 /// // create a child span by explicitly specifying the parent context
247 /// let child = tracer.start_with_context("child_span", &cx_with_parent);
248 /// # drop(child)
249 /// }
250 ///
251 fn with_span<T: crate::trace::Span + Send + Sync + 'static>(&self, span: T) -> Self;
252
253 /// Returns a reference to this context's span, or the default no-op span if
254 /// none has been set.
255 ///
256 /// # Examples
257 ///
258 /// ```
259 /// use opentelemetry::{trace::TraceContextExt, Context};
260 ///
261 /// // Add an event to the currently active span
262 /// Context::map_current(|cx| cx.span().add_event("An event!", vec![]));
263 /// ```
264 fn span(&self) -> SpanRef<'_>;
265
266 /// Returns whether or not an active span has been set.
267 ///
268 /// # Examples
269 ///
270 /// ```
271 /// use opentelemetry::{trace::TraceContextExt, Context};
272 ///
273 /// assert!(!Context::map_current(|cx| cx.has_active_span()));
274 /// ```
275 fn has_active_span(&self) -> bool;
276
277 /// Returns a copy of this context with the span context included.
278 ///
279 /// This is useful for building propagators.
280 fn with_remote_span_context(&self, span_context: crate::trace::SpanContext) -> Self;
281}
282
283impl TraceContextExt for Context {
284 fn current_with_span<T: crate::trace::Span + Send + Sync + 'static>(span: T) -> Self {
285 Context::current_with_synchronized_span(span.into())
286 }
287
288 fn with_span<T: crate::trace::Span + Send + Sync + 'static>(&self, span: T) -> Self {
289 self.with_synchronized_span(span.into())
290 }
291
292 fn span(&self) -> SpanRef<'_> {
293 if let Some(span) = self.span.as_ref() {
294 SpanRef(span)
295 } else {
296 SpanRef(&NOOP_SPAN)
297 }
298 }
299
300 fn has_active_span(&self) -> bool {
301 self.span.is_some()
302 }
303
304 fn with_remote_span_context(&self, span_context: crate::trace::SpanContext) -> Self {
305 self.with_synchronized_span(span_context.into())
306 }
307}
308
309/// Mark a given `Span` as active.
310///
311/// The `Tracer` MUST provide a way to update its active `Span`, and MAY provide convenience
312/// methods to manage a `Span`'s lifetime and the scope in which a `Span` is active. When an
313/// active `Span` is made inactive, the previously-active `Span` SHOULD be made active. A `Span`
314/// maybe finished (i.e. have a non-null end time) but still be active. A `Span` may be active
315/// on one thread after it has been made inactive on another.
316///
317/// # Examples
318///
319/// ```
320/// use opentelemetry::{global, trace::{Span, Tracer}, KeyValue};
321/// use opentelemetry::trace::{get_active_span, mark_span_as_active};
322///
323/// fn my_function() {
324/// let tracer = global::tracer("my-component-a");
325/// // start an active span in one function
326/// let span = tracer.start("span-name");
327/// let _guard = mark_span_as_active(span);
328/// // anything happening in functions we call can still access the active span...
329/// my_other_function();
330/// }
331///
332/// fn my_other_function() {
333/// // call methods on the current span from
334/// get_active_span(|span| {
335/// span.add_event("An event!".to_string(), vec![KeyValue::new("happened", true)]);
336/// });
337/// }
338/// ```
339#[must_use = "Dropping the guard detaches the context."]
340pub fn mark_span_as_active<T: crate::trace::Span + Send + Sync + 'static>(span: T) -> ContextGuard {
341 let cx = Context::current_with_span(span);
342 cx.attach()
343}
344
345/// Executes a closure with a reference to this thread's current span.
346///
347/// # Examples
348///
349/// ```
350/// use opentelemetry::{global, trace::{Span, Tracer}, KeyValue};
351/// use opentelemetry::trace::get_active_span;
352///
353/// fn my_function() {
354/// // start an active span in one function
355/// global::tracer("my-component").in_span("span-name", |_cx| {
356/// // anything happening in functions we call can still access the active span...
357/// my_other_function();
358/// })
359/// }
360///
361/// fn my_other_function() {
362/// // call methods on the current span from
363/// get_active_span(|span| {
364/// span.add_event("An event!", vec![KeyValue::new("happened", true)]);
365/// })
366/// }
367/// ```
368pub fn get_active_span<F, T>(f: F) -> T
369where
370 F: FnOnce(SpanRef<'_>) -> T,
371{
372 Context::map_current(|cx| f(cx.span()))
373}
374
375pin_project! {
376 /// A future, stream, or sink that has an associated context.
377 #[derive(Clone, Debug)]
378 pub struct WithContext<T> {
379 #[pin]
380 inner: T,
381 otel_cx: Context,
382 }
383}
384
385impl<T: Sized> FutureExt for T {}
386
387impl<T: std::future::Future> std::future::Future for WithContext<T> {
388 type Output = T::Output;
389
390 fn poll(self: Pin<&mut Self>, task_cx: &mut TaskContext<'_>) -> Poll<Self::Output> {
391 let this = self.project();
392 let _guard = this.otel_cx.clone().attach();
393
394 this.inner.poll(task_cx)
395 }
396}
397
398impl<T: Stream> Stream for WithContext<T> {
399 type Item = T::Item;
400
401 fn poll_next(self: Pin<&mut Self>, task_cx: &mut TaskContext<'_>) -> Poll<Option<Self::Item>> {
402 let this = self.project();
403 let _guard = this.otel_cx.clone().attach();
404 T::poll_next(this.inner, task_cx)
405 }
406}
407
408impl<I, T: Sink<I>> Sink<I> for WithContext<T>
409where
410 T: Sink<I>,
411{
412 type Error = T::Error;
413
414 fn poll_ready(
415 self: Pin<&mut Self>,
416 task_cx: &mut TaskContext<'_>,
417 ) -> Poll<Result<(), Self::Error>> {
418 let this = self.project();
419 let _guard = this.otel_cx.clone().attach();
420 T::poll_ready(this.inner, task_cx)
421 }
422
423 fn start_send(self: Pin<&mut Self>, item: I) -> Result<(), Self::Error> {
424 let this = self.project();
425 let _guard = this.otel_cx.clone().attach();
426 T::start_send(this.inner, item)
427 }
428
429 fn poll_flush(
430 self: Pin<&mut Self>,
431 task_cx: &mut TaskContext<'_>,
432 ) -> Poll<Result<(), Self::Error>> {
433 let this = self.project();
434 let _guard = this.otel_cx.clone().attach();
435 T::poll_flush(this.inner, task_cx)
436 }
437
438 fn poll_close(
439 self: Pin<&mut Self>,
440 task_cx: &mut TaskContext<'_>,
441 ) -> Poll<Result<(), Self::Error>> {
442 let this = self.project();
443 let _enter = this.otel_cx.clone().attach();
444 T::poll_close(this.inner, task_cx)
445 }
446}
447
448/// Extension trait allowing futures, streams, and sinks to be traced with a span.
449pub trait FutureExt: Sized {
450 /// Attaches the provided [`Context`] to this type, returning a `WithContext`
451 /// wrapper.
452 ///
453 /// When the wrapped type is a future, stream, or sink, the attached context
454 /// will be set as current while it is being polled.
455 ///
456 /// [`Context`]: crate::Context
457 fn with_context(self, otel_cx: Context) -> WithContext<Self> {
458 WithContext {
459 inner: self,
460 otel_cx,
461 }
462 }
463
464 /// Attaches the current [`Context`] to this type, returning a `WithContext`
465 /// wrapper.
466 ///
467 /// When the wrapped type is a future, stream, or sink, the attached context
468 /// will be set as the default while it is being polled.
469 ///
470 /// [`Context`]: crate::Context
471 fn with_current_context(self) -> WithContext<Self> {
472 let otel_cx = Context::current();
473 self.with_context(otel_cx)
474 }
475}