1use std::{
22 collections::HashSet,
23 future::Future,
24 iter,
25 pin::{pin, Pin},
26 task::{Context, Poll},
27};
28
29use futures::{future::FutureExt, stream::Stream};
30use tokio::sync::{broadcast, mpsc, oneshot};
31use tower::{buffer::Buffer, timeout::Timeout, util::BoxService, Service};
32
33use zebra_chain::{
34 block::{self, Height},
35 chain_sync_status::ChainSyncStatus,
36 chain_tip::ChainTip,
37 transaction::UnminedTxId,
38};
39use zebra_consensus::{error::TransactionError, transaction};
40use zebra_network::{self as zn, PeerSocketAddr};
41use zebra_node_services::mempool::{
42 CreatedOrSpent, Gossip, MempoolChange, MempoolTxSubscriber, Request, Response,
43};
44use zebra_state as zs;
45use zebra_state::{ChainTipChange, TipAction};
46
47use crate::components::sync::SyncStatus;
48
49pub mod config;
50mod crawler;
51pub mod downloads;
52mod error;
53pub mod gossip;
54mod pending_outputs;
55mod queue_checker;
56mod storage;
57
58#[cfg(test)]
59mod tests;
60
61pub use crate::BoxError;
62
63pub use config::Config;
64pub use crawler::Crawler;
65pub use error::MempoolError;
66pub use gossip::gossip_mempool_transaction_id;
67pub use queue_checker::QueueChecker;
68pub use storage::{
69 ExactTipRejectionError, SameEffectsChainRejectionError, SameEffectsTipRejectionError, Storage,
70};
71
72#[cfg(test)]
73pub use self::tests::UnboxMempoolError;
74
75use downloads::{
76 Downloads as TxDownloads, TransactionDownloadVerifyError, TRANSACTION_DOWNLOAD_TIMEOUT,
77 TRANSACTION_VERIFY_TIMEOUT,
78};
79
/// A buffered, type-erased `zebra_network` service handle, taking [`zn::Request`]s
/// and returning [`zn::Response`]s.
///
/// The mempool clones this handle into its transaction downloader.
type Outbound = Buffer<BoxService<zn::Request, zn::Response, zn::BoxError>, zn::Request>;
/// A buffered, type-erased `zebra_state` service handle, taking [`zs::Request`]s
/// and returning [`zs::Response`]s.
type State = Buffer<BoxService<zs::Request, zs::Response, zs::BoxError>, zs::Request>;
/// A buffered, type-erased transaction verifier service handle, which fails with
/// a [`TransactionError`].
type TxVerifier = Buffer<
    BoxService<transaction::Request, transaction::Response, TransactionError>,
    transaction::Request,
>;
/// The concrete downloader type used by the mempool: the network and verifier
/// services are each wrapped in a [`Timeout`], the state service is used as-is.
type InboundTxDownloads = TxDownloads<Timeout<Outbound>, Timeout<TxVerifier>, State>;
87
/// The runtime state of the mempool: either disabled, or enabled together with
/// all the data that only exists while it is enabled.
///
/// Replacing an `Enabled` value with `Disabled` drops the storage and the
/// download/verify tasks it owns.
//
// `Enabled` is much larger than the field-less `Disabled` variant, which is what
// the `large_enum_variant` lint reports; the lint is silenced here instead of
// boxing the variant's fields.
#[allow(clippy::large_enum_variant)]
#[derive(Default)]
enum ActiveState {
    /// The mempool is disabled. This is the default state.
    #[default]
    Disabled,

