Update committee cache to carry PTC

This commit is contained in:
Michael Sproul
2026-05-14 13:23:51 +10:00
parent a795451cf9
commit 90fcc51216
4 changed files with 117 additions and 56 deletions

View File

@@ -71,7 +71,9 @@ use crate::persisted_custody::persist_custody_context;
use crate::persisted_fork_choice::PersistedForkChoice;
use crate::pre_finalization_cache::PreFinalizationBlockCache;
use crate::proposer_preferences_verification::proposer_preference_cache::GossipVerifiedProposerPreferenceCache;
use crate::shuffling_cache::{BlockShufflingIds, ShufflingCache};
use crate::shuffling_cache::{
BlockShufflingIds, CachedShuffling, ShufflingCache, get_ptc_for_shuffling_epoch,
};
use crate::sync_committee_verification::{
Error as SyncCommitteeError, VerifiedSyncCommitteeMessage, VerifiedSyncContribution,
};
@@ -466,7 +468,7 @@ pub struct BeaconChain<T: BeaconChainTypes> {
/// HTTP server is enabled.
pub event_handler: Option<ServerSentEventHandler<T::EthSpec>>,
/// Caches the attester shuffling for a given epoch and shuffling key root.
pub shuffling_cache: RwLock<ShufflingCache>,
pub shuffling_cache: RwLock<ShufflingCache<T::EthSpec>>,
/// Caches the beacon block proposer shuffling for a given epoch and shuffling key root.
pub beacon_proposer_cache: Arc<Mutex<BeaconProposerCache>>,
/// Caches a map of `validator_index -> validator_pubkey`.
@@ -4794,9 +4796,12 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
if !shuffling_is_cached {
state.build_committee_cache(relative_epoch, &self.spec)?;
let committee_cache = state.committee_cache(relative_epoch)?;
let shuffling_epoch = relative_epoch.into_epoch(state.current_epoch());
let ptc = get_ptc_for_shuffling_epoch(state, shuffling_epoch, &self.spec)?;
let cached_shuffling = CachedShuffling::new(committee_cache.clone(), ptc);
self.shuffling_cache
.write()
.insert_committee_cache(shuffling_id, committee_cache);
.insert_committee_cache_with_ptc(shuffling_id, cached_shuffling);
}
}
Ok(())
@@ -6930,8 +6935,11 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
// access.
drop(shuffling_cache);
let committee_cache = cache_item.wait()?;
map_fn(&committee_cache, shuffling_id.shuffling_decision_block)
let cached_shuffling = cache_item.wait()?;
map_fn(
&cached_shuffling.committee_cache,
shuffling_id.shuffling_decision_block,
)
} else {
// Create an entry in the cache that "promises" this value will eventually be computed.
// This avoids the case where multiple threads attempt to produce the same value at the
@@ -7029,17 +7037,19 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
state.build_committee_cache(relative_epoch, &self.spec)?;
let committee_cache = state.committee_cache(relative_epoch)?.clone();
let ptc = get_ptc_for_shuffling_epoch(&state, shuffling_epoch, &self.spec)?;
let shuffling_decision_block = shuffling_id.shuffling_decision_block;
let cached_shuffling = CachedShuffling::new(committee_cache.clone(), ptc);
self.shuffling_cache
.write()
.insert_committee_cache(shuffling_id, &committee_cache);
.insert_committee_cache_with_ptc(shuffling_id, cached_shuffling.clone());
metrics::stop_timer(committee_building_timer);
sender.send(committee_cache.clone());
sender.send(cached_shuffling.clone());
map_fn(&committee_cache, shuffling_decision_block)
map_fn(&cached_shuffling.committee_cache, shuffling_decision_block)
}
}

View File

