zebrad/components/sync.rs
1//! The syncer downloads and verifies large numbers of blocks from peers to Zebra.
2//!
3//! It is used when Zebra is a long way behind the current chain tip.
4
5use std::{cmp::max, collections::HashSet, convert, pin::Pin, task::Poll, time::Duration};
6
7use color_eyre::eyre::{eyre, Report};
8use futures::stream::{FuturesUnordered, StreamExt};
9use indexmap::IndexSet;
10use serde::{Deserialize, Serialize};
11use tokio::{
12 sync::{mpsc, watch},
13 task::JoinError,
14 time::{sleep, timeout},
15};
16use tower::{
17 builder::ServiceBuilder, hedge::Hedge, limit::ConcurrencyLimit, retry::Retry, timeout::Timeout,
18 Service, ServiceExt,
19};
20
21use zebra_chain::{
22 block::{self, Height, HeightDiff},
23 chain_tip::ChainTip,
24};
25use zebra_network::{self as zn, PeerSocketAddr};
26use zebra_state as zs;
27
28use crate::{
29 components::sync::downloads::BlockDownloadVerifyError, config::ZebradConfig, BoxError,
30};
31
32mod downloads;
33pub mod end_of_support;
34mod gossip;
35mod progress;
36mod recent_sync_lengths;
37mod status;
38
39#[cfg(test)]
40mod tests;
41
42use downloads::{AlwaysHedge, Downloads};
43
44pub use downloads::VERIFICATION_PIPELINE_SCALING_MULTIPLIER;
45pub use gossip::{gossip_best_tip_block_hashes, BlockGossipError};
46pub use progress::show_block_chain_progress;
47pub use recent_sync_lengths::RecentSyncLengths;
48pub use status::SyncStatus;
49
/// Controls the number of peers used for each ObtainTips and ExtendTips request.
///
/// Fanning out to multiple peers provides failover without retry logic.
const FANOUT: usize = 3;

/// Controls how many times we will retry each block download.
///
/// Failing block downloads is important because it defends against peers who
/// feed us bad hashes. But spurious failures of valid blocks cause the syncer to
/// restart from the previous checkpoint, potentially re-downloading blocks.
///
/// We also hedge requests, so we may retry up to twice this many times. Hedged
/// retries may be concurrent, inner retries are sequential.
const BLOCK_DOWNLOAD_RETRY_LIMIT: usize = 3;

/// A lower bound on the user-specified checkpoint verification concurrency limit.
///
/// Set to the maximum checkpoint interval, so the pipeline holds around a checkpoint's
/// worth of blocks.
///
/// ## Security
///
/// If a malicious node is chosen for an ObtainTips or ExtendTips request, it can
/// provide up to 500 malicious block hashes. These block hashes will be
/// distributed across all available peers. Assuming there are around 50 connected
/// peers, the malicious node will receive approximately 10 of those block requests.
///
/// Malicious deserialized blocks can take up a large amount of RAM, see
/// [`super::inbound::downloads::MAX_INBOUND_CONCURRENCY`] and #1880 for details.
/// So we want to keep the lookahead limit reasonably small.
///
/// Once these malicious blocks start failing validation, the syncer will cancel all
/// the pending download and verify tasks, drop all the blocks, and start a new
/// ObtainTips with a new set of peers.
pub const MIN_CHECKPOINT_CONCURRENCY_LIMIT: usize = zebra_consensus::MAX_CHECKPOINT_HEIGHT_GAP;

/// The default for the user-specified lookahead limit.
///
/// Two maximum-length tips responses' worth of hashes.
///
/// See [`MIN_CHECKPOINT_CONCURRENCY_LIMIT`] for details.
pub const DEFAULT_CHECKPOINT_CONCURRENCY_LIMIT: usize = MAX_TIPS_RESPONSE_HASH_COUNT * 2;

/// A lower bound on the user-specified concurrency limit.
///
/// If the concurrency limit is 0, Zebra can't download or verify any blocks.
pub const MIN_CONCURRENCY_LIMIT: usize = 1;

/// The expected maximum number of hashes in an ObtainTips or ExtendTips response.
///
/// This is used to allow block heights that are slightly beyond the lookahead limit,
/// but still limit the number of blocks in the pipeline between the downloader and
/// the state.
///
/// See [`MIN_CHECKPOINT_CONCURRENCY_LIMIT`] for details.
pub const MAX_TIPS_RESPONSE_HASH_COUNT: usize = 500;

/// Controls how long we wait for a tips response to return.
///
/// ## Correctness
///
/// If this timeout is removed (or set too high), the syncer will sometimes hang.
///
/// If this timeout is set too low, the syncer will sometimes get stuck in a
/// failure loop.
pub const TIPS_RESPONSE_TIMEOUT: Duration = Duration::from_secs(6);

/// Controls how long we wait between gossiping successive blocks or transactions.
///
/// ## Correctness
///
/// If this timeout is set too high, blocks and transactions won't propagate through
/// the network efficiently.
///
/// If this timeout is set too low, the peer set and remote peers can get overloaded.
pub const PEER_GOSSIP_DELAY: Duration = Duration::from_secs(7);

/// Controls how long we wait for a block download request to complete.
///
/// This timeout makes sure that the syncer doesn't hang when:
/// - the lookahead queue is full, and
/// - we are waiting for a request that is stuck.
///
/// See [`BLOCK_VERIFY_TIMEOUT`] for details.
///
/// ## Correctness
///
/// If this timeout is removed (or set too high), the syncer will sometimes hang.
///
/// If this timeout is set too low, the syncer will sometimes get stuck in a
/// failure loop.
///
/// We set the timeout so that it requires under 1 Mbps bandwidth for a full 2 MB block.
pub(super) const BLOCK_DOWNLOAD_TIMEOUT: Duration = Duration::from_secs(20);

