Skip to main content

zebrad/components/sync/
downloads.rs

1//! A download stream for Zebra's block syncer.
2
3use std::{
4    collections::HashMap,
5    convert,
6    pin::Pin,
7    sync::Arc,
8    task::{Context, Poll},
9};
10
11use futures::{
12    future::{FutureExt, TryFutureExt},
13    ready,
14    stream::{FuturesUnordered, Stream},
15};
16use pin_project::pin_project;
17use thiserror::Error;
18use tokio::{
19    sync::{oneshot, watch},
20    task::JoinHandle,
21    time::timeout,
22};
23use tower::{hedge, Service, ServiceExt};
24use tracing_futures::Instrument;
25
26use zebra_chain::{
27    block::{self, Height, HeightDiff},
28    chain_tip::ChainTip,
29};
30use zebra_network::{self as zn, PeerSocketAddr};
31use zebra_state as zs;
32
33use crate::components::sync::{
34    FINAL_CHECKPOINT_BLOCK_VERIFY_TIMEOUT, FINAL_CHECKPOINT_BLOCK_VERIFY_TIMEOUT_LIMIT,
35};
36
/// A boxed, thread-safe, dynamically typed error.
///
/// This is the error type required from the network and verifier services
/// used by the downloader. The `'static` bound is implied by the boxed trait object.
type BoxError = Box<dyn std::error::Error + Send + Sync>;
38
/// Scaling factor for the extra blocks allowed in the verifier, state, and
/// block commit pipelines, in addition to the lookahead limit.
///
/// The number of extra blocks is
/// `lookahead_limit * VERIFICATION_PIPELINE_SCALING_MULTIPLIER`.
///
/// This extra room lets the verifier and state queues, and the block commit
/// channel, hold a few extra tips responses worth of blocks, even when the
/// syncer queue is full. Any unused capacity is shared between both queues.
///
/// When this capacity is exceeded, the downloader tells the syncer to pause
/// new downloads.
///
/// The syncer queue is limited to the `lookahead_limit`, so the remaining
/// capacity is reserved for the other queues. The syncer queue itself has no
/// reserved capacity: if the other queues stay full, the syncer will
/// eventually time out and reset.
pub const VERIFICATION_PIPELINE_SCALING_MULTIPLIER: usize = 2;
56
57/// The maximum height difference between Zebra's state tip and a downloaded block.
58/// Blocks higher than this will get dropped and return an error.
59pub const VERIFICATION_PIPELINE_DROP_LIMIT: HeightDiff = 50_000;
60
61#[derive(Copy, Clone, Debug)]
62pub(super) struct AlwaysHedge;
63
64impl<Request: Clone> hedge::Policy<Request> for AlwaysHedge {
65    fn can_retry(&self, _req: &Request) -> bool {
66        true
67    }
68    fn clone_request(&self, req: &Request) -> Option<Request> {
69        Some(req.clone())
70    }
71}
72
73/// Errors that can occur while downloading and verifying a block.
74#[derive(Error, Debug)]
75#[allow(dead_code)]
76pub enum BlockDownloadVerifyError {
77    #[error("permanent readiness error from the network service: {error:?}")]
78    NetworkServiceError {
79        #[source]
80        error: BoxError,
81    },
82
83    #[error("permanent readiness error from the verifier service: {error:?}")]
84    VerifierServiceError {
85        #[source]
86        error: BoxError,
87    },
88
89    #[error("duplicate block hash queued for download: {hash:?}")]
90    DuplicateBlockQueuedForDownload { hash: block::Hash },
91
92    #[error("error downloading block: {error:?} {hash:?}")]
93    DownloadFailed {
94        #[source]
95        error: BoxError,
96        hash: block::Hash,
97    },
98
99    /// A downloaded block was a long way ahead of the state chain tip.
100    /// This error should be very rare during normal operation.
101    ///
102    /// We need to reset the syncer on this error, to allow the verifier and state to catch up,
103    /// or prevent it following a bad chain.
104    ///
105    /// If we don't reset the syncer on this error, it will continue downloading blocks from a bad
106    /// chain, or blocks far ahead of the current state tip.
107    #[error("downloaded block was too far ahead of the chain tip: {height:?} {hash:?}")]
108    AboveLookaheadHeightLimit {
109        height: block::Height,
110        hash: block::Hash,
111    },
112
113    #[error("downloaded block was too far behind the chain tip: {height:?} {hash:?}")]
114    BehindTipHeightLimit {
115        height: block::Height,
116        hash: block::Hash,
117    },
118
119    #[error("downloaded block had an invalid height: {hash:?}")]
120    InvalidHeight { hash: block::Hash },
121
122    #[error("block failed consensus validation: {error:?} {height:?} {hash:?}")]
123    Invalid {
124        #[source]
125        error: zebra_consensus::router::RouterError,
126        height: block::Height,
127        hash: block::Hash,
128        advertiser_addr: Option<PeerSocketAddr>,
129    },
130
131    #[error("block validation request failed: {error:?} {height:?} {hash:?}")]
132    ValidationRequestError {
133        #[source]
134        error: BoxError,
135        height: block::Height,
136        hash: block::Hash,
137    },
138
139    #[error("block download & verification was cancelled during download: {hash:?}")]
140    CancelledDuringDownload { hash: block::Hash },
141
142    #[error(
143        "block download & verification was cancelled while waiting for the verifier service: \
144         to become ready: {height:?} {hash:?}"
145    )]
146    CancelledAwaitingVerifierReadiness {
147        height: block::Height,
148        hash: block::Hash,
149    },
150
151    #[error(
152        "block download & verification was cancelled during verification: {height:?} {hash:?}"
153    )]
154    CancelledDuringVerification {
155        height: block::Height,
156        hash: block::Hash,
157    },
158
159    #[error(
160        "timeout during service readiness, download, verification, or internal downloader operation"
161    )]
162    Timeout,
163}
164
165impl From<tokio::time::error::Elapsed> for BlockDownloadVerifyError {
166    fn from(_value: tokio::time::error::Elapsed) -> Self {
167        BlockDownloadVerifyError::Timeout
168    }
169}
170
171/// Represents a [`Stream`] of download and verification tasks during chain sync.
172#[pin_project]
173#[derive(Debug)]
174pub struct Downloads<ZN, ZV, ZSTip>
175where
176    ZN: Service<zn::Request, Response = zn::Response, Error = BoxError> + Send + Sync + 'static,
177    ZN::Future: Send,
178    ZV: Service<zebra_consensus::Request, Response = block::Hash, Error = BoxError>
179        + Send
180        + Sync
181        + Clone
182        + 'static,
183    ZV::Future: Send,
184    ZSTip: ChainTip + Clone + Send + 'static,
185{
186    // Services
187    //
188    /// A service that forwards requests to connected peers, and returns their
189    /// responses.
190    network: ZN,
191
192    /// A service that verifies downloaded blocks.
193    verifier: ZV,
194
195    /// Allows efficient access to the best tip of the blockchain.
196    latest_chain_tip: ZSTip,
197
198    // Configuration
199    //
200    /// The configured lookahead limit, after applying the minimum limit.
201    lookahead_limit: usize,
202
203    /// The largest block height for the checkpoint verifier, based on the current config.
204    max_checkpoint_height: Height,
205
206    // Shared syncer state
207    //
208    /// Sender that is set to `true` when the downloader is past the lookahead limit.
209    /// This is based on the downloaded block height and the state tip height.
210    past_lookahead_limit_sender: Arc<std::sync::Mutex<watch::Sender<bool>>>,
211
212    /// Receiver for `past_lookahead_limit_sender`, which is used to avoid accessing the mutex.
213    past_lookahead_limit_receiver: zs::WatchReceiver<bool>,
214
215    // Internal downloads state
216    //
217    /// A list of pending block download and verify tasks.
218    #[pin]
219    pending: FuturesUnordered<
220        JoinHandle<Result<(Height, block::Hash), (BlockDownloadVerifyError, block::Hash)>>,
221    >,
222
223    /// A list of channels that can be used to cancel pending block download and
224    /// verify tasks.
225    cancel_handles: HashMap<block::Hash, oneshot::Sender<()>>,
226}
227
228impl<ZN, ZV, ZSTip> Stream for Downloads<ZN, ZV, ZSTip>
229where
230    ZN: Service<zn::Request, Response = zn::Response, Error = BoxError> + Send + Sync + 'static,
231    ZN::Future: Send,
232    ZV: Service<zebra_consensus::Request, Response = block::Hash, Error = BoxError>
233        + Send
234        + Sync
235        + Clone
236        + 'static,
237    ZV::Future: Send,
238    ZSTip: ChainTip + Clone + Send + 'static,
239{
240    type Item = Result<(Height, block::Hash), BlockDownloadVerifyError>;
241
242    fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
243        let this = self.project();
244        // CORRECTNESS
245        //
246        // The current task must be scheduled for wakeup every time we return
247        // `Poll::Pending`.
248        //
249        // If no download and verify tasks have exited since the last poll, this
250        // task is scheduled for wakeup when the next task becomes ready.
251        //
252        // TODO: this would be cleaner with poll_map (#2693)
253        if let Some(join_result) = ready!(this.pending.poll_next(cx)) {
254            match join_result.expect("block download and verify tasks must not panic") {
255                Ok((height, hash)) => {
256                    this.cancel_handles.remove(&hash);
257
258                    Poll::Ready(Some(Ok((height, hash))))
259                }
260                Err((e, hash)) => {
261                    this.cancel_handles.remove(&hash);
262                    Poll::Ready(Some(Err(e)))
263                }
264            }
265        } else {
266            Poll::Ready(None)
267        }
268    }
269
270    fn size_hint(&self) -> (usize, Option<usize>) {
271        self.pending.size_hint()
272    }
273}
274
275impl<ZN, ZV, ZSTip> Downloads<ZN, ZV, ZSTip>
276where
277    ZN: Service<zn::Request, Response = zn::Response, Error = BoxError> + Send + Sync + 'static,
278    ZN::Future: Send,
279    ZV: Service<zebra_consensus::Request, Response = block::Hash, Error = BoxError>
280        + Send
281        + Sync
282        + Clone
283        + 'static,
284    ZV::Future: Send,
285    ZSTip: ChainTip + Clone + Send + 'static,
286{
287    /// Initialize a new download stream with the provided `network` and
288    /// `verifier` services.
289    ///
290    /// Uses the `latest_chain_tip` and `lookahead_limit` to drop blocks
291    /// that are too far ahead of the current state tip.
292    /// Uses `max_checkpoint_height` to work around a known block timeout (#5125).
293    ///
294    /// The [`Downloads`] stream is agnostic to the network policy, so retry and
295    /// timeout limits should be applied to the `network` service passed into
296    /// this constructor.
297    pub fn new(
298        network: ZN,
299        verifier: ZV,
300        latest_chain_tip: ZSTip,
301        past_lookahead_limit_sender: watch::Sender<bool>,
302        lookahead_limit: usize,
303        max_checkpoint_height: Height,
304    ) -> Self {
305        let past_lookahead_limit_receiver =
306            zs::WatchReceiver::new(past_lookahead_limit_sender.subscribe());
307
308        Self {
309            network,
310            verifier,
311            latest_chain_tip,
312            lookahead_limit,
313            max_checkpoint_height,
314            past_lookahead_limit_sender: Arc::new(std::sync::Mutex::new(
315                past_lookahead_limit_sender,
316            )),
317            past_lookahead_limit_receiver,
318            pending: FuturesUnordered::new(),
319            cancel_handles: HashMap::new(),
320        }
321    }
322
323    /// Queue a block for download and verification.
324    ///
325    /// This method waits for the network to become ready, and returns an error
326    /// only if the network service fails. It returns immediately after queuing
327    /// the request.
328    #[instrument(level = "debug", skip(self), fields(%hash))]
329    pub async fn download_and_verify(
330        &mut self,
331        hash: block::Hash,
332    ) -> Result<(), BlockDownloadVerifyError> {
333        if self.cancel_handles.contains_key(&hash) {
334            metrics::counter!("sync.already.queued.dropped.block.hash.count").increment(1);
335            return Err(BlockDownloadVerifyError::DuplicateBlockQueuedForDownload { hash });
336        }
337
338        // We construct the block requests sequentially, waiting for the peer
339        // set to be ready to process each request. This ensures that we start
340        // block downloads in the order we want them (though they may resolve
341        // out of order), and it means that we respect backpressure. Otherwise,
342        // if we waited for readiness and did the service call in the spawned
343        // tasks, all of the spawned tasks would race each other waiting for the
344        // network to become ready.
345        let block_req = self
346            .network
347            .ready()
348            .await
349            .map_err(|error| BlockDownloadVerifyError::NetworkServiceError { error })?
350            .call(zn::Request::BlocksByHash(std::iter::once(hash).collect()));
351
352        // This oneshot is used to signal cancellation to the download task.
353        let (cancel_tx, mut cancel_rx) = oneshot::channel::<()>();
354
355        let mut verifier = self.verifier.clone();
356        let latest_chain_tip = self.latest_chain_tip.clone();
357
358        let lookahead_limit = self.lookahead_limit;
359        let max_checkpoint_height = self.max_checkpoint_height;
360
361        let past_lookahead_limit_sender = self.past_lookahead_limit_sender.clone();
362        let past_lookahead_limit_receiver = self.past_lookahead_limit_receiver.clone();
363
364        let task = tokio::spawn(
365            async move {
366                // Download the block.
367                // Prefer the cancel handle if both are ready.
368                let download_start = std::time::Instant::now();
369                let rsp = tokio::select! {
370                    biased;
371                    _ = &mut cancel_rx => {
372                        trace!("task cancelled prior to download completion");
373                        metrics::counter!("sync.cancelled.download.count").increment(1);
374                        metrics::histogram!("sync.block.download.duration_seconds", "result" => "cancelled")
375                            .record(download_start.elapsed().as_secs_f64());
376                        return Err(BlockDownloadVerifyError::CancelledDuringDownload { hash })
377                    }
378                    rsp = block_req => rsp.map_err(|error| BlockDownloadVerifyError::DownloadFailed { error, hash})?,
379                };
380
381                let (block, advertiser_addr) = if let zn::Response::Blocks(blocks) = rsp {
382                    assert_eq!(
383                        blocks.len(),
384                        1,
385                        "wrong number of blocks in response to a single hash"
386                    );
387
388                    blocks
389                        .first()
390                        .expect("just checked length")
391                        .available()
392                        .expect("unexpected missing block status: single block failures should be errors")
393                } else {
394                    unreachable!("wrong response to block request");
395                };
396                metrics::counter!("sync.downloaded.block.count").increment(1);
397                metrics::histogram!("sync.block.download.duration_seconds", "result" => "success")
398                    .record(download_start.elapsed().as_secs_f64());
399
400                // Security & Performance: reject blocks that are too far ahead of our tip.
401                // Avoids denial of service attacks, and reduces wasted work on high blocks
402                // that will timeout before being verified.
403                let tip_height = latest_chain_tip.best_tip_height();
404
405                let (lookahead_drop_height, lookahead_pause_height, lookahead_reset_height) = if let Some(tip_height) = tip_height {
406                    // Scale the height limit with the lookahead limit,
407                    // so users with low capacity or under DoS can reduce them both.
408                    let lookahead_pause = HeightDiff::try_from(
409                        lookahead_limit + lookahead_limit * VERIFICATION_PIPELINE_SCALING_MULTIPLIER,
410                    )
411                        .expect("fits in HeightDiff");
412
413
414                    ((tip_height + VERIFICATION_PIPELINE_DROP_LIMIT).expect("tip is much lower than Height::MAX"),
415                     (tip_height + lookahead_pause).expect("tip is much lower than Height::MAX"),
416                     (tip_height + lookahead_pause/2).expect("tip is much lower than Height::MAX"))
417                } else {
418                    let genesis_drop = VERIFICATION_PIPELINE_DROP_LIMIT.try_into().expect("fits in u32");
419                    let genesis_lookahead =
420                        u32::try_from(lookahead_limit - 1).expect("fits in u32");
421
422                    (block::Height(genesis_drop),
423                     block::Height(genesis_lookahead),
424                     block::Height(genesis_lookahead/2))
425                };
426
427                // Get the finalized tip height, assuming we're using the non-finalized state.
428                //
429                // It doesn't matter if we're a few blocks off here, because blocks this low
430                // are part of a fork with much less work. So they would be rejected anyway.
431                //
432                // And if we're still checkpointing, the checkpointer will reject blocks behind
433                // the finalized tip anyway.
434                //
435                // TODO: get the actual finalized tip height
436                let min_accepted_height = tip_height
437                    .map(|tip_height| {
438                        block::Height(tip_height.0.saturating_sub(zs::MAX_BLOCK_REORG_HEIGHT))
439                    })
440                    .unwrap_or(block::Height(0));
441
442                let block_height = if let Some(block_height) = block.coinbase_height() {
443                    block_height
444                } else {
445                    debug!(
446                        ?hash,
447                        "synced block with no height: dropped downloaded block"
448                    );
449                    metrics::counter!("sync.no.height.dropped.block.count").increment(1);
450
451                    return Err(BlockDownloadVerifyError::InvalidHeight { hash });
452                };
453
454                if block_height > lookahead_drop_height {
455                    Err(BlockDownloadVerifyError::AboveLookaheadHeightLimit { height: block_height, hash })?;
456                } else if block_height > lookahead_pause_height {
457                    // This log can be very verbose, usually hundreds of blocks are dropped.
458                    // So we only log at info level for the first above-height block.
459                    if !past_lookahead_limit_receiver.cloned_watch_data() {
460                        info!(
461                            ?hash,
462                            ?block_height,
463                            ?tip_height,
464                            ?lookahead_pause_height,
465                            ?lookahead_reset_height,
466                            lookahead_limit = ?lookahead_limit,
467                            "synced block height too far ahead of the tip: \
468                             waiting for downloaded blocks to commit to the state",
469                        );
470
471                        // Set the watched value to true, since we're over the limit.
472                        //
473                        // It is ok to block here, because we're going to pause new downloads anyway.
474                        // But if Zebra is shutting down, ignore the send error.
475                        let _ = past_lookahead_limit_sender.lock().expect("thread panicked while holding the past_lookahead_limit_sender mutex guard").send(true);
476                    } else {
477                        debug!(
478                            ?hash,
479                            ?block_height,
480                            ?tip_height,
481                            ?lookahead_pause_height,
482                            ?lookahead_reset_height,
483                            lookahead_limit = ?lookahead_limit,
484                            "synced block height too far ahead of the tip: \
485                             waiting for downloaded blocks to commit to the state",
486                        );
487                    }
488
489                    metrics::counter!("sync.max.height.limit.paused.count").increment(1);
490                } else if block_height <= lookahead_reset_height && past_lookahead_limit_receiver.cloned_watch_data() {
491                    // Reset the watched value to false, since we're well under the limit.
492                    // We need to block here, because if we don't the syncer can hang.
493
494                    // But if Zebra is shutting down, ignore the send error.
495                    let _ = past_lookahead_limit_sender.lock().expect("thread panicked while holding the past_lookahead_limit_sender mutex guard").send(false);
496                    metrics::counter!("sync.max.height.limit.reset.count").increment(1);
497
498                    metrics::counter!("sync.max.height.limit.reset.attempt.count").increment(1);
499                }
500
501                if block_height < min_accepted_height {
502                    debug!(
503                        ?hash,
504                        ?block_height,
505                        ?tip_height,
506                        ?min_accepted_height,
507                        behind_tip_limit = ?zs::MAX_BLOCK_REORG_HEIGHT,
508                        "synced block height behind the finalized tip: dropped downloaded block"
509                    );
510                    metrics::counter!("gossip.min.height.limit.dropped.block.count").increment(1);
511
512                    Err(BlockDownloadVerifyError::BehindTipHeightLimit { height: block_height, hash })?;
513                }
514
515                // Wait for the verifier service to be ready.
516                let readiness = verifier.ready();
517                // Prefer the cancel handle if both are ready.
518                let verifier = tokio::select! {
519                    biased;
520                    _ = &mut cancel_rx => {
521                        trace!("task cancelled waiting for verifier service readiness");
522                        metrics::counter!("sync.cancelled.verify.ready.count").increment(1);
523                        return Err(BlockDownloadVerifyError::CancelledAwaitingVerifierReadiness { height: block_height, hash })
524                    }
525                    verifier = readiness => verifier,
526                };
527
528                // Verify the block.
529                let verify_start = std::time::Instant::now();
530                let mut rsp = verifier
531                    .map_err(|error| BlockDownloadVerifyError::VerifierServiceError { error })?
532                    .call(zebra_consensus::Request::Commit(block)).boxed();
533
534                // Add a shorter timeout to workaround a known bug (#5125)
535                let short_timeout_max = (max_checkpoint_height + FINAL_CHECKPOINT_BLOCK_VERIFY_TIMEOUT_LIMIT).expect("checkpoint block height is in valid range");
536                if block_height >= max_checkpoint_height && block_height <= short_timeout_max {
537                    rsp = timeout(FINAL_CHECKPOINT_BLOCK_VERIFY_TIMEOUT, rsp)
538                        .map_err(|timeout| format!("initial fully verified block timed out: retrying: {timeout:?}").into())
539                        .map(|nested_result| nested_result.and_then(convert::identity)).boxed();
540                }
541
542                let verification = tokio::select! {
543                    biased;
544                    _ = &mut cancel_rx => {
545                        trace!("task cancelled prior to verification");
546                        metrics::counter!("sync.cancelled.verify.count").increment(1);
547                        metrics::histogram!("sync.block.verify.duration_seconds", "result" => "cancelled")
548                            .record(verify_start.elapsed().as_secs_f64());
549                        return Err(BlockDownloadVerifyError::CancelledDuringVerification { height: block_height, hash })
550                    }
551                    verification = rsp => verification,
552                };
553
554                let verify_result = if verification.is_ok() { "success" } else { "failure" };
555                metrics::histogram!("sync.block.verify.duration_seconds", "result" => verify_result)
556                    .record(verify_start.elapsed().as_secs_f64());
557
558                if verification.is_ok() {
559                    metrics::counter!("sync.verified.block.count").increment(1);
560                }
561
562                verification
563                    .map(|hash| (block_height, hash))
564                    .map_err(|err| {
565                        match err.downcast::<zebra_consensus::router::RouterError>() {
566                            Ok(error) => BlockDownloadVerifyError::Invalid { error: *error, height: block_height, hash, advertiser_addr },
567                            Err(error) => BlockDownloadVerifyError::ValidationRequestError { error, height: block_height, hash },
568                        }
569                    })
570            }
571            .in_current_span()
572            // Tack the hash onto the error so we can remove the cancel handle
573            // on failure as well as on success.
574            .map_err(move |e| (e, hash)),
575        );
576
577        // Try to start the spawned task before queueing the next block request
578        tokio::task::yield_now().await;
579
580        self.pending.push(task);
581        assert!(
582            self.cancel_handles.insert(hash, cancel_tx).is_none(),
583            "blocks are only queued once"
584        );
585
586        Ok(())
587    }
588
589    /// Cancel all running tasks and reset the downloader state.
590    pub fn cancel_all(&mut self) {
591        // Replace the pending task list with an empty one and drop it.
592        let _ = std::mem::take(&mut self.pending);
593
594        // Signal cancellation to all running tasks.
595        // Since we already dropped the JoinHandles above, they should
596        // fail silently.
597        for (_hash, cancel) in self.cancel_handles.drain() {
598            let _ = cancel.send(());
599        }
600
601        assert!(self.pending.is_empty());
602        assert!(self.cancel_handles.is_empty());
603
604        // Set the lookahead limit to false, since we're empty (so we're under the limit).
605        //
606        // It is ok to block here, because we're doing a reset and sleep anyway.
607        // But if Zebra is shutting down, ignore the send error.
608        let _ = self
609            .past_lookahead_limit_sender
610            .lock()
611            .expect("thread panicked while holding the past_lookahead_limit_sender mutex guard")
612            .send(false);
613    }
614
615    /// Get the number of currently in-flight download and verify tasks.
616    pub fn in_flight(&mut self) -> usize {
617        self.pending.len()
618    }
619
620    /// Returns true if there are no in-flight download and verify tasks.
621    #[allow(dead_code)]
622    pub fn is_empty(&mut self) -> bool {
623        self.pending.is_empty()
624    }
625}