1use std::sync::Arc;
77
78use abscissa_core::{config, Command, FrameworkError};
79use color_eyre::eyre::{eyre, Report};
80use futures::FutureExt;
81use tokio::{pin, select, sync::oneshot};
82use tower::{builder::ServiceBuilder, util::BoxService, ServiceExt};
83use tracing_futures::Instrument;
84
85use zebra_chain::block::genesis::regtest_genesis_block;
86use zebra_consensus::router::BackgroundTaskHandles;
87use zebra_rpc::{methods::RpcImpl, server::RpcServer, SubmitBlockChannel};
88
89use crate::{
90 application::{build_version, user_agent, LAST_WARN_ERROR_LOG_SENDER},
91 components::{
92 health,
93 inbound::{self, InboundSetupData, MAX_INBOUND_RESPONSE_TIME},
94 mempool::{self, Mempool},
95 sync::{self, show_block_chain_progress, VERIFICATION_PIPELINE_SCALING_MULTIPLIER},
96 tokio::{RuntimeRun, TokioComponent},
97 ChainSync, Inbound,
98 },
99 config::ZebradConfig,
100 prelude::*,
101};
102
103#[cfg(feature = "internal-miner")]
104use crate::components;
105
106#[derive(Command, Debug, Default, clap::Parser)]
108pub struct StartCmd {
109 #[clap(help = "tracing filters which override the zebrad.toml config")]
111 filters: Vec<String>,
112}
113
114impl StartCmd {
115 async fn start(&self) -> Result<(), Report> {
116 let config = APPLICATION.config();
117 let is_regtest = config.network.network.is_regtest();
118
119 let config = if is_regtest {
120 Arc::new(ZebradConfig {
121 mempool: mempool::Config {
122 debug_enable_at_height: Some(0),
123 ..config.mempool
124 },
125 ..Arc::unwrap_or_clone(config)
126 })
127 } else {
128 config
129 };
130
131 info!("initializing node state");
132 let (_, max_checkpoint_height) = zebra_consensus::router::init_checkpoint_list(
133 config.consensus.clone(),
134 &config.network.network,
135 );
136
137 info!("opening database, this may take a few minutes");
138
139 let (state_service, read_only_state_service, latest_chain_tip, chain_tip_change) =
140 zebra_state::init(
141 config.state.clone(),
142 &config.network.network,
143 max_checkpoint_height,
144 config.sync.checkpoint_verify_concurrency_limit
145 * (VERIFICATION_PIPELINE_SCALING_MULTIPLIER + 1),
146 )
147 .await;
148
149 info!("logging database metrics on startup");
150 read_only_state_service.log_db_metrics();
151
152 let state = ServiceBuilder::new()
153 .buffer(Self::state_buffer_bound())
154 .service(state_service);
155
156 info!("initializing network");
157 let (setup_tx, setup_rx) = oneshot::channel();
168 let inbound = ServiceBuilder::new()
169 .load_shed()
170 .buffer(inbound::downloads::MAX_INBOUND_CONCURRENCY)
171 .timeout(MAX_INBOUND_RESPONSE_TIME)
172 .service(Inbound::new(
173 config.sync.full_verify_concurrency_limit,
174 setup_rx,
175 ));
176
177 let (peer_set, address_book, misbehavior_sender) = zebra_network::init(
178 config.network.clone(),
179 inbound,
180 latest_chain_tip.clone(),
181 user_agent(),
182 )
183 .await;
184
185 info!("initializing verifiers");
188 let (tx_verifier_setup_tx, tx_verifier_setup_rx) = oneshot::channel();
189 let (block_verifier_router, tx_verifier, consensus_task_handles, max_checkpoint_height) =
190 zebra_consensus::router::init(
191 config.consensus.clone(),
192 &config.network.network,
193 state.clone(),
194 tx_verifier_setup_rx,
195 )
196 .await;
197
198 info!("initializing syncer");
199 let (mut syncer, sync_status) = ChainSync::new(
200 &config,
201 max_checkpoint_height,
202 peer_set.clone(),
203 block_verifier_router.clone(),
204 state.clone(),
205 latest_chain_tip.clone(),
206 misbehavior_sender.clone(),
207 );
208
209 info!("initializing mempool");
210 let (mempool, mempool_transaction_subscriber) = Mempool::new(
211 &config.mempool,
212 peer_set.clone(),
213 state.clone(),
214 tx_verifier,
215 sync_status.clone(),
216 latest_chain_tip.clone(),
217 chain_tip_change.clone(),
218 misbehavior_sender.clone(),
219 );
220 let mempool = BoxService::new(mempool);
221 let mempool = ServiceBuilder::new()
222 .buffer(mempool::downloads::MAX_INBOUND_CONCURRENCY)
223 .service(mempool);
224
225 if tx_verifier_setup_tx.send(mempool.clone()).is_err() {
226 warn!("error setting up the transaction verifier with a handle to the mempool service");
227 };
228
229 info!("fully initializing inbound peer request handler");
230 let setup_data = InboundSetupData {
232 address_book: address_book.clone(),
233 block_download_peer_set: peer_set.clone(),
234 block_verifier: block_verifier_router.clone(),
235 mempool: mempool.clone(),
236 state: state.clone(),
237 latest_chain_tip: latest_chain_tip.clone(),
238 misbehavior_sender,
239 };
240 setup_tx
241 .send(setup_data)
242 .map_err(|_| eyre!("could not send setup data to inbound service"))?;
243 tokio::task::yield_now().await;
245
246 let submit_block_channel = SubmitBlockChannel::new();
248
249 let (rpc_impl, mut rpc_tx_queue_handle) = RpcImpl::new(
251 config.network.network.clone(),
252 config.mining.clone(),
253 config.rpc.debug_force_finished_sync,
254 build_version(),
255 user_agent(),
256 mempool.clone(),
257 state.clone(),
258 read_only_state_service.clone(),
259 block_verifier_router.clone(),
260 sync_status.clone(),
261 latest_chain_tip.clone(),
262 address_book.clone(),
263 LAST_WARN_ERROR_LOG_SENDER.subscribe(),
264 Some(submit_block_channel.sender()),
265 );
266
267 let rpc_task_handle = if config.rpc.listen_addr.is_some() {
268 RpcServer::start(rpc_impl.clone(), config.rpc.clone())
269 .await
270 .expect("server should start")
271 } else {
272 tokio::spawn(std::future::pending().in_current_span())
273 };
274
275 let indexer_rpc_task_handle = {
278 if let Some(indexer_listen_addr) = config.rpc.indexer_listen_addr {
279 info!("spawning indexer RPC server");
280 let (indexer_rpc_task_handle, _listen_addr) = zebra_rpc::indexer::server::init(
281 indexer_listen_addr,
282 read_only_state_service.clone(),
283 latest_chain_tip.clone(),
284 mempool_transaction_subscriber.clone(),
285 )
286 .await
287 .map_err(|err| eyre!(err))?;
288
289 indexer_rpc_task_handle
290 } else {
291 warn!("configure an indexer_listen_addr to start the indexer RPC server");
292 tokio::spawn(std::future::pending().in_current_span())
293 }
294 };
295
296 info!("spawning block gossip task");
298 let block_gossip_task_handle = tokio::spawn(
299 sync::gossip_best_tip_block_hashes(
300 sync_status.clone(),
301 chain_tip_change.clone(),
302 peer_set.clone(),
303 Some(submit_block_channel.receiver()),
304 )
305 .in_current_span(),
306 );
307
308 info!("spawning mempool queue checker task");
309 let mempool_queue_checker_task_handle = mempool::QueueChecker::spawn(mempool.clone());
310
311 info!("spawning mempool transaction gossip task");
312 let tx_gossip_task_handle = tokio::spawn(
313 mempool::gossip_mempool_transaction_id(
314 mempool_transaction_subscriber.subscribe(),
315 peer_set.clone(),
316 )
317 .in_current_span(),
318 );
319
320 info!("spawning delete old databases task");
321 let mut old_databases_task_handle = zebra_state::check_and_delete_old_state_databases(
322 &config.state,
323 &config.network.network,
324 );
325
326 info!("spawning progress logging task");
327 let (chain_tip_metrics_sender, chain_tip_metrics_receiver) =
328 health::ChainTipMetrics::channel();
329 let progress_task_handle = tokio::spawn(
330 show_block_chain_progress(
331 config.network.network.clone(),
332 latest_chain_tip.clone(),
333 sync_status.clone(),
334 chain_tip_metrics_sender,
335 )
336 .in_current_span(),
337 );
338
339 info!("initializing health endpoints");
341 let (health_task_handle, _) = health::init(
342 config.health.clone(),
343 config.network.network.clone(),
344 chain_tip_metrics_receiver,
345 sync_status.clone(),
346 address_book.clone(),
347 )
348 .await;
349
350 info!("spawning end of support checking task");
352 let end_of_support_task_handle = tokio::spawn(
353 sync::end_of_support::start(config.network.network.clone(), latest_chain_tip.clone())
354 .in_current_span(),
355 );
356
357 tokio::task::yield_now().await;
361
362 info!("spawning mempool crawler task");
364 let mempool_crawler_task_handle = mempool::Crawler::spawn(
365 &config.mempool,
366 peer_set,
367 mempool.clone(),
368 sync_status.clone(),
369 chain_tip_change.clone(),
370 );
371
372 info!("spawning syncer task");
373 let syncer_task_handle = if is_regtest {
374 if !syncer
375 .state_contains(config.network.network.genesis_hash())
376 .await?
377 {
378 let genesis_hash = block_verifier_router
379 .clone()
380 .oneshot(zebra_consensus::Request::Commit(regtest_genesis_block()))
381 .await
382 .expect("should validate Regtest genesis block");
383
384 assert_eq!(
385 genesis_hash,
386 config.network.network.genesis_hash(),
387 "validated block hash should match network genesis hash"
388 )
389 }
390
391 tokio::spawn(std::future::pending().in_current_span())
392 } else {
393 tokio::spawn(syncer.sync().in_current_span())
394 };
395
396 #[cfg(feature = "internal-miner")]
400 let miner_task_handle = if config.mining.is_internal_miner_enabled() {
401 info!("spawning Zcash miner");
402 components::miner::spawn_init(&config.metrics, rpc_impl)
403 } else {
404 tokio::spawn(std::future::pending().in_current_span())
405 };
406
407 #[cfg(not(feature = "internal-miner"))]
408 let miner_task_handle: tokio::task::JoinHandle<Result<(), Report>> =
410 tokio::spawn(std::future::pending().in_current_span());
411
412 info!("spawned initial Zebra tasks");
413
414 pin!(rpc_task_handle);
418 pin!(indexer_rpc_task_handle);
419 pin!(syncer_task_handle);
420 pin!(block_gossip_task_handle);
421 pin!(mempool_crawler_task_handle);
422 pin!(mempool_queue_checker_task_handle);
423 pin!(tx_gossip_task_handle);
424 pin!(progress_task_handle);
425 pin!(end_of_support_task_handle);
426 pin!(miner_task_handle);
427
428 let BackgroundTaskHandles {
430 mut state_checkpoint_verify_handle,
431 } = consensus_task_handles;
432
433 let state_checkpoint_verify_handle_fused = (&mut state_checkpoint_verify_handle).fuse();
434 pin!(state_checkpoint_verify_handle_fused);
435
436 let old_databases_task_handle_fused = (&mut old_databases_task_handle).fuse();
437 pin!(old_databases_task_handle_fused);
438
439 let exit_status = loop {
441 let mut exit_when_task_finishes = true;
442
443 let result = select! {
444 rpc_join_result = &mut rpc_task_handle => {
445 let rpc_server_result = rpc_join_result
446 .expect("unexpected panic in the rpc task");
447 info!(?rpc_server_result, "rpc task exited");
448 Ok(())
449 }
450
451 rpc_tx_queue_result = &mut rpc_tx_queue_handle => {
452 rpc_tx_queue_result
453 .expect("unexpected panic in the rpc transaction queue task");
454 info!("rpc transaction queue task exited");
455 Ok(())
456 }
457
458 indexer_rpc_join_result = &mut indexer_rpc_task_handle => {
459 let indexer_rpc_server_result = indexer_rpc_join_result
460 .expect("unexpected panic in the indexer task");
461 info!(?indexer_rpc_server_result, "indexer rpc task exited");
462 Ok(())
463 }
464
465 sync_result = &mut syncer_task_handle => sync_result
466 .expect("unexpected panic in the syncer task")
467 .map(|_| info!("syncer task exited")),
468
469 block_gossip_result = &mut block_gossip_task_handle => block_gossip_result
470 .expect("unexpected panic in the chain tip block gossip task")
471 .map(|_| info!("chain tip block gossip task exited"))
472 .map_err(|e| eyre!(e)),
473
474 mempool_crawl_result = &mut mempool_crawler_task_handle => mempool_crawl_result
475 .expect("unexpected panic in the mempool crawler")
476 .map(|_| info!("mempool crawler task exited"))
477 .map_err(|e| eyre!(e)),
478
479 mempool_queue_result = &mut mempool_queue_checker_task_handle => mempool_queue_result
480 .expect("unexpected panic in the mempool queue checker")
481 .map(|_| info!("mempool queue checker task exited"))
482 .map_err(|e| eyre!(e)),
483
484 tx_gossip_result = &mut tx_gossip_task_handle => tx_gossip_result
485 .expect("unexpected panic in the transaction gossip task")
486 .map(|_| info!("transaction gossip task exited"))
487 .map_err(|e| eyre!(e)),
488
489 progress_result = &mut progress_task_handle => {
492 info!("chain progress task exited");
493 progress_result
494 .expect("unexpected panic in the chain progress task");
495 }
496
497 end_of_support_result = &mut end_of_support_task_handle => end_of_support_result
498 .expect("unexpected panic in the end of support task")
499 .map(|_| info!("end of support task exited")),
500
501 state_checkpoint_verify_result = &mut state_checkpoint_verify_handle_fused => {
503 state_checkpoint_verify_result
504 .unwrap_or_else(|_| panic!(
505 "unexpected panic checking previous state followed the best chain"));
506
507 exit_when_task_finishes = false;
508 Ok(())
509 }
510
511 old_databases_result = &mut old_databases_task_handle_fused => {
513 old_databases_result
514 .unwrap_or_else(|_| panic!(
515 "unexpected panic deleting old database directories"));
516
517 exit_when_task_finishes = false;
518 Ok(())
519 }
520
521 miner_result = &mut miner_task_handle => miner_result
522 .expect("unexpected panic in the miner task")
523 .map(|_| info!("miner task exited")),
524 };
525
526 if let Err(err) = result {
529 break Err(err);
530 }
531
532 if exit_when_task_finishes {
533 break Ok(());
534 }
535 };
536
537 info!("exiting Zebra because an ongoing task exited: asking other tasks to stop");
538
539 rpc_task_handle.abort();
541 rpc_tx_queue_handle.abort();
542 health_task_handle.abort();
543 syncer_task_handle.abort();
544 block_gossip_task_handle.abort();
545 mempool_crawler_task_handle.abort();
546 mempool_queue_checker_task_handle.abort();
547 tx_gossip_task_handle.abort();
548 progress_task_handle.abort();
549 end_of_support_task_handle.abort();
550 miner_task_handle.abort();
551
552 state_checkpoint_verify_handle.abort();
554 old_databases_task_handle.abort();
555
556 info!(
557 "exiting Zebra: all tasks have been asked to stop, waiting for remaining tasks to finish"
558 );
559
560 exit_status
561 }
562
563 fn state_buffer_bound() -> usize {
566 let config = APPLICATION.config();
567
568 [
573 config.sync.download_concurrency_limit,
574 config.sync.full_verify_concurrency_limit,
575 inbound::downloads::MAX_INBOUND_CONCURRENCY,
576 mempool::downloads::MAX_INBOUND_CONCURRENCY,
577 ]
578 .into_iter()
579 .max()
580 .unwrap()
581 }
582}
583
584impl Runnable for StartCmd {
585 fn run(&self) {
587 info!("Starting zebrad");
588 let rt = APPLICATION
589 .state()
590 .components_mut()
591 .get_downcast_mut::<TokioComponent>()
592 .expect("TokioComponent should be available")
593 .rt
594 .take();
595
596 rt.expect("runtime should not already be taken")
597 .run(self.start());
598
599 info!("stopping zebrad");
600 }
601}
602
603impl config::Override<ZebradConfig> for StartCmd {
604 fn override_config(&self, mut config: ZebradConfig) -> Result<ZebradConfig, FrameworkError> {
608 if !self.filters.is_empty() {
609 config.tracing.filter = Some(self.filters.join(","));
610 }
611
612 Ok(config)
613 }
614}