1use std::{
22 collections::HashSet,
23 future::Future,
24 iter,
25 pin::{pin, Pin},
26 task::{Context, Poll},
27};
28
29use futures::{future::FutureExt, stream::Stream};
30use tokio::sync::{broadcast, mpsc, oneshot};
31use tower::{buffer::Buffer, timeout::Timeout, util::BoxService, Service};
32
33use zebra_chain::{
34 block::{self, Height},
35 chain_sync_status::ChainSyncStatus,
36 chain_tip::ChainTip,
37 transaction::UnminedTxId,
38};
39use zebra_consensus::{error::TransactionError, transaction};
40use zebra_network::{self as zn, PeerSocketAddr};
41use zebra_node_services::mempool::{
42 CreatedOrSpent, Gossip, MempoolChange, MempoolTxSubscriber, Request, Response,
43};
44use zebra_state as zs;
45use zebra_state::{ChainTipChange, TipAction};
46
47use crate::components::sync::SyncStatus;
48
49pub mod config;
50mod crawler;
51pub mod downloads;
52mod error;
53pub mod gossip;
54mod pending_outputs;
55mod queue_checker;
56mod storage;
57
58#[cfg(test)]
59mod tests;
60
61pub use crate::BoxError;
62
63pub use config::Config;
64pub use crawler::Crawler;
65pub use error::MempoolError;
66pub use gossip::gossip_mempool_transaction_id;
67pub use queue_checker::QueueChecker;
68pub use storage::{
69 ExactTipRejectionError, SameEffectsChainRejectionError, SameEffectsTipRejectionError, Storage,
70};
71
72#[cfg(test)]
73pub use self::tests::UnboxMempoolError;
74
75use downloads::{
76 Downloads as TxDownloads, TransactionDownloadVerifyError, TRANSACTION_DOWNLOAD_TIMEOUT,
77 TRANSACTION_VERIFY_TIMEOUT,
78};
79
/// Buffered, type-erased handle to the `zebra_network` (`zn`) service.
type Outbound = Buffer<BoxService<zn::Request, zn::Response, zn::BoxError>, zn::Request>;
/// Buffered, type-erased handle to the `zebra_state` (`zs`) service.
type State = Buffer<BoxService<zs::Request, zs::Response, zs::BoxError>, zs::Request>;
/// Buffered, type-erased handle to the transaction verifier service.
type TxVerifier = Buffer<
    BoxService<transaction::Request, transaction::Response, TransactionError>,
    transaction::Request,
>;
/// The transaction download-and-verify set used by the mempool, with the network and verifier
/// services each wrapped in a `Timeout`.
type InboundTxDownloads = TxDownloads<Timeout<Outbound>, Timeout<TxVerifier>, State>;
87
/// The state of the mempool: either switched off, or switched on together with everything it
/// needs to store and verify transactions.
//
// NOTE(review): the lint is presumably silenced because `Enabled` is far larger than the
// field-less `Disabled` variant — confirm before boxing or removing the `allow`.
#[allow(clippy::large_enum_variant)]
#[derive(Default)]
enum ActiveState {
    /// The mempool holds no storage and no downloads. This is the default state.
    #[default]
    Disabled,

