Skip to main content

zebrad/components/sync/
downloads.rs

1//! A download stream for Zebra's block syncer.
2
3use std::{
4    collections::HashMap,
5    convert,
6    pin::Pin,
7    sync::Arc,
8    task::{Context, Poll},
9};
10
11use futures::{
12    future::{FutureExt, TryFutureExt},
13    ready,
14    stream::{FuturesUnordered, Stream},
15};
16use pin_project::pin_project;
17use thiserror::Error;
18use tokio::{
19    sync::{oneshot, watch},
20    task::JoinHandle,
21    time::timeout,
22};
23use tower::{hedge, Service, ServiceExt};
24use tracing_futures::Instrument;
25
26use zebra_chain::{
27    block::{self, Height, HeightDiff},
28    chain_tip::ChainTip,
29};
30use zebra_network::{self as zn, PeerSocketAddr};
31use zebra_state as zs;
32
33use crate::components::sync::{
34    FINAL_CHECKPOINT_BLOCK_VERIFY_TIMEOUT, FINAL_CHECKPOINT_BLOCK_VERIFY_TIMEOUT_LIMIT,
35};
36
37type BoxError = Box<dyn std::error::Error + Send + Sync + 'static>;
38
/// A multiplier used to calculate the extra number of blocks we allow in the
/// verifier, state, and block commit pipelines, on top of the lookahead limit.
///
/// The extra number of blocks is calculated using
/// `lookahead_limit * VERIFICATION_PIPELINE_SCALING_MULTIPLIER`.
///
/// When a chain tip is available, the downloader pauses new downloads once a downloaded
/// block is more than `lookahead_limit + lookahead_limit * VERIFICATION_PIPELINE_SCALING_MULTIPLIER`
/// blocks above the tip, and clears the pause once a downloaded block is at or below
/// half that distance above the tip.
///
/// This allows the verifier and state queues, and the block commit channel,
/// to hold a few extra tips responses worth of blocks,
/// even if the syncer queue is full. Any unused capacity is shared between these pipelines.
///
/// If this capacity is exceeded, the downloader will tell the syncer to pause new downloads.
///
/// Since the syncer queue is limited to the `lookahead_limit`,
/// the rest of the capacity is reserved for the other queues.
/// There is no reserved capacity for the syncer queue:
/// if the other queues stay full, the syncer will eventually time out and reset.
pub const VERIFICATION_PIPELINE_SCALING_MULTIPLIER: usize = 2;
56
/// The maximum height difference between Zebra's state tip and a downloaded block.
/// Blocks higher than this will get dropped and return an error
/// ([`BlockDownloadVerifyError::AboveLookaheadHeightLimit`]).
///
/// When there is no state tip yet, this value is used directly as the maximum
/// accepted block height.
pub const VERIFICATION_PIPELINE_DROP_LIMIT: HeightDiff = 50_000;
60
61#[derive(Copy, Clone, Debug)]
62pub(super) struct AlwaysHedge;
63
64impl<Request: Clone> hedge::Policy<Request> for AlwaysHedge {
65    fn can_retry(&self, _req: &Request) -> bool {
66        true
67    }
68    fn clone_request(&self, req: &Request) -> Option<Request> {
69        Some(req.clone())
70    }
71}
72
73/// Errors that can occur while downloading and verifying a block.
74#[derive(Error, Debug)]
75#[allow(dead_code)]
76pub enum BlockDownloadVerifyError {
77    #[error("permanent readiness error from the network service: {error:?}")]
78    NetworkServiceError {
79        #[source]
80        error: BoxError,
81    },
82
83    #[error("permanent readiness error from the verifier service: {error:?}")]
84    VerifierServiceError {
85        #[source]
86        error: BoxError,
87    },
88
89    #[error("duplicate block hash queued for download: {hash:?}")]
90    DuplicateBlockQueuedForDownload { hash: block::Hash },
91
92    #[error("error downloading block: {error:?} {hash:?}")]
93    DownloadFailed {
94        #[source]
95        error: BoxError,
96        hash: block::Hash,
97    },
98
99    /// A downloaded block was a long way ahead of the state chain tip.
100    /// This error should be very rare during normal operation.
101    ///
102    /// We need to reset the syncer on this error, to allow the verifier and state to catch up,
103    /// or prevent it following a bad chain.
104    ///
105    /// If we don't reset the syncer on this error, it will continue downloading blocks from a bad
106    /// chain, or blocks far ahead of the current state tip.
107    #[error("downloaded block was too far ahead of the chain tip: {height:?} {hash:?}")]
108    AboveLookaheadHeightLimit {
109        height: block::Height,
110        hash: block::Hash,
111        advertiser_addr: Option<PeerSocketAddr>,
112    },
113
114    #[error("downloaded block was too far behind the chain tip: {height:?} {hash:?}")]
115    BehindTipHeightLimit {
116        height: block::Height,
117        hash: block::Hash,
118    },
119
120    #[error("downloaded block had an invalid height: {hash:?}")]
121    InvalidHeight {
122        hash: block::Hash,
123        advertiser_addr: Option<PeerSocketAddr>,
124    },
125
126    #[error("block failed consensus validation: {error:?} {height:?} {hash:?}")]
127    Invalid {
128        #[source]
129        error: zebra_consensus::router::RouterError,
130        height: block::Height,
131        hash: block::Hash,
132        advertiser_addr: Option<PeerSocketAddr>,
133    },
134
135    #[error("block validation request failed: {error:?} {height:?} {hash:?}")]
136    ValidationRequestError {
137        #[source]
138        error: BoxError,
139        height: block::Height,
140        hash: block::Hash,
141    },
142
143    #[error("block download & verification was cancelled during download: {hash:?}")]
144    CancelledDuringDownload { hash: block::Hash },
145
146    #[error(
147        "block download & verification was cancelled while waiting for the verifier service: \
148         to become ready: {height:?} {hash:?}"
149    )]
150    CancelledAwaitingVerifierReadiness {
151        height: block::Height,
152        hash: block::Hash,
153    },
154
155    #[error(
156        "block download & verification was cancelled during verification: {height:?} {hash:?}"
157    )]
158    CancelledDuringVerification {
159        height: block::Height,
160        hash: block::Hash,
161    },
162
163    #[error(
164        "timeout during service readiness, download, verification, or internal downloader operation"
165    )]
166    Timeout,
167}
168
169impl From<tokio::time::error::Elapsed> for BlockDownloadVerifyError {
170    fn from(_value: tokio::time::error::Elapsed) -> Self {
171        BlockDownloadVerifyError::Timeout
172    }
173}
174
/// Represents a [`Stream`] of download and verification tasks during chain sync.
///
/// Each queued block runs in its own spawned task. Polling the stream yields the
/// result of each task as it finishes: the verified block's height and hash,
/// or the error that stopped it.
///
/// Type parameters:
/// - `ZN`: the network service used to request blocks by hash,
/// - `ZV`: the verifier service that downloaded blocks are submitted to,
/// - `ZSTip`: a handle used to read the best chain tip height.
#[pin_project]
#[derive(Debug)]
pub struct Downloads<ZN, ZV, ZSTip>
where
    ZN: Service<zn::Request, Response = zn::Response, Error = BoxError> + Send + Sync + 'static,
    ZN::Future: Send,
    ZV: Service<zebra_consensus::Request, Response = block::Hash, Error = BoxError>
        + Send
        + Sync
        + Clone
        + 'static,
    ZV::Future: Send,
    ZSTip: ChainTip + Clone + Send + 'static,
{
    // Services
    //
    /// A service that forwards requests to connected peers, and returns their
    /// responses.
    network: ZN,

    /// A service that verifies downloaded blocks.
    ///
    /// Each spawned task gets its own clone of this service.
    verifier: ZV,

    /// Allows efficient access to the best tip of the blockchain.
    ///
    /// Each spawned task gets its own clone of this handle.
    latest_chain_tip: ZSTip,

    // Configuration
    //
    /// The configured lookahead limit, after applying the minimum limit.
    lookahead_limit: usize,

    /// The largest block height for the checkpoint verifier, based on the current config.
    max_checkpoint_height: Height,

    // Shared syncer state
    //
    /// Sender that is set to `true` when the downloader is past the lookahead limit.
    /// This is based on the downloaded block height and the state tip height.
    ///
    /// It is shared with every spawned task, and set back to `false` when all
    /// tasks are cancelled.
    past_lookahead_limit_sender: Arc<std::sync::Mutex<watch::Sender<bool>>>,

    /// Receiver for `past_lookahead_limit_sender`, which is used to avoid accessing the mutex.
    past_lookahead_limit_receiver: zs::WatchReceiver<bool>,

    // Internal downloads state
    //
    /// A list of pending block download and verify tasks.
    ///
    /// On success a task returns the block's height and hash. On failure it
    /// returns the error together with the hash, so the matching cancel handle
    /// can be removed.
    #[pin]
    pending: FuturesUnordered<
        JoinHandle<Result<(Height, block::Hash), (BlockDownloadVerifyError, block::Hash)>>,
    >,

    /// A list of channels that can be used to cancel pending block download and
    /// verify tasks.
    ///
    /// Keyed by block hash. An entry is also used to detect duplicate download requests.
    cancel_handles: HashMap<block::Hash, oneshot::Sender<()>>,
}
231
232impl<ZN, ZV, ZSTip> Stream for Downloads<ZN, ZV, ZSTip>
233where
234    ZN: Service<zn::Request, Response = zn::Response, Error = BoxError> + Send + Sync + 'static,
235    ZN::Future: Send,
236    ZV: Service<zebra_consensus::Request, Response = block::Hash, Error = BoxError>
237        + Send
238        + Sync
239        + Clone
240        + 'static,
241    ZV::Future: Send,
242    ZSTip: ChainTip + Clone + Send + 'static,
243{
244    type Item = Result<(Height, block::Hash), BlockDownloadVerifyError>;
245
246    fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
247        let this = self.project();
248        // CORRECTNESS
249        //
250        // The current task must be scheduled for wakeup every time we return
251        // `Poll::Pending`.
252        //
253        // If no download and verify tasks have exited since the last poll, this
254        // task is scheduled for wakeup when the next task becomes ready.
255        //
256        // TODO: this would be cleaner with poll_map (#2693)
257        if let Some(join_result) = ready!(this.pending.poll_next(cx)) {
258            match join_result.expect("block download and verify tasks must not panic") {
259                Ok((height, hash)) => {
260                    this.cancel_handles.remove(&hash);
261
262                    Poll::Ready(Some(Ok((height, hash))))
263                }
264                Err((e, hash)) => {
265                    this.cancel_handles.remove(&hash);
266                    Poll::Ready(Some(Err(e)))
267                }
268            }
269        } else {
270            Poll::Ready(None)
271        }
272    }
273
274    fn size_hint(&self) -> (usize, Option<usize>) {
275        self.pending.size_hint()
276    }
277}
278
279impl<ZN, ZV, ZSTip> Downloads<ZN, ZV, ZSTip>
280where
281    ZN: Service<zn::Request, Response = zn::Response, Error = BoxError> + Send + Sync + 'static,
282    ZN::Future: Send,
283    ZV: Service<zebra_consensus::Request, Response = block::Hash, Error = BoxError>
284        + Send
285        + Sync
286        + Clone
287        + 'static,
288    ZV::Future: Send,
289    ZSTip: ChainTip + Clone + Send + 'static,
290{
291    /// Initialize a new download stream with the provided `network` and
292    /// `verifier` services.
293    ///
294    /// Uses the `latest_chain_tip` and `lookahead_limit` to drop blocks
295    /// that are too far ahead of the current state tip.
296    /// Uses `max_checkpoint_height` to work around a known block timeout (#5125).
297    ///
298    /// The [`Downloads`] stream is agnostic to the network policy, so retry and
299    /// timeout limits should be applied to the `network` service passed into
300    /// this constructor.
301    pub fn new(
302        network: ZN,
303        verifier: ZV,
304        latest_chain_tip: ZSTip,
305        past_lookahead_limit_sender: watch::Sender<bool>,
306        lookahead_limit: usize,
307        max_checkpoint_height: Height,
308    ) -> Self {
309        let past_lookahead_limit_receiver =
310            zs::WatchReceiver::new(past_lookahead_limit_sender.subscribe());
311
312        Self {
313            network,
314            verifier,
315            latest_chain_tip,
316            lookahead_limit,
317            max_checkpoint_height,
318            past_lookahead_limit_sender: Arc::new(std::sync::Mutex::new(
319                past_lookahead_limit_sender,
320            )),
321            past_lookahead_limit_receiver,
322            pending: FuturesUnordered::new(),
323            cancel_handles: HashMap::new(),
324        }
325    }
326
    /// Queue a block for download and verification.
    ///
    /// This method waits for the network to become ready, sends the block request,
    /// and spawns a task that downloads the block, checks its height against the
    /// chain tip, and submits it to the verifier. It returns immediately after
    /// queuing the request.
    ///
    /// The outcome of the spawned task is not returned here: it is reported when
    /// this [`Downloads`] is polled as a [`Stream`].
    ///
    /// # Errors
    ///
    /// - [`BlockDownloadVerifyError::DuplicateBlockQueuedForDownload`] if `hash`
    ///   already has a queued task,
    /// - [`BlockDownloadVerifyError::NetworkServiceError`] if the network service
    ///   fails while becoming ready.
    ///
    /// # Panics
    ///
    /// If a cancel handle for `hash` already exists when the new task is registered.
    #[instrument(level = "debug", skip(self), fields(%hash))]
    pub async fn download_and_verify(
        &mut self,
        hash: block::Hash,
    ) -> Result<(), BlockDownloadVerifyError> {
        // A cancel handle exists for every tracked task, so it doubles as the duplicate check.
        if self.cancel_handles.contains_key(&hash) {
            metrics::counter!("sync.already.queued.dropped.block.hash.count").increment(1);
            return Err(BlockDownloadVerifyError::DuplicateBlockQueuedForDownload { hash });
        }

        // We construct the block requests sequentially, waiting for the peer
        // set to be ready to process each request. This ensures that we start
        // block downloads in the order we want them (though they may resolve
        // out of order), and it means that we respect backpressure. Otherwise,
        // if we waited for readiness and did the service call in the spawned
        // tasks, all of the spawned tasks would race each other waiting for the
        // network to become ready.
        let block_req = self
            .network
            .ready()
            .await
            .map_err(|error| BlockDownloadVerifyError::NetworkServiceError { error })?
            .call(zn::Request::BlocksByHash(std::iter::once(hash).collect()));

        // This oneshot is used to signal cancellation to the download task.
        let (cancel_tx, mut cancel_rx) = oneshot::channel::<()>();

        // Everything the task needs is cloned or copied here, so the task owns its own state.
        let mut verifier = self.verifier.clone();
        let latest_chain_tip = self.latest_chain_tip.clone();

        let lookahead_limit = self.lookahead_limit;
        let max_checkpoint_height = self.max_checkpoint_height;

        let past_lookahead_limit_sender = self.past_lookahead_limit_sender.clone();
        let past_lookahead_limit_receiver = self.past_lookahead_limit_receiver.clone();

        let task = tokio::spawn(
            async move {
                // Download the block.
                // Prefer the cancel handle if both are ready.
                let download_start = std::time::Instant::now();
                let rsp = tokio::select! {
                    biased;
                    _ = &mut cancel_rx => {
                        trace!("task cancelled prior to download completion");
                        metrics::counter!("sync.cancelled.download.count").increment(1);
                        metrics::histogram!("sync.block.download.duration_seconds", "result" => "cancelled")
                            .record(download_start.elapsed().as_secs_f64());
                        return Err(BlockDownloadVerifyError::CancelledDuringDownload { hash })
                    }
                    rsp = block_req => rsp.map_err(|error| BlockDownloadVerifyError::DownloadFailed { error, hash})?,
                };

                // We asked for exactly one hash, so anything other than a single
                // available block is treated as a bug and panics.
                let (block, advertiser_addr) = if let zn::Response::Blocks(blocks) = rsp {
                    assert_eq!(
                        blocks.len(),
                        1,
                        "wrong number of blocks in response to a single hash"
                    );

                    blocks
                        .first()
                        .expect("just checked length")
                        .available()
                        .expect("unexpected missing block status: single block failures should be errors")
                } else {
                    unreachable!("wrong response to block request");
                };
                metrics::counter!("sync.downloaded.block.count").increment(1);
                metrics::histogram!("sync.block.download.duration_seconds", "result" => "success")
                    .record(download_start.elapsed().as_secs_f64());

                // Security & Performance: reject blocks that are too far ahead of our tip.
                // Avoids denial of service attacks, and reduces wasted work on high blocks
                // that will timeout before being verified.
                let tip_height = latest_chain_tip.best_tip_height();

                // Three thresholds, from highest to lowest:
                // - drop: blocks above this height fail with `AboveLookaheadHeightLimit`,
                // - pause: blocks above this height set the "past lookahead limit" flag,
                // - reset: blocks at or below this height clear that flag again.
                let (lookahead_drop_height, lookahead_pause_height, lookahead_reset_height) = if let Some(tip_height) = tip_height {
                    // Scale the height limit with the lookahead limit,
                    // so users with low capacity or under DoS can reduce them both.
                    let lookahead_pause = HeightDiff::try_from(
                        lookahead_limit + lookahead_limit * VERIFICATION_PIPELINE_SCALING_MULTIPLIER,
                    )
                        .expect("fits in HeightDiff");


                    ((tip_height + VERIFICATION_PIPELINE_DROP_LIMIT).expect("tip is much lower than Height::MAX"),
                     (tip_height + lookahead_pause).expect("tip is much lower than Height::MAX"),
                     (tip_height + lookahead_pause/2).expect("tip is much lower than Height::MAX"))
                } else {
                    // There is no tip yet, so the thresholds are absolute heights.
                    let genesis_drop = VERIFICATION_PIPELINE_DROP_LIMIT.try_into().expect("fits in u32");
                    // NOTE(review): `lookahead_limit - 1` underflows if the limit is 0.
                    // The field docs say a minimum limit has already been applied — confirm
                    // that every caller of `new` guarantees a non-zero limit.
                    let genesis_lookahead =
                        u32::try_from(lookahead_limit - 1).expect("fits in u32");

                    (block::Height(genesis_drop),
                     block::Height(genesis_lookahead),
                     block::Height(genesis_lookahead/2))
                };

                // Get the finalized tip height, assuming we're using the non-finalized state.
                //
                // It doesn't matter if we're a few blocks off here, because blocks this low
                // are part of a fork with much less work. So they would be rejected anyway.
                //
                // And if we're still checkpointing, the checkpointer will reject blocks behind
                // the finalized tip anyway.
                //
                // TODO: get the actual finalized tip height
                let min_accepted_height = tip_height
                    .map(|tip_height| {
                        block::Height(tip_height.0.saturating_sub(zs::MAX_BLOCK_REORG_HEIGHT))
                    })
                    .unwrap_or(block::Height(0));

                let block_height = if let Some(block_height) = block.coinbase_height() {
                    block_height
                } else {
                    debug!(
                        ?hash,
                        "synced block with no height: dropped downloaded block"
                    );
                    metrics::counter!("sync.no.height.dropped.block.count").increment(1);

                    return Err(BlockDownloadVerifyError::InvalidHeight { hash, advertiser_addr });
                };

                if block_height > lookahead_drop_height {
                    Err(BlockDownloadVerifyError::AboveLookaheadHeightLimit { height: block_height, hash, advertiser_addr })?;
                } else if block_height > lookahead_pause_height {
                    // The block is still verified below: being past the pause height
                    // only raises the flag, it does not drop the block.
                    //
                    // This log can be very verbose, usually hundreds of blocks are dropped.
                    // So we only log at info level for the first above-height block.
                    if !past_lookahead_limit_receiver.cloned_watch_data() {
                        info!(
                            ?hash,
                            ?block_height,
                            ?tip_height,
                            ?lookahead_pause_height,
                            ?lookahead_reset_height,
                            lookahead_limit = ?lookahead_limit,
                            "synced block height too far ahead of the tip: \
                             waiting for downloaded blocks to commit to the state",
                        );

                        // Set the watched value to true, since we're over the limit.
                        //
                        // It is ok to block here, because we're going to pause new downloads anyway.
                        // But if Zebra is shutting down, ignore the send error.
                        let _ = past_lookahead_limit_sender.lock().expect("thread panicked while holding the past_lookahead_limit_sender mutex guard").send(true);
                    } else {
                        debug!(
                            ?hash,
                            ?block_height,
                            ?tip_height,
                            ?lookahead_pause_height,
                            ?lookahead_reset_height,
                            lookahead_limit = ?lookahead_limit,
                            "synced block height too far ahead of the tip: \
                             waiting for downloaded blocks to commit to the state",
                        );
                    }

                    metrics::counter!("sync.max.height.limit.paused.count").increment(1);
                } else if block_height <= lookahead_reset_height && past_lookahead_limit_receiver.cloned_watch_data() {
                    // Reset the watched value to false, since we're well under the limit.
                    // We need to block here, because if we don't the syncer can hang.

                    // But if Zebra is shutting down, ignore the send error.
                    let _ = past_lookahead_limit_sender.lock().expect("thread panicked while holding the past_lookahead_limit_sender mutex guard").send(false);
                    metrics::counter!("sync.max.height.limit.reset.count").increment(1);

                    // NOTE(review): both reset counters are always incremented together here.
                    metrics::counter!("sync.max.height.limit.reset.attempt.count").increment(1);
                }

                if block_height < min_accepted_height {
                    debug!(
                        ?hash,
                        ?block_height,
                        ?tip_height,
                        ?min_accepted_height,
                        behind_tip_limit = ?zs::MAX_BLOCK_REORG_HEIGHT,
                        "synced block height behind the finalized tip: dropped downloaded block"
                    );
                    // NOTE(review): this counter uses a "gossip." prefix, unlike the
                    // other "sync." counters in this task — confirm that is intended.
                    metrics::counter!("gossip.min.height.limit.dropped.block.count").increment(1);

                    Err(BlockDownloadVerifyError::BehindTipHeightLimit { height: block_height, hash })?;
                }

                // Wait for the verifier service to be ready.
                let readiness = verifier.ready();
                // Prefer the cancel handle if both are ready.
                let verifier = tokio::select! {
                    biased;
                    _ = &mut cancel_rx => {
                        trace!("task cancelled waiting for verifier service readiness");
                        metrics::counter!("sync.cancelled.verify.ready.count").increment(1);
                        return Err(BlockDownloadVerifyError::CancelledAwaitingVerifierReadiness { height: block_height, hash })
                    }
                    verifier = readiness => verifier,
                };

                // Verify the block.
                // The response future is boxed so it can be replaced by the
                // timeout-wrapped future below without changing its type.
                let verify_start = std::time::Instant::now();
                let mut rsp = verifier
                    .map_err(|error| BlockDownloadVerifyError::VerifierServiceError { error })?
                    .call(zebra_consensus::Request::Commit(block)).boxed();

                // Add a shorter timeout to workaround a known bug (#5125)
                //
                // The timeout error is converted into a boxed string error, and the
                // nested `Result` is flattened, so `rsp` keeps the same output type.
                // NOTE(review): because it is a string error, a short timeout is expected
                // to surface as `ValidationRequestError` below, not `Timeout` — confirm
                // that callers handle it as intended.
                let short_timeout_max = (max_checkpoint_height + FINAL_CHECKPOINT_BLOCK_VERIFY_TIMEOUT_LIMIT).expect("checkpoint block height is in valid range");
                if block_height >= max_checkpoint_height && block_height <= short_timeout_max {
                    rsp = timeout(FINAL_CHECKPOINT_BLOCK_VERIFY_TIMEOUT, rsp)
                        .map_err(|timeout| format!("initial fully verified block timed out: retrying: {timeout:?}").into())
                        .map(|nested_result| nested_result.and_then(convert::identity)).boxed();
                }

                // Prefer the cancel handle if both are ready.
                let verification = tokio::select! {
                    biased;
                    _ = &mut cancel_rx => {
                        trace!("task cancelled prior to verification");
                        metrics::counter!("sync.cancelled.verify.count").increment(1);
                        metrics::histogram!("sync.block.verify.duration_seconds", "result" => "cancelled")
                            .record(verify_start.elapsed().as_secs_f64());
                        return Err(BlockDownloadVerifyError::CancelledDuringVerification { height: block_height, hash })
                    }
                    verification = rsp => verification,
                };

                let verify_result = if verification.is_ok() { "success" } else { "failure" };
                metrics::histogram!("sync.block.verify.duration_seconds", "result" => verify_result)
                    .record(verify_start.elapsed().as_secs_f64());

                if verification.is_ok() {
                    metrics::counter!("sync.verified.block.count").increment(1);
                }

                // On success, pair the verified hash with the block's height.
                // On failure, a `RouterError` becomes `Invalid`, and any other
                // error becomes `ValidationRequestError`.
                verification
                    .map(|hash| (block_height, hash))
                    .map_err(|err| {
                        match err.downcast::<zebra_consensus::router::RouterError>() {
                            Ok(error) => BlockDownloadVerifyError::Invalid { error: *error, height: block_height, hash, advertiser_addr },
                            Err(error) => BlockDownloadVerifyError::ValidationRequestError { error, height: block_height, hash },
                        }
                    })
            }
            .in_current_span()
            // Tack the hash onto the error so we can remove the cancel handle
            // on failure as well as on success.
            .map_err(move |e| (e, hash)),
        );

        // Try to start the spawned task before queueing the next block request
        tokio::task::yield_now().await;

        // Track the task and its cancel handle. The duplicate check at the top of
        // this method means the hash must not already have a handle.
        self.pending.push(task);
        assert!(
            self.cancel_handles.insert(hash, cancel_tx).is_none(),
            "blocks are only queued once"
        );

        Ok(())
    }
592
593    /// Cancel all running tasks and reset the downloader state.
594    pub fn cancel_all(&mut self) {
595        // Replace the pending task list with an empty one and drop it.
596        let _ = std::mem::take(&mut self.pending);
597
598        // Signal cancellation to all running tasks.
599        // Since we already dropped the JoinHandles above, they should
600        // fail silently.
601        for (_hash, cancel) in self.cancel_handles.drain() {
602            let _ = cancel.send(());
603        }
604
605        assert!(self.pending.is_empty());
606        assert!(self.cancel_handles.is_empty());
607
608        // Set the lookahead limit to false, since we're empty (so we're under the limit).
609        //
610        // It is ok to block here, because we're doing a reset and sleep anyway.
611        // But if Zebra is shutting down, ignore the send error.
612        let _ = self
613            .past_lookahead_limit_sender
614            .lock()
615            .expect("thread panicked while holding the past_lookahead_limit_sender mutex guard")
616            .send(false);
617    }
618
619    /// Get the number of currently in-flight download and verify tasks.
620    pub fn in_flight(&mut self) -> usize {
621        self.pending.len()
622    }
623
624    /// Returns true if there are no in-flight download and verify tasks.
625    #[allow(dead_code)]
626    pub fn is_empty(&mut self) -> bool {
627        self.pending.is_empty()
628    }
629}