add a bunch of blob coupling boiler plate, add a blobs by root request

This commit is contained in:
realbigsean
2022-11-15 16:43:56 -05:00
parent fe04d945cc
commit 7162e5e23b
25 changed files with 334 additions and 211 deletions

View File

@@ -3,7 +3,7 @@ use crate::rpc::{
protocol::{Encoding, Protocol, ProtocolId, RPCError, Version, ERROR_TYPE_MAX, ERROR_TYPE_MIN},
};
use crate::rpc::{InboundRequest, OutboundRequest, RPCCodedResponse, RPCResponse};
use crate::{rpc::methods::*, EnrSyncCommitteeBitfield, SignedBeaconBlockAndBlobsSidecar};
use crate::{rpc::methods::*, EnrSyncCommitteeBitfield};
use libp2p::bytes::BytesMut;
use snap::read::FrameDecoder;
use snap::write::FrameEncoder;
@@ -15,11 +15,7 @@ use std::io::{Read, Write};
use std::marker::PhantomData;
use std::sync::Arc;
use tokio_util::codec::{Decoder, Encoder};
use types::{
BlobsSidecar, EthSpec, ForkContext, ForkName, SignedBeaconBlock, SignedBeaconBlockAltair,
SignedBeaconBlockBase, SignedBeaconBlockCapella, SignedBeaconBlockEip4844,
SignedBeaconBlockMerge,
};
use types::{BlobsSidecar, EthSpec, ForkContext, ForkName, SignedBeaconBlock, SignedBeaconBlockAltair, SignedBeaconBlockAndBlobsSidecar, SignedBeaconBlockBase, SignedBeaconBlockCapella, SignedBeaconBlockEip4844, SignedBeaconBlockMerge};
use unsigned_varint::codec::Uvi;
const CONTEXT_BYTES_LEN: usize = 4;
@@ -72,6 +68,7 @@ impl<TSpec: EthSpec> Encoder<RPCCodedResponse<TSpec>> for SSZSnappyInboundCodec<
RPCResponse::BlocksByRange(res) => res.as_ssz_bytes(),
RPCResponse::BlocksByRoot(res) => res.as_ssz_bytes(),
RPCResponse::BlobsByRange(res) => res.as_ssz_bytes(),
RPCResponse::BlobsByRoot(res) => res.as_ssz_bytes(),
RPCResponse::Pong(res) => res.data.as_ssz_bytes(),
RPCResponse::MetaData(res) =>
// Encode the correct version of the MetaData response based on the negotiated version.
@@ -231,6 +228,7 @@ impl<TSpec: EthSpec> Encoder<OutboundRequest<TSpec>> for SSZSnappyOutboundCodec<
OutboundRequest::BlocksByRange(req) => req.as_ssz_bytes(),
OutboundRequest::BlocksByRoot(req) => req.block_roots.as_ssz_bytes(),
OutboundRequest::BlobsByRange(req) => req.as_ssz_bytes(),
OutboundRequest::BlobsByRoot(req) => req.block_roots.as_ssz_bytes(),
OutboundRequest::Ping(req) => req.as_ssz_bytes(),
OutboundRequest::MetaData(_) => return Ok(()), // no metadata to encode
};
@@ -313,7 +311,8 @@ impl<TSpec: EthSpec> Decoder for SSZSnappyOutboundCodec<TSpec> {
let _read_bytes = src.split_to(n as usize);
match self.protocol.version {
Version::V1 => handle_v1_response(self.protocol.message_name, &decoded_buffer),
Version::V1 => handle_v1_response(self.protocol.message_name, &decoded_buffer,
&mut self.fork_name, ),
Version::V2 => handle_v2_response(
self.protocol.message_name,
&decoded_buffer,
@@ -483,6 +482,11 @@ fn handle_v1_request<T: EthSpec>(
Protocol::BlobsByRange => Ok(Some(InboundRequest::BlobsByRange(
BlobsByRangeRequest::from_ssz_bytes(decoded_buffer)?,
))),
Protocol::BlobsByRoot => Ok(Some(InboundRequest::BlobsByRoot(
BlobsByRootRequest{
block_roots: VariableList::from_ssz_bytes(decoded_buffer)?,
},
))),
Protocol::Ping => Ok(Some(InboundRequest::Ping(Ping {
data: u64::from_ssz_bytes(decoded_buffer)?,
}))),
@@ -540,6 +544,7 @@ fn handle_v2_request<T: EthSpec>(
fn handle_v1_response<T: EthSpec>(
protocol: Protocol,
decoded_buffer: &[u8],
fork_name: &mut Option<ForkName>,
) -> Result<Option<RPCResponse<T>>, RPCError> {
match protocol {
Protocol::Status => Ok(Some(RPCResponse::Status(StatusMessage::from_ssz_bytes(
@@ -555,7 +560,40 @@ fn handle_v1_response<T: EthSpec>(
Protocol::BlocksByRoot => Ok(Some(RPCResponse::BlocksByRoot(Arc::new(
SignedBeaconBlock::Base(SignedBeaconBlockBase::from_ssz_bytes(decoded_buffer)?),
)))),
Protocol::BlobsByRange => Err(RPCError::InvalidData("blobs by range via v1".to_string())),
Protocol::BlobsByRange => {
let fork_name = fork_name.take().ok_or_else(|| {
RPCError::ErrorResponse(
RPCResponseErrorCode::InvalidRequest,
format!("No context bytes provided for {} response", protocol),
)
})?;
match fork_name {
ForkName::Eip4844 => Ok(Some(RPCResponse::BlobsByRange(Arc::new(
SignedBeaconBlockAndBlobsSidecar::from_ssz_bytes(decoded_buffer)?,
)))),
_ => Err(RPCError::ErrorResponse(
RPCResponseErrorCode::InvalidRequest,
"Invalid forkname for blobsbyrange".to_string(),
)),
}
},
Protocol::BlobsByRoot => {
let fork_name = fork_name.take().ok_or_else(|| {
RPCError::ErrorResponse(
RPCResponseErrorCode::InvalidRequest,
format!("No context bytes provided for {} response", protocol),
)
})?;
match fork_name {
ForkName::Eip4844 => Ok(Some(RPCResponse::BlobsByRoot(Arc::new(
SignedBeaconBlockAndBlobsSidecar::from_ssz_bytes(decoded_buffer)?,
)))),
_ => Err(RPCError::ErrorResponse(
RPCResponseErrorCode::InvalidRequest,
"Invalid forkname for blobsbyroot".to_string(),
)),
}
},
Protocol::Ping => Ok(Some(RPCResponse::Pong(Ping {
data: u64::from_ssz_bytes(decoded_buffer)?,
}))),
@@ -640,15 +678,8 @@ fn handle_v2_response<T: EthSpec>(
)?),
)))),
},
Protocol::BlobsByRange => match fork_name {
ForkName::Eip4844 => Ok(Some(RPCResponse::BlobsByRange(Arc::new(
SignedBeaconBlockAndBlobsSidecar::from_ssz_bytes(decoded_buffer)?,
)))),
_ => Err(RPCError::ErrorResponse(
RPCResponseErrorCode::InvalidRequest,
"Invalid forkname for blobsbyrange".to_string(),
)),
},
Protocol::BlobsByRange => Err(RPCError::InvalidData("blobs by range via v2".to_string())),
Protocol::BlobsByRoot => Err(RPCError::InvalidData("blobs by range via v2".to_string())),
_ => Err(RPCError::ErrorResponse(
RPCResponseErrorCode::InvalidRequest,
"Invalid v2 request".to_string(),
@@ -917,6 +948,9 @@ mod tests {
OutboundRequest::BlobsByRange(blbrange) => {
assert_eq!(decoded, InboundRequest::BlobsByRange(blbrange))
}
OutboundRequest::BlobsByRoot(blbroot) => {
assert_eq!(decoded, InboundRequest::BlobsByRoot(bbroot))
}
OutboundRequest::Ping(ping) => {
assert_eq!(decoded, InboundRequest::Ping(ping))
}

View File

@@ -14,7 +14,7 @@ use strum::IntoStaticStr;
use superstruct::superstruct;
use types::blobs_sidecar::BlobsSidecar;
use types::{Epoch, EthSpec, Hash256, SignedBeaconBlock, Slot};
use crate::SignedBeaconBlockAndBlobsSidecar;
use types::SignedBeaconBlockAndBlobsSidecar;
/// Maximum number of blocks in a single request.
pub type MaxRequestBlocks = U1024;
@@ -243,6 +243,13 @@ pub struct BlocksByRootRequest {
pub block_roots: VariableList<Hash256, MaxRequestBlocks>,
}
/// Request a number of beacon blocks and blobs from a peer.
#[derive(Clone, Debug, PartialEq)]
pub struct BlobsByRootRequest {
/// The list of beacon block roots being requested.
pub block_roots: VariableList<Hash256, MaxRequestBlocks>,
}
/* RPC Handling and Grouping */
// Collection of enums and structs used by the Codecs to encode/decode RPC messages
@@ -261,6 +268,9 @@ pub enum RPCResponse<T: EthSpec> {
/// A response to a get BLOBS_BY_RANGE request
BlobsByRange(Arc<SignedBeaconBlockAndBlobsSidecar<T>>),
/// A response to a get BLOBS_BY_ROOT request.
BlobsByRoot(Arc<SignedBeaconBlockAndBlobsSidecar<T>>),
/// A PONG response to a PING request.
Pong(Ping),
@@ -279,6 +289,9 @@ pub enum ResponseTermination {
/// Blobs by range stream termination.
BlobsByRange,
/// Blobs by root stream termination.
BlobsByRoot,
}
/// The structured response containing a result/code indicating success or failure
@@ -341,6 +354,7 @@ impl<T: EthSpec> RPCCodedResponse<T> {
RPCResponse::BlocksByRange(_) => true,
RPCResponse::BlocksByRoot(_) => true,
RPCResponse::BlobsByRange(_) => true,
RPCResponse::BlobsByRoot(_) => true,
RPCResponse::Pong(_) => false,
RPCResponse::MetaData(_) => false,
},
@@ -376,6 +390,7 @@ impl<T: EthSpec> RPCResponse<T> {
RPCResponse::BlocksByRange(_) => Protocol::BlocksByRange,
RPCResponse::BlocksByRoot(_) => Protocol::BlocksByRoot,
RPCResponse::BlobsByRange(_) => Protocol::BlobsByRange,
RPCResponse::BlobsByRoot(_) => Protocol::BlobsByRoot,
RPCResponse::Pong(_) => Protocol::Ping,
RPCResponse::MetaData(_) => Protocol::MetaData,
}
@@ -412,7 +427,10 @@ impl<T: EthSpec> std::fmt::Display for RPCResponse<T> {
write!(f, "BlocksByRoot: Block slot: {}", block.slot())
}
RPCResponse::BlobsByRange(blob) => {
write!(f, "BlobsByRange: Blob slot: {}", blob.beacon_block_slot)
write!(f, "BlobsByRange: Blob slot: {}", blob.blobs_sidecar.beacon_block_slot)
}
RPCResponse::BlobsByRoot(blob) => {
write!(f, "BlobsByRoot: Blob slot: {}", blob.blobs_sidecar.beacon_block_slot)
}
RPCResponse::Pong(ping) => write!(f, "Pong: {}", ping.data),
RPCResponse::MetaData(metadata) => write!(f, "Metadata: {}", metadata.seq_number()),

View File

@@ -300,6 +300,7 @@ where
ResponseTermination::BlocksByRange => Protocol::BlocksByRange,
ResponseTermination::BlocksByRoot => Protocol::BlocksByRoot,
ResponseTermination::BlobsByRange => Protocol::BlobsByRange,
ResponseTermination::BlobsByRoot => Protocol::BlobsByRoot,
},
),
},

View File

@@ -39,6 +39,7 @@ pub enum OutboundRequest<TSpec: EthSpec> {
BlocksByRange(OldBlocksByRangeRequest),
BlocksByRoot(BlocksByRootRequest),
BlobsByRange(BlobsByRangeRequest),
BlobsByRoot(BlobsByRootRequest),
Ping(Ping),
MetaData(PhantomData<TSpec>),
}
@@ -81,6 +82,9 @@ impl<TSpec: EthSpec> OutboundRequest<TSpec> {
Version::V1,
Encoding::SSZSnappy,
)],
OutboundRequest::BlobsByRoot(_) => vec![
ProtocolId::new(Protocol::BlobsByRoot, Version::V1, Encoding::SSZSnappy),
],
OutboundRequest::Ping(_) => vec![ProtocolId::new(
Protocol::Ping,
Version::V1,
@@ -103,6 +107,7 @@ impl<TSpec: EthSpec> OutboundRequest<TSpec> {
OutboundRequest::BlocksByRange(req) => req.count,
OutboundRequest::BlocksByRoot(req) => req.block_roots.len() as u64,
OutboundRequest::BlobsByRange(req) => req.count,
OutboundRequest::BlobsByRoot(req) => req.block_roots.len() as u64,
OutboundRequest::Ping(_) => 1,
OutboundRequest::MetaData(_) => 1,
}
@@ -116,6 +121,7 @@ impl<TSpec: EthSpec> OutboundRequest<TSpec> {
OutboundRequest::BlocksByRange(_) => Protocol::BlocksByRange,
OutboundRequest::BlocksByRoot(_) => Protocol::BlocksByRoot,
OutboundRequest::BlobsByRange(_) => Protocol::BlobsByRange,
OutboundRequest::BlobsByRoot(_) => Protocol::BlobsByRoot,
OutboundRequest::Ping(_) => Protocol::Ping,
OutboundRequest::MetaData(_) => Protocol::MetaData,
}
@@ -130,6 +136,7 @@ impl<TSpec: EthSpec> OutboundRequest<TSpec> {
OutboundRequest::BlocksByRange(_) => ResponseTermination::BlocksByRange,
OutboundRequest::BlocksByRoot(_) => ResponseTermination::BlocksByRoot,
OutboundRequest::BlobsByRange(_) => ResponseTermination::BlobsByRange,
OutboundRequest::BlobsByRoot(_) => ResponseTermination::BlobsByRoot,
OutboundRequest::Status(_) => unreachable!(),
OutboundRequest::Goodbye(_) => unreachable!(),
OutboundRequest::Ping(_) => unreachable!(),
@@ -186,6 +193,7 @@ impl<TSpec: EthSpec> std::fmt::Display for OutboundRequest<TSpec> {
OutboundRequest::BlocksByRange(req) => write!(f, "Blocks by range: {}", req),
OutboundRequest::BlocksByRoot(req) => write!(f, "Blocks by root: {:?}", req),
OutboundRequest::BlobsByRange(req) => write!(f, "Blobs by range: {:?}", req),
OutboundRequest::BlobsByRoot(req) => write!(f, "Blobs by root: {:?}", req),
OutboundRequest::Ping(ping) => write!(f, "Ping: {}", ping.data),
OutboundRequest::MetaData(_) => write!(f, "MetaData request"),
}

View File

@@ -107,12 +107,6 @@ lazy_static! {
.as_ssz_bytes()
.len();
pub static ref BLOBS_SIDECAR_MIN: usize = BlobsSidecar::<MainnetEthSpec>::empty()
.as_ssz_bytes()
.len();
pub static ref BLOBS_SIDECAR_MAX: usize = *BLOBS_SIDECAR_MIN // Max size of variable length `blobs` field
+ (MainnetEthSpec::max_blobs_per_block() * <Blob<MainnetEthSpec> as Encode>::ssz_fixed_len());
}
/// The maximum bytes that can be sent across the RPC pre-merge.
@@ -181,6 +175,8 @@ pub enum Protocol {
BlocksByRoot,
/// The `BlobsByRange` protocol name.
BlobsByRange,
/// The `BlobsByRoot` protocol name.
BlobsByRoot,
/// The `Ping` protocol name.
Ping,
/// The `MetaData` protocol name.
@@ -210,6 +206,7 @@ impl std::fmt::Display for Protocol {
Protocol::BlocksByRange => "beacon_blocks_by_range",
Protocol::BlocksByRoot => "beacon_blocks_by_root",
Protocol::BlobsByRange => "blobs_sidecars_by_range",
Protocol::BlobsByRoot => "beacon_block_and_blobs_sidecar_by_root",
Protocol::Ping => "ping",
Protocol::MetaData => "metadata",
};
@@ -322,6 +319,9 @@ impl ProtocolId {
<BlobsByRangeRequest as Encode>::ssz_fixed_len(),
<BlobsByRangeRequest as Encode>::ssz_fixed_len(),
),
Protocol::BlobsByRoot => {
RpcLimits::new(*BLOCKS_BY_ROOT_REQUEST_MIN, *BLOCKS_BY_ROOT_REQUEST_MAX)
}
Protocol::Ping => RpcLimits::new(
<Ping as Encode>::ssz_fixed_len(),
<Ping as Encode>::ssz_fixed_len(),
@@ -340,7 +340,11 @@ impl ProtocolId {
Protocol::Goodbye => RpcLimits::new(0, 0), // Goodbye request has no response
Protocol::BlocksByRange => rpc_block_limits_by_fork(fork_context.current_fork()),
Protocol::BlocksByRoot => rpc_block_limits_by_fork(fork_context.current_fork()),
Protocol::BlobsByRange => RpcLimits::new(*BLOBS_SIDECAR_MIN, *BLOBS_SIDECAR_MAX),
//FIXME(sean) add blob sizes
Protocol::BlobsByRange => rpc_block_limits_by_fork(fork_context.current_fork()),
Protocol::BlobsByRoot => rpc_block_limits_by_fork(fork_context.current_fork()),
Protocol::Ping => RpcLimits::new(
<Ping as Encode>::ssz_fixed_len(),
<Ping as Encode>::ssz_fixed_len(),
@@ -455,6 +459,7 @@ pub enum InboundRequest<TSpec: EthSpec> {
BlocksByRange(OldBlocksByRangeRequest),
BlocksByRoot(BlocksByRootRequest),
BlobsByRange(BlobsByRangeRequest),
BlobsByRoot(BlobsByRootRequest),
Ping(Ping),
MetaData(PhantomData<TSpec>),
}
@@ -499,6 +504,11 @@ impl<TSpec: EthSpec> InboundRequest<TSpec> {
Version::V1,
Encoding::SSZSnappy,
)],
InboundRequest::BlobsByRoot(_) => vec![ProtocolId::new(
Protocol::BlobsByRoot,
Version::V1,
Encoding::SSZSnappy,
)],
InboundRequest::Ping(_) => vec![ProtocolId::new(
Protocol::Ping,
Version::V1,
@@ -521,6 +531,7 @@ impl<TSpec: EthSpec> InboundRequest<TSpec> {
InboundRequest::BlocksByRange(req) => req.count,
InboundRequest::BlocksByRoot(req) => req.block_roots.len() as u64,
InboundRequest::BlobsByRange(req) => req.count,
InboundRequest::BlobsByRoot(req) => req.block_roots.len() as u64,
InboundRequest::Ping(_) => 1,
InboundRequest::MetaData(_) => 1,
}
@@ -534,6 +545,7 @@ impl<TSpec: EthSpec> InboundRequest<TSpec> {
InboundRequest::BlocksByRange(_) => Protocol::BlocksByRange,
InboundRequest::BlocksByRoot(_) => Protocol::BlocksByRoot,
InboundRequest::BlobsByRange(_) => Protocol::BlobsByRange,
InboundRequest::BlobsByRoot(_) => Protocol::BlobsByRoot,
InboundRequest::Ping(_) => Protocol::Ping,
InboundRequest::MetaData(_) => Protocol::MetaData,
}
@@ -548,6 +560,7 @@ impl<TSpec: EthSpec> InboundRequest<TSpec> {
InboundRequest::BlocksByRange(_) => ResponseTermination::BlocksByRange,
InboundRequest::BlocksByRoot(_) => ResponseTermination::BlocksByRoot,
InboundRequest::BlobsByRange(_) => ResponseTermination::BlobsByRange,
InboundRequest::BlobsByRoot(_) => ResponseTermination::BlobsByRoot,
InboundRequest::Status(_) => unreachable!(),
InboundRequest::Goodbye(_) => unreachable!(),
InboundRequest::Ping(_) => unreachable!(),
@@ -654,6 +667,7 @@ impl<TSpec: EthSpec> std::fmt::Display for InboundRequest<TSpec> {
InboundRequest::BlocksByRange(req) => write!(f, "Blocks by range: {}", req),
InboundRequest::BlocksByRoot(req) => write!(f, "Blocks by root: {:?}", req),
InboundRequest::BlobsByRange(req) => write!(f, "Blobs by range: {:?}", req),
InboundRequest::BlobsByRoot(req) => write!(f, "Blobs by root: {:?}", req),
InboundRequest::Ping(ping) => write!(f, "Ping: {}", ping.data),
InboundRequest::MetaData(_) => write!(f, "MetaData request"),
}

View File

@@ -75,6 +75,8 @@ pub struct RPCRateLimiter {
bbroots_rl: Limiter<PeerId>,
/// BlobsByRange rate limiter.
blbrange_rl: Limiter<PeerId>,
/// BlobsByRoot rate limiter.
blbroot_rl: Limiter<PeerId>,
}
/// Error type for non conformant requests
@@ -102,6 +104,8 @@ pub struct RPCRateLimiterBuilder {
bbroots_quota: Option<Quota>,
/// Quota for the BlobsByRange protocol.
blbrange_quota: Option<Quota>,
/// Quota for the BlobsByRoot protocol.
blbroot_quota: Option<Quota>,
}
impl RPCRateLimiterBuilder {
@@ -121,6 +125,7 @@ impl RPCRateLimiterBuilder {
Protocol::BlocksByRange => self.bbrange_quota = q,
Protocol::BlocksByRoot => self.bbroots_quota = q,
Protocol::BlobsByRange => self.blbrange_quota = q,
Protocol::BlobsByRoot => self.blbroot_quota = q,
}
self
}
@@ -165,6 +170,10 @@ impl RPCRateLimiterBuilder {
.blbrange_quota
.ok_or("BlobsByRange quota not specified")?;
let blbroots_quota = self
.blbroot_quota
.ok_or("BlobsByRoot quota not specified")?;
// create the rate limiters
let ping_rl = Limiter::from_quota(ping_quota)?;
let metadata_rl = Limiter::from_quota(metadata_quota)?;
@@ -173,6 +182,7 @@ impl RPCRateLimiterBuilder {
let bbroots_rl = Limiter::from_quota(bbroots_quota)?;
let bbrange_rl = Limiter::from_quota(bbrange_quota)?;
let blbrange_rl = Limiter::from_quota(blbrange_quota)?;
let blbroot_rl = Limiter::from_quota(blbroots_quota)?;
// check for peers to prune every 30 seconds, starting in 30 seconds
let prune_every = tokio::time::Duration::from_secs(30);
@@ -187,6 +197,7 @@ impl RPCRateLimiterBuilder {
bbroots_rl,
bbrange_rl,
blbrange_rl,
blbroot_rl,
init_time: Instant::now(),
})
}
@@ -211,6 +222,7 @@ impl RPCRateLimiter {
Protocol::BlocksByRange => &mut self.bbrange_rl,
Protocol::BlocksByRoot => &mut self.bbroots_rl,
Protocol::BlobsByRange => &mut self.blbrange_rl,
Protocol::BlobsByRoot => &mut self.blbroot_rl,
};
check(limiter)
}
@@ -224,6 +236,7 @@ impl RPCRateLimiter {
self.bbrange_rl.prune(time_since_start);
self.bbroots_rl.prune(time_since_start);
self.blbrange_rl.prune(time_since_start);
self.blbroot_rl.prune(time_since_start);
}
}