    /// The mempool is accepting, verifying and storing transactions.
    Enabled {
        /// The transaction storage: verified transactions are inserted here, and rejections are
        /// recorded here.
        storage: Storage,

        /// The pinned, boxed set of transaction download-and-verify tasks.
        tx_downloads: Pin<Box<InboundTxDownloads>>,

        /// The hash of the most recent best tip seen by the mempool.
        ///
        /// Set when the mempool is activated, and updated each time the chain tip grows.
        last_seen_tip_hash: block::Hash,
    },
}
120
121impl ActiveState {
122 fn take(&mut self) -> Self {
124 std::mem::take(self)
125 }
126
127 fn transaction_retry_requests(&self) -> Vec<Gossip> {
129 match self {
130 ActiveState::Disabled => Vec::new(),
131 ActiveState::Enabled {
132 storage,
133 tx_downloads,
134 ..
135 } => {
136 let mut transactions = Vec::new();
137
138 let storage = storage
139 .transactions()
140 .values()
141 .map(|tx| tx.transaction.clone().into());
142 transactions.extend(storage);
143
144 let pending = tx_downloads.transaction_requests().cloned();
145 transactions.extend(pending);
146
147 transactions
148 }
149 }
150 }
151
152 #[cfg(feature = "progress-bar")]
155 fn queued_transaction_count(&self) -> usize {
156 match self {
157 ActiveState::Disabled => 0,
158 ActiveState::Enabled { tx_downloads, .. } => tx_downloads.in_flight(),
159 }
160 }
161
162 #[cfg(feature = "progress-bar")]
164 fn transaction_count(&self) -> usize {
165 match self {
166 ActiveState::Disabled => 0,
167 ActiveState::Enabled { storage, .. } => storage.transaction_count(),
168 }
169 }
170
171 #[cfg(feature = "progress-bar")]
174 fn total_cost(&self) -> u64 {
175 match self {
176 ActiveState::Disabled => 0,
177 ActiveState::Enabled { storage, .. } => storage.total_cost(),
178 }
179 }
180
181 #[cfg(feature = "progress-bar")]
186 pub fn total_serialized_size(&self) -> usize {
187 match self {
188 ActiveState::Disabled => 0,
189 ActiveState::Enabled { storage, .. } => storage.total_serialized_size(),
190 }
191 }
192
193 #[cfg(feature = "progress-bar")]
196 fn rejected_transaction_count(&mut self) -> usize {
197 match self {
198 ActiveState::Disabled => 0,
199 ActiveState::Enabled { storage, .. } => storage.rejected_transaction_count(),
200 }
201 }
202}
203
/// The mempool service.
///
/// Tracks whether the mempool should be active, and while it is, downloads, verifies and stores
/// unmined transactions and answers mempool requests.
pub struct Mempool {
    /// The configuration this mempool was built with; passed to `Storage::new` on activation.
    config: Config,

    /// Whether the mempool is enabled, and its storage and downloads while it is.
    active_state: ActiveState,

    /// Used to check whether Zebra is close to the chain tip.
    sync_status: SyncStatus,

    /// If set, the mempool is also treated as close to the tip once the best tip height
    /// reaches this height.
    debug_enable_at_height: Option<Height>,

    /// Source of the current best tip height.
    latest_chain_tip: zs::LatestChainTip,

    /// Polled for the latest tip change each time the service is polled for readiness.
    chain_tip_change: ChainTipChange,

    /// Handle to the network service; cloned into the download set on activation.
    outbound: Outbound,

    /// Handle to the state service; cloned into the download set on activation.
    state: State,

    /// Handle to the transaction verifier; cloned into the download set on activation.
    tx_verifier: TxVerifier,

    /// Broadcasts `MempoolChange`s for added, invalidated and mined transactions.
    transaction_sender: broadcast::Sender<MempoolChange>,

    /// Receives `(peer address, misbehavior score)` pairs for peers that advertised
    /// transactions which failed verification with a non-zero score.
    misbehavior_sender: mpsc::Sender<(PeerSocketAddr, u32)>,

    /// Progress bar showing the number of transactions in the download set.
    #[cfg(feature = "progress-bar")]
    queued_count_bar: Option<howudoin::Tx>,

    /// Progress bar showing the number of transactions in storage.
    #[cfg(feature = "progress-bar")]
    transaction_count_bar: Option<howudoin::Tx>,

    /// Progress bar showing the total cost of stored transactions, described with their
    /// serialized size.
    #[cfg(feature = "progress-bar")]
    transaction_cost_bar: Option<howudoin::Tx>,

