From 15cdd2afb95f985ca73c40813d360e7474b0432f Mon Sep 17 00:00:00 2001 From: Age Manning Date: Sat, 13 Jul 2019 18:56:08 +1000 Subject: [PATCH] Initial codec module --- .../eth2-libp2p/src/rpc/codecs/base.rs | 501 ++++-------------- beacon_node/eth2-libp2p/src/rpc/codecs/mod.rs | 1 + beacon_node/eth2-libp2p/src/rpc/codecs/ssz.rs | 323 +++++++++++ beacon_node/eth2-libp2p/src/rpc/mod.rs | 1 + beacon_node/eth2-libp2p/src/rpc/protocol.rs | 1 + codec.rs | 461 ---------------- 6 files changed, 415 insertions(+), 873 deletions(-) create mode 100644 beacon_node/eth2-libp2p/src/rpc/codecs/mod.rs delete mode 100644 codec.rs diff --git a/beacon_node/eth2-libp2p/src/rpc/codecs/base.rs b/beacon_node/eth2-libp2p/src/rpc/codecs/base.rs index 5fe1aad8ad..6957ad180f 100644 --- a/beacon_node/eth2-libp2p/src/rpc/codecs/base.rs +++ b/beacon_node/eth2-libp2p/src/rpc/codecs/base.rs @@ -1,442 +1,119 @@ +///! This handles the various supported encoding mechanism for the Eth 2.0 RPC. -///! This handles the various supported encoding mechanism for the Eth 2.0 RPC. -///! -///! Currently supported encodings are: -///! - ssz - Varint length-prefixed SSZ-encoding. +pub trait InnerCodec: Encoder + Decoder { + type Error; + fn decode_error( + &mut self, + &mut BytesMut, + ) -> Result, ::Error>; +} -pub struct BaseCodec { +pub struct BaseInboundCodec { + /// Inner codec for handling various encodings + inner: TCodec, +} + +pub struct BaseOutboundCodec +where + TCodec: InnerCodec, + ::Item = RPCResponse, + ::ErrorItem = ErrorMessage, +{ /// Inner codec for handling various encodings inner: TCodec, /// Optimisation for decoding. True if the response code has been read and we are awaiting a /// response. - read_response_code: bool, + response_code: Option, } +impl Encoder for BaseInboundCodec +where + TCodec: Encoder, + ::Item = RPCResponse, +{ + type Item = RPCResponse; + type Error = ::Error; + + fn encode(&mut self, item: Self::Item, dst: &mut BytesMut) -> Result<(), Self::Error> { + dst.clear(); + dst.reserve(1); + dst.put_u8(item.as_u8); + return self.inner.encode(); + } +} + +impl Decoder for BaseInboundCodec +where + TCodec: Decoder, + ::Item: RPCrequest, + ::Error: From, +{ + type Item = RPCRequest; + type Error = ::Error; + + fn decode(&mut self, src: &mut BytesMut) -> Result, Self::Error> { + self.inner.decode(src) + } +} impl Encoder for BaseOutboundCodec -where - TCodec: Encoder +where + TCodec: Encoder, { type Item = RPCRequest; type Error = ::Error; - fn encode( - &mut self, - item: Self::Item, - dst: &mut BytesMut - ) -> Result<(), Self::Error> { + fn encode(&mut self, item: Self::Item, dst: &mut BytesMut) -> Result<(), Self::Error> { self.inner.encode(item, dst) } } - -impl Encoder for BaseInboundCodec -where - TCodec: Encoder +impl Decoder for BaseOutboundCodec +where + TCodec: InnerCodec, + ::Error: From, { - type Item = RPCResponse; - type Error = ::Error; - - fn encode( - &mut self, - item: Self::Item, - dst: &mut BytesMut - ) -> Result<(), Self::Error> { - - match item { - RPCResponse::Error(response) => { - match response = { - ErrorResponse::EncodingError => { - dst.clear(); - dst.reserve(1); - dst.put(response as u8); - return; - } - ErrorResponse:: - self.inner.encode(item, dst) - } -} - - -impl Decoder for BaseCodec -where - TCodec: Decoder, - ::Error: From, -{ - type Item = RPCResponse; type Error = ::Error; - fn decode( - &mut self, - src: &mut BytesMut - ) -> Result, Self::Error> { - - if !self.read_response_code { - if src.len() < 1 { - return Err(io::Error::new(io::ErrorKind::InvalidData, "no bytes received")); - } - - let resp_code_byte = [0; 1]; - // data must be only 1-byte - this cannot panic - resp_code_byte.copy_from_slice(&src); - let response_code = - ResponseCode::from(u8::from_be_bytes(resp_code_byte)); - match response_code { - ResponseCode::EncodingError => { - // invalid encoding - let response = RPCResponse::Error("Invalid Encoding".into()); - return Ok(Async::Ready(response)); - } - ResponseCode::Success - | ResponseCode::InvalidRequest - | ResponseCode::ServerError => { - // need to read another packet - self.inner = RPCRequestResponseInner::Read( - read_one(socket, max_size), - response_code, - ) - } - ResponseCode::Unknown => { - // unknown response code - let response = RPCResponse::Error(format!( - "Unknown response code: {}", - (response_code as u8) + fn decode(&mut self, src: &mut BytesMut) -> Result, Self::Error> { + let response_code = { + if let Some(resp_code) = self.response_code { + resp_code; + } else { + if src.is_empty() { + return Err(io::Error::new( + io::ErrorKind::InvalidData, + "no bytes received", )); - return Ok(Async::Ready(response)); + } + let resp_byte = src.split_to(1); + let resp_code_byte = [0; 1]; + resp_code_byte.copy_from_slice(&resp_byte); + let resp_code = u8::from_be_bytes(resp_code_byte); - - - - } - -} - - - - - -/// SSZ Input stream -pub struct SSZInboundSink { - inner: - protocol: ProtocolId - -impl for SSZInputStream -where - TSocket: AsyncRead + AsyncWrite -{ - - /// Set up the initial input stream object. - pub fn new(incomming: TSocket, protocol: ProtocolId, max_size: usize) -> Self { - - // this type of stream should only apply to ssz protocols - debug_assert!(protocol.encoding.as_str() == "ssz"); - - let mut uvi_codec = UviBytes::default(); - uvi_codec.set_max_len(max_size); - - let inner = Framed::new(incomming, uvi_codec).from_err() - .with(|response| { - self.encode(response) - }) - .and_then(|bytes| { - self.decode(request) - }).into_future(); - - //TODO: add timeout - - SSZInputStream { - inner, - protocol - } - } - - /// Decodes an SSZ-encoded RPCRequest. - fn decode(&self, request: RPCRequest) { - - match self.protocol.message_name.as_str() { - "hello" => match protocol.version.as_str() { - "1.0.0" => Ok(RPCRequest::Hello(HelloMessage::from_ssz_bytes(&packet)?)), - _ => Err(RPCError::InvalidProtocol("Unknown HELLO version")), - }, - "goodbye" => match protocol.version.as_str() { - "1.0.0" => Ok(RPCRequest::Goodbye(Goodbye::from_ssz_bytes(&packet)?)), - _ => Err(RPCError::InvalidProtocol( - "Unknown GOODBYE version.as_str()", - )), - }, - "beacon_block_roots" => match protocol.version.as_str() { - "1.0.0" => Ok(RPCRequest::BeaconBlockRoots( - BeaconBlockRootsRequest::from_ssz_bytes(&packet)?, - )), - _ => Err(RPCError::InvalidProtocol( - "Unknown BEACON_BLOCK_ROOTS version.", - )), - }, - "beacon_block_headers" => match protocol.version.as_str() { - "1.0.0" => Ok(RPCRequest::BeaconBlockHeaders( - BeaconBlockHeadersRequest::from_ssz_bytes(&packet)?, - )), - _ => Err(RPCError::InvalidProtocol( - "Unknown BEACON_BLOCK_HEADERS version.", - )), - }, - "beacon_block_bodies" => match protocol.version.as_str() { - "1.0.0" => Ok(RPCRequest::BeaconBlockBodies( - BeaconBlockBodiesRequest::from_ssz_bytes(&packet)?, - )), - _ => Err(RPCError::InvalidProtocol( - "Unknown BEACON_BLOCK_BODIES version.", - )), - }, - "beacon_chain_state" => match protocol.version.as_str() { - "1.0.0" => Ok(RPCRequest::BeaconChainState( - BeaconChainStateRequest::from_ssz_bytes(&packet)?, - )), - _ => Err(RPCError::InvalidProtocol( - "Unknown BEACON_CHAIN_STATE version.", - )), - }, - } - } - - fn encode(&self, response: RPCResponse) { - - // TODO: Add error code - - match response { - RPCResponse::Hello(res) => res.as_ssz_bytes(), - RPCResponse::Goodbye => unreachable!(), - RPCResponse::BeaconBlockRoots(res) => res.as_ssz_bytes(), - RPCResponse::BeaconBlockHeaders(res) => res.headers, // already raw bytes - RPCResponse::BeaconBlockBodies(res) => res.block_bodies, // already raw bytes - RPCResponse::BeaconChainState(res) => res.as_ssz_bytes(), - } - } - -} - -type SSZInboundOutput = stream::AndThen>>, RPCError>, - RPCResponse, - fn(RPCResponse) -> Result, RPCError>, - Result, RPCError>, - >, - fn(BytesMut) -> Result, - Result - >; - -impl Sink for SSZInputStreamSink { - - type SinkItem = RPCResponse; - type SinkError = RPCError; - - fn start_send( - &mut self, - item: Self::SinkItem -) -> Result, Self::SinkError> { - self.inner.start_send(item) - } - - fn poll_complete(&mut self) -> Result, Self::SinkError> { - self.inner.poll_complete() - } -} - -/* Outbound specific stream */ - -// Implement our own decoder to handle the response byte - -struct SSZOutboundCodec - - - -pub struct SSZOutboundStreamSink { - inner: - protocol: ProtocolId - -impl for SSZOutboundStreamSink -where - TSocket: AsyncRead + AsyncWrite -{ - - /// Set up the initial outbound stream object. - pub fn new(socket: TSocket, protocol: ProtocolId, max_size: usize) -> Self { - - // this type of stream should only apply to ssz protocols - debug_assert!(protocol.encoding.as_str() == "ssz"); - - let mut uvi_codec = UviBytes::default(); - uvi_codec.set_max_len(max_size); - - let inner = Framed::new(socket, uvi_codec).from_err() - .with(|request| { - self.encode(request) - }) - .and_then(|bytes| { - self.decode(response) - }); - - SSZOutboundStream { - inner, - protocol - } - } - - - - - - /// Decodes a response that was received on the same stream as a request. The response type should - /// therefore match the request protocol type. - pub fn decode(&self, response: Vec, - protocol: ProtocolId, - response_code: ResponseCode, - ) -> Result { - match response_code { - ResponseCode::EncodingError => Ok(RPCResponse::Error("Encoding error".into())), - ResponseCode::InvalidRequest => { - let response = match protocol.encoding.as_str() { - "ssz" => ErrorResponse::from_ssz_bytes(&packet)?, - _ => return Err(RPCError::InvalidProtocol("Unknown Encoding")), - }; - Ok(RPCResponse::Error(format!( - "Invalid Request: {}", - response.error_message - ))) + if let Some(response) = RPCErrorResponse::internal_data(resp_code) { + self.response_code = None; + return response; + } + resp_code } - ResponseCode::ServerError => { - let response = match protocol.encoding.as_str() { - "ssz" => ErrorResponse::from_ssz_bytes(&packet)?, - _ => return Err(RPCError::InvalidProtocol("Unknown Encoding")), - }; - Ok(RPCResponse::Error(format!( - "Remote Server Error: {}", - response.error_message - ))) - } - ResponseCode::Success => match protocol.message_name.as_str() { - "hello" => match protocol.version.as_str() { - "1.0.0" => match protocol.encoding.as_str() { - "ssz" => Ok(RPCResponse::Hello(HelloMessage::from_ssz_bytes(&packet)?)), - _ => Err(RPCError::InvalidProtocol("Unknown HELLO encoding")), - }, - _ => Err(RPCError::InvalidProtocol("Unknown HELLO version.")), - }, - "goodbye" => Err(RPCError::Custom( - "GOODBYE should not have a response".into(), - )), - "beacon_block_roots" => match protocol.version.as_str() { - "1.0.0" => match protocol.encoding.as_str() { - "ssz" => Ok(RPCResponse::BeaconBlockRoots( - BeaconBlockRootsResponse::from_ssz_bytes(&packet)?, - )), - _ => Err(RPCError::InvalidProtocol( - "Unknown BEACON_BLOCK_ROOTS encoding", - )), - }, - _ => Err(RPCError::InvalidProtocol( - "Unknown BEACON_BLOCK_ROOTS version.", - )), - }, - "beacon_block_headers" => match protocol.version.as_str() { - "1.0.0" => match protocol.encoding.as_str() { - "ssz" => Ok(RPCResponse::BeaconBlockHeaders( - BeaconBlockHeadersResponse { headers: packet }, - )), - _ => Err(RPCError::InvalidProtocol( - "Unknown BEACON_BLOCK_HEADERS encoding", - )), - }, - _ => Err(RPCError::InvalidProtocol( - "Unknown BEACON_BLOCK_HEADERS version.", - )), - }, - "beacon_block_bodies" => match protocol.version.as_str() { - "1.0.0" => match protocol.encoding.as_str() { - "ssz" => Ok(RPCResponse::BeaconBlockBodies(BeaconBlockBodiesResponse { - block_bodies: packet, - })), - _ => Err(RPCError::InvalidProtocol( - "Unknown BEACON_BLOCK_BODIES encoding", - )), - }, - _ => Err(RPCError::InvalidProtocol( - "Unknown BEACON_BLOCK_BODIES version.", - )), - }, - "beacon_chain_state" => match protocol.version.as_str() { - "1.0.0" => match protocol.encoding.as_str() { - "ssz" => Ok(RPCResponse::BeaconChainState( - BeaconChainStateResponse::from_ssz_bytes(&packet)?, - )), - _ => Err(RPCError::InvalidProtocol( - "Unknown BEACON_CHAIN_STATE encoding", - )), - }, - _ => Err(RPCError::InvalidProtocol( - "Unknown BEACON_CHAIN_STATE version.", - )), - }, - }, + }; + + if RPCErrorResponse::is_response(response_code) { + // decode an actual response + return self + .inner + .decode(src) + .map(|r| r.map(|resp| RPCErrorResponse::Success(resp))); + } else { + // decode an error + return self + .inner + .decode_error(src) + .map(|r| r.map(|resp| RPCErrorResponse::from_error(response_code, resp))); } } - - fn encode(&self, response: RPCResponse) { - - match response { - RPCResponse::Hello(res) => res.as_ssz_bytes(), - RPCResponse::Goodbye => unreachable!(), - RPCResponse::BeaconBlockRoots(res) => res.as_ssz_bytes(), - RPCResponse::BeaconBlockHeaders(res) => res.headers, // already raw bytes - RPCResponse::BeaconBlockBodies(res) => res.block_bodies, // already raw bytes - RPCResponse::BeaconChainState(res) => res.as_ssz_bytes(), - } - } - } - -type SSZOutboundStream = stream::AndThen>>, RPCError>, - RPCResponse, - fn(RPCResponse) -> Result, RPCError>, - Result, RPCError>, - >, - fn(BytesMut) -> Result, - Result - >; - - -impl Stream for SSZInputStreamSink { - - type Item = SSZInboundOutput; - type Error = RPCError; - - fn poll(&mut self) -> Result>, Self::Error> { - self.inner.poll() - } -} - -impl Sink for SSZInputStreamSink { - - type SinkItem = RPCResponse; - type SinkError = RPCError; - - fn start_send( - &mut self, - item: Self::SinkItem -) -> Result, Self::SinkError> { - self.inner.start_send(item) - } - - fn poll_complete(&mut self) -> Result, Self::SinkError> { - self.inner.poll_complete() - } -} - - - - - - - diff --git a/beacon_node/eth2-libp2p/src/rpc/codecs/mod.rs b/beacon_node/eth2-libp2p/src/rpc/codecs/mod.rs new file mode 100644 index 0000000000..77ed8456da --- /dev/null +++ b/beacon_node/eth2-libp2p/src/rpc/codecs/mod.rs @@ -0,0 +1 @@ +mod base; diff --git a/beacon_node/eth2-libp2p/src/rpc/codecs/ssz.rs b/beacon_node/eth2-libp2p/src/rpc/codecs/ssz.rs index e69de29bb2..fd20ae57ef 100644 --- a/beacon_node/eth2-libp2p/src/rpc/codecs/ssz.rs +++ b/beacon_node/eth2-libp2p/src/rpc/codecs/ssz.rs @@ -0,0 +1,323 @@ + +/// SSZ Input stream +pub struct SSZInboundSink { + inner: + protocol: ProtocolId + +impl for SSZInputStream +where + TSocket: AsyncRead + AsyncWrite +{ + + /// Set up the initial input stream object. + pub fn new(incomming: TSocket, protocol: ProtocolId, max_size: usize) -> Self { + + // this type of stream should only apply to ssz protocols + debug_assert!(protocol.encoding.as_str() == "ssz"); + + let mut uvi_codec = UviBytes::default(); + uvi_codec.set_max_len(max_size); + + let inner = Framed::new(incomming, uvi_codec).from_err() + .with(|response| { + self.encode(response) + }) + .and_then(|bytes| { + self.decode(request) + }).into_future(); + + //TODO: add timeout + + SSZInputStream { + inner, + protocol + } + } + + /// Decodes an SSZ-encoded RPCRequest. + fn decode(&self, request: RPCRequest) { + + match self.protocol.message_name.as_str() { + "hello" => match protocol.version.as_str() { + "1.0.0" => Ok(RPCRequest::Hello(HelloMessage::from_ssz_bytes(&packet)?)), + _ => Err(RPCError::InvalidProtocol("Unknown HELLO version")), + }, + "goodbye" => match protocol.version.as_str() { + "1.0.0" => Ok(RPCRequest::Goodbye(Goodbye::from_ssz_bytes(&packet)?)), + _ => Err(RPCError::InvalidProtocol( + "Unknown GOODBYE version.as_str()", + )), + }, + "beacon_block_roots" => match protocol.version.as_str() { + "1.0.0" => Ok(RPCRequest::BeaconBlockRoots( + BeaconBlockRootsRequest::from_ssz_bytes(&packet)?, + )), + _ => Err(RPCError::InvalidProtocol( + "Unknown BEACON_BLOCK_ROOTS version.", + )), + }, + "beacon_block_headers" => match protocol.version.as_str() { + "1.0.0" => Ok(RPCRequest::BeaconBlockHeaders( + BeaconBlockHeadersRequest::from_ssz_bytes(&packet)?, + )), + _ => Err(RPCError::InvalidProtocol( + "Unknown BEACON_BLOCK_HEADERS version.", + )), + }, + "beacon_block_bodies" => match protocol.version.as_str() { + "1.0.0" => Ok(RPCRequest::BeaconBlockBodies( + BeaconBlockBodiesRequest::from_ssz_bytes(&packet)?, + )), + _ => Err(RPCError::InvalidProtocol( + "Unknown BEACON_BLOCK_BODIES version.", + )), + }, + "beacon_chain_state" => match protocol.version.as_str() { + "1.0.0" => Ok(RPCRequest::BeaconChainState( + BeaconChainStateRequest::from_ssz_bytes(&packet)?, + )), + _ => Err(RPCError::InvalidProtocol( + "Unknown BEACON_CHAIN_STATE version.", + )), + }, + } + } + + fn encode(&self, response: RPCResponse) { + + // TODO: Add error code + + match response { + RPCResponse::Hello(res) => res.as_ssz_bytes(), + RPCResponse::Goodbye => unreachable!(), + RPCResponse::BeaconBlockRoots(res) => res.as_ssz_bytes(), + RPCResponse::BeaconBlockHeaders(res) => res.headers, // already raw bytes + RPCResponse::BeaconBlockBodies(res) => res.block_bodies, // already raw bytes + RPCResponse::BeaconChainState(res) => res.as_ssz_bytes(), + } + } + +} + +type SSZInboundOutput = stream::AndThen>>, RPCError>, + RPCResponse, + fn(RPCResponse) -> Result, RPCError>, + Result, RPCError>, + >, + fn(BytesMut) -> Result, + Result + >; + +impl Sink for SSZInputStreamSink { + + type SinkItem = RPCResponse; + type SinkError = RPCError; + + fn start_send( + &mut self, + item: Self::SinkItem +) -> Result, Self::SinkError> { + self.inner.start_send(item) + } + + fn poll_complete(&mut self) -> Result, Self::SinkError> { + self.inner.poll_complete() + } +} + +/* Outbound specific stream */ + +// Implement our own decoder to handle the response byte + +struct SSZOutboundCodec + + + +pub struct SSZOutboundStreamSink { + inner: + protocol: ProtocolId + +impl for SSZOutboundStreamSink +where + TSocket: AsyncRead + AsyncWrite +{ + + /// Set up the initial outbound stream object. + pub fn new(socket: TSocket, protocol: ProtocolId, max_size: usize) -> Self { + + // this type of stream should only apply to ssz protocols + debug_assert!(protocol.encoding.as_str() == "ssz"); + + let mut uvi_codec = UviBytes::default(); + uvi_codec.set_max_len(max_size); + + let inner = Framed::new(socket, uvi_codec).from_err() + .with(|request| { + self.encode(request) + }) + .and_then(|bytes| { + self.decode(response) + }); + + SSZOutboundStream { + inner, + protocol + } + } + + + + + + /// Decodes a response that was received on the same stream as a request. The response type should + /// therefore match the request protocol type. + pub fn decode(&self, response: Vec, + protocol: ProtocolId, + response_code: ResponseCode, + ) -> Result { + match response_code { + ResponseCode::EncodingError => Ok(RPCResponse::Error("Encoding error".into())), + ResponseCode::InvalidRequest => { + let response = match protocol.encoding.as_str() { + "ssz" => ErrorResponse::from_ssz_bytes(&packet)?, + _ => return Err(RPCError::InvalidProtocol("Unknown Encoding")), + }; + Ok(RPCResponse::Error(format!( + "Invalid Request: {}", + response.error_message + ))) + } + ResponseCode::ServerError => { + let response = match protocol.encoding.as_str() { + "ssz" => ErrorResponse::from_ssz_bytes(&packet)?, + _ => return Err(RPCError::InvalidProtocol("Unknown Encoding")), + }; + Ok(RPCResponse::Error(format!( + "Remote Server Error: {}", + response.error_message + ))) + } + ResponseCode::Success => match protocol.message_name.as_str() { + "hello" => match protocol.version.as_str() { + "1.0.0" => match protocol.encoding.as_str() { + "ssz" => Ok(RPCResponse::Hello(HelloMessage::from_ssz_bytes(&packet)?)), + _ => Err(RPCError::InvalidProtocol("Unknown HELLO encoding")), + }, + _ => Err(RPCError::InvalidProtocol("Unknown HELLO version.")), + }, + "goodbye" => Err(RPCError::Custom( + "GOODBYE should not have a response".into(), + )), + "beacon_block_roots" => match protocol.version.as_str() { + "1.0.0" => match protocol.encoding.as_str() { + "ssz" => Ok(RPCResponse::BeaconBlockRoots( + BeaconBlockRootsResponse::from_ssz_bytes(&packet)?, + )), + _ => Err(RPCError::InvalidProtocol( + "Unknown BEACON_BLOCK_ROOTS encoding", + )), + }, + _ => Err(RPCError::InvalidProtocol( + "Unknown BEACON_BLOCK_ROOTS version.", + )), + }, + "beacon_block_headers" => match protocol.version.as_str() { + "1.0.0" => match protocol.encoding.as_str() { + "ssz" => Ok(RPCResponse::BeaconBlockHeaders( + BeaconBlockHeadersResponse { headers: packet }, + )), + _ => Err(RPCError::InvalidProtocol( + "Unknown BEACON_BLOCK_HEADERS encoding", + )), + }, + _ => Err(RPCError::InvalidProtocol( + "Unknown BEACON_BLOCK_HEADERS version.", + )), + }, + "beacon_block_bodies" => match protocol.version.as_str() { + "1.0.0" => match protocol.encoding.as_str() { + "ssz" => Ok(RPCResponse::BeaconBlockBodies(BeaconBlockBodiesResponse { + block_bodies: packet, + })), + _ => Err(RPCError::InvalidProtocol( + "Unknown BEACON_BLOCK_BODIES encoding", + )), + }, + _ => Err(RPCError::InvalidProtocol( + "Unknown BEACON_BLOCK_BODIES version.", + )), + }, + "beacon_chain_state" => match protocol.version.as_str() { + "1.0.0" => match protocol.encoding.as_str() { + "ssz" => Ok(RPCResponse::BeaconChainState( + BeaconChainStateResponse::from_ssz_bytes(&packet)?, + )), + _ => Err(RPCError::InvalidProtocol( + "Unknown BEACON_CHAIN_STATE encoding", + )), + }, + _ => Err(RPCError::InvalidProtocol( + "Unknown BEACON_CHAIN_STATE version.", + )), + }, + }, + } + } + + fn encode(&self, response: RPCResponse) { + + match response { + RPCResponse::Hello(res) => res.as_ssz_bytes(), + RPCResponse::Goodbye => unreachable!(), + RPCResponse::BeaconBlockRoots(res) => res.as_ssz_bytes(), + RPCResponse::BeaconBlockHeaders(res) => res.headers, // already raw bytes + RPCResponse::BeaconBlockBodies(res) => res.block_bodies, // already raw bytes + RPCResponse::BeaconChainState(res) => res.as_ssz_bytes(), + } + } + +} + +type SSZOutboundStream = stream::AndThen>>, RPCError>, + RPCResponse, + fn(RPCResponse) -> Result, RPCError>, + Result, RPCError>, + >, + fn(BytesMut) -> Result, + Result + >; + + +impl Stream for SSZInputStreamSink { + + type Item = SSZInboundOutput; + type Error = RPCError; + + fn poll(&mut self) -> Result>, Self::Error> { + self.inner.poll() + } +} + +impl Sink for SSZInputStreamSink { + + type SinkItem = RPCResponse; + type SinkError = RPCError; + + fn start_send( + &mut self, + item: Self::SinkItem +) -> Result, Self::SinkError> { + self.inner.start_send(item) + } + + fn poll_complete(&mut self) -> Result, Self::SinkError> { + self.inner.poll_complete() + } +} + + + + + + + diff --git a/beacon_node/eth2-libp2p/src/rpc/mod.rs b/beacon_node/eth2-libp2p/src/rpc/mod.rs index e6ac74d9ae..fc7560effe 100644 --- a/beacon_node/eth2-libp2p/src/rpc/mod.rs +++ b/beacon_node/eth2-libp2p/src/rpc/mod.rs @@ -16,6 +16,7 @@ use slog::o; use std::marker::PhantomData; use tokio::io::{AsyncRead, AsyncWrite}; +mod codecs; mod handler; pub mod methods; mod protocol; diff --git a/beacon_node/eth2-libp2p/src/rpc/protocol.rs b/beacon_node/eth2-libp2p/src/rpc/protocol.rs index ce58c56076..bb593f3c1c 100644 --- a/beacon_node/eth2-libp2p/src/rpc/protocol.rs +++ b/beacon_node/eth2-libp2p/src/rpc/protocol.rs @@ -251,6 +251,7 @@ impl RPCErrorResponse { } } + /// Used to encode the response. pub fn as_u8(&self) -> u8 { match self { RPCErrorResponse::Success(_) => 0, diff --git a/codec.rs b/codec.rs deleted file mode 100644 index a4e81b5f0d..0000000000 --- a/codec.rs +++ /dev/null @@ -1,461 +0,0 @@ - -///! This handles the various supported encoding mechanism for the Eth 2.0 RPC. -///! -///! Currently supported encodings are: -///! - ssz - Varint length-prefixed SSZ-encoding. - - -pub trait InnerCodec: Encoder + Decoder { - type Error; - - pub fn decode_error(&mut self, &mut BytesMut) -> Result, ::Error>; -} - -pub struct BaseInboundCodec { - /// Inner codec for handling various encodings - inner: TCodec, -} - -impl Encoder for BaseInboundCodec -where - TCodec: Encoder, - ::Item = RPCResponse, -{ - type Item = RPCResponse; - type Error = ::Error; - - fn encode( - &mut self, - item: Self::Item, - dst: &mut BytesMut - ) -> Result<(), Self::Error> { - dst.clear(); - dst.reserve(1); - dst.put_u8(item.as_u8); - return self.inner.encode(); - } -} - - -impl Decoder for BaseInboundCodec -where - TCodec: Decoder, - ::Item: RPCrequest, - ::Error: From, -{ - - type Item = RPCRequest; - type Error = ::Error; - - fn decode( - &mut self, - src: &mut BytesMut - ) -> Result, Self::Error> { - self.inner.decode(src) - } -} - -pub struct BaseOutboundCodec -where - TCodec: InnerCodec, - ::Item = RPCResponse, - ::ErrorItem = ErrorMessage, - { - /// Inner codec for handling various encodings - inner: TCodec, - /// Optimisation for decoding. True if the response code has been read and we are awaiting a - /// response. - response_code: Option, -} - -impl Encoder for BaseOutboundCodec -where - TCodec: Encoder -{ - type Item = RPCRequest; - type Error = ::Error; - - fn encode( - &mut self, - item: Self::Item, - dst: &mut BytesMut - ) -> Result<(), Self::Error> { - self.inner.encode(item, dst) - } -} - - -impl Decoder for BaseOutboundCodec -where - TCodec: InnerCodec, - ::Error: From, -{ - - type Item = RPCResponse; - type Error = ::Error; - - fn decode( - &mut self, - src: &mut BytesMut - ) -> Result, Self::Error> { - - - let response_code = { - if let Some(resp_code) = self.response_code { - resp_code; - } - else { - if src.is_empty() { - return Err(io::Error::new(io::ErrorKind::InvalidData, "no bytes received")); - } - let resp_byte = src.split_to(1); - let resp_code_byte = [0; 1]; - resp_code_byte.copy_from_slice(&resp_byte); - - let resp_code = u8::from_be_bytes(resp_code_byte); - - if let Some(response) = RPCErrorResponse::internal_data(resp_code) { - self.response_code = None; - return response; - } - resp_code - } - }; - - if RPCErrorResponse::is_response(response_code) { - // decode an actual response - return self.inner.decode(src).map(|r| r.map(|resp| RPCErrorResponse::Success(resp))); - } - else { - // decode an error - return self.inner.decode_error(src).map(|r| r.map(|resp| RPCErrorResponse::from_error(response_code, resp))); - } - - } - -} - - - -/// SSZ Input stream -pub struct SSZInboundSink { - inner: - protocol: ProtocolId - -impl for SSZInputStream -where - TSocket: AsyncRead + AsyncWrite -{ - - /// Set up the initial input stream object. - pub fn new(incomming: TSocket, protocol: ProtocolId, max_size: usize) -> Self { - - // this type of stream should only apply to ssz protocols - debug_assert!(protocol.encoding.as_str() == "ssz"); - - let mut uvi_codec = UviBytes::default(); - uvi_codec.set_max_len(max_size); - - let inner = Framed::new(incomming, uvi_codec).from_err() - .with(|response| { - self.encode(response) - }) - .and_then(|bytes| { - self.decode(request) - }).into_future(); - - //TODO: add timeout - - SSZInputStream { - inner, - protocol - } - } - - /// Decodes an SSZ-encoded RPCRequest. - fn decode(&self, request: RPCRequest) { - - match self.protocol.message_name.as_str() { - "hello" => match protocol.version.as_str() { - "1.0.0" => Ok(RPCRequest::Hello(HelloMessage::from_ssz_bytes(&packet)?)), - _ => Err(RPCError::InvalidProtocol("Unknown HELLO version")), - }, - "goodbye" => match protocol.version.as_str() { - "1.0.0" => Ok(RPCRequest::Goodbye(Goodbye::from_ssz_bytes(&packet)?)), - _ => Err(RPCError::InvalidProtocol( - "Unknown GOODBYE version.as_str()", - )), - }, - "beacon_block_roots" => match protocol.version.as_str() { - "1.0.0" => Ok(RPCRequest::BeaconBlockRoots( - BeaconBlockRootsRequest::from_ssz_bytes(&packet)?, - )), - _ => Err(RPCError::InvalidProtocol( - "Unknown BEACON_BLOCK_ROOTS version.", - )), - }, - "beacon_block_headers" => match protocol.version.as_str() { - "1.0.0" => Ok(RPCRequest::BeaconBlockHeaders( - BeaconBlockHeadersRequest::from_ssz_bytes(&packet)?, - )), - _ => Err(RPCError::InvalidProtocol( - "Unknown BEACON_BLOCK_HEADERS version.", - )), - }, - "beacon_block_bodies" => match protocol.version.as_str() { - "1.0.0" => Ok(RPCRequest::BeaconBlockBodies( - BeaconBlockBodiesRequest::from_ssz_bytes(&packet)?, - )), - _ => Err(RPCError::InvalidProtocol( - "Unknown BEACON_BLOCK_BODIES version.", - )), - }, - "beacon_chain_state" => match protocol.version.as_str() { - "1.0.0" => Ok(RPCRequest::BeaconChainState( - BeaconChainStateRequest::from_ssz_bytes(&packet)?, - )), - _ => Err(RPCError::InvalidProtocol( - "Unknown BEACON_CHAIN_STATE version.", - )), - }, - } - } - - fn encode(&self, response: RPCResponse) { - - // TODO: Add error code - - match response { - RPCResponse::Hello(res) => res.as_ssz_bytes(), - RPCResponse::Goodbye => unreachable!(), - RPCResponse::BeaconBlockRoots(res) => res.as_ssz_bytes(), - RPCResponse::BeaconBlockHeaders(res) => res.headers, // already raw bytes - RPCResponse::BeaconBlockBodies(res) => res.block_bodies, // already raw bytes - RPCResponse::BeaconChainState(res) => res.as_ssz_bytes(), - } - } - -} - -type SSZInboundOutput = stream::AndThen>>, RPCError>, - RPCResponse, - fn(RPCResponse) -> Result, RPCError>, - Result, RPCError>, - >, - fn(BytesMut) -> Result, - Result - >; - -impl Sink for SSZInputStreamSink { - - type SinkItem = RPCResponse; - type SinkError = RPCError; - - fn start_send( - &mut self, - item: Self::SinkItem -) -> Result, Self::SinkError> { - self.inner.start_send(item) - } - - fn poll_complete(&mut self) -> Result, Self::SinkError> { - self.inner.poll_complete() - } -} - -/* Outbound specific stream */ - -// Implement our own decoder to handle the response byte - -struct SSZOutboundCodec - - - -pub struct SSZOutboundStreamSink { - inner: - protocol: ProtocolId - -impl for SSZOutboundStreamSink -where - TSocket: AsyncRead + AsyncWrite -{ - - /// Set up the initial outbound stream object. - pub fn new(socket: TSocket, protocol: ProtocolId, max_size: usize) -> Self { - - // this type of stream should only apply to ssz protocols - debug_assert!(protocol.encoding.as_str() == "ssz"); - - let mut uvi_codec = UviBytes::default(); - uvi_codec.set_max_len(max_size); - - let inner = Framed::new(socket, uvi_codec).from_err() - .with(|request| { - self.encode(request) - }) - .and_then(|bytes| { - self.decode(response) - }); - - SSZOutboundStream { - inner, - protocol - } - } - - - - - - /// Decodes a response that was received on the same stream as a request. The response type should - /// therefore match the request protocol type. - pub fn decode(&self, response: Vec, - protocol: ProtocolId, - response_code: ResponseCode, - ) -> Result { - match response_code { - ResponseCode::EncodingError => Ok(RPCResponse::Error("Encoding error".into())), - ResponseCode::InvalidRequest => { - let response = match protocol.encoding.as_str() { - "ssz" => ErrorResponse::from_ssz_bytes(&packet)?, - _ => return Err(RPCError::InvalidProtocol("Unknown Encoding")), - }; - Ok(RPCResponse::Error(format!( - "Invalid Request: {}", - response.error_message - ))) - } - ResponseCode::ServerError => { - let response = match protocol.encoding.as_str() { - "ssz" => ErrorResponse::from_ssz_bytes(&packet)?, - _ => return Err(RPCError::InvalidProtocol("Unknown Encoding")), - }; - Ok(RPCResponse::Error(format!( - "Remote Server Error: {}", - response.error_message - ))) - } - ResponseCode::Success => match protocol.message_name.as_str() { - "hello" => match protocol.version.as_str() { - "1.0.0" => match protocol.encoding.as_str() { - "ssz" => Ok(RPCResponse::Hello(HelloMessage::from_ssz_bytes(&packet)?)), - _ => Err(RPCError::InvalidProtocol("Unknown HELLO encoding")), - }, - _ => Err(RPCError::InvalidProtocol("Unknown HELLO version.")), - }, - "goodbye" => Err(RPCError::Custom( - "GOODBYE should not have a response".into(), - )), - "beacon_block_roots" => match protocol.version.as_str() { - "1.0.0" => match protocol.encoding.as_str() { - "ssz" => Ok(RPCResponse::BeaconBlockRoots( - BeaconBlockRootsResponse::from_ssz_bytes(&packet)?, - )), - _ => Err(RPCError::InvalidProtocol( - "Unknown BEACON_BLOCK_ROOTS encoding", - )), - }, - _ => Err(RPCError::InvalidProtocol( - "Unknown BEACON_BLOCK_ROOTS version.", - )), - }, - "beacon_block_headers" => match protocol.version.as_str() { - "1.0.0" => match protocol.encoding.as_str() { - "ssz" => Ok(RPCResponse::BeaconBlockHeaders( - BeaconBlockHeadersResponse { headers: packet }, - )), - _ => Err(RPCError::InvalidProtocol( - "Unknown BEACON_BLOCK_HEADERS encoding", - )), - }, - _ => Err(RPCError::InvalidProtocol( - "Unknown BEACON_BLOCK_HEADERS version.", - )), - }, - "beacon_block_bodies" => match protocol.version.as_str() { - "1.0.0" => match protocol.encoding.as_str() { - "ssz" => Ok(RPCResponse::BeaconBlockBodies(BeaconBlockBodiesResponse { - block_bodies: packet, - })), - _ => Err(RPCError::InvalidProtocol( - "Unknown BEACON_BLOCK_BODIES encoding", - )), - }, - _ => Err(RPCError::InvalidProtocol( - "Unknown BEACON_BLOCK_BODIES version.", - )), - }, - "beacon_chain_state" => match protocol.version.as_str() { - "1.0.0" => match protocol.encoding.as_str() { - "ssz" => Ok(RPCResponse::BeaconChainState( - BeaconChainStateResponse::from_ssz_bytes(&packet)?, - )), - _ => Err(RPCError::InvalidProtocol( - "Unknown BEACON_CHAIN_STATE encoding", - )), - }, - _ => Err(RPCError::InvalidProtocol( - "Unknown BEACON_CHAIN_STATE version.", - )), - }, - }, - } - } - - fn encode(&self, response: RPCResponse) { - - match response { - RPCResponse::Hello(res) => res.as_ssz_bytes(), - RPCResponse::Goodbye => unreachable!(), - RPCResponse::BeaconBlockRoots(res) => res.as_ssz_bytes(), - RPCResponse::BeaconBlockHeaders(res) => res.headers, // already raw bytes - RPCResponse::BeaconBlockBodies(res) => res.block_bodies, // already raw bytes - RPCResponse::BeaconChainState(res) => res.as_ssz_bytes(), - } - } - -} - -type SSZOutboundStream = stream::AndThen>>, RPCError>, - RPCResponse, - fn(RPCResponse) -> Result, RPCError>, - Result, RPCError>, - >, - fn(BytesMut) -> Result, - Result - >; - - -impl Stream for SSZInputStreamSink { - - type Item = SSZInboundOutput; - type Error = RPCError; - - fn poll(&mut self) -> Result>, Self::Error> { - self.inner.poll() - } -} - -impl Sink for SSZInputStreamSink { - - type SinkItem = RPCResponse; - type SinkError = RPCError; - - fn start_send( - &mut self, - item: Self::SinkItem -) -> Result, Self::SinkError> { - self.inner.start_send(item) - } - - fn poll_complete(&mut self) -> Result, Self::SinkError> { - self.inner.poll_complete() - } -} - - - - - - -