use crate::peer_manager::{score::PeerAction, PeerManager, PeerManagerEvent};
use crate::rpc::*;
use crate::service::METADATA_FILENAME;
use crate::types::{GossipEncoding, GossipKind, GossipTopic, SubnetDiscovery};
use crate::Eth2Enr;
use crate::{error, metrics, Enr, NetworkConfig, NetworkGlobals, PubsubMessage, TopicHash};
use futures::prelude::*;
use handler::{BehaviourHandler, BehaviourHandlerIn, BehaviourHandlerOut, DelegateIn, DelegateOut};
use libp2p::{
    core::{
        connection::{ConnectedPoint, ConnectionId, ListenerId},
        identity::Keypair,
        Multiaddr,
    },
    gossipsub::{
        Gossipsub, GossipsubEvent, IdentTopic as Topic, MessageAcceptance, MessageAuthenticity,
        MessageId,
    },
    identify::{Identify, IdentifyEvent},
    swarm::{
        NetworkBehaviour, NetworkBehaviourAction as NBAction, NotifyHandler, PollParameters,
        ProtocolsHandler,
    },
    PeerId,
};
use slog::{crit, debug, o, trace, warn};
use ssz::Encode;
use std::fs::File;
use std::io::Write;
use std::path::PathBuf;
use std::{
    collections::VecDeque,
    marker::PhantomData,
    sync::Arc,
    task::{Context, Poll},
};
use types::{EnrForkId, EthSpec, SignedBeaconBlock, SubnetId};

mod handler;

/// Maximum number of listen addresses accepted from an `identify` report; anything beyond
/// this is truncated before being handed to the peer manager.
const MAX_IDENTIFY_ADDRESSES: usize = 10;

/// Identifier of requests sent by a peer.
pub type PeerRequestId = (ConnectionId, SubstreamId);

// NOTE(review): generic type parameter lists appear to have been stripped from this file
// during extraction (e.g. `Vec`, `VecDeque>`, `Arc>` below are missing their `<...>`
// arguments). The types should be confirmed against the upstream source before building.

/// The types of events that can be obtained from polling the behaviour.
#[derive(Debug)]
pub enum BehaviourEvent {
    /// We have successfully dialed and connected to a peer.
    PeerDialed(PeerId),
    /// A peer has successfully dialed and connected to us.
    PeerConnected(PeerId),
    /// A peer has disconnected.
    PeerDisconnected(PeerId),
    /// An RPC Request that was sent failed.
    RPCFailed {
        /// The id of the failed request.
        id: RequestId,
        /// The peer to which this request was sent.
        peer_id: PeerId,
        /// The error that occurred.
        error: RPCError,
    },
    /// An RPC request from a remote peer was received.
    RequestReceived {
        /// The peer that sent the request.
        peer_id: PeerId,
        /// Identifier of the request. All responses to this request must use this id.
        id: PeerRequestId,
        /// Request the peer sent.
        request: Request,
    },
    /// An RPC response from a remote peer was received.
    ResponseReceived {
        /// Peer that sent the response.
        peer_id: PeerId,
        /// Id of the request to which the peer is responding.
        id: RequestId,
        /// Response the peer sent.
        response: Response,
    },
    /// A gossipsub message was received and successfully decoded.
    PubsubMessage {
        /// The gossipsub message id. Used when propagating blocks after validation.
        id: MessageId,
        /// The peer from which we received this message, not the peer that published it.
        source: PeerId,
        /// The topics that this message was sent on.
        topics: Vec,
        /// The message itself.
        message: PubsubMessage,
    },
    /// Subscribed to peer for given topic
    PeerSubscribed(PeerId, TopicHash),
    /// Inform the network to send a Status to this peer.
    StatusPeer(PeerId),
}

/// Builds the network behaviour that manages the core protocols of eth2.
/// This core behaviour is managed by `Behaviour` which adds peer management to all core
/// behaviours.
pub struct Behaviour {
    /// The routing pub-sub mechanism for eth2.
    gossipsub: Gossipsub,
    /// The Eth2 RPC specified in the wire-0 protocol.
    eth2_rpc: RPC,
    /// Keep regular connection to peers and disconnect if absent.
    // TODO: Using id for initial interop. This will be removed by mainnet.
    /// Provides IP addresses and peer information.
    identify: Identify,
    /// The peer manager that keeps track of peer's reputation and status.
    peer_manager: PeerManager,
    /// The output events generated by this behaviour to be consumed in the swarm poll.
    events: VecDeque>,
    /// Queue of peers to disconnect and an optional reason for the disconnection.
    peers_to_dc: VecDeque<(PeerId, Option)>,
    /// A collection of variables accessible outside the network service.
    network_globals: Arc>,
    /// Keeps track of the current EnrForkId for upgrading gossipsub topics.
    // NOTE: This can be accessed via the network_globals ENR. However we keep it here for quick
    // lookups for every gossipsub message send.
    enr_fork_id: EnrForkId,
    /// The waker for the current thread.
waker: Option,
/// Directory where metadata is stored
network_dir: PathBuf,
/// Logger for behaviour actions.
log: slog::Logger,
} // end of `struct Behaviour`

