Merge in staging, update validator store/cache

Merge remote-tracking branch 'origin/staging' into tree-states
This commit is contained in:
Michael Sproul
2022-11-30 17:27:55 +11:00
15 changed files with 747 additions and 439 deletions

View File

@@ -72,7 +72,7 @@ use slog::{crit, debug, error, info, trace, warn, Logger};
use slot_clock::SlotClock; use slot_clock::SlotClock;
use ssz::Encode; use ssz::Encode;
use state_processing::{ use state_processing::{
common::{get_attesting_indices_from_state, get_indexed_attestation}, common::get_attesting_indices_from_state,
per_block_processing, per_block_processing,
per_block_processing::{ per_block_processing::{
errors::AttestationValidationError, verify_attestation_for_block_inclusion, errors::AttestationValidationError, verify_attestation_for_block_inclusion,
@@ -2516,6 +2516,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
confirmed_state_roots, confirmed_state_roots,
payload_verification_handle, payload_verification_handle,
parent_eth1_finalization_data, parent_eth1_finalization_data,
consensus_context,
} = execution_pending_block; } = execution_pending_block;
let PayloadVerificationOutcome { let PayloadVerificationOutcome {
@@ -2569,6 +2570,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
count_unrealized, count_unrealized,
parent_block, parent_block,
parent_eth1_finalization_data, parent_eth1_finalization_data,
consensus_context,
) )
}, },
"payload_verification_handle", "payload_verification_handle",
@@ -2594,68 +2596,34 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
count_unrealized: CountUnrealized, count_unrealized: CountUnrealized,
parent_block: SignedBlindedBeaconBlock<T::EthSpec>, parent_block: SignedBlindedBeaconBlock<T::EthSpec>,
parent_eth1_finalization_data: Eth1FinalizationData, parent_eth1_finalization_data: Eth1FinalizationData,
mut consensus_context: ConsensusContext<T::EthSpec>,
) -> Result<Hash256, BlockError<T::EthSpec>> { ) -> Result<Hash256, BlockError<T::EthSpec>> {
// ----------------------------- BLOCK NOT YET ATTESTABLE ----------------------------------
// Everything in this initial section is on the hot path between processing the block and
// being able to attest to it. DO NOT add any extra processing in this initial section
// unless it must run before fork choice.
// -----------------------------------------------------------------------------------------
let current_slot = self.slot()?; let current_slot = self.slot()?;
let current_epoch = current_slot.epoch(T::EthSpec::slots_per_epoch()); let current_epoch = current_slot.epoch(T::EthSpec::slots_per_epoch());
let block = signed_block.message();
let post_exec_timer = metrics::start_timer(&metrics::BLOCK_PROCESSING_POST_EXEC_PROCESSING);
let attestation_observation_timer = // Check against weak subjectivity checkpoint.
metrics::start_timer(&metrics::BLOCK_PROCESSING_ATTESTATION_OBSERVATION); self.check_block_against_weak_subjectivity_checkpoint(block, block_root, &state)?;
// Iterate through the attestations in the block and register them as an "observed
// attestation". This will stop us from propagating them on the gossip network.
for a in signed_block.message().body().attestations() {
match self.observed_attestations.write().observe_item(a, None) {
// If the observation was successful or if the slot for the attestation was too
// low, continue.
//
// We ignore `SlotTooLow` since this will be very common whilst syncing.
Ok(_) | Err(AttestationObservationError::SlotTooLow { .. }) => {}
Err(e) => return Err(BlockError::BeaconChainError(e.into())),
}
}
metrics::stop_timer(attestation_observation_timer);
// If a slasher is configured, provide the attestations from the block.
if let Some(slasher) = self.slasher.as_ref() {
for attestation in signed_block.message().body().attestations() {
let committee =
state.get_beacon_committee(attestation.data.slot, attestation.data.index)?;
let indexed_attestation = get_indexed_attestation(committee.committee, attestation)
.map_err(|e| BlockError::BeaconChainError(e.into()))?;
slasher.accept_attestation(indexed_attestation);
}
}
// If there are new validators in this block, update our pubkey cache. // If there are new validators in this block, update our pubkey cache.
// //
// We perform this _before_ adding the block to fork choice because the pubkey cache is // The only keys imported here will be ones for validators deposited in this block, because
// used by attestation processing which will only process an attestation if the block is // the cache *must* already have been updated for the parent block when it was imported.
// known to fork choice. This ordering ensure that the pubkey cache is always up-to-date. // Newly deposited validators are not active and their keys are not required by other parts
self.validator_pubkey_cache // of block processing. The reason we do this here and not after making the block attestable
// is so we don't have to think about lock ordering with respect to the fork choice lock.
// There are a bunch of places where we lock both fork choice and the pubkey cache and it
// would be difficult to check that they all lock fork choice first.
let mut kv_store_ops = self
.validator_pubkey_cache
.write() .write()
.import_new_pubkeys(&state, &self.store)?; .import_new_pubkeys(&state)?;
// For the current and next epoch of this state, ensure we have the shuffling from this
// block in our cache.
for relative_epoch in &[RelativeEpoch::Current, RelativeEpoch::Next] {
let shuffling_id = AttestationShufflingId::new(block_root, &state, *relative_epoch)?;
let shuffling_is_cached = self
.shuffling_cache
.try_read_for(ATTESTATION_CACHE_LOCK_TIMEOUT)
.ok_or(Error::AttestationCacheLockTimeout)?
.contains(&shuffling_id);
if !shuffling_is_cached {
state.build_committee_cache(*relative_epoch, &self.spec)?;
let committee_cache = state.committee_cache(*relative_epoch)?;
self.shuffling_cache
.try_write_for(ATTESTATION_CACHE_LOCK_TIMEOUT)
.ok_or(Error::AttestationCacheLockTimeout)?
.insert_committee_cache(shuffling_id, committee_cache);
}
}
// Apply the state to the attester cache, only if it is from the previous epoch or later. // Apply the state to the attester cache, only if it is from the previous epoch or later.
// //
@@ -2668,52 +2636,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
.map_err(BeaconChainError::from)?; .map_err(BeaconChainError::from)?;
} }
// Alias for readability. // Take an exclusive write-lock on fork choice. It's very important to prevent deadlocks by
let block = signed_block.message();
// Only perform the weak subjectivity check if it was configured.
if let Some(wss_checkpoint) = self.config.weak_subjectivity_checkpoint {
// Note: we're using the finalized checkpoint from the head state, rather than fork
// choice.
//
// We are doing this to ensure that we detect changes in finalization. It's possible
// that fork choice has already been updated to the finalized checkpoint in the block
// we're importing.
let current_head_finalized_checkpoint =
self.canonical_head.cached_head().finalized_checkpoint();
// Compare the existing finalized checkpoint with the incoming block's finalized checkpoint.
let new_finalized_checkpoint = state.finalized_checkpoint();
// This ensures we only perform the check once.
if (current_head_finalized_checkpoint.epoch < wss_checkpoint.epoch)
&& (wss_checkpoint.epoch <= new_finalized_checkpoint.epoch)
{
if let Err(e) =
self.verify_weak_subjectivity_checkpoint(wss_checkpoint, block_root, &state)
{
let mut shutdown_sender = self.shutdown_sender();
crit!(
self.log,
"Weak subjectivity checkpoint verification failed while importing block!";
"block_root" => ?block_root,
"parent_root" => ?block.parent_root(),
"old_finalized_epoch" => ?current_head_finalized_checkpoint.epoch,
"new_finalized_epoch" => ?new_finalized_checkpoint.epoch,
"weak_subjectivity_epoch" => ?wss_checkpoint.epoch,
"error" => ?e,
);
crit!(self.log, "You must use the `--purge-db` flag to clear the database and restart sync. You may be on a hostile network.");
shutdown_sender
.try_send(ShutdownReason::Failure(
"Weak subjectivity checkpoint verification failed. Provided block root is not a checkpoint."
))
.map_err(|err| BlockError::BeaconChainError(BeaconChainError::WeakSubjectivtyShutdownError(err)))?;
return Err(BlockError::WeakSubjectivityConflict);
}
}
}
// Take an exclusive write-lock on fork choice. It's very important prevent deadlocks by
// avoiding taking other locks whilst holding this lock. // avoiding taking other locks whilst holding this lock.
let mut fork_choice = self.canonical_head.fork_choice_write_lock(); let mut fork_choice = self.canonical_head.fork_choice_write_lock();
@@ -2743,77 +2666,6 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
.map_err(|e| BlockError::BeaconChainError(e.into()))?; .map_err(|e| BlockError::BeaconChainError(e.into()))?;
} }
// Allow the validator monitor to learn about a new valid state.
self.validator_monitor
.write()
.process_valid_state(current_slot.epoch(T::EthSpec::slots_per_epoch()), &state);
let validator_monitor = self.validator_monitor.read();
// Register each attester slashing in the block with fork choice.
for attester_slashing in block.body().attester_slashings() {
fork_choice.on_attester_slashing(attester_slashing);
}
// Register each attestation in the block with the fork choice service.
for attestation in block.body().attestations() {
let _fork_choice_attestation_timer =
metrics::start_timer(&metrics::FORK_CHOICE_PROCESS_ATTESTATION_TIMES);
let attestation_target_epoch = attestation.data.target.epoch;
let committee =
state.get_beacon_committee(attestation.data.slot, attestation.data.index)?;
let indexed_attestation = get_indexed_attestation(committee.committee, attestation)
.map_err(|e| BlockError::BeaconChainError(e.into()))?;
match fork_choice.on_attestation(
current_slot,
&indexed_attestation,
AttestationFromBlock::True,
&self.spec,
) {
Ok(()) => Ok(()),
// Ignore invalid attestations whilst importing attestations from a block. The
// block might be very old and therefore the attestations useless to fork choice.
Err(ForkChoiceError::InvalidAttestation(_)) => Ok(()),
Err(e) => Err(BlockError::BeaconChainError(e.into())),
}?;
// To avoid slowing down sync, only register attestations for the
// `observed_block_attesters` if they are from the previous epoch or later.
if attestation_target_epoch + 1 >= current_epoch {
let mut observed_block_attesters = self.observed_block_attesters.write();
for &validator_index in &indexed_attestation.attesting_indices {
if let Err(e) = observed_block_attesters
.observe_validator(attestation_target_epoch, validator_index as usize)
{
debug!(
self.log,
"Failed to register observed block attester";
"error" => ?e,
"epoch" => attestation_target_epoch,
"validator_index" => validator_index,
)
}
}
}
// Only register this with the validator monitor when the block is sufficiently close to
// the current slot.
if VALIDATOR_MONITOR_HISTORIC_EPOCHS as u64 * T::EthSpec::slots_per_epoch()
+ block.slot().as_u64()
>= current_slot.as_u64()
{
match fork_choice.get_block(&block.parent_root()) {
Some(parent_block) => validator_monitor.register_attestation_in_block(
&indexed_attestation,
parent_block.slot,
&self.spec,
),
None => warn!(self.log, "Failed to get parent block"; "slot" => %block.slot()),
}
}
}
// If the block is recent enough and it was not optimistically imported, check to see if it // If the block is recent enough and it was not optimistically imported, check to see if it
// becomes the head block. If so, apply it to the early attester cache. This will allow // becomes the head block. If so, apply it to the early attester cache. This will allow
// attestations to the block without waiting for the block and state to be inserted to the // attestations to the block without waiting for the block and state to be inserted to the
@@ -2869,56 +2721,28 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
), ),
} }
} }
drop(post_exec_timer);
// Register sync aggregate with validator monitor // ---------------------------- BLOCK PROBABLY ATTESTABLE ----------------------------------
if let Ok(sync_aggregate) = block.body().sync_aggregate() { // Most blocks are now capable of being attested to thanks to the `early_attester_cache`
// `SyncCommittee` for the sync_aggregate should correspond to the duty slot // cache above. Resume non-essential processing.
let duty_epoch = block.slot().epoch(T::EthSpec::slots_per_epoch()); // -----------------------------------------------------------------------------------------
let sync_committee = self.sync_committee_at_epoch(duty_epoch)?;
let participant_pubkeys = sync_committee
.pubkeys
.iter()
.zip(sync_aggregate.sync_committee_bits.iter())
.filter_map(|(pubkey, bit)| bit.then_some(pubkey))
.collect::<Vec<_>>();
validator_monitor.register_sync_aggregate_in_block( self.import_block_update_shuffling_cache(block_root, &mut state)?;
block.slot(), self.import_block_observe_attestations(
block.parent_root(), block,
participant_pubkeys, &state,
); &mut consensus_context,
} current_epoch,
);
for exit in block.body().voluntary_exits() { self.import_block_update_validator_monitor(
validator_monitor.register_block_voluntary_exit(&exit.message) block,
} &state,
&mut consensus_context,
for slashing in block.body().attester_slashings() { current_slot,
validator_monitor.register_block_attester_slashing(slashing) parent_block.slot(),
} );
self.import_block_update_slasher(block, &state, &mut consensus_context);
for slashing in block.body().proposer_slashings() {
validator_monitor.register_block_proposer_slashing(slashing)
}
drop(validator_monitor);
// Only present some metrics for blocks from the previous epoch or later.
//
// This helps avoid noise in the metrics during sync.
if block.slot().epoch(T::EthSpec::slots_per_epoch()) + 1 >= self.epoch()? {
metrics::observe(
&metrics::OPERATIONS_PER_BLOCK_ATTESTATION,
block.body().attestations().len() as f64,
);
if let Ok(sync_aggregate) = block.body().sync_aggregate() {
metrics::set_gauge(
&metrics::BLOCK_SYNC_AGGREGATE_SET_BITS,
sync_aggregate.num_set_bits() as i64,
);
}
}
let db_write_timer = metrics::start_timer(&metrics::BLOCK_PROCESSING_DB_WRITE); let db_write_timer = metrics::start_timer(&metrics::BLOCK_PROCESSING_DB_WRITE);
@@ -2935,7 +2759,9 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
ops.push(StoreOp::PutState(block.state_root(), &state)); ops.push(StoreOp::PutState(block.state_root(), &state));
let txn_lock = self.store.hot_db.begin_rw_transaction(); let txn_lock = self.store.hot_db.begin_rw_transaction();
if let Err(e) = self.store.do_atomically(ops) { kv_store_ops.extend(self.store.convert_to_kv_batch(ops)?);
if let Err(e) = self.store.hot_db.do_atomically(kv_store_ops) {
error!( error!(
self.log, self.log,
"Database write failed!"; "Database write failed!";
@@ -2943,6 +2769,10 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
"error" => ?e, "error" => ?e,
); );
// Clear the early attester cache to prevent attestations which we would later be unable
// to verify due to the failure.
self.early_attester_cache.clear();
// Since the write failed, try to revert the canonical head back to what was stored // Since the write failed, try to revert the canonical head back to what was stored
// in the database. This attempts to prevent inconsistency between the database and // in the database. This attempts to prevent inconsistency between the database and
// fork choice. // fork choice.
@@ -2989,22 +2819,312 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
self.head_tracker self.head_tracker
.register_block(block_root, parent_root, slot); .register_block(block_root, parent_root, slot);
// Send an event to the `events` endpoint after fully processing the block.
if let Some(event_handler) = self.event_handler.as_ref() {
if event_handler.has_block_subscribers() {
event_handler.register(EventKind::Block(SseBlock {
slot,
block: block_root,
execution_optimistic: payload_verification_status.is_optimistic(),
}));
}
}
metrics::stop_timer(db_write_timer); metrics::stop_timer(db_write_timer);
metrics::inc_counter(&metrics::BLOCK_PROCESSING_SUCCESSES); metrics::inc_counter(&metrics::BLOCK_PROCESSING_SUCCESSES);
let block_delay_total = get_slot_delay_ms(block_time_imported, slot, &self.slot_clock); // Update the deposit contract cache.
self.import_block_update_deposit_contract_finalization(
block,
block_root,
current_epoch,
current_finalized_checkpoint,
current_eth1_finalization_data,
parent_eth1_finalization_data,
parent_block.slot(),
);
// Inform the unknown block cache, in case it was waiting on this block.
self.pre_finalization_block_cache
.block_processed(block_root);
self.import_block_update_metrics_and_events(
block,
block_root,
block_time_imported,
payload_verification_status,
current_slot,
);
Ok(block_root)
}
/// Check block's consistentency with any configured weak subjectivity checkpoint.
fn check_block_against_weak_subjectivity_checkpoint(
&self,
block: BeaconBlockRef<T::EthSpec>,
block_root: Hash256,
state: &BeaconState<T::EthSpec>,
) -> Result<(), BlockError<T::EthSpec>> {
// Only perform the weak subjectivity check if it was configured.
let wss_checkpoint = if let Some(checkpoint) = self.config.weak_subjectivity_checkpoint {
checkpoint
} else {
return Ok(());
};
// Note: we're using the finalized checkpoint from the head state, rather than fork
// choice.
//
// We are doing this to ensure that we detect changes in finalization. It's possible
// that fork choice has already been updated to the finalized checkpoint in the block
// we're importing.
let current_head_finalized_checkpoint =
self.canonical_head.cached_head().finalized_checkpoint();
// Compare the existing finalized checkpoint with the incoming block's finalized checkpoint.
let new_finalized_checkpoint = state.finalized_checkpoint();
// This ensures we only perform the check once.
if current_head_finalized_checkpoint.epoch < wss_checkpoint.epoch
&& wss_checkpoint.epoch <= new_finalized_checkpoint.epoch
{
if let Err(e) =
self.verify_weak_subjectivity_checkpoint(wss_checkpoint, block_root, state)
{
let mut shutdown_sender = self.shutdown_sender();
crit!(
self.log,
"Weak subjectivity checkpoint verification failed while importing block!";
"block_root" => ?block_root,
"parent_root" => ?block.parent_root(),
"old_finalized_epoch" => ?current_head_finalized_checkpoint.epoch,
"new_finalized_epoch" => ?new_finalized_checkpoint.epoch,
"weak_subjectivity_epoch" => ?wss_checkpoint.epoch,
"error" => ?e
);
crit!(
self.log,
"You must use the `--purge-db` flag to clear the database and restart sync. \
You may be on a hostile network."
);
shutdown_sender
.try_send(ShutdownReason::Failure(
"Weak subjectivity checkpoint verification failed. \
Provided block root is not a checkpoint.",
))
.map_err(|err| {
BlockError::BeaconChainError(
BeaconChainError::WeakSubjectivtyShutdownError(err),
)
})?;
return Err(BlockError::WeakSubjectivityConflict);
}
}
Ok(())
}
/// Process a block for the validator monitor, including all its constituent messages.
fn import_block_update_validator_monitor(
&self,
block: BeaconBlockRef<T::EthSpec>,
state: &BeaconState<T::EthSpec>,
ctxt: &mut ConsensusContext<T::EthSpec>,
current_slot: Slot,
parent_block_slot: Slot,
) {
// Only register blocks with the validator monitor when the block is sufficiently close to
// the current slot.
if VALIDATOR_MONITOR_HISTORIC_EPOCHS as u64 * T::EthSpec::slots_per_epoch()
+ block.slot().as_u64()
< current_slot.as_u64()
{
return;
}
// Allow the validator monitor to learn about a new valid state.
self.validator_monitor
.write()
.process_valid_state(current_slot.epoch(T::EthSpec::slots_per_epoch()), state);
let validator_monitor = self.validator_monitor.read();
// Sync aggregate.
if let Ok(sync_aggregate) = block.body().sync_aggregate() {
// `SyncCommittee` for the sync_aggregate should correspond to the duty slot
let duty_epoch = block.slot().epoch(T::EthSpec::slots_per_epoch());
match self.sync_committee_at_epoch(duty_epoch) {
Ok(sync_committee) => {
let participant_pubkeys = sync_committee
.pubkeys
.iter()
.zip(sync_aggregate.sync_committee_bits.iter())
.filter_map(|(pubkey, bit)| bit.then_some(pubkey))
.collect::<Vec<_>>();
validator_monitor.register_sync_aggregate_in_block(
block.slot(),
block.parent_root(),
participant_pubkeys,
);
}
Err(e) => {
warn!(
self.log,
"Unable to fetch sync committee";
"epoch" => duty_epoch,
"purpose" => "validator monitor",
"error" => ?e,
);
}
}
}
// Attestations.
for attestation in block.body().attestations() {
let indexed_attestation = match ctxt.get_indexed_attestation(state, attestation) {
Ok(indexed) => indexed,
Err(e) => {
debug!(
self.log,
"Failed to get indexed attestation";
"purpose" => "validator monitor",
"attestation_slot" => attestation.data.slot,
"error" => ?e,
);
continue;
}
};
validator_monitor.register_attestation_in_block(
indexed_attestation,
parent_block_slot,
&self.spec,
);
}
for exit in block.body().voluntary_exits() {
validator_monitor.register_block_voluntary_exit(&exit.message)
}
for slashing in block.body().attester_slashings() {
validator_monitor.register_block_attester_slashing(slashing)
}
for slashing in block.body().proposer_slashings() {
validator_monitor.register_block_proposer_slashing(slashing)
}
}
/// Iterate through the attestations in the block and register them as "observed".
///
/// This will stop us from propagating them on the gossip network.
fn import_block_observe_attestations(
&self,
block: BeaconBlockRef<T::EthSpec>,
state: &BeaconState<T::EthSpec>,
ctxt: &mut ConsensusContext<T::EthSpec>,
current_epoch: Epoch,
) {
// To avoid slowing down sync, only observe attestations if the block is from the
// previous epoch or later.
if state.current_epoch() + 1 < current_epoch {
return;
}
let _timer = metrics::start_timer(&metrics::BLOCK_PROCESSING_ATTESTATION_OBSERVATION);
for a in block.body().attestations() {
match self.observed_attestations.write().observe_item(a, None) {
// If the observation was successful or if the slot for the attestation was too
// low, continue.
//
// We ignore `SlotTooLow` since this will be very common whilst syncing.
Ok(_) | Err(AttestationObservationError::SlotTooLow { .. }) => {}
Err(e) => {
debug!(
self.log,
"Failed to register observed attestation";
"error" => ?e,
"epoch" => a.data.target.epoch
);
}
}
let indexed_attestation = match ctxt.get_indexed_attestation(state, a) {
Ok(indexed) => indexed,
Err(e) => {
debug!(
self.log,
"Failed to get indexed attestation";
"purpose" => "observation",
"attestation_slot" => a.data.slot,
"error" => ?e,
);
continue;
}
};
let mut observed_block_attesters = self.observed_block_attesters.write();
for &validator_index in &indexed_attestation.attesting_indices {
if let Err(e) = observed_block_attesters
.observe_validator(a.data.target.epoch, validator_index as usize)
{
debug!(
self.log,
"Failed to register observed block attester";
"error" => ?e,
"epoch" => a.data.target.epoch,
"validator_index" => validator_index,
)
}
}
}
}
/// If a slasher is configured, provide the attestations from the block.
fn import_block_update_slasher(
&self,
block: BeaconBlockRef<T::EthSpec>,
state: &BeaconState<T::EthSpec>,
ctxt: &mut ConsensusContext<T::EthSpec>,
) {
if let Some(slasher) = self.slasher.as_ref() {
for attestation in block.body().attestations() {
let indexed_attestation = match ctxt.get_indexed_attestation(state, attestation) {
Ok(indexed) => indexed,
Err(e) => {
debug!(
self.log,
"Failed to get indexed attestation";
"purpose" => "slasher",
"attestation_slot" => attestation.data.slot,
"error" => ?e,
);
continue;
}
};
slasher.accept_attestation(indexed_attestation.clone());
}
}
}
fn import_block_update_metrics_and_events(
&self,
block: BeaconBlockRef<T::EthSpec>,
block_root: Hash256,
block_time_imported: Duration,
payload_verification_status: PayloadVerificationStatus,
current_slot: Slot,
) {
// Only present some metrics for blocks from the previous epoch or later.
//
// This helps avoid noise in the metrics during sync.
if block.slot() + 2 * T::EthSpec::slots_per_epoch() >= current_slot {
metrics::observe(
&metrics::OPERATIONS_PER_BLOCK_ATTESTATION,
block.body().attestations().len() as f64,
);
if let Ok(sync_aggregate) = block.body().sync_aggregate() {
metrics::set_gauge(
&metrics::BLOCK_SYNC_AGGREGATE_SET_BITS,
sync_aggregate.num_set_bits() as i64,
);
}
}
let block_delay_total =
get_slot_delay_ms(block_time_imported, block.slot(), &self.slot_clock);
// Do not write to the cache for blocks older than 2 epochs, this helps reduce writes to // Do not write to the cache for blocks older than 2 epochs, this helps reduce writes to
// the cache during sync. // the cache during sync.
@@ -3036,62 +3156,105 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
); );
} }
// Do not write to eth1 finalization cache for blocks older than 5 epochs if let Some(event_handler) = self.event_handler.as_ref() {
// this helps reduce noise during sync if event_handler.has_block_subscribers() {
if block_delay_total event_handler.register(EventKind::Block(SseBlock {
< self.slot_clock.slot_duration() * 5 * (T::EthSpec::slots_per_epoch() as u32) slot: block.slot(),
{ block: block_root,
let parent_block_epoch = parent_block.slot().epoch(T::EthSpec::slots_per_epoch()); execution_optimistic: payload_verification_status.is_optimistic(),
if parent_block_epoch < current_epoch { }));
// we've crossed epoch boundary, store Eth1FinalizationData }
let (checkpoint, eth1_finalization_data) = }
if current_slot % T::EthSpec::slots_per_epoch() == 0 { }
// current block is the checkpoint
(
Checkpoint {
epoch: current_epoch,
root: block_root,
},
current_eth1_finalization_data,
)
} else {
// parent block is the checkpoint
(
Checkpoint {
epoch: current_epoch,
root: parent_block.canonical_root(),
},
parent_eth1_finalization_data,
)
};
if let Some(finalized_eth1_data) = self fn import_block_update_shuffling_cache(
.eth1_finalization_cache &self,
.try_write_for(ETH1_FINALIZATION_CACHE_LOCK_TIMEOUT) block_root: Hash256,
.and_then(|mut cache| { state: &mut BeaconState<T::EthSpec>,
cache.insert(checkpoint, eth1_finalization_data); ) -> Result<(), BlockError<T::EthSpec>> {
cache.finalize(&current_finalized_checkpoint) // For the current and next epoch of this state, ensure we have the shuffling from this
}) // block in our cache.
{ for relative_epoch in [RelativeEpoch::Current, RelativeEpoch::Next] {
if let Some(eth1_chain) = self.eth1_chain.as_ref() { let shuffling_id = AttestationShufflingId::new(block_root, state, relative_epoch)?;
let finalized_deposit_count = finalized_eth1_data.deposit_count;
eth1_chain.finalize_eth1_data(finalized_eth1_data); let shuffling_is_cached = self
debug!( .shuffling_cache
self.log, .try_read_for(ATTESTATION_CACHE_LOCK_TIMEOUT)
"called eth1_chain.finalize_eth1_data()"; .ok_or(Error::AttestationCacheLockTimeout)?
"epoch" => current_finalized_checkpoint.epoch, .contains(&shuffling_id);
"deposit count" => finalized_deposit_count,
); if !shuffling_is_cached {
} state.build_committee_cache(relative_epoch, &self.spec)?;
let committee_cache = state.committee_cache(relative_epoch)?;
self.shuffling_cache
.try_write_for(ATTESTATION_CACHE_LOCK_TIMEOUT)
.ok_or(Error::AttestationCacheLockTimeout)?
.insert_committee_cache(shuffling_id, committee_cache);
}
}
Ok(())
}
#[allow(clippy::too_many_arguments)]
fn import_block_update_deposit_contract_finalization(
&self,
block: BeaconBlockRef<T::EthSpec>,
block_root: Hash256,
current_epoch: Epoch,
current_finalized_checkpoint: Checkpoint,
current_eth1_finalization_data: Eth1FinalizationData,
parent_eth1_finalization_data: Eth1FinalizationData,
parent_block_slot: Slot,
) {
// Do not write to eth1 finalization cache for blocks older than 5 epochs.
if block.slot().epoch(T::EthSpec::slots_per_epoch()) + 5 < current_epoch {
return;
}
let parent_block_epoch = parent_block_slot.epoch(T::EthSpec::slots_per_epoch());
if parent_block_epoch < current_epoch {
// we've crossed epoch boundary, store Eth1FinalizationData
let (checkpoint, eth1_finalization_data) =
if block.slot() % T::EthSpec::slots_per_epoch() == 0 {
// current block is the checkpoint
(
Checkpoint {
epoch: current_epoch,
root: block_root,
},
current_eth1_finalization_data,
)
} else {
// parent block is the checkpoint
(
Checkpoint {
epoch: current_epoch,
root: block.parent_root(),
},
parent_eth1_finalization_data,
)
};
if let Some(finalized_eth1_data) = self
.eth1_finalization_cache
.try_write_for(ETH1_FINALIZATION_CACHE_LOCK_TIMEOUT)
.and_then(|mut cache| {
cache.insert(checkpoint, eth1_finalization_data);
cache.finalize(&current_finalized_checkpoint)
})
{
if let Some(eth1_chain) = self.eth1_chain.as_ref() {
let finalized_deposit_count = finalized_eth1_data.deposit_count;
eth1_chain.finalize_eth1_data(finalized_eth1_data);
debug!(
self.log,
"called eth1_chain.finalize_eth1_data()";
"epoch" => current_finalized_checkpoint.epoch,
"deposit count" => finalized_deposit_count,
);
} }
} }
} }
// Inform the unknown block cache, in case it was waiting on this block.
self.pre_finalization_block_cache
.block_processed(block_root);
Ok(block_root)
} }
/// If configured, wait for the fork choice run at the start of the slot to complete. /// If configured, wait for the fork choice run at the start of the slot to complete.