    /// The mempool is enabled.
    Enabled {
        /// The transactions currently held by the mempool (and the rejection
        /// bookkeeping queried through it).
        storage: Storage,

        /// The pinned, boxed downloader that drives transaction download and
        /// verification tasks.
        tx_downloads: Pin<Box<InboundTxDownloads>>,

        /// The hash of the most recent chain tip seen by the mempool. It is set
        /// on activation and updated whenever the tip grows.
        last_seen_tip_hash: block::Hash,
    },
}
120
121impl ActiveState {
122 fn take(&mut self) -> Self {
124 std::mem::take(self)
125 }
126
127 fn transaction_retry_requests(&self) -> Vec<Gossip> {
129 match self {
130 ActiveState::Disabled => Vec::new(),
131 ActiveState::Enabled {
132 storage,
133 tx_downloads,
134 ..
135 } => {
136 let mut transactions = Vec::new();
137
138 let storage = storage
139 .transactions()
140 .values()
141 .map(|tx| tx.transaction.clone().into());
142 transactions.extend(storage);
143
144 let pending = tx_downloads.transaction_requests().cloned();
145 transactions.extend(pending);
146
147 transactions
148 }
149 }
150 }
151
152 #[cfg(feature = "progress-bar")]
155 fn queued_transaction_count(&self) -> usize {
156 match self {
157 ActiveState::Disabled => 0,
158 ActiveState::Enabled { tx_downloads, .. } => tx_downloads.in_flight(),
159 }
160 }
161
162 #[cfg(feature = "progress-bar")]
164 fn transaction_count(&self) -> usize {
165 match self {
166 ActiveState::Disabled => 0,
167 ActiveState::Enabled { storage, .. } => storage.transaction_count(),
168 }
169 }
170
171 #[cfg(feature = "progress-bar")]
174 fn total_cost(&self) -> u64 {
175 match self {
176 ActiveState::Disabled => 0,
177 ActiveState::Enabled { storage, .. } => storage.total_cost(),
178 }
179 }
180
181 #[cfg(feature = "progress-bar")]
186 pub fn total_serialized_size(&self) -> usize {
187 match self {
188 ActiveState::Disabled => 0,
189 ActiveState::Enabled { storage, .. } => storage.total_serialized_size(),
190 }
191 }
192
193 #[cfg(feature = "progress-bar")]
196 fn rejected_transaction_count(&mut self) -> usize {
197 match self {
198 ActiveState::Disabled => 0,
199 ActiveState::Enabled { storage, .. } => storage.rejected_transaction_count(),
200 }
201 }
202}
203
/// The mempool service.
///
/// Holds the handles and configuration needed to (re)build the active state, and
/// the channels used to report mempool changes and peer misbehaviour.
pub struct Mempool {
    /// The mempool configuration. It is passed to the storage each time the
    /// mempool is activated.
    config: Config,

    /// Whether the mempool is currently enabled, and its data if it is.
    active_state: ActiveState,

    /// Used to check whether the node is close to the chain tip.
    sync_status: SyncStatus,

    /// If set, the mempool is treated as "close to the tip" once the best tip
    /// height reaches this height, regardless of the sync status.
    debug_enable_at_height: Option<Height>,

    /// Used to read the current best chain tip height.
    latest_chain_tip: zs::LatestChainTip,

    /// Queried on every readiness poll for the latest chain tip change.
    chain_tip_change: ChainTipChange,

    /// Handle to the network service. Cloned into the downloader, wrapped in the
    /// download timeout, on activation.
    outbound: Outbound,

    /// Handle to the state service. Cloned into the downloader on activation.
    state: State,

    /// Handle to the transaction verifier service. Cloned into the downloader,
    /// wrapped in the verify timeout, on activation.
    tx_verifier: TxVerifier,

    /// Broadcast channel used to announce added, invalidated, and mined
    /// transaction ids.
    transaction_sender: broadcast::Sender<MempoolChange>,

    /// Channel used to report a misbehaviour score for the peer that advertised
    /// a transaction which failed verification.
    misbehavior_sender: mpsc::Sender<(PeerSocketAddr, u32)>,

    /// Progress bar showing the number of in-flight download/verify tasks.
    #[cfg(feature = "progress-bar")]
    queued_count_bar: Option<howudoin::Tx>,

    /// Progress bar showing the number of transactions in the mempool.
    #[cfg(feature = "progress-bar")]
    transaction_count_bar: Option<howudoin::Tx>,

    /// Progress bar showing the total cost of the transactions in the mempool.
    #[cfg(feature = "progress-bar")]
    transaction_cost_bar: Option<howudoin::Tx>,

