1use std::{
18 collections::HashMap,
19 future::Future,
20 pin::Pin,
21 sync::Arc,
22 task::{Context, Poll},
23 time::{Duration, Instant},
24};
25
26use futures::future::FutureExt;
27use tokio::sync::oneshot;
28use tower::{util::BoxService, Service, ServiceExt};
29use tracing::{instrument, Instrument, Span};
30
31#[cfg(any(test, feature = "proptest-impl"))]
32use tower::buffer::Buffer;
33
34use zebra_chain::{
35 block::{self, CountedHeader, HeightDiff},
36 diagnostic::CodeTimer,
37 parameters::{Network, NetworkUpgrade},
38 serialization::ZcashSerialize,
39 subtree::NoteCommitmentSubtreeIndex,
40};
41
42use crate::{
43 constants::{
44 MAX_FIND_BLOCK_HASHES_RESULTS, MAX_FIND_BLOCK_HEADERS_RESULTS, MAX_LEGACY_CHAIN_BLOCKS,
45 },
46 error::{CommitBlockError, CommitCheckpointVerifiedError, InvalidateError, ReconsiderError},
47 request::TimedSpan,
48 response::NonFinalizedBlocksListener,
49 service::{
50 block_iter::any_ancestor_blocks,
51 chain_tip::{ChainTipBlock, ChainTipChange, ChainTipSender, LatestChainTip},
52 finalized_state::{FinalizedState, ZebraDb},
53 non_finalized_state::{Chain, NonFinalizedState},
54 pending_utxos::PendingUtxos,
55 queued_blocks::QueuedBlocks,
56 read::find,
57 watch_receiver::WatchReceiver,
58 },
59 BoxError, CheckpointVerifiedBlock, CommitSemanticallyVerifiedError, Config, KnownBlock,
60 ReadRequest, ReadResponse, Request, Response, SemanticallyVerifiedBlock,
61};
62
63pub mod block_iter;
64pub mod chain_tip;
65pub mod watch_receiver;
66
67pub mod check;
68
69pub(crate) mod finalized_state;
70pub(crate) mod non_finalized_state;
71mod pending_utxos;
72mod queued_blocks;
73pub(crate) mod read;
74mod traits;
75mod write;
76
77#[cfg(any(test, feature = "proptest-impl"))]
78pub mod arbitrary;
79
80#[cfg(test)]
81mod tests;
82
83pub use finalized_state::{OutputLocation, TransactionIndex, TransactionLocation};
84use write::NonFinalizedWriteMessage;
85
86use self::queued_blocks::{QueuedCheckpointVerified, QueuedSemanticallyVerified, SentHashes};
87
88pub use self::traits::{ReadState, State};
89
/// The read-write state service.
///
/// Queues checkpoint-verified and semantically verified blocks, forwards them
/// to the block write task through `block_write_sender`, tracks pending UTXO
/// requests, and delegates read-only requests to an inner [`ReadStateService`].
#[derive(Debug)]
pub(crate) struct StateService {
    /// The network this state service was created for.
    network: Network,

    /// Checkpoint-verified blocks at or above this height are also recorded in
    /// `non_finalized_block_write_sent_hashes` when they are queued.
    ///
    /// `new` derives it from the maximum checkpoint height minus the
    /// checkpoint verify concurrency limit, falling back to `block::Height::MIN`.
    full_verifier_utxo_lookahead: block::Height,

    /// Semantically verified blocks that are queued until they can be sent to
    /// the block write task.
    non_finalized_state_queued_blocks: QueuedBlocks,

    /// Queued checkpoint-verified blocks, keyed by their *previous* block hash,
    /// so the child of the last sent block can be looked up directly.
    finalized_state_queued_blocks: HashMap<block::Hash, QueuedCheckpointVerified>,

    /// The senders used to pass finalized and non-finalized write messages to
    /// the block write task.
    block_write_sender: write::BlockWriteSender,

    /// The hash of the last checkpoint-verified block sent to the block write task.
    ///
    /// Starts as the finalized tip hash, and is replaced by any hash received
    /// from `invalid_block_write_reset_receiver`.
    finalized_block_write_last_sent_hash: block::Hash,

    /// The blocks that have already been sent to the block write task.
    non_finalized_block_write_sent_hashes: SentHashes,

    /// Receives the hash that `finalized_block_write_last_sent_hash` should be reset to.
    ///
    /// NOTE(review): presumably sent by the block write task after a block
    /// fails to commit — confirm against `write::BlockWriteSender::spawn`.
    invalid_block_write_reset_receiver: tokio::sync::mpsc::UnboundedReceiver<block::Hash>,

    /// UTXO requests that are waiting for a matching output to arrive.
    pending_utxos: PendingUtxos,

    /// When `pending_utxos` was last pruned.
    last_prune: Instant,

    /// The read-only service that shares this state.
    read_service: ReadStateService,

    /// The highest height queued for checkpoint commit since the queue was
    /// last empty, exported as a metrics gauge. `NaN` while the queue is empty.
    max_finalized_queue_height: f64,
}
182
/// A cloneable, read-only view of the chain state.
///
/// Answers [`ReadRequest`]s from the finalized database and the latest
/// non-finalized state published on a watch channel.
#[derive(Clone, Debug)]
pub struct ReadStateService {
    /// The network this state was created for.
    network: Network,

    /// A watch channel receiver for the latest [`NonFinalizedState`].
    non_finalized_state_receiver: WatchReceiver<NonFinalizedState>,

    /// A handle to the finalized state database.
    db: ZebraDb,

