1use std::{
4 collections::{hash_map::Drain, BTreeMap, HashMap, HashSet, VecDeque},
5 iter, mem,
6};
7
8use tokio::sync::oneshot;
9use tracing::instrument;
10
11use zebra_chain::{block, transparent};
12
13use crate::{
14 error::{CommitBlockError, CommitCheckpointVerifiedError},
15 CheckpointVerifiedBlock, CommitSemanticallyVerifiedError, KnownBlock, NonFinalizedState,
16 SemanticallyVerifiedBlock,
17};
18
19#[cfg(test)]
20mod tests;
21
22pub type QueuedCheckpointVerified = (
24 CheckpointVerifiedBlock,
25 oneshot::Sender<Result<block::Hash, CommitCheckpointVerifiedError>>,
26);
27
28pub type QueuedSemanticallyVerified = (
30 SemanticallyVerifiedBlock,
31 oneshot::Sender<Result<block::Hash, CommitSemanticallyVerifiedError>>,
32);
33
34#[derive(Debug, Default)]
36pub struct QueuedBlocks {
37 blocks: HashMap<block::Hash, QueuedSemanticallyVerified>,
39 by_parent: HashMap<block::Hash, HashSet<block::Hash>>,
41 by_height: BTreeMap<block::Height, HashSet<block::Hash>>,
43 known_utxos: HashMap<transparent::OutPoint, transparent::Utxo>,
45}
46
47impl QueuedBlocks {
48 #[instrument(skip(self), fields(height = ?new.0.height, hash = %new.0.hash))]
54 pub fn queue(&mut self, new: QueuedSemanticallyVerified) {
55 let new_hash = new.0.hash;
56 let new_height = new.0.height;
57 let parent_hash = new.0.block.header.previous_block_hash;
58
59 if self.blocks.contains_key(&new_hash) {
60 return;
62 }
63
64 for (outpoint, ordered_utxo) in new.0.new_outputs.iter() {
66 self.known_utxos
67 .insert(*outpoint, ordered_utxo.utxo.clone());
68 }
69
70 self.blocks.insert(new_hash, new);
71 self.by_height
72 .entry(new_height)
73 .or_default()
74 .insert(new_hash);
75 self.by_parent
76 .entry(parent_hash)
77 .or_default()
78 .insert(new_hash);
79
80 tracing::trace!(%parent_hash, queued = %self.blocks.len(), "queued block");
81 self.update_metrics();
82 }
83
84 #[instrument(skip(self), fields(%parent_hash))]
86 pub fn has_queued_children(&self, parent_hash: block::Hash) -> bool {
87 self.by_parent.contains_key(&parent_hash)
88 }
89
90 #[instrument(skip(self), fields(%parent_hash))]
93 pub fn dequeue_children(
94 &mut self,
95 parent_hash: block::Hash,
96 ) -> Vec<QueuedSemanticallyVerified> {
97 let queued_children = self
98 .by_parent
99 .remove(&parent_hash)
100 .unwrap_or_default()
101 .into_iter()
102 .map(|hash| {
103 self.blocks
104 .remove(&hash)
105 .expect("block is present if its hash is in by_parent")
106 })
107 .collect::<Vec<_>>();
108
109 for queued in &queued_children {
110 self.by_height.remove(&queued.0.height);
111 for outpoint in queued.0.new_outputs.keys() {
114 self.known_utxos.remove(outpoint);
115 }
116 }
117
118 tracing::trace!(
119 dequeued = queued_children.len(),
120 remaining = self.blocks.len(),
121 "dequeued blocks"
122 );
123 self.update_metrics();
124
125 queued_children
126 }
127
128 #[instrument(skip(self))]
131 pub fn prune_by_height(&mut self, finalized_tip_height: block::Height) {
132 let split_height = finalized_tip_height + 1;
138 let split_height =
139 split_height.expect("height after finalized tip won't exceed max height");
140 let mut by_height = self.by_height.split_off(&split_height);
141 mem::swap(&mut self.by_height, &mut by_height);
142
143 for hash in by_height.into_iter().flat_map(|(_, hashes)| hashes) {
144 let (expired_block, expired_sender) =
145 self.blocks.remove(&hash).expect("block is present");
146 let parent_hash = &expired_block.block.header.previous_block_hash;
147
148 let _ = expired_sender.send(Err(CommitBlockError::new_duplicate(
150 Some(expired_block.height.into()),
151 KnownBlock::Finalized,
152 )
153 .into()));
154
155 for outpoint in expired_block.new_outputs.keys() {
158 self.known_utxos.remove(outpoint);
159 }
160
161 let parent_list = self
162 .by_parent
163 .get_mut(parent_hash)
164 .expect("parent is present");
165
166 if parent_list.len() == 1 {
167 let removed = self
168 .by_parent
169 .remove(parent_hash)
170 .expect("parent is present");
171 assert!(
172 removed.contains(&hash),
173 "hash must be present in parent hash list"
174 );
175 } else {
176 assert!(
177 parent_list.remove(&hash),
178 "hash must be present in parent hash list"
179 );
180 }
181 }
182
183 tracing::trace!(num_blocks = %self.blocks.len(), "Finished pruning blocks at or beneath the finalized tip height");
184 self.update_metrics();
185 }
186
187 pub fn get_mut(&mut self, hash: &block::Hash) -> Option<&mut QueuedSemanticallyVerified> {
189 self.blocks.get_mut(hash)
190 }
191
192 fn update_metrics(&self) {
194 if let Some(min_height) = self.by_height.keys().next() {
195 metrics::gauge!("state.memory.queued.min.height").set(min_height.0 as f64);
196 } else {
197 metrics::gauge!("state.memory.queued.min.height").set(f64::NAN);
199 }
200 if let Some(max_height) = self.by_height.keys().next_back() {
201 metrics::gauge!("state.memory.queued.max.height").set(max_height.0 as f64);
202 } else {
203 metrics::gauge!("state.memory.queued.max.height").set(f64::NAN);
205 }
206
207 metrics::gauge!("state.memory.queued.block.count").set(self.blocks.len() as f64);
208 }
209
210 #[instrument(skip(self))]
212 pub fn utxo(&self, outpoint: &transparent::OutPoint) -> Option<transparent::Utxo> {
213 self.known_utxos.get(outpoint).cloned()
214 }
215
216 pub fn drain(&mut self) -> Drain<'_, block::Hash, QueuedSemanticallyVerified> {
221 self.known_utxos.clear();
222 self.known_utxos.shrink_to_fit();
223 self.by_parent.clear();
224 self.by_parent.shrink_to_fit();
225 self.by_height.clear();
226
227 self.blocks.drain()
228 }
229}
230
231#[derive(Debug, Default)]
232pub(crate) struct SentHashes {
233 bufs: Vec<VecDeque<(block::Hash, block::Height)>>,
236
237 curr_buf: VecDeque<(block::Hash, block::Height)>,
239
240 pub sent: HashMap<block::Hash, Vec<transparent::OutPoint>>,
243
244 known_utxos: HashMap<transparent::OutPoint, transparent::Utxo>,
246
247 pub(crate) can_fork_chain_at_hashes: bool,
250}
251
252impl SentHashes {
253 pub fn new(non_finalized_state: &NonFinalizedState) -> Self {
255 let mut sent_hashes = Self::default();
256 for (_, block) in non_finalized_state
257 .chain_iter()
258 .flat_map(|c| c.blocks.clone())
259 {
260 sent_hashes.add(&block.into());
261 }
262
263 if !sent_hashes.sent.is_empty() {
264 sent_hashes.can_fork_chain_at_hashes = true;
265 }
266
267 sent_hashes
268 }
269
270 pub fn add(&mut self, block: &SemanticallyVerifiedBlock) {
276 let outpoints = block
278 .new_outputs
279 .iter()
280 .map(|(outpoint, ordered_utxo)| {
281 self.known_utxos
282 .insert(*outpoint, ordered_utxo.utxo.clone());
283 outpoint
284 })
285 .cloned()
286 .collect();
287
288 self.curr_buf.push_back((block.hash, block.height));
289 self.sent.insert(block.hash, outpoints);
290
291 self.update_metrics_for_block(block.height);
292 }
293
294 pub fn add_finalized(&mut self, block: &CheckpointVerifiedBlock) {
305 let outpoints = block
307 .new_outputs
308 .iter()
309 .map(|(outpoint, ordered_utxo)| {
310 self.known_utxos
311 .insert(*outpoint, ordered_utxo.utxo.clone());
312 outpoint
313 })
314 .cloned()
315 .collect();
316
317 self.curr_buf.push_back((block.hash, block.height));
318 self.sent.insert(block.hash, outpoints);
319
320 self.update_metrics_for_block(block.height);
321 }
322
323 #[instrument(skip(self))]
325 pub fn utxo(&self, outpoint: &transparent::OutPoint) -> Option<transparent::Utxo> {
326 self.known_utxos.get(outpoint).cloned()
327 }
328
329 pub fn finish_batch(&mut self) {
331 if !self.curr_buf.is_empty() {
332 self.bufs.push(std::mem::take(&mut self.curr_buf));
333 }
334 }
335
336 pub fn prune_by_height(&mut self, height_bound: block::Height) {
343 self.finish_batch();
344
345 self.bufs.retain_mut(|buf| {
348 while let Some((hash, height)) = buf.pop_front() {
349 if height > height_bound {
350 buf.push_front((hash, height));
351 return true;
352 } else if let Some(expired_outpoints) = self.sent.remove(&hash) {
353 for outpoint in expired_outpoints.iter() {
356 self.known_utxos.remove(outpoint);
357 }
358 }
359 }
360
361 false
362 });
363
364 self.sent.shrink_to_fit();
365 self.known_utxos.shrink_to_fit();
366 self.bufs.shrink_to_fit();
367
368 self.update_metrics_for_cache();
369 }
370
371 pub fn contains(&self, hash: &block::Hash) -> bool {
373 self.sent.contains_key(hash)
374 }
375
376 pub fn can_fork_chain_at(&self, hash: &block::Hash) -> bool {
378 self.can_fork_chain_at_hashes && self.contains(hash)
379 }
380
381 fn update_metrics_for_block(&self, height: block::Height) {
383 metrics::counter!("state.memory.sent.block.count").increment(1);
384 metrics::gauge!("state.memory.sent.block.height").set(height.0 as f64);
385
386 self.update_metrics_for_cache();
387 }
388
389 fn update_metrics_for_cache(&self) {
391 let batch_iter = || self.bufs.iter().chain(iter::once(&self.curr_buf));
392
393 if let Some(min_height) = batch_iter()
394 .flat_map(|batch| batch.front().map(|(_hash, height)| height))
395 .min()
396 {
397 metrics::gauge!("state.memory.sent.cache.min.height").set(min_height.0 as f64);
398 } else {
399 metrics::gauge!("state.memory.sent.cache.min.height").set(f64::NAN);
401 }
402
403 if let Some(max_height) = batch_iter()
404 .flat_map(|batch| batch.back().map(|(_hash, height)| height))
405 .max()
406 {
407 metrics::gauge!("state.memory.sent.cache.max.height").set(max_height.0 as f64);
408 } else {
409 metrics::gauge!("state.memory.sent.cache.max.height").set(f64::NAN);
411 }
412
413 metrics::gauge!("state.memory.sent.cache.block.count")
414 .set(batch_iter().flatten().count() as f64);
415
416 metrics::gauge!("state.memory.sent.cache.batch.count").set(batch_iter().count() as f64);
417 }
418}