mirror of
https://github.com/sigp/lighthouse.git
synced 2026-05-30 20:57:10 +00:00
Generalise reconstruct_historic_states for ranged replay (#9222)
Co-Authored-By: dapplion <35266934+dapplion@users.noreply.github.com>
This commit is contained in:
@@ -1,7 +1,8 @@
|
|||||||
//! Implementation of historic state reconstruction (given complete block history).
|
//! Implementation of historic state reconstruction (given complete block history).
|
||||||
|
use crate::forwards_iter::FrozenForwardsIterator;
|
||||||
use crate::hot_cold_store::{HotColdDB, HotColdDBError};
|
use crate::hot_cold_store::{HotColdDB, HotColdDBError};
|
||||||
use crate::metrics;
|
use crate::metrics;
|
||||||
use crate::{Error, ItemStore};
|
use crate::{DBColumn, Error, ItemStore};
|
||||||
use itertools::{Itertools, process_results};
|
use itertools::{Itertools, process_results};
|
||||||
use state_processing::{
|
use state_processing::{
|
||||||
BlockSignatureStrategy, ConsensusContext, VerifyBlockRoot, per_block_processing,
|
BlockSignatureStrategy, ConsensusContext, VerifyBlockRoot, per_block_processing,
|
||||||
@@ -9,7 +10,7 @@ use state_processing::{
|
|||||||
};
|
};
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use tracing::{debug, info};
|
use tracing::{debug, info};
|
||||||
use types::EthSpec;
|
use types::{EthSpec, Slot};
|
||||||
|
|
||||||
impl<E, Hot, Cold> HotColdDB<E, Hot, Cold>
|
impl<E, Hot, Cold> HotColdDB<E, Hot, Cold>
|
||||||
where
|
where
|
||||||
@@ -35,13 +36,6 @@ where
|
|||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
debug!(
|
|
||||||
start_slot = %anchor.state_lower_limit,
|
|
||||||
"Starting state reconstruction batch"
|
|
||||||
);
|
|
||||||
|
|
||||||
let _t = metrics::start_timer(&metrics::STORE_BEACON_RECONSTRUCTION_TIME);
|
|
||||||
|
|
||||||
// Iterate blocks from the state lower limit to the upper limit.
|
// Iterate blocks from the state lower limit to the upper limit.
|
||||||
let split = self.get_split_info();
|
let split = self.get_split_info();
|
||||||
let lower_limit_slot = anchor.state_lower_limit;
|
let lower_limit_slot = anchor.state_lower_limit;
|
||||||
@@ -56,20 +50,86 @@ where
|
|||||||
// If `num_blocks` is not specified iterate all blocks. Add 1 so that we end on an epoch
|
// If `num_blocks` is not specified iterate all blocks. Add 1 so that we end on an epoch
|
||||||
// boundary when `num_blocks` is a multiple of the epoch length. We want to be *inclusive*
|
// boundary when `num_blocks` is a multiple of the epoch length. We want to be *inclusive*
|
||||||
// of the state at slot `lower_limit_slot + num_blocks`.
|
// of the state at slot `lower_limit_slot + num_blocks`.
|
||||||
let block_root_iter = self
|
let to_slot = num_blocks
|
||||||
.forwards_block_roots_iterator_until(lower_limit_slot, upper_limit_slot - 1, || {
|
.map(|n| std::cmp::min(lower_limit_slot + n as u64 + 1, upper_limit_slot))
|
||||||
Err(Error::StateShouldNotBeRequired(upper_limit_slot - 1))
|
.unwrap_or(upper_limit_slot);
|
||||||
})?
|
|
||||||
.take(num_blocks.map_or(usize::MAX, |n| n + 1));
|
let on_commit = |slot: Slot| -> Result<(), Error> {
|
||||||
|
info!(
|
||||||
|
%slot,
|
||||||
|
remaining = %(upper_limit_slot - 1 - slot),
|
||||||
|
"State reconstruction in progress"
|
||||||
|
);
|
||||||
|
|
||||||
|
// Update anchor.
|
||||||
|
let old_anchor = anchor.clone();
|
||||||
|
let reconstruction_complete = slot + 1 == upper_limit_slot;
|
||||||
|
|
||||||
|
if reconstruction_complete {
|
||||||
|
// The two limits have met in the middle! We're done!
|
||||||
|
let new_anchor = old_anchor.as_archive_anchor();
|
||||||
|
self.compare_and_set_anchor_info_with_write(old_anchor, new_anchor)?;
|
||||||
|
} else {
|
||||||
|
// The lower limit has been raised, store it.
|
||||||
|
anchor.state_lower_limit = slot;
|
||||||
|
self.compare_and_set_anchor_info_with_write(old_anchor, anchor.clone())?;
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
};
|
||||||
|
|
||||||
|
self.reconstruct_historic_states_on_range(lower_limit_slot, to_slot, on_commit)?;
|
||||||
|
|
||||||
|
// Check that the split point wasn't mutated during the state reconstruction process.
|
||||||
|
// It shouldn't have been, due to the serialization of requests through the store migrator,
|
||||||
|
// so this is just a paranoid check.
|
||||||
|
let latest_split = self.get_split_info();
|
||||||
|
if split != latest_split {
|
||||||
|
return Err(Error::SplitPointModified(latest_split.slot, split.slot));
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Reconstruct historic states for the slot range `(with_state_at_slot, to_slot)`.
|
||||||
|
///
|
||||||
|
/// Loads the state at `with_state_at_slot` and replays blocks up to and including slot
|
||||||
|
/// `to_slot - 1`, writing all intermediate states to the freezer DB.
|
||||||
|
///
|
||||||
|
/// The `BeaconBlockRoots` column must be populated for the range before this is called.
|
||||||
|
///
|
||||||
|
/// `on_commit(slot)` is invoked after each atomic commit (whenever the hierarchy says to
|
||||||
|
/// commit, plus once at the final slot) so callers can update anchor metadata or log
|
||||||
|
/// progress.
|
||||||
|
pub fn reconstruct_historic_states_on_range(
|
||||||
|
self: &Arc<Self>,
|
||||||
|
with_state_at_slot: Slot,
|
||||||
|
to_slot: Slot,
|
||||||
|
mut on_commit: impl FnMut(Slot) -> Result<(), Error>,
|
||||||
|
) -> Result<(), Error> {
|
||||||
|
debug!(
|
||||||
|
from_slot = %(with_state_at_slot + 1),
|
||||||
|
%to_slot,
|
||||||
|
"Starting state reconstruction batch"
|
||||||
|
);
|
||||||
|
|
||||||
|
let _t = metrics::start_timer(&metrics::STORE_BEACON_RECONSTRUCTION_TIME);
|
||||||
|
|
||||||
|
// Iterate from `with_state_at_slot` so `tuple_windows` gives us the predecessor block
|
||||||
|
// root at each step for skip detection.
|
||||||
|
let block_root_iter = FrozenForwardsIterator::new(
|
||||||
|
self,
|
||||||
|
DBColumn::BeaconBlockRoots,
|
||||||
|
with_state_at_slot,
|
||||||
|
to_slot,
|
||||||
|
)?;
|
||||||
|
|
||||||
// The state to be advanced.
|
// The state to be advanced.
|
||||||
let mut state = self.load_cold_state_by_slot(lower_limit_slot)?;
|
let mut state = self.load_cold_state_by_slot(with_state_at_slot)?;
|
||||||
|
|
||||||
state.build_caches(&self.spec)?;
|
state.build_caches(&self.spec)?;
|
||||||
|
|
||||||
process_results(block_root_iter, |iter| -> Result<(), Error> {
|
process_results(block_root_iter, |iter| -> Result<(), Error> {
|
||||||
let mut io_batch = vec![];
|
let mut io_batch = vec![];
|
||||||
|
|
||||||
let mut prev_state_root = None;
|
let mut prev_state_root = None;
|
||||||
|
|
||||||
for ((prev_block_root, _), (block_root, slot)) in iter.tuple_windows() {
|
for ((prev_block_root, _), (block_root, slot)) in iter.tuple_windows() {
|
||||||
@@ -114,32 +174,16 @@ where
|
|||||||
// Stage state for storage in freezer DB.
|
// Stage state for storage in freezer DB.
|
||||||
self.store_cold_state(&state_root, &state, &mut io_batch)?;
|
self.store_cold_state(&state_root, &state, &mut io_batch)?;
|
||||||
|
|
||||||
let batch_complete =
|
let batch_complete = slot + 1 == to_slot;
|
||||||
num_blocks.is_some_and(|n_blocks| slot == lower_limit_slot + n_blocks as u64);
|
|
||||||
let reconstruction_complete = slot + 1 == upper_limit_slot;
|
|
||||||
|
|
||||||
// Commit the I/O batch if:
|
// Commit the I/O batch if:
|
||||||
//
|
//
|
||||||
// - The diff/snapshot for this slot is required for future slots, or
|
// - The diff/snapshot for this slot is required for future slots, or
|
||||||
// - The reconstruction batch is complete (we are about to return), or
|
// - The reconstruction batch is complete (we are about to return).
|
||||||
// - Reconstruction is complete.
|
if self.hierarchy.should_commit_immediately(slot)? || batch_complete {
|
||||||
if self.hierarchy.should_commit_immediately(slot)?
|
|
||||||
|| batch_complete
|
|
||||||
|| reconstruction_complete
|
|
||||||
{
|
|
||||||
info!(
|
|
||||||
%slot,
|
|
||||||
remaining = %(upper_limit_slot - 1 - slot),
|
|
||||||
"State reconstruction in progress"
|
|
||||||
);
|
|
||||||
|
|
||||||
self.cold_db.do_atomically(std::mem::take(&mut io_batch))?;
|
self.cold_db.do_atomically(std::mem::take(&mut io_batch))?;
|
||||||
|
|
||||||
// Update anchor.
|
if batch_complete {
|
||||||
let old_anchor = anchor.clone();
|
|
||||||
|
|
||||||
if reconstruction_complete {
|
|
||||||
// The two limits have met in the middle! We're done!
|
|
||||||
// Perform one last integrity check on the state reached.
|
// Perform one last integrity check on the state reached.
|
||||||
let computed_state_root = state.update_tree_hash_cache()?;
|
let computed_state_root = state.update_tree_hash_cache()?;
|
||||||
if computed_state_root != state_root {
|
if computed_state_root != state_root {
|
||||||
@@ -149,23 +193,15 @@ where
|
|||||||
computed: computed_state_root,
|
computed: computed_state_root,
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
let new_anchor = old_anchor.as_archive_anchor();
|
|
||||||
self.compare_and_set_anchor_info_with_write(old_anchor, new_anchor)?;
|
|
||||||
|
|
||||||
return Ok(());
|
|
||||||
} else {
|
|
||||||
// The lower limit has been raised, store it.
|
|
||||||
anchor.state_lower_limit = slot;
|
|
||||||
|
|
||||||
self.compare_and_set_anchor_info_with_write(old_anchor, anchor.clone())?;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
on_commit(slot)?;
|
||||||
|
|
||||||
// If this is the end of the batch, return Ok. The caller will run another
|
// If this is the end of the batch, return Ok. The caller will run another
|
||||||
// batch when there is idle capacity.
|
// batch when there is idle capacity.
|
||||||
if batch_complete {
|
if batch_complete {
|
||||||
debug!(
|
debug!(
|
||||||
start_slot = %lower_limit_slot,
|
start_slot = %(with_state_at_slot + 1),
|
||||||
end_slot = %slot,
|
end_slot = %slot,
|
||||||
"Finished state reconstruction batch"
|
"Finished state reconstruction batch"
|
||||||
);
|
);
|
||||||
@@ -174,19 +210,10 @@ where
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Should always reach the `upper_limit_slot` or the end of the batch and return early
|
// Should always reach slot `to_slot - 1` (the end of the batch) and return early above.
|
||||||
// above.
|
|
||||||
Err(Error::StateReconstructionLogicError)
|
Err(Error::StateReconstructionLogicError)
|
||||||
})??;
|
})??;
|
||||||
|
|
||||||
// Check that the split point wasn't mutated during the state reconstruction process.
|
|
||||||
// It shouldn't have been, due to the serialization of requests through the store migrator,
|
|
||||||
// so this is just a paranoid check.
|
|
||||||
let latest_split = self.get_split_info();
|
|
||||||
if split != latest_split {
|
|
||||||
return Err(Error::SplitPointModified(latest_split.slot, split.slot));
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user