mirror of
https://github.com/sigp/lighthouse.git
synced 2026-05-08 01:05:47 +00:00
simplify rpc codec logic (#6304)
* simplify rpc codec logic * Merge branch 'unstable' of github.com:sigp/lighthouse into simplify-rpc-codec * Merge branch 'unstable' of github.com:sigp/lighthouse into simplify-rpc-codec * Merge branch 'unstable' of github.com:sigp/lighthouse into simply-rpc-codec * Merge branch 'unstable' into simplify-rpc-codec * Merge branch 'unstable' into simplify-rpc-codec
This commit is contained in:
@@ -1,9 +1,9 @@
|
|||||||
use crate::rpc::methods::*;
|
use crate::rpc::methods::*;
|
||||||
use crate::rpc::{
|
use crate::rpc::protocol::{
|
||||||
codec::base::OutboundCodec,
|
Encoding, ProtocolId, RPCError, SupportedProtocol, ERROR_TYPE_MAX, ERROR_TYPE_MIN,
|
||||||
protocol::{Encoding, ProtocolId, RPCError, SupportedProtocol, ERROR_TYPE_MAX, ERROR_TYPE_MIN},
|
|
||||||
};
|
};
|
||||||
use crate::rpc::{InboundRequest, OutboundRequest};
|
use crate::rpc::{InboundRequest, OutboundRequest};
|
||||||
|
use libp2p::bytes::BufMut;
|
||||||
use libp2p::bytes::BytesMut;
|
use libp2p::bytes::BytesMut;
|
||||||
use snap::read::FrameDecoder;
|
use snap::read::FrameDecoder;
|
||||||
use snap::write::FrameEncoder;
|
use snap::write::FrameEncoder;
|
||||||
@@ -57,13 +57,13 @@ impl<E: EthSpec> SSZSnappyInboundCodec<E> {
|
|||||||
max_packet_size,
|
max_packet_size,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
// Encoder for inbound streams: Encodes RPC Responses sent to peers.
|
/// Encodes RPC Responses sent to peers.
|
||||||
impl<E: EthSpec> Encoder<RPCCodedResponse<E>> for SSZSnappyInboundCodec<E> {
|
fn encode_response(
|
||||||
type Error = RPCError;
|
&mut self,
|
||||||
|
item: RPCCodedResponse<E>,
|
||||||
fn encode(&mut self, item: RPCCodedResponse<E>, dst: &mut BytesMut) -> Result<(), Self::Error> {
|
dst: &mut BytesMut,
|
||||||
|
) -> Result<(), RPCError> {
|
||||||
let bytes = match &item {
|
let bytes = match &item {
|
||||||
RPCCodedResponse::Success(resp) => match &resp {
|
RPCCodedResponse::Success(resp) => match &resp {
|
||||||
RPCResponse::Status(res) => res.as_ssz_bytes(),
|
RPCResponse::Status(res) => res.as_ssz_bytes(),
|
||||||
@@ -125,6 +125,21 @@ impl<E: EthSpec> Encoder<RPCCodedResponse<E>> for SSZSnappyInboundCodec<E> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Encoder for inbound streams: Encodes RPC Responses sent to peers.
|
||||||
|
impl<E: EthSpec> Encoder<RPCCodedResponse<E>> for SSZSnappyInboundCodec<E> {
|
||||||
|
type Error = RPCError;
|
||||||
|
|
||||||
|
fn encode(&mut self, item: RPCCodedResponse<E>, dst: &mut BytesMut) -> Result<(), Self::Error> {
|
||||||
|
dst.clear();
|
||||||
|
dst.reserve(1);
|
||||||
|
dst.put_u8(
|
||||||
|
item.as_u8()
|
||||||
|
.expect("Should never encode a stream termination"),
|
||||||
|
);
|
||||||
|
self.encode_response(item, dst)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Decoder for inbound streams: Decodes RPC requests from peers
|
// Decoder for inbound streams: Decodes RPC requests from peers
|
||||||
impl<E: EthSpec> Decoder for SSZSnappyInboundCodec<E> {
|
impl<E: EthSpec> Decoder for SSZSnappyInboundCodec<E> {
|
||||||
type Item = InboundRequest<E>;
|
type Item = InboundRequest<E>;
|
||||||
@@ -188,6 +203,8 @@ pub struct SSZSnappyOutboundCodec<E: EthSpec> {
|
|||||||
/// The fork name corresponding to the received context bytes.
|
/// The fork name corresponding to the received context bytes.
|
||||||
fork_name: Option<ForkName>,
|
fork_name: Option<ForkName>,
|
||||||
fork_context: Arc<ForkContext>,
|
fork_context: Arc<ForkContext>,
|
||||||
|
/// Keeps track of the current response code for a chunk.
|
||||||
|
current_response_code: Option<u8>,
|
||||||
phantom: PhantomData<E>,
|
phantom: PhantomData<E>,
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -209,6 +226,93 @@ impl<E: EthSpec> SSZSnappyOutboundCodec<E> {
|
|||||||
fork_name: None,
|
fork_name: None,
|
||||||
fork_context,
|
fork_context,
|
||||||
phantom: PhantomData,
|
phantom: PhantomData,
|
||||||
|
current_response_code: None,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Decode an Rpc response.
|
||||||
|
fn decode_response(&mut self, src: &mut BytesMut) -> Result<Option<RPCResponse<E>>, RPCError> {
|
||||||
|
// Read the context bytes if required
|
||||||
|
if self.protocol.has_context_bytes() && self.fork_name.is_none() {
|
||||||
|
if src.len() >= CONTEXT_BYTES_LEN {
|
||||||
|
let context_bytes = src.split_to(CONTEXT_BYTES_LEN);
|
||||||
|
let mut result = [0; CONTEXT_BYTES_LEN];
|
||||||
|
result.copy_from_slice(context_bytes.as_ref());
|
||||||
|
self.fork_name = Some(context_bytes_to_fork_name(
|
||||||
|
result,
|
||||||
|
self.fork_context.clone(),
|
||||||
|
)?);
|
||||||
|
} else {
|
||||||
|
return Ok(None);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
let Some(length) = handle_length(&mut self.inner, &mut self.len, src)? else {
|
||||||
|
return Ok(None);
|
||||||
|
};
|
||||||
|
|
||||||
|
// Should not attempt to decode rpc chunks with `length > max_packet_size` or not within bounds of
|
||||||
|
// packet size for ssz container corresponding to `self.protocol`.
|
||||||
|
let ssz_limits = self.protocol.rpc_response_limits::<E>(&self.fork_context);
|
||||||
|
if ssz_limits.is_out_of_bounds(length, self.max_packet_size) {
|
||||||
|
return Err(RPCError::InvalidData(format!(
|
||||||
|
"RPC response length is out of bounds, length {}, max {}, min {}",
|
||||||
|
length, ssz_limits.max, ssz_limits.min
|
||||||
|
)));
|
||||||
|
}
|
||||||
|
// Calculate worst case compression length for given uncompressed length
|
||||||
|
let max_compressed_len = snap::raw::max_compress_len(length) as u64;
|
||||||
|
// Create a limit reader as a wrapper that reads only upto `max_compressed_len` from `src`.
|
||||||
|
let limit_reader = Cursor::new(src.as_ref()).take(max_compressed_len);
|
||||||
|
let mut reader = FrameDecoder::new(limit_reader);
|
||||||
|
|
||||||
|
let mut decoded_buffer = vec![0; length];
|
||||||
|
|
||||||
|
match reader.read_exact(&mut decoded_buffer) {
|
||||||
|
Ok(()) => {
|
||||||
|
// `n` is how many bytes the reader read in the compressed stream
|
||||||
|
let n = reader.get_ref().get_ref().position();
|
||||||
|
self.len = None;
|
||||||
|
let _read_bytes = src.split_to(n as usize);
|
||||||
|
// Safe to `take` from `self.fork_name` as we have all the bytes we need to
|
||||||
|
// decode an ssz object at this point.
|
||||||
|
let fork_name = self.fork_name.take();
|
||||||
|
handle_rpc_response(self.protocol.versioned_protocol, &decoded_buffer, fork_name)
|
||||||
|
}
|
||||||
|
Err(e) => handle_error(e, reader.get_ref().get_ref().position(), max_compressed_len),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn decode_error(&mut self, src: &mut BytesMut) -> Result<Option<ErrorType>, RPCError> {
|
||||||
|
let Some(length) = handle_length(&mut self.inner, &mut self.len, src)? else {
|
||||||
|
return Ok(None);
|
||||||
|
};
|
||||||
|
|
||||||
|
// Should not attempt to decode rpc chunks with `length > max_packet_size` or not within bounds of
|
||||||
|
// packet size for ssz container corresponding to `ErrorType`.
|
||||||
|
if length > self.max_packet_size || length > *ERROR_TYPE_MAX || length < *ERROR_TYPE_MIN {
|
||||||
|
return Err(RPCError::InvalidData(format!(
|
||||||
|
"RPC Error length is out of bounds, length {}",
|
||||||
|
length
|
||||||
|
)));
|
||||||
|
}
|
||||||
|
|
||||||
|
// Calculate worst case compression length for given uncompressed length
|
||||||
|
let max_compressed_len = snap::raw::max_compress_len(length) as u64;
|
||||||
|
// Create a limit reader as a wrapper that reads only upto `max_compressed_len` from `src`.
|
||||||
|
let limit_reader = Cursor::new(src.as_ref()).take(max_compressed_len);
|
||||||
|
let mut reader = FrameDecoder::new(limit_reader);
|
||||||
|
let mut decoded_buffer = vec![0; length];
|
||||||
|
match reader.read_exact(&mut decoded_buffer) {
|
||||||
|
Ok(()) => {
|
||||||
|
// `n` is how many bytes the reader read in the compressed stream
|
||||||
|
let n = reader.get_ref().get_ref().position();
|
||||||
|
self.len = None;
|
||||||
|
let _read_bytes = src.split_to(n as usize);
|
||||||
|
Ok(Some(ErrorType(VariableList::from_ssz_bytes(
|
||||||
|
&decoded_buffer,
|
||||||
|
)?)))
|
||||||
|
}
|
||||||
|
Err(e) => handle_error(e, reader.get_ref().get_ref().position(), max_compressed_len),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -265,99 +369,40 @@ impl<E: EthSpec> Encoder<OutboundRequest<E>> for SSZSnappyOutboundCodec<E> {
|
|||||||
// We prefer to decode blocks and attestations with extra knowledge about the chain to perform
|
// We prefer to decode blocks and attestations with extra knowledge about the chain to perform
|
||||||
// faster verification checks before decoding entire blocks/attestations.
|
// faster verification checks before decoding entire blocks/attestations.
|
||||||
impl<E: EthSpec> Decoder for SSZSnappyOutboundCodec<E> {
|
impl<E: EthSpec> Decoder for SSZSnappyOutboundCodec<E> {
|
||||||
type Item = RPCResponse<E>;
|
type Item = RPCCodedResponse<E>;
|
||||||
type Error = RPCError;
|
type Error = RPCError;
|
||||||
|
|
||||||
fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
|
fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
|
||||||
// Read the context bytes if required
|
// if we have only received the response code, wait for more bytes
|
||||||
if self.protocol.has_context_bytes() && self.fork_name.is_none() {
|
if src.len() <= 1 {
|
||||||
if src.len() >= CONTEXT_BYTES_LEN {
|
return Ok(None);
|
||||||
let context_bytes = src.split_to(CONTEXT_BYTES_LEN);
|
}
|
||||||
let mut result = [0; CONTEXT_BYTES_LEN];
|
// using the response code determine which kind of payload needs to be decoded.
|
||||||
result.copy_from_slice(context_bytes.as_ref());
|
let response_code = self.current_response_code.unwrap_or_else(|| {
|
||||||
self.fork_name = Some(context_bytes_to_fork_name(
|
let resp_code = src.split_to(1)[0];
|
||||||
result,
|
self.current_response_code = Some(resp_code);
|
||||||
self.fork_context.clone(),
|
resp_code
|
||||||
)?);
|
});
|
||||||
|
|
||||||
|
let inner_result = {
|
||||||
|
if RPCCodedResponse::<E>::is_response(response_code) {
|
||||||
|
// decode an actual response and mutates the buffer if enough bytes have been read
|
||||||
|
// returning the result.
|
||||||
|
self.decode_response(src)
|
||||||
|
.map(|r| r.map(RPCCodedResponse::Success))
|
||||||
} else {
|
} else {
|
||||||
return Ok(None);
|
// decode an error
|
||||||
|
self.decode_error(src)
|
||||||
|
.map(|r| r.map(|resp| RPCCodedResponse::from_error(response_code, resp)))
|
||||||
}
|
}
|
||||||
}
|
|
||||||
let Some(length) = handle_length(&mut self.inner, &mut self.len, src)? else {
|
|
||||||
return Ok(None);
|
|
||||||
};
|
};
|
||||||
|
// if the inner decoder was capable of decoding a chunk, we need to reset the current
|
||||||
// Should not attempt to decode rpc chunks with `length > max_packet_size` or not within bounds of
|
// response code for the next chunk
|
||||||
// packet size for ssz container corresponding to `self.protocol`.
|
if let Ok(Some(_)) = inner_result {
|
||||||
let ssz_limits = self.protocol.rpc_response_limits::<E>(&self.fork_context);
|
self.current_response_code = None;
|
||||||
if ssz_limits.is_out_of_bounds(length, self.max_packet_size) {
|
|
||||||
return Err(RPCError::InvalidData(format!(
|
|
||||||
"RPC response length is out of bounds, length {}, max {}, min {}",
|
|
||||||
length, ssz_limits.max, ssz_limits.min
|
|
||||||
)));
|
|
||||||
}
|
|
||||||
// Calculate worst case compression length for given uncompressed length
|
|
||||||
let max_compressed_len = snap::raw::max_compress_len(length) as u64;
|
|
||||||
// Create a limit reader as a wrapper that reads only upto `max_compressed_len` from `src`.
|
|
||||||
let limit_reader = Cursor::new(src.as_ref()).take(max_compressed_len);
|
|
||||||
let mut reader = FrameDecoder::new(limit_reader);
|
|
||||||
|
|
||||||
let mut decoded_buffer = vec![0; length];
|
|
||||||
|
|
||||||
match reader.read_exact(&mut decoded_buffer) {
|
|
||||||
Ok(()) => {
|
|
||||||
// `n` is how many bytes the reader read in the compressed stream
|
|
||||||
let n = reader.get_ref().get_ref().position();
|
|
||||||
self.len = None;
|
|
||||||
let _read_bytes = src.split_to(n as usize);
|
|
||||||
// Safe to `take` from `self.fork_name` as we have all the bytes we need to
|
|
||||||
// decode an ssz object at this point.
|
|
||||||
let fork_name = self.fork_name.take();
|
|
||||||
handle_rpc_response(self.protocol.versioned_protocol, &decoded_buffer, fork_name)
|
|
||||||
}
|
|
||||||
Err(e) => handle_error(e, reader.get_ref().get_ref().position(), max_compressed_len),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<E: EthSpec> OutboundCodec<OutboundRequest<E>> for SSZSnappyOutboundCodec<E> {
|
|
||||||
type CodecErrorType = ErrorType;
|
|
||||||
|
|
||||||
fn decode_error(
|
|
||||||
&mut self,
|
|
||||||
src: &mut BytesMut,
|
|
||||||
) -> Result<Option<Self::CodecErrorType>, RPCError> {
|
|
||||||
let Some(length) = handle_length(&mut self.inner, &mut self.len, src)? else {
|
|
||||||
return Ok(None);
|
|
||||||
};
|
|
||||||
|
|
||||||
// Should not attempt to decode rpc chunks with `length > max_packet_size` or not within bounds of
|
|
||||||
// packet size for ssz container corresponding to `ErrorType`.
|
|
||||||
if length > self.max_packet_size || length > *ERROR_TYPE_MAX || length < *ERROR_TYPE_MIN {
|
|
||||||
return Err(RPCError::InvalidData(format!(
|
|
||||||
"RPC Error length is out of bounds, length {}",
|
|
||||||
length
|
|
||||||
)));
|
|
||||||
}
|
|
||||||
|
|
||||||
// Calculate worst case compression length for given uncompressed length
|
|
||||||
let max_compressed_len = snap::raw::max_compress_len(length) as u64;
|
|
||||||
// Create a limit reader as a wrapper that reads only upto `max_compressed_len` from `src`.
|
|
||||||
let limit_reader = Cursor::new(src.as_ref()).take(max_compressed_len);
|
|
||||||
let mut reader = FrameDecoder::new(limit_reader);
|
|
||||||
let mut decoded_buffer = vec![0; length];
|
|
||||||
match reader.read_exact(&mut decoded_buffer) {
|
|
||||||
Ok(()) => {
|
|
||||||
// `n` is how many bytes the reader read in the compressed stream
|
|
||||||
let n = reader.get_ref().get_ref().position();
|
|
||||||
self.len = None;
|
|
||||||
let _read_bytes = src.split_to(n as usize);
|
|
||||||
Ok(Some(ErrorType(VariableList::from_ssz_bytes(
|
|
||||||
&decoded_buffer,
|
|
||||||
)?)))
|
|
||||||
}
|
|
||||||
Err(e) => handle_error(e, reader.get_ref().get_ref().position(), max_compressed_len),
|
|
||||||
}
|
}
|
||||||
|
// return the result
|
||||||
|
inner_result
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -1030,7 +1075,7 @@ mod tests {
|
|||||||
let mut snappy_inbound_codec =
|
let mut snappy_inbound_codec =
|
||||||
SSZSnappyInboundCodec::<Spec>::new(snappy_protocol_id, max_packet_size, fork_context);
|
SSZSnappyInboundCodec::<Spec>::new(snappy_protocol_id, max_packet_size, fork_context);
|
||||||
|
|
||||||
snappy_inbound_codec.encode(message, &mut buf)?;
|
snappy_inbound_codec.encode_response(message, &mut buf)?;
|
||||||
Ok(buf)
|
Ok(buf)
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -1075,7 +1120,7 @@ mod tests {
|
|||||||
let mut snappy_outbound_codec =
|
let mut snappy_outbound_codec =
|
||||||
SSZSnappyOutboundCodec::<Spec>::new(snappy_protocol_id, max_packet_size, fork_context);
|
SSZSnappyOutboundCodec::<Spec>::new(snappy_protocol_id, max_packet_size, fork_context);
|
||||||
// decode message just as snappy message
|
// decode message just as snappy message
|
||||||
snappy_outbound_codec.decode(message)
|
snappy_outbound_codec.decode_response(message)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Encodes the provided protocol message as bytes and tries to decode the encoding bytes.
|
/// Encodes the provided protocol message as bytes and tries to decode the encoding bytes.
|
||||||
@@ -1847,4 +1892,129 @@ mod tests {
|
|||||||
RPCError::InvalidData(_)
|
RPCError::InvalidData(_)
|
||||||
));
|
));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_decode_status_message() {
|
||||||
|
let message = hex::decode("0054ff060000734e615070590032000006e71e7b54989925efd6c9cbcb8ceb9b5f71216f5137282bf6a1e3b50f64e42d6c7fb347abe07eb0db8200000005029e2800").unwrap();
|
||||||
|
let mut buf = BytesMut::new();
|
||||||
|
buf.extend_from_slice(&message);
|
||||||
|
|
||||||
|
let snappy_protocol_id = ProtocolId::new(SupportedProtocol::StatusV1, Encoding::SSZSnappy);
|
||||||
|
|
||||||
|
let fork_context = Arc::new(fork_context(ForkName::Base));
|
||||||
|
|
||||||
|
let chain_spec = Spec::default_spec();
|
||||||
|
|
||||||
|
let mut snappy_outbound_codec = SSZSnappyOutboundCodec::<Spec>::new(
|
||||||
|
snappy_protocol_id,
|
||||||
|
max_rpc_size(&fork_context, chain_spec.max_chunk_size as usize),
|
||||||
|
fork_context,
|
||||||
|
);
|
||||||
|
|
||||||
|
// remove response code
|
||||||
|
let mut snappy_buf = buf.clone();
|
||||||
|
let _ = snappy_buf.split_to(1);
|
||||||
|
|
||||||
|
// decode message just as snappy message
|
||||||
|
let _snappy_decoded_message = snappy_outbound_codec
|
||||||
|
.decode_response(&mut snappy_buf)
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
// decode message as ssz snappy chunk
|
||||||
|
let _snappy_decoded_chunk = snappy_outbound_codec.decode(&mut buf).unwrap();
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_invalid_length_prefix() {
|
||||||
|
let mut uvi_codec: Uvi<u128> = Uvi::default();
|
||||||
|
let mut dst = BytesMut::with_capacity(1024);
|
||||||
|
|
||||||
|
// Smallest > 10 byte varint
|
||||||
|
let len: u128 = 2u128.pow(70);
|
||||||
|
|
||||||
|
// Insert length-prefix
|
||||||
|
uvi_codec.encode(len, &mut dst).unwrap();
|
||||||
|
|
||||||
|
let snappy_protocol_id = ProtocolId::new(SupportedProtocol::StatusV1, Encoding::SSZSnappy);
|
||||||
|
|
||||||
|
let fork_context = Arc::new(fork_context(ForkName::Base));
|
||||||
|
|
||||||
|
let chain_spec = Spec::default_spec();
|
||||||
|
|
||||||
|
let mut snappy_outbound_codec = SSZSnappyOutboundCodec::<Spec>::new(
|
||||||
|
snappy_protocol_id,
|
||||||
|
max_rpc_size(&fork_context, chain_spec.max_chunk_size as usize),
|
||||||
|
fork_context,
|
||||||
|
);
|
||||||
|
|
||||||
|
let snappy_decoded_message = snappy_outbound_codec.decode_response(&mut dst).unwrap_err();
|
||||||
|
|
||||||
|
assert_eq!(
|
||||||
|
snappy_decoded_message,
|
||||||
|
RPCError::IoError("input bytes exceed maximum".to_string()),
|
||||||
|
"length-prefix of > 10 bytes is invalid"
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_length_limits() {
|
||||||
|
fn encode_len(len: usize) -> BytesMut {
|
||||||
|
let mut uvi_codec: Uvi<usize> = Uvi::default();
|
||||||
|
let mut dst = BytesMut::with_capacity(1024);
|
||||||
|
uvi_codec.encode(len, &mut dst).unwrap();
|
||||||
|
dst
|
||||||
|
}
|
||||||
|
|
||||||
|
let protocol_id = ProtocolId::new(SupportedProtocol::BlocksByRangeV1, Encoding::SSZSnappy);
|
||||||
|
|
||||||
|
// Response limits
|
||||||
|
let fork_context = Arc::new(fork_context(ForkName::Base));
|
||||||
|
|
||||||
|
let chain_spec = Spec::default_spec();
|
||||||
|
|
||||||
|
let max_rpc_size = max_rpc_size(&fork_context, chain_spec.max_chunk_size as usize);
|
||||||
|
let limit = protocol_id.rpc_response_limits::<Spec>(&fork_context);
|
||||||
|
let mut max = encode_len(limit.max + 1);
|
||||||
|
let mut codec = SSZSnappyOutboundCodec::<Spec>::new(
|
||||||
|
protocol_id.clone(),
|
||||||
|
max_rpc_size,
|
||||||
|
fork_context.clone(),
|
||||||
|
);
|
||||||
|
assert!(matches!(
|
||||||
|
codec.decode_response(&mut max).unwrap_err(),
|
||||||
|
RPCError::InvalidData(_)
|
||||||
|
));
|
||||||
|
|
||||||
|
let mut min = encode_len(limit.min - 1);
|
||||||
|
let mut codec = SSZSnappyOutboundCodec::<Spec>::new(
|
||||||
|
protocol_id.clone(),
|
||||||
|
max_rpc_size,
|
||||||
|
fork_context.clone(),
|
||||||
|
);
|
||||||
|
assert!(matches!(
|
||||||
|
codec.decode_response(&mut min).unwrap_err(),
|
||||||
|
RPCError::InvalidData(_)
|
||||||
|
));
|
||||||
|
|
||||||
|
// Request limits
|
||||||
|
let limit = protocol_id.rpc_request_limits(&fork_context.spec);
|
||||||
|
let mut max = encode_len(limit.max + 1);
|
||||||
|
let mut codec = SSZSnappyOutboundCodec::<Spec>::new(
|
||||||
|
protocol_id.clone(),
|
||||||
|
max_rpc_size,
|
||||||
|
fork_context.clone(),
|
||||||
|
);
|
||||||
|
assert!(matches!(
|
||||||
|
codec.decode_response(&mut max).unwrap_err(),
|
||||||
|
RPCError::InvalidData(_)
|
||||||
|
));
|
||||||
|
|
||||||
|
let mut min = encode_len(limit.min - 1);
|
||||||
|
let mut codec =
|
||||||
|
SSZSnappyOutboundCodec::<Spec>::new(protocol_id, max_rpc_size, fork_context);
|
||||||
|
assert!(matches!(
|
||||||
|
codec.decode_response(&mut min).unwrap_err(),
|
||||||
|
RPCError::InvalidData(_)
|
||||||
|
));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
@@ -1,334 +0,0 @@
|
|||||||
//! This handles the various supported encoding mechanism for the Eth 2.0 RPC.
|
|
||||||
|
|
||||||
use crate::rpc::methods::ErrorType;
|
|
||||||
use crate::rpc::{InboundRequest, OutboundRequest, RPCCodedResponse, RPCResponse};
|
|
||||||
use libp2p::bytes::BufMut;
|
|
||||||
use libp2p::bytes::BytesMut;
|
|
||||||
use std::marker::PhantomData;
|
|
||||||
use tokio_util::codec::{Decoder, Encoder};
|
|
||||||
use types::EthSpec;
|
|
||||||
|
|
||||||
pub trait OutboundCodec<TItem>: Encoder<TItem> + Decoder {
|
|
||||||
type CodecErrorType;
|
|
||||||
|
|
||||||
fn decode_error(
|
|
||||||
&mut self,
|
|
||||||
src: &mut BytesMut,
|
|
||||||
) -> Result<Option<Self::CodecErrorType>, <Self as Decoder>::Error>;
|
|
||||||
}
|
|
||||||
|
|
||||||
/* Global Inbound Codec */
|
|
||||||
// This deals with Decoding RPC Requests from other peers and encoding our responses
|
|
||||||
|
|
||||||
pub struct BaseInboundCodec<TCodec, E>
|
|
||||||
where
|
|
||||||
TCodec: Encoder<RPCCodedResponse<E>> + Decoder,
|
|
||||||
E: EthSpec,
|
|
||||||
{
|
|
||||||
/// Inner codec for handling various encodings
|
|
||||||
inner: TCodec,
|
|
||||||
phantom: PhantomData<E>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<TCodec, E> BaseInboundCodec<TCodec, E>
|
|
||||||
where
|
|
||||||
TCodec: Encoder<RPCCodedResponse<E>> + Decoder,
|
|
||||||
E: EthSpec,
|
|
||||||
{
|
|
||||||
pub fn new(codec: TCodec) -> Self {
|
|
||||||
BaseInboundCodec {
|
|
||||||
inner: codec,
|
|
||||||
phantom: PhantomData,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/* Global Outbound Codec */
|
|
||||||
// This deals with Decoding RPC Responses from other peers and encoding our requests
|
|
||||||
pub struct BaseOutboundCodec<TOutboundCodec, E>
|
|
||||||
where
|
|
||||||
TOutboundCodec: OutboundCodec<OutboundRequest<E>>,
|
|
||||||
E: EthSpec,
|
|
||||||
{
|
|
||||||
/// Inner codec for handling various encodings.
|
|
||||||
inner: TOutboundCodec,
|
|
||||||
/// Keeps track of the current response code for a chunk.
|
|
||||||
current_response_code: Option<u8>,
|
|
||||||
phantom: PhantomData<E>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<TOutboundCodec, E> BaseOutboundCodec<TOutboundCodec, E>
|
|
||||||
where
|
|
||||||
E: EthSpec,
|
|
||||||
TOutboundCodec: OutboundCodec<OutboundRequest<E>>,
|
|
||||||
{
|
|
||||||
pub fn new(codec: TOutboundCodec) -> Self {
|
|
||||||
BaseOutboundCodec {
|
|
||||||
inner: codec,
|
|
||||||
current_response_code: None,
|
|
||||||
phantom: PhantomData,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/* Implementation of the Encoding/Decoding for the global codecs */
|
|
||||||
|
|
||||||
/* Base Inbound Codec */
|
|
||||||
|
|
||||||
// This Encodes RPC Responses sent to external peers
|
|
||||||
impl<TCodec, E> Encoder<RPCCodedResponse<E>> for BaseInboundCodec<TCodec, E>
|
|
||||||
where
|
|
||||||
E: EthSpec,
|
|
||||||
TCodec: Decoder + Encoder<RPCCodedResponse<E>>,
|
|
||||||
{
|
|
||||||
type Error = <TCodec as Encoder<RPCCodedResponse<E>>>::Error;
|
|
||||||
|
|
||||||
fn encode(&mut self, item: RPCCodedResponse<E>, dst: &mut BytesMut) -> Result<(), Self::Error> {
|
|
||||||
dst.clear();
|
|
||||||
dst.reserve(1);
|
|
||||||
dst.put_u8(
|
|
||||||
item.as_u8()
|
|
||||||
.expect("Should never encode a stream termination"),
|
|
||||||
);
|
|
||||||
self.inner.encode(item, dst)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// This Decodes RPC Requests from external peers
|
|
||||||
impl<TCodec, E> Decoder for BaseInboundCodec<TCodec, E>
|
|
||||||
where
|
|
||||||
E: EthSpec,
|
|
||||||
TCodec: Encoder<RPCCodedResponse<E>> + Decoder<Item = InboundRequest<E>>,
|
|
||||||
{
|
|
||||||
type Item = InboundRequest<E>;
|
|
||||||
type Error = <TCodec as Decoder>::Error;
|
|
||||||
|
|
||||||
fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
|
|
||||||
self.inner.decode(src)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/* Base Outbound Codec */
|
|
||||||
|
|
||||||
// This Encodes RPC Requests sent to external peers
|
|
||||||
impl<TCodec, E> Encoder<OutboundRequest<E>> for BaseOutboundCodec<TCodec, E>
|
|
||||||
where
|
|
||||||
E: EthSpec,
|
|
||||||
TCodec: OutboundCodec<OutboundRequest<E>> + Encoder<OutboundRequest<E>>,
|
|
||||||
{
|
|
||||||
type Error = <TCodec as Encoder<OutboundRequest<E>>>::Error;
|
|
||||||
|
|
||||||
fn encode(&mut self, item: OutboundRequest<E>, dst: &mut BytesMut) -> Result<(), Self::Error> {
|
|
||||||
self.inner.encode(item, dst)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// This decodes RPC Responses received from external peers
|
|
||||||
impl<TCodec, E> Decoder for BaseOutboundCodec<TCodec, E>
|
|
||||||
where
|
|
||||||
E: EthSpec,
|
|
||||||
TCodec: OutboundCodec<OutboundRequest<E>, CodecErrorType = ErrorType>
|
|
||||||
+ Decoder<Item = RPCResponse<E>>,
|
|
||||||
{
|
|
||||||
type Item = RPCCodedResponse<E>;
|
|
||||||
type Error = <TCodec as Decoder>::Error;
|
|
||||||
|
|
||||||
fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
|
|
||||||
// if we have only received the response code, wait for more bytes
|
|
||||||
if src.len() <= 1 {
|
|
||||||
return Ok(None);
|
|
||||||
}
|
|
||||||
// using the response code determine which kind of payload needs to be decoded.
|
|
||||||
let response_code = self.current_response_code.unwrap_or_else(|| {
|
|
||||||
let resp_code = src.split_to(1)[0];
|
|
||||||
self.current_response_code = Some(resp_code);
|
|
||||||
resp_code
|
|
||||||
});
|
|
||||||
|
|
||||||
let inner_result = {
|
|
||||||
if RPCCodedResponse::<E>::is_response(response_code) {
|
|
||||||
// decode an actual response and mutates the buffer if enough bytes have been read
|
|
||||||
// returning the result.
|
|
||||||
self.inner
|
|
||||||
.decode(src)
|
|
||||||
.map(|r| r.map(RPCCodedResponse::Success))
|
|
||||||
} else {
|
|
||||||
// decode an error
|
|
||||||
self.inner
|
|
||||||
.decode_error(src)
|
|
||||||
.map(|r| r.map(|resp| RPCCodedResponse::from_error(response_code, resp)))
|
|
||||||
}
|
|
||||||
};
|
|
||||||
// if the inner decoder was capable of decoding a chunk, we need to reset the current
|
|
||||||
// response code for the next chunk
|
|
||||||
if let Ok(Some(_)) = inner_result {
|
|
||||||
self.current_response_code = None;
|
|
||||||
}
|
|
||||||
// return the result
|
|
||||||
inner_result
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[cfg(test)]
|
|
||||||
mod tests {
|
|
||||||
use super::super::ssz_snappy::*;
|
|
||||||
use super::*;
|
|
||||||
use crate::rpc::protocol::*;
|
|
||||||
|
|
||||||
use std::sync::Arc;
|
|
||||||
use types::{Epoch, FixedBytesExtended, ForkContext, ForkName, Hash256, Slot};
|
|
||||||
use unsigned_varint::codec::Uvi;
|
|
||||||
|
|
||||||
type Spec = types::MainnetEthSpec;
|
|
||||||
|
|
||||||
fn fork_context(fork_name: ForkName) -> ForkContext {
|
|
||||||
let mut chain_spec = Spec::default_spec();
|
|
||||||
let altair_fork_epoch = Epoch::new(1);
|
|
||||||
let bellatrix_fork_epoch = Epoch::new(2);
|
|
||||||
let capella_fork_epoch = Epoch::new(3);
|
|
||||||
let deneb_fork_epoch = Epoch::new(4);
|
|
||||||
let electra_fork_epoch = Epoch::new(5);
|
|
||||||
|
|
||||||
chain_spec.altair_fork_epoch = Some(altair_fork_epoch);
|
|
||||||
chain_spec.bellatrix_fork_epoch = Some(bellatrix_fork_epoch);
|
|
||||||
chain_spec.capella_fork_epoch = Some(capella_fork_epoch);
|
|
||||||
chain_spec.deneb_fork_epoch = Some(deneb_fork_epoch);
|
|
||||||
chain_spec.electra_fork_epoch = Some(electra_fork_epoch);
|
|
||||||
|
|
||||||
let current_slot = match fork_name {
|
|
||||||
ForkName::Base => Slot::new(0),
|
|
||||||
ForkName::Altair => altair_fork_epoch.start_slot(Spec::slots_per_epoch()),
|
|
||||||
ForkName::Bellatrix => bellatrix_fork_epoch.start_slot(Spec::slots_per_epoch()),
|
|
||||||
ForkName::Capella => capella_fork_epoch.start_slot(Spec::slots_per_epoch()),
|
|
||||||
ForkName::Deneb => deneb_fork_epoch.start_slot(Spec::slots_per_epoch()),
|
|
||||||
ForkName::Electra => electra_fork_epoch.start_slot(Spec::slots_per_epoch()),
|
|
||||||
};
|
|
||||||
ForkContext::new::<Spec>(current_slot, Hash256::zero(), &chain_spec)
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn test_decode_status_message() {
|
|
||||||
let message = hex::decode("0054ff060000734e615070590032000006e71e7b54989925efd6c9cbcb8ceb9b5f71216f5137282bf6a1e3b50f64e42d6c7fb347abe07eb0db8200000005029e2800").unwrap();
|
|
||||||
let mut buf = BytesMut::new();
|
|
||||||
buf.extend_from_slice(&message);
|
|
||||||
|
|
||||||
let snappy_protocol_id = ProtocolId::new(SupportedProtocol::StatusV1, Encoding::SSZSnappy);
|
|
||||||
|
|
||||||
let fork_context = Arc::new(fork_context(ForkName::Base));
|
|
||||||
|
|
||||||
let chain_spec = Spec::default_spec();
|
|
||||||
|
|
||||||
let mut snappy_outbound_codec = SSZSnappyOutboundCodec::<Spec>::new(
|
|
||||||
snappy_protocol_id,
|
|
||||||
max_rpc_size(&fork_context, chain_spec.max_chunk_size as usize),
|
|
||||||
fork_context,
|
|
||||||
);
|
|
||||||
|
|
||||||
// remove response code
|
|
||||||
let mut snappy_buf = buf.clone();
|
|
||||||
let _ = snappy_buf.split_to(1);
|
|
||||||
|
|
||||||
// decode message just as snappy message
|
|
||||||
let _snappy_decoded_message = snappy_outbound_codec.decode(&mut snappy_buf).unwrap();
|
|
||||||
|
|
||||||
// build codecs for entire chunk
|
|
||||||
let mut snappy_base_outbound_codec = BaseOutboundCodec::new(snappy_outbound_codec);
|
|
||||||
|
|
||||||
// decode message as ssz snappy chunk
|
|
||||||
let _snappy_decoded_chunk = snappy_base_outbound_codec.decode(&mut buf).unwrap();
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn test_invalid_length_prefix() {
|
|
||||||
let mut uvi_codec: Uvi<u128> = Uvi::default();
|
|
||||||
let mut dst = BytesMut::with_capacity(1024);
|
|
||||||
|
|
||||||
// Smallest > 10 byte varint
|
|
||||||
let len: u128 = 2u128.pow(70);
|
|
||||||
|
|
||||||
// Insert length-prefix
|
|
||||||
uvi_codec.encode(len, &mut dst).unwrap();
|
|
||||||
|
|
||||||
let snappy_protocol_id = ProtocolId::new(SupportedProtocol::StatusV1, Encoding::SSZSnappy);
|
|
||||||
|
|
||||||
let fork_context = Arc::new(fork_context(ForkName::Base));
|
|
||||||
|
|
||||||
let chain_spec = Spec::default_spec();
|
|
||||||
|
|
||||||
let mut snappy_outbound_codec = SSZSnappyOutboundCodec::<Spec>::new(
|
|
||||||
snappy_protocol_id,
|
|
||||||
max_rpc_size(&fork_context, chain_spec.max_chunk_size as usize),
|
|
||||||
fork_context,
|
|
||||||
);
|
|
||||||
|
|
||||||
let snappy_decoded_message = snappy_outbound_codec.decode(&mut dst).unwrap_err();
|
|
||||||
|
|
||||||
assert_eq!(
|
|
||||||
snappy_decoded_message,
|
|
||||||
RPCError::IoError("input bytes exceed maximum".to_string()),
|
|
||||||
"length-prefix of > 10 bytes is invalid"
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn test_length_limits() {
|
|
||||||
fn encode_len(len: usize) -> BytesMut {
|
|
||||||
let mut uvi_codec: Uvi<usize> = Uvi::default();
|
|
||||||
let mut dst = BytesMut::with_capacity(1024);
|
|
||||||
uvi_codec.encode(len, &mut dst).unwrap();
|
|
||||||
dst
|
|
||||||
}
|
|
||||||
|
|
||||||
let protocol_id = ProtocolId::new(SupportedProtocol::BlocksByRangeV1, Encoding::SSZSnappy);
|
|
||||||
|
|
||||||
// Response limits
|
|
||||||
let fork_context = Arc::new(fork_context(ForkName::Base));
|
|
||||||
|
|
||||||
let chain_spec = Spec::default_spec();
|
|
||||||
|
|
||||||
let max_rpc_size = max_rpc_size(&fork_context, chain_spec.max_chunk_size as usize);
|
|
||||||
let limit = protocol_id.rpc_response_limits::<Spec>(&fork_context);
|
|
||||||
let mut max = encode_len(limit.max + 1);
|
|
||||||
let mut codec = SSZSnappyOutboundCodec::<Spec>::new(
|
|
||||||
protocol_id.clone(),
|
|
||||||
max_rpc_size,
|
|
||||||
fork_context.clone(),
|
|
||||||
);
|
|
||||||
assert!(matches!(
|
|
||||||
codec.decode(&mut max).unwrap_err(),
|
|
||||||
RPCError::InvalidData(_)
|
|
||||||
));
|
|
||||||
|
|
||||||
let mut min = encode_len(limit.min - 1);
|
|
||||||
let mut codec = SSZSnappyOutboundCodec::<Spec>::new(
|
|
||||||
protocol_id.clone(),
|
|
||||||
max_rpc_size,
|
|
||||||
fork_context.clone(),
|
|
||||||
);
|
|
||||||
assert!(matches!(
|
|
||||||
codec.decode(&mut min).unwrap_err(),
|
|
||||||
RPCError::InvalidData(_)
|
|
||||||
));
|
|
||||||
|
|
||||||
// Request limits
|
|
||||||
let limit = protocol_id.rpc_request_limits(&fork_context.spec);
|
|
||||||
let mut max = encode_len(limit.max + 1);
|
|
||||||
let mut codec = SSZSnappyOutboundCodec::<Spec>::new(
|
|
||||||
protocol_id.clone(),
|
|
||||||
max_rpc_size,
|
|
||||||
fork_context.clone(),
|
|
||||||
);
|
|
||||||
assert!(matches!(
|
|
||||||
codec.decode(&mut max).unwrap_err(),
|
|
||||||
RPCError::InvalidData(_)
|
|
||||||
));
|
|
||||||
|
|
||||||
let mut min = encode_len(limit.min - 1);
|
|
||||||
let mut codec =
|
|
||||||
SSZSnappyOutboundCodec::<Spec>::new(protocol_id, max_rpc_size, fork_context);
|
|
||||||
assert!(matches!(
|
|
||||||
codec.decode(&mut min).unwrap_err(),
|
|
||||||
RPCError::InvalidData(_)
|
|
||||||
));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
@@ -1,61 +0,0 @@
|
|||||||
pub(crate) mod base;
|
|
||||||
pub(crate) mod ssz_snappy;
|
|
||||||
|
|
||||||
use self::base::{BaseInboundCodec, BaseOutboundCodec};
|
|
||||||
use self::ssz_snappy::{SSZSnappyInboundCodec, SSZSnappyOutboundCodec};
|
|
||||||
use crate::rpc::protocol::RPCError;
|
|
||||||
use crate::rpc::{InboundRequest, OutboundRequest, RPCCodedResponse};
|
|
||||||
use libp2p::bytes::BytesMut;
|
|
||||||
use tokio_util::codec::{Decoder, Encoder};
|
|
||||||
use types::EthSpec;
|
|
||||||
|
|
||||||
// Known types of codecs
|
|
||||||
pub enum InboundCodec<E: EthSpec> {
|
|
||||||
SSZSnappy(BaseInboundCodec<SSZSnappyInboundCodec<E>, E>),
|
|
||||||
}
|
|
||||||
|
|
||||||
pub enum OutboundCodec<E: EthSpec> {
|
|
||||||
SSZSnappy(BaseOutboundCodec<SSZSnappyOutboundCodec<E>, E>),
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<E: EthSpec> Encoder<RPCCodedResponse<E>> for InboundCodec<E> {
|
|
||||||
type Error = RPCError;
|
|
||||||
|
|
||||||
fn encode(&mut self, item: RPCCodedResponse<E>, dst: &mut BytesMut) -> Result<(), Self::Error> {
|
|
||||||
match self {
|
|
||||||
InboundCodec::SSZSnappy(codec) => codec.encode(item, dst),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<E: EthSpec> Decoder for InboundCodec<E> {
|
|
||||||
type Item = InboundRequest<E>;
|
|
||||||
type Error = RPCError;
|
|
||||||
|
|
||||||
fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
|
|
||||||
match self {
|
|
||||||
InboundCodec::SSZSnappy(codec) => codec.decode(src),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<E: EthSpec> Encoder<OutboundRequest<E>> for OutboundCodec<E> {
|
|
||||||
type Error = RPCError;
|
|
||||||
|
|
||||||
fn encode(&mut self, item: OutboundRequest<E>, dst: &mut BytesMut) -> Result<(), Self::Error> {
|
|
||||||
match self {
|
|
||||||
OutboundCodec::SSZSnappy(codec) => codec.encode(item, dst),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<E: EthSpec> Decoder for OutboundCodec<E> {
|
|
||||||
type Item = RPCCodedResponse<E>;
|
|
||||||
type Error = RPCError;
|
|
||||||
|
|
||||||
fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
|
|
||||||
match self {
|
|
||||||
OutboundCodec::SSZSnappy(codec) => codec.decode(src),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
@@ -2,9 +2,7 @@ use super::methods::*;
|
|||||||
use super::protocol::ProtocolId;
|
use super::protocol::ProtocolId;
|
||||||
use super::protocol::SupportedProtocol;
|
use super::protocol::SupportedProtocol;
|
||||||
use super::RPCError;
|
use super::RPCError;
|
||||||
use crate::rpc::codec::{
|
use crate::rpc::codec::SSZSnappyOutboundCodec;
|
||||||
base::BaseOutboundCodec, ssz_snappy::SSZSnappyOutboundCodec, OutboundCodec,
|
|
||||||
};
|
|
||||||
use crate::rpc::protocol::Encoding;
|
use crate::rpc::protocol::Encoding;
|
||||||
use futures::future::BoxFuture;
|
use futures::future::BoxFuture;
|
||||||
use futures::prelude::{AsyncRead, AsyncWrite};
|
use futures::prelude::{AsyncRead, AsyncWrite};
|
||||||
@@ -183,7 +181,7 @@ impl<E: EthSpec> OutboundRequest<E> {
|
|||||||
|
|
||||||
/* Outbound upgrades */
|
/* Outbound upgrades */
|
||||||
|
|
||||||
pub type OutboundFramed<TSocket, E> = Framed<Compat<TSocket>, OutboundCodec<E>>;
|
pub type OutboundFramed<TSocket, E> = Framed<Compat<TSocket>, SSZSnappyOutboundCodec<E>>;
|
||||||
|
|
||||||
impl<TSocket, E> OutboundUpgrade<TSocket> for OutboundRequestContainer<E>
|
impl<TSocket, E> OutboundUpgrade<TSocket> for OutboundRequestContainer<E>
|
||||||
where
|
where
|
||||||
@@ -199,12 +197,7 @@ where
|
|||||||
let socket = socket.compat();
|
let socket = socket.compat();
|
||||||
let codec = match protocol.encoding {
|
let codec = match protocol.encoding {
|
||||||
Encoding::SSZSnappy => {
|
Encoding::SSZSnappy => {
|
||||||
let ssz_snappy_codec = BaseOutboundCodec::new(SSZSnappyOutboundCodec::new(
|
SSZSnappyOutboundCodec::new(protocol, self.max_rpc_size, self.fork_context.clone())
|
||||||
protocol,
|
|
||||||
self.max_rpc_size,
|
|
||||||
self.fork_context.clone(),
|
|
||||||
));
|
|
||||||
OutboundCodec::SSZSnappy(ssz_snappy_codec)
|
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|||||||
@@ -1,5 +1,5 @@
|
|||||||
use super::methods::*;
|
use super::methods::*;
|
||||||
use crate::rpc::codec::{base::BaseInboundCodec, ssz_snappy::SSZSnappyInboundCodec, InboundCodec};
|
use crate::rpc::codec::SSZSnappyInboundCodec;
|
||||||
use futures::future::BoxFuture;
|
use futures::future::BoxFuture;
|
||||||
use futures::prelude::{AsyncRead, AsyncWrite};
|
use futures::prelude::{AsyncRead, AsyncWrite};
|
||||||
use futures::{FutureExt, StreamExt};
|
use futures::{FutureExt, StreamExt};
|
||||||
@@ -647,7 +647,7 @@ pub fn rpc_data_column_limits<E: EthSpec>() -> RpcLimits {
|
|||||||
|
|
||||||
pub type InboundOutput<TSocket, E> = (InboundRequest<E>, InboundFramed<TSocket, E>);
|
pub type InboundOutput<TSocket, E> = (InboundRequest<E>, InboundFramed<TSocket, E>);
|
||||||
pub type InboundFramed<TSocket, E> =
|
pub type InboundFramed<TSocket, E> =
|
||||||
Framed<std::pin::Pin<Box<TimeoutStream<Compat<TSocket>>>>, InboundCodec<E>>;
|
Framed<std::pin::Pin<Box<TimeoutStream<Compat<TSocket>>>>, SSZSnappyInboundCodec<E>>;
|
||||||
|
|
||||||
impl<TSocket, E> InboundUpgrade<TSocket> for RPCProtocol<E>
|
impl<TSocket, E> InboundUpgrade<TSocket> for RPCProtocol<E>
|
||||||
where
|
where
|
||||||
@@ -664,15 +664,13 @@ where
|
|||||||
// convert the socket to tokio compatible socket
|
// convert the socket to tokio compatible socket
|
||||||
let socket = socket.compat();
|
let socket = socket.compat();
|
||||||
let codec = match protocol.encoding {
|
let codec = match protocol.encoding {
|
||||||
Encoding::SSZSnappy => {
|
Encoding::SSZSnappy => SSZSnappyInboundCodec::new(
|
||||||
let ssz_snappy_codec = BaseInboundCodec::new(SSZSnappyInboundCodec::new(
|
protocol,
|
||||||
protocol,
|
self.max_rpc_size,
|
||||||
self.max_rpc_size,
|
self.fork_context.clone(),
|
||||||
self.fork_context.clone(),
|
),
|
||||||
));
|
|
||||||
InboundCodec::SSZSnappy(ssz_snappy_codec)
|
|
||||||
}
|
|
||||||
};
|
};
|
||||||
|
|
||||||
let mut timed_socket = TimeoutStream::new(socket);
|
let mut timed_socket = TimeoutStream::new(socket);
|
||||||
timed_socket.set_read_timeout(Some(self.ttfb_timeout));
|
timed_socket.set_read_timeout(Some(self.ttfb_timeout));
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user