/// Implements the combined behaviour for the libp2p service.
impl Behaviour {
    /// Constructs the behaviour: builds the identify and gossipsub sub-behaviours, reads the
    /// fork id from the local ENR, and creates the peer manager.
    pub async fn new(
        local_key: &Keypair,
        net_conf: &NetworkConfig,
        network_globals: Arc>,
        log: &slog::Logger,
    ) -> error::Result {
        let behaviour_log = log.new(o!());
        let identify = Identify::new(
            "lighthouse/libp2p".into(),
            lighthouse_version::version_with_platform(),
            local_key.public(),
        );
        // The local ENR is expected to carry an "eth2" field; this is set up at node start.
        let enr_fork_id = network_globals
            .local_enr()
            .eth2()
            .expect("Local ENR must have a fork id");
        let gossipsub = Gossipsub::new(MessageAuthenticity::Anonymous, net_conf.gs_config.clone())
            .map_err(|e| format!("Could not construct gossipsub: {:?}", e))?;
        // Temporarily disable scoring until parameters are tested.
        /*
        gossipsub
            .with_peer_score(PeerScoreParams::default(), PeerScoreThresholds::default())
            .expect("Valid score params and thresholds");
        */
        Ok(Behaviour {
            eth2_rpc: RPC::new(log.clone()),
            gossipsub,
            identify,
            peer_manager: PeerManager::new(local_key, net_conf, network_globals.clone(), log)
                .await?,
            events: VecDeque::new(),
            peers_to_dc: VecDeque::new(),
            network_globals,
            enr_fork_id,
            waker: None,
            network_dir: net_conf.network_dir.clone(),
            log: behaviour_log,
        })
    }

    /// Attempts to connect to a libp2p peer.
    ///
    /// This MUST be used over Swarm::dial() as this keeps track of the peer in the peer manager.
    ///
    /// All external dials, dial a multiaddr. This is currently unused but kept here in case any
    /// part of lighthouse needs to connect to a peer_id in the future.
    pub fn dial(&mut self, peer_id: &PeerId) {
        self.peer_manager.dial_peer(peer_id);
    }

    /// Returns the local ENR of the node.
    pub fn local_enr(&self) -> Enr {
        self.network_globals.local_enr()
    }

    /// Obtain a reference to the gossipsub protocol.
    pub fn gs(&self) -> &Gossipsub {
        &self.gossipsub
    }

    /* Pubsub behaviour functions */

    /// Subscribes to a gossipsub topic kind, letting the network service determine the
    /// encoding and fork version.
    pub fn subscribe_kind(&mut self, kind: GossipKind) -> bool {
        let gossip_topic = GossipTopic::new(
            kind,
            GossipEncoding::default(),
            self.enr_fork_id.fork_digest,
        );
        // TODO: Implement scoring
        // let topic: Topic = gossip_topic.into();
        // self.gossipsub.set_topic_params(t.hash(), TopicScoreParams::default());
        self.subscribe(gossip_topic)
    }

    /// Unsubscribes from a gossipsub topic kind, letting the network service determine the
    /// encoding and fork version.
    pub fn unsubscribe_kind(&mut self, kind: GossipKind) -> bool {
        let gossip_topic = GossipTopic::new(
            kind,
            GossipEncoding::default(),
            self.enr_fork_id.fork_digest,
        );
        self.unsubscribe(gossip_topic)
    }

    /// Subscribes to a specific subnet id;
    pub fn subscribe_to_subnet(&mut self, subnet_id: SubnetId) -> bool {
        let topic = GossipTopic::new(
            subnet_id.into(),
            GossipEncoding::default(),
            self.enr_fork_id.fork_digest,
        );
        // TODO: Implement scoring
        /*
        let t: Topic = topic.clone().into();
        self.gossipsub
            .set_topic_params(t.hash(), TopicScoreParams::default());
        */
        self.subscribe(topic)
    }

    /// Un-Subscribes from a specific subnet id;
    pub fn unsubscribe_from_subnet(&mut self, subnet_id: SubnetId) -> bool {
        let topic = GossipTopic::new(
            subnet_id.into(),
            GossipEncoding::default(),
            self.enr_fork_id.fork_digest,
        );
        self.unsubscribe(topic)
    }

    /// Subscribes to a gossipsub topic.
    ///
    /// Records the subscription in `network_globals` first so that external observers see the
    /// intent even if the gossipsub call fails.
    fn subscribe(&mut self, topic: GossipTopic) -> bool {
        // update the network globals
        self.network_globals
            .gossipsub_subscriptions
            .write()
            .insert(topic.clone());
        let topic: Topic = topic.into();
        match self.gossipsub.subscribe(&topic) {
            Err(_) => {
                warn!(self.log, "Failed to subscribe to topic"; "topic" => topic.to_string());
                false
            }
            Ok(v) => {
                debug!(self.log, "Subscribed to topic"; "topic" => topic.to_string());
                v
            }
        }
    }

