Skip to main content

zebrad/components/inbound/
downloads.rs

1//! A download stream that handles gossiped blocks from peers.
2
3use std::{
4    collections::{HashMap, HashSet},
5    net::IpAddr,
6    pin::Pin,
7    task::{Context, Poll},
8};
9
10use futures::{
11    future::TryFutureExt,
12    ready,
13    stream::{FuturesUnordered, Stream},
14};
15use pin_project::pin_project;
16use tokio::{sync::oneshot, task::JoinHandle};
17use tower::{Service, ServiceExt};
18use tracing_futures::Instrument;
19
20use zebra_chain::{
21    block::{self, HeightDiff},
22    chain_tip::ChainTip,
23};
24use zebra_network::{self as zn, PeerSocketAddr};
25use zebra_state as zs;
26
27use crate::components::sync::MIN_CONCURRENCY_LIMIT;
28
/// Boxed, thread-safe error type shared by the network, verifier, and state services.
///
/// The trait object's lifetime defaults to `'static` inside a `Box`.
type BoxError = Box<dyn std::error::Error + Send + Sync>;
30
31/// The maximum number of concurrent inbound download and verify tasks.
32/// Also used as the maximum lookahead limit, before block verification.
33///
34/// We expect the syncer to download and verify checkpoints, so this bound
35/// can be small.
36///
37/// ## Security
38///
39/// The maximum block size is 2 million bytes. A deserialized malicious
40/// block with ~225_000 transparent outputs can take up 9MB of RAM.
41/// The total queue bound is `MAX_INBOUND_CONCURRENCY * 9 MB`. Each peer IP
42/// is limited to one in-flight download (9 MB) by the per-IP cap enforced
43/// in [`Downloads::download_and_verify`], so a sybil or IPv6-range attacker
44/// still needs many distinct source IPs to approach the total bound.
45/// (See #1880 for more details.)
46///
47/// Malicious blocks will eventually timeout or fail contextual validation.
48/// Once validation fails, the block is dropped, and its memory is deallocated.
49pub const MAX_INBOUND_CONCURRENCY: usize = 200;
50
/// The action taken in response to a peer's gossiped block hash.
///
/// Returned by [`Downloads::download_and_verify`]. Only
/// [`DownloadAction::AddedToQueue`] means a download task was started; every
/// other variant means the hash was ignored.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum DownloadAction {
    /// The block hash was successfully queued for download and verification.
    AddedToQueue,

    /// The block hash is already queued, so this request was ignored.
    ///
    /// The same hash was already gossiped to us, either by another peer or
    /// by the same peer again, and its download has not finished yet.
    AlreadyQueued,

    /// The queue is at capacity, so this request was ignored.
    ///
    /// The sync service should discover this block later, when we are closer
    /// to the tip. The queue's capacity is the `full_verify_concurrency_limit`
    /// passed to [`Downloads::new`], after it is clamped to at most
    /// [`MAX_INBOUND_CONCURRENCY`].
    FullQueue,

    /// The advertising peer's IP already has an in-flight download, so
    /// this request was ignored.
    ///
    /// Zcash's post-Blossom target block spacing is 75 seconds. So honest
    /// peers rarely gossip more than one block before the first is verified.
    /// During reorgs or recovery, the same hash also arrives from other peers
    /// or via the syncer.
    TooManyFromPeer,
}
74
75/// Manages download and verification of blocks gossiped to this peer.
76#[pin_project]
77#[derive(Debug)]
78pub struct Downloads<ZN, ZV, ZS>
79where
80    ZN: Service<zn::Request, Response = zn::Response, Error = BoxError> + Send + Clone + 'static,
81    ZN::Future: Send,
82    ZV: Service<zebra_consensus::Request, Response = block::Hash, Error = BoxError>
83        + Send
84        + Clone
85        + 'static,
86    ZV::Future: Send,
87    ZS: Service<zs::Request, Response = zs::Response, Error = BoxError> + Send + Clone + 'static,
88    ZS::Future: Send,
89{
90    // Configuration
91    //
92    /// The configured full verification concurrency limit, after applying the minimum limit.
93    full_verify_concurrency_limit: usize,
94
95    // Services
96    //
97    /// A service that forwards requests to connected peers, and returns their
98    /// responses.
99    network: ZN,
100
101    /// A service that verifies downloaded blocks.
102    verifier: ZV,
103
104    /// A service that manages cached blockchain state.
105    state: ZS,
106
107    /// Allows efficient access to the best tip of the blockchain.
108    latest_chain_tip: zs::LatestChainTip,
109
110    // Internal downloads state
111    //
112    /// A list of pending block download and verify tasks.
113    #[pin]
114    pending: FuturesUnordered<
115        JoinHandle<Result<block::Hash, (BoxError, block::Hash, Option<PeerSocketAddr>)>>,
116    >,
117
118    /// Cancellation handles for tasks in [`Self::pending`], keyed by block
119    /// hash. The `Option<IpAddr>` is the advertiser IP recorded in
120    /// [`Self::in_flight_ips`], so completion can remove it by hash lookup.
121    cancel_handles: HashMap<block::Hash, (oneshot::Sender<()>, Option<IpAddr>)>,
122
123    /// Advertiser IPs with an in-flight download and verify task.
124    ///
125    /// Invariant: an IP is present iff some entry in [`Self::cancel_handles`]
126    /// has value `(_, Some(ip))`. Enforces the one-download-per-IP cap.
127    ///
128    /// Size-bounded by `full_verify_concurrency_limit` (≤ [`MAX_INBOUND_CONCURRENCY`]),
129    /// inherited from the [`DownloadAction::FullQueue`] check on
130    /// [`Self::pending`].
131    in_flight_ips: HashSet<IpAddr>,
132}
133
134impl<ZN, ZV, ZS> Stream for Downloads<ZN, ZV, ZS>
135where
136    ZN: Service<zn::Request, Response = zn::Response, Error = BoxError> + Send + Clone + 'static,
137    ZN::Future: Send,
138    ZV: Service<zebra_consensus::Request, Response = block::Hash, Error = BoxError>
139        + Send
140        + Clone
141        + 'static,
142    ZV::Future: Send,
143    ZS: Service<zs::Request, Response = zs::Response, Error = BoxError> + Send + Clone + 'static,
144    ZS::Future: Send,
145{
146    type Item = Result<block::Hash, (BoxError, Option<PeerSocketAddr>)>;
147
148    fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
149        let this = self.project();
150        // CORRECTNESS
151        //
152        // The current task must be scheduled for wakeup every time we return
153        // `Poll::Pending`.
154        //
155        // If no download and verify tasks have exited since the last poll, this
156        // task is scheduled for wakeup when the next task becomes ready.
157        //
158        // TODO: this would be cleaner with poll_map (#2693)
159        if let Some(join_result) = ready!(this.pending.poll_next(cx)) {
160            let (result, hash) =
161                match join_result.expect("block download and verify tasks must not panic") {
162                    Ok(hash) => (Ok(hash), hash),
163                    Err((e, hash, advertiser_addr)) => (Err((e, advertiser_addr)), hash),
164                };
165            if let Some((_, Some(ip))) = this.cancel_handles.remove(&hash) {
166                assert!(
167                    this.in_flight_ips.remove(&ip),
168                    "every tracked IP was inserted when its download was queued",
169                );
170            }
171            Poll::Ready(Some(result))
172        } else {
173            Poll::Ready(None)
174        }
175    }
176
177    fn size_hint(&self) -> (usize, Option<usize>) {
178        self.pending.size_hint()
179    }
180}
181
182impl<ZN, ZV, ZS> Downloads<ZN, ZV, ZS>
183where
184    ZN: Service<zn::Request, Response = zn::Response, Error = BoxError> + Send + Clone + 'static,
185    ZN::Future: Send,
186    ZV: Service<zebra_consensus::Request, Response = block::Hash, Error = BoxError>
187        + Send
188        + Clone
189        + 'static,
190    ZV::Future: Send,
191    ZS: Service<zs::Request, Response = zs::Response, Error = BoxError> + Send + Clone + 'static,
192    ZS::Future: Send,
193{
194    /// Initialize a new download stream with the provided `network`, `verifier`, and `state` services.
195    /// The `latest_chain_tip` must be linked to the provided `state` service.
196    ///
197    /// The [`Downloads`] stream is agnostic to the network policy, so retry and
198    /// timeout limits should be applied to the `network` service passed into
199    /// this constructor.
200    pub fn new(
201        full_verify_concurrency_limit: usize,
202        network: ZN,
203        verifier: ZV,
204        state: ZS,
205        latest_chain_tip: zs::LatestChainTip,
206    ) -> Self {
207        // The syncer already warns about the minimum.
208        let full_verify_concurrency_limit =
209            full_verify_concurrency_limit.clamp(MIN_CONCURRENCY_LIMIT, MAX_INBOUND_CONCURRENCY);
210
211        Self {
212            full_verify_concurrency_limit,
213            network,
214            verifier,
215            state,
216            latest_chain_tip,
217            pending: FuturesUnordered::new(),
218            cancel_handles: HashMap::new(),
219            in_flight_ips: HashSet::new(),
220        }
221    }
222
223    /// Queue a block for download and verification.
224    ///
225    /// When `advertiser` is `Some`, its IP is tracked in
226    /// [`Self::in_flight_ips`] and used to enforce the one-download-per-IP
227    /// cap; `None` bypasses per-IP accounting (for example when Zebra
228    /// triggers the download internally).
229    #[instrument(skip(self, hash), fields(hash = %hash))]
230    pub fn download_and_verify(
231        &mut self,
232        hash: block::Hash,
233        advertiser: Option<PeerSocketAddr>,
234    ) -> DownloadAction {
235        if self.cancel_handles.contains_key(&hash) {
236            debug!(
237                ?hash,
238                queue_len = self.pending.len(),
239                concurrency_limit = self.full_verify_concurrency_limit,
240                "block hash already queued for inbound download: ignored block",
241            );
242
243            metrics::gauge!("gossip.queued.block.count").set(self.pending.len() as f64);
244            metrics::counter!("gossip.already.queued.dropped.block.hash.count").increment(1);
245
246            return DownloadAction::AlreadyQueued;
247        }
248
249        if self.pending.len() >= self.full_verify_concurrency_limit {
250            debug!(
251                ?hash,
252                queue_len = self.pending.len(),
253                concurrency_limit = self.full_verify_concurrency_limit,
254                "too many blocks queued for inbound download: ignored block",
255            );
256
257            metrics::gauge!("gossip.queued.block.count").set(self.pending.len() as f64);
258            metrics::counter!("gossip.full.queue.dropped.block.hash.count").increment(1);
259
260            return DownloadAction::FullQueue;
261        }
262
263        let advertiser_ip = advertiser.map(|addr| addr.ip());
264        if let Some(ip) = advertiser_ip {
265            if self.in_flight_ips.contains(&ip) {
266                debug!(
267                    ?hash,
268                    ?advertiser,
269                    "already have an in-flight inbound download from peer IP: ignored block",
270                );
271
272                metrics::counter!("gossip.peer.limit.dropped.block.hash.count").increment(1);
273
274                return DownloadAction::TooManyFromPeer;
275            }
276        }
277
278        // This oneshot is used to signal cancellation to the download task.
279        let (cancel_tx, mut cancel_rx) = oneshot::channel::<()>();
280
281        let state = self.state.clone();
282        let network = self.network.clone();
283        let verifier = self.verifier.clone();
284        let latest_chain_tip = self.latest_chain_tip.clone();
285        let full_verify_concurrency_limit = self.full_verify_concurrency_limit;
286
287        let fut = async move {
288            // Check if the block is already in the state.
289            match state.oneshot(zs::Request::KnownBlock(hash)).await {
290                Ok(zs::Response::KnownBlock(None)) => Ok(()),
291                Ok(zs::Response::KnownBlock(Some(_))) => Err("already present".into()),
292                Ok(_) => unreachable!("wrong response"),
293                Err(e) => Err(e),
294            }
295            .map_err(|e| (e, None))?;
296
297            let (block, advertiser_addr) = if let zn::Response::Blocks(blocks) = network
298                .oneshot(zn::Request::BlocksByHash(std::iter::once(hash).collect()))
299                .await
300                .map_err(|e| (e, None))?
301            {
302                assert_eq!(
303                    blocks.len(),
304                    1,
305                    "wrong number of blocks in response to a single hash",
306                );
307
308                blocks
309                    .first()
310                    .expect("just checked length")
311                    .available()
312                    .expect(
313                        "unexpected missing block status: single block failures should be errors",
314                    )
315            } else {
316                unreachable!("wrong response to block request");
317            };
318            metrics::counter!("gossip.downloaded.block.count").increment(1);
319
320            // # Security & Performance
321            //
322            // Reject blocks that are too far ahead of our tip,
323            // and blocks that are behind the finalized tip.
324            //
325            // Avoids denial of service attacks. Also reduces wasted work on high blocks
326            // that will timeout before being verified, and low blocks that can never be finalized.
327            let tip_height = latest_chain_tip.best_tip_height();
328
329            let max_lookahead_height = if let Some(tip_height) = tip_height {
330                let lookahead = HeightDiff::try_from(full_verify_concurrency_limit)
331                    .expect("fits in HeightDiff");
332                (tip_height + lookahead).expect("tip is much lower than Height::MAX")
333            } else {
334                let genesis_lookahead =
335                    u32::try_from(full_verify_concurrency_limit - 1).expect("fits in u32");
336                block::Height(genesis_lookahead)
337            };
338
339            // Get the finalized tip height, assuming we're using the non-finalized state.
340            //
341            // It doesn't matter if we're a few blocks off here, because blocks this low
342            // are part of a fork with much less work. So they would be rejected anyway.
343            //
344            // And if we're still checkpointing, the checkpointer will reject blocks behind
345            // the finalized tip anyway.
346            //
347            // TODO: get the actual finalized tip height
348            let min_accepted_height = tip_height
349                .map(|tip_height| {
350                    block::Height(tip_height.0.saturating_sub(zs::MAX_BLOCK_REORG_HEIGHT))
351                })
352                .unwrap_or(block::Height(0));
353
354            let block_height = block
355                .coinbase_height()
356                .ok_or_else(|| {
357                    debug!(
358                        ?hash,
359                        "gossiped block with no height: dropped downloaded block"
360                    );
361                    metrics::counter!("gossip.no.height.dropped.block.count").increment(1);
362
363                    BoxError::from("gossiped block with no height")
364                })
365                .map_err(|e| (e, None))?;
366
367            if block_height > max_lookahead_height {
368                debug!(
369                    ?hash,
370                    ?block_height,
371                    ?tip_height,
372                    ?max_lookahead_height,
373                    lookahead_limit = full_verify_concurrency_limit,
374                    "gossiped block height too far ahead of the tip: dropped downloaded block",
375                );
376                metrics::counter!("gossip.max.height.limit.dropped.block.count").increment(1);
377
378                Err("gossiped block height too far ahead").map_err(|e| (e.into(), None))?;
379            } else if block_height < min_accepted_height {
380                debug!(
381                    ?hash,
382                    ?block_height,
383                    ?tip_height,
384                    ?min_accepted_height,
385                    behind_tip_limit = ?zs::MAX_BLOCK_REORG_HEIGHT,
386                    "gossiped block height behind the finalized tip: dropped downloaded block",
387                );
388                metrics::counter!("gossip.min.height.limit.dropped.block.count").increment(1);
389
390                Err("gossiped block height behind the finalized tip")
391                    .map_err(|e| (e.into(), None))?;
392            }
393
394            verifier
395                .oneshot(zebra_consensus::Request::Commit(block))
396                .await
397                .map(|hash| (hash, block_height))
398                .map_err(|e| (e, advertiser_addr))
399        }
400        .map_ok(|(hash, height)| {
401            info!(?height, "downloaded and verified gossiped block");
402            metrics::counter!("gossip.verified.block.count").increment(1);
403            hash
404        })
405        // Tack the hash onto the error so poll_next can look up the cancel
406        // handle and advertising IP on failure as well as success.
407        .map_err(move |(e, advertiser_addr)| (e, hash, advertiser_addr))
408        .in_current_span();
409
410        let task = tokio::spawn(async move {
411            // Prefer the cancel handle if both are ready.
412            tokio::select! {
413                biased;
414                _ = &mut cancel_rx => {
415                    trace!("task cancelled prior to completion");
416                    metrics::counter!("gossip.cancelled.count").increment(1);
417                    Err(("canceled".into(), hash, None))
418                }
419                verification = fut => verification,
420            }
421        });
422
423        self.pending.push(task);
424        assert!(
425            self.cancel_handles
426                .insert(hash, (cancel_tx, advertiser_ip))
427                .is_none(),
428            "blocks are only queued once"
429        );
430        if let Some(ip) = advertiser_ip {
431            assert!(
432                self.in_flight_ips.insert(ip),
433                "the per-IP cap check above rejects any IP already in flight",
434            );
435        }
436
437        debug!(
438            ?hash,
439            queue_len = self.pending.len(),
440            concurrency_limit = self.full_verify_concurrency_limit,
441            "queued hash for download",
442        );
443        metrics::gauge!("gossip.queued.block.count").set(self.pending.len() as f64);
444
445        DownloadAction::AddedToQueue
446    }
447}