Enforce sync lookup receives a single result (#5777)

* Enforce sync lookup receives a single result
This commit is contained in:
Lion - dapplion
2024-05-14 13:12:48 +03:00
committed by GitHub
parent f37ffe4b8d
commit ce66ab374e
4 changed files with 60 additions and 36 deletions

View File

@@ -6,7 +6,7 @@ use super::network_context::{RpcProcessingResult, SyncNetworkContext};
use crate::metrics; use crate::metrics;
use crate::sync::block_lookups::common::{ResponseType, PARENT_DEPTH_TOLERANCE}; use crate::sync::block_lookups::common::{ResponseType, PARENT_DEPTH_TOLERANCE};
use crate::sync::block_lookups::parent_chain::find_oldest_fork_ancestor; use crate::sync::block_lookups::parent_chain::find_oldest_fork_ancestor;
use crate::sync::manager::Id; use crate::sync::manager::{Id, SingleLookupReqId};
use crate::sync::network_context::LookupFailure; use crate::sync::network_context::LookupFailure;
use beacon_chain::block_verification_types::AsBlock; use beacon_chain::block_verification_types::AsBlock;
use beacon_chain::data_availability_checker::AvailabilityCheckErrorCategory; use beacon_chain::data_availability_checker::AvailabilityCheckErrorCategory;
@@ -308,19 +308,19 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
/// Process a block or blob response received from a single lookup request. /// Process a block or blob response received from a single lookup request.
pub fn on_download_response<R: RequestState<T>>( pub fn on_download_response<R: RequestState<T>>(
&mut self, &mut self,
id: SingleLookupId, id: SingleLookupReqId,
peer_id: PeerId, peer_id: PeerId,
response: RpcProcessingResult<R::VerifiedResponseType>, response: RpcProcessingResult<R::VerifiedResponseType>,
cx: &mut SyncNetworkContext<T>, cx: &mut SyncNetworkContext<T>,
) { ) {
let result = self.on_download_response_inner::<R>(id, peer_id, response, cx); let result = self.on_download_response_inner::<R>(id, peer_id, response, cx);
self.on_lookup_result(id, result, "download_response", cx); self.on_lookup_result(id.lookup_id, result, "download_response", cx);
} }
/// Process a block or blob response received from a single lookup request. /// Process a block or blob response received from a single lookup request.
pub fn on_download_response_inner<R: RequestState<T>>( pub fn on_download_response_inner<R: RequestState<T>>(
&mut self, &mut self,
id: SingleLookupId, id: SingleLookupReqId,
peer_id: PeerId, peer_id: PeerId,
response: RpcProcessingResult<R::VerifiedResponseType>, response: RpcProcessingResult<R::VerifiedResponseType>,
cx: &mut SyncNetworkContext<T>, cx: &mut SyncNetworkContext<T>,
@@ -333,10 +333,10 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
} }
let response_type = R::response_type(); let response_type = R::response_type();
let Some(lookup) = self.single_block_lookups.get_mut(&id) else { let Some(lookup) = self.single_block_lookups.get_mut(&id.lookup_id) else {
// We don't have the ability to cancel in-flight RPC requests. So this can happen // We don't have the ability to cancel in-flight RPC requests. So this can happen
// if we started this RPC request, and later saw the block/blobs via gossip. // if we started this RPC request, and later saw the block/blobs via gossip.
debug!(self.log, "Block returned for single block lookup not present"; "id" => id); debug!(self.log, "Block returned for single block lookup not present"; "id" => ?id);
return Err(LookupRequestError::UnknownLookup); return Err(LookupRequestError::UnknownLookup);
}; };
@@ -348,7 +348,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
debug!(self.log, debug!(self.log,
"Received lookup download success"; "Received lookup download success";
"block_root" => ?block_root, "block_root" => ?block_root,
"id" => id, "id" => ?id,
"peer_id" => %peer_id, "peer_id" => %peer_id,
"response_type" => ?response_type, "response_type" => ?response_type,
); );
@@ -356,25 +356,28 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
// Register the download peer here. Once we have received some data over the wire we // Register the download peer here. Once we have received some data over the wire we
// attribute it to this peer for scoring latter regardless of how the request was // attribute it to this peer for scoring latter regardless of how the request was
// done. // done.
request_state.on_download_success(DownloadResult { request_state.on_download_success(
value: response, id.req_id,
block_root, DownloadResult {
seen_timestamp, value: response,
peer_id, block_root,
})?; seen_timestamp,
peer_id,
},
)?;
// continue_request will send for processing as the request state is AwaitingProcessing // continue_request will send for processing as the request state is AwaitingProcessing
} }
Err(e) => { Err(e) => {
debug!(self.log, debug!(self.log,
"Received lookup download failure"; "Received lookup download failure";
"block_root" => ?block_root, "block_root" => ?block_root,
"id" => id, "id" => ?id,
"peer_id" => %peer_id, "peer_id" => %peer_id,
"response_type" => ?response_type, "response_type" => ?response_type,
"error" => %e, "error" => %e,
); );
request_state.on_download_failure()?; request_state.on_download_failure(id.req_id)?;
// continue_request will retry a download as the request state is AwaitingDownload // continue_request will retry a download as the request state is AwaitingDownload
} }
} }

