1use std::{
4 path::{Path, PathBuf},
5 sync::Arc,
6};
7
8use indexmap::IndexMap;
9use tokio::sync::{
10 mpsc::{UnboundedReceiver, UnboundedSender},
11 oneshot, watch,
12};
13
14use tracing::Span;
15use zebra_chain::block::{self, Height};
16
17use crate::{
18 constants::MAX_BLOCK_REORG_HEIGHT,
19 service::{
20 check,
21 finalized_state::{FinalizedState, ZebraDb},
22 non_finalized_state::NonFinalizedState,
23 queued_blocks::{QueuedCheckpointVerified, QueuedSemanticallyVerified},
24 ChainTipBlock, ChainTipSender, InvalidateError, ReconsiderError,
25 },
26 SemanticallyVerifiedBlock, ValidateContextError,
27};
28
29#[allow(unused_imports)]
31use crate::service::{
32 chain_tip::{ChainTipChange, LatestChainTip},
33 non_finalized_state::Chain,
34};
35
36const PARENT_ERROR_MAP_LIMIT: usize = MAX_BLOCK_REORG_HEIGHT as usize * 2;
40
41#[tracing::instrument(
44 level = "debug",
45 skip(finalized_state, non_finalized_state, prepared),
46 fields(
47 height = ?prepared.height,
48 hash = %prepared.hash,
49 chains = non_finalized_state.chain_count()
50 )
51)]
52pub(crate) fn validate_and_commit_non_finalized(
53 finalized_state: &ZebraDb,
54 non_finalized_state: &mut NonFinalizedState,
55 prepared: SemanticallyVerifiedBlock,
56) -> Result<(), ValidateContextError> {
57 check::initial_contextual_validity(finalized_state, non_finalized_state, &prepared)?;
58 let parent_hash = prepared.block.header.previous_block_hash;
59
60 if finalized_state.finalized_tip_hash() == parent_hash {
61 non_finalized_state.commit_new_chain(prepared, finalized_state)?;
62 } else {
63 non_finalized_state.commit_block(prepared, finalized_state)?;
64 }
65
66 Ok(())
67}
68
69#[instrument(
84 level = "debug",
85 skip(
86 non_finalized_state,
87 chain_tip_sender,
88 non_finalized_state_sender,
89 backup_dir_path,
90 ),
91 fields(chains = non_finalized_state.chain_count())
92)]
93fn update_latest_chain_channels(
94 non_finalized_state: &NonFinalizedState,
95 chain_tip_sender: &mut ChainTipSender,
96 non_finalized_state_sender: &watch::Sender<NonFinalizedState>,
97 backup_dir_path: Option<&Path>,
98) -> block::Height {
99 let best_chain = non_finalized_state.best_chain().expect("unexpected empty non-finalized state: must commit at least one block before updating channels");
100
101 let tip_block = best_chain
102 .tip_block()
103 .expect("unexpected empty chain: must commit at least one block before updating channels")
104 .clone();
105 let tip_block = ChainTipBlock::from(tip_block);
106
107 let tip_block_height = tip_block.height;
108
109 if let Some(backup_dir_path) = backup_dir_path {
110 non_finalized_state.write_to_backup(backup_dir_path);
111 }
112
113 let _ = non_finalized_state_sender.send(non_finalized_state.clone());
115
116 chain_tip_sender.set_best_non_finalized_tip(tip_block);
117
118 tip_block_height
119}
120
/// State owned by the block write worker, which commits blocks to the finalized and
/// non-finalized states in [`WriteBlockWorkerTask::run`].
struct WriteBlockWorkerTask {
    /// Checkpoint-verified blocks to commit to the finalized state.
    /// `run` drains this channel until it is closed, before reading any other messages.
    finalized_block_write_receiver: UnboundedReceiver<QueuedCheckpointVerified>,
    /// Commit, invalidate, and reconsider requests for the non-finalized state.
    /// `run` reads this channel after the finalized block channel is closed.
    non_finalized_block_write_receiver: UnboundedReceiver<NonFinalizedWriteMessage>,
    /// The finalized state that blocks are committed to.
    finalized_state: FinalizedState,
    /// The non-finalized state that blocks are validated against and committed to.
    non_finalized_state: NonFinalizedState,
    /// Sends the current finalized tip hash when committing a finalized block fails.
    /// `run` also stops when it finds this channel closed.
    invalid_block_reset_sender: UnboundedSender<block::Hash>,
    /// Sends the hash of each non-finalized block whose validation or commit failed.
    non_finalized_rejected_sender: UnboundedSender<block::Hash>,
    /// Receives the new tip after each finalized or non-finalized commit.
    chain_tip_sender: ChainTipSender,
    /// Watch channel that receives a clone of the non-finalized state after it changes.
    non_finalized_state_sender: watch::Sender<NonFinalizedState>,
    /// Optional directory that the non-finalized state is written to as a backup.
    backup_dir_path: Option<PathBuf>,
}
143
/// A request handled by the block write task once it is processing non-finalized blocks.
pub enum NonFinalizedWriteMessage {
    /// Validate a queued semantically verified block and commit it to the
    /// non-finalized state. The queued value also carries the response sender.
    Commit(QueuedSemanticallyVerified),
    /// Invalidate a block in the non-finalized state.
    Invalidate {
        /// Hash of the block to invalidate.
        hash: block::Hash,
        /// Receives the result of `NonFinalizedState::invalidate_block`.
        rsp_tx: oneshot::Sender<Result<block::Hash, InvalidateError>>,
    },
    /// Reconsider a block in the non-finalized state.
    Reconsider {
        /// Hash of the block to reconsider.
        hash: block::Hash,
        /// Receives the result of `NonFinalizedState::reconsider_block`.
        rsp_tx: oneshot::Sender<Result<Vec<block::Hash>, ReconsiderError>>,
    },
}
162
163impl From<QueuedSemanticallyVerified> for NonFinalizedWriteMessage {
164 fn from(block: QueuedSemanticallyVerified) -> Self {
165 NonFinalizedWriteMessage::Commit(block)
166 }
167}
168
169#[derive(Clone, Debug)]
173pub(super) struct BlockWriteSender {
174 pub non_finalized: Option<tokio::sync::mpsc::UnboundedSender<NonFinalizedWriteMessage>>,
177
178 pub finalized: Option<tokio::sync::mpsc::UnboundedSender<QueuedCheckpointVerified>>,
184}
185
186impl BlockWriteSender {
187 #[instrument(
189 level = "debug",
190 skip_all,
191 fields(
192 network = %non_finalized_state.network
193 )
194 )]
195 pub fn spawn(
196 finalized_state: FinalizedState,
197 non_finalized_state: NonFinalizedState,
198 chain_tip_sender: ChainTipSender,
199 non_finalized_state_sender: watch::Sender<NonFinalizedState>,
200 should_use_finalized_block_write_sender: bool,
201 backup_dir_path: Option<PathBuf>,
202 ) -> (
203 Self,
204 tokio::sync::mpsc::UnboundedReceiver<block::Hash>,
205 tokio::sync::mpsc::UnboundedReceiver<block::Hash>,
206 Option<Arc<std::thread::JoinHandle<()>>>,
207 ) {
208 let (non_finalized_block_write_sender, non_finalized_block_write_receiver) =
211 tokio::sync::mpsc::unbounded_channel();
212 let (finalized_block_write_sender, finalized_block_write_receiver) =
213 tokio::sync::mpsc::unbounded_channel();
214 let (invalid_block_reset_sender, invalid_block_write_reset_receiver) =
215 tokio::sync::mpsc::unbounded_channel();
216 let (non_finalized_rejected_sender, non_finalized_rejected_receiver) =
217 tokio::sync::mpsc::unbounded_channel();
218
219 let span = Span::current();
220 let task = std::thread::spawn(move || {
221 span.in_scope(|| {
222 WriteBlockWorkerTask {
223 finalized_block_write_receiver,
224 non_finalized_block_write_receiver,
225 finalized_state,
226 non_finalized_state,
227 invalid_block_reset_sender,
228 non_finalized_rejected_sender,
229 chain_tip_sender,
230 non_finalized_state_sender,
231 backup_dir_path,
232 }
233 .run()
234 })
235 });
236
237 (
238 Self {
239 non_finalized: Some(non_finalized_block_write_sender),
240 finalized: should_use_finalized_block_write_sender
241 .then_some(finalized_block_write_sender),
242 },
243 invalid_block_write_reset_receiver,
244 non_finalized_rejected_receiver,
245 Some(Arc::new(task)),
246 )
247 }
248}
249
250impl WriteBlockWorkerTask {
251 #[instrument(
255 level = "debug",
256 skip(self),
257 fields(
258 network = %self.non_finalized_state.network
259 )
260 )]
261 pub fn run(mut self) {
262 let Self {
263 finalized_block_write_receiver,
264 non_finalized_block_write_receiver,
265 finalized_state,
266 non_finalized_state,
267 invalid_block_reset_sender,
268 non_finalized_rejected_sender,
269 chain_tip_sender,
270 non_finalized_state_sender,
271 backup_dir_path,
272 } = &mut self;
273
274 let mut prev_finalized_note_commitment_trees = None;
275
276 while let Some(ordered_block) = finalized_block_write_receiver.blocking_recv() {
279 if invalid_block_reset_sender.is_closed() {
282 info!("StateService closed the block reset channel. Is Zebra shutting down?");
283 return;
284 }
285
286 let next_valid_height = finalized_state
293 .db
294 .finalized_tip_height()
295 .map(|height| (height + 1).expect("committed heights are valid"))
296 .unwrap_or(Height(0));
297
298 if ordered_block.0.height != next_valid_height {
299 debug!(
300 ?next_valid_height,
301 invalid_height = ?ordered_block.0.height,
302 invalid_hash = ?ordered_block.0.hash,
303 "got a block that was the wrong height. \
304 Assuming a parent block failed, and dropping this block",
305 );
306
307 std::mem::drop(ordered_block);
309 continue;
310 }
311
312 match finalized_state
314 .commit_finalized(ordered_block, prev_finalized_note_commitment_trees.take())
315 {
316 Ok((finalized, note_commitment_trees)) => {
317 let tip_block = ChainTipBlock::from(finalized);
318 prev_finalized_note_commitment_trees = Some(note_commitment_trees);
319 chain_tip_sender.set_finalized_tip(tip_block);
320 }
321 Err(error) => {
322 let finalized_tip = finalized_state.db.tip();
323
324 info!(
328 ?error,
329 last_valid_height = ?finalized_tip.map(|tip| tip.0),
330 last_valid_hash = ?finalized_tip.map(|tip| tip.1),
331 "committing a block to the finalized state failed, resetting state queue",
332 );
333
334 let send_result =
335 invalid_block_reset_sender.send(finalized_state.db.finalized_tip_hash());
336
337 if send_result.is_err() {
338 info!(
339 "StateService closed the block reset channel. Is Zebra shutting down?"
340 );
341 return;
342 }
343 }
344 }
345 }
346
347 if invalid_block_reset_sender.is_closed() {
350 info!("StateService closed the block reset channel. Is Zebra shutting down?");
351 return;
352 }
353
354 let mut parent_error_map: IndexMap<block::Hash, ValidateContextError> = IndexMap::new();
356
357 while let Some(msg) = non_finalized_block_write_receiver.blocking_recv() {
358 let queued_child_and_rsp_tx = match msg {
359 NonFinalizedWriteMessage::Commit(queued_child) => Some(queued_child),
360 NonFinalizedWriteMessage::Invalidate { hash, rsp_tx } => {
361 tracing::info!(?hash, "invalidating a block in the non-finalized state");
362 let _ = rsp_tx.send(non_finalized_state.invalidate_block(hash));
363 None
364 }
365 NonFinalizedWriteMessage::Reconsider { hash, rsp_tx } => {
366 tracing::info!(?hash, "reconsidering a block in the non-finalized state");
367 let _ = rsp_tx
368 .send(non_finalized_state.reconsider_block(hash, &finalized_state.db));
369 None
370 }
371 };
372
373 let Some((queued_child, rsp_tx)) = queued_child_and_rsp_tx else {
374 update_latest_chain_channels(
375 non_finalized_state,
376 chain_tip_sender,
377 non_finalized_state_sender,
378 backup_dir_path.as_deref(),
379 );
380 continue;
381 };
382
383 let child_hash = queued_child.hash;
384 let parent_hash = queued_child.block.header.previous_block_hash;
385 let parent_error = parent_error_map.get(&parent_hash);
386
387 let result = if let Some(parent_error) = parent_error {
393 Err(parent_error.clone())
394 } else {
395 tracing::trace!(?child_hash, "validating queued child");
396 validate_and_commit_non_finalized(
397 &finalized_state.db,
398 non_finalized_state,
399 queued_child,
400 )
401 };
402
403 if let Err(ref error) = result {
408 parent_error_map.insert(child_hash, error.clone());
410
411 if parent_error_map.len() > PARENT_ERROR_MAP_LIMIT {
413 parent_error_map.shift_remove_index(0);
415 }
416
417 let _ = non_finalized_rejected_sender.send(child_hash);
427
428 let _ = rsp_tx.send(result.map(|()| child_hash).map_err(Into::into));
430
431 continue;
433 }
434
435 let tip_block_height = update_latest_chain_channels(
442 non_finalized_state,
443 chain_tip_sender,
444 non_finalized_state_sender,
445 backup_dir_path.as_deref(),
446 );
447
448 let _ = rsp_tx.send(result.map(|()| child_hash).map_err(Into::into));
450
451 while non_finalized_state
452 .best_chain_len()
453 .expect("just successfully inserted a non-finalized block above")
454 > MAX_BLOCK_REORG_HEIGHT
455 {
456 tracing::trace!("finalizing block past the reorg limit");
457 let contextually_verified_with_trees = non_finalized_state.finalize();
458 prev_finalized_note_commitment_trees = finalized_state
459 .commit_finalized_direct(contextually_verified_with_trees, prev_finalized_note_commitment_trees.take(), "commit contextually-verified request")
460 .expect(
461 "unexpected finalized block commit error: note commitment and history trees were already checked by the non-finalized state",
462 ).1.into();
463 }
464
465 metrics::counter!("state.full_verifier.committed.block.count").increment(1);
469 metrics::counter!("zcash.chain.verified.block.total").increment(1);
470
471 metrics::gauge!("state.full_verifier.committed.block.height")
472 .set(tip_block_height.0 as f64);
473
474 metrics::gauge!("zcash.chain.verified.block.height").set(tip_block_height.0 as f64);
478
479 tracing::trace!("finished processing queued block");
480 }
481
482 finalized_state.db.shutdown(true);
485 std::mem::drop(self.finalized_state);
486 }
487}