Enable proposer boost re-orging (#2860)

## Proposed Changes

With proposer boosting implemented (#2822) we have an opportunity to re-org out late blocks.

This PR adds three flags to the BN to control this behaviour:

* `--disable-proposer-reorgs`: turn aggressive re-orging off (it's on by default).
* `--proposer-reorg-threshold N`: attempt to orphan blocks with less than N% of the committee vote. If this parameter isn't set then N defaults to 20% when the feature is enabled.
* `--proposer-reorg-epochs-since-finalization N`: only attempt to re-org late blocks when the number of epochs since finalization is less than or equal to N. The default is 2 epochs, meaning re-orgs will only be attempted when the chain is finalizing optimally.

For safety Lighthouse will only attempt a re-org under very specific conditions:

1. The block being proposed is 1 slot after the canonical head, and the canonical head is 1 slot after its parent. i.e. at slot `n + 1` rather than building on the block from slot `n` we build on the block from slot `n - 1`.
2. The current canonical head received less than N% of the committee vote. N should be set depending on the proposer boost fraction itself, the fraction of the network that is believed to be applying it, and the size of the largest entity that could be hoarding votes.
3. The current canonical head arrived after the attestation deadline from our perspective. This condition was only added to support suppression of forkchoiceUpdated messages, but makes intuitive sense.
4. The block is being proposed in the first 2 seconds of the slot. This gives it time to propagate and receive the proposer boost.


## Additional Info

For the initial idea and background, see: https://github.com/ethereum/consensus-specs/pull/2353#issuecomment-950238004

There is also a specification for this feature here: https://github.com/ethereum/consensus-specs/pull/3034

Co-authored-by: Michael Sproul <micsproul@gmail.com>
Co-authored-by: pawan <pawandhananjay@gmail.com>
This commit is contained in:
Michael Sproul
2022-12-13 09:57:26 +00:00
parent 6f79263a21
commit 775d222299
55 changed files with 2309 additions and 341 deletions

View File

@@ -44,9 +44,8 @@ use crate::observed_operations::{ObservationOutcome, ObservedOperations};
use crate::persisted_beacon_chain::{PersistedBeaconChain, DUMMY_CANONICAL_HEAD_BLOCK_ROOT};
use crate::persisted_fork_choice::PersistedForkChoice;
use crate::pre_finalization_cache::PreFinalizationBlockCache;
use crate::proposer_prep_service::PAYLOAD_PREPARATION_LOOKAHEAD_FACTOR;
use crate::shuffling_cache::{BlockShufflingIds, ShufflingCache};
use crate::snapshot_cache::SnapshotCache;
use crate::snapshot_cache::{BlockProductionPreState, SnapshotCache};
use crate::sync_committee_verification::{
Error as SyncCommitteeError, VerifiedSyncCommitteeMessage, VerifiedSyncContribution,
};
@@ -56,9 +55,7 @@ use crate::validator_monitor::{
HISTORIC_EPOCHS as VALIDATOR_MONITOR_HISTORIC_EPOCHS,
};
use crate::validator_pubkey_cache::ValidatorPubkeyCache;
use crate::BeaconForkChoiceStore;
use crate::BeaconSnapshot;
use crate::{metrics, BeaconChainError};
use crate::{metrics, BeaconChainError, BeaconForkChoiceStore, BeaconSnapshot, CachedHead};
use eth2::types::{EventKind, SseBlock, SyncDuty};
use execution_layer::{
BuilderParams, ChainHealth, ExecutionLayer, FailedCondition, PayloadAttributes, PayloadStatus,
@@ -72,7 +69,7 @@ use itertools::process_results;
use itertools::Itertools;
use operation_pool::{AttestationRef, OperationPool, PersistedOperationPool};
use parking_lot::{Mutex, RwLock};
use proto_array::CountUnrealizedFull;
use proto_array::{CountUnrealizedFull, DoNotReOrg, ProposerHeadError};
use safe_arith::SafeArith;
use slasher::Slasher;
use slog::{crit, debug, error, info, trace, warn, Logger};
@@ -103,6 +100,7 @@ use store::{
use task_executor::{ShutdownReason, TaskExecutor};
use tree_hash::TreeHash;
use types::beacon_state::CloneConfig;
use types::consts::merge::INTERVALS_PER_SLOT;
use types::*;
pub use crate::canonical_head::{CanonicalHead, CanonicalHeadRwLock};
@@ -127,6 +125,12 @@ pub const VALIDATOR_PUBKEY_CACHE_LOCK_TIMEOUT: Duration = Duration::from_secs(1)
/// The timeout for the eth1 finalization cache
pub const ETH1_FINALIZATION_CACHE_LOCK_TIMEOUT: Duration = Duration::from_millis(200);
/// The latest delay from the start of the slot at which to attempt a 1-slot re-org.
fn max_re_org_slot_delay(seconds_per_slot: u64) -> Duration {
// Allow at least half of the attestation deadline for the block to propagate.
Duration::from_secs(seconds_per_slot) / INTERVALS_PER_SLOT as u32 / 2
}
// These keys are all zero because they get stored in different columns, see `DBColumn` type.
pub const BEACON_CHAIN_DB_KEY: Hash256 = Hash256::zero();
pub const OP_POOL_DB_KEY: Hash256 = Hash256::zero();
@@ -188,6 +192,21 @@ pub enum ProduceBlockVerification {
NoVerification,
}
/// Payload attributes for which the `beacon_chain` crate is responsible.
pub struct PrePayloadAttributes {
pub proposer_index: u64,
pub prev_randao: Hash256,
}
/// Define whether a forkchoiceUpdate needs to be checked for an override (`Yes`) or has already
/// been checked (`AlreadyApplied`). It is safe to specify `Yes` even if re-orgs are disabled.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub enum OverrideForkchoiceUpdate {
#[default]
Yes,
AlreadyApplied,
}
/// The accepted clock drift for nodes gossiping blocks and attestations. See:
///
/// https://github.com/ethereum/eth2.0-specs/blob/v0.12.1/specs/phase0/p2p-interface.md#configuration
@@ -2756,6 +2775,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
if !payload_verification_status.is_optimistic()
&& block.slot() + EARLY_ATTESTER_CACHE_HISTORIC_SLOTS >= current_slot
{
let fork_choice_timer = metrics::start_timer(&metrics::BLOCK_PROCESSING_FORK_CHOICE);
match fork_choice.get_head(current_slot, &self.spec) {
// This block became the head, add it to the early attester cache.
Ok(new_head_root) if new_head_root == block_root => {
@@ -2789,6 +2809,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
"error" => ?e
),
}
drop(fork_choice_timer);
}
drop(post_exec_timer);
@@ -3475,6 +3496,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
// signed. If we miss the cache or we're producing a block that conflicts with the head,
// fall back to getting the head from `slot - 1`.
let state_load_timer = metrics::start_timer(&metrics::BLOCK_PRODUCTION_STATE_LOAD_TIMES);
// Atomically read some values from the head whilst avoiding holding cached head `Arc` any
// longer than necessary.
let (head_slot, head_block_root) = {
@@ -3482,8 +3504,19 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
(head.head_slot(), head.head_block_root())
};
let (state, state_root_opt) = if head_slot < slot {
// Attempt an aggressive re-org if configured and the conditions are right.
if let Some(re_org_state) = self.get_state_for_re_org(slot, head_slot, head_block_root)
{
info!(
self.log,
"Proposing block to re-org current head";
"slot" => slot,
"head_to_reorg" => %head_block_root,
);
(re_org_state.pre_state, re_org_state.state_root)
}
// Normal case: proposing a block atop the current head. Use the snapshot cache.
if let Some(pre_state) = self
else if let Some(pre_state) = self
.snapshot_cache
.try_read_for(BLOCK_PROCESSING_CACHE_LOCK_TIMEOUT)
.and_then(|snapshot_cache| {
@@ -3523,6 +3556,400 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
Ok((state, state_root_opt))
}
/// Fetch the beacon state to use for producing a block if a 1-slot proposer re-org is viable.
///
/// This function will return `None` if proposer re-orgs are disabled.
fn get_state_for_re_org(
&self,
slot: Slot,
head_slot: Slot,
canonical_head: Hash256,
) -> Option<BlockProductionPreState<T::EthSpec>> {
let re_org_threshold = self.config.re_org_threshold?;
if self.spec.proposer_score_boost.is_none() {
warn!(
self.log,
"Ignoring proposer re-org configuration";
"reason" => "this network does not have proposer boost enabled"
);
return None;
}
let slot_delay = self
.slot_clock
.seconds_from_current_slot_start(self.spec.seconds_per_slot)
.or_else(|| {
warn!(
self.log,
"Not attempting re-org";
"error" => "unable to read slot clock"
);
None
})?;
// Attempt a proposer re-org if:
//
// 1. It seems we have time to propagate and still receive the proposer boost.
// 2. The current head block was seen late.
// 3. The `get_proposer_head` conditions from fork choice pass.
let proposing_on_time = slot_delay < max_re_org_slot_delay(self.spec.seconds_per_slot);
if !proposing_on_time {
debug!(
self.log,
"Not attempting re-org";
"reason" => "not proposing on time",
);
return None;
}
let head_late = self.block_observed_after_attestation_deadline(canonical_head, head_slot);
if !head_late {
debug!(
self.log,
"Not attempting re-org";
"reason" => "head not late"
);
return None;
}
// Is the current head weak and appropriate for re-orging?
let proposer_head_timer =
metrics::start_timer(&metrics::BLOCK_PRODUCTION_GET_PROPOSER_HEAD_TIMES);
let proposer_head = self
.canonical_head
.fork_choice_read_lock()
.get_proposer_head(
slot,
canonical_head,
re_org_threshold,
self.config.re_org_max_epochs_since_finalization,
)
.map_err(|e| match e {
ProposerHeadError::DoNotReOrg(reason) => {
debug!(
self.log,
"Not attempting re-org";
"reason" => %reason,
);
}
ProposerHeadError::Error(e) => {
warn!(
self.log,
"Not attempting re-org";
"error" => ?e,
);
}
})
.ok()?;
drop(proposer_head_timer);
let re_org_parent_block = proposer_head.parent_node.root;
// Only attempt a re-org if we hit the snapshot cache.
let pre_state = self
.snapshot_cache
.try_read_for(BLOCK_PROCESSING_CACHE_LOCK_TIMEOUT)
.and_then(|snapshot_cache| {
snapshot_cache.get_state_for_block_production(re_org_parent_block)
})
.or_else(|| {
debug!(
self.log,
"Not attempting re-org";
"reason" => "missed snapshot cache",
"parent_block" => ?re_org_parent_block,
);
None
})?;
info!(
self.log,
"Attempting re-org due to weak head";
"weak_head" => ?canonical_head,
"parent" => ?re_org_parent_block,
"head_weight" => proposer_head.head_node.weight,
"threshold_weight" => proposer_head.re_org_weight_threshold
);
Some(pre_state)
}
/// Get the proposer index and `prev_randao` value for a proposal at slot `proposal_slot`.
///
/// The `proposer_head` may be the head block of `cached_head` or its parent. An error will
/// be returned for any other value.
pub fn get_pre_payload_attributes(
&self,
proposal_slot: Slot,
proposer_head: Hash256,
cached_head: &CachedHead<T::EthSpec>,
) -> Result<Option<PrePayloadAttributes>, Error> {
let proposal_epoch = proposal_slot.epoch(T::EthSpec::slots_per_epoch());
let head_block_root = cached_head.head_block_root();
let parent_block_root = cached_head.parent_block_root();
// The proposer head must be equal to the canonical head or its parent.
if proposer_head != head_block_root && proposer_head != parent_block_root {
warn!(
self.log,
"Unable to compute payload attributes";
"block_root" => ?proposer_head,
"head_block_root" => ?head_block_root,
);
return Ok(None);
}
// Compute the proposer index.
let head_epoch = cached_head.head_slot().epoch(T::EthSpec::slots_per_epoch());
let shuffling_decision_root = if head_epoch == proposal_epoch {
cached_head
.snapshot
.beacon_state
.proposer_shuffling_decision_root(proposer_head)?
} else {
proposer_head
};
let cached_proposer = self
.beacon_proposer_cache
.lock()
.get_slot::<T::EthSpec>(shuffling_decision_root, proposal_slot);
let proposer_index = if let Some(proposer) = cached_proposer {
proposer.index as u64
} else {
if head_epoch + 2 < proposal_epoch {
warn!(
self.log,
"Skipping proposer preparation";
"msg" => "this is a non-critical issue that can happen on unhealthy nodes or \
networks.",
"proposal_epoch" => proposal_epoch,
"head_epoch" => head_epoch,
);
// Don't skip the head forward more than two epochs. This avoids burdening an
// unhealthy node.
//
// Although this node might miss out on preparing for a proposal, they should still
// be able to propose. This will prioritise beacon chain health over efficient
// packing of execution blocks.
return Ok(None);
}
let (proposers, decision_root, _, fork) =
compute_proposer_duties_from_head(proposal_epoch, self)?;
let proposer_offset = (proposal_slot % T::EthSpec::slots_per_epoch()).as_usize();
let proposer = *proposers
.get(proposer_offset)
.ok_or(BeaconChainError::NoProposerForSlot(proposal_slot))?;
self.beacon_proposer_cache.lock().insert(
proposal_epoch,
decision_root,
proposers,
fork,
)?;
// It's possible that the head changes whilst computing these duties. If so, abandon
// this routine since the change of head would have also spawned another instance of
// this routine.
//
// Exit now, after updating the cache.
if decision_root != shuffling_decision_root {
warn!(
self.log,
"Head changed during proposer preparation";
);
return Ok(None);
}
proposer as u64
};
// Get the `prev_randao` value.
let prev_randao = if proposer_head == parent_block_root {
cached_head.parent_random()
} else {
cached_head.head_random()
}?;
Ok(Some(PrePayloadAttributes {
proposer_index,
prev_randao,
}))
}
/// Determine whether a fork choice update to the execution layer should be overridden.
///
/// This is *only* necessary when proposer re-orgs are enabled, because we have to prevent the
/// execution layer from enshrining the block we want to re-org as the head.
///
/// This function uses heuristics that align quite closely but not exactly with the re-org
/// conditions set out in `get_state_for_re_org` and `get_proposer_head`. The differences are
/// documented below.
fn overridden_forkchoice_update_params(
&self,
canonical_forkchoice_params: ForkchoiceUpdateParameters,
) -> Result<ForkchoiceUpdateParameters, Error> {
self.overridden_forkchoice_update_params_or_failure_reason(&canonical_forkchoice_params)
.or_else(|e| match e {
ProposerHeadError::DoNotReOrg(reason) => {
trace!(
self.log,
"Not suppressing fork choice update";
"reason" => %reason,
);
Ok(canonical_forkchoice_params)
}
ProposerHeadError::Error(e) => Err(e),
})
}
fn overridden_forkchoice_update_params_or_failure_reason(
&self,
canonical_forkchoice_params: &ForkchoiceUpdateParameters,
) -> Result<ForkchoiceUpdateParameters, ProposerHeadError<Error>> {
let _timer = metrics::start_timer(&metrics::FORK_CHOICE_OVERRIDE_FCU_TIMES);
// Never override if proposer re-orgs are disabled.
let re_org_threshold = self
.config
.re_org_threshold
.ok_or(DoNotReOrg::ReOrgsDisabled)?;
let head_block_root = canonical_forkchoice_params.head_root;
// Perform initial checks and load the relevant info from fork choice.
let info = self
.canonical_head
.fork_choice_read_lock()
.get_preliminary_proposer_head(
head_block_root,
re_org_threshold,
self.config.re_org_max_epochs_since_finalization,
)
.map_err(|e| e.map_inner_error(Error::ProposerHeadForkChoiceError))?;
// The slot of our potential re-org block is always 1 greater than the head block because we
// only attempt single-slot re-orgs.
let head_slot = info.head_node.slot;
let re_org_block_slot = head_slot + 1;
let fork_choice_slot = info.current_slot;
// If a re-orging proposal isn't made by the `max_re_org_slot_delay` then we give up
// and allow the fork choice update for the canonical head through so that we may attest
// correctly.
let current_slot_ok = if head_slot == fork_choice_slot {
true
} else if re_org_block_slot == fork_choice_slot {
self.slot_clock
.start_of(re_org_block_slot)
.and_then(|slot_start| {
let now = self.slot_clock.now_duration()?;
let slot_delay = now.saturating_sub(slot_start);
Some(slot_delay <= max_re_org_slot_delay(self.spec.seconds_per_slot))
})
.unwrap_or(false)
} else {
false
};
if !current_slot_ok {
return Err(DoNotReOrg::HeadDistance.into());
}
// Only attempt a re-org if we have a proposer registered for the re-org slot.
let proposing_at_re_org_slot = {
// The proposer shuffling has the same decision root as the next epoch attestation
// shuffling. We know our re-org block is not on the epoch boundary, so it has the
// same proposer shuffling as the head (but not necessarily the parent which may lie
// in the previous epoch).
let shuffling_decision_root = info
.head_node
.next_epoch_shuffling_id
.shuffling_decision_block;
let proposer_index = self
.beacon_proposer_cache
.lock()
.get_slot::<T::EthSpec>(shuffling_decision_root, re_org_block_slot)
.ok_or_else(|| {
debug!(
self.log,
"Fork choice override proposer shuffling miss";
"slot" => re_org_block_slot,
"decision_root" => ?shuffling_decision_root,
);
DoNotReOrg::NotProposing
})?
.index as u64;
self.execution_layer
.as_ref()
.ok_or(ProposerHeadError::Error(Error::ExecutionLayerMissing))?
.has_proposer_preparation_data_blocking(proposer_index)
};
if !proposing_at_re_org_slot {
return Err(DoNotReOrg::NotProposing.into());
}
// If the current slot is already equal to the proposal slot (or we are in the tail end of
// the prior slot), then check the actual weight of the head against the re-org threshold.
let head_weak = if fork_choice_slot == re_org_block_slot {
info.head_node.weight < info.re_org_weight_threshold
} else {
true
};
if !head_weak {
return Err(DoNotReOrg::HeadNotWeak {
head_weight: info.head_node.weight,
re_org_weight_threshold: info.re_org_weight_threshold,
}
.into());
}
// Check that the head block arrived late and is vulnerable to a re-org. This check is only
// a heuristic compared to the proper weight check in `get_state_for_re_org`, the reason
// being that we may have only *just* received the block and not yet processed any
// attestations for it. We also can't dequeue attestations for the block during the
// current slot, which would be necessary for determining its weight.
let head_block_late =
self.block_observed_after_attestation_deadline(head_block_root, head_slot);
if !head_block_late {
return Err(DoNotReOrg::HeadNotLate.into());
}
let parent_head_hash = info.parent_node.execution_status.block_hash();
let forkchoice_update_params = ForkchoiceUpdateParameters {
head_root: info.parent_node.root,
head_hash: parent_head_hash,
justified_hash: canonical_forkchoice_params.justified_hash,
finalized_hash: canonical_forkchoice_params.finalized_hash,
};
debug!(
self.log,
"Fork choice update overridden";
"canonical_head" => ?head_block_root,
"override" => ?info.parent_node.root,
"slot" => fork_choice_slot,
);
Ok(forkchoice_update_params)
}
/// Check if the block with `block_root` was observed after the attestation deadline of `slot`.
fn block_observed_after_attestation_deadline(&self, block_root: Hash256, slot: Slot) -> bool {
let block_delays = self.block_times_cache.read().get_block_delays(
block_root,
self.slot_clock
.start_of(slot)
.unwrap_or_else(|| Duration::from_secs(0)),
);
block_delays.observed.map_or(false, |delay| {
delay > self.slot_clock.unagg_attestation_production_delay()
})
}
/// Produce a block for some `slot` upon the given `state`.
///
/// Typically the `self.produce_block()` function should be used, instead of calling this
@@ -4085,17 +4512,13 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
/// The `PayloadAttributes` are used by the EL to give it a look-ahead for preparing an optimal
/// set of transactions for a new `ExecutionPayload`.
///
/// This function will result in a call to `forkchoiceUpdated` on the EL if:
///
/// 1. We're in the tail-end of the slot (as defined by PAYLOAD_PREPARATION_LOOKAHEAD_FACTOR)
/// 2. The head block is one slot (or less) behind the prepare slot (e.g., we're preparing for
/// the next slot and the block at the current slot is already known).
/// This function will result in a call to `forkchoiceUpdated` on the EL if we're in the
/// tail-end of the slot (as defined by `self.config.prepare_payload_lookahead`).
pub async fn prepare_beacon_proposer(
self: &Arc<Self>,
current_slot: Slot,
) -> Result<(), Error> {
let prepare_slot = current_slot + 1;
let prepare_epoch = prepare_slot.epoch(T::EthSpec::slots_per_epoch());
// There's no need to run the proposer preparation routine before the bellatrix fork.
if self.slot_is_prior_to_bellatrix(prepare_slot) {
@@ -4113,158 +4536,99 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
return Ok(());
}
// Atomically read some values from the canonical head, whilst avoiding holding the cached
// head `Arc` any longer than necessary.
// Load the cached head and its forkchoice update parameters.
//
// Use a blocking task since blocking the core executor on the canonical head read lock can
// block the core tokio executor.
let chain = self.clone();
let (head_slot, head_root, head_decision_root, head_random, forkchoice_update_params) =
self.spawn_blocking_handle(
let maybe_prep_data = self
.spawn_blocking_handle(
move || {
let cached_head = chain.canonical_head.cached_head();
let head_block_root = cached_head.head_block_root();
let decision_root = cached_head
.snapshot
.beacon_state
.proposer_shuffling_decision_root(head_block_root)?;
Ok::<_, Error>((
cached_head.head_slot(),
head_block_root,
decision_root,
cached_head.head_random()?,
cached_head.forkchoice_update_parameters(),
))
// Don't bother with proposer prep if the head is more than
// `PREPARE_PROPOSER_HISTORIC_EPOCHS` prior to the current slot.
//
// This prevents the routine from running during sync.
let head_slot = cached_head.head_slot();
if head_slot + T::EthSpec::slots_per_epoch() * PREPARE_PROPOSER_HISTORIC_EPOCHS
< current_slot
{
debug!(
chain.log,
"Head too old for proposer prep";
"head_slot" => head_slot,
"current_slot" => current_slot,
);
return Ok(None);
}
let canonical_fcu_params = cached_head.forkchoice_update_parameters();
let fcu_params =
chain.overridden_forkchoice_update_params(canonical_fcu_params)?;
let pre_payload_attributes = chain.get_pre_payload_attributes(
prepare_slot,
fcu_params.head_root,
&cached_head,
)?;
Ok::<_, Error>(Some((fcu_params, pre_payload_attributes)))
},
"prepare_beacon_proposer_fork_choice_read",
"prepare_beacon_proposer_head_read",
)
.await??;
let head_epoch = head_slot.epoch(T::EthSpec::slots_per_epoch());
// Don't bother with proposer prep if the head is more than
// `PREPARE_PROPOSER_HISTORIC_EPOCHS` prior to the current slot.
//
// This prevents the routine from running during sync.
if head_slot + T::EthSpec::slots_per_epoch() * PREPARE_PROPOSER_HISTORIC_EPOCHS
< current_slot
{
debug!(
self.log,
"Head too old for proposer prep";
"head_slot" => head_slot,
"current_slot" => current_slot,
);
return Ok(());
}
// Ensure that the shuffling decision root is correct relative to the epoch we wish to
// query.
let shuffling_decision_root = if head_epoch == prepare_epoch {
head_decision_root
} else {
head_root
};
// Read the proposer from the proposer cache.
let cached_proposer = self
.beacon_proposer_cache
.lock()
.get_slot::<T::EthSpec>(shuffling_decision_root, prepare_slot);
let proposer = if let Some(proposer) = cached_proposer {
proposer.index
} else {
if head_epoch + 2 < prepare_epoch {
warn!(
self.log,
"Skipping proposer preparation";
"msg" => "this is a non-critical issue that can happen on unhealthy nodes or \
networks.",
"prepare_epoch" => prepare_epoch,
"head_epoch" => head_epoch,
);
// Don't skip the head forward more than two epochs. This avoids burdening an
// unhealthy node.
//
// Although this node might miss out on preparing for a proposal, they should still
// be able to propose. This will prioritise beacon chain health over efficient
// packing of execution blocks.
let (forkchoice_update_params, pre_payload_attributes) =
if let Some((fcu, Some(pre_payload))) = maybe_prep_data {
(fcu, pre_payload)
} else {
// Appropriate log messages have already been logged above and in
// `get_pre_payload_attributes`.
return Ok(());
}
let (proposers, decision_root, _, fork) =
compute_proposer_duties_from_head(prepare_epoch, self)?;
let proposer_index = prepare_slot.as_usize() % (T::EthSpec::slots_per_epoch() as usize);
let proposer = *proposers
.get(proposer_index)
.ok_or(BeaconChainError::NoProposerForSlot(prepare_slot))?;
self.beacon_proposer_cache.lock().insert(
prepare_epoch,
decision_root,
proposers,
fork,
)?;
// It's possible that the head changes whilst computing these duties. If so, abandon
// this routine since the change of head would have also spawned another instance of
// this routine.
//
// Exit now, after updating the cache.
if decision_root != shuffling_decision_root {
warn!(
self.log,
"Head changed during proposer preparation";
);
return Ok(());
}
proposer
};
};
// If the execution layer doesn't have any proposer data for this validator then we assume
// it's not connected to this BN and no action is required.
let proposer = pre_payload_attributes.proposer_index;
if !execution_layer
.has_proposer_preparation_data(proposer as u64)
.has_proposer_preparation_data(proposer)
.await
{
return Ok(());
}
let head_root = forkchoice_update_params.head_root;
let payload_attributes = PayloadAttributes {
timestamp: self
.slot_clock
.start_of(prepare_slot)
.ok_or(Error::InvalidSlot(prepare_slot))?
.as_secs(),
prev_randao: head_random,
suggested_fee_recipient: execution_layer
.get_suggested_fee_recipient(proposer as u64)
.await,
prev_randao: pre_payload_attributes.prev_randao,
suggested_fee_recipient: execution_layer.get_suggested_fee_recipient(proposer).await,
};
debug!(
self.log,
"Preparing beacon proposer";
"payload_attributes" => ?payload_attributes,
"head_root" => ?head_root,
"prepare_slot" => prepare_slot,
"validator" => proposer,
"parent_root" => ?head_root,
);
let already_known = execution_layer
.insert_proposer(prepare_slot, head_root, proposer as u64, payload_attributes)
.insert_proposer(prepare_slot, head_root, proposer, payload_attributes)
.await;
// Only push a log to the user if this is the first time we've seen this proposer for this
// slot.
if !already_known {
info!(
self.log,
"Prepared beacon proposer";
"already_known" => already_known,
"prepare_slot" => prepare_slot,
"validator" => proposer,
"parent_root" => ?head_root,
);
}
@@ -4286,27 +4650,22 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
return Ok(());
};
// If either of the following are true, send a fork-choice update message to the
// EL:
//
// 1. We're in the tail-end of the slot (as defined by
// PAYLOAD_PREPARATION_LOOKAHEAD_FACTOR)
// 2. The head block is one slot (or less) behind the prepare slot (e.g., we're
// preparing for the next slot and the block at the current slot is already
// known).
if till_prepare_slot
<= self.slot_clock.slot_duration() / PAYLOAD_PREPARATION_LOOKAHEAD_FACTOR
|| head_slot + 1 >= prepare_slot
{
// If we are close enough to the proposal slot, send an fcU, which will have payload
// attributes filled in by the execution layer cache we just primed.
if till_prepare_slot <= self.config.prepare_payload_lookahead {
debug!(
self.log,
"Pushing update to prepare proposer";
"Sending forkchoiceUpdate for proposer prep";
"till_prepare_slot" => ?till_prepare_slot,
"prepare_slot" => prepare_slot
);
self.update_execution_engine_forkchoice(current_slot, forkchoice_update_params)
.await?;
self.update_execution_engine_forkchoice(
current_slot,
forkchoice_update_params,
OverrideForkchoiceUpdate::AlreadyApplied,
)
.await?;
}
Ok(())
@@ -4315,7 +4674,8 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
pub async fn update_execution_engine_forkchoice(
self: &Arc<Self>,
current_slot: Slot,
params: ForkchoiceUpdateParameters,
input_params: ForkchoiceUpdateParameters,
override_forkchoice_update: OverrideForkchoiceUpdate,
) -> Result<(), Error> {
let next_slot = current_slot + 1;
@@ -4337,6 +4697,19 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
.as_ref()
.ok_or(Error::ExecutionLayerMissing)?;
// Determine whether to override the forkchoiceUpdated message if we want to re-org
// the current head at the next slot.
let params = if override_forkchoice_update == OverrideForkchoiceUpdate::Yes {
let chain = self.clone();
self.spawn_blocking_handle(
move || chain.overridden_forkchoice_update_params(input_params),
"update_execution_engine_forkchoice_override",
)
.await??
} else {
input_params
};
// Take the global lock for updating the execution engine fork choice.
//
// Whilst holding this lock we must:

View File

@@ -7,6 +7,8 @@
use crate::{metrics, BeaconSnapshot};
use derivative::Derivative;
use fork_choice::ForkChoiceStore;
use proto_array::JustifiedBalances;
use safe_arith::ArithError;
use ssz_derive::{Decode, Encode};
use std::collections::BTreeSet;
use std::marker::PhantomData;
@@ -31,6 +33,7 @@ pub enum Error {
MissingState(Hash256),
InvalidPersistedBytes(ssz::DecodeError),
BeaconStateError(BeaconStateError),
Arith(ArithError),
}
impl From<BeaconStateError> for Error {
@@ -39,27 +42,15 @@ impl From<BeaconStateError> for Error {
}
}
impl From<ArithError> for Error {
fn from(e: ArithError) -> Self {
Error::Arith(e)
}
}
/// The number of validator balance sets that are cached within `BalancesCache`.
const MAX_BALANCE_CACHE_SIZE: usize = 4;
/// Returns the effective balances for every validator in the given `state`.
///
/// Any validator who is not active in the epoch of the given `state` is assigned a balance of
/// zero.
pub fn get_effective_balances<T: EthSpec>(state: &BeaconState<T>) -> Vec<u64> {
state
.validators()
.iter()
.map(|validator| {
if validator.is_active_at(state.current_epoch()) {
validator.effective_balance
} else {
0
}
})
.collect()
}
#[superstruct(
variants(V8),
variant_attributes(derive(PartialEq, Clone, Debug, Encode, Decode)),
@@ -113,7 +104,7 @@ impl BalancesCache {
let item = CacheItem {
block_root: epoch_boundary_root,
epoch,
balances: get_effective_balances(state),
balances: JustifiedBalances::from_justified_state(state)?.effective_balances,
};
if self.items.len() == MAX_BALANCE_CACHE_SIZE {
@@ -152,7 +143,7 @@ pub struct BeaconForkChoiceStore<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<
time: Slot,
finalized_checkpoint: Checkpoint,
justified_checkpoint: Checkpoint,
justified_balances: Vec<u64>,
justified_balances: JustifiedBalances,
best_justified_checkpoint: Checkpoint,
unrealized_justified_checkpoint: Checkpoint,
unrealized_finalized_checkpoint: Checkpoint,
@@ -181,7 +172,7 @@ where
pub fn get_forkchoice_store(
store: Arc<HotColdDB<E, Hot, Cold>>,
anchor: &BeaconSnapshot<E>,
) -> Self {
) -> Result<Self, Error> {
let anchor_state = &anchor.beacon_state;
let mut anchor_block_header = anchor_state.latest_block_header().clone();
if anchor_block_header.state_root == Hash256::zero() {
@@ -194,13 +185,14 @@ where
root: anchor_root,
};
let finalized_checkpoint = justified_checkpoint;
let justified_balances = JustifiedBalances::from_justified_state(anchor_state)?;
Self {
Ok(Self {
store,
balances_cache: <_>::default(),
time: anchor_state.slot(),
justified_checkpoint,
justified_balances: anchor_state.balances().clone().into(),
justified_balances,
finalized_checkpoint,
best_justified_checkpoint: justified_checkpoint,
unrealized_justified_checkpoint: justified_checkpoint,
@@ -208,7 +200,7 @@ where
proposer_boost_root: Hash256::zero(),
equivocating_indices: BTreeSet::new(),
_phantom: PhantomData,
}
})
}
/// Save the current state of `Self` to a `PersistedForkChoiceStore` which can be stored to the
@@ -219,7 +211,7 @@ where
time: self.time,
finalized_checkpoint: self.finalized_checkpoint,
justified_checkpoint: self.justified_checkpoint,
justified_balances: self.justified_balances.clone(),
justified_balances: self.justified_balances.effective_balances.clone(),
best_justified_checkpoint: self.best_justified_checkpoint,
unrealized_justified_checkpoint: self.unrealized_justified_checkpoint,
unrealized_finalized_checkpoint: self.unrealized_finalized_checkpoint,
@@ -233,13 +225,15 @@ where
persisted: PersistedForkChoiceStore,
store: Arc<HotColdDB<E, Hot, Cold>>,
) -> Result<Self, Error> {
let justified_balances =
JustifiedBalances::from_effective_balances(persisted.justified_balances)?;
Ok(Self {
store,
balances_cache: persisted.balances_cache,
time: persisted.time,
finalized_checkpoint: persisted.finalized_checkpoint,
justified_checkpoint: persisted.justified_checkpoint,
justified_balances: persisted.justified_balances,
justified_balances,
best_justified_checkpoint: persisted.best_justified_checkpoint,
unrealized_justified_checkpoint: persisted.unrealized_justified_checkpoint,
unrealized_finalized_checkpoint: persisted.unrealized_finalized_checkpoint,
@@ -279,7 +273,7 @@ where
&self.justified_checkpoint
}
fn justified_balances(&self) -> &[u64] {
fn justified_balances(&self) -> &JustifiedBalances {
&self.justified_balances
}
@@ -314,8 +308,9 @@ where
self.justified_checkpoint.root,
self.justified_checkpoint.epoch,
) {
// NOTE: could avoid this re-calculation by introducing a `PersistedCacheItem`.
metrics::inc_counter(&metrics::BALANCES_CACHE_HITS);
self.justified_balances = balances;
self.justified_balances = JustifiedBalances::from_effective_balances(balances)?;
} else {
metrics::inc_counter(&metrics::BALANCES_CACHE_MISSES);
let justified_block = self
@@ -332,7 +327,7 @@ where
.map_err(Error::FailedToReadState)?
.ok_or_else(|| Error::MissingState(justified_block.state_root()))?;
self.justified_balances = get_effective_balances(&state);
self.justified_balances = JustifiedBalances::from_justified_state(&state)?;
}
Ok(())

View File

@@ -22,6 +22,7 @@ use fork_choice::{ForkChoice, ResetPayloadStatuses};
use futures::channel::mpsc::Sender;
use operation_pool::{OperationPool, PersistedOperationPool};
use parking_lot::RwLock;
use proto_array::ReOrgThreshold;
use slasher::Slasher;
use slog::{crit, error, info, Logger};
use slot_clock::{SlotClock, TestingSlotClock};
@@ -31,8 +32,8 @@ use std::time::Duration;
use store::{Error as StoreError, HotColdDB, ItemStore, KeyValueStoreOp};
use task_executor::{ShutdownReason, TaskExecutor};
use types::{
BeaconBlock, BeaconState, ChainSpec, Checkpoint, EthSpec, Graffiti, Hash256, PublicKeyBytes,
Signature, SignedBeaconBlock, Slot,
BeaconBlock, BeaconState, ChainSpec, Checkpoint, Epoch, EthSpec, Graffiti, Hash256,
PublicKeyBytes, Signature, SignedBeaconBlock, Slot,
};
/// An empty struct used to "witness" all the `BeaconChainTypes` traits. It has no user-facing
@@ -159,6 +160,21 @@ where
self
}
/// Sets the proposer re-org threshold.
pub fn proposer_re_org_threshold(mut self, threshold: Option<ReOrgThreshold>) -> Self {
self.chain_config.re_org_threshold = threshold;
self
}
/// Sets the proposer re-org max epochs since finalization.
pub fn proposer_re_org_max_epochs_since_finalization(
mut self,
epochs_since_finalization: Epoch,
) -> Self {
self.chain_config.re_org_max_epochs_since_finalization = epochs_since_finalization;
self
}
/// Sets the store (database).
///
/// Should generally be called early in the build chain.
@@ -358,7 +374,8 @@ where
let (genesis, updated_builder) = self.set_genesis_state(beacon_state)?;
self = updated_builder;
let fc_store = BeaconForkChoiceStore::get_forkchoice_store(store, &genesis);
let fc_store = BeaconForkChoiceStore::get_forkchoice_store(store, &genesis)
.map_err(|e| format!("Unable to initialize fork choice store: {e:?}"))?;
let current_slot = None;
let fork_choice = ForkChoice::from_anchor(
@@ -476,7 +493,8 @@ where
beacon_state: weak_subj_state,
};
let fc_store = BeaconForkChoiceStore::get_forkchoice_store(store, &snapshot);
let fc_store = BeaconForkChoiceStore::get_forkchoice_store(store, &snapshot)
.map_err(|e| format!("Unable to initialize fork choice store: {e:?}"))?;
let current_slot = Some(snapshot.beacon_block.slot());
let fork_choice = ForkChoice::from_anchor(

View File

@@ -34,7 +34,8 @@
use crate::persisted_fork_choice::PersistedForkChoice;
use crate::{
beacon_chain::{
BeaconForkChoice, BeaconStore, BLOCK_PROCESSING_CACHE_LOCK_TIMEOUT, FORK_CHOICE_DB_KEY,
BeaconForkChoice, BeaconStore, OverrideForkchoiceUpdate,
BLOCK_PROCESSING_CACHE_LOCK_TIMEOUT, FORK_CHOICE_DB_KEY,
},
block_times_cache::BlockTimesCache,
events::ServerSentEventHandler,
@@ -114,6 +115,11 @@ impl<E: EthSpec> CachedHead<E> {
self.snapshot.beacon_block_root
}
/// Returns the root of the parent of the head block.
pub fn parent_block_root(&self) -> Hash256 {
self.snapshot.beacon_block.parent_root()
}
/// Returns root of the `BeaconState` at the head of the beacon chain.
///
/// ## Note
@@ -146,6 +152,21 @@ impl<E: EthSpec> CachedHead<E> {
Ok(root)
}
/// Returns the randao mix for the parent of the block at the head of the chain.
///
/// This is useful for re-orging the current head. The parent's RANDAO value is read from
/// the head's execution payload because it is unavailable in the beacon state's RANDAO mixes
/// array after being overwritten by the head block's RANDAO mix.
///
/// This will error if the head block is not execution-enabled (post Bellatrix).
pub fn parent_random(&self) -> Result<Hash256, BeaconStateError> {
self.snapshot
.beacon_block
.message()
.execution_payload()
.map(|payload| payload.prev_randao())
}
/// Returns the active validator count for the current epoch of the head state.
///
/// Should only return `None` if the caches have not been built on the head state (this should
@@ -765,6 +786,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
new_cached_head: &CachedHead<T::EthSpec>,
new_head_proto_block: ProtoBlock,
) -> Result<(), Error> {
let _timer = metrics::start_timer(&metrics::FORK_CHOICE_AFTER_NEW_HEAD_TIMES);
let old_snapshot = &old_cached_head.snapshot;
let new_snapshot = &new_cached_head.snapshot;
let new_head_is_optimistic = new_head_proto_block
@@ -902,6 +924,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
new_view: ForkChoiceView,
finalized_proto_block: ProtoBlock,
) -> Result<(), Error> {
let _timer = metrics::start_timer(&metrics::FORK_CHOICE_AFTER_FINALIZATION_TIMES);
let new_snapshot = &new_cached_head.snapshot;
let finalized_block_is_optimistic = finalized_proto_block
.execution_status
@@ -1124,7 +1147,11 @@ fn spawn_execution_layer_updates<T: BeaconChainTypes>(
}
if let Err(e) = chain
.update_execution_engine_forkchoice(current_slot, forkchoice_update_params)
.update_execution_engine_forkchoice(
current_slot,
forkchoice_update_params,
OverrideForkchoiceUpdate::Yes,
)
.await
{
crit!(

View File

@@ -1,9 +1,18 @@
pub use proto_array::CountUnrealizedFull;
pub use proto_array::{CountUnrealizedFull, ReOrgThreshold};
use serde_derive::{Deserialize, Serialize};
use types::Checkpoint;
use std::time::Duration;
use types::{Checkpoint, Epoch};
pub const DEFAULT_RE_ORG_THRESHOLD: ReOrgThreshold = ReOrgThreshold(20);
pub const DEFAULT_RE_ORG_MAX_EPOCHS_SINCE_FINALIZATION: Epoch = Epoch::new(2);
pub const DEFAULT_FORK_CHOICE_BEFORE_PROPOSAL_TIMEOUT: u64 = 250;
/// Default fraction of a slot lookahead for payload preparation (12/3 = 4 seconds on mainnet).
pub const DEFAULT_PREPARE_PAYLOAD_LOOKAHEAD_FACTOR: u32 = 3;
/// Fraction of a slot lookahead for fork choice in the state advance timer (500ms on mainnet).
pub const FORK_CHOICE_LOOKAHEAD_FACTOR: u32 = 24;
#[derive(Debug, PartialEq, Eq, Clone, Deserialize, Serialize)]
pub struct ChainConfig {
/// Maximum number of slots to skip when importing a consensus message (e.g., block,
@@ -21,6 +30,10 @@ pub struct ChainConfig {
pub enable_lock_timeouts: bool,
/// The max size of a message that can be sent over the network.
pub max_network_size: usize,
/// Maximum percentage of committee weight at which to attempt re-orging the canonical head.
pub re_org_threshold: Option<ReOrgThreshold>,
/// Maximum number of epochs since finalization for attempting a proposer re-org.
pub re_org_max_epochs_since_finalization: Epoch,
/// Number of milliseconds to wait for fork choice before proposing a block.
///
/// If set to 0 then block proposal will not wait for fork choice at all.
@@ -47,6 +60,11 @@ pub struct ChainConfig {
pub count_unrealized_full: CountUnrealizedFull,
/// Optionally set timeout for calls to checkpoint sync endpoint.
pub checkpoint_sync_url_timeout: u64,
/// The offset before the start of a proposal slot at which payload attributes should be sent.
///
/// Low values are useful for execution engines which don't improve their payload after the
/// first call, and high values are useful for ensuring the EL is given ample notice.
pub prepare_payload_lookahead: Duration,
}
impl Default for ChainConfig {
@@ -57,6 +75,8 @@ impl Default for ChainConfig {
reconstruct_historic_states: false,
enable_lock_timeouts: true,
max_network_size: 10 * 1_048_576, // 10M
re_org_threshold: Some(DEFAULT_RE_ORG_THRESHOLD),
re_org_max_epochs_since_finalization: DEFAULT_RE_ORG_MAX_EPOCHS_SINCE_FINALIZATION,
fork_choice_before_proposal_timeout_ms: DEFAULT_FORK_CHOICE_BEFORE_PROPOSAL_TIMEOUT,
// Builder fallback configs that are set in `clap` will override these.
builder_fallback_skips: 3,
@@ -68,6 +88,7 @@ impl Default for ChainConfig {
paranoid_block_proposal: false,
count_unrealized_full: CountUnrealizedFull::default(),
checkpoint_sync_url_timeout: 60,
prepare_payload_lookahead: Duration::from_secs(4),
}
}
}

View File

@@ -204,6 +204,7 @@ pub enum BeaconChainError {
MissingPersistedForkChoice,
CommitteePromiseFailed(oneshot_broadcast::Error),
MaxCommitteePromises(usize),
ProposerHeadForkChoiceError(fork_choice::Error<proto_array::Error>),
}
easy_from_to!(SlotProcessingError, BeaconChainError);
@@ -234,6 +235,7 @@ pub enum BlockProductionError {
UnableToProduceAtSlot(Slot),
SlotProcessingError(SlotProcessingError),
BlockProcessingError(BlockProcessingError),
ForkChoiceError(ForkChoiceError),
Eth1ChainError(Eth1ChainError),
BeaconStateError(BeaconStateError),
StateAdvanceError(StateAdvanceError),
@@ -252,7 +254,6 @@ pub enum BlockProductionError {
FailedToReadFinalizedBlock(store::Error),
MissingFinalizedBlock(Hash256),
BlockTooLarge(usize),
ForkChoiceError(BeaconChainError),
ShuttingDown,
MissingSyncAggregate,
MissingExecutionPayload,
@@ -265,3 +266,4 @@ easy_from_to!(BeaconStateError, BlockProductionError);
easy_from_to!(SlotProcessingError, BlockProductionError);
easy_from_to!(Eth1ChainError, BlockProductionError);
easy_from_to!(StateAdvanceError, BlockProductionError);
easy_from_to!(ForkChoiceError, BlockProductionError);

View File

@@ -147,7 +147,8 @@ pub fn reset_fork_choice_to_finalization<E: EthSpec, Hot: ItemStore<E>, Cold: It
beacon_state: finalized_state,
};
let fc_store = BeaconForkChoiceStore::get_forkchoice_store(store.clone(), &finalized_snapshot);
let fc_store = BeaconForkChoiceStore::get_forkchoice_store(store.clone(), &finalized_snapshot)
.map_err(|e| format!("Unable to reset fork choice store for revert: {e:?}"))?;
let mut fork_choice = ForkChoice::from_anchor(
fc_store,

View File

@@ -48,8 +48,8 @@ pub mod validator_pubkey_cache;
pub use self::beacon_chain::{
AttestationProcessingOutcome, BeaconChain, BeaconChainTypes, BeaconStore, ChainSegmentResult,
CountUnrealized, ForkChoiceError, ProduceBlockVerification, StateSkipConfig, WhenSlotSkipped,
INVALID_FINALIZED_MERGE_TRANSITION_BLOCK_SHUTDOWN_REASON,
CountUnrealized, ForkChoiceError, OverrideForkchoiceUpdate, ProduceBlockVerification,
StateSkipConfig, WhenSlotSkipped, INVALID_FINALIZED_MERGE_TRANSITION_BLOCK_SHUTDOWN_REASON,
INVALID_JUSTIFIED_PAYLOAD_SHUTDOWN_REASON, MAXIMUM_GOSSIP_CLOCK_DISPARITY,
};
pub use self::beacon_snapshot::BeaconSnapshot;

View File

@@ -77,6 +77,11 @@ lazy_static! {
"beacon_block_processing_attestation_observation_seconds",
"Time spent hashing and remembering all the attestations in the block"
);
pub static ref BLOCK_PROCESSING_FORK_CHOICE: Result<Histogram> = try_create_histogram_with_buckets(
"beacon_block_processing_fork_choice_seconds",
"Time spent running fork choice's `get_head` during block import",
exponential_buckets(1e-3, 2.0, 8)
);
pub static ref BLOCK_SYNC_AGGREGATE_SET_BITS: Result<IntGauge> = try_create_int_gauge(
"block_sync_aggregate_set_bits",
"The number of true bits in the last sync aggregate in a block"
@@ -99,6 +104,11 @@ lazy_static! {
"beacon_block_production_fork_choice_seconds",
"Time taken to run fork choice before block production"
);
pub static ref BLOCK_PRODUCTION_GET_PROPOSER_HEAD_TIMES: Result<Histogram> = try_create_histogram_with_buckets(
"beacon_block_production_get_proposer_head_times",
"Time taken for fork choice to compute the proposer head before block production",
exponential_buckets(1e-3, 2.0, 8)
);
pub static ref BLOCK_PRODUCTION_STATE_LOAD_TIMES: Result<Histogram> = try_create_histogram(
"beacon_block_production_state_load_seconds",
"Time taken to load the base state for block production"
@@ -322,10 +332,26 @@ lazy_static! {
"beacon_reorgs_total",
"Count of occasions fork choice has switched to a different chain"
);
pub static ref FORK_CHOICE_TIMES: Result<Histogram> =
try_create_histogram("beacon_fork_choice_seconds", "Full runtime of fork choice");
pub static ref FORK_CHOICE_FIND_HEAD_TIMES: Result<Histogram> =
try_create_histogram("beacon_fork_choice_find_head_seconds", "Full runtime of fork choice find_head function");
pub static ref FORK_CHOICE_TIMES: Result<Histogram> = try_create_histogram_with_buckets(
"beacon_fork_choice_seconds",
"Full runtime of fork choice",
linear_buckets(10e-3, 20e-3, 10)
);
pub static ref FORK_CHOICE_OVERRIDE_FCU_TIMES: Result<Histogram> = try_create_histogram_with_buckets(
"beacon_fork_choice_override_fcu_seconds",
"Time taken to compute the optional forkchoiceUpdated override",
exponential_buckets(1e-3, 2.0, 8)
);
pub static ref FORK_CHOICE_AFTER_NEW_HEAD_TIMES: Result<Histogram> = try_create_histogram_with_buckets(
"beacon_fork_choice_after_new_head_seconds",
"Time taken to run `after_new_head`",
exponential_buckets(1e-3, 2.0, 10)
);
pub static ref FORK_CHOICE_AFTER_FINALIZATION_TIMES: Result<Histogram> = try_create_histogram_with_buckets(
"beacon_fork_choice_after_finalization_seconds",
"Time taken to run `after_finalization`",
exponential_buckets(1e-3, 2.0, 10)
);
pub static ref FORK_CHOICE_PROCESS_BLOCK_TIMES: Result<Histogram> = try_create_histogram(
"beacon_fork_choice_process_block_seconds",
"Time taken to add a block and all attestations to fork choice"

View File

@@ -5,13 +5,9 @@ use std::sync::Arc;
use task_executor::TaskExecutor;
use tokio::time::sleep;
/// At 12s slot times, the means that the payload preparation routine will run 4s before the start
/// of each slot (`12 / 3 = 4`).
pub const PAYLOAD_PREPARATION_LOOKAHEAD_FACTOR: u32 = 3;
/// Spawns a routine which ensures the EL is provided advance notice of any block producers.
///
/// This routine will run once per slot, at `slot_duration / PAYLOAD_PREPARATION_LOOKAHEAD_FACTOR`
/// This routine will run once per slot, at `chain.prepare_payload_lookahead()`
/// before the start of each slot.
///
/// The service will not be started if there is no `execution_layer` on the `chain`.
@@ -38,8 +34,8 @@ async fn proposer_prep_service<T: BeaconChainTypes>(
loop {
match chain.slot_clock.duration_to_next_slot() {
Some(duration) => {
let additional_delay = slot_duration
- chain.slot_clock.slot_duration() / PAYLOAD_PREPARATION_LOOKAHEAD_FACTOR;
let additional_delay =
slot_duration.saturating_sub(chain.config.prepare_payload_lookahead);
sleep(duration + additional_delay).await;
debug!(
@@ -65,14 +61,11 @@ async fn proposer_prep_service<T: BeaconChainTypes>(
},
"proposer_prep_update",
);
continue;
}
None => {
error!(chain.log, "Failed to read slot clock");
// If we can't read the slot clock, just wait another slot.
sleep(slot_duration).await;
continue;
}
};
}

View File

@@ -13,7 +13,10 @@ pub const DEFAULT_SNAPSHOT_CACHE_SIZE: usize = 4;
/// The minimum block delay to clone the state in the cache instead of removing it.
/// This helps keep block processing fast during re-orgs from late blocks.
const MINIMUM_BLOCK_DELAY_FOR_CLONE: Duration = Duration::from_secs(6);
fn minimum_block_delay_for_clone(seconds_per_slot: u64) -> Duration {
// If the block arrived at the attestation deadline or later, it might get re-orged.
Duration::from_secs(seconds_per_slot) / 3
}
/// This snapshot is to be used for verifying a child of `self.beacon_block`.
#[derive(Debug)]
@@ -256,7 +259,7 @@ impl<T: EthSpec> SnapshotCache<T> {
if let Some(cache) = self.snapshots.get(i) {
// Avoid cloning the block during sync (when the `block_delay` is `None`).
if let Some(delay) = block_delay {
if delay >= MINIMUM_BLOCK_DELAY_FOR_CLONE
if delay >= minimum_block_delay_for_clone(spec.seconds_per_slot)
&& delay <= Duration::from_secs(spec.seconds_per_slot) * 4
|| block_slot > cache.beacon_block.slot() + 1
{

View File

@@ -16,6 +16,7 @@
use crate::validator_monitor::HISTORIC_EPOCHS as VALIDATOR_MONITOR_HISTORIC_EPOCHS;
use crate::{
beacon_chain::{ATTESTATION_CACHE_LOCK_TIMEOUT, BLOCK_PROCESSING_CACHE_LOCK_TIMEOUT},
chain_config::FORK_CHOICE_LOOKAHEAD_FACTOR,
snapshot_cache::StateAdvance,
BeaconChain, BeaconChainError, BeaconChainTypes,
};
@@ -133,7 +134,7 @@ async fn state_advance_timer<T: BeaconChainTypes>(
// Run fork choice 23/24s of the way through the slot (11.5s on mainnet).
// We need to run after the state advance, so use the same condition as above.
let fork_choice_offset = slot_duration / 24;
let fork_choice_offset = slot_duration / FORK_CHOICE_LOOKAHEAD_FACTOR;
let fork_choice_instant = if duration_to_next_slot > state_advance_offset {
Instant::now() + duration_to_next_slot - fork_choice_offset
} else {
@@ -224,8 +225,20 @@ async fn state_advance_timer<T: BeaconChainTypes>(
return;
}
// Re-compute the head, dequeuing attestations for the current slot early.
beacon_chain.recompute_head_at_slot(next_slot).await;
// Prepare proposers so that the node can send payload attributes in the case where
// it decides to abandon a proposer boost re-org.
if let Err(e) = beacon_chain.prepare_beacon_proposer(current_slot).await {
warn!(
log,
"Unable to prepare proposer with lookahead";
"error" => ?e,
"slot" => next_slot,
);
}
// Use a blocking task to avoid blocking the core executor whilst waiting for locks
// in `ForkChoiceSignalTx`.
beacon_chain.task_executor.clone().spawn_blocking(

View File

@@ -32,7 +32,7 @@ use rand::SeedableRng;
use rayon::prelude::*;
use sensitive_url::SensitiveUrl;
use slog::Logger;
use slot_clock::TestingSlotClock;
use slot_clock::{SlotClock, TestingSlotClock};
use state_processing::per_block_processing::compute_timestamp_at_slot;
use state_processing::{
state_advance::{complete_state_advance, partial_state_advance},
@@ -319,6 +319,12 @@ where
self
}
pub fn logger(mut self, log: Logger) -> Self {
self.log = log.clone();
self.runtime.set_logger(log);
self
}
/// This mutator will be run before the `store_mutator`.
pub fn initial_mutator(mut self, mutator: BoxedMutator<E, Hot, Cold>) -> Self {
assert!(
@@ -524,10 +530,9 @@ pub struct BeaconChainHarness<T: BeaconChainTypes> {
pub rng: Mutex<StdRng>,
}
pub type HarnessAttestations<E> = Vec<(
Vec<(Attestation<E>, SubnetId)>,
Option<SignedAggregateAndProof<E>>,
)>;
pub type CommitteeAttestations<E> = Vec<(Attestation<E>, SubnetId)>;
pub type HarnessAttestations<E> =
Vec<(CommitteeAttestations<E>, Option<SignedAggregateAndProof<E>>)>;
pub type HarnessSyncContributions<E> = Vec<(
Vec<(SyncCommitteeMessage, usize)>,
@@ -778,6 +783,21 @@ where
sk.sign(message)
}
/// Sign a beacon block using the proposer's key.
pub fn sign_beacon_block(
&self,
block: BeaconBlock<E>,
state: &BeaconState<E>,
) -> SignedBeaconBlock<E> {
let proposer_index = block.proposer_index() as usize;
block.sign(
&self.validator_keypairs[proposer_index].sk,
&state.fork(),
state.genesis_validators_root(),
&self.spec,
)
}
/// Produces an "unaggregated" attestation for the given `slot` and `index` that attests to
/// `beacon_block_root`. The provided `state` should match the `block.state_root` for the
/// `block` identified by `beacon_block_root`.
@@ -851,13 +871,35 @@ where
state_root: Hash256,
head_block_root: SignedBeaconBlockHash,
attestation_slot: Slot,
) -> Vec<Vec<(Attestation<E>, SubnetId)>> {
) -> Vec<CommitteeAttestations<E>> {
self.make_unaggregated_attestations_with_limit(
attesting_validators,
state,
state_root,
head_block_root,
attestation_slot,
None,
)
.0
}
pub fn make_unaggregated_attestations_with_limit(
&self,
attesting_validators: &[usize],
state: &BeaconState<E>,
state_root: Hash256,
head_block_root: SignedBeaconBlockHash,
attestation_slot: Slot,
limit: Option<usize>,
) -> (Vec<CommitteeAttestations<E>>, Vec<usize>) {
let committee_count = state.get_committee_count_at_slot(state.slot()).unwrap();
let fork = self
.spec
.fork_at_epoch(attestation_slot.epoch(E::slots_per_epoch()));
state
let attesters = Mutex::new(vec![]);
let attestations = state
.get_beacon_committees_at_slot(attestation_slot)
.expect("should get committees")
.iter()
@@ -869,6 +911,15 @@ where
if !attesting_validators.contains(validator_index) {
return None;
}
let mut attesters = attesters.lock();
if let Some(limit) = limit {
if attesters.len() >= limit {
return None;
}
}
attesters.push(*validator_index);
let mut attestation = self
.produce_unaggregated_attestation_for_block(
attestation_slot,
@@ -909,9 +960,19 @@ where
Some((attestation, subnet_id))
})
.collect()
.collect::<Vec<_>>()
})
.collect()
.collect::<Vec<_>>();
let attesters = attesters.into_inner();
if let Some(limit) = limit {
assert_eq!(
limit,
attesters.len(),
"failed to generate `limit` attestations"
);
}
(attestations, attesters)
}
/// A list of sync messages for the given state.
@@ -1004,13 +1065,38 @@ where
block_hash: SignedBeaconBlockHash,
slot: Slot,
) -> HarnessAttestations<E> {
let unaggregated_attestations = self.make_unaggregated_attestations(
self.make_attestations_with_limit(
attesting_validators,
state,
state_root,
block_hash,
slot,
);
None,
)
.0
}
/// Produce exactly `limit` attestations.
///
/// Return attestations and vec of validator indices that attested.
pub fn make_attestations_with_limit(
&self,
attesting_validators: &[usize],
state: &BeaconState<E>,
state_root: Hash256,
block_hash: SignedBeaconBlockHash,
slot: Slot,
limit: Option<usize>,
) -> (HarnessAttestations<E>, Vec<usize>) {
let (unaggregated_attestations, attesters) = self
.make_unaggregated_attestations_with_limit(
attesting_validators,
state,
state_root,
block_hash,
slot,
limit,
);
let fork = self.spec.fork_at_epoch(slot.epoch(E::slots_per_epoch()));
let aggregated_attestations: Vec<Option<SignedAggregateAndProof<E>>> =
@@ -1029,7 +1115,7 @@ where
.committee
.iter()
.find(|&validator_index| {
if !attesting_validators.contains(validator_index) {
if !attesters.contains(validator_index) {
return false;
}
@@ -1080,10 +1166,13 @@ where
})
.collect();
unaggregated_attestations
.into_iter()
.zip(aggregated_attestations)
.collect()
(
unaggregated_attestations
.into_iter()
.zip(aggregated_attestations)
.collect(),
attesters,
)
}
pub fn make_sync_contributions(
@@ -1736,6 +1825,12 @@ where
self.chain.slot_clock.advance_slot();
}
/// Advance the clock to `lookahead` before the start of `slot`.
pub fn advance_to_slot_lookahead(&self, slot: Slot, lookahead: Duration) {
let time = self.chain.slot_clock.start_of(slot).unwrap() - lookahead;
self.chain.slot_clock.set_current_time(time);
}
/// Deprecated: Use make_block() instead
///
/// Returns a newly created block, signed by the proposer for the given slot.

View File

@@ -53,6 +53,7 @@ async fn merge_with_terminal_block_hash_override() {
let harness = BeaconChainHarness::builder(E::default())
.spec(spec)
.logger(logging::test_logger())
.deterministic_keypairs(VALIDATOR_COUNT)
.fresh_ephemeral_store()
.mock_execution_layer()
@@ -109,6 +110,7 @@ async fn base_altair_merge_with_terminal_block_after_fork() {
let harness = BeaconChainHarness::builder(E::default())
.spec(spec)
.logger(logging::test_logger())
.deterministic_keypairs(VALIDATOR_COUNT)
.fresh_ephemeral_store()
.mock_execution_layer()

View File

@@ -7,8 +7,9 @@ use beacon_chain::otb_verification_service::{
use beacon_chain::{
canonical_head::{CachedHead, CanonicalHead},
test_utils::{BeaconChainHarness, EphemeralHarnessType},
BeaconChainError, BlockError, ExecutionPayloadError, NotifyExecutionLayer, StateSkipConfig,
WhenSlotSkipped, INVALID_FINALIZED_MERGE_TRANSITION_BLOCK_SHUTDOWN_REASON,
BeaconChainError, BlockError, ExecutionPayloadError, NotifyExecutionLayer,
OverrideForkchoiceUpdate, StateSkipConfig, WhenSlotSkipped,
INVALID_FINALIZED_MERGE_TRANSITION_BLOCK_SHUTDOWN_REASON,
INVALID_JUSTIFIED_PAYLOAD_SHUTDOWN_REASON,
};
use execution_layer::{
@@ -19,6 +20,7 @@ use execution_layer::{
use fork_choice::{
CountUnrealized, Error as ForkChoiceError, InvalidationOperation, PayloadVerificationStatus,
};
use logging::test_logger;
use proto_array::{Error as ProtoArrayError, ExecutionStatus};
use slot_clock::SlotClock;
use std::collections::HashMap;
@@ -59,6 +61,7 @@ impl InvalidPayloadRig {
let harness = BeaconChainHarness::builder(MainnetEthSpec)
.spec(spec)
.logger(test_logger())
.deterministic_keypairs(VALIDATOR_COUNT)
.mock_execution_layer()
.fresh_ephemeral_store()
@@ -383,7 +386,7 @@ impl InvalidPayloadRig {
.fork_choice_write_lock()
.get_head(self.harness.chain.slot().unwrap(), &self.harness.chain.spec)
{
Err(ForkChoiceError::ProtoArrayError(e)) if e.contains(s) => (),
Err(ForkChoiceError::ProtoArrayStringError(e)) if e.contains(s) => (),
other => panic!("expected {} error, got {:?}", s, other),
};
}
@@ -978,6 +981,10 @@ async fn payload_preparation() {
)
.await;
rig.harness.advance_to_slot_lookahead(
next_slot,
rig.harness.chain.config.prepare_payload_lookahead,
);
rig.harness
.chain
.prepare_beacon_proposer(rig.harness.chain.slot().unwrap())
@@ -1054,7 +1061,7 @@ async fn invalid_parent() {
&rig.harness.chain.spec,
CountUnrealized::True,
),
Err(ForkChoiceError::ProtoArrayError(message))
Err(ForkChoiceError::ProtoArrayStringError(message))
if message.contains(&format!(
"{:?}",
ProtoArrayError::ParentExecutionStatusIsInvalid {
@@ -1121,7 +1128,11 @@ async fn payload_preparation_before_transition_block() {
.get_forkchoice_update_parameters();
rig.harness
.chain
.update_execution_engine_forkchoice(current_slot, forkchoice_update_params)
.update_execution_engine_forkchoice(
current_slot,
forkchoice_update_params,
OverrideForkchoiceUpdate::Yes,
)
.await
.unwrap();