mirror of
https://github.com/sigp/lighthouse.git
synced 2026-03-06 18:21:45 +00:00
## Issue Addressed #1112 The logic is slightly different but still valid with respect to error handling. - Inbound state is either Busy with a future that returns the substream (and info about the processing) - The state machine works as follows: - `Idle` with pending responses => `Busy` - `Busy` => finished ? if so and there are new pending responses then `Busy`, if not then `Idle` => not finished remains `Busy` - Add an `InboundInfo` for readability - Other stuff: - Close inbound substreams when all expected responses are sent - Remove the error variants from `RPCCodedResponse` and use the codes instead - Fix various spelling mistakes because I got sloppy last time Sorry for the delay Co-authored-by: Age Manning <Age@AgeManning.com>
325 lines
14 KiB
Rust
325 lines
14 KiB
Rust
use crate::rpc::methods::*;
|
|
use crate::rpc::{
|
|
codec::base::OutboundCodec,
|
|
protocol::{
|
|
Encoding, Protocol, ProtocolId, RPCError, Version, BLOCKS_BY_ROOT_REQUEST_MAX,
|
|
BLOCKS_BY_ROOT_REQUEST_MIN, SIGNED_BEACON_BLOCK_MAX, SIGNED_BEACON_BLOCK_MIN,
|
|
},
|
|
};
|
|
use crate::rpc::{RPCCodedResponse, RPCRequest, RPCResponse};
|
|
use libp2p::bytes::{BufMut, Bytes, BytesMut};
|
|
use ssz::{Decode, Encode};
|
|
use ssz_types::VariableList;
|
|
use std::marker::PhantomData;
|
|
use tokio_util::codec::{Decoder, Encoder};
|
|
use types::{EthSpec, SignedBeaconBlock};
|
|
use unsigned_varint::codec::UviBytes;
|
|
|
|
/* Inbound Codec */
|
|
|
|
pub struct SSZInboundCodec<TSpec: EthSpec> {
|
|
inner: UviBytes,
|
|
protocol: ProtocolId,
|
|
phantom: PhantomData<TSpec>,
|
|
}
|
|
|
|
impl<TSpec: EthSpec> SSZInboundCodec<TSpec> {
|
|
pub fn new(protocol: ProtocolId, max_packet_size: usize) -> Self {
|
|
let mut uvi_codec = UviBytes::default();
|
|
uvi_codec.set_max_len(max_packet_size);
|
|
|
|
// this encoding only applies to ssz.
|
|
debug_assert_eq!(protocol.encoding, Encoding::SSZ);
|
|
|
|
SSZInboundCodec {
|
|
inner: uvi_codec,
|
|
protocol,
|
|
phantom: PhantomData,
|
|
}
|
|
}
|
|
}
|
|
|
|
// Encoder for inbound streams: Encodes RPC Responses sent to peers.
|
|
impl<TSpec: EthSpec> Encoder<RPCCodedResponse<TSpec>> for SSZInboundCodec<TSpec> {
|
|
type Error = RPCError;
|
|
|
|
fn encode(
|
|
&mut self,
|
|
item: RPCCodedResponse<TSpec>,
|
|
dst: &mut BytesMut,
|
|
) -> Result<(), Self::Error> {
|
|
let bytes = match item {
|
|
RPCCodedResponse::Success(resp) => match resp {
|
|
RPCResponse::Status(res) => res.as_ssz_bytes(),
|
|
RPCResponse::BlocksByRange(res) => res.as_ssz_bytes(),
|
|
RPCResponse::BlocksByRoot(res) => res.as_ssz_bytes(),
|
|
RPCResponse::Pong(res) => res.data.as_ssz_bytes(),
|
|
RPCResponse::MetaData(res) => res.as_ssz_bytes(),
|
|
},
|
|
RPCCodedResponse::Error(_, err) => err.as_ssz_bytes(),
|
|
RPCCodedResponse::StreamTermination(_) => {
|
|
unreachable!("Code error - attempting to encode a stream termination")
|
|
}
|
|
};
|
|
if !bytes.is_empty() {
|
|
// length-prefix and return
|
|
return self
|
|
.inner
|
|
.encode(Bytes::from(bytes), dst)
|
|
.map_err(RPCError::from);
|
|
} else {
|
|
// payload is empty, add a 0-byte length prefix
|
|
dst.reserve(1);
|
|
dst.put_u8(0);
|
|
}
|
|
Ok(())
|
|
}
|
|
}
|
|
|
|
// Decoder for inbound streams: Decodes RPC requests from peers
|
|
impl<TSpec: EthSpec> Decoder for SSZInboundCodec<TSpec> {
|
|
type Item = RPCRequest<TSpec>;
|
|
type Error = RPCError;
|
|
|
|
fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
|
|
match self.inner.decode(src).map_err(RPCError::from) {
|
|
Ok(Some(packet)) => match self.protocol.message_name {
|
|
Protocol::Status => match self.protocol.version {
|
|
Version::V1 => {
|
|
if packet.len() == <StatusMessage as Encode>::ssz_fixed_len() {
|
|
Ok(Some(RPCRequest::Status(StatusMessage::from_ssz_bytes(
|
|
&packet,
|
|
)?)))
|
|
} else {
|
|
Err(RPCError::InvalidData)
|
|
}
|
|
}
|
|
},
|
|
Protocol::Goodbye => match self.protocol.version {
|
|
Version::V1 => {
|
|
if packet.len() == <GoodbyeReason as Encode>::ssz_fixed_len() {
|
|
Ok(Some(RPCRequest::Goodbye(GoodbyeReason::from_ssz_bytes(
|
|
&packet,
|
|
)?)))
|
|
} else {
|
|
Err(RPCError::InvalidData)
|
|
}
|
|
}
|
|
},
|
|
Protocol::BlocksByRange => match self.protocol.version {
|
|
Version::V1 => {
|
|
if packet.len() == <BlocksByRangeRequest as Encode>::ssz_fixed_len() {
|
|
Ok(Some(RPCRequest::BlocksByRange(
|
|
BlocksByRangeRequest::from_ssz_bytes(&packet)?,
|
|
)))
|
|
} else {
|
|
Err(RPCError::InvalidData)
|
|
}
|
|
}
|
|
},
|
|
Protocol::BlocksByRoot => match self.protocol.version {
|
|
Version::V1 => {
|
|
if packet.len() >= *BLOCKS_BY_ROOT_REQUEST_MIN
|
|
&& packet.len() <= *BLOCKS_BY_ROOT_REQUEST_MAX
|
|
{
|
|
Ok(Some(RPCRequest::BlocksByRoot(BlocksByRootRequest {
|
|
block_roots: VariableList::from_ssz_bytes(&packet)?,
|
|
})))
|
|
} else {
|
|
Err(RPCError::InvalidData)
|
|
}
|
|
}
|
|
},
|
|
Protocol::Ping => match self.protocol.version {
|
|
Version::V1 => {
|
|
if packet.len() == <Ping as Encode>::ssz_fixed_len() {
|
|
Ok(Some(RPCRequest::Ping(Ping {
|
|
data: u64::from_ssz_bytes(&packet)?,
|
|
})))
|
|
} else {
|
|
Err(RPCError::InvalidData)
|
|
}
|
|
}
|
|
},
|
|
Protocol::MetaData => match self.protocol.version {
|
|
Version::V1 => {
|
|
if !packet.is_empty() {
|
|
Err(RPCError::InvalidData)
|
|
} else {
|
|
Ok(Some(RPCRequest::MetaData(PhantomData)))
|
|
}
|
|
}
|
|
},
|
|
},
|
|
Ok(None) => Ok(None),
|
|
Err(e) => Err(e),
|
|
}
|
|
}
|
|
}
|
|
|
|
/* Outbound Codec: Codec for initiating RPC requests */
|
|
|
|
pub struct SSZOutboundCodec<TSpec: EthSpec> {
|
|
inner: UviBytes,
|
|
protocol: ProtocolId,
|
|
phantom: PhantomData<TSpec>,
|
|
}
|
|
|
|
impl<TSpec: EthSpec> SSZOutboundCodec<TSpec> {
|
|
pub fn new(protocol: ProtocolId, max_packet_size: usize) -> Self {
|
|
let mut uvi_codec = UviBytes::default();
|
|
uvi_codec.set_max_len(max_packet_size);
|
|
|
|
// this encoding only applies to ssz.
|
|
debug_assert_eq!(protocol.encoding, Encoding::SSZ);
|
|
|
|
SSZOutboundCodec {
|
|
inner: uvi_codec,
|
|
protocol,
|
|
phantom: PhantomData,
|
|
}
|
|
}
|
|
}
|
|
|
|
// Encoder for outbound streams: Encodes RPC Requests to peers
|
|
impl<TSpec: EthSpec> Encoder<RPCRequest<TSpec>> for SSZOutboundCodec<TSpec> {
|
|
type Error = RPCError;
|
|
|
|
fn encode(&mut self, item: RPCRequest<TSpec>, dst: &mut BytesMut) -> Result<(), Self::Error> {
|
|
let bytes = match item {
|
|
RPCRequest::Status(req) => req.as_ssz_bytes(),
|
|
RPCRequest::Goodbye(req) => req.as_ssz_bytes(),
|
|
RPCRequest::BlocksByRange(req) => req.as_ssz_bytes(),
|
|
RPCRequest::BlocksByRoot(req) => req.block_roots.as_ssz_bytes(),
|
|
RPCRequest::Ping(req) => req.as_ssz_bytes(),
|
|
RPCRequest::MetaData(_) => return Ok(()), // no metadata to encode
|
|
};
|
|
// length-prefix
|
|
self.inner
|
|
.encode(libp2p::bytes::Bytes::from(bytes), dst)
|
|
.map_err(RPCError::from)
|
|
}
|
|
}
|
|
|
|
// Decoder for outbound streams: Decodes RPC responses from peers.
|
|
//
|
|
// The majority of the decoding has now been pushed upstream due to the changing specification.
|
|
// We prefer to decode blocks and attestations with extra knowledge about the chain to perform
|
|
// faster verification checks before decoding entire blocks/attestations.
|
|
impl<TSpec: EthSpec> Decoder for SSZOutboundCodec<TSpec> {
|
|
type Item = RPCResponse<TSpec>;
|
|
type Error = RPCError;
|
|
|
|
fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
|
|
if src.len() == 1 && src[0] == 0_u8 {
|
|
// the object is empty. We return the empty object if this is the case
|
|
// clear the buffer and return an empty object
|
|
src.clear();
|
|
match self.protocol.message_name {
|
|
Protocol::Status => match self.protocol.version {
|
|
Version::V1 => Err(RPCError::IncompleteStream), // cannot have an empty HELLO message. The stream has terminated unexpectedly
|
|
},
|
|
Protocol::Goodbye => Err(RPCError::InvalidData),
|
|
Protocol::BlocksByRange => match self.protocol.version {
|
|
Version::V1 => Err(RPCError::IncompleteStream), // cannot have an empty block message.
|
|
},
|
|
Protocol::BlocksByRoot => match self.protocol.version {
|
|
Version::V1 => Err(RPCError::IncompleteStream), // cannot have an empty block message.
|
|
},
|
|
Protocol::Ping => match self.protocol.version {
|
|
Version::V1 => Err(RPCError::IncompleteStream), // cannot have an empty block message.
|
|
},
|
|
Protocol::MetaData => match self.protocol.version {
|
|
Version::V1 => Err(RPCError::IncompleteStream), // cannot have an empty block message.
|
|
},
|
|
}
|
|
} else {
|
|
match self.inner.decode(src).map_err(RPCError::from) {
|
|
Ok(Some(mut packet)) => {
|
|
// take the bytes from the buffer
|
|
let raw_bytes = packet.split();
|
|
|
|
match self.protocol.message_name {
|
|
Protocol::Status => match self.protocol.version {
|
|
Version::V1 => {
|
|
if raw_bytes.len() == <StatusMessage as Encode>::ssz_fixed_len() {
|
|
Ok(Some(RPCResponse::Status(StatusMessage::from_ssz_bytes(
|
|
&raw_bytes,
|
|
)?)))
|
|
} else {
|
|
Err(RPCError::InvalidData)
|
|
}
|
|
}
|
|
},
|
|
Protocol::Goodbye => Err(RPCError::InvalidData),
|
|
Protocol::BlocksByRange => match self.protocol.version {
|
|
Version::V1 => {
|
|
if raw_bytes.len() >= *SIGNED_BEACON_BLOCK_MIN
|
|
&& raw_bytes.len() <= *SIGNED_BEACON_BLOCK_MAX
|
|
{
|
|
Ok(Some(RPCResponse::BlocksByRange(Box::new(
|
|
SignedBeaconBlock::from_ssz_bytes(&raw_bytes)?,
|
|
))))
|
|
} else {
|
|
Err(RPCError::InvalidData)
|
|
}
|
|
}
|
|
},
|
|
Protocol::BlocksByRoot => match self.protocol.version {
|
|
Version::V1 => {
|
|
if raw_bytes.len() >= *SIGNED_BEACON_BLOCK_MIN
|
|
&& raw_bytes.len() <= *SIGNED_BEACON_BLOCK_MAX
|
|
{
|
|
Ok(Some(RPCResponse::BlocksByRoot(Box::new(
|
|
SignedBeaconBlock::from_ssz_bytes(&raw_bytes)?,
|
|
))))
|
|
} else {
|
|
Err(RPCError::InvalidData)
|
|
}
|
|
}
|
|
},
|
|
Protocol::Ping => match self.protocol.version {
|
|
Version::V1 => {
|
|
if raw_bytes.len() == <Ping as Encode>::ssz_fixed_len() {
|
|
Ok(Some(RPCResponse::Pong(Ping {
|
|
data: u64::from_ssz_bytes(&raw_bytes)?,
|
|
})))
|
|
} else {
|
|
Err(RPCError::InvalidData)
|
|
}
|
|
}
|
|
},
|
|
Protocol::MetaData => match self.protocol.version {
|
|
Version::V1 => {
|
|
if raw_bytes.len() == <MetaData<TSpec> as Encode>::ssz_fixed_len() {
|
|
Ok(Some(RPCResponse::MetaData(MetaData::from_ssz_bytes(
|
|
&raw_bytes,
|
|
)?)))
|
|
} else {
|
|
Err(RPCError::InvalidData)
|
|
}
|
|
}
|
|
},
|
|
}
|
|
}
|
|
Ok(None) => Ok(None), // waiting for more bytes
|
|
Err(e) => Err(e),
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
impl<TSpec: EthSpec> OutboundCodec<RPCRequest<TSpec>> for SSZOutboundCodec<TSpec> {
|
|
type ErrorType = String;
|
|
|
|
fn decode_error(&mut self, src: &mut BytesMut) -> Result<Option<Self::ErrorType>, RPCError> {
|
|
match self.inner.decode(src).map_err(RPCError::from) {
|
|
Ok(Some(packet)) => Ok(Some(
|
|
String::from_utf8_lossy(&<Vec<u8>>::from_ssz_bytes(&packet)?).into(),
|
|
)),
|
|
Ok(None) => Ok(None),
|
|
Err(e) => Err(e),
|
|
}
|
|
}
|
|
}
|