Breakup RPCBlock into LookupBlock & RangeSyncBlock (#8860)

Co-Authored-By: Mark Mackey <mark@sigmaprime.io>
This commit is contained in:
ethDreamer
2026-03-13 14:22:29 -05:00
committed by GitHub
parent 02137492f3
commit 6ca610d918
25 changed files with 505 additions and 669 deletions

View File

@@ -7,6 +7,7 @@ use crate::sync::{
manager::{BlockProcessType, BlockProcessingResult, SyncManager},
};
use beacon_chain::blob_verification::KzgVerifiedBlob;
use beacon_chain::block_verification_types::LookupBlock;
use beacon_chain::custody_context::NodeCustodyType;
use beacon_chain::{
AvailabilityProcessingStatus, BlockError, NotifyExecutionLayer,
@@ -464,7 +465,7 @@ impl TestRig {
panic!("Test consumer requested unknown block: {id:?}")
})
.block_data()
.and_then(|d| d.blobs())
.blobs()
.unwrap_or_else(|| panic!("Block {id:?} has no blobs"))
.iter()
.find(|blob| blob.index == id.index)
@@ -528,7 +529,7 @@ impl TestRig {
panic!("Test consumer requested unknown block: {id:?}")
})
.block_data()
.and_then(|d| d.data_columns())
.data_columns()
.unwrap_or_else(|| panic!("Block id {id:?} has no columns"));
id.columns
.iter()
@@ -594,7 +595,7 @@ impl TestRig {
// - Some blocks may not have blobs as the blob count is random
let blobs = (req.start_slot..req.start_slot + req.count)
.filter_map(|slot| self.network_blocks_by_slot.get(&Slot::new(slot)))
.filter_map(|block| block.block_data().and_then(|d| d.blobs()))
.filter_map(|block| block.block_data().blobs())
.flat_map(|blobs| blobs.into_iter())
.collect::<Vec<_>>();
self.send_rpc_blobs_response(req_id, peer_id, &blobs);
@@ -610,7 +611,7 @@ impl TestRig {
// - Some blocks may not have columns as the blob count is random
let columns = (req.start_slot..req.start_slot + req.count)
.filter_map(|slot| self.network_blocks_by_slot.get(&Slot::new(slot)))
.filter_map(|block| block.block_data().and_then(|d| d.data_columns()))
.filter_map(|block| block.block_data().data_columns())
.flat_map(|columns| {
columns
.into_iter()
@@ -786,10 +787,10 @@ impl TestRig {
}
fn corrupt_last_block_signature(&mut self) {
let rpc_block = self.get_last_block().clone();
let mut block = (*rpc_block.block_cloned()).clone();
let blobs = rpc_block.block_data().and_then(|d| d.blobs());
let columns = rpc_block.block_data().and_then(|d| d.data_columns());
let range_sync_block = self.get_last_block().clone();
let mut block = (*range_sync_block.block_cloned()).clone();
let blobs = range_sync_block.block_data().blobs();
let columns = range_sync_block.block_data().data_columns();
*block.signature_mut() = self.valid_signature();
self.re_insert_block(Arc::new(block), blobs, columns);
}
@@ -801,15 +802,15 @@ impl TestRig {
}
fn corrupt_last_blob_proposer_signature(&mut self) {
let rpc_block = self.get_last_block().clone();
let block = rpc_block.block_cloned();
let mut blobs = rpc_block
let range_sync_block = self.get_last_block().clone();
let block = range_sync_block.block_cloned();
let mut blobs = range_sync_block
.block_data()
.and_then(|d| d.blobs())
.blobs()
.expect("no blobs")
.into_iter()
.collect::<Vec<_>>();
let columns = rpc_block.block_data().and_then(|d| d.data_columns());
let columns = range_sync_block.block_data().data_columns();
let first = blobs.first_mut().expect("empty blobs");
Arc::make_mut(first).signed_block_header.signature = self.valid_signature();
let max_blobs =
@@ -822,15 +823,15 @@ impl TestRig {
}
fn corrupt_last_blob_kzg_proof(&mut self) {
let rpc_block = self.get_last_block().clone();
let block = rpc_block.block_cloned();
let mut blobs = rpc_block
let range_sync_block = self.get_last_block().clone();
let block = range_sync_block.block_cloned();
let mut blobs = range_sync_block
.block_data()
.and_then(|d| d.blobs())
.blobs()
.expect("no blobs")
.into_iter()
.collect::<Vec<_>>();
let columns = rpc_block.block_data().and_then(|d| d.data_columns());
let columns = range_sync_block.block_data().data_columns();
let first = blobs.first_mut().expect("empty blobs");
Arc::make_mut(first).kzg_proof = kzg::KzgProof::empty();
let max_blobs =
@@ -843,12 +844,12 @@ impl TestRig {
}
fn corrupt_last_column_proposer_signature(&mut self) {
let rpc_block = self.get_last_block().clone();
let block = rpc_block.block_cloned();
let blobs = rpc_block.block_data().and_then(|d| d.blobs());
let mut columns = rpc_block
let range_sync_block = self.get_last_block().clone();
let block = range_sync_block.block_cloned();
let blobs = range_sync_block.block_data().blobs();
let mut columns = range_sync_block
.block_data()
.and_then(|d| d.data_columns())
.data_columns()
.expect("no columns");
let first = columns.first_mut().expect("empty columns");
Arc::make_mut(first)
@@ -859,12 +860,12 @@ impl TestRig {
}
fn corrupt_last_column_kzg_proof(&mut self) {
let rpc_block = self.get_last_block().clone();
let block = rpc_block.block_cloned();
let blobs = rpc_block.block_data().and_then(|d| d.blobs());
let mut columns = rpc_block
let range_sync_block = self.get_last_block().clone();
let block = range_sync_block.block_cloned();
let blobs = range_sync_block.block_data().blobs();
let mut columns = range_sync_block
.block_data()
.and_then(|d| d.data_columns())
.data_columns()
.expect("no columns");
let first = columns.first_mut().expect("empty columns");
let column = Arc::make_mut(first);
@@ -873,7 +874,7 @@ impl TestRig {
self.re_insert_block(block, blobs, Some(columns));
}
fn get_last_block(&self) -> &RpcBlock<E> {
fn get_last_block(&self) -> &RangeSyncBlock<E> {
let (_, last_block) = self
.network_blocks_by_root
.iter()
@@ -893,13 +894,13 @@ impl TestRig {
let block_root = block.canonical_root();
let block_slot = block.slot();
let block_data = if let Some(columns) = columns {
Some(AvailableBlockData::new_with_data_columns(columns))
AvailableBlockData::new_with_data_columns(columns)
} else if let Some(blobs) = blobs {
Some(AvailableBlockData::new_with_blobs(blobs))
AvailableBlockData::new_with_blobs(blobs)
} else {
Some(AvailableBlockData::NoData)
AvailableBlockData::NoData
};
let rpc_block = RpcBlock::new(
let range_sync_block = RangeSyncBlock::new(
block,
block_data,
&self.harness.chain.data_availability_checker,
@@ -907,8 +908,9 @@ impl TestRig {
)
.unwrap();
self.network_blocks_by_slot
.insert(block_slot, rpc_block.clone());
self.network_blocks_by_root.insert(block_root, rpc_block);
.insert(block_slot, range_sync_block.clone());
self.network_blocks_by_root
.insert(block_root, range_sync_block);
}
/// Trigger a lookup with the last created block
@@ -947,7 +949,7 @@ impl TestRig {
/// Import a block directly into the chain without going through lookup sync
async fn import_block_by_root(&mut self, block_root: Hash256) {
let rpc_block = self
let range_sync_block = self
.network_blocks_by_root
.get(&block_root)
.unwrap_or_else(|| panic!("No block for root {block_root}"))
@@ -957,9 +959,9 @@ impl TestRig {
.chain
.process_block(
block_root,
rpc_block,
range_sync_block,
NotifyExecutionLayer::Yes,
BlockImportSource::Gossip,
BlockImportSource::RangeSync,
|| Ok(()),
)
.await
@@ -979,7 +981,7 @@ impl TestRig {
let blobs = self
.get_last_block()
.block_data()
.and_then(|d| d.blobs())
.blobs()
.expect("no blobs");
let blob = blobs.first().expect("empty blobs");
self.trigger_unknown_parent_blob(peer_id, blob.clone());
@@ -990,7 +992,7 @@ impl TestRig {
let columns = self
.get_last_block()
.block_data()
.and_then(|d| d.data_columns())
.data_columns()
.expect("No data columns");
let column = columns.first().expect("empty columns");
self.trigger_unknown_parent_column(peer_id, column.clone());
@@ -1475,15 +1477,14 @@ impl TestRig {
) -> AvailabilityProcessingStatus {
// Simulate importing block from another source. Don't use GossipVerified as it checks with
// the clock, which does not match the timestamp in the payload.
let block_root = block.canonical_root();
let rpc_block = RpcBlock::BlockOnly { block_root, block };
let lookup_block = LookupBlock::new(block);
self.harness
.chain
.process_block(
block_root,
rpc_block,
lookup_block.block_root(),
lookup_block,
NotifyExecutionLayer::Yes,
BlockImportSource::Gossip,
BlockImportSource::Lookup,
|| Ok(()),
)
.await
@@ -2196,10 +2197,7 @@ async fn blobs_in_da_checker_skip_download() {
};
r.build_chain(1).await;
let block = r.get_last_block().clone();
let blobs = block
.block_data()
.and_then(|d| d.blobs())
.expect("block with no blobs");
let blobs = block.block_data().blobs().expect("block with no blobs");
for blob in &blobs {
r.insert_blob_to_da_checker(blob.clone());
}