Implement tree states & hierarchical state DB

This commit is contained in:
Michael Sproul
2023-06-19 10:14:47 +10:00
parent 2bb62b7f7d
commit 23db089a7a
193 changed files with 6093 additions and 5925 deletions

View File

@@ -1,75 +0,0 @@
use crate::chunked_vector::{chunk_key, Chunk, ChunkError, Field};
use crate::{Error, KeyValueStore, KeyValueStoreOp};
use types::EthSpec;
/// Buffered writer for chunked vectors (block roots mainly).
pub struct ChunkWriter<'a, F, E, S>
where
F: Field<E>,
E: EthSpec,
S: KeyValueStore<E>,
{
/// Buffered chunk awaiting writing to disk (always dirty).
chunk: Chunk<F::Value>,
/// Chunk index of `chunk`.
index: usize,
store: &'a S,
}
impl<'a, F, E, S> ChunkWriter<'a, F, E, S>
where
F: Field<E>,
E: EthSpec,
S: KeyValueStore<E>,
{
pub fn new(store: &'a S, vindex: usize) -> Result<Self, Error> {
let chunk_index = F::chunk_index(vindex);
let chunk = Chunk::load(store, F::column(), &chunk_key(chunk_index))?
.unwrap_or_else(|| Chunk::new(vec![F::Value::default(); F::chunk_size()]));
Ok(Self {
chunk,
index: chunk_index,
store,
})
}
/// Set the value at a given vector index, writing the current chunk and moving on if necessary.
pub fn set(
&mut self,
vindex: usize,
value: F::Value,
batch: &mut Vec<KeyValueStoreOp>,
) -> Result<(), Error> {
let chunk_index = F::chunk_index(vindex);
// Advance to the next chunk.
if chunk_index != self.index {
self.write(batch)?;
*self = Self::new(self.store, vindex)?;
}
let i = vindex % F::chunk_size();
let existing_value = &self.chunk.values[i];
if existing_value == &value || existing_value == &F::Value::default() {
self.chunk.values[i] = value;
Ok(())
} else {
Err(ChunkError::Inconsistent {
field: F::column(),
chunk_index,
existing_value: format!("{:?}", existing_value),
new_value: format!("{:?}", value),
}
.into())
}
}
/// Write the current chunk to disk.
///
/// Should be called before the writer is dropped, in order to write the final chunk to disk.
pub fn write(&self, batch: &mut Vec<KeyValueStoreOp>) -> Result<(), Error> {
self.chunk.store(F::column(), &chunk_key(self.index), batch)
}
}

View File

@@ -1,123 +0,0 @@
use crate::chunked_vector::{chunk_key, Chunk, Field};
use crate::{HotColdDB, ItemStore};
use slog::error;
use types::{ChainSpec, EthSpec, Slot};
/// Iterator over the values of a `BeaconState` vector field (like `block_roots`).
///
/// Uses the freezer DB's separate table to load the values.
pub struct ChunkedVectorIter<'a, F, E, Hot, Cold>
where
F: Field<E>,
E: EthSpec,
Hot: ItemStore<E>,
Cold: ItemStore<E>,
{
pub(crate) store: &'a HotColdDB<E, Hot, Cold>,
current_vindex: usize,
pub(crate) end_vindex: usize,
next_cindex: usize,
current_chunk: Chunk<F::Value>,
}
impl<'a, F, E, Hot, Cold> ChunkedVectorIter<'a, F, E, Hot, Cold>
where
F: Field<E>,
E: EthSpec,
Hot: ItemStore<E>,
Cold: ItemStore<E>,
{
/// Create a new iterator which can yield elements from `start_vindex` up to the last
/// index stored by the restore point at `last_restore_point_slot`.
///
/// The `last_restore_point` slot should be the slot of a recent restore point as obtained from
/// `HotColdDB::get_latest_restore_point_slot`. We pass it as a parameter so that the caller can
/// maintain a stable view of the database (see `HybridForwardsBlockRootsIterator`).
pub fn new(
store: &'a HotColdDB<E, Hot, Cold>,
start_vindex: usize,
last_restore_point_slot: Slot,
spec: &ChainSpec,
) -> Self {
let (_, end_vindex) = F::start_and_end_vindex(last_restore_point_slot, spec);
// Set the next chunk to the one containing `start_vindex`.
let next_cindex = start_vindex / F::chunk_size();
// Set the current chunk to the empty chunk, it will never be read.
let current_chunk = Chunk::default();
Self {
store,
current_vindex: start_vindex,
end_vindex,
next_cindex,
current_chunk,
}
}
}
impl<'a, F, E, Hot, Cold> Iterator for ChunkedVectorIter<'a, F, E, Hot, Cold>
where
F: Field<E>,
E: EthSpec,
Hot: ItemStore<E>,
Cold: ItemStore<E>,
{
type Item = (usize, F::Value);
fn next(&mut self) -> Option<Self::Item> {
let chunk_size = F::chunk_size();
// Range exhausted, return `None` forever.
if self.current_vindex >= self.end_vindex {
None
}
// Value lies in the current chunk, return it.
else if self.current_vindex < self.next_cindex * chunk_size {
let vindex = self.current_vindex;
let val = self
.current_chunk
.values
.get(vindex % chunk_size)
.cloned()
.or_else(|| {
error!(
self.store.log,
"Missing chunk value in forwards iterator";
"vector index" => vindex
);
None
})?;
self.current_vindex += 1;
Some((vindex, val))
}
// Need to load the next chunk, load it and recurse back into the in-range case.
else {
self.current_chunk = Chunk::load(
&self.store.cold_db,
F::column(),
&chunk_key(self.next_cindex),
)
.map_err(|e| {
error!(
self.store.log,
"Database error in forwards iterator";
"chunk index" => self.next_cindex,
"error" => format!("{:?}", e)
);
e
})
.ok()?
.or_else(|| {
error!(
self.store.log,
"Missing chunk in forwards iterator";
"chunk index" => self.next_cindex
);
None
})?;
self.next_cindex += 1;
self.next()
}
}
}

View File

