snix_tracing/
lib.rs

1#![cfg_attr(docsrs, feature(doc_cfg))]
2
3#[cfg(feature = "clap")]
4use clap_verbosity_flag::{InfoLevel, LogLevel, Verbosity};
5#[cfg(any(feature = "otlp", feature = "tracy", feature = "chrome"))]
6use enumset::EnumSet;
7use std::sync::LazyLock;
8use tracing::Level;
9use tracing_indicatif::{
10    IndicatifLayer, IndicatifWriter, filter::IndicatifFilter, style::ProgressStyle,
11    util::FilteredFormatFields, writer,
12};
13use tracing_subscriber::{
14    EnvFilter, Layer, Registry,
15    layer::{Identity, SubscriberExt},
16    util::SubscriberInitExt as _,
17};
18
19#[cfg(feature = "otlp")]
20use opentelemetry_sdk::{
21    Resource, propagation::TraceContextPropagator, resource::SdkProvidedResourceDetector,
22};
23#[cfg(feature = "tracy")]
24use tracing_tracy::TracyLayer;
25
26pub mod propagate;
27
28pub static PB_PROGRESS_STYLE: LazyLock<ProgressStyle> = LazyLock::new(|| {
29    ProgressStyle::with_template(
30        "{span_child_prefix} {wide_msg} {bar:10} ({elapsed}) {pos:>7}/{len:7}",
31    )
32    .expect("invalid progress template")
33});
34pub static PB_TRANSFER_STYLE: LazyLock<ProgressStyle> = LazyLock::new(|| {
35    ProgressStyle::with_template(
36        "{span_child_prefix} {wide_msg} {binary_bytes:>7}/{binary_total_bytes:7}@{decimal_bytes_per_sec} ({elapsed}) {bar:10} "
37    )
38    .expect("invalid progress template")
39});
40pub static PB_SPINNER_STYLE: LazyLock<ProgressStyle> = LazyLock::new(|| {
41    ProgressStyle::with_template(
42        "{span_child_prefix}{spinner} {wide_msg} ({elapsed}) {pos:>7}/{len:7}",
43    )
44    .expect("invalid progress template")
45});
46
47#[derive(thiserror::Error, Debug)]
48pub enum Error {
49    #[error(transparent)]
50    Init(#[from] tracing_subscriber::util::TryInitError),
51
52    #[cfg(feature = "otlp")]
53    #[error(transparent)]
54    OTEL(#[from] opentelemetry_sdk::error::OTelSdkError),
55}
56
57#[derive(Clone)]
58pub struct TracingHandle {
59    stdout_writer: IndicatifWriter<writer::Stdout>,
60    stderr_writer: IndicatifWriter<writer::Stderr>,
61
62    #[cfg(feature = "chrome")]
63    #[allow(dead_code)]
64    chrome_guard: Option<std::rc::Rc<tracing_chrome::FlushGuard>>,
65
66    #[cfg(feature = "otlp")]
67    meter_provider: Option<opentelemetry_sdk::metrics::SdkMeterProvider>,
68
69    #[cfg(feature = "otlp")]
70    tracer_provider: Option<opentelemetry_sdk::trace::SdkTracerProvider>,
71}
72
73impl TracingHandle {
74    /// Returns a writer for [std::io::Stdout] that ensures its output will not be clobbered by
75    /// active progress bars.
76    ///
77    /// Instead of `println!(...)` prefer `writeln!(handle.get_stdout_writer(), ...)`
78    pub fn get_stdout_writer(&self) -> IndicatifWriter<writer::Stdout> {
79        // clone is fine here because its only a wrapper over an `Arc`
80        self.stdout_writer.clone()
81    }
82
83    /// Returns a writer for [std::io::Stderr] that ensures its output will not be clobbered by
84    /// active progress bars.
85    ///
86    /// Instead of `println!(...)` prefer `writeln!(handle.get_stderr_writer(), ...)`.
87    pub fn get_stderr_writer(&self) -> IndicatifWriter<writer::Stderr> {
88        // clone is fine here because its only a wrapper over an `Arc`
89        self.stderr_writer.clone()
90    }
91
92    /// This will flush possible attached tracing providers, e.g. otlp exported, if enabled.
93    /// If there is none enabled this will result in a noop.
94    ///
95    /// It will wait until the flush is complete.
96    pub async fn flush(&self) -> Result<(), Error> {
97        #[cfg(feature = "otlp")]
98        {
99            if let Some(tracer_provider) = &self.tracer_provider {
100                tracer_provider.force_flush()?;
101            }
102            if let Some(meter_provider) = &self.meter_provider {
103                meter_provider.force_flush()?;
104            }
105        }
106        Ok(())
107    }
108
109    /// This will flush all attached tracing providers and will wait until the flush is completed, then call shutdown.
110    /// If no tracing providers like otlp are attached then this will be a noop.
111    ///
112    /// This should only be called on a regular shutdown.
113    pub async fn shutdown(&mut self) -> Result<(), Error> {
114        self.flush().await?;
115        #[cfg(feature = "otlp")]
116        {
117            use tokio::task::spawn_blocking;
118            if let Some(tracer_provider) = self.tracer_provider.take() {
119                spawn_blocking(move || tracer_provider.shutdown())
120                    .await
121                    .map_err(|err| {
122                        Error::OTEL(opentelemetry_sdk::error::OTelSdkError::InternalFailure(
123                            err.to_string(),
124                        ))
125                    })??;
126            }
127            if let Some(meter_provider) = self.meter_provider.take() {
128                spawn_blocking(move || meter_provider.shutdown())
129                    .await
130                    .map_err(|err| {
131                        Error::OTEL(opentelemetry_sdk::error::OTelSdkError::InternalFailure(
132                            err.to_string(),
133                        ))
134                    })??;
135            }
136        }
137
138        Ok(())
139    }
140}
141
/// Tracing backends that can be enabled on a `TracingBuilder`.
///
/// The type only exists if at least one of the `otlp`, `tracy` or `chrome` features is
/// enabled, and each variant only exists if its own feature is enabled.
#[cfg(any(feature = "otlp", feature = "tracy", feature = "chrome"))]
#[derive(enumset::EnumSetType, Debug)]
#[cfg_attr(feature = "clap", derive(clap::ValueEnum))]
pub enum Tracer {
    // NOTE: variants carry plain comments rather than doc comments, so that nothing is
    // added to the help text clap derives for the possible values.

    // Adds the OpenTelemetry layer and sets up the OTLP trace and meter providers.
    #[cfg(feature = "otlp")]
    Otlp,
    // Adds a `TracyLayer`.
    #[cfg(feature = "tracy")]
    Tracy,
    // Adds the layer built by `tracing_chrome::ChromeLayerBuilder`.
    #[cfg(feature = "chrome")]
    ChromeStyle,
}
153
154/// Encodes the verbosity level chosen by the user through CLI arguments.
155#[derive(Clone, Debug, PartialEq, Eq)]
156enum ChosenLevel {
157    /// Not set. We still store the default level passed as a type argument in Verbosity
158    Unset(Level),
159    /// No output at all requested (quiet mode)
160    NoOutput,
161    /// Specific log level selected
162    Level(Level),
163}
164
165#[must_use = "Don't forget to call build() to enable tracing."]
166pub struct TracingBuilder {
167    // Can be used to disable progress bars entirely,
168    // even though they would still match the chosen level
169    disable_progress_bars: bool,
170
171    #[cfg(any(feature = "otlp", feature = "tracy", feature = "chrome"))]
172    tracers: EnumSet<Tracer>,
173
174    // The desired verbosity level
175    level: ChosenLevel,
176}
177
178impl Default for TracingBuilder {
179    fn default() -> Self {
180        Self {
181            #[cfg(any(feature = "otlp", feature = "tracy", feature = "chrome"))]
182            tracers: Default::default(),
183            level: ChosenLevel::Unset(Level::INFO),
184            disable_progress_bars: false,
185        }
186    }
187}
188
189impl TracingBuilder {
190    #[cfg(any(feature = "otlp", feature = "tracy", feature = "chrome"))]
191    /// Enable the given tracer
192    pub fn enable_tracer(mut self, tracer: Tracer) -> TracingBuilder {
193        self.tracers.insert(tracer);
194        self
195    }
196
197    #[cfg(any(feature = "otlp", feature = "tracy", feature = "chrome"))]
198    /// Enable the given tracers
199    pub fn enable_tracers<I>(mut self, tracers: I) -> TracingBuilder
200    where
201        I: IntoIterator<Item = Tracer>,
202    {
203        self.tracers.extend(tracers);
204        self
205    }
206
207    /// Disable progress bars explicitly, even though they would still match the chosen log level.
208    pub fn disable_progress_bars(mut self) -> TracingBuilder {
209        self.disable_progress_bars = true;
210        self
211    }
212
213    /// This will setup tracing based on the configuration passed in.
214    /// It will setup a stderr writer output layer and configure EnvFilter to honor RUST_LOG.
215    /// The EnvFilter will be applied to all configured layers, also otlp.
216    ///
217    /// It will also configure otlp if the feature is enabled and a service_name was provided. It
218    /// will then correctly setup a channel which is later used for flushing the provider.
219    pub fn build(self) -> Result<TracingHandle, Error> {
220        self.build_with_additional(Identity::new())
221    }
222
223    /// Similar to `build()` but allows passing in an additional tracing [`Layer`].
224    ///
225    /// This method is generic over the `Layer` to avoid the runtime cost of dynamic dispatch.
226    /// While it only allows passing a single `Layer`, it can be composed of multiple ones:
227    ///
228    /// ```ignore
229    /// build_with_additional(
230    ///   fmt::layer()
231    ///     .and_then(some_other_layer)
232    ///     .and_then(yet_another_layer)
233    ///     .with_filter(my_filter)
234    /// )
235    /// ```
236    /// [`Layer`]: tracing_subscriber::layer::Layer
237    pub fn build_with_additional<L>(self, additional_layer: L) -> Result<TracingHandle, Error>
238    where
239        L: Layer<Registry> + Send + Sync + 'static,
240    {
241        // Set up the tracing subscriber.
242        let indicatif_layer = IndicatifLayer::new().with_progress_style(PB_SPINNER_STYLE.clone());
243        let stdout_writer = indicatif_layer.get_stdout_writer();
244        let stderr_writer = indicatif_layer.get_stderr_writer();
245
246        let layered = tracing_subscriber::fmt::Layer::new()
247            .fmt_fields(FilteredFormatFields::new(
248                tracing_subscriber::fmt::format::DefaultFields::new(),
249                |field| field.name() != "indicatif.pb_show",
250            ))
251            .with_writer(indicatif_layer.get_stderr_writer())
252            .compact()
253            .and_then((!self.disable_progress_bars).then(|| {
254                indicatif_layer.with_filter(
255                    // only show progress for spans with indicatif.pb_show field being set
256                    IndicatifFilter::new(false),
257                )
258            }));
259
260        #[cfg(feature = "chrome")]
261        let (layered, chrome_guard) = if self.tracers.contains(Tracer::ChromeStyle) {
262            let (chrome_layer, guard) = tracing_chrome::ChromeLayerBuilder::new()
263                .include_args(true)
264                .trace_style(tracing_chrome::TraceStyle::Async)
265                .build();
266            (
267                Layer::and_then(layered, Some(chrome_layer)),
268                Some(std::rc::Rc::new(guard)),
269            )
270        } else {
271            (Layer::and_then(layered, None), None)
272        };
273
274        #[cfg(feature = "otlp")]
275        let mut g_tracer_provider = None;
276        #[cfg(feature = "otlp")]
277        let mut g_meter_provider = None;
278
279        // Setup otlp if a service_name is configured
280        #[cfg(feature = "otlp")]
281        let layered = Layer::and_then(layered, {
282            self.tracers.contains(Tracer::Otlp).then(|| {
283                use opentelemetry::trace::TracerProvider;
284
285                // register a text map propagator for trace propagation
286                opentelemetry::global::set_text_map_propagator(TraceContextPropagator::new());
287
288                let tracer_provider =
289                    gen_tracer_provider().expect("Unable to configure trace provider");
290
291                let meter_provider =
292                    gen_meter_provider().expect("Unable to configure meter provider");
293
294                // Register the returned meter provider as the global one.
295                // FUTUREWORK: store in the struct and provide getter too?
296                opentelemetry::global::set_meter_provider(meter_provider.clone());
297
298                g_tracer_provider = Some(tracer_provider.clone());
299                g_meter_provider = Some(meter_provider);
300
301                // Create a tracing layer with the configured tracer
302                tracing_opentelemetry::layer().with_tracer(tracer_provider.tracer("snix"))
303            })
304        });
305
306        #[cfg(feature = "tracy")]
307        let layered = Layer::and_then(
308            layered,
309            self.tracers
310                .contains(Tracer::Tracy)
311                .then(TracyLayer::default),
312        );
313
314        let layered = layered.with_filter(construct_filter(self.level.to_owned()));
315
316        tracing_subscriber::registry()
317            // TODO: if additional_layer has global filters, there is a risk that it will disable the "default" ones,
318            // while it could be solved by registering `additional_layer` last, it requires boxing `additional_layer`.
319            .with(additional_layer)
320            .with(layered)
321            .try_init()?;
322
323        Ok(TracingHandle {
324            stdout_writer,
325            stderr_writer,
326
327            #[cfg(feature = "otlp")]
328            meter_provider: g_meter_provider,
329            #[cfg(feature = "otlp")]
330            tracer_provider: g_tracer_provider,
331            #[cfg(feature = "chrome")]
332            chrome_guard,
333        })
334    }
335
336    #[cfg(feature = "clap")]
337    /// Configure with verbosity flags.
338    pub fn handle_verbosity_flags<L: LogLevel>(mut self, args: &Verbosity<L>) -> Self {
339        if args.is_silent() {
340            self.level = ChosenLevel::NoOutput;
341            self.disable_progress_bars = true;
342            return self;
343        }
344
345        use std::io::IsTerminal;
346        if !std::io::stderr().is_terminal() {
347            self.disable_progress_bars = true
348        }
349
350        if args.is_present() {
351            self.level = ChosenLevel::Level(args.tracing_level().expect("not silent"));
352        } else {
353            self.level = ChosenLevel::Unset(args.tracing_level().expect("not silent"))
354        }
355
356        self
357    }
358
359    #[cfg(feature = "clap")]
360    /// Configure with the tracing-related args.
361    pub fn handle_tracing_args<L: LogLevel>(
362        #[allow(unused_mut)] mut self,
363        args: &TracingArgs<L>,
364    ) -> Self {
365        #[cfg(any(feature = "otlp", feature = "tracy", feature = "chrome"))]
366        {
367            self = self.enable_tracers(args.tracers());
368        }
369
370        self.handle_verbosity_flags(&args.verbosity)
371    }
372}
373
/// Builds the OpenTelemetry [`Resource`] attached to the tracer and meter providers,
/// populated through [`SdkProvidedResourceDetector`].
#[cfg(feature = "otlp")]
fn gen_resources() -> Resource {
    let detector = Box::new(SdkProvidedResourceDetector);

    Resource::builder().with_detector(detector).build()
}
381
/// Builds the OTLP tracer provider.
///
/// Spans are exported in batches through a tonic-based OTLP span exporter using the
/// default export configuration, and carry the resource from [`gen_resources`].
///
/// # Errors
///
/// Returns an error if the span exporter cannot be built.
#[cfg(feature = "otlp")]
fn gen_tracer_provider()
-> Result<opentelemetry_sdk::trace::SdkTracerProvider, opentelemetry_otlp::ExporterBuildError> {
    use opentelemetry_otlp::{ExportConfig, SpanExporter, WithExportConfig};
    use opentelemetry_sdk::trace::SdkTracerProvider;

    let span_exporter = SpanExporter::builder()
        .with_tonic()
        .with_export_config(ExportConfig::default())
        .build()?;

    // FUTUREWORK: the batch exporter runs with its default configuration, because it is
    // unclear how to configure it here. The tuning that was intended:
    //   - a max export batch size of 4096: the default of 512 fills up pretty quickly and
    //     then results in an export, while exports should only happen once the schedule
    //     is met;
    //   - a max queue size of 4096 * 4, analogous to the default `max_export_batch_size * 4`;
    //   - a scheduled delay of 10 seconds, to reduce the amount of error messages if an
    //     OTLP collector is not available.
    Ok(SdkTracerProvider::builder()
        .with_batch_exporter(span_exporter)
        .with_resource(gen_resources())
        .build())
}
419
// Interval at which metrics are exported.
//
// It should be less than or equal to 15s if the metrics may be converted to Prometheus
// metrics: Prometheus' query engine and compatible implementations require
// ~4 data points / interval for range queries, so queries ranging over 1m require
// scrape intervals of <= 15s.
//
// OTEL SDKs also respect the env var `OTEL_METRIC_EXPORT_INTERVAL` (without the
// underscore prefix this constant has).
const _OTEL_METRIC_EXPORT_INTERVAL: std::time::Duration = std::time::Duration::from_secs(10);
427
/// Builds the OTLP meter provider.
///
/// Metrics are read periodically, every `_OTEL_METRIC_EXPORT_INTERVAL`, and exported
/// through a tonic-based OTLP metric exporter with a 10 second timeout. They carry the
/// resource from [`gen_resources`].
///
/// # Errors
///
/// Returns an error if the metric exporter cannot be built.
#[cfg(feature = "otlp")]
fn gen_meter_provider()
-> Result<opentelemetry_sdk::metrics::SdkMeterProvider, opentelemetry_otlp::ExporterBuildError> {
    use opentelemetry_otlp::{MetricExporter, WithExportConfig};
    use opentelemetry_sdk::metrics::{PeriodicReader, SdkMeterProvider};

    let export_timeout = std::time::Duration::from_secs(10);
    let metric_exporter = MetricExporter::builder()
        .with_tonic()
        .with_timeout(export_timeout)
        .build()?;

    let periodic_reader = PeriodicReader::builder(metric_exporter)
        .with_interval(_OTEL_METRIC_EXPORT_INTERVAL)
        .build();

    let provider = SdkMeterProvider::builder()
        .with_reader(periodic_reader)
        .with_resource(gen_resources())
        .build();

    Ok(provider)
}
449
// Command-line arguments configuring tracing: which tracers to enable (if any tracer
// feature is compiled in) and the verbosity flags.
//
// NOTE: this description is deliberately a plain comment, not a doc comment: clap's
// derive turns doc comments into help text, which would change the generated CLI help.
#[cfg(feature = "clap")]
#[derive(Clone, clap::Parser)]
pub struct TracingArgs<L: LogLevel = InfoLevel> {
    #[cfg(any(feature = "otlp", feature = "tracy", feature = "chrome"))]
    /// Which tracers to enable.
    #[arg(long, value_enum, action(clap::ArgAction::Append))]
    tracer: Vec<Tracer>,

    // Verbosity flags, flattened into this set of arguments.
    #[clap(flatten)]
    verbosity: Verbosity<L>,
}
461
#[cfg(feature = "clap")]
impl<L: LogLevel> TracingArgs<L> {
    /// Returns the set of tracers requested on the command line.
    ///
    /// A tracer passed more than once only appears once in the set.
    #[cfg(any(feature = "otlp", feature = "tracy", feature = "chrome"))]
    pub fn tracers(&self) -> EnumSet<Tracer> {
        let requested = self.tracer.iter().copied();
        requested.collect::<EnumSet<Tracer>>()
    }
}
469
470/// Helper assembling a filter filtering events for the [ChosenLevel].
471fn construct_filter<S>(level: ChosenLevel) -> impl tracing_subscriber::layer::Filter<S> {
472    let mut b = EnvFilter::builder();
473    if let ChosenLevel::Unset(level) = level {
474        b = b.with_default_directive(level.to_owned().into());
475    }
476    let mut f = b.from_env().expect("invalid RUST_LOG");
477    if let ChosenLevel::Level(level) = level {
478        f = f.add_directive(level.to_owned().into());
479    }
480    f
481}