Remove blob lookup from rewritten arch (align with #9383)

This commit is contained in:
dapplion
2026-06-01 17:49:32 +02:00
parent 5754c38132
commit ad99451e15
6 changed files with 12 additions and 586 deletions

View File

@@ -26,10 +26,7 @@ use lighthouse_network::service::api_types::CustodyBackfillBatchId;
use logging::crit;
use std::sync::Arc;
use std::time::Duration;
use store::KzgCommitment;
use tracing::{debug, debug_span, error, info, instrument, warn};
use types::data::FixedBlobSidecarList;
use types::kzg_ext::format_kzg_commitments;
use types::{BlockImportSource, DataColumnSidecarList, Epoch, Hash256};
/// Id associated to a batch processing request, either a sync batch or a parent lookup.
@@ -251,115 +248,6 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
drop(handle);
}
/// Returns an async closure which processes a list of blobs received via RPC.
///
/// This separate function was required to prevent a cycle during compiler
/// type checking.
pub fn generate_rpc_blobs_process_fn(
self: Arc<Self>,
block_root: Hash256,
blobs: FixedBlobSidecarList<T::EthSpec>,
seen_timestamp: Duration,
process_type: BlockProcessType,
) -> AsyncFn {
let process_fn = async move {
self.clone()
.process_rpc_blobs(block_root, blobs, seen_timestamp, process_type)
.await;
};
Box::pin(process_fn)
}
/// Attempt to process a list of blobs received from a direct RPC request.
#[instrument(
name = "lh_process_rpc_blobs",
parent = None,
level = "debug",
skip_all,
fields(?block_root),
)]
pub async fn process_rpc_blobs(
self: Arc<NetworkBeaconProcessor<T>>,
block_root: Hash256,
blobs: FixedBlobSidecarList<T::EthSpec>,
seen_timestamp: Duration,
process_type: BlockProcessType,
) {
let Some(slot) = blobs
.iter()
.find_map(|blob| blob.as_ref().map(|blob| blob.slot()))
else {
return;
};
let (indices, commitments): (Vec<u64>, Vec<KzgCommitment>) = blobs
.iter()
.filter_map(|blob_opt| {
blob_opt
.as_ref()
.map(|blob| (blob.index, blob.kzg_commitment))
})
.unzip();
let commitments = format_kzg_commitments(&commitments);
debug!(
?indices,
%block_root,
%slot,
commitments,
"RPC blobs received"
);
if let Ok(current_slot) = self.chain.slot()
&& current_slot == slot
{
// Note: this metric is useful to gauge how long it takes to receive blobs requested
// over rpc. Since we always send the request for block components at `get_unaggregated_attestation_due() / 2`
// we can use that as a baseline to measure against.
let delay = get_slot_delay_ms(seen_timestamp, slot, &self.chain.slot_clock);
metrics::observe_duration(&metrics::BEACON_BLOB_RPC_SLOT_START_DELAY_TIME, delay);
}
let result = self.chain.process_rpc_blobs(slot, block_root, blobs).await;
register_process_result_metrics(&result, metrics::BlockSource::Rpc, "blobs");
match &result {
Ok(AvailabilityProcessingStatus::Imported(hash)) => {
debug!(
result = "imported block and blobs",
%slot,
block_hash = %hash,
"Block components retrieved"
);
self.chain.recompute_head_at_current_slot().await;
}
Ok(AvailabilityProcessingStatus::MissingComponents(_, _)) => {
debug!(
block_hash = %block_root,
%slot,
"Missing components over rpc"
);
}
Err(BlockError::DuplicateFullyImported(_)) => {
debug!(
block_hash = %block_root,
%slot,
"Blobs have already been imported"
);
}
// Errors are handled and logged in `block_lookups`
Err(_) => {}
}
// Sync handles these results
let result = classify_processing_result(result, &process_type);
self.send_sync_message(SyncMessage::BlockComponentProcessed {
process_type,
result,
});
}
#[instrument(
name = "lh_process_rpc_custody_columns",
parent = None,
@@ -1126,7 +1014,6 @@ fn classify_processing_result(
// Attributable to the block peer (which is also the data peer pre-Gloas).
let reason = match process_type {
BlockProcessType::SingleBlock { .. } => "lookup_block_processing_failure",
BlockProcessType::SingleBlob { .. } => "lookup_blobs_processing_failure",
BlockProcessType::SingleCustodyColumn(_) => "lookup_custody_column_processing_failure",
// Payload envelopes flow through classify_envelope_result; this branch shouldn't fire,
// but produce a sensible reason in case it ever does.

View File

@@ -39,7 +39,6 @@ use std::sync::Arc;
use std::time::Duration;
use store::Hash256;
use tracing::{debug, error, warn};
use types::data::FixedBlobSidecarList;
use types::{EthSpec, SignedBeaconBlock, SignedExecutionPayloadEnvelope};
pub mod parent_chain;
@@ -73,8 +72,6 @@ const MAX_LOOKUPS: usize = 200;
type BlockDownloadResponse<E> =
Result<(Arc<SignedBeaconBlock<E>>, PeerGroup, Duration), RpcResponseError>;
type BlobDownloadResponse<E> =
Result<(FixedBlobSidecarList<E>, PeerGroup, Duration), RpcResponseError>;
type CustodyDownloadResponse<E> =
Result<(types::DataColumnSidecarList<E>, PeerGroup, Duration), RpcResponseError>;
type PayloadDownloadResponse<E> =
@@ -487,20 +484,6 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
self.on_lookup_result(id.lookup_id, result, "block_download_response", cx);
}
pub fn on_blob_download_response(
&mut self,
id: SingleLookupReqId,
response: BlobDownloadResponse<T::EthSpec>,
cx: &mut SyncNetworkContext<T>,
) {
let Some(lookup) = self.single_block_lookups.get_mut(&id.lookup_id) else {
debug!(?id, "Blob returned for single block lookup not present");
return;
};
let result = lookup.on_blob_download_response(id.req_id, response, cx);
self.on_lookup_result(id.lookup_id, result, "blob_download_response", cx);
}
pub fn on_custody_download_response(
&mut self,
id: SingleLookupReqId,
@@ -556,7 +539,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
BlockProcessType::SingleBlock { .. } => {
self.on_block_processing_result(lookup_id, result, cx)
}
BlockProcessType::SingleBlob { .. } | BlockProcessType::SingleCustodyColumn(_) => {
BlockProcessType::SingleCustodyColumn(_) => {
self.on_data_processing_result(lookup_id, result, cx)
}
BlockProcessType::SinglePayloadEnvelope(_) => {

View File

@@ -1,6 +1,6 @@
use super::{BlockComponent, PeerId, SINGLE_BLOCK_LOOKUP_MAX_ATTEMPTS};
use crate::sync::block_lookups::{
BlobDownloadResponse, BlockDownloadResponse, CustodyDownloadResponse, PayloadDownloadResponse,
BlockDownloadResponse, CustodyDownloadResponse, PayloadDownloadResponse,
};
use crate::sync::manager::{BlockProcessType, BlockProcessingResult};
use crate::sync::network_context::{
@@ -19,7 +19,6 @@ use std::time::{Duration, Instant};
use store::Hash256;
use strum::IntoStaticStr;
use tracing::{Span, debug, debug_span};
use types::data::FixedBlobSidecarList;
use types::{
ChainSpec, DataColumnSidecarList, EthSpec, ExecutionBlockHash, ForkName, SignedBeaconBlock,
SignedExecutionPayloadEnvelope, Slot,
@@ -273,11 +272,6 @@ impl<E: EthSpec> DataRequestState<E> {
/// Fork-dependent data download state
#[derive(Debug)]
enum DataDownload<E: EthSpec> {
Blobs {
block_root: Hash256,
expected_blobs: usize,
state: SingleLookupRequestState<FixedBlobSidecarList<E>>,
},
Columns {
block_root: Hash256,
slot: Slot,
@@ -293,15 +287,6 @@ impl<E: EthSpec> DataDownload<E> {
cx: &mut SyncNetworkContext<T>,
) -> Result<(), LookupRequestError> {
match self {
DataDownload::Blobs {
block_root,
expected_blobs,
state,
} => {
let br = *block_root;
let eb = *expected_blobs;
state.make_request(|| cx.blob_lookup_request(id, peers, br, eb))
}
DataDownload::Columns {
block_root,
slot,
@@ -315,16 +300,12 @@ impl<E: EthSpec> DataDownload<E> {
fn is_completed(&self) -> bool {
match self {
DataDownload::Blobs { state, .. } => state.is_completed(),
DataDownload::Columns { state, .. } => state.is_completed(),
}
}
fn take_download_result(&mut self) -> Option<(DownloadedData<E>, PeerGroup)> {
match self {
DataDownload::Blobs { state, .. } => state
.take_download_result()
.map(|r| (DownloadedData::Blobs(r.value), r.peer_group)),
DataDownload::Columns { state, .. } => state
.take_download_result()
.map(|r| (DownloadedData::Columns(r.value), r.peer_group)),
@@ -333,7 +314,6 @@ impl<E: EthSpec> DataDownload<E> {
fn is_awaiting_event(&self) -> bool {
match self {
DataDownload::Blobs { state, .. } => state.is_awaiting_event(),
DataDownload::Columns { state, .. } => state.is_awaiting_event(),
}
}
@@ -342,7 +322,6 @@ impl<E: EthSpec> DataDownload<E> {
/// Downloaded data, waiting to be sent for processing
#[derive(Debug)]
enum DownloadedData<E: EthSpec> {
Blobs(FixedBlobSidecarList<E>),
Columns(DataColumnSidecarList<E>),
}
@@ -354,9 +333,6 @@ impl<E: EthSpec> DownloadedData<E> {
cx: &mut SyncNetworkContext<T>,
) -> Result<(), SendErrorProcessor> {
match self {
DownloadedData::Blobs(blobs) => {
cx.send_blobs_for_processing(id, block_root, blobs.clone(), Duration::ZERO)
}
DownloadedData::Columns(columns) => cx.send_custody_columns_for_processing(
id,
block_root,
@@ -422,22 +398,12 @@ impl<E: EthSpec> DataRequestState<E> {
let block_fork = spec.fork_name_at_slot::<E>(slot);
match block_fork {
ForkName::Base | ForkName::Altair | ForkName::Bellatrix | ForkName::Capella => {
Self::Complete
}
ForkName::Deneb | ForkName::Electra => {
if expected_blobs > 0 {
Self::Downloading(DataDownload::Blobs {
block_root,
expected_blobs,
state: SingleLookupRequestState::new_with_processing_failures(
failed_processing,
),
})
} else {
Self::Complete
}
}
ForkName::Base
| ForkName::Altair
| ForkName::Bellatrix
| ForkName::Capella
| ForkName::Deneb
| ForkName::Electra => Self::Complete,
ForkName::Fulu => {
if expected_blobs > 0 {
Self::Downloading(DataDownload::Columns {
@@ -1027,26 +993,6 @@ impl<T: BeaconChainTypes> SingleBlockLookup<T> {
self.continue_requests(cx)
}
/// Handle a blob download response. Updates download state and advances the lookup.
pub fn on_blob_download_response(
&mut self,
req_id: ReqId,
result: BlobDownloadResponse<T::EthSpec>,
cx: &mut SyncNetworkContext<T>,
) -> Result<LookupResult, LookupRequestError> {
let Some(DataRequest {
state: DataRequestState::Downloading(DataDownload::Blobs { state, .. }),
..
}) = &mut self.data_request
else {
return Err(LookupRequestError::BadState(
"blob response but not downloading blobs".to_owned(),
));
};
state.on_download_response(req_id, self.block_root, result)?;
self.continue_requests(cx)
}
/// Handle a custody columns download response. Updates download state and advances the lookup.
pub fn on_custody_download_response(
&mut self,

View File

@@ -190,7 +190,6 @@ pub enum SyncMessage<E: EthSpec> {
#[derive(Debug, Clone)]
pub enum BlockProcessType {
SingleBlock { id: Id },
SingleBlob { id: Id },
SingleCustodyColumn(Id),
SinglePayloadEnvelope(Id),
}
@@ -199,7 +198,6 @@ impl BlockProcessType {
pub fn id(&self) -> Id {
match self {
BlockProcessType::SingleBlock { id }
| BlockProcessType::SingleBlob { id }
| BlockProcessType::SingleCustodyColumn(id)
| BlockProcessType::SinglePayloadEnvelope(id) => *id,
}
@@ -541,9 +539,6 @@ impl<T: BeaconChainTypes> SyncManager<T> {
SyncRequestId::SingleBlock { id } => {
self.on_single_block_response(id, peer_id, RpcEvent::RPCError(error))
}
SyncRequestId::SingleBlob { id } => {
self.on_single_blob_response(id, peer_id, RpcEvent::RPCError(error))
}
SyncRequestId::SinglePayloadEnvelope { id } => {
self.on_single_payload_envelope_response(id, peer_id, RpcEvent::RPCError(error))
}
@@ -1213,11 +1208,6 @@ impl<T: BeaconChainTypes> SyncManager<T> {
seen_timestamp: Duration,
) {
match sync_request_id {
SyncRequestId::SingleBlob { id } => self.on_single_blob_response(
id,
peer_id,
RpcEvent::from_chunk(blob, seen_timestamp),
),
SyncRequestId::BlobsByRange(id) => self.on_blobs_by_range_response(
id,
peer_id,
@@ -1257,23 +1247,6 @@ impl<T: BeaconChainTypes> SyncManager<T> {
}
}
fn on_single_blob_response(
&mut self,
id: SingleLookupReqId,
peer_id: PeerId,
blob: RpcEvent<Arc<BlobSidecar<T::EthSpec>>>,
) {
if let Some(resp) = self.network.on_single_blob_response(id, peer_id, blob) {
self.block_lookups.on_blob_download_response(
id,
resp.map(|(value, seen_timestamp)| {
(value, PeerGroup::from_single(peer_id), seen_timestamp)
}),
&mut self.network,
)
}
}
fn rpc_payload_envelope_received(
&mut self,
sync_request_id: SyncRequestId,

View File

@@ -18,7 +18,6 @@ use crate::status::ToStatusMessage;
use crate::sync::batch::ByRangeRequestType;
use crate::sync::block_lookups::SingleLookupId;
use crate::sync::block_sidecar_coupling::CouplingError;
use crate::sync::network_context::requests::BlobsByRootSingleBlockRequest;
use crate::sync::range_data_column_batch_request::RangeDataColumnBatchRequest;
use beacon_chain::block_verification_types::LookupBlock;
use beacon_chain::block_verification_types::{AsBlock, RangeSyncBlock};
@@ -38,8 +37,8 @@ use lighthouse_network::{Client, NetworkGlobals, PeerAction, PeerId, ReportSourc
use parking_lot::RwLock;
pub use requests::LookupVerifyError;
use requests::{
ActiveRequests, BlobsByRangeRequestItems, BlobsByRootRequestItems, BlocksByRangeRequestItems,
BlocksByRootRequestItems, DataColumnsByRangeRequestItems, DataColumnsByRootRequestItems,
ActiveRequests, BlobsByRangeRequestItems, BlocksByRangeRequestItems, BlocksByRootRequestItems,
DataColumnsByRangeRequestItems, DataColumnsByRootRequestItems,
PayloadEnvelopesByRootRequestItems,
};
#[cfg(test)]
@@ -53,7 +52,6 @@ use std::time::Duration;
use task_executor::TaskExecutor;
use tokio::sync::mpsc;
use tracing::{Span, debug, debug_span, error, warn};
use types::data::FixedBlobSidecarList;
use types::{
BlobSidecar, BlockImportSource, ChainSpec, ColumnIndex, DataColumnSidecar,
DataColumnSidecarList, EthSpec, ForkContext, Hash256, SignedBeaconBlock,
@@ -205,8 +203,6 @@ pub struct SyncNetworkContext<T: BeaconChainTypes> {
/// A mapping of active BlocksByRoot requests, including both current slot and parent lookups.
blocks_by_root_requests:
ActiveRequests<SingleLookupReqId, BlocksByRootRequestItems<T::EthSpec>>,
/// A mapping of active BlobsByRoot requests, including both current slot and parent lookups.
blobs_by_root_requests: ActiveRequests<SingleLookupReqId, BlobsByRootRequestItems<T::EthSpec>>,
/// A mapping of active PayloadEnvelopesByRoot requests
payload_envelopes_by_root_requests:
ActiveRequests<SingleLookupReqId, PayloadEnvelopesByRootRequestItems<T::EthSpec>>,
@@ -302,7 +298,6 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
execution_engine_state: EngineState::Online, // always assume `Online` at the start
request_id: 1,
blocks_by_root_requests: ActiveRequests::new("blocks_by_root"),
blobs_by_root_requests: ActiveRequests::new("blobs_by_root"),
payload_envelopes_by_root_requests: ActiveRequests::new("payload_envelopes_by_root"),
data_columns_by_root_requests: ActiveRequests::new("data_columns_by_root"),
blocks_by_range_requests: ActiveRequests::new("blocks_by_range"),
@@ -335,7 +330,6 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
network_send: _,
request_id: _,
blocks_by_root_requests,
blobs_by_root_requests,
payload_envelopes_by_root_requests,
data_columns_by_root_requests,
blocks_by_range_requests,
@@ -356,10 +350,6 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
.active_requests_of_peer(peer_id)
.into_iter()
.map(|id| SyncRequestId::SingleBlock { id: *id });
let blobs_by_root_ids = blobs_by_root_requests
.active_requests_of_peer(peer_id)
.into_iter()
.map(|id| SyncRequestId::SingleBlob { id: *id });
let payload_envelopes_by_root_ids = payload_envelopes_by_root_requests
.active_requests_of_peer(peer_id)
.into_iter()
@@ -381,7 +371,6 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
.into_iter()
.map(|req_id| SyncRequestId::DataColumnsByRange(*req_id));
blocks_by_root_ids
.chain(blobs_by_root_ids)
.chain(payload_envelopes_by_root_ids)
.chain(data_column_by_root_ids)
.chain(blocks_by_range_ids)
@@ -438,7 +427,6 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
network_send: _,
request_id: _,
blocks_by_root_requests,
blobs_by_root_requests,
payload_envelopes_by_root_requests,
data_columns_by_root_requests,
blocks_by_range_requests,
@@ -461,7 +449,6 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
for peer_id in blocks_by_root_requests
.iter_request_peers()
.chain(blobs_by_root_requests.iter_request_peers())
.chain(payload_envelopes_by_root_requests.iter_request_peers())
.chain(data_columns_by_root_requests.iter_request_peers())
.chain(blocks_by_range_requests.iter_request_peers())
@@ -1022,109 +1009,6 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
Ok(LookupRequestResult::RequestSent(id.req_id))
}
/// Request necessary blobs for `block_root`. Requests only the necessary blobs by checking:
/// - If we have a downloaded but not yet processed block
/// - If the da_checker has a pending block
/// - If the da_checker has pending blobs from gossip
///
/// Returns false if no request was made, because we don't need to import (more) blobs.
pub fn blob_lookup_request(
&mut self,
lookup_id: SingleLookupId,
lookup_peers: Arc<RwLock<HashSet<PeerId>>>,
block_root: Hash256,
expected_blobs: usize,
) -> Result<LookupRequestResult, RpcRequestSendError> {
let active_request_count_by_peer = self.active_request_count_by_peer();
let Some(peer_id) = lookup_peers
.read()
.iter()
.map(|peer| {
(
// Prefer peers with less overall requests
active_request_count_by_peer.get(peer).copied().unwrap_or(0),
// Random factor to break ties, otherwise the PeerID breaks ties
rand::random::<u32>(),
peer,
)
})
.min()
.map(|(_, _, peer)| *peer)
else {
// Allow lookup to not have any peers and do nothing. This is an optimization to not
// lose progress of lookups created from a block with unknown parent before we receive
// attestations for said block.
// Lookup sync event safety: If a lookup requires peers to make progress, and does
// not receive any new peers for some time it will be dropped. If it receives a new
// peer it must attempt to make progress.
return Ok(LookupRequestResult::Pending("no peers"));
};
let imported_blob_indexes = self
.chain
.data_availability_checker
.cached_blob_indexes(&block_root)
.unwrap_or_default();
// Include only the blob indexes not yet imported (received through gossip)
let indices = (0..expected_blobs as u64)
.filter(|index| !imported_blob_indexes.contains(index))
.collect::<Vec<_>>();
if indices.is_empty() {
// No blobs required, do not issue any request
return Ok(LookupRequestResult::NoRequestNeeded("no indices to fetch"));
}
let id = SingleLookupReqId {
lookup_id,
req_id: self.next_id(),
};
let request = BlobsByRootSingleBlockRequest {
block_root,
indices: indices.clone(),
};
// Lookup sync event safety: Refer to `Self::block_lookup_request` `network_send.send` call
let network_request = RequestType::BlobsByRoot(
request
.clone()
.into_request(&self.fork_context)
.map_err(RpcRequestSendError::InternalError)?,
);
self.network_send
.send(NetworkMessage::SendRequest {
peer_id,
request: network_request,
app_request_id: AppRequestId::Sync(SyncRequestId::SingleBlob { id }),
})
.map_err(|_| RpcRequestSendError::InternalError("network send error".to_owned()))?;
debug!(
method = "BlobsByRoot",
?block_root,
blob_indices = ?indices,
peer = %peer_id,
%id,
"Sync RPC request sent"
);
self.blobs_by_root_requests.insert(
id,
peer_id,
// true = enforce max_requests are returned for blobs_by_root. We only issue requests for
// blocks after we know the block has data, and only request peers after they claim to
// have imported the block+blobs.
true,
BlobsByRootRequestItems::new(request),
// Not implemented
Span::none(),
);
Ok(LookupRequestResult::RequestSent(id.req_id))
}
/// Request to send a single `data_columns_by_root` request to the network.
pub fn data_column_lookup_request(
&mut self,
@@ -1527,35 +1411,6 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
self.on_rpc_response_result(resp, peer_id)
}
pub(crate) fn on_single_blob_response(
&mut self,
id: SingleLookupReqId,
peer_id: PeerId,
rpc_event: RpcEvent<Arc<BlobSidecar<T::EthSpec>>>,
) -> Option<RpcResponseResult<FixedBlobSidecarList<T::EthSpec>>> {
let resp = self.blobs_by_root_requests.on_response(id, rpc_event);
let resp = resp.map(|res| {
res.and_then(|(blobs, seen_timestamp)| {
if let Some(max_len) = blobs
.first()
.map(|blob| self.chain.spec.max_blobs_per_block(blob.epoch()) as usize)
{
match to_fixed_blob_sidecar_list(blobs, max_len) {
Ok(blobs) => Ok((blobs, seen_timestamp)),
Err(e) => Err(e.into()),
}
} else {
Err(RpcResponseError::VerifyError(
LookupVerifyError::InternalError(
"Requested blobs for a block that has no blobs".to_string(),
),
))
}
})
});
self.on_rpc_response_result(resp, peer_id)
}
pub(crate) fn on_single_payload_envelope_response(
&mut self,
id: SingleLookupReqId,
@@ -1723,36 +1578,6 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
})
}
pub fn send_blobs_for_processing(
&self,
id: Id,
block_root: Hash256,
blobs: FixedBlobSidecarList<T::EthSpec>,
seen_timestamp: Duration,
) -> Result<(), SendErrorProcessor> {
let beacon_processor = self
.beacon_processor_if_enabled()
.ok_or(SendErrorProcessor::ProcessorNotAvailable)?;
debug!(?block_root, ?id, "Sending blobs for processing");
// Lookup sync event safety: If `beacon_processor.send_rpc_blobs` returns Ok() sync
// must receive a single `SyncMessage::BlockComponentProcessed` event with this process type
beacon_processor
.send_rpc_blobs(
block_root,
blobs,
seen_timestamp,
BlockProcessType::SingleBlob { id },
)
.map_err(|e| {
error!(
error = ?e,
"Failed to send sync blobs to processor"
);
SendErrorProcessor::SendError
})
}
pub fn send_payload_for_processing(
&self,
block_root: Hash256,
@@ -1918,7 +1743,6 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
pub(crate) fn register_metrics(&self) {
for (id, count) in [
("blocks_by_root", self.blocks_by_root_requests.len()),
("blobs_by_root", self.blobs_by_root_requests.len()),
(
"data_columns_by_root",
self.data_columns_by_root_requests.len(),
@@ -1939,17 +1763,3 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
}
}
}
fn to_fixed_blob_sidecar_list<E: EthSpec>(
blobs: Vec<Arc<BlobSidecar<E>>>,
max_len: usize,
) -> Result<FixedBlobSidecarList<E>, LookupVerifyError> {
let mut fixed_list = FixedBlobSidecarList::new(vec![None; max_len]);
for blob in blobs.into_iter() {
let index = blob.index as usize;
*fixed_list
.get_mut(index)
.ok_or(LookupVerifyError::UnrequestedIndex(index as u64))? = Some(blob)
}
Ok(fixed_list)
}

View File

@@ -8,13 +8,11 @@ use crate::sync::{
SyncMessage,
manager::{BatchProcessResult, BlockProcessType, BlockProcessingResult, SyncManager},
};
use beacon_chain::blob_verification::KzgVerifiedBlob;
use beacon_chain::block_verification_types::LookupBlock;
use beacon_chain::custody_context::NodeCustodyType;
use beacon_chain::{
AvailabilityProcessingStatus, EngineState, NotifyExecutionLayer,
block_verification_types::{AsBlock, AvailableBlockData},
data_availability_checker::Availability,
test_utils::{
AttestationStrategy, BeaconChainHarness, BlockStrategy, EphemeralHarnessType, NumBlobs,
generate_rand_block_and_blobs, test_spec,
@@ -36,8 +34,8 @@ use std::time::Duration;
use tokio::sync::mpsc;
use tracing::info;
use types::{
BlobSidecar, BlockImportSource, ColumnIndex, DataColumnSidecar, EthSpec, ForkContext, ForkName,
Hash256, MinimalEthSpec as E, SignedBeaconBlock, SignedExecutionPayloadEnvelope, Slot,
BlobSidecar, BlockImportSource, ColumnIndex, DataColumnSidecar, ForkContext, ForkName, Hash256,
MinimalEthSpec as E, SignedBeaconBlock, SignedExecutionPayloadEnvelope, Slot,
};
const D: Duration = Duration::new(0, 0);
@@ -556,52 +554,6 @@ impl TestRig {
self.send_rpc_blocks_response(req_id, peer_id, &blocks);
}
(RequestType::BlobsByRoot(req), AppRequestId::Sync(req_id)) => {
if self.complete_strategy.return_no_data_n_times > 0 {
self.complete_strategy.return_no_data_n_times -= 1;
return self.send_rpc_blobs_response(req_id, peer_id, &[]);
}
let mut blobs = req
.blob_ids
.iter()
.map(|id| {
self.network_blocks_by_root
.get(&id.block_root)
.unwrap_or_else(|| {
panic!("Test consumer requested unknown block: {id:?}")
})
.block_data()
.blobs()
.unwrap_or_else(|| panic!("Block {id:?} has no blobs"))
.iter()
.find(|blob| blob.index == id.index)
.unwrap_or_else(|| panic!("Blob id {id:?} not avail"))
.clone()
})
.collect::<Vec<_>>();
if self.complete_strategy.return_too_few_data_n_times > 0 {
self.complete_strategy.return_too_few_data_n_times -= 1;
blobs.pop();
}
if self
.complete_strategy
.return_wrong_sidecar_for_block_n_times
> 0
{
self.complete_strategy
.return_wrong_sidecar_for_block_n_times -= 1;
let first = blobs.first_mut().expect("empty blobs");
let mut blob = Arc::make_mut(first).clone();
blob.signed_block_header.message.body_root = Hash256::ZERO;
*first = Arc::new(blob);
}
self.send_rpc_blobs_response(req_id, peer_id, &blobs);
}
(RequestType::DataColumnsByRoot(req), AppRequestId::Sync(req_id)) => {
if self.complete_strategy.return_no_data_n_times > 0 {
self.complete_strategy.return_no_data_n_times -= 1;
@@ -1073,48 +1025,6 @@ impl TestRig {
keypair.sk.sign(msg)
}
fn corrupt_last_blob_proposer_signature(&mut self) {
let range_sync_block = self.get_last_block().clone();
let block = range_sync_block.block_cloned();
let mut blobs = range_sync_block
.block_data()
.blobs()
.expect("no blobs")
.into_iter()
.collect::<Vec<_>>();
let columns = range_sync_block.block_data().data_columns();
let first = blobs.first_mut().expect("empty blobs");
Arc::make_mut(first).signed_block_header.signature = self.valid_signature();
let max_blobs =
self.harness
.spec
.max_blobs_per_block(block.slot().epoch(E::slots_per_epoch())) as usize;
let blobs =
types::BlobSidecarList::new(blobs, max_blobs).expect("invalid blob sidecar list");
self.re_insert_block(block, Some(blobs), columns);
}
fn corrupt_last_blob_kzg_proof(&mut self) {
let range_sync_block = self.get_last_block().clone();
let block = range_sync_block.block_cloned();
let mut blobs = range_sync_block
.block_data()
.blobs()
.expect("no blobs")
.into_iter()
.collect::<Vec<_>>();
let columns = range_sync_block.block_data().data_columns();
let first = blobs.first_mut().expect("empty blobs");
Arc::make_mut(first).kzg_proof = kzg::KzgProof::empty();
let max_blobs =
self.harness
.spec
.max_blobs_per_block(block.slot().epoch(E::slots_per_epoch())) as usize;
let blobs =
types::BlobSidecarList::new(blobs, max_blobs).expect("invalid blob sidecar list");
self.re_insert_block(block, Some(blobs), columns);
}
fn corrupt_last_column_proposer_signature(&mut self) {
let range_sync_block = self.get_last_block().clone();
let block = range_sync_block.block_cloned();
@@ -1821,27 +1731,6 @@ impl TestRig {
}
}
fn insert_blob_to_da_checker(&mut self, blob: Arc<BlobSidecar<E>>) {
match self
.harness
.chain
.data_availability_checker
.put_kzg_verified_blobs(
blob.block_root(),
std::iter::once(
KzgVerifiedBlob::new(blob, &self.harness.chain.kzg, Duration::new(0, 0))
.expect("Invalid blob"),
),
)
.unwrap()
{
Availability::Available(_) => panic!("column removed from da_checker, available"),
Availability::MissingComponents(block_root) => {
self.log(&format!("inserted column to da_checker {block_root:?}"))
}
};
}
fn insert_block_to_da_checker_as_pre_execution(&mut self, block: Arc<SignedBeaconBlock<E>>) {
self.log(&format!(
"Inserting block to availability_cache as pre_execution_block {:?}",
@@ -2549,32 +2438,6 @@ async fn block_in_processing_cache_becomes_valid_imported() {
r.assert_no_active_lookups();
}
// IGNORE: wait for change that delays blob fetching to knowing the block
#[tokio::test]
async fn blobs_in_da_checker_skip_download() {
let Some(mut r) = TestRig::new_after_deneb_before_fulu() else {
return;
};
r.build_chain(1).await;
let block = r.get_last_block().clone();
let blobs = block.block_data().blobs().expect("block with no blobs");
for blob in &blobs {
r.insert_blob_to_da_checker(blob.clone());
}
r.trigger_with_last_block();
r.simulate(SimulateConfig::happy_path()).await;
r.assert_successful_lookup_sync();
assert_eq!(
r.requests
.iter()
.filter(|(request, _)| matches!(request, RequestType::BlobsByRoot(_)))
.collect::<Vec<_>>(),
Vec::<&(RequestType<E>, AppRequestId)>::new(),
"There should be no blob requests"
);
}
/// Test that lookups complete when the block is already fully imported.
/// Exercises the `NoRequestNeeded` → `Completed` download state path.
/// Without the fix, `on_completed_request` left the state as `AwaitingDownload`
@@ -2720,42 +2583,6 @@ async fn crypto_on_fail_with_invalid_block_signature() {
}
}
#[tokio::test]
async fn crypto_on_fail_with_bad_blob_proposer_signature() {
let Some(mut r) = TestRig::new_after_deneb_before_fulu() else {
return;
};
r.build_chain(1).await;
r.corrupt_last_blob_proposer_signature();
r.trigger_with_last_block();
r.simulate(SimulateConfig::happy_path()).await;
if cfg!(feature = "fake_crypto") {
r.assert_successful_lookup_sync();
r.assert_no_penalties();
} else {
r.assert_failed_lookup_sync();
r.assert_penalties_of_type("lookup_blobs_processing_failure");
}
}
#[tokio::test]
async fn crypto_on_fail_with_bad_blob_kzg_proof() {
let Some(mut r) = TestRig::new_after_deneb_before_fulu() else {
return;
};
r.build_chain(1).await;
r.corrupt_last_blob_kzg_proof();
r.trigger_with_last_block();
r.simulate(SimulateConfig::happy_path()).await;
if cfg!(feature = "fake_crypto") {
r.assert_successful_lookup_sync();
r.assert_no_penalties();
} else {
r.assert_failed_lookup_sync();
r.assert_penalties_of_type("lookup_blobs_processing_failure");
}
}
#[tokio::test]
async fn crypto_on_fail_with_bad_column_proposer_signature() {
let Some(mut r) = TestRig::new_fulu_peer_test(FuluTestType::WeSupernodeThemSupernode) else {