mirror of
https://github.com/sigp/lighthouse.git
synced 2026-03-06 18:21:45 +00:00
Ensure proper ReqResp blocks_by_* response stream termination (#5582)
* Ensure proper ReqResp blocks_by_* response stream termination * retrigger CI
This commit is contained in:
@@ -134,13 +134,37 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
|
||||
request_id: PeerRequestId,
|
||||
request: BlocksByRootRequest,
|
||||
) {
|
||||
self.terminate_response_stream(
|
||||
peer_id,
|
||||
request_id,
|
||||
self.clone()
|
||||
.handle_blocks_by_root_request_inner(executor, peer_id, request_id, request)
|
||||
.await,
|
||||
Response::BlocksByRoot,
|
||||
);
|
||||
}
|
||||
|
||||
/// Handle a `BlocksByRoot` request from the peer.
|
||||
pub async fn handle_blocks_by_root_request_inner(
|
||||
self: Arc<Self>,
|
||||
executor: TaskExecutor,
|
||||
peer_id: PeerId,
|
||||
request_id: PeerRequestId,
|
||||
request: BlocksByRootRequest,
|
||||
) -> Result<(), (RPCResponseErrorCode, &'static str)> {
|
||||
let requested_blocks = request.block_roots().len();
|
||||
let mut block_stream = match self
|
||||
.chain
|
||||
.get_blocks_checking_caches(request.block_roots().to_vec(), &executor)
|
||||
{
|
||||
Ok(block_stream) => block_stream,
|
||||
Err(e) => return error!(self.log, "Error getting block stream"; "error" => ?e),
|
||||
Err(e) => {
|
||||
error!(self.log, "Error getting block stream"; "error" => ?e);
|
||||
return Err((
|
||||
RPCResponseErrorCode::ServerError,
|
||||
"Error getting block stream",
|
||||
));
|
||||
}
|
||||
};
|
||||
// Fetching blocks is async because it may have to hit the execution layer for payloads.
|
||||
let mut send_block_count = 0;
|
||||
@@ -169,13 +193,10 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
|
||||
"block_root" => ?root,
|
||||
"reason" => "execution layer not synced",
|
||||
);
|
||||
// send the stream terminator
|
||||
return self.send_error_response(
|
||||
peer_id,
|
||||
return Err((
|
||||
RPCResponseErrorCode::ResourceUnavailable,
|
||||
"Execution layer not synced".into(),
|
||||
request_id,
|
||||
);
|
||||
"Execution layer not synced",
|
||||
));
|
||||
}
|
||||
Err(e) => {
|
||||
debug!(
|
||||
@@ -196,8 +217,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
|
||||
"returned" => %send_block_count
|
||||
);
|
||||
|
||||
// send stream termination
|
||||
self.send_response(peer_id, Response::BlocksByRoot(None), request_id);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Handle a `BlobsByRoot` request from the peer.
|
||||
@@ -380,6 +400,24 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
|
||||
request_id: PeerRequestId,
|
||||
req: BlocksByRangeRequest,
|
||||
) {
|
||||
self.terminate_response_stream(
|
||||
peer_id,
|
||||
request_id,
|
||||
self.clone()
|
||||
.handle_blocks_by_range_request_inner(executor, peer_id, request_id, req)
|
||||
.await,
|
||||
Response::BlocksByRange,
|
||||
);
|
||||
}
|
||||
|
||||
/// Handle a `BlocksByRange` request from the peer.
|
||||
pub async fn handle_blocks_by_range_request_inner(
|
||||
self: Arc<Self>,
|
||||
executor: TaskExecutor,
|
||||
peer_id: PeerId,
|
||||
request_id: PeerRequestId,
|
||||
req: BlocksByRangeRequest,
|
||||
) -> Result<(), (RPCResponseErrorCode, &'static str)> {
|
||||
debug!(self.log, "Received BlocksByRange Request";
|
||||
"peer_id" => %peer_id,
|
||||
"count" => req.count(),
|
||||
@@ -401,12 +439,10 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
|
||||
}
|
||||
});
|
||||
if *req.count() > max_request_size {
|
||||
return self.send_error_response(
|
||||
peer_id,
|
||||
return Err((
|
||||
RPCResponseErrorCode::InvalidRequest,
|
||||
format!("Request exceeded max size {max_request_size}"),
|
||||
request_id,
|
||||
);
|
||||
"Request exceeded max size",
|
||||
));
|
||||
}
|
||||
|
||||
let forwards_block_root_iter = match self
|
||||
@@ -424,25 +460,15 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
|
||||
"requested_slot" => slot,
|
||||
"oldest_known_slot" => oldest_block_slot
|
||||
);
|
||||
return self.send_error_response(
|
||||
peer_id,
|
||||
RPCResponseErrorCode::ResourceUnavailable,
|
||||
"Backfilling".into(),
|
||||
request_id,
|
||||
);
|
||||
return Err((RPCResponseErrorCode::ResourceUnavailable, "Backfilling"));
|
||||
}
|
||||
Err(e) => {
|
||||
self.send_error_response(
|
||||
peer_id,
|
||||
RPCResponseErrorCode::ServerError,
|
||||
"Database error".into(),
|
||||
request_id,
|
||||
);
|
||||
return error!(self.log, "Unable to obtain root iter";
|
||||
error!(self.log, "Unable to obtain root iter";
|
||||
"request" => ?req,
|
||||
"peer" => %peer_id,
|
||||
"error" => ?e
|
||||
);
|
||||
return Err((RPCResponseErrorCode::ServerError, "Database error"));
|
||||
}
|
||||
};
|
||||
|
||||
@@ -468,11 +494,12 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
|
||||
let block_roots = match maybe_block_roots {
|
||||
Ok(block_roots) => block_roots,
|
||||
Err(e) => {
|
||||
return error!(self.log, "Error during iteration over blocks";
|
||||
error!(self.log, "Error during iteration over blocks";
|
||||
"request" => ?req,
|
||||
"peer" => %peer_id,
|
||||
"error" => ?e
|
||||
)
|
||||
);
|
||||
return Err((RPCResponseErrorCode::ServerError, "Iteration error"));
|
||||
}
|
||||
};
|
||||
|
||||
@@ -481,7 +508,10 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
|
||||
|
||||
let mut block_stream = match self.chain.get_blocks(block_roots, &executor) {
|
||||
Ok(block_stream) => block_stream,
|
||||
Err(e) => return error!(self.log, "Error getting block stream"; "error" => ?e),
|
||||
Err(e) => {
|
||||
error!(self.log, "Error getting block stream"; "error" => ?e);
|
||||
return Err((RPCResponseErrorCode::ServerError, "Iterator error"));
|
||||
}
|
||||
};
|
||||
|
||||
// Fetching blocks is async because it may have to hit the execution layer for payloads.
|
||||
@@ -511,12 +541,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
|
||||
"peer" => %peer_id,
|
||||
"request_root" => ?root
|
||||
);
|
||||
return self.send_error_response(
|
||||
peer_id,
|
||||
RPCResponseErrorCode::ServerError,
|
||||
"Database inconsistency".into(),
|
||||
request_id,
|
||||
);
|
||||
return Err((RPCResponseErrorCode::ServerError, "Database inconsistency"));
|
||||
}
|
||||
Err(BeaconChainError::BlockHashMissingFromExecutionLayer(_)) => {
|
||||
debug!(
|
||||
@@ -526,12 +551,10 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
|
||||
"reason" => "execution layer not synced",
|
||||
);
|
||||
// send the stream terminator
|
||||
return self.send_error_response(
|
||||
peer_id,
|
||||
return Err((
|
||||
RPCResponseErrorCode::ResourceUnavailable,
|
||||
"Execution layer not synced".into(),
|
||||
request_id,
|
||||
);
|
||||
"Execution layer not synced",
|
||||
));
|
||||
}
|
||||
Err(e) => {
|
||||
if matches!(
|
||||
@@ -556,12 +579,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
|
||||
}
|
||||
|
||||
// send the stream terminator
|
||||
return self.send_error_response(
|
||||
peer_id,
|
||||
RPCResponseErrorCode::ServerError,
|
||||
"Failed fetching blocks".into(),
|
||||
request_id,
|
||||
);
|
||||
return Err((RPCResponseErrorCode::ServerError, "Failed fetching blocks"));
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -594,12 +612,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
|
||||
);
|
||||
}
|
||||
|
||||
// send the stream terminator
|
||||
self.send_network_message(NetworkMessage::SendResponse {
|
||||
peer_id,
|
||||
response: Response::BlocksByRange(None),
|
||||
id: request_id,
|
||||
});
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Handle a `BlobsByRange` request from the peer.
|
||||
|
||||
Reference in New Issue
Block a user