1//! [`tower::Service`]s for Zebra's cached chain state.
2//!
3//! Zebra provides cached state access via two main services:
4//! - [`StateService`]: a read-write service that writes blocks to the state,
5//!   and redirects most read requests to the [`ReadStateService`].
6//! - [`ReadStateService`]: a read-only service that answers from the most
7//!   recent committed block.
8//!
9//! Most users should prefer [`ReadStateService`], unless they need to write blocks to the state.
10//!
11//! Zebra also provides access to the best chain tip via:
12//! - [`LatestChainTip`]: a read-only channel that contains the latest committed
13//!   tip.
14//! - [`ChainTipChange`]: a read-only channel that can asynchronously await
15//!   chain tip changes.
16
17use std::{
18    collections::HashMap,
19    future::Future,
20    pin::Pin,
21    sync::Arc,
22    task::{Context, Poll},
23    time::{Duration, Instant},
24};
25
26use futures::future::FutureExt;
27use tokio::sync::oneshot;
28use tower::{util::BoxService, Service, ServiceExt};
29use tracing::{instrument, Instrument, Span};
30
31#[cfg(any(test, feature = "proptest-impl"))]
32use tower::buffer::Buffer;
33
34use zebra_chain::{
35    block::{self, CountedHeader, HeightDiff},
36    diagnostic::CodeTimer,
37    parameters::{Network, NetworkUpgrade},
38    serialization::ZcashSerialize,
39    subtree::NoteCommitmentSubtreeIndex,
40};
41
42use crate::{
43    constants::{
44        MAX_FIND_BLOCK_HASHES_RESULTS, MAX_FIND_BLOCK_HEADERS_RESULTS, MAX_LEGACY_CHAIN_BLOCKS,
45    },
46    error::{CommitBlockError, CommitCheckpointVerifiedError, InvalidateError, ReconsiderError},
47    request::TimedSpan,
48    response::NonFinalizedBlocksListener,
49    service::{
50        block_iter::any_ancestor_blocks,
51        chain_tip::{ChainTipBlock, ChainTipChange, ChainTipSender, LatestChainTip},
52        finalized_state::{FinalizedState, ZebraDb},
53        non_finalized_state::{Chain, NonFinalizedState},
54        pending_utxos::PendingUtxos,
55        queued_blocks::QueuedBlocks,
56        read::find,
57        watch_receiver::WatchReceiver,
58    },
59    BoxError, CheckpointVerifiedBlock, CommitSemanticallyVerifiedError, Config, KnownBlock,
60    ReadRequest, ReadResponse, Request, Response, SemanticallyVerifiedBlock,
61};
62
63pub mod block_iter;
64pub mod chain_tip;
65pub mod watch_receiver;
66
67pub mod check;
68
69pub(crate) mod finalized_state;
70pub(crate) mod non_finalized_state;
71mod pending_utxos;
72mod queued_blocks;
73pub(crate) mod read;
74mod traits;
75mod write;
76
77#[cfg(any(test, feature = "proptest-impl"))]
78pub mod arbitrary;
79
80#[cfg(test)]
81mod tests;
82
83pub use finalized_state::{OutputLocation, TransactionIndex, TransactionLocation};
84use write::NonFinalizedWriteMessage;
85
86use self::queued_blocks::{QueuedCheckpointVerified, QueuedSemanticallyVerified, SentHashes};
87
88pub use self::traits::{ReadState, State};
89
/// A read-write service for Zebra's cached blockchain state.
///
/// This service modifies and provides access to:
/// - the non-finalized state: the ~100 most recent blocks.
///   Zebra allows chain forks in the non-finalized state,
///   stores it in memory, and re-downloads it when restarted.
/// - the finalized state: older blocks that have many confirmations.
///   Zebra stores the single best chain in the finalized state,
///   and re-loads it from disk when restarted.
///
/// Read requests to this service are buffered, then processed concurrently.
/// Block write requests are buffered, then queued, then processed in order by a separate task.
///
/// Most state users can get faster read responses using the [`ReadStateService`],
/// because its requests do not share a [`tower::buffer::Buffer`] with block write requests.
///
/// To quickly get the latest block, use [`LatestChainTip`] or [`ChainTipChange`].
/// They can read the latest block directly, without queueing any requests.
#[derive(Debug)]
pub(crate) struct StateService {
    // Configuration
    //
    /// The configured Zcash network.
    network: Network,

    /// The height that we start storing UTXOs from finalized blocks.
    ///
    /// This height should be lower than the last few checkpoints,
    /// so the full verifier can verify UTXO spends from those blocks,
    /// even if they haven't been committed to the finalized state yet.
    full_verifier_utxo_lookahead: block::Height,

    // Queued Blocks
    //
    /// Queued blocks for the [`NonFinalizedState`] that arrived out of order.
    /// These blocks are awaiting their parent blocks before they can do contextual verification.
    non_finalized_state_queued_blocks: QueuedBlocks,

    /// Queued blocks for the [`FinalizedState`] that arrived out of order.
    /// These blocks are awaiting their parent blocks before they can do contextual verification.
    ///
    /// Indexed by their parent block hash, so the next in-order child can be
    /// looked up directly from the last committed hash.
    finalized_state_queued_blocks: HashMap<block::Hash, QueuedCheckpointVerified>,

    /// Channels to send blocks to the block write task.
    block_write_sender: write::BlockWriteSender,

    /// The [`block::Hash`] of the most recent block sent on
    /// `finalized_block_write_sender` or `non_finalized_block_write_sender`.
    ///
    /// On startup, this is:
    /// - the finalized tip, if there are stored blocks, or
    /// - the genesis block's parent hash, if the database is empty.
    ///
    /// If `invalid_block_write_reset_receiver` gets a reset, this is:
    /// - the hash of the last valid committed block (the parent of the invalid block).
    finalized_block_write_last_sent_hash: block::Hash,

    /// A set of block hashes that have been sent to the block write task.
    /// Hashes of blocks below the finalized tip height are periodically pruned.
    non_finalized_block_write_sent_hashes: SentHashes,

    /// If an invalid block is sent on `finalized_block_write_sender`
    /// or `non_finalized_block_write_sender`,
    /// this channel gets the [`block::Hash`] of the valid tip.
    //
    // TODO: add tests for finalized and non-finalized resets (#2654)
    invalid_block_write_reset_receiver: tokio::sync::mpsc::UnboundedReceiver<block::Hash>,

    // Pending UTXO Request Tracking
    //
    /// The set of outpoints with pending requests for their associated transparent::Output.
    pending_utxos: PendingUtxos,

    /// Instant tracking the last time `pending_utxos` was pruned.
    last_prune: Instant,

    // Updating Concurrently Readable State
    //
    /// A cloneable [`ReadStateService`], used to answer concurrent read requests.
    ///
    /// TODO: move users of read [`Request`]s to [`ReadStateService`], and remove `read_service`.
    read_service: ReadStateService,

    // Metrics
    //
    /// A metric tracking the maximum height that's currently in `finalized_state_queued_blocks`
    ///
    /// Set to `f64::NAN` if `finalized_state_queued_blocks` is empty, because Grafana shows NaNs
    /// as a break in the graph.
    max_finalized_queue_height: f64,
}
182
/// A read-only service for accessing Zebra's cached blockchain state.
///
/// This service provides read-only access to:
/// - the non-finalized state: the ~100 most recent blocks.
/// - the finalized state: older blocks that have many confirmations.
///
/// Requests to this service are processed in parallel,
/// ignoring any blocks queued by the read-write [`StateService`].
///
/// This quick response behavior is better for most state users.
/// It allows other async tasks to make progress while concurrently reading data from disk.
#[derive(Clone, Debug)]
pub struct ReadStateService {
    // Configuration
    //
    /// The configured Zcash network.
    network: Network,

    // Shared Concurrently Readable State
    //
    /// A watch channel with a cached copy of the [`NonFinalizedState`].
    ///
    /// This state is only updated between requests,
    /// so it might include some block data that is also on disk in `db`.
    non_finalized_state_receiver: WatchReceiver<NonFinalizedState>,

    /// The shared inner on-disk database for the finalized state.
    ///
    /// RocksDB allows reads and writes via a shared reference,
    /// but [`ZebraDb`] doesn't expose any write methods or types.
    ///
    /// This chain is updated concurrently with requests,
    /// so it might include some block data that is also in `non_finalized_state_receiver`.
    db: ZebraDb,

