Merge branch 'unstable' into gloas-fix-proposer-pref-gossip-verification

This commit is contained in:
Eitan Seri-Levi
2026-06-22 05:39:31 -07:00
committed by GitHub
130 changed files with 5190 additions and 1558 deletions

View File

@@ -22,6 +22,13 @@ pub(crate) enum BlockSource {
Rpc,
}
/// The path through which a payload envelope was imported.
#[derive(Debug, Clone, Copy, AsRefStr)]
pub(crate) enum EnvelopeSource {
Gossip,
Rpc,
}
pub static BEACON_BLOCK_MESH_PEERS_PER_CLIENT: LazyLock<Result<IntGaugeVec>> =
LazyLock::new(|| {
try_create_int_gauge_vec(

View File

@@ -1,5 +1,5 @@
use crate::{
metrics::{self, register_process_result_metrics},
metrics::{self, EnvelopeSource, register_process_result_metrics},
network_beacon_processor::{InvalidBlockStorage, NetworkBeaconProcessor},
service::NetworkMessage,
sync::SyncMessage,
@@ -33,7 +33,7 @@ use beacon_chain::{
use beacon_processor::{Work, WorkEvent};
use lighthouse_network::{
Client, GossipTopic, MessageAcceptance, MessageId, PeerAction, PeerId, PubsubMessage,
ReportSource,
PubsubPartialMessage, ReportSource,
};
use logging::crit;
use operation_pool::ReceivedPreCapella;
@@ -70,6 +70,45 @@ use beacon_processor::{
/// messages.
const STRICT_LATE_MESSAGE_PENALTIES: bool = false;
/// Tracks which kinds of attestation re-processing are still permitted for a gossip attestation
/// or aggregate.
///
/// A new attestation may be re-queued for an unknown block, then (post-Gloas) for an unknown
/// payload envelope, and finally not at all. Each re-queue narrows the allowance to the next
/// variant.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ReprocessAllowance {
/// Re-queue for either an unknown block or an unknown payload envelope.
BlockAndPayload,
/// Re-queue only for an unknown payload envelope (already re-queued once for the block).
PayloadOnly,
/// Do not re-queue again.
None,
}
impl ReprocessAllowance {
/// Whether the attestation may be re-queued for an unknown block.
fn allows_block(self) -> bool {
matches!(self, ReprocessAllowance::BlockAndPayload)
}
/// Whether the attestation may be re-queued for an unknown payload envelope.
fn allows_payload(self) -> bool {
matches!(
self,
ReprocessAllowance::BlockAndPayload | ReprocessAllowance::PayloadOnly
)
}
/// Re-queuing always narrows the allowance so a message can't loop indefinitely.
fn next_requeue(self) -> Self {
match self {
ReprocessAllowance::BlockAndPayload => ReprocessAllowance::PayloadOnly,
ReprocessAllowance::PayloadOnly | ReprocessAllowance::None => ReprocessAllowance::None,
}
}
}
/// An attestation that has been validated by the `BeaconChain`.
///
/// Since this struct implements `beacon_chain::VerifiedAttestation`, it would be a logic error to
@@ -233,7 +272,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
attestation: Box<SingleAttestation>,
subnet_id: SubnetId,
should_import: bool,
allow_reprocess: bool,
reprocess_allowance: ReprocessAllowance,
seen_timestamp: Duration,
) {
let result = match self
@@ -256,7 +295,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
message_id,
peer_id,
subnet_id,
allow_reprocess,
reprocess_allowance,
should_import,
seen_timestamp,
);
@@ -265,7 +304,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
pub fn process_gossip_attestation_batch(
self: Arc<Self>,
packages: GossipAttestationBatch,
allow_reprocess: bool,
reprocess_allowance: ReprocessAllowance,
) {
let attestations_and_subnets = packages
.iter()
@@ -326,7 +365,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
package.message_id,
package.peer_id,
package.subnet_id,
allow_reprocess,
reprocess_allowance,
package.should_import,
package.seen_timestamp,
);
@@ -342,7 +381,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
message_id: MessageId,
peer_id: PeerId,
subnet_id: SubnetId,
allow_reprocess: bool,
reprocess_allowance: ReprocessAllowance,
should_import: bool,
seen_timestamp: Duration,
) {
@@ -426,7 +465,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
should_import,
seen_timestamp,
},
allow_reprocess,
reprocess_allowance,
error,
seen_timestamp,
);
@@ -446,7 +485,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
message_id: MessageId,
peer_id: PeerId,
aggregate: Box<SignedAggregateAndProof<T::EthSpec>>,
allow_reprocess: bool,
reprocess_allowance: ReprocessAllowance,
seen_timestamp: Duration,
) {
let beacon_block_root = aggregate.message().aggregate().data().beacon_block_root;
@@ -470,7 +509,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
beacon_block_root,
message_id,
peer_id,
allow_reprocess,
reprocess_allowance,
seen_timestamp,
);
}
@@ -478,7 +517,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
pub fn process_gossip_aggregate_batch(
self: Arc<Self>,
packages: Vec<GossipAggregatePackage<T::EthSpec>>,
allow_reprocess: bool,
reprocess_allowance: ReprocessAllowance,
) {
let aggregates = packages.iter().map(|package| package.aggregate.as_ref());
@@ -532,7 +571,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
package.beacon_block_root,
package.message_id,
package.peer_id,
allow_reprocess,
reprocess_allowance,
package.seen_timestamp,
);
}
@@ -544,7 +583,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
beacon_block_root: Hash256,
message_id: MessageId,
peer_id: PeerId,
allow_reprocess: bool,
reprocess_allowance: ReprocessAllowance,
seen_timestamp: Duration,
) {
match result {
@@ -624,7 +663,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
attestation: signed_aggregate,
seen_timestamp,
},
allow_reprocess,
reprocess_allowance,
error,
seen_timestamp,
);
@@ -898,9 +937,15 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
Ok(mut column) => {
let header = column.sidecar.header.take();
if let Some(header) = header {
// Requesting cells is irrelevant as all cells are available, simply clone
// the `cells_present_bitmap`.
let request_cells = column.sidecar.cells_present_bitmap.clone();
self.send_network_message(NetworkMessage::PublishPartialColumns {
columns: vec![Arc::new(column)],
header: Arc::new(header),
messages: vec![PubsubPartialMessage::DataColumnFulu {
column: Arc::new(column),
request_cells,
header: Arc::new(header),
}],
});
} else {
crit!("Converting from full to partial yielded headerless partial")
@@ -918,12 +963,13 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
match result {
Ok(availability) => match availability {
AvailabilityProcessingStatus::Imported(block_root) => {
AvailabilityProcessingStatus::Imported(slot, block_root) => {
debug!(
%block_root,
"Gossipsub data column processed, imported fully available block"
);
self.chain.recompute_head_at_current_slot().await;
self.notify_import_after_column(slot, block_root);
metrics::set_gauge(
&metrics::BEACON_BLOB_DELAY_FULL_VERIFICATION,
@@ -1037,8 +1083,10 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
debug!(block = %block_root, "Triggering getBlobs after receiving partial header");
// We want to publish immediately when this finishes
let publish_blobs = true;
self.fetch_engine_blobs_and_publish(header.into_header(), block_root, publish_blobs)
.await
let header = header.into_header();
self.fetch_engine_blobs_and_publish_full(header.clone(), block_root, publish_blobs)
.await;
self.publish_partial_data_columns(header, block_root).await;
}
}
}
@@ -1161,7 +1209,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
metrics::inc_counter(
&metrics::BEACON_PROCESSOR_GOSSIP_PARTIAL_DATA_COLUMN_SIDECAR_MISSING_HEADER_TOTAL,
);
warn!(
debug!(
error = ?err,
%block_root,
%index,
@@ -1271,28 +1319,31 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
});
}
let only_send_completed_partials =
merge_result.local_blobs || self.chain.config.disable_get_blobs;
let columns = merge_result
.updated_partials
.into_iter()
.map(|partial| partial.into_inner())
.filter(|partial| {
!only_send_completed_partials || partial.sidecar.is_complete()
})
.collect::<Vec<_>>();
if !columns.is_empty() {
if only_send_completed_partials {
debug!(
block = %block_root,
"Not publishing incomplete partials before getBlobs"
);
}
self.send_network_message(NetworkMessage::PublishPartialColumns {
columns,
header: verified_header.into_header(),
});
if !merge_result.updated_partials.is_empty() {
let header = verified_header.into_header();
let messages = merge_result
.updated_partials
.into_iter()
.map(|partial| {
let column = partial.into_inner();
let present_cells = &column.sidecar.cells_present_bitmap;
let request_cells = if merge_result.local_blobs {
// Request all cells that are not available locally.
let mut all_one = present_cells.clone_zeroed();
all_one.not_inplace();
all_one
} else {
// Do not request cells if we don't know the local blobs yet.
present_cells.clone_zeroed()
};
PubsubPartialMessage::DataColumnFulu {
column,
request_cells,
header: header.clone(),
}
})
.collect();
self.send_network_message(NetworkMessage::PublishPartialColumns { messages });
}
Ok(avail)
}
@@ -1311,12 +1362,13 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
match &result {
Ok(availability) => match availability {
AvailabilityProcessingStatus::Imported(block_root) => {
AvailabilityProcessingStatus::Imported(slot, block_root) => {
debug!(
%block_root,
"Data column from partial processed, imported fully available block"
);
self.chain.recompute_head_at_current_slot().await;
self.notify_import_after_column(*slot, *block_root);
metrics::set_gauge(
&metrics::BEACON_BLOB_DELAY_FULL_VERIFICATION,
@@ -1762,8 +1814,16 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
self.executor.spawn(
async move {
if let Ok(header) = PartialDataColumnHeader::try_from(block_clone.as_ref()) {
let header = Arc::new(header);
self_clone
.fetch_engine_blobs_and_publish(Arc::new(header), block_root, publish_blobs)
.fetch_engine_blobs_and_publish_full(
header.clone(),
block_root,
publish_blobs,
)
.await;
self_clone
.publish_partial_data_columns(header, block_root)
.await
}
}
@@ -1784,24 +1844,8 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
register_process_result_metrics(&result, metrics::BlockSource::Gossip, "block");
match &result {
Ok(AvailabilityProcessingStatus::Imported(block_root)) => {
if self
.beacon_processor_send
.try_send(WorkEvent {
drop_during_sync: false,
work: Work::Reprocess(ReprocessQueueMessage::BlockImported {
block_root: *block_root,
parent_root: block.message().parent_root(),
}),
})
.is_err()
{
error!(
source = "gossip",
?block_root,
"Failed to inform block import"
)
};
Ok(AvailabilityProcessingStatus::Imported(_, block_root)) => {
self.notify_block_imported(*block_root);
debug!(
?block_root,
@@ -2458,7 +2502,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
peer_id: PeerId,
message_id: MessageId,
failed_att: FailedAtt<T::EthSpec>,
allow_reprocess: bool,
reprocess_allowance: ReprocessAllowance,
error: AttnError,
seen_timestamp: Duration,
) {
@@ -2717,7 +2761,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
block = ?beacon_block_root,
"Attestation for unknown block"
);
if allow_reprocess {
if reprocess_allowance.allows_block() {
// We don't know the block, get the sync manager to handle the block lookup, and
// send the attestation to be scheduled for re-processing.
self.send_sync_message(SyncMessage::UnknownBlockHashFromAttestation(
@@ -2740,7 +2784,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
message_id,
peer_id,
attestation,
false, // Do not allow this attestation to be re-processed beyond this point.
reprocess_allowance.next_requeue(),
seen_timestamp,
)
}),
@@ -2765,7 +2809,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
attestation,
subnet_id,
should_import,
false, // Do not allow this attestation to be re-processed beyond this point.
reprocess_allowance.next_requeue(),
seen_timestamp,
)
}),
@@ -2797,6 +2841,89 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
return;
}
AttnError::UnknownPayloadEnvelope { beacon_block_root } => {
trace!(
%peer_id,
block = ?beacon_block_root,
"Payload-present attestation for block with unseen payload envelope"
);
if reprocess_allowance.allows_payload() {
// We haven't seen the block's payload envelope yet. Ask the sync manager to
// retrieve it, and schedule the attestation for re-processing once it arrives.
self.send_sync_message(SyncMessage::UnknownPayloadEnvelopeFromAttestation(
peer_id,
*beacon_block_root,
));
let msg = match failed_att {
FailedAtt::Aggregate {
attestation,
seen_timestamp,
} => {
metrics::inc_counter(
&metrics::BEACON_PROCESSOR_AGGREGATED_ATTESTATION_REQUEUED_TOTAL,
);
let processor = self.clone();
ReprocessQueueMessage::UnknownPayloadAggregate(QueuedAggregate {
beacon_block_root: *beacon_block_root,
process_fn: Box::new(move || {
processor.process_gossip_aggregate(
message_id,
peer_id,
attestation,
reprocess_allowance.next_requeue(),
seen_timestamp,
)
}),
})
}
FailedAtt::Unaggregate {
attestation,
subnet_id,
should_import,
seen_timestamp,
} => {
metrics::inc_counter(
&metrics::BEACON_PROCESSOR_UNAGGREGATED_ATTESTATION_REQUEUED_TOTAL,
);
let processor = self.clone();
ReprocessQueueMessage::UnknownPayloadUnaggregate(QueuedUnaggregate {
beacon_block_root: *beacon_block_root,
process_fn: Box::new(move || {
processor.process_gossip_attestation(
message_id,
peer_id,
attestation,
subnet_id,
should_import,
reprocess_allowance.next_requeue(),
seen_timestamp,
)
}),
})
}
};
if self
.beacon_processor_send
.try_send(WorkEvent {
drop_during_sync: false,
work: Work::Reprocess(msg),
})
.is_err()
{
error!("Failed to send attestation for re-processing")
}
} else {
// We shouldn't make any further attempts to process this attestation.
self.propagate_validation_result(
message_id,
peer_id,
MessageAcceptance::Ignore,
);
}
return;
}
AttnError::UnknownTargetRoot(_) => {
/*
* The block indicated by the target root is not known to us.
@@ -3795,16 +3922,83 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
// TODO(gloas) metrics
// register_process_result_metrics(&result, metrics::BlockSource::Gossip, "envelope");
if let Err(e) = &result {
debug!(
?beacon_block_root,
%peer_id,
error = ?e,
"Execution payload envelope processing failed"
);
match &result {
Ok(AvailabilityProcessingStatus::Imported(_, block_root)) => {
self.chain.recompute_head_at_current_slot().await;
// The payload envelope is imported (`is_payload_received` is now true); release any
// attestations awaiting this block's payload so they can be re-processed.
self.notify_payload_envelope_imported(*block_root, EnvelopeSource::Gossip);
}
Ok(_) => {}
Err(e) => {
debug!(
?beacon_block_root,
%peer_id,
error = ?e,
"Execution payload envelope processing failed"
);
}
}
}
/// Inform the reprocess queue that a fully available block (or its payload envelope, post-gloas)
/// has been imported, so any attestations waiting on it can be released.
fn notify_import_after_column(&self, slot: Slot, block_root: Hash256) {
if self
.chain
.spec
.fork_name_at_slot::<T::EthSpec>(slot)
.gloas_enabled()
{
self.notify_payload_envelope_imported(block_root, EnvelopeSource::Gossip);
} else {
self.notify_block_imported(block_root);
}
}
/// Inform the reprocess queue that `block_root` has been imported as a full block.
fn notify_block_imported(&self, block_root: Hash256) {
if self
.beacon_processor_send
.try_send(WorkEvent {
drop_during_sync: false,
work: Work::Reprocess(ReprocessQueueMessage::BlockImported { block_root }),
})
.is_err()
{
error!(
source = "gossip",
?block_root,
"Failed to inform block import"
)
};
}
/// Inform the reprocess queue that `block_root`'s payload envelope has been imported, releasing
/// any attestations awaiting the payload. `source` identifies the import path for logging.
pub(crate) fn notify_payload_envelope_imported(
&self,
block_root: Hash256,
source: EnvelopeSource,
) {
if self
.beacon_processor_send
.try_send(WorkEvent {
drop_during_sync: false,
work: Work::Reprocess(ReprocessQueueMessage::PayloadEnvelopeImported {
block_root,
}),
})
.is_err()
{
error!(
source = source.as_ref(),
?block_root,
"Failed to inform payload envelope import"
)
};
}
#[instrument(
name = "lh_process_execution_payload_bid",
parent = None,
@@ -3829,7 +4023,8 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
| PayloadBidError::InvalidBuilder { .. }
| PayloadBidError::InvalidFeeRecipient
| PayloadBidError::ExecutionPaymentNonZero { .. }
| PayloadBidError::InvalidBlobKzgCommitments { .. },
| PayloadBidError::InvalidBlobKzgCommitments { .. }
| PayloadBidError::BidNotDescendantOfParent { .. },
) => {
self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Reject);
self.gossip_penalize_peer(
@@ -4009,6 +4204,14 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
*beacon_block_root,
))
}
PayloadAttestationError::BlockNotAtSlot { .. } => {
debug!(
%peer_id,
%message_slot,
"Payload attestation references block at wrong slot"
);
self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Ignore);
}
PayloadAttestationError::NotInPTC { .. } => {
self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Reject);
self.gossip_penalize_peer(
@@ -4045,3 +4248,42 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
}
}
}
#[cfg(test)]
mod tests {
use super::ReprocessAllowance::{BlockAndPayload, None, PayloadOnly};
#[test]
fn reprocess_allowance_gates() {
// A block re-queue is only permitted for a freshly received attestation.
assert!(BlockAndPayload.allows_block());
assert!(!PayloadOnly.allows_block());
assert!(!None.allows_block());
// A payload-envelope re-queue is permitted until we've already re-queued for it.
assert!(BlockAndPayload.allows_payload());
assert!(PayloadOnly.allows_payload());
assert!(!None.allows_payload());
}
#[test]
fn reprocess_allowance_progression() {
// Each re-queue narrows the allowance to the next variant in the progression.
assert_eq!(BlockAndPayload.next_requeue(), PayloadOnly);
assert_eq!(PayloadOnly.next_requeue(), None);
assert_eq!(None.next_requeue(), None);
}
#[test]
fn reprocess_allowance_is_bounded() {
// Safety property: from any starting state, re-queuing twice reaches the terminal `None`,
// so an attestation can never loop indefinitely.
for start in [BlockAndPayload, PayloadOnly, None] {
assert_eq!(
start.next_requeue().next_requeue(),
None,
"re-queuing twice from {start:?} should be terminal"
);
}
}
}

View File

@@ -22,9 +22,13 @@ use lighthouse_network::rpc::methods::{
use lighthouse_network::service::api_types::CustodyBackfillBatchId;
use lighthouse_network::{
Client, GossipTopic, MessageId, NetworkConfig, NetworkGlobals, PeerId, PubsubMessage,
PubsubPartialMessage,
rpc::{BlocksByRangeRequest, BlocksByRootRequest, LightClientBootstrapRequest, StatusMessage},
};
use logging::crit;
use rand::prelude::SliceRandom;
use ssz_types::VariableList;
use std::collections::HashSet;
use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;
@@ -39,6 +43,8 @@ use {
pub use sync_methods::{BlockProcessingResult, ChainSegmentProcessId};
use gossip_methods::ReprocessAllowance;
pub type Error<T> = TrySendError<BeaconWorkEvent<T>>;
mod gossip_methods;
@@ -93,15 +99,17 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
package.attestation,
package.subnet_id,
package.should_import,
true,
ReprocessAllowance::BlockAndPayload,
package.seen_timestamp,
)
};
// Define a closure for processing batches of attestations.
let processor = self.clone();
let process_batch =
move |attestations| processor.process_gossip_attestation_batch(attestations, true);
let process_batch = move |attestations| {
processor
.process_gossip_attestation_batch(attestations, ReprocessAllowance::BlockAndPayload)
};
self.try_send(BeaconWorkEvent {
drop_during_sync: true,
@@ -135,15 +143,17 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
package.message_id,
package.peer_id,
package.aggregate,
true,
ReprocessAllowance::BlockAndPayload,
package.seen_timestamp,
)
};
// Define a closure for processing batches of attestations.
let processor = self.clone();
let process_batch =
move |aggregates| processor.process_gossip_aggregate_batch(aggregates, true);
let process_batch = move |aggregates| {
processor
.process_gossip_aggregate_batch(aggregates, ReprocessAllowance::BlockAndPayload)
};
let beacon_block_root = aggregate.message().aggregate().data().beacon_block_root;
self.try_send(BeaconWorkEvent {
@@ -901,7 +911,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
});
}
pub async fn fetch_engine_blobs_and_publish(
pub async fn fetch_engine_blobs_and_publish_full(
self: &Arc<Self>,
header: Arc<PartialDataColumnHeader<T::EthSpec>>,
block_root: Hash256,
@@ -925,14 +935,14 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
match fetch_and_process_engine_blobs(
self.chain.clone(),
block_root,
header.clone(),
header,
custody_columns,
publish_fn,
)
.await
{
Ok(Some(availability)) => match availability {
AvailabilityProcessingStatus::Imported(_) => {
AvailabilityProcessingStatus::Imported(..) => {
debug!(
result = "imported block and custody columns",
%block_root,
@@ -969,44 +979,108 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
);
}
}
}
// Publish partial columns without eager send
// TODO(gloas): implement publish partial columns without eager send
if let Some(assembler) = self.chain.data_availability_checker.partial_assembler() {
let columns = assembler.get_columns_and_mark_as_local_fetched(block_root, &header);
pub async fn publish_partial_data_columns(
self: &Arc<Self>,
header: Arc<PartialDataColumnHeader<T::EthSpec>>,
block_root: Hash256,
) {
if header.kzg_commitments.is_empty() {
return;
}
// TODO(gloas): implement publish partial columns
let Some(assembler) = self.chain.data_availability_checker.partial_assembler() else {
// Partials are disabled.
return;
};
let epoch = header.slot().epoch(T::EthSpec::slots_per_epoch());
let custody_columns = self.chain.sampling_columns_for_epoch(epoch);
let columns = assembler.get_columns_and_mark_as_local_fetched(block_root, &header);
let mut present_indices: HashSet<ColumnIndex> = HashSet::with_capacity(columns.len());
let mut messages: Vec<PubsubPartialMessage<T::EthSpec>> = Vec::with_capacity(columns.len());
for column in columns {
// Republish both complete and incomplete columns as partials
let columns: Vec<_> = columns
.into_iter()
.filter_map(|column| match column {
AssemblyColumn::Incomplete(partial) => Some(partial.into_inner()),
AssemblyColumn::Complete(full) => {
let DataColumnSidecar::Fulu(fulu) = full.as_data_column() else {
return None;
};
match fulu.to_partial() {
Ok(partial) => Some(Arc::new(partial)),
Err(err) => {
error!(
%block_root,
column_index = %full.index(),
?err,
"Failed to convert complete column to partial for re-seeding"
);
None
}
let partial_column = match column {
AssemblyColumn::Incomplete(partial) => partial.into_inner(),
AssemblyColumn::Complete(full) => {
let DataColumnSidecar::Fulu(fulu) = full.as_data_column() else {
continue;
};
match fulu.to_partial() {
Ok(partial) => Arc::new(partial),
Err(err) => {
error!(
%block_root,
column_index = %full.index(),
?err,
"Failed to convert complete column to partial for re-seeding"
);
continue;
}
}
})
.collect();
if !columns.is_empty() {
debug!(block = %block_root, "Publishing all partials after getBlobs");
self.send_network_message(NetworkMessage::PublishPartialColumns {
columns,
header,
});
} else {
debug!(block = %block_root, "No partials to publish after getBlobs");
}
};
present_indices.insert(partial_column.index);
let mut request_cells = partial_column.sidecar.cells_present_bitmap.clone_zeroed();
request_cells.not_inplace();
messages.push(PubsubPartialMessage::DataColumnFulu {
column: partial_column,
request_cells,
header: header.clone(),
});
}
// For each custody column without any local partial, send an empty placeholder
// that requests all cells.
let num_cells = header.kzg_commitments.len();
for col_idx in custody_columns {
if present_indices.contains(col_idx) {
continue;
}
// `kzg_commitments.len()` is bounded by `MaxBlobCommitmentsPerBlock`, so the
// bitmap constructor is infallible.
let Ok(cells_present_bitmap) = CellBitmap::<T::EthSpec>::with_capacity(num_cells)
else {
crit!(
%block_root,
num_cells,
column_index = %col_idx,
"CellBitmap construction failed despite being bounded by MaxBlobCommitmentsPerBlock"
);
continue;
};
let request_cells = cells_present_bitmap.not();
messages.push(PubsubPartialMessage::DataColumnFulu {
column: Arc::new(PartialDataColumn {
block_root,
index: *col_idx,
sidecar: PartialDataColumnSidecar {
cells_present_bitmap,
column: VariableList::empty(),
kzg_proofs: VariableList::empty(),
header: None.into(),
},
}),
request_cells,
header: header.clone(),
});
}
if !messages.is_empty() {
debug!(
block = %block_root,
count = messages.len(),
"Publishing all partials"
);
self.send_network_message(NetworkMessage::PublishPartialColumns { messages });
} else {
// This should not happen, as any custody columns will have at least an empty
// partial published.
warn!(block = %block_root, "No partials to publish");
}
}
@@ -1020,7 +1094,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
Ok(Some((availability_processing_status, data_columns_to_publish))) => {
self.publish_data_columns_gradually(data_columns_to_publish, block_root);
match &availability_processing_status {
AvailabilityProcessingStatus::Imported(hash) => {
AvailabilityProcessingStatus::Imported(_, hash) => {
debug!(
result = "imported block and custody columns",
block_hash = %hash,

View File

@@ -1,4 +1,4 @@
use crate::metrics::{self, register_process_result_metrics};
use crate::metrics::{self, EnvelopeSource, register_process_result_metrics};
use crate::network_beacon_processor::{FUTURE_SLOT_TOLERANCE, NetworkBeaconProcessor};
use crate::sync::BatchProcessResult;
use crate::sync::manager::CustodyBatchProcessResult;
@@ -158,8 +158,6 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
return;
};
let slot = block.slot();
let parent_root = block.message().parent_root();
let commitments_formatted = block.as_block().commitments_formatted();
debug!(
@@ -186,17 +184,14 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
// RPC block imported, regardless of process type
match result.as_ref() {
Ok(AvailabilityProcessingStatus::Imported(hash)) => {
Ok(AvailabilityProcessingStatus::Imported(slot, hash)) => {
info!(
%slot,
%hash,
"New RPC block received",
);
// Trigger processing for work referencing this block.
let reprocess_msg = ReprocessQueueMessage::BlockImported {
block_root: *hash,
parent_root,
};
let reprocess_msg = ReprocessQueueMessage::BlockImported { block_root: *hash };
if self
.beacon_processor_send
.try_send(WorkEvent {
@@ -213,7 +208,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
};
self.chain.block_times_cache.write().set_time_observed(
*hash,
slot,
*slot,
seen_timestamp,
None,
None,
@@ -227,7 +222,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
// to be sent from the peers if we already have them.
if let Ok(header) = signed_beacon_block.as_ref().try_into() {
let publish_blobs = false;
self.fetch_engine_blobs_and_publish(
self.fetch_engine_blobs_and_publish_full(
Arc::new(header),
block_root,
publish_blobs,
@@ -294,7 +289,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
match &result {
Ok(availability) => match availability {
AvailabilityProcessingStatus::Imported(hash) => {
AvailabilityProcessingStatus::Imported(_, hash) => {
debug!(
result = "imported block and custody columns",
block_hash = %hash,
@@ -376,6 +371,13 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
let result: Result<AvailabilityProcessingStatus, BlockError> =
result.map_err(|e| BlockError::InternalError(format!("envelope: {e}")));
// The payload envelope is imported; release any attestations awaiting this block's payload
// so they can be re-processed (parity with the gossip import path).
if let Ok(AvailabilityProcessingStatus::Imported(_, block_root)) = &result {
self.chain.recompute_head_at_current_slot().await;
self.notify_payload_envelope_imported(*block_root, EnvelopeSource::Rpc);
}
self.send_sync_message(SyncMessage::BlockComponentProcessed {
process_type,
result: result.into(),
@@ -1018,7 +1020,7 @@ impl From<Result<AvailabilityProcessingStatus, BlockError>> for BlockProcessingR
))
}
match result {
Ok(AvailabilityProcessingStatus::Imported(_)) => Self::Imported(true, "imported"),
Ok(AvailabilityProcessingStatus::Imported(..)) => Self::Imported(true, "imported"),
Ok(AvailabilityProcessingStatus::MissingComponents(_, _)) => {
Self::Imported(false, "missing_components")
}

View File

@@ -19,7 +19,7 @@ use lighthouse_network::rpc::methods::RpcResponse;
use lighthouse_network::service::Network;
use lighthouse_network::types::GossipKind;
use lighthouse_network::{
Context, PeerAction, PubsubMessage, ReportSource, Response, Subnet,
Context, PeerAction, PubsubMessage, PubsubPartialMessage, ReportSource, Response, Subnet,
rpc::{GoodbyeReason, RpcErrorResponse},
};
use lighthouse_network::{MessageAcceptance, prometheus_client::registry::Registry};
@@ -39,8 +39,8 @@ use tokio::time::Sleep;
use tracing::{debug, error, info, trace, warn};
use typenum::Unsigned;
use types::{
EthSpec, ForkContext, PartialDataColumn, PartialDataColumnHeader, Slot, SubnetId,
SyncCommitteeSubscription, SyncSubnetId, ValidatorSubscription,
EthSpec, ForkContext, Slot, SubnetId, SyncCommitteeSubscription, SyncSubnetId,
ValidatorSubscription,
};
mod tests;
@@ -85,8 +85,7 @@ pub enum NetworkMessage<E: EthSpec> {
Publish { messages: Vec<PubsubMessage<E>> },
/// Publish partial data column sidecars via the partial gossipsub protocol.
PublishPartialColumns {
columns: Vec<Arc<PartialDataColumn<E>>>,
header: Arc<PartialDataColumnHeader<E>>,
messages: Vec<PubsubPartialMessage<E>>,
},
/// Validates a received gossipsub message. This will propagate the message on the network.
ValidationResult {
@@ -683,8 +682,8 @@ impl<T: BeaconChainTypes> NetworkService<T> {
);
self.libp2p.publish(messages);
}
NetworkMessage::PublishPartialColumns { columns, header } => {
self.libp2p.publish_partial(columns, header);
NetworkMessage::PublishPartialColumns { messages } => {
self.libp2p.publish_partial(messages);
}
NetworkMessage::ReportPeer {
peer_id,

View File

@@ -156,6 +156,11 @@ pub enum SyncMessage<E: EthSpec> {
/// manager to attempt to find the block matching the unknown hash.
UnknownBlockHashFromAttestation(PeerId, Hash256),
/// A peer has sent a payload-present attestation (`index == 1`) for a block whose execution
/// payload envelope we have not seen. This triggers the manager to fetch the payload envelope
/// for `block_root` via `ExecutionPayloadEnvelopesByRoot`.
UnknownPayloadEnvelopeFromAttestation(PeerId, Hash256),
/// A peer has disconnected.
Disconnect(PeerId),
@@ -260,6 +265,10 @@ pub struct SyncManager<T: BeaconChainTypes> {
/// may forward us thousands of a attestations, each one triggering an individual event. Only
/// one event is useful, the rest generating log noise and wasted cycles
notified_unknown_roots: LRUTimeCache<(PeerId, Hash256)>,
/// Debounce duplicated `UnknownPayloadEnvelopeFromAttestation` for the same root/peer tuple,
/// for the same reason as `notified_unknown_roots`: a peer may forward many payload-present
/// attestations for a block whose execution payload envelope we have not yet seen.
notified_unknown_payload_roots: LRUTimeCache<(PeerId, Hash256)>,
}
/// Spawns a new `SyncManager` thread which has a weak reference to underlying beacon
@@ -320,6 +329,9 @@ impl<T: BeaconChainTypes> SyncManager<T> {
notified_unknown_roots: LRUTimeCache::new(Duration::from_secs(
NOTIFIED_UNKNOWN_ROOT_EXPIRY_SECONDS,
)),
notified_unknown_payload_roots: LRUTimeCache::new(Duration::from_secs(
NOTIFIED_UNKNOWN_ROOT_EXPIRY_SECONDS,
)),
}
}
@@ -895,6 +907,22 @@ impl<T: BeaconChainTypes> SyncManager<T> {
self.handle_unknown_block_root(peer_id, block_root);
}
}
SyncMessage::UnknownPayloadEnvelopeFromAttestation(peer_id, block_root) => {
if !self
.notified_unknown_payload_roots
.contains(&(peer_id, block_root))
{
self.notified_unknown_payload_roots
.insert((peer_id, block_root));
// TODO(gloas): trigger a payload-envelope lookup for `block_root` via
// `ExecutionPayloadEnvelopesByRoot`. Wired up in the gloas lookup-sync PR (#9155).
debug!(
?block_root,
?peer_id,
"Received unknown payload envelope from attestation"
);
}
}
SyncMessage::Disconnect(peer_id) => {
debug!(%peer_id, "Received disconnected message");
self.peer_disconnect(&peer_id);

View File

@@ -27,7 +27,9 @@ use fnv::FnvHashMap;
use lighthouse_network::rpc::methods::{
BlobsByRangeRequest, DataColumnsByRangeRequest, PayloadEnvelopesByRangeRequest,
};
use lighthouse_network::rpc::{BlocksByRangeRequest, GoodbyeReason, RPCError, RequestType};
use lighthouse_network::rpc::{
BlocksByRangeRequest, GoodbyeReason, MAX_CONCURRENT_REQUESTS, RPCError, RequestType,
};
pub use lighthouse_network::service::api_types::RangeRequestId;
use lighthouse_network::service::api_types::{
AppRequestId, BlobsByRangeRequestId, BlocksByRangeRequestId, ComponentsByRangeRequestId,
@@ -40,8 +42,8 @@ use lighthouse_network::{Client, NetworkGlobals, PeerAction, PeerId, ReportSourc
use parking_lot::RwLock;
pub use requests::LookupVerifyError;
use requests::{
ActiveRequests, BlobsByRangeRequestItems, BlocksByRangeRequestItems, BlocksByRootRequestItems,
DataColumnsByRangeRequestItems, DataColumnsByRootRequestItems,
ActiveRequestItems, ActiveRequests, BlobsByRangeRequestItems, BlocksByRangeRequestItems,
BlocksByRootRequestItems, DataColumnsByRangeRequestItems, DataColumnsByRootRequestItems,
PayloadEnvelopesByRangeRequestItems, PayloadEnvelopesByRootRequestItems,
};
#[cfg(test)]
@@ -100,6 +102,30 @@ pub type RpcResponseResult<T> = Result<(T, Duration), RpcResponseError>;
pub type CustodyByRootResult<T> =
Result<DownloadResult<DataColumnSidecarList<T>>, RpcResponseError>;
/// Per-peer count of active requests for a single protocol, to keep peer selection within
/// `MAX_CONCURRENT_REQUESTS` concurrent requests per protocol ID.
struct ActiveRequestsPerPeer {
count_by_peer: HashMap<PeerId, usize>,
}
impl ActiveRequestsPerPeer {
fn new<K, T>(requests: &ActiveRequests<K, T>) -> Self
where
K: Copy + Eq + std::hash::Hash + std::fmt::Display,
T: ActiveRequestItems,
{
let mut count_by_peer = HashMap::<PeerId, usize>::new();
for peer_id in requests.iter_request_peers() {
*count_by_peer.entry(peer_id).or_default() += 1;
}
Self { count_by_peer }
}
fn at_concurrency_limit(&self, peer_id: &PeerId) -> bool {
self.count_by_peer.get(peer_id).copied().unwrap_or(0) >= MAX_CONCURRENT_REQUESTS
}
}
#[derive(Debug)]
#[allow(private_interfaces)]
pub enum RpcResponseError {
@@ -397,9 +423,9 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
.collect()
}
pub fn get_custodial_peers(&self, column_index: ColumnIndex) -> Vec<PeerId> {
pub fn get_custodial_peers(&self, column_index: ColumnIndex, block_slot: Slot) -> Vec<PeerId> {
self.network_globals()
.custody_peers_for_column(column_index)
.custody_peers_for_column(column_index, block_slot)
}
pub fn network_globals(&self) -> &NetworkGlobals<T::EthSpec> {
@@ -440,47 +466,6 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
}
}
fn active_request_count_by_peer(&self) -> HashMap<PeerId, usize> {
let Self {
network_send: _,
request_id: _,
blocks_by_root_requests,
payload_envelopes_by_root_requests,
data_columns_by_root_requests,
blocks_by_range_requests,
blobs_by_range_requests,
data_columns_by_range_requests,
payload_envelopes_by_range_requests,
// custody_by_root_requests is a meta request of data_columns_by_root_requests
custody_by_root_requests: _,
// components_by_range_requests is a meta request of various _by_range requests
components_by_range_requests: _,
custody_backfill_data_column_batch_requests: _,
execution_engine_state: _,
network_beacon_processor: _,
chain: _,
fork_context: _,
// Don't use a fallback match. We want to be sure that all requests are considered when
// adding new ones
} = self;
let mut active_request_count_by_peer = HashMap::<PeerId, usize>::new();
for peer_id in blocks_by_root_requests
.iter_request_peers()
.chain(payload_envelopes_by_root_requests.iter_request_peers())
.chain(data_columns_by_root_requests.iter_request_peers())
.chain(blocks_by_range_requests.iter_request_peers())
.chain(blobs_by_range_requests.iter_request_peers())
.chain(data_columns_by_range_requests.iter_request_peers())
.chain(payload_envelopes_by_range_requests.iter_request_peers())
{
*active_request_count_by_peer.entry(peer_id).or_default() += 1;
}
active_request_count_by_peer
}
/// Retries only the specified failed columns by requesting them again.
///
/// Note: This function doesn't retry the whole batch, but retries specific requests within
@@ -507,8 +492,6 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
return Err("request id not present".to_string());
};
let active_request_count_by_peer = self.active_request_count_by_peer();
debug!(
?failed_columns,
?id,
@@ -518,12 +501,7 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
// Attempt to find all required custody peers to request the failed columns from
let columns_by_range_peers_to_request = self
.select_columns_by_range_peers_to_request(
failed_columns,
peers,
active_request_count_by_peer,
peers_to_deprioritize,
)
.select_columns_by_range_peers_to_request(failed_columns, peers, peers_to_deprioritize)
.map_err(|e| format!("{:?}", e))?;
// Reuse the id for the request that received partially correct responses
@@ -581,7 +559,7 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
column_peers = column_peers.len()
);
let _guard = range_request_span.clone().entered();
let active_request_count_by_peer = self.active_request_count_by_peer();
let blocks_by_range_per_peer = ActiveRequestsPerPeer::new(&self.blocks_by_range_requests);
let Some(block_peer) = block_peers
.iter()
@@ -589,8 +567,8 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
(
// If contains -> 1 (order after), not contains -> 0 (order first)
peers_to_deprioritize.contains(peer),
// Prefer peers with less overall requests
active_request_count_by_peer.get(peer).copied().unwrap_or(0),
// Strictly de-prioritize peers already at the per-protocol concurrency limit
blocks_by_range_per_peer.at_concurrency_limit(peer),
// Random factor to break ties, otherwise the PeerID breaks ties
rand::random::<u32>(),
peer,
@@ -620,7 +598,6 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
Some(self.select_columns_by_range_peers_to_request(
&column_indexes,
column_peers,
active_request_count_by_peer,
peers_to_deprioritize,
)?)
} else {
@@ -692,6 +669,9 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
let payloads_req_id =
if matches!(batch_type, ByRangeRequestType::BlocksAndEnvelopesAndColumns) {
Some(self.send_payload_envelopes_by_range_request(
// Peer selection: for a given peer, the count of sent blocks_by_range requests
// equals the count of sent payloads_by_range requests. So we are under the
// concurrency limit for payloads_by_range requests
block_peer,
PayloadEnvelopesByRangeRequest {
start_slot: *request.start_slot(),
@@ -731,10 +711,11 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
&self,
custody_indexes: &HashSet<ColumnIndex>,
peers: &HashSet<PeerId>,
active_request_count_by_peer: HashMap<PeerId, usize>,
peers_to_deprioritize: &HashSet<PeerId>,
) -> Result<HashMap<PeerId, Vec<ColumnIndex>>, RpcRequestSendError> {
let mut columns_to_request_by_peer = HashMap::<PeerId, Vec<ColumnIndex>>::new();
let data_columns_by_range_per_peer =
ActiveRequestsPerPeer::new(&self.data_columns_by_range_requests);
for column_index in custody_indexes {
// Strictly consider peers that are custodials of this column AND are part of this
@@ -750,12 +731,10 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
(
// If contains -> 1 (order after), not contains -> 0 (order first)
peers_to_deprioritize.contains(peer),
// Prefer peers with less overall requests
// Also account for requests that are not yet issued tracked in peer_id_to_request_map
// We batch requests to the same peer, so count existance in the
// `columns_to_request_by_peer` as a single 1 request.
active_request_count_by_peer.get(peer).copied().unwrap_or(0)
+ columns_to_request_by_peer.get(peer).map(|_| 1).unwrap_or(0),
// Strictly de-prioritize peers already at the per-protocol concurrency limit
// Note: do not account for to-be-sent requests on
// `data_columns_by_range_by_peer` as we always send at most one request
data_columns_by_range_per_peer.at_concurrency_limit(peer),
// Random factor to break ties, otherwise the PeerID breaks ties
rand::random::<u32>(),
peer,
@@ -881,14 +860,14 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
lookup_peers: Arc<RwLock<HashSet<PeerId>>>,
block_root: Hash256,
) -> Result<LookupRequestResult<Arc<SignedBeaconBlock<T::EthSpec>>>, RpcRequestSendError> {
let active_request_count_by_peer = self.active_request_count_by_peer();
let blocks_by_root_per_peer = ActiveRequestsPerPeer::new(&self.blocks_by_root_requests);
let Some(peer_id) = lookup_peers
.read()
.iter()
.map(|peer| {
(
// Prefer peers with less overall requests
active_request_count_by_peer.get(peer).copied().unwrap_or(0),
// Strictly de-prioritize peers already at the per-protocol concurrency limit
blocks_by_root_per_peer.at_concurrency_limit(peer),
// Random factor to break ties, otherwise the PeerID breaks ties
rand::random::<u32>(),
peer,
@@ -1001,13 +980,15 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
));
}
let active_request_count_by_peer = self.active_request_count_by_peer();
let payload_envelopes_by_root_per_peer =
ActiveRequestsPerPeer::new(&self.payload_envelopes_by_root_requests);
let Some(peer_id) = lookup_peers
.read()
.iter()
.map(|peer| {
(
active_request_count_by_peer.get(peer).copied().unwrap_or(0),
// Strictly de-prioritize peers already at the per-protocol concurrency limit
payload_envelopes_by_root_per_peer.at_concurrency_limit(peer),
rand::random::<u32>(),
peer,
)
@@ -1161,6 +1142,7 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
let requester = CustodyRequester(id);
let mut request = ActiveCustodyRequest::new(
block_root,
block_slot,
CustodyId { requester },
&custody_indexes_to_fetch,
lookup_peers,
@@ -1756,7 +1738,6 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
peers: &HashSet<PeerId>,
peers_to_deprioritize: &HashSet<PeerId>,
) -> Result<CustodyBackFillBatchRequestId, RpcRequestSendError> {
let active_request_count_by_peer = self.active_request_count_by_peer();
// Attempt to find all required custody peers before sending any request or creating an ID
let columns_by_range_peers_to_request = {
let column_indexes = self
@@ -1769,7 +1750,6 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
self.select_columns_by_range_peers_to_request(
&column_indexes,
peers,
active_request_count_by_peer,
peers_to_deprioritize,
)?
};

View File

@@ -13,15 +13,18 @@ use std::hash::{BuildHasher, RandomState};
use std::time::{Duration, Instant};
use std::{collections::HashMap, marker::PhantomData, sync::Arc};
use tracing::{Span, debug, debug_span, warn};
use types::{DataColumnSidecar, Hash256, data::ColumnIndex};
use types::{DataColumnSidecar, Hash256, Slot, data::ColumnIndex};
use types::{DataColumnSidecarList, EthSpec};
use super::{LookupRequestResult, PeerGroup, RpcResponseResult, SyncNetworkContext};
use super::{
ActiveRequestsPerPeer, LookupRequestResult, PeerGroup, RpcResponseResult, SyncNetworkContext,
};
const MAX_STALE_NO_PEERS_DURATION: Duration = Duration::from_secs(30);
pub struct ActiveCustodyRequest<T: BeaconChainTypes> {
block_root: Hash256,
block_slot: Slot,
custody_id: CustodyId,
/// List of column indices this request needs to download to complete successfully
column_requests: FnvHashMap<ColumnIndex, ColumnRequest<T::EthSpec>>,
@@ -62,6 +65,7 @@ pub type CustodyRequestResult<E> = Result<Option<DownloadResult<DataColumnSideca
impl<T: BeaconChainTypes> ActiveCustodyRequest<T> {
pub(crate) fn new(
block_root: Hash256,
block_slot: Slot,
custody_id: CustodyId,
column_indices: &[ColumnIndex],
lookup_peers: Arc<RwLock<HashSet<PeerId>>>,
@@ -73,6 +77,7 @@ impl<T: BeaconChainTypes> ActiveCustodyRequest<T> {
);
Self {
block_root,
block_slot,
custody_id,
column_requests: HashMap::from_iter(
column_indices
@@ -234,7 +239,8 @@ impl<T: BeaconChainTypes> ActiveCustodyRequest<T> {
)));
}
let active_request_count_by_peer = cx.active_request_count_by_peer();
let data_columns_by_root_per_peer =
ActiveRequestsPerPeer::new(&cx.data_columns_by_root_requests);
let mut columns_to_request_by_peer = HashMap::<PeerId, Vec<ColumnIndex>>::new();
let mut columns_without_peers = vec![];
let lookup_peers = self.lookup_peers.read();
@@ -252,7 +258,7 @@ impl<T: BeaconChainTypes> ActiveCustodyRequest<T> {
let peer_to_request = self.select_column_peer(
cx,
&active_request_count_by_peer,
&data_columns_by_root_per_peer,
&lookup_peers,
*column_index,
&random_state,
@@ -357,7 +363,7 @@ impl<T: BeaconChainTypes> ActiveCustodyRequest<T> {
fn select_column_peer(
&self,
cx: &mut SyncNetworkContext<T>,
active_request_count_by_peer: &HashMap<PeerId, usize>,
data_columns_by_root_per_peer: &ActiveRequestsPerPeer,
lookup_peers: &HashSet<PeerId>,
column_index: ColumnIndex,
random_state: &RandomState,
@@ -365,7 +371,7 @@ impl<T: BeaconChainTypes> ActiveCustodyRequest<T> {
// We draw from the total set of peers, but prioritize those peers who we have
// received an attestation or a block from (`lookup_peers`), as the `lookup_peers` may take
// time to build up and we are likely to not find any column peers initially.
let custodial_peers = cx.get_custodial_peers(column_index);
let custodial_peers = cx.get_custodial_peers(column_index, self.block_slot);
let mut prioritized_peers = custodial_peers
.iter()
.filter(|peer| {
@@ -374,12 +380,12 @@ impl<T: BeaconChainTypes> ActiveCustodyRequest<T> {
})
.map(|peer| {
(
// Strictly de-prioritize peers already at the per-protocol concurrency limit
data_columns_by_root_per_peer.at_concurrency_limit(peer),
// Prioritize peers that claim to know have imported this block
if lookup_peers.contains(peer) { 0 } else { 1 },
// De-prioritize peers that we have already attempted to download from
self.peer_attempts.get(peer).copied().unwrap_or(0),
// Prefer peers with fewer requests to load balance across peers.
active_request_count_by_peer.get(peer).copied().unwrap_or(0),
// The hash ensures consistent peer ordering within this request
// to avoid fragmentation while varying selection across different requests.
random_state.hash_one(peer),

View File

@@ -1989,7 +1989,7 @@ impl TestRig {
block: Arc<SignedBeaconBlock<E>>,
) {
match self.import_block_to_da_checker(block).await {
AvailabilityProcessingStatus::Imported(_) => {
AvailabilityProcessingStatus::Imported(..) => {
panic!("block removed from da_checker, available")
}
AvailabilityProcessingStatus::MissingComponents(_, block_root) => {