diff --git a/beacon_node/network/src/network_beacon_processor/gossip_methods.rs b/beacon_node/network/src/network_beacon_processor/gossip_methods.rs index 7d26b42c33..a53e76402e 100644 --- a/beacon_node/network/src/network_beacon_processor/gossip_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/gossip_methods.rs @@ -1032,7 +1032,7 @@ impl NetworkBeaconProcessor { .await; register_process_result_metrics(&result, metrics::BlockSource::Gossip, "data_column"); - match result { + match &result { Ok(availability) => match availability { AvailabilityProcessingStatus::Imported(block_root) => { info!( @@ -1058,6 +1058,7 @@ impl NetworkBeaconProcessor { // another column arrives it either completes availability or pushes // reconstruction back a bit. let cloned_self = Arc::clone(self); + let block_root = *block_root; let send_result = self.beacon_processor_send.try_send(WorkEvent { drop_during_sync: false, work: Work::Reprocess(ReprocessQueueMessage::DelayColumnReconstruction( @@ -1106,6 +1107,16 @@ impl NetworkBeaconProcessor { ); } } + + // If a block is in the da_checker, sync maybe awaiting for an event when block is finally + // imported. A block can become imported both after processing a block or data column. If a + // importing a block results in `Imported`, notify. Do not notify of data column errors. + if matches!(result, Ok(AvailabilityProcessingStatus::Imported(_))) { + self.send_sync_message(SyncMessage::GossipBlockProcessResult { + block_root, + imported: true, + }); + } } /// Process the beacon block received from the gossip network and: diff --git a/beacon_node/network/src/network_beacon_processor/tests.rs b/beacon_node/network/src/network_beacon_processor/tests.rs index 557f9a2914..916548eb9d 100644 --- a/beacon_node/network/src/network_beacon_processor/tests.rs +++ b/beacon_node/network/src/network_beacon_processor/tests.rs @@ -70,7 +70,7 @@ struct TestRig { beacon_processor_tx: BeaconProcessorSend, work_journal_rx: mpsc::Receiver<&'static str>, network_rx: mpsc::UnboundedReceiver>, - _sync_rx: mpsc::UnboundedReceiver>, + sync_rx: mpsc::UnboundedReceiver>, duplicate_cache: DuplicateCache, network_beacon_processor: Arc>, _harness: BeaconChainHarness, @@ -202,7 +202,7 @@ impl TestRig { beacon_processor_rx, } = BeaconProcessorChannels::new(&beacon_processor_config); - let (sync_tx, _sync_rx) = mpsc::unbounded_channel(); + let (sync_tx, sync_rx) = mpsc::unbounded_channel(); // Default metadata let meta_data = if spec.is_peer_das_scheduled() { @@ -310,7 +310,7 @@ impl TestRig { beacon_processor_tx, work_journal_rx, network_rx, - _sync_rx, + sync_rx, duplicate_cache, network_beacon_processor, _harness: harness, @@ -677,6 +677,45 @@ impl TestRig { Some(events) } } + + /// Listen for sync messages and collect them for a specified duration or until reaching a count. + /// + /// Returns None if no messages were received, or Some(Vec) containing the received messages. + pub async fn receive_sync_messages_with_timeout( + &mut self, + timeout: Duration, + count: Option, + ) -> Option>> { + let mut events = vec![]; + + let timeout_future = tokio::time::sleep(timeout); + tokio::pin!(timeout_future); + + loop { + // Break if we've received the requested count of messages + if let Some(target_count) = count + && events.len() >= target_count + { + break; + } + + tokio::select! { + _ = &mut timeout_future => break, + maybe_msg = self.sync_rx.recv() => { + match maybe_msg { + Some(msg) => events.push(msg), + None => break, // Channel closed + } + } + } + } + + if events.is_empty() { + None + } else { + Some(events) + } + } } fn junk_peer_id() -> PeerId { @@ -1365,3 +1404,62 @@ async fn test_blobs_by_range() { } assert_eq!(blob_count, actual_count); } + +/// Ensure that data column processing that results in block import sends a sync notification +#[tokio::test] +async fn test_data_column_import_notifies_sync() { + if test_spec::().fulu_fork_epoch.is_none() { + return; + } + + let mut rig = TestRig::new(SMALL_CHAIN).await; + let block_root = rig.next_block.canonical_root(); + + // Enqueue the block first to prepare for data column processing + rig.enqueue_gossip_block(); + rig.assert_event_journal_completes(&[WorkType::GossipBlock]) + .await; + rig.receive_sync_messages_with_timeout(Duration::from_millis(100), Some(1)) + .await + .expect("should receive sync message"); + + // Enqueue data columns which should trigger block import when complete + let num_data_columns = rig.next_data_columns.as_ref().map(|c| c.len()).unwrap_or(0); + if num_data_columns > 0 { + for i in 0..num_data_columns { + rig.enqueue_gossip_data_columns(i); + rig.assert_event_journal_completes(&[WorkType::GossipDataColumnSidecar]) + .await; + } + + // Verify block import succeeded + assert_eq!( + rig.head_root(), + block_root, + "block should be imported and become head" + ); + + // Check that sync was notified of the successful import + let sync_messages = rig + .receive_sync_messages_with_timeout(Duration::from_millis(100), Some(1)) + .await + .expect("should receive sync message"); + + // Verify we received the expected GossipBlockProcessResult message + assert_eq!( + sync_messages.len(), + 1, + "should receive exactly one sync message" + ); + match &sync_messages[0] { + SyncMessage::GossipBlockProcessResult { + block_root: msg_block_root, + imported, + } => { + assert_eq!(*msg_block_root, block_root, "block root should match"); + assert!(*imported, "block should be marked as imported"); + } + other => panic!("expected GossipBlockProcessResult, got {:?}", other), + } + } +}