mirror of
https://github.com/sigp/lighthouse.git
synced 2026-04-18 21:38:31 +00:00
Use global pubkey cache for block processing (#849)
* Start updating types * WIP * Signature hacking * Existing EF tests passing with fake_crypto * Updates * Delete outdated API spec * The refactor continues * It compiles * WIP test fixes * All release tests passing bar genesis state parsing * Update and test YamlConfig * Update to spec v0.10 compatible BLS * Updates to BLS EF tests * Add EF test for AggregateVerify And delete unused hash2curve tests for uncompressed points * Update EF tests to v0.10.1 * Use optional block root correctly in block proc * Use genesis fork in deposit domain. All tests pass * Cargo fmt * Fast aggregate verify test * Update REST API docs * Cargo fmt * Fix unused import * Bump spec tags to v0.10.1 * Add `seconds_per_eth1_block` to chainspec * Update to timestamp based eth1 voting scheme * Return None from `get_votes_to_consider` if block cache is empty * Handle overflows in `is_candidate_block` * Revert to failing tests * Fix eth1 data sets test * Choose default vote according to spec * Fix collect_valid_votes tests * Fix `get_votes_to_consider` to choose all eligible blocks * Uncomment winning_vote tests * Add comments; remove unused code * Reduce seconds_per_eth1_block for simulation * Addressed review comments * Add test for default vote case * Fix logs * Remove unused functions * Meter default eth1 votes * Fix comments * Address review comments; remove unused dependency * Add first attempt at attestation proc. re-write * Add version 2 of attestation processing * Minor fixes * Add validator pubkey cache * Make get_indexed_attestation take a committee * Link signature processing into new attn verification * First working version * Ensure pubkey cache is updated * Add more metrics, slight optimizations * Clone committee cache during attestation processing * Update shuffling cache during block processing * Remove old commented-out code * Fix shuffling cache insert bug * Used indexed attestation in fork choice * Restructure attn processing, add metrics * Add more detailed metrics * Tidy, fix failing tests * Fix failing tests, tidy * Disable/delete two outdated tests * Add new Pubkeys struct to signature_sets * Refactor with functional approach * Update beacon chain * Remove decompressed member from pubkey bytes * Add hashmap for indices lookup * Change `get_attesting_indices` to use Vec * Fix failing test * Tidy * Add pubkey cache persistence file * Add more comments * Integrate persistence file into builder * Add pubkey cache tests * Add data_dir to beacon chain builder * Remove Option in pubkey cache persistence file * Ensure consistency between datadir/data_dir * Fix failing network test * Tidy * Fix todos * Improve tests * Split up block processing metrics * Tidy * Refactor get_pubkey_from_state * Remove commented-out code * Add BeaconChain::validator_pubkey * Use Option::filter * Remove Box * Comment out tests that fail due to hard-coded Co-authored-by: Michael Sproul <michael@sigmaprime.io> Co-authored-by: Michael Sproul <micsproul@gmail.com> Co-authored-by: pawan <pawandhananjay@gmail.com>
This commit is contained in:
@@ -21,7 +21,7 @@ use state_processing::per_block_processing::errors::{
|
||||
use state_processing::{
|
||||
common::get_indexed_attestation, per_block_processing, per_slot_processing,
|
||||
signature_sets::indexed_attestation_signature_set_from_pubkeys, BlockProcessingError,
|
||||
BlockSignatureStrategy,
|
||||
BlockSignatureStrategy, BlockSignatureVerifier,
|
||||
};
|
||||
use std::borrow::Cow;
|
||||
use std::cmp::Ordering;
|
||||
@@ -74,7 +74,10 @@ pub const FORK_CHOICE_DB_KEY: [u8; 32] = [0; 32];
|
||||
#[derive(Debug, PartialEq)]
|
||||
pub enum BlockProcessingOutcome {
|
||||
/// Block was valid and imported into the block graph.
|
||||
Processed { block_root: Hash256 },
|
||||
Processed {
|
||||
block_root: Hash256,
|
||||
},
|
||||
InvalidSignature,
|
||||
/// The parent block was unknown.
|
||||
ParentUnknown {
|
||||
parent: Hash256,
|
||||
@@ -86,7 +89,10 @@ pub enum BlockProcessingOutcome {
|
||||
block_slot: Slot,
|
||||
},
|
||||
/// The block state_root does not match the generated state.
|
||||
StateRootMismatch { block: Hash256, local: Hash256 },
|
||||
StateRootMismatch {
|
||||
block: Hash256,
|
||||
local: Hash256,
|
||||
},
|
||||
/// The block was a genesis block, these blocks cannot be re-imported.
|
||||
GenesisBlock,
|
||||
/// The slot is finalized, no need to import.
|
||||
@@ -594,14 +600,44 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
|
||||
|
||||
/// Returns the validator index (if any) for the given public key.
|
||||
///
|
||||
/// Information is retrieved from the present `beacon_state.validators`.
|
||||
/// ## Notes
|
||||
///
|
||||
/// This query uses the `validator_pubkey_cache` which contains _all_ validators ever seen,
|
||||
/// even if those validators aren't included in the head state. It is important to remember
|
||||
/// that just because a validator exists here, it doesn't necessarily exist in all
|
||||
/// `BeaconStates`.
|
||||
///
|
||||
/// ## Errors
|
||||
///
|
||||
/// May return an error if acquiring a read-lock on the `validator_pubkey_cache` times out.
|
||||
pub fn validator_index(&self, pubkey: &PublicKeyBytes) -> Result<Option<usize>, Error> {
|
||||
for (i, validator) in self.head()?.beacon_state.validators.iter().enumerate() {
|
||||
if validator.pubkey == *pubkey {
|
||||
return Ok(Some(i));
|
||||
}
|
||||
}
|
||||
Ok(None)
|
||||
let pubkey_cache = self
|
||||
.validator_pubkey_cache
|
||||
.try_read_for(VALIDATOR_PUBKEY_CACHE_LOCK_TIMEOUT)
|
||||
.ok_or_else(|| Error::ValidatorPubkeyCacheLockTimeout)?;
|
||||
|
||||
Ok(pubkey_cache.get_index(pubkey))
|
||||
}
|
||||
|
||||
/// Returns the validator pubkey (if any) for the given validator index.
|
||||
///
|
||||
/// ## Notes
|
||||
///
|
||||
/// This query uses the `validator_pubkey_cache` which contains _all_ validators ever seen,
|
||||
/// even if those validators aren't included in the head state. It is important to remember
|
||||
/// that just because a validator exists here, it doesn't necessarily exist in all
|
||||
/// `BeaconStates`.
|
||||
///
|
||||
/// ## Errors
|
||||
///
|
||||
/// May return an error if acquiring a read-lock on the `validator_pubkey_cache` times out.
|
||||
pub fn validator_pubkey(&self, validator_index: usize) -> Result<Option<PublicKey>, Error> {
|
||||
let pubkey_cache = self
|
||||
.validator_pubkey_cache
|
||||
.try_read_for(VALIDATOR_PUBKEY_CACHE_LOCK_TIMEOUT)
|
||||
.ok_or_else(|| Error::ValidatorPubkeyCacheLockTimeout)?;
|
||||
|
||||
Ok(pubkey_cache.get(validator_index).cloned())
|
||||
}
|
||||
|
||||
/// Returns the block canonical root of the current canonical chain at a given slot.
|
||||
@@ -1007,16 +1043,6 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
|
||||
.try_read_for(VALIDATOR_PUBKEY_CACHE_LOCK_TIMEOUT)
|
||||
.ok_or_else(|| Error::ValidatorPubkeyCacheLockTimeout)?;
|
||||
|
||||
let pubkeys = indexed_attestation
|
||||
.attesting_indices
|
||||
.iter()
|
||||
.map(|i| {
|
||||
pubkey_cache
|
||||
.get(*i as usize)
|
||||
.ok_or_else(|| Error::ValidatorPubkeyCacheIncomplete(*i as usize))
|
||||
})
|
||||
.collect::<Result<Vec<&PublicKey>, Error>>()?;
|
||||
|
||||
let (fork, genesis_validators_root) = self
|
||||
.canonical_head
|
||||
.try_read_for(HEAD_LOCK_TIMEOUT)
|
||||
@@ -1029,7 +1055,11 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
|
||||
})?;
|
||||
|
||||
let signature_set = indexed_attestation_signature_set_from_pubkeys(
|
||||
pubkeys,
|
||||
|validator_index| {
|
||||
pubkey_cache
|
||||
.get(validator_index)
|
||||
.map(|pk| Cow::Borrowed(pk.as_point()))
|
||||
},
|
||||
&attestation.signature,
|
||||
&indexed_attestation,
|
||||
&fork,
|
||||
@@ -1047,6 +1077,8 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
|
||||
|
||||
metrics::stop_timer(signature_verification_timer);
|
||||
|
||||
drop(pubkey_cache);
|
||||
|
||||
if signature_is_valid {
|
||||
// Provide the attestation to fork choice, updating the validator latest messages but
|
||||
// _without_ finding and updating the head.
|
||||
@@ -1364,6 +1396,39 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
|
||||
&self.log,
|
||||
);
|
||||
|
||||
let signature_timer = metrics::start_timer(&metrics::BLOCK_PROCESSING_SIGNATURE);
|
||||
|
||||
let signature_verification_result = {
|
||||
let validator_pubkey_cache = self
|
||||
.validator_pubkey_cache
|
||||
.try_write_for(VALIDATOR_PUBKEY_CACHE_LOCK_TIMEOUT)
|
||||
.ok_or_else(|| Error::ValidatorPubkeyCacheLockTimeout)?;
|
||||
|
||||
BlockSignatureVerifier::verify_entire_block(
|
||||
&state,
|
||||
|validator_index| {
|
||||
// Disallow access to any validator pubkeys that are not in the current beacon
|
||||
// state.
|
||||
if validator_index < state.validators.len() {
|
||||
validator_pubkey_cache
|
||||
.get(validator_index)
|
||||
.map(|pk| Cow::Borrowed(pk.as_point()))
|
||||
} else {
|
||||
None
|
||||
}
|
||||
},
|
||||
&signed_block,
|
||||
Some(block_root),
|
||||
&self.spec,
|
||||
)
|
||||
};
|
||||
|
||||
if signature_verification_result.is_err() {
|
||||
return Ok(BlockProcessingOutcome::InvalidSignature);
|
||||
}
|
||||
|
||||
metrics::stop_timer(signature_timer);
|
||||
|
||||
let core_timer = metrics::start_timer(&metrics::BLOCK_PROCESSING_CORE);
|
||||
|
||||
// Apply the received block to its parent state (which has been transitioned into this
|
||||
@@ -1372,7 +1437,8 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
|
||||
&mut state,
|
||||
&signed_block,
|
||||
Some(block_root),
|
||||
BlockSignatureStrategy::VerifyBulk,
|
||||
// Signatures were verified earlier in this function.
|
||||
BlockSignatureStrategy::NoVerification,
|
||||
&self.spec,
|
||||
) {
|
||||
Err(BlockProcessingError::BeaconStateError(e)) => {
|
||||
|
||||
@@ -58,6 +58,8 @@ pub enum BeaconChainError {
|
||||
InvalidValidatorPubkeyBytes(DecodeError),
|
||||
ValidatorPubkeyCacheIncomplete(usize),
|
||||
SignatureSetError(state_processing::signature_sets::Error),
|
||||
BlockSignatureVerifierError(state_processing::block_signature_verifier::Error),
|
||||
DuplicateValidatorPublicKey,
|
||||
ValidatorPubkeyCacheFileError(String),
|
||||
}
|
||||
|
||||
|
||||
@@ -32,6 +32,10 @@ lazy_static! {
|
||||
"beacon_block_processing_committee_building_seconds",
|
||||
"Time spent building/obtaining committees for block processing."
|
||||
);
|
||||
pub static ref BLOCK_PROCESSING_SIGNATURE: Result<Histogram> = try_create_histogram(
|
||||
"beacon_block_processing_signature_seconds",
|
||||
"Time spent doing signature verification for a block."
|
||||
);
|
||||
pub static ref BLOCK_PROCESSING_CORE: Result<Histogram> = try_create_histogram(
|
||||
"beacon_block_processing_core_seconds",
|
||||
"Time spent doing the core per_block_processing state processing."
|
||||
|
||||
@@ -1,10 +1,11 @@
|
||||
use crate::errors::BeaconChainError;
|
||||
use ssz::{Decode, DecodeError, Encode};
|
||||
use std::collections::HashMap;
|
||||
use std::convert::TryInto;
|
||||
use std::fs::{File, OpenOptions};
|
||||
use std::io::{self, Read, Write};
|
||||
use std::path::Path;
|
||||
use types::{BeaconState, EthSpec, PublicKey, PublicKeyBytes};
|
||||
use types::{BeaconState, EthSpec, PublicKey, PublicKeyBytes, Validator};
|
||||
|
||||
/// Provides a mapping of `validator_index -> validator_publickey`.
|
||||
///
|
||||
@@ -19,6 +20,7 @@ use types::{BeaconState, EthSpec, PublicKey, PublicKeyBytes};
|
||||
/// copy of itself. This allows it to be restored between process invocations.
|
||||
pub struct ValidatorPubkeyCache {
|
||||
pubkeys: Vec<PublicKey>,
|
||||
indices: HashMap<PublicKeyBytes, usize>,
|
||||
persitence_file: ValidatorPubkeyCacheFile,
|
||||
}
|
||||
|
||||
@@ -47,6 +49,7 @@ impl ValidatorPubkeyCache {
|
||||
let mut cache = Self {
|
||||
persitence_file: ValidatorPubkeyCacheFile::create(persistence_path)?,
|
||||
pubkeys: vec![],
|
||||
indices: HashMap::new(),
|
||||
};
|
||||
|
||||
cache.import_new_pubkeys(state)?;
|
||||
@@ -61,38 +64,57 @@ impl ValidatorPubkeyCache {
|
||||
&mut self,
|
||||
state: &BeaconState<T>,
|
||||
) -> Result<(), BeaconChainError> {
|
||||
state
|
||||
.validators
|
||||
.iter()
|
||||
.skip(self.pubkeys.len())
|
||||
.try_for_each(|v| {
|
||||
let i = self.pubkeys.len();
|
||||
if state.validators.len() > self.pubkeys.len() {
|
||||
self.import(&state.validators[self.pubkeys.len()..])
|
||||
} else {
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
// The item is written to disk (the persistence file) _before_ it is written into
|
||||
// the local struct.
|
||||
//
|
||||
// This means that a pubkey cache read from disk will always be equivalent to or
|
||||
// _later than_ the cache that was running in the previous instance of Lighthouse.
|
||||
//
|
||||
// The motivation behind this ordering is that we do not want to have states that
|
||||
// reference a pubkey that is not in our cache. However, it's fine to have pubkeys
|
||||
// that are never referenced in a state.
|
||||
self.persitence_file.append(i, &v.pubkey)?;
|
||||
/// Adds zero or more validators to `self`.
|
||||
fn import(&mut self, validators: &[Validator]) -> Result<(), BeaconChainError> {
|
||||
self.pubkeys.reserve(validators.len());
|
||||
self.indices.reserve(validators.len());
|
||||
|
||||
self.pubkeys.push(
|
||||
(&v.pubkey)
|
||||
.try_into()
|
||||
.map_err(BeaconChainError::InvalidValidatorPubkeyBytes)?,
|
||||
);
|
||||
for v in validators.iter() {
|
||||
let i = self.pubkeys.len();
|
||||
|
||||
Ok(())
|
||||
})
|
||||
if self.indices.contains_key(&v.pubkey) {
|
||||
return Err(BeaconChainError::DuplicateValidatorPublicKey);
|
||||
}
|
||||
|
||||
// The item is written to disk (the persistence file) _before_ it is written into
|
||||
// the local struct.
|
||||
//
|
||||
// This means that a pubkey cache read from disk will always be equivalent to or
|
||||
// _later than_ the cache that was running in the previous instance of Lighthouse.
|
||||
//
|
||||
// The motivation behind this ordering is that we do not want to have states that
|
||||
// reference a pubkey that is not in our cache. However, it's fine to have pubkeys
|
||||
// that are never referenced in a state.
|
||||
self.persitence_file.append(i, &v.pubkey)?;
|
||||
|
||||
self.pubkeys.push(
|
||||
(&v.pubkey)
|
||||
.try_into()
|
||||
.map_err(BeaconChainError::InvalidValidatorPubkeyBytes)?,
|
||||
);
|
||||
|
||||
self.indices.insert(v.pubkey.clone(), i);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Get the public key for a validator with index `i`.
|
||||
pub fn get(&self, i: usize) -> Option<&PublicKey> {
|
||||
self.pubkeys.get(i)
|
||||
}
|
||||
|
||||
/// Get the index of a validator with `pubkey`.
|
||||
pub fn get_index(&self, pubkey: &PublicKeyBytes) -> Option<usize> {
|
||||
self.indices.get(pubkey).copied()
|
||||
}
|
||||
}
|
||||
|
||||
/// Allows for maintaining an on-disk copy of the `ValidatorPubkeyCache`. The file is raw SSZ bytes
|
||||
@@ -168,12 +190,14 @@ impl ValidatorPubkeyCacheFile {
|
||||
|
||||
let mut last = None;
|
||||
let mut pubkeys = Vec::with_capacity(list.len());
|
||||
let mut indices = HashMap::new();
|
||||
|
||||
for (index, pubkey) in list {
|
||||
let expected = last.map(|n| n + 1);
|
||||
if expected.map_or(true, |expected| index == expected) {
|
||||
last = Some(index);
|
||||
pubkeys.push((&pubkey).try_into().map_err(Error::SszError)?);
|
||||
indices.insert(pubkey, index);
|
||||
} else {
|
||||
return Err(Error::InconsistentIndex {
|
||||
expected,
|
||||
@@ -184,6 +208,7 @@ impl ValidatorPubkeyCacheFile {
|
||||
|
||||
Ok(ValidatorPubkeyCache {
|
||||
pubkeys,
|
||||
indices,
|
||||
persitence_file: self,
|
||||
})
|
||||
}
|
||||
@@ -221,6 +246,16 @@ mod test {
|
||||
if i < validator_count {
|
||||
let pubkey = cache.get(i).expect("pubkey should be present");
|
||||
assert_eq!(pubkey, &keypairs[i].pk, "pubkey should match cache");
|
||||
|
||||
let pubkey_bytes: PublicKeyBytes = pubkey.clone().into();
|
||||
|
||||
assert_eq!(
|
||||
i,
|
||||
cache
|
||||
.get_index(&pubkey_bytes)
|
||||
.expect("should resolve index"),
|
||||
"index should match cache"
|
||||
);
|
||||
} else {
|
||||
assert_eq!(
|
||||
cache.get(i),
|
||||
|
||||
@@ -189,11 +189,17 @@ fn return_validator_duties<T: BeaconChainTypes>(
|
||||
validator_pubkeys
|
||||
.into_iter()
|
||||
.map(|validator_pubkey| {
|
||||
if let Some(validator_index) =
|
||||
state.get_validator_index(&validator_pubkey).map_err(|e| {
|
||||
ApiError::ServerError(format!("Unable to read pubkey cache: {:?}", e))
|
||||
// The `beacon_chain` can return a validator index that does not exist in all states.
|
||||
// Therefore, we must check to ensure that the validator index is valid for our
|
||||
// `state`.
|
||||
let validator_index = beacon_chain
|
||||
.validator_index(&validator_pubkey)
|
||||
.map_err(|e| {
|
||||
ApiError::ServerError(format!("Unable to get validator index: {:?}", e))
|
||||
})?
|
||||
{
|
||||
.filter(|i| *i < state.validators.len());
|
||||
|
||||
if let Some(validator_index) = validator_index {
|
||||
let duties = state
|
||||
.get_attestation_duties(validator_index, relative_epoch)
|
||||
.map_err(|e| {
|
||||
|
||||
Reference in New Issue
Block a user