This is a little bit of a tip-of-the-iceberg PR. It houses a lot of code changes in the libp2p dependency. This needs a bit of thorough testing before merging.

The primary code changes are:

- General libp2p dependency update
- Gossipsub refactor to shift compression into gossipsub, providing performance improvements and an improved API for handling compression

Co-authored-by: Paul Hauner <paul@paulhauner.com>

use crate::behaviour::gossipsub_scoring_parameters::PeerScoreSettings;
use crate::peer_manager::{
    score::{PeerAction, ReportSource},
    ConnectionDirection, PeerManager, PeerManagerEvent,
};
use crate::rpc::*;
use crate::service::METADATA_FILENAME;
use crate::types::{
    subnet_id_from_topic_hash, GossipEncoding, GossipKind, GossipTopic, SnappyTransform,
    SubnetDiscovery,
};
use crate::Eth2Enr;
use crate::{error, metrics, Enr, NetworkConfig, NetworkGlobals, PubsubMessage, TopicHash};
use futures::prelude::*;
use handler::{BehaviourHandler, BehaviourHandlerIn, DelegateIn, DelegateOut};
use libp2p::{
    core::{
        connection::{ConnectedPoint, ConnectionId, ListenerId},
        identity::Keypair,
        Multiaddr,
    },
    gossipsub::{
        subscription_filter::{MaxCountSubscriptionFilter, WhitelistSubscriptionFilter},
        Gossipsub as BaseGossipsub, GossipsubEvent, IdentTopic as Topic, MessageAcceptance,
        MessageAuthenticity, MessageId, PeerScoreThresholds,
    },
    identify::{Identify, IdentifyEvent},
    swarm::{
        AddressScore, NetworkBehaviour, NetworkBehaviourAction as NBAction, NotifyHandler,
        PollParameters, ProtocolsHandler,
    },
    PeerId,
};
use slog::{crit, debug, o, trace, warn};
use ssz::Encode;
use std::collections::HashSet;
use std::fs::File;
use std::io::Write;
use std::path::PathBuf;
use std::{
    collections::VecDeque,
    marker::PhantomData,
    sync::Arc,
    task::{Context, Poll},
};
use types::{ChainSpec, EnrForkId, EthSpec, SignedBeaconBlock, Slot, SubnetId};

mod gossipsub_scoring_parameters;
mod handler;

const MAX_IDENTIFY_ADDRESSES: usize = 10;
pub const GOSSIPSUB_GREYLIST_THRESHOLD: f64 = -16000.0;

/// Identifier of requests sent by a peer.
pub type PeerRequestId = (ConnectionId, SubstreamId);

pub type SubscriptionFilter = MaxCountSubscriptionFilter<WhitelistSubscriptionFilter>;
pub type Gossipsub = BaseGossipsub<SnappyTransform, SubscriptionFilter>;

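// Orientation note: the composed `Gossipsub` type above is the heart of this PR's
// compression refactor. `SnappyTransform` implements gossipsub's `DataTransform` hook,
// so snappy (de)compression now happens inside gossipsub itself rather than in the
// application layer, and the `SubscriptionFilter` rejects subscriptions to unknown
// topics. A minimal construction sketch (mirroring `Behaviour::new` below; `gs_config`,
// `filter` and `max_transmit_size` are stand-ins):
//
//     let gossipsub = Gossipsub::new_with_subscription_filter_and_transform(
//         MessageAuthenticity::Anonymous,
//         gs_config,
//         filter,
//         SnappyTransform::new(max_transmit_size),
//     )
//     .expect("valid gossipsub configuration");
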
/// The types of events that can be obtained from polling the behaviour.
#[derive(Debug)]
pub enum BehaviourEvent<TSpec: EthSpec> {
    /// We have successfully dialed and connected to a peer.
    PeerDialed(PeerId),
    /// A peer has successfully dialed and connected to us.
    PeerConnected(PeerId),
    /// A peer has disconnected.
    PeerDisconnected(PeerId),
    /// An RPC request that was sent failed.
    RPCFailed {
        /// The id of the failed request.
        id: RequestId,
        /// The peer to which this request was sent.
        peer_id: PeerId,
    },
    RequestReceived {
        /// The peer that sent the request.
        peer_id: PeerId,
        /// Identifier of the request. All responses to this request must use this id.
        id: PeerRequestId,
        /// Request the peer sent.
        request: Request,
    },
    ResponseReceived {
        /// Peer that sent the response.
        peer_id: PeerId,
        /// Id of the request to which the peer is responding.
        id: RequestId,
        /// Response the peer sent.
        response: Response<TSpec>,
    },
    PubsubMessage {
        /// The gossipsub message id. Used when propagating blocks after validation.
        id: MessageId,
        /// The peer from which we received this message, not the peer that published it.
        source: PeerId,
        /// The topic that this message was sent on.
        topic: TopicHash,
        /// The message itself.
        message: PubsubMessage<TSpec>,
    },
    /// Inform the network to send a Status to this peer.
    StatusPeer(PeerId),
}

/// Builds the network behaviour that manages the core protocols of eth2.
/// This core behaviour is managed by `Behaviour` which adds peer management to all core
/// behaviours.
pub struct Behaviour<TSpec: EthSpec> {
    /// The routing pub-sub mechanism for eth2.
    gossipsub: Gossipsub,
    /// The Eth2 RPC specified in the wire-0 protocol.
    eth2_rpc: RPC<TSpec>,
    /// Provides IP addresses and peer information.
    // NOTE: The id protocol is used for initial interop. This will be removed by mainnet.
    identify: Identify,
    /// The peer manager that keeps track of peers' reputation and status.
    peer_manager: PeerManager<TSpec>,
    /// The output events generated by this behaviour, to be consumed in the swarm poll.
    events: VecDeque<BehaviourEvent<TSpec>>,
    /// Queue of peers to disconnect, with an optional reason for the disconnection.
    peers_to_dc: VecDeque<(PeerId, Option<GoodbyeReason>)>,
    /// A collection of variables accessible outside the network service.
    network_globals: Arc<NetworkGlobals<TSpec>>,
    /// Keeps track of the current EnrForkId for upgrading gossipsub topics.
    // NOTE: This can be accessed via the network_globals ENR. However, we keep it here for quick
    // lookups on every gossipsub message send.
    enr_fork_id: EnrForkId,
    /// The waker for the current task.
    waker: Option<std::task::Waker>,
    /// Directory where metadata is stored.
    network_dir: PathBuf,
    /// Logger for behaviour actions.
    log: slog::Logger,

    /// Settings used to compute the gossipsub peer scoring parameters.
    score_settings: PeerScoreSettings<TSpec>,

    /// The interval for updating gossipsub scores.
    update_gossipsub_scores: tokio::time::Interval,
}

/// Implements the combined behaviour for the libp2p service.
impl<TSpec: EthSpec> Behaviour<TSpec> {
    pub async fn new(
        local_key: &Keypair,
        net_conf: &NetworkConfig,
        network_globals: Arc<NetworkGlobals<TSpec>>,
        log: &slog::Logger,
        chain_spec: &ChainSpec,
    ) -> error::Result<Self> {
        let behaviour_log = log.new(o!());

        let identify = if net_conf.private {
            Identify::new(
                "".into(),
                "".into(),
                local_key.public(), // Still send the legitimate public key
            )
        } else {
            Identify::new(
                "lighthouse/libp2p".into(),
                lighthouse_version::version_with_platform(),
                local_key.public(),
            )
        };

        let enr_fork_id = network_globals
            .local_enr()
            .eth2()
            .expect("Local ENR must have a fork id");

        let possible_fork_digests = vec![enr_fork_id.fork_digest];
        let filter = MaxCountSubscriptionFilter {
            filter: Self::create_whitelist_filter(possible_fork_digests, 64), // TODO: change this to a constant
            max_subscribed_topics: 200, // TODO: change this to a constant
            max_subscriptions_per_request: 100, // this is according to the current go implementation
        };

        // Initialize the compression transform.
        let snappy_transform = SnappyTransform::new(net_conf.gs_config.max_transmit_size());

        let mut gossipsub = Gossipsub::new_with_subscription_filter_and_transform(
            MessageAuthenticity::Anonymous,
            net_conf.gs_config.clone(),
            filter,
            snappy_transform,
        )
        .map_err(|e| format!("Could not construct gossipsub: {:?}", e))?;

        // We don't know the number of active validators and the current slot yet.
        let active_validators = TSpec::minimum_validator_count();
        let current_slot = Slot::new(0);

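        // Threshold semantics (from the gossipsub v1.1 scoring spec): peers scoring below
        // `gossip_threshold` have gossip propagation (IHAVE/IWANT) suppressed; below
        // `publish_threshold` we no longer publish our own messages to them; below
        // `graylist_threshold` all their RPCs are ignored. `accept_px_threshold` gates
        // peer-exchange on PRUNE messages, and `opportunistic_graft_threshold` is the
        // median mesh score below which opportunistic grafting activates.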
        let thresholds = PeerScoreThresholds {
            gossip_threshold: -4000.0,
            publish_threshold: -8000.0,
            graylist_threshold: GOSSIPSUB_GREYLIST_THRESHOLD,
            accept_px_threshold: 100.0,
            opportunistic_graft_threshold: 5.0,
        };

        let score_settings = PeerScoreSettings::new(chain_spec, &net_conf.gs_config);

        // Prepare the scoring parameters.
        let params = score_settings.get_peer_score_params(
            active_validators,
            &thresholds,
            &enr_fork_id,
            current_slot,
        )?;

        trace!(behaviour_log, "Using peer score params"; "params" => ?params);

        let update_gossipsub_scores = tokio::time::interval(params.decay_interval);

        gossipsub
            .with_peer_score(params.clone(), thresholds)
            .expect("Valid score params and thresholds");

        Ok(Behaviour {
            eth2_rpc: RPC::new(log.clone()),
            gossipsub,
            identify,
            peer_manager: PeerManager::new(local_key, net_conf, network_globals.clone(), log)
                .await?,
            events: VecDeque::new(),
            peers_to_dc: VecDeque::new(),
            network_globals,
            enr_fork_id,
            waker: None,
            network_dir: net_conf.network_dir.clone(),
            log: behaviour_log,
            score_settings,
            update_gossipsub_scores,
        })
    }

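    /// Updates the scoring parameters for the dynamic gossipsub topics (blocks,
    /// aggregates and the attestation subnets) from the current number of active
    /// validators and the current slot.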
    pub fn update_gossipsub_parameters(
        &mut self,
        active_validators: usize,
        current_slot: Slot,
    ) -> error::Result<()> {
        let (beacon_block_params, beacon_aggregate_proof_params, beacon_attestation_subnet_params) =
            self.score_settings
                .get_dynamic_topic_params(active_validators, current_slot)?;

        let fork_digest = self.enr_fork_id.fork_digest;
        let get_topic = |kind: GossipKind| -> Topic {
            GossipTopic::new(kind, GossipEncoding::default(), fork_digest).into()
        };

        debug!(self.log, "Updating gossipsub score parameters";
            "active_validators" => active_validators);
        trace!(self.log, "Updated gossipsub score parameters";
            "beacon_block_params" => ?beacon_block_params,
            "beacon_aggregate_proof_params" => ?beacon_aggregate_proof_params,
            "beacon_attestation_subnet_params" => ?beacon_attestation_subnet_params,
        );

        self.gossipsub
            .set_topic_params(get_topic(GossipKind::BeaconBlock), beacon_block_params)?;

        self.gossipsub.set_topic_params(
            get_topic(GossipKind::BeaconAggregateAndProof),
            beacon_aggregate_proof_params,
        )?;

        for i in 0..self.score_settings.attestation_subnet_count() {
            self.gossipsub.set_topic_params(
                get_topic(GossipKind::Attestation(SubnetId::new(i))),
                beacon_attestation_subnet_params.clone(),
            )?;
        }

        Ok(())
    }

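    // Hypothetical caller sketch (not part of this file): the network service would
    // re-derive the dynamic parameters whenever its view of the validator set changes:
    //
    //     behaviour.update_gossipsub_parameters(active_validators, current_slot)?;
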
    /// Attempts to connect to a libp2p peer.
    ///
    /// This MUST be used over Swarm::dial() as this keeps track of the peer in the peer manager.
    ///
    /// All external dials dial a multiaddr; this function is currently unused, but is kept here
    /// in case any part of lighthouse needs to connect to a peer_id in the future.
    pub fn dial(&mut self, peer_id: &PeerId) {
        self.peer_manager.dial_peer(peer_id);
    }

    /// Returns the local ENR of the node.
    pub fn local_enr(&self) -> Enr {
        self.network_globals.local_enr()
    }

    /// Obtain a reference to the gossipsub protocol.
    pub fn gs(&self) -> &Gossipsub {
        &self.gossipsub
    }

    /* Pubsub behaviour functions */

    /// Subscribes to a gossipsub topic kind, letting the network service determine the
    /// encoding and fork version.
    pub fn subscribe_kind(&mut self, kind: GossipKind) -> bool {
        let gossip_topic = GossipTopic::new(
            kind,
            GossipEncoding::default(),
            self.enr_fork_id.fork_digest,
        );

        self.subscribe(gossip_topic)
    }

    /// Unsubscribes from a gossipsub topic kind, letting the network service determine the
    /// encoding and fork version.
    pub fn unsubscribe_kind(&mut self, kind: GossipKind) -> bool {
        let gossip_topic = GossipTopic::new(
            kind,
            GossipEncoding::default(),
            self.enr_fork_id.fork_digest,
        );
        self.unsubscribe(gossip_topic)
    }

    /// Subscribes to a specific subnet id.
    pub fn subscribe_to_subnet(&mut self, subnet_id: SubnetId) -> bool {
        let topic = GossipTopic::new(
            subnet_id.into(),
            GossipEncoding::default(),
            self.enr_fork_id.fork_digest,
        );
        self.subscribe(topic)
    }

    /// Unsubscribes from a specific subnet id.
    pub fn unsubscribe_from_subnet(&mut self, subnet_id: SubnetId) -> bool {
        let topic = GossipTopic::new(
            subnet_id.into(),
            GossipEncoding::default(),
            self.enr_fork_id.fork_digest,
        );
        self.unsubscribe(topic)
    }

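    // Usage sketch (hypothetical caller, using only the methods above): subscribe to the
    // block topic and a single attestation subnet after startup:
    //
    //     behaviour.subscribe_kind(GossipKind::BeaconBlock);
    //     behaviour.subscribe_to_subnet(SubnetId::new(0));
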
    /// Subscribes to a gossipsub topic.
    fn subscribe(&mut self, topic: GossipTopic) -> bool {
        // update the network globals
        self.network_globals
            .gossipsub_subscriptions
            .write()
            .insert(topic.clone());

        let topic: Topic = topic.into();

        match self.gossipsub.subscribe(&topic) {
            Err(_) => {
                warn!(self.log, "Failed to subscribe to topic"; "topic" => %topic);
                false
            }
            Ok(v) => {
                debug!(self.log, "Subscribed to topic"; "topic" => %topic);
                v
            }
        }
    }

    /// Unsubscribes from a gossipsub topic.
    fn unsubscribe(&mut self, topic: GossipTopic) -> bool {
        // update the network globals
        self.network_globals
            .gossipsub_subscriptions
            .write()
            .remove(&topic);

        // unsubscribe from the topic
        let topic: Topic = topic.into();

        match self.gossipsub.unsubscribe(&topic) {
            Err(_) => {
                warn!(self.log, "Failed to unsubscribe from topic"; "topic" => %topic);
                false
            }
            Ok(v) => {
                debug!(self.log, "Unsubscribed from topic"; "topic" => %topic);
                v
            }
        }
    }

    /// Publishes a list of messages on the pubsub (gossipsub) behaviour, choosing the encoding.
    pub fn publish(&mut self, messages: Vec<PubsubMessage<TSpec>>) {
        for message in messages {
            for topic in message.topics(GossipEncoding::default(), self.enr_fork_id.fork_digest) {
                let message_data = message.encode(GossipEncoding::default());
                if let Err(e) = self.gossipsub.publish(topic.clone().into(), message_data) {
                    slog::warn!(self.log, "Could not publish message";
                        "error" => ?e);

                    // add to metrics
                    match topic.kind() {
                        GossipKind::Attestation(subnet_id) => {
                            if let Some(v) = metrics::get_int_gauge(
                                &metrics::FAILED_ATTESTATION_PUBLISHES_PER_SUBNET,
                                &[&subnet_id.to_string()],
                            ) {
                                v.inc()
                            };
                        }
                        kind => {
                            if let Some(v) = metrics::get_int_gauge(
                                &metrics::FAILED_PUBLISHES_PER_MAIN_TOPIC,
                                &[&format!("{:?}", kind)],
                            ) {
                                v.inc()
                            };
                        }
                    }
                }
            }
        }
    }

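    // Note that `message.encode` produces plain SSZ bytes here; the snappy compression
    // is applied inside gossipsub by the `SnappyTransform` configured at construction.
    // A hypothetical publish call (assuming the `BeaconBlock` variant of `PubsubMessage`):
    //
    //     behaviour.publish(vec![PubsubMessage::BeaconBlock(Box::new(signed_block))]);
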
    /// Informs gossipsub of the result of a message validation.
    /// If the message is valid it will get propagated by gossipsub.
    pub fn report_message_validation_result(
        &mut self,
        propagation_source: &PeerId,
        message_id: MessageId,
        validation_result: MessageAcceptance,
    ) {
        if let Some(result) = match validation_result {
            MessageAcceptance::Accept => None,
            MessageAcceptance::Ignore => Some("ignore"),
            MessageAcceptance::Reject => Some("reject"),
        } {
            if let Some(client) = self
                .network_globals
                .peers
                .read()
                .peer_info(propagation_source)
                .map(|info| info.client.kind.as_static_ref())
            {
                metrics::inc_counter_vec(
                    &metrics::GOSSIP_UNACCEPTED_MESSAGES_PER_CLIENT,
                    &[client, result],
                )
            }
        }

        if let Err(e) = self.gossipsub.report_message_validation_result(
            &message_id,
            propagation_source,
            validation_result,
        ) {
            warn!(self.log, "Failed to report message validation"; "message_id" => %message_id, "peer_id" => %propagation_source, "error" => ?e);
        }
    }

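    // Hypothetical flow sketch: after the beacon chain validates a decoded gossip
    // message, the network service reports the verdict back so gossipsub can propagate
    // it (or penalise the sender):
    //
    //     behaviour.report_message_validation_result(&source, id, MessageAcceptance::Accept);
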
    /* Eth2 RPC behaviour functions */

    /// Sends a request to a peer over RPC.
    pub fn send_request(&mut self, peer_id: PeerId, request_id: RequestId, request: Request) {
        self.eth2_rpc
            .send_request(peer_id, request_id, request.into())
    }

    /// Sends a successful response to a peer over RPC.
    pub fn send_successful_response(
        &mut self,
        peer_id: PeerId,
        id: PeerRequestId,
        response: Response<TSpec>,
    ) {
        self.eth2_rpc.send_response(peer_id, id, response.into())
    }

    /// Informs the peer that their request produced an error.
    pub fn _send_error_response(
        &mut self,
        peer_id: PeerId,
        id: PeerRequestId,
        error: RPCResponseErrorCode,
        reason: String,
    ) {
        self.eth2_rpc
            .send_response(peer_id, id, RPCCodedResponse::Error(error, reason.into()))
    }

    /* Peer management functions */

    /// Reports a peer's action.
    pub fn report_peer(&mut self, peer_id: &PeerId, action: PeerAction, source: ReportSource) {
        self.peer_manager.report_peer(peer_id, action, source)
    }

    /// Disconnects from a peer, providing a reason.
    ///
    /// This will send a goodbye, disconnect and then ban the peer.
    /// This is fatal for a peer, and should be used in unrecoverable circumstances.
    pub fn goodbye_peer(&mut self, peer_id: &PeerId, reason: GoodbyeReason, source: ReportSource) {
        self.peer_manager.goodbye_peer(peer_id, reason, source);
    }

    /// Returns all ENR entries in the DHT.
    pub fn enr_entries(&mut self) -> Vec<Enr> {
        self.peer_manager.discovery_mut().table_entries_enr()
    }

    /// Adds an ENR to the routing table of the discovery mechanism.
    pub fn add_enr(&mut self, enr: Enr) {
        self.peer_manager.discovery_mut().add_enr(enr);
    }

    /// Updates a subnet value in the ENR bitfield.
    ///
    /// The `value` is `true` if a subnet is being added and `false` otherwise.
    pub fn update_enr_subnet(&mut self, subnet_id: SubnetId, value: bool) {
        if let Err(e) = self
            .peer_manager
            .discovery_mut()
            .update_enr_bitfield(subnet_id, value)
        {
            crit!(self.log, "Could not update ENR bitfield"; "error" => e);
        }
        // update the local meta data which informs our peers of the update during PINGS
        self.update_metadata();
    }

    /// Attempts to discover new peers for a given subnet. The `min_ttl` gives the time at which we
    /// would like to retain the peers for.
    pub fn discover_subnet_peers(&mut self, subnet_subscriptions: Vec<SubnetDiscovery>) {
        self.peer_manager
            .discover_subnet_peers(subnet_subscriptions)
    }

    /// Updates the local ENR's "eth2" field with the latest EnrForkId.
    pub fn update_fork_version(&mut self, enr_fork_id: EnrForkId) {
        self.peer_manager
            .discovery_mut()
            .update_eth2_enr(enr_fork_id.clone());

        // Unsubscribe from all gossip topics and re-subscribe to their new fork counterparts.
        let subscribed_topics = self
            .network_globals
            .gossipsub_subscriptions
            .read()
            .iter()
            .cloned()
            .collect::<Vec<GossipTopic>>();

        // unsubscribe from all topics
        for topic in &subscribed_topics {
            self.unsubscribe(topic.clone());
        }

        // re-subscribe, modifying the fork version
        for mut topic in subscribed_topics {
            *topic.digest() = enr_fork_id.fork_digest;
            self.subscribe(topic);
        }

        // update the local reference
        self.enr_fork_id = enr_fork_id;
    }

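    // Gossip topic strings embed the fork digest (`/eth2/{fork_digest}/{name}/{encoding}`),
    // so swapping the digest above migrates every subscription onto the new fork's topics.
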
    /* Private internal functions */

    /// Updates the current meta data of the node to match the local ENR.
    fn update_metadata(&mut self) {
        let local_attnets = self
            .peer_manager
            .discovery()
            .local_enr()
            .bitfield::<TSpec>()
            .expect("Local discovery must have bitfield");

        {
            // write lock scope
            let mut meta_data = self.network_globals.local_metadata.write();
            meta_data.seq_number += 1;
            meta_data.attnets = local_attnets;
        }
        // Save the updated metadata to disk.
        save_metadata_to_disk(
            &self.network_dir,
            self.network_globals.local_metadata.read().clone(),
            &self.log,
        );
    }

    /// Sends a Ping request to the peer.
    fn ping(&mut self, id: RequestId, peer_id: PeerId) {
        let ping = crate::rpc::Ping {
            data: self.network_globals.local_metadata.read().seq_number,
        };
        trace!(self.log, "Sending Ping"; "request_id" => id, "peer_id" => %peer_id);

        self.eth2_rpc
            .send_request(peer_id, id, RPCRequest::Ping(ping));
    }

    /// Sends a Pong response to the peer.
    fn pong(&mut self, id: PeerRequestId, peer_id: PeerId) {
        let ping = crate::rpc::Ping {
            data: self.network_globals.local_metadata.read().seq_number,
        };
        trace!(self.log, "Sending Pong"; "request_id" => id.1, "peer_id" => %peer_id);
        let event = RPCCodedResponse::Success(RPCResponse::Pong(ping));
        self.eth2_rpc.send_response(peer_id, id, event);
    }

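    // The Ping/Pong payload is the local MetaData sequence number: a peer that sees a
    // higher number than the one it has cached knows to re-request our MetaData.
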
    /// Sends a METADATA request to a peer.
    fn send_meta_data_request(&mut self, peer_id: PeerId) {
        let event = RPCRequest::MetaData(PhantomData);
        self.eth2_rpc
            .send_request(peer_id, RequestId::Behaviour, event);
    }

    /// Sends a METADATA response to a peer.
    fn send_meta_data_response(&mut self, id: PeerRequestId, peer_id: PeerId) {
        let event = RPCCodedResponse::Success(RPCResponse::MetaData(
            self.network_globals.local_metadata.read().clone(),
        ));
        self.eth2_rpc.send_response(peer_id, id, event);
    }

    /// Returns a mutable reference to the peer manager, allowing the swarm to notify the
    /// manager of peer status changes.
    pub fn peer_manager(&mut self) -> &mut PeerManager<TSpec> {
        &mut self.peer_manager
    }

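    /// Handles events produced by the gossipsub sub-behaviour.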
    fn on_gossip_event(&mut self, event: GossipsubEvent) {
        match event {
            GossipsubEvent::Message {
                propagation_source,
                message_id: id,
                message: gs_msg,
            } => {
                // Note: We are keeping track here of the peer that sent us the message, not the
                // peer that originally published the message.
                match PubsubMessage::decode(&gs_msg.topic, &gs_msg.data) {
                    Err(e) => {
                        debug!(self.log, "Could not decode gossipsub message"; "error" => e);
                        // Reject the message.
                        if let Err(e) = self.gossipsub.report_message_validation_result(
                            &id,
                            &propagation_source,
                            MessageAcceptance::Reject,
                        ) {
                            warn!(self.log, "Failed to report message validation"; "message_id" => %id, "peer_id" => %propagation_source, "error" => ?e);
                        }
                    }
                    Ok(msg) => {
                        // Notify the network
                        self.add_event(BehaviourEvent::PubsubMessage {
                            id,
                            source: propagation_source,
                            topic: gs_msg.topic,
                            message: msg,
                        });
                    }
                }
            }
            GossipsubEvent::Subscribed { peer_id, topic } => {
                if let Some(subnet_id) = subnet_id_from_topic_hash(&topic) {
                    self.peer_manager.add_subscription(&peer_id, subnet_id);
                }
            }
            GossipsubEvent::Unsubscribed { peer_id, topic } => {
                if let Some(subnet_id) = subnet_id_from_topic_hash(&topic) {
                    self.peer_manager.remove_subscription(&peer_id, subnet_id);
                }
            }
        }
    }

    /// Queues the response to be sent upwards, as long as it was requested outside the Behaviour.
    fn propagate_response(&mut self, id: RequestId, peer_id: PeerId, response: Response<TSpec>) {
        if !matches!(id, RequestId::Behaviour) {
            self.add_event(BehaviourEvent::ResponseReceived {
                peer_id,
                id,
                response,
            });
        }
    }

    /// Convenience function to propagate a request.
    fn propagate_request(&mut self, id: PeerRequestId, peer_id: PeerId, request: Request) {
        self.add_event(BehaviourEvent::RequestReceived {
            peer_id,
            id,
            request,
        });
    }

    fn on_rpc_event(&mut self, message: RPCMessage<TSpec>) {
        let peer_id = message.peer_id;

        if !self.peer_manager.is_connected(&peer_id) {
            // Ignore this event.
            debug!(
                self.log,
                "Ignoring rpc message of disconnected peer";
                "peer" => %peer_id
            );
            return;
        }

        let handler_id = message.conn_id;
        // The METADATA and PING RPC responses are handled within the behaviour and not propagated.
        match message.event {
            Err(handler_err) => {
                match handler_err {
                    HandlerErr::Inbound {
                        id: _,
                        proto,
                        error,
                    } => {
                        if matches!(error, RPCError::HandlerRejected) {
                            // this peer's request got canceled
                        }
                        // Inform the peer manager of the error.
                        // An inbound error here means we sent an error to the peer, or the stream
                        // timed out.
                        self.peer_manager.handle_rpc_error(
                            &peer_id,
                            proto,
                            &error,
                            ConnectionDirection::Incoming,
                        );
                    }
                    HandlerErr::Outbound { id, proto, error } => {
                        // Inform the peer manager that a request we sent to the peer failed.
                        self.peer_manager.handle_rpc_error(
                            &peer_id,
                            proto,
                            &error,
                            ConnectionDirection::Outgoing,
                        );
                        // Inform the application of failures for requests originating outside
                        // the behaviour.
                        if !matches!(id, RequestId::Behaviour) {
                            self.add_event(BehaviourEvent::RPCFailed { peer_id, id });
                        }
                    }
                }
            }
            Ok(RPCReceived::Request(id, request)) => {
                let peer_request_id = (handler_id, id);
                match request {
                    /* Behaviour managed protocols: Ping and Metadata */
                    RPCRequest::Ping(ping) => {
                        // inform the peer manager and send the response
                        self.peer_manager.ping_request(&peer_id, ping.data);
                        // send a ping response
                        self.pong(peer_request_id, peer_id);
                    }
                    RPCRequest::MetaData(_) => {
                        // send the requested meta-data
                        self.send_meta_data_response((handler_id, id), peer_id);
                    }
                    RPCRequest::Goodbye(reason) => {
                        // Queue for disconnection without a goodbye message.
                        debug!(
                            self.log, "Peer sent Goodbye";
                            "peer_id" => %peer_id,
                            "reason" => %reason,
                            "client" => %self.network_globals.client(&peer_id),
                        );
                        self.peers_to_dc.push_back((peer_id, None));
                        // NOTE: We currently do not inform the application that we are
                        // disconnecting here. The actual disconnection event will be relayed to
                        // the application. Ideally this time difference is short, but we may need
                        // to introduce a message to inform the application layer early.
                    }
                    /* Protocols propagated to the Network */
                    RPCRequest::Status(msg) => {
                        // inform the peer manager that we have received a status from a peer
                        self.peer_manager.peer_statusd(&peer_id);
                        // propagate the STATUS message upwards
                        self.propagate_request(peer_request_id, peer_id, Request::Status(msg))
                    }
                    RPCRequest::BlocksByRange(req) => self.propagate_request(
                        peer_request_id,
                        peer_id,
                        Request::BlocksByRange(req),
                    ),
                    RPCRequest::BlocksByRoot(req) => {
                        self.propagate_request(peer_request_id, peer_id, Request::BlocksByRoot(req))
                    }
                }
            }
            Ok(RPCReceived::Response(id, resp)) => {
                match resp {
                    /* Behaviour managed protocols */
                    RPCResponse::Pong(ping) => self.peer_manager.pong_response(&peer_id, ping.data),
                    RPCResponse::MetaData(meta_data) => {
                        self.peer_manager.meta_data_response(&peer_id, meta_data)
                    }
                    /* Network propagated protocols */
                    RPCResponse::Status(msg) => {
                        // inform the peer manager that we have received a status from a peer
                        self.peer_manager.peer_statusd(&peer_id);
                        // propagate the STATUS message upwards
                        self.propagate_response(id, peer_id, Response::Status(msg));
                    }
                    RPCResponse::BlocksByRange(resp) => {
                        self.propagate_response(id, peer_id, Response::BlocksByRange(Some(resp)))
                    }
                    RPCResponse::BlocksByRoot(resp) => {
                        self.propagate_response(id, peer_id, Response::BlocksByRoot(Some(resp)))
                    }
                }
            }
            Ok(RPCReceived::EndOfStream(id, termination)) => {
                let response = match termination {
                    ResponseTermination::BlocksByRange => Response::BlocksByRange(None),
                    ResponseTermination::BlocksByRoot => Response::BlocksByRoot(None),
                };
                self.propagate_response(id, peer_id, response);
            }
        }
    }

    /// Consumes the events list when polled.
    fn custom_poll(
        &mut self,
        cx: &mut Context,
    ) -> Poll<NBAction<BehaviourHandlerIn<TSpec>, BehaviourEvent<TSpec>>> {
        // Handle pending disconnections to perform.
        if let Some((peer_id, reason)) = self.peers_to_dc.pop_front() {
            return Poll::Ready(NBAction::NotifyHandler {
                peer_id,
                handler: NotifyHandler::All,
                event: BehaviourHandlerIn::Shutdown(
                    reason.map(|reason| (RequestId::Behaviour, RPCRequest::Goodbye(reason))),
                ),
            });
        }

        // Check the peer manager for events.
        loop {
            match self.peer_manager.poll_next_unpin(cx) {
                Poll::Ready(Some(event)) => match event {
                    PeerManagerEvent::Dial(peer_id) => {
                        return Poll::Ready(NBAction::DialPeer {
                            peer_id,
                            condition: libp2p::swarm::DialPeerCondition::Disconnected,
                        });
                    }
                    PeerManagerEvent::SocketUpdated(address) => {
                        return Poll::Ready(NBAction::ReportObservedAddr {
                            address,
                            score: AddressScore::Finite(1),
                        });
                    }
                    PeerManagerEvent::Status(peer_id) => {
                        // It's time to status. We don't keep a beacon chain reference here, so we
                        // inform the network to send a status to this peer.
                        return Poll::Ready(NBAction::GenerateEvent(BehaviourEvent::StatusPeer(
                            peer_id,
                        )));
                    }
                    PeerManagerEvent::Ping(peer_id) => {
                        // send a ping request to this peer
                        self.ping(RequestId::Behaviour, peer_id);
                    }
                    PeerManagerEvent::MetaData(peer_id) => {
                        self.send_meta_data_request(peer_id);
                    }
                    PeerManagerEvent::DisconnectPeer(peer_id, reason) => {
                        debug!(self.log, "PeerManager disconnecting peer";
                            "peer_id" => %peer_id, "reason" => %reason);
                        // send one goodbye
                        return Poll::Ready(NBAction::NotifyHandler {
                            peer_id,
                            handler: NotifyHandler::Any,
                            event: BehaviourHandlerIn::Shutdown(Some((
                                RequestId::Behaviour,
                                RPCRequest::Goodbye(reason),
                            ))),
                        });
                    }
                },
                Poll::Pending => break,
                Poll::Ready(None) => break, // peer manager ended
            }
        }

        if let Some(event) = self.events.pop_front() {
            return Poll::Ready(NBAction::GenerateEvent(event));
        }

        // Perform gossipsub score updates when necessary.
        while let Poll::Ready(Some(_)) = self.update_gossipsub_scores.poll_next_unpin(cx) {
            self.peer_manager.update_gossipsub_scores(&self.gossipsub);
        }

        Poll::Pending
    }

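    /// Handles events produced by the identify sub-behaviour.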
    fn on_identify_event(&mut self, event: IdentifyEvent) {
        match event {
            IdentifyEvent::Received {
                peer_id,
                mut info,
                observed_addr,
            } => {
                if info.listen_addrs.len() > MAX_IDENTIFY_ADDRESSES {
                    debug!(
                        self.log,
                        "More than the maximum number of addresses have been identified, truncating"
                    );
                    info.listen_addrs.truncate(MAX_IDENTIFY_ADDRESSES);
                }
                // send peer info to the peer manager.
                self.peer_manager.identify(&peer_id, &info);

                debug!(self.log, "Identified Peer"; "peer" => %peer_id,
                    "protocol_version" => info.protocol_version,
                    "agent_version" => info.agent_version,
                    "listening_addresses" => ?info.listen_addrs,
                    "observed_address" => ?observed_addr,
                    "protocols" => ?info.protocols
                );
            }
            IdentifyEvent::Sent { .. } => {}
            IdentifyEvent::Error { .. } => {}
        }
    }

    /// Adds an event to the queue, waking the current task to process it.
    fn add_event(&mut self, event: BehaviourEvent<TSpec>) {
        self.events.push_back(event);
        if let Some(waker) = &self.waker {
            waker.wake_by_ref();
        }
    }

    /// Creates a whitelist topic filter that covers all possible topics using the given set of
    /// possible fork digests.
    fn create_whitelist_filter(
        possible_fork_digests: Vec<[u8; 4]>,
        attestation_subnet_count: u64,
    ) -> WhitelistSubscriptionFilter {
        let mut possible_hashes = HashSet::new();
        for fork_digest in possible_fork_digests {
            let mut add = |kind| {
                let topic: Topic =
                    GossipTopic::new(kind, GossipEncoding::SSZSnappy, fork_digest).into();
                possible_hashes.insert(topic.hash());
            };

            use GossipKind::*;
            add(BeaconBlock);
            add(BeaconAggregateAndProof);
            add(VoluntaryExit);
            add(ProposerSlashing);
            add(AttesterSlashing);
            for id in 0..attestation_subnet_count {
                add(Attestation(SubnetId::new(id)));
            }
        }
        WhitelistSubscriptionFilter(possible_hashes)
    }
}

/// Calls the given function with the given args on all sub-behaviours.
macro_rules! delegate_to_behaviours {
    ($self: ident, $fn: ident, $($arg: ident), *) => {
        $self.gossipsub.$fn($($arg),*);
        $self.eth2_rpc.$fn($($arg),*);
        $self.identify.$fn($($arg),*);
    };
}

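// As an illustration, `delegate_to_behaviours!(self, inject_connected, peer_id)`
// expands to:
//
//     self.gossipsub.inject_connected(peer_id);
//     self.eth2_rpc.inject_connected(peer_id);
//     self.identify.inject_connected(peer_id);
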
impl<TSpec: EthSpec> NetworkBehaviour for Behaviour<TSpec> {
    type ProtocolsHandler = BehaviourHandler<TSpec>;
    type OutEvent = BehaviourEvent<TSpec>;

    fn new_handler(&mut self) -> Self::ProtocolsHandler {
        BehaviourHandler::new(&mut self.gossipsub, &mut self.eth2_rpc, &mut self.identify)
    }

    fn addresses_of_peer(&mut self, peer_id: &PeerId) -> Vec<Multiaddr> {
        self.peer_manager.addresses_of_peer(peer_id)
    }

    // This gets called every time a connection is established.
    // NOTE: The current logic implies that we would reject extra connections for already connected
    // peers if we have reached our peer limit. This is fine for the time being as we currently
    // only allow a single connection per peer.
    fn inject_connection_established(
        &mut self,
        peer_id: &PeerId,
        conn_id: &ConnectionId,
        endpoint: &ConnectedPoint,
    ) {
        let goodbye_reason: Option<GoodbyeReason> = if self.peer_manager.is_banned(peer_id) {
            // If the peer is banned, send a goodbye with reason banned.
            // A peer that has recently transitioned to the banned state should be in the
            // disconnecting state, but the `is_banned()` function is dependent on score, so it
            // should be true here in this case.
            Some(GoodbyeReason::Banned)
        } else if self.peer_manager.peer_limit_reached()
            && self
                .network_globals
                .peers
                .read()
                .peer_info(peer_id)
                .map_or(true, |i| !i.has_future_duty())
        {
            // If we are at our peer limit and we don't need the peer for a future validator
            // duty, send a goodbye with reason TooManyPeers.
            Some(GoodbyeReason::TooManyPeers)
        } else {
            None
        };

        if let Some(goodbye_reason) = goodbye_reason {
            match goodbye_reason {
                GoodbyeReason::Banned => {
                    debug!(self.log, "Disconnecting newly connected peer"; "peer_id" => %peer_id, "reason" => %goodbye_reason)
                }
                _ => {
                    trace!(self.log, "Disconnecting newly connected peer"; "peer_id" => %peer_id, "reason" => %goodbye_reason)
                }
            }
            self.peers_to_dc.push_back((*peer_id, Some(goodbye_reason)));
            // NOTE: We don't inform the peer manager that this peer is disconnecting. It is simply
            // rejected with a goodbye.
            return;
        }

        // All peers at this point will be registered as being connected.
        // Notify the peer manager of a successful connection.
        match endpoint {
            ConnectedPoint::Listener { send_back_addr, .. } => {
                self.peer_manager
                    .connect_ingoing(&peer_id, send_back_addr.clone());
                self.add_event(BehaviourEvent::PeerConnected(*peer_id));
                debug!(self.log, "Connection established"; "peer_id" => %peer_id, "connection" => "Incoming");
            }
            ConnectedPoint::Dialer { address } => {
                self.peer_manager
                    .connect_outgoing(&peer_id, address.clone());
                self.add_event(BehaviourEvent::PeerDialed(*peer_id));
                debug!(self.log, "Connection established"; "peer_id" => %peer_id, "connection" => "Dialed");
            }
        }
        // Report the event to the sub-behaviours.
        delegate_to_behaviours!(
            self,
            inject_connection_established,
            peer_id,
            conn_id,
            endpoint
        );
    }

    // This gets called on the initial connection establishment.
    // NOTE: This gets called after inject_connection_established. Therefore the logic in that
    // function dictates the logic here.
    fn inject_connected(&mut self, peer_id: &PeerId) {
        // If the PeerManager has connected this peer, inform the sub-behaviours.
        if !self.network_globals.peers.read().is_connected(&peer_id) {
            return;
        }

        // Increment prometheus metrics.
        metrics::inc_counter(&metrics::PEER_CONNECT_EVENT_COUNT);
        metrics::set_gauge(
            &metrics::PEERS_CONNECTED,
            self.network_globals.connected_peers() as i64,
        );

        delegate_to_behaviours!(self, inject_connected, peer_id);
    }

    // This gets called every time a connection is closed.
    // NOTE: The peer manager state can be modified during the lifetime of the peer due to the
    // scoring mechanism: peers can become banned. In this case, we still want to inform the
    // behaviours.
    fn inject_connection_closed(
        &mut self,
        peer_id: &PeerId,
        conn_id: &ConnectionId,
        endpoint: &ConnectedPoint,
    ) {
        // If the peer manager (and therefore the behaviours) believe this peer is connected,
        // inform them of the disconnection.
        // It could be that the peer was in the process of being disconnected. In this case the
        // sub-behaviours are expecting this peer to be connected and we inform them.
        if self
            .network_globals
            .peers
            .read()
            .is_connected_or_disconnecting(peer_id)
        {
            // We are disconnecting the peer or the peer has already been connected.
            // In both these cases, the peer has been previously registered with the sub protocols.
            delegate_to_behaviours!(self, inject_connection_closed, peer_id, conn_id, endpoint);
        }
    }

    // This gets called once there are no more active connections.
    fn inject_disconnected(&mut self, peer_id: &PeerId) {
        // If the application/behaviour layers think this peer has connected, inform them of the
        // disconnect.

        // Remove all subnet subscriptions from the peerdb for the disconnected peer.
        self.peer_manager().remove_all_subscriptions(&peer_id);

        if self
            .network_globals
            .peers
            .read()
            .is_connected_or_disconnecting(peer_id)
        {
            // We are disconnecting the peer or the peer has already been connected.
            // In both these cases, the peer has been previously registered with the sub protocols
            // and potentially the application layer.
            // Inform the application.
            self.add_event(BehaviourEvent::PeerDisconnected(*peer_id));
            // Inform the behaviour.
            delegate_to_behaviours!(self, inject_disconnected, peer_id);

            debug!(self.log, "Peer disconnected"; "peer_id" => %peer_id);

            // Decrement the PEERS_PER_CLIENT metric.
            if let Some(kind) = self
                .network_globals
                .peers
                .read()
                .peer_info(peer_id)
                .map(|info| info.client.kind.clone())
            {
                if let Some(v) =
                    metrics::get_int_gauge(&metrics::PEERS_PER_CLIENT, &[&kind.to_string()])
                {
                    v.dec()
                };
            }
        }

        // Inform the peer manager.
        // NOTE: It may be the case that a rejected node, due to too many peers, is disconnected
        // here and the peer manager has no knowledge of its connection. We insert it here for
        // reference so that the peer manager can track this peer.
        self.peer_manager.notify_disconnect(&peer_id);

        // Update the prometheus metrics.
        metrics::inc_counter(&metrics::PEER_DISCONNECT_EVENT_COUNT);
        metrics::set_gauge(
            &metrics::PEERS_CONNECTED,
            self.network_globals.connected_peers() as i64,
        );
    }

    fn inject_addr_reach_failure(
        &mut self,
        peer_id: Option<&PeerId>,
        addr: &Multiaddr,
        error: &dyn std::error::Error,
    ) {
        delegate_to_behaviours!(self, inject_addr_reach_failure, peer_id, addr, error);
    }

    fn inject_dial_failure(&mut self, peer_id: &PeerId) {
        // Could not dial the peer, inform the peer manager.
        self.peer_manager.notify_dial_failure(&peer_id);
        delegate_to_behaviours!(self, inject_dial_failure, peer_id);
    }

    fn inject_new_listen_addr(&mut self, addr: &Multiaddr) {
        delegate_to_behaviours!(self, inject_new_listen_addr, addr);
    }

    fn inject_expired_listen_addr(&mut self, addr: &Multiaddr) {
        delegate_to_behaviours!(self, inject_expired_listen_addr, addr);
    }

    fn inject_new_external_addr(&mut self, addr: &Multiaddr) {
        delegate_to_behaviours!(self, inject_new_external_addr, addr);
    }

    fn inject_listener_error(&mut self, id: ListenerId, err: &(dyn std::error::Error + 'static)) {
        delegate_to_behaviours!(self, inject_listener_error, id, err);
    }

    fn inject_listener_closed(&mut self, id: ListenerId, reason: Result<(), &std::io::Error>) {
        delegate_to_behaviours!(self, inject_listener_closed, id, reason);
    }

    fn inject_event(
        &mut self,
        peer_id: PeerId,
        conn_id: ConnectionId,
        event: <Self::ProtocolsHandler as ProtocolsHandler>::OutEvent,
    ) {
        // If the peer is not supposed to be connected (e.g. undergoing an active disconnection),
        // don't process any of its messages.
        if !self.network_globals.peers.read().is_connected(&peer_id) {
            return;
        }

        // Events coming from the handler, redirected to each sub-behaviour.
        match event {
            DelegateOut::Gossipsub(ev) => self.gossipsub.inject_event(peer_id, conn_id, ev),
            DelegateOut::RPC(ev) => self.eth2_rpc.inject_event(peer_id, conn_id, ev),
            DelegateOut::Identify(ev) => self.identify.inject_event(peer_id, conn_id, *ev),
        }
    }

    fn poll(
        &mut self,
        cx: &mut Context,
        poll_params: &mut impl PollParameters,
    ) -> Poll<NBAction<<Self::ProtocolsHandler as ProtocolsHandler>::InEvent, Self::OutEvent>> {
        // Update the waker if needed. The stored waker must always be the most recent one,
        // so only skip the update when the existing waker would wake the same task.
        if let Some(waker) = &self.waker {
            if !waker.will_wake(cx.waker()) {
                self.waker = Some(cx.waker().clone());
            }
        } else {
            self.waker = Some(cx.waker().clone());
        }

        macro_rules! poll_behaviour {
            /* $behaviour: The sub-behaviour being polled.
             * $on_event_fn: Function to call if we get an event from the sub-behaviour.
             * $notify_handler_event_closure: Closure mapping the received event type to
             * the one that the handler should get.
             */
            ($behaviour: ident, $on_event_fn: ident, $notify_handler_event_closure: expr) => {
                loop {
                    // poll the sub-behaviour
                    match self.$behaviour.poll(cx, poll_params) {
                        Poll::Ready(action) => match action {
                            // call the designated function to handle the event from sub-behaviour
                            NBAction::GenerateEvent(event) => self.$on_event_fn(event),
                            NBAction::DialAddress { address } => {
                                return Poll::Ready(NBAction::DialAddress { address })
                            }
                            NBAction::DialPeer { peer_id, condition } => {
                                return Poll::Ready(NBAction::DialPeer { peer_id, condition })
                            }
                            NBAction::NotifyHandler {
                                peer_id,
                                handler,
                                event,
                            } => {
                                return Poll::Ready(NBAction::NotifyHandler {
                                    peer_id,
                                    handler,
                                    // call the closure mapping the received event to the needed one
                                    // in order to notify the handler
                                    event: BehaviourHandlerIn::Delegate(
                                        $notify_handler_event_closure(event),
                                    ),
                                });
                            }
                            NBAction::ReportObservedAddr { address, score } => {
                                return Poll::Ready(NBAction::ReportObservedAddr { address, score })
                            }
                        },
                        Poll::Pending => break,
                    }
                }
            };
        }

        poll_behaviour!(gossipsub, on_gossip_event, DelegateIn::Gossipsub);
        poll_behaviour!(eth2_rpc, on_rpc_event, DelegateIn::RPC);
        poll_behaviour!(identify, on_identify_event, DelegateIn::Identify);

        self.custom_poll(cx)
    }
}

/* Public API types */

/// The type of RPC requests the Behaviour informs it has received and allows for sending.
///
// NOTE: This is an application-level wrapper over the lower network level requests that can be
// sent. The main difference is the absence of the Ping, Metadata and Goodbye protocols, which
// don't leave the Behaviour. For all protocols managed by RPC see `RPCRequest`.
#[derive(Debug, Clone, PartialEq)]
pub enum Request {
    /// A Status message.
    Status(StatusMessage),
    /// A blocks by range request.
    BlocksByRange(BlocksByRangeRequest),
    /// A blocks by root request.
    BlocksByRoot(BlocksByRootRequest),
}

impl<TSpec: EthSpec> std::convert::From<Request> for RPCRequest<TSpec> {
    fn from(req: Request) -> RPCRequest<TSpec> {
        match req {
            Request::BlocksByRoot(r) => RPCRequest::BlocksByRoot(r),
            Request::BlocksByRange(r) => RPCRequest::BlocksByRange(r),
            Request::Status(s) => RPCRequest::Status(s),
        }
    }
}

/// The type of RPC responses the Behaviour informs it has received, and allows for sending.
///
// NOTE: This is an application-level wrapper over the lower network level responses that can be
// sent. The main difference is the absence of Pong and Metadata, which don't leave the
// Behaviour. For all protocol responses managed by RPC see `RPCResponse` and
// `RPCCodedResponse`.
#[derive(Debug, Clone, PartialEq)]
pub enum Response<TSpec: EthSpec> {
    /// A Status message.
    Status(StatusMessage),
    /// A response to a get BLOCKS_BY_RANGE request. A None response signals the end of the batch.
    BlocksByRange(Option<Box<SignedBeaconBlock<TSpec>>>),
    /// A response to a get BLOCKS_BY_ROOT request.
    BlocksByRoot(Option<Box<SignedBeaconBlock<TSpec>>>),
}

impl<TSpec: EthSpec> std::convert::From<Response<TSpec>> for RPCCodedResponse<TSpec> {
    fn from(resp: Response<TSpec>) -> RPCCodedResponse<TSpec> {
        match resp {
            Response::BlocksByRoot(r) => match r {
                Some(b) => RPCCodedResponse::Success(RPCResponse::BlocksByRoot(b)),
                None => RPCCodedResponse::StreamTermination(ResponseTermination::BlocksByRoot),
            },
            Response::BlocksByRange(r) => match r {
                Some(b) => RPCCodedResponse::Success(RPCResponse::BlocksByRange(b)),
                None => RPCCodedResponse::StreamTermination(ResponseTermination::BlocksByRange),
            },
            Response::Status(s) => RPCCodedResponse::Success(RPCResponse::Status(s)),
        }
    }
}

/// Persists metadata to disk.
pub fn save_metadata_to_disk<E: EthSpec>(dir: &PathBuf, metadata: MetaData<E>, log: &slog::Logger) {
    let _ = std::fs::create_dir_all(&dir);
    match File::create(dir.join(METADATA_FILENAME))
        .and_then(|mut f| f.write_all(&metadata.as_ssz_bytes()))
    {
        Ok(_) => {
            debug!(log, "Metadata written to disk");
        }
        Err(e) => {
            warn!(
                log,
                "Could not write metadata to disk";
                "file" => ?dir.join(METADATA_FILENAME),
                "error" => %e
            );
        }
    }
}

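// Hypothetical usage sketch, mirroring the call in `update_metadata` above:
//
//     save_metadata_to_disk(
//         &network_dir,
//         network_globals.local_metadata.read().clone(),
//         &log,
//     );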