Errors for all RPC Requests (#5867)

* Return and error if peer has disconnected

* Report errors for rate limited requests

* Code improvement

* Bump rust version to 1.78

* Downgrade to 1.77

* Update beacon_node/lighthouse_network/src/service/mod.rs

Co-authored-by: João Oliveira <hello@jxs.pt>

* fix fmt

* Merge branch 'unstable' of https://github.com/sigp/lighthouse into rpc-peer-disconnect-error

* update lockfile
This commit is contained in:
Age Manning
2024-06-07 10:12:45 +10:00
committed by GitHub
parent 7a7fc82cbd
commit 7b48b0b4a7
13 changed files with 452 additions and 412 deletions

608
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@@ -168,7 +168,7 @@ sysinfo = "0.26"
tempfile = "3"
tokio = { version = "1", features = ["rt-multi-thread", "sync", "signal"] }
tokio-stream = { version = "0.1", features = ["sync"] }
tokio-util = { version = "0.6", features = ["codec", "compat", "time"] }
tokio-util = { version = "0.7", features = ["codec", "compat", "time"] }
tracing = "0.1.40"
tracing-appender = "0.2"
tracing-core = "0.1"

View File

@@ -1,4 +1,4 @@
FROM rust:1.75.0-bullseye AS builder
FROM rust:1.78.0-bullseye AS builder
RUN apt-get update && apt-get -y upgrade && apt-get install -y cmake libclang-dev
COPY . lighthouse
ARG FEATURES

View File

@@ -28,7 +28,6 @@ use std::time::Duration;
use strum::AsRefStr;
use task_executor::TaskExecutor;
use tokio::sync::mpsc::{self, Receiver, Sender};
use tokio::time::error::Error as TimeError;
use tokio_util::time::delay_queue::{DelayQueue, Key as DelayKey};
use types::{EthSpec, Hash256, Slot};
@@ -196,8 +195,6 @@ enum InboundEvent {
ReadyLightClientUpdate(QueuedLightClientUpdateId),
/// A backfill batch that was queued is ready for processing.
ReadyBackfillSync(QueuedBackfillBatch),
/// A `DelayQueue` returned an error.
DelayQueueError(TimeError, &'static str),
/// A message sent to the `ReprocessQueue`
Msg(ReprocessQueueMessage),
}
@@ -279,54 +276,42 @@ impl<S: SlotClock> Stream for ReprocessQueue<S> {
// The sequential nature of blockchains means it is generally better to try and import all
// existing blocks before new ones.
match self.gossip_block_delay_queue.poll_expired(cx) {
Poll::Ready(Some(Ok(queued_block))) => {
Poll::Ready(Some(queued_block)) => {
return Poll::Ready(Some(InboundEvent::ReadyGossipBlock(
queued_block.into_inner(),
)));
}
Poll::Ready(Some(Err(e))) => {
return Poll::Ready(Some(InboundEvent::DelayQueueError(e, "gossip_block_queue")));
}
// `Poll::Ready(None)` means that there are no more entries in the delay queue and we
// will continue to get this result until something else is added into the queue.
Poll::Ready(None) | Poll::Pending => (),
}
match self.rpc_block_delay_queue.poll_expired(cx) {
Poll::Ready(Some(Ok(queued_block))) => {
Poll::Ready(Some(queued_block)) => {
return Poll::Ready(Some(InboundEvent::ReadyRpcBlock(queued_block.into_inner())));
}
Poll::Ready(Some(Err(e))) => {
return Poll::Ready(Some(InboundEvent::DelayQueueError(e, "rpc_block_queue")));
}
// `Poll::Ready(None)` means that there are no more entries in the delay queue and we
// will continue to get this result until something else is added into the queue.
Poll::Ready(None) | Poll::Pending => (),
}
match self.attestations_delay_queue.poll_expired(cx) {
Poll::Ready(Some(Ok(attestation_id))) => {
Poll::Ready(Some(attestation_id)) => {
return Poll::Ready(Some(InboundEvent::ReadyAttestation(
attestation_id.into_inner(),
)));
}
Poll::Ready(Some(Err(e))) => {
return Poll::Ready(Some(InboundEvent::DelayQueueError(e, "attestations_queue")));
}
// `Poll::Ready(None)` means that there are no more entries in the delay queue and we
// will continue to get this result until something else is added into the queue.
Poll::Ready(None) | Poll::Pending => (),
}
match self.lc_updates_delay_queue.poll_expired(cx) {
Poll::Ready(Some(Ok(lc_id))) => {
Poll::Ready(Some(lc_id)) => {
return Poll::Ready(Some(InboundEvent::ReadyLightClientUpdate(
lc_id.into_inner(),
)));
}
Poll::Ready(Some(Err(e))) => {
return Poll::Ready(Some(InboundEvent::DelayQueueError(e, "lc_updates_queue")));
}
// `Poll::Ready(None)` means that there are no more entries in the delay queue and we
// will continue to get this result until something else is added into the queue.
Poll::Ready(None) | Poll::Pending => (),
@@ -786,14 +771,6 @@ impl<S: SlotClock> ReprocessQueue<S> {
);
}
}
InboundEvent::DelayQueueError(e, queue_name) => {
crit!(
log,
"Failed to poll queue";
"queue" => queue_name,
"e" => ?e
)
}
InboundEvent::ReadyAttestation(queued_id) => {
metrics::inc_counter(
&metrics::BEACON_PROCESSOR_REPROCESSING_QUEUE_EXPIRED_ATTESTATIONS,

View File

@@ -7,7 +7,7 @@ edition = { workspace = true }
[dependencies]
discv5 = { workspace = true }
gossipsub = { workspace = true }
unsigned-varint = { version = "0.6", features = ["codec"] }
unsigned-varint = { version = "0.8", features = ["codec"] }
ssz_types = { workspace = true }
types = { workspace = true }
serde = { workspace = true }

View File

@@ -15,7 +15,7 @@ use libp2p::swarm::handler::{
FullyNegotiatedInbound, FullyNegotiatedOutbound, StreamUpgradeError, SubstreamProtocol,
};
use libp2p::swarm::Stream;
use slog::{crit, debug, trace, warn};
use slog::{crit, debug, trace};
use smallvec::SmallVec;
use std::{
collections::{hash_map::Entry, VecDeque},
@@ -414,9 +414,8 @@ where
}
// purge expired inbound substreams and send an error
loop {
match self.inbound_substreams_delay.poll_expired(cx) {
Poll::Ready(Some(Ok(inbound_id))) => {
while let Poll::Ready(Some(inbound_id)) = self.inbound_substreams_delay.poll_expired(cx) {
// handle a stream timeout for various states
if let Some(info) = self.inbound_substreams.get_mut(inbound_id.get_ref()) {
// the delay has been removed
@@ -436,23 +435,9 @@ where
}
}
}
Poll::Ready(Some(Err(e))) => {
warn!(self.log, "Inbound substream poll failed"; "error" => ?e);
// drops the peer if we cannot read the delay queue
return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
HandlerEvent::Close(RPCError::InternalError(
"Could not poll inbound stream timer",
)),
));
}
Poll::Pending | Poll::Ready(None) => break,
}
}
// purge expired outbound substreams
loop {
match self.outbound_substreams_delay.poll_expired(cx) {
Poll::Ready(Some(Ok(outbound_id))) => {
while let Poll::Ready(Some(outbound_id)) = self.outbound_substreams_delay.poll_expired(cx) {
if let Some(OutboundInfo { proto, req_id, .. }) =
self.outbound_substreams.remove(outbound_id.get_ref())
{
@@ -462,24 +447,13 @@ where
error: RPCError::StreamTimeout,
};
// notify the user
return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
HandlerEvent::Err(outbound_err),
));
return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(HandlerEvent::Err(
outbound_err,
)));
} else {
crit!(self.log, "timed out substream not in the books"; "stream_id" => outbound_id.get_ref());
}
}
Poll::Ready(Some(Err(e))) => {
warn!(self.log, "Outbound substream poll failed"; "error" => ?e);
return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
HandlerEvent::Close(RPCError::InternalError(
"Could not poll outbound stream timer",
)),
));
}
Poll::Pending | Poll::Ready(None) => break,
}
}
// when deactivated, close all streams
let deactivated = matches!(self.state, HandlerState::Deactivated);

View File

@@ -10,7 +10,7 @@ use libp2p::swarm::{
handler::ConnectionHandler, CloseConnection, ConnectionId, NetworkBehaviour, NotifyHandler,
ToSwarm,
};
use libp2p::swarm::{FromSwarm, SubstreamProtocol, THandlerInEvent};
use libp2p::swarm::{ConnectionClosed, FromSwarm, SubstreamProtocol, THandlerInEvent};
use libp2p::PeerId;
use rate_limiter::{RPCRateLimiter as RateLimiter, RateLimitedErr};
use slog::{crit, debug, o};
@@ -283,9 +283,40 @@ where
Ok(handler)
}
fn on_swarm_event(&mut self, _event: FromSwarm) {
fn on_swarm_event(&mut self, event: FromSwarm) {
// NOTE: FromSwarm is a non exhaustive enum so updates should be based on release notes more
// than compiler feedback
// The self rate limiter holds on to requests and attempts to process them within our rate
// limits. If a peer disconnects whilst we are self-rate limiting, we want to terminate any
// pending requests and return an error response to the application.
if let FromSwarm::ConnectionClosed(ConnectionClosed {
peer_id,
remaining_established,
connection_id,
..
}) = event
{
// If there are still connections remaining, do nothing.
if remaining_established > 0 {
return;
}
// Get a list of pending requests from the self rate limiter
if let Some(limiter) = self.self_limiter.as_mut() {
for (id, proto) in limiter.peer_disconnected(peer_id) {
let error_msg = ToSwarm::GenerateEvent(RPCMessage {
peer_id,
conn_id: connection_id,
event: HandlerEvent::Err(HandlerErr::Outbound {
id,
proto,
error: RPCError::Disconnected,
}),
});
self.events.push(error_msg);
}
}
}
}
fn on_connection_handler_event(

View File

@@ -158,13 +158,39 @@ impl<Id: ReqId, E: EthSpec> SelfRateLimiter<Id, E> {
entry.remove();
}
}
// NOTE: There can be entries that have been removed due to peer disconnections, we simply
// ignore these messages here.
}
/// Informs the limiter that a peer has disconnected. This removes any pending requests and
/// returns their IDs.
pub fn peer_disconnected(&mut self, peer_id: PeerId) -> Vec<(Id, Protocol)> {
// It's not ideal to iterate this map, but the key is (PeerId, Protocol) and this map
// should never really be large. So we iterate for simplicity
let mut failed_requests = Vec::new();
self.delayed_requests
.retain(|(map_peer_id, protocol), queue| {
if map_peer_id == &peer_id {
// NOTE: Currently cannot remove entries from the DelayQueue, we will just let
// them expire and ignore them.
for message in queue {
failed_requests.push((message.request_id, *protocol))
}
// Remove the entry
false
} else {
// Keep the entry
true
}
});
failed_requests
}
pub fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<BehaviourAction<Id, E>> {
// First check the requests that were self rate limited, since those might add events to
// the queue. Also do this this before rate limiter prunning to avoid removing and
// immediately adding rate limiting keys.
if let Poll::Ready(Some(Ok(expired))) = self.next_peer_request.poll_expired(cx) {
if let Poll::Ready(Some(expired)) = self.next_peer_request.poll_expired(cx) {
let (peer_id, protocol) = expired.into_inner();
self.next_peer_request_ready(peer_id, protocol);
}

View File

@@ -240,7 +240,7 @@ impl futures::stream::Stream for GossipCache {
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
match self.expirations.poll_expired(cx) {
Poll::Ready(Some(Ok(expired))) => {
Poll::Ready(Some(expired)) => {
let expected_key = expired.key();
let (topic, data) = expired.into_inner();
match self.topic_msgs.get_mut(&topic) {
@@ -259,7 +259,6 @@ impl futures::stream::Stream for GossipCache {
}
Poll::Ready(Some(Ok(topic)))
}
Poll::Ready(Some(Err(x))) => Poll::Ready(Some(Err(x.to_string()))),
Poll::Ready(None) => Poll::Ready(None),
Poll::Pending => Poll::Pending,
}

View File

@@ -917,12 +917,23 @@ impl<AppReqId: ReqId, E: EthSpec> Network<AppReqId, E> {
/* Eth2 RPC behaviour functions */
/// Send a request to a peer over RPC.
pub fn send_request(&mut self, peer_id: PeerId, request_id: AppReqId, request: Request) {
pub fn send_request(
&mut self,
peer_id: PeerId,
request_id: AppReqId,
request: Request,
) -> Result<(), (AppReqId, RPCError)> {
// Check if the peer is connected before sending an RPC request
if !self.swarm.is_connected(&peer_id) {
return Err((request_id, RPCError::Disconnected));
}
self.eth2_rpc_mut().send_request(
peer_id,
RequestId::Application(request_id),
request.into(),
)
);
Ok(())
}
/// Send a successful response to a peer over RPC.

View File

@@ -98,7 +98,9 @@ fn test_tcp_status_rpc() {
NetworkEvent::PeerConnectedOutgoing(peer_id) => {
// Send a STATUS message
debug!(log, "Sending RPC");
sender.send_request(peer_id, 10, rpc_request.clone());
sender
.send_request(peer_id, 10, rpc_request.clone())
.unwrap();
}
NetworkEvent::ResponseReceived {
peer_id: _,
@@ -202,7 +204,9 @@ fn test_tcp_blocks_by_range_chunked_rpc() {
NetworkEvent::PeerConnectedOutgoing(peer_id) => {
// Send a STATUS message
debug!(log, "Sending RPC");
sender.send_request(peer_id, request_id, rpc_request.clone());
sender
.send_request(peer_id, request_id, rpc_request.clone())
.unwrap();
}
NetworkEvent::ResponseReceived {
peer_id: _,
@@ -327,7 +331,9 @@ fn test_blobs_by_range_chunked_rpc() {
NetworkEvent::PeerConnectedOutgoing(peer_id) => {
// Send a STATUS message
debug!(log, "Sending RPC");
sender.send_request(peer_id, request_id, rpc_request.clone());
sender
.send_request(peer_id, request_id, rpc_request.clone())
.unwrap();
}
NetworkEvent::ResponseReceived {
peer_id: _,
@@ -435,7 +441,9 @@ fn test_tcp_blocks_by_range_over_limit() {
NetworkEvent::PeerConnectedOutgoing(peer_id) => {
// Send a STATUS message
debug!(log, "Sending RPC");
sender.send_request(peer_id, request_id, rpc_request.clone());
sender
.send_request(peer_id, request_id, rpc_request.clone())
.unwrap();
}
// The request will fail because the sender will refuse to send anything > MAX_RPC_SIZE
NetworkEvent::RPCFailed { id, .. } => {
@@ -528,7 +536,9 @@ fn test_tcp_blocks_by_range_chunked_rpc_terminates_correctly() {
NetworkEvent::PeerConnectedOutgoing(peer_id) => {
// Send a STATUS message
debug!(log, "Sending RPC");
sender.send_request(peer_id, request_id, rpc_request.clone());
sender
.send_request(peer_id, request_id, rpc_request.clone())
.unwrap();
}
NetworkEvent::ResponseReceived {
peer_id: _,
@@ -657,7 +667,9 @@ fn test_tcp_blocks_by_range_single_empty_rpc() {
NetworkEvent::PeerConnectedOutgoing(peer_id) => {
// Send a STATUS message
debug!(log, "Sending RPC");
sender.send_request(peer_id, 10, rpc_request.clone());
sender
.send_request(peer_id, 10, rpc_request.clone())
.unwrap();
}
NetworkEvent::ResponseReceived {
peer_id: _,
@@ -780,7 +792,9 @@ fn test_tcp_blocks_by_root_chunked_rpc() {
NetworkEvent::PeerConnectedOutgoing(peer_id) => {
// Send a STATUS message
debug!(log, "Sending RPC");
sender.send_request(peer_id, 6, rpc_request.clone());
sender
.send_request(peer_id, 6, rpc_request.clone())
.unwrap();
}
NetworkEvent::ResponseReceived {
peer_id: _,
@@ -911,7 +925,9 @@ fn test_tcp_blocks_by_root_chunked_rpc_terminates_correctly() {
NetworkEvent::PeerConnectedOutgoing(peer_id) => {
// Send a STATUS message
debug!(log, "Sending RPC");
sender.send_request(peer_id, 10, rpc_request.clone());
sender
.send_request(peer_id, 10, rpc_request.clone())
.unwrap();
}
NetworkEvent::ResponseReceived {
peer_id: _,
@@ -1031,7 +1047,9 @@ fn test_disconnect_triggers_rpc_error() {
NetworkEvent::PeerConnectedOutgoing(peer_id) => {
// Send a STATUS message
debug!(log, "Sending RPC");
sender.send_request(peer_id, 42, rpc_request.clone());
sender
.send_request(peer_id, 42, rpc_request.clone())
.unwrap();
}
NetworkEvent::RPCFailed { error, id: 42, .. } => match error {
RPCError::Disconnected => return,

View File

@@ -613,7 +613,15 @@ impl<T: BeaconChainTypes> NetworkService<T> {
request,
request_id,
} => {
self.libp2p.send_request(peer_id, request_id, request);
if let Err((request_id, error)) =
self.libp2p.send_request(peer_id, request_id, request)
{
self.send_to_router(RouterMessage::RPCFailed {
peer_id,
request_id,
error,
});
}
}
NetworkMessage::SendResponse {
peer_id,

View File

@@ -4,7 +4,7 @@ version = "5.1.3"
authors = ["Sigma Prime <contact@sigmaprime.io>"]
edition = { workspace = true }
autotests = false
rust-version = "1.75.0"
rust-version = "1.77.0"
[features]
default = ["slasher-lmdb"]