Resolve merge conflicts

Eitan Seri-Levi
2026-04-25 17:14:57 +09:00
72 changed files with 6137 additions and 709 deletions

View File

@@ -143,6 +143,22 @@ pub static BEACON_PROCESSOR_GOSSIP_DATA_COLUMN_SIDECAR_VERIFIED_TOTAL: LazyLock<
"Total number of gossip data column sidecar verified for propagation.",
)
});
pub static BEACON_PROCESSOR_GOSSIP_PARTIAL_DATA_COLUMN_SIDECAR_VERIFIED_TOTAL: LazyLock<
Result<IntCounter>,
> = LazyLock::new(|| {
try_create_int_counter(
"beacon_processor_gossip_partial_data_column_verified_total",
"Total number of gossip partial data column sidecar verified for propagation.",
)
});
pub static BEACON_PROCESSOR_GOSSIP_PARTIAL_DATA_COLUMN_SIDECAR_MISSING_HEADER_TOTAL: LazyLock<
Result<IntCounter>,
> = LazyLock::new(|| {
try_create_int_counter(
"beacon_processor_gossip_partial_data_column_missing_header_total",
"Total number of gossip partial data column sidecar received without a (cached) header.",
)
});
// Gossip Exits.
pub static BEACON_PROCESSOR_EXIT_VERIFIED_TOTAL: LazyLock<Result<IntCounter>> =
LazyLock::new(|| {
@@ -601,6 +617,16 @@ pub static BEACON_DATA_COLUMN_GOSSIP_PROPAGATION_VERIFICATION_DELAY_TIME: LazyLo
decimal_buckets(-3, -1),
)
});
pub static BEACON_PARTIAL_DATA_COLUMN_GOSSIP_PROPAGATION_VERIFICATION_DELAY_TIME: LazyLock<
Result<Histogram>,
> = LazyLock::new(|| {
try_create_histogram_with_buckets(
"beacon_partial_data_column_gossip_propagation_verification_delay_time",
"Duration between when the partial data column sidecar is received over gossip and when it is verified for propagation.",
// [0.001, 0.002, 0.005, 0.01, 0.02, 0.05, 0.1, 0.2, 0.5]
decimal_buckets(-3, -1),
)
});
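// Illustrative sketch only: assuming the [1, 2, 5] x 10^e convention implied
// by the bucket comments in this file, a generator like `decimal_buckets`
// could be written as below. This is not the real helper, which is imported
// from the metrics crate.
#[allow(dead_code)]
fn decimal_buckets_sketch(min_exp: i32, max_exp: i32) -> Vec<f64> {
    (min_exp..=max_exp)
        .flat_map(|e| [1.0f64, 2.0, 5.0].map(|m| m * 10f64.powi(e)))
        .collect() // (-3, -1) => [0.001, 0.002, 0.005, 0.01, 0.02, 0.05, 0.1, 0.2, 0.5]
}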
pub static BEACON_DATA_COLUMN_GOSSIP_SLOT_START_DELAY_TIME: LazyLock<Result<Histogram>> =
LazyLock::new(|| {
try_create_histogram_with_buckets(
@@ -615,6 +641,28 @@ pub static BEACON_DATA_COLUMN_GOSSIP_SLOT_START_DELAY_TIME: LazyLock<Result<Hist
//decimal_buckets(-1,2)
)
});
pub static BEACON_PARTIAL_DATA_COLUMN_GOSSIP_SLOT_START_DELAY_TIME: LazyLock<Result<Histogram>> =
LazyLock::new(|| {
try_create_histogram_with_buckets(
"beacon_partial_data_column_gossip_slot_start_delay_time",
"Duration between when the partial data column sidecar is received over gossip and the start of the slot it belongs to.",
// Create a custom bucket list for greater granularity in column delay
Ok(vec![
0.1, 0.2, 0.3, 0.4, 0.5, 0.75, 1.0, 1.25, 1.5, 1.75, 2.0, 2.5, 3.0, 3.5, 4.0, 5.0,
6.0, 7.0, 8.0, 9.0, 10.0, 15.0, 20.0,
]),
// NOTE: Previous values, which we may want to switch back to.
// [0.1, 0.2, 0.5, 1, 2, 5, 10, 20, 50]
// decimal_buckets(-1, 2)
)
});
pub static BEACON_USEFUL_FULL_COLUMNS_RECEIVED_TOTAL: LazyLock<Result<IntCounterVec>> =
LazyLock::new(|| {
try_create_int_counter_vec(
"beacon_useful_full_columns_received_total",
"Number of useful full columns (any cell being useful) received",
&["column_index"],
)
});
pub static BEACON_BLOB_DELAY_GOSSIP_VERIFICATION: LazyLock<Result<IntGauge>> = LazyLock::new(
|| {

View File

@@ -4,6 +4,14 @@ use crate::{
service::NetworkMessage,
sync::SyncMessage,
};
use beacon_chain::block_verification_types::AsBlock;
use beacon_chain::data_column_verification::{
GossipDataColumnError, GossipPartialDataColumnError, GossipVerifiedDataColumn,
GossipVerifiedPartialDataColumnHeader, KzgVerifiedPartialDataColumn,
PartialColumnVerificationResult,
};
use beacon_chain::payload_bid_verification::PayloadBidError;
use beacon_chain::proposer_preferences_verification::ProposerPreferencesError;
use beacon_chain::store::Error;
use beacon_chain::{
AvailabilityProcessingStatus, BeaconChainError, BeaconChainTypes, BlockError, ForkChoiceError,
@@ -22,13 +30,11 @@ use beacon_chain::{
EnvelopeError, gossip_verified_envelope::GossipVerifiedEnvelope,
},
};
use beacon_chain::{block_verification_types::AsBlock, payload_bid_verification::PayloadBidError};
use beacon_chain::{
data_column_verification::{GossipDataColumnError, GossipVerifiedDataColumn},
proposer_preferences_verification::ProposerPreferencesError,
};
use beacon_processor::{Work, WorkEvent};
use lighthouse_network::{Client, MessageAcceptance, MessageId, PeerAction, PeerId, ReportSource};
use lighthouse_network::{
Client, GossipTopic, MessageAcceptance, MessageId, PeerAction, PeerId, PubsubMessage,
ReportSource,
};
use logging::crit;
use operation_pool::ReceivedPreCapella;
use slot_clock::SlotClock;
@@ -41,13 +47,14 @@ use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
use store::hot_cold_store::HotColdDBError;
use tracing::{Instrument, Span, debug, error, info, instrument, trace, warn};
use types::{
Attestation, AttestationData, AttestationRef, AttesterSlashing, BlobSidecar, DataColumnSidecar,
DataColumnSubnetId, EthSpec, Hash256, IndexedAttestation, LightClientFinalityUpdate,
LightClientOptimisticUpdate, PayloadAttestationMessage, ProposerSlashing,
SignedAggregateAndProof, SignedBeaconBlock, SignedBlsToExecutionChange,
SignedContributionAndProof, SignedExecutionPayloadBid, SignedExecutionPayloadEnvelope,
SignedProposerPreferences, SignedVoluntaryExit, SingleAttestation, Slot, SubnetId,
SyncCommitteeMessage, SyncSubnetId, block::BlockImportSource,
Attestation, AttestationData, AttestationRef, AttesterSlashing, BlobSidecar, ColumnIndex,
DataColumnSidecar, DataColumnSubnetId, EthSpec, Hash256, IndexedAttestation,
LightClientFinalityUpdate, LightClientOptimisticUpdate, PartialDataColumn,
PartialDataColumnHeader, PayloadAttestationMessage, ProposerSlashing, SignedAggregateAndProof,
SignedBeaconBlock, SignedBlsToExecutionChange, SignedContributionAndProof,
SignedExecutionPayloadBid, SignedExecutionPayloadEnvelope, SignedProposerPreferences,
SignedVoluntaryExit, SingleAttestation, Slot, SubnetId, SyncCommitteeMessage, SyncSubnetId,
block::BlockImportSource,
};
use beacon_processor::work_reprocessing_queue::QueuedColumnReconstruction;
@@ -196,6 +203,19 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
})
}
/// Send a message on `message_tx` that `peer_id` has sent an invalid partial message and should
/// be penalized.
pub(crate) fn propagate_partial_validation_failure(
&self,
propagation_source: PeerId,
gossip_topic: GossipTopic,
) {
self.send_network_message(NetworkMessage::PartialValidationFailure {
propagation_source,
gossip_topic,
})
}
/* Processing functions */
/// Process the unaggregated attestation received from the gossip network and:
@@ -697,7 +717,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
MessageAcceptance::Accept,
);
}
GossipDataColumnError::ParentUnknown { parent_root } => {
GossipDataColumnError::ParentUnknown { parent_root, .. } => {
debug!(
action = "requesting parent",
%block_root,
@@ -723,6 +743,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
| GossipDataColumnError::InvalidSubnetId { .. }
| GossipDataColumnError::InvalidInclusionProof
| GossipDataColumnError::InvalidKzgProof { .. }
| GossipDataColumnError::MismatchesCachedColumn
| GossipDataColumnError::UnexpectedDataColumn
| GossipDataColumnError::InvalidColumnIndex(_)
| GossipDataColumnError::MaxBlobsPerBlockExceeded { .. }
@@ -784,6 +805,261 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
}
}
#[instrument(
name = "lh_process_gossip_partial_data_column",
parent = None,
level = "debug",
skip_all,
fields(block_root = ?column.block_root, index = column.index),
)]
pub async fn process_gossip_partial_data_column_sidecar(
self: &Arc<Self>,
peer_id: PeerId,
column: Box<PartialDataColumn<T::EthSpec>>,
seen_duration: Duration,
topic: GossipTopic,
) {
let block_root = column.block_root;
let index = column.index;
let result = self
.chain
.verify_partial_data_column_sidecar_for_gossip(column, seen_duration);
let header = match result {
PartialColumnVerificationResult::Ok { header, column } => {
metrics::inc_counter(
&metrics::BEACON_PROCESSOR_GOSSIP_PARTIAL_DATA_COLUMN_SIDECAR_VERIFIED_TOTAL,
);
let slot = header.as_header().slot();
debug!(
%slot,
%block_root,
%index,
"Successfully verified gossip partial data column sidecar"
);
// Log metrics to keep track of propagation delay times.
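// `seen_duration` is the time the sidecar was received, expressed as a
// duration since the unix epoch, so subtracting it from "now" yields the
// verification delay; `checked_sub` guards against clock adjustments
// producing a negative duration.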
if let Some(duration) = UNIX_EPOCH
.elapsed()
.ok()
.and_then(|now| now.checked_sub(seen_duration))
{
metrics::observe_duration(
&metrics::BEACON_PARTIAL_DATA_COLUMN_GOSSIP_PROPAGATION_VERIFICATION_DELAY_TIME,
duration,
);
}
self.process_gossip_verified_partial_data_column(
peer_id,
column,
header.clone(),
slot,
)
.await;
Some(header)
}
PartialColumnVerificationResult::ErrWithValidHeader { header, err } => {
self.handle_partial_verification_error(peer_id, err, block_root, index, topic);
Some(header)
}
PartialColumnVerificationResult::Err(err) => {
self.handle_partial_verification_error(peer_id, err, block_root, index, topic);
None
}
};
if let Some(header) = header {
let slot = header.as_header().slot();
let delay = get_slot_delay_ms(seen_duration, slot, &self.chain.slot_clock);
// Log metrics to track delay from other nodes on the network.
metrics::observe_duration(
&metrics::BEACON_PARTIAL_DATA_COLUMN_GOSSIP_SLOT_START_DELAY_TIME,
delay,
);
if !header.was_cached() {
debug!(block = %block_root, "Triggering getBlobs after receiving partial header");
// We want to publish immediately when this finishes
let publish_blobs = true;
self.fetch_engine_blobs_and_publish(header.into_header(), block_root, publish_blobs)
.await
}
}
}
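// For reference, the three-way result matched above has roughly this shape
// (an assumed sketch inferred from the match arms; the real definition lives
// in `beacon_chain::data_column_verification`):
//
//   enum PartialColumnVerificationResult<E: EthSpec> {
//       // Fully verified: process the column; keep the header for metrics
//       // and for triggering getBlobs.
//       Ok {
//           header: GossipVerifiedPartialDataColumnHeader<E>,
//           column: KzgVerifiedPartialDataColumn<E>,
//       },
//       // The column failed verification, but its header was valid and
//       // remains usable.
//       ErrWithValidHeader {
//           header: GossipVerifiedPartialDataColumnHeader<E>,
//           err: GossipPartialDataColumnError,
//       },
//       // Verification failed before a valid header was available.
//       Err(GossipPartialDataColumnError),
//   }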
fn handle_partial_verification_error(
self: &Arc<Self>,
peer_id: PeerId,
err: GossipPartialDataColumnError,
block_root: Hash256,
index: ColumnIndex,
topic: GossipTopic,
) {
match err {
GossipPartialDataColumnError::GossipDataColumnError(err) => match err {
GossipDataColumnError::InvalidVariant => {
// TODO(gloas) we should probably penalize the peer here
debug!(
%block_root,
%index,
"Invalid gossip partial data column variant."
)
}
GossipDataColumnError::PriorKnownUnpublished => {
debug!(
%block_root,
%index,
"Gossip partial data column already processed via the EL."
);
}
GossipDataColumnError::ParentUnknown { parent_root, slot } => {
debug!(
action = "requesting parent",
%block_root,
%parent_root,
"Unknown parent hash for partial column"
);
self.send_sync_message(SyncMessage::UnknownParentPartialDataColumn {
peer_id,
block_root,
parent_root,
slot,
});
}
GossipDataColumnError::PubkeyCacheTimeout
| GossipDataColumnError::BeaconChainError(_) => {
crit!(
error = ?err,
"Internal error when verifying partial column sidecar"
)
}
GossipDataColumnError::ProposalSignatureInvalid
| GossipDataColumnError::UnknownValidator(_)
| GossipDataColumnError::ProposerIndexMismatch { .. }
| GossipDataColumnError::IsNotLaterThanParent { .. }
| GossipDataColumnError::InvalidSubnetId { .. }
| GossipDataColumnError::InvalidInclusionProof
| GossipDataColumnError::InvalidKzgProof { .. }
| GossipDataColumnError::MismatchesCachedColumn
| GossipDataColumnError::UnexpectedDataColumn
| GossipDataColumnError::InvalidColumnIndex(_)
| GossipDataColumnError::MaxBlobsPerBlockExceeded { .. }
| GossipDataColumnError::InconsistentCommitmentsLength { .. }
| GossipDataColumnError::InconsistentProofsLength { .. }
| GossipDataColumnError::NotFinalizedDescendant { .. } => {
debug!(
error = ?err,
%block_root,
%index,
"Could not verify partial column for gossip. Rejecting the column sidecar"
);
// Prevent recurring behaviour by penalizing the peer slightly.
self.gossip_penalize_peer(
peer_id,
PeerAction::LowToleranceError,
"gossip_partial_data_column_low",
);
self.propagate_partial_validation_failure(peer_id, topic);
}
GossipDataColumnError::PriorKnown { .. } => {
// Data column is available via either the EL or reconstruction.
// Do not penalise the peer.
// Gossip filter should filter any duplicates received after this.
debug!(
%block_root,
%index,
"Received already available column sidecar. Ignoring the partial column sidecar"
)
}
GossipDataColumnError::FutureSlot { .. }
| GossipDataColumnError::PastFinalizedSlot { .. } => {
debug!(
error = ?err,
%block_root,
%index,
"Could not verify column sidecar for gossip. Ignoring the partial column sidecar"
);
// Prevent recurring behaviour by penalizing the peer slightly.
self.gossip_penalize_peer(
peer_id,
PeerAction::HighToleranceError,
"gossip_partial_data_column_high",
);
}
},
GossipPartialDataColumnError::MissingHeader => {
metrics::inc_counter(
&metrics::BEACON_PROCESSOR_GOSSIP_PARTIAL_DATA_COLUMN_SIDECAR_MISSING_HEADER_TOTAL,
);
warn!(
error = ?err,
%block_root,
%index,
"Received partial column while not having header stored"
);
self.gossip_penalize_peer(
peer_id,
PeerAction::HighToleranceError,
"gossip_partial_data_column_high",
);
}
GossipPartialDataColumnError::HeaderMismatches
| GossipPartialDataColumnError::HeaderIncorrectRoot { .. } => {
debug!(
error = ?err,
%block_root,
%index,
"Could not verify partial column header"
);
self.gossip_penalize_peer(
peer_id,
PeerAction::LowToleranceError,
"gossip_partial_data_column_low",
);
}
GossipPartialDataColumnError::EmptyMessage
| GossipPartialDataColumnError::InconsistentPresentCount { .. }
| GossipPartialDataColumnError::InconsistentCommitmentsLength { .. } => {
debug!(
error = ?err,
%block_root,
%index,
"Could not verify partial column"
);
self.gossip_penalize_peer(
peer_id,
PeerAction::LowToleranceError,
"gossip_partial_data_column_low",
);
}
GossipPartialDataColumnError::PartialColumnsDisabled => {
error!(
error = ?err,
%block_root,
%index,
"Received partial column while disabled"
);
self.gossip_penalize_peer(
peer_id,
PeerAction::LowToleranceError,
"gossip_partial_data_column_low",
);
}
GossipPartialDataColumnError::InternalError(_) => {
error!(
error = ?err,
%block_root,
%index,
"Internal error while processing partial column"
);
}
}
}
#[allow(clippy::too_many_arguments)]
#[instrument(
name = "lh_process_gossip_blob",
@@ -1030,6 +1306,8 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
}
}
/// Process a gossip-verified full data column (not partial).
/// Partials are handled by `process_gossip_verified_partial_data_column`.
async fn process_gossip_verified_data_column(
self: &Arc<Self>,
peer_id: PeerId,
@@ -1042,6 +1320,30 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
let data_column_slot = verified_data_column.slot();
let data_column_index = verified_data_column.index();
if let DataColumnSidecar::Fulu(col) = verified_data_column.as_data_column()
&& self
.chain
.data_availability_checker
.partial_assembler()
.is_some_and(|a| !a.is_complete(block_root, verified_data_column.index()))
{
metrics::inc_counter_vec(
&metrics::BEACON_USEFUL_FULL_COLUMNS_RECEIVED_TOTAL,
&[&data_column_index.to_string()],
);
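// Re-publish the useful full column in partial form so peers that are still
// assembling the column can consume its cells; the header travels separately
// in `PublishPartialColumns`, hence it is taken out of the sidecar here.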
let mut column = col.to_partial();
let header = column.sidecar.header.take();
if let Some(header) = header {
self.send_network_message(NetworkMessage::PublishPartialColumns {
columns: vec![Arc::new(column)],
header: Arc::new(header),
});
} else {
crit!("Converting from full to partial yielded headerless partial")
};
}
let result = self
.chain
.process_gossip_data_columns(vec![verified_data_column], || Ok(()))
@@ -1070,44 +1372,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
"Processed data column, waiting for other components"
);
if self
.chain
.data_availability_checker
.custody_context()
.should_attempt_reconstruction(
slot.epoch(T::EthSpec::slots_per_epoch()),
&self.chain.spec,
)
{
// Instead of triggering reconstruction immediately, schedule it to be run. If
// another column arrives, it either completes availability or pushes
// reconstruction back a bit.
let cloned_self = Arc::clone(self);
let block_root = *block_root;
if self
.beacon_processor_send
.try_send(WorkEvent {
drop_during_sync: false,
work: Work::Reprocess(
ReprocessQueueMessage::DelayColumnReconstruction(
QueuedColumnReconstruction {
block_root,
slot: *slot,
process_fn: Box::pin(async move {
cloned_self
.attempt_data_column_reconstruction(block_root)
.await;
}),
},
),
),
})
.is_err()
{
warn!("Unable to send reconstruction to reprocessing");
}
}
self.check_reconstruction_trigger(*slot, block_root).await;
}
},
Err(BlockError::DuplicateFullyImported(_)) => {
@@ -1143,6 +1408,183 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
}
}
/// Process a gossip-verified partial data column by merging it in the assembler
async fn process_gossip_verified_partial_data_column(
self: &Arc<Self>,
_peer_id: PeerId,
verified_partial: KzgVerifiedPartialDataColumn<T::EthSpec>,
verified_header: GossipVerifiedPartialDataColumnHeader<T::EthSpec>,
slot: Slot,
) {
let processing_start_time = Instant::now();
let block_root = verified_partial.block_root();
let data_column_index = verified_partial.index();
let result = self
.chain
.process_gossip_partial_data_column(verified_partial, verified_header.clone(), slot)
.await;
// First, handle merge results (if any)
let result = match result {
Ok(Some((avail, merge_result))) => {
if !merge_result.full_columns.is_empty() {
debug!(
%block_root,
index = data_column_index,
"Partial data column completed to full column"
);
self.send_network_message(NetworkMessage::Publish {
messages: merge_result
.full_columns
.into_iter()
.map(|col| {
let subnet = DataColumnSubnetId::from_column_index(
col.index(),
&self.chain.spec,
);
PubsubMessage::DataColumnSidecar(Box::new((
subnet,
col.into_inner(),
)))
})
.collect(),
});
}
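// If the blobs were fetched locally (or getBlobs is disabled), hold back
// incomplete partials and publish only fully completed columns.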
let only_send_completed_partials =
merge_result.local_blobs || self.chain.config.disable_get_blobs;
let columns = merge_result
.updated_partials
.into_iter()
.map(|partial| partial.into_inner())
.filter(|partial| {
!only_send_completed_partials || partial.sidecar.is_complete()
})
.collect::<Vec<_>>();
if !columns.is_empty() {
if only_send_completed_partials {
debug!(
block = %block_root,
"Not publishing incomplete partials before getBlobs"
);
}
self.send_network_message(NetworkMessage::PublishPartialColumns {
columns,
header: verified_header.into_header(),
});
}
Ok(avail)
}
Ok(None) => {
// Column was not merged because it is not a custody column.
return;
}
Err(err) => Err(err),
};
register_process_result_metrics(
&result,
metrics::BlockSource::Gossip,
"partial_data_column",
);
match &result {
Ok(availability) => match availability {
AvailabilityProcessingStatus::Imported(block_root) => {
debug!(
%block_root,
"Data column from partial processed, imported fully available block"
);
self.chain.recompute_head_at_current_slot().await;
metrics::set_gauge(
&metrics::BEACON_BLOB_DELAY_FULL_VERIFICATION,
processing_start_time.elapsed().as_millis() as i64,
);
}
AvailabilityProcessingStatus::MissingComponents(slot, block_root) => {
trace!(
%slot,
%data_column_index,
%block_root,
"Processed data column from partial, waiting for other components"
);
self.check_reconstruction_trigger(*slot, block_root).await;
}
},
Err(BlockError::DuplicateFullyImported(_)) => {
debug!(
?block_root,
data_column_index, "Ignoring completed gossip column already imported"
);
}
Err(err) => {
debug!(
outcome = ?err,
?block_root,
block_slot = %slot,
data_column_index,
"Invalid completed gossip data column"
);
// We can't really penalize here, as the error might be the fault of another peer
// contributing to the partial.
}
}
// If a block is in the da_checker, sync may be awaiting an event for when the block is
// finally imported. A block can become imported after processing either a block or a data
// column. If processing results in `Imported`, notify. Do not notify of data column errors.
if matches!(result, Ok(AvailabilityProcessingStatus::Imported(_))) {
self.send_sync_message(SyncMessage::GossipBlockProcessResult {
block_root,
imported: true,
});
}
}
async fn check_reconstruction_trigger(self: &Arc<Self>, slot: Slot, block_root: &Hash256) {
if self
.chain
.data_availability_checker
.custody_context()
.should_attempt_reconstruction(
slot.epoch(T::EthSpec::slots_per_epoch()),
&self.chain.spec,
)
{
// Instead of triggering reconstruction immediately, schedule it to be run. If
// another column arrives, it either completes availability or pushes
// reconstruction back a bit.
let cloned_self = Arc::clone(self);
let block_root = *block_root;
if self
.beacon_processor_send
.try_send(WorkEvent {
drop_during_sync: false,
work: Work::Reprocess(ReprocessQueueMessage::DelayColumnReconstruction(
QueuedColumnReconstruction {
block_root,
slot,
process_fn: Box::pin(async move {
cloned_self
.attempt_data_column_reconstruction(block_root)
.await;
}),
},
)),
})
.is_err()
{
warn!("Unable to send reconstruction to reprocessing");
}
}
}
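// The scheduling above behaves like a debounce: each arriving column either
// completes availability or pushes reconstruction back a little. A minimal
// sketch of the idea (illustrative only; the real path goes through the
// beacon processor's reprocessing queue):
//
//   loop {
//       tokio::select! {
//           _ = sleep(reconstruction_delay) => {
//               attempt_data_column_reconstruction(block_root).await;
//               break;
//           }
//           Some(_) = new_column_rx.recv() => { /* restart the delay */ }
//       }
//   }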
/// Process the beacon block received from the gossip network and:
///
/// - If it passes gossip propagation criteria, tell the network thread to forward it.
@@ -1512,23 +1954,21 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
// Block is gossip valid. Attempt to fetch blobs from the EL using versioned hashes derived
// from kzg commitments, without having to wait for all blobs to be sent from the peers.
// TODO(gloas) we'll want to use this same optimization, but we need to refactor the
// `fetch_and_process_engine_blobs` flow to support gloas.
if !block.fork_name_unchecked().gloas_enabled() {
let publish_blobs = true;
let self_clone = self.clone();
let block_clone = block.clone();
let current_span = Span::current();
self.executor.spawn(
async move {
let publish_blobs = true;
let self_clone = self.clone();
let block_clone = block.clone();
let current_span = Span::current();
self.executor.spawn(
async move {
if let Ok(header) = PartialDataColumnHeader::try_from(block_clone.as_ref()) {
self_clone
.fetch_engine_blobs_and_publish(block_clone, block_root, publish_blobs)
.fetch_engine_blobs_and_publish(Arc::new(header), block_root, publish_blobs)
.await
}
.instrument(current_span),
"fetch_blobs_gossip",
);
}
}
.instrument(current_span),
"fetch_blobs_gossip",
);
let result = self
.chain

View File

@@ -20,7 +20,7 @@ use lighthouse_network::rpc::methods::{
};
use lighthouse_network::service::api_types::CustodyBackfillBatchId;
use lighthouse_network::{
Client, MessageId, NetworkGlobals, PeerId, PubsubMessage,
Client, GossipTopic, MessageId, NetworkGlobals, PeerId, PubsubMessage,
rpc::{BlocksByRangeRequest, BlocksByRootRequest, LightClientBootstrapRequest, StatusMessage},
};
use rand::prelude::SliceRandom;
@@ -251,6 +251,32 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
})
}
/// Create a new `Work` event for some partial data column sidecar.
pub fn send_gossip_partial_data_column_sidecar(
self: &Arc<Self>,
peer_id: PeerId,
column_sidecar: Box<PartialDataColumn<T::EthSpec>>,
seen_timestamp: Duration,
topic: GossipTopic,
) -> Result<(), Error<T::EthSpec>> {
let processor = self.clone();
let process_fn = async move {
processor
.process_gossip_partial_data_column_sidecar(
peer_id,
column_sidecar,
seen_timestamp,
topic,
)
.await
};
self.try_send(BeaconWorkEvent {
drop_during_sync: false,
work: Work::GossipPartialDataColumnSidecar(Box::pin(process_fn)),
})
}
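// The work-event pattern above boxes and pins the processing future so the
// beacon processor can queue it and await it later. A rough sketch of the
// shape involved (field and payload types assumed for illustration):
//
//   type AsyncWork = Pin<Box<dyn Future<Output = ()> + Send>>;
//   struct WorkEvent { drop_during_sync: bool, work: Work }
//   enum Work { GossipPartialDataColumnSidecar(AsyncWork), /* ... */ }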
/// Create a new `Work` event for some sync committee signature.
pub fn send_gossip_sync_signature(
self: &Arc<Self>,
@@ -910,14 +936,14 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
pub async fn fetch_engine_blobs_and_publish(
self: &Arc<Self>,
block: Arc<SignedBeaconBlock<T::EthSpec, FullPayload<T::EthSpec>>>,
header: Arc<PartialDataColumnHeader<T::EthSpec>>,
block_root: Hash256,
publish_blobs: bool,
) {
if self.chain.config.disable_get_blobs {
return;
}
let epoch = block.slot().epoch(T::EthSpec::slots_per_epoch());
let epoch = header.slot().epoch(T::EthSpec::slots_per_epoch());
let custody_columns = self.chain.sampling_columns_for_epoch(epoch);
let self_cloned = self.clone();
let publish_fn = move |blobs_or_data_column| {
@@ -942,7 +968,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
match fetch_and_process_engine_blobs(
self.chain.clone(),
block_root,
block.clone(),
header.clone(),
custody_columns,
publish_fn,
)
@@ -986,6 +1012,23 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
);
}
}
// Publish partial columns without eager send
if let Some(assembler) = self.chain.data_availability_checker.partial_assembler() {
let columns = assembler.get_partials_and_mark_as_local_fetched(block_root, &header);
if !columns.is_empty() {
debug!(block = %block_root, "Publishing all partials after getBlobs");
self.send_network_message(NetworkMessage::PublishPartialColumns {
columns: columns
.into_iter()
.map(|partial| partial.into_inner())
.collect(),
header,
});
} else {
debug!(block = %block_root, "No partials to publish after getBlobs");
}
}
}
/// Attempts to reconstruct all data columns if the conditions checked in

View File

@@ -291,9 +291,15 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
// Block is valid, we can now attempt fetching blobs from EL using version hashes
// derived from kzg commitments from the block, without having to wait for all blobs
// to be sent from the peers if we already have them.
let publish_blobs = false;
self.fetch_engine_blobs_and_publish(signed_beacon_block, block_root, publish_blobs)
if let Ok(header) = signed_beacon_block.as_ref().try_into() {
let publish_blobs = false;
self.fetch_engine_blobs_and_publish(
Arc::new(header),
block_root,
publish_blobs,
)
.await;
}
}
_ => {}
}

View File

@@ -14,7 +14,7 @@ use beacon_processor::{BeaconProcessorSend, DuplicateCache};
use futures::prelude::*;
use lighthouse_network::rpc::*;
use lighthouse_network::{
MessageId, NetworkGlobals, PeerId, PubsubMessage, Response,
GossipTopic, MessageId, NetworkGlobals, PeerId, PubsubMessage, Response,
service::api_types::{AppRequestId, SyncRequestId},
};
use logging::TimeLatch;
@@ -25,7 +25,7 @@ use tokio::sync::mpsc;
use tokio_stream::wrappers::UnboundedReceiverStream;
use tracing::{debug, error, trace, warn};
use types::{
BlobSidecar, DataColumnSidecar, EthSpec, ForkContext, SignedBeaconBlock,
BlobSidecar, DataColumnSidecar, EthSpec, ForkContext, PartialDataColumn, SignedBeaconBlock,
SignedExecutionPayloadEnvelope,
};
@@ -72,6 +72,8 @@ pub enum RouterMessage<E: EthSpec> {
/// message, the message itself and a bool which indicates if the message should be processed
/// by the beacon chain after successful verification.
PubsubMessage(MessageId, PeerId, PubsubMessage<E>, bool),
/// A partial data column sidecar has been received via the partial gossipsub protocol.
PartialDataColumnSidecar(PeerId, Box<PartialDataColumn<E>>, GossipTopic),
/// The peer manager has requested we re-status a peer.
StatusPeer(PeerId),
/// The peer has an updated custody group count from METADATA.
@@ -183,6 +185,16 @@ impl<T: BeaconChainTypes> Router<T> {
RouterMessage::PubsubMessage(id, peer_id, gossip, should_process) => {
self.handle_gossip(id, peer_id, gossip, should_process);
}
RouterMessage::PartialDataColumnSidecar(peer_id, column, topic) => self
.handle_beacon_processor_send_result(
self.network_beacon_processor
.send_gossip_partial_data_column_sidecar(
peer_id,
column,
self.chain.slot_clock.now_duration().unwrap_or_default(),
topic,
),
),
}
}

View File

@@ -39,8 +39,8 @@ use tokio::time::Sleep;
use tracing::{debug, error, info, trace, warn};
use typenum::Unsigned;
use types::{
EthSpec, ForkContext, Slot, SubnetId, SyncCommitteeSubscription, SyncSubnetId,
ValidatorSubscription,
EthSpec, ForkContext, PartialDataColumn, PartialDataColumnHeader, Slot, SubnetId,
SyncCommitteeSubscription, SyncSubnetId, ValidatorSubscription,
};
mod tests;
@@ -83,6 +83,11 @@ pub enum NetworkMessage<E: EthSpec> {
},
/// Publish a list of messages to the gossipsub protocol.
Publish { messages: Vec<PubsubMessage<E>> },
/// Publish partial data column sidecars via the partial gossipsub protocol.
PublishPartialColumns {
columns: Vec<Arc<PartialDataColumn<E>>>,
header: Arc<PartialDataColumnHeader<E>>,
},
/// Validates a received gossipsub message. This will propagate the message on the network.
ValidationResult {
/// The peer that sent us the message. We don't send back to this peer.
@@ -92,6 +97,13 @@ pub enum NetworkMessage<E: EthSpec> {
/// The result of the validation
validation_result: MessageAcceptance,
},
/// Reports validation failure of a partial message.
PartialValidationFailure {
/// The peer that sent us the message.
propagation_source: PeerId,
/// The topic of the message.
gossip_topic: GossipTopic,
},
/// Reports a peer to the peer manager for performing an action.
ReportPeer {
peer_id: PeerId,
@@ -540,7 +552,7 @@ impl<T: BeaconChainTypes> NetworkService<T> {
let subnet_id = subnet_and_attestation.0;
let attestation = &subnet_and_attestation.1;
// checks if we have an aggregator for the slot. If so, we should process
// the attestation, else we just just propagate the Attestation.
// the attestation, else we just propagate the Attestation.
let should_process = self.subnet_service.should_process_attestation(
Subnet::Attestation(subnet_id),
&attestation.data,
@@ -560,6 +572,15 @@ impl<T: BeaconChainTypes> NetworkService<T> {
}
}
}
NetworkEvent::PartialDataColumnSidecar {
source,
column,
topic,
} => {
self.send_to_router(RouterMessage::PartialDataColumnSidecar(
source, column, topic,
));
}
NetworkEvent::NewListenAddr(multiaddr) => {
self.network_globals
.listen_multiaddrs
@@ -640,11 +661,19 @@ impl<T: BeaconChainTypes> NetworkService<T> {
validation_result,
);
}
NetworkMessage::PartialValidationFailure {
propagation_source,
gossip_topic,
} => {
self.libp2p
.report_partial_message_validation_failure(propagation_source, gossip_topic);
}
NetworkMessage::Publish { messages } => {
let mut topic_kinds = Vec::new();
for message in &messages {
if !topic_kinds.contains(&message.kind()) {
topic_kinds.push(message.kind());
let kind = message.kind();
if !topic_kinds.contains(&kind) {
topic_kinds.push(kind);
}
}
debug!(
@@ -654,6 +683,9 @@ impl<T: BeaconChainTypes> NetworkService<T> {
);
self.libp2p.publish(messages);
}
NetworkMessage::PublishPartialColumns { columns, header } => {
self.libp2p.publish_partial(columns, header);
}
NetworkMessage::ReportPeer {
peer_id,
action,

View File

@@ -49,7 +49,7 @@ use std::sync::Arc;
use std::time::Duration;
use store::Hash256;
use tracing::{debug, error, warn};
use types::{BlobSidecar, DataColumnSidecar, EthSpec, SignedBeaconBlock};
use types::{EthSpec, SignedBeaconBlock};
pub mod common;
pub mod parent_chain;
@@ -81,22 +81,21 @@ const LOOKUP_MAX_DURATION_NO_PEERS_SECS: u64 = 10;
/// take at most 2 GB. 200 lookups allow 3 parallel chains of depth 64 (current maximum).
const MAX_LOOKUPS: usize = 200;
/// For the `Blob`, `DataColumn` and `PartialDataColumn` variants, the value is the parent root
/// of the component.
pub enum BlockComponent<E: EthSpec> {
Block(DownloadResult<Arc<SignedBeaconBlock<E>>>),
Blob(DownloadResult<Arc<BlobSidecar<E>>>),
DataColumn(DownloadResult<Arc<DataColumnSidecar<E>>>),
Blob(DownloadResult<Hash256>),
DataColumn(DownloadResult<Hash256>),
PartialDataColumn(DownloadResult<Hash256>),
}
impl<E: EthSpec> BlockComponent<E> {
fn parent_root(&self) -> Hash256 {
match self {
BlockComponent::Block(block) => block.value.parent_root(),
BlockComponent::Blob(blob) => blob.value.block_parent_root(),
BlockComponent::DataColumn(column) => match column.value.as_ref() {
DataColumnSidecar::Fulu(column) => column.block_parent_root(),
// TODO(gloas) we don't have a parent root post gloas, not sure what to do here
DataColumnSidecar::Gloas(column) => column.beacon_block_root,
},
BlockComponent::Blob(parent_root)
| BlockComponent::DataColumn(parent_root)
| BlockComponent::PartialDataColumn(parent_root) => parent_root.value,
}
}
fn get_type(&self) -> &'static str {
@@ -104,6 +103,7 @@ impl<E: EthSpec> BlockComponent<E> {
BlockComponent::Block(_) => "block",
BlockComponent::Blob(_) => "blob",
BlockComponent::DataColumn(_) => "data_column",
BlockComponent::PartialDataColumn(_) => "partial_data_column",
}
}
}
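// For reference, a minimal sketch of the wrapper used by these variants;
// field names mirror the `DownloadResult` construction sites later in this
// commit, while `Hash256`, `Duration` and `PeerGroup` stand in for the real
// types (assumed shape, illustration only):
//
//   struct DownloadResult<T> {
//       value: T, // for Blob/DataColumn/PartialDataColumn: the parent root
//       block_root: Hash256,
//       seen_timestamp: Duration, // duration since the unix epoch
//       peer_group: PeerGroup,
//   }
//
// Storing just the parent root keeps lookup state from holding whole sidecars
// alive when only the parent linkage is needed.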

View File

@@ -209,7 +209,9 @@ impl<T: BeaconChainTypes> SingleBlockLookup<T> {
.block_request_state
.state
.insert_verified_response(block),
BlockComponent::Blob(_) | BlockComponent::DataColumn(_) => {
BlockComponent::Blob(_)
| BlockComponent::DataColumn(_)
| BlockComponent::PartialDataColumn(_) => {
// For now ignore single blobs and columns, as the blob request state assumes all blobs are
// attributed to the same peer, i.e. the peer serving the remaining blobs. Ignoring this
// block component has a minor effect, causing the node to re-request this blob

View File

@@ -154,6 +154,14 @@ pub enum SyncMessage<E: EthSpec> {
/// A block's parent is known but its execution payload envelope has not been received yet.
UnknownParentEnvelope(PeerId, Arc<SignedBeaconBlock<E>>, Hash256),
/// A partial data column with an unknown parent has been received.
UnknownParentPartialDataColumn {
peer_id: PeerId,
block_root: Hash256,
parent_root: Hash256,
slot: Slot,
},
/// A peer has sent an attestation that references a block that is unknown. This triggers the
/// manager to attempt to find the block matching the unknown hash.
UnknownBlockHashFromAttestation(PeerId, Hash256),
@@ -895,7 +903,7 @@ impl<T: BeaconChainTypes> SyncManager<T> {
parent_root,
blob_slot,
BlockComponent::Blob(DownloadResult {
value: blob,
value: parent_root,
block_root,
seen_timestamp: self.chain.slot_clock.now_duration().unwrap_or_default(),
peer_group: PeerGroup::from_single(peer_id),
@@ -915,7 +923,7 @@ impl<T: BeaconChainTypes> SyncManager<T> {
parent_root,
data_column_slot,
BlockComponent::DataColumn(DownloadResult {
value: data_column,
value: parent_root,
block_root,
seen_timestamp: self
.chain
@@ -953,6 +961,26 @@ impl<T: BeaconChainTypes> SyncManager<T> {
}),
);
}
SyncMessage::UnknownParentPartialDataColumn {
peer_id,
block_root,
parent_root,
slot,
} => {
debug!(%block_root, %parent_root, "Received unknown parent partial column message");
self.handle_unknown_parent(
peer_id,
block_root,
parent_root,
slot,
BlockComponent::PartialDataColumn(DownloadResult {
value: parent_root,
block_root,
seen_timestamp: self.chain.slot_clock.now_duration().unwrap_or_default(),
peer_group: PeerGroup::from_single(peer_id),
}),
);
}
SyncMessage::UnknownBlockHashFromAttestation(peer_id, block_root) => {
if !self.notified_unknown_roots.contains(&(peer_id, block_root)) {
self.notified_unknown_roots.insert((peer_id, block_root));