Move BlockProcessingResult match out of block lookups (#9327)

- https://github.com/sigp/lighthouse/pull/9155 remove the trait abstraction for processing block / blobs / columns / payloads

As a result we would have to duplicate x3 the big match on `BlockProcessingResult` we currently have in block lookups mod.rs

This PR moves the match of `BlockProcessingResult` to `sync_methods` to reduce the diff of https://github.com/sigp/lighthouse/pull/9155. There are some subtle changes that deserve dedicated attention, and may be drowned in the bigger diff of https://github.com/sigp/lighthouse/pull/9155 otherwise:

| Unstable | This PR / #9115 |
| - | - |
| Some error conditions immediately `Drop` the lookup (no retries). For example for "internal" errors like the BeaconChainError | Retries ALL errors 4 times. I believe assuming some errors are internal is risky as dropping a lookup drops all its children potentially forcing the node to resync a lot of blocks because of an internal timeout


  


Co-Authored-By: dapplion <35266934+dapplion@users.noreply.github.com>
This commit is contained in:
Lion - dapplion
2026-06-02 04:50:56 +02:00
committed by GitHub
parent b781227f1d
commit bbe7ead813
10 changed files with 214 additions and 167 deletions

View File

@@ -23,21 +23,18 @@
use self::parent_chain::{NodeChain, compute_parent_chains};
pub use self::single_block_lookup::DownloadResult;
use self::single_block_lookup::{LookupRequestError, LookupResult, SingleBlockLookup};
use super::manager::{BlockProcessType, BlockProcessingResult, SLOT_IMPORT_TOLERANCE};
use super::manager::{BlockProcessType, SLOT_IMPORT_TOLERANCE};
use super::network_context::{PeerGroup, RpcResponseError, SyncNetworkContext};
use crate::metrics;
use crate::network_beacon_processor::BlockProcessingResult;
use crate::sync::SyncMessage;
use crate::sync::block_lookups::common::ResponseType;
use crate::sync::block_lookups::parent_chain::find_oldest_fork_ancestor;
use beacon_chain::BeaconChainTypes;
use beacon_chain::block_verification_types::AsBlock;
use beacon_chain::data_availability_checker::{
AvailabilityCheckError, AvailabilityCheckErrorCategory,
};
use beacon_chain::{AvailabilityProcessingStatus, BeaconChainTypes, BlockError};
pub use common::RequestState;
use fnv::FnvHashMap;
use lighthouse_network::PeerId;
use lighthouse_network::service::api_types::SingleLookupReqId;
use lighthouse_network::{PeerAction, PeerId};
use lru_cache::LRUTimeCache;
pub use single_block_lookup::{BlockRequestState, CustodyRequestState};
use std::collections::hash_map::Entry;
@@ -106,7 +103,6 @@ pub type SingleLookupId = u32;
enum Action {
Retry,
ParentUnknown { parent_root: Hash256 },
Drop(/* reason: */ String),
Continue,
}
@@ -584,125 +580,51 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
);
let action = match result {
BlockProcessingResult::Ok(AvailabilityProcessingStatus::Imported(_))
| BlockProcessingResult::Err(BlockError::DuplicateFullyImported(..))
| BlockProcessingResult::Err(BlockError::GenesisBlock) => {
// Successfully imported
request_state.on_processing_success()?;
Action::Continue
}
BlockProcessingResult::Ok(AvailabilityProcessingStatus::MissingComponents {
..
}) => {
// `on_processing_success` is called here to ensure the request state is updated prior to checking
// if both components have been processed.
BlockProcessingResult::Imported(fully_imported, _info) => {
// `on_processing_success` is called here to ensure the request state is updated
// prior to checking if all components have been processed (relevant for
// MissingComponents).
request_state.on_processing_success()?;
if lookup.all_components_processed() {
if fully_imported {
Action::Continue
} else if lookup.all_components_processed() {
// We don't request for other block components until being sure that the block has
// data. If we request blobs / columns to a peer we are sure those must exist.
// Therefore if all components are processed and we still receive `MissingComponents`
// it indicates an internal bug.
return Err(LookupRequestError::MissingComponentsAfterAllProcessed);
return Err(LookupRequestError::Failed(
"missing components after all processed".to_owned(),
));
} else {
// Continue request, potentially request blobs
Action::Retry
}
}
BlockProcessingResult::Err(BlockError::DuplicateImportStatusUnknown(..)) => {
// This is unreachable because RPC blocks do not undergo gossip verification, and
// this error can *only* come from gossip verification.
error!(?block_root, "Single block lookup hit unreachable condition");
Action::Drop("DuplicateImportStatusUnknown".to_owned())
BlockProcessingResult::ParentUnknown { parent_root } => {
// `BlockError::ParentUnknown` is only returned when processing blocks. Reverts
// the status of this request to `AwaitingProcessing` holding the downloaded
// data. A future call to `continue_requests` will re-submit it once there are
// no pending parent requests.
request_state.revert_to_awaiting_processing()?;
Action::ParentUnknown { parent_root }
}
BlockProcessingResult::Ignored => {
// Beacon processor signalled to ignore the block processing result.
// This implies that the cpu is overloaded. Drop the request.
warn!(
BlockProcessingResult::Error { penalty, reason } => {
// Retry on every processing error: `on_processing_failure` increments the
// per-component failure counter, so `SINGLE_BLOCK_LOOKUP_MAX_ATTEMPTS` bounds the
// retry loop and eventually drops the lookup if the failure persists. Whether the
// peer should be downscored is the producer's call (encoded in `penalty`).
debug!(
?block_root,
component = ?R::response_type(),
"Lookup component processing ignored, cpu might be overloaded"
reason,
?penalty,
"Lookup component processing failed; retrying"
);
Action::Drop("Block processing ignored".to_owned())
}
BlockProcessingResult::Err(e) => {
match e {
BlockError::BeaconChainError(e) => {
// Internal error
error!(%block_root, error = ?e, "Beacon chain error processing lookup component");
Action::Drop(format!("{e:?}"))
}
BlockError::ParentUnknown { parent_root, .. } => {
// Reverts the status of this request to `AwaitingProcessing` holding the
// downloaded data. A future call to `continue_requests` will re-submit it
// once there are no pending parent requests.
// Note: `BlockError::ParentUnknown` is only returned when processing
// blocks, not blobs.
request_state.revert_to_awaiting_processing()?;
Action::ParentUnknown { parent_root }
}
ref e @ BlockError::ExecutionPayloadError(ref epe) if !epe.penalize_peer() => {
// These errors indicate that the execution layer is offline
// and failed to validate the execution payload. Do not downscore peer.
debug!(
?block_root,
error = ?e,
"Single block lookup failed. Execution layer is offline / unsynced / misconfigured"
);
Action::Drop(format!("{e:?}"))
}
BlockError::AvailabilityCheck(e)
if e.category() == AvailabilityCheckErrorCategory::Internal =>
{
// There errors indicate internal problems and should not downscore the peer
warn!(?block_root, error = ?e, "Internal availability check failure");
// Here we choose *not* to call `on_processing_failure` because this could result in a bad
// lookup state transition. This error invalidates both blob and block requests, and we don't know the
// state of both requests. Blobs may have already successfullly processed for example.
// We opt to drop the lookup instead.
Action::Drop(format!("{e:?}"))
}
other => {
debug!(
?block_root,
component = ?R::response_type(),
error = ?other,
"Invalid lookup component"
);
let peer_group = request_state.on_processing_failure()?;
let peers_to_penalize: Vec<_> = match other {
// Note: currenlty only InvalidColumn errors have index granularity,
// but future errors may follow the same pattern. Generalize this
// pattern with https://github.com/sigp/lighthouse/pull/6321
BlockError::AvailabilityCheck(
AvailabilityCheckError::InvalidColumn((index_opt, _)),
) => {
match index_opt {
Some(index) => peer_group.of_index(index as usize).collect(),
// If no index supplied this is an un-attributable fault. In practice
// this should never happen.
None => vec![],
}
}
_ => peer_group.all().collect(),
};
for peer in peers_to_penalize {
cx.report_peer(
*peer,
PeerAction::MidToleranceError,
match R::response_type() {
ResponseType::Block => "lookup_block_processing_failure",
ResponseType::CustodyColumn => {
"lookup_custody_column_processing_failure"
}
},
);
}
Action::Retry
}
let peer_group = request_state.on_processing_failure()?;
if let Some((action_kind, whom, msg)) = penalty {
whom.apply(action_kind, &peer_group, msg, cx);
}
Action::Retry
}
};
@@ -737,10 +659,6 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
)))
}
}
Action::Drop(reason) => {
// Drop with noop
Err(LookupRequestError::Failed(reason))
}
Action::Continue => {
// Drop this completed lookup only
Ok(LookupResult::Completed)