Skip to main content

zebra_state/service/
write.rs

1//! Writing blocks to the finalized and non-finalized states.
2
3use std::{
4    path::{Path, PathBuf},
5    sync::Arc,
6};
7
8use indexmap::IndexMap;
9use tokio::sync::{
10    mpsc::{UnboundedReceiver, UnboundedSender},
11    oneshot, watch,
12};
13
14use tracing::Span;
15use zebra_chain::block::{self, Height};
16
17use crate::{
18    constants::MAX_BLOCK_REORG_HEIGHT,
19    service::{
20        check,
21        finalized_state::{FinalizedState, ZebraDb},
22        non_finalized_state::NonFinalizedState,
23        queued_blocks::{QueuedCheckpointVerified, QueuedSemanticallyVerified},
24        ChainTipBlock, ChainTipSender, InvalidateError, ReconsiderError,
25    },
26    SemanticallyVerifiedBlock, ValidateContextError,
27};
28
29// These types are used in doc links
30#[allow(unused_imports)]
31use crate::service::{
32    chain_tip::{ChainTipChange, LatestChainTip},
33    non_finalized_state::Chain,
34};
35
36/// The maximum size of the parent error map.
37///
38/// We allow enough space for multiple concurrent chain forks with errors.
39const PARENT_ERROR_MAP_LIMIT: usize = MAX_BLOCK_REORG_HEIGHT as usize * 2;
40
41/// Run contextual validation on the prepared block and add it to the
42/// non-finalized state if it is contextually valid.
43#[tracing::instrument(
44    level = "debug",
45    skip(finalized_state, non_finalized_state, prepared),
46    fields(
47        height = ?prepared.height,
48        hash = %prepared.hash,
49        chains = non_finalized_state.chain_count()
50    )
51)]
52pub(crate) fn validate_and_commit_non_finalized(
53    finalized_state: &ZebraDb,
54    non_finalized_state: &mut NonFinalizedState,
55    prepared: SemanticallyVerifiedBlock,
56) -> Result<(), ValidateContextError> {
57    check::initial_contextual_validity(finalized_state, non_finalized_state, &prepared)?;
58    let parent_hash = prepared.block.header.previous_block_hash;
59
60    if finalized_state.finalized_tip_hash() == parent_hash {
61        non_finalized_state.commit_new_chain(prepared, finalized_state)?;
62    } else {
63        non_finalized_state.commit_block(prepared, finalized_state)?;
64    }
65
66    Ok(())
67}
68
69/// Update the [`LatestChainTip`], [`ChainTipChange`], and `non_finalized_state_sender`
70/// channels with the latest non-finalized [`ChainTipBlock`] and
71/// [`Chain`].
72///
73/// `last_zebra_mined_log_height` is used to rate-limit logging.
74///
75/// If `backup_dir_path` is `Some`, the non-finalized state is written to the backup
76/// directory before updating the channels.
77///
78/// Returns the latest non-finalized chain tip height.
79///
80/// # Panics
81///
82/// If the `non_finalized_state` is empty.
83#[instrument(
84    level = "debug",
85    skip(
86        non_finalized_state,
87        chain_tip_sender,
88        non_finalized_state_sender,
89        backup_dir_path,
90    ),
91    fields(chains = non_finalized_state.chain_count())
92)]
93fn update_latest_chain_channels(
94    non_finalized_state: &NonFinalizedState,
95    chain_tip_sender: &mut ChainTipSender,
96    non_finalized_state_sender: &watch::Sender<NonFinalizedState>,
97    backup_dir_path: Option<&Path>,
98) -> block::Height {
99    let best_chain = non_finalized_state.best_chain().expect("unexpected empty non-finalized state: must commit at least one block before updating channels");
100
101    let tip_block = best_chain
102        .tip_block()
103        .expect("unexpected empty chain: must commit at least one block before updating channels")
104        .clone();
105    let tip_block = ChainTipBlock::from(tip_block);
106
107    let tip_block_height = tip_block.height;
108
109    if let Some(backup_dir_path) = backup_dir_path {
110        non_finalized_state.write_to_backup(backup_dir_path);
111    }
112
113    // If the final receiver was just dropped, ignore the error.
114    let _ = non_finalized_state_sender.send(non_finalized_state.clone());
115
116    chain_tip_sender.set_best_non_finalized_tip(tip_block);
117
118    tip_block_height
119}
120
/// A worker task that reads, validates, and writes blocks to the
/// `finalized_state` or `non_finalized_state`.
///
/// The task is built by [`BlockWriteSender::spawn`] and consumed by `run()`
/// on a separate thread.
struct WriteBlockWorkerTask {
    /// Receives checkpoint-verified blocks to commit to the `finalized_state`.
    ///
    /// `run()` drains this channel first, until it is closed.
    finalized_block_write_receiver: UnboundedReceiver<QueuedCheckpointVerified>,
    /// Receives commit, invalidate, and reconsider requests for the `non_finalized_state`.
    non_finalized_block_write_receiver: UnboundedReceiver<NonFinalizedWriteMessage>,
    /// The finalized state that checkpoint-verified blocks, and blocks past the reorg limit,
    /// are committed to.
    finalized_state: FinalizedState,
    /// The non-finalized state that semantically verified blocks are committed to.
    non_finalized_state: NonFinalizedState,
    /// Sends the current finalized tip hash when committing a finalized block fails,
    /// so the state queue can be reset.
    ///
    /// `run()` also returns early when it finds this channel closed.
    invalid_block_reset_sender: UnboundedSender<block::Hash>,
    /// Signals the [`crate::service::StateService`] that a non-finalized block was rejected by
    /// the write task, so its hash should be removed from
    /// `non_finalized_block_write_sent_hashes`.
    ///
    /// Without this, a rejected same-hash block locks out a later honest
    /// re-delivery of a block at the same hash as a "duplicate" until restart
    /// or reorg.
    non_finalized_rejected_sender: UnboundedSender<block::Hash>,
    /// Updated with the finalized tip after each finalized commit, and with the best
    /// non-finalized tip after each non-finalized channel update.
    chain_tip_sender: ChainTipSender,
    /// Receives a clone of the `non_finalized_state` on each non-finalized channel update.
    non_finalized_state_sender: watch::Sender<NonFinalizedState>,
    /// If `Some`, the non-finalized state is written to this backup directory
    /// synchronously before each channel update, instead of via the async backup task.
    backup_dir_path: Option<PathBuf>,
}
143
/// The message type for the non-finalized block write task channel.
///
/// Each message is handled in order by the block write task.
pub enum NonFinalizedWriteMessage {
    /// A newly downloaded and semantically verified block prepared for
    /// contextual validation and insertion into the non-finalized state.
    Commit(QueuedSemanticallyVerified),
    /// The hash of a block that should be invalidated and removed from
    /// the non-finalized state, if present.
    Invalidate {
        /// The hash of the block to invalidate.
        hash: block::Hash,
        /// The channel that receives the result of the invalidation.
        rsp_tx: oneshot::Sender<Result<block::Hash, InvalidateError>>,
    },
    /// The hash of a block that was previously invalidated but should be
    /// reconsidered and reinserted into the non-finalized state.
    Reconsider {
        /// The hash of the block to reconsider.
        hash: block::Hash,
        /// The channel that receives the result of the reconsideration.
        rsp_tx: oneshot::Sender<Result<Vec<block::Hash>, ReconsiderError>>,
    },
}
162
163impl From<QueuedSemanticallyVerified> for NonFinalizedWriteMessage {
164    fn from(block: QueuedSemanticallyVerified) -> Self {
165        NonFinalizedWriteMessage::Commit(block)
166    }
167}
168
/// The sending halves of the channels used to pass blocks to the block write task,
/// which reads, validates, and writes them to the `finalized_state` or
/// `non_finalized_state`.
#[derive(Clone, Debug)]
pub(super) struct BlockWriteSender {
    /// A channel to send blocks to the `block_write_task`,
    /// so they can be written to the [`NonFinalizedState`].
    pub non_finalized: Option<tokio::sync::mpsc::UnboundedSender<NonFinalizedWriteMessage>>,

