mirror of
https://github.com/sigp/lighthouse.git
synced 2026-05-08 09:16:00 +00:00
Compute proposer shuffling only once in gossip verification (#7304)
When we perform data column gossip verification, we sometimes see multiple proposer shuffling cache miss simultaneously and this results in multiple threads computing the shuffling cache and potentially slows down the gossip verification. Proposal here is to use a `OnceCell` for each shuffling key to make sure it's only computed once. I have only implemented this in data column verification as a PoC, but this can also be applied to blob and block verification Related issues: - https://github.com/sigp/lighthouse/issues/4447 - https://github.com/sigp/lighthouse/issues/7203
This commit is contained in:
3
Cargo.lock
generated
3
Cargo.lock
generated
@@ -813,6 +813,7 @@ dependencies = [
|
|||||||
"maplit",
|
"maplit",
|
||||||
"merkle_proof",
|
"merkle_proof",
|
||||||
"metrics",
|
"metrics",
|
||||||
|
"once_cell",
|
||||||
"oneshot_broadcast",
|
"oneshot_broadcast",
|
||||||
"operation_pool",
|
"operation_pool",
|
||||||
"parking_lot 0.12.3",
|
"parking_lot 0.12.3",
|
||||||
@@ -5731,8 +5732,6 @@ dependencies = [
|
|||||||
"chrono",
|
"chrono",
|
||||||
"logroller",
|
"logroller",
|
||||||
"metrics",
|
"metrics",
|
||||||
"once_cell",
|
|
||||||
"parking_lot 0.12.3",
|
|
||||||
"serde",
|
"serde",
|
||||||
"serde_json",
|
"serde_json",
|
||||||
"tokio",
|
"tokio",
|
||||||
|
|||||||
@@ -161,6 +161,7 @@ maplit = "1"
|
|||||||
milhouse = "0.5"
|
milhouse = "0.5"
|
||||||
mockito = "1.5.0"
|
mockito = "1.5.0"
|
||||||
num_cpus = "1"
|
num_cpus = "1"
|
||||||
|
once_cell = "1.17.1"
|
||||||
parking_lot = "0.12"
|
parking_lot = "0.12"
|
||||||
paste = "1"
|
paste = "1"
|
||||||
prometheus = { version = "0.13", default-features = false }
|
prometheus = { version = "0.13", default-features = false }
|
||||||
|
|||||||
@@ -47,6 +47,7 @@ logging = { workspace = true }
|
|||||||
lru = { workspace = true }
|
lru = { workspace = true }
|
||||||
merkle_proof = { workspace = true }
|
merkle_proof = { workspace = true }
|
||||||
metrics = { workspace = true }
|
metrics = { workspace = true }
|
||||||
|
once_cell = { workspace = true }
|
||||||
oneshot_broadcast = { path = "../../common/oneshot_broadcast/" }
|
oneshot_broadcast = { path = "../../common/oneshot_broadcast/" }
|
||||||
operation_pool = { workspace = true }
|
operation_pool = { workspace = true }
|
||||||
parking_lot = { workspace = true }
|
parking_lot = { workspace = true }
|
||||||
|
|||||||
@@ -11,10 +11,12 @@
|
|||||||
use crate::{BeaconChain, BeaconChainError, BeaconChainTypes};
|
use crate::{BeaconChain, BeaconChainError, BeaconChainTypes};
|
||||||
use fork_choice::ExecutionStatus;
|
use fork_choice::ExecutionStatus;
|
||||||
use lru::LruCache;
|
use lru::LruCache;
|
||||||
|
use once_cell::sync::OnceCell;
|
||||||
use smallvec::SmallVec;
|
use smallvec::SmallVec;
|
||||||
use state_processing::state_advance::partial_state_advance;
|
use state_processing::state_advance::partial_state_advance;
|
||||||
use std::cmp::Ordering;
|
use std::cmp::Ordering;
|
||||||
use std::num::NonZeroUsize;
|
use std::num::NonZeroUsize;
|
||||||
|
use std::sync::Arc;
|
||||||
use types::non_zero_usize::new_non_zero_usize;
|
use types::non_zero_usize::new_non_zero_usize;
|
||||||
use types::{
|
use types::{
|
||||||
BeaconState, BeaconStateError, ChainSpec, Epoch, EthSpec, Fork, Hash256, Slot, Unsigned,
|
BeaconState, BeaconStateError, ChainSpec, Epoch, EthSpec, Fork, Hash256, Slot, Unsigned,
|
||||||
@@ -39,21 +41,21 @@ pub struct Proposer {
|
|||||||
/// their signatures.
|
/// their signatures.
|
||||||
pub struct EpochBlockProposers {
|
pub struct EpochBlockProposers {
|
||||||
/// The epoch to which the proposers pertain.
|
/// The epoch to which the proposers pertain.
|
||||||
epoch: Epoch,
|
pub(crate) epoch: Epoch,
|
||||||
/// The fork that should be used to verify proposer signatures.
|
/// The fork that should be used to verify proposer signatures.
|
||||||
fork: Fork,
|
pub(crate) fork: Fork,
|
||||||
/// A list of length `T::EthSpec::slots_per_epoch()`, representing the proposers for each slot
|
/// A list of length `T::EthSpec::slots_per_epoch()`, representing the proposers for each slot
|
||||||
/// in that epoch.
|
/// in that epoch.
|
||||||
///
|
///
|
||||||
/// E.g., if `self.epoch == 1`, then `self.proposers[0]` contains the proposer for slot `32`.
|
/// E.g., if `self.epoch == 1`, then `self.proposers[0]` contains the proposer for slot `32`.
|
||||||
proposers: SmallVec<[usize; TYPICAL_SLOTS_PER_EPOCH]>,
|
pub(crate) proposers: SmallVec<[usize; TYPICAL_SLOTS_PER_EPOCH]>,
|
||||||
}
|
}
|
||||||
|
|
||||||
/// A cache to store the proposers for some epoch.
|
/// A cache to store the proposers for some epoch.
|
||||||
///
|
///
|
||||||
/// See the module-level documentation for more information.
|
/// See the module-level documentation for more information.
|
||||||
pub struct BeaconProposerCache {
|
pub struct BeaconProposerCache {
|
||||||
cache: LruCache<(Epoch, Hash256), EpochBlockProposers>,
|
cache: LruCache<(Epoch, Hash256), Arc<OnceCell<EpochBlockProposers>>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Default for BeaconProposerCache {
|
impl Default for BeaconProposerCache {
|
||||||
@@ -74,7 +76,8 @@ impl BeaconProposerCache {
|
|||||||
) -> Option<Proposer> {
|
) -> Option<Proposer> {
|
||||||
let epoch = slot.epoch(E::slots_per_epoch());
|
let epoch = slot.epoch(E::slots_per_epoch());
|
||||||
let key = (epoch, shuffling_decision_block);
|
let key = (epoch, shuffling_decision_block);
|
||||||
if let Some(cache) = self.cache.get(&key) {
|
let cache_opt = self.cache.get(&key).and_then(|cell| cell.get());
|
||||||
|
if let Some(cache) = cache_opt {
|
||||||
// This `if` statement is likely unnecessary, but it feels like good practice.
|
// This `if` statement is likely unnecessary, but it feels like good practice.
|
||||||
if epoch == cache.epoch {
|
if epoch == cache.epoch {
|
||||||
cache
|
cache
|
||||||
@@ -103,7 +106,26 @@ impl BeaconProposerCache {
|
|||||||
epoch: Epoch,
|
epoch: Epoch,
|
||||||
) -> Option<&SmallVec<[usize; TYPICAL_SLOTS_PER_EPOCH]>> {
|
) -> Option<&SmallVec<[usize; TYPICAL_SLOTS_PER_EPOCH]>> {
|
||||||
let key = (epoch, shuffling_decision_block);
|
let key = (epoch, shuffling_decision_block);
|
||||||
self.cache.get(&key).map(|cache| &cache.proposers)
|
self.cache
|
||||||
|
.get(&key)
|
||||||
|
.and_then(|cache_once_cell| cache_once_cell.get().map(|proposers| &proposers.proposers))
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Returns the `OnceCell` for the given `(epoch, shuffling_decision_block)` key,
|
||||||
|
/// inserting an empty one if it doesn't exist.
|
||||||
|
///
|
||||||
|
/// The returned `OnceCell` allows the caller to initialise the value externally
|
||||||
|
/// using `get_or_try_init`, enabling deferred computation without holding a mutable
|
||||||
|
/// reference to the cache.
|
||||||
|
pub fn get_or_insert_key(
|
||||||
|
&mut self,
|
||||||
|
epoch: Epoch,
|
||||||
|
shuffling_decision_block: Hash256,
|
||||||
|
) -> Arc<OnceCell<EpochBlockProposers>> {
|
||||||
|
let key = (epoch, shuffling_decision_block);
|
||||||
|
self.cache
|
||||||
|
.get_or_insert(key, || Arc::new(OnceCell::new()))
|
||||||
|
.clone()
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Insert the proposers into the cache.
|
/// Insert the proposers into the cache.
|
||||||
@@ -120,14 +142,13 @@ impl BeaconProposerCache {
|
|||||||
) -> Result<(), BeaconStateError> {
|
) -> Result<(), BeaconStateError> {
|
||||||
let key = (epoch, shuffling_decision_block);
|
let key = (epoch, shuffling_decision_block);
|
||||||
if !self.cache.contains(&key) {
|
if !self.cache.contains(&key) {
|
||||||
self.cache.put(
|
let epoch_proposers = EpochBlockProposers {
|
||||||
key,
|
epoch,
|
||||||
EpochBlockProposers {
|
fork,
|
||||||
epoch,
|
proposers: proposers.into(),
|
||||||
fork,
|
};
|
||||||
proposers: proposers.into(),
|
self.cache
|
||||||
},
|
.put(key, Arc::new(OnceCell::with_value(epoch_proposers)));
|
||||||
);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
|
|||||||
@@ -1,3 +1,4 @@
|
|||||||
|
use crate::beacon_proposer_cache::EpochBlockProposers;
|
||||||
use crate::block_verification::{
|
use crate::block_verification::{
|
||||||
cheap_state_advance_to_obtain_committees, get_validator_pubkey_cache, process_block_slash_info,
|
cheap_state_advance_to_obtain_committees, get_validator_pubkey_cache, process_block_slash_info,
|
||||||
BlockSlashInfo,
|
BlockSlashInfo,
|
||||||
@@ -602,14 +603,19 @@ fn verify_proposer_and_signature<T: BeaconChainTypes>(
|
|||||||
parent_block.root
|
parent_block.root
|
||||||
};
|
};
|
||||||
|
|
||||||
let proposer_opt = chain
|
// We lock the cache briefly to get or insert a OnceCell, then drop the lock
|
||||||
|
// before doing proposer shuffling calculation via `OnceCell::get_or_try_init`. This avoids
|
||||||
|
// holding the lock during the computation, while still ensuring the result is cached and
|
||||||
|
// initialised only once.
|
||||||
|
//
|
||||||
|
// This approach exposes the cache internals (`OnceCell` & `EpochBlockProposers`)
|
||||||
|
// as a trade-off for avoiding lock contention.
|
||||||
|
let epoch_proposers_cell = chain
|
||||||
.beacon_proposer_cache
|
.beacon_proposer_cache
|
||||||
.lock()
|
.lock()
|
||||||
.get_slot::<T::EthSpec>(proposer_shuffling_root, column_slot);
|
.get_or_insert_key(column_epoch, proposer_shuffling_root);
|
||||||
|
|
||||||
let (proposer_index, fork) = if let Some(proposer) = proposer_opt {
|
let epoch_proposers = epoch_proposers_cell.get_or_try_init(move || {
|
||||||
(proposer.index, proposer.fork)
|
|
||||||
} else {
|
|
||||||
debug!(
|
debug!(
|
||||||
%block_root,
|
%block_root,
|
||||||
index = %column_index,
|
index = %column_index,
|
||||||
@@ -633,19 +639,20 @@ fn verify_proposer_and_signature<T: BeaconChainTypes>(
|
|||||||
)?;
|
)?;
|
||||||
|
|
||||||
let proposers = state.get_beacon_proposer_indices(&chain.spec)?;
|
let proposers = state.get_beacon_proposer_indices(&chain.spec)?;
|
||||||
let proposer_index = *proposers
|
|
||||||
.get(column_slot.as_usize() % T::EthSpec::slots_per_epoch() as usize)
|
|
||||||
.ok_or_else(|| BeaconChainError::NoProposerForSlot(column_slot))?;
|
|
||||||
|
|
||||||
// Prime the proposer shuffling cache with the newly-learned value.
|
// Prime the proposer shuffling cache with the newly-learned value.
|
||||||
chain.beacon_proposer_cache.lock().insert(
|
Ok::<_, GossipDataColumnError>(EpochBlockProposers {
|
||||||
column_epoch,
|
epoch: column_epoch,
|
||||||
proposer_shuffling_root,
|
fork: state.fork(),
|
||||||
proposers,
|
proposers: proposers.into(),
|
||||||
state.fork(),
|
})
|
||||||
)?;
|
})?;
|
||||||
(proposer_index, state.fork())
|
|
||||||
};
|
let proposer_index = *epoch_proposers
|
||||||
|
.proposers
|
||||||
|
.get(column_slot.as_usize() % T::EthSpec::slots_per_epoch() as usize)
|
||||||
|
.ok_or_else(|| BeaconChainError::NoProposerForSlot(column_slot))?;
|
||||||
|
|
||||||
|
let fork = epoch_proposers.fork;
|
||||||
|
|
||||||
// Signature verify the signed block header.
|
// Signature verify the signed block header.
|
||||||
let signature_is_valid = {
|
let signature_is_valid = {
|
||||||
|
|||||||
@@ -11,8 +11,6 @@ test_logger = [] # Print log output to stderr when running tests instead of drop
|
|||||||
chrono = { version = "0.4", default-features = false, features = ["clock", "std"] }
|
chrono = { version = "0.4", default-features = false, features = ["clock", "std"] }
|
||||||
logroller = { workspace = true }
|
logroller = { workspace = true }
|
||||||
metrics = { workspace = true }
|
metrics = { workspace = true }
|
||||||
once_cell = "1.17.1"
|
|
||||||
parking_lot = { workspace = true }
|
|
||||||
serde = { workspace = true }
|
serde = { workspace = true }
|
||||||
serde_json = { workspace = true }
|
serde_json = { workspace = true }
|
||||||
tokio = { workspace = true, features = [ "time" ] }
|
tokio = { workspace = true, features = [ "time" ] }
|
||||||
|
|||||||
Reference in New Issue
Block a user