From b014675b7a194e440e217e3e8cf3600ecbaf3697 Mon Sep 17 00:00:00 2001 From: Lion - dapplion <35266934+dapplion@users.noreply.github.com> Date: Wed, 21 May 2025 08:06:42 -0500 Subject: [PATCH] Fix PeerDAS sync scoring (#7352) * Remove request tracking inside syncing chains * Prioritize by range peers in network context * Prioritize custody peers for columns by range * Explicit error handling of the no peers error case * Remove good_peers_on_sampling_subnets * Count AwaitingDownload towards the buffer limit * Retry syncing chains in AwaitingDownload state * Use same peer priorization for lookups * Review PR * Address TODOs * Revert changes to peer erroring in range sync * Revert metrics changes * Update comment * Pass peers_to_deprioritize to select_columns_by_range_peers_to_request * more idiomatic * Idiomatic while * Add note about infinite loop * Use while let * Fix wrong custody column count for lookup blocks * Remove impl * Remove stale comment * Fix build errors. * Or default * Review PR * BatchPeerGroup * Match block and blob signatures * Explicit match statement to BlockError in range sync * Remove todo in BatchPeerGroup * Remove participating peers from backfill sync * Remove MissingAllCustodyColumns error * Merge fixes * Clean up PR * Consistent naming of batch_peers * Address multiple review comments * Better errors for das * Penalize column peers once * Restore fn * Fix error enum * Removed MismatchedPublicKeyLen * Revert testing changes * Change BlockAndCustodyColumns enum variant * Revert type change in import_historical_block_batch * Drop pubkey cache * Don't collect Vec * Classify errors * Remove ReconstructColumnsError * More detailed UnrequestedSlot error * Lint test * Fix slot conversion * Reduce penalty for missing blobs * Revert changes in peer selection * Lint tests * Rename block matching functions * Reorder block matching in historical blocks * Fix order of block matching * Add store tests * Filter blockchain in assert_correct_historical_block_chain * Also filter before KZG checks * Lint tests * Fix lint * Fix fulu err assertion * Check point is not at infinity * Fix ws sync test * Revert dropping filter fn --------- Co-authored-by: Jimmy Chen Co-authored-by: Jimmy Chen Co-authored-by: Pawan Dhananjay --- .../beacon_chain/src/block_verification.rs | 47 ++- .../src/block_verification_types.rs | 141 +++++-- .../src/data_availability_checker.rs | 84 ++-- .../src/data_availability_checker/error.rs | 32 +- .../state_lru_cache.rs | 6 +- .../beacon_chain/src/historical_blocks.rs | 141 ++++++- beacon_node/beacon_chain/src/test_utils.rs | 9 +- .../beacon_chain/tests/block_verification.rs | 12 +- beacon_node/beacon_chain/tests/store_tests.rs | 191 ++++++++- .../src/peer_manager/peerdb/score.rs | 2 +- .../gossip_methods.rs | 7 +- .../src/network_beacon_processor/mod.rs | 2 +- .../network_beacon_processor/sync_methods.rs | 390 ++++++++---------- .../network/src/sync/backfill_sync/mod.rs | 124 +++--- .../src/sync/block_sidecar_coupling.rs | 234 ++++++----- beacon_node/network/src/sync/manager.rs | 25 +- .../network/src/sync/network_context.rs | 53 ++- .../src/sync/network_context/requests.rs | 10 +- .../requests/blobs_by_range.rs | 15 +- .../requests/blocks_by_range.rs | 15 +- .../requests/data_columns_by_range.rs | 15 +- .../network/src/sync/range_sync/batch.rs | 117 ++++-- .../network/src/sync/range_sync/chain.rs | 70 ++-- .../network/src/sync/range_sync/mod.rs | 2 +- .../network/src/sync/range_sync/range.rs | 5 +- beacon_node/network/src/sync/tests/range.rs | 5 +- .../per_block_processing/signature_sets.rs | 3 - 27 files changed, 1103 insertions(+), 654 deletions(-) diff --git a/beacon_node/beacon_chain/src/block_verification.rs b/beacon_node/beacon_chain/src/block_verification.rs index 26bf872392..c3ed09a166 100644 --- a/beacon_node/beacon_chain/src/block_verification.rs +++ b/beacon_node/beacon_chain/src/block_verification.rs @@ -94,6 +94,7 @@ use store::{Error as DBError, HotStateSummary, KeyValueStore, StoreOp}; use strum::AsRefStr; use task_executor::JoinHandle; use tracing::{debug, error}; +use types::ColumnIndex; use types::{ data_column_sidecar::DataColumnSidecarError, BeaconBlockRef, BeaconState, BeaconStateError, BlobsList, ChainSpec, DataColumnSidecarList, Epoch, EthSpec, ExecutionBlockHash, FullPayload, @@ -220,6 +221,10 @@ pub enum BlockError { /// /// The block is invalid and the peer is faulty. InvalidSignature(InvalidSignature), + /// One or more signatures in a BlobSidecar of an RpcBlock are invalid + InvalidBlobsSignature(Vec), + /// One or more signatures in a DataColumnSidecar of an RpcBlock are invalid + InvalidDataColumnsSignature(Vec), /// The provided block is not from a later slot than its parent. /// /// ## Peer scoring @@ -634,6 +639,34 @@ pub fn signature_verify_chain_segment( &chain.spec, )?; + // Verify signatures before matching blocks and data. Otherwise we may penalize blob or column + // peers for valid signatures if the block peer sends us an invalid signature. + let pubkey_cache = get_validator_pubkey_cache(chain)?; + let mut signature_verifier = get_signature_verifier(&state, &pubkey_cache, &chain.spec); + for (block_root, block) in &chain_segment { + let mut consensus_context = + ConsensusContext::new(block.slot()).set_current_block_root(*block_root); + signature_verifier.include_all_signatures(block.as_block(), &mut consensus_context)?; + } + if signature_verifier.verify().is_err() { + return Err(BlockError::InvalidSignature(InvalidSignature::Unknown)); + } + drop(pubkey_cache); + + // Verify that blobs or data columns signatures match + // + // TODO(das): Should check correct proposer cheap for added protection if blocks and columns + // don't match. This code attributes fault to the blobs / data columns if they don't match the + // block + for (_, block) in &chain_segment { + if let Err(indices) = block.match_block_and_blobs() { + return Err(BlockError::InvalidBlobsSignature(indices)); + } + if let Err(indices) = block.match_block_and_data_columns() { + return Err(BlockError::InvalidDataColumnsSignature(indices)); + } + } + // unzip chain segment and verify kzg in bulk let (roots, blocks): (Vec<_>, Vec<_>) = chain_segment.into_iter().unzip(); let maybe_available_blocks = chain @@ -655,20 +688,6 @@ pub fn signature_verify_chain_segment( }) .collect::>(); - // verify signatures - let pubkey_cache = get_validator_pubkey_cache(chain)?; - let mut signature_verifier = get_signature_verifier(&state, &pubkey_cache, &chain.spec); - for svb in &mut signature_verified_blocks { - signature_verifier - .include_all_signatures(svb.block.as_block(), &mut svb.consensus_context)?; - } - - if signature_verifier.verify().is_err() { - return Err(BlockError::InvalidSignature(InvalidSignature::Unknown)); - } - - drop(pubkey_cache); - if let Some(signature_verified_block) = signature_verified_blocks.first_mut() { signature_verified_block.parent = Some(parent); } diff --git a/beacon_node/beacon_chain/src/block_verification_types.rs b/beacon_node/beacon_chain/src/block_verification_types.rs index dab54dc823..7abaf09e5e 100644 --- a/beacon_node/beacon_chain/src/block_verification_types.rs +++ b/beacon_node/beacon_chain/src/block_verification_types.rs @@ -9,8 +9,9 @@ use std::fmt::{Debug, Formatter}; use std::sync::Arc; use types::blob_sidecar::BlobIdentifier; use types::{ - BeaconBlockRef, BeaconState, BlindedPayload, BlobSidecarList, ChainSpec, Epoch, EthSpec, - Hash256, RuntimeVariableList, SignedBeaconBlock, SignedBeaconBlockHeader, Slot, + BeaconBlockRef, BeaconState, BlindedPayload, BlobSidecarList, ChainSpec, ColumnIndex, + DataColumnSidecar, Epoch, EthSpec, Hash256, RuntimeVariableList, SignedBeaconBlock, + SignedBeaconBlockHeader, Slot, }; /// A block that has been received over RPC. It has 2 internal variants: @@ -53,7 +54,7 @@ impl RpcBlock { match &self.block { RpcBlockInner::Block(block) => block, RpcBlockInner::BlockAndBlobs(block, _) => block, - RpcBlockInner::BlockAndCustodyColumns(block, _) => block, + RpcBlockInner::BlockAndCustodyColumns { block, .. } => block, } } @@ -61,7 +62,7 @@ impl RpcBlock { match &self.block { RpcBlockInner::Block(block) => block.clone(), RpcBlockInner::BlockAndBlobs(block, _) => block.clone(), - RpcBlockInner::BlockAndCustodyColumns(block, _) => block.clone(), + RpcBlockInner::BlockAndCustodyColumns { block, .. } => block.clone(), } } @@ -69,7 +70,7 @@ impl RpcBlock { match &self.block { RpcBlockInner::Block(_) => None, RpcBlockInner::BlockAndBlobs(_, blobs) => Some(blobs), - RpcBlockInner::BlockAndCustodyColumns(_, _) => None, + RpcBlockInner::BlockAndCustodyColumns { .. } => None, } } @@ -77,7 +78,36 @@ impl RpcBlock { match &self.block { RpcBlockInner::Block(_) => None, RpcBlockInner::BlockAndBlobs(_, _) => None, - RpcBlockInner::BlockAndCustodyColumns(_, data_columns) => Some(data_columns), + RpcBlockInner::BlockAndCustodyColumns { data_columns, .. } => Some(data_columns), + } + } + + /// Returns Err if any of its inner BlobSidecar's signed_block_header does not match the inner + /// block + pub fn match_block_and_blobs(&self) -> Result<(), Vec> { + match &self.block { + RpcBlockInner::Block(_) => Ok(()), + RpcBlockInner::BlockAndBlobs(block, blobs) => match_block_and_blobs(block, blobs), + RpcBlockInner::BlockAndCustodyColumns { .. } => Ok(()), + } + } + + /// Returns Err if any of its inner DataColumnSidecar's signed_block_header does not match the + /// inner block + pub fn match_block_and_data_columns(&self) -> Result<(), Vec> { + match &self.block { + RpcBlockInner::Block(_) => Ok(()), + RpcBlockInner::BlockAndBlobs(..) => Ok(()), + RpcBlockInner::BlockAndCustodyColumns { + block, + data_columns, + .. + } => match_block_and_data_columns( + block, + data_columns + .iter() + .map(|data_column| data_column.as_data_column()), + ), } } } @@ -88,14 +118,20 @@ impl RpcBlock { #[derive(Debug, Clone, Derivative)] #[derivative(Hash(bound = "E: EthSpec"))] enum RpcBlockInner { - /// Single block lookup response. This should potentially hit the data availability cache. + /// **Range sync**: Variant for all pre-Deneb blocks + /// **Lookup sync**: Variant used for all blocks of all forks, regardless if the have data or + /// not Block(Arc>), - /// This variant is used with parent lookups and by-range responses. It should have all blobs - /// ordered, all block roots matching, and the correct number of blobs for this block. + /// **Range sync**: Variant for all post-Deneb blocks regardless if they have data or not + /// **Lookup sync**: Not used BlockAndBlobs(Arc>, BlobSidecarList), - /// This variant is used with parent lookups and by-range responses. It should have all - /// requested data columns, all block roots matching for this block. - BlockAndCustodyColumns(Arc>, CustodyDataColumnList), + /// **Range sync**: Variant for all post-Fulu blocks regardless if they have data or not + /// **Lookup sync**: Not used + BlockAndCustodyColumns { + block: Arc>, + data_columns: CustodyDataColumnList, + expected_custody_indices: Vec, + }, } impl RpcBlock { @@ -161,23 +197,24 @@ impl RpcBlock { block_root: Option, block: Arc>, custody_columns: Vec>, - custody_columns_count: usize, + expected_custody_indices: Vec, spec: &ChainSpec, ) -> Result { let block_root = block_root.unwrap_or_else(|| get_block_root(&block)); - if block.num_expected_blobs() > 0 && custody_columns.is_empty() { - // The number of required custody columns is out of scope here. - return Err(AvailabilityCheckError::MissingCustodyColumns); - } - // Treat empty data column lists as if they are missing. - let inner = if !custody_columns.is_empty() { - RpcBlockInner::BlockAndCustodyColumns( - block, - RuntimeVariableList::new(custody_columns, spec.number_of_columns as usize)?, + let custody_columns_count = expected_custody_indices.len(); + let inner = RpcBlockInner::BlockAndCustodyColumns { + block, + data_columns: RuntimeVariableList::new( + custody_columns, + spec.number_of_columns as usize, ) - } else { - RpcBlockInner::Block(block) + .map_err(|e| { + AvailabilityCheckError::Unexpected(format!( + "custody_columns len exceeds number_of_columns: {e:?}" + )) + })?, + expected_custody_indices, }; Ok(Self { block_root, @@ -193,27 +230,34 @@ impl RpcBlock { Hash256, Arc>, Option>, - Option>, + Option<(CustodyDataColumnList, Vec)>, ) { let block_root = self.block_root(); match self.block { RpcBlockInner::Block(block) => (block_root, block, None, None), RpcBlockInner::BlockAndBlobs(block, blobs) => (block_root, block, Some(blobs), None), - RpcBlockInner::BlockAndCustodyColumns(block, data_columns) => { - (block_root, block, None, Some(data_columns)) - } + RpcBlockInner::BlockAndCustodyColumns { + block, + data_columns, + expected_custody_indices, + } => ( + block_root, + block, + None, + Some((data_columns, expected_custody_indices)), + ), } } pub fn n_blobs(&self) -> usize { match &self.block { - RpcBlockInner::Block(_) | RpcBlockInner::BlockAndCustodyColumns(_, _) => 0, + RpcBlockInner::Block(_) | RpcBlockInner::BlockAndCustodyColumns { .. } => 0, RpcBlockInner::BlockAndBlobs(_, blobs) => blobs.len(), } } pub fn n_data_columns(&self) -> usize { match &self.block { RpcBlockInner::Block(_) | RpcBlockInner::BlockAndBlobs(_, _) => 0, - RpcBlockInner::BlockAndCustodyColumns(_, data_columns) => data_columns.len(), + RpcBlockInner::BlockAndCustodyColumns { data_columns, .. } => data_columns.len(), } } } @@ -528,17 +572,50 @@ impl AsBlock for RpcBlock { match &self.block { RpcBlockInner::Block(block) => block, RpcBlockInner::BlockAndBlobs(block, _) => block, - RpcBlockInner::BlockAndCustodyColumns(block, _) => block, + RpcBlockInner::BlockAndCustodyColumns { block, .. } => block, } } fn block_cloned(&self) -> Arc> { match &self.block { RpcBlockInner::Block(block) => block.clone(), RpcBlockInner::BlockAndBlobs(block, _) => block.clone(), - RpcBlockInner::BlockAndCustodyColumns(block, _) => block.clone(), + RpcBlockInner::BlockAndCustodyColumns { block, .. } => block.clone(), } } fn canonical_root(&self) -> Hash256 { self.as_block().canonical_root() } } + +/// Returns Err if any of `blobs` BlobSidecar's signed_block_header does not match +/// block +pub fn match_block_and_blobs( + block: &SignedBeaconBlock, + blobs: &BlobSidecarList, +) -> Result<(), Vec> { + let indices = blobs + .iter() + .filter(|blob| &blob.signed_block_header.signature != block.signature()) + .map(|blob| blob.index) + .collect::>(); + if indices.is_empty() { + Ok(()) + } else { + Err(indices) + } +} + +pub fn match_block_and_data_columns<'a, E: EthSpec>( + block: &SignedBeaconBlock, + data_columns: impl Iterator>>, +) -> Result<(), Vec> { + let indices = data_columns + .filter(|column| &column.signed_block_header.signature != block.signature()) + .map(|column| column.index) + .collect::>(); + if indices.is_empty() { + Ok(()) + } else { + Err(indices) + } +} diff --git a/beacon_node/beacon_chain/src/data_availability_checker.rs b/beacon_node/beacon_chain/src/data_availability_checker.rs index 6f292f3551..26694faf11 100644 --- a/beacon_node/beacon_chain/src/data_availability_checker.rs +++ b/beacon_node/beacon_chain/src/data_availability_checker.rs @@ -1,6 +1,7 @@ use crate::blob_verification::{verify_kzg_for_blob_list, GossipVerifiedBlob, KzgVerifiedBlobList}; use crate::block_verification_types::{ - AvailabilityPendingExecutedBlock, AvailableExecutedBlock, RpcBlock, + match_block_and_blobs, match_block_and_data_columns, AvailabilityPendingExecutedBlock, + AvailableExecutedBlock, RpcBlock, }; use crate::data_availability_checker::overflow_lru_cache::{ DataAvailabilityCheckerInner, ReconstructColumnsDecision, @@ -8,6 +9,7 @@ use crate::data_availability_checker::overflow_lru_cache::{ use crate::{metrics, BeaconChain, BeaconChainTypes, BeaconStore}; use kzg::Kzg; use slot_clock::SlotClock; +use std::collections::HashSet; use std::fmt; use std::fmt::Debug; use std::num::NonZeroUsize; @@ -17,8 +19,8 @@ use task_executor::TaskExecutor; use tracing::{debug, error, info_span, Instrument}; use types::blob_sidecar::{BlobIdentifier, BlobSidecar, FixedBlobSidecarList}; use types::{ - BlobSidecarList, ChainSpec, DataColumnSidecarList, Epoch, EthSpec, Hash256, - RuntimeVariableList, SignedBeaconBlock, + BlobSidecarList, ChainSpec, ColumnIndex, DataColumnSidecarList, Epoch, EthSpec, Hash256, + SignedBeaconBlock, }; mod error; @@ -345,7 +347,7 @@ impl DataAvailabilityChecker { }; } if self.data_columns_required_for_block(&block) { - return if let Some(data_column_list) = data_columns.as_ref() { + return if let Some((data_column_list, _)) = data_columns.as_ref() { verify_kzg_for_data_column_list_with_scoring( data_column_list .iter() @@ -410,14 +412,15 @@ impl DataAvailabilityChecker { let all_data_columns = blocks .iter() + // TODO(das): we may want to remove this line. If columns are present they should be + // verified. The outcome of `data_columns_required_for_block` is time dependant. So we + // may end up importing data columns that are not verified. .filter(|block| self.data_columns_required_for_block(block.as_block())) // this clone is cheap as it's cloning an Arc .filter_map(|block| block.custody_columns().cloned()) .flatten() .map(CustodyDataColumn::into_inner) .collect::>(); - let all_data_columns = - RuntimeVariableList::from_vec(all_data_columns, self.spec.number_of_columns as usize); // verify kzg for all data columns at once if !all_data_columns.is_empty() { @@ -426,6 +429,7 @@ impl DataAvailabilityChecker { .map_err(AvailabilityCheckError::InvalidColumn)?; } + // TODO(das): we could do the matching first before spending CPU cycles on KZG verification for block in blocks { let custody_columns_count = block.custody_columns_count(); let (block_root, block, blobs, data_columns) = block.deconstruct(); @@ -447,7 +451,21 @@ impl DataAvailabilityChecker { } } } else if self.data_columns_required_for_block(&block) { - if let Some(data_columns) = data_columns { + if let Some((data_columns, expected_custody_indices)) = data_columns { + let received_indices = + HashSet::::from_iter(data_columns.iter().map(|d| d.index())); + + let missing_custody_columns = expected_custody_indices + .into_iter() + .filter(|index| !received_indices.contains(index)) + .collect::>(); + + if !missing_custody_columns.is_empty() { + return Err(AvailabilityCheckError::MissingCustodyColumns( + missing_custody_columns, + )); + } + MaybeAvailableBlock::Available(AvailableBlock { block_root, block, @@ -458,11 +476,12 @@ impl DataAvailabilityChecker { spec: self.spec.clone(), }) } else { - MaybeAvailableBlock::AvailabilityPending { - block_root, - block, - custody_columns_count, - } + // This is unreachable. If a block returns true for + // `data_columns_required_for_block` it must be a Fulu block. All Fulu RpcBlocks + // are constructed with the `DataColumns` variant, so `data_columns` must be Some + return Err(AvailabilityCheckError::Unexpected( + "Data columns should be Some for a Fulu block".to_string(), + )); } } else { MaybeAvailableBlock::Available(AvailableBlock { @@ -571,7 +590,7 @@ impl DataAvailabilityChecker { self.availability_cache .handle_reconstruction_failure(block_root); metrics::inc_counter(&KZG_DATA_COLUMN_RECONSTRUCTION_FAILURES); - AvailabilityCheckError::ReconstructColumnsError(e) + AvailabilityCheckError::Unexpected(format!("Error reconstructing columns: {e:?}")) })?; // Check indices from cache again to make sure we don't publish components we've already received. @@ -713,7 +732,7 @@ async fn availability_cache_maintenance_service( } } -#[derive(Debug)] +#[derive(Debug, Clone)] pub enum AvailableBlockData { /// Block is pre-Deneb or has zero blobs NoData, @@ -724,7 +743,7 @@ pub enum AvailableBlockData { } /// A fully available block that is ready to be imported into fork choice. -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct AvailableBlock { block_root: Hash256, block: Arc>, @@ -784,21 +803,26 @@ impl AvailableBlock { (block_root, block, blob_data) } - /// Only used for testing - pub fn __clone_without_recv(&self) -> Result { - Ok(Self { - block_root: self.block_root, - block: self.block.clone(), - blob_data: match &self.blob_data { - AvailableBlockData::NoData => AvailableBlockData::NoData, - AvailableBlockData::Blobs(blobs) => AvailableBlockData::Blobs(blobs.clone()), - AvailableBlockData::DataColumns(data_columns) => { - AvailableBlockData::DataColumns(data_columns.clone()) - } - }, - blobs_available_timestamp: self.blobs_available_timestamp, - spec: self.spec.clone(), - }) + /// Returns Err if any of its inner BlobSidecar's signed_block_header does not match the inner + /// block + pub fn match_block_and_blobs(&self) -> Result<(), Vec> { + match &self.blob_data { + AvailableBlockData::NoData => Ok(()), + AvailableBlockData::Blobs(blobs) => match_block_and_blobs(&self.block, blobs), + AvailableBlockData::DataColumns(_) => Ok(()), + } + } + + /// Returns Err if any of its inner DataColumnSidecar's signed_block_header does not match the + /// inner block + pub fn match_block_and_data_columns(&self) -> Result<(), Vec> { + match &self.blob_data { + AvailableBlockData::NoData => Ok(()), + AvailableBlockData::Blobs(_) => Ok(()), + AvailableBlockData::DataColumns(data_columns) => { + match_block_and_data_columns(&self.block, data_columns.iter()) + } + } } } diff --git a/beacon_node/beacon_chain/src/data_availability_checker/error.rs b/beacon_node/beacon_chain/src/data_availability_checker/error.rs index d091d6fefb..3388fd75cb 100644 --- a/beacon_node/beacon_chain/src/data_availability_checker/error.rs +++ b/beacon_node/beacon_chain/src/data_availability_checker/error.rs @@ -1,24 +1,20 @@ use kzg::{Error as KzgError, KzgCommitment}; -use types::{BeaconStateError, ColumnIndex, Hash256}; +use types::{BeaconStateError, ColumnIndex}; #[derive(Debug)] pub enum Error { InvalidBlobs(KzgError), InvalidColumn(Vec<(ColumnIndex, KzgError)>), - ReconstructColumnsError(KzgError), KzgCommitmentMismatch { blob_commitment: KzgCommitment, block_commitment: KzgCommitment, }, Unexpected(String), - SszTypes(ssz_types::Error), MissingBlobs, - MissingCustodyColumns, + MissingCustodyColumns(Vec), BlobIndexInvalid(u64), DataColumnIndexInvalid(u64), StoreError(store::Error), - DecodeError(ssz::DecodeError), - ParentStateMissing(Hash256), BlockReplayError(state_processing::BlockReplayError), RebuildingStateCaches(BeaconStateError), SlotClockError, @@ -35,19 +31,15 @@ pub enum ErrorCategory { impl Error { pub fn category(&self) -> ErrorCategory { match self { - Error::SszTypes(_) - | Error::MissingBlobs - | Error::MissingCustodyColumns - | Error::StoreError(_) - | Error::DecodeError(_) + Error::StoreError(_) | Error::Unexpected(_) - | Error::ParentStateMissing(_) | Error::BlockReplayError(_) | Error::RebuildingStateCaches(_) | Error::SlotClockError => ErrorCategory::Internal, - Error::InvalidBlobs { .. } + Error::MissingBlobs + | Error::MissingCustodyColumns(_) + | Error::InvalidBlobs { .. } | Error::InvalidColumn { .. } - | Error::ReconstructColumnsError { .. } | Error::BlobIndexInvalid(_) | Error::DataColumnIndexInvalid(_) | Error::KzgCommitmentMismatch { .. } => ErrorCategory::Malicious, @@ -55,24 +47,12 @@ impl Error { } } -impl From for Error { - fn from(value: ssz_types::Error) -> Self { - Self::SszTypes(value) - } -} - impl From for Error { fn from(value: store::Error) -> Self { Self::StoreError(value) } } -impl From for Error { - fn from(value: ssz::DecodeError) -> Self { - Self::DecodeError(value) - } -} - impl From for Error { fn from(value: state_processing::BlockReplayError) -> Self { Self::BlockReplayError(value) diff --git a/beacon_node/beacon_chain/src/data_availability_checker/state_lru_cache.rs b/beacon_node/beacon_chain/src/data_availability_checker/state_lru_cache.rs index 5fe674f30c..fe8c89e6c3 100644 --- a/beacon_node/beacon_chain/src/data_availability_checker/state_lru_cache.rs +++ b/beacon_node/beacon_chain/src/data_availability_checker/state_lru_cache.rs @@ -157,9 +157,9 @@ impl StateLRUCache { parent_block_state_root, ) .map_err(AvailabilityCheckError::StoreError)? - .ok_or(AvailabilityCheckError::ParentStateMissing( - parent_block_state_root, - ))?; + .ok_or(AvailabilityCheckError::Unexpected(format!( + "Parent state missing {parent_block_state_root:?}" + )))?; let state_roots = vec![ Ok((parent_state_root, diet_executed_block.parent_block.slot())), diff --git a/beacon_node/beacon_chain/src/historical_blocks.rs b/beacon_node/beacon_chain/src/historical_blocks.rs index 348e6d52a6..d4e015706b 100644 --- a/beacon_node/beacon_chain/src/historical_blocks.rs +++ b/beacon_node/beacon_chain/src/historical_blocks.rs @@ -1,4 +1,7 @@ -use crate::data_availability_checker::{AvailableBlock, AvailableBlockData}; +use crate::block_verification_types::{MaybeAvailableBlock, RpcBlock}; +use crate::data_availability_checker::{ + AvailabilityCheckError, AvailableBlock, AvailableBlockData, +}; use crate::{metrics, BeaconChain, BeaconChainTypes}; use itertools::Itertools; use state_processing::{ @@ -12,7 +15,7 @@ use store::metadata::DataColumnInfo; use store::{AnchorInfo, BlobInfo, DBColumn, Error as StoreError, KeyValueStore, KeyValueStoreOp}; use strum::IntoStaticStr; use tracing::debug; -use types::{FixedBytesExtended, Hash256, Slot}; +use types::{ColumnIndex, FixedBytesExtended, Hash256, Slot}; /// Use a longer timeout on the pubkey cache. /// @@ -23,19 +26,27 @@ const PUBKEY_CACHE_LOCK_TIMEOUT: Duration = Duration::from_secs(30); pub enum HistoricalBlockError { /// Block root mismatch, caller should retry with different blocks. MismatchedBlockRoot { + block_slot: Slot, block_root: Hash256, expected_block_root: Hash256, + oldest_block_parent: Hash256, }, /// Bad signature, caller should retry with different blocks. - SignatureSet(SignatureSetError), - /// Bad signature, caller should retry with different blocks. - InvalidSignature, + InvalidSignature(String), + /// One or more signatures in a BlobSidecar of an RpcBlock are invalid + InvalidBlobsSignature(Vec), + /// One or more signatures in a DataColumnSidecar of an RpcBlock are invalid + InvalidDataColumnsSignature(Vec), + /// Unexpected error + Unexpected(String), /// Transitory error, caller should retry with the same blocks. ValidatorPubkeyCacheTimeout, /// Logic error: should never occur. IndexOutOfBounds, /// Internal store error StoreError(StoreError), + /// Faulty and internal AvailabilityCheckError + AvailabilityCheckError(AvailabilityCheckError), } impl From for HistoricalBlockError { @@ -44,7 +55,100 @@ impl From for HistoricalBlockError { } } +impl From for HistoricalBlockError { + fn from(err: SignatureSetError) -> Self { + match err { + // The encoding of the signature is invalid, peer fault + e + @ (SignatureSetError::SignatureInvalid(_) | SignatureSetError::BadBlsBytes { .. }) => { + Self::InvalidSignature(format!("{e:?}")) + } + // All these variants are internal errors or unreachable for historical block paths, + // which only check the proposer signature. + // BadBlsBytes = Unreachable + e @ (SignatureSetError::BeaconStateError(_) + | SignatureSetError::ValidatorUnknown(_) + | SignatureSetError::ValidatorPubkeyUnknown(_) + | SignatureSetError::IncorrectBlockProposer { .. } + | SignatureSetError::PublicKeyDecompressionFailed + | SignatureSetError::InconsistentBlockFork(_)) => Self::Unexpected(format!("{e:?}")), + } + } +} + +impl From for HistoricalBlockError { + fn from(e: AvailabilityCheckError) -> Self { + Self::AvailabilityCheckError(e) + } +} + impl BeaconChain { + pub fn assert_correct_historical_block_chain( + &self, + blocks: &[RpcBlock], + ) -> Result<(), HistoricalBlockError> { + let anchor_info = self.store.get_anchor_info(); + let mut expected_block_root = anchor_info.oldest_block_parent; + + for block in blocks.iter().rev() { + if block.as_block().slot() >= anchor_info.oldest_block_slot { + continue; + } + + if block.block_root() != expected_block_root { + return Err(HistoricalBlockError::MismatchedBlockRoot { + block_slot: block.as_block().slot(), + block_root: block.block_root(), + expected_block_root, + oldest_block_parent: anchor_info.oldest_block_parent, + }); + } + + expected_block_root = block.as_block().message().parent_root(); + } + + Ok(()) + } + + pub fn verify_and_import_historical_block_batch( + &self, + blocks: Vec>, + ) -> Result { + let anchor_info = self.store.get_anchor_info(); + + // Take all blocks with slots less than the oldest block slot. + let blocks_to_import = blocks + .into_iter() + .filter(|block| block.as_block().slot() < anchor_info.oldest_block_slot) + .collect::>(); + + // First check that chain of blocks is correct + self.assert_correct_historical_block_chain(&blocks_to_import)?; + + // Check that all data columns are present <- faulty failure if missing because we have + // checked the block root is correct first. + let available_blocks_to_import = self + .data_availability_checker + .verify_kzg_for_rpc_blocks(blocks_to_import) + .and_then(|blocks| { + blocks + .into_iter() + // RpcBlocks must always be Available, otherwise a data peer is faulty of + // malicious. `verify_kzg_for_rpc_blocks` returns errors for those cases, but we + // haven't updated its function signature. This code block can be deleted later + // bigger refactor. + .map(|maybe_available| match maybe_available { + MaybeAvailableBlock::Available(block) => Ok(block), + MaybeAvailableBlock::AvailabilityPending { .. } => Err( + AvailabilityCheckError::Unexpected("block not available".to_string()), + ), + }) + .collect::, _>>() + })?; + + self.import_historical_block_batch(available_blocks_to_import) + } + /// Store a batch of historical blocks in the database. /// /// The `blocks` should be given in slot-ascending order. One of the blocks should have a block @@ -103,16 +207,9 @@ impl BeaconChain { let mut hot_batch = Vec::with_capacity(blocks_to_import.len()); let mut signed_blocks = Vec::with_capacity(blocks_to_import.len()); - for available_block in blocks_to_import.into_iter().rev() { + for available_block in blocks_to_import.iter().cloned().rev() { let (block_root, block, block_data) = available_block.deconstruct(); - if block_root != expected_block_root { - return Err(HistoricalBlockError::MismatchedBlockRoot { - block_root, - expected_block_root, - }); - } - if !self.store.get_config().prune_payloads { // If prune-payloads is set to false, store the block which includes the execution payload self.store @@ -213,18 +310,32 @@ impl BeaconChain { ) }) .collect::, _>>() - .map_err(HistoricalBlockError::SignatureSet) .map(ParallelSignatureSets::from)?; drop(pubkey_cache); drop(setup_timer); let verify_timer = metrics::start_timer(&metrics::BACKFILL_SIGNATURE_VERIFY_TIMES); if !signature_set.verify() { - return Err(HistoricalBlockError::InvalidSignature); + return Err(HistoricalBlockError::InvalidSignature("invalid".to_owned())); } drop(verify_timer); drop(sig_timer); + // Check that the proposer signature in the blobs and data columns is the same as the + // correct signature in the block. + blocks_to_import + .iter() + .map(|block| { + if let Err(indices) = block.match_block_and_blobs() { + return Err(HistoricalBlockError::InvalidBlobsSignature(indices)); + } + if let Err(indices) = block.match_block_and_data_columns() { + return Err(HistoricalBlockError::InvalidDataColumnsSignature(indices)); + } + Ok(()) + }) + .collect::, _>>()?; + // Write the I/O batches to disk, writing the blocks themselves first, as it's better // for the hot DB to contain extra blocks than for the cold DB to point to blocks that // do not exist. diff --git a/beacon_node/beacon_chain/src/test_utils.rs b/beacon_node/beacon_chain/src/test_utils.rs index ca083f0572..858aaafcf0 100644 --- a/beacon_node/beacon_chain/src/test_utils.rs +++ b/beacon_node/beacon_chain/src/test_utils.rs @@ -2372,6 +2372,7 @@ where // Blobs are stored as data columns from Fulu (PeerDAS) if self.spec.is_peer_das_enabled_for_epoch(block.epoch()) { let columns = self.chain.get_data_columns(&block_root).unwrap().unwrap(); + let expected_custody_indices = columns.iter().map(|d| d.index).collect::>(); let custody_columns = columns .into_iter() .map(CustodyDataColumn::from_asserted_custody) @@ -2380,7 +2381,7 @@ where Some(block_root), block, custody_columns, - self.get_sampling_column_count(), + expected_custody_indices, &self.spec, ) .unwrap() @@ -2409,15 +2410,17 @@ where .take(sampling_column_count) .map(CustodyDataColumn::from_asserted_custody) .collect::>(); + let expected_custody_indices = + columns.iter().map(|d| d.index()).collect::>(); RpcBlock::new_with_custody_columns( Some(block_root), block, columns, - sampling_column_count, + expected_custody_indices, &self.spec, )? } else { - RpcBlock::new_without_blobs(Some(block_root), block, 0) + RpcBlock::new_without_blobs(Some(block_root), block, sampling_column_count) } } else { let blobs = blob_items diff --git a/beacon_node/beacon_chain/tests/block_verification.rs b/beacon_node/beacon_chain/tests/block_verification.rs index 9225ffd9f4..4f3556263f 100644 --- a/beacon_node/beacon_chain/tests/block_verification.rs +++ b/beacon_node/beacon_chain/tests/block_verification.rs @@ -143,10 +143,14 @@ fn build_rpc_block( Some(DataSidecars::Blobs(blobs)) => { RpcBlock::new(None, block, Some(blobs.clone())).unwrap() } - Some(DataSidecars::DataColumns(columns)) => { - RpcBlock::new_with_custody_columns(None, block, columns.clone(), columns.len(), spec) - .unwrap() - } + Some(DataSidecars::DataColumns(columns)) => RpcBlock::new_with_custody_columns( + None, + block, + columns.clone(), + columns.iter().map(|d| d.index()).collect(), + spec, + ) + .unwrap(), None => RpcBlock::new_without_blobs(None, block, 0), } } diff --git a/beacon_node/beacon_chain/tests/store_tests.rs b/beacon_node/beacon_chain/tests/store_tests.rs index 3343dc101b..98d46482bc 100644 --- a/beacon_node/beacon_chain/tests/store_tests.rs +++ b/beacon_node/beacon_chain/tests/store_tests.rs @@ -3,7 +3,8 @@ use beacon_chain::attestation_verification::Error as AttnError; use beacon_chain::block_verification_types::RpcBlock; use beacon_chain::builder::BeaconChainBuilder; -use beacon_chain::data_availability_checker::AvailableBlock; +use beacon_chain::data_availability_checker::{AvailableBlock, AvailableBlockData}; +use beacon_chain::data_column_verification::CustodyDataColumn; use beacon_chain::schema_change::migrate_schema; use beacon_chain::test_utils::SyncCommitteeStrategy; use beacon_chain::test_utils::{ @@ -11,9 +12,11 @@ use beacon_chain::test_utils::{ BlockStrategy, DiskHarnessType, }; use beacon_chain::{ - data_availability_checker::MaybeAvailableBlock, historical_blocks::HistoricalBlockError, - migrate::MigratorConfig, BeaconChain, BeaconChainError, BeaconChainTypes, BeaconSnapshot, - BlockError, ChainConfig, NotifyExecutionLayer, ServerSentEventHandler, WhenSlotSkipped, + data_availability_checker::{AvailabilityCheckError, MaybeAvailableBlock}, + historical_blocks::HistoricalBlockError, + migrate::MigratorConfig, + BeaconChain, BeaconChainError, BeaconChainTypes, BeaconSnapshot, BlockError, ChainConfig, + NotifyExecutionLayer, ServerSentEventHandler, WhenSlotSkipped, }; use logging::create_test_tracing_subscriber; use maplit::hashset; @@ -33,6 +36,7 @@ use store::{ BlobInfo, DBColumn, HotColdDB, StoreConfig, }; use tempfile::{tempdir, TempDir}; +use tracing::info; use types::test_utils::{SeedableRng, XorShiftRng}; use types::*; @@ -2339,6 +2343,7 @@ async fn weak_subjectivity_sync_test(slots: Vec, checkpoint_slot: Slot) { let store = get_store(&temp2); let spec = test_spec::(); let seconds_per_slot = spec.seconds_per_slot; + let wss_fork = harness.spec.fork_name_at_slot::(checkpoint_slot); let kzg = get_kzg(&spec); @@ -2499,12 +2504,154 @@ async fn weak_subjectivity_sync_test(slots: Vec, checkpoint_slot: Slot) { }; // Importing the invalid batch should error. - assert!(matches!( - beacon_chain - .import_historical_block_batch(batch_with_invalid_first_block) - .unwrap_err(), - HistoricalBlockError::InvalidSignature - )); + let err = beacon_chain + .import_historical_block_batch(batch_with_invalid_first_block) + .unwrap_err(); + match err { + HistoricalBlockError::InvalidSignature(_) => {} // ok + e => panic!("Unexpected error {e:?}"), + } + + if wss_fork.deneb_enabled() { + // Currently ExecutionBlockGenerator::build_new_execution_payload doesn't accept a parameter + // to generate a fixed number of blob TXs, so it's random. Given the large number of blocks + // in this batch it's very unlikely that no block has data, but it's probable that's it's + // not index 0, so we need to find the first block with data. + let first_block_with_data = available_blocks + .iter() + .position(|block| block.block().num_expected_blobs() > 0) + .expect("No blocks have data, try different RNG"); + + // Test 1: Invalidate sidecar header signature + + let mut batch_with_invalid_header = available_blocks.to_vec(); + batch_with_invalid_header[first_block_with_data] = { + let (block_root, block, block_data) = batch_with_invalid_header[first_block_with_data] + .clone() + .deconstruct(); + if wss_fork.fulu_enabled() { + info!(block_slot = %block.slot(), ?block_root, "Corrupting data column header signature"); + let AvailableBlockData::DataColumns(mut data_columns) = block_data else { + panic!("no columns") + }; + assert!( + !data_columns.is_empty(), + "data column sidecars shouldn't be empty" + ); + let mut data_column = (*data_columns[0]).clone(); + data_column.signed_block_header.signature = Signature::empty(); + data_columns[0] = data_column.into(); + AvailableBlock::__new_for_testing( + block_root, + block, + AvailableBlockData::DataColumns(data_columns), + beacon_chain.spec.clone(), + ) + } else { + info!(block_slot = %block.slot(), ?block_root, "Corrupting blob header signature"); + let AvailableBlockData::Blobs(mut blobs) = block_data else { + let blocks_have_blobs = available_blocks + .into_iter() + .map(|block| (block.block().slot(), block.has_blobs())) + .collect::>(); + panic!( + "no blobs at block {:?} {}. blocks_have_blobs {:?}", + block_root, + block.slot(), + blocks_have_blobs + ); + }; + assert!(!blobs.is_empty(), "blob sidecars shouldn't be empty"); + let mut blob = (*blobs[0]).clone(); + blob.signed_block_header.signature = Signature::empty(); + blobs[0] = blob.into(); + AvailableBlock::__new_for_testing( + block_root, + block, + AvailableBlockData::Blobs(blobs), + beacon_chain.spec.clone(), + ) + } + }; + + // Importing the invalid batch should error. + let err = beacon_chain + .import_historical_block_batch(batch_with_invalid_header) + .unwrap_err(); + if wss_fork.fulu_enabled() { + match err { + HistoricalBlockError::InvalidDataColumnsSignature(_) => {} // ok + e => panic!("Unexpected error {e:?}"), + } + } else { + match err { + HistoricalBlockError::InvalidBlobsSignature(_) => {} // ok + e => panic!("Unexpected error {e:?}"), + } + } + + // Test 2: invalidate KZG proof + + let mut batch_with_invalid_kzg = available_blocks + .iter() + .map(|block| available_to_rpc_block(block.clone(), &harness.spec)) + .collect::>(); + + batch_with_invalid_kzg[first_block_with_data] = { + let (block_root, block, blobs, cols) = batch_with_invalid_kzg[first_block_with_data] + .clone() + .deconstruct(); + if wss_fork.fulu_enabled() { + info!(block_slot = %block.slot(), ?block_root, "Corrupting data column KZG proof"); + let (mut data_columns, expected_column_indices) = cols.unwrap(); + assert!( + !data_columns.is_empty(), + "data column sidecars shouldn't be empty" + ); + let mut data_column = (*(data_columns[0]).clone_arc()).clone(); + if data_column.kzg_proofs[0] == KzgProof::empty() { + panic!("kzg_proof is already G1_POINT_AT_INFINITY") + } + data_column.kzg_proofs[0] = KzgProof::empty(); + data_columns[0] = CustodyDataColumn::from_asserted_custody(data_column.into()); + RpcBlock::new_with_custody_columns( + Some(block_root), + block, + data_columns.to_vec(), + expected_column_indices, + &harness.spec, + ) + .unwrap() + } else { + info!(block_slot = %block.slot(), ?block_root, "Corrupting blob KZG proof"); + let mut blobs = blobs.unwrap(); + assert!(!blobs.is_empty(), "blob sidecars shouldn't be empty"); + let mut blob = (*blobs[0]).clone(); + blob.kzg_proof = KzgProof::empty(); + blobs[0] = blob.into(); + RpcBlock::new(Some(block_root), block, Some(blobs)).unwrap() + } + }; + + let err = beacon_chain + .verify_and_import_historical_block_batch(batch_with_invalid_kzg) + .unwrap_err(); + if wss_fork.fulu_enabled() { + match err { + HistoricalBlockError::AvailabilityCheckError( + AvailabilityCheckError::InvalidColumn(_), + ) => {} // ok + e => panic!("Unexpected error {e:?}"), + } + } else { + match err { + HistoricalBlockError::AvailabilityCheckError( + AvailabilityCheckError::InvalidBlobs(_), + ) => {} // ok + e => panic!("Unexpected error {e:?}"), + } + } + } // Importing the batch with valid signatures should succeed. let available_blocks_dup = available_blocks.iter().map(clone_block).collect::>(); @@ -3678,5 +3825,27 @@ fn get_blocks( } fn clone_block(block: &AvailableBlock) -> AvailableBlock { - block.__clone_without_recv().unwrap() + block.clone() +} + +fn available_to_rpc_block(block: AvailableBlock, spec: &ChainSpec) -> RpcBlock { + let (block_root, block, block_data) = block.deconstruct(); + + match block_data { + AvailableBlockData::NoData => RpcBlock::new(Some(block_root), block, None).unwrap(), + AvailableBlockData::Blobs(blobs) => { + RpcBlock::new(Some(block_root), block, Some(blobs)).unwrap() + } + AvailableBlockData::DataColumns(data_columns) => RpcBlock::new_with_custody_columns( + Some(block_root), + block, + data_columns + .into_iter() + .map(|d| CustodyDataColumn::from_asserted_custody(d)) + .collect(), + vec![], + spec, + ) + .unwrap(), + } } diff --git a/beacon_node/lighthouse_network/src/peer_manager/peerdb/score.rs b/beacon_node/lighthouse_network/src/peer_manager/peerdb/score.rs index 995ebf9064..517151a06f 100644 --- a/beacon_node/lighthouse_network/src/peer_manager/peerdb/score.rs +++ b/beacon_node/lighthouse_network/src/peer_manager/peerdb/score.rs @@ -43,7 +43,7 @@ const GOSSIPSUB_POSITIVE_SCORE_WEIGHT: f64 = GOSSIPSUB_NEGATIVE_SCORE_WEIGHT; /// Each variant has an associated score change. // To easily assess the behaviour of scores changes the number of variants should stay low, and // somewhat generic. -#[derive(Debug, Clone, Copy, AsRefStr)] +#[derive(Debug, Clone, Copy, Hash, Eq, PartialEq, AsRefStr)] #[strum(serialize_all = "snake_case")] pub enum PeerAction { /// We should not communicate more with this peer. diff --git a/beacon_node/network/src/network_beacon_processor/gossip_methods.rs b/beacon_node/network/src/network_beacon_processor/gossip_methods.rs index 638f9e4824..5b2ba76560 100644 --- a/beacon_node/network/src/network_beacon_processor/gossip_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/gossip_methods.rs @@ -1426,7 +1426,12 @@ impl NetworkBeaconProcessor { return None; } // BlobNotRequired is unreachable. Only constructed in `process_gossip_blob` - Err(e @ BlockError::InternalError(_)) | Err(e @ BlockError::BlobNotRequired(_)) => { + // InvalidBlobsSignature is unreachable. Only constructed in `process_chain_segment` + // InvalidDataColumnsSignature is unreachable. Only constructed in `process_chain_segment` + Err(e @ BlockError::InternalError(_)) + | Err(e @ BlockError::BlobNotRequired(_)) + | Err(e @ BlockError::InvalidBlobsSignature(_)) + | Err(e @ BlockError::InvalidDataColumnsSignature(_)) => { error!(error = %e, "Internal block gossip validation error"); return None; } diff --git a/beacon_node/network/src/network_beacon_processor/mod.rs b/beacon_node/network/src/network_beacon_processor/mod.rs index ba681eed14..7a4d697880 100644 --- a/beacon_node/network/src/network_beacon_processor/mod.rs +++ b/beacon_node/network/src/network_beacon_processor/mod.rs @@ -33,7 +33,7 @@ use tokio::sync::mpsc::{self, error::TrySendError}; use tracing::{debug, error, trace, warn, Instrument}; use types::*; -pub use sync_methods::ChainSegmentProcessId; +pub use sync_methods::{ChainSegmentProcessId, PeerGroupAction}; use types::blob_sidecar::FixedBlobSidecarList; pub type Error = TrySendError>; diff --git a/beacon_node/network/src/network_beacon_processor/sync_methods.rs b/beacon_node/network/src/network_beacon_processor/sync_methods.rs index 31b17a41a4..b1777cef79 100644 --- a/beacon_node/network/src/network_beacon_processor/sync_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/sync_methods.rs @@ -7,7 +7,6 @@ use crate::sync::{ }; use beacon_chain::block_verification_types::{AsBlock, RpcBlock}; use beacon_chain::data_availability_checker::AvailabilityCheckError; -use beacon_chain::data_availability_checker::MaybeAvailableBlock; use beacon_chain::data_column_verification::verify_kzg_for_data_column_list; use beacon_chain::{ validator_monitor::get_slot_delay_ms, AvailabilityProcessingStatus, BeaconChainTypes, @@ -18,6 +17,7 @@ use beacon_processor::{ AsyncFn, BlockingFn, DuplicateCache, }; use lighthouse_network::PeerAction; +use std::collections::HashMap; use std::sync::Arc; use std::time::Duration; use store::KzgCommitment; @@ -25,7 +25,9 @@ use tokio::sync::mpsc; use tracing::{debug, error, info, warn}; use types::beacon_block_body::format_kzg_commitments; use types::blob_sidecar::FixedBlobSidecarList; -use types::{BlockImportSource, DataColumnSidecar, DataColumnSidecarList, Epoch, Hash256}; +use types::{ + BlockImportSource, ColumnIndex, DataColumnSidecar, DataColumnSidecarList, Epoch, Hash256, +}; /// Id associated to a batch processing request, either a sync batch or a parent lookup. #[derive(Clone, Debug, PartialEq)] @@ -37,11 +39,65 @@ pub enum ChainSegmentProcessId { } /// Returned when a chain segment import fails. -struct ChainSegmentFailed { +#[derive(Debug)] +pub struct ChainSegmentFailed { /// To be displayed in logs. - message: String, + pub message: String, /// Used to penalize peers. - peer_action: Option, + pub peer_action: Option, +} + +/// Tracks which block(s) component caused the block to be invalid. Used to attribute fault in sync. +#[derive(Debug)] +pub struct PeerGroupAction { + pub block_peer: Option, + pub column_peer: HashMap, +} + +impl PeerGroupAction { + fn block_peer(action: PeerAction) -> Self { + Self { + block_peer: Some(action), + column_peer: <_>::default(), + } + } + + fn column_peers(columns: &[ColumnIndex], action: PeerAction) -> Self { + Self { + block_peer: None, + column_peer: HashMap::from_iter(columns.iter().map(|index| (*index, action))), + } + } + + fn from_availability_check_error(e: &AvailabilityCheckError) -> Option { + match e { + AvailabilityCheckError::InvalidBlobs(_) => { + Some(PeerGroupAction::block_peer(PeerAction::LowToleranceError)) + } + AvailabilityCheckError::InvalidColumn(errors) => Some(PeerGroupAction::column_peers( + &errors.iter().map(|(index, _)| *index).collect::>(), + PeerAction::LowToleranceError, + )), + AvailabilityCheckError::KzgCommitmentMismatch { .. } => None, // should never happen after checking inclusion proof + AvailabilityCheckError::Unexpected(_) => None, // internal + AvailabilityCheckError::MissingBlobs => { + Some(PeerGroupAction::block_peer(PeerAction::HighToleranceError)) + } + // TOOD(das): PeerAction::High may be too soft of a penalty. Also may be deprecated + // with https://github.com/sigp/lighthouse/issues/6258 + AvailabilityCheckError::MissingCustodyColumns(columns) => Some( + PeerGroupAction::column_peers(columns, PeerAction::HighToleranceError), + ), + AvailabilityCheckError::BlobIndexInvalid(_) => { + Some(PeerGroupAction::block_peer(PeerAction::LowToleranceError)) + } + AvailabilityCheckError::DataColumnIndexInvalid(_) => None, // unreachable + AvailabilityCheckError::StoreError(_) => None, // unreachable + AvailabilityCheckError::BlockReplayError(_) => None, // internal error + AvailabilityCheckError::RebuildingStateCaches(_) => None, // internal error + AvailabilityCheckError::SlotClockError => None, // internal error + } + } } impl NetworkBeaconProcessor { @@ -480,7 +536,8 @@ impl NetworkBeaconProcessor { match e.peer_action { Some(penalty) => BatchProcessResult::FaultyFailure { imported_blocks, - penalty, + peer_action: penalty, + error: e.message, }, None => BatchProcessResult::NonFaultyFailure, } @@ -502,7 +559,7 @@ impl NetworkBeaconProcessor { .sum::(); match self.process_backfill_blocks(downloaded_blocks) { - (imported_blocks, Ok(_)) => { + Ok(imported_blocks) => { debug!( batch_epoch = %epoch, first_block_slot = start_slot, @@ -518,7 +575,7 @@ impl NetworkBeaconProcessor { imported_blocks, } } - (_, Err(e)) => { + Err(e) => { debug!( batch_epoch = %epoch, first_block_slot = start_slot, @@ -529,9 +586,10 @@ impl NetworkBeaconProcessor { "Backfill batch processing failed" ); match e.peer_action { - Some(penalty) => BatchProcessResult::FaultyFailure { + Some(peer_action) => BatchProcessResult::FaultyFailure { imported_blocks: 0, - penalty, + peer_action, + error: e.message, }, None => BatchProcessResult::NonFaultyFailure, } @@ -589,148 +647,77 @@ impl NetworkBeaconProcessor { fn process_backfill_blocks( &self, downloaded_blocks: Vec>, - ) -> (usize, Result<(), ChainSegmentFailed>) { - let total_blocks = downloaded_blocks.len(); - let available_blocks = match self + ) -> Result { + match self .chain - .data_availability_checker - .verify_kzg_for_rpc_blocks(downloaded_blocks) + .verify_and_import_historical_block_batch(downloaded_blocks) { - Ok(blocks) => blocks - .into_iter() - .filter_map(|maybe_available| match maybe_available { - MaybeAvailableBlock::Available(block) => Some(block), - MaybeAvailableBlock::AvailabilityPending { .. } => None, - }) - .collect::>(), - Err(e) => match e { - AvailabilityCheckError::StoreError(_) => { - return ( - 0, - Err(ChainSegmentFailed { - peer_action: None, - message: "Failed to check block availability".into(), - }), - ); - } - e => { - return ( - 0, - Err(ChainSegmentFailed { - peer_action: Some(PeerAction::LowToleranceError), - message: format!("Failed to check block availability : {:?}", e), - }), - ) - } - }, - }; - - if available_blocks.len() != total_blocks { - return ( - 0, - Err(ChainSegmentFailed { - peer_action: Some(PeerAction::LowToleranceError), - message: format!( - "{} out of {} blocks were unavailable", - (total_blocks - available_blocks.len()), - total_blocks - ), - }), - ); - } - - match self.chain.import_historical_block_batch(available_blocks) { Ok(imported_blocks) => { metrics::inc_counter( &metrics::BEACON_PROCESSOR_BACKFILL_CHAIN_SEGMENT_SUCCESS_TOTAL, ); - (imported_blocks, Ok(())) + Ok(imported_blocks) } Err(e) => { metrics::inc_counter( &metrics::BEACON_PROCESSOR_BACKFILL_CHAIN_SEGMENT_FAILED_TOTAL, ); let peer_action = match &e { - HistoricalBlockError::MismatchedBlockRoot { - block_root, - expected_block_root, - } => { - debug!( - error = "mismatched_block_root", - ?block_root, - expected_root = ?expected_block_root, - "Backfill batch processing error" - ); - // The peer is faulty if they send blocks with bad roots. - Some(PeerAction::LowToleranceError) + HistoricalBlockError::AvailabilityCheckError(e) => { + PeerGroupAction::from_availability_check_error(e) } - HistoricalBlockError::InvalidSignature - | HistoricalBlockError::SignatureSet(_) => { - warn!( - error = ?e, - "Backfill batch processing error" - ); - // The peer is faulty if they bad signatures. - Some(PeerAction::LowToleranceError) + // The peer is faulty if they send blocks with bad roots or invalid signatures + HistoricalBlockError::MismatchedBlockRoot { .. } + | HistoricalBlockError::InvalidSignature(_) => { + Some(PeerGroupAction::block_peer(PeerAction::LowToleranceError)) } - HistoricalBlockError::ValidatorPubkeyCacheTimeout => { - warn!( - error = "pubkey_cache_timeout", - "Backfill batch processing error" - ); + // Blobs are served by the block_peer + HistoricalBlockError::InvalidBlobsSignature(_) => { + Some(PeerGroupAction::block_peer(PeerAction::LowToleranceError)) + } + HistoricalBlockError::InvalidDataColumnsSignature(indices) => Some( + PeerGroupAction::column_peers(indices, PeerAction::LowToleranceError), + ), + HistoricalBlockError::ValidatorPubkeyCacheTimeout + | HistoricalBlockError::IndexOutOfBounds + | HistoricalBlockError::StoreError(_) + | HistoricalBlockError::Unexpected(_) => { // This is an internal error, do not penalize the peer. None - } - HistoricalBlockError::IndexOutOfBounds => { - error!( - error = ?e, - "Backfill batch OOB error" - ); - // This should never occur, don't penalize the peer. - None - } - HistoricalBlockError::StoreError(e) => { - warn!(error = ?e, "Backfill batch processing error"); - // This is an internal error, don't penalize the peer. - None - } // - // Do not use a fallback match, handle all errors explicitly + } // Do not use a fallback match, handle all errors explicitly }; - let err_str: &'static str = e.into(); - ( - 0, - Err(ChainSegmentFailed { - message: format!("{:?}", err_str), - // This is an internal error, don't penalize the peer. - peer_action, - }), - ) + + if peer_action.is_some() { + // All errors that result in a peer penalty are "expected" external faults the + // node runner can't do anything about + debug!(?e, "Backfill sync processing error"); + } else { + // All others are some type of internal error worth surfacing? + warn!(?e, "Unexpected backfill sync processing error"); + } + + Err(ChainSegmentFailed { + // Render the full error in debug for full details + message: format!("{:?}", e), + peer_action, + }) } } } /// Helper function to handle a `BlockError` from `process_chain_segment` fn handle_failed_chain_segment(&self, error: BlockError) -> Result<(), ChainSegmentFailed> { - match error { - BlockError::ParentUnknown { parent_root, .. } => { + let peer_action = match &error { + BlockError::ParentUnknown { .. } => { // blocks should be sequential and all parents should exist - Err(ChainSegmentFailed { - message: format!("Block has an unknown parent: {}", parent_root), - // Peers are faulty if they send non-sequential blocks. - peer_action: Some(PeerAction::LowToleranceError), - }) - } - BlockError::DuplicateFullyImported(_) - | BlockError::DuplicateImportStatusUnknown(..) => { - // This can happen for many reasons. Head sync's can download multiples and parent - // lookups can download blocks before range sync - Ok(()) + // Peers are faulty if they send non-sequential blocks. + Some(PeerGroupAction::block_peer(PeerAction::LowToleranceError)) } BlockError::FutureSlot { present_slot, block_slot, } => { - if present_slot + FUTURE_SLOT_TOLERANCE >= block_slot { + if *present_slot + FUTURE_SLOT_TOLERANCE >= *block_slot { // The block is too far in the future, drop it. warn!( msg = "block for future slot rejected, check your time", @@ -739,121 +726,90 @@ impl NetworkBeaconProcessor { FUTURE_SLOT_TOLERANCE, "Block is ahead of our slot clock" ); - } else { - // The block is in the future, but not too far. - debug!( - %present_slot, - %block_slot, - FUTURE_SLOT_TOLERANCE, - "Block is slightly ahead of our slot clock. Ignoring." - ); } - - Err(ChainSegmentFailed { - message: format!( - "Block with slot {} is higher than the current slot {}", - block_slot, present_slot - ), - // Peers are faulty if they send blocks from the future. - peer_action: Some(PeerAction::LowToleranceError), - }) + // Peers are faulty if they send blocks from the future. + Some(PeerGroupAction::block_peer(PeerAction::LowToleranceError)) } - BlockError::WouldRevertFinalizedSlot { .. } => { - debug!("Finalized or earlier block processed"); - Ok(()) + // Block is invalid + BlockError::StateRootMismatch { .. } + | BlockError::BlockSlotLimitReached + | BlockError::IncorrectBlockProposer { .. } + | BlockError::UnknownValidator { .. } + | BlockError::BlockIsNotLaterThanParent { .. } + | BlockError::NonLinearParentRoots + | BlockError::NonLinearSlots + | BlockError::PerBlockProcessingError(_) + | BlockError::InconsistentFork(_) + | BlockError::InvalidSignature(_) => { + Some(PeerGroupAction::block_peer(PeerAction::LowToleranceError)) } - BlockError::NotFinalizedDescendant { block_parent_root } => { - debug!( - "Not syncing to a chain that conflicts with the canonical or manual finalized checkpoint" - ); - Err(ChainSegmentFailed { - message: format!( - "Block with parent_root {} conflicts with our checkpoint state", - block_parent_root - ), - peer_action: Some(PeerAction::Fatal), - }) + // Currently blobs are served by the block peer + BlockError::InvalidBlobsSignature(_) => { + Some(PeerGroupAction::block_peer(PeerAction::LowToleranceError)) } - BlockError::GenesisBlock => { - debug!("Genesis block was processed"); - Ok(()) + BlockError::InvalidDataColumnsSignature(indices) => Some( + PeerGroupAction::column_peers(indices, PeerAction::LowToleranceError), + ), + BlockError::GenesisBlock + | BlockError::WouldRevertFinalizedSlot { .. } + | BlockError::DuplicateFullyImported(_) + | BlockError::DuplicateImportStatusUnknown(..) => { + // This can happen for many reasons. Head sync's can download multiples and parent + // lookups can download blocks before range sync + return Ok(()); } - BlockError::BeaconChainError(e) => { - warn!( - msg = "unexpected condition in processing block.", - outcome = ?e, - "BlockProcessingFailure" - ); - - Err(ChainSegmentFailed { - message: format!("Internal error whilst processing block: {:?}", e), - // Do not penalize peers for internal errors. - peer_action: None, - }) + // Not syncing to a chain that conflicts with the canonical or manual finalized checkpoint + BlockError::NotFinalizedDescendant { .. } | BlockError::WeakSubjectivityConflict => { + Some(PeerGroupAction::block_peer(PeerAction::Fatal)) } - ref err @ BlockError::ExecutionPayloadError(ref epe) => { - if !epe.penalize_peer() { + BlockError::AvailabilityCheck(e) => PeerGroupAction::from_availability_check_error(e), + BlockError::ExecutionPayloadError(e) => { + if !e.penalize_peer() { // These errors indicate an issue with the EL and not the `ChainSegment`. // Pause the syncing while the EL recovers - debug!( - outcome = "pausing sync", - ?err, - "Execution layer verification failed" - ); - Err(ChainSegmentFailed { - message: format!("Execution layer offline. Reason: {:?}", err), - // Do not penalize peers for internal errors. - peer_action: None, - }) + None } else { - debug!( - error = ?err, - "Invalid execution payload" - ); - Err(ChainSegmentFailed { - message: format!( - "Peer sent a block containing invalid execution payload. Reason: {:?}", - err - ), - peer_action: Some(PeerAction::LowToleranceError), - }) + Some(PeerGroupAction::block_peer(PeerAction::LowToleranceError)) } } - ref err @ BlockError::ParentExecutionPayloadInvalid { ref parent_root } => { + // We need to penalise harshly in case this represents an actual attack. In case + // of a faulty EL it will usually require manual intervention to fix anyway, so + // it's not too bad if we drop most of our peers. + BlockError::ParentExecutionPayloadInvalid { parent_root } => { warn!( ?parent_root, advice = "check execution node for corruption then restart it and Lighthouse", "Failed to sync chain built on invalid parent" ); - Err(ChainSegmentFailed { - message: format!("Peer sent invalid block. Reason: {err:?}"), - // We need to penalise harshly in case this represents an actual attack. In case - // of a faulty EL it will usually require manual intervention to fix anyway, so - // it's not too bad if we drop most of our peers. - peer_action: Some(PeerAction::LowToleranceError), - }) + Some(PeerGroupAction::block_peer(PeerAction::LowToleranceError)) } // Penalise peers for sending us banned blocks. BlockError::KnownInvalidExecutionPayload(block_root) => { - warn!(?block_root, "Received block known to be invalid",); - Err(ChainSegmentFailed { - message: format!("Banned block: {block_root:?}"), - peer_action: Some(PeerAction::Fatal), - }) + warn!(?block_root, "Received block known to be invalid"); + Some(PeerGroupAction::block_peer(PeerAction::Fatal)) } - other => { - debug!( - msg = "peer sent invalid block", - outcome = %other, - "Invalid block received" - ); + BlockError::Slashable => { + Some(PeerGroupAction::block_peer(PeerAction::MidToleranceError)) + } + // Do not penalize peers for internal errors. + // BlobNotRequired is never constructed on this path + // TODO(sync): Double check that all `BeaconChainError` variants are actually internal + // errors in thie code path + BlockError::BeaconChainError(_) + | BlockError::InternalError(_) + | BlockError::BlobNotRequired(_) => None, + // Do not use a fallback match, handle all errors explicitly + }; - Err(ChainSegmentFailed { - message: format!("Peer sent invalid block. Reason: {:?}", other), - // Do not penalize peers for internal errors. - peer_action: None, - }) - } + if peer_action.is_some() { + debug!(?error, "Range sync processing error"); + } else { + warn!(?error, "Unexpected range sync processing error"); } + + Err(ChainSegmentFailed { + message: format!("{error:?}"), + peer_action, + }) } } diff --git a/beacon_node/network/src/sync/backfill_sync/mod.rs b/beacon_node/network/src/sync/backfill_sync/mod.rs index fcef06271f..7b5701cc8d 100644 --- a/beacon_node/network/src/sync/backfill_sync/mod.rs +++ b/beacon_node/network/src/sync/backfill_sync/mod.rs @@ -18,6 +18,7 @@ use crate::sync::range_sync::{ }; use beacon_chain::block_verification_types::RpcBlock; use beacon_chain::{BeaconChain, BeaconChainTypes}; +use itertools::Itertools; use lighthouse_network::service::api_types::Id; use lighthouse_network::types::{BackFillState, NetworkGlobals}; use lighthouse_network::{PeerAction, PeerId}; @@ -30,6 +31,8 @@ use std::sync::Arc; use tracing::{debug, error, info, instrument, warn}; use types::{Epoch, EthSpec}; +use super::range_sync::BatchPeers; + /// Blocks are downloaded in batches from peers. This constant specifies how many epochs worth of /// blocks per batch are requested _at most_. A batch may request less blocks to account for /// already requested slots. There is a timeout for each batch request. If this value is too high, @@ -128,12 +131,6 @@ pub struct BackFillSync { /// Batches validated by this chain. validated_batches: u64, - /// We keep track of peers that are participating in the backfill sync. Unlike RangeSync, - /// BackFillSync uses all synced peers to download the chain from. If BackFillSync fails, we don't - /// want to penalize all our synced peers, so we use this variable to keep track of peers that - /// have participated and only penalize these peers if backfill sync fails. - participating_peers: HashSet, - /// When a backfill sync fails, we keep track of whether a new fully synced peer has joined. /// This signifies that we are able to attempt to restart a failed chain. restart_failed_sync: bool, @@ -181,7 +178,6 @@ impl BackFillSync { network_globals, current_processing_batch: None, validated_batches: 0, - participating_peers: HashSet::new(), restart_failed_sync: false, beacon_chain, }; @@ -302,25 +298,6 @@ impl BackFillSync { } } - /// A peer has disconnected. - /// If the peer has active batches, those are considered failed and re-requested. - #[instrument(parent = None, - level = "info", - fields(service = "backfill_sync"), - name = "backfill_sync", - skip_all - )] - #[must_use = "A failure here indicates the backfill sync has failed and the global sync state should be updated"] - pub fn peer_disconnected(&mut self, peer_id: &PeerId) -> Result<(), BackFillError> { - if matches!(self.state(), BackFillState::Failed) { - return Ok(()); - } - - // Remove the peer from the participation list - self.participating_peers.remove(peer_id); - Ok(()) - } - /// An RPC error has occurred. /// /// If the batch exists it is re-requested. @@ -378,7 +355,7 @@ impl BackFillSync { &mut self, network: &mut SyncNetworkContext, batch_id: BatchId, - peer_id: &PeerId, + batch_peers: BatchPeers, request_id: Id, blocks: Vec>, ) -> Result { @@ -399,7 +376,7 @@ impl BackFillSync { return Ok(ProcessResult::Successful); } - match batch.download_completed(blocks, *peer_id) { + match batch.download_completed(blocks, batch_peers) { Ok(received) => { let awaiting_batches = self.processing_target.saturating_sub(batch_id) / BACKFILL_EPOCHS_PER_BATCH; @@ -440,7 +417,6 @@ impl BackFillSync { self.set_state(BackFillState::Failed); // Remove all batches and active requests and participating peers. self.batches.clear(); - self.participating_peers.clear(); self.restart_failed_sync = false; // Reset all downloading and processing targets @@ -573,7 +549,7 @@ impl BackFillSync { } }; - let Some(peer) = batch.processing_peer() else { + let Some(batch_peers) = batch.processing_peers() else { self.fail_sync(BackFillError::BatchInvalidState( batch_id, String::from("Peer does not exist"), @@ -585,8 +561,6 @@ impl BackFillSync { ?result, %batch, batch_epoch = %batch_id, - %peer, - client = %network.client_type(peer), "Backfill batch processed" ); @@ -628,31 +602,52 @@ impl BackFillSync { } BatchProcessResult::FaultyFailure { imported_blocks, - penalty, + peer_action, + error, } => { + // TODO(sync): De-dup between back and forwards sync + if let Some(penalty) = peer_action.block_peer { + // Penalize the peer appropiately. + network.report_peer(batch_peers.block(), penalty, "faulty_batch"); + } + + // Penalize each peer only once. Currently a peer_action does not mix different + // PeerAction levels. + for (peer, penalty) in peer_action + .column_peer + .iter() + .filter_map(|(column_index, penalty)| { + batch_peers + .column(column_index) + .map(|peer| (*peer, *penalty)) + }) + .unique() + { + network.report_peer(peer, penalty, "faulty_batch_column"); + } + match batch.processing_completed(BatchProcessingResult::FaultyFailure) { Err(e) => { // Batch was in the wrong state self.fail_sync(BackFillError::BatchInvalidState(batch_id, e.0)) .map(|_| ProcessResult::Successful) } - Ok(BatchOperationOutcome::Failed { blacklist: _ }) => { - // check that we have not exceeded the re-process retry counter - // If a batch has exceeded the invalid batch lookup attempts limit, it means - // that it is likely all peers are sending invalid batches - // repeatedly and are either malicious or faulty. We stop the backfill sync and - // report all synced peers that have participated. + Ok(BatchOperationOutcome::Failed { .. }) => { + // When backfill syncing post-PeerDAS we can't attribute fault to previous + // peers if a batch fails to process too many times. We have strict peer + // scoring for faulty errors, so participating peers that sent invalid + // data are already downscored. + // + // Because backfill sync deals with historical data that we can assert + // to be correct, once we import a batch that contains at least one + // block we are sure we got the right data. There's no need to penalize + // all participating peers in backfill sync if a batch fails warn!( - score_adjustment = %penalty, batch_epoch = %batch_id, - "Backfill batch failed to download. Penalizing peers" + error, + "Backfill sync failed after attempting to process batch too many times" ); - for peer in self.participating_peers.drain() { - // TODO(das): `participating_peers` only includes block peers. Should we - // penalize the custody column peers too? - network.report_peer(peer, *penalty, "backfill_batch_failed"); - } self.fail_sync(BackFillError::BatchProcessingFailed(batch_id)) .map(|_| ProcessResult::Successful) } @@ -781,37 +776,38 @@ impl BackFillSync { // The validated batch has been re-processed if attempt.hash != processed_attempt.hash { // The re-downloaded version was different. - if processed_attempt.peer_id != attempt.peer_id { + // TODO(das): should penalize other peers? + let valid_attempt_peer = processed_attempt.block_peer(); + let bad_attempt_peer = attempt.block_peer(); + if valid_attempt_peer != bad_attempt_peer { // A different peer sent the correct batch, the previous peer did not // We negatively score the original peer. let action = PeerAction::LowToleranceError; debug!( - batch_epoch = ?id, - score_adjustment = %action, - original_peer = %attempt.peer_id, - new_peer = %processed_attempt.peer_id, + batch_epoch = %id, score_adjustment = %action, + original_peer = %bad_attempt_peer, new_peer = %valid_attempt_peer, "Re-processed batch validated. Scoring original peer" ); network.report_peer( - attempt.peer_id, + bad_attempt_peer, action, - "backfill_reprocessed_original_peer", + "batch_reprocessed_original_peer", ); } else { // The same peer corrected it's previous mistake. There was an error, so we // negative score the original peer. let action = PeerAction::MidToleranceError; debug!( - batch_epoch = ?id, + batch_epoch = %id, score_adjustment = %action, - original_peer = %attempt.peer_id, - new_peer = %processed_attempt.peer_id, + original_peer = %bad_attempt_peer, + new_peer = %valid_attempt_peer, "Re-processed batch validated by the same peer" ); network.report_peer( - attempt.peer_id, + bad_attempt_peer, action, - "backfill_reprocessed_same_peer", + "batch_reprocessed_same_peer", ); } } @@ -926,10 +922,9 @@ impl BackFillSync { .cloned() .collect::>(); - let (request, is_blob_batch) = batch.to_blocks_by_range_request(); - let failed_peers = batch.failed_peers(); + let request = batch.to_blocks_by_range_request(); + let failed_peers = batch.failed_block_peers(); match network.block_components_by_range_request( - is_blob_batch, request, RangeRequestId::BackfillSync { batch_id }, &synced_peers, @@ -1089,12 +1084,7 @@ impl BackFillSync { self.include_next_batch(network) } Entry::Vacant(entry) => { - let batch_type = network.batch_type(batch_id); - entry.insert(BatchInfo::new( - &batch_id, - BACKFILL_EPOCHS_PER_BATCH, - batch_type, - )); + entry.insert(BatchInfo::new(&batch_id, BACKFILL_EPOCHS_PER_BATCH)); if self.would_complete(batch_id) { self.last_batch_downloaded = true; } diff --git a/beacon_node/network/src/sync/block_sidecar_coupling.rs b/beacon_node/network/src/sync/block_sidecar_coupling.rs index 99428b0c80..68f1549125 100644 --- a/beacon_node/network/src/sync/block_sidecar_coupling.rs +++ b/beacon_node/network/src/sync/block_sidecar_coupling.rs @@ -1,15 +1,23 @@ use beacon_chain::{ block_verification_types::RpcBlock, data_column_verification::CustodyDataColumn, get_block_root, }; -use lighthouse_network::service::api_types::{ - BlobsByRangeRequestId, BlocksByRangeRequestId, DataColumnsByRangeRequestId, +use lighthouse_network::{ + service::api_types::{ + BlobsByRangeRequestId, BlocksByRangeRequestId, DataColumnsByRangeRequestId, + }, + PeerId, +}; +use std::{ + collections::{HashMap, HashSet}, + sync::Arc, }; -use std::{collections::HashMap, sync::Arc}; use types::{ BlobSidecar, ChainSpec, ColumnIndex, DataColumnSidecar, DataColumnSidecarList, EthSpec, - Hash256, RuntimeVariableList, SignedBeaconBlock, + Hash256, RuntimeVariableList, SignedBeaconBlock, Slot, }; +use super::range_sync::BatchPeers; + pub struct RangeBlockComponentsRequest { /// Blocks we have received awaiting for their corresponding sidecar. blocks_request: ByRangeRequest>>>, @@ -19,18 +27,21 @@ pub struct RangeBlockComponentsRequest { enum ByRangeRequest { Active(I), - Complete(T), + Complete(T, PeerId), } enum RangeBlockDataRequest { + /// All pre-deneb blocks NoData, + /// All post-Deneb blocks, regardless of if they have data or not Blobs(ByRangeRequest>>>), + /// All post-Fulu blocks, regardless of if they have data or not DataColumns { requests: HashMap< DataColumnsByRangeRequestId, ByRangeRequest>, >, - expected_custody_columns: Vec, + expected_column_to_peer: HashMap, }, } @@ -38,17 +49,20 @@ impl RangeBlockComponentsRequest { pub fn new( blocks_req_id: BlocksByRangeRequestId, blobs_req_id: Option, - data_columns: Option<(Vec, Vec)>, + data_columns: Option<( + Vec, + HashMap, + )>, ) -> Self { let block_data_request = if let Some(blobs_req_id) = blobs_req_id { RangeBlockDataRequest::Blobs(ByRangeRequest::Active(blobs_req_id)) - } else if let Some((requests, expected_custody_columns)) = data_columns { + } else if let Some((requests, expected_column_to_peer)) = data_columns { RangeBlockDataRequest::DataColumns { requests: requests .into_iter() .map(|id| (id, ByRangeRequest::Active(id))) .collect(), - expected_custody_columns, + expected_column_to_peer, } } else { RangeBlockDataRequest::NoData @@ -64,18 +78,20 @@ impl RangeBlockComponentsRequest { &mut self, req_id: BlocksByRangeRequestId, blocks: Vec>>, + peer_id: PeerId, ) -> Result<(), String> { - self.blocks_request.finish(req_id, blocks) + self.blocks_request.finish(req_id, blocks, peer_id) } pub fn add_blobs( &mut self, req_id: BlobsByRangeRequestId, blobs: Vec>>, + peer_id: PeerId, ) -> Result<(), String> { match &mut self.block_data_request { RangeBlockDataRequest::NoData => Err("received blobs but expected no data".to_owned()), - RangeBlockDataRequest::Blobs(ref mut req) => req.finish(req_id, blobs), + RangeBlockDataRequest::Blobs(ref mut req) => req.finish(req_id, blobs, peer_id), RangeBlockDataRequest::DataColumns { .. } => { Err("received blobs but expected data columns".to_owned()) } @@ -86,6 +102,7 @@ impl RangeBlockComponentsRequest { &mut self, req_id: DataColumnsByRangeRequestId, columns: Vec>>, + peer_id: PeerId, ) -> Result<(), String> { match &mut self.block_data_request { RangeBlockDataRequest::NoData => { @@ -100,48 +117,60 @@ impl RangeBlockComponentsRequest { let req = requests .get_mut(&req_id) .ok_or(format!("unknown data columns by range req_id {req_id}"))?; - req.finish(req_id, columns) + req.finish(req_id, columns, peer_id) } } } - pub fn responses(&self, spec: &ChainSpec) -> Option>, String>> { - let Some(blocks) = self.blocks_request.to_finished() else { + /// If all internal requests are complete returns a Vec of coupled RpcBlocks + #[allow(clippy::type_complexity)] + pub fn responses( + &self, + spec: &ChainSpec, + ) -> Option>, BatchPeers), String>> { + let Some((blocks, &block_peer)) = self.blocks_request.to_finished() else { return None; }; match &self.block_data_request { - RangeBlockDataRequest::NoData => { - Some(Self::responses_with_blobs(blocks.to_vec(), vec![], spec)) - } + RangeBlockDataRequest::NoData => Some( + Self::responses_with_blobs(blocks.to_vec(), vec![], spec) + .map(|blocks| (blocks, BatchPeers::new_from_block_peer(block_peer))), + ), RangeBlockDataRequest::Blobs(request) => { - let Some(blobs) = request.to_finished() else { + let Some((blobs, _blob_peer)) = request.to_finished() else { return None; }; - Some(Self::responses_with_blobs( - blocks.to_vec(), - blobs.to_vec(), - spec, - )) + Some( + Self::responses_with_blobs(blocks.to_vec(), blobs.to_vec(), spec) + .map(|blocks| (blocks, BatchPeers::new_from_block_peer(block_peer))), + ) } RangeBlockDataRequest::DataColumns { requests, - expected_custody_columns, + expected_column_to_peer, } => { let mut data_columns = vec![]; + let mut column_peers = HashMap::new(); for req in requests.values() { - let Some(data) = req.to_finished() else { + let Some((resp_columns, column_peer)) = req.to_finished() else { return None; }; - data_columns.extend(data.clone()) + data_columns.extend(resp_columns.clone()); + for column in resp_columns { + column_peers.insert(column.index, *column_peer); + } } - Some(Self::responses_with_custody_columns( - blocks.to_vec(), - data_columns, - expected_custody_columns, - spec, - )) + Some( + Self::responses_with_custody_columns( + blocks.to_vec(), + data_columns, + expected_column_to_peer.clone(), + spec, + ) + .map(|blocks| (blocks, BatchPeers::new(block_peer, column_peers))), + ) } } } @@ -199,106 +228,98 @@ impl RangeBlockComponentsRequest { fn responses_with_custody_columns( blocks: Vec>>, data_columns: DataColumnSidecarList, - expects_custody_columns: &[ColumnIndex], + expected_custody_columns: HashMap, spec: &ChainSpec, ) -> Result>, String> { // Group data columns by block_root and index - let mut data_columns_by_block = - HashMap::>>>::new(); + let mut custody_columns_by_block = HashMap::>>::new(); + let mut block_roots_by_slot = HashMap::>::new(); + let expected_custody_indices = expected_custody_columns.keys().cloned().collect::>(); for column in data_columns { let block_root = column.block_root(); let index = column.index; - if data_columns_by_block - .entry(block_root) + + block_roots_by_slot + .entry(column.slot()) .or_default() - .insert(index, column) - .is_some() - { + .insert(block_root); + + // Sanity check before casting to `CustodyDataColumn`. But this should never happen + if !expected_custody_columns.contains_key(&index) { return Err(format!( - "Repeated column block_root {block_root:?} index {index}" + "Received column not in expected custody indices {index}" )); } + + custody_columns_by_block + .entry(block_root) + .or_default() + .push(CustodyDataColumn::from_asserted_custody(column)); } // Now iterate all blocks ensuring that the block roots of each block and data column match, // plus we have columns for our custody requirements - let mut rpc_blocks = Vec::with_capacity(blocks.len()); + let rpc_blocks = blocks + .into_iter() + .map(|block| { + let block_root = get_block_root(&block); + block_roots_by_slot + .entry(block.slot()) + .or_default() + .insert(block_root); - for block in blocks { - let block_root = get_block_root(&block); - rpc_blocks.push(if block.num_expected_blobs() > 0 { - let Some(mut data_columns_by_index) = data_columns_by_block.remove(&block_root) - else { - // This PR ignores the fix from https://github.com/sigp/lighthouse/pull/5675 - // which allows blobs to not match blocks. - // TODO(das): on the initial version of PeerDAS the beacon chain does not check - // rpc custody requirements and dropping this check can allow the block to have - // an inconsistent DB. - return Err(format!("No columns for block {block_root:?} with data")); - }; - - let mut custody_columns = vec![]; - for index in expects_custody_columns { - let Some(data_column) = data_columns_by_index.remove(index) else { - return Err(format!("No column for block {block_root:?} index {index}")); - }; - // Safe to convert to `CustodyDataColumn`: we have asserted that the index of - // this column is in the set of `expects_custody_columns` and with the expected - // block root, so for the expected epoch of this batch. - custody_columns.push(CustodyDataColumn::from_asserted_custody(data_column)); - } - - // Assert that there are no columns left - if !data_columns_by_index.is_empty() { - let remaining_indices = data_columns_by_index.keys().collect::>(); - return Err(format!( - "Not all columns consumed for block {block_root:?}: {remaining_indices:?}" - )); - } + let custody_columns = custody_columns_by_block + .remove(&block_root) + .unwrap_or_default(); RpcBlock::new_with_custody_columns( Some(block_root), block, custody_columns, - expects_custody_columns.len(), + expected_custody_indices.clone(), spec, ) - .map_err(|e| format!("{e:?}"))? - } else { - // Block has no data, expects zero columns - RpcBlock::new_without_blobs(Some(block_root), block, 0) - }); - } + .map_err(|e| format!("{e:?}")) + }) + .collect::, _>>()?; // Assert that there are no columns left for other blocks - if !data_columns_by_block.is_empty() { - let remaining_roots = data_columns_by_block.keys().collect::>(); + if !custody_columns_by_block.is_empty() { + let remaining_roots = custody_columns_by_block.keys().collect::>(); return Err(format!("Not all columns consumed: {remaining_roots:?}")); } + for (_slot, block_roots) in block_roots_by_slot { + if block_roots.len() > 1 { + // TODO: Some peer(s) are faulty or malicious. This batch will fail processing but + // we want to send it to the process to better attribute fault. Maybe warn log for + // now and track it in a metric? + } + } + Ok(rpc_blocks) } } impl ByRangeRequest { - fn finish(&mut self, id: I, data: T) -> Result<(), String> { + fn finish(&mut self, id: I, data: T, peer_id: PeerId) -> Result<(), String> { match self { Self::Active(expected_id) => { if expected_id != &id { return Err(format!("unexpected req_id expected {expected_id} got {id}")); } - *self = Self::Complete(data); + *self = Self::Complete(data, peer_id); Ok(()) } - Self::Complete(_) => Err("request already complete".to_owned()), + Self::Complete(_, _) => Err("request already complete".to_owned()), } } - fn to_finished(&self) -> Option<&T> { + fn to_finished(&self) -> Option<(&T, &PeerId)> { match self { Self::Active(_) => None, - Self::Complete(data) => Some(data), + Self::Complete(data, peer_id) => Some((data, peer_id)), } } } @@ -309,12 +330,15 @@ mod tests { use beacon_chain::test_utils::{ generate_rand_block_and_blobs, generate_rand_block_and_data_columns, test_spec, NumBlobs, }; - use lighthouse_network::service::api_types::{ - BlobsByRangeRequestId, BlocksByRangeRequestId, ComponentsByRangeRequestId, - DataColumnsByRangeRequestId, Id, RangeRequestId, + use lighthouse_network::{ + service::api_types::{ + BlobsByRangeRequestId, BlocksByRangeRequestId, ComponentsByRangeRequestId, + DataColumnsByRangeRequestId, Id, RangeRequestId, + }, + PeerId, }; use rand::SeedableRng; - use std::sync::Arc; + use std::{collections::HashMap, sync::Arc}; use types::{test_utils::XorShiftRng, Epoch, ForkName, MinimalEthSpec as E, SignedBeaconBlock}; fn components_id() -> ComponentsByRangeRequestId { @@ -359,6 +383,7 @@ mod tests { #[test] fn no_blobs_into_responses() { let spec = test_spec::(); + let peer = PeerId::random(); let mut rng = XorShiftRng::from_seed([42; 16]); let blocks = (0..4) .map(|_| { @@ -372,7 +397,7 @@ mod tests { let mut info = RangeBlockComponentsRequest::::new(blocks_req_id, None, None); // Send blocks and complete terminate response - info.add_blocks(blocks_req_id, blocks).unwrap(); + info.add_blocks(blocks_req_id, blocks, peer).unwrap(); // Assert response is finished and RpcBlocks can be constructed info.responses(&test_spec::()).unwrap().unwrap(); @@ -381,6 +406,7 @@ mod tests { #[test] fn empty_blobs_into_responses() { let spec = test_spec::(); + let peer = PeerId::random(); let mut rng = XorShiftRng::from_seed([42; 16]); let blocks = (0..4) .map(|_| { @@ -403,9 +429,9 @@ mod tests { RangeBlockComponentsRequest::::new(blocks_req_id, Some(blobs_req_id), None); // Send blocks and complete terminate response - info.add_blocks(blocks_req_id, blocks).unwrap(); + info.add_blocks(blocks_req_id, blocks, peer).unwrap(); // Expect no blobs returned - info.add_blobs(blobs_req_id, vec![]).unwrap(); + info.add_blobs(blobs_req_id, vec![], peer).unwrap(); // Assert response is finished and RpcBlocks can be constructed, even if blobs weren't returned. // This makes sure we don't expect blobs here when they have expired. Checking this logic should @@ -416,7 +442,8 @@ mod tests { #[test] fn rpc_block_with_custody_columns() { let spec = test_spec::(); - let expects_custody_columns = vec![1, 2, 3, 4]; + let peer = PeerId::random(); + let expects_custody_columns = [1, 2, 3, 4]; let mut rng = XorShiftRng::from_seed([42; 16]); let blocks = (0..4) .map(|_| { @@ -436,15 +463,22 @@ mod tests { .enumerate() .map(|(i, _)| columns_id(i as Id, components_id)) .collect::>(); + + let column_to_peer = expects_custody_columns + .iter() + .map(|index| (*index, peer)) + .collect::>(); + let mut info = RangeBlockComponentsRequest::::new( blocks_req_id, None, - Some((columns_req_id.clone(), expects_custody_columns.clone())), + Some((columns_req_id.clone(), column_to_peer)), ); // Send blocks and complete terminate response info.add_blocks( blocks_req_id, blocks.iter().map(|b| b.0.clone().into()).collect(), + peer, ) .unwrap(); // Assert response is not finished @@ -458,6 +492,7 @@ mod tests { .iter() .flat_map(|b| b.1.iter().filter(|d| d.index == column_index).cloned()) .collect(), + peer, ) .unwrap(); @@ -476,12 +511,13 @@ mod tests { #[test] fn rpc_block_with_custody_columns_batched() { let spec = test_spec::(); + let peer = PeerId::random(); let batched_column_requests = [vec![1_u64, 2], vec![3, 4]]; let expects_custody_columns = batched_column_requests .iter() .flatten() - .cloned() - .collect::>(); + .map(|index| (*index, peer)) + .collect::>(); let custody_column_request_ids = (0..batched_column_requests.len() as u32).collect::>(); let num_of_data_column_requests = custody_column_request_ids.len(); @@ -516,6 +552,7 @@ mod tests { info.add_blocks( blocks_req_id, blocks.iter().map(|b| b.0.clone().into()).collect(), + peer, ) .unwrap(); // Assert response is not finished @@ -533,6 +570,7 @@ mod tests { .cloned() }) .collect::>(), + peer, ) .unwrap(); diff --git a/beacon_node/network/src/sync/manager.rs b/beacon_node/network/src/sync/manager.rs index 473881f182..3c94793941 100644 --- a/beacon_node/network/src/sync/manager.rs +++ b/beacon_node/network/src/sync/manager.rs @@ -41,7 +41,9 @@ use super::network_context::{ use super::peer_sampling::{Sampling, SamplingConfig, SamplingResult}; use super::peer_sync_info::{remote_sync_type, PeerSyncType}; use super::range_sync::{RangeSync, RangeSyncType, EPOCHS_PER_BATCH}; -use crate::network_beacon_processor::{ChainSegmentProcessId, NetworkBeaconProcessor}; +use crate::network_beacon_processor::{ + ChainSegmentProcessId, NetworkBeaconProcessor, PeerGroupAction, +}; use crate::service::NetworkMessage; use crate::status::ToStatusMessage; use crate::sync::block_lookups::{ @@ -61,8 +63,8 @@ use lighthouse_network::service::api_types::{ SamplingId, SamplingRequester, SingleLookupReqId, SyncRequestId, }; use lighthouse_network::types::{NetworkGlobals, SyncState}; +use lighthouse_network::PeerId; use lighthouse_network::SyncInfo; -use lighthouse_network::{PeerAction, PeerId}; use logging::crit; use lru_cache::LRUTimeCache; use std::ops::Sub; @@ -218,7 +220,8 @@ pub enum BatchProcessResult { /// The batch processing failed. It carries whether the processing imported any block. FaultyFailure { imported_blocks: usize, - penalty: PeerAction, + peer_action: PeerGroupAction, + error: String, }, NonFaultyFailure, } @@ -528,7 +531,6 @@ impl SyncManager { // Remove peer from all data structures self.range_sync.peer_disconnect(&mut self.network, peer_id); - let _ = self.backfill_sync.peer_disconnected(peer_id); self.block_lookups.peer_disconnected(peer_id); // Regardless of the outcome, we update the sync status. @@ -1271,17 +1273,18 @@ impl SyncManager { peer_id: PeerId, range_block_component: RangeBlockComponent, ) { - if let Some(resp) = self - .network - .range_block_component_response(range_request_id, range_block_component) - { + if let Some(resp) = self.network.range_block_component_response( + range_request_id, + peer_id, + range_block_component, + ) { match resp { - Ok(blocks) => { + Ok((blocks, batch_peers)) => { match range_request_id.requester { RangeRequestId::RangeSync { chain_id, batch_id } => { self.range_sync.blocks_by_range_response( &mut self.network, - peer_id, + batch_peers, chain_id, batch_id, range_request_id.id, @@ -1293,7 +1296,7 @@ impl SyncManager { match self.backfill_sync.on_block_response( &mut self.network, batch_id, - &peer_id, + batch_peers, range_request_id.id, blocks, ) { diff --git a/beacon_node/network/src/sync/network_context.rs b/beacon_node/network/src/sync/network_context.rs index 58641f8606..50b39fe72e 100644 --- a/beacon_node/network/src/sync/network_context.rs +++ b/beacon_node/network/src/sync/network_context.rs @@ -5,7 +5,7 @@ use self::custody::{ActiveCustodyRequest, Error as CustodyRequestError}; pub use self::requests::{BlocksByRootSingleRequest, DataColumnsByRootSingleBlockRequest}; use super::block_sidecar_coupling::RangeBlockComponentsRequest; use super::manager::BlockProcessType; -use super::range_sync::ByRangeRequestType; +use super::range_sync::{BatchPeers, ByRangeRequestType}; use super::SyncMessage; use crate::metrics; use crate::network_beacon_processor::NetworkBeaconProcessor; @@ -443,12 +443,14 @@ impl SyncNetworkContext { /// A blocks by range request sent by the range sync algorithm pub fn block_components_by_range_request( &mut self, - batch_type: ByRangeRequestType, request: BlocksByRangeRequest, requester: RangeRequestId, peers: &HashSet, peers_to_deprioritize: &HashSet, ) -> Result { + let batch_epoch = Slot::new(*request.start_slot()).epoch(T::EthSpec::slots_per_epoch()); + let batch_type = self.batch_type(batch_epoch); + let active_request_count_by_peer = self.active_request_count_by_peer(); let Some(block_peer) = peers @@ -510,7 +512,12 @@ impl SyncNetworkContext { let data_column_requests = columns_by_range_peers_to_request .map(|columns_by_range_peers_to_request| { - columns_by_range_peers_to_request + let column_to_peer_map = columns_by_range_peers_to_request + .iter() + .flat_map(|(peer_id, columns)| columns.iter().map(|column| (*column, *peer_id))) + .collect::>(); + + let requests = columns_by_range_peers_to_request .into_iter() .map(|(peer_id, columns)| { self.send_data_columns_by_range_request( @@ -523,25 +530,14 @@ impl SyncNetworkContext { id, ) }) - .collect::, _>>() + .collect::, _>>()?; + + Ok((requests, column_to_peer_map)) }) .transpose()?; - let info = RangeBlockComponentsRequest::new( - blocks_req_id, - blobs_req_id, - data_column_requests.map(|data_column_requests| { - ( - data_column_requests, - self.network_globals() - .sampling_columns - .clone() - .iter() - .copied() - .collect(), - ) - }), - ); + let info = + RangeBlockComponentsRequest::new(blocks_req_id, blobs_req_id, data_column_requests); self.components_by_range_requests.insert(id, info); Ok(id.id) @@ -602,13 +598,16 @@ impl SyncNetworkContext { Ok(columns_to_request_by_peer) } - /// Received a blocks by range or blobs by range response for a request that couples blocks ' - /// and blobs. + /// Received a _by_range response for a request that couples blocks and its data + /// + /// `peer_id` is the peer that served this individual RPC _by_range response. + #[allow(clippy::type_complexity)] pub fn range_block_component_response( &mut self, id: ComponentsByRangeRequestId, + peer_id: PeerId, range_block_component: RangeBlockComponent, - ) -> Option>, RpcResponseError>> { + ) -> Option>, BatchPeers), RpcResponseError>> { let Entry::Occupied(mut entry) = self.components_by_range_requests.entry(id) else { metrics::inc_counter_vec(&metrics::SYNC_UNKNOWN_NETWORK_REQUESTS, &["range_blocks"]); return None; @@ -619,18 +618,18 @@ impl SyncNetworkContext { match range_block_component { RangeBlockComponent::Block(req_id, resp) => resp.and_then(|(blocks, _)| { request - .add_blocks(req_id, blocks) + .add_blocks(req_id, blocks, peer_id) .map_err(RpcResponseError::BlockComponentCouplingError) }), RangeBlockComponent::Blob(req_id, resp) => resp.and_then(|(blobs, _)| { request - .add_blobs(req_id, blobs) + .add_blobs(req_id, blobs, peer_id) .map_err(RpcResponseError::BlockComponentCouplingError) }), RangeBlockComponent::CustodyColumns(req_id, resp) => { resp.and_then(|(custody_columns, _)| { request - .add_custody_columns(req_id, custody_columns) + .add_custody_columns(req_id, custody_columns, peer_id) .map_err(RpcResponseError::BlockComponentCouplingError) }) } @@ -1154,7 +1153,7 @@ impl SyncNetworkContext { ); let _enter = span.enter(); - debug!(%peer_id, %action, %msg, "Sync reporting peer"); + debug!(%peer_id, %action, %msg, client = %self.client_type(&peer_id), "Sync reporting peer"); self.network_send .send(NetworkMessage::ReportPeer { peer_id, @@ -1215,7 +1214,7 @@ impl SyncNetworkContext { /// Check whether a batch for this epoch (and only this epoch) should request just blocks or /// blocks and blobs. - pub fn batch_type(&self, epoch: types::Epoch) -> ByRangeRequestType { + fn batch_type(&self, epoch: types::Epoch) -> ByRangeRequestType { // Induces a compile time panic if this doesn't hold true. #[allow(clippy::assertions_on_constants)] const _: () = assert!( diff --git a/beacon_node/network/src/sync/network_context/requests.rs b/beacon_node/network/src/sync/network_context/requests.rs index 963b633ed6..cd70a2e7eb 100644 --- a/beacon_node/network/src/sync/network_context/requests.rs +++ b/beacon_node/network/src/sync/network_context/requests.rs @@ -28,11 +28,17 @@ mod data_columns_by_root; #[derive(Debug, PartialEq, Eq, IntoStaticStr)] pub enum LookupVerifyError { - NotEnoughResponsesReturned { actual: usize }, + NotEnoughResponsesReturned { + actual: usize, + }, TooManyResponses, UnrequestedBlockRoot(Hash256), UnrequestedIndex(u64), - UnrequestedSlot(Slot), + UnrequestedSlot { + slot: Slot, + start_slot: Slot, + end_slot: Slot, + }, InvalidInclusionProof, DuplicatedData(Slot, u64), InternalError(String), diff --git a/beacon_node/network/src/sync/network_context/requests/blobs_by_range.rs b/beacon_node/network/src/sync/network_context/requests/blobs_by_range.rs index 9c6f516199..8a9a8c9813 100644 --- a/beacon_node/network/src/sync/network_context/requests/blobs_by_range.rs +++ b/beacon_node/network/src/sync/network_context/requests/blobs_by_range.rs @@ -1,7 +1,7 @@ use super::{ActiveRequestItems, LookupVerifyError}; use lighthouse_network::rpc::methods::BlobsByRangeRequest; use std::sync::Arc; -use types::{BlobSidecar, EthSpec}; +use types::{BlobSidecar, EthSpec, Slot}; /// Accumulates results of a blobs_by_range request. Only returns items after receiving the /// stream termination. @@ -25,10 +25,15 @@ impl ActiveRequestItems for BlobsByRangeRequestItems { type Item = Arc>; fn add(&mut self, blob: Self::Item) -> Result { - if blob.slot() < self.request.start_slot - || blob.slot() >= self.request.start_slot + self.request.count - { - return Err(LookupVerifyError::UnrequestedSlot(blob.slot())); + let start_slot = Slot::new(self.request.start_slot); + let end_slot = start_slot + Slot::new(self.request.count); + + if blob.slot() < start_slot || blob.slot() >= end_slot { + return Err(LookupVerifyError::UnrequestedSlot { + slot: blob.slot(), + start_slot, + end_slot, + }); } if blob.index >= self.max_blobs_per_block { return Err(LookupVerifyError::UnrequestedIndex(blob.index)); diff --git a/beacon_node/network/src/sync/network_context/requests/blocks_by_range.rs b/beacon_node/network/src/sync/network_context/requests/blocks_by_range.rs index c7d2dda01e..ae39ac1d76 100644 --- a/beacon_node/network/src/sync/network_context/requests/blocks_by_range.rs +++ b/beacon_node/network/src/sync/network_context/requests/blocks_by_range.rs @@ -1,7 +1,7 @@ use super::{ActiveRequestItems, LookupVerifyError}; use lighthouse_network::rpc::BlocksByRangeRequest; use std::sync::Arc; -use types::{EthSpec, SignedBeaconBlock}; +use types::{EthSpec, SignedBeaconBlock, Slot}; /// Accumulates results of a blocks_by_range request. Only returns items after receiving the /// stream termination. @@ -23,10 +23,15 @@ impl ActiveRequestItems for BlocksByRangeRequestItems { type Item = Arc>; fn add(&mut self, block: Self::Item) -> Result { - if block.slot().as_u64() < *self.request.start_slot() - || block.slot().as_u64() >= self.request.start_slot() + self.request.count() - { - return Err(LookupVerifyError::UnrequestedSlot(block.slot())); + let start_slot = Slot::new(*self.request.start_slot()); + let end_slot = start_slot + Slot::new(*self.request.count()); + + if block.slot() < start_slot || block.slot() >= end_slot { + return Err(LookupVerifyError::UnrequestedSlot { + slot: block.slot(), + start_slot, + end_slot, + }); } if self .items diff --git a/beacon_node/network/src/sync/network_context/requests/data_columns_by_range.rs b/beacon_node/network/src/sync/network_context/requests/data_columns_by_range.rs index 9dabb2defa..276ede93c1 100644 --- a/beacon_node/network/src/sync/network_context/requests/data_columns_by_range.rs +++ b/beacon_node/network/src/sync/network_context/requests/data_columns_by_range.rs @@ -1,7 +1,7 @@ use super::{ActiveRequestItems, LookupVerifyError}; use lighthouse_network::rpc::methods::DataColumnsByRangeRequest; use std::sync::Arc; -use types::{DataColumnSidecar, EthSpec}; +use types::{DataColumnSidecar, EthSpec, Slot}; /// Accumulates results of a data_columns_by_range request. Only returns items after receiving the /// stream termination. @@ -23,10 +23,15 @@ impl ActiveRequestItems for DataColumnsByRangeRequestItems { type Item = Arc>; fn add(&mut self, data_column: Self::Item) -> Result { - if data_column.slot() < self.request.start_slot - || data_column.slot() >= self.request.start_slot + self.request.count - { - return Err(LookupVerifyError::UnrequestedSlot(data_column.slot())); + let start_slot = Slot::new(self.request.start_slot); + let end_slot = start_slot + Slot::new(self.request.count); + + if data_column.slot() < start_slot || data_column.slot() >= end_slot { + return Err(LookupVerifyError::UnrequestedSlot { + slot: data_column.slot(), + start_slot, + end_slot, + }); } if !self.request.columns.contains(&data_column.index) { return Err(LookupVerifyError::UnrequestedIndex(data_column.index)); diff --git a/beacon_node/network/src/sync/range_sync/batch.rs b/beacon_node/network/src/sync/range_sync/batch.rs index 264f83ee82..72598a2540 100644 --- a/beacon_node/network/src/sync/range_sync/batch.rs +++ b/beacon_node/network/src/sync/range_sync/batch.rs @@ -2,13 +2,13 @@ use beacon_chain::block_verification_types::RpcBlock; use lighthouse_network::rpc::methods::BlocksByRangeRequest; use lighthouse_network::service::api_types::Id; use lighthouse_network::PeerId; -use std::collections::HashSet; +use std::collections::{HashMap, HashSet}; use std::fmt; use std::hash::{Hash, Hasher}; use std::ops::Sub; use std::time::{Duration, Instant}; use strum::Display; -use types::{Epoch, EthSpec, Slot}; +use types::{ColumnIndex, Epoch, EthSpec, Slot}; /// The number of times to retry a batch before it is considered failed. const MAX_BATCH_DOWNLOAD_ATTEMPTS: u8 = 5; @@ -26,6 +26,35 @@ pub enum ByRangeRequestType { Blocks, } +#[derive(Clone, Debug)] +pub struct BatchPeers { + block_peer: PeerId, + column_peers: HashMap, +} + +impl BatchPeers { + pub fn new_from_block_peer(block_peer: PeerId) -> Self { + Self { + block_peer, + column_peers: <_>::default(), + } + } + pub fn new(block_peer: PeerId, column_peers: HashMap) -> Self { + Self { + block_peer, + column_peers, + } + } + + pub fn block(&self) -> PeerId { + self.block_peer + } + + pub fn column(&self, index: &ColumnIndex) -> Option<&PeerId> { + self.column_peers.get(index) + } +} + /// Allows customisation of the above constants used in other sync methods such as BackFillSync. pub trait BatchConfig { /// The maximum batch download attempts. @@ -110,8 +139,6 @@ pub struct BatchInfo { failed_download_attempts: Vec>, /// State of the batch. state: BatchState, - /// Whether this batch contains all blocks or all blocks and blobs. - batch_type: ByRangeRequestType, /// Pin the generic marker: std::marker::PhantomData, } @@ -134,7 +161,7 @@ pub enum BatchState { /// The batch is being downloaded. Downloading(Id), /// The batch has been completely downloaded and is ready for processing. - AwaitingProcessing(PeerId, Vec>, Instant), + AwaitingProcessing(BatchPeers, Vec>, Instant), /// The batch is being processed. Processing(Attempt), /// The batch was successfully processed and is waiting to be validated. @@ -171,7 +198,7 @@ impl BatchInfo { /// fork boundary will be of mixed type (all blocks and one last blockblob), and I don't want to /// deal with this for now. /// This means finalization might be slower in deneb - pub fn new(start_epoch: &Epoch, num_of_epochs: u64, batch_type: ByRangeRequestType) -> Self { + pub fn new(start_epoch: &Epoch, num_of_epochs: u64) -> Self { let start_slot = start_epoch.start_slot(E::slots_per_epoch()); let end_slot = start_slot + num_of_epochs * E::slots_per_epoch(); BatchInfo { @@ -181,20 +208,22 @@ impl BatchInfo { failed_download_attempts: Vec::new(), non_faulty_processing_attempts: 0, state: BatchState::AwaitingDownload, - batch_type, marker: std::marker::PhantomData, } } /// Gives a list of peers from which this batch has had a failed download or processing /// attempt. - pub fn failed_peers(&self) -> HashSet { + /// + /// TODO(das): Returns only block peers to keep the mainnet path equivalent. The failed peers + /// mechanism is broken for PeerDAS and will be fixed with https://github.com/sigp/lighthouse/issues/6258 + pub fn failed_block_peers(&self) -> HashSet { let mut peers = HashSet::with_capacity( self.failed_processing_attempts.len() + self.failed_download_attempts.len(), ); for attempt in &self.failed_processing_attempts { - peers.insert(attempt.peer_id); + peers.insert(attempt.peers.block()); } for peer in self.failed_download_attempts.iter().flatten() { @@ -212,13 +241,13 @@ impl BatchInfo { false } - /// Returns the peer that is currently responsible for progressing the state of the batch. - pub fn processing_peer(&self) -> Option<&PeerId> { + /// Returns the peers that provided this batch's downloaded contents + pub fn processing_peers(&self) -> Option<&BatchPeers> { match &self.state { BatchState::AwaitingDownload | BatchState::Failed | BatchState::Downloading(..) => None, - BatchState::AwaitingProcessing(peer_id, _, _) - | BatchState::Processing(Attempt { peer_id, .. }) - | BatchState::AwaitingValidation(Attempt { peer_id, .. }) => Some(peer_id), + BatchState::AwaitingProcessing(peers, _, _) + | BatchState::Processing(Attempt { peers, .. }) + | BatchState::AwaitingValidation(Attempt { peers, .. }) => Some(peers), BatchState::Poisoned => unreachable!("Poisoned batch"), } } @@ -237,13 +266,10 @@ impl BatchInfo { } /// Returns a BlocksByRange request associated with the batch. - pub fn to_blocks_by_range_request(&self) -> (BlocksByRangeRequest, ByRangeRequestType) { - ( - BlocksByRangeRequest::new( - self.start_slot.into(), - self.end_slot.sub(self.start_slot).into(), - ), - self.batch_type, + pub fn to_blocks_by_range_request(&self) -> BlocksByRangeRequest { + BlocksByRangeRequest::new( + self.start_slot.into(), + self.end_slot.sub(self.start_slot).into(), ) } @@ -275,12 +301,12 @@ impl BatchInfo { pub fn download_completed( &mut self, blocks: Vec>, - peer: PeerId, + batch_peers: BatchPeers, ) -> Result { match self.state.poison() { - BatchState::Downloading(_) => { + BatchState::Downloading(_request_id) => { let received = blocks.len(); - self.state = BatchState::AwaitingProcessing(peer, blocks, Instant::now()); + self.state = BatchState::AwaitingProcessing(batch_peers, blocks, Instant::now()); Ok(received) } BatchState::Poisoned => unreachable!("Poisoned batch"), @@ -305,10 +331,9 @@ impl BatchInfo { peer: Option, ) -> Result { match self.state.poison() { - BatchState::Downloading(_) => { + BatchState::Downloading(_request_id) => { // register the attempt and check if the batch can be tried again self.failed_download_attempts.push(peer); - self.state = if self.failed_download_attempts.len() >= B::max_batch_download_attempts() as usize { @@ -349,8 +374,8 @@ impl BatchInfo { pub fn start_processing(&mut self) -> Result<(Vec>, Duration), WrongState> { match self.state.poison() { - BatchState::AwaitingProcessing(peer, blocks, start_instant) => { - self.state = BatchState::Processing(Attempt::new::(peer, &blocks)); + BatchState::AwaitingProcessing(peers, blocks, start_instant) => { + self.state = BatchState::Processing(Attempt::new::(peers, &blocks)); Ok((blocks, start_instant.elapsed())) } BatchState::Poisoned => unreachable!("Poisoned batch"), @@ -438,39 +463,41 @@ impl BatchInfo { } } -/// Represents a peer's attempt and providing the result for this batch. +/// Represents a batch attempt awaiting validation /// -/// Invalid attempts will downscore a peer. -#[derive(PartialEq, Debug)] +/// Invalid attempts will downscore its peers +#[derive(Debug)] pub struct Attempt { - /// The peer that made the attempt. - pub peer_id: PeerId, + /// The peers that served this batch contents + peers: BatchPeers, /// The hash of the blocks of the attempt. pub hash: u64, } impl Attempt { - fn new(peer_id: PeerId, blocks: &[RpcBlock]) -> Self { + fn new(peers: BatchPeers, blocks: &[RpcBlock]) -> Self { let hash = B::batch_attempt_hash(blocks); - Attempt { peer_id, hash } + Attempt { peers, hash } + } + + pub fn block_peer(&self) -> PeerId { + self.peers.block() } } impl std::fmt::Debug for BatchState { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { - BatchState::Processing(Attempt { - ref peer_id, - hash: _, - }) => write!(f, "Processing({})", peer_id), - BatchState::AwaitingValidation(Attempt { - ref peer_id, - hash: _, - }) => write!(f, "AwaitingValidation({})", peer_id), + BatchState::Processing(Attempt { ref peers, hash: _ }) => { + write!(f, "Processing({})", peers.block()) + } + BatchState::AwaitingValidation(Attempt { ref peers, hash: _ }) => { + write!(f, "AwaitingValidation({})", peers.block()) + } BatchState::AwaitingDownload => f.write_str("AwaitingDownload"), BatchState::Failed => f.write_str("Failed"), - BatchState::AwaitingProcessing(ref peer, ref blocks, _) => { - write!(f, "AwaitingProcessing({}, {} blocks)", peer, blocks.len()) + BatchState::AwaitingProcessing(_, ref blocks, _) => { + write!(f, "AwaitingProcessing({} blocks)", blocks.len()) } BatchState::Downloading(request_id) => { write!(f, "Downloading({})", request_id) diff --git a/beacon_node/network/src/sync/range_sync/chain.rs b/beacon_node/network/src/sync/range_sync/chain.rs index be01734417..ba809a14ba 100644 --- a/beacon_node/network/src/sync/range_sync/chain.rs +++ b/beacon_node/network/src/sync/range_sync/chain.rs @@ -1,4 +1,4 @@ -use super::batch::{BatchInfo, BatchProcessingResult, BatchState}; +use super::batch::{BatchInfo, BatchPeers, BatchProcessingResult, BatchState}; use super::RangeSyncType; use crate::metrics; use crate::network_beacon_processor::ChainSegmentProcessId; @@ -6,6 +6,7 @@ use crate::sync::network_context::{RangeRequestId, RpcRequestSendError, RpcRespo use crate::sync::{network_context::SyncNetworkContext, BatchOperationOutcome, BatchProcessResult}; use beacon_chain::block_verification_types::RpcBlock; use beacon_chain::BeaconChainTypes; +use itertools::Itertools; use lighthouse_network::service::api_types::Id; use lighthouse_network::{PeerAction, PeerId}; use logging::crit; @@ -216,7 +217,7 @@ impl SyncingChain { &mut self, network: &mut SyncNetworkContext, batch_id: BatchId, - peer_id: &PeerId, + batch_peers: BatchPeers, request_id: Id, blocks: Vec>, ) -> ProcessingResult { @@ -244,8 +245,7 @@ impl SyncingChain { // A stream termination has been sent. This batch has ended. Process a completed batch. // Remove the request from the peer's active batches - // TODO(das): should use peer group here https://github.com/sigp/lighthouse/issues/6258 - let received = batch.download_completed(blocks, *peer_id)?; + let received = batch.download_completed(blocks, batch_peers)?; let awaiting_batches = batch_id .saturating_sub(self.optimistic_start.unwrap_or(self.processing_target)) / EPOCHS_PER_BATCH; @@ -447,7 +447,7 @@ impl SyncingChain { } }; - let peer = batch.processing_peer().cloned().ok_or_else(|| { + let batch_peers = batch.processing_peers().ok_or_else(|| { RemoveChain::WrongBatchState(format!( "Processing target is in wrong state: {:?}", batch.state(), @@ -458,7 +458,6 @@ impl SyncingChain { debug!( result = ?result, batch_epoch = %batch_id, - client = %network.client_type(&peer), batch_state = ?batch_state, ?batch, "Batch processing result" @@ -521,10 +520,30 @@ impl SyncingChain { } BatchProcessResult::FaultyFailure { imported_blocks, - penalty, + peer_action, + // TODO(sync): propagate error in logs + error: _, } => { - // Penalize the peer appropiately. - network.report_peer(peer, *penalty, "faulty_batch"); + // TODO(sync): De-dup between back and forwards sync + if let Some(penalty) = peer_action.block_peer { + // Penalize the peer appropiately. + network.report_peer(batch_peers.block(), penalty, "faulty_batch"); + } + + // Penalize each peer only once. Currently a peer_action does not mix different + // PeerAction levels. + for (peer, penalty) in peer_action + .column_peer + .iter() + .filter_map(|(column_index, penalty)| { + batch_peers + .column(column_index) + .map(|peer| (*peer, *penalty)) + }) + .unique() + { + network.report_peer(peer, penalty, "faulty_batch_column"); + } // Check if this batch is allowed to continue match batch.processing_completed(BatchProcessingResult::FaultyFailure)? { @@ -540,6 +559,11 @@ impl SyncingChain { self.handle_invalid_batch(network, batch_id) } BatchOperationOutcome::Failed { blacklist } => { + // TODO(das): what peer action should we apply to the rest of + // peers? Say a batch repeatedly fails because a custody peer is not + // sending us its custody columns + let penalty = PeerAction::LowToleranceError; + // Check that we have not exceeded the re-process retry counter, // If a batch has exceeded the invalid batch lookup attempts limit, it means // that it is likely all peers in this chain are are sending invalid batches @@ -554,7 +578,7 @@ impl SyncingChain { ); for peer in self.peers.drain() { - network.report_peer(peer, *penalty, "faulty_chain"); + network.report_peer(peer, penalty, "faulty_chain"); } Err(RemoveChain::ChainFailed { blacklist, @@ -633,17 +657,20 @@ impl SyncingChain { // The validated batch has been re-processed if attempt.hash != processed_attempt.hash { // The re-downloaded version was different - if processed_attempt.peer_id != attempt.peer_id { + // TODO(das): should penalize other peers? + let valid_attempt_peer = processed_attempt.block_peer(); + let bad_attempt_peer = attempt.block_peer(); + if valid_attempt_peer != bad_attempt_peer { // A different peer sent the correct batch, the previous peer did not // We negatively score the original peer. let action = PeerAction::LowToleranceError; debug!( batch_epoch = %id, score_adjustment = %action, - original_peer = %attempt.peer_id, new_peer = %processed_attempt.peer_id, + original_peer = %bad_attempt_peer, new_peer = %valid_attempt_peer, "Re-processed batch validated. Scoring original peer" ); network.report_peer( - attempt.peer_id, + bad_attempt_peer, action, "batch_reprocessed_original_peer", ); @@ -654,12 +681,12 @@ impl SyncingChain { debug!( batch_epoch = %id, score_adjustment = %action, - original_peer = %attempt.peer_id, - new_peer = %processed_attempt.peer_id, + original_peer = %bad_attempt_peer, + new_peer = %valid_attempt_peer, "Re-processed batch validated by the same peer" ); network.report_peer( - attempt.peer_id, + bad_attempt_peer, action, "batch_reprocessed_same_peer", ); @@ -888,8 +915,8 @@ impl SyncingChain { ) -> ProcessingResult { let batch_state = self.visualize_batch_state(); if let Some(batch) = self.batches.get_mut(&batch_id) { - let (request, batch_type) = batch.to_blocks_by_range_request(); - let failed_peers = batch.failed_peers(); + let request = batch.to_blocks_by_range_request(); + let failed_peers = batch.failed_block_peers(); // TODO(das): we should request only from peers that are part of this SyncingChain. // However, then we hit the NoPeer error frequently which causes the batch to fail and @@ -903,7 +930,6 @@ impl SyncingChain { .collect::>(); match network.block_components_by_range_request( - batch_type, request, RangeRequestId::RangeSync { chain_id: self.id, @@ -999,8 +1025,7 @@ impl SyncingChain { } if let Entry::Vacant(entry) = self.batches.entry(epoch) { - let batch_type = network.batch_type(epoch); - let optimistic_batch = BatchInfo::new(&epoch, EPOCHS_PER_BATCH, batch_type); + let optimistic_batch = BatchInfo::new(&epoch, EPOCHS_PER_BATCH); entry.insert(optimistic_batch); self.send_batch(network, epoch)?; } @@ -1101,8 +1126,7 @@ impl SyncingChain { self.include_next_batch(network) } Entry::Vacant(entry) => { - let batch_type = network.batch_type(next_batch_id); - entry.insert(BatchInfo::new(&next_batch_id, EPOCHS_PER_BATCH, batch_type)); + entry.insert(BatchInfo::new(&next_batch_id, EPOCHS_PER_BATCH)); self.to_be_downloaded += EPOCHS_PER_BATCH; Some(next_batch_id) } diff --git a/beacon_node/network/src/sync/range_sync/mod.rs b/beacon_node/network/src/sync/range_sync/mod.rs index 8f881fba90..1218e0cd09 100644 --- a/beacon_node/network/src/sync/range_sync/mod.rs +++ b/beacon_node/network/src/sync/range_sync/mod.rs @@ -8,7 +8,7 @@ mod range; mod sync_type; pub use batch::{ - BatchConfig, BatchInfo, BatchOperationOutcome, BatchProcessingResult, BatchState, + BatchConfig, BatchInfo, BatchOperationOutcome, BatchPeers, BatchProcessingResult, BatchState, ByRangeRequestType, }; pub use chain::{BatchId, ChainId, EPOCHS_PER_BATCH}; diff --git a/beacon_node/network/src/sync/range_sync/range.rs b/beacon_node/network/src/sync/range_sync/range.rs index 1ec1440991..e2c076484a 100644 --- a/beacon_node/network/src/sync/range_sync/range.rs +++ b/beacon_node/network/src/sync/range_sync/range.rs @@ -42,6 +42,7 @@ use super::chain::{BatchId, ChainId, RemoveChain, SyncingChain}; use super::chain_collection::{ChainCollection, SyncChainStatus}; use super::sync_type::RangeSyncType; +use super::BatchPeers; use crate::metrics; use crate::status::ToStatusMessage; use crate::sync::network_context::{RpcResponseError, SyncNetworkContext}; @@ -227,7 +228,7 @@ where pub fn blocks_by_range_response( &mut self, network: &mut SyncNetworkContext, - peer_id: PeerId, + batch_peers: BatchPeers, chain_id: ChainId, batch_id: BatchId, request_id: Id, @@ -235,7 +236,7 @@ where ) { // check if this chunk removes the chain match self.chains.call_by_id(chain_id, |chain| { - chain.on_block_response(network, batch_id, &peer_id, request_id, blocks) + chain.on_block_response(network, batch_id, batch_peers, request_id, blocks) }) { Ok((removed_chain, sync_type)) => { if let Some((removed_chain, remove_reason)) = removed_chain { diff --git a/beacon_node/network/src/sync/tests/range.rs b/beacon_node/network/src/sync/tests/range.rs index 932f485dd0..06dca355e5 100644 --- a/beacon_node/network/src/sync/tests/range.rs +++ b/beacon_node/network/src/sync/tests/range.rs @@ -449,12 +449,13 @@ fn build_rpc_block( RpcBlock::new(None, block, Some(blobs.clone())).unwrap() } Some(DataSidecars::DataColumns(columns)) => { + // TODO(das): Assumes CGC = max value. Change if we want to do more complex tests + let expected_custody_indices = columns.iter().map(|d| d.index()).collect::>(); RpcBlock::new_with_custody_columns( None, block, columns.clone(), - // TODO(das): Assumes CGC = max value. Change if we want to do more complex tests - columns.len(), + expected_custody_indices, spec, ) .unwrap() diff --git a/consensus/state_processing/src/per_block_processing/signature_sets.rs b/consensus/state_processing/src/per_block_processing/signature_sets.rs index 39f438f97f..e954541b59 100644 --- a/consensus/state_processing/src/per_block_processing/signature_sets.rs +++ b/consensus/state_processing/src/per_block_processing/signature_sets.rs @@ -34,9 +34,6 @@ pub enum Error { /// /// The block is invalid. IncorrectBlockProposer { block: u64, local_shuffling: u64 }, - /// The public keys supplied do not match the number of objects requiring keys. Block validity - /// was not determined. - MismatchedPublicKeyLen { pubkey_len: usize, other_len: usize }, /// Pubkey decompression failed. The block is invalid. PublicKeyDecompressionFailed, /// The public key bytes stored in the `BeaconState` were not valid. This is a serious internal