snix_tracing/
lib.rs

1#![cfg_attr(docsrs, feature(doc_cfg))]
2
3#[cfg(feature = "clap")]
4use clap_verbosity_flag::{InfoLevel, LogLevel, Verbosity};
5#[cfg(any(feature = "otlp", feature = "tracy", feature = "chrome"))]
6use enumset::EnumSet;
7use std::sync::LazyLock;
8use tracing::{Level, level_filters::LevelFilter};
9use tracing_indicatif::{
10    IndicatifLayer, IndicatifWriter, filter::IndicatifFilter, style::ProgressStyle,
11    util::FilteredFormatFields, writer,
12};
13use tracing_subscriber::{
14    EnvFilter, Layer, Registry,
15    layer::{Identity, SubscriberExt},
16    util::SubscriberInitExt as _,
17};
18
19#[cfg(feature = "otlp")]
20use opentelemetry_sdk::{
21    Resource, propagation::TraceContextPropagator, resource::SdkProvidedResourceDetector,
22};
23#[cfg(feature = "tracy")]
24use tracing_tracy::TracyLayer;
25
26pub mod propagate;
27
28pub static PB_PROGRESS_STYLE: LazyLock<ProgressStyle> = LazyLock::new(|| {
29    ProgressStyle::with_template(
30        "{span_child_prefix} {wide_msg} {bar:10} ({elapsed}) {pos:>7}/{len:7}",
31    )
32    .expect("invalid progress template")
33});
34pub static PB_TRANSFER_STYLE: LazyLock<ProgressStyle> = LazyLock::new(|| {
35    ProgressStyle::with_template(
36        "{span_child_prefix} {wide_msg} {binary_bytes:>7}/{binary_total_bytes:7}@{decimal_bytes_per_sec} ({elapsed}) {bar:10} "
37    )
38    .expect("invalid progress template")
39});
40pub static PB_SPINNER_STYLE: LazyLock<ProgressStyle> = LazyLock::new(|| {
41    ProgressStyle::with_template(
42        "{span_child_prefix}{spinner} {wide_msg} ({elapsed}) {pos:>7}/{len:7}",
43    )
44    .expect("invalid progress template")
45});
46
47#[derive(thiserror::Error, Debug)]
48pub enum Error {
49    #[error(transparent)]
50    Init(#[from] tracing_subscriber::util::TryInitError),
51
52    #[cfg(feature = "otlp")]
53    #[error(transparent)]
54    OTEL(#[from] opentelemetry_sdk::error::OTelSdkError),
55}
56
/// Handle to the tracing setup, returned by [`TracingBuilder::build`] and
/// [`TracingBuilder::build_with_additional`].
///
/// Hands out progress-bar-aware stdout/stderr writers and, with the `otlp` feature, allows
/// flushing and shutting down the OpenTelemetry providers that were configured.
// NOTE: fields are dropped in declaration order, so keep the order stable.
#[derive(Clone)]
pub struct TracingHandle {
    // Writers obtained from the indicatif layer, so that output does not clobber progress bars.
    stdout_writer: IndicatifWriter<writer::Stdout>,
    stderr_writer: IndicatifWriter<writer::Stderr>,

    // Never read: the guard is only held so that it lives as long as the handle (clones share
    // it through the `Rc`, so it is dropped together with the last clone).
    // NOTE(review): presumably dropping the `FlushGuard` is what flushes the chrome trace, and
    // `Rc` (rather than `Arc`) is used because the guard is not `Sync` — confirm. Either way,
    // `Rc` makes `TracingHandle` `!Send` when the `chrome` feature is enabled.
    #[cfg(feature = "chrome")]
    #[allow(dead_code)]
    chrome_guard: Option<std::rc::Rc<tracing_chrome::FlushGuard>>,

    // Set only when the OTLP tracer was enabled; used by `flush()` / `shutdown()`.
    #[cfg(feature = "otlp")]
    meter_provider: Option<opentelemetry_sdk::metrics::SdkMeterProvider>,

    // Set only when the OTLP tracer was enabled; used by `flush()` / `shutdown()`.
    #[cfg(feature = "otlp")]
    tracer_provider: Option<opentelemetry_sdk::trace::SdkTracerProvider>,
}
72
73impl TracingHandle {
74    /// Returns a writer for [std::io::Stdout] that ensures its output will not be clobbered by
75    /// active progress bars.
76    ///
77    /// Instead of `println!(...)` prefer `writeln!(handle.get_stdout_writer(), ...)`
78    pub fn get_stdout_writer(&self) -> IndicatifWriter<writer::Stdout> {
79        // clone is fine here because its only a wrapper over an `Arc`
80        self.stdout_writer.clone()
81    }
82
83    /// Returns a writer for [std::io::Stderr] that ensures its output will not be clobbered by
84    /// active progress bars.
85    ///
86    /// Instead of `println!(...)` prefer `writeln!(handle.get_stderr_writer(), ...)`.
87    pub fn get_stderr_writer(&self) -> IndicatifWriter<writer::Stderr> {
88        // clone is fine here because its only a wrapper over an `Arc`
89        self.stderr_writer.clone()
90    }
91
92    /// This will flush possible attached tracing providers, e.g. otlp exported, if enabled.
93    /// If there is none enabled this will result in a noop.
94    ///
95    /// It will wait until the flush is complete.
96    pub async fn flush(&self) -> Result<(), Error> {
97        #[cfg(feature = "otlp")]
98        {
99            if let Some(tracer_provider) = &self.tracer_provider {
100                tracer_provider.force_flush()?;
101            }
102            if let Some(meter_provider) = &self.meter_provider {
103                meter_provider.force_flush()?;
104            }
105        }
106        Ok(())
107    }
108
109    /// This will flush all attached tracing providers and will wait until the flush is completed, then call shutdown.
110    /// If no tracing providers like otlp are attached then this will be a noop.
111    ///
112    /// This should only be called on a regular shutdown.
113    pub async fn shutdown(&self) -> Result<(), Error> {
114        self.flush().await?;
115        #[cfg(feature = "otlp")]
116        {
117            if let Some(tracer_provider) = &self.tracer_provider {
118                tracer_provider.shutdown()?;
119            }
120            if let Some(meter_provider) = &self.meter_provider {
121                meter_provider.shutdown()?;
122            }
123        }
124
125        Ok(())
126    }
127}
128
/// Optional tracing backends that can be enabled on a [`TracingBuilder`]. Each variant only
/// exists when its corresponding cargo feature is enabled.
// NOTE(review): variant order presumably determines the `EnumSet` bit positions and the order
// clap lists the possible values in — avoid reordering.
#[cfg(any(feature = "otlp", feature = "tracy", feature = "chrome"))]
#[derive(enumset::EnumSetType, Debug)]
#[cfg_attr(feature = "clap", derive(clap::ValueEnum))]
pub enum Tracer {
    // Plain `//` comments on purpose: with `clap`, `///` docs would become possible-value help.
    // OpenTelemetry: configures an OTLP tracer provider and a global meter provider.
    #[cfg(feature = "otlp")]
    Otlp,
    // Adds a `tracing_tracy::TracyLayer`.
    #[cfg(feature = "tracy")]
    Tracy,
    // Adds a `tracing_chrome` layer (async trace style, span args included).
    #[cfg(feature = "chrome")]
    ChromeStyle,
}
140
141#[must_use = "Don't forget to call build() to enable tracing."]
142#[derive(Default)]
143pub struct TracingBuilder {
144    progess_bar: bool,
145
146    #[cfg(any(feature = "otlp", feature = "tracy", feature = "chrome"))]
147    tracers: EnumSet<Tracer>,
148
149    quiet: bool,
150    verbosity: Option<Level>,
151}
152
impl TracingBuilder {
    #[cfg(any(feature = "otlp", feature = "tracy", feature = "chrome"))]
    /// Enable the given tracer.
    pub fn enable_tracer(mut self, tracer: Tracer) -> TracingBuilder {
        self.tracers.insert(tracer);
        self
    }

    #[cfg(any(feature = "otlp", feature = "tracy", feature = "chrome"))]
    /// Enable the given tracers, in addition to any already enabled ones.
    pub fn enable_tracers<I>(mut self, tracers: I) -> TracingBuilder
    where
        I: IntoIterator<Item = Tracer>,
    {
        self.tracers.extend(tracers);
        self
    }

    /// Enable the progress bar layer; disabled by default.
    ///
    /// Has no effect if [`quiet_output`](Self::quiet_output) is also set.
    pub fn enable_progressbar(mut self) -> TracingBuilder {
        self.progess_bar = true;
        self
    }

    /// Suppress log and progress bar output to stderr.
    pub fn quiet_output(mut self) -> TracingBuilder {
        self.quiet = true;
        self
    }

    /// Sets the maximum verbosity level.
    ///
    /// Without this, the level is taken from `RUST_LOG`, defaulting to INFO. When set, the level
    /// is added as an extra directive on top of the `RUST_LOG`-based filter.
    pub fn with_max_level(mut self, level: Level) -> TracingBuilder {
        self.verbosity = Some(level);
        self
    }

    /// Sets up tracing based on the configuration passed in and installs it as the global
    /// default subscriber.
    ///
    /// Unless [`quiet_output`](Self::quiet_output) was called, this sets up a compact stderr
    /// output layer (plus progress bars if [`enable_progressbar`](Self::enable_progressbar) was
    /// called). An `EnvFilter` honoring `RUST_LOG` (default level INFO) is applied to all of
    /// these layers, including any tracers (OTLP, Tracy, Chrome) enabled via `enable_tracer` /
    /// `enable_tracers` whose cargo feature is active.
    ///
    /// If the OTLP tracer is enabled, its tracer and meter providers are stored in the returned
    /// [`TracingHandle`], which is later used to flush and shut them down.
    ///
    /// # Errors
    /// Returns [`Error::Init`] if the global subscriber could not be installed.
    ///
    /// # Panics
    /// Panics if `RUST_LOG` cannot be parsed, or if the OTLP tracer/meter provider cannot be
    /// configured.
    pub fn build(self) -> Result<TracingHandle, Error> {
        self.build_with_additional(Identity::new())
    }

    /// Similar to `build()` but allows passing in an additional tracing [`Layer`].
    ///
    /// This method is generic over the `Layer` to avoid the runtime cost of dynamic dispatch.
    /// While it only allows passing a single `Layer`, it can be composed of multiple ones:
    ///
    /// ```ignore
    /// build_with_additional(
    ///   fmt::layer()
    ///     .and_then(some_other_layer)
    ///     .and_then(yet_another_layer)
    ///     .with_filter(my_filter)
    /// )
    /// ```
    ///
    /// `additional_layer` is registered next to the built-in layers and is *not* subject to the
    /// `EnvFilter` those use; give it its own filter if needed.
    ///
    /// # Errors
    /// Returns [`Error::Init`] if the global subscriber could not be installed.
    ///
    /// # Panics
    /// Panics if `RUST_LOG` cannot be parsed, or if the OTLP tracer/meter provider cannot be
    /// configured.
    ///
    /// [`Layer`]: tracing_subscriber::layer::Layer
    pub fn build_with_additional<L>(self, additional_layer: L) -> Result<TracingHandle, Error>
    where
        L: Layer<Registry> + Send + Sync + 'static,
    {
        // Set up the tracing subscriber.
        // The writers are taken up front: `indicatif_layer` may be moved into the fmt layer
        // below, but the handle needs the writers even in quiet mode.
        let indicatif_layer = IndicatifLayer::new().with_progress_style(PB_SPINNER_STYLE.clone());
        let stdout_writer = indicatif_layer.get_stdout_writer();
        let stderr_writer = indicatif_layer.get_stderr_writer();

        // stderr log output (and optionally progress bars), unless quiet output was requested.
        let layered = if !self.quiet {
            Some(
                tracing_subscriber::fmt::Layer::new()
                    // hide the `indicatif.pb_show` marker field from the log output
                    .fmt_fields(FilteredFormatFields::new(
                        tracing_subscriber::fmt::format::DefaultFields::new(),
                        |field| field.name() != "indicatif.pb_show",
                    ))
                    .with_writer(indicatif_layer.get_stderr_writer())
                    .compact()
                    .and_then((self.progess_bar).then(|| {
                        indicatif_layer.with_filter(
                            // only show progress for spans with indicatif.pb_show field being set
                            IndicatifFilter::new(false),
                        )
                    })),
            )
        } else {
            None
        };
        // Optional Chrome-style trace; its guard is kept alive in the returned handle.
        #[cfg(feature = "chrome")]
        let (layered, chrome_guard) = if self.tracers.contains(Tracer::ChromeStyle) {
            let (chrome_layer, guard) = tracing_chrome::ChromeLayerBuilder::new()
                .include_args(true)
                .trace_style(tracing_chrome::TraceStyle::Async)
                .build();
            (
                Layer::and_then(layered, Some(chrome_layer)),
                Some(std::rc::Rc::new(guard)),
            )
        } else {
            (Layer::and_then(layered, None), None)
        };

        // Filled in by the OTLP setup below and handed to the returned `TracingHandle`.
        #[cfg(feature = "otlp")]
        let mut g_tracer_provider = None;
        #[cfg(feature = "otlp")]
        let mut g_meter_provider = None;

        // Set up OTLP if `Tracer::Otlp` was enabled.
        #[cfg(feature = "otlp")]
        let layered = Layer::and_then(layered, {
            self.tracers.contains(Tracer::Otlp).then(|| {
                use opentelemetry::trace::TracerProvider;

                // register a text map propagator for trace propagation
                opentelemetry::global::set_text_map_propagator(TraceContextPropagator::new());

                let tracer_provider =
                    gen_tracer_provider().expect("Unable to configure trace provider");

                let meter_provider =
                    gen_meter_provider().expect("Unable to configure meter provider");

                // Register the returned meter provider as the global one.
                // FUTUREWORK: it is stored in the returned `TracingHandle`; provide a getter too?
                opentelemetry::global::set_meter_provider(meter_provider.clone());

                g_tracer_provider = Some(tracer_provider.clone());
                g_meter_provider = Some(meter_provider);

                // Create a tracing layer with the configured tracer
                tracing_opentelemetry::layer().with_tracer(tracer_provider.tracer("snix"))
            })
        });

        #[cfg(feature = "tracy")]
        let layered = Layer::and_then(
            layered,
            self.tracers
                .contains(Tracer::Tracy)
                .then(TracyLayer::default),
        );

        // One EnvFilter (RUST_LOG, default INFO, plus the optional max level) over all of the
        // layers composed above.
        let layered = layered.with_filter({
            let b = EnvFilter::builder()
                .with_default_directive(LevelFilter::INFO.into())
                .from_env()
                .expect("invalid RUST_LOG");
            if let Some(level) = self.verbosity {
                b.add_directive(level.into())
            } else {
                b
            }
        });

        tracing_subscriber::registry()
            // TODO: if additional_layer has global filters, there is a risk that it will disable the "default" ones,
            // while it could be solved by registering `additional_layer` last, it requires boxing `additional_layer`.
            .with(additional_layer)
            .with(layered)
            .try_init()?;

        Ok(TracingHandle {
            stdout_writer,
            stderr_writer,

            #[cfg(feature = "otlp")]
            meter_provider: g_meter_provider,
            #[cfg(feature = "otlp")]
            tracer_provider: g_tracer_provider,
            #[cfg(feature = "chrome")]
            chrome_guard,
        })
    }

    #[cfg(feature = "clap")]
    /// Configures the verbosity from `clap-verbosity-flag` arguments.
    ///
    /// If the flags resolve to a level, it is used as the maximum level (see
    /// [`with_max_level`](Self::with_max_level)); if they resolve to no level, output is
    /// suppressed via [`quiet_output`](Self::quiet_output).
    pub fn handle_verbosity_flags<L: LogLevel>(mut self, args: &Verbosity<L>) -> Self {
        if let Some(level) = args.tracing_level() {
            self = self.with_max_level(level)
        } else {
            self = self.quiet_output()
        }

        self
    }

    #[cfg(feature = "clap")]
    /// Configures the builder from [`TracingArgs`]: enables the requested tracers (when a
    /// tracer feature is enabled) and applies the verbosity flags.
    pub fn handle_tracing_args<L: LogLevel>(
        // `mut` is only needed when one of the tracer features is enabled.
        #[allow(unused_mut)] mut self,
        args: &TracingArgs<L>,
    ) -> Self {
        #[cfg(any(feature = "otlp", feature = "tracy", feature = "chrome"))]
        {
            self = self.enable_tracers(args.tracers());
        }
        self.handle_verbosity_flags(&args.verbosity)
    }
}
352
/// Builds the OpenTelemetry [`Resource`] attached to both the tracer and the meter provider,
/// populated by the SDK-provided resource detector.
#[cfg(feature = "otlp")]
fn gen_resources() -> Resource {
    let sdk_detector = Box::new(SdkProvidedResourceDetector);
    Resource::builder().with_detector(sdk_detector).build()
}
360
/// Builds the OTLP tracer provider: spans are exported over gRPC (tonic) with the default
/// `ExportConfig` through a batch exporter, tagged with the resources from `gen_resources`.
///
/// # Errors
/// Returns an error if the OTLP span exporter cannot be built.
#[cfg(feature = "otlp")]
fn gen_tracer_provider()
-> Result<opentelemetry_sdk::trace::SdkTracerProvider, opentelemetry::trace::TraceError> {
    use opentelemetry_otlp::{ExportConfig, SpanExporter, WithExportConfig};
    use opentelemetry_sdk::trace::SdkTracerProvider;

    let exporter = SpanExporter::builder()
        .with_tonic()
        .with_export_config(ExportConfig::default())
        .build()?;

    // TODO: tune the batch processing instead of relying on the SDK defaults. The intended
    // settings were:
    // - `max_export_batch_size = 4096`, so that exports happen on schedule rather than as
    //   soon as the default 512 spans are collected;
    // - `max_queue_size = 4096 * 4`, analog to the default `max_export_batch_size * 4`;
    // - a 10s scheduled delay, to reduce error messages when no OTLP collector is available.
    // This needs a `BatchConfigBuilder` fed into `BatchSpanProcessor::builder(..)`.
    Ok(SdkTracerProvider::builder()
        .with_batch_exporter(exporter)
        .with_resource(gen_resources())
        .build())
}
398
/// Interval at which the periodic reader in `gen_meter_provider` exports metrics.
///
/// The metric export interval should be less than or equal to 15s if the metrics may be
/// converted to Prometheus metrics: Prometheus' query engine and compatible implementations
/// require ~4 data points per interval for range queries, so queries ranging over 1m require
/// scrape intervals of at most 15s.
///
/// OTel SDKs also respect the env var `OTEL_METRIC_EXPORT_INTERVAL` (no underscore prefix).
/// NOTE(review): `gen_meter_provider` passes this value explicitly via `with_interval`, which
/// presumably takes precedence over that env var — confirm against the SDK docs.
///
/// The leading underscore keeps this warning-free when the `otlp` feature (its only user) is
/// disabled.
const _OTEL_METRIC_EXPORT_INTERVAL: std::time::Duration = std::time::Duration::from_secs(10);
406
/// Builds the OTLP meter provider.
///
/// Metrics are exported over gRPC (tonic) with a 10s export timeout, pushed by a periodic
/// reader every `_OTEL_METRIC_EXPORT_INTERVAL`, and tagged with the resources from
/// `gen_resources`.
///
/// # Errors
/// Returns an error if the OTLP metric exporter cannot be built.
#[cfg(feature = "otlp")]
fn gen_meter_provider()
-> Result<opentelemetry_sdk::metrics::SdkMeterProvider, opentelemetry_sdk::metrics::MetricError> {
    use opentelemetry_otlp::{MetricExporter, WithExportConfig};
    use opentelemetry_sdk::metrics::{PeriodicReader, SdkMeterProvider};
    use std::time::Duration;

    let metric_exporter = MetricExporter::builder()
        .with_tonic()
        .with_timeout(Duration::from_secs(10))
        .build()?;

    let periodic_reader = PeriodicReader::builder(metric_exporter)
        .with_interval(_OTEL_METRIC_EXPORT_INTERVAL)
        .build();

    let provider = SdkMeterProvider::builder()
        .with_reader(periodic_reader)
        .with_resource(gen_resources())
        .build();
    Ok(provider)
}
428
// Command-line arguments for configuring tracing. They can be flattened into an application's
// own clap arguments and applied with `TracingBuilder::handle_tracing_args`.
// (Plain `//` comments on purpose: clap turns `///` doc comments into user-visible help text.)
#[cfg(feature = "clap")]
#[derive(clap::Parser, Clone)]
pub struct TracingArgs<L: LogLevel = InfoLevel> {
    #[cfg(any(feature = "otlp", feature = "tracy", feature = "chrome"))]
    /// Which tracers to enable.
    #[arg(long, value_enum, action(clap::ArgAction::Append))]
    tracer: Vec<Tracer>,

    // clap 4 spelling for flattening a field; `#[clap(...)]` on fields is the legacy form.
    #[command(flatten)]
    verbosity: Verbosity<L>,
}
440
#[cfg(feature = "clap")]
impl<L: LogLevel> TracingArgs<L> {
    /// Returns the set of tracers requested via `--tracer`; repeated values collapse into a
    /// single entry.
    #[cfg(any(feature = "otlp", feature = "tracy", feature = "chrome"))]
    pub fn tracers(&self) -> EnumSet<Tracer> {
        // `Tracer` is `Copy` (required by `EnumSetType`), so copy rather than clone.
        self.tracer.iter().copied().collect()
    }
}