zebrad/components/tracing/
component.rs1use std::{
4 fs::{self, File},
5 io::Write,
6};
7
8use abscissa_core::{Component, FrameworkError, Shutdown};
9
10use tokio::sync::watch;
11use tracing::{field::Visit, Level};
12use tracing_appender::non_blocking::{NonBlocking, NonBlockingBuilder, WorkerGuard};
13use tracing_error::ErrorLayer;
14use tracing_subscriber::{
15 fmt::{format, Formatter},
16 layer::SubscriberExt,
17 reload::Handle,
18 util::SubscriberInitExt,
19 EnvFilter, Layer,
20};
21use zebra_chain::parameters::Network;
22
23use crate::{application::build_version, components::tracing::Config};
24
25#[cfg(feature = "flamegraph")]
26use super::flame;
27
28static ZEBRA_ART: [u8; include_bytes!("zebra.utf8").len()] = *include_bytes!("zebra.utf8");
48
/// A type-erased, heap-allocated writer that can be sent to and shared between threads.
///
/// `Tracing::new` uses this to hold either the opened log file or standard output
/// behind a single type before handing it to the non-blocking log writer.
pub type BoxWrite = Box<dyn Write + Send + Sync + 'static>;
51
52pub struct Tracing {
54 filter_handle: Option<
58 Handle<
59 EnvFilter,
60 Formatter<format::DefaultFields, format::Format<format::Full>, NonBlocking>,
61 >,
62 >,
63
64 initial_filter: String,
66
67 #[cfg(feature = "flamegraph")]
69 flamegrapher: Option<flame::Grapher>,
70
71 #[cfg(feature = "opentelemetry")]
73 otel_provider: Option<opentelemetry_sdk::trace::SdkTracerProvider>,
74
75 _guard: Option<WorkerGuard>,
80}
81
82impl Tracing {
83 #[allow(clippy::print_stdout, clippy::print_stderr, clippy::unwrap_in_result)]
90 pub fn new(
91 network: &Network,
92 config: Config,
93 uses_intro: bool,
94 ) -> Result<Self, FrameworkError> {
95 let use_color = config.use_color_stdout();
98 let use_color_stderr = config.use_color_stderr();
99
100 let filter = config.filter.clone().unwrap_or_default();
101 let flame_root = &config.flamegraph;
102
103 if uses_intro && !network.is_regtest() {
106 if use_color_stderr {
110 eprint!("\x1B[2J");
112 eprintln!(
113 "{}",
114 std::str::from_utf8(&ZEBRA_ART)
115 .expect("should always work on a UTF-8 encoded constant")
116 );
117 }
118
119 eprintln!(
120 "Thank you for running a {} zebrad {} node!",
121 network.lowercase_name(),
122 build_version()
123 );
124 eprintln!(
125 "You're helping to strengthen the network and contributing to a social good :)"
126 );
127 }
128
129 let writer = if let Some(log_file) = config.log_file.as_ref() {
130 let log_file_dir = log_file.parent();
142 if let Some(log_file_dir) = log_file_dir {
143 if !log_file_dir.exists() {
144 eprintln!("Directory for log file {log_file:?} does not exist, trying to create it...");
145
146 if let Err(create_dir_error) = fs::create_dir_all(log_file_dir) {
147 eprintln!("Failed to create directory for log file: {create_dir_error}");
148 eprintln!("Trying log file anyway...");
149 }
150 }
151 }
152
153 if uses_intro {
154 println!("Sending logs to {log_file:?}...");
156 }
157 let log_file = File::options().append(true).create(true).open(log_file)?;
158 Box::new(log_file) as BoxWrite
159 } else {
160 let stdout = std::io::stdout();
161 Box::new(stdout) as BoxWrite
162 };
163
164 let (non_blocking, worker_guard) = NonBlockingBuilder::default()
168 .buffered_lines_limit(config.buffer_limit.max(100))
169 .finish(writer);
170
171 #[cfg(not(all(feature = "tokio-console", tokio_unstable)))]
176 let (subscriber, filter_handle) = {
177 use tracing_subscriber::FmtSubscriber;
178
179 let logger = FmtSubscriber::builder()
180 .with_ansi(use_color)
181 .with_writer(non_blocking)
182 .with_env_filter(&filter);
183
184 #[cfg(feature = "filter-reload")]
186 let (filter_handle, logger) = {
187 let logger = logger.with_filter_reloading();
188
189 (Some(logger.reload_handle()), logger)
190 };
191
192 #[cfg(not(feature = "filter-reload"))]
193 let filter_handle = None;
194
195 let warn_error_layer = LastWarnErrorLayer {
196 last_warn_error_sender: crate::application::LAST_WARN_ERROR_LOG_SENDER.clone(),
197 };
198 let subscriber = logger
199 .finish()
200 .with(warn_error_layer)
201 .with(ErrorLayer::default());
202
203 (subscriber, filter_handle)
204 };
205
206 #[cfg(all(feature = "tokio-console", tokio_unstable))]
212 let (subscriber, filter_handle) = {
213 use tracing_subscriber::{fmt, Layer};
214
215 let subscriber = tracing_subscriber::registry();
216 let logger = fmt::Layer::new()
223 .with_ansi(use_color)
224 .with_writer(non_blocking)
225 .with_filter(EnvFilter::from(&filter));
226
227 let subscriber = subscriber.with(logger);
228
229 let span_logger = ErrorLayer::default().with_filter(EnvFilter::from(&filter));
230 let subscriber = subscriber.with(span_logger);
231
232 (subscriber, None)
233 };
234
235 #[cfg(feature = "flamegraph")]
239 let (flamelayer, flamegrapher) = if let Some(path) = flame_root {
240 let (flamelayer, flamegrapher) = flame::layer(path);
241
242 (Some(flamelayer), Some(flamegrapher))
243 } else {
244 (None, None)
245 };
246 #[cfg(feature = "flamegraph")]
247 let subscriber = subscriber.with(flamelayer);
248
249 #[cfg(feature = "journald")]
250 let journaldlayer = if config.use_journald {
251 use abscissa_core::FrameworkErrorKind;
252
253 let layer = tracing_journald::layer()
254 .map_err(|e| FrameworkErrorKind::ComponentError.context(e))?;
255
256 #[cfg(all(feature = "tokio-console", tokio_unstable))]
259 let layer = {
260 use tracing_subscriber::Layer;
261 layer.with_filter(EnvFilter::from(&filter))
262 };
263
264 Some(layer)
265 } else {
266 None
267 };
268 #[cfg(feature = "journald")]
269 let subscriber = subscriber.with(journaldlayer);
270
271 #[cfg(feature = "sentry")]
272 let subscriber = subscriber.with(sentry::integrations::tracing::layer());
273
274 #[cfg(feature = "opentelemetry")]
276 let (otel_layer, otel_provider, otel_resolved_config) = {
277 let endpoint = config
279 .opentelemetry_endpoint
280 .clone()
281 .or_else(|| std::env::var("OTEL_EXPORTER_OTLP_ENDPOINT").ok());
282 let service_name = config
283 .opentelemetry_service_name
284 .clone()
285 .or_else(|| std::env::var("OTEL_SERVICE_NAME").ok());
286 let sample_percent = config.opentelemetry_sample_percent.or_else(|| {
287 std::env::var("OTEL_TRACES_SAMPLER_ARG")
288 .ok()
289 .and_then(|s| s.parse().ok())
290 });
291
292 let resolved_config = (
294 endpoint.clone(),
295 service_name.clone().unwrap_or_else(|| "zebra".to_string()),
296 sample_percent.unwrap_or(100),
297 );
298
299 match super::otel::layer(endpoint.as_deref(), service_name.as_deref(), sample_percent) {
300 Ok((layer, provider)) => (layer, provider, resolved_config),
301 Err(e) => {
302 tracing::warn!(
303 ?e,
304 "failed to initialize OpenTelemetry, traces will not be exported"
305 );
306 (None, None, resolved_config)
307 }
308 }
309 };
310 #[cfg(feature = "opentelemetry")]
311 let subscriber = subscriber.with(otel_layer);
312
313 #[cfg(all(feature = "tokio-console", tokio_unstable))]
316 let subscriber = subscriber.with(console_subscriber::spawn());
317
318 subscriber.init();
320
321 tracing::info!(
323 ?filter,
324 TRACING_STATIC_MAX_LEVEL = ?tracing::level_filters::STATIC_MAX_LEVEL,
325 LOG_STATIC_MAX_LEVEL = ?log::STATIC_MAX_LEVEL,
326 "started tracing component",
327 );
328
329 if flame_root.is_some() {
330 if cfg!(feature = "flamegraph") {
331 info!(flamegraph = ?flame_root, "installed flamegraph tracing layer");
332 } else {
333 warn!(
334 flamegraph = ?flame_root,
335 "unable to activate configured flamegraph: \
336 enable the 'flamegraph' feature when compiling zebrad",
337 );
338 }
339 }
340
341 if config.use_journald {
342 if cfg!(feature = "journald") {
343 info!("installed journald tracing layer");
344 } else {
345 warn!(
346 "unable to activate configured journald tracing: \
347 enable the 'journald' feature when compiling zebrad",
348 );
349 }
350 }
351
352 #[cfg(feature = "sentry")]
353 info!("installed sentry tracing layer");
354
355 #[cfg(all(feature = "tokio-console", tokio_unstable))]
356 info!(
357 TRACING_STATIC_MAX_LEVEL = ?tracing::level_filters::STATIC_MAX_LEVEL,
358 LOG_STATIC_MAX_LEVEL = ?log::STATIC_MAX_LEVEL,
359 "installed tokio-console tracing layer",
360 );
361
362 #[cfg(feature = "opentelemetry")]
364 if otel_provider.is_some() {
365 let (ref endpoint, ref service_name, sample_percent) = otel_resolved_config;
366 info!(
367 ?endpoint,
368 %service_name,
369 sample_percent,
370 "installed OpenTelemetry tracing layer",
371 );
372 }
373
374 #[cfg(not(feature = "opentelemetry"))]
376 if config.opentelemetry_endpoint.is_some() {
377 warn!(
378 endpoint = ?config.opentelemetry_endpoint,
379 "unable to activate OpenTelemetry tracing: \
380 enable the 'opentelemetry' feature when compiling zebrad",
381 );
382 }
383
384 #[cfg(feature = "progress-bar")]
388 if let Some(progress_bar_config) = config.progress_bar.as_ref() {
389 use howudoin::consumers::TermLine;
390 use std::time::Duration;
391
392 const PROGRESS_BAR_DEBOUNCE: Duration = Duration::from_secs(2);
394
395 let terminal_consumer = TermLine::with_debounce(PROGRESS_BAR_DEBOUNCE);
396 howudoin::init(terminal_consumer);
397
398 info!(?progress_bar_config, "activated progress bars");
399 } else {
400 info!(
401 "set 'tracing.progress_bar =\"summary\"' in zebrad.toml to activate progress bars"
402 );
403 }
404
405 Ok(Self {
406 filter_handle,
407 initial_filter: filter,
408 #[cfg(feature = "flamegraph")]
409 flamegrapher,
410 #[cfg(feature = "opentelemetry")]
411 otel_provider,
412 _guard: Some(worker_guard),
413 })
414 }
415
416 pub fn shutdown(&mut self) {
419 self.filter_handle.take();
420
421 #[cfg(feature = "flamegraph")]
422 self.flamegrapher.take();
423
424 #[cfg(feature = "opentelemetry")]
425 if let Some(provider) = self.otel_provider.take() {
426 if let Err(e) = provider.shutdown() {
427 tracing::warn!(?e, "OpenTelemetry shutdown error");
428 }
429 }
430
431 self._guard.take();
432 }
433
434 pub fn filter(&self) -> String {
436 if let Some(filter_handle) = self.filter_handle.as_ref() {
437 filter_handle
438 .with_current(|filter| filter.to_string())
439 .expect("the subscriber is not dropped before the component is")
440 } else {
441 self.initial_filter.clone()
442 }
443 }
444
445 pub fn reload_filter(&self, filter: impl Into<EnvFilter>) {
449 let filter = filter.into();
450
451 if let Some(filter_handle) = self.filter_handle.as_ref() {
452 tracing::info!(
453 ?filter,
454 TRACING_STATIC_MAX_LEVEL = ?tracing::level_filters::STATIC_MAX_LEVEL,
455 LOG_STATIC_MAX_LEVEL = ?log::STATIC_MAX_LEVEL,
456 "reloading tracing filter",
457 );
458
459 filter_handle
460 .reload(filter)
461 .expect("the subscriber is not dropped before the component is");
462 } else {
463 tracing::warn!(
464 ?filter,
465 TRACING_STATIC_MAX_LEVEL = ?tracing::level_filters::STATIC_MAX_LEVEL,
466 LOG_STATIC_MAX_LEVEL = ?log::STATIC_MAX_LEVEL,
467 "attempted to reload tracing filter, but filter reloading is disabled",
468 );
469 }
470 }
471}
472
473impl std::fmt::Debug for Tracing {
474 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
475 f.debug_struct("Tracing").finish()
476 }
477}
478
479impl<A: abscissa_core::Application> Component<A> for Tracing {
480 fn id(&self) -> abscissa_core::component::Id {
481 abscissa_core::component::Id::new("zebrad::components::tracing::component::Tracing")
482 }
483
484 fn version(&self) -> abscissa_core::Version {
485 build_version()
486 }
487
488 fn before_shutdown(&self, _kind: Shutdown) -> Result<(), FrameworkError> {
489 #[cfg(feature = "flamegraph")]
490 if let Some(ref grapher) = self.flamegrapher {
491 use abscissa_core::FrameworkErrorKind;
492
493 info!("writing flamegraph");
494
495 grapher
496 .write_flamegraph()
497 .map_err(|e| FrameworkErrorKind::ComponentError.context(e))?
498 }
499
500 #[cfg(feature = "progress-bar")]
501 howudoin::disable();
502
503 Ok(())
504 }
505}
506
507impl Drop for Tracing {
508 fn drop(&mut self) {
509 #[cfg(feature = "progress-bar")]
510 howudoin::disable();
511 }
512}
513
/// Field visitor that captures the `message` field of a tracing event.
struct MessageVisitor {
    /// The debug-formatted `message` field, or `None` if the visited event
    /// had no field with that name.
    message: Option<String>,
}
518
519impl Visit for MessageVisitor {
520 fn record_debug(&mut self, field: &tracing::field::Field, value: &dyn std::fmt::Debug) {
521 if field.name() == "message" {
522 self.message = Some(format!("{value:?}"));
523 }
524 }
525}
526
527#[derive(Debug, Clone)]
529struct LastWarnErrorLayer {
530 last_warn_error_sender: watch::Sender<Option<(String, Level, chrono::DateTime<chrono::Utc>)>>,
531}
532
533impl<S> Layer<S> for LastWarnErrorLayer
534where
535 S: tracing::Subscriber + for<'lookup> tracing_subscriber::registry::LookupSpan<'lookup>,
536{
537 fn on_event(
538 &self,
539 event: &tracing::Event<'_>,
540 _ctx: tracing_subscriber::layer::Context<'_, S>,
541 ) {
542 let level = *event.metadata().level();
543 let timestamp = chrono::Utc::now();
544
545 if level == Level::WARN || level == Level::ERROR {
546 let mut visitor = MessageVisitor { message: None };
547 event.record(&mut visitor);
548
549 if let Some(message) = visitor.message {
550 let _ = self
551 .last_warn_error_sender
552 .send(Some((message, level, timestamp)));
553 }
554 }
555 }
556}