From 746935fa41f5786fefe583c2541fe30314adc9e3 Mon Sep 17 00:00:00 2001 From: Age Manning Date: Thu, 4 Jul 2019 14:05:01 +1000 Subject: [PATCH] Initial sub-protocol implementation --- beacon_node/eth2-libp2p/src/rpc/methods.rs | 57 +------ beacon_node/eth2-libp2p/src/rpc/mod.rs | 44 +++--- beacon_node/eth2-libp2p/src/rpc/protocol.rs | 165 +++++++++----------- 3 files changed, 103 insertions(+), 163 deletions(-) diff --git a/beacon_node/eth2-libp2p/src/rpc/methods.rs b/beacon_node/eth2-libp2p/src/rpc/methods.rs index b752b74cbb..76f62f23ae 100644 --- a/beacon_node/eth2-libp2p/src/rpc/methods.rs +++ b/beacon_node/eth2-libp2p/src/rpc/methods.rs @@ -23,34 +23,8 @@ pub enum RPCMethod { Unknown, } -impl From for RPCMethod { - fn from(method_id: u16) -> Self { - match method_id { - 0 => RPCMethod::Hello, - 1 => RPCMethod::Goodbye, - 10 => RPCMethod::BeaconBlockRoots, - 11 => RPCMethod::BeaconBlockHeaders, - 12 => RPCMethod::BeaconBlockBodies, - 13 => RPCMethod::BeaconChainState, +pub enum RawRPCRequest - _ => RPCMethod::Unknown, - } - } -} - -impl Into for RPCMethod { - fn into(self) -> u16 { - match self { - RPCMethod::Hello => 0, - RPCMethod::Goodbye => 1, - RPCMethod::BeaconBlockRoots => 10, - RPCMethod::BeaconBlockHeaders => 11, - RPCMethod::BeaconBlockBodies => 12, - RPCMethod::BeaconChainState => 13, - _ => 0, - } - } -} #[derive(Debug, Clone)] pub enum RPCRequest { @@ -62,20 +36,6 @@ pub enum RPCRequest { BeaconChainState(BeaconChainStateRequest), } -impl RPCRequest { - pub fn method_id(&self) -> u16 { - let method = match self { - RPCRequest::Hello(_) => RPCMethod::Hello, - RPCRequest::Goodbye(_) => RPCMethod::Goodbye, - RPCRequest::BeaconBlockRoots(_) => RPCMethod::BeaconBlockRoots, - RPCRequest::BeaconBlockHeaders(_) => RPCMethod::BeaconBlockHeaders, - RPCRequest::BeaconBlockBodies(_) => RPCMethod::BeaconBlockBodies, - RPCRequest::BeaconChainState(_) => RPCMethod::BeaconChainState, - }; - method.into() - } -} - #[derive(Debug, Clone)] pub enum RPCResponse { Hello(HelloMessage), @@ -85,19 +45,6 @@ pub enum RPCResponse { BeaconChainState(BeaconChainStateResponse), } -impl RPCResponse { - pub fn method_id(&self) -> u16 { - let method = match self { - RPCResponse::Hello(_) => RPCMethod::Hello, - RPCResponse::BeaconBlockRoots(_) => RPCMethod::BeaconBlockRoots, - RPCResponse::BeaconBlockHeaders(_) => RPCMethod::BeaconBlockHeaders, - RPCResponse::BeaconBlockBodies(_) => RPCMethod::BeaconBlockBodies, - RPCResponse::BeaconChainState(_) => RPCMethod::BeaconChainState, - }; - method.into() - } -} - /* Request/Response data structures for RPC methods */ /// The HELLO request/response handshake message. @@ -170,7 +117,7 @@ pub struct BeaconBlockRootsResponse { } impl BeaconBlockRootsResponse { - /// Returns `true` if each `self.roots.slot[i]` is higher than the preceeding `i`. + /// Returns `true` if each `self.roots.slot[i]` is higher than the preceding `i`. pub fn slots_are_ascending(&self) -> bool { for window in self.roots.windows(2) { if window[0].slot >= window[1].slot { diff --git a/beacon_node/eth2-libp2p/src/rpc/mod.rs b/beacon_node/eth2-libp2p/src/rpc/mod.rs index 2d303469c6..59015a86ef 100644 --- a/beacon_node/eth2-libp2p/src/rpc/mod.rs +++ b/beacon_node/eth2-libp2p/src/rpc/mod.rs @@ -1,7 +1,9 @@ -/// RPC Protocol over libp2p. +/// The Ethereum 2.0 Wire Protocol +/// +/// This protocol is a purpose built ethereum 2.0 libp2p protocol. It's role is to facilitate +/// direct peer-to-peer communication primarily for sending/receiving chain information for +/// syncing. /// -/// This is purpose built for Ethereum 2.0 serenity and the protocol listens on -/// `/eth/serenity/rpc/1.0.0` pub mod methods; mod protocol; @@ -17,9 +19,8 @@ use slog::o; use std::marker::PhantomData; use tokio::io::{AsyncRead, AsyncWrite}; -/// The network behaviour handles RPC requests/responses as specified in the Eth 2.0 phase 0 -/// specification. - +/// This struct implements the libp2p `NetworkBehaviour` trait and therefore manages network-level +/// logic. pub struct Rpc { /// Queue of events to processed. events: Vec>, @@ -39,7 +40,9 @@ impl Rpc { } } - /// Submits and RPC request. + /// Submits an RPC request. + /// + /// The peer must be connected for this to succeed. pub fn send_rpc(&mut self, peer_id: PeerId, rpc_event: RPCEvent) { self.events.push(NetworkBehaviourAction::SendEvent { peer_id, @@ -52,13 +55,14 @@ impl NetworkBehaviour for Rpc where TSubstream: AsyncRead + AsyncWrite, { - type ProtocolsHandler = OneShotHandler; + type ProtocolsHandler = OneShotHandler; type OutEvent = RPCMessage; fn new_handler(&mut self) -> Self::ProtocolsHandler { Default::default() } + // handled by discovery fn addresses_of_peer(&mut self, _peer_id: &PeerId) -> Vec { Vec::new() } @@ -81,8 +85,8 @@ where ) { // ignore successful send events let event = match event { - OneShotEvent::Rx(event) => event, - OneShotEvent::Sent => return, + HandlerEvent::Rx(event) => event, + HandlerEvent::Sent => return, }; // send the event to the user @@ -114,25 +118,25 @@ pub enum RPCMessage { PeerDialed(PeerId), } -/// Transmission between the `OneShotHandler` and the `RPCEvent`. +/// The output type received from the `OneShotHandler`. #[derive(Debug)] -pub enum OneShotEvent { - /// We received an RPC from a remote. +pub enum HandlerEvent { + /// An RPC was received from a remote. Rx(RPCEvent), - /// We successfully sent an RPC request. + /// An RPC was sent. Sent, } -impl From for OneShotEvent { +impl From for HandlerEvent { #[inline] - fn from(rpc: RPCEvent) -> OneShotEvent { - OneShotEvent::Rx(rpc) + fn from(rpc: RPCEvent) -> HandlerEvent { + HandlerEvent::Rx(rpc) } } -impl From<()> for OneShotEvent { +impl From<()> for HandlerEvent { #[inline] - fn from(_: ()) -> OneShotEvent { - OneShotEvent::Sent + fn from(_: ()) -> HandlerEvent { + HandlerEvent::Sent } } diff --git a/beacon_node/eth2-libp2p/src/rpc/protocol.rs b/beacon_node/eth2-libp2p/src/rpc/protocol.rs index 7afded3ac7..136b9cc260 100644 --- a/beacon_node/eth2-libp2p/src/rpc/protocol.rs +++ b/beacon_node/eth2-libp2p/src/rpc/protocol.rs @@ -16,106 +16,109 @@ pub struct RPCProtocol; impl UpgradeInfo for RPCProtocol { type Info = &'static [u8]; - type InfoIter = iter::Once; + type InfoIter = iter::Iter; - #[inline] fn protocol_info(&self) -> Self::InfoIter { - iter::once(b"/eth/serenity/rpc/1.0.0") + vec![b"/eth/serenity/rpc/hello/1/ssz", + b"/eth/serenity/rpc/goodbye/1/ssz", + b"/eth/serenity/rpc/beacon_block_roots/1/ssz", + b"/eth/serenity/rpc/beacon_block_headers/1/ssz", + b"/eth/serenity/rpc/beacon_block_bodies/1/ssz", + b"/eth/serenity/rpc/beacon_chain_state/1/ssz"].into_iter() } } -impl Default for RPCProtocol { - fn default() -> Self { - RPCProtocol - } -} - -/// A monotonic counter for ordering `RPCRequest`s. -#[derive(Debug, Clone, Copy, Default)] -pub struct RequestId(u64); - -impl RequestId { - /// Increment the request id. - pub fn increment(&mut self) { - self.0 += 1 - } - - /// Return the previous id. - pub fn previous(self) -> Self { - Self(self.0 - 1) - } -} - -impl Eq for RequestId {} - -impl PartialEq for RequestId { - fn eq(&self, other: &RequestId) -> bool { - self.0 == other.0 - } -} - -impl Hash for RequestId { - fn hash(&self, state: &mut H) { - self.0.hash(state); - } -} - -impl From for RequestId { - fn from(x: u64) -> RequestId { - RequestId(x) - } -} - -impl Into for RequestId { - fn into(self) -> u64 { - self.0 - } -} - -impl_encode_via_from!(RequestId, u64); -impl_decode_via_from!(RequestId, u64); - -/// The RPC types which are sent/received in this protocol. +/// The outbound RPC type as well as the return type used in the behaviour. #[derive(Debug, Clone)] pub enum RPCEvent { - Request { - id: RequestId, - method_id: u16, - body: RPCRequest, - }, - Response { - id: RequestId, - method_id: u16, //TODO: Remove and process decoding upstream - result: RPCResponse, - }, + Request ( RPCRequest ), + Response ( RPCResponse ), } +// outbound protocol supports the same as the inbound. impl UpgradeInfo for RPCEvent { type Info = &'static [u8]; - type InfoIter = iter::Once; + type InfoIter = iter::Iter; - #[inline] fn protocol_info(&self) -> Self::InfoIter { - iter::once(b"/eth/serenity/rpc/1.0.0") + vec![b"/eth/serenity/rpc/hello/1/ssz", + b"/eth/serenity/rpc/goodbye/1/ssz", + b"/eth/serenity/rpc/beacon_block_roots/1/ssz", + b"/eth/serenity/rpc/beacon_block_headers/1/ssz", + b"/eth/serenity/rpc/beacon_block_bodies/1/ssz", + b"/eth/serenity/rpc/beacon_chain_state/1/ssz"].into_iter() } } -type FnDecodeRPCEvent = fn(Vec, ()) -> Result; +/* Inbound upgrade */ + +// The inbound protocol reads the request, decodes it and returns the stream to the protocol +// handler to respond to once ready. + +type FnDecodeRPCEvent = fn(upgrade::Negotiated, Vec, ()) -> Result<(upgrade::Negotiated,RPCEvent), DecodeError>; impl InboundUpgrade for RPCProtocol where TSocket: AsyncRead + AsyncWrite, { - type Output = RPCEvent; + type Socket = upgrade::Negotiated, + type Output = (Self::Socket, RPCEvent), type Error = DecodeError; - type Future = upgrade::ReadOneThen, (), FnDecodeRPCEvent>; + type Future = upgrade::ReadRespond; - fn upgrade_inbound(self, socket: upgrade::Negotiated, _: Self::Info) -> Self::Future { - upgrade::read_one_then(socket, MAX_READ_SIZE, (), |packet, ()| Ok(decode(packet)?)) + fn upgrade_inbound(self, socket: upgrade::Negotiated, protocol: Self::Info) -> Self::Future { + upgrade::read_respond(socket, MAX_READ_SIZE, (), |socket, packet, ()| Ok((socket, decode_request(packet, protocol)?))) } } -/// A helper structed used to obtain SSZ serialization for RPC messages. + +/* Outbound upgrade */ + +impl OutboundUpgrade for RPCEvent +where + TSocket: AsyncWrite, +{ + type Output = (); + type Error = io::Error; + type Future = upgrade::WriteOne>; + + #[inline] + fn upgrade_outbound(self, socket: upgrade::Negotiated, protocol: Self::Info) -> Self::Future { + let bytes = ssz_encode(&self); + upgrade::request_response(socket, + upgrade::write_one(socket, bytes) + } +} + +// This function can be extended to provide further logic for supporting various protocol versions/encoding +fn decode_request(packet: Vec, protocol: &'static[u8]) { + match protocol { + b"/eth/serenity/rpc/hello/1/ssz" => { } + b"/eth/serenity/rpc/goodbye/1/ssz", + b"/eth/serenity/rpc/beacon_block_roots/1/ssz", + b"/eth/serenity/rpc/beacon_block_headers/1/ssz", + b"/eth/serenity/rpc/beacon_block_bodies/1/ssz", + b"/eth/serenity/rpc/beacon_chain_state/1/ssz", + _=> { // Other protocols are not supported. + return Err(DecodeError::UnknownProtocol); + } + } +} + + + + + + + + + + + + + + +/// A helper struct used to obtain SSZ serialization for RPC messages. #[derive(Encode, Decode, Default)] struct SszContainer { /// Note: the `is_request` field is not included in the spec. @@ -186,20 +189,6 @@ fn decode(packet: Vec) -> Result { } } -impl OutboundUpgrade for RPCEvent -where - TSocket: AsyncWrite, -{ - type Output = (); - type Error = io::Error; - type Future = upgrade::WriteOne>; - - #[inline] - fn upgrade_outbound(self, socket: upgrade::Negotiated, _: Self::Info) -> Self::Future { - let bytes = ssz_encode(&self); - upgrade::write_one(socket, bytes) - } -} impl Encode for RPCEvent { fn is_ssz_fixed_len() -> bool {