Switch libp2p sigp gossipsub fork (#4999)

* switch libp2p source to sigp fork

* Shift the connection closing inside RPC behaviour

* Tag specific commits

* Add slow peer scoring

* Fix test

* Use default yamux config

* Pin discv5 to our libp2p fork and cargo update

* Upgrade libp2p to enable yamux gains

* Add a comment specifying the branch being used

* cleanup build output from within container
(prevents CI warnings related to fs permissions)

* Remove revision tags add branches for testing, will revert back once we're happy

* Update to latest rust-libp2p version

* Pin forks

* Update cargo.lock

* Re-pin to panic-free rust

---------

Co-authored-by: Age Manning <Age@AgeManning.com>
Co-authored-by: Pawan Dhananjay <pawandhananjay@gmail.com>
Co-authored-by: antondlr <anton@delaruelle.net>
Co-authored-by: Michael Sproul <michael@sigmaprime.io>
This commit is contained in:
João Oliveira
2024-01-10 05:26:52 +00:00
committed by GitHub
parent be79f74c6d
commit 38df87c3c5
17 changed files with 1057 additions and 987 deletions

1690
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@@ -105,7 +105,7 @@ criterion = "0.3"
delay_map = "0.3"
derivative = "2"
dirs = "3"
discv5 = { version = "0.3", features = ["libp2p"] }
discv5 = { git="https://github.com/sigp/discv5", rev="dbb4a718cd32eaed8127c3c8241bfd0fde9eb908", features = ["libp2p"] }
env_logger = "0.9"
error-chain = "0.12"
ethereum-types = "0.14"
@@ -149,7 +149,7 @@ slog = { version = "2", features = ["max_level_trace", "release_max_level_trace"
slog-async = "2"
slog-term = "2"
sloggers = { version = "2", features = ["json"] }
smallvec = "1"
smallvec = "1.11.2"
snap = "1"
ssz_types = "0.5"
strum = { version = "0.24", features = ["derive"] }

View File

@@ -7,7 +7,7 @@ use beacon_processor::{BeaconProcessor, BeaconProcessorChannels, BeaconProcessor
use directory::DEFAULT_ROOT_DIR;
use eth2::{BeaconNodeHttpClient, Timeouts};
use lighthouse_network::{
discv5::enr::{CombinedKey, EnrBuilder},
discv5::enr::CombinedKey,
libp2p::swarm::{
behaviour::{ConnectionEstablished, FromSwarm},
ConnectionId, NetworkBehaviour,
@@ -138,7 +138,7 @@ pub async fn create_api_server<T: BeaconChainTypes>(
syncnets: EnrSyncCommitteeBitfield::<T::EthSpec>::default(),
});
let enr_key = CombinedKey::generate_secp256k1();
let enr = EnrBuilder::new("v4").build(&enr_key).unwrap();
let enr = Enr::builder().build(&enr_key).unwrap();
let network_globals = Arc::new(NetworkGlobals::new(
enr.clone(),
meta_data,

View File

@@ -43,10 +43,11 @@ prometheus-client = "0.22.0"
unused_port = { workspace = true }
delay_map = { workspace = true }
void = "1"
libp2p-mplex = "0.41.0"
libp2p-mplex = { git = "https://github.com/sigp/rust-libp2p/", rev = "b96b90894faab0a1eed78e1c82c6452138a3538a" }
[dependencies.libp2p]
version = "0.53"
git = "https://github.com/sigp/rust-libp2p/"
rev = "b96b90894faab0a1eed78e1c82c6452138a3538a"
default-features = false
features = ["identify", "yamux", "noise", "gossipsub", "dns", "tcp", "tokio", "plaintext", "secp256k1", "macros", "ecdsa", "metrics", "quic"]

View File

@@ -5,7 +5,6 @@ use crate::{Enr, PeerIdSerialized};
use directory::{
DEFAULT_BEACON_NODE_DIR, DEFAULT_HARDCODED_NETWORK, DEFAULT_NETWORK_DIR, DEFAULT_ROOT_DIR,
};
use discv5::{Discv5Config, Discv5ConfigBuilder};
use libp2p::gossipsub;
use libp2p::Multiaddr;
use serde::{Deserialize, Serialize};
@@ -91,7 +90,7 @@ pub struct Config {
/// Discv5 configuration parameters.
#[serde(skip)]
pub discv5_config: Discv5Config,
pub discv5_config: discv5::Config,
/// List of nodes to initially connect to.
pub boot_nodes_enr: Vec<Enr>,
@@ -324,7 +323,7 @@ impl Default for Config {
discv5::ListenConfig::from_ip(Ipv4Addr::UNSPECIFIED.into(), 9000);
// discv5 configuration
let discv5_config = Discv5ConfigBuilder::new(discv5_listen_config)
let discv5_config = discv5::ConfigBuilder::new(discv5_listen_config)
.enable_packet_filter()
.session_cache_capacity(5000)
.request_timeout(Duration::from_secs(1))

View File

@@ -1,12 +1,11 @@
//! Helper functions and an extension trait for Ethereum 2 ENRs.
pub use discv5::enr::{CombinedKey, EnrBuilder};
pub use discv5::enr::CombinedKey;
use super::enr_ext::CombinedKeyExt;
use super::ENR_FILENAME;
use crate::types::{Enr, EnrAttestationBitfield, EnrSyncCommitteeBitfield};
use crate::NetworkConfig;
use discv5::enr::EnrKey;
use libp2p::identity::Keypair;
use slog::{debug, warn};
use ssz::{Decode, Encode};
@@ -142,11 +141,13 @@ pub fn build_or_load_enr<T: EthSpec>(
Ok(local_enr)
}
pub fn create_enr_builder_from_config<T: EnrKey>(
/// Builds a lighthouse ENR given a `NetworkConfig`.
pub fn build_enr<T: EthSpec>(
enr_key: &CombinedKey,
config: &NetworkConfig,
enable_libp2p: bool,
) -> EnrBuilder<T> {
let mut builder = EnrBuilder::new("v4");
enr_fork_id: &EnrForkId,
) -> Result<Enr, String> {
let mut builder = discv5::enr::Enr::builder();
let (maybe_ipv4_address, maybe_ipv6_address) = &config.enr_address;
if let Some(ip) = maybe_ipv4_address {
@@ -165,7 +166,6 @@ pub fn create_enr_builder_from_config<T: EnrKey>(
builder.udp6(udp6_port.get());
}
if enable_libp2p {
// Add QUIC fields to the ENR.
// Since QUIC is used as an alternative transport for the libp2p protocols,
// the related fields should only be added when both QUIC and libp2p are enabled
@@ -211,17 +211,6 @@ pub fn create_enr_builder_from_config<T: EnrKey>(
if let Some(tcp6_port) = tcp6_port {
builder.tcp6(tcp6_port.get());
}
}
builder
}
/// Builds a lighthouse ENR given a `NetworkConfig`.
pub fn build_enr<T: EthSpec>(
enr_key: &CombinedKey,
config: &NetworkConfig,
enr_fork_id: &EnrForkId,
) -> Result<Enr, String> {
let mut builder = create_enr_builder_from_config(config, true);
// set the `eth2` field on our ENR
builder.add_value(ETH2_ENR_KEY, &enr_fork_id.as_ssz_bytes());

View File

@@ -366,9 +366,7 @@ mod tests {
let libp2p_kp: Keypair = secp256k1_kp.into();
let peer_id = libp2p_kp.public().to_peer_id();
let enr = discv5::enr::EnrBuilder::new("v4")
.build(&secret_key)
.unwrap();
let enr = discv5::enr::Enr::builder().build(&secret_key).unwrap();
let node_id = peer_id_to_node_id(&peer_id).unwrap();
assert_eq!(enr.node_id(), node_id);
@@ -387,9 +385,7 @@ mod tests {
let libp2p_kp: Keypair = secp256k1_kp.into();
let peer_id = libp2p_kp.public().to_peer_id();
let enr = discv5::enr::EnrBuilder::new("v4")
.build(&secret_key)
.unwrap();
let enr = discv5::enr::Enr::builder().build(&secret_key).unwrap();
let node_id = peer_id_to_node_id(&peer_id).unwrap();
assert_eq!(enr.node_id(), node_id);

View File

@@ -10,11 +10,8 @@ pub mod enr_ext;
use crate::service::TARGET_SUBNET_PEERS;
use crate::{error, Enr, NetworkConfig, NetworkGlobals, Subnet, SubnetDiscovery};
use crate::{metrics, ClearDialError};
use discv5::{enr::NodeId, Discv5, Discv5Event};
pub use enr::{
build_enr, create_enr_builder_from_config, load_enr_from_disk, use_or_load_enr, CombinedKey,
Eth2Enr,
};
use discv5::{enr::NodeId, Discv5};
pub use enr::{build_enr, load_enr_from_disk, use_or_load_enr, CombinedKey, Eth2Enr};
pub use enr_ext::{peer_id_to_node_id, CombinedKeyExt, EnrExt};
pub use libp2p::identity::{Keypair, PublicKey};
@@ -147,15 +144,10 @@ enum EventStream {
/// Awaiting an event stream to be generated. This is required due to the poll nature of
/// `Discovery`
Awaiting(
Pin<
Box<
dyn Future<Output = Result<mpsc::Receiver<Discv5Event>, discv5::Discv5Error>>
+ Send,
>,
>,
Pin<Box<dyn Future<Output = Result<mpsc::Receiver<discv5::Event>, discv5::Error>> + Send>>,
),
/// The future has completed.
Present(mpsc::Receiver<Discv5Event>),
Present(mpsc::Receiver<discv5::Event>),
// The future has failed or discv5 has been disabled. There are no events from discv5.
InActive,
}
@@ -996,7 +988,7 @@ impl<TSpec: EthSpec> NetworkBehaviour for Discovery<TSpec> {
match event {
// We filter out unwanted discv5 events here and only propagate useful results to
// the peer manager.
Discv5Event::Discovered(_enr) => {
discv5::Event::Discovered(_enr) => {
// Peers that get discovered during a query but are not contactable or
// don't match a predicate can end up here. For debugging purposes we
// log these to see if we are unnecessarily dropping discovered peers
@@ -1009,7 +1001,7 @@ impl<TSpec: EthSpec> NetworkBehaviour for Discovery<TSpec> {
}
*/
}
Discv5Event::SocketUpdated(socket_addr) => {
discv5::Event::SocketUpdated(socket_addr) => {
info!(self.log, "Address updated"; "ip" => %socket_addr.ip(), "udp_port" => %socket_addr.port());
metrics::inc_counter(&metrics::ADDRESS_UPDATE_COUNT);
metrics::check_nat();
@@ -1030,10 +1022,10 @@ impl<TSpec: EthSpec> NetworkBehaviour for Discovery<TSpec> {
// NOTE: We assume libp2p itself can keep track of IP changes and we do
// not inform it about IP changes found via discovery.
}
Discv5Event::EnrAdded { .. }
| Discv5Event::TalkRequest(_)
| Discv5Event::NodeInserted { .. }
| Discv5Event::SessionEstablished { .. } => {} // Ignore all other discv5 server events
discv5::Event::EnrAdded { .. }
| discv5::Event::TalkRequest(_)
| discv5::Event::NodeInserted { .. }
| discv5::Event::SessionEstablished { .. } => {} // Ignore all other discv5 server events
}
}
}
@@ -1144,7 +1136,6 @@ impl<TSpec: EthSpec> Discovery<TSpec> {
mod tests {
use super::*;
use crate::rpc::methods::{MetaData, MetaDataV2};
use enr::EnrBuilder;
use libp2p::identity::secp256k1;
use slog::{o, Drain};
use types::{BitVector, MinimalEthSpec, SubnetId};
@@ -1227,7 +1218,7 @@ mod tests {
}
fn make_enr(subnet_ids: Vec<usize>) -> Enr {
let mut builder = EnrBuilder::new("v4");
let mut builder = Enr::builder();
let keypair = secp256k1::Keypair::generate();
let enr_key: CombinedKey = CombinedKey::from_secp256k1(&keypair);

View File

@@ -7,7 +7,8 @@
use futures::future::FutureExt;
use handler::RPCHandler;
use libp2p::swarm::{
handler::ConnectionHandler, ConnectionId, NetworkBehaviour, NotifyHandler, ToSwarm,
handler::ConnectionHandler, CloseConnection, ConnectionId, NetworkBehaviour, NotifyHandler,
ToSwarm,
};
use libp2p::swarm::{FromSwarm, SubstreamProtocol, THandlerInEvent};
use libp2p::PeerId;
@@ -292,7 +293,8 @@ where
conn_id: ConnectionId,
event: <Self::ConnectionHandler as ConnectionHandler>::ToBehaviour,
) {
if let HandlerEvent::Ok(RPCReceived::Request(ref id, ref req)) = event {
match event {
HandlerEvent::Ok(RPCReceived::Request(ref id, ref req)) => {
if let Some(limiter) = self.limiter.as_mut() {
// check if the request is conformant to the quota
match limiter.allows(&peer_id, req) {
@@ -348,7 +350,15 @@ where
event,
}))
}
} else {
}
HandlerEvent::Close(_) => {
// Handle the close event here.
self.events.push(ToSwarm::CloseConnection {
peer_id,
connection: CloseConnection::All,
});
}
_ => {
self.events.push(ToSwarm::GenerateEvent(RPCMessage {
peer_id,
conn_id,
@@ -356,6 +366,7 @@ where
}));
}
}
}
fn poll(&mut self, cx: &mut Context) -> Poll<ToSwarm<Self::ToSwarm, THandlerInEvent<Self>>> {
// let the rate limiter prune.

View File

@@ -96,6 +96,9 @@ impl<TSpec: EthSpec> PeerScoreSettings<TSpec> {
ip_colocation_factor_threshold: 8.0, // Allow up to 8 nodes per IP
behaviour_penalty_threshold: 6.0,
behaviour_penalty_decay: self.score_parameter_decay(self.epoch * 10),
slow_peer_decay: 0.1,
slow_peer_weight: -10.0,
slow_peer_threshold: 0.0,
..Default::default()
};

View File

@@ -1264,6 +1264,32 @@ impl<AppReqId: ReqId, TSpec: EthSpec> Network<AppReqId, TSpec> {
"does_not_support_gossipsub",
);
}
gossipsub::Event::SlowPeer {
peer_id,
failed_messages,
} => {
debug!(self.log, "Slow gossipsub peer"; "peer_id" => %peer_id, "publish" => failed_messages.publish, "forward" => failed_messages.forward, "priority" => failed_messages.priority, "non_priority" => failed_messages.non_priority);
// Punish the peer if it cannot handle priority messages
if failed_messages.total_timeout() > 10 {
debug!(self.log, "Slow gossipsub peer penalized for priority failure"; "peer_id" => %peer_id);
self.peer_manager_mut().report_peer(
&peer_id,
PeerAction::HighToleranceError,
ReportSource::Gossipsub,
None,
"publish_timeout_penalty",
);
} else if failed_messages.total_queue_full() > 10 {
debug!(self.log, "Slow gossipsub peer penalized for send queue full"; "peer_id" => %peer_id);
self.peer_manager_mut().report_peer(
&peer_id,
PeerAction::HighToleranceError,
ReportSource::Gossipsub,
None,
"queue_full_penalty",
);
}
}
}
None
}
@@ -1468,8 +1494,7 @@ impl<AppReqId: ReqId, TSpec: EthSpec> Network<AppReqId, TSpec> {
self.build_response(id, peer_id, response)
}
HandlerEvent::Close(_) => {
let _ = self.swarm.disconnect_peer_id(peer_id);
// NOTE: we wait for the swarm to report the connection as actually closed
// NOTE: This is handled in the RPC behaviour.
None
}
}

View File

@@ -50,8 +50,7 @@ pub fn build_transport(
mplex_config.set_max_buffer_behaviour(libp2p_mplex::MaxBufferBehaviour::Block);
// yamux config
let mut yamux_config = yamux::Config::default();
yamux_config.set_window_update_mode(yamux::WindowUpdateMode::on_read());
let yamux_config = yamux::Config::default();
// Creates the TCP transport layer
let tcp = libp2p::tcp::tokio::Transport::new(libp2p::tcp::Config::default().nodelay(true))
.upgrade(core::upgrade::Version::V1)

View File

@@ -118,7 +118,7 @@ impl<TSpec: EthSpec> NetworkGlobals<TSpec> {
use crate::CombinedKeyExt;
let keypair = libp2p::identity::secp256k1::Keypair::generate();
let enr_key: discv5::enr::CombinedKey = discv5::enr::CombinedKey::from_secp256k1(&keypair);
let enr = discv5::enr::EnrBuilder::new("v4").build(&enr_key).unwrap();
let enr = discv5::enr::Enr::builder().build(&enr_key).unwrap();
NetworkGlobals::new(
enr,
MetaData::V2(MetaDataV2 {

View File

@@ -18,7 +18,7 @@ use lighthouse_network::discovery::ConnectionId;
use lighthouse_network::rpc::methods::BlobsByRangeRequest;
use lighthouse_network::rpc::SubstreamId;
use lighthouse_network::{
discv5::enr::{CombinedKey, EnrBuilder},
discv5::enr::{self, CombinedKey},
rpc::methods::{MetaData, MetaDataV2},
types::{EnrAttestationBitfield, EnrSyncCommitteeBitfield},
Client, MessageId, NetworkGlobals, PeerId, Response,
@@ -203,7 +203,7 @@ impl TestRig {
syncnets: EnrSyncCommitteeBitfield::<MainnetEthSpec>::default(),
});
let enr_key = CombinedKey::generate_secp256k1();
let enr = EnrBuilder::new("v4").build(&enr_key).unwrap();
let enr = enr::Enr::builder().build(&enr_key).unwrap();
let network_globals = Arc::new(NetworkGlobals::new(enr, meta_data, vec![], false, &log));
let executor = harness.runtime.task_executor.clone();

View File

@@ -1,8 +1,7 @@
use beacon_node::{get_data_dir, set_network_config};
use clap::ArgMatches;
use eth2_network_config::Eth2NetworkConfig;
use lighthouse_network::discovery::create_enr_builder_from_config;
use lighthouse_network::discv5::{enr::CombinedKey, Discv5Config, Enr};
use lighthouse_network::discv5::{self, enr::CombinedKey, Enr};
use lighthouse_network::{
discovery::{load_enr_from_disk, use_or_load_enr},
load_private_key, CombinedKeyExt, NetworkConfig,
@@ -20,7 +19,7 @@ pub struct BootNodeConfig<T: EthSpec> {
pub boot_nodes: Vec<Enr>,
pub local_enr: Enr,
pub local_key: CombinedKey,
pub discv5_config: Discv5Config,
pub discv5_config: discv5::Config,
phantom: PhantomData<T>,
}
@@ -130,8 +129,25 @@ impl<T: EthSpec> BootNodeConfig<T> {
// Build the local ENR
let mut local_enr = {
let enable_tcp = false;
let mut builder = create_enr_builder_from_config(&network_config, enable_tcp);
let (maybe_ipv4_address, maybe_ipv6_address) = network_config.enr_address;
let mut builder = discv5::Enr::builder();
if let Some(ip) = maybe_ipv4_address {
builder.ip4(ip);
}
if let Some(ip) = maybe_ipv6_address {
builder.ip6(ip);
}
if let Some(udp4_port) = network_config.enr_udp4_port {
builder.udp4(udp4_port.get());
}
if let Some(udp6_port) = network_config.enr_udp6_port {
builder.udp6(udp6_port.get());
}
// If we know of the ENR field, add it to the initial construction
if let Some(enr_fork_bytes) = enr_fork {
builder.add_value("eth2", &enr_fork_bytes);
@@ -157,7 +173,7 @@ impl<T: EthSpec> BootNodeConfig<T> {
/// The set of configuration parameters that can safely be (de)serialized.
///
/// Its fields are a subset of the fields of `BootNodeConfig`, some of them are copied from `Discv5Config`.
/// Its fields are a subset of the fields of `BootNodeConfig`, some of them are copied from `discv5::Config`.
#[derive(Serialize, Deserialize)]
pub struct BootNodeConfigSerialization {
pub ipv4_listen_socket: Option<SocketAddrV4>,

View File

@@ -5,7 +5,7 @@ use crate::config::BootNodeConfigSerialization;
use clap::ArgMatches;
use eth2_network_config::Eth2NetworkConfig;
use lighthouse_network::{
discv5::{enr::NodeId, Discv5, Discv5Event},
discv5::{self, enr::NodeId, Discv5},
EnrExt, Eth2Enr,
};
use slog::info;
@@ -144,17 +144,17 @@ pub async fn run<T: EthSpec>(
}
Some(event) = event_stream.recv() => {
match event {
Discv5Event::Discovered(_enr) => {
discv5::Event::Discovered(_enr) => {
// An ENR has bee obtained by the server
// Ignore these events here
}
Discv5Event::EnrAdded { .. } => {} // Ignore
Discv5Event::TalkRequest(_) => {} // Ignore
Discv5Event::NodeInserted { .. } => {} // Ignore
Discv5Event::SocketUpdated(socket_addr) => {
discv5::Event::EnrAdded { .. } => {} // Ignore
discv5::Event::TalkRequest(_) => {} // Ignore
discv5::Event::NodeInserted { .. } => {} // Ignore
discv5::Event::SocketUpdated(socket_addr) => {
info!(log, "Advertised socket address updated"; "socket_addr" => %socket_addr);
}
Discv5Event::SessionEstablished{ .. } => {} // Ignore
discv5::Event::SessionEstablished{ .. } => {} // Ignore
}
}
}