View File

@@ -2,7 +2,7 @@ use super::common::ResponseType;
use super::{BlockComponent, PeerId, SINGLE_BLOCK_LOOKUP_MAX_ATTEMPTS}; use super::{BlockComponent, PeerId, SINGLE_BLOCK_LOOKUP_MAX_ATTEMPTS};
use crate::sync::block_lookups::common::RequestState; use crate::sync::block_lookups::common::RequestState;
use crate::sync::block_lookups::Id; use crate::sync::block_lookups::Id;
use crate::sync::network_context::{LookupRequestResult, SyncNetworkContext}; use crate::sync::network_context::{LookupRequestResult, ReqId, SyncNetworkContext};
use beacon_chain::BeaconChainTypes; use beacon_chain::BeaconChainTypes;
use itertools::Itertools; use itertools::Itertools;
use rand::seq::IteratorRandom; use rand::seq::IteratorRandom;
@@ -41,6 +41,13 @@ pub enum LookupRequestError {
Failed, Failed,
/// Attempted to retrieve a not known lookup id /// Attempted to retrieve a not known lookup id
UnknownLookup, UnknownLookup,
/// Received a download result for a different request id than the in-flight request.
/// There should only exist a single request at a time. Having multiple requests is a bug and
/// can result in undefined state, so it's treated as a hard error and the lookup is dropped.
UnexpectedRequestId {
expected_req_id: ReqId,
req_id: ReqId,
},
} }
pub struct SingleBlockLookup<T: BeaconChainTypes> { pub struct SingleBlockLookup<T: BeaconChainTypes> {
@@ -185,7 +192,9 @@ impl<T: BeaconChainTypes> SingleBlockLookup<T> {
}; };
match request.make_request(id, peer_id, downloaded_block_expected_blobs, cx)? { match request.make_request(id, peer_id, downloaded_block_expected_blobs, cx)? {
LookupRequestResult::RequestSent => request.get_state_mut().on_download_start()?, LookupRequestResult::RequestSent(req_id) => {
request.get_state_mut().on_download_start(req_id)?
}
LookupRequestResult::NoRequestNeeded => { LookupRequestResult::NoRequestNeeded => {
request.get_state_mut().on_completed_request()? request.get_state_mut().on_completed_request()?
} }
@@ -272,7 +281,7 @@ pub struct DownloadResult<T: Clone> {
#[derive(Debug, PartialEq, Eq, IntoStaticStr)] #[derive(Debug, PartialEq, Eq, IntoStaticStr)]
pub enum State<T: Clone> { pub enum State<T: Clone> {
AwaitingDownload, AwaitingDownload,
Downloading, Downloading(ReqId),
AwaitingProcess(DownloadResult<T>), AwaitingProcess(DownloadResult<T>),
/// Request is processing, sent by lookup sync /// Request is processing, sent by lookup sync
Processing(DownloadResult<T>), Processing(DownloadResult<T>),
@@ -355,10 +364,10 @@ impl<T: Clone> SingleLookupRequestState<T> {
} }
/// Switch to `Downloading` if the request is in `AwaitingDownload` state, otherwise returns None. /// Switch to `Downloading` if the request is in `AwaitingDownload` state, otherwise returns None.
pub fn on_download_start(&mut self) -> Result<(), LookupRequestError> { pub fn on_download_start(&mut self, req_id: ReqId) -> Result<(), LookupRequestError> {
match &self.state { match &self.state {
State::AwaitingDownload => { State::AwaitingDownload => {
self.state = State::Downloading; self.state = State::Downloading(req_id);
Ok(()) Ok(())
} }
other => Err(LookupRequestError::BadState(format!( other => Err(LookupRequestError::BadState(format!(
@@ -369,9 +378,15 @@ impl<T: Clone> SingleLookupRequestState<T> {
/// Registers a failure in downloading a block. This might be a peer disconnection or a wrong /// Registers a failure in downloading a block. This might be a peer disconnection or a wrong
/// block. /// block.
pub fn on_download_failure(&mut self) -> Result<(), LookupRequestError> { pub fn on_download_failure(&mut self, req_id: ReqId) -> Result<(), LookupRequestError> {
match &self.state { match &self.state {
State::Downloading => { State::Downloading(expected_req_id) => {
if req_id != *expected_req_id {
return Err(LookupRequestError::UnexpectedRequestId {
expected_req_id: *expected_req_id,
req_id,
});
}
self.failed_downloading = self.failed_downloading.saturating_add(1); self.failed_downloading = self.failed_downloading.saturating_add(1);
self.state = State::AwaitingDownload; self.state = State::AwaitingDownload;
Ok(()) Ok(())
@@ -384,10 +399,17 @@ impl<T: Clone> SingleLookupRequestState<T> {
pub fn on_download_success( pub fn on_download_success(
&mut self, &mut self,
req_id: ReqId,
result: DownloadResult<T>, result: DownloadResult<T>,
) -> Result<(), LookupRequestError> { ) -> Result<(), LookupRequestError> {
match &self.state { match &self.state {
State::Downloading => { State::Downloading(expected_req_id) => {
if req_id != *expected_req_id {
return Err(LookupRequestError::UnexpectedRequestId {
expected_req_id: *expected_req_id,
req_id,
});
}
self.state = State::AwaitingProcess(result); self.state = State::AwaitingProcess(result);
Ok(()) Ok(())
} }

View File

@@ -819,7 +819,7 @@ impl<T: BeaconChainTypes> SyncManager<T> {
if let Some(resp) = self.network.on_single_block_response(id, block) { if let Some(resp) = self.network.on_single_block_response(id, block) {
self.block_lookups self.block_lookups
.on_download_response::<BlockRequestState<T::EthSpec>>( .on_download_response::<BlockRequestState<T::EthSpec>>(
id.lookup_id, id,
peer_id, peer_id,
resp, resp,
&mut self.network, &mut self.network,
@@ -861,7 +861,7 @@ impl<T: BeaconChainTypes> SyncManager<T> {
if let Some(resp) = self.network.on_single_blob_response(id, blob) { if let Some(resp) = self.network.on_single_blob_response(id, blob) {
self.block_lookups self.block_lookups
.on_download_response::<BlobRequestState<T::EthSpec>>( .on_download_response::<BlobRequestState<T::EthSpec>>(
id.lookup_id, id,
peer_id, peer_id,
resp, resp,
&mut self.network, &mut self.network,

View File

@@ -81,10 +81,13 @@ impl From<LookupVerifyError> for LookupFailure {
} }
} }
/// Sequential ID that uniquely identifies ReqResp outgoing requests
pub type ReqId = u32;
pub enum LookupRequestResult { pub enum LookupRequestResult {
/// A request is sent. Sync MUST receive an event from the network in the future for either: /// A request is sent. Sync MUST receive an event from the network in the future for either:
/// completed response or failed request /// completed response or failed request
RequestSent, RequestSent(ReqId),
/// No request is sent, and no further action is necessary to consider this request completed /// No request is sent, and no further action is necessary to consider this request completed
NoRequestNeeded, NoRequestNeeded,
/// No request is sent, but the request is not completed. Sync MUST receive some future event /// No request is sent, but the request is not completed. Sync MUST receive some future event
@@ -341,10 +344,8 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
return Ok(LookupRequestResult::Pending); return Ok(LookupRequestResult::Pending);
} }
let id = SingleLookupReqId { let req_id = self.next_id();
lookup_id, let id = SingleLookupReqId { lookup_id, req_id };
req_id: self.next_id(),
};
debug!( debug!(
self.log, self.log,
@@ -366,7 +367,7 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
self.blocks_by_root_requests self.blocks_by_root_requests
.insert(id, ActiveBlocksByRootRequest::new(request)); .insert(id, ActiveBlocksByRootRequest::new(request));
Ok(LookupRequestResult::RequestSent) Ok(LookupRequestResult::RequestSent(req_id))
} }
/// Request necessary blobs for `block_root`. Requests only the necessary blobs by checking: /// Request necessary blobs for `block_root`. Requests only the necessary blobs by checking:
@@ -416,10 +417,8 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
return Ok(LookupRequestResult::NoRequestNeeded); return Ok(LookupRequestResult::NoRequestNeeded);
} }
let id = SingleLookupReqId { let req_id = self.next_id();
lookup_id, let id = SingleLookupReqId { lookup_id, req_id };
req_id: self.next_id(),
};
debug!( debug!(
self.log, self.log,
@@ -445,7 +444,7 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
self.blobs_by_root_requests self.blobs_by_root_requests
.insert(id, ActiveBlobsByRootRequest::new(request)); .insert(id, ActiveBlobsByRootRequest::new(request));
Ok(LookupRequestResult::RequestSent) Ok(LookupRequestResult::RequestSent(req_id))
} }
pub fn is_execution_engine_online(&self) -> bool { pub fn is_execution_engine_online(&self) -> bool {