Simplify reconstruction test assertion

Replace `assert_event_journal_contains_at_least_ordered` helper with an
inline drain that just counts the gossip + reconstruction events. The
helper was carrying around `WORKER_FREED` bookkeeping and a strict
prefix-match for one caller; counting the two relevant work types until
both thresholds are met is the same check with much less code.
This commit is contained in:
dapplion
2026-04-27 09:25:44 +02:00
parent 51e295229b
commit d7000fc0d1

View File

@@ -656,60 +656,6 @@ impl TestRig {
///
/// Given the described logic, `expected` must not contain `WORKER_FREED` or `NOTHING_TO_DO`
/// events.
/// Like [`Self::assert_event_journal_contains_ordered`], but tolerant of extra trailing
/// repetitions of the final expected event. Useful for events the reprocess queue can
/// dispatch redundantly under timing pressure (e.g. reconstruction).
pub async fn assert_event_journal_contains_at_least_ordered(&mut self, expected: &[WorkType]) {
let expected_strs = expected
.iter()
.map(|ev| ev.into())
.collect::<Vec<&'static str>>();
let mut events = Vec::with_capacity(expected_strs.len());
let mut worker_freed_remaining = expected_strs.len();
let drain_future = async {
loop {
match self.work_journal_rx.recv().await {
Some(event) if event == WORKER_FREED => {
worker_freed_remaining = worker_freed_remaining.saturating_sub(1);
if worker_freed_remaining == 0 {
break;
}
}
Some(event) if event == NOTHING_TO_DO => {}
Some(event) => events.push(event),
None => break,
}
}
};
tokio::select! {
_ = tokio::time::sleep(STANDARD_TIMEOUT) => panic!(
"Timeout ({:?}) expired waiting for events. Expected at least {:?} but got {:?} waiting for {} `WORKER_FREED` events.",
STANDARD_TIMEOUT, expected_strs, events, worker_freed_remaining,
),
_ = drain_future => {},
}
// Events must start with the exact expected sequence; trailing events must all be
// repetitions of the final expected event.
assert!(
events.len() >= expected_strs.len(),
"expected at least {} events, got {}: {:?}",
expected_strs.len(),
events.len(),
events,
);
let (head, tail) = events.split_at(expected_strs.len());
assert_eq!(head, expected_strs.as_slice());
let trailing = expected_strs.last().copied().unwrap_or("");
for event in tail {
assert_eq!(*event, trailing, "unexpected trailing event {event:?}");
}
assert_eq!(worker_freed_remaining, 0);
}
pub async fn assert_event_journal_contains_ordered(&mut self, expected: &[WorkType]) {
let expected = expected
.iter()
@@ -1062,17 +1008,30 @@ async fn data_column_reconstruction_at_deadline() {
rig.enqueue_gossip_data_columns(i);
}
// Expect all gossip events followed by at least one reconstruction. Under a slow
// signature backend (real crypto) the reprocess queue can dispatch multiple
// reconstruction work items before the import completes; subsequent ones are no-ops
// via the `reconstruction_started` flag, so we just require >= 1.
let mut expected_events: Vec<WorkType> = (0..min_columns_for_reconstruction)
.map(|_| WorkType::GossipDataColumnSidecar)
.collect();
expected_events.push(WorkType::ColumnReconstruction);
rig.assert_event_journal_contains_at_least_ordered(&expected_events)
.await;
// Drain the journal until we've seen all gossip events plus at least one
// reconstruction. Under real crypto the reprocess queue can dispatch the
// reconstruction work item more than once (the second is a no-op via
// `reconstruction_started`), so we don't pin the count — we just require >= 1.
let gsc: &str = WorkType::GossipDataColumnSidecar.into();
let cr: &str = WorkType::ColumnReconstruction.into();
let (mut gossip_seen, mut recon_seen) = (0usize, 0usize);
let drain = async {
while let Some(event) = rig.work_journal_rx.recv().await {
if event == gsc {
gossip_seen += 1;
} else if event == cr {
recon_seen += 1;
}
if gossip_seen == min_columns_for_reconstruction && recon_seen >= 1 {
break;
}
}
};
if tokio::time::timeout(STANDARD_TIMEOUT, drain).await.is_err() {
panic!("timeout: gossip_seen={gossip_seen}, recon_seen={recon_seen}");
}
assert_eq!(gossip_seen, min_columns_for_reconstruction);
assert!(recon_seen >= 1);
}
// Test the column reconstruction is delayed for columns that arrive for a previous slot.