Linearise restore points

This commit is contained in:
Michael Sproul
2022-10-20 11:54:02 +11:00
parent fd800ce755
commit 3f71de8c2d
5 changed files with 47 additions and 32 deletions

View File

@@ -30,8 +30,10 @@ pub struct StoreConfig {
pub compact_on_prune: bool, pub compact_on_prune: bool,
/// Whether to prune payloads on initialization and finalization. /// Whether to prune payloads on initialization and finalization.
pub prune_payloads: bool, pub prune_payloads: bool,
/// Whether to store finalized blocks in the freezer database. /// Whether to store finalized blocks compressed and linearised in the freezer database.
pub separate_blocks: bool, pub linear_blocks: bool,
/// Whether to store finalized states compressed and linearised in the freezer database.
pub linear_restore_points: bool,
} }
/// Variant of `StoreConfig` that gets written to disk. Contains immutable configuration params. /// Variant of `StoreConfig` that gets written to disk. Contains immutable configuration params.
@@ -39,7 +41,8 @@ pub struct StoreConfig {
pub struct OnDiskStoreConfig { pub struct OnDiskStoreConfig {
pub slots_per_restore_point: u64, pub slots_per_restore_point: u64,
// FIXME(sproul): schema migration // FIXME(sproul): schema migration
pub separate_blocks: bool, pub linear_blocks: bool,
pub linear_restore_points: bool,
} }
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
@@ -60,7 +63,8 @@ impl Default for StoreConfig {
compact_on_init: false, compact_on_init: false,
compact_on_prune: true, compact_on_prune: true,
prune_payloads: true, prune_payloads: true,
separate_blocks: true, linear_blocks: true,
linear_restore_points: true,
} }
} }
} }
@@ -69,7 +73,8 @@ impl StoreConfig {
pub fn as_disk_config(&self) -> OnDiskStoreConfig { pub fn as_disk_config(&self) -> OnDiskStoreConfig {
OnDiskStoreConfig { OnDiskStoreConfig {
slots_per_restore_point: self.slots_per_restore_point, slots_per_restore_point: self.slots_per_restore_point,
separate_blocks: self.separate_blocks, linear_blocks: self.linear_blocks,
linear_restore_points: self.linear_restore_points,
} }
} }

View File

@@ -99,6 +99,7 @@ pub enum HotColdDBError {
}, },
MissingStateToFreeze(Hash256), MissingStateToFreeze(Hash256),
MissingRestorePointHash(u64), MissingRestorePointHash(u64),
MissingRestorePointState(Slot),
MissingRestorePoint(Hash256), MissingRestorePoint(Hash256),
MissingColdStateSummary(Hash256), MissingColdStateSummary(Hash256),
MissingHotStateSummary(Hash256), MissingHotStateSummary(Hash256),
@@ -1221,7 +1222,7 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
// 1. Convert to PartialBeaconState and store that in the DB. // 1. Convert to PartialBeaconState and store that in the DB.
let partial_state = PartialBeaconState::from_state_forgetful(state); let partial_state = PartialBeaconState::from_state_forgetful(state);
let op = partial_state.as_kv_store_op(*state_root, &self.config)?; let op = partial_state.as_kv_store_op(&self.config)?;
ops.push(op); ops.push(op);
// 2. Store updated vector entries. // 2. Store updated vector entries.
@@ -1232,8 +1233,11 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
store_updated_vector(RandaoMixes, db, state, &self.spec, ops)?; store_updated_vector(RandaoMixes, db, state, &self.spec, ops)?;
// 3. Store restore point. // 3. Store restore point.
// FIXME(sproul): backwards compat
/*
let restore_point_index = state.slot().as_u64() / self.config.slots_per_restore_point; let restore_point_index = state.slot().as_u64() / self.config.slots_per_restore_point;
self.store_restore_point_hash(restore_point_index, *state_root, ops)?; self.store_restore_point_hash(restore_point_index, *state_root, ops)?;
*/
Ok(()) Ok(())
} }
@@ -1259,8 +1263,7 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
if slot <= lower_limit || slot >= upper_limit { if slot <= lower_limit || slot >= upper_limit {
if slot % self.config.slots_per_restore_point == 0 { if slot % self.config.slots_per_restore_point == 0 {
let restore_point_idx = slot.as_u64() / self.config.slots_per_restore_point; self.load_restore_point(slot)
self.load_restore_point_by_index(restore_point_idx)
} else { } else {
self.load_cold_intermediate_state(slot) self.load_cold_intermediate_state(slot)
} }
@@ -1270,12 +1273,15 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
} }
} }
/// Load a restore point state by its `state_root`. /// Load a restore point state by its `slot`.
fn load_restore_point(&self, state_root: &Hash256) -> Result<BeaconState<E>, Error> { fn load_restore_point(&self, slot: Slot) -> Result<BeaconState<E>, Error> {
let bytes = self let bytes = self
.cold_db .cold_db
.get_bytes(DBColumn::BeaconState.into(), state_root.as_bytes())? .get_bytes(
.ok_or(HotColdDBError::MissingRestorePoint(*state_root))?; DBColumn::BeaconRestorePointState.into(),
&slot.as_u64().to_be_bytes(),
)?
.ok_or(HotColdDBError::MissingRestorePointState(slot))?;
let mut ssz_bytes = Vec::with_capacity(self.config.estimate_decompressed_size(bytes.len())); let mut ssz_bytes = Vec::with_capacity(self.config.estimate_decompressed_size(bytes.len()));
let mut decoder = Decoder::new(&*bytes).map_err(Error::Compression)?; let mut decoder = Decoder::new(&*bytes).map_err(Error::Compression)?;
@@ -1298,26 +1304,29 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
partial_state.try_into_full_state(immutable_validators) partial_state.try_into_full_state(immutable_validators)
} }
/* FIXME(sproul): backwards compat
/// Load a restore point state by its `restore_point_index`. /// Load a restore point state by its `restore_point_index`.
fn load_restore_point_by_index( fn load_legacy_restore_point_by_index(
&self, &self,
restore_point_index: u64, restore_point_index: u64,
) -> Result<BeaconState<E>, Error> { ) -> Result<BeaconState<E>, Error> {
let state_root = self.load_restore_point_hash(restore_point_index)?; let state_root = self.load_restore_point_hash(restore_point_index)?;
self.load_restore_point(&state_root) self.load_restore_point(&state_root)
} }
*/
/// Load a frozen state that lies between restore points. /// Load a frozen state that lies between restore points.
fn load_cold_intermediate_state(&self, slot: Slot) -> Result<BeaconState<E>, Error> { fn load_cold_intermediate_state(&self, slot: Slot) -> Result<BeaconState<E>, Error> {
// 1. Load the restore points either side of the intermediate state. // 1. Load the restore points either side of the intermediate state.
let low_restore_point_idx = slot.as_u64() / self.config.slots_per_restore_point; let sprp = self.config.slots_per_restore_point;
let high_restore_point_idx = low_restore_point_idx + 1; let low_restore_point_slot = slot / sprp * sprp;
let high_restore_point_slot = low_restore_point_slot + sprp;
// Acquire the read lock, so that the split can't change while this is happening. // Acquire the read lock, so that the split can't change while this is happening.
let split = self.split.read_recursive(); let split = self.split.read_recursive();
let low_restore_point = self.load_restore_point_by_index(low_restore_point_idx)?; let low_restore_point = self.load_restore_point(low_restore_point_slot)?;
let high_restore_point = self.get_restore_point(high_restore_point_idx, &split)?; let high_restore_point = self.get_restore_point(high_restore_point_slot, &split)?;
// 2. Load the blocks from the high restore point back to the low restore point. // 2. Load the blocks from the high restore point back to the low restore point.
let blocks = self.load_blocks_to_replay( let blocks = self.load_blocks_to_replay(
@@ -1342,10 +1351,10 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
/// Get the restore point with the given index, or if it is out of bounds, the split state. /// Get the restore point with the given index, or if it is out of bounds, the split state.
pub(crate) fn get_restore_point( pub(crate) fn get_restore_point(
&self, &self,
restore_point_idx: u64, slot: Slot,
split: &Split, split: &Split,
) -> Result<BeaconState<E>, Error> { ) -> Result<BeaconState<E>, Error> {
if restore_point_idx * self.config.slots_per_restore_point >= split.slot.as_u64() { if slot >= split.slot.as_u64() {
self.get_state(&split.state_root, Some(split.slot))? self.get_state(&split.state_root, Some(split.slot))?
.ok_or(HotColdDBError::MissingSplitState( .ok_or(HotColdDBError::MissingSplitState(
split.state_root, split.state_root,
@@ -1353,7 +1362,7 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
)) ))
.map_err(Into::into) .map_err(Into::into)
} else { } else {
self.load_restore_point_by_index(restore_point_idx) self.load_restore_point(slot)
} }
} }

View File

@@ -234,9 +234,12 @@ pub enum DBColumn {
ForkChoice, ForkChoice,
#[strum(serialize = "pkc")] #[strum(serialize = "pkc")]
PubkeyCache, PubkeyCache,
/// For the table mapping restore point numbers to state roots. /// For the legacy table mapping restore point numbers to state roots.
#[strum(serialize = "brp")] #[strum(serialize = "brp")]
BeaconRestorePoint, BeaconRestorePoint,
/// For the new table mapping restore point slots to compressed beacon states.
#[strum(serialize = "rps")]
BeaconRestorePointState,
#[strum(serialize = "bbr")] #[strum(serialize = "bbr")]
BeaconBlockRoots, BeaconBlockRoots,
#[strum(serialize = "bsr")] #[strum(serialize = "bsr")]
@@ -291,7 +294,8 @@ impl DBColumn {
| Self::BeaconStateRoots | Self::BeaconStateRoots
| Self::BeaconHistoricalRoots | Self::BeaconHistoricalRoots
| Self::BeaconRandaoMixes | Self::BeaconRandaoMixes
| Self::BeaconBlockFrozen => 8, | Self::BeaconBlockFrozen
| Self::BeaconRestorePointState => 8,
} }
} }
} }

