Merge remote-tracking branch 'sigp/unstable' into gloas-lookup-sync-fixes

Reconciles unstable's #9383 (Deprecate blob lookup sync) with this PR's
rewritten lookup architecture by removing blob lookup from the new arch:
Deneb/Electra block lookups complete on the block alone (the merged
da_checker makes them available without blobs), and DataDownload::Blobs,
blob_lookup_request, SyncRequestId::SingleBlob, BlockProcessType::SingleBlob,
the process_rpc_blobs lookup cluster, and blob lookup tests are removed.
Range-sync blobs and blob serving are kept.
This commit is contained in:
dapplion
2026-06-01 17:44:16 +02:00
22 changed files with 309 additions and 316 deletions

View File

@@ -37,7 +37,6 @@ use {
};
pub use sync_methods::ChainSegmentProcessId;
use types::data::FixedBlobSidecarList;
pub type Error<T> = TrySendError<BeaconWorkEvent<T>>;
@@ -534,31 +533,6 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
})
}
/// Create a new `Work` event for some blobs, where the result from computation (if any) is
/// sent to the other side of `result_tx`.
pub fn send_rpc_blobs(
self: &Arc<Self>,
block_root: Hash256,
blobs: FixedBlobSidecarList<T::EthSpec>,
seen_timestamp: Duration,
process_type: BlockProcessType,
) -> Result<(), Error<T::EthSpec>> {
let blob_count = blobs.iter().filter(|b| b.is_some()).count();
if blob_count == 0 {
return Ok(());
}
let process_fn = self.clone().generate_rpc_blobs_process_fn(
block_root,
blobs,
seen_timestamp,
process_type,
);
self.try_send(BeaconWorkEvent {
drop_during_sync: false,
work: Work::RpcBlobs { process_fn },
})
}
/// Create a new `Work` event for an RPC-fetched payload envelope. `process_lookup_envelope`
/// reports the result back to sync.
pub fn send_lookup_envelope(

View File

@@ -41,15 +41,12 @@ use std::iter::Iterator;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::mpsc;
use types::data::BlobIdentifier;
use types::{
AttesterSlashing, BlobSidecar, ChainSpec, DataColumnSidecarList, DataColumnSubnetId, Epoch,
EthSpec, ExecutionPayloadEnvelope, ExecutionPayloadGloas, ExecutionRequests, Hash256,
MainnetEthSpec, ProposerSlashing, SignedAggregateAndProof, SignedBeaconBlock,
SignedExecutionPayloadEnvelope, SignedVoluntaryExit, SingleAttestation, Slot, SubnetId,
};
use types::{
BlobSidecarList,
data::{BlobIdentifier, FixedBlobSidecarList},
AttesterSlashing, ChainSpec, DataColumnSidecarList, DataColumnSubnetId, Epoch, EthSpec,
ExecutionPayloadEnvelope, ExecutionPayloadGloas, ExecutionRequests, Hash256, MainnetEthSpec,
ProposerSlashing, SignedAggregateAndProof, SignedBeaconBlock, SignedExecutionPayloadEnvelope,
SignedVoluntaryExit, SingleAttestation, Slot, SubnetId,
};
type E = MainnetEthSpec;
@@ -69,7 +66,6 @@ const STANDARD_TIMEOUT: Duration = Duration::from_secs(10);
struct TestRig {
chain: Arc<BeaconChain<T>>,
next_block: Arc<SignedBeaconBlock<E>>,
next_blobs: Option<BlobSidecarList<E>>,
next_data_columns: Option<DataColumnSidecarList<E>>,
attestations: Vec<(SingleAttestation, SubnetId)>,
next_block_attestations: Vec<(SingleAttestation, SubnetId)>,
@@ -341,7 +337,7 @@ impl TestRig {
assert!(beacon_processor.is_ok());
let block = next_block_tuple.0;
let (blob_sidecars, data_columns) = if let Some((kzg_proofs, blobs)) = next_block_tuple.1 {
let data_columns = if let Some((kzg_proofs, blobs)) = next_block_tuple.1 {
if chain.spec.is_peer_das_enabled_for_epoch(block.epoch()) {
let kzg = get_kzg(&chain.spec);
let epoch = block.slot().epoch(E::slots_per_epoch());
@@ -358,20 +354,17 @@ impl TestRig {
.filter(|c| sampling_indices.contains(c.index()))
.collect::<Vec<_>>();
(None, Some(custody_columns))
Some(custody_columns)
} else {
let blob_sidecars =
BlobSidecar::build_sidecars(blobs, &block, kzg_proofs, &chain.spec).unwrap();
(Some(blob_sidecars), None)
None
}
} else {
(None, None)
None
};
Self {
chain,
next_block: block,
next_blobs: blob_sidecars,
next_data_columns: data_columns,
attestations,
next_block_attestations,
@@ -448,20 +441,6 @@ impl TestRig {
.unwrap();
}
pub fn enqueue_single_lookup_rpc_blobs(&self) {
if let Some(blobs) = self.next_blobs.clone() {
let blobs = FixedBlobSidecarList::new(blobs.into_iter().map(Some).collect::<Vec<_>>());
self.network_beacon_processor
.send_rpc_blobs(
self.next_block.canonical_root(),
blobs,
std::time::Duration::default(),
BlockProcessType::SingleBlob { id: 1 },
)
.unwrap();
}
}
pub fn enqueue_single_lookup_rpc_data_columns(&self) {
if let Some(data_columns) = self.next_data_columns.clone() {
self.network_beacon_processor
@@ -1278,7 +1257,6 @@ async fn attestation_to_unknown_block_processed(import_method: BlockImportMethod
);
// Send the block and ensure that the attestation is received back and imported.
let num_blobs = rig.next_blobs.as_ref().map(|b| b.len()).unwrap_or(0);
let num_data_columns = rig.next_data_columns.as_ref().map(|c| c.len()).unwrap_or(0);
let mut events = vec![];
match import_method {
@@ -1293,10 +1271,6 @@ async fn attestation_to_unknown_block_processed(import_method: BlockImportMethod
BlockImportMethod::Rpc => {
rig.enqueue_lookup_block();
events.push(WorkType::RpcBlock);
if num_blobs > 0 {
rig.enqueue_single_lookup_rpc_blobs();
events.push(WorkType::RpcBlobs);
}
if num_data_columns > 0 {
rig.enqueue_single_lookup_rpc_data_columns();
events.push(WorkType::RpcCustodyColumn);
@@ -1360,7 +1334,6 @@ async fn aggregate_attestation_to_unknown_block(import_method: BlockImportMethod
);
// Send the block and ensure that the attestation is received back and imported.
let num_blobs = rig.next_blobs.as_ref().map(|b| b.len()).unwrap_or(0);
let num_data_columns = rig.next_data_columns.as_ref().map(|c| c.len()).unwrap_or(0);
let mut events = vec![];
match import_method {
@@ -1375,10 +1348,6 @@ async fn aggregate_attestation_to_unknown_block(import_method: BlockImportMethod
BlockImportMethod::Rpc => {
rig.enqueue_lookup_block();
events.push(WorkType::RpcBlock);
if num_blobs > 0 {
rig.enqueue_single_lookup_rpc_blobs();
events.push(WorkType::RpcBlobs);
}
if num_data_columns > 0 {
rig.enqueue_single_lookup_rpc_data_columns();
events.push(WorkType::RpcCustodyColumn);
@@ -1565,19 +1534,13 @@ async fn import_misc_gossip_ops() {
async fn test_rpc_block_reprocessing() {
let mut rig = TestRig::new(SMALL_CHAIN).await;
let next_block_root = rig.next_block.canonical_root();
// Insert the next block into the duplicate cache manually
let handle = rig.duplicate_cache.check_and_insert(next_block_root);
rig.enqueue_single_lookup_block();
rig.assert_event_journal_completes(&[WorkType::RpcBlock])
.await;
let num_blobs = rig.next_blobs.as_ref().map(|b| b.len()).unwrap_or(0);
if num_blobs > 0 {
rig.enqueue_single_lookup_rpc_blobs();
rig.assert_event_journal_completes(&[WorkType::RpcBlobs])
.await;
}
let num_data_columns = rig.next_data_columns.as_ref().map(|c| c.len()).unwrap_or(0);
if num_data_columns > 0 {
rig.enqueue_single_lookup_rpc_data_columns();

View File

@@ -340,8 +340,8 @@ impl<T: BeaconChainTypes> Router<T> {
Response::BlobsByRange(blob) => {
self.on_blobs_by_range_response(peer_id, app_request_id, blob);
}
Response::BlobsByRoot(blob) => {
self.on_blobs_by_root_response(peer_id, app_request_id, blob);
Response::BlobsByRoot(_) => {
crit!(%peer_id, "Unexpected BlobsByRoot response; lookup blob requests removed");
}
Response::DataColumnsByRoot(data_column) => {
self.on_data_columns_by_root_response(peer_id, app_request_id, data_column);
@@ -721,40 +721,6 @@ impl<T: BeaconChainTypes> Router<T> {
});
}
/// Handle a `BlobsByRoot` response from the peer.
pub fn on_blobs_by_root_response(
&mut self,
peer_id: PeerId,
app_request_id: AppRequestId,
blob_sidecar: Option<Arc<BlobSidecar<T::EthSpec>>>,
) {
let sync_request_id = match app_request_id {
AppRequestId::Sync(sync_id) => match sync_id {
id @ SyncRequestId::SingleBlob { .. } => id,
other => {
crit!(request = ?other, "BlobsByRoot response on incorrect request");
return;
}
},
AppRequestId::Router => {
crit!(%peer_id, "All BlobsByRoot requests belong to sync");
return;
}
AppRequestId::Internal => unreachable!("Handled internally"),
};
trace!(
%peer_id,
"Received BlobsByRoot Response"
);
self.send_to_sync(SyncMessage::RpcBlob {
sync_request_id,
peer_id,
blob_sidecar,
seen_timestamp: self.chain.slot_clock.now_duration().unwrap_or_default(),
});
}
/// Handle a `DataColumnsByRoot` response from the peer.
pub fn on_data_columns_by_root_response(
&mut self,

View File

@@ -1,9 +1,7 @@
use crate::sync::block_lookups::single_block_lookup::{
LookupRequestError, SingleBlockLookup, SingleLookupRequestState,
};
use crate::sync::block_lookups::{
BlobRequestState, BlockRequestState, CustodyRequestState, PeerId,
};
use crate::sync::block_lookups::{BlockRequestState, CustodyRequestState, PeerId};
use crate::sync::manager::BlockProcessType;
use crate::sync::network_context::{LookupRequestResult, SyncNetworkContext};
use beacon_chain::BeaconChainTypes;
@@ -11,7 +9,6 @@ use lighthouse_network::service::api_types::Id;
use parking_lot::RwLock;
use std::collections::HashSet;
use std::sync::Arc;
use types::data::FixedBlobSidecarList;
use types::{DataColumnSidecarList, SignedBeaconBlock};
use super::SingleLookupId;
@@ -20,17 +17,16 @@ use super::single_block_lookup::{ComponentRequests, DownloadResult};
#[derive(Debug, Copy, Clone)]
pub enum ResponseType {
Block,
Blob,
CustodyColumn,
}
/// This trait unifies common single block lookup functionality across blocks and blobs. This
/// includes making requests, verifying responses, and handling processing results. A
/// `SingleBlockLookup` includes both a `BlockRequestState` and a `BlobRequestState`, this trait is
/// implemented for each.
/// This trait unifies common single block lookup functionality across blocks and data columns.
/// This includes making requests, verifying responses, and handling processing results. A
/// `SingleBlockLookup` includes both a `BlockRequestState` and a `CustodyRequestState`, this trait
/// is implemented for each.
///
/// The use of the `ResponseType` associated type gives us a degree of type
/// safety when handling a block/blob response ensuring we only mutate the correct corresponding
/// safety when handling a block/column response ensuring we only mutate the correct corresponding
/// state.
pub trait RequestState<T: BeaconChainTypes> {
/// The type created after validation.
@@ -61,7 +57,7 @@ pub trait RequestState<T: BeaconChainTypes> {
/// Returns the `ResponseType` associated with this trait implementation. Useful in logging.
fn response_type() -> ResponseType;
/// A getter for the `BlockRequestState` or `BlobRequestState` associated with this trait.
/// A getter for the `BlockRequestState` or `CustodyRequestState` associated with this trait.
fn request_state_mut(request: &mut SingleBlockLookup<T>) -> Result<&mut Self, &'static str>;
/// A getter for a reference to the `SingleLookupRequestState` associated with this trait.
@@ -114,54 +110,6 @@ impl<T: BeaconChainTypes> RequestState<T> for BlockRequestState<T::EthSpec> {
}
}
impl<T: BeaconChainTypes> RequestState<T> for BlobRequestState<T::EthSpec> {
type VerifiedResponseType = FixedBlobSidecarList<T::EthSpec>;
fn make_request(
&self,
id: Id,
lookup_peers: Arc<RwLock<HashSet<PeerId>>>,
expected_blobs: usize,
cx: &mut SyncNetworkContext<T>,
) -> Result<LookupRequestResult, LookupRequestError> {
cx.blob_lookup_request(id, lookup_peers, self.block_root, expected_blobs)
.map_err(LookupRequestError::SendFailedNetwork)
}
fn send_for_processing(
id: Id,
download_result: DownloadResult<Self::VerifiedResponseType>,
cx: &SyncNetworkContext<T>,
) -> Result<(), LookupRequestError> {
let DownloadResult {
value,
block_root,
seen_timestamp,
..
} = download_result;
cx.send_blobs_for_processing(id, block_root, value, seen_timestamp)
.map_err(LookupRequestError::SendFailedProcessor)
}
fn response_type() -> ResponseType {
ResponseType::Blob
}
fn request_state_mut(request: &mut SingleBlockLookup<T>) -> Result<&mut Self, &'static str> {
match &mut request.component_requests {
ComponentRequests::WaitingForBlock => Err("waiting for block"),
ComponentRequests::ActiveBlobRequest(request, _) => Ok(request),
ComponentRequests::ActiveCustodyRequest { .. } => Err("expecting custody request"),
ComponentRequests::NotNeeded { .. } => Err("not needed"),
}
}
fn get_state(&self) -> &SingleLookupRequestState<Self::VerifiedResponseType> {
&self.state
}
fn get_state_mut(&mut self) -> &mut SingleLookupRequestState<Self::VerifiedResponseType> {
&mut self.state
}
}
impl<T: BeaconChainTypes> RequestState<T> for CustodyRequestState<T::EthSpec> {
type VerifiedResponseType = DataColumnSidecarList<T::EthSpec>;
@@ -203,7 +151,6 @@ impl<T: BeaconChainTypes> RequestState<T> for CustodyRequestState<T::EthSpec> {
fn request_state_mut(request: &mut SingleBlockLookup<T>) -> Result<&mut Self, &'static str> {
match &mut request.component_requests {
ComponentRequests::WaitingForBlock => Err("waiting for block"),
ComponentRequests::ActiveBlobRequest { .. } => Err("expecting blob request"),
ComponentRequests::ActiveCustodyRequest(request) => Ok(request),
ComponentRequests::NotNeeded { .. } => Err("not needed"),
}

View File

@@ -607,11 +607,14 @@ mod tests {
let mut spec = test_spec::<E>();
spec.deneb_fork_epoch = Some(Epoch::new(0));
// Pin to pre-PeerDAS so this exercises the blob (not custody-column) path under any
// FORK_NAME.
spec.fulu_fork_epoch = None;
let spec = Arc::new(spec);
let da_checker = Arc::new(test_da_checker(spec.clone(), NodeCustodyType::Fullnode));
// Assert response is finished and RpcBlocks cannot be constructed, because blobs weren't returned.
// Blobs are no longer required for availability, so the response succeeds without them.
let result = info.responses(da_checker, spec).unwrap();
assert!(result.is_err())
assert!(result.is_ok())
}
#[test]

View File

@@ -9,7 +9,6 @@ use tracing::{Span, debug};
use types::{Hash256, Slot};
pub use blobs_by_range::BlobsByRangeRequestItems;
pub use blobs_by_root::{BlobsByRootRequestItems, BlobsByRootSingleBlockRequest};
pub use blocks_by_range::BlocksByRangeRequestItems;
pub use blocks_by_root::{BlocksByRootRequestItems, BlocksByRootSingleRequest};
pub use data_columns_by_range::DataColumnsByRangeRequestItems;
@@ -25,7 +24,6 @@ use crate::metrics;
use super::{RpcEvent, RpcResponseError, RpcResponseResult};
mod blobs_by_range;
mod blobs_by_root;
mod blocks_by_range;
mod blocks_by_root;
mod data_columns_by_range;

View File

@@ -1,73 +0,0 @@
use lighthouse_network::rpc::methods::BlobsByRootRequest;
use std::sync::Arc;
use types::{BlobSidecar, EthSpec, ForkContext, Hash256, data::BlobIdentifier};
use super::{ActiveRequestItems, LookupVerifyError};
#[derive(Debug, Clone)]
pub struct BlobsByRootSingleBlockRequest {
pub block_root: Hash256,
pub indices: Vec<u64>,
}
impl BlobsByRootSingleBlockRequest {
pub fn into_request(self, spec: &ForkContext) -> Result<BlobsByRootRequest, String> {
BlobsByRootRequest::new(
self.indices
.into_iter()
.map(|index| BlobIdentifier {
block_root: self.block_root,
index,
})
.collect(),
spec,
)
}
}
pub struct BlobsByRootRequestItems<E: EthSpec> {
request: BlobsByRootSingleBlockRequest,
items: Vec<Arc<BlobSidecar<E>>>,
}
impl<E: EthSpec> BlobsByRootRequestItems<E> {
pub fn new(request: BlobsByRootSingleBlockRequest) -> Self {
Self {
request,
items: vec![],
}
}
}
impl<E: EthSpec> ActiveRequestItems for BlobsByRootRequestItems<E> {
type Item = Arc<BlobSidecar<E>>;
/// Appends a chunk to this multi-item request. If all expected chunks are received, this
/// method returns `Some`, resolving the request before the stream terminator.
/// The active request SHOULD be dropped after `add_response` returns an error
fn add(&mut self, blob: Self::Item) -> Result<bool, LookupVerifyError> {
let block_root = blob.block_root();
if self.request.block_root != block_root {
return Err(LookupVerifyError::UnrequestedBlockRoot(block_root));
}
if !blob.verify_blob_sidecar_inclusion_proof() {
return Err(LookupVerifyError::InvalidInclusionProof);
}
if !self.request.indices.contains(&blob.index) {
return Err(LookupVerifyError::UnrequestedIndex(blob.index));
}
if self.items.iter().any(|b| b.index == blob.index) {
return Err(LookupVerifyError::DuplicatedData(blob.slot(), blob.index));
}
self.items.push(blob);
Ok(self.items.len() >= self.request.indices.len())
}
fn consume(&mut self) -> Vec<Self::Item> {
std::mem::take(&mut self.items)
}
}