Prioritise important parts of block processing (#3696)

## Issue Addressed

Closes https://github.com/sigp/lighthouse/issues/2327

## Proposed Changes

This is an extension of some ideas I implemented while working on `tree-states`:

- Cache the indexed attestations from blocks in the `ConsensusContext`. Previously we were re-computing them 3-4 times over.
- Clean up `import_block` by splitting each part into `import_block_XXX`.
- Move some stuff off hot paths, specifically:
  - Relocate non-essential tasks that were running between receiving the payload verification status and priming the early attester cache. These tasks are moved after the cache priming:
    - Attestation observation
    - Validator monitor updates
    - Slasher updates
    - Updating the shuffling cache
  - Fork choice attestation observation now happens at the end of block verification in parallel with payload verification (this seems to save 5-10ms).
  - Payload verification now happens _before_ advancing the pre-state and writing it to disk! States were previously being written eagerly and adding ~20-30ms in front of verifying the execution payload. State catchup also sometimes takes ~500ms if we get a cache miss and need to rebuild the tree hash cache.

The remaining task that's taking substantial time (~20ms) is importing the block to fork choice. I _think_ this is because of pull-tips, and we should be able to optimise it out with a clever total active balance cache in the state (which would be computed in parallel with payload verification). I've decided to leave that for future work though. For now it can be observed via the new `beacon_block_processing_post_exec_pre_attestable_seconds` metric.


Co-authored-by: Michael Sproul <micsproul@gmail.com>
This commit is contained in:
Michael Sproul
2022-11-30 05:22:58 +00:00
parent b4f4c0d253
commit 22115049ee
14 changed files with 784 additions and 497 deletions

View File

@@ -73,7 +73,7 @@ use slog::{crit, debug, error, info, trace, warn, Logger};
use slot_clock::SlotClock;
use ssz::Encode;
use state_processing::{
common::{get_attesting_indices_from_state, get_indexed_attestation},
common::get_attesting_indices_from_state,
per_block_processing,
per_block_processing::{
errors::AttestationValidationError, verify_attestation_for_block_inclusion,
@@ -2587,6 +2587,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
confirmed_state_roots,
payload_verification_handle,
parent_eth1_finalization_data,
consensus_context,
} = execution_pending_block;
let PayloadVerificationOutcome {
@@ -2640,6 +2641,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
count_unrealized,
parent_block,
parent_eth1_finalization_data,
consensus_context,
)
},
"payload_verification_handle",
@@ -2665,70 +2667,36 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
count_unrealized: CountUnrealized,
parent_block: SignedBlindedBeaconBlock<T::EthSpec>,
parent_eth1_finalization_data: Eth1FinalizationData,
mut consensus_context: ConsensusContext<T::EthSpec>,
) -> Result<Hash256, BlockError<T::EthSpec>> {
// ----------------------------- BLOCK NOT YET ATTESTABLE ----------------------------------
// Everything in this initial section is on the hot path between processing the block and
// being able to attest to it. DO NOT add any extra processing in this initial section
// unless it must run before fork choice.
// -----------------------------------------------------------------------------------------
let current_slot = self.slot()?;
let current_epoch = current_slot.epoch(T::EthSpec::slots_per_epoch());
let block = signed_block.message();
let post_exec_timer = metrics::start_timer(&metrics::BLOCK_PROCESSING_POST_EXEC_PROCESSING);
let attestation_observation_timer =
metrics::start_timer(&metrics::BLOCK_PROCESSING_ATTESTATION_OBSERVATION);
// Iterate through the attestations in the block and register them as an "observed
// attestation". This will stop us from propagating them on the gossip network.
for a in signed_block.message().body().attestations() {
match self.observed_attestations.write().observe_item(a, None) {
// If the observation was successful or if the slot for the attestation was too
// low, continue.
//
// We ignore `SlotTooLow` since this will be very common whilst syncing.
Ok(_) | Err(AttestationObservationError::SlotTooLow { .. }) => {}
Err(e) => return Err(BlockError::BeaconChainError(e.into())),
}
}
metrics::stop_timer(attestation_observation_timer);
// If a slasher is configured, provide the attestations from the block.
if let Some(slasher) = self.slasher.as_ref() {
for attestation in signed_block.message().body().attestations() {
let committee =
state.get_beacon_committee(attestation.data.slot, attestation.data.index)?;
let indexed_attestation = get_indexed_attestation(committee.committee, attestation)
.map_err(|e| BlockError::BeaconChainError(e.into()))?;
slasher.accept_attestation(indexed_attestation);
}
}
// Check against weak subjectivity checkpoint.
self.check_block_against_weak_subjectivity_checkpoint(block, block_root, &state)?;
// If there are new validators in this block, update our pubkey cache.
//
// We perform this _before_ adding the block to fork choice because the pubkey cache is
// used by attestation processing which will only process an attestation if the block is
// known to fork choice. This ordering ensure that the pubkey cache is always up-to-date.
self.validator_pubkey_cache
// The only keys imported here will be ones for validators deposited in this block, because
// the cache *must* already have been updated for the parent block when it was imported.
// Newly deposited validators are not active and their keys are not required by other parts
// of block processing. The reason we do this here and not after making the block attestable
// is so we don't have to think about lock ordering with respect to the fork choice lock.
// There are a bunch of places where we lock both fork choice and the pubkey cache and it
// would be difficult to check that they all lock fork choice first.
let mut kv_store_ops = self
.validator_pubkey_cache
.try_write_for(VALIDATOR_PUBKEY_CACHE_LOCK_TIMEOUT)
.ok_or(Error::ValidatorPubkeyCacheLockTimeout)?
.import_new_pubkeys(&state)?;
// For the current and next epoch of this state, ensure we have the shuffling from this
// block in our cache.
for relative_epoch in &[RelativeEpoch::Current, RelativeEpoch::Next] {
let shuffling_id = AttestationShufflingId::new(block_root, &state, *relative_epoch)?;
let shuffling_is_cached = self
.shuffling_cache
.try_read_for(ATTESTATION_CACHE_LOCK_TIMEOUT)
.ok_or(Error::AttestationCacheLockTimeout)?
.contains(&shuffling_id);
if !shuffling_is_cached {
state.build_committee_cache(*relative_epoch, &self.spec)?;
let committee_cache = state.committee_cache(*relative_epoch)?;
self.shuffling_cache
.try_write_for(ATTESTATION_CACHE_LOCK_TIMEOUT)
.ok_or(Error::AttestationCacheLockTimeout)?
.insert_committee_cache(shuffling_id, committee_cache);
}
}
// Apply the state to the attester cache, only if it is from the previous epoch or later.
//
// In a perfect scenario there should be no need to add previous-epoch states to the cache.
@@ -2740,52 +2708,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
.map_err(BeaconChainError::from)?;
}
// Alias for readability.
let block = signed_block.message();
// Only perform the weak subjectivity check if it was configured.
if let Some(wss_checkpoint) = self.config.weak_subjectivity_checkpoint {
// Note: we're using the finalized checkpoint from the head state, rather than fork
// choice.
//
// We are doing this to ensure that we detect changes in finalization. It's possible
// that fork choice has already been updated to the finalized checkpoint in the block
// we're importing.
let current_head_finalized_checkpoint =
self.canonical_head.cached_head().finalized_checkpoint();
// Compare the existing finalized checkpoint with the incoming block's finalized checkpoint.
let new_finalized_checkpoint = state.finalized_checkpoint();
// This ensures we only perform the check once.
if (current_head_finalized_checkpoint.epoch < wss_checkpoint.epoch)
&& (wss_checkpoint.epoch <= new_finalized_checkpoint.epoch)
{
if let Err(e) =
self.verify_weak_subjectivity_checkpoint(wss_checkpoint, block_root, &state)
{
let mut shutdown_sender = self.shutdown_sender();
crit!(
self.log,
"Weak subjectivity checkpoint verification failed while importing block!";
"block_root" => ?block_root,
"parent_root" => ?block.parent_root(),
"old_finalized_epoch" => ?current_head_finalized_checkpoint.epoch,
"new_finalized_epoch" => ?new_finalized_checkpoint.epoch,
"weak_subjectivity_epoch" => ?wss_checkpoint.epoch,
"error" => ?e,
);
crit!(self.log, "You must use the `--purge-db` flag to clear the database and restart sync. You may be on a hostile network.");
shutdown_sender
.try_send(ShutdownReason::Failure(
"Weak subjectivity checkpoint verification failed. Provided block root is not a checkpoint."
))
.map_err(|err| BlockError::BeaconChainError(BeaconChainError::WeakSubjectivtyShutdownError(err)))?;
return Err(BlockError::WeakSubjectivityConflict);
}
}
}
// Take an exclusive write-lock on fork choice. It's very important prevent deadlocks by
// Take an exclusive write-lock on fork choice. It's very important to prevent deadlocks by
// avoiding taking other locks whilst holding this lock.
let mut fork_choice = self.canonical_head.fork_choice_write_lock();
@@ -2815,77 +2738,6 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
.map_err(|e| BlockError::BeaconChainError(e.into()))?;
}
// Allow the validator monitor to learn about a new valid state.
self.validator_monitor
.write()
.process_valid_state(current_slot.epoch(T::EthSpec::slots_per_epoch()), &state);
let validator_monitor = self.validator_monitor.read();
// Register each attester slashing in the block with fork choice.
for attester_slashing in block.body().attester_slashings() {
fork_choice.on_attester_slashing(attester_slashing);
}
// Register each attestation in the block with the fork choice service.
for attestation in block.body().attestations() {
let _fork_choice_attestation_timer =
metrics::start_timer(&metrics::FORK_CHOICE_PROCESS_ATTESTATION_TIMES);
let attestation_target_epoch = attestation.data.target.epoch;
let committee =
state.get_beacon_committee(attestation.data.slot, attestation.data.index)?;
let indexed_attestation = get_indexed_attestation(committee.committee, attestation)
.map_err(|e| BlockError::BeaconChainError(e.into()))?;
match fork_choice.on_attestation(
current_slot,
&indexed_attestation,
AttestationFromBlock::True,
&self.spec,
) {
Ok(()) => Ok(()),
// Ignore invalid attestations whilst importing attestations from a block. The
// block might be very old and therefore the attestations useless to fork choice.
Err(ForkChoiceError::InvalidAttestation(_)) => Ok(()),
Err(e) => Err(BlockError::BeaconChainError(e.into())),
}?;
// To avoid slowing down sync, only register attestations for the
// `observed_block_attesters` if they are from the previous epoch or later.
if attestation_target_epoch + 1 >= current_epoch {
let mut observed_block_attesters = self.observed_block_attesters.write();
for &validator_index in &indexed_attestation.attesting_indices {
if let Err(e) = observed_block_attesters
.observe_validator(attestation_target_epoch, validator_index as usize)
{
debug!(
self.log,
"Failed to register observed block attester";
"error" => ?e,
"epoch" => attestation_target_epoch,
"validator_index" => validator_index,
)
}
}
}
// Only register this with the validator monitor when the block is sufficiently close to
// the current slot.
if VALIDATOR_MONITOR_HISTORIC_EPOCHS as u64 * T::EthSpec::slots_per_epoch()
+ block.slot().as_u64()
>= current_slot.as_u64()
{
match fork_choice.get_block(&block.parent_root()) {
Some(parent_block) => validator_monitor.register_attestation_in_block(
&indexed_attestation,
parent_block.slot,
&self.spec,
),
None => warn!(self.log, "Failed to get parent block"; "slot" => %block.slot()),
}
}
}
// If the block is recent enough and it was not optimistically imported, check to see if it
// becomes the head block. If so, apply it to the early attester cache. This will allow
// attestations to the block without waiting for the block and state to be inserted to the
@@ -2934,56 +2786,28 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
),
}
}
drop(post_exec_timer);
// Register sync aggregate with validator monitor
if let Ok(sync_aggregate) = block.body().sync_aggregate() {
// `SyncCommittee` for the sync_aggregate should correspond to the duty slot
let duty_epoch = block.slot().epoch(T::EthSpec::slots_per_epoch());
let sync_committee = self.sync_committee_at_epoch(duty_epoch)?;
let participant_pubkeys = sync_committee
.pubkeys
.iter()
.zip(sync_aggregate.sync_committee_bits.iter())
.filter_map(|(pubkey, bit)| bit.then_some(pubkey))
.collect::<Vec<_>>();
// ---------------------------- BLOCK PROBABLY ATTESTABLE ----------------------------------
// Most blocks are now capable of being attested to thanks to the `early_attester_cache`
// cache above. Resume non-essential processing.
// -----------------------------------------------------------------------------------------
validator_monitor.register_sync_aggregate_in_block(
block.slot(),
block.parent_root(),
participant_pubkeys,
);
}
for exit in block.body().voluntary_exits() {
validator_monitor.register_block_voluntary_exit(&exit.message)
}
for slashing in block.body().attester_slashings() {
validator_monitor.register_block_attester_slashing(slashing)
}
for slashing in block.body().proposer_slashings() {
validator_monitor.register_block_proposer_slashing(slashing)
}
drop(validator_monitor);
// Only present some metrics for blocks from the previous epoch or later.
//
// This helps avoid noise in the metrics during sync.
if block.slot().epoch(T::EthSpec::slots_per_epoch()) + 1 >= self.epoch()? {
metrics::observe(
&metrics::OPERATIONS_PER_BLOCK_ATTESTATION,
block.body().attestations().len() as f64,
);
if let Ok(sync_aggregate) = block.body().sync_aggregate() {
metrics::set_gauge(
&metrics::BLOCK_SYNC_AGGREGATE_SET_BITS,
sync_aggregate.num_set_bits() as i64,
);
}
}
self.import_block_update_shuffling_cache(block_root, &mut state)?;
self.import_block_observe_attestations(
block,
&state,
&mut consensus_context,
current_epoch,
);
self.import_block_update_validator_monitor(
block,
&state,
&mut consensus_context,
current_slot,
parent_block.slot(),
);
self.import_block_update_slasher(block, &state, &mut consensus_context);
let db_write_timer = metrics::start_timer(&metrics::BLOCK_PROCESSING_DB_WRITE);
@@ -3000,7 +2824,9 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
ops.push(StoreOp::PutState(block.state_root(), &state));
let txn_lock = self.store.hot_db.begin_rw_transaction();
if let Err(e) = self.store.do_atomically(ops) {
kv_store_ops.extend(self.store.convert_to_kv_batch(ops)?);
if let Err(e) = self.store.hot_db.do_atomically(kv_store_ops) {
error!(
self.log,
"Database write failed!";
@@ -3008,6 +2834,10 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
"error" => ?e,
);
// Clear the early attester cache to prevent attestations which we would later be unable
// to verify due to the failure.
self.early_attester_cache.clear();
// Since the write failed, try to revert the canonical head back to what was stored
// in the database. This attempts to prevent inconsistency between the database and
// fork choice.
@@ -3050,6 +2880,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
eth1_deposit_index: state.eth1_deposit_index(),
};
let current_finalized_checkpoint = state.finalized_checkpoint();
self.snapshot_cache
.try_write_for(BLOCK_PROCESSING_CACHE_LOCK_TIMEOUT)
.ok_or(Error::SnapshotCacheLockTimeout)
@@ -3057,7 +2888,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
snapshot_cache.insert(
BeaconSnapshot {
beacon_state: state,
beacon_block: signed_block,
beacon_block: signed_block.clone(),
beacon_block_root: block_root,
},
None,
@@ -3076,22 +2907,312 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
self.head_tracker
.register_block(block_root, parent_root, slot);
// Send an event to the `events` endpoint after fully processing the block.
if let Some(event_handler) = self.event_handler.as_ref() {
if event_handler.has_block_subscribers() {
event_handler.register(EventKind::Block(SseBlock {
slot,
block: block_root,
execution_optimistic: payload_verification_status.is_optimistic(),
}));
}
}
metrics::stop_timer(db_write_timer);
metrics::inc_counter(&metrics::BLOCK_PROCESSING_SUCCESSES);
let block_delay_total = get_slot_delay_ms(block_time_imported, slot, &self.slot_clock);
// Update the deposit contract cache.
self.import_block_update_deposit_contract_finalization(
block,
block_root,
current_epoch,
current_finalized_checkpoint,
current_eth1_finalization_data,
parent_eth1_finalization_data,
parent_block.slot(),
);
// Inform the unknown block cache, in case it was waiting on this block.
self.pre_finalization_block_cache
.block_processed(block_root);
self.import_block_update_metrics_and_events(
block,
block_root,
block_time_imported,
payload_verification_status,
current_slot,
);
Ok(block_root)
}
/// Check block's consistentency with any configured weak subjectivity checkpoint.
fn check_block_against_weak_subjectivity_checkpoint(
&self,
block: BeaconBlockRef<T::EthSpec>,
block_root: Hash256,
state: &BeaconState<T::EthSpec>,
) -> Result<(), BlockError<T::EthSpec>> {
// Only perform the weak subjectivity check if it was configured.
let wss_checkpoint = if let Some(checkpoint) = self.config.weak_subjectivity_checkpoint {
checkpoint
} else {
return Ok(());
};
// Note: we're using the finalized checkpoint from the head state, rather than fork
// choice.
//
// We are doing this to ensure that we detect changes in finalization. It's possible
// that fork choice has already been updated to the finalized checkpoint in the block
// we're importing.
let current_head_finalized_checkpoint =
self.canonical_head.cached_head().finalized_checkpoint();
// Compare the existing finalized checkpoint with the incoming block's finalized checkpoint.
let new_finalized_checkpoint = state.finalized_checkpoint();
// This ensures we only perform the check once.
if current_head_finalized_checkpoint.epoch < wss_checkpoint.epoch
&& wss_checkpoint.epoch <= new_finalized_checkpoint.epoch
{
if let Err(e) =
self.verify_weak_subjectivity_checkpoint(wss_checkpoint, block_root, state)
{
let mut shutdown_sender = self.shutdown_sender();
crit!(
self.log,
"Weak subjectivity checkpoint verification failed while importing block!";
"block_root" => ?block_root,
"parent_root" => ?block.parent_root(),
"old_finalized_epoch" => ?current_head_finalized_checkpoint.epoch,
"new_finalized_epoch" => ?new_finalized_checkpoint.epoch,
"weak_subjectivity_epoch" => ?wss_checkpoint.epoch,
"error" => ?e
);
crit!(
self.log,
"You must use the `--purge-db` flag to clear the database and restart sync. \
You may be on a hostile network."
);
shutdown_sender
.try_send(ShutdownReason::Failure(
"Weak subjectivity checkpoint verification failed. \
Provided block root is not a checkpoint.",
))
.map_err(|err| {
BlockError::BeaconChainError(
BeaconChainError::WeakSubjectivtyShutdownError(err),
)
})?;
return Err(BlockError::WeakSubjectivityConflict);
}
}
Ok(())
}
/// Process a block for the validator monitor, including all its constituent messages.
fn import_block_update_validator_monitor(
&self,
block: BeaconBlockRef<T::EthSpec>,
state: &BeaconState<T::EthSpec>,
ctxt: &mut ConsensusContext<T::EthSpec>,
current_slot: Slot,
parent_block_slot: Slot,
) {
// Only register blocks with the validator monitor when the block is sufficiently close to
// the current slot.
if VALIDATOR_MONITOR_HISTORIC_EPOCHS as u64 * T::EthSpec::slots_per_epoch()
+ block.slot().as_u64()
< current_slot.as_u64()
{
return;
}
// Allow the validator monitor to learn about a new valid state.
self.validator_monitor
.write()
.process_valid_state(current_slot.epoch(T::EthSpec::slots_per_epoch()), state);
let validator_monitor = self.validator_monitor.read();
// Sync aggregate.
if let Ok(sync_aggregate) = block.body().sync_aggregate() {
// `SyncCommittee` for the sync_aggregate should correspond to the duty slot
let duty_epoch = block.slot().epoch(T::EthSpec::slots_per_epoch());
match self.sync_committee_at_epoch(duty_epoch) {
Ok(sync_committee) => {
let participant_pubkeys = sync_committee
.pubkeys
.iter()
.zip(sync_aggregate.sync_committee_bits.iter())
.filter_map(|(pubkey, bit)| bit.then_some(pubkey))
.collect::<Vec<_>>();
validator_monitor.register_sync_aggregate_in_block(
block.slot(),
block.parent_root(),
participant_pubkeys,
);
}
Err(e) => {
warn!(
self.log,
"Unable to fetch sync committee";
"epoch" => duty_epoch,
"purpose" => "validator monitor",
"error" => ?e,
);
}
}
}
// Attestations.
for attestation in block.body().attestations() {
let indexed_attestation = match ctxt.get_indexed_attestation(state, attestation) {
Ok(indexed) => indexed,
Err(e) => {
debug!(
self.log,
"Failed to get indexed attestation";
"purpose" => "validator monitor",
"attestation_slot" => attestation.data.slot,
"error" => ?e,
);
continue;
}
};
validator_monitor.register_attestation_in_block(
indexed_attestation,
parent_block_slot,
&self.spec,
);
}
for exit in block.body().voluntary_exits() {
validator_monitor.register_block_voluntary_exit(&exit.message)
}
for slashing in block.body().attester_slashings() {
validator_monitor.register_block_attester_slashing(slashing)
}
for slashing in block.body().proposer_slashings() {
validator_monitor.register_block_proposer_slashing(slashing)
}
}
/// Iterate through the attestations in the block and register them as "observed".
///
/// This will stop us from propagating them on the gossip network.
fn import_block_observe_attestations(
&self,
block: BeaconBlockRef<T::EthSpec>,
state: &BeaconState<T::EthSpec>,
ctxt: &mut ConsensusContext<T::EthSpec>,
current_epoch: Epoch,
) {
// To avoid slowing down sync, only observe attestations if the block is from the
// previous epoch or later.
if state.current_epoch() + 1 < current_epoch {
return;
}
let _timer = metrics::start_timer(&metrics::BLOCK_PROCESSING_ATTESTATION_OBSERVATION);
for a in block.body().attestations() {
match self.observed_attestations.write().observe_item(a, None) {
// If the observation was successful or if the slot for the attestation was too
// low, continue.
//
// We ignore `SlotTooLow` since this will be very common whilst syncing.
Ok(_) | Err(AttestationObservationError::SlotTooLow { .. }) => {}
Err(e) => {
debug!(
self.log,
"Failed to register observed attestation";
"error" => ?e,
"epoch" => a.data.target.epoch
);
}
}
let indexed_attestation = match ctxt.get_indexed_attestation(state, a) {
Ok(indexed) => indexed,
Err(e) => {
debug!(
self.log,
"Failed to get indexed attestation";
"purpose" => "observation",
"attestation_slot" => a.data.slot,
"error" => ?e,
);
continue;
}
};
let mut observed_block_attesters = self.observed_block_attesters.write();
for &validator_index in &indexed_attestation.attesting_indices {
if let Err(e) = observed_block_attesters
.observe_validator(a.data.target.epoch, validator_index as usize)
{
debug!(
self.log,
"Failed to register observed block attester";
"error" => ?e,
"epoch" => a.data.target.epoch,
"validator_index" => validator_index,
)
}
}
}
}
/// If a slasher is configured, provide the attestations from the block.
fn import_block_update_slasher(
&self,
block: BeaconBlockRef<T::EthSpec>,
state: &BeaconState<T::EthSpec>,
ctxt: &mut ConsensusContext<T::EthSpec>,
) {
if let Some(slasher) = self.slasher.as_ref() {
for attestation in block.body().attestations() {
let indexed_attestation = match ctxt.get_indexed_attestation(state, attestation) {
Ok(indexed) => indexed,
Err(e) => {
debug!(
self.log,
"Failed to get indexed attestation";
"purpose" => "slasher",
"attestation_slot" => attestation.data.slot,
"error" => ?e,
);
continue;
}
};
slasher.accept_attestation(indexed_attestation.clone());
}
}
}
fn import_block_update_metrics_and_events(
&self,
block: BeaconBlockRef<T::EthSpec>,
block_root: Hash256,
block_time_imported: Duration,
payload_verification_status: PayloadVerificationStatus,
current_slot: Slot,
) {
// Only present some metrics for blocks from the previous epoch or later.
//
// This helps avoid noise in the metrics during sync.
if block.slot() + 2 * T::EthSpec::slots_per_epoch() >= current_slot {
metrics::observe(
&metrics::OPERATIONS_PER_BLOCK_ATTESTATION,
block.body().attestations().len() as f64,
);
if let Ok(sync_aggregate) = block.body().sync_aggregate() {
metrics::set_gauge(
&metrics::BLOCK_SYNC_AGGREGATE_SET_BITS,
sync_aggregate.num_set_bits() as i64,
);
}
}
let block_delay_total =
get_slot_delay_ms(block_time_imported, block.slot(), &self.slot_clock);
// Do not write to the cache for blocks older than 2 epochs, this helps reduce writes to
// the cache during sync.
@@ -3123,62 +3244,105 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
);
}
// Do not write to eth1 finalization cache for blocks older than 5 epochs
// this helps reduce noise during sync
if block_delay_total
< self.slot_clock.slot_duration() * 5 * (T::EthSpec::slots_per_epoch() as u32)
{
let parent_block_epoch = parent_block.slot().epoch(T::EthSpec::slots_per_epoch());
if parent_block_epoch < current_epoch {
// we've crossed epoch boundary, store Eth1FinalizationData
let (checkpoint, eth1_finalization_data) =
if current_slot % T::EthSpec::slots_per_epoch() == 0 {
// current block is the checkpoint
(
Checkpoint {
epoch: current_epoch,
root: block_root,
},
current_eth1_finalization_data,
)
} else {
// parent block is the checkpoint
(
Checkpoint {
epoch: current_epoch,
root: parent_block.canonical_root(),
},
parent_eth1_finalization_data,
)
};
if let Some(event_handler) = self.event_handler.as_ref() {
if event_handler.has_block_subscribers() {
event_handler.register(EventKind::Block(SseBlock {
slot: block.slot(),
block: block_root,
execution_optimistic: payload_verification_status.is_optimistic(),
}));
}
}
}
if let Some(finalized_eth1_data) = self
.eth1_finalization_cache
.try_write_for(ETH1_FINALIZATION_CACHE_LOCK_TIMEOUT)
.and_then(|mut cache| {
cache.insert(checkpoint, eth1_finalization_data);
cache.finalize(&current_finalized_checkpoint)
})
{
if let Some(eth1_chain) = self.eth1_chain.as_ref() {
let finalized_deposit_count = finalized_eth1_data.deposit_count;
eth1_chain.finalize_eth1_data(finalized_eth1_data);
debug!(
self.log,
"called eth1_chain.finalize_eth1_data()";
"epoch" => current_finalized_checkpoint.epoch,
"deposit count" => finalized_deposit_count,
);
}
fn import_block_update_shuffling_cache(
&self,
block_root: Hash256,
state: &mut BeaconState<T::EthSpec>,
) -> Result<(), BlockError<T::EthSpec>> {
// For the current and next epoch of this state, ensure we have the shuffling from this
// block in our cache.
for relative_epoch in [RelativeEpoch::Current, RelativeEpoch::Next] {
let shuffling_id = AttestationShufflingId::new(block_root, state, relative_epoch)?;
let shuffling_is_cached = self
.shuffling_cache
.try_read_for(ATTESTATION_CACHE_LOCK_TIMEOUT)
.ok_or(Error::AttestationCacheLockTimeout)?
.contains(&shuffling_id);
if !shuffling_is_cached {
state.build_committee_cache(relative_epoch, &self.spec)?;
let committee_cache = state.committee_cache(relative_epoch)?;
self.shuffling_cache
.try_write_for(ATTESTATION_CACHE_LOCK_TIMEOUT)
.ok_or(Error::AttestationCacheLockTimeout)?
.insert_committee_cache(shuffling_id, committee_cache);
}
}
Ok(())
}
#[allow(clippy::too_many_arguments)]
fn import_block_update_deposit_contract_finalization(
&self,
block: BeaconBlockRef<T::EthSpec>,
block_root: Hash256,
current_epoch: Epoch,
current_finalized_checkpoint: Checkpoint,
current_eth1_finalization_data: Eth1FinalizationData,
parent_eth1_finalization_data: Eth1FinalizationData,
parent_block_slot: Slot,
) {
// Do not write to eth1 finalization cache for blocks older than 5 epochs.
if block.slot().epoch(T::EthSpec::slots_per_epoch()) + 5 < current_epoch {
return;
}
let parent_block_epoch = parent_block_slot.epoch(T::EthSpec::slots_per_epoch());
if parent_block_epoch < current_epoch {
// we've crossed epoch boundary, store Eth1FinalizationData
let (checkpoint, eth1_finalization_data) =
if block.slot() % T::EthSpec::slots_per_epoch() == 0 {
// current block is the checkpoint
(
Checkpoint {
epoch: current_epoch,
root: block_root,
},
current_eth1_finalization_data,
)
} else {
// parent block is the checkpoint
(
Checkpoint {
epoch: current_epoch,
root: block.parent_root(),
},
parent_eth1_finalization_data,
)
};
if let Some(finalized_eth1_data) = self
.eth1_finalization_cache
.try_write_for(ETH1_FINALIZATION_CACHE_LOCK_TIMEOUT)
.and_then(|mut cache| {
cache.insert(checkpoint, eth1_finalization_data);
cache.finalize(&current_finalized_checkpoint)
})
{
if let Some(eth1_chain) = self.eth1_chain.as_ref() {
let finalized_deposit_count = finalized_eth1_data.deposit_count;
eth1_chain.finalize_eth1_data(finalized_eth1_data);
debug!(
self.log,
"called eth1_chain.finalize_eth1_data()";
"epoch" => current_finalized_checkpoint.epoch,
"deposit count" => finalized_deposit_count,
);
}
}
}
// Inform the unknown block cache, in case it was waiting on this block.
self.pre_finalization_block_cache
.block_processed(block_root);
Ok(block_root)
}
/// If configured, wait for the fork choice run at the start of the slot to complete.
@@ -3559,10 +3723,12 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
// This will be a lot slower but guards against bugs in block production and can be
// quickly rolled out without a release.
if self.config.paranoid_block_proposal {
let mut tmp_ctxt = ConsensusContext::new(state.slot());
attestations.retain(|att| {
verify_attestation_for_block_inclusion(
&state,
att,
&mut tmp_ctxt,
VerifySignatures::True,
&self.spec,
)