diff --git a/beacon_node/eth2_libp2p/src/rpc/codec/base.rs b/beacon_node/eth2_libp2p/src/rpc/codec/base.rs index 299c76a6b3..97e80ce8b7 100644 --- a/beacon_node/eth2_libp2p/src/rpc/codec/base.rs +++ b/beacon_node/eth2_libp2p/src/rpc/codec/base.rs @@ -174,7 +174,6 @@ where #[cfg(test)] mod tests { - use super::super::ssz::*; use super::super::ssz_snappy::*; use super::*; use crate::rpc::protocol::*; @@ -189,29 +188,22 @@ mod tests { let snappy_protocol_id = ProtocolId::new(Protocol::Status, Version::V1, Encoding::SSZSnappy); - let ssz_protocol_id = ProtocolId::new(Protocol::Status, Version::V1, Encoding::SSZ); let mut snappy_outbound_codec = SSZSnappyOutboundCodec::::new(snappy_protocol_id, 1_048_576); - let mut ssz_outbound_codec = SSZOutboundCodec::::new(ssz_protocol_id, 1_048_576); // decode message just as snappy message let snappy_decoded_message = snappy_outbound_codec.decode(&mut buf.clone()); // decode message just a ssz message - let ssz_decoded_message = ssz_outbound_codec.decode(&mut buf.clone()); // build codecs for entire chunk let mut snappy_base_outbound_codec = BaseOutboundCodec::new(snappy_outbound_codec); - let mut ssz_base_outbound_codec = BaseOutboundCodec::new(ssz_outbound_codec); // decode message as ssz snappy chunk let snappy_decoded_chunk = snappy_base_outbound_codec.decode(&mut buf.clone()); // decode message just a ssz chunk - let ssz_decoded_chunk = ssz_base_outbound_codec.decode(&mut buf.clone()); let _ = dbg!(snappy_decoded_message); - let _ = dbg!(ssz_decoded_message); let _ = dbg!(snappy_decoded_chunk); - let _ = dbg!(ssz_decoded_chunk); } } diff --git a/beacon_node/eth2_libp2p/src/rpc/codec/mod.rs b/beacon_node/eth2_libp2p/src/rpc/codec/mod.rs index c117f52feb..ae8e2abd15 100644 --- a/beacon_node/eth2_libp2p/src/rpc/codec/mod.rs +++ b/beacon_node/eth2_libp2p/src/rpc/codec/mod.rs @@ -1,9 +1,7 @@ pub(crate) mod base; -pub(crate) mod ssz; pub(crate) mod ssz_snappy; use self::base::{BaseInboundCodec, BaseOutboundCodec}; -use self::ssz::{SSZInboundCodec, SSZOutboundCodec}; use self::ssz_snappy::{SSZSnappyInboundCodec, SSZSnappyOutboundCodec}; use crate::rpc::protocol::RPCError; use crate::rpc::{RPCCodedResponse, RPCRequest}; @@ -14,12 +12,10 @@ use types::EthSpec; // Known types of codecs pub enum InboundCodec { SSZSnappy(BaseInboundCodec, TSpec>), - SSZ(BaseInboundCodec, TSpec>), } pub enum OutboundCodec { SSZSnappy(BaseOutboundCodec, TSpec>), - SSZ(BaseOutboundCodec, TSpec>), } impl Encoder> for InboundCodec { @@ -27,7 +23,6 @@ impl Encoder> for InboundCodec { fn encode(&mut self, item: RPCCodedResponse, dst: &mut BytesMut) -> Result<(), Self::Error> { match self { - InboundCodec::SSZ(codec) => codec.encode(item, dst), InboundCodec::SSZSnappy(codec) => codec.encode(item, dst), } } @@ -39,7 +34,6 @@ impl Decoder for InboundCodec { fn decode(&mut self, src: &mut BytesMut) -> Result, Self::Error> { match self { - InboundCodec::SSZ(codec) => codec.decode(src), InboundCodec::SSZSnappy(codec) => codec.decode(src), } } @@ -50,7 +44,6 @@ impl Encoder> for OutboundCodec { fn encode(&mut self, item: RPCRequest, dst: &mut BytesMut) -> Result<(), Self::Error> { match self { - OutboundCodec::SSZ(codec) => codec.encode(item, dst), OutboundCodec::SSZSnappy(codec) => codec.encode(item, dst), } } @@ -62,7 +55,6 @@ impl Decoder for OutboundCodec { fn decode(&mut self, src: &mut BytesMut) -> Result, Self::Error> { match self { - OutboundCodec::SSZ(codec) => codec.decode(src), OutboundCodec::SSZSnappy(codec) => codec.decode(src), } } diff --git a/beacon_node/eth2_libp2p/src/rpc/codec/ssz.rs b/beacon_node/eth2_libp2p/src/rpc/codec/ssz.rs deleted file mode 100644 index 2197048f1d..0000000000 --- a/beacon_node/eth2_libp2p/src/rpc/codec/ssz.rs +++ /dev/null @@ -1,324 +0,0 @@ -use crate::rpc::methods::*; -use crate::rpc::{ - codec::base::OutboundCodec, - protocol::{ - Encoding, Protocol, ProtocolId, RPCError, Version, BLOCKS_BY_ROOT_REQUEST_MAX, - BLOCKS_BY_ROOT_REQUEST_MIN, SIGNED_BEACON_BLOCK_MAX, SIGNED_BEACON_BLOCK_MIN, - }, -}; -use crate::rpc::{RPCCodedResponse, RPCRequest, RPCResponse}; -use libp2p::bytes::{BufMut, Bytes, BytesMut}; -use ssz::{Decode, Encode}; -use ssz_types::VariableList; -use std::marker::PhantomData; -use tokio_util::codec::{Decoder, Encoder}; -use types::{EthSpec, SignedBeaconBlock}; -use unsigned_varint::codec::UviBytes; - -/* Inbound Codec */ - -pub struct SSZInboundCodec { - inner: UviBytes, - protocol: ProtocolId, - phantom: PhantomData, -} - -impl SSZInboundCodec { - pub fn new(protocol: ProtocolId, max_packet_size: usize) -> Self { - let mut uvi_codec = UviBytes::default(); - uvi_codec.set_max_len(max_packet_size); - - // this encoding only applies to ssz. - debug_assert_eq!(protocol.encoding, Encoding::SSZ); - - SSZInboundCodec { - inner: uvi_codec, - protocol, - phantom: PhantomData, - } - } -} - -// Encoder for inbound streams: Encodes RPC Responses sent to peers. -impl Encoder> for SSZInboundCodec { - type Error = RPCError; - - fn encode( - &mut self, - item: RPCCodedResponse, - dst: &mut BytesMut, - ) -> Result<(), Self::Error> { - let bytes = match item { - RPCCodedResponse::Success(resp) => match resp { - RPCResponse::Status(res) => res.as_ssz_bytes(), - RPCResponse::BlocksByRange(res) => res.as_ssz_bytes(), - RPCResponse::BlocksByRoot(res) => res.as_ssz_bytes(), - RPCResponse::Pong(res) => res.data.as_ssz_bytes(), - RPCResponse::MetaData(res) => res.as_ssz_bytes(), - }, - RPCCodedResponse::Error(_, err) => err.as_ssz_bytes(), - RPCCodedResponse::StreamTermination(_) => { - unreachable!("Code error - attempting to encode a stream termination") - } - }; - if !bytes.is_empty() { - // length-prefix and return - return self - .inner - .encode(Bytes::from(bytes), dst) - .map_err(RPCError::from); - } else { - // payload is empty, add a 0-byte length prefix - dst.reserve(1); - dst.put_u8(0); - } - Ok(()) - } -} - -// Decoder for inbound streams: Decodes RPC requests from peers -impl Decoder for SSZInboundCodec { - type Item = RPCRequest; - type Error = RPCError; - - fn decode(&mut self, src: &mut BytesMut) -> Result, Self::Error> { - match self.inner.decode(src).map_err(RPCError::from) { - Ok(Some(packet)) => match self.protocol.message_name { - Protocol::Status => match self.protocol.version { - Version::V1 => { - if packet.len() == ::ssz_fixed_len() { - Ok(Some(RPCRequest::Status(StatusMessage::from_ssz_bytes( - &packet, - )?))) - } else { - Err(RPCError::InvalidData) - } - } - }, - Protocol::Goodbye => match self.protocol.version { - Version::V1 => { - if packet.len() == ::ssz_fixed_len() { - Ok(Some(RPCRequest::Goodbye(GoodbyeReason::from_ssz_bytes( - &packet, - )?))) - } else { - Err(RPCError::InvalidData) - } - } - }, - Protocol::BlocksByRange => match self.protocol.version { - Version::V1 => { - if packet.len() == ::ssz_fixed_len() { - Ok(Some(RPCRequest::BlocksByRange( - BlocksByRangeRequest::from_ssz_bytes(&packet)?, - ))) - } else { - Err(RPCError::InvalidData) - } - } - }, - Protocol::BlocksByRoot => match self.protocol.version { - Version::V1 => { - if packet.len() >= *BLOCKS_BY_ROOT_REQUEST_MIN - && packet.len() <= *BLOCKS_BY_ROOT_REQUEST_MAX - { - Ok(Some(RPCRequest::BlocksByRoot(BlocksByRootRequest { - block_roots: VariableList::from_ssz_bytes(&packet)?, - }))) - } else { - Err(RPCError::InvalidData) - } - } - }, - Protocol::Ping => match self.protocol.version { - Version::V1 => { - if packet.len() == ::ssz_fixed_len() { - Ok(Some(RPCRequest::Ping(Ping { - data: u64::from_ssz_bytes(&packet)?, - }))) - } else { - Err(RPCError::InvalidData) - } - } - }, - Protocol::MetaData => match self.protocol.version { - Version::V1 => { - if !packet.is_empty() { - Err(RPCError::InvalidData) - } else { - Ok(Some(RPCRequest::MetaData(PhantomData))) - } - } - }, - }, - Ok(None) => Ok(None), - Err(e) => Err(e), - } - } -} - -/* Outbound Codec: Codec for initiating RPC requests */ - -pub struct SSZOutboundCodec { - inner: UviBytes, - protocol: ProtocolId, - phantom: PhantomData, -} - -impl SSZOutboundCodec { - pub fn new(protocol: ProtocolId, max_packet_size: usize) -> Self { - let mut uvi_codec = UviBytes::default(); - uvi_codec.set_max_len(max_packet_size); - - // this encoding only applies to ssz. - debug_assert_eq!(protocol.encoding, Encoding::SSZ); - - SSZOutboundCodec { - inner: uvi_codec, - protocol, - phantom: PhantomData, - } - } -} - -// Encoder for outbound streams: Encodes RPC Requests to peers -impl Encoder> for SSZOutboundCodec { - type Error = RPCError; - - fn encode(&mut self, item: RPCRequest, dst: &mut BytesMut) -> Result<(), Self::Error> { - let bytes = match item { - RPCRequest::Status(req) => req.as_ssz_bytes(), - RPCRequest::Goodbye(req) => req.as_ssz_bytes(), - RPCRequest::BlocksByRange(req) => req.as_ssz_bytes(), - RPCRequest::BlocksByRoot(req) => req.block_roots.as_ssz_bytes(), - RPCRequest::Ping(req) => req.as_ssz_bytes(), - RPCRequest::MetaData(_) => return Ok(()), // no metadata to encode - }; - // length-prefix - self.inner - .encode(libp2p::bytes::Bytes::from(bytes), dst) - .map_err(RPCError::from) - } -} - -// Decoder for outbound streams: Decodes RPC responses from peers. -// -// The majority of the decoding has now been pushed upstream due to the changing specification. -// We prefer to decode blocks and attestations with extra knowledge about the chain to perform -// faster verification checks before decoding entire blocks/attestations. -impl Decoder for SSZOutboundCodec { - type Item = RPCResponse; - type Error = RPCError; - - fn decode(&mut self, src: &mut BytesMut) -> Result, Self::Error> { - if src.len() == 1 && src[0] == 0_u8 { - // the object is empty. We return the empty object if this is the case - // clear the buffer and return an empty object - src.clear(); - match self.protocol.message_name { - Protocol::Status => match self.protocol.version { - Version::V1 => Err(RPCError::IncompleteStream), // cannot have an empty HELLO message. The stream has terminated unexpectedly - }, - Protocol::Goodbye => Err(RPCError::InvalidData), - Protocol::BlocksByRange => match self.protocol.version { - Version::V1 => Err(RPCError::IncompleteStream), // cannot have an empty block message. - }, - Protocol::BlocksByRoot => match self.protocol.version { - Version::V1 => Err(RPCError::IncompleteStream), // cannot have an empty block message. - }, - Protocol::Ping => match self.protocol.version { - Version::V1 => Err(RPCError::IncompleteStream), // cannot have an empty block message. - }, - Protocol::MetaData => match self.protocol.version { - Version::V1 => Err(RPCError::IncompleteStream), // cannot have an empty block message. - }, - } - } else { - match self.inner.decode(src).map_err(RPCError::from) { - Ok(Some(mut packet)) => { - // take the bytes from the buffer - let raw_bytes = packet.split(); - - match self.protocol.message_name { - Protocol::Status => match self.protocol.version { - Version::V1 => { - if raw_bytes.len() == ::ssz_fixed_len() { - Ok(Some(RPCResponse::Status(StatusMessage::from_ssz_bytes( - &raw_bytes, - )?))) - } else { - Err(RPCError::InvalidData) - } - } - }, - Protocol::Goodbye => Err(RPCError::InvalidData), - Protocol::BlocksByRange => match self.protocol.version { - Version::V1 => { - if raw_bytes.len() >= *SIGNED_BEACON_BLOCK_MIN - && raw_bytes.len() <= *SIGNED_BEACON_BLOCK_MAX - { - Ok(Some(RPCResponse::BlocksByRange(Box::new( - SignedBeaconBlock::from_ssz_bytes(&raw_bytes)?, - )))) - } else { - Err(RPCError::InvalidData) - } - } - }, - Protocol::BlocksByRoot => match self.protocol.version { - Version::V1 => { - if raw_bytes.len() >= *SIGNED_BEACON_BLOCK_MIN - && raw_bytes.len() <= *SIGNED_BEACON_BLOCK_MAX - { - Ok(Some(RPCResponse::BlocksByRoot(Box::new( - SignedBeaconBlock::from_ssz_bytes(&raw_bytes)?, - )))) - } else { - Err(RPCError::InvalidData) - } - } - }, - Protocol::Ping => match self.protocol.version { - Version::V1 => { - if raw_bytes.len() == ::ssz_fixed_len() { - Ok(Some(RPCResponse::Pong(Ping { - data: u64::from_ssz_bytes(&raw_bytes)?, - }))) - } else { - Err(RPCError::InvalidData) - } - } - }, - Protocol::MetaData => match self.protocol.version { - Version::V1 => { - if raw_bytes.len() == as Encode>::ssz_fixed_len() { - Ok(Some(RPCResponse::MetaData(MetaData::from_ssz_bytes( - &raw_bytes, - )?))) - } else { - Err(RPCError::InvalidData) - } - } - }, - } - } - Ok(None) => Ok(None), // waiting for more bytes - Err(e) => Err(e), - } - } - } -} - -impl OutboundCodec> for SSZOutboundCodec { - type ErrorType = String; - - fn decode_error(&mut self, src: &mut BytesMut) -> Result, RPCError> { - match self.inner.decode(src).map_err(RPCError::from) { - Ok(Some(packet)) => Ok(Some( - String::from_utf8_lossy(&>::from_ssz_bytes(&packet)?).into(), - )), - Ok(None) => Ok(None), - Err(e) => Err(e), - } - } -} diff --git a/beacon_node/eth2_libp2p/src/rpc/protocol.rs b/beacon_node/eth2_libp2p/src/rpc/protocol.rs index 91bbdbd146..fbd5326a92 100644 --- a/beacon_node/eth2_libp2p/src/rpc/protocol.rs +++ b/beacon_node/eth2_libp2p/src/rpc/protocol.rs @@ -2,7 +2,6 @@ use super::methods::*; use crate::rpc::{ codec::{ base::{BaseInboundCodec, BaseOutboundCodec}, - ssz::{SSZInboundCodec, SSZOutboundCodec}, ssz_snappy::{SSZSnappyInboundCodec, SSZSnappyOutboundCodec}, InboundCodec, OutboundCodec, }, @@ -91,7 +90,6 @@ pub enum Version { /// RPC Encondings supported. #[derive(Debug, Clone, PartialEq, Eq)] pub enum Encoding { - SSZ, SSZSnappy, } @@ -112,7 +110,6 @@ impl std::fmt::Display for Protocol { impl std::fmt::Display for Encoding { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { let repr = match self { - Encoding::SSZ => "ssz", Encoding::SSZSnappy => "ssz_snappy", }; f.write_str(repr) @@ -141,17 +138,11 @@ impl UpgradeInfo for RPCProtocol { fn protocol_info(&self) -> Self::InfoIter { vec![ ProtocolId::new(Protocol::Status, Version::V1, Encoding::SSZSnappy), - ProtocolId::new(Protocol::Status, Version::V1, Encoding::SSZ), ProtocolId::new(Protocol::Goodbye, Version::V1, Encoding::SSZSnappy), - ProtocolId::new(Protocol::Goodbye, Version::V1, Encoding::SSZ), ProtocolId::new(Protocol::BlocksByRange, Version::V1, Encoding::SSZSnappy), - ProtocolId::new(Protocol::BlocksByRange, Version::V1, Encoding::SSZ), ProtocolId::new(Protocol::BlocksByRoot, Version::V1, Encoding::SSZSnappy), - ProtocolId::new(Protocol::BlocksByRoot, Version::V1, Encoding::SSZ), ProtocolId::new(Protocol::Ping, Version::V1, Encoding::SSZSnappy), - ProtocolId::new(Protocol::Ping, Version::V1, Encoding::SSZ), ProtocolId::new(Protocol::MetaData, Version::V1, Encoding::SSZSnappy), - ProtocolId::new(Protocol::MetaData, Version::V1, Encoding::SSZ), ] } } @@ -224,11 +215,6 @@ where BaseInboundCodec::new(SSZSnappyInboundCodec::new(protocol, MAX_RPC_SIZE)); InboundCodec::SSZSnappy(ssz_snappy_codec) } - Encoding::SSZ => { - let ssz_codec = - BaseInboundCodec::new(SSZInboundCodec::new(protocol, MAX_RPC_SIZE)); - InboundCodec::SSZ(ssz_codec) - } }; let mut timed_socket = TimeoutStream::new(socket); timed_socket.set_read_timeout(Some(Duration::from_secs(TTFB_TIMEOUT))); @@ -286,30 +272,36 @@ impl RPCRequest { pub fn supported_protocols(&self) -> Vec { match self { // add more protocols when versions/encodings are supported - RPCRequest::Status(_) => vec![ - ProtocolId::new(Protocol::Status, Version::V1, Encoding::SSZSnappy), - ProtocolId::new(Protocol::Status, Version::V1, Encoding::SSZ), - ], - RPCRequest::Goodbye(_) => vec![ - ProtocolId::new(Protocol::Goodbye, Version::V1, Encoding::SSZSnappy), - ProtocolId::new(Protocol::Goodbye, Version::V1, Encoding::SSZ), - ], - RPCRequest::BlocksByRange(_) => vec![ - ProtocolId::new(Protocol::BlocksByRange, Version::V1, Encoding::SSZSnappy), - ProtocolId::new(Protocol::BlocksByRange, Version::V1, Encoding::SSZ), - ], - RPCRequest::BlocksByRoot(_) => vec![ - ProtocolId::new(Protocol::BlocksByRoot, Version::V1, Encoding::SSZSnappy), - ProtocolId::new(Protocol::BlocksByRoot, Version::V1, Encoding::SSZ), - ], - RPCRequest::Ping(_) => vec![ - ProtocolId::new(Protocol::Ping, Version::V1, Encoding::SSZSnappy), - ProtocolId::new(Protocol::Ping, Version::V1, Encoding::SSZ), - ], - RPCRequest::MetaData(_) => vec![ - ProtocolId::new(Protocol::MetaData, Version::V1, Encoding::SSZSnappy), - ProtocolId::new(Protocol::MetaData, Version::V1, Encoding::SSZ), - ], + RPCRequest::Status(_) => vec![ProtocolId::new( + Protocol::Status, + Version::V1, + Encoding::SSZSnappy, + )], + RPCRequest::Goodbye(_) => vec![ProtocolId::new( + Protocol::Goodbye, + Version::V1, + Encoding::SSZSnappy, + )], + RPCRequest::BlocksByRange(_) => vec![ProtocolId::new( + Protocol::BlocksByRange, + Version::V1, + Encoding::SSZSnappy, + )], + RPCRequest::BlocksByRoot(_) => vec![ProtocolId::new( + Protocol::BlocksByRoot, + Version::V1, + Encoding::SSZSnappy, + )], + RPCRequest::Ping(_) => vec![ProtocolId::new( + Protocol::Ping, + Version::V1, + Encoding::SSZSnappy, + )], + RPCRequest::MetaData(_) => vec![ProtocolId::new( + Protocol::MetaData, + Version::V1, + Encoding::SSZSnappy, + )], } } @@ -379,11 +371,6 @@ where BaseOutboundCodec::new(SSZSnappyOutboundCodec::new(protocol, MAX_RPC_SIZE)); OutboundCodec::SSZSnappy(ssz_snappy_codec) } - Encoding::SSZ => { - let ssz_codec = - BaseOutboundCodec::new(SSZOutboundCodec::new(protocol, MAX_RPC_SIZE)); - OutboundCodec::SSZ(ssz_codec) - } }; let mut socket = Framed::new(socket, codec);