diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index 8ed0c4563d..1092bf45b6 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -2683,7 +2683,7 @@ impl BeaconChain { // is so we don't have to think about lock ordering with respect to the fork choice lock. // There are a bunch of places where we lock both fork choice and the pubkey cache and it // would be difficult to check that they all lock fork choice first. - let mut kv_store_ops = self + let mut ops = self .validator_pubkey_cache .write() .import_new_pubkeys(&state)?; @@ -2791,9 +2791,14 @@ impl BeaconChain { // ---------------------------- BLOCK PROBABLY ATTESTABLE ---------------------------------- // Most blocks are now capable of being attested to thanks to the `early_attester_cache` // cache above. Resume non-essential processing. + // + // It is important NOT to return errors here before the database commit, because the block + // has already been added to fork choice and the database would be left in an inconsistent + // state if we returned early without committing. In other words, an error here would + // corrupt the node's database permanently. // ----------------------------------------------------------------------------------------- - self.import_block_update_shuffling_cache(block_root, &mut state)?; + self.import_block_update_shuffling_cache(block_root, &mut state); self.import_block_observe_attestations( block, &state, @@ -2816,17 +2821,16 @@ impl BeaconChain { // If the write fails, revert fork choice to the version from disk, else we can // end up with blocks in fork choice that are missing from disk. // See https://github.com/sigp/lighthouse/issues/2028 - let mut ops: Vec<_> = confirmed_state_roots - .into_iter() - .map(StoreOp::DeleteStateTemporaryFlag) - .collect(); + ops.extend( + confirmed_state_roots + .into_iter() + .map(StoreOp::DeleteStateTemporaryFlag), + ); ops.push(StoreOp::PutBlock(block_root, signed_block.clone())); ops.push(StoreOp::PutState(block.state_root(), &state)); let txn_lock = self.store.hot_db.begin_rw_transaction(); - kv_store_ops.extend(self.store.convert_to_kv_batch(ops)?); - - if let Err(e) = self.store.hot_db.do_atomically(kv_store_ops) { + if let Err(e) = self.store.do_atomically(ops) { error!( self.log, "Database write failed!"; @@ -3232,13 +3236,27 @@ impl BeaconChain { } } + // For the current and next epoch of this state, ensure we have the shuffling from this + // block in our cache. fn import_block_update_shuffling_cache( &self, block_root: Hash256, state: &mut BeaconState, + ) { + if let Err(e) = self.import_block_update_shuffling_cache_fallible(block_root, state) { + warn!( + self.log, + "Failed to prime shuffling cache"; + "error" => ?e + ); + } + } + + fn import_block_update_shuffling_cache_fallible( + &self, + block_root: Hash256, + state: &mut BeaconState, ) -> Result<(), BlockError> { - // For the current and next epoch of this state, ensure we have the shuffling from this - // block in our cache. for relative_epoch in [RelativeEpoch::Current, RelativeEpoch::Next] { let shuffling_id = AttestationShufflingId::new(block_root, state, relative_epoch)?; diff --git a/beacon_node/beacon_chain/src/builder.rs b/beacon_node/beacon_chain/src/builder.rs index 68682199d4..bf27731d22 100644 --- a/beacon_node/beacon_chain/src/builder.rs +++ b/beacon_node/beacon_chain/src/builder.rs @@ -731,13 +731,12 @@ where let validator_pubkey_cache = store.immutable_validators.clone(); // Update pubkey cache on first start in case we have started from genesis. - let kv_store_ops = validator_pubkey_cache + let store_ops = validator_pubkey_cache .write() .import_new_pubkeys(&head_snapshot.beacon_state) .map_err(|e| format!("error initializing pubkey cache: {e:?}"))?; store - .hot_db - .do_atomically(kv_store_ops) + .do_atomically(store_ops) .map_err(|e| format!("error writing validator store: {e:?}"))?; let migrator_config = self.store_migrator_config.unwrap_or_default(); diff --git a/beacon_node/store/src/hot_cold_store.rs b/beacon_node/store/src/hot_cold_store.rs index f06e3747d0..c1b3eb1ac8 100644 --- a/beacon_node/store/src/hot_cold_store.rs +++ b/beacon_node/store/src/hot_cold_store.rs @@ -809,11 +809,11 @@ impl, Cold: ItemStore> HotColdDB key_value_batch.push(KeyValueStoreOp::DeleteKey(diff_key)); } } - StoreOp::KeyValueOp(kv_op) => key_value_batch.push(kv_op), StoreOp::DeleteExecutionPayload(block_root) => { let key = get_key_for_col(DBColumn::ExecPayload.into(), block_root.as_bytes()); key_value_batch.push(KeyValueStoreOp::DeleteKey(key)); } + StoreOp::KeyValueOp(kv_op) => key_value_batch.push(kv_op), } } Ok(key_value_batch) @@ -845,9 +845,9 @@ impl, Cold: ItemStore> HotColdDB self.state_cache.lock().delete_state(state_root) } - StoreOp::KeyValueOp(_) => (), - StoreOp::DeleteExecutionPayload(_) => (), + + StoreOp::KeyValueOp(_) => (), } } diff --git a/beacon_node/store/src/validator_pubkey_cache.rs b/beacon_node/store/src/validator_pubkey_cache.rs index daf6ea78c4..31a5c6cd8f 100644 --- a/beacon_node/store/src/validator_pubkey_cache.rs +++ b/beacon_node/store/src/validator_pubkey_cache.rs @@ -1,4 +1,4 @@ -use crate::{DBColumn, Error, HotColdDB, ItemStore, KeyValueStoreOp, StoreItem}; +use crate::{DBColumn, Error, HotColdDB, ItemStore, StoreItem, StoreOp}; use bls::PUBLIC_KEY_UNCOMPRESSED_BYTES_LEN; use smallvec::SmallVec; use ssz::{Decode, Encode}; @@ -55,7 +55,7 @@ impl, Cold: ItemStore> ValidatorPubkeyCache, Cold: ItemStore> ValidatorPubkeyCache, - ) -> Result, Error> { + ) -> Result>, Error> { if state.validators().len() > self.validators.len() { self.import( state @@ -110,7 +110,7 @@ impl, Cold: ItemStore> ValidatorPubkeyCache(&mut self, validator_keys: I) -> Result, Error> + fn import(&mut self, validator_keys: I) -> Result>, Error> where I: Iterator> + ExactSizeIterator, { @@ -134,10 +134,10 @@ impl, Cold: ItemStore> ValidatorPubkeyCache