Skip to main content

zebra_state/service/finalized_state/disk_format/
upgrade.rs

1//! In-place format upgrades and format validity checks for the Zebra state database.
2
3use std::{
4    cmp::Ordering,
5    sync::Arc,
6    thread::{self, JoinHandle},
7};
8
9use crossbeam_channel::{bounded, Receiver, RecvTimeoutError, Sender};
10use semver::Version;
11use tracing::Span;
12
13use zebra_chain::{
14    block::Height,
15    diagnostic::{
16        task::{CheckForPanics, WaitForPanics},
17        CodeTimer,
18    },
19};
20
21use DbFormatChange::*;
22
23use crate::service::finalized_state::ZebraDb;
24
25pub(crate) mod add_subtrees;
26pub(crate) mod block_info_and_address_received;
27pub(crate) mod cache_genesis_roots;
28pub(crate) mod fix_tree_key_type;
29pub(crate) mod no_migration;
30pub(crate) mod prune_trees;
31pub(crate) mod tree_keys_and_caches_upgrade;
32
33#[cfg(not(feature = "indexer"))]
34pub(crate) mod drop_tx_locs_by_spends;
35
36#[cfg(feature = "indexer")]
37pub(crate) mod track_tx_locs_by_spends;
38
39/// Defines method signature for running disk format upgrades.
40pub trait DiskFormatUpgrade {
41    /// Returns the version at which this upgrade is applied.
42    fn version(&self) -> Version;
43
44    /// Returns the description of this upgrade.
45    fn description(&self) -> &'static str;
46
47    /// Runs disk format upgrade.
48    fn run(
49        &self,
50        initial_tip_height: Height,
51        db: &ZebraDb,
52        cancel_receiver: &Receiver<CancelFormatChange>,
53    ) -> Result<(), CancelFormatChange>;
54
55    /// Check that state has been upgraded to this format correctly.
56    ///
57    /// The outer `Result` indicates whether the validation was cancelled (due to e.g. node shutdown).
58    /// The inner `Result` indicates whether the validation itself failed or not.
59    fn validate(
60        &self,
61        _db: &ZebraDb,
62        _cancel_receiver: &Receiver<CancelFormatChange>,
63    ) -> Result<Result<(), String>, CancelFormatChange> {
64        Ok(Ok(()))
65    }
66
67    /// Prepare for disk format upgrade.
68    fn prepare(
69        &self,
70        _initial_tip_height: Height,
71        _upgrade_db: &ZebraDb,
72        _cancel_receiver: &Receiver<CancelFormatChange>,
73        _older_disk_version: &Version,
74    ) -> Result<(), CancelFormatChange> {
75        Ok(())
76    }
77
78    /// Returns true if the [`DiskFormatUpgrade`] needs to run a migration on existing data in the db.
79    fn needs_migration(&self) -> bool {
80        true
81    }
82
83    /// Returns true if the upgrade is a major upgrade that can reuse the cache in the previous major db format version.
84    fn is_reusable_major_upgrade(&self) -> bool {
85        let version = self.version();
86        version.minor == 0 && version.patch == 0
87    }
88}
89
90fn format_upgrades(
91    min_version: Option<Version>,
92) -> impl DoubleEndedIterator<Item = Box<dyn DiskFormatUpgrade>> {
93    let min_version = move || min_version.clone().unwrap_or(Version::new(0, 0, 0));
94
95    // Note: Disk format upgrades must be run in order of database version.
96    ([
97        Box::new(prune_trees::PruneTrees),
98        Box::new(add_subtrees::AddSubtrees),
99        Box::new(tree_keys_and_caches_upgrade::FixTreeKeyTypeAndCacheGenesisRoots),
100        Box::new(no_migration::NoMigration::new(
101            "add value balance upgrade",
102            Version::new(26, 0, 0),
103        )),
104        Box::new(block_info_and_address_received::Upgrade),
105    ] as [Box<dyn DiskFormatUpgrade>; 5])
106        .into_iter()
107        .filter(move |upgrade| upgrade.version() > min_version())
108}
109
110/// Returns a list of all the major db format versions that can restored from the
111/// previous major database format.
112pub fn restorable_db_versions() -> Vec<u64> {
113    format_upgrades(None)
114        .filter_map(|upgrade| {
115            upgrade
116                .is_reusable_major_upgrade()
117                .then_some(upgrade.version().major)
118        })
119        .collect()
120}
121
/// The kind of database format change or validity check we're performing.
///
/// The variants fall into three groups: changes to the stored data format, changes that only
/// update the format version file, and checks of the existing data format.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum DbFormatChange {
    // Data Format Changes
    //
    /// Upgrade the format from `older_disk_version` to `newer_running_version`.
    ///
    /// Until this upgrade is complete, the format is a mixture of both versions.
    Upgrade {
        /// The format version that was on disk when the database was opened.
        older_disk_version: Version,
        /// The format version of the running code.
        newer_running_version: Version,
    },

    // Format Version File Changes
    //
    /// Mark the format as newly created by `running_version`.
    ///
    /// Newly created databases are opened with no disk version.
    /// It is set to the running version by the format change code.
    NewlyCreated {
        /// The format version of the running code.
        running_version: Version,
    },

    /// Mark the format as downgraded from `newer_disk_version` to `older_running_version`.
    ///
    /// Until the state is upgraded to `newer_disk_version` by a Zebra version with that state
    /// version (or greater), the format will be a mixture of both versions.
    Downgrade {
        /// The format version that was on disk when the database was opened.
        newer_disk_version: Version,
        /// The format version of the running code.
        older_running_version: Version,
    },

    // Data Format Checks
    //
    /// Check that the database from a previous instance has the current `running_version` format.
    ///
    /// Current version databases have a disk version that matches the running version.
    /// No upgrades are needed, so we just run a format check on the database.
    /// The data in that database was created or updated by a previous Zebra instance.
    CheckOpenCurrent {
        /// The format version of the running code, which matches the disk version.
        running_version: Version,
    },

    /// Check that the database from this instance has the current `running_version` format.
    ///
    /// The data in that database was created or updated by the currently running Zebra instance.
    /// So we periodically check for data bugs, which can happen if the upgrade and new block
    /// code produce different data. (They can also be caused by disk corruption.)
    CheckNewBlocksCurrent {
        /// The format version of the running code.
        running_version: Version,
    },
}
168
/// A handle to a spawned format change thread.
///
/// Cloning this struct creates an additional handle to the same thread.
///
/// # Concurrency
///
/// Cancelling the thread on drop has a race condition, because two handles can be dropped at
/// the same time.
///
/// If cancelling the thread is required for correct operation or usability, the owner of the
/// handle must call `force_cancel()`.
#[derive(Clone, Debug)]
pub struct DbFormatChangeThreadHandle {
    /// A handle to the format change/check thread.
    /// If configured, this thread continues running so it can perform periodic format checks.
    ///
    /// Panics from this thread are propagated into Zebra's state service.
    /// The task returns an error if the upgrade was cancelled by a shutdown.
    ///
    /// The join handle is shared between clones through the `Arc`.
    update_task: Option<Arc<JoinHandle<Result<(), CancelFormatChange>>>>,

