Omit pubkeys from hot states

This commit is contained in:
Michael Sproul
2022-09-30 10:34:36 +10:00
parent 14135cf9be
commit a6318732cf
6 changed files with 309 additions and 67 deletions

View File

@@ -846,7 +846,7 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
state: &BeaconState<E>,
ops: &mut Vec<KeyValueStoreOp>,
) -> Result<(), Error> {
store_full_state(state_root, state, ops)
store_full_state(state_root, state, ops, &self.config)
}
/// Get a post-finalization state from the database or store.
@@ -1056,8 +1056,16 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
&self,
state_root: &Hash256,
) -> Result<(BeaconState<E>, Hash256), Error> {
let mut state = get_full_state(&self.hot_db, state_root, &self.spec)?
.ok_or(HotColdDBError::MissingEpochBoundaryState(*state_root))?;
let pubkey_cache = self.immutable_validators.read();
let immutable_validators = |i: usize| pubkey_cache.get_validator(i);
let mut state = get_full_state(
&self.hot_db,
state_root,
immutable_validators,
&self.config,
&self.spec,
)?
.ok_or(HotColdDBError::MissingEpochBoundaryState(*state_root))?;
// Do a tree hash here so that the cache is fully built.
state.update_tree_hash_cache()?;

View File

@@ -1,63 +1,83 @@
use crate::*;
use ssz::{DecodeError, Encode};
use ssz_derive::Encode;
use std::convert::TryInto;
use std::io::{Read, Write};
use std::sync::Arc;
use types::beacon_state::{CommitteeCache, CACHED_EPOCHS};
use types::{CompactBeaconState, ValidatorImmutable};
use zstd::{Decoder, Encoder};
pub fn store_full_state<E: EthSpec>(
state_root: &Hash256,
state: &BeaconState<E>,
ops: &mut Vec<KeyValueStoreOp>,
config: &StoreConfig,
) -> Result<(), Error> {
let bytes = {
let _overhead_timer = metrics::start_timer(&metrics::BEACON_STATE_WRITE_OVERHEAD_TIMES);
StorageContainer::new(state).as_ssz_bytes()
};
metrics::inc_counter_by(&metrics::BEACON_STATE_WRITE_BYTES, bytes.len() as u64);
let mut compressed_value = Vec::with_capacity(config.estimate_compressed_size(bytes.len()));
let mut encoder = Encoder::new(&mut compressed_value, config.compression_level)
.map_err(Error::Compression)?;
encoder.write_all(&bytes).map_err(Error::Compression)?;
encoder.finish().map_err(Error::Compression)?;
metrics::inc_counter_by(
&metrics::BEACON_STATE_WRITE_BYTES,
compressed_value.len() as u64,
);
metrics::inc_counter(&metrics::BEACON_STATE_WRITE_COUNT);
let key = get_key_for_col(DBColumn::BeaconState.into(), state_root.as_bytes());
ops.push(KeyValueStoreOp::PutKeyValue(key, bytes));
ops.push(KeyValueStoreOp::PutKeyValue(key, compressed_value));
Ok(())
}
pub fn get_full_state<KV: KeyValueStore<E>, E: EthSpec>(
pub fn get_full_state<KV: KeyValueStore<E>, E: EthSpec, F>(
db: &KV,
state_root: &Hash256,
immutable_validators: F,
config: &StoreConfig,
spec: &ChainSpec,
) -> Result<Option<BeaconState<E>>, Error> {
) -> Result<Option<BeaconState<E>>, Error>
where
F: Fn(usize) -> Option<Arc<ValidatorImmutable>>,
{
let total_timer = metrics::start_timer(&metrics::BEACON_STATE_READ_TIMES);
match db.get_bytes(DBColumn::BeaconState.into(), state_root.as_bytes())? {
Some(bytes) => {
let mut ssz_bytes = Vec::with_capacity(config.estimate_decompressed_size(bytes.len()));
let mut decoder = Decoder::new(&*bytes).map_err(Error::Compression)?;
decoder
.read_to_end(&mut ssz_bytes)
.map_err(Error::Compression)?;
let overhead_timer = metrics::start_timer(&metrics::BEACON_STATE_READ_OVERHEAD_TIMES);
let container = StorageContainer::from_ssz_bytes(&bytes, spec)?;
let container = StorageContainer::from_ssz_bytes(&ssz_bytes, spec)?;
metrics::stop_timer(overhead_timer);
metrics::stop_timer(total_timer);
metrics::inc_counter(&metrics::BEACON_STATE_READ_COUNT);
metrics::inc_counter_by(&metrics::BEACON_STATE_READ_BYTES, bytes.len() as u64);
Ok(Some(container.try_into()?))
Ok(Some(container.into_beacon_state(immutable_validators)?))
}
None => Ok(None),
}
}
/// A container for storing `BeaconState` components.
// TODO: would be more space efficient with the caches stored separately and referenced by hash
#[derive(Encode)]
pub struct StorageContainer<T: EthSpec> {
state: BeaconState<T>,
committee_caches: Vec<Arc<CommitteeCache>>,
state: CompactBeaconState<T>,
}
impl<T: EthSpec> StorageContainer<T> {
/// Create a new instance for storing a `BeaconState`.
pub fn new(state: &BeaconState<T>) -> Self {
Self {
state: state.clone(),
committee_caches: state.committee_caches().to_vec(),
state: state.clone().into_compact_state(),
}
}
@@ -67,36 +87,20 @@ impl<T: EthSpec> StorageContainer<T> {
let mut builder = ssz::SszDecoderBuilder::new(bytes);
builder.register_anonymous_variable_length_item()?;
builder.register_type::<Vec<CommitteeCache>>()?;
let mut decoder = builder.build()?;
let state = decoder.decode_next_with(|bytes| BeaconState::from_ssz_bytes(bytes, spec))?;
let committee_caches = decoder.decode_next()?;
let state =
decoder.decode_next_with(|bytes| CompactBeaconState::from_ssz_bytes(bytes, spec))?;
Ok(Self {
state,
committee_caches,
})
Ok(Self { state })
}
}
impl<T: EthSpec> TryInto<BeaconState<T>> for StorageContainer<T> {
type Error = Error;
fn try_into(mut self) -> Result<BeaconState<T>, Error> {
let mut state = self.state;
for i in (0..CACHED_EPOCHS).rev() {
if i >= self.committee_caches.len() {
return Err(Error::SszDecodeError(DecodeError::BytesInvalid(
"Insufficient committees for BeaconState".to_string(),
)));
};
state.committee_caches_mut()[i] = self.committee_caches.remove(i);
}
fn into_beacon_state<F>(self, immutable_validators: F) -> Result<BeaconState<T>, Error>
where
F: Fn(usize) -> Option<Arc<ValidatorImmutable>>,
{
let state = self.state.try_into_full_state(immutable_validators)?;
Ok(state)
}
}