Skip to main content

zebrad/components/
mempool.rs

1//! Zebra mempool.
2//!
3//! A zebrad application component that manages the active collection, reception,
4//! gossip, verification, in-memory storage, eviction, and rejection of unmined Zcash
5//! transactions (those that have not been confirmed in a mined block on the
6//! blockchain).
7//!
8//! Major parts of the mempool include:
9//!  * [Mempool Service][`Mempool`]
10//!    * activates when the syncer is near the chain tip
11//!    * spawns [download and verify tasks][`downloads::Downloads`] for each crawled or gossiped transaction
12//!    * handles in-memory [storage][`storage::Storage`] of unmined transactions
13//!  * [Crawler][`crawler::Crawler`]
14//!    * runs in the background to periodically poll peers for fresh unmined transactions
15//!  * [Queue Checker][`queue_checker::QueueChecker`]
16//!    * runs in the background, polling the mempool to store newly verified transactions
17//!  * [Transaction Gossip Task][`gossip::gossip_mempool_transaction_id`]
18//!    * runs in the background and gossips newly added mempool transactions
19//!      to peers
20
21use std::{
22    collections::HashSet,
23    future::Future,
24    iter,
25    pin::{pin, Pin},
26    task::{Context, Poll},
27};
28
29use futures::{future::FutureExt, stream::Stream};
30use tokio::sync::{broadcast, mpsc, oneshot};
31use tower::{buffer::Buffer, timeout::Timeout, util::BoxService, Service};
32
33use zebra_chain::{
34    block::{self, Height},
35    chain_sync_status::ChainSyncStatus,
36    chain_tip::ChainTip,
37    transaction::UnminedTxId,
38};
39use zebra_consensus::{error::TransactionError, transaction};
40use zebra_network::{self as zn, PeerSocketAddr};
41use zebra_node_services::mempool::{
42    CreatedOrSpent, Gossip, MempoolChange, MempoolTxSubscriber, Request, Response,
43};
44use zebra_state as zs;
45use zebra_state::{ChainTipChange, TipAction};
46
47use crate::components::sync::SyncStatus;
48
49pub mod config;
50mod crawler;
51pub mod downloads;
52mod error;
53pub mod gossip;
54mod pending_outputs;
55mod queue_checker;
56mod storage;
57
58#[cfg(test)]
59mod tests;
60
61pub use crate::BoxError;
62
63pub use config::Config;
64pub use crawler::Crawler;
65pub use error::MempoolError;
66pub use gossip::gossip_mempool_transaction_id;
67pub use queue_checker::QueueChecker;
68pub use storage::{
69    ExactTipRejectionError, SameEffectsChainRejectionError, SameEffectsTipRejectionError, Storage,
70};
71
72#[cfg(test)]
73pub use self::tests::UnboxMempoolError;
74
75use downloads::{
76    Downloads as TxDownloads, TransactionDownloadVerifyError, TRANSACTION_DOWNLOAD_TIMEOUT,
77    TRANSACTION_VERIFY_TIMEOUT,
78};
79
80type Outbound = Buffer<BoxService<zn::Request, zn::Response, zn::BoxError>, zn::Request>;
81type State = Buffer<BoxService<zs::Request, zs::Response, zs::BoxError>, zs::Request>;
82type TxVerifier = Buffer<
83    BoxService<transaction::Request, transaction::Response, TransactionError>,
84    transaction::Request,
85>;
86type InboundTxDownloads = TxDownloads<Timeout<Outbound>, Timeout<TxVerifier>, State>;
87
88/// The state of the mempool.
89///
90/// Indicates whether it is enabled or disabled and, if enabled, contains
91/// the necessary data to run it.
92//
93// Zebra only has one mempool, so the enum variant size difference doesn't matter.
94#[allow(clippy::large_enum_variant)]
95#[derive(Default)]
96enum ActiveState {
97    /// The Mempool is disabled.
98    #[default]
99    Disabled,
100
101    /// The Mempool is enabled.
102    Enabled {
103        /// The Mempool storage itself.
104        ///
105        /// # Correctness
106        ///
107        /// Only components internal to the [`Mempool`] struct are allowed to
108        /// inject transactions into `storage`, as transactions must be verified beforehand.
109        storage: Storage,
110
111        /// The transaction download and verify stream.
112        tx_downloads: Pin<Box<InboundTxDownloads>>,
113
114        /// Last seen chain tip hash that mempool transactions have been verified against.
115        ///
116        /// In some tests, this is initialized to the latest chain tip, then updated in `poll_ready()` before each request.
117        last_seen_tip_hash: block::Hash,
118    },
119}
120
121impl ActiveState {
122    /// Returns the current state, leaving [`Self::Disabled`] in its place.
123    fn take(&mut self) -> Self {
124        std::mem::take(self)
125    }
126
127    /// Returns a list of requests that will retry every stored and pending transaction.
128    fn transaction_retry_requests(&self) -> Vec<Gossip> {
129        match self {
130            ActiveState::Disabled => Vec::new(),
131            ActiveState::Enabled {
132                storage,
133                tx_downloads,
134                ..
135            } => {
136                let mut transactions = Vec::new();
137
138                let storage = storage
139                    .transactions()
140                    .values()
141                    .map(|tx| tx.transaction.clone().into());
142                transactions.extend(storage);
143
144                let pending = tx_downloads.transaction_requests().cloned();
145                transactions.extend(pending);
146
147                transactions
148            }
149        }
150    }
151
152    /// Returns the number of pending transactions waiting for download or verify,
153    /// or zero if the mempool is disabled.
154    #[cfg(feature = "progress-bar")]
155    fn queued_transaction_count(&self) -> usize {
156        match self {
157            ActiveState::Disabled => 0,
158            ActiveState::Enabled { tx_downloads, .. } => tx_downloads.in_flight(),
159        }
160    }
161
162    /// Returns the number of transactions in storage, or zero if the mempool is disabled.
163    #[cfg(feature = "progress-bar")]
164    fn transaction_count(&self) -> usize {
165        match self {
166            ActiveState::Disabled => 0,
167            ActiveState::Enabled { storage, .. } => storage.transaction_count(),
168        }
169    }
170
171    /// Returns the cost of the transactions in the mempool, according to ZIP-401.
172    /// Returns zero if the mempool is disabled.
173    #[cfg(feature = "progress-bar")]
174    fn total_cost(&self) -> u64 {
175        match self {
176            ActiveState::Disabled => 0,
177            ActiveState::Enabled { storage, .. } => storage.total_cost(),
178        }
179    }
180
181    /// Returns the total serialized size of the verified transactions in the set,
182    /// or zero if the mempool is disabled.
183    ///
184    /// See [`Storage::total_serialized_size()`] for details.
185    #[cfg(feature = "progress-bar")]
186    pub fn total_serialized_size(&self) -> usize {
187        match self {
188            ActiveState::Disabled => 0,
189            ActiveState::Enabled { storage, .. } => storage.total_serialized_size(),
190        }
191    }
192
193    /// Returns the number of rejected transaction hashes in storage,
194    /// or zero if the mempool is disabled.
195    #[cfg(feature = "progress-bar")]
196    fn rejected_transaction_count(&mut self) -> usize {
197        match self {
198            ActiveState::Disabled => 0,
199            ActiveState::Enabled { storage, .. } => storage.rejected_transaction_count(),
200        }
201    }
202}
203
204/// Mempool async management and query service.
205///
206/// The mempool is the set of all verified transactions that this node is aware
207/// of that have yet to be confirmed by the Zcash network. A transaction is
208/// confirmed when it has been included in a block ('mined').
209pub struct Mempool {
210    /// The configurable options for the mempool, persisted between states.
211    config: Config,
212
213    /// The state of the mempool.
214    active_state: ActiveState,
215
216    /// Allows checking if we are near the tip to enable/disable the mempool.
217    sync_status: SyncStatus,
218
219    /// If the state's best chain tip has reached this height, always enable the mempool.
220    debug_enable_at_height: Option<Height>,
221
222    /// Allows efficient access to the best tip of the blockchain.
223    latest_chain_tip: zs::LatestChainTip,
224
225    /// Allows the detection of newly added chain tip blocks,
226    /// and chain tip resets.
227    chain_tip_change: ChainTipChange,
228
229    /// Handle to the outbound service.
230    /// Used to construct the transaction downloader.
231    outbound: Outbound,
232
233    /// Handle to the state service.
234    /// Used to construct the transaction downloader.
235    state: State,
236
237    /// Handle to the transaction verifier service.
238    /// Used to construct the transaction downloader.
239    tx_verifier: TxVerifier,
240
241    /// Sender part of a gossip transactions channel.
242    /// Used to broadcast transaction ids to peers.
243    transaction_sender: broadcast::Sender<MempoolChange>,
244
245    /// Sender for reporting peer addresses that advertised unexpectedly invalid transactions.
246    misbehavior_sender: mpsc::Sender<(PeerSocketAddr, u32)>,
247
248    // Diagnostics
249    //
250    /// Queued transactions pending download or verification transmitter.
251    /// Only displayed after the mempool's first activation.
252    #[cfg(feature = "progress-bar")]
253    queued_count_bar: Option<howudoin::Tx>,
254
255    /// Number of mempool transactions transmitter.
256    /// Only displayed after the mempool's first activation.
257    #[cfg(feature = "progress-bar")]
258    transaction_count_bar: Option<howudoin::Tx>,
259
260    /// Mempool transaction cost transmitter.
261    /// Only displayed after the mempool's first activation.
262    #[cfg(feature = "progress-bar")]
263    transaction_cost_bar: Option<howudoin::Tx>,
264
265    /// Rejected transactions transmitter.
266    /// Only displayed after the mempool's first activation.
267    #[cfg(feature = "progress-bar")]
268    rejected_count_bar: Option<howudoin::Tx>,
269}
270
271impl Mempool {
272    #[allow(clippy::too_many_arguments)]
273    pub(crate) fn new(
274        config: &Config,
275        outbound: Outbound,
276        state: State,
277        tx_verifier: TxVerifier,
278        sync_status: SyncStatus,
279        latest_chain_tip: zs::LatestChainTip,
280        chain_tip_change: ChainTipChange,
281        misbehavior_sender: mpsc::Sender<(PeerSocketAddr, u32)>,
282    ) -> (Self, MempoolTxSubscriber) {
283        let (transaction_sender, _) =
284            tokio::sync::broadcast::channel(gossip::MAX_CHANGES_BEFORE_SEND * 2);
285        let transaction_subscriber = MempoolTxSubscriber::new(transaction_sender.clone());
286
287        let mut service = Mempool {
288            config: config.clone(),
289            active_state: ActiveState::Disabled,
290            sync_status,
291            debug_enable_at_height: config.debug_enable_at_height.map(Height),
292            latest_chain_tip,
293            chain_tip_change,
294            outbound,
295            state,
296            tx_verifier,
297            transaction_sender,
298            misbehavior_sender,
299            #[cfg(feature = "progress-bar")]
300            queued_count_bar: None,
301            #[cfg(feature = "progress-bar")]
302            transaction_count_bar: None,
303            #[cfg(feature = "progress-bar")]
304            transaction_cost_bar: None,
305            #[cfg(feature = "progress-bar")]
306            rejected_count_bar: None,
307        };
308
309        // Make sure `is_enabled` is accurate.
310        // Otherwise, it is only updated in `poll_ready`, right before each service call.
311        service.update_state(None);
312
313        (service, transaction_subscriber)
314    }
315
316    /// Is the mempool enabled by a debug config option?
317    fn is_enabled_by_debug(&self) -> bool {
318        let mut is_debug_enabled = false;
319
320        // optimise non-debug performance
321        if self.debug_enable_at_height.is_none() {
322            return is_debug_enabled;
323        }
324
325        let enable_at_height = self
326            .debug_enable_at_height
327            .expect("unexpected debug_enable_at_height: just checked for None");
328
329        if let Some(best_tip_height) = self.latest_chain_tip.best_tip_height() {
330            is_debug_enabled = best_tip_height >= enable_at_height;
331
332            if is_debug_enabled && !self.is_enabled() {
333                info!(
334                    ?best_tip_height,
335                    ?enable_at_height,
336                    "enabling mempool for debugging"
337                );
338            }
339        }
340
341        is_debug_enabled
342    }
343
344    /// Update the mempool state (enabled / disabled) depending on how close to
345    /// the tip is the synchronization, including side effects to state changes.
346    ///
347    /// Accepts an optional [`TipAction`] for setting the `last_seen_tip_hash` field
348    /// when enabling the mempool state, it will not enable the mempool if this is None.
349    ///
350    /// Returns `true` if the state changed.
351    fn update_state(&mut self, tip_action: Option<&TipAction>) -> bool {
352        let is_close_to_tip = self.sync_status.is_close_to_tip() || self.is_enabled_by_debug();
353
354        match (is_close_to_tip, self.is_enabled(), tip_action) {
355            // the active state is up to date, or there is no tip action to activate the mempool
356            (false, false, _) | (true, true, _) | (true, false, None) => return false,
357
358            // Enable state - there should be a chain tip when Zebra is close to the network tip
359            (true, false, Some(tip_action)) => {
360                let (last_seen_tip_hash, tip_height) = tip_action.best_tip_hash_and_height();
361
362                info!(?tip_height, "activating mempool: Zebra is close to the tip");
363
364                let tx_downloads = Box::pin(TxDownloads::new(
365                    Timeout::new(self.outbound.clone(), TRANSACTION_DOWNLOAD_TIMEOUT),
366                    Timeout::new(self.tx_verifier.clone(), TRANSACTION_VERIFY_TIMEOUT),
367                    self.state.clone(),
368                ));
369                self.active_state = ActiveState::Enabled {
370                    storage: storage::Storage::new(&self.config),
371                    tx_downloads,
372                    last_seen_tip_hash,
373                };
374            }
375
376            // Disable state
377            (false, true, _) => {
378                info!(
379                    tip_height = ?self.latest_chain_tip.best_tip_height(),
380                    "deactivating mempool: Zebra is syncing lots of blocks"
381                );
382
383                // This drops the previous ActiveState::Enabled, cancelling its download tasks.
384                // We don't preserve the previous transactions, because we are syncing lots of blocks.
385                self.active_state = ActiveState::Disabled;
386            }
387        };
388
389        true
390    }
391
392    /// Return whether the mempool is enabled or not.
393    pub fn is_enabled(&self) -> bool {
394        match self.active_state {
395            ActiveState::Disabled => false,
396            ActiveState::Enabled { .. } => true,
397        }
398    }
399
400    /// Remove expired transaction ids from a given list of inserted ones.
401    fn remove_expired_from_peer_list(
402        send_to_peers_ids: &HashSet<UnminedTxId>,
403        expired_transactions: &HashSet<UnminedTxId>,
404    ) -> HashSet<UnminedTxId> {
405        send_to_peers_ids
406            .iter()
407            .filter(|id| !expired_transactions.contains(id))
408            .copied()
409            .collect()
410    }
411
412    /// Update metrics for the mempool.
413    fn update_metrics(&mut self) {
414        // Shutdown if needed
415        #[cfg(feature = "progress-bar")]
416        if matches!(howudoin::cancelled(), Some(true)) {
417            self.disable_metrics();
418            return;
419        }
420
421        // Initialize if just activated
422        #[cfg(feature = "progress-bar")]
423        if self.is_enabled()
424            && (self.queued_count_bar.is_none()
425                || self.transaction_count_bar.is_none()
426                || self.transaction_cost_bar.is_none()
427                || self.rejected_count_bar.is_none())
428        {
429            let _max_transaction_count = self.config.tx_cost_limit
430                / zebra_chain::transaction::MEMPOOL_TRANSACTION_COST_THRESHOLD;
431
432            let transaction_count_bar = *howudoin::new_root()
433                .label("Mempool Transactions")
434                .set_pos(0u64);
435            // .set_len(max_transaction_count);
436
437            let transaction_cost_bar = howudoin::new_with_parent(transaction_count_bar.id())
438                .label("Mempool Cost")
439                .set_pos(0u64)
440                // .set_len(self.config.tx_cost_limit)
441                .fmt_as_bytes(true);
442
443            let queued_count_bar = *howudoin::new_with_parent(transaction_cost_bar.id())
444                .label("Mempool Queue")
445                .set_pos(0u64);
446            // .set_len(
447            //     u64::try_from(downloads::MAX_INBOUND_CONCURRENCY).expect("fits in u64"),
448            // );
449
450            let rejected_count_bar = *howudoin::new_with_parent(queued_count_bar.id())
451                .label("Mempool Rejects")
452                .set_pos(0u64);
453            // .set_len(
454            //     u64::try_from(storage::MAX_EVICTION_MEMORY_ENTRIES).expect("fits in u64"),
455            // );
456
457            self.transaction_count_bar = Some(transaction_count_bar);
458            self.transaction_cost_bar = Some(transaction_cost_bar);
459            self.queued_count_bar = Some(queued_count_bar);
460            self.rejected_count_bar = Some(rejected_count_bar);
461        }
462
463        // Update if the mempool has ever been active
464        #[cfg(feature = "progress-bar")]
465        if let (
466            Some(queued_count_bar),
467            Some(transaction_count_bar),
468            Some(transaction_cost_bar),
469            Some(rejected_count_bar),
470        ) = (
471            self.queued_count_bar,
472            self.transaction_count_bar,
473            self.transaction_cost_bar,
474            self.rejected_count_bar,
475        ) {
476            let queued_count = self.active_state.queued_transaction_count();
477            let transaction_count = self.active_state.transaction_count();
478
479            let transaction_cost = self.active_state.total_cost();
480            let transaction_size = self.active_state.total_serialized_size();
481            let transaction_size =
482                indicatif::HumanBytes(transaction_size.try_into().expect("fits in u64"));
483
484            let rejected_count = self.active_state.rejected_transaction_count();
485
486            queued_count_bar.set_pos(u64::try_from(queued_count).expect("fits in u64"));
487
488            transaction_count_bar.set_pos(u64::try_from(transaction_count).expect("fits in u64"));
489
490            // Display the cost and cost limit, with the actual size as a description.
491            //
492            // Costs can be much higher than the transaction size due to the
493            // MEMPOOL_TRANSACTION_COST_THRESHOLD minimum cost.
494            transaction_cost_bar
495                .set_pos(transaction_cost)
496                .desc(format!("Actual size {transaction_size}"));
497
498            rejected_count_bar.set_pos(u64::try_from(rejected_count).expect("fits in u64"));
499        }
500    }
501
502    /// Disable metrics for the mempool.
503    fn disable_metrics(&self) {
504        #[cfg(feature = "progress-bar")]
505        {
506            if let Some(bar) = self.queued_count_bar {
507                bar.close()
508            }
509            if let Some(bar) = self.transaction_count_bar {
510                bar.close()
511            }
512            if let Some(bar) = self.transaction_cost_bar {
513                bar.close()
514            }
515            if let Some(bar) = self.rejected_count_bar {
516                bar.close()
517            }
518        }
519    }
520}
521
522impl Service<Request> for Mempool {
523    type Response = Response;
524    type Error = BoxError;
525    type Future =
526        Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>;
527
528    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
529        let tip_action = self.chain_tip_change.last_tip_change();
530
531        // TODO: Consider broadcasting a `MempoolChange` when the mempool is disabled.
532        let is_state_changed = self.update_state(tip_action.as_ref());
533
534        tracing::trace!(is_enabled = ?self.is_enabled(), ?is_state_changed, "started polling the mempool...");
535
536        // When the mempool is disabled we still return that the service is ready.
537        // Otherwise, callers could block waiting for the mempool to be enabled.
538        if !self.is_enabled() {
539            self.update_metrics();
540
541            return Poll::Ready(Ok(()));
542        }
543
544        // Clear the mempool and cancel downloads if there has been a chain tip reset.
545        //
546        // But if the mempool was just freshly enabled,
547        // skip resetting and removing mined transactions for this tip.
548        if !is_state_changed && matches!(tip_action, Some(TipAction::Reset { .. })) {
549            info!(
550                tip_height = ?tip_action.as_ref().unwrap().best_tip_height(),
551                "resetting mempool: switched best chain, skipped blocks, or activated network upgrade"
552            );
553
554            let previous_state = self.active_state.take();
555            let tx_retries = previous_state.transaction_retry_requests();
556
557            // Use the same code for dropping and resetting the mempool,
558            // to avoid subtle bugs.
559            //
560            // Drop the current contents of the state,
561            // cancelling any pending download tasks,
562            // and dropping completed verification results.
563            std::mem::drop(previous_state);
564
565            // Re-initialise an empty state.
566            self.update_state(tip_action.as_ref());
567
568            // Re-verify the transactions that were pending or valid at the previous tip.
569            // This saves us the time and data needed to re-download them.
570            if let ActiveState::Enabled { tx_downloads, .. } = &mut self.active_state {
571                info!(
572                    transactions = tx_retries.len(),
573                    "re-verifying mempool transactions after a chain fork"
574                );
575
576                for tx in tx_retries {
577                    // This is just an efficiency optimisation, so we don't care if queueing
578                    // transaction requests fails.
579                    let _result = tx_downloads.download_if_needed_and_verify(tx, None);
580                }
581            }
582
583            self.update_metrics();
584
585            return Poll::Ready(Ok(()));
586        }
587
588        if let ActiveState::Enabled {
589            storage,
590            tx_downloads,
591            last_seen_tip_hash,
592        } = &mut self.active_state
593        {
594            // Collect inserted transaction ids.
595            let mut send_to_peers_ids = HashSet::<_>::new();
596            let mut invalidated_ids = HashSet::<_>::new();
597            let mut mined_mempool_ids = HashSet::<_>::new();
598
599            let best_tip_height = self.latest_chain_tip.best_tip_height();
600
601            // Clean up completed download tasks and add to mempool if successful.
602            while let Poll::Ready(Some(result)) = pin!(&mut *tx_downloads).poll_next(cx) {
603                match result {
604                    Ok(Ok((tx, spent_mempool_outpoints, expected_tip_height, rsp_tx))) => {
605                        // # Correctness:
606                        //
607                        // It's okay to use tip height here instead of the tip hash since
608                        // chain_tip_change.last_tip_change() returns a `TipAction::Reset` when
609                        // the best chain changes (which is the only way to stay at the same height), and the
610                        // mempool re-verifies all pending tx_downloads when there's a `TipAction::Reset`.
611                        if best_tip_height == expected_tip_height {
612                            let tx_id = tx.transaction.id;
613                            let insert_result =
614                                storage.insert(tx, spent_mempool_outpoints, best_tip_height);
615
616                            tracing::trace!(
617                                ?insert_result,
618                                "got Ok(_) transaction verify, tried to store",
619                            );
620
621                            if let Ok(inserted_id) = insert_result {
622                                // Save transaction ids that we will send to peers
623                                send_to_peers_ids.insert(inserted_id);
624                            } else {
625                                invalidated_ids.insert(tx_id);
626                            }
627
628                            // Send the result to responder channel if one was provided.
629                            if let Some(rsp_tx) = rsp_tx {
630                                let _ = rsp_tx
631                                    .send(insert_result.map(|_| ()).map_err(|err| err.into()));
632                            }
633                        } else {
634                            tracing::trace!("chain grew during tx verification, retrying ..",);
635
636                            // We don't care if re-queueing the transaction request fails.
637                            let _result = tx_downloads
638                                .download_if_needed_and_verify(tx.transaction.into(), rsp_tx);
639                        }
640                    }
641                    Ok(Err(boxed_err)) => {
642                        let (tx_id, error) = *boxed_err;
643                        if let TransactionDownloadVerifyError::Invalid {
644                            error,
645                            advertiser_addr: Some(advertiser_addr),
646                        } = &error
647                        {
648                            if error.mempool_misbehavior_score() != 0 {
649                                let _ = self.misbehavior_sender.try_send((
650                                    *advertiser_addr,
651                                    error.mempool_misbehavior_score(),
652                                ));
653                            }
654                        };
655
656                        tracing::debug!(?tx_id, ?error, "mempool transaction failed to verify");
657
658                        metrics::counter!("mempool.failed.verify.tasks.total", "reason" => error.to_string()).increment(1);
659
660                        invalidated_ids.insert(tx_id);
661                        storage.reject_if_needed(tx_id, error);
662                    }
663                    Err(_elapsed) => {
664                        // A timeout happens when the stream hangs waiting for another service,
665                        // so there is no specific transaction ID.
666
667                        // TODO: Return the transaction id that timed out during verification so it can be
668                        //       included in the list of invalidated transactions and change `warn!` to `info!`.
669                        tracing::warn!("mempool transaction failed to verify due to timeout");
670
671                        metrics::counter!("mempool.failed.verify.tasks.total", "reason" => "timeout").increment(1);
672                    }
673                };
674            }
675
676            // Handle best chain tip changes
677            if let Some(TipAction::Grow { block }) = tip_action {
678                tracing::trace!(block_height = ?block.height, "handling blocks added to tip");
679                *last_seen_tip_hash = block.hash;
680
681                // Cancel downloads/verifications/storage of transactions
682                // with the same mined IDs as recently mined transactions.
683                let mined_ids = block.transaction_hashes.iter().cloned().collect();
684                tx_downloads.cancel(&mined_ids);
685                storage.clear_mined_dependencies(&mined_ids);
686
687                let storage::RemovedTransactionIds { mined, invalidated } =
688                    storage.reject_and_remove_same_effects(&mined_ids, block.transactions);
689
690                // Clear any transaction rejections if they might have become valid after
691                // the new block was added to the tip.
692                storage.clear_tip_rejections();
693
694                mined_mempool_ids.extend(mined);
695                invalidated_ids.extend(invalidated);
696            }
697
698            // Remove expired transactions from the mempool.
699            //
700            // Lock times never expire, because block times are strictly increasing.
701            // So we don't need to check them here.
702            if let Some(tip_height) = best_tip_height {
703                let expired_transactions = storage.remove_expired_transactions(tip_height);
704                // Remove transactions that are expired from the peers list
705                send_to_peers_ids =
706                    Self::remove_expired_from_peer_list(&send_to_peers_ids, &expired_transactions);
707
708                if !expired_transactions.is_empty() {
709                    tracing::debug!(
710                        ?expired_transactions,
711                        "removed expired transactions from the mempool",
712                    );
713
714                    invalidated_ids.extend(expired_transactions);
715                }
716            }
717
718            // Send transactions that were not rejected nor expired to peers and RPC listeners.
719            if !send_to_peers_ids.is_empty() {
720                tracing::trace!(
721                    ?send_to_peers_ids,
722                    "sending new transactions to peers and RPC listeners"
723                );
724
725                self.transaction_sender
726                    .send(MempoolChange::added(send_to_peers_ids))?;
727            }
728
729            // Send transactions that were rejected to RPC listeners.
730            if !invalidated_ids.is_empty() {
731                tracing::trace!(
732                    ?invalidated_ids,
733                    "sending invalidated transactions to RPC listeners"
734                );
735
736                self.transaction_sender
737                    .send(MempoolChange::invalidated(invalidated_ids))?;
738            }
739
740            // Send transactions that were mined onto the best chain to RPC listeners.
741            if !mined_mempool_ids.is_empty() {
742                tracing::trace!(
743                    ?mined_mempool_ids,
744                    "sending mined transactions to RPC listeners"
745                );
746
747                self.transaction_sender
748                    .send(MempoolChange::mined(mined_mempool_ids))?;
749            }
750        }
751
752        self.update_metrics();
753
754        Poll::Ready(Ok(()))
755    }
756
757    /// Call the mempool service.
758    ///
759    /// Errors indicate that the peer has done something wrong or unexpected,
760    /// and will cause callers to disconnect from the remote peer.
761    #[instrument(name = "mempool", skip(self, req))]
762    fn call(&mut self, req: Request) -> Self::Future {
763        match &mut self.active_state {
764            ActiveState::Enabled {
765                storage,
766                tx_downloads,
767                last_seen_tip_hash,
768            } => match req {
769                // Queries
770                Request::TransactionIds => {
771                    trace!(?req, "got mempool request");
772
773                    let res: HashSet<_> = storage.tx_ids().collect();
774
775                    trace!(?req, res_count = ?res.len(), "answered mempool request");
776
777                    async move { Ok(Response::TransactionIds(res)) }.boxed()
778                }
779
780                Request::TransactionsById(ref ids) => {
781                    trace!(?req, "got mempool request");
782
783                    let res: Vec<_> = storage.transactions_exact(ids.clone()).cloned().collect();
784
785                    trace!(?req, res_count = ?res.len(), "answered mempool request");
786
787                    async move { Ok(Response::Transactions(res)) }.boxed()
788                }
789                Request::TransactionsByMinedId(ref ids) => {
790                    trace!(?req, "got mempool request");
791
792                    let res: Vec<_> = storage
793                        .transactions_same_effects(ids.clone())
794                        .cloned()
795                        .collect();
796
797                    trace!(?req, res_count = ?res.len(), "answered mempool request");
798
799                    async move { Ok(Response::Transactions(res)) }.boxed()
800                }
801                Request::TransactionWithDepsByMinedId(tx_id) => {
802                    trace!(?req, "got mempool request");
803
804                    let res = if let Some((transaction, dependencies)) =
805                        storage.transaction_with_deps(tx_id)
806                    {
807                        Ok(Response::TransactionWithDeps {
808                            transaction,
809                            dependencies,
810                        })
811                    } else {
812                        Err("transaction not found in mempool".into())
813                    };
814
815                    trace!(?req, ?res, "answered mempool request");
816
817                    async move { res }.boxed()
818                }
819
820                Request::AwaitOutput(outpoint) => {
821                    trace!(?req, "got mempool request");
822
823                    let response_fut = storage.pending_outputs.queue(outpoint);
824
825                    if let Some(output) = storage.created_output(&outpoint) {
826                        storage.pending_outputs.respond(&outpoint, output)
827                    }
828
829                    trace!("answered mempool request");
830
831                    response_fut.boxed()
832                }
833
834                Request::FullTransactions => {
835                    trace!(?req, "got mempool request");
836
837                    let transactions: Vec<_> = storage.transactions().values().cloned().collect();
838                    let transaction_dependencies = storage.transaction_dependencies().clone();
839
840                    trace!(?req, transactions_count = ?transactions.len(), "answered mempool request");
841
842                    let response = Response::FullTransactions {
843                        transactions,
844                        transaction_dependencies,
845                        last_seen_tip_hash: *last_seen_tip_hash,
846                    };
847
848                    async move { Ok(response) }.boxed()
849                }
850
851                Request::RejectedTransactionIds(ref ids) => {
852                    trace!(?req, "got mempool request");
853
854                    let res = storage.rejected_transactions(ids.clone()).collect();
855
856                    trace!(?req, ?res, "answered mempool request");
857
858                    async move { Ok(Response::RejectedTransactionIds(res)) }.boxed()
859                }
860
861                // Queue mempool candidates
862                Request::Queue(gossiped_txs) => {
863                    trace!(req_count = ?gossiped_txs.len(), "got mempool Queue request");
864
865                    let rsp: Vec<Result<oneshot::Receiver<Result<(), BoxError>>, BoxError>> =
866                        gossiped_txs
867                            .into_iter()
868                            .map(
869                                |gossiped_tx| -> Result<
870                                    oneshot::Receiver<Result<(), BoxError>>,
871                                    MempoolError,
872                                > {
873                                    let (rsp_tx, rsp_rx) = oneshot::channel();
874                                    storage.should_download_or_verify(gossiped_tx.id())?;
875                                    tx_downloads
876                                        .download_if_needed_and_verify(gossiped_tx, Some(rsp_tx))?;
877
878                                    Ok(rsp_rx)
879                                },
880                            )
881                            .map(|result| result.map_err(BoxError::from))
882                            .collect();
883
884                    // We've added transactions to the queue
885                    self.update_metrics();
886
887                    async move { Ok(Response::Queued(rsp)) }.boxed()
888                }
889
890                // Store successfully downloaded and verified transactions in the mempool
891                Request::CheckForVerifiedTransactions => {
892                    trace!(?req, "got mempool request");
893
894                    // all the work for this request is done in poll_ready
895                    async move { Ok(Response::CheckedForVerifiedTransactions) }.boxed()
896                }
897
898                // Summary statistics for the mempool: count, total size, and memory usage.
899                //
900                // Used by the `getmempoolinfo` RPC method
901                Request::QueueStats => {
902                    trace!(?req, "got mempool request");
903
904                    let size = storage.transaction_count();
905
906                    let bytes = storage.total_serialized_size();
907
908                    let usage = bytes; // TODO: Placeholder, should be fixed later
909
910                    // TODO: Set to Some(true) on regtest once network info is available.
911                    let fully_notified = None;
912
913                    trace!(size, bytes, usage, "answered mempool request");
914
915                    async move {
916                        Ok(Response::QueueStats {
917                            size,
918                            bytes,
919                            usage,
920                            fully_notified,
921                        })
922                    }
923                    .boxed()
924                }
925                Request::UnspentOutput(outpoint) => {
926                    trace!(?req, "got mempool request");
927
928                    if storage.has_spent_outpoint(&outpoint) {
929                        trace!(?req, "answered mempool request");
930
931                        return async move {
932                            Ok(Response::TransparentOutput(Some(CreatedOrSpent::Spent)))
933                        }
934                        .boxed();
935                    }
936
937                    if let Some((tx_version, output)) = storage
938                        .transactions()
939                        .get(&outpoint.hash)
940                        .map(|tx| tx.transaction.transaction.clone())
941                        .and_then(|tx| {
942                            tx.outputs()
943                                .get(outpoint.index as usize)
944                                .map(|output| (tx.version(), output.clone()))
945                        })
946                    {
947                        trace!(?req, "answered mempool request");
948
949                        let last_seen_hash = *last_seen_tip_hash;
950                        return async move {
951                            Ok(Response::TransparentOutput(Some(CreatedOrSpent::Created {
952                                output,
953                                tx_version,
954                                last_seen_hash,
955                            })))
956                        }
957                        .boxed();
958                    }
959
960                    trace!(?req, "answered mempool request");
961
962                    async move { Ok(Response::TransparentOutput(None)) }.boxed()
963                }
964            },
965            ActiveState::Disabled => {
966                // TODO: add the name of the request, but not the content,
967                //       like the command() or Display impls of network requests
968                trace!("got mempool request while mempool is disabled");
969
970                // We can't return an error since that will cause a disconnection
971                // by the peer connection handler. Therefore, return successful
972                // empty responses.
973
974                let resp = match req {
975                    // Return empty responses for queries.
976                    Request::TransactionIds => Response::TransactionIds(Default::default()),
977
978                    Request::TransactionsById(_) => Response::Transactions(Default::default()),
979                    Request::TransactionsByMinedId(_) => Response::Transactions(Default::default()),
980                    Request::TransactionWithDepsByMinedId(_)
981                    | Request::AwaitOutput(_)
982                    | Request::UnspentOutput(_) => {
983                        return async move {
984                            Err("mempool is not active: wait for Zebra to sync to the tip".into())
985                        }
986                        .boxed()
987                    }
988
989                    Request::FullTransactions => {
990                        return async move {
991                            Err("mempool is not active: wait for Zebra to sync to the tip".into())
992                        }
993                        .boxed()
994                    }
995
996                    Request::RejectedTransactionIds(_) => {
997                        Response::RejectedTransactionIds(Default::default())
998                    }
999
1000                    // Don't queue mempool candidates, because there is no queue.
1001                    Request::Queue(gossiped_txs) => Response::Queued(
1002                        // Special case; we can signal the error inside the response,
1003                        // because the inbound service ignores inner errors.
1004                        iter::repeat_n(MempoolError::Disabled, gossiped_txs.len())
1005                            .map(BoxError::from)
1006                            .map(Err)
1007                            .collect(),
1008                    ),
1009
1010                    // Check if the mempool should be enabled.
1011                    // This request makes sure mempools are debug-enabled in the acceptance tests.
1012                    Request::CheckForVerifiedTransactions => {
1013                        // all the work for this request is done in poll_ready
1014                        Response::CheckedForVerifiedTransactions
1015                    }
1016
1017                    // Return empty mempool stats
1018                    Request::QueueStats => Response::QueueStats {
1019                        size: 0,
1020                        bytes: 0,
1021                        usage: 0,
1022                        fully_notified: None,
1023                    },
1024                };
1025
1026                async move { Ok(resp) }.boxed()
1027            }
1028        }
1029    }
1030}
1031
1032impl Drop for Mempool {
1033    fn drop(&mut self) {
1034        self.disable_metrics();
1035    }
1036}