@@ -1,883 +0,0 @@
//! Space-efficient storage for `BeaconState` vector fields.
//!
//! This module provides logic for splitting the `FixedVector` fields of a `BeaconState` into
//! chunks, and storing those chunks in contiguous ranges in the on-disk database. The motiviation
//! for doing this is avoiding massive duplication in every on-disk state. For example, rather than
//! storing the whole `historical_roots` vector, which is updated once every couple of thousand
//! slots, at every slot, we instead store all the historical values as a chunked vector on-disk,
//! and fetch only the slice we need when reconstructing the `historical_roots` of a state.
//!
//! ## Terminology
//!
//! * **Chunk size**: the number of vector values stored per on-disk chunk.
//! * **Vector index** (vindex): index into all the historical values, identifying a single element
//! of the vector being stored.
//! * **Chunk index** (cindex): index into the keyspace of the on-disk database, identifying a chunk
//! of elements. To find the chunk index of a vector index: `cindex = vindex / chunk_size`.
use self::UpdatePattern::*;
use crate::*;
use ssz::{Decode, Encode};
use typenum::Unsigned;
use types::historical_summary::HistoricalSummary;
/// Description of how a `BeaconState` field is updated during state processing.
///
/// When storing a state, this allows us to efficiently store only those entries
/// which are not present in the DB already.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum UpdatePattern {
/// The value is updated once per `n` slots.
OncePerNSlots {
n: u64,
/// The slot at which the field begins to accumulate values.
///
/// The field should not be read or written until `activation_slot` is reached, and the
/// activation slot should act as an offset when converting slots to vector indices.
activation_slot: Option<Slot>,
/// The slot at which the field ceases to accumulate values.
///
/// If this is `None` then the field is continually updated.
deactivation_slot: Option<Slot>,
},
/// The value is updated once per epoch, for the epoch `current_epoch - lag`.
OncePerEpoch { lag: u64 },
}
/// Map a chunk index to bytes that can be used to key the NoSQL database.
///
/// We shift chunks up by 1 to make room for a genesis chunk that is handled separately.
pub fn chunk_key(cindex: usize) -> [u8; 8] {
(cindex as u64 + 1).to_be_bytes()
}
/// Return the database key for the genesis value.
fn genesis_value_key() -> [u8; 8] {
0u64.to_be_bytes()
}
/// Trait for types representing fields of the `BeaconState`.
///
/// All of the required methods are type-level, because we do most things with fields at the
/// type-level. We require their value-level witnesses to be `Copy` so that we can avoid the
/// turbofish when calling functions like `store_updated_vector`.
pub trait Field<E: EthSpec>: Copy {
/// The type of value stored in this field: the `T` from `FixedVector<T, N>`.
///
/// The `Default` impl will be used to fill extra vector entries.
type Value: Decode + Encode + Default + Clone + PartialEq + std::fmt::Debug;
/// The length of this field: the `N` from `FixedVector<T, N>`.
type Length: Unsigned;
/// The database column where the integer-indexed chunks for this field should be stored.
///
/// Each field's column **must** be unique.
fn column() -> DBColumn;
/// Update pattern for this field, so that we can do differential updates.
fn update_pattern(spec: &ChainSpec) -> UpdatePattern;
/// The number of values to store per chunk on disk.
///
/// Default is 128 so that we read/write 4K pages when the values are 32 bytes.
// TODO: benchmark and optimise this parameter
fn chunk_size() -> usize {
128
}
/// Convert a v-index (vector index) to a chunk index.
fn chunk_index(vindex: usize) -> usize {
vindex / Self::chunk_size()
}
/// Get the value of this field at the given vector index, from the state.
fn get_value(
state: &BeaconState<E>,
vindex: u64,
spec: &ChainSpec,
) -> Result<Self::Value, ChunkError>;
/// True if this is a `FixedLengthField`, false otherwise.
fn is_fixed_length() -> bool;
/// Compute the start and end vector indices of the slice of history required at `current_slot`.
///
/// ## Example
///
/// If we have a field that is updated once per epoch, then the end vindex will be
/// `current_epoch + 1`, because we want to include the value for the current epoch, and the
/// start vindex will be `end_vindex - Self::Length`, because that's how far back we can look.
fn start_and_end_vindex(current_slot: Slot, spec: &ChainSpec) -> (usize, usize) {
// We take advantage of saturating subtraction on slots and epochs
match Self::update_pattern(spec) {
OncePerNSlots {
n,
activation_slot,
deactivation_slot,
} => {
// Per-slot changes exclude the index for the current slot, because
// it won't be set until the slot completes (think of `state_roots`, `block_roots`).
// This also works for the `historical_roots` because at the `n`th slot, the 0th
// entry of the list is created, and before that the list is empty.
//
// To account for the switch from historical roots to historical summaries at
// Capella we also modify the current slot by the activation and deactivation slots.
// The activation slot acts as an offset (subtraction) while the deactivation slot
// acts as a clamp (min).
let slot_with_clamp = deactivation_slot.map_or(current_slot, |deactivation_slot| {
std::cmp::min(current_slot, deactivation_slot)
});
let slot_with_clamp_and_offset = if let Some(activation_slot) = activation_slot {
slot_with_clamp - activation_slot
} else {
// Return (0, 0) to indicate that the field should not be read/written.
return (0, 0);
};
let end_vindex = slot_with_clamp_and_offset / n;
let start_vindex = end_vindex - Self::Length::to_u64();
(start_vindex.as_usize(), end_vindex.as_usize())
}
OncePerEpoch { lag } => {
// Per-epoch changes include the index for the current epoch, because it
// will have been set at the most recent epoch boundary.
let current_epoch = current_slot.epoch(E::slots_per_epoch());
let end_epoch = current_epoch + 1 - lag;
let start_epoch = end_epoch + lag - Self::Length::to_u64();
(start_epoch.as_usize(), end_epoch.as_usize())
}
}
}
/// Given an `existing_chunk` stored in the DB, construct an updated chunk to replace it.
fn get_updated_chunk(
existing_chunk: &Chunk<Self::Value>,
chunk_index: usize,
start_vindex: usize,
end_vindex: usize,
state: &BeaconState<E>,
spec: &ChainSpec,
) -> Result<Chunk<Self::Value>, Error> {
let chunk_size = Self::chunk_size();
let mut new_chunk = Chunk::new(vec![Self::Value::default(); chunk_size]);
for i in 0..chunk_size {
let vindex = chunk_index * chunk_size + i;
if vindex >= start_vindex && vindex < end_vindex {
let vector_value = Self::get_value(state, vindex as u64, spec)?;
if let Some(existing_value) = existing_chunk.values.get(i) {
if *existing_value != vector_value && *existing_value != Self::Value::default()
{
return Err(ChunkError::Inconsistent {
field: Self::column(),
chunk_index,
existing_value: format!("{:?}", existing_value),
new_value: format!("{:?}", vector_value),
}
.into());
}
}
new_chunk.values[i] = vector_value;
} else {
new_chunk.values[i] = existing_chunk.values.get(i).cloned().unwrap_or_default();
}
}
Ok(new_chunk)
}
/// Determine whether a state at `slot` possesses (or requires) the genesis value.
fn slot_needs_genesis_value(slot: Slot, spec: &ChainSpec) -> bool {
let (_, end_vindex) = Self::start_and_end_vindex(slot, spec);
match Self::update_pattern(spec) {
// If the end_vindex is less than the length of the vector, then the vector
// has not yet been completely filled with non-genesis values, and so the genesis
// value is still required.
OncePerNSlots { .. } => {
Self::is_fixed_length() && end_vindex < Self::Length::to_usize()
}
// If the field has lag, then it takes an extra `lag` vindices beyond the
// `end_vindex` before the vector has been filled with non-genesis values.
OncePerEpoch { lag } => {
Self::is_fixed_length() && end_vindex + (lag as usize) < Self::Length::to_usize()
}
}
}
/// Load the genesis value for a fixed length field from the store.
///
/// This genesis value should be used to fill the initial state of the vector.
fn load_genesis_value<S: KeyValueStore<E>>(store: &S) -> Result<Self::Value, Error> {
let key = &genesis_value_key()[..];
let chunk =
Chunk::load(store, Self::column(), key)?.ok_or(ChunkError::MissingGenesisValue)?;
chunk
.values
.first()
.cloned()
.ok_or_else(|| ChunkError::MissingGenesisValue.into())
}
/// Store the given `value` as the genesis value for this field, unless stored already.
///
/// Check the existing value (if any) for consistency with the value we intend to store, and
/// return an error if they are inconsistent.
fn check_and_store_genesis_value<S: KeyValueStore<E>>(
store: &S,
value: Self::Value,
ops: &mut Vec<KeyValueStoreOp>,
) -> Result<(), Error> {
let key = &genesis_value_key()[..];
if let Some(existing_chunk) = Chunk::<Self::Value>::load(store, Self::column(), key)? {
if existing_chunk.values.len() != 1 {
Err(ChunkError::InvalidGenesisChunk {
field: Self::column(),
expected_len: 1,
observed_len: existing_chunk.values.len(),
}
.into())
} else if existing_chunk.values[0] != value {
Err(ChunkError::InconsistentGenesisValue {
field: Self::column(),
existing_value: format!("{:?}", existing_chunk.values[0]),
new_value: format!("{:?}", value),
}
.into())
} else {
Ok(())
}
} else {
let chunk = Chunk::new(vec![value]);
chunk.store(Self::column(), &genesis_value_key()[..], ops)?;
Ok(())
}
}
/// Extract the genesis value for a fixed length field from an
///
/// Will only return a correct value if `slot_needs_genesis_value(state.slot(), spec) == true`.
fn extract_genesis_value(
state: &BeaconState<E>,
spec: &ChainSpec,
) -> Result<Self::Value, Error> {
let (_, end_vindex) = Self::start_and_end_vindex(state.slot(), spec);
match Self::update_pattern(spec) {
// Genesis value is guaranteed to exist at `end_vindex`, as it won't yet have been
// updated
OncePerNSlots { .. } => Ok(Self::get_value(state, end_vindex as u64, spec)?),
// If there's lag, the value of the field at the vindex *without the lag*
// should still be set to the genesis value.
OncePerEpoch { lag } => Ok(Self::get_value(state, end_vindex as u64 + lag, spec)?),
}
}
}
/// Marker trait for fixed-length fields (`FixedVector<T, N>`).
pub trait FixedLengthField<E: EthSpec>: Field<E> {}
/// Marker trait for variable-length fields (`VariableList<T, N>`).
pub trait VariableLengthField<E: EthSpec>: Field<E> {}
/// Macro to implement the `Field` trait on a new unit struct type.
macro_rules! field {
($struct_name:ident, $marker_trait:ident, $value_ty:ty, $length_ty:ty, $column:expr,
$update_pattern:expr, $get_value:expr) => {
#[derive(Clone, Copy)]
pub struct $struct_name;
impl<T> Field<T> for $struct_name
where
T: EthSpec,
{
type Value = $value_ty;
type Length = $length_ty;
fn column() -> DBColumn {
$column
}
fn update_pattern(spec: &ChainSpec) -> UpdatePattern {
$update_pattern(spec)
}
fn get_value(
state: &BeaconState<T>,
vindex: u64,
spec: &ChainSpec,
) -> Result<Self::Value, ChunkError> {
$get_value(state, vindex, spec)
}
fn is_fixed_length() -> bool {
stringify!($marker_trait) == "FixedLengthField"
}
}
impl<E: EthSpec> $marker_trait<E> for $struct_name {}
};
}
field!(
BlockRoots,
FixedLengthField,
Hash256,
T::SlotsPerHistoricalRoot,
DBColumn::BeaconBlockRoots,
|_| OncePerNSlots {
n: 1,
activation_slot: Some(Slot::new(0)),
deactivation_slot: None
},
|state: &BeaconState<_>, index, _| safe_modulo_index(state.block_roots(), index)
);
field!(
StateRoots,
FixedLengthField,
Hash256,
T::SlotsPerHistoricalRoot,
DBColumn::BeaconStateRoots,
|_| OncePerNSlots {
n: 1,
activation_slot: Some(Slot::new(0)),
deactivation_slot: None,
},
|state: &BeaconState<_>, index, _| safe_modulo_index(state.state_roots(), index)
);
field!(
HistoricalRoots,
VariableLengthField,
Hash256,
T::HistoricalRootsLimit,
DBColumn::BeaconHistoricalRoots,
|spec: &ChainSpec| OncePerNSlots {
n: T::SlotsPerHistoricalRoot::to_u64(),
activation_slot: Some(Slot::new(0)),
deactivation_slot: spec
.capella_fork_epoch
.map(|fork_epoch| fork_epoch.start_slot(T::slots_per_epoch())),
},
|state: &BeaconState<_>, index, _| safe_modulo_index(state.historical_roots(), index)
);
field!(
RandaoMixes,
FixedLengthField,
Hash256,
T::EpochsPerHistoricalVector,
DBColumn::BeaconRandaoMixes,
|_| OncePerEpoch { lag: 1 },
|state: &BeaconState<_>, index, _| safe_modulo_index(state.randao_mixes(), index)
);
field!(
HistoricalSummaries,
VariableLengthField,
HistoricalSummary,
T::HistoricalRootsLimit,
DBColumn::BeaconHistoricalSummaries,
|spec: &ChainSpec| OncePerNSlots {
n: T::SlotsPerHistoricalRoot::to_u64(),
activation_slot: spec
.capella_fork_epoch
.map(|fork_epoch| fork_epoch.start_slot(T::slots_per_epoch())),
deactivation_slot: None,
},
|state: &BeaconState<_>, index, _| safe_modulo_index(
state
.historical_summaries()
.map_err(|_| ChunkError::InvalidFork)?,
index
)
);
pub fn store_updated_vector<F: Field<E>, E: EthSpec, S: KeyValueStore<E>>(
field: F,
store: &S,
state: &BeaconState<E>,
spec: &ChainSpec,
ops: &mut Vec<KeyValueStoreOp>,
) -> Result<(), Error> {
let chunk_size = F::chunk_size();
let (start_vindex, end_vindex) = F::start_and_end_vindex(state.slot(), spec);
let start_cindex = start_vindex / chunk_size;
let end_cindex = end_vindex / chunk_size;
// Store the genesis value if we have access to it, and it hasn't been stored already.
if F::slot_needs_genesis_value(state.slot(), spec) {
let genesis_value = F::extract_genesis_value(state, spec)?;
F::check_and_store_genesis_value(store, genesis_value, ops)?;
}
// Start by iterating backwards from the last chunk, storing new chunks in the database.
// Stop once a chunk in the database matches what we were about to store, this indicates
// that a previously stored state has already filled-in a portion of the indices covered.
let full_range_checked = store_range(
field,
(start_cindex..=end_cindex).rev(),
start_vindex,
end_vindex,
store,
state,
spec,
ops,
)?;
// If the previous `store_range` did not check the entire range, it may be the case that the
// state's vector includes elements at low vector indices that are not yet stored in the
// database, so run another `store_range` to ensure these values are also stored.
if !full_range_checked {
store_range(
field,
start_cindex..end_cindex,
start_vindex,
end_vindex,
store,
state,
spec,
ops,
)?;
}
Ok(())
}
#[allow(clippy::too_many_arguments)]
fn store_range<F, E, S, I>(
_: F,
range: I,
start_vindex: usize,
end_vindex: usize,
store: &S,
state: &BeaconState<E>,
spec: &ChainSpec,
ops: &mut Vec<KeyValueStoreOp>,
) -> Result<bool, Error>
where
F: Field<E>,
E: EthSpec,
S: KeyValueStore<E>,
I: Iterator<Item = usize>,
{
for chunk_index in range {
let chunk_key = &chunk_key(chunk_index)[..];
let existing_chunk =
Chunk::<F::Value>::load(store, F::column(), chunk_key)?.unwrap_or_default();
let new_chunk = F::get_updated_chunk(
&existing_chunk,
chunk_index,
start_vindex,
end_vindex,
state,
spec,
)?;
if new_chunk == existing_chunk {
return Ok(false);
}
new_chunk.store(F::column(), chunk_key, ops)?;
}
Ok(true)
}
// Chunks at the end index are included.
// TODO: could be more efficient with a real range query (perhaps RocksDB)
fn range_query<S: KeyValueStore<E>, E: EthSpec, T: Decode + Encode>(
store: &S,
column: DBColumn,
start_index: usize,
end_index: usize,
) -> Result<Vec<Chunk<T>>, Error> {
let range = start_index..=end_index;
let len = range
.end()
// Add one to account for inclusive range.
.saturating_add(1)
.saturating_sub(*range.start());
let mut result = Vec::with_capacity(len);
for chunk_index in range {
let key = &chunk_key(chunk_index)[..];
let chunk = Chunk::load(store, column, key)?.ok_or(ChunkError::Missing { chunk_index })?;
result.push(chunk);
}
Ok(result)
}
/// Combine chunks to form a list or vector of all values with vindex in `start_vindex..end_vindex`.
///
/// The `length` parameter is the length of the vec to construct, with entries set to `default` if
/// they lie outside the vindex range.
fn stitch<T: Default + Clone>(
chunks: Vec<Chunk<T>>,
start_vindex: usize,
end_vindex: usize,
chunk_size: usize,
length: usize,
default: T,
) -> Result<Vec<T>, ChunkError> {
if start_vindex + length < end_vindex {
return Err(ChunkError::OversizedRange {
start_vindex,
end_vindex,
length,
});
}
let start_cindex = start_vindex / chunk_size;
let end_cindex = end_vindex / chunk_size;
let mut result = vec![default; length];
for (chunk_index, chunk) in (start_cindex..=end_cindex).zip(chunks.into_iter()) {
// All chunks but the last chunk must be full-sized
if chunk_index != end_cindex && chunk.values.len() != chunk_size {
return Err(ChunkError::InvalidSize {
chunk_index,
expected: chunk_size,
actual: chunk.values.len(),
});
}
// Copy the chunk entries into the result vector
for (i, value) in chunk.values.into_iter().enumerate() {
let vindex = chunk_index * chunk_size + i;
if vindex >= start_vindex && vindex < end_vindex {
result[vindex % length] = value;
}
}
}
Ok(result)
}
pub fn load_vector_from_db<F: FixedLengthField<E>, E: EthSpec, S: KeyValueStore<E>>(
store: &S,
slot: Slot,
spec: &ChainSpec,
) -> Result<FixedVector<F::Value, F::Length>, Error> {
// Do a range query
let chunk_size = F::chunk_size();
let (start_vindex, end_vindex) = F::start_and_end_vindex(slot, spec);
let start_cindex = start_vindex / chunk_size;
let end_cindex = end_vindex / chunk_size;
let chunks = range_query(store, F::column(), start_cindex, end_cindex)?;
let default = if F::slot_needs_genesis_value(slot, spec) {
F::load_genesis_value(store)?
} else {
F::Value::default()
};
let result = stitch(
chunks,
start_vindex,
end_vindex,
chunk_size,
F::Length::to_usize(),
default,
)?;
Ok(result.into())
}
/// The historical roots are stored in vector chunks, despite not actually being a vector.
pub fn load_variable_list_from_db<F: VariableLengthField<E>, E: EthSpec, S: KeyValueStore<E>>(
store: &S,
slot: Slot,
spec: &ChainSpec,
) -> Result<VariableList<F::Value, F::Length>, Error> {
let chunk_size = F::chunk_size();
let (start_vindex, end_vindex) = F::start_and_end_vindex(slot, spec);
let start_cindex = start_vindex / chunk_size;
let end_cindex = end_vindex / chunk_size;
let chunks: Vec<Chunk<F::Value>> = range_query(store, F::column(), start_cindex, end_cindex)?;
let mut result = Vec::with_capacity(chunk_size * chunks.len());
for (chunk_index, chunk) in chunks.into_iter().enumerate() {
for (i, value) in chunk.values.into_iter().enumerate() {
let vindex = chunk_index * chunk_size + i;
if vindex >= start_vindex && vindex < end_vindex {
result.push(value);
}
}
}
Ok(result.into())
}
/// Index into a field of the state, avoiding out of bounds and division by 0.
fn safe_modulo_index<T: Copy>(values: &[T], index: u64) -> Result<T, ChunkError> {
if values.is_empty() {
Err(ChunkError::ZeroLengthVector)
} else {
Ok(values[index as usize % values.len()])
}
}
/// A chunk of a fixed-size vector from the `BeaconState`, stored in the database.
#[derive(Debug, Clone, PartialEq)]
pub struct Chunk<T> {
/// A vector of up-to `chunk_size` values.
pub values: Vec<T>,
}
impl<T> Default for Chunk<T>
where
T: Decode + Encode,
{
fn default() -> Self {
Chunk { values: vec![] }
}
}
impl<T> Chunk<T>
where
T: Decode + Encode,
{
pub fn new(values: Vec<T>) -> Self {
Chunk { values }
}
pub fn load<S: KeyValueStore<E>, E: EthSpec>(
store: &S,
column: DBColumn,
key: &[u8],
) -> Result<Option<Self>, Error> {
store
.get_bytes(column.into(), key)?
.map(|bytes| Self::decode(&bytes))
.transpose()
}
pub fn store(
&self,
column: DBColumn,
key: &[u8],
ops: &mut Vec<KeyValueStoreOp>,
) -> Result<(), Error> {
let db_key = get_key_for_col(column.into(), key);
ops.push(KeyValueStoreOp::PutKeyValue(db_key, self.encode()?));
Ok(())
}
/// Attempt to decode a single chunk.
pub fn decode(bytes: &[u8]) -> Result<Self, Error> {
if !<T as Decode>::is_ssz_fixed_len() {
return Err(Error::from(ChunkError::InvalidType));
}
let value_size = <T as Decode>::ssz_fixed_len();
if value_size == 0 {
return Err(Error::from(ChunkError::InvalidType));
}
let values = bytes
.chunks(value_size)
.map(T::from_ssz_bytes)
.collect::<Result<_, _>>()?;
Ok(Chunk { values })
}
pub fn encoded_size(&self) -> usize {
self.values.len() * <T as Encode>::ssz_fixed_len()
}
/// Encode a single chunk as bytes.
pub fn encode(&self) -> Result<Vec<u8>, Error> {
if !<T as Encode>::is_ssz_fixed_len() {
return Err(Error::from(ChunkError::InvalidType));
}
Ok(self.values.iter().flat_map(T::as_ssz_bytes).collect())
}
}
#[derive(Debug, PartialEq)]
pub enum ChunkError {
ZeroLengthVector,
InvalidSize {
chunk_index: usize,
expected: usize,
actual: usize,
},
Missing {
chunk_index: usize,
},
MissingGenesisValue,
Inconsistent {
field: DBColumn,
chunk_index: usize,
existing_value: String,
new_value: String,
},
InconsistentGenesisValue {
field: DBColumn,
existing_value: String,
new_value: String,
},
InvalidGenesisChunk {
field: DBColumn,
expected_len: usize,
observed_len: usize,
},
InvalidType,
OversizedRange {
start_vindex: usize,
end_vindex: usize,
length: usize,
},
InvalidFork,
}
#[cfg(test)]
mod test {
use super::*;
use types::MainnetEthSpec as TestSpec;
use types::*;
fn v(i: u64) -> Hash256 {
Hash256::from_low_u64_be(i)
}
#[test]
fn stitch_default() {
let chunk_size = 4;
let chunks = vec![
Chunk::new(vec![0u64, 1, 2, 3]),
Chunk::new(vec![4, 5, 0, 0]),
];
assert_eq!(
stitch(chunks, 2, 6, chunk_size, 12, 99).unwrap(),
vec![99, 99, 2, 3, 4, 5, 99, 99, 99, 99, 99, 99]
);
}
#[test]
fn stitch_basic() {
let chunk_size = 4;
let default = v(0);
let chunks = vec![
Chunk::new(vec![v(0), v(1), v(2), v(3)]),
Chunk::new(vec![v(4), v(5), v(6), v(7)]),
Chunk::new(vec![v(8), v(9), v(10), v(11)]),
];
assert_eq!(
stitch(chunks.clone(), 0, 12, chunk_size, 12, default).unwrap(),
(0..12).map(v).collect::<Vec<_>>()
);
assert_eq!(
stitch(chunks, 2, 10, chunk_size, 8, default).unwrap(),
vec![v(8), v(9), v(2), v(3), v(4), v(5), v(6), v(7)]
);
}
#[test]
fn stitch_oversized_range() {
let chunk_size = 4;
let default = 0;
let chunks = vec![Chunk::new(vec![20u64, 21, 22, 23])];
// Args (start_vindex, end_vindex, length)
let args = vec![(0, 21, 20), (0, 2048, 1024), (0, 2, 1)];
for (start_vindex, end_vindex, length) in args {
assert_eq!(
stitch(
chunks.clone(),
start_vindex,
end_vindex,
chunk_size,
length,
default
),
Err(ChunkError::OversizedRange {
start_vindex,
end_vindex,
length,
})
);
}
}
#[test]
fn fixed_length_fields() {
fn test_fixed_length<F: Field<TestSpec>>(_: F, expected: bool) {
assert_eq!(F::is_fixed_length(), expected);
}
test_fixed_length(BlockRoots, true);
test_fixed_length(StateRoots, true);
test_fixed_length(HistoricalRoots, false);
test_fixed_length(RandaoMixes, true);
}
fn needs_genesis_value_once_per_slot<F: Field<TestSpec>>(_: F) {
let spec = &TestSpec::default_spec();
let max = F::Length::to_u64();
for i in 0..max {
assert!(
F::slot_needs_genesis_value(Slot::new(i), spec),
"slot {}",
i
);
}
assert!(!F::slot_needs_genesis_value(Slot::new(max), spec));
}
#[test]
fn needs_genesis_value_block_roots() {
needs_genesis_value_once_per_slot(BlockRoots);
}
#[test]
fn needs_genesis_value_state_roots() {
needs_genesis_value_once_per_slot(StateRoots);
}
#[test]
fn needs_genesis_value_historical_roots() {
let spec = &TestSpec::default_spec();
assert!(
!<HistoricalRoots as Field<TestSpec>>::slot_needs_genesis_value(Slot::new(0), spec)
);
}
fn needs_genesis_value_test_randao<F: Field<TestSpec>>(_: F) {
let spec = &TestSpec::default_spec();
let max = TestSpec::slots_per_epoch() * (F::Length::to_u64() - 1);
for i in 0..max {
assert!(
F::slot_needs_genesis_value(Slot::new(i), spec),
"slot {}",
i
);
}
assert!(!F::slot_needs_genesis_value(Slot::new(max), spec));
}
#[test]
fn needs_genesis_value_randao() {
needs_genesis_value_test_randao(RandaoMixes);
}
}

