Skip to main content

zebra_state/
service.rs

1//! [`tower::Service`]s for Zebra's cached chain state.
2//!
3//! Zebra provides cached state access via two main services:
4//! - [`StateService`]: a read-write service that writes blocks to the state,
5//!   and redirects most read requests to the [`ReadStateService`].
6//! - [`ReadStateService`]: a read-only service that answers from the most
7//!   recent committed block.
8//!
9//! Most users should prefer [`ReadStateService`], unless they need to write blocks to the state.
10//!
11//! Zebra also provides access to the best chain tip via:
12//! - [`LatestChainTip`]: a read-only channel that contains the latest committed
13//!   tip.
14//! - [`ChainTipChange`]: a read-only channel that can asynchronously await
15//!   chain tip changes.
16
17use std::{
18    collections::HashMap,
19    future::Future,
20    pin::Pin,
21    sync::Arc,
22    task::{Context, Poll},
23    time::{Duration, Instant},
24};
25
26use futures::future::FutureExt;
27use tokio::sync::oneshot;
28use tower::{util::BoxService, Service, ServiceExt};
29use tracing::{instrument, Instrument, Span};
30
31#[cfg(any(test, feature = "proptest-impl"))]
32use tower::buffer::Buffer;
33
34use zebra_chain::{
35    block::{self, CountedHeader, HeightDiff},
36    diagnostic::CodeTimer,
37    parameters::{Network, NetworkUpgrade},
38    serialization::ZcashSerialize,
39    subtree::NoteCommitmentSubtreeIndex,
40};
41
42use crate::{
43    constants::{
44        MAX_FIND_BLOCK_HASHES_RESULTS, MAX_FIND_BLOCK_HEADERS_RESULTS, MAX_LEGACY_CHAIN_BLOCKS,
45    },
46    error::{CommitBlockError, CommitCheckpointVerifiedError, InvalidateError, ReconsiderError},
47    request::TimedSpan,
48    response::NonFinalizedBlocksListener,
49    service::{
50        block_iter::any_ancestor_blocks,
51        chain_tip::{ChainTipBlock, ChainTipChange, ChainTipSender, LatestChainTip},
52        finalized_state::{FinalizedState, ZebraDb},
53        non_finalized_state::{Chain, NonFinalizedState},
54        pending_utxos::PendingUtxos,
55        queued_blocks::QueuedBlocks,
56        read::find,
57        watch_receiver::WatchReceiver,
58    },
59    BoxError, CheckpointVerifiedBlock, CommitSemanticallyVerifiedError, Config, KnownBlock,
60    ReadRequest, ReadResponse, Request, Response, SemanticallyVerifiedBlock,
61};
62
63pub mod block_iter;
64pub mod chain_tip;
65pub mod watch_receiver;
66
67pub mod check;
68
69pub(crate) mod finalized_state;
70pub(crate) mod non_finalized_state;
71mod pending_utxos;
72mod queued_blocks;
73pub(crate) mod read;
74mod traits;
75mod write;
76
77#[cfg(any(test, feature = "proptest-impl"))]
78pub mod arbitrary;
79
80#[cfg(test)]
81mod tests;
82
83pub use finalized_state::{OutputLocation, TransactionIndex, TransactionLocation};
84use write::NonFinalizedWriteMessage;
85
86use self::queued_blocks::{QueuedCheckpointVerified, QueuedSemanticallyVerified, SentHashes};
87
88pub use self::traits::{ReadState, State};
89
/// A read-write service for Zebra's cached blockchain state.
///
/// This service modifies and provides access to:
/// - the non-finalized state: the ~100 most recent blocks.
///   Zebra allows chain forks in the non-finalized state,
///   stores it in memory, and re-downloads it when restarted.
/// - the finalized state: older blocks that have many confirmations.
///   Zebra stores the single best chain in the finalized state,
///   and re-loads it from disk when restarted.
///
/// Read requests to this service are buffered, then processed concurrently.
/// Block write requests are buffered, then queued, then processed in order by a separate task.
///
/// Most state users can get faster read responses using the [`ReadStateService`],
/// because its requests do not share a [`tower::buffer::Buffer`] with block write requests.
///
/// To quickly get the latest block, use [`LatestChainTip`] or [`ChainTipChange`].
/// They can read the latest block directly, without queueing any requests.
#[derive(Debug)]
pub(crate) struct StateService {
    // Configuration
    //
    /// The configured Zcash network.
    network: Network,

    /// The height that we start storing UTXOs from finalized blocks.
    ///
    /// This height should be lower than the last few checkpoints,
    /// so the full verifier can verify UTXO spends from those blocks,
    /// even if they haven't been committed to the finalized state yet.
    full_verifier_utxo_lookahead: block::Height,

    // Queued Blocks
    //
    /// Queued blocks for the [`NonFinalizedState`] that arrived out of order.
    /// These blocks are awaiting their parent blocks before they can do contextual verification.
    non_finalized_state_queued_blocks: QueuedBlocks,

    /// Queued blocks for the [`FinalizedState`] that arrived out of order.
    /// These blocks are awaiting their parent blocks before they can do contextual verification.
    ///
    /// Indexed by their parent block hash, so each parent can have at most one
    /// queued child here at a time.
    finalized_state_queued_blocks: HashMap<block::Hash, QueuedCheckpointVerified>,

    /// Channels to send blocks to the block write task.
    block_write_sender: write::BlockWriteSender,

    /// The [`block::Hash`] of the most recent block sent on
    /// `finalized_block_write_sender` or `non_finalized_block_write_sender`.
    ///
    /// On startup, this is:
    /// - the finalized tip, if there are stored blocks, or
    /// - the genesis block's parent hash, if the database is empty.
    ///
    /// If `invalid_block_write_reset_receiver` gets a reset, this is:
    /// - the hash of the last valid committed block (the parent of the invalid block).
    finalized_block_write_last_sent_hash: block::Hash,

    /// A set of block hashes that have been sent to the block write task.
    /// Hashes of blocks below the finalized tip height are periodically pruned.
    non_finalized_block_write_sent_hashes: SentHashes,

    /// If an invalid block is sent on `finalized_block_write_sender`
    /// or `non_finalized_block_write_sender`,
    /// this channel gets the [`block::Hash`] of the valid tip.
    //
    // TODO: add tests for finalized and non-finalized resets (#2654)
    invalid_block_write_reset_receiver: tokio::sync::mpsc::UnboundedReceiver<block::Hash>,

    // Pending UTXO Request Tracking
    //
    /// The set of outpoints with pending requests for their associated transparent::Output.
    pending_utxos: PendingUtxos,

    /// Instant tracking the last time `pending_utxos` was pruned.
    /// Pruning is presumably rate-limited by [`Self::PRUNE_INTERVAL`] —
    /// confirm in the request handling code.
    last_prune: Instant,

    // Updating Concurrently Readable State
    //
    /// A cloneable [`ReadStateService`], used to answer concurrent read requests.
    ///
    /// TODO: move users of read [`Request`]s to [`ReadStateService`], and remove `read_service`.
    read_service: ReadStateService,

    // Metrics
    //
    /// A metric tracking the maximum height that's currently in `finalized_state_queued_blocks`.
    ///
    /// Set to `f64::NAN` if `finalized_state_queued_blocks` is empty, because grafana shows NaNs
    /// as a break in the graph.
    max_finalized_queue_height: f64,
}
182
/// A read-only service for accessing Zebra's cached blockchain state.
///
/// This service provides read-only access to:
/// - the non-finalized state: the ~100 most recent blocks.
/// - the finalized state: older blocks that have many confirmations.
///
/// Requests to this service are processed in parallel,
/// ignoring any blocks queued by the read-write [`StateService`].
///
/// This quick response behavior is better for most state users.
/// It allows other async tasks to make progress while concurrently reading data from disk.
#[derive(Clone, Debug)]
pub struct ReadStateService {
    // Configuration
    //
    /// The configured Zcash network.
    network: Network,

    // Shared Concurrently Readable State
    //
    /// A watch channel with a cached copy of the [`NonFinalizedState`].
    ///
    /// This state is only updated between requests,
    /// so it might include some block data that is also in `db`.
    non_finalized_state_receiver: WatchReceiver<NonFinalizedState>,

    /// The shared inner on-disk database for the finalized state.
    ///
    /// RocksDB allows reads and writes via a shared reference,
    /// but [`ZebraDb`] doesn't expose any write methods or types.
    ///
    /// This database is updated concurrently with requests,
    /// so it might include some block data that is also in `non_finalized_state_receiver`.
    db: ZebraDb,

