From 623c8f4617a58c6f792222e784a6a27c1364805f Mon Sep 17 00:00:00 2001 From: Eitan Seri-Levi Date: Sun, 24 May 2026 09:38:12 +0300 Subject: [PATCH] Clean up --- beacon_node/beacon_processor/src/lib.rs | 17 +++++++++++++- .../src/scheduler/work_queue.rs | 7 ++++++ .../src/scheduler/work_reprocessing_queue.rs | 23 +++++++++++-------- 3 files changed, 36 insertions(+), 11 deletions(-) diff --git a/beacon_node/beacon_processor/src/lib.rs b/beacon_node/beacon_processor/src/lib.rs index cd58558b12..d8f63ec561 100644 --- a/beacon_node/beacon_processor/src/lib.rs +++ b/beacon_node/beacon_processor/src/lib.rs @@ -306,7 +306,7 @@ impl From for WorkEvent { } ReadyWork::DataColumn(QueuedGossipDataColumn { process_fn, .. }) => Self { drop_during_sync: true, - work: Work::UnknownBlockAttestation { process_fn }, + work: Work::UnknownBlockDataColumn { process_fn }, }, } } @@ -373,6 +373,9 @@ pub enum Work { UnknownBlockAttestation { process_fn: BlockingFn, }, + UnknownBlockDataColumn { + process_fn: BlockingFn, + }, GossipAttestationBatch { attestations: GossipAttestationBatch, process_batch: Box, @@ -469,6 +472,7 @@ pub enum WorkType { GossipAttestation, GossipAttestationToConvert, UnknownBlockAttestation, + UnknownBlockDataColumn, GossipAttestationBatch, GossipAggregate, UnknownBlockAggregate, @@ -576,6 +580,7 @@ impl Work { Work::LightClientFinalityUpdateRequest(_) => WorkType::LightClientFinalityUpdateRequest, Work::LightClientUpdatesByRangeRequest(_) => WorkType::LightClientUpdatesByRangeRequest, Work::UnknownBlockAttestation { .. } => WorkType::UnknownBlockAttestation, + Work::UnknownBlockDataColumn { .. } => WorkType::UnknownBlockDataColumn, Work::UnknownBlockAggregate { .. } => WorkType::UnknownBlockAggregate, Work::UnknownLightClientOptimisticUpdate { .. } => { WorkType::UnknownLightClientOptimisticUpdate @@ -990,6 +995,9 @@ impl BeaconProcessor { } else if let Some(item) = work_queues.unknown_block_aggregate_queue.pop() { Some(item) } else if let Some(item) = work_queues.unknown_block_attestation_queue.pop() + { + Some(item) + } else if let Some(item) = work_queues.unknown_block_data_column_queue.pop() { Some(item) // Check execution payload bids. Most proposers will request bids directly from builders @@ -1250,6 +1258,9 @@ impl BeaconProcessor { Work::UnknownBlockAttestation { .. } => { work_queues.unknown_block_attestation_queue.push(work) } + Work::UnknownBlockDataColumn { .. } => work_queues + .unknown_block_data_column_queue + .push(work, work_id), Work::UnknownBlockAggregate { .. } => { work_queues.unknown_block_aggregate_queue.push(work) } @@ -1300,6 +1311,9 @@ impl BeaconProcessor { WorkType::UnknownBlockAttestation => { work_queues.unknown_block_attestation_queue.len() } + WorkType::UnknownBlockDataColumn => { + work_queues.unknown_block_data_column_queue.len() + } WorkType::GossipAttestationBatch => 0, // No queue WorkType::GossipAggregate => work_queues.aggregate_queue.len(), WorkType::UnknownBlockAggregate => { @@ -1517,6 +1531,7 @@ impl BeaconProcessor { }), Work::UnknownBlockAttestation { process_fn } | Work::UnknownBlockAggregate { process_fn } + | Work::UnknownBlockDataColumn { process_fn } | Work::UnknownLightClientOptimisticUpdate { process_fn, .. } => { task_spawner.spawn_blocking(process_fn) } diff --git a/beacon_node/beacon_processor/src/scheduler/work_queue.rs b/beacon_node/beacon_processor/src/scheduler/work_queue.rs index 2fdc15182c..41d25a7c21 100644 --- a/beacon_node/beacon_processor/src/scheduler/work_queue.rs +++ b/beacon_node/beacon_processor/src/scheduler/work_queue.rs @@ -111,6 +111,7 @@ pub struct BeaconProcessorQueueLengths { attestation_queue: usize, unknown_block_aggregate_queue: usize, unknown_block_attestation_queue: usize, + unknown_block_data_column_queue: usize, sync_message_queue: usize, sync_contribution_queue: usize, gossip_voluntary_exit_queue: usize, @@ -175,6 +176,8 @@ impl BeaconProcessorQueueLengths { Ok(Self { aggregate_queue: 4096, unknown_block_aggregate_queue: 1024, + // Capacity for two slot's worth of data columns for a supernode. + unknown_block_data_column_queue: 256, // Capacity for a full slot's worth of attestations if subscribed to all subnets attestation_queue: std::cmp::max( active_validator_count / slots_per_epoch, @@ -247,6 +250,7 @@ pub struct WorkQueues { pub attestation_debounce: TimeLatch, pub unknown_block_aggregate_queue: LifoQueue>, pub unknown_block_attestation_queue: LifoQueue>, + pub unknown_block_data_column_queue: FifoQueue>, pub sync_message_queue: LifoQueue>, pub sync_contribution_queue: LifoQueue>, pub gossip_voluntary_exit_queue: FifoQueue>, @@ -305,6 +309,8 @@ impl WorkQueues { LifoQueue::new(queue_lengths.unknown_block_aggregate_queue); let unknown_block_attestation_queue = LifoQueue::new(queue_lengths.unknown_block_attestation_queue); + let unknown_block_data_column_queue = + FifoQueue::new(queue_lengths.unknown_block_data_column_queue); let sync_message_queue = LifoQueue::new(queue_lengths.sync_message_queue); let sync_contribution_queue = LifoQueue::new(queue_lengths.sync_contribution_queue); @@ -387,6 +393,7 @@ impl WorkQueues { attestation_debounce, unknown_block_aggregate_queue, unknown_block_attestation_queue, + unknown_block_data_column_queue, sync_message_queue, sync_contribution_queue, gossip_voluntary_exit_queue, diff --git a/beacon_node/beacon_processor/src/scheduler/work_reprocessing_queue.rs b/beacon_node/beacon_processor/src/scheduler/work_reprocessing_queue.rs index 764b09745a..faf3711b5e 100644 --- a/beacon_node/beacon_processor/src/scheduler/work_reprocessing_queue.rs +++ b/beacon_node/beacon_processor/src/scheduler/work_reprocessing_queue.rs @@ -301,7 +301,7 @@ struct ReprocessQueue { /// Queued backfill batches queued_backfill_batches: Vec, /// Queued gossip data columns awaiting their block, keyed by block root. - queued_gossip_data_columns: HashMap, DelayKey)>, + awaiting_data_columns_per_root: HashMap, DelayKey)>, /* Aux */ /// Next attestation id, used for both aggregated and unaggregated attestations @@ -490,7 +490,7 @@ impl ReprocessQueue { awaiting_lc_updates_per_parent_root: HashMap::new(), queued_backfill_batches: Vec::new(), queued_column_reconstructions: HashMap::new(), - queued_gossip_data_columns: HashMap::new(), + awaiting_data_columns_per_root: HashMap::new(), next_attestation: 0, next_lc_update: 0, early_block_debounce: TimeLatch::default(), @@ -719,12 +719,12 @@ impl ReprocessQueue { let block_root = queued_data_column.beacon_block_root; if let Some((columns, _delay_key)) = - self.queued_gossip_data_columns.get_mut(&block_root) + self.awaiting_data_columns_per_root.get_mut(&block_root) { // Append to existing entry; the timer for this root is already running. columns.push(queued_data_column); } else { - if self.queued_gossip_data_columns.len() >= MAXIMUM_QUEUED_DATA_COLUMNS { + if self.awaiting_data_columns_per_root.len() >= MAXIMUM_QUEUED_DATA_COLUMNS { return; } @@ -732,7 +732,7 @@ impl ReprocessQueue { .data_columns_delay_queue .insert(block_root, QUEUED_ATTESTATION_DELAY); - self.queued_gossip_data_columns + self.awaiting_data_columns_per_root .insert(block_root, (vec![queued_data_column], delay_key)); } } @@ -851,7 +851,7 @@ impl ReprocessQueue { // Unqueue the data columns we have for this root, if any. if let Some((data_columns, delay_key)) = - self.queued_gossip_data_columns.remove(&block_root) + self.awaiting_data_columns_per_root.remove(&block_root) { self.data_columns_delay_queue.remove(&delay_key); for data_column in data_columns { @@ -1118,7 +1118,8 @@ impl ReprocessQueue { } } InboundEvent::ReadyDataColumn(block_root) => { - if let Some((data_columns, _)) = self.queued_gossip_data_columns.remove(&block_root) + if let Some((data_columns, _)) = + self.awaiting_data_columns_per_root.remove(&block_root) { for data_column in data_columns { if self @@ -1731,7 +1732,11 @@ mod tests { queue.handle_message(InboundEvent::Msg(msg)); assert_eq!(queue.awaiting_data_columns_per_root.len(), 1); - assert_eq!(queue.queued_gossip_data_columns.len(), 1); + assert!( + queue + .awaiting_data_columns_per_root + .contains_key(&beacon_block_root) + ); assert_eq!(queue.data_columns_delay_queue.len(), 1); // Simulate block import. @@ -1742,7 +1747,6 @@ mod tests { // Internal state should be cleaned up. assert!(queue.awaiting_data_columns_per_root.is_empty()); - assert!(queue.queued_gossip_data_columns.is_empty()); assert_eq!(queue.data_columns_delay_queue.len(), 0); // The column should have been sent to the ready_work channel. @@ -1782,6 +1786,5 @@ mod tests { // All internal state should be cleaned up. assert!(queue.awaiting_data_columns_per_root.is_empty()); - assert!(queue.queued_gossip_data_columns.is_empty()); } }