add a unique integer id to Rpc requests (#6444)

* add id to rpc requests

* rename rpc request and response types for more accurate meaning

* remove unrequired build_request function

* remove unirequired Request wrapper types and unify Outbound and Inbound Request

* add RequestId to NetworkMessage::SendResponse

,NetworkMessage::SendErrorResponse to be passed to Rpc::send_response
This commit is contained in:
João Oliveira
2024-10-01 02:36:17 +01:00
committed by GitHub
parent 5d1ff7c6f8
commit 82098e1ef7
20 changed files with 1327 additions and 1046 deletions

View File

@@ -1,11 +1,12 @@
#![allow(clippy::type_complexity)]
#![allow(clippy::cognitive_complexity)]
use super::methods::{GoodbyeReason, RPCCodedResponse, RPCResponseErrorCode};
use super::methods::{GoodbyeReason, RpcErrorResponse, RpcResponse};
use super::outbound::OutboundRequestContainer;
use super::protocol::{InboundOutput, InboundRequest, Protocol, RPCError, RPCProtocol};
use super::{RPCReceived, RPCSend, ReqId};
use crate::rpc::outbound::{OutboundFramed, OutboundRequest};
use super::protocol::{InboundOutput, Protocol, RPCError, RPCProtocol, RequestType};
use super::RequestId;
use super::{RPCReceived, RPCSend, ReqId, Request};
use crate::rpc::outbound::OutboundFramed;
use crate::rpc::protocol::InboundFramed;
use fnv::FnvHashMap;
use futures::prelude::*;
@@ -95,7 +96,7 @@ where
events_out: SmallVec<[HandlerEvent<Id, E>; 4]>,
/// Queue of outbound substreams to open.
dial_queue: SmallVec<[(Id, OutboundRequest<E>); 4]>,
dial_queue: SmallVec<[(Id, RequestType<E>); 4]>,
/// Current number of concurrent outbound substreams being opened.
dial_negotiated: u32,
@@ -159,7 +160,7 @@ struct InboundInfo<E: EthSpec> {
/// State of the substream.
state: InboundState<E>,
/// Responses queued for sending.
pending_items: VecDeque<RPCCodedResponse<E>>,
pending_items: VecDeque<RpcResponse<E>>,
/// Protocol of the original request we received from the peer.
protocol: Protocol,
/// Responses that the peer is still expecting from us.
@@ -205,7 +206,7 @@ pub enum OutboundSubstreamState<E: EthSpec> {
/// The framed negotiated substream.
substream: Box<OutboundFramed<Stream, E>>,
/// Keeps track of the actual request sent.
request: OutboundRequest<E>,
request: RequestType<E>,
},
/// Closing an outbound substream>
Closing(Box<OutboundFramed<Stream, E>>),
@@ -263,7 +264,7 @@ where
// Queue our goodbye message.
if let Some((id, reason)) = goodbye_reason {
self.dial_queue.push((id, OutboundRequest::Goodbye(reason)));
self.dial_queue.push((id, RequestType::Goodbye(reason)));
}
self.state = HandlerState::ShuttingDown(Box::pin(sleep(Duration::from_secs(
@@ -273,7 +274,7 @@ where
}
/// Opens an outbound substream with a request.
fn send_request(&mut self, id: Id, req: OutboundRequest<E>) {
fn send_request(&mut self, id: Id, req: RequestType<E>) {
match self.state {
HandlerState::Active => {
self.dial_queue.push((id, req));
@@ -291,10 +292,10 @@ where
/// Sends a response to a peer's request.
// NOTE: If the substream has closed due to inactivity, or the substream is in the
// wrong state a response will fail silently.
fn send_response(&mut self, inbound_id: SubstreamId, response: RPCCodedResponse<E>) {
fn send_response(&mut self, inbound_id: SubstreamId, response: RpcResponse<E>) {
// check if the stream matching the response still exists
let Some(inbound_info) = self.inbound_substreams.get_mut(&inbound_id) else {
if !matches!(response, RPCCodedResponse::StreamTermination(..)) {
if !matches!(response, RpcResponse::StreamTermination(..)) {
// the stream is closed after sending the expected number of responses
trace!(self.log, "Inbound stream has expired. Response not sent";
"response" => %response, "id" => inbound_id);
@@ -302,7 +303,7 @@ where
return;
};
// If the response we are sending is an error, report back for handling
if let RPCCodedResponse::Error(ref code, ref reason) = response {
if let RpcResponse::Error(ref code, ref reason) = response {
self.events_out.push(HandlerEvent::Err(HandlerErr::Inbound {
error: RPCError::ErrorResponse(*code, reason.to_string()),
proto: inbound_info.protocol,
@@ -329,7 +330,7 @@ where
type ToBehaviour = HandlerEvent<Id, E>;
type InboundProtocol = RPCProtocol<E>;
type OutboundProtocol = OutboundRequestContainer<E>;
type OutboundOpenInfo = (Id, OutboundRequest<E>); // Keep track of the id and the request
type OutboundOpenInfo = (Id, RequestType<E>); // Keep track of the id and the request
type InboundOpenInfo = ();
fn listen_protocol(&self) -> SubstreamProtocol<Self::InboundProtocol, ()> {
@@ -403,8 +404,8 @@ where
if info.pending_items.back().map(|l| l.close_after()) == Some(false) {
// if the last chunk does not close the stream, append an error
info.pending_items.push_back(RPCCodedResponse::Error(
RPCResponseErrorCode::ServerError,
info.pending_items.push_back(RpcResponse::Error(
RpcErrorResponse::ServerError,
"Request timed out".into(),
));
}
@@ -672,13 +673,13 @@ where
let proto = entry.get().proto;
let received = match response {
RPCCodedResponse::StreamTermination(t) => {
RpcResponse::StreamTermination(t) => {
HandlerEvent::Ok(RPCReceived::EndOfStream(id, t))
}
RPCCodedResponse::Success(resp) => {
RpcResponse::Success(resp) => {
HandlerEvent::Ok(RPCReceived::Response(id, resp))
}
RPCCodedResponse::Error(ref code, ref r) => {
RpcResponse::Error(ref code, ref r) => {
HandlerEvent::Err(HandlerErr::Outbound {
id,
proto,
@@ -888,21 +889,23 @@ where
}
// If we received a goodbye, shutdown the connection.
if let InboundRequest::Goodbye(_) = req {
if let RequestType::Goodbye(_) = req {
self.shutdown(None);
}
self.events_out.push(HandlerEvent::Ok(RPCReceived::Request(
self.current_inbound_substream_id,
req,
)));
self.events_out
.push(HandlerEvent::Ok(RPCReceived::Request(Request {
id: RequestId::next(),
substream_id: self.current_inbound_substream_id,
r#type: req,
})));
self.current_inbound_substream_id.0 += 1;
}
fn on_fully_negotiated_outbound(
&mut self,
substream: OutboundFramed<Stream, E>,
(id, request): (Id, OutboundRequest<E>),
(id, request): (Id, RequestType<E>),
) {
self.dial_negotiated -= 1;
// Reset any io-retries counter.
@@ -958,7 +961,7 @@ where
}
fn on_dial_upgrade_error(
&mut self,
request_info: (Id, OutboundRequest<E>),
request_info: (Id, RequestType<E>),
error: StreamUpgradeError<RPCError>,
) {
let (id, req) = request_info;
@@ -1016,15 +1019,15 @@ impl slog::Value for SubstreamId {
/// error that occurred with sending a message is reported also.
async fn send_message_to_inbound_substream<E: EthSpec>(
mut substream: InboundSubstream<E>,
message: RPCCodedResponse<E>,
message: RpcResponse<E>,
last_chunk: bool,
) -> Result<(InboundSubstream<E>, bool), RPCError> {
if matches!(message, RPCCodedResponse::StreamTermination(_)) {
if matches!(message, RpcResponse::StreamTermination(_)) {
substream.close().await.map(|_| (substream, true))
} else {
// chunks that are not stream terminations get sent, and the stream is closed if
// the response is an error
let is_error = matches!(message, RPCCodedResponse::Error(..));
let is_error = matches!(message, RpcResponse::Error(..));
let send_result = substream.send(message).await;