/// Controls how long we wait for a block verify request to complete.
///
/// This timeout makes sure that the syncer doesn't hang when:
/// - the lookahead queue is full, and
/// - all pending verifications:
///   - are waiting on a missing download request,
///   - are waiting on a download or verify request that has failed, but we have
///     deliberately ignored the error,
///   - are for blocks a long way ahead of the current tip, or
///   - are for invalid blocks which will never verify, because they depend on
///     missing blocks or transactions.
///
/// These conditions can happen during normal operation - they are not bugs.
///
/// This timeout also mitigates or hides the following kinds of bugs:
/// - all pending verifications:
///   - are waiting on a download or verify request that has failed, but we have
///     accidentally dropped the error,
///   - are waiting on a download request that has hung inside Zebra,
///   - are on tokio threads that are waiting for blocked operations.
///
/// ## Correctness
///
/// If this timeout is removed (or set too high), the syncer will sometimes hang.
///
/// If this timeout is set too low, the syncer will sometimes get stuck in a
/// failure loop.
///
/// We've observed spurious 15 minute timeouts when a lot of blocks are being committed to
/// the state. But there are also some blocks that seem to hang entirely, and never return.
///
/// So we allow about half the spurious timeout, which might cause some re-downloads.
pub(super) const BLOCK_VERIFY_TIMEOUT: Duration = Duration::from_secs(8 * 60);

/// A shorter timeout used for the first few blocks after the final checkpoint.
///
/// This is a workaround for bug #5125, where the first fully validated blocks
/// after the final checkpoint fail with a timeout, due to a UTXO race condition.
const FINAL_CHECKPOINT_BLOCK_VERIFY_TIMEOUT: Duration = Duration::from_secs(2 * 60);

/// The number of blocks after the final checkpoint that get the shorter timeout.
///
/// We've only seen this error on the first few blocks after the final checkpoint.
const FINAL_CHECKPOINT_BLOCK_VERIFY_TIMEOUT_LIMIT: HeightDiff = 100;

/// Controls how long we wait to restart syncing after finishing a sync run.
///
/// This delay should be long enough to:
/// - allow zcashd peers to process pending requests. If the node only has a
///   few peers, we want to clear as much peer state as possible. In
///   particular, zcashd sends "next block range" hints, based on zcashd's
///   internal model of our sync progress. But we want to discard these hints,
///   so they don't get confused with ObtainTips and ExtendTips responses, and
/// - allow in-progress downloads to time out.
///
/// This delay is particularly important on instances with slow or unreliable
/// networks, and on testnet, which has a small number of slow peers.
///
/// Using a prime number makes sure that syncer fanouts don't synchronise with other crawls.
///
/// ## Correctness
///
/// If this delay is removed (or set too low), the syncer will
/// sometimes get stuck in a failure loop, due to leftover downloads from
/// previous sync runs.
const SYNC_RESTART_DELAY: Duration = Duration::from_secs(67);

/// In regtest, use a much shorter restart delay so that downstream nodes pick up
/// newly-mined blocks quickly (e.g. after `generate(N)` in integration tests).
/// The default 67-second delay exceeds the typical `sync_all` timeout of 60 seconds.
const REGTEST_SYNC_RESTART_DELAY: Duration = Duration::from_secs(2);

/// Controls how long we wait to retry a failed attempt to download
/// and verify the genesis block.
///
/// This timeout gives the crawler time to find better peers.
///
/// ## Security
///
/// If this timeout is removed (or set too low), Zebra will immediately retry
/// to download and verify the genesis block from its peers. This can cause
/// a denial of service on those peers.
///
/// If this timeout is too short, old or buggy nodes will keep making useless
/// network requests. If there are a lot of them, it could overwhelm the network.
const GENESIS_TIMEOUT_RETRY: Duration = Duration::from_secs(10);
227
/// Sync configuration section.
///
/// All fields have defaults (see the `Default` impl), and unknown fields are
/// rejected when deserializing.
#[derive(Clone, Debug, Eq, PartialEq, Deserialize, Serialize)]
#[serde(deny_unknown_fields, default)]
pub struct Config {
    /// The number of parallel block download requests.
    ///
    /// This is set to a low value by default, to avoid task and
    /// network contention. Increasing this value may improve
    /// performance on machines with a fast network connection.
    // Also accepted under the alias `max_concurrent_block_requests`,
    // presumably for compatibility with earlier config files — TODO confirm.
    #[serde(alias = "max_concurrent_block_requests")]
    pub download_concurrency_limit: usize,

    /// The number of blocks submitted in parallel to the checkpoint verifier.
    ///
    /// Increasing this limit increases the buffer size, so it reduces
    /// the impact of an individual block request failing. However, it
    /// also increases memory and CPU usage if block validation stalls,
    /// or there are some large blocks in the pipeline.
    ///
    /// The block size limit is 2MB, so in theory, this could represent multiple
    /// gigabytes of data, if we downloaded arbitrary blocks. However,
    /// because we randomly load balance outbound requests, and separate
    /// block download from obtaining block hashes, an adversary would
    /// have to control a significant fraction of our peers to lead us
    /// astray.
    ///
    /// For reliable checkpoint syncing, Zebra enforces a
    /// [`MIN_CHECKPOINT_CONCURRENCY_LIMIT`].
    ///
    /// This is set to a high value by default, to avoid verification pipeline stalls.
    /// Decreasing this value reduces RAM usage.
    // Also accepted under the alias `lookahead_limit`.
    #[serde(alias = "lookahead_limit")]
    pub checkpoint_verify_concurrency_limit: usize,

    /// The number of blocks submitted in parallel to the full verifier.
    ///
    /// This is set to a low value by default, to avoid verification timeouts on large blocks.
    /// Increasing this value may improve performance on machines with many cores.
    pub full_verify_concurrency_limit: usize,

