diff --git a/beacon_node/Cargo.toml b/beacon_node/Cargo.toml index 8b26417860..56f5c654e2 100644 --- a/beacon_node/Cargo.toml +++ b/beacon_node/Cargo.toml @@ -7,7 +7,6 @@ edition = "2018" [dependencies] client = { path = "client" } version = { path = "version" } - clap = "2.32.0" slog = "^2.2.3" slog-term = "^2.4.0" diff --git a/beacon_node/libp2p/Cargo.toml b/beacon_node/libp2p/Cargo.toml index ecd91e1701..dcbc04d0b9 100644 --- a/beacon_node/libp2p/Cargo.toml +++ b/beacon_node/libp2p/Cargo.toml @@ -8,6 +8,8 @@ edition = "2018" # SigP repository until PR is merged libp2p = { git = "https://github.com/SigP/rust-libp2p", branch = "gossipsub" } types = { path = "../../eth2/types" } +ssz = { path = "../../eth2/utils/ssz" } +ssz_derive = { path = "../../eth2/utils/ssz_derive" } slog = "2.4.1" version = { path = "../version" } tokio = "0.1.16" diff --git a/beacon_node/libp2p/src/lib.rs b/beacon_node/libp2p/src/lib.rs index a1bf4402c6..718b7fc222 100644 --- a/beacon_node/libp2p/src/lib.rs +++ b/beacon_node/libp2p/src/lib.rs @@ -5,6 +5,7 @@ pub mod behaviour; pub mod error; mod network_config; +mod rpc; mod service; pub use libp2p::{ diff --git a/beacon_node/libp2p/src/rpc/handler.rs b/beacon_node/libp2p/src/rpc/handler.rs new file mode 100644 index 0000000000..e69de29bb2 diff --git a/beacon_node/libp2p/src/rpc/methods.rs b/beacon_node/libp2p/src/rpc/methods.rs new file mode 100644 index 0000000000..d299e9bb7a --- /dev/null +++ b/beacon_node/libp2p/src/rpc/methods.rs @@ -0,0 +1,38 @@ +/// Available RPC methods types and ids. +use ssz_derive::{Decode, Encode}; +use types::{Epoch, Hash256, Slot}; + +#[derive(Debug)] +pub enum RPCMethod { + Hello, + Unknown, +} + +impl From for RPCMethod { + fn from(method_id: u16) -> Self { + match method_id { + 0 => RPCMethod::Hello, + _ => RPCMethod::Unknown, + } + } +} + +#[derive(Debug, Clone)] +pub enum RPCRequest { + HelloRequest, +} + +#[derive(Debug, Clone)] +pub enum RPCResponse { + HelloResponse(HelloResponse), +} + +// request/response structs for RPC methods +#[derive(Encode, Decode, Clone, Debug)] +pub struct HelloResponse { + pub network_id: u8, + pub latest_finalized_root: Hash256, + pub latest_finalized_epoch: Epoch, + pub best_root: Hash256, + pub best_slot: Slot, +} diff --git a/beacon_node/libp2p/src/rpc/mod.rs b/beacon_node/libp2p/src/rpc/mod.rs new file mode 100644 index 0000000000..004f17d9e8 --- /dev/null +++ b/beacon_node/libp2p/src/rpc/mod.rs @@ -0,0 +1,120 @@ +mod handler; +mod methods; +/// RPC Protocol over libp2p. +/// +/// This is purpose built for Ethereum 2.0 serenity and the protocol listens on +/// `/eth/serenity/rpc/1.0.0` +mod protocol; + +use futures::prelude::*; +use libp2p::core::protocols_handler::{OneShotHandler, ProtocolsHandler}; +use libp2p::core::swarm::{ + ConnectedPoint, NetworkBehaviour, NetworkBehaviourAction, PollParameters, +}; +use libp2p::{Multiaddr, PeerId}; +use methods::RPCRequest; +use protocol::{RPCProtocol, RpcEvent}; +use std::marker::PhantomData; +use tokio::io::{AsyncRead, AsyncWrite}; + +/// The network behaviour handles RPC requests/responses as specified in the Eth 2.0 phase 0 +/// specification. + +pub struct Rpc { + /// Queue of events to processed. + events: Vec, + /// Pins the generic substream. + marker: PhantomData, +} + +impl Rpc { + pub fn new() -> Self { + Rpc { + events: Vec::new(), + marker: PhantomData, + } + } + + /// Submits and RPC request. + pub fn send_request(&mut self, id: u64, method_id: u16, body: RPCRequest) { + let request = RpcEvent::Request { + id, + method_id, + body, + }; + self.events.push(request); + } +} + +impl NetworkBehaviour for Rpc +where + TSubstream: AsyncRead + AsyncWrite, +{ + type ProtocolsHandler = OneShotHandler; + type OutEvent = RpcEvent; + + fn new_handler(&mut self) -> Self::ProtocolsHandler { + Default::default() + } + + fn addresses_of_peer(&mut self, _peer_id: &PeerId) -> Vec { + Vec::new() + } + + fn inject_connected(&mut self, _: PeerId, _: ConnectedPoint) {} + + fn inject_disconnected(&mut self, _: &PeerId, _: ConnectedPoint) {} + + fn inject_node_event( + &mut self, + source: PeerId, + event: ::OutEvent, + ) { + // ignore successful sends event + let event = match event { + OneShotEvent::Rx(event) => event, + OneShotEvent::Sent => return, + }; + + // send the event to the user + self.events.push(event); + } + + fn poll( + &mut self, + _: &mut PollParameters<'_>, + ) -> Async< + NetworkBehaviourAction< + ::InEvent, + Self::OutEvent, + >, + > { + if !self.events.is_empty() { + return Async::Ready(NetworkBehaviourAction::GenerateEvent(self.events.remove(0))); + } + Async::NotReady + } +} + +/// Transmission between the `OneShotHandler` and the `RpcEvent`. +#[derive(Debug)] +pub enum OneShotEvent { + /// We received an RPC from a remote. + Rx(RpcEvent), + /// We successfully sent an RPC request. + Sent, +} + +impl From for OneShotEvent { + #[inline] + fn from(rpc: RpcEvent) -> OneShotEvent { + OneShotEvent::Rx(rpc) + } +} + +impl From<()> for OneShotEvent { + #[inline] + fn from(_: ()) -> OneShotEvent { + OneShotEvent::Sent + } +} diff --git a/beacon_node/libp2p/src/rpc/protocol.rs b/beacon_node/libp2p/src/rpc/protocol.rs new file mode 100644 index 0000000000..e65927b03a --- /dev/null +++ b/beacon_node/libp2p/src/rpc/protocol.rs @@ -0,0 +1,179 @@ +use super::methods::HelloResponse; +use super::methods::{RPCMethod, RPCRequest, RPCResponse}; +//use crate::rpc_proto; +//use byteorder::{BigEndian, ByteOrder}; +//use bytes::BytesMut; +use futures::{future, stream, Future, Stream}; +use libp2p::core::{upgrade, InboundUpgrade, OutboundUpgrade, PeerId, UpgradeInfo}; +//use std::{io, iter}; +use ssz::{ssz_encode, Decodable, Encodable, SszStream}; +use std::io; +use std::iter; +use tokio::io::{AsyncRead, AsyncWrite}; + +/// The maximum bytes that can be sent across the RPC. +const MAX_READ_SIZE: usize = 2048; + +/// Implementation of the `ConnectionUpgrade` for the rpc protocol. + +#[derive(Debug, Clone)] +pub struct RPCProtocol; + +impl UpgradeInfo for RPCProtocol { + type Info = &'static [u8]; + type InfoIter = iter::Once; + + #[inline] + fn protocol_info(&self) -> Self::InfoIter { + iter::once(b"/eth/serenity/rpc/1.0.0") + } +} + +impl Default for RPCProtocol { + fn default() -> Self { + RPCProtocol + } +} + +/// The RPC types which are sent/received in this protocol. +#[derive(Debug, Clone)] +pub enum RpcEvent { + Request { + id: u64, + method_id: u16, + body: RPCRequest, + }, + Response { + id: u64, + method_id: u16, //TODO: Remove and process decoding upstream + result: RPCResponse, + }, +} + +impl UpgradeInfo for RpcEvent { + type Info = &'static [u8]; + type InfoIter = iter::Once; + + #[inline] + fn protocol_info(&self) -> Self::InfoIter { + iter::once(b"/eth/serenity/rpc/1.0.0") + } +} + +impl InboundUpgrade for RPCProtocol +where + TSocket: AsyncRead + AsyncWrite, +{ + type Output = RpcEvent; + type Error = DecodeError; + type Future = + upgrade::ReadOneThen, ()) -> Result>; + + fn upgrade_inbound(self, socket: TSocket, _: Self::Info) -> Self::Future { + upgrade::read_one_then(socket, MAX_READ_SIZE, (), |packet, ()| Ok(decode(packet)?)) + } +} + +fn decode(packet: Vec) -> Result { + // decode the header of the rpc + // request/response + let (request, index) = bool::ssz_decode(&packet, 0)?; + let (id, index) = u64::ssz_decode(&packet, index)?; + let (method_id, index) = u16::ssz_decode(&packet, index)?; + + if request { + let body = match RPCMethod::from(method_id) { + RPCMethod::Hello => RPCRequest::HelloRequest, + RPCMethod::Unknown => return Err(DecodeError::UnknownRPCMethod), + }; + + return Ok(RpcEvent::Request { + id, + method_id, + body, + }); + } + // we have received a response + else { + let result = match RPCMethod::from(method_id) { + RPCMethod::Hello => { + let (hello_response, _index) = HelloResponse::ssz_decode(&packet, index)?; + RPCResponse::HelloResponse(hello_response) + } + RPCMethod::Unknown => return Err(DecodeError::UnknownRPCMethod), + }; + return Ok(RpcEvent::Response { + id, + method_id, + result, + }); + } +} + +impl OutboundUpgrade for RpcEvent +where + TSocket: AsyncWrite, +{ + type Output = (); + type Error = io::Error; + type Future = upgrade::WriteOne; + + #[inline] + fn upgrade_outbound(self, socket: TSocket, _: Self::Info) -> Self::Future { + let bytes = ssz_encode(&self); + upgrade::write_one(socket, bytes) + } +} + +impl Encodable for RpcEvent { + fn ssz_append(&self, s: &mut SszStream) { + match self { + RpcEvent::Request { + id, + method_id, + body, + } => { + s.append(&true); + s.append(id); + s.append(method_id); + match body { + RPCRequest::HelloRequest => {} + } + } + RpcEvent::Response { + id, + method_id, + result, + } => { + s.append(&false); + s.append(id); + s.append(method_id); + match result { + RPCResponse::HelloResponse(response) => { + s.append(response); + } + } + } + } + } +} + +pub enum DecodeError { + ReadError(upgrade::ReadOneError), + SSZDecodeError(ssz::DecodeError), + UnknownRPCMethod, +} + +impl From for DecodeError { + #[inline] + fn from(err: upgrade::ReadOneError) -> Self { + DecodeError::ReadError(err) + } +} + +impl From for DecodeError { + #[inline] + fn from(err: ssz::DecodeError) -> Self { + DecodeError::SSZDecodeError(err) + } +} diff --git a/beacon_node/libp2p/src/service.rs b/beacon_node/libp2p/src/service.rs index 26154beb66..00c11101c3 100644 --- a/beacon_node/libp2p/src/service.rs +++ b/beacon_node/libp2p/src/service.rs @@ -15,7 +15,7 @@ use libp2p::{PeerId, Swarm}; use slog::{debug, info, trace, warn}; use std::io::{Error, ErrorKind}; use std::time::Duration; -use types::{Topic, TopicBuilder}; +use types::TopicBuilder; /// The configuration and state of the libp2p components for the beacon node. pub struct Service {