diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index c1d30253a3..990f4b6099 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -3146,7 +3146,7 @@ impl BeaconChain { self: &Arc, slot: Slot, block_root: Hash256, - engine_get_blobs_output: EngineGetBlobsOutput, + engine_get_blobs_output: EngineGetBlobsOutput, ) -> Result { // If this block has already been imported to forkchoice it must have been available, so // we don't need to process its blobs again. @@ -3161,7 +3161,7 @@ impl BeaconChain { // process_engine_blobs is called for both pre and post PeerDAS. However, post PeerDAS // consumers don't expect the blobs event to fire erratically. if let EngineGetBlobsOutput::Blobs(blobs) = &engine_get_blobs_output { - self.emit_sse_blob_sidecar_events(&block_root, blobs.iter().flatten().map(Arc::as_ref)); + self.emit_sse_blob_sidecar_events(&block_root, blobs.iter().map(|b| b.as_blob())); } let r = self @@ -3545,7 +3545,9 @@ impl BeaconChain { if let Some(slasher) = self.slasher.as_ref() { slasher.accept_block_header(blob.signed_block_header()); } - let availability = self.data_availability_checker.put_gossip_blob(blob)?; + let availability = self + .data_availability_checker + .put_gossip_verified_blobs(blob.block_root(), std::iter::once(blob))?; self.process_availability(slot, availability, || Ok(())) .await @@ -3568,21 +3570,21 @@ impl BeaconChain { let availability = self .data_availability_checker - .put_gossip_data_columns(block_root, data_columns)?; + .put_gossip_verified_data_columns(block_root, data_columns)?; self.process_availability(slot, availability, publish_fn) .await } - fn check_blobs_for_slashability( + fn check_blobs_for_slashability<'a>( self: &Arc, block_root: Hash256, - blobs: &FixedBlobSidecarList, + blobs: impl IntoIterator>, ) -> Result<(), BlockError> { let mut slashable_cache = self.observed_slashable.write(); for header in blobs - .iter() - .filter_map(|b| b.as_ref().map(|b| b.signed_block_header.clone())) + .into_iter() + .map(|b| b.signed_block_header.clone()) .unique() { if verify_header_signature::(self, &header).is_ok() { @@ -3609,7 +3611,7 @@ impl BeaconChain { block_root: Hash256, blobs: FixedBlobSidecarList, ) -> Result { - self.check_blobs_for_slashability(block_root, &blobs)?; + self.check_blobs_for_slashability(block_root, blobs.iter().flatten().map(Arc::as_ref))?; let availability = self .data_availability_checker .put_rpc_blobs(block_root, blobs)?; @@ -3622,18 +3624,21 @@ impl BeaconChain { self: &Arc, slot: Slot, block_root: Hash256, - engine_get_blobs_output: EngineGetBlobsOutput, + engine_get_blobs_output: EngineGetBlobsOutput, ) -> Result { let availability = match engine_get_blobs_output { EngineGetBlobsOutput::Blobs(blobs) => { - self.check_blobs_for_slashability(block_root, &blobs)?; + self.check_blobs_for_slashability(block_root, blobs.iter().map(|b| b.as_blob()))?; self.data_availability_checker - .put_engine_blobs(block_root, blobs)? + .put_gossip_verified_blobs(block_root, blobs)? } EngineGetBlobsOutput::CustodyColumns(data_columns) => { - self.check_columns_for_slashability(block_root, &data_columns)?; + self.check_columns_for_slashability( + block_root, + data_columns.iter().map(|c| c.as_data_column()), + )?; self.data_availability_checker - .put_engine_data_columns(block_root, data_columns)? + .put_gossip_verified_data_columns(block_root, data_columns)? } }; @@ -3649,7 +3654,10 @@ impl BeaconChain { block_root: Hash256, custody_columns: DataColumnSidecarList, ) -> Result { - self.check_columns_for_slashability(block_root, &custody_columns)?; + self.check_columns_for_slashability( + block_root, + custody_columns.iter().map(|c| c.as_ref()), + )?; // This slot value is purely informative for the consumers of // `AvailabilityProcessingStatus::MissingComponents` to log an error with a slot. @@ -3661,16 +3669,21 @@ impl BeaconChain { .await } - fn check_columns_for_slashability( + fn check_columns_for_slashability<'a>( self: &Arc, block_root: Hash256, - custody_columns: &DataColumnSidecarList, + custody_columns: impl IntoIterator>, ) -> Result<(), BlockError> { let mut slashable_cache = self.observed_slashable.write(); - // Assumes all items in custody_columns are for the same block_root - if let Some(column) = custody_columns.first() { - let header = &column.signed_block_header; - if verify_header_signature::(self, header).is_ok() { + // Process all unique block headers - previous logic assumed all headers were identical and + // only processed the first one. However, we should not make assumptions about data received + // from RPC. + for header in custody_columns + .into_iter() + .map(|c| c.signed_block_header.clone()) + .unique() + { + if verify_header_signature::(self, &header).is_ok() { slashable_cache .observe_slashable( header.message.slot, @@ -3679,7 +3692,7 @@ impl BeaconChain { ) .map_err(|e| BlockError::BeaconChainError(Box::new(e.into())))?; if let Some(slasher) = self.slasher.as_ref() { - slasher.accept_block_header(header.clone()); + slasher.accept_block_header(header); } } } diff --git a/beacon_node/beacon_chain/src/blob_verification.rs b/beacon_node/beacon_chain/src/blob_verification.rs index 6fe710f41a..d7acb78408 100644 --- a/beacon_node/beacon_chain/src/blob_verification.rs +++ b/beacon_node/beacon_chain/src/blob_verification.rs @@ -166,6 +166,16 @@ pub struct GossipVerifiedBlob, } +impl Clone for GossipVerifiedBlob { + fn clone(&self) -> Self { + Self { + block_root: self.block_root, + blob: self.blob.clone(), + _phantom: PhantomData, + } + } +} + impl GossipVerifiedBlob { pub fn new( blob: Arc>, @@ -335,21 +345,9 @@ impl KzgVerifiedBlobList { } /// Create a `KzgVerifiedBlobList` from `blobs` that are already KZG verified. - /// - /// This should be used with caution, as used incorrectly it could result in KZG verification - /// being skipped and invalid blobs being deemed valid. - pub fn from_verified>>>( - blobs: I, - seen_timestamp: Duration, - ) -> Self { + pub fn from_verified>>(blobs: I) -> Self { Self { - verified_blobs: blobs - .into_iter() - .map(|blob| KzgVerifiedBlob { - blob, - seen_timestamp, - }) - .collect(), + verified_blobs: blobs.into_iter().collect(), } } } diff --git a/beacon_node/beacon_chain/src/data_availability_checker.rs b/beacon_node/beacon_chain/src/data_availability_checker.rs index 6f292f3551..0fd417389b 100644 --- a/beacon_node/beacon_chain/src/data_availability_checker.rs +++ b/beacon_node/beacon_chain/src/data_availability_checker.rs @@ -17,7 +17,7 @@ use task_executor::TaskExecutor; use tracing::{debug, error, info_span, Instrument}; use types::blob_sidecar::{BlobIdentifier, BlobSidecar, FixedBlobSidecarList}; use types::{ - BlobSidecarList, ChainSpec, DataColumnSidecarList, Epoch, EthSpec, Hash256, + BlobSidecarList, ChainSpec, DataColumnSidecar, DataColumnSidecarList, Epoch, EthSpec, Hash256, RuntimeVariableList, SignedBeaconBlock, }; @@ -32,6 +32,7 @@ use crate::data_column_verification::{ use crate::metrics::{ KZG_DATA_COLUMN_RECONSTRUCTION_ATTEMPTS, KZG_DATA_COLUMN_RECONSTRUCTION_FAILURES, }; +use crate::observed_data_sidecars::ObservationStrategy; pub use error::{Error as AvailabilityCheckError, ErrorCategory as AvailabilityCheckErrorCategory}; use types::non_zero_usize::new_non_zero_usize; @@ -155,6 +156,21 @@ impl DataAvailabilityChecker { }) } + /// Check if the exact data column is in the availability cache. + pub fn is_data_column_cached( + &self, + block_root: &Hash256, + data_column: &DataColumnSidecar, + ) -> bool { + self.availability_cache + .peek_pending_components(block_root, |components| { + components.is_some_and(|components| { + let cached_column_opt = components.get_cached_data_column(data_column.index); + cached_column_opt.is_some_and(|cached| *cached == *data_column) + }) + }) + } + /// Get a blob from the availability cache. pub fn get_blob( &self, @@ -218,65 +234,21 @@ impl DataAvailabilityChecker { .put_kzg_verified_data_columns(block_root, verified_custody_columns) } - /// Put a list of blobs received from the EL pool into the availability cache. - /// - /// This DOES NOT perform KZG verification because the KZG proofs should have been constructed - /// immediately prior to calling this function so they are assumed to be valid. - pub fn put_engine_blobs( - &self, - block_root: Hash256, - blobs: FixedBlobSidecarList, - ) -> Result, AvailabilityCheckError> { - let seen_timestamp = self - .slot_clock - .now_duration() - .ok_or(AvailabilityCheckError::SlotClockError)?; - self.availability_cache.put_kzg_verified_blobs( - block_root, - KzgVerifiedBlobList::from_verified(blobs.iter().flatten().cloned(), seen_timestamp), - ) - } - - /// Put a list of data columns computed from blobs received from the EL pool into the - /// availability cache. - /// - /// This DOES NOT perform KZG proof and inclusion proof verification because - /// - The KZG proofs should have been verified by the trusted EL. - /// - The KZG commitments inclusion proof should have been constructed immediately prior to - /// calling this function so they are assumed to be valid. - /// - /// This method is used if the EL already has the blobs and returns them via the `getBlobsV2` - /// engine method. - /// More details in [fetch_blobs.rs](https://github.com/sigp/lighthouse/blob/44f8add41ea2252769bb967864af95b3c13af8ca/beacon_node/beacon_chain/src/fetch_blobs.rs). - pub fn put_engine_data_columns( - &self, - block_root: Hash256, - data_columns: DataColumnSidecarList, - ) -> Result, AvailabilityCheckError> { - let kzg_verified_custody_columns = data_columns - .into_iter() - .map(|d| { - KzgVerifiedCustodyDataColumn::from_asserted_custody( - KzgVerifiedDataColumn::from_verified(d), - ) - }) - .collect::>(); - - self.availability_cache - .put_kzg_verified_data_columns(block_root, kzg_verified_custody_columns) - } - /// Check if we've cached other blobs for this block. If it completes a set and we also /// have a block cached, return the `Availability` variant triggering block import. /// Otherwise cache the blob sidecar. /// /// This should only accept gossip verified blobs, so we should not have to worry about dupes. - pub fn put_gossip_blob( + pub fn put_gossip_verified_blobs< + I: IntoIterator>, + O: ObservationStrategy, + >( &self, - gossip_blob: GossipVerifiedBlob, + block_root: Hash256, + blobs: I, ) -> Result, AvailabilityCheckError> { self.availability_cache - .put_kzg_verified_blobs(gossip_blob.block_root(), vec![gossip_blob.into_inner()]) + .put_kzg_verified_blobs(block_root, blobs.into_iter().map(|b| b.into_inner())) } /// Check if we've cached other data columns for this block. If it satisfies the custody requirement and we also @@ -284,13 +256,15 @@ impl DataAvailabilityChecker { /// Otherwise cache the data column sidecar. /// /// This should only accept gossip verified data columns, so we should not have to worry about dupes. - #[allow(clippy::type_complexity)] - pub fn put_gossip_data_columns( + pub fn put_gossip_verified_data_columns< + O: ObservationStrategy, + I: IntoIterator>, + >( &self, block_root: Hash256, - gossip_data_columns: Vec>, + data_columns: I, ) -> Result, AvailabilityCheckError> { - let custody_columns = gossip_data_columns + let custody_columns = data_columns .into_iter() .map(|c| KzgVerifiedCustodyDataColumn::from_asserted_custody(c.into_inner())) .collect::>(); diff --git a/beacon_node/beacon_chain/src/data_column_verification.rs b/beacon_node/beacon_chain/src/data_column_verification.rs index b43b259cf6..4e847d9f9f 100644 --- a/beacon_node/beacon_chain/src/data_column_verification.rs +++ b/beacon_node/beacon_chain/src/data_column_verification.rs @@ -129,6 +129,10 @@ pub enum GossipDataColumnError { slot: Slot, index: ColumnIndex, }, + /// A column has already been processed from non-gossip source and have not yet been seen on + /// the gossip network. + /// This column should be accepted and forwarded over gossip. + PriorKnownUnpublished, /// Data column index must be between 0 and `NUMBER_OF_COLUMNS` (exclusive). /// /// ## Peer scoring @@ -181,6 +185,16 @@ pub struct GossipVerifiedDataColumn, } +impl Clone for GossipVerifiedDataColumn { + fn clone(&self) -> Self { + Self { + block_root: self.block_root, + data_column: self.data_column.clone(), + _phantom: PhantomData, + } + } +} + impl GossipVerifiedDataColumn { pub fn new( column_sidecar: Arc>, @@ -200,6 +214,16 @@ impl GossipVerifiedDataColumn ) } + /// Create a `GossipVerifiedDataColumn` from `DataColumnSidecar` for testing ONLY. + #[cfg(test)] + pub(crate) fn __new_for_testing(column_sidecar: Arc>) -> Self { + Self { + block_root: column_sidecar.block_root(), + data_column: KzgVerifiedDataColumn::__new_for_testing(column_sidecar), + _phantom: Default::default(), + } + } + pub fn as_data_column(&self) -> &DataColumnSidecar { self.data_column.as_data_column() } @@ -243,11 +267,9 @@ impl KzgVerifiedDataColumn { verify_kzg_for_data_column(data_column, kzg) } - /// Create a `KzgVerifiedDataColumn` from `data_column` that are already KZG verified. - /// - /// This should be used with caution, as used incorrectly it could result in KZG verification - /// being skipped and invalid data_columns being deemed valid. - pub fn from_verified(data_column: Arc>) -> Self { + /// Create a `KzgVerifiedDataColumn` from `DataColumnSidecar` for testing ONLY. + #[cfg(test)] + pub(crate) fn __new_for_testing(data_column: Arc>) -> Self { Self { data: data_column } } @@ -444,6 +466,23 @@ pub fn validate_data_column_sidecar_for_gossip { @@ -74,11 +75,19 @@ impl FetchBlobsBeaconAdapter { GossipVerifiedBlob::::new(blob.clone(), blob.index, &self.chain) } + pub(crate) fn verify_data_column_for_gossip( + &self, + data_column: Arc>, + ) -> Result, GossipDataColumnError> { + let index = data_column.index; + GossipVerifiedDataColumn::::new(data_column, index, &self.chain) + } + pub(crate) async fn process_engine_blobs( &self, slot: Slot, block_root: Hash256, - blobs: EngineGetBlobsOutput, + blobs: EngineGetBlobsOutput, ) -> Result { self.chain .process_engine_blobs(slot, block_root, blobs) diff --git a/beacon_node/beacon_chain/src/fetch_blobs/mod.rs b/beacon_node/beacon_chain/src/fetch_blobs/mod.rs index ba798137b0..927841376f 100644 --- a/beacon_node/beacon_chain/src/fetch_blobs/mod.rs +++ b/beacon_node/beacon_chain/src/fetch_blobs/mod.rs @@ -13,6 +13,7 @@ mod fetch_blobs_beacon_adapter; mod tests; use crate::blob_verification::{GossipBlobError, GossipVerifiedBlob}; +use crate::data_column_verification::{GossipDataColumnError, GossipVerifiedDataColumn}; #[cfg_attr(test, double)] use crate::fetch_blobs::fetch_blobs_beacon_adapter::FetchBlobsBeaconAdapter; use crate::kzg_utils::blobs_to_data_column_sidecars; @@ -34,24 +35,17 @@ use tracing::{debug, warn}; use types::blob_sidecar::{BlobSidecarError, FixedBlobSidecarList}; use types::data_column_sidecar::DataColumnSidecarError; use types::{ - BeaconStateError, Blob, BlobSidecar, ChainSpec, ColumnIndex, DataColumnSidecarList, EthSpec, - FullPayload, Hash256, KzgProofs, SignedBeaconBlock, SignedBeaconBlockHeader, VersionedHash, + BeaconStateError, Blob, BlobSidecar, ChainSpec, ColumnIndex, EthSpec, FullPayload, Hash256, + KzgProofs, SignedBeaconBlock, SignedBeaconBlockHeader, VersionedHash, }; -/// Blobs or data column to be published to the gossip network. -pub enum BlobsOrDataColumns { +/// Result from engine get blobs to be passed onto `DataAvailabilityChecker` and published to the +/// gossip network. The blobs / data columns have not been marked as observed yet, as they may not +/// be published immediately. +pub enum EngineGetBlobsOutput { Blobs(Vec>), - DataColumns(DataColumnSidecarList), -} - -/// Result from engine get blobs to be passed onto `DataAvailabilityChecker`. -/// -/// The blobs are retrieved from a trusted EL and columns are computed locally, therefore they are -/// considered valid without requiring extra validation. -pub enum EngineGetBlobsOutput { - Blobs(FixedBlobSidecarList), /// A filtered list of custody data columns to be imported into the `DataAvailabilityChecker`. - CustodyColumns(DataColumnSidecarList), + CustodyColumns(Vec>), } #[derive(Debug)] @@ -64,6 +58,7 @@ pub enum FetchEngineBlobError { ExecutionLayerMissing, InternalError(String), GossipBlob(GossipBlobError), + GossipDataColumn(GossipDataColumnError), RequestFailed(ExecutionLayerError), RuntimeShutdown, TokioJoin(tokio::task::JoinError), @@ -76,7 +71,7 @@ pub async fn fetch_and_process_engine_blobs( block_root: Hash256, block: Arc>>, custody_columns: HashSet, - publish_fn: impl Fn(BlobsOrDataColumns) + Send + 'static, + publish_fn: impl Fn(EngineGetBlobsOutput) + Send + 'static, ) -> Result, FetchEngineBlobError> { fetch_and_process_engine_blobs_inner( FetchBlobsBeaconAdapter::new(chain), @@ -95,7 +90,7 @@ async fn fetch_and_process_engine_blobs_inner( block_root: Hash256, block: Arc>>, custody_columns: HashSet, - publish_fn: impl Fn(BlobsOrDataColumns) + Send + 'static, + publish_fn: impl Fn(EngineGetBlobsOutput) + Send + 'static, ) -> Result, FetchEngineBlobError> { let versioned_hashes = if let Some(kzg_commitments) = block .message() @@ -148,7 +143,7 @@ async fn fetch_and_process_blobs_v1( block_root: Hash256, block: Arc>, versioned_hashes: Vec, - publish_fn: impl Fn(BlobsOrDataColumns) + Send + Sized, + publish_fn: impl Fn(EngineGetBlobsOutput) + Send + Sized, ) -> Result, FetchEngineBlobError> { let num_expected_blobs = versioned_hashes.len(); metrics::observe(&metrics::BLOBS_FROM_EL_EXPECTED, num_expected_blobs as f64); @@ -189,7 +184,7 @@ async fn fetch_and_process_blobs_v1( // and be accepted (and propagated) while we are waiting to publish. Just before publishing // we will observe the blobs/columns and only proceed with publishing if they are not yet seen. let blobs_to_import_and_publish = fixed_blob_sidecar_list - .iter() + .into_iter() .filter_map(|opt_blob| { let blob = opt_blob.as_ref()?; match chain_adapter.verify_blob_for_gossip(blob) { @@ -203,7 +198,9 @@ async fn fetch_and_process_blobs_v1( .map_err(FetchEngineBlobError::GossipBlob)?; if !blobs_to_import_and_publish.is_empty() { - publish_fn(BlobsOrDataColumns::Blobs(blobs_to_import_and_publish)); + publish_fn(EngineGetBlobsOutput::Blobs( + blobs_to_import_and_publish.clone(), + )); } debug!(num_fetched_blobs, "Processing engine blobs"); @@ -212,7 +209,7 @@ async fn fetch_and_process_blobs_v1( .process_engine_blobs( block.slot(), block_root, - EngineGetBlobsOutput::Blobs(fixed_blob_sidecar_list.clone()), + EngineGetBlobsOutput::Blobs(blobs_to_import_and_publish), ) .await?; @@ -225,7 +222,7 @@ async fn fetch_and_process_blobs_v2( block: Arc>, versioned_hashes: Vec, custody_columns_indices: HashSet, - publish_fn: impl Fn(BlobsOrDataColumns) + Send + 'static, + publish_fn: impl Fn(EngineGetBlobsOutput) + Send + 'static, ) -> Result, FetchEngineBlobError> { let num_expected_blobs = versioned_hashes.len(); @@ -278,6 +275,7 @@ async fn fetch_and_process_blobs_v2( return Ok(None); } + let chain_adapter = Arc::new(chain_adapter); let custody_columns = compute_and_publish_data_columns( &chain_adapter, block.clone(), @@ -303,15 +301,16 @@ async fn fetch_and_process_blobs_v2( /// Offload the data column computation to a blocking task to avoid holding up the async runtime. async fn compute_and_publish_data_columns( - chain_adapter: &FetchBlobsBeaconAdapter, + chain_adapter: &Arc>, block: Arc>>, blobs: Vec>, proofs: Vec>, custody_columns_indices: HashSet, - publish_fn: impl Fn(BlobsOrDataColumns) + Send + 'static, -) -> Result, FetchEngineBlobError> { + publish_fn: impl Fn(EngineGetBlobsOutput) + Send + 'static, +) -> Result>, FetchEngineBlobError> { let kzg = chain_adapter.kzg().clone(); let spec = chain_adapter.spec().clone(); + let chain_adapter_cloned = chain_adapter.clone(); chain_adapter .executor() .spawn_blocking_handle( @@ -338,8 +337,54 @@ async fn compute_and_publish_data_columns( }) .map_err(FetchEngineBlobError::DataColumnSidecarError)?; - publish_fn(BlobsOrDataColumns::DataColumns(custody_columns.clone())); - Ok(custody_columns) + // Gossip verify data columns before publishing. This prevents blobs with invalid + // KZG proofs from the EL making it into the data availability checker. We do not + // immediately add these blobs to the observed blobs/columns cache because we want + // to allow blobs/columns to arrive on gossip and be accepted (and propagated) while + // we are waiting to publish. Just before publishing we will observe the blobs/columns + // and only proceed with publishing if they are not yet seen. + // TODO(das): we may want to just perform kzg proof verification here, since the + // `DataColumnSidecar` and inclusion proof is computed just above and is unnecessary + // to verify them. + let columns_to_import_and_publish = custody_columns + .into_iter() + .filter_map(|col| { + match chain_adapter_cloned.verify_data_column_for_gossip(col) { + Ok(verified) => Some(Ok(verified)), + Err(e) => match e { + // Ignore already seen data columns + GossipDataColumnError::PriorKnown { .. } + | GossipDataColumnError::PriorKnownUnpublished => None, + GossipDataColumnError::BeaconChainError(_) + | GossipDataColumnError::ProposalSignatureInvalid + | GossipDataColumnError::UnknownValidator(_) + | GossipDataColumnError::IsNotLaterThanParent { .. } + | GossipDataColumnError::InvalidKzgProof(_) + | GossipDataColumnError::InvalidSubnetId { .. } + | GossipDataColumnError::FutureSlot { .. } + | GossipDataColumnError::PastFinalizedSlot { .. } + | GossipDataColumnError::PubkeyCacheTimeout + | GossipDataColumnError::ProposerIndexMismatch { .. } + | GossipDataColumnError::ParentUnknown { .. } + | GossipDataColumnError::NotFinalizedDescendant { .. } + | GossipDataColumnError::InvalidInclusionProof + | GossipDataColumnError::InvalidColumnIndex(_) + | GossipDataColumnError::UnexpectedDataColumn + | GossipDataColumnError::InconsistentCommitmentsLength { .. } + | GossipDataColumnError::InconsistentProofsLength { .. } => { + Some(Err(e)) + } + }, + } + }) + .collect::, _>>() + .map_err(FetchEngineBlobError::GossipDataColumn)?; + + publish_fn(EngineGetBlobsOutput::CustodyColumns( + columns_to_import_and_publish.clone(), + )); + + Ok(columns_to_import_and_publish) }, "compute_and_publish_data_columns", ) diff --git a/beacon_node/beacon_chain/src/fetch_blobs/tests.rs b/beacon_node/beacon_chain/src/fetch_blobs/tests.rs index be3d29e9c9..8eefd4ddf8 100644 --- a/beacon_node/beacon_chain/src/fetch_blobs/tests.rs +++ b/beacon_node/beacon_chain/src/fetch_blobs/tests.rs @@ -1,6 +1,7 @@ +use crate::data_column_verification::GossipVerifiedDataColumn; use crate::fetch_blobs::fetch_blobs_beacon_adapter::MockFetchBlobsBeaconAdapter; use crate::fetch_blobs::{ - fetch_and_process_engine_blobs_inner, BlobsOrDataColumns, FetchEngineBlobError, + fetch_and_process_engine_blobs_inner, EngineGetBlobsOutput, FetchEngineBlobError, }; use crate::test_utils::{get_kzg, EphemeralHarnessType}; use crate::AvailabilityProcessingStatus; @@ -148,6 +149,9 @@ async fn test_fetch_blobs_v2_success() { // All blobs returned, fork choice doesn't contain block mock_get_blobs_v2_response(&mut mock_adapter, Some(blobs_and_proofs)); mock_fork_choice_contains_block(&mut mock_adapter, vec![]); + mock_adapter + .expect_verify_data_column_for_gossip() + .returning(|c| Ok(GossipVerifiedDataColumn::__new_for_testing(c))); mock_process_engine_blobs_result( &mut mock_adapter, Ok(AvailabilityProcessingStatus::Imported(block_root)), @@ -174,16 +178,16 @@ async fn test_fetch_blobs_v2_success() { assert!( matches!( published_columns, - BlobsOrDataColumns::DataColumns (columns) if columns.len() == custody_columns.len() + EngineGetBlobsOutput::CustodyColumns(columns) if columns.len() == custody_columns.len() ), "should publish custody columns" ); } -/// Extract the `BlobsOrDataColumns` passed to the `publish_fn`. +/// Extract the `EngineGetBlobsOutput` passed to the `publish_fn`. fn extract_published_blobs( - publish_fn_args: Arc>>>, -) -> BlobsOrDataColumns { + publish_fn_args: Arc>>>, +) -> EngineGetBlobsOutput { let mut calls = publish_fn_args.lock().unwrap(); assert_eq!(calls.len(), 1); calls.pop().unwrap() @@ -250,8 +254,8 @@ fn create_test_block_and_blobs( #[allow(clippy::type_complexity)] fn mock_publish_fn() -> ( - impl Fn(BlobsOrDataColumns) + Send + 'static, - Arc>>>, + impl Fn(EngineGetBlobsOutput) + Send + 'static, + Arc>>>, ) { // Keep track of the arguments captured by `publish_fn`. let captured_args = Arc::new(Mutex::new(vec![])); diff --git a/beacon_node/http_api/src/publish_blocks.rs b/beacon_node/http_api/src/publish_blocks.rs index 9b1a3f8677..463f585f2c 100644 --- a/beacon_node/http_api/src/publish_blocks.rs +++ b/beacon_node/http_api/src/publish_blocks.rs @@ -424,6 +424,14 @@ fn build_gossip_verified_data_columns( ); Ok(None) } + Err(GossipDataColumnError::PriorKnownUnpublished) => { + debug!( + column_index, + %slot, + "Data column for publication already known via the EL" + ); + Ok(None) + } Err(e) => { error!( column_index, diff --git a/beacon_node/network/src/network_beacon_processor/gossip_methods.rs b/beacon_node/network/src/network_beacon_processor/gossip_methods.rs index 638f9e4824..3be416165b 100644 --- a/beacon_node/network/src/network_beacon_processor/gossip_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/gossip_methods.rs @@ -797,6 +797,19 @@ impl NetworkBeaconProcessor { } Err(err) => { match err { + GossipDataColumnError::PriorKnownUnpublished => { + debug!( + %slot, + %block_root, + %index, + "Gossip data column already processed via the EL. Accepting the column sidecar without re-processing." + ); + self.propagate_validation_result( + message_id, + peer_id, + MessageAcceptance::Accept, + ); + } GossipDataColumnError::ParentUnknown { parent_root } => { debug!( action = "requesting parent", diff --git a/beacon_node/network/src/network_beacon_processor/mod.rs b/beacon_node/network/src/network_beacon_processor/mod.rs index ba681eed14..f9390a2c7b 100644 --- a/beacon_node/network/src/network_beacon_processor/mod.rs +++ b/beacon_node/network/src/network_beacon_processor/mod.rs @@ -5,7 +5,7 @@ use beacon_chain::blob_verification::{GossipBlobError, GossipVerifiedBlob}; use beacon_chain::block_verification_types::RpcBlock; use beacon_chain::data_column_verification::{observe_gossip_data_column, GossipDataColumnError}; use beacon_chain::fetch_blobs::{ - fetch_and_process_engine_blobs, BlobsOrDataColumns, FetchEngineBlobError, + fetch_and_process_engine_blobs, EngineGetBlobsOutput, FetchEngineBlobError, }; use beacon_chain::observed_data_sidecars::DoNotObserve; use beacon_chain::{ @@ -848,11 +848,14 @@ impl NetworkBeaconProcessor { let publish_fn = move |blobs_or_data_column| { if publish_blobs { match blobs_or_data_column { - BlobsOrDataColumns::Blobs(blobs) => { + EngineGetBlobsOutput::Blobs(blobs) => { self_cloned.publish_blobs_gradually(blobs, block_root); } - BlobsOrDataColumns::DataColumns(columns) => { - self_cloned.publish_data_columns_gradually(columns, block_root); + EngineGetBlobsOutput::CustodyColumns(columns) => { + self_cloned.publish_data_columns_gradually( + columns.into_iter().map(|c| c.clone_data_column()).collect(), + block_root, + ); } }; } diff --git a/beacon_node/network/src/network_beacon_processor/tests.rs b/beacon_node/network/src/network_beacon_processor/tests.rs index 292e894870..87a5a77294 100644 --- a/beacon_node/network/src/network_beacon_processor/tests.rs +++ b/beacon_node/network/src/network_beacon_processor/tests.rs @@ -9,13 +9,16 @@ use crate::{ sync::{manager::BlockProcessType, SyncMessage}, }; use beacon_chain::block_verification_types::RpcBlock; +use beacon_chain::data_column_verification::validate_data_column_sidecar_for_gossip; use beacon_chain::kzg_utils::blobs_to_data_column_sidecars; +use beacon_chain::observed_data_sidecars::DoNotObserve; use beacon_chain::test_utils::{ get_kzg, test_spec, AttestationStrategy, BeaconChainHarness, BlockStrategy, EphemeralHarnessType, }; use beacon_chain::{BeaconChain, WhenSlotSkipped}; use beacon_processor::{work_reprocessing_queue::*, *}; +use gossipsub::MessageAcceptance; use itertools::Itertools; use lighthouse_network::rpc::methods::{BlobsByRangeRequest, MetaDataV3}; use lighthouse_network::rpc::InboundRequestId; @@ -25,6 +28,7 @@ use lighthouse_network::{ types::{EnrAttestationBitfield, EnrSyncCommitteeBitfield}, Client, MessageId, NetworkConfig, NetworkGlobals, PeerId, Response, }; +use matches::assert_matches; use slot_clock::SlotClock; use std::iter::Iterator; use std::sync::Arc; @@ -32,9 +36,9 @@ use std::time::Duration; use tokio::sync::mpsc; use types::blob_sidecar::FixedBlobSidecarList; use types::{ - Attestation, AttesterSlashing, BlobSidecar, BlobSidecarList, DataColumnSidecarList, - DataColumnSubnetId, Epoch, Hash256, MainnetEthSpec, ProposerSlashing, SignedAggregateAndProof, - SignedBeaconBlock, SignedVoluntaryExit, Slot, SubnetId, + Attestation, AttesterSlashing, BlobSidecar, BlobSidecarList, ChainSpec, DataColumnSidecarList, + DataColumnSubnetId, Epoch, EthSpec, ForkName, Hash256, MainnetEthSpec, ProposerSlashing, + SignedAggregateAndProof, SignedBeaconBlock, SignedVoluntaryExit, Slot, SubnetId, }; type E = MainnetEthSpec; @@ -64,7 +68,7 @@ struct TestRig { voluntary_exit: SignedVoluntaryExit, beacon_processor_tx: BeaconProcessorSend, work_journal_rx: mpsc::Receiver<&'static str>, - _network_rx: mpsc::UnboundedReceiver>, + network_rx: mpsc::UnboundedReceiver>, _sync_rx: mpsc::UnboundedReceiver>, duplicate_cache: DuplicateCache, network_beacon_processor: Arc>, @@ -83,19 +87,18 @@ impl Drop for TestRig { impl TestRig { pub async fn new(chain_length: u64) -> Self { - Self::new_parametric( - chain_length, - BeaconProcessorConfig::default().enable_backfill_rate_limiting, - ) - .await - } - - pub async fn new_parametric(chain_length: u64, enable_backfill_rate_limiting: bool) -> Self { // This allows for testing voluntary exits without building out a massive chain. let mut spec = test_spec::(); spec.shard_committee_period = 2; - let spec = Arc::new(spec); + Self::new_parametric(chain_length, BeaconProcessorConfig::default(), spec).await + } + pub async fn new_parametric( + chain_length: u64, + beacon_processor_config: BeaconProcessorConfig, + spec: ChainSpec, + ) -> Self { + let spec = Arc::new(spec); let harness = BeaconChainHarness::builder(MainnetEthSpec) .spec(spec.clone()) .deterministic_keypairs(VALIDATOR_COUNT) @@ -183,12 +186,8 @@ impl TestRig { let chain = harness.chain.clone(); - let (network_tx, _network_rx) = mpsc::unbounded_channel(); + let (network_tx, network_rx) = mpsc::unbounded_channel(); - let beacon_processor_config = BeaconProcessorConfig { - enable_backfill_rate_limiting, - ..Default::default() - }; let BeaconProcessorChannels { beacon_processor_tx, beacon_processor_rx, @@ -304,7 +303,7 @@ impl TestRig { voluntary_exit, beacon_processor_tx, work_journal_rx, - _network_rx, + network_rx, _sync_rx, duplicate_cache, network_beacon_processor, @@ -643,6 +642,50 @@ impl TestRig { assert_eq!(events, expected); } + + /// Listen for network messages and collect them for a specified duration or until reaching a count. + /// + /// Returns None if no messages were received, or Some(Vec) containing the received messages. + /// + /// # Arguments + /// + /// * `timeout` - Maximum duration to listen for messages + /// * `count` - Optional maximum number of messages to collect before returning + pub async fn receive_network_messages_with_timeout( + &mut self, + timeout: Duration, + count: Option, + ) -> Option>> { + let mut events = vec![]; + + let timeout_future = tokio::time::sleep(timeout); + tokio::pin!(timeout_future); + + loop { + // Break if we've received the requested count of messages + if let Some(target_count) = count { + if events.len() >= target_count { + break; + } + } + + tokio::select! { + _ = &mut timeout_future => break, + maybe_msg = self.network_rx.recv() => { + match maybe_msg { + Some(msg) => events.push(msg), + None => break, // Channel closed + } + } + } + } + + if events.is_empty() { + None + } else { + Some(events) + } + } } fn junk_peer_id() -> PeerId { @@ -753,6 +796,58 @@ async fn import_gossip_block_unacceptably_early() { ); } +/// Data columns that have already been processed but unobserved should be propagated without re-importing. +#[tokio::test] +async fn accept_processed_gossip_data_columns_without_import() { + let processor_config = BeaconProcessorConfig::default(); + let fulu_genesis_spec = ForkName::Fulu.make_genesis_spec(E::default_spec()); + let mut rig = TestRig::new_parametric(SMALL_CHAIN, processor_config, fulu_genesis_spec).await; + + // GIVEN the data columns have already been processed but unobserved. + // 1. verify data column with `DoNotObserve` to create verified but unobserved data columns. + // 2. put verified but unobserved data columns into the data availability cache. + let verified_data_columns: Vec<_> = rig + .next_data_columns + .clone() + .unwrap() + .into_iter() + .map(|data_column| { + let subnet_id = data_column.index; + validate_data_column_sidecar_for_gossip::<_, DoNotObserve>( + data_column, + subnet_id, + &rig.chain, + ) + .expect("should be valid data column") + }) + .collect(); + + let block_root = rig.next_block.canonical_root(); + rig.chain + .data_availability_checker + .put_gossip_verified_data_columns(block_root, verified_data_columns) + .expect("should put data columns into availability cache"); + + // WHEN an already processed but unobserved data column is received via gossip + rig.enqueue_gossip_data_columns(0); + + // THEN the data column should be propagated without re-importing (not sure if there's an easy way to test this) + let network_message = rig + .receive_network_messages_with_timeout(Duration::from_millis(100), Some(1)) + .await + .and_then(|mut vec| vec.pop()) + .expect("should receive network messages"); + + assert_matches!( + network_message, + NetworkMessage::ValidationResult { + propagation_source: _, + message_id: _, + validation_result: MessageAcceptance::Accept, + } + ); +} + /// Blocks that arrive on-time should be processed normally. #[tokio::test] async fn import_gossip_block_at_current_slot() { @@ -1192,8 +1287,12 @@ async fn test_backfill_sync_processing() { /// Ensure that backfill batches get processed as fast as they can when rate-limiting is disabled. #[tokio::test] async fn test_backfill_sync_processing_rate_limiting_disabled() { - let enable_backfill_rate_limiting = false; - let mut rig = TestRig::new_parametric(SMALL_CHAIN, enable_backfill_rate_limiting).await; + let beacon_processor_config = BeaconProcessorConfig { + enable_backfill_rate_limiting: false, + ..Default::default() + }; + let mut rig = + TestRig::new_parametric(SMALL_CHAIN, beacon_processor_config, test_spec::()).await; for _ in 0..3 { rig.enqueue_backfill_batch(); @@ -1236,7 +1335,7 @@ async fn test_blobs_by_range() { .unwrap_or(0); } let mut actual_count = 0; - while let Some(next) = rig._network_rx.recv().await { + while let Some(next) = rig.network_rx.recv().await { if let NetworkMessage::SendResponse { peer_id: _, response: Response::BlobsByRange(blob), diff --git a/beacon_node/network/src/sync/tests/lookups.rs b/beacon_node/network/src/sync/tests/lookups.rs index 38095ec434..84ff1c7e25 100644 --- a/beacon_node/network/src/sync/tests/lookups.rs +++ b/beacon_node/network/src/sync/tests/lookups.rs @@ -14,6 +14,7 @@ use std::time::Duration; use super::*; use crate::sync::block_lookups::common::ResponseType; +use beacon_chain::observed_data_sidecars::Observe; use beacon_chain::{ blob_verification::GossipVerifiedBlob, block_verification_types::{AsBlock, BlockImportData}, @@ -1229,7 +1230,12 @@ impl TestRig { .harness .chain .data_availability_checker - .put_gossip_blob(GossipVerifiedBlob::__assumed_valid(blob.into())) + .put_gossip_verified_blobs( + blob.block_root(), + std::iter::once(GossipVerifiedBlob::<_, Observe>::__assumed_valid( + blob.into(), + )), + ) .unwrap() { Availability::Available(_) => panic!("blob removed from da_checker, available"),