De-prioritize failed peers in block and payload lookups (#9542)

Range/backfill sync already de-prioritizes peers that failed a batch download (batch.failed_peers() -> peers_to_deprioritize). Block and payload lookups did not: SingleLookupRequestState only tracked a failure counter, not which peer failed, so a retry could re-select the same flaky peer.


Track failed_peers per request state, record the peer on each download failure, and feed the resulting set into the block_lookup_request / payload_lookup_request peer-selection sort key, mirroring range sync. Custody lookups already have their own per-peer-attempt mechanism and are left unchanged.


Co-Authored-By: dapplion <35266934+dapplion@users.noreply.github.com>
This commit is contained in:
Lion - dapplion
2026-06-26 21:36:56 +02:00
committed by GitHub
parent 812913643b
commit 5e9e86d0c3
4 changed files with 48 additions and 12 deletions

View File

@@ -433,6 +433,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
pub fn on_block_download_response(
&mut self,
id: SingleLookupReqId,
peer_id: PeerId,
response: BlockDownloadResponse<T::EthSpec>,
cx: &mut SyncNetworkContext<T>,
) {
@@ -440,7 +441,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
debug!(?id, "Block returned for single block lookup not present");
return;
};
let result = lookup.on_block_download_response(id.req_id, response, cx);
let result = lookup.on_block_download_response(id.req_id, peer_id, response, cx);
self.on_lookup_result(id.lookup_id, result, "block_download_response", cx);
}
@@ -461,6 +462,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
pub fn on_payload_download_response(
&mut self,
id: SingleLookupReqId,
peer_id: PeerId,
response: PayloadDownloadResponse<T::EthSpec>,
cx: &mut SyncNetworkContext<T>,
) {
@@ -471,7 +473,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
);
return;
};
let result = lookup.on_payload_download_response(id.req_id, response, cx);
let result = lookup.on_payload_download_response(id.req_id, peer_id, response, cx);
self.on_lookup_result(id.lookup_id, result, "payload_download_response", cx);
}

View File

