This commit is contained in:
Eitan Seri-Levi
2026-05-24 09:38:12 +03:00
parent 3febb7275c
commit 623c8f4617
3 changed files with 36 additions and 11 deletions

View File

@@ -306,7 +306,7 @@ impl<E: EthSpec> From<ReadyWork> for WorkEvent<E> {
}
ReadyWork::DataColumn(QueuedGossipDataColumn { process_fn, .. }) => Self {
drop_during_sync: true,
work: Work::UnknownBlockAttestation { process_fn },
work: Work::UnknownBlockDataColumn { process_fn },
},
}
}
@@ -373,6 +373,9 @@ pub enum Work<E: EthSpec> {
UnknownBlockAttestation {
process_fn: BlockingFn,
},
UnknownBlockDataColumn {
process_fn: BlockingFn,
},
GossipAttestationBatch {
attestations: GossipAttestationBatch,
process_batch: Box<dyn FnOnce(GossipAttestationBatch) + Send + Sync>,
@@ -469,6 +472,7 @@ pub enum WorkType {
GossipAttestation,
GossipAttestationToConvert,
UnknownBlockAttestation,
UnknownBlockDataColumn,
GossipAttestationBatch,
GossipAggregate,
UnknownBlockAggregate,
@@ -576,6 +580,7 @@ impl<E: EthSpec> Work<E> {
Work::LightClientFinalityUpdateRequest(_) => WorkType::LightClientFinalityUpdateRequest,
Work::LightClientUpdatesByRangeRequest(_) => WorkType::LightClientUpdatesByRangeRequest,
Work::UnknownBlockAttestation { .. } => WorkType::UnknownBlockAttestation,
Work::UnknownBlockDataColumn { .. } => WorkType::UnknownBlockDataColumn,
Work::UnknownBlockAggregate { .. } => WorkType::UnknownBlockAggregate,
Work::UnknownLightClientOptimisticUpdate { .. } => {
WorkType::UnknownLightClientOptimisticUpdate
@@ -990,6 +995,9 @@ impl<E: EthSpec> BeaconProcessor<E> {
} else if let Some(item) = work_queues.unknown_block_aggregate_queue.pop() {
Some(item)
} else if let Some(item) = work_queues.unknown_block_attestation_queue.pop()
{
Some(item)
} else if let Some(item) = work_queues.unknown_block_data_column_queue.pop()
{
Some(item)
// Check execution payload bids. Most proposers will request bids directly from builders
@@ -1250,6 +1258,9 @@ impl<E: EthSpec> BeaconProcessor<E> {
Work::UnknownBlockAttestation { .. } => {
work_queues.unknown_block_attestation_queue.push(work)
}
Work::UnknownBlockDataColumn { .. } => work_queues
.unknown_block_data_column_queue
.push(work, work_id),
Work::UnknownBlockAggregate { .. } => {
work_queues.unknown_block_aggregate_queue.push(work)
}
@@ -1300,6 +1311,9 @@ impl<E: EthSpec> BeaconProcessor<E> {
WorkType::UnknownBlockAttestation => {
work_queues.unknown_block_attestation_queue.len()
}
WorkType::UnknownBlockDataColumn => {
work_queues.unknown_block_data_column_queue.len()
}
WorkType::GossipAttestationBatch => 0, // No queue
WorkType::GossipAggregate => work_queues.aggregate_queue.len(),
WorkType::UnknownBlockAggregate => {
@@ -1517,6 +1531,7 @@ impl<E: EthSpec> BeaconProcessor<E> {
}),
Work::UnknownBlockAttestation { process_fn }
| Work::UnknownBlockAggregate { process_fn }
| Work::UnknownBlockDataColumn { process_fn }
| Work::UnknownLightClientOptimisticUpdate { process_fn, .. } => {
task_spawner.spawn_blocking(process_fn)
}

View File

@@ -111,6 +111,7 @@ pub struct BeaconProcessorQueueLengths {
attestation_queue: usize,
unknown_block_aggregate_queue: usize,
unknown_block_attestation_queue: usize,
unknown_block_data_column_queue: usize,
sync_message_queue: usize,
sync_contribution_queue: usize,
gossip_voluntary_exit_queue: usize,
@@ -175,6 +176,8 @@ impl BeaconProcessorQueueLengths {
Ok(Self {
aggregate_queue: 4096,
unknown_block_aggregate_queue: 1024,
// Capacity for two slot's worth of data columns for a supernode.
unknown_block_data_column_queue: 256,
// Capacity for a full slot's worth of attestations if subscribed to all subnets
attestation_queue: std::cmp::max(
active_validator_count / slots_per_epoch,
@@ -247,6 +250,7 @@ pub struct WorkQueues<E: EthSpec> {
pub attestation_debounce: TimeLatch,
pub unknown_block_aggregate_queue: LifoQueue<Work<E>>,
pub unknown_block_attestation_queue: LifoQueue<Work<E>>,
pub unknown_block_data_column_queue: FifoQueue<Work<E>>,
pub sync_message_queue: LifoQueue<Work<E>>,
pub sync_contribution_queue: LifoQueue<Work<E>>,
pub gossip_voluntary_exit_queue: FifoQueue<Work<E>>,
@@ -305,6 +309,8 @@ impl<E: EthSpec> WorkQueues<E> {
LifoQueue::new(queue_lengths.unknown_block_aggregate_queue);
let unknown_block_attestation_queue =
LifoQueue::new(queue_lengths.unknown_block_attestation_queue);
let unknown_block_data_column_queue =
FifoQueue::new(queue_lengths.unknown_block_data_column_queue);
let sync_message_queue = LifoQueue::new(queue_lengths.sync_message_queue);
let sync_contribution_queue = LifoQueue::new(queue_lengths.sync_contribution_queue);
@@ -387,6 +393,7 @@ impl<E: EthSpec> WorkQueues<E> {
attestation_debounce,
unknown_block_aggregate_queue,
unknown_block_attestation_queue,
unknown_block_data_column_queue,
sync_message_queue,
sync_contribution_queue,
gossip_voluntary_exit_queue,

View File

@@ -301,7 +301,7 @@ struct ReprocessQueue<S> {
/// Queued backfill batches
queued_backfill_batches: Vec<QueuedBackfillBatch>,
/// Queued gossip data columns awaiting their block, keyed by block root.
queued_gossip_data_columns: HashMap<Hash256, (Vec<QueuedGossipDataColumn>, DelayKey)>,
awaiting_data_columns_per_root: HashMap<Hash256, (Vec<QueuedGossipDataColumn>, DelayKey)>,
/* Aux */
/// Next attestation id, used for both aggregated and unaggregated attestations
@@ -490,7 +490,7 @@ impl<S: SlotClock> ReprocessQueue<S> {
awaiting_lc_updates_per_parent_root: HashMap::new(),
queued_backfill_batches: Vec::new(),
queued_column_reconstructions: HashMap::new(),
queued_gossip_data_columns: HashMap::new(),
awaiting_data_columns_per_root: HashMap::new(),
next_attestation: 0,
next_lc_update: 0,
early_block_debounce: TimeLatch::default(),
@@ -719,12 +719,12 @@ impl<S: SlotClock> ReprocessQueue<S> {
let block_root = queued_data_column.beacon_block_root;
if let Some((columns, _delay_key)) =
self.queued_gossip_data_columns.get_mut(&block_root)
self.awaiting_data_columns_per_root.get_mut(&block_root)
{
// Append to existing entry; the timer for this root is already running.
columns.push(queued_data_column);
} else {
if self.queued_gossip_data_columns.len() >= MAXIMUM_QUEUED_DATA_COLUMNS {
if self.awaiting_data_columns_per_root.len() >= MAXIMUM_QUEUED_DATA_COLUMNS {
return;
}
@@ -732,7 +732,7 @@ impl<S: SlotClock> ReprocessQueue<S> {
.data_columns_delay_queue
.insert(block_root, QUEUED_ATTESTATION_DELAY);
self.queued_gossip_data_columns
self.awaiting_data_columns_per_root
.insert(block_root, (vec![queued_data_column], delay_key));
}
}
@@ -851,7 +851,7 @@ impl<S: SlotClock> ReprocessQueue<S> {
// Unqueue the data columns we have for this root, if any.
if let Some((data_columns, delay_key)) =
self.queued_gossip_data_columns.remove(&block_root)
self.awaiting_data_columns_per_root.remove(&block_root)
{
self.data_columns_delay_queue.remove(&delay_key);
for data_column in data_columns {
@@ -1118,7 +1118,8 @@ impl<S: SlotClock> ReprocessQueue<S> {
}
}
InboundEvent::ReadyDataColumn(block_root) => {
if let Some((data_columns, _)) = self.queued_gossip_data_columns.remove(&block_root)
if let Some((data_columns, _)) =
self.awaiting_data_columns_per_root.remove(&block_root)
{
for data_column in data_columns {
if self
@@ -1731,7 +1732,11 @@ mod tests {
queue.handle_message(InboundEvent::Msg(msg));
assert_eq!(queue.awaiting_data_columns_per_root.len(), 1);
assert_eq!(queue.queued_gossip_data_columns.len(), 1);
assert!(
queue
.awaiting_data_columns_per_root
.contains_key(&beacon_block_root)
);
assert_eq!(queue.data_columns_delay_queue.len(), 1);
// Simulate block import.
@@ -1742,7 +1747,6 @@ mod tests {
// Internal state should be cleaned up.
assert!(queue.awaiting_data_columns_per_root.is_empty());
assert!(queue.queued_gossip_data_columns.is_empty());
assert_eq!(queue.data_columns_delay_queue.len(), 0);
// The column should have been sent to the ready_work channel.
@@ -1782,6 +1786,5 @@ mod tests {
// All internal state should be cleaned up.
assert!(queue.awaiting_data_columns_per_root.is_empty());
assert!(queue.queued_gossip_data_columns.is_empty());
}
}