1use std::{
29 collections::{HashMap, HashSet},
30 pin::Pin,
31 task::{Context, Poll},
32 time::Duration,
33};
34
35use futures::{
36 future::TryFutureExt,
37 ready,
38 stream::{FuturesUnordered, Stream},
39 FutureExt,
40};
41use pin_project::{pin_project, pinned_drop};
42use thiserror::Error;
43use tokio::{sync::oneshot, task::JoinHandle};
44use tower::{Service, ServiceExt};
45use tracing_futures::Instrument;
46
47use zebra_chain::{
48 block::Height,
49 transaction::{self, UnminedTxId, VerifiedUnminedTx},
50 transparent,
51};
52use zebra_consensus::transaction as tx;
53use zebra_network::{self as zn, PeerSocketAddr};
54use zebra_node_services::mempool::Gossip;
55use zebra_state::{self as zs, CloneError};
56
57use crate::components::{
58 mempool::crawler::RATE_LIMIT_DELAY,
59 sync::{BLOCK_DOWNLOAD_TIMEOUT, BLOCK_VERIFY_TIMEOUT},
60};
61
62use super::MempoolError;
63
/// Boxed, thread-safe, `'static` dynamic error.
///
/// This is the `Error` type required of the network, verifier and state
/// services used by [`Downloads`], and the error type sent back over the
/// optional per-request response channel.
type BoxError = Box<dyn std::error::Error + Send + Sync + 'static>;
65
/// Timeout for a transaction download, defined as equal to the sync
/// component's `BLOCK_DOWNLOAD_TIMEOUT`.
///
// NOTE(review): this constant is not referenced anywhere else in this file;
// the spawned download-and-verify task is bounded by `RATE_LIMIT_DELAY`
// instead. Presumably it is consumed by other modules in the crate — confirm.
pub(crate) const TRANSACTION_DOWNLOAD_TIMEOUT: Duration = BLOCK_DOWNLOAD_TIMEOUT;
72
/// Timeout for a transaction verification, defined as equal to the sync
/// component's `BLOCK_VERIFY_TIMEOUT`.
///
// NOTE(review): this constant is not referenced anywhere else in this file;
// the spawned download-and-verify task is bounded by `RATE_LIMIT_DELAY`
// instead. Presumably it is consumed by other modules in the crate — confirm.
pub(crate) const TRANSACTION_VERIFY_TIMEOUT: Duration = BLOCK_VERIFY_TIMEOUT;
81
/// Maximum number of download-and-verify tasks that may be pending at once.
///
/// [`Downloads::download_if_needed_and_verify`] rejects a new request with
/// `MempoolError::FullQueue` once the number of pending tasks reaches this
/// value.
pub const MAX_INBOUND_CONCURRENCY: usize = 25;
106
/// Marker message sent over a task's cancel channel to ask the spawned
/// download-and-verify task to stop.
///
/// It carries no data: receiving it (or observing that the sender was
/// dropped) is what makes the task take its cancellation branch.
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
struct CancelDownloadAndVerify;
110
/// Errors produced by a single transaction download-and-verify task.
///
/// The type is `Clone`, which is why service errors are stored as
/// [`CloneError`] rather than as a plain boxed error.
#[derive(Error, Debug, Clone)]
// NOTE(review): every variant is constructed in this file, so the reason for
// this `allow` is not visible here — confirm whether it is still needed.
#[allow(dead_code)]
pub enum TransactionDownloadVerifyError {
    /// The state service reported that the transaction already exists
    /// (it answered the `Transaction` lookup with `Some(_)`).
    #[error("transaction is already in state")]
    InState,

