mirror of
https://github.com/sigp/lighthouse.git
synced 2026-07-05 05:44:30 +00:00
Fix regression in DB write atomicity (#3931)
This commit is contained in:
@@ -2683,7 +2683,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
|
|||||||
// is so we don't have to think about lock ordering with respect to the fork choice lock.
|
// is so we don't have to think about lock ordering with respect to the fork choice lock.
|
||||||
// There are a bunch of places where we lock both fork choice and the pubkey cache and it
|
// There are a bunch of places where we lock both fork choice and the pubkey cache and it
|
||||||
// would be difficult to check that they all lock fork choice first.
|
// would be difficult to check that they all lock fork choice first.
|
||||||
let mut kv_store_ops = self
|
let mut ops = self
|
||||||
.validator_pubkey_cache
|
.validator_pubkey_cache
|
||||||
.write()
|
.write()
|
||||||
.import_new_pubkeys(&state)?;
|
.import_new_pubkeys(&state)?;
|
||||||
@@ -2791,9 +2791,14 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
|
|||||||
// ---------------------------- BLOCK PROBABLY ATTESTABLE ----------------------------------
|
// ---------------------------- BLOCK PROBABLY ATTESTABLE ----------------------------------
|
||||||
// Most blocks are now capable of being attested to thanks to the `early_attester_cache`
|
// Most blocks are now capable of being attested to thanks to the `early_attester_cache`
|
||||||
// cache above. Resume non-essential processing.
|
// cache above. Resume non-essential processing.
|
||||||
|
//
|
||||||
|
// It is important NOT to return errors here before the database commit, because the block
|
||||||
|
// has already been added to fork choice and the database would be left in an inconsistent
|
||||||
|
// state if we returned early without committing. In other words, an error here would
|
||||||
|
// corrupt the node's database permanently.
|
||||||
// -----------------------------------------------------------------------------------------
|
// -----------------------------------------------------------------------------------------
|
||||||
|
|
||||||
self.import_block_update_shuffling_cache(block_root, &mut state)?;
|
self.import_block_update_shuffling_cache(block_root, &mut state);
|
||||||
self.import_block_observe_attestations(
|
self.import_block_observe_attestations(
|
||||||
block,
|
block,
|
||||||
&state,
|
&state,
|
||||||
@@ -2816,17 +2821,16 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
|
|||||||
// If the write fails, revert fork choice to the version from disk, else we can
|
// If the write fails, revert fork choice to the version from disk, else we can
|
||||||
// end up with blocks in fork choice that are missing from disk.
|
// end up with blocks in fork choice that are missing from disk.
|
||||||
// See https://github.com/sigp/lighthouse/issues/2028
|
// See https://github.com/sigp/lighthouse/issues/2028
|
||||||
let mut ops: Vec<_> = confirmed_state_roots
|
ops.extend(
|
||||||
|
confirmed_state_roots
|
||||||
.into_iter()
|
.into_iter()
|
||||||
.map(StoreOp::DeleteStateTemporaryFlag)
|
.map(StoreOp::DeleteStateTemporaryFlag),
|
||||||
.collect();
|
);
|
||||||
ops.push(StoreOp::PutBlock(block_root, signed_block.clone()));
|
ops.push(StoreOp::PutBlock(block_root, signed_block.clone()));
|
||||||
ops.push(StoreOp::PutState(block.state_root(), &state));
|
ops.push(StoreOp::PutState(block.state_root(), &state));
|
||||||
let txn_lock = self.store.hot_db.begin_rw_transaction();
|
let txn_lock = self.store.hot_db.begin_rw_transaction();
|
||||||
|
|
||||||
kv_store_ops.extend(self.store.convert_to_kv_batch(ops)?);
|
if let Err(e) = self.store.do_atomically(ops) {
|
||||||
|
|
||||||
if let Err(e) = self.store.hot_db.do_atomically(kv_store_ops) {
|
|
||||||
error!(
|
error!(
|
||||||
self.log,
|
self.log,
|
||||||
"Database write failed!";
|
"Database write failed!";
|
||||||
@@ -3232,13 +3236,27 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// For the current and next epoch of this state, ensure we have the shuffling from this
|
||||||
|
// block in our cache.
|
||||||
fn import_block_update_shuffling_cache(
|
fn import_block_update_shuffling_cache(
|
||||||
&self,
|
&self,
|
||||||
block_root: Hash256,
|
block_root: Hash256,
|
||||||
state: &mut BeaconState<T::EthSpec>,
|
state: &mut BeaconState<T::EthSpec>,
|
||||||
|
) {
|
||||||
|
if let Err(e) = self.import_block_update_shuffling_cache_fallible(block_root, state) {
|
||||||
|
warn!(
|
||||||
|
self.log,
|
||||||
|
"Failed to prime shuffling cache";
|
||||||
|
"error" => ?e
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn import_block_update_shuffling_cache_fallible(
|
||||||
|
&self,
|
||||||
|
block_root: Hash256,
|
||||||
|
state: &mut BeaconState<T::EthSpec>,
|
||||||
) -> Result<(), BlockError<T::EthSpec>> {
|
) -> Result<(), BlockError<T::EthSpec>> {
|
||||||
// For the current and next epoch of this state, ensure we have the shuffling from this
|
|
||||||
// block in our cache.
|
|
||||||
for relative_epoch in [RelativeEpoch::Current, RelativeEpoch::Next] {
|
for relative_epoch in [RelativeEpoch::Current, RelativeEpoch::Next] {
|
||||||
let shuffling_id = AttestationShufflingId::new(block_root, state, relative_epoch)?;
|
let shuffling_id = AttestationShufflingId::new(block_root, state, relative_epoch)?;
|
||||||
|
|
||||||
|
|||||||
@@ -731,13 +731,12 @@ where
|
|||||||
let validator_pubkey_cache = store.immutable_validators.clone();
|
let validator_pubkey_cache = store.immutable_validators.clone();
|
||||||
|
|
||||||
// Update pubkey cache on first start in case we have started from genesis.
|
// Update pubkey cache on first start in case we have started from genesis.
|
||||||
let kv_store_ops = validator_pubkey_cache
|
let store_ops = validator_pubkey_cache
|
||||||
.write()
|
.write()
|
||||||
.import_new_pubkeys(&head_snapshot.beacon_state)
|
.import_new_pubkeys(&head_snapshot.beacon_state)
|
||||||
.map_err(|e| format!("error initializing pubkey cache: {e:?}"))?;
|
.map_err(|e| format!("error initializing pubkey cache: {e:?}"))?;
|
||||||
store
|
store
|
||||||
.hot_db
|
.do_atomically(store_ops)
|
||||||
.do_atomically(kv_store_ops)
|
|
||||||
.map_err(|e| format!("error writing validator store: {e:?}"))?;
|
.map_err(|e| format!("error writing validator store: {e:?}"))?;
|
||||||
|
|
||||||
let migrator_config = self.store_migrator_config.unwrap_or_default();
|
let migrator_config = self.store_migrator_config.unwrap_or_default();
|
||||||
|
|||||||
@@ -809,11 +809,11 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
|
|||||||
key_value_batch.push(KeyValueStoreOp::DeleteKey(diff_key));
|
key_value_batch.push(KeyValueStoreOp::DeleteKey(diff_key));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
StoreOp::KeyValueOp(kv_op) => key_value_batch.push(kv_op),
|
|
||||||
StoreOp::DeleteExecutionPayload(block_root) => {
|
StoreOp::DeleteExecutionPayload(block_root) => {
|
||||||
let key = get_key_for_col(DBColumn::ExecPayload.into(), block_root.as_bytes());
|
let key = get_key_for_col(DBColumn::ExecPayload.into(), block_root.as_bytes());
|
||||||
key_value_batch.push(KeyValueStoreOp::DeleteKey(key));
|
key_value_batch.push(KeyValueStoreOp::DeleteKey(key));
|
||||||
}
|
}
|
||||||
|
StoreOp::KeyValueOp(kv_op) => key_value_batch.push(kv_op),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
Ok(key_value_batch)
|
Ok(key_value_batch)
|
||||||
@@ -845,9 +845,9 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
|
|||||||
self.state_cache.lock().delete_state(state_root)
|
self.state_cache.lock().delete_state(state_root)
|
||||||
}
|
}
|
||||||
|
|
||||||
StoreOp::KeyValueOp(_) => (),
|
|
||||||
|
|
||||||
StoreOp::DeleteExecutionPayload(_) => (),
|
StoreOp::DeleteExecutionPayload(_) => (),
|
||||||
|
|
||||||
|
StoreOp::KeyValueOp(_) => (),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -1,4 +1,4 @@
|
|||||||
use crate::{DBColumn, Error, HotColdDB, ItemStore, KeyValueStoreOp, StoreItem};
|
use crate::{DBColumn, Error, HotColdDB, ItemStore, StoreItem, StoreOp};
|
||||||
use bls::PUBLIC_KEY_UNCOMPRESSED_BYTES_LEN;
|
use bls::PUBLIC_KEY_UNCOMPRESSED_BYTES_LEN;
|
||||||
use smallvec::SmallVec;
|
use smallvec::SmallVec;
|
||||||
use ssz::{Decode, Encode};
|
use ssz::{Decode, Encode};
|
||||||
@@ -55,7 +55,7 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> ValidatorPubkeyCache<E,
|
|||||||
};
|
};
|
||||||
|
|
||||||
let store_ops = cache.import_new_pubkeys(state)?;
|
let store_ops = cache.import_new_pubkeys(state)?;
|
||||||
store.hot_db.do_atomically(store_ops)?;
|
store.do_atomically(store_ops)?;
|
||||||
|
|
||||||
Ok(cache)
|
Ok(cache)
|
||||||
}
|
}
|
||||||
@@ -96,7 +96,7 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> ValidatorPubkeyCache<E,
|
|||||||
pub fn import_new_pubkeys(
|
pub fn import_new_pubkeys(
|
||||||
&mut self,
|
&mut self,
|
||||||
state: &BeaconState<E>,
|
state: &BeaconState<E>,
|
||||||
) -> Result<Vec<KeyValueStoreOp>, Error> {
|
) -> Result<Vec<StoreOp<'static, E>>, Error> {
|
||||||
if state.validators().len() > self.validators.len() {
|
if state.validators().len() > self.validators.len() {
|
||||||
self.import(
|
self.import(
|
||||||
state
|
state
|
||||||
@@ -110,7 +110,7 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> ValidatorPubkeyCache<E,
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Adds zero or more validators to `self`.
|
/// Adds zero or more validators to `self`.
|
||||||
fn import<I>(&mut self, validator_keys: I) -> Result<Vec<KeyValueStoreOp>, Error>
|
fn import<I>(&mut self, validator_keys: I) -> Result<Vec<StoreOp<'static, E>>, Error>
|
||||||
where
|
where
|
||||||
I: Iterator<Item = Arc<ValidatorImmutable>> + ExactSizeIterator,
|
I: Iterator<Item = Arc<ValidatorImmutable>> + ExactSizeIterator,
|
||||||
{
|
{
|
||||||
@@ -134,10 +134,10 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> ValidatorPubkeyCache<E,
|
|||||||
// It will be committed atomically when the block that introduced it is written to disk.
|
// It will be committed atomically when the block that introduced it is written to disk.
|
||||||
// Notably it is NOT written while the write lock on the cache is held.
|
// Notably it is NOT written while the write lock on the cache is held.
|
||||||
// See: https://github.com/sigp/lighthouse/issues/2327
|
// See: https://github.com/sigp/lighthouse/issues/2327
|
||||||
store_ops.push(
|
store_ops.push(StoreOp::KeyValueOp(
|
||||||
DatabaseValidator::from_immutable_validator(&pubkey, &validator)
|
DatabaseValidator::from_immutable_validator(&pubkey, &validator)
|
||||||
.as_kv_store_op(DatabaseValidator::key_for_index(i))?,
|
.as_kv_store_op(DatabaseValidator::key_for_index(i))?,
|
||||||
);
|
));
|
||||||
|
|
||||||
self.pubkeys.push(pubkey);
|
self.pubkeys.push(pubkey);
|
||||||
self.indices.insert(validator.pubkey, i);
|
self.indices.insert(validator.pubkey, i);
|
||||||
@@ -344,7 +344,7 @@ mod test {
|
|||||||
let ops = cache
|
let ops = cache
|
||||||
.import_new_pubkeys(&state)
|
.import_new_pubkeys(&state)
|
||||||
.expect("should import pubkeys");
|
.expect("should import pubkeys");
|
||||||
store.hot_db.do_atomically(ops).unwrap();
|
store.do_atomically(ops).unwrap();
|
||||||
check_cache_get(&cache, &keypairs[..]);
|
check_cache_get(&cache, &keypairs[..]);
|
||||||
drop(cache);
|
drop(cache);
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user