This commit is contained in:
Eitan Seri-Levi
2026-05-24 11:09:51 +03:00
parent af09a3124c
commit 0e99b454a3

View File

@@ -52,6 +52,9 @@ pub const QUEUED_ATTESTATION_DELAY: Duration = Duration::from_secs(12);
/// For how long to queue light client updates for re-processing.
pub const QUEUED_LIGHT_CLIENT_UPDATE_DELAY: Duration = Duration::from_secs(12);
/// For how long to queue gossip data columns awaiting their block for re-processing.
pub const QUEUED_GOSSIP_DATA_COLUMN_DELAY: Duration = Duration::from_secs(12);
/// Envelope timeout as a multiplier of slot duration. Envelopes waiting for their block will be
/// sent for processing after this many slots worth of time, even if the block hasn't arrived.
const QUEUED_ENVELOPE_DELAY_SLOTS: u32 = 1;
@@ -302,6 +305,8 @@ struct ReprocessQueue<S> {
queued_backfill_batches: Vec<QueuedBackfillBatch>,
/// Queued gossip data columns awaiting their block, keyed by block root.
awaiting_data_columns_per_root: HashMap<Hash256, (Vec<QueuedGossipDataColumn>, DelayKey)>,
/// Total number of queued gossip data columns across all roots.
queued_data_columns_count: usize,
/* Aux */
/// Next attestation id, used for both aggregated and unaggregated attestations
@@ -312,6 +317,7 @@ struct ReprocessQueue<S> {
rpc_block_debounce: TimeLatch,
attestation_delay_debounce: TimeLatch,
lc_update_delay_debounce: TimeLatch,
data_column_delay_debounce: TimeLatch,
next_backfill_batch_event: Option<Pin<Box<tokio::time::Sleep>>>,
slot_clock: Arc<S>,
}
@@ -491,6 +497,7 @@ impl<S: SlotClock> ReprocessQueue<S> {
queued_backfill_batches: Vec::new(),
queued_column_reconstructions: HashMap::new(),
awaiting_data_columns_per_root: HashMap::new(),
queued_data_columns_count: 0,
next_attestation: 0,
next_lc_update: 0,
early_block_debounce: TimeLatch::default(),
@@ -498,6 +505,7 @@ impl<S: SlotClock> ReprocessQueue<S> {
rpc_block_debounce: TimeLatch::default(),
attestation_delay_debounce: TimeLatch::default(),
lc_update_delay_debounce: TimeLatch::default(),
data_column_delay_debounce: TimeLatch::default(),
next_backfill_batch_event: None,
slot_clock,
}
@@ -718,23 +726,32 @@ impl<S: SlotClock> ReprocessQueue<S> {
InboundEvent::Msg(UnknownBlockDataColumn(queued_data_column)) => {
let block_root = queued_data_column.beacon_block_root;
if self.queued_data_columns_count >= MAXIMUM_QUEUED_DATA_COLUMNS {
if self.data_column_delay_debounce.elapsed() {
warn!(
queue_size = MAXIMUM_QUEUED_DATA_COLUMNS,
msg = "system resources may be saturated",
"Data column delay queue is full, dropping column"
);
}
return;
}
if let Some((columns, _delay_key)) =
self.awaiting_data_columns_per_root.get_mut(&block_root)
{
// Append to existing entry; the timer for this root is already running.
columns.push(queued_data_column);
} else {
if self.awaiting_data_columns_per_root.len() >= MAXIMUM_QUEUED_DATA_COLUMNS {
return;
}
let delay_key = self
.data_columns_delay_queue
.insert(block_root, QUEUED_ATTESTATION_DELAY);
.insert(block_root, QUEUED_GOSSIP_DATA_COLUMN_DELAY);
self.awaiting_data_columns_per_root
.insert(block_root, (vec![queued_data_column], delay_key));
}
self.queued_data_columns_count += 1;
}
InboundEvent::Msg(UnknownLightClientOptimisticUpdate(
queued_light_client_optimistic_update,
@@ -854,6 +871,9 @@ impl<S: SlotClock> ReprocessQueue<S> {
self.awaiting_data_columns_per_root.remove(&block_root)
{
self.data_columns_delay_queue.remove(&delay_key);
self.queued_data_columns_count = self
.queued_data_columns_count
.saturating_sub(data_columns.len());
for data_column in data_columns {
if self
.ready_work_tx
@@ -1121,6 +1141,9 @@ impl<S: SlotClock> ReprocessQueue<S> {
if let Some((data_columns, _)) =
self.awaiting_data_columns_per_root.remove(&block_root)
{
self.queued_data_columns_count = self
.queued_data_columns_count
.saturating_sub(data_columns.len());
for data_column in data_columns {
if self
.ready_work_tx
@@ -1779,7 +1802,7 @@ mod tests {
);
// Advance time past the delay so the entry expires.
advance_time(&queue.slot_clock, 2 * QUEUED_ATTESTATION_DELAY).await;
advance_time(&queue.slot_clock, 2 * QUEUED_GOSSIP_DATA_COLUMN_DELAY).await;
let ready_msg = queue.next().await.unwrap();
assert!(matches!(ready_msg, InboundEvent::ReadyDataColumn(_)));
queue.handle_message(ready_msg);