Tree states to support per-slot state diffs (#4652)

* Support per slot state diffs

* Store HierarchyConfig on disk. Support storing hdiffs at per slot level.

* Revert HierarchyConfig change for testing.

* Add validity check for the hierarchy config when opening the DB.

* Update HDiff tests.

* Fix `get_cold_state` panic when the diff for the slot isn't stored.

* Use slots instead of epochs for storing snapshots in freezer DB.

* Add snapshot buffer to `diff_buffer_cache` instead of loading it from db every time.

* Add `hierarchy-exponents` cli flag to beacon node.

* Add test for `StorageStrategy::ReplayFrom` and ignore a flaky test.

* Drop hierarchy_config in tests for more frequent snapshots, and fix an issue where the hdiff wasn't stored unless it was an epoch-boundary slot.
This commit is contained in:
Jimmy Chen
2023-09-11 10:19:40 +10:00
committed by GitHub
parent e373e9a107
commit 1e4ee7aa5e
8 changed files with 311 additions and 112 deletions

View File

@@ -45,16 +45,25 @@ pub struct StoreConfig {
/// Variant of `StoreConfig` that gets written to disk. Contains immutable configuration params.
#[derive(Debug, Clone, PartialEq, Eq, Encode, Decode)]
// FIXME(sproul): schema migration, add hdiff
// FIXME(sproul): schema migration
pub struct OnDiskStoreConfig {
pub linear_blocks: bool,
pub linear_restore_points: bool,
pub hierarchy_config: HierarchyConfig,
}
#[derive(Debug, Clone)]
pub enum StoreConfigError {
MismatchedSlotsPerRestorePoint { config: u64, on_disk: u64 },
InvalidCompressionLevel { level: i32 },
MismatchedSlotsPerRestorePoint {
config: u64,
on_disk: u64,
},
InvalidCompressionLevel {
level: i32,
},
IncompatibleStoreConfig {
config: OnDiskStoreConfig,
on_disk: OnDiskStoreConfig,
},
}
impl Default for StoreConfig {
@@ -80,15 +89,21 @@ impl StoreConfig {
pub fn as_disk_config(&self) -> OnDiskStoreConfig {
OnDiskStoreConfig {
linear_blocks: self.linear_blocks,
linear_restore_points: self.linear_restore_points,
hierarchy_config: self.hierarchy_config.clone(),
}
}
pub fn check_compatibility(
&self,
_on_disk_config: &OnDiskStoreConfig,
on_disk_config: &OnDiskStoreConfig,
) -> Result<(), StoreConfigError> {
// FIXME(sproul): TODO
let db_config = self.as_disk_config();
if db_config.ne(on_disk_config) {
return Err(StoreConfigError::IncompatibleStoreConfig {
config: db_config,
on_disk: on_disk_config.clone(),
});
}
Ok(())
}
@@ -146,3 +161,49 @@ impl StoreItem for OnDiskStoreConfig {
Ok(Self::from_ssz_bytes(bytes)?)
}
}
#[cfg(test)]
mod test {
    use super::*;

    /// A config that matches its own on-disk representation must be accepted.
    #[test]
    fn check_compatibility_ok() {
        let config = StoreConfig {
            linear_blocks: true,
            ..Default::default()
        };
        let disk_repr = OnDiskStoreConfig {
            linear_blocks: true,
            hierarchy_config: config.hierarchy_config.clone(),
        };
        assert!(config.check_compatibility(&disk_repr).is_ok());
    }

    /// A `linear_blocks` mismatch between memory and disk must be rejected.
    #[test]
    fn check_compatibility_linear_blocks_mismatch() {
        let config = StoreConfig {
            linear_blocks: true,
            ..Default::default()
        };
        let disk_repr = OnDiskStoreConfig {
            linear_blocks: false,
            hierarchy_config: config.hierarchy_config.clone(),
        };
        assert!(config.check_compatibility(&disk_repr).is_err());
    }

    /// A diff-hierarchy layout on disk that differs from the configured one
    /// must be rejected. (Exponent `8` here deliberately differs from the
    /// default hierarchy.)
    #[test]
    fn check_compatibility_hierarchy_config_incompatible() {
        let config = StoreConfig {
            linear_blocks: true,
            ..Default::default()
        };
        let disk_repr = OnDiskStoreConfig {
            linear_blocks: true,
            hierarchy_config: HierarchyConfig {
                exponents: vec![5, 8, 11, 13, 16, 18, 21],
            },
        };
        assert!(config.check_compatibility(&disk_repr).is_err());
    }
}

View File

@@ -41,7 +41,7 @@ pub enum Error {
},
MissingStateRoot(Slot),
MissingState(Hash256),
MissingSnapshot(Epoch),
MissingSnapshot(Slot),
MissingDiff(Epoch),
NoBaseStateFound(Hash256),
BlockReplayError(BlockReplayError),

View File

@@ -5,21 +5,21 @@ use serde::{Deserialize, Serialize};
use ssz::{Decode, Encode};
use ssz_derive::{Decode, Encode};
use std::io::{Read, Write};
use types::{BeaconState, ChainSpec, Epoch, EthSpec, VList};
use types::{BeaconState, ChainSpec, EthSpec, Slot, VList};
use zstd::{Decoder, Encoder};
#[derive(Debug)]
pub enum Error {
InvalidHierarchy,
XorDeletionsNotSupported,
U64DiffDeletionsNotSupported,
UnableToComputeDiff,
UnableToApplyDiff,
Compression(std::io::Error),
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Encode, Decode)]
pub struct HierarchyConfig {
exponents: Vec<u8>,
pub exponents: Vec<u8>,
}
#[derive(Debug)]
@@ -29,8 +29,8 @@ pub struct HierarchyModuli {
#[derive(Debug, PartialEq, Eq)]
pub enum StorageStrategy {
Nothing,
DiffFrom(Epoch),
ReplayFrom(Slot),
DiffFrom(Slot),
Snapshot,
}
@@ -45,7 +45,7 @@ pub struct HDiffBuffer {
#[derive(Debug, Encode, Decode)]
pub struct HDiff {
state_diff: BytesDiff,
balances_diff: XorDiff,
balances_diff: CompressedU64Diff,
}
#[derive(Debug, Encode, Decode)]
@@ -54,7 +54,7 @@ pub struct BytesDiff {
}
#[derive(Debug, Encode, Decode)]
pub struct XorDiff {
pub struct CompressedU64Diff {
bytes: Vec<u8>,
}
@@ -78,7 +78,7 @@ impl HDiffBuffer {
impl HDiff {
pub fn compute(source: &HDiffBuffer, target: &HDiffBuffer) -> Result<Self, Error> {
let state_diff = BytesDiff::compute(&source.state, &target.state)?;
let balances_diff = XorDiff::compute(&source.balances, &target.balances)?;
let balances_diff = CompressedU64Diff::compute(&source.balances, &target.balances)?;
Ok(Self {
state_diff,
@@ -138,10 +138,10 @@ impl BytesDiff {
}
}
impl XorDiff {
impl CompressedU64Diff {
pub fn compute(xs: &[u64], ys: &[u64]) -> Result<Self, Error> {
if xs.len() > ys.len() {
return Err(Error::XorDeletionsNotSupported);
return Err(Error::U64DiffDeletionsNotSupported);
}
let uncompressed_bytes: Vec<u8> = ys
@@ -164,7 +164,7 @@ impl XorDiff {
.map_err(Error::Compression)?;
encoder.finish().map_err(Error::Compression)?;
Ok(XorDiff {
Ok(CompressedU64Diff {
bytes: compressed_bytes,
})
}
@@ -198,7 +198,7 @@ impl XorDiff {
impl Default for HierarchyConfig {
fn default() -> Self {
HierarchyConfig {
exponents: vec![0, 4, 6, 8, 11, 13, 16],
exponents: vec![5, 9, 11, 13, 16, 18, 21],
}
}
}
@@ -226,30 +226,45 @@ impl HierarchyConfig {
}
impl HierarchyModuli {
pub fn storage_strategy(&self, epoch: Epoch) -> Result<StorageStrategy, Error> {
pub fn storage_strategy(&self, slot: Slot) -> Result<StorageStrategy, Error> {
let last = self.moduli.last().copied().ok_or(Error::InvalidHierarchy)?;
let first = self
.moduli
.first()
.copied()
.ok_or(Error::InvalidHierarchy)?;
let replay_from = slot / first * first;
if epoch % last == 0 {
if slot % last == 0 {
return Ok(StorageStrategy::Snapshot);
}
let diff_from = self.moduli.iter().rev().find_map(|&n| {
(epoch % n == 0).then(|| {
// Diff from the previous state.
(epoch - 1) / n * n
})
});
Ok(diff_from.map_or(StorageStrategy::Nothing, StorageStrategy::DiffFrom))
let diff_from = self
.moduli
.iter()
.rev()
.tuple_windows()
.find_map(|(&n_big, &n_small)| {
(slot % n_small == 0).then(|| {
// Diff from the previous layer.
slot / n_big * n_big
})
});
Ok(diff_from.map_or(
StorageStrategy::ReplayFrom(replay_from),
StorageStrategy::DiffFrom,
))
}
/// Return the smallest epoch greater than or equal to `epoch` at which a full snapshot should
/// Return the smallest slot greater than or equal to `slot` at which a full snapshot should
/// be stored.
pub fn next_snapshot_epoch(&self, epoch: Epoch) -> Result<Epoch, Error> {
pub fn next_snapshot_slot(&self, slot: Slot) -> Result<Slot, Error> {
let last = self.moduli.last().copied().ok_or(Error::InvalidHierarchy)?;
if epoch % last == 0 {
Ok(epoch)
if slot % last == 0 {
Ok(slot)
} else {
Ok((epoch / last + 1) * last)
Ok((slot / last + 1) * last)
}
}
}
@@ -265,10 +280,10 @@ mod tests {
let moduli = config.to_moduli().unwrap();
// Full snapshots at multiples of 2^16.
let snapshot_freq = Epoch::new(1 << 16);
// Full snapshots at multiples of 2^21.
let snapshot_freq = Slot::new(1 << 21);
assert_eq!(
moduli.storage_strategy(Epoch::new(0)).unwrap(),
moduli.storage_strategy(Slot::new(0)).unwrap(),
StorageStrategy::Snapshot
);
assert_eq!(
@@ -280,46 +295,52 @@ mod tests {
StorageStrategy::Snapshot
);
// For the first layer of diffs
let first_layer = Epoch::new(1 << 13);
// Diffs should be from the previous layer (the snapshot in this case), and not the previous diff in the same layer.
let first_layer = Slot::new(1 << 18);
assert_eq!(
moduli.storage_strategy(first_layer * 2).unwrap(),
StorageStrategy::DiffFrom(first_layer)
StorageStrategy::DiffFrom(Slot::new(0))
);
let replay_strategy_slot = first_layer + 1;
assert_eq!(
moduli.storage_strategy(replay_strategy_slot).unwrap(),
StorageStrategy::ReplayFrom(first_layer)
);
}
#[test]
fn next_snapshot_epoch() {
fn next_snapshot_slot() {
let config = HierarchyConfig::default();
config.validate().unwrap();
let moduli = config.to_moduli().unwrap();
let snapshot_freq = Epoch::new(1 << 16);
let snapshot_freq = Slot::new(1 << 21);
assert_eq!(
moduli.next_snapshot_epoch(snapshot_freq).unwrap(),
moduli.next_snapshot_slot(snapshot_freq).unwrap(),
snapshot_freq
);
assert_eq!(
moduli.next_snapshot_epoch(snapshot_freq + 1).unwrap(),
moduli.next_snapshot_slot(snapshot_freq + 1).unwrap(),
snapshot_freq * 2
);
assert_eq!(
moduli.next_snapshot_epoch(snapshot_freq * 2 - 1).unwrap(),
moduli.next_snapshot_slot(snapshot_freq * 2 - 1).unwrap(),
snapshot_freq * 2
);
assert_eq!(
moduli.next_snapshot_epoch(snapshot_freq * 2).unwrap(),
moduli.next_snapshot_slot(snapshot_freq * 2).unwrap(),
snapshot_freq * 2
);
assert_eq!(
moduli.next_snapshot_epoch(snapshot_freq * 100).unwrap(),
moduli.next_snapshot_slot(snapshot_freq * 100).unwrap(),
snapshot_freq * 100
);
}
#[test]
fn xor_vs_bytes_diff() {
fn compressed_u64_vs_bytes_diff() {
let x_values = vec![99u64, 55, 123, 6834857, 0, 12];
let y_values = vec![98u64, 55, 312, 1, 1, 2, 4, 5];
@@ -329,12 +350,12 @@ mod tests {
let x_bytes = to_bytes(&x_values);
let y_bytes = to_bytes(&y_values);
let xor_diff = XorDiff::compute(&x_values, &y_values).unwrap();
let u64_diff = CompressedU64Diff::compute(&x_values, &y_values).unwrap();
let mut y_from_xor = x_values;
xor_diff.apply(&mut y_from_xor).unwrap();
let mut y_from_u64_diff = x_values;
u64_diff.apply(&mut y_from_u64_diff).unwrap();
assert_eq!(y_values, y_from_xor);
assert_eq!(y_values, y_from_u64_diff);
let bytes_diff = BytesDiff::compute(&x_bytes, &y_bytes).unwrap();
@@ -343,7 +364,7 @@ mod tests {
assert_eq!(y_bytes, y_from_bytes);
// XOR diff wins by more than a factor of 3
assert!(xor_diff.bytes.len() < 3 * bytes_diff.bytes.len());
// U64 diff wins by more than a factor of 3
assert!(u64_diff.bytes.len() < 3 * bytes_diff.bytes.len());
}
}

View File

@@ -79,7 +79,7 @@ pub struct HotColdDB<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> {
#[allow(dead_code)]
historic_state_cache: Mutex<LruCache<Slot, BeaconState<E>>>,
/// Cache of hierarchical diff buffers.
diff_buffer_cache: Mutex<LruCache<Epoch, HDiffBuffer>>,
diff_buffer_cache: Mutex<LruCache<Slot, HDiffBuffer>>,
// Cache of hierarchical diffs.
// FIXME(sproul): see if this is necessary
/// Chain spec.
@@ -113,7 +113,7 @@ pub enum HotColdDBError {
MissingPrevState(Hash256),
MissingSplitState(Hash256, Slot),
MissingStateDiff(Hash256),
MissingHDiff(Epoch),
MissingHDiff(Slot),
MissingExecutionPayload(Hash256),
MissingFullBlockExecutionPayloadPruned(Hash256, Slot),
MissingAnchorInfo,
@@ -1403,17 +1403,14 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
) -> Result<(), Error> {
self.store_cold_state_summary(state_root, state.slot(), ops)?;
if state.slot() % E::slots_per_epoch() != 0 {
return Ok(());
}
let epoch = state.current_epoch();
match self.hierarchy.storage_strategy(epoch)? {
StorageStrategy::Nothing => {
let slot = state.slot();
match self.hierarchy.storage_strategy(slot)? {
StorageStrategy::ReplayFrom(from) => {
debug!(
self.log,
"Storing cold state";
"strategy" => "replay",
"from_slot" => from,
"slot" => state.slot(),
);
}
@@ -1431,6 +1428,7 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
self.log,
"Storing cold state";
"strategy" => "diff",
"from_slot" => from,
"slot" => state.slot(),
);
self.store_cold_state_as_diff(state, from, ops)?;
@@ -1453,22 +1451,18 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
encoder.write_all(&bytes).map_err(Error::Compression)?;
encoder.finish().map_err(Error::Compression)?;
let epoch = state.current_epoch();
let key = get_key_for_col(
DBColumn::BeaconStateSnapshot.into(),
&epoch.as_u64().to_be_bytes(),
&state.slot().as_u64().to_be_bytes(),
);
ops.push(KeyValueStoreOp::PutKeyValue(key, compressed_value));
Ok(())
}
pub fn load_cold_state_bytes_as_snapshot(
&self,
epoch: Epoch,
) -> Result<Option<Vec<u8>>, Error> {
pub fn load_cold_state_bytes_as_snapshot(&self, slot: Slot) -> Result<Option<Vec<u8>>, Error> {
match self.cold_db.get_bytes(
DBColumn::BeaconStateSnapshot.into(),
&epoch.as_u64().to_be_bytes(),
&slot.as_u64().to_be_bytes(),
)? {
Some(bytes) => {
let mut ssz_bytes =
@@ -1483,12 +1477,9 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
}
}
pub fn load_cold_state_as_snapshot(
&self,
epoch: Epoch,
) -> Result<Option<BeaconState<E>>, Error> {
pub fn load_cold_state_as_snapshot(&self, slot: Slot) -> Result<Option<BeaconState<E>>, Error> {
Ok(self
.load_cold_state_bytes_as_snapshot(epoch)?
.load_cold_state_bytes_as_snapshot(slot)?
.map(|bytes| BeaconState::from_ssz_bytes(&bytes, &self.spec))
.transpose()?)
}
@@ -1496,18 +1487,18 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
pub fn store_cold_state_as_diff(
&self,
state: &BeaconState<E>,
from_epoch: Epoch,
from_slot: Slot,
ops: &mut Vec<KeyValueStoreOp>,
) -> Result<(), Error> {
// Load diff base state bytes.
let base_buffer = self.load_hdiff_buffer_for_epoch(from_epoch)?;
let (_, base_buffer) = self.load_hdiff_buffer_for_slot(from_slot)?;
let target_buffer = HDiffBuffer::from_state(state.clone());
let diff = HDiff::compute(&base_buffer, &target_buffer)?;
let diff_bytes = diff.as_ssz_bytes();
let key = get_key_for_col(
DBColumn::BeaconStateDiff.into(),
&state.current_epoch().as_u64().to_be_bytes(),
&state.slot().as_u64().to_be_bytes(),
);
ops.push(KeyValueStoreOp::PutKeyValue(key, diff_bytes));
Ok(())
@@ -1527,10 +1518,9 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
///
/// Will reconstruct the state if it lies between restore points.
pub fn load_cold_state_by_slot(&self, slot: Slot) -> Result<Option<BeaconState<E>>, Error> {
let epoch = slot.epoch(E::slots_per_epoch());
let hdiff_buffer = self.load_hdiff_buffer_for_epoch(epoch)?;
let (base_slot, hdiff_buffer) = self.load_hdiff_buffer_for_slot(slot)?;
let base_state = hdiff_buffer.into_state(&self.spec)?;
debug_assert_eq!(base_slot, base_state.slot());
if base_state.slot() == slot {
return Ok(Some(base_state));
@@ -1548,56 +1538,68 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
.map(Some)
}
fn load_hdiff_for_epoch(&self, epoch: Epoch) -> Result<HDiff, Error> {
fn load_hdiff_for_slot(&self, slot: Slot) -> Result<HDiff, Error> {
self.cold_db
.get_bytes(
DBColumn::BeaconStateDiff.into(),
&epoch.as_u64().to_be_bytes(),
&slot.as_u64().to_be_bytes(),
)?
.map(|bytes| HDiff::from_ssz_bytes(&bytes))
.ok_or(HotColdDBError::MissingHDiff(epoch))?
.ok_or(HotColdDBError::MissingHDiff(slot))?
.map_err(Into::into)
}
fn load_hdiff_buffer_for_epoch(&self, epoch: Epoch) -> Result<HDiffBuffer, Error> {
if let Some(buffer) = self.diff_buffer_cache.lock().get(&epoch) {
/// Returns `HDiffBuffer` for the specified slot, or `HDiffBuffer` for the `ReplayFrom` slot if
/// the diff for the specified slot is not stored.
fn load_hdiff_buffer_for_slot(&self, slot: Slot) -> Result<(Slot, HDiffBuffer), Error> {
if let Some(buffer) = self.diff_buffer_cache.lock().get(&slot) {
debug!(
self.log,
"Hit diff buffer cache";
"epoch" => epoch
"slot" => slot
);
return Ok(buffer.clone());
return Ok((slot, buffer.clone()));
}
// Load buffer for the previous state.
// This amount of recursion (<10 levels) should be OK.
let t = std::time::Instant::now();
let mut buffer = match self.hierarchy.storage_strategy(epoch)? {
let (_buffer_slot, mut buffer) = match self.hierarchy.storage_strategy(slot)? {
// Base case.
StorageStrategy::Snapshot => {
let state = self
.load_cold_state_as_snapshot(epoch)?
.ok_or(Error::MissingSnapshot(epoch))?;
return Ok(HDiffBuffer::from_state(state));
.load_cold_state_as_snapshot(slot)?
.ok_or(Error::MissingSnapshot(slot))?;
let buffer = HDiffBuffer::from_state(state);
self.diff_buffer_cache.lock().put(slot, buffer.clone());
debug!(
self.log,
"Added diff buffer to cache";
"load_time_ms" => t.elapsed().as_millis(),
"slot" => slot
);
return Ok((slot, buffer));
}
// Recursive case.
StorageStrategy::DiffFrom(from) => self.load_hdiff_buffer_for_epoch(from)?,
StorageStrategy::Nothing => unreachable!("FIXME(sproul)"),
StorageStrategy::DiffFrom(from) => self.load_hdiff_buffer_for_slot(from)?,
StorageStrategy::ReplayFrom(from) => return self.load_hdiff_buffer_for_slot(from),
};
// Load diff and apply it to buffer.
let diff = self.load_hdiff_for_epoch(epoch)?;
let diff = self.load_hdiff_for_slot(slot)?;
diff.apply(&mut buffer)?;
self.diff_buffer_cache.lock().put(epoch, buffer.clone());
self.diff_buffer_cache.lock().put(slot, buffer.clone());
debug!(
self.log,
"Added diff buffer to cache";
"load_time_ms" => t.elapsed().as_millis(),
"epoch" => epoch
"slot" => slot
);
Ok(buffer)
Ok((slot, buffer))
}
/// Load cold blocks between `start_slot` and `end_slot` inclusive.
@@ -1741,14 +1743,10 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
/// Initialise the anchor info for checkpoint sync starting from `block`.
pub fn init_anchor_info(&self, block: BeaconBlockRef<'_, E>) -> Result<KeyValueStoreOp, Error> {
let anchor_slot = block.slot();
let anchor_epoch = anchor_slot.epoch(E::slots_per_epoch());
// Set the `state_upper_limit` to the slot of the *next* checkpoint.
// See `get_state_upper_limit` for rationale.
let next_snapshot_slot = self
.hierarchy
.next_snapshot_epoch(anchor_epoch)?
.start_slot(E::slots_per_epoch());
let next_snapshot_slot = self.hierarchy.next_snapshot_slot(anchor_slot)?;
let anchor_info = AnchorInfo {
anchor_slot,
oldest_block_slot: anchor_slot,
@@ -2219,15 +2217,19 @@ pub fn migrate_database<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>>(
let mut cold_db_ops: Vec<KeyValueStoreOp> = Vec::new();
if slot % E::slots_per_epoch() == 0 {
// Only store the cold state if it's on a diff boundary
if matches!(
store.hierarchy.storage_strategy(slot)?,
StorageStrategy::ReplayFrom(..)
) {
// Store slot -> state_root and state_root -> slot mappings.
store.store_cold_state_summary(&state_root, slot, &mut cold_db_ops)?;
} else {
let state: BeaconState<E> = store
.get_hot_state(&state_root)?
.ok_or(HotColdDBError::MissingStateToFreeze(state_root))?;
store.store_cold_state(&state_root, &state, &mut cold_db_ops)?;
} else {
// Store slot -> state_root and state_root -> slot mappings.
store.store_cold_state_summary(&state_root, slot, &mut cold_db_ops)?;
}
// There are data dependencies between calls to `store_cold_state()` that prevent us from