diff --git a/beacon_node/eth2-libp2p/src/behaviour.rs b/beacon_node/eth2-libp2p/src/behaviour.rs index 78d0130025..458b32cf93 100644 --- a/beacon_node/eth2-libp2p/src/behaviour.rs +++ b/beacon_node/eth2-libp2p/src/behaviour.rs @@ -1,11 +1,18 @@ use crate::rpc::{RPCEvent, RPCMessage, Rpc}; +use crate::NetworkConfig; use futures::prelude::*; use libp2p::{ - core::swarm::{NetworkBehaviourAction, NetworkBehaviourEventProcess}, - gossipsub::{Gossipsub, GossipsubConfig, GossipsubEvent}, + core::{ + swarm::{NetworkBehaviourAction, NetworkBehaviourEventProcess}, + PublicKey, + }, + gossipsub::{Gossipsub, GossipsubEvent}, + identify::{protocol::IdentifyInfo, Identify, IdentifyEvent}, + ping::{Ping, PingEvent}, tokio_io::{AsyncRead, AsyncWrite}, NetworkBehaviour, PeerId, }; +use slog::{debug, o}; use types::Topic; /// Builds the network behaviour for the libp2p Swarm. @@ -13,12 +20,22 @@ use types::Topic; #[derive(NetworkBehaviour)] #[behaviour(out_event = "BehaviourEvent", poll_method = "poll")] pub struct Behaviour { + /// The routing pub-sub mechanism for eth2. gossipsub: Gossipsub, // TODO: Add Kademlia for peer discovery /// The events generated by this behaviour to be consumed in the swarm poll. serenity_rpc: Rpc, + /// Allows discovery of IP addresses for peers on the network. + identify: Identify, + /// Keep regular connection to peers and disconnect if absent. + // TODO: Keepalive, likely remove this later. + // TODO: Make the ping time customizeable. + ping: Ping, #[behaviour(ignore)] events: Vec, + /// Logger for behaviour actions. + #[behaviour(ignore)] + log: slog::Logger, } // Implement the NetworkBehaviourEventProcess trait so that we can derive NetworkBehaviour for Behaviour @@ -53,12 +70,54 @@ impl NetworkBehaviourEventProcess NetworkBehaviourEventProcess + for Behaviour +{ + fn inject_event(&mut self, event: IdentifyEvent) { + match event { + IdentifyEvent::Identified { + peer_id, mut info, .. + } => { + if info.listen_addrs.len() > 20 { + debug!( + self.log, + "More than 20 peers have been identified, truncating" + ); + info.listen_addrs.truncate(20); + } + self.events.push(BehaviourEvent::Identified(peer_id, info)); + } + IdentifyEvent::Error { .. } => {} + IdentifyEvent::SendBack { .. } => {} + } + } +} + +impl NetworkBehaviourEventProcess + for Behaviour +{ + fn inject_event(&mut self, _event: PingEvent) { + // not interested in ping responses at the moment. + } +} + impl Behaviour { - pub fn new(local_peer_id: PeerId, gs_config: GossipsubConfig, log: &slog::Logger) -> Self { + pub fn new(local_public_key: PublicKey, net_conf: &NetworkConfig, log: &slog::Logger) -> Self { + let local_peer_id = local_public_key.clone().into_peer_id(); + let identify_config = net_conf.identify_config.clone(); + let behaviour_log = log.new(o!()); + Behaviour { - gossipsub: Gossipsub::new(local_peer_id, gs_config), + gossipsub: Gossipsub::new(local_peer_id, net_conf.gs_config.clone()), serenity_rpc: Rpc::new(log), + identify: Identify::new( + identify_config.version, + identify_config.user_agent, + local_public_key, + ), + ping: Ping::new(), events: Vec::new(), + log: behaviour_log, } } @@ -91,6 +150,7 @@ impl Behaviour { pub enum BehaviourEvent { RPC(PeerId, RPCEvent), PeerDialed(PeerId), + Identified(PeerId, IdentifyInfo), // TODO: This is a stub at the moment Message(String), } diff --git a/beacon_node/eth2-libp2p/src/network_config.rs b/beacon_node/eth2-libp2p/src/config.rs similarity index 55% rename from beacon_node/eth2-libp2p/src/network_config.rs rename to beacon_node/eth2-libp2p/src/config.rs index 176892bb02..2b4972237d 100644 --- a/beacon_node/eth2-libp2p/src/network_config.rs +++ b/beacon_node/eth2-libp2p/src/config.rs @@ -1,11 +1,9 @@ use crate::Multiaddr; use libp2p::gossipsub::{GossipsubConfig, GossipsubConfigBuilder}; -use libp2p::secio; -use std::fmt; -#[derive(Clone)] +#[derive(Clone, Debug)] /// Network configuration for lighthouse. -pub struct NetworkConfig { +pub struct Config { //TODO: stubbing networking initial params, change in the future /// IP address to listen on. pub listen_addresses: Vec, @@ -13,47 +11,56 @@ pub struct NetworkConfig { pub listen_port: u16, /// Gossipsub configuration parameters. pub gs_config: GossipsubConfig, + /// Configuration parameters for node identification protocol. + pub identify_config: IdentifyConfig, /// List of nodes to initially connect to. pub boot_nodes: Vec, - /// Peer key related to this nodes PeerId. - pub local_private_key: secio::SecioKeyPair, /// Client version pub client_version: String, /// List of topics to subscribe to as strings pub topics: Vec, } -impl Default for NetworkConfig { +impl Default for Config { /// Generate a default network configuration. fn default() -> Self { - // TODO: Currently using secp256k1 key pairs. Wire protocol specifies RSA. Waiting for this - // PR to be merged to generate RSA keys: https://github.com/briansmith/ring/pull/733 - - NetworkConfig { + Config { listen_addresses: vec!["/ip4/127.0.0.1/tcp/9000" .parse() .expect("is a correct multi-address")], listen_port: 9000, gs_config: GossipsubConfigBuilder::new().build(), + identify_config: IdentifyConfig::default(), boot_nodes: Vec::new(), - local_private_key: secio::SecioKeyPair::secp256k1_generated().unwrap(), client_version: version::version(), topics: vec![String::from("beacon_chain")], } } } -impl NetworkConfig { +impl Config { pub fn new(boot_nodes: Vec) -> Self { - let mut conf = NetworkConfig::default(); + let mut conf = Config::default(); conf.boot_nodes = boot_nodes; conf } } -impl fmt::Debug for NetworkConfig { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - write!(f, "NetworkConfig: listen_addresses: {:?}, listen_port: {:?}, gs_config: {:?}, boot_nodes: {:?}, local_private_key: , client_version: {:?}", self.listen_addresses, self.listen_port, self.gs_config, self.boot_nodes, self.local_private_key.to_public_key(), self.client_version) +/// The configuration parameters for the Identify protocol +#[derive(Debug, Clone)] +pub struct IdentifyConfig { + /// The protocol version to listen on. + pub version: String, + /// The client's name and version for identification. + pub user_agent: String, +} + +impl Default for IdentifyConfig { + fn default() -> Self { + Self { + version: "/eth/serenity/1.0".to_string(), + user_agent: version::version(), + } } } diff --git a/beacon_node/eth2-libp2p/src/lib.rs b/beacon_node/eth2-libp2p/src/lib.rs index f3e97355d7..f7a961bb2e 100644 --- a/beacon_node/eth2-libp2p/src/lib.rs +++ b/beacon_node/eth2-libp2p/src/lib.rs @@ -3,16 +3,16 @@ /// /// This crate builds and manages the libp2p services required by the beacon node. pub mod behaviour; +mod config; pub mod error; -mod network_config; pub mod rpc; mod service; +pub use config::Config as NetworkConfig; pub use libp2p::{ gossipsub::{GossipsubConfig, GossipsubConfigBuilder}, PeerId, }; -pub use network_config::NetworkConfig; pub use rpc::{HelloMessage, RPCEvent}; pub use service::Libp2pEvent; pub use service::Service; diff --git a/beacon_node/eth2-libp2p/src/rpc/protocol.rs b/beacon_node/eth2-libp2p/src/rpc/protocol.rs index c19aca8ffd..f4fe26fac3 100644 --- a/beacon_node/eth2-libp2p/src/rpc/protocol.rs +++ b/beacon_node/eth2-libp2p/src/rpc/protocol.rs @@ -1,4 +1,4 @@ -use super::methods::{HelloMessage, RPCMethod, RPCRequest, RPCResponse}; +use super::methods::*; use libp2p::core::{upgrade, InboundUpgrade, OutboundUpgrade, UpgradeInfo}; use ssz::{ssz_encode, Decodable, Encodable, SszStream}; use std::io; @@ -6,7 +6,7 @@ use std::iter; use tokio::io::{AsyncRead, AsyncWrite}; /// The maximum bytes that can be sent across the RPC. -const MAX_READ_SIZE: usize = 2048; +const MAX_READ_SIZE: usize = 4_194_304; // 4M /// Implementation of the `ConnectionUpgrade` for the rpc protocol. @@ -81,7 +81,31 @@ fn decode(packet: Vec) -> Result { let (hello_body, _index) = HelloMessage::ssz_decode(&packet, index)?; RPCRequest::Hello(hello_body) } - RPCMethod::Unknown | _ => return Err(DecodeError::UnknownRPCMethod), + RPCMethod::Goodbye => { + let (goodbye_code, _index) = u64::ssz_decode(&packet, index)?; + RPCRequest::Goodbye(goodbye_code) + } + RPCMethod::BeaconBlockRoots => { + let (block_roots_request, _index) = + BeaconBlockRootsRequest::ssz_decode(&packet, index)?; + RPCRequest::BeaconBlockRoots(block_roots_request) + } + RPCMethod::BeaconBlockHeaders => { + let (block_headers_request, _index) = + BeaconBlockHeadersRequest::ssz_decode(&packet, index)?; + RPCRequest::BeaconBlockHeaders(block_headers_request) + } + RPCMethod::BeaconBlockBodies => { + let (block_bodies_request, _index) = + BeaconBlockBodiesRequest::ssz_decode(&packet, index)?; + RPCRequest::BeaconBlockBodies(block_bodies_request) + } + RPCMethod::BeaconChainState => { + let (chain_state_request, _index) = + BeaconChainStateRequest::ssz_decode(&packet, index)?; + RPCRequest::BeaconChainState(chain_state_request) + } + RPCMethod::Unknown => return Err(DecodeError::UnknownRPCMethod), }; Ok(RPCEvent::Request { @@ -97,7 +121,24 @@ fn decode(packet: Vec) -> Result { let (body, _index) = HelloMessage::ssz_decode(&packet, index)?; RPCResponse::Hello(body) } - RPCMethod::Unknown | _ => return Err(DecodeError::UnknownRPCMethod), + RPCMethod::Goodbye => unreachable!("Should never receive a goodbye response"), + RPCMethod::BeaconBlockRoots => { + let (body, _index) = BeaconBlockRootsResponse::ssz_decode(&packet, index)?; + RPCResponse::BeaconBlockRoots(body) + } + RPCMethod::BeaconBlockHeaders => { + let (body, _index) = BeaconBlockHeadersResponse::ssz_decode(&packet, index)?; + RPCResponse::BeaconBlockHeaders(body) + } + RPCMethod::BeaconBlockBodies => { + let (body, _index) = BeaconBlockBodiesResponse::ssz_decode(&packet, index)?; + RPCResponse::BeaconBlockBodies(body) + } + RPCMethod::BeaconChainState => { + let (body, _index) = BeaconChainStateResponse::ssz_decode(&packet, index)?; + RPCResponse::BeaconChainState(body) + } + RPCMethod::Unknown => return Err(DecodeError::UnknownRPCMethod), }; Ok(RPCEvent::Response { id, @@ -137,7 +178,21 @@ impl Encodable for RPCEvent { RPCRequest::Hello(body) => { s.append(body); } - _ => {} + RPCRequest::Goodbye(body) => { + s.append(body); + } + RPCRequest::BeaconBlockRoots(body) => { + s.append(body); + } + RPCRequest::BeaconBlockHeaders(body) => { + s.append(body); + } + RPCRequest::BeaconBlockBodies(body) => { + s.append(body); + } + RPCRequest::BeaconChainState(body) => { + s.append(body); + } } } RPCEvent::Response { @@ -152,7 +207,18 @@ impl Encodable for RPCEvent { RPCResponse::Hello(response) => { s.append(response); } - _ => {} + RPCResponse::BeaconBlockRoots(response) => { + s.append(response); + } + RPCResponse::BeaconBlockHeaders(response) => { + s.append(response); + } + RPCResponse::BeaconBlockBodies(response) => { + s.append(response); + } + RPCResponse::BeaconChainState(response) => { + s.append(response); + } } } } diff --git a/beacon_node/eth2-libp2p/src/service.rs b/beacon_node/eth2-libp2p/src/service.rs index e378cd634a..e68df2d389 100644 --- a/beacon_node/eth2-libp2p/src/service.rs +++ b/beacon_node/eth2-libp2p/src/service.rs @@ -11,8 +11,8 @@ use libp2p::core::{ transport::boxed::Boxed, upgrade::{InboundUpgradeExt, OutboundUpgradeExt}, }; -use libp2p::{core, secio, Transport}; -use libp2p::{PeerId, Swarm}; +use libp2p::identify::protocol::IdentifyInfo; +use libp2p::{core, secio, PeerId, Swarm, Transport}; use slog::{debug, info, trace, warn}; use std::io::{Error, ErrorKind}; use std::time::Duration; @@ -33,7 +33,12 @@ impl Service { pub fn new(config: NetworkConfig, log: slog::Logger) -> error::Result { debug!(log, "Libp2p Service starting"); - let local_private_key = config.local_private_key; + // TODO: Currently using secp256k1 key pairs. Wire protocol specifies RSA. Waiting for this + // PR to be merged to generate RSA keys: https://github.com/briansmith/ring/pull/733 + // TODO: Save and recover node key from disk + let local_private_key = secio::SecioKeyPair::secp256k1_generated().unwrap(); + + let local_public_key = local_private_key.to_public_key(); let local_peer_id = local_private_key.to_peer_id(); info!(log, "Local peer id: {:?}", local_peer_id); @@ -41,7 +46,7 @@ impl Service { // Set up the transport let transport = build_transport(local_private_key); // Set up gossipsub routing - let behaviour = Behaviour::new(local_peer_id.clone(), config.gs_config, &log); + let behaviour = Behaviour::new(local_public_key.clone(), &config, &log); // Set up Topology let topology = local_peer_id.clone(); Swarm::new(transport, behaviour, topology) @@ -99,17 +104,23 @@ impl Stream for Service { // TODO: Currently only gossipsub events passed here. // Build a type for more generic events match self.swarm.poll() { - Ok(Async::Ready(Some(BehaviourEvent::Message(m)))) => { + //Behaviour events + Ok(Async::Ready(Some(event))) => match event { // TODO: Stub here for debugging - debug!(self.log, "Message received: {}", m); - return Ok(Async::Ready(Some(Libp2pEvent::Message(m)))); - } - Ok(Async::Ready(Some(BehaviourEvent::RPC(peer_id, event)))) => { - return Ok(Async::Ready(Some(Libp2pEvent::RPC(peer_id, event)))); - } - Ok(Async::Ready(Some(BehaviourEvent::PeerDialed(peer_id)))) => { - return Ok(Async::Ready(Some(Libp2pEvent::PeerDialed(peer_id)))); - } + BehaviourEvent::Message(m) => { + debug!(self.log, "Message received: {}", m); + return Ok(Async::Ready(Some(Libp2pEvent::Message(m)))); + } + BehaviourEvent::RPC(peer_id, event) => { + return Ok(Async::Ready(Some(Libp2pEvent::RPC(peer_id, event)))); + } + BehaviourEvent::PeerDialed(peer_id) => { + return Ok(Async::Ready(Some(Libp2pEvent::PeerDialed(peer_id)))); + } + BehaviourEvent::Identified(peer_id, info) => { + return Ok(Async::Ready(Some(Libp2pEvent::Identified(peer_id, info)))); + } + }, Ok(Async::Ready(None)) => unreachable!("Swarm stream shouldn't end"), Ok(Async::NotReady) => break, _ => break, @@ -156,8 +167,12 @@ fn build_transport( /// Events that can be obtained from polling the Libp2p Service. pub enum Libp2pEvent { - // We have received an RPC event on the swarm + /// An RPC response request has been received on the swarm. RPC(PeerId, RPCEvent), + /// Initiated the connection to a new peer. PeerDialed(PeerId), + /// Received information about a peer on the network. + Identified(PeerId, IdentifyInfo), + // TODO: Pub-sub testing only. Message(String), } diff --git a/beacon_node/network/src/service.rs b/beacon_node/network/src/service.rs index 14f994e4a5..a3eb6f0d9d 100644 --- a/beacon_node/network/src/service.rs +++ b/beacon_node/network/src/service.rs @@ -15,8 +15,8 @@ use tokio::runtime::TaskExecutor; /// Service that handles communication between internal services and the eth2_libp2p network service. pub struct Service { - //eth2_libp2p_service: Arc>, - eth2_libp2p_exit: oneshot::Sender<()>, + //libp2p_service: Arc>, + libp2p_exit: oneshot::Sender<()>, network_send: crossbeam_channel::Sender, //message_handler: MessageHandler, //message_handler_send: Sender, @@ -40,20 +40,20 @@ impl Service { message_handler_log, )?; - // launch eth2_libp2p service - let eth2_libp2p_log = log.new(o!("Service" => "Libp2p")); - let eth2_libp2p_service = LibP2PService::new(config.clone(), eth2_libp2p_log)?; + // launch libp2p service + let libp2p_log = log.new(o!("Service" => "Libp2p")); + let libp2p_service = LibP2PService::new(config.clone(), libp2p_log)?; - // TODO: Spawn thread to handle eth2_libp2p messages and pass to message handler thread. - let eth2_libp2p_exit = spawn_service( - eth2_libp2p_service, + // TODO: Spawn thread to handle libp2p messages and pass to message handler thread. + let libp2p_exit = spawn_service( + libp2p_service, network_recv, message_handler_send, executor, log, )?; let network_service = Service { - eth2_libp2p_exit, + libp2p_exit, network_send: network_send.clone(), }; @@ -72,7 +72,7 @@ impl Service { } fn spawn_service( - eth2_libp2p_service: LibP2PService, + libp2p_service: LibP2PService, network_recv: crossbeam_channel::Receiver, message_handler_send: crossbeam_channel::Sender, executor: &TaskExecutor, @@ -83,7 +83,7 @@ fn spawn_service( // spawn on the current executor executor.spawn( network_service( - eth2_libp2p_service, + libp2p_service, network_recv, message_handler_send, log.clone(), @@ -100,7 +100,7 @@ fn spawn_service( } fn network_service( - mut eth2_libp2p_service: LibP2PService, + mut libp2p_service: LibP2PService, network_recv: crossbeam_channel::Receiver, message_handler_send: crossbeam_channel::Sender, log: slog::Logger, @@ -108,28 +108,34 @@ fn network_service( futures::future::poll_fn(move || -> Result<_, eth2_libp2p::error::Error> { // poll the swarm loop { - match eth2_libp2p_service.poll() { - Ok(Async::Ready(Some(Libp2pEvent::RPC(peer_id, rpc_event)))) => { - trace!( - eth2_libp2p_service.log, - "RPC Event: RPC message received: {:?}", - rpc_event - ); - message_handler_send - .send(HandlerMessage::RPC(peer_id, rpc_event)) - .map_err(|_| "failed to send rpc to handler")?; - } - Ok(Async::Ready(Some(Libp2pEvent::PeerDialed(peer_id)))) => { - debug!(eth2_libp2p_service.log, "Peer Dialed: {:?}", peer_id); - message_handler_send - .send(HandlerMessage::PeerDialed(peer_id)) - .map_err(|_| "failed to send rpc to handler")?; - } - Ok(Async::Ready(Some(Libp2pEvent::Message(m)))) => debug!( - eth2_libp2p_service.log, - "Network Service: Message received: {}", m - ), - _ => break, + match libp2p_service.poll() { + Ok(Async::Ready(Some(event))) => match event { + Libp2pEvent::RPC(peer_id, rpc_event) => { + trace!(log, "RPC Event: RPC message received: {:?}", rpc_event); + message_handler_send + .send(HandlerMessage::RPC(peer_id, rpc_event)) + .map_err(|_| "failed to send rpc to handler")?; + } + Libp2pEvent::PeerDialed(peer_id) => { + debug!(log, "Peer Dialed: {:?}", peer_id); + message_handler_send + .send(HandlerMessage::PeerDialed(peer_id)) + .map_err(|_| "failed to send rpc to handler")?; + } + Libp2pEvent::Identified(peer_id, info) => { + debug!( + log, + "We have identified peer: {:?} with {:?}", peer_id, info + ); + } + Libp2pEvent::Message(m) => debug!( + libp2p_service.log, + "Network Service: Message received: {}", m + ), + }, + Ok(Async::Ready(None)) => unreachable!("Stream never ends"), + Ok(Async::NotReady) => break, + Err(_) => break, } } // poll the network channel @@ -143,7 +149,7 @@ fn network_service( trace!(log, "Sending RPC Event: {:?}", rpc_event); //TODO: Make swarm private //TODO: Implement correct peer id topic message handling - eth2_libp2p_service.swarm.send_rpc(peer_id, rpc_event); + libp2p_service.swarm.send_rpc(peer_id, rpc_event); } OutgoingMessage::NotifierTest => { debug!(log, "Received message from notifier"); @@ -165,7 +171,7 @@ fn network_service( /// Types of messages that the network service can receive. #[derive(Debug, Clone)] pub enum NetworkMessage { - /// Send a message to eth2_libp2p service. + /// Send a message to libp2p service. //TODO: Define typing for messages across the wire Send(PeerId, OutgoingMessage), }