    /// A shared handle to a task that writes blocks to the [`NonFinalizedState`] or [`FinalizedState`],
    /// once the queues have received all their parent blocks.
    ///
    /// Used to check for panics when writing blocks.
    block_write_task: Option<Arc<std::thread::JoinHandle<()>>>,
}
224
impl Drop for StateService {
    fn drop(&mut self) {
        // The state service owns the state, tasks, and channels,
        // so dropping it should shut down everything.

        // Close the channels (non-blocking)
        // This makes the block write thread exit the next time it checks the channels.
        // We want to do this here so we get any errors or panics from the block write task before it shuts down.
        self.invalid_block_write_reset_receiver.close();

        // Dropping both senders closes the write channels,
        // telling the block write task there are no more blocks to commit.
        std::mem::drop(self.block_write_sender.finalized.take());
        std::mem::drop(self.block_write_sender.non_finalized.take());

        // Notify any callers still waiting on queued blocks that the write task is gone.
        self.clear_finalized_block_queue(CommitBlockError::WriteTaskExited);
        self.clear_non_finalized_block_queue(CommitBlockError::WriteTaskExited);

        // Log database metrics before shutting down
        info!("dropping the state: logging database metrics");
        self.log_db_metrics();

        // Then drop self.read_service, which checks the block write task for panics,
        // and tries to shut down the database.
    }
}
249
impl Drop for ReadStateService {
    fn drop(&mut self) {
        // The read state service shares the state,
        // so dropping it should check if we can shut down.

        // TODO: move this into a try_shutdown() method
        if let Some(block_write_task) = self.block_write_task.take() {
            // `Arc::into_inner` only returns the handle when this was the last
            // clone of the block write task handle.
            if let Some(block_write_task_handle) = Arc::into_inner(block_write_task) {
                // We're the last database user, so we can tell it to shut down (blocking):
                // - flushes the database to disk, and
                // - drops the database, which cleans up any database tasks correctly.
                self.db.shutdown(true);

                // We are the last state with a reference to this thread, so we can
                // wait until the block write task finishes, then check for panics (blocking).
                // (We'd also like to abort the thread, but std::thread::JoinHandle can't do that.)

                // This log is verbose during tests.
                #[cfg(not(test))]
                info!("waiting for the block write task to finish");
                #[cfg(test)]
                debug!("waiting for the block write task to finish");

                // TODO: move this into a check_for_panics() method
                if let Err(thread_panic) = block_write_task_handle.join() {
                    // Re-raise the write task's panic in this thread,
                    // so it isn't silently discarded at shutdown.
                    std::panic::resume_unwind(thread_panic);
                } else {
                    debug!("shutting down the state because the block write task has finished");
                }
            }
        } else {
            // Even if we're not the last database user, try shutting it down.
            //
            // TODO: rename this to try_shutdown()?
            self.db.shutdown(false);
        }
    }
}
288
289impl StateService {
    /// The interval between prunes of `pending_utxos`
    /// (see the `last_prune` field, which tracks the last prune time).
    const PRUNE_INTERVAL: Duration = Duration::from_secs(30);
291
    /// Creates a new state service for the state `config` and `network`.
    ///
    /// Uses the `max_checkpoint_height` and `checkpoint_verify_concurrency_limit`
    /// to work out when it is near the final checkpoint.
    ///
    /// Returns the read-write and read-only state services,
    /// and read-only watch channels for its best chain tip.
    ///
    /// # Panics
    ///
    /// - if the cached state contains a legacy (pre-NU5 branch rules) chain, or
    /// - if a spawned blocking task panics or is cancelled.
    pub async fn new(
        config: Config,
        network: &Network,
        max_checkpoint_height: block::Height,
        checkpoint_verify_concurrency_limit: usize,
    ) -> (Self, ReadStateService, LatestChainTip, ChainTipChange) {
        // Open the database and read its tip on a blocking thread,
        // because database startup can do significant disk I/O.
        let (finalized_state, finalized_tip, timer) = {
            let config = config.clone();
            let network = network.clone();
            tokio::task::spawn_blocking(move || {
                let timer = CodeTimer::start();
                let finalized_state = FinalizedState::new(
                    &config,
                    &network,
                    #[cfg(feature = "elasticsearch")]
                    true,
                );
                timer.finish_desc("opening finalized state database");

                // This timer is returned still running: it is finished below,
                // after the rest of the state service has been initialized.
                let timer = CodeTimer::start();
                let finalized_tip = finalized_state.db.tip_block();

                (finalized_state, finalized_tip, timer)
            })
            .await
            .expect("failed to join blocking task")
        };

        // # Correctness
        //
        // The state service must set the finalized block write sender to `None`
        // if there are blocks in the restored non-finalized state that are above
        // the max checkpoint height so that non-finalized blocks can be written, otherwise,
        // Zebra will be unable to commit semantically verified blocks, and its chain sync will stall.
        //
        // The state service must not set the finalized block write sender to `None` if there
        // aren't blocks in the restored non-finalized state that are above the max checkpoint height,
        // otherwise, unless checkpoint sync is disabled in the zebra-consensus configuration,
        // Zebra will be unable to commit checkpoint verified blocks, and its chain sync will stall.
        let is_finalized_tip_past_max_checkpoint = if let Some(tip) = &finalized_tip {
            tip.coinbase_height().expect("valid block must have height") >= max_checkpoint_height
        } else {
            // An empty database is always below the last checkpoint.
            false
        };
        let backup_dir_path = config.non_finalized_state_backup_dir(network);
        let skip_backup_task = config.debug_skip_non_finalized_state_backup_task;
        // Restore the non-finalized state from its backup (if any),
        // and set up its backup channels.
        let (non_finalized_state, non_finalized_state_sender, non_finalized_state_receiver) =
            NonFinalizedState::new(network)
                .with_backup(
                    backup_dir_path.clone(),
                    &finalized_state.db,
                    is_finalized_tip_past_max_checkpoint,
                    config.debug_skip_non_finalized_state_backup_task,
                )
                .await;

        let non_finalized_block_write_sent_hashes = SentHashes::new(&non_finalized_state);
        // The initial chain tip is the best non-finalized tip if there is one,
        // otherwise the finalized tip (or `None` for an empty database).
        let initial_tip = non_finalized_state
            .best_tip_block()
            .map(|cv_block| cv_block.block.clone())
            .or(finalized_tip)
            .map(CheckpointVerifiedBlock::from)
            .map(ChainTipBlock::from);

        tracing::info!(chain_tip = ?initial_tip.as_ref().map(|tip| (tip.hash, tip.height)), "loaded Zebra state cache");

        let (chain_tip_sender, latest_chain_tip, chain_tip_change) =
            ChainTipSender::new(initial_tip, network);

        // Spawn the block write task, which owns its own clone of the finalized state.
        let finalized_state_for_writing = finalized_state.clone();
        let should_use_finalized_block_write_sender = non_finalized_state.is_chain_set_empty();
        // Only pass a backup path for synchronous backups
        // when the background backup task is skipped.
        let sync_backup_dir_path = backup_dir_path.filter(|_| skip_backup_task);
        let (block_write_sender, invalid_block_write_reset_receiver, block_write_task) =
            write::BlockWriteSender::spawn(
                finalized_state_for_writing,
                non_finalized_state,
                chain_tip_sender,
                non_finalized_state_sender,
                should_use_finalized_block_write_sender,
                sync_backup_dir_path,
            );

        let read_service = ReadStateService::new(
            &finalized_state,
            block_write_task,
            non_finalized_state_receiver,
        );

        // Start UTXO lookahead a concurrency-limit's worth of blocks
        // below the last checkpoint, clamped at the minimum height.
        let full_verifier_utxo_lookahead = max_checkpoint_height
            - HeightDiff::try_from(checkpoint_verify_concurrency_limit)
                .expect("fits in HeightDiff");
        let full_verifier_utxo_lookahead =
            full_verifier_utxo_lookahead.unwrap_or(block::Height::MIN);
        let non_finalized_state_queued_blocks = QueuedBlocks::default();
        let pending_utxos = PendingUtxos::default();

        // Reading the finalized tip hash touches the database,
        // so it also runs on a blocking thread.
        let finalized_block_write_last_sent_hash =
            tokio::task::spawn_blocking(move || finalized_state.db.finalized_tip_hash())
                .await
                .expect("failed to join blocking task");

        let state = Self {
            network: network.clone(),
            full_verifier_utxo_lookahead,
            non_finalized_state_queued_blocks,
            finalized_state_queued_blocks: HashMap::new(),
            block_write_sender,
            finalized_block_write_last_sent_hash,
            non_finalized_block_write_sent_hashes,
            invalid_block_write_reset_receiver,
            pending_utxos,
            last_prune: Instant::now(),
            read_service: read_service.clone(),
            max_finalized_queue_height: f64::NAN,
        };
        timer.finish_desc("initializing state service");

        tracing::info!("starting legacy chain check");
        let timer = CodeTimer::start();

        // The legacy chain check only runs when there is a cached tip
        // and NU5 has an activation height on this network.
        if let (Some(tip), Some(nu5_activation_height)) = (
            {
                let read_state = state.read_service.clone();
                tokio::task::spawn_blocking(move || read_state.best_tip())
                    .await
                    .expect("task should not panic")
            },
            NetworkUpgrade::Nu5.activation_height(network),
        ) {
            if let Err(error) = check::legacy_chain(
                nu5_activation_height,
                any_ancestor_blocks(
                    &state.read_service.latest_non_finalized_state(),
                    &state.read_service.db,
                    tip.1,
                ),
                &state.network,
                MAX_LEGACY_CHAIN_BLOCKS,
            ) {
                let legacy_db_path = state.read_service.db.path().to_path_buf();
                panic!(
                    "Cached state contains a legacy chain.\n\
                     An outdated Zebra version did not know about a recent network upgrade,\n\
                     so it followed a legacy chain using outdated consensus branch rules.\n\
                     Hint: Delete your database, and restart Zebra to do a full sync.\n\
                     Database path: {legacy_db_path:?}\n\
                     Error: {error:?}",
                );
            }
        }

        tracing::info!("cached state consensus branch is valid: no legacy chain found");
        timer.finish_desc("legacy chain check");

        // Spawn a background task to periodically export RocksDB metrics to Prometheus
        let db_for_metrics = read_service.db.clone();
        tokio::spawn(async move {
            let mut interval = tokio::time::interval(Duration::from_secs(30));
            loop {
                interval.tick().await;
                db_for_metrics.export_metrics();
            }
        });

        (state, read_service, latest_chain_tip, chain_tip_change)
    }
465
    /// Logs RocksDB database metrics, via the shared read-only state service's
    /// database handle.
    pub fn log_db_metrics(&self) {
        self.read_service.db.print_db_metrics();
    }
470
    /// Queue a checkpoint verified block for verification and storage in the finalized state.
    ///
    /// Returns a channel receiver that provides the result of the block commit.
    fn queue_and_commit_to_finalized_state(
        &mut self,
        checkpoint_verified: CheckpointVerifiedBlock,
    ) -> oneshot::Receiver<Result<block::Hash, CommitCheckpointVerifiedError>> {
        // # Correctness & Performance
        //
        // This method must not block, access the database, or perform CPU-intensive tasks,
        // because it is called directly from the tokio executor's Future threads.

        let queued_prev_hash = checkpoint_verified.block.header.previous_block_hash;
        let queued_height = checkpoint_verified.height;

        // If we're close to the final checkpoint, make the block's UTXOs available for
        // semantic block verification, even when it is in the channel.
        if self.is_close_to_final_checkpoint(queued_height) {
            self.non_finalized_block_write_sent_hashes
                .add_finalized(&checkpoint_verified)
        }

        let (rsp_tx, rsp_rx) = oneshot::channel();
        let queued = (checkpoint_verified, rsp_tx);

        if self.block_write_sender.finalized.is_some() {
            // We're still committing checkpoint verified blocks
            //
            // The queue is keyed by parent hash, so queueing a second child of the
            // same parent replaces the first child, which gets a duplicate error.
            if let Some(duplicate_queued) = self
                .finalized_state_queued_blocks
                .insert(queued_prev_hash, queued)
            {
                Self::send_checkpoint_verified_block_error(
                    duplicate_queued,
                    CommitBlockError::new_duplicate(
                        Some(queued_prev_hash.into()),
                        KnownBlock::Queue,
                    ),
                );
            }

            self.drain_finalized_queue_and_commit();
        } else {
            // We've finished committing checkpoint verified blocks to the finalized state,
            // so drop any repeated queued blocks, and return an error.
            //
            // TODO: track the latest sent height, and drop any blocks under that height
            //       every time we send some blocks (like QueuedSemanticallyVerifiedBlocks)
            Self::send_checkpoint_verified_block_error(
                queued,
                CommitBlockError::new_duplicate(None, KnownBlock::Finalized),
            );

            self.clear_finalized_block_queue(CommitBlockError::new_duplicate(
                None,
                KnownBlock::Finalized,
            ));
        }

        // Update the queue-height metric: NaN marks an empty queue
        // (grafana renders NaN as a break in the graph).
        if self.finalized_state_queued_blocks.is_empty() {
            self.max_finalized_queue_height = f64::NAN;
        } else if self.max_finalized_queue_height.is_nan()
            || self.max_finalized_queue_height < queued_height.0 as f64
        {
            // if there are still blocks in the queue, then either:
            //   - the new block was lower than the old maximum, and there was a gap before it,
            //     so the maximum is still the same (and we skip this code), or
            //   - the new block is higher than the old maximum, and there is at least one gap
            //     between the finalized tip and the new maximum
            self.max_finalized_queue_height = queued_height.0 as f64;
        }

        metrics::gauge!("state.checkpoint.queued.max.height").set(self.max_finalized_queue_height);
        metrics::gauge!("state.checkpoint.queued.block.count")
            .set(self.finalized_state_queued_blocks.len() as f64);

        rsp_rx
    }
548
    /// Finds finalized state queue blocks to be committed to the state in order,
    /// removes them from the queue, and sends them to the block commit task.
    ///
    /// After queueing a finalized block, this method checks whether the newly
    /// queued block (and any of its descendants) can be committed to the state.
    ///
    /// If the block commit channel has been closed, sends an error on the
    /// queued blocks' result channels instead, and returns early.
    pub fn drain_finalized_queue_and_commit(&mut self) {
        use tokio::sync::mpsc::error::{SendError, TryRecvError};

        // # Correctness & Performance
        //
        // This method must not block, access the database, or perform CPU-intensive tasks,
        // because it is called directly from the tokio executor's Future threads.

        // If a block failed, we need to start again from a valid tip.
        match self.invalid_block_write_reset_receiver.try_recv() {
            Ok(reset_tip_hash) => self.finalized_block_write_last_sent_hash = reset_tip_hash,
            Err(TryRecvError::Disconnected) => {
                info!("Block commit task closed the block reset channel. Is Zebra shutting down?");
                return;
            }
            // There are no errors, so we can just use the last block hash we sent
            Err(TryRecvError::Empty) => {}
        }

        // Send blocks in chain order: each iteration looks up the queued child of
        // the last block we sent, so a gap in the queue stops the loop.
        while let Some(queued_block) = self
            .finalized_state_queued_blocks
            .remove(&self.finalized_block_write_last_sent_hash)
        {
            let last_sent_finalized_block_height = queued_block.0.height;

            self.finalized_block_write_last_sent_hash = queued_block.0.hash;

            // If we've finished sending finalized blocks, ignore any repeated blocks.
            // (Blocks can be repeated after a syncer reset.)
            if let Some(finalized_block_write_sender) = &self.block_write_sender.finalized {
                let send_result = finalized_block_write_sender.send(queued_block);

                // If the receiver is closed, we can't send any more blocks.
                if let Err(SendError(queued)) = send_result {
                    // If Zebra is shutting down, drop blocks and return an error.
                    Self::send_checkpoint_verified_block_error(
                        queued,
                        CommitBlockError::WriteTaskExited,
                    );

                    self.clear_finalized_block_queue(CommitBlockError::WriteTaskExited);
                } else {
                    metrics::gauge!("state.checkpoint.sent.block.height")
                        .set(last_sent_finalized_block_height.0 as f64);
                };
            }
        }
    }
604
605    /// Drops all finalized state queue blocks, and sends an error on their result channels.
606    fn clear_finalized_block_queue(
607        &mut self,
608        error: impl Into<CommitCheckpointVerifiedError> + Clone,
609    ) {
610        for (_hash, queued) in self.finalized_state_queued_blocks.drain() {
611            Self::send_checkpoint_verified_block_error(queued, error.clone());
612        }
613    }
614
615    /// Send an error on a `QueuedCheckpointVerified` block's result channel, and drop the block
616    fn send_checkpoint_verified_block_error(
617        queued: QueuedCheckpointVerified,
618        error: impl Into<CommitCheckpointVerifiedError>,
619    ) {
620        let (finalized, rsp_tx) = queued;
621
622        // The block sender might have already given up on this block,
623        // so ignore any channel send errors.
624        let _ = rsp_tx.send(Err(error.into()));
625        std::mem::drop(finalized);
626    }
627
628    /// Drops all non-finalized state queue blocks, and sends an error on their result channels.
629    fn clear_non_finalized_block_queue(
630        &mut self,
631        error: impl Into<CommitSemanticallyVerifiedError> + Clone,
632    ) {
633        for (_hash, queued) in self.non_finalized_state_queued_blocks.drain() {
634            Self::send_semantically_verified_block_error(queued, error.clone());
635        }
636    }
637
638    /// Send an error on a `QueuedSemanticallyVerified` block's result channel, and drop the block
639    fn send_semantically_verified_block_error(
640        queued: QueuedSemanticallyVerified,
641        error: impl Into<CommitSemanticallyVerifiedError>,
642    ) {
643        let (finalized, rsp_tx) = queued;
644
645        // The block sender might have already given up on this block,
646        // so ignore any channel send errors.
647        let _ = rsp_tx.send(Err(error.into()));
648        std::mem::drop(finalized);
649    }
650
651    /// Queue a semantically verified block for contextual verification and check if any queued
652    /// blocks are ready to be verified and committed to the state.
653    ///
654    /// This function encodes the logic for [committing non-finalized blocks][1]
655    /// in RFC0005.
656    ///
657    /// [1]: https://zebra.zfnd.org/dev/rfcs/0005-state-updates.html#committing-non-finalized-blocks
658    #[instrument(level = "debug", skip(self, semantically_verified))]
659    fn queue_and_commit_to_non_finalized_state(
660        &mut self,
661        semantically_verified: SemanticallyVerifiedBlock,
662    ) -> oneshot::Receiver<Result<block::Hash, CommitSemanticallyVerifiedError>> {
663        tracing::debug!(block = %semantically_verified.block, "queueing block for contextual verification");
664        let parent_hash = semantically_verified.block.header.previous_block_hash;
665
666        if self
667            .non_finalized_block_write_sent_hashes
668            .contains(&semantically_verified.hash)
669        {
670            let (rsp_tx, rsp_rx) = oneshot::channel();
671            let _ = rsp_tx.send(Err(CommitBlockError::new_duplicate(
672                Some(semantically_verified.hash.into()),
673                KnownBlock::WriteChannel,
674            )
675            .into()));
676            return rsp_rx;
677        }
678
679        if self
680            .read_service
681            .db
682            .contains_height(semantically_verified.height)
683        {
684            let (rsp_tx, rsp_rx) = oneshot::channel();
685            let _ = rsp_tx.send(Err(CommitBlockError::new_duplicate(
686                Some(semantically_verified.height.into()),
687                KnownBlock::Finalized,
688            )
689            .into()));
690            return rsp_rx;
691        }
692
693        // [`Request::CommitSemanticallyVerifiedBlock`] contract: a request to commit a block which
694        // has been queued but not yet committed to the state fails the older request and replaces
695        // it with the newer request.
696        let rsp_rx = if let Some((_, old_rsp_tx)) = self
697            .non_finalized_state_queued_blocks
698            .get_mut(&semantically_verified.hash)
699        {
700            tracing::debug!("replacing older queued request with new request");
701            let (mut rsp_tx, rsp_rx) = oneshot::channel();
702            std::mem::swap(old_rsp_tx, &mut rsp_tx);
703            let _ = rsp_tx.send(Err(CommitBlockError::new_duplicate(
704                Some(semantically_verified.hash.into()),
705                KnownBlock::Queue,
706            )
707            .into()));
708            rsp_rx
709        } else {
710            let (rsp_tx, rsp_rx) = oneshot::channel();
711            self.non_finalized_state_queued_blocks
712                .queue((semantically_verified, rsp_tx));
713            rsp_rx
714        };
715
716        // We've finished sending checkpoint verified blocks when:
717        // - we've sent the verified block for the last checkpoint, and
718        // - it has been successfully written to disk.
719        //
720        // We detect the last checkpoint by looking for non-finalized blocks
721        // that are a child of the last block we sent.
722        //
723        // TODO: configure the state with the last checkpoint hash instead?
724        if self.block_write_sender.finalized.is_some()
725            && self
726                .non_finalized_state_queued_blocks
727                .has_queued_children(self.finalized_block_write_last_sent_hash)
728            && self.read_service.db.finalized_tip_hash()
729                == self.finalized_block_write_last_sent_hash
730        {
731            // Tell the block write task to stop committing checkpoint verified blocks to the finalized state,
732            // and move on to committing semantically verified blocks to the non-finalized state.
733            std::mem::drop(self.block_write_sender.finalized.take());
734            // Remove any checkpoint-verified block hashes from `non_finalized_block_write_sent_hashes`.
735            self.non_finalized_block_write_sent_hashes = SentHashes::default();
736            // Mark `SentHashes` as usable by the `can_fork_chain_at()` method.
737            self.non_finalized_block_write_sent_hashes
738                .can_fork_chain_at_hashes = true;
739            // Send blocks from non-finalized queue
740            self.send_ready_non_finalized_queued(self.finalized_block_write_last_sent_hash);
741            // We've finished committing checkpoint verified blocks to finalized state, so drop any repeated queued blocks.
742            self.clear_finalized_block_queue(CommitBlockError::new_duplicate(
743                None,
744                KnownBlock::Finalized,
745            ));
746        } else if !self.can_fork_chain_at(&parent_hash) {
747            tracing::trace!("unready to verify, returning early");
748        } else if self.block_write_sender.finalized.is_none() {
749            // Wait until block commit task is ready to write non-finalized blocks before dequeuing them
750            self.send_ready_non_finalized_queued(parent_hash);
751
752            let finalized_tip_height = self.read_service.db.finalized_tip_height().expect(
753                "Finalized state must have at least one block before committing non-finalized state",
754            );
755
756            self.non_finalized_state_queued_blocks
757                .prune_by_height(finalized_tip_height);
758
759            self.non_finalized_block_write_sent_hashes
760                .prune_by_height(finalized_tip_height);
761        }
762
763        rsp_rx
764    }
765
766    /// Returns `true` if `hash` is a valid previous block hash for new non-finalized blocks.
767    fn can_fork_chain_at(&self, hash: &block::Hash) -> bool {
768        self.non_finalized_block_write_sent_hashes
769            .can_fork_chain_at(hash)
770            || &self.read_service.db.finalized_tip_hash() == hash
771    }
772
773    /// Returns `true` if `queued_height` is near the final checkpoint.
774    ///
775    /// The semantic block verifier needs access to UTXOs from checkpoint verified blocks
776    /// near the final checkpoint, so that it can verify blocks that spend those UTXOs.
777    ///
778    /// If it doesn't have the required UTXOs, some blocks will time out,
779    /// but succeed after a syncer restart.
780    fn is_close_to_final_checkpoint(&self, queued_height: block::Height) -> bool {
781        queued_height >= self.full_verifier_utxo_lookahead
782    }
783
    /// Sends all queued blocks whose parents have recently arrived, starting from `new_parent`,
    /// to the block write task, which will attempt to validate and commit them.
    ///
    /// NOTE(review): `new_parents` is used as a stack (`Vec::pop`), so descendants are
    /// actually dequeued in depth-first order, not breadth-first as previously documented.
    #[tracing::instrument(level = "debug", skip(self, new_parent))]
    fn send_ready_non_finalized_queued(&mut self, new_parent: block::Hash) {
        use tokio::sync::mpsc::error::SendError;
        // Does nothing if the non-finalized block write sender is unavailable
        // (it is `None` while checkpoint verified blocks are still being committed).
        if let Some(non_finalized_block_write_sender) = &self.block_write_sender.non_finalized {
            let mut new_parents: Vec<block::Hash> = vec![new_parent];

            while let Some(parent_hash) = new_parents.pop() {
                // Take every queued block whose previous block hash is `parent_hash`.
                let queued_children = self
                    .non_finalized_state_queued_blocks
                    .dequeue_children(parent_hash);

                for queued_child in queued_children {
                    let (SemanticallyVerifiedBlock { hash, .. }, _) = queued_child;

                    // Record the hash as sent *before* sending, so later queue checks
                    // see it even while the write task is still processing the block.
                    self.non_finalized_block_write_sent_hashes
                        .add(&queued_child.0);
                    let send_result = non_finalized_block_write_sender.send(queued_child.into());

                    if let Err(SendError(NonFinalizedWriteMessage::Commit(queued))) = send_result {
                        // If Zebra is shutting down, drop blocks and return an error.
                        Self::send_semantically_verified_block_error(
                            queued,
                            CommitBlockError::WriteTaskExited,
                        );

                        self.clear_non_finalized_block_queue(CommitBlockError::WriteTaskExited);

                        return;
                    };

                    // This block's own queued children may now be ready to send.
                    new_parents.push(hash);
                }
            }

            self.non_finalized_block_write_sent_hashes.finish_batch();
        };
    }
823
824    /// Return the tip of the current best chain.
825    pub fn best_tip(&self) -> Option<(block::Height, block::Hash)> {
826        self.read_service.best_tip()
827    }
828
829    fn send_invalidate_block(
830        &self,
831        hash: block::Hash,
832    ) -> oneshot::Receiver<Result<block::Hash, InvalidateError>> {
833        let (rsp_tx, rsp_rx) = oneshot::channel();
834
835        let Some(sender) = &self.block_write_sender.non_finalized else {
836            let _ = rsp_tx.send(Err(InvalidateError::ProcessingCheckpointedBlocks));
837            return rsp_rx;
838        };
839
840        if let Err(tokio::sync::mpsc::error::SendError(error)) =
841            sender.send(NonFinalizedWriteMessage::Invalidate { hash, rsp_tx })
842        {
843            let NonFinalizedWriteMessage::Invalidate { rsp_tx, .. } = error else {
844                unreachable!("should return the same Invalidate message could not be sent");
845            };
846
847            let _ = rsp_tx.send(Err(InvalidateError::SendInvalidateRequestFailed));
848        }
849
850        rsp_rx
851    }
852
853    fn send_reconsider_block(
854        &self,
855        hash: block::Hash,
856    ) -> oneshot::Receiver<Result<Vec<block::Hash>, ReconsiderError>> {
857        let (rsp_tx, rsp_rx) = oneshot::channel();
858
859        let Some(sender) = &self.block_write_sender.non_finalized else {
860            let _ = rsp_tx.send(Err(ReconsiderError::CheckpointCommitInProgress));
861            return rsp_rx;
862        };
863
864        if let Err(tokio::sync::mpsc::error::SendError(error)) =
865            sender.send(NonFinalizedWriteMessage::Reconsider { hash, rsp_tx })
866        {
867            let NonFinalizedWriteMessage::Reconsider { rsp_tx, .. } = error else {
868                unreachable!("should return the same Reconsider message could not be sent");
869            };
870
871            let _ = rsp_tx.send(Err(ReconsiderError::ReconsiderSendFailed));
872        }
873
874        rsp_rx
875    }
876
    /// Assert some assumptions about the semantically verified `block` before it is queued.
    ///
    /// # Panics
    ///
    /// If the block is at or below the network's mandatory checkpoint height:
    /// those blocks must be committed as checkpoint verified (finalized) blocks instead.
    fn assert_block_can_be_validated(&self, block: &SemanticallyVerifiedBlock) {
        // required by `Request::CommitSemanticallyVerifiedBlock` call
        assert!(
            block.height > self.network.mandatory_checkpoint_height(),
            "invalid semantically verified block height: the canopy checkpoint is mandatory, pre-canopy \
            blocks, and the canopy activation block, must be committed to the state as finalized \
            blocks"
        );
    }
887
888    fn known_sent_hash(&self, hash: &block::Hash) -> Option<KnownBlock> {
889        self.non_finalized_block_write_sent_hashes
890            .contains(hash)
891            .then_some(KnownBlock::WriteChannel)
892    }
893}
894
895impl ReadStateService {
896    /// Creates a new read-only state service, using the provided finalized state and
897    /// block write task handle.
898    ///
899    /// Returns the newly created service,
900    /// and a watch channel for updating the shared recent non-finalized chain.
901    pub(crate) fn new(
902        finalized_state: &FinalizedState,
903        block_write_task: Option<Arc<std::thread::JoinHandle<()>>>,
904        non_finalized_state_receiver: WatchReceiver<NonFinalizedState>,
905    ) -> Self {
906        let read_service = Self {
907            network: finalized_state.network(),
908            db: finalized_state.db.clone(),
909            non_finalized_state_receiver,
910            block_write_task,
911        };
912
913        tracing::debug!("created new read-only state service");
914
915        read_service
916    }
917
918    /// Return the tip of the current best chain.
919    pub fn best_tip(&self) -> Option<(block::Height, block::Hash)> {
920        read::best_tip(&self.latest_non_finalized_state(), &self.db)
921    }
922
923    /// Gets a clone of the latest non-finalized state from the `non_finalized_state_receiver`
924    fn latest_non_finalized_state(&self) -> NonFinalizedState {
925        self.non_finalized_state_receiver.cloned_watch_data()
926    }
927
928    /// Gets a clone of the latest, best non-finalized chain from the `non_finalized_state_receiver`
929    fn latest_best_chain(&self) -> Option<Arc<Chain>> {
930        self.non_finalized_state_receiver
931            .borrow_mapped(|non_finalized_state| non_finalized_state.best_chain().cloned())
932    }
933
934    /// Test-only access to the inner database.
935    /// Can be used to modify the database without doing any consensus checks.
936    #[cfg(any(test, feature = "proptest-impl"))]
937    pub fn db(&self) -> &ZebraDb {
938        &self.db
939    }
940
941    /// Logs rocksdb metrics using the read only state service.
942    pub fn log_db_metrics(&self) {
943        self.db.print_db_metrics();
944    }
945}
946
impl Service<Request> for StateService {
    type Response = Response;
    type Error = BoxError;
    type Future =
        Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>;

