mirror of
https://github.com/sigp/lighthouse.git
synced 2026-05-29 20:27:14 +00:00
Merge branch 'payload-attestation-committee-cache' into fc-compliance
This commit is contained in:
@@ -77,9 +77,7 @@ use crate::persisted_custody::persist_custody_context;
|
||||
use crate::persisted_fork_choice::PersistedForkChoice;
|
||||
use crate::pre_finalization_cache::PreFinalizationBlockCache;
|
||||
use crate::proposer_preferences_verification::proposer_preference_cache::GossipVerifiedProposerPreferenceCache;
|
||||
use crate::shuffling_cache::{
|
||||
CachedShuffling, ShufflingCache, get_ptcs_for_shuffling_epoch, with_cached_shuffling,
|
||||
};
|
||||
use crate::shuffling_cache::{CachedPTCs, CachedShuffling, ShufflingCache, with_cached_shuffling};
|
||||
use crate::sync_committee_verification::{
|
||||
Error as SyncCommitteeError, VerifiedSyncCommitteeMessage, VerifiedSyncContribution,
|
||||
};
|
||||
@@ -4927,11 +4925,11 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
|
||||
if !shuffling_is_cached {
|
||||
state.build_committee_cache(relative_epoch, &self.spec)?;
|
||||
let committee_cache = state.committee_cache(relative_epoch)?;
|
||||
let ptcs = get_ptcs_for_shuffling_epoch(state, shuffling_epoch, &self.spec)?;
|
||||
let ptcs = CachedPTCs::from_state(state, shuffling_epoch, &self.spec)?;
|
||||
let cached_shuffling = CachedShuffling::new(committee_cache.clone(), ptcs);
|
||||
self.shuffling_cache
|
||||
.write()
|
||||
.insert_committee_cache_with_ptcs(shuffling_id, cached_shuffling, &self.spec)?;
|
||||
.insert_committee_cache(shuffling_id, cached_shuffling)?;
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
|
||||
@@ -254,6 +254,13 @@ pub enum BeaconChainError {
|
||||
request_epoch: Epoch,
|
||||
cache_epoch: Epoch,
|
||||
},
|
||||
AttesterCachePtcOutOfBounds {
|
||||
slot: Slot,
|
||||
epoch: Epoch,
|
||||
},
|
||||
AttesterCacheNoPtcPreGloas {
|
||||
slot: Slot,
|
||||
},
|
||||
SkipProposerPreparation,
|
||||
FailedColumnCustodyInfoUpdate,
|
||||
}
|
||||
|
||||
@@ -87,9 +87,8 @@ impl<T: BeaconChainTypes> VerifiedPayloadAttestationMessage<T> {
|
||||
ctx.spec,
|
||||
beacon_block_root,
|
||||
message_epoch,
|
||||
|cached_shuffling, _| Ok::<_, Error>(cached_shuffling.ptc_for_slot(slot).cloned()),
|
||||
)?
|
||||
.ok_or(Error::MissingPTC { slot })?;
|
||||
|cached_shuffling, _| cached_shuffling.ptc_for_slot(slot),
|
||||
)?;
|
||||
|
||||
// [REJECT] `validator_index` is within `get_ptc(state, data.slot)`.
|
||||
if !ptc.0.contains(&(validator_index as usize)) {
|
||||
|
||||
@@ -66,12 +66,6 @@ pub enum Error {
|
||||
///
|
||||
/// The peer has sent an invalid message.
|
||||
NotInPTC { validator_index: u64, slot: Slot },
|
||||
/// The shuffling cache entry did not contain a PTC for this slot.
|
||||
///
|
||||
/// ## Peer scoring
|
||||
///
|
||||
/// We were unable to process this message due to an internal error.
|
||||
MissingPTC { slot: Slot },
|
||||
/// The validator index is unknown.
|
||||
///
|
||||
/// ## Peer scoring
|
||||
|
||||
@@ -7,8 +7,8 @@ use parking_lot::RwLock;
|
||||
use state_processing::state_advance::partial_state_advance;
|
||||
use tracing::debug;
|
||||
use types::{
|
||||
AttestationShufflingId, BeaconState, BeaconStateError, ChainSpec, Epoch, EthSpec, Hash256, PTC,
|
||||
RelativeEpoch, Slot, state::CommitteeCache,
|
||||
AttestationShufflingId, BeaconState, ChainSpec, Epoch, EthSpec, Hash256, PTC, RelativeEpoch,
|
||||
Slot, state::CommitteeCache,
|
||||
};
|
||||
|
||||
use crate::{
|
||||
@@ -17,9 +17,9 @@ use crate::{
|
||||
|
||||
/// The size of the cache that stores shufflings for quicker verification.
|
||||
///
|
||||
/// Each entry should be around `8 + 800,000 + 4,096 = 804,104` bytes in size with 100k validators
|
||||
/// Each entry should be around `8 * 2M + 128KB ~= 16 MB` in size with 2M validators
|
||||
/// and 32 512-validator PTCs. Therefore, this cache should be approx
|
||||
/// `16 * (8 + 800,000 + 131,072) = 14.9 MB`. (Note: this ignores a few extra bytes in the
|
||||
/// `16 * 16 MB ~= 256 MB`. (Note: this ignores a few extra bytes in the
|
||||
/// caches that should be insignificant compared to the indices).
|
||||
pub const DEFAULT_CACHE_SIZE: usize = 16;
|
||||
|
||||
@@ -37,32 +37,53 @@ const MAX_CONCURRENT_PROMISES: usize = 2;
|
||||
#[derive(Clone)]
|
||||
pub struct CachedShuffling<E: EthSpec> {
|
||||
pub committee_cache: Arc<CommitteeCache>,
|
||||
pub ptcs: Option<Vec<PTC<E>>>,
|
||||
pub ptcs: CachedPTCs<E>,
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub enum CachedPTCs<E: EthSpec> {
|
||||
PreGloas,
|
||||
PostGloas(Vec<PTC<E>>, Epoch),
|
||||
}
|
||||
|
||||
impl<E: EthSpec> CachedPTCs<E> {
|
||||
pub fn from_state(
|
||||
state: &BeaconState<E>,
|
||||
epoch: Epoch,
|
||||
spec: &ChainSpec,
|
||||
) -> Result<Self, BeaconChainError> {
|
||||
if shuffling_requires_ptcs(epoch, spec) {
|
||||
let ptcs = epoch
|
||||
.slot_iter(E::slots_per_epoch())
|
||||
.map(|slot| state.get_ptc(slot, spec))
|
||||
.collect::<Result<Vec<_>, _>>()?;
|
||||
Ok(Self::PostGloas(ptcs, epoch))
|
||||
} else {
|
||||
Ok(Self::PreGloas)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<E: EthSpec> CachedShuffling<E> {
|
||||
pub fn new(committee_cache: Arc<CommitteeCache>, ptcs: Option<Vec<PTC<E>>>) -> Self {
|
||||
pub fn new(committee_cache: Arc<CommitteeCache>, ptcs: CachedPTCs<E>) -> Self {
|
||||
Self {
|
||||
committee_cache,
|
||||
ptcs,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn ptc_for_slot(&self, slot: Slot) -> Option<&PTC<E>> {
|
||||
self.ptcs
|
||||
.as_ref()?
|
||||
.get(slot.as_usize() % E::slots_per_epoch() as usize)
|
||||
}
|
||||
|
||||
fn ensure_ptcs_for_gloas_shuffling(
|
||||
&self,
|
||||
shuffling_epoch: Epoch,
|
||||
spec: &ChainSpec,
|
||||
) -> Result<(), BeaconChainError> {
|
||||
if shuffling_requires_ptcs(shuffling_epoch, spec) && self.ptcs.is_none() {
|
||||
Err(BeaconChainError::MissingPtcForGloasShuffling { shuffling_epoch })
|
||||
} else {
|
||||
Ok(())
|
||||
pub fn ptc_for_slot(&self, slot: Slot) -> Result<PTC<E>, BeaconChainError> {
|
||||
match &self.ptcs {
|
||||
CachedPTCs::PreGloas => Err(BeaconChainError::AttesterCacheNoPtcPreGloas { slot }),
|
||||
&CachedPTCs::PostGloas(ref ptcs, epoch) => {
|
||||
if slot.epoch(E::slots_per_epoch()) != epoch {
|
||||
Err(BeaconChainError::AttesterCachePtcOutOfBounds { slot, epoch })
|
||||
} else {
|
||||
ptcs.get(slot.as_usize() % E::slots_per_epoch() as usize)
|
||||
.cloned()
|
||||
.ok_or(BeaconChainError::AttesterCachePtcOutOfBounds { slot, epoch })
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -173,7 +194,9 @@ impl<E: EthSpec> ShufflingCache<E> {
|
||||
self.cache.iter().all(|(key, item)| {
|
||||
if shuffling_requires_ptcs(key.shuffling_epoch, spec) {
|
||||
match item {
|
||||
CacheItem::Committee(cached_shuffling) => cached_shuffling.ptcs.is_some(),
|
||||
CacheItem::Committee(cached_shuffling) => {
|
||||
matches!(cached_shuffling.ptcs, CachedPTCs::PostGloas(..))
|
||||
}
|
||||
CacheItem::Promise(_) => true,
|
||||
}
|
||||
} else {
|
||||
@@ -182,30 +205,14 @@ impl<E: EthSpec> ShufflingCache<E> {
|
||||
})
|
||||
}
|
||||
|
||||
pub fn insert_committee_cache<C: ToArcCommitteeCache>(
|
||||
&mut self,
|
||||
key: AttestationShufflingId,
|
||||
committee_cache: &C,
|
||||
spec: &ChainSpec,
|
||||
) -> Result<(), BeaconChainError> {
|
||||
self.insert_committee_cache_with_ptcs(
|
||||
key,
|
||||
CachedShuffling::new(committee_cache.to_arc_committee_cache(), None),
|
||||
spec,
|
||||
)
|
||||
}
|
||||
|
||||
pub fn insert_committee_cache_with_ptcs(
|
||||
pub fn insert_committee_cache(
|
||||
&mut self,
|
||||
key: AttestationShufflingId,
|
||||
cached_shuffling: CachedShuffling<E>,
|
||||
spec: &ChainSpec,
|
||||
) -> Result<(), BeaconChainError> {
|
||||
cached_shuffling.ensure_ptcs_for_gloas_shuffling(key.shuffling_epoch, spec)?;
|
||||
|
||||
match self.cache.get(&key) {
|
||||
Some(CacheItem::Committee(existing)) => {
|
||||
existing.ensure_ptcs_for_gloas_shuffling(key.shuffling_epoch, spec)?;
|
||||
Some(CacheItem::Committee(_)) => {
|
||||
// Calculation is deterministic, so no need to replace the existing entry.
|
||||
}
|
||||
// Replace the committee if it's not present or if it's a promise. A bird in the hand is
|
||||
// worth two in the promise-bush!
|
||||
@@ -324,7 +331,6 @@ where
|
||||
drop(shuffling_cache);
|
||||
|
||||
let cached_shuffling = cache_item.wait()?;
|
||||
cached_shuffling.ensure_ptcs_for_gloas_shuffling(shuffling_epoch, spec)?;
|
||||
map_fn(&cached_shuffling, shuffling_id.shuffling_decision_block)
|
||||
} else {
|
||||
// Create an entry in the cache that "promises" this value will eventually be computed.
|
||||
@@ -395,8 +401,6 @@ where
|
||||
let (mut state, state_root) = if let Some((state, state_root)) = head_state_opt {
|
||||
(state, state_root)
|
||||
} else {
|
||||
// We assume that the `Pending` state has the same shufflings as a `Full` state for the
|
||||
// same block. Analysis: https://hackmd.io/@dapplion/gloas_dependant_root
|
||||
let (state_root, state) = store
|
||||
.get_advanced_hot_state(head_block_root, target_slot, head_block.state_root)
|
||||
.map_err(BeaconChainError::DBError)?
|
||||
@@ -434,14 +438,13 @@ where
|
||||
.committee_cache(relative_epoch)
|
||||
.map_err(BeaconChainError::from)?
|
||||
.clone();
|
||||
let ptcs = get_ptcs_for_shuffling_epoch(&state, shuffling_epoch, spec)
|
||||
.map_err(BeaconChainError::from)?;
|
||||
let ptcs = CachedPTCs::from_state(&state, shuffling_epoch, spec)?;
|
||||
let shuffling_decision_block = shuffling_id.shuffling_decision_block;
|
||||
let cached_shuffling = CachedShuffling::new(committee_cache, ptcs);
|
||||
|
||||
shuffling_cache_lock
|
||||
.write()
|
||||
.insert_committee_cache_with_ptcs(shuffling_id, cached_shuffling.clone(), spec)?;
|
||||
.insert_committee_cache(shuffling_id, cached_shuffling.clone())?;
|
||||
|
||||
metrics::stop_timer(committee_building_timer);
|
||||
|
||||
@@ -450,39 +453,6 @@ where
|
||||
map_fn(&cached_shuffling, shuffling_decision_block)
|
||||
}
|
||||
}
|
||||
/// Return the PTCs associated with each slot in `shuffling_epoch`, when the state supports PTCs.
|
||||
pub fn get_ptcs_for_shuffling_epoch<E: EthSpec>(
|
||||
state: &BeaconState<E>,
|
||||
shuffling_epoch: Epoch,
|
||||
spec: &ChainSpec,
|
||||
) -> Result<Option<Vec<PTC<E>>>, BeaconStateError> {
|
||||
if shuffling_requires_ptcs(shuffling_epoch, spec) {
|
||||
shuffling_epoch
|
||||
.slot_iter(E::slots_per_epoch())
|
||||
.map(|slot| state.get_ptc(slot, spec))
|
||||
.collect::<Result<Vec<_>, _>>()
|
||||
.map(Some)
|
||||
} else {
|
||||
Ok(None)
|
||||
}
|
||||
}
|
||||
|
||||
/// A helper trait to allow lazy-cloning of the committee cache when inserting into the cache.
|
||||
pub trait ToArcCommitteeCache {
|
||||
fn to_arc_committee_cache(&self) -> Arc<CommitteeCache>;
|
||||
}
|
||||
|
||||
impl ToArcCommitteeCache for CommitteeCache {
|
||||
fn to_arc_committee_cache(&self) -> Arc<CommitteeCache> {
|
||||
Arc::new(self.clone())
|
||||
}
|
||||
}
|
||||
|
||||
impl ToArcCommitteeCache for Arc<CommitteeCache> {
|
||||
fn to_arc_committee_cache(&self) -> Arc<CommitteeCache> {
|
||||
self.clone()
|
||||
}
|
||||
}
|
||||
|
||||
/// Contains the shuffling IDs for a beacon block.
|
||||
#[derive(Clone)]
|
||||
@@ -573,14 +543,8 @@ mod test {
|
||||
ShufflingCache::new(TEST_CACHE_SIZE, head_shuffling_ids)
|
||||
}
|
||||
|
||||
fn test_spec() -> ChainSpec {
|
||||
// Use a Fulu spec specifically because behaviour changes at Gloas.
|
||||
// The Gloas tests explicitly enable Gloas.
|
||||
ForkName::Fulu.make_genesis_spec(E::default_spec())
|
||||
}
|
||||
|
||||
fn cached_shuffling(committee_cache: Arc<CommitteeCache>) -> CachedShuffling<E> {
|
||||
CachedShuffling::new(committee_cache, None)
|
||||
CachedShuffling::new(committee_cache, CachedPTCs::PreGloas)
|
||||
}
|
||||
|
||||
/// Returns two different committee caches for testing.
|
||||
@@ -748,11 +712,10 @@ mod test {
|
||||
#[test]
|
||||
fn should_insert_committee_cache() {
|
||||
let mut cache = new_shuffling_cache();
|
||||
let spec = test_spec();
|
||||
let id_a = shuffling_id(1);
|
||||
let committee_cache_a = Arc::new(CommitteeCache::default());
|
||||
cache
|
||||
.insert_committee_cache(id_a.clone(), &committee_cache_a, &spec)
|
||||
.insert_committee_cache(id_a.clone(), cached_shuffling(committee_cache_a.clone()))
|
||||
.unwrap();
|
||||
assert!(
|
||||
matches!(cache.get(&id_a).unwrap(), CacheItem::Committee(cached_shuffling) if cached_shuffling.committee_cache == committee_cache_a),
|
||||
@@ -760,34 +723,19 @@ mod test {
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn should_reject_gloas_committee_cache_without_ptc() {
|
||||
let mut cache = new_shuffling_cache();
|
||||
let spec = ForkName::Gloas.make_genesis_spec(E::default_spec());
|
||||
let id = shuffling_id(1);
|
||||
let committee_cache = Arc::new(CommitteeCache::default());
|
||||
|
||||
let result = cache.insert_committee_cache(id.clone(), &committee_cache, &spec);
|
||||
|
||||
assert!(matches!(
|
||||
result,
|
||||
Err(BeaconChainError::MissingPtcForGloasShuffling { shuffling_epoch })
|
||||
if shuffling_epoch == id.shuffling_epoch
|
||||
));
|
||||
assert!(!cache.contains(&id), "should not insert invalid cache");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn should_prune_committee_cache_with_lowest_epoch() {
|
||||
let mut cache = new_shuffling_cache();
|
||||
let spec = test_spec();
|
||||
let shuffling_id_and_committee_caches = (0..(TEST_CACHE_SIZE + 1))
|
||||
.map(|i| (shuffling_id(i as u64), Arc::new(CommitteeCache::default())))
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
for (shuffling_id, committee_cache) in shuffling_id_and_committee_caches.iter() {
|
||||
cache
|
||||
.insert_committee_cache(shuffling_id.clone(), committee_cache, &spec)
|
||||
.insert_committee_cache(
|
||||
shuffling_id.clone(),
|
||||
cached_shuffling(committee_cache.clone()),
|
||||
)
|
||||
.unwrap();
|
||||
}
|
||||
|
||||
@@ -812,7 +760,6 @@ mod test {
|
||||
#[test]
|
||||
fn should_retain_head_state_shufflings() {
|
||||
let mut cache = new_shuffling_cache();
|
||||
let spec = test_spec();
|
||||
let current_epoch = 10;
|
||||
let committee_cache = Arc::new(CommitteeCache::default());
|
||||
|
||||
@@ -823,7 +770,7 @@ mod test {
|
||||
shuffling_decision_block: Hash256::from_low_u64_be(current_epoch + i as u64),
|
||||
};
|
||||
cache
|
||||
.insert_committee_cache(shuffling_id, &committee_cache, &spec)
|
||||
.insert_committee_cache(shuffling_id, cached_shuffling(committee_cache.clone()))
|
||||
.unwrap();
|
||||
}
|
||||
|
||||
@@ -838,16 +785,21 @@ mod test {
|
||||
|
||||
// Insert head state shuffling ids. Should not be overridden by other shuffling ids.
|
||||
cache
|
||||
.insert_committee_cache(head_shuffling_ids.current.clone(), &committee_cache, &spec)
|
||||
.insert_committee_cache(
|
||||
head_shuffling_ids.current.clone(),
|
||||
cached_shuffling(committee_cache.clone()),
|
||||
)
|
||||
.unwrap();
|
||||
cache
|
||||
.insert_committee_cache(head_shuffling_ids.next.clone(), &committee_cache, &spec)
|
||||
.insert_committee_cache(
|
||||
head_shuffling_ids.next.clone(),
|
||||
cached_shuffling(committee_cache.clone()),
|
||||
)
|
||||
.unwrap();
|
||||
cache
|
||||
.insert_committee_cache(
|
||||
head_shuffling_ids.previous.clone().unwrap(),
|
||||
&committee_cache,
|
||||
&spec,
|
||||
cached_shuffling(committee_cache.clone()),
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
@@ -858,7 +810,7 @@ mod test {
|
||||
shuffling_decision_block: Hash256::from_low_u64_be(i as u64),
|
||||
};
|
||||
cache
|
||||
.insert_committee_cache(shuffling_id, &committee_cache, &spec)
|
||||
.insert_committee_cache(shuffling_id, cached_shuffling(committee_cache.clone()))
|
||||
.unwrap();
|
||||
}
|
||||
|
||||
|
||||
@@ -13,11 +13,11 @@
|
||||
//! 1. We are required to store an additional `BeaconState` for the head block. This consumes
|
||||
//! memory.
|
||||
//! 2. There's a possibility that the head block is never built upon, causing wasted CPU cycles.
|
||||
use crate::shuffling_cache::CachedPTCs;
|
||||
use crate::validator_monitor::HISTORIC_EPOCHS as VALIDATOR_MONITOR_HISTORIC_EPOCHS;
|
||||
use crate::{
|
||||
BeaconChain, BeaconChainError, BeaconChainTypes,
|
||||
chain_config::FORK_CHOICE_LOOKAHEAD_FACTOR,
|
||||
shuffling_cache::{CachedShuffling, get_ptcs_for_shuffling_epoch},
|
||||
BeaconChain, BeaconChainError, BeaconChainTypes, chain_config::FORK_CHOICE_LOOKAHEAD_FACTOR,
|
||||
shuffling_cache::CachedShuffling,
|
||||
};
|
||||
use slot_clock::SlotClock;
|
||||
use state_processing::per_slot_processing;
|
||||
@@ -410,17 +410,12 @@ fn advance_head<T: BeaconChainTypes>(beacon_chain: &Arc<BeaconChain<T>>) -> Resu
|
||||
"Skipping priming of attester cache for Gloas boundary epoch"
|
||||
);
|
||||
} else {
|
||||
let ptcs = get_ptcs_for_shuffling_epoch(&state, shuffling_epoch, &beacon_chain.spec)
|
||||
.map_err(BeaconChainError::from)?;
|
||||
let ptcs = CachedPTCs::from_state(&state, shuffling_epoch, &beacon_chain.spec)?;
|
||||
let cached_shuffling = CachedShuffling::new(committee_cache.clone(), ptcs);
|
||||
beacon_chain
|
||||
.shuffling_cache
|
||||
.write()
|
||||
.insert_committee_cache_with_ptcs(
|
||||
shuffling_id.clone(),
|
||||
cached_shuffling,
|
||||
&beacon_chain.spec,
|
||||
)?;
|
||||
.insert_committee_cache(shuffling_id.clone(), cached_shuffling)?;
|
||||
|
||||
debug!(
|
||||
?head_block_root,
|
||||
|
||||
@@ -36,6 +36,7 @@ use rand::SeedableRng;
|
||||
use rand::rngs::{OsRng, StdRng};
|
||||
use slasher::Slasher;
|
||||
use slasher_service::SlasherService;
|
||||
use std::num::NonZeroUsize;
|
||||
use std::path::{Path, PathBuf};
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
@@ -639,6 +640,10 @@ where
|
||||
network_globals: self.network_globals.clone(),
|
||||
beacon_processor_send: Some(beacon_processor_channels.beacon_processor_tx.clone()),
|
||||
sse_logging_components: runtime_context.sse_logging_components.clone(),
|
||||
historical_committee_cache: Arc::new(http_api::HistoricalCommitteeCache::new(
|
||||
NonZeroUsize::new(self.http_api_config.historical_committee_cache_size)
|
||||
.unwrap_or(NonZeroUsize::MIN),
|
||||
)),
|
||||
});
|
||||
|
||||
let exit = runtime_context.executor.exit();
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
use crate::StateId;
|
||||
use crate::caches::{HistoricalCommitteeCache, HistoricalShufflingId};
|
||||
use crate::task_spawner::{Priority, TaskSpawner};
|
||||
use crate::utils::ResponseFilter;
|
||||
use crate::validator::pubkey_to_validator_index;
|
||||
@@ -13,7 +14,10 @@ use eth2::types::{
|
||||
};
|
||||
use ssz::Encode;
|
||||
use std::sync::Arc;
|
||||
use types::{AttestationShufflingId, BeaconStateError, CommitteeCache, EthSpec, RelativeEpoch};
|
||||
use types::{
|
||||
AttestationShufflingId, BeaconStateError, CommitteeCache, EthSpec, RelativeEpoch,
|
||||
RelativeEpochError,
|
||||
};
|
||||
use warp::filters::BoxedFilter;
|
||||
use warp::http::Response;
|
||||
use warp::hyper::Body;
|
||||
@@ -26,6 +30,8 @@ type BeaconStatesPath<T> = BoxedFilter<(
|
||||
Arc<BeaconChain<T>>,
|
||||
)>;
|
||||
|
||||
type BeaconStatesCommitteesFilter = BoxedFilter<(Arc<HistoricalCommitteeCache>,)>;
|
||||
|
||||
// GET beacon/states/{state_id}/pending_consolidations
|
||||
pub fn get_beacon_state_pending_consolidations<T: BeaconChainTypes>(
|
||||
beacon_states_path: BeaconStatesPath<T>,
|
||||
@@ -337,17 +343,20 @@ pub fn get_beacon_state_sync_committees<T: BeaconChainTypes>(
|
||||
// GET beacon/states/{state_id}/committees?slot,index,epoch
|
||||
pub fn get_beacon_state_committees<T: BeaconChainTypes>(
|
||||
beacon_states_path: BeaconStatesPath<T>,
|
||||
beacon_states_committees_filter: BeaconStatesCommitteesFilter,
|
||||
) -> ResponseFilter {
|
||||
beacon_states_path
|
||||
.clone()
|
||||
.and(warp::path("committees"))
|
||||
.and(warp::query::<eth2::types::CommitteesQuery>())
|
||||
.and(beacon_states_committees_filter)
|
||||
.and(warp::path::end())
|
||||
.then(
|
||||
|state_id: StateId,
|
||||
task_spawner: TaskSpawner<T::EthSpec>,
|
||||
chain: Arc<BeaconChain<T>>,
|
||||
query: eth2::types::CommitteesQuery| {
|
||||
query: eth2::types::CommitteesQuery,
|
||||
historical_committee_cache: Arc<HistoricalCommitteeCache>| {
|
||||
task_spawner.blocking_json_task(Priority::P1, move || {
|
||||
let (data, execution_optimistic, finalized) = state_id
|
||||
.map_state_and_execution_optimistic_and_finalized(
|
||||
@@ -364,108 +373,75 @@ pub fn get_beacon_state_committees<T: BeaconChainTypes>(
|
||||
let shuffling_id = if let Ok(Some(shuffling_decision_block)) =
|
||||
chain.block_root_at_slot(decision_slot, WhenSlotSkipped::Prev)
|
||||
{
|
||||
Some(AttestationShufflingId {
|
||||
shuffling_epoch: epoch,
|
||||
shuffling_decision_block,
|
||||
})
|
||||
Some(HistoricalShufflingId::ShufflingId(
|
||||
AttestationShufflingId {
|
||||
shuffling_epoch: epoch,
|
||||
shuffling_decision_block,
|
||||
},
|
||||
))
|
||||
} else if epoch < chain.head().finalized_checkpoint().epoch {
|
||||
// Use the case for finalized epochs
|
||||
Some(HistoricalShufflingId::FinalizedEpoch(epoch))
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
// Attempt to read from the chain cache if there exists a
|
||||
// shuffling_id
|
||||
let maybe_cached_shuffling = if let Some(shuffling_id) =
|
||||
shuffling_id.as_ref()
|
||||
{
|
||||
chain
|
||||
.shuffling_cache
|
||||
.try_write_for(std::time::Duration::from_secs(1))
|
||||
.and_then(|mut cache_write| cache_write.get(shuffling_id))
|
||||
.and_then(|cache_item| cache_item.wait().ok())
|
||||
.map(|cached_shuffling| cached_shuffling.committee_cache)
|
||||
} else {
|
||||
None
|
||||
};
|
||||
let maybe_cached_shuffling =
|
||||
if let Some(shuffling_id) = shuffling_id.as_ref() {
|
||||
historical_committee_cache.get(shuffling_id)
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
let committee_cache =
|
||||
if let Some(shuffling) = maybe_cached_shuffling {
|
||||
shuffling
|
||||
} else {
|
||||
let possibly_built_cache =
|
||||
match RelativeEpoch::from_epoch(current_epoch, epoch) {
|
||||
Ok(relative_epoch)
|
||||
if state.committee_cache_is_initialized(
|
||||
relative_epoch,
|
||||
) =>
|
||||
{
|
||||
state.committee_cache(relative_epoch).cloned()
|
||||
}
|
||||
_ => CommitteeCache::initialized(
|
||||
let committee_cache = match RelativeEpoch::from_epoch(
|
||||
current_epoch,
|
||||
epoch,
|
||||
) {
|
||||
Ok(relative_epoch)
|
||||
if state.committee_cache_is_initialized(
|
||||
relative_epoch,
|
||||
) =>
|
||||
{
|
||||
state.committee_cache(relative_epoch).cloned()
|
||||
}
|
||||
Ok(_) | Err(RelativeEpochError::EpochTooLow { .. }) => {
|
||||
CommitteeCache::initialized(
|
||||
state,
|
||||
epoch,
|
||||
&chain.spec,
|
||||
),
|
||||
)
|
||||
}
|
||||
.map_err(
|
||||
|e| match e {
|
||||
BeaconStateError::EpochOutOfBounds => {
|
||||
let max_sprp =
|
||||
T::EthSpec::slots_per_historical_root()
|
||||
as u64;
|
||||
let first_subsequent_restore_point_slot =
|
||||
((epoch.start_slot(
|
||||
T::EthSpec::slots_per_epoch(),
|
||||
) / max_sprp)
|
||||
+ 1)
|
||||
* max_sprp;
|
||||
if epoch < current_epoch {
|
||||
warp_utils::reject::custom_bad_request(
|
||||
format!(
|
||||
"epoch out of bounds, \
|
||||
try state at slot {}",
|
||||
first_subsequent_restore_point_slot,
|
||||
),
|
||||
)
|
||||
} else {
|
||||
warp_utils::reject::custom_bad_request(
|
||||
"epoch out of bounds, \
|
||||
too far in future"
|
||||
.into(),
|
||||
)
|
||||
}
|
||||
}
|
||||
_ => warp_utils::reject::unhandled_error(
|
||||
BeaconChainError::from(e),
|
||||
),
|
||||
},
|
||||
)?;
|
||||
|
||||
// Attempt to write to the beacon cache (only if the cache
|
||||
// size is not the default value).
|
||||
if chain.config.shuffling_cache_size
|
||||
!= beacon_chain::shuffling_cache::DEFAULT_CACHE_SIZE
|
||||
&& let Some(shuffling_id) = shuffling_id
|
||||
&& let Some(mut cache_write) = chain
|
||||
.shuffling_cache
|
||||
.try_write_for(std::time::Duration::from_secs(1))
|
||||
{
|
||||
let decision_block_root =
|
||||
shuffling_id.shuffling_decision_block;
|
||||
if let Err(error) = cache_write.insert_committee_cache(
|
||||
shuffling_id.clone(),
|
||||
&possibly_built_cache,
|
||||
&chain.spec,
|
||||
) {
|
||||
tracing::warn!(
|
||||
%epoch,
|
||||
?decision_block_root,
|
||||
?error,
|
||||
"Priming committee cache failed"
|
||||
);
|
||||
Err(RelativeEpochError::EpochTooHigh { .. }) => {
|
||||
Err(BeaconStateError::EpochOutOfBounds)
|
||||
}
|
||||
Err(RelativeEpochError::ArithError(e)) => {
|
||||
Err(BeaconStateError::ArithError(e))
|
||||
}
|
||||
}
|
||||
.map_err(|e| match e {
|
||||
BeaconStateError::EpochOutOfBounds => {
|
||||
warp_utils::reject::custom_bad_request(format!(
|
||||
"epoch {} out of bounds for state at {}",
|
||||
epoch, current_epoch
|
||||
))
|
||||
}
|
||||
_ => warp_utils::reject::unhandled_error(
|
||||
BeaconChainError::from(e),
|
||||
),
|
||||
})?;
|
||||
|
||||
possibly_built_cache
|
||||
if let Some(shuffling_id) = shuffling_id {
|
||||
historical_committee_cache
|
||||
.insert(shuffling_id, committee_cache.clone());
|
||||
}
|
||||
|
||||
committee_cache
|
||||
};
|
||||
|
||||
// Use either the supplied slot or all slots in the epoch.
|
||||
|
||||
43
beacon_node/http_api/src/caches.rs
Normal file
43
beacon_node/http_api/src/caches.rs
Normal file
@@ -0,0 +1,43 @@
|
||||
use lru::LruCache;
|
||||
use parking_lot::Mutex;
|
||||
use std::num::NonZeroUsize;
|
||||
use std::sync::Arc;
|
||||
use types::{AttestationShufflingId, CommitteeCache, Epoch};
|
||||
|
||||
/// See `shuffling_cache::DEFAULT_CACHE_SIZE` for rationale
|
||||
pub const DEFAULT_HISTORICAL_COMMITTEE_CACHE_SIZE: usize = 16;
|
||||
|
||||
/// Indexes the `HistoricalCommitteeCache`. We can compute committees for very old epochs, and we
|
||||
/// can't retrieve the decision root cheaply from a state. For those cases we allow the cache to
|
||||
/// key those committees by finalized epoch.
|
||||
#[derive(Eq, Hash, PartialEq)]
|
||||
pub enum HistoricalShufflingId {
|
||||
FinalizedEpoch(Epoch),
|
||||
ShufflingId(AttestationShufflingId),
|
||||
}
|
||||
|
||||
/// Dedicated cache for attestation committees, used exclusively by the HTTP API.
|
||||
///
|
||||
/// This may contain committees for finalized and unfinalized epochs. The name is slightly
|
||||
/// missleading :)
|
||||
pub struct HistoricalCommitteeCache {
|
||||
committees: Mutex<LruCache<HistoricalShufflingId, Arc<CommitteeCache>>>,
|
||||
}
|
||||
|
||||
impl HistoricalCommitteeCache {
|
||||
pub fn new(size: NonZeroUsize) -> Self {
|
||||
Self {
|
||||
committees: Mutex::new(LruCache::new(size)),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl HistoricalCommitteeCache {
|
||||
pub fn get(&self, id: &HistoricalShufflingId) -> Option<Arc<CommitteeCache>> {
|
||||
self.committees.lock().get(id).cloned()
|
||||
}
|
||||
|
||||
pub fn insert(&self, id: HistoricalShufflingId, cache: Arc<CommitteeCache>) {
|
||||
self.committees.lock().put(id, cache);
|
||||
}
|
||||
}
|
||||
@@ -12,6 +12,7 @@ mod beacon;
|
||||
mod block_id;
|
||||
mod build_block_contents;
|
||||
mod builder_states;
|
||||
mod caches;
|
||||
mod custody;
|
||||
mod database;
|
||||
mod light_client;
|
||||
@@ -40,6 +41,8 @@ use crate::beacon::execution_payload_envelope::{
|
||||
post_beacon_execution_payload_envelope_ssz,
|
||||
};
|
||||
use crate::beacon::pool::*;
|
||||
use crate::caches::DEFAULT_HISTORICAL_COMMITTEE_CACHE_SIZE;
|
||||
pub use crate::caches::HistoricalCommitteeCache;
|
||||
use crate::light_client::{get_light_client_bootstrap, get_light_client_updates};
|
||||
use crate::utils::{AnyVersionFilter, EthV1Filter};
|
||||
use crate::validator::post_validator_liveness_epoch;
|
||||
@@ -132,6 +135,7 @@ pub struct Context<T: BeaconChainTypes> {
|
||||
pub network_globals: Option<Arc<NetworkGlobals<T::EthSpec>>>,
|
||||
pub beacon_processor_send: Option<BeaconProcessorSend<T::EthSpec>>,
|
||||
pub sse_logging_components: Option<SSELoggingComponents>,
|
||||
pub historical_committee_cache: Arc<HistoricalCommitteeCache>,
|
||||
}
|
||||
|
||||
/// Configuration for the HTTP server.
|
||||
@@ -148,6 +152,7 @@ pub struct Config {
|
||||
#[serde(with = "eth2::types::serde_status_code")]
|
||||
pub duplicate_block_status_code: StatusCode,
|
||||
pub target_peers: usize,
|
||||
pub historical_committee_cache_size: usize,
|
||||
}
|
||||
|
||||
impl Default for Config {
|
||||
@@ -163,6 +168,7 @@ impl Default for Config {
|
||||
enable_beacon_processor: true,
|
||||
duplicate_block_status_code: StatusCode::ACCEPTED,
|
||||
target_peers: 100,
|
||||
historical_committee_cache_size: DEFAULT_HISTORICAL_COMMITTEE_CACHE_SIZE,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -416,6 +422,11 @@ pub fn serve<T: BeaconChainTypes>(
|
||||
})
|
||||
.boxed();
|
||||
|
||||
let historical_committee_cache = ctx.historical_committee_cache.clone();
|
||||
let beacon_states_committees_filter = warp::any()
|
||||
.map(move || historical_committee_cache.clone())
|
||||
.boxed();
|
||||
|
||||
// Create a `warp` filter that provides access to the network sender channel.
|
||||
let network_tx = ctx
|
||||
.network_senders
|
||||
@@ -628,8 +639,10 @@ pub fn serve<T: BeaconChainTypes>(
|
||||
states::get_beacon_state_validators_id(beacon_states_path.clone());
|
||||
|
||||
// GET beacon/states/{state_id}/committees?slot,index,epoch
|
||||
let get_beacon_state_committees =
|
||||
states::get_beacon_state_committees(beacon_states_path.clone());
|
||||
let get_beacon_state_committees = states::get_beacon_state_committees(
|
||||
beacon_states_path.clone(),
|
||||
beacon_states_committees_filter,
|
||||
);
|
||||
|
||||
// GET beacon/states/{state_id}/sync_committees?epoch
|
||||
let get_beacon_state_sync_committees =
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
use crate::{Config, Context};
|
||||
use crate::{Config, Context, caches::HistoricalCommitteeCache};
|
||||
use beacon_chain::{
|
||||
BeaconChain, BeaconChainTypes,
|
||||
custody_context::NodeCustodyType,
|
||||
@@ -22,10 +22,10 @@ use lighthouse_network::{
|
||||
};
|
||||
use network::{NetworkReceivers, NetworkSenders};
|
||||
use sensitive_url::SensitiveUrl;
|
||||
use std::future::Future;
|
||||
use std::net::SocketAddr;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
use std::{future::Future, num::NonZeroUsize};
|
||||
use store::MemoryStore;
|
||||
use task_executor::test_utils::TestRuntime;
|
||||
use types::{ChainSpec, EthSpec};
|
||||
@@ -293,6 +293,9 @@ pub async fn create_api_server_with_config<T: BeaconChainTypes>(
|
||||
network_globals: Some(network_globals),
|
||||
beacon_processor_send: Some(beacon_processor_send),
|
||||
sse_logging_components: None,
|
||||
historical_committee_cache: Arc::new(HistoricalCommitteeCache::new(
|
||||
NonZeroUsize::new(http_config.historical_committee_cache_size).unwrap(),
|
||||
)),
|
||||
});
|
||||
|
||||
let (listening_socket, server) =
|
||||
|
||||
@@ -4254,8 +4254,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
|
||||
"payload_attn_invalid_sig",
|
||||
);
|
||||
}
|
||||
PayloadAttestationError::MissingPTC { .. }
|
||||
| PayloadAttestationError::BeaconChainError(_) => {
|
||||
PayloadAttestationError::BeaconChainError(_) => {
|
||||
debug!(
|
||||
%peer_id,
|
||||
%message_slot,
|
||||
|
||||
@@ -215,6 +215,9 @@ pub fn get_config<E: EthSpec>(
|
||||
|
||||
if let Some(cache_size) = clap_utils::parse_optional(cli_args, "shuffling-cache-size")? {
|
||||
client_config.chain.shuffling_cache_size = cache_size;
|
||||
// Mantain backwards compatibility with users customizing `shuffling_cache_size` to tweak
|
||||
// the behaviour of the HTTP API route `beacon/states/committees`
|
||||
client_config.http_api.historical_committee_cache_size = cache_size;
|
||||
}
|
||||
|
||||
if let Some(batches) = clap_utils::parse_optional(cli_args, "blob-publication-batches")? {
|
||||
|
||||
Reference in New Issue
Block a user