Make max_blobs_per_block a config parameter (#6329)

* First pass

* Add restrictions to RuntimeVariableList api

* Use empty_uninitialized and fix warnings

* Fix some todos

* Merge branch 'unstable' into max-blobs-preset

* Fix take impl on RuntimeFixedList

* cleanup

* Fix test compilations

* Fix some more tests

* Fix test from unstable

* Merge branch 'unstable' into max-blobs-preset

* Merge remote-tracking branch 'origin/unstable' into max-blobs-preset

* Remove footgun function

* Minor simplifications

* Move from preset to config

* Fix typo

* Revert "Remove footgun function"

This reverts commit de01f923c7.

* Try fixing tests

* Thread through ChainSpec

* Fix release tests

* Move RuntimeFixedVector into module and rename

* Add test

* Remove empty RuntimeVarList awfulness

* Fix tests

* Simplify BlobSidecarListFromRoot

* Merge remote-tracking branch 'origin/unstable' into max-blobs-preset

* Bump quota to account for new target (6)

* Remove clone

* Fix issue from review

* Try to remove ugliness

* Merge branch 'unstable' into max-blobs-preset

* Fix max value

* Fix doctest

* Fix formatting

* Fix max check

* Delete hardcoded max_blobs_per_block in RPC limits

* Merge remote-tracking branch 'origin/unstable' into max-blobs-preset
This commit is contained in:
Pawan Dhananjay
2025-01-10 12:04:58 +05:30
committed by GitHub
parent ecdf2d891f
commit 05727290fb
61 changed files with 655 additions and 335 deletions

View File

@@ -890,14 +890,6 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
"start_slot" => req.start_slot,
);
// Should not send more than max request blocks
if req.max_blobs_requested::<T::EthSpec>() > self.chain.spec.max_request_blob_sidecars {
return Err((
RpcErrorResponse::InvalidRequest,
"Request exceeded `MAX_REQUEST_BLOBS_SIDECARS`",
));
}
let request_start_slot = Slot::from(req.start_slot);
let data_availability_boundary_slot = match self.chain.data_availability_boundary() {

View File

@@ -259,7 +259,7 @@ impl TestRig {
assert!(beacon_processor.is_ok());
let block = next_block_tuple.0;
let blob_sidecars = if let Some((kzg_proofs, blobs)) = next_block_tuple.1 {
Some(BlobSidecar::build_sidecars(blobs, &block, kzg_proofs).unwrap())
Some(BlobSidecar::build_sidecars(blobs, &block, kzg_proofs, &chain.spec).unwrap())
} else {
None
};
@@ -344,7 +344,7 @@ impl TestRig {
}
pub fn enqueue_single_lookup_rpc_blobs(&self) {
if let Some(blobs) = self.next_blobs.clone() {
let blobs = FixedBlobSidecarList::from(blobs.into_iter().map(Some).collect::<Vec<_>>());
let blobs = FixedBlobSidecarList::new(blobs.into_iter().map(Some).collect::<Vec<_>>());
self.network_beacon_processor
.send_rpc_blobs(
self.next_block.canonical_root(),
@@ -1130,7 +1130,12 @@ async fn test_blobs_by_range() {
.block_root_at_slot(Slot::new(slot), WhenSlotSkipped::None)
.unwrap();
blob_count += root
.map(|root| rig.chain.get_blobs(&root).unwrap_or_default().len())
.map(|root| {
rig.chain
.get_blobs(&root)
.map(|list| list.len())
.unwrap_or(0)
})
.unwrap_or(0);
}
let mut actual_count = 0;

View File

@@ -2,13 +2,13 @@ use beacon_chain::{
block_verification_types::RpcBlock, data_column_verification::CustodyDataColumn, get_block_root,
};
use lighthouse_network::PeerId;
use ssz_types::VariableList;
use std::{
collections::{HashMap, VecDeque},
sync::Arc,
};
use types::{
BlobSidecar, ChainSpec, ColumnIndex, DataColumnSidecar, EthSpec, Hash256, SignedBeaconBlock,
BlobSidecar, ChainSpec, ColumnIndex, DataColumnSidecar, EthSpec, Hash256, RuntimeVariableList,
SignedBeaconBlock,
};
#[derive(Debug)]
@@ -31,6 +31,7 @@ pub struct RangeBlockComponentsRequest<E: EthSpec> {
num_custody_column_requests: Option<usize>,
/// The peers the request was made to.
pub(crate) peer_ids: Vec<PeerId>,
max_blobs_per_block: usize,
}
impl<E: EthSpec> RangeBlockComponentsRequest<E> {
@@ -39,6 +40,7 @@ impl<E: EthSpec> RangeBlockComponentsRequest<E> {
expects_custody_columns: Option<Vec<ColumnIndex>>,
num_custody_column_requests: Option<usize>,
peer_ids: Vec<PeerId>,
max_blobs_per_block: usize,
) -> Self {
Self {
blocks: <_>::default(),
@@ -51,6 +53,7 @@ impl<E: EthSpec> RangeBlockComponentsRequest<E> {
expects_custody_columns,
num_custody_column_requests,
peer_ids,
max_blobs_per_block,
}
}
@@ -100,7 +103,7 @@ impl<E: EthSpec> RangeBlockComponentsRequest<E> {
let mut responses = Vec::with_capacity(blocks.len());
let mut blob_iter = blobs.into_iter().peekable();
for block in blocks.into_iter() {
let mut blob_list = Vec::with_capacity(E::max_blobs_per_block());
let mut blob_list = Vec::with_capacity(self.max_blobs_per_block);
while {
let pair_next_blob = blob_iter
.peek()
@@ -111,7 +114,7 @@ impl<E: EthSpec> RangeBlockComponentsRequest<E> {
blob_list.push(blob_iter.next().ok_or("Missing next blob".to_string())?);
}
let mut blobs_buffer = vec![None; E::max_blobs_per_block()];
let mut blobs_buffer = vec![None; self.max_blobs_per_block];
for blob in blob_list {
let blob_index = blob.index as usize;
let Some(blob_opt) = blobs_buffer.get_mut(blob_index) else {
@@ -123,7 +126,11 @@ impl<E: EthSpec> RangeBlockComponentsRequest<E> {
*blob_opt = Some(blob);
}
}
let blobs = VariableList::from(blobs_buffer.into_iter().flatten().collect::<Vec<_>>());
let blobs = RuntimeVariableList::new(
blobs_buffer.into_iter().flatten().collect::<Vec<_>>(),
self.max_blobs_per_block,
)
.map_err(|_| "Blobs returned exceeds max length".to_string())?;
responses.push(RpcBlock::new(None, block, Some(blobs)).map_err(|e| format!("{e:?}"))?)
}
@@ -245,12 +252,18 @@ mod tests {
#[test]
fn no_blobs_into_responses() {
let spec = test_spec::<E>();
let peer_id = PeerId::random();
let mut info = RangeBlockComponentsRequest::<E>::new(false, None, None, vec![peer_id]);
let mut rng = XorShiftRng::from_seed([42; 16]);
let blocks = (0..4)
.map(|_| generate_rand_block_and_blobs::<E>(ForkName::Base, NumBlobs::None, &mut rng).0)
.map(|_| {
generate_rand_block_and_blobs::<E>(ForkName::Base, NumBlobs::None, &mut rng, &spec)
.0
})
.collect::<Vec<_>>();
let max_len = spec.max_blobs_per_block(blocks.first().unwrap().epoch()) as usize;
let mut info =
RangeBlockComponentsRequest::<E>::new(false, None, None, vec![peer_id], max_len);
// Send blocks and complete terminate response
for block in blocks {
@@ -265,15 +278,24 @@ mod tests {
#[test]
fn empty_blobs_into_responses() {
let spec = test_spec::<E>();
let peer_id = PeerId::random();
let mut info = RangeBlockComponentsRequest::<E>::new(true, None, None, vec![peer_id]);
let mut rng = XorShiftRng::from_seed([42; 16]);
let blocks = (0..4)
.map(|_| {
// Always generate some blobs.
generate_rand_block_and_blobs::<E>(ForkName::Deneb, NumBlobs::Number(3), &mut rng).0
generate_rand_block_and_blobs::<E>(
ForkName::Deneb,
NumBlobs::Number(3),
&mut rng,
&spec,
)
.0
})
.collect::<Vec<_>>();
let max_len = spec.max_blobs_per_block(blocks.first().unwrap().epoch()) as usize;
let mut info =
RangeBlockComponentsRequest::<E>::new(true, None, None, vec![peer_id], max_len);
// Send blocks and complete terminate response
for block in blocks {
@@ -294,12 +316,7 @@ mod tests {
fn rpc_block_with_custody_columns() {
let spec = test_spec::<E>();
let expects_custody_columns = vec![1, 2, 3, 4];
let mut info = RangeBlockComponentsRequest::<E>::new(
false,
Some(expects_custody_columns.clone()),
Some(expects_custody_columns.len()),
vec![PeerId::random()],
);
let mut rng = XorShiftRng::from_seed([42; 16]);
let blocks = (0..4)
.map(|_| {
@@ -311,7 +328,14 @@ mod tests {
)
})
.collect::<Vec<_>>();
let max_len = spec.max_blobs_per_block(blocks.first().unwrap().0.epoch()) as usize;
let mut info = RangeBlockComponentsRequest::<E>::new(
false,
Some(expects_custody_columns.clone()),
Some(expects_custody_columns.len()),
vec![PeerId::random()],
max_len,
);
// Send blocks and complete terminate response
for block in &blocks {
info.add_block_response(Some(block.0.clone().into()));
@@ -355,12 +379,7 @@ mod tests {
let spec = test_spec::<E>();
let expects_custody_columns = vec![1, 2, 3, 4];
let num_of_data_column_requests = 2;
let mut info = RangeBlockComponentsRequest::<E>::new(
false,
Some(expects_custody_columns.clone()),
Some(num_of_data_column_requests),
vec![PeerId::random()],
);
let mut rng = XorShiftRng::from_seed([42; 16]);
let blocks = (0..4)
.map(|_| {
@@ -372,7 +391,14 @@ mod tests {
)
})
.collect::<Vec<_>>();
let max_len = spec.max_blobs_per_block(blocks.first().unwrap().0.epoch()) as usize;
let mut info = RangeBlockComponentsRequest::<E>::new(
false,
Some(expects_custody_columns.clone()),
Some(num_of_data_column_requests),
vec![PeerId::random()],
max_len,
);
// Send blocks and complete terminate response
for block in &blocks {
info.add_block_response(Some(block.0.clone().into()));

View File

@@ -1234,6 +1234,7 @@ impl<T: BeaconChainTypes> SyncManager<T> {
.network
.range_block_and_blob_response(id, block_or_blob)
{
let epoch = resp.sender_id.batch_id();
match resp.responses {
Ok(blocks) => {
match resp.sender_id {
@@ -1277,6 +1278,7 @@ impl<T: BeaconChainTypes> SyncManager<T> {
resp.expects_custody_columns,
None,
vec![],
self.chain.spec.max_blobs_per_block(epoch) as usize,
),
);
// inform range that the request needs to be treated as failed

View File

@@ -67,6 +67,15 @@ pub enum RangeRequestId {
},
}
impl RangeRequestId {
pub fn batch_id(&self) -> BatchId {
match self {
RangeRequestId::RangeSync { batch_id, .. } => *batch_id,
RangeRequestId::BackfillSync { batch_id, .. } => *batch_id,
}
}
}
#[derive(Debug)]
pub enum RpcEvent<T> {
StreamTermination,
@@ -445,11 +454,14 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
(None, None)
};
// TODO(pawan): this would break if a batch contains multiple epochs
let max_blobs_len = self.chain.spec.max_blobs_per_block(epoch);
let info = RangeBlockComponentsRequest::new(
expected_blobs,
expects_columns,
num_of_column_req,
requested_peers,
max_blobs_len as usize,
);
self.range_block_components_requests
.insert(id, (sender_id, info));
@@ -950,12 +962,23 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
) -> Option<RpcResponseResult<FixedBlobSidecarList<T::EthSpec>>> {
let response = self.blobs_by_root_requests.on_response(id, rpc_event);
let response = response.map(|res| {
res.and_then(
|(blobs, seen_timestamp)| match to_fixed_blob_sidecar_list(blobs) {
Ok(blobs) => Ok((blobs, seen_timestamp)),
Err(e) => Err(e.into()),
},
)
res.and_then(|(blobs, seen_timestamp)| {
if let Some(max_len) = blobs
.first()
.map(|blob| self.chain.spec.max_blobs_per_block(blob.epoch()) as usize)
{
match to_fixed_blob_sidecar_list(blobs, max_len) {
Ok(blobs) => Ok((blobs, seen_timestamp)),
Err(e) => Err(e.into()),
}
} else {
Err(RpcResponseError::VerifyError(
LookupVerifyError::InternalError(
"Requested blobs for a block that has no blobs".to_string(),
),
))
}
})
});
if let Some(Err(RpcResponseError::VerifyError(e))) = &response {
self.report_peer(peer_id, PeerAction::LowToleranceError, e.into());
@@ -1150,8 +1173,9 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
fn to_fixed_blob_sidecar_list<E: EthSpec>(
blobs: Vec<Arc<BlobSidecar<E>>>,
max_len: usize,
) -> Result<FixedBlobSidecarList<E>, LookupVerifyError> {
let mut fixed_list = FixedBlobSidecarList::default();
let mut fixed_list = FixedBlobSidecarList::new(vec![None; max_len]);
for blob in blobs.into_iter() {
let index = blob.index as usize;
*fixed_list

View File

@@ -28,6 +28,7 @@ pub enum LookupVerifyError {
UnrequestedIndex(u64),
InvalidInclusionProof,
DuplicateData,
InternalError(String),
}
/// Collection of active requests of a single ReqResp method, i.e. `blocks_by_root`

View File

@@ -119,6 +119,8 @@ impl TestRig {
.network_globals
.set_sync_state(SyncState::Synced);
let spec = chain.spec.clone();
let rng = XorShiftRng::from_seed([42; 16]);
TestRig {
beacon_processor_rx,
@@ -142,6 +144,7 @@ impl TestRig {
harness,
fork_name,
log,
spec,
}
}
@@ -213,7 +216,7 @@ impl TestRig {
) -> (SignedBeaconBlock<E>, Vec<BlobSidecar<E>>) {
let fork_name = self.fork_name;
let rng = &mut self.rng;
generate_rand_block_and_blobs::<E>(fork_name, num_blobs, rng)
generate_rand_block_and_blobs::<E>(fork_name, num_blobs, rng, &self.spec)
}
fn rand_block_and_data_columns(
@@ -1328,8 +1331,10 @@ impl TestRig {
#[test]
fn stable_rng() {
let spec = types::MainnetEthSpec::default_spec();
let mut rng = XorShiftRng::from_seed([42; 16]);
let (block, _) = generate_rand_block_and_blobs::<E>(ForkName::Base, NumBlobs::None, &mut rng);
let (block, _) =
generate_rand_block_and_blobs::<E>(ForkName::Base, NumBlobs::None, &mut rng, &spec);
assert_eq!(
block.canonical_root(),
Hash256::from_slice(
@@ -2187,8 +2192,8 @@ mod deneb_only {
block_verification_types::{AsBlock, RpcBlock},
data_availability_checker::AvailabilityCheckError,
};
use ssz_types::VariableList;
use std::collections::VecDeque;
use types::RuntimeVariableList;
struct DenebTester {
rig: TestRig,
@@ -2546,12 +2551,15 @@ mod deneb_only {
fn parent_block_unknown_parent(mut self) -> Self {
self.rig.log("parent_block_unknown_parent");
let block = self.unknown_parent_block.take().unwrap();
let max_len = self.rig.spec.max_blobs_per_block(block.epoch()) as usize;
// Now this block is the one we expect requests from
self.block = block.clone();
let block = RpcBlock::new(
Some(block.canonical_root()),
block,
self.unknown_parent_blobs.take().map(VariableList::from),
self.unknown_parent_blobs
.take()
.map(|vec| RuntimeVariableList::from_vec(vec, max_len)),
)
.unwrap();
self.rig.parent_block_processed(

View File

@@ -12,7 +12,7 @@ use slot_clock::ManualSlotClock;
use std::sync::Arc;
use store::MemoryStore;
use tokio::sync::mpsc;
use types::{test_utils::XorShiftRng, ForkName, MinimalEthSpec as E};
use types::{test_utils::XorShiftRng, ChainSpec, ForkName, MinimalEthSpec as E};
mod lookups;
mod range;
@@ -64,4 +64,5 @@ struct TestRig {
rng: XorShiftRng,
fork_name: ForkName,
log: Logger,
spec: Arc<ChainSpec>,
}