From e6ef644db4e88cdb5a8c4362d8037e6abfbb0abc Mon Sep 17 00:00:00 2001 From: Jimmy Chen Date: Tue, 27 May 2025 05:55:58 +1000 Subject: [PATCH] Verify `getBlobsV2` response and avoid reprocessing imported data columns (#7493) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit #7461 and partly #6439. Desired behaviour after receiving `engine_getBlobs` response: 1. Gossip verify the blobs and proofs, but don't mark them as observed yet. This is because not all blobs are published immediately (due to staggered publishing). If we mark them as observed and do not publish them, we could end up blocking the gossip propagation. 2. Blobs are marked as observed _either_ when: * They are received from gossip and forwarded to the network. * They are published by the node. Current behaviour: - ❗ We only gossip verify `engine_getBlobsV1` responses, but not `engine_getBlobsV2` responses (PeerDAS). - ❗ After importing EL blobs AND before they're published, if the same blobs arrive via gossip, they will get re-processed, which may result in a re-import. 1. Perform gossip verification on data columns computed from the EL `getBlobsV2` response. We currently only do this for `getBlobsV1` to prevent importing blobs with invalid proofs into the `DataAvailabilityChecker`; this should be done on V2 responses too. 2. Add additional gossip verification to make sure we don't re-process a ~~blob~~ or data column that was imported via the EL `getBlobs` but not yet "seen" on the gossip network. If an "unobserved" gossip blob is found in the availability cache, then we know it has passed verification, so we can immediately propagate the `ACCEPT` result and forward it to the network, but without re-processing it. **UPDATE:** I've left blobs out for the second change mentioned above, as the likelihood and impact are very low and we haven't observed it often enough, but under PeerDAS this issue is a regular occurrence and we do see the same block getting imported many times. 
--- beacon_node/beacon_chain/src/beacon_chain.rs | 57 ++++--- .../beacon_chain/src/blob_verification.rs | 26 ++-- .../src/data_availability_checker.rs | 86 ++++------- .../src/data_column_verification.rs | 51 ++++++- .../fetch_blobs/fetch_blobs_beacon_adapter.rs | 13 +- .../beacon_chain/src/fetch_blobs/mod.rs | 97 ++++++++---- .../beacon_chain/src/fetch_blobs/tests.rs | 18 ++- beacon_node/http_api/src/publish_blocks.rs | 8 + .../gossip_methods.rs | 13 ++ .../src/network_beacon_processor/mod.rs | 11 +- .../src/network_beacon_processor/tests.rs | 143 +++++++++++++++--- beacon_node/network/src/sync/tests/lookups.rs | 8 +- 12 files changed, 371 insertions(+), 160 deletions(-) diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index c1d30253a3..990f4b6099 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -3146,7 +3146,7 @@ impl BeaconChain { self: &Arc, slot: Slot, block_root: Hash256, - engine_get_blobs_output: EngineGetBlobsOutput, + engine_get_blobs_output: EngineGetBlobsOutput, ) -> Result { // If this block has already been imported to forkchoice it must have been available, so // we don't need to process its blobs again. @@ -3161,7 +3161,7 @@ impl BeaconChain { // process_engine_blobs is called for both pre and post PeerDAS. However, post PeerDAS // consumers don't expect the blobs event to fire erratically. 
if let EngineGetBlobsOutput::Blobs(blobs) = &engine_get_blobs_output { - self.emit_sse_blob_sidecar_events(&block_root, blobs.iter().flatten().map(Arc::as_ref)); + self.emit_sse_blob_sidecar_events(&block_root, blobs.iter().map(|b| b.as_blob())); } let r = self @@ -3545,7 +3545,9 @@ impl BeaconChain { if let Some(slasher) = self.slasher.as_ref() { slasher.accept_block_header(blob.signed_block_header()); } - let availability = self.data_availability_checker.put_gossip_blob(blob)?; + let availability = self + .data_availability_checker + .put_gossip_verified_blobs(blob.block_root(), std::iter::once(blob))?; self.process_availability(slot, availability, || Ok(())) .await @@ -3568,21 +3570,21 @@ impl BeaconChain { let availability = self .data_availability_checker - .put_gossip_data_columns(block_root, data_columns)?; + .put_gossip_verified_data_columns(block_root, data_columns)?; self.process_availability(slot, availability, publish_fn) .await } - fn check_blobs_for_slashability( + fn check_blobs_for_slashability<'a>( self: &Arc, block_root: Hash256, - blobs: &FixedBlobSidecarList, + blobs: impl IntoIterator>, ) -> Result<(), BlockError> { let mut slashable_cache = self.observed_slashable.write(); for header in blobs - .iter() - .filter_map(|b| b.as_ref().map(|b| b.signed_block_header.clone())) + .into_iter() + .map(|b| b.signed_block_header.clone()) .unique() { if verify_header_signature::(self, &header).is_ok() { @@ -3609,7 +3611,7 @@ impl BeaconChain { block_root: Hash256, blobs: FixedBlobSidecarList, ) -> Result { - self.check_blobs_for_slashability(block_root, &blobs)?; + self.check_blobs_for_slashability(block_root, blobs.iter().flatten().map(Arc::as_ref))?; let availability = self .data_availability_checker .put_rpc_blobs(block_root, blobs)?; @@ -3622,18 +3624,21 @@ impl BeaconChain { self: &Arc, slot: Slot, block_root: Hash256, - engine_get_blobs_output: EngineGetBlobsOutput, + engine_get_blobs_output: EngineGetBlobsOutput, ) -> Result { let availability = 
match engine_get_blobs_output { EngineGetBlobsOutput::Blobs(blobs) => { - self.check_blobs_for_slashability(block_root, &blobs)?; + self.check_blobs_for_slashability(block_root, blobs.iter().map(|b| b.as_blob()))?; self.data_availability_checker - .put_engine_blobs(block_root, blobs)? + .put_gossip_verified_blobs(block_root, blobs)? } EngineGetBlobsOutput::CustodyColumns(data_columns) => { - self.check_columns_for_slashability(block_root, &data_columns)?; + self.check_columns_for_slashability( + block_root, + data_columns.iter().map(|c| c.as_data_column()), + )?; self.data_availability_checker - .put_engine_data_columns(block_root, data_columns)? + .put_gossip_verified_data_columns(block_root, data_columns)? } }; @@ -3649,7 +3654,10 @@ impl BeaconChain { block_root: Hash256, custody_columns: DataColumnSidecarList, ) -> Result { - self.check_columns_for_slashability(block_root, &custody_columns)?; + self.check_columns_for_slashability( + block_root, + custody_columns.iter().map(|c| c.as_ref()), + )?; // This slot value is purely informative for the consumers of // `AvailabilityProcessingStatus::MissingComponents` to log an error with a slot. @@ -3661,16 +3669,21 @@ impl BeaconChain { .await } - fn check_columns_for_slashability( + fn check_columns_for_slashability<'a>( self: &Arc, block_root: Hash256, - custody_columns: &DataColumnSidecarList, + custody_columns: impl IntoIterator>, ) -> Result<(), BlockError> { let mut slashable_cache = self.observed_slashable.write(); - // Assumes all items in custody_columns are for the same block_root - if let Some(column) = custody_columns.first() { - let header = &column.signed_block_header; - if verify_header_signature::(self, header).is_ok() { + // Process all unique block headers - previous logic assumed all headers were identical and + // only processed the first one. However, we should not make assumptions about data received + // from RPC. 
+ for header in custody_columns + .into_iter() + .map(|c| c.signed_block_header.clone()) + .unique() + { + if verify_header_signature::(self, &header).is_ok() { slashable_cache .observe_slashable( header.message.slot, @@ -3679,7 +3692,7 @@ impl BeaconChain { ) .map_err(|e| BlockError::BeaconChainError(Box::new(e.into())))?; if let Some(slasher) = self.slasher.as_ref() { - slasher.accept_block_header(header.clone()); + slasher.accept_block_header(header); } } } diff --git a/beacon_node/beacon_chain/src/blob_verification.rs b/beacon_node/beacon_chain/src/blob_verification.rs index 6fe710f41a..d7acb78408 100644 --- a/beacon_node/beacon_chain/src/blob_verification.rs +++ b/beacon_node/beacon_chain/src/blob_verification.rs @@ -166,6 +166,16 @@ pub struct GossipVerifiedBlob, } +impl Clone for GossipVerifiedBlob { + fn clone(&self) -> Self { + Self { + block_root: self.block_root, + blob: self.blob.clone(), + _phantom: PhantomData, + } + } +} + impl GossipVerifiedBlob { pub fn new( blob: Arc>, @@ -335,21 +345,9 @@ impl KzgVerifiedBlobList { } /// Create a `KzgVerifiedBlobList` from `blobs` that are already KZG verified. - /// - /// This should be used with caution, as used incorrectly it could result in KZG verification - /// being skipped and invalid blobs being deemed valid. 
- pub fn from_verified>>>( - blobs: I, - seen_timestamp: Duration, - ) -> Self { + pub fn from_verified>>(blobs: I) -> Self { Self { - verified_blobs: blobs - .into_iter() - .map(|blob| KzgVerifiedBlob { - blob, - seen_timestamp, - }) - .collect(), + verified_blobs: blobs.into_iter().collect(), } } } diff --git a/beacon_node/beacon_chain/src/data_availability_checker.rs b/beacon_node/beacon_chain/src/data_availability_checker.rs index 6f292f3551..0fd417389b 100644 --- a/beacon_node/beacon_chain/src/data_availability_checker.rs +++ b/beacon_node/beacon_chain/src/data_availability_checker.rs @@ -17,7 +17,7 @@ use task_executor::TaskExecutor; use tracing::{debug, error, info_span, Instrument}; use types::blob_sidecar::{BlobIdentifier, BlobSidecar, FixedBlobSidecarList}; use types::{ - BlobSidecarList, ChainSpec, DataColumnSidecarList, Epoch, EthSpec, Hash256, + BlobSidecarList, ChainSpec, DataColumnSidecar, DataColumnSidecarList, Epoch, EthSpec, Hash256, RuntimeVariableList, SignedBeaconBlock, }; @@ -32,6 +32,7 @@ use crate::data_column_verification::{ use crate::metrics::{ KZG_DATA_COLUMN_RECONSTRUCTION_ATTEMPTS, KZG_DATA_COLUMN_RECONSTRUCTION_FAILURES, }; +use crate::observed_data_sidecars::ObservationStrategy; pub use error::{Error as AvailabilityCheckError, ErrorCategory as AvailabilityCheckErrorCategory}; use types::non_zero_usize::new_non_zero_usize; @@ -155,6 +156,21 @@ impl DataAvailabilityChecker { }) } + /// Check if the exact data column is in the availability cache. + pub fn is_data_column_cached( + &self, + block_root: &Hash256, + data_column: &DataColumnSidecar, + ) -> bool { + self.availability_cache + .peek_pending_components(block_root, |components| { + components.is_some_and(|components| { + let cached_column_opt = components.get_cached_data_column(data_column.index); + cached_column_opt.is_some_and(|cached| *cached == *data_column) + }) + }) + } + /// Get a blob from the availability cache. 
pub fn get_blob( &self, @@ -218,65 +234,21 @@ impl DataAvailabilityChecker { .put_kzg_verified_data_columns(block_root, verified_custody_columns) } - /// Put a list of blobs received from the EL pool into the availability cache. - /// - /// This DOES NOT perform KZG verification because the KZG proofs should have been constructed - /// immediately prior to calling this function so they are assumed to be valid. - pub fn put_engine_blobs( - &self, - block_root: Hash256, - blobs: FixedBlobSidecarList, - ) -> Result, AvailabilityCheckError> { - let seen_timestamp = self - .slot_clock - .now_duration() - .ok_or(AvailabilityCheckError::SlotClockError)?; - self.availability_cache.put_kzg_verified_blobs( - block_root, - KzgVerifiedBlobList::from_verified(blobs.iter().flatten().cloned(), seen_timestamp), - ) - } - - /// Put a list of data columns computed from blobs received from the EL pool into the - /// availability cache. - /// - /// This DOES NOT perform KZG proof and inclusion proof verification because - /// - The KZG proofs should have been verified by the trusted EL. - /// - The KZG commitments inclusion proof should have been constructed immediately prior to - /// calling this function so they are assumed to be valid. - /// - /// This method is used if the EL already has the blobs and returns them via the `getBlobsV2` - /// engine method. - /// More details in [fetch_blobs.rs](https://github.com/sigp/lighthouse/blob/44f8add41ea2252769bb967864af95b3c13af8ca/beacon_node/beacon_chain/src/fetch_blobs.rs). 
- pub fn put_engine_data_columns( - &self, - block_root: Hash256, - data_columns: DataColumnSidecarList, - ) -> Result, AvailabilityCheckError> { - let kzg_verified_custody_columns = data_columns - .into_iter() - .map(|d| { - KzgVerifiedCustodyDataColumn::from_asserted_custody( - KzgVerifiedDataColumn::from_verified(d), - ) - }) - .collect::>(); - - self.availability_cache - .put_kzg_verified_data_columns(block_root, kzg_verified_custody_columns) - } - /// Check if we've cached other blobs for this block. If it completes a set and we also /// have a block cached, return the `Availability` variant triggering block import. /// Otherwise cache the blob sidecar. /// /// This should only accept gossip verified blobs, so we should not have to worry about dupes. - pub fn put_gossip_blob( + pub fn put_gossip_verified_blobs< + I: IntoIterator>, + O: ObservationStrategy, + >( &self, - gossip_blob: GossipVerifiedBlob, + block_root: Hash256, + blobs: I, ) -> Result, AvailabilityCheckError> { self.availability_cache - .put_kzg_verified_blobs(gossip_blob.block_root(), vec![gossip_blob.into_inner()]) + .put_kzg_verified_blobs(block_root, blobs.into_iter().map(|b| b.into_inner())) } /// Check if we've cached other data columns for this block. If it satisfies the custody requirement and we also @@ -284,13 +256,15 @@ impl DataAvailabilityChecker { /// Otherwise cache the data column sidecar. /// /// This should only accept gossip verified data columns, so we should not have to worry about dupes. 
- #[allow(clippy::type_complexity)] - pub fn put_gossip_data_columns( + pub fn put_gossip_verified_data_columns< + O: ObservationStrategy, + I: IntoIterator>, + >( &self, block_root: Hash256, - gossip_data_columns: Vec>, + data_columns: I, ) -> Result, AvailabilityCheckError> { - let custody_columns = gossip_data_columns + let custody_columns = data_columns .into_iter() .map(|c| KzgVerifiedCustodyDataColumn::from_asserted_custody(c.into_inner())) .collect::>(); diff --git a/beacon_node/beacon_chain/src/data_column_verification.rs b/beacon_node/beacon_chain/src/data_column_verification.rs index b43b259cf6..4e847d9f9f 100644 --- a/beacon_node/beacon_chain/src/data_column_verification.rs +++ b/beacon_node/beacon_chain/src/data_column_verification.rs @@ -129,6 +129,10 @@ pub enum GossipDataColumnError { slot: Slot, index: ColumnIndex, }, + /// A column has already been processed from non-gossip source and have not yet been seen on + /// the gossip network. + /// This column should be accepted and forwarded over gossip. + PriorKnownUnpublished, /// Data column index must be between 0 and `NUMBER_OF_COLUMNS` (exclusive). /// /// ## Peer scoring @@ -181,6 +185,16 @@ pub struct GossipVerifiedDataColumn, } +impl Clone for GossipVerifiedDataColumn { + fn clone(&self) -> Self { + Self { + block_root: self.block_root, + data_column: self.data_column.clone(), + _phantom: PhantomData, + } + } +} + impl GossipVerifiedDataColumn { pub fn new( column_sidecar: Arc>, @@ -200,6 +214,16 @@ impl GossipVerifiedDataColumn ) } + /// Create a `GossipVerifiedDataColumn` from `DataColumnSidecar` for testing ONLY. 
+ #[cfg(test)] + pub(crate) fn __new_for_testing(column_sidecar: Arc>) -> Self { + Self { + block_root: column_sidecar.block_root(), + data_column: KzgVerifiedDataColumn::__new_for_testing(column_sidecar), + _phantom: Default::default(), + } + } + pub fn as_data_column(&self) -> &DataColumnSidecar { self.data_column.as_data_column() } @@ -243,11 +267,9 @@ impl KzgVerifiedDataColumn { verify_kzg_for_data_column(data_column, kzg) } - /// Create a `KzgVerifiedDataColumn` from `data_column` that are already KZG verified. - /// - /// This should be used with caution, as used incorrectly it could result in KZG verification - /// being skipped and invalid data_columns being deemed valid. - pub fn from_verified(data_column: Arc>) -> Self { + /// Create a `KzgVerifiedDataColumn` from `DataColumnSidecar` for testing ONLY. + #[cfg(test)] + pub(crate) fn __new_for_testing(data_column: Arc>) -> Self { Self { data: data_column } } @@ -444,6 +466,23 @@ pub fn validate_data_column_sidecar_for_gossip { @@ -74,11 +75,19 @@ impl FetchBlobsBeaconAdapter { GossipVerifiedBlob::::new(blob.clone(), blob.index, &self.chain) } + pub(crate) fn verify_data_column_for_gossip( + &self, + data_column: Arc>, + ) -> Result, GossipDataColumnError> { + let index = data_column.index; + GossipVerifiedDataColumn::::new(data_column, index, &self.chain) + } + pub(crate) async fn process_engine_blobs( &self, slot: Slot, block_root: Hash256, - blobs: EngineGetBlobsOutput, + blobs: EngineGetBlobsOutput, ) -> Result { self.chain .process_engine_blobs(slot, block_root, blobs) diff --git a/beacon_node/beacon_chain/src/fetch_blobs/mod.rs b/beacon_node/beacon_chain/src/fetch_blobs/mod.rs index ba798137b0..927841376f 100644 --- a/beacon_node/beacon_chain/src/fetch_blobs/mod.rs +++ b/beacon_node/beacon_chain/src/fetch_blobs/mod.rs @@ -13,6 +13,7 @@ mod fetch_blobs_beacon_adapter; mod tests; use crate::blob_verification::{GossipBlobError, GossipVerifiedBlob}; +use 
crate::data_column_verification::{GossipDataColumnError, GossipVerifiedDataColumn}; #[cfg_attr(test, double)] use crate::fetch_blobs::fetch_blobs_beacon_adapter::FetchBlobsBeaconAdapter; use crate::kzg_utils::blobs_to_data_column_sidecars; @@ -34,24 +35,17 @@ use tracing::{debug, warn}; use types::blob_sidecar::{BlobSidecarError, FixedBlobSidecarList}; use types::data_column_sidecar::DataColumnSidecarError; use types::{ - BeaconStateError, Blob, BlobSidecar, ChainSpec, ColumnIndex, DataColumnSidecarList, EthSpec, - FullPayload, Hash256, KzgProofs, SignedBeaconBlock, SignedBeaconBlockHeader, VersionedHash, + BeaconStateError, Blob, BlobSidecar, ChainSpec, ColumnIndex, EthSpec, FullPayload, Hash256, + KzgProofs, SignedBeaconBlock, SignedBeaconBlockHeader, VersionedHash, }; -/// Blobs or data column to be published to the gossip network. -pub enum BlobsOrDataColumns { +/// Result from engine get blobs to be passed onto `DataAvailabilityChecker` and published to the +/// gossip network. The blobs / data columns have not been marked as observed yet, as they may not +/// be published immediately. +pub enum EngineGetBlobsOutput { Blobs(Vec>), - DataColumns(DataColumnSidecarList), -} - -/// Result from engine get blobs to be passed onto `DataAvailabilityChecker`. -/// -/// The blobs are retrieved from a trusted EL and columns are computed locally, therefore they are -/// considered valid without requiring extra validation. -pub enum EngineGetBlobsOutput { - Blobs(FixedBlobSidecarList), /// A filtered list of custody data columns to be imported into the `DataAvailabilityChecker`. 
- CustodyColumns(DataColumnSidecarList), + CustodyColumns(Vec>), } #[derive(Debug)] @@ -64,6 +58,7 @@ pub enum FetchEngineBlobError { ExecutionLayerMissing, InternalError(String), GossipBlob(GossipBlobError), + GossipDataColumn(GossipDataColumnError), RequestFailed(ExecutionLayerError), RuntimeShutdown, TokioJoin(tokio::task::JoinError), @@ -76,7 +71,7 @@ pub async fn fetch_and_process_engine_blobs( block_root: Hash256, block: Arc>>, custody_columns: HashSet, - publish_fn: impl Fn(BlobsOrDataColumns) + Send + 'static, + publish_fn: impl Fn(EngineGetBlobsOutput) + Send + 'static, ) -> Result, FetchEngineBlobError> { fetch_and_process_engine_blobs_inner( FetchBlobsBeaconAdapter::new(chain), @@ -95,7 +90,7 @@ async fn fetch_and_process_engine_blobs_inner( block_root: Hash256, block: Arc>>, custody_columns: HashSet, - publish_fn: impl Fn(BlobsOrDataColumns) + Send + 'static, + publish_fn: impl Fn(EngineGetBlobsOutput) + Send + 'static, ) -> Result, FetchEngineBlobError> { let versioned_hashes = if let Some(kzg_commitments) = block .message() @@ -148,7 +143,7 @@ async fn fetch_and_process_blobs_v1( block_root: Hash256, block: Arc>, versioned_hashes: Vec, - publish_fn: impl Fn(BlobsOrDataColumns) + Send + Sized, + publish_fn: impl Fn(EngineGetBlobsOutput) + Send + Sized, ) -> Result, FetchEngineBlobError> { let num_expected_blobs = versioned_hashes.len(); metrics::observe(&metrics::BLOBS_FROM_EL_EXPECTED, num_expected_blobs as f64); @@ -189,7 +184,7 @@ async fn fetch_and_process_blobs_v1( // and be accepted (and propagated) while we are waiting to publish. Just before publishing // we will observe the blobs/columns and only proceed with publishing if they are not yet seen. 
let blobs_to_import_and_publish = fixed_blob_sidecar_list - .iter() + .into_iter() .filter_map(|opt_blob| { let blob = opt_blob.as_ref()?; match chain_adapter.verify_blob_for_gossip(blob) { @@ -203,7 +198,9 @@ async fn fetch_and_process_blobs_v1( .map_err(FetchEngineBlobError::GossipBlob)?; if !blobs_to_import_and_publish.is_empty() { - publish_fn(BlobsOrDataColumns::Blobs(blobs_to_import_and_publish)); + publish_fn(EngineGetBlobsOutput::Blobs( + blobs_to_import_and_publish.clone(), + )); } debug!(num_fetched_blobs, "Processing engine blobs"); @@ -212,7 +209,7 @@ async fn fetch_and_process_blobs_v1( .process_engine_blobs( block.slot(), block_root, - EngineGetBlobsOutput::Blobs(fixed_blob_sidecar_list.clone()), + EngineGetBlobsOutput::Blobs(blobs_to_import_and_publish), ) .await?; @@ -225,7 +222,7 @@ async fn fetch_and_process_blobs_v2( block: Arc>, versioned_hashes: Vec, custody_columns_indices: HashSet, - publish_fn: impl Fn(BlobsOrDataColumns) + Send + 'static, + publish_fn: impl Fn(EngineGetBlobsOutput) + Send + 'static, ) -> Result, FetchEngineBlobError> { let num_expected_blobs = versioned_hashes.len(); @@ -278,6 +275,7 @@ async fn fetch_and_process_blobs_v2( return Ok(None); } + let chain_adapter = Arc::new(chain_adapter); let custody_columns = compute_and_publish_data_columns( &chain_adapter, block.clone(), @@ -303,15 +301,16 @@ async fn fetch_and_process_blobs_v2( /// Offload the data column computation to a blocking task to avoid holding up the async runtime. 
async fn compute_and_publish_data_columns( - chain_adapter: &FetchBlobsBeaconAdapter, + chain_adapter: &Arc>, block: Arc>>, blobs: Vec>, proofs: Vec>, custody_columns_indices: HashSet, - publish_fn: impl Fn(BlobsOrDataColumns) + Send + 'static, -) -> Result, FetchEngineBlobError> { + publish_fn: impl Fn(EngineGetBlobsOutput) + Send + 'static, +) -> Result>, FetchEngineBlobError> { let kzg = chain_adapter.kzg().clone(); let spec = chain_adapter.spec().clone(); + let chain_adapter_cloned = chain_adapter.clone(); chain_adapter .executor() .spawn_blocking_handle( @@ -338,8 +337,54 @@ async fn compute_and_publish_data_columns( }) .map_err(FetchEngineBlobError::DataColumnSidecarError)?; - publish_fn(BlobsOrDataColumns::DataColumns(custody_columns.clone())); - Ok(custody_columns) + // Gossip verify data columns before publishing. This prevents blobs with invalid + // KZG proofs from the EL making it into the data availability checker. We do not + // immediately add these blobs to the observed blobs/columns cache because we want + // to allow blobs/columns to arrive on gossip and be accepted (and propagated) while + // we are waiting to publish. Just before publishing we will observe the blobs/columns + // and only proceed with publishing if they are not yet seen. + // TODO(das): we may want to just perform kzg proof verification here, since the + // `DataColumnSidecar` and inclusion proof is computed just above and is unnecessary + // to verify them. + let columns_to_import_and_publish = custody_columns + .into_iter() + .filter_map(|col| { + match chain_adapter_cloned.verify_data_column_for_gossip(col) { + Ok(verified) => Some(Ok(verified)), + Err(e) => match e { + // Ignore already seen data columns + GossipDataColumnError::PriorKnown { .. 
} + | GossipDataColumnError::PriorKnownUnpublished => None, + GossipDataColumnError::BeaconChainError(_) + | GossipDataColumnError::ProposalSignatureInvalid + | GossipDataColumnError::UnknownValidator(_) + | GossipDataColumnError::IsNotLaterThanParent { .. } + | GossipDataColumnError::InvalidKzgProof(_) + | GossipDataColumnError::InvalidSubnetId { .. } + | GossipDataColumnError::FutureSlot { .. } + | GossipDataColumnError::PastFinalizedSlot { .. } + | GossipDataColumnError::PubkeyCacheTimeout + | GossipDataColumnError::ProposerIndexMismatch { .. } + | GossipDataColumnError::ParentUnknown { .. } + | GossipDataColumnError::NotFinalizedDescendant { .. } + | GossipDataColumnError::InvalidInclusionProof + | GossipDataColumnError::InvalidColumnIndex(_) + | GossipDataColumnError::UnexpectedDataColumn + | GossipDataColumnError::InconsistentCommitmentsLength { .. } + | GossipDataColumnError::InconsistentProofsLength { .. } => { + Some(Err(e)) + } + }, + } + }) + .collect::, _>>() + .map_err(FetchEngineBlobError::GossipDataColumn)?; + + publish_fn(EngineGetBlobsOutput::CustodyColumns( + columns_to_import_and_publish.clone(), + )); + + Ok(columns_to_import_and_publish) }, "compute_and_publish_data_columns", ) diff --git a/beacon_node/beacon_chain/src/fetch_blobs/tests.rs b/beacon_node/beacon_chain/src/fetch_blobs/tests.rs index be3d29e9c9..8eefd4ddf8 100644 --- a/beacon_node/beacon_chain/src/fetch_blobs/tests.rs +++ b/beacon_node/beacon_chain/src/fetch_blobs/tests.rs @@ -1,6 +1,7 @@ +use crate::data_column_verification::GossipVerifiedDataColumn; use crate::fetch_blobs::fetch_blobs_beacon_adapter::MockFetchBlobsBeaconAdapter; use crate::fetch_blobs::{ - fetch_and_process_engine_blobs_inner, BlobsOrDataColumns, FetchEngineBlobError, + fetch_and_process_engine_blobs_inner, EngineGetBlobsOutput, FetchEngineBlobError, }; use crate::test_utils::{get_kzg, EphemeralHarnessType}; use crate::AvailabilityProcessingStatus; @@ -148,6 +149,9 @@ async fn test_fetch_blobs_v2_success() { // 
All blobs returned, fork choice doesn't contain block mock_get_blobs_v2_response(&mut mock_adapter, Some(blobs_and_proofs)); mock_fork_choice_contains_block(&mut mock_adapter, vec![]); + mock_adapter + .expect_verify_data_column_for_gossip() + .returning(|c| Ok(GossipVerifiedDataColumn::__new_for_testing(c))); mock_process_engine_blobs_result( &mut mock_adapter, Ok(AvailabilityProcessingStatus::Imported(block_root)), @@ -174,16 +178,16 @@ async fn test_fetch_blobs_v2_success() { assert!( matches!( published_columns, - BlobsOrDataColumns::DataColumns (columns) if columns.len() == custody_columns.len() + EngineGetBlobsOutput::CustodyColumns(columns) if columns.len() == custody_columns.len() ), "should publish custody columns" ); } -/// Extract the `BlobsOrDataColumns` passed to the `publish_fn`. +/// Extract the `EngineGetBlobsOutput` passed to the `publish_fn`. fn extract_published_blobs( - publish_fn_args: Arc>>>, -) -> BlobsOrDataColumns { + publish_fn_args: Arc>>>, +) -> EngineGetBlobsOutput { let mut calls = publish_fn_args.lock().unwrap(); assert_eq!(calls.len(), 1); calls.pop().unwrap() @@ -250,8 +254,8 @@ fn create_test_block_and_blobs( #[allow(clippy::type_complexity)] fn mock_publish_fn() -> ( - impl Fn(BlobsOrDataColumns) + Send + 'static, - Arc>>>, + impl Fn(EngineGetBlobsOutput) + Send + 'static, + Arc>>>, ) { // Keep track of the arguments captured by `publish_fn`. 
let captured_args = Arc::new(Mutex::new(vec![])); diff --git a/beacon_node/http_api/src/publish_blocks.rs b/beacon_node/http_api/src/publish_blocks.rs index 9b1a3f8677..463f585f2c 100644 --- a/beacon_node/http_api/src/publish_blocks.rs +++ b/beacon_node/http_api/src/publish_blocks.rs @@ -424,6 +424,14 @@ fn build_gossip_verified_data_columns( ); Ok(None) } + Err(GossipDataColumnError::PriorKnownUnpublished) => { + debug!( + column_index, + %slot, + "Data column for publication already known via the EL" + ); + Ok(None) + } Err(e) => { error!( column_index, diff --git a/beacon_node/network/src/network_beacon_processor/gossip_methods.rs b/beacon_node/network/src/network_beacon_processor/gossip_methods.rs index 638f9e4824..3be416165b 100644 --- a/beacon_node/network/src/network_beacon_processor/gossip_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/gossip_methods.rs @@ -797,6 +797,19 @@ impl NetworkBeaconProcessor { } Err(err) => { match err { + GossipDataColumnError::PriorKnownUnpublished => { + debug!( + %slot, + %block_root, + %index, + "Gossip data column already processed via the EL. Accepting the column sidecar without re-processing." 
+ ); + self.propagate_validation_result( + message_id, + peer_id, + MessageAcceptance::Accept, + ); + } GossipDataColumnError::ParentUnknown { parent_root } => { debug!( action = "requesting parent", diff --git a/beacon_node/network/src/network_beacon_processor/mod.rs b/beacon_node/network/src/network_beacon_processor/mod.rs index ba681eed14..f9390a2c7b 100644 --- a/beacon_node/network/src/network_beacon_processor/mod.rs +++ b/beacon_node/network/src/network_beacon_processor/mod.rs @@ -5,7 +5,7 @@ use beacon_chain::blob_verification::{GossipBlobError, GossipVerifiedBlob}; use beacon_chain::block_verification_types::RpcBlock; use beacon_chain::data_column_verification::{observe_gossip_data_column, GossipDataColumnError}; use beacon_chain::fetch_blobs::{ - fetch_and_process_engine_blobs, BlobsOrDataColumns, FetchEngineBlobError, + fetch_and_process_engine_blobs, EngineGetBlobsOutput, FetchEngineBlobError, }; use beacon_chain::observed_data_sidecars::DoNotObserve; use beacon_chain::{ @@ -848,11 +848,14 @@ impl NetworkBeaconProcessor { let publish_fn = move |blobs_or_data_column| { if publish_blobs { match blobs_or_data_column { - BlobsOrDataColumns::Blobs(blobs) => { + EngineGetBlobsOutput::Blobs(blobs) => { self_cloned.publish_blobs_gradually(blobs, block_root); } - BlobsOrDataColumns::DataColumns(columns) => { - self_cloned.publish_data_columns_gradually(columns, block_root); + EngineGetBlobsOutput::CustodyColumns(columns) => { + self_cloned.publish_data_columns_gradually( + columns.into_iter().map(|c| c.clone_data_column()).collect(), + block_root, + ); } }; } diff --git a/beacon_node/network/src/network_beacon_processor/tests.rs b/beacon_node/network/src/network_beacon_processor/tests.rs index 292e894870..87a5a77294 100644 --- a/beacon_node/network/src/network_beacon_processor/tests.rs +++ b/beacon_node/network/src/network_beacon_processor/tests.rs @@ -9,13 +9,16 @@ use crate::{ sync::{manager::BlockProcessType, SyncMessage}, }; use 
beacon_chain::block_verification_types::RpcBlock; +use beacon_chain::data_column_verification::validate_data_column_sidecar_for_gossip; use beacon_chain::kzg_utils::blobs_to_data_column_sidecars; +use beacon_chain::observed_data_sidecars::DoNotObserve; use beacon_chain::test_utils::{ get_kzg, test_spec, AttestationStrategy, BeaconChainHarness, BlockStrategy, EphemeralHarnessType, }; use beacon_chain::{BeaconChain, WhenSlotSkipped}; use beacon_processor::{work_reprocessing_queue::*, *}; +use gossipsub::MessageAcceptance; use itertools::Itertools; use lighthouse_network::rpc::methods::{BlobsByRangeRequest, MetaDataV3}; use lighthouse_network::rpc::InboundRequestId; @@ -25,6 +28,7 @@ use lighthouse_network::{ types::{EnrAttestationBitfield, EnrSyncCommitteeBitfield}, Client, MessageId, NetworkConfig, NetworkGlobals, PeerId, Response, }; +use matches::assert_matches; use slot_clock::SlotClock; use std::iter::Iterator; use std::sync::Arc; @@ -32,9 +36,9 @@ use std::time::Duration; use tokio::sync::mpsc; use types::blob_sidecar::FixedBlobSidecarList; use types::{ - Attestation, AttesterSlashing, BlobSidecar, BlobSidecarList, DataColumnSidecarList, - DataColumnSubnetId, Epoch, Hash256, MainnetEthSpec, ProposerSlashing, SignedAggregateAndProof, - SignedBeaconBlock, SignedVoluntaryExit, Slot, SubnetId, + Attestation, AttesterSlashing, BlobSidecar, BlobSidecarList, ChainSpec, DataColumnSidecarList, + DataColumnSubnetId, Epoch, EthSpec, ForkName, Hash256, MainnetEthSpec, ProposerSlashing, + SignedAggregateAndProof, SignedBeaconBlock, SignedVoluntaryExit, Slot, SubnetId, }; type E = MainnetEthSpec; @@ -64,7 +68,7 @@ struct TestRig { voluntary_exit: SignedVoluntaryExit, beacon_processor_tx: BeaconProcessorSend, work_journal_rx: mpsc::Receiver<&'static str>, - _network_rx: mpsc::UnboundedReceiver>, + network_rx: mpsc::UnboundedReceiver>, _sync_rx: mpsc::UnboundedReceiver>, duplicate_cache: DuplicateCache, network_beacon_processor: Arc>, @@ -83,19 +87,18 @@ impl Drop for TestRig 
{ impl TestRig { pub async fn new(chain_length: u64) -> Self { - Self::new_parametric( - chain_length, - BeaconProcessorConfig::default().enable_backfill_rate_limiting, - ) - .await - } - - pub async fn new_parametric(chain_length: u64, enable_backfill_rate_limiting: bool) -> Self { // This allows for testing voluntary exits without building out a massive chain. let mut spec = test_spec::(); spec.shard_committee_period = 2; - let spec = Arc::new(spec); + Self::new_parametric(chain_length, BeaconProcessorConfig::default(), spec).await + } + pub async fn new_parametric( + chain_length: u64, + beacon_processor_config: BeaconProcessorConfig, + spec: ChainSpec, + ) -> Self { + let spec = Arc::new(spec); let harness = BeaconChainHarness::builder(MainnetEthSpec) .spec(spec.clone()) .deterministic_keypairs(VALIDATOR_COUNT) @@ -183,12 +186,8 @@ impl TestRig { let chain = harness.chain.clone(); - let (network_tx, _network_rx) = mpsc::unbounded_channel(); + let (network_tx, network_rx) = mpsc::unbounded_channel(); - let beacon_processor_config = BeaconProcessorConfig { - enable_backfill_rate_limiting, - ..Default::default() - }; let BeaconProcessorChannels { beacon_processor_tx, beacon_processor_rx, @@ -304,7 +303,7 @@ impl TestRig { voluntary_exit, beacon_processor_tx, work_journal_rx, - _network_rx, + network_rx, _sync_rx, duplicate_cache, network_beacon_processor, @@ -643,6 +642,50 @@ impl TestRig { assert_eq!(events, expected); } + + /// Listen for network messages and collect them for a specified duration or until reaching a count. + /// + /// Returns None if no messages were received, or Some(Vec) containing the received messages. 
+ /// + /// # Arguments + /// + /// * `timeout` - Maximum duration to listen for messages + /// * `count` - Optional maximum number of messages to collect before returning + pub async fn receive_network_messages_with_timeout( + &mut self, + timeout: Duration, + count: Option, + ) -> Option>> { + let mut events = vec![]; + + let timeout_future = tokio::time::sleep(timeout); + tokio::pin!(timeout_future); + + loop { + // Break if we've received the requested count of messages + if let Some(target_count) = count { + if events.len() >= target_count { + break; + } + } + + tokio::select! { + _ = &mut timeout_future => break, + maybe_msg = self.network_rx.recv() => { + match maybe_msg { + Some(msg) => events.push(msg), + None => break, // Channel closed + } + } + } + } + + if events.is_empty() { + None + } else { + Some(events) + } + } } fn junk_peer_id() -> PeerId { @@ -753,6 +796,58 @@ async fn import_gossip_block_unacceptably_early() { ); } +/// Data columns that have already been processed but unobserved should be propagated without re-importing. +#[tokio::test] +async fn accept_processed_gossip_data_columns_without_import() { + let processor_config = BeaconProcessorConfig::default(); + let fulu_genesis_spec = ForkName::Fulu.make_genesis_spec(E::default_spec()); + let mut rig = TestRig::new_parametric(SMALL_CHAIN, processor_config, fulu_genesis_spec).await; + + // GIVEN the data columns have already been processed but unobserved. + // 1. verify data column with `DoNotObserve` to create verified but unobserved data columns. + // 2. put verified but unobserved data columns into the data availability cache. 
+ let verified_data_columns: Vec<_> = rig + .next_data_columns + .clone() + .unwrap() + .into_iter() + .map(|data_column| { + let subnet_id = data_column.index; + validate_data_column_sidecar_for_gossip::<_, DoNotObserve>( + data_column, + subnet_id, + &rig.chain, + ) + .expect("should be valid data column") + }) + .collect(); + + let block_root = rig.next_block.canonical_root(); + rig.chain + .data_availability_checker + .put_gossip_verified_data_columns(block_root, verified_data_columns) + .expect("should put data columns into availability cache"); + + // WHEN an already processed but unobserved data column is received via gossip + rig.enqueue_gossip_data_columns(0); + + // THEN the data column should be propagated without re-importing (not sure if there's an easy way to test this) + let network_message = rig + .receive_network_messages_with_timeout(Duration::from_millis(100), Some(1)) + .await + .and_then(|mut vec| vec.pop()) + .expect("should receive network messages"); + + assert_matches!( + network_message, + NetworkMessage::ValidationResult { + propagation_source: _, + message_id: _, + validation_result: MessageAcceptance::Accept, + } + ); +} + /// Blocks that arrive on-time should be processed normally. #[tokio::test] async fn import_gossip_block_at_current_slot() { @@ -1192,8 +1287,12 @@ async fn test_backfill_sync_processing() { /// Ensure that backfill batches get processed as fast as they can when rate-limiting is disabled. 
#[tokio::test] async fn test_backfill_sync_processing_rate_limiting_disabled() { - let enable_backfill_rate_limiting = false; - let mut rig = TestRig::new_parametric(SMALL_CHAIN, enable_backfill_rate_limiting).await; + let beacon_processor_config = BeaconProcessorConfig { + enable_backfill_rate_limiting: false, + ..Default::default() + }; + let mut rig = + TestRig::new_parametric(SMALL_CHAIN, beacon_processor_config, test_spec::()).await; for _ in 0..3 { rig.enqueue_backfill_batch(); @@ -1236,7 +1335,7 @@ async fn test_blobs_by_range() { .unwrap_or(0); } let mut actual_count = 0; - while let Some(next) = rig._network_rx.recv().await { + while let Some(next) = rig.network_rx.recv().await { if let NetworkMessage::SendResponse { peer_id: _, response: Response::BlobsByRange(blob), diff --git a/beacon_node/network/src/sync/tests/lookups.rs b/beacon_node/network/src/sync/tests/lookups.rs index 38095ec434..84ff1c7e25 100644 --- a/beacon_node/network/src/sync/tests/lookups.rs +++ b/beacon_node/network/src/sync/tests/lookups.rs @@ -14,6 +14,7 @@ use std::time::Duration; use super::*; use crate::sync::block_lookups::common::ResponseType; +use beacon_chain::observed_data_sidecars::Observe; use beacon_chain::{ blob_verification::GossipVerifiedBlob, block_verification_types::{AsBlock, BlockImportData}, @@ -1229,7 +1230,12 @@ impl TestRig { .harness .chain .data_availability_checker - .put_gossip_blob(GossipVerifiedBlob::__assumed_valid(blob.into())) + .put_gossip_verified_blobs( + blob.block_root(), + std::iter::once(GossipVerifiedBlob::<_, Observe>::__assumed_valid( + blob.into(), + )), + ) .unwrap() { Availability::Available(_) => panic!("blob removed from da_checker, available"),