Compute proposer shuffling only once in gossip verification (#7304)

When we perform data column gossip verification, we sometimes see multiple proposer shuffling cache misses occur simultaneously. This results in multiple threads computing the same proposer shuffling, which potentially slows down gossip verification.

The proposal here is to use a `OnceCell` for each shuffling key to make sure it's only computed once. I have only implemented this in data column verification as a PoC, but this can also be applied to blob and block verification.

Related issues:
- https://github.com/sigp/lighthouse/issues/4447
- https://github.com/sigp/lighthouse/issues/7203
This commit is contained in:
Jimmy Chen
2025-05-01 11:30:42 +10:00
committed by GitHub
parent 9779b4ba2c
commit 93ec9df137
6 changed files with 62 additions and 35 deletions

3
Cargo.lock generated
View File

@@ -813,6 +813,7 @@ dependencies = [
"maplit",
"merkle_proof",
"metrics",
"once_cell",
"oneshot_broadcast",
"operation_pool",
"parking_lot 0.12.3",
@@ -5731,8 +5732,6 @@ dependencies = [
"chrono",
"logroller",
"metrics",
"once_cell",
"parking_lot 0.12.3",
"serde",
"serde_json",
"tokio",

View File

@@ -161,6 +161,7 @@ maplit = "1"
milhouse = "0.5"
mockito = "1.5.0"
num_cpus = "1"
once_cell = "1.17.1"
parking_lot = "0.12"
paste = "1"
prometheus = { version = "0.13", default-features = false }

View File

@@ -47,6 +47,7 @@ logging = { workspace = true }
lru = { workspace = true }
merkle_proof = { workspace = true }
metrics = { workspace = true }
once_cell = { workspace = true }
oneshot_broadcast = { path = "../../common/oneshot_broadcast/" }
operation_pool = { workspace = true }
parking_lot = { workspace = true }

View File

@@ -11,10 +11,12 @@
use crate::{BeaconChain, BeaconChainError, BeaconChainTypes};
use fork_choice::ExecutionStatus;
use lru::LruCache;
use once_cell::sync::OnceCell;
use smallvec::SmallVec;
use state_processing::state_advance::partial_state_advance;
use std::cmp::Ordering;
use std::num::NonZeroUsize;
use std::sync::Arc;
use types::non_zero_usize::new_non_zero_usize;
use types::{
BeaconState, BeaconStateError, ChainSpec, Epoch, EthSpec, Fork, Hash256, Slot, Unsigned,
@@ -39,21 +41,21 @@ pub struct Proposer {
/// their signatures.
pub struct EpochBlockProposers {
/// The epoch to which the proposers pertain.
epoch: Epoch,
pub(crate) epoch: Epoch,
/// The fork that should be used to verify proposer signatures.
fork: Fork,
pub(crate) fork: Fork,
/// A list of length `T::EthSpec::slots_per_epoch()`, representing the proposers for each slot
/// in that epoch.
///
/// E.g., if `self.epoch == 1`, then `self.proposers[0]` contains the proposer for slot `32`.
proposers: SmallVec<[usize; TYPICAL_SLOTS_PER_EPOCH]>,
pub(crate) proposers: SmallVec<[usize; TYPICAL_SLOTS_PER_EPOCH]>,
}
/// A cache to store the proposers for some epoch.
///
/// See the module-level documentation for more information.
pub struct BeaconProposerCache {
cache: LruCache<(Epoch, Hash256), EpochBlockProposers>,
cache: LruCache<(Epoch, Hash256), Arc<OnceCell<EpochBlockProposers>>>,
}
impl Default for BeaconProposerCache {
@@ -74,7 +76,8 @@ impl BeaconProposerCache {
) -> Option<Proposer> {
let epoch = slot.epoch(E::slots_per_epoch());
let key = (epoch, shuffling_decision_block);
if let Some(cache) = self.cache.get(&key) {
let cache_opt = self.cache.get(&key).and_then(|cell| cell.get());
if let Some(cache) = cache_opt {
// This `if` statement is likely unnecessary, but it feels like good practice.
if epoch == cache.epoch {
cache
@@ -103,7 +106,26 @@ impl BeaconProposerCache {
epoch: Epoch,
) -> Option<&SmallVec<[usize; TYPICAL_SLOTS_PER_EPOCH]>> {
let key = (epoch, shuffling_decision_block);
self.cache.get(&key).map(|cache| &cache.proposers)
self.cache
.get(&key)
.and_then(|cache_once_cell| cache_once_cell.get().map(|proposers| &proposers.proposers))
}
/// Fetches the shared `OnceCell` stored under `(epoch, shuffling_decision_block)`,
/// creating and caching an empty cell first when the key is not yet present.
///
/// A clone of the `Arc` is handed back, so the caller can populate the cell itself
/// (e.g. via `get_or_try_init`) after releasing its mutable borrow of the cache.
/// This lets the computation of the value be deferred until after the cache is no
/// longer borrowed.
pub fn get_or_insert_key(
    &mut self,
    epoch: Epoch,
    shuffling_decision_block: Hash256,
) -> Arc<OnceCell<EpochBlockProposers>> {
    // Look up the cell for this key, inserting a fresh, uninitialised one on a miss.
    let cell = self
        .cache
        .get_or_insert((epoch, shuffling_decision_block), || {
            Arc::new(OnceCell::new())
        });
    // Only the reference count is bumped; the cell itself remains shared with the cache.
    Arc::clone(cell)
}
/// Insert the proposers into the cache.
@@ -120,14 +142,13 @@ impl BeaconProposerCache {
) -> Result<(), BeaconStateError> {
let key = (epoch, shuffling_decision_block);
if !self.cache.contains(&key) {
self.cache.put(
key,
EpochBlockProposers {
epoch,
fork,
proposers: proposers.into(),
},
);
let epoch_proposers = EpochBlockProposers {
epoch,
fork,
proposers: proposers.into(),
};
self.cache
.put(key, Arc::new(OnceCell::with_value(epoch_proposers)));
}
Ok(())

View File

@@ -1,3 +1,4 @@
use crate::beacon_proposer_cache::EpochBlockProposers;
use crate::block_verification::{
cheap_state_advance_to_obtain_committees, get_validator_pubkey_cache, process_block_slash_info,
BlockSlashInfo,
@@ -602,14 +603,19 @@ fn verify_proposer_and_signature<T: BeaconChainTypes>(
parent_block.root
};
let proposer_opt = chain
// We lock the cache briefly to get or insert a OnceCell, then drop the lock
// before doing proposer shuffling calculation via `OnceCell::get_or_try_init`. This avoids
// holding the lock during the computation, while still ensuring the result is cached and
// initialised only once.
//
// This approach exposes the cache internals (`OnceCell` & `EpochBlockProposers`)
// as a trade-off for avoiding lock contention.
let epoch_proposers_cell = chain
.beacon_proposer_cache
.lock()
.get_slot::<T::EthSpec>(proposer_shuffling_root, column_slot);
.get_or_insert_key(column_epoch, proposer_shuffling_root);
let (proposer_index, fork) = if let Some(proposer) = proposer_opt {
(proposer.index, proposer.fork)
} else {
let epoch_proposers = epoch_proposers_cell.get_or_try_init(move || {
debug!(
%block_root,
index = %column_index,
@@ -633,19 +639,20 @@ fn verify_proposer_and_signature<T: BeaconChainTypes>(
)?;
let proposers = state.get_beacon_proposer_indices(&chain.spec)?;
let proposer_index = *proposers
.get(column_slot.as_usize() % T::EthSpec::slots_per_epoch() as usize)
.ok_or_else(|| BeaconChainError::NoProposerForSlot(column_slot))?;
// Prime the proposer shuffling cache with the newly-learned value.
chain.beacon_proposer_cache.lock().insert(
column_epoch,
proposer_shuffling_root,
proposers,
state.fork(),
)?;
(proposer_index, state.fork())
};
Ok::<_, GossipDataColumnError>(EpochBlockProposers {
epoch: column_epoch,
fork: state.fork(),
proposers: proposers.into(),
})
})?;
let proposer_index = *epoch_proposers
.proposers
.get(column_slot.as_usize() % T::EthSpec::slots_per_epoch() as usize)
.ok_or_else(|| BeaconChainError::NoProposerForSlot(column_slot))?;
let fork = epoch_proposers.fork;
// Signature verify the signed block header.
let signature_is_valid = {

View File

@@ -11,8 +11,6 @@ test_logger = [] # Print log output to stderr when running tests instead of drop
chrono = { version = "0.4", default-features = false, features = ["clock", "std"] }
logroller = { workspace = true }
metrics = { workspace = true }
once_cell = "1.17.1"
parking_lot = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
tokio = { workspace = true, features = [ "time" ] }