Notify lookup after gossip data column processing resulted in an import (#7940)

When gossip data column processing completes and results in a block import, sync is currently not notified of the successful import. This is inconsistent with how blob processing and block processing both notify sync.

This fix ensures lookup sync receives block import notifications when blocks become available through gossip data column.
This commit is contained in:
Jimmy Chen
2025-08-27 11:32:17 +10:00
committed by GitHub
parent 3e78034de6
commit 8901c7417d
2 changed files with 113 additions and 4 deletions

View File

@@ -1032,7 +1032,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
.await;
register_process_result_metrics(&result, metrics::BlockSource::Gossip, "data_column");
match result {
match &result {
Ok(availability) => match availability {
AvailabilityProcessingStatus::Imported(block_root) => {
info!(
@@ -1058,6 +1058,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
// another column arrives it either completes availability or pushes
// reconstruction back a bit.
let cloned_self = Arc::clone(self);
let block_root = *block_root;
let send_result = self.beacon_processor_send.try_send(WorkEvent {
drop_during_sync: false,
work: Work::Reprocess(ReprocessQueueMessage::DelayColumnReconstruction(
@@ -1106,6 +1107,16 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
);
}
}
// If a block is in the da_checker, sync maybe awaiting for an event when block is finally
// imported. A block can become imported both after processing a block or data column. If a
// importing a block results in `Imported`, notify. Do not notify of data column errors.
if matches!(result, Ok(AvailabilityProcessingStatus::Imported(_))) {
self.send_sync_message(SyncMessage::GossipBlockProcessResult {
block_root,
imported: true,
});
}
}
/// Process the beacon block received from the gossip network and:

View File

@@ -70,7 +70,7 @@ struct TestRig {
beacon_processor_tx: BeaconProcessorSend<E>,
work_journal_rx: mpsc::Receiver<&'static str>,
network_rx: mpsc::UnboundedReceiver<NetworkMessage<E>>,
_sync_rx: mpsc::UnboundedReceiver<SyncMessage<E>>,
sync_rx: mpsc::UnboundedReceiver<SyncMessage<E>>,
duplicate_cache: DuplicateCache,
network_beacon_processor: Arc<NetworkBeaconProcessor<T>>,
_harness: BeaconChainHarness<T>,
@@ -202,7 +202,7 @@ impl TestRig {
beacon_processor_rx,
} = BeaconProcessorChannels::new(&beacon_processor_config);
let (sync_tx, _sync_rx) = mpsc::unbounded_channel();
let (sync_tx, sync_rx) = mpsc::unbounded_channel();
// Default metadata
let meta_data = if spec.is_peer_das_scheduled() {
@@ -310,7 +310,7 @@ impl TestRig {
beacon_processor_tx,
work_journal_rx,
network_rx,
_sync_rx,
sync_rx,
duplicate_cache,
network_beacon_processor,
_harness: harness,
@@ -677,6 +677,45 @@ impl TestRig {
Some(events)
}
}
/// Listen for sync messages and collect them for a specified duration or until reaching a count.
///
/// Returns None if no messages were received, or Some(Vec) containing the received messages.
pub async fn receive_sync_messages_with_timeout(
&mut self,
timeout: Duration,
count: Option<usize>,
) -> Option<Vec<SyncMessage<E>>> {
let mut events = vec![];
let timeout_future = tokio::time::sleep(timeout);
tokio::pin!(timeout_future);
loop {
// Break if we've received the requested count of messages
if let Some(target_count) = count
&& events.len() >= target_count
{
break;
}
tokio::select! {
_ = &mut timeout_future => break,
maybe_msg = self.sync_rx.recv() => {
match maybe_msg {
Some(msg) => events.push(msg),
None => break, // Channel closed
}
}
}
}
if events.is_empty() {
None
} else {
Some(events)
}
}
}
fn junk_peer_id() -> PeerId {
@@ -1365,3 +1404,62 @@ async fn test_blobs_by_range() {
}
assert_eq!(blob_count, actual_count);
}
/// Ensure that data column processing that results in block import sends a sync notification
#[tokio::test]
async fn test_data_column_import_notifies_sync() {
if test_spec::<E>().fulu_fork_epoch.is_none() {
return;
}
let mut rig = TestRig::new(SMALL_CHAIN).await;
let block_root = rig.next_block.canonical_root();
// Enqueue the block first to prepare for data column processing
rig.enqueue_gossip_block();
rig.assert_event_journal_completes(&[WorkType::GossipBlock])
.await;
rig.receive_sync_messages_with_timeout(Duration::from_millis(100), Some(1))
.await
.expect("should receive sync message");
// Enqueue data columns which should trigger block import when complete
let num_data_columns = rig.next_data_columns.as_ref().map(|c| c.len()).unwrap_or(0);
if num_data_columns > 0 {
for i in 0..num_data_columns {
rig.enqueue_gossip_data_columns(i);
rig.assert_event_journal_completes(&[WorkType::GossipDataColumnSidecar])
.await;
}
// Verify block import succeeded
assert_eq!(
rig.head_root(),
block_root,
"block should be imported and become head"
);
// Check that sync was notified of the successful import
let sync_messages = rig
.receive_sync_messages_with_timeout(Duration::from_millis(100), Some(1))
.await
.expect("should receive sync message");
// Verify we received the expected GossipBlockProcessResult message
assert_eq!(
sync_messages.len(),
1,
"should receive exactly one sync message"
);
match &sync_messages[0] {
SyncMessage::GossipBlockProcessResult {
block_root: msg_block_root,
imported,
} => {
assert_eq!(*msg_block_root, block_root, "block root should match");
assert!(*imported, "block should be marked as imported");
}
other => panic!("expected GossipBlockProcessResult, got {:?}", other),
}
}
}