snix_tracing/
lib.rs

1#![cfg_attr(docsrs, feature(doc_cfg))]
2
3#[cfg(feature = "clap")]
4use clap_verbosity_flag::{InfoLevel, LogLevel, Verbosity};
5#[cfg(any(feature = "otlp", feature = "tracy", feature = "chrome"))]
6use enumset::EnumSet;
7use std::sync::LazyLock;
8use tracing::Level;
9use tracing_indicatif::{
10    IndicatifLayer, IndicatifWriter, filter::IndicatifFilter, style::ProgressStyle,
11    util::FilteredFormatFields, writer,
12};
13use tracing_subscriber::{
14    EnvFilter, Layer, Registry,
15    layer::{Identity, SubscriberExt},
16    util::SubscriberInitExt as _,
17};
18
19#[cfg(feature = "otlp")]
20use opentelemetry_sdk::{
21    Resource, propagation::TraceContextPropagator, resource::SdkProvidedResourceDetector,
22};
23#[cfg(feature = "tracy")]
24use tracing_tracy::TracyLayer;
25
26pub mod propagate;
27
28/// A classical progress bar.
29pub static PB_PROGRESS_STYLE: LazyLock<ProgressStyle> = LazyLock::new(|| {
30    ProgressStyle::with_template(
31        "{span_child_prefix} {wide_msg} {bar:10} ({elapsed}) {pos:>7}/{len:7}",
32    )
33    .expect("invalid progress template")
34});
35
36/// Used for file transfers, where we know an exact number of bytes and showing a trasfer speed makes sense.
37pub static PB_TRANSFER_STYLE: LazyLock<ProgressStyle> = LazyLock::new(|| {
38    ProgressStyle::with_template(
39        "{span_child_prefix} {wide_msg} {binary_bytes:>7}/{binary_total_bytes:7}@{decimal_bytes_per_sec} ({elapsed}) {bar:10} "
40    )
41    .expect("invalid progress template")
42});
43pub static PB_SPINNER_STYLE: LazyLock<ProgressStyle> = LazyLock::new(|| {
44    ProgressStyle::with_template(
45        "{span_child_prefix}{spinner} {wide_msg} ({elapsed}) {pos:>7}/{len:7}",
46    )
47    .expect("invalid progress template")
48});
49
50/// Used for long-running operations without a known total.
51/// Does not show the elapsed time either.
52pub static PB_SPINNER_LONG_STYLE: LazyLock<ProgressStyle> = LazyLock::new(|| {
53    ProgressStyle::with_template("{span_child_prefix}{spinner} {wide_msg} {pos:>7}/?")
54        .expect("invalid progress template")
55});
56
57#[derive(thiserror::Error, Debug)]
58pub enum Error {
59    #[error(transparent)]
60    Init(#[from] tracing_subscriber::util::TryInitError),
61
62    #[cfg(feature = "otlp")]
63    #[error(transparent)]
64    OTEL(#[from] opentelemetry_sdk::error::OTelSdkError),
65}
66
67#[derive(Clone)]
68pub struct TracingHandle {
69    stdout_writer: IndicatifWriter<writer::Stdout>,
70    stderr_writer: IndicatifWriter<writer::Stderr>,
71
72    #[cfg(feature = "chrome")]
73    #[allow(dead_code)]
74    chrome_guard: Option<std::rc::Rc<tracing_chrome::FlushGuard>>,
75
76    #[cfg(feature = "otlp")]
77    meter_provider: Option<opentelemetry_sdk::metrics::SdkMeterProvider>,
78
79    #[cfg(feature = "otlp")]
80    tracer_provider: Option<opentelemetry_sdk::trace::SdkTracerProvider>,
81}
82
83impl TracingHandle {
84    /// Returns a writer for [std::io::Stdout] that ensures its output will not be clobbered by
85    /// active progress bars.
86    ///
87    /// Instead of `println!(...)` prefer `writeln!(handle.get_stdout_writer(), ...)`
88    pub fn get_stdout_writer(&self) -> IndicatifWriter<writer::Stdout> {
89        // clone is fine here because its only a wrapper over an `Arc`
90        self.stdout_writer.clone()
91    }
92
93    /// Returns a writer for [std::io::Stderr] that ensures its output will not be clobbered by
94    /// active progress bars.
95    ///
96    /// Instead of `println!(...)` prefer `writeln!(handle.get_stderr_writer(), ...)`.
97    pub fn get_stderr_writer(&self) -> IndicatifWriter<writer::Stderr> {
98        // clone is fine here because its only a wrapper over an `Arc`
99        self.stderr_writer.clone()
100    }
101
102    /// This will flush possible attached tracing providers, e.g. otlp exported, if enabled.
103    /// If there is none enabled this will result in a noop.
104    ///
105    /// It will wait until the flush is complete.
106    pub async fn flush(&self) -> Result<(), Error> {
107        #[cfg(feature = "otlp")]
108        {
109            if let Some(tracer_provider) = &self.tracer_provider {
110                tracer_provider.force_flush()?;
111            }
112            if let Some(meter_provider) = &self.meter_provider {
113                meter_provider.force_flush()?;
114            }
115        }
116        Ok(())
117    }
118
119    /// This will flush all attached tracing providers and will wait until the flush is completed, then call shutdown.
120    /// If no tracing providers like otlp are attached then this will be a noop.
121    ///
122    /// This should only be called on a regular shutdown.
123    pub async fn shutdown(&mut self) -> Result<(), Error> {
124        self.flush().await?;
125        #[cfg(feature = "otlp")]
126        {
127            use tokio::task::spawn_blocking;
128            if let Some(tracer_provider) = self.tracer_provider.take() {
129                spawn_blocking(move || tracer_provider.shutdown())
130                    .await
131                    .map_err(|err| {
132                        Error::OTEL(opentelemetry_sdk::error::OTelSdkError::InternalFailure(
133                            err.to_string(),
134                        ))
135                    })??;
136            }
137            if let Some(meter_provider) = self.meter_provider.take() {
138                spawn_blocking(move || meter_provider.shutdown())
139                    .await
140                    .map_err(|err| {
141                        Error::OTEL(opentelemetry_sdk::error::OTelSdkError::InternalFailure(
142                            err.to_string(),
143                        ))
144                    })??;
145            }
146        }
147        #[cfg(feature = "tracy")]
148        {
149            if tracing_tracy::client::Client::is_running() {
150                unsafe { tracing_tracy::client::sys::___tracy_shutdown_profiler() }
151            }
152        }
153
154        Ok(())
155    }
156}
157
/// The tracing backends that can be enabled in addition to the default log output.
///
/// Which variants exist depends on the enabled cargo features.
// NOTE: the variants deliberately carry plain `//` comments only. With the `clap` feature
// this enum derives `clap::ValueEnum`, where doc comments on variants would end up in the
// generated help output.
#[cfg(any(feature = "otlp", feature = "tracy", feature = "chrome"))]
#[derive(enumset::EnumSetType, Debug)]
#[cfg_attr(feature = "clap", derive(clap::ValueEnum))]
pub enum Tracer {
    // Export traces and metrics via OTLP.
    #[cfg(feature = "otlp")]
    Otlp,
    // Send spans to the Tracy profiler.
    #[cfg(feature = "tracy")]
    Tracy,
    // Record a trace through the tracing_chrome layer.
    #[cfg(feature = "chrome")]
    ChromeStyle,
}
169
170/// Encodes the verbosity level chosen by the user through CLI arguments.
171#[derive(Clone, Debug, PartialEq, Eq)]
172enum ChosenLevel {
173    /// Not set. We still store the default level passed as a type argument in Verbosity
174    Unset(Level),
175    /// No output at all requested (quiet mode)
176    NoOutput,
177    /// Specific log level selected
178    Level(Level),
179}
180
181#[must_use = "Don't forget to call build() to enable tracing."]
182pub struct TracingBuilder {
183    // Can be used to disable progress bars entirely,
184    // even though they would still match the chosen level
185    disable_progress_bars: bool,
186
187    #[cfg(any(feature = "otlp", feature = "tracy", feature = "chrome"))]
188    tracers: EnumSet<Tracer>,
189
190    // The desired verbosity level
191    level: ChosenLevel,
192}
193
194impl Default for TracingBuilder {
195    fn default() -> Self {
196        Self {
197            #[cfg(any(feature = "otlp", feature = "tracy", feature = "chrome"))]
198            tracers: Default::default(),
199            level: ChosenLevel::Unset(Level::INFO),
200            disable_progress_bars: false,
201        }
202    }
203}
204
205impl TracingBuilder {
206    #[cfg(any(feature = "otlp", feature = "tracy", feature = "chrome"))]
207    /// Enable the given tracer
208    pub fn enable_tracer(mut self, tracer: Tracer) -> TracingBuilder {
209        self.tracers.insert(tracer);
210        self
211    }
212
213    #[cfg(any(feature = "otlp", feature = "tracy", feature = "chrome"))]
214    /// Enable the given tracers
215    pub fn enable_tracers<I>(mut self, tracers: I) -> TracingBuilder
216    where
217        I: IntoIterator<Item = Tracer>,
218    {
219        self.tracers.extend(tracers);
220        self
221    }
222
223    /// Disable progress bars explicitly, even though they would still match the chosen log level.
224    pub fn disable_progress_bars(mut self) -> TracingBuilder {
225        self.disable_progress_bars = true;
226        self
227    }
228
229    /// This will setup tracing based on the configuration passed in.
230    /// It will setup a stderr writer output layer and configure EnvFilter to honor RUST_LOG.
231    /// The EnvFilter will be applied to all configured layers, also otlp.
232    ///
233    /// It will also configure otlp if the feature is enabled and a service_name was provided. It
234    /// will then correctly setup a channel which is later used for flushing the provider.
235    pub fn build(self) -> Result<TracingHandle, Error> {
236        self.build_with_additional(Identity::new())
237    }
238
239    /// Similar to `build()` but allows passing in an additional tracing [`Layer`].
240    ///
241    /// This method is generic over the `Layer` to avoid the runtime cost of dynamic dispatch.
242    /// While it only allows passing a single `Layer`, it can be composed of multiple ones:
243    ///
244    /// ```ignore
245    /// build_with_additional(
246    ///   fmt::layer()
247    ///     .and_then(some_other_layer)
248    ///     .and_then(yet_another_layer)
249    ///     .with_filter(my_filter)
250    /// )
251    /// ```
252    /// [`Layer`]: tracing_subscriber::layer::Layer
253    pub fn build_with_additional<L>(self, additional_layer: L) -> Result<TracingHandle, Error>
254    where
255        L: Layer<Registry> + Send + Sync + 'static,
256    {
257        // Set up the tracing subscriber.
258        let indicatif_layer = IndicatifLayer::new().with_progress_style(PB_SPINNER_STYLE.clone());
259        let stdout_writer = indicatif_layer.get_stdout_writer();
260        let stderr_writer = indicatif_layer.get_stderr_writer();
261
262        let layered = tracing_subscriber::fmt::Layer::new()
263            .fmt_fields(FilteredFormatFields::new(
264                tracing_subscriber::fmt::format::DefaultFields::new(),
265                |field| field.name() != "indicatif.pb_show",
266            ))
267            .with_writer(indicatif_layer.get_stderr_writer())
268            .compact()
269            .and_then((!self.disable_progress_bars).then(|| {
270                indicatif_layer.with_filter(
271                    // only show progress for spans with indicatif.pb_show field being set
272                    IndicatifFilter::new(false),
273                )
274            }));
275
276        #[cfg(feature = "chrome")]
277        let (layered, chrome_guard) = if self.tracers.contains(Tracer::ChromeStyle) {
278            let (chrome_layer, guard) = tracing_chrome::ChromeLayerBuilder::new()
279                .include_args(true)
280                .trace_style(tracing_chrome::TraceStyle::Async)
281                .build();
282            (
283                Layer::and_then(layered, Some(chrome_layer)),
284                Some(std::rc::Rc::new(guard)),
285            )
286        } else {
287            (Layer::and_then(layered, None), None)
288        };
289
290        #[cfg(feature = "otlp")]
291        let mut g_tracer_provider = None;
292        #[cfg(feature = "otlp")]
293        let mut g_meter_provider = None;
294
295        // Setup otlp if a service_name is configured
296        #[cfg(feature = "otlp")]
297        let layered = Layer::and_then(layered, {
298            self.tracers.contains(Tracer::Otlp).then(|| {
299                use opentelemetry::trace::TracerProvider;
300
301                // register a text map propagator for trace propagation
302                opentelemetry::global::set_text_map_propagator(TraceContextPropagator::new());
303
304                let tracer_provider =
305                    gen_tracer_provider().expect("Unable to configure trace provider");
306
307                let meter_provider =
308                    gen_meter_provider().expect("Unable to configure meter provider");
309
310                // Register the returned meter provider as the global one.
311                // FUTUREWORK: store in the struct and provide getter too?
312                opentelemetry::global::set_meter_provider(meter_provider.clone());
313
314                g_tracer_provider = Some(tracer_provider.clone());
315                g_meter_provider = Some(meter_provider);
316
317                // Create a tracing layer with the configured tracer
318                tracing_opentelemetry::layer().with_tracer(tracer_provider.tracer("snix"))
319            })
320        });
321
322        #[cfg(feature = "tracy")]
323        let layered = Layer::and_then(
324            layered,
325            self.tracers.contains(Tracer::Tracy).then(|| {
326                let _client = tracing_tracy::client::Client::start();
327                TracyLayer::default()
328            }),
329        );
330
331        let layered = layered.with_filter(construct_filter(self.level.to_owned()));
332
333        tracing_subscriber::registry()
334            // TODO: if additional_layer has global filters, there is a risk that it will disable the "default" ones,
335            // while it could be solved by registering `additional_layer` last, it requires boxing `additional_layer`.
336            .with(additional_layer)
337            .with(layered)
338            .try_init()?;
339
340        Ok(TracingHandle {
341            stdout_writer,
342            stderr_writer,
343
344            #[cfg(feature = "otlp")]
345            meter_provider: g_meter_provider,
346            #[cfg(feature = "otlp")]
347            tracer_provider: g_tracer_provider,
348            #[cfg(feature = "chrome")]
349            chrome_guard,
350        })
351    }
352
353    #[cfg(feature = "clap")]
354    /// Configure with verbosity flags.
355    pub fn handle_verbosity_flags<L: LogLevel>(mut self, args: &Verbosity<L>) -> Self {
356        if args.is_silent() {
357            self.level = ChosenLevel::NoOutput;
358            self.disable_progress_bars = true;
359            return self;
360        }
361
362        use std::io::IsTerminal;
363        if !std::io::stderr().is_terminal() {
364            self.disable_progress_bars = true
365        }
366
367        if args.is_present() {
368            self.level = ChosenLevel::Level(args.tracing_level().expect("not silent"));
369        } else {
370            self.level = ChosenLevel::Unset(args.tracing_level().expect("not silent"))
371        }
372
373        self
374    }
375
376    #[cfg(feature = "clap")]
377    /// Configure with the tracing-related args.
378    pub fn handle_tracing_args<L: LogLevel>(
379        #[allow(unused_mut)] mut self,
380        args: &TracingArgs<L>,
381    ) -> Self {
382        #[cfg(any(feature = "otlp", feature = "tracy", feature = "chrome"))]
383        {
384            self = self.enable_tracers(args.tracers());
385        }
386
387        self.handle_verbosity_flags(&args.verbosity)
388    }
389}
390
/// Builds the OpenTelemetry [`Resource`] shared by the tracer and meter providers,
/// populated through the [`SdkProvidedResourceDetector`].
#[cfg(feature = "otlp")]
fn gen_resources() -> Resource {
    let detector = Box::new(SdkProvidedResourceDetector);
    Resource::builder().with_detector(detector).build()
}
398
/// Returns an OTLP tracer provider, exporting spans in batches via tonic, using the
/// default export config and the resources from [`gen_resources`].
///
/// # Errors
///
/// Returns the [`opentelemetry_otlp::ExporterBuildError`] if the span exporter cannot
/// be built.
#[cfg(feature = "otlp")]
fn gen_tracer_provider()
-> Result<opentelemetry_sdk::trace::SdkTracerProvider, opentelemetry_otlp::ExporterBuildError> {
    use opentelemetry_otlp::{ExportConfig, SpanExporter, WithExportConfig};

    let exporter = SpanExporter::builder()
        .with_tonic()
        .with_export_config(ExportConfig::default())
        .build()?;

    // FUTUREWORK: it is unclear how to pass a custom batch config here. The intent was:
    //  - a max export batch size of 4096, as the default of 512 fills up pretty quickly
    //    and then triggers an export before the schedule is met,
    //  - a max queue size of 4096 * 4 (analog to the default of `max_export_batch_size * 4`),
    //  - a scheduled delay of 10 seconds, to reduce the amount of error messages if an
    //    otlp collector is not available.
    Ok(opentelemetry_sdk::trace::SdkTracerProvider::builder()
        .with_batch_exporter(exporter)
        .with_resource(gen_resources())
        .build())
}
436
// The metric export interval should be at most 15s if the metrics may be
// converted to Prometheus metrics: Prometheus' query engine and compatible
// implementations require ~4 data points / interval for range queries,
// so queries ranging over 1m require <= 15s scrape intervals.
// OTEL SDKs also respect the env var `OTEL_METRIC_EXPORT_INTERVAL` (no underscore prefix).
const _OTEL_METRIC_EXPORT_INTERVAL: std::time::Duration = std::time::Duration::from_secs(10);
444
/// Returns an OTLP meter provider exporting metrics via tonic every
/// `_OTEL_METRIC_EXPORT_INTERVAL`, using the resources from [`gen_resources`].
///
/// Fails with the [`opentelemetry_otlp::ExporterBuildError`] if the metric exporter
/// cannot be built.
#[cfg(feature = "otlp")]
fn gen_meter_provider()
-> Result<opentelemetry_sdk::metrics::SdkMeterProvider, opentelemetry_otlp::ExporterBuildError> {
    use opentelemetry_otlp::{MetricExporter, WithExportConfig};
    use opentelemetry_sdk::metrics::{PeriodicReader, SdkMeterProvider};
    use std::time::Duration;

    let export_timeout = Duration::from_secs(10);
    let metric_exporter = MetricExporter::builder()
        .with_tonic()
        .with_timeout(export_timeout)
        .build()?;

    let periodic_reader = PeriodicReader::builder(metric_exporter)
        .with_interval(_OTEL_METRIC_EXPORT_INTERVAL)
        .build();

    let provider = SdkMeterProvider::builder()
        .with_reader(periodic_reader)
        .with_resource(gen_resources())
        .build();

    Ok(provider)
}
466
// CLI arguments related to tracing: the tracers to enable (if any tracer feature is
// compiled in) and the verbosity flags.
//
// NOTE: plain `//` comments are used on purpose, as clap turns doc comments into help text.
#[cfg(feature = "clap")]
#[derive(clap::Parser, Clone)]
pub struct TracingArgs<L: LogLevel = InfoLevel> {
    /// Which tracers to enable.
    #[cfg(any(feature = "otlp", feature = "tracy", feature = "chrome"))]
    #[arg(long, value_enum, action(clap::ArgAction::Append))]
    tracer: Vec<Tracer>,

    // The verbosity flags, `L` determines the default level.
    #[clap(flatten)]
    verbosity: Verbosity<L>,
}
478
#[cfg(feature = "clap")]
impl<L: LogLevel> TracingArgs<L> {
    /// Returns the set of tracers requested on the command line.
    /// A tracer given more than once is only contained once.
    #[cfg(any(feature = "otlp", feature = "tracy", feature = "chrome"))]
    pub fn tracers(&self) -> EnumSet<Tracer> {
        let requested = self.tracer.iter().copied();
        requested.collect::<EnumSet<Tracer>>()
    }
}
486
487/// Helper assembling a filter filtering events for the [ChosenLevel].
488fn construct_filter<S>(level: ChosenLevel) -> impl tracing_subscriber::layer::Filter<S> {
489    let mut b = EnvFilter::builder();
490    if let ChosenLevel::Unset(level) = level {
491        b = b.with_default_directive(level.to_owned().into());
492    }
493    let mut f = b.from_env().expect("invalid RUST_LOG");
494    if let ChosenLevel::Level(level) = level {
495        f = f.add_directive(level.to_owned().into());
496    }
497    f
498}