Skip to main content

zebrad/commands/
start.rs

1//! `start` subcommand - entry point for starting a zebra node
2//!
3//! ## Application Structure
4//!
5//! A zebra node consists of the following major services and tasks:
6//!
7//! Peers:
8//!  * Peer Connection Pool Service
9//!    * primary external interface for outbound requests from this node to remote peers
10//!    * accepts requests from services and tasks in this node, and sends them to remote peers
11//!  * Peer Discovery Service
12//!    * maintains a list of peer addresses, and connection priority metadata
13//!    * discovers new peer addresses from existing peer connections
14//!    * initiates new outbound peer connections in response to demand from tasks within this node
15//!  * Peer Cache Service
16//!    * Reads previous peer cache on startup, and adds it to the configured DNS seed peers
17//!    * Periodically updates the peer cache on disk from the latest address book state
18//!
19//! Blocks & Mempool Transactions:
20//!  * Consensus Service
21//!    * handles all validation logic for the node
22//!    * verifies blocks using zebra-chain, then stores verified blocks in zebra-state
23//!    * verifies mempool and block transactions using zebra-chain and zebra-script,
24//!      and returns verified mempool transactions for mempool storage
25//!  * Inbound Service
26//!    * primary external interface for inbound peer requests to this node
27//!    * handles requests from peers for network data, chain data, and mempool transactions
28//!    * spawns download and verify tasks for each gossiped block
29//!    * sends gossiped transactions to the mempool service
30//!
31//! Blocks:
32//!  * Sync Task
33//!    * runs in the background and continuously queries the network for
34//!      new blocks to be verified and added to the local state
35//!    * spawns download and verify tasks for each crawled block
36//!  * State Service
37//!    * contextually verifies blocks
38//!    * handles in-memory storage of multiple non-finalized chains
39//!    * handles permanent storage of the best finalized chain
40//!  * Old State Version Cleanup Task
41//!    * deletes outdated state versions
42//!  * Block Gossip Task
43//!    * runs in the background and continuously queries the state for
44//!      newly committed blocks to be gossiped to peers
45//!  * Progress Task
46//!    * logs progress towards the chain tip
47//!
48//! Block Mining:
49//!  * Internal Miner Task
50//!    * if the user has configured Zebra to mine blocks, spawns tasks to generate new blocks,
51//!      and submits them for verification. This automatically shares these new blocks with peers.
52//!
53//! Mempool Transactions:
54//!  * Mempool Service
55//!    * activates when the syncer is near the chain tip
56//!    * spawns download and verify tasks for each crawled or gossiped transaction
57//!    * handles in-memory storage of unmined transactions
58//!  * Queue Checker Task
59//!    * runs in the background, polling the mempool to store newly verified transactions
60//!  * Transaction Gossip Task
61//!    * runs in the background and gossips newly added mempool transactions
62//!      to peers
63//!
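//! Remote Procedure Calls:
//!  * JSON-RPC Service
//!    * answers RPC client requests using the State Service and Mempool Service
//!    * submits client transactions to the node's mempool
//!  * Indexer RPC Service
//!    * only started when an `indexer_listen_addr` is configured
//!    * answers indexer client requests using the read-only State Service,
//!      the latest chain tip, and mempool transaction notifications
//!
//! Node Health:
//!  * Health Endpoints
//!    * started if configured, and report node health using chain tip metrics,
//!      the sync status, and the address book
//!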
69//! Zebra also has diagnostic support:
70//! * [metrics](https://github.com/ZcashFoundation/zebra/blob/main/book/src/user/metrics.md)
71//! * [tracing](https://github.com/ZcashFoundation/zebra/blob/main/book/src/user/tracing.md)
72//! * [progress-bar](https://docs.rs/howudoin/0.1.1/howudoin)
73//!
74//! Some of the diagnostic features are optional, and need to be enabled at compile-time.
75
76use std::sync::Arc;
77
78use abscissa_core::{config, Command, FrameworkError};
79use color_eyre::eyre::{eyre, Report};
80use futures::FutureExt;
81use tokio::{pin, select, sync::oneshot};
82use tower::{builder::ServiceBuilder, util::BoxService, ServiceExt};
83use tracing_futures::Instrument;
84
85use zebra_chain::block::genesis::regtest_genesis_block;
86use zebra_consensus::router::BackgroundTaskHandles;
87use zebra_rpc::{methods::RpcImpl, server::RpcServer, SubmitBlockChannel};
88
89use crate::{
90    application::{build_version, user_agent, LAST_WARN_ERROR_LOG_SENDER},
91    components::{
92        health,
93        inbound::{self, InboundSetupData, MAX_INBOUND_RESPONSE_TIME},
94        mempool::{self, Mempool},
95        sync::{self, show_block_chain_progress, VERIFICATION_PIPELINE_SCALING_MULTIPLIER},
96        tokio::{RuntimeRun, TokioComponent},
97        ChainSync, Inbound,
98    },
99    config::ZebradConfig,
100    prelude::*,
101};
102
103#[cfg(feature = "internal-miner")]
104use crate::components;
105
106/// Start the application (default command)
107#[derive(Command, Debug, Default, clap::Parser)]
108pub struct StartCmd {
109    /// Filter strings which override the config file and defaults
110    #[clap(help = "tracing filters which override the zebrad.toml config")]
111    filters: Vec<String>,
112}
113
114impl StartCmd {
115    async fn start(&self) -> Result<(), Report> {
116        let config = APPLICATION.config();
117        let is_regtest = config.network.network.is_regtest();
118
119        let config = if is_regtest {
120            Arc::new(ZebradConfig {
121                mempool: mempool::Config {
122                    debug_enable_at_height: Some(0),
123                    ..config.mempool
124                },
125                ..Arc::unwrap_or_clone(config)
126            })
127        } else {
128            config
129        };
130
131        info!("initializing node state");
132        let (_, max_checkpoint_height) = zebra_consensus::router::init_checkpoint_list(
133            config.consensus.clone(),
134            &config.network.network,
135        );
136
137        info!("opening database, this may take a few minutes");
138
139        let (state_service, read_only_state_service, latest_chain_tip, chain_tip_change) =
140            zebra_state::init(
141                config.state.clone(),
142                &config.network.network,
143                max_checkpoint_height,
144                config.sync.checkpoint_verify_concurrency_limit
145                    * (VERIFICATION_PIPELINE_SCALING_MULTIPLIER + 1),
146            )
147            .await;
148
149        info!("logging database metrics on startup");
150        read_only_state_service.log_db_metrics();
151
152        let state = ServiceBuilder::new()
153            .buffer(Self::state_buffer_bound())
154            .service(state_service);
155
156        info!("initializing network");
157        // The service that our node uses to respond to requests by peers. The
158        // load_shed middleware ensures that we reduce the size of the peer set
159        // in response to excess load.
160        //
161        // # Security
162        //
163        // This layer stack is security-sensitive, modifying it can cause hangs,
164        // or enable denial of service attacks.
165        //
166        // See `zebra_network::Connection::drive_peer_request()` for details.
167        let (setup_tx, setup_rx) = oneshot::channel();
168        let inbound = ServiceBuilder::new()
169            .load_shed()
170            .buffer(inbound::downloads::MAX_INBOUND_CONCURRENCY)
171            .timeout(MAX_INBOUND_RESPONSE_TIME)
172            .service(Inbound::new(
173                config.sync.full_verify_concurrency_limit,
174                setup_rx,
175            ));
176
177        let (peer_set, address_book, misbehavior_sender) = zebra_network::init(
178            config.network.clone(),
179            inbound,
180            latest_chain_tip.clone(),
181            user_agent(),
182        )
183        .await;
184
185        // Start health server if configured (after sync_status is available)
186
187        info!("initializing verifiers");
188        let (tx_verifier_setup_tx, tx_verifier_setup_rx) = oneshot::channel();
189        let (block_verifier_router, tx_verifier, consensus_task_handles, max_checkpoint_height) =
190            zebra_consensus::router::init(
191                config.consensus.clone(),
192                &config.network.network,
193                state.clone(),
194                tx_verifier_setup_rx,
195            )
196            .await;
197
198        info!("initializing syncer");
199        let (mut syncer, sync_status) = ChainSync::new(
200            &config,
201            max_checkpoint_height,
202            peer_set.clone(),
203            block_verifier_router.clone(),
204            state.clone(),
205            latest_chain_tip.clone(),
206            misbehavior_sender.clone(),
207        );
208
209        info!("initializing mempool");
210        let (mempool, mempool_transaction_subscriber) = Mempool::new(
211            &config.mempool,
212            peer_set.clone(),
213            state.clone(),
214            tx_verifier,
215            sync_status.clone(),
216            latest_chain_tip.clone(),
217            chain_tip_change.clone(),
218            misbehavior_sender.clone(),
219        );
220        let mempool = BoxService::new(mempool);
221        let mempool = ServiceBuilder::new()
222            .buffer(mempool::downloads::MAX_INBOUND_CONCURRENCY)
223            .service(mempool);
224
225        if tx_verifier_setup_tx.send(mempool.clone()).is_err() {
226            warn!("error setting up the transaction verifier with a handle to the mempool service");
227        };
228
229        info!("fully initializing inbound peer request handler");
230        // Fully start the inbound service as soon as possible
231        let setup_data = InboundSetupData {
232            address_book: address_book.clone(),
233            block_download_peer_set: peer_set.clone(),
234            block_verifier: block_verifier_router.clone(),
235            mempool: mempool.clone(),
236            state: state.clone(),
237            latest_chain_tip: latest_chain_tip.clone(),
238            misbehavior_sender,
239        };
240        setup_tx
241            .send(setup_data)
242            .map_err(|_| eyre!("could not send setup data to inbound service"))?;
243        // And give it time to clear its queue
244        tokio::task::yield_now().await;
245
246        // Create a channel to send mined blocks to the gossip task
247        let submit_block_channel = SubmitBlockChannel::new();
248
249        // Launch RPC server
250        let (rpc_impl, mut rpc_tx_queue_handle) = RpcImpl::new(
251            config.network.network.clone(),
252            config.mining.clone(),
253            config.rpc.debug_force_finished_sync,
254            build_version(),
255            user_agent(),
256            mempool.clone(),
257            state.clone(),
258            read_only_state_service.clone(),
259            block_verifier_router.clone(),
260            sync_status.clone(),
261            latest_chain_tip.clone(),
262            address_book.clone(),
263            LAST_WARN_ERROR_LOG_SENDER.subscribe(),
264            Some(submit_block_channel.sender()),
265        );
266
267        let rpc_task_handle = if config.rpc.listen_addr.is_some() {
268            RpcServer::start(rpc_impl.clone(), config.rpc.clone())
269                .await
270                .expect("server should start")
271        } else {
272            tokio::spawn(std::future::pending().in_current_span())
273        };
274
275        // TODO: Add a shutdown signal and start the server with `serve_with_incoming_shutdown()` if
276        //       any related unit tests sometimes crash with memory errors
277        let indexer_rpc_task_handle = {
278            if let Some(indexer_listen_addr) = config.rpc.indexer_listen_addr {
279                info!("spawning indexer RPC server");
280                let (indexer_rpc_task_handle, _listen_addr) = zebra_rpc::indexer::server::init(
281                    indexer_listen_addr,
282                    read_only_state_service.clone(),
283                    latest_chain_tip.clone(),
284                    mempool_transaction_subscriber.clone(),
285                )
286                .await
287                .map_err(|err| eyre!(err))?;
288
289                indexer_rpc_task_handle
290            } else {
291                warn!("configure an indexer_listen_addr to start the indexer RPC server");
292                tokio::spawn(std::future::pending().in_current_span())
293            }
294        };
295
296        // Start concurrent tasks which don't add load to other tasks
297        info!("spawning block gossip task");
298        let block_gossip_task_handle = tokio::spawn(
299            sync::gossip_best_tip_block_hashes(
300                sync_status.clone(),
301                chain_tip_change.clone(),
302                peer_set.clone(),
303                Some(submit_block_channel.receiver()),
304            )
305            .in_current_span(),
306        );
307
308        info!("spawning mempool queue checker task");
309        let mempool_queue_checker_task_handle = mempool::QueueChecker::spawn(mempool.clone());
310
311        info!("spawning mempool transaction gossip task");
312        let tx_gossip_task_handle = tokio::spawn(
313            mempool::gossip_mempool_transaction_id(
314                mempool_transaction_subscriber.subscribe(),
315                peer_set.clone(),
316            )
317            .in_current_span(),
318        );
319
320        info!("spawning delete old databases task");
321        let mut old_databases_task_handle = zebra_state::check_and_delete_old_state_databases(
322            &config.state,
323            &config.network.network,
324        );
325
326        info!("spawning progress logging task");
327        let (chain_tip_metrics_sender, chain_tip_metrics_receiver) =
328            health::ChainTipMetrics::channel();
329        let progress_task_handle = tokio::spawn(
330            show_block_chain_progress(
331                config.network.network.clone(),
332                latest_chain_tip.clone(),
333                sync_status.clone(),
334                chain_tip_metrics_sender,
335            )
336            .in_current_span(),
337        );
338
339        // Start health server if configured
340        info!("initializing health endpoints");
341        let (health_task_handle, _) = health::init(
342            config.health.clone(),
343            config.network.network.clone(),
344            chain_tip_metrics_receiver,
345            sync_status.clone(),
346            address_book.clone(),
347        )
348        .await;
349
350        // Spawn never ending end of support task.
351        info!("spawning end of support checking task");
352        let end_of_support_task_handle = tokio::spawn(
353            sync::end_of_support::start(config.network.network.clone(), latest_chain_tip.clone())
354                .in_current_span(),
355        );
356
357        // Give the inbound service more time to clear its queue,
358        // then start concurrent tasks that can add load to the inbound service
359        // (by opening more peer connections, so those peers send us requests)
360        tokio::task::yield_now().await;
361
362        // The crawler only activates immediately in tests that use mempool debug mode
363        info!("spawning mempool crawler task");
364        let mempool_crawler_task_handle = mempool::Crawler::spawn(
365            &config.mempool,
366            peer_set,
367            mempool.clone(),
368            sync_status.clone(),
369            chain_tip_change.clone(),
370        );
371
372        info!("spawning syncer task");
373        let syncer_task_handle = if is_regtest {
374            if !syncer
375                .state_contains(config.network.network.genesis_hash())
376                .await?
377            {
378                let genesis_hash = block_verifier_router
379                    .clone()
380                    .oneshot(zebra_consensus::Request::Commit(regtest_genesis_block()))
381                    .await
382                    .expect("should validate Regtest genesis block");
383
384                assert_eq!(
385                    genesis_hash,
386                    config.network.network.genesis_hash(),
387                    "validated block hash should match network genesis hash"
388                )
389            }
390
391            tokio::spawn(std::future::pending().in_current_span())
392        } else {
393            tokio::spawn(syncer.sync().in_current_span())
394        };
395
396        // And finally, spawn the internal Zcash miner, if it is enabled.
397        //
398        // TODO: add a config to enable the miner rather than a feature.
399        #[cfg(feature = "internal-miner")]
400        let miner_task_handle = if config.mining.is_internal_miner_enabled() {
401            info!("spawning Zcash miner");
402            components::miner::spawn_init(&config.metrics, rpc_impl)
403        } else {
404            tokio::spawn(std::future::pending().in_current_span())
405        };
406
407        #[cfg(not(feature = "internal-miner"))]
408        // Spawn a dummy miner task which doesn't do anything and never finishes.
409        let miner_task_handle: tokio::task::JoinHandle<Result<(), Report>> =
410            tokio::spawn(std::future::pending().in_current_span());
411
412        info!("spawned initial Zebra tasks");
413
414        // TODO: put tasks into an ongoing FuturesUnordered and a startup FuturesUnordered?
415
416        // ongoing tasks
417        pin!(rpc_task_handle);
418        pin!(indexer_rpc_task_handle);
419        pin!(syncer_task_handle);
420        pin!(block_gossip_task_handle);
421        pin!(mempool_crawler_task_handle);
422        pin!(mempool_queue_checker_task_handle);
423        pin!(tx_gossip_task_handle);
424        pin!(progress_task_handle);
425        pin!(end_of_support_task_handle);
426        pin!(miner_task_handle);
427
428        // startup tasks
429        let BackgroundTaskHandles {
430            mut state_checkpoint_verify_handle,
431        } = consensus_task_handles;
432
433        let state_checkpoint_verify_handle_fused = (&mut state_checkpoint_verify_handle).fuse();
434        pin!(state_checkpoint_verify_handle_fused);
435
436        let old_databases_task_handle_fused = (&mut old_databases_task_handle).fuse();
437        pin!(old_databases_task_handle_fused);
438
439        // Wait for tasks to finish
440        let exit_status = loop {
441            let mut exit_when_task_finishes = true;
442
443            let result = select! {
444                rpc_join_result = &mut rpc_task_handle => {
445                    let rpc_server_result = rpc_join_result
446                        .expect("unexpected panic in the rpc task");
447                    info!(?rpc_server_result, "rpc task exited");
448                    Ok(())
449                }
450
451                rpc_tx_queue_result = &mut rpc_tx_queue_handle => {
452                    rpc_tx_queue_result
453                        .expect("unexpected panic in the rpc transaction queue task");
454                    info!("rpc transaction queue task exited");
455                    Ok(())
456                }
457
458                indexer_rpc_join_result = &mut indexer_rpc_task_handle => {
459                    let indexer_rpc_server_result = indexer_rpc_join_result
460                        .expect("unexpected panic in the indexer task");
461                    info!(?indexer_rpc_server_result, "indexer rpc task exited");
462                    Ok(())
463                }
464
465                sync_result = &mut syncer_task_handle => sync_result
466                    .expect("unexpected panic in the syncer task")
467                    .map(|_| info!("syncer task exited")),
468
469                block_gossip_result = &mut block_gossip_task_handle => block_gossip_result
470                    .expect("unexpected panic in the chain tip block gossip task")
471                    .map(|_| info!("chain tip block gossip task exited"))
472                    .map_err(|e| eyre!(e)),
473
474                mempool_crawl_result = &mut mempool_crawler_task_handle => mempool_crawl_result
475                    .expect("unexpected panic in the mempool crawler")
476                    .map(|_| info!("mempool crawler task exited"))
477                    .map_err(|e| eyre!(e)),
478
479                mempool_queue_result = &mut mempool_queue_checker_task_handle => mempool_queue_result
480                    .expect("unexpected panic in the mempool queue checker")
481                    .map(|_| info!("mempool queue checker task exited"))
482                    .map_err(|e| eyre!(e)),
483
484                tx_gossip_result = &mut tx_gossip_task_handle => tx_gossip_result
485                    .expect("unexpected panic in the transaction gossip task")
486                    .map(|_| info!("transaction gossip task exited"))
487                    .map_err(|e| eyre!(e)),
488
489                // The progress task runs forever, unless it panics.
490                // So we don't need to provide an exit status for it.
491                progress_result = &mut progress_task_handle => {
492                    info!("chain progress task exited");
493                    progress_result
494                        .expect("unexpected panic in the chain progress task");
495                }
496
497                end_of_support_result = &mut end_of_support_task_handle => end_of_support_result
498                    .expect("unexpected panic in the end of support task")
499                    .map(|_| info!("end of support task exited")),
500
501                // We also expect the state checkpoint verify task to finish.
502                state_checkpoint_verify_result = &mut state_checkpoint_verify_handle_fused => {
503                    state_checkpoint_verify_result
504                        .unwrap_or_else(|_| panic!(
505                            "unexpected panic checking previous state followed the best chain"));
506
507                    exit_when_task_finishes = false;
508                    Ok(())
509                }
510
511                // And the old databases task should finish while Zebra is running.
512                old_databases_result = &mut old_databases_task_handle_fused => {
513                    old_databases_result
514                        .unwrap_or_else(|_| panic!(
515                            "unexpected panic deleting old database directories"));
516
517                    exit_when_task_finishes = false;
518                    Ok(())
519                }
520
521                miner_result = &mut miner_task_handle => miner_result
522                    .expect("unexpected panic in the miner task")
523                    .map(|_| info!("miner task exited")),
524            };
525
526            // Stop Zebra if a task finished and returned an error,
527            // or if an ongoing task exited.
528            if let Err(err) = result {
529                break Err(err);
530            }
531
532            if exit_when_task_finishes {
533                break Ok(());
534            }
535        };
536
537        info!("exiting Zebra because an ongoing task exited: asking other tasks to stop");
538
539        // ongoing tasks
540        rpc_task_handle.abort();
541        rpc_tx_queue_handle.abort();
542        health_task_handle.abort();
543        syncer_task_handle.abort();
544        block_gossip_task_handle.abort();
545        mempool_crawler_task_handle.abort();
546        mempool_queue_checker_task_handle.abort();
547        tx_gossip_task_handle.abort();
548        progress_task_handle.abort();
549        end_of_support_task_handle.abort();
550        miner_task_handle.abort();
551
552        // startup tasks
553        state_checkpoint_verify_handle.abort();
554        old_databases_task_handle.abort();
555
556        info!(
557            "exiting Zebra: all tasks have been asked to stop, waiting for remaining tasks to finish"
558        );
559
560        exit_status
561    }
562
563    /// Returns the bound for the state service buffer,
564    /// based on the configurations of the services that use the state concurrently.
565    fn state_buffer_bound() -> usize {
566        let config = APPLICATION.config();
567
568        // Ignore the checkpoint verify limit, because it is very large.
569        //
570        // TODO: do we also need to account for concurrent use across services?
571        //       we could multiply the maximum by 3/2, or add a fixed constant
572        [
573            config.sync.download_concurrency_limit,
574            config.sync.full_verify_concurrency_limit,
575            inbound::downloads::MAX_INBOUND_CONCURRENCY,
576            mempool::downloads::MAX_INBOUND_CONCURRENCY,
577        ]
578        .into_iter()
579        .max()
580        .unwrap()
581    }
582}
583
584impl Runnable for StartCmd {
585    /// Start the application.
586    fn run(&self) {
587        info!("Starting zebrad");
588        let rt = APPLICATION
589            .state()
590            .components_mut()
591            .get_downcast_mut::<TokioComponent>()
592            .expect("TokioComponent should be available")
593            .rt
594            .take();
595
596        rt.expect("runtime should not already be taken")
597            .run(self.start());
598
599        info!("stopping zebrad");
600    }
601}
602
603impl config::Override<ZebradConfig> for StartCmd {
604    // Process the given command line options, overriding settings from
605    // a configuration file using explicit flags taken from command-line
606    // arguments.
607    fn override_config(&self, mut config: ZebradConfig) -> Result<ZebradConfig, FrameworkError> {
608        if !self.filters.is_empty() {
609            config.tracing.filter = Some(self.filters.join(","));
610        }
611
612        Ok(config)
613    }
614}