Silky smooth discovery (#1274)

* Initial structural re-write

* Improving discovery update and correcting attestation service logic

* Rework discovery.mod

* Handling lifetimes of query futures

* Discovery update first draft

* format fixes

* Stabalise discv5 update

* Formatting corrections

* Limit FindPeers queries and bug correction

* Update to stable release discv5

* Remove unnecessary pin

* formatting
This commit is contained in:
Age Manning
2020-06-19 14:13:23 +10:00
committed by GitHub
parent 06a72614cb
commit e379ad0f4e
48 changed files with 1168 additions and 1045 deletions

View File

@@ -0,0 +1,217 @@
//! This handles the various supported encoding mechanism for the Eth 2.0 RPC.
use crate::rpc::{RPCCodedResponse, RPCRequest, RPCResponse};
use libp2p::bytes::BufMut;
use libp2p::bytes::BytesMut;
use std::marker::PhantomData;
use tokio_util::codec::{Decoder, Encoder};
use types::EthSpec;
pub trait OutboundCodec<TItem>: Encoder<TItem> + Decoder {
type ErrorType;
fn decode_error(
&mut self,
src: &mut BytesMut,
) -> Result<Option<Self::ErrorType>, <Self as Decoder>::Error>;
}
/* Global Inbound Codec */
// This deals with Decoding RPC Requests from other peers and encoding our responses
pub struct BaseInboundCodec<TCodec, TSpec>
where
TCodec: Encoder<RPCCodedResponse<TSpec>> + Decoder,
TSpec: EthSpec,
{
/// Inner codec for handling various encodings
inner: TCodec,
phantom: PhantomData<TSpec>,
}
impl<TCodec, TSpec> BaseInboundCodec<TCodec, TSpec>
where
TCodec: Encoder<RPCCodedResponse<TSpec>> + Decoder,
TSpec: EthSpec,
{
pub fn new(codec: TCodec) -> Self {
BaseInboundCodec {
inner: codec,
phantom: PhantomData,
}
}
}
/* Global Outbound Codec */
// This deals with Decoding RPC Responses from other peers and encoding our requests
pub struct BaseOutboundCodec<TOutboundCodec, TSpec>
where
TOutboundCodec: OutboundCodec<RPCRequest<TSpec>>,
TSpec: EthSpec,
{
/// Inner codec for handling various encodings.
inner: TOutboundCodec,
/// Keeps track of the current response code for a chunk.
current_response_code: Option<u8>,
phantom: PhantomData<TSpec>,
}
impl<TOutboundCodec, TSpec> BaseOutboundCodec<TOutboundCodec, TSpec>
where
TSpec: EthSpec,
TOutboundCodec: OutboundCodec<RPCRequest<TSpec>>,
{
pub fn new(codec: TOutboundCodec) -> Self {
BaseOutboundCodec {
inner: codec,
current_response_code: None,
phantom: PhantomData,
}
}
}
/* Implementation of the Encoding/Decoding for the global codecs */
/* Base Inbound Codec */
// This Encodes RPC Responses sent to external peers
impl<TCodec, TSpec> Encoder<RPCCodedResponse<TSpec>> for BaseInboundCodec<TCodec, TSpec>
where
TSpec: EthSpec,
TCodec: Decoder + Encoder<RPCCodedResponse<TSpec>>,
{
type Error = <TCodec as Encoder<RPCCodedResponse<TSpec>>>::Error;
fn encode(
&mut self,
item: RPCCodedResponse<TSpec>,
dst: &mut BytesMut,
) -> Result<(), Self::Error> {
dst.clear();
dst.reserve(1);
dst.put_u8(
item.as_u8()
.expect("Should never encode a stream termination"),
);
self.inner.encode(item, dst)
}
}
// This Decodes RPC Requests from external peers
impl<TCodec, TSpec> Decoder for BaseInboundCodec<TCodec, TSpec>
where
TSpec: EthSpec,
TCodec: Encoder<RPCCodedResponse<TSpec>> + Decoder<Item = RPCRequest<TSpec>>,
{
type Item = RPCRequest<TSpec>;
type Error = <TCodec as Decoder>::Error;
fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
self.inner.decode(src)
}
}
/* Base Outbound Codec */
// This Encodes RPC Requests sent to external peers
impl<TCodec, TSpec> Encoder<RPCRequest<TSpec>> for BaseOutboundCodec<TCodec, TSpec>
where
TSpec: EthSpec,
TCodec: OutboundCodec<RPCRequest<TSpec>> + Encoder<RPCRequest<TSpec>>,
{
type Error = <TCodec as Encoder<RPCRequest<TSpec>>>::Error;
fn encode(&mut self, item: RPCRequest<TSpec>, dst: &mut BytesMut) -> Result<(), Self::Error> {
self.inner.encode(item, dst)
}
}
// This decodes RPC Responses received from external peers
impl<TCodec, TSpec> Decoder for BaseOutboundCodec<TCodec, TSpec>
where
TSpec: EthSpec,
TCodec:
OutboundCodec<RPCRequest<TSpec>, ErrorType = String> + Decoder<Item = RPCResponse<TSpec>>,
{
type Item = RPCCodedResponse<TSpec>;
type Error = <TCodec as Decoder>::Error;
fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
// if we have only received the response code, wait for more bytes
if src.len() <= 1 {
return Ok(None);
}
// using the response code determine which kind of payload needs to be decoded.
let response_code = self.current_response_code.unwrap_or_else(|| {
let resp_code = src.split_to(1)[0];
self.current_response_code = Some(resp_code);
resp_code
});
let inner_result = {
if RPCCodedResponse::<TSpec>::is_response(response_code) {
// decode an actual response and mutates the buffer if enough bytes have been read
// returning the result.
self.inner
.decode(src)
.map(|r| r.map(RPCCodedResponse::Success))
} else {
// decode an error
self.inner
.decode_error(src)
.map(|r| r.map(|resp| RPCCodedResponse::from_error(response_code, resp)))
}
};
// if the inner decoder was capable of decoding a chunk, we need to reset the current
// response code for the next chunk
if let Ok(Some(_)) = inner_result {
self.current_response_code = None;
}
// return the result
inner_result
}
}
#[cfg(test)]
mod tests {
use super::super::ssz::*;
use super::super::ssz_snappy::*;
use super::*;
use crate::rpc::protocol::*;
#[test]
fn test_decode_status_message() {
let message = hex::decode("ff060000734e615070590032000006e71e7b54989925efd6c9cbcb8ceb9b5f71216f5137282bf6a1e3b50f64e42d6c7fb347abe07eb0db8200000005029e2800").unwrap();
let mut buf = BytesMut::new();
buf.extend_from_slice(&message);
type Spec = types::MainnetEthSpec;
let snappy_protocol_id =
ProtocolId::new(Protocol::Status, Version::V1, Encoding::SSZSnappy);
let ssz_protocol_id = ProtocolId::new(Protocol::Status, Version::V1, Encoding::SSZ);
let mut snappy_outbound_codec =
SSZSnappyOutboundCodec::<Spec>::new(snappy_protocol_id, 1_048_576);
let mut ssz_outbound_codec = SSZOutboundCodec::<Spec>::new(ssz_protocol_id, 1_048_576);
// decode message just as snappy message
let snappy_decoded_message = snappy_outbound_codec.decode(&mut buf.clone());
// decode message just a ssz message
let ssz_decoded_message = ssz_outbound_codec.decode(&mut buf.clone());
// build codecs for entire chunk
let mut snappy_base_outbound_codec = BaseOutboundCodec::new(snappy_outbound_codec);
let mut ssz_base_outbound_codec = BaseOutboundCodec::new(ssz_outbound_codec);
// decode message as ssz snappy chunk
let snappy_decoded_chunk = snappy_base_outbound_codec.decode(&mut buf.clone());
// decode message just a ssz chunk
let ssz_decoded_chunk = ssz_base_outbound_codec.decode(&mut buf.clone());
let _ = dbg!(snappy_decoded_message);
let _ = dbg!(ssz_decoded_message);
let _ = dbg!(snappy_decoded_chunk);
let _ = dbg!(ssz_decoded_chunk);
}
}