    /// A channel to send blocks to the `block_write_task`,
    /// so they can be written to the [`FinalizedState`].
    ///
    /// This sender is dropped after the state has finished sending all the checkpointed blocks,
    /// and the lowest semantically verified block arrives.
    pub finalized: Option<tokio::sync::mpsc::UnboundedSender<QueuedCheckpointVerified>>,
}
185
186impl BlockWriteSender {
187    /// Creates a new [`BlockWriteSender`] with the given receivers and states.
188    #[instrument(
189        level = "debug",
190        skip_all,
191        fields(
192            network = %non_finalized_state.network
193        )
194    )]
195    pub fn spawn(
196        finalized_state: FinalizedState,
197        non_finalized_state: NonFinalizedState,
198        chain_tip_sender: ChainTipSender,
199        non_finalized_state_sender: watch::Sender<NonFinalizedState>,
200        should_use_finalized_block_write_sender: bool,
201        backup_dir_path: Option<PathBuf>,
202    ) -> (
203        Self,
204        tokio::sync::mpsc::UnboundedReceiver<block::Hash>,
205        tokio::sync::mpsc::UnboundedReceiver<block::Hash>,
206        Option<Arc<std::thread::JoinHandle<()>>>,
207    ) {
208        // Security: The number of blocks in these channels is limited by
209        //           the syncer and inbound lookahead limits.
210        let (non_finalized_block_write_sender, non_finalized_block_write_receiver) =
211            tokio::sync::mpsc::unbounded_channel();
212        let (finalized_block_write_sender, finalized_block_write_receiver) =
213            tokio::sync::mpsc::unbounded_channel();
214        let (invalid_block_reset_sender, invalid_block_write_reset_receiver) =
215            tokio::sync::mpsc::unbounded_channel();
216        let (non_finalized_rejected_sender, non_finalized_rejected_receiver) =
217            tokio::sync::mpsc::unbounded_channel();
218
219        let span = Span::current();
220        let task = std::thread::spawn(move || {
221            span.in_scope(|| {
222                WriteBlockWorkerTask {
223                    finalized_block_write_receiver,
224                    non_finalized_block_write_receiver,
225                    finalized_state,
226                    non_finalized_state,
227                    invalid_block_reset_sender,
228                    non_finalized_rejected_sender,
229                    chain_tip_sender,
230                    non_finalized_state_sender,
231                    backup_dir_path,
232                }
233                .run()
234            })
235        });
236
237        (
238            Self {
239                non_finalized: Some(non_finalized_block_write_sender),
240                finalized: should_use_finalized_block_write_sender
241                    .then_some(finalized_block_write_sender),
242            },
243            invalid_block_write_reset_receiver,
244            non_finalized_rejected_receiver,
245            Some(Arc::new(task)),
246        )
247    }
248}
249
250impl WriteBlockWorkerTask {
251    /// Reads blocks from the channels, writes them to the `finalized_state` or `non_finalized_state`,
252    /// sends any errors on the `invalid_block_reset_sender`, then updates the `chain_tip_sender` and
253    /// `non_finalized_state_sender`.
254    #[instrument(
255        level = "debug",
256        skip(self),
257        fields(
258            network = %self.non_finalized_state.network
259        )
260    )]
261    pub fn run(mut self) {
262        let Self {
263            finalized_block_write_receiver,
264            non_finalized_block_write_receiver,
265            finalized_state,
266            non_finalized_state,
267            invalid_block_reset_sender,
268            non_finalized_rejected_sender,
269            chain_tip_sender,
270            non_finalized_state_sender,
271            backup_dir_path,
272        } = &mut self;
273
274        let mut prev_finalized_note_commitment_trees = None;
275
276        // Write all the finalized blocks sent by the state,
277        // until the state closes the finalized block channel's sender.
278        while let Some(ordered_block) = finalized_block_write_receiver.blocking_recv() {
279            // TODO: split these checks into separate functions
280
281            if invalid_block_reset_sender.is_closed() {
282                info!("StateService closed the block reset channel. Is Zebra shutting down?");
283                return;
284            }
285
286            // Discard any children of invalid blocks in the channel
287            //
288            // `commit_finalized()` requires blocks in height order.
289            // So if there has been a block commit error,
290            // we need to drop all the descendants of that block,
291            // until we receive a block at the required next height.
292            let next_valid_height = finalized_state
293                .db
294                .finalized_tip_height()
295                .map(|height| (height + 1).expect("committed heights are valid"))
296                .unwrap_or(Height(0));
297
298            if ordered_block.0.height != next_valid_height {
299                debug!(
300                    ?next_valid_height,
301                    invalid_height = ?ordered_block.0.height,
302                    invalid_hash = ?ordered_block.0.hash,
303                    "got a block that was the wrong height. \
304                     Assuming a parent block failed, and dropping this block",
305                );
306
307                // We don't want to send a reset here, because it could overwrite a valid sent hash
308                std::mem::drop(ordered_block);
309                continue;
310            }
311
312            // Try committing the block
313            match finalized_state
314                .commit_finalized(ordered_block, prev_finalized_note_commitment_trees.take())
315            {
316                Ok((finalized, note_commitment_trees)) => {
317                    let tip_block = ChainTipBlock::from(finalized);
318                    prev_finalized_note_commitment_trees = Some(note_commitment_trees);
319                    chain_tip_sender.set_finalized_tip(tip_block);
320                }
321                Err(error) => {
322                    let finalized_tip = finalized_state.db.tip();
323
324                    // The last block in the queue failed, so we can't commit the next block.
325                    // Instead, we need to reset the state queue,
326                    // and discard any children of the invalid block in the channel.
327                    info!(
328                        ?error,
329                        last_valid_height = ?finalized_tip.map(|tip| tip.0),
330                        last_valid_hash = ?finalized_tip.map(|tip| tip.1),
331                        "committing a block to the finalized state failed, resetting state queue",
332                    );
333
334                    let send_result =
335                        invalid_block_reset_sender.send(finalized_state.db.finalized_tip_hash());
336
337                    if send_result.is_err() {
338                        info!(
339                            "StateService closed the block reset channel. Is Zebra shutting down?"
340                        );
341                        return;
342                    }
343                }
344            }
345        }
346
347        // Do this check even if the channel got closed before any finalized blocks were sent.
348        // This can happen if we're past the finalized tip.
349        if invalid_block_reset_sender.is_closed() {
350            info!("StateService closed the block reset channel. Is Zebra shutting down?");
351            return;
352        }
353
354        // Save any errors to propagate down to queued child blocks
355        let mut parent_error_map: IndexMap<block::Hash, ValidateContextError> = IndexMap::new();
356
357        while let Some(msg) = non_finalized_block_write_receiver.blocking_recv() {
358            let queued_child_and_rsp_tx = match msg {
359                NonFinalizedWriteMessage::Commit(queued_child) => Some(queued_child),
360                NonFinalizedWriteMessage::Invalidate { hash, rsp_tx } => {
361                    tracing::info!(?hash, "invalidating a block in the non-finalized state");
362                    let _ = rsp_tx.send(non_finalized_state.invalidate_block(hash));
363                    None
364                }
365                NonFinalizedWriteMessage::Reconsider { hash, rsp_tx } => {
366                    tracing::info!(?hash, "reconsidering a block in the non-finalized state");
367                    let _ = rsp_tx
368                        .send(non_finalized_state.reconsider_block(hash, &finalized_state.db));
369                    None
370                }
371            };
372
373            let Some((queued_child, rsp_tx)) = queued_child_and_rsp_tx else {
374                update_latest_chain_channels(
375                    non_finalized_state,
376                    chain_tip_sender,
377                    non_finalized_state_sender,
378                    backup_dir_path.as_deref(),
379                );
380                continue;
381            };
382
383            let child_hash = queued_child.hash;
384            let parent_hash = queued_child.block.header.previous_block_hash;
385            let parent_error = parent_error_map.get(&parent_hash);
386
387            // If the parent block was marked as rejected, also reject all its children.
388            //
389            // At this point, we know that all the block's descendants
390            // are invalid, because we checked all the consensus rules before
391            // committing the failing ancestor block to the non-finalized state.
392            let result = if let Some(parent_error) = parent_error {
393                Err(parent_error.clone())
394            } else {
395                tracing::trace!(?child_hash, "validating queued child");
396                validate_and_commit_non_finalized(
397                    &finalized_state.db,
398                    non_finalized_state,
399                    queued_child,
400                )
401            };
402
403            // TODO: fix the test timing bugs that require the result to be sent
404            //       after `update_latest_chain_channels()`,
405            //       and send the result on rsp_tx here
406
407            if let Err(ref error) = result {
408                // If the block is invalid, mark any descendant blocks as rejected.
409                parent_error_map.insert(child_hash, error.clone());
410
411                // Make sure the error map doesn't get too big.
412                if parent_error_map.len() > PARENT_ERROR_MAP_LIMIT {
413                    // We only add one hash at a time, so we only need to remove one extra here.
414                    parent_error_map.shift_remove_index(0);
415                }
416
417                // Signal the StateService to drop this hash from
418                // `non_finalized_block_write_sent_hashes`, so a subsequent
419                // re-delivery of a block at the same hash is not short-circuited
420                // as a "duplicate" against a rejected variant that never reached
421                // any chain.
422                //
423                // If the receiver was dropped (the StateService is shutting
424                // down), ignore the error: the lockout cannot matter once the
425                // service exits.
426                let _ = non_finalized_rejected_sender.send(child_hash);
427
428                // Update the caller with the error.
429                let _ = rsp_tx.send(result.map(|()| child_hash).map_err(Into::into));
430
431                // Skip the things we only need to do for successfully committed blocks
432                continue;
433            }
434
435            // Committing blocks to the finalized state keeps the same chain,
436            // so we can update the chain seen by the rest of the application now.
437            //
438            // TODO: if this causes state request errors due to chain conflicts,
439            //       fix the `service::read` bugs,
440            //       or do the channel update after the finalized state commit
441            let tip_block_height = update_latest_chain_channels(
442                non_finalized_state,
443                chain_tip_sender,
444                non_finalized_state_sender,
445                backup_dir_path.as_deref(),
446            );
447
448            // Update the caller with the result.
449            let _ = rsp_tx.send(result.map(|()| child_hash).map_err(Into::into));
450
451            while non_finalized_state
452                .best_chain_len()
453                .expect("just successfully inserted a non-finalized block above")
454                > MAX_BLOCK_REORG_HEIGHT
455            {
456                tracing::trace!("finalizing block past the reorg limit");
457                let contextually_verified_with_trees = non_finalized_state.finalize();
458                prev_finalized_note_commitment_trees = finalized_state
459                            .commit_finalized_direct(contextually_verified_with_trees, prev_finalized_note_commitment_trees.take(), "commit contextually-verified request")
460                            .expect(
461                                "unexpected finalized block commit error: note commitment and history trees were already checked by the non-finalized state",
462                            ).1.into();
463            }
464
465            // Update the metrics if semantic and contextual validation passes
466            //
467            // TODO: split this out into a function?
468            metrics::counter!("state.full_verifier.committed.block.count").increment(1);
469            metrics::counter!("zcash.chain.verified.block.total").increment(1);
470
471            metrics::gauge!("state.full_verifier.committed.block.height")
472                .set(tip_block_height.0 as f64);
473
474            // This height gauge is updated for both fully verified and checkpoint blocks.
475            // These updates can't conflict, because this block write task makes sure that blocks
476            // are committed in order.
477            metrics::gauge!("zcash.chain.verified.block.height").set(tip_block_height.0 as f64);
478
479            tracing::trace!("finished processing queued block");
480        }
481
482        // We're finished receiving non-finalized blocks from the state, and
483        // done writing to the finalized state, so we can force it to shut down.
484        finalized_state.db.shutdown(true);
485        std::mem::drop(self.finalized_state);
486    }
487}