mirror of
https://github.com/sigp/lighthouse.git
synced 2026-05-30 12:47:05 +00:00
temp chhanges
This commit is contained in:
@@ -14,8 +14,8 @@ use std::sync::Arc;
|
|||||||
use task_executor::TaskExecutor;
|
use task_executor::TaskExecutor;
|
||||||
use tracing::{debug, error, instrument};
|
use tracing::{debug, error, instrument};
|
||||||
use types::{
|
use types::{
|
||||||
ChainSpec, ColumnIndex, DataColumnSidecar, DataColumnSidecarList, EthSpec, Hash256,
|
BlockImportSource, ChainSpec, ColumnIndex, DataColumnSidecar, DataColumnSidecarList, EthSpec,
|
||||||
SignedExecutionPayloadBid, Slot,
|
Hash256, SignedExecutionPayloadBid, SignedExecutionPayloadEnvelope, Slot,
|
||||||
};
|
};
|
||||||
|
|
||||||
mod payload_envelope_cache;
|
mod payload_envelope_cache;
|
||||||
@@ -50,6 +50,16 @@ pub enum Availability<E: EthSpec> {
|
|||||||
Available(Box<AvailableExecutedEnvelope<E>>),
|
Available(Box<AvailableExecutedEnvelope<E>>),
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub enum PayloadEnvelopeProcessingStatus<E: EthSpec> {
|
||||||
|
/// Envelope is not in any pre-import cache. Envelope may be in the data-base or in the fork-choice.
|
||||||
|
Unknown,
|
||||||
|
/// Envelope is currently processing but not yet validated.
|
||||||
|
NotValidated(Arc<SignedExecutionPayloadEnvelope<E>>, BlockImportSource),
|
||||||
|
/// Envelope is fully valid, but not yet imported. It's cached in the da_checker while awaiting
|
||||||
|
/// missing envelope components.
|
||||||
|
ExecutionValidated(Arc<SignedExecutionPayloadEnvelope<E>>),
|
||||||
|
}
|
||||||
|
|
||||||
impl<E: EthSpec> Debug for Availability<E> {
|
impl<E: EthSpec> Debug for Availability<E> {
|
||||||
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
||||||
match self {
|
match self {
|
||||||
@@ -148,6 +158,15 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn put_pre_executed_payload_envelope(
|
||||||
|
&self,
|
||||||
|
envelope: Arc<SignedExecutionPayloadEnvelope<T::EthSpec>>,
|
||||||
|
source: BlockImportSource,
|
||||||
|
) -> Result<(), AvailabilityCheckError> {
|
||||||
|
self.availability_cache
|
||||||
|
.put_pre_executed_payload_envelope(envelope, source)
|
||||||
|
}
|
||||||
|
|
||||||
/// Insert RPC custody columns and check if the payload becomes available.
|
/// Insert RPC custody columns and check if the payload becomes available.
|
||||||
#[instrument(skip_all, level = "trace")]
|
#[instrument(skip_all, level = "trace")]
|
||||||
pub fn put_rpc_custody_columns(
|
pub fn put_rpc_custody_columns(
|
||||||
|
|||||||
@@ -2,6 +2,7 @@ use crate::BeaconChainTypes;
|
|||||||
use crate::CustodyContext;
|
use crate::CustodyContext;
|
||||||
use crate::data_availability_checker::AvailabilityCheckError;
|
use crate::data_availability_checker::AvailabilityCheckError;
|
||||||
use crate::data_availability_checker_v2::Availability;
|
use crate::data_availability_checker_v2::Availability;
|
||||||
|
use crate::data_availability_checker_v2::PayloadEnvelopeProcessingStatus;
|
||||||
use crate::data_column_verification::KzgVerifiedCustodyDataColumn;
|
use crate::data_column_verification::KzgVerifiedCustodyDataColumn;
|
||||||
use crate::payload_envelope_verification::AvailabilityPendingExecutedEnvelope;
|
use crate::payload_envelope_verification::AvailabilityPendingExecutedEnvelope;
|
||||||
use crate::payload_envelope_verification::AvailableEnvelope;
|
use crate::payload_envelope_verification::AvailableEnvelope;
|
||||||
@@ -42,6 +43,7 @@ pub struct PendingComponents<E: EthSpec> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
impl<E: EthSpec> PendingComponents<E> {
|
impl<E: EthSpec> PendingComponents<E> {
|
||||||
|
|
||||||
/// Returns an immutable reference to the cached data column.
|
/// Returns an immutable reference to the cached data column.
|
||||||
pub fn get_cached_data_column(
|
pub fn get_cached_data_column(
|
||||||
&self,
|
&self,
|
||||||
@@ -80,7 +82,7 @@ impl<E: EthSpec> PendingComponents<E> {
|
|||||||
self.bid = Some(bid);
|
self.bid = Some(bid);
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn insert_pending_executed_envelope(
|
pub fn insert_pre_executed_envelope(
|
||||||
&mut self,
|
&mut self,
|
||||||
envelope: Arc<SignedExecutionPayloadEnvelope<E>>,
|
envelope: Arc<SignedExecutionPayloadEnvelope<E>>,
|
||||||
import_source: BlockImportSource,
|
import_source: BlockImportSource,
|
||||||
@@ -249,6 +251,24 @@ impl<T: BeaconChainTypes> DataAvailabilityCheckerInner<T> {
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Returns the envelope processing status for the given `block_root`. A `None` response indicates that
|
||||||
|
/// the envelope has not yet been inserted into the cache.
|
||||||
|
pub fn get_envelope_processing_status(&self, block_root: &Hash256) -> Option<PayloadEnvelopeProcessingStatus<T::EthSpec>> {
|
||||||
|
self.critical
|
||||||
|
.read()
|
||||||
|
.peek(block_root)
|
||||||
|
.and_then(|pending_components| {
|
||||||
|
pending_components.envelope.as_ref().map(|envelope| match envelope {
|
||||||
|
CachedPayloadEnvelope::PreExecution(e, source) => {
|
||||||
|
PayloadEnvelopeProcessingStatus::NotValidated(e.clone(), *source)
|
||||||
|
}
|
||||||
|
CachedPayloadEnvelope::Executed(e) => {
|
||||||
|
PayloadEnvelopeProcessingStatus::ExecutionValidated(e.envelope.clone())
|
||||||
|
}
|
||||||
|
})
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
/// Fetch data columns of a given `block_root` from the cache without affecting the LRU ordering
|
/// Fetch data columns of a given `block_root` from the cache without affecting the LRU ordering
|
||||||
pub fn peek_data_columns(
|
pub fn peek_data_columns(
|
||||||
&self,
|
&self,
|
||||||
@@ -301,6 +321,43 @@ impl<T: BeaconChainTypes> DataAvailabilityCheckerInner<T> {
|
|||||||
self.check_availability(block_root, pending_components, num_expected_columns)
|
self.check_availability(block_root, pending_components, num_expected_columns)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn put_pre_executed_payload_envelope(
|
||||||
|
&self,
|
||||||
|
envelope: Arc<SignedExecutionPayloadEnvelope<T::EthSpec>>,
|
||||||
|
source: BlockImportSource,
|
||||||
|
) -> Result<(), AvailabilityCheckError> {
|
||||||
|
let epoch = envelope.epoch();
|
||||||
|
let beacon_block_root = envelope.beacon_block_root();
|
||||||
|
let pending_components =
|
||||||
|
self.update_or_insert_pending_components(beacon_block_root, |pending_components| {
|
||||||
|
pending_components.insert_pre_executed_envelope(envelope, source);
|
||||||
|
Ok(())
|
||||||
|
})?;
|
||||||
|
|
||||||
|
let num_expected_columns_opt = self.get_num_expected_columns(epoch);
|
||||||
|
|
||||||
|
pending_components.span.in_scope(|| {
|
||||||
|
debug!(
|
||||||
|
component = "pre executed payload envelope",
|
||||||
|
status = pending_components.status_str(num_expected_columns_opt),
|
||||||
|
"Component added to data availability checker"
|
||||||
|
);
|
||||||
|
});
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Removes a pre-executed envelope from the cache.
|
||||||
|
/// This does NOT remove an existing executed envelope.
|
||||||
|
pub fn remove_pre_executed_envelope(&self, block_root: &Hash256) {
|
||||||
|
// The read lock is immediately dropped so we can safely remove the envelope from the cache.
|
||||||
|
if let Some(PayloadEnvelopeProcessingStatus::NotValidated(_, _)) = self.get_envelope_processing_status(block_root) {
|
||||||
|
// If the envelope is execution invalid, this status is permanent and idempotent to this
|
||||||
|
// block_root. We drop its components (e.g. columns) because they will never be useful.
|
||||||
|
self.critical.write().pop(block_root);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
#[allow(clippy::type_complexity)]
|
#[allow(clippy::type_complexity)]
|
||||||
pub fn put_kzg_verified_data_columns<
|
pub fn put_kzg_verified_data_columns<
|
||||||
I: IntoIterator<Item = KzgVerifiedCustodyDataColumn<T::EthSpec>>,
|
I: IntoIterator<Item = KzgVerifiedCustodyDataColumn<T::EthSpec>>,
|
||||||
|
|||||||
@@ -27,13 +27,13 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
|
|||||||
///
|
///
|
||||||
/// Returns an `Err` if the given payload envelope was invalid, or an error was encountered during
|
/// Returns an `Err` if the given payload envelope was invalid, or an error was encountered during
|
||||||
/// verification.
|
/// verification.
|
||||||
#[instrument(skip_all, fields(block_root = ?block_root, block_source = %block_source))]
|
#[instrument(skip_all, fields(block_root = ?block_root, envelope_source = %envelope_source))]
|
||||||
pub async fn process_execution_payload_envelope(
|
pub async fn process_execution_payload_envelope(
|
||||||
self: &Arc<Self>,
|
self: &Arc<Self>,
|
||||||
block_root: Hash256,
|
block_root: Hash256,
|
||||||
unverified_envelope: GossipVerifiedEnvelope<T>,
|
unverified_envelope: GossipVerifiedEnvelope<T>,
|
||||||
notify_execution_layer: NotifyExecutionLayer,
|
notify_execution_layer: NotifyExecutionLayer,
|
||||||
block_source: BlockImportSource,
|
envelope_source: BlockImportSource,
|
||||||
publish_fn: impl FnOnce() -> Result<(), EnvelopeError>,
|
publish_fn: impl FnOnce() -> Result<(), EnvelopeError>,
|
||||||
) -> Result<AvailabilityProcessingStatus, EnvelopeError> {
|
) -> Result<AvailabilityProcessingStatus, EnvelopeError> {
|
||||||
let block_slot = unverified_envelope.signed_envelope.slot();
|
let block_slot = unverified_envelope.signed_envelope.slot();
|
||||||
@@ -49,7 +49,12 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
|
|||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO(gloas) insert the pre-executed envelope into some type of cache.
|
self.data_availability_checker
|
||||||
|
.v2()
|
||||||
|
.put_pre_executed_payload_envelope(
|
||||||
|
unverified_envelope.envelope_cloned(),
|
||||||
|
envelope_source,
|
||||||
|
)?;
|
||||||
|
|
||||||
let _full_timer = metrics::start_timer(&metrics::ENVELOPE_PROCESSING_TIMES);
|
let _full_timer = metrics::start_timer(&metrics::ENVELOPE_PROCESSING_TIMES);
|
||||||
|
|
||||||
@@ -79,11 +84,12 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
|
|||||||
.into_executed_payload_envelope(execution_pending)
|
.into_executed_payload_envelope(execution_pending)
|
||||||
.await
|
.await
|
||||||
.inspect_err(|_| {
|
.inspect_err(|_| {
|
||||||
// TODO(gloas) If the envelope fails execution for whatever reason (e.g. engine offline),
|
// If the envelope fails execution for whatever reason (e.g. engine offline),
|
||||||
// and we keep it in the cache, then the node will NOT perform lookup and
|
// and we keep it in the cache, then the node will NOT perform lookup and
|
||||||
// reprocess this block until the block is evicted from DA checker, causing the
|
// reprocess this envelope until the envelope is evicted from DA checker, causing the
|
||||||
// chain to get stuck temporarily if the block is canonical. Therefore we remove
|
// chain to get stuck temporarily if the envelope is canonical. Therefore we remove
|
||||||
// it from the cache if execution fails.
|
// it from the cache if execution fails.
|
||||||
|
// self.data_availability_checker.v2().remove_pre_executed_envelop(block_root);
|
||||||
})?;
|
})?;
|
||||||
|
|
||||||
// Record the time it took to wait for execution layer verification.
|
// Record the time it took to wait for execution layer verification.
|
||||||
@@ -111,7 +117,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
|
|||||||
info!(
|
info!(
|
||||||
?block_root,
|
?block_root,
|
||||||
%block_slot,
|
%block_slot,
|
||||||
source = %block_source,
|
source = %envelope_source,
|
||||||
"Execution payload envelope imported"
|
"Execution payload envelope imported"
|
||||||
);
|
);
|
||||||
|
|
||||||
|
|||||||
@@ -39,6 +39,7 @@ pub mod gossip_verified_envelope;
|
|||||||
pub mod import;
|
pub mod import;
|
||||||
mod payload_notifier;
|
mod payload_notifier;
|
||||||
|
|
||||||
|
use crate::data_availability_checker::AvailabilityCheckError;
|
||||||
pub use execution_pending_envelope::ExecutionPendingEnvelope;
|
pub use execution_pending_envelope::ExecutionPendingEnvelope;
|
||||||
|
|
||||||
#[derive(Clone, Debug, PartialEq)]
|
#[derive(Clone, Debug, PartialEq)]
|
||||||
@@ -221,6 +222,9 @@ pub enum EnvelopeError {
|
|||||||
ExecutionPayloadError(ExecutionPayloadError),
|
ExecutionPayloadError(ExecutionPayloadError),
|
||||||
/// An error from block-level checks reused during envelope import
|
/// An error from block-level checks reused during envelope import
|
||||||
BlockError(BlockError),
|
BlockError(BlockError),
|
||||||
|
/// The envelope satisfied all validity conditions except consistency
|
||||||
|
/// with the corresponding columns that we received over gossip/rpc.
|
||||||
|
AvailabilityCheck(AvailabilityCheckError),
|
||||||
/// Internal error
|
/// Internal error
|
||||||
InternalError(String),
|
InternalError(String),
|
||||||
}
|
}
|
||||||
@@ -261,7 +265,12 @@ impl From<BlockError> for EnvelopeError {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Pull errors up from EnvelopeProcessingError to EnvelopeError
|
impl From<AvailabilityCheckError> for EnvelopeError {
|
||||||
|
fn from(e: AvailabilityCheckError) -> Self {
|
||||||
|
EnvelopeError::AvailabilityCheck(e)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
impl From<EnvelopeProcessingError> for EnvelopeError {
|
impl From<EnvelopeProcessingError> for EnvelopeError {
|
||||||
fn from(e: EnvelopeProcessingError) -> Self {
|
fn from(e: EnvelopeProcessingError) -> Self {
|
||||||
match e {
|
match e {
|
||||||
|
|||||||
Reference in New Issue
Block a user