    /// A channel that tells the running format thread to finish early.
    cancel_handle: Sender<CancelFormatChange>,
}
192
/// Marker type that is sent to cancel a format upgrade, and returned as an error on cancellation.
///
/// It carries no data: the same value is used as the channel message and as the error type.
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
pub struct CancelFormatChange;
196
197impl DbFormatChange {
198    /// Returns the format change for `running_version` code loading a `disk_version` database.
199    ///
200    /// Also logs that change at info level.
201    ///
202    /// If `disk_version` is `None`, Zebra is creating a new database.
203    pub fn open_database(running_version: &Version, disk_version: Option<Version>) -> Self {
204        let running_version = running_version.clone();
205
206        let Some(disk_version) = disk_version else {
207            info!(
208                %running_version,
209                "creating new database with the current format"
210            );
211
212            return NewlyCreated { running_version };
213        };
214
215        match disk_version.cmp_precedence(&running_version) {
216            Ordering::Less => {
217                info!(
218                    %running_version,
219                    %disk_version,
220                    "trying to open older database format: launching upgrade task"
221                );
222
223                Upgrade {
224                    older_disk_version: disk_version,
225                    newer_running_version: running_version,
226                }
227            }
228            Ordering::Greater => {
229                info!(
230                    %running_version,
231                    %disk_version,
232                    "trying to open newer database format: data should be compatible"
233                );
234
235                Downgrade {
236                    newer_disk_version: disk_version,
237                    older_running_version: running_version,
238                }
239            }
240            Ordering::Equal => {
241                info!(%running_version, "trying to open current database format");
242
243                CheckOpenCurrent { running_version }
244            }
245        }
246    }
247
248    /// Returns a format check for newly added blocks in the currently running Zebra version.
249    /// This check makes sure the upgrade and new block code produce the same data.
250    ///
251    /// Also logs the check at info level.
252    pub fn check_new_blocks(db: &ZebraDb) -> Self {
253        let running_version = db.format_version_in_code();
254
255        info!(%running_version, "checking new blocks were written in current database format");
256        CheckNewBlocksCurrent { running_version }
257    }
258
259    /// Returns true if this format change/check is an upgrade.
260    #[allow(dead_code)]
261    pub fn is_upgrade(&self) -> bool {
262        matches!(self, Upgrade { .. })
263    }
264
265    /// Returns true if this format change/check happens at startup.
266    #[allow(dead_code)]
267    pub fn is_run_at_startup(&self) -> bool {
268        !matches!(self, CheckNewBlocksCurrent { .. })
269    }
270
271    /// Returns the running version in this format change.
272    pub fn running_version(&self) -> Version {
273        match self {
274            Upgrade {
275                newer_running_version,
276                ..
277            } => newer_running_version,
278            Downgrade {
279                older_running_version,
280                ..
281            } => older_running_version,
282            NewlyCreated { running_version }
283            | CheckOpenCurrent { running_version }
284            | CheckNewBlocksCurrent { running_version } => running_version,
285        }
286        .clone()
287    }
288
289    /// Returns the initial database version before this format change.
290    ///
291    /// Returns `None` if the database was newly created.
292    pub fn initial_disk_version(&self) -> Option<Version> {
293        match self {
294            Upgrade {
295                older_disk_version, ..
296            } => Some(older_disk_version),
297            Downgrade {
298                newer_disk_version, ..
299            } => Some(newer_disk_version),
300            CheckOpenCurrent { running_version } | CheckNewBlocksCurrent { running_version } => {
301                Some(running_version)
302            }
303            NewlyCreated { .. } => None,
304        }
305        .cloned()
306    }
307
308    /// Launch a `std::thread` that applies this format change to the database,
309    /// then continues running to perform periodic format checks.
310    ///
311    /// `initial_tip_height` is the database height when it was opened, and `db` is the
312    /// database instance to upgrade or check.
313    pub fn spawn_format_change(
314        self,
315        db: ZebraDb,
316        initial_tip_height: Option<Height>,
317    ) -> DbFormatChangeThreadHandle {
318        // # Correctness
319        //
320        // Cancel handles must use try_send() to avoid blocking waiting for the format change
321        // thread to shut down.
322        let (cancel_handle, cancel_receiver) = bounded(1);
323
324        let span = Span::current();
325        let update_task = thread::spawn(move || {
326            span.in_scope(move || {
327                self.format_change_run_loop(db, initial_tip_height, cancel_receiver)
328            })
329        });
330
331        let mut handle = DbFormatChangeThreadHandle {
332            update_task: Some(Arc::new(update_task)),
333            cancel_handle,
334        };
335
336        handle.check_for_panics();
337
338        handle
339    }
340
341    /// Run the initial format change or check to the database. Under the default runtime config,
342    /// this method returns after the format change or check.
343    ///
344    /// But if runtime validity checks are enabled, this method periodically checks the format of
345    /// newly added blocks matches the current format. It will run until it is cancelled or panics.
346    fn format_change_run_loop(
347        self,
348        db: ZebraDb,
349        initial_tip_height: Option<Height>,
350        cancel_receiver: Receiver<CancelFormatChange>,
351    ) -> Result<(), CancelFormatChange> {
352        self.run_format_change_or_check(&db, initial_tip_height, &cancel_receiver)?;
353
354        let Some(debug_validity_check_interval) = db.config().debug_validity_check_interval else {
355            return Ok(());
356        };
357
358        loop {
359            // We've just run a format check, so sleep first, then run another one.
360            // But return early if there is a cancel signal.
361            if !matches!(
362                cancel_receiver.recv_timeout(debug_validity_check_interval),
363                Err(RecvTimeoutError::Timeout)
364            ) {
365                return Err(CancelFormatChange);
366            }
367
368            Self::check_new_blocks(&db).run_format_change_or_check(
369                &db,
370                initial_tip_height,
371                &cancel_receiver,
372            )?;
373        }
374    }
375
    /// Run a format change in the database, or check the format of the database once.
    ///
    /// The steps are, in order:
    /// 1. mark format upgrades as finished straight away, if this is not an upgrade,
    /// 2. perform the work that is specific to this kind of format change,
    /// 3. add or remove the spending transaction id indexes, depending on whether the
    ///    `indexer` feature is enabled, and
    /// 4. run the detailed format validity checks.
    ///
    /// Returns `Err(CancelFormatChange)` if any of those steps was cancelled.
    ///
    /// # Panics
    ///
    /// - if the detailed format validity checks fail,
    /// - if the format version file can't be read or written when it is needed.
    #[allow(clippy::unwrap_in_result)]
    pub(crate) fn run_format_change_or_check(
        &self,
        db: &ZebraDb,
        initial_tip_height: Option<Height>,
        cancel_receiver: &Receiver<CancelFormatChange>,
    ) -> Result<(), CancelFormatChange> {
        // Mark the database as having finished applying any format upgrades if there are no
        // format upgrades that need to be applied.
        if !self.is_upgrade() {
            db.mark_finished_format_upgrades();
        }

        match self {
            // Perform any required upgrades, then mark the state as upgraded.
            Upgrade { .. } => {
                self.apply_format_upgrade(db, initial_tip_height, cancel_receiver)?;
                db.mark_finished_format_upgrades();
            }

            // A new database only needs its format version file updated.
            NewlyCreated { .. } => {
                Self::mark_as_newly_created(db);
            }

            Downgrade { .. } => {
                // # Correctness
                //
                // At the start of a format downgrade, the database must be marked as partially or
                // fully downgraded. This lets newer Zebra versions know that some blocks with older
                // formats have been added to the database.
                Self::mark_as_downgraded(db);

                // Older supported versions just assume they can read newer formats,
                // because they can't predict all changes a newer Zebra version could make.
                //
                // The responsibility of staying backwards-compatible is on the newer version.
                // We do this on a best-effort basis for versions that are still supported.
            }

            CheckOpenCurrent { running_version } => {
                // If we're re-opening a previously upgraded or newly created database,
                // the database format should be valid. This check is done below.
                info!(
                    %running_version,
                    "checking database format produced by a previous zebra instance \
                     is current and valid"
                );
            }

            CheckNewBlocksCurrent { running_version } => {
                // If we've added new blocks using the non-upgrade code,
                // the database format should be valid. This check is done below.
                //
                // TODO: should this check panic or just log an error?
                //       Currently, we panic to avoid consensus bugs, but this could cause a denial
                //       of service. We can make errors fail in CI using ZEBRA_FAILURE_MESSAGES.
                info!(
                    %running_version,
                    "checking database format produced by new blocks in this instance is valid"
                );
            }
        }

        // With the `indexer` feature: check or add the indexes when opening a non-empty,
        // previously created database.
        #[cfg(feature = "indexer")]
        if let (
            Upgrade { .. } | CheckOpenCurrent { .. } | Downgrade { .. },
            Some(initial_tip_height),
        ) = (self, initial_tip_height)
        {
            // Indexing transaction locations by their spent outpoints and revealed nullifiers.
            let timer = CodeTimer::start();

            // Add build metadata to on-disk version file just before starting to add indexes
            let mut version = db
                .format_version_on_disk()
                .expect("unable to read database format version file")
                .expect("should write database format version file above");
            version.build = db.format_version_in_code().build;

            db.update_format_version_on_disk(&version)
                .expect("unable to write database format version file to disk");

            info!("started checking/adding indexes for spending tx ids");
            track_tx_locs_by_spends::run(initial_tip_height, db, cancel_receiver)?;
            info!("finished checking/adding indexes for spending tx ids");

            timer.finish_desc("indexing spending transaction ids");
        };

        // Without the `indexer` feature: remove the indexes, but only if the on-disk version's
        // build metadata says they were added.
        #[cfg(not(feature = "indexer"))]
        if let (
            Upgrade { .. } | CheckOpenCurrent { .. } | Downgrade { .. },
            Some(initial_tip_height),
        ) = (self, initial_tip_height)
        {
            let mut version = db
                .format_version_on_disk()
                .expect("unable to read database format version file")
                .expect("should write database format version file above");

            if version.build.contains("indexer") {
                // Removing the indexes of transaction locations by their spent outpoints and
                // revealed nullifiers.
                let timer = CodeTimer::start();

                info!("started removing indexes for spending tx ids");
                drop_tx_locs_by_spends::run(initial_tip_height, db, cancel_receiver)?;
                info!("finished removing indexes for spending tx ids");

                // Replace the build metadata in the on-disk version file with the running
                // code's build metadata, after the indexes have been dropped.
                version.build = db.format_version_in_code().build;
                db.update_format_version_on_disk(&version)
                    .expect("unable to write database format version file to disk");

                timer.finish_desc("removing spending transaction ids");
            }
        };

        // These checks should pass for all format changes:
        // - upgrades should produce a valid format (and they already do that check)
        // - an empty state should pass all the format checks
        // - since the running Zebra code knows how to upgrade the database to this format,
        //   downgrades using this running code still know how to create a valid database
        //   (unless a future upgrade breaks these format checks)
        // - re-opening the current version should be valid, regardless of whether the upgrade
        //   or new block code created the format (or any combination).
        //
        // The `?` propagates a cancellation, and an invalid format panics.
        Self::format_validity_checks_detailed(db, cancel_receiver)?.unwrap_or_else(|_| {
            panic!(
                "unexpected invalid database format: delete and re-sync the database at '{:?}'",
                db.path()
            )
        });

        // Log the versions, using "None" as the disk version of a newly created database.
        let initial_disk_version = self
            .initial_disk_version()
            .map_or_else(|| "None".to_string(), |version| version.to_string());
        info!(
            running_version = %self.running_version(),
            %initial_disk_version,
            "database format is valid"
        );

        Ok(())
    }
520
521    // TODO: Move state-specific upgrade code to a finalized_state/* module.
522
523    /// Apply any required format updates to the database.
524    /// Format changes should be launched in an independent `std::thread`.
525    ///
526    /// If `cancel_receiver` gets a message, or its sender is dropped,
527    /// the format change stops running early, and returns an error.
528    ///
529    /// See the format upgrade design docs for more details:
530    /// <https://github.com/ZcashFoundation/zebra/blob/main/book/src/dev/state-db-upgrades.md#design>
531    //
532    // New format upgrades must be added to the *end* of this method.
533    #[allow(clippy::unwrap_in_result)]
534    fn apply_format_upgrade(
535        &self,
536        db: &ZebraDb,
537        initial_tip_height: Option<Height>,
538        cancel_receiver: &Receiver<CancelFormatChange>,
539    ) -> Result<(), CancelFormatChange> {
540        let Upgrade {
541            newer_running_version,
542            older_disk_version,
543        } = self
544        else {
545            unreachable!("already checked for Upgrade")
546        };
547
548        // # New Upgrades Sometimes Go Here
549        //
550        // If the format change is outside RocksDb, put new code above this comment!
551        let Some(initial_tip_height) = initial_tip_height else {
552            // If the database is empty, then the RocksDb format doesn't need any changes.
553            info!(
554                %newer_running_version,
555                %older_disk_version,
556                "marking empty database as upgraded"
557            );
558
559            Self::mark_as_upgraded_to(db, newer_running_version);
560
561            info!(
562                %newer_running_version,
563                %older_disk_version,
564                "empty database is fully upgraded"
565            );
566
567            return Ok(());
568        };
569
570        // Apply or validate format upgrades
571        for upgrade in format_upgrades(Some(older_disk_version.clone())) {
572            if upgrade.needs_migration() {
573                let timer = CodeTimer::start();
574
575                upgrade.prepare(initial_tip_height, db, cancel_receiver, older_disk_version)?;
576                upgrade.run(initial_tip_height, db, cancel_receiver)?;
577
578                // Before marking the state as upgraded, check that the upgrade completed successfully.
579                upgrade
580                    .validate(db, cancel_receiver)?
581                    .expect("db should be valid after upgrade");
582
583                timer.finish_desc(upgrade.description());
584            }
585
586            // Mark the database as upgraded. Zebra won't repeat the upgrade anymore once the
587            // database is marked, so the upgrade MUST be complete at this point.
588            info!(
589                newer_running_version = ?upgrade.version(),
590                "Zebra automatically upgraded the database format"
591            );
592            Self::mark_as_upgraded_to(db, &upgrade.version());
593        }
594
595        Ok(())
596    }
597
598    /// Run quick checks that the current database format is valid.
599    #[allow(clippy::vec_init_then_push)]
600    pub fn format_validity_checks_quick(db: &ZebraDb) -> Result<(), String> {
601        let timer = CodeTimer::start();
602        let mut results = Vec::new();
603
604        // Check the entire format before returning any errors.
605        results.push(db.check_max_on_disk_tip_height());
606
607        // This check can be run before the upgrade, but the upgrade code is finished, so we don't
608        // run it early any more. (If future code changes accidentally make it depend on the
609        // upgrade, they would accidentally break compatibility with older Zebra cached states.)
610        results.push(add_subtrees::subtree_format_calculation_pre_checks(db));
611
612        results.push(cache_genesis_roots::quick_check(db));
613        results.push(fix_tree_key_type::quick_check(db));
614
615        // The work is done in the functions we just called.
616        timer.finish_desc("format_validity_checks_quick()");
617
618        if results.iter().any(Result::is_err) {
619            let err = Err(format!("invalid quick check: {results:?}"));
620            error!(?err);
621            return err;
622        }
623
624        Ok(())
625    }
626
627    /// Run detailed checks that the current database format is valid.
628    #[allow(clippy::vec_init_then_push)]
629    pub fn format_validity_checks_detailed(
630        db: &ZebraDb,
631        cancel_receiver: &Receiver<CancelFormatChange>,
632    ) -> Result<Result<(), String>, CancelFormatChange> {
633        let timer = CodeTimer::start();
634        let mut results = Vec::new();
635
636        // Check the entire format before returning any errors.
637        //
638        // Do the quick checks first, so we don't have to do this in every detailed check.
639        results.push(Self::format_validity_checks_quick(db));
640
641        for upgrade in format_upgrades(None) {
642            results.push(upgrade.validate(db, cancel_receiver)?);
643        }
644
645        // The work is done in the functions we just called.
646        timer.finish_desc("format_validity_checks_detailed()");
647
648        if results.iter().any(Result::is_err) {
649            let err = Err(format!("invalid detailed check: {results:?}"));
650            error!(?err);
651            return Ok(err);
652        }
653
654        Ok(Ok(()))
655    }
656
657    /// Mark a newly created database with the current format version.
658    ///
659    /// This should be called when a newly created database is opened.
660    ///
661    /// # Concurrency
662    ///
663    /// The version must only be updated while RocksDB is holding the database
664    /// directory lock. This prevents multiple Zebra instances corrupting the version
665    /// file.
666    ///
667    /// # Panics
668    ///
669    /// If the format should not have been upgraded, because the database is not newly created.
670    fn mark_as_newly_created(db: &ZebraDb) {
671        let running_version = db.format_version_in_code();
672        let disk_version = db
673            .format_version_on_disk()
674            .expect("unable to read database format version file path");
675
676        let default_new_version = Some(Version::new(running_version.major, 0, 0));
677
678        // The database version isn't empty any more, because we've created the RocksDB database
679        // and acquired its lock. (If it is empty, we have a database locking bug.)
680        assert_eq!(
681            disk_version, default_new_version,
682            "can't overwrite the format version in an existing database:\n\
683             disk: {disk_version:?}\n\
684             running: {running_version}"
685        );
686
687        db.update_format_version_on_disk(&running_version)
688            .expect("unable to write database format version file to disk");
689
690        info!(
691            %running_version,
692            disk_version = %disk_version.map_or("None".to_string(), |version| version.to_string()),
693            "marked database format as newly created"
694        );
695    }
696
    /// Mark the database as upgraded to `format_upgrade_version`.
    ///
    /// This should be called when an older database is opened by a newer Zebra version,
    /// after each version upgrade is complete.
    ///
    /// # Concurrency
    ///
    /// The version must only be updated while RocksDB is holding the database
    /// directory lock. This prevents multiple Zebra instances corrupting the version
    /// file.
    ///
    /// # Panics
    ///
    /// If the format should not have been upgraded, because the running version is:
    /// - older than the disk version (that's a downgrade)
    /// - the same as the disk version (no upgrade needed)
    ///
    /// If the format should not have been upgraded, because the format upgrade version is:
    /// - older or the same as the disk version
    ///   (multiple upgrades to the same version are not allowed)
    /// - greater than the running version (that's a logic bug)
    ///
    /// If the version file can't be read or written, or it has no version in it.
    fn mark_as_upgraded_to(db: &ZebraDb, format_upgrade_version: &Version) {
        let running_version = db.format_version_in_code();
        let disk_version = db
            .format_version_on_disk()
            .expect("unable to read database format version file")
            .expect("tried to upgrade a newly created database");

        // The running code must be strictly newer than the database it is upgrading.
        assert!(
            running_version > disk_version,
            "can't upgrade a database that is being opened by an older or the same Zebra version:\n\
             disk: {disk_version}\n\
             upgrade: {format_upgrade_version}\n\
             running: {running_version}"
        );

        // Each upgrade must move the disk version forwards.
        assert!(
            format_upgrade_version > &disk_version,
            "can't upgrade a database that has already been upgraded, or is newer:\n\
             disk: {disk_version}\n\
             upgrade: {format_upgrade_version}\n\
             running: {running_version}"
        );

        // An upgrade can't go past the version of the running code.
        assert!(
            format_upgrade_version <= &running_version,
            "can't upgrade to a newer version than the running Zebra version:\n\
             disk: {disk_version}\n\
             upgrade: {format_upgrade_version}\n\
             running: {running_version}"
        );

        db.update_format_version_on_disk(format_upgrade_version)
            .expect("unable to write database format version file to disk");

        info!(
            %running_version,
            %disk_version,
            // wait_for_state_version_upgrade() needs this to be the last field,
            // so the regex matches correctly
            %format_upgrade_version,
            "marked database format as upgraded"
        );
    }
761
    /// Mark the database as downgraded to the running database version.
    /// This should be called after a newer database is opened by an older Zebra version.
    ///
    /// # Concurrency
    ///
    /// The version must only be updated while RocksDB is holding the database
    /// directory lock. This prevents multiple Zebra instances corrupting the version
    /// file.
    ///
    /// # Panics
    ///
    /// If the format should have been upgraded, because the running version is newer.
    /// If the state is newly created, because the running version should be the same.
    /// If the version file can't be read or written.
    ///
    /// Multiple downgrades are allowed, because they all downgrade to the same running version.
    fn mark_as_downgraded(db: &ZebraDb) {
        let running_version = db.format_version_in_code();
        let disk_version = db
            .format_version_on_disk()
            .expect("unable to read database format version file")
            .expect("can't downgrade a newly created database");

        // Equal versions are accepted here: only a disk version that is older than the
        // running version panics.
        assert!(
            disk_version >= running_version,
            "can't downgrade a database that is being opened by a newer Zebra version:\n\
             disk: {disk_version}\n\
             running: {running_version}"
        );

        // Overwrite the newer disk version with the running version.
        db.update_format_version_on_disk(&running_version)
            .expect("unable to write database format version file to disk");

        info!(
            %running_version,
            %disk_version,
            "marked database format as downgraded"
        );
    }
800}
801
802impl DbFormatChangeThreadHandle {
803    /// Cancel the running format change thread, if this is the last handle.
804    /// Returns true if it was actually cancelled.
805    pub fn cancel_if_needed(&self) -> bool {
806        // # Correctness
807        //
808        // Checking the strong count has a race condition, because two handles can be dropped at
809        // the same time.
810        //
811        // If cancelling the thread is important, the owner of the handle must call force_cancel().
812        if let Some(update_task) = self.update_task.as_ref() {
813            if Arc::strong_count(update_task) <= 1 {
814                self.force_cancel();
815                return true;
816            }
817        }
818
819        false
820    }
821
822    /// Force the running format change thread to cancel, even if there are other handles.
823    pub fn force_cancel(&self) {
824        // There's nothing we can do about errors here.
825        // If the channel is disconnected, the task has exited.
826        // If it's full, it's already been cancelled.
827        let _ = self.cancel_handle.try_send(CancelFormatChange);
828    }
829
830    /// Check for panics in the code running in the spawned thread.
831    /// If the thread exited with a panic, resume that panic.
832    ///
833    /// This method should be called regularly, so that panics are detected as soon as possible.
834    pub fn check_for_panics(&mut self) {
835        self.update_task.panic_if_task_has_panicked();
836    }
837
838    /// Wait for the spawned thread to finish. If it exited with a panic, resume that panic.
839    ///
840    /// Exits early if the thread has other outstanding handles.
841    ///
842    /// This method should be called during shutdown.
843    pub fn wait_for_panics(&mut self) {
844        self.update_task.wait_for_panics();
845    }
846}
847
848impl Drop for DbFormatChangeThreadHandle {
849    fn drop(&mut self) {
850        // Only cancel the format change if the state service is shutting down.
851        if self.cancel_if_needed() {
852            self.wait_for_panics();
853        } else {
854            self.check_for_panics();
855        }
856    }
857}
858
/// Checks that `format_upgrades()` lists the upgrades in strictly increasing version order.
#[test]
fn format_upgrades_are_in_version_order() {
    let mut previous_version = Version::new(0, 0, 0);

    for current_version in format_upgrades(None).map(|upgrade| upgrade.version()) {
        assert!(current_version > previous_version);
        previous_version = current_version;
    }
}
866}