1use std::{
4 collections::HashMap,
5 convert,
6 pin::Pin,
7 sync::Arc,
8 task::{Context, Poll},
9};
10
11use futures::{
12 future::{FutureExt, TryFutureExt},
13 ready,
14 stream::{FuturesUnordered, Stream},
15};
16use pin_project::pin_project;
17use thiserror::Error;
18use tokio::{
19 sync::{oneshot, watch},
20 task::JoinHandle,
21 time::timeout,
22};
23use tower::{hedge, Service, ServiceExt};
24use tracing_futures::Instrument;
25
26use zebra_chain::{
27 block::{self, Height, HeightDiff},
28 chain_tip::ChainTip,
29};
30use zebra_network::{self as zn, PeerSocketAddr};
31use zebra_state as zs;
32
33use crate::components::sync::{
34 FINAL_CHECKPOINT_BLOCK_VERIFY_TIMEOUT, FINAL_CHECKPOINT_BLOCK_VERIFY_TIMEOUT_LIMIT,
35};
36
/// A boxed, thread-safe, `'static` error trait object, used as the error type
/// of the network and verifier services in this module.
type BoxError = Box<dyn std::error::Error + Send + Sync + 'static>;

/// Multiplier applied to the lookahead limit when computing the height at which
/// the "past lookahead limit" flag is set.
///
/// The pause distance above the tip is
/// `lookahead_limit + lookahead_limit * VERIFICATION_PIPELINE_SCALING_MULTIPLIER`.
pub const VERIFICATION_PIPELINE_SCALING_MULTIPLIER: usize = 2;