    /// Progress bar showing the number of rejected transaction ids.
    #[cfg(feature = "progress-bar")]
    rejected_count_bar: Option<howudoin::Tx>,
}
270
271impl Mempool {
272 #[allow(clippy::too_many_arguments)]
273 pub(crate) fn new(
274 config: &Config,
275 outbound: Outbound,
276 state: State,
277 tx_verifier: TxVerifier,
278 sync_status: SyncStatus,
279 latest_chain_tip: zs::LatestChainTip,
280 chain_tip_change: ChainTipChange,
281 misbehavior_sender: mpsc::Sender<(PeerSocketAddr, u32)>,
282 ) -> (Self, MempoolTxSubscriber) {
283 let (transaction_sender, _) =
284 tokio::sync::broadcast::channel(gossip::MAX_CHANGES_BEFORE_SEND * 2);
285 let transaction_subscriber = MempoolTxSubscriber::new(transaction_sender.clone());
286
287 let mut service = Mempool {
288 config: config.clone(),
289 active_state: ActiveState::Disabled,
290 sync_status,
291 debug_enable_at_height: config.debug_enable_at_height.map(Height),
292 latest_chain_tip,
293 chain_tip_change,
294 outbound,
295 state,
296 tx_verifier,
297 transaction_sender,
298 misbehavior_sender,
299 #[cfg(feature = "progress-bar")]
300 queued_count_bar: None,
301 #[cfg(feature = "progress-bar")]
302 transaction_count_bar: None,
303 #[cfg(feature = "progress-bar")]
304 transaction_cost_bar: None,
305 #[cfg(feature = "progress-bar")]
306 rejected_count_bar: None,
307 };
308
309 service.update_state(None);
312
313 (service, transaction_subscriber)
314 }
315
316 fn is_enabled_by_debug(&self) -> bool {
318 let mut is_debug_enabled = false;
319
320 if self.debug_enable_at_height.is_none() {
322 return is_debug_enabled;
323 }
324
325 let enable_at_height = self
326 .debug_enable_at_height
327 .expect("unexpected debug_enable_at_height: just checked for None");
328
329 if let Some(best_tip_height) = self.latest_chain_tip.best_tip_height() {
330 is_debug_enabled = best_tip_height >= enable_at_height;
331
332 if is_debug_enabled && !self.is_enabled() {
333 info!(
334 ?best_tip_height,
335 ?enable_at_height,
336 "enabling mempool for debugging"
337 );
338 }
339 }
340
341 is_debug_enabled
342 }
343
344 fn update_state(&mut self, tip_action: Option<&TipAction>) -> bool {
352 let is_close_to_tip = self.sync_status.is_close_to_tip() || self.is_enabled_by_debug();
353
354 match (is_close_to_tip, self.is_enabled(), tip_action) {
355 (false, false, _) | (true, true, _) | (true, false, None) => return false,
357
358 (true, false, Some(tip_action)) => {
360 let (last_seen_tip_hash, tip_height) = tip_action.best_tip_hash_and_height();
361
362 info!(?tip_height, "activating mempool: Zebra is close to the tip");
363
364 let tx_downloads = Box::pin(TxDownloads::new(
365 Timeout::new(self.outbound.clone(), TRANSACTION_DOWNLOAD_TIMEOUT),
366 Timeout::new(self.tx_verifier.clone(), TRANSACTION_VERIFY_TIMEOUT),
367 self.state.clone(),
368 ));
369 self.active_state = ActiveState::Enabled {
370 storage: storage::Storage::new(&self.config),
371 tx_downloads,
372 last_seen_tip_hash,
373 };
374 }
375
376 (false, true, _) => {
378 info!(
379 tip_height = ?self.latest_chain_tip.best_tip_height(),
380 "deactivating mempool: Zebra is syncing lots of blocks"
381 );
382
383 self.active_state = ActiveState::Disabled;
386 }
387 };
388
389 true
390 }
391
392 pub fn is_enabled(&self) -> bool {
394 match self.active_state {
395 ActiveState::Disabled => false,
396 ActiveState::Enabled { .. } => true,
397 }
398 }
399
400 fn remove_expired_from_peer_list(
402 send_to_peers_ids: &HashSet<UnminedTxId>,
403 expired_transactions: &HashSet<UnminedTxId>,
404 ) -> HashSet<UnminedTxId> {
405 send_to_peers_ids
406 .iter()
407 .filter(|id| !expired_transactions.contains(id))
408 .copied()
409 .collect()
410 }
411
412 fn update_metrics(&mut self) {
414 #[cfg(feature = "progress-bar")]
416 if matches!(howudoin::cancelled(), Some(true)) {
417 self.disable_metrics();
418 return;
419 }
420
421 #[cfg(feature = "progress-bar")]
423 if self.is_enabled()
424 && (self.queued_count_bar.is_none()
425 || self.transaction_count_bar.is_none()
426 || self.transaction_cost_bar.is_none()
427 || self.rejected_count_bar.is_none())
428 {
429 let _max_transaction_count = self.config.tx_cost_limit
430 / zebra_chain::transaction::MEMPOOL_TRANSACTION_COST_THRESHOLD;
431
432 let transaction_count_bar = *howudoin::new_root()
433 .label("Mempool Transactions")
434 .set_pos(0u64);
435 let transaction_cost_bar = howudoin::new_with_parent(transaction_count_bar.id())
438 .label("Mempool Cost")
439 .set_pos(0u64)
440 .fmt_as_bytes(true);
442
443 let queued_count_bar = *howudoin::new_with_parent(transaction_cost_bar.id())
444 .label("Mempool Queue")
445 .set_pos(0u64);
446 let rejected_count_bar = *howudoin::new_with_parent(queued_count_bar.id())
451 .label("Mempool Rejects")
452 .set_pos(0u64);
453 self.transaction_count_bar = Some(transaction_count_bar);
458 self.transaction_cost_bar = Some(transaction_cost_bar);
459 self.queued_count_bar = Some(queued_count_bar);
460 self.rejected_count_bar = Some(rejected_count_bar);
461 }
462
463 #[cfg(feature = "progress-bar")]
465 if let (
466 Some(queued_count_bar),
467 Some(transaction_count_bar),
468 Some(transaction_cost_bar),
469 Some(rejected_count_bar),
470 ) = (
471 self.queued_count_bar,
472 self.transaction_count_bar,
473 self.transaction_cost_bar,
474 self.rejected_count_bar,
475 ) {
476 let queued_count = self.active_state.queued_transaction_count();
477 let transaction_count = self.active_state.transaction_count();
478
479 let transaction_cost = self.active_state.total_cost();
480 let transaction_size = self.active_state.total_serialized_size();
481 let transaction_size =
482 indicatif::HumanBytes(transaction_size.try_into().expect("fits in u64"));
483
484 let rejected_count = self.active_state.rejected_transaction_count();
485
486 queued_count_bar.set_pos(u64::try_from(queued_count).expect("fits in u64"));
487
488 transaction_count_bar.set_pos(u64::try_from(transaction_count).expect("fits in u64"));
489
490 transaction_cost_bar
495 .set_pos(transaction_cost)
496 .desc(format!("Actual size {transaction_size}"));
497
498 rejected_count_bar.set_pos(u64::try_from(rejected_count).expect("fits in u64"));
499 }
500 }
501
502 fn disable_metrics(&self) {
504 #[cfg(feature = "progress-bar")]
505 {
506 if let Some(bar) = self.queued_count_bar {
507 bar.close()
508 }
509 if let Some(bar) = self.transaction_count_bar {
510 bar.close()
511 }
512 if let Some(bar) = self.transaction_cost_bar {
513 bar.close()
514 }
515 if let Some(bar) = self.rejected_count_bar {
516 bar.close()
517 }
518 }
519 }
520}
521
522impl Service<Request> for Mempool {
523 type Response = Response;
524 type Error = BoxError;
525 type Future =
526 Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>;
527
    /// Drives the mempool forward, then reports that the service is ready.
    ///
    /// In order, this:
    /// 1. updates the enabled/disabled state from the latest tip change,
    /// 2. on a chain reset, replaces the enabled state with a fresh one and re-queues every
    ///    transaction the old state knew about,
    /// 3. otherwise collects finished download/verify results into storage, handles a newly
    ///    added tip block, removes expired transactions, and broadcasts the resulting
    ///    added, invalidated and mined transaction ids.
    ///
    /// This method never returns `Poll::Pending`. It returns an error only if one of the
    /// change broadcasts fails to send.
    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        let tip_action = self.chain_tip_change.last_tip_change();

        // Enable or disable the mempool before doing anything else.
        let is_state_changed = self.update_state(tip_action.as_ref());

        tracing::trace!(is_enabled = ?self.is_enabled(), ?is_state_changed, "started polling the mempool...");

        // A disabled mempool has no downloads or storage to update.
        if !self.is_enabled() {
            self.update_metrics();

            return Poll::Ready(Ok(()));
        }

        // The mempool was already enabled, and the tip was reset: start again from an empty
        // state. (If the state just changed, the mempool was activated with fresh contents.)
        if !is_state_changed && matches!(tip_action, Some(TipAction::Reset { .. })) {
            info!(
                // The `matches!` guard above ensures `tip_action` is `Some`.
                tip_height = ?tip_action.as_ref().unwrap().best_tip_height(),
                "resetting mempool: switched best chain, skipped blocks, or activated network upgrade"
            );

            // `take` leaves `Disabled` behind, so the `update_state` call below re-activates
            // the mempool with new storage and a new download set.
            let previous_state = self.active_state.take();
            let tx_retries = previous_state.transaction_retry_requests();

            // Drop the old storage and download set before creating their replacements.
            std::mem::drop(previous_state);

            self.update_state(tip_action.as_ref());

            if let ActiveState::Enabled { tx_downloads, .. } = &mut self.active_state {
                info!(
                    transactions = tx_retries.len(),
                    "re-verifying mempool transactions after a chain fork"
                );

                for tx in tx_retries {
                    // Queueing errors are deliberately ignored for retries.
                    let _result = tx_downloads.download_if_needed_and_verify(tx, None, None);
                }
            }

            self.update_metrics();

            return Poll::Ready(Ok(()));
        }

        if let ActiveState::Enabled {
            storage,
            tx_downloads,
            last_seen_tip_hash,
        } = &mut self.active_state
        {
            // Ids collected during this poll, broadcast as separate changes at the end.
            let mut send_to_peers_ids = HashSet::<_>::new();
            let mut invalidated_ids = HashSet::<_>::new();
            let mut mined_mempool_ids = HashSet::<_>::new();

            let best_tip_height = self.latest_chain_tip.best_tip_height();

            // Drain every download/verify result that is ready right now.
            while let Poll::Ready(Some(result)) = pin!(&mut *tx_downloads).poll_next(cx) {
                match result {
                    Ok(Ok((tx, spent_mempool_outpoints, expected_tip_height, rsp_tx))) => {
                        // Only store the transaction if the tip height has not changed since
                        // the height it was verified against.
                        if best_tip_height == expected_tip_height {
                            let tx_id = tx.transaction.id;
                            let insert_result =
                                storage.insert(tx, spent_mempool_outpoints, best_tip_height);

                            tracing::trace!(
                                ?insert_result,
                                "got Ok(_) transaction verify, tried to store",
                            );

                            if let Ok(inserted_id) = insert_result {
                                send_to_peers_ids.insert(inserted_id);
                            } else {
                                invalidated_ids.insert(tx_id);
                            }

                            // Report the outcome to the requester, if there is one.
                            // A dropped receiver is not an error.
                            if let Some(rsp_tx) = rsp_tx {
                                let _ = rsp_tx
                                    .send(insert_result.map(|_| ()).map_err(|err| err.into()));
                            }
                        } else {
                            tracing::trace!("chain grew during tx verification, retrying ..",);

                            // Re-queue the transaction, passing along the response channel.
                            let _result = tx_downloads.download_if_needed_and_verify(
                                tx.transaction.into(),
                                None,
                                rsp_tx,
                            );
                        }
                    }
                    Ok(Err(boxed_err)) => {
                        let (tx_id, error) = *boxed_err;
                        // Report the advertising peer when the verification error carries a
                        // non-zero misbehavior score. `try_send` failures are ignored.
                        if let TransactionDownloadVerifyError::Invalid {
                            error,
                            advertiser_addr: Some(advertiser_addr),
                        } = &error
                        {
                            if error.mempool_misbehavior_score() != 0 {
                                let _ = self.misbehavior_sender.try_send((
                                    *advertiser_addr,
                                    error.mempool_misbehavior_score(),
                                ));
                            }
                        };

                        tracing::debug!(?tx_id, ?error, "mempool transaction failed to verify");

                        metrics::counter!("mempool.failed.verify.tasks.total", "reason" => error.to_string()).increment(1);

                        invalidated_ids.insert(tx_id);
                        storage.reject_if_needed(tx_id, error);
                    }
                    // The outer error is a timeout; the transaction is not added to storage's
                    // rejections in this case.
                    Err((tx_id, _elapsed)) => {
                        tracing::info!(
                            ?tx_id,
                            "mempool transaction failed to verify due to timeout"
                        );

                        invalidated_ids.insert(tx_id);

                        metrics::counter!("mempool.failed.verify.tasks.total", "reason" => "timeout").increment(1);
                    }
                };
            }

            // A block was added to the tip: remove its transactions from the downloads and
            // from storage.
            if let Some(TipAction::Grow { block }) = tip_action {
                tracing::trace!(block_height = ?block.height, "handling blocks added to tip");
                *last_seen_tip_hash = block.hash;

                let mined_ids = block.transaction_hashes.iter().cloned().collect();
                tx_downloads.cancel(&mined_ids);
                storage.clear_mined_dependencies(&mined_ids);

                let storage::RemovedTransactionIds { mined, invalidated } =
                    storage.reject_and_remove_same_effects(&mined_ids, block.transactions);

                storage.clear_tip_rejections();

                mined_mempool_ids.extend(mined);
                invalidated_ids.extend(invalidated);
            }

            // Remove expired transactions, and make sure they are not announced as added.
            if let Some(tip_height) = best_tip_height {
                let expired_transactions = storage.remove_expired_transactions(tip_height);
                send_to_peers_ids =
                    Self::remove_expired_from_peer_list(&send_to_peers_ids, &expired_transactions);

                if !expired_transactions.is_empty() {
                    tracing::debug!(
                        ?expired_transactions,
                        "removed expired transactions from the mempool",
                    );

                    invalidated_ids.extend(expired_transactions);
                }
            }

            // Broadcast each non-empty change set. `?` turns a failed send into the error
            // returned from this poll.
            if !send_to_peers_ids.is_empty() {
                tracing::trace!(
                    ?send_to_peers_ids,
                    "sending new transactions to peers and RPC listeners"
                );

                self.transaction_sender
                    .send(MempoolChange::added(send_to_peers_ids))?;
            }

            if !invalidated_ids.is_empty() {
                tracing::trace!(
                    ?invalidated_ids,
                    "sending invalidated transactions to RPC listeners"
                );

                self.transaction_sender
                    .send(MempoolChange::invalidated(invalidated_ids))?;
            }

            if !mined_mempool_ids.is_empty() {
                tracing::trace!(
                    ?mined_mempool_ids,
                    "sending mined transactions to RPC listeners"
                );

                self.transaction_sender
                    .send(MempoolChange::mined(mined_mempool_ids))?;
            }
        }

        self.update_metrics();

        Poll::Ready(Ok(()))
    }
