Skip to main content

zebra_state/service/
non_finalized_state.rs

1//! Non-finalized chain state management as defined by [RFC0005]
2//!
3//! [RFC0005]: https://zebra.zfnd.org/dev/rfcs/0005-state-updates.html
4
5use std::{
6    collections::{BTreeSet, HashMap},
7    mem,
8    path::{Path, PathBuf},
9    sync::Arc,
10};
11
12use indexmap::IndexMap;
13use tokio::sync::watch;
14use zebra_chain::{
15    block::{self, Block, Hash, Height},
16    parameters::Network,
17    sprout::{self},
18    transparent,
19};
20
21use crate::{
22    constants::{MAX_INVALIDATED_BLOCKS, MAX_NON_FINALIZED_CHAIN_FORKS},
23    error::ReconsiderError,
24    request::{ContextuallyVerifiedBlock, FinalizableBlock},
25    service::{check, finalized_state::ZebraDb, InvalidateError},
26    SemanticallyVerifiedBlock, ValidateContextError, WatchReceiver,
27};
28
29mod backup;
30mod chain;
31
32#[cfg(test)]
33pub(crate) use backup::MIN_DURATION_BETWEEN_BACKUP_UPDATES;
34
35#[cfg(test)]
36mod tests;
37
38pub(crate) use chain::{Chain, SpendingTransactionId};
39
40/// The state of the chains in memory, including queued blocks.
41///
42/// Clones of the non-finalized state contain independent copies of the chains.
43/// This is different from `FinalizedState::clone()`,
44/// which returns a shared reference to the database.
45///
46/// Most chain data is clone-on-write using [`Arc`].
47pub struct NonFinalizedState {
48    // Chain Data
49    //
50    /// Verified, non-finalized chains, in ascending work order.
51    ///
52    /// The best chain is [`NonFinalizedState::best_chain()`], or `chain_iter().next()`.
53    /// Using `chain_set.last()` or `chain_set.iter().next_back()` is deprecated,
54    /// callers should migrate to `chain_iter().next()`.
55    chain_set: BTreeSet<Arc<Chain>>,
56
57    /// Blocks that have been invalidated in, and removed from, the non finalized
58    /// state.
59    invalidated_blocks: IndexMap<Height, Arc<Vec<ContextuallyVerifiedBlock>>>,
60
61    // Configuration
62    //
63    /// The configured Zcash network.
64    pub network: Network,
65
66    // Diagnostics
67    //
68    /// Configures the non-finalized state to count metrics.
69    ///
70    /// Used for skipping metrics and progress bars when testing block proposals
71    /// with a commit to a cloned non-finalized state.
72    //
73    // TODO: make this field private and set it via an argument to NonFinalizedState::new()
74    should_count_metrics: bool,
75
76    /// Number of chain forks transmitter.
77    #[cfg(feature = "progress-bar")]
78    chain_count_bar: Option<howudoin::Tx>,
79
80    /// A chain fork length transmitter for each [`Chain`] in [`chain_set`](Self.chain_set).
81    ///
82    /// Because `chain_set` contains `Arc<Chain>`s, it is difficult to update the metrics state
83    /// on each chain. ([`Arc`]s are read-only, and we don't want to clone them just for metrics.)
84    #[cfg(feature = "progress-bar")]
85    chain_fork_length_bars: Vec<howudoin::Tx>,
86}
87
88impl std::fmt::Debug for NonFinalizedState {
89    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
90        let mut f = f.debug_struct("NonFinalizedState");
91
92        f.field("chain_set", &self.chain_set)
93            .field("network", &self.network);
94
95        f.field("should_count_metrics", &self.should_count_metrics);
96
97        f.finish()
98    }
99}
100
101impl Clone for NonFinalizedState {
102    fn clone(&self) -> Self {
103        Self {
104            chain_set: self.chain_set.clone(),
105            network: self.network.clone(),
106            invalidated_blocks: self.invalidated_blocks.clone(),
107            should_count_metrics: self.should_count_metrics,
108            // Don't track progress in clones.
109            #[cfg(feature = "progress-bar")]
110            chain_count_bar: None,
111            #[cfg(feature = "progress-bar")]
112            chain_fork_length_bars: Vec::new(),
113        }
114    }
115}
116
117impl NonFinalizedState {
118    /// Returns a new non-finalized state for `network`.
119    pub fn new(network: &Network) -> NonFinalizedState {
120        NonFinalizedState {
121            chain_set: Default::default(),
122            network: network.clone(),
123            invalidated_blocks: Default::default(),
124            should_count_metrics: true,
125            #[cfg(feature = "progress-bar")]
126            chain_count_bar: None,
127            #[cfg(feature = "progress-bar")]
128            chain_fork_length_bars: Vec::new(),
129        }
130    }
131
132    /// Writes the current non-finalized state to the backup directory at `backup_dir_path`.
133    ///
134    /// Reads the existing backup directory contents, writes any blocks that are in the
135    /// non-finalized state but missing from the backup, and deletes any backup files that
136    /// are no longer present in the non-finalized state.
137    ///
138    /// This method performs blocking I/O and should only be called from a blocking context.
139    pub(crate) fn write_to_backup(&self, backup_dir_path: &Path) {
140        let backup_blocks: HashMap<block::Hash, PathBuf> =
141            backup::list_backup_dir_entries(backup_dir_path).collect();
142        backup::update_non_finalized_state_backup(backup_dir_path, self, backup_blocks);
143    }
144
145    /// Accepts an optional path to the non-finalized state backup directory and a handle to the database.
146    ///
147    /// If a backup directory path is provided:
148    /// - Creates a new backup directory at the provided path if none exists,
149    /// - Restores non-finalized blocks from the backup directory, if any, and
150    /// - Unless `skip_backup_task` is true, spawns a task that updates the non-finalized
151    ///   backup cache with the latest non-finalized state sent to the returned watch channel.
152    ///
153    /// Returns the non-finalized state with a watch channel sender and receiver.
154    pub async fn with_backup(
155        self,
156        backup_dir_path: Option<PathBuf>,
157        finalized_state: &ZebraDb,
158        should_restore_backup: bool,
159        skip_backup_task: bool,
160    ) -> (
161        Self,
162        watch::Sender<NonFinalizedState>,
163        WatchReceiver<NonFinalizedState>,
164    ) {
165        let with_watch_channel = |non_finalized_state: NonFinalizedState| {
166            let (sender, receiver) = watch::channel(non_finalized_state.clone());
167            (non_finalized_state, sender, WatchReceiver::new(receiver))
168        };
169
170        let Some(backup_dir_path) = backup_dir_path else {
171            return with_watch_channel(self);
172        };
173
174        if skip_backup_task {
175            tracing::info!(
176                ?backup_dir_path,
177                "restoring non-finalized blocks from backup (sync write mode, backup task skipped)"
178            );
179        } else {
180            tracing::info!(
181                ?backup_dir_path,
182                "restoring non-finalized blocks from backup and spawning backup task"
183            );
184        }
185
186        let non_finalized_state = {
187            let backup_dir_path = backup_dir_path.clone();
188            let finalized_state = finalized_state.clone();
189            tokio::task::spawn_blocking(move || {
190                // Create a new backup directory if none exists
191                std::fs::create_dir_all(&backup_dir_path)
192                    .expect("failed to create non-finalized state backup directory");
193
194                if should_restore_backup {
195                    backup::restore_backup(self, &backup_dir_path, &finalized_state)
196                } else {
197                    self
198                }
199            })
200            .await
201            .expect("failed to join blocking task")
202        };
203
204        let (non_finalized_state, sender, receiver) = with_watch_channel(non_finalized_state);
205
206        if !skip_backup_task {
207            tokio::spawn(backup::run_backup_task(receiver.clone(), backup_dir_path));
208        }
209
210        if !non_finalized_state.is_chain_set_empty() {
211            let num_blocks_restored = non_finalized_state
212                .best_chain()
213                .expect("must have best chain if chain set is not empty")
214                .len();
215
216            tracing::info!(
217                ?num_blocks_restored,
218                "restored blocks from non-finalized backup cache"
219            );
220        }
221
222        (non_finalized_state, sender, receiver)
223    }
224
225    /// Is the internal state of `self` the same as `other`?
226    ///
227    /// [`Chain`] has a custom [`Eq`] implementation based on proof of work,
228    /// which is used to select the best chain. So we can't derive [`Eq`] for [`NonFinalizedState`].
229    ///
230    /// Unlike the custom trait impl, this method returns `true` if the entire internal state
231    /// of two non-finalized states is equal.
232    ///
233    /// If the internal states are different, it returns `false`,
234    /// even if the chains and blocks are equal.
235    #[cfg(any(test, feature = "proptest-impl"))]
236    #[allow(dead_code)]
237    pub fn eq_internal_state(&self, other: &NonFinalizedState) -> bool {
238        // this method must be updated every time a consensus-critical field is added to NonFinalizedState
239        // (diagnostic fields can be ignored)
240
241        self.chain_set.len() == other.chain_set.len()
242            && self
243                .chain_set
244                .iter()
245                .zip(other.chain_set.iter())
246                .all(|(self_chain, other_chain)| self_chain.eq_internal_state(other_chain))
247            && self.network == other.network
248    }
249
250    /// Returns an iterator over the non-finalized chains, with the best chain first.
251    //
252    // TODO: replace chain_set.iter().rev() with this method
253    pub fn chain_iter(&self) -> impl Iterator<Item = &Arc<Chain>> {
254        self.chain_set.iter().rev()
255    }
256
257    /// Insert `chain` into `self.chain_set`, apply `chain_filter` to the chains,
258    /// then limit the number of tracked chains.
259    fn insert_with<F>(&mut self, chain: Arc<Chain>, chain_filter: F)
260    where
261        F: FnOnce(&mut BTreeSet<Arc<Chain>>),
262    {
263        self.chain_set.insert(chain);
264
265        chain_filter(&mut self.chain_set);
266
267        while self.chain_set.len() > MAX_NON_FINALIZED_CHAIN_FORKS {
268            // The first chain is the chain with the lowest work.
269            self.chain_set.pop_first();
270        }
271
272        self.update_metrics_bars();
273    }
274
275    /// Insert `chain` into `self.chain_set`, then limit the number of tracked chains.
276    fn insert(&mut self, chain: Arc<Chain>) {
277        self.insert_with(chain, |_ignored_chain| { /* no filter */ })
278    }
279
280    /// Finalize the lowest height block in the non-finalized portion of the best
281    /// chain and update all side-chains to match.
282    pub fn finalize(&mut self) -> FinalizableBlock {
283        // Chain::cmp uses the partial cumulative work, and the hash of the tip block.
284        // Neither of these fields has interior mutability.
285        // (And when the tip block is dropped for a chain, the chain is also dropped.)
286        #[allow(clippy::mutable_key_type)]
287        let chains = mem::take(&mut self.chain_set);
288        let mut chains = chains.into_iter();
289
290        // extract best chain
291        let mut best_chain = chains.next_back().expect("there's at least one chain");
292
293        // clone if required
294        let mut_best_chain = Arc::make_mut(&mut best_chain);
295
296        // extract the rest into side_chains so they can be mutated
297        let side_chains = chains;
298
299        // Pop the lowest height block from the best chain to be finalized, and
300        // also obtain its associated treestate.
301        let (best_chain_root, root_treestate) = mut_best_chain.pop_root();
302
303        // add best_chain back to `self.chain_set`
304        if !best_chain.is_empty() {
305            self.insert(best_chain);
306        }
307
308        // for each remaining chain in side_chains
309        for mut side_chain in side_chains.rev() {
310            if side_chain.non_finalized_root_hash() != best_chain_root.hash {
311                // If we popped the root, the chain would be empty or orphaned,
312                // so just drop it now.
313                drop(side_chain);
314
315                continue;
316            }
317
318            // otherwise, the popped root block is the same as the finalizing block
319
320            // clone if required
321            let mut_side_chain = Arc::make_mut(&mut side_chain);
322
323            // remove the first block from `chain`
324            let (side_chain_root, _treestate) = mut_side_chain.pop_root();
325            assert_eq!(side_chain_root.hash, best_chain_root.hash);
326
327            // add the chain back to `self.chain_set`
328            self.insert(side_chain);
329        }
330
331        // Remove all invalidated_blocks at or below the finalized height
332        self.invalidated_blocks
333            .retain(|height, _blocks| *height >= best_chain_root.height);
334
335        self.update_metrics_for_chains();
336
337        // Add the treestate to the finalized block.
338        FinalizableBlock::new(best_chain_root, root_treestate)
339    }
340
341    /// Commit block to the non-finalized state, on top of:
342    /// - an existing chain's tip, or
343    /// - a newly forked chain.
344    #[tracing::instrument(level = "debug", skip(self, finalized_state, prepared))]
345    pub fn commit_block(
346        &mut self,
347        prepared: SemanticallyVerifiedBlock,
348        finalized_state: &ZebraDb,
349    ) -> Result<(), ValidateContextError> {
350        let parent_hash = prepared.block.header.previous_block_hash;
351        let (height, hash) = (prepared.height, prepared.hash);
352
353        let parent_chain = self.parent_chain(parent_hash)?;
354
355        // If the block is invalid, return the error,
356        // and drop the cloned parent Arc, or newly created chain fork.
357        let modified_chain = self.validate_and_commit(parent_chain, prepared, finalized_state)?;
358
359        // If the block is valid:
360        // - add the new chain fork or updated chain to the set of recent chains
361        // - remove the parent chain, if it was in the chain set
362        //   (if it was a newly created fork, it won't be in the chain set)
363        self.insert_with(modified_chain, |chain_set| {
364            chain_set.retain(|chain| chain.non_finalized_tip_hash() != parent_hash)
365        });
366
367        self.update_metrics_for_committed_block(height, hash);
368
369        Ok(())
370    }
371
372    /// Invalidate block with hash `block_hash` and all descendants from the non-finalized state. Insert
373    /// the new chain into the chain_set and discard the previous.
374    #[allow(clippy::unwrap_in_result)]
375    pub fn invalidate_block(&mut self, block_hash: Hash) -> Result<block::Hash, InvalidateError> {
376        let Some(chain) = self.find_chain(|chain| chain.contains_block_hash(block_hash)) else {
377            return Err(InvalidateError::BlockNotFound(block_hash));
378        };
379
380        let invalidated_blocks = if chain.non_finalized_root_hash() == block_hash {
381            self.chain_set.remove(&chain);
382            chain.blocks.values().cloned().collect()
383        } else {
384            let (new_chain, invalidated_blocks) = chain
385                .invalidate_block(block_hash)
386                .expect("already checked that chain contains hash");
387
388            // Add the new chain fork or updated chain to the set of recent chains, and
389            // remove the chain containing the hash of the block from chain set
390            self.insert_with(Arc::new(new_chain.clone()), |chain_set| {
391                chain_set.retain(|c| !c.contains_block_hash(block_hash))
392            });
393
394            invalidated_blocks
395        };
396
397        // TODO: Allow for invalidating multiple block hashes at a given height (#9552).
398        self.invalidated_blocks.insert(
399            invalidated_blocks
400                .first()
401                .expect("should not be empty")
402                .clone()
403                .height,
404            Arc::new(invalidated_blocks),
405        );
406
407        while self.invalidated_blocks.len() > MAX_INVALIDATED_BLOCKS {
408            self.invalidated_blocks.shift_remove_index(0);
409        }
410
411        self.update_metrics_for_chains();
412        self.update_metrics_bars();
413
414        Ok(block_hash)
415    }
416
417    /// Reconsiders a previously invalidated block and its descendants into the non-finalized state
418    /// based on a block_hash. Reconsidered blocks are inserted into the previous chain and re-inserted
419    /// into the chain_set.
420    #[allow(clippy::unwrap_in_result)]
421    pub fn reconsider_block(
422        &mut self,
423        block_hash: block::Hash,
424        finalized_state: &ZebraDb,
425    ) -> Result<Vec<block::Hash>, ReconsiderError> {
426        // Get the invalidated blocks that were invalidated by the given block_hash
427        let height = self
428            .invalidated_blocks
429            .iter()
430            .find_map(|(height, blocks)| {
431                if blocks.first()?.hash == block_hash {
432                    Some(height)
433                } else {
434                    None
435                }
436            })
437            .ok_or(ReconsiderError::MissingInvalidatedBlock(block_hash))?;
438
439        let invalidated_blocks = Arc::unwrap_or_clone(
440            self.invalidated_blocks
441                .clone()
442                .shift_remove(height)
443                .ok_or(ReconsiderError::MissingInvalidatedBlock(block_hash))?,
444        );
445
446        let invalidated_block_hashes = invalidated_blocks
447            .iter()
448            .map(|block| block.hash)
449            .collect::<Vec<_>>();
450
451        // Find and fork the parent chain of the invalidated_root. Update the parent chain
452        // with the invalidated_descendants
453        let invalidated_root = invalidated_blocks
454            .first()
455            .ok_or(ReconsiderError::InvalidatedBlocksEmpty)?;
456
457        let root_parent_hash = invalidated_root.block.header.previous_block_hash;
458
459        // If the parent is the tip of the finalized_state we create a new chain and insert it
460        // into the non finalized state
461        let chain_result = if root_parent_hash == finalized_state.finalized_tip_hash() {
462            let chain = Chain::new(
463                &self.network,
464                finalized_state
465                    .finalized_tip_height()
466                    .ok_or(ReconsiderError::ParentChainNotFound(block_hash))?,
467                finalized_state.sprout_tree_for_tip(),
468                finalized_state.sapling_tree_for_tip(),
469                finalized_state.orchard_tree_for_tip(),
470                finalized_state.history_tree(),
471                finalized_state.finalized_value_pool(),
472            );
473            Arc::new(chain)
474        } else {
475            // The parent is not the finalized_tip and still exist in the NonFinalizedState
476            // or else we return an error due to the parent not existing in the NonFinalizedState
477            self.parent_chain(root_parent_hash)
478                .map_err(|_| ReconsiderError::ParentChainNotFound(block_hash))?
479        };
480
481        let mut modified_chain = Arc::unwrap_or_clone(chain_result);
482        for block in invalidated_blocks {
483            modified_chain = modified_chain
484                .push(block)
485                .expect("previously invalidated block should be valid for chain");
486        }
487
488        let (height, hash) = modified_chain.non_finalized_tip();
489
490        // Only track invalidated_blocks that are not yet finalized. Once blocks are finalized (below the best_chain_root_height)
491        // we can discard the block.
492        if let Some(best_chain_root_height) = finalized_state.finalized_tip_height() {
493            self.invalidated_blocks
494                .retain(|height, _blocks| *height >= best_chain_root_height);
495        }
496
497        self.insert_with(Arc::new(modified_chain), |chain_set| {
498            chain_set.retain(|chain| chain.non_finalized_tip_hash() != root_parent_hash)
499        });
500
501        self.update_metrics_for_committed_block(height, hash);
502
503        Ok(invalidated_block_hashes)
504    }
505
506    /// Commit block to the non-finalized state as a new chain where its parent
507    /// is the finalized tip.
508    #[tracing::instrument(level = "debug", skip(self, finalized_state, prepared))]
509    #[allow(clippy::unwrap_in_result)]
510    pub fn commit_new_chain(
511        &mut self,
512        prepared: SemanticallyVerifiedBlock,
513        finalized_state: &ZebraDb,
514    ) -> Result<(), ValidateContextError> {
515        let finalized_tip_height = finalized_state.finalized_tip_height();
516
517        // TODO: fix tests that don't initialize the finalized state
518        #[cfg(not(test))]
519        let finalized_tip_height = finalized_tip_height.expect("finalized state contains blocks");
520        #[cfg(test)]
521        let finalized_tip_height = finalized_tip_height.unwrap_or(zebra_chain::block::Height(0));
522
523        let chain = Chain::new(
524            &self.network,
525            finalized_tip_height,
526            finalized_state.sprout_tree_for_tip(),
527            finalized_state.sapling_tree_for_tip(),
528            finalized_state.orchard_tree_for_tip(),
529            finalized_state.history_tree(),
530            finalized_state.finalized_value_pool(),
531        );
532
533        let (height, hash) = (prepared.height, prepared.hash);
534
535        // If the block is invalid, return the error, and drop the newly created chain fork
536        let chain = self.validate_and_commit(Arc::new(chain), prepared, finalized_state)?;
537
538        // If the block is valid, add the new chain fork to the set of recent chains.
539        self.insert(chain);
540        self.update_metrics_for_committed_block(height, hash);
541
542        Ok(())
543    }
544
545    /// Contextually validate `prepared` using `finalized_state`.
546    /// If validation succeeds, push `prepared` onto `new_chain`.
547    ///
548    /// `new_chain` should start as a clone of the parent chain fork,
549    /// or the finalized tip.
550    #[tracing::instrument(level = "debug", skip(self, finalized_state, new_chain))]
551    fn validate_and_commit(
552        &self,
553        new_chain: Arc<Chain>,
554        prepared: SemanticallyVerifiedBlock,
555        finalized_state: &ZebraDb,
556    ) -> Result<Arc<Chain>, ValidateContextError> {
557        if self
558            .invalidated_blocks
559            .values()
560            .any(|blocks| blocks.iter().any(|block| block.hash == prepared.hash))
561        {
562            return Err(ValidateContextError::BlockPreviouslyInvalidated {
563                block_hash: prepared.hash,
564            });
565        }
566
567        // Reads from disk
568        //
569        // TODO: if these disk reads show up in profiles, run them in parallel, using std::thread::spawn()
570        let spent_utxos = check::utxo::transparent_spend(
571            &prepared,
572            &new_chain.unspent_utxos(),
573            &new_chain.spent_utxos,
574            finalized_state,
575        )?;
576
577        // Reads from disk
578        check::anchors::block_sapling_orchard_anchors_refer_to_final_treestates(
579            finalized_state,
580            &new_chain,
581            &prepared,
582        )?;
583
584        // Reads from disk
585        let sprout_final_treestates = check::anchors::block_fetch_sprout_final_treestates(
586            finalized_state,
587            &new_chain,
588            &prepared,
589        );
590
591        // Quick check that doesn't read from disk
592        let contextual = ContextuallyVerifiedBlock::with_block_and_spent_utxos(
593            prepared.clone(),
594            spent_utxos.clone(),
595        )
596        .map_err(|value_balance_error| {
597            ValidateContextError::CalculateBlockChainValueChange {
598                value_balance_error,
599                height: prepared.height,
600                block_hash: prepared.hash,
601                transaction_count: prepared.block.transactions.len(),
602                spent_utxo_count: spent_utxos.len(),
603            }
604        })?;
605
606        Self::validate_and_update_parallel(new_chain, contextual, sprout_final_treestates)
607    }
608
609    /// Validate `contextual` and update `new_chain`, doing CPU-intensive work in parallel batches.
610    #[allow(clippy::unwrap_in_result)]
611    #[tracing::instrument(skip(new_chain, sprout_final_treestates))]
612    fn validate_and_update_parallel(
613        new_chain: Arc<Chain>,
614        contextual: ContextuallyVerifiedBlock,
615        sprout_final_treestates: HashMap<sprout::tree::Root, Arc<sprout::tree::NoteCommitmentTree>>,
616    ) -> Result<Arc<Chain>, ValidateContextError> {
617        let mut block_commitment_result = None;
618        let mut sprout_anchor_result = None;
619        let mut chain_push_result = None;
620
621        // Clone function arguments for different threads
622        let block = contextual.block.clone();
623        let network = new_chain.network();
624        let history_tree = new_chain.history_block_commitment_tree();
625
626        let block2 = contextual.block.clone();
627        let height = contextual.height;
628        let transaction_hashes = contextual.transaction_hashes.clone();
629
630        rayon::in_place_scope_fifo(|scope| {
631            scope.spawn_fifo(|_scope| {
632                block_commitment_result = Some(check::block_commitment_is_valid_for_chain_history(
633                    block,
634                    &network,
635                    &history_tree,
636                ));
637            });
638
639            scope.spawn_fifo(|_scope| {
640                sprout_anchor_result =
641                    Some(check::anchors::block_sprout_anchors_refer_to_treestates(
642                        sprout_final_treestates,
643                        block2,
644                        transaction_hashes,
645                        height,
646                    ));
647            });
648
649            // We're pretty sure the new block is valid,
650            // so clone the inner chain if needed, then add the new block.
651            //
652            // Pushing a block onto a Chain can launch additional parallel batches.
653            // TODO: should we pass _scope into Chain::push()?
654            scope.spawn_fifo(|_scope| {
655                // TODO: Replace with Arc::unwrap_or_clone() when it stabilises:
656                // https://github.com/rust-lang/rust/issues/93610
657                let new_chain = Arc::try_unwrap(new_chain)
658                    .unwrap_or_else(|shared_chain| (*shared_chain).clone());
659                chain_push_result = Some(new_chain.push(contextual).map(Arc::new));
660            });
661        });
662
663        // Don't return the updated Chain unless all the parallel results were Ok
664        block_commitment_result.expect("scope has finished")?;
665        sprout_anchor_result.expect("scope has finished")?;
666
667        chain_push_result.expect("scope has finished")
668    }
669
670    /// Returns the length of the non-finalized portion of the current best chain
671    /// or `None` if the best chain has no blocks.
672    pub fn best_chain_len(&self) -> Option<u32> {
673        // This `as` can't overflow because the number of blocks in the chain is limited to i32::MAX,
674        // and the non-finalized chain is further limited by the fork length (slightly over 100 blocks).
675        Some(self.best_chain()?.blocks.len() as u32)
676    }
677
678    /// Returns the root height of the non-finalized state, if the non-finalized state is not empty.
679    pub fn root_height(&self) -> Option<block::Height> {
680        self.best_chain()
681            .map(|chain| chain.non_finalized_root_height())
682    }
683
684    /// Returns `true` if `hash` is contained in the non-finalized portion of any
685    /// known chain.
686    #[allow(dead_code)]
687    pub fn any_chain_contains(&self, hash: &block::Hash) -> bool {
688        self.chain_set
689            .iter()
690            .rev()
691            .any(|chain| chain.height_by_hash.contains_key(hash))
692    }
693
694    /// Returns the first chain satisfying the given predicate.
695    ///
696    /// If multiple chains satisfy the predicate, returns the chain with the highest difficulty.
697    /// (Using the tip block hash tie-breaker.)
698    pub fn find_chain<P>(&self, mut predicate: P) -> Option<Arc<Chain>>
699    where
700        P: FnMut(&Chain) -> bool,
701    {
702        // Reverse the iteration order, to find highest difficulty chains first.
703        self.chain_set
704            .iter()
705            .rev()
706            .find(|chain| predicate(chain))
707            .cloned()
708    }
709
710    /// Returns the [`transparent::Utxo`] pointed to by the given
711    /// [`transparent::OutPoint`] if it is present in any chain.
712    ///
713    /// UTXOs are returned regardless of whether they have been spent.
714    pub fn any_utxo(&self, outpoint: &transparent::OutPoint) -> Option<transparent::Utxo> {
715        self.chain_set
716            .iter()
717            .rev()
718            .find_map(|chain| chain.created_utxo(outpoint))
719    }
720
721    /// Returns the `block` with the given hash in any chain.
722    #[allow(dead_code)]
723    pub fn any_block_by_hash(&self, hash: block::Hash) -> Option<Arc<Block>> {
724        // This performs efficiently because the number of chains is limited to 10.
725        for chain in self.chain_set.iter().rev() {
726            if let Some(prepared) = chain
727                .height_by_hash
728                .get(&hash)
729                .and_then(|height| chain.blocks.get(height))
730            {
731                return Some(prepared.block.clone());
732            }
733        }
734
735        None
736    }
737
738    /// Returns the previous block hash for the given block hash in any chain.
739    #[allow(dead_code)]
740    pub fn any_prev_block_hash_for_hash(&self, hash: block::Hash) -> Option<block::Hash> {
741        // This performs efficiently because the blocks are in memory.
742        self.any_block_by_hash(hash)
743            .map(|block| block.header.previous_block_hash)
744    }
745
746    /// Returns the hash for a given `block::Height` if it is present in the best chain.
747    #[allow(dead_code)]
748    pub fn best_hash(&self, height: block::Height) -> Option<block::Hash> {
749        self.best_chain()?
750            .blocks
751            .get(&height)
752            .map(|prepared| prepared.hash)
753    }
754
755    /// Returns the tip of the best chain.
756    #[allow(dead_code)]
757    pub fn best_tip(&self) -> Option<(block::Height, block::Hash)> {
758        let best_chain = self.best_chain()?;
759        let height = best_chain.non_finalized_tip_height();
760        let hash = best_chain.non_finalized_tip_hash();
761
762        Some((height, hash))
763    }
764
765    /// Returns the block at the tip of the best chain.
766    #[allow(dead_code)]
767    pub fn best_tip_block(&self) -> Option<&ContextuallyVerifiedBlock> {
768        let best_chain = self.best_chain()?;
769
770        best_chain.tip_block()
771    }
772
773    /// Returns the height of `hash` in the best chain.
774    #[allow(dead_code)]
775    pub fn best_height_by_hash(&self, hash: block::Hash) -> Option<block::Height> {
776        let best_chain = self.best_chain()?;
777        let height = *best_chain.height_by_hash.get(&hash)?;
778        Some(height)
779    }
780
781    /// Returns the height of `hash` in any chain.
782    #[allow(dead_code)]
783    pub fn any_height_by_hash(&self, hash: block::Hash) -> Option<block::Height> {
784        for chain in self.chain_set.iter().rev() {
785            if let Some(height) = chain.height_by_hash.get(&hash) {
786                return Some(*height);
787            }
788        }
789
790        None
791    }
792
793    /// Returns `true` if the best chain contains `sprout_nullifier`.
794    #[cfg(any(test, feature = "proptest-impl"))]
795    #[allow(dead_code)]
796    pub fn best_contains_sprout_nullifier(&self, sprout_nullifier: &sprout::Nullifier) -> bool {
797        self.best_chain()
798            .map(|best_chain| best_chain.sprout_nullifiers.contains_key(sprout_nullifier))
799            .unwrap_or(false)
800    }
801
802    /// Returns `true` if the best chain contains `sapling_nullifier`.
803    #[cfg(any(test, feature = "proptest-impl"))]
804    #[allow(dead_code)]
805    pub fn best_contains_sapling_nullifier(
806        &self,
807        sapling_nullifier: &zebra_chain::sapling::Nullifier,
808    ) -> bool {
809        self.best_chain()
810            .map(|best_chain| {
811                best_chain
812                    .sapling_nullifiers
813                    .contains_key(sapling_nullifier)
814            })
815            .unwrap_or(false)
816    }
817
818    /// Returns `true` if the best chain contains `orchard_nullifier`.
819    #[cfg(any(test, feature = "proptest-impl"))]
820    #[allow(dead_code)]
821    pub fn best_contains_orchard_nullifier(
822        &self,
823        orchard_nullifier: &zebra_chain::orchard::Nullifier,
824    ) -> bool {
825        self.best_chain()
826            .map(|best_chain| {
827                best_chain
828                    .orchard_nullifiers
829                    .contains_key(orchard_nullifier)
830            })
831            .unwrap_or(false)
832    }
833
834    /// Return the non-finalized portion of the current best chain.
835    pub fn best_chain(&self) -> Option<&Arc<Chain>> {
836        self.chain_iter().next()
837    }
838
839    /// Return the number of chains.
840    pub fn chain_count(&self) -> usize {
841        self.chain_set.len()
842    }
843
844    /// Returns true if this [`NonFinalizedState`] contains no chains.
845    pub fn is_chain_set_empty(&self) -> bool {
846        self.chain_count() == 0
847    }
848
849    /// Return the invalidated blocks.
850    pub fn invalidated_blocks(&self) -> IndexMap<Height, Arc<Vec<ContextuallyVerifiedBlock>>> {
851        self.invalidated_blocks.clone()
852    }
853
854    /// Return the chain whose tip block hash is `parent_hash`.
855    ///
856    /// The chain can be an existing chain in the non-finalized state, or a freshly
857    /// created fork.
858    fn parent_chain(&self, parent_hash: block::Hash) -> Result<Arc<Chain>, ValidateContextError> {
859        match self.find_chain(|chain| chain.non_finalized_tip_hash() == parent_hash) {
860            // Clone the existing Arc<Chain> in the non-finalized state
861            Some(chain) => Ok(chain.clone()),
862            // Create a new fork
863            None => {
864                // Check the lowest difficulty chains first,
865                // because the fork could be closer to their tip.
866                let fork_chain = self
867                    .chain_set
868                    .iter()
869                    .rev()
870                    .find_map(|chain| chain.fork(parent_hash))
871                    .ok_or(ValidateContextError::NotReadyToBeCommitted)?;
872
873                Ok(Arc::new(fork_chain))
874            }
875        }
876    }
877
878    /// Should this `NonFinalizedState` instance track metrics and progress bars?
879    fn should_count_metrics(&self) -> bool {
880        self.should_count_metrics
881    }
882
883    /// Update the metrics after `block` is committed
884    fn update_metrics_for_committed_block(&self, height: block::Height, hash: block::Hash) {
885        if !self.should_count_metrics() {
886            return;
887        }
888
889        metrics::counter!("state.memory.committed.block.count").increment(1);
890        metrics::gauge!("state.memory.committed.block.height").set(height.0 as f64);
891
892        if self
893            .best_chain()
894            .expect("metrics are only updated after initialization")
895            .non_finalized_tip_hash()
896            == hash
897        {
898            metrics::counter!("state.memory.best.committed.block.count").increment(1);
899            metrics::gauge!("state.memory.best.committed.block.height").set(height.0 as f64);
900        }
901
902        self.update_metrics_for_chains();
903    }
904
905    /// Update the metrics after `self.chain_set` is modified
906    fn update_metrics_for_chains(&self) {
907        if !self.should_count_metrics() {
908            return;
909        }
910
911        metrics::gauge!("state.memory.chain.count").set(self.chain_set.len() as f64);
912        metrics::gauge!("state.memory.best.chain.length",)
913            .set(self.best_chain_len().unwrap_or_default() as f64);
914    }
915
916    /// Update the progress bars after any chain is modified.
917    /// This includes both chain forks and committed blocks.
918    fn update_metrics_bars(&mut self) {
919        // TODO: make chain_count_bar interior mutable, move to update_metrics_for_committed_block()
920
921        if !self.should_count_metrics() {
922            #[allow(clippy::needless_return)]
923            return;
924        }
925
926        #[cfg(feature = "progress-bar")]
927        {
928            use std::cmp::Ordering::*;
929
930            if matches!(howudoin::cancelled(), Some(true)) {
931                self.disable_metrics();
932                return;
933            }
934
935            // Update the chain count bar
936            if self.chain_count_bar.is_none() {
937                self.chain_count_bar = Some(howudoin::new_root().label("Chain Forks"));
938            }
939
940            let chain_count_bar = self
941                .chain_count_bar
942                .as_ref()
943                .expect("just initialized if missing");
944            let finalized_tip_height = self
945                .best_chain()
946                .map(|chain| chain.non_finalized_root_height().0 - 1);
947
948            chain_count_bar.set_pos(u64::try_from(self.chain_count()).expect("fits in u64"));
949            // .set_len(u64::try_from(MAX_NON_FINALIZED_CHAIN_FORKS).expect("fits in u64"));
950
951            if let Some(finalized_tip_height) = finalized_tip_height {
952                chain_count_bar.desc(format!("Finalized Root {finalized_tip_height}"));
953            }
954
955            // Update each chain length bar, creating or deleting bars as needed
956            let prev_length_bars = self.chain_fork_length_bars.len();
957
958            match self.chain_count().cmp(&prev_length_bars) {
959                Greater => self
960                    .chain_fork_length_bars
961                    .resize_with(self.chain_count(), || {
962                        howudoin::new_with_parent(chain_count_bar.id())
963                    }),
964                Less => {
965                    let redundant_bars = self.chain_fork_length_bars.split_off(self.chain_count());
966                    for bar in redundant_bars {
967                        bar.close();
968                    }
969                }
970                Equal => {}
971            }
972
973            // It doesn't matter what chain the bar was previously used for,
974            // because we update everything based on the latest chain in that position.
975            for (chain_length_bar, chain) in
976                std::iter::zip(self.chain_fork_length_bars.iter(), self.chain_iter())
977            {
978                let fork_height = chain
979                    .last_fork_height
980                    .unwrap_or_else(|| chain.non_finalized_tip_height())
981                    .0;
982
983                // We need to initialize and set all the values of the bar here, because:
984                // - the bar might have been newly created, or
985                // - the chain this bar was previously assigned to might have changed position.
986                chain_length_bar
987                    .label(format!("Fork {fork_height}"))
988                    .set_pos(u64::try_from(chain.len()).expect("fits in u64"));
989                // TODO: should this be MAX_BLOCK_REORG_HEIGHT?
990                // .set_len(u64::from(
991                //     zebra_chain::transparent::MIN_TRANSPARENT_COINBASE_MATURITY,
992                // ));
993
994                // TODO: store work in the finalized state for each height (#7109),
995                //       and show the full chain work here, like `zcashd` (#7110)
996                //
997                // For now, we don't show any work here, see the deleted code in PR #7087.
998                let mut desc = String::new();
999
1000                if let Some(recent_fork_height) = chain.recent_fork_height() {
1001                    let recent_fork_length = chain
1002                        .recent_fork_length()
1003                        .expect("just checked recent fork height");
1004
1005                    let mut plural = "s";
1006                    if recent_fork_length == 1 {
1007                        plural = "";
1008                    }
1009
1010                    desc.push_str(&format!(
1011                        " at {recent_fork_height:?} + {recent_fork_length} block{plural}"
1012                    ));
1013                }
1014
1015                chain_length_bar.desc(desc);
1016            }
1017        }
1018    }
1019
1020    /// Stop tracking metrics for this non-finalized state and all its chains.
1021    pub fn disable_metrics(&mut self) {
1022        self.should_count_metrics = false;
1023
1024        #[cfg(feature = "progress-bar")]
1025        {
1026            let count_bar = self.chain_count_bar.take().into_iter();
1027            let fork_bars = self.chain_fork_length_bars.drain(..);
1028            count_bar.chain(fork_bars).for_each(howudoin::Tx::close);
1029        }
1030    }
1031}
1032
1033impl Drop for NonFinalizedState {
1034    fn drop(&mut self) {
1035        self.disable_metrics();
1036    }
1037}