mirror of
https://github.com/sigp/lighthouse.git
synced 2026-03-11 18:04:18 +00:00
Network protocol upgrades (#2345)
This provides a number of upgrades to gossipsub and discovery. The updates are extensive and this needs thorough testing.
This commit is contained in:
@@ -56,7 +56,7 @@ impl<TSpec: EthSpec> DelegatingHandler<TSpec> {
|
||||
|
||||
/// Wrapper around the `ProtocolsHandler::InEvent` types of the handlers.
|
||||
/// Simply delegated to the corresponding behaviour's handler.
|
||||
#[derive(Debug, Clone)]
|
||||
#[derive(Debug)]
|
||||
pub enum DelegateIn<TSpec: EthSpec> {
|
||||
Gossipsub(<GossipHandler as ProtocolsHandler>::InEvent),
|
||||
RPC(<RPCHandler<TSpec> as ProtocolsHandler>::InEvent),
|
||||
@@ -141,8 +141,8 @@ impl<TSpec: EthSpec> ProtocolsHandler for DelegatingHandler<TSpec> {
|
||||
.max(identify_proto.timeout());
|
||||
|
||||
let select = SelectUpgrade::new(
|
||||
gossip_proto.into_upgrade().1,
|
||||
SelectUpgrade::new(rpc_proto.into_upgrade().1, identify_proto.into_upgrade().1),
|
||||
gossip_proto.into_upgrade().0,
|
||||
SelectUpgrade::new(rpc_proto.into_upgrade().0, identify_proto.into_upgrade().0),
|
||||
);
|
||||
|
||||
SubstreamProtocol::new(select, ()).with_timeout(timeout)
|
||||
@@ -202,7 +202,7 @@ impl<TSpec: EthSpec> ProtocolsHandler for DelegatingHandler<TSpec> {
|
||||
match event {
|
||||
DelegateIn::Gossipsub(ev) => self.gossip_handler.inject_event(ev),
|
||||
DelegateIn::RPC(ev) => self.rpc_handler.inject_event(ev),
|
||||
DelegateIn::Identify(()) => self.identify_handler.inject_event(()),
|
||||
DelegateIn::Identify(ev) => self.identify_handler.inject_event(ev),
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -34,7 +34,6 @@ impl<TSpec: EthSpec> BehaviourHandler<TSpec> {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub enum BehaviourHandlerIn<TSpec: EthSpec> {
|
||||
Delegate(DelegateIn<TSpec>),
|
||||
/// Start the shutdown process.
|
||||
|
||||
@@ -24,7 +24,7 @@ use libp2p::{
|
||||
Gossipsub as BaseGossipsub, GossipsubEvent, IdentTopic as Topic, MessageAcceptance,
|
||||
MessageAuthenticity, MessageId, PeerScoreThresholds,
|
||||
},
|
||||
identify::{Identify, IdentifyEvent},
|
||||
identify::{Identify, IdentifyConfig, IdentifyEvent},
|
||||
swarm::{
|
||||
AddressScore, NetworkBehaviour, NetworkBehaviourAction as NBAction, NotifyHandler,
|
||||
PollParameters, ProtocolsHandler,
|
||||
@@ -151,18 +151,14 @@ impl<TSpec: EthSpec> Behaviour<TSpec> {
|
||||
) -> error::Result<Self> {
|
||||
let behaviour_log = log.new(o!());
|
||||
|
||||
let identify = if net_conf.private {
|
||||
Identify::new(
|
||||
"".into(),
|
||||
let identify_config = if net_conf.private {
|
||||
IdentifyConfig::new(
|
||||
"".into(),
|
||||
local_key.public(), // Still send legitimate public key
|
||||
)
|
||||
} else {
|
||||
Identify::new(
|
||||
"lighthouse/libp2p".into(),
|
||||
lighthouse_version::version_with_platform(),
|
||||
local_key.public(),
|
||||
)
|
||||
IdentifyConfig::new("eth2/1.0.0".into(), local_key.public())
|
||||
.with_agent_version(lighthouse_version::version_with_platform())
|
||||
};
|
||||
|
||||
let enr_fork_id = network_globals
|
||||
@@ -221,7 +217,7 @@ impl<TSpec: EthSpec> Behaviour<TSpec> {
|
||||
Ok(Behaviour {
|
||||
eth2_rpc: RPC::new(log.clone()),
|
||||
gossipsub,
|
||||
identify,
|
||||
identify: Identify::new(identify_config),
|
||||
peer_manager: PeerManager::new(local_key, net_conf, network_globals.clone(), log)
|
||||
.await?,
|
||||
events: VecDeque::new(),
|
||||
@@ -902,11 +898,7 @@ impl<TSpec: EthSpec> Behaviour<TSpec> {
|
||||
|
||||
fn on_identify_event(&mut self, event: IdentifyEvent) {
|
||||
match event {
|
||||
IdentifyEvent::Received {
|
||||
peer_id,
|
||||
mut info,
|
||||
observed_addr,
|
||||
} => {
|
||||
IdentifyEvent::Received { peer_id, mut info } => {
|
||||
if info.listen_addrs.len() > MAX_IDENTIFY_ADDRESSES {
|
||||
debug!(
|
||||
self.log,
|
||||
@@ -921,12 +913,13 @@ impl<TSpec: EthSpec> Behaviour<TSpec> {
|
||||
"protocol_version" => info.protocol_version,
|
||||
"agent_version" => info.agent_version,
|
||||
"listening_ addresses" => ?info.listen_addrs,
|
||||
"observed_address" => ?observed_addr,
|
||||
"observed_address" => ?info.observed_addr,
|
||||
"protocols" => ?info.protocols
|
||||
);
|
||||
}
|
||||
IdentifyEvent::Sent { .. } => {}
|
||||
IdentifyEvent::Error { .. } => {}
|
||||
IdentifyEvent::Pushed { .. } => {}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1171,12 +1164,16 @@ impl<TSpec: EthSpec> NetworkBehaviour for Behaviour<TSpec> {
|
||||
delegate_to_behaviours!(self, inject_dial_failure, peer_id);
|
||||
}
|
||||
|
||||
fn inject_new_listen_addr(&mut self, addr: &Multiaddr) {
|
||||
delegate_to_behaviours!(self, inject_new_listen_addr, addr);
|
||||
fn inject_new_listener(&mut self, id: ListenerId) {
|
||||
delegate_to_behaviours!(self, inject_new_listener, id);
|
||||
}
|
||||
|
||||
fn inject_expired_listen_addr(&mut self, addr: &Multiaddr) {
|
||||
delegate_to_behaviours!(self, inject_expired_listen_addr, addr);
|
||||
fn inject_new_listen_addr(&mut self, id: ListenerId, addr: &Multiaddr) {
|
||||
delegate_to_behaviours!(self, inject_new_listen_addr, id, addr);
|
||||
}
|
||||
|
||||
fn inject_expired_listen_addr(&mut self, id: ListenerId, addr: &Multiaddr) {
|
||||
delegate_to_behaviours!(self, inject_expired_listen_addr, id, addr);
|
||||
}
|
||||
|
||||
fn inject_new_external_addr(&mut self, addr: &Multiaddr) {
|
||||
|
||||
@@ -163,6 +163,7 @@ impl Default for Config {
|
||||
.query_parallelism(5)
|
||||
.disable_report_discovered_peers()
|
||||
.ip_limit() // limits /24 IP's in buckets.
|
||||
.incoming_bucket_limit(8) // half the bucket size
|
||||
.ping_interval(Duration::from_secs(300))
|
||||
.build();
|
||||
|
||||
|
||||
@@ -221,7 +221,7 @@ impl<TSpec: EthSpec> Service<TSpec> {
|
||||
let mut subscribed_topics: Vec<GossipKind> = vec![];
|
||||
|
||||
for topic_kind in &config.topics {
|
||||
if swarm.subscribe_kind(topic_kind.clone()) {
|
||||
if swarm.behaviour_mut().subscribe_kind(topic_kind.clone()) {
|
||||
subscribed_topics.push(topic_kind.clone());
|
||||
} else {
|
||||
warn!(log, "Could not subscribe to topic"; "topic" => %topic_kind);
|
||||
@@ -244,7 +244,9 @@ impl<TSpec: EthSpec> Service<TSpec> {
|
||||
|
||||
/// Sends a request to a peer, with a given Id.
|
||||
pub fn send_request(&mut self, peer_id: PeerId, request_id: RequestId, request: Request) {
|
||||
self.swarm.send_request(peer_id, request_id, request);
|
||||
self.swarm
|
||||
.behaviour_mut()
|
||||
.send_request(peer_id, request_id, request);
|
||||
}
|
||||
|
||||
/// Informs the peer that their request failed.
|
||||
@@ -255,22 +257,30 @@ impl<TSpec: EthSpec> Service<TSpec> {
|
||||
error: RPCResponseErrorCode,
|
||||
reason: String,
|
||||
) {
|
||||
self.swarm._send_error_reponse(peer_id, id, error, reason);
|
||||
self.swarm
|
||||
.behaviour_mut()
|
||||
._send_error_reponse(peer_id, id, error, reason);
|
||||
}
|
||||
|
||||
/// Report a peer's action.
|
||||
pub fn report_peer(&mut self, peer_id: &PeerId, action: PeerAction, source: ReportSource) {
|
||||
self.swarm.report_peer(peer_id, action, source);
|
||||
self.swarm
|
||||
.behaviour_mut()
|
||||
.report_peer(peer_id, action, source);
|
||||
}
|
||||
|
||||
/// Disconnect and ban a peer, providing a reason.
|
||||
pub fn goodbye_peer(&mut self, peer_id: &PeerId, reason: GoodbyeReason, source: ReportSource) {
|
||||
self.swarm.goodbye_peer(peer_id, reason, source);
|
||||
self.swarm
|
||||
.behaviour_mut()
|
||||
.goodbye_peer(peer_id, reason, source);
|
||||
}
|
||||
|
||||
/// Sends a response to a peer's request.
|
||||
pub fn send_response(&mut self, peer_id: PeerId, id: PeerRequestId, response: Response<TSpec>) {
|
||||
self.swarm.send_successful_response(peer_id, id, response);
|
||||
self.swarm
|
||||
.behaviour_mut()
|
||||
.send_successful_response(peer_id, id, response);
|
||||
}
|
||||
|
||||
pub async fn next_event(&mut self) -> Libp2pEvent<TSpec> {
|
||||
@@ -350,8 +360,8 @@ type BoxedTransport = Boxed<(PeerId, StreamMuxerBox)>;
|
||||
fn build_transport(
|
||||
local_private_key: Keypair,
|
||||
) -> std::io::Result<(BoxedTransport, Arc<BandwidthSinks>)> {
|
||||
let transport = libp2p::tcp::TokioTcpConfig::new().nodelay(true);
|
||||
let transport = libp2p::dns::DnsConfig::new(transport)?;
|
||||
let tcp = libp2p::tcp::TokioTcpConfig::new().nodelay(true);
|
||||
let transport = libp2p::dns::TokioDnsConfig::system(tcp)?;
|
||||
#[cfg(feature = "libp2p-websocket")]
|
||||
let transport = {
|
||||
let trans_clone = transport.clone();
|
||||
|
||||
Reference in New Issue
Block a user