diff --git a/beacon_node/network/src/network_beacon_processor/tests.rs b/beacon_node/network/src/network_beacon_processor/tests.rs index 367fc3ccb3..2a7542e73b 100644 --- a/beacon_node/network/src/network_beacon_processor/tests.rs +++ b/beacon_node/network/src/network_beacon_processor/tests.rs @@ -656,60 +656,6 @@ impl TestRig { /// /// Given the described logic, `expected` must not contain `WORKER_FREED` or `NOTHING_TO_DO` /// events. - /// Like [`Self::assert_event_journal_contains_ordered`], but tolerant of extra trailing - /// repetitions of the final expected event. Useful for events the reprocess queue can - /// dispatch redundantly under timing pressure (e.g. reconstruction). - pub async fn assert_event_journal_contains_at_least_ordered(&mut self, expected: &[WorkType]) { - let expected_strs = expected - .iter() - .map(|ev| ev.into()) - .collect::>(); - - let mut events = Vec::with_capacity(expected_strs.len()); - let mut worker_freed_remaining = expected_strs.len(); - - let drain_future = async { - loop { - match self.work_journal_rx.recv().await { - Some(event) if event == WORKER_FREED => { - worker_freed_remaining = worker_freed_remaining.saturating_sub(1); - if worker_freed_remaining == 0 { - break; - } - } - Some(event) if event == NOTHING_TO_DO => {} - Some(event) => events.push(event), - None => break, - } - } - }; - - tokio::select! { - _ = tokio::time::sleep(STANDARD_TIMEOUT) => panic!( - "Timeout ({:?}) expired waiting for events. Expected at least {:?} but got {:?} waiting for {} `WORKER_FREED` events.", - STANDARD_TIMEOUT, expected_strs, events, worker_freed_remaining, - ), - _ = drain_future => {}, - } - - // Events must start with the exact expected sequence; trailing events must all be - // repetitions of the final expected event. - assert!( - events.len() >= expected_strs.len(), - "expected at least {} events, got {}: {:?}", - expected_strs.len(), - events.len(), - events, - ); - let (head, tail) = events.split_at(expected_strs.len()); - assert_eq!(head, expected_strs.as_slice()); - let trailing = expected_strs.last().copied().unwrap_or(""); - for event in tail { - assert_eq!(*event, trailing, "unexpected trailing event {event:?}"); - } - assert_eq!(worker_freed_remaining, 0); - } - pub async fn assert_event_journal_contains_ordered(&mut self, expected: &[WorkType]) { let expected = expected .iter() @@ -1062,17 +1008,30 @@ async fn data_column_reconstruction_at_deadline() { rig.enqueue_gossip_data_columns(i); } - // Expect all gossip events followed by at least one reconstruction. Under a slow - // signature backend (real crypto) the reprocess queue can dispatch multiple - // reconstruction work items before the import completes; subsequent ones are no-ops - // via the `reconstruction_started` flag, so we just require >= 1. - let mut expected_events: Vec = (0..min_columns_for_reconstruction) - .map(|_| WorkType::GossipDataColumnSidecar) - .collect(); - expected_events.push(WorkType::ColumnReconstruction); - - rig.assert_event_journal_contains_at_least_ordered(&expected_events) - .await; + // Drain the journal until we've seen all gossip events plus at least one + // reconstruction. Under real crypto the reprocess queue can dispatch the + // reconstruction work item more than once (the second is a no-op via + // `reconstruction_started`), so we don't pin the count — we just require >= 1. + let gsc: &str = WorkType::GossipDataColumnSidecar.into(); + let cr: &str = WorkType::ColumnReconstruction.into(); + let (mut gossip_seen, mut recon_seen) = (0usize, 0usize); + let drain = async { + while let Some(event) = rig.work_journal_rx.recv().await { + if event == gsc { + gossip_seen += 1; + } else if event == cr { + recon_seen += 1; + } + if gossip_seen == min_columns_for_reconstruction && recon_seen >= 1 { + break; + } + } + }; + if tokio::time::timeout(STANDARD_TIMEOUT, drain).await.is_err() { + panic!("timeout: gossip_seen={gossip_seen}, recon_seen={recon_seen}"); + } + assert_eq!(gossip_seen, min_columns_for_reconstruction); + assert!(recon_seen >= 1); } // Test the column reconstruction is delayed for columns that arrive for a previous slot.