From 8e4df4ababf68d7c58973177013d78447b9a208d Mon Sep 17 00:00:00 2001 From: Lion - dapplion <35266934+dapplion@users.noreply.github.com> Date: Sat, 6 Jun 2026 01:52:45 +0200 Subject: [PATCH] Simplify lookup sync da_checker oracle (#9428) Implementing gloas lookup sync is currently incompatible with the `GossipBlockProcessResult` mechanism. Today it's implemented such that if we receive a sucessful `GossipBlockProcessResult` we directly mark the lookup as Complete and delete it. In Gloas we can't delete a lookup after block import, as we may still have FULL child awaiting the payload. IMO this `GossipBlockProcessResult` brings a lot of headache and edge cases that we can just live without. Also the `reset_request` business is nasty and can easily leave the lookup in a bad state. If we get rid of `GossipBlockProcessResult` we only pay the following performance penalty: - Lookup is created exactly while the block's payload is being execution validated - (new degradation) we download the block again - send the block for processing but the duplicate cache prevents double execution So in the worst case we spend a few KBs of extra download bandwidth. Remember each block is downloaded 8x times through gossip in the happy case. Co-Authored-By: dapplion <35266934+dapplion@users.noreply.github.com> Co-Authored-By: Pawan Dhananjay --- .../gossip_methods.rs | 23 ---- .../src/network_beacon_processor/tests.rs | 104 +---------------- .../network/src/sync/block_lookups/mod.rs | 33 ------ .../sync/block_lookups/single_block_lookup.rs | 6 - beacon_node/network/src/sync/manager.rs | 13 --- .../network/src/sync/network_context.rs | 33 ++---- beacon_node/network/src/sync/tests/lookups.rs | 107 ------------------ 7 files changed, 13 insertions(+), 306 deletions(-) diff --git a/beacon_node/network/src/network_beacon_processor/gossip_methods.rs b/beacon_node/network/src/network_beacon_processor/gossip_methods.rs index 68b41cab5e..29e43b18c2 100644 --- a/beacon_node/network/src/network_beacon_processor/gossip_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/gossip_methods.rs @@ -922,14 +922,6 @@ impl NetworkBeaconProcessor { &metrics::BEACON_BLOB_DELAY_FULL_VERIFICATION, processing_start_time.elapsed().as_millis() as i64, ); - - // If a block is in the da_checker, sync maybe awaiting for an event when block is finally - // imported. A block can become imported both after processing a block or data column. If - // importing a block results in `Imported`, notify. Do not notify of data column errors. - self.send_sync_message(SyncMessage::GossipBlockProcessResult { - block_root, - imported: true, - }); } AvailabilityProcessingStatus::MissingComponents(slot, block_root) => { trace!( @@ -1354,16 +1346,6 @@ impl NetworkBeaconProcessor { // contributing to the partial. } } - - // If a block is in the da_checker, sync maybe awaiting for an event when block is finally - // imported. A block can become imported both after processing a block or data column. If a - // importing a block results in `Imported`, notify. Do not notify of data column errors. - if matches!(result, Ok(AvailabilityProcessingStatus::Imported(_))) { - self.send_sync_message(SyncMessage::GossipBlockProcessResult { - block_root, - imported: true, - }); - } } async fn check_reconstruction_trigger(self: &Arc, slot: Slot, block_root: &Hash256) { @@ -1898,11 +1880,6 @@ impl NetworkBeaconProcessor { if let Err(e) = &result { self.maybe_store_invalid_block(&invalid_block_storage, block_root, &block, e); } - - self.send_sync_message(SyncMessage::GossipBlockProcessResult { - block_root, - imported: matches!(result, Ok(AvailabilityProcessingStatus::Imported(_))), - }); } pub fn process_gossip_voluntary_exit( diff --git a/beacon_node/network/src/network_beacon_processor/tests.rs b/beacon_node/network/src/network_beacon_processor/tests.rs index ad98851532..8ccfe38fa3 100644 --- a/beacon_node/network/src/network_beacon_processor/tests.rs +++ b/beacon_node/network/src/network_beacon_processor/tests.rs @@ -6,7 +6,7 @@ use crate::{ ChainSegmentProcessId, DuplicateCache, InvalidBlockStorage, NetworkBeaconProcessor, }, service::NetworkMessage, - sync::{SyncMessage, manager::BlockProcessType}, + sync::manager::BlockProcessType, }; use beacon_chain::block_verification_types::LookupBlock; use beacon_chain::custody_context::NodeCustodyType; @@ -76,7 +76,6 @@ struct TestRig { beacon_processor_tx: BeaconProcessorSend, work_journal_rx: mpsc::Receiver<&'static str>, network_rx: mpsc::UnboundedReceiver>, - sync_rx: mpsc::UnboundedReceiver>, duplicate_cache: DuplicateCache, network_beacon_processor: Arc>, _harness: BeaconChainHarness, @@ -270,7 +269,7 @@ impl TestRig { beacon_processor_rx, } = BeaconProcessorChannels::new(&beacon_processor_config); - let (sync_tx, sync_rx) = mpsc::unbounded_channel(); + let (sync_tx, _sync_rx) = mpsc::unbounded_channel(); // Default metadata let meta_data = if spec.is_peer_das_scheduled() { @@ -375,7 +374,6 @@ impl TestRig { beacon_processor_tx, work_journal_rx, network_rx, - sync_rx, duplicate_cache, network_beacon_processor, _harness: harness, @@ -844,45 +842,6 @@ impl TestRig { Some(events) } } - - /// Listen for sync messages and collect them for a specified duration or until reaching a count. - /// - /// Returns None if no messages were received, or Some(Vec) containing the received messages. - pub async fn receive_sync_messages_with_timeout( - &mut self, - timeout: Duration, - count: Option, - ) -> Option>> { - let mut events = vec![]; - - let timeout_future = tokio::time::sleep(timeout); - tokio::pin!(timeout_future); - - loop { - // Break if we've received the requested count of messages - if let Some(target_count) = count - && events.len() >= target_count - { - break; - } - - tokio::select! { - _ = &mut timeout_future => break, - maybe_msg = self.sync_rx.recv() => { - match maybe_msg { - Some(msg) => events.push(msg), - None => break, // Channel closed - } - } - } - } - - if events.is_empty() { - None - } else { - Some(events) - } - } } fn junk_peer_id() -> PeerId { @@ -1862,65 +1821,6 @@ async fn test_blobs_by_root_post_fulu_should_return_empty() { assert_eq!(0, actual_count); } -/// Ensure that data column processing that results in block import sends a sync notification -#[tokio::test] -async fn test_data_column_import_notifies_sync() { - if test_spec::().fulu_fork_epoch.is_none() { - return; - } - - let mut rig = TestRig::new(SMALL_CHAIN).await; - let block_root = rig.next_block.canonical_root(); - - // Enqueue the block first to prepare for data column processing - rig.enqueue_gossip_block(); - rig.assert_event_journal_completes(&[WorkType::GossipBlock]) - .await; - rig.receive_sync_messages_with_timeout(Duration::from_millis(100), Some(1)) - .await - .expect("should receive sync message"); - - // Enqueue data columns which should trigger block import when complete - let num_data_columns = rig.next_data_columns.as_ref().map(|c| c.len()).unwrap_or(0); - if num_data_columns > 0 { - for i in 0..num_data_columns { - rig.enqueue_gossip_data_columns(i); - rig.assert_event_journal_completes(&[WorkType::GossipDataColumnSidecar]) - .await; - } - - // Verify block import succeeded - assert_eq!( - rig.head_root(), - block_root, - "block should be imported and become head" - ); - - // Check that sync was notified of the successful import - let sync_messages = rig - .receive_sync_messages_with_timeout(Duration::from_millis(100), Some(1)) - .await - .expect("should receive sync message"); - - // Verify we received the expected GossipBlockProcessResult message - assert_eq!( - sync_messages.len(), - 1, - "should receive exactly one sync message" - ); - match &sync_messages[0] { - SyncMessage::GossipBlockProcessResult { - block_root: msg_block_root, - imported, - } => { - assert_eq!(*msg_block_root, block_root, "block root should match"); - assert!(*imported, "block should be marked as imported"); - } - other => panic!("expected GossipBlockProcessResult, got {:?}", other), - } - } -} - #[tokio::test] async fn test_data_columns_by_range_request_only_returns_requested_columns() { if test_spec::().fulu_fork_epoch.is_none() { diff --git a/beacon_node/network/src/sync/block_lookups/mod.rs b/beacon_node/network/src/sync/block_lookups/mod.rs index a265373e3f..0cbeb5ee4e 100644 --- a/beacon_node/network/src/sync/block_lookups/mod.rs +++ b/beacon_node/network/src/sync/block_lookups/mod.rs @@ -482,39 +482,6 @@ impl BlockLookups { self.on_lookup_result(lookup_id, lookup_result, "processing_result", cx); } - pub fn on_external_processing_result( - &mut self, - block_root: Hash256, - imported: bool, - cx: &mut SyncNetworkContext, - ) { - let Some((id, lookup)) = self - .single_block_lookups - .iter_mut() - .find(|(_, lookup)| lookup.is_for_block(block_root)) - else { - // Ok to ignore gossip process events - return; - }; - - let lookup_result = if imported { - Ok(LookupResult::Completed) - } else { - // A lookup may be in the following state: - // - Block awaiting processing from a different source - // - Blobs downloaded processed, and inserted into the da_checker - // - // At this point the block fails processing (e.g. execution engine offline) and it is - // removed from the da_checker. Note that ALL components are removed from the da_checker - // so when we re-download and process the block we get the error - // MissingComponentsAfterAllProcessed and get stuck. - lookup.reset_requests(); - lookup.continue_requests(cx) - }; - let id = *id; - self.on_lookup_result(id, lookup_result, "external_processing_result", cx); - } - /// Makes progress on the immediate children of `block_root` pub fn continue_child_lookups(&mut self, block_root: Hash256, cx: &mut SyncNetworkContext) { let mut lookup_results = vec![]; // < need to buffer lookup results to not re-borrow &mut self diff --git a/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs b/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs index 8eb58da4e6..157da5d806 100644 --- a/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs +++ b/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs @@ -139,12 +139,6 @@ impl SingleBlockLookup { } } - /// Reset the status of all requests (used on block processing failure) - pub fn reset_requests(&mut self) { - self.block_request = BlockRequest::new(); - self.data_request = DataRequest::WaitingForBlock; - } - /// Return the slot of this lookup's block if it's currently cached pub fn peek_downloaded_block_slot(&self) -> Option { self.block_request diff --git a/beacon_node/network/src/sync/manager.rs b/beacon_node/network/src/sync/manager.rs index 166c65b6e1..66bb13ae98 100644 --- a/beacon_node/network/src/sync/manager.rs +++ b/beacon_node/network/src/sync/manager.rs @@ -182,11 +182,6 @@ pub enum SyncMessage { process_type: BlockProcessType, result: BlockProcessingResult, }, - - /// A gossip-received component has completed processing and the block may now be imported. - /// In Fulu this is sent after block or blob processing. In Gloas this is also sent after - /// data column or payload envelope processing triggers availability. - GossipBlockProcessResult { block_root: Hash256, imported: bool }, } /// The type of processing specified for a received block. @@ -907,14 +902,6 @@ impl SyncManager { } => self .block_lookups .on_processing_result(process_type, result, &mut self.network), - SyncMessage::GossipBlockProcessResult { - block_root, - imported, - } => self.block_lookups.on_external_processing_result( - block_root, - imported, - &mut self.network, - ), SyncMessage::BatchProcessed { sync_type, result } => match sync_type { ChainSegmentProcessId::RangeBatchId(chain_id, epoch) => { self.range_sync.handle_block_process_result( diff --git a/beacon_node/network/src/sync/network_context.rs b/beacon_node/network/src/sync/network_context.rs index 1e35c0a72f..dfeb8d8f12 100644 --- a/beacon_node/network/src/sync/network_context.rs +++ b/beacon_node/network/src/sync/network_context.rs @@ -53,8 +53,8 @@ use task_executor::TaskExecutor; use tokio::sync::mpsc; use tracing::{Span, debug, debug_span, error, warn}; use types::{ - BlobSidecar, BlockImportSource, ColumnIndex, DataColumnSidecar, DataColumnSidecarList, EthSpec, - ForkContext, Hash256, SignedBeaconBlock, SignedExecutionPayloadEnvelope, Slot, + BlobSidecar, ColumnIndex, DataColumnSidecar, DataColumnSidecarList, EthSpec, ForkContext, + Hash256, SignedBeaconBlock, SignedExecutionPayloadEnvelope, Slot, }; pub mod custody; @@ -849,26 +849,15 @@ impl SyncNetworkContext { match self.chain.get_block_process_status(&block_root) { // Unknown block, continue request to download BlockProcessStatus::Unknown => {} - // Block is known and currently processing. Imports from gossip and HTTP API insert the - // block in the da_cache. However, HTTP API is unable to notify sync when it completes - // block import. Returning `Pending` here will result in stuck lookups if the block is - // importing from sync. - BlockProcessStatus::NotValidated(_, source) => match source { - BlockImportSource::Gossip => { - // Lookup sync event safety: If the block is currently in the processing cache, we - // are guaranteed to receive a `SyncMessage::GossipBlockProcessResult` that will - // make progress on this lookup - return Ok(LookupRequestResult::Pending("block in processing cache")); - } - BlockImportSource::Lookup - | BlockImportSource::RangeSync - | BlockImportSource::HttpApi => { - // Lookup, RangeSync or HttpApi block import don't emit the GossipBlockProcessResult - // event. If a lookup happens to be created during block import from one of - // those sources just import the block twice. Otherwise the lookup will get - // stuck. Double imports are fine, they just waste resources. - } - }, + // Block is known but processing. The block may turn out to be invalid, so we want sync to + // NOT mark the request as complete yet. The ideal flow would be: + // - Wait for processing to complete + // - Only if there is an error re-download and re-process + // But implementing this introduces complexity and the risk for the lookup to get stuck. + // Instead we always re-download the block eagerly and de-duplicate the processing. So in + // the happy case we just download the block again if the lookup is created while execution + // processing the block. + BlockProcessStatus::NotValidated(..) => {} // Block is fully validated. If it's not yet imported it's waiting for missing block // components. Consider this request completed and do nothing. BlockProcessStatus::ExecutionValidated(block) => { diff --git a/beacon_node/network/src/sync/tests/lookups.rs b/beacon_node/network/src/sync/tests/lookups.rs index 5642f7846a..91227d77f8 100644 --- a/beacon_node/network/src/sync/tests/lookups.rs +++ b/beacon_node/network/src/sync/tests/lookups.rs @@ -1235,12 +1235,6 @@ impl TestRig { self.assert_empty_network(); } - fn assert_pending_lookup_sync(&self) { - assert!(self.created_lookups() > 0, "no created lookups"); - assert_eq!(self.dropped_lookups(), 0, "some dropped lookups"); - assert_eq!(self.completed_lookups(), 0, "some completed lookups"); - } - /// Assert there is at least one range sync chain created and that all sync chains completed pub(super) fn assert_successful_range_sync(&self) { assert!( @@ -1330,15 +1324,6 @@ impl TestRig { genesis_fork().fulu_enabled().then(Self::default) } - fn new_after_deneb_before_fulu() -> Option { - let fork = genesis_fork(); - if fork.deneb_enabled() && !fork.fulu_enabled() { - Some(Self::default()) - } else { - None - } - } - pub fn new_fulu_peer_test(fulu_test_type: FuluTestType) -> Option { genesis_fork().fulu_enabled().then(|| { Self::new(TestRigConfig { @@ -1673,56 +1658,6 @@ impl TestRig { } } - fn insert_block_to_da_checker_as_pre_execution(&mut self, block: Arc>) { - self.log(&format!( - "Inserting block to availability_cache as pre_execution_block {:?}", - block.canonical_root() - )); - self.harness - .chain - .data_availability_checker - .put_pre_execution_block(block.canonical_root(), block, BlockImportSource::Gossip) - .unwrap(); - } - - fn simulate_block_gossip_processing_becomes_invalid(&mut self, block_root: Hash256) { - self.log(&format!( - "Marking block {block_root:?} in da_checker as execution error" - )); - self.harness - .chain - .data_availability_checker - .remove_block_on_execution_error(&block_root); - - self.send_sync_message(SyncMessage::GossipBlockProcessResult { - block_root, - imported: false, - }); - } - - async fn simulate_block_gossip_processing_becomes_valid( - &mut self, - block: Arc>, - ) { - let block_root = block.canonical_root(); - - match self.import_block_to_da_checker(block).await { - AvailabilityProcessingStatus::Imported(block_root) => { - self.log(&format!( - "insert block to da_checker and it imported {block_root:?}" - )); - } - AvailabilityProcessingStatus::MissingComponents(_, _) => { - panic!("block not imported after adding to da_checker"); - } - } - - self.send_sync_message(SyncMessage::GossipBlockProcessResult { - block_root, - imported: false, - }); - } - fn requests_count(&self) -> HashMap<&'static str, usize> { let mut requests_count = HashMap::new(); for (request, _) in &self.requests { @@ -2294,48 +2229,6 @@ async fn block_in_da_checker_skips_download() { ); } -#[tokio::test] -async fn block_in_processing_cache_becomes_invalid() { - let Some(mut r) = TestRig::new_after_deneb_before_fulu() else { - return; - }; - r.build_chain(1).await; - let block = r.block_at_slot(1); - r.insert_block_to_da_checker_as_pre_execution(block.clone()); - r.trigger_with_last_block(); - r.simulate(SimulateConfig::happy_path()).await; - r.assert_pending_lookup_sync(); - // Here the only active lookup is waiting for the block to finish processing - - // Simulate invalid block, removing it from processing cache - r.simulate_block_gossip_processing_becomes_invalid(block.canonical_root()); - // Should download block, then issue blobs request - r.simulate(SimulateConfig::happy_path()).await; - r.assert_successful_lookup_sync(); -} - -#[tokio::test] -async fn block_in_processing_cache_becomes_valid_imported() { - let Some(mut r) = TestRig::new_after_deneb_before_fulu() else { - return; - }; - r.build_chain(1).await; - let block = r.block_at_slot(1); - r.insert_block_to_da_checker_as_pre_execution(block.clone()); - r.trigger_with_last_block(); - r.simulate(SimulateConfig::happy_path()).await; - r.assert_pending_lookup_sync(); - // Here the only active lookup is waiting for the block to finish processing - - // Resolve the block from processing step - r.simulate_block_gossip_processing_becomes_valid(block) - .await; - // Should not trigger block or blob request - r.assert_empty_network(); - // Resolve blob and expect lookup completed - r.assert_no_active_lookups(); -} - macro_rules! fulu_peer_matrix_tests { ( [$($name:ident => $variant:expr),+ $(,)?]