Fix failed_peers post fulu

This commit is contained in:
dapplion
2025-06-11 11:49:25 +02:00
parent 7a03578795
commit 4e13b3be0f
3 changed files with 43 additions and 38 deletions

View File

@@ -617,9 +617,12 @@ impl<T: BeaconChainTypes> BackFillSync<T> {
error, error,
} => { } => {
// TODO(sync): De-dup between back and forwards sync // TODO(sync): De-dup between back and forwards sync
let mut failed_peers = vec![];
if let Some(penalty) = peer_action.block_peer { if let Some(penalty) = peer_action.block_peer {
// Penalize the peer appropiately. // Penalize the peer appropiately.
network.report_peer(batch_peers.block(), penalty, "faulty_batch"); network.report_peer(batch_peers.block(), penalty, "faulty_batch");
failed_peers.push(batch_peers.block());
} }
// Penalize each peer only once. Currently a peer_action does not mix different // Penalize each peer only once. Currently a peer_action does not mix different
@@ -635,9 +638,11 @@ impl<T: BeaconChainTypes> BackFillSync<T> {
.unique() .unique()
{ {
network.report_peer(peer, penalty, "faulty_batch_column"); network.report_peer(peer, penalty, "faulty_batch_column");
failed_peers.push(peer);
} }
match batch.processing_completed(BatchProcessingResult::FaultyFailure) { match batch.processing_completed(BatchProcessingResult::FaultyFailure(failed_peers))
{
Err(e) => { Err(e) => {
// Batch was in the wrong state // Batch was in the wrong state
self.fail_sync(BackFillError::BatchInvalidState(batch_id, e.0)) self.fail_sync(BackFillError::BatchInvalidState(batch_id, e.0))
@@ -926,12 +931,12 @@ impl<T: BeaconChainTypes> BackFillSync<T> {
) -> Result<(), BackFillError> { ) -> Result<(), BackFillError> {
if let Some(batch) = self.batches.get_mut(&batch_id) { if let Some(batch) = self.batches.get_mut(&batch_id) {
let request = batch.to_blocks_by_range_request(); let request = batch.to_blocks_by_range_request();
let failed_peers = batch.failed_block_peers(); let failed_peers = batch.failed_peers();
match network.block_components_by_range_request( match network.block_components_by_range_request(
request, request,
RangeRequestId::BackfillSync { batch_id }, RangeRequestId::BackfillSync { batch_id },
self.peers.clone(), self.peers.clone(),
&failed_peers, failed_peers,
) { ) {
Ok(request_id) => { Ok(request_id) => {
// inform the batch about the new request // inform the batch about the new request

View File

@@ -112,7 +112,7 @@ pub enum BatchOperationOutcome {
pub enum BatchProcessingResult { pub enum BatchProcessingResult {
Success, Success,
FaultyFailure, FaultyFailure(Vec<PeerId>),
NonFaultyFailure, NonFaultyFailure,
} }
@@ -128,7 +128,9 @@ pub struct BatchInfo<E: EthSpec, B: BatchConfig = RangeSyncBatchConfig> {
/// Number of processing attempts that have failed but we do not count. /// Number of processing attempts that have failed but we do not count.
non_faulty_processing_attempts: u8, non_faulty_processing_attempts: u8,
/// The number of download retries this batch has undergone due to a failed request. /// The number of download retries this batch has undergone due to a failed request.
failed_download_attempts: Vec<Option<PeerId>>, failed_download_attempts: usize,
/// Peers that returned bad data, and we want to de-prioritize
failed_peers: HashSet<PeerId>,
/// State of the batch. /// State of the batch.
state: BatchState<E>, state: BatchState<E>,
/// Pin the generic /// Pin the generic
@@ -197,7 +199,8 @@ impl<E: EthSpec, B: BatchConfig> BatchInfo<E, B> {
start_slot, start_slot,
end_slot, end_slot,
failed_processing_attempts: Vec::new(), failed_processing_attempts: Vec::new(),
failed_download_attempts: Vec::new(), failed_download_attempts: 0,
failed_peers: <_>::default(),
non_faulty_processing_attempts: 0, non_faulty_processing_attempts: 0,
state: BatchState::AwaitingDownload, state: BatchState::AwaitingDownload,
marker: std::marker::PhantomData, marker: std::marker::PhantomData,
@@ -206,23 +209,8 @@ impl<E: EthSpec, B: BatchConfig> BatchInfo<E, B> {
/// Gives a list of peers from which this batch has had a failed download or processing /// Gives a list of peers from which this batch has had a failed download or processing
/// attempt. /// attempt.
/// pub fn failed_peers(&self) -> &HashSet<PeerId> {
/// TODO(das): Returns only block peers to keep the mainnet path equivalent. The failed peers &self.failed_peers
/// mechanism is broken for PeerDAS and will be fixed with https://github.com/sigp/lighthouse/issues/6258
pub fn failed_block_peers(&self) -> HashSet<PeerId> {
let mut peers = HashSet::with_capacity(
self.failed_processing_attempts.len() + self.failed_download_attempts.len(),
);
for attempt in &self.failed_processing_attempts {
peers.insert(attempt.peers.block());
}
for peer in self.failed_download_attempts.iter().flatten() {
peers.insert(*peer);
}
peers
} }
/// Verifies if an incoming block belongs to this batch. /// Verifies if an incoming block belongs to this batch.
@@ -272,8 +260,7 @@ impl<E: EthSpec, B: BatchConfig> BatchInfo<E, B> {
match self.state { match self.state {
BatchState::Poisoned => unreachable!("Poisoned batch"), BatchState::Poisoned => unreachable!("Poisoned batch"),
BatchState::Failed => BatchOperationOutcome::Failed { BatchState::Failed => BatchOperationOutcome::Failed {
blacklist: self.failed_processing_attempts.len() blacklist: self.failed_processing_attempts.len() > self.failed_download_attempts,
> self.failed_download_attempts.len(),
}, },
_ => BatchOperationOutcome::Continue, _ => BatchOperationOutcome::Continue,
} }
@@ -325,15 +312,19 @@ impl<E: EthSpec, B: BatchConfig> BatchInfo<E, B> {
match self.state.poison() { match self.state.poison() {
BatchState::Downloading(_request_id) => { BatchState::Downloading(_request_id) => {
// register the attempt and check if the batch can be tried again // register the attempt and check if the batch can be tried again
self.failed_download_attempts.push(peer); if let Some(peer) = peer {
self.state = if self.failed_download_attempts.len() self.failed_peers.insert(peer);
>= B::max_batch_download_attempts() as usize }
{
BatchState::Failed self.failed_download_attempts += 1;
} else {
// drop the blocks self.state =
BatchState::AwaitingDownload if self.failed_download_attempts >= B::max_batch_download_attempts() as usize {
}; BatchState::Failed
} else {
// drop the blocks
BatchState::AwaitingDownload
};
Ok(self.outcome()) Ok(self.outcome())
} }
BatchState::Poisoned => unreachable!("Poisoned batch"), BatchState::Poisoned => unreachable!("Poisoned batch"),
@@ -390,9 +381,12 @@ impl<E: EthSpec, B: BatchConfig> BatchInfo<E, B> {
BatchState::Processing(attempt) => { BatchState::Processing(attempt) => {
self.state = match procesing_result { self.state = match procesing_result {
BatchProcessingResult::Success => BatchState::AwaitingValidation(attempt), BatchProcessingResult::Success => BatchState::AwaitingValidation(attempt),
BatchProcessingResult::FaultyFailure => { BatchProcessingResult::FaultyFailure(failed_peers) => {
// register the failed attempt // register the failed attempt
self.failed_processing_attempts.push(attempt); self.failed_processing_attempts.push(attempt);
for peer in failed_peers {
self.failed_peers.insert(peer);
}
// check if the batch can be downloaded again // check if the batch can be downloaded again
if self.failed_processing_attempts.len() if self.failed_processing_attempts.len()

View File

@@ -539,10 +539,13 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
// TODO(sync): propagate error in logs // TODO(sync): propagate error in logs
error: _, error: _,
} => { } => {
let mut failed_peers = vec![];
// TODO(sync): De-dup between back and forwards sync // TODO(sync): De-dup between back and forwards sync
if let Some(penalty) = peer_action.block_peer { if let Some(penalty) = peer_action.block_peer {
// Penalize the peer appropiately. // Penalize the peer appropiately.
network.report_peer(batch_peers.block(), penalty, "faulty_batch"); network.report_peer(batch_peers.block(), penalty, "faulty_batch");
failed_peers.push(batch_peers.block());
} }
// Penalize each peer only once. Currently a peer_action does not mix different // Penalize each peer only once. Currently a peer_action does not mix different
@@ -558,10 +561,13 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
.unique() .unique()
{ {
network.report_peer(peer, penalty, "faulty_batch_column"); network.report_peer(peer, penalty, "faulty_batch_column");
failed_peers.push(peer);
} }
// Check if this batch is allowed to continue // Check if this batch is allowed to continue
match batch.processing_completed(BatchProcessingResult::FaultyFailure)? { match batch
.processing_completed(BatchProcessingResult::FaultyFailure(failed_peers))?
{
BatchOperationOutcome::Continue => { BatchOperationOutcome::Continue => {
// Chain can continue. Check if it can be moved forward. // Chain can continue. Check if it can be moved forward.
if *imported_blocks > 0 { if *imported_blocks > 0 {
@@ -929,7 +935,7 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
let batch_state = self.visualize_batch_state(); let batch_state = self.visualize_batch_state();
if let Some(batch) = self.batches.get_mut(&batch_id) { if let Some(batch) = self.batches.get_mut(&batch_id) {
let request = batch.to_blocks_by_range_request(); let request = batch.to_blocks_by_range_request();
let failed_peers = batch.failed_block_peers(); let failed_peers = batch.failed_peers();
match network.block_components_by_range_request( match network.block_components_by_range_request(
request, request,
@@ -938,7 +944,7 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
batch_id, batch_id,
}, },
self.peers.clone(), self.peers.clone(),
&failed_peers, failed_peers,
) { ) {
Ok(request_id) => { Ok(request_id) => {
// inform the batch about the new request // inform the batch about the new request