    /// A request to the state service failed, or the service never became
    /// ready.
    #[error("error in state service: {0}")]
    StateError(#[source] CloneError),

    /// The network request for the transaction failed, or the network
    /// responded with an empty transaction list.
    #[error("error downloading transaction: {0}")]
    DownloadFailed(#[source] CloneError),

    /// The task received a cancel signal before it finished.
    #[error("transaction download / verification was cancelled")]
    Cancelled,

    /// The transaction verifier rejected the transaction.
    #[error("transaction did not pass consensus validation: {error}")]
    Invalid {
        /// The error returned by the transaction verifier.
        error: zebra_consensus::error::TransactionError,
        /// Address associated with a downloaded transaction, as returned by
        /// the network response; `None` when the transaction was pushed to us
        /// directly (`Gossip::Tx`).
        // NOTE(review): presumably the peer that supplied the transaction — confirm.
        advertiser_addr: Option<PeerSocketAddr>,
    },
}
133
134#[pin_project(PinnedDrop)]
136#[derive(Debug)]
137pub struct Downloads<ZN, ZV, ZS>
138where
139 ZN: Service<zn::Request, Response = zn::Response, Error = BoxError> + Send + Clone + 'static,
140 ZN::Future: Send,
141 ZV: Service<tx::Request, Response = tx::Response, Error = BoxError> + Send + Clone + 'static,
142 ZV::Future: Send,
143 ZS: Service<zs::Request, Response = zs::Response, Error = BoxError> + Send + Clone + 'static,
144 ZS::Future: Send,
145{
146 network: ZN,
150
151 verifier: ZV,
153
154 state: ZS,
156
157 #[pin]
160 pending: FuturesUnordered<
161 JoinHandle<
162 Result<
163 Result<
164 (
165 VerifiedUnminedTx,
166 Vec<transparent::OutPoint>,
167 Option<Height>,
168 Option<oneshot::Sender<Result<(), BoxError>>>,
169 ),
170 Box<(TransactionDownloadVerifyError, UnminedTxId)>,
171 >,
172 tokio::time::error::Elapsed,
173 >,
174 >,
175 >,
176
177 cancel_handles: HashMap<UnminedTxId, (oneshot::Sender<CancelDownloadAndVerify>, Gossip)>,
180}
181
182impl<ZN, ZV, ZS> Stream for Downloads<ZN, ZV, ZS>
183where
184 ZN: Service<zn::Request, Response = zn::Response, Error = BoxError> + Send + Clone + 'static,
185 ZN::Future: Send,
186 ZV: Service<tx::Request, Response = tx::Response, Error = BoxError> + Send + Clone + 'static,
187 ZV::Future: Send,
188 ZS: Service<zs::Request, Response = zs::Response, Error = BoxError> + Send + Clone + 'static,
189 ZS::Future: Send,
190{
191 type Item = Result<
192 Result<
193 (
194 VerifiedUnminedTx,
195 Vec<transparent::OutPoint>,
196 Option<Height>,
197 Option<oneshot::Sender<Result<(), BoxError>>>,
198 ),
199 Box<(UnminedTxId, TransactionDownloadVerifyError)>,
200 >,
201 tokio::time::error::Elapsed,
202 >;
203
204 fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
205 let this = self.project();
206 let item = if let Some(join_result) = ready!(this.pending.poll_next(cx)) {
216 let result = join_result.expect("transaction download and verify tasks must not panic");
217 let result = match result {
218 Ok(Ok((tx, spent_mempool_outpoints, tip_height, rsp_tx))) => {
219 this.cancel_handles.remove(&tx.transaction.id);
220 Ok(Ok((tx, spent_mempool_outpoints, tip_height, rsp_tx)))
221 }
222 Ok(Err(boxed_err)) => {
223 let (e, hash) = *boxed_err;
224 this.cancel_handles.remove(&hash);
225 Ok(Err(Box::new((hash, e))))
226 }
227 Err(elapsed) => Err(elapsed),
228 };
229
230 Some(result)
231 } else {
232 None
233 };
234
235 Poll::Ready(item)
236 }
237
238 fn size_hint(&self) -> (usize, Option<usize>) {
239 self.pending.size_hint()
240 }
241}
242
243impl<ZN, ZV, ZS> Downloads<ZN, ZV, ZS>
244where
245 ZN: Service<zn::Request, Response = zn::Response, Error = BoxError> + Send + Clone + 'static,
246 ZN::Future: Send,
247 ZV: Service<tx::Request, Response = tx::Response, Error = BoxError> + Send + Clone + 'static,
248 ZV::Future: Send,
249 ZS: Service<zs::Request, Response = zs::Response, Error = BoxError> + Send + Clone + 'static,
250 ZS::Future: Send,
251{
252 pub fn new(network: ZN, verifier: ZV, state: ZS) -> Self {
262 Self {
263 network,
264 verifier,
265 state,
266 pending: FuturesUnordered::new(),
267 cancel_handles: HashMap::new(),
268 }
269 }
270
    /// Queues a gossiped transaction, or transaction ID, to be downloaded (if
    /// only the ID was gossiped) and verified in a newly spawned task.
    ///
    /// The task performs, in order:
    ///
    /// 1. a state lookup that fails with `InState` if the transaction already
    ///    exists there;
    /// 2. a chain-tip lookup, to compute the height to verify at
    ///    (`Height(0)` when the state has no tip);
    /// 3. for `Gossip::Id`, a network download of the transaction
    ///    (`Gossip::Tx` already carries the transaction);
    /// 4. verification through the transaction verifier.
    ///
    /// The whole pipeline is limited by a `RATE_LIMIT_DELAY` timeout and can be
    /// interrupted through the cancel channel stored in `cancel_handles`.
    ///
    /// If `rsp_tx` is provided, an error is sent on it when the task is
    /// cancelled, times out, or fails. On success it is *not* used here: it is
    /// returned inside the task's result so the consumer of the stream can
    /// respond.
    ///
    /// # Errors
    ///
    /// * `MempoolError::AlreadyQueued` if a task for this transaction ID is
    ///   already tracked in `cancel_handles`;
    /// * `MempoolError::FullQueue` if `MAX_INBOUND_CONCURRENCY` tasks are
    ///   already pending.
    ///
    /// In both cases nothing is spawned and `rsp_tx` is dropped unused.
    ///
    /// # Panics
    ///
    /// * Here: if the transaction ID is somehow already present when the
    ///   cancel handle is inserted.
    /// * Inside the spawned task: if a service answers with an unexpected
    ///   response variant, if the next height overflows, or if the downloaded
    ///   transaction entry is not available.
    #[instrument(skip(self, gossiped_tx), fields(txid = %gossiped_tx.id()))]
    // NOTE(review): presumably allowed because of the invariant `expect`s and
    // `assert!` below in a function that returns `Result` — confirm.
    #[allow(clippy::unwrap_in_result)]
    pub fn download_if_needed_and_verify(
        &mut self,
        gossiped_tx: Gossip,
        mut rsp_tx: Option<oneshot::Sender<Result<(), BoxError>>>,
    ) -> Result<(), MempoolError> {
        let txid = gossiped_tx.id();

        // Reject duplicates: one task per transaction ID at a time.
        if self.cancel_handles.contains_key(&txid) {
            debug!(
                ?txid,
                queue_len = self.pending.len(),
                ?MAX_INBOUND_CONCURRENCY,
                "transaction id already queued for inbound download: ignored transaction"
            );
            metrics::gauge!("mempool.currently.queued.transactions",)
                .set(self.pending.len() as f64);

            return Err(MempoolError::AlreadyQueued);
        }

        // Reject new work once the concurrency limit is reached.
        if self.pending.len() >= MAX_INBOUND_CONCURRENCY {
            debug!(
                ?txid,
                queue_len = self.pending.len(),
                ?MAX_INBOUND_CONCURRENCY,
                "too many transactions queued for inbound download: ignored transaction"
            );
            metrics::gauge!("mempool.currently.queued.transactions",)
                .set(self.pending.len() as f64);

            return Err(MempoolError::FullQueue);
        }

        // This oneshot is used to signal cancellation to the spawned task.
        let (cancel_tx, mut cancel_rx) = oneshot::channel::<CancelDownloadAndVerify>();

        // Each task gets its own clones of the services.
        let network = self.network.clone();
        let verifier = self.verifier.clone();
        let mut state = self.state.clone();

        // `gossiped_tx` is moved into the future below; this copy is kept
        // alongside the cancel handle so the request can be listed later.
        let gossiped_tx_req = gossiped_tx.clone();

        let fut = async move {
            // Fails early with `InState` if the state already has this transaction.
            Self::transaction_in_best_chain(&mut state, txid).await?;

            trace!(?txid, "transaction is not in best chain");

            // The transaction is verified at the height after the current tip,
            // or at height 0 if the state has no tip yet.
            let (tip_height, next_height) = match state.oneshot(zs::Request::Tip).await {
                Ok(zs::Response::Tip(None)) => Ok((None, Height(0))),
                Ok(zs::Response::Tip(Some((height, _hash)))) => {
                    let next_height =
                        (height + 1).expect("valid heights are far below the maximum");
                    Ok((Some(height), next_height))
                }
                Ok(_) => unreachable!("wrong response"),
                Err(e) => Err(TransactionDownloadVerifyError::StateError(e.into())),
            }?;

            trace!(?txid, ?next_height, "got next height");

            // Obtain the full transaction: download it when only the ID was
            // gossiped, otherwise use the transaction that was pushed to us.
            let (tx, advertiser_addr) = match gossiped_tx {
                Gossip::Id(txid) => {
                    let req = zn::Request::TransactionsById(std::iter::once(txid).collect());

                    let tx = match network
                        .oneshot(req)
                        .await
                        .map_err(CloneError::from)
                        .map_err(TransactionDownloadVerifyError::DownloadFailed)?
                    {
                        // Exactly one ID was requested; an empty list is a failed download.
                        zn::Response::Transactions(mut txs) => txs.pop().ok_or_else(|| {
                            TransactionDownloadVerifyError::DownloadFailed(
                                BoxError::from("no transactions returned").into(),
                            )
                        })?,
                        _ => unreachable!("wrong response to transaction request"),
                    };

                    let (tx, advertiser_addr) = tx.available().expect(
                        "unexpected missing tx status: single tx failures should be errors",
                    );

                    metrics::counter!(
                        "mempool.downloaded.transactions.total",
                        "version" => format!("{}",tx.transaction.version()),
                    ).increment(1);
                    (tx, advertiser_addr)
                }
                Gossip::Tx(tx) => {
                    // Pushed transactions have no advertiser address.
                    metrics::counter!(
                        "mempool.pushed.transactions.total",
                        "version" => format!("{}",tx.transaction.version()),
                    ).increment(1);
                    (tx, None)
                }
            };

            trace!(?txid, "got tx");

            let result = verifier
                .oneshot(tx::Request::Mempool {
                    transaction: tx.clone(),
                    height: next_height,
                })
                .map_ok(|rsp| {
                    let tx::Response::Mempool { transaction, spent_mempool_outpoints } = rsp else {
                        panic!("unexpected non-mempool response to mempool request")
                    };

                    (transaction, spent_mempool_outpoints, tip_height)
                })
                .await;

            // Hide the transaction data in the log.
            trace!(?txid, result = ?result.as_ref().map(|_tx| ()), "verified transaction for the mempool");

            // Verifier errors are reported as `Invalid`, together with the advertiser address.
            result.map_err(|e| TransactionDownloadVerifyError::Invalid { error: e.into(), advertiser_addr } )
        }
        .map_ok(|(tx, spent_mempool_outpoints, tip_height)| {
            metrics::counter!(
                "mempool.verified.transactions.total",
                "version" => format!("{}", tx.transaction.transaction.version()),
            ).increment(1);
            (tx, spent_mempool_outpoints, tip_height)
        })
        // Tack the transaction ID onto the error so the stream consumer can
        // tell which task failed and drop its cancel handle.
        .map_err(move |e| Box::new((e, txid)))
        .inspect(move |result| {
            // Hide the transaction data to avoid filling the logs.
            let result = result.as_ref().map(|_tx| txid);
            debug!("mempool transaction result: {result:?}");
        })
        .in_current_span();

        let task = tokio::spawn(async move {
            // Bound the whole download-and-verify pipeline by `RATE_LIMIT_DELAY`.
            let fut = tokio::time::timeout(RATE_LIMIT_DELAY, fut);

            let result = tokio::select! {
                // Branches are polled in order, so cancellation wins if both are ready.
                biased;
                // `_` matches both a received cancel message and a dropped sender.
                _ = &mut cancel_rx => {
                    trace!("task cancelled prior to completion");
                    metrics::counter!("mempool.cancelled.verify.tasks.total").increment(1);
                    if let Some(rsp_tx) = rsp_tx.take() {
                        let _ = rsp_tx.send(Err("verification cancelled".into()));
                    }

                    Ok(Err(Box::new((TransactionDownloadVerifyError::Cancelled, txid))))
                }
                verification = fut => {
                    verification
                        // Timeout: notify the requester, keep the `Elapsed` error as-is.
                        .inspect_err(|_elapsed| {
                            if let Some(rsp_tx) = rsp_tx.take() {
                                let _ = rsp_tx.send(Err("timeout waiting for verification result".into()));
                            }
                        })
                        .map(|inner_result| {
                            match inner_result {
                                // Success: hand the unused response channel back to the stream consumer.
                                Ok((transaction, spent_mempool_outpoints, tip_height)) => Ok((transaction, spent_mempool_outpoints, tip_height, rsp_tx)),
                                // Failure: notify the requester, then re-box the error unchanged.
                                Err(boxed_err) => {
                                    let (tx_verifier_error, tx_id) = *boxed_err;
                                    if let Some(rsp_tx) = rsp_tx.take() {
                                        let error_msg = format!(
                                            "failed to validate tx: {tx_id}, error: {tx_verifier_error}"
                                        );
                                        let _ = rsp_tx.send(Err(error_msg.into()));
                                    };

                                    Err(Box::new((tx_verifier_error, tx_id)))
                                }
                            }
                        })
                },
            };

            result
        });

        // Track the task and its cancel handle; the duplicate check at the top
        // guarantees the ID is not already present.
        self.pending.push(task);
        assert!(
            self.cancel_handles
                .insert(txid, (cancel_tx, gossiped_tx_req))
                .is_none(),
            "transactions are only queued once"
        );

        debug!(
            ?txid,
            queue_len = self.pending.len(),
            ?MAX_INBOUND_CONCURRENCY,
            "queued transaction hash for download"
        );
        metrics::gauge!("mempool.currently.queued.transactions",).set(self.pending.len() as f64);
        metrics::counter!("mempool.queued.transactions.total").increment(1);

        Ok(())
    }
475
476 pub fn cancel(&mut self, mined_ids: &HashSet<transaction::Hash>) {
479 let removed_txids: Vec<UnminedTxId> = self
482 .cancel_handles
483 .keys()
484 .filter(|txid| mined_ids.contains(&txid.mined_id()))
485 .cloned()
486 .collect();
487
488 for txid in removed_txids {
489 if let Some(handle) = self.cancel_handles.remove(&txid) {
490 let _ = handle.0.send(CancelDownloadAndVerify);
491 }
492 }
493 }
494
495 pub fn cancel_all(&mut self) {
498 let _ = std::mem::take(&mut self.pending);
500 for (_hash, cancel) in self.cancel_handles.drain() {
504 let _ = cancel.0.send(CancelDownloadAndVerify);
505 }
506 assert!(self.pending.is_empty());
507 assert!(self.cancel_handles.is_empty());
508 metrics::gauge!("mempool.currently.queued.transactions",).set(self.pending.len() as f64);
509 }
510
    /// Returns the number of spawned tasks whose results have not yet been
    /// yielded by the stream.
    // NOTE(review): no caller is visible in this file; presumably only used
    // from tests or other modules, hence the `allow` — confirm.
    #[allow(dead_code)]
    pub fn in_flight(&self) -> usize {
        self.pending.len()
    }
516
517 pub fn transaction_requests(&self) -> impl Iterator<Item = &Gossip> {
519 self.cancel_handles.iter().map(|(_tx_id, (_handle, tx))| tx)
520 }
521
522 async fn transaction_in_best_chain(
524 state: &mut ZS,
525 txid: UnminedTxId,
526 ) -> Result<(), TransactionDownloadVerifyError> {
527 match state
528 .ready()
529 .await
530 .map_err(CloneError::from)
531 .map_err(TransactionDownloadVerifyError::StateError)?
532 .call(zs::Request::Transaction(txid.mined_id()))
533 .await
534 {
535 Ok(zs::Response::Transaction(None)) => Ok(()),
536 Ok(zs::Response::Transaction(Some(_))) => Err(TransactionDownloadVerifyError::InState),
537 Ok(_) => unreachable!("wrong response"),
538 Err(e) => Err(TransactionDownloadVerifyError::StateError(e.into())),
539 }?;
540
541 Ok(())
542 }
543}
544
545#[pinned_drop]
546impl<ZN, ZV, ZS> PinnedDrop for Downloads<ZN, ZV, ZS>
547where
548 ZN: Service<zn::Request, Response = zn::Response, Error = BoxError> + Send + Clone + 'static,
549 ZN::Future: Send,
550 ZV: Service<tx::Request, Response = tx::Response, Error = BoxError> + Send + Clone + 'static,
551 ZV::Future: Send,
552 ZS: Service<zs::Request, Response = zs::Response, Error = BoxError> + Send + Clone + 'static,
553 ZS::Future: Send,
554{
555 fn drop(mut self: Pin<&mut Self>) {
556 self.cancel_all();
557
558 metrics::gauge!("mempool.currently.queued.transactions").set(0 as f64);
559 }
560}