Use Action in single_block_component_processed (#5564)

* Use Action in single_block_component_processed

* Merge branch 'unstable' of https://github.com/sigp/lighthouse into single_block_component_processed-action

* fix compile after merge

* add continue action for when we are awaiting other parts of missing components
This commit is contained in:
Lion - dapplion
2024-04-16 21:59:53 +09:00
committed by GitHub
parent f5e0404fb8
commit e5b8d1237d

View File

@@ -45,6 +45,13 @@ pub type DownloadedBlock<E> = (Hash256, RpcBlock<E>);
const FAILED_CHAINS_CACHE_EXPIRY_SECONDS: u64 = 60;
pub const SINGLE_BLOCK_LOOKUP_MAX_ATTEMPTS: u8 = 3;
enum Action {
Retry,
ParentUnknown { parent_root: Hash256, slot: Slot },
Drop,
Continue,
}
pub struct BlockLookups<T: BeaconChainTypes> {
/// Parent chain lookups being downloaded.
parent_lookups: SmallVec<[ParentLookup<T>; 3]>,
@@ -773,7 +780,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
return;
};
let root = lookup.block_root();
let block_root = lookup.block_root();
let request_state = R::request_state_mut(&mut lookup);
let peer_id = match request_state.get_state().processing_peer() {
@@ -787,28 +794,49 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
self.log,
"Block component processed for lookup";
"response_type" => ?R::response_type(),
"block_root" => ?root,
"block_root" => ?block_root,
"result" => ?result,
"id" => target_id,
);
match result {
BlockProcessingResult::Ok(status) => match status {
AvailabilityProcessingStatus::Imported(root) => {
trace!(self.log, "Single block processing succeeded"; "block" => %root);
let action = match result {
BlockProcessingResult::Ok(AvailabilityProcessingStatus::Imported(_))
| BlockProcessingResult::Err(BlockError::BlockIsAlreadyKnown { .. }) => {
// Successfully imported
trace!(self.log, "Single block processing succeeded"; "block" => %block_root);
Action::Drop
}
BlockProcessingResult::Ok(AvailabilityProcessingStatus::MissingComponents(
_,
_block_root,
)) => {
// `on_processing_success` is called here to ensure the request state is updated prior to checking
// if both components have been processed.
if R::request_state_mut(&mut lookup)
.get_state_mut()
.on_processing_success()
.is_err()
{
warn!(
self.log,
"Single block processing state incorrect";
"action" => "dropping single block request"
);
Action::Drop
// If this was the result of a block request, we can't determined if the block peer did anything
// wrong. If we already had both a block and blobs response processed, we should penalize the
// blobs peer because they did not provide all blobs on the initial request.
} else if lookup.both_components_processed() {
lookup.penalize_blob_peer(cx);
// Try it again if possible.
lookup.blob_request_state.state.on_processing_failure();
Action::Retry
} else {
Action::Continue
}
AvailabilityProcessingStatus::MissingComponents(_, _block_root) => {
match self.handle_missing_components::<R>(cx, &mut lookup) {
Ok(()) => {
self.single_block_lookups.insert(target_id, lookup);
}
Err(e) => {
// Drop with an additional error.
warn!(self.log, "Single block lookup failed"; "block" => %root, "error" => ?e);
}
}
}
},
}
BlockProcessingResult::Ignored => {
// Beacon processor signalled to ignore the block processing result.
// This implies that the cpu is overloaded. Drop the request.
@@ -817,118 +845,88 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
"Single block processing was ignored, cpu might be overloaded";
"action" => "dropping single block request"
);
Action::Drop
}
BlockProcessingResult::Err(e) => {
match self.handle_single_lookup_block_error(cx, lookup, peer_id, e) {
Ok(Some(lookup)) => {
self.single_block_lookups.insert(target_id, lookup);
let root = lookup.block_root();
trace!(self.log, "Single block processing failed"; "block" => %root, "error" => %e);
match e {
BlockError::BeaconChainError(e) => {
// Internal error
error!(self.log, "Beacon chain error processing single block"; "block_root" => %root, "error" => ?e);
Action::Drop
}
Ok(None) => {
// Drop without an additional error.
BlockError::ParentUnknown(block) => {
let slot = block.slot();
let parent_root = block.parent_root();
lookup.add_child_components(block.into());
Action::ParentUnknown { parent_root, slot }
}
Err(e) => {
// Drop with an additional error.
warn!(self.log, "Single block lookup failed"; "block" => %root, "error" => ?e);
ref e @ BlockError::ExecutionPayloadError(ref epe) if !epe.penalize_peer() => {
// These errors indicate that the execution layer is offline
// and failed to validate the execution payload. Do not downscore peer.
debug!(
self.log,
"Single block lookup failed. Execution layer is offline / unsynced / misconfigured";
"root" => %root,
"error" => ?e
);
Action::Drop
}
BlockError::AvailabilityCheck(e) => match e.category() {
AvailabilityCheckErrorCategory::Internal => {
warn!(self.log, "Internal availability check failure"; "root" => %root, "peer_id" => %peer_id, "error" => ?e);
lookup.block_request_state.state.on_download_failure();
lookup.blob_request_state.state.on_download_failure();
Action::Retry
}
AvailabilityCheckErrorCategory::Malicious => {
warn!(self.log, "Availability check failure"; "root" => %root, "peer_id" => %peer_id, "error" => ?e);
lookup.handle_availability_check_failure(cx);
Action::Retry
}
},
other => {
warn!(self.log, "Peer sent invalid block in single block lookup"; "root" => %root, "error" => ?other, "peer_id" => %peer_id);
if let Ok(block_peer) = lookup.block_request_state.state.processing_peer() {
cx.report_peer(
block_peer,
PeerAction::MidToleranceError,
"single_block_failure",
);
lookup.block_request_state.state.on_processing_failure();
}
Action::Retry
}
}
}
};
}
/// Handles a `MissingComponents` block processing error. Handles peer scoring and retries.
///
/// If this was the result of a block request, we can't determined if the block peer did anything
/// wrong. If we already had both a block and blobs response processed, we should penalize the
/// blobs peer because they did not provide all blobs on the initial request.
fn handle_missing_components<R: RequestState<Current, T>>(
&self,
cx: &SyncNetworkContext<T>,
lookup: &mut SingleBlockLookup<Current, T>,
) -> Result<(), LookupRequestError> {
let request_state = R::request_state_mut(lookup);
request_state
.get_state_mut()
.on_processing_success()
.map_err(LookupRequestError::BadState)?;
if lookup.both_components_processed() {
lookup.penalize_blob_peer(cx);
// Try it again if possible.
lookup.blob_request_state.state.on_processing_failure();
lookup.request_block_and_blobs(cx)?;
}
Ok(())
}
/// Handles peer scoring and retries related to a `BlockError` in response to a single block
/// or blob lookup processing result.
fn handle_single_lookup_block_error(
&mut self,
cx: &mut SyncNetworkContext<T>,
mut lookup: SingleBlockLookup<Current, T>,
peer_id: PeerId,
e: BlockError<T::EthSpec>,
) -> Result<Option<SingleBlockLookup<Current, T>>, LookupRequestError> {
let root = lookup.block_root();
trace!(self.log, "Single block processing failed"; "block" => %root, "error" => %e);
match e {
BlockError::BlockIsAlreadyKnown(_) => {
// No error here
return Ok(None);
}
BlockError::BeaconChainError(e) => {
// Internal error
error!(self.log, "Beacon chain error processing single block"; "block_root" => %root, "error" => ?e);
return Ok(None);
}
BlockError::ParentUnknown(block) => {
let slot = block.slot();
let parent_root = block.parent_root();
lookup.add_child_components(block.into());
lookup.request_block_and_blobs(cx)?;
self.search_parent(slot, root, parent_root, peer_id, cx);
}
ref e @ BlockError::ExecutionPayloadError(ref epe) if !epe.penalize_peer() => {
// These errors indicate that the execution layer is offline
// and failed to validate the execution payload. Do not downscore peer.
debug!(
self.log,
"Single block lookup failed. Execution layer is offline / unsynced / misconfigured";
"root" => %root,
"error" => ?e
);
return Ok(None);
}
BlockError::AvailabilityCheck(e) => match e.category() {
AvailabilityCheckErrorCategory::Internal => {
warn!(self.log, "Internal availability check failure"; "root" => %root, "peer_id" => %peer_id, "error" => ?e);
lookup.block_request_state.state.on_download_failure();
lookup.blob_request_state.state.on_download_failure();
lookup.request_block_and_blobs(cx)?
}
AvailabilityCheckErrorCategory::Malicious => {
warn!(self.log, "Availability check failure"; "root" => %root, "peer_id" => %peer_id, "error" => ?e);
lookup.handle_availability_check_failure(cx);
lookup.request_block_and_blobs(cx)?
}
},
other => {
warn!(self.log, "Peer sent invalid block in single block lookup"; "root" => %root, "error" => ?other, "peer_id" => %peer_id);
if let Ok(block_peer) = lookup.block_request_state.state.processing_peer() {
cx.report_peer(
block_peer,
PeerAction::MidToleranceError,
"single_block_failure",
);
// Try it again if possible.
lookup.block_request_state.state.on_processing_failure();
lookup.request_block_and_blobs(cx)?
match action {
Action::Retry => {
if let Err(e) = lookup.request_block_and_blobs(cx) {
warn!(self.log, "Single block lookup failed"; "block_root" => %block_root, "error" => ?e);
// Failed with too many retries, drop with noop
self.update_metrics();
} else {
self.single_block_lookups.insert(target_id, lookup);
}
}
Action::ParentUnknown { parent_root, slot } => {
// TODO: Consider including all peers from the lookup, claiming to know this block, not
// just the one that sent this specific block
self.search_parent(slot, block_root, parent_root, peer_id, cx);
self.single_block_lookups.insert(target_id, lookup);
}
Action::Drop => {
// drop with noop
self.update_metrics();
}
Action::Continue => {
self.single_block_lookups.insert(target_id, lookup);
}
}
Ok(Some(lookup))
}
pub fn parent_block_processed(
@@ -1369,4 +1367,16 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
pub fn drop_parent_chain_requests(&mut self) -> usize {
self.parent_lookups.drain(..).len()
}
pub fn update_metrics(&self) {
metrics::set_gauge(
&metrics::SYNC_SINGLE_BLOCK_LOOKUPS,
self.single_block_lookups.len() as i64,
);
metrics::set_gauge(
&metrics::SYNC_PARENT_BLOCK_LOOKUPS,
self.parent_lookups.len() as i64,
);
}
}