View File

@@ -1,23 +1,29 @@
use crate::hdiff::HierarchyConfig;
use crate::{DBColumn, Error, StoreItem};
use serde_derive::{Deserialize, Serialize};
use ssz::{Decode, Encode};
use ssz_derive::{Decode, Encode};
use types::{EthSpec, MinimalEthSpec};
use std::io::Write;
use zstd::Encoder;
pub const PREV_DEFAULT_SLOTS_PER_RESTORE_POINT: u64 = 2048;
pub const DEFAULT_SLOTS_PER_RESTORE_POINT: u64 = 8192;
pub const DEFAULT_BLOCK_CACHE_SIZE: usize = 5;
pub const DEFAULT_EPOCHS_PER_STATE_DIFF: u64 = 4;
pub const DEFAULT_BLOCK_CACHE_SIZE: usize = 64;
pub const DEFAULT_STATE_CACHE_SIZE: usize = 128;
pub const DEFAULT_COMPRESSION_LEVEL: i32 = 1;
const EST_COMPRESSION_FACTOR: usize = 2;
pub const DEFAULT_HISTORIC_STATE_CACHE_SIZE: usize = 1;
/// Database configuration parameters.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct StoreConfig {
/// Number of slots to wait between storing restore points in the freezer database.
pub slots_per_restore_point: u64,
/// Flag indicating whether the `slots_per_restore_point` was set explicitly by the user.
pub slots_per_restore_point_set_explicitly: bool,
/// Number of epochs between state diffs in the hot database.
pub epochs_per_state_diff: u64,
/// Maximum number of blocks to store in the in-memory block cache.
pub block_cache_size: usize,
/// Maximum number of states to store in the in-memory state cache.
pub state_cache_size: usize,
/// Compression level for `BeaconStateDiff`s.
pub compression_level: i32,
/// Maximum number of states from freezer database to store in the in-memory state cache.
pub historic_state_cache_size: usize,
/// Whether to compact the database on initialization.
@@ -26,30 +32,42 @@ pub struct StoreConfig {
pub compact_on_prune: bool,
/// Whether to prune payloads on initialization and finalization.
pub prune_payloads: bool,
/// Whether to store finalized blocks compressed and linearised in the freezer database.
pub linear_blocks: bool,
/// Whether to store finalized states compressed and linearised in the freezer database.
pub linear_restore_points: bool,
/// State diff hierarchy.
pub hierarchy_config: HierarchyConfig,
}
/// Variant of `StoreConfig` that gets written to disk. Contains immutable configuration params.
#[derive(Debug, Clone, PartialEq, Eq, Encode, Decode)]
// FIXME(sproul): schema migration, add hdiff
pub struct OnDiskStoreConfig {
pub slots_per_restore_point: u64,
pub linear_blocks: bool,
pub linear_restore_points: bool,
}
#[derive(Debug, Clone)]
pub enum StoreConfigError {
MismatchedSlotsPerRestorePoint { config: u64, on_disk: u64 },
InvalidCompressionLevel { level: i32 },
}
impl Default for StoreConfig {
fn default() -> Self {
Self {
// Safe default for tests, shouldn't ever be read by a CLI node.
slots_per_restore_point: MinimalEthSpec::slots_per_historical_root() as u64,
slots_per_restore_point_set_explicitly: false,
epochs_per_state_diff: DEFAULT_EPOCHS_PER_STATE_DIFF,
block_cache_size: DEFAULT_BLOCK_CACHE_SIZE,
state_cache_size: DEFAULT_STATE_CACHE_SIZE,
compression_level: DEFAULT_COMPRESSION_LEVEL,
historic_state_cache_size: DEFAULT_HISTORIC_STATE_CACHE_SIZE,
compact_on_init: false,
compact_on_prune: true,
prune_payloads: true,
linear_blocks: true,
linear_restore_points: true,
hierarchy_config: HierarchyConfig::default(),
}
}
}
@@ -57,22 +75,58 @@ impl Default for StoreConfig {
impl StoreConfig {
pub fn as_disk_config(&self) -> OnDiskStoreConfig {
OnDiskStoreConfig {
slots_per_restore_point: self.slots_per_restore_point,
linear_blocks: self.linear_blocks,
linear_restore_points: self.linear_restore_points,
}
}
pub fn check_compatibility(
&self,
on_disk_config: &OnDiskStoreConfig,
_on_disk_config: &OnDiskStoreConfig,
) -> Result<(), StoreConfigError> {
if self.slots_per_restore_point != on_disk_config.slots_per_restore_point {
return Err(StoreConfigError::MismatchedSlotsPerRestorePoint {
config: self.slots_per_restore_point,
on_disk: on_disk_config.slots_per_restore_point,
});
}
// FIXME(sproul): TODO
Ok(())
}
/// Check that the compression level is valid.
pub fn verify_compression_level(&self) -> Result<(), StoreConfigError> {
if zstd::compression_level_range().contains(&self.compression_level) {
Ok(())
} else {
Err(StoreConfigError::InvalidCompressionLevel {
level: self.compression_level,
})
}
}
/// Estimate the size of `len` bytes after compression at the current compression level.
pub fn estimate_compressed_size(&self, len: usize) -> usize {
if self.compression_level == 0 {
len
} else {
len / EST_COMPRESSION_FACTOR
}
}
/// Estimate the size of `len` compressed bytes after decompression at the current compression
/// level.
pub fn estimate_decompressed_size(&self, len: usize) -> usize {
if self.compression_level == 0 {
len
} else {
len * EST_COMPRESSION_FACTOR
}
}
pub fn compress_bytes(&self, ssz_bytes: &[u8]) -> Result<Vec<u8>, Error> {
let mut compressed_value =
Vec::with_capacity(self.estimate_compressed_size(ssz_bytes.len()));
let mut encoder = Encoder::new(&mut compressed_value, self.compression_level)
.map_err(Error::Compression)?;
encoder.write_all(ssz_bytes).map_err(Error::Compression)?;
encoder.finish().map_err(Error::Compression)?;
Ok(compressed_value)
}
}
impl StoreItem for OnDiskStoreConfig {
@@ -80,8 +134,8 @@ impl StoreItem for OnDiskStoreConfig {
DBColumn::BeaconMeta
}
fn as_store_bytes(&self) -> Vec<u8> {
self.as_ssz_bytes()
fn as_store_bytes(&self) -> Result<Vec<u8>, Error> {
Ok(self.as_ssz_bytes())
}
fn from_store_bytes(bytes: &[u8]) -> Result<Self, Error> {

View File

@@ -1,16 +1,15 @@
use crate::chunked_vector::ChunkError;
use crate::config::StoreConfigError;
use crate::hdiff;
use crate::hot_cold_store::HotColdDBError;
use ssz::DecodeError;
use state_processing::BlockReplayError;
use types::{BeaconStateError, Hash256, InconsistentFork, Slot};
use types::{milhouse, BeaconStateError, Epoch, Hash256, InconsistentFork, Slot};
pub type Result<T> = std::result::Result<T, Error>;
#[derive(Debug)]
pub enum Error {
SszDecodeError(DecodeError),
VectorChunkError(ChunkError),
BeaconStateError(BeaconStateError),
PartialBeaconStateError,
HotColdDBError(HotColdDBError),
@@ -40,11 +39,38 @@ pub enum Error {
expected: Hash256,
computed: Hash256,
},
MissingStateRoot(Slot),
MissingState(Hash256),
MissingSnapshot(Epoch),
MissingDiff(Epoch),
NoBaseStateFound(Hash256),
BlockReplayError(BlockReplayError),
MilhouseError(milhouse::Error),
Compression(std::io::Error),
MissingPersistedBeaconChain,
SlotIsBeforeSplit {
slot: Slot,
},
FinalizedStateDecreasingSlot,
FinalizedStateUnaligned,
StateForCacheHasPendingUpdates {
state_root: Hash256,
slot: Slot,
},
AddPayloadLogicError,
SlotClockUnavailableForMigration,
MissingImmutableValidator(usize),
MissingValidator(usize),
V9MigrationFailure(Hash256),
ValidatorPubkeyCacheError(String),
DuplicateValidatorPublicKey,
InvalidValidatorPubkeyBytes(bls::Error),
ValidatorPubkeyCacheUninitialized,
InvalidKey,
UnableToDowngrade,
Hdiff(hdiff::Error),
InconsistentFork(InconsistentFork),
ZeroCacheSize,
}
pub trait HandleUnavailable<T> {
@@ -67,12 +93,6 @@ impl From<DecodeError> for Error {
}
}
impl From<ChunkError> for Error {
fn from(e: ChunkError) -> Error {
Error::VectorChunkError(e)
}
}
impl From<HotColdDBError> for Error {
fn from(e: HotColdDBError) -> Error {
Error::HotColdDBError(e)
@@ -97,6 +117,18 @@ impl From<StoreConfigError> for Error {
}
}
impl From<milhouse::Error> for Error {
fn from(e: milhouse::Error) -> Self {
Self::MilhouseError(e)
}
}
impl From<hdiff::Error> for Error {
fn from(e: hdiff::Error) -> Self {
Self::Hdiff(e)
}
}
impl From<BlockReplayError> for Error {
fn from(e: BlockReplayError) -> Error {
Error::BlockReplayError(e)

View File

@@ -1,29 +1,34 @@
use crate::chunked_iter::ChunkedVectorIter;
use crate::chunked_vector::{BlockRoots, Field, StateRoots};
use crate::errors::{Error, Result};
use crate::iter::{BlockRootsIterator, StateRootsIterator};
use crate::{HotColdDB, ItemStore};
use crate::{ColumnIter, DBColumn, HotColdDB, ItemStore};
use itertools::process_results;
use types::{BeaconState, ChainSpec, EthSpec, Hash256, Slot};
use std::marker::PhantomData;
use types::{BeaconState, EthSpec, Hash256, Slot};
pub type HybridForwardsBlockRootsIterator<'a, E, Hot, Cold> =
HybridForwardsIterator<'a, E, BlockRoots, Hot, Cold>;
HybridForwardsIterator<'a, E, Hot, Cold>;
pub type HybridForwardsStateRootsIterator<'a, E, Hot, Cold> =
HybridForwardsIterator<'a, E, StateRoots, Hot, Cold>;
HybridForwardsIterator<'a, E, Hot, Cold>;
/// Trait unifying `BlockRoots` and `StateRoots` for forward iteration.
pub trait Root<E: EthSpec>: Field<E, Value = Hash256> {
fn simple_forwards_iterator<Hot: ItemStore<E>, Cold: ItemStore<E>>(
store: &HotColdDB<E, Hot, Cold>,
impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold> {
pub fn simple_forwards_iterator(
&self,
column: DBColumn,
start_slot: Slot,
end_state: BeaconState<E>,
end_root: Hash256,
) -> Result<SimpleForwardsIterator>;
}
) -> Result<SimpleForwardsIterator> {
if column == DBColumn::BeaconBlockRoots {
self.forwards_iter_block_roots_using_state(start_slot, end_state, end_root)
} else if column == DBColumn::BeaconStateRoots {
self.forwards_iter_state_roots_using_state(start_slot, end_state, end_root)
} else {
panic!("FIXME(sproul): better error")
}
}
impl<E: EthSpec> Root<E> for BlockRoots {
fn simple_forwards_iterator<Hot: ItemStore<E>, Cold: ItemStore<E>>(
store: &HotColdDB<E, Hot, Cold>,
pub fn forwards_iter_block_roots_using_state(
&self,
start_slot: Slot,
end_state: BeaconState<E>,
end_block_root: Hash256,
@@ -31,7 +36,7 @@ impl<E: EthSpec> Root<E> for BlockRoots {
// Iterate backwards from the end state, stopping at the start slot.
let values = process_results(
std::iter::once(Ok((end_block_root, end_state.slot())))
.chain(BlockRootsIterator::owned(store, end_state)),
.chain(BlockRootsIterator::owned(self, end_state)),
|iter| {
iter.take_while(|(_, slot)| *slot >= start_slot)
.collect::<Vec<_>>()
@@ -39,11 +44,9 @@ impl<E: EthSpec> Root<E> for BlockRoots {
)?;
Ok(SimpleForwardsIterator { values })
}
}
impl<E: EthSpec> Root<E> for StateRoots {
fn simple_forwards_iterator<Hot: ItemStore<E>, Cold: ItemStore<E>>(
store: &HotColdDB<E, Hot, Cold>,
pub fn forwards_iter_state_roots_using_state(
&self,
start_slot: Slot,
end_state: BeaconState<E>,
end_state_root: Hash256,
@@ -51,7 +54,7 @@ impl<E: EthSpec> Root<E> for StateRoots {
// Iterate backwards from the end state, stopping at the start slot.
let values = process_results(
std::iter::once(Ok((end_state_root, end_state.slot())))
.chain(StateRootsIterator::owned(store, end_state)),
.chain(StateRootsIterator::owned(self, end_state)),
|iter| {
iter.take_while(|(_, slot)| *slot >= start_slot)
.collect::<Vec<_>>()
@@ -62,40 +65,62 @@ impl<E: EthSpec> Root<E> for StateRoots {
}
/// Forwards root iterator that makes use of a flat field table in the freezer DB.
pub struct FrozenForwardsIterator<'a, E: EthSpec, F: Root<E>, Hot: ItemStore<E>, Cold: ItemStore<E>>
{
inner: ChunkedVectorIter<'a, F, E, Hot, Cold>,
pub struct FrozenForwardsIterator<'a, E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> {
inner: ColumnIter<'a, Vec<u8>>,
limit: Slot,
finished: bool,
_phantom: PhantomData<(E, Hot, Cold)>,
}
impl<'a, E: EthSpec, F: Root<E>, Hot: ItemStore<E>, Cold: ItemStore<E>>
FrozenForwardsIterator<'a, E, F, Hot, Cold>
impl<'a, E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>>
FrozenForwardsIterator<'a, E, Hot, Cold>
{
/// `end_slot` is EXCLUSIVE here.
pub fn new(
store: &'a HotColdDB<E, Hot, Cold>,
column: DBColumn,
start_slot: Slot,
last_restore_point_slot: Slot,
spec: &ChainSpec,
end_slot: Slot,
) -> Self {
if column != DBColumn::BeaconBlockRoots && column != DBColumn::BeaconStateRoots {
panic!("FIXME(sproul): bad column error");
}
let start = start_slot.as_u64().to_be_bytes();
Self {
inner: ChunkedVectorIter::new(
store,
start_slot.as_usize(),
last_restore_point_slot,
spec,
),
inner: store.cold_db.iter_column_from(column, &start),
limit: end_slot,
finished: false,
_phantom: PhantomData,
}
}
}
impl<'a, E: EthSpec, F: Root<E>, Hot: ItemStore<E>, Cold: ItemStore<E>> Iterator
for FrozenForwardsIterator<'a, E, F, Hot, Cold>
impl<'a, E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> Iterator
for FrozenForwardsIterator<'a, E, Hot, Cold>
{
type Item = (Hash256, Slot);
type Item = Result<(Hash256, Slot)>;
fn next(&mut self) -> Option<Self::Item> {
if self.finished {
return None;
}
self.inner
.next()
.map(|(slot, root)| (root, Slot::from(slot)))
.next()?
.and_then(|(slot_bytes, root_bytes)| {
if slot_bytes.len() != 8 || root_bytes.len() != 32 {
panic!("FIXME(sproul): put an error here")
} else {
let slot = Slot::new(u64::from_be_bytes(slot_bytes.try_into().unwrap()));
let root = Hash256::from_slice(&root_bytes);
if slot + 1 == self.limit {
self.finished = true;
}
Ok(Some((root, slot)))
}
})
.transpose()
}
}
@@ -115,24 +140,27 @@ impl Iterator for SimpleForwardsIterator {
}
/// Fusion of the above two approaches to forwards iteration. Fast and efficient.
pub enum HybridForwardsIterator<'a, E: EthSpec, F: Root<E>, Hot: ItemStore<E>, Cold: ItemStore<E>> {
pub enum HybridForwardsIterator<'a, E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> {
PreFinalization {
iter: Box<FrozenForwardsIterator<'a, E, F, Hot, Cold>>,
iter: Box<FrozenForwardsIterator<'a, E, Hot, Cold>>,
store: &'a HotColdDB<E, Hot, Cold>,
/// Data required by the `PostFinalization` iterator when we get to it.
continuation_data: Option<Box<(BeaconState<E>, Hash256)>>,
column: DBColumn,
},
PostFinalizationLazy {
continuation_data: Option<Box<(BeaconState<E>, Hash256)>>,
store: &'a HotColdDB<E, Hot, Cold>,
start_slot: Slot,
column: DBColumn,
},
PostFinalization {
iter: SimpleForwardsIterator,
},
}
impl<'a, E: EthSpec, F: Root<E>, Hot: ItemStore<E>, Cold: ItemStore<E>>
HybridForwardsIterator<'a, E, F, Hot, Cold>
impl<'a, E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>>
HybridForwardsIterator<'a, E, Hot, Cold>
{
/// Construct a new hybrid iterator.
///
@@ -148,41 +176,41 @@ impl<'a, E: EthSpec, F: Root<E>, Hot: ItemStore<E>, Cold: ItemStore<E>>
/// function may block for some time while `get_state` runs.
pub fn new(
store: &'a HotColdDB<E, Hot, Cold>,
column: DBColumn,
start_slot: Slot,
end_slot: Option<Slot>,
get_state: impl FnOnce() -> (BeaconState<E>, Hash256),
spec: &ChainSpec,
) -> Result<Self> {
use HybridForwardsIterator::*;
let latest_restore_point_slot = store.get_latest_restore_point_slot();
// FIXME(sproul): consider whether this is 100% correct
let split_slot = store.get_split_slot();
let result = if start_slot < latest_restore_point_slot {
let result = if start_slot < split_slot {
let iter = Box::new(FrozenForwardsIterator::new(
store,
start_slot,
latest_restore_point_slot,
spec,
store, column, start_slot, split_slot,
));
// No continuation data is needed if the forwards iterator plans to halt before
// `end_slot`. If it tries to continue further a `NoContinuationData` error will be
// returned.
let continuation_data =
if end_slot.map_or(false, |end_slot| end_slot < latest_restore_point_slot) {
None
} else {
Some(Box::new(get_state()))
};
let continuation_data = if end_slot.map_or(false, |end_slot| end_slot < split_slot) {
None
} else {
Some(Box::new(get_state()))
};
PreFinalization {
iter,
store,
continuation_data,
column,
}
} else {
PostFinalizationLazy {
continuation_data: Some(Box::new(get_state())),
store,
start_slot,
column,
}
};
@@ -195,22 +223,24 @@ impl<'a, E: EthSpec, F: Root<E>, Hot: ItemStore<E>, Cold: ItemStore<E>>
match self {
PreFinalization {
iter,
store,
continuation_data,
column,
} => {
match iter.next() {
Some(x) => Ok(Some(x)),
Some(x) => x.map(Some),
// Once the pre-finalization iterator is consumed, transition
// to a post-finalization iterator beginning from the last slot
// of the pre iterator.
None => {
let continuation_data = continuation_data.take();
let store = iter.inner.store;
let start_slot = Slot::from(iter.inner.end_vindex);
let start_slot = Slot::from(iter.limit);
*self = PostFinalizationLazy {
continuation_data,
store,
start_slot,
column: *column,
};
self.do_next()
@@ -221,11 +251,17 @@ impl<'a, E: EthSpec, F: Root<E>, Hot: ItemStore<E>, Cold: ItemStore<E>>
continuation_data,
store,
start_slot,
column,
} => {
let (end_state, end_root) =
*continuation_data.take().ok_or(Error::NoContinuationData)?;
*self = PostFinalization {
iter: F::simple_forwards_iterator(store, *start_slot, end_state, end_root)?,
iter: store.simple_forwards_iterator(
*column,
*start_slot,
end_state,
end_root,
)?,
};
self.do_next()
}
@@ -234,8 +270,8 @@ impl<'a, E: EthSpec, F: Root<E>, Hot: ItemStore<E>, Cold: ItemStore<E>>
}
}
impl<'a, E: EthSpec, F: Root<E>, Hot: ItemStore<E>, Cold: ItemStore<E>> Iterator
for HybridForwardsIterator<'a, E, F, Hot, Cold>
impl<'a, E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> Iterator
for HybridForwardsIterator<'a, E, Hot, Cold>
{
type Item = Result<(Hash256, Slot)>;

View File

@@ -0,0 +1,349 @@
//! Hierarchical diff implementation.
use crate::{DBColumn, StoreItem};
use itertools::Itertools;
use serde::{Deserialize, Serialize};
use ssz::{Decode, Encode};
use ssz_derive::{Decode, Encode};
use std::io::{Read, Write};
use types::{BeaconState, ChainSpec, Epoch, EthSpec, VList};
use zstd::{Decoder, Encoder};
#[derive(Debug)]
pub enum Error {
InvalidHierarchy,
XorDeletionsNotSupported,
UnableToComputeDiff,
UnableToApplyDiff,
Compression(std::io::Error),
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct HierarchyConfig {
exponents: Vec<u8>,
}
#[derive(Debug)]
pub struct HierarchyModuli {
moduli: Vec<u64>,
}
#[derive(Debug, PartialEq, Eq)]
pub enum StorageStrategy {
Nothing,
DiffFrom(Epoch),
Snapshot,
}
/// Hierarchical diff output and working buffer.
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct HDiffBuffer {
state: Vec<u8>,
balances: Vec<u64>,
}
/// Hierarchical state diff.
#[derive(Debug, Encode, Decode)]
pub struct HDiff {
state_diff: BytesDiff,
balances_diff: XorDiff,
}
#[derive(Debug, Encode, Decode)]
pub struct BytesDiff {
bytes: Vec<u8>,
}
#[derive(Debug, Encode, Decode)]
pub struct XorDiff {
bytes: Vec<u8>,
}
impl HDiffBuffer {
pub fn from_state<E: EthSpec>(mut beacon_state: BeaconState<E>) -> Self {
let balances_list = std::mem::take(beacon_state.balances_mut());
let state = beacon_state.as_ssz_bytes();
let balances = balances_list.to_vec();
HDiffBuffer { state, balances }
}
pub fn into_state<E: EthSpec>(self, spec: &ChainSpec) -> Result<BeaconState<E>, Error> {
let mut state = BeaconState::from_ssz_bytes(&self.state, spec).unwrap();
*state.balances_mut() = VList::new(self.balances).unwrap();
Ok(state)
}
}
impl HDiff {
pub fn compute(source: &HDiffBuffer, target: &HDiffBuffer) -> Result<Self, Error> {
let state_diff = BytesDiff::compute(&source.state, &target.state)?;
let balances_diff = XorDiff::compute(&source.balances, &target.balances)?;
Ok(Self {
state_diff,
balances_diff,
})
}
pub fn apply(&self, source: &mut HDiffBuffer) -> Result<(), Error> {
let source_state = std::mem::take(&mut source.state);
self.state_diff.apply(&source_state, &mut source.state)?;
self.balances_diff.apply(&mut source.balances)?;
Ok(())
}
pub fn state_diff_len(&self) -> usize {
self.state_diff.bytes.len()
}
pub fn balances_diff_len(&self) -> usize {
self.balances_diff.bytes.len()
}
}
impl StoreItem for HDiff {
fn db_column() -> DBColumn {
DBColumn::BeaconStateDiff
}
fn as_store_bytes(&self) -> Result<Vec<u8>, crate::Error> {
Ok(self.as_ssz_bytes())
}
fn from_store_bytes(bytes: &[u8]) -> Result<Self, crate::Error> {
Ok(Self::from_ssz_bytes(bytes)?)
}
}
impl BytesDiff {
pub fn compute(source: &[u8], target: &[u8]) -> Result<Self, Error> {
Self::compute_xdelta(source, target)
}
pub fn compute_xdelta(source_bytes: &[u8], target_bytes: &[u8]) -> Result<Self, Error> {
let bytes =
xdelta3::encode(target_bytes, source_bytes).ok_or(Error::UnableToComputeDiff)?;
Ok(Self { bytes })
}
pub fn apply(&self, source: &[u8], target: &mut Vec<u8>) -> Result<(), Error> {
self.apply_xdelta(source, target)
}
pub fn apply_xdelta(&self, source: &[u8], target: &mut Vec<u8>) -> Result<(), Error> {
*target = xdelta3::decode(&self.bytes, source).ok_or(Error::UnableToApplyDiff)?;
Ok(())
}
}
impl XorDiff {
pub fn compute(xs: &[u64], ys: &[u64]) -> Result<Self, Error> {
if xs.len() > ys.len() {
return Err(Error::XorDeletionsNotSupported);
}
let uncompressed_bytes: Vec<u8> = ys
.iter()
.enumerate()
.flat_map(|(i, y)| {
// Diff from 0 if the entry is new.
let x = xs.get(i).copied().unwrap_or(0);
y.wrapping_sub(x).to_be_bytes()
})
.collect();
// FIXME(sproul): reconsider
let compression_level = 1;
let mut compressed_bytes = Vec::with_capacity(uncompressed_bytes.len() / 2);
let mut encoder =
Encoder::new(&mut compressed_bytes, compression_level).map_err(Error::Compression)?;
encoder
.write_all(&uncompressed_bytes)
.map_err(Error::Compression)?;
encoder.finish().map_err(Error::Compression)?;
Ok(XorDiff {
bytes: compressed_bytes,
})
}
pub fn apply(&self, xs: &mut Vec<u64>) -> Result<(), Error> {
// Decompress balances diff.
let mut balances_diff_bytes = Vec::with_capacity(2 * self.bytes.len());
let mut decoder = Decoder::new(&*self.bytes).map_err(Error::Compression)?;
decoder
.read_to_end(&mut balances_diff_bytes)
.map_err(Error::Compression)?;
for (i, diff_bytes) in balances_diff_bytes
.chunks(u64::BITS as usize / 8)
.enumerate()
{
// FIXME(sproul): unwrap
let diff = u64::from_be_bytes(diff_bytes.try_into().unwrap());
if let Some(x) = xs.get_mut(i) {
*x = x.wrapping_add(diff);
} else {
xs.push(diff);
}
}
Ok(())
}
}
impl Default for HierarchyConfig {
fn default() -> Self {
HierarchyConfig {
exponents: vec![0, 4, 6, 8, 11, 13, 16],
}
}
}
impl HierarchyConfig {
pub fn to_moduli(&self) -> Result<HierarchyModuli, Error> {
self.validate()?;
let moduli = self.exponents.iter().map(|n| 1 << n).collect();
Ok(HierarchyModuli { moduli })
}
pub fn validate(&self) -> Result<(), Error> {
if self.exponents.len() > 2
&& self
.exponents
.iter()
.tuple_windows()
.all(|(small, big)| small < big && *big < u64::BITS as u8)
{
Ok(())
} else {
Err(Error::InvalidHierarchy)
}
}
}
impl HierarchyModuli {
pub fn storage_strategy(&self, epoch: Epoch) -> Result<StorageStrategy, Error> {
let last = self.moduli.last().copied().ok_or(Error::InvalidHierarchy)?;
if epoch % last == 0 {
return Ok(StorageStrategy::Snapshot);
}
let diff_from = self.moduli.iter().rev().find_map(|&n| {
(epoch % n == 0).then(|| {
// Diff from the previous state.
(epoch - 1) / n * n
})
});
Ok(diff_from.map_or(StorageStrategy::Nothing, StorageStrategy::DiffFrom))
}
/// Return the smallest epoch greater than or equal to `epoch` at which a full snapshot should
/// be stored.
pub fn next_snapshot_epoch(&self, epoch: Epoch) -> Result<Epoch, Error> {
let last = self.moduli.last().copied().ok_or(Error::InvalidHierarchy)?;
if epoch % last == 0 {
Ok(epoch)
} else {
Ok((epoch / last + 1) * last)
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn default_storage_strategy() {
let config = HierarchyConfig::default();
config.validate().unwrap();
let moduli = config.to_moduli().unwrap();
// Full snapshots at multiples of 2^16.
let snapshot_freq = Epoch::new(1 << 16);
assert_eq!(
moduli.storage_strategy(Epoch::new(0)).unwrap(),
StorageStrategy::Snapshot
);
assert_eq!(
moduli.storage_strategy(snapshot_freq).unwrap(),
StorageStrategy::Snapshot
);
assert_eq!(
moduli.storage_strategy(snapshot_freq * 3).unwrap(),
StorageStrategy::Snapshot
);
// For the first layer of diffs
let first_layer = Epoch::new(1 << 13);
assert_eq!(
moduli.storage_strategy(first_layer * 2).unwrap(),
StorageStrategy::DiffFrom(first_layer)
);
}
#[test]
fn next_snapshot_epoch() {
let config = HierarchyConfig::default();
config.validate().unwrap();
let moduli = config.to_moduli().unwrap();
let snapshot_freq = Epoch::new(1 << 16);
assert_eq!(
moduli.next_snapshot_epoch(snapshot_freq).unwrap(),
snapshot_freq
);
assert_eq!(
moduli.next_snapshot_epoch(snapshot_freq + 1).unwrap(),
snapshot_freq * 2
);
assert_eq!(
moduli.next_snapshot_epoch(snapshot_freq * 2 - 1).unwrap(),
snapshot_freq * 2
);
assert_eq!(
moduli.next_snapshot_epoch(snapshot_freq * 2).unwrap(),
snapshot_freq * 2
);
assert_eq!(
moduli.next_snapshot_epoch(snapshot_freq * 100).unwrap(),
snapshot_freq * 100
);
}
#[test]
fn xor_vs_bytes_diff() {
let x_values = vec![99u64, 55, 123, 6834857, 0, 12];
let y_values = vec![98u64, 55, 312, 1, 1, 2, 4, 5];
let to_bytes =
|nums: &[u64]| -> Vec<u8> { nums.iter().flat_map(|x| x.to_be_bytes()).collect() };
let x_bytes = to_bytes(&x_values);
let y_bytes = to_bytes(&y_values);
let xor_diff = XorDiff::compute(&x_values, &y_values).unwrap();
let mut y_from_xor = x_values.clone();
xor_diff.apply(&mut y_from_xor).unwrap();
assert_eq!(y_values, y_from_xor);
let bytes_diff = BytesDiff::compute(&x_bytes, &y_bytes).unwrap();
let mut y_from_bytes = vec![];
bytes_diff.apply(&x_bytes, &mut y_from_bytes).unwrap();
assert_eq!(y_bytes, y_from_bytes);
// XOR diff wins by more than a factor of 3
assert!(xor_diff.bytes.len() < 3 * bytes_diff.bytes.len());
}
}

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,50 @@
use crate::{hot_cold_store::HotColdDBError, Error, HotColdDB, HotStateSummary, ItemStore};
use types::{EthSpec, Hash256, Slot};
pub struct HotStateRootIter<'a, E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> {
store: &'a HotColdDB<E, Hot, Cold>,
next_slot: Slot,
next_state_root: Hash256,
}
impl<'a, E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotStateRootIter<'a, E, Hot, Cold> {
pub fn new(
store: &'a HotColdDB<E, Hot, Cold>,
next_slot: Slot,
next_state_root: Hash256,
) -> Self {
Self {
store,
next_slot,
next_state_root,
}
}
fn do_next(&mut self) -> Result<Option<(Hash256, HotStateSummary)>, Error> {
if self.next_state_root.is_zero() {
return Ok(None);
}
let summary = self
.store
.load_hot_state_summary(&self.next_state_root)?
.ok_or(HotColdDBError::MissingHotStateSummary(self.next_state_root))?;
let state_root = self.next_state_root;
self.next_state_root = summary.prev_state_root;
self.next_slot -= 1;
Ok(Some((state_root, summary)))
}
}
impl<'a, E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> Iterator
for HotStateRootIter<'a, E, Hot, Cold>
{
type Item = Result<(Hash256, HotStateSummary), Error>;
fn next(&mut self) -> Option<Self::Item> {
self.do_next().transpose()
}
}

View File

@@ -1,2 +1,3 @@
pub mod beacon_state;
pub mod execution_payload;
pub mod frozen_block_slot;

View File

@@ -1,62 +1,83 @@
use crate::*;
use ssz::{DecodeError, Encode};
use ssz::Encode;
use ssz_derive::Encode;
use std::convert::TryInto;
use types::beacon_state::{CloneConfig, CommitteeCache, CACHED_EPOCHS};
use std::io::{Read, Write};
use std::sync::Arc;
use types::{CompactBeaconState, PublicKeyBytes};
use zstd::{Decoder, Encoder};
pub fn store_full_state<E: EthSpec>(
state_root: &Hash256,
state: &BeaconState<E>,
ops: &mut Vec<KeyValueStoreOp>,
config: &StoreConfig,
) -> Result<(), Error> {
let bytes = {
let _overhead_timer = metrics::start_timer(&metrics::BEACON_STATE_WRITE_OVERHEAD_TIMES);
StorageContainer::new(state).as_ssz_bytes()
};
metrics::inc_counter_by(&metrics::BEACON_STATE_WRITE_BYTES, bytes.len() as u64);
let mut compressed_value = Vec::with_capacity(config.estimate_compressed_size(bytes.len()));
let mut encoder = Encoder::new(&mut compressed_value, config.compression_level)
.map_err(Error::Compression)?;
encoder.write_all(&bytes).map_err(Error::Compression)?;
encoder.finish().map_err(Error::Compression)?;
metrics::inc_counter_by(
&metrics::BEACON_STATE_WRITE_BYTES,
compressed_value.len() as u64,
);
metrics::inc_counter(&metrics::BEACON_STATE_WRITE_COUNT);
let key = get_key_for_col(DBColumn::BeaconState.into(), state_root.as_bytes());
ops.push(KeyValueStoreOp::PutKeyValue(key, bytes));
ops.push(KeyValueStoreOp::PutKeyValue(key, compressed_value));
Ok(())
}
pub fn get_full_state<KV: KeyValueStore<E>, E: EthSpec>(
pub fn get_full_state<KV: KeyValueStore<E>, E: EthSpec, F>(
db: &KV,
state_root: &Hash256,
immutable_validators: F,
config: &StoreConfig,
spec: &ChainSpec,
) -> Result<Option<BeaconState<E>>, Error> {
) -> Result<Option<BeaconState<E>>, Error>
where
F: Fn(usize) -> Option<Arc<PublicKeyBytes>>,
{
let total_timer = metrics::start_timer(&metrics::BEACON_STATE_READ_TIMES);
match db.get_bytes(DBColumn::BeaconState.into(), state_root.as_bytes())? {
Some(bytes) => {
let mut ssz_bytes = Vec::with_capacity(config.estimate_decompressed_size(bytes.len()));
let mut decoder = Decoder::new(&*bytes).map_err(Error::Compression)?;
decoder
.read_to_end(&mut ssz_bytes)
.map_err(Error::Compression)?;
let overhead_timer = metrics::start_timer(&metrics::BEACON_STATE_READ_OVERHEAD_TIMES);
let container = StorageContainer::from_ssz_bytes(&bytes, spec)?;
let container = StorageContainer::from_ssz_bytes(&ssz_bytes, spec)?;
metrics::stop_timer(overhead_timer);
metrics::stop_timer(total_timer);
metrics::inc_counter(&metrics::BEACON_STATE_READ_COUNT);
metrics::inc_counter_by(&metrics::BEACON_STATE_READ_BYTES, bytes.len() as u64);
Ok(Some(container.try_into()?))
Ok(Some(container.into_beacon_state(immutable_validators)?))
}
None => Ok(None),
}
}
/// A container for storing `BeaconState` components.
// TODO: would be more space efficient with the caches stored separately and referenced by hash
#[derive(Encode)]
pub struct StorageContainer<T: EthSpec> {
state: BeaconState<T>,
committee_caches: Vec<CommitteeCache>,
state: CompactBeaconState<T>,
}
impl<T: EthSpec> StorageContainer<T> {
/// Create a new instance for storing a `BeaconState`.
pub fn new(state: &BeaconState<T>) -> Self {
Self {
state: state.clone_with(CloneConfig::none()),
committee_caches: state.committee_caches().to_vec(),
state: state.clone().into_compact_state(),
}
}
@@ -66,36 +87,20 @@ impl<T: EthSpec> StorageContainer<T> {
let mut builder = ssz::SszDecoderBuilder::new(bytes);
builder.register_anonymous_variable_length_item()?;
builder.register_type::<Vec<CommitteeCache>>()?;
let mut decoder = builder.build()?;
let state = decoder.decode_next_with(|bytes| BeaconState::from_ssz_bytes(bytes, spec))?;
let committee_caches = decoder.decode_next()?;
let state =
decoder.decode_next_with(|bytes| CompactBeaconState::from_ssz_bytes(bytes, spec))?;
Ok(Self {
state,
committee_caches,
})
Ok(Self { state })
}
}
impl<T: EthSpec> TryInto<BeaconState<T>> for StorageContainer<T> {
type Error = Error;
fn try_into(mut self) -> Result<BeaconState<T>, Error> {
let mut state = self.state;
for i in (0..CACHED_EPOCHS).rev() {
if i >= self.committee_caches.len() {
return Err(Error::SszDecodeError(DecodeError::BytesInvalid(
"Insufficient committees for BeaconState".to_string(),
)));
};
state.committee_caches_mut()[i] = self.committee_caches.remove(i);
}
fn into_beacon_state<F>(self, immutable_validators: F) -> Result<BeaconState<T>, Error>
where
F: Fn(usize) -> Option<Arc<PublicKeyBytes>>,
{
let state = self.state.try_into_full_state(immutable_validators)?;
Ok(state)
}
}

View File

@@ -9,8 +9,8 @@ macro_rules! impl_store_item {
DBColumn::ExecPayload
}
fn as_store_bytes(&self) -> Vec<u8> {
self.as_ssz_bytes()
fn as_store_bytes(&self) -> Result<Vec<u8>, Error> {
Ok(self.as_ssz_bytes())
}
fn from_store_bytes(bytes: &[u8]) -> Result<Self, Error> {
@@ -31,8 +31,8 @@ impl<E: EthSpec> StoreItem for ExecutionPayload<E> {
DBColumn::ExecPayload
}
fn as_store_bytes(&self) -> Vec<u8> {
self.as_ssz_bytes()
fn as_store_bytes(&self) -> Result<Vec<u8>, Error> {
Ok(self.as_ssz_bytes())
}
fn from_store_bytes(bytes: &[u8]) -> Result<Self, Error> {

View File

@@ -0,0 +1,19 @@
use crate::{DBColumn, Error, StoreItem};
use ssz::{Decode, Encode};
use types::Slot;
pub struct FrozenBlockSlot(pub Slot);
impl StoreItem for FrozenBlockSlot {
fn db_column() -> DBColumn {
DBColumn::BeaconBlock
}
fn as_store_bytes(&self) -> Result<Vec<u8>, Error> {
Ok(self.0.as_ssz_bytes())
}
fn from_store_bytes(bytes: &[u8]) -> Result<Self, Error> {
Ok(FrozenBlockSlot(Slot::from_ssz_bytes(bytes)?))
}
}

View File

@@ -189,7 +189,7 @@ impl<'a, T: EthSpec, Hot: ItemStore<T>, Cold: ItemStore<T>> RootsIterator<'a, T,
block_hash: Hash256,
) -> Result<Self, Error> {
let block = store
.get_blinded_block(&block_hash)?
.get_blinded_block(&block_hash, None)?
.ok_or_else(|| BeaconStateError::MissingBeaconBlock(block_hash.into()))?;
let state = store
.get_state(&block.state_root(), Some(block.slot()))?
@@ -286,7 +286,7 @@ impl<'a, E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>>
let block = if self.decode_any_variant {
self.store.get_block_any_variant(&block_root)
} else {
self.store.get_blinded_block(&block_root)
self.store.get_blinded_block(&block_root, None)
}?
.ok_or(Error::BlockNotFound(block_root))?;
self.next_block_root = block.message().parent_root();
@@ -329,7 +329,8 @@ impl<'a, T: EthSpec, Hot: ItemStore<T>, Cold: ItemStore<T>> BlockIterator<'a, T,
fn do_next(&mut self) -> Result<Option<SignedBeaconBlock<T, BlindedPayload<T>>>, Error> {
if let Some(result) = self.roots.next() {
let (root, _slot) = result?;
self.roots.inner.store.get_blinded_block(&root)
// Don't use slot hint here as it could be a skipped slot.
self.roots.inner.store.get_blinded_block(&root, None)
} else {
Ok(None)
}
@@ -413,15 +414,15 @@ mod test {
let mut hashes = (0..).map(Hash256::from_low_u64_be);
let roots_a = state_a.block_roots_mut();
for i in 0..roots_a.len() {
roots_a[i] = hashes.next().unwrap()
*roots_a.get_mut(i).unwrap() = hashes.next().unwrap()
}
let roots_b = state_b.block_roots_mut();
for i in 0..roots_b.len() {
roots_b[i] = hashes.next().unwrap()
*roots_b.get_mut(i).unwrap() = hashes.next().unwrap()
}
let state_a_root = hashes.next().unwrap();
state_b.state_roots_mut()[0] = state_a_root;
*state_b.state_roots_mut().get_mut(0).unwrap() = state_a_root;
store.put_state(&state_a_root, &state_a).unwrap();
let iter = BlockRootsIterator::new(&store, &state_b);

View File

@@ -1,7 +1,6 @@
use super::*;
use crate::hot_cold_store::HotColdDBError;
use crate::metrics;
use db_key::Key;
use leveldb::compaction::Compaction;
use leveldb::database::batch::{Batch, Writebatch};
use leveldb::database::kv::KV;
@@ -27,6 +26,7 @@ impl<E: EthSpec> LevelDB<E> {
let mut options = Options::new();
options.create_if_missing = true;
options.write_buffer_size = Some(512 * 1024 * 1024);
let db = Database::open(path, options)?;
let transaction_mutex = Mutex::new(());
@@ -168,8 +168,9 @@ impl<E: EthSpec> KeyValueStore<E> for LevelDB<E> {
};
for (start_key, end_key) in vec![
endpoints(DBColumn::BeaconStateTemporary),
endpoints(DBColumn::BeaconState),
endpoints(DBColumn::BeaconStateDiff),
endpoints(DBColumn::BeaconStateSummary),
] {
self.db.compact(&start_key, &end_key);
}
@@ -177,9 +178,12 @@ impl<E: EthSpec> KeyValueStore<E> for LevelDB<E> {
}
/// Iterate through all keys and values in a particular column.
fn iter_column(&self, column: DBColumn) -> ColumnIter {
let start_key =
BytesKey::from_vec(get_key_for_col(column.into(), Hash256::zero().as_bytes()));
fn iter_column<K: Key>(&self, column: DBColumn) -> ColumnIter<K> {
self.iter_column_from(column, &vec![0; column.key_size()])
}
fn iter_column_from<K: Key>(&self, column: DBColumn, from: &[u8]) -> ColumnIter<K> {
let start_key = BytesKey::from_vec(get_key_for_col(column.into(), &from));
let iter = self.db.iter(self.read_options());
iter.seek(&start_key);
@@ -187,13 +191,12 @@ impl<E: EthSpec> KeyValueStore<E> for LevelDB<E> {
Box::new(
iter.take_while(move |(key, _)| key.matches_column(column))
.map(move |(bytes_key, value)| {
let key =
bytes_key
.remove_column(column)
.ok_or(HotColdDBError::IterationError {
unexpected_key: bytes_key,
})?;
Ok((key, value))
let key = bytes_key.remove_column_variable(column).ok_or_else(|| {
HotColdDBError::IterationError {
unexpected_key: bytes_key.clone(),
}
})?;
Ok((K::from_bytes(key)?, value))
}),
)
}
@@ -224,12 +227,12 @@ impl<E: EthSpec> KeyValueStore<E> for LevelDB<E> {
impl<E: EthSpec> ItemStore<E> for LevelDB<E> {}
/// Used for keying leveldb.
#[derive(Debug, PartialEq)]
#[derive(Debug, Clone, PartialEq)]
pub struct BytesKey {
key: Vec<u8>,
}
impl Key for BytesKey {
impl db_key::Key for BytesKey {
fn from_u8(key: &[u8]) -> Self {
Self { key: key.to_vec() }
}
@@ -245,12 +248,20 @@ impl BytesKey {
self.key.starts_with(column.as_bytes())
}
/// Remove the column from a key, returning its `Hash256` portion.
/// Remove the column from a 32 byte key, yielding the `Hash256` key.
pub fn remove_column(&self, column: DBColumn) -> Option<Hash256> {
let key = self.remove_column_variable(column)?;
(column.key_size() == 32).then(|| Hash256::from_slice(key))
}
/// Remove the column from a key.
///
/// Will return `None` if the value doesn't match the column or has the wrong length.
pub fn remove_column_variable(&self, column: DBColumn) -> Option<&[u8]> {
if self.matches_column(column) {
let subkey = &self.key[column.as_bytes().len()..];
if subkey.len() == 32 {
return Some(Hash256::from_slice(subkey));
if subkey.len() == column.key_size() {
return Some(subkey);
}
}
None

View File

@@ -10,30 +10,28 @@
#[macro_use]
extern crate lazy_static;
mod chunk_writer;
pub mod chunked_iter;
pub mod chunked_vector;
pub mod config;
pub mod errors;
mod forwards_iter;
mod garbage_collection;
pub mod hdiff;
pub mod hot_cold_store;
mod hot_state_iter;
mod impls;
mod leveldb_store;
mod memory_store;
pub mod metadata;
pub mod metrics;
mod partial_beacon_state;
pub mod reconstruct;
mod state_cache;
pub mod validator_pubkey_cache;
pub mod iter;
pub use self::chunk_writer::ChunkWriter;
pub use self::config::StoreConfig;
pub use self::hot_cold_store::{HotColdDB, HotStateSummary, Split};
pub use self::leveldb_store::LevelDB;
pub use self::memory_store::MemoryStore;
pub use self::partial_beacon_state::PartialBeaconState;
pub use errors::Error;
pub use impls::beacon_state::StorageContainer as BeaconStateStorageContainer;
pub use metadata::AnchorInfo;
@@ -42,8 +40,9 @@ use parking_lot::MutexGuard;
use std::sync::Arc;
use strum::{EnumString, IntoStaticStr};
pub use types::*;
pub use validator_pubkey_cache::ValidatorPubkeyCache;
pub type ColumnIter<'a> = Box<dyn Iterator<Item = Result<(Hash256, Vec<u8>), Error>> + 'a>;
pub type ColumnIter<'a, K> = Box<dyn Iterator<Item = Result<(K, Vec<u8>), Error>> + 'a>;
pub type ColumnKeyIter<'a> = Box<dyn Iterator<Item = Result<Hash256, Error>> + 'a>;
pub trait KeyValueStore<E: EthSpec>: Sync + Send + Sized + 'static {
@@ -80,7 +79,11 @@ pub trait KeyValueStore<E: EthSpec>: Sync + Send + Sized + 'static {
fn compact(&self) -> Result<(), Error>;
/// Iterate through all keys and values in a particular column.
fn iter_column(&self, _column: DBColumn) -> ColumnIter {
fn iter_column<K: Key>(&self, column: DBColumn) -> ColumnIter<K> {
self.iter_column_from(column, &vec![0; column.key_size()])
}
fn iter_column_from<K: Key>(&self, _column: DBColumn, _from: &[u8]) -> ColumnIter<K> {
// Default impl for non LevelDB databases
Box::new(std::iter::empty())
}
@@ -92,6 +95,26 @@ pub trait KeyValueStore<E: EthSpec>: Sync + Send + Sized + 'static {
}
}
pub trait Key: Sized + 'static {
fn from_bytes(key: &[u8]) -> Result<Self, Error>;
}
impl Key for Hash256 {
fn from_bytes(key: &[u8]) -> Result<Self, Error> {
if key.len() == 32 {
Ok(Hash256::from_slice(key))
} else {
Err(Error::InvalidKey)
}
}
}
impl Key for Vec<u8> {
fn from_bytes(key: &[u8]) -> Result<Self, Error> {
Ok(key.to_vec())
}
}
pub fn get_key_for_col(column: &str, key: &[u8]) -> Vec<u8> {
let mut result = column.as_bytes().to_vec();
result.extend_from_slice(key);
@@ -110,7 +133,7 @@ pub trait ItemStore<E: EthSpec>: KeyValueStore<E> + Sync + Send + Sized + 'stati
let column = I::db_column().into();
let key = key.as_bytes();
self.put_bytes(column, key, &item.as_store_bytes())
self.put_bytes(column, key, &item.as_store_bytes()?)
.map_err(Into::into)
}
@@ -118,7 +141,7 @@ pub trait ItemStore<E: EthSpec>: KeyValueStore<E> + Sync + Send + Sized + 'stati
let column = I::db_column().into();
let key = key.as_bytes();
self.put_bytes_sync(column, key, &item.as_store_bytes())
self.put_bytes_sync(column, key, &item.as_store_bytes()?)
.map_err(Into::into)
}
@@ -155,7 +178,6 @@ pub trait ItemStore<E: EthSpec>: KeyValueStore<E> + Sync + Send + Sized + 'stati
pub enum StoreOp<'a, E: EthSpec> {
PutBlock(Hash256, Arc<SignedBeaconBlock<E>>),
PutState(Hash256, &'a BeaconState<E>),
PutStateSummary(Hash256, HotStateSummary),
PutStateTemporaryFlag(Hash256),
DeleteStateTemporaryFlag(Hash256),
DeleteBlock(Hash256),
@@ -170,11 +192,28 @@ pub enum DBColumn {
/// For data related to the database itself.
#[strum(serialize = "bma")]
BeaconMeta,
/// Data related to blocks.
///
/// - Key: `Hash256` block root.
/// - Value in hot DB: SSZ-encoded blinded block.
/// - Value in cold DB: 8-byte slot of block.
#[strum(serialize = "blk")]
BeaconBlock,
/// Frozen beacon blocks.
///
/// - Key: 8-byte slot.
/// - Value: ZSTD-compressed SSZ-encoded blinded block.
#[strum(serialize = "bbf")]
BeaconBlockFrozen,
/// For full `BeaconState`s in the hot database (finalized or fork-boundary states).
#[strum(serialize = "ste")]
BeaconState,
/// For beacon state snapshots in the freezer DB.
#[strum(serialize = "bsn")]
BeaconStateSnapshot,
/// For compact `BeaconStateDiff`s in the freezer DB.
#[strum(serialize = "bsd")]
BeaconStateDiff,
/// For the mapping from state roots to their slots or summaries.
#[strum(serialize = "bss")]
BeaconStateSummary,
@@ -196,7 +235,7 @@ pub enum DBColumn {
ForkChoice,
#[strum(serialize = "pkc")]
PubkeyCache,
/// For the table mapping restore point numbers to state roots.
/// For the legacy table mapping restore point numbers to state roots.
#[strum(serialize = "brp")]
BeaconRestorePoint,
#[strum(serialize = "bbr")]
@@ -230,6 +269,36 @@ impl DBColumn {
pub fn as_bytes(self) -> &'static [u8] {
self.as_str().as_bytes()
}
/// Most database keys are 32 bytes, but some freezer DB keys are 8 bytes.
///
/// This function returns the number of bytes used by keys in a given column.
pub fn key_size(self) -> usize {
match self {
Self::BeaconMeta
| Self::BeaconBlock
| Self::BeaconState
| Self::BeaconStateSummary
| Self::BeaconStateTemporary
| Self::ExecPayload
| Self::BeaconChain
| Self::OpPool
| Self::Eth1Cache
| Self::ForkChoice
| Self::PubkeyCache
| Self::BeaconRestorePoint
| Self::DhtEnrs
| Self::OptimisticTransitionBlock => 32,
Self::BeaconBlockRoots
| Self::BeaconStateRoots
| Self::BeaconHistoricalRoots
| Self::BeaconHistoricalSummaries
| Self::BeaconRandaoMixes
| Self::BeaconBlockFrozen
| Self::BeaconStateSnapshot
| Self::BeaconStateDiff => 8,
}
}
}
/// An item that may stored in a `Store` by serializing and deserializing from bytes.
@@ -238,16 +307,16 @@ pub trait StoreItem: Sized {
fn db_column() -> DBColumn;
/// Serialize `self` as bytes.
fn as_store_bytes(&self) -> Vec<u8>;
fn as_store_bytes(&self) -> Result<Vec<u8>, Error>;
/// De-serialize `self` from bytes.
///
/// Return an instance of the type and the number of bytes that were read.
fn from_store_bytes(bytes: &[u8]) -> Result<Self, Error>;
fn as_kv_store_op(&self, key: Hash256) -> KeyValueStoreOp {
fn as_kv_store_op(&self, key: Hash256) -> Result<KeyValueStoreOp, Error> {
let db_key = get_key_for_col(Self::db_column().into(), key.as_bytes());
KeyValueStoreOp::PutKeyValue(db_key, self.as_store_bytes())
Ok(KeyValueStoreOp::PutKeyValue(db_key, self.as_store_bytes()?))
}
}
@@ -269,8 +338,8 @@ mod tests {
DBColumn::BeaconBlock
}
fn as_store_bytes(&self) -> Vec<u8> {
self.as_ssz_bytes()
fn as_store_bytes(&self) -> Result<Vec<u8>, Error> {
Ok(self.as_ssz_bytes())
}
fn from_store_bytes(bytes: &[u8]) -> Result<Self, Error> {

View File

@@ -1,5 +1,4 @@
use super::{Error, ItemStore, KeyValueStore, KeyValueStoreOp};
use crate::{ColumnIter, DBColumn};
use crate::{ColumnIter, DBColumn, Error, ItemStore, Key, KeyValueStore, KeyValueStoreOp};
use parking_lot::{Mutex, MutexGuard, RwLock};
use std::collections::{HashMap, HashSet};
use std::marker::PhantomData;
@@ -94,8 +93,7 @@ impl<E: EthSpec> KeyValueStore<E> for MemoryStore<E> {
Ok(())
}
// pub type ColumnIter<'a> = Box<dyn Iterator<Item = Result<(Hash256, Vec<u8>), Error>> + 'a>;
fn iter_column(&self, column: DBColumn) -> ColumnIter {
fn iter_column<K: Key>(&self, column: DBColumn) -> ColumnIter<K> {
let col = column.as_str();
if let Some(keys) = self
.col_keys
@@ -104,10 +102,11 @@ impl<E: EthSpec> KeyValueStore<E> for MemoryStore<E> {
.map(|set| set.iter().cloned().collect::<Vec<_>>())
{
Box::new(keys.into_iter().filter_map(move |key| {
let hash = Hash256::from_slice(&key);
self.get_bytes(col, &key)
.transpose()
.map(|res| res.map(|bytes| (hash, bytes)))
self.get_bytes(col, &key).transpose().map(|res| {
let k = K::from_bytes(&key)?;
let v = res?;
Ok((k, v))
})
}))
} else {
Box::new(std::iter::empty())

View File

@@ -4,7 +4,7 @@ use ssz::{Decode, Encode};
use ssz_derive::{Decode, Encode};
use types::{Checkpoint, Hash256, Slot};
pub const CURRENT_SCHEMA_VERSION: SchemaVersion = SchemaVersion(17);
pub const CURRENT_SCHEMA_VERSION: SchemaVersion = SchemaVersion(23);
// All the keys that get stored under the `BeaconMeta` column.
//
@@ -30,8 +30,8 @@ impl StoreItem for SchemaVersion {
DBColumn::BeaconMeta
}
fn as_store_bytes(&self) -> Vec<u8> {
self.0.as_ssz_bytes()
fn as_store_bytes(&self) -> Result<Vec<u8>, Error> {
Ok(self.0.as_ssz_bytes())
}
fn from_store_bytes(bytes: &[u8]) -> Result<Self, Error> {
@@ -52,8 +52,8 @@ impl StoreItem for PruningCheckpoint {
DBColumn::BeaconMeta
}
fn as_store_bytes(&self) -> Vec<u8> {
self.checkpoint.as_ssz_bytes()
fn as_store_bytes(&self) -> Result<Vec<u8>, Error> {
Ok(self.checkpoint.as_ssz_bytes())
}
fn from_store_bytes(bytes: &[u8]) -> Result<Self, Error> {
@@ -71,8 +71,8 @@ impl StoreItem for CompactionTimestamp {
DBColumn::BeaconMeta
}
fn as_store_bytes(&self) -> Vec<u8> {
self.0.as_ssz_bytes()
fn as_store_bytes(&self) -> Result<Vec<u8>, Error> {
Ok(self.0.as_ssz_bytes())
}
fn from_store_bytes(bytes: &[u8]) -> Result<Self, Error> {
@@ -111,8 +111,8 @@ impl StoreItem for AnchorInfo {
DBColumn::BeaconMeta
}
fn as_store_bytes(&self) -> Vec<u8> {
self.as_ssz_bytes()
fn as_store_bytes(&self) -> Result<Vec<u8>, Error> {
Ok(self.as_ssz_bytes())
}
fn from_store_bytes(bytes: &[u8]) -> Result<Self, Error> {

View File

@@ -54,17 +54,13 @@ lazy_static! {
"store_beacon_state_hot_get_total",
"Total number of hot beacon states requested from the store (cache or DB)"
);
pub static ref BEACON_STATE_CACHE_HIT_COUNT: Result<IntCounter> = try_create_int_counter(
"store_beacon_state_cache_hit_total",
"Number of hits to the store's state cache"
);
pub static ref BEACON_STATE_CACHE_CLONE_TIME: Result<Histogram> = try_create_histogram(
"store_beacon_state_cache_clone_time",
"Time to load a beacon block from the block cache"
);
pub static ref BEACON_STATE_READ_TIMES: Result<Histogram> = try_create_histogram(
"store_beacon_state_read_seconds",
"Total time required to read a BeaconState from the database"
"Total time required to read a full BeaconState from the database"
);
pub static ref BEACON_HOT_STATE_READ_TIMES: Result<Histogram> = try_create_histogram(
"store_beacon_hot_state_read_seconds",
"Total time required to read a hot BeaconState from the database"
);
pub static ref BEACON_STATE_READ_OVERHEAD_TIMES: Result<Histogram> = try_create_histogram(
"store_beacon_state_read_overhead_seconds",
@@ -90,6 +86,33 @@ lazy_static! {
"store_beacon_state_write_bytes_total",
"Total number of beacon state bytes written to the DB"
);
/*
* Beacon state diffs
*/
pub static ref BEACON_STATE_DIFF_WRITE_BYTES: Result<IntCounter> = try_create_int_counter(
"store_beacon_state_diff_write_bytes_total",
"Total number of bytes written for beacon state diffs"
);
pub static ref BEACON_STATE_DIFF_WRITE_COUNT: Result<IntCounter> = try_create_int_counter(
"store_beacon_state_diff_write_count_total",
"Total number of beacon state diffs written"
);
pub static ref BEACON_STATE_DIFF_COMPRESSION_RATIO: Result<Gauge> = try_create_float_gauge(
"store_beacon_state_diff_compression_ratio",
"Compression ratio for beacon state diffs (higher is better)"
);
pub static ref BEACON_STATE_DIFF_COMPUTE_TIME: Result<Histogram> = try_create_histogram(
"store_beacon_state_diff_compute_time",
"Time to calculate a beacon state diff"
);
pub static ref BEACON_STATE_DIFF_ENCODE_TIME: Result<Histogram> = try_create_histogram(
"store_beacon_state_diff_encode_time",
"Time to encode a beacon state diff as SSZ"
);
pub static ref BEACON_STATE_DIFF_COMPRESSION_TIME: Result<Histogram> = try_create_histogram(
"store_beacon_state_diff_compression_time",
"Time to compress beacon state SSZ using Flate2"
);
/*
* Beacon Block
*/

View File

@@ -1,456 +0,0 @@
use crate::chunked_vector::{
load_variable_list_from_db, load_vector_from_db, BlockRoots, HistoricalRoots,
HistoricalSummaries, RandaoMixes, StateRoots,
};
use crate::{get_key_for_col, DBColumn, Error, KeyValueStore, KeyValueStoreOp};
use ssz::{Decode, DecodeError, Encode};
use ssz_derive::{Decode, Encode};
use std::convert::TryInto;
use std::sync::Arc;
use types::historical_summary::HistoricalSummary;
use types::superstruct;
use types::*;
/// Lightweight variant of the `BeaconState` that is stored in the database.
///
/// Utilises lazy-loading from separate storage for its vector fields.
#[superstruct(
variants(Base, Altair, Merge, Capella),
variant_attributes(derive(Debug, PartialEq, Clone, Encode, Decode))
)]
#[derive(Debug, PartialEq, Clone, Encode)]
#[ssz(enum_behaviour = "transparent")]
pub struct PartialBeaconState<T>
where
T: EthSpec,
{
// Versioning
pub genesis_time: u64,
pub genesis_validators_root: Hash256,
#[superstruct(getter(copy))]
pub slot: Slot,
pub fork: Fork,
// History
pub latest_block_header: BeaconBlockHeader,
#[ssz(skip_serializing, skip_deserializing)]
pub block_roots: Option<FixedVector<Hash256, T::SlotsPerHistoricalRoot>>,
#[ssz(skip_serializing, skip_deserializing)]
pub state_roots: Option<FixedVector<Hash256, T::SlotsPerHistoricalRoot>>,
#[ssz(skip_serializing, skip_deserializing)]
pub historical_roots: Option<VariableList<Hash256, T::HistoricalRootsLimit>>,
// Ethereum 1.0 chain data
pub eth1_data: Eth1Data,
pub eth1_data_votes: VariableList<Eth1Data, T::SlotsPerEth1VotingPeriod>,
pub eth1_deposit_index: u64,
// Registry
pub validators: VariableList<Validator, T::ValidatorRegistryLimit>,
pub balances: VariableList<u64, T::ValidatorRegistryLimit>,
// Shuffling
/// Randao value from the current slot, for patching into the per-epoch randao vector.
pub latest_randao_value: Hash256,
#[ssz(skip_serializing, skip_deserializing)]
pub randao_mixes: Option<FixedVector<Hash256, T::EpochsPerHistoricalVector>>,
// Slashings
slashings: FixedVector<u64, T::EpochsPerSlashingsVector>,
// Attestations (genesis fork only)
#[superstruct(only(Base))]
pub previous_epoch_attestations: VariableList<PendingAttestation<T>, T::MaxPendingAttestations>,
#[superstruct(only(Base))]
pub current_epoch_attestations: VariableList<PendingAttestation<T>, T::MaxPendingAttestations>,
// Participation (Altair and later)
#[superstruct(only(Altair, Merge, Capella))]
pub previous_epoch_participation: VariableList<ParticipationFlags, T::ValidatorRegistryLimit>,
#[superstruct(only(Altair, Merge, Capella))]
pub current_epoch_participation: VariableList<ParticipationFlags, T::ValidatorRegistryLimit>,
// Finality
pub justification_bits: BitVector<T::JustificationBitsLength>,
pub previous_justified_checkpoint: Checkpoint,
pub current_justified_checkpoint: Checkpoint,
pub finalized_checkpoint: Checkpoint,
// Inactivity
#[superstruct(only(Altair, Merge, Capella))]
pub inactivity_scores: VariableList<u64, T::ValidatorRegistryLimit>,
// Light-client sync committees
#[superstruct(only(Altair, Merge, Capella))]
pub current_sync_committee: Arc<SyncCommittee<T>>,
#[superstruct(only(Altair, Merge, Capella))]
pub next_sync_committee: Arc<SyncCommittee<T>>,
// Execution
#[superstruct(
only(Merge),
partial_getter(rename = "latest_execution_payload_header_merge")
)]
pub latest_execution_payload_header: ExecutionPayloadHeaderMerge<T>,
#[superstruct(
only(Capella),
partial_getter(rename = "latest_execution_payload_header_capella")
)]
pub latest_execution_payload_header: ExecutionPayloadHeaderCapella<T>,
// Capella
#[superstruct(only(Capella))]
pub next_withdrawal_index: u64,
#[superstruct(only(Capella))]
pub next_withdrawal_validator_index: u64,
#[ssz(skip_serializing, skip_deserializing)]
#[superstruct(only(Capella))]
pub historical_summaries: Option<VariableList<HistoricalSummary, T::HistoricalRootsLimit>>,
}
/// Implement the conversion function from BeaconState -> PartialBeaconState.
macro_rules! impl_from_state_forgetful {
($s:ident, $outer:ident, $variant_name:ident, $struct_name:ident, [$($extra_fields:ident),*], [$($extra_fields_opt:ident),*]) => {
PartialBeaconState::$variant_name($struct_name {
// Versioning
genesis_time: $s.genesis_time,
genesis_validators_root: $s.genesis_validators_root,
slot: $s.slot,
fork: $s.fork,
// History
latest_block_header: $s.latest_block_header.clone(),
block_roots: None,
state_roots: None,
historical_roots: None,
// Eth1
eth1_data: $s.eth1_data.clone(),
eth1_data_votes: $s.eth1_data_votes.clone(),
eth1_deposit_index: $s.eth1_deposit_index,
// Validator registry
validators: $s.validators.clone(),
balances: $s.balances.clone(),
// Shuffling
latest_randao_value: *$outer
.get_randao_mix($outer.current_epoch())
.expect("randao at current epoch is OK"),
randao_mixes: None,
// Slashings
slashings: $s.slashings.clone(),
// Finality
justification_bits: $s.justification_bits.clone(),
previous_justified_checkpoint: $s.previous_justified_checkpoint,
current_justified_checkpoint: $s.current_justified_checkpoint,
finalized_checkpoint: $s.finalized_checkpoint,
// Variant-specific fields
$(
$extra_fields: $s.$extra_fields.clone()
),*,
// Variant-specific optional
$(
$extra_fields_opt: None
),*
})
}
}
impl<T: EthSpec> PartialBeaconState<T> {
/// Convert a `BeaconState` to a `PartialBeaconState`, while dropping the optional fields.
pub fn from_state_forgetful(outer: &BeaconState<T>) -> Self {
match outer {
BeaconState::Base(s) => impl_from_state_forgetful!(
s,
outer,
Base,
PartialBeaconStateBase,
[previous_epoch_attestations, current_epoch_attestations],
[]
),
BeaconState::Altair(s) => impl_from_state_forgetful!(
s,
outer,
Altair,
PartialBeaconStateAltair,
[
previous_epoch_participation,
current_epoch_participation,
current_sync_committee,
next_sync_committee,
inactivity_scores
],
[]
),
BeaconState::Merge(s) => impl_from_state_forgetful!(
s,
outer,
Merge,
PartialBeaconStateMerge,
[
previous_epoch_participation,
current_epoch_participation,
current_sync_committee,
next_sync_committee,
inactivity_scores,
latest_execution_payload_header
],
[]
),
BeaconState::Capella(s) => impl_from_state_forgetful!(
s,
outer,
Capella,
PartialBeaconStateCapella,
[
previous_epoch_participation,
current_epoch_participation,
current_sync_committee,
next_sync_committee,
inactivity_scores,
latest_execution_payload_header,
next_withdrawal_index,
next_withdrawal_validator_index
],
[historical_summaries]
),
}
}
/// SSZ decode.
pub fn from_ssz_bytes(bytes: &[u8], spec: &ChainSpec) -> Result<Self, ssz::DecodeError> {
// Slot is after genesis_time (u64) and genesis_validators_root (Hash256).
let slot_offset = <u64 as Decode>::ssz_fixed_len() + <Hash256 as Decode>::ssz_fixed_len();
let slot_len = <Slot as Decode>::ssz_fixed_len();
let slot_bytes = bytes.get(slot_offset..slot_offset + slot_len).ok_or(
DecodeError::InvalidByteLength {
len: bytes.len(),
expected: slot_offset + slot_len,
},
)?;
let slot = Slot::from_ssz_bytes(slot_bytes)?;
let fork_at_slot = spec.fork_name_at_slot::<T>(slot);
Ok(map_fork_name!(
fork_at_slot,
Self,
<_>::from_ssz_bytes(bytes)?
))
}
/// Prepare the partial state for storage in the KV database.
pub fn as_kv_store_op(&self, state_root: Hash256) -> KeyValueStoreOp {
let db_key = get_key_for_col(DBColumn::BeaconState.into(), state_root.as_bytes());
KeyValueStoreOp::PutKeyValue(db_key, self.as_ssz_bytes())
}
pub fn load_block_roots<S: KeyValueStore<T>>(
&mut self,
store: &S,
spec: &ChainSpec,
) -> Result<(), Error> {
if self.block_roots().is_none() {
*self.block_roots_mut() = Some(load_vector_from_db::<BlockRoots, T, _>(
store,
self.slot(),
spec,
)?);
}
Ok(())
}
pub fn load_state_roots<S: KeyValueStore<T>>(
&mut self,
store: &S,
spec: &ChainSpec,
) -> Result<(), Error> {
if self.state_roots().is_none() {
*self.state_roots_mut() = Some(load_vector_from_db::<StateRoots, T, _>(
store,
self.slot(),
spec,
)?);
}
Ok(())
}
pub fn load_historical_roots<S: KeyValueStore<T>>(
&mut self,
store: &S,
spec: &ChainSpec,
) -> Result<(), Error> {
if self.historical_roots().is_none() {
*self.historical_roots_mut() = Some(
load_variable_list_from_db::<HistoricalRoots, T, _>(store, self.slot(), spec)?,
);
}
Ok(())
}
pub fn load_historical_summaries<S: KeyValueStore<T>>(
&mut self,
store: &S,
spec: &ChainSpec,
) -> Result<(), Error> {
let slot = self.slot();
if let Ok(historical_summaries) = self.historical_summaries_mut() {
if historical_summaries.is_none() {
*historical_summaries =
Some(load_variable_list_from_db::<HistoricalSummaries, T, _>(
store, slot, spec,
)?);
}
}
Ok(())
}
pub fn load_randao_mixes<S: KeyValueStore<T>>(
&mut self,
store: &S,
spec: &ChainSpec,
) -> Result<(), Error> {
if self.randao_mixes().is_none() {
// Load the per-epoch values from the database
let mut randao_mixes =
load_vector_from_db::<RandaoMixes, T, _>(store, self.slot(), spec)?;
// Patch the value for the current slot into the index for the current epoch
let current_epoch = self.slot().epoch(T::slots_per_epoch());
let len = randao_mixes.len();
randao_mixes[current_epoch.as_usize() % len] = *self.latest_randao_value();
*self.randao_mixes_mut() = Some(randao_mixes)
}
Ok(())
}
}
/// Implement the conversion from PartialBeaconState -> BeaconState.
macro_rules! impl_try_into_beacon_state {
($inner:ident, $variant_name:ident, $struct_name:ident, [$($extra_fields:ident),*], [$($extra_opt_fields:ident),*]) => {
BeaconState::$variant_name($struct_name {
// Versioning
genesis_time: $inner.genesis_time,
genesis_validators_root: $inner.genesis_validators_root,
slot: $inner.slot,
fork: $inner.fork,
// History
latest_block_header: $inner.latest_block_header,
block_roots: unpack_field($inner.block_roots)?,
state_roots: unpack_field($inner.state_roots)?,
historical_roots: unpack_field($inner.historical_roots)?,
// Eth1
eth1_data: $inner.eth1_data,
eth1_data_votes: $inner.eth1_data_votes,
eth1_deposit_index: $inner.eth1_deposit_index,
// Validator registry
validators: $inner.validators,
balances: $inner.balances,
// Shuffling
randao_mixes: unpack_field($inner.randao_mixes)?,
// Slashings
slashings: $inner.slashings,
// Finality
justification_bits: $inner.justification_bits,
previous_justified_checkpoint: $inner.previous_justified_checkpoint,
current_justified_checkpoint: $inner.current_justified_checkpoint,
finalized_checkpoint: $inner.finalized_checkpoint,
// Caching
total_active_balance: <_>::default(),
committee_caches: <_>::default(),
pubkey_cache: <_>::default(),
exit_cache: <_>::default(),
tree_hash_cache: <_>::default(),
// Variant-specific fields
$(
$extra_fields: $inner.$extra_fields
),*,
// Variant-specific optional fields
$(
$extra_opt_fields: unpack_field($inner.$extra_opt_fields)?
),*
})
}
}
fn unpack_field<T>(x: Option<T>) -> Result<T, Error> {
x.ok_or(Error::PartialBeaconStateError)
}
impl<E: EthSpec> TryInto<BeaconState<E>> for PartialBeaconState<E> {
type Error = Error;
fn try_into(self) -> Result<BeaconState<E>, Error> {
let state = match self {
PartialBeaconState::Base(inner) => impl_try_into_beacon_state!(
inner,
Base,
BeaconStateBase,
[previous_epoch_attestations, current_epoch_attestations],
[]
),
PartialBeaconState::Altair(inner) => impl_try_into_beacon_state!(
inner,
Altair,
BeaconStateAltair,
[
previous_epoch_participation,
current_epoch_participation,
current_sync_committee,
next_sync_committee,
inactivity_scores
],
[]
),
PartialBeaconState::Merge(inner) => impl_try_into_beacon_state!(
inner,
Merge,
BeaconStateMerge,
[
previous_epoch_participation,
current_epoch_participation,
current_sync_committee,
next_sync_committee,
inactivity_scores,
latest_execution_payload_header
],
[]
),
PartialBeaconState::Capella(inner) => impl_try_into_beacon_state!(
inner,
Capella,
BeaconStateCapella,
[
previous_epoch_participation,
current_epoch_participation,
current_sync_committee,
next_sync_committee,
inactivity_scores,
latest_execution_payload_header,
next_withdrawal_index,
next_withdrawal_validator_index
],
[historical_summaries]
),
};
Ok(state)
}
}

View File

@@ -8,7 +8,7 @@ use state_processing::{
StateProcessingStrategy, VerifyBlockRoot,
};
use std::sync::Arc;
use types::{EthSpec, Hash256};
use types::EthSpec;
impl<E, Hot, Cold> HotColdDB<E, Hot, Cold>
where
@@ -16,7 +16,10 @@ where
Hot: ItemStore<E>,
Cold: ItemStore<E>,
{
pub fn reconstruct_historic_states(self: &Arc<Self>) -> Result<(), Error> {
pub fn reconstruct_historic_states(
self: &Arc<Self>,
num_blocks: Option<usize>,
) -> Result<(), Error> {
let mut anchor = if let Some(anchor) = self.get_anchor_info() {
anchor
} else {
@@ -37,26 +40,17 @@ where
"start_slot" => anchor.state_lower_limit,
);
let slots_per_restore_point = self.config.slots_per_restore_point;
// Iterate blocks from the state lower limit to the upper limit.
let lower_limit_slot = anchor.state_lower_limit;
let split = self.get_split_info();
let upper_limit_state = self.get_restore_point(
anchor.state_upper_limit.as_u64() / slots_per_restore_point,
&split,
)?;
let upper_limit_slot = upper_limit_state.slot();
let lower_limit_slot = anchor.state_lower_limit;
let upper_limit_slot = std::cmp::min(split.slot, anchor.state_upper_limit);
// Use a dummy root, as we never read the block for the upper limit state.
let upper_limit_block_root = Hash256::repeat_byte(0xff);
let block_root_iter = self.forwards_block_roots_iterator(
lower_limit_slot,
upper_limit_state,
upper_limit_block_root,
&self.spec,
)?;
// If `num_blocks` is not specified iterate all blocks.
let block_root_iter = self
.forwards_block_roots_iterator_until(lower_limit_slot, upper_limit_slot - 1, || {
panic!("FIXME(sproul): reconstruction doesn't need this state")
})?
.take(num_blocks.unwrap_or(usize::MAX));
// The state to be advanced.
let mut state = self
@@ -77,7 +71,7 @@ where
None
} else {
Some(
self.get_blinded_block(&block_root)?
self.get_blinded_block(&block_root, Some(slot))?
.ok_or(Error::BlockNotFound(block_root))?,
)
};
@@ -114,7 +108,7 @@ where
self.store_cold_state(&state_root, &state, &mut io_batch)?;
// If the slot lies on an epoch boundary, commit the batch and update the anchor.
if slot % slots_per_restore_point == 0 || slot + 1 == upper_limit_slot {
if slot % E::slots_per_epoch() == 0 || slot + 1 == upper_limit_slot {
info!(
self.log,
"State reconstruction in progress";

View File

@@ -0,0 +1,210 @@
use crate::Error;
use lru::LruCache;
use std::collections::{BTreeMap, HashMap, HashSet};
use std::num::NonZeroUsize;
use types::{BeaconState, EthSpec, Hash256, Slot};
#[derive(Debug)]
pub struct FinalizedState<E: EthSpec> {
state_root: Hash256,
state: BeaconState<E>,
}
/// Map from block_root -> slot -> state_root.
#[derive(Debug, Default)]
pub struct BlockMap {
blocks: HashMap<Hash256, SlotMap>,
}
/// Map from slot -> state_root.
#[derive(Debug, Default)]
pub struct SlotMap {
slots: BTreeMap<Slot, Hash256>,
}
#[derive(Debug)]
pub struct StateCache<E: EthSpec> {
finalized_state: Option<FinalizedState<E>>,
states: LruCache<Hash256, BeaconState<E>>,
block_map: BlockMap,
}
#[derive(Debug)]
pub enum PutStateOutcome {
Finalized,
Duplicate,
New,
}
impl<E: EthSpec> StateCache<E> {
pub fn new(capacity: NonZeroUsize) -> Self {
StateCache {
finalized_state: None,
states: LruCache::new(capacity),
block_map: BlockMap::default(),
}
}
pub fn len(&self) -> usize {
self.states.len()
}
pub fn update_finalized_state(
&mut self,
state_root: Hash256,
block_root: Hash256,
state: BeaconState<E>,
) -> Result<(), Error> {
if state.slot() % E::slots_per_epoch() != 0 {
return Err(Error::FinalizedStateUnaligned);
}
if self
.finalized_state
.as_ref()
.map_or(false, |finalized_state| {
state.slot() < finalized_state.state.slot()
})
{
return Err(Error::FinalizedStateDecreasingSlot);
}
// Add to block map.
self.block_map.insert(block_root, state.slot(), state_root);
// Prune block map.
let state_roots_to_prune = self.block_map.prune(state.slot());
// Delete states.
for state_root in state_roots_to_prune {
self.states.pop(&state_root);
}
// Update finalized state.
self.finalized_state = Some(FinalizedState { state_root, state });
Ok(())
}
/// Return a status indicating whether the state already existed in the cache.
pub fn put_state(
&mut self,
state_root: Hash256,
block_root: Hash256,
state: &BeaconState<E>,
) -> Result<PutStateOutcome, Error> {
if self
.finalized_state
.as_ref()
.map_or(false, |finalized_state| {
finalized_state.state_root == state_root
})
{
return Ok(PutStateOutcome::Finalized);
}
if self.states.peek(&state_root).is_some() {
return Ok(PutStateOutcome::Duplicate);
}
// Refuse states with pending mutations: we want cached states to be as small as possible
// i.e. stored entirely as a binary merkle tree with no updates overlaid.
if state.has_pending_mutations() {
return Err(Error::StateForCacheHasPendingUpdates {
state_root,
slot: state.slot(),
});
}
// Insert the full state into the cache.
self.states.put(state_root, state.clone());
// Record the connection from block root and slot to this state.
let slot = state.slot();
self.block_map.insert(block_root, slot, state_root);
Ok(PutStateOutcome::New)
}
pub fn get_by_state_root(&mut self, state_root: Hash256) -> Option<BeaconState<E>> {
if let Some(ref finalized_state) = self.finalized_state {
if state_root == finalized_state.state_root {
return Some(finalized_state.state.clone());
}
}
self.states.get(&state_root).cloned()
}
pub fn get_by_block_root(
&mut self,
block_root: Hash256,
slot: Slot,
) -> Option<(Hash256, BeaconState<E>)> {
let slot_map = self.block_map.blocks.get(&block_root)?;
// Find the state at `slot`, or failing that the most recent ancestor.
let state_root = slot_map
.slots
.iter()
.rev()
.find_map(|(ancestor_slot, state_root)| {
(*ancestor_slot <= slot).then_some(*state_root)
})?;
let state = self.get_by_state_root(state_root)?;
Some((state_root, state))
}
pub fn delete_state(&mut self, state_root: &Hash256) {
self.states.pop(state_root);
self.block_map.delete(state_root);
}
pub fn delete_block_states(&mut self, block_root: &Hash256) {
if let Some(slot_map) = self.block_map.delete_block_states(block_root) {
for state_root in slot_map.slots.values() {
self.states.pop(state_root);
}
}
}
}
impl BlockMap {
fn insert(&mut self, block_root: Hash256, slot: Slot, state_root: Hash256) {
let slot_map = self
.blocks
.entry(block_root)
.or_insert_with(SlotMap::default);
slot_map.slots.insert(slot, state_root);
}
fn prune(&mut self, finalized_slot: Slot) -> HashSet<Hash256> {
let mut pruned_states = HashSet::new();
self.blocks.retain(|_, slot_map| {
slot_map.slots.retain(|slot, state_root| {
let keep = *slot >= finalized_slot;
if !keep {
pruned_states.insert(*state_root);
}
keep
});
!slot_map.slots.is_empty()
});
pruned_states
}
fn delete(&mut self, state_root_to_delete: &Hash256) {
self.blocks.retain(|_, slot_map| {
slot_map
.slots
.retain(|_, state_root| state_root != state_root_to_delete);
!slot_map.slots.is_empty()
});
}
fn delete_block_states(&mut self, block_root: &Hash256) -> Option<SlotMap> {
self.blocks.remove(block_root)
}
}

View File

@@ -0,0 +1,346 @@
use crate::{DBColumn, Error, HotColdDB, ItemStore, StoreItem, StoreOp};
use bls::PUBLIC_KEY_UNCOMPRESSED_BYTES_LEN;
use smallvec::SmallVec;
use ssz::{Decode, Encode};
use ssz_derive::{Decode, Encode};
use std::collections::HashMap;
use std::marker::PhantomData;
use std::sync::Arc;
use types::{BeaconState, EthSpec, Hash256, PublicKey, PublicKeyBytes};
/// Provides a mapping of `validator_index -> validator_publickey`.
///
/// This cache exists for two reasons:
///
/// 1. To avoid reading a `BeaconState` from disk each time we need a public key.
/// 2. To reduce the amount of public key _decompression_ required. A `BeaconState` stores public
/// keys in compressed form and they are needed in decompressed form for signature verification.
/// Decompression is expensive when many keys are involved.
///
/// The cache has a `backing` that it uses to maintain a persistent, on-disk
/// copy of itself. This allows it to be restored between process invocations.
#[derive(Debug)]
pub struct ValidatorPubkeyCache<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> {
pubkeys: Vec<PublicKey>,
indices: HashMap<PublicKeyBytes, usize>,
validators: Vec<Arc<PublicKeyBytes>>,
_phantom: PhantomData<(E, Hot, Cold)>,
}
// Temp value.
impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> Default
for ValidatorPubkeyCache<E, Hot, Cold>
{
fn default() -> Self {
ValidatorPubkeyCache {
pubkeys: vec![],
indices: HashMap::new(),
validators: vec![],
_phantom: PhantomData,
}
}
}
impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> ValidatorPubkeyCache<E, Hot, Cold> {
/// Create a new public key cache using the keys in `state.validators`.
///
/// The new cache will be updated with the keys from `state` and immediately written to disk.
pub fn new(state: &BeaconState<E>, store: &HotColdDB<E, Hot, Cold>) -> Result<Self, Error> {
let mut cache = Self {
pubkeys: vec![],
indices: HashMap::new(),
validators: vec![],
_phantom: PhantomData,
};
let store_ops = cache.import_new_pubkeys(state)?;
store.do_atomically(store_ops)?;
Ok(cache)
}
/// Load the pubkey cache from the given on-disk database.
pub fn load_from_store(store: &HotColdDB<E, Hot, Cold>) -> Result<Self, Error> {
let mut pubkeys = vec![];
let mut indices = HashMap::new();
let mut validators = vec![];
for validator_index in 0.. {
if let Some(db_validator) =
store.get_item(&DatabaseValidator::key_for_index(validator_index))?
{
let (pubkey, pubkey_bytes) =
DatabaseValidator::into_immutable_validator(&db_validator)?;
pubkeys.push(pubkey);
indices.insert(pubkey_bytes, validator_index);
validators.push(Arc::new(pubkey_bytes));
} else {
break;
}
}
Ok(ValidatorPubkeyCache {
pubkeys,
indices,
validators,
_phantom: PhantomData,
})
}
/// Scan the given `state` and add any new validator public keys.
///
/// Does not delete any keys from `self` if they don't appear in `state`.
///
/// NOTE: The caller *must* commit the returned I/O batch as part of the block import process.
pub fn import_new_pubkeys(
&mut self,
state: &BeaconState<E>,
) -> Result<Vec<StoreOp<'static, E>>, Error> {
if state.validators().len() > self.validators.len() {
self.import(
state
.validators()
.iter_from(self.pubkeys.len())?
.map(|v| v.pubkey.clone()),
)
} else {
Ok(vec![])
}
}
/// Adds zero or more validators to `self`.
fn import<I>(&mut self, validator_keys: I) -> Result<Vec<StoreOp<'static, E>>, Error>
where
I: Iterator<Item = Arc<PublicKeyBytes>> + ExactSizeIterator,
{
self.validators.reserve(validator_keys.len());
self.pubkeys.reserve(validator_keys.len());
self.indices.reserve(validator_keys.len());
let mut store_ops = Vec::with_capacity(validator_keys.len());
for pubkey_bytes in validator_keys {
let i = self.pubkeys.len();
if self.indices.contains_key(&pubkey_bytes) {
return Err(Error::DuplicateValidatorPublicKey);
}
let pubkey = (&*pubkey_bytes)
.try_into()
.map_err(Error::InvalidValidatorPubkeyBytes)?;
// Stage the new validator key for writing to disk.
// It will be committed atomically when the block that introduced it is written to disk.
// Notably it is NOT written while the write lock on the cache is held.
// See: https://github.com/sigp/lighthouse/issues/2327
store_ops.push(StoreOp::KeyValueOp(
DatabaseValidator::from_immutable_validator(&pubkey, &pubkey_bytes)
.as_kv_store_op(DatabaseValidator::key_for_index(i))?,
));
self.pubkeys.push(pubkey);
self.indices.insert(*pubkey_bytes, i);
self.validators.push(pubkey_bytes);
}
Ok(store_ops)
}
/// Get the public key for a validator with index `i`.
pub fn get(&self, i: usize) -> Option<&PublicKey> {
self.pubkeys.get(i)
}
/// Get the immutable validator with index `i`.
pub fn get_validator_pubkey(&self, i: usize) -> Option<Arc<PublicKeyBytes>> {
self.validators.get(i).cloned()
}
/// Get the `PublicKey` for a validator with `PublicKeyBytes`.
pub fn get_pubkey_from_pubkey_bytes(&self, pubkey: &PublicKeyBytes) -> Option<&PublicKey> {
self.get_index(pubkey).and_then(|index| self.get(index))
}
/// Get the public key (in bytes form) for a validator with index `i`.
pub fn get_pubkey_bytes(&self, i: usize) -> Option<&PublicKeyBytes> {
self.validators.get(i).map(|pubkey_bytes| &**pubkey_bytes)
}
/// Get the index of a validator with `pubkey`.
pub fn get_index(&self, pubkey: &PublicKeyBytes) -> Option<usize> {
self.indices.get(pubkey).copied()
}
/// Returns the number of validators in the cache.
pub fn len(&self) -> usize {
self.indices.len()
}
/// Returns `true` if there are no validators in the cache.
pub fn is_empty(&self) -> bool {
self.indices.is_empty()
}
}
/// Wrapper for a public key stored in the database.
///
/// Keyed by the validator index as `Hash256::from_low_u64_be(index)`.
#[derive(Encode, Decode)]
struct DatabaseValidator {
pubkey: SmallVec<[u8; PUBLIC_KEY_UNCOMPRESSED_BYTES_LEN]>,
}
impl StoreItem for DatabaseValidator {
fn db_column() -> DBColumn {
DBColumn::PubkeyCache
}
fn as_store_bytes(&self) -> Result<Vec<u8>, Error> {
Ok(self.as_ssz_bytes())
}
fn from_store_bytes(bytes: &[u8]) -> Result<Self, Error> {
Ok(Self::from_ssz_bytes(bytes)?)
}
}
impl DatabaseValidator {
fn key_for_index(index: usize) -> Hash256 {
Hash256::from_low_u64_be(index as u64)
}
// FIXME(sproul): remove param
fn from_immutable_validator(pubkey: &PublicKey, _validator: &PublicKeyBytes) -> Self {
DatabaseValidator {
pubkey: pubkey.serialize_uncompressed().into(),
}
}
#[allow(clippy::wrong_self_convention)]
fn into_immutable_validator(&self) -> Result<(PublicKey, PublicKeyBytes), Error> {
let pubkey = PublicKey::deserialize_uncompressed(&self.pubkey)
.map_err(Error::InvalidValidatorPubkeyBytes)?;
let pubkey_bytes = pubkey.compress();
Ok((pubkey, pubkey_bytes))
}
}
#[cfg(test)]
mod test {
use super::*;
use crate::{HotColdDB, MemoryStore};
use beacon_chain::test_utils::BeaconChainHarness;
use logging::test_logger;
use std::sync::Arc;
use types::{BeaconState, EthSpec, Keypair, MainnetEthSpec};
type E = MainnetEthSpec;
type Store = MemoryStore<E>;
fn get_state(validator_count: usize) -> (BeaconState<E>, Vec<Keypair>) {
let harness = BeaconChainHarness::builder(MainnetEthSpec)
.default_spec()
.deterministic_keypairs(validator_count)
.fresh_ephemeral_store()
.build();
harness.advance_slot();
(harness.get_current_state(), harness.validator_keypairs)
}
fn get_store() -> Arc<HotColdDB<E, Store, Store>> {
Arc::new(
HotColdDB::open_ephemeral(<_>::default(), E::default_spec(), test_logger()).unwrap(),
)
}
#[allow(clippy::needless_range_loop)]
fn check_cache_get(cache: &ValidatorPubkeyCache<E, Store, Store>, keypairs: &[Keypair]) {
let validator_count = keypairs.len();
for i in 0..validator_count + 1 {
if i < validator_count {
let pubkey = cache.get(i).expect("pubkey should be present");
assert_eq!(pubkey, &keypairs[i].pk, "pubkey should match cache");
let pubkey_bytes: PublicKeyBytes = pubkey.clone().into();
assert_eq!(
i,
cache
.get_index(&pubkey_bytes)
.expect("should resolve index"),
"index should match cache"
);
} else {
assert_eq!(
cache.get(i),
None,
"should not get pubkey for out of bounds index",
);
}
}
}
#[test]
fn basic_operation() {
let (state, keypairs) = get_state(8);
let store = get_store();
let mut cache = ValidatorPubkeyCache::new(&state, &store).expect("should create cache");
check_cache_get(&cache, &keypairs[..]);
// Try adding a state with the same number of keypairs.
let (state, keypairs) = get_state(8);
cache
.import_new_pubkeys(&state)
.expect("should import pubkeys");
check_cache_get(&cache, &keypairs[..]);
// Try adding a state with less keypairs.
let (state, _) = get_state(1);
cache
.import_new_pubkeys(&state)
.expect("should import pubkeys");
check_cache_get(&cache, &keypairs[..]);
// Try adding a state with more keypairs.
let (state, keypairs) = get_state(12);
cache
.import_new_pubkeys(&state)
.expect("should import pubkeys");
check_cache_get(&cache, &keypairs[..]);
}
#[test]
fn persistence() {
let (state, keypairs) = get_state(8);
let store = get_store();
// Create a new cache.
let cache = ValidatorPubkeyCache::new(&state, &store).expect("should create cache");
check_cache_get(&cache, &keypairs[..]);
drop(cache);
// Re-init the cache from the store.
let mut cache = ValidatorPubkeyCache::load_from_store(&store).expect("should open cache");
check_cache_get(&cache, &keypairs[..]);
// Add some more keypairs.
let (state, keypairs) = get_state(12);
let ops = cache
.import_new_pubkeys(&state)
.expect("should import pubkeys");
store.do_atomically(ops).unwrap();
check_cache_get(&cache, &keypairs[..]);
drop(cache);
// Re-init the cache from the store.
let cache = ValidatorPubkeyCache::load_from_store(&store).expect("should open cache");
check_cache_get(&cache, &keypairs[..]);
}
}