Repurpose the pubkey cache for validator de-dupe

This commit is contained in:
Michael Sproul
2022-09-28 15:08:30 +10:00
parent 9a1799f235
commit ea599a6d7f
12 changed files with 184 additions and 113 deletions

View File

@@ -359,7 +359,7 @@ pub struct BeaconChain<T: BeaconChainTypes> {
/// Caches the beacon block proposer shuffling for a given epoch and shuffling key root.
pub beacon_proposer_cache: Mutex<BeaconProposerCache>,
/// Caches a map of `validator_index -> validator_pubkey`.
pub(crate) validator_pubkey_cache: TimeoutRwLock<ValidatorPubkeyCache<T>>,
pub(crate) validator_pubkey_cache: Arc<RwLock<ValidatorPubkeyCache<T>>>,
/// A cache used when producing attestations.
pub(crate) attester_cache: Arc<AttesterCache>,
/// A cache used when producing attestations whilst the head block is still being imported.
@@ -1170,10 +1170,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
///
/// May return an error if acquiring a read-lock on the `validator_pubkey_cache` times out.
pub fn validator_index(&self, pubkey: &PublicKeyBytes) -> Result<Option<usize>, Error> {
let pubkey_cache = self
.validator_pubkey_cache
.try_read_for(VALIDATOR_PUBKEY_CACHE_LOCK_TIMEOUT)
.ok_or(Error::ValidatorPubkeyCacheLockTimeout)?;
let pubkey_cache = self.validator_pubkey_cache.read();
Ok(pubkey_cache.get_index(pubkey))
}
@@ -1186,10 +1183,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
&self,
validator_pubkeys: impl Iterator<Item = &'a PublicKeyBytes>,
) -> Result<Vec<u64>, Error> {
let pubkey_cache = self
.validator_pubkey_cache
.try_read_for(VALIDATOR_PUBKEY_CACHE_LOCK_TIMEOUT)
.ok_or(Error::ValidatorPubkeyCacheLockTimeout)?;
let pubkey_cache = self.validator_pubkey_cache.read();
validator_pubkeys
.map(|pubkey| {
@@ -1214,10 +1208,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
///
/// May return an error if acquiring a read-lock on the `validator_pubkey_cache` times out.
pub fn validator_pubkey(&self, validator_index: usize) -> Result<Option<PublicKey>, Error> {
let pubkey_cache = self
.validator_pubkey_cache
.try_read_for(VALIDATOR_PUBKEY_CACHE_LOCK_TIMEOUT)
.ok_or(Error::ValidatorPubkeyCacheLockTimeout)?;
let pubkey_cache = self.validator_pubkey_cache.read();
Ok(pubkey_cache.get(validator_index).cloned())
}
@@ -1227,11 +1218,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
&self,
validator_index: usize,
) -> Result<Option<PublicKeyBytes>, Error> {
let pubkey_cache = self
.validator_pubkey_cache
.try_read_for(VALIDATOR_PUBKEY_CACHE_LOCK_TIMEOUT)
.ok_or(Error::ValidatorPubkeyCacheLockTimeout)?;
let pubkey_cache = self.validator_pubkey_cache.read();
Ok(pubkey_cache.get_pubkey_bytes(validator_index).copied())
}
@@ -1244,10 +1231,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
&self,
validator_indices: &[usize],
) -> Result<HashMap<usize, PublicKeyBytes>, Error> {
let pubkey_cache = self
.validator_pubkey_cache
.try_read_for(VALIDATOR_PUBKEY_CACHE_LOCK_TIMEOUT)
.ok_or(Error::ValidatorPubkeyCacheLockTimeout)?;
let pubkey_cache = self.validator_pubkey_cache.read();
let mut map = HashMap::with_capacity(validator_indices.len());
for &validator_index in validator_indices {
@@ -2612,9 +2596,8 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
// used by attestation processing which will only process an attestation if the block is
// known to fork choice. This ordering ensures that the pubkey cache is always up-to-date.
self.validator_pubkey_cache
.try_write_for(VALIDATOR_PUBKEY_CACHE_LOCK_TIMEOUT)
.ok_or(Error::ValidatorPubkeyCacheLockTimeout)?
.import_new_pubkeys(&state)?;
.write()
.import_new_pubkeys(&state, &self.store)?;
// For the current and next epoch of this state, ensure we have the shuffling from this
// block in our cache.

View File

@@ -545,7 +545,7 @@ pub fn signature_verify_chain_segment<T: BeaconChainTypes>(
)?;
let pubkey_cache = get_validator_pubkey_cache(chain)?;
let mut signature_verifier = get_signature_verifier(&state, &pubkey_cache, &chain.spec);
let mut signature_verifier = get_signature_verifier::<T>(&state, &pubkey_cache, &chain.spec);
for (block_root, block) in &chain_segment {
signature_verifier.include_all_signatures(block, Some(*block_root), true)?;
@@ -936,7 +936,8 @@ impl<T: BeaconChainTypes> SignatureVerifiedBlock<T> {
let pubkey_cache = get_validator_pubkey_cache(chain)?;
let mut signature_verifier = get_signature_verifier(&state, &pubkey_cache, &chain.spec);
let mut signature_verifier =
get_signature_verifier::<T>(&state, &pubkey_cache, &chain.spec);
signature_verifier.include_all_signatures(&block, Some(block_root), true)?;
@@ -985,7 +986,8 @@ impl<T: BeaconChainTypes> SignatureVerifiedBlock<T> {
let pubkey_cache = get_validator_pubkey_cache(chain)?;
let mut signature_verifier = get_signature_verifier(&state, &pubkey_cache, &chain.spec);
let mut signature_verifier =
get_signature_verifier::<T>(&state, &pubkey_cache, &chain.spec);
signature_verifier.include_all_signatures_except_proposal(&block)?;

View File

@@ -8,7 +8,6 @@ use crate::persisted_beacon_chain::PersistedBeaconChain;
use crate::shuffling_cache::ShufflingCache;
use crate::timeout_rw_lock::TimeoutRwLock;
use crate::validator_monitor::ValidatorMonitor;
use crate::validator_pubkey_cache::ValidatorPubkeyCache;
use crate::ChainConfig;
use crate::{
BeaconChain, BeaconChainTypes, BeaconForkChoiceStore, BeaconSnapshot, Eth1Chain,
@@ -81,7 +80,6 @@ pub struct BeaconChainBuilder<T: BeaconChainTypes> {
slot_clock: Option<T::SlotClock>,
shutdown_sender: Option<Sender<ShutdownReason>>,
head_tracker: Option<HeadTracker>,
validator_pubkey_cache: Option<ValidatorPubkeyCache<T>>,
spec: ChainSpec,
chain_config: ChainConfig,
log: Option<Logger>,
@@ -122,7 +120,6 @@ where
slot_clock: None,
shutdown_sender: None,
head_tracker: None,
validator_pubkey_cache: None,
spec: TEthSpec::default_spec(),
chain_config: ChainConfig::default(),
log: None,
@@ -280,16 +277,12 @@ where
.unwrap_or_else(OperationPool::new),
);
let pubkey_cache = ValidatorPubkeyCache::load_from_store(store)
.map_err(|e| format!("Unable to open persisted pubkey cache: {:?}", e))?;
self.genesis_block_root = Some(chain.genesis_block_root);
self.genesis_state_root = Some(genesis_block.state_root());
self.head_tracker = Some(
HeadTracker::from_ssz_container(&chain.ssz_head_tracker)
.map_err(|e| format!("Failed to decode head tracker for database: {:?}", e))?,
);
self.validator_pubkey_cache = Some(pubkey_cache);
self.fork_choice = Some(fork_choice);
Ok(self)
@@ -713,10 +706,13 @@ where
));
}
let validator_pubkey_cache = self.validator_pubkey_cache.map(Ok).unwrap_or_else(|| {
ValidatorPubkeyCache::new(&head_snapshot.beacon_state, store.clone())
.map_err(|e| format!("Unable to init validator pubkey cache: {:?}", e))
})?;
let validator_pubkey_cache = store.immutable_validators.clone();
// Update pubkey cache on first start in case we have started from genesis.
validator_pubkey_cache
.write()
.import_new_pubkeys(&head_snapshot.beacon_state, &store)
.map_err(|e| format!("error initializing pubkey cache: {e:?}"))?;
let migrator_config = self.store_migrator_config.unwrap_or_default();
let store_migrator = BackgroundMigrator::new(
@@ -816,7 +812,7 @@ where
beacon_proposer_cache: <_>::default(),
block_times_cache: <_>::default(),
pre_finalization_block_cache: <_>::default(),
validator_pubkey_cache: TimeoutRwLock::new(validator_pubkey_cache),
validator_pubkey_cache,
attester_cache: <_>::default(),
early_attester_cache: <_>::default(),
shutdown_sender: self

View File

@@ -85,13 +85,10 @@ pub enum BeaconChainError {
ValidatorPubkeyCacheLockTimeout,
SnapshotCacheLockTimeout,
IncorrectStateForAttestation(RelativeEpochError),
InvalidValidatorPubkeyBytes(bls::Error),
ValidatorPubkeyCacheIncomplete(usize),
SignatureSetError(SignatureSetError),
BlockSignatureVerifierError(state_processing::block_signature_verifier::Error),
BlockReplayError(BlockReplayError),
DuplicateValidatorPublicKey,
ValidatorPubkeyCacheError(String),
ValidatorIndexUnknown(usize),
ValidatorPubkeyUnknown(PublicKeyBytes),
OpPoolError(OpPoolError),

View File

@@ -40,7 +40,6 @@ pub mod sync_committee_verification;
pub mod test_utils;
mod timeout_rw_lock;
pub mod validator_monitor;
pub mod validator_pubkey_cache;
pub use self::beacon_chain::{
AttestationProcessingOutcome, BeaconChain, BeaconChainTypes, BeaconStore, ChainSegmentResult,
@@ -70,3 +69,14 @@ pub use state_processing::per_block_processing::errors::{
pub use store;
pub use timeout_rw_lock::TimeoutRwLock;
pub use types;
// FIXME(sproul): compatibility shim
/// Keeps the `validator_pubkey_cache::ValidatorPubkeyCache<T>` path available in this crate by
/// aliasing the implementation that lives in the `store` crate.
pub mod validator_pubkey_cache {
use crate::BeaconChainTypes;
/// `store::ValidatorPubkeyCache` specialised to the `EthSpec`, `HotStore` and `ColdStore`
/// associated types of a single `BeaconChainTypes` parameter `T`.
pub type ValidatorPubkeyCache<T> = store::ValidatorPubkeyCache<
<T as BeaconChainTypes>::EthSpec,
<T as BeaconChainTypes>::HotStore,
<T as BeaconChainTypes>::ColdStore,
>;
}

View File

@@ -1,311 +0,0 @@
use crate::errors::BeaconChainError;
use crate::{BeaconChainTypes, BeaconStore};
use ssz::{Decode, Encode};
use std::collections::HashMap;
use std::convert::TryInto;
use store::{DBColumn, Error as StoreError, StoreItem};
use types::{BeaconState, Hash256, PublicKey, PublicKeyBytes};
/// Provides a mapping of `validator_index -> validator_publickey`.
///
/// This cache exists for two reasons:
///
/// 1. To avoid reading a `BeaconState` from disk each time we need a public key.
/// 2. To reduce the amount of public key _decompression_ required. A `BeaconState` stores public
/// keys in compressed form and they are needed in decompressed form for signature verification.
/// Decompression is expensive when many keys are involved.
///
/// The cache holds a `store` handle that it uses to maintain a persistent, on-disk
/// copy of itself. This allows it to be restored between process invocations.
pub struct ValidatorPubkeyCache<T: BeaconChainTypes> {
/// Public keys in `PublicKey` form; the position in the vector is the validator index.
pubkeys: Vec<PublicKey>,
/// Reverse lookup from a key's `PublicKeyBytes` form to its validator index.
indices: HashMap<PublicKeyBytes, usize>,
/// Public keys in `PublicKeyBytes` form; the position in the vector is the validator index.
pubkey_bytes: Vec<PublicKeyBytes>,
/// Database handle that every imported key is written to and that the cache is reloaded from.
store: BeaconStore<T>,
}
impl<T: BeaconChainTypes> ValidatorPubkeyCache<T> {
    /// Create a new public key cache using the keys in `state.validators`.
    ///
    /// Every imported key is also written to `store`, which allows the cache to be rebuilt
    /// later with [`Self::load_from_store`].
    ///
    /// # Errors
    ///
    /// Returns an error if any key in `state` cannot be imported (see
    /// [`Self::import_new_pubkeys`]).
    pub fn new(
        state: &BeaconState<T::EthSpec>,
        store: BeaconStore<T>,
    ) -> Result<Self, BeaconChainError> {
        let mut cache = Self {
            pubkeys: vec![],
            indices: HashMap::new(),
            pubkey_bytes: vec![],
            store,
        };

        cache.import_new_pubkeys(state)?;

        Ok(cache)
    }

    /// Load the pubkey cache from the given on-disk database.
    ///
    /// Keys are read for validator indices `0, 1, 2, ...` and loading stops at the first index
    /// that has no entry in `store`.
    ///
    /// # Errors
    ///
    /// Returns an error if a database read fails, or
    /// `BeaconChainError::ValidatorPubkeyCacheError` if a stored key cannot be converted into a
    /// `PublicKey`.
    pub fn load_from_store(store: BeaconStore<T>) -> Result<Self, BeaconChainError> {
        let mut pubkeys = vec![];
        let mut indices = HashMap::new();
        let mut pubkey_bytes = vec![];

        for validator_index in 0.. {
            if let Some(DatabasePubkey(pubkey)) =
                store.get_item(&DatabasePubkey::key_for_index(validator_index))?
            {
                pubkeys.push((&pubkey).try_into().map_err(|e| {
                    BeaconChainError::ValidatorPubkeyCacheError(format!("{:?}", e))
                })?);
                pubkey_bytes.push(pubkey);
                indices.insert(pubkey, validator_index);
            } else {
                // The first missing index marks the end of the persisted cache.
                break;
            }
        }

        Ok(ValidatorPubkeyCache {
            pubkeys,
            indices,
            pubkey_bytes,
            store,
        })
    }

    /// Scan the given `state` and add any new validator public keys.
    ///
    /// Only validators at indices `self.pubkeys.len()..` are considered new. Does not delete
    /// any keys from `self` if they don't appear in `state`.
    ///
    /// # Errors
    ///
    /// Returns `BeaconChainError::ValidatorPubkeyCacheError` if the validator list cannot be
    /// iterated from the first unknown index, and otherwise any error produced while importing
    /// the new keys (duplicate key, invalid key bytes or a failed database write).
    pub fn import_new_pubkeys(
        &mut self,
        state: &BeaconState<T::EthSpec>,
    ) -> Result<(), BeaconChainError> {
        let known = self.pubkeys.len();

        if state.validators().len() <= known {
            return Ok(());
        }

        // Surface an iteration failure as an error rather than panicking: this function
        // already returns a `Result`, so callers are prepared to handle it.
        let new_keys = state
            .validators()
            .iter_from(known)
            .map_err(|e| BeaconChainError::ValidatorPubkeyCacheError(format!("{:?}", e)))?
            .map(|v| *v.pubkey());

        self.import(new_keys)
    }

    /// Adds zero or more validators to `self`.
    ///
    /// Keys are assigned consecutive indices starting at the current length of the cache.
    ///
    /// # Errors
    ///
    /// - `BeaconChainError::DuplicateValidatorPublicKey` if a key is already in the cache.
    /// - `BeaconChainError::InvalidValidatorPubkeyBytes` if a key cannot be converted into a
    ///   `PublicKey`.
    /// - Any error returned by the database write.
    ///
    /// Keys imported before the failing one remain in the cache.
    fn import<I>(&mut self, validator_keys: I) -> Result<(), BeaconChainError>
    where
        I: Iterator<Item = PublicKeyBytes> + ExactSizeIterator,
    {
        let additional = validator_keys.len();
        self.pubkey_bytes.reserve(additional);
        self.pubkeys.reserve(additional);
        self.indices.reserve(additional);

        for pubkey in validator_keys {
            let i = self.pubkeys.len();

            if self.indices.contains_key(&pubkey) {
                return Err(BeaconChainError::DuplicateValidatorPublicKey);
            }

            // The item is written to disk _before_ it is written into
            // the local struct.
            //
            // This means that a pubkey cache read from disk will always be equivalent to or
            // _later than_ the cache that was running in the previous instance of Lighthouse.
            //
            // The motivation behind this ordering is that we do not want to have states that
            // reference a pubkey that is not in our cache. However, it's fine to have pubkeys
            // that are never referenced in a state.
            self.store
                .put_item(&DatabasePubkey::key_for_index(i), &DatabasePubkey(pubkey))?;

            self.pubkeys.push(
                (&pubkey)
                    .try_into()
                    .map_err(BeaconChainError::InvalidValidatorPubkeyBytes)?,
            );
            self.pubkey_bytes.push(pubkey);
            self.indices.insert(pubkey, i);
        }

        Ok(())
    }

    /// Get the public key for a validator with index `i`.
    ///
    /// Returns `None` if `i` is not in the cache.
    pub fn get(&self, i: usize) -> Option<&PublicKey> {
        self.pubkeys.get(i)
    }

    /// Get the `PublicKey` for a validator with `PublicKeyBytes`.
    ///
    /// Returns `None` if `pubkey` is not in the cache.
    pub fn get_pubkey_from_pubkey_bytes(&self, pubkey: &PublicKeyBytes) -> Option<&PublicKey> {
        self.get_index(pubkey).and_then(|index| self.get(index))
    }

    /// Get the public key (in bytes form) for a validator with index `i`.
    ///
    /// Returns `None` if `i` is not in the cache.
    pub fn get_pubkey_bytes(&self, i: usize) -> Option<&PublicKeyBytes> {
        self.pubkey_bytes.get(i)
    }

    /// Get the index of a validator with `pubkey`.
    ///
    /// Returns `None` if `pubkey` is not in the cache.
    pub fn get_index(&self, pubkey: &PublicKeyBytes) -> Option<usize> {
        self.indices.get(pubkey).copied()
    }

    /// Returns the number of validators in the cache.
    pub fn len(&self) -> usize {
        self.indices.len()
    }

    /// Returns `true` if there are no validators in the cache.
    pub fn is_empty(&self) -> bool {
        self.indices.is_empty()
    }
}
/// Wrapper for a public key stored in the database.
///
/// Keyed by the validator index as `Hash256::from_low_u64_be(index)`, stored in the
/// `DBColumn::PubkeyCache` column, and serialized as the SSZ bytes of the wrapped
/// `PublicKeyBytes`.
struct DatabasePubkey(PublicKeyBytes);
impl StoreItem for DatabasePubkey {
fn db_column() -> DBColumn {
DBColumn::PubkeyCache
}
fn as_store_bytes(&self) -> Result<Vec<u8>, StoreError> {
Ok(self.0.as_ssz_bytes())
}
fn from_store_bytes(bytes: &[u8]) -> Result<Self, StoreError> {
Ok(Self(PublicKeyBytes::from_ssz_bytes(bytes)?))
}
}
impl DatabasePubkey {
    /// Database key under which the public key of validator `index` is stored.
    fn key_for_index(index: usize) -> Hash256 {
        let low_bits = index as u64;
        Hash256::from_low_u64_be(low_bits)
    }
}
#[cfg(test)]
mod test {
    use super::*;
    use crate::test_utils::{BeaconChainHarness, EphemeralHarnessType};
    use logging::test_logger;
    use std::sync::Arc;
    use store::HotColdDB;
    use types::{BeaconState, EthSpec, Keypair, MainnetEthSpec};

    type E = MainnetEthSpec;
    type T = EphemeralHarnessType<E>;

    /// Build a harness with `validator_count` deterministic validators, advance it by one slot
    /// and return its current state together with the validator keypairs.
    fn get_state(validator_count: usize) -> (BeaconState<E>, Vec<Keypair>) {
        let harness = BeaconChainHarness::builder(MainnetEthSpec)
            .default_spec()
            .deterministic_keypairs(validator_count)
            .fresh_ephemeral_store()
            .build();

        harness.advance_slot();

        let state = harness.get_current_state();
        (state, harness.validator_keypairs)
    }

    /// Open a fresh ephemeral database for the cache to persist into.
    fn get_store() -> BeaconStore<T> {
        let db =
            HotColdDB::open_ephemeral(<_>::default(), E::default_spec(), test_logger()).unwrap();
        Arc::new(db)
    }

    /// Assert that `cache` resolves every keypair by index and by pubkey bytes, and that the
    /// index one past the end is absent.
    fn check_cache_get(cache: &ValidatorPubkeyCache<T>, keypairs: &[Keypair]) {
        for (i, keypair) in keypairs.iter().enumerate() {
            let pubkey = cache.get(i).expect("pubkey should be present");
            assert_eq!(pubkey, &keypair.pk, "pubkey should match cache");

            let pubkey_bytes: PublicKeyBytes = pubkey.clone().into();
            let resolved_index = cache
                .get_index(&pubkey_bytes)
                .expect("should resolve index");

            assert_eq!(i, resolved_index, "index should match cache");
        }

        assert_eq!(
            cache.get(keypairs.len()),
            None,
            "should not get pubkey for out of bounds index",
        );
    }

    #[test]
    fn basic_operation() {
        let (initial_state, initial_keypairs) = get_state(8);

        let store = get_store();

        let mut cache =
            ValidatorPubkeyCache::new(&initial_state, store).expect("should create cache");

        check_cache_get(&cache, &initial_keypairs);

        // Try adding a state with the same number of keypairs.
        let (same_size_state, same_size_keypairs) = get_state(8);
        cache
            .import_new_pubkeys(&same_size_state)
            .expect("should import pubkeys");
        check_cache_get(&cache, &same_size_keypairs);

        // Try adding a state with less keypairs.
        let (smaller_state, _) = get_state(1);
        cache
            .import_new_pubkeys(&smaller_state)
            .expect("should import pubkeys");
        check_cache_get(&cache, &same_size_keypairs);

        // Try adding a state with more keypairs.
        let (larger_state, larger_keypairs) = get_state(12);
        cache
            .import_new_pubkeys(&larger_state)
            .expect("should import pubkeys");
        check_cache_get(&cache, &larger_keypairs);
    }

    #[test]
    fn persistence() {
        let (initial_state, initial_keypairs) = get_state(8);

        let store = get_store();

        // Create a new cache.
        let fresh_cache = ValidatorPubkeyCache::new(&initial_state, store.clone())
            .expect("should create cache");
        check_cache_get(&fresh_cache, &initial_keypairs);
        drop(fresh_cache);

        // Re-init the cache from the store.
        let mut reloaded_cache =
            ValidatorPubkeyCache::load_from_store(store.clone()).expect("should open cache");
        check_cache_get(&reloaded_cache, &initial_keypairs);

        // Add some more keypairs.
        let (larger_state, larger_keypairs) = get_state(12);
        reloaded_cache
            .import_new_pubkeys(&larger_state)
            .expect("should import pubkeys");
        check_cache_get(&reloaded_cache, &larger_keypairs);
        drop(reloaded_cache);

        // Re-init the cache from the store.
        let final_cache =
            ValidatorPubkeyCache::load_from_store(store).expect("should open cache");
        check_cache_get(&final_cache, &larger_keypairs);
    }
}