mirror of
https://github.com/sigp/lighthouse.git
synced 2026-04-30 11:13:34 +00:00
Merge remote-tracking branch 'eitan/gloas-payload-processing' into gloas-replay-blocks
This commit is contained in:
@@ -4,9 +4,7 @@ use crate::attestation_verification::{
|
||||
batch_verify_unaggregated_attestations,
|
||||
};
|
||||
use crate::beacon_block_streamer::{BeaconBlockStreamer, CheckCaches};
|
||||
use crate::beacon_proposer_cache::{
|
||||
BeaconProposerCache, EpochBlockProposers, ensure_state_can_determine_proposers_for_epoch,
|
||||
};
|
||||
use crate::beacon_proposer_cache::{BeaconProposerCache, EpochBlockProposers};
|
||||
use crate::blob_verification::{GossipBlobError, GossipVerifiedBlob};
|
||||
use crate::block_times_cache::BlockTimesCache;
|
||||
use crate::block_verification::POS_PANDA_BANNER;
|
||||
@@ -27,6 +25,7 @@ use crate::data_availability_checker::{
|
||||
};
|
||||
use crate::data_column_verification::{GossipDataColumnError, GossipVerifiedDataColumn};
|
||||
use crate::early_attester_cache::EarlyAttesterCache;
|
||||
use crate::envelope_times_cache::EnvelopeTimesCache;
|
||||
use crate::errors::{BeaconChainError as Error, BlockProductionError};
|
||||
use crate::events::ServerSentEventHandler;
|
||||
use crate::execution_payload::{NotifyExecutionLayer, PreparePayloadHandle, get_execution_payload};
|
||||
@@ -463,6 +462,8 @@ pub struct BeaconChain<T: BeaconChainTypes> {
|
||||
pub early_attester_cache: EarlyAttesterCache<T::EthSpec>,
|
||||
/// A cache used to keep track of various block timings.
|
||||
pub block_times_cache: Arc<RwLock<BlockTimesCache>>,
|
||||
/// A cache used to keep track of various envelope timings.
|
||||
pub envelope_times_cache: Arc<RwLock<EnvelopeTimesCache>>,
|
||||
/// A cache used to track pre-finalization block roots for quick rejection.
|
||||
pub pre_finalization_block_cache: PreFinalizationBlockCache,
|
||||
/// A cache used to produce light_client server messages
|
||||
@@ -1151,6 +1152,23 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns the full block at the given root, if it's available in the database.
|
||||
///
|
||||
/// Should always return a full block for pre-merge and post-gloas blocks.
|
||||
pub fn get_full_block(
|
||||
&self,
|
||||
block_root: &Hash256,
|
||||
) -> Result<Option<SignedBeaconBlock<T::EthSpec>>, Error> {
|
||||
match self.store.try_get_full_block(block_root)? {
|
||||
Some(DatabaseBlock::Full(block)) => Ok(Some(block)),
|
||||
Some(DatabaseBlock::Blinded(_)) => {
|
||||
// TODO(gloas) should we return None here?
|
||||
Ok(None)
|
||||
}
|
||||
None => Ok(None),
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns the block at the given root, if any.
|
||||
///
|
||||
/// ## Errors
|
||||
@@ -4169,7 +4187,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
|
||||
}
|
||||
|
||||
/// Check block's consistentency with any configured weak subjectivity checkpoint.
|
||||
fn check_block_against_weak_subjectivity_checkpoint(
|
||||
pub(crate) fn check_block_against_weak_subjectivity_checkpoint(
|
||||
&self,
|
||||
block: BeaconBlockRef<T::EthSpec>,
|
||||
block_root: Hash256,
|
||||
@@ -6476,6 +6494,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
|
||||
// sync anyway).
|
||||
self.naive_aggregation_pool.write().prune(slot);
|
||||
self.block_times_cache.write().prune(slot);
|
||||
self.envelope_times_cache.write().prune(slot);
|
||||
|
||||
// Don't run heavy-weight tasks during sync.
|
||||
if self.best_slot() + MAX_PER_SLOT_FORK_CHOICE_DISTANCE < slot {
|
||||
@@ -6535,62 +6554,14 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
|
||||
accessor: impl Fn(&EpochBlockProposers) -> Result<V, BeaconChainError>,
|
||||
state_provider: impl FnOnce() -> Result<(Hash256, BeaconState<T::EthSpec>), E>,
|
||||
) -> Result<V, E> {
|
||||
let cache_entry = self
|
||||
.beacon_proposer_cache
|
||||
.lock()
|
||||
.get_or_insert_key(proposal_epoch, shuffling_decision_block);
|
||||
|
||||
// If the cache entry is not initialised, run the code to initialise it inside a OnceCell.
|
||||
// This prevents duplication of work across multiple threads.
|
||||
//
|
||||
// If it is already initialised, then `get_or_try_init` will return immediately without
|
||||
// executing the initialisation code at all.
|
||||
let epoch_block_proposers = cache_entry.get_or_try_init(|| {
|
||||
// Fetch the state on-demand if the required epoch was missing from the cache.
|
||||
// If the caller wants to not compute the state they must return an error here and then
|
||||
// catch it at the call site.
|
||||
let (state_root, mut state) = state_provider()?;
|
||||
|
||||
// Ensure the state can compute proposer duties for `epoch`.
|
||||
ensure_state_can_determine_proposers_for_epoch(
|
||||
&mut state,
|
||||
state_root,
|
||||
proposal_epoch,
|
||||
&self.spec,
|
||||
)?;
|
||||
|
||||
// Sanity check the state.
|
||||
let latest_block_root = state.get_latest_block_root(state_root);
|
||||
let state_decision_block_root = state.proposer_shuffling_decision_root_at_epoch(
|
||||
proposal_epoch,
|
||||
latest_block_root,
|
||||
&self.spec,
|
||||
)?;
|
||||
if state_decision_block_root != shuffling_decision_block {
|
||||
return Err(Error::ProposerCacheIncorrectState {
|
||||
state_decision_block_root,
|
||||
requested_decision_block_root: shuffling_decision_block,
|
||||
}
|
||||
.into());
|
||||
}
|
||||
|
||||
let proposers = state.get_beacon_proposer_indices(proposal_epoch, &self.spec)?;
|
||||
|
||||
// Use fork_at_epoch rather than the state's fork, because post-Fulu we may not have
|
||||
// advanced the state completely into the new epoch.
|
||||
let fork = self.spec.fork_at_epoch(proposal_epoch);
|
||||
|
||||
debug!(
|
||||
?shuffling_decision_block,
|
||||
epoch = %proposal_epoch,
|
||||
"Priming proposer shuffling cache"
|
||||
);
|
||||
|
||||
Ok::<_, E>(EpochBlockProposers::new(proposal_epoch, fork, proposers))
|
||||
})?;
|
||||
|
||||
// Run the accessor function on the computed epoch proposers.
|
||||
accessor(epoch_block_proposers).map_err(Into::into)
|
||||
crate::beacon_proposer_cache::with_proposer_cache(
|
||||
&self.beacon_proposer_cache,
|
||||
&self.spec,
|
||||
shuffling_decision_block,
|
||||
proposal_epoch,
|
||||
accessor,
|
||||
state_provider,
|
||||
)
|
||||
}
|
||||
|
||||
/// Runs the `map_fn` with the committee cache for `shuffling_epoch` from the chain with head
|
||||
|
||||
@@ -12,12 +12,13 @@ use crate::{BeaconChain, BeaconChainError, BeaconChainTypes};
|
||||
use fork_choice::ExecutionStatus;
|
||||
use lru::LruCache;
|
||||
use once_cell::sync::OnceCell;
|
||||
use parking_lot::Mutex;
|
||||
use safe_arith::SafeArith;
|
||||
use smallvec::SmallVec;
|
||||
use state_processing::state_advance::partial_state_advance;
|
||||
use std::num::NonZeroUsize;
|
||||
use std::sync::Arc;
|
||||
use tracing::instrument;
|
||||
use tracing::{debug, instrument};
|
||||
use typenum::Unsigned;
|
||||
use types::new_non_zero_usize;
|
||||
use types::{BeaconState, BeaconStateError, ChainSpec, Epoch, EthSpec, Fork, Hash256, Slot};
|
||||
@@ -164,6 +165,82 @@ impl BeaconProposerCache {
|
||||
}
|
||||
}
|
||||
|
||||
/// Access the proposer cache, computing and caching the proposers if necessary.
|
||||
///
|
||||
/// This is a free function that operates on references to the cache and spec, decoupled from
|
||||
/// `BeaconChain`. The `accessor` is called with the cached `EpochBlockProposers` for the given
|
||||
/// `(proposal_epoch, shuffling_decision_block)` key. If the cache entry is missing, the
|
||||
/// `state_provider` closure is called to produce a state which is then used to compute and
|
||||
/// cache the proposers.
|
||||
pub fn with_proposer_cache<Spec, V, Err>(
|
||||
beacon_proposer_cache: &Mutex<BeaconProposerCache>,
|
||||
spec: &ChainSpec,
|
||||
shuffling_decision_block: Hash256,
|
||||
proposal_epoch: Epoch,
|
||||
accessor: impl Fn(&EpochBlockProposers) -> Result<V, BeaconChainError>,
|
||||
state_provider: impl FnOnce() -> Result<(Hash256, BeaconState<Spec>), Err>,
|
||||
) -> Result<V, Err>
|
||||
where
|
||||
Spec: EthSpec,
|
||||
Err: From<BeaconChainError> + From<BeaconStateError>,
|
||||
{
|
||||
let cache_entry = beacon_proposer_cache
|
||||
.lock()
|
||||
.get_or_insert_key(proposal_epoch, shuffling_decision_block);
|
||||
|
||||
// If the cache entry is not initialised, run the code to initialise it inside a OnceCell.
|
||||
// This prevents duplication of work across multiple threads.
|
||||
//
|
||||
// If it is already initialised, then `get_or_try_init` will return immediately without
|
||||
// executing the initialisation code at all.
|
||||
let epoch_block_proposers = cache_entry.get_or_try_init(|| {
|
||||
// Fetch the state on-demand if the required epoch was missing from the cache.
|
||||
// If the caller wants to not compute the state they must return an error here and then
|
||||
// catch it at the call site.
|
||||
let (state_root, mut state) = state_provider()?;
|
||||
|
||||
// Ensure the state can compute proposer duties for `epoch`.
|
||||
ensure_state_can_determine_proposers_for_epoch(
|
||||
&mut state,
|
||||
state_root,
|
||||
proposal_epoch,
|
||||
spec,
|
||||
)?;
|
||||
|
||||
// Sanity check the state.
|
||||
let latest_block_root = state.get_latest_block_root(state_root);
|
||||
let state_decision_block_root = state.proposer_shuffling_decision_root_at_epoch(
|
||||
proposal_epoch,
|
||||
latest_block_root,
|
||||
spec,
|
||||
)?;
|
||||
if state_decision_block_root != shuffling_decision_block {
|
||||
return Err(BeaconChainError::ProposerCacheIncorrectState {
|
||||
state_decision_block_root,
|
||||
requested_decision_block_root: shuffling_decision_block,
|
||||
}
|
||||
.into());
|
||||
}
|
||||
|
||||
let proposers = state.get_beacon_proposer_indices(proposal_epoch, spec)?;
|
||||
|
||||
// Use fork_at_epoch rather than the state's fork, because post-Fulu we may not have
|
||||
// advanced the state completely into the new epoch.
|
||||
let fork = spec.fork_at_epoch(proposal_epoch);
|
||||
|
||||
debug!(
|
||||
?shuffling_decision_block,
|
||||
epoch = %proposal_epoch,
|
||||
"Priming proposer shuffling cache"
|
||||
);
|
||||
|
||||
Ok::<_, Err>(EpochBlockProposers::new(proposal_epoch, fork, proposers))
|
||||
})?;
|
||||
|
||||
// Run the accessor function on the computed epoch proposers.
|
||||
accessor(epoch_block_proposers).map_err(Into::into)
|
||||
}
|
||||
|
||||
/// Compute the proposer duties using the head state without cache.
|
||||
///
|
||||
/// Return:
|
||||
|
||||
@@ -729,7 +729,8 @@ pub struct SignatureVerifiedBlock<T: BeaconChainTypes> {
|
||||
}
|
||||
|
||||
/// Used to await the result of executing payload with an EE.
|
||||
type PayloadVerificationHandle = JoinHandle<Option<Result<PayloadVerificationOutcome, BlockError>>>;
|
||||
pub type PayloadVerificationHandle =
|
||||
JoinHandle<Option<Result<PayloadVerificationOutcome, BlockError>>>;
|
||||
|
||||
/// A wrapper around a `SignedBeaconBlock` that indicates that this block is fully verified and
|
||||
/// ready to import into the `BeaconChain`. The validation includes:
|
||||
|
||||
@@ -1023,6 +1023,7 @@ where
|
||||
)),
|
||||
beacon_proposer_cache,
|
||||
block_times_cache: <_>::default(),
|
||||
envelope_times_cache: <_>::default(),
|
||||
pre_finalization_block_cache: <_>::default(),
|
||||
validator_pubkey_cache: RwLock::new(validator_pubkey_cache),
|
||||
early_attester_cache: <_>::default(),
|
||||
|
||||
197
beacon_node/beacon_chain/src/envelope_times_cache.rs
Normal file
197
beacon_node/beacon_chain/src/envelope_times_cache.rs
Normal file
@@ -0,0 +1,197 @@
|
||||
//! This module provides the `EnvelopeTimesCache` which contains information regarding payload
|
||||
//! envelope timings.
|
||||
//!
|
||||
//! This provides `BeaconChain` and associated functions with access to the timestamps of when a
|
||||
//! payload envelope was observed, verified, executed, and imported.
|
||||
//! This allows for better traceability and allows us to determine the root cause for why an
|
||||
//! envelope was imported late.
|
||||
//! This allows us to distinguish between the following scenarios:
|
||||
//! - The envelope was observed late.
|
||||
//! - Consensus verification was slow.
|
||||
//! - Execution verification was slow.
|
||||
//! - The DB write was slow.
|
||||
|
||||
use eth2::types::{Hash256, Slot};
|
||||
use std::collections::HashMap;
|
||||
use std::time::Duration;
|
||||
|
||||
type BlockRoot = Hash256;
|
||||
|
||||
#[derive(Clone, Default)]
|
||||
pub struct EnvelopeTimestamps {
|
||||
/// When the envelope was first observed (gossip or RPC).
|
||||
pub observed: Option<Duration>,
|
||||
/// When consensus verification (state transition) completed.
|
||||
pub consensus_verified: Option<Duration>,
|
||||
/// When execution layer verification started.
|
||||
pub started_execution: Option<Duration>,
|
||||
/// When execution layer verification completed.
|
||||
pub executed: Option<Duration>,
|
||||
/// When the envelope was imported into the DB.
|
||||
pub imported: Option<Duration>,
|
||||
}
|
||||
|
||||
/// Delay data for envelope processing, computed relative to the slot start time.
|
||||
#[derive(Debug, Default)]
|
||||
pub struct EnvelopeDelays {
|
||||
/// Time after start of slot we saw the envelope.
|
||||
pub observed: Option<Duration>,
|
||||
/// The time it took to complete consensus verification of the envelope.
|
||||
pub consensus_verification_time: Option<Duration>,
|
||||
/// The time it took to complete execution verification of the envelope.
|
||||
pub execution_time: Option<Duration>,
|
||||
/// Time after execution until the envelope was imported.
|
||||
pub imported: Option<Duration>,
|
||||
}
|
||||
|
||||
impl EnvelopeDelays {
|
||||
fn new(times: EnvelopeTimestamps, slot_start_time: Duration) -> EnvelopeDelays {
|
||||
let observed = times
|
||||
.observed
|
||||
.and_then(|observed_time| observed_time.checked_sub(slot_start_time));
|
||||
let consensus_verification_time = times
|
||||
.consensus_verified
|
||||
.and_then(|consensus_verified| consensus_verified.checked_sub(times.observed?));
|
||||
let execution_time = times
|
||||
.executed
|
||||
.and_then(|executed| executed.checked_sub(times.started_execution?));
|
||||
let imported = times
|
||||
.imported
|
||||
.and_then(|imported_time| imported_time.checked_sub(times.executed?));
|
||||
EnvelopeDelays {
|
||||
observed,
|
||||
consensus_verification_time,
|
||||
execution_time,
|
||||
imported,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub struct EnvelopeTimesCacheValue {
|
||||
pub slot: Slot,
|
||||
pub timestamps: EnvelopeTimestamps,
|
||||
pub peer_id: Option<String>,
|
||||
}
|
||||
|
||||
impl EnvelopeTimesCacheValue {
|
||||
fn new(slot: Slot) -> Self {
|
||||
EnvelopeTimesCacheValue {
|
||||
slot,
|
||||
timestamps: Default::default(),
|
||||
peer_id: None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Default)]
|
||||
pub struct EnvelopeTimesCache {
|
||||
pub cache: HashMap<BlockRoot, EnvelopeTimesCacheValue>,
|
||||
}
|
||||
|
||||
impl EnvelopeTimesCache {
|
||||
/// Set the observation time for `block_root` to `timestamp` if `timestamp` is less than
|
||||
/// any previous timestamp at which this envelope was observed.
|
||||
pub fn set_time_observed(
|
||||
&mut self,
|
||||
block_root: BlockRoot,
|
||||
slot: Slot,
|
||||
timestamp: Duration,
|
||||
peer_id: Option<String>,
|
||||
) {
|
||||
let entry = self
|
||||
.cache
|
||||
.entry(block_root)
|
||||
.or_insert_with(|| EnvelopeTimesCacheValue::new(slot));
|
||||
match entry.timestamps.observed {
|
||||
Some(existing) if existing <= timestamp => {
|
||||
// Existing timestamp is earlier, do nothing.
|
||||
}
|
||||
_ => {
|
||||
entry.timestamps.observed = Some(timestamp);
|
||||
entry.peer_id = peer_id;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Set the timestamp for `field` if that timestamp is less than any previously known value.
|
||||
fn set_time_if_less(
|
||||
&mut self,
|
||||
block_root: BlockRoot,
|
||||
slot: Slot,
|
||||
field: impl Fn(&mut EnvelopeTimestamps) -> &mut Option<Duration>,
|
||||
timestamp: Duration,
|
||||
) {
|
||||
let entry = self
|
||||
.cache
|
||||
.entry(block_root)
|
||||
.or_insert_with(|| EnvelopeTimesCacheValue::new(slot));
|
||||
let existing_timestamp = field(&mut entry.timestamps);
|
||||
if existing_timestamp.is_none_or(|prev| timestamp < prev) {
|
||||
*existing_timestamp = Some(timestamp);
|
||||
}
|
||||
}
|
||||
|
||||
pub fn set_time_consensus_verified(
|
||||
&mut self,
|
||||
block_root: BlockRoot,
|
||||
slot: Slot,
|
||||
timestamp: Duration,
|
||||
) {
|
||||
self.set_time_if_less(
|
||||
block_root,
|
||||
slot,
|
||||
|timestamps| &mut timestamps.consensus_verified,
|
||||
timestamp,
|
||||
)
|
||||
}
|
||||
|
||||
pub fn set_time_started_execution(
|
||||
&mut self,
|
||||
block_root: BlockRoot,
|
||||
slot: Slot,
|
||||
timestamp: Duration,
|
||||
) {
|
||||
self.set_time_if_less(
|
||||
block_root,
|
||||
slot,
|
||||
|timestamps| &mut timestamps.started_execution,
|
||||
timestamp,
|
||||
)
|
||||
}
|
||||
|
||||
pub fn set_time_executed(&mut self, block_root: BlockRoot, slot: Slot, timestamp: Duration) {
|
||||
self.set_time_if_less(
|
||||
block_root,
|
||||
slot,
|
||||
|timestamps| &mut timestamps.executed,
|
||||
timestamp,
|
||||
)
|
||||
}
|
||||
|
||||
pub fn set_time_imported(&mut self, block_root: BlockRoot, slot: Slot, timestamp: Duration) {
|
||||
self.set_time_if_less(
|
||||
block_root,
|
||||
slot,
|
||||
|timestamps| &mut timestamps.imported,
|
||||
timestamp,
|
||||
)
|
||||
}
|
||||
|
||||
pub fn get_envelope_delays(
|
||||
&self,
|
||||
block_root: BlockRoot,
|
||||
slot_start_time: Duration,
|
||||
) -> EnvelopeDelays {
|
||||
if let Some(entry) = self.cache.get(&block_root) {
|
||||
EnvelopeDelays::new(entry.timestamps.clone(), slot_start_time)
|
||||
} else {
|
||||
EnvelopeDelays::default()
|
||||
}
|
||||
}
|
||||
|
||||
/// Prune the cache to only store the most recent 2 epochs.
|
||||
pub fn prune(&mut self, current_slot: Slot) {
|
||||
self.cache
|
||||
.retain(|_, entry| entry.slot > current_slot.saturating_sub(64_u64));
|
||||
}
|
||||
}
|
||||
@@ -115,12 +115,18 @@ impl<T: BeaconChainTypes> PayloadNotifier<T> {
|
||||
if let Some(precomputed_status) = self.payload_verification_status {
|
||||
Ok(precomputed_status)
|
||||
} else {
|
||||
notify_new_payload(&self.chain, self.block.message()).await
|
||||
notify_new_payload(
|
||||
&self.chain,
|
||||
self.block.message().tree_hash_root(),
|
||||
self.block.message().slot(),
|
||||
self.block.message().try_into()?,
|
||||
)
|
||||
.await
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Verify that `execution_payload` contained by `block` is considered valid by an execution
|
||||
/// Verify that `execution_payload` associated with `beacon_block_root` is considered valid by an execution
|
||||
/// engine.
|
||||
///
|
||||
/// ## Specification
|
||||
@@ -129,17 +135,21 @@ impl<T: BeaconChainTypes> PayloadNotifier<T> {
|
||||
/// contains a few extra checks by running `partially_verify_execution_payload` first:
|
||||
///
|
||||
/// https://github.com/ethereum/consensus-specs/blob/v1.1.9/specs/bellatrix/beacon-chain.md#notify_new_payload
|
||||
async fn notify_new_payload<T: BeaconChainTypes>(
|
||||
pub async fn notify_new_payload<T: BeaconChainTypes>(
|
||||
chain: &Arc<BeaconChain<T>>,
|
||||
block: BeaconBlockRef<'_, T::EthSpec>,
|
||||
beacon_block_root: Hash256,
|
||||
slot: Slot,
|
||||
new_payload_request: NewPayloadRequest<'_, T::EthSpec>,
|
||||
) -> Result<PayloadVerificationStatus, BlockError> {
|
||||
let execution_layer = chain
|
||||
.execution_layer
|
||||
.as_ref()
|
||||
.ok_or(ExecutionPayloadError::NoExecutionConnection)?;
|
||||
|
||||
let execution_block_hash = block.execution_payload()?.block_hash();
|
||||
let new_payload_response = execution_layer.notify_new_payload(block.try_into()?).await;
|
||||
let execution_block_hash = new_payload_request.execution_payload_ref().block_hash();
|
||||
let new_payload_response = execution_layer
|
||||
.notify_new_payload(new_payload_request.clone())
|
||||
.await;
|
||||
|
||||
match new_payload_response {
|
||||
Ok(status) => match status {
|
||||
@@ -155,10 +165,8 @@ async fn notify_new_payload<T: BeaconChainTypes>(
|
||||
?validation_error,
|
||||
?latest_valid_hash,
|
||||
?execution_block_hash,
|
||||
root = ?block.tree_hash_root(),
|
||||
graffiti = block.body().graffiti().as_utf8_lossy(),
|
||||
proposer_index = block.proposer_index(),
|
||||
slot = %block.slot(),
|
||||
root = ?beacon_block_root,
|
||||
%slot,
|
||||
method = "new_payload",
|
||||
"Invalid execution payload"
|
||||
);
|
||||
@@ -181,11 +189,11 @@ async fn notify_new_payload<T: BeaconChainTypes>(
|
||||
{
|
||||
// This block has not yet been applied to fork choice, so the latest block that was
|
||||
// imported to fork choice was the parent.
|
||||
let latest_root = block.parent_root();
|
||||
let latest_root = new_payload_request.parent_beacon_block_root()?;
|
||||
|
||||
chain
|
||||
.process_invalid_execution_payload(&InvalidationOperation::InvalidateMany {
|
||||
head_block_root: latest_root,
|
||||
head_block_root: *latest_root,
|
||||
always_invalidate_head: false,
|
||||
latest_valid_ancestor: latest_valid_hash,
|
||||
})
|
||||
@@ -200,10 +208,8 @@ async fn notify_new_payload<T: BeaconChainTypes>(
|
||||
warn!(
|
||||
?validation_error,
|
||||
?execution_block_hash,
|
||||
root = ?block.tree_hash_root(),
|
||||
graffiti = block.body().graffiti().as_utf8_lossy(),
|
||||
proposer_index = block.proposer_index(),
|
||||
slot = %block.slot(),
|
||||
root = ?beacon_block_root,
|
||||
%slot,
|
||||
method = "new_payload",
|
||||
"Invalid execution payload block hash"
|
||||
);
|
||||
|
||||
@@ -21,6 +21,7 @@ pub mod custody_context;
|
||||
pub mod data_availability_checker;
|
||||
pub mod data_column_verification;
|
||||
mod early_attester_cache;
|
||||
pub mod envelope_times_cache;
|
||||
mod errors;
|
||||
pub mod events;
|
||||
pub mod execution_payload;
|
||||
@@ -42,6 +43,7 @@ pub mod observed_block_producers;
|
||||
pub mod observed_data_sidecars;
|
||||
pub mod observed_operations;
|
||||
mod observed_slashable;
|
||||
pub mod payload_envelope_verification;
|
||||
pub mod pending_payload_envelopes;
|
||||
pub mod persisted_beacon_chain;
|
||||
pub mod persisted_custody;
|
||||
|
||||
@@ -21,6 +21,44 @@ pub const VALIDATOR_MONITOR_ATTESTATION_SIMULATOR_SOURCE_ATTESTER_HIT_TOTAL: &st
|
||||
pub const VALIDATOR_MONITOR_ATTESTATION_SIMULATOR_SOURCE_ATTESTER_MISS_TOTAL: &str =
|
||||
"validator_monitor_attestation_simulator_source_attester_miss_total";
|
||||
|
||||
/*
|
||||
* Execution Payload Envelope Procsesing
|
||||
*/
|
||||
|
||||
pub static ENVELOPE_PROCESSING_REQUESTS: LazyLock<Result<IntCounter>> = LazyLock::new(|| {
|
||||
try_create_int_counter(
|
||||
"payload_envelope_processing_requests_total",
|
||||
"Count of payload envelopes submitted for processing",
|
||||
)
|
||||
});
|
||||
pub static ENVELOPE_PROCESSING_SUCCESSES: LazyLock<Result<IntCounter>> = LazyLock::new(|| {
|
||||
try_create_int_counter(
|
||||
"payload_envelope_processing_successes_total",
|
||||
"Count of payload envelopes processed without error",
|
||||
)
|
||||
});
|
||||
pub static ENVELOPE_PROCESSING_TIMES: LazyLock<Result<Histogram>> = LazyLock::new(|| {
|
||||
try_create_histogram(
|
||||
"payload_envelope_processing_seconds",
|
||||
"Full runtime of payload envelope processing",
|
||||
)
|
||||
});
|
||||
pub static ENVELOPE_PROCESSING_DB_WRITE: LazyLock<Result<Histogram>> = LazyLock::new(|| {
|
||||
try_create_histogram(
|
||||
"payload_envelope_processing_db_write_seconds",
|
||||
"Time spent writing a newly processed payload envelope and state to DB",
|
||||
)
|
||||
});
|
||||
pub static ENVELOPE_PROCESSING_POST_EXEC_PROCESSING: LazyLock<Result<Histogram>> =
|
||||
LazyLock::new(|| {
|
||||
try_create_histogram_with_buckets(
|
||||
"payload_envelope_processing_post_exec_pre_attestable_seconds",
|
||||
"Time between finishing execution processing and the payload envelope
|
||||
becoming attestable",
|
||||
linear_buckets(0.01, 0.01, 15),
|
||||
)
|
||||
});
|
||||
|
||||
/*
|
||||
* Block Processing
|
||||
*/
|
||||
|
||||
@@ -0,0 +1,142 @@
|
||||
use std::sync::Arc;
|
||||
|
||||
use slot_clock::SlotClock;
|
||||
use state_processing::{
|
||||
VerifySignatures,
|
||||
envelope_processing::{VerifyStateRoot, process_execution_payload_envelope},
|
||||
};
|
||||
use types::{EthSpec, SignedExecutionPayloadEnvelope};
|
||||
|
||||
use crate::{
|
||||
BeaconChain, BeaconChainError, BeaconChainTypes, NotifyExecutionLayer,
|
||||
PayloadVerificationOutcome,
|
||||
block_verification::PayloadVerificationHandle,
|
||||
payload_envelope_verification::{
|
||||
EnvelopeError, EnvelopeImportData, MaybeAvailableEnvelope,
|
||||
gossip_verified_envelope::GossipVerifiedEnvelope, load_snapshot,
|
||||
payload_notifier::PayloadNotifier,
|
||||
},
|
||||
};
|
||||
|
||||
pub trait IntoExecutionPendingEnvelope<T: BeaconChainTypes>: Sized {
|
||||
fn into_execution_pending_envelope(
|
||||
self,
|
||||
chain: &Arc<BeaconChain<T>>,
|
||||
notify_execution_layer: NotifyExecutionLayer,
|
||||
) -> Result<ExecutionPendingEnvelope<T::EthSpec>, EnvelopeError>;
|
||||
|
||||
fn envelope(&self) -> &Arc<SignedExecutionPayloadEnvelope<T::EthSpec>>;
|
||||
}
|
||||
|
||||
pub struct ExecutionPendingEnvelope<E: EthSpec> {
|
||||
pub signed_envelope: MaybeAvailableEnvelope<E>,
|
||||
pub import_data: EnvelopeImportData<E>,
|
||||
pub payload_verification_handle: PayloadVerificationHandle,
|
||||
}
|
||||
|
||||
impl<T: BeaconChainTypes> IntoExecutionPendingEnvelope<T> for GossipVerifiedEnvelope<T> {
|
||||
fn into_execution_pending_envelope(
|
||||
self,
|
||||
chain: &Arc<BeaconChain<T>>,
|
||||
notify_execution_layer: NotifyExecutionLayer,
|
||||
) -> Result<ExecutionPendingEnvelope<T::EthSpec>, EnvelopeError> {
|
||||
let signed_envelope = self.signed_envelope;
|
||||
let envelope = &signed_envelope.message;
|
||||
let payload = &envelope.payload;
|
||||
|
||||
// TODO(gloas)
|
||||
|
||||
// Verify the execution payload is valid
|
||||
let payload_notifier = PayloadNotifier::new(
|
||||
chain.clone(),
|
||||
signed_envelope.clone(),
|
||||
self.block.clone(),
|
||||
notify_execution_layer,
|
||||
)?;
|
||||
let block_root = envelope.beacon_block_root;
|
||||
let slot = self.block.slot();
|
||||
|
||||
let payload_verification_future = async move {
|
||||
let chain = payload_notifier.chain.clone();
|
||||
if let Some(started_execution) = chain.slot_clock.now_duration() {
|
||||
chain
|
||||
.envelope_times_cache
|
||||
.write()
|
||||
.set_time_started_execution(block_root, slot, started_execution);
|
||||
}
|
||||
|
||||
let payload_verification_status = payload_notifier.notify_new_payload().await?;
|
||||
Ok(PayloadVerificationOutcome {
|
||||
payload_verification_status,
|
||||
// This fork is after the merge so it'll never be the merge transition block
|
||||
is_valid_merge_transition_block: false,
|
||||
})
|
||||
};
|
||||
// Spawn the payload verification future as a new task, but don't wait for it to complete.
|
||||
// The `payload_verification_future` will be awaited later to ensure verification completed
|
||||
// successfully.
|
||||
let payload_verification_handle = chain
|
||||
.task_executor
|
||||
.spawn_handle(
|
||||
payload_verification_future,
|
||||
"execution_payload_verification",
|
||||
)
|
||||
.ok_or(BeaconChainError::RuntimeShutdown)?;
|
||||
|
||||
let snapshot = if let Some(snapshot) = self.snapshot {
|
||||
*snapshot
|
||||
} else {
|
||||
load_snapshot(
|
||||
signed_envelope.as_ref(),
|
||||
&chain.canonical_head,
|
||||
&chain.store,
|
||||
)?
|
||||
};
|
||||
let mut state = snapshot.pre_state;
|
||||
|
||||
// All the state modifications are done in envelope_processing
|
||||
process_execution_payload_envelope(
|
||||
&mut state,
|
||||
Some(snapshot.state_root),
|
||||
&signed_envelope,
|
||||
// verify signature already done for GossipVerifiedEnvelope
|
||||
VerifySignatures::False,
|
||||
VerifyStateRoot::True,
|
||||
&chain.spec,
|
||||
)?;
|
||||
|
||||
Ok(ExecutionPendingEnvelope {
|
||||
signed_envelope: MaybeAvailableEnvelope::AvailabilityPending {
|
||||
block_hash: payload.block_hash,
|
||||
envelope: signed_envelope,
|
||||
},
|
||||
import_data: EnvelopeImportData {
|
||||
block_root,
|
||||
block: self.block,
|
||||
post_state: Box::new(state),
|
||||
},
|
||||
payload_verification_handle,
|
||||
})
|
||||
}
|
||||
|
||||
fn envelope(&self) -> &Arc<SignedExecutionPayloadEnvelope<T::EthSpec>> {
|
||||
&self.signed_envelope
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: BeaconChainTypes> IntoExecutionPendingEnvelope<T>
|
||||
for Arc<SignedExecutionPayloadEnvelope<T::EthSpec>>
|
||||
{
|
||||
fn into_execution_pending_envelope(
|
||||
self,
|
||||
chain: &Arc<BeaconChain<T>>,
|
||||
notify_execution_layer: NotifyExecutionLayer,
|
||||
) -> Result<ExecutionPendingEnvelope<T::EthSpec>, EnvelopeError> {
|
||||
GossipVerifiedEnvelope::new(self, &chain.gossip_verification_context())?
|
||||
.into_execution_pending_envelope(chain, notify_execution_layer)
|
||||
}
|
||||
|
||||
fn envelope(&self) -> &Arc<SignedExecutionPayloadEnvelope<T::EthSpec>> {
|
||||
self
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,446 @@
|
||||
use std::sync::Arc;
|
||||
|
||||
use educe::Educe;
|
||||
use parking_lot::{Mutex, RwLock};
|
||||
use store::DatabaseBlock;
|
||||
use tracing::{Span, debug};
|
||||
use types::{
|
||||
ChainSpec, EthSpec, ExecutionPayloadBid, ExecutionPayloadEnvelope, Hash256, SignedBeaconBlock,
|
||||
SignedExecutionPayloadEnvelope, Slot, consts::gloas::BUILDER_INDEX_SELF_BUILD,
|
||||
};
|
||||
|
||||
use crate::{
|
||||
BeaconChain, BeaconChainError, BeaconChainTypes, BeaconStore,
|
||||
beacon_proposer_cache::{self, BeaconProposerCache},
|
||||
canonical_head::CanonicalHead,
|
||||
payload_envelope_verification::{
|
||||
EnvelopeError, EnvelopeProcessingSnapshot, load_snapshot_from_state_root,
|
||||
},
|
||||
validator_pubkey_cache::ValidatorPubkeyCache,
|
||||
};
|
||||
|
||||
/// Bundles only the dependencies needed for gossip verification of execution payload envelopes,
|
||||
/// decoupling `GossipVerifiedEnvelope::new` from the full `BeaconChain`.
|
||||
pub struct GossipVerificationContext<'a, T: BeaconChainTypes> {
|
||||
pub canonical_head: &'a CanonicalHead<T>,
|
||||
pub store: &'a BeaconStore<T>,
|
||||
pub spec: &'a ChainSpec,
|
||||
pub beacon_proposer_cache: &'a Mutex<BeaconProposerCache>,
|
||||
pub validator_pubkey_cache: &'a RwLock<ValidatorPubkeyCache<T>>,
|
||||
pub genesis_validators_root: Hash256,
|
||||
}
|
||||
|
||||
/// Verify that an execution payload envelope is consistent with its beacon block
|
||||
/// and execution bid. This checks:
|
||||
/// - The envelope slot is not prior to finalization
|
||||
/// - The envelope slot matches the block slot
|
||||
/// - The builder index matches the committed bid
|
||||
/// - The payload block hash matches the committed bid
|
||||
pub(crate) fn verify_envelope_consistency<E: EthSpec>(
|
||||
envelope: &ExecutionPayloadEnvelope<E>,
|
||||
block: &SignedBeaconBlock<E>,
|
||||
execution_bid: &ExecutionPayloadBid<E>,
|
||||
latest_finalized_slot: Slot,
|
||||
) -> Result<(), EnvelopeError> {
|
||||
// Check that the envelope's slot isn't from a slot prior
|
||||
// to the latest finalized slot.
|
||||
if envelope.slot < latest_finalized_slot {
|
||||
return Err(EnvelopeError::PriorToFinalization {
|
||||
payload_slot: envelope.slot,
|
||||
latest_finalized_slot,
|
||||
});
|
||||
}
|
||||
|
||||
// Check that the slot of the envelope matches the slot of the parent block.
|
||||
if envelope.slot != block.slot() {
|
||||
return Err(EnvelopeError::SlotMismatch {
|
||||
block: block.slot(),
|
||||
envelope: envelope.slot,
|
||||
});
|
||||
}
|
||||
|
||||
// Builder index matches committed bid.
|
||||
if envelope.builder_index != execution_bid.builder_index {
|
||||
return Err(EnvelopeError::BuilderIndexMismatch {
|
||||
committed_bid: execution_bid.builder_index,
|
||||
envelope: envelope.builder_index,
|
||||
});
|
||||
}
|
||||
|
||||
// The block hash should match the block hash of the execution bid.
|
||||
if envelope.payload.block_hash != execution_bid.block_hash {
|
||||
return Err(EnvelopeError::BlockHashMismatch {
|
||||
committed_bid: execution_bid.block_hash,
|
||||
envelope: envelope.payload.block_hash,
|
||||
});
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// A wrapper around a `SignedExecutionPayloadEnvelope` that indicates it has been approved for re-gossiping on
/// the p2p network.
#[derive(Educe)]
#[educe(Debug(bound = "T: BeaconChainTypes"))]
pub struct GossipVerifiedEnvelope<T: BeaconChainTypes> {
    /// The envelope that passed gossip verification.
    pub signed_envelope: Arc<SignedExecutionPayloadEnvelope<T::EthSpec>>,
    /// The beacon block the envelope refers to (looked up via its `beacon_block_root`).
    pub block: Arc<SignedBeaconBlock<T::EthSpec>>,
    /// Pre-state snapshot loaded during verification, when a state load was required.
    /// `None` when the self-build fast path was served entirely from caches.
    pub snapshot: Option<Box<EnvelopeProcessingSnapshot<T::EthSpec>>>,
}
|
||||
|
||||
impl<T: BeaconChainTypes> GossipVerifiedEnvelope<T> {
    /// Perform gossip-time verification of `signed_envelope`.
    ///
    /// Checks, in order:
    /// 1. The referenced beacon block is known to fork choice.
    /// 2. The envelope is consistent with the block and its committed bid
    ///    (see `verify_envelope_consistency`).
    /// 3. The envelope signature is valid — via the proposer/pubkey caches for
    ///    self-built envelopes, or via a state load for external builders.
    ///
    /// Returns the verified envelope together with the block and (if a state
    /// load was needed) a processing snapshot for reuse downstream.
    pub fn new(
        signed_envelope: Arc<SignedExecutionPayloadEnvelope<T::EthSpec>>,
        ctx: &GossipVerificationContext<'_, T>,
    ) -> Result<Self, EnvelopeError> {
        let envelope = &signed_envelope.message;
        let beacon_block_root = envelope.beacon_block_root;

        // Check that we've seen the beacon block for this envelope and that it passes validation.
        // TODO(EIP-7732): We might need some type of status table in order to differentiate between:
        // If we have a block_processing_table, we could have a Processed(Bid, bool) state that is only
        // entered post adding to fork choice. That way, we could potentially need only a single call to make
        // sure the block is valid and to do all consequent checks with the bid
        //
        // 1. Blocks we haven't seen (IGNORE), and
        // 2. Blocks we've seen that are invalid (REJECT).
        //
        // Presently these two cases are conflated.
        let fork_choice_read_lock = ctx.canonical_head.fork_choice_read_lock();
        let Some(proto_block) = fork_choice_read_lock.get_block(&beacon_block_root) else {
            return Err(EnvelopeError::BlockRootUnknown {
                block_root: beacon_block_root,
            });
        };

        // Release the fork choice lock early; everything below only needs the
        // copied `proto_block` and the cached head.
        drop(fork_choice_read_lock);

        let latest_finalized_slot = ctx
            .canonical_head
            .cached_head()
            .finalized_checkpoint()
            .epoch
            .start_slot(T::EthSpec::slots_per_epoch());

        // TODO(EIP-7732): check that we haven't seen another valid `SignedExecutionPayloadEnvelope`
        // for this block root from this builder - envelope status table check
        // A blinded block is not sufficient here: the consistency check below needs
        // the committed bid from the full block body.
        let block = match ctx.store.try_get_full_block(&beacon_block_root)? {
            Some(DatabaseBlock::Full(block)) => Arc::new(block),
            Some(DatabaseBlock::Blinded(_)) | None => {
                return Err(EnvelopeError::from(BeaconChainError::MissingBeaconBlock(
                    beacon_block_root,
                )));
            }
        };
        let execution_bid = &block
            .message()
            .body()
            .signed_execution_payload_bid()?
            .message;

        verify_envelope_consistency(envelope, &block, execution_bid, latest_finalized_slot)?;

        // Verify the envelope signature.
        //
        // For self-built envelopes, we can use the proposer cache for the fork and the
        // validator pubkey cache for the proposer's pubkey, avoiding a state load from disk.
        // For external builder envelopes, we must load the state to access the builder registry.
        let builder_index = envelope.builder_index;
        let block_slot = envelope.slot;
        let block_epoch = block_slot.epoch(T::EthSpec::slots_per_epoch());
        let proposer_shuffling_decision_block =
            proto_block.proposer_shuffling_root_for_child_block(block_epoch, ctx.spec);

        let (signature_is_valid, opt_snapshot) = if builder_index == BUILDER_INDEX_SELF_BUILD {
            // Fast path: self-built envelopes can be verified without loading the state.
            let mut opt_snapshot = None;
            let proposer = beacon_proposer_cache::with_proposer_cache(
                ctx.beacon_proposer_cache,
                ctx.spec,
                proposer_shuffling_decision_block,
                block_epoch,
                |proposers| proposers.get_slot::<T::EthSpec>(block_slot),
                // Cache-miss fallback: load the pre-state from disk, and stash a
                // clone of the snapshot so it can be returned to the caller for
                // reuse during envelope processing.
                || {
                    debug!(
                        %beacon_block_root,
                        "Proposer shuffling cache miss for envelope verification"
                    );
                    let snapshot = load_snapshot_from_state_root::<T>(
                        beacon_block_root,
                        proto_block.state_root,
                        ctx.store,
                    )?;
                    opt_snapshot = Some(Box::new(snapshot.clone()));
                    Ok::<_, EnvelopeError>((snapshot.state_root, snapshot.pre_state))
                },
            )?;
            let expected_proposer = proposer.index;
            let fork = proposer.fork;

            // The block's proposer must match our locally computed shuffling,
            // otherwise the pubkey we look up below would be the wrong one.
            if block.message().proposer_index() != expected_proposer as u64 {
                return Err(EnvelopeError::IncorrectBlockProposer {
                    block: block.message().proposer_index(),
                    local_shuffling: expected_proposer as u64,
                });
            }

            let pubkey_cache = ctx.validator_pubkey_cache.read();
            let pubkey = pubkey_cache
                .get(block.message().proposer_index() as usize)
                .ok_or_else(|| EnvelopeError::UnknownValidator {
                    builder_index: block.message().proposer_index(),
                })?;
            let is_valid = signed_envelope.verify_signature(
                pubkey,
                &fork,
                ctx.genesis_validators_root,
                ctx.spec,
            );
            (is_valid, opt_snapshot)
        } else {
            // TODO(gloas) if we implement a builder pubkey cache, we'll need to use it here.
            // External builder: must load the state to get the builder pubkey.
            let snapshot = load_snapshot_from_state_root::<T>(
                beacon_block_root,
                proto_block.state_root,
                ctx.store,
            )?;
            let is_valid =
                signed_envelope.verify_signature_with_state(&snapshot.pre_state, ctx.spec)?;
            (is_valid, Some(Box::new(snapshot)))
        };

        if !signature_is_valid {
            return Err(EnvelopeError::BadSignature);
        }

        Ok(Self {
            signed_envelope,
            block,
            snapshot: opt_snapshot,
        })
    }

    /// Returns a cheap (`Arc` clone) handle to the verified envelope.
    pub fn envelope_cloned(&self) -> Arc<SignedExecutionPayloadEnvelope<T::EthSpec>> {
        self.signed_envelope.clone()
    }
}
|
||||
|
||||
impl<T: BeaconChainTypes> BeaconChain<T> {
    /// Build a `GossipVerificationContext` from this `BeaconChain`, borrowing only
    /// the fields envelope gossip verification needs.
    pub fn gossip_verification_context(&self) -> GossipVerificationContext<'_, T> {
        GossipVerificationContext {
            canonical_head: &self.canonical_head,
            store: &self.store,
            spec: &self.spec,
            beacon_proposer_cache: &self.beacon_proposer_cache,
            validator_pubkey_cache: &self.validator_pubkey_cache,
            genesis_validators_root: self.genesis_validators_root,
        }
    }

    /// Returns `Ok(GossipVerifiedEnvelope)` if the supplied `envelope` should be forwarded onto the
    /// gossip network. The envelope is not imported into the chain, it is just partially verified.
    ///
    /// The returned `GossipVerifiedEnvelope` should be provided to `Self::process_execution_payload_envelope` immediately
    /// after it is returned, unless some other circumstance decides it should not be imported at
    /// all.
    ///
    /// ## Errors
    ///
    /// Returns an `Err` if the given envelope was invalid, or an error was encountered during
    /// verification (including runtime shutdown or task-join failure).
    pub async fn verify_envelope_for_gossip(
        self: &Arc<Self>,
        envelope: Arc<SignedExecutionPayloadEnvelope<T::EthSpec>>,
    ) -> Result<GossipVerifiedEnvelope<T>, EnvelopeError> {
        let chain = self.clone();
        // Capture the current tracing span so that log events emitted inside the
        // blocking task stay attached to this request's span.
        let span = Span::current();
        // Verification may hit the database (block/state loads), so run it on the
        // blocking thread pool rather than on the async executor.
        self.task_executor
            .clone()
            .spawn_blocking_handle(
                move || {
                    let _guard = span.enter();
                    let slot = envelope.slot();
                    let beacon_block_root = envelope.message.beacon_block_root;

                    let ctx = chain.gossip_verification_context();
                    match GossipVerifiedEnvelope::new(envelope, &ctx) {
                        Ok(verified) => {
                            debug!(
                                %slot,
                                ?beacon_block_root,
                                "Successfully verified gossip envelope"
                            );

                            Ok(verified)
                        }
                        Err(e) => {
                            debug!(
                                error = e.to_string(),
                                ?beacon_block_root,
                                %slot,
                                "Rejected gossip envelope"
                            );

                            Err(e)
                        }
                    }
                },
                "gossip_envelope_verification_handle",
            )
            // `None` indicates the runtime is shutting down.
            .ok_or(BeaconChainError::RuntimeShutdown)?
            .await
            .map_err(BeaconChainError::TokioJoin)?
    }
}
|
||||
|
||||
#[cfg(test)]
mod tests {
    use std::marker::PhantomData;

    use bls::Signature;
    use ssz_types::VariableList;
    use types::{
        BeaconBlock, BeaconBlockBodyGloas, BeaconBlockGloas, Eth1Data, ExecutionBlockHash,
        ExecutionPayloadBid, ExecutionPayloadEnvelope, ExecutionPayloadGloas, ExecutionRequests,
        Graffiti, Hash256, MinimalEthSpec, SignedBeaconBlock, SignedExecutionPayloadBid, Slot,
        SyncAggregate,
    };

    use super::verify_envelope_consistency;
    use crate::payload_envelope_verification::EnvelopeError;

    type E = MinimalEthSpec;

    /// Build a minimal envelope carrying the given slot, builder and payload block hash.
    fn make_envelope(
        slot: Slot,
        builder_index: u64,
        block_hash: ExecutionBlockHash,
    ) -> ExecutionPayloadEnvelope<E> {
        let payload = ExecutionPayloadGloas {
            block_hash,
            ..ExecutionPayloadGloas::default()
        };
        ExecutionPayloadEnvelope {
            payload,
            execution_requests: ExecutionRequests::default(),
            builder_index,
            beacon_block_root: Hash256::ZERO,
            slot,
            state_root: Hash256::ZERO,
        }
    }

    /// Build a mostly-empty Gloas block at `slot`, signed with an empty signature.
    fn make_block(slot: Slot) -> SignedBeaconBlock<E> {
        let body = BeaconBlockBodyGloas {
            randao_reveal: Signature::empty(),
            eth1_data: Eth1Data {
                deposit_root: Hash256::ZERO,
                block_hash: Hash256::ZERO,
                deposit_count: 0,
            },
            graffiti: Graffiti::default(),
            proposer_slashings: VariableList::empty(),
            attester_slashings: VariableList::empty(),
            attestations: VariableList::empty(),
            deposits: VariableList::empty(),
            voluntary_exits: VariableList::empty(),
            sync_aggregate: SyncAggregate::empty(),
            bls_to_execution_changes: VariableList::empty(),
            signed_execution_payload_bid: SignedExecutionPayloadBid::empty(),
            payload_attestations: VariableList::empty(),
            _phantom: PhantomData,
        };
        let block = BeaconBlock::Gloas(BeaconBlockGloas {
            slot,
            proposer_index: 0,
            parent_root: Hash256::ZERO,
            state_root: Hash256::ZERO,
            body,
        });
        SignedBeaconBlock::from_block(block, Signature::empty())
    }

    /// Build a default bid populated with only the fields the consistency check reads.
    fn make_bid(builder_index: u64, block_hash: ExecutionBlockHash) -> ExecutionPayloadBid<E> {
        ExecutionPayloadBid {
            builder_index,
            block_hash,
            ..ExecutionPayloadBid::default()
        }
    }

    #[test]
    fn test_valid_envelope() {
        let slot = Slot::new(10);
        let hash = ExecutionBlockHash::repeat_byte(0xaa);

        let envelope = make_envelope(slot, 5, hash);
        let block = make_block(slot);
        let bid = make_bid(5, hash);

        assert!(verify_envelope_consistency::<E>(&envelope, &block, &bid, Slot::new(0)).is_ok());
    }

    #[test]
    fn test_prior_to_finalization() {
        let slot = Slot::new(5);
        let hash = ExecutionBlockHash::repeat_byte(0xbb);

        // Finalization is ahead of the envelope's slot.
        let result = verify_envelope_consistency::<E>(
            &make_envelope(slot, 1, hash),
            &make_block(slot),
            &make_bid(1, hash),
            Slot::new(10),
        );
        assert!(matches!(
            result,
            Err(EnvelopeError::PriorToFinalization { .. })
        ));
    }

    #[test]
    fn test_slot_mismatch() {
        let hash = ExecutionBlockHash::repeat_byte(0xcc);

        // Envelope slot differs from the block slot.
        let result = verify_envelope_consistency::<E>(
            &make_envelope(Slot::new(10), 1, hash),
            &make_block(Slot::new(20)),
            &make_bid(1, hash),
            Slot::new(0),
        );
        assert!(matches!(result, Err(EnvelopeError::SlotMismatch { .. })));
    }

    #[test]
    fn test_builder_index_mismatch() {
        let slot = Slot::new(10);
        let hash = ExecutionBlockHash::repeat_byte(0xdd);

        // Envelope claims builder 1, bid committed builder 2.
        let result = verify_envelope_consistency::<E>(
            &make_envelope(slot, 1, hash),
            &make_block(slot),
            &make_bid(2, hash),
            Slot::new(0),
        );
        assert!(matches!(
            result,
            Err(EnvelopeError::BuilderIndexMismatch { .. })
        ));
    }

    #[test]
    fn test_block_hash_mismatch() {
        let slot = Slot::new(10);

        // Payload block hash differs from the committed bid's block hash.
        let result = verify_envelope_consistency::<E>(
            &make_envelope(slot, 1, ExecutionBlockHash::repeat_byte(0xee)),
            &make_block(slot),
            &make_bid(1, ExecutionBlockHash::repeat_byte(0xff)),
            Slot::new(0),
        );
        assert!(matches!(
            result,
            Err(EnvelopeError::BlockHashMismatch { .. })
        ));
    }
}
|
||||
@@ -0,0 +1,385 @@
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
|
||||
use fork_choice::PayloadVerificationStatus;
|
||||
use logging::crit;
|
||||
use slot_clock::SlotClock;
|
||||
use store::StoreOp;
|
||||
use tracing::{debug, error, info, info_span, instrument, warn};
|
||||
use types::{BeaconState, BlockImportSource, Hash256, SignedBeaconBlock, Slot};
|
||||
|
||||
use super::{
|
||||
AvailableEnvelope, AvailableExecutedEnvelope, EnvelopeError, EnvelopeImportData,
|
||||
ExecutedEnvelope, IntoExecutionPendingEnvelope,
|
||||
};
|
||||
use crate::{
|
||||
AvailabilityProcessingStatus, BeaconChain, BeaconChainError, BeaconChainTypes,
|
||||
NotifyExecutionLayer,
|
||||
block_verification_types::{AsBlock, AvailableBlockData},
|
||||
metrics,
|
||||
payload_envelope_verification::ExecutionPendingEnvelope,
|
||||
validator_monitor::{get_slot_delay_ms, timestamp_now},
|
||||
};
|
||||
|
||||
impl<T: BeaconChainTypes> BeaconChain<T> {
    /// Returns `Ok(block_root)` if the given `unverified_envelope` was successfully verified and
    /// imported into the chain.
    ///
    /// Items that implement `IntoExecutionPendingEnvelope` include:
    ///
    /// - `GossipVerifiedEnvelope`
    /// - TODO(gloas) implement for envelopes received over RPC
    ///
    /// ## Errors
    ///
    /// Returns an `Err` if the given block was invalid, or an error was encountered during
    /// verification.
    #[instrument(skip_all, fields(block_root = ?block_root, block_source = %block_source))]
    pub async fn process_execution_payload_envelope<P: IntoExecutionPendingEnvelope<T>>(
        self: &Arc<Self>,
        block_root: Hash256,
        unverified_envelope: P,
        notify_execution_layer: NotifyExecutionLayer,
        block_source: BlockImportSource,
        publish_fn: impl FnOnce() -> Result<(), EnvelopeError>,
    ) -> Result<AvailabilityProcessingStatus, EnvelopeError> {
        let block_slot = unverified_envelope.envelope().slot();

        // Set observed time if not already set. Usually this should be set by gossip or RPC,
        // but just in case we set it again here (useful for tests).
        if let Some(seen_timestamp) = self.slot_clock.now_duration() {
            self.envelope_times_cache.write().set_time_observed(
                block_root,
                block_slot,
                seen_timestamp,
                None,
            );
        }

        // TODO(gloas) insert the pre-executed envelope into some type of cache.

        // Timer is stopped on drop, covering the whole of processing.
        let _full_timer = metrics::start_timer(&metrics::ENVELOPE_PROCESSING_TIMES);

        metrics::inc_counter(&metrics::ENVELOPE_PROCESSING_REQUESTS);

        // A small closure to group the verification and import errors.
        let chain = self.clone();
        let import_envelope = async move {
            let execution_pending = unverified_envelope
                .into_execution_pending_envelope(&chain, notify_execution_layer)?;
            // Publish only after consensus verification has passed, so we never
            // re-gossip an envelope that failed our own checks.
            publish_fn()?;

            // Record the time it took to complete consensus verification.
            if let Some(timestamp) = chain.slot_clock.now_duration() {
                chain
                    .envelope_times_cache
                    .write()
                    .set_time_consensus_verified(block_root, block_slot, timestamp);
            }

            let envelope_times_cache = chain.envelope_times_cache.clone();
            let slot_clock = chain.slot_clock.clone();

            let executed_envelope = chain
                .into_executed_payload_envelope(execution_pending)
                .await
                .inspect_err(|_| {
                    // TODO(gloas) If the envelope fails execution for whatever reason (e.g. engine offline),
                    // and we keep it in the cache, then the node will NOT perform lookup and
                    // reprocess this block until the block is evicted from DA checker, causing the
                    // chain to get stuck temporarily if the block is canonical. Therefore we remove
                    // it from the cache if execution fails.
                })?;

            // Record the time it took to wait for execution layer verification.
            if let Some(timestamp) = slot_clock.now_duration() {
                envelope_times_cache
                    .write()
                    .set_time_executed(block_root, block_slot, timestamp);
            }

            match executed_envelope {
                ExecutedEnvelope::Available(envelope) => {
                    self.import_available_execution_payload_envelope(Box::new(envelope))
                        .await
                }
                ExecutedEnvelope::AvailabilityPending() => Err(EnvelopeError::InternalError(
                    "Pending payload envelope not yet implemented".to_owned(),
                )),
            }
        };

        // Verify and import the block.
        match import_envelope.await {
            // The block was successfully verified and imported. Yay.
            Ok(status @ AvailabilityProcessingStatus::Imported(block_root)) => {
                info!(
                    ?block_root,
                    %block_slot,
                    source = %block_source,
                    "Execution payload envelope imported"
                );

                metrics::inc_counter(&metrics::ENVELOPE_PROCESSING_SUCCESSES);

                Ok(status)
            }
            Ok(status @ AvailabilityProcessingStatus::MissingComponents(slot, block_root)) => {
                debug!(?block_root, %slot, "Beacon block awaiting blobs");

                Ok(status)
            }
            Err(EnvelopeError::BeaconChainError(e)) => {
                match e.as_ref() {
                    // A join error simply means the task was cancelled (e.g. shutdown),
                    // which is expected and not worth a `crit!`.
                    BeaconChainError::TokioJoin(e) => {
                        debug!(
                            error = ?e,
                            "Envelope processing cancelled"
                        );
                    }
                    _ => {
                        // There was an error whilst attempting to verify and import the block. The block might
                        // be partially verified or partially imported.
                        crit!(
                            error = ?e,
                            "Envelope processing error"
                        );
                    }
                };
                Err(EnvelopeError::BeaconChainError(e))
            }
            // The block failed verification.
            Err(other) => {
                warn!(
                    reason = other.to_string(),
                    "Execution payload envelope rejected"
                );
                Err(other)
            }
        }
    }

    /// Accepts a fully-verified payload envelope and awaits on its payload verification handle to
    /// get a fully `ExecutedEnvelope`.
    ///
    /// An error is returned if the verification handle couldn't be awaited.
    #[instrument(skip_all, level = "debug")]
    pub async fn into_executed_payload_envelope(
        self: Arc<Self>,
        pending_envelope: ExecutionPendingEnvelope<T::EthSpec>,
    ) -> Result<ExecutedEnvelope<T::EthSpec>, EnvelopeError> {
        let ExecutionPendingEnvelope {
            signed_envelope,
            import_data,
            payload_verification_handle,
        } = pending_envelope;

        // Outer `Err` => the task panicked or was cancelled; `None` => runtime
        // shutdown; inner `?` propagates the actual verification result.
        let payload_verification_outcome = payload_verification_handle
            .await
            .map_err(BeaconChainError::TokioJoin)?
            .ok_or(BeaconChainError::RuntimeShutdown)??;

        Ok(ExecutedEnvelope::new(
            signed_envelope,
            import_data,
            payload_verification_outcome,
        ))
    }

    /// Imports an executed, fully-available envelope by dispatching the (blocking)
    /// database import to the blocking thread pool.
    #[instrument(skip_all)]
    pub async fn import_available_execution_payload_envelope(
        self: &Arc<Self>,
        envelope: Box<AvailableExecutedEnvelope<T::EthSpec>>,
    ) -> Result<AvailabilityProcessingStatus, EnvelopeError> {
        let AvailableExecutedEnvelope {
            envelope,
            import_data,
            payload_verification_outcome,
        } = *envelope;

        let EnvelopeImportData {
            block_root,
            block,
            post_state,
        } = import_data;

        let block_root = {
            // Capture the current span before moving into the blocking task
            let current_span = tracing::Span::current();
            let chain = self.clone();
            self.spawn_blocking_handle(
                move || {
                    // Enter the captured span in the blocking thread
                    let _guard = current_span.enter();
                    chain.import_execution_payload_envelope(
                        envelope,
                        block_root,
                        *post_state,
                        payload_verification_outcome.payload_verification_status,
                        block,
                    )
                },
                "payload_verification_handle",
            )
            .await??
        };

        Ok(AvailabilityProcessingStatus::Imported(block_root))
    }

    /// Accepts a fully-verified and available envelope and imports it into the chain without performing any
    /// additional verification.
    ///
    /// An error is returned if the envelope was unable to be imported. It may be partially imported
    /// (i.e., this function is not atomic).
    #[allow(clippy::too_many_arguments)]
    #[instrument(skip_all)]
    fn import_execution_payload_envelope(
        &self,
        signed_envelope: AvailableEnvelope<T::EthSpec>,
        block_root: Hash256,
        state: BeaconState<T::EthSpec>,
        _payload_verification_status: PayloadVerificationStatus,
        parent_block: Arc<SignedBeaconBlock<T::EthSpec>>,
    ) -> Result<Hash256, EnvelopeError> {
        // Everything in this initial section is on the hot path for processing the envelope.

        let post_exec_timer =
            metrics::start_timer(&metrics::ENVELOPE_PROCESSING_POST_EXEC_PROCESSING);

        // Check the payload's parent block against the weak subjectivity checkpoint.
        self.check_block_against_weak_subjectivity_checkpoint(
            parent_block.message(),
            block_root,
            &state,
        )?;

        // Take an upgradable read lock on fork choice so we can check if this block has already
        // been imported. We don't want to repeat work importing a block that is already imported.
        // NOTE(review): the check below rejects envelopes whose block is *absent* from fork
        // choice; it does not detect an already-imported envelope — confirm intent.
        let fork_choice_reader = self.canonical_head.fork_choice_upgradable_read_lock();
        if !fork_choice_reader.contains_block(&block_root) {
            return Err(EnvelopeError::BlockRootUnknown { block_root });
        }

        // TODO(gloas) no fork choice logic yet
        // Take an exclusive write-lock on fork choice. It's very important to prevent deadlocks by
        // avoiding taking other locks whilst holding this lock.
        // let fork_choice = parking_lot::RwLockUpgradableReadGuard::upgrade(fork_choice_reader);

        // TODO(gloas) Do we need this check? Do not import a block that doesn't descend from the finalized root.
        // let signed_block = check_block_is_finalized_checkpoint_or_descendant(self, &fork_choice, signed_block)?;

        // TODO(gloas) Do we want to use an early attester cache like mechanism for payload envelopes?
        // TODO(gloas) emit SSE event if the payload became the new head payload
        drop(post_exec_timer);

        // It is important NOT to return errors here before the database commit, because the envelope
        // has already been added to fork choice and the database would be left in an inconsistent
        // state if we returned early without committing. In other words, an error here would
        // corrupt the node's database permanently.

        // Store the envelope and its state, and execute the confirmation batch for the intermediate
        // states, which will delete their temporary flags.
        // If the write fails, revert fork choice to the version from disk, else we can
        // end up with envelopes in fork choice that are missing from disk.
        // See https://github.com/sigp/lighthouse/issues/2028
        let (signed_envelope, columns) = signed_envelope.deconstruct();

        let mut ops = vec![];

        match self.get_blobs_or_columns_store_op(
            block_root,
            signed_envelope.slot(),
            AvailableBlockData::DataColumns(columns),
        ) {
            Ok(Some(blobs_or_columns_store_op)) => {
                ops.push(blobs_or_columns_store_op);
            }
            // Nothing to persist for the data columns.
            Ok(None) => {}
            Err(e) => {
                error!(
                    msg = "Restoring fork choice from disk",
                    error = &e,
                    ?block_root,
                    "Failed to store data columns into the database"
                );
                // TODO(gloas) implement failed write handling to fork choice
                // let _ = self.handle_import_block_db_write_error(fork_choice);
                return Err(EnvelopeError::InternalError(e));
            }
        }

        let db_write_timer = metrics::start_timer(&metrics::ENVELOPE_PROCESSING_DB_WRITE);

        ops.push(StoreOp::PutPayloadEnvelope(
            block_root,
            signed_envelope.clone(),
        ));
        // The post-envelope state is stored under the envelope's advertised state root.
        ops.push(StoreOp::PutState(
            signed_envelope.message.state_root,
            &state,
        ));

        let db_span = info_span!("persist_payloads_and_blobs").entered();

        if let Err(e) = self.store.do_atomically_with_block_and_blobs_cache(ops) {
            error!(
                msg = "Restoring fork choice from disk",
                error = ?e,
                "Database write failed!"
            );
            // NOTE(review): the failure is logged but NOT returned, so the envelope is
            // reported as imported even though it was not persisted — confirm this is
            // intentional pending the TODO below.
            // TODO(gloas) handle db write failure
            // return Err(self
            //     .handle_import_block_db_write_error(fork_choice)
            //     .err()
            //     .unwrap_or(e.into()));
        }

        drop(db_span);

        // TODO(gloas) drop fork choice lock
        // The fork choice write-lock is dropped *after* the on-disk database has been updated.
        // This prevents inconsistency between the two at the expense of concurrency.
        // drop(fork_choice);

        // We're declaring the envelope "imported" at this point, since fork choice and the DB know
        // about it.
        let envelope_time_imported = timestamp_now();

        // TODO(gloas) depending on what happens with light clients
        // we might need to do some light client related computations here

        metrics::stop_timer(db_write_timer);
        metrics::inc_counter(&metrics::ENVELOPE_PROCESSING_SUCCESSES);

        self.import_envelope_update_metrics_and_events(
            block_root,
            signed_envelope.slot(),
            envelope_time_imported,
        );

        Ok(block_root)
    }

    /// Record the import time of an envelope in the envelope times cache (skipping
    /// stale envelopes encountered during sync) and, eventually, emit events.
    fn import_envelope_update_metrics_and_events(
        &self,
        block_root: Hash256,
        envelope_slot: Slot,
        envelope_time_imported: Duration,
    ) {
        let envelope_delay_total =
            get_slot_delay_ms(envelope_time_imported, envelope_slot, &self.slot_clock);

        // Do not write to the cache for envelopes older than 2 epochs, this helps reduce writes
        // to the cache during sync.
        if envelope_delay_total < self.slot_clock.slot_duration().saturating_mul(64) {
            self.envelope_times_cache.write().set_time_imported(
                block_root,
                envelope_slot,
                envelope_time_imported,
            );
        }

        // TODO(gloas) emit SSE event for envelope import (similar to SseBlock for blocks).
    }
}
|
||||
@@ -0,0 +1,335 @@
|
||||
//! The incremental processing steps (e.g., signatures verified but not the state transition) is
|
||||
//! represented as a sequence of wrapper-types around the block. There is a linear progression of
|
||||
//! types, starting at a `SignedExecutionPayloadEnvelope` and finishing with an `AvailableExecutedEnvelope` (see
|
||||
//! diagram below).
|
||||
//!
|
||||
//! // TODO(gloas) we might want to update this diagram to include `AvailableExecutedEnvelope`
|
||||
//! ```ignore
|
||||
//! START
|
||||
//! |
|
||||
//! ▼
|
||||
//! SignedExecutionPayloadEnvelope
|
||||
//! |
|
||||
//! |---------------
|
||||
//! | |
|
||||
//! | ▼
|
||||
//! | GossipVerifiedEnvelope
|
||||
//! | |
|
||||
//! |---------------
|
||||
//! |
|
||||
//! ▼
|
||||
//! ExecutionPendingEnvelope
|
||||
//! |
|
||||
//! await
|
||||
//! |
|
||||
//! ▼
|
||||
//! END
|
||||
//!
|
||||
//! ```
|
||||
|
||||
use std::sync::Arc;
|
||||
|
||||
use store::Error as DBError;
|
||||
|
||||
use state_processing::{BlockProcessingError, envelope_processing::EnvelopeProcessingError};
|
||||
use tracing::instrument;
|
||||
use types::{
|
||||
BeaconState, BeaconStateError, ChainSpec, DataColumnSidecarList, EthSpec, ExecutionBlockHash,
|
||||
ExecutionPayloadEnvelope, Hash256, SignedBeaconBlock, SignedExecutionPayloadEnvelope, Slot,
|
||||
};
|
||||
|
||||
use crate::{
|
||||
BeaconChainError, BeaconChainTypes, BeaconStore, BlockError, ExecutionPayloadError,
|
||||
PayloadVerificationOutcome, canonical_head::CanonicalHead,
|
||||
};
|
||||
|
||||
pub mod execution_pending_envelope;
|
||||
pub mod gossip_verified_envelope;
|
||||
pub mod import;
|
||||
mod payload_notifier;
|
||||
|
||||
pub use execution_pending_envelope::{ExecutionPendingEnvelope, IntoExecutionPendingEnvelope};
|
||||
|
||||
/// The data required to import a payload envelope after execution.
#[derive(PartialEq)]
pub struct EnvelopeImportData<E: EthSpec> {
    /// Root of the beacon block this envelope belongs to.
    pub block_root: Hash256,
    /// The beacon block the envelope was produced for.
    pub block: Arc<SignedBeaconBlock<E>>,
    /// State resulting from envelope processing, stored under the envelope's
    /// state root at import time (boxed to keep this struct small).
    pub post_state: Box<BeaconState<E>>,
}
|
||||
|
||||
/// An envelope whose data (data columns) is fully available.
#[derive(Debug)]
#[allow(dead_code)]
pub struct AvailableEnvelope<E: EthSpec> {
    /// Block hash of the execution payload carried by this envelope.
    execution_block_hash: ExecutionBlockHash,
    /// The signed envelope itself.
    envelope: Arc<SignedExecutionPayloadEnvelope<E>>,
    /// The data column sidecars that make this envelope's data available.
    columns: DataColumnSidecarList<E>,
    /// Timestamp at which this block first became available (UNIX timestamp, time since 1970).
    columns_available_timestamp: Option<std::time::Duration>,
    /// Handle to the chain spec.
    pub spec: Arc<ChainSpec>,
}
|
||||
|
||||
impl<E: EthSpec> AvailableEnvelope<E> {
|
||||
pub fn message(&self) -> &ExecutionPayloadEnvelope<E> {
|
||||
&self.envelope.message
|
||||
}
|
||||
|
||||
#[allow(clippy::type_complexity)]
|
||||
pub fn deconstruct(
|
||||
self,
|
||||
) -> (
|
||||
Arc<SignedExecutionPayloadEnvelope<E>>,
|
||||
DataColumnSidecarList<E>,
|
||||
) {
|
||||
let AvailableEnvelope {
|
||||
envelope, columns, ..
|
||||
} = self;
|
||||
(envelope, columns)
|
||||
}
|
||||
}
|
||||
|
||||
/// An envelope that either has all of its required data or is still waiting on it.
pub enum MaybeAvailableEnvelope<E: EthSpec> {
    /// All required data is present; the envelope can be imported.
    Available(AvailableEnvelope<E>),
    /// The envelope is still missing data required to consider it available.
    AvailabilityPending {
        /// Execution block hash of the pending payload.
        block_hash: ExecutionBlockHash,
        /// The signed envelope awaiting its data.
        envelope: Arc<SignedExecutionPayloadEnvelope<E>>,
    },
}
|
||||
|
||||
/// This snapshot is to be used for verifying an envelope of the block.
#[derive(Debug, Clone)]
pub struct EnvelopeProcessingSnapshot<E: EthSpec> {
    /// This state is equivalent to the `self.beacon_block.state_root()` before applying the envelope.
    pub pre_state: BeaconState<E>,
    /// Root of `pre_state`.
    pub state_root: Hash256,
    /// Root of the beacon block the envelope refers to.
    pub beacon_block_root: Hash256,
}
|
||||
|
||||
/// A payload envelope that has gone through processing checks and execution by an EL client.
/// This envelope hasn't necessarily completed data availability checks.
///
/// It contains 2 variants:
/// 1. `Available`: This envelope has been executed and also contains all data to consider it
///    fully available.
/// 2. `AvailabilityPending`: This envelope hasn't received all required data columns to
///    consider it fully available.
pub enum ExecutedEnvelope<E: EthSpec> {
    Available(AvailableExecutedEnvelope<E>),
    // TODO(gloas) implement availability pending
    AvailabilityPending(),
}
|
||||
|
||||
impl<E: EthSpec> ExecutedEnvelope<E> {
|
||||
pub fn new(
|
||||
envelope: MaybeAvailableEnvelope<E>,
|
||||
import_data: EnvelopeImportData<E>,
|
||||
payload_verification_outcome: PayloadVerificationOutcome,
|
||||
) -> Self {
|
||||
match envelope {
|
||||
MaybeAvailableEnvelope::Available(available_envelope) => {
|
||||
Self::Available(AvailableExecutedEnvelope::new(
|
||||
available_envelope,
|
||||
import_data,
|
||||
payload_verification_outcome,
|
||||
))
|
||||
}
|
||||
// TODO(gloas) implement availability pending
|
||||
MaybeAvailableEnvelope::AvailabilityPending {
|
||||
block_hash: _,
|
||||
envelope: _,
|
||||
} => Self::AvailabilityPending(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// A payload envelope that has completed all payload processing checks including verification
/// by an EL client **and** has all requisite column data to be imported into fork choice.
pub struct AvailableExecutedEnvelope<E: EthSpec> {
    /// The fully-available envelope (payload plus data columns).
    pub envelope: AvailableEnvelope<E>,
    /// Block, block root and state needed to import the envelope.
    pub import_data: EnvelopeImportData<E>,
    /// Outcome of execution-layer payload verification.
    pub payload_verification_outcome: PayloadVerificationOutcome,
}
|
||||
|
||||
impl<E: EthSpec> AvailableExecutedEnvelope<E> {
|
||||
pub fn new(
|
||||
envelope: AvailableEnvelope<E>,
|
||||
import_data: EnvelopeImportData<E>,
|
||||
payload_verification_outcome: PayloadVerificationOutcome,
|
||||
) -> Self {
|
||||
Self {
|
||||
envelope,
|
||||
import_data,
|
||||
payload_verification_outcome,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Errors that can occur while verifying or importing a signed execution
/// payload envelope.
#[derive(Debug)]
pub enum EnvelopeError {
    /// The envelope's block root is unknown.
    BlockRootUnknown {
        block_root: Hash256,
    },
    /// The signature is invalid.
    BadSignature,
    /// The builder index doesn't match the committed bid.
    BuilderIndexMismatch {
        committed_bid: u64,
        envelope: u64,
    },
    /// The envelope slot doesn't match the block.
    SlotMismatch {
        block: Slot,
        envelope: Slot,
    },
    /// The builder's validator index is unknown.
    UnknownValidator {
        builder_index: u64,
    },
    /// The block hash doesn't match the committed bid.
    BlockHashMismatch {
        committed_bid: ExecutionBlockHash,
        envelope: ExecutionBlockHash,
    },
    /// The block's proposer_index does not match the locally computed proposer.
    IncorrectBlockProposer {
        block: u64,
        local_shuffling: u64,
    },
    /// The slot belongs to a block from a slot prior to the most recently
    /// finalized slot.
    PriorToFinalization {
        payload_slot: Slot,
        latest_finalized_slot: Slot,
    },
    /// Some beacon chain error.
    BeaconChainError(Arc<BeaconChainError>),
    /// Some beacon state error.
    BeaconStateError(BeaconStateError),
    /// Some `BlockProcessingError` (for electra operations).
    BlockProcessingError(BlockProcessingError),
    /// Some `EnvelopeProcessingError`.
    EnvelopeProcessingError(EnvelopeProcessingError),
    /// Error verifying the execution payload.
    ExecutionPayloadError(ExecutionPayloadError),
    /// An error from block-level checks reused during envelope import.
    BlockError(BlockError),
    /// Internal error.
    InternalError(String),
}
|
||||
|
||||
impl std::fmt::Display for EnvelopeError {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
write!(f, "{:?}", self)
|
||||
}
|
||||
}
|
||||
|
||||
impl From<BeaconChainError> for EnvelopeError {
|
||||
fn from(e: BeaconChainError) -> Self {
|
||||
EnvelopeError::BeaconChainError(Arc::new(e))
|
||||
}
|
||||
}
|
||||
|
||||
impl From<ExecutionPayloadError> for EnvelopeError {
|
||||
fn from(e: ExecutionPayloadError) -> Self {
|
||||
EnvelopeError::ExecutionPayloadError(e)
|
||||
}
|
||||
}
|
||||
|
||||
impl From<BeaconStateError> for EnvelopeError {
|
||||
fn from(e: BeaconStateError) -> Self {
|
||||
EnvelopeError::BeaconStateError(e)
|
||||
}
|
||||
}
|
||||
|
||||
impl From<DBError> for EnvelopeError {
|
||||
fn from(e: DBError) -> Self {
|
||||
EnvelopeError::BeaconChainError(Arc::new(BeaconChainError::DBError(e)))
|
||||
}
|
||||
}
|
||||
|
||||
impl From<BlockError> for EnvelopeError {
|
||||
fn from(e: BlockError) -> Self {
|
||||
EnvelopeError::BlockError(e)
|
||||
}
|
||||
}
|
||||
|
||||
/// Pull errors up from EnvelopeProcessingError to EnvelopeError
|
||||
impl From<EnvelopeProcessingError> for EnvelopeError {
|
||||
fn from(e: EnvelopeProcessingError) -> Self {
|
||||
match e {
|
||||
EnvelopeProcessingError::BadSignature => EnvelopeError::BadSignature,
|
||||
EnvelopeProcessingError::BeaconStateError(e) => EnvelopeError::BeaconStateError(e),
|
||||
EnvelopeProcessingError::BlockHashMismatch {
|
||||
committed_bid,
|
||||
envelope,
|
||||
} => EnvelopeError::BlockHashMismatch {
|
||||
committed_bid,
|
||||
envelope,
|
||||
},
|
||||
EnvelopeProcessingError::BlockProcessingError(e) => {
|
||||
EnvelopeError::BlockProcessingError(e)
|
||||
}
|
||||
e => EnvelopeError::EnvelopeProcessingError(e),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[allow(clippy::type_complexity)]
|
||||
#[instrument(skip_all, level = "debug", fields(beacon_block_root = %beacon_block_root))]
|
||||
/// Load state from store given a known state root and block root.
|
||||
/// Use this when the proto block has already been looked up from fork choice.
|
||||
pub(crate) fn load_snapshot_from_state_root<T: BeaconChainTypes>(
|
||||
beacon_block_root: Hash256,
|
||||
block_state_root: Hash256,
|
||||
store: &BeaconStore<T>,
|
||||
) -> Result<EnvelopeProcessingSnapshot<T::EthSpec>, EnvelopeError> {
|
||||
// TODO(EIP-7732): add metrics here
|
||||
|
||||
// We can use `get_hot_state` here rather than `get_advanced_hot_state` because the envelope
|
||||
// must be from the same slot as its block (so no advance is required).
|
||||
let cache_state = true;
|
||||
let state = store
|
||||
.get_hot_state(&block_state_root, cache_state)
|
||||
.map_err(EnvelopeError::from)?
|
||||
.ok_or_else(|| {
|
||||
BeaconChainError::DBInconsistent(format!(
|
||||
"Missing state for envelope block {block_state_root:?}",
|
||||
))
|
||||
})?;
|
||||
|
||||
Ok(EnvelopeProcessingSnapshot {
|
||||
pre_state: state,
|
||||
state_root: block_state_root,
|
||||
beacon_block_root,
|
||||
})
|
||||
}
|
||||
|
||||
#[instrument(skip_all, level = "debug", fields(beacon_block_root = %envelope.beacon_block_root()))]
|
||||
pub(crate) fn load_snapshot<T: BeaconChainTypes>(
|
||||
envelope: &SignedExecutionPayloadEnvelope<T::EthSpec>,
|
||||
canonical_head: &CanonicalHead<T>,
|
||||
store: &BeaconStore<T>,
|
||||
) -> Result<EnvelopeProcessingSnapshot<T::EthSpec>, EnvelopeError> {
|
||||
// Reject any envelope if its block is not known to fork choice.
|
||||
//
|
||||
// A block that is not in fork choice is either:
|
||||
//
|
||||
// - Not yet imported: we should reject this envelope because we should only import it after
|
||||
// its parent block has been fully imported.
|
||||
// - Pre-finalized: if the parent block is _prior_ to finalization, we should ignore the
|
||||
// envelope because it will revert finalization. Note that the finalized block is stored in
|
||||
// fork choice, so we will not reject any child of the finalized block (this is relevant
|
||||
// during genesis).
|
||||
|
||||
let fork_choice_read_lock = canonical_head.fork_choice_read_lock();
|
||||
let beacon_block_root = envelope.beacon_block_root();
|
||||
let Some(proto_beacon_block) = fork_choice_read_lock.get_block(&beacon_block_root) else {
|
||||
return Err(EnvelopeError::BlockRootUnknown {
|
||||
block_root: beacon_block_root,
|
||||
});
|
||||
};
|
||||
drop(fork_choice_read_lock);
|
||||
|
||||
load_snapshot_from_state_root::<T>(beacon_block_root, proto_beacon_block.state_root, store)
|
||||
}
|
||||
@@ -0,0 +1,94 @@
|
||||
use std::sync::Arc;
|
||||
|
||||
use execution_layer::{NewPayloadRequest, NewPayloadRequestGloas};
|
||||
use fork_choice::PayloadVerificationStatus;
|
||||
use state_processing::per_block_processing::deneb::kzg_commitment_to_versioned_hash;
|
||||
use tracing::warn;
|
||||
use types::{SignedBeaconBlock, SignedExecutionPayloadEnvelope};
|
||||
|
||||
use crate::{
|
||||
BeaconChain, BeaconChainTypes, BlockError, NotifyExecutionLayer,
|
||||
execution_payload::notify_new_payload, payload_envelope_verification::EnvelopeError,
|
||||
};
|
||||
|
||||
/// Used to await the result of executing payload with a remote EE.
pub struct PayloadNotifier<T: BeaconChainTypes> {
    /// Handle to the beacon chain, used to reach the execution layer.
    pub chain: Arc<BeaconChain<T>>,
    /// The signed envelope whose payload is to be executed.
    envelope: Arc<SignedExecutionPayloadEnvelope<T::EthSpec>>,
    /// The beacon block this envelope corresponds to.
    block: Arc<SignedBeaconBlock<T::EthSpec>>,
    /// Pre-computed status (set at construction for optimistic sync) that
    /// lets `notify_new_payload` skip the EL round-trip when `Some`.
    payload_verification_status: Option<PayloadVerificationStatus>,
}
|
||||
|
||||
impl<T: BeaconChainTypes> PayloadNotifier<T> {
|
||||
pub fn new(
|
||||
chain: Arc<BeaconChain<T>>,
|
||||
envelope: Arc<SignedExecutionPayloadEnvelope<T::EthSpec>>,
|
||||
block: Arc<SignedBeaconBlock<T::EthSpec>>,
|
||||
notify_execution_layer: NotifyExecutionLayer,
|
||||
) -> Result<Self, EnvelopeError> {
|
||||
let payload_verification_status = {
|
||||
let payload_message = &envelope.message;
|
||||
|
||||
// TODO(gloas) re-asses if optimistic syncing works similarly post-gloas
|
||||
match notify_execution_layer {
|
||||
NotifyExecutionLayer::No if chain.config.optimistic_finalized_sync => {
|
||||
let new_payload_request = Self::build_new_payload_request(&envelope, &block)?;
|
||||
if let Err(e) = new_payload_request.perform_optimistic_sync_verifications() {
|
||||
warn!(
|
||||
block_number = ?payload_message.payload.block_number,
|
||||
info = "you can silence this warning with --disable-optimistic-finalized-sync",
|
||||
error = ?e,
|
||||
"Falling back to slow block hash verification"
|
||||
);
|
||||
None
|
||||
} else {
|
||||
Some(PayloadVerificationStatus::Optimistic)
|
||||
}
|
||||
}
|
||||
_ => None,
|
||||
}
|
||||
};
|
||||
|
||||
Ok(Self {
|
||||
chain,
|
||||
envelope,
|
||||
block,
|
||||
payload_verification_status,
|
||||
})
|
||||
}
|
||||
|
||||
pub async fn notify_new_payload(self) -> Result<PayloadVerificationStatus, BlockError> {
|
||||
if let Some(precomputed_status) = self.payload_verification_status {
|
||||
Ok(precomputed_status)
|
||||
} else {
|
||||
let block_root = self.envelope.message.beacon_block_root;
|
||||
let request = Self::build_new_payload_request(&self.envelope, &self.block)?;
|
||||
notify_new_payload(&self.chain, block_root, self.envelope.slot(), request).await
|
||||
}
|
||||
}
|
||||
|
||||
fn build_new_payload_request<'a>(
|
||||
envelope: &'a SignedExecutionPayloadEnvelope<T::EthSpec>,
|
||||
block: &'a SignedBeaconBlock<T::EthSpec>,
|
||||
) -> Result<NewPayloadRequest<'a, T::EthSpec>, BlockError> {
|
||||
let bid = &block
|
||||
.message()
|
||||
.body()
|
||||
.signed_execution_payload_bid()
|
||||
.map_err(|e| BlockError::BeaconChainError(Box::new(e.into())))?
|
||||
.message;
|
||||
|
||||
let versioned_hashes = bid
|
||||
.blob_kzg_commitments
|
||||
.iter()
|
||||
.map(kzg_commitment_to_versioned_hash)
|
||||
.collect();
|
||||
|
||||
Ok(NewPayloadRequest::Gloas(NewPayloadRequestGloas {
|
||||
execution_payload: &envelope.message.payload,
|
||||
versioned_hashes,
|
||||
parent_beacon_block_root: block.message().parent_root(),
|
||||
execution_requests: &envelope.message.execution_requests,
|
||||
}))
|
||||
}
|
||||
}
|
||||
@@ -1,5 +1,5 @@
|
||||
#![cfg(not(debug_assertions))]
|
||||
|
||||
// TODO(gloas) we probably need similar test for payload envelope verification
|
||||
use beacon_chain::block_verification_types::{AsBlock, ExecutedBlock, RpcBlock};
|
||||
use beacon_chain::data_availability_checker::{AvailabilityCheckError, AvailableBlockData};
|
||||
use beacon_chain::data_column_verification::CustodyDataColumn;
|
||||
|
||||
@@ -532,6 +532,16 @@ pub static SYNC_RPC_REQUEST_TIME: LazyLock<Result<HistogramVec>> = LazyLock::new
|
||||
)
|
||||
});
|
||||
|
||||
/*
 * Execution Payload Envelope Delay Metrics
 */
// NOTE(review): a gauge only retains the most recently observed envelope's
// delay; confirm a histogram isn't wanted here for a distribution over time.
pub static ENVELOPE_DELAY_GOSSIP: LazyLock<Result<IntGauge>> = LazyLock::new(|| {
    try_create_int_gauge(
        "payload_envelope_delay_gossip",
        "The first time we see this payload envelope from gossip as a delay from the start of the slot",
    )
});
|
||||
|
||||
/*
|
||||
* Block Delay Metrics
|
||||
*/
|
||||
|
||||
@@ -4,7 +4,6 @@ use crate::{
|
||||
service::NetworkMessage,
|
||||
sync::SyncMessage,
|
||||
};
|
||||
use beacon_chain::blob_verification::{GossipBlobError, GossipVerifiedBlob};
|
||||
use beacon_chain::block_verification_types::AsBlock;
|
||||
use beacon_chain::data_column_verification::{GossipDataColumnError, GossipVerifiedDataColumn};
|
||||
use beacon_chain::store::Error;
|
||||
@@ -19,6 +18,10 @@ use beacon_chain::{
|
||||
sync_committee_verification::{self, Error as SyncCommitteeError},
|
||||
validator_monitor::{get_block_delay_ms, get_slot_delay_ms},
|
||||
};
|
||||
use beacon_chain::{
|
||||
blob_verification::{GossipBlobError, GossipVerifiedBlob},
|
||||
payload_envelope_verification::gossip_verified_envelope::GossipVerifiedEnvelope,
|
||||
};
|
||||
use beacon_processor::{Work, WorkEvent};
|
||||
use lighthouse_network::{Client, MessageAcceptance, MessageId, PeerAction, PeerId, ReportSource};
|
||||
use logging::crit;
|
||||
@@ -3248,25 +3251,178 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn process_gossip_execution_payload(
|
||||
pub async fn process_gossip_execution_payload_envelope(
|
||||
self: Arc<Self>,
|
||||
message_id: MessageId,
|
||||
peer_id: PeerId,
|
||||
envelope: Arc<SignedExecutionPayloadEnvelope<T::EthSpec>>,
|
||||
seen_timestamp: Duration,
|
||||
) {
|
||||
if let Some(gossip_verified_envelope) = self
|
||||
.process_gossip_unverified_execution_payload_envelope(
|
||||
message_id,
|
||||
peer_id,
|
||||
envelope.clone(),
|
||||
seen_timestamp,
|
||||
)
|
||||
.await
|
||||
{
|
||||
let beacon_block_root = gossip_verified_envelope.signed_envelope.beacon_block_root();
|
||||
|
||||
Span::current().record("beacon_block_root", beacon_block_root.to_string());
|
||||
|
||||
// TODO(gloas) in process_gossip_block here we check_and_insert on the duplicate cache
|
||||
// before calling gossip_verified_block. We need this to ensure we dont try to execute the
|
||||
// payload multiple times.
|
||||
|
||||
self.process_gossip_verified_execution_payload_envelope(
|
||||
peer_id,
|
||||
gossip_verified_envelope,
|
||||
)
|
||||
.await;
|
||||
}
|
||||
}
|
||||
|
||||
async fn process_gossip_unverified_execution_payload_envelope(
|
||||
self: &Arc<Self>,
|
||||
message_id: MessageId,
|
||||
peer_id: PeerId,
|
||||
execution_payload: SignedExecutionPayloadEnvelope<T::EthSpec>,
|
||||
envelope: Arc<SignedExecutionPayloadEnvelope<T::EthSpec>>,
|
||||
seen_duration: Duration,
|
||||
) -> Option<GossipVerifiedEnvelope<T>> {
|
||||
let envelope_delay =
|
||||
get_slot_delay_ms(seen_duration, envelope.slot(), &self.chain.slot_clock);
|
||||
|
||||
let verification_result = self
|
||||
.chain
|
||||
.clone()
|
||||
.verify_envelope_for_gossip(envelope.clone())
|
||||
.await;
|
||||
|
||||
let verified_envelope = match verification_result {
|
||||
Ok(verified_envelope) => {
|
||||
metrics::set_gauge(
|
||||
&metrics::ENVELOPE_DELAY_GOSSIP,
|
||||
envelope_delay.as_millis() as i64,
|
||||
);
|
||||
|
||||
// Write the time the envelope was observed into the delay cache.
|
||||
self.chain.envelope_times_cache.write().set_time_observed(
|
||||
verified_envelope.signed_envelope.beacon_block_root(),
|
||||
envelope.slot(),
|
||||
seen_duration,
|
||||
Some(peer_id.to_string()),
|
||||
);
|
||||
|
||||
info!(
|
||||
slot = %verified_envelope.signed_envelope.slot(),
|
||||
root = ?verified_envelope.signed_envelope.beacon_block_root(),
|
||||
"New envelope received"
|
||||
);
|
||||
|
||||
self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Accept);
|
||||
|
||||
verified_envelope
|
||||
}
|
||||
// TODO(gloas) penalize peers accordingly
|
||||
Err(_) => return None,
|
||||
};
|
||||
|
||||
// TODO(gloas) do we need to register the payload with monitored validators?
|
||||
|
||||
let envelope_slot = verified_envelope.signed_envelope.slot();
|
||||
let beacon_block_root = verified_envelope.signed_envelope.beacon_block_root();
|
||||
match self.chain.slot() {
|
||||
// We only need to do a simple check about the envelope slot vs the current slot beacuse
|
||||
// `verify_envelope_for_gossip` already ensuresthat the envelope slot is within tolerance
|
||||
// for envelope imports.
|
||||
Ok(current_slot) if envelope_slot > current_slot => {
|
||||
warn!(
|
||||
?envelope_slot,
|
||||
?beacon_block_root,
|
||||
msg = "if this happens consistently, check system clock",
|
||||
"envelope arrived early"
|
||||
);
|
||||
|
||||
// TODO(gloas) update metrics to note how early the envelope arrived
|
||||
|
||||
let inner_self = self.clone();
|
||||
let _process_fn = Box::pin(async move {
|
||||
inner_self
|
||||
.process_gossip_verified_execution_payload_envelope(
|
||||
peer_id,
|
||||
verified_envelope,
|
||||
)
|
||||
.await;
|
||||
});
|
||||
|
||||
// TODO(gloas) send to reprocess queue
|
||||
None
|
||||
}
|
||||
Ok(_) => Some(verified_envelope),
|
||||
Err(e) => {
|
||||
error!(
|
||||
error = ?e,
|
||||
%envelope_slot,
|
||||
?beacon_block_root,
|
||||
location = "envelope gossip",
|
||||
"Failed to defer envelope import"
|
||||
);
|
||||
None
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn process_gossip_verified_execution_payload_envelope(
|
||||
self: Arc<Self>,
|
||||
peer_id: PeerId,
|
||||
verified_envelope: GossipVerifiedEnvelope<T>,
|
||||
) {
|
||||
// TODO(EIP-7732): Implement proper execution payload envelope gossip processing.
|
||||
// This should integrate with the envelope_verification.rs module once it's implemented.
|
||||
let _processing_start_time = Instant::now();
|
||||
let beacon_block_root = verified_envelope.signed_envelope.beacon_block_root();
|
||||
|
||||
trace!(
|
||||
%peer_id,
|
||||
builder_index = execution_payload.message.builder_index,
|
||||
slot = %execution_payload.message.slot,
|
||||
beacon_block_root = %execution_payload.message.beacon_block_root,
|
||||
"Processing execution payload envelope"
|
||||
);
|
||||
let result = self
|
||||
.chain
|
||||
.process_execution_payload_envelope(
|
||||
beacon_block_root,
|
||||
verified_envelope,
|
||||
NotifyExecutionLayer::Yes,
|
||||
BlockImportSource::Gossip,
|
||||
|| Ok(()),
|
||||
)
|
||||
.await;
|
||||
|
||||
// For now, ignore all envelopes since verification is not implemented
|
||||
self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Ignore);
|
||||
// TODO(gloas) metrics
|
||||
// register_process_result_metrics(&result, metrics::BlockSource::Gossip, "envelope");
|
||||
|
||||
match &result {
|
||||
Ok(AvailabilityProcessingStatus::Imported(block_root)) => {
|
||||
// TODO(gloas) do we need to send a `PayloadImported` event to the reporcess queue?
|
||||
debug!(
|
||||
?block_root,
|
||||
%peer_id,
|
||||
"Gossipsub envelope processed"
|
||||
);
|
||||
|
||||
// TODO(gloas) do we need to recompute head?
|
||||
// should canonical_head return the block and the payload now?
|
||||
self.chain.recompute_head_at_current_slot().await;
|
||||
|
||||
// TODO(gloas) metrics
|
||||
}
|
||||
Ok(AvailabilityProcessingStatus::MissingComponents(slot, block_root)) => {
|
||||
trace!(
|
||||
%slot,
|
||||
%block_root,
|
||||
"Processed envelope, waiting for other components"
|
||||
)
|
||||
}
|
||||
|
||||
Err(_) => {
|
||||
// TODO(gloas) implement peer penalties
|
||||
warn!("process_gossip_verified_execution_payload_envelope_failed")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn process_gossip_execution_payload_bid(
|
||||
|
||||
@@ -429,11 +429,17 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
|
||||
message_id: MessageId,
|
||||
peer_id: PeerId,
|
||||
execution_payload: Box<SignedExecutionPayloadEnvelope<T::EthSpec>>,
|
||||
seen_timestamp: Duration,
|
||||
) -> Result<(), Error<T::EthSpec>> {
|
||||
let processor = self.clone();
|
||||
let process_fn = async move {
|
||||
processor
|
||||
.process_gossip_execution_payload(message_id, peer_id, *execution_payload)
|
||||
.process_gossip_execution_payload_envelope(
|
||||
message_id,
|
||||
peer_id,
|
||||
Arc::new(*execution_payload),
|
||||
seen_timestamp,
|
||||
)
|
||||
.await
|
||||
};
|
||||
|
||||
|
||||
@@ -493,6 +493,7 @@ impl<T: BeaconChainTypes> Router<T> {
|
||||
message_id,
|
||||
peer_id,
|
||||
signed_execution_payload_envelope,
|
||||
timestamp_now(),
|
||||
),
|
||||
)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user