diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index 8575089c78..2b380c0520 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -3779,7 +3779,7 @@ impl BeaconChain { /// /// An error is returned if the block was unable to be imported. It may be partially imported /// (i.e., this function is not atomic). - async fn process_availability( + pub(crate) async fn process_availability( self: &Arc, slot: Slot, availability: AvailabilityOutcome, @@ -3801,16 +3801,12 @@ impl BeaconChain { AvailabilityOutcome::Payload(availability) => match availability { PayloadAvailability::Available(available_envelope) => { // TODO(gloas) execution publish_fn - // publish_fn()?; + publish_fn()?; // Payload envelope is fully available - let res = self - .import_available_execution_payload_envelope(available_envelope) + self.import_available_execution_payload_envelope(available_envelope) .await - .unwrap(); - - // TODO(gloas) unwrap - Ok(res) + .map_err(BlockError::from) } PayloadAvailability::MissingComponents(block_root) => Ok( AvailabilityProcessingStatus::MissingComponents(slot, block_root), diff --git a/beacon_node/beacon_chain/src/block_verification.rs b/beacon_node/beacon_chain/src/block_verification.rs index 244b06f475..dcc006ecd8 100644 --- a/beacon_node/beacon_chain/src/block_verification.rs +++ b/beacon_node/beacon_chain/src/block_verification.rs @@ -60,6 +60,7 @@ use crate::execution_payload::{ }; use crate::kzg_utils::blobs_to_data_column_sidecars; use crate::observed_block_producers::SeenBlock; +use crate::payload_envelope_verification::EnvelopeError; use crate::validator_monitor::HISTORIC_EPOCHS as VALIDATOR_MONITOR_HISTORIC_EPOCHS; use crate::validator_pubkey_cache::ValidatorPubkeyCache; use crate::{ @@ -321,6 +322,12 @@ pub enum BlockError { bid_parent_root: Hash256, block_parent_root: Hash256, }, + /// An error occurred while processing a payload envelope. + /// + /// ## Peer scoring + /// + /// Peer scoring depends on the inner `EnvelopeError`. + EnvelopeError(EnvelopeError), } /// Which specific signature(s) are invalid in a SignedBeaconBlock @@ -340,6 +347,12 @@ impl From for BlockError { } } +impl From for BlockError { + fn from(e: EnvelopeError) -> Self { + Self::EnvelopeError(e) + } +} + /// Returned when block validation failed due to some issue verifying /// the execution payload. #[derive(Debug)] diff --git a/beacon_node/beacon_chain/src/data_availability_checker_v2/mod.rs b/beacon_node/beacon_chain/src/data_availability_checker_v2/mod.rs index 15a681ba35..c45f54b467 100644 --- a/beacon_node/beacon_chain/src/data_availability_checker_v2/mod.rs +++ b/beacon_node/beacon_chain/src/data_availability_checker_v2/mod.rs @@ -3,7 +3,9 @@ use crate::data_availability_checker_v2::pending_components_cache::{ }; use crate::data_availability_checker::AvailabilityCheckError; -use crate::payload_envelope_verification::AvailableExecutedEnvelope; +use crate::payload_envelope_verification::{ + AvailabilityPendingExecutedEnvelope, AvailableExecutedEnvelope, +}; use crate::{BeaconChain, BeaconChainTypes, CustodyContext, metrics}; use kzg::Kzg; use slot_clock::SlotClock; @@ -12,7 +14,7 @@ use std::fmt::Debug; use std::num::NonZeroUsize; use std::sync::Arc; use task_executor::TaskExecutor; -use tracing::{debug, error, instrument}; +use tracing::{debug, error, instrument, trace}; use types::{ BlockImportSource, ChainSpec, ColumnIndex, DataColumnSidecar, DataColumnSidecarList, EthSpec, Hash256, SignedExecutionPayloadBid, SignedExecutionPayloadEnvelope, Slot, @@ -158,6 +160,14 @@ impl DataAvailabilityChecker { }) } + pub fn put_executed_payload_envelope( + &self, + executed_envelope: AvailabilityPendingExecutedEnvelope, + ) -> Result, AvailabilityCheckError> { + self.availability_cache + .put_executed_payload_envelope(executed_envelope) + } + pub fn put_pre_executed_payload_envelope( &self, envelope: Arc>, @@ -167,10 +177,7 @@ impl DataAvailabilityChecker { .put_pre_executed_payload_envelope(envelope, source) } - pub fn remove_pre_executed_payload_envelope( - &self, - block_root: &Hash256, - ) { + pub fn remove_pre_executed_payload_envelope(&self, block_root: &Hash256) { self.availability_cache .remove_pre_executed_envelope(block_root); } @@ -373,7 +380,7 @@ pub fn start_availability_cache_maintenance_service( "availability_cache_service", ); } else { - debug!("Gloas fork not configured, not starting availability cache maintenance service"); + trace!("Gloas fork not configured, not starting availability cache maintenance service"); } } diff --git a/beacon_node/beacon_chain/src/data_availability_checker_v2/pending_components_cache.rs b/beacon_node/beacon_chain/src/data_availability_checker_v2/pending_components_cache.rs index de353c32b0..607c031694 100644 --- a/beacon_node/beacon_chain/src/data_availability_checker_v2/pending_components_cache.rs +++ b/beacon_node/beacon_chain/src/data_availability_checker_v2/pending_components_cache.rs @@ -43,7 +43,6 @@ pub struct PendingComponents { } impl PendingComponents { - /// Returns an immutable reference to the cached data column. pub fn get_cached_data_column( &self, @@ -82,7 +81,14 @@ impl PendingComponents { self.bid = Some(bid); } - pub fn insert_pre_executed_envelope( + pub fn insert_executed_paylaod_envelope( + &mut self, + envelope: AvailabilityPendingExecutedEnvelope, + ) { + self.envelope = Some(CachedPayloadEnvelope::Executed(Box::new(envelope))) + } + + pub fn insert_pre_executed_payload_envelope( &mut self, envelope: Arc>, import_source: BlockImportSource, @@ -91,7 +97,10 @@ impl PendingComponents { } /// Inserts an executed payload envelope into the cache. - pub fn insert_executed_envelope(&mut self, envelope: AvailabilityPendingExecutedEnvelope) { + pub fn insert_executed_payload_envelope( + &mut self, + envelope: AvailabilityPendingExecutedEnvelope, + ) { self.envelope = Some(CachedPayloadEnvelope::Executed(Box::new(envelope))) } @@ -253,19 +262,25 @@ impl DataAvailabilityCheckerInner { /// Returns the envelope processing status for the given `block_root`. A `None` response indicates that /// the envelope has not yet been inserted into the cache. - pub fn get_envelope_processing_status(&self, block_root: &Hash256) -> Option> { + pub fn get_envelope_processing_status( + &self, + block_root: &Hash256, + ) -> Option> { self.critical .read() .peek(block_root) .and_then(|pending_components| { - pending_components.envelope.as_ref().map(|envelope| match envelope { - CachedPayloadEnvelope::PreExecution(e, source) => { - PayloadEnvelopeProcessingStatus::NotValidated(e.clone(), *source) - } - CachedPayloadEnvelope::Executed(e) => { - PayloadEnvelopeProcessingStatus::ExecutionValidated(e.envelope.clone()) - } - }) + pending_components + .envelope + .as_ref() + .map(|envelope| match envelope { + CachedPayloadEnvelope::PreExecution(e, source) => { + PayloadEnvelopeProcessingStatus::NotValidated(e.clone(), *source) + } + CachedPayloadEnvelope::Executed(e) => { + PayloadEnvelopeProcessingStatus::ExecutionValidated(e.envelope.clone()) + } + }) }) } @@ -321,6 +336,35 @@ impl DataAvailabilityCheckerInner { self.check_availability(block_root, pending_components, num_expected_columns) } + pub fn put_executed_payload_envelope( + &self, + executed_envelope: AvailabilityPendingExecutedEnvelope, + ) -> Result, AvailabilityCheckError> { + let epoch = executed_envelope.envelope.epoch(); + let beacon_block_root = executed_envelope.envelope.beacon_block_root(); + let pending_components = + self.update_or_insert_pending_components(beacon_block_root, |pending_components| { + pending_components.insert_executed_payload_envelope(executed_envelope); + Ok(()) + })?; + + let num_expected_columns_opt = self.get_num_expected_columns(epoch); + + pending_components.span.in_scope(|| { + debug!( + component = "executed envelope", + status = pending_components.status_str(num_expected_columns_opt), + "Component added to data availability checker" + ); + }); + + self.check_availability( + beacon_block_root, + pending_components, + num_expected_columns_opt, + ) + } + pub fn put_pre_executed_payload_envelope( &self, envelope: Arc>, @@ -330,7 +374,7 @@ impl DataAvailabilityCheckerInner { let beacon_block_root = envelope.beacon_block_root(); let pending_components = self.update_or_insert_pending_components(beacon_block_root, |pending_components| { - pending_components.insert_pre_executed_envelope(envelope, source); + pending_components.insert_pre_executed_payload_envelope(envelope, source); Ok(()) })?; @@ -351,7 +395,9 @@ impl DataAvailabilityCheckerInner { /// This does NOT remove an existing executed envelope. pub fn remove_pre_executed_envelope(&self, block_root: &Hash256) { // The read lock is immediately dropped so we can safely remove the envelope from the cache. - if let Some(PayloadEnvelopeProcessingStatus::NotValidated(_, _)) = self.get_envelope_processing_status(block_root) { + if let Some(PayloadEnvelopeProcessingStatus::NotValidated(_, _)) = + self.get_envelope_processing_status(block_root) + { // If the envelope is execution invalid, this status is permanent and idempotent to this // block_root. We drop its components (e.g. columns) because they will never be useful. self.critical.write().pop(block_root); diff --git a/beacon_node/beacon_chain/src/payload_envelope_verification/import.rs b/beacon_node/beacon_chain/src/payload_envelope_verification/import.rs index 4f754abd2c..b9b9507a6f 100644 --- a/beacon_node/beacon_chain/src/payload_envelope_verification/import.rs +++ b/beacon_node/beacon_chain/src/payload_envelope_verification/import.rs @@ -13,8 +13,14 @@ use super::{ }; use crate::{ AvailabilityProcessingStatus, BeaconChain, BeaconChainError, BeaconChainTypes, - NotifyExecutionLayer, block_verification_types::AvailableBlockData, metrics, - payload_envelope_verification::ExecutionPendingEnvelope, validator_monitor::get_slot_delay_ms, + NotifyExecutionLayer, + block_verification_types::AvailableBlockData, + data_availability_router::AvailabilityOutcome, + metrics, + payload_envelope_verification::{ + AvailabilityPendingExecutedEnvelope, ExecutionPendingEnvelope, + }, + validator_monitor::get_slot_delay_ms, }; const ENVELOPE_METRICS_CACHE_SLOT_LIMIT: u32 = 64; @@ -89,7 +95,9 @@ impl BeaconChain { // reprocess this envelope until the envelope is evicted from DA checker, causing the // chain to get stuck temporarily if the envelope is canonical. Therefore we remove // it from the cache if execution fails. - // self.data_availability_checker.v2().remove_pre_executed_envelop(block_root); + self.data_availability_checker + .v2() + .remove_pre_executed_payload_envelope(&block_root); })?; // Record the time it took to wait for execution layer verification. @@ -104,9 +112,9 @@ impl BeaconChain { self.import_available_execution_payload_envelope(Box::new(envelope)) .await } - ExecutedEnvelope::AvailabilityPending() => Err(EnvelopeError::InternalError( - "Pending payload envelope not yet implemented".to_owned(), - )), + ExecutedEnvelope::AvailabilityPending(envelope) => { + self.check_envelope_availability_and_import(envelope).await + } } }; @@ -153,6 +161,24 @@ impl BeaconChain { } } + /// Checks if the payload envelope is available, and imports immediately if so, otherwise caches the envelope + /// in the data availability checker. + #[instrument(skip_all)] + async fn check_envelope_availability_and_import( + self: &Arc, + envelope: AvailabilityPendingExecutedEnvelope, + ) -> Result { + let slot = envelope.envelope.slot(); + let availability = AvailabilityOutcome::Payload( + self.data_availability_checker + .v2() + .put_executed_payload_envelope(envelope)?, + ); + self.process_availability(slot, availability, || Ok(())) + .await + .map_err(EnvelopeError::BlockError) + } + /// Accepts a fully-verified payload envelope and awaits on its payload verification handle to /// get a fully `ExecutedEnvelope`. /// diff --git a/beacon_node/beacon_chain/src/payload_envelope_verification/mod.rs b/beacon_node/beacon_chain/src/payload_envelope_verification/mod.rs index 908da9a609..11de97a441 100644 --- a/beacon_node/beacon_chain/src/payload_envelope_verification/mod.rs +++ b/beacon_node/beacon_chain/src/payload_envelope_verification/mod.rs @@ -106,8 +106,7 @@ pub struct EnvelopeProcessingSnapshot { /// fully available. pub enum ExecutedEnvelope { Available(AvailableExecutedEnvelope), - // TODO(gloas) implement availability pending - AvailabilityPending(), + AvailabilityPending(AvailabilityPendingExecutedEnvelope), } impl ExecutedEnvelope { @@ -124,11 +123,14 @@ impl ExecutedEnvelope { payload_verification_outcome, )) } - // TODO(gloas) implement availability pending MaybeAvailableEnvelope::AvailabilityPending { block_hash: _, - envelope: _, - } => Self::AvailabilityPending(), + envelope, + } => Self::AvailabilityPending(AvailabilityPendingExecutedEnvelope::new( + envelope, + import_data, + payload_verification_outcome, + )), } } }