zebra_state/service/finalized_state/disk_format/upgrade.rs
1//! In-place format upgrades and format validity checks for the Zebra state database.
2
3use std::{
4 cmp::Ordering,
5 sync::Arc,
6 thread::{self, JoinHandle},
7};
8
9use crossbeam_channel::{bounded, Receiver, RecvTimeoutError, Sender};
10use semver::Version;
11use tracing::Span;
12
13use zebra_chain::{
14 block::Height,
15 diagnostic::{
16 task::{CheckForPanics, WaitForPanics},
17 CodeTimer,
18 },
19};
20
21use DbFormatChange::*;
22
23use crate::service::finalized_state::ZebraDb;
24
25pub(crate) mod add_subtrees;
26pub(crate) mod block_info_and_address_received;
27pub(crate) mod cache_genesis_roots;
28pub(crate) mod fix_tree_key_type;
29pub(crate) mod no_migration;
30pub(crate) mod prune_trees;
31pub(crate) mod tree_keys_and_caches_upgrade;
32
33#[cfg(not(feature = "indexer"))]
34pub(crate) mod drop_tx_locs_by_spends;
35
36#[cfg(feature = "indexer")]
37pub(crate) mod track_tx_locs_by_spends;
38
39/// Defines method signature for running disk format upgrades.
40pub trait DiskFormatUpgrade {
41 /// Returns the version at which this upgrade is applied.
42 fn version(&self) -> Version;
43
44 /// Returns the description of this upgrade.
45 fn description(&self) -> &'static str;
46
47 /// Runs disk format upgrade.
48 fn run(
49 &self,
50 initial_tip_height: Height,
51 db: &ZebraDb,
52 cancel_receiver: &Receiver<CancelFormatChange>,
53 ) -> Result<(), CancelFormatChange>;
54
55 /// Check that state has been upgraded to this format correctly.
56 ///
57 /// The outer `Result` indicates whether the validation was cancelled (due to e.g. node shutdown).
58 /// The inner `Result` indicates whether the validation itself failed or not.
59 fn validate(
60 &self,
61 _db: &ZebraDb,
62 _cancel_receiver: &Receiver<CancelFormatChange>,
63 ) -> Result<Result<(), String>, CancelFormatChange> {
64 Ok(Ok(()))
65 }
66
67 /// Prepare for disk format upgrade.
68 fn prepare(
69 &self,
70 _initial_tip_height: Height,
71 _upgrade_db: &ZebraDb,
72 _cancel_receiver: &Receiver<CancelFormatChange>,
73 _older_disk_version: &Version,
74 ) -> Result<(), CancelFormatChange> {
75 Ok(())
76 }
77
78 /// Returns true if the [`DiskFormatUpgrade`] needs to run a migration on existing data in the db.
79 fn needs_migration(&self) -> bool {
80 true
81 }
82
83 /// Returns true if the upgrade is a major upgrade that can reuse the cache in the previous major db format version.
84 fn is_reusable_major_upgrade(&self) -> bool {
85 let version = self.version();
86 version.minor == 0 && version.patch == 0
87 }
88}
89
90fn format_upgrades(
91 min_version: Option<Version>,
92) -> impl DoubleEndedIterator<Item = Box<dyn DiskFormatUpgrade>> {
93 let min_version = move || min_version.clone().unwrap_or(Version::new(0, 0, 0));
94
95 // Note: Disk format upgrades must be run in order of database version.
96 ([
97 Box::new(prune_trees::PruneTrees),
98 Box::new(add_subtrees::AddSubtrees),
99 Box::new(tree_keys_and_caches_upgrade::FixTreeKeyTypeAndCacheGenesisRoots),
100 Box::new(no_migration::NoMigration::new(
101 "add value balance upgrade",
102 Version::new(26, 0, 0),
103 )),
104 Box::new(block_info_and_address_received::Upgrade),
105 ] as [Box<dyn DiskFormatUpgrade>; 5])
106 .into_iter()
107 .filter(move |upgrade| upgrade.version() > min_version())
108}
109
110/// Returns a list of all the major db format versions that can restored from the
111/// previous major database format.
112pub fn restorable_db_versions() -> Vec<u64> {
113 format_upgrades(None)
114 .filter_map(|upgrade| {
115 upgrade
116 .is_reusable_major_upgrade()
117 .then_some(upgrade.version().major)
118 })
119 .collect()
120}
121
122/// The kind of database format change or validity check we're performing.
123#[derive(Clone, Debug, Eq, PartialEq)]
124pub enum DbFormatChange {
125 // Data Format Changes
126 //
127 /// Upgrade the format from `older_disk_version` to `newer_running_version`.
128 ///
129 /// Until this upgrade is complete, the format is a mixture of both versions.
130 Upgrade {
131 older_disk_version: Version,
132 newer_running_version: Version,
133 },
134
135 // Format Version File Changes
136 //
137 /// Mark the format as newly created by `running_version`.
138 ///
139 /// Newly created databases are opened with no disk version.
140 /// It is set to the running version by the format change code.
141 NewlyCreated { running_version: Version },
142
143 /// Mark the format as downgraded from `newer_disk_version` to `older_running_version`.
144 ///
145 /// Until the state is upgraded to `newer_disk_version` by a Zebra version with that state
146 /// version (or greater), the format will be a mixture of both versions.
147 Downgrade {
148 newer_disk_version: Version,
149 older_running_version: Version,
150 },
151
152 // Data Format Checks
153 //
154 /// Check that the database from a previous instance has the current `running_version` format.
155 ///
156 /// Current version databases have a disk version that matches the running version.
157 /// No upgrades are needed, so we just run a format check on the database.
158 /// The data in that database was created or updated by a previous Zebra instance.
159 CheckOpenCurrent { running_version: Version },
160
161 /// Check that the database from this instance has the current `running_version` format.
162 ///
163 /// The data in that database was created or updated by the currently running Zebra instance.
164 /// So we periodically check for data bugs, which can happen if the upgrade and new block
165 /// code produce different data. (They can also be caused by disk corruption.)
166 CheckNewBlocksCurrent { running_version: Version },
167}
168
169/// A handle to a spawned format change thread.
170///
171/// Cloning this struct creates an additional handle to the same thread.
172///
173/// # Concurrency
174///
175/// Cancelling the thread on drop has a race condition, because two handles can be dropped at
176/// the same time.
177///
178/// If cancelling the thread is required for correct operation or usability, the owner of the
179/// handle must call force_cancel().
180#[derive(Clone, Debug)]
181pub struct DbFormatChangeThreadHandle {
182 /// A handle to the format change/check thread.
183 /// If configured, this thread continues running so it can perform periodic format checks.
184 ///
185 /// Panics from this thread are propagated into Zebra's state service.
186 /// The task returns an error if the upgrade was cancelled by a shutdown.
187 update_task: Option<Arc<JoinHandle<Result<(), CancelFormatChange>>>>,
188
189 /// A channel that tells the running format thread to finish early.
190 cancel_handle: Sender<CancelFormatChange>,
191}
192
/// Marker value that is sent to cancel a format upgrade.
///
/// The same value is returned as the error when a format change is cancelled.
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
pub struct CancelFormatChange;
196
197impl DbFormatChange {
198 /// Returns the format change for `running_version` code loading a `disk_version` database.
199 ///
200 /// Also logs that change at info level.
201 ///
202 /// If `disk_version` is `None`, Zebra is creating a new database.
203 pub fn open_database(running_version: &Version, disk_version: Option<Version>) -> Self {
204 let running_version = running_version.clone();
205
206 let Some(disk_version) = disk_version else {
207 info!(
208 %running_version,
209 "creating new database with the current format"
210 );
211
212 return NewlyCreated { running_version };
213 };
214
215 match disk_version.cmp_precedence(&running_version) {
216 Ordering::Less => {
217 info!(
218 %running_version,
219 %disk_version,
220 "trying to open older database format: launching upgrade task"
221 );
222
223 Upgrade {
224 older_disk_version: disk_version,
225 newer_running_version: running_version,
226 }
227 }
228 Ordering::Greater => {
229 info!(
230 %running_version,
231 %disk_version,
232 "trying to open newer database format: data should be compatible"
233 );
234
235 Downgrade {
236 newer_disk_version: disk_version,
237 older_running_version: running_version,
238 }
239 }
240 Ordering::Equal => {
241 info!(%running_version, "trying to open current database format");
242
243 CheckOpenCurrent { running_version }
244 }
245 }
246 }
247
248 /// Returns a format check for newly added blocks in the currently running Zebra version.
249 /// This check makes sure the upgrade and new block code produce the same data.
250 ///
251 /// Also logs the check at info level.
252 pub fn check_new_blocks(db: &ZebraDb) -> Self {
253 let running_version = db.format_version_in_code();
254
255 info!(%running_version, "checking new blocks were written in current database format");
256 CheckNewBlocksCurrent { running_version }
257 }
258
259 /// Returns true if this format change/check is an upgrade.
260 #[allow(dead_code)]
261 pub fn is_upgrade(&self) -> bool {
262 matches!(self, Upgrade { .. })
263 }
264
265 /// Returns true if this format change/check happens at startup.
266 #[allow(dead_code)]
267 pub fn is_run_at_startup(&self) -> bool {
268 !matches!(self, CheckNewBlocksCurrent { .. })
269 }
270
271 /// Returns the running version in this format change.
272 pub fn running_version(&self) -> Version {
273 match self {
274 Upgrade {
275 newer_running_version,
276 ..
277 } => newer_running_version,
278 Downgrade {
279 older_running_version,
280 ..
281 } => older_running_version,
282 NewlyCreated { running_version }
283 | CheckOpenCurrent { running_version }
284 | CheckNewBlocksCurrent { running_version } => running_version,
285 }
286 .clone()
287 }
288
289 /// Returns the initial database version before this format change.
290 ///
291 /// Returns `None` if the database was newly created.
292 pub fn initial_disk_version(&self) -> Option<Version> {
293 match self {
294 Upgrade {
295 older_disk_version, ..
296 } => Some(older_disk_version),
297 Downgrade {
298 newer_disk_version, ..
299 } => Some(newer_disk_version),
300 CheckOpenCurrent { running_version } | CheckNewBlocksCurrent { running_version } => {
301 Some(running_version)
302 }
303 NewlyCreated { .. } => None,
304 }
305 .cloned()
306 }
307
308 /// Launch a `std::thread` that applies this format change to the database,
309 /// then continues running to perform periodic format checks.
310 ///
311 /// `initial_tip_height` is the database height when it was opened, and `db` is the
312 /// database instance to upgrade or check.
313 pub fn spawn_format_change(
314 self,
315 db: ZebraDb,
316 initial_tip_height: Option<Height>,
317 ) -> DbFormatChangeThreadHandle {
318 // # Correctness
319 //
320 // Cancel handles must use try_send() to avoid blocking waiting for the format change
321 // thread to shut down.
322 let (cancel_handle, cancel_receiver) = bounded(1);
323
324 let span = Span::current();
325 let update_task = thread::spawn(move || {
326 span.in_scope(move || {
327 self.format_change_run_loop(db, initial_tip_height, cancel_receiver)
328 })
329 });
330
331 let mut handle = DbFormatChangeThreadHandle {
332 update_task: Some(Arc::new(update_task)),
333 cancel_handle,
334 };
335
336 handle.check_for_panics();
337
338 handle
339 }
340
341 /// Run the initial format change or check to the database. Under the default runtime config,
342 /// this method returns after the format change or check.
343 ///
344 /// But if runtime validity checks are enabled, this method periodically checks the format of
345 /// newly added blocks matches the current format. It will run until it is cancelled or panics.
346 fn format_change_run_loop(
347 self,
348 db: ZebraDb,
349 initial_tip_height: Option<Height>,
350 cancel_receiver: Receiver<CancelFormatChange>,
351 ) -> Result<(), CancelFormatChange> {
352 self.run_format_change_or_check(&db, initial_tip_height, &cancel_receiver)?;
353
354 let Some(debug_validity_check_interval) = db.config().debug_validity_check_interval else {
355 return Ok(());
356 };
357
358 loop {
359 // We've just run a format check, so sleep first, then run another one.
360 // But return early if there is a cancel signal.
361 if !matches!(
362 cancel_receiver.recv_timeout(debug_validity_check_interval),
363 Err(RecvTimeoutError::Timeout)
364 ) {
365 return Err(CancelFormatChange);
366 }
367
368 Self::check_new_blocks(&db).run_format_change_or_check(
369 &db,
370 initial_tip_height,
371 &cancel_receiver,
372 )?;
373 }
374 }
375
376 /// Run a format change in the database, or check the format of the database once.
377 #[allow(clippy::unwrap_in_result)]
378 pub(crate) fn run_format_change_or_check(
379 &self,
380 db: &ZebraDb,
381 initial_tip_height: Option<Height>,
382 cancel_receiver: &Receiver<CancelFormatChange>,
383 ) -> Result<(), CancelFormatChange> {
384 // Mark the database as having finished applying any format upgrades if there are no
385 // format upgrades that need to be applied.
386 if !self.is_upgrade() {
387 db.mark_finished_format_upgrades();
388 }
389
390 match self {
391 // Perform any required upgrades, then mark the state as upgraded.
392 Upgrade { .. } => {
393 self.apply_format_upgrade(db, initial_tip_height, cancel_receiver)?;
394 db.mark_finished_format_upgrades();
395 }
396
397 NewlyCreated { .. } => {
398 Self::mark_as_newly_created(db);
399 }
400
401 Downgrade { .. } => {
402 // # Correctness
403 //
404 // At the start of a format downgrade, the database must be marked as partially or
405 // fully downgraded. This lets newer Zebra versions know that some blocks with older
406 // formats have been added to the database.
407 Self::mark_as_downgraded(db);
408
409 // Older supported versions just assume they can read newer formats,
410 // because they can't predict all changes a newer Zebra version could make.
411 //
412 // The responsibility of staying backwards-compatible is on the newer version.
413 // We do this on a best-effort basis for versions that are still supported.
414 }
415
416 CheckOpenCurrent { running_version } => {
417 // If we're re-opening a previously upgraded or newly created database,
418 // the database format should be valid. This check is done below.
419 info!(
420 %running_version,
421 "checking database format produced by a previous zebra instance \
422 is current and valid"
423 );
424 }
425
426 CheckNewBlocksCurrent { running_version } => {
427 // If we've added new blocks using the non-upgrade code,
428 // the database format should be valid. This check is done below.
429 //
430 // TODO: should this check panic or just log an error?
431 // Currently, we panic to avoid consensus bugs, but this could cause a denial
432 // of service. We can make errors fail in CI using ZEBRA_FAILURE_MESSAGES.
433 info!(
434 %running_version,
435 "checking database format produced by new blocks in this instance is valid"
436 );
437 }
438 }
439
440 #[cfg(feature = "indexer")]
441 if let (
442 Upgrade { .. } | CheckOpenCurrent { .. } | Downgrade { .. },
443 Some(initial_tip_height),
444 ) = (self, initial_tip_height)
445 {
446 // Indexing transaction locations by their spent outpoints and revealed nullifiers.
447 let timer = CodeTimer::start();
448
449 // Add build metadata to on-disk version file just before starting to add indexes
450 let mut version = db
451 .format_version_on_disk()
452 .expect("unable to read database format version file")
453 .expect("should write database format version file above");
454 version.build = db.format_version_in_code().build;
455
456 db.update_format_version_on_disk(&version)
457 .expect("unable to write database format version file to disk");
458
459 info!("started checking/adding indexes for spending tx ids");
460 track_tx_locs_by_spends::run(initial_tip_height, db, cancel_receiver)?;
461 info!("finished checking/adding indexes for spending tx ids");
462
463 timer.finish_desc("indexing spending transaction ids");
464 };
465
466 #[cfg(not(feature = "indexer"))]
467 if let (
468 Upgrade { .. } | CheckOpenCurrent { .. } | Downgrade { .. },
469 Some(initial_tip_height),
470 ) = (self, initial_tip_height)
471 {
472 let mut version = db
473 .format_version_on_disk()
474 .expect("unable to read database format version file")
475 .expect("should write database format version file above");
476
477 if version.build.contains("indexer") {
478 // Indexing transaction locations by their spent outpoints and revealed nullifiers.
479 let timer = CodeTimer::start();
480
481 info!("started removing indexes for spending tx ids");
482 drop_tx_locs_by_spends::run(initial_tip_height, db, cancel_receiver)?;
483 info!("finished removing indexes for spending tx ids");
484
485 // Remove build metadata to on-disk version file after indexes have been dropped.
486 version.build = db.format_version_in_code().build;
487 db.update_format_version_on_disk(&version)
488 .expect("unable to write database format version file to disk");
489
490 timer.finish_desc("removing spending transaction ids");
491 }
492 };
493
494 // These checks should pass for all format changes:
495 // - upgrades should produce a valid format (and they already do that check)
496 // - an empty state should pass all the format checks
497 // - since the running Zebra code knows how to upgrade the database to this format,
498 // downgrades using this running code still know how to create a valid database
499 // (unless a future upgrade breaks these format checks)
500 // - re-opening the current version should be valid, regardless of whether the upgrade
501 // or new block code created the format (or any combination).
502 Self::format_validity_checks_detailed(db, cancel_receiver)?.unwrap_or_else(|_| {
503 panic!(
504 "unexpected invalid database format: delete and re-sync the database at '{:?}'",
505 db.path()
506 )
507 });
508
509 let initial_disk_version = self
510 .initial_disk_version()
511 .map_or_else(|| "None".to_string(), |version| version.to_string());
512 info!(
513 running_version = %self.running_version(),
514 %initial_disk_version,
515 "database format is valid"
516 );
517
518 Ok(())
519 }
520
521 // TODO: Move state-specific upgrade code to a finalized_state/* module.
522
523 /// Apply any required format updates to the database.
524 /// Format changes should be launched in an independent `std::thread`.
525 ///
526 /// If `cancel_receiver` gets a message, or its sender is dropped,
527 /// the format change stops running early, and returns an error.
528 ///
529 /// See the format upgrade design docs for more details:
530 /// <https://github.com/ZcashFoundation/zebra/blob/main/book/src/dev/state-db-upgrades.md#design>
531 //
532 // New format upgrades must be added to the *end* of this method.
533 #[allow(clippy::unwrap_in_result)]
534 fn apply_format_upgrade(
535 &self,
536 db: &ZebraDb,
537 initial_tip_height: Option<Height>,
538 cancel_receiver: &Receiver<CancelFormatChange>,
539 ) -> Result<(), CancelFormatChange> {
540 let Upgrade {
541 newer_running_version,
542 older_disk_version,
543 } = self
544 else {
545 unreachable!("already checked for Upgrade")
546 };
547
548 // # New Upgrades Sometimes Go Here
549 //
550 // If the format change is outside RocksDb, put new code above this comment!
551 let Some(initial_tip_height) = initial_tip_height else {
552 // If the database is empty, then the RocksDb format doesn't need any changes.
553 info!(
554 %newer_running_version,
555 %older_disk_version,
556 "marking empty database as upgraded"
557 );
558
559 Self::mark_as_upgraded_to(db, newer_running_version);
560
561 info!(
562 %newer_running_version,
563 %older_disk_version,
564 "empty database is fully upgraded"
565 );
566
567 return Ok(());
568 };
569
570 // Apply or validate format upgrades
571 for upgrade in format_upgrades(Some(older_disk_version.clone())) {
572 if upgrade.needs_migration() {
573 let timer = CodeTimer::start();
574
575 upgrade.prepare(initial_tip_height, db, cancel_receiver, older_disk_version)?;
576 upgrade.run(initial_tip_height, db, cancel_receiver)?;
577
578 // Before marking the state as upgraded, check that the upgrade completed successfully.
579 upgrade
580 .validate(db, cancel_receiver)?
581 .expect("db should be valid after upgrade");
582
583 timer.finish_desc(upgrade.description());
584 }
585
586 // Mark the database as upgraded. Zebra won't repeat the upgrade anymore once the
587 // database is marked, so the upgrade MUST be complete at this point.
588 info!(
589 newer_running_version = ?upgrade.version(),
590 "Zebra automatically upgraded the database format"
591 );
592 Self::mark_as_upgraded_to(db, &upgrade.version());
593 }
594
595 Ok(())
596 }
597
598 /// Run quick checks that the current database format is valid.
599 #[allow(clippy::vec_init_then_push)]
600 pub fn format_validity_checks_quick(db: &ZebraDb) -> Result<(), String> {
601 let timer = CodeTimer::start();
602 let mut results = Vec::new();
603
604 // Check the entire format before returning any errors.
605 results.push(db.check_max_on_disk_tip_height());
606
607 // This check can be run before the upgrade, but the upgrade code is finished, so we don't
608 // run it early any more. (If future code changes accidentally make it depend on the
609 // upgrade, they would accidentally break compatibility with older Zebra cached states.)
610 results.push(add_subtrees::subtree_format_calculation_pre_checks(db));
611
612 results.push(cache_genesis_roots::quick_check(db));
613 results.push(fix_tree_key_type::quick_check(db));
614
615 // The work is done in the functions we just called.
616 timer.finish_desc("format_validity_checks_quick()");
617
618 if results.iter().any(Result::is_err) {
619 let err = Err(format!("invalid quick check: {results:?}"));
620 error!(?err);
621 return err;
622 }
623
624 Ok(())
625 }
626
627 /// Run detailed checks that the current database format is valid.
628 #[allow(clippy::vec_init_then_push)]
629 pub fn format_validity_checks_detailed(
630 db: &ZebraDb,
631 cancel_receiver: &Receiver<CancelFormatChange>,
632 ) -> Result<Result<(), String>, CancelFormatChange> {
633 let timer = CodeTimer::start();
634 let mut results = Vec::new();
635
636 // Check the entire format before returning any errors.
637 //
638 // Do the quick checks first, so we don't have to do this in every detailed check.
639 results.push(Self::format_validity_checks_quick(db));
640
641 for upgrade in format_upgrades(None) {
642 results.push(upgrade.validate(db, cancel_receiver)?);
643 }
644
645 // The work is done in the functions we just called.
646 timer.finish_desc("format_validity_checks_detailed()");
647
648 if results.iter().any(Result::is_err) {
649 let err = Err(format!("invalid detailed check: {results:?}"));
650 error!(?err);
651 return Ok(err);
652 }
653
654 Ok(Ok(()))
655 }
656
657 /// Mark a newly created database with the current format version.
658 ///
659 /// This should be called when a newly created database is opened.
660 ///
661 /// # Concurrency
662 ///
663 /// The version must only be updated while RocksDB is holding the database
664 /// directory lock. This prevents multiple Zebra instances corrupting the version
665 /// file.
666 ///
667 /// # Panics
668 ///
669 /// If the format should not have been upgraded, because the database is not newly created.
670 fn mark_as_newly_created(db: &ZebraDb) {
671 let running_version = db.format_version_in_code();
672 let disk_version = db
673 .format_version_on_disk()
674 .expect("unable to read database format version file path");
675
676 let default_new_version = Some(Version::new(running_version.major, 0, 0));
677
678 // The database version isn't empty any more, because we've created the RocksDB database
679 // and acquired its lock. (If it is empty, we have a database locking bug.)
680 assert_eq!(
681 disk_version, default_new_version,
682 "can't overwrite the format version in an existing database:\n\
683 disk: {disk_version:?}\n\
684 running: {running_version}"
685 );
686
687 db.update_format_version_on_disk(&running_version)
688 .expect("unable to write database format version file to disk");
689
690 info!(
691 %running_version,
692 disk_version = %disk_version.map_or("None".to_string(), |version| version.to_string()),
693 "marked database format as newly created"
694 );
695 }
696
697 /// Mark the database as upgraded to `format_upgrade_version`.
698 ///
699 /// This should be called when an older database is opened by an older Zebra version,
700 /// after each version upgrade is complete.
701 ///
702 /// # Concurrency
703 ///
704 /// The version must only be updated while RocksDB is holding the database
705 /// directory lock. This prevents multiple Zebra instances corrupting the version
706 /// file.
707 ///
708 /// # Panics
709 ///
710 /// If the format should not have been upgraded, because the running version is:
711 /// - older than the disk version (that's a downgrade)
712 /// - the same as to the disk version (no upgrade needed)
713 ///
714 /// If the format should not have been upgraded, because the format upgrade version is:
715 /// - older or the same as the disk version
716 /// (multiple upgrades to the same version are not allowed)
717 /// - greater than the running version (that's a logic bug)
718 fn mark_as_upgraded_to(db: &ZebraDb, format_upgrade_version: &Version) {
719 let running_version = db.format_version_in_code();
720 let disk_version = db
721 .format_version_on_disk()
722 .expect("unable to read database format version file")
723 .expect("tried to upgrade a newly created database");
724
725 assert!(
726 running_version > disk_version,
727 "can't upgrade a database that is being opened by an older or the same Zebra version:\n\
728 disk: {disk_version}\n\
729 upgrade: {format_upgrade_version}\n\
730 running: {running_version}"
731 );
732
733 assert!(
734 format_upgrade_version > &disk_version,
735 "can't upgrade a database that has already been upgraded, or is newer:\n\
736 disk: {disk_version}\n\
737 upgrade: {format_upgrade_version}\n\
738 running: {running_version}"
739 );
740
741 assert!(
742 format_upgrade_version <= &running_version,
743 "can't upgrade to a newer version than the running Zebra version:\n\
744 disk: {disk_version}\n\
745 upgrade: {format_upgrade_version}\n\
746 running: {running_version}"
747 );
748
749 db.update_format_version_on_disk(format_upgrade_version)
750 .expect("unable to write database format version file to disk");
751
752 info!(
753 %running_version,
754 %disk_version,
755 // wait_for_state_version_upgrade() needs this to be the last field,
756 // so the regex matches correctly
757 %format_upgrade_version,
758 "marked database format as upgraded"
759 );
760 }
761
762 /// Mark the database as downgraded to the running database version.
763 /// This should be called after a newer database is opened by an older Zebra version.
764 ///
765 /// # Concurrency
766 ///
767 /// The version must only be updated while RocksDB is holding the database
768 /// directory lock. This prevents multiple Zebra instances corrupting the version
769 /// file.
770 ///
771 /// # Panics
772 ///
773 /// If the format should have been upgraded, because the running version is newer.
774 /// If the state is newly created, because the running version should be the same.
775 ///
776 /// Multiple downgrades are allowed, because they all downgrade to the same running version.
777 fn mark_as_downgraded(db: &ZebraDb) {
778 let running_version = db.format_version_in_code();
779 let disk_version = db
780 .format_version_on_disk()
781 .expect("unable to read database format version file")
782 .expect("can't downgrade a newly created database");
783
784 assert!(
785 disk_version >= running_version,
786 "can't downgrade a database that is being opened by a newer Zebra version:\n\
787 disk: {disk_version}\n\
788 running: {running_version}"
789 );
790
791 db.update_format_version_on_disk(&running_version)
792 .expect("unable to write database format version file to disk");
793
794 info!(
795 %running_version,
796 %disk_version,
797 "marked database format as downgraded"
798 );
799 }
800}
801
802impl DbFormatChangeThreadHandle {
803 /// Cancel the running format change thread, if this is the last handle.
804 /// Returns true if it was actually cancelled.
805 pub fn cancel_if_needed(&self) -> bool {
806 // # Correctness
807 //
808 // Checking the strong count has a race condition, because two handles can be dropped at
809 // the same time.
810 //
811 // If cancelling the thread is important, the owner of the handle must call force_cancel().
812 if let Some(update_task) = self.update_task.as_ref() {
813 if Arc::strong_count(update_task) <= 1 {
814 self.force_cancel();
815 return true;
816 }
817 }
818
819 false
820 }
821
822 /// Force the running format change thread to cancel, even if there are other handles.
823 pub fn force_cancel(&self) {
824 // There's nothing we can do about errors here.
825 // If the channel is disconnected, the task has exited.
826 // If it's full, it's already been cancelled.
827 let _ = self.cancel_handle.try_send(CancelFormatChange);
828 }
829
830 /// Check for panics in the code running in the spawned thread.
831 /// If the thread exited with a panic, resume that panic.
832 ///
833 /// This method should be called regularly, so that panics are detected as soon as possible.
834 pub fn check_for_panics(&mut self) {
835 self.update_task.panic_if_task_has_panicked();
836 }
837
838 /// Wait for the spawned thread to finish. If it exited with a panic, resume that panic.
839 ///
840 /// Exits early if the thread has other outstanding handles.
841 ///
842 /// This method should be called during shutdown.
843 pub fn wait_for_panics(&mut self) {
844 self.update_task.wait_for_panics();
845 }
846}
847
848impl Drop for DbFormatChangeThreadHandle {
849 fn drop(&mut self) {
850 // Only cancel the format change if the state service is shutting down.
851 if self.cancel_if_needed() {
852 self.wait_for_panics();
853 } else {
854 self.check_for_panics();
855 }
856 }
857}
858
/// Checks that the upgrade list is in strictly increasing version order, starting above
/// version `0.0.0`.
#[test]
fn format_upgrades_are_in_version_order() {
    let mut previous_version = Version::new(0, 0, 0);

    for upgrade_version in format_upgrades(None).map(|upgrade| upgrade.version()) {
        assert!(upgrade_version > previous_version);
        previous_version = upgrade_version;
    }
}
866}