diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index 622ba407ad..8603f6c2de 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -2878,7 +2878,7 @@ impl BeaconChain { // is so we don't have to think about lock ordering with respect to the fork choice lock. // There are a bunch of places where we lock both fork choice and the pubkey cache and it // would be difficult to check that they all lock fork choice first. - let mut kv_store_ops = self + let mut ops = self .validator_pubkey_cache .try_write_for(VALIDATOR_PUBKEY_CACHE_LOCK_TIMEOUT) .ok_or(Error::ValidatorPubkeyCacheLockTimeout)? @@ -2981,9 +2981,14 @@ impl BeaconChain { // ---------------------------- BLOCK PROBABLY ATTESTABLE ---------------------------------- // Most blocks are now capable of being attested to thanks to the `early_attester_cache` // cache above. Resume non-essential processing. + // + // It is important NOT to return errors here before the database commit, because the block + // has already been added to fork choice and the database would be left in an inconsistent + // state if we returned early without committing. In other words, an error here would + // corrupt the node's database permanently. // ----------------------------------------------------------------------------------------- - self.import_block_update_shuffling_cache(block_root, &mut state)?; + self.import_block_update_shuffling_cache(block_root, &mut state); self.import_block_observe_attestations( block, &state, @@ -3008,10 +3013,11 @@ impl BeaconChain { // See https://github.com/sigp/lighthouse/issues/2028 let (signed_block, blobs) = signed_block.deconstruct(); let block = signed_block.message(); - let mut ops: Vec<_> = confirmed_state_roots - .into_iter() - .map(StoreOp::DeleteStateTemporaryFlag) - .collect(); + ops.extend( + confirmed_state_roots + .into_iter() + .map(StoreOp::DeleteStateTemporaryFlag), + ); ops.push(StoreOp::PutBlock(block_root, signed_block.clone())); ops.push(StoreOp::PutState(block.state_root(), &state)); @@ -3036,9 +3042,7 @@ impl BeaconChain { let txn_lock = self.store.hot_db.begin_rw_transaction(); - kv_store_ops.extend(self.store.convert_to_kv_batch(ops)?); - - if let Err(e) = self.store.hot_db.do_atomically(kv_store_ops) { + if let Err(e) = self.store.do_atomically(ops) { error!( self.log, "Database write failed!"; @@ -3467,13 +3471,27 @@ impl BeaconChain { } } + // For the current and next epoch of this state, ensure we have the shuffling from this + // block in our cache. fn import_block_update_shuffling_cache( &self, block_root: Hash256, state: &mut BeaconState, + ) { + if let Err(e) = self.import_block_update_shuffling_cache_fallible(block_root, state) { + warn!( + self.log, + "Failed to prime shuffling cache"; + "error" => ?e + ); + } + } + + fn import_block_update_shuffling_cache_fallible( + &self, + block_root: Hash256, + state: &mut BeaconState, ) -> Result<(), BlockError> { - // For the current and next epoch of this state, ensure we have the shuffling from this - // block in our cache. for relative_epoch in [RelativeEpoch::Current, RelativeEpoch::Next] { let shuffling_id = AttestationShufflingId::new(block_root, state, relative_epoch)?; diff --git a/beacon_node/beacon_chain/src/validator_pubkey_cache.rs b/beacon_node/beacon_chain/src/validator_pubkey_cache.rs index 26aea2d272..79910df292 100644 --- a/beacon_node/beacon_chain/src/validator_pubkey_cache.rs +++ b/beacon_node/beacon_chain/src/validator_pubkey_cache.rs @@ -4,7 +4,7 @@ use ssz::{Decode, Encode}; use std::collections::HashMap; use std::convert::TryInto; use std::marker::PhantomData; -use store::{DBColumn, Error as StoreError, KeyValueStore, KeyValueStoreOp, StoreItem}; +use store::{DBColumn, Error as StoreError, StoreItem, StoreOp}; use types::{BeaconState, Hash256, PublicKey, PublicKeyBytes}; /// Provides a mapping of `validator_index -> validator_publickey`. @@ -38,7 +38,7 @@ impl ValidatorPubkeyCache { }; let store_ops = cache.import_new_pubkeys(state)?; - store.hot_db.do_atomically(store_ops)?; + store.do_atomically(store_ops)?; Ok(cache) } @@ -79,7 +79,7 @@ impl ValidatorPubkeyCache { pub fn import_new_pubkeys( &mut self, state: &BeaconState, - ) -> Result, BeaconChainError> { + ) -> Result>, BeaconChainError> { if state.validators().len() > self.pubkeys.len() { self.import( state.validators()[self.pubkeys.len()..] @@ -92,7 +92,10 @@ impl ValidatorPubkeyCache { } /// Adds zero or more validators to `self`. - fn import(&mut self, validator_keys: I) -> Result, BeaconChainError> + fn import( + &mut self, + validator_keys: I, + ) -> Result>, BeaconChainError> where I: Iterator + ExactSizeIterator, { @@ -112,7 +115,9 @@ impl ValidatorPubkeyCache { // It will be committed atomically when the block that introduced it is written to disk. // Notably it is NOT written while the write lock on the cache is held. // See: https://github.com/sigp/lighthouse/issues/2327 - store_ops.push(DatabasePubkey(pubkey).as_kv_store_op(DatabasePubkey::key_for_index(i))); + store_ops.push(StoreOp::KeyValueOp( + DatabasePubkey(pubkey).as_kv_store_op(DatabasePubkey::key_for_index(i)), + )); self.pubkeys.push( (&pubkey) @@ -294,7 +299,7 @@ mod test { let ops = cache .import_new_pubkeys(&state) .expect("should import pubkeys"); - store.hot_db.do_atomically(ops).unwrap(); + store.do_atomically(ops).unwrap(); check_cache_get(&cache, &keypairs[..]); drop(cache); diff --git a/beacon_node/store/src/hot_cold_store.rs b/beacon_node/store/src/hot_cold_store.rs index c7a3b440c6..f508734d4e 100644 --- a/beacon_node/store/src/hot_cold_store.rs +++ b/beacon_node/store/src/hot_cold_store.rs @@ -824,8 +824,8 @@ impl, Cold: ItemStore> HotColdDB key_value_batch.push(KeyValueStoreOp::PutKeyValue(db_key, [].into())); } - StoreOp::PutRawKVStoreOp(kv_store_op) => { - key_value_batch.push(kv_store_op); + StoreOp::KeyValueOp(kv_op) => { + key_value_batch.push(kv_op); } } } @@ -870,7 +870,7 @@ impl, Cold: ItemStore> HotColdDB StoreOp::PutOrphanedBlobsKey(_) => (), - StoreOp::PutRawKVStoreOp(_) => (), + StoreOp::KeyValueOp(_) => (), } } diff --git a/beacon_node/store/src/lib.rs b/beacon_node/store/src/lib.rs index 18e3b3ca34..1d7e92b80a 100644 --- a/beacon_node/store/src/lib.rs +++ b/beacon_node/store/src/lib.rs @@ -166,7 +166,7 @@ pub enum StoreOp<'a, E: EthSpec> { DeleteBlobs(Hash256), DeleteState(Hash256, Option), DeleteExecutionPayload(Hash256), - PutRawKVStoreOp(KeyValueStoreOp), + KeyValueOp(KeyValueStoreOp), } /// A unique column identifier.