Avoid attempting to serve blobs after Fulu fork (#7756)

* #7122


  


Co-Authored-By: Tan Chee Keong <tanck@sigmaprime.io>

Co-Authored-By: chonghe <44791194+chong-he@users.noreply.github.com>
This commit is contained in:
chonghe
2025-10-20 14:29:21 +08:00
committed by GitHub
parent da93b89e90
commit 2b30c96f16
2 changed files with 269 additions and 42 deletions

View File

@@ -3,7 +3,7 @@ use crate::network_beacon_processor::{FUTURE_SLOT_TOLERANCE, NetworkBeaconProces
use crate::service::NetworkMessage;
use crate::status::ToStatusMessage;
use crate::sync::SyncMessage;
use beacon_chain::{BeaconChainError, BeaconChainTypes, WhenSlotSkipped};
use beacon_chain::{BeaconChainError, BeaconChainTypes, BlockProcessStatus, WhenSlotSkipped};
use itertools::{Itertools, process_results};
use lighthouse_network::rpc::methods::{
BlobsByRangeRequest, BlobsByRootRequest, DataColumnsByRangeRequest, DataColumnsByRootRequest,
@@ -293,21 +293,49 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
inbound_request_id: InboundRequestId,
request: BlobsByRootRequest,
) -> Result<(), (RpcErrorResponse, &'static str)> {
let Some(requested_root) = request.blob_ids.as_slice().first().map(|id| id.block_root)
else {
// No blob ids requested.
return Ok(());
};
let requested_indices = request
.blob_ids
.as_slice()
.iter()
.map(|id| id.index)
.collect::<Vec<_>>();
let mut send_blob_count = 0;
let fulu_start_slot = self
.chain
.spec
.fulu_fork_epoch
.map(|epoch| epoch.start_slot(T::EthSpec::slots_per_epoch()));
let mut blob_list_results = HashMap::new();
let slots_by_block_root: HashMap<Hash256, Slot> = request
.blob_ids
.iter()
.flat_map(|blob_id| {
let block_root = blob_id.block_root;
self.chain
.data_availability_checker
.get_cached_block(&block_root)
.and_then(|status| match status {
BlockProcessStatus::NotValidated(block, _source) => Some(block),
BlockProcessStatus::ExecutionValidated(block) => Some(block),
BlockProcessStatus::Unknown => None,
})
.or_else(|| self.chain.early_attester_cache.get_block(block_root))
.map(|block| (block_root, block.slot()))
})
.collect();
for id in request.blob_ids.as_slice() {
let BlobIdentifier {
block_root: root,
index,
} = id;
let slot = slots_by_block_root.get(root);
// Skip if slot is >= fulu_start_slot
if let (Some(slot), Some(fulu_slot)) = (slot, fulu_start_slot)
&& *slot >= fulu_slot
{
continue;
}
// First attempt to get the blobs from the RPC cache.
if let Ok(Some(blob)) = self.chain.data_availability_checker.get_blob(id) {
self.send_response(
@@ -317,11 +345,6 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
);
send_blob_count += 1;
} else {
let BlobIdentifier {
block_root: root,
index,
} = id;
let blob_list_result = match blob_list_results.entry(root) {
Entry::Vacant(entry) => {
entry.insert(self.chain.get_blobs_checking_early_attester_cache(root))
@@ -331,16 +354,15 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
match blob_list_result.as_ref() {
Ok(blobs_sidecar_list) => {
'inner: for blob_sidecar in blobs_sidecar_list.iter() {
if blob_sidecar.index == *index {
self.send_response(
peer_id,
inbound_request_id,
Response::BlobsByRoot(Some(blob_sidecar.clone())),
);
send_blob_count += 1;
break 'inner;
}
if let Some(blob_sidecar) =
blobs_sidecar_list.iter().find(|b| b.index == *index)
{
self.send_response(
peer_id,
inbound_request_id,
Response::BlobsByRoot(Some(blob_sidecar.clone())),
);
send_blob_count += 1;
}
}
Err(e) => {
@@ -354,10 +376,10 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
}
}
}
debug!(
%peer_id,
%requested_root,
?requested_indices,
block_root = ?slots_by_block_root.keys(),
returned = send_blob_count,
"BlobsByRoot outgoing response processed"
);
@@ -1003,6 +1025,34 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
);
let request_start_slot = Slot::from(req.start_slot);
let request_start_epoch = request_start_slot.epoch(T::EthSpec::slots_per_epoch());
let fork_name = self.chain.spec.fork_name_at_epoch(request_start_epoch);
// Should not send more than max request blob sidecars
if req.max_blobs_requested(request_start_epoch, &self.chain.spec)
> self.chain.spec.max_request_blob_sidecars(fork_name) as u64
{
return Err((
RpcErrorResponse::InvalidRequest,
"Request exceeded `MAX_REQUEST_BLOBS_SIDECARS`",
));
}
let effective_count = if let Some(fulu_epoch) = self.chain.spec.fulu_fork_epoch {
let fulu_start_slot = fulu_epoch.start_slot(T::EthSpec::slots_per_epoch());
let request_end_slot = request_start_slot.saturating_add(req.count) - 1;
// If the request_start_slot is at or after a Fulu slot, return an empty response
if request_start_slot >= fulu_start_slot {
return Ok(());
// For the case that the request slots spans across the Fulu fork slot
} else if request_end_slot >= fulu_start_slot {
(fulu_start_slot - request_start_slot).as_u64()
} else {
req.count
}
} else {
req.count
};
let data_availability_boundary_slot = match self.chain.data_availability_boundary() {
Some(boundary) => boundary.start_slot(T::EthSpec::slots_per_epoch()),
@@ -1040,7 +1090,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
}
let block_roots =
self.get_block_roots_for_slot_range(req.start_slot, req.count, "BlobsByRange")?;
self.get_block_roots_for_slot_range(req.start_slot, effective_count, "BlobsByRange")?;
let current_slot = self
.chain
@@ -1067,7 +1117,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
// Due to skip slots, blobs could be out of the range, we ensure they
// are in the range before sending
if blob_sidecar.slot() >= request_start_slot
&& blob_sidecar.slot() < request_start_slot + req.count
&& blob_sidecar.slot() < request_start_slot + effective_count
{
blobs_sent += 1;
self.send_network_message(NetworkMessage::SendResponse {
@@ -1148,7 +1198,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
if req.max_requested::<T::EthSpec>() > self.chain.spec.max_request_data_column_sidecars {
return Err((
RpcErrorResponse::InvalidRequest,
"Request exceeded `MAX_REQUEST_BLOBS_SIDECARS`",
"Request exceeded `MAX_REQUEST_DATA_COLUMN_SIDECARS`",
));
}

View File

@@ -22,7 +22,7 @@ use gossipsub::MessageAcceptance;
use itertools::Itertools;
use lighthouse_network::rpc::InboundRequestId;
use lighthouse_network::rpc::methods::{
BlobsByRangeRequest, DataColumnsByRangeRequest, MetaDataV3,
BlobsByRangeRequest, BlobsByRootRequest, DataColumnsByRangeRequest, MetaDataV3,
};
use lighthouse_network::{
Client, MessageId, NetworkConfig, NetworkGlobals, PeerId, Response,
@@ -37,12 +37,12 @@ use std::iter::Iterator;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::mpsc;
use types::blob_sidecar::FixedBlobSidecarList;
use types::blob_sidecar::{BlobIdentifier, FixedBlobSidecarList};
use types::{
AttesterSlashing, BlobSidecar, BlobSidecarList, ChainSpec, DataColumnSidecarList,
DataColumnSubnetId, Epoch, EthSpec, Hash256, MainnetEthSpec, ProposerSlashing,
SignedAggregateAndProof, SignedBeaconBlock, SignedVoluntaryExit, SingleAttestation, Slot,
SubnetId,
RuntimeVariableList, SignedAggregateAndProof, SignedBeaconBlock, SignedVoluntaryExit,
SingleAttestation, Slot, SubnetId,
};
type E = MainnetEthSpec;
@@ -431,15 +431,22 @@ impl TestRig {
}
}
pub fn enqueue_blobs_by_range_request(&self, count: u64) {
pub fn enqueue_blobs_by_range_request(&self, start_slot: u64, count: u64) {
self.network_beacon_processor
.send_blobs_by_range_request(
PeerId::random(),
InboundRequestId::new_unchecked(42, 24),
BlobsByRangeRequest {
start_slot: 0,
count,
},
BlobsByRangeRequest { start_slot, count },
)
.unwrap();
}
/// Push a BlobsByRoot request for `blob_ids` into the beacon processor,
/// using a random peer id and a fixed inbound request id (42, 24).
pub fn enqueue_blobs_by_root_request(&self, blob_ids: RuntimeVariableList<BlobIdentifier>) {
    let request = BlobsByRootRequest { blob_ids };
    let peer = PeerId::random();
    let request_id = InboundRequestId::new_unchecked(42, 24);
    self.network_beacon_processor
        .send_blobs_by_roots_request(peer, request_id, request)
        .unwrap();
}
@@ -1632,8 +1639,9 @@ async fn test_blobs_by_range() {
return;
};
let mut rig = TestRig::new(64).await;
let start_slot = 0;
let slot_count = 32;
rig.enqueue_blobs_by_range_request(slot_count);
rig.enqueue_blobs_by_range_request(start_slot, slot_count);
let mut blob_count = 0;
for slot in 0..slot_count {
@@ -1651,6 +1659,65 @@ async fn test_blobs_by_range() {
.unwrap_or(0);
}
let mut actual_count = 0;
while let Some(next) = rig.network_rx.recv().await {
if let NetworkMessage::SendResponse {
peer_id: _,
response: Response::BlobsByRange(blob),
inbound_request_id: _,
} = next
{
if blob.is_some() {
actual_count += 1;
} else {
break;
}
} else {
panic!("unexpected message {:?}", next);
}
}
if test_spec::<E>().fulu_fork_epoch.is_some() {
assert_eq!(0, actual_count, "Post-Fulu should return 0 blobs");
} else {
assert_eq!(blob_count, actual_count);
}
}
#[tokio::test]
async fn test_blobs_by_range_spans_fulu_fork() {
// Only test for Electra & Fulu fork transition
if test_spec::<E>().electra_fork_epoch.is_none() {
return;
};
let mut spec = test_spec::<E>();
spec.fulu_fork_epoch = Some(Epoch::new(1));
spec.gloas_fork_epoch = Some(Epoch::new(2));
let mut rig = TestRig::new_parametric(64, BeaconProcessorConfig::default(), false, spec).await;
let start_slot = 16;
// This will span from epoch 0 (Electra) to epoch 1 (Fulu)
let slot_count = 32;
rig.enqueue_blobs_by_range_request(start_slot, slot_count);
let mut blob_count = 0;
for slot in start_slot..slot_count {
let root = rig
.chain
.block_root_at_slot(Slot::new(slot), WhenSlotSkipped::None)
.unwrap();
blob_count += root
.map(|root| {
rig.chain
.get_blobs(&root)
.map(|list| list.len())
.unwrap_or(0)
})
.unwrap_or(0);
}
let mut actual_count = 0;
while let Some(next) = rig.network_rx.recv().await {
if let NetworkMessage::SendResponse {
peer_id: _,
@@ -1670,6 +1737,116 @@ async fn test_blobs_by_range() {
assert_eq!(blob_count, actual_count);
}
/// BlobsByRoot should return every blob the chain stores for a known
/// (pre-Fulu) block root.
///
/// Fix: the original computed the expected blob count twice — once via
/// `get_blobs(&block_root).len()` and again by shadowing `blob_count` and
/// re-looking-up the same slot-1 root. The duplicate lookup is removed;
/// the expected count is computed exactly once.
#[tokio::test]
async fn test_blobs_by_root() {
    // Blobs only exist from Deneb onwards.
    if test_spec::<E>().deneb_fork_epoch.is_none() {
        return;
    };
    let mut rig = TestRig::new(64).await;

    // Get the block root of a sample slot, e.g., slot 1.
    let block_root = rig
        .chain
        .block_root_at_slot(Slot::new(1), WhenSlotSkipped::None)
        .unwrap()
        .unwrap();

    // The number of blobs stored for this root is the expected response count.
    let blob_count = rig.chain.get_blobs(&block_root).unwrap().len();

    // Request every blob index for this root.
    let blob_ids: Vec<BlobIdentifier> = (0..blob_count)
        .map(|index| BlobIdentifier {
            block_root,
            index: index as u64,
        })
        .collect();
    let blob_ids_list = RuntimeVariableList::new(blob_ids, blob_count).unwrap();
    rig.enqueue_blobs_by_root_request(blob_ids_list);

    // Count blob responses on the network channel; the stream is terminated
    // by a `None` (end-of-stream) response.
    let mut actual_count = 0;
    while let Some(next) = rig.network_rx.recv().await {
        if let NetworkMessage::SendResponse {
            peer_id: _,
            response: Response::BlobsByRoot(blob),
            inbound_request_id: _,
        } = next
        {
            if blob.is_some() {
                actual_count += 1;
            } else {
                break;
            }
        } else {
            panic!("unexpected message {:?}", next);
        }
    }
    assert_eq!(blob_count, actual_count);
}
/// BlobsByRoot must not serve any blobs once the Fulu fork is active.
#[tokio::test]
async fn test_blobs_by_root_post_fulu_should_return_empty() {
    // Only test for Fulu fork
    if test_spec::<E>().fulu_fork_epoch.is_none() {
        return;
    };
    let mut rig = TestRig::new(64).await;

    // Pick a known block root (slot 1) and request its first blob index.
    let block_root = rig
        .chain
        .block_root_at_slot(Slot::new(1), WhenSlotSkipped::None)
        .unwrap()
        .unwrap();
    let request_ids = vec![BlobIdentifier {
        block_root,
        index: 0,
    }];
    rig.enqueue_blobs_by_root_request(RuntimeVariableList::new(request_ids, 1).unwrap());

    // Drain responses until the end-of-stream marker, tallying served blobs.
    let mut served = 0;
    while let Some(message) = rig.network_rx.recv().await {
        match message {
            NetworkMessage::SendResponse {
                peer_id: _,
                response: Response::BlobsByRoot(Some(_)),
                inbound_request_id: _,
            } => served += 1,
            NetworkMessage::SendResponse {
                peer_id: _,
                response: Response::BlobsByRoot(None),
                inbound_request_id: _,
            } => break,
            other => panic!("unexpected message {:?}", other),
        }
    }
    // Post-Fulu should return 0 blobs
    assert_eq!(0, served);
}
/// Ensure that data column processing that results in block import sends a sync notification
#[tokio::test]
async fn test_data_column_import_notifies_sync() {