Fix stuck backfill when scheduled work queue is at capacity (#5575)

* Fix stuck backfill and add regression test.

* Remove unnecessary `yield_now`

* Merge branch 'unstable' into fix-stuck-backfill

* Revert previous change and add extra comment.

* Merge branch 'unstable' into fix-stuck-backfill

* Update tests to use configured event schedule instead of hard coded values.

* Merge branch 'unstable' of https://github.com/sigp/lighthouse into fix-stuck-backfill
This commit is contained in:
Jimmy Chen
2024-04-23 02:06:46 +10:00
committed by GitHub
parent f7aca97a55
commit 532206e008
4 changed files with 142 additions and 34 deletions

View File

@@ -23,4 +23,7 @@ lazy_static = { workspace = true }
lighthouse_metrics = { workspace = true }
parking_lot = { workspace = true }
num_cpus = { workspace = true }
serde = { workspace = true }
serde = { workspace = true }
[dev-dependencies]
tokio = { workspace = true, features = ["test-util"] }

View File

@@ -851,7 +851,7 @@ impl<E: EthSpec> BeaconProcessor<E> {
ready_work_tx,
work_reprocessing_rx,
&self.executor,
slot_clock,
Arc::new(slot_clock),
self.log.clone(),
maximum_gossip_clock_disparity,
)?;

View File

@@ -22,6 +22,7 @@ use slot_clock::SlotClock;
use std::collections::{HashMap, HashSet};
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::Context;
use std::time::Duration;
use strum::AsRefStr;
@@ -243,7 +244,7 @@ struct ReprocessQueue<S> {
attestation_delay_debounce: TimeLatch,
lc_update_delay_debounce: TimeLatch,
next_backfill_batch_event: Option<Pin<Box<tokio::time::Sleep>>>,
slot_clock: Pin<Box<S>>,
slot_clock: Arc<S>,
}
pub type QueuedLightClientUpdateId = usize;
@@ -362,7 +363,7 @@ pub fn spawn_reprocess_scheduler<S: SlotClock + 'static>(
ready_work_tx: Sender<ReadyWork>,
work_reprocessing_rx: Receiver<ReprocessQueueMessage>,
executor: &TaskExecutor,
slot_clock: S,
slot_clock: Arc<S>,
log: Logger,
maximum_gossip_clock_disparity: Duration,
) -> Result<(), String> {
@@ -370,34 +371,12 @@ pub fn spawn_reprocess_scheduler<S: SlotClock + 'static>(
if ADDITIONAL_QUEUED_BLOCK_DELAY >= maximum_gossip_clock_disparity {
return Err("The block delay and gossip disparity don't match.".to_string());
}
let mut queue = ReprocessQueue {
work_reprocessing_rx,
ready_work_tx,
gossip_block_delay_queue: DelayQueue::new(),
rpc_block_delay_queue: DelayQueue::new(),
attestations_delay_queue: DelayQueue::new(),
lc_updates_delay_queue: DelayQueue::new(),
queued_gossip_block_roots: HashSet::new(),
queued_lc_updates: FnvHashMap::default(),
queued_aggregates: FnvHashMap::default(),
queued_unaggregates: FnvHashMap::default(),
awaiting_attestations_per_root: HashMap::new(),
awaiting_lc_updates_per_parent_root: HashMap::new(),
queued_backfill_batches: Vec::new(),
next_attestation: 0,
next_lc_update: 0,
early_block_debounce: TimeLatch::default(),
rpc_block_debounce: TimeLatch::default(),
attestation_delay_debounce: TimeLatch::default(),
lc_update_delay_debounce: TimeLatch::default(),
next_backfill_batch_event: None,
slot_clock: Box::pin(slot_clock.clone()),
};
let mut queue = ReprocessQueue::new(ready_work_tx, work_reprocessing_rx, slot_clock);
executor.spawn(
async move {
while let Some(msg) = queue.next().await {
queue.handle_message(msg, &slot_clock, &log);
queue.handle_message(msg, &log);
}
debug!(
@@ -412,7 +391,37 @@ pub fn spawn_reprocess_scheduler<S: SlotClock + 'static>(
}
impl<S: SlotClock> ReprocessQueue<S> {
fn handle_message(&mut self, msg: InboundEvent, slot_clock: &S, log: &Logger) {
fn new(
ready_work_tx: Sender<ReadyWork>,
work_reprocessing_rx: Receiver<ReprocessQueueMessage>,
slot_clock: Arc<S>,
) -> Self {
ReprocessQueue {
work_reprocessing_rx,
ready_work_tx,
gossip_block_delay_queue: DelayQueue::new(),
rpc_block_delay_queue: DelayQueue::new(),
attestations_delay_queue: DelayQueue::new(),
lc_updates_delay_queue: DelayQueue::new(),
queued_gossip_block_roots: HashSet::new(),
queued_lc_updates: FnvHashMap::default(),
queued_aggregates: FnvHashMap::default(),
queued_unaggregates: FnvHashMap::default(),
awaiting_attestations_per_root: HashMap::new(),
awaiting_lc_updates_per_parent_root: HashMap::new(),
queued_backfill_batches: Vec::new(),
next_attestation: 0,
next_lc_update: 0,
early_block_debounce: TimeLatch::default(),
rpc_block_debounce: TimeLatch::default(),
attestation_delay_debounce: TimeLatch::default(),
lc_update_delay_debounce: TimeLatch::default(),
next_backfill_batch_event: None,
slot_clock,
}
}
fn handle_message(&mut self, msg: InboundEvent, log: &Logger) {
use ReprocessQueueMessage::*;
match msg {
// Some block has been indicated as "early" and should be processed when the
@@ -426,7 +435,7 @@ impl<S: SlotClock> ReprocessQueue<S> {
return;
}
if let Some(duration_till_slot) = slot_clock.duration_to_slot(block_slot) {
if let Some(duration_till_slot) = self.slot_clock.duration_to_slot(block_slot) {
// Check to ensure this won't over-fill the queue.
if self.queued_gossip_block_roots.len() >= MAXIMUM_QUEUED_BLOCKS {
if self.early_block_debounce.elapsed() {
@@ -459,7 +468,7 @@ impl<S: SlotClock> ReprocessQueue<S> {
// This logic is slightly awkward since `SlotClock::duration_to_slot`
// doesn't distinguish between a slot that has already arrived and an
// error reading the slot clock.
if let Some(now) = slot_clock.now() {
if let Some(now) = self.slot_clock.now() {
if block_slot <= now
&& self
.ready_work_tx
@@ -860,7 +869,8 @@ impl<S: SlotClock> ReprocessQueue<S> {
}
}
InboundEvent::ReadyBackfillSync(queued_backfill_batch) => {
let millis_from_slot_start = slot_clock
let millis_from_slot_start = self
.slot_clock
.millis_from_current_slot_start()
.map_or("null".to_string(), |duration| {
duration.as_millis().to_string()
@@ -886,7 +896,12 @@ impl<S: SlotClock> ReprocessQueue<S> {
"Failed to send scheduled backfill work";
"info" => "sending work back to queue"
);
self.queued_backfill_batches.insert(0, batch)
self.queued_backfill_batches.insert(0, batch);
// only recompute if there is no `next_backfill_batch_event` already scheduled
if self.next_backfill_batch_event.is_none() {
self.recompute_next_backfill_batch_event();
}
}
// The message was not sent and we didn't get the correct
// return result. This is a logic error.
@@ -963,7 +978,11 @@ impl<S: SlotClock> ReprocessQueue<S> {
#[cfg(test)]
mod tests {
use super::*;
use slot_clock::TestingSlotClock;
use logging::test_logger;
use slot_clock::{ManualSlotClock, TestingSlotClock};
use std::ops::Add;
use std::sync::Arc;
use task_executor::test_utils::TestRuntime;
#[test]
fn backfill_processing_schedule_calculation() {
@@ -1002,4 +1021,84 @@ mod tests {
duration_to_next_slot + event_times[0]
);
}
// Regression test for issue #5504.
// See: https://github.com/sigp/lighthouse/issues/5504#issuecomment-2050930045
#[tokio::test]
async fn backfill_schedule_failed_should_reschedule() {
let runtime = TestRuntime::default();
let log = test_logger();
let (work_reprocessing_tx, work_reprocessing_rx) = mpsc::channel(1);
let (ready_work_tx, mut ready_work_rx) = mpsc::channel(1);
let slot_duration = 12;
let slot_clock = Arc::new(testing_slot_clock(slot_duration));
spawn_reprocess_scheduler(
ready_work_tx.clone(),
work_reprocessing_rx,
&runtime.task_executor,
slot_clock.clone(),
log,
Duration::from_millis(500),
)
.unwrap();
// Pause time so it only advances manually
tokio::time::pause();
// Send some random work to `ready_work_tx` to fill up the capacity first.
ready_work_tx
.try_send(ReadyWork::IgnoredRpcBlock(IgnoredRpcBlock {
process_fn: Box::new(|| {}),
}))
.unwrap();
// Now queue a backfill sync batch.
work_reprocessing_tx
.try_send(ReprocessQueueMessage::BackfillSync(QueuedBackfillBatch(
Box::pin(async {}),
)))
.unwrap();
tokio::task::yield_now().await;
// Advance the time by more than 1/2 the slot to trigger a scheduled backfill batch to be sent.
// This should fail as the `ready_work` channel is at capacity, and it should be rescheduled.
let duration_to_next_event =
ReprocessQueue::duration_until_next_backfill_batch_event(slot_clock.as_ref());
let one_ms = Duration::from_millis(1);
advance_time(&slot_clock, duration_to_next_event.add(one_ms)).await;
// Now drain the `ready_work` channel.
assert!(matches!(
ready_work_rx.try_recv(),
Ok(ReadyWork::IgnoredRpcBlock { .. })
));
assert!(ready_work_rx.try_recv().is_err());
// Advance time again, and assert that the re-scheduled batch is successfully sent.
let duration_to_next_event =
ReprocessQueue::duration_until_next_backfill_batch_event(slot_clock.as_ref());
advance_time(&slot_clock, duration_to_next_event.add(one_ms)).await;
assert!(matches!(
ready_work_rx.try_recv(),
Ok(ReadyWork::BackfillSync { .. })
));
}
/// Advances slot clock and test clock time by the same duration.
async fn advance_time(slot_clock: &ManualSlotClock, duration: Duration) {
slot_clock.advance_time(duration);
tokio::time::advance(duration).await;
// NOTE: The `tokio::time::advance` fn actually calls `yield_now()` after advancing the
// clock. Why do we need an extra `yield_now`?
tokio::task::yield_now().await;
}
fn testing_slot_clock(slot_duration: u64) -> ManualSlotClock {
TestingSlotClock::new(
Slot::new(0),
Duration::from_secs(0),
Duration::from_secs(slot_duration),
)
}
}

View File

@@ -1,5 +1,6 @@
use super::SlotClock;
use parking_lot::RwLock;
use std::ops::Add;
use std::sync::Arc;
use std::time::Duration;
use types::Slot;
@@ -41,6 +42,11 @@ impl ManualSlotClock {
*self.current_time.write() = duration;
}
pub fn advance_time(&self, duration: Duration) {
let current_time = *self.current_time.read();
*self.current_time.write() = current_time.add(duration);
}
pub fn advance_slot(&self) {
self.set_slot(self.now().unwrap().as_u64() + 1)
}