diff --git a/beacon_node/network/src/network_beacon_processor/sync_methods.rs b/beacon_node/network/src/network_beacon_processor/sync_methods.rs index 988a68c9dd..cc4fca0c23 100644 --- a/beacon_node/network/src/network_beacon_processor/sync_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/sync_methods.rs @@ -4,11 +4,13 @@ use crate::sync::BatchProcessResult; use crate::sync::manager::CustodyBatchProcessResult; use crate::sync::{ ChainId, - manager::{BlockProcessType, SyncMessage}, + manager::{BlockProcessType, BlockProcessingResult, SyncMessage, WhichPeerToPenalize}, }; use beacon_chain::block_verification_types::LookupBlock; use beacon_chain::block_verification_types::{AsBlock, RangeSyncBlock}; -use beacon_chain::data_availability_checker::AvailabilityCheckError; +use beacon_chain::data_availability_checker::{ + AvailabilityCheckError, AvailabilityCheckErrorCategory, +}; use beacon_chain::historical_data_columns::HistoricalDataColumnError; use beacon_chain::{ AvailabilityProcessingStatus, BeaconChainTypes, BlockError, ChainSegmentResult, @@ -90,10 +92,17 @@ impl NetworkBeaconProcessor { ); // A closure which will ignore the block. let ignore_fn = move || { + warn!( + ?process_type, + "Block processing task dropped, cpu might be overloaded" + ); // Sync handles these results self.send_sync_message(SyncMessage::BlockComponentProcessed { process_type, - result: crate::sync::manager::BlockProcessingResult::Ignored, + result: BlockProcessingResult::Error { + penalty: None, + reason: "processor_overloaded", + }, }); }; (process_fn, Box::new(ignore_fn)) @@ -232,9 +241,10 @@ impl NetworkBeaconProcessor { } // Sync handles these results + let result = classify_processing_result(result, &process_type); self.send_sync_message(SyncMessage::BlockComponentProcessed { process_type, - result: result.into(), + result, }); // Drop the handle to remove the entry from the cache @@ -343,9 +353,10 @@ impl NetworkBeaconProcessor { } // Sync handles these results + let result = classify_processing_result(result, &process_type); self.send_sync_message(SyncMessage::BlockComponentProcessed { process_type, - result: result.into(), + result, }); } @@ -420,9 +431,10 @@ impl NetworkBeaconProcessor { Err(_) => {} } + let result = classify_processing_result(result, &process_type); self.send_sync_message(SyncMessage::BlockComponentProcessed { process_type, - result: result.into(), + result, }); } @@ -1003,3 +1015,76 @@ impl NetworkBeaconProcessor { } } } + +/// Translate the beacon-chain processing outcome into a `BlockProcessingResult` the lookup state +/// machine can act on directly. The policy decisions about *whether* and *which peer-class* to +/// penalize live here, on the producer side, so consumers only need to resolve the symbolic +/// `WhichPeerToPenalize` to an actual peer id at penalty time. +fn classify_processing_result( + result: Result, + process_type: &BlockProcessType, +) -> BlockProcessingResult { + let e = match result { + Ok(AvailabilityProcessingStatus::Imported(_)) => { + return BlockProcessingResult::Imported("imported"); + } + Ok(AvailabilityProcessingStatus::MissingComponents(_, _)) => { + return BlockProcessingResult::Imported("missing_components"); + } + Err(BlockError::DuplicateFullyImported(_)) => { + return BlockProcessingResult::Imported("duplicate"); + } + Err(BlockError::GenesisBlock) => { + return BlockProcessingResult::Imported("genesis"); + } + Err(e) => e, + }; + + // Non-attributable failures. + let no_penalty = |reason| BlockProcessingResult::Error { + penalty: None, + reason, + }; + match &e { + BlockError::BeaconChainError(_) => return no_penalty("beacon_chain_error"), + BlockError::DuplicateImportStatusUnknown(_) => { + return no_penalty("duplicate_unknown_status"); + } + BlockError::AvailabilityCheck(inner) + if inner.category() == AvailabilityCheckErrorCategory::Internal => + { + return no_penalty("availability_internal"); + } + BlockError::ExecutionPayloadError(epe) if !epe.penalize_peer() => { + return no_penalty("execution_payload"); + } + BlockError::ParentUnknown { .. } => return no_penalty("parent_unknown"), + // Bad-column attribution: only meaningful for the data path, but classify uniformly — + // block-side processing won't produce this variant. + BlockError::AvailabilityCheck(AvailabilityCheckError::InvalidColumn((Some(idx), _))) => { + return BlockProcessingResult::Error { + penalty: Some(( + PeerAction::MidToleranceError, + WhichPeerToPenalize::CustodyPeerForColumn(*idx as u64), + )), + reason: "lookup_data_processing_failure", + }; + } + _ => {} + } + + // Attributable to the block peer (which is also the data peer pre-Gloas). + let reason = match process_type { + BlockProcessType::SingleBlock { .. } => "lookup_block_processing_failure", + BlockProcessType::SingleBlob { .. } | BlockProcessType::SingleCustodyColumn(_) => { + "lookup_data_processing_failure" + } + }; + BlockProcessingResult::Error { + penalty: Some(( + PeerAction::MidToleranceError, + WhichPeerToPenalize::BlockPeer, + )), + reason, + } +} diff --git a/beacon_node/network/src/sync/block_lookups/mod.rs b/beacon_node/network/src/sync/block_lookups/mod.rs index 86f1694342..069ca611ab 100644 --- a/beacon_node/network/src/sync/block_lookups/mod.rs +++ b/beacon_node/network/src/sync/block_lookups/mod.rs @@ -29,13 +29,10 @@ use crate::metrics; use crate::sync::SyncMessage; use crate::sync::block_lookups::parent_chain::find_oldest_fork_ancestor; use crate::sync::block_lookups::single_block_lookup::PeerType; -use beacon_chain::data_availability_checker::{ - AvailabilityCheckError, AvailabilityCheckErrorCategory, -}; -use beacon_chain::{AvailabilityProcessingStatus, BeaconChainTypes, BlockError}; +use beacon_chain::BeaconChainTypes; use fnv::FnvHashMap; +use lighthouse_network::PeerId; use lighthouse_network::service::api_types::SingleLookupReqId; -use lighthouse_network::{PeerAction, PeerId}; use lru_cache::LRUTimeCache; use std::collections::hash_map::Entry; use std::sync::Arc; @@ -543,69 +540,16 @@ impl BlockLookups { debug!(id = lookup_id, "Unknown single block lookup"); return Err(LookupRequestError::UnknownLookup); }; - - let block_root = lookup.block_root(); - debug!( - ?block_root, + block_root = ?lookup.block_root(), id = lookup_id, ?result, "Received block processing result" ); - - match result { - // Block processed successfully (imported or missing components — both are ok since - // we send the block alone first, data follows independently) - BlockProcessingResult::Ok(AvailabilityProcessingStatus::Imported(_)) - | BlockProcessingResult::Ok(AvailabilityProcessingStatus::MissingComponents { - .. - }) - | BlockProcessingResult::Err(BlockError::DuplicateFullyImported(..)) - | BlockProcessingResult::Err(BlockError::GenesisBlock) => { - lookup.on_block_processing_result(true, cx) - } - BlockProcessingResult::Ignored => { - warn!("Block processing ignored, cpu might be overloaded"); - Err(LookupRequestError::Failed( - "Block processing ignored".to_owned(), - )) - } - BlockProcessingResult::Err(e) => { - debug!(?block_root, error = ?e, "Block processing error, retrying"); - - match &e { - BlockError::ParentUnknown { .. } => { - return Err(LookupRequestError::InternalError( - "ParentUnknown on processing".to_string(), - )); - } - // No penalization for internal / non-attributable errors - BlockError::BeaconChainError(_) - | BlockError::DuplicateImportStatusUnknown(..) => {} - BlockError::ExecutionPayloadError(epe) if !epe.penalize_peer() => {} - BlockError::AvailabilityCheck(e) - if e.category() == AvailabilityCheckErrorCategory::Internal => {} - // All other attributable errors: penalize the block peer - _ => { - if let Some(block_peer) = lookup.block_peer() { - cx.report_peer( - block_peer, - PeerAction::MidToleranceError, - "lookup_block_processing_failure", - ); - } - } - } - - // Block processing failed — reset everything and retry from scratch - lookup.on_block_processing_result(false, cx) - } - } + lookup.on_block_processing_result(result, cx) } /// Handle data processing result (blobs or custody columns). - /// On success: marks data processing done, may complete the lookup. - /// On error: penalizes data peers, retries data download only. fn on_data_processing_result( &mut self, lookup_id: SingleLookupId, @@ -616,74 +560,13 @@ impl BlockLookups { debug!(id = lookup_id, "Unknown single block lookup"); return Err(LookupRequestError::UnknownLookup); }; - - let block_root = lookup.block_root(); - debug!( - ?block_root, + block_root = ?lookup.block_root(), id = lookup_id, ?result, "Received data processing result" ); - - match result { - BlockProcessingResult::Ok(AvailabilityProcessingStatus::Imported(_)) - | BlockProcessingResult::Err(BlockError::DuplicateFullyImported(..)) - | BlockProcessingResult::Err(BlockError::GenesisBlock) => { - lookup.on_data_processing_result(true, cx) - } - BlockProcessingResult::Ok(AvailabilityProcessingStatus::MissingComponents { - .. - }) => { - // Data sent for processing but still missing components — this can happen if - // the block hasn't been fully validated yet. Treat as success for the data - // stream; completion check will handle the rest. - lookup.on_data_processing_result(true, cx) - } - BlockProcessingResult::Ignored => { - warn!("Data processing ignored, cpu might be overloaded"); - Err(LookupRequestError::Failed( - "Data processing ignored".to_owned(), - )) - } - BlockProcessingResult::Err(e) => { - debug!(?block_root, error = ?e, "Data processing error, retrying"); - - // Use the data kind to pick a penalty string the peer-scoring tests - // distinguish on (blobs vs custody columns). - let penalty_msg = "lookup_data_processing_failure"; - - match &e { - // No penalization for internal / non-attributable errors - BlockError::BeaconChainError(_) - | BlockError::DuplicateImportStatusUnknown(..) => {} - BlockError::AvailabilityCheck(e) - if e.category() == AvailabilityCheckErrorCategory::Internal => {} - // InvalidColumn: penalize only the peer(s) that served the bad column - BlockError::AvailabilityCheck(AvailabilityCheckError::InvalidColumn(( - index_opt, - _, - ))) => { - if let Some(custody_pg) = lookup.data_peer_group() - && let Some(index) = index_opt - { - for peer in custody_pg.of_index(*index as usize) { - cx.report_peer(*peer, PeerAction::MidToleranceError, penalty_msg); - } - } - } - // All other attributable errors: penalize the block peer (who also serves blobs) - _ => { - if let Some(block_peer) = lookup.block_peer() { - cx.report_peer(block_peer, PeerAction::MidToleranceError, penalty_msg); - } - } - } - - // Data processing failed — retry data download only - lookup.on_data_processing_result(false, cx) - } - } + lookup.on_data_processing_result(result, cx) } pub fn on_external_processing_result( diff --git a/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs b/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs index f20db7b23a..e098bdd8f7 100644 --- a/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs +++ b/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs @@ -2,7 +2,7 @@ use super::{BlockComponent, PeerId, SINGLE_BLOCK_LOOKUP_MAX_ATTEMPTS}; use crate::sync::block_lookups::{ BlobDownloadResponse, BlockDownloadResponse, CustodyDownloadResponse, PayloadDownloadResponse, }; -use crate::sync::manager::BlockProcessType; +use crate::sync::manager::{BlockProcessType, BlockProcessingResult}; use crate::sync::network_context::{ LookupRequestResult, PeerGroup, ReqId, RpcRequestSendError, RpcResponseError, SendErrorProcessor, SyncNetworkContext, @@ -192,19 +192,6 @@ impl BlockRequest { self.peek_block().map(|b| b.slot()) } - /// Returns the block peer for error attribution. Available in Downloaded/Processing states. - fn peer(&self) -> Option { - match self { - BlockRequest::Downloaded { peer, .. } | BlockRequest::Processing { peer, .. } => { - Some(*peer) - } - BlockRequest::Downloading { state, .. } => state - .peek_downloaded_peer_group() - .and_then(|pg| pg.all().next().copied()), - BlockRequest::Complete { peer, .. } => *peer, - } - } - fn is_awaiting_event(&self) -> bool { match self { BlockRequest::Downloading { state, .. } => state.is_awaiting_event(), @@ -603,21 +590,6 @@ impl SingleBlockLookup { } } - /// Returns the block peer if block has been downloaded. Used for peer penalization. - pub fn block_peer(&self) -> Option { - self.block_request.peer() - } - - /// Returns the peer group that served the downloaded data (blobs or custody columns) if - /// available, used for peer penalization on data-processing failures. - pub fn data_peer_group(&self) -> Option<&PeerGroup> { - match &self.data_request.as_ref()?.state { - DataRequestState::Downloaded { peer_group, .. } - | DataRequestState::Processing { peer_group } => Some(peer_group), - DataRequestState::Downloading(_) | DataRequestState::Complete => None, - } - } - // -- Main state machine driver -- /// Makes progress on all requests of this lookup. Any error is not recoverable and must result @@ -929,7 +901,7 @@ impl SingleBlockLookup { /// Handle block processing result. Advances the lookup state machine. pub fn on_block_processing_result( &mut self, - result_is_ok: bool, + result: BlockProcessingResult, cx: &mut SyncNetworkContext, ) -> Result { let BlockRequest::Processing { block, peer } = &self.block_request else { @@ -937,51 +909,62 @@ impl SingleBlockLookup { "block processing result but not in Processing state".to_owned(), )); }; - if result_is_ok { - let block = block.clone(); - let peer = Some(*peer); - self.block_request = BlockRequest::Complete { block, peer }; - self.continue_requests(cx) - } else { - // Block processing failed — reset everything and retry from scratch - self.reset_requests(); - self.continue_requests(cx) + let block_peer = *peer; + + match result { + BlockProcessingResult::Imported(_) => { + let block = block.clone(); + self.block_request = BlockRequest::Complete { + block, + peer: Some(block_peer), + }; + self.continue_requests(cx) + } + BlockProcessingResult::Error { penalty, reason } => { + if let Some((action, whom)) = penalty { + whom.apply(action, &PeerGroup::from_single(block_peer), reason, cx); + } + // Block processing failed — reset everything and retry from scratch. + self.reset_requests(); + self.continue_requests(cx) + } } } /// Handle data processing result (blobs or custody columns imported). pub fn on_data_processing_result( &mut self, - result_is_ok: bool, + result: BlockProcessingResult, cx: &mut SyncNetworkContext, ) -> Result { - let Some(request) = &mut self.data_request else { + let Some(DataRequest { + state: DataRequestState::Processing { peer_group }, + .. + }) = &self.data_request + else { return Err(LookupRequestError::BadState( "data processing result but not in Processing state".to_owned(), )); }; + let peer_group = peer_group.clone(); - if !matches!( - request, - DataRequest { - state: DataRequestState::Processing { .. }, - .. + match result { + BlockProcessingResult::Imported(_) => { + if let Some(req) = &mut self.data_request { + req.state = DataRequestState::Complete; + } + self.continue_requests(cx) + } + BlockProcessingResult::Error { penalty, reason } => { + if let Some((action, whom)) = penalty { + whom.apply(action, &peer_group, reason, cx); + } + // Data processing failed — bump the shared processing-failure counter and rebuild + // the data request so retries stay bounded against MAX_ATTEMPTS. + self.failed_processing = self.failed_processing.saturating_add(1); + self.data_request = None; + self.continue_requests(cx) } - ) { - return Err(LookupRequestError::BadState( - "data processing result but not in Processing state".to_owned(), - )); - } - if result_is_ok { - request.state = DataRequestState::Complete; - self.continue_requests(cx) - } else { - // Data processing failed — bump the shared processing-failure counter so the - // retry is bounded against `SINGLE_BLOCK_LOOKUP_MAX_ATTEMPTS`, then reset. - self.failed_processing = self.failed_processing.saturating_add(1); - // TODO(gloas-sync): Should this persist some state? - self.data_request = None; - self.continue_requests(cx) } } @@ -1222,13 +1205,6 @@ impl SingleLookupRequestState { } } - fn peek_downloaded_peer_group(&self) -> Option<&PeerGroup> { - match &self.state { - DownloadState::Downloaded(data) => Some(&data.peer_group), - _ => None, - } - } - /// Take the download result out, transitioning back to AwaitingDownload. /// Returns None if not in Downloaded state. fn take_download_result(&mut self) -> Option> { diff --git a/beacon_node/network/src/sync/manager.rs b/beacon_node/network/src/sync/manager.rs index 454807e3a5..bb634e9b5e 100644 --- a/beacon_node/network/src/sync/manager.rs +++ b/beacon_node/network/src/sync/manager.rs @@ -47,9 +47,7 @@ use crate::sync::block_lookups::{AwaitingParent, BlockComponent, DownloadResult} use crate::sync::custody_backfill_sync::CustodyBackFillSync; use crate::sync::network_context::{PeerGroup, RpcResponseResult}; use beacon_chain::block_verification_types::AsBlock; -use beacon_chain::{ - AvailabilityProcessingStatus, BeaconChain, BeaconChainTypes, BlockError, EngineState, -}; +use beacon_chain::{BeaconChain, BeaconChainTypes, EngineState}; use futures::StreamExt; use lighthouse_network::SyncInfo; use lighthouse_network::rpc::RPCError; @@ -206,11 +204,52 @@ impl BlockProcessType { } } +/// The classified outcome of submitting a block / blob / column for processing. The producer +/// (`network_beacon_processor`) translates the raw beacon-chain `Result<_, BlockError>` into this +/// shape so the lookup state machine only has to resolve "which peer to penalize" symbolically. #[derive(Debug)] pub enum BlockProcessingResult { - Ok(AvailabilityProcessingStatus), - Err(BlockError), - Ignored, + /// Data was imported (or already present, or otherwise satisfies the lookup). `info` is a + /// short stable identifier suitable for debug logs / metrics. + Imported(&'static str), + /// Processing failed. `penalty` is `Some` when an attributable peer should be downscored. + Error { + penalty: Option<(PeerAction, WhichPeerToPenalize)>, + reason: &'static str, + }, +} + +/// Symbolic identifier for the peer(s) the lookup should resolve and downscore. The consumer +/// passes in the relevant `PeerGroup` (a singleton for block processing, the in-flight data peer +/// group for data processing) and `apply` selects from it. +#[derive(Debug, Clone, Copy)] +pub enum WhichPeerToPenalize { + /// All peers in the passed `PeerGroup` (typically a singleton constructed from the block peer + /// or the blob peer — i.e. the peer responsible for the component as a whole). + BlockPeer, + /// The custody peer(s) that served a specific column index in the passed `PeerGroup`. + CustodyPeerForColumn(u64), +} + +impl WhichPeerToPenalize { + /// Resolve this symbolic identifier against `peer_group` and downscore the matching peer(s). + pub fn apply( + self, + action: PeerAction, + peer_group: &crate::sync::network_context::PeerGroup, + reason: &'static str, + cx: &mut crate::sync::network_context::SyncNetworkContext, + ) { + let peers: Vec = match self { + WhichPeerToPenalize::BlockPeer => peer_group.all().copied().collect(), + WhichPeerToPenalize::CustodyPeerForColumn(idx) => { + peer_group.of_index(idx as usize).copied().collect() + } + }; + for peer in peers { + cx.report_peer(peer, action, reason); + } + } } /// The result of processing multiple blocks (a chain segment). @@ -1470,18 +1509,3 @@ impl SyncManager { &self.network_globals().spec } } - -impl From> for BlockProcessingResult { - fn from(result: Result) -> Self { - match result { - Ok(status) => BlockProcessingResult::Ok(status), - Err(e) => BlockProcessingResult::Err(e), - } - } -} - -impl From for BlockProcessingResult { - fn from(e: BlockError) -> Self { - BlockProcessingResult::Err(e) - } -} diff --git a/beacon_node/network/src/sync/tests/lookups.rs b/beacon_node/network/src/sync/tests/lookups.rs index c9d351818d..c16d8969e9 100644 --- a/beacon_node/network/src/sync/tests/lookups.rs +++ b/beacon_node/network/src/sync/tests/lookups.rs @@ -12,7 +12,7 @@ use beacon_chain::blob_verification::KzgVerifiedBlob; use beacon_chain::block_verification_types::LookupBlock; use beacon_chain::custody_context::NodeCustodyType; use beacon_chain::{ - AvailabilityProcessingStatus, BlockError, EngineState, NotifyExecutionLayer, + AvailabilityProcessingStatus, EngineState, NotifyExecutionLayer, block_verification_types::{AsBlock, AvailableBlockData}, data_availability_checker::Availability, test_utils::{ @@ -2160,7 +2160,13 @@ async fn too_many_processing_failures(depth: usize) { r.build_chain_and_trigger_last_block(depth).await; // Simulate that a peer always returns empty r.simulate( - SimulateConfig::new().with_process_result(|| BlockError::BlockSlotLimitReached.into()), + SimulateConfig::new().with_process_result(|| BlockProcessingResult::Error { + penalty: Some(( + lighthouse_network::PeerAction::MidToleranceError, + crate::sync::manager::WhichPeerToPenalize::BlockPeer, + )), + reason: "lookup_block_processing_failure", + }), ) .await; // We register multiple penalties, the lookup fails and sync does not progress @@ -2208,13 +2214,20 @@ async fn unknown_parent_does_not_add_peers_to_itself() { } #[tokio::test] -/// Assert that if the beacon processor returns Ignored, the lookup is dropped +/// Assert that if the beacon processor returns a processor-overloaded error, the lookup retries +/// without penalizing peers and eventually fails after MAX_ATTEMPTS. async fn test_single_block_lookup_ignored_response() { let mut r = TestRig::default(); r.build_chain_and_trigger_last_block(1).await; - // Send an Ignored response, the request should be dropped - r.simulate(SimulateConfig::new().with_process_result(|| BlockProcessingResult::Ignored)) - .await; + // Send a "processor overloaded" response repeatedly. Under the new model this is just an + // Error with no peer penalty; the lookup retries until MAX_ATTEMPTS, then drops. + r.simulate( + SimulateConfig::new().with_process_result(|| BlockProcessingResult::Error { + penalty: None, + reason: "processor_overloaded", + }), + ) + .await; // The block was not actually imported r.assert_head_slot(0); assert_eq!(r.created_lookups(), 1, "no created lookups"); @@ -2229,8 +2242,7 @@ async fn test_single_block_lookup_duplicate_response() { r.build_chain_and_trigger_last_block(1).await; // Send a DuplicateFullyImported response, the lookup should complete successfully r.simulate( - SimulateConfig::new() - .with_process_result(|| BlockError::DuplicateFullyImported(Hash256::ZERO).into()), + SimulateConfig::new().with_process_result(|| BlockProcessingResult::Imported("duplicate")), ) .await; // The block was not actually imported