diff --git a/beacon_node/beacon_processor/src/scheduler/work_reprocessing_queue.rs b/beacon_node/beacon_processor/src/scheduler/work_reprocessing_queue.rs index 032f14ce3d..9565e57589 100644 --- a/beacon_node/beacon_processor/src/scheduler/work_reprocessing_queue.rs +++ b/beacon_node/beacon_processor/src/scheduler/work_reprocessing_queue.rs @@ -82,6 +82,9 @@ pub const BACKFILL_SCHEDULE_IN_SLOT: [(u32, u32); 3] = [ (4, 5), ]; +/// Trigger reconstruction immediately if we are at least this far into the current slot. +pub const RECONSTRUCTION_DEADLINE: Duration = Duration::from_millis(3000); + /// Messages that the scheduler can receive. #[derive(AsRefStr)] pub enum ReprocessQueueMessage { @@ -172,6 +175,7 @@ pub struct QueuedBackfillBatch(pub AsyncFn); pub struct QueuedColumnReconstruction { pub block_root: Hash256, + pub slot: Slot, pub process_fn: AsyncFn, } @@ -749,16 +753,26 @@ impl ReprocessQueue { } } InboundEvent::Msg(DelayColumnReconstruction(request)) => { + let mut reconstruction_delay = QUEUED_RECONSTRUCTION_DELAY; + if let Some(seconds_from_current_slot) = + self.slot_clock.seconds_from_current_slot_start() + && let Some(current_slot) = self.slot_clock.now() + && seconds_from_current_slot >= RECONSTRUCTION_DEADLINE + && current_slot == request.slot + { + // If we are at least `RECONSTRUCTION_DEADLINE` into the current slot, + // and the reconstruction request is for the current slot, process reconstruction immediately. 
+ reconstruction_delay = Duration::from_secs(0); + } match self.queued_column_reconstructions.entry(request.block_root) { Entry::Occupied(key) => { - // Push back the reattempted reconstruction self.column_reconstructions_delay_queue - .reset(key.get(), QUEUED_RECONSTRUCTION_DELAY) + .reset(key.get(), reconstruction_delay); } Entry::Vacant(vacant) => { let delay_key = self .column_reconstructions_delay_queue - .insert(request, QUEUED_RECONSTRUCTION_DELAY); + .insert(request, reconstruction_delay); vacant.insert(delay_key); } } diff --git a/beacon_node/network/src/network_beacon_processor/gossip_methods.rs b/beacon_node/network/src/network_beacon_processor/gossip_methods.rs index cbe441b419..1f1a3427e7 100644 --- a/beacon_node/network/src/network_beacon_processor/gossip_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/gossip_methods.rs @@ -1064,6 +1064,7 @@ impl NetworkBeaconProcessor { work: Work::Reprocess(ReprocessQueueMessage::DelayColumnReconstruction( QueuedColumnReconstruction { block_root, + slot: *slot, process_fn: Box::pin(async move { cloned_self .attempt_data_column_reconstruction(block_root, true) diff --git a/beacon_node/network/src/network_beacon_processor/tests.rs b/beacon_node/network/src/network_beacon_processor/tests.rs index 2027a525e6..2935c2d213 100644 --- a/beacon_node/network/src/network_beacon_processor/tests.rs +++ b/beacon_node/network/src/network_beacon_processor/tests.rs @@ -94,12 +94,20 @@ impl TestRig { // This allows for testing voluntary exits without building out a massive chain. let mut spec = test_spec::(); spec.shard_committee_period = 2; - Self::new_parametric(chain_length, BeaconProcessorConfig::default(), spec).await + Self::new_parametric(chain_length, BeaconProcessorConfig::default(), false, spec).await + } + + pub async fn new_supernode(chain_length: u64) -> Self { + // This allows for testing voluntary exits without building out a massive chain. 
+ let mut spec = test_spec::(); + spec.shard_committee_period = 2; + Self::new_parametric(chain_length, BeaconProcessorConfig::default(), true, spec).await } pub async fn new_parametric( chain_length: u64, beacon_processor_config: BeaconProcessorConfig, + import_data_columns: bool, spec: ChainSpec, ) -> Self { let spec = Arc::new(spec); @@ -108,6 +116,7 @@ impl TestRig { .deterministic_keypairs(VALIDATOR_COUNT) .fresh_ephemeral_store() .mock_execution_layer() + .import_all_data_columns(import_data_columns) .chain_config(<_>::default()) .build(); @@ -601,6 +610,40 @@ impl TestRig { .await } + pub async fn assert_event_journal_completes_with_timeout( + &mut self, + expected: &[WorkType], + timeout: Duration, + ) { + self.assert_event_journal_with_timeout( + &expected + .iter() + .map(Into::<&'static str>::into) + .chain(std::iter::once(WORKER_FREED)) + .chain(std::iter::once(NOTHING_TO_DO)) + .collect::>(), + timeout, + ) + .await + } + + pub async fn assert_event_journal_does_not_complete_with_timeout( + &mut self, + expected: &[WorkType], + timeout: Duration, + ) { + self.assert_not_in_event_journal_with_timeout( + &expected + .iter() + .map(Into::<&'static str>::into) + .chain(std::iter::once(WORKER_FREED)) + .chain(std::iter::once(NOTHING_TO_DO)) + .collect::>(), + timeout, + ) + .await + } + pub async fn assert_event_journal_completes(&mut self, expected: &[WorkType]) { self.assert_event_journal( &expected @@ -651,6 +694,37 @@ impl TestRig { assert_eq!(events, expected); } + /// Assert that the event journal receives fewer than `expected.len()` events within `timeout`. + pub async fn assert_not_in_event_journal_with_timeout( + &mut self, + expected: &[&str], + timeout: Duration, + ) { + let mut events = Vec::with_capacity(expected.len()); + + let drain_future = async { + while let Some(event) = self.work_journal_rx.recv().await { + events.push(event); + + // Break as soon as we collect the desired number of events. 
+ if events.len() >= expected.len() { + break; + } + } + }; + + // Panic if we don't time out. + tokio::select! { + _ = tokio::time::sleep(timeout) => {}, + _ = drain_future => panic!( + "Got events before timeout. Expected no events but got {:?}", + events + ), + } + + assert_ne!(events, expected); + } + /// Listen for network messages and collect them for a specified duration or until reaching a count. /// /// Returns None if no messages were received, or Some(Vec) containing the received messages. @@ -743,6 +817,159 @@ fn junk_message_id() -> MessageId { MessageId::new(&[]) } +// Test that column reconstruction is delayed for columns that arrive +// at the beginning of the slot. +#[tokio::test] +async fn data_column_reconstruction_at_slot_start() { + if test_spec::().fulu_fork_epoch.is_none() { + return; + }; + + let mut rig = TestRig::new_supernode(SMALL_CHAIN).await; + + let slot_start = rig + .chain + .slot_clock + .start_of(rig.next_block.slot()) + .unwrap(); + + rig.chain + .slot_clock + .set_current_time(slot_start - rig.chain.spec.maximum_gossip_clock_disparity()); + + assert_eq!( + rig.chain.slot().unwrap(), + rig.next_block.slot() - 1, + "chain should be at the correct slot" + ); + + let num_data_columns = rig.next_data_columns.as_ref().map(|c| c.len()).unwrap_or(0); + for i in 0..num_data_columns { + rig.enqueue_gossip_data_columns(i); + rig.assert_event_journal_completes(&[WorkType::GossipDataColumnSidecar]) + .await; + } + + if num_data_columns > 0 { + // Reconstruction is delayed, so it should not be able to complete + // within the first 100ms + rig.assert_event_journal_does_not_complete_with_timeout( + &[WorkType::ColumnReconstruction], + Duration::from_millis(100), + ) + .await; + + // We've waited at least 100ms; reconstruction should now complete within 200ms + rig.assert_event_journal_completes_with_timeout( + &[WorkType::ColumnReconstruction], + Duration::from_millis(200), + ) + .await; + } +} + +// Test that column reconstruction happens 
immediately for columns that arrive at the +// reconstruction deadline. +#[tokio::test] +async fn data_column_reconstruction_at_deadline() { + if test_spec::().fulu_fork_epoch.is_none() { + return; + }; + + let mut rig = TestRig::new_supernode(SMALL_CHAIN).await; + + let slot_start = rig + .chain + .slot_clock + .start_of(rig.next_block.slot()) + .unwrap(); + + rig.chain + .slot_clock + .set_current_time(slot_start - rig.chain.spec.maximum_gossip_clock_disparity()); + + assert_eq!( + rig.chain.slot().unwrap(), + rig.next_block.slot() - 1, + "chain should be at the correct slot" + ); + + // We push the slot clock to 3 seconds into the slot, this is the deadline to trigger reconstruction. + rig.chain + .slot_clock + .set_current_time(slot_start + Duration::from_secs(3)); + + let num_data_columns = rig.next_data_columns.as_ref().map(|c| c.len()).unwrap_or(0); + for i in 0..num_data_columns { + rig.enqueue_gossip_data_columns(i); + rig.assert_event_journal_completes(&[WorkType::GossipDataColumnSidecar]) + .await; + } + + // Since we're at the reconstruction deadline, reconstruction should be triggered immediately + if num_data_columns > 0 { + rig.assert_event_journal_completes_with_timeout( + &[WorkType::ColumnReconstruction], + Duration::from_millis(50), + ) + .await; + } +} + +// Test that column reconstruction is delayed for columns that arrive for a previous slot. +#[tokio::test] +async fn data_column_reconstruction_at_next_slot() { + if test_spec::().fulu_fork_epoch.is_none() { + return; + }; + + let mut rig = TestRig::new_supernode(SMALL_CHAIN).await; + + let slot_start = rig + .chain + .slot_clock + .start_of(rig.next_block.slot()) + .unwrap(); + + rig.chain + .slot_clock + .set_current_time(slot_start - rig.chain.spec.maximum_gossip_clock_disparity()); + + assert_eq!( + rig.chain.slot().unwrap(), + rig.next_block.slot() - 1, + "chain should be at the correct slot" + ); + + // We push the slot clock to the next slot. 
+ rig.chain + .slot_clock + .set_current_time(slot_start + Duration::from_secs(12)); + + let num_data_columns = rig.next_data_columns.as_ref().map(|c| c.len()).unwrap_or(0); + for i in 0..num_data_columns { + rig.enqueue_gossip_data_columns(i); + rig.assert_event_journal_completes(&[WorkType::GossipDataColumnSidecar]) + .await; + } + + if num_data_columns > 0 { + // Since we are in the next slot, reconstruction for the previous slot should be delayed again + rig.assert_event_journal_does_not_complete_with_timeout( + &[WorkType::ColumnReconstruction], + Duration::from_millis(100), + ) + .await; + + // We've waited at least 100ms; reconstruction should now complete within 200ms + rig.assert_event_journal_completes_with_timeout( + &[WorkType::ColumnReconstruction], + Duration::from_millis(200), + ) + .await; + } +} + /// Blocks that arrive early should be queued for later processing. #[tokio::test] async fn import_gossip_block_acceptably_early() { @@ -1359,8 +1586,13 @@ async fn test_backfill_sync_processing_rate_limiting_disabled() { enable_backfill_rate_limiting: false, ..Default::default() }; - let mut rig = - TestRig::new_parametric(SMALL_CHAIN, beacon_processor_config, test_spec::()).await; + let mut rig = TestRig::new_parametric( + SMALL_CHAIN, + beacon_processor_config, + false, + test_spec::(), + ) + .await; for _ in 0..3 { rig.enqueue_backfill_batch();