Add DataColumnSidecar gossip topic and message handling (#6147)

* Add `DataColumnSidecar` gossip topic and verification (#5050 and #5783).

* Remove gossip verification changes (#5783).

* Merge branch 'unstable' into data-column-gossip

# Conflicts:
#	beacon_node/beacon_chain/src/data_column_verification.rs
#	beacon_node/beacon_chain/src/lib.rs

* Add gossip cache timeout for data columns. Rename data column metrics for consistency.

* Remove usage of `unimplemented!` and address review comments.

* Remove unused `GossipDataColumnError` variants and address review comments.

* Merge branch 'unstable' into data-column-gossip

* Update Cargo.lock

* Arc `ChainSpec` in discovery to avoid performance regression when needing to clone it repeatedly.
This commit is contained in:
Jimmy Chen
2024-07-25 16:05:18 +10:00
committed by GitHub
parent a2ab26c327
commit 4e5a363a4f
26 changed files with 907 additions and 31 deletions

View File

@@ -72,6 +72,10 @@ lazy_static! {
"beacon_processor_gossip_blob_verified_total",
"Total number of gossip blob verified for propagation."
);
// Gossip data column sidecars.
// Counter incremented by `process_gossip_data_column_sidecar` each time a data column sidecar
// received over gossip passes `verify_data_column_sidecar_for_gossip`.
pub static ref BEACON_PROCESSOR_GOSSIP_DATA_COLUMN_SIDECAR_VERIFIED_TOTAL: Result<IntCounter> = try_create_int_counter(
"beacon_processor_gossip_data_column_verified_total",
"Total number of gossip data column sidecar verified for propagation."
);
// Gossip Exits.
pub static ref BEACON_PROCESSOR_EXIT_VERIFIED_TOTAL: Result<IntCounter> = try_create_int_counter(
"beacon_processor_exit_verified_total",
@@ -357,6 +361,22 @@ lazy_static! {
"Count of times when a gossip blob arrived from the network later than the attestation deadline.",
);
// Gauge set by `process_gossip_data_column_sidecar` as soon as a data column sidecar is picked
// up for processing. The value is the result of `get_slot_delay_ms(seen_duration, slot, ..)`
// converted to milliseconds.
pub static ref BEACON_DATA_COLUMN_DELAY_GOSSIP: Result<IntGauge> = try_create_int_gauge(
"beacon_data_column_delay_gossip_last_delay",
"The first time we see this data column as a delay from the start of the slot"
);
// Gauge set by `process_gossip_data_column_sidecar` right after a data column sidecar has been
// accepted for propagation.
// NOTE(review): the value written there is `now - seen_duration` in milliseconds, i.e. the time
// elapsed since the message was first seen, which does not obviously match the "from the start
// of the slot" wording of the help text below. Confirm which of the two is intended.
pub static ref BEACON_DATA_COLUMN_DELAY_GOSSIP_VERIFICATION: Result<IntGauge> = try_create_int_gauge(
"beacon_data_column_delay_gossip_verification",
"Keeps track of the time delay from the start of the slot to the point we propagate the data column"
);
// Gauge intended (per its help text) to hold how long it takes to verify a data column.
// Presumably in milliseconds like the sibling gauges above; confirm against the call site.
pub static ref BEACON_DATA_COLUMN_DELAY_FULL_VERIFICATION: Result<IntGauge> = try_create_int_gauge(
"beacon_data_column_last_full_verification_delay",
"The time it takes to verify a beacon data column"
);
/*
* Light client update reprocessing queue metrics.
*/

View File

@@ -6,6 +6,7 @@ use crate::{
};
use beacon_chain::blob_verification::{GossipBlobError, GossipVerifiedBlob};
use beacon_chain::block_verification_types::AsBlock;
use beacon_chain::data_column_verification::GossipVerifiedDataColumn;
use beacon_chain::store::Error;
use beacon_chain::{
attestation_verification::{self, Error as AttnError, VerifiedAttestation},
@@ -32,8 +33,9 @@ use store::hot_cold_store::HotColdDBError;
use tokio::sync::mpsc;
use types::{
beacon_block::BlockImportSource, Attestation, AttestationRef, AttesterSlashing, BlobSidecar,
EthSpec, Hash256, IndexedAttestation, LightClientFinalityUpdate, LightClientOptimisticUpdate,
ProposerSlashing, SignedAggregateAndProof, SignedBeaconBlock, SignedBlsToExecutionChange,
DataColumnSidecar, DataColumnSubnetId, EthSpec, Hash256, IndexedAttestation,
LightClientFinalityUpdate, LightClientOptimisticUpdate, ProposerSlashing,
SignedAggregateAndProof, SignedBeaconBlock, SignedBlsToExecutionChange,
SignedContributionAndProof, SignedVoluntaryExit, Slot, SubnetId, SyncCommitteeMessage,
SyncSubnetId,
};
@@ -599,6 +601,67 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
}
}
/// Gossip-verify a `DataColumnSidecar` received from the network and, when it is valid,
/// accept it for propagation and hand it over for processing.
///
/// Steps performed:
/// - records the slot delay returned by `get_slot_delay_ms` in `BEACON_DATA_COLUMN_DELAY_GOSSIP`;
/// - runs `verify_data_column_sidecar_for_gossip` on the sidecar for the given subnet;
/// - on success: bumps the verified counter, reports `MessageAcceptance::Accept` for
///   `message_id`, records the time elapsed since `seen_duration`, then awaits
///   `process_gossip_verified_data_column`;
/// - on failure: logs the error and reports `MessageAcceptance::Ignore`, so the message is
///   neither forwarded nor left without a validation result.
///
/// `_peer_client` is currently unused.
pub async fn process_gossip_data_column_sidecar(
    self: &Arc<Self>,
    message_id: MessageId,
    peer_id: PeerId,
    _peer_client: Client,
    subnet_id: DataColumnSubnetId,
    column_sidecar: Arc<DataColumnSidecar<T::EthSpec>>,
    seen_duration: Duration,
) {
    // Capture the identifying fields up front: `column_sidecar` is moved into verification.
    let slot = column_sidecar.slot();
    let block_root = column_sidecar.block_root();
    let index = column_sidecar.index;
    let delay = get_slot_delay_ms(seen_duration, slot, &self.chain.slot_clock);
    // Log metrics to track delay from other nodes on the network.
    metrics::set_gauge(
        &metrics::BEACON_DATA_COLUMN_DELAY_GOSSIP,
        delay.as_millis() as i64,
    );
    match self
        .chain
        .verify_data_column_sidecar_for_gossip(column_sidecar, *subnet_id)
    {
        Ok(gossip_verified_data_column) => {
            metrics::inc_counter(
                &metrics::BEACON_PROCESSOR_GOSSIP_DATA_COLUMN_SIDECAR_VERIFIED_TOTAL,
            );

            debug!(
                self.log,
                "Successfully verified gossip data column sidecar";
                "slot" => %slot,
                "block_root" => %block_root,
                "index" => %index,
            );

            self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Accept);

            // Log metrics to keep track of propagation delay times.
            if let Some(duration) = SystemTime::now()
                .duration_since(UNIX_EPOCH)
                .ok()
                .and_then(|now| now.checked_sub(seen_duration))
            {
                metrics::set_gauge(
                    &metrics::BEACON_DATA_COLUMN_DELAY_GOSSIP_VERIFICATION,
                    duration.as_millis() as i64,
                );
            }

            self.process_gossip_verified_data_column(
                peer_id,
                gossip_verified_data_column,
                seen_duration,
            )
            .await
        }
        Err(err) => {
            // TODO(das) implement per-variant gossip error handling (reject / penalize /
            // trigger parent lookup). Until then, do not swallow the failure silently:
            // log it and report `Ignore`, the most conservative outcome (the message is not
            // forwarded and the peer is not penalized).
            debug!(
                self.log,
                "Could not verify data column sidecar for gossip. Ignoring the data column sidecar";
                "error" => ?err,
                "slot" => %slot,
                "block_root" => %block_root,
                "index" => %index,
            );
            self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Ignore);
        }
    }
}
#[allow(clippy::too_many_arguments)]
pub async fn process_gossip_blob(
self: &Arc<Self>,
@@ -837,6 +900,81 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
}
}
/// Process a data column that has already passed gossip verification.
///
/// The column is passed to `process_gossip_data_columns` on the chain and the outcome is
/// handled as follows:
/// - `Imported`: the block-imported counter is incremented, the head is recomputed and the
///   time elapsed since entering this function is recorded in
///   `BEACON_DATA_COLUMN_DELAY_FULL_VERIFICATION`;
/// - `MissingComponents`: only a trace log is emitted;
/// - `BlockError::BlockIsAlreadyKnown`: the column is ignored with a debug log;
/// - any other error: the error is logged and `peer_id` is penalized with
///   `PeerAction::MidToleranceError`.
pub async fn process_gossip_verified_data_column(
    self: &Arc<Self>,
    peer_id: PeerId,
    verified_data_column: GossipVerifiedDataColumn<T>,
    // This value is not used presently, but it might come in handy for debugging.
    _seen_duration: Duration,
) {
    let processing_start_time = Instant::now();
    // Read everything needed for logging before the column is moved into processing.
    let block_root = verified_data_column.block_root();
    let data_column_slot = verified_data_column.slot();
    let data_column_index = verified_data_column.id().index;

    match self
        .chain
        .process_gossip_data_columns(vec![verified_data_column])
        .await
    {
        Ok(AvailabilityProcessingStatus::Imported(block_root)) => {
            // Note: Reusing block imported metric here
            metrics::inc_counter(&metrics::BEACON_PROCESSOR_GOSSIP_BLOCK_IMPORTED_TOTAL);
            info!(
                self.log,
                "Gossipsub data column processed, imported fully available block";
                "block_root" => %block_root
            );
            self.chain.recompute_head_at_current_slot().await;

            // Record into the data column gauge, not the blob one, so the two sidecar kinds
            // do not overwrite each other's measurements.
            metrics::set_gauge(
                &metrics::BEACON_DATA_COLUMN_DELAY_FULL_VERIFICATION,
                processing_start_time.elapsed().as_millis() as i64,
            );
        }
        Ok(AvailabilityProcessingStatus::MissingComponents(slot, block_root)) => {
            trace!(
                self.log,
                "Processed data column, waiting for other components";
                "slot" => %slot,
                "data_column_index" => %data_column_index,
                "block_root" => %block_root,
            );
            // Potentially trigger reconstruction
        }
        Err(BlockError::BlockIsAlreadyKnown(_)) => {
            debug!(
                self.log,
                "Ignoring gossip column already imported";
                "block_root" => ?block_root,
                "data_column_index" => data_column_index,
            );
        }
        Err(err) => {
            debug!(
                self.log,
                "Invalid gossip data column";
                "outcome" => ?err,
                "block root" => ?block_root,
                "block slot" => data_column_slot,
                "data column index" => data_column_index,
            );
            self.gossip_penalize_peer(
                peer_id,
                PeerAction::MidToleranceError,
                "bad_gossip_data_column_ssz",
            );
        }
    }
}
/// Process the beacon block received from the gossip network and:
///
/// - If it passes gossip propagation criteria, tell the network thread to forward it.
@@ -1086,6 +1224,12 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
);
return None;
}
Err(e @ BlockError::InternalError(_)) => {
error!(self.log, "Internal block gossip validation error";
"error" => %e
);
return None;
}
};
metrics::inc_counter(&metrics::BEACON_PROCESSOR_GOSSIP_BLOCK_VERIFIED_TOTAL);