    /// Unsubscribe from a gossipsub topic.
    ///
    /// Mirrors `subscribe`: removes the topic from `network_globals` before asking gossipsub.
    fn unsubscribe(&mut self, topic: GossipTopic) -> bool {
        // update the network globals
        self.network_globals
            .gossipsub_subscriptions
            .write()
            .remove(&topic);
        // unsubscribe from the topic
        let topic: Topic = topic.into();
        match self.gossipsub.unsubscribe(&topic) {
            Err(_) => {
                warn!(self.log, "Failed to unsubscribe from topic"; "topic" => topic.to_string());
                false
            }
            Ok(v) => {
                debug!(self.log, "Unsubscribed to topic"; "topic" => topic.to_string());
                v
            }
        }
    }

    /// Publishes a list of messages on the pubsub (gossipsub) behaviour, choosing the encoding.
    ///
    /// Failures to publish are logged and counted in the metrics, but do not abort the
    /// remaining messages.
    pub fn publish(&mut self, messages: Vec>) {
        for message in messages {
            for topic in message.topics(GossipEncoding::default(), self.enr_fork_id.fork_digest) {
                match message.encode(GossipEncoding::default()) {
                    Ok(message_data) => {
                        if let Err(e) = self.gossipsub.publish(topic.clone().into(), message_data) {
                            slog::warn!(self.log, "Could not publish message"; "error" => format!("{:?}", e));
                            // add to metrics
                            match topic.kind() {
                                GossipKind::Attestation(subnet_id) => {
                                    if let Some(v) = metrics::get_int_gauge(
                                        &metrics::FAILED_ATTESTATION_PUBLISHES_PER_SUBNET,
                                        &[&subnet_id.to_string()],
                                    ) {
                                        v.inc()
                                    };
                                }
                                kind => {
                                    if let Some(v) = metrics::get_int_gauge(
                                        &metrics::FAILED_PUBLISHES_PER_MAIN_TOPIC,
                                        &[&format!("{:?}", kind)],
                                    ) {
                                        v.inc()
                                    };
                                }
                            }
                        }
                    }
                    Err(e) => crit!(self.log, "Could not publish message"; "error" => e),
                }
            }
        }
    }