@@ -379,9 +379,11 @@ impl<T: BeaconChainTypes> SingleBlockLookup<T> {
let _guard = self.span.clone().entered();
// === Block request ===
self.block_request.state.maybe_start_downloading(|| {
cx.block_lookup_request(self.id, self.peers.clone(), self.block_root)
})?;
self.block_request
.state
.maybe_start_downloading(|failed_peers| {
cx.block_lookup_request(self.id, self.peers.clone(), failed_peers, self.block_root)
})?;
if self.awaiting_parent.is_none()
&& let Some(data) = self.block_request.state.maybe_start_processing()
{
@@ -412,7 +414,8 @@ impl<T: BeaconChainTypes> SingleBlockLookup<T> {
}
}
DataRequest::Request { slot, peers, state } => {
state.maybe_start_downloading(|| {
// Custody selects/de-prioritizes peers internally in `ActiveCustodyRequest`.
state.maybe_start_downloading(|_| {
let req_id = cx.next_id();
cx.custody_lookup_request(
CustodyRequester::SingleLookup(SingleLookupReqId {
@@ -464,8 +467,13 @@ impl<T: BeaconChainTypes> SingleBlockLookup<T> {
}
}
PayloadRequest::Request { peers, state } => {
state.maybe_start_downloading(|| {
cx.payload_lookup_request(self.id, peers.clone(), self.block_root)
state.maybe_start_downloading(|failed_peers| {
cx.payload_lookup_request(
self.id,
peers.clone(),
failed_peers,
self.block_root,
)
})?;
// The envelope can only be verified once the block itself is imported;
// otherwise processing returns `BlockRootUnknown` and the lookup burns retries
@@ -621,9 +629,13 @@ impl<T: BeaconChainTypes> SingleBlockLookup<T> {
pub fn on_block_download_response(
&mut self,
req_id: ReqId,
peer_id: PeerId,
result: BlockDownloadResponse<T::EthSpec>,
cx: &mut SyncNetworkContext<T>,
) -> Result<LookupResult, LookupRequestError> {
if result.is_err() {
self.block_request.state.record_failed_peer(peer_id);
}
self.block_request
.state
.on_download_response(req_id, result)?;
@@ -641,6 +653,7 @@ impl<T: BeaconChainTypes> SingleBlockLookup<T> {
return Err(LookupRequestError::BadState("no data_request".to_owned()));
};
// Custody requests track and de-prioritize failed peers internally in `ActiveCustodyRequest`.
state.on_download_response(req_id, result)?;
self.continue_requests(cx)
}
@@ -649,6 +662,7 @@ impl<T: BeaconChainTypes> SingleBlockLookup<T> {
pub fn on_payload_download_response(
&mut self,
req_id: ReqId,
peer_id: PeerId,
result: PayloadDownloadResponse<T::EthSpec>,
cx: &mut SyncNetworkContext<T>,
) -> Result<LookupResult, LookupRequestError> {
@@ -658,6 +672,9 @@ impl<T: BeaconChainTypes> SingleBlockLookup<T> {
));
};
if result.is_err() {
state.record_failed_peer(peer_id);
}
state.on_download_response(req_id, result)?;
self.continue_requests(cx)
}
@@ -752,6 +769,9 @@ pub struct SingleLookupRequestState<T: Clone> {
failed_processing: u8,
/// How many times have we attempted to download this block or blob.
failed_downloading: u8,
/// Peers that have failed to serve this request. Used to de-prioritize them when selecting a
/// peer to retry the download from.
failed_peers: HashSet<PeerId>,
}
impl<T: Clone> SingleLookupRequestState<T> {
@@ -760,6 +780,7 @@ impl<T: Clone> SingleLookupRequestState<T> {
state: State::AwaitingDownload("not started"),
failed_processing: 0,
failed_downloading: 0,
failed_peers: HashSet::new(),
}
}
@@ -814,10 +835,10 @@ impl<T: Clone> SingleLookupRequestState<T> {
/// Drive download: check max attempts, issue request, handle result.
fn maybe_start_downloading(
&mut self,
request_fn: impl FnOnce() -> Result<LookupRequestResult<T>, RpcRequestSendError>,
request_fn: impl FnOnce(&HashSet<PeerId>) -> Result<LookupRequestResult<T>, RpcRequestSendError>,
) -> Result<(), LookupRequestError> {
if self.is_awaiting_download() {
match request_fn().map_err(LookupRequestError::SendFailedNetwork)? {
match request_fn(&self.failed_peers).map_err(LookupRequestError::SendFailedNetwork)? {
LookupRequestResult::RequestSent(req_id) => self.on_download_start(req_id)?,
LookupRequestResult::NoRequestNeeded(reason, value) => {
self.on_completed_request(reason, value)?
@@ -862,6 +883,11 @@ impl<T: Clone> SingleLookupRequestState<T> {
}
}
/// Record a peer that failed to serve this request, to be de-prioritized on retry.
///
/// Inserts `peer_id` into `failed_peers`. Because that field is a set, recording the same
/// peer more than once has no additional effect. `maybe_start_downloading` passes the set to
/// its request closure whenever it issues a download request.
fn record_failed_peer(&mut self, peer_id: PeerId) {
self.failed_peers.insert(peer_id);
}
pub fn on_download_response(
&mut self,
req_id: ReqId,

View File

@@ -1129,6 +1129,7 @@ impl<T: BeaconChainTypes> SyncManager<T> {
if let Some(resp) = self.network.on_single_block_response(id, peer_id, block) {
self.block_lookups.on_block_download_response(
id,
peer_id,
resp.map(|value| DownloadResult::new(value, PeerGroup::from_single(peer_id))),
&mut self.network,
)
@@ -1212,6 +1213,7 @@ impl<T: BeaconChainTypes> SyncManager<T> {
{
self.block_lookups.on_payload_download_response(
id,
peer_id,
resp.map(|value| DownloadResult::new(value, PeerGroup::from_single(peer_id))),
&mut self.network,
)

View File

@@ -712,6 +712,7 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
&mut self,
lookup_id: SingleLookupId,
lookup_peers: Arc<RwLock<HashSet<PeerId>>>,
peers_to_deprioritize: &HashSet<PeerId>,
block_root: Hash256,
) -> Result<LookupRequestResult<Arc<SignedBeaconBlock<T::EthSpec>>>, RpcRequestSendError> {
let blocks_by_root_per_peer = ActiveRequestsPerPeer::new(&self.blocks_by_root_requests);
@@ -720,6 +721,8 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
.iter()
.map(|peer| {
(
// De-prioritize peers that have already failed this request
peers_to_deprioritize.contains(peer),
// Strictly de-prioritize peers already at the per-protocol concurrency limit
blocks_by_root_per_peer.at_concurrency_limit(peer),
// Random factor to break ties, otherwise the PeerID breaks ties
@@ -728,7 +731,7 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
)
})
.min()
.map(|(_, _, peer)| *peer)
.map(|(_, _, _, peer)| *peer)
else {
// Allow lookup to not have any peers and do nothing. This is an optimization to not
// lose progress of lookups created from a block with unknown parent before we receive
@@ -818,6 +821,7 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
&mut self,
lookup_id: SingleLookupId,
lookup_peers: Arc<RwLock<HashSet<PeerId>>>,
peers_to_deprioritize: &HashSet<PeerId>,
block_root: Hash256,
) -> Result<
LookupRequestResult<Arc<SignedExecutionPayloadEnvelope<T::EthSpec>>>,
@@ -841,6 +845,8 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
.iter()
.map(|peer| {
(
// De-prioritize peers that have already failed this request
peers_to_deprioritize.contains(peer),
// Strictly de-prioritize peers already at the per-protocol concurrency limit
payload_envelopes_by_root_per_peer.at_concurrency_limit(peer),
rand::random::<u32>(),
@@ -848,7 +854,7 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
)
})
.min()
.map(|(_, _, peer)| *peer)
.map(|(_, _, _, peer)| *peer)
else {
return Ok(LookupRequestResult::Pending("no peers"));
};