Add MAX_BLOBS_PER_BLOCK_FULU config (#7161)

Add `MAX_BLOBS_PER_BLOCK_FULU` config.
This commit is contained in:
Jimmy Chen
2025-04-15 08:20:46 +08:00
committed by GitHub
parent 08882c64ca
commit 476f3a593c
7 changed files with 145 additions and 33 deletions

View File

@@ -9,11 +9,14 @@ use crate::{
sync::{manager::BlockProcessType, SyncMessage}, sync::{manager::BlockProcessType, SyncMessage},
}; };
use beacon_chain::block_verification_types::RpcBlock; use beacon_chain::block_verification_types::RpcBlock;
use beacon_chain::kzg_utils::blobs_to_data_column_sidecars;
use beacon_chain::test_utils::{ use beacon_chain::test_utils::{
test_spec, AttestationStrategy, BeaconChainHarness, BlockStrategy, EphemeralHarnessType, get_kzg, test_spec, AttestationStrategy, BeaconChainHarness, BlockStrategy,
EphemeralHarnessType,
}; };
use beacon_chain::{BeaconChain, WhenSlotSkipped}; use beacon_chain::{BeaconChain, WhenSlotSkipped};
use beacon_processor::{work_reprocessing_queue::*, *}; use beacon_processor::{work_reprocessing_queue::*, *};
use itertools::Itertools;
use lighthouse_network::rpc::methods::{BlobsByRangeRequest, MetaDataV3}; use lighthouse_network::rpc::methods::{BlobsByRangeRequest, MetaDataV3};
use lighthouse_network::rpc::InboundRequestId; use lighthouse_network::rpc::InboundRequestId;
use lighthouse_network::{ use lighthouse_network::{
@@ -29,9 +32,9 @@ use std::time::Duration;
use tokio::sync::mpsc; use tokio::sync::mpsc;
use types::blob_sidecar::FixedBlobSidecarList; use types::blob_sidecar::FixedBlobSidecarList;
use types::{ use types::{
Attestation, AttesterSlashing, BlobSidecar, BlobSidecarList, Epoch, Hash256, MainnetEthSpec, Attestation, AttesterSlashing, BlobSidecar, BlobSidecarList, DataColumnSidecarList,
ProposerSlashing, SignedAggregateAndProof, SignedBeaconBlock, SignedVoluntaryExit, Slot, DataColumnSubnetId, Epoch, Hash256, MainnetEthSpec, ProposerSlashing, SignedAggregateAndProof,
SubnetId, SignedBeaconBlock, SignedVoluntaryExit, Slot, SubnetId,
}; };
type E = MainnetEthSpec; type E = MainnetEthSpec;
@@ -52,6 +55,7 @@ struct TestRig {
chain: Arc<BeaconChain<T>>, chain: Arc<BeaconChain<T>>,
next_block: Arc<SignedBeaconBlock<E>>, next_block: Arc<SignedBeaconBlock<E>>,
next_blobs: Option<BlobSidecarList<E>>, next_blobs: Option<BlobSidecarList<E>>,
next_data_columns: Option<DataColumnSidecarList<E>>,
attestations: Vec<(Attestation<E>, SubnetId)>, attestations: Vec<(Attestation<E>, SubnetId)>,
next_block_attestations: Vec<(Attestation<E>, SubnetId)>, next_block_attestations: Vec<(Attestation<E>, SubnetId)>,
next_block_aggregate_attestations: Vec<SignedAggregateAndProof<E>>, next_block_aggregate_attestations: Vec<SignedAggregateAndProof<E>>,
@@ -241,7 +245,7 @@ impl TestRig {
let network_beacon_processor = Arc::new(network_beacon_processor); let network_beacon_processor = Arc::new(network_beacon_processor);
let beacon_processor = BeaconProcessor { let beacon_processor = BeaconProcessor {
network_globals, network_globals: network_globals.clone(),
executor, executor,
current_workers: 0, current_workers: 0,
config: beacon_processor_config, config: beacon_processor_config,
@@ -262,15 +266,36 @@ impl TestRig {
assert!(beacon_processor.is_ok()); assert!(beacon_processor.is_ok());
let block = next_block_tuple.0; let block = next_block_tuple.0;
let blob_sidecars = if let Some((kzg_proofs, blobs)) = next_block_tuple.1 { let (blob_sidecars, data_columns) = if let Some((kzg_proofs, blobs)) = next_block_tuple.1 {
Some(BlobSidecar::build_sidecars(blobs, &block, kzg_proofs, &chain.spec).unwrap()) if chain.spec.is_peer_das_enabled_for_epoch(block.epoch()) {
let kzg = get_kzg(&chain.spec);
let custody_columns: DataColumnSidecarList<E> = blobs_to_data_column_sidecars(
&blobs.iter().collect_vec(),
kzg_proofs.clone().into_iter().collect_vec(),
&block,
&kzg,
&chain.spec,
)
.unwrap()
.into_iter()
.filter(|c| network_globals.sampling_columns.contains(&c.index))
.collect::<Vec<_>>();
(None, Some(custody_columns))
} else {
let blob_sidecars =
BlobSidecar::build_sidecars(blobs, &block, kzg_proofs, &chain.spec).unwrap();
(Some(blob_sidecars), None)
}
} else { } else {
None (None, None)
}; };
Self { Self {
chain, chain,
next_block: block, next_block: block,
next_blobs: blob_sidecars, next_blobs: blob_sidecars,
next_data_columns: data_columns,
attestations, attestations,
next_block_attestations, next_block_attestations,
next_block_aggregate_attestations, next_block_aggregate_attestations,
@@ -323,6 +348,22 @@ impl TestRig {
} }
} }
pub fn enqueue_gossip_data_columns(&self, col_index: usize) {
if let Some(data_columns) = self.next_data_columns.as_ref() {
let data_column = data_columns.get(col_index).unwrap();
self.network_beacon_processor
.send_gossip_data_column_sidecar(
junk_message_id(),
junk_peer_id(),
Client::default(),
DataColumnSubnetId::from_column_index(data_column.index, &self.chain.spec),
data_column.clone(),
Duration::from_secs(0),
)
.unwrap();
}
}
pub fn custody_columns_count(&self) -> usize { pub fn custody_columns_count(&self) -> usize {
self.network_beacon_processor self.network_beacon_processor
.network_globals .network_globals
@@ -375,6 +416,19 @@ impl TestRig {
} }
} }
pub fn enqueue_single_lookup_rpc_data_columns(&self) {
if let Some(data_columns) = self.next_data_columns.clone() {
self.network_beacon_processor
.send_rpc_custody_columns(
self.next_block.canonical_root(),
data_columns,
Duration::default(),
BlockProcessType::SingleCustodyColumn(1),
)
.unwrap();
}
}
pub fn enqueue_blobs_by_range_request(&self, count: u64) { pub fn enqueue_blobs_by_range_request(&self, count: u64) {
self.network_beacon_processor self.network_beacon_processor
.send_blobs_by_range_request( .send_blobs_by_range_request(
@@ -632,6 +686,13 @@ async fn import_gossip_block_acceptably_early() {
.await; .await;
} }
let num_data_columns = rig.next_data_columns.as_ref().map(|c| c.len()).unwrap_or(0);
for i in 0..num_data_columns {
rig.enqueue_gossip_data_columns(i);
rig.assert_event_journal_completes(&[WorkType::GossipDataColumnSidecar])
.await;
}
// Note: this section of the code is a bit race-y. We're assuming that we can set the slot clock // Note: this section of the code is a bit race-y. We're assuming that we can set the slot clock
// and check the head in the time between the block arrived early and when its due for // and check the head in the time between the block arrived early and when its due for
// processing. // processing.
@@ -708,19 +769,20 @@ async fn import_gossip_block_at_current_slot() {
rig.assert_event_journal_completes(&[WorkType::GossipBlock]) rig.assert_event_journal_completes(&[WorkType::GossipBlock])
.await; .await;
let num_blobs = rig let num_blobs = rig.next_blobs.as_ref().map(|b| b.len()).unwrap_or(0);
.next_blobs
.as_ref()
.map(|blobs| blobs.len())
.unwrap_or(0);
for i in 0..num_blobs { for i in 0..num_blobs {
rig.enqueue_gossip_blob(i); rig.enqueue_gossip_blob(i);
rig.assert_event_journal_completes(&[WorkType::GossipBlobSidecar]) rig.assert_event_journal_completes(&[WorkType::GossipBlobSidecar])
.await; .await;
} }
let num_data_columns = rig.next_data_columns.as_ref().map(|c| c.len()).unwrap_or(0);
for i in 0..num_data_columns {
rig.enqueue_gossip_data_columns(i);
rig.assert_event_journal_completes(&[WorkType::GossipDataColumnSidecar])
.await;
}
assert_eq!( assert_eq!(
rig.head_root(), rig.head_root(),
rig.next_block.canonical_root(), rig.next_block.canonical_root(),
@@ -773,11 +835,8 @@ async fn attestation_to_unknown_block_processed(import_method: BlockImportMethod
); );
// Send the block and ensure that the attestation is received back and imported. // Send the block and ensure that the attestation is received back and imported.
let num_blobs = rig let num_blobs = rig.next_blobs.as_ref().map(|b| b.len()).unwrap_or(0);
.next_blobs let num_data_columns = rig.next_data_columns.as_ref().map(|c| c.len()).unwrap_or(0);
.as_ref()
.map(|blobs| blobs.len())
.unwrap_or(0);
let mut events = vec![]; let mut events = vec![];
match import_method { match import_method {
BlockImportMethod::Gossip => { BlockImportMethod::Gossip => {
@@ -787,6 +846,10 @@ async fn attestation_to_unknown_block_processed(import_method: BlockImportMethod
rig.enqueue_gossip_blob(i); rig.enqueue_gossip_blob(i);
events.push(WorkType::GossipBlobSidecar); events.push(WorkType::GossipBlobSidecar);
} }
for i in 0..num_data_columns {
rig.enqueue_gossip_data_columns(i);
events.push(WorkType::GossipDataColumnSidecar);
}
} }
BlockImportMethod::Rpc => { BlockImportMethod::Rpc => {
rig.enqueue_rpc_block(); rig.enqueue_rpc_block();
@@ -795,6 +858,10 @@ async fn attestation_to_unknown_block_processed(import_method: BlockImportMethod
rig.enqueue_single_lookup_rpc_blobs(); rig.enqueue_single_lookup_rpc_blobs();
events.push(WorkType::RpcBlobs); events.push(WorkType::RpcBlobs);
} }
if num_data_columns > 0 {
rig.enqueue_single_lookup_rpc_data_columns();
events.push(WorkType::RpcCustodyColumn);
}
} }
}; };
@@ -854,11 +921,8 @@ async fn aggregate_attestation_to_unknown_block(import_method: BlockImportMethod
); );
// Send the block and ensure that the attestation is received back and imported. // Send the block and ensure that the attestation is received back and imported.
let num_blobs = rig let num_blobs = rig.next_blobs.as_ref().map(|b| b.len()).unwrap_or(0);
.next_blobs let num_data_columns = rig.next_data_columns.as_ref().map(|c| c.len()).unwrap_or(0);
.as_ref()
.map(|blobs| blobs.len())
.unwrap_or(0);
let mut events = vec![]; let mut events = vec![];
match import_method { match import_method {
BlockImportMethod::Gossip => { BlockImportMethod::Gossip => {
@@ -868,6 +932,10 @@ async fn aggregate_attestation_to_unknown_block(import_method: BlockImportMethod
rig.enqueue_gossip_blob(i); rig.enqueue_gossip_blob(i);
events.push(WorkType::GossipBlobSidecar); events.push(WorkType::GossipBlobSidecar);
} }
for i in 0..num_data_columns {
rig.enqueue_gossip_data_columns(i);
events.push(WorkType::GossipDataColumnSidecar)
}
} }
BlockImportMethod::Rpc => { BlockImportMethod::Rpc => {
rig.enqueue_rpc_block(); rig.enqueue_rpc_block();
@@ -876,6 +944,10 @@ async fn aggregate_attestation_to_unknown_block(import_method: BlockImportMethod
rig.enqueue_single_lookup_rpc_blobs(); rig.enqueue_single_lookup_rpc_blobs();
events.push(WorkType::RpcBlobs); events.push(WorkType::RpcBlobs);
} }
if num_data_columns > 0 {
rig.enqueue_single_lookup_rpc_data_columns();
events.push(WorkType::RpcCustodyColumn);
}
} }
}; };
@@ -1060,12 +1132,20 @@ async fn test_rpc_block_reprocessing() {
rig.assert_event_journal_completes(&[WorkType::RpcBlock]) rig.assert_event_journal_completes(&[WorkType::RpcBlock])
.await; .await;
rig.enqueue_single_lookup_rpc_blobs(); let num_blobs = rig.next_blobs.as_ref().map(|b| b.len()).unwrap_or(0);
if rig.next_blobs.as_ref().map(|b| b.len()).unwrap_or(0) > 0 { if num_blobs > 0 {
rig.enqueue_single_lookup_rpc_blobs();
rig.assert_event_journal_completes(&[WorkType::RpcBlobs]) rig.assert_event_journal_completes(&[WorkType::RpcBlobs])
.await; .await;
} }
let num_data_columns = rig.next_data_columns.as_ref().map(|c| c.len()).unwrap_or(0);
if num_data_columns > 0 {
rig.enqueue_single_lookup_rpc_data_columns();
rig.assert_event_journal_completes(&[WorkType::RpcCustodyColumn])
.await;
}
// next_block shouldn't be processed since it couldn't get the // next_block shouldn't be processed since it couldn't get the
// duplicate cache handle // duplicate cache handle
assert_ne!(next_block_root, rig.head_root()); assert_ne!(next_block_root, rig.head_root());

View File

@@ -148,9 +148,10 @@ MAX_BLOBS_PER_BLOCK_ELECTRA: 2
# MAX_REQUEST_BLOCKS_DENEB * MAX_BLOBS_PER_BLOCK_ELECTRA # MAX_REQUEST_BLOCKS_DENEB * MAX_BLOBS_PER_BLOCK_ELECTRA
MAX_REQUEST_BLOB_SIDECARS_ELECTRA: 256 MAX_REQUEST_BLOB_SIDECARS_ELECTRA: 256
# DAS # Fulu
NUMBER_OF_COLUMNS: 128 NUMBER_OF_COLUMNS: 128
NUMBER_OF_CUSTODY_GROUPS: 128 NUMBER_OF_CUSTODY_GROUPS: 128
DATA_COLUMN_SIDECAR_SUBNET_COUNT: 128 DATA_COLUMN_SIDECAR_SUBNET_COUNT: 128
SAMPLES_PER_SLOT: 8 SAMPLES_PER_SLOT: 8
CUSTODY_REQUIREMENT: 4 CUSTODY_REQUIREMENT: 4
MAX_BLOBS_PER_BLOCK_FULU: 12

View File

@@ -132,9 +132,10 @@ MAX_BLOBS_PER_BLOCK_ELECTRA: 2
# MAX_REQUEST_BLOCKS_DENEB * MAX_BLOBS_PER_BLOCK_ELECTRA # MAX_REQUEST_BLOCKS_DENEB * MAX_BLOBS_PER_BLOCK_ELECTRA
MAX_REQUEST_BLOB_SIDECARS_ELECTRA: 256 MAX_REQUEST_BLOB_SIDECARS_ELECTRA: 256
# DAS # Fulu
NUMBER_OF_COLUMNS: 128 NUMBER_OF_COLUMNS: 128
NUMBER_OF_CUSTODY_GROUPS: 128 NUMBER_OF_CUSTODY_GROUPS: 128
DATA_COLUMN_SIDECAR_SUBNET_COUNT: 128 DATA_COLUMN_SIDECAR_SUBNET_COUNT: 128
SAMPLES_PER_SLOT: 8 SAMPLES_PER_SLOT: 8
CUSTODY_REQUIREMENT: 4 CUSTODY_REQUIREMENT: 4
MAX_BLOBS_PER_BLOCK_FULU: 12

View File

@@ -137,9 +137,10 @@ MAX_BLOBS_PER_BLOCK_ELECTRA: 9
# MAX_REQUEST_BLOCKS_DENEB * MAX_BLOBS_PER_BLOCK_ELECTRA # MAX_REQUEST_BLOCKS_DENEB * MAX_BLOBS_PER_BLOCK_ELECTRA
MAX_REQUEST_BLOB_SIDECARS_ELECTRA: 1152 MAX_REQUEST_BLOB_SIDECARS_ELECTRA: 1152
# DAS # Fulu
NUMBER_OF_COLUMNS: 128 NUMBER_OF_COLUMNS: 128
NUMBER_OF_CUSTODY_GROUPS: 128 NUMBER_OF_CUSTODY_GROUPS: 128
DATA_COLUMN_SIDECAR_SUBNET_COUNT: 128 DATA_COLUMN_SIDECAR_SUBNET_COUNT: 128
SAMPLES_PER_SLOT: 8 SAMPLES_PER_SLOT: 8
CUSTODY_REQUIREMENT: 4 CUSTODY_REQUIREMENT: 4
MAX_BLOBS_PER_BLOCK_FULU: 12

View File

@@ -152,9 +152,10 @@ MAX_BLOBS_PER_BLOCK_ELECTRA: 9
# MAX_REQUEST_BLOCKS_DENEB * MAX_BLOBS_PER_BLOCK_ELECTRA # MAX_REQUEST_BLOCKS_DENEB * MAX_BLOBS_PER_BLOCK_ELECTRA
MAX_REQUEST_BLOB_SIDECARS_ELECTRA: 1152 MAX_REQUEST_BLOB_SIDECARS_ELECTRA: 1152
# DAS # Fulu
NUMBER_OF_COLUMNS: 128 NUMBER_OF_COLUMNS: 128
NUMBER_OF_CUSTODY_GROUPS: 128 NUMBER_OF_CUSTODY_GROUPS: 128
DATA_COLUMN_SIDECAR_SUBNET_COUNT: 128 DATA_COLUMN_SIDECAR_SUBNET_COUNT: 128
SAMPLES_PER_SLOT: 8 SAMPLES_PER_SLOT: 8
CUSTODY_REQUIREMENT: 4 CUSTODY_REQUIREMENT: 4
MAX_BLOBS_PER_BLOCK_FULU: 12

View File

@@ -138,9 +138,10 @@ MAX_BLOBS_PER_BLOCK_ELECTRA: 9
# MAX_REQUEST_BLOCKS_DENEB * MAX_BLOBS_PER_BLOCK_ELECTRA # MAX_REQUEST_BLOCKS_DENEB * MAX_BLOBS_PER_BLOCK_ELECTRA
MAX_REQUEST_BLOB_SIDECARS_ELECTRA: 1152 MAX_REQUEST_BLOB_SIDECARS_ELECTRA: 1152
# DAS # Fulu
NUMBER_OF_COLUMNS: 128 NUMBER_OF_COLUMNS: 128
NUMBER_OF_CUSTODY_GROUPS: 128 NUMBER_OF_CUSTODY_GROUPS: 128
DATA_COLUMN_SIDECAR_SUBNET_COUNT: 128 DATA_COLUMN_SIDECAR_SUBNET_COUNT: 128
SAMPLES_PER_SLOT: 8 SAMPLES_PER_SLOT: 8
CUSTODY_REQUIREMENT: 4 CUSTODY_REQUIREMENT: 4
MAX_BLOBS_PER_BLOCK_FULU: 12

View File

@@ -240,6 +240,11 @@ pub struct ChainSpec {
blob_sidecar_subnet_count_electra: u64, blob_sidecar_subnet_count_electra: u64,
max_request_blob_sidecars_electra: u64, max_request_blob_sidecars_electra: u64,
/*
* Networking Fulu
*/
max_blobs_per_block_fulu: u64,
/* /*
* Networking Derived * Networking Derived
* *
@@ -655,7 +660,9 @@ impl ChainSpec {
/// Return the value of `MAX_BLOBS_PER_BLOCK` appropriate for `fork`. /// Return the value of `MAX_BLOBS_PER_BLOCK` appropriate for `fork`.
pub fn max_blobs_per_block_by_fork(&self, fork_name: ForkName) -> u64 { pub fn max_blobs_per_block_by_fork(&self, fork_name: ForkName) -> u64 {
if fork_name.electra_enabled() { if fork_name.fulu_enabled() {
self.max_blobs_per_block_fulu
} else if fork_name.electra_enabled() {
self.max_blobs_per_block_electra self.max_blobs_per_block_electra
} else { } else {
self.max_blobs_per_block self.max_blobs_per_block
@@ -992,6 +999,11 @@ impl ChainSpec {
blob_sidecar_subnet_count_electra: default_blob_sidecar_subnet_count_electra(), blob_sidecar_subnet_count_electra: default_blob_sidecar_subnet_count_electra(),
max_request_blob_sidecars_electra: default_max_request_blob_sidecars_electra(), max_request_blob_sidecars_electra: default_max_request_blob_sidecars_electra(),
/*
* Networking Fulu specific
*/
max_blobs_per_block_fulu: default_max_blobs_per_block_fulu(),
/* /*
* Application specific * Application specific
*/ */
@@ -1321,6 +1333,11 @@ impl ChainSpec {
blob_sidecar_subnet_count_electra: 2, blob_sidecar_subnet_count_electra: 2,
max_request_blob_sidecars_electra: 256, max_request_blob_sidecars_electra: 256,
/*
* Networking Fulu specific
*/
max_blobs_per_block_fulu: default_max_blobs_per_block_fulu(),
/* /*
* Application specific * Application specific
*/ */
@@ -1540,6 +1557,9 @@ pub struct Config {
#[serde(default = "default_custody_requirement")] #[serde(default = "default_custody_requirement")]
#[serde(with = "serde_utils::quoted_u64")] #[serde(with = "serde_utils::quoted_u64")]
custody_requirement: u64, custody_requirement: u64,
#[serde(default = "default_max_blobs_per_block_fulu")]
#[serde(with = "serde_utils::quoted_u64")]
max_blobs_per_block_fulu: u64,
} }
fn default_bellatrix_fork_version() -> [u8; 4] { fn default_bellatrix_fork_version() -> [u8; 4] {
@@ -1677,6 +1697,10 @@ const fn default_max_blobs_per_block_electra() -> u64 {
9 9
} }
const fn default_max_blobs_per_block_fulu() -> u64 {
12
}
const fn default_attestation_propagation_slot_range() -> u64 { const fn default_attestation_propagation_slot_range() -> u64 {
32 32
} }
@@ -1904,6 +1928,7 @@ impl Config {
data_column_sidecar_subnet_count: spec.data_column_sidecar_subnet_count, data_column_sidecar_subnet_count: spec.data_column_sidecar_subnet_count,
samples_per_slot: spec.samples_per_slot, samples_per_slot: spec.samples_per_slot,
custody_requirement: spec.custody_requirement, custody_requirement: spec.custody_requirement,
max_blobs_per_block_fulu: spec.max_blobs_per_block_fulu,
} }
} }
@@ -1982,6 +2007,7 @@ impl Config {
data_column_sidecar_subnet_count, data_column_sidecar_subnet_count,
samples_per_slot, samples_per_slot,
custody_requirement, custody_requirement,
max_blobs_per_block_fulu,
} = self; } = self;
if preset_base != E::spec_name().to_string().as_str() { if preset_base != E::spec_name().to_string().as_str() {
@@ -2064,6 +2090,7 @@ impl Config {
data_column_sidecar_subnet_count, data_column_sidecar_subnet_count,
samples_per_slot, samples_per_slot,
custody_requirement, custody_requirement,
max_blobs_per_block_fulu,
..chain_spec.clone() ..chain_spec.clone()
}) })