Add max delay to reconstruction (#7976)

#7697


  If we're three seconds into the current slot just trigger reconstruction. I don't know what the correct reconstruction deadline number is, but it should probably be at least half a second before the attestation deadline


Co-Authored-By: Eitan Seri- Levi <eserilev@gmail.com>

Co-Authored-By: Eitan Seri-Levi <eserilev@ucsc.edu>
This commit is contained in:
Eitan Seri-Levi
2025-09-11 23:05:42 -07:00
committed by GitHub
parent fb77ce9e19
commit aef8291f94
3 changed files with 253 additions and 6 deletions

View File

@@ -82,6 +82,9 @@ pub const BACKFILL_SCHEDULE_IN_SLOT: [(u32, u32); 3] = [
(4, 5),
];
/// Trigger reconstruction if we are this many seconds into the current slot
pub const RECONSTRUCTION_DEADLINE: Duration = Duration::from_millis(3000);
/// Messages that the scheduler can receive.
#[derive(AsRefStr)]
pub enum ReprocessQueueMessage {
@@ -172,6 +175,7 @@ pub struct QueuedBackfillBatch(pub AsyncFn);
pub struct QueuedColumnReconstruction {
pub block_root: Hash256,
pub slot: Slot,
pub process_fn: AsyncFn,
}
@@ -749,16 +753,26 @@ impl<S: SlotClock> ReprocessQueue<S> {
}
}
InboundEvent::Msg(DelayColumnReconstruction(request)) => {
let mut reconstruction_delay = QUEUED_RECONSTRUCTION_DELAY;
if let Some(seconds_from_current_slot) =
self.slot_clock.seconds_from_current_slot_start()
&& let Some(current_slot) = self.slot_clock.now()
&& seconds_from_current_slot >= RECONSTRUCTION_DEADLINE
&& current_slot == request.slot
{
// If we are at least `RECONSTRUCTION_DEADLINE` seconds into the current slot,
// and the reconstruction request is for the current slot, process reconstruction immediately.
reconstruction_delay = Duration::from_secs(0);
}
match self.queued_column_reconstructions.entry(request.block_root) {
Entry::Occupied(key) => {
// Push back the reattempted reconstruction
self.column_reconstructions_delay_queue
.reset(key.get(), QUEUED_RECONSTRUCTION_DELAY)
.reset(key.get(), reconstruction_delay);
}
Entry::Vacant(vacant) => {
let delay_key = self
.column_reconstructions_delay_queue
.insert(request, QUEUED_RECONSTRUCTION_DELAY);
.insert(request, reconstruction_delay);
vacant.insert(delay_key);
}
}

View File

@@ -1064,6 +1064,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
work: Work::Reprocess(ReprocessQueueMessage::DelayColumnReconstruction(
QueuedColumnReconstruction {
block_root,
slot: *slot,
process_fn: Box::pin(async move {
cloned_self
.attempt_data_column_reconstruction(block_root, true)

View File

@@ -94,12 +94,20 @@ impl TestRig {
// This allows for testing voluntary exits without building out a massive chain.
let mut spec = test_spec::<E>();
spec.shard_committee_period = 2;
Self::new_parametric(chain_length, BeaconProcessorConfig::default(), spec).await
Self::new_parametric(chain_length, BeaconProcessorConfig::default(), false, spec).await
}
pub async fn new_supernode(chain_length: u64) -> Self {
// This allows for testing voluntary exits without building out a massive chain.
let mut spec = test_spec::<E>();
spec.shard_committee_period = 2;
Self::new_parametric(chain_length, BeaconProcessorConfig::default(), true, spec).await
}
pub async fn new_parametric(
chain_length: u64,
beacon_processor_config: BeaconProcessorConfig,
import_data_columns: bool,
spec: ChainSpec,
) -> Self {
let spec = Arc::new(spec);
@@ -108,6 +116,7 @@ impl TestRig {
.deterministic_keypairs(VALIDATOR_COUNT)
.fresh_ephemeral_store()
.mock_execution_layer()
.import_all_data_columns(import_data_columns)
.chain_config(<_>::default())
.build();
@@ -601,6 +610,40 @@ impl TestRig {
.await
}
pub async fn assert_event_journal_completes_with_timeout(
&mut self,
expected: &[WorkType],
timeout: Duration,
) {
self.assert_event_journal_with_timeout(
&expected
.iter()
.map(Into::<&'static str>::into)
.chain(std::iter::once(WORKER_FREED))
.chain(std::iter::once(NOTHING_TO_DO))
.collect::<Vec<_>>(),
timeout,
)
.await
}
pub async fn assert_event_journal_does_not_complete_with_timeout(
&mut self,
expected: &[WorkType],
timeout: Duration,
) {
self.assert_not_in_event_journal_with_timeout(
&expected
.iter()
.map(Into::<&'static str>::into)
.chain(std::iter::once(WORKER_FREED))
.chain(std::iter::once(NOTHING_TO_DO))
.collect::<Vec<_>>(),
timeout,
)
.await
}
pub async fn assert_event_journal_completes(&mut self, expected: &[WorkType]) {
self.assert_event_journal(
&expected
@@ -651,6 +694,37 @@ impl TestRig {
assert_eq!(events, expected);
}
/// Assert that the `BeaconProcessor` event journal is not as `expected`.
pub async fn assert_not_in_event_journal_with_timeout(
&mut self,
expected: &[&str],
timeout: Duration,
) {
let mut events = Vec::with_capacity(expected.len());
let drain_future = async {
while let Some(event) = self.work_journal_rx.recv().await {
events.push(event);
// Break as soon as we collect the desired number of events.
if events.len() >= expected.len() {
break;
}
}
};
// Panic if we don't time out.
tokio::select! {
_ = tokio::time::sleep(timeout) => {},
_ = drain_future => panic!(
"Got events before timeout. Expected no events but got {:?}",
events
),
}
assert_ne!(events, expected);
}
/// Listen for network messages and collect them for a specified duration or until reaching a count.
///
/// Returns None if no messages were received, or Some(Vec) containing the received messages.
@@ -743,6 +817,159 @@ fn junk_message_id() -> MessageId {
MessageId::new(&[])
}
// Test that column reconstruction is delayed for columns that arrive
// at the beginning of the slot.
#[tokio::test]
async fn data_column_reconstruction_at_slot_start() {
if test_spec::<E>().fulu_fork_epoch.is_none() {
return;
};
let mut rig = TestRig::new_supernode(SMALL_CHAIN).await;
let slot_start = rig
.chain
.slot_clock
.start_of(rig.next_block.slot())
.unwrap();
rig.chain
.slot_clock
.set_current_time(slot_start - rig.chain.spec.maximum_gossip_clock_disparity());
assert_eq!(
rig.chain.slot().unwrap(),
rig.next_block.slot() - 1,
"chain should be at the correct slot"
);
let num_data_columns = rig.next_data_columns.as_ref().map(|c| c.len()).unwrap_or(0);
for i in 0..num_data_columns {
rig.enqueue_gossip_data_columns(i);
rig.assert_event_journal_completes(&[WorkType::GossipDataColumnSidecar])
.await;
}
if num_data_columns > 0 {
// Reconstruction is delayed by 100ms, we should not be able to complete
// reconstruction up to this point
rig.assert_event_journal_does_not_complete_with_timeout(
&[WorkType::ColumnReconstruction],
Duration::from_millis(100),
)
.await;
// We've waited at least 150ms, reconstruction can now be triggered
rig.assert_event_journal_completes_with_timeout(
&[WorkType::ColumnReconstruction],
Duration::from_millis(200),
)
.await;
}
}
// Test that column reconstruction happens immediately for columns that arrive at the
// reconstruction deadline.
#[tokio::test]
async fn data_column_reconstruction_at_deadline() {
if test_spec::<E>().fulu_fork_epoch.is_none() {
return;
};
let mut rig = TestRig::new_supernode(SMALL_CHAIN).await;
let slot_start = rig
.chain
.slot_clock
.start_of(rig.next_block.slot())
.unwrap();
rig.chain
.slot_clock
.set_current_time(slot_start - rig.chain.spec.maximum_gossip_clock_disparity());
assert_eq!(
rig.chain.slot().unwrap(),
rig.next_block.slot() - 1,
"chain should be at the correct slot"
);
// We push the slot clock to 3 seconds into the slot, this is the deadline to trigger reconstruction.
rig.chain
.slot_clock
.set_current_time(slot_start + Duration::from_secs(3));
let num_data_columns = rig.next_data_columns.as_ref().map(|c| c.len()).unwrap_or(0);
for i in 0..num_data_columns {
rig.enqueue_gossip_data_columns(i);
rig.assert_event_journal_completes(&[WorkType::GossipDataColumnSidecar])
.await;
}
// Since we're at the reconstruction deadline, reconstruction should be triggered immediately
if num_data_columns > 0 {
rig.assert_event_journal_completes_with_timeout(
&[WorkType::ColumnReconstruction],
Duration::from_millis(50),
)
.await;
}
}
// Test the column reconstruction is delayed for columns that arrive for a previous slot.
#[tokio::test]
async fn data_column_reconstruction_at_next_slot() {
if test_spec::<E>().fulu_fork_epoch.is_none() {
return;
};
let mut rig = TestRig::new_supernode(SMALL_CHAIN).await;
let slot_start = rig
.chain
.slot_clock
.start_of(rig.next_block.slot())
.unwrap();
rig.chain
.slot_clock
.set_current_time(slot_start - rig.chain.spec.maximum_gossip_clock_disparity());
assert_eq!(
rig.chain.slot().unwrap(),
rig.next_block.slot() - 1,
"chain should be at the correct slot"
);
// We push the slot clock to the next slot.
rig.chain
.slot_clock
.set_current_time(slot_start + Duration::from_secs(12));
let num_data_columns = rig.next_data_columns.as_ref().map(|c| c.len()).unwrap_or(0);
for i in 0..num_data_columns {
rig.enqueue_gossip_data_columns(i);
rig.assert_event_journal_completes(&[WorkType::GossipDataColumnSidecar])
.await;
}
if num_data_columns > 0 {
// Since we are in the next slot reconstruction for the previous slot should be delayed again
rig.assert_event_journal_does_not_complete_with_timeout(
&[WorkType::ColumnReconstruction],
Duration::from_millis(100),
)
.await;
// We've waited at least 150ms, reconstruction can now be triggered
rig.assert_event_journal_completes_with_timeout(
&[WorkType::ColumnReconstruction],
Duration::from_millis(200),
)
.await;
}
}
/// Blocks that arrive early should be queued for later processing.
#[tokio::test]
async fn import_gossip_block_acceptably_early() {
@@ -1359,8 +1586,13 @@ async fn test_backfill_sync_processing_rate_limiting_disabled() {
enable_backfill_rate_limiting: false,
..Default::default()
};
let mut rig =
TestRig::new_parametric(SMALL_CHAIN, beacon_processor_config, test_spec::<E>()).await;
let mut rig = TestRig::new_parametric(
SMALL_CHAIN,
beacon_processor_config,
false,
test_spec::<E>(),
)
.await;
for _ in 0..3 {
rig.enqueue_backfill_batch();