diff --git a/beacon_node/eth2_libp2p/src/rpc/codec/base.rs b/beacon_node/eth2_libp2p/src/rpc/codec/base.rs index d7a64b58d1..1613b59d5c 100644 --- a/beacon_node/eth2_libp2p/src/rpc/codec/base.rs +++ b/beacon_node/eth2_libp2p/src/rpc/codec/base.rs @@ -177,7 +177,15 @@ where mod tests { use super::super::ssz_snappy::*; use super::*; + use crate::rpc::methods::StatusMessage; use crate::rpc::protocol::*; + use snap::write::FrameEncoder; + use ssz::Encode; + use std::io::Write; + use types::{Epoch, Hash256, Slot}; + use unsigned_varint::codec::Uvi; + + type Spec = types::MainnetEthSpec; #[test] fn test_decode_status_message() { @@ -185,8 +193,6 @@ mod tests { let mut buf = BytesMut::new(); buf.extend_from_slice(&message); - type Spec = types::MainnetEthSpec; - let snappy_protocol_id = ProtocolId::new(Protocol::Status, Version::V1, Encoding::SSZSnappy); @@ -207,4 +213,57 @@ mod tests { let _ = dbg!(snappy_decoded_message); let _ = dbg!(snappy_decoded_chunk); } + + #[test] + fn test_decode_malicious_status_message() { + // Snappy stream identifier + let stream_identifier: &'static [u8] = b"\xFF\x06\x00\x00sNaPpY"; + + // byte 0(0xFE) is padding chunk type identifier for snappy messages + // byte 1,2,3 are chunk length (little endian) + let malicious_padding: &'static [u8] = b"\xFE\x00\x00\x00"; + + // Status message is 84 bytes uncompressed. `max_compressed_len` is 130. + let status_message_bytes = StatusMessage { + fork_digest: [0; 4], + finalized_root: Hash256::from_low_u64_be(0), + finalized_epoch: Epoch::new(1), + head_root: Hash256::from_low_u64_be(0), + head_slot: Slot::new(1), + } + .as_ssz_bytes(); + + let mut uvi_codec: Uvi = Uvi::default(); + let mut dst = BytesMut::with_capacity(1024); + + // Insert length-prefix + uvi_codec + .encode(status_message_bytes.len(), &mut dst) + .unwrap(); + + // Insert snappy stream identifier + dst.extend_from_slice(stream_identifier); + + // Insert malicious padding of 80 bytes. + for _ in 0..20 { + dst.extend_from_slice(malicious_padding); + } + + // Insert payload (42 bytes compressed) + let mut writer = FrameEncoder::new(Vec::new()); + writer.write_all(&status_message_bytes).unwrap(); + writer.flush().unwrap(); + dst.extend_from_slice(writer.get_ref()); + + // 42 + 80 = 132 > max_compressed_len. Hence, decoding should fail with `InvalidData`. + + let snappy_protocol_id = + ProtocolId::new(Protocol::Status, Version::V1, Encoding::SSZSnappy); + + let mut snappy_outbound_codec = + SSZSnappyOutboundCodec::::new(snappy_protocol_id, 1_048_576); + + let snappy_decoded_message = snappy_outbound_codec.decode(&mut dst.clone()).unwrap_err(); + assert_eq!(snappy_decoded_message, RPCError::InvalidData); + } } diff --git a/beacon_node/eth2_libp2p/src/rpc/codec/ssz_snappy.rs b/beacon_node/eth2_libp2p/src/rpc/codec/ssz_snappy.rs index 8b35a4c685..8fe12adca6 100644 --- a/beacon_node/eth2_libp2p/src/rpc/codec/ssz_snappy.rs +++ b/beacon_node/eth2_libp2p/src/rpc/codec/ssz_snappy.rs @@ -1,10 +1,7 @@ use crate::rpc::methods::*; use crate::rpc::{ codec::base::OutboundCodec, - protocol::{ - Encoding, Protocol, ProtocolId, RPCError, Version, BLOCKS_BY_ROOT_REQUEST_MAX, - BLOCKS_BY_ROOT_REQUEST_MIN, SIGNED_BEACON_BLOCK_MAX, SIGNED_BEACON_BLOCK_MIN, - }, + protocol::{Encoding, Protocol, ProtocolId, RPCError, Version, ERROR_TYPE_MAX, ERROR_TYPE_MIN}, }; use crate::rpc::{RPCCodedResponse, RPCRequest, RPCResponse}; use libp2p::bytes::BytesMut; @@ -110,79 +107,56 @@ impl Decoder for SSZSnappyInboundCodec { let length = self.len.expect("length should be Some"); - // Should not attempt to decode rpc chunks with length > max_packet_size - if length > self.max_packet_size { + // Should not attempt to decode rpc chunks with `length > max_packet_size` or not within bounds of + // packet size for ssz container corresponding to `self.protocol`. + let ssz_limits = self.protocol.rpc_request_limits(); + if length > self.max_packet_size || ssz_limits.is_out_of_bounds(length) { return Err(RPCError::InvalidData); } - let mut reader = FrameDecoder::new(Cursor::new(&src)); + // Calculate worst case compression length for given uncompressed length + let max_compressed_len = snap::raw::max_compress_len(length) as u64; + + // Create a limit reader as a wrapper that reads only upto `max_compressed_len` from `src`. + let limit_reader = Cursor::new(src.as_ref()).take(max_compressed_len); + let mut reader = FrameDecoder::new(limit_reader); let mut decoded_buffer = vec![0; length]; - match read_exact(&mut reader, &mut decoded_buffer, length) { + match reader.read_exact(&mut decoded_buffer) { Ok(()) => { // `n` is how many bytes the reader read in the compressed stream - let n = reader.get_ref().position(); + let n = reader.get_ref().get_ref().position(); self.len = None; let _read_bytes = src.split_to(n as usize); + + // We need not check that decoded_buffer.len() is within bounds here + // since we have already checked `length` above. match self.protocol.message_name { Protocol::Status => match self.protocol.version { - Version::V1 => { - if decoded_buffer.len() == ::ssz_fixed_len() { - Ok(Some(RPCRequest::Status(StatusMessage::from_ssz_bytes( - &decoded_buffer, - )?))) - } else { - Err(RPCError::InvalidData) - } - } + Version::V1 => Ok(Some(RPCRequest::Status(StatusMessage::from_ssz_bytes( + &decoded_buffer, + )?))), }, Protocol::Goodbye => match self.protocol.version { - Version::V1 => { - if decoded_buffer.len() == ::ssz_fixed_len() { - Ok(Some(RPCRequest::Goodbye(GoodbyeReason::from_ssz_bytes( - &decoded_buffer, - )?))) - } else { - Err(RPCError::InvalidData) - } - } + Version::V1 => Ok(Some(RPCRequest::Goodbye( + GoodbyeReason::from_ssz_bytes(&decoded_buffer)?, + ))), }, Protocol::BlocksByRange => match self.protocol.version { - Version::V1 => { - if decoded_buffer.len() - == ::ssz_fixed_len() - { - Ok(Some(RPCRequest::BlocksByRange( - BlocksByRangeRequest::from_ssz_bytes(&decoded_buffer)?, - ))) - } else { - Err(RPCError::InvalidData) - } - } + Version::V1 => Ok(Some(RPCRequest::BlocksByRange( + BlocksByRangeRequest::from_ssz_bytes(&decoded_buffer)?, + ))), }, Protocol::BlocksByRoot => match self.protocol.version { - Version::V1 => { - if decoded_buffer.len() >= *BLOCKS_BY_ROOT_REQUEST_MIN - && decoded_buffer.len() <= *BLOCKS_BY_ROOT_REQUEST_MAX - { - Ok(Some(RPCRequest::BlocksByRoot(BlocksByRootRequest { - block_roots: VariableList::from_ssz_bytes(&decoded_buffer)?, - }))) - } else { - Err(RPCError::InvalidData) - } - } + Version::V1 => Ok(Some(RPCRequest::BlocksByRoot(BlocksByRootRequest { + block_roots: VariableList::from_ssz_bytes(&decoded_buffer)?, + }))), }, Protocol::Ping => match self.protocol.version { - Version::V1 => { - if decoded_buffer.len() == ::ssz_fixed_len() { - Ok(Some(RPCRequest::Ping(Ping { - data: u64::from_ssz_bytes(&decoded_buffer)?, - }))) - } else { - Err(RPCError::InvalidData) - } - } + Version::V1 => Ok(Some(RPCRequest::Ping(Ping { + data: u64::from_ssz_bytes(&decoded_buffer)?, + }))), }, + // This case should be unreachable as `MetaData` requests are handled separately in the `InboundUpgrade` Protocol::MetaData => match self.protocol.version { Version::V1 => { if !decoded_buffer.is_empty() { @@ -194,11 +168,7 @@ impl Decoder for SSZSnappyInboundCodec { }, } } - Err(e) => match e.kind() { - // Haven't received enough bytes to decode yet, wait for more - ErrorKind::UnexpectedEof => Ok(None), - _ => Err(e).map_err(RPCError::from), - }, + Err(e) => handle_error(e, reader.get_ref().get_ref().position(), max_compressed_len), } } } @@ -288,87 +258,60 @@ impl Decoder for SSZSnappyOutboundCodec { let length = self.len.expect("length should be Some"); - // Should not attempt to decode rpc chunks with length > max_packet_size - if length > self.max_packet_size { + // Should not attempt to decode rpc chunks with `length > max_packet_size` or not within bounds of + // packet size for ssz container corresponding to `self.protocol`. + let ssz_limits = self.protocol.rpc_response_limits::(); + if length > self.max_packet_size || ssz_limits.is_out_of_bounds(length) { return Err(RPCError::InvalidData); } - let mut reader = FrameDecoder::new(Cursor::new(&src)); + // Calculate worst case compression length for given uncompressed length + let max_compressed_len = snap::raw::max_compress_len(length) as u64; + // Create a limit reader as a wrapper that reads only upto `max_compressed_len` from `src`. + let limit_reader = Cursor::new(src.as_ref()).take(max_compressed_len); + let mut reader = FrameDecoder::new(limit_reader); + let mut decoded_buffer = vec![0; length]; - match read_exact(&mut reader, &mut decoded_buffer, length) { + + match reader.read_exact(&mut decoded_buffer) { Ok(()) => { // `n` is how many bytes the reader read in the compressed stream - let n = reader.get_ref().position(); + let n = reader.get_ref().get_ref().position(); self.len = None; - let _read_byts = src.split_to(n as usize); + let _read_bytes = src.split_to(n as usize); + + // We need not check that decoded_buffer.len() is within bounds here + // since we have already checked `length` above. match self.protocol.message_name { Protocol::Status => match self.protocol.version { - Version::V1 => { - if decoded_buffer.len() == ::ssz_fixed_len() { - Ok(Some(RPCResponse::Status(StatusMessage::from_ssz_bytes( - &decoded_buffer, - )?))) - } else { - Err(RPCError::InvalidData) - } - } + Version::V1 => Ok(Some(RPCResponse::Status( + StatusMessage::from_ssz_bytes(&decoded_buffer)?, + ))), }, + // This case should be unreachable as `Goodbye` has no response. Protocol::Goodbye => Err(RPCError::InvalidData), Protocol::BlocksByRange => match self.protocol.version { - Version::V1 => { - if decoded_buffer.len() >= *SIGNED_BEACON_BLOCK_MIN - && decoded_buffer.len() <= *SIGNED_BEACON_BLOCK_MAX - { - Ok(Some(RPCResponse::BlocksByRange(Box::new( - SignedBeaconBlock::from_ssz_bytes(&decoded_buffer)?, - )))) - } else { - Err(RPCError::InvalidData) - } - } + Version::V1 => Ok(Some(RPCResponse::BlocksByRange(Box::new( + SignedBeaconBlock::from_ssz_bytes(&decoded_buffer)?, + )))), }, Protocol::BlocksByRoot => match self.protocol.version { - Version::V1 => { - if decoded_buffer.len() >= *SIGNED_BEACON_BLOCK_MIN - && decoded_buffer.len() <= *SIGNED_BEACON_BLOCK_MAX - { - Ok(Some(RPCResponse::BlocksByRoot(Box::new( - SignedBeaconBlock::from_ssz_bytes(&decoded_buffer)?, - )))) - } else { - Err(RPCError::InvalidData) - } - } + Version::V1 => Ok(Some(RPCResponse::BlocksByRoot(Box::new( + SignedBeaconBlock::from_ssz_bytes(&decoded_buffer)?, + )))), }, Protocol::Ping => match self.protocol.version { - Version::V1 => { - if decoded_buffer.len() == ::ssz_fixed_len() { - Ok(Some(RPCResponse::Pong(Ping { - data: u64::from_ssz_bytes(&decoded_buffer)?, - }))) - } else { - Err(RPCError::InvalidData) - } - } + Version::V1 => Ok(Some(RPCResponse::Pong(Ping { + data: u64::from_ssz_bytes(&decoded_buffer)?, + }))), }, Protocol::MetaData => match self.protocol.version { - Version::V1 => { - if decoded_buffer.len() == as Encode>::ssz_fixed_len() - { - Ok(Some(RPCResponse::MetaData(MetaData::from_ssz_bytes( - &decoded_buffer, - )?))) - } else { - Err(RPCError::InvalidData) - } - } + Version::V1 => Ok(Some(RPCResponse::MetaData(MetaData::from_ssz_bytes( + &decoded_buffer, + )?))), }, } } - Err(e) => match e.kind() { - // Haven't received enough bytes to decode yet, wait for more - ErrorKind::UnexpectedEof => Ok(None), - _ => Err(e).map_err(RPCError::from), - }, + Err(e) => handle_error(e, reader.get_ref().get_ref().position(), max_compressed_len), } } } @@ -392,84 +335,52 @@ impl OutboundCodec> for SSZSnappyOutboundCodec let length = self.len.expect("length should be Some"); - // Should not attempt to decode rpc chunks with length > max_packet_size - if length > self.max_packet_size { + // Should not attempt to decode rpc chunks with `length > max_packet_size` or not within bounds of + // packet size for ssz container corresponding to `ErrorType`. + if length > self.max_packet_size || length > *ERROR_TYPE_MAX || length < *ERROR_TYPE_MIN { return Err(RPCError::InvalidData); } - let mut reader = FrameDecoder::new(Cursor::new(&src)); + + // Calculate worst case compression length for given uncompressed length + let max_compressed_len = snap::raw::max_compress_len(length) as u64; + // // Create a limit reader as a wrapper that reads only upto `max_compressed_len` from `src`. + let limit_reader = Cursor::new(src.as_ref()).take(max_compressed_len); + let mut reader = FrameDecoder::new(limit_reader); let mut decoded_buffer = vec![0; length]; - match read_exact(&mut reader, &mut decoded_buffer, length) { + match reader.read_exact(&mut decoded_buffer) { Ok(()) => { // `n` is how many bytes the reader read in the compressed stream - let n = reader.get_ref().position(); + let n = reader.get_ref().get_ref().position(); self.len = None; let _read_bytes = src.split_to(n as usize); Ok(Some(ErrorType(VariableList::from_ssz_bytes( &decoded_buffer, )?))) } - Err(e) => match e.kind() { + Err(e) => handle_error(e, reader.get_ref().get_ref().position(), max_compressed_len), + } + } +} + +/// Handle errors that we get from decoding an RPC message from the stream. +/// `num_bytes_read` is the number of bytes the snappy decoder has read from the underlying stream. +/// `max_compressed_len` is the maximum compressed size for a given uncompressed size. +fn handle_error( + err: std::io::Error, + num_bytes: u64, + max_compressed_len: u64, +) -> Result, RPCError> { + match err.kind() { + ErrorKind::UnexpectedEof => { + // If snappy has read `max_compressed_len` from underlying stream and still can't fill buffer, we have a malicious message. + // Report as `InvalidData` so that malicious peer gets banned. + if num_bytes >= max_compressed_len { + Err(RPCError::InvalidData) + } else { // Haven't received enough bytes to decode yet, wait for more - ErrorKind::UnexpectedEof => Ok(None), - _ => Err(e).map_err(RPCError::from), - }, - } - } -} - -/// Wrapper over `read` implementation of `FrameDecoder`. -/// -/// Works like the standard `read_exact` implementation, except that it returns an error if length of -// compressed bytes read from the underlying reader is greater than worst case compression length for snappy. -fn read_exact>( - reader: &mut FrameDecoder>, - mut buf: &mut [u8], - uncompressed_length: usize, -) -> Result<(), std::io::Error> { - // Calculate worst case compression length for given uncompressed length - let max_compressed_len = snap::raw::max_compress_len(uncompressed_length) as u64; - - // Initialize the position of the reader - let mut pos = reader.get_ref().position(); - let mut count = 0; - while !buf.is_empty() { - match reader.read(buf) { - Ok(0) => break, - Ok(n) => { - let tmp = buf; - buf = &mut tmp[n..]; + Ok(None) } - Err(ref e) if e.kind() == ErrorKind::Interrupted => {} - Err(e) => return Err(e), } - // Get current position of reader - let curr_pos = reader.get_ref().position(); - // Note: reader should always advance forward. However, this behaviour - // depends on the implementation of `snap::FrameDecoder`, so it is better - // to check to avoid underflow panic. - if curr_pos > pos { - count += reader.get_ref().position() - pos; - pos = curr_pos; - } else { - return Err(std::io::Error::new( - ErrorKind::InvalidData, - "snappy: reader is not advanced forward while reading", - )); - } - - if count > max_compressed_len { - return Err(std::io::Error::new( - ErrorKind::InvalidData, - "snappy: compressed data is > max_compressed_len", - )); - } - } - if !buf.is_empty() { - Err(std::io::Error::new( - ErrorKind::UnexpectedEof, - "failed to fill whole buffer", - )) - } else { - Ok(()) + _ => Err(err).map_err(RPCError::from), } } diff --git a/beacon_node/eth2_libp2p/src/rpc/methods.rs b/beacon_node/eth2_libp2p/src/rpc/methods.rs index 30c9b13fb5..f3a239c8da 100644 --- a/beacon_node/eth2_libp2p/src/rpc/methods.rs +++ b/beacon_node/eth2_libp2p/src/rpc/methods.rs @@ -16,7 +16,8 @@ pub type MaxRequestBlocks = U1024; pub const MAX_REQUEST_BLOCKS: u64 = 1024; /// Maximum length of error message. -type MaxErrorLen = U256; +pub type MaxErrorLen = U256; +pub const MAX_ERROR_LEN: u64 = 256; /// Wrapper over SSZ List to represent error message in rpc responses. #[derive(Debug, Clone)] @@ -256,7 +257,7 @@ pub enum RPCCodedResponse { } /// The code assigned to an erroneous `RPCResponse`. -#[derive(Debug, Clone, Copy)] +#[derive(Debug, Clone, Copy, PartialEq)] pub enum RPCResponseErrorCode { RateLimited, InvalidRequest, diff --git a/beacon_node/eth2_libp2p/src/rpc/protocol.rs b/beacon_node/eth2_libp2p/src/rpc/protocol.rs index fbd5326a92..9adfa241a2 100644 --- a/beacon_node/eth2_libp2p/src/rpc/protocol.rs +++ b/beacon_node/eth2_libp2p/src/rpc/protocol.rs @@ -5,7 +5,7 @@ use crate::rpc::{ ssz_snappy::{SSZSnappyInboundCodec, SSZSnappyOutboundCodec}, InboundCodec, OutboundCodec, }, - methods::ResponseTermination, + methods::{MaxErrorLen, ResponseTermination, MAX_ERROR_LEN}, MaxRequestBlocks, MAX_REQUEST_BLOCKS, }; use futures::future::BoxFuture; @@ -51,6 +51,19 @@ lazy_static! { ]) .as_ssz_bytes() .len(); + pub static ref ERROR_TYPE_MIN: usize = + VariableList::::from(Vec::::new()) + .as_ssz_bytes() + .len(); + pub static ref ERROR_TYPE_MAX: usize = + VariableList::::from(vec![ + 0u8; + MAX_ERROR_LEN + as usize + ]) + .as_ssz_bytes() + .len(); + } /// The maximum bytes that can be sent across the RPC. @@ -147,6 +160,24 @@ impl UpgradeInfo for RPCProtocol { } } +/// Represents the ssz length bounds for RPC messages. +#[derive(Debug, PartialEq)] +pub struct RpcLimits { + min: usize, + max: usize, +} + +impl RpcLimits { + pub fn new(min: usize, max: usize) -> Self { + Self { min, max } + } + + /// Returns true if the given length is out of bounds, false otherwise. + pub fn is_out_of_bounds(&self, length: usize) -> bool { + length > self.max || length < self.min + } +} + /// Tracks the types in a protocol id. #[derive(Clone, Debug)] pub struct ProtocolId { @@ -163,6 +194,59 @@ pub struct ProtocolId { protocol_id: String, } +impl ProtocolId { + /// Returns min and max size for messages of given protocol id requests. + pub fn rpc_request_limits(&self) -> RpcLimits { + match self.message_name { + Protocol::Status => RpcLimits::new( + ::ssz_fixed_len(), + ::ssz_fixed_len(), + ), + Protocol::Goodbye => RpcLimits::new( + ::ssz_fixed_len(), + ::ssz_fixed_len(), + ), + Protocol::BlocksByRange => RpcLimits::new( + ::ssz_fixed_len(), + ::ssz_fixed_len(), + ), + Protocol::BlocksByRoot => { + RpcLimits::new(*BLOCKS_BY_ROOT_REQUEST_MIN, *BLOCKS_BY_ROOT_REQUEST_MAX) + } + Protocol::Ping => RpcLimits::new( + ::ssz_fixed_len(), + ::ssz_fixed_len(), + ), + Protocol::MetaData => RpcLimits::new(0, 0), // Metadata requests are empty + } + } + + /// Returns min and max size for messages of given protocol id responses. + pub fn rpc_response_limits(&self) -> RpcLimits { + match self.message_name { + Protocol::Status => RpcLimits::new( + ::ssz_fixed_len(), + ::ssz_fixed_len(), + ), + Protocol::Goodbye => RpcLimits::new(0, 0), // Goodbye request has no response + Protocol::BlocksByRange => { + RpcLimits::new(*SIGNED_BEACON_BLOCK_MIN, *SIGNED_BEACON_BLOCK_MAX) + } + Protocol::BlocksByRoot => { + RpcLimits::new(*SIGNED_BEACON_BLOCK_MIN, *SIGNED_BEACON_BLOCK_MAX) + } + Protocol::Ping => RpcLimits::new( + ::ssz_fixed_len(), + ::ssz_fixed_len(), + ), + Protocol::MetaData => RpcLimits::new( + as Encode>::ssz_fixed_len(), + as Encode>::ssz_fixed_len(), + ), + } + } +} + /// An RPC protocol ID. impl ProtocolId { pub fn new(message_name: Protocol, version: Version, encoding: Encoding) -> Self { @@ -233,7 +317,8 @@ where { Err(e) => Err(RPCError::from(e)), Ok((Some(Ok(request)), stream)) => Ok((request, stream)), - Ok((Some(Err(_)), _)) | Ok((None, _)) => Err(RPCError::IncompleteStream), + Ok((Some(Err(e)), _)) => Err(e), + Ok((None, _)) => Err(RPCError::IncompleteStream), } } } @@ -385,7 +470,7 @@ where } /// Error in RPC Encoding/Decoding. -#[derive(Debug, Clone)] +#[derive(Debug, Clone, PartialEq)] pub enum RPCError { /// Error when decoding the raw buffer from ssz. // NOTE: in the future a ssz::DecodeError should map to an InvalidData error