diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index 23fecb7d5a..8575089c78 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -3799,14 +3799,18 @@ impl BeaconChain { } } AvailabilityOutcome::Payload(availability) => match availability { - PayloadAvailability::Available(available_payload_data) => { + PayloadAvailability::Available(available_envelope) => { // TODO(gloas) execution publish_fn // publish_fn()?; - // Payload data is fully available - let (block_root, data_columns) = *available_payload_data; - self.import_available_payload_data(block_root, data_columns) + // Payload envelope is fully available + let res = self + .import_available_execution_payload_envelope(available_envelope) .await + .unwrap(); + + // TODO(gloas) unwrap + Ok(res) } PayloadAvailability::MissingComponents(block_root) => Ok( AvailabilityProcessingStatus::MissingComponents(slot, block_root), @@ -3815,17 +3819,6 @@ impl BeaconChain { } } - #[instrument(skip_all)] - pub async fn import_available_payload_data( - self: &Arc, - block_root: Hash256, - _data_columns: Vec>>, - ) -> Result { - // TODO(gloas) this is just a stub implementation - // this function should mark payload data as available somehow - Ok(AvailabilityProcessingStatus::Imported(block_root)) - } - #[instrument(skip_all)] pub async fn import_available_block( self: &Arc, diff --git a/beacon_node/beacon_chain/src/data_availability_checker_v2/mod.rs b/beacon_node/beacon_chain/src/data_availability_checker_v2/mod.rs index cad8070a65..448d87cfcd 100644 --- a/beacon_node/beacon_chain/src/data_availability_checker_v2/mod.rs +++ b/beacon_node/beacon_chain/src/data_availability_checker_v2/mod.rs @@ -3,6 +3,7 @@ use crate::data_availability_checker_v2::pending_components_cache::{ }; use crate::data_availability_checker::AvailabilityCheckError; +use crate::payload_envelope_verification::AvailableExecutedEnvelope; use crate::{BeaconChain, BeaconChainTypes, CustodyContext, metrics}; use kzg::Kzg; use slot_clock::SlotClock; @@ -17,6 +18,7 @@ use types::{ SignedExecutionPayloadBid, Slot, }; +mod payload_envelope_cache; mod pending_components_cache; use crate::data_column_verification::{ @@ -45,7 +47,7 @@ pub type AvailableData = (Hash256, DataColumnSidecarList); /// Indicates if the payloads data is fully `Available` or if we need more columns. pub enum Availability { MissingComponents(Hash256), - Available(Box>), + Available(Box>), } impl Debug for Availability { @@ -54,7 +56,8 @@ impl Debug for Availability { Self::MissingComponents(block_root) => { write!(f, "MissingComponents({})", block_root) } - Self::Available(data) => write!(f, "Available({}, {} columns)", data.0, data.1.len()), + // TODO(gloas) fix success case + Self::Available(data) => todo!(), } } } diff --git a/beacon_node/beacon_chain/src/data_availability_checker_v2/payload_envelope_cache.rs b/beacon_node/beacon_chain/src/data_availability_checker_v2/payload_envelope_cache.rs new file mode 100644 index 0000000000..8b13789179 --- /dev/null +++ b/beacon_node/beacon_chain/src/data_availability_checker_v2/payload_envelope_cache.rs @@ -0,0 +1 @@ + diff --git a/beacon_node/beacon_chain/src/data_availability_checker_v2/pending_components_cache.rs b/beacon_node/beacon_chain/src/data_availability_checker_v2/pending_components_cache.rs index 51a19554dd..1ccf953036 100644 --- a/beacon_node/beacon_chain/src/data_availability_checker_v2/pending_components_cache.rs +++ b/beacon_node/beacon_chain/src/data_availability_checker_v2/pending_components_cache.rs @@ -3,17 +3,26 @@ use crate::CustodyContext; use crate::data_availability_checker::AvailabilityCheckError; use crate::data_availability_checker_v2::Availability; use crate::data_column_verification::KzgVerifiedCustodyDataColumn; +use crate::payload_envelope_verification::AvailabilityPendingExecutedEnvelope; +use crate::payload_envelope_verification::AvailableEnvelope; +use crate::payload_envelope_verification::AvailableExecutedEnvelope; use lru::LruCache; use parking_lot::{MappedRwLockReadGuard, RwLock, RwLockReadGuard, RwLockWriteGuard}; use std::cmp::Ordering; use std::num::NonZeroUsize; use std::sync::Arc; use tracing::{Span, debug, debug_span}; +use types::BlockImportSource; use types::{ ChainSpec, ColumnIndex, DataColumnSidecar, DataColumnSidecarList, Epoch, EthSpec, Hash256, - SignedExecutionPayloadBid, + SignedExecutionPayloadBid, SignedExecutionPayloadEnvelope, }; +pub enum CachedPayloadEnvelope { + PreExecution(Arc>, BlockImportSource), + Executed(Box>), +} + /// This represents the components of a payload pending data availability. /// /// The columns are all gossip and kzg verified. @@ -24,9 +33,12 @@ pub struct PendingComponents { pub block_root: Hash256, /// The execution payload bid containing blob_kzg_commitments. pub bid: Option>>, + /// a cached pre or post executed payload envelope + pub envelope: Option>, pub verified_data_columns: Vec>, pub reconstruction_started: bool, span: Span, + spec: Arc, } impl PendingComponents { @@ -68,6 +80,19 @@ impl PendingComponents { self.bid = Some(bid); } + pub fn insert_pending_executed_envelope( + &mut self, + envelope: Arc>, + import_source: BlockImportSource, + ) { + self.envelope = Some(CachedPayloadEnvelope::PreExecution(envelope, import_source)) + } + + /// Inserts an executed payload envelope into the cache. + pub fn insert_executed_envelope(&mut self, envelope: AvailabilityPendingExecutedEnvelope) { + self.envelope = Some(CachedPayloadEnvelope::Executed(Box::new(envelope))) + } + /// Returns the number of blobs expected by reading the bid's kzg commitments. /// Returns an error if the bid is not cached. This function should only be called /// after ensuring that the bid has been cached. @@ -80,66 +105,92 @@ impl PendingComponents { Ok(bid.message.blob_kzg_commitments.len()) } - /// Returns `Some` if the bid and all required data columns have been received. + /// Returns `Some` if the envelope and all required data columns have been received. pub fn make_available( &self, num_expected_columns: usize, - ) -> Result>, AvailabilityCheckError> { - // Check if we have a bid - if not, still waiting + ) -> Result>, AvailabilityCheckError> { + // If no bid has been received, we can start verifying the columns if self.bid.is_none() { return Ok(None); } + // Check if the payload has been received and executed + let Some(CachedPayloadEnvelope::Executed(envelope)) = self.envelope.as_ref() else { + return Ok(None); + }; + + let AvailabilityPendingExecutedEnvelope { + envelope, + import_data, + payload_verification_outcome, + } = envelope.as_ref(); + // Get the number of blobs expected from the bid let num_expected_blobs = self.num_blobs_expected()?; - if num_expected_blobs == 0 { - // No blobs expected, data is available (empty) + let columns = if num_expected_blobs == 0 { self.span.in_scope(|| { debug!("Bid has no blobs, data is available"); }); - return Ok(Some(vec![])); - } + vec![] + } else { + let num_received_columns = self.verified_data_columns.len(); + match num_received_columns.cmp(&num_expected_columns) { + Ordering::Greater => { + // Should never happen + return Err(AvailabilityCheckError::Unexpected(format!( + "too many columns got {num_received_columns} expected {num_expected_columns}" + ))); + } + Ordering::Equal => { + // We have enough columns + let data_columns = self + .verified_data_columns + .iter() + .map(|d| d.clone().into_inner()) + .collect::>(); - let num_received_columns = self.verified_data_columns.len(); - match num_received_columns.cmp(&num_expected_columns) { - Ordering::Greater => { - // Should never happen - Err(AvailabilityCheckError::Unexpected(format!( - "too many columns got {num_received_columns} expected {num_expected_columns}" - ))) - } - Ordering::Equal => { - // We have enough columns - let data_columns = self - .verified_data_columns - .iter() - .map(|d| d.clone().into_inner()) - .collect::>(); + self.span.in_scope(|| { + debug!("All data columns received, data is available"); + }); - self.span.in_scope(|| { - debug!("All data columns received, data is available"); - }); + data_columns + } + Ordering::Less => { + // Not enough data columns received yet + return Ok(None); + } + } + }; - Ok(Some(data_columns)) - } - Ordering::Less => { - // Not enough data columns received yet - Ok(None) - } - } + let available_envelope = AvailableEnvelope { + execution_block_hash: envelope.block_hash(), + envelope: envelope.clone(), + columns, + columns_available_timestamp: None, + spec: self.spec.clone(), + }; + + Ok(Some(AvailableExecutedEnvelope { + envelope: available_envelope, + import_data: import_data.clone(), + payload_verification_outcome: payload_verification_outcome.clone(), + })) } /// Returns an empty `PendingComponents` object with the given block root. - pub fn empty(block_root: Hash256) -> Self { + pub fn empty(block_root: Hash256, spec: Arc) -> Self { let span = debug_span!(parent: None, "lh_pending_components", %block_root); let _guard = span.clone().entered(); Self { block_root, bid: None, + envelope: None, verified_data_columns: vec![], reconstruction_started: false, span, + spec, } } @@ -294,7 +345,7 @@ impl DataAvailabilityCheckerInner { pending_components: MappedRwLockReadGuard<'_, PendingComponents>, num_expected_columns: usize, ) -> Result, AvailabilityCheckError> { - if let Some(columns) = pending_components.make_available(num_expected_columns)? { + if let Some(available_envelope) = pending_components.make_available(num_expected_columns)? { // Explicitly drop read lock before acquiring write lock drop(pending_components); if let Some(components) = self.critical.write().get_mut(&block_root) { @@ -308,7 +359,7 @@ impl DataAvailabilityCheckerInner { // imported, but re-inserted immediately, causing partial pending components to be // stored and served to peers. // Components are only removed via LRU eviction as finality advances. - Ok(Availability::Available(Box::new((block_root, columns)))) + Ok(Availability::Available(Box::new(available_envelope))) } else { Ok(Availability::MissingComponents(block_root)) } @@ -330,8 +381,9 @@ impl DataAvailabilityCheckerInner { let mut write_lock = self.critical.write(); { - let pending_components = - write_lock.get_or_insert_mut(block_root, || PendingComponents::empty(block_root)); + let pending_components = write_lock.get_or_insert_mut(block_root, || { + PendingComponents::empty(block_root, self.spec.clone()) + }); update_fn(pending_components)? } @@ -433,6 +485,8 @@ impl DataAvailabilityCheckerInner { #[cfg(test)] mod pending_components_tests { + use crate::test_utils::test_spec; + use super::*; use types::MinimalEthSpec; @@ -440,8 +494,9 @@ mod pending_components_tests { #[test] fn test_empty_pending_components() { + let spec = Arc::new(test_spec::()); let block_root = Hash256::random(); - let components = PendingComponents::::empty(block_root); + let components = PendingComponents::::empty(block_root, spec); assert_eq!(components.block_root, block_root); assert!(components.bid.is_none()); @@ -452,8 +507,9 @@ mod pending_components_tests { #[test] fn test_get_cached_data_columns_indices_empty() { + let spec = Arc::new(test_spec::()); let block_root = Hash256::random(); - let components = PendingComponents::::empty(block_root); + let components = PendingComponents::::empty(block_root, spec); let indices = components.get_cached_data_columns_indices(); assert!(indices.is_empty()); @@ -461,8 +517,9 @@ mod pending_components_tests { #[test] fn test_status_str_no_bid() { + let spec = Arc::new(test_spec::()); let block_root = Hash256::random(); - let components = PendingComponents::::empty(block_root); + let components = PendingComponents::::empty(block_root, spec); let status = components.status_str(10); assert_eq!(status, "data_columns 0/10"); @@ -470,8 +527,9 @@ mod pending_components_tests { #[test] fn test_num_blobs_expected_no_bid() { + let spec = Arc::new(test_spec::()); let block_root = Hash256::random(); - let components = PendingComponents::::empty(block_root); + let components = PendingComponents::::empty(block_root, spec); let result = components.num_blobs_expected(); assert!(result.is_err()); @@ -484,8 +542,9 @@ mod pending_components_tests { #[test] fn test_make_available_no_bid_returns_none() { + let spec = Arc::new(test_spec::()); let block_root = Hash256::random(); - let components = PendingComponents::::empty(block_root); + let components = PendingComponents::::empty(block_root, spec); // Without a bid, make_available should return Ok(None) let result = components.make_available(10); diff --git a/beacon_node/beacon_chain/src/data_availability_router.rs b/beacon_node/beacon_chain/src/data_availability_router.rs index 1524e4a5ae..78de0d8935 100644 --- a/beacon_node/beacon_chain/src/data_availability_router.rs +++ b/beacon_node/beacon_chain/src/data_availability_router.rs @@ -64,7 +64,9 @@ impl AvailabilityOutcome { Self::Block(BlockAvailability::Available(block)) => block.import_data.block_root, Self::Block(BlockAvailability::MissingComponents(root)) => *root, // For payload availability, the first element of the tuple is the block root - Self::Payload(PayloadAvailability::Available(available_data)) => available_data.0, + Self::Payload(PayloadAvailability::Available(available_data)) => { + available_data.envelope.message().beacon_block_root + } Self::Payload(PayloadAvailability::MissingComponents(root)) => *root, } } diff --git a/beacon_node/beacon_chain/src/payload_envelope_verification/mod.rs b/beacon_node/beacon_chain/src/payload_envelope_verification/mod.rs index c707d62dc7..d9503a0272 100644 --- a/beacon_node/beacon_chain/src/payload_envelope_verification/mod.rs +++ b/beacon_node/beacon_chain/src/payload_envelope_verification/mod.rs @@ -41,7 +41,7 @@ mod payload_notifier; pub use execution_pending_envelope::ExecutionPendingEnvelope; -#[derive(PartialEq)] +#[derive(Clone, Debug, PartialEq)] pub struct EnvelopeImportData { pub block_root: Hash256, pub post_state: Box>, @@ -50,11 +50,11 @@ pub struct EnvelopeImportData { #[derive(Debug)] #[allow(dead_code)] pub struct AvailableEnvelope { - execution_block_hash: ExecutionBlockHash, - envelope: Arc>, - columns: DataColumnSidecarList, + pub execution_block_hash: ExecutionBlockHash, + pub envelope: Arc>, + pub columns: DataColumnSidecarList, /// Timestamp at which this envelope first became available (UNIX timestamp, time since 1970). - columns_available_timestamp: Option, + pub columns_available_timestamp: Option, pub spec: Arc, } @@ -132,6 +132,33 @@ impl ExecutedEnvelope { } } +/// A payload ernvelope that has completed all envelope procesing checks, verification +/// by an EL client but does not have all requisite columns to get imported into +/// fork choice. +pub struct AvailabilityPendingExecutedEnvelope { + pub envelope: Arc>, + pub import_data: EnvelopeImportData, + pub payload_verification_outcome: PayloadVerificationOutcome, +} + +impl AvailabilityPendingExecutedEnvelope { + pub fn new( + envelope: Arc>, + import_data: EnvelopeImportData, + payload_verification_outcome: PayloadVerificationOutcome, + ) -> Self { + Self { + envelope, + import_data, + payload_verification_outcome, + } + } + + pub fn as_envelope(&self) -> &SignedExecutionPayloadEnvelope { + &self.envelope + } +} + /// A payload envelope that has completed all payload processing checks including verification /// by an EL client **and** has all requisite blob data to be imported into fork choice. pub struct AvailableExecutedEnvelope {