Skip to main content

zebra_state/service/
queued_blocks.rs

1//! Queued blocks that are awaiting their parent block for verification.
2
3use std::{
4    collections::{hash_map::Drain, BTreeMap, HashMap, HashSet, VecDeque},
5    iter, mem,
6};
7
8use tokio::sync::oneshot;
9use tracing::instrument;
10
11use zebra_chain::{block, transparent};
12
13use crate::{
14    error::{CommitBlockError, CommitCheckpointVerifiedError},
15    CheckpointVerifiedBlock, CommitSemanticallyVerifiedError, KnownBlock, NonFinalizedState,
16    SemanticallyVerifiedBlock,
17};
18
19#[cfg(test)]
20mod tests;
21
/// A queued checkpoint verified block, and its corresponding [`Result`] channel.
///
/// On success the channel carries a `block::Hash` (presumably the committed block's hash —
/// confirm with the sender side); on failure it carries a [`CommitCheckpointVerifiedError`].
pub type QueuedCheckpointVerified = (
    CheckpointVerifiedBlock,
    oneshot::Sender<Result<block::Hash, CommitCheckpointVerifiedError>>,
);
27
/// A queued semantically verified block, and its corresponding [`Result`] channel.
///
/// On success the channel carries a `block::Hash`; on failure it carries a
/// [`CommitSemanticallyVerifiedError`] (for example, [`QueuedBlocks::prune_by_height`] sends a
/// duplicate-block error for blocks at or below the finalized tip).
pub type QueuedSemanticallyVerified = (
    SemanticallyVerifiedBlock,
    oneshot::Sender<Result<block::Hash, CommitSemanticallyVerifiedError>>,
);
33
/// A queue of blocks, awaiting the arrival of parent blocks.
///
/// Every queued block is stored once in `blocks`, and its hash is indexed in both
/// `by_parent` and `by_height`, so blocks can be released when their parent arrives or
/// expired once they are at or below the finalized tip.
#[derive(Debug, Default)]
pub struct QueuedBlocks {
    /// Blocks awaiting their parent blocks for contextual verification.
    blocks: HashMap<block::Hash, QueuedSemanticallyVerified>,
    /// Hashes of the blocks in `blocks`, indexed by parent hash.
    by_parent: HashMap<block::Hash, HashSet<block::Hash>>,
    /// Hashes of the blocks in `blocks`, indexed by block height.
    by_height: BTreeMap<block::Height, HashSet<block::Hash>>,
    /// UTXOs created by queued blocks, keyed by outpoint.
    ///
    /// Best-effort: entries are removed whenever a block that created them is dequeued or
    /// pruned, even if another queued block created the same outpoint.
    known_utxos: HashMap<transparent::OutPoint, transparent::Utxo>,
}
46
47impl QueuedBlocks {
48    /// Queue a block for eventual verification and commit.
49    ///
50    /// # Panics
51    ///
52    /// - if a block with the same `block::Hash` has already been queued.
53    #[instrument(skip(self), fields(height = ?new.0.height, hash = %new.0.hash))]
54    pub fn queue(&mut self, new: QueuedSemanticallyVerified) {
55        let new_hash = new.0.hash;
56        let new_height = new.0.height;
57        let parent_hash = new.0.block.header.previous_block_hash;
58
59        if self.blocks.contains_key(&new_hash) {
60            // Skip queueing the block and return early if the hash is not unique
61            return;
62        }
63
64        // Track known UTXOs in queued blocks.
65        for (outpoint, ordered_utxo) in new.0.new_outputs.iter() {
66            self.known_utxos
67                .insert(*outpoint, ordered_utxo.utxo.clone());
68        }
69
70        self.blocks.insert(new_hash, new);
71        self.by_height
72            .entry(new_height)
73            .or_default()
74            .insert(new_hash);
75        self.by_parent
76            .entry(parent_hash)
77            .or_default()
78            .insert(new_hash);
79
80        tracing::trace!(%parent_hash, queued = %self.blocks.len(), "queued block");
81        self.update_metrics();
82    }
83
84    /// Returns `true` if there are any queued children of `parent_hash`.
85    #[instrument(skip(self), fields(%parent_hash))]
86    pub fn has_queued_children(&self, parent_hash: block::Hash) -> bool {
87        self.by_parent.contains_key(&parent_hash)
88    }
89
90    /// Dequeue and return all blocks that were waiting for the arrival of
91    /// `parent`.
92    #[instrument(skip(self), fields(%parent_hash))]
93    pub fn dequeue_children(
94        &mut self,
95        parent_hash: block::Hash,
96    ) -> Vec<QueuedSemanticallyVerified> {
97        let queued_children = self
98            .by_parent
99            .remove(&parent_hash)
100            .unwrap_or_default()
101            .into_iter()
102            .map(|hash| {
103                self.blocks
104                    .remove(&hash)
105                    .expect("block is present if its hash is in by_parent")
106            })
107            .collect::<Vec<_>>();
108
109        for queued in &queued_children {
110            self.by_height.remove(&queued.0.height);
111            // TODO: only remove UTXOs if there are no queued blocks with that UTXO
112            //       (known_utxos is best-effort, so this is ok for now)
113            for outpoint in queued.0.new_outputs.keys() {
114                self.known_utxos.remove(outpoint);
115            }
116        }
117
118        tracing::trace!(
119            dequeued = queued_children.len(),
120            remaining = self.blocks.len(),
121            "dequeued blocks"
122        );
123        self.update_metrics();
124
125        queued_children
126    }
127
128    /// Remove all queued blocks whose height is less than or equal to the given
129    /// `finalized_tip_height`.
130    #[instrument(skip(self))]
131    pub fn prune_by_height(&mut self, finalized_tip_height: block::Height) {
132        // split_off returns the values _greater than or equal to_ the key. What
133        // we need is the keys that are less than or equal to
134        // `finalized_tip_height`. To get this we have split at
135        // `finalized_tip_height + 1` and swap the removed portion of the list
136        // with the remainder.
137        let split_height = finalized_tip_height + 1;
138        let split_height =
139            split_height.expect("height after finalized tip won't exceed max height");
140        let mut by_height = self.by_height.split_off(&split_height);
141        mem::swap(&mut self.by_height, &mut by_height);
142
143        for hash in by_height.into_iter().flat_map(|(_, hashes)| hashes) {
144            let (expired_block, expired_sender) =
145                self.blocks.remove(&hash).expect("block is present");
146            let parent_hash = &expired_block.block.header.previous_block_hash;
147
148            // we don't care if the receiver was dropped
149            let _ = expired_sender.send(Err(CommitBlockError::new_duplicate(
150                Some(expired_block.height.into()),
151                KnownBlock::Finalized,
152            )
153            .into()));
154
155            // TODO: only remove UTXOs if there are no queued blocks with that UTXO
156            //       (known_utxos is best-effort, so this is ok for now)
157            for outpoint in expired_block.new_outputs.keys() {
158                self.known_utxos.remove(outpoint);
159            }
160
161            let parent_list = self
162                .by_parent
163                .get_mut(parent_hash)
164                .expect("parent is present");
165
166            if parent_list.len() == 1 {
167                let removed = self
168                    .by_parent
169                    .remove(parent_hash)
170                    .expect("parent is present");
171                assert!(
172                    removed.contains(&hash),
173                    "hash must be present in parent hash list"
174                );
175            } else {
176                assert!(
177                    parent_list.remove(&hash),
178                    "hash must be present in parent hash list"
179                );
180            }
181        }
182
183        tracing::trace!(num_blocks = %self.blocks.len(), "Finished pruning blocks at or beneath the finalized tip height");
184        self.update_metrics();
185    }
186
187    /// Return the queued block if it has already been registered
188    pub fn get_mut(&mut self, hash: &block::Hash) -> Option<&mut QueuedSemanticallyVerified> {
189        self.blocks.get_mut(hash)
190    }
191
192    /// Update metrics after the queue is modified
193    fn update_metrics(&self) {
194        if let Some(min_height) = self.by_height.keys().next() {
195            metrics::gauge!("state.memory.queued.min.height").set(min_height.0 as f64);
196        } else {
197            // use f64::NAN as a sentinel value for "None", because 0 is a valid height
198            metrics::gauge!("state.memory.queued.min.height").set(f64::NAN);
199        }
200        if let Some(max_height) = self.by_height.keys().next_back() {
201            metrics::gauge!("state.memory.queued.max.height").set(max_height.0 as f64);
202        } else {
203            // use f64::NAN as a sentinel value for "None", because 0 is a valid height
204            metrics::gauge!("state.memory.queued.max.height").set(f64::NAN);
205        }
206
207        metrics::gauge!("state.memory.queued.block.count").set(self.blocks.len() as f64);
208    }
209
210    /// Try to look up this UTXO in any queued block.
211    #[instrument(skip(self))]
212    pub fn utxo(&self, outpoint: &transparent::OutPoint) -> Option<transparent::Utxo> {
213        self.known_utxos.get(outpoint).cloned()
214    }
215
216    /// Clears known_utxos, by_parent, and by_height, then drains blocks.
217    /// Returns all key-value pairs of blocks as an iterator.
218    ///
219    /// Doesn't update the metrics, because it is only used when the state is being dropped.
220    pub fn drain(&mut self) -> Drain<'_, block::Hash, QueuedSemanticallyVerified> {
221        self.known_utxos.clear();
222        self.known_utxos.shrink_to_fit();
223        self.by_parent.clear();
224        self.by_parent.shrink_to_fit();
225        self.by_height.clear();
226
227        self.blocks.drain()
228    }
229}
230
/// Tracks blocks that have been sent to the block write task, recorded in height-ordered
/// batches, together with the transparent UTXOs those blocks create.
#[derive(Debug, Default)]
pub(crate) struct SentHashes {
    /// A list of previously sent block batches, each batch is in increasing height order.
    /// We use this list to efficiently prune outdated hashes that are at or below the finalized tip.
    bufs: Vec<VecDeque<(block::Hash, block::Height)>>,

