zebrad/components/sync.rs
1//! The syncer downloads and verifies large numbers of blocks from peers to Zebra.
2//!
3//! It is used when Zebra is a long way behind the current chain tip.
4
5use std::{cmp::max, collections::HashSet, convert, pin::Pin, task::Poll, time::Duration};
6
7use color_eyre::eyre::{eyre, Report};
8use futures::stream::{FuturesUnordered, StreamExt};
9use indexmap::IndexSet;
10use serde::{Deserialize, Serialize};
11use tokio::{
12 sync::{mpsc, watch},
13 task::JoinError,
14 time::{sleep, timeout},
15};
16use tower::{
17 builder::ServiceBuilder, hedge::Hedge, limit::ConcurrencyLimit, retry::Retry, timeout::Timeout,
18 Service, ServiceExt,
19};
20
21use zebra_chain::{
22 block::{self, Height, HeightDiff},
23 chain_tip::ChainTip,
24};
25use zebra_network::{self as zn, PeerSocketAddr};
26use zebra_state as zs;
27
28use crate::{
29 components::sync::downloads::BlockDownloadVerifyError, config::ZebradConfig, BoxError,
30};
31
32mod downloads;
33pub mod end_of_support;
34mod gossip;
35mod progress;
36mod recent_sync_lengths;
37mod status;
38
39#[cfg(test)]
40mod tests;
41
42use downloads::{AlwaysHedge, Downloads};
43
44pub use downloads::VERIFICATION_PIPELINE_SCALING_MULTIPLIER;
45pub use gossip::{gossip_best_tip_block_hashes, BlockGossipError};
46pub use progress::show_block_chain_progress;
47pub use recent_sync_lengths::RecentSyncLengths;
48pub use status::SyncStatus;
49
/// How many peers each ObtainTips and ExtendTips request is sent to.
const FANOUT: usize = 3;

/// How many times each block download is retried.
///
/// Failing block downloads matters, because it protects the syncer against peers
/// that send us bad hashes. But when downloads of valid blocks fail spuriously,
/// the syncer restarts from the previous checkpoint, and may download blocks again.
///
/// Requests are also hedged, so a block may be retried up to twice this many times.
/// Hedged retries can run concurrently, but the inner retries run one after another.
const BLOCK_DOWNLOAD_RETRY_LIMIT: usize = 3;
62
63/// A lower bound on the user-specified checkpoint verification concurrency limit.
64///
65/// Set to the maximum checkpoint interval, so the pipeline holds around a checkpoint's
66/// worth of blocks.
67///
68/// ## Security
69///
70/// If a malicious node is chosen for an ObtainTips or ExtendTips request, it can
71/// provide up to 500 malicious block hashes. These block hashes will be
72/// distributed across all available peers. Assuming there are around 50 connected
73/// peers, the malicious node will receive approximately 10 of those block requests.
74///
75/// Malicious deserialized blocks can take up a large amount of RAM, see
76/// [`super::inbound::downloads::MAX_INBOUND_CONCURRENCY`] and #1880 for details.
77/// So we want to keep the lookahead limit reasonably small.
78///
79/// Once these malicious blocks start failing validation, the syncer will cancel all
80/// the pending download and verify tasks, drop all the blocks, and start a new
81/// ObtainTips with a new set of peers.
82pub const MIN_CHECKPOINT_CONCURRENCY_LIMIT: usize = zebra_consensus::MAX_CHECKPOINT_HEIGHT_GAP;
83
84/// The default for the user-specified lookahead limit.
85///
86/// See [`MIN_CHECKPOINT_CONCURRENCY_LIMIT`] for details.
87pub const DEFAULT_CHECKPOINT_CONCURRENCY_LIMIT: usize = MAX_TIPS_RESPONSE_HASH_COUNT * 2;
88
89/// A lower bound on the user-specified concurrency limit.
90///
91/// If the concurrency limit is 0, Zebra can't download or verify any blocks.
92pub const MIN_CONCURRENCY_LIMIT: usize = 1;
93
94/// The expected maximum number of hashes in an ObtainTips or ExtendTips response.
95///
96/// This is used to allow block heights that are slightly beyond the lookahead limit,
97/// but still limit the number of blocks in the pipeline between the downloader and
98/// the state.
99///
100/// See [`MIN_CHECKPOINT_CONCURRENCY_LIMIT`] for details.
101pub const MAX_TIPS_RESPONSE_HASH_COUNT: usize = 500;
102
/// How long the syncer waits for a response to a tips request.
///
/// ## Correctness
///
/// Without this timeout (or with a very large one), the syncer sometimes hangs.
///
/// With a timeout that is too small, the syncer sometimes gets stuck in a
/// failure loop.
pub const TIPS_RESPONSE_TIMEOUT: Duration = Duration::from_millis(6_000);