    /// A shared handle to a task that writes blocks to the [`NonFinalizedState`] or [`FinalizedState`],
    /// once the queues have received all their parent blocks.
    ///
    /// Used to check for panics when writing blocks.
    block_write_task: Option<Arc<std::thread::JoinHandle<()>>>,
}
224
impl Drop for StateService {
    fn drop(&mut self) {
        // The state service owns the state, tasks, and channels,
        // so dropping it should shut down everything.

        // Close the channels (non-blocking).
        // This makes the block write thread exit the next time it checks the channels.
        // We want to do this here so we get any errors or panics from the block write task
        // before it shuts down.
        self.invalid_block_write_reset_receiver.close();

        // Dropping the senders also closes the channels to the write task.
        std::mem::drop(self.block_write_sender.finalized.take());
        std::mem::drop(self.block_write_sender.non_finalized.take());

        // Tell every waiting requester that its queued block will never be committed.
        self.clear_finalized_block_queue(CommitBlockError::WriteTaskExited);
        self.clear_non_finalized_block_queue(CommitBlockError::WriteTaskExited);

        // Log database metrics before shutting down
        info!("dropping the state: logging database metrics");
        self.log_db_metrics();

        // Then drop self.read_service, which checks the block write task for panics,
        // and tries to shut down the database.
    }
}
249
impl Drop for ReadStateService {
    fn drop(&mut self) {
        // The read state service shares the state,
        // so dropping it should check if we can shut down.

        // TODO: move this into a try_shutdown() method
        if let Some(block_write_task) = self.block_write_task.take() {
            // `Arc::into_inner` returns `Some` only for the last remaining handle,
            // so the blocking shutdown below runs at most once across all clones.
            if let Some(block_write_task_handle) = Arc::into_inner(block_write_task) {
                // We're the last database user, so we can tell it to shut down (blocking):
                // - flushes the database to disk, and
                // - drops the database, which cleans up any database tasks correctly.
                self.db.shutdown(true);

                // We are the last state with a reference to this thread, so we can
                // wait until the block write task finishes, then check for panics (blocking).
                // (We'd also like to abort the thread, but std::thread::JoinHandle can't do that.)

                // This log is verbose during tests.
                #[cfg(not(test))]
                info!("waiting for the block write task to finish");
                #[cfg(test)]
                debug!("waiting for the block write task to finish");

                // TODO: move this into a check_for_panics() method
                if let Err(thread_panic) = block_write_task_handle.join() {
                    // Re-raise the write task's panic on this thread,
                    // so it isn't silently swallowed during shutdown.
                    std::panic::resume_unwind(thread_panic);
                } else {
                    debug!("shutting down the state because the block write task has finished");
                }
            }
        } else {
            // Even if we're not the last database user, try shutting it down.
            //
            // TODO: rename this to try_shutdown()?
            self.db.shutdown(false);
        }
    }
}
288
289impl StateService {
    /// Minimum interval between prunes of `pending_utxos`
    /// (the last prune time is tracked by the `last_prune` field).
    const PRUNE_INTERVAL: Duration = Duration::from_secs(30);
291
    /// Creates a new state service for the state `config` and `network`.
    ///
    /// Uses the `max_checkpoint_height` and `checkpoint_verify_concurrency_limit`
    /// to work out when it is near the final checkpoint.
    ///
    /// Returns the read-write and read-only state services,
    /// and read-only watch channels for its best chain tip.
    ///
    /// # Panics
    ///
    /// - If a spawned blocking database task cannot be joined.
    /// - If the cached state contains a legacy chain (a chain that follows
    ///   outdated consensus branch rules).
    pub async fn new(
        config: Config,
        network: &Network,
        max_checkpoint_height: block::Height,
        checkpoint_verify_concurrency_limit: usize,
    ) -> (Self, ReadStateService, LatestChainTip, ChainTipChange) {
        // Opening the database does blocking I/O, so run it off the async executor.
        let (finalized_state, finalized_tip, timer) = {
            let config = config.clone();
            let network = network.clone();
            tokio::task::spawn_blocking(move || {
                let timer = CodeTimer::start();
                let finalized_state = FinalizedState::new(
                    &config,
                    &network,
                    #[cfg(feature = "elasticsearch")]
                    true,
                );
                timer.finish_desc("opening finalized state database");

                // This timer is returned and finished after the service is built.
                let timer = CodeTimer::start();
                let finalized_tip = finalized_state.db.tip_block();

                (finalized_state, finalized_tip, timer)
            })
            .await
            .expect("failed to join blocking task")
        };

        // # Correctness
        //
        // The state service must set the finalized block write sender to `None`
        // if there are blocks in the restored non-finalized state that are above
        // the max checkpoint height so that non-finalized blocks can be written, otherwise,
        // Zebra will be unable to commit semantically verified blocks, and its chain sync will stall.
        //
        // The state service must not set the finalized block write sender to `None` if there
        // aren't blocks in the restored non-finalized state that are above the max checkpoint height,
        // otherwise, unless checkpoint sync is disabled in the zebra-consensus configuration,
        // Zebra will be unable to commit checkpoint verified blocks, and its chain sync will stall.
        let is_finalized_tip_past_max_checkpoint = if let Some(tip) = &finalized_tip {
            tip.coinbase_height().expect("valid block must have height") >= max_checkpoint_height
        } else {
            false
        };
        let (non_finalized_state, non_finalized_state_sender, non_finalized_state_receiver) =
            NonFinalizedState::new(network)
                .with_backup(
                    config.non_finalized_state_backup_dir(network),
                    &finalized_state.db,
                    is_finalized_tip_past_max_checkpoint,
                )
                .await;

        let non_finalized_block_write_sent_hashes = SentHashes::new(&non_finalized_state);
        // Prefer the restored non-finalized tip; fall back to the finalized tip.
        let initial_tip = non_finalized_state
            .best_tip_block()
            .map(|cv_block| cv_block.block.clone())
            .or(finalized_tip)
            .map(CheckpointVerifiedBlock::from)
            .map(ChainTipBlock::from);

        tracing::info!(chain_tip = ?initial_tip.as_ref().map(|tip| (tip.hash, tip.height)), "loaded Zebra state cache");

        let (chain_tip_sender, latest_chain_tip, chain_tip_change) =
            ChainTipSender::new(initial_tip, network);

        // The write task takes ownership of a clone of the finalized state and
        // the whole non-finalized state; this service reads via `read_service`.
        let finalized_state_for_writing = finalized_state.clone();
        let should_use_finalized_block_write_sender = non_finalized_state.is_chain_set_empty();
        let (block_write_sender, invalid_block_write_reset_receiver, block_write_task) =
            write::BlockWriteSender::spawn(
                finalized_state_for_writing,
                non_finalized_state,
                chain_tip_sender,
                non_finalized_state_sender,
                should_use_finalized_block_write_sender,
            );

        let read_service = ReadStateService::new(
            &finalized_state,
            block_write_task,
            non_finalized_state_receiver,
        );

        // Start tracking UTXOs `checkpoint_verify_concurrency_limit` blocks
        // below the final checkpoint, clamped to the genesis height.
        let full_verifier_utxo_lookahead = max_checkpoint_height
            - HeightDiff::try_from(checkpoint_verify_concurrency_limit)
                .expect("fits in HeightDiff");
        let full_verifier_utxo_lookahead =
            full_verifier_utxo_lookahead.unwrap_or(block::Height::MIN);
        let non_finalized_state_queued_blocks = QueuedBlocks::default();
        let pending_utxos = PendingUtxos::default();

        // Reading the finalized tip hash is a database read, so run it off the executor.
        let finalized_block_write_last_sent_hash =
            tokio::task::spawn_blocking(move || finalized_state.db.finalized_tip_hash())
                .await
                .expect("failed to join blocking task");

        let state = Self {
            network: network.clone(),
            full_verifier_utxo_lookahead,
            non_finalized_state_queued_blocks,
            finalized_state_queued_blocks: HashMap::new(),
            block_write_sender,
            finalized_block_write_last_sent_hash,
            non_finalized_block_write_sent_hashes,
            invalid_block_write_reset_receiver,
            pending_utxos,
            last_prune: Instant::now(),
            read_service: read_service.clone(),
            max_finalized_queue_height: f64::NAN,
        };
        timer.finish_desc("initializing state service");

        tracing::info!("starting legacy chain check");
        let timer = CodeTimer::start();

        // Check that the cached tip's recent ancestors follow the current
        // consensus branch rules (not a pre-NU5 legacy chain).
        if let (Some(tip), Some(nu5_activation_height)) = (
            {
                let read_state = state.read_service.clone();
                tokio::task::spawn_blocking(move || read_state.best_tip())
                    .await
                    .expect("task should not panic")
            },
            NetworkUpgrade::Nu5.activation_height(network),
        ) {
            if let Err(error) = check::legacy_chain(
                nu5_activation_height,
                any_ancestor_blocks(
                    &state.read_service.latest_non_finalized_state(),
                    &state.read_service.db,
                    tip.1,
                ),
                &state.network,
                MAX_LEGACY_CHAIN_BLOCKS,
            ) {
                let legacy_db_path = state.read_service.db.path().to_path_buf();
                panic!(
                    "Cached state contains a legacy chain.\n\
                     An outdated Zebra version did not know about a recent network upgrade,\n\
                     so it followed a legacy chain using outdated consensus branch rules.\n\
                     Hint: Delete your database, and restart Zebra to do a full sync.\n\
                     Database path: {legacy_db_path:?}\n\
                     Error: {error:?}",
                );
            }
        }

        tracing::info!("cached state consensus branch is valid: no legacy chain found");
        timer.finish_desc("legacy chain check");

        // Spawn a background task to periodically export RocksDB metrics to Prometheus
        let db_for_metrics = read_service.db.clone();
        tokio::spawn(async move {
            let mut interval = tokio::time::interval(Duration::from_secs(30));
            loop {
                interval.tick().await;
                db_for_metrics.export_metrics();
            }
        });

        (state, read_service, latest_chain_tip, chain_tip_change)
    }
460
461    /// Call read only state service to log rocksdb database metrics.
462    pub fn log_db_metrics(&self) {
463        self.read_service.db.print_db_metrics();
464    }
465
    /// Queue a checkpoint verified block for verification and storage in the finalized state.
    ///
    /// Returns a channel receiver that provides the result of the block commit.
    fn queue_and_commit_to_finalized_state(
        &mut self,
        checkpoint_verified: CheckpointVerifiedBlock,
    ) -> oneshot::Receiver<Result<block::Hash, CommitCheckpointVerifiedError>> {
        // # Correctness & Performance
        //
        // This method must not block, access the database, or perform CPU-intensive tasks,
        // because it is called directly from the tokio executor's Future threads.

        let queued_prev_hash = checkpoint_verified.block.header.previous_block_hash;
        let queued_height = checkpoint_verified.height;

        // If we're close to the final checkpoint, make the block's UTXOs available for
        // semantic block verification, even when it is in the channel.
        if self.is_close_to_final_checkpoint(queued_height) {
            self.non_finalized_block_write_sent_hashes
                .add_finalized(&checkpoint_verified)
        }

        let (rsp_tx, rsp_rx) = oneshot::channel();
        let queued = (checkpoint_verified, rsp_tx);

        if self.block_write_sender.finalized.is_some() {
            // We're still committing checkpoint verified blocks.
            //
            // Blocks are queued by *parent* hash, so queueing a second child of the
            // same parent replaces the first one, which gets a duplicate error.
            if let Some(duplicate_queued) = self
                .finalized_state_queued_blocks
                .insert(queued_prev_hash, queued)
            {
                Self::send_checkpoint_verified_block_error(
                    duplicate_queued,
                    CommitBlockError::new_duplicate(
                        Some(queued_prev_hash.into()),
                        KnownBlock::Queue,
                    ),
                );
            }

            self.drain_finalized_queue_and_commit();
        } else {
            // We've finished committing checkpoint verified blocks to the finalized state,
            // so drop any repeated queued blocks, and return an error.
            //
            // TODO: track the latest sent height, and drop any blocks under that height
            //       every time we send some blocks (like QueuedSemanticallyVerifiedBlocks)
            Self::send_checkpoint_verified_block_error(
                queued,
                CommitBlockError::new_duplicate(None, KnownBlock::Finalized),
            );

            self.clear_finalized_block_queue(CommitBlockError::new_duplicate(
                None,
                KnownBlock::Finalized,
            ));
        }

        // Update the queue-height metric; NaN marks an empty queue (shows as a
        // break in the Grafana graph).
        if self.finalized_state_queued_blocks.is_empty() {
            self.max_finalized_queue_height = f64::NAN;
        } else if self.max_finalized_queue_height.is_nan()
            || self.max_finalized_queue_height < queued_height.0 as f64
        {
            // if there are still blocks in the queue, then either:
            //   - the new block was lower than the old maximum, and there was a gap before it,
            //     so the maximum is still the same (and we skip this code), or
            //   - the new block is higher than the old maximum, and there is at least one gap
            //     between the finalized tip and the new maximum
            self.max_finalized_queue_height = queued_height.0 as f64;
        }

        metrics::gauge!("state.checkpoint.queued.max.height").set(self.max_finalized_queue_height);
        metrics::gauge!("state.checkpoint.queued.block.count")
            .set(self.finalized_state_queued_blocks.len() as f64);

        rsp_rx
    }
543
    /// Finds finalized state queue blocks to be committed to the state in order,
    /// removes them from the queue, and sends them to the block commit task.
    ///
    /// After queueing a finalized block, this method checks whether the newly
    /// queued block (and any of its descendants) can be committed to the state.
    ///
    /// If the block reset channel is closed, logs a message and returns without
    /// sending any blocks. If the block commit channel is closed, drops all
    /// queued blocks and sends an error on each block's result channel.
    pub fn drain_finalized_queue_and_commit(&mut self) {
        use tokio::sync::mpsc::error::{SendError, TryRecvError};

        // # Correctness & Performance
        //
        // This method must not block, access the database, or perform CPU-intensive tasks,
        // because it is called directly from the tokio executor's Future threads.

        // If a block failed, we need to start again from a valid tip.
        match self.invalid_block_write_reset_receiver.try_recv() {
            Ok(reset_tip_hash) => self.finalized_block_write_last_sent_hash = reset_tip_hash,
            Err(TryRecvError::Disconnected) => {
                info!("Block commit task closed the block reset channel. Is Zebra shutting down?");
                return;
            }
            // There are no errors, so we can just use the last block hash we sent
            Err(TryRecvError::Empty) => {}
        }

        // Queued blocks are indexed by their parent hash, so repeatedly removing
        // the child of the last sent hash walks the chain in height order,
        // stopping at the first gap.
        while let Some(queued_block) = self
            .finalized_state_queued_blocks
            .remove(&self.finalized_block_write_last_sent_hash)
        {
            let last_sent_finalized_block_height = queued_block.0.height;

            self.finalized_block_write_last_sent_hash = queued_block.0.hash;

            // If we've finished sending finalized blocks, ignore any repeated blocks.
            // (Blocks can be repeated after a syncer reset.)
            if let Some(finalized_block_write_sender) = &self.block_write_sender.finalized {
                let send_result = finalized_block_write_sender.send(queued_block);

                // If the receiver is closed, we can't send any more blocks.
                if let Err(SendError(queued)) = send_result {
                    // If Zebra is shutting down, drop blocks and send an error
                    // on each block's result channel.
                    Self::send_checkpoint_verified_block_error(
                        queued,
                        CommitBlockError::WriteTaskExited,
                    );

                    self.clear_finalized_block_queue(CommitBlockError::WriteTaskExited);
                } else {
                    metrics::gauge!("state.checkpoint.sent.block.height")
                        .set(last_sent_finalized_block_height.0 as f64);
                };
            }
        }
    }
599
600    /// Drops all finalized state queue blocks, and sends an error on their result channels.
601    fn clear_finalized_block_queue(
602        &mut self,
603        error: impl Into<CommitCheckpointVerifiedError> + Clone,
604    ) {
605        for (_hash, queued) in self.finalized_state_queued_blocks.drain() {
606            Self::send_checkpoint_verified_block_error(queued, error.clone());
607        }
608    }
609
610    /// Send an error on a `QueuedCheckpointVerified` block's result channel, and drop the block
611    fn send_checkpoint_verified_block_error(
612        queued: QueuedCheckpointVerified,
613        error: impl Into<CommitCheckpointVerifiedError>,
614    ) {
615        let (finalized, rsp_tx) = queued;
616
617        // The block sender might have already given up on this block,
618        // so ignore any channel send errors.
619        let _ = rsp_tx.send(Err(error.into()));
620        std::mem::drop(finalized);
621    }
622
623    /// Drops all non-finalized state queue blocks, and sends an error on their result channels.
624    fn clear_non_finalized_block_queue(
625        &mut self,
626        error: impl Into<CommitSemanticallyVerifiedError> + Clone,
627    ) {
628        for (_hash, queued) in self.non_finalized_state_queued_blocks.drain() {
629            Self::send_semantically_verified_block_error(queued, error.clone());
630        }
631    }
632
633    /// Send an error on a `QueuedSemanticallyVerified` block's result channel, and drop the block
634    fn send_semantically_verified_block_error(
635        queued: QueuedSemanticallyVerified,
636        error: impl Into<CommitSemanticallyVerifiedError>,
637    ) {
638        let (finalized, rsp_tx) = queued;
639
640        // The block sender might have already given up on this block,
641        // so ignore any channel send errors.
642        let _ = rsp_tx.send(Err(error.into()));
643        std::mem::drop(finalized);
644    }
645
646    /// Queue a semantically verified block for contextual verification and check if any queued
647    /// blocks are ready to be verified and committed to the state.
648    ///
649    /// This function encodes the logic for [committing non-finalized blocks][1]
650    /// in RFC0005.
651    ///
652    /// [1]: https://zebra.zfnd.org/dev/rfcs/0005-state-updates.html#committing-non-finalized-blocks
653    #[instrument(level = "debug", skip(self, semantically_verified))]
654    fn queue_and_commit_to_non_finalized_state(
655        &mut self,
656        semantically_verified: SemanticallyVerifiedBlock,
657    ) -> oneshot::Receiver<Result<block::Hash, CommitSemanticallyVerifiedError>> {
658        tracing::debug!(block = %semantically_verified.block, "queueing block for contextual verification");
659        let parent_hash = semantically_verified.block.header.previous_block_hash;
660
661        if self
662            .non_finalized_block_write_sent_hashes
663            .contains(&semantically_verified.hash)
664        {
665            let (rsp_tx, rsp_rx) = oneshot::channel();
666            let _ = rsp_tx.send(Err(CommitBlockError::new_duplicate(
667                Some(semantically_verified.hash.into()),
668                KnownBlock::WriteChannel,
669            )
670            .into()));
671            return rsp_rx;
672        }
673
674        if self
675            .read_service
676            .db
677            .contains_height(semantically_verified.height)
678        {
679            let (rsp_tx, rsp_rx) = oneshot::channel();
680            let _ = rsp_tx.send(Err(CommitBlockError::new_duplicate(
681                Some(semantically_verified.height.into()),
682                KnownBlock::Finalized,
683            )
684            .into()));
685            return rsp_rx;
686        }
687
688        // [`Request::CommitSemanticallyVerifiedBlock`] contract: a request to commit a block which
689        // has been queued but not yet committed to the state fails the older request and replaces
690        // it with the newer request.
691        let rsp_rx = if let Some((_, old_rsp_tx)) = self
692            .non_finalized_state_queued_blocks
693            .get_mut(&semantically_verified.hash)
694        {
695            tracing::debug!("replacing older queued request with new request");
696            let (mut rsp_tx, rsp_rx) = oneshot::channel();
697            std::mem::swap(old_rsp_tx, &mut rsp_tx);
698            let _ = rsp_tx.send(Err(CommitBlockError::new_duplicate(
699                Some(semantically_verified.hash.into()),
700                KnownBlock::Queue,
701            )
702            .into()));
703            rsp_rx
704        } else {
705            let (rsp_tx, rsp_rx) = oneshot::channel();
706            self.non_finalized_state_queued_blocks
707                .queue((semantically_verified, rsp_tx));
708            rsp_rx
709        };
710
711        // We've finished sending checkpoint verified blocks when:
712        // - we've sent the verified block for the last checkpoint, and
713        // - it has been successfully written to disk.
714        //
715        // We detect the last checkpoint by looking for non-finalized blocks
716        // that are a child of the last block we sent.
717        //
718        // TODO: configure the state with the last checkpoint hash instead?
719        if self.block_write_sender.finalized.is_some()
720            && self
721                .non_finalized_state_queued_blocks
722                .has_queued_children(self.finalized_block_write_last_sent_hash)
723            && self.read_service.db.finalized_tip_hash()
724                == self.finalized_block_write_last_sent_hash
725        {
726            // Tell the block write task to stop committing checkpoint verified blocks to the finalized state,
727            // and move on to committing semantically verified blocks to the non-finalized state.
728            std::mem::drop(self.block_write_sender.finalized.take());
729            // Remove any checkpoint-verified block hashes from `non_finalized_block_write_sent_hashes`.
730            self.non_finalized_block_write_sent_hashes = SentHashes::default();
731            // Mark `SentHashes` as usable by the `can_fork_chain_at()` method.
732            self.non_finalized_block_write_sent_hashes
733                .can_fork_chain_at_hashes = true;
734            // Send blocks from non-finalized queue
735            self.send_ready_non_finalized_queued(self.finalized_block_write_last_sent_hash);
736            // We've finished committing checkpoint verified blocks to finalized state, so drop any repeated queued blocks.
737            self.clear_finalized_block_queue(CommitBlockError::new_duplicate(
738                None,
739                KnownBlock::Finalized,
740            ));
741        } else if !self.can_fork_chain_at(&parent_hash) {
742            tracing::trace!("unready to verify, returning early");
743        } else if self.block_write_sender.finalized.is_none() {
744            // Wait until block commit task is ready to write non-finalized blocks before dequeuing them
745            self.send_ready_non_finalized_queued(parent_hash);
746
747            let finalized_tip_height = self.read_service.db.finalized_tip_height().expect(
748                "Finalized state must have at least one block before committing non-finalized state",
749            );
750
751            self.non_finalized_state_queued_blocks
752                .prune_by_height(finalized_tip_height);
753
754            self.non_finalized_block_write_sent_hashes
755                .prune_by_height(finalized_tip_height);
756        }
757
758        rsp_rx
759    }
760
761    /// Returns `true` if `hash` is a valid previous block hash for new non-finalized blocks.
762    fn can_fork_chain_at(&self, hash: &block::Hash) -> bool {
763        self.non_finalized_block_write_sent_hashes
764            .can_fork_chain_at(hash)
765            || &self.read_service.db.finalized_tip_hash() == hash
766    }
767
768    /// Returns `true` if `queued_height` is near the final checkpoint.
769    ///
770    /// The semantic block verifier needs access to UTXOs from checkpoint verified blocks
771    /// near the final checkpoint, so that it can verify blocks that spend those UTXOs.
772    ///
773    /// If it doesn't have the required UTXOs, some blocks will time out,
774    /// but succeed after a syncer restart.
775    fn is_close_to_final_checkpoint(&self, queued_height: block::Height) -> bool {
776        queued_height >= self.full_verifier_utxo_lookahead
777    }
778
779    /// Sends all queued blocks whose parents have recently arrived starting from `new_parent`
780    /// in breadth-first ordering to the block write task which will attempt to validate and commit them
781    #[tracing::instrument(level = "debug", skip(self, new_parent))]
782    fn send_ready_non_finalized_queued(&mut self, new_parent: block::Hash) {
783        use tokio::sync::mpsc::error::SendError;
784        if let Some(non_finalized_block_write_sender) = &self.block_write_sender.non_finalized {
785            let mut new_parents: Vec<block::Hash> = vec![new_parent];
786
787            while let Some(parent_hash) = new_parents.pop() {
788                let queued_children = self
789                    .non_finalized_state_queued_blocks
790                    .dequeue_children(parent_hash);
791
792                for queued_child in queued_children {
793                    let (SemanticallyVerifiedBlock { hash, .. }, _) = queued_child;
794
795                    self.non_finalized_block_write_sent_hashes
796                        .add(&queued_child.0);
797                    let send_result = non_finalized_block_write_sender.send(queued_child.into());
798
799                    if let Err(SendError(NonFinalizedWriteMessage::Commit(queued))) = send_result {
800                        // If Zebra is shutting down, drop blocks and return an error.
801                        Self::send_semantically_verified_block_error(
802                            queued,
803                            CommitBlockError::WriteTaskExited,
804                        );
805
806                        self.clear_non_finalized_block_queue(CommitBlockError::WriteTaskExited);
807
808                        return;
809                    };
810
811                    new_parents.push(hash);
812                }
813            }
814
815            self.non_finalized_block_write_sent_hashes.finish_batch();
816        };
817    }
818
    /// Return the tip of the current best chain.
    ///
    /// Delegates to the [`ReadStateService`], which reads from the latest
    /// non-finalized state and the finalized database.
    pub fn best_tip(&self) -> Option<(block::Height, block::Hash)> {
        self.read_service.best_tip()
    }
823
824    fn send_invalidate_block(
825        &self,
826        hash: block::Hash,
827    ) -> oneshot::Receiver<Result<block::Hash, InvalidateError>> {
828        let (rsp_tx, rsp_rx) = oneshot::channel();
829
830        let Some(sender) = &self.block_write_sender.non_finalized else {
831            let _ = rsp_tx.send(Err(InvalidateError::ProcessingCheckpointedBlocks));
832            return rsp_rx;
833        };
834
835        if let Err(tokio::sync::mpsc::error::SendError(error)) =
836            sender.send(NonFinalizedWriteMessage::Invalidate { hash, rsp_tx })
837        {
838            let NonFinalizedWriteMessage::Invalidate { rsp_tx, .. } = error else {
839                unreachable!("should return the same Invalidate message could not be sent");
840            };
841
842            let _ = rsp_tx.send(Err(InvalidateError::SendInvalidateRequestFailed));
843        }
844
845        rsp_rx
846    }
847
848    fn send_reconsider_block(
849        &self,
850        hash: block::Hash,
851    ) -> oneshot::Receiver<Result<Vec<block::Hash>, ReconsiderError>> {
852        let (rsp_tx, rsp_rx) = oneshot::channel();
853
854        let Some(sender) = &self.block_write_sender.non_finalized else {
855            let _ = rsp_tx.send(Err(ReconsiderError::CheckpointCommitInProgress));
856            return rsp_rx;
857        };
858
859        if let Err(tokio::sync::mpsc::error::SendError(error)) =
860            sender.send(NonFinalizedWriteMessage::Reconsider { hash, rsp_tx })
861        {
862            let NonFinalizedWriteMessage::Reconsider { rsp_tx, .. } = error else {
863                unreachable!("should return the same Reconsider message could not be sent");
864            };
865
866            let _ = rsp_tx.send(Err(ReconsiderError::ReconsiderSendFailed));
867        }
868
869        rsp_rx
870    }
871
872    /// Assert some assumptions about the semantically verified `block` before it is queued.
873    fn assert_block_can_be_validated(&self, block: &SemanticallyVerifiedBlock) {
874        // required by `Request::CommitSemanticallyVerifiedBlock` call
875        assert!(
876            block.height > self.network.mandatory_checkpoint_height(),
877            "invalid semantically verified block height: the canopy checkpoint is mandatory, pre-canopy \
878            blocks, and the canopy activation block, must be committed to the state as finalized \
879            blocks"
880        );
881    }
882
883    fn known_sent_hash(&self, hash: &block::Hash) -> Option<KnownBlock> {
884        self.non_finalized_block_write_sent_hashes
885            .contains(hash)
886            .then_some(KnownBlock::WriteChannel)
887    }
888}
889
890impl ReadStateService {
891    /// Creates a new read-only state service, using the provided finalized state and
892    /// block write task handle.
893    ///
894    /// Returns the newly created service,
895    /// and a watch channel for updating the shared recent non-finalized chain.
896    pub(crate) fn new(
897        finalized_state: &FinalizedState,
898        block_write_task: Option<Arc<std::thread::JoinHandle<()>>>,
899        non_finalized_state_receiver: WatchReceiver<NonFinalizedState>,
900    ) -> Self {
901        let read_service = Self {
902            network: finalized_state.network(),
903            db: finalized_state.db.clone(),
904            non_finalized_state_receiver,
905            block_write_task,
906        };
907
908        tracing::debug!("created new read-only state service");
909
910        read_service
911    }
912
913    /// Return the tip of the current best chain.
914    pub fn best_tip(&self) -> Option<(block::Height, block::Hash)> {
915        read::best_tip(&self.latest_non_finalized_state(), &self.db)
916    }
917
918    /// Gets a clone of the latest non-finalized state from the `non_finalized_state_receiver`
919    fn latest_non_finalized_state(&self) -> NonFinalizedState {
920        self.non_finalized_state_receiver.cloned_watch_data()
921    }
922
923    /// Gets a clone of the latest, best non-finalized chain from the `non_finalized_state_receiver`
924    fn latest_best_chain(&self) -> Option<Arc<Chain>> {
925        self.non_finalized_state_receiver
926            .borrow_mapped(|non_finalized_state| non_finalized_state.best_chain().cloned())
927    }
928
929    /// Test-only access to the inner database.
930    /// Can be used to modify the database without doing any consensus checks.
931    #[cfg(any(test, feature = "proptest-impl"))]
932    pub fn db(&self) -> &ZebraDb {
933        &self.db
934    }
935
936    /// Logs rocksdb metrics using the read only state service.
937    pub fn log_db_metrics(&self) {
938        self.db.print_db_metrics();
939    }
940}
941
impl Service<Request> for StateService {
    type Response = Response;
    type Error = BoxError;
    type Future =
        Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>;

