From 2b30c96f16fa52b44198768fce6a226861674da7 Mon Sep 17 00:00:00 2001 From: chonghe <44791194+chong-he@users.noreply.github.com> Date: Mon, 20 Oct 2025 14:29:21 +0800 Subject: [PATCH] Avoid attempting to serve blobs after Fulu fork (#7756) * #7122 Co-Authored-By: Tan Chee Keong Co-Authored-By: chonghe <44791194+chong-he@users.noreply.github.com> --- .../network_beacon_processor/rpc_methods.rs | 114 +++++++--- .../src/network_beacon_processor/tests.rs | 197 +++++++++++++++++- 2 files changed, 269 insertions(+), 42 deletions(-) diff --git a/beacon_node/network/src/network_beacon_processor/rpc_methods.rs b/beacon_node/network/src/network_beacon_processor/rpc_methods.rs index 58e02ffe00..0fcd67dbf1 100644 --- a/beacon_node/network/src/network_beacon_processor/rpc_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/rpc_methods.rs @@ -3,7 +3,7 @@ use crate::network_beacon_processor::{FUTURE_SLOT_TOLERANCE, NetworkBeaconProces use crate::service::NetworkMessage; use crate::status::ToStatusMessage; use crate::sync::SyncMessage; -use beacon_chain::{BeaconChainError, BeaconChainTypes, WhenSlotSkipped}; +use beacon_chain::{BeaconChainError, BeaconChainTypes, BlockProcessStatus, WhenSlotSkipped}; use itertools::{Itertools, process_results}; use lighthouse_network::rpc::methods::{ BlobsByRangeRequest, BlobsByRootRequest, DataColumnsByRangeRequest, DataColumnsByRootRequest, @@ -293,21 +293,49 @@ impl NetworkBeaconProcessor { inbound_request_id: InboundRequestId, request: BlobsByRootRequest, ) -> Result<(), (RpcErrorResponse, &'static str)> { - let Some(requested_root) = request.blob_ids.as_slice().first().map(|id| id.block_root) - else { - // No blob ids requested. - return Ok(()); - }; - let requested_indices = request - .blob_ids - .as_slice() - .iter() - .map(|id| id.index) - .collect::>(); let mut send_blob_count = 0; + let fulu_start_slot = self + .chain + .spec + .fulu_fork_epoch + .map(|epoch| epoch.start_slot(T::EthSpec::slots_per_epoch())); + let mut blob_list_results = HashMap::new(); + + let slots_by_block_root: HashMap = request + .blob_ids + .iter() + .flat_map(|blob_id| { + let block_root = blob_id.block_root; + self.chain + .data_availability_checker + .get_cached_block(&block_root) + .and_then(|status| match status { + BlockProcessStatus::NotValidated(block, _source) => Some(block), + BlockProcessStatus::ExecutionValidated(block) => Some(block), + BlockProcessStatus::Unknown => None, + }) + .or_else(|| self.chain.early_attester_cache.get_block(block_root)) + .map(|block| (block_root, block.slot())) + }) + .collect(); + for id in request.blob_ids.as_slice() { + let BlobIdentifier { + block_root: root, + index, + } = id; + + let slot = slots_by_block_root.get(root); + + // Skip if slot is >= fulu_start_slot + if let (Some(slot), Some(fulu_slot)) = (slot, fulu_start_slot) + && *slot >= fulu_slot + { + continue; + } + // First attempt to get the blobs from the RPC cache. if let Ok(Some(blob)) = self.chain.data_availability_checker.get_blob(id) { self.send_response( @@ -317,11 +345,6 @@ impl NetworkBeaconProcessor { ); send_blob_count += 1; } else { - let BlobIdentifier { - block_root: root, - index, - } = id; - let blob_list_result = match blob_list_results.entry(root) { Entry::Vacant(entry) => { entry.insert(self.chain.get_blobs_checking_early_attester_cache(root)) @@ -331,16 +354,15 @@ impl NetworkBeaconProcessor { match blob_list_result.as_ref() { Ok(blobs_sidecar_list) => { - 'inner: for blob_sidecar in blobs_sidecar_list.iter() { - if blob_sidecar.index == *index { - self.send_response( - peer_id, - inbound_request_id, - Response::BlobsByRoot(Some(blob_sidecar.clone())), - ); - send_blob_count += 1; - break 'inner; - } + if let Some(blob_sidecar) = + blobs_sidecar_list.iter().find(|b| b.index == *index) + { + self.send_response( + peer_id, + inbound_request_id, + Response::BlobsByRoot(Some(blob_sidecar.clone())), + ); + send_blob_count += 1; } } Err(e) => { @@ -354,10 +376,10 @@ impl NetworkBeaconProcessor { } } } + debug!( %peer_id, - %requested_root, - ?requested_indices, + block_root = ?slots_by_block_root.keys(), returned = send_blob_count, "BlobsByRoot outgoing response processed" ); @@ -1003,6 +1025,34 @@ impl NetworkBeaconProcessor { ); let request_start_slot = Slot::from(req.start_slot); + let request_start_epoch = request_start_slot.epoch(T::EthSpec::slots_per_epoch()); + let fork_name = self.chain.spec.fork_name_at_epoch(request_start_epoch); + // Should not send more than max request blob sidecars + if req.max_blobs_requested(request_start_epoch, &self.chain.spec) + > self.chain.spec.max_request_blob_sidecars(fork_name) as u64 + { + return Err(( + RpcErrorResponse::InvalidRequest, + "Request exceeded `MAX_REQUEST_BLOBS_SIDECARS`", + )); + } + + let effective_count = if let Some(fulu_epoch) = self.chain.spec.fulu_fork_epoch { + let fulu_start_slot = fulu_epoch.start_slot(T::EthSpec::slots_per_epoch()); + let request_end_slot = request_start_slot.saturating_add(req.count) - 1; + + // If the request_start_slot is at or after a Fulu slot, return an empty response + if request_start_slot >= fulu_start_slot { + return Ok(()); + // For the case that the request slots spans across the Fulu fork slot + } else if request_end_slot >= fulu_start_slot { + (fulu_start_slot - request_start_slot).as_u64() + } else { + req.count + } + } else { + req.count + }; let data_availability_boundary_slot = match self.chain.data_availability_boundary() { Some(boundary) => boundary.start_slot(T::EthSpec::slots_per_epoch()), @@ -1040,7 +1090,7 @@ impl NetworkBeaconProcessor { } let block_roots = - self.get_block_roots_for_slot_range(req.start_slot, req.count, "BlobsByRange")?; + self.get_block_roots_for_slot_range(req.start_slot, effective_count, "BlobsByRange")?; let current_slot = self .chain @@ -1067,7 +1117,7 @@ impl NetworkBeaconProcessor { // Due to skip slots, blobs could be out of the range, we ensure they // are in the range before sending if blob_sidecar.slot() >= request_start_slot - && blob_sidecar.slot() < request_start_slot + req.count + && blob_sidecar.slot() < request_start_slot + effective_count { blobs_sent += 1; self.send_network_message(NetworkMessage::SendResponse { @@ -1148,7 +1198,7 @@ impl NetworkBeaconProcessor { if req.max_requested::() > self.chain.spec.max_request_data_column_sidecars { return Err(( RpcErrorResponse::InvalidRequest, - "Request exceeded `MAX_REQUEST_BLOBS_SIDECARS`", + "Request exceeded `MAX_REQUEST_DATA_COLUMN_SIDECARS`", )); } diff --git a/beacon_node/network/src/network_beacon_processor/tests.rs b/beacon_node/network/src/network_beacon_processor/tests.rs index 4137c974bf..a3aef8f802 100644 --- a/beacon_node/network/src/network_beacon_processor/tests.rs +++ b/beacon_node/network/src/network_beacon_processor/tests.rs @@ -22,7 +22,7 @@ use gossipsub::MessageAcceptance; use itertools::Itertools; use lighthouse_network::rpc::InboundRequestId; use lighthouse_network::rpc::methods::{ - BlobsByRangeRequest, DataColumnsByRangeRequest, MetaDataV3, + BlobsByRangeRequest, BlobsByRootRequest, DataColumnsByRangeRequest, MetaDataV3, }; use lighthouse_network::{ Client, MessageId, NetworkConfig, NetworkGlobals, PeerId, Response, @@ -37,12 +37,12 @@ use std::iter::Iterator; use std::sync::Arc; use std::time::Duration; use tokio::sync::mpsc; -use types::blob_sidecar::FixedBlobSidecarList; +use types::blob_sidecar::{BlobIdentifier, FixedBlobSidecarList}; use types::{ AttesterSlashing, BlobSidecar, BlobSidecarList, ChainSpec, DataColumnSidecarList, DataColumnSubnetId, Epoch, EthSpec, Hash256, MainnetEthSpec, ProposerSlashing, - SignedAggregateAndProof, SignedBeaconBlock, SignedVoluntaryExit, SingleAttestation, Slot, - SubnetId, + RuntimeVariableList, SignedAggregateAndProof, SignedBeaconBlock, SignedVoluntaryExit, + SingleAttestation, Slot, SubnetId, }; type E = MainnetEthSpec; @@ -431,15 +431,22 @@ impl TestRig { } } - pub fn enqueue_blobs_by_range_request(&self, count: u64) { + pub fn enqueue_blobs_by_range_request(&self, start_slot: u64, count: u64) { self.network_beacon_processor .send_blobs_by_range_request( PeerId::random(), InboundRequestId::new_unchecked(42, 24), - BlobsByRangeRequest { - start_slot: 0, - count, - }, + BlobsByRangeRequest { start_slot, count }, + ) + .unwrap(); + } + + pub fn enqueue_blobs_by_root_request(&self, blob_ids: RuntimeVariableList) { + self.network_beacon_processor + .send_blobs_by_roots_request( + PeerId::random(), + InboundRequestId::new_unchecked(42, 24), + BlobsByRootRequest { blob_ids }, ) .unwrap(); } @@ -1632,8 +1639,9 @@ async fn test_blobs_by_range() { return; }; let mut rig = TestRig::new(64).await; + let start_slot = 0; let slot_count = 32; - rig.enqueue_blobs_by_range_request(slot_count); + rig.enqueue_blobs_by_range_request(start_slot, slot_count); let mut blob_count = 0; for slot in 0..slot_count { @@ -1651,6 +1659,65 @@ async fn test_blobs_by_range() { .unwrap_or(0); } let mut actual_count = 0; + while let Some(next) = rig.network_rx.recv().await { + if let NetworkMessage::SendResponse { + peer_id: _, + response: Response::BlobsByRange(blob), + inbound_request_id: _, + } = next + { + if blob.is_some() { + actual_count += 1; + } else { + break; + } + } else { + panic!("unexpected message {:?}", next); + } + } + if test_spec::().fulu_fork_epoch.is_some() { + assert_eq!(0, actual_count, "Post-Fulu should return 0 blobs"); + } else { + assert_eq!(blob_count, actual_count); + } +} + +#[tokio::test] +async fn test_blobs_by_range_spans_fulu_fork() { + // Only test for Electra & Fulu fork transition + if test_spec::().electra_fork_epoch.is_none() { + return; + }; + let mut spec = test_spec::(); + spec.fulu_fork_epoch = Some(Epoch::new(1)); + spec.gloas_fork_epoch = Some(Epoch::new(2)); + + let mut rig = TestRig::new_parametric(64, BeaconProcessorConfig::default(), false, spec).await; + + let start_slot = 16; + // This will span from epoch 0 (Electra) to epoch 1 (Fulu) + let slot_count = 32; + + rig.enqueue_blobs_by_range_request(start_slot, slot_count); + + let mut blob_count = 0; + for slot in start_slot..slot_count { + let root = rig + .chain + .block_root_at_slot(Slot::new(slot), WhenSlotSkipped::None) + .unwrap(); + blob_count += root + .map(|root| { + rig.chain + .get_blobs(&root) + .map(|list| list.len()) + .unwrap_or(0) + }) + .unwrap_or(0); + } + + let mut actual_count = 0; + while let Some(next) = rig.network_rx.recv().await { if let NetworkMessage::SendResponse { peer_id: _, @@ -1670,6 +1737,116 @@ async fn test_blobs_by_range() { assert_eq!(blob_count, actual_count); } +#[tokio::test] +async fn test_blobs_by_root() { + if test_spec::().deneb_fork_epoch.is_none() { + return; + }; + + let mut rig = TestRig::new(64).await; + + // Get the block root of a sample slot, e.g., slot 1 + let block_root = rig + .chain + .block_root_at_slot(Slot::new(1), WhenSlotSkipped::None) + .unwrap() + .unwrap(); + + let blobs = rig.chain.get_blobs(&block_root).unwrap(); + let blob_count = blobs.len(); + + let blob_ids: Vec = (0..blob_count) + .map(|index| BlobIdentifier { + block_root, + index: index as u64, + }) + .collect(); + + let blob_ids_list = RuntimeVariableList::new(blob_ids, blob_count).unwrap(); + + rig.enqueue_blobs_by_root_request(blob_ids_list); + + let mut blob_count = 0; + let root = rig + .chain + .block_root_at_slot(Slot::new(1), WhenSlotSkipped::None) + .unwrap(); + blob_count += root + .map(|root| { + rig.chain + .get_blobs(&root) + .map(|list| list.len()) + .unwrap_or(0) + }) + .unwrap_or(0); + + let mut actual_count = 0; + + while let Some(next) = rig.network_rx.recv().await { + if let NetworkMessage::SendResponse { + peer_id: _, + response: Response::BlobsByRoot(blob), + inbound_request_id: _, + } = next + { + if blob.is_some() { + actual_count += 1; + } else { + break; + } + } else { + panic!("unexpected message {:?}", next); + } + } + assert_eq!(blob_count, actual_count); +} + +#[tokio::test] +async fn test_blobs_by_root_post_fulu_should_return_empty() { + // Only test for Fulu fork + if test_spec::().fulu_fork_epoch.is_none() { + return; + }; + + let mut rig = TestRig::new(64).await; + + let block_root = rig + .chain + .block_root_at_slot(Slot::new(1), WhenSlotSkipped::None) + .unwrap() + .unwrap(); + + let blob_ids = vec![BlobIdentifier { + block_root, + index: 0, + }]; + + let blob_ids_list = RuntimeVariableList::new(blob_ids, 1).unwrap(); + + rig.enqueue_blobs_by_root_request(blob_ids_list); + + let mut actual_count = 0; + + while let Some(next) = rig.network_rx.recv().await { + if let NetworkMessage::SendResponse { + peer_id: _, + response: Response::BlobsByRoot(blob), + inbound_request_id: _, + } = next + { + if blob.is_some() { + actual_count += 1; + } else { + break; + } + } else { + panic!("unexpected message {:?}", next); + } + } + // Post-Fulu should return 0 blobs + assert_eq!(0, actual_count); +} + /// Ensure that data column processing that results in block import sends a sync notification #[tokio::test] async fn test_data_column_import_notifies_sync() {