Merge sigp/unstable into gloas-lookup-sync-fixes

Brings in the gossip-blob deprecation (#9126) and 17 other unstable
commits. Conflict resolutions (8 files):

- Kept our unified `SyncMessage::UnknownParentSidecarHeader` design over
  unstable's separate `UnknownParentDataColumn`/`UnknownParentPartialDataColumn`
  variants (gossip_methods, manager, single_block_lookup, mod, tests).
- Adopted unstable's gossip-blob deprecation: dropped `process_gossip_blob`,
  `process_gossip_verified_blob`, and the blob parent-unknown test path.
- Took unstable's `process_gossip_verified_data_column` (Result-returning
  `to_partial`), router PayloadEnvelopesByRoot flattened match, and combined
  `BlockProcessType::id` arm.
- Dropped unstable's gloas-lookup-sync boilerplate stubs (#9322) that
  duplicated our real impls: `process_lookup_envelope`,
  `rpc_payload_envelope_received`, `on_single_payload_envelope_response`,
  and the `SinglePayloadEnvelope` processing-result arm.

cargo check -p network passes clean.
This commit is contained in:
dapplion
2026-06-01 06:15:12 +02:00
134 changed files with 3321 additions and 3910 deletions

View File

@@ -11,6 +11,7 @@ mod subnet_service;
mod sync;
pub use lighthouse_network::NetworkConfig;
pub use network_beacon_processor::NetworkBeaconProcessor;
pub use service::{
NetworkMessage, NetworkReceivers, NetworkSenders, NetworkService, ValidatorSubscriptionMessage,
};

View File