View File

@@ -0,0 +1,69 @@
pub(crate) mod base;
pub(crate) mod ssz;
pub(crate) mod ssz_snappy;
use self::base::{BaseInboundCodec, BaseOutboundCodec};
use self::ssz::{SSZInboundCodec, SSZOutboundCodec};
use self::ssz_snappy::{SSZSnappyInboundCodec, SSZSnappyOutboundCodec};
use crate::rpc::protocol::RPCError;
use crate::rpc::{RPCCodedResponse, RPCRequest};
use libp2p::bytes::BytesMut;
use tokio_util::codec::{Decoder, Encoder};
use types::EthSpec;
// Known types of codecs
pub enum InboundCodec<TSpec: EthSpec> {
SSZSnappy(BaseInboundCodec<SSZSnappyInboundCodec<TSpec>, TSpec>),
SSZ(BaseInboundCodec<SSZInboundCodec<TSpec>, TSpec>),
}
pub enum OutboundCodec<TSpec: EthSpec> {
SSZSnappy(BaseOutboundCodec<SSZSnappyOutboundCodec<TSpec>, TSpec>),
SSZ(BaseOutboundCodec<SSZOutboundCodec<TSpec>, TSpec>),
}
impl<T: EthSpec> Encoder<RPCCodedResponse<T>> for InboundCodec<T> {
type Error = RPCError;
fn encode(&mut self, item: RPCCodedResponse<T>, dst: &mut BytesMut) -> Result<(), Self::Error> {
match self {
InboundCodec::SSZ(codec) => codec.encode(item, dst),
InboundCodec::SSZSnappy(codec) => codec.encode(item, dst),
}
}
}
impl<TSpec: EthSpec> Decoder for InboundCodec<TSpec> {
type Item = RPCRequest<TSpec>;
type Error = RPCError;
fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
match self {
InboundCodec::SSZ(codec) => codec.decode(src),
InboundCodec::SSZSnappy(codec) => codec.decode(src),
}
}
}
impl<TSpec: EthSpec> Encoder<RPCRequest<TSpec>> for OutboundCodec<TSpec> {
type Error = RPCError;
fn encode(&mut self, item: RPCRequest<TSpec>, dst: &mut BytesMut) -> Result<(), Self::Error> {
match self {
OutboundCodec::SSZ(codec) => codec.encode(item, dst),
OutboundCodec::SSZSnappy(codec) => codec.encode(item, dst),
}
}
}
impl<T: EthSpec> Decoder for OutboundCodec<T> {
type Item = RPCCodedResponse<T>;
type Error = RPCError;
fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
match self {
OutboundCodec::SSZ(codec) => codec.decode(src),
OutboundCodec::SSZSnappy(codec) => codec.decode(src),
}
}
}

