diff --git a/beacon_node/eth2-libp2p/src/rpc/codec/mod.rs b/beacon_node/eth2-libp2p/src/rpc/codec/mod.rs index c6644cf26c..f1b7f74daf 100644 --- a/beacon_node/eth2-libp2p/src/rpc/codec/mod.rs +++ b/beacon_node/eth2-libp2p/src/rpc/codec/mod.rs @@ -1,8 +1,10 @@ pub(crate) mod base; pub(crate) mod ssz; +pub(crate) mod ssz_snappy; use self::base::{BaseInboundCodec, BaseOutboundCodec}; use self::ssz::{SSZInboundCodec, SSZOutboundCodec}; +use self::ssz_snappy::{SSZSnappyInboundCodec, SSZSnappyOutboundCodec}; use crate::rpc::protocol::RPCError; use crate::rpc::{RPCErrorResponse, RPCRequest}; use libp2p::bytes::BytesMut; @@ -11,10 +13,12 @@ use types::EthSpec; // Known types of codecs pub enum InboundCodec { + SSZSnappy(BaseInboundCodec, TSpec>), SSZ(BaseInboundCodec, TSpec>), } pub enum OutboundCodec { + SSZSnappy(BaseOutboundCodec, TSpec>), SSZ(BaseOutboundCodec, TSpec>), } @@ -25,6 +29,7 @@ impl Encoder for InboundCodec { fn encode(&mut self, item: Self::Item, dst: &mut BytesMut) -> Result<(), Self::Error> { match self { InboundCodec::SSZ(codec) => codec.encode(item, dst), + InboundCodec::SSZSnappy(codec) => codec.encode(item, dst), } } } @@ -36,6 +41,7 @@ impl Decoder for InboundCodec { fn decode(&mut self, src: &mut BytesMut) -> Result, Self::Error> { match self { InboundCodec::SSZ(codec) => codec.decode(src), + InboundCodec::SSZSnappy(codec) => codec.decode(src), } } } @@ -47,6 +53,7 @@ impl Encoder for OutboundCodec { fn encode(&mut self, item: Self::Item, dst: &mut BytesMut) -> Result<(), Self::Error> { match self { OutboundCodec::SSZ(codec) => codec.encode(item, dst), + OutboundCodec::SSZSnappy(codec) => codec.encode(item, dst), } } } @@ -58,6 +65,7 @@ impl Decoder for OutboundCodec { fn decode(&mut self, src: &mut BytesMut) -> Result, Self::Error> { match self { OutboundCodec::SSZ(codec) => codec.decode(src), + OutboundCodec::SSZSnappy(codec) => codec.decode(src), } } } diff --git a/beacon_node/eth2-libp2p/src/rpc/codec/ssz_snappy.rs b/beacon_node/eth2-libp2p/src/rpc/codec/ssz_snappy.rs new file mode 100644 index 0000000000..c345a71451 --- /dev/null +++ b/beacon_node/eth2-libp2p/src/rpc/codec/ssz_snappy.rs @@ -0,0 +1,373 @@ +use crate::rpc::methods::*; +use crate::rpc::{ + codec::base::OutboundCodec, + protocol::{ + ProtocolId, RPCError, RPC_BLOCKS_BY_RANGE, RPC_BLOCKS_BY_ROOT, RPC_GOODBYE, RPC_META_DATA, + RPC_PING, RPC_STATUS, + }, +}; +use crate::rpc::{ErrorMessage, RPCErrorResponse, RPCRequest, RPCResponse}; +use libp2p::bytes::BytesMut; +use snap::read::FrameDecoder; +use snap::write::FrameEncoder; +use ssz::{Decode, Encode}; +use std::io::Cursor; +use std::io::ErrorKind; +use std::io::{Read, Write}; +use std::marker::PhantomData; +use tokio::codec::{Decoder, Encoder}; +use types::{EthSpec, SignedBeaconBlock}; +use unsigned_varint::codec::Uvi; + +/* Inbound Codec */ + +pub struct SSZSnappyInboundCodec { + protocol: ProtocolId, + inner: Uvi, + len: Option, + /// Maximum bytes that can be sent in one req/resp chunked responses. + max_packet_size: usize, + phantom: PhantomData, +} + +impl SSZSnappyInboundCodec { + pub fn new(protocol: ProtocolId, max_packet_size: usize) -> Self { + let uvi_codec = Uvi::default(); + // this encoding only applies to ssz_snappy. + debug_assert!(protocol.encoding.as_str() == "ssz_snappy"); + + SSZSnappyInboundCodec { + inner: uvi_codec, + protocol, + len: None, + phantom: PhantomData, + max_packet_size, + } + } +} + +// Encoder for inbound streams: Encodes RPC Responses sent to peers. +impl Encoder for SSZSnappyInboundCodec { + type Item = RPCErrorResponse; + type Error = RPCError; + + fn encode(&mut self, item: Self::Item, dst: &mut BytesMut) -> Result<(), Self::Error> { + let bytes = match item { + RPCErrorResponse::Success(resp) => match resp { + RPCResponse::Status(res) => res.as_ssz_bytes(), + RPCResponse::BlocksByRange(res) => res.as_ssz_bytes(), + RPCResponse::BlocksByRoot(res) => res.as_ssz_bytes(), + RPCResponse::Pong(res) => res.data.as_ssz_bytes(), + RPCResponse::MetaData(res) => res.as_ssz_bytes(), + }, + RPCErrorResponse::InvalidRequest(err) => err.as_ssz_bytes(), + RPCErrorResponse::ServerError(err) => err.as_ssz_bytes(), + RPCErrorResponse::Unknown(err) => err.as_ssz_bytes(), + RPCErrorResponse::StreamTermination(_) => { + unreachable!("Code error - attempting to encode a stream termination") + } + }; + // SSZ encoded bytes should be within `max_packet_size` + if bytes.len() > self.max_packet_size { + return Err(RPCError::Custom( + "attempting to encode data > max_packet_size".into(), + )); + } + // Inserts the length prefix of the uncompressed bytes into dst + // encoded as a unsigned varint + self.inner + .encode(bytes.len(), dst) + .map_err(RPCError::from)?; + + let mut writer = FrameEncoder::new(Vec::new()); + writer.write_all(&bytes).map_err(RPCError::from)?; + writer.flush().map_err(RPCError::from)?; + + // Write compressed bytes to `dst` + dst.extend_from_slice(writer.get_ref()); + Ok(()) + } +} + +// Decoder for inbound streams: Decodes RPC requests from peers +impl Decoder for SSZSnappyInboundCodec { + type Item = RPCRequest; + type Error = RPCError; + + fn decode(&mut self, src: &mut BytesMut) -> Result, Self::Error> { + if self.len.is_none() { + // Decode the length of the uncompressed bytes from an unsigned varint + match self.inner.decode(src).map_err(RPCError::from)? { + Some(length) => { + self.len = Some(length); + } + None => return Ok(None), // need more bytes to decode length + } + }; + + let length = self.len.expect("length should be Some"); + + // Should not attempt to decode rpc chunks with length > max_packet_size + if length > self.max_packet_size { + return Err(RPCError::Custom( + "attempting to decode data > max_packet_size".into(), + )); + } + let mut reader = FrameDecoder::new(Cursor::new(&src)); + let mut decoded_buffer = vec![0; length]; + + match reader.read_exact(&mut decoded_buffer) { + Ok(()) => { + // `n` is how many bytes the reader read in the compressed stream + let n = reader.get_ref().position(); + self.len = None; + src.split_to(n as usize); + match self.protocol.message_name.as_str() { + RPC_STATUS => match self.protocol.version.as_str() { + "1" => Ok(Some(RPCRequest::Status(StatusMessage::from_ssz_bytes( + &decoded_buffer, + )?))), + _ => unreachable!("Cannot negotiate an unknown version"), + }, + RPC_GOODBYE => match self.protocol.version.as_str() { + "1" => Ok(Some(RPCRequest::Goodbye(GoodbyeReason::from_ssz_bytes( + &decoded_buffer, + )?))), + _ => unreachable!("Cannot negotiate an unknown version"), + }, + RPC_BLOCKS_BY_RANGE => match self.protocol.version.as_str() { + "1" => Ok(Some(RPCRequest::BlocksByRange( + BlocksByRangeRequest::from_ssz_bytes(&decoded_buffer)?, + ))), + _ => unreachable!("Cannot negotiate an unknown version"), + }, + RPC_BLOCKS_BY_ROOT => match self.protocol.version.as_str() { + "1" => Ok(Some(RPCRequest::BlocksByRoot(BlocksByRootRequest { + block_roots: Vec::from_ssz_bytes(&decoded_buffer)?, + }))), + _ => unreachable!("Cannot negotiate an unknown version"), + }, + RPC_PING => match self.protocol.version.as_str() { + "1" => Ok(Some(RPCRequest::Ping(Ping::from_ssz_bytes( + &decoded_buffer, + )?))), + _ => unreachable!("Cannot negotiate an unknown version"), + }, + RPC_META_DATA => match self.protocol.version.as_str() { + "1" => { + if decoded_buffer.len() > 0 { + Err(RPCError::Custom( + "Get metadata request should be empty".into(), + )) + } else { + Ok(Some(RPCRequest::MetaData(PhantomData))) + } + } + _ => unreachable!("Cannot negotiate an unknown version"), + }, + _ => unreachable!("Cannot negotiate an unknown protocol"), + } + } + Err(e) => match e.kind() { + // Haven't received enough bytes to decode yet + // TODO: check if this is the only Error variant where we return `Ok(None)` + ErrorKind::UnexpectedEof => { + return Ok(None); + } + _ => return Err(e).map_err(RPCError::from), + }, + } + } +} + +/* Outbound Codec: Codec for initiating RPC requests */ +pub struct SSZSnappyOutboundCodec { + inner: Uvi, + len: Option, + protocol: ProtocolId, + /// Maximum bytes that can be sent in one req/resp chunked responses. + max_packet_size: usize, + phantom: PhantomData, +} + +impl SSZSnappyOutboundCodec { + pub fn new(protocol: ProtocolId, max_packet_size: usize) -> Self { + let uvi_codec = Uvi::default(); + // this encoding only applies to ssz_snappy. + debug_assert!(protocol.encoding.as_str() == "ssz_snappy"); + + SSZSnappyOutboundCodec { + inner: uvi_codec, + protocol, + max_packet_size, + len: None, + phantom: PhantomData, + } + } +} + +// Encoder for outbound streams: Encodes RPC Requests to peers +impl Encoder for SSZSnappyOutboundCodec { + type Item = RPCRequest; + type Error = RPCError; + + fn encode(&mut self, item: Self::Item, dst: &mut BytesMut) -> Result<(), Self::Error> { + let bytes = match item { + RPCRequest::Status(req) => req.as_ssz_bytes(), + RPCRequest::Goodbye(req) => req.as_ssz_bytes(), + RPCRequest::BlocksByRange(req) => req.as_ssz_bytes(), + RPCRequest::BlocksByRoot(req) => req.block_roots.as_ssz_bytes(), + RPCRequest::Ping(req) => req.as_ssz_bytes(), + RPCRequest::MetaData(_) => return Ok(()), // no metadata to encode + }; + // SSZ encoded bytes should be within `max_packet_size` + if bytes.len() > self.max_packet_size { + return Err(RPCError::Custom( + "attempting to encode data > max_packet_size".into(), + )); + } + + // Inserts the length prefix of the uncompressed bytes into dst + // encoded as a unsigned varint + self.inner + .encode(bytes.len(), dst) + .map_err(RPCError::from)?; + + let mut writer = FrameEncoder::new(Vec::new()); + writer.write_all(&bytes).map_err(RPCError::from)?; + writer.flush().map_err(RPCError::from)?; + + // Write compressed bytes to `dst` + dst.extend_from_slice(writer.get_ref()); + Ok(()) + } +} + +// Decoder for outbound streams: Decodes RPC responses from peers. +// +// The majority of the decoding has now been pushed upstream due to the changing specification. +// We prefer to decode blocks and attestations with extra knowledge about the chain to perform +// faster verification checks before decoding entire blocks/attestations. +impl Decoder for SSZSnappyOutboundCodec { + type Item = RPCResponse; + type Error = RPCError; + + fn decode(&mut self, src: &mut BytesMut) -> Result, Self::Error> { + if self.len.is_none() { + // Decode the length of the uncompressed bytes from an unsigned varint + match self.inner.decode(src).map_err(RPCError::from)? { + Some(length) => { + self.len = Some(length as usize); + } + None => return Ok(None), // need more bytes to decode length + } + }; + + let length = self.len.expect("length should be Some"); + + // Should not attempt to decode rpc chunks with length > max_packet_size + if length > self.max_packet_size { + return Err(RPCError::Custom( + "attempting to decode data > max_packet_size".into(), + )); + } + let mut reader = FrameDecoder::new(Cursor::new(&src)); + let mut decoded_buffer = vec![0; length]; + match reader.read_exact(&mut decoded_buffer) { + Ok(()) => { + // `n` is how many bytes the reader read in the compressed stream + let n = reader.get_ref().position(); + self.len = None; + src.split_to(n as usize); + match self.protocol.message_name.as_str() { + RPC_STATUS => match self.protocol.version.as_str() { + "1" => Ok(Some(RPCResponse::Status(StatusMessage::from_ssz_bytes( + &decoded_buffer, + )?))), + _ => unreachable!("Cannot negotiate an unknown version"), + }, + RPC_GOODBYE => { + Err(RPCError::InvalidProtocol("GOODBYE doesn't have a response")) + } + RPC_BLOCKS_BY_RANGE => match self.protocol.version.as_str() { + "1" => Ok(Some(RPCResponse::BlocksByRange(Box::new( + SignedBeaconBlock::from_ssz_bytes(&decoded_buffer)?, + )))), + _ => unreachable!("Cannot negotiate an unknown version"), + }, + RPC_BLOCKS_BY_ROOT => match self.protocol.version.as_str() { + "1" => Ok(Some(RPCResponse::BlocksByRoot(Box::new( + SignedBeaconBlock::from_ssz_bytes(&decoded_buffer)?, + )))), + _ => unreachable!("Cannot negotiate an unknown version"), + }, + RPC_PING => match self.protocol.version.as_str() { + "1" => Ok(Some(RPCResponse::Pong(Ping { + data: u64::from_ssz_bytes(&decoded_buffer)?, + }))), + _ => unreachable!("Cannot negotiate an unknown version"), + }, + RPC_META_DATA => match self.protocol.version.as_str() { + "1" => Ok(Some(RPCResponse::MetaData(MetaData::from_ssz_bytes( + &decoded_buffer, + )?))), + _ => unreachable!("Cannot negotiate an unknown version"), + }, + _ => unreachable!("Cannot negotiate an unknown protocol"), + } + } + Err(e) => match e.kind() { + // Haven't received enough bytes to decode yet + // TODO: check if this is the only Error variant where we return `Ok(None)` + ErrorKind::UnexpectedEof => { + return Ok(None); + } + _ => return Err(e).map_err(RPCError::from), + }, + } + } +} + +impl OutboundCodec for SSZSnappyOutboundCodec { + type ErrorType = ErrorMessage; + + fn decode_error(&mut self, src: &mut BytesMut) -> Result, RPCError> { + if self.len.is_none() { + // Decode the length of the uncompressed bytes from an unsigned varint + match self.inner.decode(src).map_err(RPCError::from)? { + Some(length) => { + self.len = Some(length as usize); + } + None => return Ok(None), // need more bytes to decode length + } + }; + + let length = self.len.expect("length should be Some"); + + // Should not attempt to decode rpc chunks with length > max_packet_size + if length > self.max_packet_size { + return Err(RPCError::Custom( + "attempting to decode data > max_packet_size".into(), + )); + } + let mut reader = FrameDecoder::new(Cursor::new(&src)); + let mut decoded_buffer = vec![0; length]; + match reader.read_exact(&mut decoded_buffer) { + Ok(()) => { + // `n` is how many bytes the reader read in the compressed stream + let n = reader.get_ref().position(); + self.len = None; + src.split_to(n as usize); + Ok(Some(ErrorMessage::from_ssz_bytes(&decoded_buffer)?)) + } + Err(e) => match e.kind() { + // Haven't received enough bytes to decode yet + // TODO: check if this is the only Error variant where we return `Ok(None)` + ErrorKind::UnexpectedEof => { + return Ok(None); + } + _ => return Err(e).map_err(RPCError::from), + }, + } + } +} diff --git a/beacon_node/eth2-libp2p/src/rpc/protocol.rs b/beacon_node/eth2-libp2p/src/rpc/protocol.rs index 0f93759fef..058ea78e75 100644 --- a/beacon_node/eth2-libp2p/src/rpc/protocol.rs +++ b/beacon_node/eth2-libp2p/src/rpc/protocol.rs @@ -5,6 +5,7 @@ use crate::rpc::{ codec::{ base::{BaseInboundCodec, BaseOutboundCodec}, ssz::{SSZInboundCodec, SSZOutboundCodec}, + ssz_snappy::{SSZSnappyInboundCodec, SSZSnappyOutboundCodec}, InboundCodec, OutboundCodec, }, methods::ResponseTermination, @@ -58,11 +59,17 @@ impl UpgradeInfo for RPCProtocol { /// The list of supported RPC protocols for Lighthouse. fn protocol_info(&self) -> Self::InfoIter { vec![ + ProtocolId::new(RPC_STATUS, "1", "ssz_snappy"), ProtocolId::new(RPC_STATUS, "1", "ssz"), + ProtocolId::new(RPC_GOODBYE, "1", "ssz_snappy"), ProtocolId::new(RPC_GOODBYE, "1", "ssz"), + ProtocolId::new(RPC_BLOCKS_BY_RANGE, "1", "ssz_snappy"), ProtocolId::new(RPC_BLOCKS_BY_RANGE, "1", "ssz"), + ProtocolId::new(RPC_BLOCKS_BY_ROOT, "1", "ssz_snappy"), ProtocolId::new(RPC_BLOCKS_BY_ROOT, "1", "ssz"), + ProtocolId::new(RPC_PING, "1", "ssz_snappy"), ProtocolId::new(RPC_PING, "1", "ssz"), + ProtocolId::new(RPC_META_DATA, "1", "ssz_snappy"), ProtocolId::new(RPC_META_DATA, "1", "ssz"), ] } @@ -146,39 +153,44 @@ where socket: upgrade::Negotiated, protocol: ProtocolId, ) -> Self::Future { - match protocol.encoding.as_str() { - "ssz" | _ => { - let protocol_name = protocol.message_name.clone(); - let ssz_codec = BaseInboundCodec::new(SSZInboundCodec::new(protocol, MAX_RPC_SIZE)); - let codec = InboundCodec::SSZ(ssz_codec); - let mut timed_socket = TimeoutStream::new(socket); - timed_socket.set_read_timeout(Some(Duration::from_secs(TTFB_TIMEOUT))); - - let socket = Framed::new(timed_socket, codec); - - // MetaData requests should be empty, return the stream - if protocol_name == RPC_META_DATA { - futures::future::Either::A(futures::future::ok(( - RPCRequest::MetaData(PhantomData), - socket, - ))) - } else { - futures::future::Either::B( - socket - .into_future() - .timeout(Duration::from_secs(REQUEST_TIMEOUT)) - .map_err(RPCError::from as FnMapErr) - .and_then({ - |(req, stream)| match req { - Some(request) => futures::future::ok((request, stream)), - None => futures::future::err(RPCError::Custom( - "Stream terminated early".into(), - )), - } - } as FnAndThen), - ) - } + let protocol_name = protocol.message_name.clone(); + let codec = match protocol.encoding.as_str() { + "ssz_snappy" => { + let ssz_snappy_codec = + BaseInboundCodec::new(SSZSnappyInboundCodec::new(protocol, MAX_RPC_SIZE)); + InboundCodec::SSZSnappy(ssz_snappy_codec) } + "ssz" | _ => { + let ssz_codec = BaseInboundCodec::new(SSZInboundCodec::new(protocol, MAX_RPC_SIZE)); + InboundCodec::SSZ(ssz_codec) + } + }; + let mut timed_socket = TimeoutStream::new(socket); + timed_socket.set_read_timeout(Some(Duration::from_secs(TTFB_TIMEOUT))); + + let socket = Framed::new(timed_socket, codec); + + // MetaData requests should be empty, return the stream + if protocol_name == RPC_META_DATA { + futures::future::Either::A(futures::future::ok(( + RPCRequest::MetaData(PhantomData), + socket, + ))) + } else { + futures::future::Either::B( + socket + .into_future() + .timeout(Duration::from_secs(REQUEST_TIMEOUT)) + .map_err(RPCError::from as FnMapErr) + .and_then({ + |(req, stream)| match req { + Some(request) => futures::future::ok((request, stream)), + None => futures::future::err(RPCError::Custom( + "Stream terminated early".into(), + )), + } + } as FnAndThen), + ) } } } @@ -213,12 +225,30 @@ impl RPCRequest { pub fn supported_protocols(&self) -> Vec { match self { // add more protocols when versions/encodings are supported - RPCRequest::Status(_) => vec![ProtocolId::new(RPC_STATUS, "1", "ssz")], - RPCRequest::Goodbye(_) => vec![ProtocolId::new(RPC_GOODBYE, "1", "ssz")], - RPCRequest::BlocksByRange(_) => vec![ProtocolId::new(RPC_BLOCKS_BY_RANGE, "1", "ssz")], - RPCRequest::BlocksByRoot(_) => vec![ProtocolId::new(RPC_BLOCKS_BY_ROOT, "1", "ssz")], - RPCRequest::Ping(_) => vec![ProtocolId::new(RPC_PING, "1", "ssz")], - RPCRequest::MetaData(_) => vec![ProtocolId::new(RPC_META_DATA, "1", "ssz")], + RPCRequest::Status(_) => vec![ + ProtocolId::new(RPC_STATUS, "1", "ssz_snappy"), + ProtocolId::new(RPC_STATUS, "1", "ssz"), + ], + RPCRequest::Goodbye(_) => vec![ + ProtocolId::new(RPC_GOODBYE, "1", "ssz_snappy"), + ProtocolId::new(RPC_GOODBYE, "1", "ssz"), + ], + RPCRequest::BlocksByRange(_) => vec![ + ProtocolId::new(RPC_BLOCKS_BY_RANGE, "1", "ssz_snappy"), + ProtocolId::new(RPC_BLOCKS_BY_RANGE, "1", "ssz"), + ], + RPCRequest::BlocksByRoot(_) => vec![ + ProtocolId::new(RPC_BLOCKS_BY_ROOT, "1", "ssz_snappy"), + ProtocolId::new(RPC_BLOCKS_BY_ROOT, "1", "ssz"), + ], + RPCRequest::Ping(_) => vec![ + ProtocolId::new(RPC_PING, "1", "ssz_snappy"), + ProtocolId::new(RPC_PING, "1", "ssz"), + ], + RPCRequest::MetaData(_) => vec![ + ProtocolId::new(RPC_META_DATA, "1", "ssz_snappy"), + ProtocolId::new(RPC_META_DATA, "1", "ssz"), + ], } } @@ -286,14 +316,19 @@ where socket: upgrade::Negotiated, protocol: Self::Info, ) -> Self::Future { - match protocol.encoding.as_str() { + let codec = match protocol.encoding.as_str() { + "ssz_snappy" => { + let ssz_snappy_codec = + BaseOutboundCodec::new(SSZSnappyOutboundCodec::new(protocol, MAX_RPC_SIZE)); + OutboundCodec::SSZSnappy(ssz_snappy_codec) + } "ssz" | _ => { let ssz_codec = BaseOutboundCodec::new(SSZOutboundCodec::new(protocol, MAX_RPC_SIZE)); - let codec = OutboundCodec::SSZ(ssz_codec); - Framed::new(socket, codec).send(self) + OutboundCodec::SSZ(ssz_codec) } - } + }; + Framed::new(socket, codec).send(self) } } @@ -304,6 +339,8 @@ pub enum RPCError { ReadError(upgrade::ReadOneError), /// Error when decoding the raw buffer from ssz. SSZDecodeError(ssz::DecodeError), + /// Snappy error + SnappyError(snap::Error), /// Invalid Protocol ID. InvalidProtocol(&'static str), /// IO Error. @@ -351,6 +388,12 @@ impl From for RPCError { } } +impl From for RPCError { + fn from(err: snap::Error) -> Self { + RPCError::SnappyError(err) + } +} + // Error trait is required for `ProtocolsHandler` impl std::fmt::Display for RPCError { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { @@ -361,6 +404,7 @@ impl std::fmt::Display for RPCError { RPCError::IoError(ref err) => write!(f, "IO Error: {}", err), RPCError::RPCErrorResponse => write!(f, "RPC Response Error"), RPCError::StreamTimeout => write!(f, "Stream Timeout"), + RPCError::SnappyError(ref err) => write!(f, "Snappy error: {}", err), RPCError::Custom(ref err) => write!(f, "{}", err), } } @@ -371,6 +415,7 @@ impl std::error::Error for RPCError { match *self { RPCError::ReadError(ref err) => Some(err), RPCError::SSZDecodeError(_) => None, + RPCError::SnappyError(ref err) => Some(err), RPCError::InvalidProtocol(_) => None, RPCError::IoError(ref err) => Some(err), RPCError::StreamTimeout => None, diff --git a/beacon_node/eth2-libp2p/tests/rpc_tests.rs b/beacon_node/eth2-libp2p/tests/rpc_tests.rs index ddf31a37e4..016e3ab1d4 100644 --- a/beacon_node/eth2-libp2p/tests/rpc_tests.rs +++ b/beacon_node/eth2-libp2p/tests/rpc_tests.rs @@ -395,6 +395,140 @@ fn test_blocks_by_range_single_empty_rpc() { assert!(test_result.load(Relaxed)); } +#[test] +// Tests a streamed, chunked BlocksByRoot RPC Message +// The size of the reponse is a full `BeaconBlock` +// which is greater than the Snappy frame size. Hence, this test +// serves to test the snappy framing format as well. +fn test_blocks_by_root_chunked_rpc() { + // set up the logging. The level and enabled logging or not + let log_level = Level::Trace; + let enable_logging = false; + + let messages_to_send = 3; + + let log = common::build_log(log_level, enable_logging); + let spec = E::default_spec(); + + // get sender/receiver + let (mut sender, mut receiver) = common::build_node_pair(&log, 10515); + + // BlocksByRoot Request + let rpc_request = RPCRequest::BlocksByRoot(BlocksByRootRequest { + block_roots: vec![Hash256::from_low_u64_be(0), Hash256::from_low_u64_be(0)], + }); + + // BlocksByRoot Response + let full_block = BeaconBlock::full(&spec); + let signed_full_block = SignedBeaconBlock { + message: full_block, + signature: Signature::empty_signature(), + }; + let rpc_response = RPCResponse::BlocksByRoot(Box::new(signed_full_block)); + + let sender_request = rpc_request.clone(); + let sender_log = log.clone(); + let sender_response = rpc_response.clone(); + + // keep count of the number of messages received + let messages_received = Arc::new(Mutex::new(0)); + // build the sender future + let sender_future = future::poll_fn(move || -> Poll { + loop { + match sender.poll().unwrap() { + Async::Ready(Some(BehaviourEvent::PeerDialed(peer_id))) => { + // Send a BlocksByRoot request + warn!(sender_log, "Sender sending RPC request"); + sender + .swarm + .send_rpc(peer_id, RPCEvent::Request(1, sender_request.clone())); + } + Async::Ready(Some(BehaviourEvent::RPC(_, event))) => match event { + // Should receive the RPC response + RPCEvent::Response(id, response) => { + warn!(sender_log, "Sender received a response"); + assert_eq!(id, 1); + match response { + RPCErrorResponse::Success(res) => { + assert_eq!(res, sender_response.clone()); + *messages_received.lock().unwrap() += 1; + warn!(sender_log, "Chunk received"); + } + RPCErrorResponse::StreamTermination( + ResponseTermination::BlocksByRoot, + ) => { + // should be exactly 10 messages before terminating + assert_eq!(*messages_received.lock().unwrap(), messages_to_send); + // end the test + return Ok(Async::Ready(true)); + } + m => panic!("Invalid RPC received: {}", m), + } + } + m => panic!("Received invalid RPC message: {}", m), + }, + Async::Ready(Some(_)) => {} + Async::Ready(None) | Async::NotReady => return Ok(Async::NotReady), + }; + } + }); + + // build the receiver future + let receiver_future = future::poll_fn(move || -> Poll { + loop { + match receiver.poll().unwrap() { + Async::Ready(Some(BehaviourEvent::RPC(peer_id, event))) => match event { + // Should receive the sent RPC request + RPCEvent::Request(id, request) => { + if request == rpc_request { + // send the response + warn!(log, "Receiver got request"); + + for _ in 1..=messages_to_send { + receiver.swarm.send_rpc( + peer_id.clone(), + RPCEvent::Response( + id, + RPCErrorResponse::Success(rpc_response.clone()), + ), + ); + } + // send the stream termination + receiver.swarm.send_rpc( + peer_id, + RPCEvent::Response( + id, + RPCErrorResponse::StreamTermination( + ResponseTermination::BlocksByRange, + ), + ), + ); + } + } + _ => panic!("Received invalid RPC message"), + }, + Async::Ready(Some(_)) => (), + Async::Ready(None) | Async::NotReady => return Ok(Async::NotReady), + } + } + }); + + // execute the futures and check the result + let test_result = Arc::new(AtomicBool::new(false)); + let error_result = test_result.clone(); + let thread_result = test_result.clone(); + tokio::run( + sender_future + .select(receiver_future) + .timeout(Duration::from_millis(1000)) + .map_err(move |_| error_result.store(false, Relaxed)) + .map(move |result| { + thread_result.store(result.0, Relaxed); + }), + ); + assert!(test_result.load(Relaxed)); +} + #[test] // Tests a Goodbye RPC message fn test_goodbye_rpc() { diff --git a/eth2/types/src/attestation_data.rs b/eth2/types/src/attestation_data.rs index fc28732de2..15d9920f70 100644 --- a/eth2/types/src/attestation_data.rs +++ b/eth2/types/src/attestation_data.rs @@ -10,7 +10,18 @@ use tree_hash_derive::TreeHash; /// /// Spec v0.11.1 #[derive( - Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Hash, Encode, Decode, TreeHash, TestRandom, + Debug, + Clone, + PartialEq, + Eq, + Serialize, + Deserialize, + Hash, + Encode, + Decode, + TreeHash, + TestRandom, + Default, )] pub struct AttestationData { pub slot: Slot, diff --git a/eth2/types/src/beacon_block.rs b/eth2/types/src/beacon_block.rs index 5c8efa85db..e2fed91439 100644 --- a/eth2/types/src/beacon_block.rs +++ b/eth2/types/src/beacon_block.rs @@ -50,6 +50,100 @@ impl BeaconBlock { } } + /// Return a block where the block has the max possible operations. + pub fn full(spec: &ChainSpec) -> BeaconBlock { + let header = BeaconBlockHeader { + slot: Slot::new(1), + proposer_index: 0, + parent_root: Hash256::zero(), + state_root: Hash256::zero(), + body_root: Hash256::zero(), + }; + + let signed_header = SignedBeaconBlockHeader { + message: header, + signature: Signature::empty_signature(), + }; + let indexed_attestation: IndexedAttestation = IndexedAttestation { + attesting_indices: VariableList::new(vec![ + 0 as u64; + T::MaxValidatorsPerCommittee::to_usize() + ]) + .unwrap(), + data: AttestationData::default(), + signature: AggregateSignature::new(), + }; + + let deposit_data = DepositData { + pubkey: PublicKeyBytes::empty(), + withdrawal_credentials: Hash256::zero(), + amount: 0, + signature: SignatureBytes::empty(), + }; + let proposer_slashing = ProposerSlashing { + signed_header_1: signed_header.clone(), + signed_header_2: signed_header.clone(), + }; + + let attester_slashing = AttesterSlashing { + attestation_1: indexed_attestation.clone(), + attestation_2: indexed_attestation.clone(), + }; + + let attestation: Attestation = Attestation { + aggregation_bits: BitList::with_capacity(T::MaxValidatorsPerCommittee::to_usize()) + .unwrap(), + data: AttestationData::default(), + signature: AggregateSignature::new(), + }; + + let deposit = Deposit { + proof: FixedVector::from_elem(Hash256::zero()), + data: deposit_data, + }; + + let voluntary_exit = VoluntaryExit { + epoch: Epoch::new(1), + validator_index: 1, + }; + + let signed_voluntary_exit = SignedVoluntaryExit { + message: voluntary_exit, + signature: Signature::empty_signature(), + }; + + let mut block: BeaconBlock = BeaconBlock::empty(spec); + for _ in 0..T::MaxProposerSlashings::to_usize() { + block + .body + .proposer_slashings + .push(proposer_slashing.clone()) + .unwrap(); + } + for _ in 0..T::MaxDeposits::to_usize() { + block.body.deposits.push(deposit.clone()).unwrap(); + } + for _ in 0..T::MaxVoluntaryExits::to_usize() { + block + .body + .voluntary_exits + .push(signed_voluntary_exit.clone()) + .unwrap(); + } + for _ in 0..T::MaxAttesterSlashings::to_usize() { + block + .body + .attester_slashings + .push(attester_slashing.clone()) + .unwrap(); + } + + for _ in 0..T::MaxAttestations::to_usize() { + block.body.attestations.push(attestation.clone()).unwrap(); + } + block + } + /// Returns the epoch corresponding to `self.slot`. pub fn epoch(&self) -> Epoch { self.slot.epoch(T::slots_per_epoch())