@@ -128,13 +128,6 @@ pub static BEACON_PROCESSOR_GOSSIP_BLOCK_EARLY_SECONDS: LazyLock<Result<Histogra
)
},
);
pub static BEACON_PROCESSOR_GOSSIP_BLOB_VERIFIED_TOTAL: LazyLock<Result<IntCounter>> =
LazyLock::new(|| {
try_create_int_counter(
"beacon_processor_gossip_blob_verified_total",
"Total number of gossip blob verified for propagation.",
)
});
pub static BEACON_PROCESSOR_GOSSIP_DATA_COLUMN_SIDECAR_VERIFIED_TOTAL: LazyLock<
Result<IntCounter>,
> = LazyLock::new(|| {
@@ -600,12 +593,6 @@ pub static BEACON_BLOCK_DELAY_GOSSIP_ARRIVED_LATE_TOTAL: LazyLock<Result<IntCoun
/*
* Blob Delay Metrics
*/
pub static BEACON_BLOB_DELAY_GOSSIP: LazyLock<Result<IntGauge>> = LazyLock::new(|| {
try_create_int_gauge(
"beacon_blob_delay_gossip_last_delay",
"The first time we see this blob as a delay from the start of the slot",
)
});
pub static BEACON_DATA_COLUMN_GOSSIP_PROPAGATION_VERIFICATION_DELAY_TIME: LazyLock<
Result<Histogram>,
@@ -664,14 +651,6 @@ pub static BEACON_USEFUL_FULL_COLUMNS_RECEIVED_TOTAL: LazyLock<Result<IntCounter
)
});
pub static BEACON_BLOB_DELAY_GOSSIP_VERIFICATION: LazyLock<Result<IntGauge>> = LazyLock::new(
|| {
try_create_int_gauge(
"beacon_blob_delay_gossip_verification",
"Keeps track of the time delay from the start of the slot to the point we propagate the blob",
)
},
);
pub static BEACON_BLOB_DELAY_FULL_VERIFICATION: LazyLock<Result<IntGauge>> = LazyLock::new(|| {
try_create_int_gauge(
"beacon_blob_last_full_verification_delay",
@@ -695,15 +674,6 @@ pub static BEACON_BLOB_RPC_SLOT_START_DELAY_TIME: LazyLock<Result<Histogram>> =
},
);
pub static BEACON_BLOB_GOSSIP_ARRIVED_LATE_TOTAL: LazyLock<Result<IntCounter>> = LazyLock::new(
|| {
try_create_int_counter(
"beacon_blob_gossip_arrived_late_total",
"Count of times when a gossip blob arrived from the network later than the attestation deadline.",
)
},
);
/*
* Light client update reprocessing queue metrics.
*/

View File

@@ -11,6 +11,9 @@ use beacon_chain::data_column_verification::{
PartialColumnVerificationResult,
};
use beacon_chain::payload_bid_verification::PayloadBidError;
use beacon_chain::payload_envelope_verification::{
EnvelopeError, gossip_verified_envelope::GossipVerifiedEnvelope,
};
use beacon_chain::proposer_preferences_verification::ProposerPreferencesError;
use beacon_chain::store::Error;
use beacon_chain::{
@@ -27,12 +30,6 @@ use beacon_chain::{
sync_committee_verification::{self, Error as SyncCommitteeError},
validator_monitor::{get_block_delay_ms, get_slot_delay_ms},
};
use beacon_chain::{
blob_verification::{GossipBlobError, GossipVerifiedBlob},
payload_envelope_verification::{
EnvelopeError, gossip_verified_envelope::GossipVerifiedEnvelope,
},
};
use beacon_processor::{Work, WorkEvent};
use lighthouse_network::{
Client, GossipTopic, MessageAcceptance, MessageId, PeerAction, PeerId, PubsubMessage,
@@ -50,13 +47,13 @@ use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
use store::hot_cold_store::HotColdDBError;
use tracing::{Instrument, Span, debug, error, info, instrument, trace, warn};
use types::{
Attestation, AttestationData, AttestationRef, AttesterSlashing, BlobSidecar, ColumnIndex,
DataColumnSidecar, DataColumnSubnetId, EthSpec, Hash256, IndexedAttestation,
LightClientFinalityUpdate, LightClientOptimisticUpdate, PartialDataColumn,
PartialDataColumnHeader, PayloadAttestationMessage, ProposerSlashing, SignedAggregateAndProof,
SignedBeaconBlock, SignedBlsToExecutionChange, SignedContributionAndProof,
SignedExecutionPayloadBid, SignedExecutionPayloadEnvelope, SignedProposerPreferences,
SignedVoluntaryExit, SingleAttestation, Slot, SubnetId, SyncCommitteeMessage, SyncSubnetId,
Attestation, AttestationData, AttestationRef, AttesterSlashing, ColumnIndex, DataColumnSidecar,
DataColumnSubnetId, EthSpec, Hash256, IndexedAttestation, LightClientFinalityUpdate,
LightClientOptimisticUpdate, PartialDataColumn, PartialDataColumnHeader,
PayloadAttestationMessage, ProposerSlashing, SignedAggregateAndProof, SignedBeaconBlock,
SignedBlsToExecutionChange, SignedContributionAndProof, SignedExecutionPayloadBid,
SignedExecutionPayloadEnvelope, SignedProposerPreferences, SignedVoluntaryExit,
SingleAttestation, Slot, SubnetId, SyncCommitteeMessage, SyncSubnetId,
block::BlockImportSource,
};
@@ -174,6 +171,17 @@ impl<E: EthSpec> FailedAtt<E> {
}
}
/// `MessageAcceptance` doesn't implement clone so we do a manual match here.
/// TODO: remove this once `Clone` is available on this type:
/// https://github.com/libp2p/rust-libp2p/pull/6445
fn clone_message_acceptance(a: &MessageAcceptance) -> MessageAcceptance {
match a {
MessageAcceptance::Accept => MessageAcceptance::Accept,
MessageAcceptance::Reject => MessageAcceptance::Reject,
MessageAcceptance::Ignore => MessageAcceptance::Ignore,
}
}
impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
/* Auxiliary functions */
@@ -835,13 +843,109 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
}
}
#[instrument(
name = "lh_process_gossip_partial_data_column",
parent = None,
level = "debug",
skip_all,
fields(block_root = ?column.block_root, index = column.index),
)]
async fn process_gossip_verified_data_column(
self: &Arc<Self>,
peer_id: PeerId,
verified_data_column: GossipVerifiedDataColumn<T>,
// This value is not used presently, but it might come in handy for debugging.
_seen_duration: Duration,
) {
let processing_start_time = Instant::now();
let block_root = verified_data_column.block_root();
let data_column_slot = verified_data_column.slot();
let data_column_index = verified_data_column.index();
// TODO(gloas): implement partial messages
if let DataColumnSidecar::Fulu(col) = verified_data_column.as_data_column()
&& self
.chain
.data_availability_checker
.partial_assembler()
.is_some_and(|a| !a.is_complete(block_root, verified_data_column.index()))
{
metrics::inc_counter_vec(
&metrics::BEACON_USEFUL_FULL_COLUMNS_RECEIVED_TOTAL,
&[&data_column_index.to_string()],
);
match col.to_partial() {
Ok(mut column) => {
let header = column.sidecar.header.take();
if let Some(header) = header {
self.send_network_message(NetworkMessage::PublishPartialColumns {
columns: vec![Arc::new(column)],
header: Arc::new(header),
});
} else {
crit!("Converting from full to partial yielded headerless partial")
};
}
Err(err) => crit!(?err, "Could not convert from full to partial"),
}
}
let result = self
.chain
.process_gossip_data_columns(vec![verified_data_column], || Ok(()))
.await;
register_process_result_metrics(&result, metrics::BlockSource::Gossip, "data_column");
match result {
Ok(availability) => match availability {
AvailabilityProcessingStatus::Imported(block_root) => {
debug!(
%block_root,
"Gossipsub data column processed, imported fully available block"
);
self.chain.recompute_head_at_current_slot().await;
metrics::set_gauge(
&metrics::BEACON_BLOB_DELAY_FULL_VERIFICATION,
processing_start_time.elapsed().as_millis() as i64,
);
// If a block is in the da_checker, sync maybe awaiting for an event when block is finally
// imported. A block can become imported both after processing a block or data column. If
// importing a block results in `Imported`, notify. Do not notify of data column errors.
self.send_sync_message(SyncMessage::GossipBlockProcessResult {
block_root,
imported: true,
});
}
AvailabilityProcessingStatus::MissingComponents(slot, block_root) => {
trace!(
%slot,
%data_column_index,
%block_root,
"Processed data column, waiting for other components"
);
self.check_reconstruction_trigger(slot, &block_root).await;
}
},
Err(BlockError::DuplicateFullyImported(_)) => {
debug!(
?block_root,
data_column_index, "Ignoring gossip column already imported"
);
}
Err(err) => {
debug!(
outcome = ?err,
?block_root,
block_slot = %data_column_slot,
data_column_index,
"Invalid gossip data column"
);
self.gossip_penalize_peer(
peer_id,
PeerAction::MidToleranceError,
"bad_gossip_data_column_ssz",
);
}
}
}
pub async fn process_gossip_partial_data_column_sidecar(
self: &Arc<Self>,
peer_id: PeerId,
@@ -999,7 +1103,6 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
%index,
"Could not verify partial column for gossip. Rejecting the column sidecar"
);
// Prevent recurring behaviour by penalizing the peer slightly.
self.gossip_penalize_peer(
peer_id,
PeerAction::LowToleranceError,
@@ -1008,9 +1111,6 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
self.propagate_partial_validation_failure(peer_id, topic);
}
GossipDataColumnError::PriorKnown { .. } => {
// Data column is available via either the EL or reconstruction.
// Do not penalise the peer.
// Gossip filter should filter any duplicates received after this.
debug!(
%block_root,
%index,
@@ -1025,7 +1125,6 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
%index,
"Could not verify column sidecar for gossip. Ignoring the partial column sidecar"
);
// Prevent recurring behaviour by penalizing the peer slightly.
self.gossip_penalize_peer(
peer_id,
PeerAction::HighToleranceError,
@@ -1110,355 +1209,6 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
}
}
#[allow(clippy::too_many_arguments)]
#[instrument(
name = "lh_process_gossip_blob",
parent = None,
level = "debug",
skip_all,
fields(
slot = ?blob_sidecar.slot(),
block_root = ?blob_sidecar.block_root(),
index = blob_sidecar.index),
)]
pub async fn process_gossip_blob(
self: &Arc<Self>,
message_id: MessageId,
peer_id: PeerId,
_peer_client: Client,
blob_index: u64,
blob_sidecar: Arc<BlobSidecar<T::EthSpec>>,
seen_duration: Duration,
) {
let slot = blob_sidecar.slot();
let root = blob_sidecar.block_root();
let index = blob_sidecar.index;
let commitment = blob_sidecar.kzg_commitment;
let delay = get_slot_delay_ms(seen_duration, slot, &self.chain.slot_clock);
// Log metrics to track delay from other nodes on the network.
metrics::set_gauge(&metrics::BEACON_BLOB_DELAY_GOSSIP, delay.as_millis() as i64);
match self
.chain
.verify_blob_sidecar_for_gossip(blob_sidecar.clone(), blob_index)
{
Ok(gossip_verified_blob) => {
metrics::inc_counter(&metrics::BEACON_PROCESSOR_GOSSIP_BLOB_VERIFIED_TOTAL);
if delay >= self.chain.spec.get_unaggregated_attestation_due() {
metrics::inc_counter(&metrics::BEACON_BLOB_GOSSIP_ARRIVED_LATE_TOTAL);
debug!(
block_root = ?gossip_verified_blob.block_root(),
proposer_index = gossip_verified_blob.block_proposer_index(),
slot = %gossip_verified_blob.slot(),
delay = ?delay,
commitment = %gossip_verified_blob.kzg_commitment(),
"Gossip blob arrived late"
);
}
debug!(
%slot,
%root,
%index,
commitment = %gossip_verified_blob.kzg_commitment(),
"Successfully verified gossip blob"
);
self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Accept);
// Log metrics to keep track of propagation delay times.
if let Some(duration) = SystemTime::now()
.duration_since(UNIX_EPOCH)
.ok()
.and_then(|now| now.checked_sub(seen_duration))
{
metrics::set_gauge(
&metrics::BEACON_BLOB_DELAY_GOSSIP_VERIFICATION,
duration.as_millis() as i64,
);
}
self.process_gossip_verified_blob(peer_id, gossip_verified_blob, seen_duration)
.await
}
Err(err) => {
match err {
GossipBlobError::ParentUnknown { parent_root } => {
debug!(
action = "requesting parent",
block_root = %root,
parent_root = %parent_root,
%commitment,
"Unknown parent hash for blob"
);
self.send_sync_message(SyncMessage::UnknownParentSidecarHeader {
peer_id,
block_root: root,
parent_root,
slot,
});
}
GossipBlobError::PubkeyCacheTimeout | GossipBlobError::BeaconChainError(_) => {
crit!(
error = ?err,
"Internal error when verifying blob sidecar"
)
}
GossipBlobError::ProposalSignatureInvalid
| GossipBlobError::UnknownValidator(_)
| GossipBlobError::ProposerIndexMismatch { .. }
| GossipBlobError::BlobIsNotLaterThanParent { .. }
| GossipBlobError::InvalidSubnet { .. }
| GossipBlobError::InvalidInclusionProof
| GossipBlobError::KzgError(_)
| GossipBlobError::NotFinalizedDescendant { .. } => {
warn!(
error = ?err,
%slot,
%root,
%index,
%commitment,
"Could not verify blob sidecar for gossip. Rejecting the blob sidecar"
);
// Prevent recurring behaviour by penalizing the peer.
self.gossip_penalize_peer(
peer_id,
PeerAction::LowToleranceError,
"gossip_blob_low",
);
self.propagate_validation_result(
message_id,
peer_id,
MessageAcceptance::Reject,
);
}
GossipBlobError::RepeatBlob { .. } => {
// We may have received the blob from the EL. Do not penalise the peer.
// Gossip filter should filter any duplicates received after this.
debug!(
%slot,
%root,
%index,
"Received already available blob sidecar. Ignoring the blob sidecar"
)
}
GossipBlobError::FutureSlot { .. } => {
debug!(
error = ?err,
%slot,
%root,
%index,
%commitment,
"Could not verify blob sidecar for gossip. Ignoring the blob sidecar"
);
// Prevent recurring behaviour by penalizing the peer slightly.
self.gossip_penalize_peer(
peer_id,
PeerAction::HighToleranceError,
"gossip_blob_high",
);
self.propagate_validation_result(
message_id,
peer_id,
MessageAcceptance::Ignore,
);
}
GossipBlobError::PastFinalizedSlot { .. } => {
debug!(
error = ?err,
%slot,
%root,
%index,
%commitment,
"Could not verify blob sidecar for gossip. Ignoring the blob sidecar"
);
// Prevent recurring behaviour by penalizing the peer. A low-tolerance
// error is fine because there's no reason for peers to be propagating old
// blobs on gossip, even if their view of finality is lagging.
self.gossip_penalize_peer(
peer_id,
PeerAction::LowToleranceError,
"gossip_blob_low",
);
self.propagate_validation_result(
message_id,
peer_id,
MessageAcceptance::Ignore,
);
}
}
}
}
}
async fn process_gossip_verified_blob(
self: &Arc<Self>,
peer_id: PeerId,
verified_blob: GossipVerifiedBlob<T>,
_seen_duration: Duration,
) {
let processing_start_time = Instant::now();
let block_root = verified_blob.block_root();
let blob_slot = verified_blob.slot();
let blob_index = verified_blob.id().index;
let result = self.chain.process_gossip_blob(verified_blob).await;
register_process_result_metrics(&result, metrics::BlockSource::Gossip, "blob");
match &result {
Ok(AvailabilityProcessingStatus::Imported(block_root)) => {
debug!(
%block_root,
"Gossipsub blob processed - imported fully available block"
);
self.chain.recompute_head_at_current_slot().await;
metrics::set_gauge(
&metrics::BEACON_BLOB_DELAY_FULL_VERIFICATION,
processing_start_time.elapsed().as_millis() as i64,
);
}
Ok(AvailabilityProcessingStatus::MissingComponents(slot, block_root)) => {
debug!(
%slot,
%blob_index,
%block_root,
"Processed gossip blob - waiting for other components"
);
}
Err(BlockError::DuplicateFullyImported(_)) => {
debug!(
?block_root,
blob_index, "Ignoring gossip blob already imported"
);
}
Err(err) => {
debug!(
outcome = ?err,
?block_root,
%blob_slot,
blob_index,
"Invalid gossip blob"
);
self.gossip_penalize_peer(
peer_id,
PeerAction::MidToleranceError,
"bad_gossip_blob_ssz",
);
}
}
// If a block is in the da_checker, sync maybe awaiting for an event when block is finally
// imported. A block can become imported both after processing a block or blob. If a
// importing a block results in `Imported`, notify. Do not notify of blob errors.
if matches!(result, Ok(AvailabilityProcessingStatus::Imported(_))) {
self.send_sync_message(SyncMessage::GossipBlockProcessResult {
block_root,
imported: true,
});
}
}
/// Process a gossip-verified full data column (not partial).
/// Partials are handled by process_gossip_verified_partial_data_column.
async fn process_gossip_verified_data_column(
self: &Arc<Self>,
peer_id: PeerId,
verified_data_column: GossipVerifiedDataColumn<T>,
// This value is not used presently, but it might come in handy for debugging.
_seen_duration: Duration,
) {
let processing_start_time = Instant::now();
let block_root = verified_data_column.block_root();
let data_column_slot = verified_data_column.slot();
let data_column_index = verified_data_column.index();
// TODO(gloas): implement partial messages
if let DataColumnSidecar::Fulu(col) = verified_data_column.as_data_column()
&& self
.chain
.data_availability_checker
.partial_assembler()
.is_some_and(|a| !a.is_complete(block_root, verified_data_column.index()))
{
metrics::inc_counter_vec(
&metrics::BEACON_USEFUL_FULL_COLUMNS_RECEIVED_TOTAL,
&[&data_column_index.to_string()],
);
let mut column = col.to_partial();
let header = column.sidecar.header.take();
if let Some(header) = header {
self.send_network_message(NetworkMessage::PublishPartialColumns {
columns: vec![Arc::new(column)],
header: Arc::new(header),
});
} else {
crit!("Converting from full to partial yielded headerless partial")
};
}
let result = self
.chain
.process_gossip_data_columns(vec![verified_data_column], || Ok(()))
.await;
register_process_result_metrics(&result, metrics::BlockSource::Gossip, "data_column");
match result {
Ok(availability) => match availability {
AvailabilityProcessingStatus::Imported(block_root) => {
debug!(
%block_root,
"Gossipsub data column processed, imported fully available block"
);
self.chain.recompute_head_at_current_slot().await;
metrics::set_gauge(
&metrics::BEACON_BLOB_DELAY_FULL_VERIFICATION,
processing_start_time.elapsed().as_millis() as i64,
);
// If a block is in the da_checker, sync maybe awaiting for an event when block is finally
// imported. A block can become imported both after processing a block or data column. If
// importing a block results in `Imported`, notify. Do not notify of data column errors.
self.send_sync_message(SyncMessage::GossipBlockProcessResult {
block_root,
imported: true,
});
}
AvailabilityProcessingStatus::MissingComponents(slot, block_root) => {
trace!(
%slot,
%data_column_index,
%block_root,
"Processed data column, waiting for other components"
);
self.check_reconstruction_trigger(slot, &block_root).await;
}
},
Err(BlockError::DuplicateFullyImported(_)) => {
debug!(
?block_root,
data_column_index, "Ignoring gossip column already imported"
);
}
Err(err) => {
debug!(
outcome = ?err,
?block_root,
block_slot = %data_column_slot,
data_column_index,
"Invalid gossip data column"
);
self.gossip_penalize_peer(
peer_id,
PeerAction::MidToleranceError,
"bad_gossip_data_column_ssz",
);
}
}
}
/// Process a gossip-verified partial data column by merging it in the assembler
async fn process_gossip_verified_partial_data_column(
self: &Arc<Self>,
@@ -1874,9 +1624,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
crit!(error = %e, "Internal block gossip validation error. Availability check during gossip validation");
return None;
}
// BlobNotRequired is unreachable. Only constructed in `process_gossip_blob`
Err(e @ BlockError::InternalError(_))
| Err(e @ BlockError::BlobNotRequired(_))
| Err(e @ BlockError::EnvelopeBlockRootUnknown(_))
| Err(e @ BlockError::OptimisticSyncNotSupported { .. }) => {
error!(error = %e, "Internal block gossip validation error");
@@ -2194,14 +1942,14 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
message_id: MessageId,
peer_id: PeerId,
proposer_slashing: ProposerSlashing,
) {
) -> MessageAcceptance {
let validator_index = proposer_slashing.signed_header_1.message.proposer_index;
let slashing = match self
let (validation_result, verified_slashing_opt) = match self
.chain
.verify_proposer_slashing_for_gossip(proposer_slashing)
{
Ok(ObservationOutcome::New(slashing)) => slashing,
Ok(ObservationOutcome::New(slashing)) => (MessageAcceptance::Accept, Some(slashing)),
Ok(ObservationOutcome::AlreadyKnown) => {
debug!(
reason = "Already seen a proposer slashing for that validator",
@@ -2209,44 +1957,54 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
peer = %peer_id,
"Dropping proposer slashing"
);
self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Ignore);
return;
(MessageAcceptance::Ignore, None)
}
Err(e) => {
// This is likely a fault with the beacon chain and not necessarily a
// malicious message from the peer.
debug!(
validator_index,
%peer_id,
error = ?e,
"Dropping invalid proposer slashing"
"Dropping proposer slashing due to an error"
);
self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Ignore);
// Penalize peer slightly for invalids.
self.gossip_penalize_peer(
peer_id,
PeerAction::HighToleranceError,
"invalid_gossip_proposer_slashing",
);
return;
if matches!(e, BeaconChainError::ProposerSlashingValidationError(_)) {
// Penalize peer slightly for invalids.
self.gossip_penalize_peer(
peer_id,
PeerAction::HighToleranceError,
"invalid_gossip_proposer_slashing",
);
(MessageAcceptance::Reject, None)
} else {
// This is likely a fault with the beacon chain and not necessarily a
// malicious message from the peer.
(MessageAcceptance::Ignore, None)
}
}
};
metrics::inc_counter(&metrics::BEACON_PROCESSOR_PROPOSER_SLASHING_VERIFIED_TOTAL);
self.propagate_validation_result(
message_id,
peer_id,
clone_message_acceptance(&validation_result),
);
self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Accept);
if let Some(slashing) = verified_slashing_opt {
metrics::inc_counter(&metrics::BEACON_PROCESSOR_PROPOSER_SLASHING_VERIFIED_TOTAL);
// Register the slashing with any monitored validators.
self.chain
.validator_monitor
.read()
.register_gossip_proposer_slashing(slashing.as_inner());
// Register the slashing with any monitored validators.
self.chain
.validator_monitor
.read()
.register_gossip_proposer_slashing(slashing.as_inner());
self.chain.import_proposer_slashing(slashing);
debug!("Successfully imported proposer slashing");
self.chain.import_proposer_slashing(slashing);
debug!("Successfully imported proposer slashing");
metrics::inc_counter(&metrics::BEACON_PROCESSOR_PROPOSER_SLASHING_IMPORTED_TOTAL);
metrics::inc_counter(&metrics::BEACON_PROCESSOR_PROPOSER_SLASHING_IMPORTED_TOTAL);
}
validation_result
}
pub fn process_gossip_attester_slashing(
@@ -2254,51 +2012,64 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
message_id: MessageId,
peer_id: PeerId,
attester_slashing: AttesterSlashing<T::EthSpec>,
) {
let slashing = match self
) -> MessageAcceptance {
let (validation_result, verified_slashing_opt) = match self
.chain
.verify_attester_slashing_for_gossip(attester_slashing)
{
Ok(ObservationOutcome::New(slashing)) => slashing,
Ok(ObservationOutcome::New(slashing)) => (MessageAcceptance::Accept, Some(slashing)),
Ok(ObservationOutcome::AlreadyKnown) => {
debug!(
reason = "Slashings already known for all slashed validators",
peer = %peer_id,
"Dropping attester slashing"
);
self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Ignore);
return;
(MessageAcceptance::Ignore, None)
}
Err(e) => {
debug!(
%peer_id,
error = ?e,
"Dropping invalid attester slashing"
"Dropping attester slashing due to an error"
);
self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Ignore);
// Penalize peer slightly for invalids.
self.gossip_penalize_peer(
peer_id,
PeerAction::HighToleranceError,
"invalid_gossip_attester_slashing",
);
return;
if matches!(e, BeaconChainError::AttesterSlashingValidationError(_)) {
// Penalize peer slightly for invalids.
self.gossip_penalize_peer(
peer_id,
PeerAction::HighToleranceError,
"invalid_gossip_attester_slashing",
);
(MessageAcceptance::Reject, None)
} else {
// This is likely a fault with the beacon chain and not necessarily a
// malicious message from the peer.
(MessageAcceptance::Ignore, None)
}
}
};
metrics::inc_counter(&metrics::BEACON_PROCESSOR_ATTESTER_SLASHING_VERIFIED_TOTAL);
self.propagate_validation_result(
message_id,
peer_id,
clone_message_acceptance(&validation_result),
);
self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Accept);
if let Some(slashing) = verified_slashing_opt {
metrics::inc_counter(&metrics::BEACON_PROCESSOR_ATTESTER_SLASHING_VERIFIED_TOTAL);
// Register the slashing with any monitored validators.
self.chain
.validator_monitor
.read()
.register_gossip_attester_slashing(slashing.as_inner().to_ref());
// Register the slashing with any monitored validators.
self.chain
.validator_monitor
.read()
.register_gossip_attester_slashing(slashing.as_inner().to_ref());
self.chain.import_attester_slashing(slashing);
debug!("Successfully imported attester slashing");
metrics::inc_counter(&metrics::BEACON_PROCESSOR_ATTESTER_SLASHING_IMPORTED_TOTAL);
self.chain.import_attester_slashing(slashing);
debug!("Successfully imported attester slashing");
metrics::inc_counter(&metrics::BEACON_PROCESSOR_ATTESTER_SLASHING_IMPORTED_TOTAL);
}
validation_result
}
pub fn process_gossip_bls_to_execution_change(
@@ -4058,7 +3829,6 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
PayloadBidError::BadSignature
| PayloadBidError::InvalidBuilder { .. }
| PayloadBidError::InvalidFeeRecipient
| PayloadBidError::InvalidGasLimit
| PayloadBidError::ExecutionPaymentNonZero { .. }
| PayloadBidError::InvalidBlobKzgCommitments { .. },
) => {
@@ -4076,6 +3846,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
| PayloadBidError::ParentBlockRootUnknown { .. }
| PayloadBidError::ParentBlockRootNotCanonical { .. }
| PayloadBidError::BuilderCantCoverBid { .. }
| PayloadBidError::InvalidGasLimit
| PayloadBidError::BeaconStateError(_)
| PayloadBidError::InternalError(_)
| PayloadBidError::InvalidBidSlot { .. }
@@ -4258,8 +4029,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
"payload_attn_invalid_sig",
);
}
PayloadAttestationError::BeaconChainError(_)
| PayloadAttestationError::BeaconStateError(_) => {
PayloadAttestationError::BeaconChainError(_) => {
debug!(
%peer_id,
%message_slot,

View File

@@ -1,12 +1,12 @@
use crate::sync::manager::BlockProcessType;
use crate::{service::NetworkMessage, sync::manager::SyncMessage};
use beacon_chain::blob_verification::{GossipBlobError, observe_gossip_blob};
use beacon_chain::block_verification_types::LookupBlock;
use beacon_chain::block_verification_types::RangeSyncBlock;
use beacon_chain::data_column_verification::{GossipDataColumnError, observe_gossip_data_column};
use beacon_chain::fetch_blobs::{
EngineGetBlobsOutput, FetchEngineBlobError, fetch_and_process_engine_blobs,
use beacon_chain::data_column_verification::{
GossipDataColumnError, KzgVerifiedCustodyDataColumn, observe_gossip_data_column,
};
use beacon_chain::fetch_blobs::{FetchEngineBlobError, fetch_and_process_engine_blobs};
use beacon_chain::test_utils::{BeaconChainHarness, EphemeralHarnessType};
use beacon_chain::{AvailabilityProcessingStatus, BeaconChain, BeaconChainTypes, BlockError};
use beacon_processor::{
BeaconProcessorSend, DuplicateCache, GossipAggregatePackage, GossipAttestationPackage, Work,
@@ -20,7 +20,7 @@ use lighthouse_network::rpc::methods::{
};
use lighthouse_network::service::api_types::CustodyBackfillBatchId;
use lighthouse_network::{
Client, GossipTopic, MessageId, NetworkGlobals, PeerId, PubsubMessage,
Client, GossipTopic, MessageId, NetworkConfig, NetworkGlobals, PeerId, PubsubMessage,
rpc::{BlocksByRangeRequest, BlocksByRootRequest, LightClientBootstrapRequest, StatusMessage},
};
use rand::prelude::SliceRandom;
@@ -31,6 +31,10 @@ use task_executor::TaskExecutor;
use tokio::sync::mpsc::{self, error::TrySendError};
use tracing::{debug, error, instrument, trace, warn};
use types::*;
use {
beacon_chain::builder::Witness, beacon_processor::BeaconProcessorChannels,
slot_clock::ManualSlotClock, store::MemoryStore, tokio::sync::mpsc::UnboundedSender,
};
pub use sync_methods::ChainSegmentProcessId;
use types::data::FixedBlobSidecarList;
@@ -65,9 +69,6 @@ pub struct NetworkBeaconProcessor<T: BeaconChainTypes> {
pub executor: TaskExecutor,
}
// Publish blobs in batches of exponentially increasing size.
const BLOB_PUBLICATION_EXP_FACTOR: usize = 2;
impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
fn try_send(&self, event: BeaconWorkEvent<T::EthSpec>) -> Result<(), Error<T::EthSpec>> {
self.beacon_processor_send.try_send(event)
@@ -193,36 +194,6 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
})
}
/// Create a new `Work` event for some blob sidecar.
pub fn send_gossip_blob_sidecar(
self: &Arc<Self>,
message_id: MessageId,
peer_id: PeerId,
peer_client: Client,
blob_index: u64,
blob_sidecar: Arc<BlobSidecar<T::EthSpec>>,
seen_timestamp: Duration,
) -> Result<(), Error<T::EthSpec>> {
let processor = self.clone();
let process_fn = async move {
processor
.process_gossip_blob(
message_id,
peer_id,
peer_client,
blob_index,
blob_sidecar,
seen_timestamp,
)
.await
};
self.try_send(BeaconWorkEvent {
drop_during_sync: false,
work: Work::GossipBlobSidecar(Box::pin(process_fn)),
})
}
/// Create a new `Work` event for some data column sidecar.
pub fn send_gossip_data_column_sidecar(
self: &Arc<Self>,
@@ -353,7 +324,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
) -> Result<(), Error<T::EthSpec>> {
let processor = self.clone();
let process_fn = move || {
processor.process_gossip_proposer_slashing(message_id, peer_id, *proposer_slashing)
processor.process_gossip_proposer_slashing(message_id, peer_id, *proposer_slashing);
};
self.try_send(BeaconWorkEvent {
@@ -420,7 +391,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
) -> Result<(), Error<T::EthSpec>> {
let processor = self.clone();
let process_fn = move || {
processor.process_gossip_attester_slashing(message_id, peer_id, *attester_slashing)
processor.process_gossip_attester_slashing(message_id, peer_id, *attester_slashing);
};
self.try_send(BeaconWorkEvent {
@@ -965,22 +936,12 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
let epoch = header.slot().epoch(T::EthSpec::slots_per_epoch());
let custody_columns = self.chain.sampling_columns_for_epoch(epoch);
let self_cloned = self.clone();
let publish_fn = move |blobs_or_data_column| {
let publish_fn = move |columns: Vec<KzgVerifiedCustodyDataColumn<T::EthSpec>>| {
if publish_blobs {
match blobs_or_data_column {
EngineGetBlobsOutput::Blobs(blobs) => {
self_cloned.publish_blobs_gradually(
blobs.into_iter().map(|b| b.to_blob()).collect(),
block_root,
);
}
EngineGetBlobsOutput::CustodyColumns(columns) => {
self_cloned.publish_data_columns_gradually(
columns.into_iter().map(|c| c.clone_arc()).collect(),
block_root,
);
}
};
self_cloned.publish_data_columns_gradually(
columns.into_iter().map(|c| c.clone_arc()).collect(),
block_root,
);
}
};
@@ -1098,84 +1059,6 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
}
}
/// This function gradually publishes blobs to the network in randomised batches.
///
/// This is an optimisation to reduce outbound bandwidth and ensures each blob is published
/// by some nodes on the network as soon as possible. Our hope is that some blobs arrive from
/// other nodes in the meantime, obviating the need for us to publish them. If no other
/// publisher exists for a blob, it will eventually get published here.
fn publish_blobs_gradually(
self: &Arc<Self>,
mut blobs: Vec<Arc<BlobSidecar<T::EthSpec>>>,
block_root: Hash256,
) {
let self_clone = self.clone();
self.executor.spawn(
async move {
let chain = self_clone.chain.clone();
let publish_fn = |blobs: Vec<Arc<BlobSidecar<T::EthSpec>>>| {
self_clone.send_network_message(NetworkMessage::Publish {
messages: blobs
.into_iter()
.map(|blob| PubsubMessage::BlobSidecar(Box::new((blob.index, blob))))
.collect(),
});
};
// Permute the blobs and split them into batches.
// The hope is that we won't need to publish some blobs because we will receive them
// on gossip from other nodes.
blobs.shuffle(&mut rand::rng());
let blob_publication_batch_interval = chain.config.blob_publication_batch_interval;
let mut publish_count = 0usize;
let blob_count = blobs.len();
let mut blobs_iter = blobs.into_iter().peekable();
let mut batch_size = 1usize;
while blobs_iter.peek().is_some() {
let batch = blobs_iter.by_ref().take(batch_size);
let publishable = batch
.filter_map(|blob| match observe_gossip_blob(&blob, &chain) {
Ok(()) => Some(blob),
Err(GossipBlobError::RepeatBlob { .. }) => None,
Err(e) => {
warn!(
error = ?e,
"Previously verified blob is invalid"
);
None
}
})
.collect::<Vec<_>>();
if !publishable.is_empty() {
debug!(
publish_count = publishable.len(),
?block_root,
"Publishing blob batch"
);
publish_count += publishable.len();
publish_fn(publishable);
}
tokio::time::sleep(blob_publication_batch_interval).await;
batch_size *= BLOB_PUBLICATION_EXP_FACTOR;
}
debug!(
batch_interval = blob_publication_batch_interval.as_millis(),
blob_count,
publish_count,
?block_root,
"Batch blob publication complete"
)
},
"gradual_blob_publication",
);
}
/// This function gradually publishes data columns to the network in randomised batches.
///
/// This is an optimisation to reduce outbound bandwidth and ensures each column is published
@@ -1260,17 +1143,8 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
}
}
#[cfg(test)]
use {
beacon_chain::builder::Witness, beacon_processor::BeaconProcessorChannels,
slot_clock::ManualSlotClock, store::MemoryStore, tokio::sync::mpsc::UnboundedSender,
};
pub(crate) type TestBeaconChainType<E> = Witness<ManualSlotClock, E, MemoryStore, MemoryStore>;
#[cfg(test)]
pub(crate) type TestBeaconChainType<E> =
Witness<ManualSlotClock, E, MemoryStore<E>, MemoryStore<E>>;
#[cfg(test)]
impl<E: EthSpec> NetworkBeaconProcessor<TestBeaconChainType<E>> {
// Instantiates a mostly non-functional version of `Self` and returns the
// event receiver that would normally go to the beacon processor. This is
@@ -1302,4 +1176,22 @@ impl<E: EthSpec> NetworkBeaconProcessor<TestBeaconChainType<E>> {
(network_beacon_processor, beacon_processor_rx)
}
/// Constructs a mostly non-functional `NetworkBeaconProcessor` from a test harness,
/// suitable for directly calling gossip processing methods in tests.
pub fn null_from_harness(harness: &BeaconChainHarness<EphemeralHarnessType<E>>) -> Self {
let network_globals = NetworkGlobals::new_test_globals(
vec![],
Arc::new(NetworkConfig::default()),
harness.spec.clone(),
);
Self::null_for_testing(
Arc::new(network_globals),
mpsc::unbounded_channel().0,
harness.chain.clone(),
harness.runtime.task_executor.clone(),
)
.0
}
}

View File

@@ -409,22 +409,6 @@ impl TestRig {
.unwrap();
}
pub fn enqueue_gossip_blob(&self, blob_index: usize) {
if let Some(blobs) = self.next_blobs.as_ref() {
let blob = blobs.get(blob_index).unwrap();
self.network_beacon_processor
.send_gossip_blob_sidecar(
junk_message_id(),
junk_peer_id(),
Client::default(),
blob.index,
blob.clone(),
Duration::from_secs(0),
)
.unwrap();
}
}
pub fn enqueue_gossip_data_columns(&self, col_index: usize) {
if let Some(data_columns) = self.next_data_columns.as_ref() {
let data_column = data_columns.get(col_index).unwrap();
@@ -1101,13 +1085,6 @@ async fn import_gossip_block_acceptably_early() {
rig.assert_event_journal_completes(&[WorkType::GossipBlock])
.await;
let num_blobs = rig.next_blobs.as_ref().map(|b| b.len()).unwrap_or(0);
for i in 0..num_blobs {
rig.enqueue_gossip_blob(i);
rig.assert_event_journal_completes(&[WorkType::GossipBlobSidecar])
.await;
}
let num_data_columns = rig.next_data_columns.as_ref().map(|c| c.len()).unwrap_or(0);
for i in 0..num_data_columns {
rig.enqueue_gossip_data_columns(i);
@@ -1242,13 +1219,6 @@ async fn import_gossip_block_at_current_slot() {
rig.assert_event_journal_completes(&[WorkType::GossipBlock])
.await;
let num_blobs = rig.next_blobs.as_ref().map(|b| b.len()).unwrap_or(0);
for i in 0..num_blobs {
rig.enqueue_gossip_blob(i);
rig.assert_event_journal_completes(&[WorkType::GossipBlobSidecar])
.await;
}
let num_data_columns = rig.next_data_columns.as_ref().map(|c| c.len()).unwrap_or(0);
for i in 0..num_data_columns {
rig.enqueue_gossip_data_columns(i);
@@ -1315,10 +1285,6 @@ async fn attestation_to_unknown_block_processed(import_method: BlockImportMethod
BlockImportMethod::Gossip => {
rig.enqueue_gossip_block();
events.push(WorkType::GossipBlock);
for i in 0..num_blobs {
rig.enqueue_gossip_blob(i);
events.push(WorkType::GossipBlobSidecar);
}
for i in 0..num_data_columns {
rig.enqueue_gossip_data_columns(i);
events.push(WorkType::GossipDataColumnSidecar);
@@ -1401,10 +1367,6 @@ async fn aggregate_attestation_to_unknown_block(import_method: BlockImportMethod
BlockImportMethod::Gossip => {
rig.enqueue_gossip_block();
events.push(WorkType::GossipBlock);
for i in 0..num_blobs {
rig.enqueue_gossip_blob(i);
events.push(WorkType::GossipBlobSidecar);
}
for i in 0..num_data_columns {
rig.enqueue_gossip_data_columns(i);
events.push(WorkType::GossipDataColumnSidecar)

View File

@@ -6,7 +6,7 @@ use types::{EthSpec, Hash256};
/// 32-byte key for accessing the `DhtEnrs`. All zero because `DhtEnrs` has its own column.
pub const DHT_DB_KEY: Hash256 = Hash256::ZERO;
pub fn load_dht<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>>(
pub fn load_dht<E: EthSpec, Hot: ItemStore, Cold: ItemStore>(
store: Arc<HotColdDB<E, Hot, Cold>>,
) -> Vec<Enr> {
// Load DHT from store
@@ -20,7 +20,7 @@ pub fn load_dht<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>>(
}
/// Attempt to persist the ENR's in the DHT to `self.store`.
pub fn persist_dht<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>>(
pub fn persist_dht<E: EthSpec, Hot: ItemStore, Cold: ItemStore>(
store: Arc<HotColdDB<E, Hot, Cold>>,
enrs: Vec<Enr>,
) -> Result<(), store::Error> {
@@ -28,7 +28,7 @@ pub fn persist_dht<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>>(
}
/// Attempts to clear any DHT entries.
pub fn clear_dht<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>>(
pub fn clear_dht<E: EthSpec, Hot: ItemStore, Cold: ItemStore>(
store: Arc<HotColdDB<E, Hot, Cold>>,
) -> Result<(), store::Error> {
store.hot_db.delete::<PersistedDht>(&DHT_DB_KEY)
@@ -75,11 +75,8 @@ mod tests {
use types::{ChainSpec, MinimalEthSpec};
#[test]
fn test_persisted_dht() {
let store: HotColdDB<
MinimalEthSpec,
MemoryStore<MinimalEthSpec>,
MemoryStore<MinimalEthSpec>,
> = HotColdDB::open_ephemeral(StoreConfig::default(), ChainSpec::minimal().into()).unwrap();
let store: HotColdDB<MinimalEthSpec, MemoryStore, MemoryStore> =
HotColdDB::open_ephemeral(StoreConfig::default(), ChainSpec::minimal().into()).unwrap();
let enrs = vec![Enr::from_str("enr:-IS4QHCYrYZbAKWCBRlAy5zzaDZXJBGkcnh4MHcBFZntXNFrdvJjX04jRzjzCBOonrkTfj499SZuOh8R33Ls8RRcy5wBgmlkgnY0gmlwhH8AAAGJc2VjcDI1NmsxoQPKY0yuDUmstAHYpMa2_oxVtw0RW_QAdpzBQA8yWM0xOIN1ZHCCdl8").unwrap()];
store
.put_item(&DHT_DB_KEY, &PersistedDht { enrs: enrs.clone() })

View File

@@ -412,19 +412,6 @@ impl<T: BeaconChainTypes> Router<T> {
seen_timestamp,
),
),
PubsubMessage::BlobSidecar(data) => {
let (blob_index, blob_sidecar) = *data;
self.handle_beacon_processor_send_result(
self.network_beacon_processor.send_gossip_blob_sidecar(
message_id,
peer_id,
self.network_globals.client(&peer_id),
blob_index,
blob_sidecar,
seen_timestamp,
),
)
}
PubsubMessage::DataColumnSidecar(data) => {
let (subnet_id, column_sidecar) = *data;
self.handle_beacon_processor_send_result(
@@ -833,24 +820,13 @@ impl<T: BeaconChainTypes> Router<T> {
envelope: Option<Arc<SignedExecutionPayloadEnvelope<T::EthSpec>>>,
) {
let sync_request_id = match app_request_id {
AppRequestId::Sync(sync_id) => match sync_id {
id @ SyncRequestId::SinglePayloadEnvelope { .. } => id,
other => {
crit!(request = ?other, "PayloadEnvelopesByRoot response on incorrect request");
return;
}
},
AppRequestId::Router => {
crit!(%peer_id, "All PayloadEnvelopesByRoot requests belong to sync");
AppRequestId::Sync(id @ SyncRequestId::SinglePayloadEnvelope { .. }) => id,
other => {
crit!(request = ?other, %peer_id, "PayloadEnvelopesByRoot response on incorrect request");
return;
}
AppRequestId::Internal => unreachable!("Handled internally"),
};
trace!(
%peer_id,
"Received PayloadEnvelopesByRoot Response"
);
self.send_to_sync(SyncMessage::RpcPayloadEnvelope {
sync_request_id,
peer_id,

View File

@@ -25,12 +25,7 @@ const SLOT_DURATION_MILLIS: u64 = 400;
const TEST_LOG_LEVEL: Option<&str> = None;
type TestBeaconChainType = Witness<
SystemTimeSlotClock,
MainnetEthSpec,
MemoryStore<MainnetEthSpec>,
MemoryStore<MainnetEthSpec>,
>;
type TestBeaconChainType = Witness<SystemTimeSlotClock, MainnetEthSpec, MemoryStore, MemoryStore>;
pub struct TestBeaconChain {
chain: Arc<BeaconChain<TestBeaconChainType>>,

View File

@@ -203,9 +203,9 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
block_root,
Some(block_component),
Some(awaiting_parent),
// On a `UnknownParentBlock` or `UnknownParentBlob` event the peer is not required
// to have the rest of the block components (refer to decoupled blob gossip). Create
// the lookup with zero peers to house the block components.
// On a `UnknownParentBlock` or `UnknownParentSidecarHeader` event the peer is not
// required to have the rest of the block components (refer to decoupled blob
// gossip). Create the lookup with zero peers to house the block components.
&[],
&PeerType::PreGloas,
new_lookup_trigger,

View File

@@ -593,7 +593,7 @@ impl<T: BeaconChainTypes> CustodyBackFillSync<T> {
Err(err) => {
debug!(batch_epoch = %batch_id, error = ?err, "Batch download failed");
// If there are any coupling errors, penalize the appropriate peers
// If there are any coupling errors, penalize the appropriate peers.
if let RpcResponseError::BlockComponentCouplingError(coupling_error) = err
&& let CouplingError::DataColumnPeerFailure {
error,
@@ -601,15 +601,19 @@ impl<T: BeaconChainTypes> CustodyBackFillSync<T> {
exceeded_retries: _,
} = coupling_error
{
let mut failed_peers = HashSet::new();
for (column_index, faulty_peer) in faulty_peers {
debug!(
?error,
?column_index,
?faulty_peer,
"Custody backfill sync penalizing peer"
"Custody backfill sync: peer failed to serve column"
);
failed_peers.insert(faulty_peer);
}
for peer in failed_peers {
network.report_peer(
faulty_peer,
peer,
PeerAction::LowToleranceError,
"Peer failed to serve column",
);

View File

@@ -198,8 +198,9 @@ pub enum BlockProcessType {
impl BlockProcessType {
pub fn id(&self) -> Id {
match self {
BlockProcessType::SingleBlock { id } | BlockProcessType::SingleBlob { id } => *id,
BlockProcessType::SingleCustodyColumn(id)
BlockProcessType::SingleBlock { id }
| BlockProcessType::SingleBlob { id }
| BlockProcessType::SingleCustodyColumn(id)
| BlockProcessType::SinglePayloadEnvelope(id) => *id,
}
}

View File

@@ -305,7 +305,12 @@ impl<T: BeaconChainTypes> ActiveCustodyRequest<T> {
// must have its columns in custody. In that case, set `true = enforce max_requests`
// and downscore if data_columns_by_root does not return the expected custody
// columns. For the rest of peers, don't downscore if columns are missing.
lookup_peers.contains(&peer_id),
//
// Post-Gloas, blocks and payload envelopes are decoupled. A peer may
// have the block but not yet imported the envelope and data columns.
// Don't enforce max_responses in this case.
lookup_peers.contains(&peer_id)
&& !cx.fork_context.current_fork_name().gloas_enabled(),
)
.map_err(Error::SendFailed)?;

View File

@@ -33,6 +33,7 @@ impl<E: EthSpec> ActiveRequestItems for BlobsByRangeRequestItems<E> {
if blob.index >= self.max_blobs_per_block {
return Err(LookupVerifyError::UnrequestedIndex(blob.index));
}
if !blob.verify_blob_sidecar_inclusion_proof() {
return Err(LookupVerifyError::InvalidInclusionProof);
}

View File

@@ -50,9 +50,11 @@ impl<E: EthSpec> ActiveRequestItems for BlobsByRootRequestItems<E> {
if self.request.block_root != block_root {
return Err(LookupVerifyError::UnrequestedBlockRoot(block_root));
}
if !blob.verify_blob_sidecar_inclusion_proof() {
return Err(LookupVerifyError::InvalidInclusionProof);
}
if !self.request.indices.contains(&blob.index) {
return Err(LookupVerifyError::UnrequestedIndex(blob.index));
}

View File

@@ -1278,17 +1278,6 @@ impl TestRig {
self.trigger_unknown_parent_block(peer_id, last_block);
}
fn trigger_with_last_unknown_blob_parent(&mut self) {
let peer_id = self.new_connected_supernode_peer();
let blobs = self
.get_last_block()
.block_data()
.blobs()
.expect("no blobs");
let blob = blobs.first().expect("empty blobs");
self.trigger_unknown_parent_blob(peer_id, blob.clone());
}
fn trigger_with_last_unknown_data_column_parent(&mut self) {
let peer_id = self.new_connected_supernode_peer();
let columns = self
@@ -1297,7 +1286,7 @@ impl TestRig {
.data_columns()
.expect("No data columns");
let column = columns.first().expect("empty columns");
self.trigger_unknown_parent_column(peer_id, column.clone());
self.trigger_unknown_parent_data_column(peer_id, column.clone());
}
// Post-test assertions
@@ -1501,6 +1490,10 @@ impl TestRig {
genesis_fork().deneb_enabled().then(Self::default)
}
fn new_after_fulu() -> Option<Self> {
genesis_fork().fulu_enabled().then(Self::default)
}
fn new_after_deneb_before_fulu() -> Option<Self> {
let fork = genesis_fork();
if fork.deneb_enabled() && !fork.fulu_enabled() {
@@ -1536,27 +1529,18 @@ impl TestRig {
self.send_sync_message(SyncMessage::UnknownParentBlock(peer_id, block, block_root))
}
fn trigger_unknown_parent_blob(&mut self, peer_id: PeerId, blob: Arc<BlobSidecar<E>>) {
self.send_sync_message(SyncMessage::UnknownParentSidecarHeader {
peer_id,
block_root: blob.block_root(),
parent_root: blob.block_parent_root(),
slot: blob.slot(),
});
}
fn trigger_unknown_parent_column(
fn trigger_unknown_parent_data_column(
&mut self,
peer_id: PeerId,
column: Arc<DataColumnSidecar<E>>,
data_column: Arc<DataColumnSidecar<E>>,
) {
let DataColumnSidecar::Fulu(col) = column.as_ref() else {
let DataColumnSidecar::Fulu(col) = data_column.as_ref() else {
// Gloas data columns don't carry a parent block root, so the
// `UnknownParentSidecarHeader` trigger doesn't apply post-Gloas. The production
// path drops these with a `warn!` (see `manager.rs` handler). Mirror that here
// so Gloas test paths can call the same helper as Fulu without panicking.
self.log(&format!(
"trigger_unknown_parent_column noop (post-Gloas column has no parent root) peer {peer_id:?}"
"trigger_unknown_parent_data_column noop (post-Gloas column has no parent root) peer {peer_id:?}"
));
return;
};
@@ -1850,9 +1834,9 @@ impl TestRig {
)
.unwrap()
{
Availability::Available(_) => panic!("blob removed from da_checker, available"),
Availability::Available(_) => panic!("column removed from da_checker, available"),
Availability::MissingComponents(block_root) => {
self.log(&format!("inserted blob to da_checker {block_root:?}"))
self.log(&format!("inserted column to da_checker {block_root:?}"))
}
};
}
@@ -2037,9 +2021,9 @@ async fn happy_path_unknown_block_parent(depth: usize) {
}
}
/// Assert that sync completes from a GossipUnknownParentBlob / UnknownDataColumnParent
/// Assert that sync completes from an UnknownDataColumnParent
async fn happy_path_unknown_data_parent(depth: usize) {
let Some(mut r) = TestRig::new_after_deneb() else {
let Some(mut r) = TestRig::new_after_fulu() else {
return;
};
// Post-Gloas data columns don't carry a parent block root, so the unknown-parent-data
@@ -2048,29 +2032,23 @@ async fn happy_path_unknown_data_parent(depth: usize) {
return;
}
r.build_chain(depth).await;
if r.is_after_fulu() {
r.trigger_with_last_unknown_data_column_parent();
} else if r.is_after_deneb() {
r.trigger_with_last_unknown_blob_parent();
}
r.trigger_with_last_unknown_data_column_parent();
r.simulate(SimulateConfig::happy_path()).await;
r.assert_successful_lookup_sync_parent_trigger();
}
/// Assert that multiple trigger types don't create extra lookups
async fn happy_path_multiple_triggers(depth: usize) {
let mut r = TestRig::default();
let Some(mut r) = TestRig::new_after_fulu() else {
return;
};
// + 1, because the unknown parent trigger needs two new blocks
r.build_chain(depth + 1).await;
r.trigger_with_last_block();
r.trigger_with_last_block();
r.trigger_with_last_unknown_block_parent();
r.trigger_with_last_unknown_block_parent();
if r.is_after_fulu() {
r.trigger_with_last_unknown_data_column_parent();
} else if r.is_after_deneb() {
r.trigger_with_last_unknown_blob_parent();
}
r.trigger_with_last_unknown_data_column_parent();
r.simulate(SimulateConfig::happy_path()).await;
assert_eq!(r.created_lookups(), depth + 1, "Don't create extra lookups");
r.assert_successful_lookup_sync();
@@ -2209,18 +2187,14 @@ async fn too_many_processing_failures(depth: usize) {
#[tokio::test]
/// Assert that multiple trigger types don't create extra lookups
async fn unknown_parent_does_not_add_peers_to_itself() {
let Some(mut r) = TestRig::new_after_deneb() else {
let Some(mut r) = TestRig::new_after_fulu() else {
return;
};
// 2, because the unknown parent trigger needs two new blocks
r.build_chain(2).await;
r.trigger_with_last_unknown_block_parent();
r.trigger_with_last_unknown_block_parent();
if r.is_after_fulu() {
r.trigger_with_last_unknown_data_column_parent();
} else if r.is_after_deneb() {
r.trigger_with_last_unknown_blob_parent();
}
r.trigger_with_last_unknown_data_column_parent();
r.simulate(SimulateConfig::happy_path()).await;
r.assert_peers_at_lookup_of_slot(2, 0);
// Post-Gloas the data-column trigger is a no-op (Gloas columns don't carry a parent

View File

@@ -26,7 +26,7 @@ use types::{ForkName, Hash256, MinimalEthSpec as E, SignedExecutionPayloadEnvelo
mod lookups;
mod range;
type T = Witness<ManualSlotClock, E, MemoryStore<E>, MemoryStore<E>>;
type T = Witness<ManualSlotClock, E, MemoryStore, MemoryStore>;
/// This test utility enables integration testing of Lighthouse sync components.
///