    /// Progress bar showing the number of rejected transaction ids.
    #[cfg(feature = "progress-bar")]
    rejected_count_bar: Option<howudoin::Tx>,
}
270
271impl Mempool {
272 #[allow(clippy::too_many_arguments)]
273 pub(crate) fn new(
274 config: &Config,
275 outbound: Outbound,
276 state: State,
277 tx_verifier: TxVerifier,
278 sync_status: SyncStatus,
279 latest_chain_tip: zs::LatestChainTip,
280 chain_tip_change: ChainTipChange,
281 misbehavior_sender: mpsc::Sender<(PeerSocketAddr, u32)>,
282 ) -> (Self, MempoolTxSubscriber) {
283 let (transaction_sender, _) =
284 tokio::sync::broadcast::channel(gossip::MAX_CHANGES_BEFORE_SEND * 2);
285 let transaction_subscriber = MempoolTxSubscriber::new(transaction_sender.clone());
286
287 let mut service = Mempool {
288 config: config.clone(),
289 active_state: ActiveState::Disabled,
290 sync_status,
291 debug_enable_at_height: config.debug_enable_at_height.map(Height),
292 latest_chain_tip,
293 chain_tip_change,
294 outbound,
295 state,
296 tx_verifier,
297 transaction_sender,
298 misbehavior_sender,
299 #[cfg(feature = "progress-bar")]
300 queued_count_bar: None,
301 #[cfg(feature = "progress-bar")]
302 transaction_count_bar: None,
303 #[cfg(feature = "progress-bar")]
304 transaction_cost_bar: None,
305 #[cfg(feature = "progress-bar")]
306 rejected_count_bar: None,
307 };
308
309 service.update_state(None);
312
313 (service, transaction_subscriber)
314 }
315
316 fn is_enabled_by_debug(&self) -> bool {
318 let mut is_debug_enabled = false;
319
320 if self.debug_enable_at_height.is_none() {
322 return is_debug_enabled;
323 }
324
325 let enable_at_height = self
326 .debug_enable_at_height
327 .expect("unexpected debug_enable_at_height: just checked for None");
328
329 if let Some(best_tip_height) = self.latest_chain_tip.best_tip_height() {
330 is_debug_enabled = best_tip_height >= enable_at_height;
331
332 if is_debug_enabled && !self.is_enabled() {
333 info!(
334 ?best_tip_height,
335 ?enable_at_height,
336 "enabling mempool for debugging"
337 );
338 }
339 }
340
341 is_debug_enabled
342 }
343
344 fn update_state(&mut self, tip_action: Option<&TipAction>) -> bool {
352 let is_close_to_tip = self.sync_status.is_close_to_tip() || self.is_enabled_by_debug();
353
354 match (is_close_to_tip, self.is_enabled(), tip_action) {
355 (false, false, _) | (true, true, _) | (true, false, None) => return false,
357
358 (true, false, Some(tip_action)) => {
360 let (last_seen_tip_hash, tip_height) = tip_action.best_tip_hash_and_height();
361
362 info!(?tip_height, "activating mempool: Zebra is close to the tip");
363
364 let tx_downloads = Box::pin(TxDownloads::new(
365 Timeout::new(self.outbound.clone(), TRANSACTION_DOWNLOAD_TIMEOUT),
366 Timeout::new(self.tx_verifier.clone(), TRANSACTION_VERIFY_TIMEOUT),
367 self.state.clone(),
368 ));
369 self.active_state = ActiveState::Enabled {
370 storage: storage::Storage::new(&self.config),
371 tx_downloads,
372 last_seen_tip_hash,
373 };
374 }
375
376 (false, true, _) => {
378 info!(
379 tip_height = ?self.latest_chain_tip.best_tip_height(),
380 "deactivating mempool: Zebra is syncing lots of blocks"
381 );
382
383 self.active_state = ActiveState::Disabled;
386 }
387 };
388
389 true
390 }
391
392 pub fn is_enabled(&self) -> bool {
394 match self.active_state {
395 ActiveState::Disabled => false,
396 ActiveState::Enabled { .. } => true,
397 }
398 }
399
400 fn remove_expired_from_peer_list(
402 send_to_peers_ids: &HashSet<UnminedTxId>,
403 expired_transactions: &HashSet<UnminedTxId>,
404 ) -> HashSet<UnminedTxId> {
405 send_to_peers_ids
406 .iter()
407 .filter(|id| !expired_transactions.contains(id))
408 .copied()
409 .collect()
410 }
411
    /// Creates (if needed) and refreshes the mempool progress bars.
    ///
    /// Every statement in the body is gated on the `progress-bar` feature, so
    /// this is a no-op when that feature is off.
    fn update_metrics(&mut self) {
        // If progress reporting has been cancelled, close any bars and stop.
        #[cfg(feature = "progress-bar")]
        if matches!(howudoin::cancelled(), Some(true)) {
            self.disable_metrics();
            return;
        }

        // Create the bars once the mempool is enabled, if any of them is missing.
        #[cfg(feature = "progress-bar")]
        if self.is_enabled()
            && (self.queued_count_bar.is_none()
                || self.transaction_count_bar.is_none()
                || self.transaction_cost_bar.is_none()
                || self.rejected_count_bar.is_none())
        {
            // Computed but currently unused (note the leading underscore).
            let _max_transaction_count = self.config.tx_cost_limit
                / zebra_chain::transaction::MEMPOOL_TRANSACTION_COST_THRESHOLD;

            // The bars form a chain: each new bar is created with the previous
            // bar as its parent.
            let transaction_count_bar = *howudoin::new_root()
                .label("Mempool Transactions")
                .set_pos(0u64);
            let transaction_cost_bar = howudoin::new_with_parent(transaction_count_bar.id())
                .label("Mempool Cost")
                .set_pos(0u64)
                .fmt_as_bytes(true);

            let queued_count_bar = *howudoin::new_with_parent(transaction_cost_bar.id())
                .label("Mempool Queue")
                .set_pos(0u64);
            let rejected_count_bar = *howudoin::new_with_parent(queued_count_bar.id())
                .label("Mempool Rejects")
                .set_pos(0u64);
            self.transaction_count_bar = Some(transaction_count_bar);
            self.transaction_cost_bar = Some(transaction_cost_bar);
            self.queued_count_bar = Some(queued_count_bar);
            self.rejected_count_bar = Some(rejected_count_bar);
        }

        // Refresh the bar positions, but only when all four bars exist.
        #[cfg(feature = "progress-bar")]
        if let (
            Some(queued_count_bar),
            Some(transaction_count_bar),
            Some(transaction_cost_bar),
            Some(rejected_count_bar),
        ) = (
            self.queued_count_bar,
            self.transaction_count_bar,
            self.transaction_cost_bar,
            self.rejected_count_bar,
        ) {
            // All of these counts are zero while the mempool is disabled.
            let queued_count = self.active_state.queued_transaction_count();
            let transaction_count = self.active_state.transaction_count();

            let transaction_cost = self.active_state.total_cost();
            let transaction_size = self.active_state.total_serialized_size();
            // Human-readable byte count, shown in the cost bar's description.
            let transaction_size =
                indicatif::HumanBytes(transaction_size.try_into().expect("fits in u64"));

            let rejected_count = self.active_state.rejected_transaction_count();

            queued_count_bar.set_pos(u64::try_from(queued_count).expect("fits in u64"));

            transaction_count_bar.set_pos(u64::try_from(transaction_count).expect("fits in u64"));

            // The bar position is the total cost; the description carries the
            // actual serialized size.
            transaction_cost_bar
                .set_pos(transaction_cost)
                .desc(format!("Actual size {transaction_size}"));

            rejected_count_bar.set_pos(u64::try_from(rejected_count).expect("fits in u64"));
        }
    }
501
502 fn disable_metrics(&self) {
504 #[cfg(feature = "progress-bar")]
505 {
506 if let Some(bar) = self.queued_count_bar {
507 bar.close()
508 }
509 if let Some(bar) = self.transaction_count_bar {
510 bar.close()
511 }
512 if let Some(bar) = self.transaction_cost_bar {
513 bar.close()
514 }
515 if let Some(bar) = self.rejected_count_bar {
516 bar.close()
517 }
518 }
519 }
520}
521
522impl Service<Request> for Mempool {
523 type Response = Response;
524 type Error = BoxError;
525 type Future =
526 Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>;
527
    /// Brings the mempool up to date with the chain tip and drives its
    /// download/verify tasks.
    ///
    /// In order, this:
    /// 1. enables or disables the mempool based on the latest tip change,
    /// 2. on a chain reset, rebuilds the active state and re-queues every known
    ///    transaction for verification,
    /// 3. otherwise collects finished download/verify results into storage,
    ///    removes mined and expired transactions, and broadcasts the resulting
    ///    added / invalidated / mined id sets.
    ///
    /// Every path in this function returns `Poll::Ready`; the only error path is
    /// a failed send on the `MempoolChange` broadcast channel.
    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        let tip_action = self.chain_tip_change.last_tip_change();

        // `true` if the mempool was just enabled or disabled by this call.
        let is_state_changed = self.update_state(tip_action.as_ref());

        tracing::trace!(is_enabled = ?self.is_enabled(), ?is_state_changed, "started polling the mempool...");

        // A disabled mempool has nothing to drive, but it is still ready to
        // answer requests (see the `Disabled` branch of `call`).
        if !self.is_enabled() {
            self.update_metrics();

            return Poll::Ready(Ok(()));
        }

        // The mempool was already enabled and the tip was reset: throw away the
        // current state and re-verify everything against the new chain.
        if !is_state_changed && matches!(tip_action, Some(TipAction::Reset { .. })) {
            info!(
                // The `unwrap()` cannot fail: the `matches!` above just checked for `Some`.
                tip_height = ?tip_action.as_ref().unwrap().best_tip_height(),
                "resetting mempool: switched best chain, skipped blocks, or activated network upgrade"
            );

            // Leaves `ActiveState::Disabled` in place, so the `update_state()`
            // call below can re-activate with fresh storage and downloads.
            let previous_state = self.active_state.take();
            let tx_retries = previous_state.transaction_retry_requests();

            // Explicitly drop the old storage and downloader before the new
            // state is created.
            std::mem::drop(previous_state);

            self.update_state(tip_action.as_ref());

            // Only re-queue if the mempool was actually re-enabled.
            if let ActiveState::Enabled { tx_downloads, .. } = &mut self.active_state {
                info!(
                    transactions = tx_retries.len(),
                    "re-verifying mempool transactions after a chain fork"
                );

                for tx in tx_retries {
                    // Best effort: a transaction that can't be queued is dropped.
                    let _result = tx_downloads.download_if_needed_and_verify(tx, None);
                }
            }

            self.update_metrics();

            return Poll::Ready(Ok(()));
        }

        if let ActiveState::Enabled {
            storage,
            tx_downloads,
            last_seen_tip_hash,
        } = &mut self.active_state
        {
            // Ids collected during this poll, broadcast at the end.
            let mut send_to_peers_ids = HashSet::<_>::new();
            let mut invalidated_ids = HashSet::<_>::new();
            let mut mined_mempool_ids = HashSet::<_>::new();

            let best_tip_height = self.latest_chain_tip.best_tip_height();

            // Drain every download/verify result that is ready right now.
            while let Poll::Ready(Some(result)) = pin!(&mut *tx_downloads).poll_next(cx) {
                match result {
                    // The transaction was downloaded (if needed) and verified.
                    Ok(Ok((tx, spent_mempool_outpoints, expected_tip_height, rsp_tx))) => {
                        // Only store the transaction if the tip height is still
                        // the one the result was produced for.
                        if best_tip_height == expected_tip_height {
                            let tx_id = tx.transaction.id;
                            let insert_result =
                                storage.insert(tx, spent_mempool_outpoints, best_tip_height);

                            tracing::trace!(
                                ?insert_result,
                                "got Ok(_) transaction verify, tried to store",
                            );

                            if let Ok(inserted_id) = insert_result {
                                send_to_peers_ids.insert(inserted_id);
                            } else {
                                invalidated_ids.insert(tx_id);
                            }

                            // Tell the requester (if any) how the insert went.
                            // A failed send is deliberately ignored.
                            if let Some(rsp_tx) = rsp_tx {
                                let _ = rsp_tx
                                    .send(insert_result.map(|_| ()).map_err(|err| err.into()));
                            }
                        } else {
                            tracing::trace!("chain grew during tx verification, retrying ..",);

                            // Re-queue the transaction, keeping the original
                            // response channel. Queueing errors are ignored.
                            let _result = tx_downloads
                                .download_if_needed_and_verify(tx.transaction.into(), rsp_tx);
                        }
                    }
                    // Download or verification failed.
                    Ok(Err(boxed_err)) => {
                        let (tx_id, error) = *boxed_err;
                        // Report the advertising peer when the transaction was
                        // invalid and the error carries a non-zero misbehaviour
                        // score. `try_send` never blocks; a failed send is ignored.
                        if let TransactionDownloadVerifyError::Invalid {
                            error,
                            advertiser_addr: Some(advertiser_addr),
                        } = &error
                        {
                            if error.mempool_misbehavior_score() != 0 {
                                let _ = self.misbehavior_sender.try_send((
                                    *advertiser_addr,
                                    error.mempool_misbehavior_score(),
                                ));
                            }
                        };

                        tracing::debug!(?tx_id, ?error, "mempool transaction failed to verify");

                        metrics::counter!("mempool.failed.verify.tasks.total", "reason" => error.to_string()).increment(1);

                        invalidated_ids.insert(tx_id);
                        storage.reject_if_needed(tx_id, error);
                    }
                    // The outer error is treated as a timeout: it is only logged
                    // and counted, the transaction is not marked as invalidated
                    // or rejected.
                    Err(_elapsed) => {
                        tracing::warn!("mempool transaction failed to verify due to timeout");

                        metrics::counter!("mempool.failed.verify.tasks.total", "reason" => "timeout").increment(1);
                    }
                };
            }

            // A block was added to the tip: drop everything it mined.
            if let Some(TipAction::Grow { block }) = tip_action {
                tracing::trace!(block_height = ?block.height, "handling blocks added to tip");
                *last_seen_tip_hash = block.hash;

                let mined_ids = block.transaction_hashes.iter().cloned().collect();
                // Stop working on transactions that are already mined.
                tx_downloads.cancel(&mined_ids);
                storage.clear_mined_dependencies(&mined_ids);

                let storage::RemovedTransactionIds { mined, invalidated } =
                    storage.reject_and_remove_same_effects(&mined_ids, block.transactions);

                storage.clear_tip_rejections();

                mined_mempool_ids.extend(mined);
                invalidated_ids.extend(invalidated);
            }

            // Remove transactions that have expired at the current tip height.
            if let Some(tip_height) = best_tip_height {
                let expired_transactions = storage.remove_expired_transactions(tip_height);
                // Never advertise a transaction that has just expired.
                send_to_peers_ids =
                    Self::remove_expired_from_peer_list(&send_to_peers_ids, &expired_transactions);

                if !expired_transactions.is_empty() {
                    tracing::debug!(
                        ?expired_transactions,
                        "removed expired transactions from the mempool",
                    );

                    invalidated_ids.extend(expired_transactions);
                }
            }

            // Broadcast the non-empty id sets.
            //
            // NOTE(review): each `?` below turns a failed broadcast send into a
            // `poll_ready` error. Presumably some receiver always exists while
            // the service runs -- confirm, since tokio's broadcast `send` fails
            // when there are no active receivers.
            if !send_to_peers_ids.is_empty() {
                tracing::trace!(
                    ?send_to_peers_ids,
                    "sending new transactions to peers and RPC listeners"
                );

                self.transaction_sender
                    .send(MempoolChange::added(send_to_peers_ids))?;
            }

            if !invalidated_ids.is_empty() {
                tracing::trace!(
                    ?invalidated_ids,
                    "sending invalidated transactions to RPC listeners"
                );

                self.transaction_sender
                    .send(MempoolChange::invalidated(invalidated_ids))?;
            }

            if !mined_mempool_ids.is_empty() {
                tracing::trace!(
                    ?mined_mempool_ids,
                    "sending mined transactions to RPC listeners"
                );

                self.transaction_sender
                    .send(MempoolChange::mined(mined_mempool_ids))?;
            }
        }

        self.update_metrics();

        Poll::Ready(Ok(()))
    }