View File

@@ -223,6 +223,36 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
})
}
/// Queue a gossip data column sidecar for processing.
///
/// Wraps a call to `process_gossip_data_column_sidecar` in a pinned, boxed future and submits
/// it through `try_send` as a `Work::GossipDataColumnSidecar` event with `drop_during_sync`
/// set to `false`. Returns whatever `try_send` returns.
pub fn send_gossip_data_column_sidecar(
    self: &Arc<Self>,
    message_id: MessageId,
    peer_id: PeerId,
    peer_client: Client,
    subnet_id: DataColumnSubnetId,
    column_sidecar: Arc<DataColumnSidecar<T::EthSpec>>,
    seen_timestamp: Duration,
) -> Result<(), Error<T::EthSpec>> {
    // The future must own its own handle to the processor.
    let this = Arc::clone(self);
    let work = Work::GossipDataColumnSidecar(Box::pin(async move {
        this.process_gossip_data_column_sidecar(
            message_id,
            peer_id,
            peer_client,
            subnet_id,
            column_sidecar,
            seen_timestamp,
        )
        .await
    }));

    self.try_send(BeaconWorkEvent {
        drop_during_sync: false,
        work,
    })
}
/// Create a new `Work` event for some sync committee signature.
pub fn send_gossip_sync_signature(
self: &Arc<Self>,

View File

@@ -319,6 +319,20 @@ impl<T: BeaconChainTypes> Router<T> {
),
)
}
// A data column sidecar arrived over gossip: queue it on the beacon processor.
PubsubMessage::DataColumnSidecar(data) => {
// The payload is dereferenced and split into the subnet id and the sidecar itself.
let (subnet_id, column_sidecar) = *data;
self.handle_beacon_processor_send_result(
self.network_beacon_processor
.send_gossip_data_column_sidecar(
message_id,
peer_id,
self.network_globals.client(&peer_id),
subnet_id,
column_sidecar,
// Passed on as the sidecar's `seen_timestamp`.
timestamp_now(),
),
)
}
PubsubMessage::VoluntaryExit(exit) => {
debug!(self.log, "Received a voluntary exit"; "peer_id" => %peer_id);
self.handle_beacon_processor_send_result(

View File

@@ -63,7 +63,7 @@ use std::ops::Sub;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::mpsc;
use types::{BlobSidecar, EthSpec, Hash256, SignedBeaconBlock, Slot};
use types::{BlobSidecar, DataColumnSidecar, EthSpec, Hash256, SignedBeaconBlock, Slot};
/// The number of slots ahead of us that is allowed before requesting a long-range (batch) Sync
/// from a peer. If a peer is within this tolerance (forwards or backwards), it is treated as a
@@ -107,6 +107,9 @@ pub enum SyncMessage<E: EthSpec> {
/// A blob with an unknown parent has been received.
UnknownParentBlob(PeerId, Arc<BlobSidecar<E>>),
/// A data column with an unknown parent has been received.
UnknownParentDataColumn(PeerId, Arc<DataColumnSidecar<E>>),
/// A peer has sent an attestation that references a block that is unknown. This triggers the
/// manager to attempt to find the block matching the unknown hash.
UnknownBlockHashFromAttestation(PeerId, Hash256),
@@ -646,6 +649,9 @@ impl<T: BeaconChainTypes> SyncManager<T> {
}),
);
}
SyncMessage::UnknownParentDataColumn(_peer_id, _data_column) => {
// TODO(das): data column parent lookup to be implemented
}
SyncMessage::UnknownBlockHashFromAttestation(peer_id, block_root) => {
if !self.notified_unknown_roots.contains(&(peer_id, block_root)) {
self.notified_unknown_roots.insert((peer_id, block_root));