Remove generic E from RequestId (#6462)

* remove Ethspec from types where it's possible to do so

* remove generic E from RequestType
This commit is contained in:
João Oliveira
2024-10-17 00:05:59 +01:00
committed by GitHub
parent 83d5c521d7
commit 772929fae2
15 changed files with 69 additions and 68 deletions

View File

@@ -28,7 +28,7 @@ const CONTEXT_BYTES_LEN: usize = 4;
/* Inbound Codec */
pub struct SSZSnappyInboundCodec<E: EthSpec> {
pub struct SSZSnappyInboundCodec<E> {
protocol: ProtocolId,
inner: Uvi<usize>,
len: Option<usize>,
@@ -142,7 +142,7 @@ impl<E: EthSpec> Encoder<RpcResponse<E>> for SSZSnappyInboundCodec<E> {
// Decoder for inbound streams: Decodes RPC requests from peers
impl<E: EthSpec> Decoder for SSZSnappyInboundCodec<E> {
type Item = RequestType<E>;
type Item = RequestType;
type Error = RPCError;
fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
@@ -194,7 +194,7 @@ impl<E: EthSpec> Decoder for SSZSnappyInboundCodec<E> {
}
/* Outbound Codec: Codec for initiating RPC requests */
pub struct SSZSnappyOutboundCodec<E: EthSpec> {
pub struct SSZSnappyOutboundCodec<E> {
inner: Uvi<usize>,
len: Option<usize>,
protocol: ProtocolId,
@@ -321,10 +321,10 @@ impl<E: EthSpec> SSZSnappyOutboundCodec<E> {
}
// Encoder for outbound streams: Encodes RPC Requests to peers
impl<E: EthSpec> Encoder<RequestType<E>> for SSZSnappyOutboundCodec<E> {
impl<E: EthSpec> Encoder<RequestType> for SSZSnappyOutboundCodec<E> {
type Error = RPCError;
fn encode(&mut self, item: RequestType<E>, dst: &mut BytesMut) -> Result<(), Self::Error> {
fn encode(&mut self, item: RequestType, dst: &mut BytesMut) -> Result<(), Self::Error> {
let bytes = match item {
RequestType::Status(req) => req.as_ssz_bytes(),
RequestType::Goodbye(req) => req.as_ssz_bytes(),
@@ -543,11 +543,11 @@ fn handle_length(
/// Decodes an `InboundRequest` from the byte stream.
/// `decoded_buffer` should be an ssz-encoded bytestream with
// length = length-prefix received in the beginning of the stream.
fn handle_rpc_request<E: EthSpec>(
fn handle_rpc_request(
versioned_protocol: SupportedProtocol,
decoded_buffer: &[u8],
spec: &ChainSpec,
) -> Result<Option<RequestType<E>>, RPCError> {
) -> Result<Option<RequestType>, RPCError> {
match versioned_protocol {
SupportedProtocol::StatusV1 => Ok(Some(RequestType::Status(
StatusMessage::from_ssz_bytes(decoded_buffer)?,
@@ -1009,6 +1009,7 @@ mod tests {
BlobsByRangeRequest {
start_slot: 0,
count: 10,
max_blobs_per_block: Spec::max_blobs_per_block(),
}
}
@@ -1154,7 +1155,7 @@ mod tests {
}
/// Verifies that requests we send are encoded in a way that we would correctly decode too.
fn encode_then_decode_request(req: RequestType<Spec>, fork_name: ForkName, spec: &ChainSpec) {
fn encode_then_decode_request(req: RequestType, fork_name: ForkName, spec: &ChainSpec) {
let fork_context = Arc::new(fork_context(fork_name));
let max_packet_size = max_rpc_size(&fork_context, spec.max_chunk_size as usize);
let protocol = ProtocolId::new(req.versioned_protocol(), Encoding::SSZSnappy);
@@ -1745,7 +1746,7 @@ mod tests {
fn test_encode_then_decode_request() {
let chain_spec = Spec::default_spec();
let requests: &[RequestType<Spec>] = &[
let requests: &[RequestType] = &[
RequestType::Ping(ping_message()),
RequestType::Status(status_message()),
RequestType::Goodbye(GoodbyeReason::Fault),

View File

@@ -20,6 +20,7 @@ use slog::{crit, debug, trace};
use smallvec::SmallVec;
use std::{
collections::{hash_map::Entry, VecDeque},
marker::PhantomData,
pin::Pin,
sync::Arc,
task::{Context, Poll},
@@ -96,7 +97,7 @@ where
events_out: SmallVec<[HandlerEvent<Id, E>; 4]>,
/// Queue of outbound substreams to open.
dial_queue: SmallVec<[(Id, RequestType<E>); 4]>,
dial_queue: SmallVec<[(Id, RequestType); 4]>,
/// Current number of concurrent outbound substreams being opened.
dial_negotiated: u32,
@@ -206,7 +207,7 @@ pub enum OutboundSubstreamState<E: EthSpec> {
/// The framed negotiated substream.
substream: Box<OutboundFramed<Stream, E>>,
/// Keeps track of the actual request sent.
request: RequestType<E>,
request: RequestType,
},
/// Closing an outbound substream>
Closing(Box<OutboundFramed<Stream, E>>),
@@ -274,7 +275,7 @@ where
}
/// Opens an outbound substream with a request.
fn send_request(&mut self, id: Id, req: RequestType<E>) {
fn send_request(&mut self, id: Id, req: RequestType) {
match self.state {
HandlerState::Active => {
self.dial_queue.push((id, req));
@@ -330,7 +331,7 @@ where
type ToBehaviour = HandlerEvent<Id, E>;
type InboundProtocol = RPCProtocol<E>;
type OutboundProtocol = OutboundRequestContainer<E>;
type OutboundOpenInfo = (Id, RequestType<E>); // Keep track of the id and the request
type OutboundOpenInfo = (Id, RequestType); // Keep track of the id and the request
type InboundOpenInfo = ();
fn listen_protocol(&self) -> SubstreamProtocol<Self::InboundProtocol, ()> {
@@ -788,6 +789,7 @@ where
req: req.clone(),
fork_context: self.fork_context.clone(),
max_rpc_size: self.listen_protocol().upgrade().max_rpc_size,
phantom: PhantomData,
},
(),
)
@@ -905,7 +907,7 @@ where
fn on_fully_negotiated_outbound(
&mut self,
substream: OutboundFramed<Stream, E>,
(id, request): (Id, RequestType<E>),
(id, request): (Id, RequestType),
) {
self.dial_negotiated -= 1;
// Reset any io-retries counter.
@@ -961,7 +963,7 @@ where
}
fn on_dial_upgrade_error(
&mut self,
request_info: (Id, RequestType<E>),
request_info: (Id, RequestType),
error: StreamUpgradeError<RPCError>,
) {
let (id, req) = request_info;

View File

@@ -8,7 +8,6 @@ use ssz_derive::{Decode, Encode};
use ssz_types::{typenum::U256, VariableList};
use std::collections::BTreeMap;
use std::fmt::Display;
use std::marker::PhantomData;
use std::ops::Deref;
use std::sync::Arc;
use strum::IntoStaticStr;
@@ -93,27 +92,19 @@ pub struct Ping {
variant_attributes(derive(Clone, Debug, PartialEq, Serialize),)
)]
#[derive(Clone, Debug, PartialEq)]
pub struct MetadataRequest<E: EthSpec> {
_phantom_data: PhantomData<E>,
}
pub struct MetadataRequest;
impl<E: EthSpec> MetadataRequest<E> {
impl MetadataRequest {
pub fn new_v1() -> Self {
Self::V1(MetadataRequestV1 {
_phantom_data: PhantomData,
})
Self::V1(MetadataRequestV1 {})
}
pub fn new_v2() -> Self {
Self::V2(MetadataRequestV2 {
_phantom_data: PhantomData,
})
Self::V2(MetadataRequestV2 {})
}
pub fn new_v3() -> Self {
Self::V3(MetadataRequestV3 {
_phantom_data: PhantomData,
})
Self::V3(MetadataRequestV3 {})
}
}
@@ -323,11 +314,14 @@ pub struct BlobsByRangeRequest {
/// The number of slots from the start slot.
pub count: u64,
/// maximum number of blobs in a single block.
pub max_blobs_per_block: usize,
}
impl BlobsByRangeRequest {
pub fn max_blobs_requested<E: EthSpec>(&self) -> u64 {
self.count.saturating_mul(E::max_blobs_per_block() as u64)
pub fn max_blobs_requested(&self) -> u64 {
self.count.saturating_mul(self.max_blobs_per_block as u64)
}
}
@@ -343,7 +337,7 @@ pub struct DataColumnsByRangeRequest {
}
impl DataColumnsByRangeRequest {
pub fn max_requested<E: EthSpec>(&self) -> u64 {
pub fn max_requested(&self) -> u64 {
self.count.saturating_mul(self.columns.len() as u64)
}

View File

@@ -61,7 +61,7 @@ pub enum RPCSend<Id, E: EthSpec> {
///
/// The `Id` is given by the application making the request. These
/// go over *outbound* connections.
Request(Id, RequestType<E>),
Request(Id, RequestType),
/// A response sent from Lighthouse.
///
/// The `SubstreamId` must correspond to the RPC-given ID of the original request received from the
@@ -79,7 +79,7 @@ pub enum RPCReceived<Id, E: EthSpec> {
///
/// The `SubstreamId` is given by the `RPCHandler` as it identifies this request with the
/// *inbound* substream over which it is managed.
Request(Request<E>),
Request(Request),
/// A response received from the outside.
///
/// The `Id` corresponds to the application given ID of the original request sent to the
@@ -113,10 +113,10 @@ impl RequestId {
/// An Rpc Request.
#[derive(Debug, Clone)]
pub struct Request<E: EthSpec> {
pub struct Request {
pub id: RequestId,
pub substream_id: SubstreamId,
pub r#type: RequestType<E>,
pub r#type: RequestType,
}
impl<E: EthSpec, Id: std::fmt::Debug> std::fmt::Display for RPCSend<Id, E> {
@@ -221,7 +221,7 @@ impl<Id: ReqId, E: EthSpec> RPC<Id, E> {
/// Submits an RPC request.
///
/// The peer must be connected for this to succeed.
pub fn send_request(&mut self, peer_id: PeerId, request_id: Id, req: RequestType<E>) {
pub fn send_request(&mut self, peer_id: PeerId, request_id: Id, req: RequestType) {
let event = if let Some(self_limiter) = self.self_limiter.as_mut() {
match self_limiter.allows(peer_id, request_id, req) {
Ok(event) => event,

View File

@@ -7,6 +7,7 @@ use futures::future::BoxFuture;
use futures::prelude::{AsyncRead, AsyncWrite};
use futures::{FutureExt, SinkExt};
use libp2p::core::{OutboundUpgrade, UpgradeInfo};
use std::marker::PhantomData;
use std::sync::Arc;
use tokio_util::{
codec::Framed,
@@ -19,13 +20,14 @@ use types::{EthSpec, ForkContext};
// `OutboundUpgrade`
#[derive(Debug, Clone)]
pub struct OutboundRequestContainer<E: EthSpec> {
pub req: RequestType<E>,
pub struct OutboundRequestContainer<E> {
pub req: RequestType,
pub fork_context: Arc<ForkContext>,
pub max_rpc_size: usize,
pub phantom: PhantomData<E>,
}
impl<E: EthSpec> UpgradeInfo for OutboundRequestContainer<E> {
impl<E> UpgradeInfo for OutboundRequestContainer<E> {
type Info = ProtocolId;
type InfoIter = Vec<Self::Info>;

View File

@@ -643,7 +643,7 @@ pub fn rpc_data_column_limits<E: EthSpec>() -> RpcLimits {
// The inbound protocol reads the request, decodes it and returns the stream to the protocol
// handler to respond to once ready.
pub type InboundOutput<TSocket, E> = (RequestType<E>, InboundFramed<TSocket, E>);
pub type InboundOutput<TSocket, E> = (RequestType, InboundFramed<TSocket, E>);
pub type InboundFramed<TSocket, E> =
Framed<std::pin::Pin<Box<TimeoutStream<Compat<TSocket>>>>, SSZSnappyInboundCodec<E>>;
@@ -711,7 +711,7 @@ where
}
#[derive(Debug, Clone, PartialEq)]
pub enum RequestType<E: EthSpec> {
pub enum RequestType {
Status(StatusMessage),
Goodbye(GoodbyeReason),
BlocksByRange(OldBlocksByRangeRequest),
@@ -724,11 +724,11 @@ pub enum RequestType<E: EthSpec> {
LightClientOptimisticUpdate,
LightClientFinalityUpdate,
Ping(Ping),
MetaData(MetadataRequest<E>),
MetaData(MetadataRequest),
}
/// Implements the encoding per supported protocol for `RPCRequest`.
impl<E: EthSpec> RequestType<E> {
impl RequestType {
/* These functions are used in the handler for stream management */
/// Maximum number of responses expected for this request.
@@ -738,10 +738,10 @@ impl<E: EthSpec> RequestType<E> {
RequestType::Goodbye(_) => 0,
RequestType::BlocksByRange(req) => *req.count(),
RequestType::BlocksByRoot(req) => req.block_roots().len() as u64,
RequestType::BlobsByRange(req) => req.max_blobs_requested::<E>(),
RequestType::BlobsByRange(req) => req.max_blobs_requested(),
RequestType::BlobsByRoot(req) => req.blob_ids.len() as u64,
RequestType::DataColumnsByRoot(req) => req.data_column_ids.len() as u64,
RequestType::DataColumnsByRange(req) => req.max_requested::<E>(),
RequestType::DataColumnsByRange(req) => req.max_requested(),
RequestType::Ping(_) => 1,
RequestType::MetaData(_) => 1,
RequestType::LightClientBootstrap(_) => 1,
@@ -973,7 +973,7 @@ impl std::error::Error for RPCError {
}
}
impl<E: EthSpec> std::fmt::Display for RequestType<E> {
impl std::fmt::Display for RequestType {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
RequestType::Status(status) => write!(f, "Status Message: {}", status),

View File

@@ -9,7 +9,6 @@ use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::{Duration, Instant};
use tokio::time::Interval;
use types::EthSpec;
/// Nanoseconds since a given time.
// Maintained as u64 to reduce footprint
@@ -252,7 +251,7 @@ pub trait RateLimiterItem {
fn max_responses(&self) -> u64;
}
impl<E: EthSpec> RateLimiterItem for super::RequestType<E> {
impl RateLimiterItem for super::RequestType {
fn protocol(&self) -> Protocol {
self.versioned_protocol().protocol()
}

View File

@@ -19,8 +19,8 @@ use super::{
/// A request that was rate limited or waiting on rate limited requests for the same peer and
/// protocol.
struct QueuedRequest<Id: ReqId, E: EthSpec> {
req: RequestType<E>,
struct QueuedRequest<Id: ReqId> {
req: RequestType,
request_id: Id,
}
@@ -28,7 +28,7 @@ pub(crate) struct SelfRateLimiter<Id: ReqId, E: EthSpec> {
/// Requests queued for sending per peer. This requests are stored when the self rate
/// limiter rejects them. Rate limiting is based on a Peer and Protocol basis, therefore
/// are stored in the same way.
delayed_requests: HashMap<(PeerId, Protocol), VecDeque<QueuedRequest<Id, E>>>,
delayed_requests: HashMap<(PeerId, Protocol), VecDeque<QueuedRequest<Id>>>,
/// The delay required to allow a peer's outbound request per protocol.
next_peer_request: DelayQueue<(PeerId, Protocol)>,
/// Rate limiter for our own requests.
@@ -70,7 +70,7 @@ impl<Id: ReqId, E: EthSpec> SelfRateLimiter<Id, E> {
&mut self,
peer_id: PeerId,
request_id: Id,
req: RequestType<E>,
req: RequestType,
) -> Result<BehaviourAction<Id, E>, Error> {
let protocol = req.versioned_protocol().protocol();
// First check that there are not already other requests waiting to be sent.
@@ -101,9 +101,9 @@ impl<Id: ReqId, E: EthSpec> SelfRateLimiter<Id, E> {
limiter: &mut RateLimiter,
peer_id: PeerId,
request_id: Id,
req: RequestType<E>,
req: RequestType,
log: &Logger,
) -> Result<BehaviourAction<Id, E>, (QueuedRequest<Id, E>, Duration)> {
) -> Result<BehaviourAction<Id, E>, (QueuedRequest<Id>, Duration)> {
match limiter.allows(&peer_id, &req) {
Ok(()) => Ok(BehaviourAction::NotifyHandler {
peer_id,

View File

@@ -80,7 +80,7 @@ pub enum NetworkEvent<E: EthSpec> {
/// Identifier of the request. All responses to this request must use this id.
id: PeerRequestId,
/// Request the peer sent.
request: rpc::Request<E>,
request: rpc::Request,
},
ResponseReceived {
/// Peer that sent the response.
@@ -965,7 +965,7 @@ impl<E: EthSpec> Network<E> {
&mut self,
peer_id: PeerId,
request_id: AppRequestId,
request: RequestType<E>,
request: RequestType,
) -> Result<(), (AppRequestId, RPCError)> {
// Check if the peer is connected before sending an RPC request
if !self.swarm.is_connected(&peer_id) {
@@ -1178,7 +1178,7 @@ impl<E: EthSpec> Network<E> {
/// Sends a METADATA response to a peer.
fn send_meta_data_response(
&mut self,
_req: MetadataRequest<E>,
_req: MetadataRequest,
id: PeerRequestId,
request_id: rpc::RequestId,
peer_id: PeerId,

View File

@@ -327,6 +327,7 @@ fn test_blobs_by_range_chunked_rpc() {
let rpc_request = RequestType::BlobsByRange(BlobsByRangeRequest {
start_slot: 0,
count: slot_count,
max_blobs_per_block: E::max_blobs_per_block(),
});
// BlocksByRange Response

View File

@@ -793,7 +793,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
);
// Should not send more than max request blocks
if req.max_blobs_requested::<T::EthSpec>() > self.chain.spec.max_request_blob_sidecars {
if req.max_blobs_requested() > self.chain.spec.max_request_blob_sidecars {
return Err((
RpcErrorResponse::InvalidRequest,
"Request exceeded `MAX_REQUEST_BLOBS_SIDECARS`",
@@ -998,7 +998,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
);
// Should not send more than max request data columns
if req.max_requested::<T::EthSpec>() > self.chain.spec.max_request_data_column_sidecars {
if req.max_requested() > self.chain.spec.max_request_data_column_sidecars {
return Err((
RpcErrorResponse::InvalidRequest,
"Request exceeded `MAX_REQUEST_BLOBS_SIDECARS`",

View File

@@ -30,9 +30,9 @@ use std::time::Duration;
use tokio::sync::mpsc;
use types::blob_sidecar::FixedBlobSidecarList;
use types::{
Attestation, AttesterSlashing, BlobSidecar, BlobSidecarList, Epoch, Hash256, MainnetEthSpec,
ProposerSlashing, SignedAggregateAndProof, SignedBeaconBlock, SignedVoluntaryExit, Slot,
SubnetId,
Attestation, AttesterSlashing, BlobSidecar, BlobSidecarList, Epoch, EthSpec, Hash256,
MainnetEthSpec, ProposerSlashing, SignedAggregateAndProof, SignedBeaconBlock,
SignedVoluntaryExit, Slot, SubnetId,
};
type E = MainnetEthSpec;
@@ -366,6 +366,7 @@ impl TestRig {
BlobsByRangeRequest {
start_slot: 0,
count,
max_blobs_per_block: E::max_blobs_per_block(),
},
)
.unwrap();

View File

@@ -58,7 +58,7 @@ pub enum RouterMessage<E: EthSpec> {
RPCRequestReceived {
peer_id: PeerId,
id: PeerRequestId,
request: rpc::Request<E>,
request: rpc::Request,
},
/// An RPC response has been received.
RPCResponseReceived {
@@ -193,11 +193,11 @@ impl<T: BeaconChainTypes> Router<T> {
/* RPC - Related functionality */
/// A new RPC request has been received from the network.
fn handle_rpc_request<E: EthSpec>(
fn handle_rpc_request(
&mut self,
peer_id: PeerId,
request_id: PeerRequestId,
rpc_request: rpc::Request<E>,
rpc_request: rpc::Request,
) {
if !self.network_globals.peers.read().is_connected(&peer_id) {
debug!(self.log, "Dropping request of disconnected peer"; "peer_id" => %peer_id, "request" => ?rpc_request);
@@ -824,7 +824,7 @@ impl<E: EthSpec> HandlerNetworkContext<E> {
}
/// Sends a request to the network task.
pub fn send_processor_request(&mut self, peer_id: PeerId, request: RequestType<E>) {
pub fn send_processor_request(&mut self, peer_id: PeerId, request: RequestType) {
self.inform_network(NetworkMessage::SendRequest {
peer_id,
request_id: AppRequestId::Router,

View File

@@ -62,7 +62,7 @@ pub enum NetworkMessage<E: EthSpec> {
/// Send an RPC request to the libp2p service.
SendRequest {
peer_id: PeerId,
request: RequestType<E>,
request: RequestType,
request_id: AppRequestId,
},
/// Send a successful Response to the libp2p service.

View File

@@ -415,6 +415,7 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
request: RequestType::BlobsByRange(BlobsByRangeRequest {
start_slot: *request.start_slot(),
count: *request.count(),
max_blobs_per_block: T::EthSpec::max_blobs_per_block(),
}),
request_id: AppRequestId::Sync(SyncRequestId::RangeBlockAndBlobs { id }),
})