Runtime rpc request sizes (#4841)

* add runtime variable list type

* add configs to ChainSpec

* git rid of max request blocks type

* fix tests and lints

* remove todos

* git rid of old const usage

* fix decode impl

* add new config to `Config` api struct

* add docs fix compilt

* move methods for per-fork-spec to chainspec

* get values off chain spec

* fix compile

* remove min by root size

* add tests for runtime var list

---------

Co-authored-by: Jimmy Chen <jchen.tc@gmail.com>
This commit is contained in:
realbigsean
2024-01-08 18:23:47 -05:00
committed by GitHub
parent 5c8c8da8b1
commit b47e3f252e
25 changed files with 507 additions and 179 deletions

View File

@@ -5,9 +5,7 @@ use crate::sync::SyncMessage;
use beacon_chain::{BeaconChainError, BeaconChainTypes, HistoricalBlockError, WhenSlotSkipped};
use beacon_processor::SendOnDrop;
use itertools::process_results;
use lighthouse_network::rpc::methods::{
BlobsByRangeRequest, BlobsByRootRequest, MAX_REQUEST_BLOB_SIDECARS, MAX_REQUEST_BLOCKS_DENEB,
};
use lighthouse_network::rpc::methods::{BlobsByRangeRequest, BlobsByRootRequest};
use lighthouse_network::rpc::StatusMessage;
use lighthouse_network::rpc::*;
use lighthouse_network::{PeerId, PeerRequestId, ReportSource, Response, SyncInfo};
@@ -222,12 +220,14 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
request_id: PeerRequestId,
request: BlobsByRootRequest,
) {
let Some(requested_root) = request.blob_ids.first().map(|id| id.block_root) else {
let Some(requested_root) = request.blob_ids.as_slice().first().map(|id| id.block_root)
else {
// No blob ids requested.
return;
};
let requested_indices = request
.blob_ids
.as_slice()
.iter()
.map(|id| id.index)
.collect::<Vec<_>>();
@@ -235,9 +235,9 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
let send_response = true;
let mut blob_list_results = HashMap::new();
for id in request.blob_ids.into_iter() {
for id in request.blob_ids.as_slice() {
// First attempt to get the blobs from the RPC cache.
if let Ok(Some(blob)) = self.chain.data_availability_checker.get_blob(&id) {
if let Ok(Some(blob)) = self.chain.data_availability_checker.get_blob(id) {
self.send_response(peer_id, Response::BlobsByRoot(Some(blob)), request_id);
send_blob_count += 1;
} else {
@@ -248,7 +248,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
let blob_list_result = match blob_list_results.entry(root) {
Entry::Vacant(entry) => {
entry.insert(self.chain.get_blobs_checking_early_attester_cache(&root))
entry.insert(self.chain.get_blobs_checking_early_attester_cache(root))
}
Entry::Occupied(entry) => entry.into_mut(),
};
@@ -256,7 +256,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
match blob_list_result.as_ref() {
Ok(blobs_sidecar_list) => {
'inner: for blob_sidecar in blobs_sidecar_list.iter() {
if blob_sidecar.index == index {
if blob_sidecar.index == *index {
self.send_response(
peer_id,
Response::BlobsByRoot(Some(blob_sidecar.clone())),
@@ -346,14 +346,17 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
);
// Should not send more than max request blocks
let max_request_size = self.chain.epoch().map_or(MAX_REQUEST_BLOCKS, |epoch| {
match self.chain.spec.fork_name_at_epoch(epoch) {
ForkName::Deneb => MAX_REQUEST_BLOCKS_DENEB,
ForkName::Base | ForkName::Altair | ForkName::Merge | ForkName::Capella => {
MAX_REQUEST_BLOCKS
}
}
});
let max_request_size =
self.chain
.epoch()
.map_or(self.chain.spec.max_request_blocks, |epoch| {
match self.chain.spec.fork_name_at_epoch(epoch) {
ForkName::Deneb => self.chain.spec.max_request_blocks_deneb,
ForkName::Base | ForkName::Altair | ForkName::Merge | ForkName::Capella => {
self.chain.spec.max_request_blocks
}
}
});
if *req.count() > max_request_size {
return self.send_error_response(
peer_id,
@@ -586,7 +589,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
);
// Should not send more than max request blocks
if req.max_blobs_requested::<T::EthSpec>() > MAX_REQUEST_BLOB_SIDECARS {
if req.max_blobs_requested::<T::EthSpec>() > self.chain.spec.max_request_blob_sidecars {
return self.send_error_response(
peer_id,
RPCResponseErrorCode::InvalidRequest,

View File

@@ -716,9 +716,10 @@ impl<T: BeaconChainTypes> NetworkService<T> {
}
let mut subscribed_topics: Vec<GossipTopic> = vec![];
for topic_kind in
core_topics_to_subscribe::<T::EthSpec>(self.fork_context.current_fork())
{
for topic_kind in core_topics_to_subscribe::<T::EthSpec>(
self.fork_context.current_fork(),
&self.fork_context.spec,
) {
for fork_digest in self.required_gossip_fork_digests() {
let topic = GossipTopic::new(
topic_kind.clone(),
@@ -945,7 +946,10 @@ impl<T: BeaconChainTypes> NetworkService<T> {
}
fn subscribed_core_topics(&self) -> bool {
let core_topics = core_topics_to_subscribe::<T::EthSpec>(self.fork_context.current_fork());
let core_topics = core_topics_to_subscribe::<T::EthSpec>(
self.fork_context.current_fork(),
&self.fork_context.spec,
);
let core_topics: HashSet<&GossipKind> = HashSet::from_iter(&core_topics);
let subscriptions = self.network_globals.gossipsub_subscriptions.read();
let subscribed_topics: HashSet<&GossipKind> =

View File

@@ -13,12 +13,11 @@ use beacon_chain::{get_block_root, BeaconChainTypes};
use lighthouse_network::rpc::methods::BlobsByRootRequest;
use lighthouse_network::rpc::BlocksByRootRequest;
use rand::prelude::IteratorRandom;
use ssz_types::VariableList;
use std::ops::IndexMut;
use std::sync::Arc;
use std::time::Duration;
use types::blob_sidecar::{BlobIdentifier, FixedBlobSidecarList};
use types::{BlobSidecar, EthSpec, Hash256, SignedBeaconBlock};
use types::{BlobSidecar, ChainSpec, EthSpec, Hash256, SignedBeaconBlock};
#[derive(Debug, Copy, Clone)]
pub enum ResponseType {
@@ -87,11 +86,14 @@ pub trait RequestState<L: Lookup, T: BeaconChainTypes> {
/* Request building methods */
/// Construct a new request.
fn build_request(&mut self) -> Result<(PeerId, Self::RequestType), LookupRequestError> {
fn build_request(
&mut self,
spec: &ChainSpec,
) -> Result<(PeerId, Self::RequestType), LookupRequestError> {
// Verify and construct request.
self.too_many_attempts()?;
let peer = self.get_peer()?;
let request = self.new_request();
let request = self.new_request(spec);
Ok((peer, request))
}
@@ -108,7 +110,7 @@ pub trait RequestState<L: Lookup, T: BeaconChainTypes> {
}
// Construct request.
let (peer_id, request) = self.build_request()?;
let (peer_id, request) = self.build_request(&cx.chain.spec)?;
// Update request state.
self.get_state_mut().state = State::Downloading { peer_id };
@@ -151,7 +153,7 @@ pub trait RequestState<L: Lookup, T: BeaconChainTypes> {
}
/// Initialize `Self::RequestType`.
fn new_request(&self) -> Self::RequestType;
fn new_request(&self, spec: &ChainSpec) -> Self::RequestType;
/// Send the request to the network service.
fn make_request(
@@ -254,8 +256,8 @@ impl<L: Lookup, T: BeaconChainTypes> RequestState<L, T> for BlockRequestState<L>
type VerifiedResponseType = Arc<SignedBeaconBlock<T::EthSpec>>;
type ReconstructedResponseType = RpcBlock<T::EthSpec>;
fn new_request(&self) -> BlocksByRootRequest {
BlocksByRootRequest::new(VariableList::from(vec![self.requested_block_root]))
fn new_request(&self, spec: &ChainSpec) -> BlocksByRootRequest {
BlocksByRootRequest::new(vec![self.requested_block_root], spec)
}
fn make_request(
@@ -353,10 +355,9 @@ impl<L: Lookup, T: BeaconChainTypes> RequestState<L, T> for BlobRequestState<L,
type VerifiedResponseType = FixedBlobSidecarList<T::EthSpec>;
type ReconstructedResponseType = FixedBlobSidecarList<T::EthSpec>;
fn new_request(&self) -> BlobsByRootRequest {
fn new_request(&self, spec: &ChainSpec) -> BlobsByRootRequest {
let blob_id_vec: Vec<BlobIdentifier> = self.requested_ids.clone().into();
let blob_ids = VariableList::from(blob_id_vec);
BlobsByRootRequest { blob_ids }
BlobsByRootRequest::new(blob_id_vec, spec)
}
fn make_request(

View File

@@ -575,7 +575,7 @@ mod tests {
HotColdDB::open_ephemeral(StoreConfig::default(), ChainSpec::minimal(), log.clone())
.expect("store");
let da_checker = Arc::new(
DataAvailabilityChecker::new(slot_clock, None, store.into(), &log, spec)
DataAvailabilityChecker::new(slot_clock, None, store.into(), &log, spec.clone())
.expect("data availability checker"),
);
let mut sl = SingleBlockLookup::<TestLookup1, T>::new(
@@ -587,6 +587,7 @@ mod tests {
);
<BlockRequestState<TestLookup1> as RequestState<TestLookup1, T>>::build_request(
&mut sl.block_request_state,
&spec,
)
.unwrap();
sl.block_request_state.state.state = State::Downloading { peer_id };
@@ -616,7 +617,7 @@ mod tests {
.expect("store");
let da_checker = Arc::new(
DataAvailabilityChecker::new(slot_clock, None, store.into(), &log, spec)
DataAvailabilityChecker::new(slot_clock, None, store.into(), &log, spec.clone())
.expect("data availability checker"),
);
@@ -630,6 +631,7 @@ mod tests {
for _ in 1..TestLookup2::MAX_ATTEMPTS {
<BlockRequestState<TestLookup2> as RequestState<TestLookup2, T>>::build_request(
&mut sl.block_request_state,
&spec,
)
.unwrap();
sl.block_request_state.state.register_failure_downloading();
@@ -638,6 +640,7 @@ mod tests {
// Now we receive the block and send it for processing
<BlockRequestState<TestLookup2> as RequestState<TestLookup2, T>>::build_request(
&mut sl.block_request_state,
&spec,
)
.unwrap();
sl.block_request_state.state.state = State::Downloading { peer_id };
@@ -654,7 +657,8 @@ mod tests {
sl.block_request_state.state.register_failure_processing();
assert_eq!(
<BlockRequestState<TestLookup2> as RequestState<TestLookup2, T>>::build_request(
&mut sl.block_request_state
&mut sl.block_request_state,
&spec
),
Err(LookupRequestError::TooManyAttempts {
cannot_process: false

View File

@@ -52,7 +52,6 @@ use beacon_chain::{
AvailabilityProcessingStatus, BeaconChain, BeaconChainTypes, BlockError, EngineState,
};
use futures::StreamExt;
use lighthouse_network::rpc::methods::MAX_REQUEST_BLOCKS;
use lighthouse_network::rpc::RPCError;
use lighthouse_network::types::{NetworkGlobals, SyncState};
use lighthouse_network::SyncInfo;
@@ -230,7 +229,7 @@ pub fn spawn<T: BeaconChainTypes>(
log: slog::Logger,
) {
assert!(
MAX_REQUEST_BLOCKS >= T::EthSpec::slots_per_epoch() * EPOCHS_PER_BATCH,
beacon_chain.spec.max_request_blocks >= T::EthSpec::slots_per_epoch() * EPOCHS_PER_BATCH,
"Max blocks that can be requested in a single batch greater than max allowed blocks in a single request"
);

View File

@@ -475,9 +475,15 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
};
let request_id = RequestId::Sync(sync_id);
if let Some(block_root) = blob_request.blob_ids.first().map(|id| id.block_root) {
if let Some(block_root) = blob_request
.blob_ids
.as_slice()
.first()
.map(|id| id.block_root)
{
let indices = blob_request
.blob_ids
.as_slice()
.iter()
.map(|id| id.index)
.collect::<Vec<_>>();