Files
lighthouse/beacon_node/eth2-libp2p/src/behaviour.rs
Age Manning 6ca4f4709b Connects the attestation service to network components (#961)
* Sends attestations to the attestation service for processing

* Adds 'attnets' field to local ENR

* Adds ENR bitfield modification logic

* Link attestation service to discovery

- Updates discv5
- Links discover events to discovery
- Support for ENRBitfield

* Adds discovery config params, correct warnings

* Rust fmt fixes

* Correct tests
2020-03-25 22:18:06 +11:00

343 lines
12 KiB
Rust

use crate::discovery::Discovery;
use crate::rpc::{RPCEvent, RPCMessage, RPC};
use crate::types::GossipEncoding;
use crate::Enr;
use crate::{error, GossipTopic, NetworkConfig, NetworkGlobals, PubsubMessage, TopicHash};
use futures::prelude::*;
use libp2p::{
core::identity::Keypair,
discv5::Discv5Event,
gossipsub::{Gossipsub, GossipsubEvent, MessageId},
identify::{Identify, IdentifyEvent},
swarm::{NetworkBehaviourAction, NetworkBehaviourEventProcess},
tokio_io::{AsyncRead, AsyncWrite},
NetworkBehaviour, PeerId,
};
use lru::LruCache;
use slog::{crit, debug, o, warn};
use std::sync::Arc;
use types::{EnrForkId, EthSpec, SubnetId};
const MAX_IDENTIFY_ADDRESSES: usize = 20;
/// Builds the network behaviour that manages the core protocols of eth2.
/// This core behaviour is managed by `Behaviour` which adds peer management to all core
/// behaviours.
#[derive(NetworkBehaviour)]
#[behaviour(out_event = "BehaviourEvent<TSpec>", poll_method = "poll")]
pub struct Behaviour<TSubstream: AsyncRead + AsyncWrite, TSpec: EthSpec> {
/// The routing pub-sub mechanism for eth2.
gossipsub: Gossipsub<TSubstream>,
/// The Eth2 RPC specified in the wire-0 protocol.
eth2_rpc: RPC<TSubstream, TSpec>,
/// Keep regular connection to peers and disconnect if absent.
// TODO: Using id for initial interop. This will be removed by mainnet.
/// Provides IP addresses and peer information.
identify: Identify<TSubstream>,
/// Discovery behaviour.
discovery: Discovery<TSubstream, TSpec>,
/// The events generated by this behaviour to be consumed in the swarm poll.
#[behaviour(ignore)]
events: Vec<BehaviourEvent<TSpec>>,
/// A cache of recently seen gossip messages. This is used to filter out any possible
/// duplicates that may still be seen over gossipsub.
#[behaviour(ignore)]
seen_gossip_messages: LruCache<MessageId, ()>,
/// A collections of variables accessible outside the network service.
#[behaviour(ignore)]
network_globals: Arc<NetworkGlobals<TSpec>>,
#[behaviour(ignore)]
/// Logger for behaviour actions.
log: slog::Logger,
}
impl<TSubstream: AsyncRead + AsyncWrite, TSpec: EthSpec> Behaviour<TSubstream, TSpec> {
pub fn new(
local_key: &Keypair,
net_conf: &NetworkConfig,
network_globals: Arc<NetworkGlobals<TSpec>>,
enr_fork_id: EnrForkId,
log: &slog::Logger,
) -> error::Result<Self> {
let local_peer_id = local_key.public().into_peer_id();
let behaviour_log = log.new(o!());
let identify = Identify::new(
"lighthouse/libp2p".into(),
version::version(),
local_key.public(),
);
Ok(Behaviour {
eth2_rpc: RPC::new(log.clone()),
gossipsub: Gossipsub::new(local_peer_id, net_conf.gs_config.clone()),
discovery: Discovery::new(
local_key,
net_conf,
enr_fork_id,
network_globals.clone(),
log,
)?,
identify,
events: Vec::new(),
seen_gossip_messages: LruCache::new(100_000),
network_globals,
log: behaviour_log,
})
}
pub fn discovery(&self) -> &Discovery<TSubstream, TSpec> {
&self.discovery
}
pub fn gs(&self) -> &Gossipsub<TSubstream> {
&self.gossipsub
}
}
/// Implements the combined behaviour for the libp2p service.
impl<TSubstream: AsyncRead + AsyncWrite, TSpec: EthSpec> Behaviour<TSubstream, TSpec> {
/* Pubsub behaviour functions */
/// Subscribes to a gossipsub topic.
pub fn subscribe(&mut self, topic: GossipTopic) -> bool {
if !self
.network_globals
.gossipsub_subscriptions
.read()
.contains(&topic)
{
self.network_globals
.gossipsub_subscriptions
.write()
.push(topic.clone());
}
self.gossipsub.subscribe(topic.into())
}
/// Subscribes to a specific subnet id;
pub fn subscribe_to_subnet(&mut self, subnet_id: SubnetId) {
let topic = GossipTopic::new(subnet_id.into(), GossipEncoding::SSZ);
self.subscribe(topic);
}
/// Unsubscribe from a gossipsub topic.
pub fn unsubscribe(&mut self, topic: GossipTopic) -> bool {
let pos = self
.network_globals
.gossipsub_subscriptions
.read()
.iter()
.position(|s| s == &topic);
if let Some(pos) = pos {
self.network_globals
.gossipsub_subscriptions
.write()
.swap_remove(pos);
}
self.gossipsub.unsubscribe(topic.into())
}
/// Un-Subscribes from a specific subnet id;
pub fn unsubscribe_from_subnet(&mut self, subnet_id: SubnetId) {
let topic = GossipTopic::new(subnet_id.into(), GossipEncoding::SSZ);
self.unsubscribe(topic);
}
/// Publishes a list of messages on the pubsub (gossipsub) behaviour, choosing the encoding.
pub fn publish(&mut self, messages: Vec<PubsubMessage<TSpec>>) {
for message in messages {
for topic in message.topics() {
let message_data = message.encode();
self.gossipsub.publish(&topic.into(), message_data);
}
}
}
/// Forwards a message that is waiting in gossipsub's mcache. Messages are only propagated
/// once validated by the beacon chain.
pub fn propagate_message(&mut self, propagation_source: &PeerId, message_id: MessageId) {
self.gossipsub
.propagate_message(&message_id, propagation_source);
}
/* Eth2 RPC behaviour functions */
/// Sends an RPC Request/Response via the RPC protocol.
pub fn send_rpc(&mut self, peer_id: PeerId, rpc_event: RPCEvent<TSpec>) {
self.eth2_rpc.send_rpc(peer_id, rpc_event);
}
/* Discovery / Peer management functions */
/// Notify discovery that the peer has been banned.
pub fn peer_banned(&mut self, peer_id: PeerId) {
self.discovery.peer_banned(peer_id);
}
/// Notify discovery that the peer has been unbanned.
pub fn peer_unbanned(&mut self, peer_id: &PeerId) {
self.discovery.peer_unbanned(peer_id);
}
/// Returns an iterator over all enr entries in the DHT.
pub fn enr_entries(&mut self) -> impl Iterator<Item = &Enr> {
self.discovery.enr_entries()
}
/// Add an ENR to the routing table of the discovery mechanism.
pub fn add_enr(&mut self, enr: Enr) {
self.discovery.add_enr(enr);
}
/// Updates a subnet value to the ENR bitfield.
///
/// The `value` is `true` if a subnet is being added and false otherwise.
pub fn update_enr_subnet(&mut self, subnet_id: SubnetId, value: bool) {
if let Err(e) = self.discovery.update_enr_bitfield(subnet_id, value) {
crit!(self.log, "Could not update ENR bitfield"; "error" => e);
}
}
/// A request to search for peers connected to a long-lived subnet.
pub fn peers_request(&mut self, subnet_id: SubnetId) {
self.discovery.peers_request(subnet_id);
}
/// Updates the local ENR's "eth2" field with the latest EnrForkId.
pub fn update_fork_version(&mut self, enr_fork_id: EnrForkId) {
self.discovery.update_eth2_enr(enr_fork_id);
// TODO: Handle gossipsub fork update
}
}
// Implement the NetworkBehaviourEventProcess trait so that we can derive NetworkBehaviour for Behaviour
impl<TSubstream: AsyncRead + AsyncWrite, TSpec: EthSpec>
NetworkBehaviourEventProcess<GossipsubEvent> for Behaviour<TSubstream, TSpec>
{
fn inject_event(&mut self, event: GossipsubEvent) {
match event {
GossipsubEvent::Message(propagation_source, id, gs_msg) => {
// Note: We are keeping track here of the peer that sent us the message, not the
// peer that originally published the message.
if self.seen_gossip_messages.put(id.clone(), ()).is_none() {
match PubsubMessage::decode(&gs_msg.topics, &gs_msg.data) {
Err(e) => {
debug!(self.log, "Could not decode gossipsub message"; "error" => format!("{}", e))
}
Ok(msg) => {
// if this message isn't a duplicate, notify the network
self.events.push(BehaviourEvent::GossipMessage {
id,
source: propagation_source,
topics: gs_msg.topics,
message: msg,
});
}
}
} else {
warn!(self.log, "A duplicate gossipsub message was received"; "message" => format!("{:?}", gs_msg));
}
}
GossipsubEvent::Subscribed { peer_id, topic } => {
self.events
.push(BehaviourEvent::PeerSubscribed(peer_id, topic));
}
GossipsubEvent::Unsubscribed { .. } => {}
}
}
}
impl<TSubstream: AsyncRead + AsyncWrite, TSpec: EthSpec>
NetworkBehaviourEventProcess<RPCMessage<TSpec>> for Behaviour<TSubstream, TSpec>
{
fn inject_event(&mut self, event: RPCMessage<TSpec>) {
match event {
RPCMessage::PeerDialed(peer_id) => {
self.events.push(BehaviourEvent::PeerDialed(peer_id))
}
RPCMessage::PeerDisconnected(peer_id) => {
self.events.push(BehaviourEvent::PeerDisconnected(peer_id))
}
RPCMessage::RPC(peer_id, rpc_event) => {
self.events.push(BehaviourEvent::RPC(peer_id, rpc_event))
}
}
}
}
impl<TSubstream: AsyncRead + AsyncWrite, TSpec: EthSpec> Behaviour<TSubstream, TSpec> {
/// Consumes the events list when polled.
fn poll<TBehaviourIn>(
&mut self,
) -> Async<NetworkBehaviourAction<TBehaviourIn, BehaviourEvent<TSpec>>> {
if !self.events.is_empty() {
return Async::Ready(NetworkBehaviourAction::GenerateEvent(self.events.remove(0)));
}
Async::NotReady
}
}
impl<TSubstream: AsyncRead + AsyncWrite, TSpec: EthSpec> NetworkBehaviourEventProcess<IdentifyEvent>
for Behaviour<TSubstream, TSpec>
{
fn inject_event(&mut self, event: IdentifyEvent) {
match event {
IdentifyEvent::Received {
peer_id,
mut info,
observed_addr,
} => {
if info.listen_addrs.len() > MAX_IDENTIFY_ADDRESSES {
debug!(
self.log,
"More than 20 addresses have been identified, truncating"
);
info.listen_addrs.truncate(MAX_IDENTIFY_ADDRESSES);
}
debug!(self.log, "Identified Peer"; "peer" => format!("{}", peer_id),
"protocol_version" => info.protocol_version,
"agent_version" => info.agent_version,
"listening_ addresses" => format!("{:?}", info.listen_addrs),
"observed_address" => format!("{:?}", observed_addr),
"protocols" => format!("{:?}", info.protocols)
);
}
IdentifyEvent::Sent { .. } => {}
IdentifyEvent::Error { .. } => {}
}
}
}
impl<TSubstream: AsyncRead + AsyncWrite, TSpec: EthSpec> NetworkBehaviourEventProcess<Discv5Event>
for Behaviour<TSubstream, TSpec>
{
fn inject_event(&mut self, _event: Discv5Event) {
// discv5 has no events to inject
}
}
/// The types of events than can be obtained from polling the behaviour.
pub enum BehaviourEvent<TSpec: EthSpec> {
/// A received RPC event and the peer that it was received from.
RPC(PeerId, RPCEvent<TSpec>),
/// We have completed an initial connection to a new peer.
PeerDialed(PeerId),
/// A peer has disconnected.
PeerDisconnected(PeerId),
/// A gossipsub message has been received.
GossipMessage {
/// The gossipsub message id. Used when propagating blocks after validation.
id: MessageId,
/// The peer from which we received this message, not the peer that published it.
source: PeerId,
/// The topics that this message was sent on.
topics: Vec<TopicHash>,
/// The message itself.
message: PubsubMessage<TSpec>,
},
/// Subscribed to peer for given topic
PeerSubscribed(PeerId, TopicHash),
}