mirror of
https://github.com/sigp/lighthouse.git
synced 2026-03-03 08:41:43 +00:00
Verify getBlobsV2 response and avoid reprocessing imported data columns (#7493)
#7461 and partly #6439. Desired behaviour after receiving `engine_getBlobs` response: 1. Gossip verify the blobs and proofs, but don't mark them as observed yet. This is because not all blobs are published immediately (due to staggered publishing). If we mark them as observed and not publish them, we could end up blocking the gossip propagation. 2. Blobs are marked as observed _either_ when: * They are received from gossip and forwarded to the network. * They are published by the node. Current behaviour: - ❗ We only gossip verify `engine_getBlobsV1` responses, but not `engine_getBlobsV2` responses (PeerDAS). - ❗ After importing EL blobs AND before they're published, if the same blobs arrive via gossip, they will get re-processed, which may result in a re-import. 1. Perform gossip verification on data columns computed from EL `getBlobsV2` response. We currently only do this for `getBlobsV1` to prevent importing blobs with invalid proofs into the `DataAvailabilityChecker`, this should be done on V2 responses too. 2. Add additional gossip verification to make sure we don't re-process a ~~blob~~ or data column that was imported via the EL `getBlobs` but not yet "seen" on the gossip network. If an "unobserved" gossip blob is found in the availability cache, then we know it has passed verification so we can immediately propagate the `ACCEPT` result and forward it to the network, but without re-processing it. **UPDATE:** I've left blobs out for the second change mentioned above, as the likelihood and impact are very low and we haven't seen it enough, but under PeerDAS this issue is a regular occurrence and we do see the same block getting imported many times.
This commit is contained in:
@@ -3146,7 +3146,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
|
||||
self: &Arc<Self>,
|
||||
slot: Slot,
|
||||
block_root: Hash256,
|
||||
engine_get_blobs_output: EngineGetBlobsOutput<T::EthSpec>,
|
||||
engine_get_blobs_output: EngineGetBlobsOutput<T>,
|
||||
) -> Result<AvailabilityProcessingStatus, BlockError> {
|
||||
// If this block has already been imported to forkchoice it must have been available, so
|
||||
// we don't need to process its blobs again.
|
||||
@@ -3161,7 +3161,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
|
||||
// process_engine_blobs is called for both pre and post PeerDAS. However, post PeerDAS
|
||||
// consumers don't expect the blobs event to fire erratically.
|
||||
if let EngineGetBlobsOutput::Blobs(blobs) = &engine_get_blobs_output {
|
||||
self.emit_sse_blob_sidecar_events(&block_root, blobs.iter().flatten().map(Arc::as_ref));
|
||||
self.emit_sse_blob_sidecar_events(&block_root, blobs.iter().map(|b| b.as_blob()));
|
||||
}
|
||||
|
||||
let r = self
|
||||
@@ -3545,7 +3545,9 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
|
||||
if let Some(slasher) = self.slasher.as_ref() {
|
||||
slasher.accept_block_header(blob.signed_block_header());
|
||||
}
|
||||
let availability = self.data_availability_checker.put_gossip_blob(blob)?;
|
||||
let availability = self
|
||||
.data_availability_checker
|
||||
.put_gossip_verified_blobs(blob.block_root(), std::iter::once(blob))?;
|
||||
|
||||
self.process_availability(slot, availability, || Ok(()))
|
||||
.await
|
||||
@@ -3568,21 +3570,21 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
|
||||
|
||||
let availability = self
|
||||
.data_availability_checker
|
||||
.put_gossip_data_columns(block_root, data_columns)?;
|
||||
.put_gossip_verified_data_columns(block_root, data_columns)?;
|
||||
|
||||
self.process_availability(slot, availability, publish_fn)
|
||||
.await
|
||||
}
|
||||
|
||||
fn check_blobs_for_slashability(
|
||||
fn check_blobs_for_slashability<'a>(
|
||||
self: &Arc<Self>,
|
||||
block_root: Hash256,
|
||||
blobs: &FixedBlobSidecarList<T::EthSpec>,
|
||||
blobs: impl IntoIterator<Item = &'a BlobSidecar<T::EthSpec>>,
|
||||
) -> Result<(), BlockError> {
|
||||
let mut slashable_cache = self.observed_slashable.write();
|
||||
for header in blobs
|
||||
.iter()
|
||||
.filter_map(|b| b.as_ref().map(|b| b.signed_block_header.clone()))
|
||||
.into_iter()
|
||||
.map(|b| b.signed_block_header.clone())
|
||||
.unique()
|
||||
{
|
||||
if verify_header_signature::<T, BlockError>(self, &header).is_ok() {
|
||||
@@ -3609,7 +3611,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
|
||||
block_root: Hash256,
|
||||
blobs: FixedBlobSidecarList<T::EthSpec>,
|
||||
) -> Result<AvailabilityProcessingStatus, BlockError> {
|
||||
self.check_blobs_for_slashability(block_root, &blobs)?;
|
||||
self.check_blobs_for_slashability(block_root, blobs.iter().flatten().map(Arc::as_ref))?;
|
||||
let availability = self
|
||||
.data_availability_checker
|
||||
.put_rpc_blobs(block_root, blobs)?;
|
||||
@@ -3622,18 +3624,21 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
|
||||
self: &Arc<Self>,
|
||||
slot: Slot,
|
||||
block_root: Hash256,
|
||||
engine_get_blobs_output: EngineGetBlobsOutput<T::EthSpec>,
|
||||
engine_get_blobs_output: EngineGetBlobsOutput<T>,
|
||||
) -> Result<AvailabilityProcessingStatus, BlockError> {
|
||||
let availability = match engine_get_blobs_output {
|
||||
EngineGetBlobsOutput::Blobs(blobs) => {
|
||||
self.check_blobs_for_slashability(block_root, &blobs)?;
|
||||
self.check_blobs_for_slashability(block_root, blobs.iter().map(|b| b.as_blob()))?;
|
||||
self.data_availability_checker
|
||||
.put_engine_blobs(block_root, blobs)?
|
||||
.put_gossip_verified_blobs(block_root, blobs)?
|
||||
}
|
||||
EngineGetBlobsOutput::CustodyColumns(data_columns) => {
|
||||
self.check_columns_for_slashability(block_root, &data_columns)?;
|
||||
self.check_columns_for_slashability(
|
||||
block_root,
|
||||
data_columns.iter().map(|c| c.as_data_column()),
|
||||
)?;
|
||||
self.data_availability_checker
|
||||
.put_engine_data_columns(block_root, data_columns)?
|
||||
.put_gossip_verified_data_columns(block_root, data_columns)?
|
||||
}
|
||||
};
|
||||
|
||||
@@ -3649,7 +3654,10 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
|
||||
block_root: Hash256,
|
||||
custody_columns: DataColumnSidecarList<T::EthSpec>,
|
||||
) -> Result<AvailabilityProcessingStatus, BlockError> {
|
||||
self.check_columns_for_slashability(block_root, &custody_columns)?;
|
||||
self.check_columns_for_slashability(
|
||||
block_root,
|
||||
custody_columns.iter().map(|c| c.as_ref()),
|
||||
)?;
|
||||
|
||||
// This slot value is purely informative for the consumers of
|
||||
// `AvailabilityProcessingStatus::MissingComponents` to log an error with a slot.
|
||||
@@ -3661,16 +3669,21 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
|
||||
.await
|
||||
}
|
||||
|
||||
fn check_columns_for_slashability(
|
||||
fn check_columns_for_slashability<'a>(
|
||||
self: &Arc<Self>,
|
||||
block_root: Hash256,
|
||||
custody_columns: &DataColumnSidecarList<T::EthSpec>,
|
||||
custody_columns: impl IntoIterator<Item = &'a DataColumnSidecar<T::EthSpec>>,
|
||||
) -> Result<(), BlockError> {
|
||||
let mut slashable_cache = self.observed_slashable.write();
|
||||
// Assumes all items in custody_columns are for the same block_root
|
||||
if let Some(column) = custody_columns.first() {
|
||||
let header = &column.signed_block_header;
|
||||
if verify_header_signature::<T, BlockError>(self, header).is_ok() {
|
||||
// Process all unique block headers - previous logic assumed all headers were identical and
|
||||
// only processed the first one. However, we should not make assumptions about data received
|
||||
// from RPC.
|
||||
for header in custody_columns
|
||||
.into_iter()
|
||||
.map(|c| c.signed_block_header.clone())
|
||||
.unique()
|
||||
{
|
||||
if verify_header_signature::<T, BlockError>(self, &header).is_ok() {
|
||||
slashable_cache
|
||||
.observe_slashable(
|
||||
header.message.slot,
|
||||
@@ -3679,7 +3692,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
|
||||
)
|
||||
.map_err(|e| BlockError::BeaconChainError(Box::new(e.into())))?;
|
||||
if let Some(slasher) = self.slasher.as_ref() {
|
||||
slasher.accept_block_header(header.clone());
|
||||
slasher.accept_block_header(header);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -166,6 +166,16 @@ pub struct GossipVerifiedBlob<T: BeaconChainTypes, O: ObservationStrategy = Obse
|
||||
_phantom: PhantomData<O>,
|
||||
}
|
||||
|
||||
impl<T: BeaconChainTypes, O: ObservationStrategy> Clone for GossipVerifiedBlob<T, O> {
|
||||
fn clone(&self) -> Self {
|
||||
Self {
|
||||
block_root: self.block_root,
|
||||
blob: self.blob.clone(),
|
||||
_phantom: PhantomData,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: BeaconChainTypes, O: ObservationStrategy> GossipVerifiedBlob<T, O> {
|
||||
pub fn new(
|
||||
blob: Arc<BlobSidecar<T::EthSpec>>,
|
||||
@@ -335,21 +345,9 @@ impl<E: EthSpec> KzgVerifiedBlobList<E> {
|
||||
}
|
||||
|
||||
/// Create a `KzgVerifiedBlobList` from `blobs` that are already KZG verified.
|
||||
///
|
||||
/// This should be used with caution, as used incorrectly it could result in KZG verification
|
||||
/// being skipped and invalid blobs being deemed valid.
|
||||
pub fn from_verified<I: IntoIterator<Item = Arc<BlobSidecar<E>>>>(
|
||||
blobs: I,
|
||||
seen_timestamp: Duration,
|
||||
) -> Self {
|
||||
pub fn from_verified<I: IntoIterator<Item = KzgVerifiedBlob<E>>>(blobs: I) -> Self {
|
||||
Self {
|
||||
verified_blobs: blobs
|
||||
.into_iter()
|
||||
.map(|blob| KzgVerifiedBlob {
|
||||
blob,
|
||||
seen_timestamp,
|
||||
})
|
||||
.collect(),
|
||||
verified_blobs: blobs.into_iter().collect(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -17,7 +17,7 @@ use task_executor::TaskExecutor;
|
||||
use tracing::{debug, error, info_span, Instrument};
|
||||
use types::blob_sidecar::{BlobIdentifier, BlobSidecar, FixedBlobSidecarList};
|
||||
use types::{
|
||||
BlobSidecarList, ChainSpec, DataColumnSidecarList, Epoch, EthSpec, Hash256,
|
||||
BlobSidecarList, ChainSpec, DataColumnSidecar, DataColumnSidecarList, Epoch, EthSpec, Hash256,
|
||||
RuntimeVariableList, SignedBeaconBlock,
|
||||
};
|
||||
|
||||
@@ -32,6 +32,7 @@ use crate::data_column_verification::{
|
||||
use crate::metrics::{
|
||||
KZG_DATA_COLUMN_RECONSTRUCTION_ATTEMPTS, KZG_DATA_COLUMN_RECONSTRUCTION_FAILURES,
|
||||
};
|
||||
use crate::observed_data_sidecars::ObservationStrategy;
|
||||
pub use error::{Error as AvailabilityCheckError, ErrorCategory as AvailabilityCheckErrorCategory};
|
||||
use types::non_zero_usize::new_non_zero_usize;
|
||||
|
||||
@@ -155,6 +156,21 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
|
||||
})
|
||||
}
|
||||
|
||||
/// Check if the exact data column is in the availability cache.
|
||||
pub fn is_data_column_cached(
|
||||
&self,
|
||||
block_root: &Hash256,
|
||||
data_column: &DataColumnSidecar<T::EthSpec>,
|
||||
) -> bool {
|
||||
self.availability_cache
|
||||
.peek_pending_components(block_root, |components| {
|
||||
components.is_some_and(|components| {
|
||||
let cached_column_opt = components.get_cached_data_column(data_column.index);
|
||||
cached_column_opt.is_some_and(|cached| *cached == *data_column)
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
/// Get a blob from the availability cache.
|
||||
pub fn get_blob(
|
||||
&self,
|
||||
@@ -218,65 +234,21 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
|
||||
.put_kzg_verified_data_columns(block_root, verified_custody_columns)
|
||||
}
|
||||
|
||||
/// Put a list of blobs received from the EL pool into the availability cache.
|
||||
///
|
||||
/// This DOES NOT perform KZG verification because the KZG proofs should have been constructed
|
||||
/// immediately prior to calling this function so they are assumed to be valid.
|
||||
pub fn put_engine_blobs(
|
||||
&self,
|
||||
block_root: Hash256,
|
||||
blobs: FixedBlobSidecarList<T::EthSpec>,
|
||||
) -> Result<Availability<T::EthSpec>, AvailabilityCheckError> {
|
||||
let seen_timestamp = self
|
||||
.slot_clock
|
||||
.now_duration()
|
||||
.ok_or(AvailabilityCheckError::SlotClockError)?;
|
||||
self.availability_cache.put_kzg_verified_blobs(
|
||||
block_root,
|
||||
KzgVerifiedBlobList::from_verified(blobs.iter().flatten().cloned(), seen_timestamp),
|
||||
)
|
||||
}
|
||||
|
||||
/// Put a list of data columns computed from blobs received from the EL pool into the
|
||||
/// availability cache.
|
||||
///
|
||||
/// This DOES NOT perform KZG proof and inclusion proof verification because
|
||||
/// - The KZG proofs should have been verified by the trusted EL.
|
||||
/// - The KZG commitments inclusion proof should have been constructed immediately prior to
|
||||
/// calling this function so they are assumed to be valid.
|
||||
///
|
||||
/// This method is used if the EL already has the blobs and returns them via the `getBlobsV2`
|
||||
/// engine method.
|
||||
/// More details in [fetch_blobs.rs](https://github.com/sigp/lighthouse/blob/44f8add41ea2252769bb967864af95b3c13af8ca/beacon_node/beacon_chain/src/fetch_blobs.rs).
|
||||
pub fn put_engine_data_columns(
|
||||
&self,
|
||||
block_root: Hash256,
|
||||
data_columns: DataColumnSidecarList<T::EthSpec>,
|
||||
) -> Result<Availability<T::EthSpec>, AvailabilityCheckError> {
|
||||
let kzg_verified_custody_columns = data_columns
|
||||
.into_iter()
|
||||
.map(|d| {
|
||||
KzgVerifiedCustodyDataColumn::from_asserted_custody(
|
||||
KzgVerifiedDataColumn::from_verified(d),
|
||||
)
|
||||
})
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
self.availability_cache
|
||||
.put_kzg_verified_data_columns(block_root, kzg_verified_custody_columns)
|
||||
}
|
||||
|
||||
/// Check if we've cached other blobs for this block. If it completes a set and we also
|
||||
/// have a block cached, return the `Availability` variant triggering block import.
|
||||
/// Otherwise cache the blob sidecar.
|
||||
///
|
||||
/// This should only accept gossip verified blobs, so we should not have to worry about dupes.
|
||||
pub fn put_gossip_blob(
|
||||
pub fn put_gossip_verified_blobs<
|
||||
I: IntoIterator<Item = GossipVerifiedBlob<T, O>>,
|
||||
O: ObservationStrategy,
|
||||
>(
|
||||
&self,
|
||||
gossip_blob: GossipVerifiedBlob<T>,
|
||||
block_root: Hash256,
|
||||
blobs: I,
|
||||
) -> Result<Availability<T::EthSpec>, AvailabilityCheckError> {
|
||||
self.availability_cache
|
||||
.put_kzg_verified_blobs(gossip_blob.block_root(), vec![gossip_blob.into_inner()])
|
||||
.put_kzg_verified_blobs(block_root, blobs.into_iter().map(|b| b.into_inner()))
|
||||
}
|
||||
|
||||
/// Check if we've cached other data columns for this block. If it satisfies the custody requirement and we also
|
||||
@@ -284,13 +256,15 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
|
||||
/// Otherwise cache the data column sidecar.
|
||||
///
|
||||
/// This should only accept gossip verified data columns, so we should not have to worry about dupes.
|
||||
#[allow(clippy::type_complexity)]
|
||||
pub fn put_gossip_data_columns(
|
||||
pub fn put_gossip_verified_data_columns<
|
||||
O: ObservationStrategy,
|
||||
I: IntoIterator<Item = GossipVerifiedDataColumn<T, O>>,
|
||||
>(
|
||||
&self,
|
||||
block_root: Hash256,
|
||||
gossip_data_columns: Vec<GossipVerifiedDataColumn<T>>,
|
||||
data_columns: I,
|
||||
) -> Result<Availability<T::EthSpec>, AvailabilityCheckError> {
|
||||
let custody_columns = gossip_data_columns
|
||||
let custody_columns = data_columns
|
||||
.into_iter()
|
||||
.map(|c| KzgVerifiedCustodyDataColumn::from_asserted_custody(c.into_inner()))
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
@@ -129,6 +129,10 @@ pub enum GossipDataColumnError {
|
||||
slot: Slot,
|
||||
index: ColumnIndex,
|
||||
},
|
||||
/// A column has already been processed from a non-gossip source and has not yet been seen on
|
||||
/// the gossip network.
|
||||
/// This column should be accepted and forwarded over gossip.
|
||||
PriorKnownUnpublished,
|
||||
/// Data column index must be between 0 and `NUMBER_OF_COLUMNS` (exclusive).
|
||||
///
|
||||
/// ## Peer scoring
|
||||
@@ -181,6 +185,16 @@ pub struct GossipVerifiedDataColumn<T: BeaconChainTypes, O: ObservationStrategy
|
||||
_phantom: PhantomData<O>,
|
||||
}
|
||||
|
||||
impl<T: BeaconChainTypes, O: ObservationStrategy> Clone for GossipVerifiedDataColumn<T, O> {
|
||||
fn clone(&self) -> Self {
|
||||
Self {
|
||||
block_root: self.block_root,
|
||||
data_column: self.data_column.clone(),
|
||||
_phantom: PhantomData,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: BeaconChainTypes, O: ObservationStrategy> GossipVerifiedDataColumn<T, O> {
|
||||
pub fn new(
|
||||
column_sidecar: Arc<DataColumnSidecar<T::EthSpec>>,
|
||||
@@ -200,6 +214,16 @@ impl<T: BeaconChainTypes, O: ObservationStrategy> GossipVerifiedDataColumn<T, O>
|
||||
)
|
||||
}
|
||||
|
||||
/// Create a `GossipVerifiedDataColumn` from `DataColumnSidecar` for testing ONLY.
|
||||
#[cfg(test)]
|
||||
pub(crate) fn __new_for_testing(column_sidecar: Arc<DataColumnSidecar<T::EthSpec>>) -> Self {
|
||||
Self {
|
||||
block_root: column_sidecar.block_root(),
|
||||
data_column: KzgVerifiedDataColumn::__new_for_testing(column_sidecar),
|
||||
_phantom: Default::default(),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn as_data_column(&self) -> &DataColumnSidecar<T::EthSpec> {
|
||||
self.data_column.as_data_column()
|
||||
}
|
||||
@@ -243,11 +267,9 @@ impl<E: EthSpec> KzgVerifiedDataColumn<E> {
|
||||
verify_kzg_for_data_column(data_column, kzg)
|
||||
}
|
||||
|
||||
/// Create a `KzgVerifiedDataColumn` from `data_column` that are already KZG verified.
|
||||
///
|
||||
/// This should be used with caution, as used incorrectly it could result in KZG verification
|
||||
/// being skipped and invalid data_columns being deemed valid.
|
||||
pub fn from_verified(data_column: Arc<DataColumnSidecar<E>>) -> Self {
|
||||
/// Create a `KzgVerifiedDataColumn` from `DataColumnSidecar` for testing ONLY.
|
||||
#[cfg(test)]
|
||||
pub(crate) fn __new_for_testing(data_column: Arc<DataColumnSidecar<E>>) -> Self {
|
||||
Self { data: data_column }
|
||||
}
|
||||
|
||||
@@ -444,6 +466,23 @@ pub fn validate_data_column_sidecar_for_gossip<T: BeaconChainTypes, O: Observati
|
||||
verify_sidecar_not_from_future_slot(chain, column_slot)?;
|
||||
verify_slot_greater_than_latest_finalized_slot(chain, column_slot)?;
|
||||
verify_is_first_sidecar(chain, &data_column)?;
|
||||
|
||||
// Check if the data column is already in the DA checker cache. This happens when data columns
|
||||
// are made available through the `engine_getBlobs` method. If it exists in the cache, we know
|
||||
// it has already passed the gossip checks, even though this particular instance hasn't been
|
||||
// seen / published on the gossip network yet (passed the `verify_is_first_sidecar` check above).
|
||||
// In this case, we should accept it for gossip propagation.
|
||||
if chain
|
||||
.data_availability_checker
|
||||
.is_data_column_cached(&data_column.block_root(), &data_column)
|
||||
{
|
||||
// Observe this data column so we don't process it again.
|
||||
if O::observe() {
|
||||
observe_gossip_data_column(&data_column, chain)?;
|
||||
}
|
||||
return Err(GossipDataColumnError::PriorKnownUnpublished);
|
||||
}
|
||||
|
||||
verify_column_inclusion_proof(&data_column)?;
|
||||
let parent_block = verify_parent_block_and_finalized_descendant(data_column.clone(), chain)?;
|
||||
verify_slot_higher_than_parent(&parent_block, column_slot)?;
|
||||
@@ -463,7 +502,7 @@ pub fn validate_data_column_sidecar_for_gossip<T: BeaconChainTypes, O: Observati
|
||||
.map_err(|e| GossipDataColumnError::BeaconChainError(Box::new(e.into())))?;
|
||||
|
||||
if O::observe() {
|
||||
observe_gossip_data_column(&kzg_verified_data_column.data, chain)?;
|
||||
observe_gossip_data_column(&data_column, chain)?;
|
||||
}
|
||||
|
||||
Ok(GossipVerifiedDataColumn {
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
use crate::blob_verification::{GossipBlobError, GossipVerifiedBlob};
|
||||
use crate::data_column_verification::{GossipDataColumnError, GossipVerifiedDataColumn};
|
||||
use crate::fetch_blobs::{EngineGetBlobsOutput, FetchEngineBlobError};
|
||||
use crate::observed_data_sidecars::DoNotObserve;
|
||||
use crate::{AvailabilityProcessingStatus, BeaconChain, BeaconChainTypes};
|
||||
@@ -8,7 +9,7 @@ use kzg::Kzg;
|
||||
use mockall::automock;
|
||||
use std::sync::Arc;
|
||||
use task_executor::TaskExecutor;
|
||||
use types::{BlobSidecar, ChainSpec, Hash256, Slot};
|
||||
use types::{BlobSidecar, ChainSpec, DataColumnSidecar, Hash256, Slot};
|
||||
|
||||
/// An adapter to the `BeaconChain` functionalities to remove `BeaconChain` from direct dependency to enable testing fetch blobs logic.
|
||||
pub(crate) struct FetchBlobsBeaconAdapter<T: BeaconChainTypes> {
|
||||
@@ -74,11 +75,19 @@ impl<T: BeaconChainTypes> FetchBlobsBeaconAdapter<T> {
|
||||
GossipVerifiedBlob::<T, DoNotObserve>::new(blob.clone(), blob.index, &self.chain)
|
||||
}
|
||||
|
||||
pub(crate) fn verify_data_column_for_gossip(
|
||||
&self,
|
||||
data_column: Arc<DataColumnSidecar<T::EthSpec>>,
|
||||
) -> Result<GossipVerifiedDataColumn<T, DoNotObserve>, GossipDataColumnError> {
|
||||
let index = data_column.index;
|
||||
GossipVerifiedDataColumn::<T, DoNotObserve>::new(data_column, index, &self.chain)
|
||||
}
|
||||
|
||||
pub(crate) async fn process_engine_blobs(
|
||||
&self,
|
||||
slot: Slot,
|
||||
block_root: Hash256,
|
||||
blobs: EngineGetBlobsOutput<T::EthSpec>,
|
||||
blobs: EngineGetBlobsOutput<T>,
|
||||
) -> Result<AvailabilityProcessingStatus, FetchEngineBlobError> {
|
||||
self.chain
|
||||
.process_engine_blobs(slot, block_root, blobs)
|
||||
|
||||
@@ -13,6 +13,7 @@ mod fetch_blobs_beacon_adapter;
|
||||
mod tests;
|
||||
|
||||
use crate::blob_verification::{GossipBlobError, GossipVerifiedBlob};
|
||||
use crate::data_column_verification::{GossipDataColumnError, GossipVerifiedDataColumn};
|
||||
#[cfg_attr(test, double)]
|
||||
use crate::fetch_blobs::fetch_blobs_beacon_adapter::FetchBlobsBeaconAdapter;
|
||||
use crate::kzg_utils::blobs_to_data_column_sidecars;
|
||||
@@ -34,24 +35,17 @@ use tracing::{debug, warn};
|
||||
use types::blob_sidecar::{BlobSidecarError, FixedBlobSidecarList};
|
||||
use types::data_column_sidecar::DataColumnSidecarError;
|
||||
use types::{
|
||||
BeaconStateError, Blob, BlobSidecar, ChainSpec, ColumnIndex, DataColumnSidecarList, EthSpec,
|
||||
FullPayload, Hash256, KzgProofs, SignedBeaconBlock, SignedBeaconBlockHeader, VersionedHash,
|
||||
BeaconStateError, Blob, BlobSidecar, ChainSpec, ColumnIndex, EthSpec, FullPayload, Hash256,
|
||||
KzgProofs, SignedBeaconBlock, SignedBeaconBlockHeader, VersionedHash,
|
||||
};
|
||||
|
||||
/// Blobs or data column to be published to the gossip network.
|
||||
pub enum BlobsOrDataColumns<T: BeaconChainTypes> {
|
||||
/// Result from engine get blobs to be passed onto `DataAvailabilityChecker` and published to the
|
||||
/// gossip network. The blobs / data columns have not been marked as observed yet, as they may not
|
||||
/// be published immediately.
|
||||
pub enum EngineGetBlobsOutput<T: BeaconChainTypes> {
|
||||
Blobs(Vec<GossipVerifiedBlob<T, DoNotObserve>>),
|
||||
DataColumns(DataColumnSidecarList<T::EthSpec>),
|
||||
}
|
||||
|
||||
/// Result from engine get blobs to be passed onto `DataAvailabilityChecker`.
|
||||
///
|
||||
/// The blobs are retrieved from a trusted EL and columns are computed locally, therefore they are
|
||||
/// considered valid without requiring extra validation.
|
||||
pub enum EngineGetBlobsOutput<E: EthSpec> {
|
||||
Blobs(FixedBlobSidecarList<E>),
|
||||
/// A filtered list of custody data columns to be imported into the `DataAvailabilityChecker`.
|
||||
CustodyColumns(DataColumnSidecarList<E>),
|
||||
CustodyColumns(Vec<GossipVerifiedDataColumn<T, DoNotObserve>>),
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
@@ -64,6 +58,7 @@ pub enum FetchEngineBlobError {
|
||||
ExecutionLayerMissing,
|
||||
InternalError(String),
|
||||
GossipBlob(GossipBlobError),
|
||||
GossipDataColumn(GossipDataColumnError),
|
||||
RequestFailed(ExecutionLayerError),
|
||||
RuntimeShutdown,
|
||||
TokioJoin(tokio::task::JoinError),
|
||||
@@ -76,7 +71,7 @@ pub async fn fetch_and_process_engine_blobs<T: BeaconChainTypes>(
|
||||
block_root: Hash256,
|
||||
block: Arc<SignedBeaconBlock<T::EthSpec, FullPayload<T::EthSpec>>>,
|
||||
custody_columns: HashSet<ColumnIndex>,
|
||||
publish_fn: impl Fn(BlobsOrDataColumns<T>) + Send + 'static,
|
||||
publish_fn: impl Fn(EngineGetBlobsOutput<T>) + Send + 'static,
|
||||
) -> Result<Option<AvailabilityProcessingStatus>, FetchEngineBlobError> {
|
||||
fetch_and_process_engine_blobs_inner(
|
||||
FetchBlobsBeaconAdapter::new(chain),
|
||||
@@ -95,7 +90,7 @@ async fn fetch_and_process_engine_blobs_inner<T: BeaconChainTypes>(
|
||||
block_root: Hash256,
|
||||
block: Arc<SignedBeaconBlock<T::EthSpec, FullPayload<T::EthSpec>>>,
|
||||
custody_columns: HashSet<ColumnIndex>,
|
||||
publish_fn: impl Fn(BlobsOrDataColumns<T>) + Send + 'static,
|
||||
publish_fn: impl Fn(EngineGetBlobsOutput<T>) + Send + 'static,
|
||||
) -> Result<Option<AvailabilityProcessingStatus>, FetchEngineBlobError> {
|
||||
let versioned_hashes = if let Some(kzg_commitments) = block
|
||||
.message()
|
||||
@@ -148,7 +143,7 @@ async fn fetch_and_process_blobs_v1<T: BeaconChainTypes>(
|
||||
block_root: Hash256,
|
||||
block: Arc<SignedBeaconBlock<T::EthSpec>>,
|
||||
versioned_hashes: Vec<VersionedHash>,
|
||||
publish_fn: impl Fn(BlobsOrDataColumns<T>) + Send + Sized,
|
||||
publish_fn: impl Fn(EngineGetBlobsOutput<T>) + Send + Sized,
|
||||
) -> Result<Option<AvailabilityProcessingStatus>, FetchEngineBlobError> {
|
||||
let num_expected_blobs = versioned_hashes.len();
|
||||
metrics::observe(&metrics::BLOBS_FROM_EL_EXPECTED, num_expected_blobs as f64);
|
||||
@@ -189,7 +184,7 @@ async fn fetch_and_process_blobs_v1<T: BeaconChainTypes>(
|
||||
// and be accepted (and propagated) while we are waiting to publish. Just before publishing
|
||||
// we will observe the blobs/columns and only proceed with publishing if they are not yet seen.
|
||||
let blobs_to_import_and_publish = fixed_blob_sidecar_list
|
||||
.iter()
|
||||
.into_iter()
|
||||
.filter_map(|opt_blob| {
|
||||
let blob = opt_blob.as_ref()?;
|
||||
match chain_adapter.verify_blob_for_gossip(blob) {
|
||||
@@ -203,7 +198,9 @@ async fn fetch_and_process_blobs_v1<T: BeaconChainTypes>(
|
||||
.map_err(FetchEngineBlobError::GossipBlob)?;
|
||||
|
||||
if !blobs_to_import_and_publish.is_empty() {
|
||||
publish_fn(BlobsOrDataColumns::Blobs(blobs_to_import_and_publish));
|
||||
publish_fn(EngineGetBlobsOutput::Blobs(
|
||||
blobs_to_import_and_publish.clone(),
|
||||
));
|
||||
}
|
||||
|
||||
debug!(num_fetched_blobs, "Processing engine blobs");
|
||||
@@ -212,7 +209,7 @@ async fn fetch_and_process_blobs_v1<T: BeaconChainTypes>(
|
||||
.process_engine_blobs(
|
||||
block.slot(),
|
||||
block_root,
|
||||
EngineGetBlobsOutput::Blobs(fixed_blob_sidecar_list.clone()),
|
||||
EngineGetBlobsOutput::Blobs(blobs_to_import_and_publish),
|
||||
)
|
||||
.await?;
|
||||
|
||||
@@ -225,7 +222,7 @@ async fn fetch_and_process_blobs_v2<T: BeaconChainTypes>(
|
||||
block: Arc<SignedBeaconBlock<T::EthSpec>>,
|
||||
versioned_hashes: Vec<VersionedHash>,
|
||||
custody_columns_indices: HashSet<ColumnIndex>,
|
||||
publish_fn: impl Fn(BlobsOrDataColumns<T>) + Send + 'static,
|
||||
publish_fn: impl Fn(EngineGetBlobsOutput<T>) + Send + 'static,
|
||||
) -> Result<Option<AvailabilityProcessingStatus>, FetchEngineBlobError> {
|
||||
let num_expected_blobs = versioned_hashes.len();
|
||||
|
||||
@@ -278,6 +275,7 @@ async fn fetch_and_process_blobs_v2<T: BeaconChainTypes>(
|
||||
return Ok(None);
|
||||
}
|
||||
|
||||
let chain_adapter = Arc::new(chain_adapter);
|
||||
let custody_columns = compute_and_publish_data_columns(
|
||||
&chain_adapter,
|
||||
block.clone(),
|
||||
@@ -303,15 +301,16 @@ async fn fetch_and_process_blobs_v2<T: BeaconChainTypes>(
|
||||
|
||||
/// Offload the data column computation to a blocking task to avoid holding up the async runtime.
|
||||
async fn compute_and_publish_data_columns<T: BeaconChainTypes>(
|
||||
chain_adapter: &FetchBlobsBeaconAdapter<T>,
|
||||
chain_adapter: &Arc<FetchBlobsBeaconAdapter<T>>,
|
||||
block: Arc<SignedBeaconBlock<T::EthSpec, FullPayload<T::EthSpec>>>,
|
||||
blobs: Vec<Blob<T::EthSpec>>,
|
||||
proofs: Vec<KzgProofs<T::EthSpec>>,
|
||||
custody_columns_indices: HashSet<ColumnIndex>,
|
||||
publish_fn: impl Fn(BlobsOrDataColumns<T>) + Send + 'static,
|
||||
) -> Result<DataColumnSidecarList<T::EthSpec>, FetchEngineBlobError> {
|
||||
publish_fn: impl Fn(EngineGetBlobsOutput<T>) + Send + 'static,
|
||||
) -> Result<Vec<GossipVerifiedDataColumn<T, DoNotObserve>>, FetchEngineBlobError> {
|
||||
let kzg = chain_adapter.kzg().clone();
|
||||
let spec = chain_adapter.spec().clone();
|
||||
let chain_adapter_cloned = chain_adapter.clone();
|
||||
chain_adapter
|
||||
.executor()
|
||||
.spawn_blocking_handle(
|
||||
@@ -338,8 +337,54 @@ async fn compute_and_publish_data_columns<T: BeaconChainTypes>(
|
||||
})
|
||||
.map_err(FetchEngineBlobError::DataColumnSidecarError)?;
|
||||
|
||||
publish_fn(BlobsOrDataColumns::DataColumns(custody_columns.clone()));
|
||||
Ok(custody_columns)
|
||||
// Gossip verify data columns before publishing. This prevents blobs with invalid
|
||||
// KZG proofs from the EL making it into the data availability checker. We do not
|
||||
// immediately add these blobs to the observed blobs/columns cache because we want
|
||||
// to allow blobs/columns to arrive on gossip and be accepted (and propagated) while
|
||||
// we are waiting to publish. Just before publishing we will observe the blobs/columns
|
||||
// and only proceed with publishing if they are not yet seen.
|
||||
// TODO(das): we may want to just perform kzg proof verification here, since the
|
||||
// `DataColumnSidecar` and inclusion proof is computed just above and is unnecessary
|
||||
// to verify them.
|
||||
let columns_to_import_and_publish = custody_columns
|
||||
.into_iter()
|
||||
.filter_map(|col| {
|
||||
match chain_adapter_cloned.verify_data_column_for_gossip(col) {
|
||||
Ok(verified) => Some(Ok(verified)),
|
||||
Err(e) => match e {
|
||||
// Ignore already seen data columns
|
||||
GossipDataColumnError::PriorKnown { .. }
|
||||
| GossipDataColumnError::PriorKnownUnpublished => None,
|
||||
GossipDataColumnError::BeaconChainError(_)
|
||||
| GossipDataColumnError::ProposalSignatureInvalid
|
||||
| GossipDataColumnError::UnknownValidator(_)
|
||||
| GossipDataColumnError::IsNotLaterThanParent { .. }
|
||||
| GossipDataColumnError::InvalidKzgProof(_)
|
||||
| GossipDataColumnError::InvalidSubnetId { .. }
|
||||
| GossipDataColumnError::FutureSlot { .. }
|
||||
| GossipDataColumnError::PastFinalizedSlot { .. }
|
||||
| GossipDataColumnError::PubkeyCacheTimeout
|
||||
| GossipDataColumnError::ProposerIndexMismatch { .. }
|
||||
| GossipDataColumnError::ParentUnknown { .. }
|
||||
| GossipDataColumnError::NotFinalizedDescendant { .. }
|
||||
| GossipDataColumnError::InvalidInclusionProof
|
||||
| GossipDataColumnError::InvalidColumnIndex(_)
|
||||
| GossipDataColumnError::UnexpectedDataColumn
|
||||
| GossipDataColumnError::InconsistentCommitmentsLength { .. }
|
||||
| GossipDataColumnError::InconsistentProofsLength { .. } => {
|
||||
Some(Err(e))
|
||||
}
|
||||
},
|
||||
}
|
||||
})
|
||||
.collect::<Result<Vec<_>, _>>()
|
||||
.map_err(FetchEngineBlobError::GossipDataColumn)?;
|
||||
|
||||
publish_fn(EngineGetBlobsOutput::CustodyColumns(
|
||||
columns_to_import_and_publish.clone(),
|
||||
));
|
||||
|
||||
Ok(columns_to_import_and_publish)
|
||||
},
|
||||
"compute_and_publish_data_columns",
|
||||
)
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
use crate::data_column_verification::GossipVerifiedDataColumn;
|
||||
use crate::fetch_blobs::fetch_blobs_beacon_adapter::MockFetchBlobsBeaconAdapter;
|
||||
use crate::fetch_blobs::{
|
||||
fetch_and_process_engine_blobs_inner, BlobsOrDataColumns, FetchEngineBlobError,
|
||||
fetch_and_process_engine_blobs_inner, EngineGetBlobsOutput, FetchEngineBlobError,
|
||||
};
|
||||
use crate::test_utils::{get_kzg, EphemeralHarnessType};
|
||||
use crate::AvailabilityProcessingStatus;
|
||||
@@ -148,6 +149,9 @@ async fn test_fetch_blobs_v2_success() {
|
||||
// All blobs returned, fork choice doesn't contain block
|
||||
mock_get_blobs_v2_response(&mut mock_adapter, Some(blobs_and_proofs));
|
||||
mock_fork_choice_contains_block(&mut mock_adapter, vec![]);
|
||||
mock_adapter
|
||||
.expect_verify_data_column_for_gossip()
|
||||
.returning(|c| Ok(GossipVerifiedDataColumn::__new_for_testing(c)));
|
||||
mock_process_engine_blobs_result(
|
||||
&mut mock_adapter,
|
||||
Ok(AvailabilityProcessingStatus::Imported(block_root)),
|
||||
@@ -174,16 +178,16 @@ async fn test_fetch_blobs_v2_success() {
|
||||
assert!(
|
||||
matches!(
|
||||
published_columns,
|
||||
BlobsOrDataColumns::DataColumns (columns) if columns.len() == custody_columns.len()
|
||||
EngineGetBlobsOutput::CustodyColumns(columns) if columns.len() == custody_columns.len()
|
||||
),
|
||||
"should publish custody columns"
|
||||
);
|
||||
}
|
||||
|
||||
/// Extract the `BlobsOrDataColumns` passed to the `publish_fn`.
|
||||
/// Extract the `EngineGetBlobsOutput` passed to the `publish_fn`.
|
||||
fn extract_published_blobs(
|
||||
publish_fn_args: Arc<Mutex<Vec<BlobsOrDataColumns<T>>>>,
|
||||
) -> BlobsOrDataColumns<T> {
|
||||
publish_fn_args: Arc<Mutex<Vec<EngineGetBlobsOutput<T>>>>,
|
||||
) -> EngineGetBlobsOutput<T> {
|
||||
let mut calls = publish_fn_args.lock().unwrap();
|
||||
assert_eq!(calls.len(), 1);
|
||||
calls.pop().unwrap()
|
||||
@@ -250,8 +254,8 @@ fn create_test_block_and_blobs(
|
||||
|
||||
#[allow(clippy::type_complexity)]
|
||||
fn mock_publish_fn() -> (
|
||||
impl Fn(BlobsOrDataColumns<T>) + Send + 'static,
|
||||
Arc<Mutex<Vec<BlobsOrDataColumns<T>>>>,
|
||||
impl Fn(EngineGetBlobsOutput<T>) + Send + 'static,
|
||||
Arc<Mutex<Vec<EngineGetBlobsOutput<T>>>>,
|
||||
) {
|
||||
// Keep track of the arguments captured by `publish_fn`.
|
||||
let captured_args = Arc::new(Mutex::new(vec![]));
|
||||
|
||||
@@ -424,6 +424,14 @@ fn build_gossip_verified_data_columns<T: BeaconChainTypes>(
|
||||
);
|
||||
Ok(None)
|
||||
}
|
||||
Err(GossipDataColumnError::PriorKnownUnpublished) => {
|
||||
debug!(
|
||||
column_index,
|
||||
%slot,
|
||||
"Data column for publication already known via the EL"
|
||||
);
|
||||
Ok(None)
|
||||
}
|
||||
Err(e) => {
|
||||
error!(
|
||||
column_index,
|
||||
|
||||
@@ -797,6 +797,19 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
|
||||
}
|
||||
Err(err) => {
|
||||
match err {
|
||||
GossipDataColumnError::PriorKnownUnpublished => {
|
||||
debug!(
|
||||
%slot,
|
||||
%block_root,
|
||||
%index,
|
||||
"Gossip data column already processed via the EL. Accepting the column sidecar without re-processing."
|
||||
);
|
||||
self.propagate_validation_result(
|
||||
message_id,
|
||||
peer_id,
|
||||
MessageAcceptance::Accept,
|
||||
);
|
||||
}
|
||||
GossipDataColumnError::ParentUnknown { parent_root } => {
|
||||
debug!(
|
||||
action = "requesting parent",
|
||||
|
||||
@@ -5,7 +5,7 @@ use beacon_chain::blob_verification::{GossipBlobError, GossipVerifiedBlob};
|
||||
use beacon_chain::block_verification_types::RpcBlock;
|
||||
use beacon_chain::data_column_verification::{observe_gossip_data_column, GossipDataColumnError};
|
||||
use beacon_chain::fetch_blobs::{
|
||||
fetch_and_process_engine_blobs, BlobsOrDataColumns, FetchEngineBlobError,
|
||||
fetch_and_process_engine_blobs, EngineGetBlobsOutput, FetchEngineBlobError,
|
||||
};
|
||||
use beacon_chain::observed_data_sidecars::DoNotObserve;
|
||||
use beacon_chain::{
|
||||
@@ -848,11 +848,14 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
|
||||
let publish_fn = move |blobs_or_data_column| {
|
||||
if publish_blobs {
|
||||
match blobs_or_data_column {
|
||||
BlobsOrDataColumns::Blobs(blobs) => {
|
||||
EngineGetBlobsOutput::Blobs(blobs) => {
|
||||
self_cloned.publish_blobs_gradually(blobs, block_root);
|
||||
}
|
||||
BlobsOrDataColumns::DataColumns(columns) => {
|
||||
self_cloned.publish_data_columns_gradually(columns, block_root);
|
||||
EngineGetBlobsOutput::CustodyColumns(columns) => {
|
||||
self_cloned.publish_data_columns_gradually(
|
||||
columns.into_iter().map(|c| c.clone_data_column()).collect(),
|
||||
block_root,
|
||||
);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
@@ -9,13 +9,16 @@ use crate::{
|
||||
sync::{manager::BlockProcessType, SyncMessage},
|
||||
};
|
||||
use beacon_chain::block_verification_types::RpcBlock;
|
||||
use beacon_chain::data_column_verification::validate_data_column_sidecar_for_gossip;
|
||||
use beacon_chain::kzg_utils::blobs_to_data_column_sidecars;
|
||||
use beacon_chain::observed_data_sidecars::DoNotObserve;
|
||||
use beacon_chain::test_utils::{
|
||||
get_kzg, test_spec, AttestationStrategy, BeaconChainHarness, BlockStrategy,
|
||||
EphemeralHarnessType,
|
||||
};
|
||||
use beacon_chain::{BeaconChain, WhenSlotSkipped};
|
||||
use beacon_processor::{work_reprocessing_queue::*, *};
|
||||
use gossipsub::MessageAcceptance;
|
||||
use itertools::Itertools;
|
||||
use lighthouse_network::rpc::methods::{BlobsByRangeRequest, MetaDataV3};
|
||||
use lighthouse_network::rpc::InboundRequestId;
|
||||
@@ -25,6 +28,7 @@ use lighthouse_network::{
|
||||
types::{EnrAttestationBitfield, EnrSyncCommitteeBitfield},
|
||||
Client, MessageId, NetworkConfig, NetworkGlobals, PeerId, Response,
|
||||
};
|
||||
use matches::assert_matches;
|
||||
use slot_clock::SlotClock;
|
||||
use std::iter::Iterator;
|
||||
use std::sync::Arc;
|
||||
@@ -32,9 +36,9 @@ use std::time::Duration;
|
||||
use tokio::sync::mpsc;
|
||||
use types::blob_sidecar::FixedBlobSidecarList;
|
||||
use types::{
|
||||
Attestation, AttesterSlashing, BlobSidecar, BlobSidecarList, DataColumnSidecarList,
|
||||
DataColumnSubnetId, Epoch, Hash256, MainnetEthSpec, ProposerSlashing, SignedAggregateAndProof,
|
||||
SignedBeaconBlock, SignedVoluntaryExit, Slot, SubnetId,
|
||||
Attestation, AttesterSlashing, BlobSidecar, BlobSidecarList, ChainSpec, DataColumnSidecarList,
|
||||
DataColumnSubnetId, Epoch, EthSpec, ForkName, Hash256, MainnetEthSpec, ProposerSlashing,
|
||||
SignedAggregateAndProof, SignedBeaconBlock, SignedVoluntaryExit, Slot, SubnetId,
|
||||
};
|
||||
|
||||
type E = MainnetEthSpec;
|
||||
@@ -64,7 +68,7 @@ struct TestRig {
|
||||
voluntary_exit: SignedVoluntaryExit,
|
||||
beacon_processor_tx: BeaconProcessorSend<E>,
|
||||
work_journal_rx: mpsc::Receiver<&'static str>,
|
||||
_network_rx: mpsc::UnboundedReceiver<NetworkMessage<E>>,
|
||||
network_rx: mpsc::UnboundedReceiver<NetworkMessage<E>>,
|
||||
_sync_rx: mpsc::UnboundedReceiver<SyncMessage<E>>,
|
||||
duplicate_cache: DuplicateCache,
|
||||
network_beacon_processor: Arc<NetworkBeaconProcessor<T>>,
|
||||
@@ -83,19 +87,18 @@ impl Drop for TestRig {
|
||||
|
||||
impl TestRig {
|
||||
pub async fn new(chain_length: u64) -> Self {
|
||||
Self::new_parametric(
|
||||
chain_length,
|
||||
BeaconProcessorConfig::default().enable_backfill_rate_limiting,
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
pub async fn new_parametric(chain_length: u64, enable_backfill_rate_limiting: bool) -> Self {
|
||||
// This allows for testing voluntary exits without building out a massive chain.
|
||||
let mut spec = test_spec::<E>();
|
||||
spec.shard_committee_period = 2;
|
||||
let spec = Arc::new(spec);
|
||||
Self::new_parametric(chain_length, BeaconProcessorConfig::default(), spec).await
|
||||
}
|
||||
|
||||
pub async fn new_parametric(
|
||||
chain_length: u64,
|
||||
beacon_processor_config: BeaconProcessorConfig,
|
||||
spec: ChainSpec,
|
||||
) -> Self {
|
||||
let spec = Arc::new(spec);
|
||||
let harness = BeaconChainHarness::builder(MainnetEthSpec)
|
||||
.spec(spec.clone())
|
||||
.deterministic_keypairs(VALIDATOR_COUNT)
|
||||
@@ -183,12 +186,8 @@ impl TestRig {
|
||||
|
||||
let chain = harness.chain.clone();
|
||||
|
||||
let (network_tx, _network_rx) = mpsc::unbounded_channel();
|
||||
let (network_tx, network_rx) = mpsc::unbounded_channel();
|
||||
|
||||
let beacon_processor_config = BeaconProcessorConfig {
|
||||
enable_backfill_rate_limiting,
|
||||
..Default::default()
|
||||
};
|
||||
let BeaconProcessorChannels {
|
||||
beacon_processor_tx,
|
||||
beacon_processor_rx,
|
||||
@@ -304,7 +303,7 @@ impl TestRig {
|
||||
voluntary_exit,
|
||||
beacon_processor_tx,
|
||||
work_journal_rx,
|
||||
_network_rx,
|
||||
network_rx,
|
||||
_sync_rx,
|
||||
duplicate_cache,
|
||||
network_beacon_processor,
|
||||
@@ -643,6 +642,50 @@ impl TestRig {
|
||||
|
||||
assert_eq!(events, expected);
|
||||
}
|
||||
|
||||
/// Listen for network messages and collect them for a specified duration or until reaching a count.
|
||||
///
|
||||
/// Returns None if no messages were received, or Some(Vec) containing the received messages.
|
||||
///
|
||||
/// # Arguments
|
||||
///
|
||||
/// * `timeout` - Maximum duration to listen for messages
|
||||
/// * `count` - Optional maximum number of messages to collect before returning
|
||||
pub async fn receive_network_messages_with_timeout(
|
||||
&mut self,
|
||||
timeout: Duration,
|
||||
count: Option<usize>,
|
||||
) -> Option<Vec<NetworkMessage<E>>> {
|
||||
let mut events = vec![];
|
||||
|
||||
let timeout_future = tokio::time::sleep(timeout);
|
||||
tokio::pin!(timeout_future);
|
||||
|
||||
loop {
|
||||
// Break if we've received the requested count of messages
|
||||
if let Some(target_count) = count {
|
||||
if events.len() >= target_count {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
tokio::select! {
|
||||
_ = &mut timeout_future => break,
|
||||
maybe_msg = self.network_rx.recv() => {
|
||||
match maybe_msg {
|
||||
Some(msg) => events.push(msg),
|
||||
None => break, // Channel closed
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if events.is_empty() {
|
||||
None
|
||||
} else {
|
||||
Some(events)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn junk_peer_id() -> PeerId {
|
||||
@@ -753,6 +796,58 @@ async fn import_gossip_block_unacceptably_early() {
|
||||
);
|
||||
}
|
||||
|
||||
/// Data columns that have already been processed but unobserved should be propagated without re-importing.
|
||||
#[tokio::test]
|
||||
async fn accept_processed_gossip_data_columns_without_import() {
|
||||
let processor_config = BeaconProcessorConfig::default();
|
||||
let fulu_genesis_spec = ForkName::Fulu.make_genesis_spec(E::default_spec());
|
||||
let mut rig = TestRig::new_parametric(SMALL_CHAIN, processor_config, fulu_genesis_spec).await;
|
||||
|
||||
// GIVEN the data columns have already been processed but unobserved.
|
||||
// 1. verify data column with `DoNotObserve` to create verified but unobserved data columns.
|
||||
// 2. put verified but unobserved data columns into the data availability cache.
|
||||
let verified_data_columns: Vec<_> = rig
|
||||
.next_data_columns
|
||||
.clone()
|
||||
.unwrap()
|
||||
.into_iter()
|
||||
.map(|data_column| {
|
||||
let subnet_id = data_column.index;
|
||||
validate_data_column_sidecar_for_gossip::<_, DoNotObserve>(
|
||||
data_column,
|
||||
subnet_id,
|
||||
&rig.chain,
|
||||
)
|
||||
.expect("should be valid data column")
|
||||
})
|
||||
.collect();
|
||||
|
||||
let block_root = rig.next_block.canonical_root();
|
||||
rig.chain
|
||||
.data_availability_checker
|
||||
.put_gossip_verified_data_columns(block_root, verified_data_columns)
|
||||
.expect("should put data columns into availability cache");
|
||||
|
||||
// WHEN an already processed but unobserved data column is received via gossip
|
||||
rig.enqueue_gossip_data_columns(0);
|
||||
|
||||
// THEN the data column should be propagated without re-importing (not sure if there's an easy way to test this)
|
||||
let network_message = rig
|
||||
.receive_network_messages_with_timeout(Duration::from_millis(100), Some(1))
|
||||
.await
|
||||
.and_then(|mut vec| vec.pop())
|
||||
.expect("should receive network messages");
|
||||
|
||||
assert_matches!(
|
||||
network_message,
|
||||
NetworkMessage::ValidationResult {
|
||||
propagation_source: _,
|
||||
message_id: _,
|
||||
validation_result: MessageAcceptance::Accept,
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
/// Blocks that arrive on-time should be processed normally.
|
||||
#[tokio::test]
|
||||
async fn import_gossip_block_at_current_slot() {
|
||||
@@ -1192,8 +1287,12 @@ async fn test_backfill_sync_processing() {
|
||||
/// Ensure that backfill batches get processed as fast as they can when rate-limiting is disabled.
|
||||
#[tokio::test]
|
||||
async fn test_backfill_sync_processing_rate_limiting_disabled() {
|
||||
let enable_backfill_rate_limiting = false;
|
||||
let mut rig = TestRig::new_parametric(SMALL_CHAIN, enable_backfill_rate_limiting).await;
|
||||
let beacon_processor_config = BeaconProcessorConfig {
|
||||
enable_backfill_rate_limiting: false,
|
||||
..Default::default()
|
||||
};
|
||||
let mut rig =
|
||||
TestRig::new_parametric(SMALL_CHAIN, beacon_processor_config, test_spec::<E>()).await;
|
||||
|
||||
for _ in 0..3 {
|
||||
rig.enqueue_backfill_batch();
|
||||
@@ -1236,7 +1335,7 @@ async fn test_blobs_by_range() {
|
||||
.unwrap_or(0);
|
||||
}
|
||||
let mut actual_count = 0;
|
||||
while let Some(next) = rig._network_rx.recv().await {
|
||||
while let Some(next) = rig.network_rx.recv().await {
|
||||
if let NetworkMessage::SendResponse {
|
||||
peer_id: _,
|
||||
response: Response::BlobsByRange(blob),
|
||||
|
||||
@@ -14,6 +14,7 @@ use std::time::Duration;
|
||||
use super::*;
|
||||
|
||||
use crate::sync::block_lookups::common::ResponseType;
|
||||
use beacon_chain::observed_data_sidecars::Observe;
|
||||
use beacon_chain::{
|
||||
blob_verification::GossipVerifiedBlob,
|
||||
block_verification_types::{AsBlock, BlockImportData},
|
||||
@@ -1229,7 +1230,12 @@ impl TestRig {
|
||||
.harness
|
||||
.chain
|
||||
.data_availability_checker
|
||||
.put_gossip_blob(GossipVerifiedBlob::__assumed_valid(blob.into()))
|
||||
.put_gossip_verified_blobs(
|
||||
blob.block_root(),
|
||||
std::iter::once(GossipVerifiedBlob::<_, Observe>::__assumed_valid(
|
||||
blob.into(),
|
||||
)),
|
||||
)
|
||||
.unwrap()
|
||||
{
|
||||
Availability::Available(_) => panic!("blob removed from da_checker, available"),
|
||||
|
||||
Reference in New Issue
Block a user