Move processing-result classification to the producer side

Reshape BlockProcessingResult from the AC-verdict-passthrough
Ok/Err/Ignored enum to Imported(info) | Error { penalty, reason }.
The producer (network_beacon_processor) translates beacon-chain
Result<AvailabilityProcessingStatus, BlockError> into this shape via a
new classify_processing_result(), so the consumer only has to resolve
the symbolic WhichPeerToPenalize against an in-scope PeerGroup.

- on_block_processing_result and on_data_processing_result collapse
  to a single state-match each, then dispatch to
  WhichPeerToPenalize::apply(action, &peer_group, reason, cx).
- mod.rs sheds the per-BlockError policy block (-129 lines).
- Drops the now-unused data_peer_group, block_peer, BlockRequest::peer,
  peek_downloaded_peer_group accessors; each handler now reads the peer
  (or peer group) directly from the Processing state it matches on.
- Ignored becomes Error { penalty: None, reason: "processor_overloaded" }
  with a producer-side warn!; the lookup retries up to MAX_ATTEMPTS
  instead of dropping immediately (test updated to match).
- DuplicateFullyImported and GenesisBlock map to Imported; the test
  helper constructs the new variant directly.
This commit is contained in:
dapplion
2026-05-19 14:14:42 -06:00
parent 5c58f7e4b7
commit a98e6531bf
5 changed files with 206 additions and 226 deletions

View File

