Skip to main content

zebrad/components/mempool/
downloads.rs

1//! Transaction downloader and verifier.
2//!
3//! The main struct [`Downloads`] allows downloading and verifying transactions.
4//! It is used by the mempool to get transactions into it. It is also able to
5//! just verify transactions that were directly pushed.
6//!
7//! The verification itself is done by the [`zebra_consensus`] crate.
8//!
9//! Verified transactions are returned to the caller in [`Downloads::poll_next`].
10//! This is in contrast to the block downloader and verifiers which don't
11//! return anything and forward the verified blocks to the state themselves.
12//!
13//! # Correctness
14//!
15//! The mempool downloader doesn't send verified transactions to the [`Mempool`]
16//! service. So Zebra must spawn a task that regularly polls the downloader for
17//! ready transactions. (To ensure that transactions propagate across the entire
18//! network in each 75s block interval, the polling interval should be around
19//! 5-10 seconds.)
20//!
21//! Polling the downloader from [`Mempool::poll_ready`] is not sufficient.
22//! [`Service::poll_ready`] is only called when there is a service request.
23//! But we want to download and gossip transactions,
24//! even when there are no other service requests.
25//!
26//! [`Mempool`]: super::Mempool
27//! [`Mempool::poll_ready`]: super::Mempool::poll_ready
28use std::{
29    collections::{HashMap, HashSet},
30    net::SocketAddr,
31    pin::Pin,
32    task::{Context, Poll},
33    time::Duration,
34};
35
36use futures::{
37    future::TryFutureExt,
38    ready,
39    stream::{FuturesUnordered, Stream},
40    FutureExt,
41};
42use pin_project::{pin_project, pinned_drop};
43use thiserror::Error;
44use tokio::{sync::oneshot, task::JoinHandle};
45use tower::{Service, ServiceExt};
46use tracing_futures::Instrument;
47
48use zebra_chain::{
49    block::Height,
50    transaction::{self, UnminedTxId, VerifiedUnminedTx},
51    transparent,
52};
53use zebra_consensus::transaction as tx;
54use zebra_network::{self as zn, PeerSocketAddr};
55use zebra_node_services::mempool::Gossip;
56use zebra_state::{self as zs, CloneError};
57
58use crate::components::{
59    mempool::crawler::RATE_LIMIT_DELAY,
60    sync::{BLOCK_DOWNLOAD_TIMEOUT, BLOCK_VERIFY_TIMEOUT},
61};
62
63use super::MempoolError;
64
/// A boxed, thread-safe, dynamically typed error.
///
/// This is the error type required of the `network`, `verifier` and `state`
/// services used by [`Downloads`].
65type BoxError = Box<dyn std::error::Error + Send + Sync + 'static>;
66
67/// Controls how long we wait for a transaction download request to complete.
68///
69/// This is currently equal to [`BLOCK_DOWNLOAD_TIMEOUT`] for
70/// consistency, even though parts of the rationale used for defining the value
71/// don't apply here (e.g. we can drop transaction hashes when the queue is full).
72pub(crate) const TRANSACTION_DOWNLOAD_TIMEOUT: Duration = BLOCK_DOWNLOAD_TIMEOUT;
73
74/// Controls how long we wait for a transaction verify request to complete.
75///
76/// This is currently equal to [`BLOCK_VERIFY_TIMEOUT`] for
77/// consistency.
78///
79/// This timeout may lead to denial of service, which will be handled in
80/// [#2694](https://github.com/ZcashFoundation/zebra/issues/2694)
81pub(crate) const TRANSACTION_VERIFY_TIMEOUT: Duration = BLOCK_VERIFY_TIMEOUT;
82
83/// The maximum number of concurrent inbound download and verify tasks.
84///
85/// We expect the mempool crawler to download and verify most mempool transactions, so this bound
86/// can be small. But it should be at least the default `network.peerset_initial_target_size` config,
87/// to avoid disconnecting peers on startup.
88///
89/// ## Security
90///
91/// We use a small concurrency limit, to prevent memory denial-of-service
92/// attacks.
93///
94/// The maximum transaction size is 2 million bytes. A deserialized malicious
95/// transaction with ~225_000 transparent outputs can take up 9MB of RAM.
96/// (See #1880 for more details.)
97///
98/// Malicious transactions will eventually timeout or fail validation.
99/// Once validation fails, the transaction is dropped, and its memory is deallocated.
100///
101/// Since Zebra keeps an `inv` index, inbound downloads for malicious transactions
102/// will be directed to the malicious node that originally gossiped the hash.
103/// Therefore, this attack can be carried out by a single malicious node.
104//
105// TODO: replace with the configured value of network.peerset_initial_target_size
//
// NOTE(review): the docs above describe this bound as "small" and cite ~9MB of
// RAM per malicious transaction; at 500 concurrent tasks that is roughly 4.5GB
// in the worst case. Confirm that 500 is the intended value.
106pub const MAX_INBOUND_CONCURRENCY: usize = 500;
107
108/// The maximum number of concurrent inbound download tasks attributable to a
109/// single advertising peer.
110///
111/// Caps how many slots of [`MAX_INBOUND_CONCURRENCY`] one peer's `Inv`
112/// advertisements can occupy, so a single peer cannot saturate the global
113/// queue with fake txids and deny gossip-path mempool admission for honest
114/// peers. See `GHSA-4fc2-h7jh-287c`. Crawler-driven and locally-pushed
115/// transactions have no source peer and are not counted against the cap.
116pub const MAX_INBOUND_CONCURRENCY_PER_PEER: usize = 5;
117
118/// A marker struct for the oneshot channels which cancel a pending download and verify.
///
/// The sending half of each channel is stored in `Downloads::cancel_handles`.
/// The spawned download and verify task selects on the receiving half, and
/// takes its cancellation branch as soon as that half resolves.
119#[derive(Copy, Clone, Debug, Eq, PartialEq)]
120struct CancelDownloadAndVerify;
121
122/// Errors that can occur while downloading and verifying a transaction.
///
/// These errors are produced inside the spawned download and verify task and
/// are returned to the caller, paired with the transaction id, from
/// [`Downloads::poll_next`].
123#[derive(Error, Debug, Clone)]
124#[allow(dead_code)]
125pub enum TransactionDownloadVerifyError {
    /// The state service returned a transaction for this id, so it is not
    /// downloaded or verified again.
126    #[error("transaction is already in state")]
127    InState,
128
    /// The state service was not ready, or a state request failed.
129    #[error("error in state service: {0}")]
130    StateError(#[source] CloneError),
131
    /// The network request for the transaction failed, or the response
    /// contained no transactions.
132    #[error("error downloading transaction: {0}")]
133    DownloadFailed(#[source] CloneError),
134
    /// The task's cancel channel resolved before the download and
    /// verification finished.
135    #[error("transaction download / verification was cancelled")]
136    Cancelled,
137
    /// The transaction verifier returned an error.
    ///
    /// `advertiser_addr` is the address reported alongside a downloaded
    /// transaction, and is `None` for transactions that were pushed directly.
138    #[error("transaction did not pass consensus validation: {error}")]
139    Invalid {
140        error: zebra_consensus::error::TransactionError,
141        advertiser_addr: Option<PeerSocketAddr>,
142    },
143}
144
145/// Represents a [`Stream`] of download and verification tasks.
///
/// Tasks are queued with `download_if_needed_and_verify`, run as spawned tokio
/// tasks, and their results are yielded when the stream is polled.
146#[pin_project(PinnedDrop)]
147#[derive(Debug)]
148pub struct Downloads<ZN, ZV, ZS>
149where
150    ZN: Service<zn::Request, Response = zn::Response, Error = BoxError> + Send + Clone + 'static,
151    ZN::Future: Send,
152    ZV: Service<tx::Request, Response = tx::Response, Error = BoxError> + Send + Clone + 'static,
153    ZV::Future: Send,
154    ZS: Service<zs::Request, Response = zs::Response, Error = BoxError> + Send + Clone + 'static,
155    ZS::Future: Send,
156{
157    // Services
158    /// A service that forwards requests to connected peers, and returns their
159    /// responses.
160    network: ZN,
161
162    /// A service that verifies downloaded transactions.
163    verifier: ZV,
164
165    /// A service that manages cached blockchain state.
166    state: ZS,
167
168    // Internal downloads state
169    /// A list of pending transaction download and verify tasks.
    ///
    /// Each task resolves to a nested result:
    /// - outer `Err`: the task timed out; carries the transaction id and the
    ///   timeout error,
    /// - inner `Err`: the download or verification failed or was cancelled;
    ///   carries the boxed error and transaction id,
    /// - inner `Ok`: the verified transaction, the `spent_mempool_outpoints`
    ///   reported by the verifier, the tip height seen by the task (if any),
    ///   and the optional response channel supplied when the task was queued.
170    #[pin]
171    pending: FuturesUnordered<
172        JoinHandle<
173            Result<
174                Result<
175                    (
176                        VerifiedUnminedTx,
177                        Vec<transparent::OutPoint>,
178                        Option<Height>,
179                        Option<oneshot::Sender<Result<(), BoxError>>>,
180                    ),
181                    Box<(TransactionDownloadVerifyError, UnminedTxId)>,
182                >,
183                (UnminedTxId, tokio::time::error::Elapsed),
184            >,
185        >,
186    >,
187
188    /// A list of channels that can be used to cancel pending transaction
189    /// download and verify tasks. Each entry also stores the corresponding
190    /// gossip request and the announcing peer (when known), so completion can
191    /// release the per-peer slot by `UnminedTxId` lookup.
    ///
    /// Tuple layout: `(cancel sender, queued gossip request, announcing peer)`.
192    cancel_handles: HashMap<
193        UnminedTxId,
194        (
195            oneshot::Sender<CancelDownloadAndVerify>,
196            Gossip,
197            Option<SocketAddr>,
198        ),
199    >,
200
201    /// The number of currently in-flight download tasks per advertising peer.
202    ///
203    /// Invariant: a peer is present here iff some entry in [`Self::cancel_handles`]
204    /// has it as the third tuple element. Enforces
205    /// [`MAX_INBOUND_CONCURRENCY_PER_PEER`]. See `GHSA-4fc2-h7jh-287c`.
206    pending_per_peer: HashMap<SocketAddr, usize>,
207}
208
209impl<ZN, ZV, ZS> Stream for Downloads<ZN, ZV, ZS>
210where
211    ZN: Service<zn::Request, Response = zn::Response, Error = BoxError> + Send + Clone + 'static,
212    ZN::Future: Send,
213    ZV: Service<tx::Request, Response = tx::Response, Error = BoxError> + Send + Clone + 'static,
214    ZV::Future: Send,
215    ZS: Service<zs::Request, Response = zs::Response, Error = BoxError> + Send + Clone + 'static,
216    ZS::Future: Send,
217{
218    type Item = Result<
219        Result<
220            (
221                VerifiedUnminedTx,
222                Vec<transparent::OutPoint>,
223                Option<Height>,
224                Option<oneshot::Sender<Result<(), BoxError>>>,
225            ),
226            Box<(UnminedTxId, TransactionDownloadVerifyError)>,
227        >,
228        (UnminedTxId, tokio::time::error::Elapsed),
229    >;
230
231    fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
232        let this = self.project();
233        // CORRECTNESS
234        //
235        // The current task must be scheduled for wakeup every time we return
236        // `Poll::Pending`.
237        //
238        // If no download and verify tasks have exited since the last poll, this
239        // task is scheduled for wakeup when the next task becomes ready.
240        //
241        // TODO: this would be cleaner with poll_map (#2693)
242        let item = if let Some(join_result) = ready!(this.pending.poll_next(cx)) {
243            let result = join_result.expect("transaction download and verify tasks must not panic");
244            let (result, completed_txid) = match result {
245                Ok(Ok((tx, spent_mempool_outpoints, tip_height, rsp_tx))) => {
246                    let hash = tx.transaction.id;
247                    (
248                        Ok(Ok((tx, spent_mempool_outpoints, tip_height, rsp_tx))),
249                        Some(hash),
250                    )
251                }
252                Ok(Err(boxed_err)) => {
253                    let (e, hash) = *boxed_err;
254                    (Ok(Err(Box::new((hash, e)))), Some(hash))
255                }
256                Err((txid, elapsed)) => {
257                    // Remove the cancel handle so the spawned task's queued `Gossip`
258                    // doesn't stay resident in `cancel_handles` after a verification
259                    // timeout. Without this, a peer that gets each transaction to
260                    // hit `RATE_LIMIT_DELAY` can leak ~2 MB per tx until OOM.
261                    this.cancel_handles.remove(&txid);
262                    (Err((txid, elapsed)), None)
263                }
264            };
265
266            if let Some(hash) = completed_txid {
267                if let Some((_, _gossip, Some(source))) = this.cancel_handles.remove(&hash) {
268                    Self::release_peer_slot(this.pending_per_peer, source);
269                }
270            }
271
272            Some(result)
273        } else {
274            None
275        };
276
277        Poll::Ready(item)
278    }
279
280    fn size_hint(&self) -> (usize, Option<usize>) {
281        self.pending.size_hint()
282    }
283}
284
285impl<ZN, ZV, ZS> Downloads<ZN, ZV, ZS>
286where
287    ZN: Service<zn::Request, Response = zn::Response, Error = BoxError> + Send + Clone + 'static,
288    ZN::Future: Send,
289    ZV: Service<tx::Request, Response = tx::Response, Error = BoxError> + Send + Clone + 'static,
290    ZV::Future: Send,
291    ZS: Service<zs::Request, Response = zs::Response, Error = BoxError> + Send + Clone + 'static,
292    ZS::Future: Send,
293{
294    /// Initialize a new download stream with the provided services.
295    ///
296    /// `network` is used to download transactions.
297    /// `verifier` is used to verify transactions.
298    /// `state` is used to check if transactions are already in the state.
299    ///
300    /// The [`Downloads`] stream is agnostic to the network policy, so retry and
301    /// timeout limits should be applied to the `network` service passed into
302    /// this constructor.
303    pub fn new(network: ZN, verifier: ZV, state: ZS) -> Self {
304        Self {
305            network,
306            verifier,
307            state,
308            pending: FuturesUnordered::new(),
309            cancel_handles: HashMap::new(),
310            pending_per_peer: HashMap::new(),
311        }
312    }
313
314    /// Queue a transaction for download (if needed) and verification.
315    ///
316    /// Returns the action taken in response to the queue request.
317    ///
318    /// When `source` is `Some`, the per-peer cap
319    /// [`MAX_INBOUND_CONCURRENCY_PER_PEER`] is enforced; crawler-driven and
320    /// locally-pushed transactions pass `None` and are not capped per peer.
321    #[instrument(skip(self, gossiped_tx), fields(txid = %gossiped_tx.id()))]
322    #[allow(clippy::unwrap_in_result)]
323    pub fn download_if_needed_and_verify(
324        &mut self,
325        gossiped_tx: Gossip,
326        source: Option<SocketAddr>,
327        mut rsp_tx: Option<oneshot::Sender<Result<(), BoxError>>>,
328    ) -> Result<(), MempoolError> {
329        let txid = gossiped_tx.id();
330
331        if self.cancel_handles.contains_key(&txid) {
332            debug!(
333                ?txid,
334                queue_len = self.pending.len(),
335                ?MAX_INBOUND_CONCURRENCY,
336                "transaction id already queued for inbound download: ignored transaction"
337            );
338            metrics::gauge!("mempool.currently.queued.transactions",)
339                .set(self.pending.len() as f64);
340
341            return Err(MempoolError::AlreadyQueued);
342        }
343
344        if self.pending.len() >= MAX_INBOUND_CONCURRENCY {
345            debug!(
346                ?txid,
347                queue_len = self.pending.len(),
348                ?MAX_INBOUND_CONCURRENCY,
349                "too many transactions queued for inbound download: ignored transaction"
350            );
351            metrics::gauge!("mempool.currently.queued.transactions",)
352                .set(self.pending.len() as f64);
353
354            return Err(MempoolError::FullQueue);
355        }
356
357        // Per-peer cap: a single advertising peer cannot saturate the queue
358        // with attacker-supplied fake txids. See `GHSA-4fc2-h7jh-287c`.
359        if let Some(source) = source {
360            let count = self.pending_per_peer.get(&source).copied().unwrap_or(0);
361            if count >= MAX_INBOUND_CONCURRENCY_PER_PEER {
362                debug!(
363                    ?txid,
364                    peer_queue_len = count,
365                    ?MAX_INBOUND_CONCURRENCY_PER_PEER,
366                    "too many transactions queued for this peer: ignored transaction"
367                );
368                metrics::counter!("mempool.full_queue.per_peer.total").increment(1);
369                return Err(MempoolError::FullQueue);
370            }
371        }
372
373        // This oneshot is used to signal cancellation to the download task.
374        let (cancel_tx, mut cancel_rx) = oneshot::channel::<CancelDownloadAndVerify>();
375
376        let network = self.network.clone();
377        let verifier = self.verifier.clone();
378        let mut state = self.state.clone();
379
380        let gossiped_tx_req = gossiped_tx.clone();
381
382        let fut = async move {
383            // Don't download/verify if the transaction is already in the best chain.
384            Self::transaction_in_best_chain(&mut state, txid).await?;
385
386            trace!(?txid, "transaction is not in best chain");
387
388            let (tip_height, next_height) = match state.oneshot(zs::Request::Tip).await {
389                Ok(zs::Response::Tip(None)) => Ok((None, Height(0))),
390                Ok(zs::Response::Tip(Some((height, _hash)))) => {
391                    let next_height =
392                        (height + 1).expect("valid heights are far below the maximum");
393                    Ok((Some(height), next_height))
394                }
395                Ok(_) => unreachable!("wrong response"),
396                Err(e) => Err(TransactionDownloadVerifyError::StateError(e.into())),
397            }?;
398
399            trace!(?txid, ?next_height, "got next height");
400
401            let (tx, advertiser_addr) = match gossiped_tx {
402                Gossip::Id(txid) => {
403                    let req = zn::Request::TransactionsById(std::iter::once(txid).collect());
404
405                    let tx = match network
406                        .oneshot(req)
407                        .await
408                        .map_err(CloneError::from)
409                        .map_err(TransactionDownloadVerifyError::DownloadFailed)?
410                    {
411                        zn::Response::Transactions(mut txs) => txs.pop().ok_or_else(|| {
412                            TransactionDownloadVerifyError::DownloadFailed(
413                                BoxError::from("no transactions returned").into(),
414                            )
415                        })?,
416                        _ => unreachable!("wrong response to transaction request"),
417                    };
418
419                    let (tx, advertiser_addr) = tx.available().expect(
420                        "unexpected missing tx status: single tx failures should be errors",
421                    );
422
423                    metrics::counter!(
424                        "mempool.downloaded.transactions.total",
425                        "version" => format!("{}",tx.transaction.version()),
426                    ).increment(1);
427                    (tx, advertiser_addr)
428                }
429                Gossip::Tx(tx) => {
430                    metrics::counter!(
431                        "mempool.pushed.transactions.total",
432                        "version" => format!("{}",tx.transaction.version()),
433                    ).increment(1);
434                    (tx, None)
435                }
436            };
437
438            trace!(?txid, "got tx");
439
440            let result = verifier
441                .oneshot(tx::Request::Mempool {
442                    transaction: tx.clone(),
443                    height: next_height,
444                })
445                .map_ok(|rsp| {
446                    let tx::Response::Mempool { transaction, spent_mempool_outpoints } = rsp else {
447                        panic!("unexpected non-mempool response to mempool request")
448                    };
449
450                    (transaction, spent_mempool_outpoints, tip_height)
451                })
452                .await;
453
454            // Hide the transaction data to avoid filling the logs
455            trace!(?txid, result = ?result.as_ref().map(|_tx| ()), "verified transaction for the mempool");
456
457            result.map_err(|e| TransactionDownloadVerifyError::Invalid { error: e.into(), advertiser_addr } )
458        }
459        .map_ok(|(tx, spent_mempool_outpoints, tip_height)| {
460            metrics::counter!(
461                "mempool.verified.transactions.total",
462                "version" => format!("{}", tx.transaction.transaction.version()),
463            ).increment(1);
464            (tx, spent_mempool_outpoints, tip_height)
465        })
466        // Tack the hash onto the error so we can remove the cancel handle
467        // on failure as well as on success.
468        .map_err(move |e| Box::new((e, txid)))
469        .inspect(move |result| {
470            // Hide the transaction data to avoid filling the logs
471            let result = result.as_ref().map(|_tx| txid);
472            debug!("mempool transaction result: {result:?}");
473        })
474        .in_current_span();
475
476        let task = tokio::spawn(async move {
477            let fut = tokio::time::timeout(RATE_LIMIT_DELAY, fut);
478
479            // Prefer the cancel handle if both are ready.
480            let result = tokio::select! {
481                biased;
482                _ = &mut cancel_rx => {
483                    trace!("task cancelled prior to completion");
484                    metrics::counter!("mempool.cancelled.verify.tasks.total").increment(1);
485                    if let Some(rsp_tx) = rsp_tx.take() {
486                        let _ = rsp_tx.send(Err("verification cancelled".into()));
487                    }
488
489                    Ok(Err(Box::new((TransactionDownloadVerifyError::Cancelled, txid))))
490                }
491                verification = fut => {
492                    verification
493                        .inspect_err(|_elapsed| {
494                            if let Some(rsp_tx) = rsp_tx.take() {
495                                let _ = rsp_tx.send(Err("timeout waiting for verification result".into()));
496                            }
497                        })
498                        .map_err(|elapsed| (txid, elapsed))
499                        .map(|inner_result| {
500                            match inner_result {
501                                Ok((transaction, spent_mempool_outpoints, tip_height)) => Ok((transaction, spent_mempool_outpoints, tip_height, rsp_tx)),
502                                Err(boxed_err) => {
503                                    let (tx_verifier_error, tx_id) = *boxed_err;
504                                    if let Some(rsp_tx) = rsp_tx.take() {
505                                        let error_msg = format!(
506                                            "failed to validate tx: {tx_id}, error: {tx_verifier_error}"
507                                        );
508                                        let _ = rsp_tx.send(Err(error_msg.into()));
509                                    };
510
511                                    Err(Box::new((tx_verifier_error, tx_id)))
512                                }
513                            }
514                        })
515                },
516            };
517
518            result
519        });
520
521        self.pending.push(task);
522        assert!(
523            self.cancel_handles
524                .insert(txid, (cancel_tx, gossiped_tx_req, source))
525                .is_none(),
526            "transactions are only queued once"
527        );
528        if let Some(source) = source {
529            // The per-peer cap check above ensures this can't exceed
530            // `MAX_INBOUND_CONCURRENCY_PER_PEER`.
531            *self.pending_per_peer.entry(source).or_insert(0) += 1;
532        }
533
534        debug!(
535            ?txid,
536            queue_len = self.pending.len(),
537            ?MAX_INBOUND_CONCURRENCY,
538            "queued transaction hash for download"
539        );
540        metrics::gauge!("mempool.currently.queued.transactions",).set(self.pending.len() as f64);
541        metrics::counter!("mempool.queued.transactions.total").increment(1);
542
543        Ok(())
544    }
545
546    /// Cancel download/verification tasks of transactions with the
547    /// given transaction hash (see [`UnminedTxId::mined_id`]).
548    pub fn cancel(&mut self, mined_ids: &HashSet<transaction::Hash>) {
549        // TODO: this can be simplified with [`HashMap::drain_filter`] which
550        // is currently nightly-only experimental API.
551        let removed_txids: Vec<UnminedTxId> = self
552            .cancel_handles
553            .keys()
554            .filter(|txid| mined_ids.contains(&txid.mined_id()))
555            .cloned()
556            .collect();
557
558        for txid in removed_txids {
559            if let Some((cancel_tx, _gossip, source)) = self.cancel_handles.remove(&txid) {
560                let _ = cancel_tx.send(CancelDownloadAndVerify);
561                if let Some(source) = source {
562                    Self::release_peer_slot(&mut self.pending_per_peer, source);
563                }
564            }
565        }
566    }
567
568    /// Cancel all running tasks and reset the downloader state.
569    // Note: copied from zebrad/src/components/sync/downloads.rs
570    pub fn cancel_all(&mut self) {
571        // Replace the pending task list with an empty one and drop it.
572        let _ = std::mem::take(&mut self.pending);
573        // Signal cancellation to all running tasks.
574        // Since we already dropped the JoinHandles above, they should
575        // fail silently.
576        for (_hash, (cancel_tx, _gossip, _source)) in self.cancel_handles.drain() {
577            let _ = cancel_tx.send(CancelDownloadAndVerify);
578        }
579        self.pending_per_peer.clear();
580        assert!(self.pending.is_empty());
581        assert!(self.cancel_handles.is_empty());
582        metrics::gauge!("mempool.currently.queued.transactions",).set(self.pending.len() as f64);
583    }
584
585    /// Decrement the per-peer pending count for `source`, removing the entry
586    /// when it reaches zero.
587    fn release_peer_slot(pending_per_peer: &mut HashMap<SocketAddr, usize>, source: SocketAddr) {
588        if let Some(count) = pending_per_peer.get_mut(&source) {
589            *count = count.saturating_sub(1);
590            if *count == 0 {
591                pending_per_peer.remove(&source);
592            }
593        }
594    }
595
596    /// Get the number of currently in-flight download tasks.
597    #[allow(dead_code)]
598    pub fn in_flight(&self) -> usize {
599        self.pending.len()
600    }
601
602    /// Get a list of the currently pending transaction requests.
603    pub fn transaction_requests(&self) -> impl Iterator<Item = &Gossip> {
604        self.cancel_handles
605            .iter()
606            .map(|(_tx_id, (_handle, tx, _source))| tx)
607    }
608
609    /// Check if transaction is already in the best chain.
610    async fn transaction_in_best_chain(
611        state: &mut ZS,
612        txid: UnminedTxId,
613    ) -> Result<(), TransactionDownloadVerifyError> {
614        match state
615            .ready()
616            .await
617            .map_err(CloneError::from)
618            .map_err(TransactionDownloadVerifyError::StateError)?
619            .call(zs::Request::Transaction(txid.mined_id()))
620            .await
621        {
622            Ok(zs::Response::Transaction(None)) => Ok(()),
623            Ok(zs::Response::Transaction(Some(_))) => Err(TransactionDownloadVerifyError::InState),
624            Ok(_) => unreachable!("wrong response"),
625            Err(e) => Err(TransactionDownloadVerifyError::StateError(e.into())),
626        }?;
627
628        Ok(())
629    }
630}
631
632#[pinned_drop]
633impl<ZN, ZV, ZS> PinnedDrop for Downloads<ZN, ZV, ZS>
634where
635    ZN: Service<zn::Request, Response = zn::Response, Error = BoxError> + Send + Clone + 'static,
636    ZN::Future: Send,
637    ZV: Service<tx::Request, Response = tx::Response, Error = BoxError> + Send + Clone + 'static,
638    ZV::Future: Send,
639    ZS: Service<zs::Request, Response = zs::Response, Error = BoxError> + Send + Clone + 'static,
640    ZS::Future: Send,
641{
642    fn drop(mut self: Pin<&mut Self>) {
643        self.cancel_all();
644
645        metrics::gauge!("mempool.currently.queued.transactions").set(0 as f64);
646    }
647}