Merge branch 'peerdas-devnet-7' into peerdas-rangesync

This commit is contained in:
Jimmy Chen
2025-06-03 18:20:54 +10:00
committed by GitHub
63 changed files with 1660 additions and 978 deletions

View File

@@ -797,6 +797,19 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
}
Err(err) => {
match err {
GossipDataColumnError::PriorKnownUnpublished => {
debug!(
%slot,
%block_root,
%index,
"Gossip data column already processed via the EL. Accepting the column sidecar without re-processing."
);
self.propagate_validation_result(
message_id,
peer_id,
MessageAcceptance::Accept,
);
}
GossipDataColumnError::ParentUnknown { parent_root } => {
debug!(
action = "requesting parent",
@@ -2767,6 +2780,26 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
MessageAcceptance::Ignore,
);
}
BeaconChainError::AttestationValidationError(e) => {
// Failures from `get_attesting_indices` end up here.
debug!(
%peer_id,
block_root = ?beacon_block_root,
attestation_slot = %failed_att.attestation_data().slot,
error = ?e,
"Rejecting attestation that failed validation"
);
self.propagate_validation_result(
message_id,
peer_id,
MessageAcceptance::Reject,
);
self.gossip_penalize_peer(
peer_id,
PeerAction::MidToleranceError,
"attn_validation_error",
);
}
_ => {
/*
* Lighthouse hit an unexpected error whilst processing the attestation. It

View File

@@ -5,7 +5,7 @@ use beacon_chain::blob_verification::{GossipBlobError, GossipVerifiedBlob};
use beacon_chain::block_verification_types::RpcBlock;
use beacon_chain::data_column_verification::{observe_gossip_data_column, GossipDataColumnError};
use beacon_chain::fetch_blobs::{
fetch_and_process_engine_blobs, BlobsOrDataColumns, FetchEngineBlobError,
fetch_and_process_engine_blobs, EngineGetBlobsOutput, FetchEngineBlobError,
};
use beacon_chain::observed_data_sidecars::DoNotObserve;
use beacon_chain::{
@@ -848,11 +848,14 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
let publish_fn = move |blobs_or_data_column| {
if publish_blobs {
match blobs_or_data_column {
BlobsOrDataColumns::Blobs(blobs) => {
EngineGetBlobsOutput::Blobs(blobs) => {
self_cloned.publish_blobs_gradually(blobs, block_root);
}
BlobsOrDataColumns::DataColumns(columns) => {
self_cloned.publish_data_columns_gradually(columns, block_root);
EngineGetBlobsOutput::CustodyColumns(columns) => {
self_cloned.publish_data_columns_gradually(
columns.into_iter().map(|c| c.clone_data_column()).collect(),
block_root,
);
}
};
}

View File

@@ -9,13 +9,16 @@ use crate::{
sync::{manager::BlockProcessType, SyncMessage},
};
use beacon_chain::block_verification_types::RpcBlock;
use beacon_chain::data_column_verification::validate_data_column_sidecar_for_gossip;
use beacon_chain::kzg_utils::blobs_to_data_column_sidecars;
use beacon_chain::observed_data_sidecars::DoNotObserve;
use beacon_chain::test_utils::{
get_kzg, test_spec, AttestationStrategy, BeaconChainHarness, BlockStrategy,
EphemeralHarnessType,
};
use beacon_chain::{BeaconChain, WhenSlotSkipped};
use beacon_processor::{work_reprocessing_queue::*, *};
use gossipsub::MessageAcceptance;
use itertools::Itertools;
use lighthouse_network::rpc::methods::{BlobsByRangeRequest, MetaDataV3};
use lighthouse_network::rpc::InboundRequestId;
@@ -25,6 +28,7 @@ use lighthouse_network::{
types::{EnrAttestationBitfield, EnrSyncCommitteeBitfield},
Client, MessageId, NetworkConfig, NetworkGlobals, PeerId, Response,
};
use matches::assert_matches;
use slot_clock::SlotClock;
use std::iter::Iterator;
use std::sync::Arc;
@@ -32,7 +36,7 @@ use std::time::Duration;
use tokio::sync::mpsc;
use types::blob_sidecar::FixedBlobSidecarList;
use types::{
Attestation, AttesterSlashing, BlobSidecar, BlobSidecarList, DataColumnSidecarList,
Attestation, AttesterSlashing, BlobSidecar, BlobSidecarList, ChainSpec, DataColumnSidecarList,
DataColumnSubnetId, Epoch, Hash256, MainnetEthSpec, ProposerSlashing, SignedAggregateAndProof,
SignedBeaconBlock, SignedVoluntaryExit, Slot, SubnetId,
};
@@ -64,7 +68,7 @@ struct TestRig {
voluntary_exit: SignedVoluntaryExit,
beacon_processor_tx: BeaconProcessorSend<E>,
work_journal_rx: mpsc::Receiver<&'static str>,
_network_rx: mpsc::UnboundedReceiver<NetworkMessage<E>>,
network_rx: mpsc::UnboundedReceiver<NetworkMessage<E>>,
_sync_rx: mpsc::UnboundedReceiver<SyncMessage<E>>,
duplicate_cache: DuplicateCache,
network_beacon_processor: Arc<NetworkBeaconProcessor<T>>,
@@ -83,19 +87,18 @@ impl Drop for TestRig {
impl TestRig {
pub async fn new(chain_length: u64) -> Self {
Self::new_parametric(
chain_length,
BeaconProcessorConfig::default().enable_backfill_rate_limiting,
)
.await
}
pub async fn new_parametric(chain_length: u64, enable_backfill_rate_limiting: bool) -> Self {
// This allows for testing voluntary exits without building out a massive chain.
let mut spec = test_spec::<E>();
spec.shard_committee_period = 2;
let spec = Arc::new(spec);
Self::new_parametric(chain_length, BeaconProcessorConfig::default(), spec).await
}
pub async fn new_parametric(
chain_length: u64,
beacon_processor_config: BeaconProcessorConfig,
spec: ChainSpec,
) -> Self {
let spec = Arc::new(spec);
let harness = BeaconChainHarness::builder(MainnetEthSpec)
.spec(spec.clone())
.deterministic_keypairs(VALIDATOR_COUNT)
@@ -126,6 +129,14 @@ impl TestRig {
"precondition: current slot is one after head"
);
// Ensure there is a blob in the next block. Required for some tests.
harness
.mock_execution_layer
.as_ref()
.unwrap()
.server
.execution_block_generator()
.set_min_blob_count(1);
let (next_block_tuple, next_state) = harness
.make_block(head.beacon_state.clone(), harness.chain.slot().unwrap())
.await;
@@ -183,12 +194,8 @@ impl TestRig {
let chain = harness.chain.clone();
let (network_tx, _network_rx) = mpsc::unbounded_channel();
let (network_tx, network_rx) = mpsc::unbounded_channel();
let beacon_processor_config = BeaconProcessorConfig {
enable_backfill_rate_limiting,
..Default::default()
};
let BeaconProcessorChannels {
beacon_processor_tx,
beacon_processor_rx,
@@ -304,7 +311,7 @@ impl TestRig {
voluntary_exit,
beacon_processor_tx,
work_journal_rx,
_network_rx,
network_rx,
_sync_rx,
duplicate_cache,
network_beacon_processor,
@@ -643,6 +650,50 @@ impl TestRig {
assert_eq!(events, expected);
}
/// Listen for network messages and collect them for a specified duration or until reaching a count.
///
/// Returns None if no messages were received, or Some(Vec) containing the received messages.
///
/// # Arguments
///
/// * `timeout` - Maximum duration to listen for messages
/// * `count` - Optional maximum number of messages to collect before returning
pub async fn receive_network_messages_with_timeout(
&mut self,
timeout: Duration,
count: Option<usize>,
) -> Option<Vec<NetworkMessage<E>>> {
let mut events = vec![];
let timeout_future = tokio::time::sleep(timeout);
tokio::pin!(timeout_future);
loop {
// Break if we've received the requested count of messages
if let Some(target_count) = count {
if events.len() >= target_count {
break;
}
}
tokio::select! {
_ = &mut timeout_future => break,
maybe_msg = self.network_rx.recv() => {
match maybe_msg {
Some(msg) => events.push(msg),
None => break, // Channel closed
}
}
}
}
if events.is_empty() {
None
} else {
Some(events)
}
}
}
fn junk_peer_id() -> PeerId {
@@ -753,6 +804,60 @@ async fn import_gossip_block_unacceptably_early() {
);
}
/// Data columns that have already been processed but unobserved should be propagated without re-importing.
#[tokio::test]
async fn accept_processed_gossip_data_columns_without_import() {
if test_spec::<E>().fulu_fork_epoch.is_none() {
return;
};
let mut rig = TestRig::new(SMALL_CHAIN).await;
// GIVEN the data columns have already been processed but unobserved.
// 1. verify data column with `DoNotObserve` to create verified but unobserved data columns.
// 2. put verified but unobserved data columns into the data availability cache.
let verified_data_columns: Vec<_> = rig
.next_data_columns
.clone()
.unwrap()
.into_iter()
.map(|data_column| {
let subnet_id = data_column.index;
validate_data_column_sidecar_for_gossip::<_, DoNotObserve>(
data_column,
subnet_id,
&rig.chain,
)
.expect("should be valid data column")
})
.collect();
let block_root = rig.next_block.canonical_root();
rig.chain
.data_availability_checker
.put_gossip_verified_data_columns(block_root, verified_data_columns)
.expect("should put data columns into availability cache");
// WHEN an already processed but unobserved data column is received via gossip
rig.enqueue_gossip_data_columns(0);
// THEN the data column should be propagated without re-importing (not sure if there's an easy way to test this)
let network_message = rig
.receive_network_messages_with_timeout(Duration::from_millis(100), Some(1))
.await
.and_then(|mut vec| vec.pop())
.expect("should receive network messages");
assert_matches!(
network_message,
NetworkMessage::ValidationResult {
propagation_source: _,
message_id: _,
validation_result: MessageAcceptance::Accept,
}
);
}
/// Blocks that arrive on-time should be processed normally.
#[tokio::test]
async fn import_gossip_block_at_current_slot() {
@@ -1192,8 +1297,12 @@ async fn test_backfill_sync_processing() {
/// Ensure that backfill batches get processed as fast as they can when rate-limiting is disabled.
#[tokio::test]
async fn test_backfill_sync_processing_rate_limiting_disabled() {
let enable_backfill_rate_limiting = false;
let mut rig = TestRig::new_parametric(SMALL_CHAIN, enable_backfill_rate_limiting).await;
let beacon_processor_config = BeaconProcessorConfig {
enable_backfill_rate_limiting: false,
..Default::default()
};
let mut rig =
TestRig::new_parametric(SMALL_CHAIN, beacon_processor_config, test_spec::<E>()).await;
for _ in 0..3 {
rig.enqueue_backfill_batch();
@@ -1236,7 +1345,7 @@ async fn test_blobs_by_range() {
.unwrap_or(0);
}
let mut actual_count = 0;
while let Some(next) = rig._network_rx.recv().await {
while let Some(next) = rig.network_rx.recv().await {
if let NetworkMessage::SendResponse {
peer_id: _,
response: Response::BlobsByRange(blob),

View File

@@ -14,6 +14,7 @@ use std::time::Duration;
use super::*;
use crate::sync::block_lookups::common::ResponseType;
use beacon_chain::observed_data_sidecars::Observe;
use beacon_chain::{
blob_verification::GossipVerifiedBlob,
block_verification_types::{AsBlock, BlockImportData},
@@ -1322,7 +1323,12 @@ impl TestRig {
.harness
.chain
.data_availability_checker
.put_gossip_blob(GossipVerifiedBlob::__assumed_valid(blob.into()))
.put_gossip_verified_blobs(
blob.block_root(),
std::iter::once(GossipVerifiedBlob::<_, Observe>::__assumed_valid(
blob.into(),
)),
)
.unwrap()
{
Availability::Available(_) => panic!("blob removed from da_checker, available"),