zebra_state/service/non_finalized_state/
backup.rs1use std::{
2 collections::{BTreeMap, HashMap},
3 fs::DirEntry,
4 io::{self, ErrorKind},
5 path::{Path, PathBuf},
6 sync::Arc,
7 time::Duration,
8};
9
10use hex::ToHex;
11use zebra_chain::{
12 amount::{Amount, DeferredPoolBalanceChange},
13 block::{self, Block, Height},
14 serialization::{ZcashDeserializeInto, ZcashSerialize},
15};
16
17use crate::{
18 ContextuallyVerifiedBlock, IntoDisk, NonFinalizedState, SemanticallyVerifiedBlock,
19 WatchReceiver, ZebraDb,
20};
21
22#[cfg(not(test))]
23use crate::service::write::validate_and_commit_non_finalized;
24
/// The minimum amount of time that must pass between two updates of the
/// non-finalized state backup directory.
///
/// The backup task sleeps for this long on every iteration, which rate-limits
/// how often the backup directory is rewritten.
pub(crate) const MIN_DURATION_BETWEEN_BACKUP_UPDATES: Duration = Duration::from_secs(5);
27
28pub(super) fn restore_backup(
35 mut non_finalized_state: NonFinalizedState,
36 backup_dir_path: &Path,
37 finalized_state: &ZebraDb,
38) -> NonFinalizedState {
39 let mut store: BTreeMap<Height, Vec<SemanticallyVerifiedBlock>> = BTreeMap::new();
40
41 for block in read_non_finalized_blocks_from_backup(backup_dir_path, finalized_state) {
42 store.entry(block.height).or_default().push(block);
43 }
44
45 for (height, blocks) in store {
46 for block in blocks {
47 #[cfg(test)]
48 let commit_result = if non_finalized_state
49 .any_chain_contains(&block.block.header.previous_block_hash)
50 {
51 non_finalized_state.commit_block(block, finalized_state)
52 } else {
53 non_finalized_state.commit_new_chain(block, finalized_state)
54 };
55
56 #[cfg(not(test))]
57 let commit_result =
58 validate_and_commit_non_finalized(finalized_state, &mut non_finalized_state, block);
59
60 if let Err(commit_error) = commit_result {
62 tracing::warn!(
63 ?commit_error,
64 ?height,
65 "failed to commit non-finalized block from backup directory"
66 );
67 }
68 }
69 }
70
71 non_finalized_state
72}
73
74pub(super) fn update_non_finalized_state_backup(
84 backup_dir_path: &Path,
85 non_finalized_state: &NonFinalizedState,
86 mut backup_blocks: HashMap<block::Hash, PathBuf>,
87) {
88 for block in non_finalized_state
89 .chain_iter()
90 .flat_map(|chain| chain.blocks.values())
91 .filter(|block| backup_blocks.remove(&block.hash).is_none())
93 {
94 write_backup_block(backup_dir_path, block);
97 }
98
99 for (_, outdated_backup_block_path) in backup_blocks {
101 if let Err(delete_error) = std::fs::remove_file(outdated_backup_block_path) {
102 tracing::warn!(?delete_error, "failed to delete backup block file");
103 }
104 }
105}
106
107pub(super) async fn run_backup_task(
111 mut non_finalized_state_receiver: WatchReceiver<NonFinalizedState>,
112 backup_dir_path: PathBuf,
113) {
114 let err = loop {
115 let rate_limit = tokio::time::sleep(MIN_DURATION_BETWEEN_BACKUP_UPDATES);
116 let backup_blocks: HashMap<block::Hash, PathBuf> = {
117 let backup_dir_path = backup_dir_path.clone();
118 tokio::task::spawn_blocking(move || list_backup_dir_entries(&backup_dir_path))
119 .await
120 .expect("failed to join blocking task when reading in backup task")
121 .collect()
122 };
123
124 if let (Err(err), _) = tokio::join!(non_finalized_state_receiver.changed(), rate_limit) {
125 break err;
126 };
127
128 let latest_non_finalized_state = non_finalized_state_receiver.cloned_watch_data();
129
130 let backup_dir_path = backup_dir_path.clone();
131 tokio::task::spawn_blocking(move || {
132 update_non_finalized_state_backup(
133 &backup_dir_path,
134 &latest_non_finalized_state,
135 backup_blocks,
136 );
137 })
138 .await
139 .expect("failed to join blocking task when writing in backup task");
140 };
141
142 tracing::warn!(
143 ?err,
144 "got recv error waiting on non-finalized state change, is Zebra shutting down?"
145 )
146}
147
148#[derive(Clone, Debug, PartialEq, Eq)]
149struct NonFinalizedBlockBackup {
150 block: Arc<Block>,
151 deferred_pool_balance_change: Amount,
152}
153
154impl From<&ContextuallyVerifiedBlock> for NonFinalizedBlockBackup {
155 fn from(cv_block: &ContextuallyVerifiedBlock) -> Self {
156 Self {
157 block: cv_block.block.clone(),
158 deferred_pool_balance_change: cv_block.chain_value_pool_change.deferred_amount(),
159 }
160 }
161}
162
163impl NonFinalizedBlockBackup {
164 fn as_bytes(&self) -> Vec<u8> {
166 let block_bytes = self
167 .block
168 .zcash_serialize_to_vec()
169 .expect("verified block header version should be valid");
170
171 let deferred_pool_balance_change_bytes =
172 self.deferred_pool_balance_change.as_bytes().to_vec();
173
174 [deferred_pool_balance_change_bytes, block_bytes].concat()
175 }
176
177 #[allow(clippy::unwrap_in_result)]
179 fn from_bytes(bytes: Vec<u8>) -> Result<Self, io::Error> {
180 let (deferred_pool_balance_change_bytes, block_bytes) = bytes
181 .split_at_checked(size_of::<Amount>())
182 .ok_or(io::Error::new(
183 ErrorKind::InvalidInput,
184 "input is too short",
185 ))?;
186
187 Ok(Self {
188 block: Arc::new(
189 block_bytes
190 .zcash_deserialize_into()
191 .map_err(|err| io::Error::new(ErrorKind::InvalidData, err))?,
192 ),
193 deferred_pool_balance_change: Amount::from_bytes(
194 deferred_pool_balance_change_bytes
195 .try_into()
196 .expect("slice from `split_at_checked()` should fit in [u8; 8]"),
197 )
198 .map_err(|err| io::Error::new(ErrorKind::InvalidData, err))?,
199 })
200 }
201}
202
203fn write_backup_block(backup_dir_path: &Path, block: &ContextuallyVerifiedBlock) {
205 let backup_block_file_name: String = block.hash.encode_hex();
206 let backup_block_file_path = backup_dir_path.join(backup_block_file_name);
207 let non_finalized_block_backup: NonFinalizedBlockBackup = block.into();
208
209 if let Err(err) = std::fs::write(
210 backup_block_file_path,
211 non_finalized_block_backup.as_bytes(),
212 ) {
213 tracing::warn!(?err, "failed to write non-finalized state backup block");
214 }
215}
216
217fn read_non_finalized_blocks_from_backup<'a>(
221 backup_dir_path: &Path,
222 finalized_state: &'a ZebraDb,
223) -> impl Iterator<Item = SemanticallyVerifiedBlock> + 'a {
224 list_backup_dir_entries(backup_dir_path)
225 .filter(|&(block_hash, _)| !finalized_state.contains_hash(block_hash))
228 .filter_map(|(block_hash, file_path)| match std::fs::read(file_path) {
229 Ok(block_bytes) => Some((block_hash, block_bytes)),
230 Err(err) => {
231 tracing::warn!(?err, "failed to open non-finalized state backup block file");
232 None
233 }
234 })
235 .filter_map(|(expected_block_hash, backup_block_file_contents)| {
236 match NonFinalizedBlockBackup::from_bytes(backup_block_file_contents) {
237 Ok(NonFinalizedBlockBackup {
238 block,
239 deferred_pool_balance_change,
240 }) if block.coinbase_height().is_some() => {
241 let block = SemanticallyVerifiedBlock::from(block)
242 .with_deferred_pool_balance_change(Some(DeferredPoolBalanceChange::new(
243 deferred_pool_balance_change,
244 )));
245 if block.hash != expected_block_hash {
246 tracing::warn!(
247 block_hash = ?block.hash,
248 ?expected_block_hash,
249 "wrong block hash in file name"
250 );
251 }
252 Some(block)
253 }
254 Ok(block) => {
255 tracing::warn!(
256 ?block,
257 "invalid non-finalized backup block, missing coinbase height"
258 );
259 None
260 }
261 Err(err) => {
262 tracing::warn!(
263 ?err,
264 "failed to deserialize non-finalized backup data into block"
265 );
266 None
267 }
268 }
269 })
270}
271
272pub(super) fn list_backup_dir_entries(
280 backup_dir_path: &Path,
281) -> impl Iterator<Item = (block::Hash, PathBuf)> {
282 read_backup_dir(backup_dir_path).filter_map(process_backup_dir_entry)
283}
284
285fn read_backup_dir(backup_dir_path: &Path) -> impl Iterator<Item = DirEntry> {
293 std::fs::read_dir(backup_dir_path)
294 .expect("failed to read non-finalized state backup directory")
295 .filter_map(|entry| match entry {
296 Ok(entry) => Some(entry),
297 Err(io_err) => {
298 tracing::warn!(
299 ?io_err,
300 "failed to read DirEntry in non-finalized state backup dir"
301 );
302
303 None
304 }
305 })
306}
307
308fn process_backup_dir_entry(entry: DirEntry) -> Option<(block::Hash, PathBuf)> {
314 let delete_file = || {
315 if let Err(delete_error) = std::fs::remove_file(entry.path()) {
316 tracing::warn!(?delete_error, "failed to delete backup block file");
317 }
318 };
319
320 let block_file_name = match entry.file_name().into_string() {
321 Ok(block_hash) => block_hash,
322 Err(err) => {
323 tracing::warn!(
324 ?err,
325 "failed to convert OsString to String, attempting to delete file"
326 );
327
328 delete_file();
329 return None;
330 }
331 };
332
333 let block_hash: block::Hash = match block_file_name.parse() {
334 Ok(block_hash) => block_hash,
335 Err(err) => {
336 tracing::warn!(
337 ?err,
338 "failed to parse hex-encoded block hash from file name, attempting to delete file"
339 );
340
341 delete_file();
342 return None;
343 }
344 };
345
346 Some((block_hash, entry.path()))
347}