snix_tracing/
lib.rs

1#![cfg_attr(docsrs, feature(doc_cfg))]
2
3#[cfg(feature = "clap")]
4use clap_verbosity_flag::{InfoLevel, LogLevel, Verbosity};
5#[cfg(any(feature = "otlp", feature = "tracy", feature = "chrome"))]
6use enumset::EnumSet;
7use std::sync::LazyLock;
8use tracing::Level;
9use tracing_indicatif::{
10    IndicatifLayer, IndicatifWriter, filter::IndicatifFilter, style::ProgressStyle,
11    util::FilteredFormatFields, writer,
12};
13use tracing_subscriber::{
14    EnvFilter, Layer, Registry,
15    layer::{Identity, SubscriberExt},
16    util::SubscriberInitExt as _,
17};
18
19#[cfg(feature = "otlp")]
20use opentelemetry_sdk::{
21    Resource, propagation::TraceContextPropagator, resource::SdkProvidedResourceDetector,
22};
23#[cfg(feature = "tracy")]
24use tracing_tracy::TracyLayer;
25
26pub mod propagate;
27
28/// A classical progress bar.
29pub static PB_PROGRESS_STYLE: LazyLock<ProgressStyle> = LazyLock::new(|| {
30    ProgressStyle::with_template(
31        "{span_child_prefix} {wide_msg} {bar:10} ({elapsed}) {pos:>7}/{len:7}",
32    )
33    .expect("invalid progress template")
34});
35
36/// Used for file transfers, where we know an exact number of bytes and showing a trasfer speed makes sense.
37pub static PB_TRANSFER_STYLE: LazyLock<ProgressStyle> = LazyLock::new(|| {
38    ProgressStyle::with_template(
39        "{span_child_prefix} {wide_msg} {binary_bytes:>7}/{binary_total_bytes:7}@{decimal_bytes_per_sec} ({elapsed}) {bar:10} "
40    )
41    .expect("invalid progress template")
42});
43pub static PB_SPINNER_STYLE: LazyLock<ProgressStyle> = LazyLock::new(|| {
44    ProgressStyle::with_template(
45        "{span_child_prefix}{spinner} {wide_msg} ({elapsed}) {pos:>7}/{len:7}",
46    )
47    .expect("invalid progress template")
48});
49
50/// Used for long-running operations without a known total.
51/// Does not show the elapsed time either.
52pub static PB_SPINNER_LONG_STYLE: LazyLock<ProgressStyle> = LazyLock::new(|| {
53    ProgressStyle::with_template("{span_child_prefix}{spinner} {wide_msg} {pos:>7}/?")
54        .expect("invalid progress template")
55});
56
57#[derive(thiserror::Error, Debug)]
58pub enum Error {
59    #[error(transparent)]
60    Init(#[from] tracing_subscriber::util::TryInitError),
61
62    #[cfg(feature = "otlp")]
63    #[error(transparent)]
64    OTEL(#[from] opentelemetry_sdk::error::OTelSdkError),
65}
66
67#[derive(Clone)]
68pub struct TracingHandle {
69    stdout_writer: IndicatifWriter<writer::Stdout>,
70    stderr_writer: IndicatifWriter<writer::Stderr>,
71
72    #[cfg(feature = "chrome")]
73    #[allow(dead_code)]
74    chrome_guard: Option<std::rc::Rc<tracing_chrome::FlushGuard>>,
75
76    #[cfg(feature = "otlp")]
77    meter_provider: Option<opentelemetry_sdk::metrics::SdkMeterProvider>,
78
79    #[cfg(feature = "otlp")]
80    tracer_provider: Option<opentelemetry_sdk::trace::SdkTracerProvider>,
81}
82
83impl TracingHandle {
84    /// Returns a writer for [std::io::Stdout] that ensures its output will not be clobbered by
85    /// active progress bars.
86    ///
87    /// Instead of `println!(...)` prefer `writeln!(handle.get_stdout_writer(), ...)`
88    pub fn get_stdout_writer(&self) -> IndicatifWriter<writer::Stdout> {
89        // clone is fine here because its only a wrapper over an `Arc`
90        self.stdout_writer.clone()
91    }
92
93    /// Returns a writer for [std::io::Stderr] that ensures its output will not be clobbered by
94    /// active progress bars.
95    ///
96    /// Instead of `println!(...)` prefer `writeln!(handle.get_stderr_writer(), ...)`.
97    pub fn get_stderr_writer(&self) -> IndicatifWriter<writer::Stderr> {
98        // clone is fine here because its only a wrapper over an `Arc`
99        self.stderr_writer.clone()
100    }
101
102    /// This will flush possible attached tracing providers, e.g. otlp exported, if enabled.
103    /// If there is none enabled this will result in a noop.
104    ///
105    /// It will wait until the flush is complete.
106    pub async fn flush(&self) -> Result<(), Error> {
107        #[cfg(feature = "otlp")]
108        {
109            if let Some(tracer_provider) = &self.tracer_provider {
110                tracer_provider.force_flush()?;
111            }
112            if let Some(meter_provider) = &self.meter_provider {
113                meter_provider.force_flush()?;
114            }
115        }
116        Ok(())
117    }
118
119    /// This will flush all attached tracing providers and will wait until the flush is completed, then call shutdown.
120    /// If no tracing providers like otlp are attached then this will be a noop.
121    ///
122    /// This should only be called on a regular shutdown.
123    pub async fn shutdown(&mut self) -> Result<(), Error> {
124        self.flush().await?;
125        #[cfg(feature = "otlp")]
126        {
127            use tokio::task::spawn_blocking;
128            if let Some(tracer_provider) = self.tracer_provider.take() {
129                spawn_blocking(move || tracer_provider.shutdown())
130                    .await
131                    .map_err(|err| {
132                        Error::OTEL(opentelemetry_sdk::error::OTelSdkError::InternalFailure(
133                            err.to_string(),
134                        ))
135                    })??;
136            }
137            if let Some(meter_provider) = self.meter_provider.take() {
138                spawn_blocking(move || meter_provider.shutdown())
139                    .await
140                    .map_err(|err| {
141                        Error::OTEL(opentelemetry_sdk::error::OTelSdkError::InternalFailure(
142                            err.to_string(),
143                        ))
144                    })??;
145            }
146        }
147        #[cfg(feature = "tracy")]
148        {
149            if tracing_tracy::client::Client::is_running() {
150                unsafe { tracing_tracy::client::sys::___tracy_shutdown_profiler() }
151            }
152        }
153
154        Ok(())
155    }
156}
157
/// Tracing backends that can be enabled in addition to the log output.
///
/// A variant only exists when its cargo feature is enabled.
#[cfg(any(feature = "otlp", feature = "tracy", feature = "chrome"))]
#[derive(Debug, enumset::EnumSetType)]
#[cfg_attr(feature = "clap", derive(clap::ValueEnum))]
pub enum Tracer {
    // Exports spans and metrics over OTLP (gRPC via tonic).
    #[cfg(feature = "otlp")]
    Otlp,
    // Sends spans to the Tracy profiler.
    #[cfg(feature = "tracy")]
    Tracy,
    // Records a trace via `tracing-chrome` (Chrome trace-event format).
    #[cfg(feature = "chrome")]
    ChromeStyle,
}
169
170/// Encodes the verbosity level chosen by the user through CLI arguments.
171#[derive(Clone, Debug, PartialEq, Eq)]
172enum ChosenLevel {
173    /// Not set. We still store the default level passed as a type argument in Verbosity
174    Unset(Level),
175    /// No output at all requested (quiet mode)
176    NoOutput,
177    /// Specific log level selected
178    Level(Level),
179}
180
181#[must_use = "Don't forget to call build() to enable tracing."]
182pub struct TracingBuilder {
183    // Can be used to disable progress bars entirely,
184    // even though they would still match the chosen level
185    disable_progress_bars: bool,
186
187    #[cfg(any(feature = "otlp", feature = "tracy", feature = "chrome"))]
188    tracers: EnumSet<Tracer>,
189
190    // The desired verbosity level
191    level: ChosenLevel,
192}
193
194impl Default for TracingBuilder {
195    fn default() -> Self {
196        Self {
197            #[cfg(any(feature = "otlp", feature = "tracy", feature = "chrome"))]
198            tracers: Default::default(),
199            level: ChosenLevel::Unset(Level::INFO),
200            disable_progress_bars: false,
201        }
202    }
203}
204
205impl TracingBuilder {
206    #[cfg(any(feature = "otlp", feature = "tracy", feature = "chrome"))]
207    /// Enable the given tracer
208    pub fn enable_tracer(mut self, tracer: Tracer) -> TracingBuilder {
209        self.tracers.insert(tracer);
210        self
211    }
212
213    #[cfg(any(feature = "otlp", feature = "tracy", feature = "chrome"))]
214    /// Enable the given tracers
215    pub fn enable_tracers<I>(mut self, tracers: I) -> TracingBuilder
216    where
217        I: IntoIterator<Item = Tracer>,
218    {
219        self.tracers.extend(tracers);
220        self
221    }
222
223    /// Disable progress bars explicitly, even though they would still match the chosen log level.
224    pub fn disable_progress_bars(mut self) -> TracingBuilder {
225        self.disable_progress_bars = true;
226        self
227    }
228
229    /// This will setup tracing based on the configuration passed in.
230    /// It will setup a stderr writer output layer and configure EnvFilter to honor RUST_LOG.
231    /// The EnvFilter will be applied to all configured layers, also otlp.
232    ///
233    /// It will also configure otlp if the feature is enabled and a service_name was provided. It
234    /// will then correctly setup a channel which is later used for flushing the provider.
235    pub fn build(self) -> Result<TracingHandle, Error> {
236        self.build_with_additional(Identity::new())
237    }
238
239    /// Similar to `build()` but allows passing in an additional tracing [`Layer`].
240    ///
241    /// This method is generic over the `Layer` to avoid the runtime cost of dynamic dispatch.
242    /// While it only allows passing a single `Layer`, it can be composed of multiple ones:
243    ///
244    /// ```ignore
245    /// build_with_additional(
246    ///   fmt::layer()
247    ///     .and_then(some_other_layer)
248    ///     .and_then(yet_another_layer)
249    ///     .with_filter(my_filter)
250    /// )
251    /// ```
252    /// [`Layer`]: tracing_subscriber::layer::Layer
253    pub fn build_with_additional<L>(self, additional_layer: L) -> Result<TracingHandle, Error>
254    where
255        L: Layer<Registry> + Send + Sync + 'static,
256    {
257        // Set up the tracing subscriber.
258        let indicatif_layer = IndicatifLayer::new().with_progress_style(PB_SPINNER_STYLE.clone());
259        let stdout_writer = indicatif_layer.get_stdout_writer();
260        let stderr_writer = indicatif_layer.get_stderr_writer();
261
262        let layered = tracing_subscriber::fmt::Layer::new()
263            .fmt_fields(FilteredFormatFields::new(
264                tracing_subscriber::fmt::format::DefaultFields::new(),
265                |field| field.name() != "indicatif.pb_show",
266            ))
267            .with_writer(indicatif_layer.get_stderr_writer())
268            .compact()
269            .with_filter(construct_filter(self.level.to_owned()))
270            .and_then((!self.disable_progress_bars).then(|| {
271                indicatif_layer.with_filter(
272                    // only show progress for spans with indicatif.pb_show field being set
273                    IndicatifFilter::new(false),
274                )
275            }));
276
277        #[cfg(feature = "chrome")]
278        let (layered, chrome_guard) = if self.tracers.contains(Tracer::ChromeStyle) {
279            let (chrome_layer, guard) = tracing_chrome::ChromeLayerBuilder::new()
280                .include_args(true)
281                .trace_style(tracing_chrome::TraceStyle::Async)
282                .build();
283            (
284                Layer::and_then(layered, Some(chrome_layer)),
285                Some(std::rc::Rc::new(guard)),
286            )
287        } else {
288            (Layer::and_then(layered, None), None)
289        };
290
291        #[cfg(feature = "otlp")]
292        let mut g_tracer_provider = None;
293        #[cfg(feature = "otlp")]
294        let mut g_meter_provider = None;
295
296        // Setup otlp if a service_name is configured
297        #[cfg(feature = "otlp")]
298        let layered = Layer::and_then(layered, {
299            self.tracers.contains(Tracer::Otlp).then(|| {
300                use opentelemetry::trace::TracerProvider;
301
302                // register a text map propagator for trace propagation
303                opentelemetry::global::set_text_map_propagator(TraceContextPropagator::new());
304
305                let tracer_provider =
306                    gen_tracer_provider().expect("Unable to configure trace provider");
307
308                let meter_provider =
309                    gen_meter_provider().expect("Unable to configure meter provider");
310
311                // Register the returned meter provider as the global one.
312                // FUTUREWORK: store in the struct and provide getter too?
313                opentelemetry::global::set_meter_provider(meter_provider.clone());
314
315                g_tracer_provider = Some(tracer_provider.clone());
316                g_meter_provider = Some(meter_provider);
317
318                // Create a tracing layer with the configured tracer
319                tracing_opentelemetry::layer().with_tracer(tracer_provider.tracer("snix"))
320            })
321        });
322
323        #[cfg(feature = "tracy")]
324        let layered = Layer::and_then(
325            layered,
326            self.tracers.contains(Tracer::Tracy).then(|| {
327                let _client = tracing_tracy::client::Client::start();
328                TracyLayer::default()
329            }),
330        );
331
332        tracing_subscriber::registry()
333            // TODO: if additional_layer has global filters, there is a risk that it will disable the "default" ones,
334            // while it could be solved by registering `additional_layer` last, it requires boxing `additional_layer`.
335            .with(additional_layer)
336            .with(layered)
337            .try_init()?;
338
339        Ok(TracingHandle {
340            stdout_writer,
341            stderr_writer,
342
343            #[cfg(feature = "otlp")]
344            meter_provider: g_meter_provider,
345            #[cfg(feature = "otlp")]
346            tracer_provider: g_tracer_provider,
347            #[cfg(feature = "chrome")]
348            chrome_guard,
349        })
350    }
351
352    #[cfg(feature = "clap")]
353    /// Configure with verbosity flags.
354    pub fn handle_verbosity_flags<L: LogLevel>(mut self, args: &Verbosity<L>) -> Self {
355        if args.is_silent() {
356            self.level = ChosenLevel::NoOutput;
357            self.disable_progress_bars = true;
358            return self;
359        }
360
361        use std::io::IsTerminal;
362        if !std::io::stderr().is_terminal() {
363            self.disable_progress_bars = true
364        }
365
366        if args.is_present() {
367            self.level = ChosenLevel::Level(args.tracing_level().expect("not silent"));
368        } else {
369            self.level = ChosenLevel::Unset(args.tracing_level().expect("not silent"))
370        }
371
372        self
373    }
374
375    #[cfg(feature = "clap")]
376    /// Configure with the tracing-related args.
377    pub fn handle_tracing_args<L: LogLevel>(
378        #[allow(unused_mut)] mut self,
379        args: &TracingArgs<L>,
380    ) -> Self {
381        #[cfg(any(feature = "otlp", feature = "tracy", feature = "chrome"))]
382        {
383            self = self.enable_tracers(args.tracers());
384        }
385
386        self.handle_verbosity_flags(&args.verbosity)
387    }
388}
389
/// Builds the OpenTelemetry [`Resource`] describing this process, including what the SDK's
/// [`SdkProvidedResourceDetector`] detects.
#[cfg(feature = "otlp")]
fn gen_resources() -> Resource {
    let detector = Box::new(SdkProvidedResourceDetector);
    Resource::builder().with_detector(detector).build()
}
397
/// Builds the OTLP tracer provider.
///
/// Spans are exported by a gRPC (tonic) `SpanExporter` through the SDK's batch exporter,
/// tagged with the resources from `gen_resources()`.
///
/// # Errors
/// Returns an error if the span exporter cannot be built.
#[cfg(feature = "otlp")]
fn gen_tracer_provider()
-> Result<opentelemetry_sdk::trace::SdkTracerProvider, opentelemetry_otlp::ExporterBuildError> {
    use opentelemetry_otlp::{ExportConfig, SpanExporter, WithExportConfig};

    let exporter = SpanExporter::builder()
        .with_tonic()
        .with_export_config(ExportConfig::default())
        .build()?;

    // TODO: tune the batch processor; it currently runs with the SDK defaults. The intended
    // (not yet applied) configuration was:
    // - `max_export_batch_size` 4096: the default of 512 fills up quickly, triggering an
    //   export before the schedule is met;
    // - `max_queue_size` 4096 * 4, analogous to the default `max_export_batch_size * 4`;
    // - `scheduled_delay` 10s, to reduce the amount of error messages when no OTLP collector
    //   is available.
    // NOTE(review): presumably achievable by building a `BatchSpanProcessor` with a
    // `BatchConfig` and registering it via `with_span_processor` — confirm for this SDK version.
    let tracer_provider = opentelemetry_sdk::trace::SdkTracerProvider::builder()
        .with_batch_exporter(exporter)
        .with_resource(gen_resources())
        .build();

    Ok(tracer_provider)
}
435
/// Interval at which the periodic reader exports metrics.
///
/// The metric export interval should be less than or equal to 15s if the metrics may be
/// converted to Prometheus metrics: Prometheus' query engine and compatible implementations
/// require ~4 data points per interval for range queries, so queries ranging over 1m require
/// <= 15s scrape intervals.
///
/// OTEL SDKs also respect the env var `OTEL_METRIC_EXPORT_INTERVAL` (no underscore prefix).
/// NOTE(review): an explicit `PeriodicReaderBuilder::with_interval` call presumably takes
/// precedence over that env var — confirm.
// The leading underscore presumably avoids a `dead_code` warning when the `otlp` feature (the
// only user of this constant) is disabled.
const _OTEL_METRIC_EXPORT_INTERVAL: std::time::Duration = std::time::Duration::from_secs(10);
443
/// Builds the OTLP meter provider.
///
/// Metrics are exported over gRPC (tonic) by a periodic reader every
/// `_OTEL_METRIC_EXPORT_INTERVAL`, tagged with the resources from `gen_resources()`.
///
/// # Errors
/// Returns an error if the metric exporter cannot be built.
#[cfg(feature = "otlp")]
fn gen_meter_provider()
-> Result<opentelemetry_sdk::metrics::SdkMeterProvider, opentelemetry_otlp::ExporterBuildError> {
    use opentelemetry_otlp::{MetricExporter, WithExportConfig};
    use opentelemetry_sdk::metrics::{PeriodicReader, SdkMeterProvider};

    // Export requests time out after 10 seconds.
    let export_timeout = std::time::Duration::from_secs(10);
    let metric_exporter = MetricExporter::builder()
        .with_tonic()
        .with_timeout(export_timeout)
        .build()?;

    let periodic_reader = PeriodicReader::builder(metric_exporter)
        .with_interval(_OTEL_METRIC_EXPORT_INTERVAL)
        .build();

    let meter_provider = SdkMeterProvider::builder()
        .with_reader(periodic_reader)
        .with_resource(gen_resources())
        .build();
    Ok(meter_provider)
}
465
// Tracing-related command-line arguments; can be flattened into a binary's own clap parser
// and passed to `TracingBuilder::handle_tracing_args`.
#[cfg(feature = "clap")]
#[derive(Clone, clap::Parser)]
pub struct TracingArgs<L: LogLevel = InfoLevel> {
    #[cfg(any(feature = "otlp", feature = "tracy", feature = "chrome"))]
    /// Which tracers to enable.
    #[arg(long, value_enum, action(clap::ArgAction::Append), env)]
    tracer: Vec<Tracer>,

    // Verbosity flags (`-v`/`-q` style), provided by `clap_verbosity_flag`.
    #[command(flatten)]
    verbosity: Verbosity<L>,
}
477
#[cfg(feature = "clap")]
impl<L: LogLevel> TracingArgs<L> {
    /// Returns the tracers passed via the (repeatable) `--tracer` argument as a set, so
    /// duplicates collapse.
    #[cfg(any(feature = "otlp", feature = "tracy", feature = "chrome"))]
    pub fn tracers(&self) -> EnumSet<Tracer> {
        self.tracer.iter().copied().collect()
    }
}
485
486/// Helper assembling a filter filtering events for the [ChosenLevel].
487fn construct_filter<S>(level: ChosenLevel) -> impl tracing_subscriber::layer::Filter<S> {
488    let mut b = EnvFilter::builder();
489    if let ChosenLevel::Unset(level) = level {
490        b = b.with_default_directive(level.to_owned().into());
491    }
492    let mut f = b.from_env().expect("invalid RUST_LOG");
493    if let ChosenLevel::Level(level) = level {
494        f = f.add_directive(level.to_owned().into());
495    }
496    f
497}