diff --git a/beacon_node/store/src/chunked_iter.rs b/beacon_node/store/src/chunked_iter.rs deleted file mode 100644 index 72e5d9c7af..0000000000 --- a/beacon_node/store/src/chunked_iter.rs +++ /dev/null @@ -1,120 +0,0 @@ -use crate::chunked_vector::{Chunk, Field, chunk_key}; -use crate::{HotColdDB, ItemStore}; -use tracing::error; -use types::{ChainSpec, EthSpec, Slot}; - -/// Iterator over the values of a `BeaconState` vector field (like `block_roots`). -/// -/// Uses the freezer DB's separate table to load the values. -pub struct ChunkedVectorIter<'a, F, E, Hot, Cold> -where - F: Field, - E: EthSpec, - Hot: ItemStore, - Cold: ItemStore, -{ - pub(crate) store: &'a HotColdDB, - current_vindex: usize, - pub(crate) end_vindex: usize, - next_cindex: usize, - current_chunk: Chunk, -} - -impl<'a, F, E, Hot, Cold> ChunkedVectorIter<'a, F, E, Hot, Cold> -where - F: Field, - E: EthSpec, - Hot: ItemStore, - Cold: ItemStore, -{ - /// Create a new iterator which can yield elements from `start_vindex` up to the last - /// index stored by the restore point at `last_restore_point_slot`. - /// - /// The `freezer_upper_limit` slot should be the slot of a recent restore point as obtained from - /// `Root::freezer_upper_limit`. We pass it as a parameter so that the caller can - /// maintain a stable view of the database (see `HybridForwardsBlockRootsIterator`). - pub fn new( - store: &'a HotColdDB, - start_vindex: usize, - freezer_upper_limit: Slot, - spec: &ChainSpec, - ) -> Self { - let (_, end_vindex) = F::start_and_end_vindex(freezer_upper_limit, spec); - - // Set the next chunk to the one containing `start_vindex`. - let next_cindex = start_vindex / F::chunk_size(); - // Set the current chunk to the empty chunk, it will never be read. - let current_chunk = Chunk::default(); - - Self { - store, - current_vindex: start_vindex, - end_vindex, - next_cindex, - current_chunk, - } - } -} - -impl Iterator for ChunkedVectorIter<'_, F, E, Hot, Cold> -where - F: Field, - E: EthSpec, - Hot: ItemStore, - Cold: ItemStore, -{ - type Item = (usize, F::Value); - - fn next(&mut self) -> Option { - let chunk_size = F::chunk_size(); - - // Range exhausted, return `None` forever. - if self.current_vindex >= self.end_vindex { - None - } - // Value lies in the current chunk, return it. - else if self.current_vindex < self.next_cindex * chunk_size { - let vindex = self.current_vindex; - let val = self - .current_chunk - .values - .get(vindex % chunk_size) - .cloned() - .or_else(|| { - error!( - vector_index = vindex, - "Missing chunk value in forwards iterator" - ); - None - })?; - self.current_vindex += 1; - Some((vindex, val)) - } - // Need to load the next chunk, load it and recurse back into the in-range case. - else { - self.current_chunk = Chunk::load( - &self.store.cold_db, - F::column(), - &chunk_key(self.next_cindex), - ) - .map_err(|e| { - error!( - chunk_index = self.next_cindex, - error = ?e, - "Database error in forwards iterator" - ); - e - }) - .ok()? - .or_else(|| { - error!( - chunk_index = self.next_cindex, - "Missing chunk in forwards iterator" - ); - None - })?; - self.next_cindex += 1; - self.next() - } - } -} diff --git a/beacon_node/store/src/chunked_vector.rs b/beacon_node/store/src/chunked_vector.rs deleted file mode 100644 index 9c8114e0c1..0000000000 --- a/beacon_node/store/src/chunked_vector.rs +++ /dev/null @@ -1,922 +0,0 @@ -//! Space-efficient storage for `BeaconState` vector fields. -//! -//! This module provides logic for splitting the `Vector` fields of a `BeaconState` into -//! chunks, and storing those chunks in contiguous ranges in the on-disk database. The motiviation -//! for doing this is avoiding massive duplication in every on-disk state. For example, rather than -//! storing the whole `historical_roots` vector, which is updated once every couple of thousand -//! slots, at every slot, we instead store all the historical values as a chunked vector on-disk, -//! and fetch only the slice we need when reconstructing the `historical_roots` of a state. -//! -//! ## Terminology -//! -//! * **Chunk size**: the number of vector values stored per on-disk chunk. -//! * **Vector index** (vindex): index into all the historical values, identifying a single element -//! of the vector being stored. -//! * **Chunk index** (cindex): index into the keyspace of the on-disk database, identifying a chunk -//! of elements. To find the chunk index of a vector index: `cindex = vindex / chunk_size`. -use self::UpdatePattern::*; -use crate::*; -use milhouse::{List, Vector}; -use ssz::{Decode, Encode}; -use typenum::Unsigned; -use types::historical_summary::HistoricalSummary; - -/// Description of how a `BeaconState` field is updated during state processing. -/// -/// When storing a state, this allows us to efficiently store only those entries -/// which are not present in the DB already. -#[derive(Debug, Clone, Copy, PartialEq, Eq)] -pub enum UpdatePattern { - /// The value is updated once per `n` slots. - OncePerNSlots { - n: u64, - /// The slot at which the field begins to accumulate values. - /// - /// The field should not be read or written until `activation_slot` is reached, and the - /// activation slot should act as an offset when converting slots to vector indices. - activation_slot: Option, - /// The slot at which the field ceases to accumulate values. - /// - /// If this is `None` then the field is continually updated. - deactivation_slot: Option, - }, - /// The value is updated once per epoch, for the epoch `current_epoch - lag`. - OncePerEpoch { lag: u64 }, -} - -/// Map a chunk index to bytes that can be used to key the NoSQL database. -/// -/// We shift chunks up by 1 to make room for a genesis chunk that is handled separately. -pub fn chunk_key(cindex: usize) -> [u8; 8] { - (cindex as u64 + 1).to_be_bytes() -} - -/// Return the database key for the genesis value. -fn genesis_value_key() -> [u8; 8] { - 0u64.to_be_bytes() -} - -/// Trait for types representing fields of the `BeaconState`. -/// -/// All of the required methods are type-level, because we do most things with fields at the -/// type-level. We require their value-level witnesses to be `Copy` so that we can avoid the -/// turbofish when calling functions like `store_updated_vector`. -pub trait Field: Copy { - /// The type of value stored in this field: the `T` from `Vector`. - /// - /// The `Default` impl will be used to fill extra vector entries. - type Value: Default + std::fmt::Debug + milhouse::Value; - // Decode + Encode + Default + Clone + PartialEq + std::fmt::Debug - - /// The length of this field: the `N` from `Vector`. - type Length: Unsigned; - - /// The database column where the integer-indexed chunks for this field should be stored. - /// - /// Each field's column **must** be unique. - fn column() -> DBColumn; - - /// Update pattern for this field, so that we can do differential updates. - fn update_pattern(spec: &ChainSpec) -> UpdatePattern; - - /// The number of values to store per chunk on disk. - /// - /// Default is 128 so that we read/write 4K pages when the values are 32 bytes. - // TODO: benchmark and optimise this parameter - fn chunk_size() -> usize { - 128 - } - - /// Convert a v-index (vector index) to a chunk index. - fn chunk_index(vindex: usize) -> usize { - vindex / Self::chunk_size() - } - - /// Get the value of this field at the given vector index, from the state. - fn get_value( - state: &BeaconState, - vindex: u64, - spec: &ChainSpec, - ) -> Result; - - /// True if this is a `FixedLengthField`, false otherwise. - fn is_fixed_length() -> bool; - - /// Compute the start and end vector indices of the slice of history required at `current_slot`. - /// - /// ## Example - /// - /// If we have a field that is updated once per epoch, then the end vindex will be - /// `current_epoch + 1`, because we want to include the value for the current epoch, and the - /// start vindex will be `end_vindex - Self::Length`, because that's how far back we can look. - fn start_and_end_vindex(current_slot: Slot, spec: &ChainSpec) -> (usize, usize) { - // We take advantage of saturating subtraction on slots and epochs - match Self::update_pattern(spec) { - OncePerNSlots { - n, - activation_slot, - deactivation_slot, - } => { - // Per-slot changes exclude the index for the current slot, because - // it won't be set until the slot completes (think of `state_roots`, `block_roots`). - // This also works for the `historical_roots` because at the `n`th slot, the 0th - // entry of the list is created, and before that the list is empty. - // - // To account for the switch from historical roots to historical summaries at - // Capella we also modify the current slot by the activation and deactivation slots. - // The activation slot acts as an offset (subtraction) while the deactivation slot - // acts as a clamp (min). - let slot_with_clamp = deactivation_slot.map_or(current_slot, |deactivation_slot| { - std::cmp::min(current_slot, deactivation_slot) - }); - let slot_with_clamp_and_offset = if let Some(activation_slot) = activation_slot { - slot_with_clamp - activation_slot - } else { - // Return (0, 0) to indicate that the field should not be read/written. - return (0, 0); - }; - let end_vindex = slot_with_clamp_and_offset / n; - let start_vindex = end_vindex - Self::Length::to_u64(); - (start_vindex.as_usize(), end_vindex.as_usize()) - } - OncePerEpoch { lag } => { - // Per-epoch changes include the index for the current epoch, because it - // will have been set at the most recent epoch boundary. - let current_epoch = current_slot.epoch(E::slots_per_epoch()); - let end_epoch = current_epoch + 1 - lag; - let start_epoch = end_epoch + lag - Self::Length::to_u64(); - (start_epoch.as_usize(), end_epoch.as_usize()) - } - } - } - - /// Given an `existing_chunk` stored in the DB, construct an updated chunk to replace it. - fn get_updated_chunk( - existing_chunk: &Chunk, - chunk_index: usize, - start_vindex: usize, - end_vindex: usize, - state: &BeaconState, - spec: &ChainSpec, - ) -> Result, Error> { - let chunk_size = Self::chunk_size(); - let mut new_chunk = Chunk::new(vec![Self::Value::default(); chunk_size]); - - for i in 0..chunk_size { - let vindex = chunk_index * chunk_size + i; - if vindex >= start_vindex && vindex < end_vindex { - let vector_value = Self::get_value(state, vindex as u64, spec)?; - - if let Some(existing_value) = existing_chunk.values.get(i) - && *existing_value != vector_value - && *existing_value != Self::Value::default() - { - return Err(ChunkError::Inconsistent { - field: Self::column(), - chunk_index, - existing_value: format!("{:?}", existing_value), - new_value: format!("{:?}", vector_value), - } - .into()); - } - - new_chunk.values[i] = vector_value; - } else { - new_chunk.values[i] = existing_chunk.values.get(i).cloned().unwrap_or_default(); - } - } - - Ok(new_chunk) - } - - /// Determine whether a state at `slot` possesses (or requires) the genesis value. - fn slot_needs_genesis_value(slot: Slot, spec: &ChainSpec) -> bool { - let (_, end_vindex) = Self::start_and_end_vindex(slot, spec); - match Self::update_pattern(spec) { - // If the end_vindex is less than the length of the vector, then the vector - // has not yet been completely filled with non-genesis values, and so the genesis - // value is still required. - OncePerNSlots { .. } => { - Self::is_fixed_length() && end_vindex < Self::Length::to_usize() - } - // If the field has lag, then it takes an extra `lag` vindices beyond the - // `end_vindex` before the vector has been filled with non-genesis values. - OncePerEpoch { lag } => { - Self::is_fixed_length() && end_vindex + (lag as usize) < Self::Length::to_usize() - } - } - } - - /// Load the genesis value for a fixed length field from the store. - /// - /// This genesis value should be used to fill the initial state of the vector. - fn load_genesis_value>(store: &S) -> Result { - let key = &genesis_value_key()[..]; - let chunk = - Chunk::load(store, Self::column(), key)?.ok_or(ChunkError::MissingGenesisValue)?; - chunk - .values - .first() - .cloned() - .ok_or_else(|| ChunkError::MissingGenesisValue.into()) - } - - /// Store the given `value` as the genesis value for this field, unless stored already. - /// - /// Check the existing value (if any) for consistency with the value we intend to store, and - /// return an error if they are inconsistent. - fn check_and_store_genesis_value>( - store: &S, - value: Self::Value, - ops: &mut Vec, - ) -> Result<(), Error> { - let key = &genesis_value_key()[..]; - - if let Some(existing_chunk) = Chunk::::load(store, Self::column(), key)? { - if existing_chunk.values.len() != 1 { - Err(ChunkError::InvalidGenesisChunk { - field: Self::column(), - expected_len: 1, - observed_len: existing_chunk.values.len(), - } - .into()) - } else if existing_chunk.values[0] != value { - Err(ChunkError::InconsistentGenesisValue { - field: Self::column(), - existing_value: format!("{:?}", existing_chunk.values[0]), - new_value: format!("{:?}", value), - } - .into()) - } else { - Ok(()) - } - } else { - let chunk = Chunk::new(vec![value]); - chunk.store(Self::column(), &genesis_value_key()[..], ops)?; - Ok(()) - } - } - - /// Extract the genesis value for a fixed length field from an - /// - /// Will only return a correct value if `slot_needs_genesis_value(state.slot(), spec) == true`. - fn extract_genesis_value( - state: &BeaconState, - spec: &ChainSpec, - ) -> Result { - let (_, end_vindex) = Self::start_and_end_vindex(state.slot(), spec); - match Self::update_pattern(spec) { - // Genesis value is guaranteed to exist at `end_vindex`, as it won't yet have been - // updated - OncePerNSlots { .. } => Ok(Self::get_value(state, end_vindex as u64, spec)?), - // If there's lag, the value of the field at the vindex *without the lag* - // should still be set to the genesis value. - OncePerEpoch { lag } => Ok(Self::get_value(state, end_vindex as u64 + lag, spec)?), - } - } -} - -/// Marker trait for fixed-length fields (`Vector`). -pub trait FixedLengthField: Field {} - -/// Marker trait for variable-length fields (`List`). -pub trait VariableLengthField: Field {} - -/// Macro to implement the `Field` trait on a new unit struct type. -macro_rules! field { - ($struct_name:ident, $marker_trait:ident, $value_ty:ty, $length_ty:ty, $column:expr, - $update_pattern:expr, $get_value:expr) => { - #[derive(Clone, Copy)] - pub struct $struct_name; - - impl Field for $struct_name - where - E: EthSpec, - { - type Value = $value_ty; - type Length = $length_ty; - - fn column() -> DBColumn { - $column - } - - fn update_pattern(spec: &ChainSpec) -> UpdatePattern { - let update_pattern = $update_pattern; - update_pattern(spec) - } - - fn get_value( - state: &BeaconState, - vindex: u64, - spec: &ChainSpec, - ) -> Result { - let get_value = $get_value; - get_value(state, vindex, spec) - } - - fn is_fixed_length() -> bool { - stringify!($marker_trait) == "FixedLengthField" - } - } - - impl $marker_trait for $struct_name {} - }; -} - -field!( - BlockRootsChunked, - FixedLengthField, - Hash256, - E::SlotsPerHistoricalRoot, - DBColumn::BeaconBlockRootsChunked, - |_| OncePerNSlots { - n: 1, - activation_slot: Some(Slot::new(0)), - deactivation_slot: None - }, - |state: &BeaconState<_>, index, _| safe_modulo_vector_index(state.block_roots(), index) -); - -field!( - StateRootsChunked, - FixedLengthField, - Hash256, - E::SlotsPerHistoricalRoot, - DBColumn::BeaconStateRootsChunked, - |_| OncePerNSlots { - n: 1, - activation_slot: Some(Slot::new(0)), - deactivation_slot: None, - }, - |state: &BeaconState<_>, index, _| safe_modulo_vector_index(state.state_roots(), index) -); - -field!( - HistoricalRoots, - VariableLengthField, - Hash256, - E::HistoricalRootsLimit, - DBColumn::BeaconHistoricalRoots, - |spec: &ChainSpec| OncePerNSlots { - n: E::SlotsPerHistoricalRoot::to_u64(), - activation_slot: Some(Slot::new(0)), - deactivation_slot: spec - .capella_fork_epoch - .map(|fork_epoch| fork_epoch.start_slot(E::slots_per_epoch())), - }, - |state: &BeaconState<_>, index, _| safe_modulo_list_index(state.historical_roots(), index) -); - -field!( - RandaoMixes, - FixedLengthField, - Hash256, - E::EpochsPerHistoricalVector, - DBColumn::BeaconRandaoMixes, - |_| OncePerEpoch { lag: 1 }, - |state: &BeaconState<_>, index, _| safe_modulo_vector_index(state.randao_mixes(), index) -); - -field!( - HistoricalSummaries, - VariableLengthField, - HistoricalSummary, - E::HistoricalRootsLimit, - DBColumn::BeaconHistoricalSummaries, - |spec: &ChainSpec| OncePerNSlots { - n: E::SlotsPerHistoricalRoot::to_u64(), - activation_slot: spec - .capella_fork_epoch - .map(|fork_epoch| fork_epoch.start_slot(E::slots_per_epoch())), - deactivation_slot: None, - }, - |state: &BeaconState<_>, index, _| safe_modulo_list_index( - state - .historical_summaries() - .map_err(|_| ChunkError::InvalidFork)?, - index - ) -); - -pub fn store_updated_vector, E: EthSpec, S: KeyValueStore>( - field: F, - store: &S, - state: &BeaconState, - spec: &ChainSpec, - ops: &mut Vec, -) -> Result<(), Error> { - let chunk_size = F::chunk_size(); - let (start_vindex, end_vindex) = F::start_and_end_vindex(state.slot(), spec); - let start_cindex = start_vindex / chunk_size; - let end_cindex = end_vindex / chunk_size; - - // Store the genesis value if we have access to it, and it hasn't been stored already. - if F::slot_needs_genesis_value(state.slot(), spec) { - let genesis_value = F::extract_genesis_value(state, spec)?; - F::check_and_store_genesis_value(store, genesis_value, ops)?; - } - - // Start by iterating backwards from the last chunk, storing new chunks in the database. - // Stop once a chunk in the database matches what we were about to store, this indicates - // that a previously stored state has already filled-in a portion of the indices covered. - let full_range_checked = store_range( - field, - (start_cindex..=end_cindex).rev(), - start_vindex, - end_vindex, - store, - state, - spec, - ops, - )?; - - // If the previous `store_range` did not check the entire range, it may be the case that the - // state's vector includes elements at low vector indices that are not yet stored in the - // database, so run another `store_range` to ensure these values are also stored. - if !full_range_checked { - store_range( - field, - start_cindex..end_cindex, - start_vindex, - end_vindex, - store, - state, - spec, - ops, - )?; - } - - Ok(()) -} - -#[allow(clippy::too_many_arguments)] -fn store_range( - _: F, - range: I, - start_vindex: usize, - end_vindex: usize, - store: &S, - state: &BeaconState, - spec: &ChainSpec, - ops: &mut Vec, -) -> Result -where - F: Field, - E: EthSpec, - S: KeyValueStore, - I: Iterator, -{ - for chunk_index in range { - let chunk_key = &chunk_key(chunk_index)[..]; - - let existing_chunk = - Chunk::::load(store, F::column(), chunk_key)?.unwrap_or_default(); - - let new_chunk = F::get_updated_chunk( - &existing_chunk, - chunk_index, - start_vindex, - end_vindex, - state, - spec, - )?; - - if new_chunk == existing_chunk { - return Ok(false); - } - - new_chunk.store(F::column(), chunk_key, ops)?; - } - - Ok(true) -} - -// Chunks at the end index are included. -// TODO: could be more efficient with a real range query (perhaps RocksDB) -fn range_query, E: EthSpec, T: Decode + Encode>( - store: &S, - column: DBColumn, - start_index: usize, - end_index: usize, -) -> Result>, Error> { - let range = start_index..=end_index; - let len = range - .end() - // Add one to account for inclusive range. - .saturating_add(1) - .saturating_sub(*range.start()); - let mut result = Vec::with_capacity(len); - - for chunk_index in range { - let key = &chunk_key(chunk_index)[..]; - let chunk = Chunk::load(store, column, key)?.ok_or(ChunkError::Missing { chunk_index })?; - result.push(chunk); - } - - Ok(result) -} - -/// Combine chunks to form a list or vector of all values with vindex in `start_vindex..end_vindex`. -/// -/// The `length` parameter is the length of the vec to construct, with entries set to `default` if -/// they lie outside the vindex range. -fn stitch( - chunks: Vec>, - start_vindex: usize, - end_vindex: usize, - chunk_size: usize, - length: usize, - default: T, -) -> Result, ChunkError> { - if start_vindex + length < end_vindex { - return Err(ChunkError::OversizedRange { - start_vindex, - end_vindex, - length, - }); - } - - let start_cindex = start_vindex / chunk_size; - let end_cindex = end_vindex / chunk_size; - - let mut result = vec![default; length]; - - for (chunk_index, chunk) in (start_cindex..=end_cindex).zip(chunks.into_iter()) { - // All chunks but the last chunk must be full-sized - if chunk_index != end_cindex && chunk.values.len() != chunk_size { - return Err(ChunkError::InvalidSize { - chunk_index, - expected: chunk_size, - actual: chunk.values.len(), - }); - } - - // Copy the chunk entries into the result vector - for (i, value) in chunk.values.into_iter().enumerate() { - let vindex = chunk_index * chunk_size + i; - - if vindex >= start_vindex && vindex < end_vindex { - result[vindex % length] = value; - } - } - } - - Ok(result) -} - -pub fn load_vector_from_db, E: EthSpec, S: KeyValueStore>( - store: &S, - slot: Slot, - spec: &ChainSpec, -) -> Result, Error> { - // Do a range query - let chunk_size = F::chunk_size(); - let (start_vindex, end_vindex) = F::start_and_end_vindex(slot, spec); - let start_cindex = start_vindex / chunk_size; - let end_cindex = end_vindex / chunk_size; - - let chunks = range_query(store, F::column(), start_cindex, end_cindex)?; - - let default = if F::slot_needs_genesis_value(slot, spec) { - F::load_genesis_value(store)? - } else { - F::Value::default() - }; - - let result = stitch( - chunks, - start_vindex, - end_vindex, - chunk_size, - F::Length::to_usize(), - default, - )?; - - Ok(Vector::new(result).map_err(ChunkError::Milhouse)?) -} - -/// The historical roots are stored in vector chunks, despite not actually being a vector. -pub fn load_variable_list_from_db, E: EthSpec, S: KeyValueStore>( - store: &S, - slot: Slot, - spec: &ChainSpec, -) -> Result, Error> { - let chunk_size = F::chunk_size(); - let (start_vindex, end_vindex) = F::start_and_end_vindex(slot, spec); - let start_cindex = start_vindex / chunk_size; - let end_cindex = end_vindex / chunk_size; - - let chunks: Vec> = range_query(store, F::column(), start_cindex, end_cindex)?; - - let mut result = Vec::with_capacity(chunk_size * chunks.len()); - - for (chunk_index, chunk) in chunks.into_iter().enumerate() { - for (i, value) in chunk.values.into_iter().enumerate() { - let vindex = chunk_index * chunk_size + i; - - if vindex >= start_vindex && vindex < end_vindex { - result.push(value); - } - } - } - - Ok(List::new(result).map_err(ChunkError::Milhouse)?) -} - -/// Index into a `List` field of the state, avoiding out of bounds and division by 0. -fn safe_modulo_list_index( - values: &List, - index: u64, -) -> Result { - if values.is_empty() { - Err(ChunkError::ZeroLengthList) - } else { - values - .get(index as usize % values.len()) - .copied() - .ok_or(ChunkError::IndexOutOfBounds { index }) - } -} - -fn safe_modulo_vector_index( - values: &Vector, - index: u64, -) -> Result { - if values.is_empty() { - Err(ChunkError::ZeroLengthVector) - } else { - values - .get(index as usize % values.len()) - .copied() - .ok_or(ChunkError::IndexOutOfBounds { index }) - } -} - -/// A chunk of a fixed-size vector from the `BeaconState`, stored in the database. -#[derive(Debug, Clone, PartialEq)] -pub struct Chunk { - /// A vector of up-to `chunk_size` values. - pub values: Vec, -} - -impl Default for Chunk -where - T: Decode + Encode, -{ - fn default() -> Self { - Chunk { values: vec![] } - } -} - -impl Chunk -where - T: Decode + Encode, -{ - pub fn new(values: Vec) -> Self { - Chunk { values } - } - - pub fn load, E: EthSpec>( - store: &S, - column: DBColumn, - key: &[u8], - ) -> Result, Error> { - store - .get_bytes(column, key)? - .map(|bytes| Self::decode(&bytes)) - .transpose() - } - - pub fn store( - &self, - column: DBColumn, - key: &[u8], - ops: &mut Vec, - ) -> Result<(), Error> { - ops.push(KeyValueStoreOp::PutKeyValue( - column, - key.to_vec(), - self.encode()?, - )); - Ok(()) - } - - /// Attempt to decode a single chunk. - pub fn decode(bytes: &[u8]) -> Result { - if !::is_ssz_fixed_len() { - return Err(Error::from(ChunkError::InvalidType)); - } - - let value_size = ::ssz_fixed_len(); - - if value_size == 0 { - return Err(Error::from(ChunkError::InvalidType)); - } - - let values = bytes - .chunks(value_size) - .map(T::from_ssz_bytes) - .collect::>()?; - - Ok(Chunk { values }) - } - - pub fn encoded_size(&self) -> usize { - self.values.len() * ::ssz_fixed_len() - } - - /// Encode a single chunk as bytes. - pub fn encode(&self) -> Result, Error> { - if !::is_ssz_fixed_len() { - return Err(Error::from(ChunkError::InvalidType)); - } - - Ok(self.values.iter().flat_map(T::as_ssz_bytes).collect()) - } -} - -#[derive(Debug, PartialEq)] -pub enum ChunkError { - ZeroLengthVector, - ZeroLengthList, - IndexOutOfBounds { - index: u64, - }, - InvalidSize { - chunk_index: usize, - expected: usize, - actual: usize, - }, - Missing { - chunk_index: usize, - }, - MissingGenesisValue, - Inconsistent { - field: DBColumn, - chunk_index: usize, - existing_value: String, - new_value: String, - }, - InconsistentGenesisValue { - field: DBColumn, - existing_value: String, - new_value: String, - }, - InvalidGenesisChunk { - field: DBColumn, - expected_len: usize, - observed_len: usize, - }, - InvalidType, - OversizedRange { - start_vindex: usize, - end_vindex: usize, - length: usize, - }, - InvalidFork, - Milhouse(milhouse::Error), -} - -impl From for ChunkError { - fn from(e: milhouse::Error) -> ChunkError { - Self::Milhouse(e) - } -} - -#[cfg(test)] -mod test { - use super::*; - use fixed_bytes::FixedBytesExtended; - use types::MainnetEthSpec as TestSpec; - use types::*; - - fn v(i: u64) -> Hash256 { - Hash256::from_low_u64_be(i) - } - - #[test] - fn stitch_default() { - let chunk_size = 4; - - let chunks = vec![ - Chunk::new(vec![0u64, 1, 2, 3]), - Chunk::new(vec![4, 5, 0, 0]), - ]; - - assert_eq!( - stitch(chunks, 2, 6, chunk_size, 12, 99).unwrap(), - vec![99, 99, 2, 3, 4, 5, 99, 99, 99, 99, 99, 99] - ); - } - - #[test] - fn stitch_basic() { - let chunk_size = 4; - let default = v(0); - - let chunks = vec![ - Chunk::new(vec![v(0), v(1), v(2), v(3)]), - Chunk::new(vec![v(4), v(5), v(6), v(7)]), - Chunk::new(vec![v(8), v(9), v(10), v(11)]), - ]; - - assert_eq!( - stitch(chunks.clone(), 0, 12, chunk_size, 12, default).unwrap(), - (0..12).map(v).collect::>() - ); - - assert_eq!( - stitch(chunks, 2, 10, chunk_size, 8, default).unwrap(), - vec![v(8), v(9), v(2), v(3), v(4), v(5), v(6), v(7)] - ); - } - - #[test] - fn stitch_oversized_range() { - let chunk_size = 4; - let default = 0; - - let chunks = vec![Chunk::new(vec![20u64, 21, 22, 23])]; - - // Args (start_vindex, end_vindex, length) - let args = vec![(0, 21, 20), (0, 2048, 1024), (0, 2, 1)]; - - for (start_vindex, end_vindex, length) in args { - assert_eq!( - stitch( - chunks.clone(), - start_vindex, - end_vindex, - chunk_size, - length, - default - ), - Err(ChunkError::OversizedRange { - start_vindex, - end_vindex, - length, - }) - ); - } - } - - #[test] - fn fixed_length_fields() { - fn test_fixed_length>(_: F, expected: bool) { - assert_eq!(F::is_fixed_length(), expected); - } - test_fixed_length(BlockRootsChunked, true); - test_fixed_length(StateRootsChunked, true); - test_fixed_length(HistoricalRoots, false); - test_fixed_length(RandaoMixes, true); - } - - fn needs_genesis_value_once_per_slot>(_: F) { - let spec = &TestSpec::default_spec(); - let max = F::Length::to_u64(); - for i in 0..max { - assert!( - F::slot_needs_genesis_value(Slot::new(i), spec), - "slot {}", - i - ); - } - assert!(!F::slot_needs_genesis_value(Slot::new(max), spec)); - } - - #[test] - fn needs_genesis_value_block_roots() { - needs_genesis_value_once_per_slot(BlockRootsChunked); - } - - #[test] - fn needs_genesis_value_state_roots() { - needs_genesis_value_once_per_slot(StateRootsChunked); - } - - #[test] - fn needs_genesis_value_historical_roots() { - let spec = &TestSpec::default_spec(); - assert!( - !>::slot_needs_genesis_value(Slot::new(0), spec) - ); - } - - fn needs_genesis_value_test_randao>(_: F) { - let spec = &TestSpec::default_spec(); - let max = TestSpec::slots_per_epoch() * (F::Length::to_u64() - 1); - for i in 0..max { - assert!( - F::slot_needs_genesis_value(Slot::new(i), spec), - "slot {}", - i - ); - } - assert!(!F::slot_needs_genesis_value(Slot::new(max), spec)); - } - - #[test] - fn needs_genesis_value_randao() { - needs_genesis_value_test_randao(RandaoMixes); - } -} diff --git a/beacon_node/store/src/errors.rs b/beacon_node/store/src/errors.rs index 6da99b7bd6..a07cc83886 100644 --- a/beacon_node/store/src/errors.rs +++ b/beacon_node/store/src/errors.rs @@ -1,4 +1,3 @@ -use crate::chunked_vector::ChunkError; use crate::config::StoreConfigError; use crate::hot_cold_store::{HotColdDBError, StateSummaryIteratorError}; use crate::{DBColumn, hdiff}; @@ -13,9 +12,7 @@ pub type Result = std::result::Result; #[derive(Debug)] pub enum Error { SszDecodeError(DecodeError), - VectorChunkError(ChunkError), BeaconStateError(BeaconStateError), - PartialBeaconStateError, HotColdDBError(HotColdDBError), DBError { message: String, @@ -126,12 +123,6 @@ impl From for Error { } } -impl From for Error { - fn from(e: ChunkError) -> Error { - Error::VectorChunkError(e) - } -} - impl From for Error { fn from(e: HotColdDBError) -> Error { Error::HotColdDBError(e) diff --git a/beacon_node/store/src/lib.rs b/beacon_node/store/src/lib.rs index a3d4e4a8ce..ae5b2e1e57 100644 --- a/beacon_node/store/src/lib.rs +++ b/beacon_node/store/src/lib.rs @@ -8,8 +8,6 @@ //! Provides a simple API for storing/retrieving all types that sometimes needs type-hints. See //! tests for implementation examples. pub mod blob_sidecar_list_from_root; -pub mod chunked_iter; -pub mod chunked_vector; pub mod config; pub mod consensus_context; pub mod errors; @@ -21,7 +19,6 @@ mod impls; mod memory_store; pub mod metadata; pub mod metrics; -pub mod partial_beacon_state; pub mod reconstruct; pub mod state_cache; diff --git a/beacon_node/store/src/partial_beacon_state.rs b/beacon_node/store/src/partial_beacon_state.rs deleted file mode 100644 index 9e5e1ebbb4..0000000000 --- a/beacon_node/store/src/partial_beacon_state.rs +++ /dev/null @@ -1,511 +0,0 @@ -use crate::chunked_vector::{ - BlockRootsChunked, HistoricalRoots, HistoricalSummaries, RandaoMixes, StateRootsChunked, - load_variable_list_from_db, load_vector_from_db, -}; -use crate::{DBColumn, Error, KeyValueStore, KeyValueStoreOp}; -use milhouse::{List, Vector}; -use ssz::{BitVector, Decode, DecodeError, Encode}; -use ssz_derive::{Decode, Encode}; -use std::sync::Arc; -use superstruct::superstruct; -use types::historical_summary::HistoricalSummary; -use types::*; - -/// DEPRECATED Lightweight variant of the `BeaconState` that is stored in the database. -/// -/// Utilises lazy-loading from separate storage for its vector fields. -/// -/// This can be deleted once schema versions prior to V22 are no longer supported. -#[superstruct( - variants(Base, Altair, Bellatrix, Capella, Deneb, Electra, Fulu, Gloas), - variant_attributes(derive(Debug, PartialEq, Clone, Encode, Decode)) -)] -#[derive(Debug, PartialEq, Clone, Encode)] -#[ssz(enum_behaviour = "transparent")] -pub struct PartialBeaconState -where - E: EthSpec, -{ - // Versioning - pub genesis_time: u64, - pub genesis_validators_root: Hash256, - #[superstruct(getter(copy))] - pub slot: Slot, - pub fork: Fork, - - // History - pub latest_block_header: BeaconBlockHeader, - - #[ssz(skip_serializing, skip_deserializing)] - pub block_roots: Option>, - #[ssz(skip_serializing, skip_deserializing)] - pub state_roots: Option>, - - #[ssz(skip_serializing, skip_deserializing)] - pub historical_roots: Option>, - - // Ethereum 1.0 chain data - pub eth1_data: Eth1Data, - pub eth1_data_votes: List, - pub eth1_deposit_index: u64, - - // Registry - pub validators: List, - pub balances: List, - - // Shuffling - /// Randao value from the current slot, for patching into the per-epoch randao vector. - pub latest_randao_value: Hash256, - #[ssz(skip_serializing, skip_deserializing)] - pub randao_mixes: Option>, - - // Slashings - slashings: Vector, - - // Attestations (genesis fork only) - #[superstruct(only(Base))] - pub previous_epoch_attestations: List, E::MaxPendingAttestations>, - #[superstruct(only(Base))] - pub current_epoch_attestations: List, E::MaxPendingAttestations>, - - // Participation (Altair and later) - #[superstruct(only(Altair, Bellatrix, Capella, Deneb, Electra, Fulu, Gloas))] - pub previous_epoch_participation: List, - #[superstruct(only(Altair, Bellatrix, Capella, Deneb, Electra, Fulu, Gloas))] - pub current_epoch_participation: List, - - // Finality - pub justification_bits: BitVector, - pub previous_justified_checkpoint: Checkpoint, - pub current_justified_checkpoint: Checkpoint, - pub finalized_checkpoint: Checkpoint, - - // Inactivity - #[superstruct(only(Altair, Bellatrix, Capella, Deneb, Electra, Fulu, Gloas))] - pub inactivity_scores: List, - - // Light-client sync committees - #[superstruct(only(Altair, Bellatrix, Capella, Deneb, Electra, Fulu, Gloas))] - pub current_sync_committee: Arc>, - #[superstruct(only(Altair, Bellatrix, Capella, Deneb, Electra, Fulu, Gloas))] - pub next_sync_committee: Arc>, - - // Execution - #[superstruct( - only(Bellatrix), - partial_getter(rename = "latest_execution_payload_header_bellatrix") - )] - pub latest_execution_payload_header: ExecutionPayloadHeaderBellatrix, - #[superstruct( - only(Capella), - partial_getter(rename = "latest_execution_payload_header_capella") - )] - pub latest_execution_payload_header: ExecutionPayloadHeaderCapella, - #[superstruct( - only(Deneb), - partial_getter(rename = "latest_execution_payload_header_deneb") - )] - pub latest_execution_payload_header: ExecutionPayloadHeaderDeneb, - #[superstruct( - only(Electra), - partial_getter(rename = "latest_execution_payload_header_electra") - )] - pub latest_execution_payload_header: ExecutionPayloadHeaderElectra, - #[superstruct( - only(Fulu), - partial_getter(rename = "latest_execution_payload_header_fulu") - )] - pub latest_execution_payload_header: ExecutionPayloadHeaderFulu, - - #[superstruct( - only(Gloas), - partial_getter(rename = "latest_execution_payload_bid_gloas") - )] - pub latest_execution_payload_bid: ExecutionPayloadBid, - - // Capella - #[superstruct(only(Capella, Deneb, Electra, Fulu, Gloas))] - pub next_withdrawal_index: u64, - #[superstruct(only(Capella, Deneb, Electra, Fulu, Gloas))] - pub next_withdrawal_validator_index: u64, - - #[ssz(skip_serializing, skip_deserializing)] - #[superstruct(only(Capella, Deneb, Electra, Fulu, Gloas))] - pub historical_summaries: Option>, - - // Electra - #[superstruct(only(Electra, Fulu, Gloas))] - pub deposit_requests_start_index: u64, - #[superstruct(only(Electra, Fulu, Gloas))] - pub deposit_balance_to_consume: u64, - #[superstruct(only(Electra, Fulu, Gloas))] - pub exit_balance_to_consume: u64, - #[superstruct(only(Electra, Fulu, Gloas))] - pub earliest_exit_epoch: Epoch, - #[superstruct(only(Electra, Fulu, Gloas))] - pub consolidation_balance_to_consume: u64, - #[superstruct(only(Electra, Fulu, Gloas))] - pub earliest_consolidation_epoch: Epoch, - - #[superstruct(only(Electra, Fulu, Gloas))] - pub pending_deposits: List, - #[superstruct(only(Electra, Fulu, Gloas))] - pub pending_partial_withdrawals: - List, - #[superstruct(only(Electra, Fulu, Gloas))] - pub pending_consolidations: List, - #[superstruct(only(Fulu, Gloas))] - pub proposer_lookahead: Vector, - - // Gloas - #[superstruct(only(Gloas))] - pub execution_payload_availability: BitVector, - - #[superstruct(only(Gloas))] - pub builder_pending_payments: Vector, - - #[superstruct(only(Gloas))] - pub builder_pending_withdrawals: - List, - - #[superstruct(only(Gloas))] - pub latest_block_hash: ExecutionBlockHash, - - #[superstruct(only(Gloas))] - pub latest_withdrawals_root: Hash256, -} - -impl PartialBeaconState { - /// SSZ decode. - pub fn from_ssz_bytes(bytes: &[u8], spec: &ChainSpec) -> Result { - // Slot is after genesis_time (u64) and genesis_validators_root (Hash256). - let slot_offset = ::ssz_fixed_len() + ::ssz_fixed_len(); - let slot_len = ::ssz_fixed_len(); - let slot_bytes = bytes.get(slot_offset..slot_offset + slot_len).ok_or( - DecodeError::InvalidByteLength { - len: bytes.len(), - expected: slot_offset + slot_len, - }, - )?; - - let slot = Slot::from_ssz_bytes(slot_bytes)?; - let fork_at_slot = spec.fork_name_at_slot::(slot); - - Ok(map_fork_name!( - fork_at_slot, - Self, - <_>::from_ssz_bytes(bytes)? - )) - } - - /// Prepare the partial state for storage in the KV database. - pub fn as_kv_store_op(&self, state_root: Hash256) -> KeyValueStoreOp { - KeyValueStoreOp::PutKeyValue( - DBColumn::BeaconState, - state_root.as_slice().to_vec(), - self.as_ssz_bytes(), - ) - } - - pub fn load_block_roots>( - &mut self, - store: &S, - spec: &ChainSpec, - ) -> Result<(), Error> { - if self.block_roots().is_none() { - *self.block_roots_mut() = Some(load_vector_from_db::( - store, - self.slot(), - spec, - )?); - } - Ok(()) - } - - pub fn load_state_roots>( - &mut self, - store: &S, - spec: &ChainSpec, - ) -> Result<(), Error> { - if self.state_roots().is_none() { - *self.state_roots_mut() = Some(load_vector_from_db::( - store, - self.slot(), - spec, - )?); - } - Ok(()) - } - - pub fn load_historical_roots>( - &mut self, - store: &S, - spec: &ChainSpec, - ) -> Result<(), Error> { - if self.historical_roots().is_none() { - *self.historical_roots_mut() = Some( - load_variable_list_from_db::(store, self.slot(), spec)?, - ); - } - Ok(()) - } - - pub fn load_historical_summaries>( - &mut self, - store: &S, - spec: &ChainSpec, - ) -> Result<(), Error> { - let slot = self.slot(); - if let Ok(historical_summaries) = self.historical_summaries_mut() - && historical_summaries.is_none() - { - *historical_summaries = Some(load_variable_list_from_db::( - store, slot, spec, - )?); - } - Ok(()) - } - - pub fn load_randao_mixes>( - &mut self, - store: &S, - spec: &ChainSpec, - ) -> Result<(), Error> { - if self.randao_mixes().is_none() { - // Load the per-epoch values from the database - let mut randao_mixes = - load_vector_from_db::(store, self.slot(), spec)?; - - // Patch the value for the current slot into the index for the current epoch - let current_epoch = self.slot().epoch(E::slots_per_epoch()); - let len = randao_mixes.len(); - *randao_mixes - .get_mut(current_epoch.as_usize() % len) - .ok_or(Error::RandaoMixOutOfBounds)? = *self.latest_randao_value(); - - *self.randao_mixes_mut() = Some(randao_mixes) - } - Ok(()) - } -} - -/// Implement the conversion from PartialBeaconState -> BeaconState. -macro_rules! impl_try_into_beacon_state { - ($inner:ident, $variant_name:ident, $struct_name:ident, [$($extra_fields:ident),*], [$($extra_opt_fields:ident),*]) => { - BeaconState::$variant_name($struct_name { - // Versioning - genesis_time: $inner.genesis_time, - genesis_validators_root: $inner.genesis_validators_root, - slot: $inner.slot, - fork: $inner.fork, - - // History - latest_block_header: $inner.latest_block_header, - block_roots: unpack_field($inner.block_roots)?, - state_roots: unpack_field($inner.state_roots)?, - historical_roots: unpack_field($inner.historical_roots)?, - - // Eth1 - eth1_data: $inner.eth1_data, - eth1_data_votes: $inner.eth1_data_votes, - eth1_deposit_index: $inner.eth1_deposit_index, - - // Validator registry - validators: $inner.validators, - balances: $inner.balances, - - // Shuffling - randao_mixes: unpack_field($inner.randao_mixes)?, - - // Slashings - slashings: $inner.slashings, - - // Finality - justification_bits: $inner.justification_bits, - previous_justified_checkpoint: $inner.previous_justified_checkpoint, - current_justified_checkpoint: $inner.current_justified_checkpoint, - finalized_checkpoint: $inner.finalized_checkpoint, - - // Caching - total_active_balance: <_>::default(), - progressive_balances_cache: <_>::default(), - committee_caches: <_>::default(), - pubkey_cache: <_>::default(), - exit_cache: <_>::default(), - slashings_cache: <_>::default(), - epoch_cache: <_>::default(), - - // Variant-specific fields - $( - $extra_fields: $inner.$extra_fields - ),*, - - // Variant-specific optional fields - $( - $extra_opt_fields: unpack_field($inner.$extra_opt_fields)? - ),* - }) - } -} - -fn unpack_field(x: Option) -> Result { - x.ok_or(Error::PartialBeaconStateError) -} - -impl TryInto> for PartialBeaconState { - type Error = Error; - - fn try_into(self) -> Result, Error> { - let state = match self { - PartialBeaconState::Base(inner) => impl_try_into_beacon_state!( - inner, - Base, - BeaconStateBase, - [previous_epoch_attestations, current_epoch_attestations], - [] - ), - PartialBeaconState::Altair(inner) => impl_try_into_beacon_state!( - inner, - Altair, - BeaconStateAltair, - [ - previous_epoch_participation, - current_epoch_participation, - current_sync_committee, - next_sync_committee, - inactivity_scores - ], - [] - ), - PartialBeaconState::Bellatrix(inner) => impl_try_into_beacon_state!( - inner, - Bellatrix, - BeaconStateBellatrix, - [ - previous_epoch_participation, - current_epoch_participation, - current_sync_committee, - next_sync_committee, - inactivity_scores, - latest_execution_payload_header - ], - [] - ), - PartialBeaconState::Capella(inner) => impl_try_into_beacon_state!( - inner, - Capella, - BeaconStateCapella, - [ - previous_epoch_participation, - current_epoch_participation, - current_sync_committee, - next_sync_committee, - inactivity_scores, - latest_execution_payload_header, - next_withdrawal_index, - next_withdrawal_validator_index - ], - [historical_summaries] - ), - PartialBeaconState::Deneb(inner) => impl_try_into_beacon_state!( - inner, - Deneb, - BeaconStateDeneb, - [ - previous_epoch_participation, - current_epoch_participation, - current_sync_committee, - next_sync_committee, - inactivity_scores, - latest_execution_payload_header, - next_withdrawal_index, - next_withdrawal_validator_index - ], - [historical_summaries] - ), - PartialBeaconState::Electra(inner) => impl_try_into_beacon_state!( - inner, - Electra, - BeaconStateElectra, - [ - previous_epoch_participation, - current_epoch_participation, - current_sync_committee, - next_sync_committee, - inactivity_scores, - latest_execution_payload_header, - next_withdrawal_index, - next_withdrawal_validator_index, - deposit_requests_start_index, - deposit_balance_to_consume, - exit_balance_to_consume, - earliest_exit_epoch, - consolidation_balance_to_consume, - earliest_consolidation_epoch, - pending_deposits, - pending_partial_withdrawals, - pending_consolidations - ], - [historical_summaries] - ), - PartialBeaconState::Fulu(inner) => impl_try_into_beacon_state!( - inner, - Fulu, - BeaconStateFulu, - [ - previous_epoch_participation, - current_epoch_participation, - current_sync_committee, - next_sync_committee, - inactivity_scores, - latest_execution_payload_header, - next_withdrawal_index, - next_withdrawal_validator_index, - deposit_requests_start_index, - deposit_balance_to_consume, - exit_balance_to_consume, - earliest_exit_epoch, - consolidation_balance_to_consume, - earliest_consolidation_epoch, - pending_deposits, - pending_partial_withdrawals, - pending_consolidations, - proposer_lookahead - ], - [historical_summaries] - ), - PartialBeaconState::Gloas(inner) => impl_try_into_beacon_state!( - inner, - Gloas, - BeaconStateGloas, - [ - previous_epoch_participation, - current_epoch_participation, - current_sync_committee, - next_sync_committee, - inactivity_scores, - latest_execution_payload_bid, - next_withdrawal_index, - next_withdrawal_validator_index, - deposit_requests_start_index, - deposit_balance_to_consume, - exit_balance_to_consume, - earliest_exit_epoch, - consolidation_balance_to_consume, - earliest_consolidation_epoch, - pending_deposits, - pending_partial_withdrawals, - pending_consolidations, - proposer_lookahead, - execution_payload_availability, - builder_pending_payments, - builder_pending_withdrawals, - latest_block_hash, - latest_withdrawals_root - ], - [historical_summaries] - ), - }; - Ok(state) - } -}