View File

@@ -0,0 +1,256 @@
use crate::rpc::methods::*;
use crate::rpc::{
codec::base::OutboundCodec,
protocol::{Encoding, Protocol, ProtocolId, RPCError, Version},
};
use crate::rpc::{RPCCodedResponse, RPCRequest, RPCResponse};
use libp2p::bytes::{BufMut, Bytes, BytesMut};
use ssz::{Decode, Encode};
use std::marker::PhantomData;
use tokio_util::codec::{Decoder, Encoder};
use types::{EthSpec, SignedBeaconBlock};
use unsigned_varint::codec::UviBytes;
/* Inbound Codec */
pub struct SSZInboundCodec<TSpec: EthSpec> {
inner: UviBytes,
protocol: ProtocolId,
phantom: PhantomData<TSpec>,
}
impl<TSpec: EthSpec> SSZInboundCodec<TSpec> {
pub fn new(protocol: ProtocolId, max_packet_size: usize) -> Self {
let mut uvi_codec = UviBytes::default();
uvi_codec.set_max_len(max_packet_size);
// this encoding only applies to ssz.
debug_assert_eq!(protocol.encoding, Encoding::SSZ);
SSZInboundCodec {
inner: uvi_codec,
protocol,
phantom: PhantomData,
}
}
}
// Encoder for inbound streams: Encodes RPC Responses sent to peers.
impl<TSpec: EthSpec> Encoder<RPCCodedResponse<TSpec>> for SSZInboundCodec<TSpec> {
type Error = RPCError;
fn encode(
&mut self,
item: RPCCodedResponse<TSpec>,
dst: &mut BytesMut,
) -> Result<(), Self::Error> {
let bytes = match item {
RPCCodedResponse::Success(resp) => match resp {
RPCResponse::Status(res) => res.as_ssz_bytes(),
RPCResponse::BlocksByRange(res) => res.as_ssz_bytes(),
RPCResponse::BlocksByRoot(res) => res.as_ssz_bytes(),
RPCResponse::Pong(res) => res.data.as_ssz_bytes(),
RPCResponse::MetaData(res) => res.as_ssz_bytes(),
},
RPCCodedResponse::InvalidRequest(err) => err.into_bytes().as_ssz_bytes(),
RPCCodedResponse::ServerError(err) => err.into_bytes().as_ssz_bytes(),
RPCCodedResponse::Unknown(err) => err.into_bytes().as_ssz_bytes(),
RPCCodedResponse::StreamTermination(_) => {
unreachable!("Code error - attempting to encode a stream termination")
}
};
if !bytes.is_empty() {
// length-prefix and return
return self
.inner
.encode(Bytes::from(bytes), dst)
.map_err(RPCError::from);
} else {
// payload is empty, add a 0-byte length prefix
dst.reserve(1);
dst.put_u8(0);
}
Ok(())
}
}
// Decoder for inbound streams: Decodes RPC requests from peers
impl<TSpec: EthSpec> Decoder for SSZInboundCodec<TSpec> {
type Item = RPCRequest<TSpec>;
type Error = RPCError;
fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
match self.inner.decode(src).map_err(RPCError::from) {
Ok(Some(packet)) => match self.protocol.message_name {
Protocol::Status => match self.protocol.version {
Version::V1 => Ok(Some(RPCRequest::Status(StatusMessage::from_ssz_bytes(
&packet,
)?))),
},
Protocol::Goodbye => match self.protocol.version {
Version::V1 => Ok(Some(RPCRequest::Goodbye(GoodbyeReason::from_ssz_bytes(
&packet,
)?))),
},
Protocol::BlocksByRange => match self.protocol.version {
Version::V1 => Ok(Some(RPCRequest::BlocksByRange(
BlocksByRangeRequest::from_ssz_bytes(&packet)?,
))),
},
Protocol::BlocksByRoot => match self.protocol.version {
Version::V1 => Ok(Some(RPCRequest::BlocksByRoot(BlocksByRootRequest {
block_roots: Vec::from_ssz_bytes(&packet)?,
}))),
},
Protocol::Ping => match self.protocol.version {
Version::V1 => Ok(Some(RPCRequest::Ping(Ping {
data: u64::from_ssz_bytes(&packet)?,
}))),
},
Protocol::MetaData => match self.protocol.version {
Version::V1 => {
if packet.len() > 0 {
Err(RPCError::InvalidData)
} else {
Ok(Some(RPCRequest::MetaData(PhantomData)))
}
}
},
},
Ok(None) => Ok(None),
Err(e) => Err(e),
}
}
}
/* Outbound Codec: Codec for initiating RPC requests */
pub struct SSZOutboundCodec<TSpec: EthSpec> {
inner: UviBytes,
protocol: ProtocolId,
phantom: PhantomData<TSpec>,
}
impl<TSpec: EthSpec> SSZOutboundCodec<TSpec> {
pub fn new(protocol: ProtocolId, max_packet_size: usize) -> Self {
let mut uvi_codec = UviBytes::default();
uvi_codec.set_max_len(max_packet_size);
// this encoding only applies to ssz.
debug_assert_eq!(protocol.encoding, Encoding::SSZ);
SSZOutboundCodec {
inner: uvi_codec,
protocol,
phantom: PhantomData,
}
}
}
// Encoder for outbound streams: Encodes RPC Requests to peers
impl<TSpec: EthSpec> Encoder<RPCRequest<TSpec>> for SSZOutboundCodec<TSpec> {
type Error = RPCError;
fn encode(&mut self, item: RPCRequest<TSpec>, dst: &mut BytesMut) -> Result<(), Self::Error> {
let bytes = match item {
RPCRequest::Status(req) => req.as_ssz_bytes(),
RPCRequest::Goodbye(req) => req.as_ssz_bytes(),
RPCRequest::BlocksByRange(req) => req.as_ssz_bytes(),
RPCRequest::BlocksByRoot(req) => req.block_roots.as_ssz_bytes(),
RPCRequest::Ping(req) => req.as_ssz_bytes(),
RPCRequest::MetaData(_) => return Ok(()), // no metadata to encode
};
// length-prefix
self.inner
.encode(libp2p::bytes::Bytes::from(bytes), dst)
.map_err(RPCError::from)
}
}
// Decoder for outbound streams: Decodes RPC responses from peers.
//
// The majority of the decoding has now been pushed upstream due to the changing specification.
// We prefer to decode blocks and attestations with extra knowledge about the chain to perform
// faster verification checks before decoding entire blocks/attestations.
impl<TSpec: EthSpec> Decoder for SSZOutboundCodec<TSpec> {
type Item = RPCResponse<TSpec>;
type Error = RPCError;
fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
if src.len() == 1 && src[0] == 0_u8 {
// the object is empty. We return the empty object if this is the case
// clear the buffer and return an empty object
src.clear();
match self.protocol.message_name {
Protocol::Status => match self.protocol.version {
Version::V1 => Err(RPCError::IncompleteStream), // cannot have an empty HELLO message. The stream has terminated unexpectedly
},
Protocol::Goodbye => Err(RPCError::InvalidData),
Protocol::BlocksByRange => match self.protocol.version {
Version::V1 => Err(RPCError::IncompleteStream), // cannot have an empty block message.
},
Protocol::BlocksByRoot => match self.protocol.version {
Version::V1 => Err(RPCError::IncompleteStream), // cannot have an empty block message.
},
Protocol::Ping => match self.protocol.version {
Version::V1 => Err(RPCError::IncompleteStream), // cannot have an empty block message.
},
Protocol::MetaData => match self.protocol.version {
Version::V1 => Err(RPCError::IncompleteStream), // cannot have an empty block message.
},
}
} else {
match self.inner.decode(src).map_err(RPCError::from) {
Ok(Some(mut packet)) => {
// take the bytes from the buffer
let raw_bytes = packet.split();
match self.protocol.message_name {
Protocol::Status => match self.protocol.version {
Version::V1 => Ok(Some(RPCResponse::Status(
StatusMessage::from_ssz_bytes(&raw_bytes)?,
))),
},
Protocol::Goodbye => Err(RPCError::InvalidData),
Protocol::BlocksByRange => match self.protocol.version {
Version::V1 => Ok(Some(RPCResponse::BlocksByRange(Box::new(
SignedBeaconBlock::from_ssz_bytes(&raw_bytes)?,
)))),
},
Protocol::BlocksByRoot => match self.protocol.version {
Version::V1 => Ok(Some(RPCResponse::BlocksByRoot(Box::new(
SignedBeaconBlock::from_ssz_bytes(&raw_bytes)?,
)))),
},
Protocol::Ping => match self.protocol.version {
Version::V1 => Ok(Some(RPCResponse::Pong(Ping {
data: u64::from_ssz_bytes(&raw_bytes)?,
}))),
},
Protocol::MetaData => match self.protocol.version {
Version::V1 => Ok(Some(RPCResponse::MetaData(
MetaData::from_ssz_bytes(&raw_bytes)?,
))),
},
}
}
Ok(None) => Ok(None), // waiting for more bytes
Err(e) => Err(e),
}
}
}
}
impl<TSpec: EthSpec> OutboundCodec<RPCRequest<TSpec>> for SSZOutboundCodec<TSpec> {
type ErrorType = String;
fn decode_error(&mut self, src: &mut BytesMut) -> Result<Option<Self::ErrorType>, RPCError> {
match self.inner.decode(src).map_err(RPCError::from) {
Ok(Some(packet)) => Ok(Some(
String::from_utf8_lossy(&<Vec<u8>>::from_ssz_bytes(&packet)?).into(),
)),
Ok(None) => Ok(None),
Err(e) => Err(e),
}
}
}

