From ea599a6d7f697cffb556f40fdeca620ee170d871 Mon Sep 17 00:00:00 2001 From: Michael Sproul Date: Wed, 28 Sep 2022 15:08:30 +1000 Subject: [PATCH] Repurpose the pubkey cache for validator de-dupe --- beacon_node/beacon_chain/src/beacon_chain.rs | 33 ++--- .../beacon_chain/src/block_verification.rs | 8 +- beacon_node/beacon_chain/src/builder.rs | 20 ++- beacon_node/beacon_chain/src/errors.rs | 3 - beacon_node/beacon_chain/src/lib.rs | 12 +- beacon_node/store/Cargo.toml | 1 + beacon_node/store/src/errors.rs | 5 + beacon_node/store/src/hot_cold_store.rs | 32 ++++- beacon_node/store/src/lib.rs | 2 + beacon_node/store/src/partial_beacon_state.rs | 56 ++++++-- .../src/validator_pubkey_cache.rs | 122 +++++++++++------- consensus/types/src/validator.rs | 3 +- 12 files changed, 184 insertions(+), 113 deletions(-) rename beacon_node/{beacon_chain => store}/src/validator_pubkey_cache.rs (71%) diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index e8b7e421e7..effba21768 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -359,7 +359,7 @@ pub struct BeaconChain { /// Caches the beacon block proposer shuffling for a given epoch and shuffling key root. pub beacon_proposer_cache: Mutex, /// Caches a map of `validator_index -> validator_pubkey`. - pub(crate) validator_pubkey_cache: TimeoutRwLock>, + pub(crate) validator_pubkey_cache: Arc>>, /// A cache used when producing attestations. pub(crate) attester_cache: Arc, /// A cache used when producing attestations whilst the head block is still being imported. @@ -1170,10 +1170,7 @@ impl BeaconChain { /// /// May return an error if acquiring a read-lock on the `validator_pubkey_cache` times out. pub fn validator_index(&self, pubkey: &PublicKeyBytes) -> Result, Error> { - let pubkey_cache = self - .validator_pubkey_cache - .try_read_for(VALIDATOR_PUBKEY_CACHE_LOCK_TIMEOUT) - .ok_or(Error::ValidatorPubkeyCacheLockTimeout)?; + let pubkey_cache = self.validator_pubkey_cache.read(); Ok(pubkey_cache.get_index(pubkey)) } @@ -1186,10 +1183,7 @@ impl BeaconChain { &self, validator_pubkeys: impl Iterator, ) -> Result, Error> { - let pubkey_cache = self - .validator_pubkey_cache - .try_read_for(VALIDATOR_PUBKEY_CACHE_LOCK_TIMEOUT) - .ok_or(Error::ValidatorPubkeyCacheLockTimeout)?; + let pubkey_cache = self.validator_pubkey_cache.read(); validator_pubkeys .map(|pubkey| { @@ -1214,10 +1208,7 @@ impl BeaconChain { /// /// May return an error if acquiring a read-lock on the `validator_pubkey_cache` times out. pub fn validator_pubkey(&self, validator_index: usize) -> Result, Error> { - let pubkey_cache = self - .validator_pubkey_cache - .try_read_for(VALIDATOR_PUBKEY_CACHE_LOCK_TIMEOUT) - .ok_or(Error::ValidatorPubkeyCacheLockTimeout)?; + let pubkey_cache = self.validator_pubkey_cache.read(); Ok(pubkey_cache.get(validator_index).cloned()) } @@ -1227,11 +1218,7 @@ impl BeaconChain { &self, validator_index: usize, ) -> Result, Error> { - let pubkey_cache = self - .validator_pubkey_cache - .try_read_for(VALIDATOR_PUBKEY_CACHE_LOCK_TIMEOUT) - .ok_or(Error::ValidatorPubkeyCacheLockTimeout)?; - + let pubkey_cache = self.validator_pubkey_cache.read(); Ok(pubkey_cache.get_pubkey_bytes(validator_index).copied()) } @@ -1244,10 +1231,7 @@ impl BeaconChain { &self, validator_indices: &[usize], ) -> Result, Error> { - let pubkey_cache = self - .validator_pubkey_cache - .try_read_for(VALIDATOR_PUBKEY_CACHE_LOCK_TIMEOUT) - .ok_or(Error::ValidatorPubkeyCacheLockTimeout)?; + let pubkey_cache = self.validator_pubkey_cache.read(); let mut map = HashMap::with_capacity(validator_indices.len()); for &validator_index in validator_indices { @@ -2612,9 +2596,8 @@ impl BeaconChain { // used by attestation processing which will only process an attestation if the block is // known to fork choice. This ordering ensure that the pubkey cache is always up-to-date. self.validator_pubkey_cache - .try_write_for(VALIDATOR_PUBKEY_CACHE_LOCK_TIMEOUT) - .ok_or(Error::ValidatorPubkeyCacheLockTimeout)? - .import_new_pubkeys(&state)?; + .write() + .import_new_pubkeys(&state, &self.store)?; // For the current and next epoch of this state, ensure we have the shuffling from this // block in our cache. diff --git a/beacon_node/beacon_chain/src/block_verification.rs b/beacon_node/beacon_chain/src/block_verification.rs index 5237e598d9..791a06a20a 100644 --- a/beacon_node/beacon_chain/src/block_verification.rs +++ b/beacon_node/beacon_chain/src/block_verification.rs @@ -545,7 +545,7 @@ pub fn signature_verify_chain_segment( )?; let pubkey_cache = get_validator_pubkey_cache(chain)?; - let mut signature_verifier = get_signature_verifier(&state, &pubkey_cache, &chain.spec); + let mut signature_verifier = get_signature_verifier::(&state, &pubkey_cache, &chain.spec); for (block_root, block) in &chain_segment { signature_verifier.include_all_signatures(block, Some(*block_root), true)?; @@ -936,7 +936,8 @@ impl SignatureVerifiedBlock { let pubkey_cache = get_validator_pubkey_cache(chain)?; - let mut signature_verifier = get_signature_verifier(&state, &pubkey_cache, &chain.spec); + let mut signature_verifier = + get_signature_verifier::(&state, &pubkey_cache, &chain.spec); signature_verifier.include_all_signatures(&block, Some(block_root), true)?; @@ -985,7 +986,8 @@ impl SignatureVerifiedBlock { let pubkey_cache = get_validator_pubkey_cache(chain)?; - let mut signature_verifier = get_signature_verifier(&state, &pubkey_cache, &chain.spec); + let mut signature_verifier = + get_signature_verifier::(&state, &pubkey_cache, &chain.spec); signature_verifier.include_all_signatures_except_proposal(&block)?; diff --git a/beacon_node/beacon_chain/src/builder.rs b/beacon_node/beacon_chain/src/builder.rs index 37e3c35293..c08ff84119 100644 --- a/beacon_node/beacon_chain/src/builder.rs +++ b/beacon_node/beacon_chain/src/builder.rs @@ -8,7 +8,6 @@ use crate::persisted_beacon_chain::PersistedBeaconChain; use crate::shuffling_cache::ShufflingCache; use crate::timeout_rw_lock::TimeoutRwLock; use crate::validator_monitor::ValidatorMonitor; -use crate::validator_pubkey_cache::ValidatorPubkeyCache; use crate::ChainConfig; use crate::{ BeaconChain, BeaconChainTypes, BeaconForkChoiceStore, BeaconSnapshot, Eth1Chain, @@ -81,7 +80,6 @@ pub struct BeaconChainBuilder { slot_clock: Option, shutdown_sender: Option>, head_tracker: Option, - validator_pubkey_cache: Option>, spec: ChainSpec, chain_config: ChainConfig, log: Option, @@ -122,7 +120,6 @@ where slot_clock: None, shutdown_sender: None, head_tracker: None, - validator_pubkey_cache: None, spec: TEthSpec::default_spec(), chain_config: ChainConfig::default(), log: None, @@ -280,16 +277,12 @@ where .unwrap_or_else(OperationPool::new), ); - let pubkey_cache = ValidatorPubkeyCache::load_from_store(store) - .map_err(|e| format!("Unable to open persisted pubkey cache: {:?}", e))?; - self.genesis_block_root = Some(chain.genesis_block_root); self.genesis_state_root = Some(genesis_block.state_root()); self.head_tracker = Some( HeadTracker::from_ssz_container(&chain.ssz_head_tracker) .map_err(|e| format!("Failed to decode head tracker for database: {:?}", e))?, ); - self.validator_pubkey_cache = Some(pubkey_cache); self.fork_choice = Some(fork_choice); Ok(self) @@ -713,10 +706,13 @@ where )); } - let validator_pubkey_cache = self.validator_pubkey_cache.map(Ok).unwrap_or_else(|| { - ValidatorPubkeyCache::new(&head_snapshot.beacon_state, store.clone()) - .map_err(|e| format!("Unable to init validator pubkey cache: {:?}", e)) - })?; + let validator_pubkey_cache = store.immutable_validators.clone(); + + // Update pubkey cache on first start in case we have started from genesis. + validator_pubkey_cache + .write() + .import_new_pubkeys(&head_snapshot.beacon_state, &store) + .map_err(|e| format!("error initializing pubkey cache: {e:?}"))?; let migrator_config = self.store_migrator_config.unwrap_or_default(); let store_migrator = BackgroundMigrator::new( @@ -816,7 +812,7 @@ where beacon_proposer_cache: <_>::default(), block_times_cache: <_>::default(), pre_finalization_block_cache: <_>::default(), - validator_pubkey_cache: TimeoutRwLock::new(validator_pubkey_cache), + validator_pubkey_cache, attester_cache: <_>::default(), early_attester_cache: <_>::default(), shutdown_sender: self diff --git a/beacon_node/beacon_chain/src/errors.rs b/beacon_node/beacon_chain/src/errors.rs index 7a23ff675d..c0f8c62743 100644 --- a/beacon_node/beacon_chain/src/errors.rs +++ b/beacon_node/beacon_chain/src/errors.rs @@ -85,13 +85,10 @@ pub enum BeaconChainError { ValidatorPubkeyCacheLockTimeout, SnapshotCacheLockTimeout, IncorrectStateForAttestation(RelativeEpochError), - InvalidValidatorPubkeyBytes(bls::Error), ValidatorPubkeyCacheIncomplete(usize), SignatureSetError(SignatureSetError), BlockSignatureVerifierError(state_processing::block_signature_verifier::Error), BlockReplayError(BlockReplayError), - DuplicateValidatorPublicKey, - ValidatorPubkeyCacheError(String), ValidatorIndexUnknown(usize), ValidatorPubkeyUnknown(PublicKeyBytes), OpPoolError(OpPoolError), diff --git a/beacon_node/beacon_chain/src/lib.rs b/beacon_node/beacon_chain/src/lib.rs index 06036c93b4..8c59ce2334 100644 --- a/beacon_node/beacon_chain/src/lib.rs +++ b/beacon_node/beacon_chain/src/lib.rs @@ -40,7 +40,6 @@ pub mod sync_committee_verification; pub mod test_utils; mod timeout_rw_lock; pub mod validator_monitor; -pub mod validator_pubkey_cache; pub use self::beacon_chain::{ AttestationProcessingOutcome, BeaconChain, BeaconChainTypes, BeaconStore, ChainSegmentResult, @@ -70,3 +69,14 @@ pub use state_processing::per_block_processing::errors::{ pub use store; pub use timeout_rw_lock::TimeoutRwLock; pub use types; + +// FIXME(sproul): compatibility shim +pub mod validator_pubkey_cache { + use crate::BeaconChainTypes; + + pub type ValidatorPubkeyCache = store::ValidatorPubkeyCache< + ::EthSpec, + ::HotStore, + ::ColdStore, + >; +} diff --git a/beacon_node/store/Cargo.toml b/beacon_node/store/Cargo.toml index f3bef5c027..dbafb61bd0 100644 --- a/beacon_node/store/Cargo.toml +++ b/beacon_node/store/Cargo.toml @@ -30,3 +30,4 @@ tree_hash = "0.4.0" take-until = "0.1.0" zstd = "0.10.0" strum = { version = "0.24.0", features = ["derive"] } +bls = { path = "../../crypto/bls" } diff --git a/beacon_node/store/src/errors.rs b/beacon_node/store/src/errors.rs index fd0b2ebb3c..54f685c184 100644 --- a/beacon_node/store/src/errors.rs +++ b/beacon_node/store/src/errors.rs @@ -59,7 +59,12 @@ pub enum Error { AddPayloadLogicError, ResyncRequiredForExecutionPayloadSeparation, SlotClockUnavailableForMigration, + MissingImmutableValidator(usize), V9MigrationFailure(Hash256), + ValidatorPubkeyCacheError(String), + DuplicateValidatorPublicKey, + InvalidValidatorPubkeyBytes(bls::Error), + ValidatorPubkeyCacheUninitialized, } pub trait HandleUnavailable { diff --git a/beacon_node/store/src/hot_cold_store.rs b/beacon_node/store/src/hot_cold_store.rs index fdebcd58e1..d7619e6671 100644 --- a/beacon_node/store/src/hot_cold_store.rs +++ b/beacon_node/store/src/hot_cold_store.rs @@ -20,7 +20,7 @@ use crate::metrics; use crate::state_cache::{PutStateOutcome, StateCache}; use crate::{ get_key_for_col, DBColumn, DatabaseBlock, Error, ItemStore, KeyValueStoreOp, - PartialBeaconState, StoreItem, StoreOp, + PartialBeaconState, StoreItem, StoreOp, ValidatorPubkeyCache, }; use itertools::process_results; use leveldb::iterator::LevelDBIterator; @@ -36,13 +36,14 @@ use state_processing::{ block_replayer::PreSlotHook, BlockProcessingError, BlockReplayer, SlotProcessingError, }; use std::cmp::min; -use std::convert::TryInto; +use std::io::Read; use std::marker::PhantomData; use std::path::Path; use std::sync::Arc; use std::time::Duration; use types::*; use types::{beacon_state::BeaconStateDiff, EthSpec}; +use zstd::Decoder; pub const MAX_PARENT_STATES_TO_CACHE: u64 = 32; @@ -70,6 +71,8 @@ pub struct HotColdDB, Cold: ItemStore> { block_cache: Mutex>>, /// Cache of beacon states. state_cache: Mutex>, + /// Immutable validator cache. + pub immutable_validators: Arc>>, /// Chain spec. pub(crate) spec: ChainSpec, /// Logger. @@ -141,6 +144,7 @@ impl HotColdDB, MemoryStore> { hot_db: MemoryStore::open(), block_cache: Mutex::new(LruCache::new(config.block_cache_size)), state_cache: Mutex::new(StateCache::new(config.state_cache_size)), + immutable_validators: Arc::new(RwLock::new(Default::default())), config, spec, log, @@ -176,6 +180,7 @@ impl HotColdDB, LevelDB> { hot_db: LevelDB::open(hot_path)?, block_cache: Mutex::new(LruCache::new(config.block_cache_size)), state_cache: Mutex::new(StateCache::new(config.state_cache_size)), + immutable_validators: Arc::new(RwLock::new(Default::default())), config, spec, log, @@ -217,6 +222,11 @@ impl HotColdDB, LevelDB> { ); } + // Load validator pubkey cache. + // FIXME(sproul): probably breaks migrations, etc + let pubkey_cache = ValidatorPubkeyCache::load_from_store(&db)?; + *db.immutable_validators.write() = pubkey_cache; + // Ensure that the schema version of the on-disk database matches the software. // If the version is mismatched, an automatic migration will be attempted. let db = Arc::new(db); @@ -1100,7 +1110,7 @@ impl, Cold: ItemStore> HotColdDB // 1. Convert to PartialBeaconState and store that in the DB. let partial_state = PartialBeaconState::from_state_forgetful(state); - let op = partial_state.as_kv_store_op(*state_root); + let op = partial_state.as_kv_store_op(*state_root, &self.config)?; ops.push(op); // 2. Store updated vector entries. @@ -1151,12 +1161,19 @@ impl, Cold: ItemStore> HotColdDB /// Load a restore point state by its `state_root`. fn load_restore_point(&self, state_root: &Hash256) -> Result, Error> { - let partial_state_bytes = self + let bytes = self .cold_db .get_bytes(DBColumn::BeaconState.into(), state_root.as_bytes())? .ok_or(HotColdDBError::MissingRestorePoint(*state_root))?; + + let mut ssz_bytes = Vec::with_capacity(self.config.estimate_decompressed_size(bytes.len())); + let mut decoder = Decoder::new(&*bytes).map_err(Error::Compression)?; + decoder + .read_to_end(&mut ssz_bytes) + .map_err(Error::Compression)?; + let mut partial_state: PartialBeaconState = - PartialBeaconState::from_ssz_bytes(&partial_state_bytes, &self.spec)?; + PartialBeaconState::from_ssz_bytes(&ssz_bytes, &self.spec)?; // Fill in the fields of the partial state. partial_state.load_block_roots(&self.cold_db, &self.spec)?; @@ -1164,7 +1181,10 @@ impl, Cold: ItemStore> HotColdDB partial_state.load_historical_roots(&self.cold_db, &self.spec)?; partial_state.load_randao_mixes(&self.cold_db, &self.spec)?; - partial_state.try_into() + let pubkey_cache = self.immutable_validators.read(); + let immutable_validators = |i: usize| pubkey_cache.get_validator(i); + + partial_state.try_into_full_state(immutable_validators) } /// Load a restore point state by its `restore_point_index`. diff --git a/beacon_node/store/src/lib.rs b/beacon_node/store/src/lib.rs index 0ddbf1abb7..5e6a40135f 100644 --- a/beacon_node/store/src/lib.rs +++ b/beacon_node/store/src/lib.rs @@ -28,6 +28,7 @@ mod partial_beacon_state; pub mod reconstruct; mod state_cache; mod state_diff; +pub mod validator_pubkey_cache; pub mod iter; @@ -45,6 +46,7 @@ use parking_lot::MutexGuard; use std::sync::Arc; use strum::{EnumString, IntoStaticStr}; pub use types::*; +pub use validator_pubkey_cache::ValidatorPubkeyCache; pub type ColumnIter<'a> = Box), Error>> + 'a>; pub type ColumnKeyIter<'a> = Box> + 'a>; diff --git a/beacon_node/store/src/partial_beacon_state.rs b/beacon_node/store/src/partial_beacon_state.rs index 43fe75a5a1..6f423b2905 100644 --- a/beacon_node/store/src/partial_beacon_state.rs +++ b/beacon_node/store/src/partial_beacon_state.rs @@ -2,13 +2,15 @@ use crate::chunked_vector::{ load_variable_list_from_db, load_vector_from_db, BlockRoots, HistoricalRoots, RandaoMixes, StateRoots, }; -use crate::{get_key_for_col, DBColumn, Error, KeyValueStore, KeyValueStoreOp}; +use crate::{get_key_for_col, DBColumn, Error, KeyValueStore, KeyValueStoreOp, StoreConfig}; +use itertools::process_results; use ssz::{Decode, DecodeError, Encode}; use ssz_derive::{Decode, Encode}; -use std::convert::TryInto; +use std::io::Write; use std::sync::Arc; use types::superstruct; use types::*; +use zstd::Encoder; /// Lightweight variant of the `BeaconState` that is stored in the database. /// @@ -47,7 +49,7 @@ where pub eth1_deposit_index: u64, // Registry - pub validators: VList, + pub validators: Vec, pub balances: VList, // Shuffling @@ -114,7 +116,10 @@ macro_rules! impl_from_state_forgetful { eth1_deposit_index: $s.eth1_deposit_index, // Validator registry - validators: $s.validators.clone(), + validators: $s.validators.into_iter().map(|validator| { + validator.mutable.clone() + }) + .collect(), balances: $s.balances.clone(), // Shuffling @@ -204,9 +209,23 @@ impl PartialBeaconState { } /// Prepare the partial state for storage in the KV database. - pub fn as_kv_store_op(&self, state_root: Hash256) -> KeyValueStoreOp { + pub fn as_kv_store_op( + &self, + state_root: Hash256, + config: &StoreConfig, + ) -> Result { let db_key = get_key_for_col(DBColumn::BeaconState.into(), state_root.as_bytes()); - KeyValueStoreOp::PutKeyValue(db_key, self.as_ssz_bytes()) + + let ssz_bytes = self.as_ssz_bytes(); + + let mut compressed_value = + Vec::with_capacity(config.estimate_compressed_size(ssz_bytes.len())); + let mut encoder = Encoder::new(&mut compressed_value, config.compression_level) + .map_err(Error::Compression)?; + encoder.write_all(&ssz_bytes).map_err(Error::Compression)?; + encoder.finish().map_err(Error::Compression)?; + + Ok(KeyValueStoreOp::PutKeyValue(db_key, compressed_value)) } pub fn load_block_roots>( @@ -278,7 +297,7 @@ impl PartialBeaconState { /// Implement the conversion from PartialBeaconState -> BeaconState. macro_rules! impl_try_into_beacon_state { - ($inner:ident, $variant_name:ident, $struct_name:ident, [$($extra_fields:ident),*]) => { + ($inner:ident, $variant_name:ident, $struct_name:ident, $immutable_validators:ident, [$($extra_fields:ident),*]) => { BeaconState::$variant_name($struct_name { // Versioning genesis_time: $inner.genesis_time, @@ -298,7 +317,16 @@ macro_rules! impl_try_into_beacon_state { eth1_deposit_index: $inner.eth1_deposit_index, // Validator registry - validators: $inner.validators, + validators: process_results($inner.validators.into_iter().enumerate().map(|(i, mutable)| { + $immutable_validators(i) + .ok_or(Error::MissingImmutableValidator(i)) + .map(move |immutable| { + Validator { + immutable, + mutable + } + }) + }), |iter| VList::try_from_iter(iter))??, balances: $inner.balances, // Shuffling @@ -331,21 +359,24 @@ fn unpack_field(x: Option) -> Result { x.ok_or(Error::PartialBeaconStateError) } -impl TryInto> for PartialBeaconState { - type Error = Error; - - fn try_into(self) -> Result, Error> { +impl PartialBeaconState { + pub fn try_into_full_state(self, immutable_validators: F) -> Result, Error> + where + F: Fn(usize) -> Option>, + { let state = match self { PartialBeaconState::Base(inner) => impl_try_into_beacon_state!( inner, Base, BeaconStateBase, + immutable_validators, [previous_epoch_attestations, current_epoch_attestations] ), PartialBeaconState::Altair(inner) => impl_try_into_beacon_state!( inner, Altair, BeaconStateAltair, + immutable_validators, [ previous_epoch_participation, current_epoch_participation, @@ -358,6 +389,7 @@ impl TryInto> for PartialBeaconState { inner, Merge, BeaconStateMerge, + immutable_validators, [ previous_epoch_participation, current_epoch_participation, diff --git a/beacon_node/beacon_chain/src/validator_pubkey_cache.rs b/beacon_node/store/src/validator_pubkey_cache.rs similarity index 71% rename from beacon_node/beacon_chain/src/validator_pubkey_cache.rs rename to beacon_node/store/src/validator_pubkey_cache.rs index a72168c5f0..4f130a91ab 100644 --- a/beacon_node/beacon_chain/src/validator_pubkey_cache.rs +++ b/beacon_node/store/src/validator_pubkey_cache.rs @@ -1,10 +1,10 @@ -use crate::errors::BeaconChainError; -use crate::{BeaconChainTypes, BeaconStore}; +use crate::{DBColumn, Error, HotColdDB, ItemStore, StoreItem}; use ssz::{Decode, Encode}; use std::collections::HashMap; use std::convert::TryInto; -use store::{DBColumn, Error as StoreError, StoreItem}; -use types::{BeaconState, Hash256, PublicKey, PublicKeyBytes}; +use std::marker::PhantomData; +use std::sync::Arc; +use types::{BeaconState, EthSpec, Hash256, PublicKey, PublicKeyBytes, ValidatorImmutable}; /// Provides a mapping of `validator_index -> validator_publickey`. /// @@ -17,49 +17,63 @@ use types::{BeaconState, Hash256, PublicKey, PublicKeyBytes}; /// /// The cache has a `backing` that it uses to maintain a persistent, on-disk /// copy of itself. This allows it to be restored between process invocations. -pub struct ValidatorPubkeyCache { +#[derive(Debug)] +pub struct ValidatorPubkeyCache, Cold: ItemStore> { pubkeys: Vec, indices: HashMap, - pubkey_bytes: Vec, - store: BeaconStore, + validators: Vec>, + _phantom: PhantomData<(E, Hot, Cold)>, } -impl ValidatorPubkeyCache { +// Temp value. +impl, Cold: ItemStore> Default + for ValidatorPubkeyCache +{ + fn default() -> Self { + ValidatorPubkeyCache { + pubkeys: vec![], + indices: HashMap::new(), + validators: vec![], + _phantom: PhantomData, + } + } +} + +impl, Cold: ItemStore> ValidatorPubkeyCache { /// Create a new public key cache using the keys in `state.validators`. /// /// Also creates a new persistence file, returning an error if there is already a file at /// `persistence_path`. - pub fn new( - state: &BeaconState, - store: BeaconStore, - ) -> Result { + pub fn new(state: &BeaconState, store: &HotColdDB) -> Result { let mut cache = Self { pubkeys: vec![], indices: HashMap::new(), - pubkey_bytes: vec![], - store, + validators: vec![], + _phantom: PhantomData, }; - cache.import_new_pubkeys(state)?; + cache.import_new_pubkeys(state, store)?; Ok(cache) } /// Load the pubkey cache from the given on-disk database. - pub fn load_from_store(store: BeaconStore) -> Result { + pub fn load_from_store(store: &HotColdDB) -> Result { let mut pubkeys = vec![]; let mut indices = HashMap::new(); - let mut pubkey_bytes = vec![]; + let mut validators = vec![]; for validator_index in 0.. { - if let Some(DatabasePubkey(pubkey)) = - store.get_item(&DatabasePubkey::key_for_index(validator_index))? + if let Some(DatabaseValidator(validator)) = + store.get_item(&DatabaseValidator::key_for_index(validator_index))? { - pubkeys.push((&pubkey).try_into().map_err(|e| { - BeaconChainError::ValidatorPubkeyCacheError(format!("{:?}", e)) - })?); - pubkey_bytes.push(pubkey); - indices.insert(pubkey, validator_index); + pubkeys.push( + (&validator.pubkey) + .try_into() + .map_err(|e| Error::ValidatorPubkeyCacheError(format!("{:?}", e)))?, + ); + indices.insert(validator.pubkey, validator_index); + validators.push(validator); } else { break; } @@ -68,8 +82,8 @@ impl ValidatorPubkeyCache { Ok(ValidatorPubkeyCache { pubkeys, indices, - pubkey_bytes, - store, + validators, + _phantom: PhantomData, }) } @@ -78,15 +92,17 @@ impl ValidatorPubkeyCache { /// Does not delete any keys from `self` if they don't appear in `state`. pub fn import_new_pubkeys( &mut self, - state: &BeaconState, - ) -> Result<(), BeaconChainError> { - if state.validators().len() > self.pubkeys.len() { + state: &BeaconState, + store: &HotColdDB, + ) -> Result<(), Error> { + if state.validators().len() > self.validators.len() { self.import( state .validators() .iter_from(self.pubkeys.len()) .unwrap() // FIXME(sproul) - .map(|v| *v.pubkey()), + .map(|v| v.immutable.clone()), + store, ) } else { Ok(()) @@ -94,19 +110,19 @@ impl ValidatorPubkeyCache { } /// Adds zero or more validators to `self`. - fn import(&mut self, validator_keys: I) -> Result<(), BeaconChainError> + fn import(&mut self, validator_keys: I, store: &HotColdDB) -> Result<(), Error> where - I: Iterator + ExactSizeIterator, + I: Iterator> + ExactSizeIterator, { - self.pubkey_bytes.reserve(validator_keys.len()); + self.validators.reserve(validator_keys.len()); self.pubkeys.reserve(validator_keys.len()); self.indices.reserve(validator_keys.len()); - for pubkey in validator_keys { + for validator in validator_keys { let i = self.pubkeys.len(); - if self.indices.contains_key(&pubkey) { - return Err(BeaconChainError::DuplicateValidatorPublicKey); + if self.indices.contains_key(&validator.pubkey) { + return Err(Error::DuplicateValidatorPublicKey); } // The item is written to disk _before_ it is written into @@ -118,17 +134,18 @@ impl ValidatorPubkeyCache { // The motivation behind this ordering is that we do not want to have states that // reference a pubkey that is not in our cache. However, it's fine to have pubkeys // that are never referenced in a state. - self.store - .put_item(&DatabasePubkey::key_for_index(i), &DatabasePubkey(pubkey))?; + store.put_item( + &DatabaseValidator::key_for_index(i), + &DatabaseValidator(validator.clone()), + )?; self.pubkeys.push( - (&pubkey) + (&validator.pubkey) .try_into() - .map_err(BeaconChainError::InvalidValidatorPubkeyBytes)?, + .map_err(Error::InvalidValidatorPubkeyBytes)?, ); - self.pubkey_bytes.push(pubkey); - - self.indices.insert(pubkey, i); + self.indices.insert(validator.pubkey, i); + self.validators.push(validator); } Ok(()) @@ -139,6 +156,11 @@ impl ValidatorPubkeyCache { self.pubkeys.get(i) } + /// Get the immutable validator with index `i`. + pub fn get_validator(&self, i: usize) -> Option> { + self.validators.get(i).cloned() + } + /// Get the `PublicKey` for a validator with `PublicKeyBytes`. pub fn get_pubkey_from_pubkey_bytes(&self, pubkey: &PublicKeyBytes) -> Option<&PublicKey> { self.get_index(pubkey).and_then(|index| self.get(index)) @@ -146,7 +168,7 @@ impl ValidatorPubkeyCache { /// Get the public key (in bytes form) for a validator with index `i`. pub fn get_pubkey_bytes(&self, i: usize) -> Option<&PublicKeyBytes> { - self.pubkey_bytes.get(i) + self.validators.get(i).map(|validator| &validator.pubkey) } /// Get the index of a validator with `pubkey`. @@ -168,23 +190,23 @@ impl ValidatorPubkeyCache { /// Wrapper for a public key stored in the database. /// /// Keyed by the validator index as `Hash256::from_low_u64_be(index)`. -struct DatabasePubkey(PublicKeyBytes); +struct DatabaseValidator(Arc); -impl StoreItem for DatabasePubkey { +impl StoreItem for DatabaseValidator { fn db_column() -> DBColumn { DBColumn::PubkeyCache } - fn as_store_bytes(&self) -> Result, StoreError> { + fn as_store_bytes(&self) -> Result, Error> { Ok(self.0.as_ssz_bytes()) } - fn from_store_bytes(bytes: &[u8]) -> Result { - Ok(Self(PublicKeyBytes::from_ssz_bytes(bytes)?)) + fn from_store_bytes(bytes: &[u8]) -> Result { + Ok(Self(Arc::new(ValidatorImmutable::from_ssz_bytes(bytes)?))) } } -impl DatabasePubkey { +impl DatabaseValidator { fn key_for_index(index: usize) -> Hash256 { Hash256::from_low_u64_be(index as u64) } diff --git a/consensus/types/src/validator.rs b/consensus/types/src/validator.rs index 668c65ede1..5159158875 100644 --- a/consensus/types/src/validator.rs +++ b/consensus/types/src/validator.rs @@ -6,6 +6,7 @@ use ssz_derive::{Decode, Encode}; use std::sync::Arc; use test_random_derive::TestRandom; use tree_hash::TreeHash; +use tree_hash_derive::TreeHash; const NUM_FIELDS: usize = 8; @@ -20,7 +21,7 @@ pub struct Validator { /// The mutable fields of a validator. #[cfg_attr(feature = "arbitrary-fuzz", derive(arbitrary::Arbitrary))] -#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Encode, Decode, TestRandom)] +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Encode, Decode, TreeHash, TestRandom)] pub struct ValidatorMutable { #[serde(with = "eth2_serde_utils::quoted_u64")] pub effective_balance: u64,