Merge branch 'unstable' into gloas-lookup-sync-fixes

Rebase the gloas lookup-sync work onto #9391's RequestState trait-removal
design: payload-envelope request reuses the generic SingleLookupRequestState,
concrete BlockRequest/DataRequest/PayloadRequest, parent-imported gate against
awaiting_parent: Option<Hash256>. (Some gloas custody-failure tests still fail —
known peer-attribution issue, pushed for visibility.)
This commit is contained in:
dapplion
2026-06-04 04:16:41 +02:00
28 changed files with 1195 additions and 1619 deletions

View File

@@ -41,8 +41,8 @@
pub use crate::scheduler::BeaconProcessorQueueLengths;
use crate::scheduler::work_queue::WorkQueues;
use crate::work_reprocessing_queue::{
QueuedBackfillBatch, QueuedColumnReconstruction, QueuedGossipBlock, QueuedGossipEnvelope,
ReprocessQueueMessage,
QueuedBackfillBatch, QueuedColumnReconstruction, QueuedGossipBlock, QueuedGossipDataColumn,
QueuedGossipEnvelope, ReprocessQueueMessage,
};
use futures::stream::{Stream, StreamExt};
use futures::task::Poll;
@@ -304,6 +304,10 @@ impl<E: EthSpec> From<ReadyWork> for WorkEvent<E> {
work: Work::ColumnReconstruction(process_fn),
}
}
ReadyWork::DataColumn(QueuedGossipDataColumn { process_fn, .. }) => Self {
drop_during_sync: true,
work: Work::UnknownBlockDataColumn { process_fn },
},
}
}
}
@@ -369,6 +373,9 @@ pub enum Work<E: EthSpec> {
UnknownBlockAttestation {
process_fn: BlockingFn,
},
UnknownBlockDataColumn {
process_fn: BlockingFn,
},
GossipAttestationBatch {
attestations: GossipAttestationBatch,
process_batch: Box<dyn FnOnce(GossipAttestationBatch) + Send + Sync>,
@@ -466,6 +473,7 @@ pub enum WorkType {
GossipAttestation,
GossipAttestationToConvert,
UnknownBlockAttestation,
UnknownBlockDataColumn,
GossipAttestationBatch,
GossipAggregate,
UnknownBlockAggregate,
@@ -571,6 +579,7 @@ impl<E: EthSpec> Work<E> {
Work::LightClientFinalityUpdateRequest(_) => WorkType::LightClientFinalityUpdateRequest,
Work::LightClientUpdatesByRangeRequest(_) => WorkType::LightClientUpdatesByRangeRequest,
Work::UnknownBlockAttestation { .. } => WorkType::UnknownBlockAttestation,
Work::UnknownBlockDataColumn { .. } => WorkType::UnknownBlockDataColumn,
Work::UnknownBlockAggregate { .. } => WorkType::UnknownBlockAggregate,
Work::UnknownLightClientOptimisticUpdate { .. } => {
WorkType::UnknownLightClientOptimisticUpdate
@@ -844,6 +853,9 @@ impl<E: EthSpec> BeaconProcessor<E> {
Some(item)
} else if let Some(item) = work_queues.gossip_data_column_queue.pop() {
Some(item)
} else if let Some(item) = work_queues.unknown_block_data_column_queue.pop()
{
Some(item)
} else if let Some(item) =
work_queues.gossip_partial_data_column_queue.pop()
{
@@ -1240,6 +1252,9 @@ impl<E: EthSpec> BeaconProcessor<E> {
Work::UnknownBlockAttestation { .. } => {
work_queues.unknown_block_attestation_queue.push(work)
}
Work::UnknownBlockDataColumn { .. } => work_queues
.unknown_block_data_column_queue
.push(work, work_id),
Work::UnknownBlockAggregate { .. } => {
work_queues.unknown_block_aggregate_queue.push(work)
}
@@ -1290,6 +1305,9 @@ impl<E: EthSpec> BeaconProcessor<E> {
WorkType::UnknownBlockAttestation => {
work_queues.unknown_block_attestation_queue.len()
}
WorkType::UnknownBlockDataColumn => {
work_queues.unknown_block_data_column_queue.len()
}
WorkType::GossipAttestationBatch => 0, // No queue
WorkType::GossipAggregate => work_queues.aggregate_queue.len(),
WorkType::UnknownBlockAggregate => {
@@ -1506,6 +1524,7 @@ impl<E: EthSpec> BeaconProcessor<E> {
}),
Work::UnknownBlockAttestation { process_fn }
| Work::UnknownBlockAggregate { process_fn }
| Work::UnknownBlockDataColumn { process_fn }
| Work::UnknownLightClientOptimisticUpdate { process_fn, .. } => {
task_spawner.spawn_blocking(process_fn)
}

View File

@@ -111,6 +111,7 @@ pub struct BeaconProcessorQueueLengths {
attestation_queue: usize,
unknown_block_aggregate_queue: usize,
unknown_block_attestation_queue: usize,
unknown_block_data_column_queue: usize,
sync_message_queue: usize,
sync_contribution_queue: usize,
gossip_voluntary_exit_queue: usize,
@@ -174,6 +175,8 @@ impl BeaconProcessorQueueLengths {
Ok(Self {
aggregate_queue: 4096,
unknown_block_aggregate_queue: 1024,
// Capacity for two slots' worth of data columns for a supernode.
unknown_block_data_column_queue: 256,
// Capacity for a full slot's worth of attestations if subscribed to all subnets
attestation_queue: std::cmp::max(
active_validator_count / slots_per_epoch,
@@ -245,6 +248,7 @@ pub struct WorkQueues<E: EthSpec> {
pub attestation_debounce: TimeLatch,
pub unknown_block_aggregate_queue: LifoQueue<Work<E>>,
pub unknown_block_attestation_queue: LifoQueue<Work<E>>,
pub unknown_block_data_column_queue: FifoQueue<Work<E>>,
pub sync_message_queue: LifoQueue<Work<E>>,
pub sync_contribution_queue: LifoQueue<Work<E>>,
pub gossip_voluntary_exit_queue: FifoQueue<Work<E>>,
@@ -302,6 +306,8 @@ impl<E: EthSpec> WorkQueues<E> {
LifoQueue::new(queue_lengths.unknown_block_aggregate_queue);
let unknown_block_attestation_queue =
LifoQueue::new(queue_lengths.unknown_block_attestation_queue);
let unknown_block_data_column_queue =
FifoQueue::new(queue_lengths.unknown_block_data_column_queue);
let sync_message_queue = LifoQueue::new(queue_lengths.sync_message_queue);
let sync_contribution_queue = LifoQueue::new(queue_lengths.sync_contribution_queue);
@@ -383,6 +389,7 @@ impl<E: EthSpec> WorkQueues<E> {
attestation_debounce,
unknown_block_aggregate_queue,
unknown_block_attestation_queue,
unknown_block_data_column_queue,
sync_message_queue,
sync_contribution_queue,
gossip_voluntary_exit_queue,

View File

@@ -52,6 +52,10 @@ pub const QUEUED_ATTESTATION_DELAY: Duration = Duration::from_secs(12);
/// For how long to queue light client updates for re-processing.
pub const QUEUED_LIGHT_CLIENT_UPDATE_DELAY: Duration = Duration::from_secs(12);
/// Data column timeout as a multiplier of slot duration. Columns waiting for their block will be
/// sent for processing after this many slots' worth of time, even if the block hasn't arrived.
const QUEUED_DATA_COLUMN_DELAY_SLOTS: u32 = 1;
/// Envelope timeout as a multiplier of slot duration. Envelopes waiting for their block will be
/// sent for processing after this many slots' worth of time, even if the block hasn't arrived.
const QUEUED_ENVELOPE_DELAY_SLOTS: u32 = 1;
@@ -76,6 +80,9 @@ const MAXIMUM_QUEUED_ENVELOPES: usize = 16;
/// How many attestations we keep before new ones get dropped.
const MAXIMUM_QUEUED_ATTESTATIONS: usize = 16_384;
/// How many columns we keep before new ones get dropped.
const MAXIMUM_QUEUED_DATA_COLUMNS: usize = 256;
/// How many light client updates we keep before new ones get dropped.
const MAXIMUM_QUEUED_LIGHT_CLIENT_UPDATES: usize = 128;
@@ -123,6 +130,8 @@ pub enum ReprocessQueueMessage {
UnknownLightClientOptimisticUpdate(QueuedLightClientUpdate),
/// A new backfill batch that needs to be scheduled for processing.
BackfillSync(QueuedBackfillBatch),
/// A gossip data column that references an unknown block.
UnknownBlockDataColumn(QueuedGossipDataColumn),
/// A delayed column reconstruction that needs checking
DelayColumnReconstruction(QueuedColumnReconstruction),
}
@@ -138,6 +147,7 @@ pub enum ReadyWork {
LightClientUpdate(QueuedLightClientUpdate),
BackfillSync(QueuedBackfillBatch),
ColumnReconstruction(QueuedColumnReconstruction),
DataColumn(QueuedGossipDataColumn),
}
/// An Attestation for which the corresponding block was not seen while processing, queued for
@@ -200,6 +210,12 @@ pub struct QueuedColumnReconstruction {
pub process_fn: AsyncFn,
}
/// A gossip data column that references an unknown block, queued for later reprocessing.
///
/// Columns are grouped by `beacon_block_root` while they wait and are released either when
/// that block is imported or when the per-root timeout expires.
pub struct QueuedGossipDataColumn {
/// Root of the block this column references; used as the key when queueing and releasing.
pub beacon_block_root: Hash256,
/// Blocking closure handed to the beacon processor once the column is released.
pub process_fn: BlockingFn,
}
impl<E: EthSpec> TryFrom<WorkEvent<E>> for QueuedBackfillBatch {
type Error = WorkEvent<E>;
@@ -240,6 +256,8 @@ enum InboundEvent {
ReadyBackfillSync(QueuedBackfillBatch),
/// A column reconstruction that was queued is ready for processing.
ReadyColumnReconstruction(QueuedColumnReconstruction),
/// The timeout for the gossip data columns queued under this block root has expired; they are
/// ready for re-processing.
ReadyDataColumn(Hash256),
/// A message sent to the `ReprocessQueue`
Msg(ReprocessQueueMessage),
}
@@ -264,6 +282,8 @@ struct ReprocessQueue<S> {
lc_updates_delay_queue: DelayQueue<QueuedLightClientUpdateId>,
/// Queue to manage scheduled column reconstructions.
column_reconstructions_delay_queue: DelayQueue<QueuedColumnReconstruction>,
/// Queue to manage gossip data column timeouts.
data_columns_delay_queue: DelayQueue<Hash256>,
/* Queued items */
/// Queued blocks.
@@ -284,6 +304,10 @@ struct ReprocessQueue<S> {
queued_column_reconstructions: HashMap<Hash256, Option<DelayKey>>,
/// Queued backfill batches
queued_backfill_batches: Vec<QueuedBackfillBatch>,
/// Queued gossip data columns awaiting their block, keyed by block root.
awaiting_data_columns_per_root: HashMap<Hash256, (Vec<QueuedGossipDataColumn>, DelayKey)>,
/// Total number of queued gossip data columns across all roots.
queued_data_columns_count: usize,
/* Aux */
/// Next attestation id, used for both aggregated and unaggregated attestations
@@ -294,6 +318,7 @@ struct ReprocessQueue<S> {
rpc_block_debounce: TimeLatch,
attestation_delay_debounce: TimeLatch,
lc_update_delay_debounce: TimeLatch,
data_column_delay_debounce: TimeLatch,
next_backfill_batch_event: Option<Pin<Box<tokio::time::Sleep>>>,
slot_clock: Arc<S>,
}
@@ -387,6 +412,13 @@ impl<S: SlotClock> Stream for ReprocessQueue<S> {
Poll::Ready(None) | Poll::Pending => (),
}
match self.data_columns_delay_queue.poll_expired(cx) {
Poll::Ready(Some(block_root)) => {
return Poll::Ready(Some(InboundEvent::ReadyDataColumn(block_root.into_inner())));
}
Poll::Ready(None) | Poll::Pending => (),
}
if let Some(next_backfill_batch_event) = self.next_backfill_batch_event.as_mut() {
match next_backfill_batch_event.as_mut().poll(cx) {
Poll::Ready(_) => {
@@ -455,6 +487,7 @@ impl<S: SlotClock> ReprocessQueue<S> {
attestations_delay_queue: DelayQueue::new(),
lc_updates_delay_queue: DelayQueue::new(),
column_reconstructions_delay_queue: DelayQueue::new(),
data_columns_delay_queue: DelayQueue::new(),
queued_gossip_block_roots: HashSet::new(),
awaiting_envelopes_per_root: HashMap::new(),
queued_lc_updates: FnvHashMap::default(),
@@ -464,6 +497,8 @@ impl<S: SlotClock> ReprocessQueue<S> {
awaiting_lc_updates_per_parent_root: HashMap::new(),
queued_backfill_batches: Vec::new(),
queued_column_reconstructions: HashMap::new(),
awaiting_data_columns_per_root: HashMap::new(),
queued_data_columns_count: 0,
next_attestation: 0,
next_lc_update: 0,
early_block_debounce: TimeLatch::default(),
@@ -471,6 +506,7 @@ impl<S: SlotClock> ReprocessQueue<S> {
rpc_block_debounce: TimeLatch::default(),
attestation_delay_debounce: TimeLatch::default(),
lc_update_delay_debounce: TimeLatch::default(),
data_column_delay_debounce: TimeLatch::default(),
next_backfill_batch_event: None,
slot_clock,
}
@@ -551,22 +587,16 @@ impl<S: SlotClock> ReprocessQueue<S> {
return;
}
// When the queue is full, evict the oldest entry to make room for newer envelopes.
// When the queue is full, drop the new envelope.
if self.awaiting_envelopes_per_root.len() >= MAXIMUM_QUEUED_ENVELOPES {
if self.envelope_delay_debounce.elapsed() {
warn!(
queue_size = MAXIMUM_QUEUED_ENVELOPES,
msg = "system resources may be saturated",
"Envelope delay queue is full, evicting oldest entry"
"Envelope delay queue is full, dropping envelope"
);
}
if let Some(oldest_root) =
self.awaiting_envelopes_per_root.keys().next().copied()
&& let Some((_envelope, delay_key)) =
self.awaiting_envelopes_per_root.remove(&oldest_root)
{
self.envelope_delay_queue.remove(&delay_key);
}
return;
}
// Register the timeout.
@@ -688,6 +718,37 @@ impl<S: SlotClock> ReprocessQueue<S> {
self.next_attestation += 1;
}
// Queue a gossip data column whose block is not yet known. Columns are grouped by block root
// and all columns for one root share a single timeout.
InboundEvent::Msg(UnknownBlockDataColumn(queued_data_column)) => {
let block_root = queued_data_column.beacon_block_root;
// The cap is on the total number of queued columns across all roots. Once it is reached the
// incoming column is dropped; nothing already queued is evicted.
if self.queued_data_columns_count >= MAXIMUM_QUEUED_DATA_COLUMNS {
// Only warn when the debounce latch reports elapsed, so a burst of drops does not
// produce one log line per column.
if self.data_column_delay_debounce.elapsed() {
warn!(
queue_size = MAXIMUM_QUEUED_DATA_COLUMNS,
msg = "system resources may be saturated",
"Data column delay queue is full, dropping column"
);
}
return;
}
if let Some((columns, _delay_key)) =
self.awaiting_data_columns_per_root.get_mut(&block_root)
{
// Append to existing entry; the timer for this root is already running.
columns.push(queued_data_column);
} else {
// First column for this root: start the timeout (slot duration multiplied by
// `QUEUED_DATA_COLUMN_DELAY_SLOTS`) and keep its key so it can be cancelled later.
let delay_key = self.data_columns_delay_queue.insert(
block_root,
self.slot_clock.slot_duration() * QUEUED_DATA_COLUMN_DELAY_SLOTS,
);
self.awaiting_data_columns_per_root
.insert(block_root, (vec![queued_data_column], delay_key));
}
// Both branches above queued exactly one column.
self.queued_data_columns_count += 1;
}
InboundEvent::Msg(UnknownLightClientOptimisticUpdate(
queued_light_client_optimistic_update,
)) => {
@@ -800,6 +861,25 @@ impl<S: SlotClock> ReprocessQueue<S> {
);
}
}
// Unqueue the data columns we have for this root, if any.
if let Some((data_columns, delay_key)) =
self.awaiting_data_columns_per_root.remove(&block_root)
{
self.data_columns_delay_queue.remove(&delay_key);
self.queued_data_columns_count = self
.queued_data_columns_count
.saturating_sub(data_columns.len());
for data_column in data_columns {
if self
.ready_work_tx
.try_send(ReadyWork::DataColumn(data_column))
.is_err()
{
error!(?block_root, "Failed to send data column for reprocessing");
}
}
}
}
InboundEvent::Msg(NewLightClientOptimisticUpdate { parent_root }) => {
// Unqueue the light client optimistic updates we have for this root, if any.
@@ -1053,6 +1133,27 @@ impl<S: SlotClock> ReprocessQueue<S> {
);
}
}
// The timeout for `block_root` expired: release every column queued under that root for
// processing.
InboundEvent::ReadyDataColumn(block_root) => {
// The stored delay key is discarded: this event comes from `poll_expired`, so there is no
// pending timer left to cancel for this root.
if let Some((data_columns, _)) =
self.awaiting_data_columns_per_root.remove(&block_root)
{
// Keep the global count in step with the map; `saturating_sub` guards against underflow.
self.queued_data_columns_count = self
.queued_data_columns_count
.saturating_sub(data_columns.len());
for data_column in data_columns {
// `try_send` does not wait; a column that cannot be sent is logged and dropped.
if self
.ready_work_tx
.try_send(ReadyWork::DataColumn(data_column))
.is_err()
{
error!(
hint = "system may be overloaded",
"Ignored expired gossip data column"
);
}
}
}
}
}
metrics::set_gauge_vec(
@@ -1581,48 +1682,87 @@ mod tests {
assert_eq!(queue.envelope_delay_queue.len(), 1);
}
/// Tests that a queued gossip data column is released when its block is imported.
#[tokio::test]
async fn envelope_capacity_evicts_oldest() {
async fn data_column_released_on_block_imported() {
create_test_tracing_subscriber();
let config = BeaconProcessorConfig::default();
let (ready_work_tx, mut ready_work_rx) =
mpsc::channel::<ReadyWork>(config.max_scheduled_work_queue_len);
let (_, reprocess_work_rx) =
mpsc::channel::<ReprocessQueueMessage>(config.max_scheduled_work_queue_len);
let slot_clock = Arc::new(testing_slot_clock(12));
let mut queue = ReprocessQueue::new(ready_work_tx, reprocess_work_rx, slot_clock);
tokio::time::pause();
let beacon_block_root = Hash256::repeat_byte(0xbb);
let msg = ReprocessQueueMessage::UnknownBlockDataColumn(QueuedGossipDataColumn {
beacon_block_root,
process_fn: Box::new(|| {}),
});
queue.handle_message(InboundEvent::Msg(msg));
assert_eq!(queue.awaiting_data_columns_per_root.len(), 1);
assert!(
queue
.awaiting_data_columns_per_root
.contains_key(&beacon_block_root)
);
assert_eq!(queue.data_columns_delay_queue.len(), 1);
// Simulate block import.
queue.handle_message(InboundEvent::Msg(ReprocessQueueMessage::BlockImported {
block_root: beacon_block_root,
parent_root: Hash256::repeat_byte(0x00),
}));
// Internal state should be cleaned up.
assert!(queue.awaiting_data_columns_per_root.is_empty());
assert_eq!(queue.data_columns_delay_queue.len(), 0);
// The column should have been sent to the ready_work channel.
let ready = ready_work_rx.try_recv().expect("column should be ready");
assert!(matches!(ready, ReadyWork::DataColumn(_)));
}
/// Tests that an expired gossip data column is pruned cleanly from all internal state.
#[tokio::test]
async fn prune_awaiting_data_columns_per_root() {
create_test_tracing_subscriber();
let mut queue = test_queue();
// Pause time so it only advances manually
tokio::time::pause();
// Fill the queue to capacity.
for i in 0..MAXIMUM_QUEUED_ENVELOPES {
let block_root = Hash256::repeat_byte(i as u8);
let msg = ReprocessQueueMessage::UnknownBlockForEnvelope(QueuedGossipEnvelope {
beacon_block_slot: Slot::new(1),
beacon_block_root: block_root,
process_fn: Box::pin(async {}),
});
queue.handle_message(InboundEvent::Msg(msg));
}
assert_eq!(
queue.awaiting_envelopes_per_root.len(),
MAXIMUM_QUEUED_ENVELOPES
);
let beacon_block_root = Hash256::repeat_byte(0xcd);
// One more should evict the oldest and insert the new one.
let overflow_root = Hash256::repeat_byte(0xff);
let msg = ReprocessQueueMessage::UnknownBlockForEnvelope(QueuedGossipEnvelope {
beacon_block_slot: Slot::new(1),
beacon_block_root: overflow_root,
process_fn: Box::pin(async {}),
let msg = ReprocessQueueMessage::UnknownBlockDataColumn(QueuedGossipDataColumn {
beacon_block_root,
process_fn: Box::new(|| {}),
});
queue.handle_message(InboundEvent::Msg(msg));
// Queue should still be at capacity, with the new root present.
assert_eq!(
queue.awaiting_envelopes_per_root.len(),
MAXIMUM_QUEUED_ENVELOPES
);
assert_eq!(queue.awaiting_data_columns_per_root.len(), 1);
assert!(
queue
.awaiting_envelopes_per_root
.contains_key(&overflow_root)
.awaiting_data_columns_per_root
.contains_key(&beacon_block_root)
);
// Advance time past the delay so the entry expires.
advance_time(
&queue.slot_clock,
2 * queue.slot_clock.slot_duration() * QUEUED_DATA_COLUMN_DELAY_SLOTS,
)
.await;
let ready_msg = queue.next().await.unwrap();
assert!(matches!(ready_msg, InboundEvent::ReadyDataColumn(_)));
queue.handle_message(ready_msg);
// All internal state should be cleaned up.
assert!(queue.awaiting_data_columns_per_root.is_empty());
}
}