zebrad/components/inbound/
downloads.rs1use std::{
4 collections::{HashMap, HashSet},
5 net::IpAddr,
6 pin::Pin,
7 task::{Context, Poll},
8};
9
10use futures::{
11 future::TryFutureExt,
12 ready,
13 stream::{FuturesUnordered, Stream},
14};
15use pin_project::pin_project;
16use tokio::{sync::oneshot, task::JoinHandle};
17use tower::{Service, ServiceExt};
18use tracing_futures::Instrument;
19
20use zebra_chain::{
21 block::{self, HeightDiff},
22 chain_tip::ChainTip,
23};
24use zebra_network::{self as zn, PeerSocketAddr};
25use zebra_state as zs;
26
27use crate::components::sync::MIN_CONCURRENCY_LIMIT;
28
/// A boxed, thread-safe error, used as the error type of every service in this module.
type BoxError = Box<dyn std::error::Error + Send + Sync + 'static>;

/// The largest concurrency limit the inbound download queue will accept.
///
/// [`Downloads::new`] clamps the configured limit so it never exceeds this value.
pub const MAX_INBOUND_CONCURRENCY: usize = 200;
50
/// The outcome of asking the inbound download queue to fetch a gossiped block hash.
///
/// The type is a plain, field-less enum, so it derives the standard value traits:
/// callers can log it with `{:?}`, compare it in assertions, and copy it freely.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DownloadAction {
    /// A download and verify task was spawned for the hash.
    AddedToQueue,

    /// The hash already has a pending download and verify task, so it was ignored.
    AlreadyQueued,

    /// The queue already holds as many tasks as the concurrency limit allows,
    /// so the hash was ignored.
    FullQueue,

    /// The advertising peer's IP address already has a download in flight,
    /// so the hash was ignored.
    TooManyFromPeer,
}
74
75#[pin_project]
77#[derive(Debug)]
78pub struct Downloads<ZN, ZV, ZS>
79where
80 ZN: Service<zn::Request, Response = zn::Response, Error = BoxError> + Send + Clone + 'static,
81 ZN::Future: Send,
82 ZV: Service<zebra_consensus::Request, Response = block::Hash, Error = BoxError>
83 + Send
84 + Clone
85 + 'static,
86 ZV::Future: Send,
87 ZS: Service<zs::Request, Response = zs::Response, Error = BoxError> + Send + Clone + 'static,
88 ZS::Future: Send,
89{
90 full_verify_concurrency_limit: usize,
94
95 network: ZN,
100
101 verifier: ZV,
103
104 state: ZS,
106
107 latest_chain_tip: zs::LatestChainTip,
109
110 #[pin]
114 pending: FuturesUnordered<
115 JoinHandle<Result<block::Hash, (BoxError, block::Hash, Option<PeerSocketAddr>)>>,
116 >,
117
118 cancel_handles: HashMap<block::Hash, (oneshot::Sender<()>, Option<IpAddr>)>,
122
123 in_flight_ips: HashSet<IpAddr>,
132}
133
134impl<ZN, ZV, ZS> Stream for Downloads<ZN, ZV, ZS>
135where
136 ZN: Service<zn::Request, Response = zn::Response, Error = BoxError> + Send + Clone + 'static,
137 ZN::Future: Send,
138 ZV: Service<zebra_consensus::Request, Response = block::Hash, Error = BoxError>
139 + Send
140 + Clone
141 + 'static,
142 ZV::Future: Send,
143 ZS: Service<zs::Request, Response = zs::Response, Error = BoxError> + Send + Clone + 'static,
144 ZS::Future: Send,
145{
146 type Item = Result<block::Hash, (BoxError, Option<PeerSocketAddr>)>;
147
148 fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
149 let this = self.project();
150 if let Some(join_result) = ready!(this.pending.poll_next(cx)) {
160 let (result, hash) =
161 match join_result.expect("block download and verify tasks must not panic") {
162 Ok(hash) => (Ok(hash), hash),
163 Err((e, hash, advertiser_addr)) => (Err((e, advertiser_addr)), hash),
164 };
165 if let Some((_, Some(ip))) = this.cancel_handles.remove(&hash) {
166 assert!(
167 this.in_flight_ips.remove(&ip),
168 "every tracked IP was inserted when its download was queued",
169 );
170 }
171 Poll::Ready(Some(result))
172 } else {
173 Poll::Ready(None)
174 }
175 }
176
177 fn size_hint(&self) -> (usize, Option<usize>) {
178 self.pending.size_hint()
179 }
180}
181
182impl<ZN, ZV, ZS> Downloads<ZN, ZV, ZS>
183where
184 ZN: Service<zn::Request, Response = zn::Response, Error = BoxError> + Send + Clone + 'static,
185 ZN::Future: Send,
186 ZV: Service<zebra_consensus::Request, Response = block::Hash, Error = BoxError>
187 + Send
188 + Clone
189 + 'static,
190 ZV::Future: Send,
191 ZS: Service<zs::Request, Response = zs::Response, Error = BoxError> + Send + Clone + 'static,
192 ZS::Future: Send,
193{
194 pub fn new(
201 full_verify_concurrency_limit: usize,
202 network: ZN,
203 verifier: ZV,
204 state: ZS,
205 latest_chain_tip: zs::LatestChainTip,
206 ) -> Self {
207 let full_verify_concurrency_limit =
209 full_verify_concurrency_limit.clamp(MIN_CONCURRENCY_LIMIT, MAX_INBOUND_CONCURRENCY);
210
211 Self {
212 full_verify_concurrency_limit,
213 network,
214 verifier,
215 state,
216 latest_chain_tip,
217 pending: FuturesUnordered::new(),
218 cancel_handles: HashMap::new(),
219 in_flight_ips: HashSet::new(),
220 }
221 }
222
223 #[instrument(skip(self, hash), fields(hash = %hash))]
230 pub fn download_and_verify(
231 &mut self,
232 hash: block::Hash,
233 advertiser: Option<PeerSocketAddr>,
234 ) -> DownloadAction {
235 if self.cancel_handles.contains_key(&hash) {
236 debug!(
237 ?hash,
238 queue_len = self.pending.len(),
239 concurrency_limit = self.full_verify_concurrency_limit,
240 "block hash already queued for inbound download: ignored block",
241 );
242
243 metrics::gauge!("gossip.queued.block.count").set(self.pending.len() as f64);
244 metrics::counter!("gossip.already.queued.dropped.block.hash.count").increment(1);
245
246 return DownloadAction::AlreadyQueued;
247 }
248
249 if self.pending.len() >= self.full_verify_concurrency_limit {
250 debug!(
251 ?hash,
252 queue_len = self.pending.len(),
253 concurrency_limit = self.full_verify_concurrency_limit,
254 "too many blocks queued for inbound download: ignored block",
255 );
256
257 metrics::gauge!("gossip.queued.block.count").set(self.pending.len() as f64);
258 metrics::counter!("gossip.full.queue.dropped.block.hash.count").increment(1);
259
260 return DownloadAction::FullQueue;
261 }
262
263 let advertiser_ip = advertiser.map(|addr| addr.ip());
264 if let Some(ip) = advertiser_ip {
265 if self.in_flight_ips.contains(&ip) {
266 debug!(
267 ?hash,
268 ?advertiser,
269 "already have an in-flight inbound download from peer IP: ignored block",
270 );
271
272 metrics::counter!("gossip.peer.limit.dropped.block.hash.count").increment(1);
273
274 return DownloadAction::TooManyFromPeer;
275 }
276 }
277
278 let (cancel_tx, mut cancel_rx) = oneshot::channel::<()>();
280
281 let state = self.state.clone();
282 let network = self.network.clone();
283 let verifier = self.verifier.clone();
284 let latest_chain_tip = self.latest_chain_tip.clone();
285 let full_verify_concurrency_limit = self.full_verify_concurrency_limit;
286
287 let fut = async move {
288 match state.oneshot(zs::Request::KnownBlock(hash)).await {
290 Ok(zs::Response::KnownBlock(None)) => Ok(()),
291 Ok(zs::Response::KnownBlock(Some(_))) => Err("already present".into()),
292 Ok(_) => unreachable!("wrong response"),
293 Err(e) => Err(e),
294 }
295 .map_err(|e| (e, None))?;
296
297 let (block, advertiser_addr) = if let zn::Response::Blocks(blocks) = network
298 .oneshot(zn::Request::BlocksByHash(std::iter::once(hash).collect()))
299 .await
300 .map_err(|e| (e, None))?
301 {
302 assert_eq!(
303 blocks.len(),
304 1,
305 "wrong number of blocks in response to a single hash",
306 );
307
308 blocks
309 .first()
310 .expect("just checked length")
311 .available()
312 .expect(
313 "unexpected missing block status: single block failures should be errors",
314 )
315 } else {
316 unreachable!("wrong response to block request");
317 };
318 metrics::counter!("gossip.downloaded.block.count").increment(1);
319
320 let tip_height = latest_chain_tip.best_tip_height();
328
329 let max_lookahead_height = if let Some(tip_height) = tip_height {
330 let lookahead = HeightDiff::try_from(full_verify_concurrency_limit)
331 .expect("fits in HeightDiff");
332 (tip_height + lookahead).expect("tip is much lower than Height::MAX")
333 } else {
334 let genesis_lookahead =
335 u32::try_from(full_verify_concurrency_limit - 1).expect("fits in u32");
336 block::Height(genesis_lookahead)
337 };
338
339 let min_accepted_height = tip_height
349 .map(|tip_height| {
350 block::Height(tip_height.0.saturating_sub(zs::MAX_BLOCK_REORG_HEIGHT))
351 })
352 .unwrap_or(block::Height(0));
353
354 let block_height = block
355 .coinbase_height()
356 .ok_or_else(|| {
357 debug!(
358 ?hash,
359 "gossiped block with no height: dropped downloaded block"
360 );
361 metrics::counter!("gossip.no.height.dropped.block.count").increment(1);
362
363 BoxError::from("gossiped block with no height")
364 })
365 .map_err(|e| (e, None))?;
366
367 if block_height > max_lookahead_height {
368 debug!(
369 ?hash,
370 ?block_height,
371 ?tip_height,
372 ?max_lookahead_height,
373 lookahead_limit = full_verify_concurrency_limit,
374 "gossiped block height too far ahead of the tip: dropped downloaded block",
375 );
376 metrics::counter!("gossip.max.height.limit.dropped.block.count").increment(1);
377
378 Err("gossiped block height too far ahead").map_err(|e| (e.into(), None))?;
379 } else if block_height < min_accepted_height {
380 debug!(
381 ?hash,
382 ?block_height,
383 ?tip_height,
384 ?min_accepted_height,
385 behind_tip_limit = ?zs::MAX_BLOCK_REORG_HEIGHT,
386 "gossiped block height behind the finalized tip: dropped downloaded block",
387 );
388 metrics::counter!("gossip.min.height.limit.dropped.block.count").increment(1);
389
390 Err("gossiped block height behind the finalized tip")
391 .map_err(|e| (e.into(), None))?;
392 }
393
394 verifier
395 .oneshot(zebra_consensus::Request::Commit(block))
396 .await
397 .map(|hash| (hash, block_height))
398 .map_err(|e| (e, advertiser_addr))
399 }
400 .map_ok(|(hash, height)| {
401 info!(?height, "downloaded and verified gossiped block");
402 metrics::counter!("gossip.verified.block.count").increment(1);
403 hash
404 })
405 .map_err(move |(e, advertiser_addr)| (e, hash, advertiser_addr))
408 .in_current_span();
409
410 let task = tokio::spawn(async move {
411 tokio::select! {
413 biased;
414 _ = &mut cancel_rx => {
415 trace!("task cancelled prior to completion");
416 metrics::counter!("gossip.cancelled.count").increment(1);
417 Err(("canceled".into(), hash, None))
418 }
419 verification = fut => verification,
420 }
421 });
422
423 self.pending.push(task);
424 assert!(
425 self.cancel_handles
426 .insert(hash, (cancel_tx, advertiser_ip))
427 .is_none(),
428 "blocks are only queued once"
429 );
430 if let Some(ip) = advertiser_ip {
431 assert!(
432 self.in_flight_ips.insert(ip),
433 "the per-IP cap check above rejects any IP already in flight",
434 );
435 }
436
437 debug!(
438 ?hash,
439 queue_len = self.pending.len(),
440 concurrency_limit = self.full_verify_concurrency_limit,
441 "queued hash for download",
442 );
443 metrics::gauge!("gossip.queued.block.count").set(self.pending.len() as f64);
444
445 DownloadAction::AddedToQueue
446 }
447}