diff --git a/beacon_node/store/src/hot_cold_store.rs b/beacon_node/store/src/hot_cold_store.rs index d7619e6671..5c86280cc5 100644 --- a/beacon_node/store/src/hot_cold_store.rs +++ b/beacon_node/store/src/hot_cold_store.rs @@ -846,7 +846,7 @@ impl, Cold: ItemStore> HotColdDB state: &BeaconState, ops: &mut Vec, ) -> Result<(), Error> { - store_full_state(state_root, state, ops) + store_full_state(state_root, state, ops, &self.config) } /// Get a post-finalization state from the database or store. @@ -1056,8 +1056,16 @@ impl, Cold: ItemStore> HotColdDB &self, state_root: &Hash256, ) -> Result<(BeaconState, Hash256), Error> { - let mut state = get_full_state(&self.hot_db, state_root, &self.spec)? - .ok_or(HotColdDBError::MissingEpochBoundaryState(*state_root))?; + let pubkey_cache = self.immutable_validators.read(); + let immutable_validators = |i: usize| pubkey_cache.get_validator(i); + let mut state = get_full_state( + &self.hot_db, + state_root, + immutable_validators, + &self.config, + &self.spec, + )? + .ok_or(HotColdDBError::MissingEpochBoundaryState(*state_root))?; // Do a tree hash here so that the cache is fully built. state.update_tree_hash_cache()?; diff --git a/beacon_node/store/src/impls/beacon_state.rs b/beacon_node/store/src/impls/beacon_state.rs index da07d3fe77..5ebb0dbd18 100644 --- a/beacon_node/store/src/impls/beacon_state.rs +++ b/beacon_node/store/src/impls/beacon_state.rs @@ -1,63 +1,83 @@ use crate::*; use ssz::{DecodeError, Encode}; use ssz_derive::Encode; -use std::convert::TryInto; +use std::io::{Read, Write}; use std::sync::Arc; -use types::beacon_state::{CommitteeCache, CACHED_EPOCHS}; +use types::{CompactBeaconState, ValidatorImmutable}; +use zstd::{Decoder, Encoder}; pub fn store_full_state( state_root: &Hash256, state: &BeaconState, ops: &mut Vec, + config: &StoreConfig, ) -> Result<(), Error> { let bytes = { let _overhead_timer = metrics::start_timer(&metrics::BEACON_STATE_WRITE_OVERHEAD_TIMES); StorageContainer::new(state).as_ssz_bytes() }; - metrics::inc_counter_by(&metrics::BEACON_STATE_WRITE_BYTES, bytes.len() as u64); + let mut compressed_value = Vec::with_capacity(config.estimate_compressed_size(bytes.len())); + let mut encoder = Encoder::new(&mut compressed_value, config.compression_level) + .map_err(Error::Compression)?; + encoder.write_all(&bytes).map_err(Error::Compression)?; + encoder.finish().map_err(Error::Compression)?; + + metrics::inc_counter_by( + &metrics::BEACON_STATE_WRITE_BYTES, + compressed_value.len() as u64, + ); metrics::inc_counter(&metrics::BEACON_STATE_WRITE_COUNT); + let key = get_key_for_col(DBColumn::BeaconState.into(), state_root.as_bytes()); - ops.push(KeyValueStoreOp::PutKeyValue(key, bytes)); + ops.push(KeyValueStoreOp::PutKeyValue(key, compressed_value)); Ok(()) } -pub fn get_full_state, E: EthSpec>( +pub fn get_full_state, E: EthSpec, F>( db: &KV, state_root: &Hash256, + immutable_validators: F, + config: &StoreConfig, spec: &ChainSpec, -) -> Result>, Error> { +) -> Result>, Error> +where + F: Fn(usize) -> Option>, +{ let total_timer = metrics::start_timer(&metrics::BEACON_STATE_READ_TIMES); match db.get_bytes(DBColumn::BeaconState.into(), state_root.as_bytes())? { Some(bytes) => { + let mut ssz_bytes = Vec::with_capacity(config.estimate_decompressed_size(bytes.len())); + let mut decoder = Decoder::new(&*bytes).map_err(Error::Compression)?; + decoder + .read_to_end(&mut ssz_bytes) + .map_err(Error::Compression)?; + let overhead_timer = metrics::start_timer(&metrics::BEACON_STATE_READ_OVERHEAD_TIMES); - let container = StorageContainer::from_ssz_bytes(&bytes, spec)?; + let container = StorageContainer::from_ssz_bytes(&ssz_bytes, spec)?; metrics::stop_timer(overhead_timer); metrics::stop_timer(total_timer); metrics::inc_counter(&metrics::BEACON_STATE_READ_COUNT); metrics::inc_counter_by(&metrics::BEACON_STATE_READ_BYTES, bytes.len() as u64); - Ok(Some(container.try_into()?)) + Ok(Some(container.into_beacon_state(immutable_validators)?)) } None => Ok(None), } } /// A container for storing `BeaconState` components. -// TODO: would be more space efficient with the caches stored separately and referenced by hash #[derive(Encode)] pub struct StorageContainer { - state: BeaconState, - committee_caches: Vec>, + state: CompactBeaconState, } impl StorageContainer { /// Create a new instance for storing a `BeaconState`. pub fn new(state: &BeaconState) -> Self { Self { - state: state.clone(), - committee_caches: state.committee_caches().to_vec(), + state: state.clone().into_compact_state(), } } @@ -67,36 +87,20 @@ impl StorageContainer { let mut builder = ssz::SszDecoderBuilder::new(bytes); builder.register_anonymous_variable_length_item()?; - builder.register_type::>()?; let mut decoder = builder.build()?; - let state = decoder.decode_next_with(|bytes| BeaconState::from_ssz_bytes(bytes, spec))?; - let committee_caches = decoder.decode_next()?; + let state = + decoder.decode_next_with(|bytes| CompactBeaconState::from_ssz_bytes(bytes, spec))?; - Ok(Self { - state, - committee_caches, - }) + Ok(Self { state }) } -} - -impl TryInto> for StorageContainer { - type Error = Error; - - fn try_into(mut self) -> Result, Error> { - let mut state = self.state; - - for i in (0..CACHED_EPOCHS).rev() { - if i >= self.committee_caches.len() { - return Err(Error::SszDecodeError(DecodeError::BytesInvalid( - "Insufficient committees for BeaconState".to_string(), - ))); - }; - - state.committee_caches_mut()[i] = self.committee_caches.remove(i); - } + fn into_beacon_state(self, immutable_validators: F) -> Result, Error> + where + F: Fn(usize) -> Option>, + { + let state = self.state.try_into_full_state(immutable_validators)?; Ok(state) } } diff --git a/consensus/types/src/beacon_state.rs b/consensus/types/src/beacon_state.rs index 7a31d74827..caaa0ecfd4 100644 --- a/consensus/types/src/beacon_state.rs +++ b/consensus/types/src/beacon_state.rs @@ -1,6 +1,7 @@ use self::committee_cache::get_active_validator_indices; use self::exit_cache::ExitCache; use crate::test_utils::TestRandom; +use crate::validator::ValidatorTrait; use crate::*; use compare_fields::CompareFields; use compare_fields_derive::CompareFields; @@ -31,6 +32,7 @@ pub use milhouse::{interface::Interface, List as VList, List, Vector as FixedVec #[macro_use] mod committee_cache; +pub mod compact_state; mod diff; mod exit_cache; mod iter; @@ -138,6 +140,7 @@ pub enum Error { current_fork: ForkName, }, TotalActiveBalanceDiffUninitialized, + MissingImmutableValidator(usize), } /// Control whether an epoch-indexed field can be indexed at the next epoch or not. @@ -212,7 +215,7 @@ impl From for Hash256 { #[cfg_attr(feature = "arbitrary-fuzz", derive(arbitrary::Arbitrary))] #[tree_hash(enum_behaviour = "transparent")] #[ssz(enum_behaviour = "transparent")] -pub struct BeaconState +pub struct BeaconState where T: EthSpec, { @@ -246,7 +249,7 @@ where // Registry #[test_random(default)] - pub validators: VList, + pub validators: VList, // FIXME(sproul): serde quoting // #[serde(with = "ssz_types::serde_utils::quoted_u64_var_list")] #[test_random(default)] @@ -418,30 +421,6 @@ impl BeaconState { } } - /// Specialised deserialisation method that uses the `ChainSpec` as context. - #[allow(clippy::integer_arithmetic)] - pub fn from_ssz_bytes(bytes: &[u8], spec: &ChainSpec) -> Result { - // Slot is after genesis_time (u64) and genesis_validators_root (Hash256). - let slot_start = ::ssz_fixed_len() + ::ssz_fixed_len(); - let slot_end = slot_start + ::ssz_fixed_len(); - - let slot_bytes = bytes - .get(slot_start..slot_end) - .ok_or(DecodeError::InvalidByteLength { - len: bytes.len(), - expected: slot_end, - })?; - - let slot = Slot::from_ssz_bytes(slot_bytes)?; - let fork_at_slot = spec.fork_name_at_slot::(slot); - - Ok(map_fork_name!( - fork_at_slot, - Self, - <_>::from_ssz_bytes(bytes)? - )) - } - /// Returns the `tree_hash_root` of the state. /// /// Spec v0.12.1 @@ -1668,6 +1647,32 @@ impl BeaconState { } } +impl BeaconState { + /// Specialised deserialisation method that uses the `ChainSpec` as context. + #[allow(clippy::integer_arithmetic)] + pub fn from_ssz_bytes(bytes: &[u8], spec: &ChainSpec) -> Result { + // Slot is after genesis_time (u64) and genesis_validators_root (Hash256). + let slot_start = ::ssz_fixed_len() + ::ssz_fixed_len(); + let slot_end = slot_start + ::ssz_fixed_len(); + + let slot_bytes = bytes + .get(slot_start..slot_end) + .ok_or(DecodeError::InvalidByteLength { + len: bytes.len(), + expected: slot_end, + })?; + + let slot = Slot::from_ssz_bytes(slot_bytes)?; + let fork_at_slot = spec.fork_name_at_slot::(slot); + + Ok(map_fork_name!( + fork_at_slot, + Self, + <_>::from_ssz_bytes(bytes)? + )) + } +} + impl From for Error { fn from(e: RelativeEpochError) -> Error { Error::RelativeEpochError(e) diff --git a/consensus/types/src/beacon_state/compact_state.rs b/consensus/types/src/beacon_state/compact_state.rs new file mode 100644 index 0000000000..bac3a25925 --- /dev/null +++ b/consensus/types/src/beacon_state/compact_state.rs @@ -0,0 +1,207 @@ +use crate::{ + BeaconState, BeaconStateAltair, BeaconStateBase, BeaconStateError as Error, BeaconStateMerge, + EthSpec, VList, Validator, ValidatorImmutable, ValidatorMutable, +}; +use itertools::process_results; +use std::sync::Arc; + +pub type CompactBeaconState = BeaconState; + +/// Implement the conversion function from BeaconState -> CompactBeaconState. +macro_rules! full_to_compact { + ($s:ident, $outer:ident, $variant_name:ident, $struct_name:ident, [$($extra_fields:ident),*]) => { + BeaconState::$variant_name($struct_name { + // Versioning + genesis_time: $s.genesis_time, + genesis_validators_root: $s.genesis_validators_root, + slot: $s.slot, + fork: $s.fork, + + // History + latest_block_header: $s.latest_block_header.clone(), + block_roots: $s.block_roots.clone(), + state_roots: $s.state_roots.clone(), + historical_roots: $s.historical_roots.clone(), + + // Eth1 + eth1_data: $s.eth1_data.clone(), + eth1_data_votes: $s.eth1_data_votes.clone(), + eth1_deposit_index: $s.eth1_deposit_index, + + // Validator registry + validators: VList::try_from_iter( + $s.validators.into_iter().map(|validator| validator.mutable.clone()) + ).expect("fix this"), + balances: $s.balances.clone(), + + // Shuffling + randao_mixes: $s.randao_mixes.clone(), + + // Slashings + slashings: $s.slashings.clone(), + + // Finality + justification_bits: $s.justification_bits.clone(), + previous_justified_checkpoint: $s.previous_justified_checkpoint, + current_justified_checkpoint: $s.current_justified_checkpoint, + finalized_checkpoint: $s.finalized_checkpoint, + + // Caches. + total_active_balance: $s.total_active_balance.clone(), + committee_caches: $s.committee_caches.clone(), + pubkey_cache: $s.pubkey_cache.clone(), + exit_cache: $s.exit_cache.clone(), + + // Variant-specific fields + $( + $extra_fields: $s.$extra_fields.clone() + ),* + }) + } +} + +/// Implement the conversion from CompactBeaconState -> BeaconState. +macro_rules! compact_to_full { + ($inner:ident, $variant_name:ident, $struct_name:ident, $immutable_validators:ident, [$($extra_fields:ident),*]) => { + BeaconState::$variant_name($struct_name { + // Versioning + genesis_time: $inner.genesis_time, + genesis_validators_root: $inner.genesis_validators_root, + slot: $inner.slot, + fork: $inner.fork, + + // History + latest_block_header: $inner.latest_block_header, + block_roots: $inner.block_roots, + state_roots: $inner.state_roots, + historical_roots: $inner.historical_roots, + + // Eth1 + eth1_data: $inner.eth1_data, + eth1_data_votes: $inner.eth1_data_votes, + eth1_deposit_index: $inner.eth1_deposit_index, + + // Validator registry + validators: process_results($inner.validators.into_iter().enumerate().map(|(i, mutable)| { + $immutable_validators(i) + .ok_or(Error::MissingImmutableValidator(i)) + .map(move |immutable| { + Validator { + immutable, + mutable: mutable.clone(), + } + }) + }), |iter| VList::try_from_iter(iter))??, + balances: $inner.balances, + + // Shuffling + randao_mixes: $inner.randao_mixes, + + // Slashings + slashings: $inner.slashings, + + // Finality + justification_bits: $inner.justification_bits, + previous_justified_checkpoint: $inner.previous_justified_checkpoint, + current_justified_checkpoint: $inner.current_justified_checkpoint, + finalized_checkpoint: $inner.finalized_checkpoint, + + // Caching + total_active_balance: $inner.total_active_balance, + committee_caches: $inner.committee_caches, + pubkey_cache: $inner.pubkey_cache, + exit_cache: $inner.exit_cache, + + // Variant-specific fields + $( + $extra_fields: $inner.$extra_fields + ),* + }) + } +} + +impl BeaconState { + pub fn into_compact_state(self) -> CompactBeaconState { + match self { + BeaconState::Base(s) => full_to_compact!( + s, + self, + Base, + BeaconStateBase, + [previous_epoch_attestations, current_epoch_attestations] + ), + BeaconState::Altair(s) => full_to_compact!( + s, + self, + Altair, + BeaconStateAltair, + [ + previous_epoch_participation, + current_epoch_participation, + current_sync_committee, + next_sync_committee, + inactivity_scores + ] + ), + BeaconState::Merge(s) => full_to_compact!( + s, + self, + Merge, + BeaconStateMerge, + [ + previous_epoch_participation, + current_epoch_participation, + current_sync_committee, + next_sync_committee, + inactivity_scores, + latest_execution_payload_header + ] + ), + } + } +} + +impl CompactBeaconState { + pub fn try_into_full_state(self, immutable_validators: F) -> Result, Error> + where + F: Fn(usize) -> Option>, + { + let state = match self { + BeaconState::Base(inner) => compact_to_full!( + inner, + Base, + BeaconStateBase, + immutable_validators, + [previous_epoch_attestations, current_epoch_attestations] + ), + BeaconState::Altair(inner) => compact_to_full!( + inner, + Altair, + BeaconStateAltair, + immutable_validators, + [ + previous_epoch_participation, + current_epoch_participation, + current_sync_committee, + next_sync_committee, + inactivity_scores + ] + ), + BeaconState::Merge(inner) => compact_to_full!( + inner, + Merge, + BeaconStateMerge, + immutable_validators, + [ + previous_epoch_participation, + current_epoch_participation, + current_sync_committee, + next_sync_committee, + inactivity_scores, + latest_execution_payload_header + ] + ), + }; + Ok(state) + } +} diff --git a/consensus/types/src/lib.rs b/consensus/types/src/lib.rs index 52983d4f8c..1487cb57bb 100644 --- a/consensus/types/src/lib.rs +++ b/consensus/types/src/lib.rs @@ -106,7 +106,7 @@ pub use crate::beacon_block_body::{ }; pub use crate::beacon_block_header::BeaconBlockHeader; pub use crate::beacon_committee::{BeaconCommittee, OwnedBeaconCommittee}; -pub use crate::beacon_state::{Error as BeaconStateError, *}; +pub use crate::beacon_state::{compact_state::CompactBeaconState, Error as BeaconStateError, *}; pub use crate::chain_spec::{ChainSpec, Config, Domain}; pub use crate::checkpoint::Checkpoint; pub use crate::config_and_preset::{ diff --git a/consensus/types/src/validator.rs b/consensus/types/src/validator.rs index 5159158875..35747fa414 100644 --- a/consensus/types/src/validator.rs +++ b/consensus/types/src/validator.rs @@ -40,6 +40,24 @@ pub struct ValidatorImmutable { pub withdrawal_credentials: Hash256, } +pub trait ValidatorTrait: + std::fmt::Debug + + PartialEq + + Clone + + serde::Serialize + + Send + + Sync + + serde::de::DeserializeOwned + + ssz::Encode + + ssz::Decode + + TreeHash + + TestRandom +{ +} + +impl ValidatorTrait for Validator {} +impl ValidatorTrait for ValidatorMutable {} + impl Validator { pub fn pubkey(&self) -> &PublicKeyBytes { &self.immutable.pubkey