sync tx blobs

This commit is contained in:
realbigsean
2022-02-19 15:00:45 -07:00
parent 4cdf1b546d
commit 4008da6c60
13 changed files with 157 additions and 15 deletions

View File

@@ -15,10 +15,7 @@ use std::io::{Read, Write};
use std::marker::PhantomData;
use std::sync::Arc;
use tokio_util::codec::{Decoder, Encoder};
use types::{
EthSpec, ForkContext, ForkName, SignedBeaconBlock, SignedBeaconBlockAltair,
SignedBeaconBlockBase, SignedBeaconBlockMerge, SignedBeaconBlockShanghai,
};
use types::{BlobWrapper, EthSpec, ForkContext, ForkName, SignedBeaconBlock, SignedBeaconBlockAltair, SignedBeaconBlockBase, SignedBeaconBlockMerge, SignedBeaconBlockShanghai};
use unsigned_varint::codec::Uvi;
const CONTEXT_BYTES_LEN: usize = 4;
@@ -69,6 +66,7 @@ impl<TSpec: EthSpec> Encoder<RPCCodedResponse<TSpec>> for SSZSnappyInboundCodec<
RPCCodedResponse::Success(resp) => match &resp {
RPCResponse::Status(res) => res.as_ssz_bytes(),
RPCResponse::BlocksByRange(res) => res.as_ssz_bytes(),
RPCResponse::TxBlobsByRange(res) => res.as_ssz_bytes(),
RPCResponse::BlocksByRoot(res) => res.as_ssz_bytes(),
RPCResponse::Pong(res) => res.data.as_ssz_bytes(),
RPCResponse::MetaData(res) =>
@@ -227,6 +225,7 @@ impl<TSpec: EthSpec> Encoder<OutboundRequest<TSpec>> for SSZSnappyOutboundCodec<
OutboundRequest::Status(req) => req.as_ssz_bytes(),
OutboundRequest::Goodbye(req) => req.as_ssz_bytes(),
OutboundRequest::BlocksByRange(req) => req.as_ssz_bytes(),
OutboundRequest::TxBlobsByRange(req) => req.as_ssz_bytes(),
OutboundRequest::BlocksByRoot(req) => req.block_roots.as_ssz_bytes(),
OutboundRequest::Ping(req) => req.as_ssz_bytes(),
OutboundRequest::MetaData(_) => return Ok(()), // no metadata to encode
@@ -469,6 +468,9 @@ fn handle_v1_request<T: EthSpec>(
Protocol::BlocksByRange => Ok(Some(InboundRequest::BlocksByRange(
OldBlocksByRangeRequest::from_ssz_bytes(decoded_buffer)?,
))),
Protocol::TxBlobsByRange => Ok(Some(InboundRequest::TxBlobsByRange(
TxBlobsByRangeRequest::from_ssz_bytes(decoded_buffer)?,
))),
Protocol::BlocksByRoot => Ok(Some(InboundRequest::BlocksByRoot(BlocksByRootRequest {
block_roots: VariableList::from_ssz_bytes(decoded_buffer)?,
}))),
@@ -501,6 +503,9 @@ fn handle_v2_request<T: EthSpec>(
Protocol::BlocksByRange => Ok(Some(InboundRequest::BlocksByRange(
OldBlocksByRangeRequest::from_ssz_bytes(decoded_buffer)?,
))),
Protocol::TxBlobsByRange => Ok(Some(InboundRequest::TxBlobsByRange(
TxBlobsByRangeRequest::from_ssz_bytes(decoded_buffer)?,
))),
Protocol::BlocksByRoot => Ok(Some(InboundRequest::BlocksByRoot(BlocksByRootRequest {
block_roots: VariableList::from_ssz_bytes(decoded_buffer)?,
}))),
@@ -538,6 +543,9 @@ fn handle_v1_response<T: EthSpec>(
Protocol::BlocksByRange => Ok(Some(RPCResponse::BlocksByRange(Arc::new(
SignedBeaconBlock::Base(SignedBeaconBlockBase::from_ssz_bytes(decoded_buffer)?),
)))),
Protocol::TxBlobsByRange => Ok(Some(RPCResponse::TxBlobsByRange(Arc::new(
BlobWrapper::from_ssz_bytes(decoded_buffer)?),
))),
Protocol::BlocksByRoot => Ok(Some(RPCResponse::BlocksByRoot(Arc::new(
SignedBeaconBlock::Base(SignedBeaconBlockBase::from_ssz_bytes(decoded_buffer)?),
)))),
@@ -595,6 +603,13 @@ fn handle_v2_response<T: EthSpec>(
)?),
)))),
},
Protocol::TxBlobsByRange => {
Ok(Some(RPCResponse::TxBlobsByRange(Box::new(
BlobWrapper::from_ssz_bytes(
decoded_buffer,
)?
))))
},
Protocol::BlocksByRoot => match fork_name {
ForkName::Altair => Ok(Some(RPCResponse::BlocksByRoot(Arc::new(
SignedBeaconBlock::Altair(SignedBeaconBlockAltair::from_ssz_bytes(

View File

@@ -12,7 +12,7 @@ use std::ops::Deref;
use std::sync::Arc;
use strum::IntoStaticStr;
use superstruct::superstruct;
use types::{Epoch, EthSpec, Hash256, SignedBeaconBlock, Slot};
use types::{BlobWrapper, Epoch, EthSpec, Hash256, SignedBeaconBlock, Slot};
/// Maximum number of blocks in a single request.
pub type MaxRequestBlocks = U1024;
@@ -221,6 +221,12 @@ pub struct OldBlocksByRangeRequest {
pub step: u64,
}
#[derive(Encode, Decode, Clone, Debug, PartialEq)]
pub struct TxBlobsByRangeRequest {
pub execution_block_number: u64,
pub count: u64,
}
/// Request a number of beacon block bodies from a peer.
#[derive(Clone, Debug, PartialEq)]
pub struct BlocksByRootRequest {
@@ -240,6 +246,8 @@ pub enum RPCResponse<T: EthSpec> {
/// batch.
BlocksByRange(Arc<SignedBeaconBlock<T>>),
TxBlobsByRange(Box<BlobWrapper<T>>),
/// A response to a get BLOCKS_BY_ROOT request.
BlocksByRoot(Arc<SignedBeaconBlock<T>>),
@@ -256,6 +264,8 @@ pub enum ResponseTermination {
/// Blocks by range stream termination.
BlocksByRange,
TxBlobsByRange,
/// Blocks by root stream termination.
BlocksByRoot,
}
@@ -318,6 +328,7 @@ impl<T: EthSpec> RPCCodedResponse<T> {
RPCCodedResponse::Success(resp) => match resp {
RPCResponse::Status(_) => false,
RPCResponse::BlocksByRange(_) => true,
RPCResponse::TxBlobsByRange(_) => true,
RPCResponse::BlocksByRoot(_) => true,
RPCResponse::Pong(_) => false,
RPCResponse::MetaData(_) => false,
@@ -385,6 +396,9 @@ impl<T: EthSpec> std::fmt::Display for RPCResponse<T> {
RPCResponse::BlocksByRange(block) => {
write!(f, "BlocksByRange: Block slot: {}", block.slot())
}
RPCResponse::TxBlobsByRange(blob) => {
write!(f, "TxBlobsByRange: Block slot: {}", blob.beacon_block_slot)
}
RPCResponse::BlocksByRoot(block) => {
write!(f, "BlocksByRoot: Block slot: {}", block.slot())
}
@@ -436,6 +450,16 @@ impl std::fmt::Display for OldBlocksByRangeRequest {
}
}
impl std::fmt::Display for TxBlobsByRangeRequest {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
"Execution block number: {}, Count: {}",
self.execution_block_number, self.count
)
}
}
impl slog::KV for StatusMessage {
fn serialize(
&self,

View File

@@ -125,6 +125,12 @@ impl<Id: ReqId, TSpec: EthSpec> RPC<Id, TSpec> {
methods::MAX_REQUEST_BLOCKS,
Duration::from_secs(10),
)
//FIXME(sean)
.n_every(
Protocol::TxBlobsByRange,
methods::MAX_REQUEST_BLOCKS,
Duration::from_secs(10),
)
.n_every(Protocol::BlocksByRoot, 128, Duration::from_secs(10))
.build()
.expect("Configuration parameters are valid");

View File

@@ -37,6 +37,7 @@ pub enum OutboundRequest<TSpec: EthSpec> {
Status(StatusMessage),
Goodbye(GoodbyeReason),
BlocksByRange(OldBlocksByRangeRequest),
TxBlobsByRange(TxBlobsByRangeRequest),
BlocksByRoot(BlocksByRootRequest),
Ping(Ping),
MetaData(PhantomData<TSpec>),
@@ -71,6 +72,10 @@ impl<TSpec: EthSpec> OutboundRequest<TSpec> {
ProtocolId::new(Protocol::BlocksByRange, Version::V2, Encoding::SSZSnappy),
ProtocolId::new(Protocol::BlocksByRange, Version::V1, Encoding::SSZSnappy),
],
//FIXME(sean) what should the protocol version be?
OutboundRequest::TxBlobsByRange(_) => vec![
ProtocolId::new(Protocol::TxBlobsByRange, Version::V2, Encoding::SSZSnappy),
],
OutboundRequest::BlocksByRoot(_) => vec![
ProtocolId::new(Protocol::BlocksByRoot, Version::V2, Encoding::SSZSnappy),
ProtocolId::new(Protocol::BlocksByRoot, Version::V1, Encoding::SSZSnappy),
@@ -95,6 +100,7 @@ impl<TSpec: EthSpec> OutboundRequest<TSpec> {
OutboundRequest::Status(_) => 1,
OutboundRequest::Goodbye(_) => 0,
OutboundRequest::BlocksByRange(req) => req.count,
OutboundRequest::TxBlobsByRange(req) => req.count,
OutboundRequest::BlocksByRoot(req) => req.block_roots.len() as u64,
OutboundRequest::Ping(_) => 1,
OutboundRequest::MetaData(_) => 1,
@@ -107,6 +113,7 @@ impl<TSpec: EthSpec> OutboundRequest<TSpec> {
OutboundRequest::Status(_) => Protocol::Status,
OutboundRequest::Goodbye(_) => Protocol::Goodbye,
OutboundRequest::BlocksByRange(_) => Protocol::BlocksByRange,
OutboundRequest::TxBlobsByRange(_) => Protocol::TxBlobsByRange,
OutboundRequest::BlocksByRoot(_) => Protocol::BlocksByRoot,
OutboundRequest::Ping(_) => Protocol::Ping,
OutboundRequest::MetaData(_) => Protocol::MetaData,
@@ -120,6 +127,7 @@ impl<TSpec: EthSpec> OutboundRequest<TSpec> {
// this only gets called after `multiple_responses()` returns true. Therefore, only
// variants that have `multiple_responses()` can have values.
OutboundRequest::BlocksByRange(_) => ResponseTermination::BlocksByRange,
OutboundRequest::TxBlobsByRange(_) => ResponseTermination::TxBlobsByRange,
OutboundRequest::BlocksByRoot(_) => ResponseTermination::BlocksByRoot,
OutboundRequest::Status(_) => unreachable!(),
OutboundRequest::Goodbye(_) => unreachable!(),
@@ -175,6 +183,7 @@ impl<TSpec: EthSpec> std::fmt::Display for OutboundRequest<TSpec> {
OutboundRequest::Status(status) => write!(f, "Status Message: {}", status),
OutboundRequest::Goodbye(reason) => write!(f, "Goodbye: {}", reason),
OutboundRequest::BlocksByRange(req) => write!(f, "Blocks by range: {}", req),
OutboundRequest::TxBlobsByRange(req) => write!(f, "Blobs by range: {}", req),
OutboundRequest::BlocksByRoot(req) => write!(f, "Blocks by root: {:?}", req),
OutboundRequest::Ping(ping) => write!(f, "Ping: {}", ping.data),
OutboundRequest::MetaData(_) => write!(f, "MetaData request"),

View File

@@ -20,10 +20,7 @@ use tokio_util::{
codec::Framed,
compat::{Compat, FuturesAsyncReadCompatExt},
};
use types::{
BeaconBlock, BeaconBlockAltair, BeaconBlockBase, BeaconBlockMerge, EthSpec, ForkContext,
ForkName, Hash256, MainnetEthSpec, Signature, SignedBeaconBlock,
};
use types::{BeaconBlock, BeaconBlockAltair, BeaconBlockBase, BeaconBlockMerge, BlobWrapper, EthSpec, ForkContext, ForkName, Hash256, MainnetEthSpec, Signature, SignedBeaconBlock};
lazy_static! {
// Note: Hardcoding the `EthSpec` type for `SignedBeaconBlock` as min/max values is
@@ -71,6 +68,12 @@ lazy_static! {
+ types::ExecutionPayload::<MainnetEthSpec>::max_execution_payload_size() // adding max size of execution payload (~16gb)
+ ssz::BYTES_PER_LENGTH_OFFSET; // Adding the additional ssz offset for the `ExecutionPayload` field
pub static ref BLOB_MIN: usize = BlobWrapper::<MainnetEthSpec>::empty()
.as_ssz_bytes()
.len();
pub static ref BLOB_MAX: usize = BlobWrapper::<MainnetEthSpec>::max_size();
pub static ref BLOCKS_BY_ROOT_REQUEST_MIN: usize =
VariableList::<Hash256, MaxRequestBlocks>::from(Vec::<Hash256>::new())
.as_ssz_bytes()
@@ -147,6 +150,7 @@ pub enum Protocol {
Goodbye,
/// The `BlocksByRange` protocol name.
BlocksByRange,
TxBlobsByRange,
/// The `BlocksByRoot` protocol name.
BlocksByRoot,
/// The `Ping` protocol name.
@@ -176,6 +180,8 @@ impl std::fmt::Display for Protocol {
Protocol::Status => "status",
Protocol::Goodbye => "goodbye",
Protocol::BlocksByRange => "beacon_blocks_by_range",
//FIXME(sean) verify
Protocol::TxBlobsByRange => "tx_blobs_by_range",
Protocol::BlocksByRoot => "beacon_blocks_by_root",
Protocol::Ping => "ping",
Protocol::MetaData => "metadata",
@@ -282,6 +288,12 @@ impl ProtocolId {
<OldBlocksByRangeRequest as Encode>::ssz_fixed_len(),
<OldBlocksByRangeRequest as Encode>::ssz_fixed_len(),
),
Protocol::TxBlobsByRange => {
RpcLimits::new(
<TxBlobsByRangeRequest as Encode>::ssz_fixed_len(),
<TxBlobsByRangeRequest as Encode>::ssz_fixed_len(),
)
}
Protocol::BlocksByRoot => {
RpcLimits::new(*BLOCKS_BY_ROOT_REQUEST_MIN, *BLOCKS_BY_ROOT_REQUEST_MAX)
}
@@ -451,6 +463,11 @@ impl<TSpec: EthSpec> InboundRequest<TSpec> {
ProtocolId::new(Protocol::BlocksByRange, Version::V2, Encoding::SSZSnappy),
ProtocolId::new(Protocol::BlocksByRange, Version::V1, Encoding::SSZSnappy),
],
//FIXME(sean) do I need v1
InboundRequest::TxBlobsByRange(_) => vec![
// V2 has higher preference when negotiating a stream
ProtocolId::new(Protocol::TxBlobsByRange, Version::V2, Encoding::SSZSnappy),
],
InboundRequest::BlocksByRoot(_) => vec![
// V2 has higher preference when negotiating a stream
ProtocolId::new(Protocol::BlocksByRoot, Version::V2, Encoding::SSZSnappy),
@@ -476,6 +493,7 @@ impl<TSpec: EthSpec> InboundRequest<TSpec> {
InboundRequest::Status(_) => 1,
InboundRequest::Goodbye(_) => 0,
InboundRequest::BlocksByRange(req) => req.count,
InboundRequest::TxBlobsByRange(req) => req.count,
InboundRequest::BlocksByRoot(req) => req.block_roots.len() as u64,
InboundRequest::Ping(_) => 1,
InboundRequest::MetaData(_) => 1,
@@ -488,6 +506,7 @@ impl<TSpec: EthSpec> InboundRequest<TSpec> {
InboundRequest::Status(_) => Protocol::Status,
InboundRequest::Goodbye(_) => Protocol::Goodbye,
InboundRequest::BlocksByRange(_) => Protocol::BlocksByRange,
InboundRequest::TxBlobsByRange(_) => Protocol::TxBlobsByRange,
InboundRequest::BlocksByRoot(_) => Protocol::BlocksByRoot,
InboundRequest::Ping(_) => Protocol::Ping,
InboundRequest::MetaData(_) => Protocol::MetaData,
@@ -501,6 +520,7 @@ impl<TSpec: EthSpec> InboundRequest<TSpec> {
// this only gets called after `multiple_responses()` returns true. Therefore, only
// variants that have `multiple_responses()` can have values.
InboundRequest::BlocksByRange(_) => ResponseTermination::BlocksByRange,
InboundRequest::TxBlobsByRange(_) => ResponseTermination::TxBlobsByRange,
InboundRequest::BlocksByRoot(_) => ResponseTermination::BlocksByRoot,
InboundRequest::Status(_) => unreachable!(),
InboundRequest::Goodbye(_) => unreachable!(),
@@ -606,6 +626,7 @@ impl<TSpec: EthSpec> std::fmt::Display for InboundRequest<TSpec> {
InboundRequest::Status(status) => write!(f, "Status Message: {}", status),
InboundRequest::Goodbye(reason) => write!(f, "Goodbye: {}", reason),
InboundRequest::BlocksByRange(req) => write!(f, "Blocks by range: {}", req),
InboundRequest::TxBlobsByRange(req) => write!(f, "Blobs by range: {}", req),
InboundRequest::BlocksByRoot(req) => write!(f, "Blocks by root: {:?}", req),
InboundRequest::Ping(ping) => write!(f, "Ping: {}", ping.data),
InboundRequest::MetaData(_) => write!(f, "MetaData request"),

View File

@@ -71,6 +71,7 @@ pub struct RPCRateLimiter {
status_rl: Limiter<PeerId>,
/// BlocksByRange rate limiter.
bbrange_rl: Limiter<PeerId>,
txbbrange_rl: Limiter<PeerId>,
/// BlocksByRoot rate limiter.
bbroots_rl: Limiter<PeerId>,
}
@@ -96,6 +97,7 @@ pub struct RPCRateLimiterBuilder {
status_quota: Option<Quota>,
/// Quota for the BlocksByRange protocol.
bbrange_quota: Option<Quota>,
txbbrange_quota: Option<Quota>,
/// Quota for the BlocksByRoot protocol.
bbroots_quota: Option<Quota>,
}
@@ -115,6 +117,7 @@ impl RPCRateLimiterBuilder {
Protocol::MetaData => self.metadata_quota = q,
Protocol::Goodbye => self.goodbye_quota = q,
Protocol::BlocksByRange => self.bbrange_quota = q,
Protocol::TxBlobsByRange => self.txbbrange_quota = q,
Protocol::BlocksByRoot => self.bbroots_quota = q,
}
self
@@ -155,6 +158,9 @@ impl RPCRateLimiterBuilder {
let bbrange_quota = self
.bbrange_quota
.ok_or("BlocksByRange quota not specified")?;
let txbbrange_quota = self
.txbbrange_quota
.ok_or("TxBlobsByRange quota not specified")?;
// create the rate limiters
let ping_rl = Limiter::from_quota(ping_quota)?;
@@ -163,6 +169,7 @@ impl RPCRateLimiterBuilder {
let goodbye_rl = Limiter::from_quota(goodbye_quota)?;
let bbroots_rl = Limiter::from_quota(bbroots_quota)?;
let bbrange_rl = Limiter::from_quota(bbrange_quota)?;
let txbbrange_rl = Limiter::from_quota(txbbrange_quota)?;
// check for peers to prune every 30 seconds, starting in 30 seconds
let prune_every = tokio::time::Duration::from_secs(30);
@@ -176,6 +183,7 @@ impl RPCRateLimiterBuilder {
goodbye_rl,
bbroots_rl,
bbrange_rl,
txbbrange_rl,
init_time: Instant::now(),
})
}
@@ -198,6 +206,7 @@ impl RPCRateLimiter {
Protocol::MetaData => &mut self.metadata_rl,
Protocol::Goodbye => &mut self.goodbye_rl,
Protocol::BlocksByRange => &mut self.bbrange_rl,
Protocol::TxBlobsByRange => &mut self.txbbrange_rl,
Protocol::BlocksByRoot => &mut self.bbroots_rl,
};
check(limiter)