/// How long to wait between gossiping one block or transaction and the next.
///
/// ## Correctness
///
/// A delay that is too large stops blocks and transactions from propagating
/// efficiently through the network.
///
/// A delay that is too small can overload the peer set and remote peers.
pub const PEER_GOSSIP_DELAY: Duration = Duration::from_millis(7_000);
122
123/// Controls how long we wait for a block download request to complete.
124///
125/// This timeout makes sure that the syncer doesn't hang when:
126/// - the lookahead queue is full, and
127/// - we are waiting for a request that is stuck.
128///
129/// See [`BLOCK_VERIFY_TIMEOUT`] for details.
130///
131/// ## Correctness
132///
133/// If this timeout is removed (or set too high), the syncer will sometimes hang.
134///
135/// If this timeout is set too low, the syncer will sometimes get stuck in a
136/// failure loop.
137///
138/// We set the timeout so that it requires under 1 Mbps bandwidth for a full 2 MB block.
139pub(super) const BLOCK_DOWNLOAD_TIMEOUT: Duration = Duration::from_secs(20);
140
141/// Controls how long we wait for a block verify request to complete.
142///
143/// This timeout makes sure that the syncer doesn't hang when:
144/// - the lookahead queue is full, and
145/// - all pending verifications:
146/// - are waiting on a missing download request,
147/// - are waiting on a download or verify request that has failed, but we have
148/// deliberately ignored the error,
149/// - are for blocks a long way ahead of the current tip, or
150/// - are for invalid blocks which will never verify, because they depend on
151/// missing blocks or transactions.
152///
153/// These conditions can happen during normal operation - they are not bugs.
154///
155/// This timeout also mitigates or hides the following kinds of bugs:
156/// - all pending verifications:
157/// - are waiting on a download or verify request that has failed, but we have
158/// accidentally dropped the error,
159/// - are waiting on a download request that has hung inside Zebra,
160/// - are on tokio threads that are waiting for blocked operations.
161///
162/// ## Correctness
163///
164/// If this timeout is removed (or set too high), the syncer will sometimes hang.
165///
166/// If this timeout is set too low, the syncer will sometimes get stuck in a
167/// failure loop.
168///
169/// We've observed spurious 15 minute timeouts when a lot of blocks are being committed to
170/// the state. But there are also some blocks that seem to hang entirely, and never return.
171///
172/// So we allow about half the spurious timeout, which might cause some re-downloads.
173pub(super) const BLOCK_VERIFY_TIMEOUT: Duration = Duration::from_secs(8 * 60);
174
175/// A shorter timeout used for the first few blocks after the final checkpoint.
176///
177/// This is a workaround for bug #5125, where the first fully validated blocks
178/// after the final checkpoint fail with a timeout, due to a UTXO race condition.
179const FINAL_CHECKPOINT_BLOCK_VERIFY_TIMEOUT: Duration = Duration::from_secs(2 * 60);
180
181/// The number of blocks after the final checkpoint that get the shorter timeout.
182///
183/// We've only seen this error on the first few blocks after the final checkpoint.
184const FINAL_CHECKPOINT_BLOCK_VERIFY_TIMEOUT_LIMIT: HeightDiff = 100;
185
/// How long the syncer waits before starting again after a sync run finishes.
///
/// The delay needs to be long enough to:
/// - let zcashd peers process their pending requests. When the node only has a few
///   peers, we want to clear out as much peer state as we can. In particular, zcashd
///   sends "next block range" hints based on its internal model of our sync progress,
///   and we want those hints discarded so they don't get mixed up with ObtainTips and
///   ExtendTips responses, and
/// - let in-progress downloads time out.
///
/// It matters most on instances with slow or unreliable networks, and on testnet,
/// which has a small number of slow peers.
///
/// The value is prime, so syncer fanouts don't synchronise with other crawls.
///
/// ## Correctness
///
/// Without this delay (or with a very small one), leftover downloads from earlier sync
/// runs sometimes leave the syncer stuck in a failure loop.
const SYNC_RESTART_DELAY: Duration = Duration::from_millis(67_000);

