diff --git a/beacon_node/network/src/network_beacon_processor/sync_methods.rs b/beacon_node/network/src/network_beacon_processor/sync_methods.rs index 0c3006b713..d8de50a5c9 100644 --- a/beacon_node/network/src/network_beacon_processor/sync_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/sync_methods.rs @@ -26,10 +26,7 @@ use lighthouse_network::service::api_types::CustodyBackfillBatchId; use logging::crit; use std::sync::Arc; use std::time::Duration; -use store::KzgCommitment; use tracing::{debug, debug_span, error, info, instrument, warn}; -use types::data::FixedBlobSidecarList; -use types::kzg_ext::format_kzg_commitments; use types::{BlockImportSource, DataColumnSidecarList, Epoch, Hash256}; /// Id associated to a batch processing request, either a sync batch or a parent lookup. @@ -251,115 +248,6 @@ impl NetworkBeaconProcessor { drop(handle); } - /// Returns an async closure which processes a list of blobs received via RPC. - /// - /// This separate function was required to prevent a cycle during compiler - /// type checking. - pub fn generate_rpc_blobs_process_fn( - self: Arc, - block_root: Hash256, - blobs: FixedBlobSidecarList, - seen_timestamp: Duration, - process_type: BlockProcessType, - ) -> AsyncFn { - let process_fn = async move { - self.clone() - .process_rpc_blobs(block_root, blobs, seen_timestamp, process_type) - .await; - }; - Box::pin(process_fn) - } - - /// Attempt to process a list of blobs received from a direct RPC request. - #[instrument( - name = "lh_process_rpc_blobs", - parent = None, - level = "debug", - skip_all, - fields(?block_root), - )] - pub async fn process_rpc_blobs( - self: Arc>, - block_root: Hash256, - blobs: FixedBlobSidecarList, - seen_timestamp: Duration, - process_type: BlockProcessType, - ) { - let Some(slot) = blobs - .iter() - .find_map(|blob| blob.as_ref().map(|blob| blob.slot())) - else { - return; - }; - - let (indices, commitments): (Vec, Vec) = blobs - .iter() - .filter_map(|blob_opt| { - blob_opt - .as_ref() - .map(|blob| (blob.index, blob.kzg_commitment)) - }) - .unzip(); - let commitments = format_kzg_commitments(&commitments); - - debug!( - ?indices, - %block_root, - %slot, - commitments, - "RPC blobs received" - ); - - if let Ok(current_slot) = self.chain.slot() - && current_slot == slot - { - // Note: this metric is useful to gauge how long it takes to receive blobs requested - // over rpc. Since we always send the request for block components at `get_unaggregated_attestation_due() / 2` - // we can use that as a baseline to measure against. - let delay = get_slot_delay_ms(seen_timestamp, slot, &self.chain.slot_clock); - - metrics::observe_duration(&metrics::BEACON_BLOB_RPC_SLOT_START_DELAY_TIME, delay); - } - - let result = self.chain.process_rpc_blobs(slot, block_root, blobs).await; - register_process_result_metrics(&result, metrics::BlockSource::Rpc, "blobs"); - - match &result { - Ok(AvailabilityProcessingStatus::Imported(hash)) => { - debug!( - result = "imported block and blobs", - %slot, - block_hash = %hash, - "Block components retrieved" - ); - self.chain.recompute_head_at_current_slot().await; - } - Ok(AvailabilityProcessingStatus::MissingComponents(_, _)) => { - debug!( - block_hash = %block_root, - %slot, - "Missing components over rpc" - ); - } - Err(BlockError::DuplicateFullyImported(_)) => { - debug!( - block_hash = %block_root, - %slot, - "Blobs have already been imported" - ); - } - // Errors are handled and logged in `block_lookups` - Err(_) => {} - } - - // Sync handles these results - let result = classify_processing_result(result, &process_type); - self.send_sync_message(SyncMessage::BlockComponentProcessed { - process_type, - result, - }); - } - #[instrument( name = "lh_process_rpc_custody_columns", parent = None, @@ -1126,7 +1014,6 @@ fn classify_processing_result( // Attributable to the block peer (which is also the data peer pre-Gloas). let reason = match process_type { BlockProcessType::SingleBlock { .. } => "lookup_block_processing_failure", - BlockProcessType::SingleBlob { .. } => "lookup_blobs_processing_failure", BlockProcessType::SingleCustodyColumn(_) => "lookup_custody_column_processing_failure", // Payload envelopes flow through classify_envelope_result; this branch shouldn't fire, // but produce a sensible reason in case it ever does. diff --git a/beacon_node/network/src/sync/block_lookups/mod.rs b/beacon_node/network/src/sync/block_lookups/mod.rs index e9522b1045..058d1a7808 100644 --- a/beacon_node/network/src/sync/block_lookups/mod.rs +++ b/beacon_node/network/src/sync/block_lookups/mod.rs @@ -39,7 +39,6 @@ use std::sync::Arc; use std::time::Duration; use store::Hash256; use tracing::{debug, error, warn}; -use types::data::FixedBlobSidecarList; use types::{EthSpec, SignedBeaconBlock, SignedExecutionPayloadEnvelope}; pub mod parent_chain; @@ -73,8 +72,6 @@ const MAX_LOOKUPS: usize = 200; type BlockDownloadResponse = Result<(Arc>, PeerGroup, Duration), RpcResponseError>; -type BlobDownloadResponse = - Result<(FixedBlobSidecarList, PeerGroup, Duration), RpcResponseError>; type CustodyDownloadResponse = Result<(types::DataColumnSidecarList, PeerGroup, Duration), RpcResponseError>; type PayloadDownloadResponse = @@ -487,20 +484,6 @@ impl BlockLookups { self.on_lookup_result(id.lookup_id, result, "block_download_response", cx); } - pub fn on_blob_download_response( - &mut self, - id: SingleLookupReqId, - response: BlobDownloadResponse, - cx: &mut SyncNetworkContext, - ) { - let Some(lookup) = self.single_block_lookups.get_mut(&id.lookup_id) else { - debug!(?id, "Blob returned for single block lookup not present"); - return; - }; - let result = lookup.on_blob_download_response(id.req_id, response, cx); - self.on_lookup_result(id.lookup_id, result, "blob_download_response", cx); - } - pub fn on_custody_download_response( &mut self, id: SingleLookupReqId, @@ -556,7 +539,7 @@ impl BlockLookups { BlockProcessType::SingleBlock { .. } => { self.on_block_processing_result(lookup_id, result, cx) } - BlockProcessType::SingleBlob { .. } | BlockProcessType::SingleCustodyColumn(_) => { + BlockProcessType::SingleCustodyColumn(_) => { self.on_data_processing_result(lookup_id, result, cx) } BlockProcessType::SinglePayloadEnvelope(_) => { diff --git a/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs b/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs index 53ddb3b1e2..78e96e238f 100644 --- a/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs +++ b/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs @@ -1,6 +1,6 @@ use super::{BlockComponent, PeerId, SINGLE_BLOCK_LOOKUP_MAX_ATTEMPTS}; use crate::sync::block_lookups::{ - BlobDownloadResponse, BlockDownloadResponse, CustodyDownloadResponse, PayloadDownloadResponse, + BlockDownloadResponse, CustodyDownloadResponse, PayloadDownloadResponse, }; use crate::sync::manager::{BlockProcessType, BlockProcessingResult}; use crate::sync::network_context::{ @@ -19,7 +19,6 @@ use std::time::{Duration, Instant}; use store::Hash256; use strum::IntoStaticStr; use tracing::{Span, debug, debug_span}; -use types::data::FixedBlobSidecarList; use types::{ ChainSpec, DataColumnSidecarList, EthSpec, ExecutionBlockHash, ForkName, SignedBeaconBlock, SignedExecutionPayloadEnvelope, Slot, @@ -273,11 +272,6 @@ impl DataRequestState { /// Fork-dependent data download state #[derive(Debug)] enum DataDownload { - Blobs { - block_root: Hash256, - expected_blobs: usize, - state: SingleLookupRequestState>, - }, Columns { block_root: Hash256, slot: Slot, @@ -293,15 +287,6 @@ impl DataDownload { cx: &mut SyncNetworkContext, ) -> Result<(), LookupRequestError> { match self { - DataDownload::Blobs { - block_root, - expected_blobs, - state, - } => { - let br = *block_root; - let eb = *expected_blobs; - state.make_request(|| cx.blob_lookup_request(id, peers, br, eb)) - } DataDownload::Columns { block_root, slot, @@ -315,16 +300,12 @@ impl DataDownload { fn is_completed(&self) -> bool { match self { - DataDownload::Blobs { state, .. } => state.is_completed(), DataDownload::Columns { state, .. } => state.is_completed(), } } fn take_download_result(&mut self) -> Option<(DownloadedData, PeerGroup)> { match self { - DataDownload::Blobs { state, .. } => state - .take_download_result() - .map(|r| (DownloadedData::Blobs(r.value), r.peer_group)), DataDownload::Columns { state, .. } => state .take_download_result() .map(|r| (DownloadedData::Columns(r.value), r.peer_group)), @@ -333,7 +314,6 @@ impl DataDownload { fn is_awaiting_event(&self) -> bool { match self { - DataDownload::Blobs { state, .. } => state.is_awaiting_event(), DataDownload::Columns { state, .. } => state.is_awaiting_event(), } } @@ -342,7 +322,6 @@ impl DataDownload { /// Downloaded data, waiting to be sent for processing #[derive(Debug)] enum DownloadedData { - Blobs(FixedBlobSidecarList), Columns(DataColumnSidecarList), } @@ -354,9 +333,6 @@ impl DownloadedData { cx: &mut SyncNetworkContext, ) -> Result<(), SendErrorProcessor> { match self { - DownloadedData::Blobs(blobs) => { - cx.send_blobs_for_processing(id, block_root, blobs.clone(), Duration::ZERO) - } DownloadedData::Columns(columns) => cx.send_custody_columns_for_processing( id, block_root, @@ -422,22 +398,12 @@ impl DataRequestState { let block_fork = spec.fork_name_at_slot::(slot); match block_fork { - ForkName::Base | ForkName::Altair | ForkName::Bellatrix | ForkName::Capella => { - Self::Complete - } - ForkName::Deneb | ForkName::Electra => { - if expected_blobs > 0 { - Self::Downloading(DataDownload::Blobs { - block_root, - expected_blobs, - state: SingleLookupRequestState::new_with_processing_failures( - failed_processing, - ), - }) - } else { - Self::Complete - } - } + ForkName::Base + | ForkName::Altair + | ForkName::Bellatrix + | ForkName::Capella + | ForkName::Deneb + | ForkName::Electra => Self::Complete, ForkName::Fulu => { if expected_blobs > 0 { Self::Downloading(DataDownload::Columns { @@ -1027,26 +993,6 @@ impl SingleBlockLookup { self.continue_requests(cx) } - /// Handle a blob download response. Updates download state and advances the lookup. - pub fn on_blob_download_response( - &mut self, - req_id: ReqId, - result: BlobDownloadResponse, - cx: &mut SyncNetworkContext, - ) -> Result { - let Some(DataRequest { - state: DataRequestState::Downloading(DataDownload::Blobs { state, .. }), - .. - }) = &mut self.data_request - else { - return Err(LookupRequestError::BadState( - "blob response but not downloading blobs".to_owned(), - )); - }; - state.on_download_response(req_id, self.block_root, result)?; - self.continue_requests(cx) - } - /// Handle a custody columns download response. Updates download state and advances the lookup. pub fn on_custody_download_response( &mut self, diff --git a/beacon_node/network/src/sync/manager.rs b/beacon_node/network/src/sync/manager.rs index f2cde09dea..c7b6bd5c8c 100644 --- a/beacon_node/network/src/sync/manager.rs +++ b/beacon_node/network/src/sync/manager.rs @@ -190,7 +190,6 @@ pub enum SyncMessage { #[derive(Debug, Clone)] pub enum BlockProcessType { SingleBlock { id: Id }, - SingleBlob { id: Id }, SingleCustodyColumn(Id), SinglePayloadEnvelope(Id), } @@ -199,7 +198,6 @@ impl BlockProcessType { pub fn id(&self) -> Id { match self { BlockProcessType::SingleBlock { id } - | BlockProcessType::SingleBlob { id } | BlockProcessType::SingleCustodyColumn(id) | BlockProcessType::SinglePayloadEnvelope(id) => *id, } @@ -541,9 +539,6 @@ impl SyncManager { SyncRequestId::SingleBlock { id } => { self.on_single_block_response(id, peer_id, RpcEvent::RPCError(error)) } - SyncRequestId::SingleBlob { id } => { - self.on_single_blob_response(id, peer_id, RpcEvent::RPCError(error)) - } SyncRequestId::SinglePayloadEnvelope { id } => { self.on_single_payload_envelope_response(id, peer_id, RpcEvent::RPCError(error)) } @@ -1213,11 +1208,6 @@ impl SyncManager { seen_timestamp: Duration, ) { match sync_request_id { - SyncRequestId::SingleBlob { id } => self.on_single_blob_response( - id, - peer_id, - RpcEvent::from_chunk(blob, seen_timestamp), - ), SyncRequestId::BlobsByRange(id) => self.on_blobs_by_range_response( id, peer_id, @@ -1257,23 +1247,6 @@ impl SyncManager { } } - fn on_single_blob_response( - &mut self, - id: SingleLookupReqId, - peer_id: PeerId, - blob: RpcEvent>>, - ) { - if let Some(resp) = self.network.on_single_blob_response(id, peer_id, blob) { - self.block_lookups.on_blob_download_response( - id, - resp.map(|(value, seen_timestamp)| { - (value, PeerGroup::from_single(peer_id), seen_timestamp) - }), - &mut self.network, - ) - } - } - fn rpc_payload_envelope_received( &mut self, sync_request_id: SyncRequestId, diff --git a/beacon_node/network/src/sync/network_context.rs b/beacon_node/network/src/sync/network_context.rs index a987fd94f6..5e8e68f277 100644 --- a/beacon_node/network/src/sync/network_context.rs +++ b/beacon_node/network/src/sync/network_context.rs @@ -18,7 +18,6 @@ use crate::status::ToStatusMessage; use crate::sync::batch::ByRangeRequestType; use crate::sync::block_lookups::SingleLookupId; use crate::sync::block_sidecar_coupling::CouplingError; -use crate::sync::network_context::requests::BlobsByRootSingleBlockRequest; use crate::sync::range_data_column_batch_request::RangeDataColumnBatchRequest; use beacon_chain::block_verification_types::LookupBlock; use beacon_chain::block_verification_types::{AsBlock, RangeSyncBlock}; @@ -38,8 +37,8 @@ use lighthouse_network::{Client, NetworkGlobals, PeerAction, PeerId, ReportSourc use parking_lot::RwLock; pub use requests::LookupVerifyError; use requests::{ - ActiveRequests, BlobsByRangeRequestItems, BlobsByRootRequestItems, BlocksByRangeRequestItems, - BlocksByRootRequestItems, DataColumnsByRangeRequestItems, DataColumnsByRootRequestItems, + ActiveRequests, BlobsByRangeRequestItems, BlocksByRangeRequestItems, BlocksByRootRequestItems, + DataColumnsByRangeRequestItems, DataColumnsByRootRequestItems, PayloadEnvelopesByRootRequestItems, }; #[cfg(test)] @@ -53,7 +52,6 @@ use std::time::Duration; use task_executor::TaskExecutor; use tokio::sync::mpsc; use tracing::{Span, debug, debug_span, error, warn}; -use types::data::FixedBlobSidecarList; use types::{ BlobSidecar, BlockImportSource, ChainSpec, ColumnIndex, DataColumnSidecar, DataColumnSidecarList, EthSpec, ForkContext, Hash256, SignedBeaconBlock, @@ -205,8 +203,6 @@ pub struct SyncNetworkContext { /// A mapping of active BlocksByRoot requests, including both current slot and parent lookups. blocks_by_root_requests: ActiveRequests>, - /// A mapping of active BlobsByRoot requests, including both current slot and parent lookups. - blobs_by_root_requests: ActiveRequests>, /// A mapping of active PayloadEnvelopesByRoot requests payload_envelopes_by_root_requests: ActiveRequests>, @@ -302,7 +298,6 @@ impl SyncNetworkContext { execution_engine_state: EngineState::Online, // always assume `Online` at the start request_id: 1, blocks_by_root_requests: ActiveRequests::new("blocks_by_root"), - blobs_by_root_requests: ActiveRequests::new("blobs_by_root"), payload_envelopes_by_root_requests: ActiveRequests::new("payload_envelopes_by_root"), data_columns_by_root_requests: ActiveRequests::new("data_columns_by_root"), blocks_by_range_requests: ActiveRequests::new("blocks_by_range"), @@ -335,7 +330,6 @@ impl SyncNetworkContext { network_send: _, request_id: _, blocks_by_root_requests, - blobs_by_root_requests, payload_envelopes_by_root_requests, data_columns_by_root_requests, blocks_by_range_requests, @@ -356,10 +350,6 @@ impl SyncNetworkContext { .active_requests_of_peer(peer_id) .into_iter() .map(|id| SyncRequestId::SingleBlock { id: *id }); - let blobs_by_root_ids = blobs_by_root_requests - .active_requests_of_peer(peer_id) - .into_iter() - .map(|id| SyncRequestId::SingleBlob { id: *id }); let payload_envelopes_by_root_ids = payload_envelopes_by_root_requests .active_requests_of_peer(peer_id) .into_iter() @@ -381,7 +371,6 @@ impl SyncNetworkContext { .into_iter() .map(|req_id| SyncRequestId::DataColumnsByRange(*req_id)); blocks_by_root_ids - .chain(blobs_by_root_ids) .chain(payload_envelopes_by_root_ids) .chain(data_column_by_root_ids) .chain(blocks_by_range_ids) @@ -438,7 +427,6 @@ impl SyncNetworkContext { network_send: _, request_id: _, blocks_by_root_requests, - blobs_by_root_requests, payload_envelopes_by_root_requests, data_columns_by_root_requests, blocks_by_range_requests, @@ -461,7 +449,6 @@ impl SyncNetworkContext { for peer_id in blocks_by_root_requests .iter_request_peers() - .chain(blobs_by_root_requests.iter_request_peers()) .chain(payload_envelopes_by_root_requests.iter_request_peers()) .chain(data_columns_by_root_requests.iter_request_peers()) .chain(blocks_by_range_requests.iter_request_peers()) @@ -1022,109 +1009,6 @@ impl SyncNetworkContext { Ok(LookupRequestResult::RequestSent(id.req_id)) } - - /// Request necessary blobs for `block_root`. Requests only the necessary blobs by checking: - /// - If we have a downloaded but not yet processed block - /// - If the da_checker has a pending block - /// - If the da_checker has pending blobs from gossip - /// - /// Returns false if no request was made, because we don't need to import (more) blobs. - pub fn blob_lookup_request( - &mut self, - lookup_id: SingleLookupId, - lookup_peers: Arc>>, - block_root: Hash256, - expected_blobs: usize, - ) -> Result { - let active_request_count_by_peer = self.active_request_count_by_peer(); - let Some(peer_id) = lookup_peers - .read() - .iter() - .map(|peer| { - ( - // Prefer peers with less overall requests - active_request_count_by_peer.get(peer).copied().unwrap_or(0), - // Random factor to break ties, otherwise the PeerID breaks ties - rand::random::(), - peer, - ) - }) - .min() - .map(|(_, _, peer)| *peer) - else { - // Allow lookup to not have any peers and do nothing. This is an optimization to not - // lose progress of lookups created from a block with unknown parent before we receive - // attestations for said block. - // Lookup sync event safety: If a lookup requires peers to make progress, and does - // not receive any new peers for some time it will be dropped. If it receives a new - // peer it must attempt to make progress. - return Ok(LookupRequestResult::Pending("no peers")); - }; - - let imported_blob_indexes = self - .chain - .data_availability_checker - .cached_blob_indexes(&block_root) - .unwrap_or_default(); - // Include only the blob indexes not yet imported (received through gossip) - let indices = (0..expected_blobs as u64) - .filter(|index| !imported_blob_indexes.contains(index)) - .collect::>(); - - if indices.is_empty() { - // No blobs required, do not issue any request - return Ok(LookupRequestResult::NoRequestNeeded("no indices to fetch")); - } - - let id = SingleLookupReqId { - lookup_id, - req_id: self.next_id(), - }; - - let request = BlobsByRootSingleBlockRequest { - block_root, - indices: indices.clone(), - }; - - // Lookup sync event safety: Refer to `Self::block_lookup_request` `network_send.send` call - let network_request = RequestType::BlobsByRoot( - request - .clone() - .into_request(&self.fork_context) - .map_err(RpcRequestSendError::InternalError)?, - ); - self.network_send - .send(NetworkMessage::SendRequest { - peer_id, - request: network_request, - app_request_id: AppRequestId::Sync(SyncRequestId::SingleBlob { id }), - }) - .map_err(|_| RpcRequestSendError::InternalError("network send error".to_owned()))?; - - debug!( - method = "BlobsByRoot", - ?block_root, - blob_indices = ?indices, - peer = %peer_id, - %id, - "Sync RPC request sent" - ); - - self.blobs_by_root_requests.insert( - id, - peer_id, - // true = enforce max_requests are returned for blobs_by_root. We only issue requests for - // blocks after we know the block has data, and only request peers after they claim to - // have imported the block+blobs. - true, - BlobsByRootRequestItems::new(request), - // Not implemented - Span::none(), - ); - - Ok(LookupRequestResult::RequestSent(id.req_id)) - } - /// Request to send a single `data_columns_by_root` request to the network. pub fn data_column_lookup_request( &mut self, @@ -1527,35 +1411,6 @@ impl SyncNetworkContext { self.on_rpc_response_result(resp, peer_id) } - pub(crate) fn on_single_blob_response( - &mut self, - id: SingleLookupReqId, - peer_id: PeerId, - rpc_event: RpcEvent>>, - ) -> Option>> { - let resp = self.blobs_by_root_requests.on_response(id, rpc_event); - let resp = resp.map(|res| { - res.and_then(|(blobs, seen_timestamp)| { - if let Some(max_len) = blobs - .first() - .map(|blob| self.chain.spec.max_blobs_per_block(blob.epoch()) as usize) - { - match to_fixed_blob_sidecar_list(blobs, max_len) { - Ok(blobs) => Ok((blobs, seen_timestamp)), - Err(e) => Err(e.into()), - } - } else { - Err(RpcResponseError::VerifyError( - LookupVerifyError::InternalError( - "Requested blobs for a block that has no blobs".to_string(), - ), - )) - } - }) - }); - self.on_rpc_response_result(resp, peer_id) - } - pub(crate) fn on_single_payload_envelope_response( &mut self, id: SingleLookupReqId, @@ -1723,36 +1578,6 @@ impl SyncNetworkContext { }) } - pub fn send_blobs_for_processing( - &self, - id: Id, - block_root: Hash256, - blobs: FixedBlobSidecarList, - seen_timestamp: Duration, - ) -> Result<(), SendErrorProcessor> { - let beacon_processor = self - .beacon_processor_if_enabled() - .ok_or(SendErrorProcessor::ProcessorNotAvailable)?; - - debug!(?block_root, ?id, "Sending blobs for processing"); - // Lookup sync event safety: If `beacon_processor.send_rpc_blobs` returns Ok() sync - // must receive a single `SyncMessage::BlockComponentProcessed` event with this process type - beacon_processor - .send_rpc_blobs( - block_root, - blobs, - seen_timestamp, - BlockProcessType::SingleBlob { id }, - ) - .map_err(|e| { - error!( - error = ?e, - "Failed to send sync blobs to processor" - ); - SendErrorProcessor::SendError - }) - } - pub fn send_payload_for_processing( &self, block_root: Hash256, @@ -1918,7 +1743,6 @@ impl SyncNetworkContext { pub(crate) fn register_metrics(&self) { for (id, count) in [ ("blocks_by_root", self.blocks_by_root_requests.len()), - ("blobs_by_root", self.blobs_by_root_requests.len()), ( "data_columns_by_root", self.data_columns_by_root_requests.len(), @@ -1939,17 +1763,3 @@ impl SyncNetworkContext { } } } - -fn to_fixed_blob_sidecar_list( - blobs: Vec>>, - max_len: usize, -) -> Result, LookupVerifyError> { - let mut fixed_list = FixedBlobSidecarList::new(vec![None; max_len]); - for blob in blobs.into_iter() { - let index = blob.index as usize; - *fixed_list - .get_mut(index) - .ok_or(LookupVerifyError::UnrequestedIndex(index as u64))? = Some(blob) - } - Ok(fixed_list) -} diff --git a/beacon_node/network/src/sync/tests/lookups.rs b/beacon_node/network/src/sync/tests/lookups.rs index a3e0c58c83..e6b81b8971 100644 --- a/beacon_node/network/src/sync/tests/lookups.rs +++ b/beacon_node/network/src/sync/tests/lookups.rs @@ -8,13 +8,11 @@ use crate::sync::{ SyncMessage, manager::{BatchProcessResult, BlockProcessType, BlockProcessingResult, SyncManager}, }; -use beacon_chain::blob_verification::KzgVerifiedBlob; use beacon_chain::block_verification_types::LookupBlock; use beacon_chain::custody_context::NodeCustodyType; use beacon_chain::{ AvailabilityProcessingStatus, EngineState, NotifyExecutionLayer, block_verification_types::{AsBlock, AvailableBlockData}, - data_availability_checker::Availability, test_utils::{ AttestationStrategy, BeaconChainHarness, BlockStrategy, EphemeralHarnessType, NumBlobs, generate_rand_block_and_blobs, test_spec, @@ -36,8 +34,8 @@ use std::time::Duration; use tokio::sync::mpsc; use tracing::info; use types::{ - BlobSidecar, BlockImportSource, ColumnIndex, DataColumnSidecar, EthSpec, ForkContext, ForkName, - Hash256, MinimalEthSpec as E, SignedBeaconBlock, SignedExecutionPayloadEnvelope, Slot, + BlobSidecar, BlockImportSource, ColumnIndex, DataColumnSidecar, ForkContext, ForkName, Hash256, + MinimalEthSpec as E, SignedBeaconBlock, SignedExecutionPayloadEnvelope, Slot, }; const D: Duration = Duration::new(0, 0); @@ -556,52 +554,6 @@ impl TestRig { self.send_rpc_blocks_response(req_id, peer_id, &blocks); } - (RequestType::BlobsByRoot(req), AppRequestId::Sync(req_id)) => { - if self.complete_strategy.return_no_data_n_times > 0 { - self.complete_strategy.return_no_data_n_times -= 1; - return self.send_rpc_blobs_response(req_id, peer_id, &[]); - } - - let mut blobs = req - .blob_ids - .iter() - .map(|id| { - self.network_blocks_by_root - .get(&id.block_root) - .unwrap_or_else(|| { - panic!("Test consumer requested unknown block: {id:?}") - }) - .block_data() - .blobs() - .unwrap_or_else(|| panic!("Block {id:?} has no blobs")) - .iter() - .find(|blob| blob.index == id.index) - .unwrap_or_else(|| panic!("Blob id {id:?} not avail")) - .clone() - }) - .collect::>(); - - if self.complete_strategy.return_too_few_data_n_times > 0 { - self.complete_strategy.return_too_few_data_n_times -= 1; - blobs.pop(); - } - - if self - .complete_strategy - .return_wrong_sidecar_for_block_n_times - > 0 - { - self.complete_strategy - .return_wrong_sidecar_for_block_n_times -= 1; - let first = blobs.first_mut().expect("empty blobs"); - let mut blob = Arc::make_mut(first).clone(); - blob.signed_block_header.message.body_root = Hash256::ZERO; - *first = Arc::new(blob); - } - - self.send_rpc_blobs_response(req_id, peer_id, &blobs); - } - (RequestType::DataColumnsByRoot(req), AppRequestId::Sync(req_id)) => { if self.complete_strategy.return_no_data_n_times > 0 { self.complete_strategy.return_no_data_n_times -= 1; @@ -1073,48 +1025,6 @@ impl TestRig { keypair.sk.sign(msg) } - fn corrupt_last_blob_proposer_signature(&mut self) { - let range_sync_block = self.get_last_block().clone(); - let block = range_sync_block.block_cloned(); - let mut blobs = range_sync_block - .block_data() - .blobs() - .expect("no blobs") - .into_iter() - .collect::>(); - let columns = range_sync_block.block_data().data_columns(); - let first = blobs.first_mut().expect("empty blobs"); - Arc::make_mut(first).signed_block_header.signature = self.valid_signature(); - let max_blobs = - self.harness - .spec - .max_blobs_per_block(block.slot().epoch(E::slots_per_epoch())) as usize; - let blobs = - types::BlobSidecarList::new(blobs, max_blobs).expect("invalid blob sidecar list"); - self.re_insert_block(block, Some(blobs), columns); - } - - fn corrupt_last_blob_kzg_proof(&mut self) { - let range_sync_block = self.get_last_block().clone(); - let block = range_sync_block.block_cloned(); - let mut blobs = range_sync_block - .block_data() - .blobs() - .expect("no blobs") - .into_iter() - .collect::>(); - let columns = range_sync_block.block_data().data_columns(); - let first = blobs.first_mut().expect("empty blobs"); - Arc::make_mut(first).kzg_proof = kzg::KzgProof::empty(); - let max_blobs = - self.harness - .spec - .max_blobs_per_block(block.slot().epoch(E::slots_per_epoch())) as usize; - let blobs = - types::BlobSidecarList::new(blobs, max_blobs).expect("invalid blob sidecar list"); - self.re_insert_block(block, Some(blobs), columns); - } - fn corrupt_last_column_proposer_signature(&mut self) { let range_sync_block = self.get_last_block().clone(); let block = range_sync_block.block_cloned(); @@ -1821,27 +1731,6 @@ impl TestRig { } } - fn insert_blob_to_da_checker(&mut self, blob: Arc>) { - match self - .harness - .chain - .data_availability_checker - .put_kzg_verified_blobs( - blob.block_root(), - std::iter::once( - KzgVerifiedBlob::new(blob, &self.harness.chain.kzg, Duration::new(0, 0)) - .expect("Invalid blob"), - ), - ) - .unwrap() - { - Availability::Available(_) => panic!("column removed from da_checker, available"), - Availability::MissingComponents(block_root) => { - self.log(&format!("inserted column to da_checker {block_root:?}")) - } - }; - } - fn insert_block_to_da_checker_as_pre_execution(&mut self, block: Arc>) { self.log(&format!( "Inserting block to availability_cache as pre_execution_block {:?}", @@ -2549,32 +2438,6 @@ async fn block_in_processing_cache_becomes_valid_imported() { r.assert_no_active_lookups(); } -// IGNORE: wait for change that delays blob fetching to knowing the block -#[tokio::test] -async fn blobs_in_da_checker_skip_download() { - let Some(mut r) = TestRig::new_after_deneb_before_fulu() else { - return; - }; - r.build_chain(1).await; - let block = r.get_last_block().clone(); - let blobs = block.block_data().blobs().expect("block with no blobs"); - for blob in &blobs { - r.insert_blob_to_da_checker(blob.clone()); - } - r.trigger_with_last_block(); - r.simulate(SimulateConfig::happy_path()).await; - - r.assert_successful_lookup_sync(); - assert_eq!( - r.requests - .iter() - .filter(|(request, _)| matches!(request, RequestType::BlobsByRoot(_))) - .collect::>(), - Vec::<&(RequestType, AppRequestId)>::new(), - "There should be no blob requests" - ); -} - /// Test that lookups complete when the block is already fully imported. /// Exercises the `NoRequestNeeded` → `Completed` download state path. /// Without the fix, `on_completed_request` left the state as `AwaitingDownload` @@ -2720,42 +2583,6 @@ async fn crypto_on_fail_with_invalid_block_signature() { } } -#[tokio::test] -async fn crypto_on_fail_with_bad_blob_proposer_signature() { - let Some(mut r) = TestRig::new_after_deneb_before_fulu() else { - return; - }; - r.build_chain(1).await; - r.corrupt_last_blob_proposer_signature(); - r.trigger_with_last_block(); - r.simulate(SimulateConfig::happy_path()).await; - if cfg!(feature = "fake_crypto") { - r.assert_successful_lookup_sync(); - r.assert_no_penalties(); - } else { - r.assert_failed_lookup_sync(); - r.assert_penalties_of_type("lookup_blobs_processing_failure"); - } -} - -#[tokio::test] -async fn crypto_on_fail_with_bad_blob_kzg_proof() { - let Some(mut r) = TestRig::new_after_deneb_before_fulu() else { - return; - }; - r.build_chain(1).await; - r.corrupt_last_blob_kzg_proof(); - r.trigger_with_last_block(); - r.simulate(SimulateConfig::happy_path()).await; - if cfg!(feature = "fake_crypto") { - r.assert_successful_lookup_sync(); - r.assert_no_penalties(); - } else { - r.assert_failed_lookup_sync(); - r.assert_penalties_of_type("lookup_blobs_processing_failure"); - } -} - #[tokio::test] async fn crypto_on_fail_with_bad_column_proposer_signature() { let Some(mut r) = TestRig::new_fulu_peer_test(FuluTestType::WeSupernodeThemSupernode) else {