mirror of
https://github.com/sigp/lighthouse.git
synced 2026-03-03 00:31:50 +00:00
Remove manual poll of the libp2p Swarm (#6550)
* remove manual poll for libp2p Swarm, use tokio::select! instead
This commit is contained in:
@@ -37,10 +37,7 @@ use slog::{crit, debug, info, o, trace, warn};
|
||||
use std::num::{NonZeroU8, NonZeroUsize};
|
||||
use std::path::PathBuf;
|
||||
use std::pin::Pin;
|
||||
use std::{
|
||||
sync::Arc,
|
||||
task::{Context, Poll},
|
||||
};
|
||||
use std::sync::Arc;
|
||||
use types::{
|
||||
consts::altair::SYNC_COMMITTEE_SUBNET_COUNT, EnrForkId, EthSpec, ForkContext, Slot, SubnetId,
|
||||
};
|
||||
@@ -1794,148 +1791,148 @@ impl<E: EthSpec> Network<E> {
|
||||
|
||||
/* Networking polling */
|
||||
|
||||
/// Poll the p2p networking stack.
|
||||
///
|
||||
/// This will poll the swarm and do maintenance routines.
|
||||
pub fn poll_network(&mut self, cx: &mut Context) -> Poll<NetworkEvent<E>> {
|
||||
while let Poll::Ready(Some(swarm_event)) = self.swarm.poll_next_unpin(cx) {
|
||||
let maybe_event = match swarm_event {
|
||||
SwarmEvent::Behaviour(behaviour_event) => match behaviour_event {
|
||||
// Handle sub-behaviour events.
|
||||
BehaviourEvent::Gossipsub(ge) => self.inject_gs_event(ge),
|
||||
BehaviourEvent::Eth2Rpc(re) => self.inject_rpc_event(re),
|
||||
// Inform the peer manager about discovered peers.
|
||||
//
|
||||
// The peer manager will subsequently decide which peers need to be dialed and then dial
|
||||
// them.
|
||||
BehaviourEvent::Discovery(DiscoveredPeers { peers }) => {
|
||||
self.peer_manager_mut().peers_discovered(peers);
|
||||
None
|
||||
pub async fn next_event(&mut self) -> NetworkEvent<E> {
|
||||
loop {
|
||||
tokio::select! {
|
||||
// Poll the libp2p `Swarm`.
|
||||
// This will poll the swarm and do maintenance routines.
|
||||
Some(event) = self.swarm.next() => {
|
||||
if let Some(event) = self.parse_swarm_event(event) {
|
||||
return event;
|
||||
}
|
||||
BehaviourEvent::Identify(ie) => self.inject_identify_event(ie),
|
||||
BehaviourEvent::PeerManager(pe) => self.inject_pm_event(pe),
|
||||
BehaviourEvent::Upnp(e) => {
|
||||
self.inject_upnp_event(e);
|
||||
None
|
||||
}
|
||||
#[allow(unreachable_patterns)]
|
||||
BehaviourEvent::ConnectionLimits(le) => void::unreachable(le),
|
||||
},
|
||||
SwarmEvent::ConnectionEstablished { .. } => None,
|
||||
SwarmEvent::ConnectionClosed { .. } => None,
|
||||
SwarmEvent::IncomingConnection {
|
||||
local_addr,
|
||||
send_back_addr,
|
||||
connection_id: _,
|
||||
} => {
|
||||
trace!(self.log, "Incoming connection"; "our_addr" => %local_addr, "from" => %send_back_addr);
|
||||
None
|
||||
|
||||
// perform gossipsub score updates when necessary
|
||||
_ = self.update_gossipsub_scores.tick() => {
|
||||
let this = self.swarm.behaviour_mut();
|
||||
this.peer_manager.update_gossipsub_scores(&this.gossipsub);
|
||||
}
|
||||
SwarmEvent::IncomingConnectionError {
|
||||
local_addr,
|
||||
send_back_addr,
|
||||
error,
|
||||
connection_id: _,
|
||||
} => {
|
||||
let error_repr = match error {
|
||||
libp2p::swarm::ListenError::Aborted => {
|
||||
"Incoming connection aborted".to_string()
|
||||
// poll the gossipsub cache to clear expired messages
|
||||
Some(result) = self.gossip_cache.next() => {
|
||||
match result {
|
||||
Err(e) => warn!(self.log, "Gossip cache error"; "error" => e),
|
||||
Ok(expired_topic) => {
|
||||
if let Some(v) = metrics::get_int_counter(
|
||||
&metrics::GOSSIP_EXPIRED_LATE_PUBLISH_PER_TOPIC_KIND,
|
||||
&[expired_topic.kind().as_ref()],
|
||||
) {
|
||||
v.inc()
|
||||
};
|
||||
}
|
||||
libp2p::swarm::ListenError::WrongPeerId { obtained, endpoint } => {
|
||||
format!("Wrong peer id, obtained {obtained}, endpoint {endpoint:?}")
|
||||
}
|
||||
libp2p::swarm::ListenError::LocalPeerId { endpoint } => {
|
||||
format!("Dialing local peer id {endpoint:?}")
|
||||
}
|
||||
libp2p::swarm::ListenError::Denied { cause } => {
|
||||
format!("Connection was denied with cause: {cause:?}")
|
||||
}
|
||||
libp2p::swarm::ListenError::Transport(t) => match t {
|
||||
libp2p::TransportError::MultiaddrNotSupported(m) => {
|
||||
format!("Transport error: Multiaddr not supported: {m}")
|
||||
}
|
||||
libp2p::TransportError::Other(e) => {
|
||||
format!("Transport error: other: {e}")
|
||||
}
|
||||
},
|
||||
};
|
||||
debug!(self.log, "Failed incoming connection"; "our_addr" => %local_addr, "from" => %send_back_addr, "error" => error_repr);
|
||||
None
|
||||
}
|
||||
SwarmEvent::OutgoingConnectionError {
|
||||
peer_id: _,
|
||||
error: _,
|
||||
connection_id: _,
|
||||
} => {
|
||||
// The Behaviour event is more general than the swarm event here. It includes
|
||||
// connection failures. So we use that log for now, in the peer manager
|
||||
// behaviour implementation.
|
||||
None
|
||||
}
|
||||
SwarmEvent::NewListenAddr { address, .. } => {
|
||||
Some(NetworkEvent::NewListenAddr(address))
|
||||
}
|
||||
SwarmEvent::ExpiredListenAddr { address, .. } => {
|
||||
debug!(self.log, "Listen address expired"; "address" => %address);
|
||||
None
|
||||
}
|
||||
SwarmEvent::ListenerClosed {
|
||||
addresses, reason, ..
|
||||
} => {
|
||||
match reason {
|
||||
Ok(_) => {
|
||||
debug!(self.log, "Listener gracefully closed"; "addresses" => ?addresses)
|
||||
}
|
||||
Err(reason) => {
|
||||
crit!(self.log, "Listener abruptly closed"; "addresses" => ?addresses, "reason" => ?reason)
|
||||
}
|
||||
};
|
||||
if Swarm::listeners(&self.swarm).count() == 0 {
|
||||
Some(NetworkEvent::ZeroListeners)
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
SwarmEvent::ListenerError { error, .. } => {
|
||||
debug!(self.log, "Listener closed connection attempt"; "reason" => ?error);
|
||||
None
|
||||
}
|
||||
_ => {
|
||||
// NOTE: SwarmEvent is a non exhaustive enum so updates should be based on
|
||||
// release notes more than compiler feedback
|
||||
None
|
||||
}
|
||||
};
|
||||
|
||||
if let Some(ev) = maybe_event {
|
||||
return Poll::Ready(ev);
|
||||
}
|
||||
}
|
||||
|
||||
// perform gossipsub score updates when necessary
|
||||
while self.update_gossipsub_scores.poll_tick(cx).is_ready() {
|
||||
let this = self.swarm.behaviour_mut();
|
||||
this.peer_manager.update_gossipsub_scores(&this.gossipsub);
|
||||
}
|
||||
|
||||
// poll the gossipsub cache to clear expired messages
|
||||
while let Poll::Ready(Some(result)) = self.gossip_cache.poll_next_unpin(cx) {
|
||||
match result {
|
||||
Err(e) => warn!(self.log, "Gossip cache error"; "error" => e),
|
||||
Ok(expired_topic) => {
|
||||
if let Some(v) = metrics::get_int_counter(
|
||||
&metrics::GOSSIP_EXPIRED_LATE_PUBLISH_PER_TOPIC_KIND,
|
||||
&[expired_topic.kind().as_ref()],
|
||||
) {
|
||||
v.inc()
|
||||
};
|
||||
}
|
||||
}
|
||||
}
|
||||
Poll::Pending
|
||||
}
|
||||
|
||||
pub async fn next_event(&mut self) -> NetworkEvent<E> {
|
||||
futures::future::poll_fn(|cx| self.poll_network(cx)).await
|
||||
fn parse_swarm_event(
|
||||
&mut self,
|
||||
event: SwarmEvent<BehaviourEvent<E>>,
|
||||
) -> Option<NetworkEvent<E>> {
|
||||
match event {
|
||||
SwarmEvent::Behaviour(behaviour_event) => match behaviour_event {
|
||||
// Handle sub-behaviour events.
|
||||
BehaviourEvent::Gossipsub(ge) => self.inject_gs_event(ge),
|
||||
BehaviourEvent::Eth2Rpc(re) => self.inject_rpc_event(re),
|
||||
// Inform the peer manager about discovered peers.
|
||||
//
|
||||
// The peer manager will subsequently decide which peers need to be dialed and then dial
|
||||
// them.
|
||||
BehaviourEvent::Discovery(DiscoveredPeers { peers }) => {
|
||||
self.peer_manager_mut().peers_discovered(peers);
|
||||
None
|
||||
}
|
||||
BehaviourEvent::Identify(ie) => self.inject_identify_event(ie),
|
||||
BehaviourEvent::PeerManager(pe) => self.inject_pm_event(pe),
|
||||
BehaviourEvent::Upnp(e) => {
|
||||
self.inject_upnp_event(e);
|
||||
None
|
||||
}
|
||||
#[allow(unreachable_patterns)]
|
||||
BehaviourEvent::ConnectionLimits(le) => void::unreachable(le),
|
||||
},
|
||||
SwarmEvent::ConnectionEstablished { .. } => None,
|
||||
SwarmEvent::ConnectionClosed { .. } => None,
|
||||
SwarmEvent::IncomingConnection {
|
||||
local_addr,
|
||||
send_back_addr,
|
||||
connection_id: _,
|
||||
} => {
|
||||
trace!(self.log, "Incoming connection"; "our_addr" => %local_addr, "from" => %send_back_addr);
|
||||
None
|
||||
}
|
||||
SwarmEvent::IncomingConnectionError {
|
||||
local_addr,
|
||||
send_back_addr,
|
||||
error,
|
||||
connection_id: _,
|
||||
} => {
|
||||
let error_repr = match error {
|
||||
libp2p::swarm::ListenError::Aborted => {
|
||||
"Incoming connection aborted".to_string()
|
||||
}
|
||||
libp2p::swarm::ListenError::WrongPeerId { obtained, endpoint } => {
|
||||
format!("Wrong peer id, obtained {obtained}, endpoint {endpoint:?}")
|
||||
}
|
||||
libp2p::swarm::ListenError::LocalPeerId { endpoint } => {
|
||||
format!("Dialing local peer id {endpoint:?}")
|
||||
}
|
||||
libp2p::swarm::ListenError::Denied { cause } => {
|
||||
format!("Connection was denied with cause: {cause:?}")
|
||||
}
|
||||
libp2p::swarm::ListenError::Transport(t) => match t {
|
||||
libp2p::TransportError::MultiaddrNotSupported(m) => {
|
||||
format!("Transport error: Multiaddr not supported: {m}")
|
||||
}
|
||||
libp2p::TransportError::Other(e) => {
|
||||
format!("Transport error: other: {e}")
|
||||
}
|
||||
},
|
||||
};
|
||||
debug!(self.log, "Failed incoming connection"; "our_addr" => %local_addr, "from" => %send_back_addr, "error" => error_repr);
|
||||
None
|
||||
}
|
||||
SwarmEvent::OutgoingConnectionError {
|
||||
peer_id: _,
|
||||
error: _,
|
||||
connection_id: _,
|
||||
} => {
|
||||
// The Behaviour event is more general than the swarm event here. It includes
|
||||
// connection failures. So we use that log for now, in the peer manager
|
||||
// behaviour implementation.
|
||||
None
|
||||
}
|
||||
SwarmEvent::NewListenAddr { address, .. } => Some(NetworkEvent::NewListenAddr(address)),
|
||||
SwarmEvent::ExpiredListenAddr { address, .. } => {
|
||||
debug!(self.log, "Listen address expired"; "address" => %address);
|
||||
None
|
||||
}
|
||||
SwarmEvent::ListenerClosed {
|
||||
addresses, reason, ..
|
||||
} => {
|
||||
match reason {
|
||||
Ok(_) => {
|
||||
debug!(self.log, "Listener gracefully closed"; "addresses" => ?addresses)
|
||||
}
|
||||
Err(reason) => {
|
||||
crit!(self.log, "Listener abruptly closed"; "addresses" => ?addresses, "reason" => ?reason)
|
||||
}
|
||||
};
|
||||
if Swarm::listeners(&self.swarm).count() == 0 {
|
||||
Some(NetworkEvent::ZeroListeners)
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
SwarmEvent::ListenerError { error, .. } => {
|
||||
debug!(self.log, "Listener closed connection attempt"; "reason" => ?error);
|
||||
None
|
||||
}
|
||||
_ => {
|
||||
// NOTE: SwarmEvent is a non exhaustive enum so updates should be based on
|
||||
// release notes more than compiler feedback
|
||||
None
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user