Skip to main content

zebra_state/service/
write.rs

1//! Writing blocks to the finalized and non-finalized states.
2
3use std::sync::Arc;
4
5use indexmap::IndexMap;
6use tokio::sync::{
7    mpsc::{UnboundedReceiver, UnboundedSender},
8    oneshot, watch,
9};
10
11use tracing::Span;
12use zebra_chain::{
13    block::{self, Height},
14    transparent::EXTRA_ZEBRA_COINBASE_DATA,
15};
16
17use crate::{
18    constants::MAX_BLOCK_REORG_HEIGHT,
19    service::{
20        check,
21        finalized_state::{FinalizedState, ZebraDb},
22        non_finalized_state::NonFinalizedState,
23        queued_blocks::{QueuedCheckpointVerified, QueuedSemanticallyVerified},
24        ChainTipBlock, ChainTipSender, InvalidateError, ReconsiderError,
25    },
26    SemanticallyVerifiedBlock, ValidateContextError,
27};
28
29// These types are used in doc links
30#[allow(unused_imports)]
31use crate::service::{
32    chain_tip::{ChainTipChange, LatestChainTip},
33    non_finalized_state::Chain,
34};
35
36/// The maximum size of the parent error map.
37///
38/// We allow enough space for multiple concurrent chain forks with errors.
39const PARENT_ERROR_MAP_LIMIT: usize = MAX_BLOCK_REORG_HEIGHT as usize * 2;
40
41/// Run contextual validation on the prepared block and add it to the
42/// non-finalized state if it is contextually valid.
43#[tracing::instrument(
44    level = "debug",
45    skip(finalized_state, non_finalized_state, prepared),
46    fields(
47        height = ?prepared.height,
48        hash = %prepared.hash,
49        chains = non_finalized_state.chain_count()
50    )
51)]
52pub(crate) fn validate_and_commit_non_finalized(
53    finalized_state: &ZebraDb,
54    non_finalized_state: &mut NonFinalizedState,
55    prepared: SemanticallyVerifiedBlock,
56) -> Result<(), ValidateContextError> {
57    check::initial_contextual_validity(finalized_state, non_finalized_state, &prepared)?;
58    let parent_hash = prepared.block.header.previous_block_hash;
59
60    if finalized_state.finalized_tip_hash() == parent_hash {
61        non_finalized_state.commit_new_chain(prepared, finalized_state)?;
62    } else {
63        non_finalized_state.commit_block(prepared, finalized_state)?;
64    }
65
66    Ok(())
67}
68
69/// Update the [`LatestChainTip`], [`ChainTipChange`], and `non_finalized_state_sender`
70/// channels with the latest non-finalized [`ChainTipBlock`] and
71/// [`Chain`].
72///
73/// `last_zebra_mined_log_height` is used to rate-limit logging.
74///
75/// Returns the latest non-finalized chain tip height.
76///
77/// # Panics
78///
79/// If the `non_finalized_state` is empty.
80#[instrument(
81    level = "debug",
82    skip(
83        non_finalized_state,
84        chain_tip_sender,
85        non_finalized_state_sender,
86        last_zebra_mined_log_height
87    ),
88    fields(chains = non_finalized_state.chain_count())
89)]
90fn update_latest_chain_channels(
91    non_finalized_state: &NonFinalizedState,
92    chain_tip_sender: &mut ChainTipSender,
93    non_finalized_state_sender: &watch::Sender<NonFinalizedState>,
94    last_zebra_mined_log_height: &mut Option<Height>,
95) -> block::Height {
96    let best_chain = non_finalized_state.best_chain().expect("unexpected empty non-finalized state: must commit at least one block before updating channels");
97
98    let tip_block = best_chain
99        .tip_block()
100        .expect("unexpected empty chain: must commit at least one block before updating channels")
101        .clone();
102    let tip_block = ChainTipBlock::from(tip_block);
103
104    log_if_mined_by_zebra(&tip_block, last_zebra_mined_log_height);
105
106    let tip_block_height = tip_block.height;
107
108    // If the final receiver was just dropped, ignore the error.
109    let _ = non_finalized_state_sender.send(non_finalized_state.clone());
110
111    chain_tip_sender.set_best_non_finalized_tip(tip_block);
112
113    tip_block_height
114}
115
/// A worker task that reads, validates, and writes blocks to the
/// `finalized_state` or `non_finalized_state`.
///
/// Constructed by [`BlockWriteSender::spawn`], which runs [`WriteBlockWorkerTask::run`]
/// on a newly spawned thread.
struct WriteBlockWorkerTask {
    /// Checkpoint-verified blocks to commit directly to the finalized state.
    ///
    /// These are expected in height order. `run` drops any block that is not at the next
    /// finalized height.
    finalized_block_write_receiver: UnboundedReceiver<QueuedCheckpointVerified>,
    /// Commit, invalidate, and reconsider requests for the non-finalized state.
    ///
    /// `run` only starts reading this channel after the finalized block channel is closed.
    non_finalized_block_write_receiver: UnboundedReceiver<NonFinalizedWriteMessage>,
    /// The finalized state that checkpoint-verified blocks, and blocks finalized out of the
    /// non-finalized state, are written to.
    finalized_state: FinalizedState,
    /// The non-finalized state that semantically verified blocks are validated against and
    /// committed to.
    non_finalized_state: NonFinalizedState,
    /// Receives the finalized tip hash after a finalized block commit fails.
    ///
    /// `run` also treats this channel being closed as a signal to stop.
    invalid_block_reset_sender: UnboundedSender<block::Hash>,
    /// Updated with each new finalized tip and best non-finalized tip.
    chain_tip_sender: ChainTipSender,
    /// Receives a clone of the non-finalized state whenever the latest chain channels are updated.
    non_finalized_state_sender: watch::Sender<NonFinalizedState>,
}
127
/// The message type for the non-finalized block write task channel.
///
/// The block write task handles these messages in the order they are received. It only starts
/// handling them after it has finished writing checkpoint-verified blocks to the finalized state.
pub enum NonFinalizedWriteMessage {
    /// A newly downloaded and semantically verified block prepared for
    /// contextual validation and insertion into the non-finalized state.
    ///
    /// The queued value also carries the channel that receives the commit result.
    Commit(QueuedSemanticallyVerified),
    /// The hash of a block that should be invalidated and removed from
    /// the non-finalized state, if present.
    Invalidate {
        /// The hash of the block to invalidate.
        hash: block::Hash,
        /// Receives the result of `NonFinalizedState::invalidate_block`.
        rsp_tx: oneshot::Sender<Result<block::Hash, InvalidateError>>,
    },
    /// The hash of a block that was previously invalidated but should be
    /// reconsidered and reinserted into the non-finalized state.
    Reconsider {
        /// The hash of the previously invalidated block to reconsider.
        hash: block::Hash,
        /// Receives the result of `NonFinalizedState::reconsider_block`.
        rsp_tx: oneshot::Sender<Result<Vec<block::Hash>, ReconsiderError>>,
    },
}
146
147impl From<QueuedSemanticallyVerified> for NonFinalizedWriteMessage {
148    fn from(block: QueuedSemanticallyVerified) -> Self {
149        NonFinalizedWriteMessage::Commit(block)
150    }
151}
152
/// The channels used to send blocks to the block write worker task.
///
/// The worker reads, validates, and writes blocks to the `finalized_state` or
/// `non_finalized_state`.
///
/// Cloning this struct clones the underlying senders. A channel only closes once every clone
/// of its sender has been dropped.
#[derive(Clone, Debug)]
pub(super) struct BlockWriteSender {
    /// A channel to send blocks to the `block_write_task`,
    /// so they can be written to the [`NonFinalizedState`].
    ///
    /// It also carries invalidate and reconsider requests for the non-finalized state.
    pub non_finalized: Option<tokio::sync::mpsc::UnboundedSender<NonFinalizedWriteMessage>>,

