Rework internal rpc protocol handling (#4290)

## Issue Addressed

Resolves #3980. Builds on work by @GeemoCandama in #4084 

## Proposed Changes

Extends the `SupportedProtocol` abstraction added in Geemo's PR and attempts to fix internal versioning of requests that are mentioned in this comment https://github.com/sigp/lighthouse/pull/4084#issuecomment-1496380033 

Co-authored-by: geemo <geemo@tutanota.com>
This commit is contained in:
Pawan Dhananjay
2023-06-14 05:08:50 +00:00
parent 0caaad4c03
commit 0ecca1dcb0
17 changed files with 619 additions and 584 deletions

View File

@@ -214,8 +214,7 @@ mod tests {
let mut buf = BytesMut::new();
buf.extend_from_slice(&message);
let snappy_protocol_id =
ProtocolId::new(Protocol::Status, Version::V1, Encoding::SSZSnappy);
let snappy_protocol_id = ProtocolId::new(SupportedProtocol::StatusV1, Encoding::SSZSnappy);
let fork_context = Arc::new(fork_context(ForkName::Base));
let mut snappy_outbound_codec = SSZSnappyOutboundCodec::<Spec>::new(
@@ -249,8 +248,7 @@ mod tests {
// Insert length-prefix
uvi_codec.encode(len, &mut dst).unwrap();
let snappy_protocol_id =
ProtocolId::new(Protocol::Status, Version::V1, Encoding::SSZSnappy);
let snappy_protocol_id = ProtocolId::new(SupportedProtocol::StatusV1, Encoding::SSZSnappy);
let fork_context = Arc::new(fork_context(ForkName::Base));
let mut snappy_outbound_codec = SSZSnappyOutboundCodec::<Spec>::new(
@@ -277,8 +275,7 @@ mod tests {
dst
}
let protocol_id =
ProtocolId::new(Protocol::BlocksByRange, Version::V1, Encoding::SSZSnappy);
let protocol_id = ProtocolId::new(SupportedProtocol::BlocksByRangeV1, Encoding::SSZSnappy);
// Response limits
let fork_context = Arc::new(fork_context(ForkName::Base));

View File

@@ -1,9 +1,9 @@
use crate::rpc::methods::*;
use crate::rpc::{
codec::base::OutboundCodec,
protocol::{Encoding, Protocol, ProtocolId, RPCError, Version, ERROR_TYPE_MAX, ERROR_TYPE_MIN},
protocol::{Encoding, ProtocolId, RPCError, SupportedProtocol, ERROR_TYPE_MAX, ERROR_TYPE_MIN},
};
use crate::rpc::{InboundRequest, OutboundRequest, RPCCodedResponse, RPCResponse};
use crate::{rpc::methods::*, EnrSyncCommitteeBitfield};
use libp2p::bytes::BytesMut;
use snap::read::FrameDecoder;
use snap::write::FrameEncoder;
@@ -76,27 +76,14 @@ impl<TSpec: EthSpec> Encoder<RPCCodedResponse<TSpec>> for SSZSnappyInboundCodec<
RPCResponse::MetaData(res) =>
// Encode the correct version of the MetaData response based on the negotiated version.
{
match self.protocol.version {
Version::V1 => MetaData::<TSpec>::V1(MetaDataV1 {
seq_number: *res.seq_number(),
attnets: res.attnets().clone(),
})
.as_ssz_bytes(),
Version::V2 => {
// `res` is of type MetaDataV2, return the ssz bytes
if res.syncnets().is_ok() {
res.as_ssz_bytes()
} else {
// `res` is of type MetaDataV1, create a MetaDataV2 by adding a default syncnets field
// Note: This code path is redundant as `res` would be always of type MetaDataV2
MetaData::<TSpec>::V2(MetaDataV2 {
seq_number: *res.seq_number(),
attnets: res.attnets().clone(),
syncnets: EnrSyncCommitteeBitfield::<TSpec>::default(),
})
.as_ssz_bytes()
}
}
match self.protocol.versioned_protocol {
SupportedProtocol::MetaDataV1 => res.metadata_v1().as_ssz_bytes(),
// We always send V2 metadata responses from the behaviour
// No change required.
SupportedProtocol::MetaDataV2 => res.metadata_v2().as_ssz_bytes(),
_ => unreachable!(
"We only send metadata responses on negotiating metadata requests"
),
}
}
},
@@ -139,8 +126,11 @@ impl<TSpec: EthSpec> Decoder for SSZSnappyInboundCodec<TSpec> {
type Error = RPCError;
fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
if self.protocol.message_name == Protocol::MetaData {
return Ok(Some(InboundRequest::MetaData(PhantomData)));
if self.protocol.versioned_protocol == SupportedProtocol::MetaDataV1 {
return Ok(Some(InboundRequest::MetaData(MetadataRequest::new_v1())));
}
if self.protocol.versioned_protocol == SupportedProtocol::MetaDataV2 {
return Ok(Some(InboundRequest::MetaData(MetadataRequest::new_v2())));
}
let length = match handle_length(&mut self.inner, &mut self.len, src)? {
Some(len) => len,
@@ -152,8 +142,8 @@ impl<TSpec: EthSpec> Decoder for SSZSnappyInboundCodec<TSpec> {
let ssz_limits = self.protocol.rpc_request_limits();
if ssz_limits.is_out_of_bounds(length, self.max_packet_size) {
return Err(RPCError::InvalidData(format!(
"RPC request length is out of bounds, length {}",
length
"RPC request length for protocol {:?} is out of bounds, length {}",
self.protocol.versioned_protocol, length
)));
}
// Calculate worst case compression length for given uncompressed length
@@ -170,11 +160,7 @@ impl<TSpec: EthSpec> Decoder for SSZSnappyInboundCodec<TSpec> {
let n = reader.get_ref().get_ref().position();
self.len = None;
let _read_bytes = src.split_to(n as usize);
match self.protocol.version {
Version::V1 => handle_v1_request(self.protocol.message_name, &decoded_buffer),
Version::V2 => handle_v2_request(self.protocol.message_name, &decoded_buffer),
}
handle_rpc_request(self.protocol.versioned_protocol, &decoded_buffer)
}
Err(e) => handle_error(e, reader.get_ref().get_ref().position(), max_compressed_len),
}
@@ -228,11 +214,16 @@ impl<TSpec: EthSpec> Encoder<OutboundRequest<TSpec>> for SSZSnappyOutboundCodec<
let bytes = match item {
OutboundRequest::Status(req) => req.as_ssz_bytes(),
OutboundRequest::Goodbye(req) => req.as_ssz_bytes(),
OutboundRequest::BlocksByRange(req) => req.as_ssz_bytes(),
OutboundRequest::BlocksByRoot(req) => req.block_roots.as_ssz_bytes(),
OutboundRequest::BlocksByRange(r) => match r {
OldBlocksByRangeRequest::V1(req) => req.as_ssz_bytes(),
OldBlocksByRangeRequest::V2(req) => req.as_ssz_bytes(),
},
OutboundRequest::BlocksByRoot(r) => match r {
BlocksByRootRequest::V1(req) => req.block_roots.as_ssz_bytes(),
BlocksByRootRequest::V2(req) => req.block_roots.as_ssz_bytes(),
},
OutboundRequest::Ping(req) => req.as_ssz_bytes(),
OutboundRequest::MetaData(_) => return Ok(()), // no metadata to encode
OutboundRequest::LightClientBootstrap(req) => req.as_ssz_bytes(),
};
// SSZ encoded bytes should be within `max_packet_size`
if bytes.len() > self.max_packet_size {
@@ -311,15 +302,10 @@ impl<TSpec: EthSpec> Decoder for SSZSnappyOutboundCodec<TSpec> {
let n = reader.get_ref().get_ref().position();
self.len = None;
let _read_bytes = src.split_to(n as usize);
match self.protocol.version {
Version::V1 => handle_v1_response(self.protocol.message_name, &decoded_buffer),
Version::V2 => handle_v2_response(
self.protocol.message_name,
&decoded_buffer,
&mut self.fork_name,
),
}
// Safe to `take` from `self.fork_name` as we have all the bytes we need to
// decode an ssz object at this point.
let fork_name = self.fork_name.take();
handle_rpc_response(self.protocol.versioned_protocol, &decoded_buffer, fork_name)
}
Err(e) => handle_error(e, reader.get_ref().get_ref().position(), max_compressed_len),
}
@@ -456,181 +442,150 @@ fn handle_length(
}
}
/// Decodes a `Version::V1` `InboundRequest` from the byte stream.
/// Decodes an `InboundRequest` from the byte stream.
/// `decoded_buffer` should be an ssz-encoded bytestream with
// length = length-prefix received in the beginning of the stream.
fn handle_v1_request<T: EthSpec>(
protocol: Protocol,
fn handle_rpc_request<T: EthSpec>(
versioned_protocol: SupportedProtocol,
decoded_buffer: &[u8],
) -> Result<Option<InboundRequest<T>>, RPCError> {
match protocol {
Protocol::Status => Ok(Some(InboundRequest::Status(StatusMessage::from_ssz_bytes(
decoded_buffer,
)?))),
Protocol::Goodbye => Ok(Some(InboundRequest::Goodbye(
match versioned_protocol {
SupportedProtocol::StatusV1 => Ok(Some(InboundRequest::Status(
StatusMessage::from_ssz_bytes(decoded_buffer)?,
))),
SupportedProtocol::GoodbyeV1 => Ok(Some(InboundRequest::Goodbye(
GoodbyeReason::from_ssz_bytes(decoded_buffer)?,
))),
Protocol::BlocksByRange => Ok(Some(InboundRequest::BlocksByRange(
OldBlocksByRangeRequest::from_ssz_bytes(decoded_buffer)?,
SupportedProtocol::BlocksByRangeV2 => Ok(Some(InboundRequest::BlocksByRange(
OldBlocksByRangeRequest::V2(OldBlocksByRangeRequestV2::from_ssz_bytes(decoded_buffer)?),
))),
Protocol::BlocksByRoot => Ok(Some(InboundRequest::BlocksByRoot(BlocksByRootRequest {
block_roots: VariableList::from_ssz_bytes(decoded_buffer)?,
}))),
Protocol::Ping => Ok(Some(InboundRequest::Ping(Ping {
SupportedProtocol::BlocksByRangeV1 => Ok(Some(InboundRequest::BlocksByRange(
OldBlocksByRangeRequest::V1(OldBlocksByRangeRequestV1::from_ssz_bytes(decoded_buffer)?),
))),
SupportedProtocol::BlocksByRootV2 => Ok(Some(InboundRequest::BlocksByRoot(
BlocksByRootRequest::V2(BlocksByRootRequestV2 {
block_roots: VariableList::from_ssz_bytes(decoded_buffer)?,
}),
))),
SupportedProtocol::BlocksByRootV1 => Ok(Some(InboundRequest::BlocksByRoot(
BlocksByRootRequest::V1(BlocksByRootRequestV1 {
block_roots: VariableList::from_ssz_bytes(decoded_buffer)?,
}),
))),
SupportedProtocol::PingV1 => Ok(Some(InboundRequest::Ping(Ping {
data: u64::from_ssz_bytes(decoded_buffer)?,
}))),
Protocol::LightClientBootstrap => Ok(Some(InboundRequest::LightClientBootstrap(
LightClientBootstrapRequest {
SupportedProtocol::LightClientBootstrapV1 => Ok(Some(
InboundRequest::LightClientBootstrap(LightClientBootstrapRequest {
root: Hash256::from_ssz_bytes(decoded_buffer)?,
},
))),
}),
)),
// MetaData requests return early from InboundUpgrade and do not reach the decoder.
// Handle this case just for completeness.
Protocol::MetaData => {
SupportedProtocol::MetaDataV2 => {
if !decoded_buffer.is_empty() {
Err(RPCError::InternalError(
"Metadata requests shouldn't reach decoder",
))
} else {
Ok(Some(InboundRequest::MetaData(PhantomData)))
Ok(Some(InboundRequest::MetaData(MetadataRequest::new_v2())))
}
}
}
}
/// Decodes a `Version::V2` `InboundRequest` from the byte stream.
/// `decoded_buffer` should be an ssz-encoded bytestream with
// length = length-prefix received in the beginning of the stream.
fn handle_v2_request<T: EthSpec>(
protocol: Protocol,
decoded_buffer: &[u8],
) -> Result<Option<InboundRequest<T>>, RPCError> {
match protocol {
Protocol::BlocksByRange => Ok(Some(InboundRequest::BlocksByRange(
OldBlocksByRangeRequest::from_ssz_bytes(decoded_buffer)?,
))),
Protocol::BlocksByRoot => Ok(Some(InboundRequest::BlocksByRoot(BlocksByRootRequest {
block_roots: VariableList::from_ssz_bytes(decoded_buffer)?,
}))),
// MetaData requests return early from InboundUpgrade and do not reach the decoder.
// Handle this case just for completeness.
Protocol::MetaData => {
SupportedProtocol::MetaDataV1 => {
if !decoded_buffer.is_empty() {
Err(RPCError::InvalidData("Metadata request".to_string()))
} else {
Ok(Some(InboundRequest::MetaData(PhantomData)))
Ok(Some(InboundRequest::MetaData(MetadataRequest::new_v1())))
}
}
_ => Err(RPCError::ErrorResponse(
RPCResponseErrorCode::InvalidRequest,
format!("{} does not support version 2", protocol),
)),
}
}
/// Decodes a `Version::V1` `RPCResponse` from the byte stream.
/// Decodes a `RPCResponse` from the byte stream.
/// `decoded_buffer` should be an ssz-encoded bytestream with
// length = length-prefix received in the beginning of the stream.
fn handle_v1_response<T: EthSpec>(
protocol: Protocol,
decoded_buffer: &[u8],
) -> Result<Option<RPCResponse<T>>, RPCError> {
match protocol {
Protocol::Status => Ok(Some(RPCResponse::Status(StatusMessage::from_ssz_bytes(
decoded_buffer,
)?))),
// This case should be unreachable as `Goodbye` has no response.
Protocol::Goodbye => Err(RPCError::InvalidData(
"Goodbye RPC message has no valid response".to_string(),
)),
Protocol::BlocksByRange => Ok(Some(RPCResponse::BlocksByRange(Arc::new(
SignedBeaconBlock::Base(SignedBeaconBlockBase::from_ssz_bytes(decoded_buffer)?),
)))),
Protocol::BlocksByRoot => Ok(Some(RPCResponse::BlocksByRoot(Arc::new(
SignedBeaconBlock::Base(SignedBeaconBlockBase::from_ssz_bytes(decoded_buffer)?),
)))),
Protocol::Ping => Ok(Some(RPCResponse::Pong(Ping {
data: u64::from_ssz_bytes(decoded_buffer)?,
}))),
Protocol::MetaData => Ok(Some(RPCResponse::MetaData(MetaData::V1(
MetaDataV1::from_ssz_bytes(decoded_buffer)?,
)))),
Protocol::LightClientBootstrap => Ok(Some(RPCResponse::LightClientBootstrap(
LightClientBootstrap::from_ssz_bytes(decoded_buffer)?,
))),
}
}
/// Decodes a `Version::V2` `RPCResponse` from the byte stream.
/// `decoded_buffer` should be an ssz-encoded bytestream with
// length = length-prefix received in the beginning of the stream.
/// length = length-prefix received in the beginning of the stream.
///
/// For BlocksByRange/BlocksByRoot reponses, decodes the appropriate response
/// according to the received `ForkName`.
fn handle_v2_response<T: EthSpec>(
protocol: Protocol,
fn handle_rpc_response<T: EthSpec>(
versioned_protocol: SupportedProtocol,
decoded_buffer: &[u8],
fork_name: &mut Option<ForkName>,
fork_name: Option<ForkName>,
) -> Result<Option<RPCResponse<T>>, RPCError> {
// MetaData does not contain context_bytes
if let Protocol::MetaData = protocol {
Ok(Some(RPCResponse::MetaData(MetaData::V2(
match versioned_protocol {
SupportedProtocol::StatusV1 => Ok(Some(RPCResponse::Status(
StatusMessage::from_ssz_bytes(decoded_buffer)?,
))),
// This case should be unreachable as `Goodbye` has no response.
SupportedProtocol::GoodbyeV1 => Err(RPCError::InvalidData(
"Goodbye RPC message has no valid response".to_string(),
)),
SupportedProtocol::BlocksByRangeV1 => Ok(Some(RPCResponse::BlocksByRange(Arc::new(
SignedBeaconBlock::Base(SignedBeaconBlockBase::from_ssz_bytes(decoded_buffer)?),
)))),
SupportedProtocol::BlocksByRootV1 => Ok(Some(RPCResponse::BlocksByRoot(Arc::new(
SignedBeaconBlock::Base(SignedBeaconBlockBase::from_ssz_bytes(decoded_buffer)?),
)))),
SupportedProtocol::PingV1 => Ok(Some(RPCResponse::Pong(Ping {
data: u64::from_ssz_bytes(decoded_buffer)?,
}))),
SupportedProtocol::MetaDataV1 => Ok(Some(RPCResponse::MetaData(MetaData::V1(
MetaDataV1::from_ssz_bytes(decoded_buffer)?,
)))),
SupportedProtocol::LightClientBootstrapV1 => Ok(Some(RPCResponse::LightClientBootstrap(
LightClientBootstrap::from_ssz_bytes(decoded_buffer)?,
))),
// MetaData V2 responses have no context bytes, so behave similarly to V1 responses
SupportedProtocol::MetaDataV2 => Ok(Some(RPCResponse::MetaData(MetaData::V2(
MetaDataV2::from_ssz_bytes(decoded_buffer)?,
))))
} else {
let fork_name = fork_name.take().ok_or_else(|| {
RPCError::ErrorResponse(
RPCResponseErrorCode::InvalidRequest,
format!("No context bytes provided for {} response", protocol),
)
})?;
match protocol {
Protocol::BlocksByRange => match fork_name {
ForkName::Altair => Ok(Some(RPCResponse::BlocksByRange(Arc::new(
SignedBeaconBlock::Altair(SignedBeaconBlockAltair::from_ssz_bytes(
decoded_buffer,
)?),
)))),
)))),
SupportedProtocol::BlocksByRangeV2 => match fork_name {
Some(ForkName::Altair) => Ok(Some(RPCResponse::BlocksByRange(Arc::new(
SignedBeaconBlock::Altair(SignedBeaconBlockAltair::from_ssz_bytes(decoded_buffer)?),
)))),
ForkName::Base => Ok(Some(RPCResponse::BlocksByRange(Arc::new(
SignedBeaconBlock::Base(SignedBeaconBlockBase::from_ssz_bytes(decoded_buffer)?),
)))),
ForkName::Merge => Ok(Some(RPCResponse::BlocksByRange(Arc::new(
SignedBeaconBlock::Merge(SignedBeaconBlockMerge::from_ssz_bytes(
decoded_buffer,
)?),
)))),
ForkName::Capella => Ok(Some(RPCResponse::BlocksByRange(Arc::new(
SignedBeaconBlock::Capella(SignedBeaconBlockCapella::from_ssz_bytes(
decoded_buffer,
)?),
)))),
},
Protocol::BlocksByRoot => match fork_name {
ForkName::Altair => Ok(Some(RPCResponse::BlocksByRoot(Arc::new(
SignedBeaconBlock::Altair(SignedBeaconBlockAltair::from_ssz_bytes(
decoded_buffer,
)?),
)))),
ForkName::Base => Ok(Some(RPCResponse::BlocksByRoot(Arc::new(
SignedBeaconBlock::Base(SignedBeaconBlockBase::from_ssz_bytes(decoded_buffer)?),
)))),
ForkName::Merge => Ok(Some(RPCResponse::BlocksByRoot(Arc::new(
SignedBeaconBlock::Merge(SignedBeaconBlockMerge::from_ssz_bytes(
decoded_buffer,
)?),
)))),
ForkName::Capella => Ok(Some(RPCResponse::BlocksByRoot(Arc::new(
SignedBeaconBlock::Capella(SignedBeaconBlockCapella::from_ssz_bytes(
decoded_buffer,
)?),
)))),
},
_ => Err(RPCError::ErrorResponse(
Some(ForkName::Base) => Ok(Some(RPCResponse::BlocksByRange(Arc::new(
SignedBeaconBlock::Base(SignedBeaconBlockBase::from_ssz_bytes(decoded_buffer)?),
)))),
Some(ForkName::Merge) => Ok(Some(RPCResponse::BlocksByRange(Arc::new(
SignedBeaconBlock::Merge(SignedBeaconBlockMerge::from_ssz_bytes(decoded_buffer)?),
)))),
Some(ForkName::Capella) => Ok(Some(RPCResponse::BlocksByRange(Arc::new(
SignedBeaconBlock::Capella(SignedBeaconBlockCapella::from_ssz_bytes(
decoded_buffer,
)?),
)))),
None => Err(RPCError::ErrorResponse(
RPCResponseErrorCode::InvalidRequest,
"Invalid v2 request".to_string(),
format!(
"No context bytes provided for {:?} response",
versioned_protocol
),
)),
}
},
SupportedProtocol::BlocksByRootV2 => match fork_name {
Some(ForkName::Altair) => Ok(Some(RPCResponse::BlocksByRoot(Arc::new(
SignedBeaconBlock::Altair(SignedBeaconBlockAltair::from_ssz_bytes(decoded_buffer)?),
)))),
Some(ForkName::Base) => Ok(Some(RPCResponse::BlocksByRoot(Arc::new(
SignedBeaconBlock::Base(SignedBeaconBlockBase::from_ssz_bytes(decoded_buffer)?),
)))),
Some(ForkName::Merge) => Ok(Some(RPCResponse::BlocksByRoot(Arc::new(
SignedBeaconBlock::Merge(SignedBeaconBlockMerge::from_ssz_bytes(decoded_buffer)?),
)))),
Some(ForkName::Capella) => Ok(Some(RPCResponse::BlocksByRoot(Arc::new(
SignedBeaconBlock::Capella(SignedBeaconBlockCapella::from_ssz_bytes(
decoded_buffer,
)?),
)))),
None => Err(RPCError::ErrorResponse(
RPCResponseErrorCode::InvalidRequest,
format!(
"No context bytes provided for {:?} response",
versioned_protocol
),
)),
},
}
}
@@ -742,18 +697,20 @@ mod tests {
}
}
fn bbrange_request() -> OldBlocksByRangeRequest {
OldBlocksByRangeRequest {
start_slot: 0,
count: 10,
step: 1,
}
fn bbrange_request_v1() -> OldBlocksByRangeRequest {
OldBlocksByRangeRequest::new_v1(0, 10, 1)
}
fn bbroot_request() -> BlocksByRootRequest {
BlocksByRootRequest {
block_roots: VariableList::from(vec![Hash256::zero()]),
}
fn bbrange_request_v2() -> OldBlocksByRangeRequest {
OldBlocksByRangeRequest::new(0, 10, 1)
}
fn bbroot_request_v1() -> BlocksByRootRequest {
BlocksByRootRequest::new_v1(vec![Hash256::zero()].into())
}
fn bbroot_request_v2() -> BlocksByRootRequest {
BlocksByRootRequest::new(vec![Hash256::zero()].into())
}
fn ping_message() -> Ping {
@@ -777,12 +734,11 @@ mod tests {
/// Encodes the given protocol response as bytes.
fn encode_response(
protocol: Protocol,
version: Version,
protocol: SupportedProtocol,
message: RPCCodedResponse<Spec>,
fork_name: ForkName,
) -> Result<BytesMut, RPCError> {
let snappy_protocol_id = ProtocolId::new(protocol, version, Encoding::SSZSnappy);
let snappy_protocol_id = ProtocolId::new(protocol, Encoding::SSZSnappy);
let fork_context = Arc::new(fork_context(fork_name));
let max_packet_size = max_rpc_size(&fork_context);
@@ -824,12 +780,11 @@ mod tests {
/// Attempts to decode the given protocol bytes as an rpc response
fn decode_response(
protocol: Protocol,
version: Version,
protocol: SupportedProtocol,
message: &mut BytesMut,
fork_name: ForkName,
) -> Result<Option<RPCResponse<Spec>>, RPCError> {
let snappy_protocol_id = ProtocolId::new(protocol, version, Encoding::SSZSnappy);
let snappy_protocol_id = ProtocolId::new(protocol, Encoding::SSZSnappy);
let fork_context = Arc::new(fork_context(fork_name));
let max_packet_size = max_rpc_size(&fork_context);
let mut snappy_outbound_codec =
@@ -840,63 +795,55 @@ mod tests {
/// Encodes the provided protocol message as bytes and tries to decode the encoding bytes.
fn encode_then_decode_response(
protocol: Protocol,
version: Version,
protocol: SupportedProtocol,
message: RPCCodedResponse<Spec>,
fork_name: ForkName,
) -> Result<Option<RPCResponse<Spec>>, RPCError> {
let mut encoded = encode_response(protocol, version.clone(), message, fork_name)?;
decode_response(protocol, version, &mut encoded, fork_name)
let mut encoded = encode_response(protocol, message, fork_name)?;
decode_response(protocol, &mut encoded, fork_name)
}
/// Verifies that requests we send are encoded in a way that we would correctly decode too.
fn encode_then_decode_request(req: OutboundRequest<Spec>, fork_name: ForkName) {
let fork_context = Arc::new(fork_context(fork_name));
let max_packet_size = max_rpc_size(&fork_context);
for protocol in req.supported_protocols() {
// Encode a request we send
let mut buf = BytesMut::new();
let mut outbound_codec = SSZSnappyOutboundCodec::<Spec>::new(
protocol.clone(),
max_packet_size,
fork_context.clone(),
);
outbound_codec.encode(req.clone(), &mut buf).unwrap();
let protocol = ProtocolId::new(req.versioned_protocol(), Encoding::SSZSnappy);
// Encode a request we send
let mut buf = BytesMut::new();
let mut outbound_codec = SSZSnappyOutboundCodec::<Spec>::new(
protocol.clone(),
max_packet_size,
fork_context.clone(),
);
outbound_codec.encode(req.clone(), &mut buf).unwrap();
let mut inbound_codec = SSZSnappyInboundCodec::<Spec>::new(
protocol.clone(),
max_packet_size,
fork_context.clone(),
);
let mut inbound_codec =
SSZSnappyInboundCodec::<Spec>::new(protocol.clone(), max_packet_size, fork_context);
let decoded = inbound_codec.decode(&mut buf).unwrap().unwrap_or_else(|| {
panic!(
"Should correctly decode the request {} over protocol {:?} and fork {}",
req, protocol, fork_name
)
});
match req.clone() {
OutboundRequest::Status(status) => {
assert_eq!(decoded, InboundRequest::Status(status))
}
OutboundRequest::Goodbye(goodbye) => {
assert_eq!(decoded, InboundRequest::Goodbye(goodbye))
}
OutboundRequest::BlocksByRange(bbrange) => {
assert_eq!(decoded, InboundRequest::BlocksByRange(bbrange))
}
OutboundRequest::BlocksByRoot(bbroot) => {
assert_eq!(decoded, InboundRequest::BlocksByRoot(bbroot))
}
OutboundRequest::Ping(ping) => {
assert_eq!(decoded, InboundRequest::Ping(ping))
}
OutboundRequest::MetaData(metadata) => {
assert_eq!(decoded, InboundRequest::MetaData(metadata))
}
OutboundRequest::LightClientBootstrap(bootstrap) => {
assert_eq!(decoded, InboundRequest::LightClientBootstrap(bootstrap))
}
let decoded = inbound_codec.decode(&mut buf).unwrap().unwrap_or_else(|| {
panic!(
"Should correctly decode the request {} over protocol {:?} and fork {}",
req, protocol, fork_name
)
});
match req {
OutboundRequest::Status(status) => {
assert_eq!(decoded, InboundRequest::Status(status))
}
OutboundRequest::Goodbye(goodbye) => {
assert_eq!(decoded, InboundRequest::Goodbye(goodbye))
}
OutboundRequest::BlocksByRange(bbrange) => {
assert_eq!(decoded, InboundRequest::BlocksByRange(bbrange))
}
OutboundRequest::BlocksByRoot(bbroot) => {
assert_eq!(decoded, InboundRequest::BlocksByRoot(bbroot))
}
OutboundRequest::Ping(ping) => {
assert_eq!(decoded, InboundRequest::Ping(ping))
}
OutboundRequest::MetaData(metadata) => {
assert_eq!(decoded, InboundRequest::MetaData(metadata))
}
}
}
@@ -906,8 +853,7 @@ mod tests {
fn test_encode_then_decode_v1() {
assert_eq!(
encode_then_decode_response(
Protocol::Status,
Version::V1,
SupportedProtocol::StatusV1,
RPCCodedResponse::Success(RPCResponse::Status(status_message())),
ForkName::Base,
),
@@ -916,8 +862,7 @@ mod tests {
assert_eq!(
encode_then_decode_response(
Protocol::Ping,
Version::V1,
SupportedProtocol::PingV1,
RPCCodedResponse::Success(RPCResponse::Pong(ping_message())),
ForkName::Base,
),
@@ -926,8 +871,7 @@ mod tests {
assert_eq!(
encode_then_decode_response(
Protocol::BlocksByRange,
Version::V1,
SupportedProtocol::BlocksByRangeV1,
RPCCodedResponse::Success(RPCResponse::BlocksByRange(Arc::new(empty_base_block()))),
ForkName::Base,
),
@@ -939,8 +883,7 @@ mod tests {
assert!(
matches!(
encode_then_decode_response(
Protocol::BlocksByRange,
Version::V1,
SupportedProtocol::BlocksByRangeV1,
RPCCodedResponse::Success(RPCResponse::BlocksByRange(Arc::new(altair_block()))),
ForkName::Altair,
)
@@ -952,8 +895,7 @@ mod tests {
assert_eq!(
encode_then_decode_response(
Protocol::BlocksByRoot,
Version::V1,
SupportedProtocol::BlocksByRootV1,
RPCCodedResponse::Success(RPCResponse::BlocksByRoot(Arc::new(empty_base_block()))),
ForkName::Base,
),
@@ -965,8 +907,7 @@ mod tests {
assert!(
matches!(
encode_then_decode_response(
Protocol::BlocksByRoot,
Version::V1,
SupportedProtocol::BlocksByRootV1,
RPCCodedResponse::Success(RPCResponse::BlocksByRoot(Arc::new(altair_block()))),
ForkName::Altair,
)
@@ -978,18 +919,7 @@ mod tests {
assert_eq!(
encode_then_decode_response(
Protocol::MetaData,
Version::V1,
RPCCodedResponse::Success(RPCResponse::MetaData(metadata())),
ForkName::Base,
),
Ok(Some(RPCResponse::MetaData(metadata()))),
);
assert_eq!(
encode_then_decode_response(
Protocol::MetaData,
Version::V1,
SupportedProtocol::MetaDataV1,
RPCCodedResponse::Success(RPCResponse::MetaData(metadata())),
ForkName::Base,
),
@@ -999,8 +929,7 @@ mod tests {
// A MetaDataV2 still encodes as a MetaDataV1 since version is Version::V1
assert_eq!(
encode_then_decode_response(
Protocol::MetaData,
Version::V1,
SupportedProtocol::MetaDataV1,
RPCCodedResponse::Success(RPCResponse::MetaData(metadata_v2())),
ForkName::Base,
),
@@ -1011,38 +940,9 @@ mod tests {
// Test RPCResponse encoding/decoding for V1 messages
#[test]
fn test_encode_then_decode_v2() {
assert!(
matches!(
encode_then_decode_response(
Protocol::Status,
Version::V2,
RPCCodedResponse::Success(RPCResponse::Status(status_message())),
ForkName::Base,
)
.unwrap_err(),
RPCError::ErrorResponse(RPCResponseErrorCode::InvalidRequest, _),
),
"status does not have V2 message"
);
assert!(
matches!(
encode_then_decode_response(
Protocol::Ping,
Version::V2,
RPCCodedResponse::Success(RPCResponse::Pong(ping_message())),
ForkName::Base,
)
.unwrap_err(),
RPCError::ErrorResponse(RPCResponseErrorCode::InvalidRequest, _),
),
"ping does not have V2 message"
);
assert_eq!(
encode_then_decode_response(
Protocol::BlocksByRange,
Version::V2,
SupportedProtocol::BlocksByRangeV2,
RPCCodedResponse::Success(RPCResponse::BlocksByRange(Arc::new(empty_base_block()))),
ForkName::Base,
),
@@ -1056,8 +956,7 @@ mod tests {
// the current_fork's rpc limit
assert_eq!(
encode_then_decode_response(
Protocol::BlocksByRange,
Version::V2,
SupportedProtocol::BlocksByRangeV2,
RPCCodedResponse::Success(RPCResponse::BlocksByRange(Arc::new(empty_base_block()))),
ForkName::Altair,
),
@@ -1068,8 +967,7 @@ mod tests {
assert_eq!(
encode_then_decode_response(
Protocol::BlocksByRange,
Version::V2,
SupportedProtocol::BlocksByRangeV2,
RPCCodedResponse::Success(RPCResponse::BlocksByRange(Arc::new(altair_block()))),
ForkName::Altair,
),
@@ -1081,8 +979,7 @@ mod tests {
assert_eq!(
encode_then_decode_response(
Protocol::BlocksByRange,
Version::V2,
SupportedProtocol::BlocksByRangeV2,
RPCCodedResponse::Success(RPCResponse::BlocksByRange(Arc::new(
merge_block_small.clone()
))),
@@ -1100,8 +997,7 @@ mod tests {
assert!(
matches!(
decode_response(
Protocol::BlocksByRange,
Version::V2,
SupportedProtocol::BlocksByRangeV2,
&mut encoded,
ForkName::Merge,
)
@@ -1113,8 +1009,7 @@ mod tests {
assert_eq!(
encode_then_decode_response(
Protocol::BlocksByRoot,
Version::V2,
SupportedProtocol::BlocksByRootV2,
RPCCodedResponse::Success(RPCResponse::BlocksByRoot(Arc::new(empty_base_block()))),
ForkName::Base,
),
@@ -1128,8 +1023,7 @@ mod tests {
// the current_fork's rpc limit
assert_eq!(
encode_then_decode_response(
Protocol::BlocksByRoot,
Version::V2,
SupportedProtocol::BlocksByRootV2,
RPCCodedResponse::Success(RPCResponse::BlocksByRoot(Arc::new(empty_base_block()))),
ForkName::Altair,
),
@@ -1140,8 +1034,7 @@ mod tests {
assert_eq!(
encode_then_decode_response(
Protocol::BlocksByRoot,
Version::V2,
SupportedProtocol::BlocksByRootV2,
RPCCodedResponse::Success(RPCResponse::BlocksByRoot(Arc::new(altair_block()))),
ForkName::Altair,
),
@@ -1150,8 +1043,7 @@ mod tests {
assert_eq!(
encode_then_decode_response(
Protocol::BlocksByRoot,
Version::V2,
SupportedProtocol::BlocksByRootV2,
RPCCodedResponse::Success(RPCResponse::BlocksByRoot(Arc::new(
merge_block_small.clone()
))),
@@ -1167,8 +1059,7 @@ mod tests {
assert!(
matches!(
decode_response(
Protocol::BlocksByRoot,
Version::V2,
SupportedProtocol::BlocksByRootV2,
&mut encoded,
ForkName::Merge,
)
@@ -1181,8 +1072,7 @@ mod tests {
// A MetaDataV1 still encodes as a MetaDataV2 since version is Version::V2
assert_eq!(
encode_then_decode_response(
Protocol::MetaData,
Version::V2,
SupportedProtocol::MetaDataV2,
RPCCodedResponse::Success(RPCResponse::MetaData(metadata())),
ForkName::Base,
),
@@ -1191,8 +1081,7 @@ mod tests {
assert_eq!(
encode_then_decode_response(
Protocol::MetaData,
Version::V2,
SupportedProtocol::MetaDataV2,
RPCCodedResponse::Success(RPCResponse::MetaData(metadata_v2())),
ForkName::Altair,
),
@@ -1207,8 +1096,7 @@ mod tests {
// Removing context bytes for v2 messages should error
let mut encoded_bytes = encode_response(
Protocol::BlocksByRange,
Version::V2,
SupportedProtocol::BlocksByRangeV2,
RPCCodedResponse::Success(RPCResponse::BlocksByRange(Arc::new(empty_base_block()))),
ForkName::Base,
)
@@ -1218,8 +1106,7 @@ mod tests {
assert!(matches!(
decode_response(
Protocol::BlocksByRange,
Version::V2,
SupportedProtocol::BlocksByRangeV2,
&mut encoded_bytes,
ForkName::Base
)
@@ -1228,8 +1115,7 @@ mod tests {
));
let mut encoded_bytes = encode_response(
Protocol::BlocksByRoot,
Version::V2,
SupportedProtocol::BlocksByRootV2,
RPCCodedResponse::Success(RPCResponse::BlocksByRoot(Arc::new(empty_base_block()))),
ForkName::Base,
)
@@ -1239,8 +1125,7 @@ mod tests {
assert!(matches!(
decode_response(
Protocol::BlocksByRange,
Version::V2,
SupportedProtocol::BlocksByRangeV2,
&mut encoded_bytes,
ForkName::Base
)
@@ -1250,8 +1135,7 @@ mod tests {
// Trying to decode a base block with altair context bytes should give ssz decoding error
let mut encoded_bytes = encode_response(
Protocol::BlocksByRange,
Version::V2,
SupportedProtocol::BlocksByRangeV2,
RPCCodedResponse::Success(RPCResponse::BlocksByRange(Arc::new(empty_base_block()))),
ForkName::Altair,
)
@@ -1264,8 +1148,7 @@ mod tests {
assert!(matches!(
decode_response(
Protocol::BlocksByRange,
Version::V2,
SupportedProtocol::BlocksByRangeV2,
&mut wrong_fork_bytes,
ForkName::Altair
)
@@ -1275,8 +1158,7 @@ mod tests {
// Trying to decode an altair block with base context bytes should give ssz decoding error
let mut encoded_bytes = encode_response(
Protocol::BlocksByRoot,
Version::V2,
SupportedProtocol::BlocksByRootV2,
RPCCodedResponse::Success(RPCResponse::BlocksByRoot(Arc::new(altair_block()))),
ForkName::Altair,
)
@@ -1288,8 +1170,7 @@ mod tests {
assert!(matches!(
decode_response(
Protocol::BlocksByRange,
Version::V2,
SupportedProtocol::BlocksByRangeV2,
&mut wrong_fork_bytes,
ForkName::Altair
)
@@ -1302,8 +1183,7 @@ mod tests {
encoded_bytes.extend_from_slice(&fork_context.to_context_bytes(ForkName::Altair).unwrap());
encoded_bytes.extend_from_slice(
&encode_response(
Protocol::MetaData,
Version::V2,
SupportedProtocol::MetaDataV2,
RPCCodedResponse::Success(RPCResponse::MetaData(metadata())),
ForkName::Altair,
)
@@ -1311,8 +1191,7 @@ mod tests {
);
assert!(decode_response(
Protocol::MetaData,
Version::V2,
SupportedProtocol::MetaDataV2,
&mut encoded_bytes,
ForkName::Altair
)
@@ -1320,8 +1199,7 @@ mod tests {
// Sending context bytes which do not correspond to any fork should return an error
let mut encoded_bytes = encode_response(
Protocol::BlocksByRoot,
Version::V2,
SupportedProtocol::BlocksByRootV2,
RPCCodedResponse::Success(RPCResponse::BlocksByRoot(Arc::new(empty_base_block()))),
ForkName::Altair,
)
@@ -1333,8 +1211,7 @@ mod tests {
assert!(matches!(
decode_response(
Protocol::BlocksByRange,
Version::V2,
SupportedProtocol::BlocksByRangeV2,
&mut wrong_fork_bytes,
ForkName::Altair
)
@@ -1344,8 +1221,7 @@ mod tests {
// Sending bytes less than context bytes length should wait for more bytes by returning `Ok(None)`
let mut encoded_bytes = encode_response(
Protocol::BlocksByRoot,
Version::V2,
SupportedProtocol::BlocksByRootV2,
RPCCodedResponse::Success(RPCResponse::BlocksByRoot(Arc::new(empty_base_block()))),
ForkName::Altair,
)
@@ -1355,8 +1231,7 @@ mod tests {
assert_eq!(
decode_response(
Protocol::BlocksByRange,
Version::V2,
SupportedProtocol::BlocksByRangeV2,
&mut part,
ForkName::Altair
),
@@ -1370,9 +1245,12 @@ mod tests {
OutboundRequest::Ping(ping_message()),
OutboundRequest::Status(status_message()),
OutboundRequest::Goodbye(GoodbyeReason::Fault),
OutboundRequest::BlocksByRange(bbrange_request()),
OutboundRequest::BlocksByRoot(bbroot_request()),
OutboundRequest::MetaData(PhantomData::<Spec>),
OutboundRequest::BlocksByRange(bbrange_request_v1()),
OutboundRequest::BlocksByRange(bbrange_request_v2()),
OutboundRequest::BlocksByRoot(bbroot_request_v1()),
OutboundRequest::BlocksByRoot(bbroot_request_v2()),
OutboundRequest::MetaData(MetadataRequest::new_v1()),
OutboundRequest::MetaData(MetadataRequest::new_v2()),
];
for req in requests.iter() {
for fork_name in ForkName::list_all() {
@@ -1432,7 +1310,7 @@ mod tests {
// 10 (for stream identifier) + 80 + 42 = 132 > `max_compressed_len`. Hence, decoding should fail with `InvalidData`.
assert!(matches!(
decode_response(Protocol::Status, Version::V1, &mut dst, ForkName::Base).unwrap_err(),
decode_response(SupportedProtocol::StatusV1, &mut dst, ForkName::Base).unwrap_err(),
RPCError::InvalidData(_)
));
}
@@ -1490,8 +1368,7 @@ mod tests {
// 10 (for stream identifier) + 176156 + 8103 = 184269 > `max_compressed_len`. Hence, decoding should fail with `InvalidData`.
assert!(matches!(
decode_response(
Protocol::BlocksByRange,
Version::V2,
SupportedProtocol::BlocksByRangeV2,
&mut dst,
ForkName::Altair
)
@@ -1534,7 +1411,7 @@ mod tests {
dst.extend_from_slice(writer.get_ref());
assert!(matches!(
decode_response(Protocol::Status, Version::V1, &mut dst, ForkName::Base).unwrap_err(),
decode_response(SupportedProtocol::StatusV1, &mut dst, ForkName::Base).unwrap_err(),
RPCError::InvalidData(_)
));
}

View File

@@ -245,7 +245,7 @@ where
while let Some((id, req)) = self.dial_queue.pop() {
self.events_out.push(Err(HandlerErr::Outbound {
error: RPCError::Disconnected,
proto: req.protocol(),
proto: req.versioned_protocol().protocol(),
id,
}));
}
@@ -269,7 +269,7 @@ where
}
_ => self.events_out.push(Err(HandlerErr::Outbound {
error: RPCError::Disconnected,
proto: req.protocol(),
proto: req.versioned_protocol().protocol(),
id,
})),
}
@@ -334,7 +334,7 @@ where
) {
self.dial_negotiated -= 1;
let (id, request) = request_info;
let proto = request.protocol();
let proto = request.versioned_protocol().protocol();
// accept outbound connections only if the handler is not deactivated
if matches!(self.state, HandlerState::Deactivated) {
@@ -414,7 +414,7 @@ where
128,
) as usize),
delay_key: Some(delay_key),
protocol: req.protocol(),
protocol: req.versioned_protocol().protocol(),
request_start_time: Instant::now(),
remaining_chunks: expected_responses,
},
@@ -422,7 +422,7 @@ where
} else {
self.events_out.push(Err(HandlerErr::Inbound {
id: self.current_inbound_substream_id,
proto: req.protocol(),
proto: req.versioned_protocol().protocol(),
error: RPCError::HandlerRejected,
}));
return self.shutdown(None);
@@ -498,7 +498,7 @@ where
};
self.events_out.push(Err(HandlerErr::Outbound {
error,
proto: req.protocol(),
proto: req.versioned_protocol().protocol(),
id,
}));
}
@@ -895,7 +895,7 @@ where
// else we return an error, stream should not have closed early.
let outbound_err = HandlerErr::Outbound {
id: request_id,
proto: request.protocol(),
proto: request.versioned_protocol().protocol(),
error: RPCError::IncompleteStream,
};
return Poll::Ready(ConnectionHandlerEvent::Custom(Err(outbound_err)));

View File

@@ -3,11 +3,13 @@
use crate::types::{EnrAttestationBitfield, EnrSyncCommitteeBitfield};
use regex::bytes::Regex;
use serde::Serialize;
use ssz::Encode;
use ssz_derive::{Decode, Encode};
use ssz_types::{
typenum::{U1024, U256},
VariableList,
};
use std::marker::PhantomData;
use std::ops::Deref;
use std::sync::Arc;
use strum::IntoStaticStr;
@@ -85,6 +87,30 @@ pub struct Ping {
pub data: u64,
}
/// The METADATA request structure.
#[superstruct(
variants(V1, V2),
variant_attributes(derive(Clone, Debug, PartialEq, Serialize),)
)]
#[derive(Clone, Debug, PartialEq)]
pub struct MetadataRequest<T: EthSpec> {
_phantom_data: PhantomData<T>,
}
impl<T: EthSpec> MetadataRequest<T> {
pub fn new_v1() -> Self {
Self::V1(MetadataRequestV1 {
_phantom_data: PhantomData,
})
}
pub fn new_v2() -> Self {
Self::V2(MetadataRequestV2 {
_phantom_data: PhantomData,
})
}
}
/// The METADATA response structure.
#[superstruct(
variants(V1, V2),
@@ -93,9 +119,8 @@ pub struct Ping {
serde(bound = "T: EthSpec", deny_unknown_fields),
)
)]
#[derive(Clone, Debug, PartialEq, Serialize, Encode)]
#[derive(Clone, Debug, PartialEq, Serialize)]
#[serde(bound = "T: EthSpec")]
#[ssz(enum_behaviour = "transparent")]
pub struct MetaData<T: EthSpec> {
/// A sequential counter indicating when data gets modified.
pub seq_number: u64,
@@ -106,6 +131,38 @@ pub struct MetaData<T: EthSpec> {
pub syncnets: EnrSyncCommitteeBitfield<T>,
}
impl<T: EthSpec> MetaData<T> {
/// Returns a V1 MetaData response from self.
pub fn metadata_v1(&self) -> Self {
match self {
md @ MetaData::V1(_) => md.clone(),
MetaData::V2(metadata) => MetaData::V1(MetaDataV1 {
seq_number: metadata.seq_number,
attnets: metadata.attnets.clone(),
}),
}
}
/// Returns a V2 MetaData response from self by filling unavailable fields with default.
pub fn metadata_v2(&self) -> Self {
match self {
MetaData::V1(metadata) => MetaData::V2(MetaDataV2 {
seq_number: metadata.seq_number,
attnets: metadata.attnets.clone(),
syncnets: Default::default(),
}),
md @ MetaData::V2(_) => md.clone(),
}
}
pub fn as_ssz_bytes(&self) -> Vec<u8> {
match self {
MetaData::V1(md) => md.as_ssz_bytes(),
MetaData::V2(md) => md.as_ssz_bytes(),
}
}
}
/// The reason given for a `Goodbye` message.
///
/// Note: any unknown `u64::into(n)` will resolve to `Goodbye::Unknown` for any unknown `n`,
@@ -197,7 +254,11 @@ impl ssz::Decode for GoodbyeReason {
}
/// Request a number of beacon block roots from a peer.
#[derive(Encode, Decode, Clone, Debug, PartialEq)]
#[superstruct(
variants(V1, V2),
variant_attributes(derive(Encode, Decode, Clone, Debug, PartialEq))
)]
#[derive(Clone, Debug, PartialEq)]
pub struct BlocksByRangeRequest {
/// The starting slot to request blocks.
pub start_slot: u64,
@@ -206,8 +267,23 @@ pub struct BlocksByRangeRequest {
pub count: u64,
}
impl BlocksByRangeRequest {
/// The default request is V2
pub fn new(start_slot: u64, count: u64) -> Self {
Self::V2(BlocksByRangeRequestV2 { start_slot, count })
}
pub fn new_v1(start_slot: u64, count: u64) -> Self {
Self::V1(BlocksByRangeRequestV1 { start_slot, count })
}
}
/// Request a number of beacon block roots from a peer.
#[derive(Encode, Decode, Clone, Debug, PartialEq)]
#[superstruct(
variants(V1, V2),
variant_attributes(derive(Encode, Decode, Clone, Debug, PartialEq))
)]
#[derive(Clone, Debug, PartialEq)]
pub struct OldBlocksByRangeRequest {
/// The starting slot to request blocks.
pub start_slot: u64,
@@ -223,13 +299,43 @@ pub struct OldBlocksByRangeRequest {
pub step: u64,
}
impl OldBlocksByRangeRequest {
/// The default request is V2
pub fn new(start_slot: u64, count: u64, step: u64) -> Self {
Self::V2(OldBlocksByRangeRequestV2 {
start_slot,
count,
step,
})
}
pub fn new_v1(start_slot: u64, count: u64, step: u64) -> Self {
Self::V1(OldBlocksByRangeRequestV1 {
start_slot,
count,
step,
})
}
}
/// Request a number of beacon block bodies from a peer.
#[superstruct(variants(V1, V2), variant_attributes(derive(Clone, Debug, PartialEq)))]
#[derive(Clone, Debug, PartialEq)]
pub struct BlocksByRootRequest {
/// The list of beacon block bodies being requested.
pub block_roots: VariableList<Hash256, MaxRequestBlocks>,
}
impl BlocksByRootRequest {
pub fn new(block_roots: VariableList<Hash256, MaxRequestBlocks>) -> Self {
Self::V2(BlocksByRootRequestV2 { block_roots })
}
pub fn new_v1(block_roots: VariableList<Hash256, MaxRequestBlocks>) -> Self {
Self::V1(BlocksByRootRequestV1 { block_roots })
}
}
/* RPC Handling and Grouping */
// Collection of enums and structs used by the Codecs to encode/decode RPC messages
@@ -438,7 +544,12 @@ impl std::fmt::Display for GoodbyeReason {
impl std::fmt::Display for BlocksByRangeRequest {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "Start Slot: {}, Count: {}", self.start_slot, self.count)
write!(
f,
"Start Slot: {}, Count: {}",
self.start_slot(),
self.count()
)
}
}
@@ -447,7 +558,9 @@ impl std::fmt::Display for OldBlocksByRangeRequest {
write!(
f,
"Start Slot: {}, Count: {}, Step: {}",
self.start_slot, self.count, self.step
self.start_slot(),
self.count(),
self.step()
)
}
}

View File

@@ -247,7 +247,7 @@ where
}
Err(RateLimitedErr::TooLarge) => {
// we set the batch sizes, so this is a coding/config err for most protocols
let protocol = req.protocol();
let protocol = req.versioned_protocol().protocol();
if matches!(protocol, Protocol::BlocksByRange) {
debug!(self.log, "Blocks by range request will never be processed"; "request" => %req);
} else {
@@ -335,7 +335,7 @@ where
serializer.emit_arguments("peer_id", &format_args!("{}", self.peer_id))?;
let (msg_kind, protocol) = match &self.event {
Ok(received) => match received {
RPCReceived::Request(_, req) => ("request", req.protocol()),
RPCReceived::Request(_, req) => ("request", req.versioned_protocol().protocol()),
RPCReceived::Response(_, res) => ("response", res.protocol()),
RPCReceived::EndOfStream(_, end) => (
"end_of_stream",

View File

@@ -1,11 +1,8 @@
use std::marker::PhantomData;
use super::methods::*;
use super::protocol::Protocol;
use super::protocol::ProtocolId;
use super::protocol::SupportedProtocol;
use super::RPCError;
use crate::rpc::protocol::Encoding;
use crate::rpc::protocol::Version;
use crate::rpc::{
codec::{base::BaseOutboundCodec, ssz_snappy::SSZSnappyOutboundCodec, OutboundCodec},
methods::ResponseTermination,
@@ -38,9 +35,8 @@ pub enum OutboundRequest<TSpec: EthSpec> {
Goodbye(GoodbyeReason),
BlocksByRange(OldBlocksByRangeRequest),
BlocksByRoot(BlocksByRootRequest),
LightClientBootstrap(LightClientBootstrapRequest),
Ping(Ping),
MetaData(PhantomData<TSpec>),
MetaData(MetadataRequest<TSpec>),
}
impl<TSpec: EthSpec> UpgradeInfo for OutboundRequestContainer<TSpec> {
@@ -59,36 +55,29 @@ impl<TSpec: EthSpec> OutboundRequest<TSpec> {
match self {
// add more protocols when versions/encodings are supported
OutboundRequest::Status(_) => vec![ProtocolId::new(
Protocol::Status,
Version::V1,
SupportedProtocol::StatusV1,
Encoding::SSZSnappy,
)],
OutboundRequest::Goodbye(_) => vec![ProtocolId::new(
Protocol::Goodbye,
Version::V1,
SupportedProtocol::GoodbyeV1,
Encoding::SSZSnappy,
)],
OutboundRequest::BlocksByRange(_) => vec![
ProtocolId::new(Protocol::BlocksByRange, Version::V2, Encoding::SSZSnappy),
ProtocolId::new(Protocol::BlocksByRange, Version::V1, Encoding::SSZSnappy),
ProtocolId::new(SupportedProtocol::BlocksByRangeV2, Encoding::SSZSnappy),
ProtocolId::new(SupportedProtocol::BlocksByRangeV1, Encoding::SSZSnappy),
],
OutboundRequest::BlocksByRoot(_) => vec![
ProtocolId::new(Protocol::BlocksByRoot, Version::V2, Encoding::SSZSnappy),
ProtocolId::new(Protocol::BlocksByRoot, Version::V1, Encoding::SSZSnappy),
ProtocolId::new(SupportedProtocol::BlocksByRootV2, Encoding::SSZSnappy),
ProtocolId::new(SupportedProtocol::BlocksByRootV1, Encoding::SSZSnappy),
],
OutboundRequest::Ping(_) => vec![ProtocolId::new(
Protocol::Ping,
Version::V1,
SupportedProtocol::PingV1,
Encoding::SSZSnappy,
)],
OutboundRequest::MetaData(_) => vec![
ProtocolId::new(Protocol::MetaData, Version::V2, Encoding::SSZSnappy),
ProtocolId::new(Protocol::MetaData, Version::V1, Encoding::SSZSnappy),
ProtocolId::new(SupportedProtocol::MetaDataV2, Encoding::SSZSnappy),
ProtocolId::new(SupportedProtocol::MetaDataV1, Encoding::SSZSnappy),
],
// Note: This match arm is technically unreachable as we only respond to light client requests
// that we generate from the beacon state.
// We do not make light client rpc requests from the beacon node
OutboundRequest::LightClientBootstrap(_) => vec![],
}
}
/* These functions are used in the handler for stream management */
@@ -98,24 +87,31 @@ impl<TSpec: EthSpec> OutboundRequest<TSpec> {
match self {
OutboundRequest::Status(_) => 1,
OutboundRequest::Goodbye(_) => 0,
OutboundRequest::BlocksByRange(req) => req.count,
OutboundRequest::BlocksByRoot(req) => req.block_roots.len() as u64,
OutboundRequest::BlocksByRange(req) => *req.count(),
OutboundRequest::BlocksByRoot(req) => req.block_roots().len() as u64,
OutboundRequest::Ping(_) => 1,
OutboundRequest::MetaData(_) => 1,
OutboundRequest::LightClientBootstrap(_) => 1,
}
}
/// Gives the corresponding `Protocol` to this request.
pub fn protocol(&self) -> Protocol {
/// Gives the corresponding `SupportedProtocol` to this request.
pub fn versioned_protocol(&self) -> SupportedProtocol {
match self {
OutboundRequest::Status(_) => Protocol::Status,
OutboundRequest::Goodbye(_) => Protocol::Goodbye,
OutboundRequest::BlocksByRange(_) => Protocol::BlocksByRange,
OutboundRequest::BlocksByRoot(_) => Protocol::BlocksByRoot,
OutboundRequest::Ping(_) => Protocol::Ping,
OutboundRequest::MetaData(_) => Protocol::MetaData,
OutboundRequest::LightClientBootstrap(_) => Protocol::LightClientBootstrap,
OutboundRequest::Status(_) => SupportedProtocol::StatusV1,
OutboundRequest::Goodbye(_) => SupportedProtocol::GoodbyeV1,
OutboundRequest::BlocksByRange(req) => match req {
OldBlocksByRangeRequest::V1(_) => SupportedProtocol::BlocksByRangeV1,
OldBlocksByRangeRequest::V2(_) => SupportedProtocol::BlocksByRangeV2,
},
OutboundRequest::BlocksByRoot(req) => match req {
BlocksByRootRequest::V1(_) => SupportedProtocol::BlocksByRootV1,
BlocksByRootRequest::V2(_) => SupportedProtocol::BlocksByRootV2,
},
OutboundRequest::Ping(_) => SupportedProtocol::PingV1,
OutboundRequest::MetaData(req) => match req {
MetadataRequest::V1(_) => SupportedProtocol::MetaDataV1,
MetadataRequest::V2(_) => SupportedProtocol::MetaDataV2,
},
}
}
@@ -127,7 +123,6 @@ impl<TSpec: EthSpec> OutboundRequest<TSpec> {
// variants that have `multiple_responses()` can have values.
OutboundRequest::BlocksByRange(_) => ResponseTermination::BlocksByRange,
OutboundRequest::BlocksByRoot(_) => ResponseTermination::BlocksByRoot,
OutboundRequest::LightClientBootstrap(_) => unreachable!(),
OutboundRequest::Status(_) => unreachable!(),
OutboundRequest::Goodbye(_) => unreachable!(),
OutboundRequest::Ping(_) => unreachable!(),
@@ -185,9 +180,6 @@ impl<TSpec: EthSpec> std::fmt::Display for OutboundRequest<TSpec> {
OutboundRequest::BlocksByRoot(req) => write!(f, "Blocks by root: {:?}", req),
OutboundRequest::Ping(ping) => write!(f, "Ping: {}", ping.data),
OutboundRequest::MetaData(_) => write!(f, "MetaData request"),
OutboundRequest::LightClientBootstrap(bootstrap) => {
write!(f, "Lightclient Bootstrap: {}", bootstrap.root)
}
}
}
}

View File

@@ -179,21 +179,74 @@ pub enum Protocol {
LightClientBootstrap,
}
/// RPC Versions
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Version {
/// Version 1 of RPC
V1,
/// Version 2 of RPC
V2,
}
/// RPC Encondings supported.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Encoding {
SSZSnappy,
}
/// All valid protocol name and version combinations.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum SupportedProtocol {
StatusV1,
GoodbyeV1,
BlocksByRangeV1,
BlocksByRangeV2,
BlocksByRootV1,
BlocksByRootV2,
PingV1,
MetaDataV1,
MetaDataV2,
LightClientBootstrapV1,
}
impl SupportedProtocol {
pub fn version_string(&self) -> &'static str {
match self {
SupportedProtocol::StatusV1 => "1",
SupportedProtocol::GoodbyeV1 => "1",
SupportedProtocol::BlocksByRangeV1 => "1",
SupportedProtocol::BlocksByRangeV2 => "2",
SupportedProtocol::BlocksByRootV1 => "1",
SupportedProtocol::BlocksByRootV2 => "2",
SupportedProtocol::PingV1 => "1",
SupportedProtocol::MetaDataV1 => "1",
SupportedProtocol::MetaDataV2 => "2",
SupportedProtocol::LightClientBootstrapV1 => "1",
}
}
pub fn protocol(&self) -> Protocol {
match self {
SupportedProtocol::StatusV1 => Protocol::Status,
SupportedProtocol::GoodbyeV1 => Protocol::Goodbye,
SupportedProtocol::BlocksByRangeV1 => Protocol::BlocksByRange,
SupportedProtocol::BlocksByRangeV2 => Protocol::BlocksByRange,
SupportedProtocol::BlocksByRootV1 => Protocol::BlocksByRoot,
SupportedProtocol::BlocksByRootV2 => Protocol::BlocksByRoot,
SupportedProtocol::PingV1 => Protocol::Ping,
SupportedProtocol::MetaDataV1 => Protocol::MetaData,
SupportedProtocol::MetaDataV2 => Protocol::MetaData,
SupportedProtocol::LightClientBootstrapV1 => Protocol::LightClientBootstrap,
}
}
fn currently_supported() -> Vec<ProtocolId> {
vec![
ProtocolId::new(Self::StatusV1, Encoding::SSZSnappy),
ProtocolId::new(Self::GoodbyeV1, Encoding::SSZSnappy),
// V2 variants have higher preference then V1
ProtocolId::new(Self::BlocksByRangeV2, Encoding::SSZSnappy),
ProtocolId::new(Self::BlocksByRangeV1, Encoding::SSZSnappy),
ProtocolId::new(Self::BlocksByRootV2, Encoding::SSZSnappy),
ProtocolId::new(Self::BlocksByRootV1, Encoding::SSZSnappy),
ProtocolId::new(Self::PingV1, Encoding::SSZSnappy),
ProtocolId::new(Self::MetaDataV2, Encoding::SSZSnappy),
ProtocolId::new(Self::MetaDataV1, Encoding::SSZSnappy),
]
}
}
impl std::fmt::Display for Encoding {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let repr = match self {
@@ -203,16 +256,6 @@ impl std::fmt::Display for Encoding {
}
}
impl std::fmt::Display for Version {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let repr = match self {
Version::V1 => "1",
Version::V2 => "2",
};
f.write_str(repr)
}
}
#[derive(Debug, Clone)]
pub struct RPCProtocol<TSpec: EthSpec> {
pub fork_context: Arc<ForkContext>,
@@ -227,22 +270,10 @@ impl<TSpec: EthSpec> UpgradeInfo for RPCProtocol<TSpec> {
/// The list of supported RPC protocols for Lighthouse.
fn protocol_info(&self) -> Self::InfoIter {
let mut supported_protocols = vec![
ProtocolId::new(Protocol::Status, Version::V1, Encoding::SSZSnappy),
ProtocolId::new(Protocol::Goodbye, Version::V1, Encoding::SSZSnappy),
// V2 variants have higher preference then V1
ProtocolId::new(Protocol::BlocksByRange, Version::V2, Encoding::SSZSnappy),
ProtocolId::new(Protocol::BlocksByRange, Version::V1, Encoding::SSZSnappy),
ProtocolId::new(Protocol::BlocksByRoot, Version::V2, Encoding::SSZSnappy),
ProtocolId::new(Protocol::BlocksByRoot, Version::V1, Encoding::SSZSnappy),
ProtocolId::new(Protocol::Ping, Version::V1, Encoding::SSZSnappy),
ProtocolId::new(Protocol::MetaData, Version::V2, Encoding::SSZSnappy),
ProtocolId::new(Protocol::MetaData, Version::V1, Encoding::SSZSnappy),
];
let mut supported_protocols = SupportedProtocol::currently_supported();
if self.enable_light_client_server {
supported_protocols.push(ProtocolId::new(
Protocol::LightClientBootstrap,
Version::V1,
SupportedProtocol::LightClientBootstrapV1,
Encoding::SSZSnappy,
));
}
@@ -272,11 +303,8 @@ impl RpcLimits {
/// Tracks the types in a protocol id.
#[derive(Clone, Debug)]
pub struct ProtocolId {
/// The RPC message type/name.
pub message_name: Protocol,
/// The version of the RPC.
pub version: Version,
/// The protocol name and version
pub versioned_protocol: SupportedProtocol,
/// The encoding of the RPC.
pub encoding: Encoding,
@@ -288,7 +316,7 @@ pub struct ProtocolId {
impl ProtocolId {
/// Returns min and max size for messages of given protocol id requests.
pub fn rpc_request_limits(&self) -> RpcLimits {
match self.message_name {
match self.versioned_protocol.protocol() {
Protocol::Status => RpcLimits::new(
<StatusMessage as Encode>::ssz_fixed_len(),
<StatusMessage as Encode>::ssz_fixed_len(),
@@ -297,9 +325,10 @@ impl ProtocolId {
<GoodbyeReason as Encode>::ssz_fixed_len(),
<GoodbyeReason as Encode>::ssz_fixed_len(),
),
// V1 and V2 requests are the same
Protocol::BlocksByRange => RpcLimits::new(
<OldBlocksByRangeRequest as Encode>::ssz_fixed_len(),
<OldBlocksByRangeRequest as Encode>::ssz_fixed_len(),
<OldBlocksByRangeRequestV2 as Encode>::ssz_fixed_len(),
<OldBlocksByRangeRequestV2 as Encode>::ssz_fixed_len(),
),
Protocol::BlocksByRoot => {
RpcLimits::new(*BLOCKS_BY_ROOT_REQUEST_MIN, *BLOCKS_BY_ROOT_REQUEST_MAX)
@@ -318,7 +347,7 @@ impl ProtocolId {
/// Returns min and max size for messages of given protocol id responses.
pub fn rpc_response_limits<T: EthSpec>(&self, fork_context: &ForkContext) -> RpcLimits {
match self.message_name {
match self.versioned_protocol.protocol() {
Protocol::Status => RpcLimits::new(
<StatusMessage as Encode>::ssz_fixed_len(),
<StatusMessage as Encode>::ssz_fixed_len(),
@@ -344,30 +373,34 @@ impl ProtocolId {
/// Returns `true` if the given `ProtocolId` should expect `context_bytes` in the
/// beginning of the stream, else returns `false`.
pub fn has_context_bytes(&self) -> bool {
match self.message_name {
Protocol::BlocksByRange | Protocol::BlocksByRoot => match self.version {
Version::V2 => true,
Version::V1 => false,
},
Protocol::LightClientBootstrap => match self.version {
Version::V2 | Version::V1 => true,
},
Protocol::Goodbye | Protocol::Ping | Protocol::Status | Protocol::MetaData => false,
match self.versioned_protocol {
SupportedProtocol::BlocksByRangeV2
| SupportedProtocol::BlocksByRootV2
| SupportedProtocol::LightClientBootstrapV1 => true,
SupportedProtocol::StatusV1
| SupportedProtocol::BlocksByRootV1
| SupportedProtocol::BlocksByRangeV1
| SupportedProtocol::PingV1
| SupportedProtocol::MetaDataV1
| SupportedProtocol::MetaDataV2
| SupportedProtocol::GoodbyeV1 => false,
}
}
}
/// An RPC protocol ID.
impl ProtocolId {
pub fn new(message_name: Protocol, version: Version, encoding: Encoding) -> Self {
pub fn new(versioned_protocol: SupportedProtocol, encoding: Encoding) -> Self {
let protocol_id = format!(
"{}/{}/{}/{}",
PROTOCOL_PREFIX, message_name, version, encoding
PROTOCOL_PREFIX,
versioned_protocol.protocol(),
versioned_protocol.version_string(),
encoding
);
ProtocolId {
message_name,
version,
versioned_protocol,
encoding,
protocol_id,
}
@@ -400,7 +433,7 @@ where
fn upgrade_inbound(self, socket: TSocket, protocol: ProtocolId) -> Self::Future {
async move {
let protocol_name = protocol.message_name;
let versioned_protocol = protocol.versioned_protocol;
// convert the socket to tokio compatible socket
let socket = socket.compat();
let codec = match protocol.encoding {
@@ -419,8 +452,13 @@ where
let socket = Framed::new(Box::pin(timed_socket), codec);
// MetaData requests should be empty, return the stream
match protocol_name {
Protocol::MetaData => Ok((InboundRequest::MetaData(PhantomData), socket)),
match versioned_protocol {
SupportedProtocol::MetaDataV1 => {
Ok((InboundRequest::MetaData(MetadataRequest::new_v1()), socket))
}
SupportedProtocol::MetaDataV2 => {
Ok((InboundRequest::MetaData(MetadataRequest::new_v2()), socket))
}
_ => {
match tokio::time::timeout(
Duration::from_secs(REQUEST_TIMEOUT),
@@ -448,7 +486,7 @@ pub enum InboundRequest<TSpec: EthSpec> {
BlocksByRoot(BlocksByRootRequest),
LightClientBootstrap(LightClientBootstrapRequest),
Ping(Ping),
MetaData(PhantomData<TSpec>),
MetaData(MetadataRequest<TSpec>),
}
/// Implements the encoding per supported protocol for `RPCRequest`.
@@ -460,24 +498,33 @@ impl<TSpec: EthSpec> InboundRequest<TSpec> {
match self {
InboundRequest::Status(_) => 1,
InboundRequest::Goodbye(_) => 0,
InboundRequest::BlocksByRange(req) => req.count,
InboundRequest::BlocksByRoot(req) => req.block_roots.len() as u64,
InboundRequest::BlocksByRange(req) => *req.count(),
InboundRequest::BlocksByRoot(req) => req.block_roots().len() as u64,
InboundRequest::Ping(_) => 1,
InboundRequest::MetaData(_) => 1,
InboundRequest::LightClientBootstrap(_) => 1,
}
}
/// Gives the corresponding `Protocol` to this request.
pub fn protocol(&self) -> Protocol {
/// Gives the corresponding `SupportedProtocol` to this request.
pub fn versioned_protocol(&self) -> SupportedProtocol {
match self {
InboundRequest::Status(_) => Protocol::Status,
InboundRequest::Goodbye(_) => Protocol::Goodbye,
InboundRequest::BlocksByRange(_) => Protocol::BlocksByRange,
InboundRequest::BlocksByRoot(_) => Protocol::BlocksByRoot,
InboundRequest::Ping(_) => Protocol::Ping,
InboundRequest::MetaData(_) => Protocol::MetaData,
InboundRequest::LightClientBootstrap(_) => Protocol::LightClientBootstrap,
InboundRequest::Status(_) => SupportedProtocol::StatusV1,
InboundRequest::Goodbye(_) => SupportedProtocol::GoodbyeV1,
InboundRequest::BlocksByRange(req) => match req {
OldBlocksByRangeRequest::V1(_) => SupportedProtocol::BlocksByRangeV1,
OldBlocksByRangeRequest::V2(_) => SupportedProtocol::BlocksByRangeV2,
},
InboundRequest::BlocksByRoot(req) => match req {
BlocksByRootRequest::V1(_) => SupportedProtocol::BlocksByRootV1,
BlocksByRootRequest::V2(_) => SupportedProtocol::BlocksByRootV2,
},
InboundRequest::Ping(_) => SupportedProtocol::PingV1,
InboundRequest::MetaData(req) => match req {
MetadataRequest::V1(_) => SupportedProtocol::MetaDataV1,
MetadataRequest::V2(_) => SupportedProtocol::MetaDataV2,
},
InboundRequest::LightClientBootstrap(_) => SupportedProtocol::LightClientBootstrapV1,
}
}

View File

@@ -192,7 +192,7 @@ pub trait RateLimiterItem {
impl<T: EthSpec> RateLimiterItem for super::InboundRequest<T> {
fn protocol(&self) -> Protocol {
self.protocol()
self.versioned_protocol().protocol()
}
fn expected_responses(&self) -> u64 {
@@ -202,7 +202,7 @@ impl<T: EthSpec> RateLimiterItem for super::InboundRequest<T> {
impl<T: EthSpec> RateLimiterItem for super::OutboundRequest<T> {
fn protocol(&self) -> Protocol {
self.protocol()
self.versioned_protocol().protocol()
}
fn expected_responses(&self) -> u64 {

View File

@@ -72,7 +72,7 @@ impl<Id: ReqId, TSpec: EthSpec> SelfRateLimiter<Id, TSpec> {
request_id: Id,
req: OutboundRequest<TSpec>,
) -> Result<BehaviourAction<Id, TSpec>, Error> {
let protocol = req.protocol();
let protocol = req.versioned_protocol().protocol();
// First check that there are not already other requests waiting to be sent.
if let Some(queued_requests) = self.delayed_requests.get_mut(&(peer_id, protocol)) {
queued_requests.push_back(QueuedRequest { req, request_id });
@@ -111,7 +111,7 @@ impl<Id: ReqId, TSpec: EthSpec> SelfRateLimiter<Id, TSpec> {
event: RPCSend::Request(request_id, req),
}),
Err(e) => {
let protocol = req.protocol();
let protocol = req.versioned_protocol();
match e {
RateLimitedErr::TooLarge => {
// this should never happen with default parameters. Let's just send the request.
@@ -119,7 +119,7 @@ impl<Id: ReqId, TSpec: EthSpec> SelfRateLimiter<Id, TSpec> {
crit!(
log,
"Self rate limiting error for a batch that will never fit. Sending request anyway. Check configuration parameters.";
"protocol" => %req.protocol()
"protocol" => %req.versioned_protocol().protocol()
);
Ok(BehaviourAction::NotifyHandler {
peer_id,
@@ -128,7 +128,7 @@ impl<Id: ReqId, TSpec: EthSpec> SelfRateLimiter<Id, TSpec> {
})
}
RateLimitedErr::TooSoon(wait_time) => {
debug!(log, "Self rate limiting"; "protocol" => %protocol, "wait_time_ms" => wait_time.as_millis(), "peer_id" => %peer_id);
debug!(log, "Self rate limiting"; "protocol" => %protocol.protocol(), "wait_time_ms" => wait_time.as_millis(), "peer_id" => %peer_id);
Err((QueuedRequest { req, request_id }, wait_time))
}
}