diff --git a/beacon_node/lighthouse_network/src/rpc/codec/ssz_snappy.rs b/beacon_node/lighthouse_network/src/rpc/codec.rs similarity index 91% rename from beacon_node/lighthouse_network/src/rpc/codec/ssz_snappy.rs rename to beacon_node/lighthouse_network/src/rpc/codec.rs index 8f5143d7ed..224fb8a5f7 100644 --- a/beacon_node/lighthouse_network/src/rpc/codec/ssz_snappy.rs +++ b/beacon_node/lighthouse_network/src/rpc/codec.rs @@ -1,9 +1,9 @@ use crate::rpc::methods::*; -use crate::rpc::{ - codec::base::OutboundCodec, - protocol::{Encoding, ProtocolId, RPCError, SupportedProtocol, ERROR_TYPE_MAX, ERROR_TYPE_MIN}, +use crate::rpc::protocol::{ + Encoding, ProtocolId, RPCError, SupportedProtocol, ERROR_TYPE_MAX, ERROR_TYPE_MIN, }; use crate::rpc::{InboundRequest, OutboundRequest}; +use libp2p::bytes::BufMut; use libp2p::bytes::BytesMut; use snap::read::FrameDecoder; use snap::write::FrameEncoder; @@ -57,13 +57,13 @@ impl SSZSnappyInboundCodec { max_packet_size, } } -} -// Encoder for inbound streams: Encodes RPC Responses sent to peers. -impl Encoder> for SSZSnappyInboundCodec { - type Error = RPCError; - - fn encode(&mut self, item: RPCCodedResponse, dst: &mut BytesMut) -> Result<(), Self::Error> { + /// Encodes RPC Responses sent to peers. + fn encode_response( + &mut self, + item: RPCCodedResponse, + dst: &mut BytesMut, + ) -> Result<(), RPCError> { let bytes = match &item { RPCCodedResponse::Success(resp) => match &resp { RPCResponse::Status(res) => res.as_ssz_bytes(), @@ -125,6 +125,21 @@ impl Encoder> for SSZSnappyInboundCodec { } } +// Encoder for inbound streams: Encodes RPC Responses sent to peers. +impl Encoder> for SSZSnappyInboundCodec { + type Error = RPCError; + + fn encode(&mut self, item: RPCCodedResponse, dst: &mut BytesMut) -> Result<(), Self::Error> { + dst.clear(); + dst.reserve(1); + dst.put_u8( + item.as_u8() + .expect("Should never encode a stream termination"), + ); + self.encode_response(item, dst) + } +} + // Decoder for inbound streams: Decodes RPC requests from peers impl Decoder for SSZSnappyInboundCodec { type Item = InboundRequest; @@ -188,6 +203,8 @@ pub struct SSZSnappyOutboundCodec { /// The fork name corresponding to the received context bytes. fork_name: Option, fork_context: Arc, + /// Keeps track of the current response code for a chunk. + current_response_code: Option, phantom: PhantomData, } @@ -209,6 +226,93 @@ impl SSZSnappyOutboundCodec { fork_name: None, fork_context, phantom: PhantomData, + current_response_code: None, + } + } + + // Decode an Rpc response. + fn decode_response(&mut self, src: &mut BytesMut) -> Result>, RPCError> { + // Read the context bytes if required + if self.protocol.has_context_bytes() && self.fork_name.is_none() { + if src.len() >= CONTEXT_BYTES_LEN { + let context_bytes = src.split_to(CONTEXT_BYTES_LEN); + let mut result = [0; CONTEXT_BYTES_LEN]; + result.copy_from_slice(context_bytes.as_ref()); + self.fork_name = Some(context_bytes_to_fork_name( + result, + self.fork_context.clone(), + )?); + } else { + return Ok(None); + } + } + let Some(length) = handle_length(&mut self.inner, &mut self.len, src)? else { + return Ok(None); + }; + + // Should not attempt to decode rpc chunks with `length > max_packet_size` or not within bounds of + // packet size for ssz container corresponding to `self.protocol`. + let ssz_limits = self.protocol.rpc_response_limits::(&self.fork_context); + if ssz_limits.is_out_of_bounds(length, self.max_packet_size) { + return Err(RPCError::InvalidData(format!( + "RPC response length is out of bounds, length {}, max {}, min {}", + length, ssz_limits.max, ssz_limits.min + ))); + } + // Calculate worst case compression length for given uncompressed length + let max_compressed_len = snap::raw::max_compress_len(length) as u64; + // Create a limit reader as a wrapper that reads only upto `max_compressed_len` from `src`. + let limit_reader = Cursor::new(src.as_ref()).take(max_compressed_len); + let mut reader = FrameDecoder::new(limit_reader); + + let mut decoded_buffer = vec![0; length]; + + match reader.read_exact(&mut decoded_buffer) { + Ok(()) => { + // `n` is how many bytes the reader read in the compressed stream + let n = reader.get_ref().get_ref().position(); + self.len = None; + let _read_bytes = src.split_to(n as usize); + // Safe to `take` from `self.fork_name` as we have all the bytes we need to + // decode an ssz object at this point. + let fork_name = self.fork_name.take(); + handle_rpc_response(self.protocol.versioned_protocol, &decoded_buffer, fork_name) + } + Err(e) => handle_error(e, reader.get_ref().get_ref().position(), max_compressed_len), + } + } + + fn decode_error(&mut self, src: &mut BytesMut) -> Result, RPCError> { + let Some(length) = handle_length(&mut self.inner, &mut self.len, src)? else { + return Ok(None); + }; + + // Should not attempt to decode rpc chunks with `length > max_packet_size` or not within bounds of + // packet size for ssz container corresponding to `ErrorType`. + if length > self.max_packet_size || length > *ERROR_TYPE_MAX || length < *ERROR_TYPE_MIN { + return Err(RPCError::InvalidData(format!( + "RPC Error length is out of bounds, length {}", + length + ))); + } + + // Calculate worst case compression length for given uncompressed length + let max_compressed_len = snap::raw::max_compress_len(length) as u64; + // Create a limit reader as a wrapper that reads only upto `max_compressed_len` from `src`. + let limit_reader = Cursor::new(src.as_ref()).take(max_compressed_len); + let mut reader = FrameDecoder::new(limit_reader); + let mut decoded_buffer = vec![0; length]; + match reader.read_exact(&mut decoded_buffer) { + Ok(()) => { + // `n` is how many bytes the reader read in the compressed stream + let n = reader.get_ref().get_ref().position(); + self.len = None; + let _read_bytes = src.split_to(n as usize); + Ok(Some(ErrorType(VariableList::from_ssz_bytes( + &decoded_buffer, + )?))) + } + Err(e) => handle_error(e, reader.get_ref().get_ref().position(), max_compressed_len), } } } @@ -265,99 +369,40 @@ impl Encoder> for SSZSnappyOutboundCodec { // We prefer to decode blocks and attestations with extra knowledge about the chain to perform // faster verification checks before decoding entire blocks/attestations. impl Decoder for SSZSnappyOutboundCodec { - type Item = RPCResponse; + type Item = RPCCodedResponse; type Error = RPCError; fn decode(&mut self, src: &mut BytesMut) -> Result, Self::Error> { - // Read the context bytes if required - if self.protocol.has_context_bytes() && self.fork_name.is_none() { - if src.len() >= CONTEXT_BYTES_LEN { - let context_bytes = src.split_to(CONTEXT_BYTES_LEN); - let mut result = [0; CONTEXT_BYTES_LEN]; - result.copy_from_slice(context_bytes.as_ref()); - self.fork_name = Some(context_bytes_to_fork_name( - result, - self.fork_context.clone(), - )?); + // if we have only received the response code, wait for more bytes + if src.len() <= 1 { + return Ok(None); + } + // using the response code determine which kind of payload needs to be decoded. + let response_code = self.current_response_code.unwrap_or_else(|| { + let resp_code = src.split_to(1)[0]; + self.current_response_code = Some(resp_code); + resp_code + }); + + let inner_result = { + if RPCCodedResponse::::is_response(response_code) { + // decode an actual response and mutates the buffer if enough bytes have been read + // returning the result. + self.decode_response(src) + .map(|r| r.map(RPCCodedResponse::Success)) } else { - return Ok(None); + // decode an error + self.decode_error(src) + .map(|r| r.map(|resp| RPCCodedResponse::from_error(response_code, resp))) } - } - let Some(length) = handle_length(&mut self.inner, &mut self.len, src)? else { - return Ok(None); }; - - // Should not attempt to decode rpc chunks with `length > max_packet_size` or not within bounds of - // packet size for ssz container corresponding to `self.protocol`. - let ssz_limits = self.protocol.rpc_response_limits::(&self.fork_context); - if ssz_limits.is_out_of_bounds(length, self.max_packet_size) { - return Err(RPCError::InvalidData(format!( - "RPC response length is out of bounds, length {}, max {}, min {}", - length, ssz_limits.max, ssz_limits.min - ))); - } - // Calculate worst case compression length for given uncompressed length - let max_compressed_len = snap::raw::max_compress_len(length) as u64; - // Create a limit reader as a wrapper that reads only upto `max_compressed_len` from `src`. - let limit_reader = Cursor::new(src.as_ref()).take(max_compressed_len); - let mut reader = FrameDecoder::new(limit_reader); - - let mut decoded_buffer = vec![0; length]; - - match reader.read_exact(&mut decoded_buffer) { - Ok(()) => { - // `n` is how many bytes the reader read in the compressed stream - let n = reader.get_ref().get_ref().position(); - self.len = None; - let _read_bytes = src.split_to(n as usize); - // Safe to `take` from `self.fork_name` as we have all the bytes we need to - // decode an ssz object at this point. - let fork_name = self.fork_name.take(); - handle_rpc_response(self.protocol.versioned_protocol, &decoded_buffer, fork_name) - } - Err(e) => handle_error(e, reader.get_ref().get_ref().position(), max_compressed_len), - } - } -} - -impl OutboundCodec> for SSZSnappyOutboundCodec { - type CodecErrorType = ErrorType; - - fn decode_error( - &mut self, - src: &mut BytesMut, - ) -> Result, RPCError> { - let Some(length) = handle_length(&mut self.inner, &mut self.len, src)? else { - return Ok(None); - }; - - // Should not attempt to decode rpc chunks with `length > max_packet_size` or not within bounds of - // packet size for ssz container corresponding to `ErrorType`. - if length > self.max_packet_size || length > *ERROR_TYPE_MAX || length < *ERROR_TYPE_MIN { - return Err(RPCError::InvalidData(format!( - "RPC Error length is out of bounds, length {}", - length - ))); - } - - // Calculate worst case compression length for given uncompressed length - let max_compressed_len = snap::raw::max_compress_len(length) as u64; - // Create a limit reader as a wrapper that reads only upto `max_compressed_len` from `src`. - let limit_reader = Cursor::new(src.as_ref()).take(max_compressed_len); - let mut reader = FrameDecoder::new(limit_reader); - let mut decoded_buffer = vec![0; length]; - match reader.read_exact(&mut decoded_buffer) { - Ok(()) => { - // `n` is how many bytes the reader read in the compressed stream - let n = reader.get_ref().get_ref().position(); - self.len = None; - let _read_bytes = src.split_to(n as usize); - Ok(Some(ErrorType(VariableList::from_ssz_bytes( - &decoded_buffer, - )?))) - } - Err(e) => handle_error(e, reader.get_ref().get_ref().position(), max_compressed_len), + // if the inner decoder was capable of decoding a chunk, we need to reset the current + // response code for the next chunk + if let Ok(Some(_)) = inner_result { + self.current_response_code = None; } + // return the result + inner_result } } @@ -1030,7 +1075,7 @@ mod tests { let mut snappy_inbound_codec = SSZSnappyInboundCodec::::new(snappy_protocol_id, max_packet_size, fork_context); - snappy_inbound_codec.encode(message, &mut buf)?; + snappy_inbound_codec.encode_response(message, &mut buf)?; Ok(buf) } @@ -1075,7 +1120,7 @@ mod tests { let mut snappy_outbound_codec = SSZSnappyOutboundCodec::::new(snappy_protocol_id, max_packet_size, fork_context); // decode message just as snappy message - snappy_outbound_codec.decode(message) + snappy_outbound_codec.decode_response(message) } /// Encodes the provided protocol message as bytes and tries to decode the encoding bytes. @@ -1847,4 +1892,129 @@ mod tests { RPCError::InvalidData(_) )); } + + #[test] + fn test_decode_status_message() { + let message = hex::decode("0054ff060000734e615070590032000006e71e7b54989925efd6c9cbcb8ceb9b5f71216f5137282bf6a1e3b50f64e42d6c7fb347abe07eb0db8200000005029e2800").unwrap(); + let mut buf = BytesMut::new(); + buf.extend_from_slice(&message); + + let snappy_protocol_id = ProtocolId::new(SupportedProtocol::StatusV1, Encoding::SSZSnappy); + + let fork_context = Arc::new(fork_context(ForkName::Base)); + + let chain_spec = Spec::default_spec(); + + let mut snappy_outbound_codec = SSZSnappyOutboundCodec::::new( + snappy_protocol_id, + max_rpc_size(&fork_context, chain_spec.max_chunk_size as usize), + fork_context, + ); + + // remove response code + let mut snappy_buf = buf.clone(); + let _ = snappy_buf.split_to(1); + + // decode message just as snappy message + let _snappy_decoded_message = snappy_outbound_codec + .decode_response(&mut snappy_buf) + .unwrap(); + + // decode message as ssz snappy chunk + let _snappy_decoded_chunk = snappy_outbound_codec.decode(&mut buf).unwrap(); + } + + #[test] + fn test_invalid_length_prefix() { + let mut uvi_codec: Uvi = Uvi::default(); + let mut dst = BytesMut::with_capacity(1024); + + // Smallest > 10 byte varint + let len: u128 = 2u128.pow(70); + + // Insert length-prefix + uvi_codec.encode(len, &mut dst).unwrap(); + + let snappy_protocol_id = ProtocolId::new(SupportedProtocol::StatusV1, Encoding::SSZSnappy); + + let fork_context = Arc::new(fork_context(ForkName::Base)); + + let chain_spec = Spec::default_spec(); + + let mut snappy_outbound_codec = SSZSnappyOutboundCodec::::new( + snappy_protocol_id, + max_rpc_size(&fork_context, chain_spec.max_chunk_size as usize), + fork_context, + ); + + let snappy_decoded_message = snappy_outbound_codec.decode_response(&mut dst).unwrap_err(); + + assert_eq!( + snappy_decoded_message, + RPCError::IoError("input bytes exceed maximum".to_string()), + "length-prefix of > 10 bytes is invalid" + ); + } + + #[test] + fn test_length_limits() { + fn encode_len(len: usize) -> BytesMut { + let mut uvi_codec: Uvi = Uvi::default(); + let mut dst = BytesMut::with_capacity(1024); + uvi_codec.encode(len, &mut dst).unwrap(); + dst + } + + let protocol_id = ProtocolId::new(SupportedProtocol::BlocksByRangeV1, Encoding::SSZSnappy); + + // Response limits + let fork_context = Arc::new(fork_context(ForkName::Base)); + + let chain_spec = Spec::default_spec(); + + let max_rpc_size = max_rpc_size(&fork_context, chain_spec.max_chunk_size as usize); + let limit = protocol_id.rpc_response_limits::(&fork_context); + let mut max = encode_len(limit.max + 1); + let mut codec = SSZSnappyOutboundCodec::::new( + protocol_id.clone(), + max_rpc_size, + fork_context.clone(), + ); + assert!(matches!( + codec.decode_response(&mut max).unwrap_err(), + RPCError::InvalidData(_) + )); + + let mut min = encode_len(limit.min - 1); + let mut codec = SSZSnappyOutboundCodec::::new( + protocol_id.clone(), + max_rpc_size, + fork_context.clone(), + ); + assert!(matches!( + codec.decode_response(&mut min).unwrap_err(), + RPCError::InvalidData(_) + )); + + // Request limits + let limit = protocol_id.rpc_request_limits(&fork_context.spec); + let mut max = encode_len(limit.max + 1); + let mut codec = SSZSnappyOutboundCodec::::new( + protocol_id.clone(), + max_rpc_size, + fork_context.clone(), + ); + assert!(matches!( + codec.decode_response(&mut max).unwrap_err(), + RPCError::InvalidData(_) + )); + + let mut min = encode_len(limit.min - 1); + let mut codec = + SSZSnappyOutboundCodec::::new(protocol_id, max_rpc_size, fork_context); + assert!(matches!( + codec.decode_response(&mut min).unwrap_err(), + RPCError::InvalidData(_) + )); + } } diff --git a/beacon_node/lighthouse_network/src/rpc/codec/base.rs b/beacon_node/lighthouse_network/src/rpc/codec/base.rs deleted file mode 100644 index 4b9e8d5097..0000000000 --- a/beacon_node/lighthouse_network/src/rpc/codec/base.rs +++ /dev/null @@ -1,334 +0,0 @@ -//! This handles the various supported encoding mechanism for the Eth 2.0 RPC. - -use crate::rpc::methods::ErrorType; -use crate::rpc::{InboundRequest, OutboundRequest, RPCCodedResponse, RPCResponse}; -use libp2p::bytes::BufMut; -use libp2p::bytes::BytesMut; -use std::marker::PhantomData; -use tokio_util::codec::{Decoder, Encoder}; -use types::EthSpec; - -pub trait OutboundCodec: Encoder + Decoder { - type CodecErrorType; - - fn decode_error( - &mut self, - src: &mut BytesMut, - ) -> Result, ::Error>; -} - -/* Global Inbound Codec */ -// This deals with Decoding RPC Requests from other peers and encoding our responses - -pub struct BaseInboundCodec -where - TCodec: Encoder> + Decoder, - E: EthSpec, -{ - /// Inner codec for handling various encodings - inner: TCodec, - phantom: PhantomData, -} - -impl BaseInboundCodec -where - TCodec: Encoder> + Decoder, - E: EthSpec, -{ - pub fn new(codec: TCodec) -> Self { - BaseInboundCodec { - inner: codec, - phantom: PhantomData, - } - } -} - -/* Global Outbound Codec */ -// This deals with Decoding RPC Responses from other peers and encoding our requests -pub struct BaseOutboundCodec -where - TOutboundCodec: OutboundCodec>, - E: EthSpec, -{ - /// Inner codec for handling various encodings. - inner: TOutboundCodec, - /// Keeps track of the current response code for a chunk. - current_response_code: Option, - phantom: PhantomData, -} - -impl BaseOutboundCodec -where - E: EthSpec, - TOutboundCodec: OutboundCodec>, -{ - pub fn new(codec: TOutboundCodec) -> Self { - BaseOutboundCodec { - inner: codec, - current_response_code: None, - phantom: PhantomData, - } - } -} - -/* Implementation of the Encoding/Decoding for the global codecs */ - -/* Base Inbound Codec */ - -// This Encodes RPC Responses sent to external peers -impl Encoder> for BaseInboundCodec -where - E: EthSpec, - TCodec: Decoder + Encoder>, -{ - type Error = >>::Error; - - fn encode(&mut self, item: RPCCodedResponse, dst: &mut BytesMut) -> Result<(), Self::Error> { - dst.clear(); - dst.reserve(1); - dst.put_u8( - item.as_u8() - .expect("Should never encode a stream termination"), - ); - self.inner.encode(item, dst) - } -} - -// This Decodes RPC Requests from external peers -impl Decoder for BaseInboundCodec -where - E: EthSpec, - TCodec: Encoder> + Decoder>, -{ - type Item = InboundRequest; - type Error = ::Error; - - fn decode(&mut self, src: &mut BytesMut) -> Result, Self::Error> { - self.inner.decode(src) - } -} - -/* Base Outbound Codec */ - -// This Encodes RPC Requests sent to external peers -impl Encoder> for BaseOutboundCodec -where - E: EthSpec, - TCodec: OutboundCodec> + Encoder>, -{ - type Error = >>::Error; - - fn encode(&mut self, item: OutboundRequest, dst: &mut BytesMut) -> Result<(), Self::Error> { - self.inner.encode(item, dst) - } -} - -// This decodes RPC Responses received from external peers -impl Decoder for BaseOutboundCodec -where - E: EthSpec, - TCodec: OutboundCodec, CodecErrorType = ErrorType> - + Decoder>, -{ - type Item = RPCCodedResponse; - type Error = ::Error; - - fn decode(&mut self, src: &mut BytesMut) -> Result, Self::Error> { - // if we have only received the response code, wait for more bytes - if src.len() <= 1 { - return Ok(None); - } - // using the response code determine which kind of payload needs to be decoded. - let response_code = self.current_response_code.unwrap_or_else(|| { - let resp_code = src.split_to(1)[0]; - self.current_response_code = Some(resp_code); - resp_code - }); - - let inner_result = { - if RPCCodedResponse::::is_response(response_code) { - // decode an actual response and mutates the buffer if enough bytes have been read - // returning the result. - self.inner - .decode(src) - .map(|r| r.map(RPCCodedResponse::Success)) - } else { - // decode an error - self.inner - .decode_error(src) - .map(|r| r.map(|resp| RPCCodedResponse::from_error(response_code, resp))) - } - }; - // if the inner decoder was capable of decoding a chunk, we need to reset the current - // response code for the next chunk - if let Ok(Some(_)) = inner_result { - self.current_response_code = None; - } - // return the result - inner_result - } -} - -#[cfg(test)] -mod tests { - use super::super::ssz_snappy::*; - use super::*; - use crate::rpc::protocol::*; - - use std::sync::Arc; - use types::{Epoch, FixedBytesExtended, ForkContext, ForkName, Hash256, Slot}; - use unsigned_varint::codec::Uvi; - - type Spec = types::MainnetEthSpec; - - fn fork_context(fork_name: ForkName) -> ForkContext { - let mut chain_spec = Spec::default_spec(); - let altair_fork_epoch = Epoch::new(1); - let bellatrix_fork_epoch = Epoch::new(2); - let capella_fork_epoch = Epoch::new(3); - let deneb_fork_epoch = Epoch::new(4); - let electra_fork_epoch = Epoch::new(5); - - chain_spec.altair_fork_epoch = Some(altair_fork_epoch); - chain_spec.bellatrix_fork_epoch = Some(bellatrix_fork_epoch); - chain_spec.capella_fork_epoch = Some(capella_fork_epoch); - chain_spec.deneb_fork_epoch = Some(deneb_fork_epoch); - chain_spec.electra_fork_epoch = Some(electra_fork_epoch); - - let current_slot = match fork_name { - ForkName::Base => Slot::new(0), - ForkName::Altair => altair_fork_epoch.start_slot(Spec::slots_per_epoch()), - ForkName::Bellatrix => bellatrix_fork_epoch.start_slot(Spec::slots_per_epoch()), - ForkName::Capella => capella_fork_epoch.start_slot(Spec::slots_per_epoch()), - ForkName::Deneb => deneb_fork_epoch.start_slot(Spec::slots_per_epoch()), - ForkName::Electra => electra_fork_epoch.start_slot(Spec::slots_per_epoch()), - }; - ForkContext::new::(current_slot, Hash256::zero(), &chain_spec) - } - - #[test] - fn test_decode_status_message() { - let message = hex::decode("0054ff060000734e615070590032000006e71e7b54989925efd6c9cbcb8ceb9b5f71216f5137282bf6a1e3b50f64e42d6c7fb347abe07eb0db8200000005029e2800").unwrap(); - let mut buf = BytesMut::new(); - buf.extend_from_slice(&message); - - let snappy_protocol_id = ProtocolId::new(SupportedProtocol::StatusV1, Encoding::SSZSnappy); - - let fork_context = Arc::new(fork_context(ForkName::Base)); - - let chain_spec = Spec::default_spec(); - - let mut snappy_outbound_codec = SSZSnappyOutboundCodec::::new( - snappy_protocol_id, - max_rpc_size(&fork_context, chain_spec.max_chunk_size as usize), - fork_context, - ); - - // remove response code - let mut snappy_buf = buf.clone(); - let _ = snappy_buf.split_to(1); - - // decode message just as snappy message - let _snappy_decoded_message = snappy_outbound_codec.decode(&mut snappy_buf).unwrap(); - - // build codecs for entire chunk - let mut snappy_base_outbound_codec = BaseOutboundCodec::new(snappy_outbound_codec); - - // decode message as ssz snappy chunk - let _snappy_decoded_chunk = snappy_base_outbound_codec.decode(&mut buf).unwrap(); - } - - #[test] - fn test_invalid_length_prefix() { - let mut uvi_codec: Uvi = Uvi::default(); - let mut dst = BytesMut::with_capacity(1024); - - // Smallest > 10 byte varint - let len: u128 = 2u128.pow(70); - - // Insert length-prefix - uvi_codec.encode(len, &mut dst).unwrap(); - - let snappy_protocol_id = ProtocolId::new(SupportedProtocol::StatusV1, Encoding::SSZSnappy); - - let fork_context = Arc::new(fork_context(ForkName::Base)); - - let chain_spec = Spec::default_spec(); - - let mut snappy_outbound_codec = SSZSnappyOutboundCodec::::new( - snappy_protocol_id, - max_rpc_size(&fork_context, chain_spec.max_chunk_size as usize), - fork_context, - ); - - let snappy_decoded_message = snappy_outbound_codec.decode(&mut dst).unwrap_err(); - - assert_eq!( - snappy_decoded_message, - RPCError::IoError("input bytes exceed maximum".to_string()), - "length-prefix of > 10 bytes is invalid" - ); - } - - #[test] - fn test_length_limits() { - fn encode_len(len: usize) -> BytesMut { - let mut uvi_codec: Uvi = Uvi::default(); - let mut dst = BytesMut::with_capacity(1024); - uvi_codec.encode(len, &mut dst).unwrap(); - dst - } - - let protocol_id = ProtocolId::new(SupportedProtocol::BlocksByRangeV1, Encoding::SSZSnappy); - - // Response limits - let fork_context = Arc::new(fork_context(ForkName::Base)); - - let chain_spec = Spec::default_spec(); - - let max_rpc_size = max_rpc_size(&fork_context, chain_spec.max_chunk_size as usize); - let limit = protocol_id.rpc_response_limits::(&fork_context); - let mut max = encode_len(limit.max + 1); - let mut codec = SSZSnappyOutboundCodec::::new( - protocol_id.clone(), - max_rpc_size, - fork_context.clone(), - ); - assert!(matches!( - codec.decode(&mut max).unwrap_err(), - RPCError::InvalidData(_) - )); - - let mut min = encode_len(limit.min - 1); - let mut codec = SSZSnappyOutboundCodec::::new( - protocol_id.clone(), - max_rpc_size, - fork_context.clone(), - ); - assert!(matches!( - codec.decode(&mut min).unwrap_err(), - RPCError::InvalidData(_) - )); - - // Request limits - let limit = protocol_id.rpc_request_limits(&fork_context.spec); - let mut max = encode_len(limit.max + 1); - let mut codec = SSZSnappyOutboundCodec::::new( - protocol_id.clone(), - max_rpc_size, - fork_context.clone(), - ); - assert!(matches!( - codec.decode(&mut max).unwrap_err(), - RPCError::InvalidData(_) - )); - - let mut min = encode_len(limit.min - 1); - let mut codec = - SSZSnappyOutboundCodec::::new(protocol_id, max_rpc_size, fork_context); - assert!(matches!( - codec.decode(&mut min).unwrap_err(), - RPCError::InvalidData(_) - )); - } -} diff --git a/beacon_node/lighthouse_network/src/rpc/codec/mod.rs b/beacon_node/lighthouse_network/src/rpc/codec/mod.rs deleted file mode 100644 index dbe99af5bf..0000000000 --- a/beacon_node/lighthouse_network/src/rpc/codec/mod.rs +++ /dev/null @@ -1,61 +0,0 @@ -pub(crate) mod base; -pub(crate) mod ssz_snappy; - -use self::base::{BaseInboundCodec, BaseOutboundCodec}; -use self::ssz_snappy::{SSZSnappyInboundCodec, SSZSnappyOutboundCodec}; -use crate::rpc::protocol::RPCError; -use crate::rpc::{InboundRequest, OutboundRequest, RPCCodedResponse}; -use libp2p::bytes::BytesMut; -use tokio_util::codec::{Decoder, Encoder}; -use types::EthSpec; - -// Known types of codecs -pub enum InboundCodec { - SSZSnappy(BaseInboundCodec, E>), -} - -pub enum OutboundCodec { - SSZSnappy(BaseOutboundCodec, E>), -} - -impl Encoder> for InboundCodec { - type Error = RPCError; - - fn encode(&mut self, item: RPCCodedResponse, dst: &mut BytesMut) -> Result<(), Self::Error> { - match self { - InboundCodec::SSZSnappy(codec) => codec.encode(item, dst), - } - } -} - -impl Decoder for InboundCodec { - type Item = InboundRequest; - type Error = RPCError; - - fn decode(&mut self, src: &mut BytesMut) -> Result, Self::Error> { - match self { - InboundCodec::SSZSnappy(codec) => codec.decode(src), - } - } -} - -impl Encoder> for OutboundCodec { - type Error = RPCError; - - fn encode(&mut self, item: OutboundRequest, dst: &mut BytesMut) -> Result<(), Self::Error> { - match self { - OutboundCodec::SSZSnappy(codec) => codec.encode(item, dst), - } - } -} - -impl Decoder for OutboundCodec { - type Item = RPCCodedResponse; - type Error = RPCError; - - fn decode(&mut self, src: &mut BytesMut) -> Result, Self::Error> { - match self { - OutboundCodec::SSZSnappy(codec) => codec.decode(src), - } - } -} diff --git a/beacon_node/lighthouse_network/src/rpc/outbound.rs b/beacon_node/lighthouse_network/src/rpc/outbound.rs index c67c7865ea..2bfa42ccac 100644 --- a/beacon_node/lighthouse_network/src/rpc/outbound.rs +++ b/beacon_node/lighthouse_network/src/rpc/outbound.rs @@ -2,9 +2,7 @@ use super::methods::*; use super::protocol::ProtocolId; use super::protocol::SupportedProtocol; use super::RPCError; -use crate::rpc::codec::{ - base::BaseOutboundCodec, ssz_snappy::SSZSnappyOutboundCodec, OutboundCodec, -}; +use crate::rpc::codec::SSZSnappyOutboundCodec; use crate::rpc::protocol::Encoding; use futures::future::BoxFuture; use futures::prelude::{AsyncRead, AsyncWrite}; @@ -183,7 +181,7 @@ impl OutboundRequest { /* Outbound upgrades */ -pub type OutboundFramed = Framed, OutboundCodec>; +pub type OutboundFramed = Framed, SSZSnappyOutboundCodec>; impl OutboundUpgrade for OutboundRequestContainer where @@ -199,12 +197,7 @@ where let socket = socket.compat(); let codec = match protocol.encoding { Encoding::SSZSnappy => { - let ssz_snappy_codec = BaseOutboundCodec::new(SSZSnappyOutboundCodec::new( - protocol, - self.max_rpc_size, - self.fork_context.clone(), - )); - OutboundCodec::SSZSnappy(ssz_snappy_codec) + SSZSnappyOutboundCodec::new(protocol, self.max_rpc_size, self.fork_context.clone()) } }; diff --git a/beacon_node/lighthouse_network/src/rpc/protocol.rs b/beacon_node/lighthouse_network/src/rpc/protocol.rs index f4bdf6450b..09a18e5de6 100644 --- a/beacon_node/lighthouse_network/src/rpc/protocol.rs +++ b/beacon_node/lighthouse_network/src/rpc/protocol.rs @@ -1,5 +1,5 @@ use super::methods::*; -use crate::rpc::codec::{base::BaseInboundCodec, ssz_snappy::SSZSnappyInboundCodec, InboundCodec}; +use crate::rpc::codec::SSZSnappyInboundCodec; use futures::future::BoxFuture; use futures::prelude::{AsyncRead, AsyncWrite}; use futures::{FutureExt, StreamExt}; @@ -647,7 +647,7 @@ pub fn rpc_data_column_limits() -> RpcLimits { pub type InboundOutput = (InboundRequest, InboundFramed); pub type InboundFramed = - Framed>>>, InboundCodec>; + Framed>>>, SSZSnappyInboundCodec>; impl InboundUpgrade for RPCProtocol where @@ -664,15 +664,13 @@ where // convert the socket to tokio compatible socket let socket = socket.compat(); let codec = match protocol.encoding { - Encoding::SSZSnappy => { - let ssz_snappy_codec = BaseInboundCodec::new(SSZSnappyInboundCodec::new( - protocol, - self.max_rpc_size, - self.fork_context.clone(), - )); - InboundCodec::SSZSnappy(ssz_snappy_codec) - } + Encoding::SSZSnappy => SSZSnappyInboundCodec::new( + protocol, + self.max_rpc_size, + self.fork_context.clone(), + ), }; + let mut timed_socket = TimeoutStream::new(socket); timed_socket.set_read_timeout(Some(self.ttfb_timeout));