improve retry code

This commit is contained in:
realbigsean
2023-04-24 18:56:19 -04:00
parent 0560b7d1a5
commit 1d18756303

View File

@@ -434,32 +434,28 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
&mut Id, &mut Id,
&mut SingleBlockLookup<SINGLE_BLOCK_LOOKUP_MAX_ATTEMPTS, T>, &mut SingleBlockLookup<SINGLE_BLOCK_LOOKUP_MAX_ATTEMPTS, T>,
)> { )> {
let lookup: Option<( let lookup =
bool, self.single_block_lookups
&mut Id, .iter_mut()
&mut SingleBlockLookup<SINGLE_BLOCK_LOOKUP_MAX_ATTEMPTS, T>, .find_map(|(block_id_opt, blob_id_opt, req)| {
)> = self let id_opt = match response_type {
.single_block_lookups ResponseType::Block => block_id_opt,
.iter_mut() ResponseType::Blob => blob_id_opt,
.find_map(|(block_id_opt, blob_id_opt, req)| { };
let id_opt = match response_type { if let Some(lookup_id) = id_opt {
ResponseType::Block => block_id_opt, if *lookup_id == target_id {
ResponseType::Blob => blob_id_opt, // Only send for processing if we don't have parent requests that were triggered by
}; // this block.
if let Some(lookup_id) = id_opt { let triggered_parent_request = self
if *lookup_id == target_id { .parent_lookups
// Only send for processing if we don't have parent requests that were triggered by .iter()
// this block. .any(|lookup| lookup.chain_hash() == req.requested_block_root);
let triggered_parent_request = self
.parent_lookups
.iter()
.any(|lookup| lookup.chain_hash() == req.requested_block_root);
return Some((triggered_parent_request, lookup_id, req)); return Some((triggered_parent_request, lookup_id, req));
}
} }
} None
None });
});
let (triggered_parent_request, id_ref, request) = match lookup { let (triggered_parent_request, id_ref, request) = match lookup {
Some((triggered_parent_request, id_ref, req)) => { Some((triggered_parent_request, id_ref, req)) => {
@@ -672,70 +668,32 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
pub fn peer_disconnected(&mut self, peer_id: &PeerId, cx: &mut SyncNetworkContext<T>) { pub fn peer_disconnected(&mut self, peer_id: &PeerId, cx: &mut SyncNetworkContext<T>) {
self.single_block_lookups self.single_block_lookups
.retain_mut(|(block_id, blob_id, req)| { .retain_mut(|(block_id_opt, blob_id_opt, req)| {
if req let should_remove_block = block_id_opt
.block_request_state .as_mut()
.filter(|_| req.block_request_state
.check_peer_disconnected(peer_id) .check_peer_disconnected(peer_id)
.is_err() .is_err())
{ .map(|block_id| {
// retry the request
match req.request_block() { trace!(self.log, "Single block lookup failed on peer disconnection"; "block_root" => ?req.requested_block_root, );
Ok(Some((peer_id, block_request))) => { retry_request_after_failure(block_id, req, ResponseType::Block, peer_id, cx, &self.log)
if let Ok(request_id) = })
cx.single_block_lookup_request(peer_id, block_request) .unwrap_or(ShouldRemoveLookup::False);
{
*block_id = Some(request_id); let should_remove_blob = blob_id_opt
return true; .as_mut()
} .filter(|_| req.blob_request_state
}
Ok(None) => {
// We've already successfully downloaded the block, we may be waiting
// for blobs, so don't drop the lookup.
return true;
}
Err(e) => {
trace!(
self.log,
"Single block request failed on peer disconnection";
"block_root" => %req.requested_block_root,
"peer_id" => %peer_id,
"reason" => <&str>::from(e),
);
}
}
}
if req
.blob_request_state
.check_peer_disconnected(peer_id) .check_peer_disconnected(peer_id)
.is_err() .is_err())
{ .map(|blob_id| {
// retry the request
match req.request_blobs() { trace!(self.log, "Single blob lookup failed on peer disconnection"; "block_root" => ?req.requested_block_root, );
Ok(Some((peer_id, blobs_request))) => { retry_request_after_failure(blob_id, req, ResponseType::Blob, peer_id, cx, &self.log)
if let Ok(request_id) = })
cx.single_blobs_lookup_request(peer_id, blobs_request) .unwrap_or(ShouldRemoveLookup::False);
{
*blob_id = Some(request_id); matches!(should_remove_block, ShouldRemoveLookup::False) && matches!(should_remove_blob, ShouldRemoveLookup::False)
return true;
}
}
Ok(None) => {
// We've already successfully downloaded the blobs, we may be waiting
// for block, so don't drop the lookup.
return true;
}
Err(e) => {
trace!(
self.log,
"Single blobs request failed on peer disconnection";
"block_root" => %req.requested_block_root,
"peer_id" => %peer_id,
"reason" => <&str>::from(e),
);
}
}
}
false
}); });
/* Check disconnection for parent lookups */ /* Check disconnection for parent lookups */
@@ -757,8 +715,6 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
cx: &mut SyncNetworkContext<T>, cx: &mut SyncNetworkContext<T>,
error: RPCError, error: RPCError,
) { ) {
//TODO(sean) check if there's a pending blob response when deciding whether to drop?
if let Some(pos) = self if let Some(pos) = self
.parent_lookups .parent_lookups
.iter() .iter()
@@ -799,30 +755,29 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
cx: &mut SyncNetworkContext<T>, cx: &mut SyncNetworkContext<T>,
error: RPCError, error: RPCError,
) { ) {
let msg = error.as_static_str();
self.single_block_lookups.retain_mut(|(block_id_opt, blob_id_opt, req)|{ self.single_block_lookups.retain_mut(|(block_id_opt, blob_id_opt, req)|{
let should_remove_block = if let Some(block_id) = block_id_opt.as_mut() {
if *block_id == id {
req.block_request_state.register_failure_downloading();
trace!(self.log, "Single block lookup failed"; "block" => %req.requested_block_root);
retry_request_after_failure(block_id, req, ResponseType::Block, peer_id, error.as_static_str(), cx, &self.log)
} else {
ShouldRemoveLookup::False
}
} else {
ShouldRemoveLookup::False
};
let should_remove_blob = if let Some(blob_id) = blob_id_opt.as_mut() { let should_remove_block = block_id_opt
if *blob_id == id { .as_mut()
req.blob_request_state.register_failure_downloading(); .filter(|block_id| **block_id == id)
trace!(self.log, "Single blob lookup failed"; "block" => %req.requested_block_root); .map(|block_id| {
retry_request_after_failure(blob_id, req, ResponseType::Block, peer_id,error.as_static_str(), cx, &self.log) req.block_request_state.register_failure_downloading();
} else { trace!(self.log, "Single block lookup failed"; "block" => %req.requested_block_root, "error" => msg);
ShouldRemoveLookup::False retry_request_after_failure(block_id, req, ResponseType::Block, peer_id, cx, &self.log)
} })
} else { .unwrap_or(ShouldRemoveLookup::False);
ShouldRemoveLookup::False
}; let should_remove_blob = blob_id_opt
.as_mut()
.filter(|blob_id| **blob_id == id)
.map(|blob_id| {
req.blob_request_state.register_failure_downloading();
trace!(self.log, "Single blob lookup failed"; "block" => %req.requested_block_root, "error" =>msg);
retry_request_after_failure(blob_id, req, ResponseType::Block, peer_id, cx, &self.log)
})
.unwrap_or(ShouldRemoveLookup::False);
matches!(should_remove_block, ShouldRemoveLookup::False) && matches!(should_remove_blob, ShouldRemoveLookup::False) matches!(should_remove_block, ShouldRemoveLookup::False) && matches!(should_remove_blob, ShouldRemoveLookup::False)
}); });
@@ -1410,18 +1365,16 @@ fn handle_block_lookup_verify_error<T: BeaconChainTypes>(
request_ref, request_ref,
response_type, response_type,
&peer_id, &peer_id,
msg,
cx, cx,
log, log,
) )
} }
fn retry_request_after_failure<T: BeaconChainTypes, Err: Debug>( fn retry_request_after_failure<T: BeaconChainTypes>(
request_id_ref: &mut u32, request_id_ref: &mut u32,
request_ref: &mut SingleBlockLookup<SINGLE_BLOCK_LOOKUP_MAX_ATTEMPTS, T>, request_ref: &mut SingleBlockLookup<SINGLE_BLOCK_LOOKUP_MAX_ATTEMPTS, T>,
response_type: ResponseType, response_type: ResponseType,
initial_peer_id: &PeerId, initial_peer_id: &PeerId,
e: Err,
cx: &mut SyncNetworkContext<T>, cx: &mut SyncNetworkContext<T>,
log: &Logger, log: &Logger,
) -> ShouldRemoveLookup { ) -> ShouldRemoveLookup {