@@ -29,13 +29,10 @@ use crate::metrics;
use crate::sync::SyncMessage;
use crate::sync::block_lookups::parent_chain::find_oldest_fork_ancestor;
use crate::sync::block_lookups::single_block_lookup::PeerType;
use beacon_chain::data_availability_checker::{
AvailabilityCheckError, AvailabilityCheckErrorCategory,
};
use beacon_chain::{AvailabilityProcessingStatus, BeaconChainTypes, BlockError};
use beacon_chain::BeaconChainTypes;
use fnv::FnvHashMap;
use lighthouse_network::PeerId;
use lighthouse_network::service::api_types::SingleLookupReqId;
use lighthouse_network::{PeerAction, PeerId};
use lru_cache::LRUTimeCache;
use std::collections::hash_map::Entry;
use std::sync::Arc;
@@ -543,69 +540,16 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
debug!(id = lookup_id, "Unknown single block lookup");
return Err(LookupRequestError::UnknownLookup);
};
let block_root = lookup.block_root();
debug!(
?block_root,
block_root = ?lookup.block_root(),
id = lookup_id,
?result,
"Received block processing result"
);
match result {
// Block processed successfully (imported or missing components — both are ok since
// we send the block alone first, data follows independently)
BlockProcessingResult::Ok(AvailabilityProcessingStatus::Imported(_))
| BlockProcessingResult::Ok(AvailabilityProcessingStatus::MissingComponents {
..
})
| BlockProcessingResult::Err(BlockError::DuplicateFullyImported(..))
| BlockProcessingResult::Err(BlockError::GenesisBlock) => {
lookup.on_block_processing_result(true, cx)
}
BlockProcessingResult::Ignored => {
warn!("Block processing ignored, cpu might be overloaded");
Err(LookupRequestError::Failed(
"Block processing ignored".to_owned(),
))
}
BlockProcessingResult::Err(e) => {
debug!(?block_root, error = ?e, "Block processing error, retrying");
match &e {
BlockError::ParentUnknown { .. } => {
return Err(LookupRequestError::InternalError(
"ParentUnknown on processing".to_string(),
));
}
// No penalization for internal / non-attributable errors
BlockError::BeaconChainError(_)
| BlockError::DuplicateImportStatusUnknown(..) => {}
BlockError::ExecutionPayloadError(epe) if !epe.penalize_peer() => {}
BlockError::AvailabilityCheck(e)
if e.category() == AvailabilityCheckErrorCategory::Internal => {}
// All other attributable errors: penalize the block peer
_ => {
if let Some(block_peer) = lookup.block_peer() {
cx.report_peer(
block_peer,
PeerAction::MidToleranceError,
"lookup_block_processing_failure",
);
}
}
}
// Block processing failed — reset everything and retry from scratch
lookup.on_block_processing_result(false, cx)
}
}
lookup.on_block_processing_result(result, cx)
}
/// Handle data processing result (blobs or custody columns).
/// On success: marks data processing done, may complete the lookup.
/// On error: penalizes data peers, retries data download only.
fn on_data_processing_result(
&mut self,
lookup_id: SingleLookupId,
@@ -616,74 +560,13 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
debug!(id = lookup_id, "Unknown single block lookup");
return Err(LookupRequestError::UnknownLookup);
};
let block_root = lookup.block_root();
debug!(
?block_root,
block_root = ?lookup.block_root(),
id = lookup_id,
?result,
"Received data processing result"
);
match result {
BlockProcessingResult::Ok(AvailabilityProcessingStatus::Imported(_))
| BlockProcessingResult::Err(BlockError::DuplicateFullyImported(..))
| BlockProcessingResult::Err(BlockError::GenesisBlock) => {
lookup.on_data_processing_result(true, cx)
}
BlockProcessingResult::Ok(AvailabilityProcessingStatus::MissingComponents {
..
}) => {
// Data sent for processing but still missing components — this can happen if
// the block hasn't been fully validated yet. Treat as success for the data
// stream; completion check will handle the rest.
lookup.on_data_processing_result(true, cx)
}
BlockProcessingResult::Ignored => {
warn!("Data processing ignored, cpu might be overloaded");
Err(LookupRequestError::Failed(
"Data processing ignored".to_owned(),
))
}
BlockProcessingResult::Err(e) => {
debug!(?block_root, error = ?e, "Data processing error, retrying");
// Use the data kind to pick a penalty string the peer-scoring tests
// distinguish on (blobs vs custody columns).
let penalty_msg = "lookup_data_processing_failure";
match &e {
// No penalization for internal / non-attributable errors
BlockError::BeaconChainError(_)
| BlockError::DuplicateImportStatusUnknown(..) => {}
BlockError::AvailabilityCheck(e)
if e.category() == AvailabilityCheckErrorCategory::Internal => {}
// InvalidColumn: penalize only the peer(s) that served the bad column
BlockError::AvailabilityCheck(AvailabilityCheckError::InvalidColumn((
index_opt,
_,
))) => {
if let Some(custody_pg) = lookup.data_peer_group()
&& let Some(index) = index_opt
{
for peer in custody_pg.of_index(*index as usize) {
cx.report_peer(*peer, PeerAction::MidToleranceError, penalty_msg);
}
}
}
// All other attributable errors: penalize the block peer (who also serves blobs)
_ => {
if let Some(block_peer) = lookup.block_peer() {
cx.report_peer(block_peer, PeerAction::MidToleranceError, penalty_msg);
}
}
}
// Data processing failed — retry data download only
lookup.on_data_processing_result(false, cx)
}
}
lookup.on_data_processing_result(result, cx)
}
pub fn on_external_processing_result(

View File

@@ -2,7 +2,7 @@ use super::{BlockComponent, PeerId, SINGLE_BLOCK_LOOKUP_MAX_ATTEMPTS};
use crate::sync::block_lookups::{
BlobDownloadResponse, BlockDownloadResponse, CustodyDownloadResponse, PayloadDownloadResponse,
};
use crate::sync::manager::BlockProcessType;
use crate::sync::manager::{BlockProcessType, BlockProcessingResult};
use crate::sync::network_context::{
LookupRequestResult, PeerGroup, ReqId, RpcRequestSendError, RpcResponseError,
SendErrorProcessor, SyncNetworkContext,
@@ -192,19 +192,6 @@ impl<E: EthSpec> BlockRequest<E> {
self.peek_block().map(|b| b.slot())
}
/// Returns the block peer for error attribution. Available in Downloaded/Processing states.
fn peer(&self) -> Option<PeerId> {
match self {
BlockRequest::Downloaded { peer, .. } | BlockRequest::Processing { peer, .. } => {
Some(*peer)
}
BlockRequest::Downloading { state, .. } => state
.peek_downloaded_peer_group()
.and_then(|pg| pg.all().next().copied()),
BlockRequest::Complete { peer, .. } => *peer,
}
}
fn is_awaiting_event(&self) -> bool {
match self {
BlockRequest::Downloading { state, .. } => state.is_awaiting_event(),
@@ -603,21 +590,6 @@ impl<T: BeaconChainTypes> SingleBlockLookup<T> {
}
}
/// Returns the block peer if block has been downloaded. Used for peer penalization.
pub fn block_peer(&self) -> Option<PeerId> {
self.block_request.peer()
}
/// Returns the peer group that served the downloaded data (blobs or custody columns) if
/// available, used for peer penalization on data-processing failures.
pub fn data_peer_group(&self) -> Option<&PeerGroup> {
match &self.data_request.as_ref()?.state {
DataRequestState::Downloaded { peer_group, .. }
| DataRequestState::Processing { peer_group } => Some(peer_group),
DataRequestState::Downloading(_) | DataRequestState::Complete => None,
}
}
// -- Main state machine driver --
/// Makes progress on all requests of this lookup. Any error is not recoverable and must result
@@ -929,7 +901,7 @@ impl<T: BeaconChainTypes> SingleBlockLookup<T> {
/// Handle block processing result. Advances the lookup state machine.
pub fn on_block_processing_result(
&mut self,
result_is_ok: bool,
result: BlockProcessingResult,
cx: &mut SyncNetworkContext<T>,
) -> Result<LookupResult, LookupRequestError> {
let BlockRequest::Processing { block, peer } = &self.block_request else {
@@ -937,51 +909,62 @@ impl<T: BeaconChainTypes> SingleBlockLookup<T> {
"block processing result but not in Processing state".to_owned(),
));
};
if result_is_ok {
let block = block.clone();
let peer = Some(*peer);
self.block_request = BlockRequest::Complete { block, peer };
self.continue_requests(cx)
} else {
// Block processing failed — reset everything and retry from scratch
self.reset_requests();
self.continue_requests(cx)
let block_peer = *peer;
match result {
BlockProcessingResult::Imported(_) => {
let block = block.clone();
self.block_request = BlockRequest::Complete {
block,
peer: Some(block_peer),
};
self.continue_requests(cx)
}
BlockProcessingResult::Error { penalty, reason } => {
if let Some((action, whom)) = penalty {
whom.apply(action, &PeerGroup::from_single(block_peer), reason, cx);
}
// Block processing failed — reset everything and retry from scratch.
self.reset_requests();
self.continue_requests(cx)
}
}
}
/// Handle data processing result (blobs or custody columns imported).
pub fn on_data_processing_result(
&mut self,
result_is_ok: bool,
result: BlockProcessingResult,
cx: &mut SyncNetworkContext<T>,
) -> Result<LookupResult, LookupRequestError> {
let Some(request) = &mut self.data_request else {
let Some(DataRequest {
state: DataRequestState::Processing { peer_group },
..
}) = &self.data_request
else {
return Err(LookupRequestError::BadState(
"data processing result but not in Processing state".to_owned(),
));
};
let peer_group = peer_group.clone();
if !matches!(
request,
DataRequest {
state: DataRequestState::Processing { .. },
..
match result {
BlockProcessingResult::Imported(_) => {
if let Some(req) = &mut self.data_request {
req.state = DataRequestState::Complete;
}
self.continue_requests(cx)
}
BlockProcessingResult::Error { penalty, reason } => {
if let Some((action, whom)) = penalty {
whom.apply(action, &peer_group, reason, cx);
}
// Data processing failed — bump the shared processing-failure counter and rebuild
// the data request so retries stay bounded against MAX_ATTEMPTS.
self.failed_processing = self.failed_processing.saturating_add(1);
self.data_request = None;
self.continue_requests(cx)
}
) {
return Err(LookupRequestError::BadState(
"data processing result but not in Processing state".to_owned(),
));
}
if result_is_ok {
request.state = DataRequestState::Complete;
self.continue_requests(cx)
} else {
// Data processing failed — bump the shared processing-failure counter so the
// retry is bounded against `SINGLE_BLOCK_LOOKUP_MAX_ATTEMPTS`, then reset.
self.failed_processing = self.failed_processing.saturating_add(1);
// TODO(gloas-sync): Should this persist some state?
self.data_request = None;
self.continue_requests(cx)
}
}
@@ -1222,13 +1205,6 @@ impl<T: Clone> SingleLookupRequestState<T> {
}
}
fn peek_downloaded_peer_group(&self) -> Option<&PeerGroup> {
match &self.state {
DownloadState::Downloaded(data) => Some(&data.peer_group),
_ => None,
}
}
/// Take the download result out, transitioning back to AwaitingDownload.
/// Returns None if not in Downloaded state.
fn take_download_result(&mut self) -> Option<DownloadResult<T>> {

View File

@@ -47,9 +47,7 @@ use crate::sync::block_lookups::{AwaitingParent, BlockComponent, DownloadResult}
use crate::sync::custody_backfill_sync::CustodyBackFillSync;
use crate::sync::network_context::{PeerGroup, RpcResponseResult};
use beacon_chain::block_verification_types::AsBlock;
use beacon_chain::{
AvailabilityProcessingStatus, BeaconChain, BeaconChainTypes, BlockError, EngineState,
};
use beacon_chain::{BeaconChain, BeaconChainTypes, EngineState};
use futures::StreamExt;
use lighthouse_network::SyncInfo;
use lighthouse_network::rpc::RPCError;
@@ -206,11 +204,52 @@ impl BlockProcessType {
}
}
/// The classified outcome of submitting a block / blob / column for processing. The producer
/// (`network_beacon_processor`) translates the raw beacon-chain `Result<_, BlockError>` into this
/// shape so the lookup state machine only has to resolve "which peer to penalize" symbolically.
#[derive(Debug)]
pub enum BlockProcessingResult {
Ok(AvailabilityProcessingStatus),
Err(BlockError),
Ignored,
/// Data was imported (or already present, or otherwise satisfies the lookup). The `&'static str`
/// payload is a short stable identifier suitable for debug logs / metrics.
Imported(&'static str),
/// Processing failed. `penalty` is `Some` when an attributable peer should be downscored.
Error {
penalty: Option<(PeerAction, WhichPeerToPenalize)>,
reason: &'static str,
},
}
/// Symbolic identifier for the peer(s) the lookup should resolve and downscore. The consumer
/// passes in the relevant `PeerGroup` (a singleton for block processing, the in-flight data peer
/// group for data processing) and [`WhichPeerToPenalize::apply`] selects from it.
///
/// The type is a small `Copy` value. `PartialEq` / `Eq` are derived so a classified penalty
/// target can be compared directly, e.g. when asserting on a classification result.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum WhichPeerToPenalize {
    /// All peers in the passed `PeerGroup` (typically a singleton constructed from the block peer
    /// or the blob peer — i.e. the peer responsible for the component as a whole).
    BlockPeer,
    /// The custody peer(s) that served a specific column index in the passed `PeerGroup`. The
    /// payload is that column index.
    CustodyPeerForColumn(u64),
}
impl WhichPeerToPenalize {
    /// Resolve this symbolic identifier against `peer_group` and downscore the matching peer(s).
    ///
    /// * `action` - penalty forwarded to `cx.report_peer` for every selected peer.
    /// * `peer_group` - the group to select from: every peer for [`Self::BlockPeer`], only the
    ///   peers returned by `of_index` for [`Self::CustodyPeerForColumn`].
    /// * `reason` - static label forwarded to `cx.report_peer` alongside `action`.
    /// * `cx` - network context through which the reports are issued.
    ///
    /// If the selection yields no peers, nothing is reported.
    pub fn apply<T: BeaconChainTypes>(
        self,
        action: PeerAction,
        peer_group: &crate::sync::network_context::PeerGroup,
        reason: &'static str,
        cx: &mut crate::sync::network_context::SyncNetworkContext<T>,
    ) {
        // `peer_group` is borrowed immutably and `cx` mutably, so the peers can be reported while
        // iterating; no intermediate `Vec` is needed.
        match self {
            WhichPeerToPenalize::BlockPeer => {
                for peer in peer_group.all() {
                    cx.report_peer(*peer, action, reason);
                }
            }
            WhichPeerToPenalize::CustodyPeerForColumn(column_index) => {
                // Checked conversion: an `as` cast would silently truncate on targets where
                // `usize` is narrower than `u64` and could select a different column's peers.
                // An index that does not fit is skipped instead.
                let Ok(column_index) = usize::try_from(column_index) else {
                    return;
                };
                for peer in peer_group.of_index(column_index) {
                    cx.report_peer(*peer, action, reason);
                }
            }
        }
    }
}
/// The result of processing multiple blocks (a chain segment).
@@ -1470,18 +1509,3 @@ impl<T: BeaconChainTypes> SyncManager<T> {
&self.network_globals().spec
}
}
impl From<Result<AvailabilityProcessingStatus, BlockError>> for BlockProcessingResult {
fn from(result: Result<AvailabilityProcessingStatus, BlockError>) -> Self {
match result {
Ok(status) => BlockProcessingResult::Ok(status),
Err(e) => BlockProcessingResult::Err(e),
}
}
}
impl From<BlockError> for BlockProcessingResult {
fn from(e: BlockError) -> Self {
BlockProcessingResult::Err(e)
}
}

View File

@@ -12,7 +12,7 @@ use beacon_chain::blob_verification::KzgVerifiedBlob;
use beacon_chain::block_verification_types::LookupBlock;
use beacon_chain::custody_context::NodeCustodyType;
use beacon_chain::{
AvailabilityProcessingStatus, BlockError, EngineState, NotifyExecutionLayer,
AvailabilityProcessingStatus, EngineState, NotifyExecutionLayer,
block_verification_types::{AsBlock, AvailableBlockData},
data_availability_checker::Availability,
test_utils::{
@@ -2160,7 +2160,13 @@ async fn too_many_processing_failures(depth: usize) {
r.build_chain_and_trigger_last_block(depth).await;
// Simulate that block processing always fails with a penalizable error
r.simulate(
SimulateConfig::new().with_process_result(|| BlockError::BlockSlotLimitReached.into()),
SimulateConfig::new().with_process_result(|| BlockProcessingResult::Error {
penalty: Some((
lighthouse_network::PeerAction::MidToleranceError,
crate::sync::manager::WhichPeerToPenalize::BlockPeer,
)),
reason: "lookup_block_processing_failure",
}),
)
.await;
// We register multiple penalties, the lookup fails and sync does not progress
@@ -2208,13 +2214,20 @@ async fn unknown_parent_does_not_add_peers_to_itself() {
}
#[tokio::test]
/// Assert that if the beacon processor returns Ignored, the lookup is dropped
/// Assert that if the beacon processor returns a processor-overloaded error, the lookup retries
/// without penalizing peers and eventually fails after MAX_ATTEMPTS.
async fn test_single_block_lookup_ignored_response() {
let mut r = TestRig::default();
r.build_chain_and_trigger_last_block(1).await;
// Send an Ignored response, the request should be dropped
r.simulate(SimulateConfig::new().with_process_result(|| BlockProcessingResult::Ignored))
.await;
// Send a "processor overloaded" response repeatedly. Under the new model this is just an
// Error with no peer penalty; the lookup retries until MAX_ATTEMPTS, then drops.
r.simulate(
SimulateConfig::new().with_process_result(|| BlockProcessingResult::Error {
penalty: None,
reason: "processor_overloaded",
}),
)
.await;
// The block was not actually imported
r.assert_head_slot(0);
assert_eq!(r.created_lookups(), 1, "no created lookups");
@@ -2229,8 +2242,7 @@ async fn test_single_block_lookup_duplicate_response() {
r.build_chain_and_trigger_last_block(1).await;
// Send a DuplicateFullyImported response, the lookup should complete successfully
r.simulate(
SimulateConfig::new()
.with_process_result(|| BlockError::DuplicateFullyImported(Hash256::ZERO).into()),
SimulateConfig::new().with_process_result(|| BlockProcessingResult::Imported("duplicate")),
)
.await;
// The block was not actually imported