    /// Polls the inner [`ReadStateService`] for readiness (which also checks for
    /// panics in the block write task), and periodically prunes outdated pending
    /// UTXO requests.
    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        // Check for panics in the block write task
        let poll = self.read_service.poll_ready(cx);

        // Prune outdated UTXO requests
        let now = Instant::now();

        if self.last_prune + Self::PRUNE_INTERVAL < now {
            let tip = self.best_tip();
            let old_len = self.pending_utxos.len();

            self.pending_utxos.prune();
            self.last_prune = now;

            let new_len = self.pending_utxos.len();
            // `prune()` only removes requests, so the length can never grow here.
            let prune_count = old_len
                .checked_sub(new_len)
                .expect("prune does not add any utxo requests");
            if prune_count > 0 {
                tracing::debug!(
                    ?old_len,
                    ?new_len,
                    ?prune_count,
                    ?tip,
                    "pruned utxo requests"
                );
            } else {
                tracing::debug!(len = ?old_len, ?tip, "no utxo requests needed pruning");
            }
        }

        poll
    }

    /// Dispatches a [`Request`]: write requests are queued for the block write task,
    /// and most read requests are redirected to the concurrent [`ReadStateService`].
    #[instrument(name = "state", skip(self, req))]
    fn call(&mut self, req: Request) -> Self::Future {
        req.count_metric();
        let span = Span::current();

        match req {
            // Uses non_finalized_state_queued_blocks and pending_utxos in the StateService
            // Accesses shared writeable state in the StateService, NonFinalizedState, and ZebraDb.
            //
            // The expected error type for this request is `CommitSemanticallyVerifiedError`.
            Request::CommitSemanticallyVerifiedBlock(semantically_verified) => {
                let timer = CodeTimer::start();
                self.assert_block_can_be_validated(&semantically_verified);

                self.pending_utxos
                    .check_against_ordered(&semantically_verified.new_outputs);

                // # Performance
                //
                // Allow other async tasks to make progress while blocks are being verified
                // and written to disk. But wait for the blocks to finish committing,
                // so that `StateService` multi-block queries always observe a consistent state.
                //
                // Since each block is spawned into its own task,
                // there shouldn't be any other code running in the same task,
                // so we don't need to worry about blocking it:
                // https://docs.rs/tokio/latest/tokio/task/fn.block_in_place.html

                let rsp_rx = tokio::task::block_in_place(move || {
                    span.in_scope(|| {
                        self.queue_and_commit_to_non_finalized_state(semantically_verified)
                    })
                });

                // TODO:
                //   - check for panics in the block write task here,
                //     as well as in poll_ready()

                // The work is all done, the future just waits on a channel for the result
                timer.finish_desc("CommitSemanticallyVerifiedBlock");

                // Await the channel response, flatten the result, map receive errors to
                // `CommitSemanticallyVerifiedError::WriteTaskExited`.
                // Then flatten the nested Result and convert any errors to a BoxError.
                let span = Span::current();
                async move {
                    rsp_rx
                        .await
                        .map_err(|_recv_error| CommitBlockError::WriteTaskExited.into())
                        .and_then(|result| result)
                        .map_err(BoxError::from)
                        .map(Response::Committed)
                }
                .instrument(span)
                .boxed()
            }

            // Uses finalized_state_queued_blocks and pending_utxos in the StateService.
            // Accesses shared writeable state in the StateService.
            //
            // The expected error type for this request is `CommitCheckpointVerifiedError`.
            Request::CommitCheckpointVerifiedBlock(finalized) => {
                let timer = CodeTimer::start();
                // # Consensus
                //
                // A semantic block verification could have called AwaitUtxo
                // before this checkpoint verified block arrived in the state.
                // So we need to check for pending UTXO requests sent by running
                // semantic block verifications.
                //
                // This check is redundant for most checkpoint verified blocks,
                // because semantic verification can only succeed near the final
                // checkpoint, when all the UTXOs are available for the verifying block.
                //
                // (Checkpoint block UTXOs are verified using block hash checkpoints
                // and transaction merkle tree block header commitments.)
                self.pending_utxos
                    .check_against_ordered(&finalized.new_outputs);

                // # Performance
                //
                // This method doesn't block, access the database, or perform CPU-intensive tasks,
                // so we can run it directly in the tokio executor's Future threads.
                let rsp_rx = self.queue_and_commit_to_finalized_state(finalized);

                // TODO:
                //   - check for panics in the block write task here,
                //     as well as in poll_ready()

                // The work is all done, the future just waits on a channel for the result
                timer.finish_desc("CommitCheckpointVerifiedBlock");

                // Await the channel response, flatten the result, map receive errors to
                // `CommitCheckpointVerifiedError::WriteTaskExited`.
                // Then flatten the nested Result and convert any errors to a BoxError.
                async move {
                    rsp_rx
                        .await
                        .map_err(|_recv_error| CommitBlockError::WriteTaskExited.into())
                        .and_then(|result| result)
                        .map_err(BoxError::from)
                        .map(Response::Committed)
                }
                .instrument(span)
                .boxed()
            }

            // Uses pending_utxos and non_finalized_state_queued_blocks in the StateService.
            // If the UTXO isn't in the queued blocks, runs concurrently using the ReadStateService.
            Request::AwaitUtxo(outpoint) => {
                let timer = CodeTimer::start();
                // Prepare the AwaitUtxo future from PendingUtxos.
                let response_fut = self.pending_utxos.queue(outpoint);
                // Only instrument `response_fut`, the ReadStateService already
                // instruments its requests with the same span.

                let response_fut = response_fut.instrument(span).boxed();

                // Check the non-finalized block queue outside the returned future,
                // so we can access mutable state fields.
                if let Some(utxo) = self.non_finalized_state_queued_blocks.utxo(&outpoint) {
                    self.pending_utxos.respond(&outpoint, utxo);

                    // We're finished, the returned future gets the UTXO from the respond() channel.
                    timer.finish_desc("AwaitUtxo/queued-non-finalized");

                    return response_fut;
                }

                // Check the sent non-finalized blocks
                if let Some(utxo) = self.non_finalized_block_write_sent_hashes.utxo(&outpoint) {
                    self.pending_utxos.respond(&outpoint, utxo);

                    // We're finished, the returned future gets the UTXO from the respond() channel.
                    timer.finish_desc("AwaitUtxo/sent-non-finalized");

                    return response_fut;
                }

                // We ignore any UTXOs in FinalizedState.finalized_state_queued_blocks,
                // because it is only used during checkpoint verification.
                //
                // This creates a rare race condition, but it doesn't seem to happen much in practice.
                // See #5126 for details.

                // Manually send a request to the ReadStateService,
                // to get UTXOs from any non-finalized chain or the finalized chain.
                let read_service = self.read_service.clone();

                // Run the request in an async block, so we can await the response.
                async move {
                    let req = ReadRequest::AnyChainUtxo(outpoint);

                    let rsp = read_service.oneshot(req).await?;

                    // Optional TODO:
                    //  - make pending_utxos.respond() async using a channel,
                    //    so we can respond to all waiting requests here
                    //
                    // This change is not required for correctness, because:
                    // - any waiting requests should have returned when the block was sent to the state
                    // - otherwise, the request returns immediately if:
                    //   - the block is in the non-finalized queue, or
                    //   - the block is in any non-finalized chain or the finalized state
                    //
                    // And if the block is in the finalized queue,
                    // that's rare enough that a retry is ok.
                    if let ReadResponse::AnyChainUtxo(Some(utxo)) = rsp {
                        // We got a UTXO, so we return it directly instead of waiting
                        // on the respond() channel.
                        timer.finish_desc("AwaitUtxo/any-chain");

                        return Ok(Response::Utxo(utxo));
                    }

                    // We're finished, but the returned future is waiting on the respond() channel.
                    timer.finish_desc("AwaitUtxo/waiting");

                    response_fut.await
                }
                .boxed()
            }

            // Used by sync, inbound, and block verifier to check if a block is already in the state
            // before downloading or validating it.
            Request::KnownBlock(hash) => {
                let timer = CodeTimer::start();
                let sent_hash_response = self.known_sent_hash(&hash);
                let read_service = self.read_service.clone();

                async move {
                    if sent_hash_response.is_some() {
                        return Ok(Response::KnownBlock(sent_hash_response));
                    };

                    let response = read::non_finalized_state_contains_block_hash(
                        &read_service.latest_non_finalized_state(),
                        hash,
                    )
                    // TODO: Move this to a blocking task, perhaps by moving some of this logic to the ReadStateService.
                    .or_else(|| read::finalized_state_contains_block_hash(&read_service.db, hash));

                    timer.finish_desc("Request::KnownBlock");

                    Ok(Response::KnownBlock(response))
                }
                .boxed()
            }

            // The expected error type for this request is `InvalidateError`
            Request::InvalidateBlock(block_hash) => {
                let rsp_rx = tokio::task::block_in_place(move || {
                    span.in_scope(|| self.send_invalidate_block(block_hash))
                });

                // Await the channel response, flatten the result, map receive errors to
                // `InvalidateError::InvalidateRequestDropped`.
                // Then flatten the nested Result and convert any errors to a BoxError.
                let span = Span::current();
                async move {
                    rsp_rx
                        .await
                        .map_err(|_recv_error| InvalidateError::InvalidateRequestDropped)
                        .and_then(|result| result)
                        .map_err(BoxError::from)
                        .map(Response::Invalidated)
                }
                .instrument(span)
                .boxed()
            }

            // The expected error type for this request is `ReconsiderError`
            Request::ReconsiderBlock(block_hash) => {
                let rsp_rx = tokio::task::block_in_place(move || {
                    span.in_scope(|| self.send_reconsider_block(block_hash))
                });

                // Await the channel response, flatten the result, map receive errors to
                // `ReconsiderError::ReconsiderResponseDropped`.
                // Then flatten the nested Result and convert any errors to a BoxError.
                let span = Span::current();
                async move {
                    rsp_rx
                        .await
                        .map_err(|_recv_error| ReconsiderError::ReconsiderResponseDropped)
                        .and_then(|result| result)
                        .map_err(BoxError::from)
                        .map(Response::Reconsidered)
                }
                .instrument(span)
                .boxed()
            }

            // Runs concurrently using the ReadStateService
            Request::Tip
            | Request::Depth(_)
            | Request::BestChainNextMedianTimePast
            | Request::BestChainBlockHash(_)
            | Request::BlockLocator
            | Request::Transaction(_)
            | Request::AnyChainTransaction(_)
            | Request::UnspentBestChainUtxo(_)
            | Request::Block(_)
            | Request::AnyChainBlock(_)
            | Request::BlockAndSize(_)
            | Request::BlockHeader(_)
            | Request::FindBlockHashes { .. }
            | Request::FindBlockHeaders { .. }
            | Request::CheckBestChainTipNullifiersAndAnchors(_)
            | Request::CheckBlockProposalValidity(_) => {
                // Redirect the request to the concurrent ReadStateService
                let read_service = self.read_service.clone();

                async move {
                    let req = req
                        .try_into()
                        .expect("ReadRequest conversion should not fail");

                    let rsp = read_service.oneshot(req).await?;
                    let rsp = rsp.try_into().expect("Response conversion should not fail");

                    Ok(rsp)
                }
                .boxed()
            }
        }
    }
}
1269
1270impl Service<ReadRequest> for ReadStateService {
1271    type Response = ReadResponse;
1272    type Error = BoxError;
1273    type Future =
1274        Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>;
1275
1276    fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
1277        // Check for panics in the block write task
1278        //
1279        // TODO: move into a check_for_panics() method
1280        let block_write_task = self.block_write_task.take();
1281
1282        if let Some(block_write_task) = block_write_task {
1283            if block_write_task.is_finished() {
1284                if let Some(block_write_task) = Arc::into_inner(block_write_task) {
1285                    // We are the last state with a reference to this task, so we can propagate any panics
1286                    if let Err(thread_panic) = block_write_task.join() {
1287                        std::panic::resume_unwind(thread_panic);
1288                    }
1289                }
1290            } else {
1291                // It hasn't finished, so we need to put it back
1292                self.block_write_task = Some(block_write_task);
1293            }
1294        }
1295
1296        self.db.check_for_panics();
1297
1298        Poll::Ready(Ok(()))
1299    }
1300
1301    #[instrument(name = "read_state", skip(self, req))]
1302    fn call(&mut self, req: ReadRequest) -> Self::Future {
1303        req.count_metric();
1304        let timer = CodeTimer::start_desc(req.variant_name());
1305        let span = Span::current();
1306        let timed_span = TimedSpan::new(timer, span);
1307        let state = self.clone();
1308
1309        if req == ReadRequest::NonFinalizedBlocksListener {
1310            // The non-finalized blocks listener is used to notify the state service
1311            // about new blocks that have been added to the non-finalized state.
1312            let non_finalized_blocks_listener = NonFinalizedBlocksListener::spawn(
1313                self.network.clone(),
1314                self.non_finalized_state_receiver.clone(),
1315            );
1316
1317            return async move {
1318                Ok(ReadResponse::NonFinalizedBlocksListener(
1319                    non_finalized_blocks_listener,
1320                ))
1321            }
1322            .boxed();
1323        };
1324
1325        let request_handler = move || match req {
1326            // Used by the `getblockchaininfo` RPC.
1327            ReadRequest::UsageInfo => Ok(ReadResponse::UsageInfo(state.db.size())),
1328
1329            // Used by the StateService.
1330            ReadRequest::Tip => Ok(ReadResponse::Tip(read::tip(
1331                state.latest_best_chain(),
1332                &state.db,
1333            ))),
1334
1335            // Used by `getblockchaininfo` RPC method.
1336            ReadRequest::TipPoolValues => {
1337                let (tip_height, tip_hash, value_balance) =
1338                    read::tip_with_value_balance(state.latest_best_chain(), &state.db)?
1339                        .ok_or(BoxError::from("no chain tip available yet"))?;
1340
1341                Ok(ReadResponse::TipPoolValues {
1342                    tip_height,
1343                    tip_hash,
1344                    value_balance,
1345                })
1346            }
1347
1348            // Used by getblock
1349            ReadRequest::BlockInfo(hash_or_height) => Ok(ReadResponse::BlockInfo(
1350                read::block_info(state.latest_best_chain(), &state.db, hash_or_height),
1351            )),
1352
1353            // Used by the StateService.
1354            ReadRequest::Depth(hash) => Ok(ReadResponse::Depth(read::depth(
1355                state.latest_best_chain(),
1356                &state.db,
1357                hash,
1358            ))),
1359
1360            // Used by the StateService.
1361            ReadRequest::BestChainNextMedianTimePast => {
1362                Ok(ReadResponse::BestChainNextMedianTimePast(
1363                    read::next_median_time_past(&state.latest_non_finalized_state(), &state.db)?,
1364                ))
1365            }
1366
1367            // Used by the get_block (raw) RPC and the StateService.
1368            ReadRequest::Block(hash_or_height) => Ok(ReadResponse::Block(read::block(
1369                state.latest_best_chain(),
1370                &state.db,
1371                hash_or_height,
1372            ))),
1373
1374            ReadRequest::AnyChainBlock(hash_or_height) => Ok(ReadResponse::Block(read::any_block(
1375                state.latest_non_finalized_state().chain_iter(),
1376                &state.db,
1377                hash_or_height,
1378            ))),
1379
1380            // Used by the get_block (raw) RPC and the StateService.
1381            ReadRequest::BlockAndSize(hash_or_height) => Ok(ReadResponse::BlockAndSize(
1382                read::block_and_size(state.latest_best_chain(), &state.db, hash_or_height),
1383            )),
1384
1385            // Used by the get_block (verbose) RPC and the StateService.
1386            ReadRequest::BlockHeader(hash_or_height) => {
1387                let best_chain = state.latest_best_chain();
1388
1389                let height = hash_or_height
1390                    .height_or_else(|hash| {
1391                        read::find::height_by_hash(best_chain.clone(), &state.db, hash)
1392                    })
1393                    .ok_or_else(|| BoxError::from("block hash or height not found"))?;
1394
1395                let hash = hash_or_height
1396                    .hash_or_else(|height| {
1397                        read::find::hash_by_height(best_chain.clone(), &state.db, height)
1398                    })
1399                    .ok_or_else(|| BoxError::from("block hash or height not found"))?;
1400
1401                let next_height = height.next()?;
1402                let next_block_hash =
1403                    read::find::hash_by_height(best_chain.clone(), &state.db, next_height);
1404
1405                let header = read::block_header(best_chain, &state.db, height.into())
1406                    .ok_or_else(|| BoxError::from("block hash or height not found"))?;
1407
1408                Ok(ReadResponse::BlockHeader {
1409                    header,
1410                    hash,
1411                    height,
1412                    next_block_hash,
1413                })
1414            }
1415
1416            // For the get_raw_transaction RPC and the StateService.
1417            ReadRequest::Transaction(hash) => Ok(ReadResponse::Transaction(
1418                read::mined_transaction(state.latest_best_chain(), &state.db, hash),
1419            )),
1420
1421            ReadRequest::AnyChainTransaction(hash) => {
1422                Ok(ReadResponse::AnyChainTransaction(read::any_transaction(
1423                    state.latest_non_finalized_state().chain_iter(),
1424                    &state.db,
1425                    hash,
1426                )))
1427            }
1428
1429            // Used by the getblock (verbose) RPC.
1430            ReadRequest::TransactionIdsForBlock(hash_or_height) => Ok(
1431                ReadResponse::TransactionIdsForBlock(read::transaction_hashes_for_block(
1432                    state.latest_best_chain(),
1433                    &state.db,
1434                    hash_or_height,
1435                )),
1436            ),
1437
1438            ReadRequest::AnyChainTransactionIdsForBlock(hash_or_height) => {
1439                Ok(ReadResponse::AnyChainTransactionIdsForBlock(
1440                    read::transaction_hashes_for_any_block(
1441                        state.latest_non_finalized_state().chain_iter(),
1442                        &state.db,
1443                        hash_or_height,
1444                    ),
1445                ))
1446            }
1447
1448            #[cfg(feature = "indexer")]
1449            ReadRequest::SpendingTransactionId(spend) => Ok(ReadResponse::TransactionId(
1450                read::spending_transaction_hash(state.latest_best_chain(), &state.db, spend),
1451            )),
1452
1453            ReadRequest::UnspentBestChainUtxo(outpoint) => Ok(ReadResponse::UnspentBestChainUtxo(
1454                read::unspent_utxo(state.latest_best_chain(), &state.db, outpoint),
1455            )),
1456
1457            // Manually used by the StateService to implement part of AwaitUtxo.
1458            ReadRequest::AnyChainUtxo(outpoint) => Ok(ReadResponse::AnyChainUtxo(read::any_utxo(
1459                state.latest_non_finalized_state(),
1460                &state.db,
1461                outpoint,
1462            ))),
1463
1464            // Used by the StateService.
1465            ReadRequest::BlockLocator => Ok(ReadResponse::BlockLocator(
1466                read::block_locator(state.latest_best_chain(), &state.db).unwrap_or_default(),
1467            )),
1468
1469            // Used by the StateService.
1470            ReadRequest::FindBlockHashes { known_blocks, stop } => {
1471                Ok(ReadResponse::BlockHashes(read::find_chain_hashes(
1472                    state.latest_best_chain(),
1473                    &state.db,
1474                    known_blocks,
1475                    stop,
1476                    MAX_FIND_BLOCK_HASHES_RESULTS,
1477                )))
1478            }
1479
1480            // Used by the StateService.
1481            ReadRequest::FindBlockHeaders { known_blocks, stop } => Ok(ReadResponse::BlockHeaders(
1482                read::find_chain_headers(
1483                    state.latest_best_chain(),
1484                    &state.db,
1485                    known_blocks,
1486                    stop,
1487                    MAX_FIND_BLOCK_HEADERS_RESULTS,
1488                )
1489                .into_iter()
1490                .map(|header| CountedHeader { header })
1491                .collect(),
1492            )),
1493
1494            ReadRequest::SaplingTree(hash_or_height) => Ok(ReadResponse::SaplingTree(
1495                read::sapling_tree(state.latest_best_chain(), &state.db, hash_or_height),
1496            )),
1497
1498            ReadRequest::OrchardTree(hash_or_height) => Ok(ReadResponse::OrchardTree(
1499                read::orchard_tree(state.latest_best_chain(), &state.db, hash_or_height),
1500            )),
1501
1502            ReadRequest::SaplingSubtrees { start_index, limit } => {
1503                let end_index = limit
1504                    .and_then(|limit| start_index.0.checked_add(limit.0))
1505                    .map(NoteCommitmentSubtreeIndex);
1506
1507                let best_chain = state.latest_best_chain();
1508                let sapling_subtrees = if let Some(end_index) = end_index {
1509                    read::sapling_subtrees(best_chain, &state.db, start_index..end_index)
1510                } else {
1511                    // If there is no end bound, just return all the trees.
1512                    // If the end bound would overflow, just returns all the trees, because that's what
1513                    // `zcashd` does. (It never calculates an end bound, so it just keeps iterating until
1514                    // the trees run out.)
1515                    read::sapling_subtrees(best_chain, &state.db, start_index..)
1516                };
1517
1518                Ok(ReadResponse::SaplingSubtrees(sapling_subtrees))
1519            }
1520
1521            ReadRequest::OrchardSubtrees { start_index, limit } => {
1522                let end_index = limit
1523                    .and_then(|limit| start_index.0.checked_add(limit.0))
1524                    .map(NoteCommitmentSubtreeIndex);
1525
1526                let best_chain = state.latest_best_chain();
1527                let orchard_subtrees = if let Some(end_index) = end_index {
1528                    read::orchard_subtrees(best_chain, &state.db, start_index..end_index)
1529                } else {
1530                    // If there is no end bound, just return all the trees.
1531                    // If the end bound would overflow, just returns all the trees, because that's what
1532                    // `zcashd` does. (It never calculates an end bound, so it just keeps iterating until
1533                    // the trees run out.)
1534                    read::orchard_subtrees(best_chain, &state.db, start_index..)
1535                };
1536
1537                Ok(ReadResponse::OrchardSubtrees(orchard_subtrees))
1538            }
1539
1540            // For the get_address_balance RPC.
1541            ReadRequest::AddressBalance(addresses) => {
1542                let (balance, received) =
1543                    read::transparent_balance(state.latest_best_chain(), &state.db, addresses)?;
1544                Ok(ReadResponse::AddressBalance { balance, received })
1545            }
1546
1547            // For the get_address_tx_ids RPC.
1548            ReadRequest::TransactionIdsByAddresses {
1549                addresses,
1550                height_range,
1551            } => read::transparent_tx_ids(
1552                state.latest_best_chain(),
1553                &state.db,
1554                addresses,
1555                height_range,
1556            )
1557            .map(ReadResponse::AddressesTransactionIds),
1558
1559            // For the get_address_utxos RPC.
1560            ReadRequest::UtxosByAddresses(addresses) => read::address_utxos(
1561                &state.network,
1562                state.latest_best_chain(),
1563                &state.db,
1564                addresses,
1565            )
1566            .map(ReadResponse::AddressUtxos),
1567
1568            ReadRequest::CheckBestChainTipNullifiersAndAnchors(unmined_tx) => {
1569                let latest_non_finalized_best_chain = state.latest_best_chain();
1570
1571                check::nullifier::tx_no_duplicates_in_chain(
1572                    &state.db,
1573                    latest_non_finalized_best_chain.as_ref(),
1574                    &unmined_tx.transaction,
1575                )?;
1576
1577                check::anchors::tx_anchors_refer_to_final_treestates(
1578                    &state.db,
1579                    latest_non_finalized_best_chain.as_ref(),
1580                    &unmined_tx,
1581                )?;
1582
1583                Ok(ReadResponse::ValidBestChainTipNullifiersAndAnchors)
1584            }
1585
1586            // Used by the get_block and get_block_hash RPCs.
1587            ReadRequest::BestChainBlockHash(height) => Ok(ReadResponse::BlockHash(
1588                read::hash_by_height(state.latest_best_chain(), &state.db, height),
1589            )),
1590
1591            // Used by get_block_template and getblockchaininfo RPCs.
1592            ReadRequest::ChainInfo => {
1593                // # Correctness
1594                //
1595                // It is ok to do these lookups using multiple database calls. Finalized state updates
1596                // can only add overlapping blocks, and block hashes are unique across all chain forks.
1597                //
1598                // If there is a large overlap between the non-finalized and finalized states,
1599                // where the finalized tip is above the non-finalized tip,
1600                // Zebra is receiving a lot of blocks, or this request has been delayed for a long time.
1601                //
1602                // In that case, the `getblocktemplate` RPC will return an error because Zebra
1603                // is not synced to the tip. That check happens before the RPC makes this request.
1604                read::difficulty::get_block_template_chain_info(
1605                    &state.latest_non_finalized_state(),
1606                    &state.db,
1607                    &state.network,
1608                )
1609                .map(ReadResponse::ChainInfo)
1610            }
1611
1612            // Used by getmininginfo, getnetworksolps, and getnetworkhashps RPCs.
1613            ReadRequest::SolutionRate { num_blocks, height } => {
1614                let latest_non_finalized_state = state.latest_non_finalized_state();
1615                // # Correctness
1616                //
1617                // It is ok to do these lookups using multiple database calls. Finalized state updates
1618                // can only add overlapping blocks, and block hashes are unique across all chain forks.
1619                //
1620                // The worst that can happen here is that the default `start_hash` will be below
1621                // the chain tip.
1622                let (tip_height, tip_hash) =
1623                    match read::tip(latest_non_finalized_state.best_chain(), &state.db) {
1624                        Some(tip_hash) => tip_hash,
1625                        None => return Ok(ReadResponse::SolutionRate(None)),
1626                    };
1627
1628                let start_hash = match height {
1629                    Some(height) if height < tip_height => read::hash_by_height(
1630                        latest_non_finalized_state.best_chain(),
1631                        &state.db,
1632                        height,
1633                    ),
1634                    // use the chain tip hash if height is above it or not provided.
1635                    _ => Some(tip_hash),
1636                };
1637
1638                let solution_rate = start_hash.and_then(|start_hash| {
1639                    read::difficulty::solution_rate(
1640                        &latest_non_finalized_state,
1641                        &state.db,
1642                        num_blocks,
1643                        start_hash,
1644                    )
1645                });
1646
1647                Ok(ReadResponse::SolutionRate(solution_rate))
1648            }
1649
1650            ReadRequest::CheckBlockProposalValidity(semantically_verified) => {
1651                tracing::debug!(
1652                    "attempting to validate and commit block proposal \
1653                         onto a cloned non-finalized state"
1654                );
1655                let mut latest_non_finalized_state = state.latest_non_finalized_state();
1656
1657                // The previous block of a valid proposal must be on the best chain tip.
1658                let Some((_best_tip_height, best_tip_hash)) =
1659                    read::best_tip(&latest_non_finalized_state, &state.db)
1660                else {
1661                    return Err(
1662                        "state is empty: wait for Zebra to sync before submitting a proposal"
1663                            .into(),
1664                    );
1665                };
1666
1667                if semantically_verified.block.header.previous_block_hash != best_tip_hash {
1668                    return Err("proposal is not based on the current best chain tip: \
1669                                    previous block hash must be the best chain tip"
1670                        .into());
1671                }
1672
1673                // This clone of the non-finalized state is dropped when this closure returns.
1674                // The non-finalized state that's used in the rest of the state (including finalizing
1675                // blocks into the db) is not mutated here.
1676                //
1677                // TODO: Convert `CommitSemanticallyVerifiedError` to a new `ValidateProposalError`?
1678                latest_non_finalized_state.disable_metrics();
1679
1680                write::validate_and_commit_non_finalized(
1681                    &state.db,
1682                    &mut latest_non_finalized_state,
1683                    semantically_verified,
1684                )?;
1685
1686                Ok(ReadResponse::ValidBlockProposal)
1687            }
1688
1689            ReadRequest::TipBlockSize => {
1690                // Respond with the length of the obtained block if any.
1691                Ok(ReadResponse::TipBlockSize(
1692                    state
1693                        .best_tip()
1694                        .and_then(|(tip_height, _)| {
1695                            read::block_info(
1696                                state.latest_best_chain(),
1697                                &state.db,
1698                                tip_height.into(),
1699                            )
1700                        })
1701                        .map(|info| info.size().try_into().expect("u32 should fit in usize"))
1702                        .or_else(|| {
1703                            find::tip_block(state.latest_best_chain(), &state.db)
1704                                .map(|b| b.zcash_serialized_size())
1705                        }),
1706                ))
1707            }
1708
1709            ReadRequest::NonFinalizedBlocksListener => {
1710                unreachable!("should return early");
1711            }
1712
1713            // Used by `gettxout` RPC method.
1714            ReadRequest::IsTransparentOutputSpent(outpoint) => {
1715                let is_spent = read::unspent_utxo(state.latest_best_chain(), &state.db, outpoint);
1716                Ok(ReadResponse::IsTransparentOutputSpent(is_spent.is_none()))
1717            }
1718        };
1719
1720        timed_span.spawn_blocking(request_handler)
1721    }
1722}
1723
1724/// Initialize a state service from the provided [`Config`].
1725/// Returns a boxed state service, a read-only state service,
1726/// and receivers for state chain tip updates.
1727///
1728/// Each `network` has its own separate on-disk database.
1729///
1730/// The state uses the `max_checkpoint_height` and `checkpoint_verify_concurrency_limit`
1731/// to work out when it is near the final checkpoint.
1732///
1733/// To share access to the state, wrap the returned service in a `Buffer`,
1734/// or clone the returned [`ReadStateService`].
1735///
1736/// It's possible to construct multiple state services in the same application (as
1737/// long as they, e.g., use different storage locations), but doing so is
1738/// probably not what you want.
1739pub async fn init(
1740    config: Config,
1741    network: &Network,
1742    max_checkpoint_height: block::Height,
1743    checkpoint_verify_concurrency_limit: usize,
1744) -> (
1745    BoxService<Request, Response, BoxError>,
1746    ReadStateService,
1747    LatestChainTip,
1748    ChainTipChange,
1749) {
1750    let (state_service, read_only_state_service, latest_chain_tip, chain_tip_change) =
1751        StateService::new(
1752            config,
1753            network,
1754            max_checkpoint_height,
1755            checkpoint_verify_concurrency_limit,
1756        )
1757        .await;
1758
1759    (
1760        BoxService::new(state_service),
1761        read_only_state_service,
1762        latest_chain_tip,
1763        chain_tip_change,
1764    )
1765}
1766
1767/// Initialize a read state service from the provided [`Config`].
1768/// Returns a read-only state service,
1769///
1770/// Each `network` has its own separate on-disk database.
1771///
1772/// To share access to the state, clone the returned [`ReadStateService`].
1773pub fn init_read_only(
1774    config: Config,
1775    network: &Network,
1776) -> (
1777    ReadStateService,
1778    ZebraDb,
1779    tokio::sync::watch::Sender<NonFinalizedState>,
1780) {
1781    let finalized_state = FinalizedState::new_with_debug(
1782        &config,
1783        network,
1784        true,
1785        #[cfg(feature = "elasticsearch")]
1786        false,
1787        true,
1788    );
1789    let (non_finalized_state_sender, non_finalized_state_receiver) =
1790        tokio::sync::watch::channel(NonFinalizedState::new(network));
1791
1792    (
1793        ReadStateService::new(
1794            &finalized_state,
1795            None,
1796            WatchReceiver::new(non_finalized_state_receiver),
1797        ),
1798        finalized_state.db.clone(),
1799        non_finalized_state_sender,
1800    )
1801}
1802
1803/// Calls [`init_read_only`] with the provided [`Config`] and [`Network`] from a blocking task.
1804/// Returns a [`tokio::task::JoinHandle`] with a read state service and chain tip sender.
1805pub fn spawn_init_read_only(
1806    config: Config,
1807    network: &Network,
1808) -> tokio::task::JoinHandle<(
1809    ReadStateService,
1810    ZebraDb,
1811    tokio::sync::watch::Sender<NonFinalizedState>,
1812)> {
1813    let network = network.clone();
1814    tokio::task::spawn_blocking(move || init_read_only(config, &network))
1815}
1816
/// Returns a [`StateService`] with an ephemeral [`Config`] and a buffer with a single slot.
///
/// This can be used to create a state service for testing. See also [`init`].
#[cfg(any(test, feature = "proptest-impl"))]
pub async fn init_test(
    network: &Network,
) -> Buffer<BoxService<Request, Response, BoxError>, Request> {
    // TODO: pass max_checkpoint_height and checkpoint_verify_concurrency limit
    //       if we ever need to test final checkpoint sent UTXO queries
    let (state_service, _read_service, _latest_tip, _tip_change) =
        StateService::new(Config::ephemeral(), network, block::Height::MAX, 0).await;

    // Tests only need one in-flight request at a time.
    Buffer::new(BoxService::new(state_service), 1)
}
1831
/// Initializes a state service with an ephemeral [`Config`] and a buffer with a single slot,
/// then returns the read-write service, read-only service, and tip watch channels.
///
/// This can be used to create a state service for testing. See also [`init`].
#[cfg(any(test, feature = "proptest-impl"))]
pub async fn init_test_services(
    network: &Network,
) -> (
    Buffer<BoxService<Request, Response, BoxError>, Request>,
    ReadStateService,
    LatestChainTip,
    ChainTipChange,
) {
    // TODO: pass max_checkpoint_height and checkpoint_verify_concurrency limit
    //       if we ever need to test final checkpoint sent UTXO queries
    let (read_write_service, read_service, latest_tip, tip_change) =
        StateService::new(Config::ephemeral(), network, block::Height::MAX, 0).await;

    // Box and buffer the read-write half so tests can clone the handle;
    // a single slot is enough for sequential test requests.
    let buffered_service = Buffer::new(BoxService::new(read_write_service), 1);

    (buffered_service, read_service, latest_tip, tip_change)
}