1use std::sync::Arc;
77
78use abscissa_core::{config, Command, FrameworkError};
79use color_eyre::eyre::{eyre, Report};
80use futures::FutureExt;
81use tokio::{pin, select, sync::oneshot};
82use tower::{builder::ServiceBuilder, util::BoxService, ServiceExt};
83use tracing_futures::Instrument;
84
85use zebra_chain::block::genesis::regtest_genesis_block;
86use zebra_consensus::router::BackgroundTaskHandles;
87use zebra_rpc::{methods::RpcImpl, server::RpcServer, SubmitBlockChannel};
88
89use crate::{
90 application::{build_version, user_agent, LAST_WARN_ERROR_LOG_SENDER},
91 components::{
92 health,
93 inbound::{self, InboundSetupData, MAX_INBOUND_RESPONSE_TIME},
94 mempool::{self, Mempool},
95 sync::{self, show_block_chain_progress, VERIFICATION_PIPELINE_SCALING_MULTIPLIER},
96 tokio::{RuntimeRun, TokioComponent},
97 ChainSync, Inbound,
98 },
99 config::ZebradConfig,
100 prelude::*,
101};
102
103#[cfg(feature = "internal-miner")]
104use crate::components;
105
// Command that starts the zebrad node (see the `Runnable` impl for `StartCmd`).
//
// NOTE(review): plain `//` comments are used on purpose. clap's derive macros turn
// `///` doc comments into help/about text, so switching to `///` would change the
// CLI output — confirm before doing so.
#[derive(Command, Debug, Default, clap::Parser)]
pub struct StartCmd {
    // Tracing filter directives given on the command line.
    // When non-empty, `override_config` joins them with `,` and stores the result in
    // `config.tracing.filter`, replacing the value loaded from the config file.
    #[clap(help = "tracing filters which override the zebrad.toml config")]
    filters: Vec<String>,
}
113
114#[cfg(target_os = "linux")]
119fn check_tcp_slow_start_after_idle() {
120 const PATH: &str = "/proc/sys/net/ipv4/tcp_slow_start_after_idle";
121
122 let raw = match std::fs::read_to_string(PATH) {
123 Ok(raw) => raw,
124 Err(error) => {
125 debug!(
126 ?error,
127 path = PATH,
128 "could not read TCP sysctl, skipping check"
129 );
130 return;
131 }
132 };
133
134 if raw.trim() == "0" {
135 return;
136 }
137
138 warn!(
139 setting = "net.ipv4.tcp_slow_start_after_idle",
140 "TCP slow-start-after-idle is enabled, which resets TCP's congestion window \
141 between block requests and significantly reduces single-peer throughput for \
142 block propagation. \
143 Hint: set `net.ipv4.tcp_slow_start_after_idle=0` via sysctl. \
144 See https://zebra.zfnd.org/user/troubleshooting.html#linux-tcp-tuning-for-block-propagation"
145 );
146}
147
/// No-op on targets other than Linux: there is no sysctl check to perform here.
#[cfg(not(target_os = "linux"))]
fn check_tcp_slow_start_after_idle() {}
150
151impl StartCmd {
152 async fn start(&self) -> Result<(), Report> {
153 check_tcp_slow_start_after_idle();
154
155 let config = APPLICATION.config();
156 let is_regtest = config.network.network.is_regtest();
157
158 let config = if is_regtest {
159 Arc::new(ZebradConfig {
160 mempool: mempool::Config {
161 debug_enable_at_height: Some(0),
162 ..config.mempool
163 },
164 ..Arc::unwrap_or_clone(config)
165 })
166 } else {
167 config
168 };
169
170 info!("initializing node state");
171 let (_, max_checkpoint_height) = zebra_consensus::router::init_checkpoint_list(
172 config.consensus.clone(),
173 &config.network.network,
174 );
175
176 info!("opening database, this may take a few minutes");
177
178 let (state_service, read_only_state_service, latest_chain_tip, chain_tip_change) =
179 zebra_state::init(
180 config.state.clone(),
181 &config.network.network,
182 max_checkpoint_height,
183 config.sync.checkpoint_verify_concurrency_limit
184 * (VERIFICATION_PIPELINE_SCALING_MULTIPLIER + 1),
185 )
186 .await;
187
188 info!("logging database metrics on startup");
189 read_only_state_service.log_db_metrics();
190
191 let state = ServiceBuilder::new()
192 .buffer(Self::state_buffer_bound())
193 .service(state_service);
194
195 info!("initializing network");
196 let (setup_tx, setup_rx) = oneshot::channel();
207 let inbound = ServiceBuilder::new()
208 .load_shed()
209 .buffer(inbound::downloads::MAX_INBOUND_CONCURRENCY)
210 .timeout(MAX_INBOUND_RESPONSE_TIME)
211 .service(Inbound::new(
212 config.sync.full_verify_concurrency_limit,
213 setup_rx,
214 ));
215
216 let (peer_set, address_book, misbehavior_sender) = zebra_network::init(
217 config.network.clone(),
218 inbound,
219 latest_chain_tip.clone(),
220 user_agent(),
221 )
222 .await;
223
224 info!("initializing verifiers");
227 let (tx_verifier_setup_tx, tx_verifier_setup_rx) = oneshot::channel();
228 let (block_verifier_router, tx_verifier, consensus_task_handles, max_checkpoint_height) =
229 zebra_consensus::router::init(
230 config.consensus.clone(),
231 &config.network.network,
232 state.clone(),
233 tx_verifier_setup_rx,
234 )
235 .await;
236
237 info!("initializing syncer");
238 let (mut syncer, sync_status) = ChainSync::new(
239 &config,
240 max_checkpoint_height,
241 peer_set.clone(),
242 block_verifier_router.clone(),
243 state.clone(),
244 latest_chain_tip.clone(),
245 misbehavior_sender.clone(),
246 );
247
248 info!("initializing mempool");
249 let (mempool, mempool_transaction_subscriber) = Mempool::new(
250 &config.mempool,
251 peer_set.clone(),
252 state.clone(),
253 tx_verifier,
254 sync_status.clone(),
255 latest_chain_tip.clone(),
256 chain_tip_change.clone(),
257 misbehavior_sender.clone(),
258 );
259 let mempool = BoxService::new(mempool);
260 let mempool = ServiceBuilder::new()
261 .buffer(mempool::downloads::MAX_INBOUND_CONCURRENCY)
262 .service(mempool);
263
264 if tx_verifier_setup_tx.send(mempool.clone()).is_err() {
265 warn!("error setting up the transaction verifier with a handle to the mempool service");
266 };
267
268 info!("fully initializing inbound peer request handler");
269 let setup_data = InboundSetupData {
271 address_book: address_book.clone(),
272 block_download_peer_set: peer_set.clone(),
273 block_verifier: block_verifier_router.clone(),
274 mempool: mempool.clone(),
275 state: state.clone(),
276 latest_chain_tip: latest_chain_tip.clone(),
277 misbehavior_sender,
278 };
279 setup_tx
280 .send(setup_data)
281 .map_err(|_| eyre!("could not send setup data to inbound service"))?;
282 tokio::task::yield_now().await;
284
285 let submit_block_channel = SubmitBlockChannel::new();
287
288 let (rpc_impl, mut rpc_tx_queue_handle) = RpcImpl::new(
290 config.network.network.clone(),
291 config.mining.clone(),
292 config.rpc.debug_force_finished_sync,
293 build_version(),
294 user_agent(),
295 mempool.clone(),
296 state.clone(),
297 read_only_state_service.clone(),
298 block_verifier_router.clone(),
299 sync_status.clone(),
300 latest_chain_tip.clone(),
301 address_book.clone(),
302 LAST_WARN_ERROR_LOG_SENDER.subscribe(),
303 Some(submit_block_channel.sender()),
304 );
305
306 let rpc_task_handle = if config.rpc.listen_addr.is_some() {
307 RpcServer::start(rpc_impl.clone(), config.rpc.clone())
308 .await
309 .expect("server should start")
310 } else {
311 tokio::spawn(std::future::pending().in_current_span())
312 };
313
314 let indexer_rpc_task_handle = {
317 if let Some(indexer_listen_addr) = config.rpc.indexer_listen_addr {
318 info!("spawning indexer RPC server");
319 let (indexer_rpc_task_handle, _listen_addr) = zebra_rpc::indexer::server::init(
320 indexer_listen_addr,
321 read_only_state_service.clone(),
322 latest_chain_tip.clone(),
323 mempool_transaction_subscriber.clone(),
324 )
325 .await
326 .map_err(|err| eyre!(err))?;
327
328 indexer_rpc_task_handle
329 } else {
330 warn!("configure an indexer_listen_addr to start the indexer RPC server");
331 tokio::spawn(std::future::pending().in_current_span())
332 }
333 };
334
335 info!("spawning block gossip task");
337 let block_gossip_task_handle = tokio::spawn(
338 sync::gossip_best_tip_block_hashes(
339 sync_status.clone(),
340 chain_tip_change.clone(),
341 peer_set.clone(),
342 Some(submit_block_channel.receiver()),
343 )
344 .in_current_span(),
345 );
346
347 info!("spawning mempool queue checker task");
348 let mempool_queue_checker_task_handle = mempool::QueueChecker::spawn(mempool.clone());
349
350 info!("spawning mempool transaction gossip task");
351 let tx_gossip_task_handle = tokio::spawn(
352 mempool::gossip_mempool_transaction_id(
353 mempool_transaction_subscriber.subscribe(),
354 peer_set.clone(),
355 )
356 .in_current_span(),
357 );
358
359 info!("spawning delete old databases task");
360 let mut old_databases_task_handle = zebra_state::check_and_delete_old_state_databases(
361 &config.state,
362 &config.network.network,
363 );
364
365 info!("spawning progress logging task");
366 let (chain_tip_metrics_sender, chain_tip_metrics_receiver) =
367 health::ChainTipMetrics::channel();
368 let progress_task_handle = tokio::spawn(
369 show_block_chain_progress(
370 config.network.network.clone(),
371 latest_chain_tip.clone(),
372 sync_status.clone(),
373 chain_tip_metrics_sender,
374 )
375 .in_current_span(),
376 );
377
378 info!("initializing health endpoints");
380 let (health_task_handle, _) = health::init(
381 config.health.clone(),
382 config.network.network.clone(),
383 chain_tip_metrics_receiver,
384 sync_status.clone(),
385 address_book.clone(),
386 )
387 .await;
388
389 info!("spawning end of support checking task");
391 let end_of_support_task_handle = tokio::spawn(
392 sync::end_of_support::start(config.network.network.clone(), latest_chain_tip.clone())
393 .in_current_span(),
394 );
395
396 tokio::task::yield_now().await;
400
401 info!("spawning mempool crawler task");
403 let mempool_crawler_task_handle = mempool::Crawler::spawn(
404 &config.mempool,
405 peer_set,
406 mempool.clone(),
407 sync_status.clone(),
408 chain_tip_change.clone(),
409 );
410
411 info!("spawning syncer task");
412 if is_regtest
418 && !syncer
419 .state_contains(config.network.network.genesis_hash())
420 .await?
421 {
422 let genesis_hash = block_verifier_router
423 .clone()
424 .oneshot(zebra_consensus::Request::Commit(regtest_genesis_block()))
425 .await
426 .expect("should validate Regtest genesis block");
427
428 assert_eq!(
429 genesis_hash,
430 config.network.network.genesis_hash(),
431 "validated block hash should match network genesis hash"
432 )
433 }
434 let syncer_task_handle = tokio::spawn(syncer.sync().in_current_span());
435
436 #[cfg(feature = "internal-miner")]
440 let miner_task_handle = if config.mining.is_internal_miner_enabled() {
441 info!("spawning Zcash miner");
442 components::miner::spawn_init(&config.metrics, rpc_impl)
443 } else {
444 tokio::spawn(std::future::pending().in_current_span())
445 };
446
447 #[cfg(not(feature = "internal-miner"))]
448 let miner_task_handle: tokio::task::JoinHandle<Result<(), Report>> =
450 tokio::spawn(std::future::pending().in_current_span());
451
452 info!("spawned initial Zebra tasks");
453
454 pin!(rpc_task_handle);
458 pin!(indexer_rpc_task_handle);
459 pin!(syncer_task_handle);
460 pin!(block_gossip_task_handle);
461 pin!(mempool_crawler_task_handle);
462 pin!(mempool_queue_checker_task_handle);
463 pin!(tx_gossip_task_handle);
464 pin!(progress_task_handle);
465 pin!(end_of_support_task_handle);
466 pin!(miner_task_handle);
467
468 let BackgroundTaskHandles {
470 mut state_checkpoint_verify_handle,
471 } = consensus_task_handles;
472
473 let state_checkpoint_verify_handle_fused = (&mut state_checkpoint_verify_handle).fuse();
474 pin!(state_checkpoint_verify_handle_fused);
475
476 let old_databases_task_handle_fused = (&mut old_databases_task_handle).fuse();
477 pin!(old_databases_task_handle_fused);
478
479 let exit_status = loop {
481 let mut exit_when_task_finishes = true;
482
483 let result = select! {
484 rpc_join_result = &mut rpc_task_handle => {
485 let rpc_server_result = rpc_join_result
486 .expect("unexpected panic in the rpc task");
487 info!(?rpc_server_result, "rpc task exited");
488 Ok(())
489 }
490
491 rpc_tx_queue_result = &mut rpc_tx_queue_handle => {
492 rpc_tx_queue_result
493 .expect("unexpected panic in the rpc transaction queue task");
494 info!("rpc transaction queue task exited");
495 Ok(())
496 }
497
498 indexer_rpc_join_result = &mut indexer_rpc_task_handle => {
499 let indexer_rpc_server_result = indexer_rpc_join_result
500 .expect("unexpected panic in the indexer task");
501 info!(?indexer_rpc_server_result, "indexer rpc task exited");
502 Ok(())
503 }
504
505 sync_result = &mut syncer_task_handle => sync_result
506 .expect("unexpected panic in the syncer task")
507 .map(|_| info!("syncer task exited")),
508
509 block_gossip_result = &mut block_gossip_task_handle => block_gossip_result
510 .expect("unexpected panic in the chain tip block gossip task")
511 .map(|_| info!("chain tip block gossip task exited"))
512 .map_err(|e| eyre!(e)),
513
514 mempool_crawl_result = &mut mempool_crawler_task_handle => mempool_crawl_result
515 .expect("unexpected panic in the mempool crawler")
516 .map(|_| info!("mempool crawler task exited"))
517 .map_err(|e| eyre!(e)),
518
519 mempool_queue_result = &mut mempool_queue_checker_task_handle => mempool_queue_result
520 .expect("unexpected panic in the mempool queue checker")
521 .map(|_| info!("mempool queue checker task exited"))
522 .map_err(|e| eyre!(e)),
523
524 tx_gossip_result = &mut tx_gossip_task_handle => tx_gossip_result
525 .expect("unexpected panic in the transaction gossip task")
526 .map(|_| info!("transaction gossip task exited"))
527 .map_err(|e| eyre!(e)),
528
529 progress_result = &mut progress_task_handle => {
532 info!("chain progress task exited");
533 progress_result
534 .expect("unexpected panic in the chain progress task");
535 }
536
537 end_of_support_result = &mut end_of_support_task_handle => end_of_support_result
538 .expect("unexpected panic in the end of support task")
539 .map(|_| info!("end of support task exited")),
540
541 state_checkpoint_verify_result = &mut state_checkpoint_verify_handle_fused => {
543 state_checkpoint_verify_result
544 .unwrap_or_else(|_| panic!(
545 "unexpected panic checking previous state followed the best chain"));
546
547 exit_when_task_finishes = false;
548 Ok(())
549 }
550
551 old_databases_result = &mut old_databases_task_handle_fused => {
553 old_databases_result
554 .unwrap_or_else(|_| panic!(
555 "unexpected panic deleting old database directories"));
556
557 exit_when_task_finishes = false;
558 Ok(())
559 }
560
561 miner_result = &mut miner_task_handle => miner_result
562 .expect("unexpected panic in the miner task")
563 .map(|_| info!("miner task exited")),
564 };
565
566 if let Err(err) = result {
569 break Err(err);
570 }
571
572 if exit_when_task_finishes {
573 break Ok(());
574 }
575 };
576
577 info!("exiting Zebra because an ongoing task exited: asking other tasks to stop");
578
579 rpc_task_handle.abort();
581 rpc_tx_queue_handle.abort();
582 health_task_handle.abort();
583 syncer_task_handle.abort();
584 block_gossip_task_handle.abort();
585 mempool_crawler_task_handle.abort();
586 mempool_queue_checker_task_handle.abort();
587 tx_gossip_task_handle.abort();
588 progress_task_handle.abort();
589 end_of_support_task_handle.abort();
590 miner_task_handle.abort();
591
592 state_checkpoint_verify_handle.abort();
594 old_databases_task_handle.abort();
595
596 info!(
597 "exiting Zebra: all tasks have been asked to stop, waiting for remaining tasks to finish"
598 );
599
600 exit_status
601 }
602
603 fn state_buffer_bound() -> usize {
606 let config = APPLICATION.config();
607
608 [
613 config.sync.download_concurrency_limit,
614 config.sync.full_verify_concurrency_limit,
615 inbound::downloads::MAX_INBOUND_CONCURRENCY,
616 mempool::downloads::MAX_INBOUND_CONCURRENCY,
617 ]
618 .into_iter()
619 .max()
620 .unwrap()
621 }
622}
623
624impl Runnable for StartCmd {
625 fn run(&self) {
627 info!("Starting zebrad");
628 let rt = APPLICATION
629 .state()
630 .components_mut()
631 .get_downcast_mut::<TokioComponent>()
632 .expect("TokioComponent should be available")
633 .rt
634 .take();
635
636 rt.expect("runtime should not already be taken")
637 .run(self.start());
638
639 info!("stopping zebrad");
640 }
641}
642
643impl config::Override<ZebradConfig> for StartCmd {
644 fn override_config(&self, mut config: ZebradConfig) -> Result<ZebradConfig, FrameworkError> {
648 if !self.filters.is_empty() {
649 config.tracing.filter = Some(self.filters.join(","));
650 }
651
652 Ok(config)
653 }
654}