View File

@@ -0,0 +1,354 @@
use crate::rpc::methods::*;
use crate::rpc::{
codec::base::OutboundCodec,
protocol::{Encoding, Protocol, ProtocolId, RPCError, Version},
};
use crate::rpc::{RPCCodedResponse, RPCRequest, RPCResponse};
use libp2p::bytes::BytesMut;
use snap::read::FrameDecoder;
use snap::write::FrameEncoder;
use ssz::{Decode, Encode};
use std::io::Cursor;
use std::io::ErrorKind;
use std::io::{Read, Write};
use std::marker::PhantomData;
use tokio_util::codec::{Decoder, Encoder};
use types::{EthSpec, SignedBeaconBlock};
use unsigned_varint::codec::Uvi;
/* Inbound Codec */
pub struct SSZSnappyInboundCodec<TSpec: EthSpec> {
protocol: ProtocolId,
inner: Uvi<usize>,
len: Option<usize>,
/// Maximum bytes that can be sent in one req/resp chunked responses.
max_packet_size: usize,
phantom: PhantomData<TSpec>,
}
impl<T: EthSpec> SSZSnappyInboundCodec<T> {
pub fn new(protocol: ProtocolId, max_packet_size: usize) -> Self {
let uvi_codec = Uvi::default();
// this encoding only applies to ssz_snappy.
debug_assert_eq!(protocol.encoding, Encoding::SSZSnappy);
SSZSnappyInboundCodec {
inner: uvi_codec,
protocol,
len: None,
phantom: PhantomData,
max_packet_size,
}
}
}
// Encoder for inbound streams: Encodes RPC Responses sent to peers.
impl<TSpec: EthSpec> Encoder<RPCCodedResponse<TSpec>> for SSZSnappyInboundCodec<TSpec> {
type Error = RPCError;
fn encode(
&mut self,
item: RPCCodedResponse<TSpec>,
dst: &mut BytesMut,
) -> Result<(), Self::Error> {
let bytes = match item {
RPCCodedResponse::Success(resp) => match resp {
RPCResponse::Status(res) => res.as_ssz_bytes(),
RPCResponse::BlocksByRange(res) => res.as_ssz_bytes(),
RPCResponse::BlocksByRoot(res) => res.as_ssz_bytes(),
RPCResponse::Pong(res) => res.data.as_ssz_bytes(),
RPCResponse::MetaData(res) => res.as_ssz_bytes(),
},
RPCCodedResponse::InvalidRequest(err) => err.into_bytes().as_ssz_bytes(),
RPCCodedResponse::ServerError(err) => err.into_bytes().as_ssz_bytes(),
RPCCodedResponse::Unknown(err) => err.into_bytes().as_ssz_bytes(),
RPCCodedResponse::StreamTermination(_) => {
unreachable!("Code error - attempting to encode a stream termination")
}
};
// SSZ encoded bytes should be within `max_packet_size`
if bytes.len() > self.max_packet_size {
return Err(RPCError::InternalError(
"attempting to encode data > max_packet_size".into(),
));
}
// Inserts the length prefix of the uncompressed bytes into dst
// encoded as a unsigned varint
self.inner
.encode(bytes.len(), dst)
.map_err(RPCError::from)?;
let mut writer = FrameEncoder::new(Vec::new());
writer.write_all(&bytes).map_err(RPCError::from)?;
writer.flush().map_err(RPCError::from)?;
// Write compressed bytes to `dst`
dst.extend_from_slice(writer.get_ref());
Ok(())
}
}
// Decoder for inbound streams: Decodes RPC requests from peers
impl<TSpec: EthSpec> Decoder for SSZSnappyInboundCodec<TSpec> {
type Item = RPCRequest<TSpec>;
type Error = RPCError;
fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
if self.len.is_none() {
// Decode the length of the uncompressed bytes from an unsigned varint
match self.inner.decode(src).map_err(RPCError::from)? {
Some(length) => {
self.len = Some(length);
}
None => return Ok(None), // need more bytes to decode length
}
};
let length = self.len.expect("length should be Some");
// Should not attempt to decode rpc chunks with length > max_packet_size
if length > self.max_packet_size {
return Err(RPCError::InvalidData);
}
let mut reader = FrameDecoder::new(Cursor::new(&src));
let mut decoded_buffer = vec![0; length];
match reader.read_exact(&mut decoded_buffer) {
Ok(()) => {
// `n` is how many bytes the reader read in the compressed stream
let n = reader.get_ref().position();
self.len = None;
let _read_bytes = src.split_to(n as usize);
match self.protocol.message_name {
Protocol::Status => match self.protocol.version {
Version::V1 => Ok(Some(RPCRequest::Status(StatusMessage::from_ssz_bytes(
&decoded_buffer,
)?))),
},
Protocol::Goodbye => match self.protocol.version {
Version::V1 => Ok(Some(RPCRequest::Goodbye(
GoodbyeReason::from_ssz_bytes(&decoded_buffer)?,
))),
},
Protocol::BlocksByRange => match self.protocol.version {
Version::V1 => Ok(Some(RPCRequest::BlocksByRange(
BlocksByRangeRequest::from_ssz_bytes(&decoded_buffer)?,
))),
},
Protocol::BlocksByRoot => match self.protocol.version {
Version::V1 => Ok(Some(RPCRequest::BlocksByRoot(BlocksByRootRequest {
block_roots: Vec::from_ssz_bytes(&decoded_buffer)?,
}))),
},
Protocol::Ping => match self.protocol.version {
Version::V1 => Ok(Some(RPCRequest::Ping(Ping::from_ssz_bytes(
&decoded_buffer,
)?))),
},
Protocol::MetaData => match self.protocol.version {
Version::V1 => {
if decoded_buffer.len() > 0 {
Err(RPCError::InvalidData)
} else {
Ok(Some(RPCRequest::MetaData(PhantomData)))
}
}
},
}
}
Err(e) => match e.kind() {
// Haven't received enough bytes to decode yet
// TODO: check if this is the only Error variant where we return `Ok(None)`
ErrorKind::UnexpectedEof => {
return Ok(None);
}
_ => return Err(e).map_err(RPCError::from),
},
}
}
}
/* Outbound Codec: Codec for initiating RPC requests */
pub struct SSZSnappyOutboundCodec<TSpec: EthSpec> {
inner: Uvi<usize>,
len: Option<usize>,
protocol: ProtocolId,
/// Maximum bytes that can be sent in one req/resp chunked responses.
max_packet_size: usize,
phantom: PhantomData<TSpec>,
}
impl<TSpec: EthSpec> SSZSnappyOutboundCodec<TSpec> {
pub fn new(protocol: ProtocolId, max_packet_size: usize) -> Self {
let uvi_codec = Uvi::default();
// this encoding only applies to ssz_snappy.
debug_assert_eq!(protocol.encoding, Encoding::SSZSnappy);
SSZSnappyOutboundCodec {
inner: uvi_codec,
protocol,
max_packet_size,
len: None,
phantom: PhantomData,
}
}
}
// Encoder for outbound streams: Encodes RPC Requests to peers
impl<TSpec: EthSpec> Encoder<RPCRequest<TSpec>> for SSZSnappyOutboundCodec<TSpec> {
type Error = RPCError;
fn encode(&mut self, item: RPCRequest<TSpec>, dst: &mut BytesMut) -> Result<(), Self::Error> {
let bytes = match item {
RPCRequest::Status(req) => req.as_ssz_bytes(),
RPCRequest::Goodbye(req) => req.as_ssz_bytes(),
RPCRequest::BlocksByRange(req) => req.as_ssz_bytes(),
RPCRequest::BlocksByRoot(req) => req.block_roots.as_ssz_bytes(),
RPCRequest::Ping(req) => req.as_ssz_bytes(),
RPCRequest::MetaData(_) => return Ok(()), // no metadata to encode
};
// SSZ encoded bytes should be within `max_packet_size`
if bytes.len() > self.max_packet_size {
return Err(RPCError::InternalError(
"attempting to encode data > max_packet_size",
));
}
// Inserts the length prefix of the uncompressed bytes into dst
// encoded as a unsigned varint
self.inner
.encode(bytes.len(), dst)
.map_err(RPCError::from)?;
let mut writer = FrameEncoder::new(Vec::new());
writer.write_all(&bytes).map_err(RPCError::from)?;
writer.flush().map_err(RPCError::from)?;
// Write compressed bytes to `dst`
dst.extend_from_slice(writer.get_ref());
Ok(())
}
}
// Decoder for outbound streams: Decodes RPC responses from peers.
//
// The majority of the decoding has now been pushed upstream due to the changing specification.
// We prefer to decode blocks and attestations with extra knowledge about the chain to perform
// faster verification checks before decoding entire blocks/attestations.
impl<TSpec: EthSpec> Decoder for SSZSnappyOutboundCodec<TSpec> {
type Item = RPCResponse<TSpec>;
type Error = RPCError;
fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
if self.len.is_none() {
// Decode the length of the uncompressed bytes from an unsigned varint
match self.inner.decode(src).map_err(RPCError::from)? {
Some(length) => {
self.len = Some(length as usize);
}
None => return Ok(None), // need more bytes to decode length
}
};
let length = self.len.expect("length should be Some");
// Should not attempt to decode rpc chunks with length > max_packet_size
if length > self.max_packet_size {
return Err(RPCError::InvalidData);
}
let mut reader = FrameDecoder::new(Cursor::new(&src));
let mut decoded_buffer = vec![0; length];
match reader.read_exact(&mut decoded_buffer) {
Ok(()) => {
// `n` is how many bytes the reader read in the compressed stream
let n = reader.get_ref().position();
self.len = None;
let _read_byts = src.split_to(n as usize);
match self.protocol.message_name {
Protocol::Status => match self.protocol.version {
Version::V1 => Ok(Some(RPCResponse::Status(
StatusMessage::from_ssz_bytes(&decoded_buffer)?,
))),
},
Protocol::Goodbye => {
// Goodbye does not have a response
Err(RPCError::InvalidData)
}
Protocol::BlocksByRange => match self.protocol.version {
Version::V1 => Ok(Some(RPCResponse::BlocksByRange(Box::new(
SignedBeaconBlock::from_ssz_bytes(&decoded_buffer)?,
)))),
},
Protocol::BlocksByRoot => match self.protocol.version {
Version::V1 => Ok(Some(RPCResponse::BlocksByRoot(Box::new(
SignedBeaconBlock::from_ssz_bytes(&decoded_buffer)?,
)))),
},
Protocol::Ping => match self.protocol.version {
Version::V1 => Ok(Some(RPCResponse::Pong(Ping {
data: u64::from_ssz_bytes(&decoded_buffer)?,
}))),
},
Protocol::MetaData => match self.protocol.version {
Version::V1 => Ok(Some(RPCResponse::MetaData(MetaData::from_ssz_bytes(
&decoded_buffer,
)?))),
},
}
}
Err(e) => match e.kind() {
// Haven't received enough bytes to decode yet
// TODO: check if this is the only Error variant where we return `Ok(None)`
ErrorKind::UnexpectedEof => {
return Ok(None);
}
_ => return Err(e).map_err(RPCError::from),
},
}
}
}
impl<TSpec: EthSpec> OutboundCodec<RPCRequest<TSpec>> for SSZSnappyOutboundCodec<TSpec> {
type ErrorType = String;
fn decode_error(&mut self, src: &mut BytesMut) -> Result<Option<Self::ErrorType>, RPCError> {
if self.len.is_none() {
// Decode the length of the uncompressed bytes from an unsigned varint
match self.inner.decode(src).map_err(RPCError::from)? {
Some(length) => {
self.len = Some(length as usize);
}
None => return Ok(None), // need more bytes to decode length
}
};
let length = self.len.expect("length should be Some");
// Should not attempt to decode rpc chunks with length > max_packet_size
if length > self.max_packet_size {
return Err(RPCError::InvalidData);
}
let mut reader = FrameDecoder::new(Cursor::new(&src));
let mut decoded_buffer = vec![0; length];
match reader.read_exact(&mut decoded_buffer) {
Ok(()) => {
// `n` is how many bytes the reader read in the compressed stream
let n = reader.get_ref().position();
self.len = None;
let _read_bytes = src.split_to(n as usize);
Ok(Some(
String::from_utf8_lossy(&<Vec<u8>>::from_ssz_bytes(&decoded_buffer)?).into(),
))
}
Err(e) => match e.kind() {
// Haven't received enough bytes to decode yet
// TODO: check if this is the only Error variant where we return `Ok(None)`
ErrorKind::UnexpectedEof => {
return Ok(None);
}
_ => return Err(e).map_err(RPCError::from),
},
}
}
}