Gloas payload envelope processing (#8806)

Co-Authored-By: Eitan Seri-Levi <eserilev@gmail.com>

Co-Authored-By: Eitan Seri-Levi <eserilev@ucsc.edu>

Co-Authored-By: Jimmy Chen <jchen.tc@gmail.com>

Co-Authored-By: Michael Sproul <michael@sigmaprime.io>

Co-Authored-By: Michael Sproul <michaelsproul@users.noreply.github.com>
This commit is contained in:
Eitan Seri-Levi
2026-03-09 14:23:34 +09:00
committed by GitHub
parent 537c2ba8b3
commit 7dab32dd16
19 changed files with 1813 additions and 127 deletions

View File

@@ -4,9 +4,7 @@ use crate::attestation_verification::{
batch_verify_unaggregated_attestations,
};
use crate::beacon_block_streamer::{BeaconBlockStreamer, CheckCaches};
use crate::beacon_proposer_cache::{
BeaconProposerCache, EpochBlockProposers, ensure_state_can_determine_proposers_for_epoch,
};
use crate::beacon_proposer_cache::{BeaconProposerCache, EpochBlockProposers};
use crate::blob_verification::{GossipBlobError, GossipVerifiedBlob};
use crate::block_times_cache::BlockTimesCache;
use crate::block_verification::{
@@ -26,6 +24,7 @@ use crate::data_availability_checker::{
};
use crate::data_column_verification::{GossipDataColumnError, GossipVerifiedDataColumn};
use crate::early_attester_cache::EarlyAttesterCache;
use crate::envelope_times_cache::EnvelopeTimesCache;
use crate::errors::{BeaconChainError as Error, BlockProductionError};
use crate::events::ServerSentEventHandler;
use crate::execution_payload::{NotifyExecutionLayer, PreparePayloadHandle, get_execution_payload};
@@ -66,7 +65,6 @@ use crate::sync_committee_verification::{
};
use crate::validator_monitor::{
HISTORIC_EPOCHS as VALIDATOR_MONITOR_HISTORIC_EPOCHS, ValidatorMonitor, get_slot_delay_ms,
timestamp_now,
};
use crate::validator_pubkey_cache::ValidatorPubkeyCache;
use crate::{
@@ -462,6 +460,8 @@ pub struct BeaconChain<T: BeaconChainTypes> {
pub early_attester_cache: EarlyAttesterCache<T::EthSpec>,
/// A cache used to keep track of various block timings.
pub block_times_cache: Arc<RwLock<BlockTimesCache>>,
/// A cache used to keep track of various envelope timings.
pub envelope_times_cache: Arc<RwLock<EnvelopeTimesCache>>,
/// A cache used to track pre-finalization block roots for quick rejection.
pub pre_finalization_block_cache: PreFinalizationBlockCache,
/// A cache used to produce light_client server messages
@@ -4042,23 +4042,10 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
// See https://github.com/sigp/lighthouse/issues/2028
let (_, signed_block, block_data) = signed_block.deconstruct();
match self.get_blobs_or_columns_store_op(block_root, signed_block.slot(), block_data) {
Ok(Some(blobs_or_columns_store_op)) => {
ops.push(blobs_or_columns_store_op);
}
Ok(None) => {}
Err(e) => {
error!(
msg = "Restoring fork choice from disk",
error = &e,
?block_root,
"Failed to store data columns into the database"
);
return Err(self
.handle_import_block_db_write_error(fork_choice)
.err()
.unwrap_or(BlockError::InternalError(e)));
}
if let Some(blobs_or_columns_store_op) =
self.get_blobs_or_columns_store_op(block_root, signed_block.slot(), block_data)
{
ops.push(blobs_or_columns_store_op);
}
let block = signed_block.message();
@@ -4088,7 +4075,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
// We're declaring the block "imported" at this point, since fork choice and the DB know
// about it.
let block_time_imported = timestamp_now();
let block_time_imported = self.slot_clock.now_duration().unwrap_or(Duration::MAX);
// compute state proofs for light client updates before inserting the state into the
// snapshot cache.
@@ -4157,7 +4144,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
}
/// Check block's consistency with any configured weak subjectivity checkpoint.
fn check_block_against_weak_subjectivity_checkpoint(
pub(crate) fn check_block_against_weak_subjectivity_checkpoint(
&self,
block: BeaconBlockRef<T::EthSpec>,
block_root: Hash256,
@@ -6407,6 +6394,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
// sync anyway).
self.naive_aggregation_pool.write().prune(slot);
self.block_times_cache.write().prune(slot);
self.envelope_times_cache.write().prune(slot);
// Don't run heavy-weight tasks during sync.
if self.best_slot() + MAX_PER_SLOT_FORK_CHOICE_DISTANCE < slot {
@@ -6466,62 +6454,14 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
accessor: impl Fn(&EpochBlockProposers) -> Result<V, BeaconChainError>,
state_provider: impl FnOnce() -> Result<(Hash256, BeaconState<T::EthSpec>), E>,
) -> Result<V, E> {
let cache_entry = self
.beacon_proposer_cache
.lock()
.get_or_insert_key(proposal_epoch, shuffling_decision_block);
// If the cache entry is not initialised, run the code to initialise it inside a OnceCell.
// This prevents duplication of work across multiple threads.
//
// If it is already initialised, then `get_or_try_init` will return immediately without
// executing the initialisation code at all.
let epoch_block_proposers = cache_entry.get_or_try_init(|| {
// Fetch the state on-demand if the required epoch was missing from the cache.
// If the caller wants to not compute the state they must return an error here and then
// catch it at the call site.
let (state_root, mut state) = state_provider()?;
// Ensure the state can compute proposer duties for `epoch`.
ensure_state_can_determine_proposers_for_epoch(
&mut state,
state_root,
proposal_epoch,
&self.spec,
)?;
// Sanity check the state.
let latest_block_root = state.get_latest_block_root(state_root);
let state_decision_block_root = state.proposer_shuffling_decision_root_at_epoch(
proposal_epoch,
latest_block_root,
&self.spec,
)?;
if state_decision_block_root != shuffling_decision_block {
return Err(Error::ProposerCacheIncorrectState {
state_decision_block_root,
requested_decision_block_root: shuffling_decision_block,
}
.into());
}
let proposers = state.get_beacon_proposer_indices(proposal_epoch, &self.spec)?;
// Use fork_at_epoch rather than the state's fork, because post-Fulu we may not have
// advanced the state completely into the new epoch.
let fork = self.spec.fork_at_epoch(proposal_epoch);
debug!(
?shuffling_decision_block,
epoch = %proposal_epoch,
"Priming proposer shuffling cache"
);
Ok::<_, E>(EpochBlockProposers::new(proposal_epoch, fork, proposers))
})?;
// Run the accessor function on the computed epoch proposers.
accessor(epoch_block_proposers).map_err(Into::into)
crate::beacon_proposer_cache::with_proposer_cache(
&self.beacon_proposer_cache,
shuffling_decision_block,
proposal_epoch,
accessor,
state_provider,
&self.spec,
)
}
/// Runs the `map_fn` with the committee cache for `shuffling_epoch` from the chain with head
@@ -7197,16 +7137,16 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
block_root: Hash256,
block_slot: Slot,
block_data: AvailableBlockData<T::EthSpec>,
) -> Result<Option<StoreOp<'_, T::EthSpec>>, String> {
) -> Option<StoreOp<'_, T::EthSpec>> {
match block_data {
AvailableBlockData::NoData => Ok(None),
AvailableBlockData::NoData => None,
AvailableBlockData::Blobs(blobs) => {
debug!(
%block_root,
count = blobs.len(),
"Writing blobs to store"
);
Ok(Some(StoreOp::PutBlobs(block_root, blobs)))
Some(StoreOp::PutBlobs(block_root, blobs))
}
AvailableBlockData::DataColumns(mut data_columns) => {
let columns_to_custody = self.custody_columns_for_epoch(Some(
@@ -7222,7 +7162,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
count = data_columns.len(),
"Writing data columns to store"
);
Ok(Some(StoreOp::PutDataColumns(block_root, data_columns)))
Some(StoreOp::PutDataColumns(block_root, data_columns))
}
}
}

View File

@@ -12,12 +12,13 @@ use crate::{BeaconChain, BeaconChainError, BeaconChainTypes};
use fork_choice::ExecutionStatus;
use lru::LruCache;
use once_cell::sync::OnceCell;
use parking_lot::Mutex;
use safe_arith::SafeArith;
use smallvec::SmallVec;
use state_processing::state_advance::partial_state_advance;
use std::num::NonZeroUsize;
use std::sync::Arc;
use tracing::instrument;
use tracing::{debug, instrument};
use typenum::Unsigned;
use types::new_non_zero_usize;
use types::{BeaconState, BeaconStateError, ChainSpec, Epoch, EthSpec, Fork, Hash256, Slot};
@@ -164,6 +165,82 @@ impl BeaconProposerCache {
}
}
/// Access the proposer cache, computing and caching the proposers if necessary.
///
/// This is a free function that operates on references to the cache and spec, decoupled from
/// `BeaconChain`. The `accessor` is called with the cached `EpochBlockProposers` for the given
/// `(proposal_epoch, shuffling_decision_block)` key. If the cache entry is missing, the
/// `state_provider` closure is called to produce a state which is then used to compute and
/// cache the proposers.
pub fn with_proposer_cache<Spec, V, Err>(
    beacon_proposer_cache: &Mutex<BeaconProposerCache>,
    shuffling_decision_block: Hash256,
    proposal_epoch: Epoch,
    accessor: impl Fn(&EpochBlockProposers) -> Result<V, BeaconChainError>,
    state_provider: impl FnOnce() -> Result<(Hash256, BeaconState<Spec>), Err>,
    spec: &ChainSpec,
) -> Result<V, Err>
where
    Spec: EthSpec,
    Err: From<BeaconChainError> + From<BeaconStateError>,
{
    // NOTE(review): the mutex guard is only held for this statement; `cache_entry` is
    // presumably a shared handle (e.g. `Arc<OnceCell<_>>`) that can be initialised after the
    // lock is released — confirm against `get_or_insert_key`.
    let cache_entry = beacon_proposer_cache
        .lock()
        .get_or_insert_key(proposal_epoch, shuffling_decision_block);
    // If the cache entry is not initialised, run the code to initialise it inside a OnceCell.
    // This prevents duplication of work across multiple threads.
    //
    // If it is already initialised, then `get_or_try_init` will return immediately without
    // executing the initialisation code at all.
    let epoch_block_proposers = cache_entry.get_or_try_init(|| {
        // Fetch the state on-demand if the required epoch was missing from the cache.
        // If the caller wants to not compute the state they must return an error here and then
        // catch it at the call site.
        let (state_root, mut state) = state_provider()?;
        // Ensure the state can compute proposer duties for `epoch`.
        ensure_state_can_determine_proposers_for_epoch(
            &mut state,
            state_root,
            proposal_epoch,
            spec,
        )?;
        // Sanity check the state: the decision root derived from the state must match the
        // requested shuffling decision block, otherwise the caller supplied the wrong state.
        let latest_block_root = state.get_latest_block_root(state_root);
        let state_decision_block_root = state.proposer_shuffling_decision_root_at_epoch(
            proposal_epoch,
            latest_block_root,
            spec,
        )?;
        if state_decision_block_root != shuffling_decision_block {
            return Err(BeaconChainError::ProposerCacheIncorrectState {
                state_decision_block_root,
                requested_decision_block_root: shuffling_decision_block,
            }
            .into());
        }
        let proposers = state.get_beacon_proposer_indices(proposal_epoch, spec)?;
        // Use fork_at_epoch rather than the state's fork, because post-Fulu we may not have
        // advanced the state completely into the new epoch.
        let fork = spec.fork_at_epoch(proposal_epoch);
        debug!(
            ?shuffling_decision_block,
            epoch = %proposal_epoch,
            "Priming proposer shuffling cache"
        );
        Ok::<_, Err>(EpochBlockProposers::new(proposal_epoch, fork, proposers))
    })?;
    // Run the accessor function on the computed epoch proposers.
    accessor(epoch_block_proposers).map_err(Into::into)
}
/// Compute the proposer duties using the head state without cache.
///
/// Return:

View File

@@ -681,7 +681,8 @@ pub struct SignatureVerifiedBlock<T: BeaconChainTypes> {
}
/// Used to await the result of executing payload with an EE.
type PayloadVerificationHandle = JoinHandle<Option<Result<PayloadVerificationOutcome, BlockError>>>;
pub type PayloadVerificationHandle =
JoinHandle<Option<Result<PayloadVerificationOutcome, BlockError>>>;
/// A wrapper around a `SignedBeaconBlock` that indicates that this block is fully verified and
/// ready to import into the `BeaconChain`. The validation includes:
@@ -1357,7 +1358,7 @@ impl<T: BeaconChainTypes> ExecutionPendingBlock<T> {
/// verification must be done upstream (e.g., via a `SignatureVerifiedBlock`
///
/// Returns an error if the block is invalid, or if the block was unable to be verified.
#[instrument(skip_all, level = "debug")]
#[instrument(skip_all, level = "debug", fields(?block_root))]
pub fn from_signature_verified_components(
block: MaybeAvailableBlock<T::EthSpec>,
block_root: Hash256,

View File

@@ -1023,6 +1023,7 @@ where
)),
beacon_proposer_cache,
block_times_cache: <_>::default(),
envelope_times_cache: <_>::default(),
pre_finalization_block_cache: <_>::default(),
validator_pubkey_cache: RwLock::new(validator_pubkey_cache),
early_attester_cache: <_>::default(),

View File

@@ -0,0 +1,197 @@
//! This module provides the `EnvelopeTimesCache` which contains information regarding payload
//! envelope timings.
//!
//! This provides `BeaconChain` and associated functions with access to the timestamps of when a
//! payload envelope was observed, verified, executed, and imported.
//! This allows for better traceability and allows us to determine the root cause for why an
//! envelope was imported late.
//! This allows us to distinguish between the following scenarios:
//! - The envelope was observed late.
//! - Consensus verification was slow.
//! - Execution verification was slow.
//! - The DB write was slow.
use eth2::types::{Hash256, Slot};
use std::collections::HashMap;
use std::time::Duration;
type BlockRoot = Hash256;
/// Raw timestamps recorded at each stage of processing a single payload envelope.
///
/// Each field is `None` until the corresponding event has been observed. See
/// `EnvelopeDelays` for the same data expressed relative to the slot start time.
// `Debug` added for consistency with `EnvelopeDelays`, which already derives it.
#[derive(Debug, Clone, Default)]
pub struct EnvelopeTimestamps {
    /// When the envelope was first observed (gossip or RPC).
    pub observed: Option<Duration>,
    /// When consensus verification (state transition) completed.
    pub consensus_verified: Option<Duration>,
    /// When execution layer verification started.
    pub started_execution: Option<Duration>,
    /// When execution layer verification completed.
    pub executed: Option<Duration>,
    /// When the envelope was imported into the DB.
    pub imported: Option<Duration>,
}
/// Delay data for envelope processing, computed relative to the slot start time.
///
/// A field is `None` when the timestamps needed to compute it were unavailable (or the
/// subtraction would underflow).
// `Clone` added for consistency with `EnvelopeTimestamps`, which already derives it; all
// fields are plain `Option<Duration>` values, so the derive is free.
#[derive(Debug, Clone, Default)]
pub struct EnvelopeDelays {
    /// Time after start of slot we saw the envelope.
    pub observed: Option<Duration>,
    /// The time it took to complete consensus verification of the envelope.
    pub consensus_verification_time: Option<Duration>,
    /// The time it took to complete execution verification of the envelope.
    pub execution_time: Option<Duration>,
    /// Time after execution until the envelope was imported.
    pub imported: Option<Duration>,
}
impl EnvelopeDelays {
    /// Convert raw timestamps into per-stage delays, anchored at `slot_start_time`.
    ///
    /// A stage's delay is reported only when both of its bounding timestamps are present
    /// and the subtraction does not underflow; otherwise the field stays `None`.
    fn new(times: EnvelopeTimestamps, slot_start_time: Duration) -> EnvelopeDelays {
        // `end - start`, yielding `None` when either bound is missing or the difference
        // would be negative.
        fn span(end: Option<Duration>, start: Option<Duration>) -> Option<Duration> {
            end?.checked_sub(start?)
        }
        EnvelopeDelays {
            observed: span(times.observed, Some(slot_start_time)),
            consensus_verification_time: span(times.consensus_verified, times.observed),
            execution_time: span(times.executed, times.started_execution),
            imported: span(times.imported, times.executed),
        }
    }
}
/// Per-envelope entry in the `EnvelopeTimesCache`.
pub struct EnvelopeTimesCacheValue {
    /// Slot of the envelope's beacon block; used when pruning old entries.
    pub slot: Slot,
    /// Stage timestamps recorded so far for this envelope.
    pub timestamps: EnvelopeTimestamps,
    /// Peer that delivered the earliest observation of this envelope, if any.
    pub peer_id: Option<String>,
}
impl EnvelopeTimesCacheValue {
    /// Create an empty entry for an envelope at `slot`, with no timestamps recorded yet.
    fn new(slot: Slot) -> Self {
        EnvelopeTimesCacheValue {
            slot,
            timestamps: Default::default(),
            peer_id: None,
        }
    }
}
/// Cache mapping a beacon block root to the processing timestamps of its payload envelope.
#[derive(Default)]
pub struct EnvelopeTimesCache {
    pub cache: HashMap<BlockRoot, EnvelopeTimesCacheValue>,
}
impl EnvelopeTimesCache {
    /// Set the observation time for `block_root` to `timestamp` if `timestamp` is less than
    /// any previous timestamp at which this envelope was observed.
    pub fn set_time_observed(
        &mut self,
        block_root: BlockRoot,
        slot: Slot,
        timestamp: Duration,
        peer_id: Option<String>,
    ) {
        let value = self
            .cache
            .entry(block_root)
            .or_insert_with(|| EnvelopeTimesCacheValue::new(slot));
        // Only overwrite when this observation is strictly earlier than any recorded one,
        // keeping the peer attribution in sync with the winning timestamp.
        let is_earliest = value
            .timestamps
            .observed
            .is_none_or(|previous| timestamp < previous);
        if is_earliest {
            value.timestamps.observed = Some(timestamp);
            value.peer_id = peer_id;
        }
    }
    /// Set the timestamp for `field` if that timestamp is less than any previously known value.
    fn set_time_if_less(
        &mut self,
        block_root: BlockRoot,
        slot: Slot,
        field: impl Fn(&mut EnvelopeTimestamps) -> &mut Option<Duration>,
        timestamp: Duration,
    ) {
        let value = self
            .cache
            .entry(block_root)
            .or_insert_with(|| EnvelopeTimesCacheValue::new(slot));
        let recorded = field(&mut value.timestamps);
        if recorded.is_none_or(|previous| timestamp < previous) {
            *recorded = Some(timestamp);
        }
    }
    /// Record when consensus verification of the envelope completed.
    pub fn set_time_consensus_verified(
        &mut self,
        block_root: BlockRoot,
        slot: Slot,
        timestamp: Duration,
    ) {
        self.set_time_if_less(
            block_root,
            slot,
            |timestamps| &mut timestamps.consensus_verified,
            timestamp,
        )
    }
    /// Record when execution-layer verification of the envelope started.
    pub fn set_time_started_execution(
        &mut self,
        block_root: BlockRoot,
        slot: Slot,
        timestamp: Duration,
    ) {
        self.set_time_if_less(
            block_root,
            slot,
            |timestamps| &mut timestamps.started_execution,
            timestamp,
        )
    }
    /// Record when execution-layer verification of the envelope completed.
    pub fn set_time_executed(&mut self, block_root: BlockRoot, slot: Slot, timestamp: Duration) {
        self.set_time_if_less(
            block_root,
            slot,
            |timestamps| &mut timestamps.executed,
            timestamp,
        )
    }
    /// Record when the envelope was imported into the DB.
    pub fn set_time_imported(&mut self, block_root: BlockRoot, slot: Slot, timestamp: Duration) {
        self.set_time_if_less(
            block_root,
            slot,
            |timestamps| &mut timestamps.imported,
            timestamp,
        )
    }
    /// Compute per-stage delays for `block_root`, relative to `slot_start_time`.
    ///
    /// Returns an all-`None` `EnvelopeDelays` when the envelope is not in the cache.
    pub fn get_envelope_delays(
        &self,
        block_root: BlockRoot,
        slot_start_time: Duration,
    ) -> EnvelopeDelays {
        self.cache
            .get(&block_root)
            .map(|value| EnvelopeDelays::new(value.timestamps.clone(), slot_start_time))
            .unwrap_or_default()
    }
    /// Prune the cache to only store the most recent 2 epochs.
    pub fn prune(&mut self, current_slot: Slot) {
        let cutoff = current_slot.saturating_sub(64_u64);
        self.cache.retain(|_, value| value.slot > cutoff);
    }
}

View File

@@ -25,7 +25,6 @@ use state_processing::per_block_processing::{
use std::sync::Arc;
use tokio::task::JoinHandle;
use tracing::{Instrument, debug_span, warn};
use tree_hash::TreeHash;
use types::execution::BlockProductionVersion;
use types::*;
@@ -109,12 +108,18 @@ impl<T: BeaconChainTypes> PayloadNotifier<T> {
if let Some(precomputed_status) = self.payload_verification_status {
Ok(precomputed_status)
} else {
notify_new_payload(&self.chain, self.block.message()).await
notify_new_payload(
&self.chain,
self.block.message().slot(),
self.block.message().parent_root(),
self.block.message().try_into()?,
)
.await
}
}
}
/// Verify that `execution_payload` contained by `block` is considered valid by an execution
/// Verify that `execution_payload` is considered valid by an execution
/// engine.
///
/// ## Specification
@@ -123,17 +128,21 @@ impl<T: BeaconChainTypes> PayloadNotifier<T> {
/// contains a few extra checks by running `partially_verify_execution_payload` first:
///
/// https://github.com/ethereum/consensus-specs/blob/v1.1.9/specs/bellatrix/beacon-chain.md#notify_new_payload
async fn notify_new_payload<T: BeaconChainTypes>(
pub async fn notify_new_payload<T: BeaconChainTypes>(
chain: &Arc<BeaconChain<T>>,
block: BeaconBlockRef<'_, T::EthSpec>,
slot: Slot,
parent_beacon_block_root: Hash256,
new_payload_request: NewPayloadRequest<'_, T::EthSpec>,
) -> Result<PayloadVerificationStatus, BlockError> {
let execution_layer = chain
.execution_layer
.as_ref()
.ok_or(ExecutionPayloadError::NoExecutionConnection)?;
let execution_block_hash = block.execution_payload()?.block_hash();
let new_payload_response = execution_layer.notify_new_payload(block.try_into()?).await;
let execution_block_hash = new_payload_request.execution_payload_ref().block_hash();
let new_payload_response = execution_layer
.notify_new_payload(new_payload_request.clone())
.await;
match new_payload_response {
Ok(status) => match status {
@@ -149,10 +158,7 @@ async fn notify_new_payload<T: BeaconChainTypes>(
?validation_error,
?latest_valid_hash,
?execution_block_hash,
root = ?block.tree_hash_root(),
graffiti = block.body().graffiti().as_utf8_lossy(),
proposer_index = block.proposer_index(),
slot = %block.slot(),
%slot,
method = "new_payload",
"Invalid execution payload"
);
@@ -175,11 +181,9 @@ async fn notify_new_payload<T: BeaconChainTypes>(
{
// This block has not yet been applied to fork choice, so the latest block that was
// imported to fork choice was the parent.
let latest_root = block.parent_root();
chain
.process_invalid_execution_payload(&InvalidationOperation::InvalidateMany {
head_block_root: latest_root,
head_block_root: parent_beacon_block_root,
always_invalidate_head: false,
latest_valid_ancestor: latest_valid_hash,
})
@@ -194,10 +198,7 @@ async fn notify_new_payload<T: BeaconChainTypes>(
warn!(
?validation_error,
?execution_block_hash,
root = ?block.tree_hash_root(),
graffiti = block.body().graffiti().as_utf8_lossy(),
proposer_index = block.proposer_index(),
slot = %block.slot(),
%slot,
method = "new_payload",
"Invalid execution payload block hash"
);

View File

@@ -165,13 +165,8 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
}
// Store the blobs or data columns too
if let Some(op) = self
.get_blobs_or_columns_store_op(block_root, block.slot(), block_data)
.map_err(|e| {
HistoricalBlockError::StoreError(StoreError::DBError {
message: format!("get_blobs_or_columns_store_op error {e:?}"),
})
})?
if let Some(op) =
self.get_blobs_or_columns_store_op(block_root, block.slot(), block_data)
{
blob_batch.extend(self.store.convert_to_kv_batch(vec![op])?);
}

View File

@@ -20,6 +20,7 @@ pub mod custody_context;
pub mod data_availability_checker;
pub mod data_column_verification;
mod early_attester_cache;
pub mod envelope_times_cache;
mod errors;
pub mod events;
pub mod execution_payload;
@@ -41,6 +42,7 @@ pub mod observed_block_producers;
pub mod observed_data_sidecars;
pub mod observed_operations;
mod observed_slashable;
pub mod payload_envelope_verification;
pub mod pending_payload_envelopes;
pub mod persisted_beacon_chain;
pub mod persisted_custody;

View File

@@ -21,6 +21,34 @@ pub const VALIDATOR_MONITOR_ATTESTATION_SIMULATOR_SOURCE_ATTESTER_HIT_TOTAL: &st
pub const VALIDATOR_MONITOR_ATTESTATION_SIMULATOR_SOURCE_ATTESTER_MISS_TOTAL: &str =
"validator_monitor_attestation_simulator_source_attester_miss_total";
/*
 * Execution Payload Envelope Processing
 */
/// Counter: total payload envelopes submitted for processing.
pub static ENVELOPE_PROCESSING_REQUESTS: LazyLock<Result<IntCounter>> = LazyLock::new(|| {
    try_create_int_counter(
        "payload_envelope_processing_requests_total",
        "Count of payload envelopes submitted for processing",
    )
});
/// Counter: payload envelopes that completed processing without error.
pub static ENVELOPE_PROCESSING_SUCCESSES: LazyLock<Result<IntCounter>> = LazyLock::new(|| {
    try_create_int_counter(
        "payload_envelope_processing_successes_total",
        "Count of payload envelopes processed without error",
    )
});
/// Histogram: end-to-end runtime of payload envelope processing.
pub static ENVELOPE_PROCESSING_TIMES: LazyLock<Result<Histogram>> = LazyLock::new(|| {
    try_create_histogram(
        "payload_envelope_processing_seconds",
        "Full runtime of payload envelope processing",
    )
});
/// Histogram: time spent persisting a processed envelope and its state to the DB.
pub static ENVELOPE_PROCESSING_DB_WRITE: LazyLock<Result<Histogram>> = LazyLock::new(|| {
    try_create_histogram(
        "payload_envelope_processing_db_write_seconds",
        "Time spent writing a newly processed payload envelope and state to DB",
    )
});
/*
* Block Processing
*/

View File

@@ -0,0 +1,105 @@
use std::sync::Arc;
use slot_clock::SlotClock;
use state_processing::{
VerifySignatures,
envelope_processing::{VerifyStateRoot, process_execution_payload_envelope},
};
use types::EthSpec;
use crate::{
BeaconChain, BeaconChainError, BeaconChainTypes, NotifyExecutionLayer,
PayloadVerificationOutcome,
block_verification::PayloadVerificationHandle,
payload_envelope_verification::{
EnvelopeError, EnvelopeImportData, MaybeAvailableEnvelope,
gossip_verified_envelope::GossipVerifiedEnvelope, load_snapshot_from_state_root,
payload_notifier::PayloadNotifier,
},
};
/// A payload envelope whose consensus-side processing has completed and whose execution-layer
/// verification has been spawned but not necessarily finished.
pub struct ExecutionPendingEnvelope<E: EthSpec> {
    /// The envelope, wrapped with its data-availability status.
    pub signed_envelope: MaybeAvailableEnvelope<E>,
    /// The block root and post-state (after envelope processing) needed at import time.
    pub import_data: EnvelopeImportData<E>,
    /// Handle to the spawned execution-layer verification task; awaited later to obtain the
    /// payload verification outcome.
    pub payload_verification_handle: PayloadVerificationHandle,
}
impl<T: BeaconChainTypes> GossipVerifiedEnvelope<T> {
    /// Advance a gossip-verified envelope to the execution-pending stage.
    ///
    /// This spawns execution-layer payload verification as a background task (recording the
    /// `started_execution` time in the envelope times cache), loads the pre-state snapshot
    /// (reusing the one cached during gossip verification when available), and applies
    /// `process_execution_payload_envelope` to produce the post-state. Signature verification
    /// is skipped here because it was already performed for `GossipVerifiedEnvelope`.
    pub fn into_execution_pending_envelope(
        self,
        chain: &Arc<BeaconChain<T>>,
        notify_execution_layer: NotifyExecutionLayer,
    ) -> Result<ExecutionPendingEnvelope<T::EthSpec>, EnvelopeError> {
        let signed_envelope = self.signed_envelope;
        let envelope = &signed_envelope.message;
        let payload = &envelope.payload;
        // Define a future that will verify the execution payload with an execution engine.
        //
        // We do this as early as possible so that later parts of this function can run in parallel
        // with the payload verification.
        let payload_notifier = PayloadNotifier::new(
            chain.clone(),
            signed_envelope.clone(),
            self.block.clone(),
            notify_execution_layer,
        )?;
        let block_root = envelope.beacon_block_root;
        let slot = self.block.slot();
        let payload_verification_future = async move {
            let chain = payload_notifier.chain.clone();
            // Record when execution verification started, for late-envelope diagnostics.
            if let Some(started_execution) = chain.slot_clock.now_duration() {
                chain
                    .envelope_times_cache
                    .write()
                    .set_time_started_execution(block_root, slot, started_execution);
            }
            let payload_verification_status = payload_notifier.notify_new_payload().await?;
            Ok(PayloadVerificationOutcome {
                payload_verification_status,
            })
        };
        // Spawn the payload verification future as a new task, but don't wait for it to complete.
        // The `payload_verification_future` will be awaited later to ensure verification completed
        // successfully.
        let payload_verification_handle = chain
            .task_executor
            .spawn_handle(
                payload_verification_future,
                "execution_payload_verification",
            )
            .ok_or(BeaconChainError::RuntimeShutdown)?;
        // Reuse the snapshot cached during gossip verification if present; otherwise load it
        // from the store using the block's state root.
        let snapshot = if let Some(snapshot) = self.snapshot {
            *snapshot
        } else {
            load_snapshot_from_state_root::<T>(block_root, self.block.state_root(), &chain.store)?
        };
        let mut state = snapshot.pre_state;
        // All the state modifications are done in envelope_processing
        process_execution_payload_envelope(
            &mut state,
            Some(snapshot.state_root),
            &signed_envelope,
            // verify signature already done for GossipVerifiedEnvelope
            VerifySignatures::False,
            VerifyStateRoot::True,
            &chain.spec,
        )?;
        Ok(ExecutionPendingEnvelope {
            signed_envelope: MaybeAvailableEnvelope::AvailabilityPending {
                block_hash: payload.block_hash,
                envelope: signed_envelope,
            },
            import_data: EnvelopeImportData {
                block_root,
                post_state: Box::new(state),
            },
            payload_verification_handle,
        })
    }
}

View File

@@ -0,0 +1,445 @@
use std::sync::Arc;
use educe::Educe;
use parking_lot::{Mutex, RwLock};
use store::DatabaseBlock;
use tracing::{Span, debug};
use types::{
ChainSpec, EthSpec, ExecutionPayloadBid, ExecutionPayloadEnvelope, Hash256, SignedBeaconBlock,
SignedExecutionPayloadEnvelope, Slot, consts::gloas::BUILDER_INDEX_SELF_BUILD,
};
use crate::{
BeaconChain, BeaconChainError, BeaconChainTypes, BeaconStore,
beacon_proposer_cache::{self, BeaconProposerCache},
canonical_head::CanonicalHead,
payload_envelope_verification::{
EnvelopeError, EnvelopeProcessingSnapshot, load_snapshot_from_state_root,
},
validator_pubkey_cache::ValidatorPubkeyCache,
};
/// Bundles only the dependencies needed for gossip verification of execution payload envelopes,
/// decoupling `GossipVerifiedEnvelope::new` from the full `BeaconChain`.
pub struct GossipVerificationContext<'a, T: BeaconChainTypes> {
    /// Used to look up the envelope's block in fork choice and read the finalized checkpoint.
    pub canonical_head: &'a CanonicalHead<T>,
    /// Used to load the full beacon block and, on cache misses, a pre-state snapshot.
    pub store: &'a BeaconStore<T>,
    /// Chain spec, passed through to the consistency and signature checks.
    pub spec: &'a ChainSpec,
    /// Proposer shuffling cache, used on the self-build fast path to avoid a state load.
    pub beacon_proposer_cache: &'a Mutex<BeaconProposerCache>,
    /// Validator pubkey cache, used to fetch the proposer's pubkey for signature verification.
    pub validator_pubkey_cache: &'a RwLock<ValidatorPubkeyCache<T>>,
    /// Genesis validators root, passed to envelope signature verification.
    pub genesis_validators_root: Hash256,
}
/// Verify that an execution payload envelope is consistent with its beacon block
/// and execution bid.
///
/// Checks, in order: the envelope is not prior to finalization, its slot matches the
/// block's, and its builder index and block hash match the committed bid.
pub(crate) fn verify_envelope_consistency<E: EthSpec>(
    envelope: &ExecutionPayloadEnvelope<E>,
    block: &SignedBeaconBlock<E>,
    execution_bid: &ExecutionPayloadBid<E>,
    latest_finalized_slot: Slot,
) -> Result<(), EnvelopeError> {
    let envelope_slot = envelope.slot;
    // Reject envelopes from slots prior to the latest finalized slot.
    if envelope_slot < latest_finalized_slot {
        return Err(EnvelopeError::PriorToFinalization {
            payload_slot: envelope_slot,
            latest_finalized_slot,
        });
    }
    // The envelope must be for the same slot as its beacon block.
    let block_slot = block.slot();
    if envelope_slot != block_slot {
        return Err(EnvelopeError::SlotMismatch {
            block: block_slot,
            envelope: envelope_slot,
        });
    }
    // The builder index must match the one committed to in the bid.
    if execution_bid.builder_index != envelope.builder_index {
        return Err(EnvelopeError::BuilderIndexMismatch {
            committed_bid: execution_bid.builder_index,
            envelope: envelope.builder_index,
        });
    }
    // The payload's block hash must match the hash committed to in the bid.
    if execution_bid.block_hash != envelope.payload.block_hash {
        return Err(EnvelopeError::BlockHashMismatch {
            committed_bid: execution_bid.block_hash,
            envelope: envelope.payload.block_hash,
        });
    }
    Ok(())
}
/// A wrapper around a `SignedExecutionPayloadEnvelope` that indicates it has been approved for re-gossiping on
/// the p2p network.
#[derive(Educe)]
#[educe(Debug(bound = "T: BeaconChainTypes"))]
pub struct GossipVerifiedEnvelope<T: BeaconChainTypes> {
    /// The verified envelope.
    pub signed_envelope: Arc<SignedExecutionPayloadEnvelope<T::EthSpec>>,
    /// The beacon block this envelope commits to, loaded from the store during verification.
    pub block: Arc<SignedBeaconBlock<T::EthSpec>>,
    /// Pre-state snapshot loaded during signature verification, if one was required; cached
    /// here so later processing can avoid a second state load.
    pub snapshot: Option<Box<EnvelopeProcessingSnapshot<T::EthSpec>>>,
}
impl<T: BeaconChainTypes> GossipVerifiedEnvelope<T> {
pub fn new(
signed_envelope: Arc<SignedExecutionPayloadEnvelope<T::EthSpec>>,
ctx: &GossipVerificationContext<'_, T>,
) -> Result<Self, EnvelopeError> {
let envelope = &signed_envelope.message;
let beacon_block_root = envelope.beacon_block_root;
// Check that we've seen the beacon block for this envelope and that it passes validation.
// TODO(EIP-7732): We might need some type of status table in order to differentiate between:
// If we have a block_processing_table, we could have a Processed(Bid, bool) state that is only
// entered post adding to fork choice. That way, we could potentially need only a single call to make
// sure the block is valid and to do all consequent checks with the bid
//
// 1. Blocks we haven't seen (IGNORE), and
// 2. Blocks we've seen that are invalid (REJECT).
//
// Presently these two cases are conflated.
let fork_choice_read_lock = ctx.canonical_head.fork_choice_read_lock();
let Some(proto_block) = fork_choice_read_lock.get_block(&beacon_block_root) else {
return Err(EnvelopeError::BlockRootUnknown {
block_root: beacon_block_root,
});
};
drop(fork_choice_read_lock);
let latest_finalized_slot = ctx
.canonical_head
.cached_head()
.finalized_checkpoint()
.epoch
.start_slot(T::EthSpec::slots_per_epoch());
// TODO(EIP-7732): check that we haven't seen another valid `SignedExecutionPayloadEnvelope`
// for this block root from this builder - envelope status table check
let block = match ctx.store.try_get_full_block(&beacon_block_root)? {
Some(DatabaseBlock::Full(block)) => Arc::new(block),
Some(DatabaseBlock::Blinded(_)) | None => {
return Err(EnvelopeError::from(BeaconChainError::MissingBeaconBlock(
beacon_block_root,
)));
}
};
let execution_bid = &block
.message()
.body()
.signed_execution_payload_bid()?
.message;
verify_envelope_consistency(envelope, &block, execution_bid, latest_finalized_slot)?;
// Verify the envelope signature.
//
// For self-built envelopes, we can use the proposer cache for the fork and the
// validator pubkey cache for the proposer's pubkey, avoiding a state load from disk.
// For external builder envelopes, we must load the state to access the builder registry.
let builder_index = envelope.builder_index;
let block_slot = envelope.slot;
let envelope_epoch = block_slot.epoch(T::EthSpec::slots_per_epoch());
// Since the payload's block is already guaranteed to be imported, the associated `proto_block.current_epoch_shuffling_id`
// already carries the correct `shuffling_decision_block`.
let proposer_shuffling_decision_block = proto_block
.current_epoch_shuffling_id
.shuffling_decision_block;
let (signature_is_valid, opt_snapshot) = if builder_index == BUILDER_INDEX_SELF_BUILD {
// Fast path: self-built envelopes can be verified without loading the state.
let mut opt_snapshot = None;
let proposer = beacon_proposer_cache::with_proposer_cache(
ctx.beacon_proposer_cache,
proposer_shuffling_decision_block,
envelope_epoch,
|proposers| proposers.get_slot::<T::EthSpec>(block_slot),
|| {
debug!(
%beacon_block_root,
"Proposer shuffling cache miss for envelope verification"
);
let snapshot = load_snapshot_from_state_root::<T>(
beacon_block_root,
proto_block.state_root,
ctx.store,
)?;
opt_snapshot = Some(Box::new(snapshot.clone()));
Ok::<_, EnvelopeError>((snapshot.state_root, snapshot.pre_state))
},
ctx.spec,
)?;
let expected_proposer = proposer.index;
let fork = proposer.fork;
if block.message().proposer_index() != expected_proposer as u64 {
return Err(EnvelopeError::IncorrectBlockProposer {
proposer_index: block.message().proposer_index(),
local_shuffling: expected_proposer as u64,
});
}
let pubkey_cache = ctx.validator_pubkey_cache.read();
let pubkey = pubkey_cache
.get(block.message().proposer_index() as usize)
.ok_or_else(|| EnvelopeError::UnknownValidator {
proposer_index: block.message().proposer_index(),
})?;
let is_valid = signed_envelope.verify_signature(
pubkey,
&fork,
ctx.genesis_validators_root,
ctx.spec,
);
(is_valid, opt_snapshot)
} else {
// TODO(gloas) if we implement a builder pubkey cache, we'll need to use it here.
// External builder: must load the state to get the builder pubkey.
let snapshot = load_snapshot_from_state_root::<T>(
beacon_block_root,
proto_block.state_root,
ctx.store,
)?;
let is_valid =
signed_envelope.verify_signature_with_state(&snapshot.pre_state, ctx.spec)?;
(is_valid, Some(Box::new(snapshot)))
};
if !signature_is_valid {
return Err(EnvelopeError::BadSignature);
}
Ok(Self {
signed_envelope,
block,
snapshot: opt_snapshot,
})
}
/// Return a cheap handle to the signed envelope (an `Arc` refcount bump, no deep copy).
pub fn envelope_cloned(&self) -> Arc<SignedExecutionPayloadEnvelope<T::EthSpec>> {
    Arc::clone(&self.signed_envelope)
}
}
impl<T: BeaconChainTypes> BeaconChain<T> {
    /// Build a `GossipVerificationContext` borrowing the relevant fields of this `BeaconChain`.
    pub fn gossip_verification_context(&self) -> GossipVerificationContext<'_, T> {
        GossipVerificationContext {
            canonical_head: &self.canonical_head,
            store: &self.store,
            spec: &self.spec,
            beacon_proposer_cache: &self.beacon_proposer_cache,
            validator_pubkey_cache: &self.validator_pubkey_cache,
            genesis_validators_root: self.genesis_validators_root,
        }
    }

    /// Returns `Ok(GossipVerifiedEnvelope)` if the supplied `envelope` should be forwarded onto the
    /// gossip network. The envelope is not imported into the chain, it is just partially verified.
    ///
    /// The returned `GossipVerifiedEnvelope` should be provided to
    /// `Self::process_execution_payload_envelope` immediately after it is returned, unless some
    /// other circumstance decides it should not be imported at all.
    ///
    /// ## Errors
    ///
    /// Returns an `Err` if the given envelope was invalid, or an error was encountered during
    /// verification.
    pub async fn verify_envelope_for_gossip(
        self: &Arc<Self>,
        envelope: Arc<SignedExecutionPayloadEnvelope<T::EthSpec>>,
    ) -> Result<GossipVerifiedEnvelope<T>, EnvelopeError> {
        let chain = self.clone();
        // Propagate the current tracing span into the blocking task.
        let span = Span::current();
        let handle = self
            .task_executor
            .clone()
            .spawn_blocking_handle(
                move || {
                    let _guard = span.enter();
                    let slot = envelope.slot();
                    let beacon_block_root = envelope.message.beacon_block_root;
                    let ctx = chain.gossip_verification_context();
                    // Verification is CPU-bound (signature checks), hence the blocking thread.
                    GossipVerifiedEnvelope::new(envelope, &ctx)
                        .inspect(|_| {
                            debug!(
                                %slot,
                                ?beacon_block_root,
                                "Successfully verified gossip envelope"
                            )
                        })
                        .inspect_err(|e| {
                            debug!(
                                error = e.to_string(),
                                ?beacon_block_root,
                                %slot,
                                "Rejected gossip envelope"
                            )
                        })
                },
                "gossip_envelope_verification_handle",
            )
            .ok_or(BeaconChainError::RuntimeShutdown)?;
        handle.await.map_err(BeaconChainError::TokioJoin)?
    }
}
#[cfg(test)]
mod tests {
    use std::marker::PhantomData;

    use bls::Signature;
    use ssz_types::VariableList;
    use types::{
        BeaconBlock, BeaconBlockBodyGloas, BeaconBlockGloas, Eth1Data, ExecutionBlockHash,
        ExecutionPayloadBid, ExecutionPayloadEnvelope, ExecutionPayloadGloas, ExecutionRequests,
        Graffiti, Hash256, MinimalEthSpec, SignedBeaconBlock, SignedExecutionPayloadBid, Slot,
        SyncAggregate,
    };

    use super::verify_envelope_consistency;
    use crate::payload_envelope_verification::EnvelopeError;

    type E = MinimalEthSpec;

    /// Build a minimal envelope where only the fields inspected by
    /// `verify_envelope_consistency` are set; everything else is defaulted/zeroed.
    fn envelope_fixture(
        slot: Slot,
        builder_index: u64,
        block_hash: ExecutionBlockHash,
    ) -> ExecutionPayloadEnvelope<E> {
        let payload = ExecutionPayloadGloas {
            block_hash,
            ..ExecutionPayloadGloas::default()
        };
        ExecutionPayloadEnvelope {
            payload,
            execution_requests: ExecutionRequests::default(),
            builder_index,
            beacon_block_root: Hash256::ZERO,
            slot,
            state_root: Hash256::ZERO,
        }
    }

    /// Build a Gloas block at `slot` with an entirely empty body and an empty
    /// signature; only the slot matters for the consistency checks under test.
    fn block_fixture(slot: Slot) -> SignedBeaconBlock<E> {
        let body = BeaconBlockBodyGloas {
            randao_reveal: Signature::empty(),
            eth1_data: Eth1Data {
                deposit_root: Hash256::ZERO,
                block_hash: Hash256::ZERO,
                deposit_count: 0,
            },
            graffiti: Graffiti::default(),
            proposer_slashings: VariableList::empty(),
            attester_slashings: VariableList::empty(),
            attestations: VariableList::empty(),
            deposits: VariableList::empty(),
            voluntary_exits: VariableList::empty(),
            sync_aggregate: SyncAggregate::empty(),
            bls_to_execution_changes: VariableList::empty(),
            signed_execution_payload_bid: SignedExecutionPayloadBid::empty(),
            payload_attestations: VariableList::empty(),
            _phantom: PhantomData,
        };
        SignedBeaconBlock::from_block(
            BeaconBlock::Gloas(BeaconBlockGloas {
                slot,
                proposer_index: 0,
                parent_root: Hash256::ZERO,
                state_root: Hash256::ZERO,
                body,
            }),
            Signature::empty(),
        )
    }

    /// Build a bid committing to `builder_index` and `block_hash`.
    fn bid_fixture(builder_index: u64, block_hash: ExecutionBlockHash) -> ExecutionPayloadBid<E> {
        ExecutionPayloadBid {
            builder_index,
            block_hash,
            ..ExecutionPayloadBid::default()
        }
    }

    #[test]
    fn test_valid_envelope() {
        let hash = ExecutionBlockHash::repeat_byte(0xaa);
        let envelope = envelope_fixture(Slot::new(10), 5, hash);
        let block = block_fixture(Slot::new(10));
        let bid = bid_fixture(5, hash);
        // A fully consistent envelope at an unfinalized slot passes every check.
        assert!(verify_envelope_consistency::<E>(&envelope, &block, &bid, Slot::new(0)).is_ok());
    }

    #[test]
    fn test_prior_to_finalization() {
        let hash = ExecutionBlockHash::repeat_byte(0xbb);
        let envelope = envelope_fixture(Slot::new(5), 1, hash);
        let block = block_fixture(Slot::new(5));
        let bid = bid_fixture(1, hash);
        // Slot 5 is behind the finalized slot (10), so the envelope must be rejected.
        assert!(matches!(
            verify_envelope_consistency::<E>(&envelope, &block, &bid, Slot::new(10)),
            Err(EnvelopeError::PriorToFinalization { .. })
        ));
    }

    #[test]
    fn test_slot_mismatch() {
        let hash = ExecutionBlockHash::repeat_byte(0xcc);
        let envelope = envelope_fixture(Slot::new(10), 1, hash);
        let block = block_fixture(Slot::new(20));
        let bid = bid_fixture(1, hash);
        assert!(matches!(
            verify_envelope_consistency::<E>(&envelope, &block, &bid, Slot::new(0)),
            Err(EnvelopeError::SlotMismatch { .. })
        ));
    }

    #[test]
    fn test_builder_index_mismatch() {
        let hash = ExecutionBlockHash::repeat_byte(0xdd);
        let envelope = envelope_fixture(Slot::new(10), 1, hash);
        let block = block_fixture(Slot::new(10));
        let bid = bid_fixture(2, hash);
        assert!(matches!(
            verify_envelope_consistency::<E>(&envelope, &block, &bid, Slot::new(0)),
            Err(EnvelopeError::BuilderIndexMismatch { .. })
        ));
    }

    #[test]
    fn test_block_hash_mismatch() {
        let envelope = envelope_fixture(Slot::new(10), 1, ExecutionBlockHash::repeat_byte(0xee));
        let block = block_fixture(Slot::new(10));
        let bid = bid_fixture(1, ExecutionBlockHash::repeat_byte(0xff));
        assert!(matches!(
            verify_envelope_consistency::<E>(&envelope, &block, &bid, Slot::new(0)),
            Err(EnvelopeError::BlockHashMismatch { .. })
        ));
    }
}

View File

@@ -0,0 +1,354 @@
use std::sync::Arc;
use std::time::Duration;
use fork_choice::PayloadVerificationStatus;
use slot_clock::SlotClock;
use store::StoreOp;
use tracing::{debug, error, info, info_span, instrument, warn};
use types::{BeaconState, BlockImportSource, Hash256, Slot};
use super::{
AvailableEnvelope, AvailableExecutedEnvelope, EnvelopeError, EnvelopeImportData,
ExecutedEnvelope, gossip_verified_envelope::GossipVerifiedEnvelope,
};
use crate::{
AvailabilityProcessingStatus, BeaconChain, BeaconChainError, BeaconChainTypes,
NotifyExecutionLayer, block_verification_types::AvailableBlockData, metrics,
payload_envelope_verification::ExecutionPendingEnvelope, validator_monitor::get_slot_delay_ms,
};
/// Maximum envelope age, in slots, for which timing metrics are written to the
/// envelope times cache; older envelopes (e.g. during sync) are skipped.
const ENVELOPE_METRICS_CACHE_SLOT_LIMIT: u32 = 64;
impl<T: BeaconChainTypes> BeaconChain<T> {
    /// Returns `Ok(status)` if the given `unverified_envelope` was successfully verified and
    /// imported into the chain.
    ///
    /// Timing checkpoints (observed / consensus-verified / executed / imported) are recorded
    /// in `envelope_times_cache` along the way.
    ///
    /// ## Errors
    ///
    /// Returns an `Err` if the given payload envelope was invalid, or an error was encountered during
    /// verification.
    #[instrument(skip_all, fields(block_root = ?block_root, block_source = %block_source))]
    pub async fn process_execution_payload_envelope(
        self: &Arc<Self>,
        block_root: Hash256,
        unverified_envelope: GossipVerifiedEnvelope<T>,
        notify_execution_layer: NotifyExecutionLayer,
        block_source: BlockImportSource,
        publish_fn: impl FnOnce() -> Result<(), EnvelopeError>,
    ) -> Result<AvailabilityProcessingStatus, EnvelopeError> {
        let block_slot = unverified_envelope.signed_envelope.slot();
        // Set observed time if not already set. Usually this should be set by gossip or RPC,
        // but just in case we set it again here (useful for tests).
        if let Some(seen_timestamp) = self.slot_clock.now_duration() {
            self.envelope_times_cache.write().set_time_observed(
                block_root,
                block_slot,
                seen_timestamp,
                None,
            );
        }
        // TODO(gloas) insert the pre-executed envelope into some type of cache.
        let _full_timer = metrics::start_timer(&metrics::ENVELOPE_PROCESSING_TIMES);
        metrics::inc_counter(&metrics::ENVELOPE_PROCESSING_REQUESTS);
        // A small closure to group the verification and import errors.
        let chain = self.clone();
        let import_envelope = async move {
            let execution_pending = unverified_envelope
                .into_execution_pending_envelope(&chain, notify_execution_layer)?;
            // Publish only after consensus verification succeeded, so we never gossip
            // an envelope we already know to be invalid.
            publish_fn()?;
            // Record the time it took to complete consensus verification.
            if let Some(timestamp) = chain.slot_clock.now_duration() {
                chain
                    .envelope_times_cache
                    .write()
                    .set_time_consensus_verified(block_root, block_slot, timestamp);
            }
            // Clone these handles now: `chain` is moved by value into
            // `into_executed_payload_envelope` below.
            let envelope_times_cache = chain.envelope_times_cache.clone();
            let slot_clock = chain.slot_clock.clone();
            // TODO(gloas): rename/refactor these `into_` names to be less similar and more clear
            // about what the function actually does.
            let executed_envelope = chain
                .into_executed_payload_envelope(execution_pending)
                .await
                .inspect_err(|_| {
                    // TODO(gloas): if the envelope fails execution for whatever reason (e.g.
                    // engine offline) and we keep it in a cache, the node will NOT perform
                    // lookup and reprocess this block until the block is evicted from the DA
                    // checker, causing the chain to get stuck temporarily if the block is
                    // canonical. We should therefore remove it from the cache on execution
                    // failure — note this closure is currently a no-op; the removal is not
                    // implemented yet.
                })?;
            // Record the time it took to wait for execution layer verification.
            if let Some(timestamp) = slot_clock.now_duration() {
                envelope_times_cache
                    .write()
                    .set_time_executed(block_root, block_slot, timestamp);
            }
            match executed_envelope {
                ExecutedEnvelope::Available(envelope) => {
                    self.import_available_execution_payload_envelope(Box::new(envelope))
                        .await
                }
                // The pending path is not implemented yet; surface it as an internal error.
                ExecutedEnvelope::AvailabilityPending() => Err(EnvelopeError::InternalError(
                    "Pending payload envelope not yet implemented".to_owned(),
                )),
            }
        };
        // Verify and import the payload envelope.
        match import_envelope.await {
            // The payload envelope was successfully verified and imported.
            Ok(status @ AvailabilityProcessingStatus::Imported(block_root)) => {
                info!(
                    ?block_root,
                    %block_slot,
                    source = %block_source,
                    "Execution payload envelope imported"
                );
                // TODO(gloas) do we need to send a `PayloadImported` event to the reprocess queue?
                // TODO(gloas) do we need to recompute head?
                // should canonical_head return the block and the payload now?
                self.recompute_head_at_current_slot().await;
                metrics::inc_counter(&metrics::ENVELOPE_PROCESSING_SUCCESSES);
                Ok(status)
            }
            Ok(status @ AvailabilityProcessingStatus::MissingComponents(slot, block_root)) => {
                debug!(?block_root, %slot, "Payload envelope awaiting blobs");
                Ok(status)
            }
            Err(EnvelopeError::BeaconChainError(e)) => {
                // A `TokioJoin` error indicates the task was cancelled (e.g. shutdown),
                // which is routine, so log it quieter than a genuine rejection.
                if matches!(e.as_ref(), BeaconChainError::TokioJoin(_)) {
                    debug!(error = ?e, "Envelope processing cancelled");
                } else {
                    warn!(error = ?e, "Execution payload envelope rejected");
                }
                Err(EnvelopeError::BeaconChainError(e))
            }
            Err(other) => {
                warn!(
                    reason = other.to_string(),
                    "Execution payload envelope rejected"
                );
                Err(other)
            }
        }
    }

    /// Accepts a fully-verified payload envelope and awaits on its payload verification handle to
    /// get a fully `ExecutedEnvelope`.
    ///
    /// An error is returned if the verification handle couldn't be awaited.
    #[instrument(skip_all, level = "debug")]
    async fn into_executed_payload_envelope(
        self: Arc<Self>,
        pending_envelope: ExecutionPendingEnvelope<T::EthSpec>,
    ) -> Result<ExecutedEnvelope<T::EthSpec>, EnvelopeError> {
        let ExecutionPendingEnvelope {
            signed_envelope,
            import_data,
            payload_verification_handle,
        } = pending_envelope;
        // The outer `Result` is the task join, the `Option` is `None` on runtime
        // shutdown, and the inner `Result` is the verification outcome — hence `??`.
        let payload_verification_outcome = payload_verification_handle
            .await
            .map_err(BeaconChainError::TokioJoin)?
            .ok_or(BeaconChainError::RuntimeShutdown)??;
        Ok(ExecutedEnvelope::new(
            signed_envelope,
            import_data,
            payload_verification_outcome,
        ))
    }

    /// Imports an executed, fully-available envelope, running the blocking database
    /// work on a dedicated thread.
    #[instrument(skip_all)]
    pub async fn import_available_execution_payload_envelope(
        self: &Arc<Self>,
        envelope: Box<AvailableExecutedEnvelope<T::EthSpec>>,
    ) -> Result<AvailabilityProcessingStatus, EnvelopeError> {
        let AvailableExecutedEnvelope {
            envelope,
            import_data,
            payload_verification_outcome,
        } = *envelope;
        let EnvelopeImportData {
            block_root,
            post_state,
        } = import_data;
        let block_root = {
            // Capture the current span before moving into the blocking task
            let current_span = tracing::Span::current();
            let chain = self.clone();
            self.spawn_blocking_handle(
                move || {
                    // Enter the captured span in the blocking thread
                    let _guard = current_span.enter();
                    chain.import_execution_payload_envelope(
                        envelope,
                        block_root,
                        *post_state,
                        payload_verification_outcome.payload_verification_status,
                    )
                },
                "payload_verification_handle",
            )
            .await??
        };
        Ok(AvailabilityProcessingStatus::Imported(block_root))
    }

    /// Accepts a fully-verified and available envelope and imports it into the chain without performing any
    /// additional verification.
    ///
    /// An error is returned if the envelope was unable to be imported. It may be partially imported
    /// (i.e., this function is not atomic).
    #[allow(clippy::too_many_arguments)]
    #[instrument(skip_all)]
    fn import_execution_payload_envelope(
        &self,
        signed_envelope: AvailableEnvelope<T::EthSpec>,
        block_root: Hash256,
        state: BeaconState<T::EthSpec>,
        _payload_verification_status: PayloadVerificationStatus,
    ) -> Result<Hash256, EnvelopeError> {
        // Everything in this initial section is on the hot path for processing the envelope.
        // Take an upgradable read lock on fork choice so we can check if this block has already
        // been imported. We don't want to repeat work importing a block that is already imported.
        let fork_choice_reader = self.canonical_head.fork_choice_upgradable_read_lock();
        // The envelope's block must already be known to fork choice before its payload
        // can be imported.
        if !fork_choice_reader.contains_block(&block_root) {
            return Err(EnvelopeError::BlockRootUnknown { block_root });
        }
        // TODO(gloas) add defensive check to see if payload envelope is already in fork choice
        // Note that a duplicate cache/payload status table should prevent this from happening
        // but it doesnt hurt to be defensive.
        // TODO(gloas) when the code below is implemented we can delete this drop
        drop(fork_choice_reader);
        // TODO(gloas) no fork choice logic yet
        // Take an exclusive write-lock on fork choice. It's very important to prevent deadlocks by
        // avoiding taking other locks whilst holding this lock.
        // let fork_choice = parking_lot::RwLockUpgradableReadGuard::upgrade(fork_choice_reader);
        // TODO(gloas) Do we need this check? Do not import a block that doesn't descend from the finalized root.
        // let signed_block = check_block_is_finalized_checkpoint_or_descendant(self, &fork_choice, signed_block)?;
        // TODO(gloas) emit SSE event if the payload became the new head payload
        // It is important NOT to return errors here before the database commit, because the envelope
        // has already been added to fork choice and the database would be left in an inconsistent
        // state if we returned early without committing. In other words, an error here would
        // corrupt the node's database permanently.
        // Store the envelope, its post-state, and any data columns.
        // If the write fails, revert fork choice to the version from disk, else we can
        // end up with envelopes in fork choice that are missing from disk.
        // See https://github.com/sigp/lighthouse/issues/2028
        let (signed_envelope, columns) = signed_envelope.deconstruct();
        let mut ops = vec![];
        if let Some(blobs_or_columns_store_op) = self.get_blobs_or_columns_store_op(
            block_root,
            signed_envelope.slot(),
            AvailableBlockData::DataColumns(columns),
        ) {
            ops.push(blobs_or_columns_store_op);
        }
        let db_write_timer = metrics::start_timer(&metrics::ENVELOPE_PROCESSING_DB_WRITE);
        ops.push(StoreOp::PutPayloadEnvelope(
            block_root,
            signed_envelope.clone(),
        ));
        // The post-state is keyed by the envelope's `state_root`.
        ops.push(StoreOp::PutState(
            signed_envelope.message.state_root,
            &state,
        ));
        let db_span = info_span!("persist_payloads_and_blobs").entered();
        if let Err(e) = self.store.do_atomically_with_block_and_blobs_cache(ops) {
            error!(
                msg = "Restoring fork choice from disk",
                error = ?e,
                "Database write failed!"
            );
            return Err(e.into());
            // TODO(gloas) handle db write failure
            // return Err(self
            //     .handle_import_block_db_write_error(fork_choice)
            //     .err()
            //     .unwrap_or(e.into()));
        }
        drop(db_span);
        // TODO(gloas) drop fork choice lock
        // The fork choice write-lock is dropped *after* the on-disk database has been updated.
        // This prevents inconsistency between the two at the expense of concurrency.
        // drop(fork_choice);
        // We're declaring the envelope "imported" at this point, since fork choice and the DB know
        // about it.
        let envelope_time_imported = self.slot_clock.now_duration().unwrap_or(Duration::MAX);
        // TODO(gloas) depending on what happens with light clients
        // we might need to do some light client related computations here
        metrics::stop_timer(db_write_timer);
        self.import_envelope_update_metrics_and_events(
            block_root,
            signed_envelope.slot(),
            envelope_time_imported,
        );
        Ok(block_root)
    }

    /// Record import-timing metrics for a freshly imported envelope.
    fn import_envelope_update_metrics_and_events(
        &self,
        block_root: Hash256,
        envelope_slot: Slot,
        envelope_time_imported: Duration,
    ) {
        let envelope_delay_total =
            get_slot_delay_ms(envelope_time_imported, envelope_slot, &self.slot_clock);
        // Do not write to the cache for envelopes older than `ENVELOPE_METRICS_CACHE_SLOT_LIMIT`
        // (64) slots; this helps reduce writes to the cache during sync.
        if envelope_delay_total
            < self
                .slot_clock
                .slot_duration()
                .saturating_mul(ENVELOPE_METRICS_CACHE_SLOT_LIMIT)
        {
            self.envelope_times_cache.write().set_time_imported(
                block_root,
                envelope_slot,
                envelope_time_imported,
            );
        }
        // TODO(gloas) emit SSE event for envelope import (similar to SseBlock for blocks).
    }
}

View File

@@ -0,0 +1,285 @@
//! The incremental processing steps (e.g., signatures verified but not the state transition) are
//! represented as a sequence of wrapper-types around the envelope. There is a linear progression of
//! types, starting at a `SignedExecutionPayloadEnvelope` and finishing with an `AvailableExecutedEnvelope` (see
//! diagram below).
//!
//! ```ignore
//! SignedExecutionPayloadEnvelope
//! |
//! ▼
//! GossipVerifiedEnvelope
//! |
//! ▼
//! ExecutionPendingEnvelope
//! |
//! await
//! ▼
//! ExecutedEnvelope
//!
//! ```
use std::sync::Arc;
use store::Error as DBError;
use state_processing::{BlockProcessingError, envelope_processing::EnvelopeProcessingError};
use tracing::instrument;
use types::{
BeaconState, BeaconStateError, ChainSpec, DataColumnSidecarList, EthSpec, ExecutionBlockHash,
ExecutionPayloadEnvelope, Hash256, SignedExecutionPayloadEnvelope, Slot,
};
use crate::{
BeaconChainError, BeaconChainTypes, BeaconStore, BlockError, ExecutionPayloadError,
PayloadVerificationOutcome,
};
pub mod execution_pending_envelope;
pub mod gossip_verified_envelope;
pub mod import;
mod payload_notifier;
pub use execution_pending_envelope::ExecutionPendingEnvelope;
/// Ancillary data required to import a verified envelope into the chain.
#[derive(PartialEq)]
pub struct EnvelopeImportData<E: EthSpec> {
    /// Root of the beacon block this envelope corresponds to.
    pub block_root: Hash256,
    /// Beacon state after applying the envelope (boxed to keep this type small).
    pub post_state: Box<BeaconState<E>>,
}
/// A signed envelope bundled with all of its data columns, i.e. fully available.
#[derive(Debug)]
#[allow(dead_code)]
pub struct AvailableEnvelope<E: EthSpec> {
    /// Block hash of the execution payload carried by the envelope.
    execution_block_hash: ExecutionBlockHash,
    envelope: Arc<SignedExecutionPayloadEnvelope<E>>,
    /// The data columns that make this envelope fully available.
    columns: DataColumnSidecarList<E>,
    /// Timestamp at which this envelope first became available (UNIX timestamp, time since 1970).
    columns_available_timestamp: Option<std::time::Duration>,
    pub spec: Arc<ChainSpec>,
}
impl<E: EthSpec> AvailableEnvelope<E> {
    /// Borrow the unsigned envelope message.
    pub fn message(&self) -> &ExecutionPayloadEnvelope<E> {
        &self.envelope.message
    }

    /// Split into the signed envelope and its data columns, discarding everything else.
    #[allow(clippy::type_complexity)]
    pub fn deconstruct(
        self,
    ) -> (
        Arc<SignedExecutionPayloadEnvelope<E>>,
        DataColumnSidecarList<E>,
    ) {
        (self.envelope, self.columns)
    }
}
/// An envelope that may or may not yet have all of its data columns.
pub enum MaybeAvailableEnvelope<E: EthSpec> {
    /// Fully available: the envelope plus all required data columns.
    Available(AvailableEnvelope<E>),
    /// Still waiting on data; only the envelope itself is held.
    AvailabilityPending {
        /// Execution block hash of the pending payload.
        block_hash: ExecutionBlockHash,
        envelope: Arc<SignedExecutionPayloadEnvelope<E>>,
    },
}
/// This snapshot is to be used for verifying a payload envelope.
#[derive(Debug, Clone)]
pub struct EnvelopeProcessingSnapshot<E: EthSpec> {
    /// This state is equivalent to the `self.beacon_block.state_root()` before applying the envelope.
    pub pre_state: BeaconState<E>,
    /// Root of `pre_state`.
    pub state_root: Hash256,
    /// Root of the beacon block the envelope refers to.
    pub beacon_block_root: Hash256,
}
/// A payload envelope that has gone through processing checks and execution by an EL client.
/// This envelope hasn't necessarily completed data availability checks.
///
/// It contains 2 variants:
/// 1. `Available`: This envelope has been executed and also contains all data to consider it
///    fully available.
/// 2. `AvailabilityPending`: This envelope hasn't received all required blobs to consider it
///    fully available.
pub enum ExecutedEnvelope<E: EthSpec> {
    Available(AvailableExecutedEnvelope<E>),
    // TODO(gloas) implement availability pending — this variant currently carries no data.
    AvailabilityPending(),
}
impl<E: EthSpec> ExecutedEnvelope<E> {
    /// Classify an executed envelope by its availability: a fully available envelope is
    /// bundled with its import data and EL outcome, a pending one is passed through.
    pub fn new(
        envelope: MaybeAvailableEnvelope<E>,
        import_data: EnvelopeImportData<E>,
        payload_verification_outcome: PayloadVerificationOutcome,
    ) -> Self {
        match envelope {
            MaybeAvailableEnvelope::Available(available) => {
                let executed = AvailableExecutedEnvelope::new(
                    available,
                    import_data,
                    payload_verification_outcome,
                );
                Self::Available(executed)
            }
            // TODO(gloas) implement availability pending
            MaybeAvailableEnvelope::AvailabilityPending { .. } => Self::AvailabilityPending(),
        }
    }
}
/// A payload envelope that has completed all payload processing checks including verification
/// by an EL client **and** has all requisite blob data to be imported into fork choice.
pub struct AvailableExecutedEnvelope<E: EthSpec> {
    /// The envelope together with its complete data columns.
    pub envelope: AvailableEnvelope<E>,
    /// Block root and post-state required for import.
    pub import_data: EnvelopeImportData<E>,
    /// The execution layer's verdict on the payload.
    pub payload_verification_outcome: PayloadVerificationOutcome,
}
impl<E: EthSpec> AvailableExecutedEnvelope<E> {
    /// Bundle an available envelope with its import data and EL verification outcome.
    pub fn new(
        envelope: AvailableEnvelope<E>,
        import_data: EnvelopeImportData<E>,
        payload_verification_outcome: PayloadVerificationOutcome,
    ) -> Self {
        Self {
            envelope,
            import_data,
            payload_verification_outcome,
        }
    }
}
/// Errors that can occur while verifying or importing a `SignedExecutionPayloadEnvelope`.
#[derive(Debug)]
pub enum EnvelopeError {
    /// The envelope's block root is unknown.
    BlockRootUnknown { block_root: Hash256 },
    /// The signature is invalid.
    BadSignature,
    /// The builder index doesn't match the committed bid.
    BuilderIndexMismatch { committed_bid: u64, envelope: u64 },
    /// The envelope slot doesn't match the block's slot.
    SlotMismatch { block: Slot, envelope: Slot },
    /// The validator index is unknown.
    UnknownValidator { proposer_index: u64 },
    /// The block hash doesn't match the committed bid.
    BlockHashMismatch {
        committed_bid: ExecutionBlockHash,
        envelope: ExecutionBlockHash,
    },
    /// The block's proposer_index does not match the locally computed proposer.
    IncorrectBlockProposer {
        proposer_index: u64,
        local_shuffling: u64,
    },
    /// The envelope belongs to a block from a slot prior to the most recently
    /// finalized slot.
    PriorToFinalization {
        payload_slot: Slot,
        latest_finalized_slot: Slot,
    },
    /// Some Beacon Chain Error (`Arc`-wrapped — presumably because `BeaconChainError`
    /// is not `Clone`; confirm before relying on this).
    BeaconChainError(Arc<BeaconChainError>),
    /// Some Beacon State error.
    BeaconStateError(BeaconStateError),
    /// Some BlockProcessingError (for electra operations).
    BlockProcessingError(BlockProcessingError),
    /// Some EnvelopeProcessingError without a more specific counterpart above.
    EnvelopeProcessingError(EnvelopeProcessingError),
    /// Error verifying the execution payload.
    ExecutionPayloadError(ExecutionPayloadError),
    /// An error from block-level checks reused during envelope import.
    BlockError(BlockError),
    /// Internal error.
    InternalError(String),
}
impl std::fmt::Display for EnvelopeError {
    /// Delegate directly to the derived `Debug` representation, which carries
    /// all the per-variant context.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        std::fmt::Debug::fmt(self, f)
    }
}
impl From<BeaconChainError> for EnvelopeError {
fn from(e: BeaconChainError) -> Self {
EnvelopeError::BeaconChainError(Arc::new(e))
}
}
impl From<ExecutionPayloadError> for EnvelopeError {
fn from(e: ExecutionPayloadError) -> Self {
EnvelopeError::ExecutionPayloadError(e)
}
}
impl From<BeaconStateError> for EnvelopeError {
fn from(e: BeaconStateError) -> Self {
EnvelopeError::BeaconStateError(e)
}
}
impl From<DBError> for EnvelopeError {
fn from(e: DBError) -> Self {
EnvelopeError::BeaconChainError(Arc::new(BeaconChainError::DBError(e)))
}
}
impl From<BlockError> for EnvelopeError {
fn from(e: BlockError) -> Self {
EnvelopeError::BlockError(e)
}
}
/// Pull errors up from EnvelopeProcessingError to EnvelopeError
impl From<EnvelopeProcessingError> for EnvelopeError {
fn from(e: EnvelopeProcessingError) -> Self {
match e {
EnvelopeProcessingError::BadSignature => EnvelopeError::BadSignature,
EnvelopeProcessingError::BeaconStateError(e) => EnvelopeError::BeaconStateError(e),
EnvelopeProcessingError::BlockHashMismatch {
committed_bid,
envelope,
} => EnvelopeError::BlockHashMismatch {
committed_bid,
envelope,
},
EnvelopeProcessingError::BlockProcessingError(e) => {
EnvelopeError::BlockProcessingError(e)
}
e => EnvelopeError::EnvelopeProcessingError(e),
}
}
}
/// Load state from store given a known state root and block root.
/// Use this when the proto block has already been looked up from fork choice.
#[instrument(skip_all, level = "debug", fields(beacon_block_root = %beacon_block_root))]
pub(crate) fn load_snapshot_from_state_root<T: BeaconChainTypes>(
    beacon_block_root: Hash256,
    block_state_root: Hash256,
    store: &BeaconStore<T>,
) -> Result<EnvelopeProcessingSnapshot<T::EthSpec>, EnvelopeError> {
    // TODO(EIP-7732): add metrics here
    // `get_hot_state` suffices (rather than `get_advanced_hot_state`) because the envelope
    // must be from the same slot as its block, so no state advance is required.
    let pre_state = store
        .get_hot_state(&block_state_root, /* cache_state */ true)
        .map_err(EnvelopeError::from)?
        .ok_or_else(|| {
            BeaconChainError::DBInconsistent(format!(
                "Missing state for envelope block {block_state_root:?}",
            ))
        })?;
    Ok(EnvelopeProcessingSnapshot {
        pre_state,
        state_root: block_state_root,
        beacon_block_root,
    })
}

View File

@@ -0,0 +1,94 @@
use std::sync::Arc;
use execution_layer::{NewPayloadRequest, NewPayloadRequestGloas};
use fork_choice::PayloadVerificationStatus;
use state_processing::per_block_processing::deneb::kzg_commitment_to_versioned_hash;
use tracing::warn;
use types::{SignedBeaconBlock, SignedExecutionPayloadEnvelope};
use crate::{
BeaconChain, BeaconChainTypes, BlockError, NotifyExecutionLayer,
execution_payload::notify_new_payload, payload_envelope_verification::EnvelopeError,
};
/// Used to await the result of executing payload with a remote EE.
pub struct PayloadNotifier<T: BeaconChainTypes> {
    pub chain: Arc<BeaconChain<T>>,
    /// The envelope whose payload will be sent to the execution layer.
    envelope: Arc<SignedExecutionPayloadEnvelope<T::EthSpec>>,
    /// The beacon block the envelope corresponds to (source of the committed bid).
    block: Arc<SignedBeaconBlock<T::EthSpec>>,
    /// Precomputed status from optimistic sync; `None` means the EL must be consulted.
    payload_verification_status: Option<PayloadVerificationStatus>,
}
impl<T: BeaconChainTypes> PayloadNotifier<T> {
    /// Create a notifier for `envelope`/`block`.
    ///
    /// If `notify_execution_layer` is `No` and optimistic finalized sync is enabled, the
    /// payload is pre-verified locally here and the status is precomputed as `Optimistic`;
    /// otherwise (or if the local verifications fail) the status is left `None` so that
    /// `notify_new_payload` performs the full EL round-trip.
    pub fn new(
        chain: Arc<BeaconChain<T>>,
        envelope: Arc<SignedExecutionPayloadEnvelope<T::EthSpec>>,
        block: Arc<SignedBeaconBlock<T::EthSpec>>,
        notify_execution_layer: NotifyExecutionLayer,
    ) -> Result<Self, EnvelopeError> {
        let payload_verification_status = {
            let payload_message = &envelope.message;
            match notify_execution_layer {
                NotifyExecutionLayer::No if chain.config.optimistic_finalized_sync => {
                    let new_payload_request = Self::build_new_payload_request(&envelope, &block)?;
                    // TODO(gloas): check and test RLP block hash calculation post-Gloas
                    if let Err(e) = new_payload_request.perform_optimistic_sync_verifications() {
                        warn!(
                            block_number = ?payload_message.payload.block_number,
                            info = "you can silence this warning with --disable-optimistic-finalized-sync",
                            error = ?e,
                            "Falling back to slow block hash verification"
                        );
                        // `None` defers to full EL verification in `notify_new_payload`.
                        None
                    } else {
                        Some(PayloadVerificationStatus::Optimistic)
                    }
                }
                _ => None,
            }
        };
        Ok(Self {
            chain,
            envelope,
            block,
            payload_verification_status,
        })
    }

    /// Resolve the payload's verification status, contacting the EL only if no status
    /// was precomputed in `new`.
    pub async fn notify_new_payload(self) -> Result<PayloadVerificationStatus, BlockError> {
        if let Some(precomputed_status) = self.payload_verification_status {
            Ok(precomputed_status)
        } else {
            let parent_root = self.block.message().parent_root();
            let request = Self::build_new_payload_request(&self.envelope, &self.block)?;
            notify_new_payload(&self.chain, self.envelope.slot(), parent_root, request).await
        }
    }

    /// Assemble the Gloas `new_payload` request from the envelope's payload and the bid
    /// committed to in the block (source of the blob KZG commitments).
    fn build_new_payload_request<'a>(
        envelope: &'a SignedExecutionPayloadEnvelope<T::EthSpec>,
        block: &'a SignedBeaconBlock<T::EthSpec>,
    ) -> Result<NewPayloadRequest<'a, T::EthSpec>, BlockError> {
        let bid = &block
            .message()
            .body()
            .signed_execution_payload_bid()
            .map_err(|e| BlockError::BeaconChainError(Box::new(e.into())))?
            .message;
        // Versioned hashes are derived from the bid's commitments, which the envelope's
        // payload must match.
        let versioned_hashes = bid
            .blob_kzg_commitments
            .iter()
            .map(kzg_commitment_to_versioned_hash)
            .collect();
        Ok(NewPayloadRequest::Gloas(NewPayloadRequestGloas {
            execution_payload: &envelope.message.payload,
            versioned_hashes,
            // NOTE(review): uses the block's parent root as `parent_beacon_block_root`,
            // mirroring how pre-Gloas payloads reference the parent of their carrying
            // block — confirm against the Gloas engine API spec.
            parent_beacon_block_root: block.message().parent_root(),
            execution_requests: &envelope.message.execution_requests,
        }))
    }
}