diff --git a/beacon_node/network/src/sync/batch.rs b/beacon_node/network/src/sync/batch.rs index 8de386f5be..f9a1fcce39 100644 --- a/beacon_node/network/src/sync/batch.rs +++ b/beacon_node/network/src/sync/batch.rs @@ -213,6 +213,9 @@ impl BatchInfo { /// After different operations over a batch, this could be in a state that allows it to /// continue, or in failed state. When the batch has failed, we check if it did mainly due to /// processing failures. In this case the batch is considered failed and faulty. + /// + /// When failure counts are equal, `blacklist` is `false` — we assume network issues over + /// peer fault when the evidence is ambiguous. pub fn outcome(&self) -> BatchOperationOutcome { match self.state { BatchState::Poisoned => unreachable!("Poisoned batch"), @@ -255,8 +258,10 @@ impl BatchInfo { /// Mark the batch as failed and return whether we can attempt a re-download. /// /// This can happen if a peer disconnects or some error occurred that was not the peers fault. - /// The `peer` parameter, when set to None, does not increment the failed attempts of - /// this batch and register the peer, rather attempts a re-download. + /// The `peer` parameter, when set to `None`, still counts toward + /// `max_batch_download_attempts` (to prevent infinite retries on persistent failures) + /// but does not register a peer in `failed_peers()`. Use + /// [`Self::downloading_to_awaiting_download`] to retry without counting a failed attempt. #[must_use = "Batch may have failed"] pub fn download_failed( &mut self, @@ -272,7 +277,6 @@ impl BatchInfo { { BatchState::Failed } else { - // drop the blocks BatchState::AwaitingDownload }; Ok(self.outcome()) @@ -524,3 +528,196 @@ impl BatchState { } } } + +#[cfg(test)] +mod tests { + use super::*; + use crate::sync::range_sync::RangeSyncBatchConfig; + use types::MinimalEthSpec; + + type Cfg = RangeSyncBatchConfig; + type TestBatch = BatchInfo>; + + fn max_dl() -> u8 { + Cfg::max_batch_download_attempts() + } + + fn max_proc() -> u8 { + Cfg::max_batch_processing_attempts() + } + + fn new_batch() -> TestBatch { + BatchInfo::new(&Epoch::new(0), 1, ByRangeRequestType::Blocks) + } + + fn peer() -> PeerId { + PeerId::random() + } + + fn advance_to_processing(batch: &mut TestBatch, req_id: Id, peer_id: PeerId) { + batch.start_downloading(req_id).unwrap(); + batch.download_completed(vec![1, 2, 3], peer_id).unwrap(); + batch.start_processing().unwrap(); + } + + fn advance_to_awaiting_validation(batch: &mut TestBatch, req_id: Id, peer_id: PeerId) { + advance_to_processing(batch, req_id, peer_id); + batch + .processing_completed(BatchProcessingResult::Success) + .unwrap(); + } + + #[test] + fn happy_path_lifecycle() { + let mut batch = new_batch(); + let p = peer(); + + assert!(matches!(batch.state(), BatchState::AwaitingDownload)); + + batch.start_downloading(1).unwrap(); + assert!(matches!(batch.state(), BatchState::Downloading(1))); + + batch.download_completed(vec![10, 20], p).unwrap(); + assert!(matches!(batch.state(), BatchState::AwaitingProcessing(..))); + + let (data, _duration) = batch.start_processing().unwrap(); + assert_eq!(data, vec![10, 20]); + assert!(matches!(batch.state(), BatchState::Processing(..))); + + let outcome = batch + .processing_completed(BatchProcessingResult::Success) + .unwrap(); + assert!(matches!(outcome, BatchOperationOutcome::Continue)); + assert!(matches!(batch.state(), BatchState::AwaitingValidation(..))); + } + + #[test] + fn download_failures_count_toward_limit() { + let mut batch = new_batch(); + + for i in 1..max_dl() as Id { + batch.start_downloading(i).unwrap(); + let outcome = batch.download_failed(Some(peer())).unwrap(); + assert!(matches!(outcome, BatchOperationOutcome::Continue)); + } + + // Next failure hits the limit + batch.start_downloading(max_dl() as Id).unwrap(); + let outcome = batch.download_failed(Some(peer())).unwrap(); + assert!(matches!( + outcome, + BatchOperationOutcome::Failed { blacklist: false } + )); + } + + #[test] + fn download_failed_none_counts_but_does_not_blame_peer() { + let mut batch = new_batch(); + + // None still counts toward the limit (prevents infinite retry on persistent + // network failures), but doesn't register a peer in failed_peers(). + for i in 0..max_dl() as Id { + batch.start_downloading(i).unwrap(); + batch.download_failed(None).unwrap(); + } + assert!(matches!(batch.state(), BatchState::Failed)); + assert!(batch.failed_peers().is_empty()); + } + + #[test] + fn faulty_processing_failures_count_toward_limit() { + let mut batch = new_batch(); + + for i in 1..max_proc() as Id { + advance_to_processing(&mut batch, i, peer()); + let outcome = batch + .processing_completed(BatchProcessingResult::FaultyFailure) + .unwrap(); + assert!(matches!(outcome, BatchOperationOutcome::Continue)); + } + + // Next faulty failure: limit reached + advance_to_processing(&mut batch, max_proc() as Id, peer()); + let outcome = batch + .processing_completed(BatchProcessingResult::FaultyFailure) + .unwrap(); + assert!(matches!( + outcome, + BatchOperationOutcome::Failed { blacklist: true } + )); + } + + #[test] + fn non_faulty_processing_failures_never_exhaust_batch() { + let mut batch = new_batch(); + + // Well past both limits — non-faulty failures should never cause failure + let iterations = (max_dl() + max_proc()) as Id * 2; + for i in 0..iterations { + advance_to_processing(&mut batch, i, peer()); + let outcome = batch + .processing_completed(BatchProcessingResult::NonFaultyFailure) + .unwrap(); + assert!(matches!(outcome, BatchOperationOutcome::Continue)); + } + // Non-faulty failures also don't register peers as failed + assert!(batch.failed_peers().is_empty()); + } + + #[test] + fn validation_failures_count_toward_processing_limit() { + let mut batch = new_batch(); + + for i in 1..max_proc() as Id { + advance_to_awaiting_validation(&mut batch, i, peer()); + let outcome = batch.validation_failed().unwrap(); + assert!(matches!(outcome, BatchOperationOutcome::Continue)); + } + + advance_to_awaiting_validation(&mut batch, max_proc() as Id, peer()); + let outcome = batch.validation_failed().unwrap(); + assert!(matches!( + outcome, + BatchOperationOutcome::Failed { blacklist: true } + )); + } + + #[test] + fn mixed_failure_types_interact_correctly() { + let mut batch = new_batch(); + let mut req_id: Id = 0; + let mut next_id = || { + req_id += 1; + req_id + }; + + // One download failure + batch.start_downloading(next_id()).unwrap(); + batch.download_failed(Some(peer())).unwrap(); + + // One faulty processing failure (requires a successful download first) + advance_to_processing(&mut batch, next_id(), peer()); + batch + .processing_completed(BatchProcessingResult::FaultyFailure) + .unwrap(); + + // One non-faulty processing failure + advance_to_processing(&mut batch, next_id(), peer()); + batch + .processing_completed(BatchProcessingResult::NonFaultyFailure) + .unwrap(); + assert!(matches!(batch.state(), BatchState::AwaitingDownload)); + + // Fill remaining download failures to hit the limit + for _ in 1..max_dl() { + batch.start_downloading(next_id()).unwrap(); + batch.download_failed(Some(peer())).unwrap(); + } + + // Download failures > processing failures → blacklist: false + assert!(matches!( + batch.outcome(), + BatchOperationOutcome::Failed { blacklist: false } + )); + } +} diff --git a/beacon_node/network/src/sync/range_sync/mod.rs b/beacon_node/network/src/sync/range_sync/mod.rs index dd9f17bfd1..3b65e1c84a 100644 --- a/beacon_node/network/src/sync/range_sync/mod.rs +++ b/beacon_node/network/src/sync/range_sync/mod.rs @@ -5,6 +5,8 @@ mod chain_collection; mod range; mod sync_type; +#[cfg(test)] +pub use chain::RangeSyncBatchConfig; pub use chain::{ChainId, EPOCHS_PER_BATCH}; #[cfg(test)] pub use chain_collection::SyncChainStatus;