    // Readiness is delegated to the read service; this method also uses the
    // periodic wakeup to prune outdated pending UTXO requests as a side effect.
    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        // Check for panics in the block write task
        let poll = self.read_service.poll_ready(cx);

        // Prune outdated UTXO requests
        let now = Instant::now();

        if self.last_prune + Self::PRUNE_INTERVAL < now {
            let tip = self.best_tip();
            let old_len = self.pending_utxos.len();

            self.pending_utxos.prune();
            self.last_prune = now;

            let new_len = self.pending_utxos.len();
            let prune_count = old_len
                .checked_sub(new_len)
                .expect("prune does not add any utxo requests");
            if prune_count > 0 {
                tracing::debug!(
                    ?old_len,
                    ?new_len,
                    ?prune_count,
                    ?tip,
                    "pruned utxo requests"
                );
            } else {
                tracing::debug!(len = ?old_len, ?tip, "no utxo requests needed pruning");
            }
        }

        poll
    }

    #[instrument(name = "state", skip(self, req))]
    fn call(&mut self, req: Request) -> Self::Future {
        req.count_metric();
        let span = Span::current();

        match req {
            // Uses non_finalized_state_queued_blocks and pending_utxos in the StateService
            // Accesses shared writeable state in the StateService, NonFinalizedState, and ZebraDb.
            //
            // The expected error type for this request is `CommitSemanticallyVerifiedError`.
            Request::CommitSemanticallyVerifiedBlock(semantically_verified) => {
                let timer = CodeTimer::start();
                self.assert_block_can_be_validated(&semantically_verified);

                // Answer any pending UTXO requests that this block's outputs satisfy.
                self.pending_utxos
                    .check_against_ordered(&semantically_verified.new_outputs);

                // # Performance
                //
                // Allow other async tasks to make progress while blocks are being verified
                // and written to disk. But wait for the blocks to finish committing,
                // so that `StateService` multi-block queries always observe a consistent state.
                //
                // Since each block is spawned into its own task,
                // there shouldn't be any other code running in the same task,
                // so we don't need to worry about blocking it:
                // https://docs.rs/tokio/latest/tokio/task/fn.block_in_place.html

                let rsp_rx = tokio::task::block_in_place(move || {
                    span.in_scope(|| {
                        self.queue_and_commit_to_non_finalized_state(semantically_verified)
                    })
                });

                // TODO:
                //   - check for panics in the block write task here,
                //     as well as in poll_ready()

                // The work is all done, the future just waits on a channel for the result
                timer.finish_desc("CommitSemanticallyVerifiedBlock");

                // Await the channel response, flatten the result, map receive errors to
                // `CommitSemanticallyVerifiedError::WriteTaskExited`.
                // Then flatten the nested Result and convert any errors to a BoxError.
                let span = Span::current();
                async move {
                    rsp_rx
                        .await
                        .map_err(|_recv_error| CommitBlockError::WriteTaskExited.into())
                        .and_then(|result| result)
                        .map_err(BoxError::from)
                        .map(Response::Committed)
                }
                .instrument(span)
                .boxed()
            }

            // Uses finalized_state_queued_blocks and pending_utxos in the StateService.
            // Accesses shared writeable state in the StateService.
            //
            // The expected error type for this request is `CommitCheckpointVerifiedError`.
            Request::CommitCheckpointVerifiedBlock(finalized) => {
                let timer = CodeTimer::start();
                // # Consensus
                //
                // A semantic block verification could have called AwaitUtxo
                // before this checkpoint verified block arrived in the state.
                // So we need to check for pending UTXO requests sent by running
                // semantic block verifications.
                //
                // This check is redundant for most checkpoint verified blocks,
                // because semantic verification can only succeed near the final
                // checkpoint, when all the UTXOs are available for the verifying block.
                //
                // (Checkpoint block UTXOs are verified using block hash checkpoints
                // and transaction merkle tree block header commitments.)
                self.pending_utxos
                    .check_against_ordered(&finalized.new_outputs);

                // # Performance
                //
                // This method doesn't block, access the database, or perform CPU-intensive tasks,
                // so we can run it directly in the tokio executor's Future threads.
                let rsp_rx = self.queue_and_commit_to_finalized_state(finalized);

                // TODO:
                //   - check for panics in the block write task here,
                //     as well as in poll_ready()

                // The work is all done, the future just waits on a channel for the result
                timer.finish_desc("CommitCheckpointVerifiedBlock");

                // Await the channel response, flatten the result, map receive errors to
                // `CommitCheckpointVerifiedError::WriteTaskExited`.
                // Then flatten the nested Result and convert any errors to a BoxError.
                async move {
                    rsp_rx
                        .await
                        .map_err(|_recv_error| CommitBlockError::WriteTaskExited.into())
                        .and_then(|result| result)
                        .map_err(BoxError::from)
                        .map(Response::Committed)
                }
                .instrument(span)
                .boxed()
            }

            // Uses pending_utxos and non_finalized_state_queued_blocks in the StateService.
            // If the UTXO isn't in the queued blocks, runs concurrently using the ReadStateService.
            Request::AwaitUtxo(outpoint) => {
                let timer = CodeTimer::start();
                // Prepare the AwaitUtxo future from PendingUtxos.
                let response_fut = self.pending_utxos.queue(outpoint);
                // Only instrument `response_fut`, the ReadStateService already
                // instruments its requests with the same span.

                let response_fut = response_fut.instrument(span).boxed();

                // Check the non-finalized block queue outside the returned future,
                // so we can access mutable state fields.
                if let Some(utxo) = self.non_finalized_state_queued_blocks.utxo(&outpoint) {
                    self.pending_utxos.respond(&outpoint, utxo);

                    // We're finished, the returned future gets the UTXO from the respond() channel.
                    timer.finish_desc("AwaitUtxo/queued-non-finalized");

                    return response_fut;
                }

                // Check the sent non-finalized blocks
                if let Some(utxo) = self.non_finalized_block_write_sent_hashes.utxo(&outpoint) {
                    self.pending_utxos.respond(&outpoint, utxo);

                    // We're finished, the returned future gets the UTXO from the respond() channel.
                    timer.finish_desc("AwaitUtxo/sent-non-finalized");

                    return response_fut;
                }

                // We ignore any UTXOs in FinalizedState.finalized_state_queued_blocks,
                // because it is only used during checkpoint verification.
                //
                // This creates a rare race condition, but it doesn't seem to happen much in practice.
                // See #5126 for details.

                // Manually send a request to the ReadStateService,
                // to get UTXOs from any non-finalized chain or the finalized chain.
                let read_service = self.read_service.clone();

                // Run the request in an async block, so we can await the response.
                async move {
                    let req = ReadRequest::AnyChainUtxo(outpoint);

                    let rsp = read_service.oneshot(req).await?;

                    // Optional TODO:
                    //  - make pending_utxos.respond() async using a channel,
                    //    so we can respond to all waiting requests here
                    //
                    // This change is not required for correctness, because:
                    // - any waiting requests should have returned when the block was sent to the state
                    // - otherwise, the request returns immediately if:
                    //   - the block is in the non-finalized queue, or
                    //   - the block is in any non-finalized chain or the finalized state
                    //
                    // And if the block is in the finalized queue,
                    // that's rare enough that a retry is ok.
                    if let ReadResponse::AnyChainUtxo(Some(utxo)) = rsp {
                        // We got a UTXO, so we return it directly,
                        // instead of waiting on the response future.
                        timer.finish_desc("AwaitUtxo/any-chain");

                        return Ok(Response::Utxo(utxo));
                    }

                    // We're finished, but the returned future is waiting on the respond() channel.
                    timer.finish_desc("AwaitUtxo/waiting");

                    response_fut.await
                }
                .boxed()
            }

            // Used by sync, inbound, and block verifier to check if a block is already in the state
            // before downloading or validating it.
            Request::KnownBlock(hash) => {
                let timer = CodeTimer::start();
                // Check the sent hashes synchronously, because they need mutable state access.
                let sent_hash_response = self.known_sent_hash(&hash);
                let read_service = self.read_service.clone();

                async move {
                    if sent_hash_response.is_some() {
                        return Ok(Response::KnownBlock(sent_hash_response));
                    };

                    let response = read::non_finalized_state_contains_block_hash(
                        &read_service.latest_non_finalized_state(),
                        hash,
                    )
                    // TODO: Move this to a blocking task, perhaps by moving some of this logic to the ReadStateService.
                    .or_else(|| read::finalized_state_contains_block_hash(&read_service.db, hash));

                    timer.finish_desc("Request::KnownBlock");

                    Ok(Response::KnownBlock(response))
                }
                .boxed()
            }

            // The expected error type for this request is `InvalidateError`
            Request::InvalidateBlock(block_hash) => {
                let rsp_rx = tokio::task::block_in_place(move || {
                    span.in_scope(|| self.send_invalidate_block(block_hash))
                });

                // Await the channel response, flatten the result, map receive errors to
                // `InvalidateError::InvalidateRequestDropped`.
                // Then flatten the nested Result and convert any errors to a BoxError.
                let span = Span::current();
                async move {
                    rsp_rx
                        .await
                        .map_err(|_recv_error| InvalidateError::InvalidateRequestDropped)
                        .and_then(|result| result)
                        .map_err(BoxError::from)
                        .map(Response::Invalidated)
                }
                .instrument(span)
                .boxed()
            }

            // The expected error type for this request is `ReconsiderError`
            Request::ReconsiderBlock(block_hash) => {
                let rsp_rx = tokio::task::block_in_place(move || {
                    span.in_scope(|| self.send_reconsider_block(block_hash))
                });

                // Await the channel response, flatten the result, map receive errors to
                // `ReconsiderError::ReconsiderResponseDropped`.
                // Then flatten the nested Result and convert any errors to a BoxError.
                let span = Span::current();
                async move {
                    rsp_rx
                        .await
                        .map_err(|_recv_error| ReconsiderError::ReconsiderResponseDropped)
                        .and_then(|result| result)
                        .map_err(BoxError::from)
                        .map(Response::Reconsidered)
                }
                .instrument(span)
                .boxed()
            }

            // Runs concurrently using the ReadStateService
            Request::Tip
            | Request::Depth(_)
            | Request::BestChainNextMedianTimePast
            | Request::BestChainBlockHash(_)
            | Request::BlockLocator
            | Request::Transaction(_)
            | Request::AnyChainTransaction(_)
            | Request::UnspentBestChainUtxo(_)
            | Request::Block(_)
            | Request::AnyChainBlock(_)
            | Request::BlockAndSize(_)
            | Request::BlockHeader(_)
            | Request::FindBlockHashes { .. }
            | Request::FindBlockHeaders { .. }
            | Request::CheckBestChainTipNullifiersAndAnchors(_)
            | Request::CheckBlockProposalValidity(_) => {
                // Redirect the request to the concurrent ReadStateService
                let read_service = self.read_service.clone();

                async move {
                    // These variants all have a ReadRequest equivalent, so the
                    // conversion is infallible for this match arm.
                    let req = req
                        .try_into()
                        .expect("ReadRequest conversion should not fail");

                    let rsp = read_service.oneshot(req).await?;
                    let rsp = rsp.try_into().expect("Response conversion should not fail");

                    Ok(rsp)
                }
                .boxed()
            }
        }
    }
}
1274
1275impl Service<ReadRequest> for ReadStateService {
1276    type Response = ReadResponse;
1277    type Error = BoxError;
1278    type Future =
1279        Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>;
1280
    // Always ready; used as an opportunity to surface panics from the block write
    // task and the database before answering read requests.
    fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        // Check for panics in the block write task
        //
        // TODO: move into a check_for_panics() method
        let block_write_task = self.block_write_task.take();

        if let Some(block_write_task) = block_write_task {
            if block_write_task.is_finished() {
                if let Some(block_write_task) = Arc::into_inner(block_write_task) {
                    // We are the last state with a reference to this task, so we can propagate any panics
                    if let Err(thread_panic) = block_write_task.join() {
                        std::panic::resume_unwind(thread_panic);
                    }
                }
                // NOTE(review): if the task is finished but other clones still hold the
                // `Arc`, `Arc::into_inner` returns `None` and our handle is dropped here,
                // so this service skips the check on later polls; one of the remaining
                // holders is expected to join the task and propagate any panic.
            } else {
                // It hasn't finished, so we need to put it back
                self.block_write_task = Some(block_write_task);
            }
        }

        // Also surface any panics recorded by the database itself.
        self.db.check_for_panics();

        Poll::Ready(Ok(()))
    }
