// zebrad/components/sync/gossip.rs

//! A task that gossips newly verified [`block::Hash`]es to peers.
//!
//! [`block::Hash`]: zebra_chain::block::Hash

5use std::time::Duration;
6
7use futures::TryFutureExt;
8use thiserror::Error;
9use tokio::sync::{mpsc, watch};
10use tower::{timeout::Timeout, Service, ServiceExt};
11use tracing::Instrument;
12
13use zebra_chain::{block, chain_tip::ChainTip};
14use zebra_network as zn;
15use zebra_state::ChainTipChange;
16
17use crate::{
18    components::sync::{SyncStatus, PEER_GOSSIP_DELAY, TIPS_RESPONSE_TIMEOUT},
19    BoxError,
20};
21
22use BlockGossipError::*;
23
24/// Errors that can occur when gossiping committed blocks
25#[derive(Error, Debug)]
26pub enum BlockGossipError {
27    #[error("chain tip sender was dropped")]
28    TipChange(watch::error::RecvError),
29
30    #[error("sync status sender was dropped")]
31    SyncStatus(watch::error::RecvError),
32
33    #[error("permanent peer set failure")]
34    PeerSetReadiness(zn::BoxError),
35}
36
37/// Run continuously, gossiping newly verified [`block::Hash`]es to peers.
38///
39/// Once the state has reached the chain tip, broadcast the [`block::Hash`]es
40/// of newly verified blocks to all ready peers.
41///
42/// Blocks are only gossiped if they are:
43/// - on the best chain, and
44/// - the most recent block verified since the last gossip.
45///
46/// In particular, if a lot of blocks are committed at the same time,
47/// gossips will be disabled or skipped until the state reaches the latest tip.
48///
49/// [`block::Hash`]: zebra_chain::block::Hash
50pub async fn gossip_best_tip_block_hashes<ZN>(
51    sync_status: SyncStatus,
52    mut chain_state: ChainTipChange,
53    broadcast_network: ZN,
54    mut mined_block_receiver: Option<mpsc::Receiver<(block::Hash, block::Height)>>,
55) -> Result<(), BlockGossipError>
56where
57    ZN: Service<zn::Request, Response = zn::Response, Error = BoxError> + Send + Clone + 'static,
58    ZN::Future: Send,
59{
60    info!("initializing block gossip task");
61
62    // use the same timeout as tips requests,
63    // so broadcasts don't delay the syncer too long
64    let mut broadcast_network = Timeout::new(broadcast_network, TIPS_RESPONSE_TIMEOUT);
65
66    loop {
67        // TODO: Refactor this into a struct and move the contents of this loop into its own method.
68        let mut sync_status = sync_status.clone();
69        let mut chain_tip = chain_state.clone();
70
71        // TODO: Move the contents of this async block to its own method
72        let tip_change_close_to_network_tip_fut = async move {
73            /// A brief duration to wait after a tip change for a new message in the mined block channel.
74            // TODO: Add a test to check that Zebra does not advertise mined blocks to peers twice.
75            const WAIT_FOR_BLOCK_SUBMISSION_DELAY: Duration = Duration::from_micros(100);
76
77            // wait for at least the network timeout between gossips
78            //
79            // in practice, we expect blocks to arrive approximately every 75 seconds,
80            // so waiting 6 seconds won't make much difference
81            tokio::time::sleep(PEER_GOSSIP_DELAY).await;
82
83            // wait for at least one tip change, to make sure we have a new block hash to broadcast
84            let tip_action = chain_tip.wait_for_tip_change().await.map_err(TipChange)?;
85
86            // wait for block submissions to be received through the `mined_block_receiver` if the tip
87            // change is from a block submission.
88            tokio::time::sleep(WAIT_FOR_BLOCK_SUBMISSION_DELAY).await;
89
90            // wait until we're close to the tip, because broadcasts are only useful for nodes near the tip
91            // (if they're a long way from the tip, they use the syncer and block locators), unless a mined block
92            // hash is received before `wait_until_close_to_tip()` is ready.
93            sync_status
94                .wait_until_close_to_tip()
95                .map_err(SyncStatus)
96                .await?;
97
98            // get the latest tip change when close to tip - it might be different to the change we awaited,
99            // because the syncer might take a long time to reach the tip
100            let best_tip = chain_tip
101                .last_tip_change()
102                .unwrap_or(tip_action)
103                .best_tip_hash_and_height();
104
105            Ok((best_tip, "sending committed block broadcast", chain_tip))
106        }
107        .in_current_span();
108
109        // TODO: Move this logic for selecting the first ready future and updating `chain_state` to its own method.
110        let (((hash, height), log_msg, updated_chain_state), is_block_submission) =
111            if let Some(mined_block_receiver) = mined_block_receiver.as_mut() {
112                tokio::select! {
113                    tip_change_close_to_network_tip = tip_change_close_to_network_tip_fut => {
114                        (tip_change_close_to_network_tip?, false)
115                    },
116
117                    Some(tip_change) = mined_block_receiver.recv() => {
118                       ((tip_change, "sending mined block broadcast", chain_state), true)
119                    }
120                }
121            } else {
122                (tip_change_close_to_network_tip_fut.await?, false)
123            };
124
125        chain_state = updated_chain_state;
126
127        // TODO: Move logic for calling the peer set to its own method.
128
129        // block broadcasts inform other nodes about new blocks,
130        // so our internal Grow or Reset state doesn't matter to them
131        let request = if is_block_submission {
132            zn::Request::AdvertiseBlockToAll(hash)
133        } else {
134            zn::Request::AdvertiseBlock(hash)
135        };
136
137        info!(?height, ?request, log_msg);
138        let broadcast_fut = broadcast_network
139            .ready()
140            .await
141            .map_err(PeerSetReadiness)?
142            .call(request);
143
144        // Await the broadcast future in a spawned task to avoid waiting on
145        // `AdvertiseBlockToAll` requests when there are unready peers.
146        // Broadcast requests don't return errors, and we'd just want to ignore them anyway.
147        tokio::spawn(broadcast_fut);
148
149        // TODO: Move this logic for marking the last change hash as seen to its own method.
150
151        // Mark the last change hash of `chain_state` as the last block submission hash to avoid
152        // advertising a block hash to some peers twice.
153        if is_block_submission
154            && mined_block_receiver
155                .as_ref()
156                .is_some_and(|rx| rx.is_empty())
157            && chain_state.latest_chain_tip().best_tip_hash() == Some(hash)
158        {
159            chain_state.mark_last_change_hash(hash);
160        }
161    }
162}