mirror of
https://github.com/sigp/lighthouse.git
synced 2026-04-18 13:28:33 +00:00
Fix tree-states sub-epoch diffs (#5097)
This commit is contained in:
@@ -1,5 +1,5 @@
|
||||
use crate::hdiff::HierarchyConfig;
|
||||
use crate::{DBColumn, Error, StoreItem};
|
||||
use crate::{AnchorInfo, DBColumn, Error, Split, StoreItem};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use ssz::{Decode, Encode};
|
||||
use ssz_derive::{Decode, Encode};
|
||||
@@ -117,15 +117,22 @@ impl StoreConfig {
|
||||
pub fn check_compatibility(
|
||||
&self,
|
||||
on_disk_config: &OnDiskStoreConfig,
|
||||
split: &Split,
|
||||
anchor: Option<&AnchorInfo>,
|
||||
) -> Result<(), StoreConfigError> {
|
||||
let db_config = self.as_disk_config();
|
||||
if db_config.ne(on_disk_config) {
|
||||
return Err(StoreConfigError::IncompatibleStoreConfig {
|
||||
// Allow changing the hierarchy exponents if no historic states are stored.
|
||||
if db_config.linear_blocks == on_disk_config.linear_blocks
|
||||
&& (db_config.hierarchy_config == on_disk_config.hierarchy_config
|
||||
|| anchor.map_or(false, |anchor| anchor.no_historic_states_stored(split.slot)))
|
||||
{
|
||||
Ok(())
|
||||
} else {
|
||||
Err(StoreConfigError::IncompatibleStoreConfig {
|
||||
config: db_config,
|
||||
on_disk: on_disk_config.clone(),
|
||||
});
|
||||
})
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Check that the configuration is valid.
|
||||
@@ -218,6 +225,8 @@ impl StoreItem for OnDiskStoreConfig {
|
||||
#[cfg(test)]
|
||||
mod test {
|
||||
use super::*;
|
||||
use crate::{metadata::STATE_UPPER_LIMIT_NO_RETAIN, AnchorInfo, Split};
|
||||
use types::{Hash256, Slot};
|
||||
|
||||
#[test]
|
||||
fn check_compatibility_ok() {
|
||||
@@ -229,7 +238,10 @@ mod test {
|
||||
linear_blocks: true,
|
||||
hierarchy_config: store_config.hierarchy_config.clone(),
|
||||
};
|
||||
assert!(store_config.check_compatibility(&on_disk_config).is_ok());
|
||||
let split = Split::default();
|
||||
assert!(store_config
|
||||
.check_compatibility(&on_disk_config, &split, None)
|
||||
.is_ok());
|
||||
}
|
||||
|
||||
#[test]
|
||||
@@ -242,7 +254,10 @@ mod test {
|
||||
linear_blocks: false,
|
||||
hierarchy_config: store_config.hierarchy_config.clone(),
|
||||
};
|
||||
assert!(store_config.check_compatibility(&on_disk_config).is_err());
|
||||
let split = Split::default();
|
||||
assert!(store_config
|
||||
.check_compatibility(&on_disk_config, &split, None)
|
||||
.is_err());
|
||||
}
|
||||
|
||||
#[test]
|
||||
@@ -257,6 +272,34 @@ mod test {
|
||||
exponents: vec![5, 8, 11, 13, 16, 18, 21],
|
||||
},
|
||||
};
|
||||
assert!(store_config.check_compatibility(&on_disk_config).is_err());
|
||||
let split = Split::default();
|
||||
assert!(store_config
|
||||
.check_compatibility(&on_disk_config, &split, None)
|
||||
.is_err());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn check_compatibility_hierarchy_config_update() {
|
||||
let store_config = StoreConfig {
|
||||
linear_blocks: true,
|
||||
..Default::default()
|
||||
};
|
||||
let on_disk_config = OnDiskStoreConfig {
|
||||
linear_blocks: true,
|
||||
hierarchy_config: HierarchyConfig {
|
||||
exponents: vec![5, 8, 11, 13, 16, 18, 21],
|
||||
},
|
||||
};
|
||||
let split = Split::default();
|
||||
let anchor = AnchorInfo {
|
||||
anchor_slot: Slot::new(0),
|
||||
oldest_block_slot: Slot::new(0),
|
||||
oldest_block_parent: Hash256::zero(),
|
||||
state_upper_limit: STATE_UPPER_LIMIT_NO_RETAIN,
|
||||
state_lower_limit: Slot::new(0),
|
||||
};
|
||||
assert!(store_config
|
||||
.check_compatibility(&on_disk_config, &split, Some(&anchor))
|
||||
.is_ok());
|
||||
}
|
||||
}
|
||||
|
||||
@@ -5,6 +5,7 @@ use serde::{Deserialize, Serialize};
|
||||
use ssz::{Decode, Encode};
|
||||
use ssz_derive::{Decode, Encode};
|
||||
use std::io::{Read, Write};
|
||||
use std::str::FromStr;
|
||||
use types::{BeaconState, ChainSpec, EthSpec, Slot, VList};
|
||||
use zstd::{Decoder, Encoder};
|
||||
|
||||
@@ -22,6 +23,26 @@ pub struct HierarchyConfig {
|
||||
pub exponents: Vec<u8>,
|
||||
}
|
||||
|
||||
impl FromStr for HierarchyConfig {
|
||||
type Err = String;
|
||||
|
||||
fn from_str(s: &str) -> Result<Self, String> {
|
||||
let exponents = s
|
||||
.split(',')
|
||||
.map(|s| {
|
||||
s.parse()
|
||||
.map_err(|e| format!("invalid hierarchy-exponents: {e:?}"))
|
||||
})
|
||||
.collect::<Result<Vec<u8>, _>>()?;
|
||||
|
||||
if exponents.windows(2).any(|w| w[0] >= w[1]) {
|
||||
return Err("hierarchy-exponents must be in ascending order".to_string());
|
||||
}
|
||||
|
||||
Ok(HierarchyConfig { exponents })
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct HierarchyModuli {
|
||||
moduli: Vec<u64>,
|
||||
@@ -267,6 +288,18 @@ impl HierarchyModuli {
|
||||
Ok((slot / last + 1) * last)
|
||||
}
|
||||
}
|
||||
|
||||
/// Return `true` if the database ops for this slot should be committed immediately.
|
||||
///
|
||||
/// This is the case for all diffs in the 2nd lowest layer and above, which are required by diffs
|
||||
/// in the 1st layer.
|
||||
pub fn should_commit_immediately(&self, slot: Slot) -> Result<bool, Error> {
|
||||
// If there's only 1 layer of snapshots, then commit only when writing a snapshot.
|
||||
self.moduli.get(1).map_or_else(
|
||||
|| Ok(slot == self.next_snapshot_slot(slot)?),
|
||||
|second_layer_moduli| Ok(slot % *second_layer_moduli == 0),
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
|
||||
@@ -346,7 +346,20 @@ impl<E: EthSpec> HotColdDB<E, LevelDB<E>, LevelDB<E>> {
|
||||
|
||||
// Ensure that any on-disk config is compatible with the supplied config.
|
||||
if let Some(disk_config) = db.load_config()? {
|
||||
db.config.check_compatibility(&disk_config)?;
|
||||
let split = db.get_split_info();
|
||||
let anchor = db.get_anchor_info();
|
||||
db.config
|
||||
.check_compatibility(&disk_config, &split, anchor.as_ref())?;
|
||||
|
||||
// Inform user if hierarchy config is changing.
|
||||
if db.config.hierarchy_config != disk_config.hierarchy_config {
|
||||
info!(
|
||||
db.log,
|
||||
"Updating historic state config";
|
||||
"previous_config" => ?disk_config.hierarchy_config,
|
||||
"new_config" => ?db.config.hierarchy_config,
|
||||
);
|
||||
}
|
||||
}
|
||||
db.store_config()?;
|
||||
|
||||
@@ -2740,6 +2753,7 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
|
||||
let columns = [
|
||||
DBColumn::BeaconState,
|
||||
DBColumn::BeaconStateSummary,
|
||||
DBColumn::BeaconStateDiff,
|
||||
DBColumn::BeaconRestorePoint,
|
||||
DBColumn::BeaconStateRoots,
|
||||
DBColumn::BeaconHistoricalRoots,
|
||||
@@ -2780,6 +2794,9 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
|
||||
self.cold_db.do_atomically(cold_ops)?;
|
||||
}
|
||||
|
||||
// In order to reclaim space, we need to compact the freezer DB as well.
|
||||
self.cold_db.compact()?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -155,25 +155,15 @@ impl<E: EthSpec> KeyValueStore<E> for LevelDB<E> {
|
||||
self.transaction_mutex.lock()
|
||||
}
|
||||
|
||||
/// Compact all values in the states and states flag columns.
|
||||
fn compact(&self) -> Result<(), Error> {
|
||||
let endpoints = |column: DBColumn| {
|
||||
(
|
||||
BytesKey::from_vec(get_key_for_col(column.as_str(), Hash256::zero().as_bytes())),
|
||||
BytesKey::from_vec(get_key_for_col(
|
||||
column.as_str(),
|
||||
Hash256::repeat_byte(0xff).as_bytes(),
|
||||
)),
|
||||
)
|
||||
};
|
||||
|
||||
for (start_key, end_key) in [
|
||||
endpoints(DBColumn::BeaconState),
|
||||
endpoints(DBColumn::BeaconStateDiff),
|
||||
endpoints(DBColumn::BeaconStateSummary),
|
||||
] {
|
||||
self.db.compact(&start_key, &end_key);
|
||||
}
|
||||
fn compact_column(&self, column: DBColumn) -> Result<(), Error> {
|
||||
// Use key-size-agnostic keys [] and 0xff..ff with a minimum of 32 bytes to account for
|
||||
// columns that may change size between sub-databases or schema versions.
|
||||
let start_key = BytesKey::from_vec(get_key_for_col(column.as_str(), &[]));
|
||||
let end_key = BytesKey::from_vec(get_key_for_col(
|
||||
column.as_str(),
|
||||
&vec![0; std::cmp::max(column.key_size(), 32)],
|
||||
));
|
||||
self.db.compact(&start_key, &end_key);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
||||
@@ -79,8 +79,23 @@ pub trait KeyValueStore<E: EthSpec>: Sync + Send + Sized + 'static {
|
||||
/// this method. In future we may implement a safer mandatory locking scheme.
|
||||
fn begin_rw_transaction(&self) -> MutexGuard<()>;
|
||||
|
||||
/// Compact the database, freeing space used by deleted items.
|
||||
fn compact(&self) -> Result<(), Error>;
|
||||
/// Compact a single column in the database, freeing space used by deleted items.
|
||||
fn compact_column(&self, column: DBColumn) -> Result<(), Error>;
|
||||
|
||||
/// Compact a default set of columns that are likely to free substantial space.
|
||||
fn compact(&self) -> Result<(), Error> {
|
||||
// Compact state and block related columns as they are likely to have the most churn,
|
||||
// i.e. entries being created and deleted.
|
||||
for column in [
|
||||
DBColumn::BeaconState,
|
||||
DBColumn::BeaconStateDiff,
|
||||
DBColumn::BeaconStateSummary,
|
||||
DBColumn::BeaconBlock,
|
||||
] {
|
||||
self.compact_column(column)?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Iterate through all keys and values in a particular column.
|
||||
fn iter_column<K: Key>(&self, column: DBColumn) -> ColumnIter<K> {
|
||||
|
||||
@@ -108,7 +108,7 @@ impl<E: EthSpec> KeyValueStore<E> for MemoryStore<E> {
|
||||
self.transaction_mutex.lock()
|
||||
}
|
||||
|
||||
fn compact(&self) -> Result<(), Error> {
|
||||
fn compact_column(&self, _column: DBColumn) -> Result<(), Error> {
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -108,6 +108,11 @@ impl AnchorInfo {
|
||||
pub fn block_backfill_complete(&self, target_slot: Slot) -> bool {
|
||||
self.oldest_block_slot <= target_slot
|
||||
}
|
||||
|
||||
/// Return true if no historic states other than genesis are stored in the database.
|
||||
pub fn no_historic_states_stored(&self, split_slot: Slot) -> bool {
|
||||
self.state_lower_limit == 0 && self.state_upper_limit >= split_slot
|
||||
}
|
||||
}
|
||||
|
||||
impl StoreItem for AnchorInfo {
|
||||
|
||||
@@ -106,7 +106,7 @@ where
|
||||
self.store_cold_state(&state_root, &state, &mut io_batch)?;
|
||||
|
||||
// If the slot lies on an epoch boundary, commit the batch and update the anchor.
|
||||
if slot % E::slots_per_epoch() == 0 || slot + 1 == upper_limit_slot {
|
||||
if self.hierarchy.should_commit_immediately(slot)? || slot + 1 == upper_limit_slot {
|
||||
info!(
|
||||
self.log,
|
||||
"State reconstruction in progress";
|
||||
|
||||
Reference in New Issue
Block a user