Allow 1 count block request to return 0 blocks (#5554)

* Allow 1 count block request to return 0 blocks

* Address @pawanjay176 review
Author: Lion - dapplion
Date:   2024-04-12 23:22:20 +09:00 (committed by GitHub)
Parent: 6bac5ce12b
Commit: 5fdd3b39bb

5 changed files with 63 additions and 72 deletions
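
Before this change the RPC handler used a single expected_responses() count for two different jobs: bounding how many chunks a response stream may carry, and deciding whether a stream that closed early was an error. A BlocksByRange request with count 1 was therefore indistinguishable from Status or Ping, so a peer that legitimately had zero blocks in the requested range triggered RPCError::IncompleteStream. The diffs below split the two notions into max_responses() (an upper bound) and expect_exactly_one_response() (a hard requirement). The following is a minimal standalone sketch of the new semantics, using a simplified request enum rather than Lighthouse's actual types:

// Minimal sketch, not the Lighthouse types: `Request` stands in for the real
// InboundRequest/OutboundRequest enums.
#[allow(dead_code)] // sketch: not every variant is exercised below
enum Request {
    Status,
    Ping,
    MetaData,
    Goodbye,
    BlocksByRange { count: u64 },
}

impl Request {
    /// Upper bound on response chunks; the peer is allowed to send fewer.
    fn max_responses(&self) -> u64 {
        match self {
            Request::Status | Request::Ping | Request::MetaData => 1,
            Request::Goodbye => 0,
            Request::BlocksByRange { count } => *count,
        }
    }

    /// Only these protocols require exactly one chunk; for streamed protocols
    /// an early close is legal, even with zero chunks delivered.
    fn expect_exactly_one_response(&self) -> bool {
        matches!(self, Request::Status | Request::Ping | Request::MetaData)
    }
}

fn main() {
    // Previously a count-1 BlocksByRange had expected_responses() == 1, the
    // same as Status/Ping, so an empty reply surfaced as IncompleteStream.
    // With the split it is just a valid empty stream.
    let req = Request::BlocksByRange { count: 1 };
    assert_eq!(req.max_responses(), 1);
    assert!(!req.expect_exactly_one_response());
    assert!(Request::Status.expect_exactly_one_response());
}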

beacon_node/lighthouse_network/src/rpc/handler.rs

@@ -163,7 +163,7 @@ struct InboundInfo<E: EthSpec> {
     /// Protocol of the original request we received from the peer.
     protocol: Protocol,
     /// Responses that the peer is still expecting from us.
-    remaining_chunks: u64,
+    max_remaining_chunks: u64,
     /// Useful to timing how long each request took to process. Currently only used by
     /// BlocksByRange.
     request_start_time: Instant,
@@ -180,7 +180,7 @@ struct OutboundInfo<Id, E: EthSpec> {
     /// Info over the protocol this substream is handling.
     proto: Protocol,
     /// Number of chunks to be seen from the peer's response.
-    remaining_chunks: Option<u64>,
+    max_remaining_chunks: Option<u64>,
     /// `Id` as given by the application that sent the request.
     req_id: Id,
 }
@@ -471,7 +471,7 @@
                     // Process one more message if one exists.
                     if let Some(message) = info.pending_items.pop_front() {
                         // If this is the last chunk, terminate the stream.
-                        let last_chunk = info.remaining_chunks <= 1;
+                        let last_chunk = info.max_remaining_chunks <= 1;
                         let fut =
                             send_message_to_inbound_substream(substream, message, last_chunk)
                                 .boxed();
@@ -537,7 +537,8 @@
                         {
                             // The substream is still active, decrement the remaining
                             // chunks expected.
-                            info.remaining_chunks = info.remaining_chunks.saturating_sub(1);
+                            info.max_remaining_chunks =
+                                info.max_remaining_chunks.saturating_sub(1);

                             // If this substream has not ended, we reset the timer.
                             // Each chunk is allowed RESPONSE_TIMEOUT to be sent.
@@ -552,7 +553,7 @@
                             // Process one more message if one exists.
                             if let Some(message) = info.pending_items.pop_front() {
                                 // If this is the last chunk, terminate the stream.
-                                let last_chunk = info.remaining_chunks <= 1;
+                                let last_chunk = info.max_remaining_chunks <= 1;
                                 let fut = send_message_to_inbound_substream(
                                     substream, message, last_chunk,
                                 )
@@ -664,15 +665,19 @@
                     request,
                 } => match substream.poll_next_unpin(cx) {
                     Poll::Ready(Some(Ok(response))) => {
-                        if request.expected_responses() > 1 && !response.close_after() {
+                        if request.expect_exactly_one_response() || response.close_after() {
+                            // either this is a single response request or this response closes the
+                            // stream
+                            entry.get_mut().state = OutboundSubstreamState::Closing(substream);
+                        } else {
                             let substream_entry = entry.get_mut();
                             let delay_key = &substream_entry.delay_key;
                             // chunks left after this one
-                            let remaining_chunks = substream_entry
-                                .remaining_chunks
+                            let max_remaining_chunks = substream_entry
+                                .max_remaining_chunks
                                 .map(|count| count.saturating_sub(1))
                                 .unwrap_or_else(|| 0);
-                            if remaining_chunks == 0 {
+                            if max_remaining_chunks == 0 {
                                 // this is the last expected message, close the stream as all expected chunks have been received
                                 substream_entry.state = OutboundSubstreamState::Closing(substream);
                             } else {
@@ -682,14 +687,10 @@
                                     substream,
                                     request,
                                 };
-                                substream_entry.remaining_chunks = Some(remaining_chunks);
+                                substream_entry.max_remaining_chunks = Some(max_remaining_chunks);
                                 self.outbound_substreams_delay
                                     .reset(delay_key, self.resp_timeout);
                             }
-                        } else {
-                            // either this is a single response request or this response closes the
-                            // stream
-                            entry.get_mut().state = OutboundSubstreamState::Closing(substream);
                         }

                         // Check what type of response we got and report it accordingly
@@ -725,7 +726,16 @@
                         self.outbound_substreams_delay.remove(delay_key);
                         entry.remove_entry();
                         // notify the application error
-                        if request.expected_responses() > 1 {
+                        if request.expect_exactly_one_response() {
+                            // return an error, stream should not have closed early.
+                            return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
+                                HandlerEvent::Err(HandlerErr::Outbound {
+                                    id: request_id,
+                                    proto: request.versioned_protocol().protocol(),
+                                    error: RPCError::IncompleteStream,
+                                }),
+                            ));
+                        } else {
                             // return an end of stream result
                             return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
                                 HandlerEvent::Ok(RPCReceived::EndOfStream(
@@ -734,16 +744,6 @@
                                 )),
                             ));
                         }
-
-                        // else we return an error, stream should not have closed early.
-                        let outbound_err = HandlerErr::Outbound {
-                            id: request_id,
-                            proto: request.versioned_protocol().protocol(),
-                            error: RPCError::IncompleteStream,
-                        };
-                        return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
-                            HandlerEvent::Err(outbound_err),
-                        ));
                     }
                     Poll::Pending => {
                         entry.get_mut().state =
@@ -880,10 +880,10 @@
         }

         let (req, substream) = substream;
-        let expected_responses = req.expected_responses();
+        let max_responses = req.max_responses();

         // store requests that expect responses
-        if expected_responses > 0 {
+        if max_responses > 0 {
             if self.inbound_substreams.len() < MAX_INBOUND_SUBSTREAMS {
                 // Store the stream and tag the output.
                 let delay_key = self
@@ -894,14 +894,13 @@
                     self.current_inbound_substream_id,
                     InboundInfo {
                         state: awaiting_stream,
-                        pending_items: VecDeque::with_capacity(std::cmp::min(
-                            expected_responses,
-                            128,
-                        ) as usize),
+                        pending_items: VecDeque::with_capacity(
+                            std::cmp::min(max_responses, 128) as usize
+                        ),
                         delay_key: Some(delay_key),
                         protocol: req.versioned_protocol().protocol(),
                         request_start_time: Instant::now(),
-                        remaining_chunks: expected_responses,
+                        max_remaining_chunks: max_responses,
                     },
                 );
             } else {
@@ -948,8 +947,14 @@
         }

         // add the stream to substreams if we expect a response, otherwise drop the stream.
-        let expected_responses = request.expected_responses();
-        if expected_responses > 0 {
+        let max_responses = request.max_responses();
+        if max_responses > 0 {
+            let max_remaining_chunks = if request.expect_exactly_one_response() {
+                // Currently enforced only for multiple responses
+                None
+            } else {
+                Some(max_responses)
+            };
             // new outbound request. Store the stream and tag the output.
             let delay_key = self
                 .outbound_substreams_delay
@@ -958,12 +963,6 @@
                 substream: Box::new(substream),
                 request,
             };
-            let expected_responses = if expected_responses > 1 {
-                // Currently enforced only for multiple responses
-                Some(expected_responses)
-            } else {
-                None
-            };
             if self
                 .outbound_substreams
                 .insert(
@@ -972,7 +971,7 @@
                         state: awaiting_stream,
                         delay_key,
                         proto,
-                        remaining_chunks: expected_responses,
+                        max_remaining_chunks,
                         req_id: id,
                     },
                 )
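
The rename from remaining_chunks to max_remaining_chunks is more than cosmetic: the counter is now a ceiling rather than a quota. The two outbound decisions the hunks above implement can be summarized outside the poll machinery. This is a hypothetical condensed model, not the handler itself: substreams, delay keys, and the real RPCError type are stubbed out.

// Stand-in for OutboundSubstreamState: either keep streaming with an
// updated ceiling, or close the substream.
#[derive(Debug, PartialEq)]
enum Verdict {
    KeepOpen { max_remaining_chunks: u64 },
    Close,
}

/// On each received chunk: close once the ceiling is reached, or when the
/// request demands exactly one response, or when the response frame itself
/// terminates the stream (mirrors the poll loop above).
fn on_chunk(
    expect_exactly_one: bool,
    close_after: bool,
    max_remaining_chunks: Option<u64>,
) -> Verdict {
    if expect_exactly_one || close_after {
        return Verdict::Close;
    }
    // chunks left after this one; `None` means no ceiling was being enforced
    let remaining = max_remaining_chunks
        .map(|count| count.saturating_sub(1))
        .unwrap_or(0);
    if remaining == 0 {
        Verdict::Close
    } else {
        Verdict::KeepOpen {
            max_remaining_chunks: remaining,
        }
    }
}

/// On remote close before any terminator: only "exactly one" protocols
/// treat this as an incomplete stream.
fn on_remote_close(expect_exactly_one: bool) -> Result<(), &'static str> {
    if expect_exactly_one {
        Err("RPCError::IncompleteStream")
    } else {
        Ok(()) // surfaced to the application as EndOfStream
    }
}

fn main() {
    // A count-1 BlocksByRange whose peer closes without sending anything:
    // no longer an error.
    assert_eq!(on_remote_close(false), Ok(()));
    // Ping must be answered: an early close is still IncompleteStream.
    assert!(on_remote_close(true).is_err());
    // Last allowed chunk arrives on a streamed request: close the substream.
    assert_eq!(on_chunk(false, false, Some(1)), Verdict::Close);
}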

beacon_node/lighthouse_network/src/rpc/methods.rs

@@ -483,27 +483,6 @@ impl<E: EthSpec> RPCCodedResponse<E> {
         RPCCodedResponse::Error(code, err)
     }

-    /// Specifies which response allows for multiple chunks for the stream handler.
-    pub fn multiple_responses(&self) -> bool {
-        match self {
-            RPCCodedResponse::Success(resp) => match resp {
-                RPCResponse::Status(_) => false,
-                RPCResponse::BlocksByRange(_) => true,
-                RPCResponse::BlocksByRoot(_) => true,
-                RPCResponse::BlobsByRange(_) => true,
-                RPCResponse::BlobsByRoot(_) => true,
-                RPCResponse::Pong(_) => false,
-                RPCResponse::MetaData(_) => false,
-                RPCResponse::LightClientBootstrap(_) => false,
-                RPCResponse::LightClientOptimisticUpdate(_) => false,
-                RPCResponse::LightClientFinalityUpdate(_) => false,
-            },
-            RPCCodedResponse::Error(_, _) => true,
-            // Stream terminations are part of responses that have chunks
-            RPCCodedResponse::StreamTermination(_) => true,
-        }
-    }
-
     /// Returns true if this response always terminates the stream.
     pub fn close_after(&self) -> bool {
         !matches!(self, RPCCodedResponse::Success(_))
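
With expect_exactly_one_response() now answering "may this stream carry several chunks?" on the request side, the response-side multiple_responses() table above is dead code and is removed wholesale. Only close_after() survives, since whether an individual frame terminates the stream depends on the frame alone. A trimmed mirror of that surviving check, with all payload types omitted:

// Simplified mirror of the surviving check: every non-success frame
// (error or stream termination) ends the substream.
enum RPCCodedResponse {
    Success,           // payload omitted
    Error,             // code and message omitted
    StreamTermination, // protocol marker omitted
}

impl RPCCodedResponse {
    fn close_after(&self) -> bool {
        !matches!(self, RPCCodedResponse::Success)
    }
}

fn main() {
    assert!(!RPCCodedResponse::Success.close_after());
    assert!(RPCCodedResponse::Error.close_after());
    assert!(RPCCodedResponse::StreamTermination.close_after());
}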

beacon_node/lighthouse_network/src/rpc/outbound.rs

@@ -91,8 +91,8 @@ impl<E: EthSpec> OutboundRequest<E> {
     }

     /* These functions are used in the handler for stream management */
-    /// Number of responses expected for this request.
-    pub fn expected_responses(&self) -> u64 {
+    /// Maximum number of responses expected for this request.
+    pub fn max_responses(&self) -> u64 {
         match self {
             OutboundRequest::Status(_) => 1,
             OutboundRequest::Goodbye(_) => 0,
@@ -105,6 +105,19 @@ impl<E: EthSpec> OutboundRequest<E> {
         }
     }

+    pub fn expect_exactly_one_response(&self) -> bool {
+        match self {
+            OutboundRequest::Status(_) => true,
+            OutboundRequest::Goodbye(_) => false,
+            OutboundRequest::BlocksByRange(_) => false,
+            OutboundRequest::BlocksByRoot(_) => false,
+            OutboundRequest::BlobsByRange(_) => false,
+            OutboundRequest::BlobsByRoot(_) => false,
+            OutboundRequest::Ping(_) => true,
+            OutboundRequest::MetaData(_) => true,
+        }
+    }
+
     /// Gives the corresponding `SupportedProtocol` to this request.
     pub fn versioned_protocol(&self) -> SupportedProtocol {
         match self {

beacon_node/lighthouse_network/src/rpc/protocol.rs

@@ -654,8 +654,8 @@ pub enum InboundRequest<E: EthSpec> {
 impl<E: EthSpec> InboundRequest<E> {
     /* These functions are used in the handler for stream management */

-    /// Number of responses expected for this request.
-    pub fn expected_responses(&self) -> u64 {
+    /// Maximum number of responses expected for this request.
+    pub fn max_responses(&self) -> u64 {
         match self {
             InboundRequest::Status(_) => 1,
             InboundRequest::Goodbye(_) => 0,

beacon_node/lighthouse_network/src/rpc/rate_limiter.rs

@@ -228,7 +228,7 @@ impl RPCRateLimiterBuilder {

 pub trait RateLimiterItem {
     fn protocol(&self) -> Protocol;
-    fn expected_responses(&self) -> u64;
+    fn max_responses(&self) -> u64;
 }

 impl<E: EthSpec> RateLimiterItem for super::InboundRequest<E> {
@@ -236,8 +236,8 @@ impl<E: EthSpec> RateLimiterItem for super::InboundRequest<E> {
         self.versioned_protocol().protocol()
     }

-    fn expected_responses(&self) -> u64 {
-        self.expected_responses()
+    fn max_responses(&self) -> u64 {
+        self.max_responses()
     }
 }
@@ -246,8 +246,8 @@ impl<E: EthSpec> RateLimiterItem for super::OutboundRequest<E> {
         self.versioned_protocol().protocol()
     }

-    fn expected_responses(&self) -> u64 {
-        self.expected_responses()
+    fn max_responses(&self) -> u64 {
+        self.max_responses()
     }
 }

 impl RPCRateLimiter {
@@ -299,7 +299,7 @@ impl RPCRateLimiter {
         request: &Item,
     ) -> Result<(), RateLimitedErr> {
         let time_since_start = self.init_time.elapsed();
-        let tokens = request.expected_responses().max(1);
+        let tokens = request.max_responses().max(1);
         let check =
             |limiter: &mut Limiter<PeerId>| limiter.allows(time_since_start, peer_id, tokens);
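
For the rate limiter the rename is behavior-preserving: a request is still charged as many tokens as the largest response it could elicit, floored at one so that zero-response requests such as Goodbye are not free. A small sketch of the charge, with example counts taken from the max_responses() tables above:

// Token charge as computed above: maximum possible responses, minimum 1.
fn tokens_for(max_responses: u64) -> u64 {
    max_responses.max(1)
}

fn main() {
    assert_eq!(tokens_for(0), 1); // Goodbye expects no response
    assert_eq!(tokens_for(1), 1); // Status, or a count-1 BlocksByRange
    // e.g. a BlocksByRange asking for 64 blocks, assuming its
    // max_responses() equals the requested count.
    assert_eq!(tokens_for(64), 64);
}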