1use std::{
18 collections::HashMap,
19 future::Future,
20 pin::Pin,
21 sync::Arc,
22 task::{Context, Poll},
23 time::{Duration, Instant},
24};
25
26use futures::future::FutureExt;
27use tokio::sync::oneshot;
28use tower::{util::BoxService, Service, ServiceExt};
29use tracing::{instrument, Instrument, Span};
30
31#[cfg(any(test, feature = "proptest-impl"))]
32use tower::buffer::Buffer;
33
34use zebra_chain::{
35 block::{self, CountedHeader, HeightDiff},
36 diagnostic::CodeTimer,
37 parameters::{Network, NetworkUpgrade},
38 serialization::ZcashSerialize,
39 subtree::NoteCommitmentSubtreeIndex,
40};
41
42use crate::{
43 constants::{
44 MAX_FIND_BLOCK_HASHES_RESULTS, MAX_FIND_BLOCK_HEADERS_RESULTS, MAX_LEGACY_CHAIN_BLOCKS,
45 },
46 error::{CommitBlockError, CommitCheckpointVerifiedError, InvalidateError, ReconsiderError},
47 request::TimedSpan,
48 response::NonFinalizedBlocksListener,
49 service::{
50 block_iter::any_ancestor_blocks,
51 chain_tip::{ChainTipBlock, ChainTipChange, ChainTipSender, LatestChainTip},
52 finalized_state::{FinalizedState, ZebraDb},
53 non_finalized_state::{Chain, NonFinalizedState},
54 pending_utxos::PendingUtxos,
55 queued_blocks::QueuedBlocks,
56 read::find,
57 watch_receiver::WatchReceiver,
58 },
59 BoxError, CheckpointVerifiedBlock, CommitSemanticallyVerifiedError, Config, KnownBlock,
60 ReadRequest, ReadResponse, Request, Response, SemanticallyVerifiedBlock,
61};
62
63pub mod block_iter;
64pub mod chain_tip;
65pub mod watch_receiver;
66
67pub mod check;
68
69pub(crate) mod finalized_state;
70pub(crate) mod non_finalized_state;
71mod pending_utxos;
72mod queued_blocks;
73pub(crate) mod read;
74mod traits;
75mod write;
76
77#[cfg(any(test, feature = "proptest-impl"))]
78pub mod arbitrary;
79
80#[cfg(test)]
81mod tests;
82
83pub use finalized_state::{OutputLocation, TransactionIndex, TransactionLocation};
84use write::NonFinalizedWriteMessage;
85
86use self::queued_blocks::{QueuedCheckpointVerified, QueuedSemanticallyVerified, SentHashes};
87
88pub use self::traits::{ReadState, State};
89
90#[derive(Debug)]
109pub(crate) struct StateService {
110 network: Network,
114
115 full_verifier_utxo_lookahead: block::Height,
121
122 non_finalized_state_queued_blocks: QueuedBlocks,
127
128 finalized_state_queued_blocks: HashMap<block::Hash, QueuedCheckpointVerified>,
133
134 block_write_sender: write::BlockWriteSender,
136
137 finalized_block_write_last_sent_hash: block::Hash,
147
148 non_finalized_block_write_sent_hashes: SentHashes,
151
152 invalid_block_write_reset_receiver: tokio::sync::mpsc::UnboundedReceiver<block::Hash>,
158
159 pending_utxos: PendingUtxos,
163
164 last_prune: Instant,
166
167 read_service: ReadStateService,
173
174 max_finalized_queue_height: f64,
181}
182
183#[derive(Clone, Debug)]
195pub struct ReadStateService {
196 network: Network,
200
201 non_finalized_state_receiver: WatchReceiver<NonFinalizedState>,
208
209 db: ZebraDb,
217
218 block_write_task: Option<Arc<std::thread::JoinHandle<()>>>,
223}
224
225impl Drop for StateService {
226 fn drop(&mut self) {
227 self.invalid_block_write_reset_receiver.close();
234
235 std::mem::drop(self.block_write_sender.finalized.take());
236 std::mem::drop(self.block_write_sender.non_finalized.take());
237
238 self.clear_finalized_block_queue(CommitBlockError::WriteTaskExited);
239 self.clear_non_finalized_block_queue(CommitBlockError::WriteTaskExited);
240
241 info!("dropping the state: logging database metrics");
243 self.log_db_metrics();
244
245 }
248}
249
250impl Drop for ReadStateService {
251 fn drop(&mut self) {
252 if let Some(block_write_task) = self.block_write_task.take() {
257 if let Some(block_write_task_handle) = Arc::into_inner(block_write_task) {
258 self.db.shutdown(true);
262
263 #[cfg(not(test))]
269 info!("waiting for the block write task to finish");
270 #[cfg(test)]
271 debug!("waiting for the block write task to finish");
272
273 if let Err(thread_panic) = block_write_task_handle.join() {
275 std::panic::resume_unwind(thread_panic);
276 } else {
277 debug!("shutting down the state because the block write task has finished");
278 }
279 }
280 } else {
281 self.db.shutdown(false);
285 }
286 }
287}
288
289impl StateService {
290 const PRUNE_INTERVAL: Duration = Duration::from_secs(30);
291
292 pub async fn new(
300 config: Config,
301 network: &Network,
302 max_checkpoint_height: block::Height,
303 checkpoint_verify_concurrency_limit: usize,
304 ) -> (Self, ReadStateService, LatestChainTip, ChainTipChange) {
305 let (finalized_state, finalized_tip, timer) = {
306 let config = config.clone();
307 let network = network.clone();
308 tokio::task::spawn_blocking(move || {
309 let timer = CodeTimer::start();
310 let finalized_state = FinalizedState::new(
311 &config,
312 &network,
313 #[cfg(feature = "elasticsearch")]
314 true,
315 );
316 timer.finish_desc("opening finalized state database");
317
318 let timer = CodeTimer::start();
319 let finalized_tip = finalized_state.db.tip_block();
320
321 (finalized_state, finalized_tip, timer)
322 })
323 .await
324 .expect("failed to join blocking task")
325 };
326
327 let is_finalized_tip_past_max_checkpoint = if let Some(tip) = &finalized_tip {
339 tip.coinbase_height().expect("valid block must have height") >= max_checkpoint_height
340 } else {
341 false
342 };
343 let (non_finalized_state, non_finalized_state_sender, non_finalized_state_receiver) =
344 NonFinalizedState::new(network)
345 .with_backup(
346 config.non_finalized_state_backup_dir(network),
347 &finalized_state.db,
348 is_finalized_tip_past_max_checkpoint,
349 )
350 .await;
351
352 let non_finalized_block_write_sent_hashes = SentHashes::new(&non_finalized_state);
353 let initial_tip = non_finalized_state
354 .best_tip_block()
355 .map(|cv_block| cv_block.block.clone())
356 .or(finalized_tip)
357 .map(CheckpointVerifiedBlock::from)
358 .map(ChainTipBlock::from);
359
360 tracing::info!(chain_tip = ?initial_tip.as_ref().map(|tip| (tip.hash, tip.height)), "loaded Zebra state cache");
361
362 let (chain_tip_sender, latest_chain_tip, chain_tip_change) =
363 ChainTipSender::new(initial_tip, network);
364
365 let finalized_state_for_writing = finalized_state.clone();
366 let should_use_finalized_block_write_sender = non_finalized_state.is_chain_set_empty();
367 let (block_write_sender, invalid_block_write_reset_receiver, block_write_task) =
368 write::BlockWriteSender::spawn(
369 finalized_state_for_writing,
370 non_finalized_state,
371 chain_tip_sender,
372 non_finalized_state_sender,
373 should_use_finalized_block_write_sender,
374 );
375
376 let read_service = ReadStateService::new(
377 &finalized_state,
378 block_write_task,
379 non_finalized_state_receiver,
380 );
381
382 let full_verifier_utxo_lookahead = max_checkpoint_height
383 - HeightDiff::try_from(checkpoint_verify_concurrency_limit)
384 .expect("fits in HeightDiff");
385 let full_verifier_utxo_lookahead =
386 full_verifier_utxo_lookahead.unwrap_or(block::Height::MIN);
387 let non_finalized_state_queued_blocks = QueuedBlocks::default();
388 let pending_utxos = PendingUtxos::default();
389
390 let finalized_block_write_last_sent_hash =
391 tokio::task::spawn_blocking(move || finalized_state.db.finalized_tip_hash())
392 .await
393 .expect("failed to join blocking task");
394
395 let state = Self {
396 network: network.clone(),
397 full_verifier_utxo_lookahead,
398 non_finalized_state_queued_blocks,
399 finalized_state_queued_blocks: HashMap::new(),
400 block_write_sender,
401 finalized_block_write_last_sent_hash,
402 non_finalized_block_write_sent_hashes,
403 invalid_block_write_reset_receiver,
404 pending_utxos,
405 last_prune: Instant::now(),
406 read_service: read_service.clone(),
407 max_finalized_queue_height: f64::NAN,
408 };
409 timer.finish_desc("initializing state service");
410
411 tracing::info!("starting legacy chain check");
412 let timer = CodeTimer::start();
413
414 if let (Some(tip), Some(nu5_activation_height)) = (
415 {
416 let read_state = state.read_service.clone();
417 tokio::task::spawn_blocking(move || read_state.best_tip())
418 .await
419 .expect("task should not panic")
420 },
421 NetworkUpgrade::Nu5.activation_height(network),
422 ) {
423 if let Err(error) = check::legacy_chain(
424 nu5_activation_height,
425 any_ancestor_blocks(
426 &state.read_service.latest_non_finalized_state(),
427 &state.read_service.db,
428 tip.1,
429 ),
430 &state.network,
431 MAX_LEGACY_CHAIN_BLOCKS,
432 ) {
433 let legacy_db_path = state.read_service.db.path().to_path_buf();
434 panic!(
435 "Cached state contains a legacy chain.\n\
436 An outdated Zebra version did not know about a recent network upgrade,\n\
437 so it followed a legacy chain using outdated consensus branch rules.\n\
438 Hint: Delete your database, and restart Zebra to do a full sync.\n\
439 Database path: {legacy_db_path:?}\n\
440 Error: {error:?}",
441 );
442 }
443 }
444
445 tracing::info!("cached state consensus branch is valid: no legacy chain found");
446 timer.finish_desc("legacy chain check");
447
448 let db_for_metrics = read_service.db.clone();
450 tokio::spawn(async move {
451 let mut interval = tokio::time::interval(Duration::from_secs(30));
452 loop {
453 interval.tick().await;
454 db_for_metrics.export_metrics();
455 }
456 });
457
458 (state, read_service, latest_chain_tip, chain_tip_change)
459 }
460
461 pub fn log_db_metrics(&self) {
463 self.read_service.db.print_db_metrics();
464 }
465
466 fn queue_and_commit_to_finalized_state(
470 &mut self,
471 checkpoint_verified: CheckpointVerifiedBlock,
472 ) -> oneshot::Receiver<Result<block::Hash, CommitCheckpointVerifiedError>> {
473 let queued_prev_hash = checkpoint_verified.block.header.previous_block_hash;
479 let queued_height = checkpoint_verified.height;
480
481 if self.is_close_to_final_checkpoint(queued_height) {
484 self.non_finalized_block_write_sent_hashes
485 .add_finalized(&checkpoint_verified)
486 }
487
488 let (rsp_tx, rsp_rx) = oneshot::channel();
489 let queued = (checkpoint_verified, rsp_tx);
490
491 if self.block_write_sender.finalized.is_some() {
492 if let Some(duplicate_queued) = self
494 .finalized_state_queued_blocks
495 .insert(queued_prev_hash, queued)
496 {
497 Self::send_checkpoint_verified_block_error(
498 duplicate_queued,
499 CommitBlockError::new_duplicate(
500 Some(queued_prev_hash.into()),
501 KnownBlock::Queue,
502 ),
503 );
504 }
505
506 self.drain_finalized_queue_and_commit();
507 } else {
508 Self::send_checkpoint_verified_block_error(
514 queued,
515 CommitBlockError::new_duplicate(None, KnownBlock::Finalized),
516 );
517
518 self.clear_finalized_block_queue(CommitBlockError::new_duplicate(
519 None,
520 KnownBlock::Finalized,
521 ));
522 }
523
524 if self.finalized_state_queued_blocks.is_empty() {
525 self.max_finalized_queue_height = f64::NAN;
526 } else if self.max_finalized_queue_height.is_nan()
527 || self.max_finalized_queue_height < queued_height.0 as f64
528 {
529 self.max_finalized_queue_height = queued_height.0 as f64;
535 }
536
537 metrics::gauge!("state.checkpoint.queued.max.height").set(self.max_finalized_queue_height);
538 metrics::gauge!("state.checkpoint.queued.block.count")
539 .set(self.finalized_state_queued_blocks.len() as f64);
540
541 rsp_rx
542 }
543
544 pub fn drain_finalized_queue_and_commit(&mut self) {
552 use tokio::sync::mpsc::error::{SendError, TryRecvError};
553
554 match self.invalid_block_write_reset_receiver.try_recv() {
561 Ok(reset_tip_hash) => self.finalized_block_write_last_sent_hash = reset_tip_hash,
562 Err(TryRecvError::Disconnected) => {
563 info!("Block commit task closed the block reset channel. Is Zebra shutting down?");
564 return;
565 }
566 Err(TryRecvError::Empty) => {}
568 }
569
570 while let Some(queued_block) = self
571 .finalized_state_queued_blocks
572 .remove(&self.finalized_block_write_last_sent_hash)
573 {
574 let last_sent_finalized_block_height = queued_block.0.height;
575
576 self.finalized_block_write_last_sent_hash = queued_block.0.hash;
577
578 if let Some(finalized_block_write_sender) = &self.block_write_sender.finalized {
581 let send_result = finalized_block_write_sender.send(queued_block);
582
583 if let Err(SendError(queued)) = send_result {
585 Self::send_checkpoint_verified_block_error(
587 queued,
588 CommitBlockError::WriteTaskExited,
589 );
590
591 self.clear_finalized_block_queue(CommitBlockError::WriteTaskExited);
592 } else {
593 metrics::gauge!("state.checkpoint.sent.block.height")
594 .set(last_sent_finalized_block_height.0 as f64);
595 };
596 }
597 }
598 }
599
600 fn clear_finalized_block_queue(
602 &mut self,
603 error: impl Into<CommitCheckpointVerifiedError> + Clone,
604 ) {
605 for (_hash, queued) in self.finalized_state_queued_blocks.drain() {
606 Self::send_checkpoint_verified_block_error(queued, error.clone());
607 }
608 }
609
610 fn send_checkpoint_verified_block_error(
612 queued: QueuedCheckpointVerified,
613 error: impl Into<CommitCheckpointVerifiedError>,
614 ) {
615 let (finalized, rsp_tx) = queued;
616
617 let _ = rsp_tx.send(Err(error.into()));
620 std::mem::drop(finalized);
621 }
622
623 fn clear_non_finalized_block_queue(
625 &mut self,
626 error: impl Into<CommitSemanticallyVerifiedError> + Clone,
627 ) {
628 for (_hash, queued) in self.non_finalized_state_queued_blocks.drain() {
629 Self::send_semantically_verified_block_error(queued, error.clone());
630 }
631 }
632
633 fn send_semantically_verified_block_error(
635 queued: QueuedSemanticallyVerified,
636 error: impl Into<CommitSemanticallyVerifiedError>,
637 ) {
638 let (finalized, rsp_tx) = queued;
639
640 let _ = rsp_tx.send(Err(error.into()));
643 std::mem::drop(finalized);
644 }
645
646 #[instrument(level = "debug", skip(self, semantically_verified))]
654 fn queue_and_commit_to_non_finalized_state(
655 &mut self,
656 semantically_verified: SemanticallyVerifiedBlock,
657 ) -> oneshot::Receiver<Result<block::Hash, CommitSemanticallyVerifiedError>> {
658 tracing::debug!(block = %semantically_verified.block, "queueing block for contextual verification");
659 let parent_hash = semantically_verified.block.header.previous_block_hash;
660
661 if self
662 .non_finalized_block_write_sent_hashes
663 .contains(&semantically_verified.hash)
664 {
665 let (rsp_tx, rsp_rx) = oneshot::channel();
666 let _ = rsp_tx.send(Err(CommitBlockError::new_duplicate(
667 Some(semantically_verified.hash.into()),
668 KnownBlock::WriteChannel,
669 )
670 .into()));
671 return rsp_rx;
672 }
673
674 if self
675 .read_service
676 .db
677 .contains_height(semantically_verified.height)
678 {
679 let (rsp_tx, rsp_rx) = oneshot::channel();
680 let _ = rsp_tx.send(Err(CommitBlockError::new_duplicate(
681 Some(semantically_verified.height.into()),
682 KnownBlock::Finalized,
683 )
684 .into()));
685 return rsp_rx;
686 }
687
688 let rsp_rx = if let Some((_, old_rsp_tx)) = self
692 .non_finalized_state_queued_blocks
693 .get_mut(&semantically_verified.hash)
694 {
695 tracing::debug!("replacing older queued request with new request");
696 let (mut rsp_tx, rsp_rx) = oneshot::channel();
697 std::mem::swap(old_rsp_tx, &mut rsp_tx);
698 let _ = rsp_tx.send(Err(CommitBlockError::new_duplicate(
699 Some(semantically_verified.hash.into()),
700 KnownBlock::Queue,
701 )
702 .into()));
703 rsp_rx
704 } else {
705 let (rsp_tx, rsp_rx) = oneshot::channel();
706 self.non_finalized_state_queued_blocks
707 .queue((semantically_verified, rsp_tx));
708 rsp_rx
709 };
710
711 if self.block_write_sender.finalized.is_some()
720 && self
721 .non_finalized_state_queued_blocks
722 .has_queued_children(self.finalized_block_write_last_sent_hash)
723 && self.read_service.db.finalized_tip_hash()
724 == self.finalized_block_write_last_sent_hash
725 {
726 std::mem::drop(self.block_write_sender.finalized.take());
729 self.non_finalized_block_write_sent_hashes = SentHashes::default();
731 self.non_finalized_block_write_sent_hashes
733 .can_fork_chain_at_hashes = true;
734 self.send_ready_non_finalized_queued(self.finalized_block_write_last_sent_hash);
736 self.clear_finalized_block_queue(CommitBlockError::new_duplicate(
738 None,
739 KnownBlock::Finalized,
740 ));
741 } else if !self.can_fork_chain_at(&parent_hash) {
742 tracing::trace!("unready to verify, returning early");
743 } else if self.block_write_sender.finalized.is_none() {
744 self.send_ready_non_finalized_queued(parent_hash);
746
747 let finalized_tip_height = self.read_service.db.finalized_tip_height().expect(
748 "Finalized state must have at least one block before committing non-finalized state",
749 );
750
751 self.non_finalized_state_queued_blocks
752 .prune_by_height(finalized_tip_height);
753
754 self.non_finalized_block_write_sent_hashes
755 .prune_by_height(finalized_tip_height);
756 }
757
758 rsp_rx
759 }
760
761 fn can_fork_chain_at(&self, hash: &block::Hash) -> bool {
763 self.non_finalized_block_write_sent_hashes
764 .can_fork_chain_at(hash)
765 || &self.read_service.db.finalized_tip_hash() == hash
766 }
767
768 fn is_close_to_final_checkpoint(&self, queued_height: block::Height) -> bool {
776 queued_height >= self.full_verifier_utxo_lookahead
777 }
778
779 #[tracing::instrument(level = "debug", skip(self, new_parent))]
782 fn send_ready_non_finalized_queued(&mut self, new_parent: block::Hash) {
783 use tokio::sync::mpsc::error::SendError;
784 if let Some(non_finalized_block_write_sender) = &self.block_write_sender.non_finalized {
785 let mut new_parents: Vec<block::Hash> = vec![new_parent];
786
787 while let Some(parent_hash) = new_parents.pop() {
788 let queued_children = self
789 .non_finalized_state_queued_blocks
790 .dequeue_children(parent_hash);
791
792 for queued_child in queued_children {
793 let (SemanticallyVerifiedBlock { hash, .. }, _) = queued_child;
794
795 self.non_finalized_block_write_sent_hashes
796 .add(&queued_child.0);
797 let send_result = non_finalized_block_write_sender.send(queued_child.into());
798
799 if let Err(SendError(NonFinalizedWriteMessage::Commit(queued))) = send_result {
800 Self::send_semantically_verified_block_error(
802 queued,
803 CommitBlockError::WriteTaskExited,
804 );
805
806 self.clear_non_finalized_block_queue(CommitBlockError::WriteTaskExited);
807
808 return;
809 };
810
811 new_parents.push(hash);
812 }
813 }
814
815 self.non_finalized_block_write_sent_hashes.finish_batch();
816 };
817 }
818
819 pub fn best_tip(&self) -> Option<(block::Height, block::Hash)> {
821 self.read_service.best_tip()
822 }
823
824 fn send_invalidate_block(
825 &self,
826 hash: block::Hash,
827 ) -> oneshot::Receiver<Result<block::Hash, InvalidateError>> {
828 let (rsp_tx, rsp_rx) = oneshot::channel();
829
830 let Some(sender) = &self.block_write_sender.non_finalized else {
831 let _ = rsp_tx.send(Err(InvalidateError::ProcessingCheckpointedBlocks));
832 return rsp_rx;
833 };
834
835 if let Err(tokio::sync::mpsc::error::SendError(error)) =
836 sender.send(NonFinalizedWriteMessage::Invalidate { hash, rsp_tx })
837 {
838 let NonFinalizedWriteMessage::Invalidate { rsp_tx, .. } = error else {
839 unreachable!("should return the same Invalidate message could not be sent");
840 };
841
842 let _ = rsp_tx.send(Err(InvalidateError::SendInvalidateRequestFailed));
843 }
844
845 rsp_rx
846 }
847
848 fn send_reconsider_block(
849 &self,
850 hash: block::Hash,
851 ) -> oneshot::Receiver<Result<Vec<block::Hash>, ReconsiderError>> {
852 let (rsp_tx, rsp_rx) = oneshot::channel();
853
854 let Some(sender) = &self.block_write_sender.non_finalized else {
855 let _ = rsp_tx.send(Err(ReconsiderError::CheckpointCommitInProgress));
856 return rsp_rx;
857 };
858
859 if let Err(tokio::sync::mpsc::error::SendError(error)) =
860 sender.send(NonFinalizedWriteMessage::Reconsider { hash, rsp_tx })
861 {
862 let NonFinalizedWriteMessage::Reconsider { rsp_tx, .. } = error else {
863 unreachable!("should return the same Reconsider message could not be sent");
864 };
865
866 let _ = rsp_tx.send(Err(ReconsiderError::ReconsiderSendFailed));
867 }
868
869 rsp_rx
870 }
871
872 fn assert_block_can_be_validated(&self, block: &SemanticallyVerifiedBlock) {
874 assert!(
876 block.height > self.network.mandatory_checkpoint_height(),
877 "invalid semantically verified block height: the canopy checkpoint is mandatory, pre-canopy \
878 blocks, and the canopy activation block, must be committed to the state as finalized \
879 blocks"
880 );
881 }
882
883 fn known_sent_hash(&self, hash: &block::Hash) -> Option<KnownBlock> {
884 self.non_finalized_block_write_sent_hashes
885 .contains(hash)
886 .then_some(KnownBlock::WriteChannel)
887 }
888}
889
890impl ReadStateService {
891 pub(crate) fn new(
897 finalized_state: &FinalizedState,
898 block_write_task: Option<Arc<std::thread::JoinHandle<()>>>,
899 non_finalized_state_receiver: WatchReceiver<NonFinalizedState>,
900 ) -> Self {
901 let read_service = Self {
902 network: finalized_state.network(),
903 db: finalized_state.db.clone(),
904 non_finalized_state_receiver,
905 block_write_task,
906 };
907
908 tracing::debug!("created new read-only state service");
909
910 read_service
911 }
912
913 pub fn best_tip(&self) -> Option<(block::Height, block::Hash)> {
915 read::best_tip(&self.latest_non_finalized_state(), &self.db)
916 }
917
918 fn latest_non_finalized_state(&self) -> NonFinalizedState {
920 self.non_finalized_state_receiver.cloned_watch_data()
921 }
922
923 fn latest_best_chain(&self) -> Option<Arc<Chain>> {
925 self.non_finalized_state_receiver
926 .borrow_mapped(|non_finalized_state| non_finalized_state.best_chain().cloned())
927 }
928
929 #[cfg(any(test, feature = "proptest-impl"))]
932 pub fn db(&self) -> &ZebraDb {
933 &self.db
934 }
935
936 pub fn log_db_metrics(&self) {
938 self.db.print_db_metrics();
939 }
940}
941
942impl Service<Request> for StateService {
943 type Response = Response;
944 type Error = BoxError;
945 type Future =
946 Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>;
947
948 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
949 let poll = self.read_service.poll_ready(cx);
951
952 let now = Instant::now();
954
955 if self.last_prune + Self::PRUNE_INTERVAL < now {
956 let tip = self.best_tip();
957 let old_len = self.pending_utxos.len();
958
959 self.pending_utxos.prune();
960 self.last_prune = now;
961
962 let new_len = self.pending_utxos.len();
963 let prune_count = old_len
964 .checked_sub(new_len)
965 .expect("prune does not add any utxo requests");
966 if prune_count > 0 {
967 tracing::debug!(
968 ?old_len,
969 ?new_len,
970 ?prune_count,
971 ?tip,
972 "pruned utxo requests"
973 );
974 } else {
975 tracing::debug!(len = ?old_len, ?tip, "no utxo requests needed pruning");
976 }
977 }
978
979 poll
980 }
981
982 #[instrument(name = "state", skip(self, req))]
983 fn call(&mut self, req: Request) -> Self::Future {
984 req.count_metric();
985 let span = Span::current();
986
987 match req {
988 Request::CommitSemanticallyVerifiedBlock(semantically_verified) => {
993 let timer = CodeTimer::start();
994 self.assert_block_can_be_validated(&semantically_verified);
995
996 self.pending_utxos
997 .check_against_ordered(&semantically_verified.new_outputs);
998
999 let rsp_rx = tokio::task::block_in_place(move || {
1011 span.in_scope(|| {
1012 self.queue_and_commit_to_non_finalized_state(semantically_verified)
1013 })
1014 });
1015
1016 timer.finish_desc("CommitSemanticallyVerifiedBlock");
1022
1023 let span = Span::current();
1027 async move {
1028 rsp_rx
1029 .await
1030 .map_err(|_recv_error| CommitBlockError::WriteTaskExited.into())
1031 .and_then(|result| result)
1032 .map_err(BoxError::from)
1033 .map(Response::Committed)
1034 }
1035 .instrument(span)
1036 .boxed()
1037 }
1038
1039 Request::CommitCheckpointVerifiedBlock(finalized) => {
1044 let timer = CodeTimer::start();
1045 self.pending_utxos
1059 .check_against_ordered(&finalized.new_outputs);
1060
1061 let rsp_rx = self.queue_and_commit_to_finalized_state(finalized);
1066
1067 timer.finish_desc("CommitCheckpointVerifiedBlock");
1073
1074 async move {
1078 rsp_rx
1079 .await
1080 .map_err(|_recv_error| CommitBlockError::WriteTaskExited.into())
1081 .and_then(|result| result)
1082 .map_err(BoxError::from)
1083 .map(Response::Committed)
1084 }
1085 .instrument(span)
1086 .boxed()
1087 }
1088
1089 Request::AwaitUtxo(outpoint) => {
1092 let timer = CodeTimer::start();
1093 let response_fut = self.pending_utxos.queue(outpoint);
1095 let response_fut = response_fut.instrument(span).boxed();
1099
1100 if let Some(utxo) = self.non_finalized_state_queued_blocks.utxo(&outpoint) {
1103 self.pending_utxos.respond(&outpoint, utxo);
1104
1105 timer.finish_desc("AwaitUtxo/queued-non-finalized");
1107
1108 return response_fut;
1109 }
1110
1111 if let Some(utxo) = self.non_finalized_block_write_sent_hashes.utxo(&outpoint) {
1113 self.pending_utxos.respond(&outpoint, utxo);
1114
1115 timer.finish_desc("AwaitUtxo/sent-non-finalized");
1117
1118 return response_fut;
1119 }
1120
1121 let read_service = self.read_service.clone();
1130
1131 async move {
1133 let req = ReadRequest::AnyChainUtxo(outpoint);
1134
1135 let rsp = read_service.oneshot(req).await?;
1136
1137 if let ReadResponse::AnyChainUtxo(Some(utxo)) = rsp {
1150 timer.finish_desc("AwaitUtxo/any-chain");
1152
1153 return Ok(Response::Utxo(utxo));
1154 }
1155
1156 timer.finish_desc("AwaitUtxo/waiting");
1158
1159 response_fut.await
1160 }
1161 .boxed()
1162 }
1163
1164 Request::KnownBlock(hash) => {
1167 let timer = CodeTimer::start();
1168 let sent_hash_response = self.known_sent_hash(&hash);
1169 let read_service = self.read_service.clone();
1170
1171 async move {
1172 if sent_hash_response.is_some() {
1173 return Ok(Response::KnownBlock(sent_hash_response));
1174 };
1175
1176 let response = read::non_finalized_state_contains_block_hash(
1177 &read_service.latest_non_finalized_state(),
1178 hash,
1179 )
1180 .or_else(|| read::finalized_state_contains_block_hash(&read_service.db, hash));
1182
1183 timer.finish_desc("Request::KnownBlock");
1184
1185 Ok(Response::KnownBlock(response))
1186 }
1187 .boxed()
1188 }
1189
1190 Request::InvalidateBlock(block_hash) => {
1192 let rsp_rx = tokio::task::block_in_place(move || {
1193 span.in_scope(|| self.send_invalidate_block(block_hash))
1194 });
1195
1196 let span = Span::current();
1200 async move {
1201 rsp_rx
1202 .await
1203 .map_err(|_recv_error| InvalidateError::InvalidateRequestDropped)
1204 .and_then(|result| result)
1205 .map_err(BoxError::from)
1206 .map(Response::Invalidated)
1207 }
1208 .instrument(span)
1209 .boxed()
1210 }
1211
1212 Request::ReconsiderBlock(block_hash) => {
1214 let rsp_rx = tokio::task::block_in_place(move || {
1215 span.in_scope(|| self.send_reconsider_block(block_hash))
1216 });
1217
1218 let span = Span::current();
1222 async move {
1223 rsp_rx
1224 .await
1225 .map_err(|_recv_error| ReconsiderError::ReconsiderResponseDropped)
1226 .and_then(|result| result)
1227 .map_err(BoxError::from)
1228 .map(Response::Reconsidered)
1229 }
1230 .instrument(span)
1231 .boxed()
1232 }
1233
1234 Request::Tip
1236 | Request::Depth(_)
1237 | Request::BestChainNextMedianTimePast
1238 | Request::BestChainBlockHash(_)
1239 | Request::BlockLocator
1240 | Request::Transaction(_)
1241 | Request::AnyChainTransaction(_)
1242 | Request::UnspentBestChainUtxo(_)
1243 | Request::Block(_)
1244 | Request::AnyChainBlock(_)
1245 | Request::BlockAndSize(_)
1246 | Request::BlockHeader(_)
1247 | Request::FindBlockHashes { .. }
1248 | Request::FindBlockHeaders { .. }
1249 | Request::CheckBestChainTipNullifiersAndAnchors(_)
1250 | Request::CheckBlockProposalValidity(_) => {
1251 let read_service = self.read_service.clone();
1253
1254 async move {
1255 let req = req
1256 .try_into()
1257 .expect("ReadRequest conversion should not fail");
1258
1259 let rsp = read_service.oneshot(req).await?;
1260 let rsp = rsp.try_into().expect("Response conversion should not fail");
1261
1262 Ok(rsp)
1263 }
1264 .boxed()
1265 }
1266 }
1267 }
1268}
1269
1270impl Service<ReadRequest> for ReadStateService {
1271 type Response = ReadResponse;
1272 type Error = BoxError;
1273 type Future =
1274 Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>;
1275
1276 fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
1277 let block_write_task = self.block_write_task.take();
1281
1282 if let Some(block_write_task) = block_write_task {
1283 if block_write_task.is_finished() {
1284 if let Some(block_write_task) = Arc::into_inner(block_write_task) {
1285 if let Err(thread_panic) = block_write_task.join() {
1287 std::panic::resume_unwind(thread_panic);
1288 }
1289 }
1290 } else {
1291 self.block_write_task = Some(block_write_task);
1293 }
1294 }
1295
1296 self.db.check_for_panics();
1297
1298 Poll::Ready(Ok(()))
1299 }
1300
1301 #[instrument(name = "read_state", skip(self, req))]
1302 fn call(&mut self, req: ReadRequest) -> Self::Future {
1303 req.count_metric();
1304 let timer = CodeTimer::start_desc(req.variant_name());
1305 let span = Span::current();
1306 let timed_span = TimedSpan::new(timer, span);
1307 let state = self.clone();
1308
1309 if req == ReadRequest::NonFinalizedBlocksListener {
1310 let non_finalized_blocks_listener = NonFinalizedBlocksListener::spawn(
1313 self.network.clone(),
1314 self.non_finalized_state_receiver.clone(),
1315 );
1316
1317 return async move {
1318 Ok(ReadResponse::NonFinalizedBlocksListener(
1319 non_finalized_blocks_listener,
1320 ))
1321 }
1322 .boxed();
1323 };
1324
1325 let request_handler = move || match req {
1326 ReadRequest::UsageInfo => Ok(ReadResponse::UsageInfo(state.db.size())),
1328
1329 ReadRequest::Tip => Ok(ReadResponse::Tip(read::tip(
1331 state.latest_best_chain(),
1332 &state.db,
1333 ))),
1334
1335 ReadRequest::TipPoolValues => {
1337 let (tip_height, tip_hash, value_balance) =
1338 read::tip_with_value_balance(state.latest_best_chain(), &state.db)?
1339 .ok_or(BoxError::from("no chain tip available yet"))?;
1340
1341 Ok(ReadResponse::TipPoolValues {
1342 tip_height,
1343 tip_hash,
1344 value_balance,
1345 })
1346 }
1347
1348 ReadRequest::BlockInfo(hash_or_height) => Ok(ReadResponse::BlockInfo(
1350 read::block_info(state.latest_best_chain(), &state.db, hash_or_height),
1351 )),
1352
1353 ReadRequest::Depth(hash) => Ok(ReadResponse::Depth(read::depth(
1355 state.latest_best_chain(),
1356 &state.db,
1357 hash,
1358 ))),
1359
1360 ReadRequest::BestChainNextMedianTimePast => {
1362 Ok(ReadResponse::BestChainNextMedianTimePast(
1363 read::next_median_time_past(&state.latest_non_finalized_state(), &state.db)?,
1364 ))
1365 }
1366
1367 ReadRequest::Block(hash_or_height) => Ok(ReadResponse::Block(read::block(
1369 state.latest_best_chain(),
1370 &state.db,
1371 hash_or_height,
1372 ))),
1373
1374 ReadRequest::AnyChainBlock(hash_or_height) => Ok(ReadResponse::Block(read::any_block(
1375 state.latest_non_finalized_state().chain_iter(),
1376 &state.db,
1377 hash_or_height,
1378 ))),
1379
1380 ReadRequest::BlockAndSize(hash_or_height) => Ok(ReadResponse::BlockAndSize(
1382 read::block_and_size(state.latest_best_chain(), &state.db, hash_or_height),
1383 )),
1384
1385 ReadRequest::BlockHeader(hash_or_height) => {
1387 let best_chain = state.latest_best_chain();
1388
1389 let height = hash_or_height
1390 .height_or_else(|hash| {
1391 read::find::height_by_hash(best_chain.clone(), &state.db, hash)
1392 })
1393 .ok_or_else(|| BoxError::from("block hash or height not found"))?;
1394
1395 let hash = hash_or_height
1396 .hash_or_else(|height| {
1397 read::find::hash_by_height(best_chain.clone(), &state.db, height)
1398 })
1399 .ok_or_else(|| BoxError::from("block hash or height not found"))?;
1400
1401 let next_height = height.next()?;
1402 let next_block_hash =
1403 read::find::hash_by_height(best_chain.clone(), &state.db, next_height);
1404
1405 let header = read::block_header(best_chain, &state.db, height.into())
1406 .ok_or_else(|| BoxError::from("block hash or height not found"))?;
1407
1408 Ok(ReadResponse::BlockHeader {
1409 header,
1410 hash,
1411 height,
1412 next_block_hash,
1413 })
1414 }
1415
1416 ReadRequest::Transaction(hash) => Ok(ReadResponse::Transaction(
1418 read::mined_transaction(state.latest_best_chain(), &state.db, hash),
1419 )),
1420
1421 ReadRequest::AnyChainTransaction(hash) => {
1422 Ok(ReadResponse::AnyChainTransaction(read::any_transaction(
1423 state.latest_non_finalized_state().chain_iter(),
1424 &state.db,
1425 hash,
1426 )))
1427 }
1428
1429 ReadRequest::TransactionIdsForBlock(hash_or_height) => Ok(
1431 ReadResponse::TransactionIdsForBlock(read::transaction_hashes_for_block(
1432 state.latest_best_chain(),
1433 &state.db,
1434 hash_or_height,
1435 )),
1436 ),
1437
1438 ReadRequest::AnyChainTransactionIdsForBlock(hash_or_height) => {
1439 Ok(ReadResponse::AnyChainTransactionIdsForBlock(
1440 read::transaction_hashes_for_any_block(
1441 state.latest_non_finalized_state().chain_iter(),
1442 &state.db,
1443 hash_or_height,
1444 ),
1445 ))
1446 }
1447
1448 #[cfg(feature = "indexer")]
1449 ReadRequest::SpendingTransactionId(spend) => Ok(ReadResponse::TransactionId(
1450 read::spending_transaction_hash(state.latest_best_chain(), &state.db, spend),
1451 )),
1452
1453 ReadRequest::UnspentBestChainUtxo(outpoint) => Ok(ReadResponse::UnspentBestChainUtxo(
1454 read::unspent_utxo(state.latest_best_chain(), &state.db, outpoint),
1455 )),
1456
1457 ReadRequest::AnyChainUtxo(outpoint) => Ok(ReadResponse::AnyChainUtxo(read::any_utxo(
1459 state.latest_non_finalized_state(),
1460 &state.db,
1461 outpoint,
1462 ))),
1463
1464 ReadRequest::BlockLocator => Ok(ReadResponse::BlockLocator(
1466 read::block_locator(state.latest_best_chain(), &state.db).unwrap_or_default(),
1467 )),
1468
1469 ReadRequest::FindBlockHashes { known_blocks, stop } => {
1471 Ok(ReadResponse::BlockHashes(read::find_chain_hashes(
1472 state.latest_best_chain(),
1473 &state.db,
1474 known_blocks,
1475 stop,
1476 MAX_FIND_BLOCK_HASHES_RESULTS,
1477 )))
1478 }
1479
1480 ReadRequest::FindBlockHeaders { known_blocks, stop } => Ok(ReadResponse::BlockHeaders(
1482 read::find_chain_headers(
1483 state.latest_best_chain(),
1484 &state.db,
1485 known_blocks,
1486 stop,
1487 MAX_FIND_BLOCK_HEADERS_RESULTS,
1488 )
1489 .into_iter()
1490 .map(|header| CountedHeader { header })
1491 .collect(),
1492 )),
1493
1494 ReadRequest::SaplingTree(hash_or_height) => Ok(ReadResponse::SaplingTree(
1495 read::sapling_tree(state.latest_best_chain(), &state.db, hash_or_height),
1496 )),
1497
1498 ReadRequest::OrchardTree(hash_or_height) => Ok(ReadResponse::OrchardTree(
1499 read::orchard_tree(state.latest_best_chain(), &state.db, hash_or_height),
1500 )),
1501
1502 ReadRequest::SaplingSubtrees { start_index, limit } => {
1503 let end_index = limit
1504 .and_then(|limit| start_index.0.checked_add(limit.0))
1505 .map(NoteCommitmentSubtreeIndex);
1506
1507 let best_chain = state.latest_best_chain();
1508 let sapling_subtrees = if let Some(end_index) = end_index {
1509 read::sapling_subtrees(best_chain, &state.db, start_index..end_index)
1510 } else {
1511 read::sapling_subtrees(best_chain, &state.db, start_index..)
1516 };
1517
1518 Ok(ReadResponse::SaplingSubtrees(sapling_subtrees))
1519 }
1520
1521 ReadRequest::OrchardSubtrees { start_index, limit } => {
1522 let end_index = limit
1523 .and_then(|limit| start_index.0.checked_add(limit.0))
1524 .map(NoteCommitmentSubtreeIndex);
1525
1526 let best_chain = state.latest_best_chain();
1527 let orchard_subtrees = if let Some(end_index) = end_index {
1528 read::orchard_subtrees(best_chain, &state.db, start_index..end_index)
1529 } else {
1530 read::orchard_subtrees(best_chain, &state.db, start_index..)
1535 };
1536
1537 Ok(ReadResponse::OrchardSubtrees(orchard_subtrees))
1538 }
1539
1540 ReadRequest::AddressBalance(addresses) => {
1542 let (balance, received) =
1543 read::transparent_balance(state.latest_best_chain(), &state.db, addresses)?;
1544 Ok(ReadResponse::AddressBalance { balance, received })
1545 }
1546
1547 ReadRequest::TransactionIdsByAddresses {
1549 addresses,
1550 height_range,
1551 } => read::transparent_tx_ids(
1552 state.latest_best_chain(),
1553 &state.db,
1554 addresses,
1555 height_range,
1556 )
1557 .map(ReadResponse::AddressesTransactionIds),
1558
1559 ReadRequest::UtxosByAddresses(addresses) => read::address_utxos(
1561 &state.network,
1562 state.latest_best_chain(),
1563 &state.db,
1564 addresses,
1565 )
1566 .map(ReadResponse::AddressUtxos),
1567
1568 ReadRequest::CheckBestChainTipNullifiersAndAnchors(unmined_tx) => {
1569 let latest_non_finalized_best_chain = state.latest_best_chain();
1570
1571 check::nullifier::tx_no_duplicates_in_chain(
1572 &state.db,
1573 latest_non_finalized_best_chain.as_ref(),
1574 &unmined_tx.transaction,
1575 )?;
1576
1577 check::anchors::tx_anchors_refer_to_final_treestates(
1578 &state.db,
1579 latest_non_finalized_best_chain.as_ref(),
1580 &unmined_tx,
1581 )?;
1582
1583 Ok(ReadResponse::ValidBestChainTipNullifiersAndAnchors)
1584 }
1585
1586 ReadRequest::BestChainBlockHash(height) => Ok(ReadResponse::BlockHash(
1588 read::hash_by_height(state.latest_best_chain(), &state.db, height),
1589 )),
1590
1591 ReadRequest::ChainInfo => {
1593 read::difficulty::get_block_template_chain_info(
1605 &state.latest_non_finalized_state(),
1606 &state.db,
1607 &state.network,
1608 )
1609 .map(ReadResponse::ChainInfo)
1610 }
1611
1612 ReadRequest::SolutionRate { num_blocks, height } => {
1614 let latest_non_finalized_state = state.latest_non_finalized_state();
1615 let (tip_height, tip_hash) =
1623 match read::tip(latest_non_finalized_state.best_chain(), &state.db) {
1624 Some(tip_hash) => tip_hash,
1625 None => return Ok(ReadResponse::SolutionRate(None)),
1626 };
1627
1628 let start_hash = match height {
1629 Some(height) if height < tip_height => read::hash_by_height(
1630 latest_non_finalized_state.best_chain(),
1631 &state.db,
1632 height,
1633 ),
1634 _ => Some(tip_hash),
1636 };
1637
1638 let solution_rate = start_hash.and_then(|start_hash| {
1639 read::difficulty::solution_rate(
1640 &latest_non_finalized_state,
1641 &state.db,
1642 num_blocks,
1643 start_hash,
1644 )
1645 });
1646
1647 Ok(ReadResponse::SolutionRate(solution_rate))
1648 }
1649
1650 ReadRequest::CheckBlockProposalValidity(semantically_verified) => {
1651 tracing::debug!(
1652 "attempting to validate and commit block proposal \
1653 onto a cloned non-finalized state"
1654 );
1655 let mut latest_non_finalized_state = state.latest_non_finalized_state();
1656
1657 let Some((_best_tip_height, best_tip_hash)) =
1659 read::best_tip(&latest_non_finalized_state, &state.db)
1660 else {
1661 return Err(
1662 "state is empty: wait for Zebra to sync before submitting a proposal"
1663 .into(),
1664 );
1665 };
1666
1667 if semantically_verified.block.header.previous_block_hash != best_tip_hash {
1668 return Err("proposal is not based on the current best chain tip: \
1669 previous block hash must be the best chain tip"
1670 .into());
1671 }
1672
1673 latest_non_finalized_state.disable_metrics();
1679
1680 write::validate_and_commit_non_finalized(
1681 &state.db,
1682 &mut latest_non_finalized_state,
1683 semantically_verified,
1684 )?;
1685
1686 Ok(ReadResponse::ValidBlockProposal)
1687 }
1688
1689 ReadRequest::TipBlockSize => {
1690 Ok(ReadResponse::TipBlockSize(
1692 state
1693 .best_tip()
1694 .and_then(|(tip_height, _)| {
1695 read::block_info(
1696 state.latest_best_chain(),
1697 &state.db,
1698 tip_height.into(),
1699 )
1700 })
1701 .map(|info| info.size().try_into().expect("u32 should fit in usize"))
1702 .or_else(|| {
1703 find::tip_block(state.latest_best_chain(), &state.db)
1704 .map(|b| b.zcash_serialized_size())
1705 }),
1706 ))
1707 }
1708
1709 ReadRequest::NonFinalizedBlocksListener => {
1710 unreachable!("should return early");
1711 }
1712
1713 ReadRequest::IsTransparentOutputSpent(outpoint) => {
1715 let is_spent = read::unspent_utxo(state.latest_best_chain(), &state.db, outpoint);
1716 Ok(ReadResponse::IsTransparentOutputSpent(is_spent.is_none()))
1717 }
1718 };
1719
1720 timed_span.spawn_blocking(request_handler)
1721 }
1722}
1723
1724pub async fn init(
1740 config: Config,
1741 network: &Network,
1742 max_checkpoint_height: block::Height,
1743 checkpoint_verify_concurrency_limit: usize,
1744) -> (
1745 BoxService<Request, Response, BoxError>,
1746 ReadStateService,
1747 LatestChainTip,
1748 ChainTipChange,
1749) {
1750 let (state_service, read_only_state_service, latest_chain_tip, chain_tip_change) =
1751 StateService::new(
1752 config,
1753 network,
1754 max_checkpoint_height,
1755 checkpoint_verify_concurrency_limit,
1756 )
1757 .await;
1758
1759 (
1760 BoxService::new(state_service),
1761 read_only_state_service,
1762 latest_chain_tip,
1763 chain_tip_change,
1764 )
1765}
1766
1767pub fn init_read_only(
1774 config: Config,
1775 network: &Network,
1776) -> (
1777 ReadStateService,
1778 ZebraDb,
1779 tokio::sync::watch::Sender<NonFinalizedState>,
1780) {
1781 let finalized_state = FinalizedState::new_with_debug(
1782 &config,
1783 network,
1784 true,
1785 #[cfg(feature = "elasticsearch")]
1786 false,
1787 true,
1788 );
1789 let (non_finalized_state_sender, non_finalized_state_receiver) =
1790 tokio::sync::watch::channel(NonFinalizedState::new(network));
1791
1792 (
1793 ReadStateService::new(
1794 &finalized_state,
1795 None,
1796 WatchReceiver::new(non_finalized_state_receiver),
1797 ),
1798 finalized_state.db.clone(),
1799 non_finalized_state_sender,
1800 )
1801}
1802
1803pub fn spawn_init_read_only(
1806 config: Config,
1807 network: &Network,
1808) -> tokio::task::JoinHandle<(
1809 ReadStateService,
1810 ZebraDb,
1811 tokio::sync::watch::Sender<NonFinalizedState>,
1812)> {
1813 let network = network.clone();
1814 tokio::task::spawn_blocking(move || init_read_only(config, &network))
1815}
1816
/// Creates a buffered state service for `network`, for use in tests.
///
/// The service is created with `Config::ephemeral()`, a maximum checkpoint height of
/// `block::Height::MAX`, and a checkpoint verify concurrency limit of `0`. Only the
/// write-side service is kept; the other handles returned by `StateService::new`
/// are discarded. The service is boxed and wrapped in a [`Buffer`] with a bound of `1`.
#[cfg(any(test, feature = "proptest-impl"))]
pub async fn init_test(
    network: &Network,
) -> Buffer<BoxService<Request, Response, BoxError>, Request> {
    let (write_service, ..) =
        StateService::new(Config::ephemeral(), network, block::Height::MAX, 0).await;

    let boxed_service = BoxService::new(write_service);

    Buffer::new(boxed_service, 1)
}
1831
/// Creates the full set of state services for `network`, for use in tests.
///
/// Uses the same settings as [`init_test`]: `Config::ephemeral()`, a maximum
/// checkpoint height of `block::Height::MAX`, and a checkpoint verify concurrency
/// limit of `0`. Unlike [`init_test`], every handle is returned.
///
/// # Returns
///
/// A tuple of:
/// - the write-side service, boxed and wrapped in a [`Buffer`] with a bound of `1`,
/// - the [`ReadStateService`],
/// - the [`LatestChainTip`], and
/// - the [`ChainTipChange`].
#[cfg(any(test, feature = "proptest-impl"))]
pub async fn init_test_services(
    network: &Network,
) -> (
    Buffer<BoxService<Request, Response, BoxError>, Request>,
    ReadStateService,
    LatestChainTip,
    ChainTipChange,
) {
    let (write_service, read_service, tip, tip_change) =
        StateService::new(Config::ephemeral(), network, block::Height::MAX, 0).await;

    (
        Buffer::new(BoxService::new(write_service), 1),
        read_service,
        tip,
        tip_change,
    )
}