View File

@@ -21,6 +21,7 @@ use zstd::Encoder;
)] )]
#[derive(Debug, PartialEq, Clone, Encode)] #[derive(Debug, PartialEq, Clone, Encode)]
#[ssz(enum_behaviour = "transparent")] #[ssz(enum_behaviour = "transparent")]
// FIXME(sproul): schema migration
pub struct PartialBeaconState<T> pub struct PartialBeaconState<T>
where where
T: EthSpec, T: EthSpec,
@@ -209,12 +210,11 @@ impl<T: EthSpec> PartialBeaconState<T> {
} }
/// Prepare the partial state for storage in the KV database. /// Prepare the partial state for storage in the KV database.
pub fn as_kv_store_op( pub fn as_kv_store_op(&self, config: &StoreConfig) -> Result<KeyValueStoreOp, Error> {
&self, let db_key = get_key_for_col(
state_root: Hash256, DBColumn::BeaconRestorePointState.into(),
config: &StoreConfig, &self.slot().as_u64().to_be_bytes(),
) -> Result<KeyValueStoreOp, Error> { );
let db_key = get_key_for_col(DBColumn::BeaconState.into(), state_root.as_bytes());
let ssz_bytes = self.as_ssz_bytes(); let ssz_bytes = self.as_ssz_bytes();

View File

@@ -42,10 +42,7 @@ where
// Iterate blocks from the state lower limit to the upper limit. // Iterate blocks from the state lower limit to the upper limit.
let lower_limit_slot = anchor.state_lower_limit; let lower_limit_slot = anchor.state_lower_limit;
let split = self.get_split_info(); let split = self.get_split_info();
let upper_limit_state = self.get_restore_point( let upper_limit_state = self.get_restore_point(anchor.state_upper_limit, &split)?;
anchor.state_upper_limit.as_u64() / slots_per_restore_point,
&split,
)?;
let upper_limit_slot = upper_limit_state.slot(); let upper_limit_slot = upper_limit_state.slot();
// Use a dummy root, as we never read the block for the upper limit state. // Use a dummy root, as we never read the block for the upper limit state.