Merge branch 'unstable' of https://github.com/sigp/lighthouse into gloas-remove-slashed-validators-from-propser-duties

This commit is contained in:
Eitan Seri-Levi
2026-06-02 10:34:12 +03:00
10 changed files with 214 additions and 167 deletions

View File

@@ -92,7 +92,7 @@ use std::fs;
use std::io::Write;
use std::sync::Arc;
use store::{Error as DBError, KeyValueStore};
use strum::AsRefStr;
use strum::{AsRefStr, IntoStaticStr};
use task_executor::JoinHandle;
use tracing::{Instrument, Span, debug, debug_span, error, info_span, instrument};
use types::{
@@ -114,7 +114,7 @@ const WRITE_BLOCK_PROCESSING_SSZ: bool = cfg!(feature = "write_ssz_files");
///
/// - The block is malformed/invalid (indicated by all results other than `BeaconChainError`).
/// - We encountered an error whilst trying to verify the block (a `BeaconChainError`).
#[derive(Debug, AsRefStr)]
#[derive(Debug, AsRefStr, IntoStaticStr)]
pub enum BlockError {
/// The parent block was unknown.
///
@@ -336,7 +336,7 @@ impl From<AvailabilityCheckError> for BlockError {
/// Returned when block validation failed due to some issue verifying
/// the execution payload.
#[derive(Debug)]
#[derive(Debug, IntoStaticStr)]
pub enum ExecutionPayloadError {
/// There's no eth1 connection (mandatory after merge)
///

View File

@@ -1,7 +1,8 @@
use kzg::{Error as KzgError, KzgCommitment};
use strum::IntoStaticStr;
use types::{BeaconStateError, ColumnIndex, Hash256};
#[derive(Debug)]
#[derive(Debug, IntoStaticStr)]
pub enum Error {
InvalidBlobs(KzgError),
MissingBid(Hash256),

View File

@@ -36,13 +36,13 @@ use {
slot_clock::ManualSlotClock, store::MemoryStore, tokio::sync::mpsc::UnboundedSender,
};
pub use sync_methods::ChainSegmentProcessId;
pub use sync_methods::{BlockProcessingResult, ChainSegmentProcessId};
pub type Error<T> = TrySendError<BeaconWorkEvent<T>>;
mod gossip_methods;
mod rpc_methods;
mod sync_methods;
pub(crate) mod sync_methods;
mod tests;
pub(crate) const FUTURE_SLOT_TOLERANCE: u64 = 1;

View File

@@ -3,12 +3,14 @@ use crate::network_beacon_processor::{FUTURE_SLOT_TOLERANCE, NetworkBeaconProces
use crate::sync::BatchProcessResult;
use crate::sync::manager::CustodyBatchProcessResult;
use crate::sync::{
ChainId,
ChainId, PeerGroup, SyncNetworkContext,
manager::{BlockProcessType, SyncMessage},
};
use beacon_chain::block_verification_types::LookupBlock;
use beacon_chain::block_verification_types::{AsBlock, RangeSyncBlock};
use beacon_chain::data_availability_checker::AvailabilityCheckError;
use beacon_chain::data_availability_checker::{
AvailabilityCheckError, AvailabilityCheckErrorCategory,
};
use beacon_chain::historical_data_columns::HistoricalDataColumnError;
use beacon_chain::{
AvailabilityProcessingStatus, BeaconChainTypes, BlockError, ChainSegmentResult,
@@ -20,6 +22,7 @@ use beacon_processor::{
};
use beacon_processor::{Work, WorkEvent};
use lighthouse_network::PeerAction;
use lighthouse_network::PeerId;
use lighthouse_network::service::api_types::CustodyBackfillBatchId;
use logging::crit;
use std::sync::Arc;
@@ -87,10 +90,17 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
);
// A closure which will ignore the block.
let ignore_fn = move || {
warn!(
?process_type,
"Block processing task dropped, cpu might be overloaded"
);
// Sync handles these results
self.send_sync_message(SyncMessage::BlockComponentProcessed {
process_type,
result: crate::sync::manager::BlockProcessingResult::Ignored,
result: BlockProcessingResult::Error {
penalty: None,
reason: "ignored_processor_overloaded".to_string(),
},
});
};
(process_fn, Box::new(ignore_fn))
@@ -949,3 +959,130 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
}
}
}
/// The classified outcome of submitting a block / blob / column for processing, ready for the
/// lookup state machine to act on without re-inspecting `BlockError`.
///
/// Values are produced from a `Result<AvailabilityProcessingStatus, BlockError>` through the
/// `From` conversion implemented for this type, or constructed directly (e.g. when a processing
/// task is dropped before running).
#[derive(Debug)]
pub enum BlockProcessingResult {
/// `fully_imported` is true if the lookup is complete; false if `MissingComponents` (the
/// lookup must keep fetching). `info` is a stable label for logs / metrics.
///
/// The `From` conversion emits `true` with the labels `"imported"`, `"duplicate"` and
/// `"genesis"`, and `false` with the label `"missing_components"`.
Imported(bool, &'static str),
/// The block's parent is not known; `parent_root` is copied from
/// `BlockError::ParentUnknown`.
ParentUnknown {
parent_root: Hash256,
},
/// Processing failed. `penalty` is `Some` when an attributable peer should be downscored;
/// the third tuple element is the `report_peer` telemetry msg. `reason` is for logs only.
///
/// The tuple is `(action, which peers to penalize, report message)`. `None` means no peer is
/// downscored for this failure.
Error {
penalty: Option<(PeerAction, WhichPeerToPenalize, &'static str)>,
reason: String,
},
}
impl From<Result<AvailabilityProcessingStatus, BlockError>> for BlockProcessingResult {
    /// Classifies a raw processing result into the outcome the lookup state machine acts on.
    ///
    /// - `Ok(Imported)`, `DuplicateFullyImported` and `GenesisBlock` become
    ///   `Imported(true, _)`; `Ok(MissingComponents)` becomes `Imported(false, _)`.
    /// - `BlockError::ParentUnknown` becomes `ParentUnknown` carrying the same `parent_root`.
    /// - Every other error becomes `Error`, with `reason` set to the error's `Debug` output and
    ///   `penalty` set according to the match below.
    fn from(result: Result<AvailabilityProcessingStatus, BlockError>) -> Self {
        /// Builds a `MidToleranceError` penalty aimed at `WhichPeerToPenalize::BlockPeer`,
        /// using `err`'s static string conversion as the `report_peer` message.
        fn block_peer_penalty<E: Into<&'static str>>(
            err: E,
        ) -> Option<(PeerAction, WhichPeerToPenalize, &'static str)> {
            Some((
                PeerAction::MidToleranceError,
                WhichPeerToPenalize::BlockPeer,
                err.into(),
            ))
        }

        match result {
            Ok(AvailabilityProcessingStatus::Imported(_)) => Self::Imported(true, "imported"),
            Ok(AvailabilityProcessingStatus::MissingComponents(_, _)) => {
                Self::Imported(false, "missing_components")
            }
            Err(e) => {
                let penalty = match &e {
                    BlockError::DuplicateFullyImported(_) => {
                        return Self::Imported(true, "duplicate");
                    }
                    BlockError::GenesisBlock => return Self::Imported(true, "genesis"),
                    BlockError::ParentUnknown { parent_root, .. } => {
                        return Self::ParentUnknown {
                            parent_root: *parent_root,
                        };
                    }
                    // Failures that are not attributed to a peer: no penalty.
                    BlockError::BeaconChainError(_) | BlockError::InternalError(_) => None,
                    BlockError::DuplicateImportStatusUnknown(_) => None,
                    BlockError::AvailabilityCheck(inner) => match inner {
                        // The faulty column is known: penalize only the peer(s) that served it.
                        AvailabilityCheckError::InvalidColumn((Some(idx), _)) => Some((
                            PeerAction::MidToleranceError,
                            WhichPeerToPenalize::CustodyPeerForColumn(*idx),
                            (&e).into(),
                        )),
                        // No column index supplied: the fault cannot be attributed to a
                        // specific custody peer, so nobody is downscored rather than the whole
                        // peer group.
                        // NOTE(review): this assumes `category()` would classify
                        // `InvalidColumn` as `Malicious`; if it is `Internal` this arm is
                        // equivalent to falling through — confirm.
                        AvailabilityCheckError::InvalidColumn((None, _)) => None,
                        other => match other.category() {
                            AvailabilityCheckErrorCategory::Internal => None,
                            AvailabilityCheckErrorCategory::Malicious => block_peer_penalty(other),
                        },
                    },
                    BlockError::ExecutionPayloadError(epe) => {
                        // `penalize_peer()` decides whether this payload error is the peer's
                        // fault.
                        if epe.penalize_peer() {
                            block_peer_penalty(epe)
                        } else {
                            None
                        }
                    }
                    // Remaining invalid blocks: penalize the block peer. Listed explicitly so a
                    // new `BlockError` variant forces a compile error here.
                    BlockError::FutureSlot { .. }
                    | BlockError::StateRootMismatch { .. }
                    | BlockError::WouldRevertFinalizedSlot { .. }
                    | BlockError::NotFinalizedDescendant { .. }
                    | BlockError::BlockSlotLimitReached
                    | BlockError::IncorrectBlockProposer { .. }
                    | BlockError::UnknownValidator(_)
                    | BlockError::InvalidSignature(_)
                    | BlockError::BlockIsNotLaterThanParent { .. }
                    | BlockError::NonLinearParentRoots
                    | BlockError::NonLinearSlots
                    | BlockError::PerBlockProcessingError(_)
                    | BlockError::WeakSubjectivityConflict
                    | BlockError::InconsistentFork(_)
                    | BlockError::ParentExecutionPayloadInvalid { .. }
                    | BlockError::KnownInvalidExecutionPayload(_)
                    | BlockError::Slashable
                    | BlockError::EnvelopeBlockRootUnknown(_)
                    | BlockError::OptimisticSyncNotSupported { .. }
                    | BlockError::InvalidBlobCount { .. }
                    | BlockError::BidParentRootMismatch { .. } => block_peer_penalty(&e),
                };
                Self::Error {
                    penalty,
                    // `reason` is only used for logging, so the `Debug` rendering is enough.
                    reason: format!("{e:?}"),
                }
            }
        }
    }
}
/// Chooses which members of a `PeerGroup` get downscored when a penalty is applied.
#[derive(Clone, Copy, Debug)]
pub enum WhichPeerToPenalize {
    /// Every peer in the group (the block peer, or all data peers).
    BlockPeer,
    /// Only the peer(s) the group associates with the given column index.
    CustodyPeerForColumn(u64),
}
impl WhichPeerToPenalize {
    /// Reports `action` with message `msg` to `cx` for every peer this selector picks out of
    /// `peer_group`.
    ///
    /// `BlockPeer` selects all peers of the group; `CustodyPeerForColumn` selects the peers
    /// returned by `PeerGroup::of_index` for that column index.
    pub fn apply<T: BeaconChainTypes>(
        self,
        action: PeerAction,
        peer_group: &PeerGroup,
        msg: &'static str,
        cx: &mut SyncNetworkContext<T>,
    ) {
        // Resolve the selection to owned peer ids first; both arms then share one report loop.
        let targets: Vec<PeerId> = match self {
            Self::BlockPeer => peer_group.all().copied().collect(),
            Self::CustodyPeerForColumn(column) => {
                peer_group.of_index(column as usize).copied().collect()
            }
        };
        targets.into_iter().for_each(|target| {
            cx.report_peer(target, action, msg);
        });
    }
}

View File

@@ -23,21 +23,18 @@
use self::parent_chain::{NodeChain, compute_parent_chains};
pub use self::single_block_lookup::DownloadResult;
use self::single_block_lookup::{LookupRequestError, LookupResult, SingleBlockLookup};
use super::manager::{BlockProcessType, BlockProcessingResult, SLOT_IMPORT_TOLERANCE};
use super::manager::{BlockProcessType, SLOT_IMPORT_TOLERANCE};
use super::network_context::{PeerGroup, RpcResponseError, SyncNetworkContext};
use crate::metrics;
use crate::network_beacon_processor::BlockProcessingResult;
use crate::sync::SyncMessage;
use crate::sync::block_lookups::common::ResponseType;
use crate::sync::block_lookups::parent_chain::find_oldest_fork_ancestor;
use beacon_chain::BeaconChainTypes;
use beacon_chain::block_verification_types::AsBlock;
use beacon_chain::data_availability_checker::{
AvailabilityCheckError, AvailabilityCheckErrorCategory,
};
use beacon_chain::{AvailabilityProcessingStatus, BeaconChainTypes, BlockError};
pub use common::RequestState;
use fnv::FnvHashMap;
use lighthouse_network::PeerId;
use lighthouse_network::service::api_types::SingleLookupReqId;
use lighthouse_network::{PeerAction, PeerId};
use lru_cache::LRUTimeCache;
pub use single_block_lookup::{BlockRequestState, CustodyRequestState};
use std::collections::hash_map::Entry;
@@ -106,7 +103,6 @@ pub type SingleLookupId = u32;
enum Action {
Retry,
ParentUnknown { parent_root: Hash256 },
Drop(/* reason: */ String),
Continue,
}
@@ -584,125 +580,51 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
);
let action = match result {
BlockProcessingResult::Ok(AvailabilityProcessingStatus::Imported(_))
| BlockProcessingResult::Err(BlockError::DuplicateFullyImported(..))
| BlockProcessingResult::Err(BlockError::GenesisBlock) => {
// Successfully imported
request_state.on_processing_success()?;
Action::Continue
}
BlockProcessingResult::Ok(AvailabilityProcessingStatus::MissingComponents {
..
}) => {
// `on_processing_success` is called here to ensure the request state is updated prior to checking
// if both components have been processed.
BlockProcessingResult::Imported(fully_imported, _info) => {
// `on_processing_success` is called here to ensure the request state is updated
// prior to checking if all components have been processed (relevant for
// MissingComponents).
request_state.on_processing_success()?;
if lookup.all_components_processed() {
if fully_imported {
Action::Continue
} else if lookup.all_components_processed() {
// We don't request for other block components until being sure that the block has
// data. If we request blobs / columns to a peer we are sure those must exist.
// Therefore if all components are processed and we still receive `MissingComponents`
// it indicates an internal bug.
return Err(LookupRequestError::MissingComponentsAfterAllProcessed);
return Err(LookupRequestError::Failed(
"missing components after all processed".to_owned(),
));
} else {
// Continue request, potentially request blobs
Action::Retry
}
}
BlockProcessingResult::Err(BlockError::DuplicateImportStatusUnknown(..)) => {
// This is unreachable because RPC blocks do not undergo gossip verification, and
// this error can *only* come from gossip verification.
error!(?block_root, "Single block lookup hit unreachable condition");
Action::Drop("DuplicateImportStatusUnknown".to_owned())
BlockProcessingResult::ParentUnknown { parent_root } => {
// `BlockError::ParentUnknown` is only returned when processing blocks. Reverts
// the status of this request to `AwaitingProcessing` holding the downloaded
// data. A future call to `continue_requests` will re-submit it once there are
// no pending parent requests.
request_state.revert_to_awaiting_processing()?;
Action::ParentUnknown { parent_root }
}
BlockProcessingResult::Ignored => {
// Beacon processor signalled to ignore the block processing result.
// This implies that the cpu is overloaded. Drop the request.
warn!(
BlockProcessingResult::Error { penalty, reason } => {
// Retry on every processing error: `on_processing_failure` increments the
// per-component failure counter, so `SINGLE_BLOCK_LOOKUP_MAX_ATTEMPTS` bounds the
// retry loop and eventually drops the lookup if the failure persists. Whether the
// peer should be downscored is the producer's call (encoded in `penalty`).
debug!(
?block_root,
component = ?R::response_type(),
"Lookup component processing ignored, cpu might be overloaded"
reason,
?penalty,
"Lookup component processing failed; retrying"
);
Action::Drop("Block processing ignored".to_owned())
}
BlockProcessingResult::Err(e) => {
match e {
BlockError::BeaconChainError(e) => {
// Internal error
error!(%block_root, error = ?e, "Beacon chain error processing lookup component");
Action::Drop(format!("{e:?}"))
}
BlockError::ParentUnknown { parent_root, .. } => {
// Reverts the status of this request to `AwaitingProcessing` holding the
// downloaded data. A future call to `continue_requests` will re-submit it
// once there are no pending parent requests.
// Note: `BlockError::ParentUnknown` is only returned when processing
// blocks, not blobs.
request_state.revert_to_awaiting_processing()?;
Action::ParentUnknown { parent_root }
}
ref e @ BlockError::ExecutionPayloadError(ref epe) if !epe.penalize_peer() => {
// These errors indicate that the execution layer is offline
// and failed to validate the execution payload. Do not downscore peer.
debug!(
?block_root,
error = ?e,
"Single block lookup failed. Execution layer is offline / unsynced / misconfigured"
);
Action::Drop(format!("{e:?}"))
}
BlockError::AvailabilityCheck(e)
if e.category() == AvailabilityCheckErrorCategory::Internal =>
{
// There errors indicate internal problems and should not downscore the peer
warn!(?block_root, error = ?e, "Internal availability check failure");
// Here we choose *not* to call `on_processing_failure` because this could result in a bad
// lookup state transition. This error invalidates both blob and block requests, and we don't know the
// state of both requests. Blobs may have already successfullly processed for example.
// We opt to drop the lookup instead.
Action::Drop(format!("{e:?}"))
}
other => {
debug!(
?block_root,
component = ?R::response_type(),
error = ?other,
"Invalid lookup component"
);
let peer_group = request_state.on_processing_failure()?;
let peers_to_penalize: Vec<_> = match other {
// Note: currenlty only InvalidColumn errors have index granularity,
// but future errors may follow the same pattern. Generalize this
// pattern with https://github.com/sigp/lighthouse/pull/6321
BlockError::AvailabilityCheck(
AvailabilityCheckError::InvalidColumn((index_opt, _)),
) => {
match index_opt {
Some(index) => peer_group.of_index(index as usize).collect(),
// If no index supplied this is an un-attributable fault. In practice
// this should never happen.
None => vec![],
}
}
_ => peer_group.all().collect(),
};
for peer in peers_to_penalize {
cx.report_peer(
*peer,
PeerAction::MidToleranceError,
match R::response_type() {
ResponseType::Block => "lookup_block_processing_failure",
ResponseType::CustodyColumn => {
"lookup_custody_column_processing_failure"
}
},
);
}
Action::Retry
}
let peer_group = request_state.on_processing_failure()?;
if let Some((action_kind, whom, msg)) = penalty {
whom.apply(action_kind, &peer_group, msg, cx);
}
Action::Retry
}
};
@@ -737,10 +659,6 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
)))
}
}
Action::Drop(reason) => {
// Drop with noop
Err(LookupRequestError::Failed(reason))
}
Action::Continue => {
// Drop this completed lookup only
Ok(LookupResult::Completed)

View File

@@ -41,9 +41,6 @@ pub enum LookupRequestError {
BadState(String),
/// Lookup failed for some other reason and should be dropped
Failed(/* reason: */ String),
/// Received MissingComponents when all components have been processed. This should never
/// happen, and indicates some internal bug
MissingComponentsAfterAllProcessed,
/// Attempted to retrieve a not known lookup id
UnknownLookup,
/// Received a download result for a different request id than the in-flight request.

View File

@@ -62,7 +62,7 @@ enum RangeBlockDataRequest<E: EthSpec> {
}
#[derive(Debug)]
pub(crate) enum CouplingError {
pub enum CouplingError {
InternalError(String),
/// The peer we requested the columns from was faulty/malicious
DataColumnPeerFailure {

View File

@@ -40,7 +40,9 @@ use super::network_context::{
};
use super::peer_sync_info::{PeerSyncType, remote_sync_type};
use super::range_sync::{EPOCHS_PER_BATCH, RangeSync, RangeSyncType};
use crate::network_beacon_processor::{ChainSegmentProcessId, NetworkBeaconProcessor};
use crate::network_beacon_processor::{
BlockProcessingResult, ChainSegmentProcessId, NetworkBeaconProcessor,
};
use crate::service::NetworkMessage;
use crate::status::ToStatusMessage;
use crate::sync::block_lookups::{
@@ -49,9 +51,7 @@ use crate::sync::block_lookups::{
use crate::sync::custody_backfill_sync::CustodyBackFillSync;
use crate::sync::network_context::{PeerGroup, RpcResponseResult};
use beacon_chain::block_verification_types::AsBlock;
use beacon_chain::{
AvailabilityProcessingStatus, BeaconChain, BeaconChainTypes, BlockError, EngineState,
};
use beacon_chain::{BeaconChain, BeaconChainTypes, EngineState};
use futures::StreamExt;
use lighthouse_network::SyncInfo;
use lighthouse_network::rpc::RPCError;
@@ -211,13 +211,6 @@ impl BlockProcessType {
}
}
#[derive(Debug)]
pub enum BlockProcessingResult {
Ok(AvailabilityProcessingStatus),
Err(BlockError),
Ignored,
}
/// The result of processing multiple blocks (a chain segment).
#[derive(Debug)]
pub enum BatchProcessResult {
@@ -1467,18 +1460,3 @@ impl<T: BeaconChainTypes> SyncManager<T> {
}
}
}
impl From<Result<AvailabilityProcessingStatus, BlockError>> for BlockProcessingResult {
fn from(result: Result<AvailabilityProcessingStatus, BlockError>) -> Self {
match result {
Ok(status) => BlockProcessingResult::Ok(status),
Err(e) => BlockProcessingResult::Err(e),
}
}
}
impl From<BlockError> for BlockProcessingResult {
fn from(e: BlockError) -> Self {
BlockProcessingResult::Err(e)
}
}

View File

@@ -15,4 +15,5 @@ mod range_sync;
mod tests;
pub use manager::{BatchProcessResult, SyncMessage};
pub use network_context::{PeerGroup, SyncNetworkContext};
pub use range_sync::ChainId;

View File

@@ -1,17 +1,19 @@
use super::*;
use crate::NetworkMessage;
use crate::network_beacon_processor::BlockProcessingResult;
use crate::network_beacon_processor::sync_methods::WhichPeerToPenalize;
use crate::network_beacon_processor::{
ChainSegmentProcessId, InvalidBlockStorage, NetworkBeaconProcessor,
};
use crate::sync::block_lookups::{BlockLookupSummary, PARENT_DEPTH_TOLERANCE};
use crate::sync::{
SyncMessage,
manager::{BatchProcessResult, BlockProcessType, BlockProcessingResult, SyncManager},
manager::{BatchProcessResult, BlockProcessType, SyncManager},
};
use beacon_chain::block_verification_types::LookupBlock;
use beacon_chain::custody_context::NodeCustodyType;
use beacon_chain::{
AvailabilityProcessingStatus, BlockError, EngineState, NotifyExecutionLayer,
AvailabilityProcessingStatus, EngineState, NotifyExecutionLayer,
block_verification_types::{AsBlock, AvailableBlockData},
test_utils::{
AttestationStrategy, BeaconChainHarness, BlockStrategy, EphemeralHarnessType, NumBlobs,
@@ -1947,7 +1949,14 @@ async fn too_many_processing_failures(depth: usize) {
r.build_chain_and_trigger_last_block(depth).await;
// Simulate that a peer always returns empty
r.simulate(
SimulateConfig::new().with_process_result(|| BlockError::BlockSlotLimitReached.into()),
SimulateConfig::new().with_process_result(|| BlockProcessingResult::Error {
penalty: Some((
PeerAction::MidToleranceError,
WhichPeerToPenalize::BlockPeer,
"lookup_block_processing_failure",
)),
reason: "lookup_block_processing_failure".to_string(),
}),
)
.await;
// We register multiple penalties, the lookup fails and sync does not progress
@@ -1991,15 +2000,21 @@ async fn unknown_parent_does_not_add_peers_to_itself() {
}
#[tokio::test]
/// Assert that if the beacon processor returns Ignored, the lookup is dropped
/// Assert that a non-attributable processing error (e.g. processor overloaded) is retried up to
/// `SINGLE_BLOCK_LOOKUP_MAX_ATTEMPTS`, no peer is penalized, and the lookup is then dropped.
async fn test_single_block_lookup_ignored_response() {
let mut r = TestRig::default();
r.build_chain_and_trigger_last_block(1).await;
// Send an Ignored response, the request should be dropped
r.simulate(SimulateConfig::new().with_process_result(|| BlockProcessingResult::Ignored))
.await;
r.simulate(
SimulateConfig::new().with_process_result(|| BlockProcessingResult::Error {
penalty: None,
reason: "processor_overloaded".to_string(),
}),
)
.await;
// The block was not actually imported
r.assert_head_slot(0);
r.assert_no_penalties();
assert_eq!(r.created_lookups(), 1, "no created lookups");
assert_eq!(r.dropped_lookups(), 1, "no dropped lookups");
assert_eq!(r.completed_lookups(), 0, "some completed lookups");
@@ -2013,7 +2028,7 @@ async fn test_single_block_lookup_duplicate_response() {
// Send a DuplicateFullyImported response, the lookup should complete successfully
r.simulate(
SimulateConfig::new()
.with_process_result(|| BlockError::DuplicateFullyImported(Hash256::ZERO).into()),
.with_process_result(|| BlockProcessingResult::Imported(true, "duplicate")),
)
.await;
// The block was not actually imported
@@ -2392,7 +2407,7 @@ async fn crypto_on_fail_with_invalid_block_signature() {
r.assert_no_penalties();
} else {
r.assert_failed_lookup_sync();
r.assert_penalties_of_type("lookup_block_processing_failure");
r.assert_penalties_of_type("InvalidSignature");
}
}
@@ -2410,7 +2425,7 @@ async fn crypto_on_fail_with_bad_column_proposer_signature() {
r.assert_no_penalties();
} else {
r.assert_failed_lookup_sync();
r.assert_penalties_of_type("lookup_custody_column_processing_failure");
r.assert_penalties_of_type("InvalidSignature");
}
}
@@ -2428,6 +2443,6 @@ async fn crypto_on_fail_with_bad_column_kzg_proof() {
r.assert_no_penalties();
} else {
r.assert_failed_lookup_sync();
r.assert_penalties_of_type("lookup_custody_column_processing_failure");
r.assert_penalties_of_type("AvailabilityCheck");
}
}