    /// The list of blocks sent in the current batch, in increasing height order.
    curr_buf: VecDeque<(block::Hash, block::Height)>,

    /// Stores a set of hashes that have been sent to the block write task but
    /// may not be in the finalized state yet.
    ///
    /// Each hash maps to the outpoints of the transparent outputs created by that block,
    /// so their UTXOs can be forgotten when the block is pruned.
    pub sent: HashMap<block::Hash, Vec<transparent::OutPoint>>,

    /// UTXOs created by sent blocks, keyed by outpoint.
    ///
    /// Best-effort: entries are removed whenever a block that created them is pruned.
    known_utxos: HashMap<transparent::OutPoint, transparent::Utxo>,

    /// Whether the hashes in this struct can be used to check if the chain can be forked.
    /// This is set to false until all checkpoint-verified block hashes have been pruned.
    pub(crate) can_fork_chain_at_hashes: bool,
}
251
252impl SentHashes {
253    /// Creates a new [`SentHashes`] with the block hashes and UTXOs in the provided non-finalized state.
254    pub fn new(non_finalized_state: &NonFinalizedState) -> Self {
255        let mut sent_hashes = Self::default();
256        for (_, block) in non_finalized_state
257            .chain_iter()
258            .flat_map(|c| c.blocks.clone())
259        {
260            sent_hashes.add(&block.into());
261        }
262
263        if !sent_hashes.sent.is_empty() {
264            sent_hashes.can_fork_chain_at_hashes = true;
265        }
266
267        sent_hashes
268    }
269
270    /// Stores the `block`'s hash, height, and UTXOs, so they can be used to check if a block or UTXO
271    /// is available in the state.
272    ///
273    /// Assumes that blocks are added in the order of their height between `finish_batch` calls
274    /// for efficient pruning.
275    pub fn add(&mut self, block: &SemanticallyVerifiedBlock) {
276        // Track known UTXOs in sent blocks.
277        let outpoints = block
278            .new_outputs
279            .iter()
280            .map(|(outpoint, ordered_utxo)| {
281                self.known_utxos
282                    .insert(*outpoint, ordered_utxo.utxo.clone());
283                outpoint
284            })
285            .cloned()
286            .collect();
287
288        self.curr_buf.push_back((block.hash, block.height));
289        self.sent.insert(block.hash, outpoints);
290
291        self.update_metrics_for_block(block.height);
292    }
293
294    /// Stores the checkpoint verified `block`'s hash, height, and UTXOs, so they can be used to check if a
295    /// block or UTXO is available in the state.
296    ///
297    /// Used for checkpoint verified blocks close to the final checkpoint, so the semantic block verifier can look up
298    /// their UTXOs.
299    ///
300    /// Assumes that blocks are added in the order of their height between `finish_batch` calls
301    /// for efficient pruning.
302    ///
303    /// For more details see `add()`.
304    pub fn add_finalized(&mut self, block: &CheckpointVerifiedBlock) {
305        // Track known UTXOs in sent blocks.
306        let outpoints = block
307            .new_outputs
308            .iter()
309            .map(|(outpoint, ordered_utxo)| {
310                self.known_utxos
311                    .insert(*outpoint, ordered_utxo.utxo.clone());
312                outpoint
313            })
314            .cloned()
315            .collect();
316
317        self.curr_buf.push_back((block.hash, block.height));
318        self.sent.insert(block.hash, outpoints);
319
320        self.update_metrics_for_block(block.height);
321    }
322
323    /// Try to look up this UTXO in any sent block.
324    #[instrument(skip(self))]
325    pub fn utxo(&self, outpoint: &transparent::OutPoint) -> Option<transparent::Utxo> {
326        self.known_utxos.get(outpoint).cloned()
327    }
328
329    /// Finishes the current block batch, and stores it for efficient pruning.
330    pub fn finish_batch(&mut self) {
331        if !self.curr_buf.is_empty() {
332            self.bufs.push(std::mem::take(&mut self.curr_buf));
333        }
334    }
335
336    /// Prunes sent blocks at or below `height_bound`.
337    ///
338    /// Finishes the batch if `finish_batch()` hasn't been called already.
339    ///
340    /// Assumes that blocks will be added in order of their heights between each `finish_batch()` call,
341    /// so that blocks can be efficiently and reliably removed by height.
342    pub fn prune_by_height(&mut self, height_bound: block::Height) {
343        self.finish_batch();
344
345        // Iterates over each buf in `sent_bufs`, removing sent blocks until reaching
346        // the first block with a height above the `height_bound`.
347        self.bufs.retain_mut(|buf| {
348            while let Some((hash, height)) = buf.pop_front() {
349                if height > height_bound {
350                    buf.push_front((hash, height));
351                    return true;
352                } else if let Some(expired_outpoints) = self.sent.remove(&hash) {
353                    // TODO: only remove UTXOs if there are no queued blocks with that UTXO
354                    //       (known_utxos is best-effort, so this is ok for now)
355                    for outpoint in expired_outpoints.iter() {
356                        self.known_utxos.remove(outpoint);
357                    }
358                }
359            }
360
361            false
362        });
363
364        self.sent.shrink_to_fit();
365        self.known_utxos.shrink_to_fit();
366        self.bufs.shrink_to_fit();
367
368        self.update_metrics_for_cache();
369    }
370
371    /// Returns true if SentHashes contains the `hash`
372    pub fn contains(&self, hash: &block::Hash) -> bool {
373        self.sent.contains_key(hash)
374    }
375
376    /// Returns true if the chain can be forked at the provided hash
377    pub fn can_fork_chain_at(&self, hash: &block::Hash) -> bool {
378        self.can_fork_chain_at_hashes && self.contains(hash)
379    }
380
381    /// Update sent block metrics after a block is sent.
382    fn update_metrics_for_block(&self, height: block::Height) {
383        metrics::counter!("state.memory.sent.block.count").increment(1);
384        metrics::gauge!("state.memory.sent.block.height").set(height.0 as f64);
385
386        self.update_metrics_for_cache();
387    }
388
389    /// Update sent block cache metrics after the sent blocks are modified.
390    fn update_metrics_for_cache(&self) {
391        let batch_iter = || self.bufs.iter().chain(iter::once(&self.curr_buf));
392
393        if let Some(min_height) = batch_iter()
394            .flat_map(|batch| batch.front().map(|(_hash, height)| height))
395            .min()
396        {
397            metrics::gauge!("state.memory.sent.cache.min.height").set(min_height.0 as f64);
398        } else {
399            // use f64::NAN as a sentinel value for "None", because 0 is a valid height
400            metrics::gauge!("state.memory.sent.cache.min.height").set(f64::NAN);
401        }
402
403        if let Some(max_height) = batch_iter()
404            .flat_map(|batch| batch.back().map(|(_hash, height)| height))
405            .max()
406        {
407            metrics::gauge!("state.memory.sent.cache.max.height").set(max_height.0 as f64);
408        } else {
409            // use f64::NAN as a sentinel value for "None", because 0 is a valid height
410            metrics::gauge!("state.memory.sent.cache.max.height").set(f64::NAN);
411        }
412
413        metrics::gauge!("state.memory.sent.cache.block.count")
414            .set(batch_iter().flatten().count() as f64);
415
416        metrics::gauge!("state.memory.sent.cache.batch.count").set(batch_iter().count() as f64);
417    }
418}