From 4e5a363a4f1e0141b6f4c478807f3575b674e904 Mon Sep 17 00:00:00 2001 From: Jimmy Chen Date: Thu, 25 Jul 2024 16:05:18 +1000 Subject: [PATCH] Add `DataColumnSidecar` gossip topic and message handling (#6147) * Add `DataColumnSidecar` gossip topic and verification (#5050 and #5783). * Remove gossip verification changes (#5783). * Merge branch 'unstable' into data-column-gossip # Conflicts: # beacon_node/beacon_chain/src/data_column_verification.rs # beacon_node/beacon_chain/src/lib.rs * Add gossip cache timeout for data columns. Rename data column metrics for consistency. * Remove usage of `unimplemented!` and address review comments. * Remove unnused `GossipDataColumnError` variants and address review comments. * Merge branch 'unstable' into data-column-gossip * Update Cargo.lock * Arc `ChainSpec` in discovery to avoid performance regression when needing to clone it repeatedly. --- Cargo.lock | 1 + beacon_node/beacon_chain/src/beacon_chain.rs | 87 ++++++++ .../beacon_chain/src/block_verification.rs | 39 ++++ .../src/data_availability_checker.rs | 9 + .../src/data_column_verification.rs | 185 +++++++++++++++- beacon_node/beacon_chain/src/lib.rs | 2 +- beacon_node/beacon_chain/src/metrics.rs | 12 ++ beacon_node/beacon_processor/src/lib.rs | 25 ++- beacon_node/beacon_processor/src/metrics.rs | 5 + beacon_node/lighthouse_network/Cargo.toml | 1 + .../lighthouse_network/src/discovery/mod.rs | 21 +- .../src/discovery/subnet_predicate.rs | 21 +- .../src/peer_manager/mod.rs | 4 + .../src/peer_manager/peerdb/peer_info.rs | 9 + .../src/service/gossip_cache.rs | 7 + .../lighthouse_network/src/service/mod.rs | 10 +- .../lighthouse_network/src/types/pubsub.rs | 56 ++++- .../lighthouse_network/src/types/subnet.rs | 4 +- .../lighthouse_network/src/types/topics.rs | 17 +- beacon_node/network/src/metrics.rs | 20 ++ .../gossip_methods.rs | 148 ++++++++++++- .../src/network_beacon_processor/mod.rs | 30 +++ beacon_node/network/src/router.rs | 14 ++ 
beacon_node/network/src/sync/manager.rs | 8 +- consensus/types/src/data_column_subnet_id.rs | 198 ++++++++++++++++++ consensus/types/src/lib.rs | 5 + 26 files changed, 907 insertions(+), 31 deletions(-) create mode 100644 consensus/types/src/data_column_subnet_id.rs diff --git a/Cargo.lock b/Cargo.lock index f00d109db2..605cb4d2a5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5012,6 +5012,7 @@ dependencies = [ "futures", "gossipsub", "hex", + "itertools 0.10.5", "lazy_static", "libp2p", "libp2p-mplex", diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index 7ebd011cf4..1fa77a2043 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -23,6 +23,7 @@ use crate::chain_config::ChainConfig; use crate::data_availability_checker::{ Availability, AvailabilityCheckError, AvailableBlock, DataAvailabilityChecker, }; +use crate::data_column_verification::{GossipDataColumnError, GossipVerifiedDataColumn}; use crate::early_attester_cache::EarlyAttesterCache; use crate::errors::{BeaconChainError as Error, BlockProductionError}; use crate::eth1_chain::{Eth1Chain, Eth1ChainBackend}; @@ -2118,6 +2119,19 @@ impl BeaconChain { }) } + pub fn verify_data_column_sidecar_for_gossip( + self: &Arc, + data_column_sidecar: Arc>, + subnet_id: u64, + ) -> Result, GossipDataColumnError> { + metrics::inc_counter(&metrics::DATA_COLUMN_SIDECAR_PROCESSING_REQUESTS); + let _timer = metrics::start_timer(&metrics::DATA_COLUMN_SIDECAR_GOSSIP_VERIFICATION_TIMES); + GossipVerifiedDataColumn::new(data_column_sidecar, subnet_id, self).map(|v| { + metrics::inc_counter(&metrics::DATA_COLUMN_SIDECAR_PROCESSING_SUCCESSES); + v + }) + } + pub fn verify_blob_sidecar_for_gossip( self: &Arc, blob_sidecar: Arc>, @@ -2964,6 +2978,39 @@ impl BeaconChain { self.remove_notified(&block_root, r) } + /// Cache the data columns in the processing cache, process it, then evict it from the cache if it was + /// 
imported or errors. + pub async fn process_gossip_data_columns( + self: &Arc, + data_columns: Vec>, + ) -> Result> { + let Ok(block_root) = data_columns + .iter() + .map(|c| c.block_root()) + .unique() + .exactly_one() + else { + return Err(BlockError::InternalError( + "Columns should be from the same block".to_string(), + )); + }; + + // If this block has already been imported to forkchoice it must have been available, so + // we don't need to process its samples again. + if self + .canonical_head + .fork_choice_read_lock() + .contains_block(&block_root) + { + return Err(BlockError::BlockIsAlreadyKnown(block_root)); + } + + let r = self + .check_gossip_data_columns_availability_and_import(data_columns) + .await; + self.remove_notified_custody_columns(&block_root, r) + } + /// Cache the blobs in the processing cache, process it, then evict it from the cache if it was /// imported or errors. pub async fn process_rpc_blobs( @@ -3013,6 +3060,21 @@ impl BeaconChain { r } + /// Remove any block components from the *processing cache* if we no longer require them. If the + /// block was imported full or erred, we no longer require them. + fn remove_notified_custody_columns( + &self, + block_root: &Hash256, + r: Result>, + ) -> Result> { + let has_missing_components = + matches!(r, Ok(AvailabilityProcessingStatus::MissingComponents(_, _))); + if !has_missing_components { + self.reqresp_pre_import_cache.write().remove(block_root); + } + r + } + /// Wraps `process_block` in logic to cache the block's commitments in the processing cache /// and evict if the block was imported or errored. pub async fn process_block_with_early_caching>( @@ -3257,6 +3319,31 @@ impl BeaconChain { self.process_availability(slot, availability).await } + /// Checks if the provided data column can make any cached blocks available, and imports immediately + /// if so, otherwise caches the data column in the data availability checker. 
+ async fn check_gossip_data_columns_availability_and_import( + self: &Arc, + data_columns: Vec>, + ) -> Result> { + if let Some(slasher) = self.slasher.as_ref() { + for data_colum in &data_columns { + slasher.accept_block_header(data_colum.signed_block_header()); + } + } + + let Ok(slot) = data_columns.iter().map(|c| c.slot()).unique().exactly_one() else { + return Err(BlockError::InternalError( + "Columns for the same block should have matching slot".to_string(), + )); + }; + + let availability = self + .data_availability_checker + .put_gossip_data_columns(data_columns)?; + + self.process_availability(slot, availability).await + } + /// Checks if the provided blobs can make any cached blocks available, and imports immediately /// if so, otherwise caches the blob in the data availability checker. async fn check_rpc_blob_availability_and_import( diff --git a/beacon_node/beacon_chain/src/block_verification.rs b/beacon_node/beacon_chain/src/block_verification.rs index d906518ff5..5ae98cefbe 100644 --- a/beacon_node/beacon_chain/src/block_verification.rs +++ b/beacon_node/beacon_chain/src/block_verification.rs @@ -54,6 +54,7 @@ use crate::block_verification_types::{ AsBlock, BlockContentsError, BlockImportData, GossipVerifiedBlockContents, RpcBlock, }; use crate::data_availability_checker::{AvailabilityCheckError, MaybeAvailableBlock}; +use crate::data_column_verification::GossipDataColumnError; use crate::eth1_finalization_cache::Eth1FinalizationData; use crate::execution_payload::{ is_optimistic_candidate_block, validate_execution_payload_for_gossip, validate_merge_block, @@ -303,6 +304,13 @@ pub enum BlockError { /// TODO: We may need to penalize the peer that gave us a potentially invalid rpc blob. /// https://github.com/sigp/lighthouse/issues/4546 AvailabilityCheck(AvailabilityCheckError), + /// An internal error has occurred when processing the block or sidecars. 
+ /// + /// ## Peer scoring + /// + /// We were unable to process this block due to an internal error. It's unclear if the block is + /// valid. + InternalError(String), } impl From for BlockError { @@ -523,6 +531,20 @@ impl BlockSlashInfo> { } } +impl BlockSlashInfo { + pub fn from_early_error_data_column( + header: SignedBeaconBlockHeader, + e: GossipDataColumnError, + ) -> Self { + match e { + GossipDataColumnError::ProposalSignatureInvalid => BlockSlashInfo::SignatureInvalid(e), + // `InvalidSignature` could indicate any signature in the block, so we want + // to recheck the proposer signature alone. + _ => BlockSlashInfo::SignatureNotChecked(header, e), + } + } +} + /// Process invalid blocks to see if they are suitable for the slasher. /// /// If no slasher is configured, this is a no-op. @@ -2007,6 +2029,23 @@ impl BlockBlobError for GossipBlobError { } } +impl BlockBlobError for GossipDataColumnError { + fn not_later_than_parent_error(data_column_slot: Slot, parent_slot: Slot) -> Self { + GossipDataColumnError::IsNotLaterThanParent { + data_column_slot, + parent_slot, + } + } + + fn unknown_validator_error(validator_index: u64) -> Self { + GossipDataColumnError::UnknownValidator(validator_index) + } + + fn proposer_signature_invalid() -> Self { + GossipDataColumnError::ProposalSignatureInvalid + } +} + /// Performs a cheap (time-efficient) state advancement so the committees and proposer shuffling for /// `slot` can be obtained from `state`. 
/// diff --git a/beacon_node/beacon_chain/src/data_availability_checker.rs b/beacon_node/beacon_chain/src/data_availability_checker.rs index 90d4b6081e..fdba60a69a 100644 --- a/beacon_node/beacon_chain/src/data_availability_checker.rs +++ b/beacon_node/beacon_chain/src/data_availability_checker.rs @@ -20,6 +20,7 @@ mod error; mod overflow_lru_cache; mod state_lru_cache; +use crate::data_column_verification::GossipVerifiedDataColumn; pub use error::{Error as AvailabilityCheckError, ErrorCategory as AvailabilityCheckErrorCategory}; use types::non_zero_usize::new_non_zero_usize; @@ -188,6 +189,14 @@ impl DataAvailabilityChecker { ) } + pub fn put_gossip_data_columns( + &self, + _gossip_data_columns: Vec>, + ) -> Result, AvailabilityCheckError> { + // TODO(das) to be implemented + Err(AvailabilityCheckError::Unexpected) + } + /// Check if we have all the blobs for a block. Returns `Availability` which has information /// about whether all components have been received or more are required. pub fn put_pending_executed_block( diff --git a/beacon_node/beacon_chain/src/data_column_verification.rs b/beacon_node/beacon_chain/src/data_column_verification.rs index da848ccc47..2e88da8f6a 100644 --- a/beacon_node/beacon_chain/src/data_column_verification.rs +++ b/beacon_node/beacon_chain/src/data_column_verification.rs @@ -1,8 +1,144 @@ +use crate::block_verification::{process_block_slash_info, BlockSlashInfo}; +use crate::{BeaconChain, BeaconChainError, BeaconChainTypes}; use derivative::Derivative; +use kzg::{Error as KzgError, Kzg}; use ssz_derive::{Decode, Encode}; use std::sync::Arc; -use types::data_column_sidecar::{ColumnIndex, DataColumnSidecar}; -use types::EthSpec; +use types::data_column_sidecar::{ColumnIndex, DataColumnIdentifier}; +use types::{ + BeaconStateError, DataColumnSidecar, EthSpec, Hash256, RuntimeVariableList, + SignedBeaconBlockHeader, Slot, +}; + +/// An error occurred while validating a gossip data column. 
+#[derive(Debug)] +pub enum GossipDataColumnError { + /// There was an error whilst processing the data column. It is not known if it is + /// valid or invalid. + /// + /// ## Peer scoring + /// + /// We were unable to process this data column due to an internal error. It's + /// unclear if the data column is valid. + BeaconChainError(BeaconChainError), + /// The proposal signature is invalid. + /// + /// ## Peer scoring + /// + /// The data column is invalid and the peer is faulty. + ProposalSignatureInvalid, + /// The proposal_index corresponding to `data_column.beacon_block_root` is not known. + /// + /// ## Peer scoring + /// + /// The data column is invalid and the peer is faulty. + UnknownValidator(u64), + /// The provided data column is not from a later slot than its parent. + /// + /// ## Peer scoring + /// + /// The data column is invalid and the peer is faulty. + IsNotLaterThanParent { + data_column_slot: Slot, + parent_slot: Slot, + }, + /// `Kzg` struct hasn't been initialized. This is an internal error. + /// + /// ## Peer scoring + /// + /// The peer isn't faulty. This is an internal error. + KzgNotInitialized, + /// The kzg verification failed. + /// + /// ## Peer scoring + /// + /// The data column sidecar is invalid and the peer is faulty. + InvalidKzgProof(kzg::Error), +} + +impl From for GossipDataColumnError { + fn from(e: BeaconChainError) -> Self { + GossipDataColumnError::BeaconChainError(e) + } +} + +impl From for GossipDataColumnError { + fn from(e: BeaconStateError) -> Self { + GossipDataColumnError::BeaconChainError(BeaconChainError::BeaconStateError(e)) + } +} + +pub type GossipVerifiedDataColumnList = RuntimeVariableList>; + +/// A wrapper around a `DataColumnSidecar` that indicates it has been approved for re-gossiping on +/// the p2p network.
+#[derive(Debug)] +pub struct GossipVerifiedDataColumn { + block_root: Hash256, + data_column: KzgVerifiedDataColumn, +} + +impl GossipVerifiedDataColumn { + pub fn new( + column_sidecar: Arc>, + subnet_id: u64, + chain: &BeaconChain, + ) -> Result { + let header = column_sidecar.signed_block_header.clone(); + // We only process slashing info if the gossip verification failed + // since we do not process the data column any further in that case. + validate_data_column_sidecar_for_gossip(column_sidecar, subnet_id, chain).map_err(|e| { + process_block_slash_info::<_, GossipDataColumnError>( + chain, + BlockSlashInfo::from_early_error_data_column(header, e), + ) + }) + } + + pub fn id(&self) -> DataColumnIdentifier { + DataColumnIdentifier { + block_root: self.block_root, + index: self.data_column.data_column_index(), + } + } + + pub fn block_root(&self) -> Hash256 { + self.block_root + } + + pub fn slot(&self) -> Slot { + self.data_column.data.slot() + } + + pub fn signed_block_header(&self) -> SignedBeaconBlockHeader { + self.data_column.data.signed_block_header.clone() + } +} + +/// Wrapper over a `DataColumnSidecar` for which we have completed kzg verification. 
+#[derive(Debug, Derivative, Clone, Encode, Decode)] +#[derivative(PartialEq, Eq)] +#[ssz(struct_behaviour = "transparent")] +pub struct KzgVerifiedDataColumn { + data: Arc>, +} + +impl KzgVerifiedDataColumn { + pub fn new(data_column: Arc>, kzg: &Kzg) -> Result { + verify_kzg_for_data_column(data_column, kzg) + } + pub fn as_data_column(&self) -> &DataColumnSidecar { + &self.data + } + /// This is cheap as we're calling clone on an Arc + pub fn clone_data_column(&self) -> Arc> { + self.data.clone() + } + + pub fn data_column_index(&self) -> u64 { + self.data.index + } +} /// Data column that we must custody and has completed kzg verification #[derive(Debug, Derivative, Clone, Encode, Decode)] @@ -17,3 +153,48 @@ impl KzgVerifiedCustodyDataColumn { self.data.index } } + +/// Complete kzg verification for a `DataColumnSidecar`. +/// +/// Returns an error if the kzg verification check fails. +pub fn verify_kzg_for_data_column( + data_column: Arc>, + _kzg: &Kzg, +) -> Result, KzgError> { + // TODO(das): KZG verification to be implemented + Ok(KzgVerifiedDataColumn { data: data_column }) +} + +/// Complete kzg verification for a list of `DataColumnSidecar`s. +/// Returns an error if any of the `DataColumnSidecar`s fails kzg verification. +/// +/// Note: This function should be preferred over calling `verify_kzg_for_data_column` +/// in a loop since this function kzg verifies a list of data columns more efficiently. 
+pub fn verify_kzg_for_data_column_list<'a, E: EthSpec, I>( + _data_column_iter: I, + _kzg: &'a Kzg, +) -> Result<(), KzgError> +where + I: Iterator>> + Clone, +{ + // TODO(das): implement KZG verification + Ok(()) +} + +pub fn validate_data_column_sidecar_for_gossip( + data_column: Arc>, + _subnet: u64, + chain: &BeaconChain, +) -> Result, GossipDataColumnError> { + // TODO(das): implement gossip verification + let kzg = chain + .kzg + .clone() + .ok_or(GossipDataColumnError::KzgNotInitialized)?; + let kzg_verified_data_column = verify_kzg_for_data_column(data_column.clone(), &kzg) + .map_err(GossipDataColumnError::InvalidKzgProof)?; + Ok(GossipVerifiedDataColumn { + block_root: data_column.block_root(), + data_column: kzg_verified_data_column, + }) +} diff --git a/beacon_node/beacon_chain/src/lib.rs b/beacon_node/beacon_chain/src/lib.rs index 0fd000ff00..e1d0f61c58 100644 --- a/beacon_node/beacon_chain/src/lib.rs +++ b/beacon_node/beacon_chain/src/lib.rs @@ -19,7 +19,7 @@ pub mod canonical_head; pub mod capella_readiness; pub mod chain_config; pub mod data_availability_checker; -mod data_column_verification; +pub mod data_column_verification; pub mod deneb_readiness; mod early_attester_cache; pub mod electra_readiness; diff --git a/beacon_node/beacon_chain/src/metrics.rs b/beacon_node/beacon_chain/src/metrics.rs index 064b2b199f..ab547cb600 100644 --- a/beacon_node/beacon_chain/src/metrics.rs +++ b/beacon_node/beacon_chain/src/metrics.rs @@ -1048,6 +1048,18 @@ lazy_static! 
{ "blob_sidecar_inclusion_proof_computation_seconds", "Time taken to compute blob sidecar inclusion proof" ); + pub static ref DATA_COLUMN_SIDECAR_PROCESSING_REQUESTS: Result = try_create_int_counter( + "beacon_data_column_sidecar_processing_requests_total", + "Count of all data column sidecars submitted for processing" + ); + pub static ref DATA_COLUMN_SIDECAR_PROCESSING_SUCCESSES: Result = try_create_int_counter( + "beacon_data_column_sidecar_processing_successes_total", + "Number of data column sidecars verified for gossip" + ); + pub static ref DATA_COLUMN_SIDECAR_GOSSIP_VERIFICATION_TIMES: Result = try_create_histogram( + "beacon_data_column_sidecar_gossip_verification_seconds", + "Full runtime of data column sidecars gossip verification" + ); } // Fifth lazy-static block is used to account for macro recursion limit. diff --git a/beacon_node/beacon_processor/src/lib.rs b/beacon_node/beacon_processor/src/lib.rs index 5bf13d82b7..f491dc7ffb 100644 --- a/beacon_node/beacon_processor/src/lib.rs +++ b/beacon_node/beacon_processor/src/lib.rs @@ -112,6 +112,7 @@ pub struct BeaconProcessorQueueLengths { backfill_chain_segment: usize, gossip_block_queue: usize, gossip_blob_queue: usize, + gossip_data_column_queue: usize, delayed_block_queue: usize, status_queue: usize, bbrange_queue: usize, @@ -164,6 +165,7 @@ impl BeaconProcessorQueueLengths { backfill_chain_segment: 64, gossip_block_queue: 1024, gossip_blob_queue: 1024, + gossip_data_column_queue: 1024, delayed_block_queue: 1024, status_queue: 1024, bbrange_queue: 1024, @@ -209,6 +211,7 @@ pub const GOSSIP_AGGREGATE: &str = "gossip_aggregate"; pub const GOSSIP_AGGREGATE_BATCH: &str = "gossip_aggregate_batch"; pub const GOSSIP_BLOCK: &str = "gossip_block"; pub const GOSSIP_BLOBS_SIDECAR: &str = "gossip_blobs_sidecar"; +pub const GOSSIP_BLOBS_COLUMN_SIDECAR: &str = "gossip_blobs_column_sidecar"; pub const DELAYED_IMPORT_BLOCK: &str = "delayed_import_block"; pub const GOSSIP_VOLUNTARY_EXIT: &str = 
"gossip_voluntary_exit"; pub const GOSSIP_PROPOSER_SLASHING: &str = "gossip_proposer_slashing"; @@ -577,6 +580,7 @@ pub enum Work { }, GossipBlock(AsyncFn), GossipBlobSidecar(AsyncFn), + GossipDataColumnSidecar(AsyncFn), DelayedImportBlock { beacon_block_slot: Slot, beacon_block_root: Hash256, @@ -629,6 +633,7 @@ impl Work { Work::GossipAggregateBatch { .. } => GOSSIP_AGGREGATE_BATCH, Work::GossipBlock(_) => GOSSIP_BLOCK, Work::GossipBlobSidecar(_) => GOSSIP_BLOBS_SIDECAR, + Work::GossipDataColumnSidecar(_) => GOSSIP_BLOBS_COLUMN_SIDECAR, Work::DelayedImportBlock { .. } => DELAYED_IMPORT_BLOCK, Work::GossipVoluntaryExit(_) => GOSSIP_VOLUNTARY_EXIT, Work::GossipProposerSlashing(_) => GOSSIP_PROPOSER_SLASHING, @@ -803,6 +808,7 @@ impl BeaconProcessor { let mut backfill_chain_segment = FifoQueue::new(queue_lengths.backfill_chain_segment); let mut gossip_block_queue = FifoQueue::new(queue_lengths.gossip_block_queue); let mut gossip_blob_queue = FifoQueue::new(queue_lengths.gossip_blob_queue); + let mut gossip_data_column_queue = FifoQueue::new(queue_lengths.gossip_data_column_queue); let mut delayed_block_queue = FifoQueue::new(queue_lengths.delayed_block_queue); let mut status_queue = FifoQueue::new(queue_lengths.status_queue); @@ -961,6 +967,8 @@ impl BeaconProcessor { self.spawn_worker(item, idle_tx); } else if let Some(item) = gossip_blob_queue.pop() { self.spawn_worker(item, idle_tx); + } else if let Some(item) = gossip_data_column_queue.pop() { + self.spawn_worker(item, idle_tx); // Check the priority 0 API requests after blocks and blobs, but before attestations. } else if let Some(item) = api_request_p0_queue.pop() { self.spawn_worker(item, idle_tx); @@ -1208,6 +1216,9 @@ impl BeaconProcessor { Work::GossipBlobSidecar { .. } => { gossip_blob_queue.push(work, work_id, &self.log) } + Work::GossipDataColumnSidecar { .. } => { + gossip_data_column_queue.push(work, work_id, &self.log) + } Work::DelayedImportBlock { .. 
} => { delayed_block_queue.push(work, work_id, &self.log) } @@ -1312,6 +1323,10 @@ impl BeaconProcessor { &metrics::BEACON_PROCESSOR_GOSSIP_BLOB_QUEUE_TOTAL, gossip_blob_queue.len() as i64, ); + metrics::set_gauge( + &metrics::BEACON_PROCESSOR_GOSSIP_DATA_COLUMN_QUEUE_TOTAL, + gossip_data_column_queue.len() as i64, + ); metrics::set_gauge( &metrics::BEACON_PROCESSOR_RPC_BLOCK_QUEUE_TOTAL, rpc_block_queue.len() as i64, @@ -1463,11 +1478,11 @@ impl BeaconProcessor { task_spawner.spawn_async(process_fn) } Work::IgnoredRpcBlock { process_fn } => task_spawner.spawn_blocking(process_fn), - Work::GossipBlock(work) | Work::GossipBlobSidecar(work) => { - task_spawner.spawn_async(async move { - work.await; - }) - } + Work::GossipBlock(work) + | Work::GossipBlobSidecar(work) + | Work::GossipDataColumnSidecar(work) => task_spawner.spawn_async(async move { + work.await; + }), Work::BlobsByRangeRequest(process_fn) | Work::BlobsByRootsRequest(process_fn) => { task_spawner.spawn_blocking(process_fn) } diff --git a/beacon_node/beacon_processor/src/metrics.rs b/beacon_node/beacon_processor/src/metrics.rs index fa7d7d7b9a..bcd422b357 100644 --- a/beacon_node/beacon_processor/src/metrics.rs +++ b/beacon_node/beacon_processor/src/metrics.rs @@ -51,6 +51,11 @@ lazy_static::lazy_static! { "beacon_processor_gossip_blob_queue_total", "Count of blobs from gossip waiting to be verified." ); + // Gossip data column sidecars. + pub static ref BEACON_PROCESSOR_GOSSIP_DATA_COLUMN_QUEUE_TOTAL: Result = try_create_int_gauge( + "beacon_processor_gossip_data_column_queue_total", + "Count of data column sidecars from gossip waiting to be verified." + ); // Gossip Exits. 
pub static ref BEACON_PROCESSOR_EXIT_QUEUE_TOTAL: Result = try_create_int_gauge( "beacon_processor_exit_queue_total", diff --git a/beacon_node/lighthouse_network/Cargo.toml b/beacon_node/lighthouse_network/Cargo.toml index 56a8fe99c7..3dfa24d467 100644 --- a/beacon_node/lighthouse_network/Cargo.toml +++ b/beacon_node/lighthouse_network/Cargo.toml @@ -43,6 +43,7 @@ unused_port = { workspace = true } delay_map = { workspace = true } bytes = { workspace = true } either = { workspace = true } +itertools = { workspace = true } # Local dependencies void = "1.0.2" diff --git a/beacon_node/lighthouse_network/src/discovery/mod.rs b/beacon_node/lighthouse_network/src/discovery/mod.rs index 73f51c001a..017db26049 100644 --- a/beacon_node/lighthouse_network/src/discovery/mod.rs +++ b/beacon_node/lighthouse_network/src/discovery/mod.rs @@ -43,7 +43,7 @@ use std::{ time::{Duration, Instant}, }; use tokio::sync::mpsc; -use types::{EnrForkId, EthSpec}; +use types::{ChainSpec, EnrForkId, EthSpec}; mod subnet_predicate; pub use subnet_predicate::subnet_predicate; @@ -192,6 +192,7 @@ pub struct Discovery { /// Logger for the discovery behaviour. log: slog::Logger, + spec: Arc, } impl Discovery { @@ -201,6 +202,7 @@ impl Discovery { config: &NetworkConfig, network_globals: Arc>, log: &slog::Logger, + spec: &ChainSpec, ) -> error::Result { let log = log.clone(); @@ -325,6 +327,7 @@ impl Discovery { update_ports, log, enr_dir, + spec: Arc::new(spec.clone()), }) } @@ -548,6 +551,8 @@ impl Discovery { ) .map_err(|e| format!("{:?}", e))?; } + // Data column subnets are computed from node ID. No subnet bitfield in the ENR. + Subnet::DataColumn(_) => return Ok(()), } // replace the global version @@ -753,7 +758,8 @@ impl Discovery { // Only start a discovery query if we have a subnet to look for. 
if !filtered_subnet_queries.is_empty() { // build the subnet predicate as a combination of the eth2_fork_predicate and the subnet predicate - let subnet_predicate = subnet_predicate::(filtered_subnets, &self.log); + let subnet_predicate = + subnet_predicate::(filtered_subnets, &self.log, self.spec.clone()); debug!( self.log, @@ -867,6 +873,7 @@ impl Discovery { let query_str = match query.subnet { Subnet::Attestation(_) => "attestation", Subnet::SyncCommittee(_) => "sync_committee", + Subnet::DataColumn(_) => "data_column", }; if let Some(v) = metrics::get_int_counter( @@ -879,8 +886,11 @@ impl Discovery { self.add_subnet_query(query.subnet, query.min_ttl, query.retries + 1); // Check the specific subnet against the enr - let subnet_predicate = - subnet_predicate::(vec![query.subnet], &self.log); + let subnet_predicate = subnet_predicate::( + vec![query.subnet], + &self.log, + self.spec.clone(), + ); r.clone() .into_iter() @@ -1194,6 +1204,7 @@ mod tests { } async fn build_discovery() -> Discovery { + let spec = ChainSpec::default(); let keypair = secp256k1::Keypair::generate(); let mut config = NetworkConfig::default(); config.set_listening_addr(crate::ListenAddress::unused_v4_ports()); @@ -1212,7 +1223,7 @@ mod tests { &log, ); let keypair = keypair.into(); - Discovery::new(keypair, &config, Arc::new(globals), &log) + Discovery::new(keypair, &config, Arc::new(globals), &log, &spec) .await .unwrap() } diff --git a/beacon_node/lighthouse_network/src/discovery/subnet_predicate.rs b/beacon_node/lighthouse_network/src/discovery/subnet_predicate.rs index 1aabf12b72..b53afe556d 100644 --- a/beacon_node/lighthouse_network/src/discovery/subnet_predicate.rs +++ b/beacon_node/lighthouse_network/src/discovery/subnet_predicate.rs @@ -1,11 +1,17 @@ //! The subnet predicate used for searching for a particular subnet. 
use super::*; use crate::types::{EnrAttestationBitfield, EnrSyncCommitteeBitfield}; +use itertools::Itertools; use slog::trace; use std::ops::Deref; +use types::{ChainSpec, DataColumnSubnetId}; /// Returns the predicate for a given subnet. -pub fn subnet_predicate(subnets: Vec, log: &slog::Logger) -> impl Fn(&Enr) -> bool + Send +pub fn subnet_predicate( + subnets: Vec, + log: &slog::Logger, + spec: Arc, +) -> impl Fn(&Enr) -> bool + Send where E: EthSpec, { @@ -19,10 +25,13 @@ where }; // Pre-fork/fork-boundary enrs may not contain a syncnets field. - // Don't return early here + // Don't return early here. let sync_committee_bitfield: Result, _> = enr.sync_committee_bitfield::(); + // TODO(das): compute from enr + let custody_subnet_count = spec.custody_requirement; + let predicate = subnets.iter().any(|subnet| match subnet { Subnet::Attestation(s) => attestation_bitfield .get(*s.deref() as usize) @@ -30,6 +39,14 @@ where Subnet::SyncCommittee(s) => sync_committee_bitfield .as_ref() .map_or(false, |b| b.get(*s.deref() as usize).unwrap_or(false)), + Subnet::DataColumn(s) => { + let mut subnets = DataColumnSubnetId::compute_custody_subnets::( + enr.node_id().raw().into(), + custody_subnet_count, + &spec, + ); + subnets.contains(s) + } }); if !predicate { diff --git a/beacon_node/lighthouse_network/src/peer_manager/mod.rs b/beacon_node/lighthouse_network/src/peer_manager/mod.rs index 0d9a7c60dd..c86c2098d6 100644 --- a/beacon_node/lighthouse_network/src/peer_manager/mod.rs +++ b/beacon_node/lighthouse_network/src/peer_manager/mod.rs @@ -1027,6 +1027,10 @@ impl PeerManager { .or_default() .insert(id); } + // TODO(das) to be implemented. We're not pruning data column peers yet + // because data column topics are subscribed as core topics until we + // implement recomputing data column subnets. 
+ Subnet::DataColumn(_) => {} } } } diff --git a/beacon_node/lighthouse_network/src/peer_manager/peerdb/peer_info.rs b/beacon_node/lighthouse_network/src/peer_manager/peerdb/peer_info.rs index ab113a1a04..0745cc2600 100644 --- a/beacon_node/lighthouse_network/src/peer_manager/peerdb/peer_info.rs +++ b/beacon_node/lighthouse_network/src/peer_manager/peerdb/peer_info.rs @@ -94,6 +94,15 @@ impl PeerInfo { .syncnets() .map_or(false, |s| s.get(**id as usize).unwrap_or(false)) } + Subnet::DataColumn(_) => { + // TODO(das): Pending spec PR https://github.com/ethereum/consensus-specs/pull/3821 + // We should use MetaDataV3 for peer selection rather than + // looking at subscribed peers (current behavior). Until MetaDataV3 is + // implemented, this is perhaps the only viable option on the current devnet + // as the peer count is low and it's important to identify supernodes to get a + // good distribution of peers across subnets. + return true; + } } } false diff --git a/beacon_node/lighthouse_network/src/service/gossip_cache.rs b/beacon_node/lighthouse_network/src/service/gossip_cache.rs index 158c7a994a..0ad31ff2e8 100644 --- a/beacon_node/lighthouse_network/src/service/gossip_cache.rs +++ b/beacon_node/lighthouse_network/src/service/gossip_cache.rs @@ -22,6 +22,8 @@ pub struct GossipCache { beacon_block: Option, /// Timeout for blobs. blob_sidecar: Option, + /// Timeout for data columns. + data_column_sidecar: Option, /// Timeout for aggregate attestations. aggregates: Option, /// Timeout for attestations.
@@ -152,6 +156,7 @@ impl GossipCacheBuilder { default_timeout, beacon_block, blob_sidecar, + data_column_sidecar, aggregates, attestation, voluntary_exit, @@ -168,6 +173,7 @@ impl GossipCacheBuilder { topic_msgs: HashMap::default(), beacon_block: beacon_block.or(default_timeout), blob_sidecar: blob_sidecar.or(default_timeout), + data_column_sidecar: data_column_sidecar.or(default_timeout), aggregates: aggregates.or(default_timeout), attestation: attestation.or(default_timeout), voluntary_exit: voluntary_exit.or(default_timeout), @@ -194,6 +200,7 @@ impl GossipCache { let expire_timeout = match topic.kind() { GossipKind::BeaconBlock => self.beacon_block, GossipKind::BlobSidecar(_) => self.blob_sidecar, + GossipKind::DataColumnSidecar(_) => self.data_column_sidecar, GossipKind::BeaconAggregateAndProof => self.aggregates, GossipKind::Attestation(_) => self.attestation, GossipKind::VoluntaryExit => self.voluntary_exit, diff --git a/beacon_node/lighthouse_network/src/service/mod.rs b/beacon_node/lighthouse_network/src/service/mod.rs index fadc8bd895..aaf9dda523 100644 --- a/beacon_node/lighthouse_network/src/service/mod.rs +++ b/beacon_node/lighthouse_network/src/service/mod.rs @@ -39,10 +39,10 @@ use std::{ sync::Arc, task::{Context, Poll}, }; -use types::ForkName; use types::{ consts::altair::SYNC_COMMITTEE_SUBNET_COUNT, EnrForkId, EthSpec, ForkContext, Slot, SubnetId, }; +use types::{ChainSpec, ForkName}; use utils::{build_transport, strip_peer_id, Context as ServiceContext, MAX_CONNECTIONS_PER_PEER}; pub mod api_types; @@ -327,6 +327,7 @@ impl Network { &config, network_globals.clone(), &log, + ctx.chain_spec, ) .await?; // start searching for peers @@ -1018,6 +1019,7 @@ impl Network { return; } + let spec = Arc::new(self.fork_context.spec.clone()); let filtered: Vec = subnets_to_discover .into_iter() .filter(|s| { @@ -1053,7 +1055,7 @@ impl Network { // If we connect to the cached peers before the discovery query starts, then we potentially // save a costly 
discovery query. } else { - self.dial_cached_enrs_in_subnet(s.subnet); + self.dial_cached_enrs_in_subnet(s.subnet, spec.clone()); true } }) @@ -1217,8 +1219,8 @@ impl Network { /// Dial cached Enrs in discovery service that are in the given `subnet_id` and aren't /// in Connected, Dialing or Banned state. - fn dial_cached_enrs_in_subnet(&mut self, subnet: Subnet) { - let predicate = subnet_predicate::(vec![subnet], &self.log); + fn dial_cached_enrs_in_subnet(&mut self, subnet: Subnet, spec: Arc) { + let predicate = subnet_predicate::(vec![subnet], &self.log, spec); let peers_to_dial: Vec = self .discovery() .cached_enrs() diff --git a/beacon_node/lighthouse_network/src/types/pubsub.rs b/beacon_node/lighthouse_network/src/types/pubsub.rs index b443ecd1b9..1bc99f9a6c 100644 --- a/beacon_node/lighthouse_network/src/types/pubsub.rs +++ b/beacon_node/lighthouse_network/src/types/pubsub.rs @@ -8,13 +8,13 @@ use std::io::{Error, ErrorKind}; use std::sync::Arc; use types::{ Attestation, AttestationBase, AttestationElectra, AttesterSlashing, AttesterSlashingBase, - AttesterSlashingElectra, BlobSidecar, EthSpec, ForkContext, ForkName, - LightClientFinalityUpdate, LightClientOptimisticUpdate, ProposerSlashing, - SignedAggregateAndProof, SignedAggregateAndProofBase, SignedAggregateAndProofElectra, - SignedBeaconBlock, SignedBeaconBlockAltair, SignedBeaconBlockBase, SignedBeaconBlockBellatrix, - SignedBeaconBlockCapella, SignedBeaconBlockDeneb, SignedBeaconBlockElectra, - SignedBlsToExecutionChange, SignedContributionAndProof, SignedVoluntaryExit, SubnetId, - SyncCommitteeMessage, SyncSubnetId, + AttesterSlashingElectra, BlobSidecar, DataColumnSidecar, DataColumnSubnetId, EthSpec, + ForkContext, ForkName, LightClientFinalityUpdate, LightClientOptimisticUpdate, + ProposerSlashing, SignedAggregateAndProof, SignedAggregateAndProofBase, + SignedAggregateAndProofElectra, SignedBeaconBlock, SignedBeaconBlockAltair, + SignedBeaconBlockBase, SignedBeaconBlockBellatrix, 
SignedBeaconBlockCapella, + SignedBeaconBlockDeneb, SignedBeaconBlockElectra, SignedBlsToExecutionChange, + SignedContributionAndProof, SignedVoluntaryExit, SubnetId, SyncCommitteeMessage, SyncSubnetId, }; #[derive(Debug, Clone, PartialEq)] @@ -23,6 +23,8 @@ pub enum PubsubMessage { BeaconBlock(Arc>), /// Gossipsub message providing notification of a [`BlobSidecar`] along with the subnet id where it was received. BlobSidecar(Box<(u64, Arc>)>), + /// Gossipsub message providing notification of a [`DataColumnSidecar`] along with the subnet id where it was received. + DataColumnSidecar(Box<(DataColumnSubnetId, Arc>)>), /// Gossipsub message providing notification of a Aggregate attestation and associated proof. AggregateAndProofAttestation(Box>), /// Gossipsub message providing notification of a raw un-aggregated attestation with its shard id. @@ -119,6 +121,9 @@ impl PubsubMessage { PubsubMessage::BlobSidecar(blob_sidecar_data) => { GossipKind::BlobSidecar(blob_sidecar_data.0) } + PubsubMessage::DataColumnSidecar(column_sidecar_data) => { + GossipKind::DataColumnSidecar(column_sidecar_data.0) + } PubsubMessage::AggregateAndProofAttestation(_) => GossipKind::BeaconAggregateAndProof, PubsubMessage::Attestation(attestation_data) => { GossipKind::Attestation(attestation_data.0) @@ -270,6 +275,36 @@ impl PubsubMessage { )), } } + GossipKind::DataColumnSidecar(subnet_id) => { + match fork_context.from_context_bytes(gossip_topic.fork_digest) { + // TODO(das): Remove Deneb fork + Some(fork) if fork.deneb_enabled() => { + let col_sidecar = Arc::new( + DataColumnSidecar::from_ssz_bytes(data) + .map_err(|e| format!("{:?}", e))?, + ); + let peer_das_enabled = + fork_context.spec.is_peer_das_enabled_for_epoch( + col_sidecar.slot().epoch(E::slots_per_epoch()), + ); + if peer_das_enabled { + Ok(PubsubMessage::DataColumnSidecar(Box::new(( + *subnet_id, + col_sidecar, + )))) + } else { + Err(format!( + "data_column_sidecar topic invalid for given fork digest {:?}", + 
gossip_topic.fork_digest + )) + } + } + Some(_) | None => Err(format!( + "data_column_sidecar topic invalid for given fork digest {:?}", + gossip_topic.fork_digest + )), + } + } GossipKind::VoluntaryExit => { let voluntary_exit = SignedVoluntaryExit::from_ssz_bytes(data) .map_err(|e| format!("{:?}", e))?; @@ -373,6 +408,7 @@ impl PubsubMessage { match &self { PubsubMessage::BeaconBlock(data) => data.as_ssz_bytes(), PubsubMessage::BlobSidecar(data) => data.1.as_ssz_bytes(), + PubsubMessage::DataColumnSidecar(data) => data.1.as_ssz_bytes(), PubsubMessage::AggregateAndProofAttestation(data) => data.as_ssz_bytes(), PubsubMessage::VoluntaryExit(data) => data.as_ssz_bytes(), PubsubMessage::ProposerSlashing(data) => data.as_ssz_bytes(), @@ -402,6 +438,12 @@ impl std::fmt::Display for PubsubMessage { data.1.slot(), data.1.index, ), + PubsubMessage::DataColumnSidecar(data) => write!( + f, + "DataColumnSidecar: slot: {}, column index: {}", + data.1.slot(), + data.1.index, + ), PubsubMessage::AggregateAndProofAttestation(att) => write!( f, "Aggregate and Proof: slot: {}, index: {:?}, aggregator_index: {}", diff --git a/beacon_node/lighthouse_network/src/types/subnet.rs b/beacon_node/lighthouse_network/src/types/subnet.rs index 50d28542be..1892dcc83a 100644 --- a/beacon_node/lighthouse_network/src/types/subnet.rs +++ b/beacon_node/lighthouse_network/src/types/subnet.rs @@ -1,6 +1,6 @@ use serde::Serialize; use std::time::Instant; -use types::{SubnetId, SyncSubnetId}; +use types::{DataColumnSubnetId, SubnetId, SyncSubnetId}; /// Represents a subnet on an attestation or sync committee `SubnetId`. /// @@ -12,6 +12,8 @@ pub enum Subnet { Attestation(SubnetId), /// Represents a gossipsub sync committee subnet and the metadata `syncnets` field. SyncCommittee(SyncSubnetId), + /// Represents a gossipsub data column subnet. + DataColumn(DataColumnSubnetId), } /// A subnet to discover peers on along with the instant after which it's no longer useful. 
diff --git a/beacon_node/lighthouse_network/src/types/topics.rs b/beacon_node/lighthouse_network/src/types/topics.rs index c5f4b0c9eb..174787f999 100644 --- a/beacon_node/lighthouse_network/src/types/topics.rs +++ b/beacon_node/lighthouse_network/src/types/topics.rs @@ -1,7 +1,7 @@ use gossipsub::{IdentTopic as Topic, TopicHash}; use serde::{Deserialize, Serialize}; use strum::AsRefStr; -use types::{ChainSpec, EthSpec, ForkName, SubnetId, SyncSubnetId, Unsigned}; +use types::{ChainSpec, DataColumnSubnetId, EthSpec, ForkName, SubnetId, SyncSubnetId, Unsigned}; use crate::Subnet; @@ -14,6 +14,7 @@ pub const BEACON_BLOCK_TOPIC: &str = "beacon_block"; pub const BEACON_AGGREGATE_AND_PROOF_TOPIC: &str = "beacon_aggregate_and_proof"; pub const BEACON_ATTESTATION_PREFIX: &str = "beacon_attestation_"; pub const BLOB_SIDECAR_PREFIX: &str = "blob_sidecar_"; +pub const DATA_COLUMN_SIDECAR_PREFIX: &str = "data_column_sidecar_"; pub const VOLUNTARY_EXIT_TOPIC: &str = "voluntary_exit"; pub const PROPOSER_SLASHING_TOPIC: &str = "proposer_slashing"; pub const ATTESTER_SLASHING_TOPIC: &str = "attester_slashing"; @@ -112,6 +113,8 @@ pub enum GossipKind { BeaconAggregateAndProof, /// Topic for publishing BlobSidecars. BlobSidecar(u64), + /// Topic for publishing DataColumnSidecars. + DataColumnSidecar(DataColumnSubnetId), /// Topic for publishing raw attestations on a particular subnet. 
#[strum(serialize = "beacon_attestation")] Attestation(SubnetId), @@ -144,6 +147,9 @@ impl std::fmt::Display for GossipKind { GossipKind::BlobSidecar(blob_index) => { write!(f, "{}{}", BLOB_SIDECAR_PREFIX, blob_index) } + GossipKind::DataColumnSidecar(column_index) => { + write!(f, "{}{}", DATA_COLUMN_SIDECAR_PREFIX, **column_index) + } x => f.write_str(x.as_ref()), } } @@ -231,6 +237,7 @@ impl GossipTopic { match self.kind() { GossipKind::Attestation(subnet_id) => Some(Subnet::Attestation(*subnet_id)), GossipKind::SyncCommitteeMessage(subnet_id) => Some(Subnet::SyncCommittee(*subnet_id)), + GossipKind::DataColumnSidecar(subnet_id) => Some(Subnet::DataColumn(*subnet_id)), _ => None, } } @@ -269,6 +276,9 @@ impl std::fmt::Display for GossipTopic { GossipKind::BlobSidecar(blob_index) => { format!("{}{}", BLOB_SIDECAR_PREFIX, blob_index) } + GossipKind::DataColumnSidecar(index) => { + format!("{}{}", DATA_COLUMN_SIDECAR_PREFIX, *index) + } GossipKind::BlsToExecutionChange => BLS_TO_EXECUTION_CHANGE_TOPIC.into(), GossipKind::LightClientFinalityUpdate => LIGHT_CLIENT_FINALITY_UPDATE.into(), GossipKind::LightClientOptimisticUpdate => LIGHT_CLIENT_OPTIMISTIC_UPDATE.into(), @@ -289,6 +299,7 @@ impl From for GossipKind { match subnet_id { Subnet::Attestation(s) => GossipKind::Attestation(s), Subnet::SyncCommittee(s) => GossipKind::SyncCommitteeMessage(s), + Subnet::DataColumn(s) => GossipKind::DataColumnSidecar(s), } } } @@ -312,6 +323,10 @@ fn subnet_topic_index(topic: &str) -> Option { ))); } else if let Some(index) = topic.strip_prefix(BLOB_SIDECAR_PREFIX) { return Some(GossipKind::BlobSidecar(index.parse::().ok()?)); + } else if let Some(index) = topic.strip_prefix(DATA_COLUMN_SIDECAR_PREFIX) { + return Some(GossipKind::DataColumnSidecar(DataColumnSubnetId::new( + index.parse::().ok()?, + ))); } None } diff --git a/beacon_node/network/src/metrics.rs b/beacon_node/network/src/metrics.rs index f0dba8d965..0fadb51edb 100644 --- a/beacon_node/network/src/metrics.rs +++ 
b/beacon_node/network/src/metrics.rs @@ -72,6 +72,10 @@ lazy_static! { "beacon_processor_gossip_blob_verified_total", "Total number of gossip blob verified for propagation." ); + pub static ref BEACON_PROCESSOR_GOSSIP_DATA_COLUMN_SIDECAR_VERIFIED_TOTAL: Result = try_create_int_counter( + "beacon_processor_gossip_data_column_verified_total", + "Total number of gossip data column sidecar verified for propagation." + ); // Gossip Exits. pub static ref BEACON_PROCESSOR_EXIT_VERIFIED_TOTAL: Result = try_create_int_counter( "beacon_processor_exit_verified_total", @@ -357,6 +361,22 @@ lazy_static! { "Count of times when a gossip blob arrived from the network later than the attestation deadline.", ); + pub static ref BEACON_DATA_COLUMN_DELAY_GOSSIP: Result = try_create_int_gauge( + "beacon_data_column_delay_gossip_last_delay", + "The first time we see this data column as a delay from the start of the slot" + ); + + pub static ref BEACON_DATA_COLUMN_DELAY_GOSSIP_VERIFICATION: Result = try_create_int_gauge( + "beacon_data_column_delay_gossip_verification", + "Keeps track of the time delay from the start of the slot to the point we propagate the data column" + ); + + pub static ref BEACON_DATA_COLUMN_DELAY_FULL_VERIFICATION: Result = try_create_int_gauge( + "beacon_data_column_last_full_verification_delay", + "The time it takes to verify a beacon data column" + ); + + /* * Light client update reprocessing queue metrics. 
*/ diff --git a/beacon_node/network/src/network_beacon_processor/gossip_methods.rs b/beacon_node/network/src/network_beacon_processor/gossip_methods.rs index ab25053258..781c447f81 100644 --- a/beacon_node/network/src/network_beacon_processor/gossip_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/gossip_methods.rs @@ -6,6 +6,7 @@ use crate::{ }; use beacon_chain::blob_verification::{GossipBlobError, GossipVerifiedBlob}; use beacon_chain::block_verification_types::AsBlock; +use beacon_chain::data_column_verification::GossipVerifiedDataColumn; use beacon_chain::store::Error; use beacon_chain::{ attestation_verification::{self, Error as AttnError, VerifiedAttestation}, @@ -32,8 +33,9 @@ use store::hot_cold_store::HotColdDBError; use tokio::sync::mpsc; use types::{ beacon_block::BlockImportSource, Attestation, AttestationRef, AttesterSlashing, BlobSidecar, - EthSpec, Hash256, IndexedAttestation, LightClientFinalityUpdate, LightClientOptimisticUpdate, - ProposerSlashing, SignedAggregateAndProof, SignedBeaconBlock, SignedBlsToExecutionChange, + DataColumnSidecar, DataColumnSubnetId, EthSpec, Hash256, IndexedAttestation, + LightClientFinalityUpdate, LightClientOptimisticUpdate, ProposerSlashing, + SignedAggregateAndProof, SignedBeaconBlock, SignedBlsToExecutionChange, SignedContributionAndProof, SignedVoluntaryExit, Slot, SubnetId, SyncCommitteeMessage, SyncSubnetId, }; @@ -599,6 +601,67 @@ impl NetworkBeaconProcessor { } } + pub async fn process_gossip_data_column_sidecar( + self: &Arc, + message_id: MessageId, + peer_id: PeerId, + _peer_client: Client, + subnet_id: DataColumnSubnetId, + column_sidecar: Arc>, + seen_duration: Duration, + ) { + let slot = column_sidecar.slot(); + let block_root = column_sidecar.block_root(); + let index = column_sidecar.index; + let delay = get_slot_delay_ms(seen_duration, slot, &self.chain.slot_clock); + // Log metrics to track delay from other nodes on the network. 
+ metrics::set_gauge( + &metrics::BEACON_DATA_COLUMN_DELAY_GOSSIP, + delay.as_millis() as i64, + ); + match self + .chain + .verify_data_column_sidecar_for_gossip(column_sidecar, *subnet_id) + { + Ok(gossip_verified_data_column) => { + metrics::inc_counter( + &metrics::BEACON_PROCESSOR_GOSSIP_DATA_COLUMN_SIDECAR_VERIFIED_TOTAL, + ); + + debug!( + self.log, + "Successfully verified gossip data column sidecar"; + "slot" => %slot, + "block_root" => %block_root, + "index" => %index, + ); + + self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Accept); + + // Log metrics to keep track of propagation delay times. + if let Some(duration) = SystemTime::now() + .duration_since(UNIX_EPOCH) + .ok() + .and_then(|now| now.checked_sub(seen_duration)) + { + metrics::set_gauge( + &metrics::BEACON_DATA_COLUMN_DELAY_GOSSIP_VERIFICATION, + duration.as_millis() as i64, + ); + } + self.process_gossip_verified_data_column( + peer_id, + gossip_verified_data_column, + seen_duration, + ) + .await + } + Err(_) => { + // TODO(das) implement gossip error handling + } + } + } + #[allow(clippy::too_many_arguments)] pub async fn process_gossip_blob( self: &Arc, @@ -837,6 +900,81 @@ impl NetworkBeaconProcessor { } } + pub async fn process_gossip_verified_data_column( + self: &Arc, + peer_id: PeerId, + verified_data_column: GossipVerifiedDataColumn, + // This value is not used presently, but it might come in handy for debugging. 
+        _seen_duration: Duration,
+    ) {
+        let processing_start_time = Instant::now();
+        let block_root = verified_data_column.block_root();
+        let data_column_slot = verified_data_column.slot();
+        let data_column_index = verified_data_column.id().index;
+
+        match self
+            .chain
+            .process_gossip_data_columns(vec![verified_data_column])
+            .await
+        {
+            Ok(availability) => {
+                match availability {
+                    AvailabilityProcessingStatus::Imported(block_root) => {
+                        // Note: Reusing block imported metric here
+                        metrics::inc_counter(
+                            &metrics::BEACON_PROCESSOR_GOSSIP_BLOCK_IMPORTED_TOTAL,
+                        );
+                        info!(
+                            self.log,
+                            "Gossipsub data column processed, imported fully available block";
+                            "block_root" => %block_root
+                        );
+                        self.chain.recompute_head_at_current_slot().await;
+
+                        metrics::set_gauge(
+                            &metrics::BEACON_DATA_COLUMN_DELAY_FULL_VERIFICATION,
+                            processing_start_time.elapsed().as_millis() as i64,
+                        );
+                    }
+                    AvailabilityProcessingStatus::MissingComponents(slot, block_root) => {
+                        trace!(
+                            self.log,
+                            "Processed data column, waiting for other components";
+                            "slot" => %slot,
+                            "data_column_index" => %data_column_index,
+                            "block_root" => %block_root,
+                        );
+
+                        // Potentially trigger reconstruction
+                    }
+                }
+            }
+            Err(BlockError::BlockIsAlreadyKnown(_)) => {
+                debug!(
+                    self.log,
+                    "Ignoring gossip column already imported";
+                    "block_root" => ?block_root,
+                    "data_column_index" => data_column_index,
+                );
+            }
+            Err(err) => {
+                debug!(
+                    self.log,
+                    "Invalid gossip data column";
+                    "outcome" => ?err,
+                    "block root" => ?block_root,
+                    "block slot" => data_column_slot,
+                    "data column index" => data_column_index,
+                );
+                self.gossip_penalize_peer(
+                    peer_id,
+                    PeerAction::MidToleranceError,
+                    "bad_gossip_data_column_ssz",
+                );
+            }
+        }
+    }
+
     /// Process the beacon block received from the gossip network and:
     ///
     /// - If it passes gossip propagation criteria, tell the network thread to forward it.
@@ -1086,6 +1224,12 @@ impl NetworkBeaconProcessor { ); return None; } + Err(e @ BlockError::InternalError(_)) => { + error!(self.log, "Internal block gossip validation error"; + "error" => %e + ); + return None; + } }; metrics::inc_counter(&metrics::BEACON_PROCESSOR_GOSSIP_BLOCK_VERIFIED_TOTAL); diff --git a/beacon_node/network/src/network_beacon_processor/mod.rs b/beacon_node/network/src/network_beacon_processor/mod.rs index ccdbb10720..ffb01a99ef 100644 --- a/beacon_node/network/src/network_beacon_processor/mod.rs +++ b/beacon_node/network/src/network_beacon_processor/mod.rs @@ -223,6 +223,36 @@ impl NetworkBeaconProcessor { }) } + /// Create a new `Work` event for some data column sidecar. + pub fn send_gossip_data_column_sidecar( + self: &Arc, + message_id: MessageId, + peer_id: PeerId, + peer_client: Client, + subnet_id: DataColumnSubnetId, + column_sidecar: Arc>, + seen_timestamp: Duration, + ) -> Result<(), Error> { + let processor = self.clone(); + let process_fn = async move { + processor + .process_gossip_data_column_sidecar( + message_id, + peer_id, + peer_client, + subnet_id, + column_sidecar, + seen_timestamp, + ) + .await + }; + + self.try_send(BeaconWorkEvent { + drop_during_sync: false, + work: Work::GossipDataColumnSidecar(Box::pin(process_fn)), + }) + } + /// Create a new `Work` event for some sync committee signature. 
pub fn send_gossip_sync_signature( self: &Arc, diff --git a/beacon_node/network/src/router.rs b/beacon_node/network/src/router.rs index e125c13f4c..c162d52d02 100644 --- a/beacon_node/network/src/router.rs +++ b/beacon_node/network/src/router.rs @@ -319,6 +319,20 @@ impl Router { ), ) } + PubsubMessage::DataColumnSidecar(data) => { + let (subnet_id, column_sidecar) = *data; + self.handle_beacon_processor_send_result( + self.network_beacon_processor + .send_gossip_data_column_sidecar( + message_id, + peer_id, + self.network_globals.client(&peer_id), + subnet_id, + column_sidecar, + timestamp_now(), + ), + ) + } PubsubMessage::VoluntaryExit(exit) => { debug!(self.log, "Received a voluntary exit"; "peer_id" => %peer_id); self.handle_beacon_processor_send_result( diff --git a/beacon_node/network/src/sync/manager.rs b/beacon_node/network/src/sync/manager.rs index dd4fa56d53..c9894c8b24 100644 --- a/beacon_node/network/src/sync/manager.rs +++ b/beacon_node/network/src/sync/manager.rs @@ -63,7 +63,7 @@ use std::ops::Sub; use std::sync::Arc; use std::time::Duration; use tokio::sync::mpsc; -use types::{BlobSidecar, EthSpec, Hash256, SignedBeaconBlock, Slot}; +use types::{BlobSidecar, DataColumnSidecar, EthSpec, Hash256, SignedBeaconBlock, Slot}; /// The number of slots ahead of us that is allowed before requesting a long-range (batch) Sync /// from a peer. If a peer is within this tolerance (forwards or backwards), it is treated as a @@ -107,6 +107,9 @@ pub enum SyncMessage { /// A blob with an unknown parent has been received. UnknownParentBlob(PeerId, Arc>), + /// A data column with an unknown parent has been received. + UnknownParentDataColumn(PeerId, Arc>), + /// A peer has sent an attestation that references a block that is unknown. This triggers the /// manager to attempt to find the block matching the unknown hash. 
UnknownBlockHashFromAttestation(PeerId, Hash256), @@ -646,6 +649,9 @@ impl SyncManager { }), ); } + SyncMessage::UnknownParentDataColumn(_peer_id, _data_column) => { + // TODO(das): data column parent lookup to be implemented + } SyncMessage::UnknownBlockHashFromAttestation(peer_id, block_root) => { if !self.notified_unknown_roots.contains(&(peer_id, block_root)) { self.notified_unknown_roots.insert((peer_id, block_root)); diff --git a/consensus/types/src/data_column_subnet_id.rs b/consensus/types/src/data_column_subnet_id.rs new file mode 100644 index 0000000000..dd58c6c36b --- /dev/null +++ b/consensus/types/src/data_column_subnet_id.rs @@ -0,0 +1,198 @@ +//! Identifies each data column subnet by an integer identifier. +use crate::data_column_sidecar::ColumnIndex; +use crate::{ChainSpec, EthSpec}; +use ethereum_types::U256; +use itertools::Itertools; +use safe_arith::{ArithError, SafeArith}; +use serde::{Deserialize, Serialize}; +use std::collections::HashSet; +use std::fmt::{self, Display}; +use std::ops::{Deref, DerefMut}; + +#[derive(arbitrary::Arbitrary, Clone, Copy, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)] +#[serde(transparent)] +pub struct DataColumnSubnetId(#[serde(with = "serde_utils::quoted_u64")] u64); + +impl DataColumnSubnetId { + pub fn new(id: u64) -> Self { + id.into() + } + + pub fn from_column_index(column_index: usize, spec: &ChainSpec) -> Self { + (column_index + .safe_rem(spec.data_column_sidecar_subnet_count as usize) + .expect( + "data_column_sidecar_subnet_count should never be zero if this function is called", + ) as u64) + .into() + } + + #[allow(clippy::arithmetic_side_effects)] + pub fn columns(&self, spec: &ChainSpec) -> impl Iterator { + let subnet = self.0; + let data_column_sidecar_subnet = spec.data_column_sidecar_subnet_count; + let columns_per_subnet = spec.data_columns_per_subnet() as u64; + (0..columns_per_subnet).map(move |i| data_column_sidecar_subnet * i + subnet) + } + + /// Compute required subnets to subscribe 
to given the node id. + #[allow(clippy::arithmetic_side_effects)] + pub fn compute_custody_subnets( + node_id: U256, + custody_subnet_count: u64, + spec: &ChainSpec, + ) -> impl Iterator { + // TODO(das): we could perform check on `custody_subnet_count` here to ensure that it is a valid + // value, but here we assume it is valid. + + let mut subnets: HashSet = HashSet::new(); + let mut current_id = node_id; + while (subnets.len() as u64) < custody_subnet_count { + let mut node_id_bytes = [0u8; 32]; + current_id.to_little_endian(&mut node_id_bytes); + let hash = ethereum_hashing::hash_fixed(&node_id_bytes); + let hash_prefix: [u8; 8] = hash[0..8] + .try_into() + .expect("hash_fixed produces a 32 byte array"); + let hash_prefix_u64 = u64::from_le_bytes(hash_prefix); + let subnet = hash_prefix_u64 % spec.data_column_sidecar_subnet_count; + + if !subnets.contains(&subnet) { + subnets.insert(subnet); + } + + if current_id == U256::MAX { + current_id = U256::zero() + } + current_id += U256::one() + } + subnets.into_iter().map(DataColumnSubnetId::new) + } + + pub fn compute_custody_columns( + node_id: U256, + custody_subnet_count: u64, + spec: &ChainSpec, + ) -> impl Iterator { + Self::compute_custody_subnets::(node_id, custody_subnet_count, spec) + .flat_map(|subnet| subnet.columns::(spec)) + .sorted() + } +} + +impl Display for DataColumnSubnetId { + fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> { + write!(f, "{}", self.0) + } +} + +impl Deref for DataColumnSubnetId { + type Target = u64; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +impl DerefMut for DataColumnSubnetId { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.0 + } +} + +impl From for DataColumnSubnetId { + fn from(x: u64) -> Self { + Self(x) + } +} + +impl From for u64 { + fn from(val: DataColumnSubnetId) -> Self { + val.0 + } +} + +impl From<&DataColumnSubnetId> for u64 { + fn from(val: &DataColumnSubnetId) -> Self { + val.0 + } +} + +#[derive(Debug)] +pub enum 
Error { + ArithError(ArithError), +} + +impl From for Error { + fn from(e: ArithError) -> Self { + Error::ArithError(e) + } +} + +#[cfg(test)] +mod test { + use crate::data_column_subnet_id::DataColumnSubnetId; + use crate::EthSpec; + use crate::MainnetEthSpec; + + type E = MainnetEthSpec; + + #[test] + fn test_compute_subnets_for_data_column() { + let spec = E::default_spec(); + let node_ids = [ + "0", + "88752428858350697756262172400162263450541348766581994718383409852729519486397", + "18732750322395381632951253735273868184515463718109267674920115648614659369468", + "27726842142488109545414954493849224833670205008410190955613662332153332462900", + "39755236029158558527862903296867805548949739810920318269566095185775868999998", + "31899136003441886988955119620035330314647133604576220223892254902004850516297", + "58579998103852084482416614330746509727562027284701078483890722833654510444626", + "28248042035542126088870192155378394518950310811868093527036637864276176517397", + "60930578857433095740782970114409273483106482059893286066493409689627770333527", + "103822458477361691467064888613019442068586830412598673713899771287914656699997", + ] + .into_iter() + .map(|v| ethereum_types::U256::from_dec_str(v).unwrap()) + .collect::>(); + + let custody_requirement = 4; + for node_id in node_ids { + let computed_subnets = DataColumnSubnetId::compute_custody_subnets::( + node_id, + custody_requirement, + &spec, + ); + let computed_subnets: Vec<_> = computed_subnets.collect(); + + // the number of subnets is equal to the custody requirement + assert_eq!(computed_subnets.len() as u64, custody_requirement); + + let subnet_count = spec.data_column_sidecar_subnet_count; + for subnet in computed_subnets { + let columns: Vec<_> = subnet.columns::(&spec).collect(); + // the number of columns is equal to the specified number of columns per subnet + assert_eq!(columns.len(), spec.data_columns_per_subnet()); + + for pair in columns.windows(2) { + // each successive column index is 
offset by the number of subnets + assert_eq!(pair[1] - pair[0], subnet_count); + } + } + } + } + + #[test] + fn test_columns_subnet_conversion() { + let spec = E::default_spec(); + for subnet in 0..spec.data_column_sidecar_subnet_count { + let subnet_id = DataColumnSubnetId::new(subnet); + for column_index in subnet_id.columns::(&spec) { + assert_eq!( + subnet_id, + DataColumnSubnetId::from_column_index::(column_index as usize, &spec) + ); + } + } + } +} diff --git a/consensus/types/src/lib.rs b/consensus/types/src/lib.rs index 89c14fa0ad..2afd726110 100644 --- a/consensus/types/src/lib.rs +++ b/consensus/types/src/lib.rs @@ -105,6 +105,7 @@ pub mod sqlite; pub mod blob_sidecar; pub mod data_column_sidecar; +pub mod data_column_subnet_id; pub mod light_client_header; pub mod non_zero_usize; pub mod runtime_var_list; @@ -147,6 +148,10 @@ pub use crate::config_and_preset::{ }; pub use crate::consolidation::Consolidation; pub use crate::contribution_and_proof::ContributionAndProof; +pub use crate::data_column_sidecar::{ + ColumnIndex, DataColumnIdentifier, DataColumnSidecar, DataColumnSidecarList, +}; +pub use crate::data_column_subnet_id::DataColumnSubnetId; pub use crate::deposit::{Deposit, DEPOSIT_TREE_DEPTH}; pub use crate::deposit_data::DepositData; pub use crate::deposit_message::DepositMessage;