Fix PeerDAS sync scoring (#7352)

* Remove request tracking inside syncing chains

* Prioritize by range peers in network context

* Prioritize custody peers for columns by range

* Explicit error handling of the no peers error case

* Remove good_peers_on_sampling_subnets

* Count AwaitingDownload towards the buffer limit

* Retry syncing chains in AwaitingDownload state

* Use same peer prioritization for lookups

* Review PR

* Address TODOs

* Revert changes to peer erroring in range sync

* Revert metrics changes

* Update comment

* Pass peers_to_deprioritize to select_columns_by_range_peers_to_request

* more idiomatic

* Idiomatic while

* Add note about infinite loop

* Use while let

* Fix wrong custody column count for lookup blocks

* Remove impl

* Remove stale comment

* Fix build errors.

* Or default

* Review PR

* BatchPeerGroup

* Match block and blob signatures

* Explicit match statement to BlockError in range sync

* Remove todo in BatchPeerGroup

* Remove participating peers from backfill sync

* Remove MissingAllCustodyColumns error

* Merge fixes

* Clean up PR

* Consistent naming of batch_peers

* Address multiple review comments

* Better errors for das

* Penalize column peers once

* Restore fn

* Fix error enum

* Removed MismatchedPublicKeyLen

* Revert testing changes

* Change BlockAndCustodyColumns enum variant

* Revert type change in import_historical_block_batch

* Drop pubkey cache

* Don't collect Vec

* Classify errors

* Remove ReconstructColumnsError

* More detailed UnrequestedSlot error

* Lint test

* Fix slot conversion

* Reduce penalty for missing blobs

* Revert changes in peer selection

* Lint tests

* Rename block matching functions

* Reorder block matching in historical blocks

* Fix order of block matching

* Add store tests

* Filter blockchain in assert_correct_historical_block_chain

* Also filter before KZG checks

* Lint tests

* Fix lint

* Fix fulu err assertion

* Check point is not at infinity

* Fix ws sync test

* Revert dropping filter fn

---------

Co-authored-by: Jimmy Chen <jchen.tc@gmail.com>
Co-authored-by: Jimmy Chen <jimmy@sigmaprime.io>
Co-authored-by: Pawan Dhananjay <pawandhananjay@gmail.com>
This commit is contained in:
Lion - dapplion
2025-05-21 08:06:42 -05:00
committed by GitHub
parent f06d1d0346
commit b014675b7a
27 changed files with 1103 additions and 654 deletions

View File

@@ -2,13 +2,13 @@ use beacon_chain::block_verification_types::RpcBlock;
use lighthouse_network::rpc::methods::BlocksByRangeRequest;
use lighthouse_network::service::api_types::Id;
use lighthouse_network::PeerId;
use std::collections::HashSet;
use std::collections::{HashMap, HashSet};
use std::fmt;
use std::hash::{Hash, Hasher};
use std::ops::Sub;
use std::time::{Duration, Instant};
use strum::Display;
use types::{Epoch, EthSpec, Slot};
use types::{ColumnIndex, Epoch, EthSpec, Slot};
/// The number of times to retry a batch before it is considered failed.
const MAX_BATCH_DOWNLOAD_ATTEMPTS: u8 = 5;
@@ -26,6 +26,35 @@ pub enum ByRangeRequestType {
Blocks,
}
#[derive(Clone, Debug)]
pub struct BatchPeers {
block_peer: PeerId,
column_peers: HashMap<ColumnIndex, PeerId>,
}
impl BatchPeers {
pub fn new_from_block_peer(block_peer: PeerId) -> Self {
Self {
block_peer,
column_peers: <_>::default(),
}
}
pub fn new(block_peer: PeerId, column_peers: HashMap<ColumnIndex, PeerId>) -> Self {
Self {
block_peer,
column_peers,
}
}
pub fn block(&self) -> PeerId {
self.block_peer
}
pub fn column(&self, index: &ColumnIndex) -> Option<&PeerId> {
self.column_peers.get(index)
}
}
/// Allows customisation of the above constants used in other sync methods such as BackFillSync.
pub trait BatchConfig {
/// The maximum batch download attempts.
@@ -110,8 +139,6 @@ pub struct BatchInfo<E: EthSpec, B: BatchConfig = RangeSyncBatchConfig> {
failed_download_attempts: Vec<Option<PeerId>>,
/// State of the batch.
state: BatchState<E>,
/// Whether this batch contains all blocks or all blocks and blobs.
batch_type: ByRangeRequestType,
/// Pin the generic
marker: std::marker::PhantomData<B>,
}
@@ -134,7 +161,7 @@ pub enum BatchState<E: EthSpec> {
/// The batch is being downloaded.
Downloading(Id),
/// The batch has been completely downloaded and is ready for processing.
AwaitingProcessing(PeerId, Vec<RpcBlock<E>>, Instant),
AwaitingProcessing(BatchPeers, Vec<RpcBlock<E>>, Instant),
/// The batch is being processed.
Processing(Attempt),
/// The batch was successfully processed and is waiting to be validated.
@@ -171,7 +198,7 @@ impl<E: EthSpec, B: BatchConfig> BatchInfo<E, B> {
/// fork boundary will be of mixed type (all blocks and one last blockblob), and I don't want to
/// deal with this for now.
/// This means finalization might be slower in deneb
pub fn new(start_epoch: &Epoch, num_of_epochs: u64, batch_type: ByRangeRequestType) -> Self {
pub fn new(start_epoch: &Epoch, num_of_epochs: u64) -> Self {
let start_slot = start_epoch.start_slot(E::slots_per_epoch());
let end_slot = start_slot + num_of_epochs * E::slots_per_epoch();
BatchInfo {
@@ -181,20 +208,22 @@ impl<E: EthSpec, B: BatchConfig> BatchInfo<E, B> {
failed_download_attempts: Vec::new(),
non_faulty_processing_attempts: 0,
state: BatchState::AwaitingDownload,
batch_type,
marker: std::marker::PhantomData,
}
}
/// Gives a list of peers from which this batch has had a failed download or processing
/// attempt.
pub fn failed_peers(&self) -> HashSet<PeerId> {
///
/// TODO(das): Returns only block peers to keep the mainnet path equivalent. The failed peers
/// mechanism is broken for PeerDAS and will be fixed with https://github.com/sigp/lighthouse/issues/6258
pub fn failed_block_peers(&self) -> HashSet<PeerId> {
let mut peers = HashSet::with_capacity(
self.failed_processing_attempts.len() + self.failed_download_attempts.len(),
);
for attempt in &self.failed_processing_attempts {
peers.insert(attempt.peer_id);
peers.insert(attempt.peers.block());
}
for peer in self.failed_download_attempts.iter().flatten() {
@@ -212,13 +241,13 @@ impl<E: EthSpec, B: BatchConfig> BatchInfo<E, B> {
false
}
/// Returns the peer that is currently responsible for progressing the state of the batch.
pub fn processing_peer(&self) -> Option<&PeerId> {
/// Returns the peers that provided this batch's downloaded contents
pub fn processing_peers(&self) -> Option<&BatchPeers> {
match &self.state {
BatchState::AwaitingDownload | BatchState::Failed | BatchState::Downloading(..) => None,
BatchState::AwaitingProcessing(peer_id, _, _)
| BatchState::Processing(Attempt { peer_id, .. })
| BatchState::AwaitingValidation(Attempt { peer_id, .. }) => Some(peer_id),
BatchState::AwaitingProcessing(peers, _, _)
| BatchState::Processing(Attempt { peers, .. })
| BatchState::AwaitingValidation(Attempt { peers, .. }) => Some(peers),
BatchState::Poisoned => unreachable!("Poisoned batch"),
}
}
@@ -237,13 +266,10 @@ impl<E: EthSpec, B: BatchConfig> BatchInfo<E, B> {
}
/// Returns a BlocksByRange request associated with the batch.
pub fn to_blocks_by_range_request(&self) -> (BlocksByRangeRequest, ByRangeRequestType) {
(
BlocksByRangeRequest::new(
self.start_slot.into(),
self.end_slot.sub(self.start_slot).into(),
),
self.batch_type,
pub fn to_blocks_by_range_request(&self) -> BlocksByRangeRequest {
BlocksByRangeRequest::new(
self.start_slot.into(),
self.end_slot.sub(self.start_slot).into(),
)
}
@@ -275,12 +301,12 @@ impl<E: EthSpec, B: BatchConfig> BatchInfo<E, B> {
pub fn download_completed(
&mut self,
blocks: Vec<RpcBlock<E>>,
peer: PeerId,
batch_peers: BatchPeers,
) -> Result<usize /* Received blocks */, WrongState> {
match self.state.poison() {
BatchState::Downloading(_) => {
BatchState::Downloading(_request_id) => {
let received = blocks.len();
self.state = BatchState::AwaitingProcessing(peer, blocks, Instant::now());
self.state = BatchState::AwaitingProcessing(batch_peers, blocks, Instant::now());
Ok(received)
}
BatchState::Poisoned => unreachable!("Poisoned batch"),
@@ -305,10 +331,9 @@ impl<E: EthSpec, B: BatchConfig> BatchInfo<E, B> {
peer: Option<PeerId>,
) -> Result<BatchOperationOutcome, WrongState> {
match self.state.poison() {
BatchState::Downloading(_) => {
BatchState::Downloading(_request_id) => {
// register the attempt and check if the batch can be tried again
self.failed_download_attempts.push(peer);
self.state = if self.failed_download_attempts.len()
>= B::max_batch_download_attempts() as usize
{
@@ -349,8 +374,8 @@ impl<E: EthSpec, B: BatchConfig> BatchInfo<E, B> {
pub fn start_processing(&mut self) -> Result<(Vec<RpcBlock<E>>, Duration), WrongState> {
match self.state.poison() {
BatchState::AwaitingProcessing(peer, blocks, start_instant) => {
self.state = BatchState::Processing(Attempt::new::<B, E>(peer, &blocks));
BatchState::AwaitingProcessing(peers, blocks, start_instant) => {
self.state = BatchState::Processing(Attempt::new::<B, E>(peers, &blocks));
Ok((blocks, start_instant.elapsed()))
}
BatchState::Poisoned => unreachable!("Poisoned batch"),
@@ -438,39 +463,41 @@ impl<E: EthSpec, B: BatchConfig> BatchInfo<E, B> {
}
}
/// Represents a peer's attempt and providing the result for this batch.
/// Represents a batch attempt awaiting validation
///
/// Invalid attempts will downscore a peer.
#[derive(PartialEq, Debug)]
/// Invalid attempts will downscore its peers
#[derive(Debug)]
pub struct Attempt {
/// The peer that made the attempt.
pub peer_id: PeerId,
/// The peers that served this batch contents
peers: BatchPeers,
/// The hash of the blocks of the attempt.
pub hash: u64,
}
impl Attempt {
fn new<B: BatchConfig, E: EthSpec>(peer_id: PeerId, blocks: &[RpcBlock<E>]) -> Self {
fn new<B: BatchConfig, E: EthSpec>(peers: BatchPeers, blocks: &[RpcBlock<E>]) -> Self {
let hash = B::batch_attempt_hash(blocks);
Attempt { peer_id, hash }
Attempt { peers, hash }
}
pub fn block_peer(&self) -> PeerId {
self.peers.block()
}
}
impl<E: EthSpec> std::fmt::Debug for BatchState<E> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
BatchState::Processing(Attempt {
ref peer_id,
hash: _,
}) => write!(f, "Processing({})", peer_id),
BatchState::AwaitingValidation(Attempt {
ref peer_id,
hash: _,
}) => write!(f, "AwaitingValidation({})", peer_id),
BatchState::Processing(Attempt { ref peers, hash: _ }) => {
write!(f, "Processing({})", peers.block())
}
BatchState::AwaitingValidation(Attempt { ref peers, hash: _ }) => {
write!(f, "AwaitingValidation({})", peers.block())
}
BatchState::AwaitingDownload => f.write_str("AwaitingDownload"),
BatchState::Failed => f.write_str("Failed"),
BatchState::AwaitingProcessing(ref peer, ref blocks, _) => {
write!(f, "AwaitingProcessing({}, {} blocks)", peer, blocks.len())
BatchState::AwaitingProcessing(_, ref blocks, _) => {
write!(f, "AwaitingProcessing({} blocks)", blocks.len())
}
BatchState::Downloading(request_id) => {
write!(f, "Downloading({})", request_id)

View File

@@ -1,4 +1,4 @@
use super::batch::{BatchInfo, BatchProcessingResult, BatchState};
use super::batch::{BatchInfo, BatchPeers, BatchProcessingResult, BatchState};
use super::RangeSyncType;
use crate::metrics;
use crate::network_beacon_processor::ChainSegmentProcessId;
@@ -6,6 +6,7 @@ use crate::sync::network_context::{RangeRequestId, RpcRequestSendError, RpcRespo
use crate::sync::{network_context::SyncNetworkContext, BatchOperationOutcome, BatchProcessResult};
use beacon_chain::block_verification_types::RpcBlock;
use beacon_chain::BeaconChainTypes;
use itertools::Itertools;
use lighthouse_network::service::api_types::Id;
use lighthouse_network::{PeerAction, PeerId};
use logging::crit;
@@ -216,7 +217,7 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
&mut self,
network: &mut SyncNetworkContext<T>,
batch_id: BatchId,
peer_id: &PeerId,
batch_peers: BatchPeers,
request_id: Id,
blocks: Vec<RpcBlock<T::EthSpec>>,
) -> ProcessingResult {
@@ -244,8 +245,7 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
// A stream termination has been sent. This batch has ended. Process a completed batch.
// Remove the request from the peer's active batches
// TODO(das): should use peer group here https://github.com/sigp/lighthouse/issues/6258
let received = batch.download_completed(blocks, *peer_id)?;
let received = batch.download_completed(blocks, batch_peers)?;
let awaiting_batches = batch_id
.saturating_sub(self.optimistic_start.unwrap_or(self.processing_target))
/ EPOCHS_PER_BATCH;
@@ -447,7 +447,7 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
}
};
let peer = batch.processing_peer().cloned().ok_or_else(|| {
let batch_peers = batch.processing_peers().ok_or_else(|| {
RemoveChain::WrongBatchState(format!(
"Processing target is in wrong state: {:?}",
batch.state(),
@@ -458,7 +458,6 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
debug!(
result = ?result,
batch_epoch = %batch_id,
client = %network.client_type(&peer),
batch_state = ?batch_state,
?batch,
"Batch processing result"
@@ -521,10 +520,30 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
}
BatchProcessResult::FaultyFailure {
imported_blocks,
penalty,
peer_action,
// TODO(sync): propagate error in logs
error: _,
} => {
// Penalize the peer appropriately.
network.report_peer(peer, *penalty, "faulty_batch");
// TODO(sync): De-dup between back and forwards sync
if let Some(penalty) = peer_action.block_peer {
// Penalize the peer appropriately.
network.report_peer(batch_peers.block(), penalty, "faulty_batch");
}
// Penalize each peer only once. Currently a peer_action does not mix different
// PeerAction levels.
for (peer, penalty) in peer_action
.column_peer
.iter()
.filter_map(|(column_index, penalty)| {
batch_peers
.column(column_index)
.map(|peer| (*peer, *penalty))
})
.unique()
{
network.report_peer(peer, penalty, "faulty_batch_column");
}
// Check if this batch is allowed to continue
match batch.processing_completed(BatchProcessingResult::FaultyFailure)? {
@@ -540,6 +559,11 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
self.handle_invalid_batch(network, batch_id)
}
BatchOperationOutcome::Failed { blacklist } => {
// TODO(das): what peer action should we apply to the rest of
// peers? Say a batch repeatedly fails because a custody peer is not
// sending us its custody columns
let penalty = PeerAction::LowToleranceError;
// Check that we have not exceeded the re-process retry counter,
// If a batch has exceeded the invalid batch lookup attempts limit, it means
that it is likely all peers in this chain are sending invalid batches
@@ -554,7 +578,7 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
);
for peer in self.peers.drain() {
network.report_peer(peer, *penalty, "faulty_chain");
network.report_peer(peer, penalty, "faulty_chain");
}
Err(RemoveChain::ChainFailed {
blacklist,
@@ -633,17 +657,20 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
// The validated batch has been re-processed
if attempt.hash != processed_attempt.hash {
// The re-downloaded version was different
if processed_attempt.peer_id != attempt.peer_id {
// TODO(das): should penalize other peers?
let valid_attempt_peer = processed_attempt.block_peer();
let bad_attempt_peer = attempt.block_peer();
if valid_attempt_peer != bad_attempt_peer {
// A different peer sent the correct batch, the previous peer did not
// We negatively score the original peer.
let action = PeerAction::LowToleranceError;
debug!(
batch_epoch = %id, score_adjustment = %action,
original_peer = %attempt.peer_id, new_peer = %processed_attempt.peer_id,
original_peer = %bad_attempt_peer, new_peer = %valid_attempt_peer,
"Re-processed batch validated. Scoring original peer"
);
network.report_peer(
attempt.peer_id,
bad_attempt_peer,
action,
"batch_reprocessed_original_peer",
);
@@ -654,12 +681,12 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
debug!(
batch_epoch = %id,
score_adjustment = %action,
original_peer = %attempt.peer_id,
new_peer = %processed_attempt.peer_id,
original_peer = %bad_attempt_peer,
new_peer = %valid_attempt_peer,
"Re-processed batch validated by the same peer"
);
network.report_peer(
attempt.peer_id,
bad_attempt_peer,
action,
"batch_reprocessed_same_peer",
);
@@ -888,8 +915,8 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
) -> ProcessingResult {
let batch_state = self.visualize_batch_state();
if let Some(batch) = self.batches.get_mut(&batch_id) {
let (request, batch_type) = batch.to_blocks_by_range_request();
let failed_peers = batch.failed_peers();
let request = batch.to_blocks_by_range_request();
let failed_peers = batch.failed_block_peers();
// TODO(das): we should request only from peers that are part of this SyncingChain.
// However, then we hit the NoPeer error frequently which causes the batch to fail and
@@ -903,7 +930,6 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
.collect::<HashSet<_>>();
match network.block_components_by_range_request(
batch_type,
request,
RangeRequestId::RangeSync {
chain_id: self.id,
@@ -999,8 +1025,7 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
}
if let Entry::Vacant(entry) = self.batches.entry(epoch) {
let batch_type = network.batch_type(epoch);
let optimistic_batch = BatchInfo::new(&epoch, EPOCHS_PER_BATCH, batch_type);
let optimistic_batch = BatchInfo::new(&epoch, EPOCHS_PER_BATCH);
entry.insert(optimistic_batch);
self.send_batch(network, epoch)?;
}
@@ -1101,8 +1126,7 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
self.include_next_batch(network)
}
Entry::Vacant(entry) => {
let batch_type = network.batch_type(next_batch_id);
entry.insert(BatchInfo::new(&next_batch_id, EPOCHS_PER_BATCH, batch_type));
entry.insert(BatchInfo::new(&next_batch_id, EPOCHS_PER_BATCH));
self.to_be_downloaded += EPOCHS_PER_BATCH;
Some(next_batch_id)
}

View File

@@ -8,7 +8,7 @@ mod range;
mod sync_type;
pub use batch::{
BatchConfig, BatchInfo, BatchOperationOutcome, BatchProcessingResult, BatchState,
BatchConfig, BatchInfo, BatchOperationOutcome, BatchPeers, BatchProcessingResult, BatchState,
ByRangeRequestType,
};
pub use chain::{BatchId, ChainId, EPOCHS_PER_BATCH};

View File

@@ -42,6 +42,7 @@
use super::chain::{BatchId, ChainId, RemoveChain, SyncingChain};
use super::chain_collection::{ChainCollection, SyncChainStatus};
use super::sync_type::RangeSyncType;
use super::BatchPeers;
use crate::metrics;
use crate::status::ToStatusMessage;
use crate::sync::network_context::{RpcResponseError, SyncNetworkContext};
@@ -227,7 +228,7 @@ where
pub fn blocks_by_range_response(
&mut self,
network: &mut SyncNetworkContext<T>,
peer_id: PeerId,
batch_peers: BatchPeers,
chain_id: ChainId,
batch_id: BatchId,
request_id: Id,
@@ -235,7 +236,7 @@ where
) {
// check if this chunk removes the chain
match self.chains.call_by_id(chain_id, |chain| {
chain.on_block_response(network, batch_id, &peer_id, request_id, blocks)
chain.on_block_response(network, batch_id, batch_peers, request_id, blocks)
}) {
Ok((removed_chain, sync_type)) => {
if let Some((removed_chain, remove_reason)) = removed_chain {