mirror of
https://github.com/sigp/lighthouse.git
synced 2026-03-10 12:11:59 +00:00
Fix BlobsByRange by reverting PR6462 (#6526)
* Revert "Remove generic E from RequestId (#6462)"
This reverts commit 772929fae2.
This commit is contained in:
@@ -28,7 +28,7 @@ const CONTEXT_BYTES_LEN: usize = 4;
|
||||
|
||||
/* Inbound Codec */
|
||||
|
||||
pub struct SSZSnappyInboundCodec<E> {
|
||||
pub struct SSZSnappyInboundCodec<E: EthSpec> {
|
||||
protocol: ProtocolId,
|
||||
inner: Uvi<usize>,
|
||||
len: Option<usize>,
|
||||
@@ -143,7 +143,7 @@ impl<E: EthSpec> Encoder<RpcResponse<E>> for SSZSnappyInboundCodec<E> {
|
||||
|
||||
// Decoder for inbound streams: Decodes RPC requests from peers
|
||||
impl<E: EthSpec> Decoder for SSZSnappyInboundCodec<E> {
|
||||
type Item = RequestType;
|
||||
type Item = RequestType<E>;
|
||||
type Error = RPCError;
|
||||
|
||||
fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
|
||||
@@ -195,7 +195,7 @@ impl<E: EthSpec> Decoder for SSZSnappyInboundCodec<E> {
|
||||
}
|
||||
|
||||
/* Outbound Codec: Codec for initiating RPC requests */
|
||||
pub struct SSZSnappyOutboundCodec<E> {
|
||||
pub struct SSZSnappyOutboundCodec<E: EthSpec> {
|
||||
inner: Uvi<usize>,
|
||||
len: Option<usize>,
|
||||
protocol: ProtocolId,
|
||||
@@ -322,10 +322,10 @@ impl<E: EthSpec> SSZSnappyOutboundCodec<E> {
|
||||
}
|
||||
|
||||
// Encoder for outbound streams: Encodes RPC Requests to peers
|
||||
impl<E: EthSpec> Encoder<RequestType> for SSZSnappyOutboundCodec<E> {
|
||||
impl<E: EthSpec> Encoder<RequestType<E>> for SSZSnappyOutboundCodec<E> {
|
||||
type Error = RPCError;
|
||||
|
||||
fn encode(&mut self, item: RequestType, dst: &mut BytesMut) -> Result<(), Self::Error> {
|
||||
fn encode(&mut self, item: RequestType<E>, dst: &mut BytesMut) -> Result<(), Self::Error> {
|
||||
let bytes = match item {
|
||||
RequestType::Status(req) => req.as_ssz_bytes(),
|
||||
RequestType::Goodbye(req) => req.as_ssz_bytes(),
|
||||
@@ -549,11 +549,11 @@ fn handle_length(
|
||||
/// Decodes an `InboundRequest` from the byte stream.
|
||||
/// `decoded_buffer` should be an ssz-encoded bytestream with
|
||||
// length = length-prefix received in the beginning of the stream.
|
||||
fn handle_rpc_request(
|
||||
fn handle_rpc_request<E: EthSpec>(
|
||||
versioned_protocol: SupportedProtocol,
|
||||
decoded_buffer: &[u8],
|
||||
spec: &ChainSpec,
|
||||
) -> Result<Option<RequestType>, RPCError> {
|
||||
) -> Result<Option<RequestType<E>>, RPCError> {
|
||||
match versioned_protocol {
|
||||
SupportedProtocol::StatusV1 => Ok(Some(RequestType::Status(
|
||||
StatusMessage::from_ssz_bytes(decoded_buffer)?,
|
||||
@@ -1035,7 +1035,6 @@ mod tests {
|
||||
BlobsByRangeRequest {
|
||||
start_slot: 0,
|
||||
count: 10,
|
||||
max_blobs_per_block: Spec::max_blobs_per_block(),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1181,7 +1180,7 @@ mod tests {
|
||||
}
|
||||
|
||||
/// Verifies that requests we send are encoded in a way that we would correctly decode too.
|
||||
fn encode_then_decode_request(req: RequestType, fork_name: ForkName, spec: &ChainSpec) {
|
||||
fn encode_then_decode_request(req: RequestType<Spec>, fork_name: ForkName, spec: &ChainSpec) {
|
||||
let fork_context = Arc::new(fork_context(fork_name));
|
||||
let max_packet_size = max_rpc_size(&fork_context, spec.max_chunk_size as usize);
|
||||
let protocol = ProtocolId::new(req.versioned_protocol(), Encoding::SSZSnappy);
|
||||
@@ -1778,7 +1777,7 @@ mod tests {
|
||||
fn test_encode_then_decode_request() {
|
||||
let chain_spec = Spec::default_spec();
|
||||
|
||||
let requests: &[RequestType] = &[
|
||||
let requests: &[RequestType<Spec>] = &[
|
||||
RequestType::Ping(ping_message()),
|
||||
RequestType::Status(status_message()),
|
||||
RequestType::Goodbye(GoodbyeReason::Fault),
|
||||
|
||||
@@ -20,7 +20,6 @@ use slog::{crit, debug, trace};
|
||||
use smallvec::SmallVec;
|
||||
use std::{
|
||||
collections::{hash_map::Entry, VecDeque},
|
||||
marker::PhantomData,
|
||||
pin::Pin,
|
||||
sync::Arc,
|
||||
task::{Context, Poll},
|
||||
@@ -97,7 +96,7 @@ where
|
||||
events_out: SmallVec<[HandlerEvent<Id, E>; 4]>,
|
||||
|
||||
/// Queue of outbound substreams to open.
|
||||
dial_queue: SmallVec<[(Id, RequestType); 4]>,
|
||||
dial_queue: SmallVec<[(Id, RequestType<E>); 4]>,
|
||||
|
||||
/// Current number of concurrent outbound substreams being opened.
|
||||
dial_negotiated: u32,
|
||||
@@ -207,7 +206,7 @@ pub enum OutboundSubstreamState<E: EthSpec> {
|
||||
/// The framed negotiated substream.
|
||||
substream: Box<OutboundFramed<Stream, E>>,
|
||||
/// Keeps track of the actual request sent.
|
||||
request: RequestType,
|
||||
request: RequestType<E>,
|
||||
},
|
||||
/// Closing an outbound substream>
|
||||
Closing(Box<OutboundFramed<Stream, E>>),
|
||||
@@ -275,7 +274,7 @@ where
|
||||
}
|
||||
|
||||
/// Opens an outbound substream with a request.
|
||||
fn send_request(&mut self, id: Id, req: RequestType) {
|
||||
fn send_request(&mut self, id: Id, req: RequestType<E>) {
|
||||
match self.state {
|
||||
HandlerState::Active => {
|
||||
self.dial_queue.push((id, req));
|
||||
@@ -331,7 +330,7 @@ where
|
||||
type ToBehaviour = HandlerEvent<Id, E>;
|
||||
type InboundProtocol = RPCProtocol<E>;
|
||||
type OutboundProtocol = OutboundRequestContainer<E>;
|
||||
type OutboundOpenInfo = (Id, RequestType); // Keep track of the id and the request
|
||||
type OutboundOpenInfo = (Id, RequestType<E>); // Keep track of the id and the request
|
||||
type InboundOpenInfo = ();
|
||||
|
||||
fn listen_protocol(&self) -> SubstreamProtocol<Self::InboundProtocol, ()> {
|
||||
@@ -789,7 +788,6 @@ where
|
||||
req: req.clone(),
|
||||
fork_context: self.fork_context.clone(),
|
||||
max_rpc_size: self.listen_protocol().upgrade().max_rpc_size,
|
||||
phantom: PhantomData,
|
||||
},
|
||||
(),
|
||||
)
|
||||
@@ -907,7 +905,7 @@ where
|
||||
fn on_fully_negotiated_outbound(
|
||||
&mut self,
|
||||
substream: OutboundFramed<Stream, E>,
|
||||
(id, request): (Id, RequestType),
|
||||
(id, request): (Id, RequestType<E>),
|
||||
) {
|
||||
self.dial_negotiated -= 1;
|
||||
// Reset any io-retries counter.
|
||||
@@ -963,7 +961,7 @@ where
|
||||
}
|
||||
fn on_dial_upgrade_error(
|
||||
&mut self,
|
||||
request_info: (Id, RequestType),
|
||||
request_info: (Id, RequestType<E>),
|
||||
error: StreamUpgradeError<RPCError>,
|
||||
) {
|
||||
let (id, req) = request_info;
|
||||
|
||||
@@ -8,6 +8,7 @@ use ssz_derive::{Decode, Encode};
|
||||
use ssz_types::{typenum::U256, VariableList};
|
||||
use std::collections::BTreeMap;
|
||||
use std::fmt::Display;
|
||||
use std::marker::PhantomData;
|
||||
use std::ops::Deref;
|
||||
use std::sync::Arc;
|
||||
use strum::IntoStaticStr;
|
||||
@@ -93,19 +94,27 @@ pub struct Ping {
|
||||
variant_attributes(derive(Clone, Debug, PartialEq, Serialize),)
|
||||
)]
|
||||
#[derive(Clone, Debug, PartialEq)]
|
||||
pub struct MetadataRequest;
|
||||
pub struct MetadataRequest<E: EthSpec> {
|
||||
_phantom_data: PhantomData<E>,
|
||||
}
|
||||
|
||||
impl MetadataRequest {
|
||||
impl<E: EthSpec> MetadataRequest<E> {
|
||||
pub fn new_v1() -> Self {
|
||||
Self::V1(MetadataRequestV1 {})
|
||||
Self::V1(MetadataRequestV1 {
|
||||
_phantom_data: PhantomData,
|
||||
})
|
||||
}
|
||||
|
||||
pub fn new_v2() -> Self {
|
||||
Self::V2(MetadataRequestV2 {})
|
||||
Self::V2(MetadataRequestV2 {
|
||||
_phantom_data: PhantomData,
|
||||
})
|
||||
}
|
||||
|
||||
pub fn new_v3() -> Self {
|
||||
Self::V3(MetadataRequestV3 {})
|
||||
Self::V3(MetadataRequestV3 {
|
||||
_phantom_data: PhantomData,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
@@ -315,14 +324,11 @@ pub struct BlobsByRangeRequest {
|
||||
|
||||
/// The number of slots from the start slot.
|
||||
pub count: u64,
|
||||
|
||||
/// maximum number of blobs in a single block.
|
||||
pub max_blobs_per_block: usize,
|
||||
}
|
||||
|
||||
impl BlobsByRangeRequest {
|
||||
pub fn max_blobs_requested(&self) -> u64 {
|
||||
self.count.saturating_mul(self.max_blobs_per_block as u64)
|
||||
pub fn max_blobs_requested<E: EthSpec>(&self) -> u64 {
|
||||
self.count.saturating_mul(E::max_blobs_per_block() as u64)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -338,7 +344,7 @@ pub struct DataColumnsByRangeRequest {
|
||||
}
|
||||
|
||||
impl DataColumnsByRangeRequest {
|
||||
pub fn max_requested(&self) -> u64 {
|
||||
pub fn max_requested<E: EthSpec>(&self) -> u64 {
|
||||
self.count.saturating_mul(self.columns.len() as u64)
|
||||
}
|
||||
|
||||
|
||||
@@ -61,7 +61,7 @@ pub enum RPCSend<Id, E: EthSpec> {
|
||||
///
|
||||
/// The `Id` is given by the application making the request. These
|
||||
/// go over *outbound* connections.
|
||||
Request(Id, RequestType),
|
||||
Request(Id, RequestType<E>),
|
||||
/// A response sent from Lighthouse.
|
||||
///
|
||||
/// The `SubstreamId` must correspond to the RPC-given ID of the original request received from the
|
||||
@@ -79,7 +79,7 @@ pub enum RPCReceived<Id, E: EthSpec> {
|
||||
///
|
||||
/// The `SubstreamId` is given by the `RPCHandler` as it identifies this request with the
|
||||
/// *inbound* substream over which it is managed.
|
||||
Request(Request),
|
||||
Request(Request<E>),
|
||||
/// A response received from the outside.
|
||||
///
|
||||
/// The `Id` corresponds to the application given ID of the original request sent to the
|
||||
@@ -113,10 +113,10 @@ impl RequestId {
|
||||
|
||||
/// An Rpc Request.
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct Request {
|
||||
pub struct Request<E: EthSpec> {
|
||||
pub id: RequestId,
|
||||
pub substream_id: SubstreamId,
|
||||
pub r#type: RequestType,
|
||||
pub r#type: RequestType<E>,
|
||||
}
|
||||
|
||||
impl<E: EthSpec, Id: std::fmt::Debug> std::fmt::Display for RPCSend<Id, E> {
|
||||
@@ -221,7 +221,7 @@ impl<Id: ReqId, E: EthSpec> RPC<Id, E> {
|
||||
/// Submits an RPC request.
|
||||
///
|
||||
/// The peer must be connected for this to succeed.
|
||||
pub fn send_request(&mut self, peer_id: PeerId, request_id: Id, req: RequestType) {
|
||||
pub fn send_request(&mut self, peer_id: PeerId, request_id: Id, req: RequestType<E>) {
|
||||
let event = if let Some(self_limiter) = self.self_limiter.as_mut() {
|
||||
match self_limiter.allows(peer_id, request_id, req) {
|
||||
Ok(event) => event,
|
||||
|
||||
@@ -7,7 +7,6 @@ use futures::future::BoxFuture;
|
||||
use futures::prelude::{AsyncRead, AsyncWrite};
|
||||
use futures::{FutureExt, SinkExt};
|
||||
use libp2p::core::{OutboundUpgrade, UpgradeInfo};
|
||||
use std::marker::PhantomData;
|
||||
use std::sync::Arc;
|
||||
use tokio_util::{
|
||||
codec::Framed,
|
||||
@@ -20,14 +19,13 @@ use types::{EthSpec, ForkContext};
|
||||
// `OutboundUpgrade`
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct OutboundRequestContainer<E> {
|
||||
pub req: RequestType,
|
||||
pub struct OutboundRequestContainer<E: EthSpec> {
|
||||
pub req: RequestType<E>,
|
||||
pub fork_context: Arc<ForkContext>,
|
||||
pub max_rpc_size: usize,
|
||||
pub phantom: PhantomData<E>,
|
||||
}
|
||||
|
||||
impl<E> UpgradeInfo for OutboundRequestContainer<E> {
|
||||
impl<E: EthSpec> UpgradeInfo for OutboundRequestContainer<E> {
|
||||
type Info = ProtocolId;
|
||||
type InfoIter = Vec<Self::Info>;
|
||||
|
||||
|
||||
@@ -686,7 +686,7 @@ pub fn rpc_data_column_limits<E: EthSpec>() -> RpcLimits {
|
||||
// The inbound protocol reads the request, decodes it and returns the stream to the protocol
|
||||
// handler to respond to once ready.
|
||||
|
||||
pub type InboundOutput<TSocket, E> = (RequestType, InboundFramed<TSocket, E>);
|
||||
pub type InboundOutput<TSocket, E> = (RequestType<E>, InboundFramed<TSocket, E>);
|
||||
pub type InboundFramed<TSocket, E> =
|
||||
Framed<std::pin::Pin<Box<TimeoutStream<Compat<TSocket>>>>, SSZSnappyInboundCodec<E>>;
|
||||
|
||||
@@ -754,7 +754,7 @@ where
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, PartialEq)]
|
||||
pub enum RequestType {
|
||||
pub enum RequestType<E: EthSpec> {
|
||||
Status(StatusMessage),
|
||||
Goodbye(GoodbyeReason),
|
||||
BlocksByRange(OldBlocksByRangeRequest),
|
||||
@@ -768,11 +768,11 @@ pub enum RequestType {
|
||||
LightClientFinalityUpdate,
|
||||
LightClientUpdatesByRange(LightClientUpdatesByRangeRequest),
|
||||
Ping(Ping),
|
||||
MetaData(MetadataRequest),
|
||||
MetaData(MetadataRequest<E>),
|
||||
}
|
||||
|
||||
/// Implements the encoding per supported protocol for `RPCRequest`.
|
||||
impl RequestType {
|
||||
impl<E: EthSpec> RequestType<E> {
|
||||
/* These functions are used in the handler for stream management */
|
||||
|
||||
/// Maximum number of responses expected for this request.
|
||||
@@ -782,10 +782,10 @@ impl RequestType {
|
||||
RequestType::Goodbye(_) => 0,
|
||||
RequestType::BlocksByRange(req) => *req.count(),
|
||||
RequestType::BlocksByRoot(req) => req.block_roots().len() as u64,
|
||||
RequestType::BlobsByRange(req) => req.max_blobs_requested(),
|
||||
RequestType::BlobsByRange(req) => req.max_blobs_requested::<E>(),
|
||||
RequestType::BlobsByRoot(req) => req.blob_ids.len() as u64,
|
||||
RequestType::DataColumnsByRoot(req) => req.data_column_ids.len() as u64,
|
||||
RequestType::DataColumnsByRange(req) => req.max_requested(),
|
||||
RequestType::DataColumnsByRange(req) => req.max_requested::<E>(),
|
||||
RequestType::Ping(_) => 1,
|
||||
RequestType::MetaData(_) => 1,
|
||||
RequestType::LightClientBootstrap(_) => 1,
|
||||
@@ -1027,7 +1027,7 @@ impl std::error::Error for RPCError {
|
||||
}
|
||||
}
|
||||
|
||||
impl std::fmt::Display for RequestType {
|
||||
impl<E: EthSpec> std::fmt::Display for RequestType<E> {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
match self {
|
||||
RequestType::Status(status) => write!(f, "Status Message: {}", status),
|
||||
|
||||
@@ -9,6 +9,7 @@ use std::pin::Pin;
|
||||
use std::task::{Context, Poll};
|
||||
use std::time::{Duration, Instant};
|
||||
use tokio::time::Interval;
|
||||
use types::EthSpec;
|
||||
|
||||
/// Nanoseconds since a given time.
|
||||
// Maintained as u64 to reduce footprint
|
||||
@@ -261,7 +262,7 @@ pub trait RateLimiterItem {
|
||||
fn max_responses(&self) -> u64;
|
||||
}
|
||||
|
||||
impl RateLimiterItem for super::RequestType {
|
||||
impl<E: EthSpec> RateLimiterItem for super::RequestType<E> {
|
||||
fn protocol(&self) -> Protocol {
|
||||
self.versioned_protocol().protocol()
|
||||
}
|
||||
|
||||
@@ -19,8 +19,8 @@ use super::{
|
||||
|
||||
/// A request that was rate limited or waiting on rate limited requests for the same peer and
|
||||
/// protocol.
|
||||
struct QueuedRequest<Id: ReqId> {
|
||||
req: RequestType,
|
||||
struct QueuedRequest<Id: ReqId, E: EthSpec> {
|
||||
req: RequestType<E>,
|
||||
request_id: Id,
|
||||
}
|
||||
|
||||
@@ -28,7 +28,7 @@ pub(crate) struct SelfRateLimiter<Id: ReqId, E: EthSpec> {
|
||||
/// Requests queued for sending per peer. This requests are stored when the self rate
|
||||
/// limiter rejects them. Rate limiting is based on a Peer and Protocol basis, therefore
|
||||
/// are stored in the same way.
|
||||
delayed_requests: HashMap<(PeerId, Protocol), VecDeque<QueuedRequest<Id>>>,
|
||||
delayed_requests: HashMap<(PeerId, Protocol), VecDeque<QueuedRequest<Id, E>>>,
|
||||
/// The delay required to allow a peer's outbound request per protocol.
|
||||
next_peer_request: DelayQueue<(PeerId, Protocol)>,
|
||||
/// Rate limiter for our own requests.
|
||||
@@ -70,7 +70,7 @@ impl<Id: ReqId, E: EthSpec> SelfRateLimiter<Id, E> {
|
||||
&mut self,
|
||||
peer_id: PeerId,
|
||||
request_id: Id,
|
||||
req: RequestType,
|
||||
req: RequestType<E>,
|
||||
) -> Result<BehaviourAction<Id, E>, Error> {
|
||||
let protocol = req.versioned_protocol().protocol();
|
||||
// First check that there are not already other requests waiting to be sent.
|
||||
@@ -101,9 +101,9 @@ impl<Id: ReqId, E: EthSpec> SelfRateLimiter<Id, E> {
|
||||
limiter: &mut RateLimiter,
|
||||
peer_id: PeerId,
|
||||
request_id: Id,
|
||||
req: RequestType,
|
||||
req: RequestType<E>,
|
||||
log: &Logger,
|
||||
) -> Result<BehaviourAction<Id, E>, (QueuedRequest<Id>, Duration)> {
|
||||
) -> Result<BehaviourAction<Id, E>, (QueuedRequest<Id, E>, Duration)> {
|
||||
match limiter.allows(&peer_id, &req) {
|
||||
Ok(()) => Ok(BehaviourAction::NotifyHandler {
|
||||
peer_id,
|
||||
|
||||
@@ -80,7 +80,7 @@ pub enum NetworkEvent<E: EthSpec> {
|
||||
/// Identifier of the request. All responses to this request must use this id.
|
||||
id: PeerRequestId,
|
||||
/// Request the peer sent.
|
||||
request: rpc::Request,
|
||||
request: rpc::Request<E>,
|
||||
},
|
||||
ResponseReceived {
|
||||
/// Peer that sent the response.
|
||||
@@ -966,7 +966,7 @@ impl<E: EthSpec> Network<E> {
|
||||
&mut self,
|
||||
peer_id: PeerId,
|
||||
request_id: AppRequestId,
|
||||
request: RequestType,
|
||||
request: RequestType<E>,
|
||||
) -> Result<(), (AppRequestId, RPCError)> {
|
||||
// Check if the peer is connected before sending an RPC request
|
||||
if !self.swarm.is_connected(&peer_id) {
|
||||
@@ -1179,7 +1179,7 @@ impl<E: EthSpec> Network<E> {
|
||||
/// Sends a METADATA response to a peer.
|
||||
fn send_meta_data_response(
|
||||
&mut self,
|
||||
_req: MetadataRequest,
|
||||
_req: MetadataRequest<E>,
|
||||
id: PeerRequestId,
|
||||
request_id: rpc::RequestId,
|
||||
peer_id: PeerId,
|
||||
|
||||
@@ -327,7 +327,6 @@ fn test_blobs_by_range_chunked_rpc() {
|
||||
let rpc_request = RequestType::BlobsByRange(BlobsByRangeRequest {
|
||||
start_slot: 0,
|
||||
count: slot_count,
|
||||
max_blobs_per_block: E::max_blobs_per_block(),
|
||||
});
|
||||
|
||||
// BlocksByRange Response
|
||||
|
||||
Reference in New Issue
Block a user