More proposer shuffling cleanup (#8130)

Addressing more review comments from:

- https://github.com/sigp/lighthouse/pull/8101

I've also tweaked a few more things that I think are minor bugs.


  - Instrument `ensure_state_can_determine_proposers_for_epoch`
- Fix `block_root` usage in `compute_proposer_duties_from_head`. This was a regression introduced in 8101 😬.
- Update the `state_advance_timer` to prime the next-epoch proposer cache post-Fulu.


Co-Authored-By: Michael Sproul <michael@sigmaprime.io>
This commit is contained in:
Michael Sproul
2025-10-20 14:14:14 +11:00
committed by GitHub
parent 79716f6ec1
commit 2f8587301d
8 changed files with 359 additions and 59 deletions

View File

@@ -4726,6 +4726,11 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
// efficient packing of execution blocks.
Err(Error::SkipProposerPreparation)
} else {
debug!(
?shuffling_decision_root,
epoch = %proposal_epoch,
"Proposer shuffling cache miss for proposer prep"
);
let head = self.canonical_head.cached_head();
Ok((
head.head_state_root(),
@@ -6557,6 +6562,26 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
}
}
/// This function provides safe and efficient multi-threaded access to the beacon proposer cache.
///
/// The arguments are:
///
/// - `shuffling_decision_block`: The block root of the decision block for the desired proposer
/// shuffling. This should be computed using one of the methods for computing proposer
/// shuffling decision roots, e.g. `BeaconState::proposer_shuffling_decision_root_at_epoch`.
/// - `proposal_epoch`: The epoch at which the proposer shuffling is required.
/// - `accessor`: A closure to run against the proposers for the selected epoch. Usually this
/// closure just grabs a single proposer, or takes the vec of proposers for the epoch.
/// - `state_provider`: A closure to compute a state suitable for determining the shuffling.
/// This closure is evaluated lazily ONLY in the case that a cache miss occurs. It is
/// recommended for code that wants to keep track of cache misses to produce a log and/or
/// increment a metric inside this closure.
///
/// This function makes use of closures in order to efficiently handle concurrent accesses to
/// the cache.
///
/// The error type is polymorphic; if in doubt you can use `BeaconChainError`. You might need
/// to use a turbofish if type inference can't work it out.
pub fn with_proposer_cache<V, E: From<BeaconChainError> + From<BeaconStateError>>(
&self,
shuffling_decision_block: Hash256,
@@ -6575,12 +6600,6 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
// If it is already initialised, then `get_or_try_init` will return immediately without
// executing the initialisation code at all.
let epoch_block_proposers = cache_entry.get_or_try_init(|| {
debug!(
?shuffling_decision_block,
%proposal_epoch,
"Proposer shuffling cache miss"
);
// Fetch the state on-demand if the required epoch was missing from the cache.
// If the caller wants to not compute the state they must return an error here and then
// catch it at the call site.
@@ -6610,11 +6629,18 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
}
let proposers = state.get_beacon_proposer_indices(proposal_epoch, &self.spec)?;
Ok::<_, E>(EpochBlockProposers::new(
proposal_epoch,
state.fork(),
proposers,
))
// Use fork_at_epoch rather than the state's fork, because post-Fulu we may not have
// advanced the state completely into the new epoch.
let fork = self.spec.fork_at_epoch(proposal_epoch);
debug!(
?shuffling_decision_block,
epoch = %proposal_epoch,
"Priming proposer shuffling cache"
);
Ok::<_, E>(EpochBlockProposers::new(proposal_epoch, fork, proposers))
})?;
// Run the accessor function on the computed epoch proposers.

View File

@@ -17,6 +17,7 @@ use smallvec::SmallVec;
use state_processing::state_advance::partial_state_advance;
use std::num::NonZeroUsize;
use std::sync::Arc;
use tracing::instrument;
use types::non_zero_usize::new_non_zero_usize;
use types::{
BeaconState, BeaconStateError, ChainSpec, Epoch, EthSpec, Fork, Hash256, Slot, Unsigned,
@@ -199,11 +200,14 @@ pub fn compute_proposer_duties_from_head<T: BeaconChainTypes>(
.map_err(BeaconChainError::from)?;
let dependent_root = state
// The only block which decides its own shuffling is the genesis block.
.proposer_shuffling_decision_root(chain.genesis_block_root, &chain.spec)
.proposer_shuffling_decision_root_at_epoch(request_epoch, head_block_root, &chain.spec)
.map_err(BeaconChainError::from)?;
Ok((indices, dependent_root, execution_status, state.fork()))
// Use fork_at_epoch rather than the state's fork, because post-Fulu we may not have advanced
// the state completely into the new epoch.
let fork = chain.spec.fork_at_epoch(request_epoch);
Ok((indices, dependent_root, execution_status, fork))
}
/// If required, advance `state` to the epoch required to determine proposer indices in `target_epoch`.
@@ -214,6 +218,7 @@ pub fn compute_proposer_duties_from_head<T: BeaconChainTypes>(
/// - No-op if `state.current_epoch() == target_epoch`.
/// - It must be the case that `state.canonical_root() == state_root`, but this function will not
/// check that.
#[instrument(skip_all, fields(?state_root, %target_epoch, state_slot = %state.slot()), level = "debug")]
pub fn ensure_state_can_determine_proposers_for_epoch<E: EthSpec>(
state: &mut BeaconState<E>,
state_root: Hash256,
@@ -234,14 +239,6 @@ pub fn ensure_state_can_determine_proposers_for_epoch<E: EthSpec>(
if state.current_epoch() > maximum_epoch {
Err(BeaconStateError::SlotOutOfBounds.into())
} else if state.current_epoch() >= minimum_epoch {
if target_epoch > state.current_epoch() {
let target_slot = target_epoch.start_slot(E::slots_per_epoch());
// Advance the state into the same epoch as the block. Use the "partial" method since state
// roots are not important for proposer/attester shuffling.
partial_state_advance(state, Some(state_root), target_slot, spec)
.map_err(BeaconChainError::from)?;
}
Ok(())
} else {
// State's current epoch is less than the minimum epoch.

View File

@@ -950,8 +950,6 @@ impl<T: BeaconChainTypes> GossipVerifiedBlock<T> {
let proposer_shuffling_decision_block =
parent_block.proposer_shuffling_root_for_child_block(block_epoch, &chain.spec);
// We assign to a variable instead of using `if let Some` directly to ensure we drop the
// write lock before trying to acquire it again in the `else` clause.
let block_slot = block.slot();
let mut opt_parent = None;
let proposer = chain.with_proposer_cache::<_, BlockError>(

View File

@@ -333,25 +333,54 @@ fn advance_head<T: BeaconChainTypes>(beacon_chain: &Arc<BeaconChain<T>>) -> Resu
.build_committee_cache(RelativeEpoch::Next, &beacon_chain.spec)
.map_err(BeaconChainError::from)?;
// If the `pre_state` is in a later epoch than `state`, pre-emptively add the proposer shuffling
// for the state's current epoch and the committee cache for the state's next epoch.
// The state root is required to prime the proposer cache AND for writing it to disk.
let advanced_state_root = state.update_tree_hash_cache()?;
// If the `pre_state` is in a later epoch than `state`, pre-emptively update the proposer
// shuffling and attester shuffling caches.
if initial_epoch < state.current_epoch() {
// Update the proposer cache.
//
// We supply the `head_block_root` as the decision block since the prior `if` statement guarantees
// the head root is the latest block from the prior epoch.
beacon_chain
.beacon_proposer_cache
.lock()
.insert(
state.current_epoch(),
head_block_root,
state
.get_beacon_proposer_indices(state.current_epoch(), &beacon_chain.spec)
.map_err(BeaconChainError::from)?,
state.fork(),
)
.map_err(BeaconChainError::from)?;
// Include the proposer shuffling from the current epoch, which is likely to be useful
// pre-Fulu, and probably redundant post-Fulu (it should already have been in the cache).
let current_epoch_decision_root = state.proposer_shuffling_decision_root_at_epoch(
state.current_epoch(),
head_block_root,
&beacon_chain.spec,
)?;
beacon_chain.with_proposer_cache(
current_epoch_decision_root,
state.current_epoch(),
|_| Ok(()),
|| {
debug!(
shuffling_decision_root = ?current_epoch_decision_root,
epoch = %state.current_epoch(),
"Computing current epoch proposer shuffling in state advance"
);
Ok::<_, Error>((advanced_state_root, state.clone()))
},
)?;
// For epochs *greater than* the Fulu fork epoch, we have also determined the proposer
// shuffling for the next epoch.
let next_epoch = state.next_epoch()?;
let next_epoch_decision_root = state.proposer_shuffling_decision_root_at_epoch(
next_epoch,
head_block_root,
&beacon_chain.spec,
)?;
beacon_chain.with_proposer_cache(
next_epoch_decision_root,
next_epoch,
|_| Ok(()),
|| {
debug!(
shuffling_decision_root = ?next_epoch_decision_root,
epoch = %next_epoch,
"Computing next epoch proposer shuffling in state advance"
);
Ok::<_, Error>((advanced_state_root, state.clone()))
},
)?;
// Update the attester cache.
let shuffling_id =
@@ -406,7 +435,6 @@ fn advance_head<T: BeaconChainTypes>(beacon_chain: &Arc<BeaconChain<T>>) -> Resu
// even if we race with the deletion of this state by the finalization pruning code, the worst
// case is we end up with a finalized state stored, that will get pruned the next time pruning
// runs.
let advanced_state_root = state.update_tree_hash_cache()?;
beacon_chain.store.put_state(&advanced_state_root, &state)?;
debug!(

View File

@@ -13,7 +13,11 @@ use beacon_chain::test_utils::{
use beacon_chain::{
BeaconChain, BeaconChainError, BeaconChainTypes, BeaconSnapshot, BlockError, ChainConfig,
NotifyExecutionLayer, ServerSentEventHandler, WhenSlotSkipped,
data_availability_checker::MaybeAvailableBlock, historical_blocks::HistoricalBlockError,
beacon_proposer_cache::{
compute_proposer_duties_from_head, ensure_state_can_determine_proposers_for_epoch,
},
data_availability_checker::MaybeAvailableBlock,
historical_blocks::HistoricalBlockError,
migrate::MigratorConfig,
};
use logging::create_test_tracing_subscriber;
@@ -1273,19 +1277,34 @@ async fn proposer_shuffling_root_consistency_test(
#[tokio::test]
async fn proposer_shuffling_root_consistency_same_epoch() {
let spec = test_spec::<E>();
proposer_shuffling_root_consistency_test(spec, 32, 39).await;
proposer_shuffling_root_consistency_test(
spec,
4 * E::slots_per_epoch(),
5 * E::slots_per_epoch() - 1,
)
.await;
}
#[tokio::test]
async fn proposer_shuffling_root_consistency_next_epoch() {
let spec = test_spec::<E>();
proposer_shuffling_root_consistency_test(spec, 32, 47).await;
proposer_shuffling_root_consistency_test(
spec,
4 * E::slots_per_epoch(),
6 * E::slots_per_epoch() - 1,
)
.await;
}
#[tokio::test]
async fn proposer_shuffling_root_consistency_two_epochs() {
let spec = test_spec::<E>();
proposer_shuffling_root_consistency_test(spec, 32, 55).await;
proposer_shuffling_root_consistency_test(
spec,
4 * E::slots_per_epoch(),
7 * E::slots_per_epoch() - 1,
)
.await;
}
#[tokio::test]
@@ -1501,6 +1520,120 @@ async fn proposer_shuffling_changing_with_lookahead() {
);
}
#[tokio::test]
async fn proposer_duties_from_head_fulu() {
let spec = ForkName::Fulu.make_genesis_spec(E::default_spec());
let db_path = tempdir().unwrap();
let store = get_store_generic(&db_path, Default::default(), spec.clone());
let validators_keypairs =
types::test_utils::generate_deterministic_keypairs(LOW_VALIDATOR_COUNT);
let harness = TestHarness::builder(MinimalEthSpec)
.spec(spec.into())
.keypairs(validators_keypairs)
.fresh_disk_store(store)
.mock_execution_layer()
.build();
let spec = &harness.chain.spec;
let initial_blocks = E::slots_per_epoch() * 3;
// Build chain out to parent block.
let initial_slots: Vec<Slot> = (1..=initial_blocks).map(Into::into).collect();
let (state, state_root) = harness.get_current_state_and_root();
let all_validators = harness.get_all_validators();
let (_, _, head_block_root, head_state) = harness
.add_attested_blocks_at_slots(state, state_root, &initial_slots, &all_validators)
.await;
// Compute the proposer duties at the next epoch from the head
let next_epoch = head_state.next_epoch().unwrap();
let (_indices, dependent_root, _, fork) =
compute_proposer_duties_from_head(next_epoch, &harness.chain).unwrap();
assert_eq!(
dependent_root,
head_state
.proposer_shuffling_decision_root_at_epoch(next_epoch, head_block_root.into(), spec)
.unwrap()
);
assert_eq!(fork, head_state.fork());
}
/// Test that we can compute the proposer shuffling for the Gloas fork epoch itself using lookahead!
#[tokio::test]
async fn proposer_lookahead_gloas_fork_epoch() {
let gloas_fork_epoch = Epoch::new(4);
let mut spec = ForkName::Fulu.make_genesis_spec(E::default_spec());
spec.gloas_fork_epoch = Some(gloas_fork_epoch);
let db_path = tempdir().unwrap();
let store = get_store_generic(&db_path, Default::default(), spec.clone());
let validators_keypairs =
types::test_utils::generate_deterministic_keypairs(LOW_VALIDATOR_COUNT);
let harness = TestHarness::builder(E::default())
.spec(spec.into())
.keypairs(validators_keypairs)
.fresh_disk_store(store)
.mock_execution_layer()
.build();
let spec = &harness.chain.spec;
let initial_blocks = (gloas_fork_epoch - 1)
.start_slot(E::slots_per_epoch())
.as_u64();
// Build chain out to parent block.
let initial_slots: Vec<Slot> = (1..=initial_blocks).map(Into::into).collect();
let (state, state_root) = harness.get_current_state_and_root();
let all_validators = harness.get_all_validators();
let (_, _, head_block_root, mut head_state) = harness
.add_attested_blocks_at_slots(state, state_root, &initial_slots, &all_validators)
.await;
let head_state_root = head_state.canonical_root().unwrap();
// Check that we have access to the next epoch shuffling according to
// `ensure_state_can_determine_proposers_for_epoch`.
ensure_state_can_determine_proposers_for_epoch(
&mut head_state,
head_state_root,
gloas_fork_epoch,
spec,
)
.unwrap();
assert_eq!(head_state.current_epoch(), gloas_fork_epoch - 1);
// Compute the proposer duties at the fork epoch from the head.
let (indices, dependent_root, _, fork) =
compute_proposer_duties_from_head(gloas_fork_epoch, &harness.chain).unwrap();
assert_eq!(
dependent_root,
head_state
.proposer_shuffling_decision_root_at_epoch(
gloas_fork_epoch,
head_block_root.into(),
spec
)
.unwrap()
);
assert_ne!(fork, head_state.fork());
assert_eq!(fork, spec.fork_at_epoch(gloas_fork_epoch));
// Build a block in the Gloas fork epoch and assert that the shuffling does not change.
let gloas_slots = vec![gloas_fork_epoch.start_slot(E::slots_per_epoch())];
let (_, _, _, _) = harness
.add_attested_blocks_at_slots(head_state, head_state_root, &gloas_slots, &all_validators)
.await;
let (no_lookahead_indices, no_lookahead_dependent_root, _, no_lookahead_fork) =
compute_proposer_duties_from_head(gloas_fork_epoch, &harness.chain).unwrap();
assert_eq!(no_lookahead_indices, indices);
assert_eq!(no_lookahead_dependent_root, dependent_root);
assert_eq!(no_lookahead_fork, fork);
}
// Ensure blocks from abandoned forks are pruned from the Hot DB
#[tokio::test]
async fn prunes_abandoned_fork_between_two_finalized_checkpoints() {

View File

@@ -103,14 +103,6 @@ fn try_proposer_duties_from_cache<T: BeaconChainTypes>(
let head_block = &head.snapshot.beacon_block;
let head_block_root = head.head_block_root();
let head_epoch = head_block.slot().epoch(T::EthSpec::slots_per_epoch());
let head_decision_root = head
.snapshot
.beacon_state
.proposer_shuffling_decision_root(head_block_root, &chain.spec)
.map_err(warp_utils::reject::beacon_state_error)?;
let execution_optimistic = chain
.is_optimistic_or_invalid_head_block(head_block)
.map_err(warp_utils::reject::unhandled_error)?;
// This code path can't handle requests for past epochs.
if head_epoch > request_epoch {
@@ -119,6 +111,15 @@ fn try_proposer_duties_from_cache<T: BeaconChainTypes>(
)));
}
let head_decision_root = head
.snapshot
.beacon_state
.proposer_shuffling_decision_root_at_epoch(request_epoch, head_block_root, &chain.spec)
.map_err(warp_utils::reject::beacon_state_error)?;
let execution_optimistic = chain
.is_optimistic_or_invalid_head_block(head_block)
.map_err(warp_utils::reject::unhandled_error)?;
chain
.beacon_proposer_cache
.lock()

View File

@@ -940,3 +940,110 @@ async fn queue_attestations_from_http() {
attestation_future.await.unwrap();
}
// Test that a request for next epoch proposer duties succeeds when the current slot clock is within
// gossip clock disparity (500ms) of the new epoch.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn proposer_duties_with_gossip_tolerance() {
let validator_count = 24;
let tester = InteractiveTester::<E>::new(None, validator_count).await;
let harness = &tester.harness;
let spec = &harness.spec;
let client = &tester.client;
let num_initial = 4 * E::slots_per_epoch() - 1;
let next_epoch_start_slot = Slot::new(num_initial + 1);
harness.advance_slot();
harness
.extend_chain_with_sync(
num_initial as usize,
BlockStrategy::OnCanonicalHead,
AttestationStrategy::AllValidators,
SyncCommitteeStrategy::NoValidators,
LightClientStrategy::Disabled,
)
.await;
assert_eq!(harness.chain.slot().unwrap(), num_initial);
// Set the clock to just before the next epoch.
harness.chain.slot_clock.advance_time(
Duration::from_secs(spec.seconds_per_slot) - spec.maximum_gossip_clock_disparity(),
);
assert_eq!(
harness
.chain
.slot_clock
.now_with_future_tolerance(spec.maximum_gossip_clock_disparity())
.unwrap(),
next_epoch_start_slot
);
let head_state = harness.get_current_state();
let head_block_root = harness.head_block_root();
let tolerant_current_epoch = next_epoch_start_slot.epoch(E::slots_per_epoch());
// This is a regression test for the bug described here:
// https://github.com/sigp/lighthouse/pull/8130/files#r2386594566
//
// To trigger it, we need to prime the proposer shuffling cache with an incorrect entry which
// the previous code would be liable to lookup due to the bugs in its decision root calculation.
let wrong_decision_root = head_state
.proposer_shuffling_decision_root(head_block_root, spec)
.unwrap();
let wrong_proposer_indices = vec![0; E::slots_per_epoch() as usize];
harness
.chain
.beacon_proposer_cache
.lock()
.insert(
tolerant_current_epoch,
wrong_decision_root,
wrong_proposer_indices.clone(),
head_state.fork(),
)
.unwrap();
// Request the proposer duties.
let proposer_duties_tolerant_current_epoch = client
.get_validator_duties_proposer(tolerant_current_epoch)
.await
.unwrap();
assert_eq!(
proposer_duties_tolerant_current_epoch.dependent_root,
head_state
.proposer_shuffling_decision_root_at_epoch(
tolerant_current_epoch,
head_block_root,
spec
)
.unwrap()
);
assert_ne!(
proposer_duties_tolerant_current_epoch
.data
.iter()
.map(|data| data.validator_index as usize)
.collect::<Vec<_>>(),
wrong_proposer_indices,
);
// We should get the exact same result after properly advancing into the epoch.
harness
.chain
.slot_clock
.advance_time(spec.maximum_gossip_clock_disparity());
assert_eq!(harness.chain.slot().unwrap(), next_epoch_start_slot);
let proposer_duties_current_epoch = client
.get_validator_duties_proposer(tolerant_current_epoch)
.await
.unwrap();
assert_eq!(
proposer_duties_tolerant_current_epoch,
proposer_duties_current_epoch
);
}

View File

@@ -476,15 +476,23 @@ impl ChainSpec {
/// Returns a full `Fork` struct for a given epoch.
pub fn fork_at_epoch(&self, epoch: Epoch) -> Fork {
let current_fork_name = self.fork_name_at_epoch(epoch);
let previous_fork_name = current_fork_name.previous_fork().unwrap_or(ForkName::Base);
let epoch = self
let fork_epoch = self
.fork_epoch(current_fork_name)
.unwrap_or_else(|| Epoch::new(0));
// At genesis the Fork is initialised with two copies of the same value for both
// `previous_version` and `current_version` (see `initialize_beacon_state_from_eth1`).
let previous_fork_name = if fork_epoch == 0 {
current_fork_name
} else {
current_fork_name.previous_fork().unwrap_or(ForkName::Base)
};
Fork {
previous_version: self.fork_version_for_name(previous_fork_name),
current_version: self.fork_version_for_name(current_fork_name),
epoch,
epoch: fork_epoch,
}
}
@@ -3010,9 +3018,11 @@ mod yaml_tests {
fn proposer_shuffling_decision_root_around_epoch_boundary() {
type E = MainnetEthSpec;
let fulu_fork_epoch = 5;
let gloas_fork_epoch = 10;
let spec = {
let mut spec = ForkName::Electra.make_genesis_spec(E::default_spec());
spec.fulu_fork_epoch = Some(Epoch::new(fulu_fork_epoch));
spec.gloas_fork_epoch = Some(Epoch::new(gloas_fork_epoch));
Arc::new(spec)
};
@@ -3026,7 +3036,7 @@ mod yaml_tests {
}
// For epochs after Fulu, the decision slot is the end of the epoch two epochs prior.
for epoch in ((fulu_fork_epoch + 1)..(fulu_fork_epoch + 10)).map(Epoch::new) {
for epoch in ((fulu_fork_epoch + 1)..=(gloas_fork_epoch + 1)).map(Epoch::new) {
assert_eq!(
spec.proposer_shuffling_decision_slot::<E>(epoch),
(epoch - 1).start_slot(E::slots_per_epoch()) - 1