    /// The number of threads used to verify signatures, proofs, and other CPU-intensive code.
    ///
    /// If the number of threads is not configured or zero, Zebra uses the number of logical cores.
    /// If the number of logical cores can't be detected, Zebra uses one thread.
    /// For details, see [the `rayon` documentation](https://docs.rs/rayon/latest/rayon/struct.ThreadPoolBuilder.html#method.num_threads).
    pub parallel_cpu_threads: usize,
}
275
276impl Default for Config {
277 fn default() -> Self {
278 Self {
279 // 2/3 of the default outbound peer limit.
280 download_concurrency_limit: 50,
281
282 // A few max-length checkpoints.
283 checkpoint_verify_concurrency_limit: DEFAULT_CHECKPOINT_CONCURRENCY_LIMIT,
284
285 // This default is deliberately very low, so Zebra can verify a few large blocks in under 60 seconds,
286 // even on machines with only a few cores.
287 //
288 // This lets users see the committed block height changing in every progress log,
289 // and avoids hangs due to out-of-order verifications flooding the CPUs.
290 //
291 // TODO:
292 // - limit full verification concurrency based on block transaction counts?
293 // - move more disk work to blocking tokio threads,
294 // and CPU work to the rayon thread pool inside blocking tokio threads
295 full_verify_concurrency_limit: 20,
296
297 // Use one thread per CPU.
298 //
299 // If this causes tokio executor starvation, move CPU-intensive tasks to rayon threads,
300 // or reserve a few cores for tokio threads, based on `num_cpus()`.
301 parallel_cpu_threads: 0,
302 }
303 }
304}
305
/// Helps work around defects in the bitcoin protocol by checking whether
/// the returned hashes actually extend a chain tip.
#[derive(Copy, Clone, Debug, Hash, PartialEq, Eq)]
struct CheckedTip {
    /// The hash of the tip block the syncer is following.
    tip: block::Hash,
    /// The hash we expect as the first block after `tip` in the next
    /// ExtendTips response, used to validate that the response actually
    /// extends this tip.
    expected_next: block::Hash,
}
313
/// The chain synchronizer: obtains prospective tips from peers, then downloads
/// and verifies the blocks that are missing from the local state.
///
/// Generic over the network service (`ZN`), state service (`ZS`),
/// block verifier service (`ZV`), and chain tip accessor (`ZSTip`).
pub struct ChainSync<ZN, ZS, ZV, ZSTip>
where
    ZN: Service<zn::Request, Response = zn::Response, Error = BoxError>
        + Send
        + Sync
        + Clone
        + 'static,
    ZN::Future: Send,
    ZS: Service<zs::Request, Response = zs::Response, Error = BoxError>
        + Send
        + Sync
        + Clone
        + 'static,
    ZS::Future: Send,
    ZV: Service<zebra_consensus::Request, Response = block::Hash, Error = BoxError>
        + Send
        + Sync
        + Clone
        + 'static,
    ZV::Future: Send,
    ZSTip: ChainTip + Clone + Send + 'static,
{
    // Configuration
    //
    /// The genesis hash for the configured network
    genesis_hash: block::Hash,

    /// The largest block height for the checkpoint verifier, based on the current config.
    max_checkpoint_height: Height,

    /// The configured checkpoint verification concurrency limit, after applying the minimum limit.
    checkpoint_verify_concurrency_limit: usize,

    /// The configured full verification concurrency limit, after applying the minimum limit.
    full_verify_concurrency_limit: usize,

    /// Whether the node is running on regtest. Used to apply a shorter sync restart delay.
    is_regtest: bool,

    // Services
    //
    /// A network service which is used to perform ObtainTips and ExtendTips
    /// requests.
    ///
    /// Has no retry logic, because failover is handled using fanout.
    tip_network: Timeout<ZN>,

    /// A service which downloads and verifies blocks, using the provided
    /// network and verifier services.
    downloads: Pin<
        Box<
            Downloads<
                Hedge<ConcurrencyLimit<Retry<zn::RetryLimit, Timeout<ZN>>>, AlwaysHedge>,
                Timeout<ZV>,
                ZSTip,
            >,
        >,
    >,

    /// The cached block chain state.
    state: ZS,

    /// Allows efficient access to the best tip of the blockchain.
    latest_chain_tip: ZSTip,

    // Internal sync state
    //
    /// The tips that the syncer is currently following.
    prospective_tips: HashSet<CheckedTip>,

    /// The lengths of recent sync responses.
    recent_syncs: RecentSyncLengths,

    /// Receiver that is `true` when the downloader is past the lookahead limit.
    /// This is based on the downloaded block height and the state tip height.
    past_lookahead_limit_receiver: zs::WatchReceiver<bool>,

    /// Sender for reporting peer addresses that advertised unexpectedly invalid transactions.
    misbehavior_sender: mpsc::Sender<(PeerSocketAddr, u32)>,
}
394
395/// Polls the network to determine whether further blocks are available and
396/// downloads them.
397///
398/// This component is used for initial block sync, but the `Inbound` service is
399/// responsible for participating in the gossip protocols used for block
400/// diffusion.
401impl<ZN, ZS, ZV, ZSTip> ChainSync<ZN, ZS, ZV, ZSTip>
402where
403 ZN: Service<zn::Request, Response = zn::Response, Error = BoxError>
404 + Send
405 + Sync
406 + Clone
407 + 'static,
408 ZN::Future: Send,
409 ZS: Service<zs::Request, Response = zs::Response, Error = BoxError>
410 + Send
411 + Sync
412 + Clone
413 + 'static,
414 ZS::Future: Send,
415 ZV: Service<zebra_consensus::Request, Response = block::Hash, Error = BoxError>
416 + Send
417 + Sync
418 + Clone
419 + 'static,
420 ZV::Future: Send,
421 ZSTip: ChainTip + Clone + Send + 'static,
422{
423 /// Returns a new syncer instance, using:
424 /// - chain: the zebra-chain `Network` to download (Mainnet or Testnet)
425 /// - peers: the zebra-network peers to contact for downloads
426 /// - verifier: the zebra-consensus verifier that checks the chain
427 /// - state: the zebra-state that stores the chain
428 /// - latest_chain_tip: the latest chain tip from `state`
429 ///
430 /// Also returns a [`SyncStatus`] to check if the syncer has likely reached the chain tip.
431 pub fn new(
432 config: &ZebradConfig,
433 max_checkpoint_height: Height,
434 peers: ZN,
435 verifier: ZV,
436 state: ZS,
437 latest_chain_tip: ZSTip,
438 misbehavior_sender: mpsc::Sender<(PeerSocketAddr, u32)>,
439 ) -> (Self, SyncStatus) {
440 let mut download_concurrency_limit = config.sync.download_concurrency_limit;
441 let mut checkpoint_verify_concurrency_limit =
442 config.sync.checkpoint_verify_concurrency_limit;
443 let mut full_verify_concurrency_limit = config.sync.full_verify_concurrency_limit;
444
445 if download_concurrency_limit < MIN_CONCURRENCY_LIMIT {
446 warn!(
447 "configured download concurrency limit {} too low, increasing to {}",
448 config.sync.download_concurrency_limit, MIN_CONCURRENCY_LIMIT,
449 );
450
451 download_concurrency_limit = MIN_CONCURRENCY_LIMIT;
452 }
453
454 if checkpoint_verify_concurrency_limit < MIN_CHECKPOINT_CONCURRENCY_LIMIT {
455 warn!(
456 "configured checkpoint verify concurrency limit {} too low, increasing to {}",
457 config.sync.checkpoint_verify_concurrency_limit, MIN_CHECKPOINT_CONCURRENCY_LIMIT,
458 );
459
460 checkpoint_verify_concurrency_limit = MIN_CHECKPOINT_CONCURRENCY_LIMIT;
461 }
462
463 if full_verify_concurrency_limit < MIN_CONCURRENCY_LIMIT {
464 warn!(
465 "configured full verify concurrency limit {} too low, increasing to {}",
466 config.sync.full_verify_concurrency_limit, MIN_CONCURRENCY_LIMIT,
467 );
468
469 full_verify_concurrency_limit = MIN_CONCURRENCY_LIMIT;
470 }
471
472 let tip_network = Timeout::new(peers.clone(), TIPS_RESPONSE_TIMEOUT);
473
474 // The Hedge middleware is the outermost layer, hedging requests
475 // between two retry-wrapped networks. The innermost timeout
476 // layer is relatively unimportant, because slow requests will
477 // probably be preemptively hedged.
478 //
479 // The Hedge goes outside the Retry, because the Retry layer
480 // abstracts away spurious failures from individual peers
481 // making a less-fallible network service, and the Hedge layer
482 // tries to reduce latency of that less-fallible service.
483 let block_network = Hedge::new(
484 ServiceBuilder::new()
485 .concurrency_limit(download_concurrency_limit)
486 .retry(zn::RetryLimit::new(BLOCK_DOWNLOAD_RETRY_LIMIT))
487 .timeout(BLOCK_DOWNLOAD_TIMEOUT)
488 .service(peers),
489 AlwaysHedge,
490 20,
491 0.95,
492 2 * SYNC_RESTART_DELAY,
493 );
494
495 // We apply a timeout to the verifier to avoid hangs due to missing earlier blocks.
496 let verifier = Timeout::new(verifier, BLOCK_VERIFY_TIMEOUT);
497
498 let (sync_status, recent_syncs) = SyncStatus::new_for_network(&config.network.network);
499
500 let (past_lookahead_limit_sender, past_lookahead_limit_receiver) = watch::channel(false);
501 let past_lookahead_limit_receiver = zs::WatchReceiver::new(past_lookahead_limit_receiver);
502
503 let downloads = Box::pin(Downloads::new(
504 block_network,
505 verifier,
506 latest_chain_tip.clone(),
507 past_lookahead_limit_sender,
508 max(
509 checkpoint_verify_concurrency_limit,
510 full_verify_concurrency_limit,
511 ),
512 max_checkpoint_height,
513 ));
514
515 let new_syncer = Self {
516 genesis_hash: config.network.network.genesis_hash(),
517 max_checkpoint_height,
518 checkpoint_verify_concurrency_limit,
519 full_verify_concurrency_limit,
520 is_regtest: config.network.network.is_regtest(),
521 tip_network,
522 downloads,
523 state,
524 latest_chain_tip,
525 prospective_tips: HashSet::new(),
526 recent_syncs,
527 past_lookahead_limit_receiver,
528 misbehavior_sender,
529 };
530
531 (new_syncer, sync_status)
532 }
533
534 /// Runs the syncer to synchronize the chain and keep it synchronized.
535 #[instrument(skip(self))]
536 pub async fn sync(mut self) -> Result<(), Report> {
537 // We can't download the genesis block using our normal algorithm,
538 // due to protocol limitations
539 self.request_genesis().await?;
540
541 loop {
542 if self.try_to_sync().await.is_err() {
543 self.downloads.cancel_all();
544 }
545
546 self.update_metrics();
547
548 let restart_delay = if self.is_regtest {
549 REGTEST_SYNC_RESTART_DELAY
550 } else {
551 SYNC_RESTART_DELAY
552 };
553 info!(
554 timeout = ?restart_delay,
555 state_tip = ?self.latest_chain_tip.best_tip_height(),
556 "waiting to restart sync"
557 );
558 sleep(restart_delay).await;
559 }
560 }
561
562 /// Tries to synchronize the chain as far as it can.
563 ///
564 /// Obtains some prospective tips and iteratively tries to extend them and download the missing
565 /// blocks.
566 ///
567 /// Returns `Ok` if it was able to synchronize as much of the chain as it could, and then ran
568 /// out of prospective tips. This happens when synchronization finishes or if Zebra ended up
569 /// following a fork. Either way, Zebra should attempt to obtain some more tips.
570 ///
571 /// Returns `Err` if there was an unrecoverable error and restarting the synchronization is
572 /// necessary. This includes outer timeouts, where an entire syncing step takes an extremely
573 /// long time. (These usually indicate hangs.)
574 #[instrument(skip(self))]
575 async fn try_to_sync(&mut self) -> Result<(), Report> {
576 self.prospective_tips = HashSet::new();
577
578 info!(
579 state_tip = ?self.latest_chain_tip.best_tip_height(),
580 "starting sync, obtaining new tips"
581 );
582 let mut extra_hashes = timeout(SYNC_RESTART_DELAY, self.obtain_tips())
583 .await
584 .map_err(Into::into)
585 // TODO: replace with flatten() when it stabilises (#70142)
586 .and_then(convert::identity)
587 .map_err(|e| {
588 info!("temporary error obtaining tips: {:#}", e);
589 e
590 })?;
591 self.update_metrics();
592
593 while !self.prospective_tips.is_empty() || !extra_hashes.is_empty() {
594 // Avoid hangs due to service readiness or other internal operations
595 extra_hashes = timeout(BLOCK_VERIFY_TIMEOUT, self.try_to_sync_once(extra_hashes))
596 .await
597 .map_err(Into::into)
598 // TODO: replace with flatten() when it stabilises (#70142)
599 .and_then(convert::identity)?;
600 }
601
602 info!("exhausted prospective tip set");
603
604 Ok(())
605 }
606
    /// Tries to synchronize the chain once, using the existing `extra_hashes`.
    ///
    /// Tries to extend the existing tips and download the missing blocks.
    ///
    /// Returns `Ok(extra_hashes)` if it was able to extend once and synchronize some of the chain.
    /// Returns `Err` if there was an unrecoverable error and restarting the synchronization is
    /// necessary.
    #[instrument(skip(self, extra_hashes))]
    async fn try_to_sync_once(
        &mut self,
        mut extra_hashes: IndexSet<block::Hash>,
    ) -> Result<IndexSet<block::Hash>, Report> {
        // Check whether any block tasks are currently ready.
        while let Poll::Ready(Some(rsp)) = futures::poll!(self.downloads.next()) {
            // Some temporary errors are ignored, and syncing continues with other blocks.
            // If it turns out they were actually important, syncing will run out of blocks, and
            // the syncer will reset itself.
            self.handle_block_response(rsp)?;
        }
        self.update_metrics();

        // Pause new downloads while the syncer or downloader are past their lookahead limits.
        //
        // To avoid a deadlock or long waits for blocks to expire, we ignore the download
        // lookahead limit when there are only a small number of blocks waiting.
        //
        // Each wait drains one completed download before re-checking the limits.
        while self.downloads.in_flight() >= self.lookahead_limit(extra_hashes.len())
            || (self.downloads.in_flight() >= self.lookahead_limit(extra_hashes.len()) / 2
                && self.past_lookahead_limit_receiver.cloned_watch_data())
        {
            trace!(
                tips.len = self.prospective_tips.len(),
                in_flight = self.downloads.in_flight(),
                extra_hashes = extra_hashes.len(),
                lookahead_limit = self.lookahead_limit(extra_hashes.len()),
                state_tip = ?self.latest_chain_tip.best_tip_height(),
                "waiting for pending blocks",
            );

            let response = self.downloads.next().await.expect("downloads is nonempty");

            self.handle_block_response(response)?;
            self.update_metrics();
        }

        // Once we're below the lookahead limit, we can request more blocks or hashes.
        if !extra_hashes.is_empty() {
            // There are still hashes queued from a previous response: download them
            // before asking peers for more.
            debug!(
                tips.len = self.prospective_tips.len(),
                in_flight = self.downloads.in_flight(),
                extra_hashes = extra_hashes.len(),
                lookahead_limit = self.lookahead_limit(extra_hashes.len()),
                state_tip = ?self.latest_chain_tip.best_tip_height(),
                "requesting more blocks",
            );

            let response = self.request_blocks(extra_hashes).await;
            extra_hashes = Self::handle_hash_response(response)?;
        } else {
            // No queued hashes left: ask peers to extend the current tips.
            info!(
                tips.len = self.prospective_tips.len(),
                in_flight = self.downloads.in_flight(),
                extra_hashes = extra_hashes.len(),
                lookahead_limit = self.lookahead_limit(extra_hashes.len()),
                state_tip = ?self.latest_chain_tip.best_tip_height(),
                "extending tips",
            );

            extra_hashes = self.extend_tips().await.map_err(|e| {
                info!("temporary error extending tips: {:#}", e);
                e
            })?;
        }
        self.update_metrics();

        Ok(extra_hashes)
    }
683
    /// Given a block_locator list, fans out requests for subsequent hashes to
    /// multiple peers, and queues the resulting unknown hashes for download.
    ///
    /// Returns the hashes that could not be queued immediately, so the caller
    /// can retry them. See [`Self::try_to_sync`] for how this fits into a sync run.
    #[instrument(skip(self))]
    async fn obtain_tips(&mut self) -> Result<IndexSet<block::Hash>, Report> {
        let stage_start = std::time::Instant::now();

        // Ask the state for a block locator: a list of hashes spanning from the
        // tip back towards genesis.
        let block_locator = self
            .state
            .ready()
            .await
            .map_err(|e| eyre!(e))?
            .call(zebra_state::Request::BlockLocator)
            .await
            .map(|response| match response {
                zebra_state::Response::BlockLocator(block_locator) => block_locator,
                _ => unreachable!(
                    "GetBlockLocator request can only result in Response::BlockLocator"
                ),
            })
            .map_err(|e| eyre!(e))?;

        debug!(
            tip = ?block_locator.first().expect("we have at least one block locator object"),
            ?block_locator,
            "got block locator and trying to obtain new chain tips"
        );

        // Fan the same FindBlocks request out to multiple peers, so a single bad
        // or unresponsive peer can't stall the sync.
        let mut requests = FuturesUnordered::new();
        for attempt in 0..FANOUT {
            if attempt > 0 {
                // Let other tasks run, so we're more likely to choose a different peer.
                //
                // TODO: move fanouts into the PeerSet, so we always choose different peers (#2214)
                tokio::task::yield_now().await;
            }

            let ready_tip_network = self.tip_network.ready().await;
            requests.push(tokio::spawn(ready_tip_network.map_err(|e| eyre!(e))?.call(
                zn::Request::FindBlocks {
                    known_blocks: block_locator.clone(),
                    stop: None,
                },
            )));
        }

        let mut download_set = IndexSet::new();
        while let Some(res) = requests.next().await {
            match res
                .unwrap_or_else(|e @ JoinError { .. }| {
                    if e.is_panic() {
                        panic!("panic in obtain tips task: {e:?}");
                    } else {
                        // Non-panic join errors are treated as ordinary request
                        // failures (e.g. task cancellation during shutdown).
                        info!(
                            "task error during obtain tips task: {e:?},\
                             is Zebra shutting down?"
                        );
                        Err(e.into())
                    }
                })
                .map_err::<Report, _>(|e| eyre!(e))
            {
                Ok(zn::Response::BlockHashes(hashes)) => {
                    trace!(?hashes);

                    // zcashd sometimes appends an unrelated hash at the start
                    // or end of its response.
                    //
                    // We can't discard the first hash, because it might be a
                    // block we want to download. So we just accept any
                    // out-of-order first hashes.

                    // We use the last hash for the tip, and we want to avoid bad
                    // tips from zcashd's quirk of appending an unrelated hash.
                    // So we discard the last hash on mainnet/testnet.
                    // (We don't need to worry about missed downloads, because we
                    // will pick them up again in ExtendTips.)
                    //
                    // In regtest we only connect to Zebra nodes, not zcashd,
                    // so we trust all hashes in the response and keep them all.
                    // This is necessary when there are only a small number of
                    // blocks to sync (e.g. 2 new blocks), where stripping the
                    // last hash leaves only 1 unknown hash and rchunks_exact(2)
                    // would discard the entire response.
                    let hashes = if self.is_regtest {
                        hashes.as_slice()
                    } else {
                        match hashes.as_slice() {
                            [] => continue,
                            [rest @ .., _last] => rest,
                        }
                    };
                    if hashes.is_empty() {
                        continue;
                    }

                    // Find the first hash that isn't already in our state: everything
                    // from there onwards is new to us.
                    let mut first_unknown = None;
                    for (i, &hash) in hashes.iter().enumerate() {
                        if !self.state_contains(hash).await? {
                            first_unknown = Some(i);
                            break;
                        }
                    }

                    debug!(hashes.len = ?hashes.len(), ?first_unknown);

                    let unknown_hashes = if let Some(index) = first_unknown {
                        &hashes[index..]
                    } else {
                        // Every hash is already known: nothing to download from
                        // this response.
                        continue;
                    };

                    trace!(?unknown_hashes);

                    // Use the last two unknown hashes as the new checked tip:
                    // the second-to-last is the tip, the last is its expected
                    // successor in the next ExtendTips response.
                    let new_tip = if let Some(end) = unknown_hashes.rchunks_exact(2).next() {
                        CheckedTip {
                            tip: end[0],
                            expected_next: end[1],
                        }
                    } else {
                        debug!("discarding response that extends only one block");
                        continue;
                    };

                    // Make sure we get the same tips, regardless of the
                    // order of peer responses
                    if !download_set.contains(&new_tip.expected_next) {
                        debug!(?new_tip,
                               "adding new prospective tip, and removing existing tips in the new block hash list");
                        self.prospective_tips
                            .retain(|t| !unknown_hashes.contains(&t.expected_next));
                        self.prospective_tips.insert(new_tip);
                    } else {
                        debug!(
                            ?new_tip,
                            "discarding prospective tip: already in download set"
                        );
                    }

                    // security: the first response determines our download order
                    //
                    // TODO: can we make the download order independent of response order?
                    let prev_download_len = download_set.len();
                    download_set.extend(unknown_hashes);
                    let new_download_len = download_set.len();
                    let new_hashes = new_download_len - prev_download_len;
                    debug!(new_hashes, "added hashes to download set");
                    metrics::histogram!("sync.obtain.response.hash.count")
                        .record(new_hashes as f64);
                }
                Ok(_) => unreachable!("network returned wrong response"),
                // We ignore this error because we made multiple fanout requests.
                Err(e) => debug!(?e),
            }
        }

        debug!(?self.prospective_tips);

        // Check that the new tips we got are actually unknown.
        for hash in &download_set {
            debug!(?hash, "checking if state contains hash");
            if self.state_contains(*hash).await? {
                return Err(eyre!("queued download of hash behind our chain tip"));
            }
        }

        let new_downloads = download_set.len();
        debug!(new_downloads, "queueing new downloads");
        metrics::gauge!("sync.obtain.queued.hash.count").set(new_downloads as f64);

        // security: use the actual number of new downloads from all peers,
        // so the last peer to respond can't toggle our mempool
        self.recent_syncs.push_obtain_tips_length(new_downloads);

        let response = self.request_blocks(download_set).await;

        metrics::histogram!("sync.stage.duration_seconds", "stage" => "obtain_tips")
            .record(stage_start.elapsed().as_secs_f64());

        Self::handle_hash_response(response).map_err(Into::into)
    }
864
    /// Ask peers to extend each prospective chain tip with `FindBlocks` fanout
    /// requests, queue the resulting unknown hashes for download, and replace
    /// `self.prospective_tips` with the new tips.
    ///
    /// Returns the hashes that exceeded the download lookahead limit, so they
    /// were not queued; the caller can retry them later.
    ///
    /// Returns an error if a fanout request or a block download/verify fails
    /// in a way that should restart the sync.
    #[instrument(skip(self))]
    async fn extend_tips(&mut self) -> Result<IndexSet<block::Hash>, Report> {
        let stage_start = std::time::Instant::now();

        // Take ownership of the current tips; tips that are still being
        // extended are re-inserted into `self.prospective_tips` below.
        let tips = std::mem::take(&mut self.prospective_tips);

        let mut download_set = IndexSet::new();
        debug!(tips = ?tips.len(), "trying to extend chain tips");
        for tip in tips {
            debug!(?tip, "asking peers to extend chain tip");
            let mut responses = FuturesUnordered::new();
            // Send the same FindBlocks request to FANOUT peers.
            for attempt in 0..FANOUT {
                if attempt > 0 {
                    // Let other tasks run, so we're more likely to choose a different peer.
                    //
                    // TODO: move fanouts into the PeerSet, so we always choose different peers (#2214)
                    tokio::task::yield_now().await;
                }

                let ready_tip_network = self.tip_network.ready().await;
                responses.push(tokio::spawn(ready_tip_network.map_err(|e| eyre!(e))?.call(
                    zn::Request::FindBlocks {
                        known_blocks: vec![tip.tip],
                        stop: None,
                    },
                )));
            }
            while let Some(res) = responses.next().await {
                match res
                    .expect("panic in spawned extend tips request")
                    .map_err::<Report, _>(|e| eyre!(e))
                {
                    Ok(zn::Response::BlockHashes(hashes)) => {
                        debug!(first = ?hashes.first(), len = ?hashes.len());
                        trace!(?hashes);

                        // zcashd sometimes appends an unrelated hash at the
                        // start or end of its response. Check the first hash
                        // against the previous response, and discard mismatches.
                        let unknown_hashes = match hashes.as_slice() {
                            // The common case: the response starts exactly where we expected.
                            [expected_hash, rest @ ..] if expected_hash == &tip.expected_next => {
                                rest
                            }
                            // If the first hash doesn't match, retry with the second.
                            [first_hash, expected_hash, rest @ ..]
                                if expected_hash == &tip.expected_next =>
                            {
                                debug!(?first_hash,
                                    ?tip.expected_next,
                                    ?tip.tip,
                                    "unexpected first hash, but the second matches: using the hashes after the match");
                                rest
                            }
                            // We ignore these responses
                            [] => continue,
                            [single_hash] => {
                                debug!(?single_hash,
                                    ?tip.expected_next,
                                    ?tip.tip,
                                    "discarding response containing a single unexpected hash");
                                continue;
                            }
                            [first_hash, second_hash, rest @ ..] => {
                                debug!(?first_hash,
                                    ?second_hash,
                                    rest_len = ?rest.len(),
                                    ?tip.expected_next,
                                    ?tip.tip,
                                    "discarding response that starts with two unexpected hashes");
                                continue;
                            }
                        };

                        // We use the last hash for the tip, and we want to avoid
                        // bad tips. So we discard the last hash. (We don't need
                        // to worry about missed downloads, because we will pick
                        // them up again in the next ExtendTips.)
                        let unknown_hashes = match unknown_hashes {
                            [] => continue,
                            [rest @ .., _last] => rest,
                        };

                        // The new tip is built from the final pair of remaining
                        // hashes: `rchunks_exact(2).next()` yields the last two
                        // elements in order, so `end[0]` is the second-to-last
                        // hash (the tip) and `end[1]` is the last (the hash we
                        // expect the next response to start with).
                        let new_tip = if let Some(end) = unknown_hashes.rchunks_exact(2).next() {
                            CheckedTip {
                                tip: end[0],
                                expected_next: end[1],
                            }
                        } else {
                            debug!("discarding response that extends only one block");
                            continue;
                        };

                        trace!(?unknown_hashes);

                        // Make sure we get the same tips, regardless of the
                        // order of peer responses
                        if !download_set.contains(&new_tip.expected_next) {
                            debug!(?new_tip,
                                "adding new prospective tip, and removing any existing tips in the new block hash list");
                            self.prospective_tips
                                .retain(|t| !unknown_hashes.contains(&t.expected_next));
                            self.prospective_tips.insert(new_tip);
                        } else {
                            debug!(
                                ?new_tip,
                                "discarding prospective tip: already in download set"
                            );
                        }

                        // security: the first response determines our download order
                        //
                        // TODO: can we make the download order independent of response order?
                        let prev_download_len = download_set.len();
                        download_set.extend(unknown_hashes);
                        let new_download_len = download_set.len();
                        let new_hashes = new_download_len - prev_download_len;
                        debug!(new_hashes, "added hashes to download set");
                        metrics::histogram!("sync.extend.response.hash.count")
                            .record(new_hashes as f64);
                    }
                    Ok(_) => unreachable!("network returned wrong response"),
                    // We ignore this error because we made multiple fanout requests.
                    Err(e) => debug!(?e),
                }
            }
        }

        let new_downloads = download_set.len();
        debug!(new_downloads, "queueing new downloads");
        metrics::gauge!("sync.extend.queued.hash.count").set(new_downloads as f64);

        // security: use the actual number of new downloads from all peers,
        // so the last peer to respond can't toggle our mempool
        self.recent_syncs.push_extend_tips_length(new_downloads);

        let response = self.request_blocks(download_set).await;

        metrics::histogram!("sync.stage.duration_seconds", "stage" => "extend_tips")
            .record(stage_start.elapsed().as_secs_f64());

        Self::handle_hash_response(response).map_err(Into::into)
    }
1007
1008 /// Download and verify the genesis block, if it isn't currently known to
1009 /// our node.
1010 async fn request_genesis(&mut self) -> Result<(), Report> {
1011 // Due to Bitcoin protocol limitations, we can't request the genesis
1012 // block using our standard tip-following algorithm:
1013 // - getblocks requires at least one hash
1014 // - responses start with the block *after* the requested block, and
1015 // - the genesis hash is used as a placeholder for "no matches".
1016 //
1017 // So we just download and verify the genesis block here.
1018 while !self.state_contains(self.genesis_hash).await? {
1019 info!("starting genesis block download and verify");
1020
1021 let response = timeout(SYNC_RESTART_DELAY, self.request_genesis_once())
1022 .await
1023 .map_err(Into::into);
1024
1025 // 3 layers of results is not ideal, but we need the timeout on the outside.
1026 match response {
1027 Ok(Ok(Ok(response))) => self
1028 .handle_block_response(Ok(response))
1029 .expect("never returns Err for Ok"),
1030 // Handle fatal errors
1031 Ok(Err(fatal_error)) => Err(fatal_error)?,
1032 // Handle timeouts and block errors
1033 Err(error) | Ok(Ok(Err(error))) => {
1034 // TODO: exit syncer on permanent service errors (NetworkError, VerifierError)
1035 if Self::should_restart_sync(&error) {
1036 warn!(
1037 ?error,
1038 "could not download or verify genesis block, retrying"
1039 );
1040 } else {
1041 info!(
1042 ?error,
1043 "temporary error downloading or verifying genesis block, retrying"
1044 );
1045 }
1046
1047 tokio::time::sleep(GENESIS_TIMEOUT_RETRY).await;
1048 }
1049 }
1050 }
1051
1052 Ok(())
1053 }
1054
1055 /// Try to download and verify the genesis block once.
1056 ///
1057 /// Fatal errors are returned in the outer result, temporary errors in the inner one.
1058 async fn request_genesis_once(
1059 &mut self,
1060 ) -> Result<Result<(Height, block::Hash), BlockDownloadVerifyError>, Report> {
1061 let response = self.downloads.download_and_verify(self.genesis_hash).await;
1062 Self::handle_response(response).map_err(|e| eyre!(e))?;
1063
1064 let response = self.downloads.next().await.expect("downloads is nonempty");
1065
1066 Ok(response)
1067 }
1068
1069 /// Queue download and verify tasks for each block that isn't currently known to our node.
1070 ///
1071 /// TODO: turn obtain and extend tips into a separate task, which sends hashes via a channel?
1072 async fn request_blocks(
1073 &mut self,
1074 mut hashes: IndexSet<block::Hash>,
1075 ) -> Result<IndexSet<block::Hash>, BlockDownloadVerifyError> {
1076 let lookahead_limit = self.lookahead_limit(hashes.len());
1077
1078 debug!(
1079 hashes.len = hashes.len(),
1080 ?lookahead_limit,
1081 "requesting blocks",
1082 );
1083
1084 let extra_hashes = if hashes.len() > lookahead_limit {
1085 hashes.split_off(lookahead_limit)
1086 } else {
1087 IndexSet::new()
1088 };
1089
1090 for hash in hashes.into_iter() {
1091 self.downloads.download_and_verify(hash).await?;
1092 }
1093
1094 Ok(extra_hashes)
1095 }
1096
1097 /// The configured lookahead limit, based on the currently verified height,
1098 /// and the number of hashes we haven't queued yet.
1099 fn lookahead_limit(&self, new_hashes: usize) -> usize {
1100 let max_checkpoint_height: usize = self
1101 .max_checkpoint_height
1102 .0
1103 .try_into()
1104 .expect("fits in usize");
1105
1106 // When the state is empty, we want to verify using checkpoints
1107 let verified_height: usize = self
1108 .latest_chain_tip
1109 .best_tip_height()
1110 .unwrap_or(Height(0))
1111 .0
1112 .try_into()
1113 .expect("fits in usize");
1114
1115 if verified_height >= max_checkpoint_height {
1116 self.full_verify_concurrency_limit
1117 } else if (verified_height + new_hashes) >= max_checkpoint_height {
1118 // If we're just about to start full verification, allow enough for the remaining checkpoint,
1119 // and also enough for a separate full verification lookahead.
1120 let checkpoint_hashes = verified_height + new_hashes - max_checkpoint_height;
1121
1122 self.full_verify_concurrency_limit + checkpoint_hashes
1123 } else {
1124 self.checkpoint_verify_concurrency_limit
1125 }
1126 }
1127
1128 /// Handles a response for a requested block.
1129 ///
1130 /// See [`Self::handle_response`] for more details.
1131 #[allow(unknown_lints)]
1132 fn handle_block_response(
1133 &mut self,
1134 response: Result<(Height, block::Hash), BlockDownloadVerifyError>,
1135 ) -> Result<(), BlockDownloadVerifyError> {
1136 match response {
1137 Ok((height, hash)) => {
1138 trace!(?height, ?hash, "verified and committed block to state");
1139
1140 return Ok(());
1141 }
1142
1143 Err(BlockDownloadVerifyError::Invalid {
1144 ref error,
1145 advertiser_addr: Some(advertiser_addr),
1146 ..
1147 }) if error.misbehavior_score() != 0 => {
1148 let _ = self
1149 .misbehavior_sender
1150 .try_send((advertiser_addr, error.misbehavior_score()));
1151 }
1152
1153 Err(_) => {}
1154 };
1155
1156 Self::handle_response(response)
1157 }
1158
1159 /// Handles a response to block hash submission, passing through any extra hashes.
1160 ///
1161 /// See [`Self::handle_response`] for more details.
1162 #[allow(unknown_lints)]
1163 fn handle_hash_response(
1164 response: Result<IndexSet<block::Hash>, BlockDownloadVerifyError>,
1165 ) -> Result<IndexSet<block::Hash>, BlockDownloadVerifyError> {
1166 match response {
1167 Ok(extra_hashes) => Ok(extra_hashes),
1168 Err(_) => Self::handle_response(response).map(|()| IndexSet::new()),
1169 }
1170 }
1171
1172 /// Handles a response to a syncer request.
1173 ///
1174 /// Returns `Ok` if the request was successful, or if an expected error occurred,
1175 /// so that the synchronization can continue normally.
1176 ///
1177 /// Returns `Err` if an unexpected error occurred, to force the synchronizer to restart.
1178 #[allow(unknown_lints)]
1179 fn handle_response<T>(
1180 response: Result<T, BlockDownloadVerifyError>,
1181 ) -> Result<(), BlockDownloadVerifyError> {
1182 match response {
1183 Ok(_t) => Ok(()),
1184 Err(error) => {
1185 // TODO: exit syncer on permanent service errors (NetworkError, VerifierError)
1186 if Self::should_restart_sync(&error) {
1187 Err(error)
1188 } else {
1189 Ok(())
1190 }
1191 }
1192 }
1193 }
1194
1195 /// Returns `true` if the hash is present in the state, and `false`
1196 /// if the hash is not present in the state.
1197 pub(crate) async fn state_contains(&mut self, hash: block::Hash) -> Result<bool, Report> {
1198 match self
1199 .state
1200 .ready()
1201 .await
1202 .map_err(|e| eyre!(e))?
1203 .call(zebra_state::Request::KnownBlock(hash))
1204 .await
1205 .map_err(|e| eyre!(e))?
1206 {
1207 zs::Response::KnownBlock(loc) => Ok(loc.is_some()),
1208 _ => unreachable!("wrong response to known block request"),
1209 }
1210 }
1211
1212 fn update_metrics(&mut self) {
1213 metrics::gauge!("sync.prospective_tips.len",).set(self.prospective_tips.len() as f64);
1214 metrics::gauge!("sync.downloads.in_flight",).set(self.downloads.in_flight() as f64);
1215 }
1216
    /// Return if the sync should be restarted based on the given error
    /// from the block downloader and verifier stream.
    ///
    /// Expected errors (duplicates, cancellations, blocks behind the tip,
    /// and missing blocks) return `false`, so the sync continues normally.
    /// Any other error returns `true`, forcing a sync restart.
    fn should_restart_sync(e: &BlockDownloadVerifyError) -> bool {
        match e {
            // Structural matches: downcasts
            BlockDownloadVerifyError::Invalid { error, .. } if error.is_duplicate_request() => {
                debug!(error = ?e, "block was already verified or committed, possibly from a previous sync run, continuing");
                false
            }

            // Structural matches: direct
            BlockDownloadVerifyError::CancelledDuringDownload { .. }
            | BlockDownloadVerifyError::CancelledDuringVerification { .. } => {
                debug!(error = ?e, "block verification was cancelled, continuing");
                false
            }
            BlockDownloadVerifyError::BehindTipHeightLimit { .. } => {
                debug!(
                    error = ?e,
                    "block height is behind the current state tip, \
                     assuming the syncer will eventually catch up to the state, continuing"
                );
                false
            }
            BlockDownloadVerifyError::DuplicateBlockQueuedForDownload { .. } => {
                debug!(
                    error = ?e,
                    "queued duplicate block hash for download, \
                     assuming the syncer will eventually resolve duplicates, continuing"
                );
                false
            }

            // String match: download failures where a peer didn't have the block.
            BlockDownloadVerifyError::DownloadFailed { ref error, .. }
                if format!("{error:?}").contains("NotFound") =>
            {
                // Covers these errors:
                // - NotFoundResponse
                // - NotFoundRegistry
                //
                // TODO: improve this by checking the type (#2908)
                //       restart after a certain number of NotFound errors?
                debug!(error = ?e, "block was not found, possibly from a peer that doesn't have the block yet, continuing");
                false
            }

            _ => {
                // download_and_verify downcasts errors from the block verifier
                // into VerifyChainError, and puts the result inside one of the
                // BlockDownloadVerifyError enumerations. This downcast could
                // become incorrect e.g. after some refactoring, and it is difficult
                // to write a test to check it. The test below is a best-effort
                // attempt to catch if that happens and log it.
                //
                // TODO: add a proper test and remove this
                //       https://github.com/ZcashFoundation/zebra/issues/2909
                let err_str = format!("{e:?}");
                if err_str.contains("NotFound") {
                    error!(?e,
                        "a BlockDownloadVerifyError that should have been filtered out was detected, \
                         which possibly indicates a programming error in the downcast inside \
                         zebrad::components::sync::downloads::Downloads::download_and_verify"
                    )
                }

                warn!(?e, "error downloading and verifying block");
                true
            }
        }
    }
1287}