1use std::{
29 collections::{HashMap, HashSet},
30 net::SocketAddr,
31 pin::Pin,
32 task::{Context, Poll},
33 time::Duration,
34};
35
36use futures::{
37 future::TryFutureExt,
38 ready,
39 stream::{FuturesUnordered, Stream},
40 FutureExt,
41};
42use pin_project::{pin_project, pinned_drop};
43use thiserror::Error;
44use tokio::{sync::oneshot, task::JoinHandle};
45use tower::{Service, ServiceExt};
46use tracing_futures::Instrument;
47
48use zebra_chain::{
49 block::Height,
50 transaction::{self, UnminedTxId, VerifiedUnminedTx},
51 transparent,
52};
53use zebra_consensus::transaction as tx;
54use zebra_network::{self as zn, PeerSocketAddr};
55use zebra_node_services::mempool::Gossip;
56use zebra_state::{self as zs, CloneError};
57
58use crate::components::{
59 mempool::crawler::RATE_LIMIT_DELAY,
60 sync::{BLOCK_DOWNLOAD_TIMEOUT, BLOCK_VERIFY_TIMEOUT},
61};
62
63use super::MempoolError;
64
65type BoxError = Box<dyn std::error::Error + Send + Sync + 'static>;
66
67pub(crate) const TRANSACTION_DOWNLOAD_TIMEOUT: Duration = BLOCK_DOWNLOAD_TIMEOUT;
73
74pub(crate) const TRANSACTION_VERIFY_TIMEOUT: Duration = BLOCK_VERIFY_TIMEOUT;
82
83pub const MAX_INBOUND_CONCURRENCY: usize = 500;
107
108pub const MAX_INBOUND_CONCURRENCY_PER_PEER: usize = 5;
117
118#[derive(Copy, Clone, Debug, Eq, PartialEq)]
120struct CancelDownloadAndVerify;
121
122#[derive(Error, Debug, Clone)]
124#[allow(dead_code)]
125pub enum TransactionDownloadVerifyError {
126 #[error("transaction is already in state")]
127 InState,
128
129 #[error("error in state service: {0}")]
130 StateError(#[source] CloneError),
131
132 #[error("error downloading transaction: {0}")]
133 DownloadFailed(#[source] CloneError),
134
135 #[error("transaction download / verification was cancelled")]
136 Cancelled,
137
138 #[error("transaction did not pass consensus validation: {error}")]
139 Invalid {
140 error: zebra_consensus::error::TransactionError,
141 advertiser_addr: Option<PeerSocketAddr>,
142 },
143}
144
145#[pin_project(PinnedDrop)]
147#[derive(Debug)]
148pub struct Downloads<ZN, ZV, ZS>
149where
150 ZN: Service<zn::Request, Response = zn::Response, Error = BoxError> + Send + Clone + 'static,
151 ZN::Future: Send,
152 ZV: Service<tx::Request, Response = tx::Response, Error = BoxError> + Send + Clone + 'static,
153 ZV::Future: Send,
154 ZS: Service<zs::Request, Response = zs::Response, Error = BoxError> + Send + Clone + 'static,
155 ZS::Future: Send,
156{
157 network: ZN,
161
162 verifier: ZV,
164
165 state: ZS,
167
168 #[pin]
171 pending: FuturesUnordered<
172 JoinHandle<
173 Result<
174 Result<
175 (
176 VerifiedUnminedTx,
177 Vec<transparent::OutPoint>,
178 Option<Height>,
179 Option<oneshot::Sender<Result<(), BoxError>>>,
180 ),
181 Box<(TransactionDownloadVerifyError, UnminedTxId)>,
182 >,
183 (UnminedTxId, tokio::time::error::Elapsed),
184 >,
185 >,
186 >,
187
188 cancel_handles: HashMap<
193 UnminedTxId,
194 (
195 oneshot::Sender<CancelDownloadAndVerify>,
196 Gossip,
197 Option<SocketAddr>,
198 ),
199 >,
200
201 pending_per_peer: HashMap<SocketAddr, usize>,
207}
208
209impl<ZN, ZV, ZS> Stream for Downloads<ZN, ZV, ZS>
210where
211 ZN: Service<zn::Request, Response = zn::Response, Error = BoxError> + Send + Clone + 'static,
212 ZN::Future: Send,
213 ZV: Service<tx::Request, Response = tx::Response, Error = BoxError> + Send + Clone + 'static,
214 ZV::Future: Send,
215 ZS: Service<zs::Request, Response = zs::Response, Error = BoxError> + Send + Clone + 'static,
216 ZS::Future: Send,
217{
218 type Item = Result<
219 Result<
220 (
221 VerifiedUnminedTx,
222 Vec<transparent::OutPoint>,
223 Option<Height>,
224 Option<oneshot::Sender<Result<(), BoxError>>>,
225 ),
226 Box<(UnminedTxId, TransactionDownloadVerifyError)>,
227 >,
228 (UnminedTxId, tokio::time::error::Elapsed),
229 >;
230
231 fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
232 let this = self.project();
233 let item = if let Some(join_result) = ready!(this.pending.poll_next(cx)) {
243 let result = join_result.expect("transaction download and verify tasks must not panic");
244 let (result, completed_txid) = match result {
245 Ok(Ok((tx, spent_mempool_outpoints, tip_height, rsp_tx))) => {
246 let hash = tx.transaction.id;
247 (
248 Ok(Ok((tx, spent_mempool_outpoints, tip_height, rsp_tx))),
249 Some(hash),
250 )
251 }
252 Ok(Err(boxed_err)) => {
253 let (e, hash) = *boxed_err;
254 (Ok(Err(Box::new((hash, e)))), Some(hash))
255 }
256 Err((txid, elapsed)) => {
257 this.cancel_handles.remove(&txid);
262 (Err((txid, elapsed)), None)
263 }
264 };
265
266 if let Some(hash) = completed_txid {
267 if let Some((_, _gossip, Some(source))) = this.cancel_handles.remove(&hash) {
268 Self::release_peer_slot(this.pending_per_peer, source);
269 }
270 }
271
272 Some(result)
273 } else {
274 None
275 };
276
277 Poll::Ready(item)
278 }
279
280 fn size_hint(&self) -> (usize, Option<usize>) {
281 self.pending.size_hint()
282 }
283}
284
285impl<ZN, ZV, ZS> Downloads<ZN, ZV, ZS>
286where
287 ZN: Service<zn::Request, Response = zn::Response, Error = BoxError> + Send + Clone + 'static,
288 ZN::Future: Send,
289 ZV: Service<tx::Request, Response = tx::Response, Error = BoxError> + Send + Clone + 'static,
290 ZV::Future: Send,
291 ZS: Service<zs::Request, Response = zs::Response, Error = BoxError> + Send + Clone + 'static,
292 ZS::Future: Send,
293{
294 pub fn new(network: ZN, verifier: ZV, state: ZS) -> Self {
304 Self {
305 network,
306 verifier,
307 state,
308 pending: FuturesUnordered::new(),
309 cancel_handles: HashMap::new(),
310 pending_per_peer: HashMap::new(),
311 }
312 }
313
314 #[instrument(skip(self, gossiped_tx), fields(txid = %gossiped_tx.id()))]
322 #[allow(clippy::unwrap_in_result)]
323 pub fn download_if_needed_and_verify(
324 &mut self,
325 gossiped_tx: Gossip,
326 source: Option<SocketAddr>,
327 mut rsp_tx: Option<oneshot::Sender<Result<(), BoxError>>>,
328 ) -> Result<(), MempoolError> {
329 let txid = gossiped_tx.id();
330
331 if self.cancel_handles.contains_key(&txid) {
332 debug!(
333 ?txid,
334 queue_len = self.pending.len(),
335 ?MAX_INBOUND_CONCURRENCY,
336 "transaction id already queued for inbound download: ignored transaction"
337 );
338 metrics::gauge!("mempool.currently.queued.transactions",)
339 .set(self.pending.len() as f64);
340
341 return Err(MempoolError::AlreadyQueued);
342 }
343
344 if self.pending.len() >= MAX_INBOUND_CONCURRENCY {
345 debug!(
346 ?txid,
347 queue_len = self.pending.len(),
348 ?MAX_INBOUND_CONCURRENCY,
349 "too many transactions queued for inbound download: ignored transaction"
350 );
351 metrics::gauge!("mempool.currently.queued.transactions",)
352 .set(self.pending.len() as f64);
353
354 return Err(MempoolError::FullQueue);
355 }
356
357 if let Some(source) = source {
360 let count = self.pending_per_peer.get(&source).copied().unwrap_or(0);
361 if count >= MAX_INBOUND_CONCURRENCY_PER_PEER {
362 debug!(
363 ?txid,
364 peer_queue_len = count,
365 ?MAX_INBOUND_CONCURRENCY_PER_PEER,
366 "too many transactions queued for this peer: ignored transaction"
367 );
368 metrics::counter!("mempool.full_queue.per_peer.total").increment(1);
369 return Err(MempoolError::FullQueue);
370 }
371 }
372
373 let (cancel_tx, mut cancel_rx) = oneshot::channel::<CancelDownloadAndVerify>();
375
376 let network = self.network.clone();
377 let verifier = self.verifier.clone();
378 let mut state = self.state.clone();
379
380 let gossiped_tx_req = gossiped_tx.clone();
381
382 let fut = async move {
383 Self::transaction_in_best_chain(&mut state, txid).await?;
385
386 trace!(?txid, "transaction is not in best chain");
387
388 let (tip_height, next_height) = match state.oneshot(zs::Request::Tip).await {
389 Ok(zs::Response::Tip(None)) => Ok((None, Height(0))),
390 Ok(zs::Response::Tip(Some((height, _hash)))) => {
391 let next_height =
392 (height + 1).expect("valid heights are far below the maximum");
393 Ok((Some(height), next_height))
394 }
395 Ok(_) => unreachable!("wrong response"),
396 Err(e) => Err(TransactionDownloadVerifyError::StateError(e.into())),
397 }?;
398
399 trace!(?txid, ?next_height, "got next height");
400
401 let (tx, advertiser_addr) = match gossiped_tx {
402 Gossip::Id(txid) => {
403 let req = zn::Request::TransactionsById(std::iter::once(txid).collect());
404
405 let tx = match network
406 .oneshot(req)
407 .await
408 .map_err(CloneError::from)
409 .map_err(TransactionDownloadVerifyError::DownloadFailed)?
410 {
411 zn::Response::Transactions(mut txs) => txs.pop().ok_or_else(|| {
412 TransactionDownloadVerifyError::DownloadFailed(
413 BoxError::from("no transactions returned").into(),
414 )
415 })?,
416 _ => unreachable!("wrong response to transaction request"),
417 };
418
419 let (tx, advertiser_addr) = tx.available().expect(
420 "unexpected missing tx status: single tx failures should be errors",
421 );
422
423 metrics::counter!(
424 "mempool.downloaded.transactions.total",
425 "version" => format!("{}",tx.transaction.version()),
426 ).increment(1);
427 (tx, advertiser_addr)
428 }
429 Gossip::Tx(tx) => {
430 metrics::counter!(
431 "mempool.pushed.transactions.total",
432 "version" => format!("{}",tx.transaction.version()),
433 ).increment(1);
434 (tx, None)
435 }
436 };
437
438 trace!(?txid, "got tx");
439
440 let result = verifier
441 .oneshot(tx::Request::Mempool {
442 transaction: tx.clone(),
443 height: next_height,
444 })
445 .map_ok(|rsp| {
446 let tx::Response::Mempool { transaction, spent_mempool_outpoints } = rsp else {
447 panic!("unexpected non-mempool response to mempool request")
448 };
449
450 (transaction, spent_mempool_outpoints, tip_height)
451 })
452 .await;
453
454 trace!(?txid, result = ?result.as_ref().map(|_tx| ()), "verified transaction for the mempool");
456
457 result.map_err(|e| TransactionDownloadVerifyError::Invalid { error: e.into(), advertiser_addr } )
458 }
459 .map_ok(|(tx, spent_mempool_outpoints, tip_height)| {
460 metrics::counter!(
461 "mempool.verified.transactions.total",
462 "version" => format!("{}", tx.transaction.transaction.version()),
463 ).increment(1);
464 (tx, spent_mempool_outpoints, tip_height)
465 })
466 .map_err(move |e| Box::new((e, txid)))
469 .inspect(move |result| {
470 let result = result.as_ref().map(|_tx| txid);
472 debug!("mempool transaction result: {result:?}");
473 })
474 .in_current_span();
475
476 let task = tokio::spawn(async move {
477 let fut = tokio::time::timeout(RATE_LIMIT_DELAY, fut);
478
479 let result = tokio::select! {
481 biased;
482 _ = &mut cancel_rx => {
483 trace!("task cancelled prior to completion");
484 metrics::counter!("mempool.cancelled.verify.tasks.total").increment(1);
485 if let Some(rsp_tx) = rsp_tx.take() {
486 let _ = rsp_tx.send(Err("verification cancelled".into()));
487 }
488
489 Ok(Err(Box::new((TransactionDownloadVerifyError::Cancelled, txid))))
490 }
491 verification = fut => {
492 verification
493 .inspect_err(|_elapsed| {
494 if let Some(rsp_tx) = rsp_tx.take() {
495 let _ = rsp_tx.send(Err("timeout waiting for verification result".into()));
496 }
497 })
498 .map_err(|elapsed| (txid, elapsed))
499 .map(|inner_result| {
500 match inner_result {
501 Ok((transaction, spent_mempool_outpoints, tip_height)) => Ok((transaction, spent_mempool_outpoints, tip_height, rsp_tx)),
502 Err(boxed_err) => {
503 let (tx_verifier_error, tx_id) = *boxed_err;
504 if let Some(rsp_tx) = rsp_tx.take() {
505 let error_msg = format!(
506 "failed to validate tx: {tx_id}, error: {tx_verifier_error}"
507 );
508 let _ = rsp_tx.send(Err(error_msg.into()));
509 };
510
511 Err(Box::new((tx_verifier_error, tx_id)))
512 }
513 }
514 })
515 },
516 };
517
518 result
519 });
520
521 self.pending.push(task);
522 assert!(
523 self.cancel_handles
524 .insert(txid, (cancel_tx, gossiped_tx_req, source))
525 .is_none(),
526 "transactions are only queued once"
527 );
528 if let Some(source) = source {
529 *self.pending_per_peer.entry(source).or_insert(0) += 1;
532 }
533
534 debug!(
535 ?txid,
536 queue_len = self.pending.len(),
537 ?MAX_INBOUND_CONCURRENCY,
538 "queued transaction hash for download"
539 );
540 metrics::gauge!("mempool.currently.queued.transactions",).set(self.pending.len() as f64);
541 metrics::counter!("mempool.queued.transactions.total").increment(1);
542
543 Ok(())
544 }
545
546 pub fn cancel(&mut self, mined_ids: &HashSet<transaction::Hash>) {
549 let removed_txids: Vec<UnminedTxId> = self
552 .cancel_handles
553 .keys()
554 .filter(|txid| mined_ids.contains(&txid.mined_id()))
555 .cloned()
556 .collect();
557
558 for txid in removed_txids {
559 if let Some((cancel_tx, _gossip, source)) = self.cancel_handles.remove(&txid) {
560 let _ = cancel_tx.send(CancelDownloadAndVerify);
561 if let Some(source) = source {
562 Self::release_peer_slot(&mut self.pending_per_peer, source);
563 }
564 }
565 }
566 }
567
568 pub fn cancel_all(&mut self) {
571 let _ = std::mem::take(&mut self.pending);
573 for (_hash, (cancel_tx, _gossip, _source)) in self.cancel_handles.drain() {
577 let _ = cancel_tx.send(CancelDownloadAndVerify);
578 }
579 self.pending_per_peer.clear();
580 assert!(self.pending.is_empty());
581 assert!(self.cancel_handles.is_empty());
582 metrics::gauge!("mempool.currently.queued.transactions",).set(self.pending.len() as f64);
583 }
584
585 fn release_peer_slot(pending_per_peer: &mut HashMap<SocketAddr, usize>, source: SocketAddr) {
588 if let Some(count) = pending_per_peer.get_mut(&source) {
589 *count = count.saturating_sub(1);
590 if *count == 0 {
591 pending_per_peer.remove(&source);
592 }
593 }
594 }
595
596 #[allow(dead_code)]
598 pub fn in_flight(&self) -> usize {
599 self.pending.len()
600 }
601
602 pub fn transaction_requests(&self) -> impl Iterator<Item = &Gossip> {
604 self.cancel_handles
605 .iter()
606 .map(|(_tx_id, (_handle, tx, _source))| tx)
607 }
608
609 async fn transaction_in_best_chain(
611 state: &mut ZS,
612 txid: UnminedTxId,
613 ) -> Result<(), TransactionDownloadVerifyError> {
614 match state
615 .ready()
616 .await
617 .map_err(CloneError::from)
618 .map_err(TransactionDownloadVerifyError::StateError)?
619 .call(zs::Request::Transaction(txid.mined_id()))
620 .await
621 {
622 Ok(zs::Response::Transaction(None)) => Ok(()),
623 Ok(zs::Response::Transaction(Some(_))) => Err(TransactionDownloadVerifyError::InState),
624 Ok(_) => unreachable!("wrong response"),
625 Err(e) => Err(TransactionDownloadVerifyError::StateError(e.into())),
626 }?;
627
628 Ok(())
629 }
630}
631
632#[pinned_drop]
633impl<ZN, ZV, ZS> PinnedDrop for Downloads<ZN, ZV, ZS>
634where
635 ZN: Service<zn::Request, Response = zn::Response, Error = BoxError> + Send + Clone + 'static,
636 ZN::Future: Send,
637 ZV: Service<tx::Request, Response = tx::Response, Error = BoxError> + Send + Clone + 'static,
638 ZV::Future: Send,
639 ZS: Service<zs::Request, Response = zs::Response, Error = BoxError> + Send + Clone + 'static,
640 ZS::Future: Send,
641{
642 fn drop(mut self: Pin<&mut Self>) {
643 self.cancel_all();
644
645 metrics::gauge!("mempool.currently.queued.transactions").set(0 as f64);
646 }
647}