From 4e13b3be0f77d50eb5db04d4af335b4ee440f62e Mon Sep 17 00:00:00 2001 From: dapplion <35266934+dapplion@users.noreply.github.com> Date: Wed, 11 Jun 2025 11:49:25 +0200 Subject: [PATCH] Fix failed_peers post fulu --- .../network/src/sync/backfill_sync/mod.rs | 11 +++- .../network/src/sync/range_sync/batch.rs | 58 +++++++++---------- .../network/src/sync/range_sync/chain.rs | 12 +++- 3 files changed, 43 insertions(+), 38 deletions(-) diff --git a/beacon_node/network/src/sync/backfill_sync/mod.rs b/beacon_node/network/src/sync/backfill_sync/mod.rs index 0a68dc2ce8..5037cf4860 100644 --- a/beacon_node/network/src/sync/backfill_sync/mod.rs +++ b/beacon_node/network/src/sync/backfill_sync/mod.rs @@ -617,9 +617,12 @@ impl BackFillSync { error, } => { // TODO(sync): De-dup between back and forwards sync + let mut failed_peers = vec![]; + if let Some(penalty) = peer_action.block_peer { // Penalize the peer appropiately. network.report_peer(batch_peers.block(), penalty, "faulty_batch"); + failed_peers.push(batch_peers.block()); } // Penalize each peer only once. Currently a peer_action does not mix different @@ -635,9 +638,11 @@ impl BackFillSync { .unique() { network.report_peer(peer, penalty, "faulty_batch_column"); + failed_peers.push(peer); } - match batch.processing_completed(BatchProcessingResult::FaultyFailure) { + match batch.processing_completed(BatchProcessingResult::FaultyFailure(failed_peers)) + { Err(e) => { // Batch was in the wrong state self.fail_sync(BackFillError::BatchInvalidState(batch_id, e.0)) @@ -926,12 +931,12 @@ impl BackFillSync { ) -> Result<(), BackFillError> { if let Some(batch) = self.batches.get_mut(&batch_id) { let request = batch.to_blocks_by_range_request(); - let failed_peers = batch.failed_block_peers(); + let failed_peers = batch.failed_peers(); match network.block_components_by_range_request( request, RangeRequestId::BackfillSync { batch_id }, self.peers.clone(), - &failed_peers, + failed_peers, ) { Ok(request_id) => { // inform the batch about the new request diff --git a/beacon_node/network/src/sync/range_sync/batch.rs b/beacon_node/network/src/sync/range_sync/batch.rs index ab9fd40bab..5267ba56ba 100644 --- a/beacon_node/network/src/sync/range_sync/batch.rs +++ b/beacon_node/network/src/sync/range_sync/batch.rs @@ -112,7 +112,7 @@ pub enum BatchOperationOutcome { pub enum BatchProcessingResult { Success, - FaultyFailure, + FaultyFailure(Vec), NonFaultyFailure, } @@ -128,7 +128,9 @@ pub struct BatchInfo { /// Number of processing attempts that have failed but we do not count. non_faulty_processing_attempts: u8, /// The number of download retries this batch has undergone due to a failed request. - failed_download_attempts: Vec>, + failed_download_attempts: usize, + /// Peers that returned bad data, and we want to de-prioritize + failed_peers: HashSet, /// State of the batch. state: BatchState, /// Pin the generic @@ -197,7 +199,8 @@ impl BatchInfo { start_slot, end_slot, failed_processing_attempts: Vec::new(), - failed_download_attempts: Vec::new(), + failed_download_attempts: 0, + failed_peers: <_>::default(), non_faulty_processing_attempts: 0, state: BatchState::AwaitingDownload, marker: std::marker::PhantomData, @@ -206,23 +209,8 @@ impl BatchInfo { /// Gives a list of peers from which this batch has had a failed download or processing /// attempt. - /// - /// TODO(das): Returns only block peers to keep the mainnet path equivalent. The failed peers - /// mechanism is broken for PeerDAS and will be fixed with https://github.com/sigp/lighthouse/issues/6258 - pub fn failed_block_peers(&self) -> HashSet { - let mut peers = HashSet::with_capacity( - self.failed_processing_attempts.len() + self.failed_download_attempts.len(), - ); - - for attempt in &self.failed_processing_attempts { - peers.insert(attempt.peers.block()); - } - - for peer in self.failed_download_attempts.iter().flatten() { - peers.insert(*peer); - } - - peers + pub fn failed_peers(&self) -> &HashSet { + &self.failed_peers } /// Verifies if an incoming block belongs to this batch. @@ -272,8 +260,7 @@ impl BatchInfo { match self.state { BatchState::Poisoned => unreachable!("Poisoned batch"), BatchState::Failed => BatchOperationOutcome::Failed { - blacklist: self.failed_processing_attempts.len() - > self.failed_download_attempts.len(), + blacklist: self.failed_processing_attempts.len() > self.failed_download_attempts, }, _ => BatchOperationOutcome::Continue, } @@ -325,15 +312,19 @@ impl BatchInfo { match self.state.poison() { BatchState::Downloading(_request_id) => { // register the attempt and check if the batch can be tried again - self.failed_download_attempts.push(peer); - self.state = if self.failed_download_attempts.len() - >= B::max_batch_download_attempts() as usize - { - BatchState::Failed - } else { - // drop the blocks - BatchState::AwaitingDownload - }; + if let Some(peer) = peer { + self.failed_peers.insert(peer); + } + + self.failed_download_attempts += 1; + + self.state = + if self.failed_download_attempts >= B::max_batch_download_attempts() as usize { + BatchState::Failed + } else { + // drop the blocks + BatchState::AwaitingDownload + }; Ok(self.outcome()) } BatchState::Poisoned => unreachable!("Poisoned batch"), @@ -390,9 +381,12 @@ impl BatchInfo { BatchState::Processing(attempt) => { self.state = match procesing_result { BatchProcessingResult::Success => BatchState::AwaitingValidation(attempt), - BatchProcessingResult::FaultyFailure => { + BatchProcessingResult::FaultyFailure(failed_peers) => { // register the failed attempt self.failed_processing_attempts.push(attempt); + for peer in failed_peers { + self.failed_peers.insert(peer); + } // check if the batch can be downloaded again if self.failed_processing_attempts.len() diff --git a/beacon_node/network/src/sync/range_sync/chain.rs b/beacon_node/network/src/sync/range_sync/chain.rs index 87e00bc91a..17bce62a7c 100644 --- a/beacon_node/network/src/sync/range_sync/chain.rs +++ b/beacon_node/network/src/sync/range_sync/chain.rs @@ -539,10 +539,13 @@ impl SyncingChain { // TODO(sync): propagate error in logs error: _, } => { + let mut failed_peers = vec![]; + // TODO(sync): De-dup between back and forwards sync if let Some(penalty) = peer_action.block_peer { // Penalize the peer appropiately. network.report_peer(batch_peers.block(), penalty, "faulty_batch"); + failed_peers.push(batch_peers.block()); } // Penalize each peer only once. Currently a peer_action does not mix different @@ -558,10 +561,13 @@ impl SyncingChain { .unique() { network.report_peer(peer, penalty, "faulty_batch_column"); + failed_peers.push(peer); } // Check if this batch is allowed to continue - match batch.processing_completed(BatchProcessingResult::FaultyFailure)? { + match batch + .processing_completed(BatchProcessingResult::FaultyFailure(failed_peers))? + { BatchOperationOutcome::Continue => { // Chain can continue. Check if it can be moved forward. if *imported_blocks > 0 { @@ -929,7 +935,7 @@ impl SyncingChain { let batch_state = self.visualize_batch_state(); if let Some(batch) = self.batches.get_mut(&batch_id) { let request = batch.to_blocks_by_range_request(); - let failed_peers = batch.failed_block_peers(); + let failed_peers = batch.failed_peers(); match network.block_components_by_range_request( request, @@ -938,7 +944,7 @@ impl SyncingChain { batch_id, }, self.peers.clone(), - &failed_peers, + failed_peers, ) { Ok(request_id) => { // inform the batch about the new request