mirror of
https://github.com/sigp/lighthouse.git
synced 2026-03-06 10:11:44 +00:00
Correct issue with network message passing (#1439)
## Issue Addressed Sync was breaking occasionally. The root cause appears to be identify crashing as events we being sent to the protocol after nodes were banned. Have not been able to reproduce sync issues since this update. ## Proposed Changes Only send messages to sub-behaviour protocols if the peer manager thinks the peer is connected. All other messages are dropped.
This commit is contained in:
@@ -49,10 +49,8 @@ pub struct Behaviour<TSpec: EthSpec> {
|
||||
peer_manager: PeerManager<TSpec>,
|
||||
/// The output events generated by this behaviour to be consumed in the swarm poll.
|
||||
events: VecDeque<BehaviourEvent<TSpec>>,
|
||||
/// Events generated in the global behaviour to be sent to the behaviour handler.
|
||||
handler_events: VecDeque<NBAction<BehaviourHandlerIn<TSpec>, BehaviourEvent<TSpec>>>,
|
||||
/// Queue of peers to disconnect.
|
||||
peers_to_dc: VecDeque<PeerId>,
|
||||
/// Queue of peers to disconnect and an optional reason for the disconnection.
|
||||
peers_to_dc: VecDeque<(PeerId, Option<GoodbyeReason>)>,
|
||||
/// The current meta data of the node, so respond to pings and get metadata
|
||||
meta_data: MetaData<TSpec>,
|
||||
/// A collections of variables accessible outside the network service.
|
||||
@@ -110,7 +108,6 @@ impl<TSpec: EthSpec> Behaviour<TSpec> {
|
||||
identify,
|
||||
peer_manager: PeerManager::new(local_key, net_conf, network_globals.clone(), log)?,
|
||||
events: VecDeque::new(),
|
||||
handler_events: VecDeque::new(),
|
||||
peers_to_dc: VecDeque::new(),
|
||||
meta_data,
|
||||
network_globals,
|
||||
@@ -491,7 +488,7 @@ impl<TSpec: EthSpec> Behaviour<TSpec> {
|
||||
"reason" => reason.to_string(),
|
||||
"client" => self.network_globals.client(&peer_id).to_string(),
|
||||
);
|
||||
self.peers_to_dc.push_back(peer_id);
|
||||
self.peers_to_dc.push_back((peer_id, None));
|
||||
// NOTE: We currently do not inform the application that we are
|
||||
// disconnecting here.
|
||||
// The actual disconnection event will be relayed to the application. Ideally
|
||||
@@ -552,17 +549,14 @@ impl<TSpec: EthSpec> Behaviour<TSpec> {
|
||||
&mut self,
|
||||
cx: &mut Context,
|
||||
) -> Poll<NBAction<BehaviourHandlerIn<TSpec>, BehaviourEvent<TSpec>>> {
|
||||
// if there are any handler_events process them
|
||||
if let Some(event) = self.handler_events.pop_front() {
|
||||
return Poll::Ready(event);
|
||||
}
|
||||
|
||||
// handle pending disconnections to perform
|
||||
if let Some(peer_id) = self.peers_to_dc.pop_front() {
|
||||
if let Some((peer_id, reason)) = self.peers_to_dc.pop_front() {
|
||||
return Poll::Ready(NBAction::NotifyHandler {
|
||||
peer_id,
|
||||
handler: NotifyHandler::All,
|
||||
event: BehaviourHandlerIn::Shutdown(None),
|
||||
event: BehaviourHandlerIn::Shutdown(
|
||||
reason.map(|reason| (RequestId::Behaviour, RPCRequest::Goodbye(reason))),
|
||||
),
|
||||
});
|
||||
}
|
||||
|
||||
@@ -594,10 +588,8 @@ impl<TSpec: EthSpec> Behaviour<TSpec> {
|
||||
self.send_meta_data_request(peer_id);
|
||||
}
|
||||
PeerManagerEvent::DisconnectPeer(peer_id, reason) => {
|
||||
debug!(self.log, "PeerManager requested to disconnect a peer";
|
||||
debug!(self.log, "PeerManager disconnecting peer";
|
||||
"peer_id" => peer_id.to_string(), "reason" => reason.to_string());
|
||||
// queue for disabling
|
||||
self.peers_to_dc.push_back(peer_id.clone());
|
||||
// send one goodbye
|
||||
return Poll::Ready(NBAction::NotifyHandler {
|
||||
peer_id,
|
||||
@@ -734,17 +726,9 @@ impl<TSpec: EthSpec> NetworkBehaviour for Behaviour<TSpec> {
|
||||
None
|
||||
};
|
||||
|
||||
if let Some(reason) = goodbye_reason {
|
||||
self.peers_to_dc.push_back(peer_id.clone());
|
||||
// send a goodbye on all possible handlers for this peer
|
||||
self.handler_events.push_back(NBAction::NotifyHandler {
|
||||
peer_id: peer_id.clone(),
|
||||
handler: NotifyHandler::All,
|
||||
event: BehaviourHandlerIn::Shutdown(Some((
|
||||
RequestId::Behaviour,
|
||||
RPCRequest::Goodbye(reason),
|
||||
))),
|
||||
});
|
||||
if goodbye_reason.is_some() {
|
||||
self.peers_to_dc
|
||||
.push_back((peer_id.clone(), goodbye_reason));
|
||||
return;
|
||||
}
|
||||
|
||||
@@ -838,17 +822,9 @@ impl<TSpec: EthSpec> NetworkBehaviour for Behaviour<TSpec> {
|
||||
conn_id: ConnectionId,
|
||||
event: <Self::ProtocolsHandler as ProtocolsHandler>::OutEvent,
|
||||
) {
|
||||
// All events from banned peers are rejected
|
||||
// The same holds if we reached the peer limit and the connected peer has no future duty.
|
||||
if self.peer_manager.is_banned(&peer_id)
|
||||
|| (self.peer_manager.peer_limit_reached()
|
||||
&& self
|
||||
.network_globals
|
||||
.peers
|
||||
.read()
|
||||
.peer_info(&peer_id)
|
||||
.map_or(true, |i| !i.has_future_duty()))
|
||||
{
|
||||
// If the peer is not supposed to be connected (undergoing active disconnection,
|
||||
// don't process any of its messages.
|
||||
if !self.network_globals.peers.read().is_connected(&peer_id) {
|
||||
return;
|
||||
}
|
||||
|
||||
|
||||
@@ -13,7 +13,7 @@ use libp2p::swarm::{
|
||||
};
|
||||
use libp2p::{Multiaddr, PeerId};
|
||||
use rate_limiter::{RPCRateLimiter as RateLimiter, RPCRateLimiterBuilder, RateLimitedErr};
|
||||
use slog::{crit, debug, o, warn};
|
||||
use slog::{crit, debug, o};
|
||||
use std::marker::PhantomData;
|
||||
use std::task::{Context, Poll};
|
||||
use std::time::Duration;
|
||||
@@ -106,7 +106,7 @@ impl<TSpec: EthSpec> RPC<TSpec> {
|
||||
let limiter = RPCRateLimiterBuilder::new()
|
||||
.n_every(Protocol::MetaData, 2, Duration::from_secs(5))
|
||||
.one_every(Protocol::Ping, Duration::from_secs(5))
|
||||
.n_every(Protocol::Status, 3, Duration::from_secs(15))
|
||||
.n_every(Protocol::Status, 5, Duration::from_secs(15))
|
||||
.one_every(Protocol::Goodbye, Duration::from_secs(10))
|
||||
.n_every(
|
||||
Protocol::BlocksByRange,
|
||||
@@ -235,7 +235,7 @@ where
|
||||
"protocol" => format!("{}", req.protocol()));
|
||||
}
|
||||
Err(RateLimitedErr::TooSoon(wait_time)) => {
|
||||
warn!(self.log, "Request exceeds the rate limit";
|
||||
debug!(self.log, "Request exceeds the rate limit";
|
||||
"request" => req.to_string(), "peer_id" => peer_id.to_string(), "wait_time_ms" => wait_time.as_millis());
|
||||
// send an error code to the peer.
|
||||
// the handler upon receiving the error code will send it back to the behaviour
|
||||
|
||||
Reference in New Issue
Block a user