diff --git a/beacon_node/network/src/beacon_processor/mod.rs b/beacon_node/network/src/beacon_processor/mod.rs index a08f34f707..e9a115904d 100644 --- a/beacon_node/network/src/beacon_processor/mod.rs +++ b/beacon_node/network/src/beacon_processor/mod.rs @@ -76,9 +76,7 @@ mod work_reprocessing_queue; mod worker; use crate::beacon_processor::work_reprocessing_queue::QueuedGossipBlock; -pub use worker::{ - ChainSegmentProcessId, FailureMode, GossipAggregatePackage, GossipAttestationPackage, -}; +pub use worker::{ChainSegmentProcessId, GossipAggregatePackage, GossipAttestationPackage}; /// The maximum size of the channel for work events to the `BeaconProcessor`. /// diff --git a/beacon_node/network/src/beacon_processor/worker/mod.rs b/beacon_node/network/src/beacon_processor/worker/mod.rs index 04147245ea..f907c49b7d 100644 --- a/beacon_node/network/src/beacon_processor/worker/mod.rs +++ b/beacon_node/network/src/beacon_processor/worker/mod.rs @@ -10,7 +10,7 @@ mod rpc_methods; mod sync_methods; pub use gossip_methods::{GossipAggregatePackage, GossipAttestationPackage}; -pub use sync_methods::{ChainSegmentProcessId, FailureMode}; +pub use sync_methods::ChainSegmentProcessId; pub(crate) const FUTURE_SLOT_TOLERANCE: u64 = 1; diff --git a/beacon_node/network/src/beacon_processor/worker/sync_methods.rs b/beacon_node/network/src/beacon_processor/worker/sync_methods.rs index 3b2429ee9b..760896e0e9 100644 --- a/beacon_node/network/src/beacon_processor/worker/sync_methods.rs +++ b/beacon_node/network/src/beacon_processor/worker/sync_methods.rs @@ -34,15 +34,6 @@ struct ChainSegmentFailed { message: String, /// Used to penalize peers. peer_action: Option, - /// Failure mode - mode: FailureMode, -} - -/// Represents if a block processing failure was on the consensus or execution side. -#[derive(Debug)] -pub enum FailureMode { - ExecutionLayer { pause_sync: bool }, - ConsensusLayer, } impl Worker { @@ -150,7 +141,9 @@ impl Worker { "last_block_slot" => end_slot, "processed_blocks" => sent_blocks, "service"=> "sync"); - BatchProcessResult::Success(sent_blocks > 0) + BatchProcessResult::Success { + was_non_empty: sent_blocks > 0, + } } (imported_blocks, Err(e)) => { debug!(self.log, "Batch processing failed"; @@ -161,11 +154,12 @@ impl Worker { "imported_blocks" => imported_blocks, "error" => %e.message, "service" => "sync"); - - BatchProcessResult::Failed { - imported_blocks: imported_blocks > 0, - peer_action: e.peer_action, - mode: e.mode, + match e.peer_action { + Some(penalty) => BatchProcessResult::FaultyFailure { + imported_blocks: imported_blocks > 0, + penalty, + }, + None => BatchProcessResult::NonFaultyFailure, } } } @@ -184,7 +178,9 @@ impl Worker { "last_block_slot" => end_slot, "processed_blocks" => sent_blocks, "service"=> "sync"); - BatchProcessResult::Success(sent_blocks > 0) + BatchProcessResult::Success { + was_non_empty: sent_blocks > 0, + } } (_, Err(e)) => { debug!(self.log, "Backfill batch processing failed"; @@ -193,10 +189,12 @@ impl Worker { "last_block_slot" => end_slot, "error" => %e.message, "service" => "sync"); - BatchProcessResult::Failed { - imported_blocks: false, - peer_action: e.peer_action, - mode: e.mode, + match e.peer_action { + Some(penalty) => BatchProcessResult::FaultyFailure { + imported_blocks: false, + penalty, + }, + None => BatchProcessResult::NonFaultyFailure, } } } @@ -216,15 +214,19 @@ impl Worker { { (imported_blocks, Err(e)) => { debug!(self.log, "Parent lookup failed"; "error" => %e.message); - BatchProcessResult::Failed { - imported_blocks: imported_blocks > 0, - peer_action: e.peer_action, - mode: e.mode, + match e.peer_action { + Some(penalty) => BatchProcessResult::FaultyFailure { + imported_blocks: imported_blocks > 0, + penalty, + }, + None => BatchProcessResult::NonFaultyFailure, } } (imported_blocks, Ok(_)) => { debug!(self.log, "Parent lookup processed successfully"); - BatchProcessResult::Success(imported_blocks > 0) + BatchProcessResult::Success { + was_non_empty: imported_blocks > 0, + } } } } @@ -307,7 +309,6 @@ impl Worker { message: String::from("mismatched_block_root"), // The peer is faulty if they send blocks with bad roots. peer_action: Some(PeerAction::LowToleranceError), - mode: FailureMode::ConsensusLayer, } } HistoricalBlockError::InvalidSignature @@ -322,7 +323,6 @@ impl Worker { message: "invalid_signature".into(), // The peer is faulty if they bad signatures. peer_action: Some(PeerAction::LowToleranceError), - mode: FailureMode::ConsensusLayer, } } HistoricalBlockError::ValidatorPubkeyCacheTimeout => { @@ -336,7 +336,6 @@ impl Worker { message: "pubkey_cache_timeout".into(), // This is an internal error, do not penalize the peer. peer_action: None, - mode: FailureMode::ConsensusLayer, } } HistoricalBlockError::NoAnchorInfo => { @@ -347,7 +346,6 @@ impl Worker { // There is no need to do a historical sync, this is not a fault of // the peer. peer_action: None, - mode: FailureMode::ConsensusLayer, } } HistoricalBlockError::IndexOutOfBounds => { @@ -360,7 +358,6 @@ impl Worker { message: String::from("logic_error"), // This should never occur, don't penalize the peer. peer_action: None, - mode: FailureMode::ConsensusLayer, } } HistoricalBlockError::BlockOutOfRange { .. } => { @@ -373,7 +370,6 @@ impl Worker { message: String::from("unexpected_error"), // This should never occur, don't penalize the peer. peer_action: None, - mode: FailureMode::ConsensusLayer, } } }, @@ -383,7 +379,6 @@ impl Worker { message: format!("{:?}", other), // This is an internal error, don't penalize the peer. peer_action: None, - mode: FailureMode::ConsensusLayer, } } }; @@ -404,7 +399,6 @@ impl Worker { message: format!("Block has an unknown parent: {}", block.parent_root()), // Peers are faulty if they send non-sequential blocks. peer_action: Some(PeerAction::LowToleranceError), - mode: FailureMode::ConsensusLayer, }) } BlockError::BlockIsAlreadyKnown => { @@ -442,7 +436,6 @@ impl Worker { ), // Peers are faulty if they send blocks from the future. peer_action: Some(PeerAction::LowToleranceError), - mode: FailureMode::ConsensusLayer, }) } BlockError::WouldRevertFinalizedSlot { .. } => { @@ -464,7 +457,6 @@ impl Worker { message: format!("Internal error whilst processing block: {:?}", e), // Do not penalize peers for internal errors. peer_action: None, - mode: FailureMode::ConsensusLayer, }) } ref err @ BlockError::ExecutionPayloadError(ref epe) => { @@ -480,7 +472,6 @@ impl Worker { message: format!("Execution layer offline. Reason: {:?}", err), // Do not penalize peers for internal errors. peer_action: None, - mode: FailureMode::ExecutionLayer { pause_sync: true }, }) } else { debug!(self.log, @@ -493,7 +484,6 @@ impl Worker { err ), peer_action: Some(PeerAction::LowToleranceError), - mode: FailureMode::ExecutionLayer { pause_sync: false }, }) } } @@ -508,7 +498,6 @@ impl Worker { message: format!("Peer sent invalid block. Reason: {:?}", other), // Do not penalize peers for internal errors. peer_action: None, - mode: FailureMode::ConsensusLayer, }) } } diff --git a/beacon_node/network/src/sync/backfill_sync/mod.rs b/beacon_node/network/src/sync/backfill_sync/mod.rs index 7ff640065a..6767350ce3 100644 --- a/beacon_node/network/src/sync/backfill_sync/mod.rs +++ b/beacon_node/network/src/sync/backfill_sync/mod.rs @@ -8,10 +8,12 @@ //! If a batch fails, the backfill sync cannot progress. In this scenario, we mark the backfill //! sync as failed, log an error and attempt to retry once a new peer joins the node. -use crate::beacon_processor::{ChainSegmentProcessId, FailureMode, WorkEvent as BeaconWorkEvent}; +use crate::beacon_processor::{ChainSegmentProcessId, WorkEvent as BeaconWorkEvent}; use crate::sync::manager::{BatchProcessResult, Id}; use crate::sync::network_context::SyncNetworkContext; -use crate::sync::range_sync::{BatchConfig, BatchId, BatchInfo, BatchProcessingResult, BatchState}; +use crate::sync::range_sync::{ + BatchConfig, BatchId, BatchInfo, BatchOperationOutcome, BatchProcessingResult, BatchState, +}; use beacon_chain::{BeaconChain, BeaconChainTypes}; use lighthouse_network::types::{BackFillState, NetworkGlobals}; use lighthouse_network::{PeerAction, PeerId}; @@ -324,10 +326,10 @@ impl BackFillSync { for id in batch_ids { if let Some(batch) = self.batches.get_mut(&id) { match batch.download_failed(false) { - Ok(true) => { + Ok(BatchOperationOutcome::Failed { blacklist: _ }) => { self.fail_sync(BackFillError::BatchDownloadFailed(id))?; } - Ok(false) => {} + Ok(BatchOperationOutcome::Continue) => {} Err(e) => { self.fail_sync(BackFillError::BatchInvalidState(id, e.0))?; } @@ -371,8 +373,10 @@ impl BackFillSync { } match batch.download_failed(true) { Err(e) => self.fail_sync(BackFillError::BatchInvalidState(batch_id, e.0)), - Ok(true) => self.fail_sync(BackFillError::BatchDownloadFailed(batch_id)), - Ok(false) => self.retry_batch_download(network, batch_id), + Ok(BatchOperationOutcome::Failed { blacklist: _ }) => { + self.fail_sync(BackFillError::BatchDownloadFailed(batch_id)) + } + Ok(BatchOperationOutcome::Continue) => self.retry_batch_download(network, batch_id), } } else { // this could be an error for an old batch, removed when the chain advances @@ -439,7 +443,7 @@ impl BackFillSync { self.process_completed_batches(network) } Err(result) => { - let (expected_boundary, received_boundary, is_failed) = match result { + let (expected_boundary, received_boundary, outcome) = match result { Err(e) => { return self .fail_sync(BackFillError::BatchInvalidState(batch_id, e.0)) @@ -450,7 +454,7 @@ impl BackFillSync { warn!(self.log, "Batch received out of range blocks"; "expected_boundary" => expected_boundary, "received_boundary" => received_boundary, "peer_id" => %peer_id, batch); - if is_failed { + if let BatchOperationOutcome::Failed { blacklist: _ } = outcome { error!(self.log, "Backfill failed"; "epoch" => batch_id, "received_boundary" => received_boundary, "expected_boundary" => expected_boundary); return self .fail_sync(BackFillError::BatchDownloadFailed(batch_id)) @@ -547,16 +551,7 @@ impl BackFillSync { // blocks to continue, and the chain is expecting a processing result that won't // arrive. To mitigate this, (fake) fail this processing so that the batch is // re-downloaded. - self.on_batch_process_result( - network, - batch_id, - &BatchProcessResult::Failed { - imported_blocks: false, - // The beacon processor queue is full, no need to penalize the peer. - peer_action: None, - mode: FailureMode::ConsensusLayer, - }, - ) + self.on_batch_process_result(network, batch_id, &BatchProcessResult::NonFaultyFailure) } else { Ok(ProcessResult::Successful) } @@ -575,7 +570,7 @@ impl BackFillSync { // The first two cases are possible in regular sync, should not occur in backfill, but we // keep this logic for handling potential processing race conditions. // result - match &self.current_processing_batch { + let batch = match &self.current_processing_batch { Some(processing_id) if *processing_id != batch_id => { debug!(self.log, "Unexpected batch result"; "batch_epoch" => batch_id, "expected_batch_epoch" => processing_id); @@ -589,13 +584,9 @@ impl BackFillSync { _ => { // batch_id matches, continue self.current_processing_batch = None; - } - } - match result { - BatchProcessResult::Success(was_non_empty) => { - let batch = match self.batches.get_mut(&batch_id) { - Some(v) => v, + match self.batches.get_mut(&batch_id) { + Some(batch) => batch, None => { // This is an error. Fail the sync algorithm. return self @@ -605,8 +596,27 @@ impl BackFillSync { ))) .map(|_| ProcessResult::Successful); } - }; + } + } + }; + let peer = match batch.current_peer() { + Some(v) => *v, + None => { + return self + .fail_sync(BackFillError::BatchInvalidState( + batch_id, + String::from("Peer does not exist"), + )) + .map(|_| ProcessResult::Successful) + } + }; + + debug!(self.log, "Backfill batch processed"; "result" => ?result, &batch, + "batch_epoch" => batch_id, "peer" => %peer, "client" => %network.client_type(&peer)); + + match result { + BatchProcessResult::Success { was_non_empty } => { if let Err(e) = batch.processing_completed(BatchProcessingResult::Success) { self.fail_sync(BackFillError::BatchInvalidState(batch_id, e.0))?; } @@ -636,45 +646,17 @@ impl BackFillSync { self.process_completed_batches(network) } } - BatchProcessResult::Failed { + BatchProcessResult::FaultyFailure { imported_blocks, - peer_action, - mode: _, + penalty, } => { - let batch = match self.batches.get_mut(&batch_id) { - Some(v) => v, - None => { - return self - .fail_sync(BackFillError::InvalidSyncState(format!( - "Batch not found for current processing target {}", - batch_id - ))) - .map(|_| ProcessResult::Successful) - } - }; - - let peer = match batch.current_peer() { - Some(v) => *v, - None => { - return self - .fail_sync(BackFillError::BatchInvalidState( - batch_id, - String::from("Peer does not exist"), - )) - .map(|_| ProcessResult::Successful) - } - }; - debug!(self.log, "Batch processing failed"; "imported_blocks" => imported_blocks, - "batch_epoch" => batch_id, "peer" => %peer, "client" => %network.client_type(&peer)); - match batch.processing_completed(BatchProcessingResult::Failed { - count_attempt: peer_action.is_some(), - }) { + match batch.processing_completed(BatchProcessingResult::FaultyFailure) { Err(e) => { // Batch was in the wrong state self.fail_sync(BackFillError::BatchInvalidState(batch_id, e.0)) .map(|_| ProcessResult::Successful) } - Ok(true) => { + Ok(BatchOperationOutcome::Failed { blacklist: _ }) => { // check that we have not exceeded the re-process retry counter // If a batch has exceeded the invalid batch lookup attempts limit, it means // that it is likely all peers are sending invalid batches @@ -683,23 +665,18 @@ impl BackFillSync { warn!( self.log, "Backfill batch failed to download. Penalizing peers"; - "score_adjustment" => %peer_action - .as_ref() - .map(ToString::to_string) - .unwrap_or_else(|| "None".into()), + "score_adjustment" => %penalty, "batch_epoch"=> batch_id ); - if let Some(peer_action) = peer_action { - for peer in self.participating_peers.drain() { - network.report_peer(peer, *peer_action, "backfill_batch_failed"); - } + for peer in self.participating_peers.drain() { + network.report_peer(peer, *penalty, "backfill_batch_failed"); } self.fail_sync(BackFillError::BatchProcessingFailed(batch_id)) .map(|_| ProcessResult::Successful) } - Ok(false) => { + Ok(BatchOperationOutcome::Continue) => { // chain can continue. Check if it can be progressed if *imported_blocks { // At least one block was successfully verified and imported, then we can be sure all @@ -713,6 +690,14 @@ impl BackFillSync { } } } + BatchProcessResult::NonFaultyFailure => { + if let Err(e) = batch.processing_completed(BatchProcessingResult::NonFaultyFailure) + { + self.fail_sync(BackFillError::BatchInvalidState(batch_id, e.0))?; + } + self.retry_batch_download(network, batch_id) + .map(|_| ProcessResult::Successful) + } } } @@ -905,11 +890,11 @@ impl BackFillSync { .validation_failed() .map_err(|e| BackFillError::BatchInvalidState(batch_id, e.0))? { - true => { + BatchOperationOutcome::Failed { blacklist: _ } => { // Batch has failed and cannot be redownloaded. return self.fail_sync(BackFillError::BatchProcessingFailed(batch_id)); } - false => { + BatchOperationOutcome::Continue => { redownload_queue.push(*id); } } @@ -1010,8 +995,12 @@ impl BackFillSync { Err(e) => { self.fail_sync(BackFillError::BatchInvalidState(batch_id, e.0))? } - Ok(true) => self.fail_sync(BackFillError::BatchDownloadFailed(batch_id))?, - Ok(false) => return self.retry_batch_download(network, batch_id), + Ok(BatchOperationOutcome::Failed { blacklist: _ }) => { + self.fail_sync(BackFillError::BatchDownloadFailed(batch_id))? + } + Ok(BatchOperationOutcome::Continue) => { + return self.retry_batch_download(network, batch_id) + } } } } diff --git a/beacon_node/network/src/sync/block_lookups/mod.rs b/beacon_node/network/src/sync/block_lookups/mod.rs index 2aa4acdb5a..9f2a5fdce7 100644 --- a/beacon_node/network/src/sync/block_lookups/mod.rs +++ b/beacon_node/network/src/sync/block_lookups/mod.rs @@ -11,7 +11,7 @@ use std::sync::Arc; use store::{Hash256, SignedBeaconBlock}; use tokio::sync::mpsc; -use crate::beacon_processor::{ChainSegmentProcessId, FailureMode, WorkEvent}; +use crate::beacon_processor::{ChainSegmentProcessId, WorkEvent}; use crate::metrics; use self::{ @@ -610,35 +610,26 @@ impl BlockLookups { chain_hash ); #[cfg(not(debug_assertions))] - return crit!(self.log, "Chain process response for a parent lookup request that was not found"; "chain_hash" => %chain_hash); + return debug!(self.log, "Chain process response for a parent lookup request that was not found"; "chain_hash" => %chain_hash); }; debug!(self.log, "Parent chain processed"; "chain_hash" => %chain_hash, "result" => ?result); match result { - BatchProcessResult::Success(_) => { + BatchProcessResult::Success { .. } => { // nothing to do. } - BatchProcessResult::Failed { + BatchProcessResult::FaultyFailure { imported_blocks: _, - peer_action, - mode, + penalty, } => { - if let FailureMode::ExecutionLayer { pause_sync: _ } = mode { - debug!( - self.log, - "Chain segment processing failed. Execution layer is offline"; - "chain_hash" => %chain_hash, - "error" => ?mode - ); - } else { - self.failed_chains.insert(parent_lookup.chain_hash()); - if let Some(peer_action) = peer_action { - for &peer_id in parent_lookup.used_peers() { - cx.report_peer(peer_id, peer_action, "parent_chain_failure") - } - } + self.failed_chains.insert(parent_lookup.chain_hash()); + for &peer_id in parent_lookup.used_peers() { + cx.report_peer(peer_id, penalty, "parent_chain_failure") } } + BatchProcessResult::NonFaultyFailure => { + // We might request this chain again if there is need but otherwise, don't try again + } } metrics::set_gauge( diff --git a/beacon_node/network/src/sync/block_lookups/tests.rs b/beacon_node/network/src/sync/block_lookups/tests.rs index b3afadda2c..2f2720fd1e 100644 --- a/beacon_node/network/src/sync/block_lookups/tests.rs +++ b/beacon_node/network/src/sync/block_lookups/tests.rs @@ -284,7 +284,10 @@ fn test_parent_lookup_happy_path() { // Processing succeeds, now the rest of the chain should be sent for processing. bl.parent_block_processed(chain_hash, BlockError::BlockIsAlreadyKnown.into(), &mut cx); rig.expect_parent_chain_process(); - bl.parent_chain_processed(chain_hash, BatchProcessResult::Success(true), &mut cx); + let process_result = BatchProcessResult::Success { + was_non_empty: true, + }; + bl.parent_chain_processed(chain_hash, process_result, &mut cx); assert_eq!(bl.parent_queue.len(), 0); } @@ -318,7 +321,10 @@ fn test_parent_lookup_wrong_response() { // Processing succeeds, now the rest of the chain should be sent for processing. bl.parent_block_processed(chain_hash, Ok(()).into(), &mut cx); rig.expect_parent_chain_process(); - bl.parent_chain_processed(chain_hash, BatchProcessResult::Success(true), &mut cx); + let process_result = BatchProcessResult::Success { + was_non_empty: true, + }; + bl.parent_chain_processed(chain_hash, process_result, &mut cx); assert_eq!(bl.parent_queue.len(), 0); } @@ -347,7 +353,10 @@ fn test_parent_lookup_empty_response() { // Processing succeeds, now the rest of the chain should be sent for processing. bl.parent_block_processed(chain_hash, Ok(()).into(), &mut cx); rig.expect_parent_chain_process(); - bl.parent_chain_processed(chain_hash, BatchProcessResult::Success(true), &mut cx); + let process_result = BatchProcessResult::Success { + was_non_empty: true, + }; + bl.parent_chain_processed(chain_hash, process_result, &mut cx); assert_eq!(bl.parent_queue.len(), 0); } @@ -375,7 +384,10 @@ fn test_parent_lookup_rpc_failure() { // Processing succeeds, now the rest of the chain should be sent for processing. bl.parent_block_processed(chain_hash, Ok(()).into(), &mut cx); rig.expect_parent_chain_process(); - bl.parent_chain_processed(chain_hash, BatchProcessResult::Success(true), &mut cx); + let process_result = BatchProcessResult::Success { + was_non_empty: true, + }; + bl.parent_chain_processed(chain_hash, process_result, &mut cx); assert_eq!(bl.parent_queue.len(), 0); } diff --git a/beacon_node/network/src/sync/manager.rs b/beacon_node/network/src/sync/manager.rs index fe27a33c5c..64755300c3 100644 --- a/beacon_node/network/src/sync/manager.rs +++ b/beacon_node/network/src/sync/manager.rs @@ -38,7 +38,7 @@ use super::block_lookups::BlockLookups; use super::network_context::SyncNetworkContext; use super::peer_sync_info::{remote_sync_type, PeerSyncType}; use super::range_sync::{RangeSync, RangeSyncType, EPOCHS_PER_BATCH}; -use crate::beacon_processor::{ChainSegmentProcessId, FailureMode, WorkEvent as BeaconWorkEvent}; +use crate::beacon_processor::{ChainSegmentProcessId, WorkEvent as BeaconWorkEvent}; use crate::service::NetworkMessage; use crate::status::ToStatusMessage; use beacon_chain::{BeaconChain, BeaconChainTypes, BlockError}; @@ -139,13 +139,15 @@ pub enum BlockProcessResult { #[derive(Debug)] pub enum BatchProcessResult { /// The batch was completed successfully. It carries whether the sent batch contained blocks. - Success(bool), - /// The batch processing failed. It carries whether the processing imported any block. - Failed { - imported_blocks: bool, - peer_action: Option, - mode: FailureMode, + Success { + was_non_empty: bool, }, + /// The batch processing failed. It carries whether the processing imported any block. + FaultyFailure { + imported_blocks: bool, + penalty: PeerAction, + }, + NonFaultyFailure, } /// The primary object for handling and driving all the current syncing logic. It maintains the diff --git a/beacon_node/network/src/sync/mod.rs b/beacon_node/network/src/sync/mod.rs index 7a891de728..dc18a5c981 100644 --- a/beacon_node/network/src/sync/mod.rs +++ b/beacon_node/network/src/sync/mod.rs @@ -9,4 +9,4 @@ mod peer_sync_info; mod range_sync; pub use manager::{BatchProcessResult, SyncMessage}; -pub use range_sync::ChainId; +pub use range_sync::{BatchOperationOutcome, ChainId}; diff --git a/beacon_node/network/src/sync/range_sync/batch.rs b/beacon_node/network/src/sync/range_sync/batch.rs index c642d81db8..3eee7223db 100644 --- a/beacon_node/network/src/sync/range_sync/batch.rs +++ b/beacon_node/network/src/sync/range_sync/batch.rs @@ -70,12 +70,16 @@ impl BatchConfig for RangeSyncBatchConfig { // Such errors should never be encountered. pub struct WrongState(pub(crate) String); -/// Auxiliary type alias for readability. -type IsFailed = bool; +/// After batch operations, we use this to communicate whether a batch can continue or not +pub enum BatchOperationOutcome { + Continue, + Failed { blacklist: bool }, +} pub enum BatchProcessingResult { Success, - Failed { count_attempt: bool }, + FaultyFailure, + NonFaultyFailure, } /// A segment of a chain. @@ -87,7 +91,7 @@ pub struct BatchInfo { /// The `Attempts` that have been made and failed to send us this batch. failed_processing_attempts: Vec, /// Number of processing attempts that have failed but we do not count. - other_failed_processing_attempts: u8, + non_faulty_processing_attempts: u8, /// The number of download retries this batch has undergone due to a failed request. failed_download_attempts: Vec, /// State of the batch. @@ -124,14 +128,6 @@ impl BatchState { pub fn poison(&mut self) -> BatchState { std::mem::replace(self, BatchState::Poisoned) } - - pub fn is_failed(&self) -> IsFailed { - match self { - BatchState::Failed => true, - BatchState::Poisoned => unreachable!("Poisoned batch"), - _ => false, - } - } } impl BatchInfo { @@ -151,7 +147,7 @@ impl BatchInfo { end_slot, failed_processing_attempts: Vec::new(), failed_download_attempts: Vec::new(), - other_failed_processing_attempts: 0, + non_faulty_processing_attempts: 0, state: BatchState::AwaitingDownload, marker: std::marker::PhantomData, } @@ -175,7 +171,16 @@ impl BatchInfo { peers } - /// Verifies if an incomming block belongs to this batch. + /// Return the number of times this batch has failed downloading and failed processing, in this + /// order. + pub fn failed_attempts(&self) -> (usize, usize) { + ( + self.failed_download_attempts.len(), + self.failed_processing_attempts.len(), + ) + } + + /// Verifies if an incoming block belongs to this batch. pub fn is_expecting_block(&self, peer_id: &PeerId, request_id: &Id) -> bool { if let BatchState::Downloading(expected_peer, _, expected_id) = &self.state { return peer_id == expected_peer && expected_id == request_id; @@ -203,6 +208,20 @@ impl BatchInfo { } } + /// After different operations over a batch, this could be in a state that allows it to + /// continue, or in failed state. When the batch has failed, we check if it did mainly due to + /// processing failures. In this case the batch is considered failed and faulty. + pub fn outcome(&self) -> BatchOperationOutcome { + match self.state { + BatchState::Poisoned => unreachable!("Poisoned batch"), + BatchState::Failed => BatchOperationOutcome::Failed { + blacklist: self.failed_processing_attempts.len() + > self.failed_download_attempts.len(), + }, + _ => BatchOperationOutcome::Continue, + } + } + pub fn state(&self) -> &BatchState { &self.state } @@ -235,7 +254,10 @@ impl BatchInfo { #[must_use = "Batch may have failed"] pub fn download_completed( &mut self, - ) -> Result> { + ) -> Result< + usize, /* Received blocks */ + Result<(Slot, Slot, BatchOperationOutcome), WrongState>, + > { match self.state.poison() { BatchState::Downloading(peer, blocks, _request_id) => { // verify that blocks are in range @@ -264,7 +286,7 @@ impl BatchInfo { BatchState::AwaitingDownload }; - return Err(Ok((expected, received, self.state.is_failed()))); + return Err(Ok((expected, received, self.outcome()))); } } @@ -289,7 +311,10 @@ impl BatchInfo { /// THe `mark_failed` parameter, when set to false, does not increment the failed attempts of /// this batch and register the peer, rather attempts a re-download. #[must_use = "Batch may have failed"] - pub fn download_failed(&mut self, mark_failed: bool) -> Result { + pub fn download_failed( + &mut self, + mark_failed: bool, + ) -> Result { match self.state.poison() { BatchState::Downloading(peer, _, _request_id) => { // register the attempt and check if the batch can be tried again @@ -304,7 +329,7 @@ impl BatchInfo { // drop the blocks BatchState::AwaitingDownload }; - Ok(self.state.is_failed()) + Ok(self.outcome()) } BatchState::Poisoned => unreachable!("Poisoned batch"), other => { @@ -359,32 +384,31 @@ impl BatchInfo { pub fn processing_completed( &mut self, procesing_result: BatchProcessingResult, - ) -> Result { + ) -> Result { match self.state.poison() { BatchState::Processing(attempt) => { self.state = match procesing_result { BatchProcessingResult::Success => BatchState::AwaitingValidation(attempt), - BatchProcessingResult::Failed { count_attempt } => { - if count_attempt { - // register the failed attempt - self.failed_processing_attempts.push(attempt); + BatchProcessingResult::FaultyFailure => { + // register the failed attempt + self.failed_processing_attempts.push(attempt); - // check if the batch can be downloaded again - if self.failed_processing_attempts.len() - >= B::max_batch_processing_attempts() as usize - { - BatchState::Failed - } else { - BatchState::AwaitingDownload - } + // check if the batch can be downloaded again + if self.failed_processing_attempts.len() + >= B::max_batch_processing_attempts() as usize + { + BatchState::Failed } else { - self.other_failed_processing_attempts = - self.other_failed_processing_attempts.saturating_add(1); BatchState::AwaitingDownload } } + BatchProcessingResult::NonFaultyFailure => { + self.non_faulty_processing_attempts = + self.non_faulty_processing_attempts.saturating_add(1); + BatchState::AwaitingDownload + } }; - Ok(self.state.is_failed()) + Ok(self.outcome()) } BatchState::Poisoned => unreachable!("Poisoned batch"), other => { @@ -398,7 +422,7 @@ impl BatchInfo { } #[must_use = "Batch may have failed"] - pub fn validation_failed(&mut self) -> Result { + pub fn validation_failed(&mut self) -> Result { match self.state.poison() { BatchState::AwaitingValidation(attempt) => { self.failed_processing_attempts.push(attempt); @@ -411,7 +435,7 @@ impl BatchInfo { } else { BatchState::AwaitingDownload }; - Ok(self.state.is_failed()) + Ok(self.outcome()) } BatchState::Poisoned => unreachable!("Poisoned batch"), other => { @@ -472,10 +496,7 @@ impl slog::KV for BatchInfo { )?; serializer.emit_usize("downloaded", self.failed_download_attempts.len())?; serializer.emit_usize("processed", self.failed_processing_attempts.len())?; - serializer.emit_u8( - "processed_no_penalty", - self.other_failed_processing_attempts, - )?; + serializer.emit_u8("processed_no_penalty", self.non_faulty_processing_attempts)?; serializer.emit_arguments("state", &format_args!("{:?}", self.state))?; slog::Result::Ok(()) } diff --git a/beacon_node/network/src/sync/range_sync/chain.rs b/beacon_node/network/src/sync/range_sync/chain.rs index caa08165a9..a54105f5cb 100644 --- a/beacon_node/network/src/sync/range_sync/chain.rs +++ b/beacon_node/network/src/sync/range_sync/chain.rs @@ -1,7 +1,8 @@ use super::batch::{BatchInfo, BatchProcessingResult, BatchState}; -use crate::beacon_processor::WorkEvent as BeaconWorkEvent; -use crate::beacon_processor::{ChainSegmentProcessId, FailureMode}; -use crate::sync::{manager::Id, network_context::SyncNetworkContext, BatchProcessResult}; +use crate::beacon_processor::{ChainSegmentProcessId, WorkEvent as BeaconWorkEvent}; +use crate::sync::{ + manager::Id, network_context::SyncNetworkContext, BatchOperationOutcome, BatchProcessResult, +}; use beacon_chain::{BeaconChainTypes, CountUnrealized}; use fnv::FnvHashMap; use lighthouse_network::{PeerAction, PeerId}; @@ -37,7 +38,11 @@ pub type ProcessingResult = Result; pub enum RemoveChain { EmptyPeerPool, ChainCompleted, - ChainFailed(BatchId), + /// A chain has failed. This boolean signals whether the chain should be blacklisted. + ChainFailed { + blacklist: bool, + failing_batch: BatchId, + }, WrongBatchState(String), WrongChainState(String), } @@ -187,8 +192,13 @@ impl SyncingChain { // fail the batches for id in batch_ids { if let Some(batch) = self.batches.get_mut(&id) { - if batch.download_failed(true)? { - return Err(RemoveChain::ChainFailed(id)); + if let BatchOperationOutcome::Failed { blacklist } = + batch.download_failed(true)? + { + return Err(RemoveChain::ChainFailed { + blacklist, + failing_batch: id, + }); } self.retry_batch_download(network, id)?; } else { @@ -265,12 +275,15 @@ impl SyncingChain { self.process_completed_batches(network) } Err(result) => { - let (expected_boundary, received_boundary, is_failed) = result?; + let (expected_boundary, received_boundary, outcome) = result?; warn!(self.log, "Batch received out of range blocks"; "expected_boundary" => expected_boundary, "received_boundary" => received_boundary, "peer_id" => %peer_id, batch); - if is_failed { - return Err(RemoveChain::ChainFailed(batch_id)); + if let BatchOperationOutcome::Failed { blacklist } = outcome { + return Err(RemoveChain::ChainFailed { + blacklist, + failing_batch: batch_id, + }); } // this batch can't be used, so we need to request it again. self.retry_batch_download(network, batch_id) @@ -324,15 +337,7 @@ impl SyncingChain { // blocks to continue, and the chain is expecting a processing result that won't // arrive. To mitigate this, (fake) fail this processing so that the batch is // re-downloaded. - self.on_batch_process_result( - network, - batch_id, - &BatchProcessResult::Failed { - imported_blocks: false, - peer_action: None, - mode: FailureMode::ConsensusLayer, - }, - ) + self.on_batch_process_result(network, batch_id, &BatchProcessResult::NonFaultyFailure) } else { Ok(KeepChain) } @@ -448,7 +453,7 @@ impl SyncingChain { ) -> ProcessingResult { // the first two cases are possible if the chain advances while waiting for a processing // result - match &self.current_processing_batch { + let batch = match &self.current_processing_batch { Some(processing_id) if *processing_id != batch_id => { debug!(self.log, "Unexpected batch result"; "batch_epoch" => batch_id, "expected_batch_epoch" => processing_id); @@ -462,22 +467,35 @@ impl SyncingChain { _ => { // batch_id matches, continue self.current_processing_batch = None; - } - } - - match result { - BatchProcessResult::Success(was_non_empty) => { - let batch = self.batches.get_mut(&batch_id).ok_or_else(|| { + self.batches.get_mut(&batch_id).ok_or_else(|| { RemoveChain::WrongChainState(format!( "Current processing batch not found: {}", batch_id )) - })?; + })? + } + }; + let peer = batch.current_peer().cloned().ok_or_else(|| { + RemoveChain::WrongBatchState(format!( + "Processing target is in wrong state: {:?}", + batch.state(), + )) + })?; + + // Log the process result and the batch for debugging purposes. + debug!(self.log, "Batch processing result"; "result" => ?result, &batch, + "batch_epoch" => batch_id, "client" => %network.client_type(&peer)); + + // We consider three cases. Batch was successfully processed, Batch failed processing due + // to a faulty peer, or batch failed processing but the peer can't be deemed faulty. + match result { + BatchProcessResult::Success { was_non_empty } => { batch.processing_completed(BatchProcessingResult::Success)?; - // If the processed batch was not empty, we can validate previous unvalidated - // blocks. + if *was_non_empty { + // If the processed batch was not empty, we can validate previous unvalidated + // blocks. self.advance_chain(network, batch_id); // we register so that on chain switching we don't try it again self.attempted_optimistic_starts.insert(batch_id); @@ -507,64 +525,56 @@ impl SyncingChain { self.process_completed_batches(network) } } - BatchProcessResult::Failed { + BatchProcessResult::FaultyFailure { imported_blocks, - peer_action, - mode: _, + penalty, } => { - let batch = self.batches.get_mut(&batch_id).ok_or_else(|| { - RemoveChain::WrongChainState(format!( - "Batch not found for current processing target {}", - batch_id - )) - })?; - let peer = batch.current_peer().cloned().ok_or_else(|| { - RemoveChain::WrongBatchState(format!( - "Processing target is in wrong state: {:?}", - batch.state(), - )) - })?; - debug!(self.log, "Batch processing failed"; "imported_blocks" => imported_blocks, "peer_penalty" => ?peer_action, - "batch_epoch" => batch_id, "peer" => %peer, "client" => %network.client_type(&peer)); + // Penalize the peer appropiately. + network.report_peer(peer, *penalty, "faulty_batch"); - if batch.processing_completed(BatchProcessingResult::Failed { - count_attempt: peer_action.is_some(), - })? { - // check that we have not exceeded the re-process retry counter - // If a batch has exceeded the invalid batch lookup attempts limit, it means - // that it is likely all peers in this chain are are sending invalid batches - // repeatedly and are either malicious or faulty. We drop the chain and - // report all peers. - // There are some edge cases with forks that could land us in this situation. - // This should be unlikely, so we tolerate these errors, but not often. - warn!( - self.log, - "Batch failed to download. Dropping chain scoring peers"; - "score_adjustment" => %peer_action - .as_ref() - .map(ToString::to_string) - .unwrap_or_else(|| "None".into()), - "batch_epoch"=> batch_id - ); - - if let Some(peer_action) = peer_action { - for (peer, _) in self.peers.drain() { - network.report_peer(peer, *peer_action, "batch_failed"); + // Check if this batch is allowed to continue + match batch.processing_completed(BatchProcessingResult::FaultyFailure)? { + BatchOperationOutcome::Continue => { + // Chain can continue. Check if it can be moved forward. + if *imported_blocks { + // At least one block was successfully verified and imported, so we can be sure all + // previous batches are valid and we only need to download the current failed + // batch. + self.advance_chain(network, batch_id); } + // Handle this invalid batch, that is within the re-process retries limit. + self.handle_invalid_batch(network, batch_id) } - Err(RemoveChain::ChainFailed(batch_id)) - } else { - // chain can continue. Check if it can be moved forward - if *imported_blocks { - // At least one block was successfully verified and imported, so we can be sure all - // previous batches are valid and we only need to download the current failed - // batch. - self.advance_chain(network, batch_id); + BatchOperationOutcome::Failed { blacklist } => { + // Check that we have not exceeded the re-process retry counter, + // If a batch has exceeded the invalid batch lookup attempts limit, it means + // that it is likely all peers in this chain are are sending invalid batches + // repeatedly and are either malicious or faulty. We drop the chain and + // report all peers. + // There are some edge cases with forks that could land us in this situation. + // This should be unlikely, so we tolerate these errors, but not often. + warn!( + self.log, + "Batch failed to download. Dropping chain scoring peers"; + "score_adjustment" => %penalty, + "batch_epoch"=> batch_id, + ); + + for (peer, _) in self.peers.drain() { + network.report_peer(peer, *penalty, "faulty_chain"); + } + Err(RemoveChain::ChainFailed { + blacklist, + failing_batch: batch_id, + }) } - // Handle this invalid batch, that is within the re-process retries limit. - self.handle_invalid_batch(network, batch_id) } } + BatchProcessResult::NonFaultyFailure => { + batch.processing_completed(BatchProcessingResult::NonFaultyFailure)?; + // Simply redownload the batch. + self.retry_batch_download(network, batch_id) + } } } @@ -737,9 +747,12 @@ impl SyncingChain { let mut redownload_queue = Vec::new(); for (id, batch) in self.batches.range_mut(..batch_id) { - if batch.validation_failed()? { + if let BatchOperationOutcome::Failed { blacklist } = batch.validation_failed()? { // remove the chain early - return Err(RemoveChain::ChainFailed(batch_id)); + return Err(RemoveChain::ChainFailed { + blacklist, + failing_batch: *id, + }); } redownload_queue.push(*id); } @@ -836,8 +849,11 @@ impl SyncingChain { if let Some(active_requests) = self.peers.get_mut(peer_id) { active_requests.remove(&batch_id); } - if batch.download_failed(true)? { - return Err(RemoveChain::ChainFailed(batch_id)); + if let BatchOperationOutcome::Failed { blacklist } = batch.download_failed(true)? { + return Err(RemoveChain::ChainFailed { + blacklist, + failing_batch: batch_id, + }); } self.retry_batch_download(network, batch_id) } else { @@ -925,10 +941,16 @@ impl SyncingChain { self.peers .get_mut(&peer) .map(|request| request.remove(&batch_id)); - if batch.download_failed(true)? { - return Err(RemoveChain::ChainFailed(batch_id)); - } else { - return self.retry_batch_download(network, batch_id); + match batch.download_failed(true)? { + BatchOperationOutcome::Failed { blacklist } => { + return Err(RemoveChain::ChainFailed { + blacklist, + failing_batch: batch_id, + }) + } + BatchOperationOutcome::Continue => { + return self.retry_batch_download(network, batch_id) + } } } } diff --git a/beacon_node/network/src/sync/range_sync/mod.rs b/beacon_node/network/src/sync/range_sync/mod.rs index 31122d59a1..f4db32bc96 100644 --- a/beacon_node/network/src/sync/range_sync/mod.rs +++ b/beacon_node/network/src/sync/range_sync/mod.rs @@ -8,7 +8,7 @@ mod chain_collection; mod range; mod sync_type; -pub use batch::{BatchConfig, BatchInfo, BatchProcessingResult, BatchState}; +pub use batch::{BatchConfig, BatchInfo, BatchOperationOutcome, BatchProcessingResult, BatchState}; pub use chain::{BatchId, ChainId, EPOCHS_PER_BATCH}; pub use range::RangeSync; pub use sync_type::RangeSyncType; diff --git a/beacon_node/network/src/sync/range_sync/range.rs b/beacon_node/network/src/sync/range_sync/range.rs index f08f8eb82a..4b29d31295 100644 --- a/beacon_node/network/src/sync/range_sync/range.rs +++ b/beacon_node/network/src/sync/range_sync/range.rs @@ -356,8 +356,8 @@ where debug!(self.log, "Chain removed"; "sync_type" => ?sync_type, &chain, "reason" => ?remove_reason, "op" => op); } - if let RemoveChain::ChainFailed(_) = remove_reason { - if RangeSyncType::Finalized == sync_type { + if let RemoveChain::ChainFailed { blacklist, .. } = remove_reason { + if RangeSyncType::Finalized == sync_type && blacklist { warn!(self.log, "Chain failed! Syncing to its head won't be retried for at least the next {} seconds", FAILED_CHAINS_EXPIRY_SECONDS; &chain); self.failed_chains.insert(chain.target_head_root); }