@@ -5,21 +5,22 @@ use itertools::Itertools;
use oneshot_broadcast::{Receiver, Sender, oneshot};
use tracing::debug;
use types::{
AttestationShufflingId, BeaconState, Epoch, EthSpec, Hash256, RelativeEpoch,
state::CommitteeCache,
AttestationShufflingId, BeaconState, BeaconStateError, ChainSpec, Epoch, EthSpec, Hash256, PTC,
RelativeEpoch, state::CommitteeCache,
};
use crate::{BeaconChainError, metrics};
/// The size of the cache that stores committee caches for quicker verification.
/// The size of the cache that stores shufflings for quicker verification.
///
/// Each entry should be `8 + 800,000 = 800,008` bytes in size with 100k validators. (8-byte hash +
/// 100k indices). Therefore, this cache should be approx `16 * 800,008 = 12.8 MB`. (Note: this
/// ignores a few extra bytes in the caches that should be insignificant compared to the indices).
/// Each entry should be around `8 + 800,000 + 4,096 = 804,104` bytes in size with 100k validators
/// and a 512-validator PTC. Therefore, this cache should be approx `16 * 804,104 = 12.9 MB`.
/// (Note: this ignores a few extra bytes in the caches that should be insignificant compared to the
/// indices).
pub const DEFAULT_CACHE_SIZE: usize = 16;
/// The maximum number of concurrent committee cache "promises" that can be issued. In effect, this
/// limits the number of concurrent states that can be loaded into memory for the committee cache.
/// The maximum number of concurrent shuffling "promises" that can be issued. In effect, this
/// limits the number of concurrent states that can be loaded into memory for the shuffling.
/// This prevents excessive memory usage at the cost of rejecting some attestations.
///
/// We set this value to 2 since states can be quite large and have a significant impact on memory
@@ -30,19 +31,34 @@ pub const DEFAULT_CACHE_SIZE: usize = 16;
const MAX_CONCURRENT_PROMISES: usize = 2;
#[derive(Clone)]
pub enum CacheItem {
/// A committee.
Committee(Arc<CommitteeCache>),
/// A promise for a future committee.
Promise(Receiver<Arc<CommitteeCache>>),
pub struct CachedShuffling<E: EthSpec> {
pub committee_cache: Arc<CommitteeCache>,
pub ptc: Option<PTC<E>>,
}
impl CacheItem {
impl<E: EthSpec> CachedShuffling<E> {
pub fn new(committee_cache: Arc<CommitteeCache>, ptc: Option<PTC<E>>) -> Self {
Self {
committee_cache,
ptc,
}
}
}
#[derive(Clone)]
pub enum CacheItem<E: EthSpec> {
/// A cached shuffling.
Committee(CachedShuffling<E>),
/// A promise for a future cached shuffling.
Promise(Receiver<CachedShuffling<E>>),
}
impl<E: EthSpec> CacheItem<E> {
pub fn is_promise(&self) -> bool {
matches!(self, CacheItem::Promise(_))
}
pub fn wait(self) -> Result<Arc<CommitteeCache>, BeaconChainError> {
pub fn wait(self) -> Result<CachedShuffling<E>, BeaconChainError> {
match self {
CacheItem::Committee(cache) => Ok(cache),
CacheItem::Promise(receiver) => receiver
@@ -52,17 +68,17 @@ impl CacheItem {
}
}
/// Provides a cache for `CommitteeCache`.
/// Provides a cache for `CommitteeCache` and the associated optional PTC.
///
/// It has been named `ShufflingCache` because `CommitteeCacheCache` is a bit weird and looks like
/// a find/replace error.
pub struct ShufflingCache {
cache: HashMap<AttestationShufflingId, CacheItem>,
pub struct ShufflingCache<E: EthSpec> {
cache: HashMap<AttestationShufflingId, CacheItem<E>>,
cache_size: usize,
head_shuffling_ids: BlockShufflingIds,
}
impl ShufflingCache {
impl<E: EthSpec> ShufflingCache<E> {
pub fn new(cache_size: usize, head_shuffling_ids: BlockShufflingIds) -> Self {
Self {
cache: HashMap::new(),
@@ -71,22 +87,22 @@ impl ShufflingCache {
}
}
pub fn get(&mut self, key: &AttestationShufflingId) -> Option<CacheItem> {
pub fn get(&mut self, key: &AttestationShufflingId) -> Option<CacheItem<E>> {
match self.cache.get(key) {
// The cache contained the committee cache, return it.
// The cache contained the shuffling, return it.
item @ Some(CacheItem::Committee(_)) => {
metrics::inc_counter(&metrics::SHUFFLING_CACHE_HITS);
item.cloned()
}
// The cache contains a promise for the committee cache. Check to see if the promise has
// The cache contains a promise for the shuffling. Check to see if the promise has
// already been resolved, without waiting for it.
item @ Some(CacheItem::Promise(receiver)) => match receiver.try_recv() {
// The promise has already been resolved. Replace the entry in the cache with a
// `Committee` entry and then return the committee.
Ok(Some(committee)) => {
// `Committee` entry and then return the cached shuffling.
Ok(Some(cached_shuffling)) => {
metrics::inc_counter(&metrics::SHUFFLING_CACHE_PROMISE_HITS);
metrics::inc_counter(&metrics::SHUFFLING_CACHE_HITS);
let ready = CacheItem::Committee(committee);
let ready = CacheItem::Committee(cached_shuffling);
self.insert_cache_item(key.clone(), ready.clone());
Some(ready)
}
@@ -97,8 +113,8 @@ impl ShufflingCache {
metrics::inc_counter(&metrics::SHUFFLING_CACHE_HITS);
item.cloned()
}
// The sender has been dropped without sending a committee. There was most likely an
// error computing the committee cache. Drop the key from the cache and return
// The sender has been dropped without sending a shuffling. There was most likely an
// error computing the shuffling. Drop the key from the cache and return
// `None` so the caller can recompute the committee.
//
// It's worth noting that this is the only place where we removed unresolved
@@ -113,7 +129,7 @@ impl ShufflingCache {
None
}
},
// The cache does not have this committee and it's not already promised to be computed.
// The cache does not have this shuffling and it's not already promised to be computed.
None => {
metrics::inc_counter(&metrics::SHUFFLING_CACHE_MISSES);
None
@@ -129,23 +145,30 @@ impl ShufflingCache {
&mut self,
key: AttestationShufflingId,
committee_cache: &C,
) {
self.insert_committee_cache_with_ptc(
key,
CachedShuffling::new(committee_cache.to_arc_committee_cache(), None),
);
}
pub fn insert_committee_cache_with_ptc(
&mut self,
key: AttestationShufflingId,
cached_shuffling: CachedShuffling<E>,
) {
if self
.cache
.get(&key)
// Replace the committee if it's not present or if it's a promise. A bird in the hand is
// worth two in the promise-bush!
// Replace the cached shuffling if it's not present or if it's a promise.
.is_none_or(CacheItem::is_promise)
{
self.insert_cache_item(
key,
CacheItem::Committee(committee_cache.to_arc_committee_cache()),
);
self.insert_cache_item(key, CacheItem::Committee(cached_shuffling));
}
}
/// Prunes the cache first before inserting a new cache item.
fn insert_cache_item(&mut self, key: AttestationShufflingId, cache_item: CacheItem) {
fn insert_cache_item(&mut self, key: AttestationShufflingId, cache_item: CacheItem<E>) {
self.prune_cache();
self.cache.insert(key, cache_item);
}
@@ -188,7 +211,7 @@ impl ShufflingCache {
pub fn create_promise(
&mut self,
key: AttestationShufflingId,
) -> Result<Sender<Arc<CommitteeCache>>, BeaconChainError> {
) -> Result<Sender<CachedShuffling<E>>, BeaconChainError> {
let num_active_promises = self
.cache
.iter()
@@ -212,6 +235,20 @@ impl ShufflingCache {
}
}
/// Return the PTC associated with the first slot in `shuffling_epoch`, when the state supports PTCs.
pub fn get_ptc_for_shuffling_epoch<E: EthSpec>(
state: &BeaconState<E>,
shuffling_epoch: Epoch,
spec: &ChainSpec,
) -> Result<Option<PTC<E>>, BeaconStateError> {
if state.fork_name_unchecked().gloas_enabled() {
let slot = shuffling_epoch.start_slot(E::slots_per_epoch());
state.get_ptc(slot, spec).map(Some)
} else {
Ok(None)
}
}
/// A helper trait to allow lazy-cloning of the committee cache when inserting into the cache.
pub trait ToArcCommitteeCache {
fn to_arc_committee_cache(&self) -> Arc<CommitteeCache>;
@@ -304,7 +341,7 @@ mod test {
const TEST_CACHE_SIZE: usize = 5;
// Creates a new shuffling cache for testing
fn new_shuffling_cache() -> ShufflingCache {
fn new_shuffling_cache() -> ShufflingCache<E> {
create_test_tracing_subscriber();
let current_epoch = 8;
@@ -318,6 +355,10 @@ mod test {
ShufflingCache::new(TEST_CACHE_SIZE, head_shuffling_ids)
}
fn cached_shuffling(committee_cache: Arc<CommitteeCache>) -> CachedShuffling<E> {
CachedShuffling::new(committee_cache, None)
}
/// Returns two different committee caches for testing.
fn committee_caches() -> (Arc<CommitteeCache>, Arc<CommitteeCache>) {
let harness = BeaconChainHarness::builder(MinimalEthSpec)
@@ -366,12 +407,12 @@ mod test {
);
// Resolve the promise.
sender.send(committee_a.clone());
sender.send(cached_shuffling(committee_a.clone()));
// Ensure the promise has been resolved.
let item = cache.get(&id_a).unwrap();
assert!(
matches!(item, CacheItem::Committee(committee) if committee == committee_a),
matches!(item, CacheItem::Committee(cached_shuffling) if cached_shuffling.committee_cache == committee_a),
"the promise should be resolved"
);
assert_eq!(cache.cache.len(), 1, "the cache should have one entry");
@@ -428,30 +469,30 @@ mod test {
);
// Resolve promise A.
sender_a.send(committee_a.clone());
sender_a.send(cached_shuffling(committee_a.clone()));
// Ensure promise A has been resolved.
let item = cache.get(&id_a).unwrap();
assert!(
matches!(item, CacheItem::Committee(committee) if committee == committee_a),
matches!(item, CacheItem::Committee(cached_shuffling) if cached_shuffling.committee_cache == committee_a),
"promise A should be resolved"
);
// Resolve promise B.
sender_b.send(committee_b.clone());
sender_b.send(cached_shuffling(committee_b.clone()));
// Ensure promise B has been resolved.
let item = cache.get(&id_b).unwrap();
assert!(
matches!(item, CacheItem::Committee(committee) if committee == committee_b),
matches!(item, CacheItem::Committee(cached_shuffling) if cached_shuffling.committee_cache == committee_b),
"promise B should be resolved"
);
// Check both entries again.
assert!(
matches!(cache.get(&id_a).unwrap(), CacheItem::Committee(committee) if committee == committee_a),
matches!(cache.get(&id_a).unwrap(), CacheItem::Committee(cached_shuffling) if cached_shuffling.committee_cache == committee_a),
"promise A should remain resolved"
);
assert!(
matches!(cache.get(&id_b).unwrap(), CacheItem::Committee(committee) if committee == committee_b),
matches!(cache.get(&id_b).unwrap(), CacheItem::Committee(cached_shuffling) if cached_shuffling.committee_cache == committee_b),
"promise B should remain resolved"
);
assert_eq!(cache.cache.len(), 2, "the cache should have two entries");
@@ -487,7 +528,7 @@ mod test {
let committee_cache_a = Arc::new(CommitteeCache::default());
cache.insert_committee_cache(id_a.clone(), &committee_cache_a);
assert!(
matches!(cache.get(&id_a).unwrap(), CacheItem::Committee(committee_cache) if committee_cache == committee_cache_a),
matches!(cache.get(&id_a).unwrap(), CacheItem::Committee(cached_shuffling) if cached_shuffling.committee_cache == committee_cache_a),
"should insert committee cache"
);
}

View File

@@ -15,7 +15,9 @@
//! 2. There's a possibility that the head block is never built upon, causing wasted CPU cycles.
use crate::validator_monitor::HISTORIC_EPOCHS as VALIDATOR_MONITOR_HISTORIC_EPOCHS;
use crate::{
BeaconChain, BeaconChainError, BeaconChainTypes, chain_config::FORK_CHOICE_LOOKAHEAD_FACTOR,
BeaconChain, BeaconChainError, BeaconChainTypes,
chain_config::FORK_CHOICE_LOOKAHEAD_FACTOR,
shuffling_cache::{CachedShuffling, get_ptc_for_shuffling_epoch},
};
use slot_clock::SlotClock;
use state_processing::per_slot_processing;
@@ -395,10 +397,17 @@ fn advance_head<T: BeaconChainTypes>(beacon_chain: &Arc<BeaconChain<T>>) -> Resu
let committee_cache = state
.committee_cache(RelativeEpoch::Next)
.map_err(BeaconChainError::from)?;
let ptc = get_ptc_for_shuffling_epoch(
&state,
RelativeEpoch::Next.into_epoch(state.current_epoch()),
&beacon_chain.spec,
)
.map_err(BeaconChainError::from)?;
let cached_shuffling = CachedShuffling::new(committee_cache.clone(), ptc);
beacon_chain
.shuffling_cache
.write()
.insert_committee_cache(shuffling_id.clone(), committee_cache);
.insert_committee_cache_with_ptc(shuffling_id.clone(), cached_shuffling);
debug!(
?head_block_root,

View File

@@ -382,6 +382,7 @@ pub fn get_beacon_state_committees<T: BeaconChainTypes>(
.try_write_for(std::time::Duration::from_secs(1))
.and_then(|mut cache_write| cache_write.get(shuffling_id))
.and_then(|cache_item| cache_item.wait().ok())
.map(|cached_shuffling| cached_shuffling.committee_cache)
} else {
None
};