mirror of
https://github.com/sigp/lighthouse.git
synced 2026-03-11 18:04:18 +00:00
Merge branch 'unstable' of https://github.com/sigp/lighthouse into electra_attestation_changes
This commit is contained in:
@@ -15,7 +15,7 @@ use libp2p::swarm::handler::{
|
||||
FullyNegotiatedInbound, FullyNegotiatedOutbound, StreamUpgradeError, SubstreamProtocol,
|
||||
};
|
||||
use libp2p::swarm::Stream;
|
||||
use slog::{crit, debug, trace, warn};
|
||||
use slog::{crit, debug, trace};
|
||||
use smallvec::SmallVec;
|
||||
use std::{
|
||||
collections::{hash_map::Entry, VecDeque},
|
||||
@@ -414,70 +414,44 @@ where
|
||||
}
|
||||
|
||||
// purge expired inbound substreams and send an error
|
||||
loop {
|
||||
match self.inbound_substreams_delay.poll_expired(cx) {
|
||||
Poll::Ready(Some(Ok(inbound_id))) => {
|
||||
// handle a stream timeout for various states
|
||||
if let Some(info) = self.inbound_substreams.get_mut(inbound_id.get_ref()) {
|
||||
// the delay has been removed
|
||||
info.delay_key = None;
|
||||
self.events_out.push(HandlerEvent::Err(HandlerErr::Inbound {
|
||||
error: RPCError::StreamTimeout,
|
||||
proto: info.protocol,
|
||||
id: *inbound_id.get_ref(),
|
||||
}));
|
||||
|
||||
if info.pending_items.back().map(|l| l.close_after()) == Some(false) {
|
||||
// if the last chunk does not close the stream, append an error
|
||||
info.pending_items.push_back(RPCCodedResponse::Error(
|
||||
RPCResponseErrorCode::ServerError,
|
||||
"Request timed out".into(),
|
||||
));
|
||||
}
|
||||
}
|
||||
}
|
||||
Poll::Ready(Some(Err(e))) => {
|
||||
warn!(self.log, "Inbound substream poll failed"; "error" => ?e);
|
||||
// drops the peer if we cannot read the delay queue
|
||||
return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
|
||||
HandlerEvent::Close(RPCError::InternalError(
|
||||
"Could not poll inbound stream timer",
|
||||
)),
|
||||
while let Poll::Ready(Some(inbound_id)) = self.inbound_substreams_delay.poll_expired(cx) {
|
||||
// handle a stream timeout for various states
|
||||
if let Some(info) = self.inbound_substreams.get_mut(inbound_id.get_ref()) {
|
||||
// the delay has been removed
|
||||
info.delay_key = None;
|
||||
self.events_out.push(HandlerEvent::Err(HandlerErr::Inbound {
|
||||
error: RPCError::StreamTimeout,
|
||||
proto: info.protocol,
|
||||
id: *inbound_id.get_ref(),
|
||||
}));
|
||||
|
||||
if info.pending_items.back().map(|l| l.close_after()) == Some(false) {
|
||||
// if the last chunk does not close the stream, append an error
|
||||
info.pending_items.push_back(RPCCodedResponse::Error(
|
||||
RPCResponseErrorCode::ServerError,
|
||||
"Request timed out".into(),
|
||||
));
|
||||
}
|
||||
Poll::Pending | Poll::Ready(None) => break,
|
||||
}
|
||||
}
|
||||
|
||||
// purge expired outbound substreams
|
||||
loop {
|
||||
match self.outbound_substreams_delay.poll_expired(cx) {
|
||||
Poll::Ready(Some(Ok(outbound_id))) => {
|
||||
if let Some(OutboundInfo { proto, req_id, .. }) =
|
||||
self.outbound_substreams.remove(outbound_id.get_ref())
|
||||
{
|
||||
let outbound_err = HandlerErr::Outbound {
|
||||
id: req_id,
|
||||
proto,
|
||||
error: RPCError::StreamTimeout,
|
||||
};
|
||||
// notify the user
|
||||
return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
|
||||
HandlerEvent::Err(outbound_err),
|
||||
));
|
||||
} else {
|
||||
crit!(self.log, "timed out substream not in the books"; "stream_id" => outbound_id.get_ref());
|
||||
}
|
||||
}
|
||||
Poll::Ready(Some(Err(e))) => {
|
||||
warn!(self.log, "Outbound substream poll failed"; "error" => ?e);
|
||||
return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
|
||||
HandlerEvent::Close(RPCError::InternalError(
|
||||
"Could not poll outbound stream timer",
|
||||
)),
|
||||
));
|
||||
}
|
||||
Poll::Pending | Poll::Ready(None) => break,
|
||||
while let Poll::Ready(Some(outbound_id)) = self.outbound_substreams_delay.poll_expired(cx) {
|
||||
if let Some(OutboundInfo { proto, req_id, .. }) =
|
||||
self.outbound_substreams.remove(outbound_id.get_ref())
|
||||
{
|
||||
let outbound_err = HandlerErr::Outbound {
|
||||
id: req_id,
|
||||
proto,
|
||||
error: RPCError::StreamTimeout,
|
||||
};
|
||||
// notify the user
|
||||
return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(HandlerEvent::Err(
|
||||
outbound_err,
|
||||
)));
|
||||
} else {
|
||||
crit!(self.log, "timed out substream not in the books"; "stream_id" => outbound_id.get_ref());
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -10,7 +10,7 @@ use libp2p::swarm::{
|
||||
handler::ConnectionHandler, CloseConnection, ConnectionId, NetworkBehaviour, NotifyHandler,
|
||||
ToSwarm,
|
||||
};
|
||||
use libp2p::swarm::{FromSwarm, SubstreamProtocol, THandlerInEvent};
|
||||
use libp2p::swarm::{ConnectionClosed, FromSwarm, SubstreamProtocol, THandlerInEvent};
|
||||
use libp2p::PeerId;
|
||||
use rate_limiter::{RPCRateLimiter as RateLimiter, RateLimitedErr};
|
||||
use slog::{crit, debug, o};
|
||||
@@ -283,9 +283,40 @@ where
|
||||
Ok(handler)
|
||||
}
|
||||
|
||||
fn on_swarm_event(&mut self, _event: FromSwarm) {
|
||||
fn on_swarm_event(&mut self, event: FromSwarm) {
|
||||
// NOTE: FromSwarm is a non exhaustive enum so updates should be based on release notes more
|
||||
// than compiler feedback
|
||||
// The self rate limiter holds on to requests and attempts to process them within our rate
|
||||
// limits. If a peer disconnects whilst we are self-rate limiting, we want to terminate any
|
||||
// pending requests and return an error response to the application.
|
||||
|
||||
if let FromSwarm::ConnectionClosed(ConnectionClosed {
|
||||
peer_id,
|
||||
remaining_established,
|
||||
connection_id,
|
||||
..
|
||||
}) = event
|
||||
{
|
||||
// If there are still connections remaining, do nothing.
|
||||
if remaining_established > 0 {
|
||||
return;
|
||||
}
|
||||
// Get a list of pending requests from the self rate limiter
|
||||
if let Some(limiter) = self.self_limiter.as_mut() {
|
||||
for (id, proto) in limiter.peer_disconnected(peer_id) {
|
||||
let error_msg = ToSwarm::GenerateEvent(RPCMessage {
|
||||
peer_id,
|
||||
conn_id: connection_id,
|
||||
event: HandlerEvent::Err(HandlerErr::Outbound {
|
||||
id,
|
||||
proto,
|
||||
error: RPCError::Disconnected,
|
||||
}),
|
||||
});
|
||||
self.events.push(error_msg);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn on_connection_handler_event(
|
||||
|
||||
@@ -158,13 +158,39 @@ impl<Id: ReqId, E: EthSpec> SelfRateLimiter<Id, E> {
|
||||
entry.remove();
|
||||
}
|
||||
}
|
||||
// NOTE: There can be entries that have been removed due to peer disconnections, we simply
|
||||
// ignore these messages here.
|
||||
}
|
||||
|
||||
/// Informs the limiter that a peer has disconnected. This removes any pending requests and
|
||||
/// returns their IDs.
|
||||
pub fn peer_disconnected(&mut self, peer_id: PeerId) -> Vec<(Id, Protocol)> {
|
||||
// It's not ideal to iterate this map, but the key is (PeerId, Protocol) and this map
|
||||
// should never really be large. So we iterate for simplicity
|
||||
let mut failed_requests = Vec::new();
|
||||
self.delayed_requests
|
||||
.retain(|(map_peer_id, protocol), queue| {
|
||||
if map_peer_id == &peer_id {
|
||||
// NOTE: Currently cannot remove entries from the DelayQueue, we will just let
|
||||
// them expire and ignore them.
|
||||
for message in queue {
|
||||
failed_requests.push((message.request_id, *protocol))
|
||||
}
|
||||
// Remove the entry
|
||||
false
|
||||
} else {
|
||||
// Keep the entry
|
||||
true
|
||||
}
|
||||
});
|
||||
failed_requests
|
||||
}
|
||||
|
||||
pub fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<BehaviourAction<Id, E>> {
|
||||
// First check the requests that were self rate limited, since those might add events to
|
||||
// the queue. Also do this this before rate limiter prunning to avoid removing and
|
||||
// immediately adding rate limiting keys.
|
||||
if let Poll::Ready(Some(Ok(expired))) = self.next_peer_request.poll_expired(cx) {
|
||||
if let Poll::Ready(Some(expired)) = self.next_peer_request.poll_expired(cx) {
|
||||
let (peer_id, protocol) = expired.into_inner();
|
||||
self.next_peer_request_ready(peer_id, protocol);
|
||||
}
|
||||
|
||||
@@ -240,7 +240,7 @@ impl futures::stream::Stream for GossipCache {
|
||||
|
||||
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
|
||||
match self.expirations.poll_expired(cx) {
|
||||
Poll::Ready(Some(Ok(expired))) => {
|
||||
Poll::Ready(Some(expired)) => {
|
||||
let expected_key = expired.key();
|
||||
let (topic, data) = expired.into_inner();
|
||||
match self.topic_msgs.get_mut(&topic) {
|
||||
@@ -259,7 +259,6 @@ impl futures::stream::Stream for GossipCache {
|
||||
}
|
||||
Poll::Ready(Some(Ok(topic)))
|
||||
}
|
||||
Poll::Ready(Some(Err(x))) => Poll::Ready(Some(Err(x.to_string()))),
|
||||
Poll::Ready(None) => Poll::Ready(None),
|
||||
Poll::Pending => Poll::Pending,
|
||||
}
|
||||
|
||||
@@ -917,12 +917,23 @@ impl<AppReqId: ReqId, E: EthSpec> Network<AppReqId, E> {
|
||||
/* Eth2 RPC behaviour functions */
|
||||
|
||||
/// Send a request to a peer over RPC.
|
||||
pub fn send_request(&mut self, peer_id: PeerId, request_id: AppReqId, request: Request) {
|
||||
pub fn send_request(
|
||||
&mut self,
|
||||
peer_id: PeerId,
|
||||
request_id: AppReqId,
|
||||
request: Request,
|
||||
) -> Result<(), (AppReqId, RPCError)> {
|
||||
// Check if the peer is connected before sending an RPC request
|
||||
if !self.swarm.is_connected(&peer_id) {
|
||||
return Err((request_id, RPCError::Disconnected));
|
||||
}
|
||||
|
||||
self.eth2_rpc_mut().send_request(
|
||||
peer_id,
|
||||
RequestId::Application(request_id),
|
||||
request.into(),
|
||||
)
|
||||
);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Send a successful response to a peer over RPC.
|
||||
@@ -1378,26 +1389,17 @@ impl<AppReqId: ReqId, E: EthSpec> Network<AppReqId, E> {
|
||||
) -> Option<NetworkEvent<AppReqId, E>> {
|
||||
let peer_id = event.peer_id;
|
||||
|
||||
if !self.peer_manager().is_connected(&peer_id) {
|
||||
// Sync expects a RPCError::Disconnected to drop associated lookups with this peer.
|
||||
// Silencing this event breaks the API contract with RPC where every request ends with
|
||||
// - A stream termination event, or
|
||||
// - An RPCError event
|
||||
return if let HandlerEvent::Err(HandlerErr::Outbound {
|
||||
id: RequestId::Application(id),
|
||||
error,
|
||||
..
|
||||
}) = event.event
|
||||
{
|
||||
Some(NetworkEvent::RPCFailed { peer_id, id, error })
|
||||
} else {
|
||||
debug!(
|
||||
self.log,
|
||||
"Ignoring rpc message of disconnecting peer";
|
||||
event
|
||||
);
|
||||
None
|
||||
};
|
||||
// Do not permit Inbound events from peers that are being disconnected, or RPC requests.
|
||||
if !self.peer_manager().is_connected(&peer_id)
|
||||
&& (matches!(event.event, HandlerEvent::Err(HandlerErr::Inbound { .. }))
|
||||
|| matches!(event.event, HandlerEvent::Ok(RPCReceived::Request(..))))
|
||||
{
|
||||
debug!(
|
||||
self.log,
|
||||
"Ignoring rpc message of disconnecting peer";
|
||||
event
|
||||
);
|
||||
return None;
|
||||
}
|
||||
|
||||
let handler_id = event.conn_id;
|
||||
|
||||
Reference in New Issue
Block a user