    /// Informs the gossipsub about the result of a message validation.
    /// If the message is valid it will get propagated by gossipsub.
pub fn report_message_validation_result(
    &mut self,
    propagation_source: &PeerId,
    message_id: MessageId,
    validation_result: MessageAcceptance,
) {
    if let Err(e) = self.gossipsub.report_message_validation_result(
        &message_id,
        propagation_source,
        validation_result,
    ) {
        warn!(self.log, "Failed to report message validation"; "message_id" => message_id.to_string(), "peer_id" => propagation_source.to_string(), "error" => format!("{:?}", e));
    }
}

/* Eth2 RPC behaviour functions */

/// Send a request to a peer over RPC.
pub fn send_request(&mut self, peer_id: PeerId, request_id: RequestId, request: Request) {
    self.eth2_rpc
        .send_request(peer_id, request_id, request.into())
}

/// Send a successful response to a peer over RPC.
pub fn send_successful_response(
    &mut self,
    peer_id: PeerId,
    id: PeerRequestId,
    response: Response,
) {
    self.eth2_rpc.send_response(peer_id, id, response.into())
}

/// Inform the peer that their request produced an error.
// NOTE(review): the function name misspells "response"; renaming would change the public
// interface, so it is left as-is here.
pub fn _send_error_reponse(
    &mut self,
    peer_id: PeerId,
    id: PeerRequestId,
    error: RPCResponseErrorCode,
    reason: String,
) {
    self.eth2_rpc
        .send_response(peer_id, id, RPCCodedResponse::Error(error, reason.into()))
}

/* Peer management functions */

/// Report a peer's action.
pub fn report_peer(&mut self, peer_id: &PeerId, action: PeerAction) {
    self.peer_manager.report_peer(peer_id, action)
}

/// Disconnects from a peer providing a reason.
///
/// This will send a goodbye, disconnect and then ban the peer.
/// This is fatal for a peer, and should be used in unrecoverable circumstances.
pub fn goodbye_peer(&mut self, peer_id: &PeerId, reason: GoodbyeReason) {
    self.peer_manager.goodbye_peer(peer_id, reason);
}

/// Returns an iterator over all enr entries in the DHT.
pub fn enr_entries(&mut self) -> Vec {
    self.peer_manager.discovery_mut().table_entries_enr()
}

/// Add an ENR to the routing table of the discovery mechanism.
pub fn add_enr(&mut self, enr: Enr) {
    self.peer_manager.discovery_mut().add_enr(enr);
}

/// Updates a subnet value to the ENR bitfield.
///
/// The `value` is `true` if a subnet is being added and false otherwise.
pub fn update_enr_subnet(&mut self, subnet_id: SubnetId, value: bool) {
    if let Err(e) = self
        .peer_manager
        .discovery_mut()
        .update_enr_bitfield(subnet_id, value)
    {
        crit!(self.log, "Could not update ENR bitfield"; "error" => e);
    }
    // update the local meta data which informs our peers of the update during PINGS
    self.update_metadata();
}

/// Attempts to discover new peers for a given subnet. The `min_ttl` gives the time at which we
/// would like to retain the peers for.
pub fn discover_subnet_peers(&mut self, subnet_subscriptions: Vec) {
    self.peer_manager
        .discover_subnet_peers(subnet_subscriptions)
}

/// Updates the local ENR's "eth2" field with the latest EnrForkId.
///
/// All current gossipsub subscriptions are torn down and re-created with the new fork
/// digest so topics follow the fork transition.
pub fn update_fork_version(&mut self, enr_fork_id: EnrForkId) {
    self.peer_manager
        .discovery_mut()
        .update_eth2_enr(enr_fork_id.clone());
    // unsubscribe from all gossip topics and re-subscribe to their new fork counterparts
    let subscribed_topics = self
        .network_globals
        .gossipsub_subscriptions
        .read()
        .iter()
        .cloned()
        .collect::>();
    // unsubscribe from all topics
    for topic in &subscribed_topics {
        self.unsubscribe(topic.clone());
    }
    // re-subscribe modifying the fork version
    for mut topic in subscribed_topics {
        *topic.digest() = enr_fork_id.fork_digest;
        self.subscribe(topic);
    }
    // update the local reference
    self.enr_fork_id = enr_fork_id;
}

/* Private internal functions */

/// Updates the current meta data of the node to match the local ENR.
///
/// Increments the metadata sequence number and persists the result to disk.
fn update_metadata(&mut self) {
    let local_attnets = self
        .peer_manager
        .discovery()
        .local_enr()
        .bitfield::()
        .expect("Local discovery must have bitfield");
    {
        // write lock scope
        let mut meta_data = self.network_globals.local_metadata.write();
        meta_data.seq_number += 1;
        meta_data.attnets = local_attnets;
    }
    // Save the updated metadata to disk
    save_metadata_to_disk(
        &self.network_dir,
        self.network_globals.local_metadata.read().clone(),
        &self.log,
    );
}

/// Sends a Ping request to the peer.
///
/// The ping payload is our current metadata sequence number, per the eth2 wire protocol.
fn ping(&mut self, id: RequestId, peer_id: PeerId) {
    let ping = crate::rpc::Ping {
        data: self.network_globals.local_metadata.read().seq_number,
    };
    trace!(self.log, "Sending Ping"; "request_id" => id, "peer_id" => peer_id.to_string());
    self.eth2_rpc
        .send_request(peer_id, id, RPCRequest::Ping(ping));
}

/// Sends a Pong response to the peer.
fn pong(&mut self, id: PeerRequestId, peer_id: PeerId) {
    let ping = crate::rpc::Ping {
        data: self.network_globals.local_metadata.read().seq_number,
    };
    trace!(self.log, "Sending Pong"; "request_id" => id.1, "peer_id" => peer_id.to_string());
    let event = RPCCodedResponse::Success(RPCResponse::Pong(ping));
    self.eth2_rpc.send_response(peer_id, id, event);
}

/// Sends a METADATA request to a peer.
fn send_meta_data_request(&mut self, peer_id: PeerId) {
    let event = RPCRequest::MetaData(PhantomData);
    self.eth2_rpc
        .send_request(peer_id, RequestId::Behaviour, event);
}

/// Sends a METADATA response to a peer.
fn send_meta_data_response(&mut self, id: PeerRequestId, peer_id: PeerId) {
    let event = RPCCodedResponse::Success(RPCResponse::MetaData(
        self.network_globals.local_metadata.read().clone(),
    ));
    self.eth2_rpc.send_response(peer_id, id, event);
}

/// Returns a reference to the peer manager to allow the swarm to notify the manager of peer
/// status
pub fn peer_manager(&mut self) -> &mut PeerManager {
    &mut self.peer_manager
}

/// Handles events produced by the gossipsub sub-behaviour: decodes incoming messages and
/// surfaces them (or rejects undecodable ones), and reports peer subscriptions.
fn on_gossip_event(&mut self, event: GossipsubEvent) {
    match event {
        GossipsubEvent::Message {
            propagation_source,
            message_id: id,
            message: gs_msg,
        } => {
            // Note: We are keeping track here of the peer that sent us the message, not the
            // peer that originally published the message.
            match PubsubMessage::decode(&gs_msg.topics, &gs_msg.data) {
                Err(e) => {
                    debug!(self.log, "Could not decode gossipsub message"; "error" => e);
                    //reject the message
                    if let Err(e) = self.gossipsub.report_message_validation_result(
                        &id,
                        &propagation_source,
                        MessageAcceptance::Reject,
                    ) {
                        warn!(self.log, "Failed to report message validation"; "message_id" => id.to_string(), "peer_id" => propagation_source.to_string(), "error" => format!("{:?}", e));
                    }
                }
                Ok(msg) => {
                    // Notify the network
                    self.add_event(BehaviourEvent::PubsubMessage {
                        id,
                        source: propagation_source,
                        topics: gs_msg.topics,
                        message: msg,
                    });
                }
            }
        }
        GossipsubEvent::Subscribed { peer_id, topic } => {
            self.add_event(BehaviourEvent::PeerSubscribed(peer_id, topic));
        }
        GossipsubEvent::Unsubscribed { .. } => {}
    }
}

/// Queues the response to be sent upwards as long as it was requested outside the Behaviour.
fn propagate_response(&mut self, id: RequestId, peer_id: PeerId, response: Response) {
    if !matches!(id, RequestId::Behaviour) {
        self.add_event(BehaviourEvent::ResponseReceived {
            peer_id,
            id,
            response,
        });
    }
}

/// Convenience function to propagate a request.
fn propagate_request(&mut self, id: PeerRequestId, peer_id: PeerId, request: Request) { self.add_event(BehaviourEvent::RequestReceived { peer_id, id, request, }); } fn on_rpc_event(&mut self, message: RPCMessage) { let peer_id = message.peer_id; let handler_id = message.conn_id; // The METADATA and PING RPC responses are handled within the behaviour and not propagated match message.event { Err(handler_err) => { match handler_err { HandlerErr::Inbound { id: _, proto, error, } => { if matches!(error, RPCError::HandlerRejected) { // this peer's request got canceled // TODO: cancel processing for this request } // Inform the peer manager of the error. // An inbound error here means we sent an error to the peer, or the stream // timed out. self.peer_manager.handle_rpc_error(&peer_id, proto, &error); } HandlerErr::Outbound { id, proto, error } => { // Inform the peer manager that a request we sent to the peer failed self.peer_manager.handle_rpc_error(&peer_id, proto, &error); // inform failures of requests comming outside the behaviour if !matches!(id, RequestId::Behaviour) { self.add_event(BehaviourEvent::RPCFailed { peer_id, id, error }); } } } } Ok(RPCReceived::Request(id, request)) => { let peer_request_id = (handler_id, id); match request { /* Behaviour managed protocols: Ping and Metadata */ RPCRequest::Ping(ping) => { // inform the peer manager and send the response self.peer_manager.ping_request(&peer_id, ping.data); // send a ping response self.pong(peer_request_id, peer_id); } RPCRequest::MetaData(_) => { // send the requested meta-data self.send_meta_data_response((handler_id, id), peer_id); // TODO: inform the peer manager? 
} RPCRequest::Goodbye(reason) => { // let the peer manager know this peer is in the process of disconnecting self.peer_manager._disconnecting_peer(&peer_id); // queue for disconnection without a goodbye message debug!( self.log, "Peer sent Goodbye"; "peer_id" => peer_id.to_string(), "reason" => reason.to_string(), "client" => self.network_globals.client(&peer_id).to_string(), ); self.peers_to_dc.push_back((peer_id, None)); // NOTE: We currently do not inform the application that we are // disconnecting here. // The actual disconnection event will be relayed to the application. Ideally // this time difference is short, but we may need to introduce a message to // inform the application layer early. } /* Protocols propagated to the Network */ RPCRequest::Status(msg) => { // inform the peer manager that we have received a status from a peer self.peer_manager.peer_statusd(&peer_id); // propagate the STATUS message upwards self.propagate_request(peer_request_id, peer_id, Request::Status(msg)) } RPCRequest::BlocksByRange(req) => self.propagate_request( peer_request_id, peer_id, Request::BlocksByRange(req), ), RPCRequest::BlocksByRoot(req) => { self.propagate_request(peer_request_id, peer_id, Request::BlocksByRoot(req)) } } } Ok(RPCReceived::Response(id, resp)) => { match resp { /* Behaviour managed protocols */ RPCResponse::Pong(ping) => self.peer_manager.pong_response(&peer_id, ping.data), RPCResponse::MetaData(meta_data) => { self.peer_manager.meta_data_response(&peer_id, meta_data) } /* Network propagated protocols */ RPCResponse::Status(msg) => { // inform the peer manager that we have received a status from a peer self.peer_manager.peer_statusd(&peer_id); // propagate the STATUS message upwards self.propagate_response(id, peer_id, Response::Status(msg)); } RPCResponse::BlocksByRange(resp) => { self.propagate_response(id, peer_id, Response::BlocksByRange(Some(resp))) } RPCResponse::BlocksByRoot(resp) => { self.propagate_response(id, peer_id, 
Response::BlocksByRoot(Some(resp))) } } } Ok(RPCReceived::EndOfStream(id, termination)) => { let response = match termination { ResponseTermination::BlocksByRange => Response::BlocksByRange(None), ResponseTermination::BlocksByRoot => Response::BlocksByRoot(None), }; self.propagate_response(id, peer_id, response); } } } /// Consumes the events list when polled. fn custom_poll( &mut self, cx: &mut Context, ) -> Poll, BehaviourEvent>> { // handle pending disconnections to perform if let Some((peer_id, reason)) = self.peers_to_dc.pop_front() { return Poll::Ready(NBAction::NotifyHandler { peer_id, handler: NotifyHandler::All, event: BehaviourHandlerIn::Shutdown( reason.map(|reason| (RequestId::Behaviour, RPCRequest::Goodbye(reason))), ), }); } // check the peer manager for events loop { match self.peer_manager.poll_next_unpin(cx) { Poll::Ready(Some(event)) => match event { PeerManagerEvent::Dial(peer_id) => { return Poll::Ready(NBAction::DialPeer { peer_id, condition: libp2p::swarm::DialPeerCondition::Disconnected, }); } PeerManagerEvent::SocketUpdated(address) => { return Poll::Ready(NBAction::ReportObservedAddr { address }); } PeerManagerEvent::Status(peer_id) => { // it's time to status. 
We don't keep a beacon chain reference here, so we inform // the network to send a status to this peer return Poll::Ready(NBAction::GenerateEvent(BehaviourEvent::StatusPeer( peer_id, ))); } PeerManagerEvent::Ping(peer_id) => { // send a ping request to this peer self.ping(RequestId::Behaviour, peer_id); } PeerManagerEvent::MetaData(peer_id) => { self.send_meta_data_request(peer_id); } PeerManagerEvent::DisconnectPeer(peer_id, reason) => { debug!(self.log, "PeerManager disconnecting peer"; "peer_id" => peer_id.to_string(), "reason" => reason.to_string()); // send one goodbye return Poll::Ready(NBAction::NotifyHandler { peer_id, handler: NotifyHandler::Any, event: BehaviourHandlerIn::Shutdown(Some(( RequestId::Behaviour, RPCRequest::Goodbye(reason), ))), }); } }, Poll::Pending => break, Poll::Ready(None) => break, // peer manager ended } } if let Some(event) = self.events.pop_front() { return Poll::Ready(NBAction::GenerateEvent(event)); } Poll::Pending } fn on_identify_event(&mut self, event: IdentifyEvent) { match event { IdentifyEvent::Received { peer_id, mut info, observed_addr, } => { if info.listen_addrs.len() > MAX_IDENTIFY_ADDRESSES { debug!( self.log, "More than 10 addresses have been identified, truncating" ); info.listen_addrs.truncate(MAX_IDENTIFY_ADDRESSES); } // send peer info to the peer manager. self.peer_manager.identify(&peer_id, &info); debug!(self.log, "Identified Peer"; "peer" => format!("{}", peer_id), "protocol_version" => info.protocol_version, "agent_version" => info.agent_version, "listening_ addresses" => format!("{:?}", info.listen_addrs), "observed_address" => format!("{:?}", observed_addr), "protocols" => format!("{:?}", info.protocols) ); } IdentifyEvent::Sent { .. } => {} IdentifyEvent::Error { .. } => {} } } /// Adds an event to the queue waking the current thread to process it. 
fn add_event(&mut self, event: BehaviourEvent) { self.events.push_back(event); if let Some(waker) = &self.waker { waker.wake_by_ref(); } } } /// Calls the given function with the given args on all sub behaviours. macro_rules! delegate_to_behaviours { ($self: ident, $fn: ident, $($arg: ident), *) => { $self.gossipsub.$fn($($arg),*); $self.eth2_rpc.$fn($($arg),*); $self.identify.$fn($($arg),*); }; } impl NetworkBehaviour for Behaviour { type ProtocolsHandler = BehaviourHandler; type OutEvent = BehaviourEvent; fn new_handler(&mut self) -> Self::ProtocolsHandler { BehaviourHandler::new(&mut self.gossipsub, &mut self.eth2_rpc, &mut self.identify) } fn addresses_of_peer(&mut self, peer_id: &PeerId) -> Vec { self.peer_manager.addresses_of_peer(peer_id) } // This gets called every time a connection is closed. fn inject_connection_closed( &mut self, peer_id: &PeerId, conn_id: &ConnectionId, endpoint: &ConnectedPoint, ) { // If the peer manager (and therefore the behaviour's) believe this peer connected, inform // about the disconnection. if self.network_globals.peers.read().is_connected(&peer_id) { return; } delegate_to_behaviours!(self, inject_connection_closed, peer_id, conn_id, endpoint); } // This gets called once there are no more active connections. fn inject_disconnected(&mut self, peer_id: &PeerId) { // If the application/behaviour layers thinks this peer has connected inform it of the disconnect. if self.network_globals.peers.read().is_connected(&peer_id) { // Inform the application. self.add_event(BehaviourEvent::PeerDisconnected(peer_id.clone())); // Inform the behaviour. delegate_to_behaviours!(self, inject_disconnected, peer_id); } // Inform the peer manager. // NOTE: It may be the case that a rejected node, due to too many peers is disconnected // here and the peer manager has no knowledge of its connection. We insert it here for // reference so that peer manager can track this peer. 
self.peer_manager.notify_disconnect(&peer_id); // Update the prometheus metrics metrics::inc_counter(&metrics::PEER_DISCONNECT_EVENT_COUNT); metrics::set_gauge( &metrics::PEERS_CONNECTED, self.network_globals.connected_peers() as i64, ); } // This gets called every time a connection is established. fn inject_connection_established( &mut self, peer_id: &PeerId, conn_id: &ConnectionId, endpoint: &ConnectedPoint, ) { let goodbye_reason: Option = if self.peer_manager.is_banned(peer_id) { // If the peer is banned, send goodbye with reason banned. Some(GoodbyeReason::Banned) } else if self.peer_manager.peer_limit_reached() && self .network_globals .peers .read() .peer_info(peer_id) .map_or(true, |i| !i.has_future_duty()) { // If we are at our peer limit and we don't need the peer for a future validator // duty, send goodbye with reason TooManyPeers Some(GoodbyeReason::TooManyPeers) } else { None }; if goodbye_reason.is_some() { debug!(self.log, "Disconnecting newly connected peer"; "peer_id" => peer_id.to_string(), "reason" => goodbye_reason.as_ref().expect("Is some").to_string()); self.peers_to_dc .push_back((peer_id.clone(), goodbye_reason)); return; } // notify the peer manager of a successful connection match endpoint { ConnectedPoint::Listener { send_back_addr, .. } => { self.peer_manager .connect_ingoing(&peer_id, send_back_addr.clone()); self.add_event(BehaviourEvent::PeerConnected(peer_id.clone())); debug!(self.log, "Connection established"; "peer_id" => peer_id.to_string(), "connection" => "Incoming"); } ConnectedPoint::Dialer { address } => { self.peer_manager .connect_outgoing(&peer_id, address.clone()); self.add_event(BehaviourEvent::PeerDialed(peer_id.clone())); debug!(self.log, "Connection established"; "peer_id" => peer_id.to_string(), "connection" => "Dialed"); } } // report the event to the behaviour delegate_to_behaviours!( self, inject_connection_established, peer_id, conn_id, endpoint ); } // This gets called on the initial connection establishment. 
fn inject_connected(&mut self, peer_id: &PeerId) { // If the PeerManager has connected this peer, inform the behaviours if !self.network_globals.peers.read().is_connected(&peer_id) { return; } // increment prometheus metrics metrics::inc_counter(&metrics::PEER_CONNECT_EVENT_COUNT); metrics::set_gauge( &metrics::PEERS_CONNECTED, self.network_globals.connected_peers() as i64, ); delegate_to_behaviours!(self, inject_connected, peer_id); } fn inject_addr_reach_failure( &mut self, peer_id: Option<&PeerId>, addr: &Multiaddr, error: &dyn std::error::Error, ) { delegate_to_behaviours!(self, inject_addr_reach_failure, peer_id, addr, error); } fn inject_dial_failure(&mut self, peer_id: &PeerId) { // Could not dial the peer, inform the peer manager. self.peer_manager.notify_dial_failure(&peer_id); delegate_to_behaviours!(self, inject_dial_failure, peer_id); } fn inject_new_listen_addr(&mut self, addr: &Multiaddr) { delegate_to_behaviours!(self, inject_new_listen_addr, addr); } fn inject_expired_listen_addr(&mut self, addr: &Multiaddr) { delegate_to_behaviours!(self, inject_expired_listen_addr, addr); } fn inject_new_external_addr(&mut self, addr: &Multiaddr) { delegate_to_behaviours!(self, inject_new_external_addr, addr); } fn inject_listener_error(&mut self, id: ListenerId, err: &(dyn std::error::Error + 'static)) { delegate_to_behaviours!(self, inject_listener_error, id, err); } fn inject_listener_closed(&mut self, id: ListenerId, reason: Result<(), &std::io::Error>) { delegate_to_behaviours!(self, inject_listener_closed, id, reason); } fn inject_event( &mut self, peer_id: PeerId, conn_id: ConnectionId, event: ::OutEvent, ) { // If the peer is not supposed to be connected (undergoing active disconnection, // don't process any of its messages. 
if !self.network_globals.peers.read().is_connected(&peer_id) { return; } match event { // Events comming from the handler, redirected to each behaviour BehaviourHandlerOut::Delegate(delegate) => match *delegate { DelegateOut::Gossipsub(ev) => self.gossipsub.inject_event(peer_id, conn_id, ev), DelegateOut::RPC(ev) => self.eth2_rpc.inject_event(peer_id, conn_id, ev), DelegateOut::Identify(ev) => self.identify.inject_event(peer_id, conn_id, *ev), }, /* Custom events sent BY the handler */ BehaviourHandlerOut::Custom => { // TODO: implement } } } fn poll( &mut self, cx: &mut Context, poll_params: &mut impl PollParameters, ) -> Poll::InEvent, Self::OutEvent>> { // update the waker if needed if let Some(waker) = &self.waker { if waker.will_wake(cx.waker()) { self.waker = Some(cx.waker().clone()); } } else { self.waker = Some(cx.waker().clone()); } // TODO: move where it's less distracting macro_rules! poll_behaviour { /* $behaviour: The sub-behaviour being polled. * $on_event_fn: Function to call if we get an event from the sub-behaviour. * $notify_handler_event_closure: Closure mapping the received event type to * the one that the handler should get. 
*/ ($behaviour: ident, $on_event_fn: ident, $notify_handler_event_closure: expr) => { loop { // poll the sub-behaviour match self.$behaviour.poll(cx, poll_params) { Poll::Ready(action) => match action { // call the designated function to handle the event from sub-behaviour NBAction::GenerateEvent(event) => self.$on_event_fn(event), NBAction::DialAddress { address } => { return Poll::Ready(NBAction::DialAddress { address }) } NBAction::DialPeer { peer_id, condition } => { return Poll::Ready(NBAction::DialPeer { peer_id, condition }) } NBAction::NotifyHandler { peer_id, handler, event, } => { return Poll::Ready(NBAction::NotifyHandler { peer_id, handler, // call the closure mapping the received event to the needed one // in order to notify the handler event: BehaviourHandlerIn::Delegate( $notify_handler_event_closure(event), ), }); } NBAction::ReportObservedAddr { address } => { return Poll::Ready(NBAction::ReportObservedAddr { address }) } }, Poll::Pending => break, } } }; } poll_behaviour!(gossipsub, on_gossip_event, DelegateIn::Gossipsub); poll_behaviour!(eth2_rpc, on_rpc_event, DelegateIn::RPC); poll_behaviour!(identify, on_identify_event, DelegateIn::Identify); self.custom_poll(cx) } } /* Public API types */ /// The type of RPC requests the Behaviour informs it has received and allows for sending. /// // NOTE: This is an application-level wrapper over the lower network level requests that can be // sent. The main difference is the absence of the Ping, Metadata and Goodbye protocols, which don't // leave the Behaviour. For all protocols managed by RPC see `RPCRequest`. #[derive(Debug, Clone, PartialEq)] pub enum Request { /// A Status message. Status(StatusMessage), /// A blocks by range request. BlocksByRange(BlocksByRangeRequest), /// A request blocks root request. 
BlocksByRoot(BlocksByRootRequest),
}

impl std::convert::From for RPCRequest {
    /// Maps an application-level `Request` onto the wire-level `RPCRequest`.
    fn from(req: Request) -> RPCRequest {
        match req {
            Request::BlocksByRoot(r) => RPCRequest::BlocksByRoot(r),
            Request::BlocksByRange(r) => RPCRequest::BlocksByRange(r),
            Request::Status(s) => RPCRequest::Status(s),
        }
    }
}

/// The type of RPC responses the Behaviour informs it has received, and allows for sending.
///
// NOTE: This is an application-level wrapper over the lower network level responses that can be
// sent. The main difference is the absence of Pong and Metadata, which don't leave the
// Behaviour. For all protocol responses managed by RPC see `RPCResponse` and
// `RPCCodedResponse`.
#[derive(Debug, Clone, PartialEq)]
pub enum Response {
    /// A Status message.
    Status(StatusMessage),
    /// A response to a get BLOCKS_BY_RANGE request. A None response signals the end of the batch.
    BlocksByRange(Option>>),
    /// A response to a get BLOCKS_BY_ROOT request.
    BlocksByRoot(Option>>),
}

impl std::convert::From> for RPCCodedResponse {
    /// Maps an application-level `Response` onto the wire-level coded response; a `None`
    /// payload becomes the corresponding stream-termination marker.
    fn from(resp: Response) -> RPCCodedResponse {
        match resp {
            Response::BlocksByRoot(r) => match r {
                Some(b) => RPCCodedResponse::Success(RPCResponse::BlocksByRoot(b)),
                None => RPCCodedResponse::StreamTermination(ResponseTermination::BlocksByRoot),
            },
            Response::BlocksByRange(r) => match r {
                Some(b) => RPCCodedResponse::Success(RPCResponse::BlocksByRange(b)),
                None => RPCCodedResponse::StreamTermination(ResponseTermination::BlocksByRange),
            },
            Response::Status(s) => RPCCodedResponse::Success(RPCResponse::Status(s)),
        }
    }
}

/// Persist metadata to disk
///
/// Best-effort: directory creation and write failures are logged, not returned.
// NOTE(review): `&PathBuf` as a parameter is non-idiomatic (`&Path` is preferred and
// callers would coerce); left unchanged here to keep the signature byte-identical.
pub fn save_metadata_to_disk(dir: &PathBuf, metadata: MetaData, log: &slog::Logger) {
    let _ = std::fs::create_dir_all(&dir);
    match File::create(dir.join(METADATA_FILENAME))
        .and_then(|mut f| f.write_all(&metadata.as_ssz_bytes()))
    {
        Ok(_) => {
            debug!(log, "Metadata written to disk");
        }
        Err(e) => {
            warn!(
                log,
                "Could not write metadata to disk";
                // NOTE(review): "{:?}{:?}" concatenates dir and filename with no separator
                // in the log output — likely intended to be `dir.join(METADATA_FILENAME)`.
                "file" => format!("{:?}{:?}", dir, METADATA_FILENAME),
                "error" => format!("{}", e)
            );
        }
    }
}