use self::gossip_cache::GossipCache;
use crate::Eth2Enr;
use crate::config::{GossipsubConfigParams, NetworkLoad, gossipsub_config};
use crate::discovery::{
    DiscoveredPeers, Discovery, FIND_NODE_QUERY_CLOSEST_PEERS, subnet_predicate,
};
use crate::peer_manager::{
    ConnectionDirection, PeerManager, PeerManagerEvent, config::Config as PeerManagerCfg,
    peerdb::score::PeerAction, peerdb::score::ReportSource,
};
use crate::peer_manager::{MIN_OUTBOUND_ONLY_FACTOR, PEER_EXCESS_FACTOR, PRIORITY_PEER_EXCESS};
use crate::rpc::methods::MetadataRequest;
use crate::rpc::{
    GoodbyeReason, HandlerErr, InboundRequestId, Protocol, RPC, RPCError, RPCMessage, RPCReceived,
    RequestType, ResponseTermination, RpcResponse, RpcSuccessResponse,
};
use crate::types::{
    GossipEncoding, GossipKind, GossipTopic, SnappyTransform, Subnet, SubnetDiscovery,
    all_topics_at_fork, core_topics_to_subscribe, is_fork_non_core_topic, subnet_from_topic_hash,
};
use crate::{Enr, NetworkGlobals, PubsubMessage, TopicHash, metrics};
use api_types::{AppRequestId, Response};
use futures::stream::StreamExt;
use gossipsub::{
    IdentTopic as Topic, MessageAcceptance, MessageAuthenticity, MessageId, PublishError,
    TopicScoreParams,
};
use gossipsub_scoring_parameters::{PeerScoreSettings, lighthouse_gossip_thresholds};
use libp2p::multiaddr::{self, Multiaddr, Protocol as MProtocol};
use libp2p::swarm::behaviour::toggle::Toggle;
use libp2p::swarm::{NetworkBehaviour, Swarm, SwarmEvent};
use libp2p::upnp::tokio::Behaviour as Upnp;
use libp2p::{PeerId, SwarmBuilder, identify};
use logging::crit;
use network_utils::enr_ext::EnrExt;
use std::num::{NonZeroU8, NonZeroUsize};
use std::path::PathBuf;
use std::pin::Pin;
use std::sync::Arc;
use std::time::Duration;
use tracing::{debug, error, info, trace, warn};
use types::{ChainSpec, ForkName};
use types::{
    EnrForkId, EthSpec, ForkContext, Slot, SubnetId, consts::altair::SYNC_COMMITTEE_SUBNET_COUNT,
};
use utils::{Context as ServiceContext, build_transport, strip_peer_id};

pub mod api_types;
mod gossip_cache;
pub mod gossipsub_scoring_parameters;
pub mod utils;

/// The number of peers we target per subnet for discovery queries.
pub const TARGET_SUBNET_PEERS: usize = 3;

const MAX_IDENTIFY_ADDRESSES: usize = 10;

/// The types of events that can be obtained from polling the behaviour.
#[derive(Debug)]
pub enum NetworkEvent<E: EthSpec> {
    /// We have successfully dialed and connected to a peer.
    PeerConnectedOutgoing(PeerId),
    /// A peer has successfully dialed and connected to us.
    PeerConnectedIncoming(PeerId),
    /// A peer has disconnected.
    PeerDisconnected(PeerId),
    /// An RPC request that was sent failed.
    RPCFailed {
        /// The id of the failed request.
        app_request_id: AppRequestId,
        /// The peer to which this request was sent.
        peer_id: PeerId,
        /// The error of the failed request.
        error: RPCError,
    },
    RequestReceived {
        /// The peer that sent the request.
        peer_id: PeerId,
        /// Identifier of the request. All responses to this request must use this id.
        inbound_request_id: InboundRequestId,
        /// Request the peer sent.
        request_type: RequestType<E>,
    },
    ResponseReceived {
        /// Peer that sent the response.
        peer_id: PeerId,
        /// Id of the request to which the peer is responding.
        app_request_id: AppRequestId,
        /// Response the peer sent.
        response: Response<E>,
    },
    PubsubMessage {
        /// The gossipsub message id. Used when propagating blocks after validation.
        id: MessageId,
        /// The peer from which we received this message, not the peer that published it.
        source: PeerId,
        /// The topic that this message was sent on.
        topic: TopicHash,
        /// The message itself.
        message: PubsubMessage<E>,
    },
    /// Inform the network to send a Status to this peer.
    StatusPeer(PeerId),
    NewListenAddr(Multiaddr),
    ZeroListeners,
    /// A peer has an updated custody group count from MetaData.
    PeerUpdatedCustodyGroupCount(PeerId),
}
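// How these events are typically consumed: a minimal, illustrative sketch (not
// compiled here) of a service loop driving `Network::next_event`. The variable
// `network: Network<E>` and the response values are assumptions of the example,
// not definitions from this file.
//
//     loop {
//         match network.next_event().await {
//             NetworkEvent::PeerConnectedOutgoing(peer_id) => {
//                 // A good place to initiate a STATUS exchange with `peer_id`.
//             }
//             NetworkEvent::RequestReceived { peer_id, inbound_request_id, request_type } => {
//                 // Build a response elsewhere, then answer using the same id:
//                 // network.send_response(peer_id, inbound_request_id, response);
//             }
//             NetworkEvent::PubsubMessage { id, source, message, .. } => {
//                 // After validating `message`, report the result so gossipsub
//                 // can propagate (or penalise) accordingly:
//                 // network.report_message_validation_result(&source, id, MessageAcceptance::Accept);
//             }
//             _ => {}
//         }
//     }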
pub type Gossipsub = gossipsub::Behaviour<SnappyTransform, SubscriptionFilter>;
pub type SubscriptionFilter =
    gossipsub::MaxCountSubscriptionFilter<gossipsub::WhitelistSubscriptionFilter>;

#[derive(NetworkBehaviour)]
pub(crate) struct Behaviour<E>
where
    E: EthSpec,
{
    // NOTE: The order of this list of behaviours has meaning: the
    // `NetworkBehaviour::handle_{pending, established}_{inbound, outbound}` methods
    // are called sequentially for each behaviour, and they are fallible. We therefore
    // want `connection_limits` and `peer_manager`, the behaviours that may reject a
    // connection, to run first, so that when the subsequent behaviours are called
    // they can be certain the connection won't be rejected.
    /// Keep track of active and pending connections to enforce hard limits.
    pub connection_limits: libp2p::connection_limits::Behaviour,
    /// The peer manager that keeps track of peers' reputation and status.
    pub peer_manager: PeerManager<E>,
    /// The Eth2 RPC specified in the wire-0 protocol.
    pub eth2_rpc: RPC<AppRequestId, E>,
    /// Discv5 Discovery protocol.
    pub discovery: Discovery<E>,
    // NOTE: The identify protocol is used for initial interop. This will be removed by mainnet.
    /// Provides IP addresses and peer information.
    pub identify: identify::Behaviour,
    /// Libp2p UPnP port mapping.
    pub upnp: Toggle<Upnp>,
    /// The routing pub-sub mechanism for eth2.
    pub gossipsub: Gossipsub,
}
/// The core network service for eth2. It drives the libp2p `Swarm` whose
/// `Behaviour` combines peer management with all core sub-behaviours.
pub struct Network<E: EthSpec> {
    swarm: libp2p::swarm::Swarm<Behaviour<E>>,
    /* Auxiliary Fields */
    /// A collection of variables accessible outside the network service.
    network_globals: Arc<NetworkGlobals<E>>,
    /// Keeps track of the current EnrForkId for upgrading gossipsub topics.
    // NOTE: This can be accessed via the network_globals ENR. However we keep it here for quick
    // lookups on every gossipsub message send.
    enr_fork_id: EnrForkId,
    /// Directory where metadata is stored.
    network_dir: PathBuf,
    fork_context: Arc<ForkContext>,
    /// Gossipsub score parameters.
    score_settings: PeerScoreSettings<E>,
    /// The interval for updating gossipsub scores.
    update_gossipsub_scores: tokio::time::Interval,
    gossip_cache: GossipCache,
    /// This node's PeerId.
    pub local_peer_id: PeerId,
}

/// Implements the combined behaviour for the libp2p service.
impl<E: EthSpec> Network<E> {
    pub async fn new(
        executor: task_executor::TaskExecutor,
        mut ctx: ServiceContext<'_>,
        custody_group_count: u64,
    ) -> Result<(Self, Arc<NetworkGlobals<E>>), String> {
        let config = ctx.config.clone();
        trace!("Libp2p Service starting");
        // Initialise the node's ID.
        let local_keypair = utils::load_private_key(&config);

        // Trusted peers will also be marked as explicit in GossipSub.
        // See https://github.com/libp2p/specs/blob/master/pubsub/gossipsub/gossipsub-v1.1.md#explicit-peering-agreements
        let trusted_peers: Vec<PeerId> = config
            .trusted_peers
            .iter()
            .map(|x| PeerId::from(x.clone()))
            .collect();

        // Set up a collection of variables accessible outside of the network crate.
        // Create an ENR, or load one from disk if appropriate.
        let next_fork_digest = ctx
            .fork_context
            .next_fork_digest()
            .unwrap_or_else(|| ctx.fork_context.current_fork_digest());

        let advertised_cgc = config
            .advertise_false_custody_group_count
            .unwrap_or(custody_group_count);
        let enr = crate::discovery::enr::build_or_load_enr::<E>(
            local_keypair.clone(),
            &config,
            &ctx.enr_fork_id,
            Some(advertised_cgc),
            next_fork_digest,
            &ctx.chain_spec,
        )?;

        // Construct the metadata.
        let meta_data = utils::load_or_build_metadata(&config.network_dir, advertised_cgc);
        let seq_number = *meta_data.seq_number();
        let globals = NetworkGlobals::new(
            enr,
            meta_data,
            trusted_peers,
            config.disable_peer_scoring,
            config.clone(),
            ctx.chain_spec.clone(),
        );
        let network_globals = Arc::new(globals);

        // Grab our local ENR fork id.
        let enr_fork_id = network_globals
            .local_enr()
            .eth2()
            .expect("Local ENR must have a fork id");

        let gossipsub_config_params = GossipsubConfigParams {
            message_domain_valid_snappy: ctx.chain_spec.message_domain_valid_snappy,
            gossipsub_max_transmit_size: ctx.chain_spec.max_message_size(),
        };
        let gs_config = gossipsub_config(
            config.network_load,
            ctx.fork_context.clone(),
            gossipsub_config_params,
            ctx.chain_spec.seconds_per_slot,
            E::slots_per_epoch(),
            config.idontwant_message_size_threshold,
        );

        let score_settings = PeerScoreSettings::new(&ctx.chain_spec, gs_config.mesh_n());

        let gossip_cache = {
            let slot_duration = std::time::Duration::from_secs(ctx.chain_spec.seconds_per_slot);
            let half_epoch = std::time::Duration::from_secs(
                ctx.chain_spec.seconds_per_slot * E::slots_per_epoch() / 2,
            );

            GossipCache::builder()
                .beacon_block_timeout(slot_duration)
                .aggregates_timeout(half_epoch)
                .attestation_timeout(half_epoch)
                .voluntary_exit_timeout(half_epoch * 2)
                .proposer_slashing_timeout(half_epoch * 2)
                .attester_slashing_timeout(half_epoch * 2)
                // .signed_contribution_and_proof_timeout(timeout) // Do not retry
                // .sync_committee_message_timeout(timeout) // Do not retry
                .bls_to_execution_change_timeout(half_epoch * 2)
                .build()
        };
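        // Worked example of the timeouts above with mainnet parameters
        // (seconds_per_slot = 12, slots_per_epoch = 32), stated here for
        // illustration only:
        //   slot_duration = 12s and half_epoch = 12 * 32 / 2 = 192s,
        // so a block is retried for up to one slot, aggregates/attestations for
        // half an epoch, and exits/slashings for a full epoch (384s).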
        let local_peer_id = network_globals.local_peer_id();

        let (gossipsub, update_gossipsub_scores) = {
            let thresholds = lighthouse_gossip_thresholds();

            // Prepare scoring parameters.
            let params = {
                // Construct a set of gossipsub peer scoring parameters.
                // We don't know the number of active validators and the current slot yet.
                let active_validators = E::minimum_validator_count();
                let current_slot = Slot::new(0);
                score_settings.get_peer_score_params(
                    active_validators,
                    &thresholds,
                    &enr_fork_id,
                    current_slot,
                )?
            };

            trace!(?params, "Using peer score params");

            // Set up a scoring update interval.
            let update_gossipsub_scores = tokio::time::interval(params.decay_interval);

            let current_digest_epoch = ctx.fork_context.current_fork_epoch();
            let current_and_future_digests =
                ctx.chain_spec
                    .all_digest_epochs()
                    .filter_map(|digest_epoch| {
                        if digest_epoch >= current_digest_epoch {
                            Some((digest_epoch, ctx.fork_context.context_bytes(digest_epoch)))
                        } else {
                            None
                        }
                    });

            let all_topics_for_digests = current_and_future_digests
                .map(|(epoch, digest)| {
                    let fork = ctx.chain_spec.fork_name_at_epoch(epoch);
                    all_topics_at_fork::<E>(fork, &ctx.chain_spec)
                        .into_iter()
                        .map(|topic| {
                            Topic::new(GossipTopic::new(topic, GossipEncoding::default(), digest))
                                .into()
                        })
                        .collect::<Vec<TopicHash>>()
                })
                .collect::<Vec<_>>();

            // For simplicity, find the fork with the most individual topics and assume all forks
            // have the same topic count.
            let max_topics_at_any_fork = all_topics_for_digests
                .iter()
                .map(|topics| topics.len())
                .max()
                .expect("each fork has at least 5 hardcoded core topics");

            let possible_fork_digests = ctx.fork_context.all_fork_digests();
            let filter = gossipsub::MaxCountSubscriptionFilter {
                filter: utils::create_whitelist_filter(
                    possible_fork_digests,
                    &ctx.chain_spec,
                    SYNC_COMMITTEE_SUBNET_COUNT,
                ),
                // During a fork we subscribe to both the old and new topics.
                max_subscribed_topics: max_topics_at_any_fork * 4,
                // 424 in theory = (64 attestation + 4 sync committee + 7 core topics + 9 blob topics + 128 column topics) * 2
                max_subscriptions_per_request: max_topics_at_any_fork * 2,
            };

            let spec = &ctx.chain_spec;
            let snappy_transform =
                SnappyTransform::new(spec.max_payload_size as usize, spec.max_compressed_len());
            let mut gossipsub = Gossipsub::new_with_subscription_filter_and_transform(
                MessageAuthenticity::Anonymous,
                gs_config.clone(),
                filter,
                snappy_transform,
            )
            .map_err(|e| format!("Could not construct gossipsub: {:?}", e))?;

            // If metrics are enabled for libp2p, attach the gossipsub registry.
            if let Some(ref mut registry) = ctx.libp2p_registry {
                gossipsub = gossipsub.with_metrics(
                    registry.sub_registry_with_prefix("gossipsub"),
                    Default::default(),
                );
            }

            gossipsub
                .with_peer_score(params, thresholds)
                .expect("Valid score params and thresholds");

            // Mark trusted peers as explicit.
            for explicit_peer in config.trusted_peers.iter() {
                gossipsub.add_explicit_peer(&PeerId::from(explicit_peer.clone()));
            }

            // If we are using metrics, register which topics we want to keep track of.
            if ctx.libp2p_registry.is_some() {
                for topics in all_topics_for_digests {
                    gossipsub.register_topics_for_metrics(topics);
                }
            }

            (gossipsub, update_gossipsub_scores)
        };

        let eth2_rpc = RPC::new(
            ctx.fork_context.clone(),
            config.enable_light_client_server,
            config.inbound_rate_limiter_config.clone(),
            config.outbound_rate_limiter_config.clone(),
            seq_number,
        );

        let discovery = {
            // Build and start the discovery sub-behaviour.
            let mut discovery = Discovery::new(
                local_keypair.clone(),
                &config,
                network_globals.clone(),
                &ctx.chain_spec,
            )
            .await?;
            // Start searching for peers.
            discovery.discover_peers(FIND_NODE_QUERY_CLOSEST_PEERS);
            discovery
        };

        let identify = {
            let local_public_key = local_keypair.public();
            let identify_config = if config.private {
                identify::Config::new(
                    "".into(),
                    local_public_key, // Still send legitimate public key
                )
                .with_cache_size(0)
            } else {
                identify::Config::new("eth2/1.0.0".into(), local_public_key)
                    .with_agent_version(lighthouse_version::version_with_platform())
                    .with_cache_size(0)
            };
            identify::Behaviour::new(identify_config)
        };

        let peer_manager = {
            let peer_manager_cfg = PeerManagerCfg {
                discovery_enabled: !config.disable_discovery,
                quic_enabled: !config.disable_quic_support,
                metrics_enabled: config.metrics_enabled,
                target_peer_count: config.target_peers,
                ..Default::default()
            };
            PeerManager::new(peer_manager_cfg, network_globals.clone())?
        };

        let connection_limits = {
            let limits = libp2p::connection_limits::ConnectionLimits::default()
                .with_max_pending_incoming(Some(5))
                .with_max_pending_outgoing(Some(16))
                .with_max_established_incoming(Some(
                    (config.target_peers as f32
                        * (1.0 + PEER_EXCESS_FACTOR - MIN_OUTBOUND_ONLY_FACTOR))
                        .ceil() as u32,
                ))
                .with_max_established_outgoing(Some(
                    (config.target_peers as f32 * (1.0 + PEER_EXCESS_FACTOR)).ceil() as u32,
                ))
                .with_max_established(Some(
                    (config.target_peers as f32 * (1.0 + PEER_EXCESS_FACTOR + PRIORITY_PEER_EXCESS))
                        .ceil() as u32,
                ))
                .with_max_established_per_peer(Some(1));

            libp2p::connection_limits::Behaviour::new(limits)
        };
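        // Worked example of the limits above, assuming target_peers = 100 and the
        // crate's excess factors take their usual values (PEER_EXCESS_FACTOR = 0.1,
        // MIN_OUTBOUND_ONLY_FACTOR = 0.2, PRIORITY_PEER_EXCESS = 0.2; illustrative
        // only, see the constants in `peer_manager` for the authoritative values):
        //   max_established_incoming = ceil(100 * (1.0 + 0.1 - 0.2)) = 90
        //   max_established_outgoing = ceil(100 * (1.0 + 0.1))       = 110
        //   max_established          = ceil(100 * (1.0 + 0.1 + 0.2)) = 130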
        let upnp = Toggle::from(
            config
                .upnp_enabled
                .then(libp2p::upnp::tokio::Behaviour::default),
        );
        let behaviour = {
            Behaviour {
                gossipsub,
                eth2_rpc,
                discovery,
                identify,
                peer_manager,
                connection_limits,
                upnp,
            }
        };

        // Set up the transport: tcp/quic with noise and mplex.
        let transport = build_transport(local_keypair.clone(), !config.disable_quic_support)
            .map_err(|e| format!("Failed to build transport: {:?}", e))?;

        // Use the executor for libp2p.
        struct Executor(task_executor::TaskExecutor);
        impl libp2p::swarm::Executor for Executor {
            fn exec(&self, f: Pin<Box<dyn futures::Future<Output = ()> + Send>>) {
                self.0.spawn(f, "libp2p");
            }
        }

        // Set up the libp2p swarm.
        let swarm = {
            let config = libp2p::swarm::Config::with_executor(Executor(executor))
                .with_notify_handler_buffer_size(NonZeroUsize::new(7).expect("Not zero"))
                .with_per_connection_event_buffer_size(4)
                // Other clients can time out during negotiation.
                .with_idle_connection_timeout(Duration::from_secs(10))
                .with_dial_concurrency_factor(NonZeroU8::new(1).unwrap());

            let builder = SwarmBuilder::with_existing_identity(local_keypair)
                .with_tokio()
                .with_other_transport(|_key| transport)
                .expect("infallible");

            // NOTE: adding bandwidth metrics changes the generics of the swarm, so types diverge.
            if let Some(libp2p_registry) = ctx.libp2p_registry {
                builder
                    .with_bandwidth_metrics(libp2p_registry)
                    .with_behaviour(|_| behaviour)
                    .expect("infallible")
                    .with_swarm_config(|_| config)
                    .build()
            } else {
                builder
                    .with_behaviour(|_| behaviour)
                    .expect("infallible")
                    .with_swarm_config(|_| config)
                    .build()
            }
        };

        let mut network = Network {
            swarm,
            network_globals,
            enr_fork_id,
            network_dir: config.network_dir.clone(),
            fork_context: ctx.fork_context,
            score_settings,
            update_gossipsub_scores,
            gossip_cache,
            local_peer_id,
        };

        network.start(&config).await?;

        let network_globals = network.network_globals.clone();

        Ok((network, network_globals))
    }
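    // Construction sketch (illustrative, not compiled here): the caller supplies a
    // `task_executor::TaskExecutor` and a `ServiceContext`; the variable names
    // below are assumptions of the example, not defined in this file.
    //
    //     let (network, globals) =
    //         Network::<E>::new(executor, ctx, custody_group_count).await?;
    //     info!(peer_id = %globals.local_enr().peer_id(), "Network started");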
    /// Starts the network:
    ///
    /// - Starts listening on the given ports.
    /// - Dials boot-nodes and libp2p peers.
    /// - Subscribes to the starting gossipsub topics.
    async fn start(&mut self, config: &crate::NetworkConfig) -> Result<(), String> {
        let enr = self.network_globals.local_enr();
        info!(
            peer_id = %enr.peer_id(),
            bandwidth_config = format!("{}-{}", config.network_load, NetworkLoad::from(config.network_load).name),
            "Libp2p Starting"
        );
        debug!(
            listen_addrs = ?config.listen_addrs(),
            discovery_enabled = !config.disable_discovery,
            quic_enabled = !config.disable_quic_support,
            "Attempting to open listening ports"
        );

        for listen_multiaddr in config.listen_addrs().libp2p_addresses() {
            // If QUIC is disabled, ignore listening on QUIC ports.
            if config.disable_quic_support
                && listen_multiaddr.iter().any(|v| v == MProtocol::QuicV1)
            {
                continue;
            }

            match self.swarm.listen_on(listen_multiaddr.clone()) {
                Ok(_) => {
                    let mut log_address = listen_multiaddr;
                    log_address.push(MProtocol::P2p(enr.peer_id()));
                    info!(address = %log_address, "Listening established");
                }
                Err(err) => {
                    crit!(
                        error = ?err,
                        %listen_multiaddr,
                        "Unable to listen on libp2p address"
                    );
                    return Err("Libp2p was unable to listen on the given listen address.".into());
                }
            };
        }

        // Helper closure for dialing peers.
        let mut dial = |mut multiaddr: Multiaddr| {
            // Strip the p2p protocol if it exists.
            strip_peer_id(&mut multiaddr);
            match self.swarm.dial(multiaddr.clone()) {
                Ok(()) => debug!(address = %multiaddr, "Dialing libp2p peer"),
                Err(err) => {
                    debug!(address = %multiaddr, error = ?err, "Could not connect to peer")
                }
            };
        };

        // Attempt to connect to user-input libp2p nodes.
        for multiaddr in &config.libp2p_nodes {
            dial(multiaddr.clone());
        }

        // Attempt to connect to any specified boot-nodes.
        let mut boot_nodes = config.boot_nodes_enr.clone();
        boot_nodes.dedup();

        for bootnode_enr in boot_nodes {
            // If QUIC is enabled, attempt QUIC connections first.
            if !config.disable_quic_support {
                for quic_multiaddr in &bootnode_enr.multiaddr_quic() {
                    if !self
                        .network_globals
                        .peers
                        .read()
                        .is_connected_or_dialing(&bootnode_enr.peer_id())
                    {
                        dial(quic_multiaddr.clone());
                    }
                }
            }

            for multiaddr in &bootnode_enr.multiaddr() {
                // Ignore the multiaddr if it is UDP.
                let components = multiaddr.iter().collect::<Vec<_>>();
                if let MProtocol::Udp(_) = components[1] {
                    continue;
                }

                if !self
                    .network_globals
                    .peers
                    .read()
                    .is_connected_or_dialing(&bootnode_enr.peer_id())
                {
                    dial(multiaddr.clone());
                }
            }
        }

        for multiaddr in &config.boot_nodes_multiaddr {
            // Check TCP support for dialing.
            if multiaddr
                .iter()
                .any(|proto| matches!(proto, MProtocol::Tcp(_)))
            {
                dial(multiaddr.clone());
            }
        }

        let mut subscribed_topics: Vec<GossipKind> = vec![];

        for topic_kind in &config.topics {
            if self.subscribe_kind(topic_kind.clone()) {
                subscribed_topics.push(topic_kind.clone());
            } else {
                warn!(topic = %topic_kind, "Could not subscribe to topic");
            }
        }

        if !subscribed_topics.is_empty() {
            info!(topics = ?subscribed_topics, "Subscribed to topics");
        }

        Ok(())
    }
    /* Publicly accessible functions to interact with the behaviour */

    /// The routing pub-sub mechanism for eth2.
    pub fn gossipsub_mut(&mut self) -> &mut Gossipsub {
        &mut self.swarm.behaviour_mut().gossipsub
    }
    /// The Eth2 RPC specified in the wire-0 protocol.
    pub fn eth2_rpc_mut(&mut self) -> &mut RPC<AppRequestId, E> {
        &mut self.swarm.behaviour_mut().eth2_rpc
    }
    /// Discv5 Discovery protocol.
    pub fn discovery_mut(&mut self) -> &mut Discovery<E> {
        &mut self.swarm.behaviour_mut().discovery
    }
    /// Provides IP addresses and peer information.
    pub fn identify_mut(&mut self) -> &mut identify::Behaviour {
        &mut self.swarm.behaviour_mut().identify
    }
    /// The peer manager that keeps track of peers' reputation and status.
    pub fn peer_manager_mut(&mut self) -> &mut PeerManager<E> {
        &mut self.swarm.behaviour_mut().peer_manager
    }

    /// The routing pub-sub mechanism for eth2.
    pub fn gossipsub(&self) -> &Gossipsub {
        &self.swarm.behaviour().gossipsub
    }
    /// The Eth2 RPC specified in the wire-0 protocol.
    pub fn eth2_rpc(&self) -> &RPC<AppRequestId, E> {
        &self.swarm.behaviour().eth2_rpc
    }
    /// Discv5 Discovery protocol.
    pub fn discovery(&self) -> &Discovery<E> {
        &self.swarm.behaviour().discovery
    }
    /// Provides IP addresses and peer information.
    pub fn identify(&self) -> &identify::Behaviour {
        &self.swarm.behaviour().identify
    }
    /// The peer manager that keeps track of peers' reputation and status.
    pub fn peer_manager(&self) -> &PeerManager<E> {
        &self.swarm.behaviour().peer_manager
    }

    /// Returns the local ENR of the node.
    pub fn local_enr(&self) -> Enr {
        self.network_globals.local_enr()
    }

    /* Pubsub behaviour functions */

    /// Subscribes to a gossipsub topic kind, letting the network service determine the
    /// encoding and fork version.
    pub fn subscribe_kind(&mut self, kind: GossipKind) -> bool {
        let gossip_topic = GossipTopic::new(
            kind,
            GossipEncoding::default(),
            self.enr_fork_id.fork_digest,
        );

        self.subscribe(gossip_topic)
    }

    /// Unsubscribes from a gossipsub topic kind, letting the network service determine the
    /// encoding and fork version.
    pub fn unsubscribe_kind(&mut self, kind: GossipKind) -> bool {
        let gossip_topic = GossipTopic::new(
            kind,
            GossipEncoding::default(),
            self.enr_fork_id.fork_digest,
        );
        self.unsubscribe(gossip_topic)
    }
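    // Usage sketch (illustrative): subscribing to a single attestation subnet by
    // kind. The current fork digest and default encoding are filled in by
    // `subscribe_kind`; `network` is an assumed `&mut Network<E>`.
    //
    //     let kind = GossipKind::Attestation(SubnetId::new(0));
    //     if !network.subscribe_kind(kind) {
    //         // Subscription can fail, e.g. if the topic is rejected by the
    //         // subscription filter; callers should handle this.
    //     }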
    /// Subscribe to all required topics for the `new_fork` with the given `new_fork_digest`.
    pub fn subscribe_new_fork_topics(&mut self, new_fork: ForkName, new_fork_digest: [u8; 4]) {
        // Re-subscribe to non-core topics with the new fork digest.
        let subscriptions = self.network_globals.gossipsub_subscriptions.read().clone();
        for mut topic in subscriptions.into_iter() {
            if is_fork_non_core_topic(&topic, new_fork) {
                topic.fork_digest = new_fork_digest;
                self.subscribe(topic);
            }
        }

        // Subscribe to core topics for the new fork.
        for kind in core_topics_to_subscribe::<E>(
            new_fork,
            &self.network_globals.as_topic_config(),
            &self.fork_context.spec,
        ) {
            let topic = GossipTopic::new(kind, GossipEncoding::default(), new_fork_digest);
            self.subscribe(topic);
        }

        // All possible gossipsub topics have already been registered for metrics.
    }

    /// Unsubscribe from all topics that don't have the given fork_digest.
    pub fn unsubscribe_from_fork_topics_except(&mut self, except: [u8; 4]) {
        let subscriptions = self.network_globals.gossipsub_subscriptions.read().clone();
        for topic in subscriptions
            .iter()
            .filter(|topic| topic.fork_digest != except)
            .cloned()
        {
            self.unsubscribe(topic);
        }
    }
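    // Fork-transition sketch (illustrative ordering, assuming the caller tracks
    // the fork schedule): subscribe to the new fork's topics ahead of the
    // transition, then drop the old topics once the fork has passed.
    //
    //     network.subscribe_new_fork_topics(new_fork, new_fork_digest);
    //     // ... after the fork epoch has passed ...
    //     network.unsubscribe_from_fork_topics_except(new_fork_digest);
    //     network.remove_topic_weight_except(new_fork_digest);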
    /// Remove topic weight from all topics that don't have the given fork digest.
    pub fn remove_topic_weight_except(&mut self, except: [u8; 4]) {
        let new_param = TopicScoreParams {
            topic_weight: 0.0,
            ..Default::default()
        };
        let subscriptions = self.network_globals.gossipsub_subscriptions.read().clone();
        for topic in subscriptions
            .iter()
            .filter(|topic| topic.fork_digest != except)
        {
            let libp2p_topic: Topic = topic.clone().into();
            match self
                .gossipsub_mut()
                .set_topic_params(libp2p_topic, new_param.clone())
            {
                Ok(_) => debug!(%topic, "Removed topic weight"),
                Err(e) => {
                    warn!(%topic, error = e, "Failed to remove topic weight")
                }
            }
        }
    }

    /// Subscribe to all data column subnets determined by the cgc.
    pub fn subscribe_new_data_column_subnets(&mut self, sampling_column_count: u64) {
        self.network_globals
            .update_data_column_subnets(sampling_column_count);

        for column in self.network_globals.sampling_subnets() {
            let kind = GossipKind::DataColumnSidecar(column);
            self.subscribe_kind(kind);
        }
    }

    /// Returns the scoring parameters for a topic if set.
    pub fn get_topic_params(&self, topic: GossipTopic) -> Option<&TopicScoreParams> {
        self.swarm
            .behaviour()
            .gossipsub
            .get_topic_params(&topic.into())
    }

    /// Subscribes to a gossipsub topic.
    ///
    /// Returns `true` if the subscription was successful and `false` otherwise.
    pub fn subscribe(&mut self, topic: GossipTopic) -> bool {
        // Update the network globals.
        self.network_globals
            .gossipsub_subscriptions
            .write()
            .insert(topic.clone());

        let topic: Topic = topic.into();

        match self.gossipsub_mut().subscribe(&topic) {
            Err(e) => {
                warn!(%topic, error = ?e, "Failed to subscribe to topic");
                false
            }
            Ok(_) => {
                debug!(%topic, "Subscribed to topic");
                true
            }
        }
    }
    /// Unsubscribe from a gossipsub topic.
    pub fn unsubscribe(&mut self, topic: GossipTopic) -> bool {
        // Update the network globals.
        self.network_globals
            .gossipsub_subscriptions
            .write()
            .remove(&topic);

        // Unsubscribe from the topic.
        let libp2p_topic: Topic = topic.clone().into();

        debug!(%topic, "Unsubscribed from topic");
        self.gossipsub_mut().unsubscribe(&libp2p_topic)
    }

    /// Publishes a list of messages on the pubsub (gossipsub) behaviour, choosing the encoding.
    pub fn publish(&mut self, messages: Vec<PubsubMessage<E>>) {
        for message in messages {
            for topic in message.topics(GossipEncoding::default(), self.enr_fork_id.fork_digest) {
                let message_data = message.encode(GossipEncoding::default());
                if let Err(e) = self
                    .gossipsub_mut()
                    .publish(Topic::from(topic.clone()), message_data.clone())
                {
                    match e {
                        PublishError::Duplicate => {
                            debug!(
                                kind = %topic.kind(),
                                "Attempted to publish duplicate message"
                            );
                        }
                        ref e => {
                            warn!(
                                error = ?e,
                                kind = %topic.kind(),
                                "Could not publish message"
                            );
                        }
                    }

                    // Add to metrics.
                    match topic.kind() {
                        GossipKind::Attestation(subnet_id) => {
                            if let Some(v) = metrics::get_int_gauge(
                                &metrics::FAILED_ATTESTATION_PUBLISHES_PER_SUBNET,
                                &[subnet_id.as_ref()],
                            ) {
                                v.inc()
                            };
                        }
                        kind => {
                            if let Some(v) = metrics::get_int_gauge(
                                &metrics::FAILED_PUBLISHES_PER_MAIN_TOPIC,
                                &[&format!("{:?}", kind)],
                            ) {
                                v.inc()
                            };
                        }
                    }

                    if let PublishError::NoPeersSubscribedToTopic = e {
                        self.gossip_cache.insert(topic, message_data);
                    }
                }
            }
        }
    }
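    // Publish sketch (illustrative): `publish` fans a message out to every topic
    // it maps to under the current fork digest. A publish that fails because no
    // peers are subscribed is cached and retried when a peer subscribes (see
    // `inject_gs_event`). `block_message: PubsubMessage<E>` is assumed here.
    //
    //     network.publish(vec![block_message]);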
    /// Informs gossipsub of the result of a message validation.
    /// If the message is valid it will get propagated by gossipsub.
    pub fn report_message_validation_result(
        &mut self,
        propagation_source: &PeerId,
        message_id: MessageId,
        validation_result: MessageAcceptance,
    ) {
        if let Some(result) = match validation_result {
            MessageAcceptance::Accept => None,
            MessageAcceptance::Ignore => Some("ignore"),
            MessageAcceptance::Reject => Some("reject"),
        } && let Some(client) = self
            .network_globals
            .peers
            .read()
            .peer_info(propagation_source)
            .map(|info| info.client().kind.as_ref())
        {
            metrics::inc_counter_vec(
                &metrics::GOSSIP_UNACCEPTED_MESSAGES_PER_CLIENT,
                &[client, result],
            )
        }

        self.gossipsub_mut().report_message_validation_result(
            &message_id,
            propagation_source,
            validation_result,
        );
    }
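    // Validation-flow sketch (illustrative): gossipsub withholds propagation until
    // the application reports a verdict for the message id it was handed in
    // `NetworkEvent::PubsubMessage`.
    //
    //     // After successfully verifying the message:
    //     network.report_message_validation_result(&source, id, MessageAcceptance::Accept);
    //     // For an invalid message (also counts against the propagating peer's score):
    //     // network.report_message_validation_result(&source, id, MessageAcceptance::Reject);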
    /// Updates the current gossipsub scoring parameters based on the validator count and current
    /// slot.
    pub fn update_gossipsub_parameters(
        &mut self,
        active_validators: usize,
        current_slot: Slot,
    ) -> Result<(), String> {
        let (beacon_block_params, beacon_aggregate_proof_params, beacon_attestation_subnet_params) =
            self.score_settings
                .get_dynamic_topic_params(active_validators, current_slot)?;

        let fork_digest = self.enr_fork_id.fork_digest;
        let get_topic = |kind: GossipKind| -> Topic {
            GossipTopic::new(kind, GossipEncoding::default(), fork_digest).into()
        };

        debug!(active_validators, "Updating gossipsub score parameters");
        trace!(
            ?beacon_block_params,
            ?beacon_aggregate_proof_params,
            ?beacon_attestation_subnet_params,
            "Updated gossipsub score parameters"
        );

        self.gossipsub_mut()
            .set_topic_params(get_topic(GossipKind::BeaconBlock), beacon_block_params)?;

        self.gossipsub_mut().set_topic_params(
            get_topic(GossipKind::BeaconAggregateAndProof),
            beacon_aggregate_proof_params,
        )?;

        for i in 0..self.score_settings.attestation_subnet_count() {
            self.gossipsub_mut().set_topic_params(
                get_topic(GossipKind::Attestation(SubnetId::new(i))),
                beacon_attestation_subnet_params.clone(),
            )?;
        }

        Ok(())
    }

    /* Eth2 RPC behaviour functions */

    /// Send a request to a peer over RPC.
    #[allow(clippy::result_large_err)]
    pub fn send_request(
        &mut self,
        peer_id: PeerId,
        app_request_id: AppRequestId,
        request: RequestType<E>,
    ) -> Result<(), (AppRequestId, RPCError)> {
        // Check if the peer is connected before sending an RPC request.
        if !self.swarm.is_connected(&peer_id) {
            return Err((app_request_id, RPCError::Disconnected));
        }

        self.eth2_rpc_mut()
            .send_request(peer_id, app_request_id, request);
        Ok(())
    }

    /// Send a successful response to a peer over RPC.
    pub fn send_response<T: Into<RpcResponse<E>>>(
        &mut self,
        peer_id: PeerId,
        inbound_request_id: InboundRequestId,
        response: T,
    ) {
        if let Err(response) = self
            .eth2_rpc_mut()
            .send_response(inbound_request_id, response.into())
            && self.network_globals.peers.read().is_connected(&peer_id)
        {
            error!(%peer_id, ?inbound_request_id, %response,
                "Request not found in RPC active requests"
            );
        }
    }
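    // RPC round-trip sketch (illustrative): outbound requests are correlated by
    // the caller-chosen `AppRequestId`; inbound requests are answered with the
    // `InboundRequestId` delivered in `NetworkEvent::RequestReceived`. The
    // `status_request` value is an assumption of the example.
    //
    //     if let Err((id, e)) = network.send_request(peer_id, app_request_id, status_request) {
    //         // The peer disconnected before the request was sent; surface `e` to sync.
    //     }
    //     // ... and on the responding side:
    //     // network.send_response(peer_id, inbound_request_id, response);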
    /* Peer management functions */
    pub fn testing_dial(&mut self, addr: Multiaddr) -> Result<(), libp2p::swarm::DialError> {
        self.swarm.dial(addr)
    }

    pub fn report_peer(
        &mut self,
        peer_id: &PeerId,
        action: PeerAction,
        source: ReportSource,
        msg: &'static str,
    ) {
        self.peer_manager_mut()
            .report_peer(peer_id, action, source, None, msg);
    }

    /// Disconnects from a peer providing a reason.
    ///
    /// This will send a goodbye, disconnect and then ban the peer.
    /// This is fatal for a peer, and should be used in unrecoverable circumstances.
    pub fn goodbye_peer(&mut self, peer_id: &PeerId, reason: GoodbyeReason, source: ReportSource) {
        self.peer_manager_mut()
            .goodbye_peer(peer_id, reason, source);
    }

    /// Hard (ungraceful) disconnect for testing purposes only.
    /// Use `goodbye_peer` for disconnections; do not use this function.
    pub fn __hard_disconnect_testing_only(&mut self, peer_id: PeerId) {
        let _ = self.swarm.disconnect_peer_id(peer_id);
    }

    /// Returns an iterator over all ENR entries in the DHT.
    pub fn enr_entries(&self) -> Vec<Enr> {
        self.discovery().table_entries_enr()
    }

    /// Add an ENR to the routing table of the discovery mechanism.
    pub fn add_enr(&mut self, enr: Enr) {
        self.discovery_mut().add_enr(enr);
    }

    /// Updates a subnet value in the ENR attnets/syncnets bitfield.
    ///
    /// The `value` is `true` if a subnet is being added and `false` otherwise.
    pub fn update_enr_subnet(&mut self, subnet_id: Subnet, value: bool) {
        if let Err(e) = self.discovery_mut().update_enr_bitfield(subnet_id, value) {
            crit!(error = e, "Could not update ENR bitfield");
        }
        // Update the local metadata, which informs our peers of the update during PINGs.
        self.update_metadata_bitfields();
    }

    /// Updates the cgc value in the ENR.
    pub fn update_enr_cgc(&mut self, new_custody_group_count: u64) {
        if let Err(e) = self.discovery_mut().update_enr_cgc(new_custody_group_count) {
            crit!(error = e, "Could not update cgc in ENR");
        }
        // Update the local metadata, which informs our peers of the update during PINGs.
        self.update_metadata_cgc(new_custody_group_count);
    }

    /// Attempts to discover new peers for a given subnet. The `min_ttl` gives the time until
    /// which we would like to retain the peers.
    pub fn discover_subnet_peers(&mut self, subnets_to_discover: Vec<SubnetDiscovery>) {
        // If discovery is not started or disabled, ignore the request.
        if !self.discovery().started {
            return;
        }

        let spec = Arc::new(self.fork_context.spec.clone());
        let filtered: Vec<SubnetDiscovery> = subnets_to_discover
            .into_iter()
            .filter(|s| {
                // Extend min_ttl of connected peers on required subnets.
                if let Some(min_ttl) = s.min_ttl {
                    self.network_globals
                        .peers
                        .write()
                        .extend_peers_on_subnet(&s.subnet, min_ttl);
                    if let Subnet::SyncCommittee(sync_subnet) = s.subnet {
                        self.peer_manager_mut()
                            .add_sync_subnet(sync_subnet, min_ttl);
                    }
                }
                // If we already have the target number of peers, there is no need for subnet
                // discovery.
                let peers_on_subnet = self
                    .network_globals
                    .peers
                    .read()
                    .good_peers_on_subnet(s.subnet)
                    .count();
                if peers_on_subnet >= TARGET_SUBNET_PEERS {
                    trace!(
                        subnet = ?s.subnet,
                        reason = "Already connected to desired peers",
                        connected_peers_on_subnet = peers_on_subnet,
                        target_subnet_peers = TARGET_SUBNET_PEERS,
                        "Discovery query ignored"
                    );
                    false
                } else {
                    // Queue an outgoing connection request to the cached peers that are on `s.subnet_id`.
                    // If we connect to the cached peers before the discovery query starts, then we
                    // potentially save a costly discovery query.
                    self.dial_cached_enrs_in_subnet(s.subnet, spec.clone());
                    true
                }
            })
            .collect();

        // Request the subnet query from discovery.
        if !filtered.is_empty() {
            self.discovery_mut().discover_subnet_peers(filtered);
        }
    }

    /// Updates the local ENR's "eth2" field with the latest EnrForkId.
    pub fn update_fork_version(&mut self, enr_fork_id: EnrForkId) {
        self.discovery_mut().update_eth2_enr(enr_fork_id.clone());

        // Update the local reference.
        self.enr_fork_id = enr_fork_id;
    }

    pub fn update_nfd(&mut self, nfd: [u8; 4]) {
        if let Err(e) = self.discovery_mut().update_enr_nfd(nfd) {
            crit!(error = e, "Could not update nfd in ENR");
        }
    }

    /* Private internal functions */

    /// Updates the current metadata of the node to match the local ENR.
    fn update_metadata_bitfields(&mut self) {
        let local_attnets = self
            .discovery_mut()
            .local_enr()
            .attestation_bitfield::<E>()
            .expect("Local discovery must have attestation bitfield");

        let local_syncnets = self
            .discovery_mut()
            .local_enr()
            .sync_committee_bitfield::<E>()
            .expect("Local discovery must have sync committee bitfield");

        // Write lock scope.
        let mut meta_data_w = self.network_globals.local_metadata.write();

        *meta_data_w.seq_number_mut() += 1;
        *meta_data_w.attnets_mut() = local_attnets;
        if let Ok(syncnets) = meta_data_w.syncnets_mut() {
            *syncnets = local_syncnets;
        }
        let seq_number = *meta_data_w.seq_number();
        let meta_data = meta_data_w.clone();

        drop(meta_data_w);
        self.eth2_rpc_mut().update_seq_number(seq_number);
        // Save the updated metadata to disk.
        utils::save_metadata_to_disk(&self.network_dir, meta_data);
    }

    fn update_metadata_cgc(&mut self, custody_group_count: u64) {
        let mut meta_data_w = self.network_globals.local_metadata.write();

        *meta_data_w.seq_number_mut() += 1;
        if let Ok(cgc) = meta_data_w.custody_group_count_mut() {
            *cgc = custody_group_count;
        }
        let seq_number = *meta_data_w.seq_number();
        let meta_data = meta_data_w.clone();

        drop(meta_data_w);
        self.eth2_rpc_mut().update_seq_number(seq_number);
        // Save the updated metadata to disk.
        utils::save_metadata_to_disk(&self.network_dir, meta_data);
    }

    /// Sends a Ping request to the peer.
    fn ping(&mut self, peer_id: PeerId) {
        self.eth2_rpc_mut().ping(peer_id, AppRequestId::Internal);
    }

    /// Sends a METADATA request to a peer.
    fn send_meta_data_request(&mut self, peer_id: PeerId) {
        let event = if self.fork_context.spec.is_peer_das_scheduled() {
            // Nodes with higher custody will probably start advertising it
            // before peerdas is activated.
            RequestType::MetaData(MetadataRequest::new_v3())
        } else {
            // We always prefer sending V2 requests otherwise.
            RequestType::MetaData(MetadataRequest::new_v2())
        };
        self.eth2_rpc_mut()
            .send_request(peer_id, AppRequestId::Internal, event);
    }
    // RPC propagation methods

    /// Queues the response to be sent upwards, as long as it was requested outside the Behaviour.
    #[must_use = "return the response"]
    fn build_response(
        &mut self,
        app_request_id: AppRequestId,
        peer_id: PeerId,
        response: Response<E>,
    ) -> Option<NetworkEvent<E>> {
        match app_request_id {
            AppRequestId::Internal => None,
            _ => Some(NetworkEvent::ResponseReceived {
                peer_id,
                app_request_id,
                response,
            }),
        }
    }

    /// Dial cached ENRs in the discovery service that are in the given `subnet_id` and aren't
    /// in Connected, Dialing or Banned state.
    fn dial_cached_enrs_in_subnet(&mut self, subnet: Subnet, spec: Arc<ChainSpec>) {
        let predicate = subnet_predicate::<E>(vec![subnet], spec);
        let peers_to_dial: Vec<Enr> = self
            .discovery()
            .cached_enrs()
            .filter_map(|(_peer_id, enr)| {
                if predicate(enr) {
                    Some(enr.clone())
                } else {
                    None
                }
            })
            .collect();

        // Remove the ENR from the cache to prevent continual re-dialing on disconnects.
        for enr in peers_to_dial {
            self.discovery_mut().remove_cached_enr(&enr.peer_id());
            let peer_id = enr.peer_id();
            if self.peer_manager_mut().dial_peer(enr) {
                debug!(%peer_id, "Added cached ENR peer to dial queue");
            }
        }
    }

    /// Adds the given `enr` to the trusted peers mapping and tries to dial it
    /// every heartbeat to maintain the connection.
    pub fn dial_trusted_peer(&mut self, enr: Enr) {
        self.peer_manager_mut().add_trusted_peer(enr.clone());
        self.peer_manager_mut().dial_peer(enr);
    }

    /// Remove the given peer from the trusted peers mapping if it exists and disconnect
    /// from it.
    pub fn remove_trusted_peer(&mut self, enr: Enr) {
        self.peer_manager_mut().remove_trusted_peer(enr.clone());
        self.peer_manager_mut()
            .disconnect_peer(enr.peer_id(), GoodbyeReason::TooManyPeers);
    }

    /* Sub-behaviour event handling functions */

    /// Handle a gossipsub event.
    fn inject_gs_event(&mut self, event: gossipsub::Event) -> Option<NetworkEvent<E>> {
        match event {
            gossipsub::Event::Message {
                propagation_source,
                message_id: id,
                message: gs_msg,
            } => {
                // Note: We are keeping track here of the peer that sent us the message, not the
                // peer that originally published the message.
                match PubsubMessage::decode(&gs_msg.topic, &gs_msg.data, &self.fork_context) {
                    Err(e) => {
                        debug!(topic = ?gs_msg.topic, error = e, "Could not decode gossipsub message");
                        // Reject the message.
                        self.gossipsub_mut().report_message_validation_result(
                            &id,
                            &propagation_source,
                            MessageAcceptance::Reject,
                        );
                    }
                    Ok(msg) => {
                        // Notify the network.
                        return Some(NetworkEvent::PubsubMessage {
                            id,
                            source: propagation_source,
                            topic: gs_msg.topic,
                            message: msg,
                        });
                    }
                }
            }
            gossipsub::Event::Subscribed { peer_id, topic } => {
                if let Ok(topic) = GossipTopic::decode(topic.as_str()) {
                    if let Some(subnet_id) = topic.subnet_id() {
                        self.network_globals
                            .peers
                            .write()
                            .add_subscription(&peer_id, subnet_id);
                    }
                    // Try to send the cached messages for this topic.
                    if let Some(msgs) = self.gossip_cache.retrieve(&topic) {
                        for data in msgs {
                            let topic_str: &str = topic.kind().as_ref();
                            match self
                                .swarm
                                .behaviour_mut()
                                .gossipsub
                                .publish(Topic::from(topic.clone()), data)
                            {
                                Ok(_) => {
                                    debug!(topic = topic_str, "Gossip message published on retry");
                                    metrics::inc_counter_vec(
                                        &metrics::GOSSIP_LATE_PUBLISH_PER_TOPIC_KIND,
                                        &[topic_str],
                                    );
                                }
                                Err(PublishError::Duplicate) => {
                                    debug!(
                                        reason = "duplicate",
                                        topic = topic_str,
                                        "Gossip message publish ignored on retry"
                                    );
                                    metrics::inc_counter_vec(
                                        &metrics::GOSSIP_FAILED_LATE_PUBLISH_PER_TOPIC_KIND,
                                        &[topic_str],
                                    );
                                }
                                Err(e) => {
                                    warn!(
                                        topic = topic_str,
                                        error = %e,
                                        "Gossip message publish failed on retry"
                                    );
                                    metrics::inc_counter_vec(
                                        &metrics::GOSSIP_FAILED_LATE_PUBLISH_PER_TOPIC_KIND,
                                        &[topic_str],
                                    );
                                }
                            }
                        }
                    }
                }
            }
            gossipsub::Event::Unsubscribed { peer_id, topic } => {
                if let Some(subnet_id) = subnet_from_topic_hash(&topic) {
                    self.network_globals
                        .peers
                        .write()
                        .remove_subscription(&peer_id, &subnet_id);
                }
            }
            gossipsub::Event::GossipsubNotSupported { peer_id } => {
                debug!(%peer_id, "Peer does not support gossipsub");
                self.peer_manager_mut().report_peer(
                    &peer_id,
                    PeerAction::Fatal,
                    ReportSource::Gossipsub,
                    Some(GoodbyeReason::Unknown),
                    "does_not_support_gossipsub",
                );
            }
            gossipsub::Event::SlowPeer {
                peer_id,
                failed_messages,
            } => {
                debug!(
                    peer_id = %peer_id,
                    priority = failed_messages.priority,
                    non_priority = failed_messages.non_priority,
                    "Slow gossipsub peer"
                );
                // Punish the peer if it cannot handle priority messages.
                if failed_messages.priority > 10 {
                    debug!(%peer_id, "Slow gossipsub peer penalized for priority failure");
                    self.peer_manager_mut().report_peer(
                        &peer_id,
                        PeerAction::HighToleranceError,
                        ReportSource::Gossipsub,
                        None,
                        "publish_timeout_penalty",
                    );
                } else if failed_messages.non_priority > 10 {
                    debug!(%peer_id, "Slow gossipsub peer penalized for send queue full");
                    self.peer_manager_mut().report_peer(
                        &peer_id,
                        PeerAction::HighToleranceError,
                        ReportSource::Gossipsub,
                        None,
                        "queue_full_penalty",
                    );
                }
            }
        }
        None
    }
    /// Handle an RPC event.
    fn inject_rpc_event(&mut self, event: RPCMessage<AppRequestId, E>) -> Option<NetworkEvent<E>> {
        let peer_id = event.peer_id;

        // Do not permit inbound events or RPC requests from peers that are being disconnected,
        // but allow `RPCFailed` and `HandlerErr::Outbound` to bubble up to sync for state
        // management.
        if !self.peer_manager().is_connected(&peer_id)
            && (matches!(event.message, Err(HandlerErr::Inbound { .. }))
                || matches!(event.message, Ok(RPCReceived::Request(..))))
        {
            debug!(?event, "Ignoring rpc message of disconnecting peer");
            return None;
        }

        // The PING RPC responses are handled within the behaviour and not propagated.
        match event.message {
            Err(handler_err) => {
                match handler_err {
                    HandlerErr::Inbound {
                        id: _,
                        proto,
                        error,
                    } => {
                        // Inform the peer manager of the error.
                        // An inbound error here means we sent an error to the peer, or the stream
                        // timed out.
                        self.peer_manager_mut().handle_rpc_error(
                            &peer_id,
                            proto,
                            &error,
                            ConnectionDirection::Incoming,
                        );
                        None
                    }
                    HandlerErr::Outbound { id, proto, error } => {
                        // Inform the peer manager that a request we sent to the peer failed.
                        self.peer_manager_mut().handle_rpc_error(
                            &peer_id,
                            proto,
                            &error,
                            ConnectionDirection::Outgoing,
                        );
                        // Inform of failures for requests that came from outside the behaviour.
                        if let AppRequestId::Internal = id {
                            None
                        } else {
                            Some(NetworkEvent::RPCFailed {
                                peer_id,
                                app_request_id: id,
                                error,
                            })
                        }
                    }
                }
            }
            Ok(RPCReceived::Request(inbound_request_id, request_type)) => {
                match request_type {
                    /* Behaviour managed protocols: Ping and Metadata */
                    RequestType::Ping(ping) => {
                        // Inform the peer manager and send the response.
                        self.peer_manager_mut().ping_request(&peer_id, ping.data);
                        None
                    }
                    RequestType::MetaData(_req) => {
                        // Send the requested meta-data.
                        let metadata = self.network_globals.local_metadata.read().clone();
                        // The encoder is responsible for sending the negotiated version of the
                        // metadata.
                        let response =
                            RpcResponse::Success(RpcSuccessResponse::MetaData(Arc::new(metadata)));
                        self.send_response(peer_id, inbound_request_id, response);
                        None
                    }
                    RequestType::Goodbye(reason) => {
                        // Queue for disconnection without a goodbye message.
                        debug!(
                            %peer_id,
                            %reason,
                            client = %self.network_globals.client(&peer_id),
                            "Peer sent Goodbye"
                        );
                        // NOTE: We currently do not inform the application that we are
                        // disconnecting here. The RPC handler will automatically
                        // disconnect for us.
                        // The actual disconnection event will be relayed to the application.
                        None
                    }
                    /* Protocols propagated to the Network */
                    RequestType::Status(_) => {
                        // Inform the peer manager that we have received a status from a peer.
                        self.peer_manager_mut().peer_statusd(&peer_id);
                        metrics::inc_counter_vec(&metrics::TOTAL_RPC_REQUESTS, &["status"]);
                        // Propagate the STATUS message upwards.
                        Some(NetworkEvent::RequestReceived {
                            peer_id,
                            inbound_request_id,
                            request_type,
                        })
                    }
                    RequestType::BlocksByRange(ref req) => {
                        // Still disconnect the peer if the request is naughty.
                        if *req.step() == 0 {
                            self.peer_manager_mut().handle_rpc_error(
                                &peer_id,
                                Protocol::BlocksByRange,
                                &RPCError::InvalidData(
                                    "Blocks by range with 0 step parameter".into(),
                                ),
                                ConnectionDirection::Incoming,
                            );
                            return None;
                        }
                        metrics::inc_counter_vec(
                            &metrics::TOTAL_RPC_REQUESTS,
                            &["blocks_by_range"],
                        );
                        Some(NetworkEvent::RequestReceived {
                            peer_id,
                            inbound_request_id,
                            request_type,
                        })
                    }
                    RequestType::BlocksByRoot(_) => {
                        metrics::inc_counter_vec(&metrics::TOTAL_RPC_REQUESTS, &["blocks_by_root"]);
                        Some(NetworkEvent::RequestReceived {
                            peer_id,
                            inbound_request_id,
                            request_type,
                        })
                    }
                    RequestType::BlobsByRange(_) => {
                        metrics::inc_counter_vec(&metrics::TOTAL_RPC_REQUESTS, &["blobs_by_range"]);
                        Some(NetworkEvent::RequestReceived {
                            peer_id,
                            inbound_request_id,
                            request_type,
                        })
                    }
                    RequestType::BlobsByRoot(_) => {
                        metrics::inc_counter_vec(&metrics::TOTAL_RPC_REQUESTS, &["blobs_by_root"]);
                        Some(NetworkEvent::RequestReceived {
                            peer_id,
                            inbound_request_id,
                            request_type,
                        })
                    }
                    RequestType::DataColumnsByRoot(_) => {
                        metrics::inc_counter_vec(
                            &metrics::TOTAL_RPC_REQUESTS,
                            &["data_columns_by_root"],
                        );
                        Some(NetworkEvent::RequestReceived {
                            peer_id,
                            inbound_request_id,
                            request_type,
                        })
                    }
                    RequestType::DataColumnsByRange(_) => {
                        metrics::inc_counter_vec(
                            &metrics::TOTAL_RPC_REQUESTS,
                            &["data_columns_by_range"],
                        );
                        Some(NetworkEvent::RequestReceived {
                            peer_id,
                            inbound_request_id,
                            request_type,
                        })
                    }
                    RequestType::LightClientBootstrap(_) => {
                        metrics::inc_counter_vec(
                            &metrics::TOTAL_RPC_REQUESTS,
                            &["light_client_bootstrap"],
                        );
                        Some(NetworkEvent::RequestReceived {
                            peer_id,
                            inbound_request_id,
                            request_type,
                        })
                    }
                    RequestType::LightClientOptimisticUpdate => {
                        metrics::inc_counter_vec(
                            &metrics::TOTAL_RPC_REQUESTS,
                            &["light_client_optimistic_update"],
                        );
                        Some(NetworkEvent::RequestReceived {
                            peer_id,
                            inbound_request_id,
                            request_type,
                        })
                    }
                    RequestType::LightClientFinalityUpdate => {
                        metrics::inc_counter_vec(
                            &metrics::TOTAL_RPC_REQUESTS,
                            &["light_client_finality_update"],
                        );
                        Some(NetworkEvent::RequestReceived {
                            peer_id,
                            inbound_request_id,
                            request_type,
                        })
                    }
                    RequestType::LightClientUpdatesByRange(_) => {
                        metrics::inc_counter_vec(
                            &metrics::TOTAL_RPC_REQUESTS,
                            &["light_client_updates_by_range"],
                        );
                        Some(NetworkEvent::RequestReceived {
                            peer_id,
                            inbound_request_id,
                            request_type,
                        })
                    }
                }
            }
            Ok(RPCReceived::Response(id, resp)) => {
                match resp {
                    /* Behaviour managed protocols */
                    RpcSuccessResponse::Pong(ping) => {
                        self.peer_manager_mut().pong_response(&peer_id, ping.data);
                        None
                    }
                    RpcSuccessResponse::MetaData(meta_data) => {
                        let updated_cgc = self
                            .peer_manager_mut()
                            .meta_data_response(&peer_id, meta_data.as_ref().clone());
                        // Send the event after calling into peer_manager so the PeerDB is updated.
                        updated_cgc.then(|| NetworkEvent::PeerUpdatedCustodyGroupCount(peer_id))
                    }
                    /* Network propagated protocols */
                    RpcSuccessResponse::Status(msg) => {
                        // Inform the peer manager that we have received a status from a peer.
                        self.peer_manager_mut().peer_statusd(&peer_id);
                        // Propagate the STATUS message upwards.
                        self.build_response(id, peer_id, Response::Status(msg))
                    }
                    RpcSuccessResponse::BlocksByRange(resp) => {
                        self.build_response(id, peer_id, Response::BlocksByRange(Some(resp)))
                    }
                    RpcSuccessResponse::BlobsByRange(resp) => {
                        self.build_response(id, peer_id, Response::BlobsByRange(Some(resp)))
                    }
                    RpcSuccessResponse::BlocksByRoot(resp) => {
                        self.build_response(id, peer_id, Response::BlocksByRoot(Some(resp)))
                    }
                    RpcSuccessResponse::BlobsByRoot(resp) => {
                        self.build_response(id, peer_id, Response::BlobsByRoot(Some(resp)))
                    }
                    RpcSuccessResponse::DataColumnsByRoot(resp) => {
                        self.build_response(id, peer_id, Response::DataColumnsByRoot(Some(resp)))
                    }
                    RpcSuccessResponse::DataColumnsByRange(resp) => {
                        self.build_response(id, peer_id, Response::DataColumnsByRange(Some(resp)))
                    }
                    // Should never be reached
                    RpcSuccessResponse::LightClientBootstrap(bootstrap) => {
                        self.build_response(id, peer_id, Response::LightClientBootstrap(bootstrap))
                    }
                    RpcSuccessResponse::LightClientOptimisticUpdate(update) => self.build_response(
                        id,
                        peer_id,
                        Response::LightClientOptimisticUpdate(update),
                    ),
                    RpcSuccessResponse::LightClientFinalityUpdate(update) => self.build_response(
                        id,
                        peer_id,
                        Response::LightClientFinalityUpdate(update),
                    ),
                    RpcSuccessResponse::LightClientUpdatesByRange(update) => self.build_response(
                        id,
                        peer_id,
                        Response::LightClientUpdatesByRange(Some(update)),
                    ),
                }
            }
            Ok(RPCReceived::EndOfStream(id, termination)) => {
                let response = match termination {
                    ResponseTermination::BlocksByRange => Response::BlocksByRange(None),
                    ResponseTermination::BlocksByRoot => Response::BlocksByRoot(None),
                    ResponseTermination::BlobsByRange => Response::BlobsByRange(None),
                    ResponseTermination::BlobsByRoot => Response::BlobsByRoot(None),
                    ResponseTermination::DataColumnsByRoot => Response::DataColumnsByRoot(None),
                    ResponseTermination::DataColumnsByRange => Response::DataColumnsByRange(None),
                    ResponseTermination::LightClientUpdatesByRange => {
                        Response::LightClientUpdatesByRange(None)
                    }
                };
                self.build_response(id, peer_id, response)
            }
        }
    }
    /// Handle an identify event.
    fn inject_identify_event(&mut self, event: identify::Event) -> Option<NetworkEvent<E>> {
        match event {
            identify::Event::Received {
                peer_id,
                mut info,
                connection_id: _,
            } => {
                if info.listen_addrs.len() > MAX_IDENTIFY_ADDRESSES {
                    debug!(
                        limit = MAX_IDENTIFY_ADDRESSES,
                        "Peer identified with too many addresses, truncating"
                    );
                    info.listen_addrs.truncate(MAX_IDENTIFY_ADDRESSES);
                }
                // Send peer info to the peer manager.
                self.peer_manager_mut().identify(&peer_id, &info);
            }
            identify::Event::Sent { .. } => {}
            identify::Event::Error { .. } => {}
            identify::Event::Pushed { .. } => {}
        }
        None
    }
    /// Handle a peer manager event.
    fn inject_pm_event(&mut self, event: PeerManagerEvent) -> Option<NetworkEvent<E>> {
        match event {
            PeerManagerEvent::PeerConnectedIncoming(peer_id) => {
                Some(NetworkEvent::PeerConnectedIncoming(peer_id))
            }
            PeerManagerEvent::PeerConnectedOutgoing(peer_id) => {
                Some(NetworkEvent::PeerConnectedOutgoing(peer_id))
            }
            PeerManagerEvent::PeerDisconnected(peer_id) => {
                Some(NetworkEvent::PeerDisconnected(peer_id))
            }
            PeerManagerEvent::Banned(peer_id, associated_ips) => {
                self.discovery_mut().ban_peer(&peer_id, associated_ips);
                None
            }
            PeerManagerEvent::UnBanned(peer_id, associated_ips) => {
                self.discovery_mut().unban_peer(&peer_id, associated_ips);
                None
            }
            PeerManagerEvent::Status(peer_id) => {
                // It's time to send a STATUS. We don't keep a beacon chain reference here, so we
                // inform the network to send a status to this peer.
                Some(NetworkEvent::StatusPeer(peer_id))
            }
            PeerManagerEvent::DiscoverPeers(peers_to_find) => {
                // The peer manager has requested a discovery query for more peers.
                self.discovery_mut().discover_peers(peers_to_find);
                None
            }
            PeerManagerEvent::DiscoverSubnetPeers(subnets_to_discover) => {
                // The peer manager has requested a subnet discovery query for more peers.
                self.discover_subnet_peers(subnets_to_discover);
                None
            }
            PeerManagerEvent::Ping(peer_id) => {
                // Send a ping request to this peer.
                self.ping(peer_id);
                None
            }
            PeerManagerEvent::MetaData(peer_id) => {
                self.send_meta_data_request(peer_id);
                None
            }
            PeerManagerEvent::DisconnectPeer(peer_id, reason) => {
                debug!(%peer_id, %reason, "Peer Manager disconnecting peer");
                // Send one goodbye.
                self.eth2_rpc_mut()
                    .shutdown(peer_id, AppRequestId::Internal, reason);
                None
            }
        }
    }
    fn inject_upnp_event(&mut self, event: libp2p::upnp::Event) {
        match event {
            libp2p::upnp::Event::NewExternalAddr(addr) => {
                info!(%addr, "UPnP route established");
                let mut iter = addr.iter();
                let is_ip6 = {
                    let addr = iter.next();
                    matches!(addr, Some(MProtocol::Ip6(_)))
                };
                match iter.next() {
                    Some(multiaddr::Protocol::Udp(udp_port)) => match iter.next() {
                        Some(multiaddr::Protocol::QuicV1) => {
                            if let Err(e) =
                                self.discovery_mut().update_enr_quic_port(udp_port, is_ip6)
                            {
                                warn!(error = e, "Failed to update ENR");
                            }
                        }
                        _ => {
                            trace!(%addr, "UPnP address mapped multiaddr from unknown transport");
                        }
                    },
                    Some(multiaddr::Protocol::Tcp(tcp_port)) => {
                        if let Err(e) = self.discovery_mut().update_enr_tcp_port(tcp_port, is_ip6) {
                            warn!(error = e, "Failed to update ENR");
                        }
                    }
                    _ => {
                        trace!(%addr, "UPnP address mapped multiaddr from unknown transport");
                    }
                }
            }
            libp2p::upnp::Event::ExpiredExternalAddr(_) => {}
            libp2p::upnp::Event::GatewayNotFound => {
                info!("UPnP not available");
            }
            libp2p::upnp::Event::NonRoutableGateway => {
                info!("UPnP is available but the gateway is not exposed to the public network");
            }
        }
    }
    /* Network polling */
    pub async fn next_event(&mut self) -> NetworkEvent<E> {
        loop {
            tokio::select! {
                // Poll the libp2p `Swarm`.
                // This will poll the swarm and do maintenance routines.
                Some(event) = self.swarm.next() => {
                    if let Some(event) = self.parse_swarm_event(event) {
                        return event;
                    }
                },
                // Perform gossipsub score updates when necessary.
                _ = self.update_gossipsub_scores.tick() => {
                    let this = self.swarm.behaviour_mut();
                    this.peer_manager.update_gossipsub_scores(&this.gossipsub);
                }
                // Poll the gossipsub cache to clear expired messages.
                Some(result) = self.gossip_cache.next() => {
                    match result {
                        Err(e) => warn!(error = e, "Gossip cache error"),
                        Ok(expired_topic) => {
                            if let Some(v) = metrics::get_int_counter(
                                &metrics::GOSSIP_EXPIRED_LATE_PUBLISH_PER_TOPIC_KIND,
                                &[expired_topic.kind().as_ref()],
                            ) {
                                v.inc()
                            };
                        }
                    }
                }
            }
        }
    }
    fn parse_swarm_event(
        &mut self,
        event: SwarmEvent<BehaviourEvent<E>>,
    ) -> Option<NetworkEvent<E>> {
        match event {
            SwarmEvent::Behaviour(behaviour_event) => match behaviour_event {
                // Handle sub-behaviour events.
                BehaviourEvent::Gossipsub(ge) => self.inject_gs_event(ge),
                BehaviourEvent::Eth2Rpc(re) => self.inject_rpc_event(re),
                // Inform the peer manager about discovered peers.
                //
                // The peer manager will subsequently decide which peers need to be dialed and
                // then dial them.
                BehaviourEvent::Discovery(DiscoveredPeers { peers }) => {
                    self.peer_manager_mut().peers_discovered(peers);
                    None
                }
                BehaviourEvent::Identify(ie) => self.inject_identify_event(ie),
                BehaviourEvent::PeerManager(pe) => self.inject_pm_event(pe),
                BehaviourEvent::Upnp(e) => {
                    self.inject_upnp_event(e);
                    None
                }
                #[allow(unreachable_patterns)]
                BehaviourEvent::ConnectionLimits(le) => libp2p::core::util::unreachable(le),
            },
            SwarmEvent::ConnectionEstablished { .. } => None,
            SwarmEvent::ConnectionClosed { .. } => None,
            SwarmEvent::IncomingConnection {
                local_addr,
                send_back_addr,
                connection_id: _,
            } => {
                trace!(our_addr = %local_addr, from = %send_back_addr, "Incoming connection");
                None
            }
            SwarmEvent::IncomingConnectionError {
                local_addr,
                send_back_addr,
                error,
                connection_id: _,
                peer_id: _,
            } => {
                let error_repr = match error {
                    libp2p::swarm::ListenError::Aborted => {
                        "Incoming connection aborted".to_string()
                    }
                    libp2p::swarm::ListenError::WrongPeerId { obtained, endpoint } => {
                        format!("Wrong peer id, obtained {obtained}, endpoint {endpoint:?}")
                    }
                    libp2p::swarm::ListenError::LocalPeerId { address } => {
                        format!("Dialing local peer id {address:?}")
                    }
                    libp2p::swarm::ListenError::Denied { cause } => {
                        format!("Connection was denied with cause: {cause:?}")
                    }
                    libp2p::swarm::ListenError::Transport(t) => match t {
                        libp2p::TransportError::MultiaddrNotSupported(m) => {
                            format!("Transport error: Multiaddr not supported: {m}")
                        }
                        libp2p::TransportError::Other(e) => {
                            format!("Transport error: other: {e}")
                        }
                    },
                };
                debug!(our_addr = %local_addr, from = %send_back_addr, error = error_repr, "Failed incoming connection");
                None
            }
            SwarmEvent::OutgoingConnectionError {
                peer_id: _,
                error: _,
                connection_id: _,
            } => {
                // The Behaviour event is more general than the swarm event here. It includes
                // connection failures, so we use that log for now, in the peer manager
                // behaviour implementation.
                None
            }
            SwarmEvent::NewListenAddr { address, .. } => Some(NetworkEvent::NewListenAddr(address)),
            SwarmEvent::ExpiredListenAddr { address, .. } => {
                debug!(%address, "Listen address expired");
                None
            }
            SwarmEvent::ListenerClosed {
                addresses, reason, ..
            } => {
                match reason {
                    Ok(_) => {
                        debug!(?addresses, "Listener gracefully closed")
                    }
                    Err(reason) => {
                        crit!(?addresses, ?reason, "Listener abruptly closed")
                    }
                };
                if Swarm::listeners(&self.swarm).count() == 0 {
                    Some(NetworkEvent::ZeroListeners)
                } else {
                    None
                }
            }
            SwarmEvent::ListenerError { error, .. } => {
                debug!(reason = ?error, "Listener closed connection attempt");
                None
            }
            _ => {
                // NOTE: SwarmEvent is a non-exhaustive enum, so updates should be based on
                // release notes more than compiler feedback.
                None
            }
        }
    }
}