Skip to main content

zebrad/components/mempool/
downloads.rs

1//! Transaction downloader and verifier.
2//!
3//! The main struct [`Downloads`] allows downloading and verifying transactions.
4//! It is used by the mempool to get transactions into it. It is also able to
5//! just verify transactions that were directly pushed.
6//!
7//! The verification itself is done by the [`zebra_consensus`] crate.
8//!
9//! Verified transactions are returned to the caller in [`Downloads::poll_next`].
10//! This is in contrast to the block downloader and verifiers which don't
11//! return anything and forward the verified blocks to the state themselves.
12//!
13//! # Correctness
14//!
15//! The mempool downloader doesn't send verified transactions to the [`Mempool`]
16//! service. So Zebra must spawn a task that regularly polls the downloader for
17//! ready transactions. (To ensure that transactions propagate across the entire
18//! network in each 75s block interval, the polling interval should be around
19//! 5-10 seconds.)
20//!
21//! Polling the downloader from [`Mempool::poll_ready`] is not sufficient.
22//! [`Service::poll_ready`] is only called when there is a service request.
23//! But we want to download and gossip transactions,
24//! even when there are no other service requests.
25//!
26//! [`Mempool`]: super::Mempool
27//! [`Mempool::poll_ready`]: super::Mempool::poll_ready
28use std::{
29    collections::{HashMap, HashSet},
30    pin::Pin,
31    task::{Context, Poll},
32    time::Duration,
33};
34
35use futures::{
36    future::TryFutureExt,
37    ready,
38    stream::{FuturesUnordered, Stream},
39    FutureExt,
40};
41use pin_project::{pin_project, pinned_drop};
42use thiserror::Error;
43use tokio::{sync::oneshot, task::JoinHandle};
44use tower::{Service, ServiceExt};
45use tracing_futures::Instrument;
46
47use zebra_chain::{
48    block::Height,
49    transaction::{self, UnminedTxId, VerifiedUnminedTx},
50    transparent,
51};
52use zebra_consensus::transaction as tx;
53use zebra_network::{self as zn, PeerSocketAddr};
54use zebra_node_services::mempool::Gossip;
55use zebra_state::{self as zs, CloneError};
56
57use crate::components::{
58    mempool::crawler::RATE_LIMIT_DELAY,
59    sync::{BLOCK_DOWNLOAD_TIMEOUT, BLOCK_VERIFY_TIMEOUT},
60};
61
62use super::MempoolError;
63
64type BoxError = Box<dyn std::error::Error + Send + Sync + 'static>;
65
66/// Controls how long we wait for a transaction download request to complete.
67///
68/// This is currently equal to [`BLOCK_DOWNLOAD_TIMEOUT`] for
69/// consistency, even though parts of the rationale used for defining the value
70/// don't apply here (e.g. we can drop transactions hashes when the queue is full).
71pub(crate) const TRANSACTION_DOWNLOAD_TIMEOUT: Duration = BLOCK_DOWNLOAD_TIMEOUT;
72
73/// Controls how long we wait for a transaction verify request to complete.
74///
75/// This is currently equal to [`BLOCK_VERIFY_TIMEOUT`] for
76/// consistency.
77///
78/// This timeout may lead to denial of service, which will be handled in
79/// [#2694](https://github.com/ZcashFoundation/zebra/issues/2694)
80pub(crate) const TRANSACTION_VERIFY_TIMEOUT: Duration = BLOCK_VERIFY_TIMEOUT;
81
/// The upper bound on concurrently running inbound download and verify tasks.
///
/// Most mempool transactions are expected to be downloaded and verified via the
/// mempool crawler, so this bound can be small. But it should be at least the
/// default `network.peerset_initial_target_size` config, to avoid disconnecting
/// peers on startup.
///
/// ## Security
///
/// The concurrency limit is kept small to prevent memory denial-of-service
/// attacks.
///
/// The maximum transaction size is 2 million bytes. A deserialized malicious
/// transaction with ~225_000 transparent outputs can take up 9MB of RAM.
/// (See #1880 for more details.)
///
/// Malicious transactions will eventually time out or fail validation.
/// Once validation fails, the transaction is dropped, and its memory is deallocated.
///
/// Since Zebra keeps an `inv` index, inbound downloads for malicious transactions
/// will be directed to the malicious node that originally gossiped the hash.
/// Therefore, this attack can be carried out by a single malicious node.
//
// TODO: replace with the configured value of network.peerset_initial_target_size
pub const MAX_INBOUND_CONCURRENCY: usize = 25;
106
/// The message sent over a oneshot channel to cancel one pending download and
/// verify task.
///
/// It carries no data: receiving it is the whole signal.
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
struct CancelDownloadAndVerify;
110
111/// Errors that can occur while downloading and verifying a transaction.
112#[derive(Error, Debug, Clone)]
113#[allow(dead_code)]
114pub enum TransactionDownloadVerifyError {
115    #[error("transaction is already in state")]
116    InState,
117
118    #[error("error in state service: {0}")]
119    StateError(#[source] CloneError),
120
121    #[error("error downloading transaction: {0}")]
122    DownloadFailed(#[source] CloneError),
123
124    #[error("transaction download / verification was cancelled")]
125    Cancelled,
126
127    #[error("transaction did not pass consensus validation: {error}")]
128    Invalid {
129        error: zebra_consensus::error::TransactionError,
130        advertiser_addr: Option<PeerSocketAddr>,
131    },
132}
133
134/// Represents a [`Stream`] of download and verification tasks.
135#[pin_project(PinnedDrop)]
136#[derive(Debug)]
137pub struct Downloads<ZN, ZV, ZS>
138where
139    ZN: Service<zn::Request, Response = zn::Response, Error = BoxError> + Send + Clone + 'static,
140    ZN::Future: Send,
141    ZV: Service<tx::Request, Response = tx::Response, Error = BoxError> + Send + Clone + 'static,
142    ZV::Future: Send,
143    ZS: Service<zs::Request, Response = zs::Response, Error = BoxError> + Send + Clone + 'static,
144    ZS::Future: Send,
145{
146    // Services
147    /// A service that forwards requests to connected peers, and returns their
148    /// responses.
149    network: ZN,
150
151    /// A service that verifies downloaded transactions.
152    verifier: ZV,
153
154    /// A service that manages cached blockchain state.
155    state: ZS,
156
157    // Internal downloads state
158    /// A list of pending transaction download and verify tasks.
159    #[pin]
160    pending: FuturesUnordered<
161        JoinHandle<
162            Result<
163                Result<
164                    (
165                        VerifiedUnminedTx,
166                        Vec<transparent::OutPoint>,
167                        Option<Height>,
168                        Option<oneshot::Sender<Result<(), BoxError>>>,
169                    ),
170                    Box<(TransactionDownloadVerifyError, UnminedTxId)>,
171                >,
172                tokio::time::error::Elapsed,
173            >,
174        >,
175    >,
176
177    /// A list of channels that can be used to cancel pending transaction download and
178    /// verify tasks. Each channel also has the corresponding request.
179    cancel_handles: HashMap<UnminedTxId, (oneshot::Sender<CancelDownloadAndVerify>, Gossip)>,
180}
181
182impl<ZN, ZV, ZS> Stream for Downloads<ZN, ZV, ZS>
183where
184    ZN: Service<zn::Request, Response = zn::Response, Error = BoxError> + Send + Clone + 'static,
185    ZN::Future: Send,
186    ZV: Service<tx::Request, Response = tx::Response, Error = BoxError> + Send + Clone + 'static,
187    ZV::Future: Send,
188    ZS: Service<zs::Request, Response = zs::Response, Error = BoxError> + Send + Clone + 'static,
189    ZS::Future: Send,
190{
191    type Item = Result<
192        Result<
193            (
194                VerifiedUnminedTx,
195                Vec<transparent::OutPoint>,
196                Option<Height>,
197                Option<oneshot::Sender<Result<(), BoxError>>>,
198            ),
199            Box<(UnminedTxId, TransactionDownloadVerifyError)>,
200        >,
201        tokio::time::error::Elapsed,
202    >;
203
204    fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
205        let this = self.project();
206        // CORRECTNESS
207        //
208        // The current task must be scheduled for wakeup every time we return
209        // `Poll::Pending`.
210        //
211        // If no download and verify tasks have exited since the last poll, this
212        // task is scheduled for wakeup when the next task becomes ready.
213        //
214        // TODO: this would be cleaner with poll_map (#2693)
215        let item = if let Some(join_result) = ready!(this.pending.poll_next(cx)) {
216            let result = join_result.expect("transaction download and verify tasks must not panic");
217            let result = match result {
218                Ok(Ok((tx, spent_mempool_outpoints, tip_height, rsp_tx))) => {
219                    this.cancel_handles.remove(&tx.transaction.id);
220                    Ok(Ok((tx, spent_mempool_outpoints, tip_height, rsp_tx)))
221                }
222                Ok(Err(boxed_err)) => {
223                    let (e, hash) = *boxed_err;
224                    this.cancel_handles.remove(&hash);
225                    Ok(Err(Box::new((hash, e))))
226                }
227                Err(elapsed) => Err(elapsed),
228            };
229
230            Some(result)
231        } else {
232            None
233        };
234
235        Poll::Ready(item)
236    }
237
238    fn size_hint(&self) -> (usize, Option<usize>) {
239        self.pending.size_hint()
240    }
241}
242
243impl<ZN, ZV, ZS> Downloads<ZN, ZV, ZS>
244where
245    ZN: Service<zn::Request, Response = zn::Response, Error = BoxError> + Send + Clone + 'static,
246    ZN::Future: Send,
247    ZV: Service<tx::Request, Response = tx::Response, Error = BoxError> + Send + Clone + 'static,
248    ZV::Future: Send,
249    ZS: Service<zs::Request, Response = zs::Response, Error = BoxError> + Send + Clone + 'static,
250    ZS::Future: Send,
251{
252    /// Initialize a new download stream with the provided services.
253    ///
254    /// `network` is used to download transactions.
255    /// `verifier` is used to verify transactions.
256    /// `state` is used to check if transactions are already in the state.
257    ///
258    /// The [`Downloads`] stream is agnostic to the network policy, so retry and
259    /// timeout limits should be applied to the `network` service passed into
260    /// this constructor.
261    pub fn new(network: ZN, verifier: ZV, state: ZS) -> Self {
262        Self {
263            network,
264            verifier,
265            state,
266            pending: FuturesUnordered::new(),
267            cancel_handles: HashMap::new(),
268        }
269    }
270
271    /// Queue a transaction for download (if needed) and verification.
272    ///
273    /// Returns the action taken in response to the queue request.
274    #[instrument(skip(self, gossiped_tx), fields(txid = %gossiped_tx.id()))]
275    #[allow(clippy::unwrap_in_result)]
276    pub fn download_if_needed_and_verify(
277        &mut self,
278        gossiped_tx: Gossip,
279        mut rsp_tx: Option<oneshot::Sender<Result<(), BoxError>>>,
280    ) -> Result<(), MempoolError> {
281        let txid = gossiped_tx.id();
282
283        if self.cancel_handles.contains_key(&txid) {
284            debug!(
285                ?txid,
286                queue_len = self.pending.len(),
287                ?MAX_INBOUND_CONCURRENCY,
288                "transaction id already queued for inbound download: ignored transaction"
289            );
290            metrics::gauge!("mempool.currently.queued.transactions",)
291                .set(self.pending.len() as f64);
292
293            return Err(MempoolError::AlreadyQueued);
294        }
295
296        if self.pending.len() >= MAX_INBOUND_CONCURRENCY {
297            debug!(
298                ?txid,
299                queue_len = self.pending.len(),
300                ?MAX_INBOUND_CONCURRENCY,
301                "too many transactions queued for inbound download: ignored transaction"
302            );
303            metrics::gauge!("mempool.currently.queued.transactions",)
304                .set(self.pending.len() as f64);
305
306            return Err(MempoolError::FullQueue);
307        }
308
309        // This oneshot is used to signal cancellation to the download task.
310        let (cancel_tx, mut cancel_rx) = oneshot::channel::<CancelDownloadAndVerify>();
311
312        let network = self.network.clone();
313        let verifier = self.verifier.clone();
314        let mut state = self.state.clone();
315
316        let gossiped_tx_req = gossiped_tx.clone();
317
318        let fut = async move {
319            // Don't download/verify if the transaction is already in the best chain.
320            Self::transaction_in_best_chain(&mut state, txid).await?;
321
322            trace!(?txid, "transaction is not in best chain");
323
324            let (tip_height, next_height) = match state.oneshot(zs::Request::Tip).await {
325                Ok(zs::Response::Tip(None)) => Ok((None, Height(0))),
326                Ok(zs::Response::Tip(Some((height, _hash)))) => {
327                    let next_height =
328                        (height + 1).expect("valid heights are far below the maximum");
329                    Ok((Some(height), next_height))
330                }
331                Ok(_) => unreachable!("wrong response"),
332                Err(e) => Err(TransactionDownloadVerifyError::StateError(e.into())),
333            }?;
334
335            trace!(?txid, ?next_height, "got next height");
336
337            let (tx, advertiser_addr) = match gossiped_tx {
338                Gossip::Id(txid) => {
339                    let req = zn::Request::TransactionsById(std::iter::once(txid).collect());
340
341                    let tx = match network
342                        .oneshot(req)
343                        .await
344                        .map_err(CloneError::from)
345                        .map_err(TransactionDownloadVerifyError::DownloadFailed)?
346                    {
347                        zn::Response::Transactions(mut txs) => txs.pop().ok_or_else(|| {
348                            TransactionDownloadVerifyError::DownloadFailed(
349                                BoxError::from("no transactions returned").into(),
350                            )
351                        })?,
352                        _ => unreachable!("wrong response to transaction request"),
353                    };
354
355                    let (tx, advertiser_addr) = tx.available().expect(
356                        "unexpected missing tx status: single tx failures should be errors",
357                    );
358
359                    metrics::counter!(
360                        "mempool.downloaded.transactions.total",
361                        "version" => format!("{}",tx.transaction.version()),
362                    ).increment(1);
363                    (tx, advertiser_addr)
364                }
365                Gossip::Tx(tx) => {
366                    metrics::counter!(
367                        "mempool.pushed.transactions.total",
368                        "version" => format!("{}",tx.transaction.version()),
369                    ).increment(1);
370                    (tx, None)
371                }
372            };
373
374            trace!(?txid, "got tx");
375
376            let result = verifier
377                .oneshot(tx::Request::Mempool {
378                    transaction: tx.clone(),
379                    height: next_height,
380                })
381                .map_ok(|rsp| {
382                    let tx::Response::Mempool { transaction, spent_mempool_outpoints } = rsp else {
383                        panic!("unexpected non-mempool response to mempool request")
384                    };
385
386                    (transaction, spent_mempool_outpoints, tip_height)
387                })
388                .await;
389
390            // Hide the transaction data to avoid filling the logs
391            trace!(?txid, result = ?result.as_ref().map(|_tx| ()), "verified transaction for the mempool");
392
393            result.map_err(|e| TransactionDownloadVerifyError::Invalid { error: e.into(), advertiser_addr } )
394        }
395        .map_ok(|(tx, spent_mempool_outpoints, tip_height)| {
396            metrics::counter!(
397                "mempool.verified.transactions.total",
398                "version" => format!("{}", tx.transaction.transaction.version()),
399            ).increment(1);
400            (tx, spent_mempool_outpoints, tip_height)
401        })
402        // Tack the hash onto the error so we can remove the cancel handle
403        // on failure as well as on success.
404        .map_err(move |e| Box::new((e, txid)))
405        .inspect(move |result| {
406            // Hide the transaction data to avoid filling the logs
407            let result = result.as_ref().map(|_tx| txid);
408            debug!("mempool transaction result: {result:?}");
409        })
410        .in_current_span();
411
412        let task = tokio::spawn(async move {
413            let fut = tokio::time::timeout(RATE_LIMIT_DELAY, fut);
414
415            // Prefer the cancel handle if both are ready.
416            let result = tokio::select! {
417                biased;
418                _ = &mut cancel_rx => {
419                    trace!("task cancelled prior to completion");
420                    metrics::counter!("mempool.cancelled.verify.tasks.total").increment(1);
421                    if let Some(rsp_tx) = rsp_tx.take() {
422                        let _ = rsp_tx.send(Err("verification cancelled".into()));
423                    }
424
425                    Ok(Err(Box::new((TransactionDownloadVerifyError::Cancelled, txid))))
426                }
427                verification = fut => {
428                    verification
429                        .inspect_err(|_elapsed| {
430                            if let Some(rsp_tx) = rsp_tx.take() {
431                                let _ = rsp_tx.send(Err("timeout waiting for verification result".into()));
432                            }
433                        })
434                        .map(|inner_result| {
435                            match inner_result {
436                                Ok((transaction, spent_mempool_outpoints, tip_height)) => Ok((transaction, spent_mempool_outpoints, tip_height, rsp_tx)),
437                                Err(boxed_err) => {
438                                    let (tx_verifier_error, tx_id) = *boxed_err;
439                                    if let Some(rsp_tx) = rsp_tx.take() {
440                                        let error_msg = format!(
441                                            "failed to validate tx: {tx_id}, error: {tx_verifier_error}"
442                                        );
443                                        let _ = rsp_tx.send(Err(error_msg.into()));
444                                    };
445
446                                    Err(Box::new((tx_verifier_error, tx_id)))
447                                }
448                            }
449                        })
450                },
451            };
452
453            result
454        });
455
456        self.pending.push(task);
457        assert!(
458            self.cancel_handles
459                .insert(txid, (cancel_tx, gossiped_tx_req))
460                .is_none(),
461            "transactions are only queued once"
462        );
463
464        debug!(
465            ?txid,
466            queue_len = self.pending.len(),
467            ?MAX_INBOUND_CONCURRENCY,
468            "queued transaction hash for download"
469        );
470        metrics::gauge!("mempool.currently.queued.transactions",).set(self.pending.len() as f64);
471        metrics::counter!("mempool.queued.transactions.total").increment(1);
472
473        Ok(())
474    }
475
476    /// Cancel download/verification tasks of transactions with the
477    /// given transaction hash (see [`UnminedTxId::mined_id`]).
478    pub fn cancel(&mut self, mined_ids: &HashSet<transaction::Hash>) {
479        // TODO: this can be simplified with [`HashMap::drain_filter`] which
480        // is currently nightly-only experimental API.
481        let removed_txids: Vec<UnminedTxId> = self
482            .cancel_handles
483            .keys()
484            .filter(|txid| mined_ids.contains(&txid.mined_id()))
485            .cloned()
486            .collect();
487
488        for txid in removed_txids {
489            if let Some(handle) = self.cancel_handles.remove(&txid) {
490                let _ = handle.0.send(CancelDownloadAndVerify);
491            }
492        }
493    }
494
495    /// Cancel all running tasks and reset the downloader state.
496    // Note: copied from zebrad/src/components/sync/downloads.rs
497    pub fn cancel_all(&mut self) {
498        // Replace the pending task list with an empty one and drop it.
499        let _ = std::mem::take(&mut self.pending);
500        // Signal cancellation to all running tasks.
501        // Since we already dropped the JoinHandles above, they should
502        // fail silently.
503        for (_hash, cancel) in self.cancel_handles.drain() {
504            let _ = cancel.0.send(CancelDownloadAndVerify);
505        }
506        assert!(self.pending.is_empty());
507        assert!(self.cancel_handles.is_empty());
508        metrics::gauge!("mempool.currently.queued.transactions",).set(self.pending.len() as f64);
509    }
510
511    /// Get the number of currently in-flight download tasks.
512    #[allow(dead_code)]
513    pub fn in_flight(&self) -> usize {
514        self.pending.len()
515    }
516
517    /// Get a list of the currently pending transaction requests.
518    pub fn transaction_requests(&self) -> impl Iterator<Item = &Gossip> {
519        self.cancel_handles.iter().map(|(_tx_id, (_handle, tx))| tx)
520    }
521
522    /// Check if transaction is already in the best chain.
523    async fn transaction_in_best_chain(
524        state: &mut ZS,
525        txid: UnminedTxId,
526    ) -> Result<(), TransactionDownloadVerifyError> {
527        match state
528            .ready()
529            .await
530            .map_err(CloneError::from)
531            .map_err(TransactionDownloadVerifyError::StateError)?
532            .call(zs::Request::Transaction(txid.mined_id()))
533            .await
534        {
535            Ok(zs::Response::Transaction(None)) => Ok(()),
536            Ok(zs::Response::Transaction(Some(_))) => Err(TransactionDownloadVerifyError::InState),
537            Ok(_) => unreachable!("wrong response"),
538            Err(e) => Err(TransactionDownloadVerifyError::StateError(e.into())),
539        }?;
540
541        Ok(())
542    }
543}
544
545#[pinned_drop]
546impl<ZN, ZV, ZS> PinnedDrop for Downloads<ZN, ZV, ZS>
547where
548    ZN: Service<zn::Request, Response = zn::Response, Error = BoxError> + Send + Clone + 'static,
549    ZN::Future: Send,
550    ZV: Service<tx::Request, Response = tx::Response, Error = BoxError> + Send + Clone + 'static,
551    ZV::Future: Send,
552    ZS: Service<zs::Request, Response = zs::Response, Error = BoxError> + Send + Clone + 'static,
553    ZS::Future: Send,
554{
555    fn drop(mut self: Pin<&mut Self>) {
556        self.cancel_all();
557
558        metrics::gauge!("mempool.currently.queued.transactions").set(0 as f64);
559    }
560}