756
    /// Answers a mempool [`Request`].
    ///
    /// When the mempool is enabled, queries are answered from storage and
    /// `Queue` requests are handed to the downloader. When it is disabled,
    /// list-style queries get empty responses, `Queue` requests get one
    /// [`MempoolError::Disabled`] per transaction, and the remaining queries
    /// fail with a "mempool is not active" error.
    ///
    /// All the work is done synchronously inside this call; the returned future
    /// only wraps the already-computed response (except for `AwaitOutput`, which
    /// returns the pending-output future itself).
    #[instrument(name = "mempool", skip(self, req))]
    fn call(&mut self, req: Request) -> Self::Future {
        match &mut self.active_state {
            ActiveState::Enabled {
                storage,
                tx_downloads,
                last_seen_tip_hash,
            } => match req {
                // All transaction ids currently in storage.
                Request::TransactionIds => {
                    trace!(?req, "got mempool request");

                    let res: HashSet<_> = storage.tx_ids().collect();

                    trace!(?req, res_count = ?res.len(), "answered mempool request");

                    async move { Ok(Response::TransactionIds(res)) }.boxed()
                }

                // Transactions looked up with `transactions_exact`.
                Request::TransactionsById(ref ids) => {
                    trace!(?req, "got mempool request");

                    let res: Vec<_> = storage.transactions_exact(ids.clone()).cloned().collect();

                    trace!(?req, res_count = ?res.len(), "answered mempool request");

                    async move { Ok(Response::Transactions(res)) }.boxed()
                }
                // Transactions looked up with `transactions_same_effects`.
                Request::TransactionsByMinedId(ref ids) => {
                    trace!(?req, "got mempool request");

                    let res: Vec<_> = storage
                        .transactions_same_effects(ids.clone())
                        .cloned()
                        .collect();

                    trace!(?req, res_count = ?res.len(), "answered mempool request");

                    async move { Ok(Response::Transactions(res)) }.boxed()
                }
                // A single transaction plus its dependencies, or an error if it
                // is not in the mempool.
                Request::TransactionWithDepsByMinedId(tx_id) => {
                    trace!(?req, "got mempool request");

                    let res = if let Some((transaction, dependencies)) =
                        storage.transaction_with_deps(tx_id)
                    {
                        Ok(Response::TransactionWithDeps {
                            transaction,
                            dependencies,
                        })
                    } else {
                        Err("transaction not found in mempool".into())
                    };

                    trace!(?req, ?res, "answered mempool request");

                    async move { res }.boxed()
                }

                // Wait for a transparent output to be created in the mempool.
                Request::AwaitOutput(outpoint) => {
                    trace!(?req, "got mempool request");

                    // Queue the response future first, then answer it straight
                    // away if the output already exists in storage.
                    let response_fut = storage.pending_outputs.queue(outpoint);

                    if let Some(output) = storage.created_output(&outpoint) {
                        storage.pending_outputs.respond(&outpoint, output)
                    }

                    trace!("answered mempool request");

                    response_fut.boxed()
                }

                // Every stored transaction, the dependency data, and the tip
                // hash they were last checked against.
                Request::FullTransactions => {
                    trace!(?req, "got mempool request");

                    let transactions: Vec<_> = storage.transactions().values().cloned().collect();
                    let transaction_dependencies = storage.transaction_dependencies().clone();

                    trace!(?req, transactions_count = ?transactions.len(), "answered mempool request");

                    let response = Response::FullTransactions {
                        transactions,
                        transaction_dependencies,
                        last_seen_tip_hash: *last_seen_tip_hash,
                    };

                    async move { Ok(response) }.boxed()
                }

                // The subset of `ids` that storage reports as rejected.
                Request::RejectedTransactionIds(ref ids) => {
                    trace!(?req, "got mempool request");

                    let res = storage.rejected_transactions(ids.clone()).collect();

                    trace!(?req, ?res, "answered mempool request");

                    async move { Ok(Response::RejectedTransactionIds(res)) }.boxed()
                }

                // Queue gossiped transactions (or ids) for download and verification.
                Request::Queue(gossiped_txs) => {
                    trace!(req_count = ?gossiped_txs.len(), "got mempool Queue request");

                    // One entry per requested transaction, in request order:
                    // either a receiver for the eventual verification result, or
                    // the reason the transaction was not queued.
                    let rsp: Vec<Result<oneshot::Receiver<Result<(), BoxError>>, BoxError>> =
                        gossiped_txs
                            .into_iter()
                            .map(
                                |gossiped_tx| -> Result<
                                    oneshot::Receiver<Result<(), BoxError>>,
                                    MempoolError,
                                > {
                                    let (rsp_tx, rsp_rx) = oneshot::channel();
                                    // Let storage veto the transaction before
                                    // spending any download/verify work on it.
                                    storage.should_download_or_verify(gossiped_tx.id())?;
                                    tx_downloads
                                        .download_if_needed_and_verify(gossiped_tx, Some(rsp_tx))?;

                                    Ok(rsp_rx)
                                },
                            )
                            .map(|result| result.map_err(BoxError::from))
                            .collect();

                    // The number of queued tasks may have changed.
                    self.update_metrics();

                    async move { Ok(Response::Queued(rsp)) }.boxed()
                }

                // Nothing is done here beyond acknowledging the request.
                // NOTE(review): presumably the caller relies on `poll_ready`
                // having processed verified transactions before this runs -- confirm.
                Request::CheckForVerifiedTransactions => {
                    trace!(?req, "got mempool request");

                    async move { Ok(Response::CheckedForVerifiedTransactions) }.boxed()
                }

                // Summary statistics about the stored transactions.
                Request::QueueStats => {
                    trace!(?req, "got mempool request");

                    let size = storage.transaction_count();

                    let bytes = storage.total_serialized_size();

                    // `usage` is reported as the serialized size, and
                    // `fully_notified` is always `None`.
                    let usage = bytes; let fully_notified = None;

                    trace!(size, bytes, usage, "answered mempool request");

                    async move {
                        Ok(Response::QueueStats {
                            size,
                            bytes,
                            usage,
                            fully_notified,
                        })
                    }
                    .boxed()
                }
                // Whether a transparent output is spent by, or created in, the mempool.
                Request::UnspentOutput(outpoint) => {
                    trace!(?req, "got mempool request");

                    // Spent takes priority over created.
                    if storage.has_spent_outpoint(&outpoint) {
                        trace!(?req, "answered mempool request");

                        return async move {
                            Ok(Response::TransparentOutput(Some(CreatedOrSpent::Spent)))
                        }
                        .boxed();
                    }

                    // Look up the creating transaction by `outpoint.hash`, then
                    // the output at `outpoint.index` within it.
                    if let Some((tx_version, output)) = storage
                        .transactions()
                        .get(&outpoint.hash)
                        .map(|tx| tx.transaction.transaction.clone())
                        .and_then(|tx| {
                            tx.outputs()
                                .get(outpoint.index as usize)
                                .map(|output| (tx.version(), output.clone()))
                        })
                    {
                        trace!(?req, "answered mempool request");

                        let last_seen_hash = *last_seen_tip_hash;
                        return async move {
                            Ok(Response::TransparentOutput(Some(CreatedOrSpent::Created {
                                output,
                                tx_version,
                                last_seen_hash,
                            })))
                        }
                        .boxed();
                    }

                    trace!(?req, "answered mempool request");

                    // The mempool neither spends nor creates this output.
                    async move { Ok(Response::TransparentOutput(None)) }.boxed()
                }
            },
            ActiveState::Disabled => {
                trace!("got mempool request while mempool is disabled");

                let resp = match req {
                    // List-style queries get empty responses.
                    Request::TransactionIds => Response::TransactionIds(Default::default()),

                    Request::TransactionsById(_) => Response::Transactions(Default::default()),
                    Request::TransactionsByMinedId(_) => Response::Transactions(Default::default()),
                    // These queries fail while the mempool is disabled.
                    Request::TransactionWithDepsByMinedId(_)
                    | Request::AwaitOutput(_)
                    | Request::UnspentOutput(_) => {
                        return async move {
                            Err("mempool is not active: wait for Zebra to sync to the tip".into())
                        }
                        .boxed()
                    }

                    // Fails with the same error as the arm above.
                    Request::FullTransactions => {
                        return async move {
                            Err("mempool is not active: wait for Zebra to sync to the tip".into())
                        }
                        .boxed()
                    }

                    Request::RejectedTransactionIds(_) => {
                        Response::RejectedTransactionIds(Default::default())
                    }

                    // One `Disabled` error per queued transaction, so the response
                    // has the same length as the request.
                    Request::Queue(gossiped_txs) => Response::Queued(
                        iter::repeat_n(MempoolError::Disabled, gossiped_txs.len())
                            .map(BoxError::from)
                            .map(Err)
                            .collect(),
                    ),

                    Request::CheckForVerifiedTransactions => {
                        Response::CheckedForVerifiedTransactions
                    }

                    // A disabled mempool reports zeroed statistics.
                    Request::QueueStats => Response::QueueStats {
                        size: 0,
                        bytes: 0,
                        usage: 0,
                        fully_notified: None,
                    },
                };

                async move { Ok(resp) }.boxed()
            }
        }
    }
1030}
1031
1032impl Drop for Mempool {
1033 fn drop(&mut self) {
1034 self.disable_metrics();
1035 }
1036}