1305
1306    #[instrument(name = "read_state", skip(self, req))]
1307    fn call(&mut self, req: ReadRequest) -> Self::Future {
1308        req.count_metric();
1309        let timer = CodeTimer::start_desc(req.variant_name());
1310        let span = Span::current();
1311        let timed_span = TimedSpan::new(timer, span);
1312        let state = self.clone();
1313
1314        if req == ReadRequest::NonFinalizedBlocksListener {
1315            // The non-finalized blocks listener is used to notify the state service
1316            // about new blocks that have been added to the non-finalized state.
1317            let non_finalized_blocks_listener = NonFinalizedBlocksListener::spawn(
1318                self.network.clone(),
1319                self.non_finalized_state_receiver.clone(),
1320            );
1321
1322            return async move {
1323                Ok(ReadResponse::NonFinalizedBlocksListener(
1324                    non_finalized_blocks_listener,
1325                ))
1326            }
1327            .boxed();
1328        };
1329
1330        let request_handler = move || match req {
1331            // Used by the `getblockchaininfo` RPC.
1332            ReadRequest::UsageInfo => Ok(ReadResponse::UsageInfo(state.db.size())),
1333
1334            // Used by the StateService.
1335            ReadRequest::Tip => Ok(ReadResponse::Tip(read::tip(
1336                state.latest_best_chain(),
1337                &state.db,
1338            ))),
1339
1340            // Used by `getblockchaininfo` RPC method.
1341            ReadRequest::TipPoolValues => {
1342                let (tip_height, tip_hash, value_balance) =
1343                    read::tip_with_value_balance(state.latest_best_chain(), &state.db)?
1344                        .ok_or(BoxError::from("no chain tip available yet"))?;
1345
1346                Ok(ReadResponse::TipPoolValues {
1347                    tip_height,
1348                    tip_hash,
1349                    value_balance,
1350                })
1351            }
1352
1353            // Used by getblock
1354            ReadRequest::BlockInfo(hash_or_height) => Ok(ReadResponse::BlockInfo(
1355                read::block_info(state.latest_best_chain(), &state.db, hash_or_height),
1356            )),
1357
1358            // Used by the StateService.
1359            ReadRequest::Depth(hash) => Ok(ReadResponse::Depth(read::depth(
1360                state.latest_best_chain(),
1361                &state.db,
1362                hash,
1363            ))),
1364
1365            // Used by the StateService.
1366            ReadRequest::BestChainNextMedianTimePast => {
1367                Ok(ReadResponse::BestChainNextMedianTimePast(
1368                    read::next_median_time_past(&state.latest_non_finalized_state(), &state.db)?,
1369                ))
1370            }
1371
1372            // Used by the get_block (raw) RPC and the StateService.
1373            ReadRequest::Block(hash_or_height) => Ok(ReadResponse::Block(read::block(
1374                state.latest_best_chain(),
1375                &state.db,
1376                hash_or_height,
1377            ))),
1378
1379            ReadRequest::AnyChainBlock(hash_or_height) => Ok(ReadResponse::Block(read::any_block(
1380                state.latest_non_finalized_state().chain_iter(),
1381                &state.db,
1382                hash_or_height,
1383            ))),
1384
1385            // Used by the get_block (raw) RPC and the StateService.
1386            ReadRequest::BlockAndSize(hash_or_height) => Ok(ReadResponse::BlockAndSize(
1387                read::block_and_size(state.latest_best_chain(), &state.db, hash_or_height),
1388            )),
1389
1390            // Used by the get_block (verbose) RPC and the StateService.
1391            ReadRequest::BlockHeader(hash_or_height) => {
1392                let best_chain = state.latest_best_chain();
1393
1394                let height = hash_or_height
1395                    .height_or_else(|hash| {
1396                        read::find::height_by_hash(best_chain.clone(), &state.db, hash)
1397                    })
1398                    .ok_or_else(|| BoxError::from("block hash or height not found"))?;
1399
1400                let hash = hash_or_height
1401                    .hash_or_else(|height| {
1402                        read::find::hash_by_height(best_chain.clone(), &state.db, height)
1403                    })
1404                    .ok_or_else(|| BoxError::from("block hash or height not found"))?;
1405
1406                let next_height = height.next()?;
1407                let next_block_hash =
1408                    read::find::hash_by_height(best_chain.clone(), &state.db, next_height);
1409
1410                let header = read::block_header(best_chain, &state.db, height.into())
1411                    .ok_or_else(|| BoxError::from("block hash or height not found"))?;
1412
1413                Ok(ReadResponse::BlockHeader {
1414                    header,
1415                    hash,
1416                    height,
1417                    next_block_hash,
1418                })
1419            }
1420
1421            // For the get_raw_transaction RPC and the StateService.
1422            ReadRequest::Transaction(hash) => Ok(ReadResponse::Transaction(
1423                read::mined_transaction(state.latest_best_chain(), &state.db, hash),
1424            )),
1425
1426            ReadRequest::AnyChainTransaction(hash) => {
1427                Ok(ReadResponse::AnyChainTransaction(read::any_transaction(
1428                    state.latest_non_finalized_state().chain_iter(),
1429                    &state.db,
1430                    hash,
1431                )))
1432            }
1433
1434            // Used by the getblock (verbose) RPC.
1435            ReadRequest::TransactionIdsForBlock(hash_or_height) => Ok(
1436                ReadResponse::TransactionIdsForBlock(read::transaction_hashes_for_block(
1437                    state.latest_best_chain(),
1438                    &state.db,
1439                    hash_or_height,
1440                )),
1441            ),
1442
1443            ReadRequest::AnyChainTransactionIdsForBlock(hash_or_height) => {
1444                Ok(ReadResponse::AnyChainTransactionIdsForBlock(
1445                    read::transaction_hashes_for_any_block(
1446                        state.latest_non_finalized_state().chain_iter(),
1447                        &state.db,
1448                        hash_or_height,
1449                    ),
1450                ))
1451            }
1452
1453            #[cfg(feature = "indexer")]
1454            ReadRequest::SpendingTransactionId(spend) => Ok(ReadResponse::TransactionId(
1455                read::spending_transaction_hash(state.latest_best_chain(), &state.db, spend),
1456            )),
1457
1458            ReadRequest::UnspentBestChainUtxo(outpoint) => Ok(ReadResponse::UnspentBestChainUtxo(
1459                read::unspent_utxo(state.latest_best_chain(), &state.db, outpoint),
1460            )),
1461
1462            // Manually used by the StateService to implement part of AwaitUtxo.
1463            ReadRequest::AnyChainUtxo(outpoint) => Ok(ReadResponse::AnyChainUtxo(read::any_utxo(
1464                state.latest_non_finalized_state(),
1465                &state.db,
1466                outpoint,
1467            ))),
1468
1469            // Used by the StateService.
1470            ReadRequest::BlockLocator => Ok(ReadResponse::BlockLocator(
1471                read::block_locator(state.latest_best_chain(), &state.db).unwrap_or_default(),
1472            )),
1473
1474            // Used by the StateService.
1475            ReadRequest::FindBlockHashes { known_blocks, stop } => {
1476                Ok(ReadResponse::BlockHashes(read::find_chain_hashes(
1477                    state.latest_best_chain(),
1478                    &state.db,
1479                    known_blocks,
1480                    stop,
1481                    MAX_FIND_BLOCK_HASHES_RESULTS,
1482                )))
1483            }
1484
1485            // Used by the StateService.
1486            ReadRequest::FindBlockHeaders { known_blocks, stop } => Ok(ReadResponse::BlockHeaders(
1487                read::find_chain_headers(
1488                    state.latest_best_chain(),
1489                    &state.db,
1490                    known_blocks,
1491                    stop,
1492                    MAX_FIND_BLOCK_HEADERS_RESULTS,
1493                )
1494                .into_iter()
1495                .map(|header| CountedHeader { header })
1496                .collect(),
1497            )),
1498
1499            ReadRequest::SaplingTree(hash_or_height) => Ok(ReadResponse::SaplingTree(
1500                read::sapling_tree(state.latest_best_chain(), &state.db, hash_or_height),
1501            )),
1502
1503            ReadRequest::OrchardTree(hash_or_height) => Ok(ReadResponse::OrchardTree(
1504                read::orchard_tree(state.latest_best_chain(), &state.db, hash_or_height),
1505            )),
1506
1507            ReadRequest::SaplingSubtrees { start_index, limit } => {
1508                let end_index = limit
1509                    .and_then(|limit| start_index.0.checked_add(limit.0))
1510                    .map(NoteCommitmentSubtreeIndex);
1511
1512                let best_chain = state.latest_best_chain();
1513                let sapling_subtrees = if let Some(end_index) = end_index {
1514                    read::sapling_subtrees(best_chain, &state.db, start_index..end_index)
1515                } else {
1516                    // If there is no end bound, just return all the trees.
1517                    // If the end bound would overflow, just returns all the trees, because that's what
1518                    // `zcashd` does. (It never calculates an end bound, so it just keeps iterating until
1519                    // the trees run out.)
1520                    read::sapling_subtrees(best_chain, &state.db, start_index..)
1521                };
1522
1523                Ok(ReadResponse::SaplingSubtrees(sapling_subtrees))
1524            }
1525
1526            ReadRequest::OrchardSubtrees { start_index, limit } => {
1527                let end_index = limit
1528                    .and_then(|limit| start_index.0.checked_add(limit.0))
1529                    .map(NoteCommitmentSubtreeIndex);
1530
1531                let best_chain = state.latest_best_chain();
1532                let orchard_subtrees = if let Some(end_index) = end_index {
1533                    read::orchard_subtrees(best_chain, &state.db, start_index..end_index)
1534                } else {
1535                    // If there is no end bound, just return all the trees.
1536                    // If the end bound would overflow, just returns all the trees, because that's what
1537                    // `zcashd` does. (It never calculates an end bound, so it just keeps iterating until
1538                    // the trees run out.)
1539                    read::orchard_subtrees(best_chain, &state.db, start_index..)
1540                };
1541
1542                Ok(ReadResponse::OrchardSubtrees(orchard_subtrees))
1543            }
1544
1545            // For the get_address_balance RPC.
1546            ReadRequest::AddressBalance(addresses) => {
1547                let (balance, received) =
1548                    read::transparent_balance(state.latest_best_chain(), &state.db, addresses)?;
1549                Ok(ReadResponse::AddressBalance { balance, received })
1550            }
1551
1552            // For the get_address_tx_ids RPC.
1553            ReadRequest::TransactionIdsByAddresses {
1554                addresses,
1555                height_range,
1556            } => read::transparent_tx_ids(
1557                state.latest_best_chain(),
1558                &state.db,
1559                addresses,
1560                height_range,
1561            )
1562            .map(ReadResponse::AddressesTransactionIds),
1563
1564            // For the get_address_utxos RPC.
1565            ReadRequest::UtxosByAddresses(addresses) => read::address_utxos(
1566                &state.network,
1567                state.latest_best_chain(),
1568                &state.db,
1569                addresses,
1570            )
1571            .map(ReadResponse::AddressUtxos),
1572
1573            ReadRequest::CheckBestChainTipNullifiersAndAnchors(unmined_tx) => {
1574                let latest_non_finalized_best_chain = state.latest_best_chain();
1575
1576                check::nullifier::tx_no_duplicates_in_chain(
1577                    &state.db,
1578                    latest_non_finalized_best_chain.as_ref(),
1579                    &unmined_tx.transaction,
1580                )?;
1581
1582                check::anchors::tx_anchors_refer_to_final_treestates(
1583                    &state.db,
1584                    latest_non_finalized_best_chain.as_ref(),
1585                    &unmined_tx,
1586                )?;
1587
1588                Ok(ReadResponse::ValidBestChainTipNullifiersAndAnchors)
1589            }
1590
1591            // Used by the get_block and get_block_hash RPCs.
1592            ReadRequest::BestChainBlockHash(height) => Ok(ReadResponse::BlockHash(
1593                read::hash_by_height(state.latest_best_chain(), &state.db, height),
1594            )),
1595
1596            // Used by get_block_template and getblockchaininfo RPCs.
1597            ReadRequest::ChainInfo => {
1598                // # Correctness
1599                //
1600                // It is ok to do these lookups using multiple database calls. Finalized state updates
1601                // can only add overlapping blocks, and block hashes are unique across all chain forks.
1602                //
1603                // If there is a large overlap between the non-finalized and finalized states,
1604                // where the finalized tip is above the non-finalized tip,
1605                // Zebra is receiving a lot of blocks, or this request has been delayed for a long time.
1606                //
1607                // In that case, the `getblocktemplate` RPC will return an error because Zebra
1608                // is not synced to the tip. That check happens before the RPC makes this request.
1609                read::difficulty::get_block_template_chain_info(
1610                    &state.latest_non_finalized_state(),
1611                    &state.db,
1612                    &state.network,
1613                )
1614                .map(ReadResponse::ChainInfo)
1615            }
1616
1617            // Used by getmininginfo, getnetworksolps, and getnetworkhashps RPCs.
1618            ReadRequest::SolutionRate { num_blocks, height } => {
1619                let latest_non_finalized_state = state.latest_non_finalized_state();
1620                // # Correctness
1621                //
1622                // It is ok to do these lookups using multiple database calls. Finalized state updates
1623                // can only add overlapping blocks, and block hashes are unique across all chain forks.
1624                //
1625                // The worst that can happen here is that the default `start_hash` will be below
1626                // the chain tip.
1627                let (tip_height, tip_hash) =
1628                    match read::tip(latest_non_finalized_state.best_chain(), &state.db) {
1629                        Some(tip_hash) => tip_hash,
1630                        None => return Ok(ReadResponse::SolutionRate(None)),
1631                    };
1632
1633                let start_hash = match height {
1634                    Some(height) if height < tip_height => read::hash_by_height(
1635                        latest_non_finalized_state.best_chain(),
1636                        &state.db,
1637                        height,
1638                    ),
1639                    // use the chain tip hash if height is above it or not provided.
1640                    _ => Some(tip_hash),
1641                };
1642
1643                let solution_rate = start_hash.and_then(|start_hash| {
1644                    read::difficulty::solution_rate(
1645                        &latest_non_finalized_state,
1646                        &state.db,
1647                        num_blocks,
1648                        start_hash,
1649                    )
1650                });
1651
1652                Ok(ReadResponse::SolutionRate(solution_rate))
1653            }
1654
1655            ReadRequest::CheckBlockProposalValidity(semantically_verified) => {
1656                tracing::debug!(
1657                    "attempting to validate and commit block proposal \
1658                         onto a cloned non-finalized state"
1659                );
1660                let mut latest_non_finalized_state = state.latest_non_finalized_state();
1661
1662                // The previous block of a valid proposal must be on the best chain tip.
1663                let Some((_best_tip_height, best_tip_hash)) =
1664                    read::best_tip(&latest_non_finalized_state, &state.db)
1665                else {
1666                    return Err(
1667                        "state is empty: wait for Zebra to sync before submitting a proposal"
1668                            .into(),
1669                    );
1670                };
1671
1672                if semantically_verified.block.header.previous_block_hash != best_tip_hash {
1673                    return Err("proposal is not based on the current best chain tip: \
1674                                    previous block hash must be the best chain tip"
1675                        .into());
1676                }
1677
1678                // This clone of the non-finalized state is dropped when this closure returns.
1679                // The non-finalized state that's used in the rest of the state (including finalizing
1680                // blocks into the db) is not mutated here.
1681                //
1682                // TODO: Convert `CommitSemanticallyVerifiedError` to a new `ValidateProposalError`?
1683                latest_non_finalized_state.disable_metrics();
1684
1685                write::validate_and_commit_non_finalized(
1686                    &state.db,
1687                    &mut latest_non_finalized_state,
1688                    semantically_verified,
1689                )?;
1690
1691                Ok(ReadResponse::ValidBlockProposal)
1692            }
1693
1694            ReadRequest::TipBlockSize => {
1695                // Respond with the length of the obtained block if any.
1696                Ok(ReadResponse::TipBlockSize(
1697                    state
1698                        .best_tip()
1699                        .and_then(|(tip_height, _)| {
1700                            read::block_info(
1701                                state.latest_best_chain(),
1702                                &state.db,
1703                                tip_height.into(),
1704                            )
1705                        })
1706                        .map(|info| info.size().try_into().expect("u32 should fit in usize"))
1707                        .or_else(|| {
1708                            find::tip_block(state.latest_best_chain(), &state.db)
1709                                .map(|b| b.zcash_serialized_size())
1710                        }),
1711                ))
1712            }
1713
1714            ReadRequest::NonFinalizedBlocksListener => {
1715                unreachable!("should return early");
1716            }
1717
1718            // Used by `gettxout` RPC method.
1719            ReadRequest::IsTransparentOutputSpent(outpoint) => {
1720                let is_spent = read::unspent_utxo(state.latest_best_chain(), &state.db, outpoint);
1721                Ok(ReadResponse::IsTransparentOutputSpent(is_spent.is_none()))
1722            }
1723        };
1724
1725        timed_span.spawn_blocking(request_handler)
1726    }
1727}
1728
1729/// Initialize a state service from the provided [`Config`].
1730/// Returns a boxed state service, a read-only state service,
1731/// and receivers for state chain tip updates.
1732///
1733/// Each `network` has its own separate on-disk database.
1734///
1735/// The state uses the `max_checkpoint_height` and `checkpoint_verify_concurrency_limit`
1736/// to work out when it is near the final checkpoint.
1737///
1738/// To share access to the state, wrap the returned service in a `Buffer`,
1739/// or clone the returned [`ReadStateService`].
1740///
1741/// It's possible to construct multiple state services in the same application (as
1742/// long as they, e.g., use different storage locations), but doing so is
1743/// probably not what you want.
1744pub async fn init(
1745    config: Config,
1746    network: &Network,
1747    max_checkpoint_height: block::Height,
1748    checkpoint_verify_concurrency_limit: usize,
1749) -> (
1750    BoxService<Request, Response, BoxError>,
1751    ReadStateService,
1752    LatestChainTip,
1753    ChainTipChange,
1754) {
1755    let (state_service, read_only_state_service, latest_chain_tip, chain_tip_change) =
1756        StateService::new(
1757            config,
1758            network,
1759            max_checkpoint_height,
1760            checkpoint_verify_concurrency_limit,
1761        )
1762        .await;
1763
1764    (
1765        BoxService::new(state_service),
1766        read_only_state_service,
1767        latest_chain_tip,
1768        chain_tip_change,
1769    )
1770}
1771
1772/// Initialize a read state service from the provided [`Config`].
1773/// Returns a read-only state service,
1774///
1775/// Each `network` has its own separate on-disk database.
1776///
1777/// To share access to the state, clone the returned [`ReadStateService`].
1778pub fn init_read_only(
1779    config: Config,
1780    network: &Network,
1781) -> (
1782    ReadStateService,
1783    ZebraDb,
1784    tokio::sync::watch::Sender<NonFinalizedState>,
1785) {
1786    let finalized_state = FinalizedState::new_with_debug(
1787        &config,
1788        network,
1789        true,
1790        #[cfg(feature = "elasticsearch")]
1791        false,
1792        true,
1793    );
1794    let (non_finalized_state_sender, non_finalized_state_receiver) =
1795        tokio::sync::watch::channel(NonFinalizedState::new(network));
1796
1797    (
1798        ReadStateService::new(
1799            &finalized_state,
1800            None,
1801            WatchReceiver::new(non_finalized_state_receiver),
1802        ),
1803        finalized_state.db.clone(),
1804        non_finalized_state_sender,
1805    )
1806}
1807
1808/// Calls [`init_read_only`] with the provided [`Config`] and [`Network`] from a blocking task.
1809/// Returns a [`tokio::task::JoinHandle`] with a read state service and chain tip sender.
1810pub fn spawn_init_read_only(
1811    config: Config,
1812    network: &Network,
1813) -> tokio::task::JoinHandle<(
1814    ReadStateService,
1815    ZebraDb,
1816    tokio::sync::watch::Sender<NonFinalizedState>,
1817)> {
1818    let network = network.clone();
1819    tokio::task::spawn_blocking(move || init_read_only(config, &network))
1820}
1821
/// Returns a [`StateService`] with an ephemeral [`Config`] and a buffer with a single slot.
///
/// This can be used to create a state service for testing. See also [`init`].
#[cfg(any(test, feature = "proptest-impl"))]
pub async fn init_test(
    network: &Network,
) -> Buffer<BoxService<Request, Response, BoxError>, Request> {
    // TODO: pass max_checkpoint_height and checkpoint_verify_concurrency limit
    //       if we ever need to test final checkpoint sent UTXO queries
    let (service, _read_service, _latest_tip, _tip_change) =
        StateService::new(Config::ephemeral(), network, block::Height::MAX, 0).await;

    // Wrap the service in a single-slot buffer so tests can clone it.
    Buffer::new(BoxService::new(service), 1)
}
1836
/// Initializes a state service with an ephemeral [`Config`] and a buffer with a single slot,
/// then returns the read-write service, read-only service, and tip watch channels.
///
/// This can be used to create a state service for testing. See also [`init`].
#[cfg(any(test, feature = "proptest-impl"))]
pub async fn init_test_services(
    network: &Network,
) -> (
    Buffer<BoxService<Request, Response, BoxError>, Request>,
    ReadStateService,
    LatestChainTip,
    ChainTipChange,
) {
    // TODO: pass max_checkpoint_height and checkpoint_verify_concurrency limit
    //       if we ever need to test final checkpoint sent UTXO queries
    let (service, read_service, latest_tip, tip_change) =
        StateService::new(Config::ephemeral(), network, block::Height::MAX, 0).await;

    // Buffer the read-write service with a single slot so tests can clone it.
    (
        Buffer::new(BoxService::new(service), 1),
        read_service,
        latest_tip,
        tip_change,
    )
}