// zebrad/src/commands/start.rs

//! `start` subcommand - entry point for starting a Zebra node
//!
//! ## Application Structure
//!
//! A Zebra node consists of the following major services and tasks:
//!
//! Peers:
//!  * Peer Connection Pool Service
//!    * primary external interface for outbound requests from this node to remote peers
//!    * accepts requests from services and tasks in this node, and sends them to remote peers
//!  * Peer Discovery Service
//!    * maintains a list of peer addresses, and connection priority metadata
//!    * discovers new peer addresses from existing peer connections
//!    * initiates new outbound peer connections in response to demand from tasks within this node
//!  * Peer Cache Service
//!    * reads the previous peer cache on startup, and adds it to the configured DNS seed peers
//!    * periodically updates the peer cache on disk from the latest address book state
//!
//! Blocks & Mempool Transactions:
//!  * Consensus Service
//!    * handles all validation logic for the node
//!    * verifies blocks using zebra-chain, then stores verified blocks in zebra-state
//!    * verifies mempool and block transactions using zebra-chain and zebra-script,
//!      and returns verified mempool transactions for mempool storage
//!  * Inbound Service
//!    * primary external interface for inbound peer requests to this node
//!    * handles requests from peers for network data, chain data, and mempool transactions
//!    * spawns download and verify tasks for each gossiped block
//!    * sends gossiped transactions to the mempool service
//!
//! Blocks:
//!  * Sync Task
//!    * runs in the background and continuously queries the network for
//!      new blocks to be verified and added to the local state
//!    * spawns download and verify tasks for each crawled block
//!  * State Service
//!    * contextually verifies blocks
//!    * handles in-memory storage of multiple non-finalized chains
//!    * handles permanent storage of the best finalized chain
//!  * Old State Version Cleanup Task
//!    * deletes outdated state versions
//!  * Block Gossip Task
//!    * runs in the background and continuously queries the state for
//!      newly committed blocks to be gossiped to peers
//!    * also gossips newly mined blocks submitted through the RPC server
//!  * Progress Task
//!    * logs progress towards the chain tip
//!  * End of Support Task
//!    * checks whether this Zebra release has reached its end of support,
//!      using the latest chain tip
//!
//! Block Mining:
//!  * Internal Miner Task
//!    * if the user has configured Zebra to mine blocks, spawns tasks to generate new blocks,
//!      and submits them for verification. This automatically shares these new blocks with peers.
//!
//! Mempool Transactions:
//!  * Mempool Service
//!    * activates when the syncer is near the chain tip
//!    * spawns download and verify tasks for each crawled or gossiped transaction
//!    * handles in-memory storage of unmined transactions
//!  * Queue Checker Task
//!    * runs in the background, polling the mempool to store newly verified transactions
//!  * Transaction Gossip Task
//!    * runs in the background and gossips newly added mempool transactions
//!      to peers
//!
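//! Remote Procedure Calls:
//!  * JSON-RPC Service
//!    * only runs if an RPC `listen_addr` is configured
//!    * answers RPC client requests using the State Service and Mempool Service
//!    * submits client transactions to the node's mempool
//!  * Indexer RPC Service
//!    * only runs if an `indexer_listen_addr` is configured
//!    * serves indexer clients using the read-only state, the latest chain tip,
//!      and mempool transaction notifications
//!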
//! Zebra also has diagnostic support:
//! * [metrics](https://github.com/ZcashFoundation/zebra/blob/main/book/src/user/metrics.md)
//! * [tracing](https://github.com/ZcashFoundation/zebra/blob/main/book/src/user/tracing.md)
//! * [progress-bar](https://docs.rs/howudoin/0.1.1/howudoin)
//! * health endpoints, if they are configured
//!
//! Some of the diagnostic features are optional, and need to be enabled at compile-time.

76use std::sync::Arc;
77
78use abscissa_core::{config, Command, FrameworkError};
79use color_eyre::eyre::{eyre, Report};
80use futures::FutureExt;
81use tokio::{pin, select, sync::oneshot};
82use tower::{builder::ServiceBuilder, util::BoxService, ServiceExt};
83use tracing_futures::Instrument;
84
85use zebra_chain::block::genesis::regtest_genesis_block;
86use zebra_consensus::router::BackgroundTaskHandles;
87use zebra_rpc::{methods::RpcImpl, server::RpcServer, SubmitBlockChannel};
88
89use crate::{
90    application::{build_version, user_agent, LAST_WARN_ERROR_LOG_SENDER},
91    components::{
92        health,
93        inbound::{self, InboundSetupData, MAX_INBOUND_RESPONSE_TIME},
94        mempool::{self, Mempool},
95        sync::{self, show_block_chain_progress, VERIFICATION_PIPELINE_SCALING_MULTIPLIER},
96        tokio::{RuntimeRun, TokioComponent},
97        ChainSync, Inbound,
98    },
99    config::ZebradConfig,
100    prelude::*,
101};
102
103#[cfg(feature = "internal-miner")]
104use crate::components;
105
// NOTE: `clap::Parser` uses this struct's `///` doc comment as the command's help text,
// so editing it changes the CLI output.
/// Start the application (default command)
#[derive(Command, Debug, Default, clap::Parser)]
pub struct StartCmd {
    /// Filter strings which override the config file and defaults
    #[clap(help = "tracing filters which override the zebrad.toml config")]
    // Collected from the command line (no `short`/`long`, so clap treats them as positional).
    // `override_config` joins them with commas into `config.tracing.filter`.
    filters: Vec<String>,
}

114/// Warns if Linux TCP slow-start-after-idle is enabled, which significantly
115/// reduces single-peer throughput for block propagation.
116///
117/// See `book/src/user/troubleshooting.md`.
118#[cfg(target_os = "linux")]
119fn check_tcp_slow_start_after_idle() {
120    const PATH: &str = "/proc/sys/net/ipv4/tcp_slow_start_after_idle";
121
122    let raw = match std::fs::read_to_string(PATH) {
123        Ok(raw) => raw,
124        Err(error) => {
125            debug!(
126                ?error,
127                path = PATH,
128                "could not read TCP sysctl, skipping check"
129            );
130            return;
131        }
132    };
133
134    if raw.trim() == "0" {
135        return;
136    }
137
138    warn!(
139        setting = "net.ipv4.tcp_slow_start_after_idle",
140        "TCP slow-start-after-idle is enabled, which resets TCP's congestion window \
141         between block requests and significantly reduces single-peer throughput for \
142         block propagation. \
143         Hint: set `net.ipv4.tcp_slow_start_after_idle=0` via sysctl. \
144         See https://zebra.zfnd.org/user/troubleshooting.html#linux-tcp-tuning-for-block-propagation"
145    );
146}
147
/// No-op: the TCP slow-start-after-idle check is only performed on Linux.
#[cfg(not(target_os = "linux"))]
fn check_tcp_slow_start_after_idle() {
    // Intentionally empty on this platform.
}

151impl StartCmd {
152    async fn start(&self) -> Result<(), Report> {
153        check_tcp_slow_start_after_idle();
154
155        let config = APPLICATION.config();
156        let is_regtest = config.network.network.is_regtest();
157
158        let config = if is_regtest {
159            Arc::new(ZebradConfig {
160                mempool: mempool::Config {
161                    debug_enable_at_height: Some(0),
162                    ..config.mempool
163                },
164                ..Arc::unwrap_or_clone(config)
165            })
166        } else {
167            config
168        };
169
170        info!("initializing node state");
171        let (_, max_checkpoint_height) = zebra_consensus::router::init_checkpoint_list(
172            config.consensus.clone(),
173            &config.network.network,
174        );
175
176        info!("opening database, this may take a few minutes");
177
178        let (state_service, read_only_state_service, latest_chain_tip, chain_tip_change) =
179            zebra_state::init(
180                config.state.clone(),
181                &config.network.network,
182                max_checkpoint_height,
183                config.sync.checkpoint_verify_concurrency_limit
184                    * (VERIFICATION_PIPELINE_SCALING_MULTIPLIER + 1),
185            )
186            .await;
187
188        info!("logging database metrics on startup");
189        read_only_state_service.log_db_metrics();
190
191        let state = ServiceBuilder::new()
192            .buffer(Self::state_buffer_bound())
193            .service(state_service);
194
195        info!("initializing network");
196        // The service that our node uses to respond to requests by peers. The
197        // load_shed middleware ensures that we reduce the size of the peer set
198        // in response to excess load.
199        //
200        // # Security
201        //
202        // This layer stack is security-sensitive, modifying it can cause hangs,
203        // or enable denial of service attacks.
204        //
205        // See `zebra_network::Connection::drive_peer_request()` for details.
206        let (setup_tx, setup_rx) = oneshot::channel();
207        let inbound = ServiceBuilder::new()
208            .load_shed()
209            .buffer(inbound::downloads::MAX_INBOUND_CONCURRENCY)
210            .timeout(MAX_INBOUND_RESPONSE_TIME)
211            .service(Inbound::new(
212                config.sync.full_verify_concurrency_limit,
213                setup_rx,
214            ));
215
216        let (peer_set, address_book, misbehavior_sender) = zebra_network::init(
217            config.network.clone(),
218            inbound,
219            latest_chain_tip.clone(),
220            user_agent(),
221        )
222        .await;
223
224        // Start health server if configured (after sync_status is available)
225
226        info!("initializing verifiers");
227        let (tx_verifier_setup_tx, tx_verifier_setup_rx) = oneshot::channel();
228        let (block_verifier_router, tx_verifier, consensus_task_handles, max_checkpoint_height) =
229            zebra_consensus::router::init(
230                config.consensus.clone(),
231                &config.network.network,
232                state.clone(),
233                tx_verifier_setup_rx,
234            )
235            .await;
236
237        info!("initializing syncer");
238        let (mut syncer, sync_status) = ChainSync::new(
239            &config,
240            max_checkpoint_height,
241            peer_set.clone(),
242            block_verifier_router.clone(),
243            state.clone(),
244            latest_chain_tip.clone(),
245            misbehavior_sender.clone(),
246        );
247
248        info!("initializing mempool");
249        let (mempool, mempool_transaction_subscriber) = Mempool::new(
250            &config.mempool,
251            peer_set.clone(),
252            state.clone(),
253            tx_verifier,
254            sync_status.clone(),
255            latest_chain_tip.clone(),
256            chain_tip_change.clone(),
257            misbehavior_sender.clone(),
258        );
259        let mempool = BoxService::new(mempool);
260        let mempool = ServiceBuilder::new()
261            .buffer(mempool::downloads::MAX_INBOUND_CONCURRENCY)
262            .service(mempool);
263
264        if tx_verifier_setup_tx.send(mempool.clone()).is_err() {
265            warn!("error setting up the transaction verifier with a handle to the mempool service");
266        };
267
268        info!("fully initializing inbound peer request handler");
269        // Fully start the inbound service as soon as possible
270        let setup_data = InboundSetupData {
271            address_book: address_book.clone(),
272            block_download_peer_set: peer_set.clone(),
273            block_verifier: block_verifier_router.clone(),
274            mempool: mempool.clone(),
275            state: state.clone(),
276            latest_chain_tip: latest_chain_tip.clone(),
277            misbehavior_sender,
278        };
279        setup_tx
280            .send(setup_data)
281            .map_err(|_| eyre!("could not send setup data to inbound service"))?;
282        // And give it time to clear its queue
283        tokio::task::yield_now().await;
284
285        // Create a channel to send mined blocks to the gossip task
286        let submit_block_channel = SubmitBlockChannel::new();
287
288        // Launch RPC server
289        let (rpc_impl, mut rpc_tx_queue_handle) = RpcImpl::new(
290            config.network.network.clone(),
291            config.mining.clone(),
292            config.rpc.debug_force_finished_sync,
293            build_version(),
294            user_agent(),
295            mempool.clone(),
296            state.clone(),
297            read_only_state_service.clone(),
298            block_verifier_router.clone(),
299            sync_status.clone(),
300            latest_chain_tip.clone(),
301            address_book.clone(),
302            LAST_WARN_ERROR_LOG_SENDER.subscribe(),
303            Some(submit_block_channel.sender()),
304        );
305
306        let rpc_task_handle = if config.rpc.listen_addr.is_some() {
307            RpcServer::start(rpc_impl.clone(), config.rpc.clone())
308                .await
309                .expect("server should start")
310        } else {
311            tokio::spawn(std::future::pending().in_current_span())
312        };
313
314        // TODO: Add a shutdown signal and start the server with `serve_with_incoming_shutdown()` if
315        //       any related unit tests sometimes crash with memory errors
316        let indexer_rpc_task_handle = {
317            if let Some(indexer_listen_addr) = config.rpc.indexer_listen_addr {
318                info!("spawning indexer RPC server");
319                let (indexer_rpc_task_handle, _listen_addr) = zebra_rpc::indexer::server::init(
320                    indexer_listen_addr,
321                    read_only_state_service.clone(),
322                    latest_chain_tip.clone(),
323                    mempool_transaction_subscriber.clone(),
324                )
325                .await
326                .map_err(|err| eyre!(err))?;
327
328                indexer_rpc_task_handle
329            } else {
330                warn!("configure an indexer_listen_addr to start the indexer RPC server");
331                tokio::spawn(std::future::pending().in_current_span())
332            }
333        };
334
335        // Start concurrent tasks which don't add load to other tasks
336        info!("spawning block gossip task");
337        let block_gossip_task_handle = tokio::spawn(
338            sync::gossip_best_tip_block_hashes(
339                sync_status.clone(),
340                chain_tip_change.clone(),
341                peer_set.clone(),
342                Some(submit_block_channel.receiver()),
343            )
344            .in_current_span(),
345        );
346
347        info!("spawning mempool queue checker task");
348        let mempool_queue_checker_task_handle = mempool::QueueChecker::spawn(mempool.clone());
349
350        info!("spawning mempool transaction gossip task");
351        let tx_gossip_task_handle = tokio::spawn(
352            mempool::gossip_mempool_transaction_id(
353                mempool_transaction_subscriber.subscribe(),
354                peer_set.clone(),
355            )
356            .in_current_span(),
357        );
358
359        info!("spawning delete old databases task");
360        let mut old_databases_task_handle = zebra_state::check_and_delete_old_state_databases(
361            &config.state,
362            &config.network.network,
363        );
364
365        info!("spawning progress logging task");
366        let (chain_tip_metrics_sender, chain_tip_metrics_receiver) =
367            health::ChainTipMetrics::channel();
368        let progress_task_handle = tokio::spawn(
369            show_block_chain_progress(
370                config.network.network.clone(),
371                latest_chain_tip.clone(),
372                sync_status.clone(),
373                chain_tip_metrics_sender,
374            )
375            .in_current_span(),
376        );
377
378        // Start health server if configured
379        info!("initializing health endpoints");
380        let (health_task_handle, _) = health::init(
381            config.health.clone(),
382            config.network.network.clone(),
383            chain_tip_metrics_receiver,
384            sync_status.clone(),
385            address_book.clone(),
386        )
387        .await;
388
389        // Spawn never ending end of support task.
390        info!("spawning end of support checking task");
391        let end_of_support_task_handle = tokio::spawn(
392            sync::end_of_support::start(config.network.network.clone(), latest_chain_tip.clone())
393                .in_current_span(),
394        );
395
396        // Give the inbound service more time to clear its queue,
397        // then start concurrent tasks that can add load to the inbound service
398        // (by opening more peer connections, so those peers send us requests)
399        tokio::task::yield_now().await;
400
401        // The crawler only activates immediately in tests that use mempool debug mode
402        info!("spawning mempool crawler task");
403        let mempool_crawler_task_handle = mempool::Crawler::spawn(
404            &config.mempool,
405            peer_set,
406            mempool.clone(),
407            sync_status.clone(),
408            chain_tip_change.clone(),
409        );
410
411        info!("spawning syncer task");
412        // In regtest, commit the genesis block directly (bypassing the syncer's genesis
413        // download, which requires a connected peer). Then run the syncer normally so
414        // that multi-hop block propagation works: gossiped blocks that arrive out of
415        // order (e.g. only the latest tip hash was gossiped) will be recovered by the
416        // syncer using block locators within REGTEST_SYNC_RESTART_DELAY (2 seconds).
417        if is_regtest
418            && !syncer
419                .state_contains(config.network.network.genesis_hash())
420                .await?
421        {
422            let genesis_hash = block_verifier_router
423                .clone()
424                .oneshot(zebra_consensus::Request::Commit(regtest_genesis_block()))
425                .await
426                .expect("should validate Regtest genesis block");
427
428            assert_eq!(
429                genesis_hash,
430                config.network.network.genesis_hash(),
431                "validated block hash should match network genesis hash"
432            )
433        }
434        let syncer_task_handle = tokio::spawn(syncer.sync().in_current_span());
435
436        // And finally, spawn the internal Zcash miner, if it is enabled.
437        //
438        // TODO: add a config to enable the miner rather than a feature.
439        #[cfg(feature = "internal-miner")]
440        let miner_task_handle = if config.mining.is_internal_miner_enabled() {
441            info!("spawning Zcash miner");
442            components::miner::spawn_init(&config.metrics, rpc_impl)
443        } else {
444            tokio::spawn(std::future::pending().in_current_span())
445        };
446
447        #[cfg(not(feature = "internal-miner"))]
448        // Spawn a dummy miner task which doesn't do anything and never finishes.
449        let miner_task_handle: tokio::task::JoinHandle<Result<(), Report>> =
450            tokio::spawn(std::future::pending().in_current_span());
451
452        info!("spawned initial Zebra tasks");
453
454        // TODO: put tasks into an ongoing FuturesUnordered and a startup FuturesUnordered?
455
456        // ongoing tasks
457        pin!(rpc_task_handle);
458        pin!(indexer_rpc_task_handle);
459        pin!(syncer_task_handle);
460        pin!(block_gossip_task_handle);
461        pin!(mempool_crawler_task_handle);
462        pin!(mempool_queue_checker_task_handle);
463        pin!(tx_gossip_task_handle);
464        pin!(progress_task_handle);
465        pin!(end_of_support_task_handle);
466        pin!(miner_task_handle);
467
468        // startup tasks
469        let BackgroundTaskHandles {
470            mut state_checkpoint_verify_handle,
471        } = consensus_task_handles;
472
473        let state_checkpoint_verify_handle_fused = (&mut state_checkpoint_verify_handle).fuse();
474        pin!(state_checkpoint_verify_handle_fused);
475
476        let old_databases_task_handle_fused = (&mut old_databases_task_handle).fuse();
477        pin!(old_databases_task_handle_fused);
478
479        // Wait for tasks to finish
480        let exit_status = loop {
481            let mut exit_when_task_finishes = true;
482
483            let result = select! {
484                rpc_join_result = &mut rpc_task_handle => {
485                    let rpc_server_result = rpc_join_result
486                        .expect("unexpected panic in the rpc task");
487                    info!(?rpc_server_result, "rpc task exited");
488                    Ok(())
489                }
490
491                rpc_tx_queue_result = &mut rpc_tx_queue_handle => {
492                    rpc_tx_queue_result
493                        .expect("unexpected panic in the rpc transaction queue task");
494                    info!("rpc transaction queue task exited");
495                    Ok(())
496                }
497
498                indexer_rpc_join_result = &mut indexer_rpc_task_handle => {
499                    let indexer_rpc_server_result = indexer_rpc_join_result
500                        .expect("unexpected panic in the indexer task");
501                    info!(?indexer_rpc_server_result, "indexer rpc task exited");
502                    Ok(())
503                }
504
505                sync_result = &mut syncer_task_handle => sync_result
506                    .expect("unexpected panic in the syncer task")
507                    .map(|_| info!("syncer task exited")),
508
509                block_gossip_result = &mut block_gossip_task_handle => block_gossip_result
510                    .expect("unexpected panic in the chain tip block gossip task")
511                    .map(|_| info!("chain tip block gossip task exited"))
512                    .map_err(|e| eyre!(e)),
513
514                mempool_crawl_result = &mut mempool_crawler_task_handle => mempool_crawl_result
515                    .expect("unexpected panic in the mempool crawler")
516                    .map(|_| info!("mempool crawler task exited"))
517                    .map_err(|e| eyre!(e)),
518
519                mempool_queue_result = &mut mempool_queue_checker_task_handle => mempool_queue_result
520                    .expect("unexpected panic in the mempool queue checker")
521                    .map(|_| info!("mempool queue checker task exited"))
522                    .map_err(|e| eyre!(e)),
523
524                tx_gossip_result = &mut tx_gossip_task_handle => tx_gossip_result
525                    .expect("unexpected panic in the transaction gossip task")
526                    .map(|_| info!("transaction gossip task exited"))
527                    .map_err(|e| eyre!(e)),
528
529                // The progress task runs forever, unless it panics.
530                // So we don't need to provide an exit status for it.
531                progress_result = &mut progress_task_handle => {
532                    info!("chain progress task exited");
533                    progress_result
534                        .expect("unexpected panic in the chain progress task");
535                }
536
537                end_of_support_result = &mut end_of_support_task_handle => end_of_support_result
538                    .expect("unexpected panic in the end of support task")
539                    .map(|_| info!("end of support task exited")),
540
541                // We also expect the state checkpoint verify task to finish.
542                state_checkpoint_verify_result = &mut state_checkpoint_verify_handle_fused => {
543                    state_checkpoint_verify_result
544                        .unwrap_or_else(|_| panic!(
545                            "unexpected panic checking previous state followed the best chain"));
546
547                    exit_when_task_finishes = false;
548                    Ok(())
549                }
550
551                // And the old databases task should finish while Zebra is running.
552                old_databases_result = &mut old_databases_task_handle_fused => {
553                    old_databases_result
554                        .unwrap_or_else(|_| panic!(
555                            "unexpected panic deleting old database directories"));
556
557                    exit_when_task_finishes = false;
558                    Ok(())
559                }
560
561                miner_result = &mut miner_task_handle => miner_result
562                    .expect("unexpected panic in the miner task")
563                    .map(|_| info!("miner task exited")),
564            };
565
566            // Stop Zebra if a task finished and returned an error,
567            // or if an ongoing task exited.
568            if let Err(err) = result {
569                break Err(err);
570            }
571
572            if exit_when_task_finishes {
573                break Ok(());
574            }
575        };
576
577        info!("exiting Zebra because an ongoing task exited: asking other tasks to stop");
578
579        // ongoing tasks
580        rpc_task_handle.abort();
581        rpc_tx_queue_handle.abort();
582        health_task_handle.abort();
583        syncer_task_handle.abort();
584        block_gossip_task_handle.abort();
585        mempool_crawler_task_handle.abort();
586        mempool_queue_checker_task_handle.abort();
587        tx_gossip_task_handle.abort();
588        progress_task_handle.abort();
589        end_of_support_task_handle.abort();
590        miner_task_handle.abort();
591
592        // startup tasks
593        state_checkpoint_verify_handle.abort();
594        old_databases_task_handle.abort();
595
596        info!(
597            "exiting Zebra: all tasks have been asked to stop, waiting for remaining tasks to finish"
598        );
599
600        exit_status
601    }
602
603    /// Returns the bound for the state service buffer,
604    /// based on the configurations of the services that use the state concurrently.
605    fn state_buffer_bound() -> usize {
606        let config = APPLICATION.config();
607
608        // Ignore the checkpoint verify limit, because it is very large.
609        //
610        // TODO: do we also need to account for concurrent use across services?
611        //       we could multiply the maximum by 3/2, or add a fixed constant
612        [
613            config.sync.download_concurrency_limit,
614            config.sync.full_verify_concurrency_limit,
615            inbound::downloads::MAX_INBOUND_CONCURRENCY,
616            mempool::downloads::MAX_INBOUND_CONCURRENCY,
617        ]
618        .into_iter()
619        .max()
620        .unwrap()
621    }
622}
623
624impl Runnable for StartCmd {
625    /// Start the application.
626    fn run(&self) {
627        info!("Starting zebrad");
628        let rt = APPLICATION
629            .state()
630            .components_mut()
631            .get_downcast_mut::<TokioComponent>()
632            .expect("TokioComponent should be available")
633            .rt
634            .take();
635
636        rt.expect("runtime should not already be taken")
637            .run(self.start());
638
639        info!("stopping zebrad");
640    }
641}
642
643impl config::Override<ZebradConfig> for StartCmd {
644    // Process the given command line options, overriding settings from
645    // a configuration file using explicit flags taken from command-line
646    // arguments.
647    fn override_config(&self, mut config: ZebradConfig) -> Result<ZebradConfig, FrameworkError> {
648        if !self.filters.is_empty() {
649            config.tracing.filter = Some(self.filters.join(","));
650        }
651
652        Ok(config)
653    }
654}