Verify getBlobsV2 response and avoid reprocessing imported data columns (#7493)

#7461 and partly #6439.

Desired behaviour after receiving `engine_getBlobs` response:

1. Gossip verify the blobs and proofs, but don't mark them as observed yet. This is because not all blobs are published immediately (due to staggered publishing). If we mark them as observed and not publish them, we could end up blocking the gossip propagation.
2. Blobs are marked as observed _either_ when:
* They are received from gossip and forwarded to the network.
* They are published by the node.

Current behaviour:
-  We only gossip verify `engine_getBlobsV1` responses, but not `engine_getBlobsV2` responses (PeerDAS).
-  After importing EL blobs AND before they're published, if the same blobs arrive via gossip, they will get re-processed, which may result in a re-import.


  1. Perform gossip verification on data columns computed from the EL `getBlobsV2` response. We currently only do this for `getBlobsV1`, to prevent importing blobs with invalid proofs into the `DataAvailabilityChecker`; this should be done on V2 responses too.
2. Add additional gossip verification to make sure we don't re-process a ~~blob~~ or data column that was imported via the EL `getBlobs` but not yet "seen" on the gossip network. If an "unobserved" gossip blob is found in the availability cache, then we know it has passed verification so we can immediately propagate the `ACCEPT` result and forward it to the network, but without re-processing it.

**UPDATE:** I've left blobs out for the second change mentioned above, as the likelihood and impact are very low and we haven't seen it enough, but under PeerDAS this issue is a regular occurrence and we do see the same block getting imported many times.
This commit is contained in:
Jimmy Chen
2025-05-27 05:55:58 +10:00
committed by GitHub
parent f01dc556d1
commit e6ef644db4
12 changed files with 371 additions and 160 deletions

View File

@@ -797,6 +797,19 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
}
Err(err) => {
match err {
GossipDataColumnError::PriorKnownUnpublished => {
debug!(
%slot,
%block_root,
%index,
"Gossip data column already processed via the EL. Accepting the column sidecar without re-processing."
);
self.propagate_validation_result(
message_id,
peer_id,
MessageAcceptance::Accept,
);
}
GossipDataColumnError::ParentUnknown { parent_root } => {
debug!(
action = "requesting parent",

View File

@@ -5,7 +5,7 @@ use beacon_chain::blob_verification::{GossipBlobError, GossipVerifiedBlob};
use beacon_chain::block_verification_types::RpcBlock;
use beacon_chain::data_column_verification::{observe_gossip_data_column, GossipDataColumnError};
use beacon_chain::fetch_blobs::{
fetch_and_process_engine_blobs, BlobsOrDataColumns, FetchEngineBlobError,
fetch_and_process_engine_blobs, EngineGetBlobsOutput, FetchEngineBlobError,
};
use beacon_chain::observed_data_sidecars::DoNotObserve;
use beacon_chain::{
@@ -848,11 +848,14 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
let publish_fn = move |blobs_or_data_column| {
if publish_blobs {
match blobs_or_data_column {
BlobsOrDataColumns::Blobs(blobs) => {
EngineGetBlobsOutput::Blobs(blobs) => {
self_cloned.publish_blobs_gradually(blobs, block_root);
}
BlobsOrDataColumns::DataColumns(columns) => {
self_cloned.publish_data_columns_gradually(columns, block_root);
EngineGetBlobsOutput::CustodyColumns(columns) => {
self_cloned.publish_data_columns_gradually(
columns.into_iter().map(|c| c.clone_data_column()).collect(),
block_root,
);
}
};
}

View File

@@ -9,13 +9,16 @@ use crate::{
sync::{manager::BlockProcessType, SyncMessage},
};
use beacon_chain::block_verification_types::RpcBlock;
use beacon_chain::data_column_verification::validate_data_column_sidecar_for_gossip;
use beacon_chain::kzg_utils::blobs_to_data_column_sidecars;
use beacon_chain::observed_data_sidecars::DoNotObserve;
use beacon_chain::test_utils::{
get_kzg, test_spec, AttestationStrategy, BeaconChainHarness, BlockStrategy,
EphemeralHarnessType,
};
use beacon_chain::{BeaconChain, WhenSlotSkipped};
use beacon_processor::{work_reprocessing_queue::*, *};
use gossipsub::MessageAcceptance;
use itertools::Itertools;
use lighthouse_network::rpc::methods::{BlobsByRangeRequest, MetaDataV3};
use lighthouse_network::rpc::InboundRequestId;
@@ -25,6 +28,7 @@ use lighthouse_network::{
types::{EnrAttestationBitfield, EnrSyncCommitteeBitfield},
Client, MessageId, NetworkConfig, NetworkGlobals, PeerId, Response,
};
use matches::assert_matches;
use slot_clock::SlotClock;
use std::iter::Iterator;
use std::sync::Arc;
@@ -32,9 +36,9 @@ use std::time::Duration;
use tokio::sync::mpsc;
use types::blob_sidecar::FixedBlobSidecarList;
use types::{
Attestation, AttesterSlashing, BlobSidecar, BlobSidecarList, DataColumnSidecarList,
DataColumnSubnetId, Epoch, Hash256, MainnetEthSpec, ProposerSlashing, SignedAggregateAndProof,
SignedBeaconBlock, SignedVoluntaryExit, Slot, SubnetId,
Attestation, AttesterSlashing, BlobSidecar, BlobSidecarList, ChainSpec, DataColumnSidecarList,
DataColumnSubnetId, Epoch, EthSpec, ForkName, Hash256, MainnetEthSpec, ProposerSlashing,
SignedAggregateAndProof, SignedBeaconBlock, SignedVoluntaryExit, Slot, SubnetId,
};
type E = MainnetEthSpec;
@@ -64,7 +68,7 @@ struct TestRig {
voluntary_exit: SignedVoluntaryExit,
beacon_processor_tx: BeaconProcessorSend<E>,
work_journal_rx: mpsc::Receiver<&'static str>,
_network_rx: mpsc::UnboundedReceiver<NetworkMessage<E>>,
network_rx: mpsc::UnboundedReceiver<NetworkMessage<E>>,
_sync_rx: mpsc::UnboundedReceiver<SyncMessage<E>>,
duplicate_cache: DuplicateCache,
network_beacon_processor: Arc<NetworkBeaconProcessor<T>>,
@@ -83,19 +87,18 @@ impl Drop for TestRig {
impl TestRig {
pub async fn new(chain_length: u64) -> Self {
Self::new_parametric(
chain_length,
BeaconProcessorConfig::default().enable_backfill_rate_limiting,
)
.await
}
pub async fn new_parametric(chain_length: u64, enable_backfill_rate_limiting: bool) -> Self {
// This allows for testing voluntary exits without building out a massive chain.
let mut spec = test_spec::<E>();
spec.shard_committee_period = 2;
let spec = Arc::new(spec);
Self::new_parametric(chain_length, BeaconProcessorConfig::default(), spec).await
}
pub async fn new_parametric(
chain_length: u64,
beacon_processor_config: BeaconProcessorConfig,
spec: ChainSpec,
) -> Self {
let spec = Arc::new(spec);
let harness = BeaconChainHarness::builder(MainnetEthSpec)
.spec(spec.clone())
.deterministic_keypairs(VALIDATOR_COUNT)
@@ -183,12 +186,8 @@ impl TestRig {
let chain = harness.chain.clone();
let (network_tx, _network_rx) = mpsc::unbounded_channel();
let (network_tx, network_rx) = mpsc::unbounded_channel();
let beacon_processor_config = BeaconProcessorConfig {
enable_backfill_rate_limiting,
..Default::default()
};
let BeaconProcessorChannels {
beacon_processor_tx,
beacon_processor_rx,
@@ -304,7 +303,7 @@ impl TestRig {
voluntary_exit,
beacon_processor_tx,
work_journal_rx,
_network_rx,
network_rx,
_sync_rx,
duplicate_cache,
network_beacon_processor,
@@ -643,6 +642,50 @@ impl TestRig {
assert_eq!(events, expected);
}
/// Collect messages from the network channel until `timeout` elapses, `count` messages have
/// been gathered, or the channel is closed, whichever happens first.
///
/// Returns `None` when nothing was collected, otherwise `Some` holding the messages in the
/// order they were received.
///
/// # Arguments
///
/// * `timeout` - Upper bound on how long to keep listening
/// * `count` - Optional cap on the number of messages to collect before returning
pub async fn receive_network_messages_with_timeout(
    &mut self,
    timeout: Duration,
    count: Option<usize>,
) -> Option<Vec<NetworkMessage<E>>> {
    let mut collected = Vec::new();
    // A single pinned sleep is shared by every loop iteration, so the deadline is measured
    // from this point rather than restarting after each received message.
    let deadline = tokio::time::sleep(timeout);
    tokio::pin!(deadline);

    // Keep listening while the optional cap has not been reached.
    while count.map_or(true, |limit| collected.len() < limit) {
        tokio::select! {
            _ = &mut deadline => break,
            received = self.network_rx.recv() => {
                if let Some(message) = received {
                    collected.push(message);
                } else {
                    // Channel closed
                    break;
                }
            }
        }
    }

    (!collected.is_empty()).then_some(collected)
}
}
fn junk_peer_id() -> PeerId {
@@ -753,6 +796,58 @@ async fn import_gossip_block_unacceptably_early() {
);
}
/// A gossip data column that was already placed in the availability cache without being
/// marked as observed should be accepted for propagation rather than imported again.
#[tokio::test]
async fn accept_processed_gossip_data_columns_without_import() {
    let processor_config = BeaconProcessorConfig::default();
    let fulu_genesis_spec = ForkName::Fulu.make_genesis_spec(E::default_spec());
    let mut rig = TestRig::new_parametric(SMALL_CHAIN, processor_config, fulu_genesis_spec).await;

    // GIVEN data columns that have been processed but not observed:
    // 1. gossip-verify each column with the `DoNotObserve` strategy,
    // 2. insert the verified columns into the data availability cache.
    let mut unobserved_columns = Vec::new();
    for column in rig.next_data_columns.clone().unwrap() {
        // The column index is passed as the subnet id for verification.
        let subnet_id = column.index;
        let verified = validate_data_column_sidecar_for_gossip::<_, DoNotObserve>(
            column,
            subnet_id,
            &rig.chain,
        )
        .expect("should be valid data column");
        unobserved_columns.push(verified);
    }

    let block_root = rig.next_block.canonical_root();
    rig.chain
        .data_availability_checker
        .put_gossip_verified_data_columns(block_root, unobserved_columns)
        .expect("should put data columns into availability cache");

    // WHEN one of those columns subsequently arrives via gossip
    rig.enqueue_gossip_data_columns(0);

    // THEN an `Accept` validation result is sent to the network. Only the propagation is
    // asserted here; the absence of a re-import is not checked directly.
    let received = rig
        .receive_network_messages_with_timeout(Duration::from_millis(100), Some(1))
        .await;
    let network_message = received
        .and_then(|mut messages| messages.pop())
        .expect("should receive network messages");

    assert_matches!(
        network_message,
        NetworkMessage::ValidationResult {
            validation_result: MessageAcceptance::Accept,
            ..
        }
    );
}
/// Blocks that arrive on-time should be processed normally.
#[tokio::test]
async fn import_gossip_block_at_current_slot() {
@@ -1192,8 +1287,12 @@ async fn test_backfill_sync_processing() {
/// Ensure that backfill batches get processed as fast as they can when rate-limiting is disabled.
#[tokio::test]
async fn test_backfill_sync_processing_rate_limiting_disabled() {
let enable_backfill_rate_limiting = false;
let mut rig = TestRig::new_parametric(SMALL_CHAIN, enable_backfill_rate_limiting).await;
let beacon_processor_config = BeaconProcessorConfig {
enable_backfill_rate_limiting: false,
..Default::default()
};
let mut rig =
TestRig::new_parametric(SMALL_CHAIN, beacon_processor_config, test_spec::<E>()).await;
for _ in 0..3 {
rig.enqueue_backfill_batch();
@@ -1236,7 +1335,7 @@ async fn test_blobs_by_range() {
.unwrap_or(0);
}
let mut actual_count = 0;
while let Some(next) = rig._network_rx.recv().await {
while let Some(next) = rig.network_rx.recv().await {
if let NetworkMessage::SendResponse {
peer_id: _,
response: Response::BlobsByRange(blob),

View File

@@ -14,6 +14,7 @@ use std::time::Duration;
use super::*;
use crate::sync::block_lookups::common::ResponseType;
use beacon_chain::observed_data_sidecars::Observe;
use beacon_chain::{
blob_verification::GossipVerifiedBlob,
block_verification_types::{AsBlock, BlockImportData},
@@ -1229,7 +1230,12 @@ impl TestRig {
.harness
.chain
.data_availability_checker
.put_gossip_blob(GossipVerifiedBlob::__assumed_valid(blob.into()))
.put_gossip_verified_blobs(
blob.block_root(),
std::iter::once(GossipVerifiedBlob::<_, Observe>::__assumed_valid(
blob.into(),
)),
)
.unwrap()
{
Availability::Available(_) => panic!("blob removed from da_checker, available"),