759
760 #[instrument(name = "mempool", skip(self, req))]
765 fn call(&mut self, req: Request) -> Self::Future {
766 match &mut self.active_state {
767 ActiveState::Enabled {
768 storage,
769 tx_downloads,
770 last_seen_tip_hash,
771 } => match req {
772 Request::TransactionIds => {
774 trace!(?req, "got mempool request");
775
776 let res: HashSet<_> = storage.tx_ids().collect();
777
778 trace!(?req, res_count = ?res.len(), "answered mempool request");
779
780 async move { Ok(Response::TransactionIds(res)) }.boxed()
781 }
782
783 Request::TransactionsById(ref ids) => {
784 trace!(?req, "got mempool request");
785
786 let res: Vec<_> = storage.transactions_exact(ids.clone()).cloned().collect();
787
788 trace!(?req, res_count = ?res.len(), "answered mempool request");
789
790 async move { Ok(Response::Transactions(res)) }.boxed()
791 }
792 Request::TransactionsByMinedId(ref ids) => {
793 trace!(?req, "got mempool request");
794
795 let res: Vec<_> = storage
796 .transactions_same_effects(ids.clone())
797 .cloned()
798 .collect();
799
800 trace!(?req, res_count = ?res.len(), "answered mempool request");
801
802 async move { Ok(Response::Transactions(res)) }.boxed()
803 }
804 Request::TransactionWithDepsByMinedId(tx_id) => {
805 trace!(?req, "got mempool request");
806
807 let res = if let Some((transaction, dependencies)) =
808 storage.transaction_with_deps(tx_id)
809 {
810 Ok(Response::TransactionWithDeps {
811 transaction,
812 dependencies,
813 })
814 } else {
815 Err("transaction not found in mempool".into())
816 };
817
818 trace!(?req, ?res, "answered mempool request");
819
820 async move { res }.boxed()
821 }
822
823 Request::AwaitOutput(outpoint) => {
824 trace!(?req, "got mempool request");
825
826 let response_fut = storage.pending_outputs.queue(outpoint);
827
828 if let Some(output) = storage.created_output(&outpoint) {
829 storage.pending_outputs.respond(&outpoint, output)
830 }
831
832 trace!("answered mempool request");
833
834 response_fut.boxed()
835 }
836
837 Request::FullTransactions => {
838 trace!(?req, "got mempool request");
839
840 let transactions: Vec<_> = storage.transactions().values().cloned().collect();
841 let transaction_dependencies = storage.transaction_dependencies().clone();
842
843 trace!(?req, transactions_count = ?transactions.len(), "answered mempool request");
844
845 let response = Response::FullTransactions {
846 transactions,
847 transaction_dependencies,
848 last_seen_tip_hash: *last_seen_tip_hash,
849 };
850
851 async move { Ok(response) }.boxed()
852 }
853
854 Request::RejectedTransactionIds(ref ids) => {
855 trace!(?req, "got mempool request");
856
857 let res = storage.rejected_transactions(ids.clone()).collect();
858
859 trace!(?req, ?res, "answered mempool request");
860
861 async move { Ok(Response::RejectedTransactionIds(res)) }.boxed()
862 }
863
864 Request::Queue(gossiped_txs) => {
866 trace!(req_count = ?gossiped_txs.len(), "got mempool Queue request");
867
868 let rsp: Vec<Result<oneshot::Receiver<Result<(), BoxError>>, BoxError>> =
869 gossiped_txs
870 .into_iter()
871 .map(
872 |gossiped_tx| -> Result<
873 oneshot::Receiver<Result<(), BoxError>>,
874 MempoolError,
875 > {
876 let (rsp_tx, rsp_rx) = oneshot::channel();
877 storage.should_download_or_verify(gossiped_tx.id())?;
878 tx_downloads.download_if_needed_and_verify(
879 gossiped_tx,
880 None,
881 Some(rsp_tx),
882 )?;
883
884 Ok(rsp_rx)
885 },
886 )
887 .map(|result| result.map_err(BoxError::from))
888 .collect();
889
890 self.update_metrics();
892
893 async move { Ok(Response::Queued(rsp)) }.boxed()
894 }
895
896 Request::QueueFromPeer { txids, source } => {
899 trace!(req_count = ?txids.len(), ?source, "got mempool QueueFromPeer request");
900
901 for txid in txids {
902 if storage.should_download_or_verify(txid).is_err() {
903 continue;
904 }
905 let _ = tx_downloads.download_if_needed_and_verify(
906 Gossip::Id(txid),
907 Some(source),
908 None,
909 );
910 }
911
912 self.update_metrics();
913
914 async move { Ok(Response::Queued(Vec::new())) }.boxed()
915 }
916
917 Request::CheckForVerifiedTransactions => {
919 trace!(?req, "got mempool request");
920
921 async move { Ok(Response::CheckedForVerifiedTransactions) }.boxed()
923 }
924
925 Request::QueueStats => {
929 trace!(?req, "got mempool request");
930
931 let size = storage.transaction_count();
932
933 let bytes = storage.total_serialized_size();
934
935 let usage = bytes; let fully_notified = None;
939
940 trace!(size, bytes, usage, "answered mempool request");
941
942 async move {
943 Ok(Response::QueueStats {
944 size,
945 bytes,
946 usage,
947 fully_notified,
948 })
949 }
950 .boxed()
951 }
952 Request::UnspentOutput(outpoint) => {
953 trace!(?req, "got mempool request");
954
955 if storage.has_spent_outpoint(&outpoint) {
956 trace!(?req, "answered mempool request");
957
958 return async move {
959 Ok(Response::TransparentOutput(Some(CreatedOrSpent::Spent)))
960 }
961 .boxed();
962 }
963
964 if let Some((tx_version, output)) = storage
965 .transactions()
966 .get(&outpoint.hash)
967 .map(|tx| tx.transaction.transaction.clone())
968 .and_then(|tx| {
969 tx.outputs()
970 .get(outpoint.index as usize)
971 .map(|output| (tx.version(), output.clone()))
972 })
973 {
974 trace!(?req, "answered mempool request");
975
976 let last_seen_hash = *last_seen_tip_hash;
977 return async move {
978 Ok(Response::TransparentOutput(Some(CreatedOrSpent::Created {
979 output,
980 tx_version,
981 last_seen_hash,
982 })))
983 }
984 .boxed();
985 }
986
987 trace!(?req, "answered mempool request");
988
989 async move { Ok(Response::TransparentOutput(None)) }.boxed()
990 }
991 },
992 ActiveState::Disabled => {
993 trace!("got mempool request while mempool is disabled");
996
997 let resp = match req {
1002 Request::TransactionIds => Response::TransactionIds(Default::default()),
1004
1005 Request::TransactionsById(_) => Response::Transactions(Default::default()),
1006 Request::TransactionsByMinedId(_) => Response::Transactions(Default::default()),
1007 Request::TransactionWithDepsByMinedId(_)
1008 | Request::AwaitOutput(_)
1009 | Request::UnspentOutput(_) => {
1010 return async move {
1011 Err("mempool is not active: wait for Zebra to sync to the tip".into())
1012 }
1013 .boxed()
1014 }
1015
1016 Request::FullTransactions => {
1017 return async move {
1018 Err("mempool is not active: wait for Zebra to sync to the tip".into())
1019 }
1020 .boxed()
1021 }
1022
1023 Request::RejectedTransactionIds(_) => {
1024 Response::RejectedTransactionIds(Default::default())
1025 }
1026
1027 Request::Queue(gossiped_txs) => Response::Queued(
1029 iter::repeat_n(MempoolError::Disabled, gossiped_txs.len())
1032 .map(BoxError::from)
1033 .map(Err)
1034 .collect(),
1035 ),
1036
1037 Request::QueueFromPeer { .. } => Response::Queued(Vec::new()),
1039
1040 Request::CheckForVerifiedTransactions => {
1043 Response::CheckedForVerifiedTransactions
1045 }
1046
1047 Request::QueueStats => Response::QueueStats {
1049 size: 0,
1050 bytes: 0,
1051 usage: 0,
1052 fully_notified: None,
1053 },
1054 };
1055
1056 async move { Ok(resp) }.boxed()
1057 }
1058 }
1059 }
1060}
1061
/// Closes the mempool's progress bars when the service is dropped.
impl Drop for Mempool {
    fn drop(&mut self) {
        // `disable_metrics` has an empty body unless the `progress-bar` feature is enabled.
        self.disable_metrics();
    }
}