1use std::{
4 collections::HashMap,
5 convert,
6 pin::Pin,
7 sync::Arc,
8 task::{Context, Poll},
9};
10
11use futures::{
12 future::{FutureExt, TryFutureExt},
13 ready,
14 stream::{FuturesUnordered, Stream},
15};
16use pin_project::pin_project;
17use thiserror::Error;
18use tokio::{
19 sync::{oneshot, watch},
20 task::JoinHandle,
21 time::timeout,
22};
23use tower::{hedge, Service, ServiceExt};
24use tracing_futures::Instrument;
25
26use zebra_chain::{
27 block::{self, Height, HeightDiff},
28 chain_tip::ChainTip,
29};
30use zebra_network::{self as zn, PeerSocketAddr};
31use zebra_state as zs;
32
33use crate::components::sync::{
34 FINAL_CHECKPOINT_BLOCK_VERIFY_TIMEOUT, FINAL_CHECKPOINT_BLOCK_VERIFY_TIMEOUT_LIMIT,
35};
36
/// A boxed, thread-safe, type-erased error.
///
/// This is the error type required of the network and verifier services
/// used in this file.
type BoxError = Box<dyn std::error::Error + Send + Sync + 'static>;
38
/// Scaling factor applied to the lookahead limit when computing the height
/// at which the downloader signals that it is past the lookahead limit.
///
/// When a chain tip is available, the pause height is the tip height plus
/// `lookahead_limit + lookahead_limit * VERIFICATION_PIPELINE_SCALING_MULTIPLIER`.
pub const VERIFICATION_PIPELINE_SCALING_MULTIPLIER: usize = 2;
56
/// Maximum height distance above the chain tip at which a downloaded block
/// is still accepted.
///
/// Blocks above `tip + VERIFICATION_PIPELINE_DROP_LIMIT` are rejected with
/// `AboveLookaheadHeightLimit`. When there is no chain tip, this value is used
/// directly as the maximum accepted height.
pub const VERIFICATION_PIPELINE_DROP_LIMIT: HeightDiff = 50_000;
60
/// A hedge policy marker type whose `hedge::Policy` implementation accepts
/// every request: all requests can be retried, and all requests are cloned.
#[derive(Copy, Clone, Debug)]
pub(super) struct AlwaysHedge;
63
64impl<Request: Clone> hedge::Policy<Request> for AlwaysHedge {
65 fn can_retry(&self, _req: &Request) -> bool {
66 true
67 }
68 fn clone_request(&self, req: &Request) -> Option<Request> {
69 Some(req.clone())
70 }
71}
72
73#[derive(Error, Debug)]
75#[allow(dead_code)]
76pub enum BlockDownloadVerifyError {
77 #[error("permanent readiness error from the network service: {error:?}")]
78 NetworkServiceError {
79 #[source]
80 error: BoxError,
81 },
82
83 #[error("permanent readiness error from the verifier service: {error:?}")]
84 VerifierServiceError {
85 #[source]
86 error: BoxError,
87 },
88
89 #[error("duplicate block hash queued for download: {hash:?}")]
90 DuplicateBlockQueuedForDownload { hash: block::Hash },
91
92 #[error("error downloading block: {error:?} {hash:?}")]
93 DownloadFailed {
94 #[source]
95 error: BoxError,
96 hash: block::Hash,
97 },
98
99 #[error("downloaded block was too far ahead of the chain tip: {height:?} {hash:?}")]
108 AboveLookaheadHeightLimit {
109 height: block::Height,
110 hash: block::Hash,
111 advertiser_addr: Option<PeerSocketAddr>,
112 },
113
114 #[error("downloaded block was too far behind the chain tip: {height:?} {hash:?}")]
115 BehindTipHeightLimit {
116 height: block::Height,
117 hash: block::Hash,
118 },
119
120 #[error("downloaded block had an invalid height: {hash:?}")]
121 InvalidHeight {
122 hash: block::Hash,
123 advertiser_addr: Option<PeerSocketAddr>,
124 },
125
126 #[error("block failed consensus validation: {error:?} {height:?} {hash:?}")]
127 Invalid {
128 #[source]
129 error: zebra_consensus::router::RouterError,
130 height: block::Height,
131 hash: block::Hash,
132 advertiser_addr: Option<PeerSocketAddr>,
133 },
134
135 #[error("block validation request failed: {error:?} {height:?} {hash:?}")]
136 ValidationRequestError {
137 #[source]
138 error: BoxError,
139 height: block::Height,
140 hash: block::Hash,
141 },
142
143 #[error("block download & verification was cancelled during download: {hash:?}")]
144 CancelledDuringDownload { hash: block::Hash },
145
146 #[error(
147 "block download & verification was cancelled while waiting for the verifier service: \
148 to become ready: {height:?} {hash:?}"
149 )]
150 CancelledAwaitingVerifierReadiness {
151 height: block::Height,
152 hash: block::Hash,
153 },
154
155 #[error(
156 "block download & verification was cancelled during verification: {height:?} {hash:?}"
157 )]
158 CancelledDuringVerification {
159 height: block::Height,
160 hash: block::Hash,
161 },
162
163 #[error(
164 "timeout during service readiness, download, verification, or internal downloader operation"
165 )]
166 Timeout,
167}
168
169impl From<tokio::time::error::Elapsed> for BlockDownloadVerifyError {
170 fn from(_value: tokio::time::error::Elapsed) -> Self {
171 BlockDownloadVerifyError::Timeout
172 }
173}
174
175#[pin_project]
177#[derive(Debug)]
178pub struct Downloads<ZN, ZV, ZSTip>
179where
180 ZN: Service<zn::Request, Response = zn::Response, Error = BoxError> + Send + Sync + 'static,
181 ZN::Future: Send,
182 ZV: Service<zebra_consensus::Request, Response = block::Hash, Error = BoxError>
183 + Send
184 + Sync
185 + Clone
186 + 'static,
187 ZV::Future: Send,
188 ZSTip: ChainTip + Clone + Send + 'static,
189{
190 network: ZN,
195
196 verifier: ZV,
198
199 latest_chain_tip: ZSTip,
201
202 lookahead_limit: usize,
206
207 max_checkpoint_height: Height,
209
210 past_lookahead_limit_sender: Arc<std::sync::Mutex<watch::Sender<bool>>>,
215
216 past_lookahead_limit_receiver: zs::WatchReceiver<bool>,
218
219 #[pin]
223 pending: FuturesUnordered<
224 JoinHandle<Result<(Height, block::Hash), (BlockDownloadVerifyError, block::Hash)>>,
225 >,
226
227 cancel_handles: HashMap<block::Hash, oneshot::Sender<()>>,
230}
231
232impl<ZN, ZV, ZSTip> Stream for Downloads<ZN, ZV, ZSTip>
233where
234 ZN: Service<zn::Request, Response = zn::Response, Error = BoxError> + Send + Sync + 'static,
235 ZN::Future: Send,
236 ZV: Service<zebra_consensus::Request, Response = block::Hash, Error = BoxError>
237 + Send
238 + Sync
239 + Clone
240 + 'static,
241 ZV::Future: Send,
242 ZSTip: ChainTip + Clone + Send + 'static,
243{
244 type Item = Result<(Height, block::Hash), BlockDownloadVerifyError>;
245
246 fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
247 let this = self.project();
248 if let Some(join_result) = ready!(this.pending.poll_next(cx)) {
258 match join_result.expect("block download and verify tasks must not panic") {
259 Ok((height, hash)) => {
260 this.cancel_handles.remove(&hash);
261
262 Poll::Ready(Some(Ok((height, hash))))
263 }
264 Err((e, hash)) => {
265 this.cancel_handles.remove(&hash);
266 Poll::Ready(Some(Err(e)))
267 }
268 }
269 } else {
270 Poll::Ready(None)
271 }
272 }
273
274 fn size_hint(&self) -> (usize, Option<usize>) {
275 self.pending.size_hint()
276 }
277}
278
279impl<ZN, ZV, ZSTip> Downloads<ZN, ZV, ZSTip>
280where
281 ZN: Service<zn::Request, Response = zn::Response, Error = BoxError> + Send + Sync + 'static,
282 ZN::Future: Send,
283 ZV: Service<zebra_consensus::Request, Response = block::Hash, Error = BoxError>
284 + Send
285 + Sync
286 + Clone
287 + 'static,
288 ZV::Future: Send,
289 ZSTip: ChainTip + Clone + Send + 'static,
290{
291 pub fn new(
302 network: ZN,
303 verifier: ZV,
304 latest_chain_tip: ZSTip,
305 past_lookahead_limit_sender: watch::Sender<bool>,
306 lookahead_limit: usize,
307 max_checkpoint_height: Height,
308 ) -> Self {
309 let past_lookahead_limit_receiver =
310 zs::WatchReceiver::new(past_lookahead_limit_sender.subscribe());
311
312 Self {
313 network,
314 verifier,
315 latest_chain_tip,
316 lookahead_limit,
317 max_checkpoint_height,
318 past_lookahead_limit_sender: Arc::new(std::sync::Mutex::new(
319 past_lookahead_limit_sender,
320 )),
321 past_lookahead_limit_receiver,
322 pending: FuturesUnordered::new(),
323 cancel_handles: HashMap::new(),
324 }
325 }
326
327 #[instrument(level = "debug", skip(self), fields(%hash))]
333 pub async fn download_and_verify(
334 &mut self,
335 hash: block::Hash,
336 ) -> Result<(), BlockDownloadVerifyError> {
337 if self.cancel_handles.contains_key(&hash) {
338 metrics::counter!("sync.already.queued.dropped.block.hash.count").increment(1);
339 return Err(BlockDownloadVerifyError::DuplicateBlockQueuedForDownload { hash });
340 }
341
342 let block_req = self
350 .network
351 .ready()
352 .await
353 .map_err(|error| BlockDownloadVerifyError::NetworkServiceError { error })?
354 .call(zn::Request::BlocksByHash(std::iter::once(hash).collect()));
355
356 let (cancel_tx, mut cancel_rx) = oneshot::channel::<()>();
358
359 let mut verifier = self.verifier.clone();
360 let latest_chain_tip = self.latest_chain_tip.clone();
361
362 let lookahead_limit = self.lookahead_limit;
363 let max_checkpoint_height = self.max_checkpoint_height;
364
365 let past_lookahead_limit_sender = self.past_lookahead_limit_sender.clone();
366 let past_lookahead_limit_receiver = self.past_lookahead_limit_receiver.clone();
367
368 let task = tokio::spawn(
369 async move {
370 let download_start = std::time::Instant::now();
373 let rsp = tokio::select! {
374 biased;
375 _ = &mut cancel_rx => {
376 trace!("task cancelled prior to download completion");
377 metrics::counter!("sync.cancelled.download.count").increment(1);
378 metrics::histogram!("sync.block.download.duration_seconds", "result" => "cancelled")
379 .record(download_start.elapsed().as_secs_f64());
380 return Err(BlockDownloadVerifyError::CancelledDuringDownload { hash })
381 }
382 rsp = block_req => rsp.map_err(|error| BlockDownloadVerifyError::DownloadFailed { error, hash})?,
383 };
384
385 let (block, advertiser_addr) = if let zn::Response::Blocks(blocks) = rsp {
386 assert_eq!(
387 blocks.len(),
388 1,
389 "wrong number of blocks in response to a single hash"
390 );
391
392 blocks
393 .first()
394 .expect("just checked length")
395 .available()
396 .expect("unexpected missing block status: single block failures should be errors")
397 } else {
398 unreachable!("wrong response to block request");
399 };
400 metrics::counter!("sync.downloaded.block.count").increment(1);
401 metrics::histogram!("sync.block.download.duration_seconds", "result" => "success")
402 .record(download_start.elapsed().as_secs_f64());
403
404 let tip_height = latest_chain_tip.best_tip_height();
408
409 let (lookahead_drop_height, lookahead_pause_height, lookahead_reset_height) = if let Some(tip_height) = tip_height {
410 let lookahead_pause = HeightDiff::try_from(
413 lookahead_limit + lookahead_limit * VERIFICATION_PIPELINE_SCALING_MULTIPLIER,
414 )
415 .expect("fits in HeightDiff");
416
417
418 ((tip_height + VERIFICATION_PIPELINE_DROP_LIMIT).expect("tip is much lower than Height::MAX"),
419 (tip_height + lookahead_pause).expect("tip is much lower than Height::MAX"),
420 (tip_height + lookahead_pause/2).expect("tip is much lower than Height::MAX"))
421 } else {
422 let genesis_drop = VERIFICATION_PIPELINE_DROP_LIMIT.try_into().expect("fits in u32");
423 let genesis_lookahead =
424 u32::try_from(lookahead_limit - 1).expect("fits in u32");
425
426 (block::Height(genesis_drop),
427 block::Height(genesis_lookahead),
428 block::Height(genesis_lookahead/2))
429 };
430
431 let min_accepted_height = tip_height
441 .map(|tip_height| {
442 block::Height(tip_height.0.saturating_sub(zs::MAX_BLOCK_REORG_HEIGHT))
443 })
444 .unwrap_or(block::Height(0));
445
446 let block_height = if let Some(block_height) = block.coinbase_height() {
447 block_height
448 } else {
449 debug!(
450 ?hash,
451 "synced block with no height: dropped downloaded block"
452 );
453 metrics::counter!("sync.no.height.dropped.block.count").increment(1);
454
455 return Err(BlockDownloadVerifyError::InvalidHeight { hash, advertiser_addr });
456 };
457
458 if block_height > lookahead_drop_height {
459 Err(BlockDownloadVerifyError::AboveLookaheadHeightLimit { height: block_height, hash, advertiser_addr })?;
460 } else if block_height > lookahead_pause_height {
461 if !past_lookahead_limit_receiver.cloned_watch_data() {
464 info!(
465 ?hash,
466 ?block_height,
467 ?tip_height,
468 ?lookahead_pause_height,
469 ?lookahead_reset_height,
470 lookahead_limit = ?lookahead_limit,
471 "synced block height too far ahead of the tip: \
472 waiting for downloaded blocks to commit to the state",
473 );
474
475 let _ = past_lookahead_limit_sender.lock().expect("thread panicked while holding the past_lookahead_limit_sender mutex guard").send(true);
480 } else {
481 debug!(
482 ?hash,
483 ?block_height,
484 ?tip_height,
485 ?lookahead_pause_height,
486 ?lookahead_reset_height,
487 lookahead_limit = ?lookahead_limit,
488 "synced block height too far ahead of the tip: \
489 waiting for downloaded blocks to commit to the state",
490 );
491 }
492
493 metrics::counter!("sync.max.height.limit.paused.count").increment(1);
494 } else if block_height <= lookahead_reset_height && past_lookahead_limit_receiver.cloned_watch_data() {
495 let _ = past_lookahead_limit_sender.lock().expect("thread panicked while holding the past_lookahead_limit_sender mutex guard").send(false);
500 metrics::counter!("sync.max.height.limit.reset.count").increment(1);
501
502 metrics::counter!("sync.max.height.limit.reset.attempt.count").increment(1);
503 }
504
505 if block_height < min_accepted_height {
506 debug!(
507 ?hash,
508 ?block_height,
509 ?tip_height,
510 ?min_accepted_height,
511 behind_tip_limit = ?zs::MAX_BLOCK_REORG_HEIGHT,
512 "synced block height behind the finalized tip: dropped downloaded block"
513 );
514 metrics::counter!("gossip.min.height.limit.dropped.block.count").increment(1);
515
516 Err(BlockDownloadVerifyError::BehindTipHeightLimit { height: block_height, hash })?;
517 }
518
519 let readiness = verifier.ready();
521 let verifier = tokio::select! {
523 biased;
524 _ = &mut cancel_rx => {
525 trace!("task cancelled waiting for verifier service readiness");
526 metrics::counter!("sync.cancelled.verify.ready.count").increment(1);
527 return Err(BlockDownloadVerifyError::CancelledAwaitingVerifierReadiness { height: block_height, hash })
528 }
529 verifier = readiness => verifier,
530 };
531
532 let verify_start = std::time::Instant::now();
534 let mut rsp = verifier
535 .map_err(|error| BlockDownloadVerifyError::VerifierServiceError { error })?
536 .call(zebra_consensus::Request::Commit(block)).boxed();
537
538 let short_timeout_max = (max_checkpoint_height + FINAL_CHECKPOINT_BLOCK_VERIFY_TIMEOUT_LIMIT).expect("checkpoint block height is in valid range");
540 if block_height >= max_checkpoint_height && block_height <= short_timeout_max {
541 rsp = timeout(FINAL_CHECKPOINT_BLOCK_VERIFY_TIMEOUT, rsp)
542 .map_err(|timeout| format!("initial fully verified block timed out: retrying: {timeout:?}").into())
543 .map(|nested_result| nested_result.and_then(convert::identity)).boxed();
544 }
545
546 let verification = tokio::select! {
547 biased;
548 _ = &mut cancel_rx => {
549 trace!("task cancelled prior to verification");
550 metrics::counter!("sync.cancelled.verify.count").increment(1);
551 metrics::histogram!("sync.block.verify.duration_seconds", "result" => "cancelled")
552 .record(verify_start.elapsed().as_secs_f64());
553 return Err(BlockDownloadVerifyError::CancelledDuringVerification { height: block_height, hash })
554 }
555 verification = rsp => verification,
556 };
557
558 let verify_result = if verification.is_ok() { "success" } else { "failure" };
559 metrics::histogram!("sync.block.verify.duration_seconds", "result" => verify_result)
560 .record(verify_start.elapsed().as_secs_f64());
561
562 if verification.is_ok() {
563 metrics::counter!("sync.verified.block.count").increment(1);
564 }
565
566 verification
567 .map(|hash| (block_height, hash))
568 .map_err(|err| {
569 match err.downcast::<zebra_consensus::router::RouterError>() {
570 Ok(error) => BlockDownloadVerifyError::Invalid { error: *error, height: block_height, hash, advertiser_addr },
571 Err(error) => BlockDownloadVerifyError::ValidationRequestError { error, height: block_height, hash },
572 }
573 })
574 }
575 .in_current_span()
576 .map_err(move |e| (e, hash)),
579 );
580
581 tokio::task::yield_now().await;
583
584 self.pending.push(task);
585 assert!(
586 self.cancel_handles.insert(hash, cancel_tx).is_none(),
587 "blocks are only queued once"
588 );
589
590 Ok(())
591 }
592
593 pub fn cancel_all(&mut self) {
595 let _ = std::mem::take(&mut self.pending);
597
598 for (_hash, cancel) in self.cancel_handles.drain() {
602 let _ = cancel.send(());
603 }
604
605 assert!(self.pending.is_empty());
606 assert!(self.cancel_handles.is_empty());
607
608 let _ = self
613 .past_lookahead_limit_sender
614 .lock()
615 .expect("thread panicked while holding the past_lookahead_limit_sender mutex guard")
616 .send(false);
617 }
618
619 pub fn in_flight(&mut self) -> usize {
621 self.pending.len()
622 }
623
624 #[allow(dead_code)]
626 pub fn is_empty(&mut self) -> bool {
627 self.pending.is_empty()
628 }
629}