1use std::{
4 path::{Path, PathBuf},
5 sync::Arc,
6};
7
8use indexmap::IndexMap;
9use tokio::sync::{
10 mpsc::{UnboundedReceiver, UnboundedSender},
11 oneshot, watch,
12};
13
14use tracing::Span;
15use zebra_chain::{
16 block::{self, Height},
17 transparent::EXTRA_ZEBRA_COINBASE_DATA,
18};
19
20use crate::{
21 constants::MAX_BLOCK_REORG_HEIGHT,
22 service::{
23 check,
24 finalized_state::{FinalizedState, ZebraDb},
25 non_finalized_state::NonFinalizedState,
26 queued_blocks::{QueuedCheckpointVerified, QueuedSemanticallyVerified},
27 ChainTipBlock, ChainTipSender, InvalidateError, ReconsiderError,
28 },
29 SemanticallyVerifiedBlock, ValidateContextError,
30};
31
32#[allow(unused_imports)]
34use crate::service::{
35 chain_tip::{ChainTipChange, LatestChainTip},
36 non_finalized_state::Chain,
37};
38
39const PARENT_ERROR_MAP_LIMIT: usize = MAX_BLOCK_REORG_HEIGHT as usize * 2;
43
44#[tracing::instrument(
47 level = "debug",
48 skip(finalized_state, non_finalized_state, prepared),
49 fields(
50 height = ?prepared.height,
51 hash = %prepared.hash,
52 chains = non_finalized_state.chain_count()
53 )
54)]
55pub(crate) fn validate_and_commit_non_finalized(
56 finalized_state: &ZebraDb,
57 non_finalized_state: &mut NonFinalizedState,
58 prepared: SemanticallyVerifiedBlock,
59) -> Result<(), ValidateContextError> {
60 check::initial_contextual_validity(finalized_state, non_finalized_state, &prepared)?;
61 let parent_hash = prepared.block.header.previous_block_hash;
62
63 if finalized_state.finalized_tip_hash() == parent_hash {
64 non_finalized_state.commit_new_chain(prepared, finalized_state)?;
65 } else {
66 non_finalized_state.commit_block(prepared, finalized_state)?;
67 }
68
69 Ok(())
70}
71
72#[instrument(
87 level = "debug",
88 skip(
89 non_finalized_state,
90 chain_tip_sender,
91 non_finalized_state_sender,
92 last_zebra_mined_log_height,
93 backup_dir_path,
94 ),
95 fields(chains = non_finalized_state.chain_count())
96)]
97fn update_latest_chain_channels(
98 non_finalized_state: &NonFinalizedState,
99 chain_tip_sender: &mut ChainTipSender,
100 non_finalized_state_sender: &watch::Sender<NonFinalizedState>,
101 last_zebra_mined_log_height: &mut Option<Height>,
102 backup_dir_path: Option<&Path>,
103) -> block::Height {
104 let best_chain = non_finalized_state.best_chain().expect("unexpected empty non-finalized state: must commit at least one block before updating channels");
105
106 let tip_block = best_chain
107 .tip_block()
108 .expect("unexpected empty chain: must commit at least one block before updating channels")
109 .clone();
110 let tip_block = ChainTipBlock::from(tip_block);
111
112 log_if_mined_by_zebra(&tip_block, last_zebra_mined_log_height);
113
114 let tip_block_height = tip_block.height;
115
116 if let Some(backup_dir_path) = backup_dir_path {
117 non_finalized_state.write_to_backup(backup_dir_path);
118 }
119
120 let _ = non_finalized_state_sender.send(non_finalized_state.clone());
122
123 chain_tip_sender.set_best_non_finalized_tip(tip_block);
124
125 tip_block_height
126}
127
128struct WriteBlockWorkerTask {
131 finalized_block_write_receiver: UnboundedReceiver<QueuedCheckpointVerified>,
132 non_finalized_block_write_receiver: UnboundedReceiver<NonFinalizedWriteMessage>,
133 finalized_state: FinalizedState,
134 non_finalized_state: NonFinalizedState,
135 invalid_block_reset_sender: UnboundedSender<block::Hash>,
136 chain_tip_sender: ChainTipSender,
137 non_finalized_state_sender: watch::Sender<NonFinalizedState>,
138 backup_dir_path: Option<PathBuf>,
141}
142
143pub enum NonFinalizedWriteMessage {
145 Commit(QueuedSemanticallyVerified),
148 Invalidate {
151 hash: block::Hash,
152 rsp_tx: oneshot::Sender<Result<block::Hash, InvalidateError>>,
153 },
154 Reconsider {
157 hash: block::Hash,
158 rsp_tx: oneshot::Sender<Result<Vec<block::Hash>, ReconsiderError>>,
159 },
160}
161
162impl From<QueuedSemanticallyVerified> for NonFinalizedWriteMessage {
163 fn from(block: QueuedSemanticallyVerified) -> Self {
164 NonFinalizedWriteMessage::Commit(block)
165 }
166}
167
168#[derive(Clone, Debug)]
172pub(super) struct BlockWriteSender {
173 pub non_finalized: Option<tokio::sync::mpsc::UnboundedSender<NonFinalizedWriteMessage>>,
176
177 pub finalized: Option<tokio::sync::mpsc::UnboundedSender<QueuedCheckpointVerified>>,
183}
184
185impl BlockWriteSender {
186 #[instrument(
188 level = "debug",
189 skip_all,
190 fields(
191 network = %non_finalized_state.network
192 )
193 )]
194 pub fn spawn(
195 finalized_state: FinalizedState,
196 non_finalized_state: NonFinalizedState,
197 chain_tip_sender: ChainTipSender,
198 non_finalized_state_sender: watch::Sender<NonFinalizedState>,
199 should_use_finalized_block_write_sender: bool,
200 backup_dir_path: Option<PathBuf>,
201 ) -> (
202 Self,
203 tokio::sync::mpsc::UnboundedReceiver<block::Hash>,
204 Option<Arc<std::thread::JoinHandle<()>>>,
205 ) {
206 let (non_finalized_block_write_sender, non_finalized_block_write_receiver) =
209 tokio::sync::mpsc::unbounded_channel();
210 let (finalized_block_write_sender, finalized_block_write_receiver) =
211 tokio::sync::mpsc::unbounded_channel();
212 let (invalid_block_reset_sender, invalid_block_write_reset_receiver) =
213 tokio::sync::mpsc::unbounded_channel();
214
215 let span = Span::current();
216 let task = std::thread::spawn(move || {
217 span.in_scope(|| {
218 WriteBlockWorkerTask {
219 finalized_block_write_receiver,
220 non_finalized_block_write_receiver,
221 finalized_state,
222 non_finalized_state,
223 invalid_block_reset_sender,
224 chain_tip_sender,
225 non_finalized_state_sender,
226 backup_dir_path,
227 }
228 .run()
229 })
230 });
231
232 (
233 Self {
234 non_finalized: Some(non_finalized_block_write_sender),
235 finalized: Some(finalized_block_write_sender)
236 .filter(|_| should_use_finalized_block_write_sender),
237 },
238 invalid_block_write_reset_receiver,
239 Some(Arc::new(task)),
240 )
241 }
242}
243
244impl WriteBlockWorkerTask {
245 #[instrument(
249 level = "debug",
250 skip(self),
251 fields(
252 network = %self.non_finalized_state.network
253 )
254 )]
255 pub fn run(mut self) {
256 let Self {
257 finalized_block_write_receiver,
258 non_finalized_block_write_receiver,
259 finalized_state,
260 non_finalized_state,
261 invalid_block_reset_sender,
262 chain_tip_sender,
263 non_finalized_state_sender,
264 backup_dir_path,
265 } = &mut self;
266
267 let mut last_zebra_mined_log_height = None;
268 let mut prev_finalized_note_commitment_trees = None;
269
270 while let Some(ordered_block) = finalized_block_write_receiver.blocking_recv() {
273 if invalid_block_reset_sender.is_closed() {
276 info!("StateService closed the block reset channel. Is Zebra shutting down?");
277 return;
278 }
279
280 let next_valid_height = finalized_state
287 .db
288 .finalized_tip_height()
289 .map(|height| (height + 1).expect("committed heights are valid"))
290 .unwrap_or(Height(0));
291
292 if ordered_block.0.height != next_valid_height {
293 debug!(
294 ?next_valid_height,
295 invalid_height = ?ordered_block.0.height,
296 invalid_hash = ?ordered_block.0.hash,
297 "got a block that was the wrong height. \
298 Assuming a parent block failed, and dropping this block",
299 );
300
301 std::mem::drop(ordered_block);
303 continue;
304 }
305
306 match finalized_state
308 .commit_finalized(ordered_block, prev_finalized_note_commitment_trees.take())
309 {
310 Ok((finalized, note_commitment_trees)) => {
311 let tip_block = ChainTipBlock::from(finalized);
312 prev_finalized_note_commitment_trees = Some(note_commitment_trees);
313
314 log_if_mined_by_zebra(&tip_block, &mut last_zebra_mined_log_height);
315
316 chain_tip_sender.set_finalized_tip(tip_block);
317 }
318 Err(error) => {
319 let finalized_tip = finalized_state.db.tip();
320
321 info!(
325 ?error,
326 last_valid_height = ?finalized_tip.map(|tip| tip.0),
327 last_valid_hash = ?finalized_tip.map(|tip| tip.1),
328 "committing a block to the finalized state failed, resetting state queue",
329 );
330
331 let send_result =
332 invalid_block_reset_sender.send(finalized_state.db.finalized_tip_hash());
333
334 if send_result.is_err() {
335 info!(
336 "StateService closed the block reset channel. Is Zebra shutting down?"
337 );
338 return;
339 }
340 }
341 }
342 }
343
344 if invalid_block_reset_sender.is_closed() {
347 info!("StateService closed the block reset channel. Is Zebra shutting down?");
348 return;
349 }
350
351 let mut parent_error_map: IndexMap<block::Hash, ValidateContextError> = IndexMap::new();
353
354 while let Some(msg) = non_finalized_block_write_receiver.blocking_recv() {
355 let queued_child_and_rsp_tx = match msg {
356 NonFinalizedWriteMessage::Commit(queued_child) => Some(queued_child),
357 NonFinalizedWriteMessage::Invalidate { hash, rsp_tx } => {
358 tracing::info!(?hash, "invalidating a block in the non-finalized state");
359 let _ = rsp_tx.send(non_finalized_state.invalidate_block(hash));
360 None
361 }
362 NonFinalizedWriteMessage::Reconsider { hash, rsp_tx } => {
363 tracing::info!(?hash, "reconsidering a block in the non-finalized state");
364 let _ = rsp_tx
365 .send(non_finalized_state.reconsider_block(hash, &finalized_state.db));
366 None
367 }
368 };
369
370 let Some((queued_child, rsp_tx)) = queued_child_and_rsp_tx else {
371 update_latest_chain_channels(
372 non_finalized_state,
373 chain_tip_sender,
374 non_finalized_state_sender,
375 &mut last_zebra_mined_log_height,
376 backup_dir_path.as_deref(),
377 );
378 continue;
379 };
380
381 let child_hash = queued_child.hash;
382 let parent_hash = queued_child.block.header.previous_block_hash;
383 let parent_error = parent_error_map.get(&parent_hash);
384
385 let result = if let Some(parent_error) = parent_error {
391 Err(parent_error.clone())
392 } else {
393 tracing::trace!(?child_hash, "validating queued child");
394 validate_and_commit_non_finalized(
395 &finalized_state.db,
396 non_finalized_state,
397 queued_child,
398 )
399 };
400
401 if let Err(ref error) = result {
406 parent_error_map.insert(child_hash, error.clone());
408
409 if parent_error_map.len() > PARENT_ERROR_MAP_LIMIT {
411 parent_error_map.shift_remove_index(0);
413 }
414
415 let _ = rsp_tx.send(result.map(|()| child_hash).map_err(Into::into));
417
418 continue;
420 }
421
422 let tip_block_height = update_latest_chain_channels(
429 non_finalized_state,
430 chain_tip_sender,
431 non_finalized_state_sender,
432 &mut last_zebra_mined_log_height,
433 backup_dir_path.as_deref(),
434 );
435
436 let _ = rsp_tx.send(result.map(|()| child_hash).map_err(Into::into));
438
439 while non_finalized_state
440 .best_chain_len()
441 .expect("just successfully inserted a non-finalized block above")
442 > MAX_BLOCK_REORG_HEIGHT
443 {
444 tracing::trace!("finalizing block past the reorg limit");
445 let contextually_verified_with_trees = non_finalized_state.finalize();
446 prev_finalized_note_commitment_trees = finalized_state
447 .commit_finalized_direct(contextually_verified_with_trees, prev_finalized_note_commitment_trees.take(), "commit contextually-verified request")
448 .expect(
449 "unexpected finalized block commit error: note commitment and history trees were already checked by the non-finalized state",
450 ).1.into();
451 }
452
453 metrics::counter!("state.full_verifier.committed.block.count").increment(1);
457 metrics::counter!("zcash.chain.verified.block.total").increment(1);
458
459 metrics::gauge!("state.full_verifier.committed.block.height")
460 .set(tip_block_height.0 as f64);
461
462 metrics::gauge!("zcash.chain.verified.block.height").set(tip_block_height.0 as f64);
466
467 tracing::trace!("finished processing queued block");
468 }
469
470 finalized_state.db.shutdown(true);
473 std::mem::drop(self.finalized_state);
474 }
475}
476
477fn log_if_mined_by_zebra(
482 tip_block: &ChainTipBlock,
483 last_zebra_mined_log_height: &mut Option<Height>,
484) {
485 const LOG_RATE_LIMIT: u32 = 1000;
487
488 let height = tip_block.height.0;
489
490 if let Some(last_height) = last_zebra_mined_log_height {
491 if height < last_height.0 + LOG_RATE_LIMIT {
492 return;
494 }
495 };
496
497 let coinbase_data = tip_block.transactions[0].inputs()[0]
499 .extra_coinbase_data()
500 .expect("valid blocks must start with a coinbase input")
501 .clone();
502
503 if coinbase_data
504 .as_ref()
505 .starts_with(EXTRA_ZEBRA_COINBASE_DATA.as_bytes())
506 {
507 let text = String::from_utf8_lossy(coinbase_data.as_ref());
508
509 *last_zebra_mined_log_height = Some(Height(height));
510
511 if coinbase_data.as_ref() == EXTRA_ZEBRA_COINBASE_DATA.as_bytes() {
513 info!(
514 %text,
515 %height,
516 hash = %tip_block.hash,
517 "looks like this block was mined by Zebra!"
518 );
519 } else {
520 let text = text.replace(
526 |c: char| {
527 !EXTRA_ZEBRA_COINBASE_DATA
528 .to_ascii_lowercase()
529 .contains(c.to_ascii_lowercase())
530 },
531 "?",
532 );
533 let data = hex::encode(coinbase_data.as_ref());
534
535 info!(
536 %text,
537 %data,
538 %height,
539 hash = %tip_block.hash,
540 "looks like this block was mined by Zebra!"
541 );
542 }
543 }
544}