From ed1fc7cca677b533dd8bebb7ea7c4dd1d4196d1a Mon Sep 17 00:00:00 2001 From: Michael Sproul Date: Tue, 5 Oct 2021 03:53:17 +0000 Subject: [PATCH] Fix I/O atomicity issues with checkpoint sync (#2671) ## Issue Addressed This PR addresses an issue found by @YorickDowne during testing of v2.0.0-rc.0. Due to a lack of atomic database writes on checkpoint sync start-up, it was possible for the database to get into an inconsistent state from which it couldn't recover without `--purge-db`. The core of the issue was that the store's anchor info was being stored _before_ the `PersistedBeaconChain`. If a crash occurred so that anchor info was stored but _not_ the `PersistedBeaconChain`, then on restart Lighthouse would think the database was uninitialized and attempt to compare-and-swap a `None` value, but would actually find the stale info from the previous run. ## Proposed Changes The issue is fixed by writing the anchor info, the split point, and the `PersistedBeaconChain` atomically on start-up. Some type-hinting ugliness was required, which could possibly be cleaned up in future refactors. --- beacon_node/beacon_chain/src/beacon_chain.rs | 27 ++++++++-- beacon_node/beacon_chain/src/builder.rs | 54 ++++++++++++++----- .../beacon_chain/src/historical_blocks.rs | 2 +- beacon_node/store/src/hot_cold_store.rs | 39 +++++++++----- beacon_node/store/src/lib.rs | 1 + beacon_node/store/src/partial_beacon_state.rs | 1 - beacon_node/store/src/reconstruct.rs | 7 ++- 7 files changed, 95 insertions(+), 36 deletions(-) diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index eb9e179aad..47cabbbfe4 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -339,24 +339,41 @@ impl BeaconChain { Ok(()) } - /// Return a `PersistedBeaconChain` representing the current head. 
- pub fn make_persisted_head(&self) -> PersistedBeaconChain { + /// Return a `PersistedBeaconChain` without reference to a `BeaconChain`. + pub fn make_persisted_head( + genesis_block_root: Hash256, + head_tracker: &HeadTracker, + ) -> PersistedBeaconChain { PersistedBeaconChain { _canonical_head_block_root: DUMMY_CANONICAL_HEAD_BLOCK_ROOT, - genesis_block_root: self.genesis_block_root, - ssz_head_tracker: self.head_tracker.to_ssz_container(), + genesis_block_root, + ssz_head_tracker: head_tracker.to_ssz_container(), } } /// Return a database operation for writing the beacon chain head to disk. pub fn persist_head_in_batch(&self) -> KeyValueStoreOp { - self.make_persisted_head() + Self::persist_head_in_batch_standalone(self.genesis_block_root, &self.head_tracker) + } + + pub fn persist_head_in_batch_standalone( + genesis_block_root: Hash256, + head_tracker: &HeadTracker, + ) -> KeyValueStoreOp { + Self::make_persisted_head(genesis_block_root, head_tracker) .as_kv_store_op(BEACON_CHAIN_DB_KEY) } /// Return a database operation for writing fork choice to disk. pub fn persist_fork_choice_in_batch(&self) -> KeyValueStoreOp { let fork_choice = self.fork_choice.read(); + Self::persist_fork_choice_in_batch_standalone(&fork_choice) + } + + /// Return a database operation for writing fork choice to disk. 
+ pub fn persist_fork_choice_in_batch_standalone( + fork_choice: &BeaconForkChoice, + ) -> KeyValueStoreOp { let persisted_fork_choice = PersistedForkChoice { fork_choice: fork_choice.to_persisted(), fork_choice_store: fork_choice.fc_store().to_persisted(), diff --git a/beacon_node/beacon_chain/src/builder.rs b/beacon_node/beacon_chain/src/builder.rs index 3d718df239..d96ca70829 100644 --- a/beacon_node/beacon_chain/src/builder.rs +++ b/beacon_node/beacon_chain/src/builder.rs @@ -25,7 +25,7 @@ use slot_clock::{SlotClock, TestingSlotClock}; use std::marker::PhantomData; use std::sync::Arc; use std::time::Duration; -use store::{Error as StoreError, HotColdDB, ItemStore}; +use store::{Error as StoreError, HotColdDB, ItemStore, KeyValueStoreOp}; use task_executor::ShutdownReason; use types::{ BeaconBlock, BeaconState, ChainSpec, Checkpoint, EthSpec, Graffiti, Hash256, PublicKeyBytes, @@ -87,6 +87,9 @@ pub struct BeaconChainBuilder { graffiti: Graffiti, slasher: Option>>, validator_monitor: Option>, + // Pending I/O batch that is constructed during building and should be executed atomically + // alongside `PersistedBeaconChain` storage when `BeaconChainBuilder::build` is called. + pending_io_batch: Vec, } impl @@ -124,6 +127,7 @@ where graffiti: Graffiti::default(), slasher: None, validator_monitor: None, + pending_io_batch: vec![], } } @@ -416,13 +420,11 @@ where // Set the store's split point *before* storing genesis so that genesis is stored // immediately in the freezer DB. store.set_split(weak_subj_slot, weak_subj_state_root); - store - .store_split() - .map_err(|e| format!("Error storing DB split point: {:?}", e))?; - let (_, updated_builder) = self.set_genesis_state(genesis_state)?; self = updated_builder; + // Write the state and block non-atomically, it doesn't matter if they're forgotten + // about on a crash restart. 
store .put_state(&weak_subj_state_root, &weak_subj_state) .map_err(|e| format!("Failed to store weak subjectivity state: {:?}", e))?; @@ -430,18 +432,22 @@ where .put_block(&weak_subj_block_root, weak_subj_block.clone()) .map_err(|e| format!("Failed to store weak subjectivity block: {:?}", e))?; - // Store anchor info (context for weak subj sync). - store - .init_anchor_info(weak_subj_block.message()) - .map_err(|e| format!("Failed to initialize anchor info: {:?}", e))?; + // Stage the database's metadata fields for atomic storage when `build` is called. + // This prevents the database from restarting in an inconsistent state if the anchor + // info or split point is written before the `PersistedBeaconChain`. + self.pending_io_batch.push(store.store_split_in_batch()); + self.pending_io_batch.push( + store + .init_anchor_info(weak_subj_block.message()) + .map_err(|e| format!("Failed to initialize anchor info: {:?}", e))?, + ); // Store pruning checkpoint to prevent attempting to prune before the anchor state. - store - .store_pruning_checkpoint(Checkpoint { + self.pending_io_batch + .push(store.pruning_checkpoint_store_op(Checkpoint { root: weak_subj_block_root, epoch: weak_subj_state.slot().epoch(TEthSpec::slots_per_epoch()), - }) - .map_err(|e| format!("Failed to write pruning checkpoint: {:?}", e))?; + })); let snapshot = BeaconSnapshot { beacon_block_root: weak_subj_block_root, @@ -542,7 +548,7 @@ where /// configured. #[allow(clippy::type_complexity)] // I think there's nothing to be gained here from a type alias. pub fn build( - self, + mut self, ) -> Result< BeaconChain>, String, @@ -679,6 +685,26 @@ where ); } + // Store the `PersistedBeaconChain` in the database atomically with the metadata so that on + // restart we can correctly detect the presence of an initialized database. + // + // This *must* be stored before constructing the `BeaconChain`, so that its `Drop` instance + // doesn't write a `PersistedBeaconChain` without the rest of the batch. 
+ self.pending_io_batch.push(BeaconChain::< + Witness, + >::persist_head_in_batch_standalone( + genesis_block_root, &head_tracker + )); + self.pending_io_batch.push(BeaconChain::< + Witness, + >::persist_fork_choice_in_batch_standalone( + &fork_choice + )); + store + .hot_db + .do_atomically(self.pending_io_batch) + .map_err(|e| format!("Error writing chain & metadata to disk: {:?}", e))?; + let beacon_chain = BeaconChain { spec: self.spec, config: self.chain_config, diff --git a/beacon_node/beacon_chain/src/historical_blocks.rs b/beacon_node/beacon_chain/src/historical_blocks.rs index 63e83ce369..7e75dae53b 100644 --- a/beacon_node/beacon_chain/src/historical_blocks.rs +++ b/beacon_node/beacon_chain/src/historical_blocks.rs @@ -182,7 +182,7 @@ impl BeaconChain { }; let backfill_complete = new_anchor.block_backfill_complete(); self.store - .compare_and_set_anchor_info(Some(anchor_info), Some(new_anchor))?; + .compare_and_set_anchor_info_with_write(Some(anchor_info), Some(new_anchor))?; // If backfill has completed and the chain is configured to reconstruct historic states, // send a message to the background migrator instructing it to begin reconstruction. diff --git a/beacon_node/store/src/hot_cold_store.rs b/beacon_node/store/src/hot_cold_store.rs index 39c63507ea..0194544c80 100644 --- a/beacon_node/store/src/hot_cold_store.rs +++ b/beacon_node/store/src/hot_cold_store.rs @@ -261,7 +261,6 @@ impl, Cold: ItemStore> HotColdDB } /// Prepare a signed beacon block for storage in the database. - #[must_use] pub fn block_as_kv_store_op( &self, key: &Hash256, @@ -973,7 +972,7 @@ impl, Cold: ItemStore> HotColdDB } /// Initialise the anchor info for checkpoint sync starting from `block`. 
- pub fn init_anchor_info(&self, block: BeaconBlockRef<'_, E>) -> Result<(), Error> { + pub fn init_anchor_info(&self, block: BeaconBlockRef<'_, E>) -> Result { let anchor_slot = block.slot(); let slots_per_restore_point = self.config.slots_per_restore_point; @@ -1003,23 +1002,36 @@ impl, Cold: ItemStore> HotColdDB /// Atomically update the anchor info from `prev_value` to `new_value`. /// + /// Return a `KeyValueStoreOp` which should be written to disk, possibly atomically with other + /// values. + /// /// Return an `AnchorInfoConcurrentMutation` error if the `prev_value` provided /// is not correct. pub fn compare_and_set_anchor_info( &self, prev_value: Option, new_value: Option, - ) -> Result<(), Error> { + ) -> Result { let mut anchor_info = self.anchor_info.write(); if *anchor_info == prev_value { - self.store_anchor_info(&new_value)?; + let kv_op = self.store_anchor_info_in_batch(&new_value); *anchor_info = new_value; - Ok(()) + Ok(kv_op) } else { Err(Error::AnchorInfoConcurrentMutation) } } + /// As for `compare_and_set_anchor_info`, but also writes the anchor to disk immediately. + pub fn compare_and_set_anchor_info_with_write( + &self, + prev_value: Option, + new_value: Option, + ) -> Result<(), Error> { + let kv_store_op = self.compare_and_set_anchor_info(prev_value, new_value)?; + self.hot_db.do_atomically(vec![kv_store_op]) + } + /// Load the anchor info from disk, but do not set `self.anchor_info`. fn load_anchor_info(&self) -> Result, Error> { self.hot_db.get(&ANCHOR_INFO_KEY) @@ -1029,13 +1041,15 @@ impl, Cold: ItemStore> HotColdDB /// /// The argument is intended to be `self.anchor_info`, but is passed manually to avoid issues /// with recursive locking. 
- fn store_anchor_info(&self, anchor_info: &Option) -> Result<(), Error> { + fn store_anchor_info_in_batch(&self, anchor_info: &Option) -> KeyValueStoreOp { if let Some(ref anchor_info) = anchor_info { - self.hot_db.put(&ANCHOR_INFO_KEY, anchor_info)?; + anchor_info.as_kv_store_op(ANCHOR_INFO_KEY) } else { - self.hot_db.delete::(&ANCHOR_INFO_KEY)?; + KeyValueStoreOp::DeleteKey(get_key_for_col( + DBColumn::BeaconMeta.into(), + ANCHOR_INFO_KEY.as_bytes(), + )) } - Ok(()) } /// If an anchor exists, return its `anchor_slot` field. @@ -1103,10 +1117,9 @@ impl, Cold: ItemStore> HotColdDB self.hot_db.get(&SPLIT_KEY) } - /// Store the split point to disk. - pub fn store_split(&self) -> Result<(), Error> { - self.hot_db.put_sync(&SPLIT_KEY, &*self.split.read())?; - Ok(()) + /// Stage the split for storage to disk. + pub fn store_split_in_batch(&self) -> KeyValueStoreOp { + self.split.read_recursive().as_kv_store_op(SPLIT_KEY) } /// Load the state root of a restore point. diff --git a/beacon_node/store/src/lib.rs b/beacon_node/store/src/lib.rs index ca9af67254..c86a01213c 100644 --- a/beacon_node/store/src/lib.rs +++ b/beacon_node/store/src/lib.rs @@ -81,6 +81,7 @@ pub fn get_key_for_col(column: &str, key: &[u8]) -> Vec { result } +#[must_use] pub enum KeyValueStoreOp { PutKeyValue(Vec, Vec), DeleteKey(Vec), diff --git a/beacon_node/store/src/partial_beacon_state.rs b/beacon_node/store/src/partial_beacon_state.rs index 1ed1e7b8ff..c433323466 100644 --- a/beacon_node/store/src/partial_beacon_state.rs +++ b/beacon_node/store/src/partial_beacon_state.rs @@ -189,7 +189,6 @@ impl PartialBeaconState { } /// Prepare the partial state for storage in the KV database. 
- #[must_use] pub fn as_kv_store_op(&self, state_root: Hash256) -> KeyValueStoreOp { let db_key = get_key_for_col(DBColumn::BeaconState.into(), state_root.as_bytes()); KeyValueStoreOp::PutKeyValue(db_key, self.as_ssz_bytes()) diff --git a/beacon_node/store/src/reconstruct.rs b/beacon_node/store/src/reconstruct.rs index ff10e642df..a88af95c85 100644 --- a/beacon_node/store/src/reconstruct.rs +++ b/beacon_node/store/src/reconstruct.rs @@ -131,14 +131,17 @@ where }); } - self.compare_and_set_anchor_info(old_anchor, None)?; + self.compare_and_set_anchor_info_with_write(old_anchor, None)?; return Ok(()); } else { // The lower limit has been raised, store it. anchor.state_lower_limit = slot; - self.compare_and_set_anchor_info(old_anchor, Some(anchor.clone()))?; + self.compare_and_set_anchor_info_with_write( + old_anchor, + Some(anchor.clone()), + )?; } } }