View File

@@ -52,21 +52,22 @@ use crate::validator_monitor::HISTORIC_EPOCHS as VALIDATOR_MONITOR_HISTORIC_EPOC
use crate::validator_pubkey_cache::ValidatorPubkeyCache; use crate::validator_pubkey_cache::ValidatorPubkeyCache;
use crate::{ use crate::{
beacon_chain::{ beacon_chain::{
BeaconForkChoice, MAXIMUM_GOSSIP_CLOCK_DISPARITY, VALIDATOR_PUBKEY_CACHE_LOCK_TIMEOUT, BeaconForkChoice, ForkChoiceError, MAXIMUM_GOSSIP_CLOCK_DISPARITY,
VALIDATOR_PUBKEY_CACHE_LOCK_TIMEOUT,
}, },
metrics, BeaconChain, BeaconChainError, BeaconChainTypes, metrics, BeaconChain, BeaconChainError, BeaconChainTypes,
}; };
use derivative::Derivative; use derivative::Derivative;
use eth2::types::EventKind; use eth2::types::EventKind;
use execution_layer::PayloadStatus; use execution_layer::PayloadStatus;
use fork_choice::PayloadVerificationStatus; use fork_choice::{AttestationFromBlock, PayloadVerificationStatus};
use parking_lot::RwLockReadGuard; use parking_lot::RwLockReadGuard;
use proto_array::Block as ProtoBlock; use proto_array::Block as ProtoBlock;
use safe_arith::ArithError; use safe_arith::ArithError;
use slog::{debug, error, warn, Logger}; use slog::{debug, error, warn, Logger};
use slot_clock::SlotClock; use slot_clock::SlotClock;
use ssz::Encode; use ssz::Encode;
use state_processing::per_block_processing::is_merge_transition_block; use state_processing::per_block_processing::{errors::IntoWithIndex, is_merge_transition_block};
use state_processing::{ use state_processing::{
block_signature_verifier::{BlockSignatureVerifier, Error as BlockSignatureVerifierError}, block_signature_verifier::{BlockSignatureVerifier, Error as BlockSignatureVerifierError},
per_block_processing, per_slot_processing, per_block_processing, per_slot_processing,
@@ -554,7 +555,7 @@ pub fn signature_verify_chain_segment<T: BeaconChainTypes>(
let mut consensus_context = let mut consensus_context =
ConsensusContext::new(block.slot()).set_current_block_root(*block_root); ConsensusContext::new(block.slot()).set_current_block_root(*block_root);
signature_verifier.include_all_signatures(&block, &mut consensus_context)?; signature_verifier.include_all_signatures(block, &mut consensus_context)?;
// Save the block and its consensus context. The context will have had its proposer index // Save the block and its consensus context. The context will have had its proposer index
// and attesting indices filled in, which can be used to accelerate later block processing. // and attesting indices filled in, which can be used to accelerate later block processing.
@@ -621,6 +622,7 @@ pub struct ExecutionPendingBlock<T: BeaconChainTypes> {
pub parent_block: SignedBeaconBlock<T::EthSpec, BlindedPayload<T::EthSpec>>, pub parent_block: SignedBeaconBlock<T::EthSpec, BlindedPayload<T::EthSpec>>,
pub parent_eth1_finalization_data: Eth1FinalizationData, pub parent_eth1_finalization_data: Eth1FinalizationData,
pub confirmed_state_roots: Vec<Hash256>, pub confirmed_state_roots: Vec<Hash256>,
pub consensus_context: ConsensusContext<T::EthSpec>,
pub payload_verification_handle: PayloadVerificationHandle<T::EthSpec>, pub payload_verification_handle: PayloadVerificationHandle<T::EthSpec>,
} }
@@ -1137,6 +1139,79 @@ impl<T: BeaconChainTypes> ExecutionPendingBlock<T> {
check_block_relevancy(&block, block_root, chain)?; check_block_relevancy(&block, block_root, chain)?;
// Define a future that will verify the execution payload with an execution engine.
//
// We do this as early as possible so that later parts of this function can run in parallel
// with the payload verification.
let payload_notifier = PayloadNotifier::new(
chain.clone(),
block.clone(),
&parent.pre_state,
notify_execution_layer,
)?;
let is_valid_merge_transition_block =
is_merge_transition_block(&parent.pre_state, block.message().body());
let payload_verification_future = async move {
let chain = payload_notifier.chain.clone();
let block = payload_notifier.block.clone();
// If this block triggers the merge, check to ensure that it references valid execution
// blocks.
//
// The specification defines this check inside `on_block` in the fork-choice specification,
// however we perform the check here for two reasons:
//
// - There's no point in importing a block that will fail fork choice, so it's best to fail
// early.
// - Doing the check here means we can keep our fork-choice implementation "pure". I.e., no
// calls to remote servers.
if is_valid_merge_transition_block {
validate_merge_block(&chain, block.message(), AllowOptimisticImport::Yes).await?;
};
// The specification declares that this should be run *inside* `per_block_processing`,
// however we run it here to keep `per_block_processing` pure (i.e., no calls to external
// servers).
let payload_verification_status = payload_notifier.notify_new_payload().await?;
// If the payload did not validate or invalidate the block, check to see if this block is
// valid for optimistic import.
if payload_verification_status.is_optimistic() {
let block_hash_opt = block
.message()
.body()
.execution_payload()
.map(|full_payload| full_payload.execution_payload.block_hash);
// Ensure the block is a candidate for optimistic import.
if !is_optimistic_candidate_block(&chain, block.slot(), block.parent_root()).await?
{
warn!(
chain.log,
"Rejecting optimistic block";
"block_hash" => ?block_hash_opt,
"msg" => "the execution engine is not synced"
);
return Err(ExecutionPayloadError::UnverifiedNonOptimisticCandidate.into());
}
}
Ok(PayloadVerificationOutcome {
payload_verification_status,
is_valid_merge_transition_block,
})
};
// Spawn the payload verification future as a new task, but don't wait for it to complete.
// The `payload_verification_future` will be awaited later to ensure verification completed
// successfully.
let payload_verification_handle = chain
.task_executor
.spawn_handle(
payload_verification_future,
"execution_payload_verification",
)
.ok_or(BeaconChainError::RuntimeShutdown)?;
/* /*
* Advance the given `parent.beacon_state` to the slot of the given `block`. * Advance the given `parent.beacon_state` to the slot of the given `block`.
*/ */
@@ -1221,80 +1296,11 @@ impl<T: BeaconChainTypes> ExecutionPendingBlock<T> {
summaries.push(summary); summaries.push(summary);
} }
} }
metrics::stop_timer(catchup_timer);
let block_slot = block.slot(); let block_slot = block.slot();
let state_current_epoch = state.current_epoch(); let state_current_epoch = state.current_epoch();
// Define a future that will verify the execution payload with an execution engine (but
// don't execute it yet).
let payload_notifier =
PayloadNotifier::new(chain.clone(), block.clone(), &state, notify_execution_layer)?;
let is_valid_merge_transition_block =
is_merge_transition_block(&state, block.message().body());
let payload_verification_future = async move {
let chain = payload_notifier.chain.clone();
let block = payload_notifier.block.clone();
// If this block triggers the merge, check to ensure that it references valid execution
// blocks.
//
// The specification defines this check inside `on_block` in the fork-choice specification,
// however we perform the check here for two reasons:
//
// - There's no point in importing a block that will fail fork choice, so it's best to fail
// early.
// - Doing the check here means we can keep our fork-choice implementation "pure". I.e., no
// calls to remote servers.
if is_valid_merge_transition_block {
validate_merge_block(&chain, block.message(), AllowOptimisticImport::Yes).await?;
};
// The specification declares that this should be run *inside* `per_block_processing`,
// however we run it here to keep `per_block_processing` pure (i.e., no calls to external
// servers).
//
// It is important that this function is called *after* `per_slot_processing`, since the
// `randao` may change.
let payload_verification_status = payload_notifier.notify_new_payload().await?;
// If the payload did not validate or invalidate the block, check to see if this block is
// valid for optimistic import.
if payload_verification_status.is_optimistic() {
let block_hash_opt = block
.message()
.body()
.execution_payload()
.map(|full_payload| full_payload.execution_payload.block_hash);
// Ensure the block is a candidate for optimistic import.
if !is_optimistic_candidate_block(&chain, block.slot(), block.parent_root()).await?
{
warn!(
chain.log,
"Rejecting optimistic block";
"block_hash" => ?block_hash_opt,
"msg" => "the execution engine is not synced"
);
return Err(ExecutionPayloadError::UnverifiedNonOptimisticCandidate.into());
}
}
Ok(PayloadVerificationOutcome {
payload_verification_status,
is_valid_merge_transition_block,
})
};
// Spawn the payload verification future as a new task, but don't wait for it to complete.
// The `payload_verification_future` will be awaited later to ensure verification completed
// successfully.
let payload_verification_handle = chain
.task_executor
.spawn_handle(
payload_verification_future,
"execution_payload_verification",
)
.ok_or(BeaconChainError::RuntimeShutdown)?;
// If the block is sufficiently recent, notify the validator monitor. // If the block is sufficiently recent, notify the validator monitor.
if let Some(slot) = chain.slot_clock.now() { if let Some(slot) = chain.slot_clock.now() {
let epoch = slot.epoch(T::EthSpec::slots_per_epoch()); let epoch = slot.epoch(T::EthSpec::slots_per_epoch());
@@ -1321,8 +1327,6 @@ impl<T: BeaconChainTypes> ExecutionPendingBlock<T> {
} }
} }
metrics::stop_timer(catchup_timer);
/* /*
* Build the committee caches on the state. * Build the committee caches on the state.
*/ */
@@ -1411,6 +1415,44 @@ impl<T: BeaconChainTypes> ExecutionPendingBlock<T> {
}); });
} }
/*
* Apply the block's attestations to fork choice.
*
* We're running in parallel with the payload verification at this point, so this is
* free real estate.
*/
let current_slot = chain.slot()?;
let mut fork_choice = chain.canonical_head.fork_choice_write_lock();
// Register each attester slashing in the block with fork choice.
for attester_slashing in block.message().body().attester_slashings() {
fork_choice.on_attester_slashing(attester_slashing);
}
// Register each attestation in the block with fork choice.
for (i, attestation) in block.message().body().attestations().iter().enumerate() {
let _fork_choice_attestation_timer =
metrics::start_timer(&metrics::FORK_CHOICE_PROCESS_ATTESTATION_TIMES);
let indexed_attestation = consensus_context
.get_indexed_attestation(&state, attestation)
.map_err(|e| BlockError::PerBlockProcessingError(e.into_with_index(i)))?;
match fork_choice.on_attestation(
current_slot,
indexed_attestation,
AttestationFromBlock::True,
&chain.spec,
) {
Ok(()) => Ok(()),
// Ignore invalid attestations whilst importing attestations from a block. The
// block might be very old and therefore the attestations useless to fork choice.
Err(ForkChoiceError::InvalidAttestation(_)) => Ok(()),
Err(e) => Err(BlockError::BeaconChainError(e.into())),
}?;
}
drop(fork_choice);
Ok(Self { Ok(Self {
block, block,
block_root, block_root,
@@ -1418,6 +1460,7 @@ impl<T: BeaconChainTypes> ExecutionPendingBlock<T> {
parent_block: parent.beacon_block, parent_block: parent.beacon_block,
parent_eth1_finalization_data, parent_eth1_finalization_data,
confirmed_state_roots, confirmed_state_roots,
consensus_context,
payload_verification_handle, payload_verification_handle,
}) })
} }

