Ensure proper ReqResp response stream termination (#5556)

* Ensure proper ReqResp response stream termination

* Update beacon_node/network/src/network_beacon_processor/rpc_methods.rs

* Update beacon_node/network/src/network_beacon_processor/rpc_methods.rs

* cargo fmt
This commit is contained in:
Lion - dapplion
2024-04-13 01:15:04 +09:00
committed by GitHub
parent 6fb0b2ed78
commit 116a55e8a5

View File

@@ -212,6 +212,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
"load_blocks_by_root_blocks",
)
}
/// Handle a `BlobsByRoot` request from the peer.
///
/// Thin wrapper: delegates the actual lookup to `handle_blobs_by_root_request_inner`
/// and then guarantees the response stream is properly terminated — either with a
/// `BlobsByRoot(None)` stream terminator on success or an RPC error response on failure.
pub fn handle_blobs_by_root_request(
self: Arc<Self>,
peer_id: PeerId,
request_id: PeerRequestId,
request: BlobsByRootRequest,
) {
self.terminate_response_stream(
peer_id,
request_id,
// Inner call runs first; its Result drives how the stream is closed.
self.handle_blobs_by_root_request_inner(peer_id, request_id, request),
Response::BlobsByRoot,
);
}
/// Handle a `BlobsByRoot` request from the peer.
pub fn handle_blobs_by_root_request_inner(
&self,
peer_id: PeerId,
request_id: PeerRequestId,
request: BlobsByRootRequest,
) -> Result<(), (RPCResponseErrorCode, &'static str)> {
let Some(requested_root) = request.blob_ids.as_slice().first().map(|id| id.block_root)
else {
// No blob ids requested.
return;
return Ok(());
};
let requested_indices = request
.blob_ids
@@ -231,7 +247,6 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
.map(|id| id.index)
.collect::<Vec<_>>();
let mut send_blob_count = 0;
let send_response = true;
let mut blob_list_results = HashMap::new();
for id in request.blob_ids.as_slice() {
@@ -287,10 +302,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
"returned" => send_blob_count
);
// send stream termination
if send_response {
self.send_response(peer_id, Response::BlobsByRoot(None), request_id);
}
Ok(())
}
/// Handle a `LightClientBootstrap` request from the peer.
@@ -300,33 +312,29 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
request_id: PeerRequestId,
request: LightClientBootstrapRequest,
) {
let block_root = request.root;
match self.chain.get_light_client_bootstrap(&block_root) {
Ok(Some((bootstrap, _))) => self.send_response(
peer_id,
Response::LightClientBootstrap(Arc::new(bootstrap)),
request_id,
),
Ok(None) => self.send_error_response(
peer_id,
RPCResponseErrorCode::ResourceUnavailable,
"Bootstrap not available".into(),
request_id,
),
Err(e) => {
self.send_error_response(
peer_id,
self.terminate_response_single_item(
peer_id,
request_id,
match self.chain.get_light_client_bootstrap(&request.root) {
Ok(Some((bootstrap, _))) => Ok(Arc::new(bootstrap)),
Ok(None) => Err((
RPCResponseErrorCode::ResourceUnavailable,
"Bootstrap not available".into(),
request_id,
);
error!(self.log, "Error getting LightClientBootstrap instance";
"block_root" => ?block_root,
"peer" => %peer_id,
"error" => ?e
)
}
};
"Bootstrap not available",
)),
Err(e) => {
error!(self.log, "Error getting LightClientBootstrap instance";
"block_root" => ?request.root,
"peer" => %peer_id,
"error" => ?e
);
Err((
RPCResponseErrorCode::ResourceUnavailable,
"Bootstrap not available",
))
}
},
Response::LightClientBootstrap,
);
}
/// Handle a `LightClientOptimisticUpdate` request from the peer.
@@ -335,25 +343,22 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
peer_id: PeerId,
request_id: PeerRequestId,
) {
let Some(light_client_optimistic_update) = self
.chain
.light_client_server_cache
.get_latest_optimistic_update()
else {
self.send_error_response(
peer_id,
RPCResponseErrorCode::ResourceUnavailable,
"Latest optimistic update not available".into(),
request_id,
);
return;
};
self.send_response(
self.terminate_response_single_item(
peer_id,
Response::LightClientOptimisticUpdate(Arc::new(light_client_optimistic_update)),
request_id,
)
match self
.chain
.light_client_server_cache
.get_latest_optimistic_update()
{
Some(update) => Ok(Arc::new(update)),
None => Err((
RPCResponseErrorCode::ResourceUnavailable,
"Latest optimistic update not available",
)),
},
Response::LightClientOptimisticUpdate,
);
}
/// Handle a `LightClientFinalityUpdate` request from the peer.
@@ -362,25 +367,22 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
peer_id: PeerId,
request_id: PeerRequestId,
) {
let Some(light_client_finality_update) = self
.chain
.light_client_server_cache
.get_latest_finality_update()
else {
self.send_error_response(
peer_id,
RPCResponseErrorCode::ResourceUnavailable,
"Latest finality update not available".into(),
request_id,
);
return;
};
self.send_response(
self.terminate_response_single_item(
peer_id,
Response::LightClientFinalityUpdate(Arc::new(light_client_finality_update)),
request_id,
)
match self
.chain
.light_client_server_cache
.get_latest_finality_update()
{
Some(update) => Ok(Arc::new(update)),
None => Err((
RPCResponseErrorCode::ResourceUnavailable,
"Latest finality update not available",
)),
},
Response::LightClientFinalityUpdate,
);
}
/// Handle a `BlocksByRange` request from the peer.
@@ -637,6 +639,21 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
request_id: PeerRequestId,
req: BlobsByRangeRequest,
) {
self.terminate_response_stream(
peer_id,
request_id,
self.handle_blobs_by_range_request_inner(peer_id, request_id, req),
Response::BlobsByRange,
);
}
/// Handle a `BlobsByRange` request from the peer.
fn handle_blobs_by_range_request_inner(
&self,
peer_id: PeerId,
request_id: PeerRequestId,
req: BlobsByRangeRequest,
) -> Result<(), (RPCResponseErrorCode, &'static str)> {
debug!(self.log, "Received BlobsByRange Request";
"peer_id" => %peer_id,
"count" => req.count,
@@ -645,12 +662,10 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
// Should not send more than max request blocks
if req.max_blobs_requested::<T::EthSpec>() > self.chain.spec.max_request_blob_sidecars {
return self.send_error_response(
peer_id,
return Err((
RPCResponseErrorCode::InvalidRequest,
"Request exceeded `MAX_REQUEST_BLOBS_SIDECARS`".into(),
request_id,
);
"Request exceeded `MAX_REQUEST_BLOBS_SIDECARS`",
));
}
let request_start_slot = Slot::from(req.start_slot);
@@ -659,13 +674,10 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
Some(boundary) => boundary.start_slot(T::EthSpec::slots_per_epoch()),
None => {
debug!(self.log, "Deneb fork is disabled");
self.send_error_response(
peer_id,
return Err((
RPCResponseErrorCode::InvalidRequest,
"Deneb fork is disabled".into(),
request_id,
);
return;
"Deneb fork is disabled",
));
}
};
@@ -685,19 +697,15 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
);
return if data_availability_boundary_slot < oldest_blob_slot {
self.send_error_response(
peer_id,
Err((
RPCResponseErrorCode::ResourceUnavailable,
"blobs pruned within boundary".into(),
request_id,
)
"blobs pruned within boundary",
))
} else {
self.send_error_response(
peer_id,
Err((
RPCResponseErrorCode::InvalidRequest,
"Req outside availability period".into(),
request_id,
)
"Req outside availability period",
))
};
}
@@ -714,25 +722,15 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
"requested_slot" => slot,
"oldest_known_slot" => oldest_block_slot
);
return self.send_error_response(
peer_id,
RPCResponseErrorCode::ResourceUnavailable,
"Backfilling".into(),
request_id,
);
return Err((RPCResponseErrorCode::ResourceUnavailable, "Backfilling"));
}
Err(e) => {
self.send_error_response(
peer_id,
RPCResponseErrorCode::ServerError,
"Database error".into(),
request_id,
);
return error!(self.log, "Unable to obtain root iter";
error!(self.log, "Unable to obtain root iter";
"request" => ?req,
"peer" => %peer_id,
"error" => ?e
);
return Err((RPCResponseErrorCode::ServerError, "Database error"));
}
};
@@ -764,11 +762,12 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
let block_roots = match maybe_block_roots {
Ok(block_roots) => block_roots,
Err(e) => {
return error!(self.log, "Error during iteration over blocks";
error!(self.log, "Error during iteration over blocks";
"request" => ?req,
"peer" => %peer_id,
"error" => ?e
)
);
return Err((RPCResponseErrorCode::ServerError, "Database error"));
}
};
@@ -776,7 +775,6 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
let block_roots = block_roots.into_iter().flatten();
let mut blobs_sent = 0;
let mut send_response = true;
for root in block_roots {
match self.chain.get_blobs(&root) {
@@ -799,14 +797,10 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
"block_root" => ?root,
"error" => ?e
);
self.send_error_response(
peer_id,
return Err((
RPCResponseErrorCode::ServerError,
"No blobs and failed fetching corresponding block".into(),
request_id,
);
send_response = false;
break;
"No blobs and failed fetching corresponding block",
));
}
}
}
@@ -826,13 +820,53 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
"returned" => blobs_sent
);
if send_response {
// send the stream terminator
self.send_network_message(NetworkMessage::SendResponse {
Ok(())
}
/// Helper function to ensure that single-item protocols always end with either a
/// single response chunk or an error.
///
/// `result` carries either the successful payload `R` or an `(error_code, reason)`
/// pair; `into_response` converts the payload into the protocol-specific
/// `Response` variant to send.
fn terminate_response_single_item<R, F: Fn(R) -> Response<T::EthSpec>>(
&self,
peer_id: PeerId,
request_id: PeerRequestId,
result: Result<R, (RPCResponseErrorCode, &'static str)>,
into_response: F,
) {
match result {
Ok(resp) => {
// Not necessary to explicitly send a termination message if this InboundRequest
// returns <= 1 for InboundRequest::expected_responses
// https://github.com/sigp/lighthouse/blob/3058b96f2560f1da04ada4f9d8ba8e5651794ff6/beacon_node/lighthouse_network/src/rpc/handler.rs#L555-L558
self.send_network_message(NetworkMessage::SendResponse {
peer_id,
response: into_response(resp),
id: request_id,
});
}
Err((error_code, reason)) => {
// On failure, close the request with an RPC error instead of a response chunk.
// `reason.into()` converts the &'static str into the error-message type
// expected by send_error_response.
self.send_error_response(peer_id, error_code, reason.into(), request_id);
}
}
}
/// Helper function to ensure streamed protocols with multiple responses always end with either
/// a stream termination or an error
fn terminate_response_stream<R, F: FnOnce(Option<R>) -> Response<T::EthSpec>>(
&self,
peer_id: PeerId,
request_id: PeerRequestId,
result: Result<(), (RPCResponseErrorCode, &'static str)>,
into_response: F,
) {
match result {
Ok(_) => self.send_network_message(NetworkMessage::SendResponse {
peer_id,
response: Response::BlobsByRange(None),
response: into_response(None),
id: request_id,
});
}),
Err((error_code, reason)) => {
self.send_error_response(peer_id, error_code, reason.into(), request_id);
}
}
}
}