/// How long to wait before retrying a failed download and verification of the
/// genesis block.
///
/// The wait gives the crawler time to find better peers.
///
/// ## Security
///
/// Without this timeout (or with a very small one), Zebra would immediately retry
/// downloading and verifying the genesis block from its peers, which could cause a
/// denial of service on those peers.
///
/// With a timeout that is too short, old or buggy nodes keep making useless network
/// requests, and many such nodes could overwhelm the network.
const GENESIS_TIMEOUT_RETRY: Duration = Duration::from_millis(10_000);
222
223/// Sync configuration section.
224#[derive(Clone, Debug, Eq, PartialEq, Deserialize, Serialize)]
225#[serde(deny_unknown_fields, default)]
226pub struct Config {
227 /// The number of parallel block download requests.
228 ///
229 /// This is set to a low value by default, to avoid task and
230 /// network contention. Increasing this value may improve
231 /// performance on machines with a fast network connection.
232 #[serde(alias = "max_concurrent_block_requests")]
233 pub download_concurrency_limit: usize,
234
235 /// The number of blocks submitted in parallel to the checkpoint verifier.
236 ///
237 /// Increasing this limit increases the buffer size, so it reduces
238 /// the impact of an individual block request failing. However, it
239 /// also increases memory and CPU usage if block validation stalls,
240 /// or there are some large blocks in the pipeline.
241 ///
242 /// The block size limit is 2MB, so in theory, this could represent multiple
243 /// gigabytes of data, if we downloaded arbitrary blocks. However,
244 /// because we randomly load balance outbound requests, and separate
245 /// block download from obtaining block hashes, an adversary would
246 /// have to control a significant fraction of our peers to lead us
247 /// astray.
248 ///
249 /// For reliable checkpoint syncing, Zebra enforces a
250 /// [`MIN_CHECKPOINT_CONCURRENCY_LIMIT`].
251 ///
252 /// This is set to a high value by default, to avoid verification pipeline stalls.
253 /// Decreasing this value reduces RAM usage.
254 #[serde(alias = "lookahead_limit")]
255 pub checkpoint_verify_concurrency_limit: usize,
256
257 /// The number of blocks submitted in parallel to the full verifier.
258 ///
259 /// This is set to a low value by default, to avoid verification timeouts on large blocks.
260 /// Increasing this value may improve performance on machines with many cores.
261 pub full_verify_concurrency_limit: usize,
262
263 /// The number of threads used to verify signatures, proofs, and other CPU-intensive code.
264 ///
265 /// If the number of threads is not configured or zero, Zebra uses the number of logical cores.
266 /// If the number of logical cores can't be detected, Zebra uses one thread.
267 /// For details, see [the `rayon` documentation](https://docs.rs/rayon/latest/rayon/struct.ThreadPoolBuilder.html#method.num_threads).
268 pub parallel_cpu_threads: usize,
269}
270
271impl Default for Config {
272 fn default() -> Self {
273 Self {
274 // 2/3 of the default outbound peer limit.
275 download_concurrency_limit: 50,
276
277 // A few max-length checkpoints.
278 checkpoint_verify_concurrency_limit: DEFAULT_CHECKPOINT_CONCURRENCY_LIMIT,
279
280 // This default is deliberately very low, so Zebra can verify a few large blocks in under 60 seconds,
281 // even on machines with only a few cores.
282 //
283 // This lets users see the committed block height changing in every progress log,
284 // and avoids hangs due to out-of-order verifications flooding the CPUs.
285 //
286 // TODO:
287 // - limit full verification concurrency based on block transaction counts?
288 // - move more disk work to blocking tokio threads,
289 // and CPU work to the rayon thread pool inside blocking tokio threads
290 full_verify_concurrency_limit: 20,
291
292 // Use one thread per CPU.
293 //
294 // If this causes tokio executor starvation, move CPU-intensive tasks to rayon threads,
295 // or reserve a few cores for tokio threads, based on `num_cpus()`.
296 parallel_cpu_threads: 0,
297 }
298 }
299}
300
301/// Helps work around defects in the bitcoin protocol by checking whether
302/// the returned hashes actually extend a chain tip.
303#[derive(Copy, Clone, Debug, Hash, PartialEq, Eq)]
304struct CheckedTip {
305 tip: block::Hash,
306 expected_next: block::Hash,
307}
308
309pub struct ChainSync<ZN, ZS, ZV, ZSTip>
310where
311 ZN: Service<zn::Request, Response = zn::Response, Error = BoxError>
312 + Send
313 + Sync
314 + Clone
315 + 'static,
316 ZN::Future: Send,
317 ZS: Service<zs::Request, Response = zs::Response, Error = BoxError>
318 + Send
319 + Sync
320 + Clone
321 + 'static,
322 ZS::Future: Send,
323 ZV: Service<zebra_consensus::Request, Response = block::Hash, Error = BoxError>
324 + Send
325 + Sync
326 + Clone
327 + 'static,
328 ZV::Future: Send,
329 ZSTip: ChainTip + Clone + Send + 'static,
330{
331 // Configuration
332 //
333 /// The genesis hash for the configured network
334 genesis_hash: block::Hash,
335
336 /// The largest block height for the checkpoint verifier, based on the current config.
337 max_checkpoint_height: Height,
338
339 /// The configured checkpoint verification concurrency limit, after applying the minimum limit.
340 checkpoint_verify_concurrency_limit: usize,
341
342 /// The configured full verification concurrency limit, after applying the minimum limit.
343 full_verify_concurrency_limit: usize,
344
345 // Services
346 //
347 /// A network service which is used to perform ObtainTips and ExtendTips
348 /// requests.
349 ///
350 /// Has no retry logic, because failover is handled using fanout.
351 tip_network: Timeout<ZN>,
352
353 /// A service which downloads and verifies blocks, using the provided
354 /// network and verifier services.
355 downloads: Pin<
356 Box<
357 Downloads<
358 Hedge<ConcurrencyLimit<Retry<zn::RetryLimit, Timeout<ZN>>>, AlwaysHedge>,
359 Timeout<ZV>,
360 ZSTip,
361 >,
362 >,
363 >,
364
365 /// The cached block chain state.
366 state: ZS,
367
368 /// Allows efficient access to the best tip of the blockchain.
369 latest_chain_tip: ZSTip,
370
371 // Internal sync state
372 //
373 /// The tips that the syncer is currently following.
374 prospective_tips: HashSet<CheckedTip>,
375
376 /// The lengths of recent sync responses.
377 recent_syncs: RecentSyncLengths,
378
379 /// Receiver that is `true` when the downloader is past the lookahead limit.
380 /// This is based on the downloaded block height and the state tip height.
381 past_lookahead_limit_receiver: zs::WatchReceiver<bool>,
382
383 /// Sender for reporting peer addresses that advertised unexpectedly invalid transactions.
384 misbehavior_sender: mpsc::Sender<(PeerSocketAddr, u32)>,
385}
386
387/// Polls the network to determine whether further blocks are available and
388/// downloads them.
389///
390/// This component is used for initial block sync, but the `Inbound` service is
391/// responsible for participating in the gossip protocols used for block
392/// diffusion.
393impl<ZN, ZS, ZV, ZSTip> ChainSync<ZN, ZS, ZV, ZSTip>
394where
395 ZN: Service<zn::Request, Response = zn::Response, Error = BoxError>
396 + Send
397 + Sync
398 + Clone
399 + 'static,
400 ZN::Future: Send,
401 ZS: Service<zs::Request, Response = zs::Response, Error = BoxError>
402 + Send
403 + Sync
404 + Clone
405 + 'static,
406 ZS::Future: Send,
407 ZV: Service<zebra_consensus::Request, Response = block::Hash, Error = BoxError>
408 + Send
409 + Sync
410 + Clone
411 + 'static,
412 ZV::Future: Send,
413 ZSTip: ChainTip + Clone + Send + 'static,
414{
415 /// Returns a new syncer instance, using:
416 /// - chain: the zebra-chain `Network` to download (Mainnet or Testnet)
417 /// - peers: the zebra-network peers to contact for downloads
418 /// - verifier: the zebra-consensus verifier that checks the chain
419 /// - state: the zebra-state that stores the chain
420 /// - latest_chain_tip: the latest chain tip from `state`
421 ///
422 /// Also returns a [`SyncStatus`] to check if the syncer has likely reached the chain tip.
423 pub fn new(
424 config: &ZebradConfig,
425 max_checkpoint_height: Height,
426 peers: ZN,
427 verifier: ZV,
428 state: ZS,
429 latest_chain_tip: ZSTip,
430 misbehavior_sender: mpsc::Sender<(PeerSocketAddr, u32)>,
431 ) -> (Self, SyncStatus) {
432 let mut download_concurrency_limit = config.sync.download_concurrency_limit;
433 let mut checkpoint_verify_concurrency_limit =
434 config.sync.checkpoint_verify_concurrency_limit;
435 let mut full_verify_concurrency_limit = config.sync.full_verify_concurrency_limit;
436
437 if download_concurrency_limit < MIN_CONCURRENCY_LIMIT {
438 warn!(
439 "configured download concurrency limit {} too low, increasing to {}",
440 config.sync.download_concurrency_limit, MIN_CONCURRENCY_LIMIT,
441 );
442
443 download_concurrency_limit = MIN_CONCURRENCY_LIMIT;
444 }
445
446 if checkpoint_verify_concurrency_limit < MIN_CHECKPOINT_CONCURRENCY_LIMIT {
447 warn!(
448 "configured checkpoint verify concurrency limit {} too low, increasing to {}",
449 config.sync.checkpoint_verify_concurrency_limit, MIN_CHECKPOINT_CONCURRENCY_LIMIT,
450 );
451
452 checkpoint_verify_concurrency_limit = MIN_CHECKPOINT_CONCURRENCY_LIMIT;
453 }
454
455 if full_verify_concurrency_limit < MIN_CONCURRENCY_LIMIT {
456 warn!(
457 "configured full verify concurrency limit {} too low, increasing to {}",
458 config.sync.full_verify_concurrency_limit, MIN_CONCURRENCY_LIMIT,
459 );
460
461 full_verify_concurrency_limit = MIN_CONCURRENCY_LIMIT;
462 }
463
464 let tip_network = Timeout::new(peers.clone(), TIPS_RESPONSE_TIMEOUT);
465
466 // The Hedge middleware is the outermost layer, hedging requests
467 // between two retry-wrapped networks. The innermost timeout
468 // layer is relatively unimportant, because slow requests will
469 // probably be preemptively hedged.
470 //
471 // The Hedge goes outside the Retry, because the Retry layer
472 // abstracts away spurious failures from individual peers
473 // making a less-fallible network service, and the Hedge layer
474 // tries to reduce latency of that less-fallible service.
475 let block_network = Hedge::new(
476 ServiceBuilder::new()
477 .concurrency_limit(download_concurrency_limit)
478 .retry(zn::RetryLimit::new(BLOCK_DOWNLOAD_RETRY_LIMIT))
479 .timeout(BLOCK_DOWNLOAD_TIMEOUT)
480 .service(peers),
481 AlwaysHedge,
482 20,
483 0.95,
484 2 * SYNC_RESTART_DELAY,
485 );
486
487 // We apply a timeout to the verifier to avoid hangs due to missing earlier blocks.
488 let verifier = Timeout::new(verifier, BLOCK_VERIFY_TIMEOUT);
489
490 let (sync_status, recent_syncs) = SyncStatus::new();
491
492 let (past_lookahead_limit_sender, past_lookahead_limit_receiver) = watch::channel(false);
493 let past_lookahead_limit_receiver = zs::WatchReceiver::new(past_lookahead_limit_receiver);
494
495 let downloads = Box::pin(Downloads::new(
496 block_network,
497 verifier,
498 latest_chain_tip.clone(),
499 past_lookahead_limit_sender,
500 max(
501 checkpoint_verify_concurrency_limit,
502 full_verify_concurrency_limit,
503 ),
504 max_checkpoint_height,
505 ));
506
507 let new_syncer = Self {
508 genesis_hash: config.network.network.genesis_hash(),
509 max_checkpoint_height,
510 checkpoint_verify_concurrency_limit,
511 full_verify_concurrency_limit,
512 tip_network,
513 downloads,
514 state,
515 latest_chain_tip,
516 prospective_tips: HashSet::new(),
517 recent_syncs,
518 past_lookahead_limit_receiver,
519 misbehavior_sender,
520 };
521
522 (new_syncer, sync_status)
523 }
524
525 /// Runs the syncer to synchronize the chain and keep it synchronized.
526 #[instrument(skip(self))]
527 pub async fn sync(mut self) -> Result<(), Report> {
528 // We can't download the genesis block using our normal algorithm,
529 // due to protocol limitations
530 self.request_genesis().await?;
531
532 loop {
533 if self.try_to_sync().await.is_err() {
534 self.downloads.cancel_all();
535 }
536
537 self.update_metrics();
538
539 info!(
540 timeout = ?SYNC_RESTART_DELAY,
541 state_tip = ?self.latest_chain_tip.best_tip_height(),
542 "waiting to restart sync"
543 );
544 sleep(SYNC_RESTART_DELAY).await;
545 }
546 }
547
548 /// Tries to synchronize the chain as far as it can.
549 ///
550 /// Obtains some prospective tips and iteratively tries to extend them and download the missing
551 /// blocks.
552 ///
553 /// Returns `Ok` if it was able to synchronize as much of the chain as it could, and then ran
554 /// out of prospective tips. This happens when synchronization finishes or if Zebra ended up
555 /// following a fork. Either way, Zebra should attempt to obtain some more tips.
556 ///
557 /// Returns `Err` if there was an unrecoverable error and restarting the synchronization is
558 /// necessary. This includes outer timeouts, where an entire syncing step takes an extremely
559 /// long time. (These usually indicate hangs.)
560 #[instrument(skip(self))]
561 async fn try_to_sync(&mut self) -> Result<(), Report> {
562 self.prospective_tips = HashSet::new();
563
564 info!(
565 state_tip = ?self.latest_chain_tip.best_tip_height(),
566 "starting sync, obtaining new tips"
567 );
568 let mut extra_hashes = timeout(SYNC_RESTART_DELAY, self.obtain_tips())
569 .await
570 .map_err(Into::into)
571 // TODO: replace with flatten() when it stabilises (#70142)
572 .and_then(convert::identity)
573 .map_err(|e| {
574 info!("temporary error obtaining tips: {:#}", e);
575 e
576 })?;
577 self.update_metrics();
578
579 while !self.prospective_tips.is_empty() || !extra_hashes.is_empty() {
580 // Avoid hangs due to service readiness or other internal operations
581 extra_hashes = timeout(BLOCK_VERIFY_TIMEOUT, self.try_to_sync_once(extra_hashes))
582 .await
583 .map_err(Into::into)
584 // TODO: replace with flatten() when it stabilises (#70142)
585 .and_then(convert::identity)?;
586 }
587
588 info!("exhausted prospective tip set");
589
590 Ok(())
591 }
592
593 /// Tries to synchronize the chain once, using the existing `extra_hashes`.
594 ///
595 /// Tries to extend the existing tips and download the missing blocks.
596 ///
597 /// Returns `Ok(extra_hashes)` if it was able to extend once and synchronize sone of the chain.
598 /// Returns `Err` if there was an unrecoverable error and restarting the synchronization is
599 /// necessary.
600 #[instrument(skip(self, extra_hashes))]
601 async fn try_to_sync_once(
602 &mut self,
603 mut extra_hashes: IndexSet<block::Hash>,
604 ) -> Result<IndexSet<block::Hash>, Report> {
605 // Check whether any block tasks are currently ready.
606 while let Poll::Ready(Some(rsp)) = futures::poll!(self.downloads.next()) {
607 // Some temporary errors are ignored, and syncing continues with other blocks.
608 // If it turns out they were actually important, syncing will run out of blocks, and
609 // the syncer will reset itself.
610 self.handle_block_response(rsp)?;
611 }
612 self.update_metrics();
613
614 // Pause new downloads while the syncer or downloader are past their lookahead limits.
615 //
616 // To avoid a deadlock or long waits for blocks to expire, we ignore the download
617 // lookahead limit when there are only a small number of blocks waiting.
618 while self.downloads.in_flight() >= self.lookahead_limit(extra_hashes.len())
619 || (self.downloads.in_flight() >= self.lookahead_limit(extra_hashes.len()) / 2
620 && self.past_lookahead_limit_receiver.cloned_watch_data())
621 {
622 trace!(
623 tips.len = self.prospective_tips.len(),
624 in_flight = self.downloads.in_flight(),
625 extra_hashes = extra_hashes.len(),
626 lookahead_limit = self.lookahead_limit(extra_hashes.len()),
627 state_tip = ?self.latest_chain_tip.best_tip_height(),
628 "waiting for pending blocks",
629 );
630
631 let response = self.downloads.next().await.expect("downloads is nonempty");
632
633 self.handle_block_response(response)?;
634 self.update_metrics();
635 }
636
637 // Once we're below the lookahead limit, we can request more blocks or hashes.
638 if !extra_hashes.is_empty() {
639 debug!(
640 tips.len = self.prospective_tips.len(),
641 in_flight = self.downloads.in_flight(),
642 extra_hashes = extra_hashes.len(),
643 lookahead_limit = self.lookahead_limit(extra_hashes.len()),
644 state_tip = ?self.latest_chain_tip.best_tip_height(),
645 "requesting more blocks",
646 );
647
648 let response = self.request_blocks(extra_hashes).await;
649 extra_hashes = Self::handle_hash_response(response)?;
650 } else {
651 info!(
652 tips.len = self.prospective_tips.len(),
653 in_flight = self.downloads.in_flight(),
654 extra_hashes = extra_hashes.len(),
655 lookahead_limit = self.lookahead_limit(extra_hashes.len()),
656 state_tip = ?self.latest_chain_tip.best_tip_height(),
657 "extending tips",
658 );
659
660 extra_hashes = self.extend_tips().await.map_err(|e| {
661 info!("temporary error extending tips: {:#}", e);
662 e
663 })?;
664 }
665 self.update_metrics();
666
667 Ok(extra_hashes)
668 }
669
670 /// Given a block_locator list fan out request for subsequent hashes to
671 /// multiple peers
672 #[instrument(skip(self))]
673 async fn obtain_tips(&mut self) -> Result<IndexSet<block::Hash>, Report> {
674 let stage_start = std::time::Instant::now();
675
676 let block_locator = self
677 .state
678 .ready()
679 .await
680 .map_err(|e| eyre!(e))?
681 .call(zebra_state::Request::BlockLocator)
682 .await
683 .map(|response| match response {
684 zebra_state::Response::BlockLocator(block_locator) => block_locator,
685 _ => unreachable!(
686 "GetBlockLocator request can only result in Response::BlockLocator"
687 ),
688 })
689 .map_err(|e| eyre!(e))?;
690
691 debug!(
692 tip = ?block_locator.first().expect("we have at least one block locator object"),
693 ?block_locator,
694 "got block locator and trying to obtain new chain tips"
695 );
696
697 let mut requests = FuturesUnordered::new();
698 for attempt in 0..FANOUT {
699 if attempt > 0 {
700 // Let other tasks run, so we're more likely to choose a different peer.
701 //
702 // TODO: move fanouts into the PeerSet, so we always choose different peers (#2214)
703 tokio::task::yield_now().await;
704 }
705
706 let ready_tip_network = self.tip_network.ready().await;
707 requests.push(tokio::spawn(ready_tip_network.map_err(|e| eyre!(e))?.call(
708 zn::Request::FindBlocks {
709 known_blocks: block_locator.clone(),
710 stop: None,
711 },
712 )));
713 }
714
715 let mut download_set = IndexSet::new();
716 while let Some(res) = requests.next().await {
717 match res
718 .unwrap_or_else(|e @ JoinError { .. }| {
719 if e.is_panic() {
720 panic!("panic in obtain tips task: {e:?}");
721 } else {
722 info!(
723 "task error during obtain tips task: {e:?},\
724 is Zebra shutting down?"
725 );
726 Err(e.into())
727 }
728 })
729 .map_err::<Report, _>(|e| eyre!(e))
730 {
731 Ok(zn::Response::BlockHashes(hashes)) => {
732 trace!(?hashes);
733
734 // zcashd sometimes appends an unrelated hash at the start
735 // or end of its response.
736 //
737 // We can't discard the first hash, because it might be a
738 // block we want to download. So we just accept any
739 // out-of-order first hashes.
740
741 // We use the last hash for the tip, and we want to avoid bad
742 // tips. So we discard the last hash. (We don't need to worry
743 // about missed downloads, because we will pick them up again
744 // in ExtendTips.)
745 let hashes = match hashes.as_slice() {
746 [] => continue,
747 [rest @ .., _last] => rest,
748 };
749
750 let mut first_unknown = None;
751 for (i, &hash) in hashes.iter().enumerate() {
752 if !self.state_contains(hash).await? {
753 first_unknown = Some(i);
754 break;
755 }
756 }
757
758 debug!(hashes.len = ?hashes.len(), ?first_unknown);
759
760 let unknown_hashes = if let Some(index) = first_unknown {
761 &hashes[index..]
762 } else {
763 continue;
764 };
765
766 trace!(?unknown_hashes);
767
768 let new_tip = if let Some(end) = unknown_hashes.rchunks_exact(2).next() {
769 CheckedTip {
770 tip: end[0],
771 expected_next: end[1],
772 }
773 } else {
774 debug!("discarding response that extends only one block");
775 continue;
776 };
777
778 // Make sure we get the same tips, regardless of the
779 // order of peer responses
780 if !download_set.contains(&new_tip.expected_next) {
781 debug!(?new_tip,
782 "adding new prospective tip, and removing existing tips in the new block hash list");
783 self.prospective_tips
784 .retain(|t| !unknown_hashes.contains(&t.expected_next));
785 self.prospective_tips.insert(new_tip);
786 } else {
787 debug!(
788 ?new_tip,
789 "discarding prospective tip: already in download set"
790 );
791 }
792
793 // security: the first response determines our download order
794 //
795 // TODO: can we make the download order independent of response order?
796 let prev_download_len = download_set.len();
797 download_set.extend(unknown_hashes);
798 let new_download_len = download_set.len();
799 let new_hashes = new_download_len - prev_download_len;
800 debug!(new_hashes, "added hashes to download set");
801 metrics::histogram!("sync.obtain.response.hash.count")
802 .record(new_hashes as f64);
803 }
804 Ok(_) => unreachable!("network returned wrong response"),
805 // We ignore this error because we made multiple fanout requests.
806 Err(e) => debug!(?e),
807 }
808 }
809
810 debug!(?self.prospective_tips);
811
812 // Check that the new tips we got are actually unknown.
813 for hash in &download_set {
814 debug!(?hash, "checking if state contains hash");
815 if self.state_contains(*hash).await? {
816 return Err(eyre!("queued download of hash behind our chain tip"));
817 }
818 }
819
820 let new_downloads = download_set.len();
821 debug!(new_downloads, "queueing new downloads");
822 metrics::gauge!("sync.obtain.queued.hash.count").set(new_downloads as f64);
823
824 // security: use the actual number of new downloads from all peers,
825 // so the last peer to respond can't toggle our mempool
826 self.recent_syncs.push_obtain_tips_length(new_downloads);
827
828 let response = self.request_blocks(download_set).await;
829
830 metrics::histogram!("sync.stage.duration_seconds", "stage" => "obtain_tips")
831 .record(stage_start.elapsed().as_secs_f64());
832
833 Self::handle_hash_response(response).map_err(Into::into)
834 }
835
    /// Ask peers to extend each of the current prospective tips, and queue downloads
    /// for the new block hashes they return.
    ///
    /// All of `self.prospective_tips` is taken, and replaced by the new tips found in
    /// this round. For each old tip, `FANOUT` `FindBlocks` requests are sent, each
    /// starting from that tip's hash. Every `BlockHashes` response is checked against
    /// the tip's `expected_next` hash and trimmed. Its unknown hashes are then added to a
    /// shared, de-duplicated download set, and its last two remaining hashes become a
    /// new prospective tip.
    ///
    /// Returns the hashes that were not queued because they are past the lookahead
    /// limit, so the caller can queue them later. Download errors that should not
    /// restart the sync produce an empty set instead.
    ///
    /// # Errors
    ///
    /// Returns an error if the network service fails to become ready, or if queueing
    /// the downloads fails with an error that should restart the sync.
    ///
    /// # Panics
    ///
    /// If a spawned request task panics, or if the network returns a response other
    /// than `BlockHashes` to a `FindBlocks` request.
    #[instrument(skip(self))]
    async fn extend_tips(&mut self) -> Result<IndexSet<block::Hash>, Report> {
        let stage_start = std::time::Instant::now();

        // Take ownership of the old tips: `self.prospective_tips` is rebuilt from the
        // responses below. If this method returns early with an error, any remaining
        // old tips in `tips` are dropped.
        let tips = std::mem::take(&mut self.prospective_tips);

        // Shared across all tips and all responses, so duplicate hashes are only queued once.
        let mut download_set = IndexSet::new();
        debug!(tips = ?tips.len(), "trying to extend chain tips");
        for tip in tips {
            debug!(?tip, "asking peers to extend chain tip");
            let mut responses = FuturesUnordered::new();
            for attempt in 0..FANOUT {
                if attempt > 0 {
                    // Let other tasks run, so we're more likely to choose a different peer.
                    //
                    // TODO: move fanouts into the PeerSet, so we always choose different peers (#2214)
                    tokio::task::yield_now().await;
                }

                // A readiness error is fatal for this stage, and is returned via `?`.
                let ready_tip_network = self.tip_network.ready().await;
                responses.push(tokio::spawn(ready_tip_network.map_err(|e| eyre!(e))?.call(
                    zn::Request::FindBlocks {
                        known_blocks: vec![tip.tip],
                        stop: None,
                    },
                )));
            }
            // Process responses in completion order, not request order.
            while let Some(res) = responses.next().await {
                match res
                    .expect("panic in spawned extend tips request")
                    .map_err::<Report, _>(|e| eyre!(e))
                {
                    Ok(zn::Response::BlockHashes(hashes)) => {
                        debug!(first = ?hashes.first(), len = ?hashes.len());
                        trace!(?hashes);

                        // zcashd sometimes appends an unrelated hash at the
                        // start or end of its response. Check the first hash
                        // against the previous response, and discard mismatches.
                        //
                        // The matching `expected_next` hash itself is also skipped: it was
                        // part of the response that created this tip, so it was already
                        // added to a download set.
                        let unknown_hashes = match hashes.as_slice() {
                            [expected_hash, rest @ ..] if expected_hash == &tip.expected_next => {
                                rest
                            }
                            // If the first hash doesn't match, retry with the second.
                            [first_hash, expected_hash, rest @ ..]
                                if expected_hash == &tip.expected_next =>
                            {
                                debug!(?first_hash,
                                    ?tip.expected_next,
                                    ?tip.tip,
                                    "unexpected first hash, but the second matches: using the hashes after the match");
                                rest
                            }
                            // We ignore these responses
                            [] => continue,
                            [single_hash] => {
                                debug!(?single_hash,
                                    ?tip.expected_next,
                                    ?tip.tip,
                                    "discarding response containing a single unexpected hash");
                                continue;
                            }
                            [first_hash, second_hash, rest @ ..] => {
                                debug!(?first_hash,
                                    ?second_hash,
                                    rest_len = ?rest.len(),
                                    ?tip.expected_next,
                                    ?tip.tip,
                                    "discarding response that starts with two unexpected hashes");
                                continue;
                            }
                        };

                        // We use the last hash for the tip, and we want to avoid
                        // bad tips. So we discard the last hash. (We don't need
                        // to worry about missed downloads, because we will pick
                        // them up again in the next ExtendTips.)
                        let unknown_hashes = match unknown_hashes {
                            [] => continue,
                            [rest @ .., _last] => rest,
                        };

                        // The new tip is the final pair of remaining hashes: the next request
                        // starts from `tip`, and its response should begin with `expected_next`.
                        let new_tip = if let Some(end) = unknown_hashes.rchunks_exact(2).next() {
                            CheckedTip {
                                tip: end[0],
                                expected_next: end[1],
                            }
                        } else {
                            debug!("discarding response that extends only one block");
                            continue;
                        };

                        trace!(?unknown_hashes);

                        // Make sure we get the same tips, regardless of the
                        // order of peer responses
                        //
                        // This check runs before this response's hashes are added below, so it
                        // only rejects tips already covered by an earlier response.
                        if !download_set.contains(&new_tip.expected_next) {
                            debug!(?new_tip,
                                "adding new prospective tip, and removing any existing tips in the new block hash list");
                            self.prospective_tips
                                .retain(|t| !unknown_hashes.contains(&t.expected_next));
                            self.prospective_tips.insert(new_tip);
                        } else {
                            debug!(
                                ?new_tip,
                                "discarding prospective tip: already in download set"
                            );
                        }

                        // security: the first response determines our download order
                        //
                        // TODO: can we make the download order independent of response order?
                        let prev_download_len = download_set.len();
                        download_set.extend(unknown_hashes);
                        let new_download_len = download_set.len();
                        // `IndexSet::extend` skips duplicates, so this counts only hashes
                        // that were not already queued by an earlier response.
                        let new_hashes = new_download_len - prev_download_len;
                        debug!(new_hashes, "added hashes to download set");
                        metrics::histogram!("sync.extend.response.hash.count")
                            .record(new_hashes as f64);
                    }
                    Ok(_) => unreachable!("network returned wrong response"),
                    // We ignore this error because we made multiple fanout requests.
                    Err(e) => debug!(?e),
                }
            }
        }

        let new_downloads = download_set.len();
        debug!(new_downloads, "queueing new downloads");
        metrics::gauge!("sync.extend.queued.hash.count").set(new_downloads as f64);

        // security: use the actual number of new downloads from all peers,
        // so the last peer to respond can't toggle our mempool
        self.recent_syncs.push_extend_tips_length(new_downloads);

        // Queue up to the lookahead limit; any remaining hashes are returned to the caller.
        let response = self.request_blocks(download_set).await;

        metrics::histogram!("sync.stage.duration_seconds", "stage" => "extend_tips")
            .record(stage_start.elapsed().as_secs_f64());

        Self::handle_hash_response(response).map_err(Into::into)
    }
978
    /// Download and verify the genesis block, if it isn't currently known to
    /// our node.
    ///
    /// Each attempt is bounded by `SYNC_RESTART_DELAY`. Failed or timed-out attempts
    /// are logged and retried after sleeping for `GENESIS_TIMEOUT_RETRY`. The loop
    /// ends once the state contains the genesis hash.
    ///
    /// # Errors
    ///
    /// Returns an error if querying the state fails, or if [`Self::request_genesis_once`]
    /// returns a fatal error (one that should restart the sync).
    async fn request_genesis(&mut self) -> Result<(), Report> {
        // Due to Bitcoin protocol limitations, we can't request the genesis
        // block using our standard tip-following algorithm:
        // - getblocks requires at least one hash
        // - responses start with the block *after* the requested block, and
        // - the genesis hash is used as a placeholder for "no matches".
        //
        // So we just download and verify the genesis block here.
        while !self.state_contains(self.genesis_hash).await? {
            info!("starting genesis block download and verify");

            // The timeout covers the whole attempt, including waiting for the verified block.
            // `map_err(Into::into)` converts the timeout error into the same error type as
            // the inner block errors, so both can share the match arm below.
            let response = timeout(SYNC_RESTART_DELAY, self.request_genesis_once())
                .await
                .map_err(Into::into);

            // 3 layers of results is not ideal, but we need the timeout on the outside.
            match response {
                // Success: trace the verified block, then the loop re-checks the state.
                Ok(Ok(Ok(response))) => self
                    .handle_block_response(Ok(response))
                    .expect("never returns Err for Ok"),
                // Handle fatal errors
                Ok(Err(fatal_error)) => Err(fatal_error)?,
                // Handle timeouts and block errors
                Err(error) | Ok(Ok(Err(error))) => {
                    // TODO: exit syncer on permanent service errors (NetworkError, VerifierError)
                    if Self::should_restart_sync(&error) {
                        warn!(
                            ?error,
                            "could not download or verify genesis block, retrying"
                        );
                    } else {
                        info!(
                            ?error,
                            "temporary error downloading or verifying genesis block, retrying"
                        );
                    }

                    // Back off before the next attempt.
                    tokio::time::sleep(GENESIS_TIMEOUT_RETRY).await;
                }
            }
        }

        Ok(())
    }
1025
1026 /// Try to download and verify the genesis block once.
1027 ///
1028 /// Fatal errors are returned in the outer result, temporary errors in the inner one.
1029 async fn request_genesis_once(
1030 &mut self,
1031 ) -> Result<Result<(Height, block::Hash), BlockDownloadVerifyError>, Report> {
1032 let response = self.downloads.download_and_verify(self.genesis_hash).await;
1033 Self::handle_response(response).map_err(|e| eyre!(e))?;
1034
1035 let response = self.downloads.next().await.expect("downloads is nonempty");
1036
1037 Ok(response)
1038 }
1039
1040 /// Queue download and verify tasks for each block that isn't currently known to our node.
1041 ///
1042 /// TODO: turn obtain and extend tips into a separate task, which sends hashes via a channel?
1043 async fn request_blocks(
1044 &mut self,
1045 mut hashes: IndexSet<block::Hash>,
1046 ) -> Result<IndexSet<block::Hash>, BlockDownloadVerifyError> {
1047 let lookahead_limit = self.lookahead_limit(hashes.len());
1048
1049 debug!(
1050 hashes.len = hashes.len(),
1051 ?lookahead_limit,
1052 "requesting blocks",
1053 );
1054
1055 let extra_hashes = if hashes.len() > lookahead_limit {
1056 hashes.split_off(lookahead_limit)
1057 } else {
1058 IndexSet::new()
1059 };
1060
1061 for hash in hashes.into_iter() {
1062 self.downloads.download_and_verify(hash).await?;
1063 }
1064
1065 Ok(extra_hashes)
1066 }
1067
1068 /// The configured lookahead limit, based on the currently verified height,
1069 /// and the number of hashes we haven't queued yet.
1070 fn lookahead_limit(&self, new_hashes: usize) -> usize {
1071 let max_checkpoint_height: usize = self
1072 .max_checkpoint_height
1073 .0
1074 .try_into()
1075 .expect("fits in usize");
1076
1077 // When the state is empty, we want to verify using checkpoints
1078 let verified_height: usize = self
1079 .latest_chain_tip
1080 .best_tip_height()
1081 .unwrap_or(Height(0))
1082 .0
1083 .try_into()
1084 .expect("fits in usize");
1085
1086 if verified_height >= max_checkpoint_height {
1087 self.full_verify_concurrency_limit
1088 } else if (verified_height + new_hashes) >= max_checkpoint_height {
1089 // If we're just about to start full verification, allow enough for the remaining checkpoint,
1090 // and also enough for a separate full verification lookahead.
1091 let checkpoint_hashes = verified_height + new_hashes - max_checkpoint_height;
1092
1093 self.full_verify_concurrency_limit + checkpoint_hashes
1094 } else {
1095 self.checkpoint_verify_concurrency_limit
1096 }
1097 }
1098
1099 /// Handles a response for a requested block.
1100 ///
1101 /// See [`Self::handle_response`] for more details.
1102 #[allow(unknown_lints)]
1103 fn handle_block_response(
1104 &mut self,
1105 response: Result<(Height, block::Hash), BlockDownloadVerifyError>,
1106 ) -> Result<(), BlockDownloadVerifyError> {
1107 match response {
1108 Ok((height, hash)) => {
1109 trace!(?height, ?hash, "verified and committed block to state");
1110
1111 return Ok(());
1112 }
1113
1114 Err(BlockDownloadVerifyError::Invalid {
1115 ref error,
1116 advertiser_addr: Some(advertiser_addr),
1117 ..
1118 }) if error.misbehavior_score() != 0 => {
1119 let _ = self
1120 .misbehavior_sender
1121 .try_send((advertiser_addr, error.misbehavior_score()));
1122 }
1123
1124 Err(_) => {}
1125 };
1126
1127 Self::handle_response(response)
1128 }
1129
1130 /// Handles a response to block hash submission, passing through any extra hashes.
1131 ///
1132 /// See [`Self::handle_response`] for more details.
1133 #[allow(unknown_lints)]
1134 fn handle_hash_response(
1135 response: Result<IndexSet<block::Hash>, BlockDownloadVerifyError>,
1136 ) -> Result<IndexSet<block::Hash>, BlockDownloadVerifyError> {
1137 match response {
1138 Ok(extra_hashes) => Ok(extra_hashes),
1139 Err(_) => Self::handle_response(response).map(|()| IndexSet::new()),
1140 }
1141 }
1142
1143 /// Handles a response to a syncer request.
1144 ///
1145 /// Returns `Ok` if the request was successful, or if an expected error occurred,
1146 /// so that the synchronization can continue normally.
1147 ///
1148 /// Returns `Err` if an unexpected error occurred, to force the synchronizer to restart.
1149 #[allow(unknown_lints)]
1150 fn handle_response<T>(
1151 response: Result<T, BlockDownloadVerifyError>,
1152 ) -> Result<(), BlockDownloadVerifyError> {
1153 match response {
1154 Ok(_t) => Ok(()),
1155 Err(error) => {
1156 // TODO: exit syncer on permanent service errors (NetworkError, VerifierError)
1157 if Self::should_restart_sync(&error) {
1158 Err(error)
1159 } else {
1160 Ok(())
1161 }
1162 }
1163 }
1164 }
1165
1166 /// Returns `true` if the hash is present in the state, and `false`
1167 /// if the hash is not present in the state.
1168 pub(crate) async fn state_contains(&mut self, hash: block::Hash) -> Result<bool, Report> {
1169 match self
1170 .state
1171 .ready()
1172 .await
1173 .map_err(|e| eyre!(e))?
1174 .call(zebra_state::Request::KnownBlock(hash))
1175 .await
1176 .map_err(|e| eyre!(e))?
1177 {
1178 zs::Response::KnownBlock(loc) => Ok(loc.is_some()),
1179 _ => unreachable!("wrong response to known block request"),
1180 }
1181 }
1182
1183 fn update_metrics(&mut self) {
1184 metrics::gauge!("sync.prospective_tips.len",).set(self.prospective_tips.len() as f64);
1185 metrics::gauge!("sync.downloads.in_flight",).set(self.downloads.in_flight() as f64);
1186 }
1187
    /// Return if the sync should be restarted based on the given error
    /// from the block downloader and verifier stream.
    ///
    /// Returns `false`, after a `debug` log, for errors the syncer can continue past:
    /// - duplicate verification requests,
    /// - cancelled downloads or verifications,
    /// - blocks behind the tip height limit,
    /// - duplicate queued hashes,
    /// - download failures whose `Debug` output mentions `NotFound`.
    ///
    /// Returns `true`, after a `warn` log, for any other error.
    ///
    /// Arm order matters: the guarded arms are checked before the catch-all.
    fn should_restart_sync(e: &BlockDownloadVerifyError) -> bool {
        match e {
            // Structural matches: downcasts
            BlockDownloadVerifyError::Invalid { error, .. } if error.is_duplicate_request() => {
                debug!(error = ?e, "block was already verified or committed, possibly from a previous sync run, continuing");
                false
            }

            // Structural matches: direct
            BlockDownloadVerifyError::CancelledDuringDownload { .. }
            | BlockDownloadVerifyError::CancelledDuringVerification { .. } => {
                debug!(error = ?e, "block verification was cancelled, continuing");
                false
            }
            BlockDownloadVerifyError::BehindTipHeightLimit { .. } => {
                debug!(
                    error = ?e,
                    "block height is behind the current state tip, \
                    assuming the syncer will eventually catch up to the state, continuing"
                );
                false
            }
            BlockDownloadVerifyError::DuplicateBlockQueuedForDownload { .. } => {
                debug!(
                    error = ?e,
                    "queued duplicate block hash for download, \
                    assuming the syncer will eventually resolve duplicates, continuing"
                );
                false
            }

            // This is a text match on the inner error's `Debug` output, not a type check.
            BlockDownloadVerifyError::DownloadFailed { ref error, .. }
                if format!("{error:?}").contains("NotFound") =>
            {
                // Covers these errors:
                // - NotFoundResponse
                // - NotFoundRegistry
                //
                // TODO: improve this by checking the type (#2908)
                // restart after a certain number of NotFound errors?
                debug!(error = ?e, "block was not found, possibly from a peer that doesn't have the block yet, continuing");
                false
            }

            _ => {
                // download_and_verify downcasts errors from the block verifier
                // into VerifyChainError, and puts the result inside one of the
                // BlockDownloadVerifyError enumerations. This downcast could
                // become incorrect e.g. after some refactoring, and it is difficult
                // to write a test to check it. The test below is a best-effort
                // attempt to catch if that happens and log it.
                //
                // TODO: add a proper test and remove this
                // https://github.com/ZcashFoundation/zebra/issues/2909
                let err_str = format!("{e:?}");
                if err_str.contains("NotFound") {
                    // Only logged: this error still restarts the sync below.
                    error!(?e,
                        "a BlockDownloadVerifyError that should have been filtered out was detected, \
                        which possibly indicates a programming error in the downcast inside \
                        zebrad::components::sync::downloads::Downloads::download_and_verify"
                    )
                }

                warn!(?e, "error downloading and verifying block");
                true
            }
        }
    }
1258}