diff --git a/beacon_node/network/src/sync/block_lookups/mod.rs b/beacon_node/network/src/sync/block_lookups/mod.rs index ef9807f037..b6d78e5e31 100644 --- a/beacon_node/network/src/sync/block_lookups/mod.rs +++ b/beacon_node/network/src/sync/block_lookups/mod.rs @@ -433,6 +433,7 @@ impl BlockLookups { pub fn on_block_download_response( &mut self, id: SingleLookupReqId, + peer_id: PeerId, response: BlockDownloadResponse, cx: &mut SyncNetworkContext, ) { @@ -440,7 +441,7 @@ impl BlockLookups { debug!(?id, "Block returned for single block lookup not present"); return; }; - let result = lookup.on_block_download_response(id.req_id, response, cx); + let result = lookup.on_block_download_response(id.req_id, peer_id, response, cx); self.on_lookup_result(id.lookup_id, result, "block_download_response", cx); } @@ -461,6 +462,7 @@ impl BlockLookups { pub fn on_payload_download_response( &mut self, id: SingleLookupReqId, + peer_id: PeerId, response: PayloadDownloadResponse, cx: &mut SyncNetworkContext, ) { @@ -471,7 +473,7 @@ impl BlockLookups { ); return; }; - let result = lookup.on_payload_download_response(id.req_id, response, cx); + let result = lookup.on_payload_download_response(id.req_id, peer_id, response, cx); self.on_lookup_result(id.lookup_id, result, "payload_download_response", cx); } diff --git a/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs b/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs index 7a7a2f1d56..6962e4bc1e 100644 --- a/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs +++ b/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs @@ -379,9 +379,11 @@ impl SingleBlockLookup { let _guard = self.span.clone().entered(); // === Block request === - self.block_request.state.maybe_start_downloading(|| { - cx.block_lookup_request(self.id, self.peers.clone(), self.block_root) - })?; + self.block_request + .state + .maybe_start_downloading(|failed_peers| { + cx.block_lookup_request(self.id, self.peers.clone(), failed_peers, self.block_root) + })?; if self.awaiting_parent.is_none() && let Some(data) = self.block_request.state.maybe_start_processing() { @@ -412,7 +414,8 @@ impl SingleBlockLookup { } } DataRequest::Request { slot, peers, state } => { - state.maybe_start_downloading(|| { + // Custody selects/de-prioritizes peers internally in `ActiveCustodyRequest`. + state.maybe_start_downloading(|_| { let req_id = cx.next_id(); cx.custody_lookup_request( CustodyRequester::SingleLookup(SingleLookupReqId { @@ -464,8 +467,13 @@ impl SingleBlockLookup { } } PayloadRequest::Request { peers, state } => { - state.maybe_start_downloading(|| { - cx.payload_lookup_request(self.id, peers.clone(), self.block_root) + state.maybe_start_downloading(|failed_peers| { + cx.payload_lookup_request( + self.id, + peers.clone(), + failed_peers, + self.block_root, + ) })?; // The envelope can only be verified once the block itself is imported; // otherwise processing returns `BlockRootUnknown` and the lookup burns retries @@ -621,9 +629,13 @@ impl SingleBlockLookup { pub fn on_block_download_response( &mut self, req_id: ReqId, + peer_id: PeerId, result: BlockDownloadResponse, cx: &mut SyncNetworkContext, ) -> Result { + if result.is_err() { + self.block_request.state.record_failed_peer(peer_id); + } self.block_request .state .on_download_response(req_id, result)?; @@ -641,6 +653,7 @@ impl SingleBlockLookup { return Err(LookupRequestError::BadState("no data_request".to_owned())); }; + // Custody requests track and de-prioritize failed peers internally in `ActiveCustodyRequest`. state.on_download_response(req_id, result)?; self.continue_requests(cx) } @@ -649,6 +662,7 @@ impl SingleBlockLookup { pub fn on_payload_download_response( &mut self, req_id: ReqId, + peer_id: PeerId, result: PayloadDownloadResponse, cx: &mut SyncNetworkContext, ) -> Result { @@ -658,6 +672,9 @@ impl SingleBlockLookup { )); }; + if result.is_err() { + state.record_failed_peer(peer_id); + } state.on_download_response(req_id, result)?; self.continue_requests(cx) } @@ -752,6 +769,9 @@ pub struct SingleLookupRequestState { failed_processing: u8, /// How many times have we attempted to download this block or blob. failed_downloading: u8, + /// Peers that have failed to serve this request. Used to de-prioritize them when selecting a + /// peer to retry the download from. + failed_peers: HashSet, } impl SingleLookupRequestState { @@ -760,6 +780,7 @@ impl SingleLookupRequestState { state: State::AwaitingDownload("not started"), failed_processing: 0, failed_downloading: 0, + failed_peers: HashSet::new(), } } @@ -814,10 +835,10 @@ impl SingleLookupRequestState { /// Drive download: check max attempts, issue request, handle result. fn maybe_start_downloading( &mut self, - request_fn: impl FnOnce() -> Result, RpcRequestSendError>, + request_fn: impl FnOnce(&HashSet) -> Result, RpcRequestSendError>, ) -> Result<(), LookupRequestError> { if self.is_awaiting_download() { - match request_fn().map_err(LookupRequestError::SendFailedNetwork)? { + match request_fn(&self.failed_peers).map_err(LookupRequestError::SendFailedNetwork)? { LookupRequestResult::RequestSent(req_id) => self.on_download_start(req_id)?, LookupRequestResult::NoRequestNeeded(reason, value) => { self.on_completed_request(reason, value)? @@ -862,6 +883,11 @@ impl SingleLookupRequestState { } } + /// Record a peer that failed to serve this request, to be de-prioritized on retry. + fn record_failed_peer(&mut self, peer_id: PeerId) { + self.failed_peers.insert(peer_id); + } + pub fn on_download_response( &mut self, req_id: ReqId, diff --git a/beacon_node/network/src/sync/manager.rs b/beacon_node/network/src/sync/manager.rs index 8741584602..f2c01bb4da 100644 --- a/beacon_node/network/src/sync/manager.rs +++ b/beacon_node/network/src/sync/manager.rs @@ -1129,6 +1129,7 @@ impl SyncManager { if let Some(resp) = self.network.on_single_block_response(id, peer_id, block) { self.block_lookups.on_block_download_response( id, + peer_id, resp.map(|value| DownloadResult::new(value, PeerGroup::from_single(peer_id))), &mut self.network, ) @@ -1212,6 +1213,7 @@ impl SyncManager { { self.block_lookups.on_payload_download_response( id, + peer_id, resp.map(|value| DownloadResult::new(value, PeerGroup::from_single(peer_id))), &mut self.network, ) diff --git a/beacon_node/network/src/sync/network_context.rs b/beacon_node/network/src/sync/network_context.rs index 6ea878f2f3..3375d496f8 100644 --- a/beacon_node/network/src/sync/network_context.rs +++ b/beacon_node/network/src/sync/network_context.rs @@ -712,6 +712,7 @@ impl SyncNetworkContext { &mut self, lookup_id: SingleLookupId, lookup_peers: Arc>>, + peers_to_deprioritize: &HashSet, block_root: Hash256, ) -> Result>>, RpcRequestSendError> { let blocks_by_root_per_peer = ActiveRequestsPerPeer::new(&self.blocks_by_root_requests); @@ -720,6 +721,8 @@ impl SyncNetworkContext { .iter() .map(|peer| { ( + // De-prioritize peers that have already failed this request + peers_to_deprioritize.contains(peer), // Strictly de-prioritize peers already at the per-protocol concurrency limit blocks_by_root_per_peer.at_concurrency_limit(peer), // Random factor to break ties, otherwise the PeerID breaks ties @@ -728,7 +731,7 @@ impl SyncNetworkContext { ) }) .min() - .map(|(_, _, peer)| *peer) + .map(|(_, _, _, peer)| *peer) else { // Allow lookup to not have any peers and do nothing. This is an optimization to not // lose progress of lookups created from a block with unknown parent before we receive @@ -818,6 +821,7 @@ impl SyncNetworkContext { &mut self, lookup_id: SingleLookupId, lookup_peers: Arc>>, + peers_to_deprioritize: &HashSet, block_root: Hash256, ) -> Result< LookupRequestResult>>, @@ -841,6 +845,8 @@ impl SyncNetworkContext { .iter() .map(|peer| { ( + // De-prioritize peers that have already failed this request + peers_to_deprioritize.contains(peer), // Strictly de-prioritize peers already at the per-protocol concurrency limit payload_envelopes_by_root_per_peer.at_concurrency_limit(peer), rand::random::(), @@ -848,7 +854,7 @@ impl SyncNetworkContext { ) }) .min() - .map(|(_, _, peer)| *peer) + .map(|(_, _, _, peer)| *peer) else { return Ok(LookupRequestResult::Pending("no peers")); };