View File

@@ -711,10 +711,14 @@ where
let validator_pubkey_cache = store.immutable_validators.clone(); let validator_pubkey_cache = store.immutable_validators.clone();
// Update pubkey cache on first start in case we have started from genesis. // Update pubkey cache on first start in case we have started from genesis.
validator_pubkey_cache let kv_store_ops = validator_pubkey_cache
.write() .write()
.import_new_pubkeys(&head_snapshot.beacon_state, &store) .import_new_pubkeys(&head_snapshot.beacon_state)
.map_err(|e| format!("error initializing pubkey cache: {e:?}"))?; .map_err(|e| format!("error initializing pubkey cache: {e:?}"))?;
store
.hot_db
.do_atomically(kv_store_ops)
.map_err(|e| format!("error writing validator store: {e:?}"))?;
let migrator_config = self.store_migrator_config.unwrap_or_default(); let migrator_config = self.store_migrator_config.unwrap_or_default();
let store_migrator = BackgroundMigrator::new( let store_migrator = BackgroundMigrator::new(

View File

@@ -69,6 +69,7 @@ impl<T: BeaconChainTypes> PayloadNotifier<T> {
// are cheap and doing them here ensures we protect the execution engine from junk. // are cheap and doing them here ensures we protect the execution engine from junk.
partially_verify_execution_payload( partially_verify_execution_payload(
state, state,
block.slot(),
block.message().execution_payload()?, block.message().execution_payload()?,
&chain.spec, &chain.spec,
) )
@@ -373,7 +374,8 @@ pub fn get_execution_payload<
let spec = &chain.spec; let spec = &chain.spec;
let current_epoch = state.current_epoch(); let current_epoch = state.current_epoch();
let is_merge_transition_complete = is_merge_transition_complete(state); let is_merge_transition_complete = is_merge_transition_complete(state);
let timestamp = compute_timestamp_at_slot(state, spec).map_err(BeaconStateError::from)?; let timestamp =
compute_timestamp_at_slot(state, state.slot(), spec).map_err(BeaconStateError::from)?;
let random = *state.get_randao_mix(current_epoch)?; let random = *state.get_randao_mix(current_epoch)?;
let latest_execution_payload_header_block_hash = let latest_execution_payload_header_block_hash =
state.latest_execution_payload_header()?.block_hash; state.latest_execution_payload_header()?.block_hash;

View File

@@ -60,6 +60,11 @@ lazy_static! {
"beacon_block_processing_state_root_seconds", "beacon_block_processing_state_root_seconds",
"Time spent calculating the state root when processing a block." "Time spent calculating the state root when processing a block."
); );
pub static ref BLOCK_PROCESSING_POST_EXEC_PROCESSING: Result<Histogram> = try_create_histogram_with_buckets(
"beacon_block_processing_post_exec_pre_attestable_seconds",
"Time between finishing execution processing and the block becoming attestable",
linear_buckets(5e-3, 5e-3, 10)
);
pub static ref BLOCK_PROCESSING_DB_WRITE: Result<Histogram> = try_create_histogram( pub static ref BLOCK_PROCESSING_DB_WRITE: Result<Histogram> = try_create_histogram(
"beacon_block_processing_db_write_seconds", "beacon_block_processing_db_write_seconds",
"Time spent writing a newly processed block and state to DB" "Time spent writing a newly processed block and state to DB"

View File

@@ -589,7 +589,7 @@ where
pub fn get_timestamp_at_slot(&self) -> u64 { pub fn get_timestamp_at_slot(&self) -> u64 {
let state = self.get_current_state(); let state = self.get_current_state();
compute_timestamp_at_slot(&state, &self.spec).unwrap() compute_timestamp_at_slot(&state, state.slot(), &self.spec).unwrap()
} }
pub fn get_current_state_and_root(&self) -> (BeaconState<E>, Hash256) { pub fn get_current_state_and_root(&self) -> (BeaconState<E>, Hash256) {

View File

@@ -14,6 +14,7 @@ use std::cmp::max;
use std::fmt::Debug; use std::fmt::Debug;
use std::fmt::Write; use std::fmt::Write;
use std::fs; use std::fs;
use std::net::Ipv6Addr;
use std::net::{IpAddr, Ipv4Addr, ToSocketAddrs}; use std::net::{IpAddr, Ipv4Addr, ToSocketAddrs};
use std::path::{Path, PathBuf}; use std::path::{Path, PathBuf};
use std::str::FromStr; use std::str::FromStr;
@@ -852,9 +853,11 @@ pub fn set_network_config(
} }
if cli_args.is_present("enr-match") { if cli_args.is_present("enr-match") {
// set the enr address to localhost if the address is 0.0.0.0 // set the enr address to localhost if the address is unspecified
if config.listen_address == "0.0.0.0".parse::<IpAddr>().expect("valid ip addr") { if config.listen_address == IpAddr::V4(Ipv4Addr::UNSPECIFIED) {
config.enr_address = Some("127.0.0.1".parse::<IpAddr>().expect("valid ip addr")); config.enr_address = Some(IpAddr::V4(Ipv4Addr::LOCALHOST));
} else if config.listen_address == IpAddr::V6(Ipv6Addr::UNSPECIFIED) {
config.enr_address = Some(IpAddr::V6(Ipv6Addr::LOCALHOST));
} else { } else {
config.enr_address = Some(config.listen_address); config.enr_address = Some(config.listen_address);
} }

View File

@@ -1,4 +1,4 @@
use crate::{DBColumn, Error, HotColdDB, ItemStore, StoreItem}; use crate::{DBColumn, Error, HotColdDB, ItemStore, KeyValueStoreOp, StoreItem};
use bls::PUBLIC_KEY_UNCOMPRESSED_BYTES_LEN; use bls::PUBLIC_KEY_UNCOMPRESSED_BYTES_LEN;
use smallvec::SmallVec; use smallvec::SmallVec;
use ssz::{Decode, Encode}; use ssz::{Decode, Encode};
@@ -45,8 +45,7 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> Default
impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> ValidatorPubkeyCache<E, Hot, Cold> { impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> ValidatorPubkeyCache<E, Hot, Cold> {
/// Create a new public key cache using the keys in `state.validators`. /// Create a new public key cache using the keys in `state.validators`.
/// ///
/// Also creates a new persistence file, returning an error if there is already a file at /// The new cache will be updated with the keys from `state` and immediately written to disk.
/// `persistence_path`.
pub fn new(state: &BeaconState<E>, store: &HotColdDB<E, Hot, Cold>) -> Result<Self, Error> { pub fn new(state: &BeaconState<E>, store: &HotColdDB<E, Hot, Cold>) -> Result<Self, Error> {
let mut cache = Self { let mut cache = Self {
pubkeys: vec![], pubkeys: vec![],
@@ -55,7 +54,8 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> ValidatorPubkeyCache<E,
_phantom: PhantomData, _phantom: PhantomData,
}; };
cache.import_new_pubkeys(state, store)?; let store_ops = cache.import_new_pubkeys(state)?;
store.hot_db.do_atomically(store_ops)?;
Ok(cache) Ok(cache)
} }
@@ -91,26 +91,26 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> ValidatorPubkeyCache<E,
/// Scan the given `state` and add any new validator public keys. /// Scan the given `state` and add any new validator public keys.
/// ///
/// Does not delete any keys from `self` if they don't appear in `state`. /// Does not delete any keys from `self` if they don't appear in `state`.
///
/// NOTE: The caller *must* commit the returned I/O batch as part of the block import process.
pub fn import_new_pubkeys( pub fn import_new_pubkeys(
&mut self, &mut self,
state: &BeaconState<E>, state: &BeaconState<E>,
store: &HotColdDB<E, Hot, Cold>, ) -> Result<Vec<KeyValueStoreOp>, Error> {
) -> Result<(), Error> {
if state.validators().len() > self.validators.len() { if state.validators().len() > self.validators.len() {
self.import( self.import(
state state
.validators() .validators()
.iter_from(self.pubkeys.len())? .iter_from(self.pubkeys.len())?
.map(|v| v.immutable.clone()), .map(|v| v.immutable.clone()),
store,
) )
} else { } else {
Ok(()) Ok(vec![])
} }
} }
/// Adds zero or more validators to `self`. /// Adds zero or more validators to `self`.
fn import<I>(&mut self, validator_keys: I, store: &HotColdDB<E, Hot, Cold>) -> Result<(), Error> fn import<I>(&mut self, validator_keys: I) -> Result<Vec<KeyValueStoreOp>, Error>
where where
I: Iterator<Item = Arc<ValidatorImmutable>> + ExactSizeIterator, I: Iterator<Item = Arc<ValidatorImmutable>> + ExactSizeIterator,
{ {
@@ -118,6 +118,7 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> ValidatorPubkeyCache<E,
self.pubkeys.reserve(validator_keys.len()); self.pubkeys.reserve(validator_keys.len());
self.indices.reserve(validator_keys.len()); self.indices.reserve(validator_keys.len());
let mut store_ops = Vec::with_capacity(validator_keys.len());
for validator in validator_keys { for validator in validator_keys {
let i = self.pubkeys.len(); let i = self.pubkeys.len();
@@ -129,26 +130,21 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> ValidatorPubkeyCache<E,
.try_into() .try_into()
.map_err(Error::InvalidValidatorPubkeyBytes)?; .map_err(Error::InvalidValidatorPubkeyBytes)?;
// The item is written to disk _before_ it is written into // Stage the new validator key for writing to disk.
// the local struct. // It will be committed atomically when the block that introduced it is written to disk.
// // Notably it is NOT written while the write lock on the cache is held.
// This means that a pubkey cache read from disk will always be equivalent to or // See: https://github.com/sigp/lighthouse/issues/2327
// _later than_ the cache that was running in the previous instance of Lighthouse. store_ops.push(
// DatabaseValidator::from_immutable_validator(&pubkey, &validator)
// The motivation behind this ordering is that we do not want to have states that .as_kv_store_op(DatabaseValidator::key_for_index(i))?,
// reference a pubkey that is not in our cache. However, it's fine to have pubkeys );
// that are never referenced in a state.
store.put_item(
&DatabaseValidator::key_for_index(i),
&DatabaseValidator::from_immutable_validator(&pubkey, &validator),
)?;
self.pubkeys.push(pubkey); self.pubkeys.push(pubkey);
self.indices.insert(validator.pubkey, i); self.indices.insert(validator.pubkey, i);
self.validators.push(validator); self.validators.push(validator);
} }
Ok(()) Ok(store_ops)
} }
/// Get the public key for a validator with index `i`. /// Get the public key for a validator with index `i`.
@@ -345,9 +341,10 @@ mod test {
// Add some more keypairs. // Add some more keypairs.
let (state, keypairs) = get_state(12); let (state, keypairs) = get_state(12);
cache let ops = cache
.import_new_pubkeys(&state) .import_new_pubkeys(&state)
.expect("should import pubkeys"); .expect("should import pubkeys");
store.hot_db.do_atomically(ops).unwrap();
check_cache_get(&cache, &keypairs[..]); check_cache_get(&cache, &keypairs[..]);
drop(cache); drop(cache);

View File

@@ -1,9 +1,11 @@
use beacon_node::{get_data_dir, set_network_config}; use beacon_node::{get_data_dir, set_network_config};
use clap::ArgMatches; use clap::ArgMatches;
use eth2_network_config::Eth2NetworkConfig; use eth2_network_config::Eth2NetworkConfig;
use lighthouse_network::discv5::enr::EnrBuilder;
use lighthouse_network::discv5::IpMode;
use lighthouse_network::discv5::{enr::CombinedKey, Discv5Config, Enr}; use lighthouse_network::discv5::{enr::CombinedKey, Discv5Config, Enr};
use lighthouse_network::{ use lighthouse_network::{
discovery::{create_enr_builder_from_config, load_enr_from_disk, use_or_load_enr}, discovery::{load_enr_from_disk, use_or_load_enr},
load_private_key, CombinedKeyExt, NetworkConfig, load_private_key, CombinedKeyExt, NetworkConfig,
}; };
use serde_derive::{Deserialize, Serialize}; use serde_derive::{Deserialize, Serialize};
@@ -70,6 +72,15 @@ impl<T: EthSpec> BootNodeConfig<T> {
// the address to listen on // the address to listen on
let listen_socket = let listen_socket =
SocketAddr::new(network_config.listen_address, network_config.discovery_port); SocketAddr::new(network_config.listen_address, network_config.discovery_port);
if listen_socket.is_ipv6() {
// create ipv6 sockets and enable ipv4 mapped addresses.
network_config.discv5_config.ip_mode = IpMode::Ip6 {
enable_mapped_addresses: true,
};
} else {
// Set explicitly as ipv4 otherwise
network_config.discv5_config.ip_mode = IpMode::Ip4;
}
let private_key = load_private_key(&network_config, &logger); let private_key = load_private_key(&network_config, &logger);
let local_key = CombinedKey::from_libp2p(&private_key)?; let local_key = CombinedKey::from_libp2p(&private_key)?;
@@ -104,7 +115,29 @@ impl<T: EthSpec> BootNodeConfig<T> {
// Build the local ENR // Build the local ENR
let mut local_enr = { let mut local_enr = {
let mut builder = create_enr_builder_from_config(&network_config, false); let mut builder = EnrBuilder::new("v4");
// Set the enr address if specified. Set also the port.
// NOTE: if the port is specified but the the address is not, the port won't be
// set since it can't be known if it's an ipv6 or ipv4 udp port.
if let Some(enr_address) = network_config.enr_address {
match enr_address {
std::net::IpAddr::V4(ipv4_addr) => {
builder.ip4(ipv4_addr);
if let Some(port) = network_config.enr_udp_port {
builder.udp4(port);
}
}
std::net::IpAddr::V6(ipv6_addr) => {
builder.ip6(ipv6_addr);
if let Some(port) = network_config.enr_udp_port {
builder.udp6(port);
// We are enabling mapped addresses in the boot node in this case,
// so advertise an udp4 port as well.
builder.udp4(port);
}
}
}
};
// If we know of the ENR field, add it to the initial construction // If we know of the ENR field, add it to the initial construction
if let Some(enr_fork_bytes) = enr_fork { if let Some(enr_fork_bytes) = enr_fork {

View File

@@ -9,53 +9,63 @@ use slog::info;
use types::EthSpec; use types::EthSpec;
pub async fn run<T: EthSpec>(config: BootNodeConfig<T>, log: slog::Logger) { pub async fn run<T: EthSpec>(config: BootNodeConfig<T>, log: slog::Logger) {
let BootNodeConfig {
listen_socket,
boot_nodes,
local_enr,
local_key,
discv5_config,
..
} = config;
// Print out useful information about the generated ENR // Print out useful information about the generated ENR
let enr_socket = config let enr_v4_socket = local_enr.udp4_socket();
.local_enr let enr_v6_socket = local_enr.udp6_socket();
.udp4_socket() let eth2_field = local_enr
.expect("Enr has a UDP socket");
let eth2_field = config
.local_enr
.eth2() .eth2()
.map(|fork_id| hex::encode(fork_id.fork_digest)) .map(|fork_id| hex::encode(fork_id.fork_digest))
.unwrap_or_default(); .unwrap_or_default();
info!(log, "Configuration parameters"; "listening_address" => format!("{}:{}", config.listen_socket.ip(), config.listen_socket.port()), "broadcast_address" => format!("{}:{}",enr_socket.ip(), enr_socket.port()), "eth2" => eth2_field); let pretty_v4_socket = enr_v4_socket.as_ref().map(|addr| addr.to_string());
let pretty_v6_socket = enr_v6_socket.as_ref().map(|addr| addr.to_string());
info!(
log, "Configuration parameters";
"listening_address" => %listen_socket,
"advertised_v4_address" => ?pretty_v4_socket,
"advertised_v6_address" => ?pretty_v6_socket,
"eth2" => eth2_field
);
info!(log, "Identity established"; "peer_id" => config.local_enr.peer_id().to_string(), "node_id" => config.local_enr.node_id().to_string()); info!(log, "Identity established"; "peer_id" => %local_enr.peer_id(), "node_id" => %local_enr.node_id());
// build the contactable multiaddr list, adding the p2p protocol // build the contactable multiaddr list, adding the p2p protocol
info!(log, "Contact information"; "enr" => config.local_enr.to_base64()); info!(log, "Contact information"; "enr" => local_enr.to_base64());
info!(log, "Contact information"; "multiaddrs" => format!("{:?}", config.local_enr.multiaddr_p2p())); info!(log, "Contact information"; "multiaddrs" => ?local_enr.multiaddr_p2p());
// construct the discv5 server // construct the discv5 server
let mut discv5 = Discv5::new( let mut discv5 = Discv5::new(local_enr.clone(), local_key, discv5_config).unwrap();
config.local_enr.clone(),
config.local_key,
config.discv5_config,
)
.unwrap();
// If there are any bootnodes add them to the routing table // If there are any bootnodes add them to the routing table
for enr in config.boot_nodes { for enr in boot_nodes {
info!( info!(
log, log,
"Adding bootnode"; "Adding bootnode";
"address" => ?enr.udp4_socket(), "ipv4_address" => ?enr.udp4_socket(),
"peer_id" => enr.peer_id().to_string(), "ipv6_address" => ?enr.udp6_socket(),
"node_id" => enr.node_id().to_string() "peer_id" => ?enr.peer_id(),
"node_id" => ?enr.node_id()
); );
if enr != config.local_enr { if enr != local_enr {
if let Err(e) = discv5.add_enr(enr) { if let Err(e) = discv5.add_enr(enr) {
slog::warn!(log, "Failed adding ENR"; "error" => e.to_string()); slog::warn!(log, "Failed adding ENR"; "error" => ?e);
} }
} }
} }
// start the server // start the server
if let Err(e) = discv5.start(config.listen_socket).await { if let Err(e) = discv5.start(listen_socket).await {
slog::crit!(log, "Could not start discv5 server"; "error" => e.to_string()); slog::crit!(log, "Could not start discv5 server"; "error" => %e);
return; return;
} }
@@ -72,7 +82,7 @@ pub async fn run<T: EthSpec>(config: BootNodeConfig<T>, log: slog::Logger) {
let mut event_stream = match discv5.event_stream().await { let mut event_stream = match discv5.event_stream().await {
Ok(stream) => stream, Ok(stream) => stream,
Err(e) => { Err(e) => {
slog::crit!(log, "Failed to obtain event stream"; "error" => e.to_string()); slog::crit!(log, "Failed to obtain event stream"; "error" => %e);
return; return;
} }
}; };
@@ -81,9 +91,35 @@ pub async fn run<T: EthSpec>(config: BootNodeConfig<T>, log: slog::Logger) {
loop { loop {
tokio::select! { tokio::select! {
_ = metric_interval.tick() => { _ = metric_interval.tick() => {
// Get some ipv4/ipv6 stats to add in the metrics.
let mut ipv4_only_reachable: usize = 0;
let mut ipv6_only_reachable: usize= 0;
let mut ipv4_ipv6_reachable: usize = 0;
let mut unreachable_nodes: usize = 0;
for enr in discv5.kbuckets().iter_ref().filter_map(|entry| entry.status.is_connected().then_some(entry.node.value)) {
let declares_ipv4 = enr.udp4_socket().is_some();
let declares_ipv6 = enr.udp6_socket().is_some();
match (declares_ipv4, declares_ipv6) {
(true, true) => ipv4_ipv6_reachable += 1,
(true, false) => ipv4_only_reachable += 1,
(false, true) => ipv6_only_reachable += 1,
(false, false) => unreachable_nodes += 1,
}
}
// display server metrics // display server metrics
let metrics = discv5.metrics(); let metrics = discv5.metrics();
info!(log, "Server metrics"; "connected_peers" => discv5.connected_peers(), "active_sessions" => metrics.active_sessions, "requests/s" => format!("{:.2}", metrics.unsolicited_requests_per_second)); info!(
log, "Server metrics";
"connected_peers" => discv5.connected_peers(),
"active_sessions" => metrics.active_sessions,
"requests/s" => format_args!("{:.2}", metrics.unsolicited_requests_per_second),
"ipv4_nodes" => ipv4_only_reachable,
"ipv6_nodes" => ipv6_only_reachable,
"ipv6_and_ipv4_nodes" => ipv4_ipv6_reachable,
"unreachable_nodes" => unreachable_nodes,
);
} }
Some(event) = event_stream.recv() => { Some(event) = event_stream.recv() => {
match event { match event {
@@ -95,7 +131,7 @@ pub async fn run<T: EthSpec>(config: BootNodeConfig<T>, log: slog::Logger) {
Discv5Event::TalkRequest(_) => {} // Ignore Discv5Event::TalkRequest(_) => {} // Ignore
Discv5Event::NodeInserted { .. } => {} // Ignore Discv5Event::NodeInserted { .. } => {} // Ignore
Discv5Event::SocketUpdated(socket_addr) => { Discv5Event::SocketUpdated(socket_addr) => {
info!(log, "External socket address updated"; "socket_addr" => format!("{:?}", socket_addr)); info!(log, "Advertised socket address updated"; "socket_addr" => %socket_addr);
} }
Discv5Event::SessionEstablished{ .. } => {} // Ignore Discv5Event::SessionEstablished{ .. } => {} // Ignore
} }

View File

@@ -63,14 +63,39 @@ impl<T: EthSpec> ConsensusContext<T> {
self self
} }
// FIXME(sproul): extra safety checks? /// Strict method for fetching the proposer index.
///
/// Gets the proposer index for `self.slot` while ensuring that it matches `state.slot()`. This
/// method should be used in block processing and almost everywhere the proposer index is
/// required. If the slot check is too restrictive, see `get_proposer_index_from_epoch_state`.
pub fn get_proposer_index( pub fn get_proposer_index(
&mut self, &mut self,
state: &BeaconState<T>, state: &BeaconState<T>,
spec: &ChainSpec, spec: &ChainSpec,
) -> Result<u64, ContextError> { ) -> Result<u64, ContextError> {
self.check_epoch(state.current_epoch())?; self.check_slot(state.slot())?;
self.get_proposer_index_no_checks(state, spec)
}
/// More liberal method for fetching the proposer index.
///
/// Fetches the proposer index for `self.slot` but does not require the state to be from an
/// exactly matching slot (merely a matching epoch). This is useful in batch verification where
/// we want to extract the proposer index from a single state for every slot in the epoch.
pub fn get_proposer_index_from_epoch_state(
&mut self,
state: &BeaconState<T>,
spec: &ChainSpec,
) -> Result<u64, ContextError> {
self.check_epoch(state.current_epoch())?;
self.get_proposer_index_no_checks(state, spec)
}
fn get_proposer_index_no_checks(
&mut self,
state: &BeaconState<T>,
spec: &ChainSpec,
) -> Result<u64, ContextError> {
if let Some(proposer_index) = self.proposer_index { if let Some(proposer_index) = self.proposer_index {
return Ok(proposer_index); return Ok(proposer_index);
} }

View File

@@ -336,6 +336,7 @@ pub fn get_new_eth1_data<T: EthSpec>(
/// https://github.com/ethereum/consensus-specs/blob/v1.1.5/specs/merge/beacon-chain.md#process_execution_payload /// https://github.com/ethereum/consensus-specs/blob/v1.1.5/specs/merge/beacon-chain.md#process_execution_payload
pub fn partially_verify_execution_payload<T: EthSpec, Payload: ExecPayload<T>>( pub fn partially_verify_execution_payload<T: EthSpec, Payload: ExecPayload<T>>(
state: &BeaconState<T>, state: &BeaconState<T>,
block_slot: Slot,
payload: &Payload, payload: &Payload,
spec: &ChainSpec, spec: &ChainSpec,
) -> Result<(), BlockProcessingError> { ) -> Result<(), BlockProcessingError> {
@@ -356,7 +357,7 @@ pub fn partially_verify_execution_payload<T: EthSpec, Payload: ExecPayload<T>>(
} }
); );
let timestamp = compute_timestamp_at_slot(state, spec)?; let timestamp = compute_timestamp_at_slot(state, block_slot, spec)?;
block_verify!( block_verify!(
payload.timestamp() == timestamp, payload.timestamp() == timestamp,
BlockProcessingError::ExecutionInvalidTimestamp { BlockProcessingError::ExecutionInvalidTimestamp {
@@ -380,7 +381,7 @@ pub fn process_execution_payload<T: EthSpec, Payload: ExecPayload<T>>(
payload: &Payload, payload: &Payload,
spec: &ChainSpec, spec: &ChainSpec,
) -> Result<(), BlockProcessingError> { ) -> Result<(), BlockProcessingError> {
partially_verify_execution_payload(state, payload, spec)?; partially_verify_execution_payload(state, state.slot(), payload, spec)?;
*state.latest_execution_payload_header_mut()? = payload.to_execution_payload_header(); *state.latest_execution_payload_header_mut()? = payload.to_execution_payload_header();
@@ -417,9 +418,10 @@ pub fn is_execution_enabled<T: EthSpec, Payload: ExecPayload<T>>(
/// https://github.com/ethereum/consensus-specs/blob/dev/specs/merge/beacon-chain.md#compute_timestamp_at_slot /// https://github.com/ethereum/consensus-specs/blob/dev/specs/merge/beacon-chain.md#compute_timestamp_at_slot
pub fn compute_timestamp_at_slot<T: EthSpec>( pub fn compute_timestamp_at_slot<T: EthSpec>(
state: &BeaconState<T>, state: &BeaconState<T>,
block_slot: Slot,
spec: &ChainSpec, spec: &ChainSpec,
) -> Result<u64, ArithError> { ) -> Result<u64, ArithError> {
let slots_since_genesis = state.slot().as_u64().safe_sub(spec.genesis_slot.as_u64())?; let slots_since_genesis = block_slot.as_u64().safe_sub(spec.genesis_slot.as_u64())?;
slots_since_genesis slots_since_genesis
.safe_mul(spec.seconds_per_slot) .safe_mul(spec.seconds_per_slot)
.and_then(|since_genesis| state.genesis_time().safe_add(since_genesis)) .and_then(|since_genesis| state.genesis_time().safe_add(since_genesis))

View File

@@ -144,7 +144,8 @@ where
ctxt: &mut ConsensusContext<T>, ctxt: &mut ConsensusContext<T>,
) -> Result<()> { ) -> Result<()> {
let block_root = Some(ctxt.get_current_block_root(block)?); let block_root = Some(ctxt.get_current_block_root(block)?);
let verified_proposer_index = Some(ctxt.get_proposer_index(self.state, self.spec)?); let verified_proposer_index =
Some(ctxt.get_proposer_index_from_epoch_state(self.state, self.spec)?);
self.include_block_proposal(block, block_root, verified_proposer_index)?; self.include_block_proposal(block, block_root, verified_proposer_index)?;
self.include_all_signatures_except_proposal(block, ctxt)?; self.include_all_signatures_except_proposal(block, ctxt)?;
@@ -159,7 +160,8 @@ where
block: &'a SignedBeaconBlock<T, Payload>, block: &'a SignedBeaconBlock<T, Payload>,
ctxt: &mut ConsensusContext<T>, ctxt: &mut ConsensusContext<T>,
) -> Result<()> { ) -> Result<()> {
let verified_proposer_index = Some(ctxt.get_proposer_index(self.state, self.spec)?); let verified_proposer_index =
Some(ctxt.get_proposer_index_from_epoch_state(self.state, self.spec)?);
self.include_randao_reveal(block, verified_proposer_index)?; self.include_randao_reveal(block, verified_proposer_index)?;
self.include_proposer_slashings(block)?; self.include_proposer_slashings(block)?;
self.include_attester_slashings(block)?; self.include_attester_slashings(block)?;
@@ -281,7 +283,7 @@ where
.attestations() .attestations()
.iter() .iter()
.try_for_each(|attestation| { .try_for_each(|attestation| {
let indexed_attestation = ctxt.get_indexed_attestation(&self.state, attestation)?; let indexed_attestation = ctxt.get_indexed_attestation(self.state, attestation)?;
self.sets.push(indexed_attestation_signature_set( self.sets.push(indexed_attestation_signature_set(
self.state, self.state,

View File

@@ -398,7 +398,6 @@ fn do_transition<T: EthSpec>(
} }
let t = Instant::now(); let t = Instant::now();
per_block_processing( per_block_processing(
&mut pre_state, &mut pre_state,
&block, &block,

View File

@@ -80,7 +80,6 @@ impl<E: EthSpec> Operation<E> for Attestation<E> {
_: &Operations<E, Self>, _: &Operations<E, Self>,
) -> Result<(), BlockProcessingError> { ) -> Result<(), BlockProcessingError> {
let mut ctxt = ConsensusContext::new(state.slot()); let mut ctxt = ConsensusContext::new(state.slot());
let proposer_index = ctxt.get_proposer_index(state, spec)?;
match state { match state {
BeaconState::Base(_) => base::process_attestations( BeaconState::Base(_) => base::process_attestations(
state, state,
@@ -89,14 +88,9 @@ impl<E: EthSpec> Operation<E> for Attestation<E> {
&mut ctxt, &mut ctxt,
spec, spec,
), ),
BeaconState::Altair(_) | BeaconState::Merge(_) => altair::process_attestation( BeaconState::Altair(_) | BeaconState::Merge(_) => {
state, altair::process_attestation(state, self, 0, &mut ctxt, VerifySignatures::True, spec)
self, }
0,
proposer_index,
VerifySignatures::True,
spec,
),
} }
} }
} }