Merge remote-tracking branch 'origin/unstable' into capella

Fixing the conflicts involved patching up some of the `block_hash` verification,
the rest will be done as part of https://github.com/sigp/lighthouse/issues/3870
This commit is contained in:
Michael Sproul
2023-01-12 16:20:44 +11:00
59 changed files with 2994 additions and 1748 deletions

View File

@@ -22,12 +22,13 @@ use enr::{ATTESTATION_BITFIELD_ENR_KEY, ETH2_ENR_KEY, SYNC_COMMITTEE_BITFIELD_EN
use futures::prelude::*;
use futures::stream::FuturesUnordered;
use libp2p::multiaddr::Protocol;
use libp2p::swarm::behaviour::{DialFailure, FromSwarm};
use libp2p::swarm::AddressScore;
pub use libp2p::{
core::{connection::ConnectionId, ConnectedPoint, Multiaddr, PeerId},
swarm::{
handler::ConnectionHandler, DialError, NetworkBehaviour,
NetworkBehaviourAction as NBAction, NotifyHandler, PollParameters, SubstreamProtocol,
dummy::ConnectionHandler, DialError, NetworkBehaviour, NetworkBehaviourAction as NBAction,
NotifyHandler, PollParameters, SubstreamProtocol,
},
};
use lru::LruCache;
@@ -927,11 +928,11 @@ impl<TSpec: EthSpec> Discovery<TSpec> {
impl<TSpec: EthSpec> NetworkBehaviour for Discovery<TSpec> {
// Discovery is not a real NetworkBehaviour...
type ConnectionHandler = libp2p::swarm::handler::DummyConnectionHandler;
type ConnectionHandler = ConnectionHandler;
type OutEvent = DiscoveredPeers;
fn new_handler(&mut self) -> Self::ConnectionHandler {
libp2p::swarm::handler::DummyConnectionHandler::default()
ConnectionHandler
}
// Handles the libp2p request to obtain multiaddrs for peer_id's in order to dial them.
@@ -947,40 +948,6 @@ impl<TSpec: EthSpec> NetworkBehaviour for Discovery<TSpec> {
}
}
fn inject_event(
&mut self,
_: PeerId,
_: ConnectionId,
_: <Self::ConnectionHandler as ConnectionHandler>::OutEvent,
) {
}
fn inject_dial_failure(
&mut self,
peer_id: Option<PeerId>,
_handler: Self::ConnectionHandler,
error: &DialError,
) {
if let Some(peer_id) = peer_id {
match error {
DialError::Banned
| DialError::LocalPeerId
| DialError::InvalidPeerId(_)
| DialError::ConnectionIo(_)
| DialError::NoAddresses
| DialError::Transport(_)
| DialError::WrongPeerId { .. } => {
// set peer as disconnected in discovery DHT
debug!(self.log, "Marking peer disconnected in DHT"; "peer_id" => %peer_id);
self.disconnect_peer(&peer_id);
}
DialError::ConnectionLimit(_)
| DialError::DialPeerConditionFalse(_)
| DialError::Aborted => {}
}
}
}
// Main execution loop to drive the behaviour
fn poll(
&mut self,
@@ -1067,6 +1034,50 @@ impl<TSpec: EthSpec> NetworkBehaviour for Discovery<TSpec> {
}
Poll::Pending
}
fn on_swarm_event(&mut self, event: FromSwarm<Self::ConnectionHandler>) {
match event {
FromSwarm::DialFailure(DialFailure { peer_id, error, .. }) => {
self.on_dial_failure(peer_id, error)
}
FromSwarm::ConnectionEstablished(_)
| FromSwarm::ConnectionClosed(_)
| FromSwarm::AddressChange(_)
| FromSwarm::ListenFailure(_)
| FromSwarm::NewListener(_)
| FromSwarm::NewListenAddr(_)
| FromSwarm::ExpiredListenAddr(_)
| FromSwarm::ListenerError(_)
| FromSwarm::ListenerClosed(_)
| FromSwarm::NewExternalAddr(_)
| FromSwarm::ExpiredExternalAddr(_) => {
// Ignore events not relevant to discovery
}
}
}
}
impl<TSpec: EthSpec> Discovery<TSpec> {
fn on_dial_failure(&mut self, peer_id: Option<PeerId>, error: &DialError) {
if let Some(peer_id) = peer_id {
match error {
DialError::Banned
| DialError::LocalPeerId
| DialError::InvalidPeerId(_)
| DialError::ConnectionIo(_)
| DialError::NoAddresses
| DialError::Transport(_)
| DialError::WrongPeerId { .. } => {
// set peer as disconnected in discovery DHT
debug!(self.log, "Marking peer disconnected in DHT"; "peer_id" => %peer_id);
self.disconnect_peer(&peer_id);
}
DialError::ConnectionLimit(_)
| DialError::DialPeerConditionFalse(_)
| DialError::Aborted => {}
}
}
}
}
#[cfg(test)]

View File

@@ -7,7 +7,7 @@ use crate::{NetworkGlobals, PeerId};
use crate::{Subnet, SubnetDiscovery};
use delay_map::HashSetDelay;
use discv5::Enr;
use libp2p::identify::IdentifyInfo;
use libp2p::identify::Info as IdentifyInfo;
use peerdb::{client::ClientKind, BanOperation, BanResult, ScoreUpdateResult};
use rand::seq::SliceRandom;
use slog::{debug, error, trace, warn};

View File

@@ -1,14 +1,12 @@
use std::task::{Context, Poll};
use futures::StreamExt;
use libp2p::core::connection::ConnectionId;
use libp2p::core::ConnectedPoint;
use libp2p::swarm::behaviour::{ConnectionClosed, ConnectionEstablished, DialFailure, FromSwarm};
use libp2p::swarm::dial_opts::{DialOpts, PeerCondition};
use libp2p::swarm::handler::DummyConnectionHandler;
use libp2p::swarm::{
ConnectionHandler, DialError, NetworkBehaviour, NetworkBehaviourAction, PollParameters,
};
use libp2p::{Multiaddr, PeerId};
use libp2p::swarm::dummy::ConnectionHandler;
use libp2p::swarm::{NetworkBehaviour, NetworkBehaviourAction, PollParameters};
use libp2p::PeerId;
use slog::{debug, error};
use types::EthSpec;
@@ -20,23 +18,14 @@ use super::peerdb::BanResult;
use super::{ConnectingType, PeerManager, PeerManagerEvent, ReportSource};
impl<TSpec: EthSpec> NetworkBehaviour for PeerManager<TSpec> {
type ConnectionHandler = DummyConnectionHandler;
type ConnectionHandler = ConnectionHandler;
type OutEvent = PeerManagerEvent;
/* Required trait members */
fn new_handler(&mut self) -> Self::ConnectionHandler {
DummyConnectionHandler::default()
}
fn inject_event(
&mut self,
_: PeerId,
_: ConnectionId,
_: <DummyConnectionHandler as ConnectionHandler>::OutEvent,
) {
unreachable!("Dummy handler does not emit events")
ConnectionHandler
}
fn poll(
@@ -114,19 +103,46 @@ impl<TSpec: EthSpec> NetworkBehaviour for PeerManager<TSpec> {
Poll::Pending
}
/* Overwritten trait members */
fn on_swarm_event(&mut self, event: FromSwarm<Self::ConnectionHandler>) {
match event {
FromSwarm::ConnectionEstablished(ConnectionEstablished {
peer_id,
endpoint,
other_established,
..
}) => self.on_connection_established(peer_id, endpoint, other_established),
FromSwarm::ConnectionClosed(ConnectionClosed {
peer_id,
remaining_established,
..
}) => self.on_connection_closed(peer_id, remaining_established),
FromSwarm::DialFailure(DialFailure { peer_id, .. }) => self.on_dial_failure(peer_id),
FromSwarm::AddressChange(_)
| FromSwarm::ListenFailure(_)
| FromSwarm::NewListener(_)
| FromSwarm::NewListenAddr(_)
| FromSwarm::ExpiredListenAddr(_)
| FromSwarm::ListenerError(_)
| FromSwarm::ListenerClosed(_)
| FromSwarm::NewExternalAddr(_)
| FromSwarm::ExpiredExternalAddr(_) => {
// The rest of the events we ignore since they are handled in their associated
// `SwarmEvent`
}
}
}
}
fn inject_connection_established(
impl<TSpec: EthSpec> PeerManager<TSpec> {
fn on_connection_established(
&mut self,
peer_id: &PeerId,
_connection_id: &ConnectionId,
peer_id: PeerId,
endpoint: &ConnectedPoint,
_failed_addresses: Option<&Vec<Multiaddr>>,
other_established: usize,
) {
debug!(self.log, "Connection established"; "peer_id" => %peer_id, "connection" => ?endpoint.to_endpoint());
if other_established == 0 {
self.events.push(PeerManagerEvent::MetaData(*peer_id));
self.events.push(PeerManagerEvent::MetaData(peer_id));
}
// Check NAT if metrics are enabled
@@ -135,20 +151,20 @@ impl<TSpec: EthSpec> NetworkBehaviour for PeerManager<TSpec> {
}
// Check to make sure the peer is not supposed to be banned
match self.ban_status(peer_id) {
match self.ban_status(&peer_id) {
// TODO: directly emit the ban event?
BanResult::BadScore => {
// This is a faulty state
error!(self.log, "Connected to a banned peer. Re-banning"; "peer_id" => %peer_id);
// Reban the peer
self.goodbye_peer(peer_id, GoodbyeReason::Banned, ReportSource::PeerManager);
self.goodbye_peer(&peer_id, GoodbyeReason::Banned, ReportSource::PeerManager);
return;
}
BanResult::BannedIp(ip_addr) => {
// A good peer has connected to us via a banned IP address. We ban the peer and
// prevent future connections.
debug!(self.log, "Peer connected via banned IP. Banning"; "peer_id" => %peer_id, "banned_ip" => %ip_addr);
self.goodbye_peer(peer_id, GoodbyeReason::BannedIP, ReportSource::PeerManager);
self.goodbye_peer(&peer_id, GoodbyeReason::BannedIP, ReportSource::PeerManager);
return;
}
BanResult::NotBanned => {}
@@ -162,11 +178,11 @@ impl<TSpec: EthSpec> NetworkBehaviour for PeerManager<TSpec> {
.network_globals
.peers
.read()
.peer_info(peer_id)
.peer_info(&peer_id)
.map_or(true, |peer| !peer.has_future_duty())
{
// Gracefully disconnect the peer.
self.disconnect_peer(*peer_id, GoodbyeReason::TooManyPeers);
self.disconnect_peer(peer_id, GoodbyeReason::TooManyPeers);
return;
}
@@ -174,14 +190,14 @@ impl<TSpec: EthSpec> NetworkBehaviour for PeerManager<TSpec> {
// does not need to know about these peers.
match endpoint {
ConnectedPoint::Listener { send_back_addr, .. } => {
self.inject_connect_ingoing(peer_id, send_back_addr.clone(), None);
self.inject_connect_ingoing(&peer_id, send_back_addr.clone(), None);
self.events
.push(PeerManagerEvent::PeerConnectedIncoming(*peer_id));
.push(PeerManagerEvent::PeerConnectedIncoming(peer_id));
}
ConnectedPoint::Dialer { address, .. } => {
self.inject_connect_outgoing(peer_id, address.clone(), None);
self.inject_connect_outgoing(&peer_id, address.clone(), None);
self.events
.push(PeerManagerEvent::PeerConnectedOutgoing(*peer_id));
.push(PeerManagerEvent::PeerConnectedOutgoing(peer_id));
}
}
@@ -189,14 +205,8 @@ impl<TSpec: EthSpec> NetworkBehaviour for PeerManager<TSpec> {
self.update_connected_peer_metrics();
metrics::inc_counter(&metrics::PEER_CONNECT_EVENT_COUNT);
}
fn inject_connection_closed(
&mut self,
peer_id: &PeerId,
_: &ConnectionId,
_: &ConnectedPoint,
_: DummyConnectionHandler,
remaining_established: usize,
) {
fn on_connection_closed(&mut self, peer_id: PeerId, remaining_established: usize) {
if remaining_established > 0 {
return;
}
@@ -206,62 +216,33 @@ impl<TSpec: EthSpec> NetworkBehaviour for PeerManager<TSpec> {
.network_globals
.peers
.read()
.is_connected_or_disconnecting(peer_id)
.is_connected_or_disconnecting(&peer_id)
{
// We are disconnecting the peer or the peer has already been connected.
// Both these cases, the peer has been previously registered by the peer manager and
// potentially the application layer.
// Inform the application.
self.events
.push(PeerManagerEvent::PeerDisconnected(*peer_id));
.push(PeerManagerEvent::PeerDisconnected(peer_id));
debug!(self.log, "Peer disconnected"; "peer_id" => %peer_id);
}
// NOTE: It may be the case that a rejected node, due to too many peers is disconnected
// here and the peer manager has no knowledge of its connection. We insert it here for
// reference so that peer manager can track this peer.
self.inject_disconnect(peer_id);
self.inject_disconnect(&peer_id);
// Update the prometheus metrics
self.update_connected_peer_metrics();
metrics::inc_counter(&metrics::PEER_DISCONNECT_EVENT_COUNT);
}
fn inject_address_change(
&mut self,
_peer_id: &PeerId,
_connection_id: &ConnectionId,
old: &ConnectedPoint,
new: &ConnectedPoint,
) {
debug_assert!(
matches!(
(old, new),
(
// inbound remains inbound
ConnectedPoint::Listener { .. },
ConnectedPoint::Listener { .. }
) | (
// outbound remains outbound
ConnectedPoint::Dialer { .. },
ConnectedPoint::Dialer { .. }
)
),
"A peer has changed between inbound and outbound"
)
}
/// A dial attempt has failed.
///
/// NOTE: It can be the case that we are dialing a peer and during the dialing process the peer
/// connects and the dial attempt later fails. To handle this, we only update the peer_db if
/// the peer is not already connected.
fn inject_dial_failure(
&mut self,
peer_id: Option<PeerId>,
_handler: DummyConnectionHandler,
_error: &DialError,
) {
fn on_dial_failure(&mut self, peer_id: Option<PeerId>) {
if let Some(peer_id) = peer_id {
if !self.network_globals.peers.read().is_connected(&peer_id) {
self.inject_disconnect(&peer_id);

View File

@@ -2,7 +2,7 @@
//!
//! Currently using identify to fingerprint.
use libp2p::identify::IdentifyInfo;
use libp2p::identify::Info as IdentifyInfo;
use serde::Serialize;
use strum::{AsRefStr, EnumIter, IntoStaticStr};

View File

@@ -327,61 +327,6 @@ where
self.listen_protocol.clone()
}
fn inject_fully_negotiated_inbound(
&mut self,
substream: <Self::InboundProtocol as InboundUpgrade<NegotiatedSubstream>>::Output,
_info: Self::InboundOpenInfo,
) {
// only accept new peer requests when active
if !matches!(self.state, HandlerState::Active) {
return;
}
let (req, substream) = substream;
let expected_responses = req.expected_responses();
// store requests that expect responses
if expected_responses > 0 {
if self.inbound_substreams.len() < MAX_INBOUND_SUBSTREAMS {
// Store the stream and tag the output.
let delay_key = self.inbound_substreams_delay.insert(
self.current_inbound_substream_id,
Duration::from_secs(RESPONSE_TIMEOUT),
);
let awaiting_stream = InboundState::Idle(substream);
self.inbound_substreams.insert(
self.current_inbound_substream_id,
InboundInfo {
state: awaiting_stream,
pending_items: VecDeque::with_capacity(expected_responses as usize),
delay_key: Some(delay_key),
protocol: req.protocol(),
request_start_time: Instant::now(),
remaining_chunks: expected_responses,
},
);
} else {
self.events_out.push(Err(HandlerErr::Inbound {
id: self.current_inbound_substream_id,
proto: req.protocol(),
error: RPCError::HandlerRejected,
}));
return self.shutdown(None);
}
}
// If we received a goodbye, shutdown the connection.
if let InboundRequest::Goodbye(_) = req {
self.shutdown(None);
}
self.events_out.push(Ok(RPCReceived::Request(
self.current_inbound_substream_id,
req,
)));
self.current_inbound_substream_id.0 += 1;
}
fn inject_fully_negotiated_outbound(
&mut self,
out: <Self::OutboundProtocol as OutboundUpgrade<NegotiatedSubstream>>::Output,
@@ -438,6 +383,64 @@ where
}
}
fn inject_fully_negotiated_inbound(
&mut self,
substream: <Self::InboundProtocol as InboundUpgrade<NegotiatedSubstream>>::Output,
_info: Self::InboundOpenInfo,
) {
// only accept new peer requests when active
if !matches!(self.state, HandlerState::Active) {
return;
}
let (req, substream) = substream;
let expected_responses = req.expected_responses();
// store requests that expect responses
if expected_responses > 0 {
if self.inbound_substreams.len() < MAX_INBOUND_SUBSTREAMS {
// Store the stream and tag the output.
let delay_key = self.inbound_substreams_delay.insert(
self.current_inbound_substream_id,
Duration::from_secs(RESPONSE_TIMEOUT),
);
let awaiting_stream = InboundState::Idle(substream);
self.inbound_substreams.insert(
self.current_inbound_substream_id,
InboundInfo {
state: awaiting_stream,
pending_items: VecDeque::with_capacity(std::cmp::min(
expected_responses,
128,
) as usize),
delay_key: Some(delay_key),
protocol: req.protocol(),
request_start_time: Instant::now(),
remaining_chunks: expected_responses,
},
);
} else {
self.events_out.push(Err(HandlerErr::Inbound {
id: self.current_inbound_substream_id,
proto: req.protocol(),
error: RPCError::HandlerRejected,
}));
return self.shutdown(None);
}
}
// If we received a goodbye, shutdown the connection.
if let InboundRequest::Goodbye(_) = req {
self.shutdown(None);
}
self.events_out.push(Ok(RPCReceived::Request(
self.current_inbound_substream_id,
req,
)));
self.current_inbound_substream_id.0 += 1;
}
fn inject_event(&mut self, rpc_event: Self::InEvent) {
match rpc_event {
RPCSend::Request(id, req) => self.send_request(id, req),

View File

@@ -7,8 +7,8 @@ use libp2p::gossipsub::subscription_filter::{
MaxCountSubscriptionFilter, WhitelistSubscriptionFilter,
};
use libp2p::gossipsub::Gossipsub as BaseGossipsub;
use libp2p::identify::Identify;
use libp2p::NetworkBehaviour;
use libp2p::identify::Behaviour as Identify;
use libp2p::swarm::NetworkBehaviour;
use types::EthSpec;
use super::api_types::RequestId;

View File

@@ -29,7 +29,7 @@ use libp2p::gossipsub::subscription_filter::MaxCountSubscriptionFilter;
use libp2p::gossipsub::{
GossipsubEvent, IdentTopic as Topic, MessageAcceptance, MessageAuthenticity, MessageId,
};
use libp2p::identify::{Identify, IdentifyConfig, IdentifyEvent};
use libp2p::identify::{Behaviour as Identify, Config as IdentifyConfig, Event as IdentifyEvent};
use libp2p::multiaddr::{Multiaddr, Protocol as MProtocol};
use libp2p::swarm::{ConnectionLimits, Swarm, SwarmBuilder, SwarmEvent};
use libp2p::PeerId;
@@ -318,7 +318,7 @@ impl<AppReqId: ReqId, TSpec: EthSpec> Network<AppReqId, TSpec> {
// use the executor for libp2p
struct Executor(task_executor::TaskExecutor);
impl libp2p::core::Executor for Executor {
impl libp2p::swarm::Executor for Executor {
fn exec(&self, f: Pin<Box<dyn futures::Future<Output = ()> + Send>>) {
self.0.spawn(f, "libp2p");
}
@@ -343,12 +343,16 @@ impl<AppReqId: ReqId, TSpec: EthSpec> Network<AppReqId, TSpec> {
.with_max_established_per_peer(Some(MAX_CONNECTIONS_PER_PEER));
(
SwarmBuilder::new(transport, behaviour, local_peer_id)
.notify_handler_buffer_size(std::num::NonZeroUsize::new(7).expect("Not zero"))
.connection_event_buffer_size(64)
.connection_limits(limits)
.executor(Box::new(Executor(executor)))
.build(),
SwarmBuilder::with_executor(
transport,
behaviour,
local_peer_id,
Executor(executor),
)
.notify_handler_buffer_size(std::num::NonZeroUsize::new(7).expect("Not zero"))
.connection_event_buffer_size(64)
.connection_limits(limits)
.build(),
bandwidth,
)
};

View File

@@ -44,8 +44,7 @@ type BoxedTransport = Boxed<(PeerId, StreamMuxerBox)>;
pub fn build_transport(
local_private_key: Keypair,
) -> std::io::Result<(BoxedTransport, Arc<BandwidthSinks>)> {
let tcp =
libp2p::tcp::TokioTcpTransport::new(libp2p::tcp::GenTcpConfig::default().nodelay(true));
let tcp = libp2p::tcp::tokio::Transport::new(libp2p::tcp::Config::default().nodelay(true));
let transport = libp2p::dns::TokioDnsConfig::system(tcp)?;
#[cfg(feature = "libp2p-websocket")]
let transport = {