mirror of
https://github.com/sigp/lighthouse.git
synced 2026-05-30 20:57:10 +00:00
Add StatePayloadStatus to storage_strategy
This commit is contained in:
@@ -224,7 +224,7 @@ pub fn upgrade_to_v24<T: BeaconChainTypes>(
|
|||||||
if previous_snapshot_slot >= anchor_info.state_upper_limit
|
if previous_snapshot_slot >= anchor_info.state_upper_limit
|
||||||
&& db
|
&& db
|
||||||
.hierarchy
|
.hierarchy
|
||||||
.storage_strategy(split.slot, dummy_start_slot)
|
.storage_strategy(split.slot, dummy_start_slot, StatePayloadStatus::Pending)
|
||||||
.is_ok_and(|strategy| !strategy.is_replay_from())
|
.is_ok_and(|strategy| !strategy.is_replay_from())
|
||||||
{
|
{
|
||||||
info!(
|
info!(
|
||||||
@@ -331,7 +331,8 @@ pub fn upgrade_to_v24<T: BeaconChainTypes>(
|
|||||||
);
|
);
|
||||||
} else {
|
} else {
|
||||||
// 1. Store snapshot or diff at this slot (if required).
|
// 1. Store snapshot or diff at this slot (if required).
|
||||||
let storage_strategy = db.hot_storage_strategy(slot)?;
|
let storage_strategy =
|
||||||
|
db.hot_storage_strategy(slot, StatePayloadStatus::Pending)?;
|
||||||
debug!(
|
debug!(
|
||||||
%slot,
|
%slot,
|
||||||
?state_root,
|
?state_root,
|
||||||
|
|||||||
@@ -5097,7 +5097,7 @@ async fn replay_from_split_state() {
|
|||||||
assert!(
|
assert!(
|
||||||
store
|
store
|
||||||
.hierarchy
|
.hierarchy
|
||||||
.storage_strategy(split.slot, anchor_slot)
|
.storage_strategy(split.slot, anchor_slot, StatePayloadStatus::Pending)
|
||||||
.unwrap()
|
.unwrap()
|
||||||
.is_replay_from()
|
.is_replay_from()
|
||||||
);
|
);
|
||||||
|
|||||||
@@ -12,7 +12,9 @@ use std::str::FromStr;
|
|||||||
use std::sync::LazyLock;
|
use std::sync::LazyLock;
|
||||||
use superstruct::superstruct;
|
use superstruct::superstruct;
|
||||||
use types::state::HistoricalSummary;
|
use types::state::HistoricalSummary;
|
||||||
use types::{BeaconState, ChainSpec, Epoch, EthSpec, Hash256, Slot, Validator};
|
use types::{
|
||||||
|
BeaconState, ChainSpec, Epoch, EthSpec, Hash256, Slot, Validator, execution::StatePayloadStatus,
|
||||||
|
};
|
||||||
|
|
||||||
static EMPTY_PUBKEY: LazyLock<PublicKeyBytes> = LazyLock::new(PublicKeyBytes::empty);
|
static EMPTY_PUBKEY: LazyLock<PublicKeyBytes> = LazyLock::new(PublicKeyBytes::empty);
|
||||||
|
|
||||||
@@ -653,7 +655,21 @@ impl HierarchyModuli {
|
|||||||
/// exponents [5,13,21], to reconstruct state at slot 3,000,003: if start = 3,000,002
|
/// exponents [5,13,21], to reconstruct state at slot 3,000,003: if start = 3,000,002
|
||||||
/// layer 2 diff will point to the start snapshot instead of the layer 1 diff at
|
/// layer 2 diff will point to the start snapshot instead of the layer 1 diff at
|
||||||
/// 2998272.
|
/// 2998272.
|
||||||
pub fn storage_strategy(&self, slot: Slot, start_slot: Slot) -> Result<StorageStrategy, Error> {
|
/// * `payload_status` - whether the state is `Full` (came from processing a payload), or
|
||||||
|
/// `Pending` (came from processing a block). Prior to Gloas all states are Pending.
|
||||||
|
pub fn storage_strategy(
|
||||||
|
&self,
|
||||||
|
slot: Slot,
|
||||||
|
start_slot: Slot,
|
||||||
|
payload_status: StatePayloadStatus,
|
||||||
|
) -> Result<StorageStrategy, Error> {
|
||||||
|
// Store all Full states by replaying from their respective Pending state at the same slot.
|
||||||
|
if let StatePayloadStatus::Full = payload_status
|
||||||
|
&& slot >= start_slot
|
||||||
|
{
|
||||||
|
return Ok(StorageStrategy::ReplayFrom(slot));
|
||||||
|
}
|
||||||
|
|
||||||
match slot.cmp(&start_slot) {
|
match slot.cmp(&start_slot) {
|
||||||
Ordering::Less => return Err(Error::LessThanStart(slot, start_slot)),
|
Ordering::Less => return Err(Error::LessThanStart(slot, start_slot)),
|
||||||
Ordering::Equal => return Ok(StorageStrategy::Snapshot),
|
Ordering::Equal => return Ok(StorageStrategy::Snapshot),
|
||||||
@@ -809,33 +825,42 @@ mod tests {
|
|||||||
let sslot = Slot::new(0);
|
let sslot = Slot::new(0);
|
||||||
|
|
||||||
let moduli = config.to_moduli().unwrap();
|
let moduli = config.to_moduli().unwrap();
|
||||||
|
let payload_status = StatePayloadStatus::Pending;
|
||||||
|
|
||||||
// Full snapshots at multiples of 2^21.
|
// Full snapshots at multiples of 2^21.
|
||||||
let snapshot_freq = Slot::new(1 << 21);
|
let snapshot_freq = Slot::new(1 << 21);
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
moduli.storage_strategy(Slot::new(0), sslot).unwrap(),
|
moduli
|
||||||
|
.storage_strategy(Slot::new(0), sslot, payload_status)
|
||||||
|
.unwrap(),
|
||||||
StorageStrategy::Snapshot
|
StorageStrategy::Snapshot
|
||||||
);
|
);
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
moduli.storage_strategy(snapshot_freq, sslot).unwrap(),
|
moduli
|
||||||
|
.storage_strategy(snapshot_freq, sslot, payload_status)
|
||||||
|
.unwrap(),
|
||||||
StorageStrategy::Snapshot
|
StorageStrategy::Snapshot
|
||||||
);
|
);
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
moduli.storage_strategy(snapshot_freq * 3, sslot).unwrap(),
|
moduli
|
||||||
|
.storage_strategy(snapshot_freq * 3, sslot, payload_status)
|
||||||
|
.unwrap(),
|
||||||
StorageStrategy::Snapshot
|
StorageStrategy::Snapshot
|
||||||
);
|
);
|
||||||
|
|
||||||
// Diffs should be from the previous layer (the snapshot in this case), and not the previous diff in the same layer.
|
// Diffs should be from the previous layer (the snapshot in this case), and not the previous diff in the same layer.
|
||||||
let first_layer = Slot::new(1 << 18);
|
let first_layer = Slot::new(1 << 18);
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
moduli.storage_strategy(first_layer * 2, sslot).unwrap(),
|
moduli
|
||||||
|
.storage_strategy(first_layer * 2, sslot, payload_status)
|
||||||
|
.unwrap(),
|
||||||
StorageStrategy::DiffFrom(Slot::new(0))
|
StorageStrategy::DiffFrom(Slot::new(0))
|
||||||
);
|
);
|
||||||
|
|
||||||
let replay_strategy_slot = first_layer + 1;
|
let replay_strategy_slot = first_layer + 1;
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
moduli
|
moduli
|
||||||
.storage_strategy(replay_strategy_slot, sslot)
|
.storage_strategy(replay_strategy_slot, sslot, payload_status)
|
||||||
.unwrap(),
|
.unwrap(),
|
||||||
StorageStrategy::ReplayFrom(first_layer)
|
StorageStrategy::ReplayFrom(first_layer)
|
||||||
);
|
);
|
||||||
|
|||||||
@@ -452,15 +452,26 @@ impl<E: EthSpec> HotColdDB<E, BeaconNodeBackend<E>, BeaconNodeBackend<E>> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold> {
|
impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold> {
|
||||||
fn cold_storage_strategy(&self, slot: Slot) -> Result<StorageStrategy, Error> {
|
fn cold_storage_strategy(
|
||||||
|
&self,
|
||||||
|
slot: Slot,
|
||||||
|
// payload_status: StatePayloadStatus,
|
||||||
|
) -> Result<StorageStrategy, Error> {
|
||||||
// The start slot for the freezer HDiff is always 0
|
// The start slot for the freezer HDiff is always 0
|
||||||
Ok(self.hierarchy.storage_strategy(slot, Slot::new(0))?)
|
// TODO(gloas): wire up payload_status
|
||||||
}
|
|
||||||
|
|
||||||
pub fn hot_storage_strategy(&self, slot: Slot) -> Result<StorageStrategy, Error> {
|
|
||||||
Ok(self
|
Ok(self
|
||||||
.hierarchy
|
.hierarchy
|
||||||
.storage_strategy(slot, self.hot_hdiff_start_slot()?)?)
|
.storage_strategy(slot, Slot::new(0), StatePayloadStatus::Pending)?)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn hot_storage_strategy(
|
||||||
|
&self,
|
||||||
|
slot: Slot,
|
||||||
|
payload_status: StatePayloadStatus,
|
||||||
|
) -> Result<StorageStrategy, Error> {
|
||||||
|
Ok(self
|
||||||
|
.hierarchy
|
||||||
|
.storage_strategy(slot, self.hot_hdiff_start_slot()?, payload_status)?)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn hot_hdiff_start_slot(&self) -> Result<Slot, Error> {
|
pub fn hot_hdiff_start_slot(&self) -> Result<Slot, Error> {
|
||||||
@@ -1380,8 +1391,11 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
|
|||||||
// NOTE: `hot_storage_strategy` can error if there are states in the database
|
// NOTE: `hot_storage_strategy` can error if there are states in the database
|
||||||
// prior to the `anchor_slot`. This can happen if checkpoint sync has been
|
// prior to the `anchor_slot`. This can happen if checkpoint sync has been
|
||||||
// botched and left some states in the database prior to completing.
|
// botched and left some states in the database prior to completing.
|
||||||
|
// Use `Pending` status here because snapshots and diffs are only stored for
|
||||||
|
// `Pending` states.
|
||||||
if let Some(slot) = slot
|
if let Some(slot) = slot
|
||||||
&& let Ok(strategy) = self.hot_storage_strategy(slot)
|
&& let Ok(strategy) =
|
||||||
|
self.hot_storage_strategy(slot, StatePayloadStatus::Pending)
|
||||||
{
|
{
|
||||||
match strategy {
|
match strategy {
|
||||||
StorageStrategy::Snapshot => {
|
StorageStrategy::Snapshot => {
|
||||||
@@ -1643,6 +1657,8 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
|
|||||||
state: &BeaconState<E>,
|
state: &BeaconState<E>,
|
||||||
ops: &mut Vec<KeyValueStoreOp>,
|
ops: &mut Vec<KeyValueStoreOp>,
|
||||||
) -> Result<(), Error> {
|
) -> Result<(), Error> {
|
||||||
|
let payload_status = state.payload_status();
|
||||||
|
|
||||||
match self.state_cache.lock().put_state(
|
match self.state_cache.lock().put_state(
|
||||||
*state_root,
|
*state_root,
|
||||||
state.get_latest_block_root(*state_root),
|
state.get_latest_block_root(*state_root),
|
||||||
@@ -1688,7 +1704,7 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
|
|||||||
debug!(
|
debug!(
|
||||||
?state_root,
|
?state_root,
|
||||||
slot = %state.slot(),
|
slot = %state.slot(),
|
||||||
storage_strategy = ?self.hot_storage_strategy(state.slot())?,
|
storage_strategy = ?self.hot_storage_strategy(state.slot(), payload_status)?,
|
||||||
diff_base_state = %summary.diff_base_state,
|
diff_base_state = %summary.diff_base_state,
|
||||||
previous_state_root = ?summary.previous_state_root,
|
previous_state_root = ?summary.previous_state_root,
|
||||||
"Storing hot state summary and diffs"
|
"Storing hot state summary and diffs"
|
||||||
@@ -1711,7 +1727,7 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
|
|||||||
self,
|
self,
|
||||||
*state_root,
|
*state_root,
|
||||||
state,
|
state,
|
||||||
self.hot_storage_strategy(state.slot())?,
|
self.hot_storage_strategy(state.slot(), state.payload_status())?,
|
||||||
)?;
|
)?;
|
||||||
ops.push(hot_state_summary.as_kv_store_op(*state_root));
|
ops.push(hot_state_summary.as_kv_store_op(*state_root));
|
||||||
Ok(hot_state_summary)
|
Ok(hot_state_summary)
|
||||||
@@ -1724,7 +1740,7 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
|
|||||||
ops: &mut Vec<KeyValueStoreOp>,
|
ops: &mut Vec<KeyValueStoreOp>,
|
||||||
) -> Result<(), Error> {
|
) -> Result<(), Error> {
|
||||||
let slot = state.slot();
|
let slot = state.slot();
|
||||||
let storage_strategy = self.hot_storage_strategy(slot)?;
|
let storage_strategy = self.hot_storage_strategy(slot, state.payload_status())?;
|
||||||
match storage_strategy {
|
match storage_strategy {
|
||||||
StorageStrategy::ReplayFrom(_) => {
|
StorageStrategy::ReplayFrom(_) => {
|
||||||
// Already have persisted the state summary, don't persist anything else
|
// Already have persisted the state summary, don't persist anything else
|
||||||
@@ -1884,16 +1900,20 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
|
|||||||
return Ok(buffer);
|
return Ok(buffer);
|
||||||
}
|
}
|
||||||
|
|
||||||
let Some(HotStateSummary {
|
let Some(
|
||||||
slot,
|
summary @ HotStateSummary {
|
||||||
diff_base_state,
|
slot,
|
||||||
..
|
diff_base_state,
|
||||||
}) = self.load_hot_state_summary(&state_root)?
|
..
|
||||||
|
},
|
||||||
|
) = self.load_hot_state_summary(&state_root)?
|
||||||
else {
|
else {
|
||||||
return Err(Error::MissingHotStateSummary(state_root));
|
return Err(Error::MissingHotStateSummary(state_root));
|
||||||
};
|
};
|
||||||
|
|
||||||
let buffer = match self.hot_storage_strategy(slot)? {
|
let payload_status = self.get_hot_state_summary_payload_status(&summary)?;
|
||||||
|
|
||||||
|
let buffer = match self.hot_storage_strategy(slot, payload_status)? {
|
||||||
StorageStrategy::Snapshot => {
|
StorageStrategy::Snapshot => {
|
||||||
let Some(state) = self.load_hot_state_as_snapshot(state_root)? else {
|
let Some(state) = self.load_hot_state_as_snapshot(state_root)? else {
|
||||||
let existing_snapshots = self.load_hot_state_snapshot_roots()?;
|
let existing_snapshots = self.load_hot_state_snapshot_roots()?;
|
||||||
@@ -1979,7 +1999,8 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
|
|||||||
},
|
},
|
||||||
) = self.load_hot_state_summary(state_root)?
|
) = self.load_hot_state_summary(state_root)?
|
||||||
{
|
{
|
||||||
let mut state = match self.hot_storage_strategy(slot)? {
|
let payload_status = self.get_hot_state_summary_payload_status(&summary)?;
|
||||||
|
let mut state = match self.hot_storage_strategy(slot, payload_status)? {
|
||||||
strat @ StorageStrategy::Snapshot | strat @ StorageStrategy::DiffFrom(_) => {
|
strat @ StorageStrategy::Snapshot | strat @ StorageStrategy::DiffFrom(_) => {
|
||||||
let buffer_timer = metrics::start_timer_vec(
|
let buffer_timer = metrics::start_timer_vec(
|
||||||
&metrics::BEACON_HDIFF_BUFFER_LOAD_TIME,
|
&metrics::BEACON_HDIFF_BUFFER_LOAD_TIME,
|
||||||
@@ -2026,8 +2047,6 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
|
|||||||
.lock()
|
.lock()
|
||||||
.rebase_on_finalized(&mut base_state, &self.spec)?;
|
.rebase_on_finalized(&mut base_state, &self.spec)?;
|
||||||
|
|
||||||
let payload_status = self.get_hot_state_summary_payload_status(&summary)?;
|
|
||||||
|
|
||||||
self.load_hot_state_using_replay(
|
self.load_hot_state_using_replay(
|
||||||
base_state,
|
base_state,
|
||||||
slot,
|
slot,
|
||||||
|
|||||||
Reference in New Issue
Block a user