Simplify side effects

This commit is contained in:
dapplion
2026-06-09 19:57:59 +02:00
parent 476c1ec240
commit 3f550adf63
3 changed files with 135 additions and 101 deletions

View File

@@ -962,7 +962,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
/// The classified outcome of submitting a block / blob / column for processing, ready for the
/// lookup state machine to act on without re-inspecting `BlockError`.
#[derive(Debug)]
#[derive(Debug, Clone)]
pub enum BlockProcessingResult {
/// `fully_imported` is true if the lookup is complete; false if `MissingComponents` (the
/// lookup must keep fetching). `info` is a stable label for logs / metrics.

View File

@@ -29,7 +29,9 @@ use crate::metrics;
use crate::network_beacon_processor::BlockProcessingResult;
use crate::sync::SyncMessage;
use crate::sync::block_lookups::parent_chain::find_oldest_fork_ancestor;
use crate::sync::block_lookups::single_block_lookup::{AwaitingParent, ImportedAction};
use crate::sync::block_lookups::single_block_lookup::{
AwaitingParent, ImportedParent, LookupResult,
};
use beacon_chain::BeaconChainTypes;
use fnv::FnvHashMap;
use lighthouse_network::PeerId;
@@ -414,7 +416,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
self.metrics.created_lookups += 1;
let result = lookup.continue_requests(cx);
self.on_lookup_result(id, result, "new_current_lookup");
self.on_lookup_result(id, result, "new_current_lookup", cx);
self.update_metrics();
self.single_block_lookups.contains_key(&id)
}
@@ -433,7 +435,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
return;
};
let result = lookup.on_block_download_response(id.req_id, response, cx);
self.on_lookup_result(id.lookup_id, result, "block_download_response");
self.on_lookup_result(id.lookup_id, result, "block_download_response", cx);
}
pub fn on_custody_download_response(
@@ -447,7 +449,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
return;
};
let result = lookup.on_custody_download_response(id.req_id, response, cx);
self.on_lookup_result(id.lookup_id, result, "custody_download_response");
self.on_lookup_result(id.lookup_id, result, "custody_download_response", cx);
}
pub fn on_payload_download_response(
@@ -464,7 +466,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
return;
};
let result = lookup.on_payload_download_response(id.req_id, response, cx);
self.on_lookup_result(id.lookup_id, result, "payload_download_response");
self.on_lookup_result(id.lookup_id, result, "payload_download_response", cx);
}
/* Error responses */
@@ -502,73 +504,39 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
);
let lookup_result = match process_type {
BlockProcessType::SingleBlock { .. } => lookup.on_block_processing_result(&result, cx),
BlockProcessType::SingleCustodyColumn(_) => {
lookup.on_data_processing_result(&result, cx)
}
BlockProcessType::SinglePayloadEnvelope(_) => {
lookup.on_payload_processing_result(&result, cx)
}
};
match &result {
BlockProcessingResult::Imported(..) => {
// Some component got imported potentially continue
if lookup.is_complete() {
if self.single_block_lookups.remove(&id).is_some() {
debug!(?block_root, id, "Dropping completed lookup");
metrics::inc_counter(&metrics::SYNC_LOOKUP_COMPLETED);
self.metrics.completed_lookups += 1;
// Block imported, continue the requests of pending child blocks
self.continue_child_lookups(block_root, ImportedAction::LookupComplete, cx);
self.update_metrics();
} else {
debug!(id, "Attempting to drop non-existent lookup");
}
} else if matches!(process_type, BlockProcessType::SingleBlock { .. })
BlockProcessType::SingleBlock { .. } => {
// Update the result of the lookup first, here we may start the download of Gloas
// payload, which may error.
let lookup_result = lookup.on_block_processing_result(result.clone(), cx);
let lookup_is_awaiting_event = lookup.is_awaiting_event();
// Then, as a side-effect continue the EMPTY children of this lookup. Only if the
// block just imported which ensures we just do it once per lookup.
if let BlockProcessingResult::Imported(..) = result
&& let Some(bid_block_hash) = lookup.peek_downloaded_bid_block_hash()
{
// For post-Gloas blocks, if the block just became imported attempt to make
// progress on its EMPTY children. Then, if there are no FULL children, remove
// the lookup.
self.continue_child_lookups(
block_root,
ImportedAction::GloasBlockComplete(bid_block_hash),
ImportedParent::OnlyGloasBlock(bid_block_hash),
cx,
);
if !self.has_any_awaiting_children(block_root) {
self.single_block_lookups.remove(&id);
metrics::inc_counter(&metrics::SYNC_LOOKUP_COMPLETED);
self.metrics.completed_lookups += 1;
debug!(
?block_root,
id, "Dropping completed lookup after block import"
);
}
self.update_metrics();
// Then if this lookup happens to have only empty children we can remove it now. We
// must make sure that no other lookup is awaiting this one, and that no requests
// are on-going.
if !lookup_is_awaiting_event && !self.has_any_awaiting_children(block_root) {
Ok(LookupResult::Completed)
} else {
lookup_result
}
}
BlockProcessingResult::ParentUnknown {
parent_root,
parent_block_hash,
} => {
// Parent unknown error, create parent lookup
let peers = lookup.all_peers();
if !self.search_parent_of_child(
*parent_root,
&PeerType::new(*parent_block_hash),
block_root,
&peers,
cx,
) {
self.drop_lookup_and_children(id, "Failed");
self.update_metrics();
BlockProcessType::SingleCustodyColumn(_) => {
lookup.on_data_processing_result(result, cx)
}
BlockProcessType::SinglePayloadEnvelope(_) => {
lookup.on_payload_processing_result(result, cx)
}
BlockProcessingResult::Error { .. } => {}
}
self.on_lookup_result(id, lookup_result, "processing_result");
};
self.on_lookup_result(id, lookup_result, "processing_result", cx);
}
pub fn has_any_awaiting_children(&self, block_root: Hash256) -> bool {
@@ -581,15 +549,15 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
pub fn continue_child_lookups(
&mut self,
parent_root: Hash256,
import_action: ImportedAction,
imported_parent: ImportedParent,
cx: &mut SyncNetworkContext<T>,
) {
let mut lookup_results = vec![]; // < need to buffer lookup results to not re-borrow &mut self
for (id, lookup) in self.single_block_lookups.iter_mut() {
if lookup.maybe_resolve_awaiting_parent(parent_root, import_action) {
if lookup.maybe_resolve_awaiting_parent(parent_root, imported_parent) {
debug!(
?import_action,
?imported_parent,
id,
block_root = ?lookup.block_root(),
"Continuing child lookup"
@@ -600,7 +568,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
}
for (id, result) in lookup_results {
self.on_lookup_result(id, result, "continue_child_lookups");
self.on_lookup_result(id, result, "continue_child_lookups", cx);
}
}
@@ -637,11 +605,51 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
fn on_lookup_result(
&mut self,
id: SingleLookupId,
result: Result<(), LookupRequestError>,
result: Result<LookupResult, LookupRequestError>,
source: &str,
cx: &mut SyncNetworkContext<T>,
) {
match result {
Ok(_) => {}
Ok(LookupResult::Pending) => {}
Ok(LookupResult::ParentUnknown {
parent_root,
parent_block_hash,
block_root,
peers,
}) => {
if self.search_parent_of_child(
parent_root,
&PeerType::new(parent_block_hash),
block_root,
&peers,
cx,
) {
//
} else {
self.drop_lookup_and_children(id, "Failed");
self.update_metrics();
}
}
Ok(LookupResult::Completed) => {
if let Some(lookup) = self.single_block_lookups.remove(&id) {
debug!(
block = ?lookup.block_root(),
id,
"Dropping completed lookup"
);
metrics::inc_counter(&metrics::SYNC_LOOKUP_COMPLETED);
self.metrics.completed_lookups += 1;
// Block imported, continue the requests of pending child blocks
self.continue_child_lookups(
lookup.block_root(),
ImportedParent::LookupComplete,
cx,
);
self.update_metrics();
} else {
debug!(id, "Attempting to drop non-existent lookup");
}
}
Err(error) => {
debug!(id, source, ?error, "Dropping lookup on request error");
self.drop_lookup_and_children(id, error.into());
@@ -836,7 +844,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
// pruned with `drop_lookups_without_peers` because it has peers. This is rare corner
// case, but it can result in stuck lookups.
let result = lookup.continue_requests(cx);
self.on_lookup_result(lookup_id, result, "add_peers");
self.on_lookup_result(lookup_id, result, "add_peers", cx);
Ok(())
} else {
Ok(())

View File

@@ -24,6 +24,22 @@ use types::{
SignedExecutionPayloadEnvelope, Slot,
};
// Dedicated enum for LookupResult to force its usage
#[must_use = "LookupResult must be handled with on_lookup_result"]
pub enum LookupResult {
/// Lookup completed successfully
Completed,
/// Lookup is expecting some future event from the network
Pending,
/// Block's parent is not known to fork-choice, a parent lookup is needed
ParentUnknown {
parent_root: Hash256,
parent_block_hash: Option<ExecutionBlockHash>,
block_root: Hash256,
peers: Vec<PeerId>,
},
}
#[derive(Debug, PartialEq, Eq, IntoStaticStr)]
pub enum LookupRequestError {
/// Too many failed attempts
@@ -164,9 +180,9 @@ impl From<&AwaitingParent> for PeerType {
}
#[derive(Debug, Clone, Copy)]
pub enum ImportedAction {
pub enum ImportedParent {
LookupComplete,
GloasBlockComplete(ExecutionBlockHash),
OnlyGloasBlock(ExecutionBlockHash),
}
#[derive(Educe)]
@@ -283,18 +299,18 @@ impl<T: BeaconChainTypes> SingleBlockLookup<T> {
/// processing.
pub fn maybe_resolve_awaiting_parent(
&mut self,
block_root: Hash256,
action: ImportedAction,
parent_root: Hash256,
imported_parent: ImportedParent,
) -> bool {
let Some(awaiting_parent) = self.awaiting_parent else {
return false;
};
if awaiting_parent.parent_root() != block_root {
if awaiting_parent.parent_root() != parent_root {
return false;
}
let should_resolve = match action {
ImportedAction::LookupComplete => true,
ImportedAction::GloasBlockComplete(bid_block_hash) => {
let should_resolve = match imported_parent {
ImportedParent::LookupComplete => true,
ImportedParent::OnlyGloasBlock(bid_block_hash) => {
if let Some(parent_block_hash) = awaiting_parent.parent_block_hash {
// This lookup is the execution child of `parent_execution_hash`. If the
// parent hash the same `bid_block_hash` this is FULL child and we must wait
@@ -354,18 +370,12 @@ impl<T: BeaconChainTypes> SingleBlockLookup<T> {
}
}
pub fn is_complete(&self) -> bool {
self.block_request.is_complete()
&& self.data_request.is_complete()
&& self.payload_request.is_complete()
}
/// Makes progress on all requests of this lookup. Any error is not recoverable and must result
/// in dropping the lookup. May mark the lookup as completed.
pub fn continue_requests(
&mut self,
cx: &mut SyncNetworkContext<T>,
) -> Result<(), LookupRequestError> {
) -> Result<LookupResult, LookupRequestError> {
let _guard = self.span.clone().entered();
// === Block request ===
@@ -467,7 +477,17 @@ impl<T: BeaconChainTypes> SingleBlockLookup<T> {
}
}
Ok(())
// If all components of this lookup are already processed, there will be no future events
// that can make progress so it must be dropped. Consider the lookup completed.
// This case can happen if we receive the components from gossip during a retry.
if self.block_request.is_complete()
&& self.data_request.is_complete()
&& self.payload_request.is_complete()
{
return Ok(LookupResult::Completed);
}
Ok(LookupResult::Pending)
}
/// Returns the peers that should serve this block's data columns and payload envelope. For FULL
@@ -492,9 +512,9 @@ impl<T: BeaconChainTypes> SingleBlockLookup<T> {
/// Handle block processing result. Advances the lookup state machine.
pub fn on_block_processing_result(
&mut self,
result: &BlockProcessingResult,
result: BlockProcessingResult,
cx: &mut SyncNetworkContext<T>,
) -> Result<(), LookupRequestError> {
) -> Result<LookupResult, LookupRequestError> {
match result {
BlockProcessingResult::Imported(_fully_imported, _info) => {
self.block_request.state.on_processing_success()?;
@@ -509,14 +529,20 @@ impl<T: BeaconChainTypes> SingleBlockLookup<T> {
// the parent lookup completes.
self.block_request.state.revert_to_awaiting_processing()?;
self.set_awaiting_parent(AwaitingParent {
parent_root: *parent_root,
parent_block_hash: *parent_block_hash,
parent_root,
parent_block_hash,
});
return Ok(LookupResult::ParentUnknown {
parent_root,
parent_block_hash,
block_root: self.block_root,
peers: self.all_peers(),
});
}
BlockProcessingResult::Error { penalty, .. } => {
let peers = self.block_request.state.on_processing_failure()?;
if let Some((action, whom, msg)) = penalty {
whom.apply(*action, &peers, msg, cx);
whom.apply(action, &peers, msg, cx);
}
}
}
@@ -526,9 +552,9 @@ impl<T: BeaconChainTypes> SingleBlockLookup<T> {
/// Handle data processing result
pub fn on_data_processing_result(
&mut self,
result: &BlockProcessingResult,
result: BlockProcessingResult,
cx: &mut SyncNetworkContext<T>,
) -> Result<(), LookupRequestError> {
) -> Result<LookupResult, LookupRequestError> {
let DataRequest::Request { state, .. } = &mut self.data_request else {
return Err(LookupRequestError::BadState("no data_request".to_owned()));
};
@@ -545,7 +571,7 @@ impl<T: BeaconChainTypes> SingleBlockLookup<T> {
BlockProcessingResult::Error { penalty, .. } => {
let peers = state.on_processing_failure()?;
if let Some((action, whom, msg)) = penalty {
whom.apply(*action, &peers, msg, cx);
whom.apply(action, &peers, msg, cx);
}
}
}
@@ -555,9 +581,9 @@ impl<T: BeaconChainTypes> SingleBlockLookup<T> {
/// Handle payload envelope processing result (Gloas only).
pub fn on_payload_processing_result(
&mut self,
result: &BlockProcessingResult,
result: BlockProcessingResult,
cx: &mut SyncNetworkContext<T>,
) -> Result<(), LookupRequestError> {
) -> Result<LookupResult, LookupRequestError> {
let PayloadRequest::Request { state, .. } = &mut self.payload_request else {
return Err(LookupRequestError::BadState(
"no payload_request".to_owned(),
@@ -576,7 +602,7 @@ impl<T: BeaconChainTypes> SingleBlockLookup<T> {
BlockProcessingResult::Error { penalty, .. } => {
let peers = state.on_processing_failure()?;
if let Some((action, whom, msg)) = penalty {
whom.apply(*action, &peers, msg, cx);
whom.apply(action, &peers, msg, cx);
}
}
}
@@ -589,7 +615,7 @@ impl<T: BeaconChainTypes> SingleBlockLookup<T> {
req_id: ReqId,
result: BlockDownloadResponse<T::EthSpec>,
cx: &mut SyncNetworkContext<T>,
) -> Result<(), LookupRequestError> {
) -> Result<LookupResult, LookupRequestError> {
self.block_request
.state
.on_download_response(req_id, result)?;
@@ -602,7 +628,7 @@ impl<T: BeaconChainTypes> SingleBlockLookup<T> {
req_id: ReqId,
result: CustodyDownloadResponse<T::EthSpec>,
cx: &mut SyncNetworkContext<T>,
) -> Result<(), LookupRequestError> {
) -> Result<LookupResult, LookupRequestError> {
let DataRequest::Request { state, .. } = &mut self.data_request else {
return Err(LookupRequestError::BadState("no data_request".to_owned()));
};
@@ -617,7 +643,7 @@ impl<T: BeaconChainTypes> SingleBlockLookup<T> {
req_id: ReqId,
result: PayloadDownloadResponse<T::EthSpec>,
cx: &mut SyncNetworkContext<T>,
) -> Result<(), LookupRequestError> {
) -> Result<LookupResult, LookupRequestError> {
let PayloadRequest::Request { state, .. } = &mut self.payload_request else {
return Err(LookupRequestError::BadState(
"no payload_request".to_owned(),