From 8901c7417d4c306e036cea34482fbc06b8d9b4af Mon Sep 17 00:00:00 2001 From: Jimmy Chen Date: Wed, 27 Aug 2025 11:32:17 +1000 Subject: [PATCH] Notify lookup after gossip data column processing resulted in an import (#7940) When gossip data column processing completes and results in a block import, sync is currently not notified of the successful import. This is inconsistent with how blob processing and block processing both notify sync. This fix ensures lookup sync receives block import notifications when blocks become available through gossip data column. --- .../gossip_methods.rs | 13 ++- .../src/network_beacon_processor/tests.rs | 104 +++++++++++++++++- 2 files changed, 113 insertions(+), 4 deletions(-) diff --git a/beacon_node/network/src/network_beacon_processor/gossip_methods.rs b/beacon_node/network/src/network_beacon_processor/gossip_methods.rs index 7d26b42c33..a53e76402e 100644 --- a/beacon_node/network/src/network_beacon_processor/gossip_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/gossip_methods.rs @@ -1032,7 +1032,7 @@ impl NetworkBeaconProcessor { .await; register_process_result_metrics(&result, metrics::BlockSource::Gossip, "data_column"); - match result { + match &result { Ok(availability) => match availability { AvailabilityProcessingStatus::Imported(block_root) => { info!( @@ -1058,6 +1058,7 @@ impl NetworkBeaconProcessor { // another column arrives it either completes availability or pushes // reconstruction back a bit. let cloned_self = Arc::clone(self); + let block_root = *block_root; let send_result = self.beacon_processor_send.try_send(WorkEvent { drop_during_sync: false, work: Work::Reprocess(ReprocessQueueMessage::DelayColumnReconstruction( @@ -1106,6 +1107,16 @@ impl NetworkBeaconProcessor { ); } } + + // If a block is in the da_checker, sync maybe awaiting for an event when block is finally + // imported. A block can become imported both after processing a block or data column. If a + // importing a block results in `Imported`, notify. Do not notify of data column errors. + if matches!(result, Ok(AvailabilityProcessingStatus::Imported(_))) { + self.send_sync_message(SyncMessage::GossipBlockProcessResult { + block_root, + imported: true, + }); + } } /// Process the beacon block received from the gossip network and: diff --git a/beacon_node/network/src/network_beacon_processor/tests.rs b/beacon_node/network/src/network_beacon_processor/tests.rs index 557f9a2914..916548eb9d 100644 --- a/beacon_node/network/src/network_beacon_processor/tests.rs +++ b/beacon_node/network/src/network_beacon_processor/tests.rs @@ -70,7 +70,7 @@ struct TestRig { beacon_processor_tx: BeaconProcessorSend, work_journal_rx: mpsc::Receiver<&'static str>, network_rx: mpsc::UnboundedReceiver>, - _sync_rx: mpsc::UnboundedReceiver>, + sync_rx: mpsc::UnboundedReceiver>, duplicate_cache: DuplicateCache, network_beacon_processor: Arc>, _harness: BeaconChainHarness, @@ -202,7 +202,7 @@ impl TestRig { beacon_processor_rx, } = BeaconProcessorChannels::new(&beacon_processor_config); - let (sync_tx, _sync_rx) = mpsc::unbounded_channel(); + let (sync_tx, sync_rx) = mpsc::unbounded_channel(); // Default metadata let meta_data = if spec.is_peer_das_scheduled() { @@ -310,7 +310,7 @@ impl TestRig { beacon_processor_tx, work_journal_rx, network_rx, - _sync_rx, + sync_rx, duplicate_cache, network_beacon_processor, _harness: harness, @@ -677,6 +677,45 @@ impl TestRig { Some(events) } } + + /// Listen for sync messages and collect them for a specified duration or until reaching a count. + /// + /// Returns None if no messages were received, or Some(Vec) containing the received messages. + pub async fn receive_sync_messages_with_timeout( + &mut self, + timeout: Duration, + count: Option, + ) -> Option>> { + let mut events = vec![]; + + let timeout_future = tokio::time::sleep(timeout); + tokio::pin!(timeout_future); + + loop { + // Break if we've received the requested count of messages + if let Some(target_count) = count + && events.len() >= target_count + { + break; + } + + tokio::select! { + _ = &mut timeout_future => break, + maybe_msg = self.sync_rx.recv() => { + match maybe_msg { + Some(msg) => events.push(msg), + None => break, // Channel closed + } + } + } + } + + if events.is_empty() { + None + } else { + Some(events) + } + } } fn junk_peer_id() -> PeerId { @@ -1365,3 +1404,62 @@ async fn test_blobs_by_range() { } assert_eq!(blob_count, actual_count); } + +/// Ensure that data column processing that results in block import sends a sync notification +#[tokio::test] +async fn test_data_column_import_notifies_sync() { + if test_spec::().fulu_fork_epoch.is_none() { + return; + } + + let mut rig = TestRig::new(SMALL_CHAIN).await; + let block_root = rig.next_block.canonical_root(); + + // Enqueue the block first to prepare for data column processing + rig.enqueue_gossip_block(); + rig.assert_event_journal_completes(&[WorkType::GossipBlock]) + .await; + rig.receive_sync_messages_with_timeout(Duration::from_millis(100), Some(1)) + .await + .expect("should receive sync message"); + + // Enqueue data columns which should trigger block import when complete + let num_data_columns = rig.next_data_columns.as_ref().map(|c| c.len()).unwrap_or(0); + if num_data_columns > 0 { + for i in 0..num_data_columns { + rig.enqueue_gossip_data_columns(i); + rig.assert_event_journal_completes(&[WorkType::GossipDataColumnSidecar]) + .await; + } + + // Verify block import succeeded + assert_eq!( + rig.head_root(), + block_root, + "block should be imported and become head" + ); + + // Check that sync was notified of the successful import + let sync_messages = rig + .receive_sync_messages_with_timeout(Duration::from_millis(100), Some(1)) + .await + .expect("should receive sync message"); + + // Verify we received the expected GossipBlockProcessResult message + assert_eq!( + sync_messages.len(), + 1, + "should receive exactly one sync message" + ); + match &sync_messages[0] { + SyncMessage::GossipBlockProcessResult { + block_root: msg_block_root, + imported, + } => { + assert_eq!(*msg_block_root, block_root, "block root should match"); + assert!(*imported, "block should be marked as imported"); + } + other => panic!("expected GossipBlockProcessResult, got {:?}", other), + } + } +}