    /// A channel to send blocks to the `block_write_task`,
    /// so they can be written to the [`FinalizedState`].
    ///
    /// This sender is dropped after the state has finished sending all the checkpointed blocks,
    /// and the lowest semantically verified block arrives.
    ///
    /// It is already `None` when `spawn` is called with
    /// `should_use_finalized_block_write_sender == false`.
    pub finalized: Option<tokio::sync::mpsc::UnboundedSender<QueuedCheckpointVerified>>,
}
169
170impl BlockWriteSender {
171    /// Creates a new [`BlockWriteSender`] with the given receivers and states.
172    #[instrument(
173        level = "debug",
174        skip_all,
175        fields(
176            network = %non_finalized_state.network
177        )
178    )]
179    pub fn spawn(
180        finalized_state: FinalizedState,
181        non_finalized_state: NonFinalizedState,
182        chain_tip_sender: ChainTipSender,
183        non_finalized_state_sender: watch::Sender<NonFinalizedState>,
184        should_use_finalized_block_write_sender: bool,
185    ) -> (
186        Self,
187        tokio::sync::mpsc::UnboundedReceiver<block::Hash>,
188        Option<Arc<std::thread::JoinHandle<()>>>,
189    ) {
190        // Security: The number of blocks in these channels is limited by
191        //           the syncer and inbound lookahead limits.
192        let (non_finalized_block_write_sender, non_finalized_block_write_receiver) =
193            tokio::sync::mpsc::unbounded_channel();
194        let (finalized_block_write_sender, finalized_block_write_receiver) =
195            tokio::sync::mpsc::unbounded_channel();
196        let (invalid_block_reset_sender, invalid_block_write_reset_receiver) =
197            tokio::sync::mpsc::unbounded_channel();
198
199        let span = Span::current();
200        let task = std::thread::spawn(move || {
201            span.in_scope(|| {
202                WriteBlockWorkerTask {
203                    finalized_block_write_receiver,
204                    non_finalized_block_write_receiver,
205                    finalized_state,
206                    non_finalized_state,
207                    invalid_block_reset_sender,
208                    chain_tip_sender,
209                    non_finalized_state_sender,
210                }
211                .run()
212            })
213        });
214
215        (
216            Self {
217                non_finalized: Some(non_finalized_block_write_sender),
218                finalized: Some(finalized_block_write_sender)
219                    .filter(|_| should_use_finalized_block_write_sender),
220            },
221            invalid_block_write_reset_receiver,
222            Some(Arc::new(task)),
223        )
224    }
225}
226
227impl WriteBlockWorkerTask {
228    /// Reads blocks from the channels, writes them to the `finalized_state` or `non_finalized_state`,
229    /// sends any errors on the `invalid_block_reset_sender`, then updates the `chain_tip_sender` and
230    /// `non_finalized_state_sender`.
231    #[instrument(
232        level = "debug",
233        skip(self),
234        fields(
235            network = %self.non_finalized_state.network
236        )
237    )]
238    pub fn run(mut self) {
239        let Self {
240            finalized_block_write_receiver,
241            non_finalized_block_write_receiver,
242            finalized_state,
243            non_finalized_state,
244            invalid_block_reset_sender,
245            chain_tip_sender,
246            non_finalized_state_sender,
247        } = &mut self;
248
249        let mut last_zebra_mined_log_height = None;
250        let mut prev_finalized_note_commitment_trees = None;
251
252        // Write all the finalized blocks sent by the state,
253        // until the state closes the finalized block channel's sender.
254        while let Some(ordered_block) = finalized_block_write_receiver.blocking_recv() {
255            // TODO: split these checks into separate functions
256
257            if invalid_block_reset_sender.is_closed() {
258                info!("StateService closed the block reset channel. Is Zebra shutting down?");
259                return;
260            }
261
262            // Discard any children of invalid blocks in the channel
263            //
264            // `commit_finalized()` requires blocks in height order.
265            // So if there has been a block commit error,
266            // we need to drop all the descendants of that block,
267            // until we receive a block at the required next height.
268            let next_valid_height = finalized_state
269                .db
270                .finalized_tip_height()
271                .map(|height| (height + 1).expect("committed heights are valid"))
272                .unwrap_or(Height(0));
273
274            if ordered_block.0.height != next_valid_height {
275                debug!(
276                    ?next_valid_height,
277                    invalid_height = ?ordered_block.0.height,
278                    invalid_hash = ?ordered_block.0.hash,
279                    "got a block that was the wrong height. \
280                     Assuming a parent block failed, and dropping this block",
281                );
282
283                // We don't want to send a reset here, because it could overwrite a valid sent hash
284                std::mem::drop(ordered_block);
285                continue;
286            }
287
288            // Try committing the block
289            match finalized_state
290                .commit_finalized(ordered_block, prev_finalized_note_commitment_trees.take())
291            {
292                Ok((finalized, note_commitment_trees)) => {
293                    let tip_block = ChainTipBlock::from(finalized);
294                    prev_finalized_note_commitment_trees = Some(note_commitment_trees);
295
296                    log_if_mined_by_zebra(&tip_block, &mut last_zebra_mined_log_height);
297
298                    chain_tip_sender.set_finalized_tip(tip_block);
299                }
300                Err(error) => {
301                    let finalized_tip = finalized_state.db.tip();
302
303                    // The last block in the queue failed, so we can't commit the next block.
304                    // Instead, we need to reset the state queue,
305                    // and discard any children of the invalid block in the channel.
306                    info!(
307                        ?error,
308                        last_valid_height = ?finalized_tip.map(|tip| tip.0),
309                        last_valid_hash = ?finalized_tip.map(|tip| tip.1),
310                        "committing a block to the finalized state failed, resetting state queue",
311                    );
312
313                    let send_result =
314                        invalid_block_reset_sender.send(finalized_state.db.finalized_tip_hash());
315
316                    if send_result.is_err() {
317                        info!(
318                            "StateService closed the block reset channel. Is Zebra shutting down?"
319                        );
320                        return;
321                    }
322                }
323            }
324        }
325
326        // Do this check even if the channel got closed before any finalized blocks were sent.
327        // This can happen if we're past the finalized tip.
328        if invalid_block_reset_sender.is_closed() {
329            info!("StateService closed the block reset channel. Is Zebra shutting down?");
330            return;
331        }
332
333        // Save any errors to propagate down to queued child blocks
334        let mut parent_error_map: IndexMap<block::Hash, ValidateContextError> = IndexMap::new();
335
336        while let Some(msg) = non_finalized_block_write_receiver.blocking_recv() {
337            let queued_child_and_rsp_tx = match msg {
338                NonFinalizedWriteMessage::Commit(queued_child) => Some(queued_child),
339                NonFinalizedWriteMessage::Invalidate { hash, rsp_tx } => {
340                    tracing::info!(?hash, "invalidating a block in the non-finalized state");
341                    let _ = rsp_tx.send(non_finalized_state.invalidate_block(hash));
342                    None
343                }
344                NonFinalizedWriteMessage::Reconsider { hash, rsp_tx } => {
345                    tracing::info!(?hash, "reconsidering a block in the non-finalized state");
346                    let _ = rsp_tx
347                        .send(non_finalized_state.reconsider_block(hash, &finalized_state.db));
348                    None
349                }
350            };
351
352            let Some((queued_child, rsp_tx)) = queued_child_and_rsp_tx else {
353                update_latest_chain_channels(
354                    non_finalized_state,
355                    chain_tip_sender,
356                    non_finalized_state_sender,
357                    &mut last_zebra_mined_log_height,
358                );
359                continue;
360            };
361
362            let child_hash = queued_child.hash;
363            let parent_hash = queued_child.block.header.previous_block_hash;
364            let parent_error = parent_error_map.get(&parent_hash);
365
366            // If the parent block was marked as rejected, also reject all its children.
367            //
368            // At this point, we know that all the block's descendants
369            // are invalid, because we checked all the consensus rules before
370            // committing the failing ancestor block to the non-finalized state.
371            let result = if let Some(parent_error) = parent_error {
372                Err(parent_error.clone())
373            } else {
374                tracing::trace!(?child_hash, "validating queued child");
375                validate_and_commit_non_finalized(
376                    &finalized_state.db,
377                    non_finalized_state,
378                    queued_child,
379                )
380            };
381
382            // TODO: fix the test timing bugs that require the result to be sent
383            //       after `update_latest_chain_channels()`,
384            //       and send the result on rsp_tx here
385
386            if let Err(ref error) = result {
387                // If the block is invalid, mark any descendant blocks as rejected.
388                parent_error_map.insert(child_hash, error.clone());
389
390                // Make sure the error map doesn't get too big.
391                if parent_error_map.len() > PARENT_ERROR_MAP_LIMIT {
392                    // We only add one hash at a time, so we only need to remove one extra here.
393                    parent_error_map.shift_remove_index(0);
394                }
395
396                // Update the caller with the error.
397                let _ = rsp_tx.send(result.map(|()| child_hash).map_err(Into::into));
398
399                // Skip the things we only need to do for successfully committed blocks
400                continue;
401            }
402
403            // Committing blocks to the finalized state keeps the same chain,
404            // so we can update the chain seen by the rest of the application now.
405            //
406            // TODO: if this causes state request errors due to chain conflicts,
407            //       fix the `service::read` bugs,
408            //       or do the channel update after the finalized state commit
409            let tip_block_height = update_latest_chain_channels(
410                non_finalized_state,
411                chain_tip_sender,
412                non_finalized_state_sender,
413                &mut last_zebra_mined_log_height,
414            );
415
416            // Update the caller with the result.
417            let _ = rsp_tx.send(result.map(|()| child_hash).map_err(Into::into));
418
419            while non_finalized_state
420                .best_chain_len()
421                .expect("just successfully inserted a non-finalized block above")
422                > MAX_BLOCK_REORG_HEIGHT
423            {
424                tracing::trace!("finalizing block past the reorg limit");
425                let contextually_verified_with_trees = non_finalized_state.finalize();
426                prev_finalized_note_commitment_trees = finalized_state
427                            .commit_finalized_direct(contextually_verified_with_trees, prev_finalized_note_commitment_trees.take(), "commit contextually-verified request")
428                            .expect(
429                                "unexpected finalized block commit error: note commitment and history trees were already checked by the non-finalized state",
430                            ).1.into();
431            }
432
433            // Update the metrics if semantic and contextual validation passes
434            //
435            // TODO: split this out into a function?
436            metrics::counter!("state.full_verifier.committed.block.count").increment(1);
437            metrics::counter!("zcash.chain.verified.block.total").increment(1);
438
439            metrics::gauge!("state.full_verifier.committed.block.height")
440                .set(tip_block_height.0 as f64);
441
442            // This height gauge is updated for both fully verified and checkpoint blocks.
443            // These updates can't conflict, because this block write task makes sure that blocks
444            // are committed in order.
445            metrics::gauge!("zcash.chain.verified.block.height").set(tip_block_height.0 as f64);
446
447            tracing::trace!("finished processing queued block");
448        }
449
450        // We're finished receiving non-finalized blocks from the state, and
451        // done writing to the finalized state, so we can force it to shut down.
452        finalized_state.db.shutdown(true);
453        std::mem::drop(self.finalized_state);
454    }
455}
456
457/// Log a message if this block was mined by Zebra.
458///
459/// Does not detect early Zebra blocks, and blocks with custom coinbase transactions.
460/// Rate-limited to every 1000 blocks using `last_zebra_mined_log_height`.
461fn log_if_mined_by_zebra(
462    tip_block: &ChainTipBlock,
463    last_zebra_mined_log_height: &mut Option<Height>,
464) {
465    // This logs at most every 2-3 checkpoints, which seems fine.
466    const LOG_RATE_LIMIT: u32 = 1000;
467
468    let height = tip_block.height.0;
469
470    if let Some(last_height) = last_zebra_mined_log_height {
471        if height < last_height.0 + LOG_RATE_LIMIT {
472            // If we logged in the last 1000 blocks, don't log anything now.
473            return;
474        }
475    };
476
477    // This code is rate-limited, so we can do expensive transformations here.
478    let coinbase_data = tip_block.transactions[0].inputs()[0]
479        .extra_coinbase_data()
480        .expect("valid blocks must start with a coinbase input")
481        .clone();
482
483    if coinbase_data
484        .as_ref()
485        .starts_with(EXTRA_ZEBRA_COINBASE_DATA.as_bytes())
486    {
487        let text = String::from_utf8_lossy(coinbase_data.as_ref());
488
489        *last_zebra_mined_log_height = Some(Height(height));
490
491        // No need for hex-encoded data if it's exactly what we expected.
492        if coinbase_data.as_ref() == EXTRA_ZEBRA_COINBASE_DATA.as_bytes() {
493            info!(
494                %text,
495                %height,
496                hash = %tip_block.hash,
497                "looks like this block was mined by Zebra!"
498            );
499        } else {
500            // # Security
501            //
502            // Use the extra data as an allow-list, replacing unknown characters.
503            // This makes sure control characters and harmful messages don't get logged
504            // to the terminal.
505            let text = text.replace(
506                |c: char| {
507                    !EXTRA_ZEBRA_COINBASE_DATA
508                        .to_ascii_lowercase()
509                        .contains(c.to_ascii_lowercase())
510                },
511                "?",
512            );
513            let data = hex::encode(coinbase_data.as_ref());
514
515            info!(
516                %text,
517                %data,
518                %height,
519                hash = %tip_block.hash,
520                "looks like this block was mined by Zebra!"
521            );
522        }
523    }
524}