    /// A shared handle to the block write thread, if there is one.
    ///
    /// The last service holding this handle joins the thread and propagates
    /// its panics, both in `poll_ready` and on drop.
    block_write_task: Option<Arc<std::thread::JoinHandle<()>>>,
}
224
225impl Drop for StateService {
226 fn drop(&mut self) {
227 self.invalid_block_write_reset_receiver.close();
234
235 std::mem::drop(self.block_write_sender.finalized.take());
236 std::mem::drop(self.block_write_sender.non_finalized.take());
237
238 self.clear_finalized_block_queue(CommitBlockError::WriteTaskExited);
239 self.clear_non_finalized_block_queue(CommitBlockError::WriteTaskExited);
240
241 info!("dropping the state: logging database metrics");
243 self.log_db_metrics();
244
245 }
248}
249
250impl Drop for ReadStateService {
251 fn drop(&mut self) {
252 if let Some(block_write_task) = self.block_write_task.take() {
257 if let Some(block_write_task_handle) = Arc::into_inner(block_write_task) {
258 self.db.shutdown(true);
262
263 #[cfg(not(test))]
269 info!("waiting for the block write task to finish");
270 #[cfg(test)]
271 debug!("waiting for the block write task to finish");
272
273 if let Err(thread_panic) = block_write_task_handle.join() {
275 std::panic::resume_unwind(thread_panic);
276 } else {
277 debug!("shutting down the state because the block write task has finished");
278 }
279 }
280 } else {
281 self.db.shutdown(false);
285 }
286 }
287}
288
289impl StateService {
290 const PRUNE_INTERVAL: Duration = Duration::from_secs(30);
291
292 pub async fn new(
300 config: Config,
301 network: &Network,
302 max_checkpoint_height: block::Height,
303 checkpoint_verify_concurrency_limit: usize,
304 ) -> (Self, ReadStateService, LatestChainTip, ChainTipChange) {
305 let (finalized_state, finalized_tip, timer) = {
306 let config = config.clone();
307 let network = network.clone();
308 tokio::task::spawn_blocking(move || {
309 let timer = CodeTimer::start();
310 let finalized_state = FinalizedState::new(
311 &config,
312 &network,
313 #[cfg(feature = "elasticsearch")]
314 true,
315 );
316 timer.finish_desc("opening finalized state database");
317
318 let timer = CodeTimer::start();
319 let finalized_tip = finalized_state.db.tip_block();
320
321 (finalized_state, finalized_tip, timer)
322 })
323 .await
324 .expect("failed to join blocking task")
325 };
326
327 let is_finalized_tip_past_max_checkpoint = if let Some(tip) = &finalized_tip {
339 tip.coinbase_height().expect("valid block must have height") >= max_checkpoint_height
340 } else {
341 false
342 };
343 let backup_dir_path = config.non_finalized_state_backup_dir(network);
344 let skip_backup_task = config.debug_skip_non_finalized_state_backup_task;
345 let (non_finalized_state, non_finalized_state_sender, non_finalized_state_receiver) =
346 NonFinalizedState::new(network)
347 .with_backup(
348 backup_dir_path.clone(),
349 &finalized_state.db,
350 is_finalized_tip_past_max_checkpoint,
351 config.debug_skip_non_finalized_state_backup_task,
352 )
353 .await;
354
355 let non_finalized_block_write_sent_hashes = SentHashes::new(&non_finalized_state);
356 let initial_tip = non_finalized_state
357 .best_tip_block()
358 .map(|cv_block| cv_block.block.clone())
359 .or(finalized_tip)
360 .map(CheckpointVerifiedBlock::from)
361 .map(ChainTipBlock::from);
362
363 tracing::info!(chain_tip = ?initial_tip.as_ref().map(|tip| (tip.hash, tip.height)), "loaded Zebra state cache");
364
365 let (chain_tip_sender, latest_chain_tip, chain_tip_change) =
366 ChainTipSender::new(initial_tip, network);
367
368 let finalized_state_for_writing = finalized_state.clone();
369 let should_use_finalized_block_write_sender = non_finalized_state.is_chain_set_empty();
370 let sync_backup_dir_path = backup_dir_path.filter(|_| skip_backup_task);
371 let (block_write_sender, invalid_block_write_reset_receiver, block_write_task) =
372 write::BlockWriteSender::spawn(
373 finalized_state_for_writing,
374 non_finalized_state,
375 chain_tip_sender,
376 non_finalized_state_sender,
377 should_use_finalized_block_write_sender,
378 sync_backup_dir_path,
379 );
380
381 let read_service = ReadStateService::new(
382 &finalized_state,
383 block_write_task,
384 non_finalized_state_receiver,
385 );
386
387 let full_verifier_utxo_lookahead = max_checkpoint_height
388 - HeightDiff::try_from(checkpoint_verify_concurrency_limit)
389 .expect("fits in HeightDiff");
390 let full_verifier_utxo_lookahead =
391 full_verifier_utxo_lookahead.unwrap_or(block::Height::MIN);
392 let non_finalized_state_queued_blocks = QueuedBlocks::default();
393 let pending_utxos = PendingUtxos::default();
394
395 let finalized_block_write_last_sent_hash =
396 tokio::task::spawn_blocking(move || finalized_state.db.finalized_tip_hash())
397 .await
398 .expect("failed to join blocking task");
399
400 let state = Self {
401 network: network.clone(),
402 full_verifier_utxo_lookahead,
403 non_finalized_state_queued_blocks,
404 finalized_state_queued_blocks: HashMap::new(),
405 block_write_sender,
406 finalized_block_write_last_sent_hash,
407 non_finalized_block_write_sent_hashes,
408 invalid_block_write_reset_receiver,
409 pending_utxos,
410 last_prune: Instant::now(),
411 read_service: read_service.clone(),
412 max_finalized_queue_height: f64::NAN,
413 };
414 timer.finish_desc("initializing state service");
415
416 tracing::info!("starting legacy chain check");
417 let timer = CodeTimer::start();
418
419 if let (Some(tip), Some(nu5_activation_height)) = (
420 {
421 let read_state = state.read_service.clone();
422 tokio::task::spawn_blocking(move || read_state.best_tip())
423 .await
424 .expect("task should not panic")
425 },
426 NetworkUpgrade::Nu5.activation_height(network),
427 ) {
428 if let Err(error) = check::legacy_chain(
429 nu5_activation_height,
430 any_ancestor_blocks(
431 &state.read_service.latest_non_finalized_state(),
432 &state.read_service.db,
433 tip.1,
434 ),
435 &state.network,
436 MAX_LEGACY_CHAIN_BLOCKS,
437 ) {
438 let legacy_db_path = state.read_service.db.path().to_path_buf();
439 panic!(
440 "Cached state contains a legacy chain.\n\
441 An outdated Zebra version did not know about a recent network upgrade,\n\
442 so it followed a legacy chain using outdated consensus branch rules.\n\
443 Hint: Delete your database, and restart Zebra to do a full sync.\n\
444 Database path: {legacy_db_path:?}\n\
445 Error: {error:?}",
446 );
447 }
448 }
449
450 tracing::info!("cached state consensus branch is valid: no legacy chain found");
451 timer.finish_desc("legacy chain check");
452
453 let db_for_metrics = read_service.db.clone();
455 tokio::spawn(async move {
456 let mut interval = tokio::time::interval(Duration::from_secs(30));
457 loop {
458 interval.tick().await;
459 db_for_metrics.export_metrics();
460 }
461 });
462
463 (state, read_service, latest_chain_tip, chain_tip_change)
464 }
465
466 pub fn log_db_metrics(&self) {
468 self.read_service.db.print_db_metrics();
469 }
470
471 fn queue_and_commit_to_finalized_state(
475 &mut self,
476 checkpoint_verified: CheckpointVerifiedBlock,
477 ) -> oneshot::Receiver<Result<block::Hash, CommitCheckpointVerifiedError>> {
478 let queued_prev_hash = checkpoint_verified.block.header.previous_block_hash;
484 let queued_height = checkpoint_verified.height;
485
486 if self.is_close_to_final_checkpoint(queued_height) {
489 self.non_finalized_block_write_sent_hashes
490 .add_finalized(&checkpoint_verified)
491 }
492
493 let (rsp_tx, rsp_rx) = oneshot::channel();
494 let queued = (checkpoint_verified, rsp_tx);
495
496 if self.block_write_sender.finalized.is_some() {
497 if let Some(duplicate_queued) = self
499 .finalized_state_queued_blocks
500 .insert(queued_prev_hash, queued)
501 {
502 Self::send_checkpoint_verified_block_error(
503 duplicate_queued,
504 CommitBlockError::new_duplicate(
505 Some(queued_prev_hash.into()),
506 KnownBlock::Queue,
507 ),
508 );
509 }
510
511 self.drain_finalized_queue_and_commit();
512 } else {
513 Self::send_checkpoint_verified_block_error(
519 queued,
520 CommitBlockError::new_duplicate(None, KnownBlock::Finalized),
521 );
522
523 self.clear_finalized_block_queue(CommitBlockError::new_duplicate(
524 None,
525 KnownBlock::Finalized,
526 ));
527 }
528
529 if self.finalized_state_queued_blocks.is_empty() {
530 self.max_finalized_queue_height = f64::NAN;
531 } else if self.max_finalized_queue_height.is_nan()
532 || self.max_finalized_queue_height < queued_height.0 as f64
533 {
534 self.max_finalized_queue_height = queued_height.0 as f64;
540 }
541
542 metrics::gauge!("state.checkpoint.queued.max.height").set(self.max_finalized_queue_height);
543 metrics::gauge!("state.checkpoint.queued.block.count")
544 .set(self.finalized_state_queued_blocks.len() as f64);
545
546 rsp_rx
547 }
548
549 pub fn drain_finalized_queue_and_commit(&mut self) {
557 use tokio::sync::mpsc::error::{SendError, TryRecvError};
558
559 match self.invalid_block_write_reset_receiver.try_recv() {
566 Ok(reset_tip_hash) => self.finalized_block_write_last_sent_hash = reset_tip_hash,
567 Err(TryRecvError::Disconnected) => {
568 info!("Block commit task closed the block reset channel. Is Zebra shutting down?");
569 return;
570 }
571 Err(TryRecvError::Empty) => {}
573 }
574
575 while let Some(queued_block) = self
576 .finalized_state_queued_blocks
577 .remove(&self.finalized_block_write_last_sent_hash)
578 {
579 let last_sent_finalized_block_height = queued_block.0.height;
580
581 self.finalized_block_write_last_sent_hash = queued_block.0.hash;
582
583 if let Some(finalized_block_write_sender) = &self.block_write_sender.finalized {
586 let send_result = finalized_block_write_sender.send(queued_block);
587
588 if let Err(SendError(queued)) = send_result {
590 Self::send_checkpoint_verified_block_error(
592 queued,
593 CommitBlockError::WriteTaskExited,
594 );
595
596 self.clear_finalized_block_queue(CommitBlockError::WriteTaskExited);
597 } else {
598 metrics::gauge!("state.checkpoint.sent.block.height")
599 .set(last_sent_finalized_block_height.0 as f64);
600 };
601 }
602 }
603 }
604
605 fn clear_finalized_block_queue(
607 &mut self,
608 error: impl Into<CommitCheckpointVerifiedError> + Clone,
609 ) {
610 for (_hash, queued) in self.finalized_state_queued_blocks.drain() {
611 Self::send_checkpoint_verified_block_error(queued, error.clone());
612 }
613 }
614
615 fn send_checkpoint_verified_block_error(
617 queued: QueuedCheckpointVerified,
618 error: impl Into<CommitCheckpointVerifiedError>,
619 ) {
620 let (finalized, rsp_tx) = queued;
621
622 let _ = rsp_tx.send(Err(error.into()));
625 std::mem::drop(finalized);
626 }
627
628 fn clear_non_finalized_block_queue(
630 &mut self,
631 error: impl Into<CommitSemanticallyVerifiedError> + Clone,
632 ) {
633 for (_hash, queued) in self.non_finalized_state_queued_blocks.drain() {
634 Self::send_semantically_verified_block_error(queued, error.clone());
635 }
636 }
637
638 fn send_semantically_verified_block_error(
640 queued: QueuedSemanticallyVerified,
641 error: impl Into<CommitSemanticallyVerifiedError>,
642 ) {
643 let (finalized, rsp_tx) = queued;
644
645 let _ = rsp_tx.send(Err(error.into()));
648 std::mem::drop(finalized);
649 }
650
651 #[instrument(level = "debug", skip(self, semantically_verified))]
659 fn queue_and_commit_to_non_finalized_state(
660 &mut self,
661 semantically_verified: SemanticallyVerifiedBlock,
662 ) -> oneshot::Receiver<Result<block::Hash, CommitSemanticallyVerifiedError>> {
663 tracing::debug!(block = %semantically_verified.block, "queueing block for contextual verification");
664 let parent_hash = semantically_verified.block.header.previous_block_hash;
665
666 if self
667 .non_finalized_block_write_sent_hashes
668 .contains(&semantically_verified.hash)
669 {
670 let (rsp_tx, rsp_rx) = oneshot::channel();
671 let _ = rsp_tx.send(Err(CommitBlockError::new_duplicate(
672 Some(semantically_verified.hash.into()),
673 KnownBlock::WriteChannel,
674 )
675 .into()));
676 return rsp_rx;
677 }
678
679 if self
680 .read_service
681 .db
682 .contains_height(semantically_verified.height)
683 {
684 let (rsp_tx, rsp_rx) = oneshot::channel();
685 let _ = rsp_tx.send(Err(CommitBlockError::new_duplicate(
686 Some(semantically_verified.height.into()),
687 KnownBlock::Finalized,
688 )
689 .into()));
690 return rsp_rx;
691 }
692
693 let rsp_rx = if let Some((_, old_rsp_tx)) = self
697 .non_finalized_state_queued_blocks
698 .get_mut(&semantically_verified.hash)
699 {
700 tracing::debug!("replacing older queued request with new request");
701 let (mut rsp_tx, rsp_rx) = oneshot::channel();
702 std::mem::swap(old_rsp_tx, &mut rsp_tx);
703 let _ = rsp_tx.send(Err(CommitBlockError::new_duplicate(
704 Some(semantically_verified.hash.into()),
705 KnownBlock::Queue,
706 )
707 .into()));
708 rsp_rx
709 } else {
710 let (rsp_tx, rsp_rx) = oneshot::channel();
711 self.non_finalized_state_queued_blocks
712 .queue((semantically_verified, rsp_tx));
713 rsp_rx
714 };
715
716 if self.block_write_sender.finalized.is_some()
725 && self
726 .non_finalized_state_queued_blocks
727 .has_queued_children(self.finalized_block_write_last_sent_hash)
728 && self.read_service.db.finalized_tip_hash()
729 == self.finalized_block_write_last_sent_hash
730 {
731 std::mem::drop(self.block_write_sender.finalized.take());
734 self.non_finalized_block_write_sent_hashes = SentHashes::default();
736 self.non_finalized_block_write_sent_hashes
738 .can_fork_chain_at_hashes = true;
739 self.send_ready_non_finalized_queued(self.finalized_block_write_last_sent_hash);
741 self.clear_finalized_block_queue(CommitBlockError::new_duplicate(
743 None,
744 KnownBlock::Finalized,
745 ));
746 } else if !self.can_fork_chain_at(&parent_hash) {
747 tracing::trace!("unready to verify, returning early");
748 } else if self.block_write_sender.finalized.is_none() {
749 self.send_ready_non_finalized_queued(parent_hash);
751
752 let finalized_tip_height = self.read_service.db.finalized_tip_height().expect(
753 "Finalized state must have at least one block before committing non-finalized state",
754 );
755
756 self.non_finalized_state_queued_blocks
757 .prune_by_height(finalized_tip_height);
758
759 self.non_finalized_block_write_sent_hashes
760 .prune_by_height(finalized_tip_height);
761 }
762
763 rsp_rx
764 }
765
766 fn can_fork_chain_at(&self, hash: &block::Hash) -> bool {
768 self.non_finalized_block_write_sent_hashes
769 .can_fork_chain_at(hash)
770 || &self.read_service.db.finalized_tip_hash() == hash
771 }
772
773 fn is_close_to_final_checkpoint(&self, queued_height: block::Height) -> bool {
781 queued_height >= self.full_verifier_utxo_lookahead
782 }
783
784 #[tracing::instrument(level = "debug", skip(self, new_parent))]
787 fn send_ready_non_finalized_queued(&mut self, new_parent: block::Hash) {
788 use tokio::sync::mpsc::error::SendError;
789 if let Some(non_finalized_block_write_sender) = &self.block_write_sender.non_finalized {
790 let mut new_parents: Vec<block::Hash> = vec![new_parent];
791
792 while let Some(parent_hash) = new_parents.pop() {
793 let queued_children = self
794 .non_finalized_state_queued_blocks
795 .dequeue_children(parent_hash);
796
797 for queued_child in queued_children {
798 let (SemanticallyVerifiedBlock { hash, .. }, _) = queued_child;
799
800 self.non_finalized_block_write_sent_hashes
801 .add(&queued_child.0);
802 let send_result = non_finalized_block_write_sender.send(queued_child.into());
803
804 if let Err(SendError(NonFinalizedWriteMessage::Commit(queued))) = send_result {
805 Self::send_semantically_verified_block_error(
807 queued,
808 CommitBlockError::WriteTaskExited,
809 );
810
811 self.clear_non_finalized_block_queue(CommitBlockError::WriteTaskExited);
812
813 return;
814 };
815
816 new_parents.push(hash);
817 }
818 }
819
820 self.non_finalized_block_write_sent_hashes.finish_batch();
821 };
822 }
823
824 pub fn best_tip(&self) -> Option<(block::Height, block::Hash)> {
826 self.read_service.best_tip()
827 }
828
829 fn send_invalidate_block(
830 &self,
831 hash: block::Hash,
832 ) -> oneshot::Receiver<Result<block::Hash, InvalidateError>> {
833 let (rsp_tx, rsp_rx) = oneshot::channel();
834
835 let Some(sender) = &self.block_write_sender.non_finalized else {
836 let _ = rsp_tx.send(Err(InvalidateError::ProcessingCheckpointedBlocks));
837 return rsp_rx;
838 };
839
840 if let Err(tokio::sync::mpsc::error::SendError(error)) =
841 sender.send(NonFinalizedWriteMessage::Invalidate { hash, rsp_tx })
842 {
843 let NonFinalizedWriteMessage::Invalidate { rsp_tx, .. } = error else {
844 unreachable!("should return the same Invalidate message could not be sent");
845 };
846
847 let _ = rsp_tx.send(Err(InvalidateError::SendInvalidateRequestFailed));
848 }
849
850 rsp_rx
851 }
852
853 fn send_reconsider_block(
854 &self,
855 hash: block::Hash,
856 ) -> oneshot::Receiver<Result<Vec<block::Hash>, ReconsiderError>> {
857 let (rsp_tx, rsp_rx) = oneshot::channel();
858
859 let Some(sender) = &self.block_write_sender.non_finalized else {
860 let _ = rsp_tx.send(Err(ReconsiderError::CheckpointCommitInProgress));
861 return rsp_rx;
862 };
863
864 if let Err(tokio::sync::mpsc::error::SendError(error)) =
865 sender.send(NonFinalizedWriteMessage::Reconsider { hash, rsp_tx })
866 {
867 let NonFinalizedWriteMessage::Reconsider { rsp_tx, .. } = error else {
868 unreachable!("should return the same Reconsider message could not be sent");
869 };
870
871 let _ = rsp_tx.send(Err(ReconsiderError::ReconsiderSendFailed));
872 }
873
874 rsp_rx
875 }
876
877 fn assert_block_can_be_validated(&self, block: &SemanticallyVerifiedBlock) {
879 assert!(
881 block.height > self.network.mandatory_checkpoint_height(),
882 "invalid semantically verified block height: the canopy checkpoint is mandatory, pre-canopy \
883 blocks, and the canopy activation block, must be committed to the state as finalized \
884 blocks"
885 );
886 }
887
888 fn known_sent_hash(&self, hash: &block::Hash) -> Option<KnownBlock> {
889 self.non_finalized_block_write_sent_hashes
890 .contains(hash)
891 .then_some(KnownBlock::WriteChannel)
892 }
893}
894
895impl ReadStateService {
896 pub(crate) fn new(
902 finalized_state: &FinalizedState,
903 block_write_task: Option<Arc<std::thread::JoinHandle<()>>>,
904 non_finalized_state_receiver: WatchReceiver<NonFinalizedState>,
905 ) -> Self {
906 let read_service = Self {
907 network: finalized_state.network(),
908 db: finalized_state.db.clone(),
909 non_finalized_state_receiver,
910 block_write_task,
911 };
912
913 tracing::debug!("created new read-only state service");
914
915 read_service
916 }
917
918 pub fn best_tip(&self) -> Option<(block::Height, block::Hash)> {
920 read::best_tip(&self.latest_non_finalized_state(), &self.db)
921 }
922
923 fn latest_non_finalized_state(&self) -> NonFinalizedState {
925 self.non_finalized_state_receiver.cloned_watch_data()
926 }
927
928 fn latest_best_chain(&self) -> Option<Arc<Chain>> {
930 self.non_finalized_state_receiver
931 .borrow_mapped(|non_finalized_state| non_finalized_state.best_chain().cloned())
932 }
933
934 #[cfg(any(test, feature = "proptest-impl"))]
937 pub fn db(&self) -> &ZebraDb {
938 &self.db
939 }
940
941 pub fn log_db_metrics(&self) {
943 self.db.print_db_metrics();
944 }
945}
946
947impl Service<Request> for StateService {
948 type Response = Response;
949 type Error = BoxError;
950 type Future =
951 Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>;
952
953 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
954 let poll = self.read_service.poll_ready(cx);
956
957 let now = Instant::now();
959
960 if self.last_prune + Self::PRUNE_INTERVAL < now {
961 let tip = self.best_tip();
962 let old_len = self.pending_utxos.len();
963
964 self.pending_utxos.prune();
965 self.last_prune = now;
966
967 let new_len = self.pending_utxos.len();
968 let prune_count = old_len
969 .checked_sub(new_len)
970 .expect("prune does not add any utxo requests");
971 if prune_count > 0 {
972 tracing::debug!(
973 ?old_len,
974 ?new_len,
975 ?prune_count,
976 ?tip,
977 "pruned utxo requests"
978 );
979 } else {
980 tracing::debug!(len = ?old_len, ?tip, "no utxo requests needed pruning");
981 }
982 }
983
984 poll
985 }
986
987 #[instrument(name = "state", skip(self, req))]
988 fn call(&mut self, req: Request) -> Self::Future {
989 req.count_metric();
990 let span = Span::current();
991
992 match req {
993 Request::CommitSemanticallyVerifiedBlock(semantically_verified) => {
998 let timer = CodeTimer::start();
999 self.assert_block_can_be_validated(&semantically_verified);
1000
1001 self.pending_utxos
1002 .check_against_ordered(&semantically_verified.new_outputs);
1003
1004 let rsp_rx = tokio::task::block_in_place(move || {
1016 span.in_scope(|| {
1017 self.queue_and_commit_to_non_finalized_state(semantically_verified)
1018 })
1019 });
1020
1021 timer.finish_desc("CommitSemanticallyVerifiedBlock");
1027
1028 let span = Span::current();
1032 async move {
1033 rsp_rx
1034 .await
1035 .map_err(|_recv_error| CommitBlockError::WriteTaskExited.into())
1036 .and_then(|result| result)
1037 .map_err(BoxError::from)
1038 .map(Response::Committed)
1039 }
1040 .instrument(span)
1041 .boxed()
1042 }
1043
1044 Request::CommitCheckpointVerifiedBlock(finalized) => {
1049 let timer = CodeTimer::start();
1050 self.pending_utxos
1064 .check_against_ordered(&finalized.new_outputs);
1065
1066 let rsp_rx = self.queue_and_commit_to_finalized_state(finalized);
1071
1072 timer.finish_desc("CommitCheckpointVerifiedBlock");
1078
1079 async move {
1083 rsp_rx
1084 .await
1085 .map_err(|_recv_error| CommitBlockError::WriteTaskExited.into())
1086 .and_then(|result| result)
1087 .map_err(BoxError::from)
1088 .map(Response::Committed)
1089 }
1090 .instrument(span)
1091 .boxed()
1092 }
1093
1094 Request::AwaitUtxo(outpoint) => {
1097 let timer = CodeTimer::start();
1098 let response_fut = self.pending_utxos.queue(outpoint);
1100 let response_fut = response_fut.instrument(span).boxed();
1104
1105 if let Some(utxo) = self.non_finalized_state_queued_blocks.utxo(&outpoint) {
1108 self.pending_utxos.respond(&outpoint, utxo);
1109
1110 timer.finish_desc("AwaitUtxo/queued-non-finalized");
1112
1113 return response_fut;
1114 }
1115
1116 if let Some(utxo) = self.non_finalized_block_write_sent_hashes.utxo(&outpoint) {
1118 self.pending_utxos.respond(&outpoint, utxo);
1119
1120 timer.finish_desc("AwaitUtxo/sent-non-finalized");
1122
1123 return response_fut;
1124 }
1125
1126 let read_service = self.read_service.clone();
1135
1136 async move {
1138 let req = ReadRequest::AnyChainUtxo(outpoint);
1139
1140 let rsp = read_service.oneshot(req).await?;
1141
1142 if let ReadResponse::AnyChainUtxo(Some(utxo)) = rsp {
1155 timer.finish_desc("AwaitUtxo/any-chain");
1157
1158 return Ok(Response::Utxo(utxo));
1159 }
1160
1161 timer.finish_desc("AwaitUtxo/waiting");
1163
1164 response_fut.await
1165 }
1166 .boxed()
1167 }
1168
1169 Request::KnownBlock(hash) => {
1172 let timer = CodeTimer::start();
1173 let sent_hash_response = self.known_sent_hash(&hash);
1174 let read_service = self.read_service.clone();
1175
1176 async move {
1177 if sent_hash_response.is_some() {
1178 return Ok(Response::KnownBlock(sent_hash_response));
1179 };
1180
1181 let response = read::non_finalized_state_contains_block_hash(
1182 &read_service.latest_non_finalized_state(),
1183 hash,
1184 )
1185 .or_else(|| read::finalized_state_contains_block_hash(&read_service.db, hash));
1187
1188 timer.finish_desc("Request::KnownBlock");
1189
1190 Ok(Response::KnownBlock(response))
1191 }
1192 .boxed()
1193 }
1194
1195 Request::InvalidateBlock(block_hash) => {
1197 let rsp_rx = tokio::task::block_in_place(move || {
1198 span.in_scope(|| self.send_invalidate_block(block_hash))
1199 });
1200
1201 let span = Span::current();
1205 async move {
1206 rsp_rx
1207 .await
1208 .map_err(|_recv_error| InvalidateError::InvalidateRequestDropped)
1209 .and_then(|result| result)
1210 .map_err(BoxError::from)
1211 .map(Response::Invalidated)
1212 }
1213 .instrument(span)
1214 .boxed()
1215 }
1216
1217 Request::ReconsiderBlock(block_hash) => {
1219 let rsp_rx = tokio::task::block_in_place(move || {
1220 span.in_scope(|| self.send_reconsider_block(block_hash))
1221 });
1222
1223 let span = Span::current();
1227 async move {
1228 rsp_rx
1229 .await
1230 .map_err(|_recv_error| ReconsiderError::ReconsiderResponseDropped)
1231 .and_then(|result| result)
1232 .map_err(BoxError::from)
1233 .map(Response::Reconsidered)
1234 }
1235 .instrument(span)
1236 .boxed()
1237 }
1238
1239 Request::Tip
1241 | Request::Depth(_)
1242 | Request::BestChainNextMedianTimePast
1243 | Request::BestChainBlockHash(_)
1244 | Request::BlockLocator
1245 | Request::Transaction(_)
1246 | Request::AnyChainTransaction(_)
1247 | Request::UnspentBestChainUtxo(_)
1248 | Request::Block(_)
1249 | Request::AnyChainBlock(_)
1250 | Request::BlockAndSize(_)
1251 | Request::BlockHeader(_)
1252 | Request::FindBlockHashes { .. }
1253 | Request::FindBlockHeaders { .. }
1254 | Request::CheckBestChainTipNullifiersAndAnchors(_)
1255 | Request::CheckBlockProposalValidity(_) => {
1256 let read_service = self.read_service.clone();
1258
1259 async move {
1260 let req = req
1261 .try_into()
1262 .expect("ReadRequest conversion should not fail");
1263
1264 let rsp = read_service.oneshot(req).await?;
1265 let rsp = rsp.try_into().expect("Response conversion should not fail");
1266
1267 Ok(rsp)
1268 }
1269 .boxed()
1270 }
1271 }
1272 }
1273}
1274
1275impl Service<ReadRequest> for ReadStateService {
1276 type Response = ReadResponse;
1277 type Error = BoxError;
1278 type Future =
1279 Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>;
1280
1281 fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
1282 let block_write_task = self.block_write_task.take();
1286
1287 if let Some(block_write_task) = block_write_task {
1288 if block_write_task.is_finished() {
1289 if let Some(block_write_task) = Arc::into_inner(block_write_task) {
1290 if let Err(thread_panic) = block_write_task.join() {
1292 std::panic::resume_unwind(thread_panic);
1293 }
1294 }
1295 } else {
1296 self.block_write_task = Some(block_write_task);
1298 }
1299 }
1300
1301 self.db.check_for_panics();
1302
1303 Poll::Ready(Ok(()))
1304 }
1305
1306 #[instrument(name = "read_state", skip(self, req))]
1307 fn call(&mut self, req: ReadRequest) -> Self::Future {
1308 req.count_metric();
1309 let timer = CodeTimer::start_desc(req.variant_name());
1310 let span = Span::current();
1311 let timed_span = TimedSpan::new(timer, span);
1312 let state = self.clone();
1313
1314 if req == ReadRequest::NonFinalizedBlocksListener {
1315 let non_finalized_blocks_listener = NonFinalizedBlocksListener::spawn(
1318 self.network.clone(),
1319 self.non_finalized_state_receiver.clone(),
1320 );
1321
1322 return async move {
1323 Ok(ReadResponse::NonFinalizedBlocksListener(
1324 non_finalized_blocks_listener,
1325 ))
1326 }
1327 .boxed();
1328 };
1329
1330 let request_handler = move || match req {
1331 ReadRequest::UsageInfo => Ok(ReadResponse::UsageInfo(state.db.size())),
1333
1334 ReadRequest::Tip => Ok(ReadResponse::Tip(read::tip(
1336 state.latest_best_chain(),
1337 &state.db,
1338 ))),
1339
1340 ReadRequest::TipPoolValues => {
1342 let (tip_height, tip_hash, value_balance) =
1343 read::tip_with_value_balance(state.latest_best_chain(), &state.db)?
1344 .ok_or(BoxError::from("no chain tip available yet"))?;
1345
1346 Ok(ReadResponse::TipPoolValues {
1347 tip_height,
1348 tip_hash,
1349 value_balance,
1350 })
1351 }
1352
1353 ReadRequest::BlockInfo(hash_or_height) => Ok(ReadResponse::BlockInfo(
1355 read::block_info(state.latest_best_chain(), &state.db, hash_or_height),
1356 )),
1357
1358 ReadRequest::Depth(hash) => Ok(ReadResponse::Depth(read::depth(
1360 state.latest_best_chain(),
1361 &state.db,
1362 hash,
1363 ))),
1364
1365 ReadRequest::BestChainNextMedianTimePast => {
1367 Ok(ReadResponse::BestChainNextMedianTimePast(
1368 read::next_median_time_past(&state.latest_non_finalized_state(), &state.db)?,
1369 ))
1370 }
1371
1372 ReadRequest::Block(hash_or_height) => Ok(ReadResponse::Block(read::block(
1374 state.latest_best_chain(),
1375 &state.db,
1376 hash_or_height,
1377 ))),
1378
1379 ReadRequest::AnyChainBlock(hash_or_height) => Ok(ReadResponse::Block(read::any_block(
1380 state.latest_non_finalized_state().chain_iter(),
1381 &state.db,
1382 hash_or_height,
1383 ))),
1384
1385 ReadRequest::BlockAndSize(hash_or_height) => Ok(ReadResponse::BlockAndSize(
1387 read::block_and_size(state.latest_best_chain(), &state.db, hash_or_height),
1388 )),
1389
1390 ReadRequest::BlockHeader(hash_or_height) => {
1392 let best_chain = state.latest_best_chain();
1393
1394 let height = hash_or_height
1395 .height_or_else(|hash| {
1396 read::find::height_by_hash(best_chain.clone(), &state.db, hash)
1397 })
1398 .ok_or_else(|| BoxError::from("block hash or height not found"))?;
1399
1400 let hash = hash_or_height
1401 .hash_or_else(|height| {
1402 read::find::hash_by_height(best_chain.clone(), &state.db, height)
1403 })
1404 .ok_or_else(|| BoxError::from("block hash or height not found"))?;
1405
1406 let next_height = height.next()?;
1407 let next_block_hash =
1408 read::find::hash_by_height(best_chain.clone(), &state.db, next_height);
1409
1410 let header = read::block_header(best_chain, &state.db, height.into())
1411 .ok_or_else(|| BoxError::from("block hash or height not found"))?;
1412
1413 Ok(ReadResponse::BlockHeader {
1414 header,
1415 hash,
1416 height,
1417 next_block_hash,
1418 })
1419 }
1420
1421 ReadRequest::Transaction(hash) => Ok(ReadResponse::Transaction(
1423 read::mined_transaction(state.latest_best_chain(), &state.db, hash),
1424 )),
1425
1426 ReadRequest::AnyChainTransaction(hash) => {
1427 Ok(ReadResponse::AnyChainTransaction(read::any_transaction(
1428 state.latest_non_finalized_state().chain_iter(),
1429 &state.db,
1430 hash,
1431 )))
1432 }
1433
1434 ReadRequest::TransactionIdsForBlock(hash_or_height) => Ok(
1436 ReadResponse::TransactionIdsForBlock(read::transaction_hashes_for_block(
1437 state.latest_best_chain(),
1438 &state.db,
1439 hash_or_height,
1440 )),
1441 ),
1442
1443 ReadRequest::AnyChainTransactionIdsForBlock(hash_or_height) => {
1444 Ok(ReadResponse::AnyChainTransactionIdsForBlock(
1445 read::transaction_hashes_for_any_block(
1446 state.latest_non_finalized_state().chain_iter(),
1447 &state.db,
1448 hash_or_height,
1449 ),
1450 ))
1451 }
1452
1453 #[cfg(feature = "indexer")]
1454 ReadRequest::SpendingTransactionId(spend) => Ok(ReadResponse::TransactionId(
1455 read::spending_transaction_hash(state.latest_best_chain(), &state.db, spend),
1456 )),
1457
1458 ReadRequest::UnspentBestChainUtxo(outpoint) => Ok(ReadResponse::UnspentBestChainUtxo(
1459 read::unspent_utxo(state.latest_best_chain(), &state.db, outpoint),
1460 )),
1461
1462 ReadRequest::AnyChainUtxo(outpoint) => Ok(ReadResponse::AnyChainUtxo(read::any_utxo(
1464 state.latest_non_finalized_state(),
1465 &state.db,
1466 outpoint,
1467 ))),
1468
1469 ReadRequest::BlockLocator => Ok(ReadResponse::BlockLocator(
1471 read::block_locator(state.latest_best_chain(), &state.db).unwrap_or_default(),
1472 )),
1473
1474 ReadRequest::FindBlockHashes { known_blocks, stop } => {
1476 Ok(ReadResponse::BlockHashes(read::find_chain_hashes(
1477 state.latest_best_chain(),
1478 &state.db,
1479 known_blocks,
1480 stop,
1481 MAX_FIND_BLOCK_HASHES_RESULTS,
1482 )))
1483 }
1484
1485 ReadRequest::FindBlockHeaders { known_blocks, stop } => Ok(ReadResponse::BlockHeaders(
1487 read::find_chain_headers(
1488 state.latest_best_chain(),
1489 &state.db,
1490 known_blocks,
1491 stop,
1492 MAX_FIND_BLOCK_HEADERS_RESULTS,
1493 )
1494 .into_iter()
1495 .map(|header| CountedHeader { header })
1496 .collect(),
1497 )),
1498
1499 ReadRequest::SaplingTree(hash_or_height) => Ok(ReadResponse::SaplingTree(
1500 read::sapling_tree(state.latest_best_chain(), &state.db, hash_or_height),
1501 )),
1502
1503 ReadRequest::OrchardTree(hash_or_height) => Ok(ReadResponse::OrchardTree(
1504 read::orchard_tree(state.latest_best_chain(), &state.db, hash_or_height),
1505 )),
1506
1507 ReadRequest::SaplingSubtrees { start_index, limit } => {
1508 let end_index = limit
1509 .and_then(|limit| start_index.0.checked_add(limit.0))
1510 .map(NoteCommitmentSubtreeIndex);
1511
1512 let best_chain = state.latest_best_chain();
1513 let sapling_subtrees = if let Some(end_index) = end_index {
1514 read::sapling_subtrees(best_chain, &state.db, start_index..end_index)
1515 } else {
1516 read::sapling_subtrees(best_chain, &state.db, start_index..)
1521 };
1522
1523 Ok(ReadResponse::SaplingSubtrees(sapling_subtrees))
1524 }
1525
1526 ReadRequest::OrchardSubtrees { start_index, limit } => {
1527 let end_index = limit
1528 .and_then(|limit| start_index.0.checked_add(limit.0))
1529 .map(NoteCommitmentSubtreeIndex);
1530
1531 let best_chain = state.latest_best_chain();
1532 let orchard_subtrees = if let Some(end_index) = end_index {
1533 read::orchard_subtrees(best_chain, &state.db, start_index..end_index)
1534 } else {
1535 read::orchard_subtrees(best_chain, &state.db, start_index..)
1540 };
1541
1542 Ok(ReadResponse::OrchardSubtrees(orchard_subtrees))
1543 }
1544
1545 ReadRequest::AddressBalance(addresses) => {
1547 let (balance, received) =
1548 read::transparent_balance(state.latest_best_chain(), &state.db, addresses)?;
1549 Ok(ReadResponse::AddressBalance { balance, received })
1550 }
1551
1552 ReadRequest::TransactionIdsByAddresses {
1554 addresses,
1555 height_range,
1556 } => read::transparent_tx_ids(
1557 state.latest_best_chain(),
1558 &state.db,
1559 addresses,
1560 height_range,
1561 )
1562 .map(ReadResponse::AddressesTransactionIds),
1563
1564 ReadRequest::UtxosByAddresses(addresses) => read::address_utxos(
1566 &state.network,
1567 state.latest_best_chain(),
1568 &state.db,
1569 addresses,
1570 )
1571 .map(ReadResponse::AddressUtxos),
1572
1573 ReadRequest::CheckBestChainTipNullifiersAndAnchors(unmined_tx) => {
1574 let latest_non_finalized_best_chain = state.latest_best_chain();
1575
1576 check::nullifier::tx_no_duplicates_in_chain(
1577 &state.db,
1578 latest_non_finalized_best_chain.as_ref(),
1579 &unmined_tx.transaction,
1580 )?;
1581
1582 check::anchors::tx_anchors_refer_to_final_treestates(
1583 &state.db,
1584 latest_non_finalized_best_chain.as_ref(),
1585 &unmined_tx,
1586 )?;
1587
1588 Ok(ReadResponse::ValidBestChainTipNullifiersAndAnchors)
1589 }
1590
1591 ReadRequest::BestChainBlockHash(height) => Ok(ReadResponse::BlockHash(
1593 read::hash_by_height(state.latest_best_chain(), &state.db, height),
1594 )),
1595
1596 ReadRequest::ChainInfo => {
1598 read::difficulty::get_block_template_chain_info(
1610 &state.latest_non_finalized_state(),
1611 &state.db,
1612 &state.network,
1613 )
1614 .map(ReadResponse::ChainInfo)
1615 }
1616
1617 ReadRequest::SolutionRate { num_blocks, height } => {
1619 let latest_non_finalized_state = state.latest_non_finalized_state();
1620 let (tip_height, tip_hash) =
1628 match read::tip(latest_non_finalized_state.best_chain(), &state.db) {
1629 Some(tip_hash) => tip_hash,
1630 None => return Ok(ReadResponse::SolutionRate(None)),
1631 };
1632
1633 let start_hash = match height {
1634 Some(height) if height < tip_height => read::hash_by_height(
1635 latest_non_finalized_state.best_chain(),
1636 &state.db,
1637 height,
1638 ),
1639 _ => Some(tip_hash),
1641 };
1642
1643 let solution_rate = start_hash.and_then(|start_hash| {
1644 read::difficulty::solution_rate(
1645 &latest_non_finalized_state,
1646 &state.db,
1647 num_blocks,
1648 start_hash,
1649 )
1650 });
1651
1652 Ok(ReadResponse::SolutionRate(solution_rate))
1653 }
1654
1655 ReadRequest::CheckBlockProposalValidity(semantically_verified) => {
1656 tracing::debug!(
1657 "attempting to validate and commit block proposal \
1658 onto a cloned non-finalized state"
1659 );
1660 let mut latest_non_finalized_state = state.latest_non_finalized_state();
1661
1662 let Some((_best_tip_height, best_tip_hash)) =
1664 read::best_tip(&latest_non_finalized_state, &state.db)
1665 else {
1666 return Err(
1667 "state is empty: wait for Zebra to sync before submitting a proposal"
1668 .into(),
1669 );
1670 };
1671
1672 if semantically_verified.block.header.previous_block_hash != best_tip_hash {
1673 return Err("proposal is not based on the current best chain tip: \
1674 previous block hash must be the best chain tip"
1675 .into());
1676 }
1677
1678 latest_non_finalized_state.disable_metrics();
1684
1685 write::validate_and_commit_non_finalized(
1686 &state.db,
1687 &mut latest_non_finalized_state,
1688 semantically_verified,
1689 )?;
1690
1691 Ok(ReadResponse::ValidBlockProposal)
1692 }
1693
1694 ReadRequest::TipBlockSize => {
1695 Ok(ReadResponse::TipBlockSize(
1697 state
1698 .best_tip()
1699 .and_then(|(tip_height, _)| {
1700 read::block_info(
1701 state.latest_best_chain(),
1702 &state.db,
1703 tip_height.into(),
1704 )
1705 })
1706 .map(|info| info.size().try_into().expect("u32 should fit in usize"))
1707 .or_else(|| {
1708 find::tip_block(state.latest_best_chain(), &state.db)
1709 .map(|b| b.zcash_serialized_size())
1710 }),
1711 ))
1712 }
1713
1714 ReadRequest::NonFinalizedBlocksListener => {
1715 unreachable!("should return early");
1716 }
1717
1718 ReadRequest::IsTransparentOutputSpent(outpoint) => {
1720 let is_spent = read::unspent_utxo(state.latest_best_chain(), &state.db, outpoint);
1721 Ok(ReadResponse::IsTransparentOutputSpent(is_spent.is_none()))
1722 }
1723 };
1724
1725 timed_span.spawn_blocking(request_handler)
1726 }
1727}
1728
1729pub async fn init(
1745 config: Config,
1746 network: &Network,
1747 max_checkpoint_height: block::Height,
1748 checkpoint_verify_concurrency_limit: usize,
1749) -> (
1750 BoxService<Request, Response, BoxError>,
1751 ReadStateService,
1752 LatestChainTip,
1753 ChainTipChange,
1754) {
1755 let (state_service, read_only_state_service, latest_chain_tip, chain_tip_change) =
1756 StateService::new(
1757 config,
1758 network,
1759 max_checkpoint_height,
1760 checkpoint_verify_concurrency_limit,
1761 )
1762 .await;
1763
1764 (
1765 BoxService::new(state_service),
1766 read_only_state_service,
1767 latest_chain_tip,
1768 chain_tip_change,
1769 )
1770}
1771
1772pub fn init_read_only(
1779 config: Config,
1780 network: &Network,
1781) -> (
1782 ReadStateService,
1783 ZebraDb,
1784 tokio::sync::watch::Sender<NonFinalizedState>,
1785) {
1786 let finalized_state = FinalizedState::new_with_debug(
1787 &config,
1788 network,
1789 true,
1790 #[cfg(feature = "elasticsearch")]
1791 false,
1792 true,
1793 );
1794 let (non_finalized_state_sender, non_finalized_state_receiver) =
1795 tokio::sync::watch::channel(NonFinalizedState::new(network));
1796
1797 (
1798 ReadStateService::new(
1799 &finalized_state,
1800 None,
1801 WatchReceiver::new(non_finalized_state_receiver),
1802 ),
1803 finalized_state.db.clone(),
1804 non_finalized_state_sender,
1805 )
1806}
1807
1808pub fn spawn_init_read_only(
1811 config: Config,
1812 network: &Network,
1813) -> tokio::task::JoinHandle<(
1814 ReadStateService,
1815 ZebraDb,
1816 tokio::sync::watch::Sender<NonFinalizedState>,
1817)> {
1818 let network = network.clone();
1819 tokio::task::spawn_blocking(move || init_read_only(config, &network))
1820}
1821
/// Creates a buffered state service for tests, backed by
/// [`Config::ephemeral`].
///
/// [`StateService::new`] is called with `block::Height::MAX` as the maximum
/// checkpoint height and `0` as the checkpoint verify concurrency limit. Only
/// the state service itself is kept; it is boxed and wrapped in a [`Buffer`]
/// with a bound of `1`.
#[cfg(any(test, feature = "proptest-impl"))]
pub async fn init_test(
    network: &Network,
) -> Buffer<BoxService<Request, Response, BoxError>, Request> {
    // The rest pattern discards the read service and chain tip handles.
    let (service, ..) =
        StateService::new(Config::ephemeral(), network, block::Height::MAX, 0).await;

    let boxed_service = BoxService::new(service);

    Buffer::new(boxed_service, 1)
}
1836
/// Creates the full set of state services for tests, backed by
/// [`Config::ephemeral`].
///
/// [`StateService::new`] is called with `block::Height::MAX` as the maximum
/// checkpoint height and `0` as the checkpoint verify concurrency limit.
///
/// Returns, in order: the state service boxed and wrapped in a [`Buffer`]
/// with a bound of `1`, the [`ReadStateService`], the [`LatestChainTip`], and
/// the [`ChainTipChange`].
#[cfg(any(test, feature = "proptest-impl"))]
pub async fn init_test_services(
    network: &Network,
) -> (
    Buffer<BoxService<Request, Response, BoxError>, Request>,
    ReadStateService,
    LatestChainTip,
    ChainTipChange,
) {
    let (service, read_service, tip, tip_change) =
        StateService::new(Config::ephemeral(), network, block::Height::MAX, 0).await;

    (
        Buffer::new(BoxService::new(service), 1),
        read_service,
        tip,
        tip_change,
    )
}