Add MAX_BLOBS_PER_BLOCK_FULU config (#7161)

Add `MAX_BLOBS_PER_BLOCK_FULU` config.
This commit is contained in:
Jimmy Chen
2025-04-15 08:20:46 +08:00
committed by GitHub
parent 08882c64ca
commit 476f3a593c
7 changed files with 145 additions and 33 deletions

View File

@@ -9,11 +9,14 @@ use crate::{
sync::{manager::BlockProcessType, SyncMessage},
};
use beacon_chain::block_verification_types::RpcBlock;
use beacon_chain::kzg_utils::blobs_to_data_column_sidecars;
use beacon_chain::test_utils::{
test_spec, AttestationStrategy, BeaconChainHarness, BlockStrategy, EphemeralHarnessType,
get_kzg, test_spec, AttestationStrategy, BeaconChainHarness, BlockStrategy,
EphemeralHarnessType,
};
use beacon_chain::{BeaconChain, WhenSlotSkipped};
use beacon_processor::{work_reprocessing_queue::*, *};
use itertools::Itertools;
use lighthouse_network::rpc::methods::{BlobsByRangeRequest, MetaDataV3};
use lighthouse_network::rpc::InboundRequestId;
use lighthouse_network::{
@@ -29,9 +32,9 @@ use std::time::Duration;
use tokio::sync::mpsc;
use types::blob_sidecar::FixedBlobSidecarList;
use types::{
Attestation, AttesterSlashing, BlobSidecar, BlobSidecarList, Epoch, Hash256, MainnetEthSpec,
ProposerSlashing, SignedAggregateAndProof, SignedBeaconBlock, SignedVoluntaryExit, Slot,
SubnetId,
Attestation, AttesterSlashing, BlobSidecar, BlobSidecarList, DataColumnSidecarList,
DataColumnSubnetId, Epoch, Hash256, MainnetEthSpec, ProposerSlashing, SignedAggregateAndProof,
SignedBeaconBlock, SignedVoluntaryExit, Slot, SubnetId,
};
type E = MainnetEthSpec;
@@ -52,6 +55,7 @@ struct TestRig {
chain: Arc<BeaconChain<T>>,
next_block: Arc<SignedBeaconBlock<E>>,
next_blobs: Option<BlobSidecarList<E>>,
next_data_columns: Option<DataColumnSidecarList<E>>,
attestations: Vec<(Attestation<E>, SubnetId)>,
next_block_attestations: Vec<(Attestation<E>, SubnetId)>,
next_block_aggregate_attestations: Vec<SignedAggregateAndProof<E>>,
@@ -241,7 +245,7 @@ impl TestRig {
let network_beacon_processor = Arc::new(network_beacon_processor);
let beacon_processor = BeaconProcessor {
network_globals,
network_globals: network_globals.clone(),
executor,
current_workers: 0,
config: beacon_processor_config,
@@ -262,15 +266,36 @@ impl TestRig {
assert!(beacon_processor.is_ok());
let block = next_block_tuple.0;
let blob_sidecars = if let Some((kzg_proofs, blobs)) = next_block_tuple.1 {
Some(BlobSidecar::build_sidecars(blobs, &block, kzg_proofs, &chain.spec).unwrap())
let (blob_sidecars, data_columns) = if let Some((kzg_proofs, blobs)) = next_block_tuple.1 {
if chain.spec.is_peer_das_enabled_for_epoch(block.epoch()) {
let kzg = get_kzg(&chain.spec);
let custody_columns: DataColumnSidecarList<E> = blobs_to_data_column_sidecars(
&blobs.iter().collect_vec(),
kzg_proofs.clone().into_iter().collect_vec(),
&block,
&kzg,
&chain.spec,
)
.unwrap()
.into_iter()
.filter(|c| network_globals.sampling_columns.contains(&c.index))
.collect::<Vec<_>>();
(None, Some(custody_columns))
} else {
let blob_sidecars =
BlobSidecar::build_sidecars(blobs, &block, kzg_proofs, &chain.spec).unwrap();
(Some(blob_sidecars), None)
}
} else {
None
(None, None)
};
Self {
chain,
next_block: block,
next_blobs: blob_sidecars,
next_data_columns: data_columns,
attestations,
next_block_attestations,
next_block_aggregate_attestations,
@@ -323,6 +348,22 @@ impl TestRig {
}
}
pub fn enqueue_gossip_data_columns(&self, col_index: usize) {
if let Some(data_columns) = self.next_data_columns.as_ref() {
let data_column = data_columns.get(col_index).unwrap();
self.network_beacon_processor
.send_gossip_data_column_sidecar(
junk_message_id(),
junk_peer_id(),
Client::default(),
DataColumnSubnetId::from_column_index(data_column.index, &self.chain.spec),
data_column.clone(),
Duration::from_secs(0),
)
.unwrap();
}
}
pub fn custody_columns_count(&self) -> usize {
self.network_beacon_processor
.network_globals
@@ -375,6 +416,19 @@ impl TestRig {
}
}
pub fn enqueue_single_lookup_rpc_data_columns(&self) {
if let Some(data_columns) = self.next_data_columns.clone() {
self.network_beacon_processor
.send_rpc_custody_columns(
self.next_block.canonical_root(),
data_columns,
Duration::default(),
BlockProcessType::SingleCustodyColumn(1),
)
.unwrap();
}
}
pub fn enqueue_blobs_by_range_request(&self, count: u64) {
self.network_beacon_processor
.send_blobs_by_range_request(
@@ -632,6 +686,13 @@ async fn import_gossip_block_acceptably_early() {
.await;
}
let num_data_columns = rig.next_data_columns.as_ref().map(|c| c.len()).unwrap_or(0);
for i in 0..num_data_columns {
rig.enqueue_gossip_data_columns(i);
rig.assert_event_journal_completes(&[WorkType::GossipDataColumnSidecar])
.await;
}
// Note: this section of the code is a bit race-y. We're assuming that we can set the slot clock
// and check the head in the time between the block arrived early and when its due for
// processing.
@@ -708,19 +769,20 @@ async fn import_gossip_block_at_current_slot() {
rig.assert_event_journal_completes(&[WorkType::GossipBlock])
.await;
let num_blobs = rig
.next_blobs
.as_ref()
.map(|blobs| blobs.len())
.unwrap_or(0);
let num_blobs = rig.next_blobs.as_ref().map(|b| b.len()).unwrap_or(0);
for i in 0..num_blobs {
rig.enqueue_gossip_blob(i);
rig.assert_event_journal_completes(&[WorkType::GossipBlobSidecar])
.await;
}
let num_data_columns = rig.next_data_columns.as_ref().map(|c| c.len()).unwrap_or(0);
for i in 0..num_data_columns {
rig.enqueue_gossip_data_columns(i);
rig.assert_event_journal_completes(&[WorkType::GossipDataColumnSidecar])
.await;
}
assert_eq!(
rig.head_root(),
rig.next_block.canonical_root(),
@@ -773,11 +835,8 @@ async fn attestation_to_unknown_block_processed(import_method: BlockImportMethod
);
// Send the block and ensure that the attestation is received back and imported.
let num_blobs = rig
.next_blobs
.as_ref()
.map(|blobs| blobs.len())
.unwrap_or(0);
let num_blobs = rig.next_blobs.as_ref().map(|b| b.len()).unwrap_or(0);
let num_data_columns = rig.next_data_columns.as_ref().map(|c| c.len()).unwrap_or(0);
let mut events = vec![];
match import_method {
BlockImportMethod::Gossip => {
@@ -787,6 +846,10 @@ async fn attestation_to_unknown_block_processed(import_method: BlockImportMethod
rig.enqueue_gossip_blob(i);
events.push(WorkType::GossipBlobSidecar);
}
for i in 0..num_data_columns {
rig.enqueue_gossip_data_columns(i);
events.push(WorkType::GossipDataColumnSidecar);
}
}
BlockImportMethod::Rpc => {
rig.enqueue_rpc_block();
@@ -795,6 +858,10 @@ async fn attestation_to_unknown_block_processed(import_method: BlockImportMethod
rig.enqueue_single_lookup_rpc_blobs();
events.push(WorkType::RpcBlobs);
}
if num_data_columns > 0 {
rig.enqueue_single_lookup_rpc_data_columns();
events.push(WorkType::RpcCustodyColumn);
}
}
};
@@ -854,11 +921,8 @@ async fn aggregate_attestation_to_unknown_block(import_method: BlockImportMethod
);
// Send the block and ensure that the attestation is received back and imported.
let num_blobs = rig
.next_blobs
.as_ref()
.map(|blobs| blobs.len())
.unwrap_or(0);
let num_blobs = rig.next_blobs.as_ref().map(|b| b.len()).unwrap_or(0);
let num_data_columns = rig.next_data_columns.as_ref().map(|c| c.len()).unwrap_or(0);
let mut events = vec![];
match import_method {
BlockImportMethod::Gossip => {
@@ -868,6 +932,10 @@ async fn aggregate_attestation_to_unknown_block(import_method: BlockImportMethod
rig.enqueue_gossip_blob(i);
events.push(WorkType::GossipBlobSidecar);
}
for i in 0..num_data_columns {
rig.enqueue_gossip_data_columns(i);
events.push(WorkType::GossipDataColumnSidecar)
}
}
BlockImportMethod::Rpc => {
rig.enqueue_rpc_block();
@@ -876,6 +944,10 @@ async fn aggregate_attestation_to_unknown_block(import_method: BlockImportMethod
rig.enqueue_single_lookup_rpc_blobs();
events.push(WorkType::RpcBlobs);
}
if num_data_columns > 0 {
rig.enqueue_single_lookup_rpc_data_columns();
events.push(WorkType::RpcCustodyColumn);
}
}
};
@@ -1060,12 +1132,20 @@ async fn test_rpc_block_reprocessing() {
rig.assert_event_journal_completes(&[WorkType::RpcBlock])
.await;
rig.enqueue_single_lookup_rpc_blobs();
if rig.next_blobs.as_ref().map(|b| b.len()).unwrap_or(0) > 0 {
let num_blobs = rig.next_blobs.as_ref().map(|b| b.len()).unwrap_or(0);
if num_blobs > 0 {
rig.enqueue_single_lookup_rpc_blobs();
rig.assert_event_journal_completes(&[WorkType::RpcBlobs])
.await;
}
let num_data_columns = rig.next_data_columns.as_ref().map(|c| c.len()).unwrap_or(0);
if num_data_columns > 0 {
rig.enqueue_single_lookup_rpc_data_columns();
rig.assert_event_journal_completes(&[WorkType::RpcCustodyColumn])
.await;
}
// next_block shouldn't be processed since it couldn't get the
// duplicate cache handle
assert_ne!(next_block_root, rig.head_root());

View File

@@ -148,9 +148,10 @@ MAX_BLOBS_PER_BLOCK_ELECTRA: 2
# MAX_REQUEST_BLOCKS_DENEB * MAX_BLOBS_PER_BLOCK_ELECTRA
MAX_REQUEST_BLOB_SIDECARS_ELECTRA: 256
# DAS
# Fulu
NUMBER_OF_COLUMNS: 128
NUMBER_OF_CUSTODY_GROUPS: 128
DATA_COLUMN_SIDECAR_SUBNET_COUNT: 128
SAMPLES_PER_SLOT: 8
CUSTODY_REQUIREMENT: 4
MAX_BLOBS_PER_BLOCK_FULU: 12

View File

@@ -132,9 +132,10 @@ MAX_BLOBS_PER_BLOCK_ELECTRA: 2
# MAX_REQUEST_BLOCKS_DENEB * MAX_BLOBS_PER_BLOCK_ELECTRA
MAX_REQUEST_BLOB_SIDECARS_ELECTRA: 256
# DAS
# Fulu
NUMBER_OF_COLUMNS: 128
NUMBER_OF_CUSTODY_GROUPS: 128
DATA_COLUMN_SIDECAR_SUBNET_COUNT: 128
SAMPLES_PER_SLOT: 8
CUSTODY_REQUIREMENT: 4
MAX_BLOBS_PER_BLOCK_FULU: 12

View File

@@ -137,9 +137,10 @@ MAX_BLOBS_PER_BLOCK_ELECTRA: 9
# MAX_REQUEST_BLOCKS_DENEB * MAX_BLOBS_PER_BLOCK_ELECTRA
MAX_REQUEST_BLOB_SIDECARS_ELECTRA: 1152
# DAS
# Fulu
NUMBER_OF_COLUMNS: 128
NUMBER_OF_CUSTODY_GROUPS: 128
DATA_COLUMN_SIDECAR_SUBNET_COUNT: 128
SAMPLES_PER_SLOT: 8
CUSTODY_REQUIREMENT: 4
MAX_BLOBS_PER_BLOCK_FULU: 12

View File

@@ -152,9 +152,10 @@ MAX_BLOBS_PER_BLOCK_ELECTRA: 9
# MAX_REQUEST_BLOCKS_DENEB * MAX_BLOBS_PER_BLOCK_ELECTRA
MAX_REQUEST_BLOB_SIDECARS_ELECTRA: 1152
# DAS
# Fulu
NUMBER_OF_COLUMNS: 128
NUMBER_OF_CUSTODY_GROUPS: 128
DATA_COLUMN_SIDECAR_SUBNET_COUNT: 128
SAMPLES_PER_SLOT: 8
CUSTODY_REQUIREMENT: 4
MAX_BLOBS_PER_BLOCK_FULU: 12

View File

@@ -138,9 +138,10 @@ MAX_BLOBS_PER_BLOCK_ELECTRA: 9
# MAX_REQUEST_BLOCKS_DENEB * MAX_BLOBS_PER_BLOCK_ELECTRA
MAX_REQUEST_BLOB_SIDECARS_ELECTRA: 1152
# DAS
# Fulu
NUMBER_OF_COLUMNS: 128
NUMBER_OF_CUSTODY_GROUPS: 128
DATA_COLUMN_SIDECAR_SUBNET_COUNT: 128
SAMPLES_PER_SLOT: 8
CUSTODY_REQUIREMENT: 4
MAX_BLOBS_PER_BLOCK_FULU: 12

View File

@@ -240,6 +240,11 @@ pub struct ChainSpec {
blob_sidecar_subnet_count_electra: u64,
max_request_blob_sidecars_electra: u64,
/*
* Networking Fulu
*/
max_blobs_per_block_fulu: u64,
/*
* Networking Derived
*
@@ -655,7 +660,9 @@ impl ChainSpec {
/// Return the value of `MAX_BLOBS_PER_BLOCK` appropriate for `fork`.
pub fn max_blobs_per_block_by_fork(&self, fork_name: ForkName) -> u64 {
if fork_name.electra_enabled() {
if fork_name.fulu_enabled() {
self.max_blobs_per_block_fulu
} else if fork_name.electra_enabled() {
self.max_blobs_per_block_electra
} else {
self.max_blobs_per_block
@@ -992,6 +999,11 @@ impl ChainSpec {
blob_sidecar_subnet_count_electra: default_blob_sidecar_subnet_count_electra(),
max_request_blob_sidecars_electra: default_max_request_blob_sidecars_electra(),
/*
* Networking Fulu specific
*/
max_blobs_per_block_fulu: default_max_blobs_per_block_fulu(),
/*
* Application specific
*/
@@ -1321,6 +1333,11 @@ impl ChainSpec {
blob_sidecar_subnet_count_electra: 2,
max_request_blob_sidecars_electra: 256,
/*
* Networking Fulu specific
*/
max_blobs_per_block_fulu: default_max_blobs_per_block_fulu(),
/*
* Application specific
*/
@@ -1540,6 +1557,9 @@ pub struct Config {
#[serde(default = "default_custody_requirement")]
#[serde(with = "serde_utils::quoted_u64")]
custody_requirement: u64,
#[serde(default = "default_max_blobs_per_block_fulu")]
#[serde(with = "serde_utils::quoted_u64")]
max_blobs_per_block_fulu: u64,
}
fn default_bellatrix_fork_version() -> [u8; 4] {
@@ -1677,6 +1697,10 @@ const fn default_max_blobs_per_block_electra() -> u64 {
9
}
const fn default_max_blobs_per_block_fulu() -> u64 {
12
}
const fn default_attestation_propagation_slot_range() -> u64 {
32
}
@@ -1904,6 +1928,7 @@ impl Config {
data_column_sidecar_subnet_count: spec.data_column_sidecar_subnet_count,
samples_per_slot: spec.samples_per_slot,
custody_requirement: spec.custody_requirement,
max_blobs_per_block_fulu: spec.max_blobs_per_block_fulu,
}
}
@@ -1982,6 +2007,7 @@ impl Config {
data_column_sidecar_subnet_count,
samples_per_slot,
custody_requirement,
max_blobs_per_block_fulu,
} = self;
if preset_base != E::spec_name().to_string().as_str() {
@@ -2064,6 +2090,7 @@ impl Config {
data_column_sidecar_subnet_count,
samples_per_slot,
custody_requirement,
max_blobs_per_block_fulu,
..chain_spec.clone()
})