diff --git a/beacon_node/network/src/sync/backfill_sync/mod.rs b/beacon_node/network/src/sync/backfill_sync/mod.rs index e76c037dad..be750e25f0 100644 --- a/beacon_node/network/src/sync/backfill_sync/mod.rs +++ b/beacon_node/network/src/sync/backfill_sync/mod.rs @@ -11,7 +11,7 @@ use crate::beacon_processor::{ChainSegmentProcessId, WorkEvent as BeaconWorkEvent}; use crate::sync::manager::{BatchProcessResult, Id}; use crate::sync::network_context::SyncNetworkContext; -use crate::sync::range_sync::{BatchConfig, BatchId, BatchInfo, BatchState}; +use crate::sync::range_sync::{BatchConfig, BatchId, BatchInfo, BatchProcessingResult, BatchState}; use beacon_chain::{BeaconChain, BeaconChainTypes}; use lighthouse_network::types::{BackFillState, NetworkGlobals}; use lighthouse_network::{PeerAction, PeerId}; @@ -606,7 +606,7 @@ impl BackFillSync { } }; - if let Err(e) = batch.processing_completed(true) { + if let Err(e) = batch.processing_completed(BatchProcessingResult::Success) { self.fail_sync(BackFillError::BatchInvalidState(batch_id, e.0))?; } // If the processed batch was not empty, we can validate previous unvalidated @@ -664,7 +664,9 @@ impl BackFillSync { }; debug!(self.log, "Batch processing failed"; "imported_blocks" => imported_blocks, "batch_epoch" => batch_id, "peer" => %peer, "client" => %network.client_type(&peer)); - match batch.processing_completed(false) { + match batch.processing_completed(BatchProcessingResult::Failed { + count_attempt: peer_action.is_some(), + }) { Err(e) => { // Batch was in the wrong state self.fail_sync(BackFillError::BatchInvalidState(batch_id, e.0)) diff --git a/beacon_node/network/src/sync/range_sync/batch.rs b/beacon_node/network/src/sync/range_sync/batch.rs index 614bf57dd0..aaebe022c7 100644 --- a/beacon_node/network/src/sync/range_sync/batch.rs +++ b/beacon_node/network/src/sync/range_sync/batch.rs @@ -72,6 +72,11 @@ pub struct WrongState(pub(crate) String); /// Auxiliary type alias for readability. 
 type IsFailed = bool;
+pub enum BatchProcessingResult {
+    Success,
+    Failed { count_attempt: bool },
+}
+
 /// A segment of a chain.
 pub struct BatchInfo {
     /// Start slot of the batch.
@@ -80,6 +85,8 @@ pub struct BatchInfo {
     end_slot: Slot,
     /// The `Attempts` that have been made and failed to send us this batch.
     failed_processing_attempts: Vec,
+    /// Number of processing attempts that have failed but we do not count.
+    other_failed_processing_attempts: u8,
     /// The number of download retries this batch has undergone due to a failed request.
     failed_download_attempts: Vec,
     /// State of the batch.
     state: BatchState,
@@ -143,6 +150,7 @@ impl BatchInfo {
             end_slot,
             failed_processing_attempts: Vec::new(),
             failed_download_attempts: Vec::new(),
+            other_failed_processing_attempts: 0,
             state: BatchState::AwaitingDownload,
             marker: std::marker::PhantomData,
         }
@@ -348,23 +356,33 @@ impl BatchInfo {
     }
 
     #[must_use = "Batch may have failed"]
-    pub fn processing_completed(&mut self, was_sucessful: bool) -> Result {
+    pub fn processing_completed(
+        &mut self,
+        processing_result: BatchProcessingResult,
+    ) -> Result {
         match self.state.poison() {
             BatchState::Processing(attempt) => {
-                self.state = if !was_sucessful {
-                    // register the failed attempt
-                    self.failed_processing_attempts.push(attempt);
+                self.state = match processing_result {
+                    BatchProcessingResult::Success => BatchState::AwaitingValidation(attempt),
+                    BatchProcessingResult::Failed { count_attempt } => {
+                        if count_attempt {
+                            // register the failed attempt
+                            self.failed_processing_attempts.push(attempt);
 
-                    // check if the batch can be downloaded again
-                    if self.failed_processing_attempts.len()
-                        >= B::max_batch_processing_attempts() as usize
-                    {
-                        BatchState::Failed
-                    } else {
-                        BatchState::AwaitingDownload
+                            // check if the batch can be downloaded again
+                            if self.failed_processing_attempts.len()
+                                >= B::max_batch_processing_attempts() as usize
+                            {
+                                BatchState::Failed
+                            } else {
+                                BatchState::AwaitingDownload
+                            }
+                        } else {
self.other_failed_processing_attempts = + self.other_failed_processing_attempts.saturating_add(1); + BatchState::AwaitingDownload + } } - } else { - BatchState::AwaitingValidation(attempt) }; Ok(self.state.is_failed()) } @@ -451,6 +469,10 @@ impl slog::KV for BatchInfo { )?; serializer.emit_usize("downloaded", self.failed_download_attempts.len())?; serializer.emit_usize("processed", self.failed_processing_attempts.len())?; + serializer.emit_u8( + "processed_no_penalty", + self.other_failed_processing_attempts, + )?; serializer.emit_arguments("state", &format_args!("{:?}", self.state))?; slog::Result::Ok(()) } diff --git a/beacon_node/network/src/sync/range_sync/chain.rs b/beacon_node/network/src/sync/range_sync/chain.rs index 9f4142dd66..88837d0e12 100644 --- a/beacon_node/network/src/sync/range_sync/chain.rs +++ b/beacon_node/network/src/sync/range_sync/chain.rs @@ -1,4 +1,4 @@ -use super::batch::{BatchInfo, BatchState}; +use super::batch::{BatchInfo, BatchProcessingResult, BatchState}; use crate::beacon_processor::ChainSegmentProcessId; use crate::beacon_processor::WorkEvent as BeaconWorkEvent; use crate::sync::{manager::Id, network_context::SyncNetworkContext, BatchProcessResult}; @@ -463,7 +463,7 @@ impl SyncingChain { )) })?; - batch.processing_completed(true)?; + batch.processing_completed(BatchProcessingResult::Success)?; // If the processed batch was not empty, we can validate previous unvalidated // blocks. if *was_non_empty { @@ -512,9 +512,12 @@ impl SyncingChain { batch.state(), )) })?; - debug!(self.log, "Batch processing failed"; "imported_blocks" => imported_blocks, + debug!(self.log, "Batch processing failed"; "imported_blocks" => imported_blocks, "peer_penalty" => ?peer_action, "batch_epoch" => batch_id, "peer" => %peer, "client" => %network.client_type(&peer)); - if batch.processing_completed(false)? { + + if batch.processing_completed(BatchProcessingResult::Failed { + count_attempt: peer_action.is_some(), + })? 
{ // check that we have not exceeded the re-process retry counter // If a batch has exceeded the invalid batch lookup attempts limit, it means // that it is likely all peers in this chain are are sending invalid batches diff --git a/beacon_node/network/src/sync/range_sync/chain_collection.rs b/beacon_node/network/src/sync/range_sync/chain_collection.rs index 512f7a989a..7ddfc3f70a 100644 --- a/beacon_node/network/src/sync/range_sync/chain_collection.rs +++ b/beacon_node/network/src/sync/range_sync/chain_collection.rs @@ -407,7 +407,6 @@ impl ChainCollection { local_info: &SyncInfo, awaiting_head_peers: &mut HashMap, ) { - debug!(self.log, "Purging chains"); let local_finalized_slot = local_info .finalized_epoch .start_slot(T::EthSpec::slots_per_epoch()); @@ -416,10 +415,7 @@ impl ChainCollection { let log_ref = &self.log; let is_outdated = |target_slot: &Slot, target_root: &Hash256| { - let is = - target_slot <= &local_finalized_slot || beacon_chain.is_block_known(target_root); - debug!(log_ref, "Chain is outdated {}", is); - is + target_slot <= &local_finalized_slot || beacon_chain.is_block_known(target_root) }; // Retain only head peers that remain relevant diff --git a/beacon_node/network/src/sync/range_sync/mod.rs b/beacon_node/network/src/sync/range_sync/mod.rs index b4a27c23c7..31122d59a1 100644 --- a/beacon_node/network/src/sync/range_sync/mod.rs +++ b/beacon_node/network/src/sync/range_sync/mod.rs @@ -8,7 +8,7 @@ mod chain_collection; mod range; mod sync_type; -pub use batch::{BatchConfig, BatchInfo, BatchState}; +pub use batch::{BatchConfig, BatchInfo, BatchProcessingResult, BatchState}; pub use chain::{BatchId, ChainId, EPOCHS_PER_BATCH}; pub use range::RangeSync; pub use sync_type::RangeSyncType;