Skip to main content

zebra_state/service/non_finalized_state/
backup.rs

1use std::{
2    collections::{BTreeMap, HashMap},
3    fs::DirEntry,
4    io::{self, ErrorKind},
5    path::{Path, PathBuf},
6    sync::Arc,
7    time::Duration,
8};
9
10use hex::ToHex;
11use zebra_chain::{
12    amount::{Amount, DeferredPoolBalanceChange},
13    block::{self, Block, Height},
14    serialization::{ZcashDeserializeInto, ZcashSerialize},
15};
16
17use crate::{
18    ContextuallyVerifiedBlock, IntoDisk, NonFinalizedState, SemanticallyVerifiedBlock,
19    WatchReceiver, ZebraDb,
20};
21
22#[cfg(not(test))]
23use crate::service::write::validate_and_commit_non_finalized;
24
/// Lower bound on the time between two consecutive updates of the non-finalized state
/// backup cache.
///
/// The backup task waits at least this long before each update, even when the
/// non-finalized state changes more often.
pub(crate) const MIN_DURATION_BETWEEN_BACKUP_UPDATES: Duration = Duration::from_secs(5);
27
28/// Accepts an optional path to the non-finalized state backup directory and a handle to the database.
29///
30/// Looks for blocks above the finalized tip height in the backup directory (if a path was provided) and
31/// attempts to commit them to the non-finalized state.
32///
33/// Returns the resulting non-finalized state.
34pub(super) fn restore_backup(
35    mut non_finalized_state: NonFinalizedState,
36    backup_dir_path: &Path,
37    finalized_state: &ZebraDb,
38) -> NonFinalizedState {
39    let mut store: BTreeMap<Height, Vec<SemanticallyVerifiedBlock>> = BTreeMap::new();
40
41    for block in read_non_finalized_blocks_from_backup(backup_dir_path, finalized_state) {
42        store.entry(block.height).or_default().push(block);
43    }
44
45    for (height, blocks) in store {
46        for block in blocks {
47            #[cfg(test)]
48            let commit_result = if non_finalized_state
49                .any_chain_contains(&block.block.header.previous_block_hash)
50            {
51                non_finalized_state.commit_block(block, finalized_state)
52            } else {
53                non_finalized_state.commit_new_chain(block, finalized_state)
54            };
55
56            #[cfg(not(test))]
57            let commit_result =
58                validate_and_commit_non_finalized(finalized_state, &mut non_finalized_state, block);
59
60            // Re-computes the block hash in case the hash from the filename is wrong.
61            if let Err(commit_error) = commit_result {
62                tracing::warn!(
63                    ?commit_error,
64                    ?height,
65                    "failed to commit non-finalized block from backup directory"
66                );
67            }
68        }
69    }
70
71    non_finalized_state
72}
73
74/// Updates the non-finalized state backup cache by writing any blocks that are in the
75/// non-finalized state but missing in the backup cache, and deleting any backup files
76/// that are no longer present in the non-finalized state.
77///
78/// `backup_blocks` should be the current contents of the backup directory, obtained by
79/// calling [`list_backup_dir_entries`] before the non-finalized state was updated.
80///
81/// This function performs blocking I/O and should be called from a blocking context,
82/// or wrapped in [`tokio::task::spawn_blocking`].
83pub(super) fn update_non_finalized_state_backup(
84    backup_dir_path: &Path,
85    non_finalized_state: &NonFinalizedState,
86    mut backup_blocks: HashMap<block::Hash, PathBuf>,
87) {
88    for block in non_finalized_state
89        .chain_iter()
90        .flat_map(|chain| chain.blocks.values())
91        // Remove blocks from `backup_blocks` that are present in the non-finalized state
92        .filter(|block| backup_blocks.remove(&block.hash).is_none())
93    {
94        // This loop will typically iterate only once, but may write multiple blocks if it misses
95        // some non-finalized state changes while waiting for I/O ops.
96        write_backup_block(backup_dir_path, block);
97    }
98
99    // Remove any backup blocks that are not present in the non-finalized state
100    for (_, outdated_backup_block_path) in backup_blocks {
101        if let Err(delete_error) = std::fs::remove_file(outdated_backup_block_path) {
102            tracing::warn!(?delete_error, "failed to delete backup block file");
103        }
104    }
105}
106
107/// Updates the non-finalized state backup cache whenever the non-finalized state changes,
108/// deleting any outdated backup files and writing any blocks that are in the non-finalized
109/// state but missing in the backup cache.
110pub(super) async fn run_backup_task(
111    mut non_finalized_state_receiver: WatchReceiver<NonFinalizedState>,
112    backup_dir_path: PathBuf,
113) {
114    let err = loop {
115        let rate_limit = tokio::time::sleep(MIN_DURATION_BETWEEN_BACKUP_UPDATES);
116        let backup_blocks: HashMap<block::Hash, PathBuf> = {
117            let backup_dir_path = backup_dir_path.clone();
118            tokio::task::spawn_blocking(move || list_backup_dir_entries(&backup_dir_path))
119                .await
120                .expect("failed to join blocking task when reading in backup task")
121                .collect()
122        };
123
124        if let (Err(err), _) = tokio::join!(non_finalized_state_receiver.changed(), rate_limit) {
125            break err;
126        };
127
128        let latest_non_finalized_state = non_finalized_state_receiver.cloned_watch_data();
129
130        let backup_dir_path = backup_dir_path.clone();
131        tokio::task::spawn_blocking(move || {
132            update_non_finalized_state_backup(
133                &backup_dir_path,
134                &latest_non_finalized_state,
135                backup_blocks,
136            );
137        })
138        .await
139        .expect("failed to join blocking task when writing in backup task");
140    };
141
142    tracing::warn!(
143        ?err,
144        "got recv error waiting on non-finalized state change, is Zebra shutting down?"
145    )
146}
147
148#[derive(Clone, Debug, PartialEq, Eq)]
149struct NonFinalizedBlockBackup {
150    block: Arc<Block>,
151    deferred_pool_balance_change: Amount,
152}
153
154impl From<&ContextuallyVerifiedBlock> for NonFinalizedBlockBackup {
155    fn from(cv_block: &ContextuallyVerifiedBlock) -> Self {
156        Self {
157            block: cv_block.block.clone(),
158            deferred_pool_balance_change: cv_block.chain_value_pool_change.deferred_amount(),
159        }
160    }
161}
162
163impl NonFinalizedBlockBackup {
164    /// Encodes a [`NonFinalizedBlockBackup`] as a vector of bytes.
165    fn as_bytes(&self) -> Vec<u8> {
166        let block_bytes = self
167            .block
168            .zcash_serialize_to_vec()
169            .expect("verified block header version should be valid");
170
171        let deferred_pool_balance_change_bytes =
172            self.deferred_pool_balance_change.as_bytes().to_vec();
173
174        [deferred_pool_balance_change_bytes, block_bytes].concat()
175    }
176
177    /// Constructs a new [`NonFinalizedBlockBackup`] from a vector of bytes.
178    #[allow(clippy::unwrap_in_result)]
179    fn from_bytes(bytes: Vec<u8>) -> Result<Self, io::Error> {
180        let (deferred_pool_balance_change_bytes, block_bytes) = bytes
181            .split_at_checked(size_of::<Amount>())
182            .ok_or(io::Error::new(
183                ErrorKind::InvalidInput,
184                "input is too short",
185            ))?;
186
187        Ok(Self {
188            block: Arc::new(
189                block_bytes
190                    .zcash_deserialize_into()
191                    .map_err(|err| io::Error::new(ErrorKind::InvalidData, err))?,
192            ),
193            deferred_pool_balance_change: Amount::from_bytes(
194                deferred_pool_balance_change_bytes
195                    .try_into()
196                    .expect("slice from `split_at_checked()` should fit in [u8; 8]"),
197            )
198            .map_err(|err| io::Error::new(ErrorKind::InvalidData, err))?,
199        })
200    }
201}
202
203/// Writes a block to a file in the provided non-finalized state backup cache directory path.
204fn write_backup_block(backup_dir_path: &Path, block: &ContextuallyVerifiedBlock) {
205    let backup_block_file_name: String = block.hash.encode_hex();
206    let backup_block_file_path = backup_dir_path.join(backup_block_file_name);
207    let non_finalized_block_backup: NonFinalizedBlockBackup = block.into();
208
209    if let Err(err) = std::fs::write(
210        backup_block_file_path,
211        non_finalized_block_backup.as_bytes(),
212    ) {
213        tracing::warn!(?err, "failed to write non-finalized state backup block");
214    }
215}
216
217/// Reads blocks from the provided non-finalized state backup directory path.
218///
219/// Returns any blocks that are valid and not present in the finalized state.
220fn read_non_finalized_blocks_from_backup<'a>(
221    backup_dir_path: &Path,
222    finalized_state: &'a ZebraDb,
223) -> impl Iterator<Item = SemanticallyVerifiedBlock> + 'a {
224    list_backup_dir_entries(backup_dir_path)
225        // It's okay to leave the file here, the backup task will delete it as long as
226        // the block is not added to the non-finalized state.
227        .filter(|&(block_hash, _)| !finalized_state.contains_hash(block_hash))
228        .filter_map(|(block_hash, file_path)| match std::fs::read(file_path) {
229            Ok(block_bytes) => Some((block_hash, block_bytes)),
230            Err(err) => {
231                tracing::warn!(?err, "failed to open non-finalized state backup block file");
232                None
233            }
234        })
235        .filter_map(|(expected_block_hash, backup_block_file_contents)| {
236            match NonFinalizedBlockBackup::from_bytes(backup_block_file_contents) {
237                Ok(NonFinalizedBlockBackup {
238                    block,
239                    deferred_pool_balance_change,
240                }) if block.coinbase_height().is_some() => {
241                    let block = SemanticallyVerifiedBlock::from(block)
242                        .with_deferred_pool_balance_change(Some(DeferredPoolBalanceChange::new(
243                            deferred_pool_balance_change,
244                        )));
245                    if block.hash != expected_block_hash {
246                        tracing::warn!(
247                            block_hash = ?block.hash,
248                            ?expected_block_hash,
249                            "wrong block hash in file name"
250                        );
251                    }
252                    Some(block)
253                }
254                Ok(block) => {
255                    tracing::warn!(
256                        ?block,
257                        "invalid non-finalized backup block, missing coinbase height"
258                    );
259                    None
260                }
261                Err(err) => {
262                    tracing::warn!(
263                        ?err,
264                        "failed to deserialize non-finalized backup data into block"
265                    );
266                    None
267                }
268            }
269        })
270}
271
272/// Accepts a backup directory path, opens the directory, converts its entries
273/// filenames to block hashes, and deletes any entries with invalid file names.
274///
275/// # Panics
276///
277/// If the provided path cannot be opened as a directory.
278/// See [`read_backup_dir`] for more details.
279pub(super) fn list_backup_dir_entries(
280    backup_dir_path: &Path,
281) -> impl Iterator<Item = (block::Hash, PathBuf)> {
282    read_backup_dir(backup_dir_path).filter_map(process_backup_dir_entry)
283}
284
285/// Accepts a backup directory path and opens the directory.
286///
287/// Returns an iterator over all [`DirEntry`]s in the directory that are successfully read.
288///
289/// # Panics
290///
291/// If the provided path cannot be opened as a directory.
292fn read_backup_dir(backup_dir_path: &Path) -> impl Iterator<Item = DirEntry> {
293    std::fs::read_dir(backup_dir_path)
294        .expect("failed to read non-finalized state backup directory")
295        .filter_map(|entry| match entry {
296            Ok(entry) => Some(entry),
297            Err(io_err) => {
298                tracing::warn!(
299                    ?io_err,
300                    "failed to read DirEntry in non-finalized state backup dir"
301                );
302
303                None
304            }
305        })
306}
307
308/// Accepts a [`DirEntry`] from the non-finalized state backup directory and
309/// parses the filename into a block hash.
310///
311/// Returns the block hash and the file path if successful, or
312/// returns None and deletes the file at the entry path otherwise.
313fn process_backup_dir_entry(entry: DirEntry) -> Option<(block::Hash, PathBuf)> {
314    let delete_file = || {
315        if let Err(delete_error) = std::fs::remove_file(entry.path()) {
316            tracing::warn!(?delete_error, "failed to delete backup block file");
317        }
318    };
319
320    let block_file_name = match entry.file_name().into_string() {
321        Ok(block_hash) => block_hash,
322        Err(err) => {
323            tracing::warn!(
324                ?err,
325                "failed to convert OsString to String, attempting to delete file"
326            );
327
328            delete_file();
329            return None;
330        }
331    };
332
333    let block_hash: block::Hash = match block_file_name.parse() {
334        Ok(block_hash) => block_hash,
335        Err(err) => {
336            tracing::warn!(
337                ?err,
338                "failed to parse hex-encoded block hash from file name, attempting to delete file"
339            );
340
341            delete_file();
342            return None;
343        }
344    };
345
346    Some((block_hash, entry.path()))
347}