mirror of
https://github.com/sigp/lighthouse.git
synced 2026-03-02 16:21:42 +00:00
Delete attester cache (#8469)
Fixes attester cache write lock contention. Alternative to #8463. Co-Authored-By: Jimmy Chen <jchen.tc@gmail.com>
This commit is contained in:
@@ -1,385 +0,0 @@
|
||||
//! This module provides the `AttesterCache`, a cache designed for reducing state-reads when
|
||||
//! validators produce `AttestationData`.
|
||||
//!
|
||||
//! This cache is required *as well as* the `ShufflingCache` since the `ShufflingCache` does not
|
||||
//! provide any information about the `state.current_justified_checkpoint`. It is not trivial to add
|
||||
//! the justified checkpoint to the `ShufflingCache` since that cache is keyed by shuffling decision
|
||||
//! root, which is not suitable for the justified checkpoint. Whilst we can know the shuffling for
|
||||
//! epoch `n` during `n - 1`, we *cannot* know the justified checkpoint. Instead, we *must* perform
|
||||
//! `per_epoch_processing` to transform the state from epoch `n - 1` to epoch `n` so that rewards
|
||||
//! and penalties can be computed and the `state.current_justified_checkpoint` can be updated.
|
||||
|
||||
use crate::{BeaconChain, BeaconChainError, BeaconChainTypes};
|
||||
use fixed_bytes::FixedBytesExtended;
|
||||
use parking_lot::RwLock;
|
||||
use state_processing::state_advance::{Error as StateAdvanceError, partial_state_advance};
|
||||
use std::collections::HashMap;
|
||||
use std::ops::Range;
|
||||
use types::{
|
||||
BeaconState, BeaconStateError, ChainSpec, Checkpoint, Epoch, EthSpec, Hash256, RelativeEpoch,
|
||||
Slot,
|
||||
attestation::AttestationError,
|
||||
beacon_state::{
|
||||
compute_committee_index_in_epoch, compute_committee_range_in_epoch, epoch_committee_count,
|
||||
},
|
||||
};
|
||||
|
||||
type JustifiedCheckpoint = Checkpoint;
|
||||
type CommitteeLength = usize;
|
||||
type CommitteeIndex = u64;
|
||||
type CacheHashMap = HashMap<AttesterCacheKey, AttesterCacheValue>;
|
||||
|
||||
/// The maximum number of `AttesterCacheValues` to be kept in memory.
|
||||
///
|
||||
/// Each `AttesterCacheValues` is very small (~16 bytes) and the cache will generally be kept small
|
||||
/// by pruning on finality.
|
||||
///
|
||||
/// The value provided here is much larger than will be used during ideal network conditions,
|
||||
/// however we make it large since the values are so small.
|
||||
const MAX_CACHE_LEN: usize = 1_024;
|
||||
|
||||
#[derive(Debug)]
|
||||
pub enum Error {
|
||||
BeaconState(BeaconStateError),
|
||||
// Boxed to avoid an infinite-size recursion issue.
|
||||
BeaconChain(Box<BeaconChainError>),
|
||||
MissingBeaconState(Hash256),
|
||||
FailedToTransitionState(StateAdvanceError),
|
||||
CannotAttestToFutureState {
|
||||
state_slot: Slot,
|
||||
request_slot: Slot,
|
||||
},
|
||||
/// Indicates a cache inconsistency.
|
||||
WrongEpoch {
|
||||
request_epoch: Epoch,
|
||||
epoch: Epoch,
|
||||
},
|
||||
InvalidCommitteeIndex {
|
||||
committee_index: u64,
|
||||
},
|
||||
/// Indicates an inconsistency with the beacon state committees.
|
||||
InverseRange {
|
||||
range: Range<usize>,
|
||||
},
|
||||
AttestationError(AttestationError),
|
||||
}
|
||||
|
||||
impl From<BeaconStateError> for Error {
|
||||
fn from(e: BeaconStateError) -> Self {
|
||||
Error::BeaconState(e)
|
||||
}
|
||||
}
|
||||
|
||||
impl From<BeaconChainError> for Error {
|
||||
fn from(e: BeaconChainError) -> Self {
|
||||
Error::BeaconChain(Box::new(e))
|
||||
}
|
||||
}
|
||||
|
||||
/// Stores the minimal amount of data required to compute the committee length for any committee at any
|
||||
/// slot in a given `epoch`.
|
||||
pub struct CommitteeLengths {
|
||||
/// The `epoch` to which the lengths pertain.
|
||||
epoch: Epoch,
|
||||
/// The length of the shuffling in `self.epoch`.
|
||||
active_validator_indices_len: usize,
|
||||
}
|
||||
|
||||
impl CommitteeLengths {
|
||||
/// Instantiate `Self` using `state.current_epoch()`.
|
||||
pub fn new<E: EthSpec>(state: &BeaconState<E>, spec: &ChainSpec) -> Result<Self, Error> {
|
||||
let active_validator_indices_len = if let Ok(committee_cache) =
|
||||
state.committee_cache(RelativeEpoch::Current)
|
||||
{
|
||||
committee_cache.active_validator_indices().len()
|
||||
} else {
|
||||
// Building the cache like this avoids taking a mutable reference to `BeaconState`.
|
||||
let committee_cache = state.initialize_committee_cache(state.current_epoch(), spec)?;
|
||||
committee_cache.active_validator_indices().len()
|
||||
};
|
||||
|
||||
Ok(Self {
|
||||
epoch: state.current_epoch(),
|
||||
active_validator_indices_len,
|
||||
})
|
||||
}
|
||||
|
||||
/// Get the count of committees per each slot of `self.epoch`.
|
||||
pub fn get_committee_count_per_slot<E: EthSpec>(
|
||||
&self,
|
||||
spec: &ChainSpec,
|
||||
) -> Result<usize, Error> {
|
||||
E::get_committee_count_per_slot(self.active_validator_indices_len, spec).map_err(Into::into)
|
||||
}
|
||||
|
||||
/// Get the length of the committee at the given `slot` and `committee_index`.
|
||||
pub fn get_committee_length<E: EthSpec>(
|
||||
&self,
|
||||
slot: Slot,
|
||||
committee_index: CommitteeIndex,
|
||||
spec: &ChainSpec,
|
||||
) -> Result<CommitteeLength, Error> {
|
||||
let slots_per_epoch = E::slots_per_epoch();
|
||||
let request_epoch = slot.epoch(slots_per_epoch);
|
||||
|
||||
// Sanity check.
|
||||
if request_epoch != self.epoch {
|
||||
return Err(Error::WrongEpoch {
|
||||
request_epoch,
|
||||
epoch: self.epoch,
|
||||
});
|
||||
}
|
||||
|
||||
let slots_per_epoch = slots_per_epoch as usize;
|
||||
let committees_per_slot = self.get_committee_count_per_slot::<E>(spec)?;
|
||||
let index_in_epoch = compute_committee_index_in_epoch(
|
||||
slot,
|
||||
slots_per_epoch,
|
||||
committees_per_slot,
|
||||
committee_index as usize,
|
||||
);
|
||||
let range = compute_committee_range_in_epoch(
|
||||
epoch_committee_count(committees_per_slot, slots_per_epoch),
|
||||
index_in_epoch,
|
||||
self.active_validator_indices_len,
|
||||
)
|
||||
.ok_or(Error::InvalidCommitteeIndex { committee_index })?;
|
||||
|
||||
range
|
||||
.end
|
||||
.checked_sub(range.start)
|
||||
.ok_or(Error::InverseRange { range })
|
||||
}
|
||||
}
|
||||
|
||||
/// Provides the following information for some epoch:
|
||||
///
|
||||
/// - The `state.current_justified_checkpoint` value.
|
||||
/// - The committee lengths for all indices and slots.
|
||||
///
|
||||
/// These values are used during attestation production.
|
||||
pub struct AttesterCacheValue {
|
||||
current_justified_checkpoint: Checkpoint,
|
||||
committee_lengths: CommitteeLengths,
|
||||
}
|
||||
|
||||
impl AttesterCacheValue {
|
||||
/// Instantiate `Self` using `state.current_epoch()`.
|
||||
pub fn new<E: EthSpec>(state: &BeaconState<E>, spec: &ChainSpec) -> Result<Self, Error> {
|
||||
let current_justified_checkpoint = state.current_justified_checkpoint();
|
||||
let committee_lengths = CommitteeLengths::new(state, spec)?;
|
||||
Ok(Self {
|
||||
current_justified_checkpoint,
|
||||
committee_lengths,
|
||||
})
|
||||
}
|
||||
|
||||
/// Get the justified checkpoint and committee length for some `slot` and `committee_index`.
|
||||
fn get<E: EthSpec>(
|
||||
&self,
|
||||
slot: Slot,
|
||||
committee_index: CommitteeIndex,
|
||||
spec: &ChainSpec,
|
||||
) -> Result<(JustifiedCheckpoint, CommitteeLength), Error> {
|
||||
self.committee_lengths
|
||||
.get_committee_length::<E>(slot, committee_index, spec)
|
||||
.map(|committee_length| (self.current_justified_checkpoint, committee_length))
|
||||
}
|
||||
}
|
||||
|
||||
/// The `AttesterCacheKey` is fundamentally the same thing as the proposer shuffling decision root,
|
||||
/// however here we use it as an identity for both of the following values:
|
||||
///
|
||||
/// 1. The `state.current_justified_checkpoint`.
|
||||
/// 2. The attester shuffling.
|
||||
///
|
||||
/// This struct relies upon the premise that the `state.current_justified_checkpoint` in epoch `n`
|
||||
/// is determined by the root of the latest block in epoch `n - 1`. Notably, this is identical to
|
||||
/// how the proposer shuffling is keyed in `BeaconProposerCache`.
|
||||
///
|
||||
/// It is also safe, but not maximally efficient, to key the attester shuffling with the same
|
||||
/// strategy. For better shuffling keying strategies, see the `ShufflingCache`.
|
||||
#[derive(Eq, PartialEq, Hash, Clone, Copy)]
|
||||
pub struct AttesterCacheKey {
|
||||
/// The epoch from which the justified checkpoint should be observed.
|
||||
///
|
||||
/// Attestations which use `self.epoch` as `target.epoch` should use this key.
|
||||
epoch: Epoch,
|
||||
/// The root of the block at the last slot of `self.epoch - 1`.
|
||||
decision_root: Hash256,
|
||||
}
|
||||
|
||||
impl AttesterCacheKey {
|
||||
/// Instantiate `Self` to key `state.current_epoch()`.
|
||||
///
|
||||
/// The `latest_block_root` should be the latest block that has been applied to `state`. This
|
||||
/// parameter is required since the state does not store the block root for any block with the
|
||||
/// same slot as `state.slot()`.
|
||||
///
|
||||
/// ## Errors
|
||||
///
|
||||
/// May error if `epoch` is out of the range of `state.block_roots`.
|
||||
pub fn new<E: EthSpec>(
|
||||
epoch: Epoch,
|
||||
state: &BeaconState<E>,
|
||||
latest_block_root: Hash256,
|
||||
) -> Result<Self, Error> {
|
||||
let slots_per_epoch = E::slots_per_epoch();
|
||||
let decision_slot = epoch.start_slot(slots_per_epoch).saturating_sub(1_u64);
|
||||
|
||||
let decision_root = if decision_slot.epoch(slots_per_epoch) == epoch {
|
||||
// This scenario is only possible during the genesis epoch. In this scenario, all-zeros
|
||||
// is used as an alias to the genesis block.
|
||||
Hash256::zero()
|
||||
} else if epoch > state.current_epoch() {
|
||||
// If the requested epoch is higher than the current epoch, the latest block will always
|
||||
// be the decision root.
|
||||
latest_block_root
|
||||
} else {
|
||||
*state.get_block_root(decision_slot)?
|
||||
};
|
||||
|
||||
Ok(Self {
|
||||
epoch,
|
||||
decision_root,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
/// Provides a cache for the justified checkpoint and committee length when producing an
|
||||
/// attestation.
|
||||
///
|
||||
/// See the module-level documentation for more information.
|
||||
#[derive(Default)]
|
||||
pub struct AttesterCache {
|
||||
cache: RwLock<CacheHashMap>,
|
||||
}
|
||||
|
||||
impl AttesterCache {
|
||||
/// Get the justified checkpoint and committee length for the `slot` and `committee_index` in
|
||||
/// the state identified by the cache `key`.
|
||||
pub fn get<E: EthSpec>(
|
||||
&self,
|
||||
key: &AttesterCacheKey,
|
||||
slot: Slot,
|
||||
committee_index: CommitteeIndex,
|
||||
spec: &ChainSpec,
|
||||
) -> Result<Option<(JustifiedCheckpoint, CommitteeLength)>, Error> {
|
||||
self.cache
|
||||
.read()
|
||||
.get(key)
|
||||
.map(|cache_item| cache_item.get::<E>(slot, committee_index, spec))
|
||||
.transpose()
|
||||
}
|
||||
|
||||
/// Cache the `state.current_epoch()` values if they are not already present in the state.
|
||||
pub fn maybe_cache_state<E: EthSpec>(
|
||||
&self,
|
||||
state: &BeaconState<E>,
|
||||
latest_block_root: Hash256,
|
||||
spec: &ChainSpec,
|
||||
) -> Result<(), Error> {
|
||||
let key = AttesterCacheKey::new(state.current_epoch(), state, latest_block_root)?;
|
||||
let mut cache = self.cache.write();
|
||||
if !cache.contains_key(&key) {
|
||||
let cache_item = AttesterCacheValue::new(state, spec)?;
|
||||
Self::insert_respecting_max_len(&mut cache, key, cache_item);
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Read the state identified by `state_root` from the database, advance it to the required
|
||||
/// slot, use it to prime the cache and return the values for the provided `slot` and
|
||||
/// `committee_index`.
|
||||
///
|
||||
/// ## Notes
|
||||
///
|
||||
/// This function takes a write-lock on the internal cache. Prefer attempting a `Self::get` call
|
||||
/// before running this function as `Self::get` only takes a read-lock and is therefore less
|
||||
/// likely to create contention.
|
||||
pub fn load_and_cache_state<T: BeaconChainTypes>(
|
||||
&self,
|
||||
state_root: Hash256,
|
||||
key: AttesterCacheKey,
|
||||
slot: Slot,
|
||||
committee_index: CommitteeIndex,
|
||||
chain: &BeaconChain<T>,
|
||||
) -> Result<(JustifiedCheckpoint, CommitteeLength), Error> {
|
||||
let spec = &chain.spec;
|
||||
let slots_per_epoch = T::EthSpec::slots_per_epoch();
|
||||
let epoch = slot.epoch(slots_per_epoch);
|
||||
|
||||
// Take a write-lock on the cache before starting the state read.
|
||||
//
|
||||
// Whilst holding the write-lock during the state read will create contention, it prevents
|
||||
// the scenario where multiple requests from separate threads cause duplicate state reads.
|
||||
let mut cache = self.cache.write();
|
||||
|
||||
// Try the cache to see if someone has already primed it between the time the function was
|
||||
// called and when the cache write-lock was obtained. This avoids performing duplicate state
|
||||
// reads.
|
||||
if let Some(value) = cache
|
||||
.get(&key)
|
||||
.map(|cache_item| cache_item.get::<T::EthSpec>(slot, committee_index, spec))
|
||||
.transpose()?
|
||||
{
|
||||
return Ok(value);
|
||||
}
|
||||
|
||||
// We use `cache_state = true` here because if we are attesting to the state it's likely
|
||||
// to be recent and useful for other things.
|
||||
let mut state: BeaconState<T::EthSpec> = chain
|
||||
.get_state(&state_root, None, true)?
|
||||
.ok_or(Error::MissingBeaconState(state_root))?;
|
||||
|
||||
if state.slot() > slot {
|
||||
// This indicates an internal inconsistency.
|
||||
return Err(Error::CannotAttestToFutureState {
|
||||
state_slot: state.slot(),
|
||||
request_slot: slot,
|
||||
});
|
||||
} else if state.current_epoch() < epoch {
|
||||
// Only perform a "partial" state advance since we do not require the state roots to be
|
||||
// accurate.
|
||||
partial_state_advance(
|
||||
&mut state,
|
||||
Some(state_root),
|
||||
epoch.start_slot(slots_per_epoch),
|
||||
spec,
|
||||
)
|
||||
.map_err(Error::FailedToTransitionState)?;
|
||||
state.build_committee_cache(RelativeEpoch::Current, spec)?;
|
||||
}
|
||||
|
||||
let cache_item = AttesterCacheValue::new(&state, spec)?;
|
||||
let value = cache_item.get::<T::EthSpec>(slot, committee_index, spec)?;
|
||||
Self::insert_respecting_max_len(&mut cache, key, cache_item);
|
||||
Ok(value)
|
||||
}
|
||||
|
||||
/// Insert a value to `cache`, ensuring it does not exceed the maximum length.
|
||||
///
|
||||
/// If the cache is already full, the item with the lowest epoch will be removed.
|
||||
fn insert_respecting_max_len(
|
||||
cache: &mut CacheHashMap,
|
||||
key: AttesterCacheKey,
|
||||
value: AttesterCacheValue,
|
||||
) {
|
||||
while cache.len() >= MAX_CACHE_LEN {
|
||||
if let Some(oldest) = cache.keys().copied().min_by_key(|key| key.epoch) {
|
||||
cache.remove(&oldest);
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
cache.insert(key, value);
|
||||
}
|
||||
|
||||
/// Remove all entries where the `key.epoch` is lower than the given `epoch`.
|
||||
///
|
||||
/// Generally, the provided `epoch` should be the finalized epoch.
|
||||
pub fn prune_below(&self, epoch: Epoch) {
|
||||
self.cache.write().retain(|target, _| target.epoch >= epoch);
|
||||
}
|
||||
}
|
||||
@@ -3,7 +3,6 @@ use crate::attestation_verification::{
|
||||
VerifiedUnaggregatedAttestation, batch_verify_aggregated_attestations,
|
||||
batch_verify_unaggregated_attestations,
|
||||
};
|
||||
use crate::attester_cache::{AttesterCache, AttesterCacheKey};
|
||||
use crate::beacon_block_streamer::{BeaconBlockStreamer, CheckCaches};
|
||||
use crate::beacon_proposer_cache::{
|
||||
BeaconProposerCache, EpochBlockProposers, ensure_state_can_determine_proposers_for_epoch,
|
||||
@@ -92,6 +91,7 @@ use futures::channel::mpsc::Sender;
|
||||
use itertools::Itertools;
|
||||
use itertools::process_results;
|
||||
use kzg::Kzg;
|
||||
use lighthouse_tracing::SPAN_PRODUCE_UNAGGREGATED_ATTESTATION;
|
||||
use logging::crit;
|
||||
use operation_pool::{
|
||||
CompactAttestationRef, OperationPool, PersistedOperationPool, ReceivedPreCapella,
|
||||
@@ -455,8 +455,6 @@ pub struct BeaconChain<T: BeaconChainTypes> {
|
||||
pub beacon_proposer_cache: Arc<Mutex<BeaconProposerCache>>,
|
||||
/// Caches a map of `validator_index -> validator_pubkey`.
|
||||
pub(crate) validator_pubkey_cache: RwLock<ValidatorPubkeyCache<T>>,
|
||||
/// A cache used when producing attestations.
|
||||
pub(crate) attester_cache: Arc<AttesterCache>,
|
||||
/// A cache used when producing attestations whilst the head block is still being imported.
|
||||
pub early_attester_cache: EarlyAttesterCache<T::EthSpec>,
|
||||
/// A cache used to keep track of various block timings.
|
||||
@@ -1846,6 +1844,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
|
||||
/// ## Errors
|
||||
///
|
||||
/// May return an error if the `request_slot` is too far behind the head state.
|
||||
#[instrument(name = SPAN_PRODUCE_UNAGGREGATED_ATTESTATION, skip_all, fields(%request_slot, %request_index), level = "debug")]
|
||||
pub fn produce_unaggregated_attestation(
|
||||
&self,
|
||||
request_slot: Slot,
|
||||
@@ -1889,19 +1888,17 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
|
||||
* the head-lock is not desirable.
|
||||
*/
|
||||
|
||||
let head_state_slot;
|
||||
let beacon_block_root;
|
||||
let beacon_state_root;
|
||||
let target;
|
||||
let current_epoch_attesting_info: Option<(Checkpoint, usize)>;
|
||||
let attester_cache_key;
|
||||
let head_timer = metrics::start_timer(&metrics::ATTESTATION_PRODUCTION_HEAD_SCRAPE_SECONDS);
|
||||
let head_span = debug_span!("attestation_production_head_scrape").entered();
|
||||
// The following braces are to prevent the `cached_head` Arc from being held for longer than
|
||||
// required. It also helps reduce the diff for a very large PR (#3244).
|
||||
{
|
||||
let head = self.head_snapshot();
|
||||
let head_state = &head.beacon_state;
|
||||
head_state_slot = head_state.slot();
|
||||
|
||||
// There is no value in producing an attestation to a block that is pre-finalization and
|
||||
// it is likely to cause expensive and pointless reads to the freezer database. Exit
|
||||
@@ -1969,12 +1966,8 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
|
||||
// to determine the justified checkpoint and committee length.
|
||||
None
|
||||
};
|
||||
|
||||
// Determine the key for `self.attester_cache`, in case it is required later in this
|
||||
// routine.
|
||||
attester_cache_key =
|
||||
AttesterCacheKey::new(request_epoch, head_state, beacon_block_root)?;
|
||||
}
|
||||
drop(head_span);
|
||||
drop(head_timer);
|
||||
|
||||
// Only attest to a block if it is fully verified (i.e. not optimistic or invalid).
|
||||
@@ -1997,47 +1990,38 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
|
||||
* Phase 2/2:
|
||||
*
|
||||
* If the justified checkpoint and committee length from the head are suitable for this
|
||||
* attestation, use them. If not, try the attester cache. If the cache misses, load a state
|
||||
* from disk and prime the cache with it.
|
||||
* attestation, use them. If not, use the database, which will hit the state cache.
|
||||
*/
|
||||
|
||||
let cache_timer =
|
||||
metrics::start_timer(&metrics::ATTESTATION_PRODUCTION_CACHE_INTERACTION_SECONDS);
|
||||
let (justified_checkpoint, committee_len) =
|
||||
if let Some((justified_checkpoint, committee_len)) = current_epoch_attesting_info {
|
||||
// The head state is in the same epoch as the attestation, so there is no more
|
||||
// required information.
|
||||
(justified_checkpoint, committee_len)
|
||||
} else if let Some(cached_values) = self.attester_cache.get::<T::EthSpec>(
|
||||
&attester_cache_key,
|
||||
request_slot,
|
||||
request_index,
|
||||
&self.spec,
|
||||
)? {
|
||||
// The suitable values were already cached. Return them.
|
||||
cached_values
|
||||
} else {
|
||||
debug!(
|
||||
?beacon_block_root,
|
||||
%head_state_slot,
|
||||
%request_slot,
|
||||
"Attester cache miss"
|
||||
);
|
||||
let (advanced_state_root, mut state) = self
|
||||
.store
|
||||
.get_advanced_hot_state(beacon_block_root, request_slot, beacon_state_root)?
|
||||
.ok_or(Error::MissingBeaconState(beacon_state_root))?;
|
||||
if state.current_epoch() < request_epoch {
|
||||
partial_state_advance(
|
||||
&mut state,
|
||||
Some(advanced_state_root),
|
||||
request_epoch.start_slot(T::EthSpec::slots_per_epoch()),
|
||||
&self.spec,
|
||||
)
|
||||
.map_err(Error::StateAdvanceError)?;
|
||||
|
||||
// Neither the head state, nor the attester cache was able to produce the required
|
||||
// information to attest in this epoch. So, load a `BeaconState` from disk and use
|
||||
// it to fulfil the request (and prime the cache to avoid this next time).
|
||||
let _cache_build_timer =
|
||||
metrics::start_timer(&metrics::ATTESTATION_PRODUCTION_CACHE_PRIME_SECONDS);
|
||||
self.attester_cache.load_and_cache_state(
|
||||
beacon_state_root,
|
||||
attester_cache_key,
|
||||
request_slot,
|
||||
request_index,
|
||||
self,
|
||||
)?
|
||||
state.build_committee_cache(RelativeEpoch::Current, &self.spec)?;
|
||||
}
|
||||
|
||||
(
|
||||
state.current_justified_checkpoint(),
|
||||
state
|
||||
.get_beacon_committee(request_slot, request_index)?
|
||||
.committee
|
||||
.len(),
|
||||
)
|
||||
};
|
||||
drop(cache_timer);
|
||||
|
||||
Ok(Attestation::<T::EthSpec>::empty_for_signing(
|
||||
request_index,
|
||||
@@ -3844,18 +3828,6 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
|
||||
}
|
||||
};
|
||||
|
||||
// Apply the state to the attester cache, only if it is from the previous epoch or later.
|
||||
//
|
||||
// In a perfect scenario there should be no need to add previous-epoch states to the cache.
|
||||
// However, latency between the VC and the BN might cause the VC to produce attestations at
|
||||
// a previous slot.
|
||||
if state.current_epoch().saturating_add(1_u64) >= current_epoch {
|
||||
let _attester_span = debug_span!("attester_cache_update").entered();
|
||||
self.attester_cache
|
||||
.maybe_cache_state(&state, block_root, &self.spec)
|
||||
.map_err(BeaconChainError::from)?;
|
||||
}
|
||||
|
||||
// Take an upgradable read lock on fork choice so we can check if this block has already
|
||||
// been imported. We don't want to repeat work importing a block that is already imported.
|
||||
let fork_choice_reader = self.canonical_head.fork_choice_upgradable_read_lock();
|
||||
@@ -3916,7 +3888,6 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
|
||||
&signed_block,
|
||||
proto_block,
|
||||
&state,
|
||||
&self.spec,
|
||||
) {
|
||||
warn!(
|
||||
error = ?e,
|
||||
|
||||
@@ -1029,7 +1029,6 @@ where
|
||||
block_times_cache: <_>::default(),
|
||||
pre_finalization_block_cache: <_>::default(),
|
||||
validator_pubkey_cache: RwLock::new(validator_pubkey_cache),
|
||||
attester_cache: <_>::default(),
|
||||
early_attester_cache: <_>::default(),
|
||||
light_client_server_cache: LightClientServerCache::new(),
|
||||
light_client_server_tx: self.light_client_server_tx,
|
||||
@@ -1061,16 +1060,6 @@ where
|
||||
|
||||
let head = beacon_chain.head_snapshot();
|
||||
|
||||
// Prime the attester cache with the head state.
|
||||
beacon_chain
|
||||
.attester_cache
|
||||
.maybe_cache_state(
|
||||
&head.beacon_state,
|
||||
head.beacon_block_root,
|
||||
&beacon_chain.spec,
|
||||
)
|
||||
.map_err(|e| format!("Failed to prime attester cache: {:?}", e))?;
|
||||
|
||||
// Only perform the check if it was configured.
|
||||
if let Some(wss_checkpoint) = beacon_chain.config.weak_subjectivity_checkpoint
|
||||
&& let Err(e) = beacon_chain.verify_weak_subjectivity_checkpoint(
|
||||
|
||||
@@ -958,9 +958,6 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
|
||||
.start_slot(T::EthSpec::slots_per_epoch()),
|
||||
);
|
||||
|
||||
self.attester_cache
|
||||
.prune_below(new_view.finalized_checkpoint.epoch);
|
||||
|
||||
if let Some(event_handler) = self.event_handler.as_ref()
|
||||
&& event_handler.has_finalized_subscribers()
|
||||
{
|
||||
|
||||
@@ -1,13 +1,79 @@
|
||||
use crate::data_availability_checker::{AvailableBlock, AvailableBlockData};
|
||||
use crate::{
|
||||
attester_cache::{CommitteeLengths, Error},
|
||||
metrics,
|
||||
};
|
||||
use crate::{BeaconChainError as Error, metrics};
|
||||
use parking_lot::RwLock;
|
||||
use proto_array::Block as ProtoBlock;
|
||||
use std::sync::Arc;
|
||||
use tracing::instrument;
|
||||
use types::*;
|
||||
|
||||
/// Stores the minimal amount of data required to compute the committee length for any committee at any
|
||||
/// slot in a given `epoch`.
|
||||
pub struct CommitteeLengths {
|
||||
/// The `epoch` to which the lengths pertain.
|
||||
epoch: Epoch,
|
||||
/// The length of the shuffling in `self.epoch`.
|
||||
active_validator_indices_len: usize,
|
||||
}
|
||||
|
||||
impl CommitteeLengths {
|
||||
/// Instantiate `Self` using `state.current_epoch()`.
|
||||
pub fn new<E: EthSpec>(state: &BeaconState<E>) -> Result<Self, Error> {
|
||||
let active_validator_indices_len = state
|
||||
.committee_cache(RelativeEpoch::Current)?
|
||||
.active_validator_indices()
|
||||
.len();
|
||||
|
||||
Ok(Self {
|
||||
epoch: state.current_epoch(),
|
||||
active_validator_indices_len,
|
||||
})
|
||||
}
|
||||
|
||||
/// Get the count of committees per each slot of `self.epoch`.
|
||||
pub fn get_committee_count_per_slot<E: EthSpec>(
|
||||
&self,
|
||||
spec: &ChainSpec,
|
||||
) -> Result<usize, Error> {
|
||||
E::get_committee_count_per_slot(self.active_validator_indices_len, spec).map_err(Into::into)
|
||||
}
|
||||
|
||||
/// Get the length of the committee at the given `slot` and `committee_index`.
|
||||
pub fn get_committee_length<E: EthSpec>(
|
||||
&self,
|
||||
slot: Slot,
|
||||
committee_index: CommitteeIndex,
|
||||
spec: &ChainSpec,
|
||||
) -> Result<usize, Error> {
|
||||
let slots_per_epoch = E::slots_per_epoch();
|
||||
let request_epoch = slot.epoch(slots_per_epoch);
|
||||
|
||||
// Sanity check.
|
||||
if request_epoch != self.epoch {
|
||||
return Err(Error::EarlyAttesterCacheError);
|
||||
}
|
||||
|
||||
let slots_per_epoch = slots_per_epoch as usize;
|
||||
let committees_per_slot = self.get_committee_count_per_slot::<E>(spec)?;
|
||||
let index_in_epoch = compute_committee_index_in_epoch(
|
||||
slot,
|
||||
slots_per_epoch,
|
||||
committees_per_slot,
|
||||
committee_index as usize,
|
||||
);
|
||||
let range = compute_committee_range_in_epoch(
|
||||
epoch_committee_count(committees_per_slot, slots_per_epoch),
|
||||
index_in_epoch,
|
||||
self.active_validator_indices_len,
|
||||
)
|
||||
.ok_or(Error::EarlyAttesterCacheError)?;
|
||||
|
||||
range
|
||||
.end
|
||||
.checked_sub(range.start)
|
||||
.ok_or(Error::EarlyAttesterCacheError)
|
||||
}
|
||||
}
|
||||
|
||||
pub struct CacheItem<E: EthSpec> {
|
||||
/*
|
||||
* Values used to create attestations.
|
||||
@@ -55,10 +121,9 @@ impl<E: EthSpec> EarlyAttesterCache<E> {
|
||||
block: &AvailableBlock<E>,
|
||||
proto_block: ProtoBlock,
|
||||
state: &BeaconState<E>,
|
||||
spec: &ChainSpec,
|
||||
) -> Result<(), Error> {
|
||||
let epoch = state.current_epoch();
|
||||
let committee_lengths = CommitteeLengths::new(state, spec)?;
|
||||
let committee_lengths = CommitteeLengths::new(state)?;
|
||||
let source = state.current_justified_checkpoint();
|
||||
let target_slot = epoch.start_slot(E::slots_per_epoch());
|
||||
let target = Checkpoint {
|
||||
@@ -98,6 +163,7 @@ impl<E: EthSpec> EarlyAttesterCache<E> {
|
||||
/// - There is a cache `item` present.
|
||||
/// - If `request_slot` is in the same epoch as `item.epoch`.
|
||||
/// - If `request_index` does not exceed `item.committee_count`.
|
||||
#[instrument(skip_all, fields(%request_slot, %request_index), level = "debug")]
|
||||
pub fn try_attest(
|
||||
&self,
|
||||
request_slot: Slot,
|
||||
|
||||
@@ -1,4 +1,3 @@
|
||||
use crate::attester_cache::Error as AttesterCacheError;
|
||||
use crate::beacon_block_streamer::Error as BlockStreamerError;
|
||||
use crate::beacon_chain::ForkChoiceError;
|
||||
use crate::beacon_fork_choice_store::Error as ForkChoiceStoreError;
|
||||
@@ -99,7 +98,7 @@ pub enum BeaconChainError {
|
||||
ObservedAttestersError(ObservedAttestersError),
|
||||
ObservedBlockProducersError(ObservedBlockProducersError),
|
||||
ObservedDataSidecarsError(ObservedDataSidecarsError),
|
||||
AttesterCacheError(AttesterCacheError),
|
||||
EarlyAttesterCacheError,
|
||||
PruningError(PruningError),
|
||||
ArithError(ArithError),
|
||||
InvalidShufflingId {
|
||||
@@ -266,7 +265,6 @@ easy_from_to!(ObservedAttestationsError, BeaconChainError);
|
||||
easy_from_to!(ObservedAttestersError, BeaconChainError);
|
||||
easy_from_to!(ObservedBlockProducersError, BeaconChainError);
|
||||
easy_from_to!(ObservedDataSidecarsError, BeaconChainError);
|
||||
easy_from_to!(AttesterCacheError, BeaconChainError);
|
||||
easy_from_to!(BlockSignatureVerifierError, BeaconChainError);
|
||||
easy_from_to!(PruningError, BeaconChainError);
|
||||
easy_from_to!(ArithError, BeaconChainError);
|
||||
|
||||
@@ -1,7 +1,6 @@
|
||||
pub mod attestation_rewards;
|
||||
pub mod attestation_simulator;
|
||||
pub mod attestation_verification;
|
||||
mod attester_cache;
|
||||
pub mod beacon_block_reward;
|
||||
mod beacon_block_streamer;
|
||||
mod beacon_chain;
|
||||
|
||||
@@ -482,20 +482,6 @@ pub static ATTESTATION_PRODUCTION_HEAD_SCRAPE_SECONDS: LazyLock<Result<Histogram
|
||||
"Time taken to read the head state",
|
||||
)
|
||||
});
|
||||
pub static ATTESTATION_PRODUCTION_CACHE_INTERACTION_SECONDS: LazyLock<Result<Histogram>> =
|
||||
LazyLock::new(|| {
|
||||
try_create_histogram(
|
||||
"attestation_production_cache_interaction_seconds",
|
||||
"Time spent interacting with the attester cache",
|
||||
)
|
||||
});
|
||||
pub static ATTESTATION_PRODUCTION_CACHE_PRIME_SECONDS: LazyLock<Result<Histogram>> =
|
||||
LazyLock::new(|| {
|
||||
try_create_histogram(
|
||||
"attestation_production_cache_prime_seconds",
|
||||
"Time spent loading a new state from the disk due to a cache miss",
|
||||
)
|
||||
});
|
||||
|
||||
/*
|
||||
* Fork Choice
|
||||
|
||||
@@ -409,12 +409,6 @@ fn advance_head<T: BeaconChainTypes>(beacon_chain: &Arc<BeaconChain<T>>) -> Resu
|
||||
);
|
||||
}
|
||||
|
||||
// Apply the state to the attester cache, if the cache deems it interesting.
|
||||
beacon_chain
|
||||
.attester_cache
|
||||
.maybe_cache_state(&state, head_block_root, &beacon_chain.spec)
|
||||
.map_err(BeaconChainError::from)?;
|
||||
|
||||
let final_slot = state.slot();
|
||||
|
||||
// If we have moved into the next slot whilst processing the state then this function is going
|
||||
|
||||
@@ -239,13 +239,7 @@ async fn produces_attestations() {
|
||||
.unwrap();
|
||||
chain
|
||||
.early_attester_cache
|
||||
.add_head_block(
|
||||
block_root,
|
||||
&available_block,
|
||||
proto_block,
|
||||
&state,
|
||||
&chain.spec,
|
||||
)
|
||||
.add_head_block(block_root, &available_block, proto_block, &state)
|
||||
.unwrap();
|
||||
chain
|
||||
.early_attester_cache
|
||||
@@ -312,7 +306,6 @@ async fn early_attester_cache_old_request() {
|
||||
&available_block,
|
||||
head_proto_block,
|
||||
&head.beacon_state,
|
||||
&harness.chain.spec,
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
|
||||
@@ -8,6 +8,9 @@ pub const SPAN_PRODUCE_BLOCK_V2: &str = "produce_block_v2";
|
||||
pub const SPAN_PRODUCE_BLOCK_V3: &str = "produce_block_v3";
|
||||
pub const SPAN_PUBLISH_BLOCK: &str = "publish_block";
|
||||
|
||||
/// Root span names for attestation production
|
||||
pub const SPAN_PRODUCE_UNAGGREGATED_ATTESTATION: &str = "produce_unaggregated_attestation";
|
||||
|
||||
/// Data Availability checker span identifiers
|
||||
pub const SPAN_PENDING_COMPONENTS: &str = "pending_components";
|
||||
|
||||
|
||||
Reference in New Issue
Block a user