zebrad/components/sync/gossip.rs
1//! A task that gossips newly verified [`block::Hash`]es to peers.
2//!
3//! [`block::Hash`]: zebra_chain::block::Hash
4
5use std::time::Duration;
6
7use futures::TryFutureExt;
8use thiserror::Error;
9use tokio::sync::{mpsc, watch};
10use tower::{timeout::Timeout, Service, ServiceExt};
11use tracing::Instrument;
12
13use zebra_chain::{block, chain_tip::ChainTip};
14use zebra_network as zn;
15use zebra_state::ChainTipChange;
16
17use crate::{
18 components::sync::{SyncStatus, PEER_GOSSIP_DELAY, TIPS_RESPONSE_TIMEOUT},
19 BoxError,
20};
21
22use BlockGossipError::*;
23
24/// Errors that can occur when gossiping committed blocks
25#[derive(Error, Debug)]
26pub enum BlockGossipError {
27 #[error("chain tip sender was dropped")]
28 TipChange(watch::error::RecvError),
29
30 #[error("sync status sender was dropped")]
31 SyncStatus(watch::error::RecvError),
32
33 #[error("permanent peer set failure")]
34 PeerSetReadiness(zn::BoxError),
35}
36
37/// Run continuously, gossiping newly verified [`block::Hash`]es to peers.
38///
39/// Once the state has reached the chain tip, broadcast the [`block::Hash`]es
40/// of newly verified blocks to all ready peers.
41///
42/// Blocks are only gossiped if they are:
43/// - on the best chain, and
44/// - the most recent block verified since the last gossip.
45///
46/// In particular, if a lot of blocks are committed at the same time,
47/// gossips will be disabled or skipped until the state reaches the latest tip.
48///
49/// [`block::Hash`]: zebra_chain::block::Hash
50pub async fn gossip_best_tip_block_hashes<ZN>(
51 sync_status: SyncStatus,
52 mut chain_state: ChainTipChange,
53 broadcast_network: ZN,
54 mut mined_block_receiver: Option<mpsc::Receiver<(block::Hash, block::Height)>>,
55) -> Result<(), BlockGossipError>
56where
57 ZN: Service<zn::Request, Response = zn::Response, Error = BoxError> + Send + Clone + 'static,
58 ZN::Future: Send,
59{
60 info!("initializing block gossip task");
61
62 // use the same timeout as tips requests,
63 // so broadcasts don't delay the syncer too long
64 let mut broadcast_network = Timeout::new(broadcast_network, TIPS_RESPONSE_TIMEOUT);
65
66 loop {
67 // TODO: Refactor this into a struct and move the contents of this loop into its own method.
68 let mut sync_status = sync_status.clone();
69 let mut chain_tip = chain_state.clone();
70
71 // TODO: Move the contents of this async block to its own method
72 let tip_change_close_to_network_tip_fut = async move {
73 /// A brief duration to wait after a tip change for a new message in the mined block channel.
74 // TODO: Add a test to check that Zebra does not advertise mined blocks to peers twice.
75 const WAIT_FOR_BLOCK_SUBMISSION_DELAY: Duration = Duration::from_micros(100);
76
77 // wait for at least the network timeout between gossips
78 //
79 // in practice, we expect blocks to arrive approximately every 75 seconds,
80 // so waiting 6 seconds won't make much difference
81 tokio::time::sleep(PEER_GOSSIP_DELAY).await;
82
83 // wait for at least one tip change, to make sure we have a new block hash to broadcast
84 let tip_action = chain_tip.wait_for_tip_change().await.map_err(TipChange)?;
85
86 // wait for block submissions to be received through the `mined_block_receiver` if the tip
87 // change is from a block submission.
88 tokio::time::sleep(WAIT_FOR_BLOCK_SUBMISSION_DELAY).await;
89
90 // wait until we're close to the tip, because broadcasts are only useful for nodes near the tip
91 // (if they're a long way from the tip, they use the syncer and block locators), unless a mined block
92 // hash is received before `wait_until_close_to_tip()` is ready.
93 sync_status
94 .wait_until_close_to_tip()
95 .map_err(SyncStatus)
96 .await?;
97
98 // get the latest tip change when close to tip - it might be different to the change we awaited,
99 // because the syncer might take a long time to reach the tip
100 let best_tip = chain_tip
101 .last_tip_change()
102 .unwrap_or(tip_action)
103 .best_tip_hash_and_height();
104
105 Ok((best_tip, "sending committed block broadcast", chain_tip))
106 }
107 .in_current_span();
108
109 // TODO: Move this logic for selecting the first ready future and updating `chain_state` to its own method.
110 let (((hash, height), log_msg, updated_chain_state), is_block_submission) =
111 if let Some(mined_block_receiver) = mined_block_receiver.as_mut() {
112 tokio::select! {
113 tip_change_close_to_network_tip = tip_change_close_to_network_tip_fut => {
114 (tip_change_close_to_network_tip?, false)
115 },
116
117 Some(tip_change) = mined_block_receiver.recv() => {
118 ((tip_change, "sending mined block broadcast", chain_state), true)
119 }
120 }
121 } else {
122 (tip_change_close_to_network_tip_fut.await?, false)
123 };
124
125 chain_state = updated_chain_state;
126
127 // TODO: Move logic for calling the peer set to its own method.
128
129 // block broadcasts inform other nodes about new blocks,
130 // so our internal Grow or Reset state doesn't matter to them
131 let request = if is_block_submission {
132 zn::Request::AdvertiseBlockToAll(hash)
133 } else {
134 zn::Request::AdvertiseBlock(hash)
135 };
136
137 info!(?height, ?request, log_msg);
138 let broadcast_fut = broadcast_network
139 .ready()
140 .await
141 .map_err(PeerSetReadiness)?
142 .call(request);
143
144 // Await the broadcast future in a spawned task to avoid waiting on
145 // `AdvertiseBlockToAll` requests when there are unready peers.
146 // Broadcast requests don't return errors, and we'd just want to ignore them anyway.
147 tokio::spawn(broadcast_fut);
148
149 // TODO: Move this logic for marking the last change hash as seen to its own method.
150
151 // Mark the last change hash of `chain_state` as the last block submission hash to avoid
152 // advertising a block hash to some peers twice.
153 if is_block_submission
154 && mined_block_receiver
155 .as_ref()
156 .is_some_and(|rx| rx.is_empty())
157 && chain_state.latest_chain_tip().best_tip_hash() == Some(hash)
158 {
159 chain_state.mark_last_change_hash(hash);
160 }
161 }
162}