/// The maximum height difference between the best chain tip and a downloaded block.
///
/// Downloaded blocks above `tip + VERIFICATION_PIPELINE_DROP_LIMIT` are rejected
/// with [`BlockDownloadVerifyError::AboveLookaheadHeightLimit`]. When there is no
/// tip yet, this value is used directly as the drop height.
pub const VERIFICATION_PIPELINE_DROP_LIMIT: HeightDiff = 50_000;
60
61#[derive(Copy, Clone, Debug)]
62pub(super) struct AlwaysHedge;
63
64impl<Request: Clone> hedge::Policy<Request> for AlwaysHedge {
65 fn can_retry(&self, _req: &Request) -> bool {
66 true
67 }
68 fn clone_request(&self, req: &Request) -> Option<Request> {
69 Some(req.clone())
70 }
71}
72
73#[derive(Error, Debug)]
75#[allow(dead_code)]
76pub enum BlockDownloadVerifyError {
77 #[error("permanent readiness error from the network service: {error:?}")]
78 NetworkServiceError {
79 #[source]
80 error: BoxError,
81 },
82
83 #[error("permanent readiness error from the verifier service: {error:?}")]
84 VerifierServiceError {
85 #[source]
86 error: BoxError,
87 },
88
89 #[error("duplicate block hash queued for download: {hash:?}")]
90 DuplicateBlockQueuedForDownload { hash: block::Hash },
91
92 #[error("error downloading block: {error:?} {hash:?}")]
93 DownloadFailed {
94 #[source]
95 error: BoxError,
96 hash: block::Hash,
97 },
98
99 #[error("downloaded block was too far ahead of the chain tip: {height:?} {hash:?}")]
108 AboveLookaheadHeightLimit {
109 height: block::Height,
110 hash: block::Hash,
111 },
112
113 #[error("downloaded block was too far behind the chain tip: {height:?} {hash:?}")]
114 BehindTipHeightLimit {
115 height: block::Height,
116 hash: block::Hash,
117 },
118
119 #[error("downloaded block had an invalid height: {hash:?}")]
120 InvalidHeight { hash: block::Hash },
121
122 #[error("block failed consensus validation: {error:?} {height:?} {hash:?}")]
123 Invalid {
124 #[source]
125 error: zebra_consensus::router::RouterError,
126 height: block::Height,
127 hash: block::Hash,
128 advertiser_addr: Option<PeerSocketAddr>,
129 },
130
131 #[error("block validation request failed: {error:?} {height:?} {hash:?}")]
132 ValidationRequestError {
133 #[source]
134 error: BoxError,
135 height: block::Height,
136 hash: block::Hash,
137 },
138
139 #[error("block download & verification was cancelled during download: {hash:?}")]
140 CancelledDuringDownload { hash: block::Hash },
141
142 #[error(
143 "block download & verification was cancelled while waiting for the verifier service: \
144 to become ready: {height:?} {hash:?}"
145 )]
146 CancelledAwaitingVerifierReadiness {
147 height: block::Height,
148 hash: block::Hash,
149 },
150
151 #[error(
152 "block download & verification was cancelled during verification: {height:?} {hash:?}"
153 )]
154 CancelledDuringVerification {
155 height: block::Height,
156 hash: block::Hash,
157 },
158
159 #[error(
160 "timeout during service readiness, download, verification, or internal downloader operation"
161 )]
162 Timeout,
163}
164
165impl From<tokio::time::error::Elapsed> for BlockDownloadVerifyError {
166 fn from(_value: tokio::time::error::Elapsed) -> Self {
167 BlockDownloadVerifyError::Timeout
168 }
169}
170
/// A [`Stream`] of block download-and-verify tasks.
///
/// Each queued hash is handled by a spawned tokio task. Polling the stream
/// yields the result of each finished task.
#[pin_project]
#[derive(Debug)]
pub struct Downloads<ZN, ZV, ZSTip>
where
    ZN: Service<zn::Request, Response = zn::Response, Error = BoxError> + Send + Sync + 'static,
    ZN::Future: Send,
    ZV: Service<zebra_consensus::Request, Response = block::Hash, Error = BoxError>
        + Send
        + Sync
        + Clone
        + 'static,
    ZV::Future: Send,
    ZSTip: ChainTip + Clone + Send + 'static,
{
    /// Network service used to request blocks by hash.
    network: ZN,

    /// Verifier service. A clone is moved into each spawned task, which submits
    /// the downloaded block to it as a `Commit` request.
    verifier: ZV,

    /// Chain tip handle. A clone is moved into each spawned task, which reads
    /// the best tip height to compute its height limits.
    latest_chain_tip: ZSTip,

    /// Base lookahead limit, used to derive the pause and reset heights
    /// relative to the tip.
    lookahead_limit: usize,

    /// Blocks from this height up to this height plus
    /// `FINAL_CHECKPOINT_BLOCK_VERIFY_TIMEOUT_LIMIT` are verified under the
    /// `FINAL_CHECKPOINT_BLOCK_VERIFY_TIMEOUT` timeout.
    max_checkpoint_height: Height,

    /// Sender for the "past lookahead limit" flag, shared with spawned tasks.
    ///
    /// Tasks send `true` when a block is above the pause height and `false`
    /// when a block is at or below the reset height; `cancel_all` sends `false`.
    past_lookahead_limit_sender: Arc<std::sync::Mutex<watch::Sender<bool>>>,

    /// Receiver subscribed to `past_lookahead_limit_sender`, used by spawned
    /// tasks to read the current flag value.
    past_lookahead_limit_receiver: zs::WatchReceiver<bool>,

    /// Join handles of the spawned download-and-verify tasks.
    ///
    /// On failure, a task returns its error together with the block hash, so
    /// the matching cancel handle can be removed.
    #[pin]
    pending: FuturesUnordered<
        JoinHandle<Result<(Height, block::Hash), (BlockDownloadVerifyError, block::Hash)>>,
    >,

    /// Cancellation senders for queued tasks, keyed by block hash.
    ///
    /// Also used to detect duplicate queue requests.
    cancel_handles: HashMap<block::Hash, oneshot::Sender<()>>,
}
227
228impl<ZN, ZV, ZSTip> Stream for Downloads<ZN, ZV, ZSTip>
229where
230 ZN: Service<zn::Request, Response = zn::Response, Error = BoxError> + Send + Sync + 'static,
231 ZN::Future: Send,
232 ZV: Service<zebra_consensus::Request, Response = block::Hash, Error = BoxError>
233 + Send
234 + Sync
235 + Clone
236 + 'static,
237 ZV::Future: Send,
238 ZSTip: ChainTip + Clone + Send + 'static,
239{
240 type Item = Result<(Height, block::Hash), BlockDownloadVerifyError>;
241
242 fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
243 let this = self.project();
244 if let Some(join_result) = ready!(this.pending.poll_next(cx)) {
254 match join_result.expect("block download and verify tasks must not panic") {
255 Ok((height, hash)) => {
256 this.cancel_handles.remove(&hash);
257
258 Poll::Ready(Some(Ok((height, hash))))
259 }
260 Err((e, hash)) => {
261 this.cancel_handles.remove(&hash);
262 Poll::Ready(Some(Err(e)))
263 }
264 }
265 } else {
266 Poll::Ready(None)
267 }
268 }
269
270 fn size_hint(&self) -> (usize, Option<usize>) {
271 self.pending.size_hint()
272 }
273}
274
275impl<ZN, ZV, ZSTip> Downloads<ZN, ZV, ZSTip>
276where
277 ZN: Service<zn::Request, Response = zn::Response, Error = BoxError> + Send + Sync + 'static,
278 ZN::Future: Send,
279 ZV: Service<zebra_consensus::Request, Response = block::Hash, Error = BoxError>
280 + Send
281 + Sync
282 + Clone
283 + 'static,
284 ZV::Future: Send,
285 ZSTip: ChainTip + Clone + Send + 'static,
286{
287 pub fn new(
298 network: ZN,
299 verifier: ZV,
300 latest_chain_tip: ZSTip,
301 past_lookahead_limit_sender: watch::Sender<bool>,
302 lookahead_limit: usize,
303 max_checkpoint_height: Height,
304 ) -> Self {
305 let past_lookahead_limit_receiver =
306 zs::WatchReceiver::new(past_lookahead_limit_sender.subscribe());
307
308 Self {
309 network,
310 verifier,
311 latest_chain_tip,
312 lookahead_limit,
313 max_checkpoint_height,
314 past_lookahead_limit_sender: Arc::new(std::sync::Mutex::new(
315 past_lookahead_limit_sender,
316 )),
317 past_lookahead_limit_receiver,
318 pending: FuturesUnordered::new(),
319 cancel_handles: HashMap::new(),
320 }
321 }
322
323 #[instrument(level = "debug", skip(self), fields(%hash))]
329 pub async fn download_and_verify(
330 &mut self,
331 hash: block::Hash,
332 ) -> Result<(), BlockDownloadVerifyError> {
333 if self.cancel_handles.contains_key(&hash) {
334 metrics::counter!("sync.already.queued.dropped.block.hash.count").increment(1);
335 return Err(BlockDownloadVerifyError::DuplicateBlockQueuedForDownload { hash });
336 }
337
338 let block_req = self
346 .network
347 .ready()
348 .await
349 .map_err(|error| BlockDownloadVerifyError::NetworkServiceError { error })?
350 .call(zn::Request::BlocksByHash(std::iter::once(hash).collect()));
351
352 let (cancel_tx, mut cancel_rx) = oneshot::channel::<()>();
354
355 let mut verifier = self.verifier.clone();
356 let latest_chain_tip = self.latest_chain_tip.clone();
357
358 let lookahead_limit = self.lookahead_limit;
359 let max_checkpoint_height = self.max_checkpoint_height;
360
361 let past_lookahead_limit_sender = self.past_lookahead_limit_sender.clone();
362 let past_lookahead_limit_receiver = self.past_lookahead_limit_receiver.clone();
363
364 let task = tokio::spawn(
365 async move {
366 let download_start = std::time::Instant::now();
369 let rsp = tokio::select! {
370 biased;
371 _ = &mut cancel_rx => {
372 trace!("task cancelled prior to download completion");
373 metrics::counter!("sync.cancelled.download.count").increment(1);
374 metrics::histogram!("sync.block.download.duration_seconds", "result" => "cancelled")
375 .record(download_start.elapsed().as_secs_f64());
376 return Err(BlockDownloadVerifyError::CancelledDuringDownload { hash })
377 }
378 rsp = block_req => rsp.map_err(|error| BlockDownloadVerifyError::DownloadFailed { error, hash})?,
379 };
380
381 let (block, advertiser_addr) = if let zn::Response::Blocks(blocks) = rsp {
382 assert_eq!(
383 blocks.len(),
384 1,
385 "wrong number of blocks in response to a single hash"
386 );
387
388 blocks
389 .first()
390 .expect("just checked length")
391 .available()
392 .expect("unexpected missing block status: single block failures should be errors")
393 } else {
394 unreachable!("wrong response to block request");
395 };
396 metrics::counter!("sync.downloaded.block.count").increment(1);
397 metrics::histogram!("sync.block.download.duration_seconds", "result" => "success")
398 .record(download_start.elapsed().as_secs_f64());
399
400 let tip_height = latest_chain_tip.best_tip_height();
404
405 let (lookahead_drop_height, lookahead_pause_height, lookahead_reset_height) = if let Some(tip_height) = tip_height {
406 let lookahead_pause = HeightDiff::try_from(
409 lookahead_limit + lookahead_limit * VERIFICATION_PIPELINE_SCALING_MULTIPLIER,
410 )
411 .expect("fits in HeightDiff");
412
413
414 ((tip_height + VERIFICATION_PIPELINE_DROP_LIMIT).expect("tip is much lower than Height::MAX"),
415 (tip_height + lookahead_pause).expect("tip is much lower than Height::MAX"),
416 (tip_height + lookahead_pause/2).expect("tip is much lower than Height::MAX"))
417 } else {
418 let genesis_drop = VERIFICATION_PIPELINE_DROP_LIMIT.try_into().expect("fits in u32");
419 let genesis_lookahead =
420 u32::try_from(lookahead_limit - 1).expect("fits in u32");
421
422 (block::Height(genesis_drop),
423 block::Height(genesis_lookahead),
424 block::Height(genesis_lookahead/2))
425 };
426
427 let min_accepted_height = tip_height
437 .map(|tip_height| {
438 block::Height(tip_height.0.saturating_sub(zs::MAX_BLOCK_REORG_HEIGHT))
439 })
440 .unwrap_or(block::Height(0));
441
442 let block_height = if let Some(block_height) = block.coinbase_height() {
443 block_height
444 } else {
445 debug!(
446 ?hash,
447 "synced block with no height: dropped downloaded block"
448 );
449 metrics::counter!("sync.no.height.dropped.block.count").increment(1);
450
451 return Err(BlockDownloadVerifyError::InvalidHeight { hash });
452 };
453
454 if block_height > lookahead_drop_height {
455 Err(BlockDownloadVerifyError::AboveLookaheadHeightLimit { height: block_height, hash })?;
456 } else if block_height > lookahead_pause_height {
457 if !past_lookahead_limit_receiver.cloned_watch_data() {
460 info!(
461 ?hash,
462 ?block_height,
463 ?tip_height,
464 ?lookahead_pause_height,
465 ?lookahead_reset_height,
466 lookahead_limit = ?lookahead_limit,
467 "synced block height too far ahead of the tip: \
468 waiting for downloaded blocks to commit to the state",
469 );
470
471 let _ = past_lookahead_limit_sender.lock().expect("thread panicked while holding the past_lookahead_limit_sender mutex guard").send(true);
476 } else {
477 debug!(
478 ?hash,
479 ?block_height,
480 ?tip_height,
481 ?lookahead_pause_height,
482 ?lookahead_reset_height,
483 lookahead_limit = ?lookahead_limit,
484 "synced block height too far ahead of the tip: \
485 waiting for downloaded blocks to commit to the state",
486 );
487 }
488
489 metrics::counter!("sync.max.height.limit.paused.count").increment(1);
490 } else if block_height <= lookahead_reset_height && past_lookahead_limit_receiver.cloned_watch_data() {
491 let _ = past_lookahead_limit_sender.lock().expect("thread panicked while holding the past_lookahead_limit_sender mutex guard").send(false);
496 metrics::counter!("sync.max.height.limit.reset.count").increment(1);
497
498 metrics::counter!("sync.max.height.limit.reset.attempt.count").increment(1);
499 }
500
501 if block_height < min_accepted_height {
502 debug!(
503 ?hash,
504 ?block_height,
505 ?tip_height,
506 ?min_accepted_height,
507 behind_tip_limit = ?zs::MAX_BLOCK_REORG_HEIGHT,
508 "synced block height behind the finalized tip: dropped downloaded block"
509 );
510 metrics::counter!("gossip.min.height.limit.dropped.block.count").increment(1);
511
512 Err(BlockDownloadVerifyError::BehindTipHeightLimit { height: block_height, hash })?;
513 }
514
515 let readiness = verifier.ready();
517 let verifier = tokio::select! {
519 biased;
520 _ = &mut cancel_rx => {
521 trace!("task cancelled waiting for verifier service readiness");
522 metrics::counter!("sync.cancelled.verify.ready.count").increment(1);
523 return Err(BlockDownloadVerifyError::CancelledAwaitingVerifierReadiness { height: block_height, hash })
524 }
525 verifier = readiness => verifier,
526 };
527
528 let verify_start = std::time::Instant::now();
530 let mut rsp = verifier
531 .map_err(|error| BlockDownloadVerifyError::VerifierServiceError { error })?
532 .call(zebra_consensus::Request::Commit(block)).boxed();
533
534 let short_timeout_max = (max_checkpoint_height + FINAL_CHECKPOINT_BLOCK_VERIFY_TIMEOUT_LIMIT).expect("checkpoint block height is in valid range");
536 if block_height >= max_checkpoint_height && block_height <= short_timeout_max {
537 rsp = timeout(FINAL_CHECKPOINT_BLOCK_VERIFY_TIMEOUT, rsp)
538 .map_err(|timeout| format!("initial fully verified block timed out: retrying: {timeout:?}").into())
539 .map(|nested_result| nested_result.and_then(convert::identity)).boxed();
540 }
541
542 let verification = tokio::select! {
543 biased;
544 _ = &mut cancel_rx => {
545 trace!("task cancelled prior to verification");
546 metrics::counter!("sync.cancelled.verify.count").increment(1);
547 metrics::histogram!("sync.block.verify.duration_seconds", "result" => "cancelled")
548 .record(verify_start.elapsed().as_secs_f64());
549 return Err(BlockDownloadVerifyError::CancelledDuringVerification { height: block_height, hash })
550 }
551 verification = rsp => verification,
552 };
553
554 let verify_result = if verification.is_ok() { "success" } else { "failure" };
555 metrics::histogram!("sync.block.verify.duration_seconds", "result" => verify_result)
556 .record(verify_start.elapsed().as_secs_f64());
557
558 if verification.is_ok() {
559 metrics::counter!("sync.verified.block.count").increment(1);
560 }
561
562 verification
563 .map(|hash| (block_height, hash))
564 .map_err(|err| {
565 match err.downcast::<zebra_consensus::router::RouterError>() {
566 Ok(error) => BlockDownloadVerifyError::Invalid { error: *error, height: block_height, hash, advertiser_addr },
567 Err(error) => BlockDownloadVerifyError::ValidationRequestError { error, height: block_height, hash },
568 }
569 })
570 }
571 .in_current_span()
572 .map_err(move |e| (e, hash)),
575 );
576
577 tokio::task::yield_now().await;
579
580 self.pending.push(task);
581 assert!(
582 self.cancel_handles.insert(hash, cancel_tx).is_none(),
583 "blocks are only queued once"
584 );
585
586 Ok(())
587 }
588
589 pub fn cancel_all(&mut self) {
591 let _ = std::mem::take(&mut self.pending);
593
594 for (_hash, cancel) in self.cancel_handles.drain() {
598 let _ = cancel.send(());
599 }
600
601 assert!(self.pending.is_empty());
602 assert!(self.cancel_handles.is_empty());
603
604 let _ = self
609 .past_lookahead_limit_sender
610 .lock()
611 .expect("thread panicked while holding the past_lookahead_limit_sender mutex guard")
612 .send(false);
613 }
614
615 pub fn in_flight(&mut self) -> usize {
617 self.pending.len()
618 }
619
620 #[allow(dead_code)]
622 pub fn is_empty(&mut self) -> bool {
623 self.pending.is_empty()
624 }
625}