// zebra_state/service/write.rs

1//! Writing blocks to the finalized and non-finalized states.
2
3use std::{
4    path::{Path, PathBuf},
5    sync::Arc,
6};
7
8use indexmap::IndexMap;
9use tokio::sync::{
10    mpsc::{UnboundedReceiver, UnboundedSender},
11    oneshot, watch,
12};
13
14use tracing::Span;
15use zebra_chain::{
16    block::{self, Height},
17    transparent::EXTRA_ZEBRA_COINBASE_DATA,
18};
19
20use crate::{
21    constants::MAX_BLOCK_REORG_HEIGHT,
22    service::{
23        check,
24        finalized_state::{FinalizedState, ZebraDb},
25        non_finalized_state::NonFinalizedState,
26        queued_blocks::{QueuedCheckpointVerified, QueuedSemanticallyVerified},
27        ChainTipBlock, ChainTipSender, InvalidateError, ReconsiderError,
28    },
29    SemanticallyVerifiedBlock, ValidateContextError,
30};
31
32// These types are used in doc links
33#[allow(unused_imports)]
34use crate::service::{
35    chain_tip::{ChainTipChange, LatestChainTip},
36    non_finalized_state::Chain,
37};
38
39/// The maximum size of the parent error map.
40///
41/// We allow enough space for multiple concurrent chain forks with errors.
42const PARENT_ERROR_MAP_LIMIT: usize = MAX_BLOCK_REORG_HEIGHT as usize * 2;
43
44/// Run contextual validation on the prepared block and add it to the
45/// non-finalized state if it is contextually valid.
46#[tracing::instrument(
47    level = "debug",
48    skip(finalized_state, non_finalized_state, prepared),
49    fields(
50        height = ?prepared.height,
51        hash = %prepared.hash,
52        chains = non_finalized_state.chain_count()
53    )
54)]
55pub(crate) fn validate_and_commit_non_finalized(
56    finalized_state: &ZebraDb,
57    non_finalized_state: &mut NonFinalizedState,
58    prepared: SemanticallyVerifiedBlock,
59) -> Result<(), ValidateContextError> {
60    check::initial_contextual_validity(finalized_state, non_finalized_state, &prepared)?;
61    let parent_hash = prepared.block.header.previous_block_hash;
62
63    if finalized_state.finalized_tip_hash() == parent_hash {
64        non_finalized_state.commit_new_chain(prepared, finalized_state)?;
65    } else {
66        non_finalized_state.commit_block(prepared, finalized_state)?;
67    }
68
69    Ok(())
70}
71
72/// Update the [`LatestChainTip`], [`ChainTipChange`], and `non_finalized_state_sender`
73/// channels with the latest non-finalized [`ChainTipBlock`] and
74/// [`Chain`].
75///
76/// `last_zebra_mined_log_height` is used to rate-limit logging.
77///
78/// If `backup_dir_path` is `Some`, the non-finalized state is written to the backup
79/// directory before updating the channels.
80///
81/// Returns the latest non-finalized chain tip height.
82///
83/// # Panics
84///
85/// If the `non_finalized_state` is empty.
86#[instrument(
87    level = "debug",
88    skip(
89        non_finalized_state,
90        chain_tip_sender,
91        non_finalized_state_sender,
92        last_zebra_mined_log_height,
93        backup_dir_path,
94    ),
95    fields(chains = non_finalized_state.chain_count())
96)]
97fn update_latest_chain_channels(
98    non_finalized_state: &NonFinalizedState,
99    chain_tip_sender: &mut ChainTipSender,
100    non_finalized_state_sender: &watch::Sender<NonFinalizedState>,
101    last_zebra_mined_log_height: &mut Option<Height>,
102    backup_dir_path: Option<&Path>,
103) -> block::Height {
104    let best_chain = non_finalized_state.best_chain().expect("unexpected empty non-finalized state: must commit at least one block before updating channels");
105
106    let tip_block = best_chain
107        .tip_block()
108        .expect("unexpected empty chain: must commit at least one block before updating channels")
109        .clone();
110    let tip_block = ChainTipBlock::from(tip_block);
111
112    log_if_mined_by_zebra(&tip_block, last_zebra_mined_log_height);
113
114    let tip_block_height = tip_block.height;
115
116    if let Some(backup_dir_path) = backup_dir_path {
117        non_finalized_state.write_to_backup(backup_dir_path);
118    }
119
120    // If the final receiver was just dropped, ignore the error.
121    let _ = non_finalized_state_sender.send(non_finalized_state.clone());
122
123    chain_tip_sender.set_best_non_finalized_tip(tip_block);
124
125    tip_block_height
126}
127
/// A worker task that reads, validates, and writes blocks to the
/// `finalized_state` or `non_finalized_state`.
///
/// Built and started on its own thread by [`BlockWriteSender::spawn`].
struct WriteBlockWorkerTask {
    /// Receives checkpoint-verified blocks, which `run` commits to the finalized state.
    finalized_block_write_receiver: UnboundedReceiver<QueuedCheckpointVerified>,
    /// Receives commit, invalidate, and reconsider messages for the non-finalized state.
    non_finalized_block_write_receiver: UnboundedReceiver<NonFinalizedWriteMessage>,
    /// The finalized state that blocks are committed to.
    finalized_state: FinalizedState,
    /// The non-finalized state that contextually valid blocks are committed to.
    non_finalized_state: NonFinalizedState,
    /// Sends the finalized tip hash when committing a block to the finalized state fails.
    invalid_block_reset_sender: UnboundedSender<block::Hash>,
    /// Updated with the latest finalized tip and the best non-finalized tip.
    chain_tip_sender: ChainTipSender,
    /// Publishes a clone of the non-finalized state each time the chain channels are updated.
    non_finalized_state_sender: watch::Sender<NonFinalizedState>,
    /// If `Some`, the non-finalized state is written to this backup directory
    /// synchronously before each channel update, instead of via the async backup task.
    backup_dir_path: Option<PathBuf>,
}
142
/// The message type for the non-finalized block write task channel.
///
/// The write task answers `Invalidate` and `Reconsider` requests on their `rsp_tx` channels.
pub enum NonFinalizedWriteMessage {
    /// A newly downloaded and semantically verified block prepared for
    /// contextual validation and insertion into the non-finalized state.
    Commit(QueuedSemanticallyVerified),
    /// The hash of a block that should be invalidated and removed from
    /// the non-finalized state, if present.
    Invalidate {
        /// The hash of the block to invalidate.
        hash: block::Hash,
        /// The channel the result of the invalidation is sent back on.
        rsp_tx: oneshot::Sender<Result<block::Hash, InvalidateError>>,
    },
    /// The hash of a block that was previously invalidated but should be
    /// reconsidered and reinserted into the non-finalized state.
    Reconsider {
        /// The hash of the block to reconsider.
        hash: block::Hash,
        /// The channel the result of the reconsideration is sent back on.
        rsp_tx: oneshot::Sender<Result<Vec<block::Hash>, ReconsiderError>>,
    },
}
161
162impl From<QueuedSemanticallyVerified> for NonFinalizedWriteMessage {
163    fn from(block: QueuedSemanticallyVerified) -> Self {
164        NonFinalizedWriteMessage::Commit(block)
165    }
166}
167
168/// A worker with a task that reads, validates, and writes blocks to the
169/// `finalized_state` or `non_finalized_state` and channels for sending
170/// it blocks.
171#[derive(Clone, Debug)]
172pub(super) struct BlockWriteSender {
173    /// A channel to send blocks to the `block_write_task`,
174    /// so they can be written to the [`NonFinalizedState`].
175    pub non_finalized: Option<tokio::sync::mpsc::UnboundedSender<NonFinalizedWriteMessage>>,
176
177    /// A channel to send blocks to the `block_write_task`,
178    /// so they can be written to the [`FinalizedState`].
179    ///
180    /// This sender is dropped after the state has finished sending all the checkpointed blocks,
181    /// and the lowest semantically verified block arrives.
182    pub finalized: Option<tokio::sync::mpsc::UnboundedSender<QueuedCheckpointVerified>>,
183}
184
185impl BlockWriteSender {
186    /// Creates a new [`BlockWriteSender`] with the given receivers and states.
187    #[instrument(
188        level = "debug",
189        skip_all,
190        fields(
191            network = %non_finalized_state.network
192        )
193    )]
194    pub fn spawn(
195        finalized_state: FinalizedState,
196        non_finalized_state: NonFinalizedState,
197        chain_tip_sender: ChainTipSender,
198        non_finalized_state_sender: watch::Sender<NonFinalizedState>,
199        should_use_finalized_block_write_sender: bool,
200        backup_dir_path: Option<PathBuf>,
201    ) -> (
202        Self,
203        tokio::sync::mpsc::UnboundedReceiver<block::Hash>,
204        Option<Arc<std::thread::JoinHandle<()>>>,
205    ) {
206        // Security: The number of blocks in these channels is limited by
207        //           the syncer and inbound lookahead limits.
208        let (non_finalized_block_write_sender, non_finalized_block_write_receiver) =
209            tokio::sync::mpsc::unbounded_channel();
210        let (finalized_block_write_sender, finalized_block_write_receiver) =
211            tokio::sync::mpsc::unbounded_channel();
212        let (invalid_block_reset_sender, invalid_block_write_reset_receiver) =
213            tokio::sync::mpsc::unbounded_channel();
214
215        let span = Span::current();
216        let task = std::thread::spawn(move || {
217            span.in_scope(|| {
218                WriteBlockWorkerTask {
219                    finalized_block_write_receiver,
220                    non_finalized_block_write_receiver,
221                    finalized_state,
222                    non_finalized_state,
223                    invalid_block_reset_sender,
224                    chain_tip_sender,
225                    non_finalized_state_sender,
226                    backup_dir_path,
227                }
228                .run()
229            })
230        });
231
232        (
233            Self {
234                non_finalized: Some(non_finalized_block_write_sender),
235                finalized: Some(finalized_block_write_sender)
236                    .filter(|_| should_use_finalized_block_write_sender),
237            },
238            invalid_block_write_reset_receiver,
239            Some(Arc::new(task)),
240        )
241    }
242}
243
impl WriteBlockWorkerTask {
    /// Reads blocks from the channels, writes them to the `finalized_state` or `non_finalized_state`,
    /// sends any errors on the `invalid_block_reset_sender`, then updates the `chain_tip_sender` and
    /// `non_finalized_state_sender`.
    ///
    /// The task runs in two phases, both using blocking receives:
    ///
    /// 1. Checkpoint-verified blocks are committed to the finalized state in height order,
    ///    until the finalized block channel's sender is closed. Blocks at an unexpected height
    ///    are dropped. If a commit fails, the finalized tip hash is sent on the
    ///    `invalid_block_reset_sender`.
    /// 2. Non-finalized write messages are processed until their channel is closed.
    ///    Committed blocks are contextually validated and added to the non-finalized state,
    ///    and blocks past the reorg limit are moved to the finalized state.
    ///
    /// After both phases, the finalized state database is shut down.
    ///
    /// Returns early, without reaching that final shutdown call, if the
    /// `invalid_block_reset_sender` channel is found closed during or just after phase 1.
    #[instrument(
        level = "debug",
        skip(self),
        fields(
            network = %self.non_finalized_state.network
        )
    )]
    pub fn run(mut self) {
        // Borrow every field mutably up front. `self` is only used directly again
        // at the very end, to drop the finalized state.
        let Self {
            finalized_block_write_receiver,
            non_finalized_block_write_receiver,
            finalized_state,
            non_finalized_state,
            invalid_block_reset_sender,
            chain_tip_sender,
            non_finalized_state_sender,
            backup_dir_path,
        } = &mut self;

        // Rate-limiting state for `log_if_mined_by_zebra()`.
        let mut last_zebra_mined_log_height = None;
        // The note commitment trees returned by the previous finalized commit,
        // which are handed to the next finalized commit.
        let mut prev_finalized_note_commitment_trees = None;

        // Write all the finalized blocks sent by the state,
        // until the state closes the finalized block channel's sender.
        while let Some(ordered_block) = finalized_block_write_receiver.blocking_recv() {
            // TODO: split these checks into separate functions

            if invalid_block_reset_sender.is_closed() {
                info!("StateService closed the block reset channel. Is Zebra shutting down?");
                return;
            }

            // Discard any children of invalid blocks in the channel
            //
            // `commit_finalized()` requires blocks in height order.
            // So if there has been a block commit error,
            // we need to drop all the descendants of that block,
            // until we receive a block at the required next height.
            //
            // An empty finalized state expects the block at height 0.
            let next_valid_height = finalized_state
                .db
                .finalized_tip_height()
                .map(|height| (height + 1).expect("committed heights are valid"))
                .unwrap_or(Height(0));

            if ordered_block.0.height != next_valid_height {
                debug!(
                    ?next_valid_height,
                    invalid_height = ?ordered_block.0.height,
                    invalid_hash = ?ordered_block.0.hash,
                    "got a block that was the wrong height. \
                     Assuming a parent block failed, and dropping this block",
                );

                // We don't want to send a reset here, because it could overwrite a valid sent hash
                std::mem::drop(ordered_block);
                continue;
            }

            // Try committing the block
            match finalized_state
                .commit_finalized(ordered_block, prev_finalized_note_commitment_trees.take())
            {
                Ok((finalized, note_commitment_trees)) => {
                    let tip_block = ChainTipBlock::from(finalized);
                    prev_finalized_note_commitment_trees = Some(note_commitment_trees);

                    log_if_mined_by_zebra(&tip_block, &mut last_zebra_mined_log_height);

                    chain_tip_sender.set_finalized_tip(tip_block);
                }
                Err(error) => {
                    let finalized_tip = finalized_state.db.tip();

                    // The last block in the queue failed, so we can't commit the next block.
                    // Instead, we need to reset the state queue,
                    // and discard any children of the invalid block in the channel.
                    info!(
                        ?error,
                        last_valid_height = ?finalized_tip.map(|tip| tip.0),
                        last_valid_hash = ?finalized_tip.map(|tip| tip.1),
                        "committing a block to the finalized state failed, resetting state queue",
                    );

                    let send_result =
                        invalid_block_reset_sender.send(finalized_state.db.finalized_tip_hash());

                    if send_result.is_err() {
                        info!(
                            "StateService closed the block reset channel. Is Zebra shutting down?"
                        );
                        return;
                    }
                }
            }
        }

        // Do this check even if the channel got closed before any finalized blocks were sent.
        // This can happen if we're past the finalized tip.
        if invalid_block_reset_sender.is_closed() {
            info!("StateService closed the block reset channel. Is Zebra shutting down?");
            return;
        }

        // Save any errors to propagate down to queued child blocks
        //
        // Keyed by the hash of the rejected block, in insertion order.
        let mut parent_error_map: IndexMap<block::Hash, ValidateContextError> = IndexMap::new();

        while let Some(msg) = non_finalized_block_write_receiver.blocking_recv() {
            // Only `Commit` messages carry a block to validate.
            // Invalidate and reconsider requests are answered immediately,
            // and the result of sending the response is ignored.
            let queued_child_and_rsp_tx = match msg {
                NonFinalizedWriteMessage::Commit(queued_child) => Some(queued_child),
                NonFinalizedWriteMessage::Invalidate { hash, rsp_tx } => {
                    tracing::info!(?hash, "invalidating a block in the non-finalized state");
                    let _ = rsp_tx.send(non_finalized_state.invalidate_block(hash));
                    None
                }
                NonFinalizedWriteMessage::Reconsider { hash, rsp_tx } => {
                    tracing::info!(?hash, "reconsidering a block in the non-finalized state");
                    let _ = rsp_tx
                        .send(non_finalized_state.reconsider_block(hash, &finalized_state.db));
                    None
                }
            };

            // After an invalidate or reconsider request, publish the non-finalized state
            // and wait for the next message.
            //
            // NOTE(review): `update_latest_chain_channels()` panics if the non-finalized state
            // is empty. Confirm that an invalidate or reconsider request (including a failed
            // one) can never leave the non-finalized state empty at this point.
            let Some((queued_child, rsp_tx)) = queued_child_and_rsp_tx else {
                update_latest_chain_channels(
                    non_finalized_state,
                    chain_tip_sender,
                    non_finalized_state_sender,
                    &mut last_zebra_mined_log_height,
                    backup_dir_path.as_deref(),
                );
                continue;
            };

            let child_hash = queued_child.hash;
            let parent_hash = queued_child.block.header.previous_block_hash;
            let parent_error = parent_error_map.get(&parent_hash);

            // If the parent block was marked as rejected, also reject all its children.
            //
            // At this point, we know that all the block's descendants
            // are invalid, because we checked all the consensus rules before
            // committing the failing ancestor block to the non-finalized state.
            let result = if let Some(parent_error) = parent_error {
                Err(parent_error.clone())
            } else {
                tracing::trace!(?child_hash, "validating queued child");
                validate_and_commit_non_finalized(
                    &finalized_state.db,
                    non_finalized_state,
                    queued_child,
                )
            };

            // TODO: fix the test timing bugs that require the result to be sent
            //       after `update_latest_chain_channels()`,
            //       and send the result on rsp_tx here

            if let Err(ref error) = result {
                // If the block is invalid, mark any descendant blocks as rejected.
                parent_error_map.insert(child_hash, error.clone());

                // Make sure the error map doesn't get too big.
                if parent_error_map.len() > PARENT_ERROR_MAP_LIMIT {
                    // We only add one hash at a time, so we only need to remove one extra here.
                    // Index 0 is the first entry in the map's order.
                    parent_error_map.shift_remove_index(0);
                }

                // Update the caller with the error.
                let _ = rsp_tx.send(result.map(|()| child_hash).map_err(Into::into));

                // Skip the things we only need to do for successfully committed blocks
                continue;
            }

            // Committing blocks to the finalized state keeps the same chain,
            // so we can update the chain seen by the rest of the application now.
            //
            // TODO: if this causes state request errors due to chain conflicts,
            //       fix the `service::read` bugs,
            //       or do the channel update after the finalized state commit
            let tip_block_height = update_latest_chain_channels(
                non_finalized_state,
                chain_tip_sender,
                non_finalized_state_sender,
                &mut last_zebra_mined_log_height,
                backup_dir_path.as_deref(),
            );

            // Update the caller with the result.
            let _ = rsp_tx.send(result.map(|()| child_hash).map_err(Into::into));

            // Move blocks from the non-finalized state to the finalized state,
            // until the best chain is no longer than the reorg limit.
            //
            // The note commitment trees returned by each commit are kept for the next one.
            while non_finalized_state
                .best_chain_len()
                .expect("just successfully inserted a non-finalized block above")
                > MAX_BLOCK_REORG_HEIGHT
            {
                tracing::trace!("finalizing block past the reorg limit");
                let contextually_verified_with_trees = non_finalized_state.finalize();
                prev_finalized_note_commitment_trees = finalized_state
                            .commit_finalized_direct(contextually_verified_with_trees, prev_finalized_note_commitment_trees.take(), "commit contextually-verified request")
                            .expect(
                                "unexpected finalized block commit error: note commitment and history trees were already checked by the non-finalized state",
                            ).1.into();
            }

            // Update the metrics if semantic and contextual validation passes
            //
            // TODO: split this out into a function?
            metrics::counter!("state.full_verifier.committed.block.count").increment(1);
            metrics::counter!("zcash.chain.verified.block.total").increment(1);

            metrics::gauge!("state.full_verifier.committed.block.height")
                .set(tip_block_height.0 as f64);

            // This height gauge is updated for both fully verified and checkpoint blocks.
            // These updates can't conflict, because this block write task makes sure that blocks
            // are committed in order.
            metrics::gauge!("zcash.chain.verified.block.height").set(tip_block_height.0 as f64);

            tracing::trace!("finished processing queued block");
        }

        // We're finished receiving non-finalized blocks from the state, and
        // done writing to the finalized state, so we can force it to shut down.
        finalized_state.db.shutdown(true);
        std::mem::drop(self.finalized_state);
    }
}
476
477/// Log a message if this block was mined by Zebra.
478///
479/// Does not detect early Zebra blocks, and blocks with custom coinbase transactions.
480/// Rate-limited to every 1000 blocks using `last_zebra_mined_log_height`.
481fn log_if_mined_by_zebra(
482    tip_block: &ChainTipBlock,
483    last_zebra_mined_log_height: &mut Option<Height>,
484) {
485    // This logs at most every 2-3 checkpoints, which seems fine.
486    const LOG_RATE_LIMIT: u32 = 1000;
487
488    let height = tip_block.height.0;
489
490    if let Some(last_height) = last_zebra_mined_log_height {
491        if height < last_height.0 + LOG_RATE_LIMIT {
492            // If we logged in the last 1000 blocks, don't log anything now.
493            return;
494        }
495    };
496
497    // This code is rate-limited, so we can do expensive transformations here.
498    let coinbase_data = tip_block.transactions[0].inputs()[0]
499        .extra_coinbase_data()
500        .expect("valid blocks must start with a coinbase input")
501        .clone();
502
503    if coinbase_data
504        .as_ref()
505        .starts_with(EXTRA_ZEBRA_COINBASE_DATA.as_bytes())
506    {
507        let text = String::from_utf8_lossy(coinbase_data.as_ref());
508
509        *last_zebra_mined_log_height = Some(Height(height));
510
511        // No need for hex-encoded data if it's exactly what we expected.
512        if coinbase_data.as_ref() == EXTRA_ZEBRA_COINBASE_DATA.as_bytes() {
513            info!(
514                %text,
515                %height,
516                hash = %tip_block.hash,
517                "looks like this block was mined by Zebra!"
518            );
519        } else {
520            // # Security
521            //
522            // Use the extra data as an allow-list, replacing unknown characters.
523            // This makes sure control characters and harmful messages don't get logged
524            // to the terminal.
525            let text = text.replace(
526                |c: char| {
527                    !EXTRA_ZEBRA_COINBASE_DATA
528                        .to_ascii_lowercase()
529                        .contains(c.to_ascii_lowercase())
530                },
531                "?",
532            );
533            let data = hex::encode(coinbase_data.as_ref());
534
535            info!(
536                %text,
537                %data,
538                %height,
539                hash = %tip_block.hash,
540                "looks like this block was mined by Zebra!"
541            );
542        }
543    }
544}