add reprocess queue

This commit is contained in:
Eitan Seri- Levi
2026-03-02 13:02:56 -08:00
parent a937802dc3
commit 73caa1d1b1
4 changed files with 166 additions and 5 deletions

View File

@@ -41,7 +41,8 @@
pub use crate::scheduler::BeaconProcessorQueueLengths;
use crate::scheduler::work_queue::WorkQueues;
use crate::work_reprocessing_queue::{
QueuedBackfillBatch, QueuedColumnReconstruction, QueuedGossipBlock, ReprocessQueueMessage,
QueuedBackfillBatch, QueuedColumnReconstruction, QueuedGossipBlock, QueuedGossipEnvelope,
ReprocessQueueMessage,
};
use futures::stream::{Stream, StreamExt};
use futures::task::Poll;
@@ -242,6 +243,18 @@ impl<E: EthSpec> From<ReadyWork> for WorkEvent<E> {
process_fn,
},
},
ReadyWork::Envelope(QueuedGossipEnvelope {
beacon_block_slot,
beacon_block_root,
process_fn,
}) => Self {
drop_during_sync: false,
work: Work::DelayedImportEnvelope {
beacon_block_slot,
beacon_block_root,
process_fn,
},
},
ReadyWork::RpcBlock(QueuedRpcBlock {
beacon_block_root,
process_fn,
@@ -384,6 +397,11 @@ pub enum Work<E: EthSpec> {
beacon_block_root: Hash256,
process_fn: AsyncFn,
},
DelayedImportEnvelope {
beacon_block_slot: Slot,
beacon_block_root: Hash256,
process_fn: AsyncFn,
},
GossipVoluntaryExit(BlockingFn),
GossipProposerSlashing(BlockingFn),
GossipAttesterSlashing(BlockingFn),
@@ -447,6 +465,7 @@ pub enum WorkType {
GossipBlobSidecar,
GossipDataColumnSidecar,
DelayedImportBlock,
DelayedImportEnvelope,
GossipVoluntaryExit,
GossipProposerSlashing,
GossipAttesterSlashing,
@@ -498,6 +517,7 @@ impl<E: EthSpec> Work<E> {
Work::GossipBlobSidecar(_) => WorkType::GossipBlobSidecar,
Work::GossipDataColumnSidecar(_) => WorkType::GossipDataColumnSidecar,
Work::DelayedImportBlock { .. } => WorkType::DelayedImportBlock,
Work::DelayedImportEnvelope { .. } => WorkType::DelayedImportEnvelope,
Work::GossipVoluntaryExit(_) => WorkType::GossipVoluntaryExit,
Work::GossipProposerSlashing(_) => WorkType::GossipProposerSlashing,
Work::GossipAttesterSlashing(_) => WorkType::GossipAttesterSlashing,
@@ -793,6 +813,8 @@ impl<E: EthSpec> BeaconProcessor<E> {
// on the delayed ones.
} else if let Some(item) = work_queues.delayed_block_queue.pop() {
Some(item)
} else if let Some(item) = work_queues.delayed_envelope_queue.pop() {
Some(item)
// Check gossip blocks and payloads before gossip attestations, since a block might be
// required to verify some attestations.
} else if let Some(item) = work_queues.gossip_block_queue.pop() {
@@ -1111,6 +1133,9 @@ impl<E: EthSpec> BeaconProcessor<E> {
Work::DelayedImportBlock { .. } => {
work_queues.delayed_block_queue.push(work, work_id)
}
Work::DelayedImportEnvelope { .. } => {
work_queues.delayed_envelope_queue.push(work, work_id)
}
Work::GossipVoluntaryExit { .. } => {
work_queues.gossip_voluntary_exit_queue.push(work, work_id)
}
@@ -1238,6 +1263,7 @@ impl<E: EthSpec> BeaconProcessor<E> {
work_queues.gossip_data_column_queue.len()
}
WorkType::DelayedImportBlock => work_queues.delayed_block_queue.len(),
WorkType::DelayedImportEnvelope => work_queues.delayed_envelope_queue.len(),
WorkType::GossipVoluntaryExit => {
work_queues.gossip_voluntary_exit_queue.len()
}
@@ -1435,6 +1461,11 @@ impl<E: EthSpec> BeaconProcessor<E> {
beacon_block_slot: _,
beacon_block_root: _,
process_fn,
}
| Work::DelayedImportEnvelope {
beacon_block_slot: _,
beacon_block_root: _,
process_fn,
} => task_spawner.spawn_async(process_fn),
Work::RpcBlock {
process_fn,

View File

@@ -127,6 +127,7 @@ pub struct BeaconProcessorQueueLengths {
gossip_blob_queue: usize,
gossip_data_column_queue: usize,
delayed_block_queue: usize,
delayed_envelope_queue: usize,
status_queue: usize,
block_brange_queue: usize,
block_broots_queue: usize,
@@ -197,6 +198,7 @@ impl BeaconProcessorQueueLengths {
gossip_blob_queue: 1024,
gossip_data_column_queue: 1024,
delayed_block_queue: 1024,
delayed_envelope_queue: 1024,
status_queue: 1024,
block_brange_queue: 1024,
block_broots_queue: 1024,
@@ -250,6 +252,7 @@ pub struct WorkQueues<E: EthSpec> {
pub gossip_blob_queue: FifoQueue<Work<E>>,
pub gossip_data_column_queue: FifoQueue<Work<E>>,
pub delayed_block_queue: FifoQueue<Work<E>>,
pub delayed_envelope_queue: FifoQueue<Work<E>>,
pub status_queue: FifoQueue<Work<E>>,
pub block_brange_queue: FifoQueue<Work<E>>,
pub block_broots_queue: FifoQueue<Work<E>>,
@@ -315,6 +318,7 @@ impl<E: EthSpec> WorkQueues<E> {
let gossip_blob_queue = FifoQueue::new(queue_lengths.gossip_blob_queue);
let gossip_data_column_queue = FifoQueue::new(queue_lengths.gossip_data_column_queue);
let delayed_block_queue = FifoQueue::new(queue_lengths.delayed_block_queue);
let delayed_envelope_queue = FifoQueue::new(queue_lengths.delayed_envelope_queue);
let status_queue = FifoQueue::new(queue_lengths.status_queue);
let block_brange_queue = FifoQueue::new(queue_lengths.block_brange_queue);
@@ -375,6 +379,7 @@ impl<E: EthSpec> WorkQueues<E> {
gossip_blob_queue,
gossip_data_column_queue,
delayed_block_queue,
delayed_envelope_queue,
status_queue,
block_brange_queue,
block_broots_queue,

View File

@@ -35,6 +35,7 @@ use types::{EthSpec, Hash256, Slot};
const TASK_NAME: &str = "beacon_processor_reprocess_queue";
const GOSSIP_BLOCKS: &str = "gossip_blocks";
const GOSSIP_ENVELOPES: &str = "gossip_envelopes";
const RPC_BLOCKS: &str = "rpc_blocks";
const ATTESTATIONS: &str = "attestations";
const ATTESTATIONS_PER_ROOT: &str = "attestations_per_root";
@@ -51,6 +52,10 @@ pub const QUEUED_ATTESTATION_DELAY: Duration = Duration::from_secs(12);
/// For how long to queue light client updates for re-processing.
pub const QUEUED_LIGHT_CLIENT_UPDATE_DELAY: Duration = Duration::from_secs(12);
/// Envelope timeout as a multiplier of slot duration. Envelopes waiting for their block will be
/// sent for processing after this many slots worth of time, even if the block hasn't arrived.
const QUEUED_ENVELOPE_DELAY_SLOTS: u32 = 1;
/// For how long to queue rpc blocks before sending them back for reprocessing.
pub const QUEUED_RPC_BLOCK_DELAY: Duration = Duration::from_secs(4);
@@ -65,6 +70,9 @@ pub const QUEUED_RECONSTRUCTION_DELAY: Duration = Duration::from_millis(150);
/// it's nice to have extra protection.
const MAXIMUM_QUEUED_BLOCKS: usize = 16;
/// Set an arbitrary upper-bound on the number of queued envelopes to avoid DoS attacks.
const MAXIMUM_QUEUED_ENVELOPES: usize = 16;
/// How many attestations we keep before new ones get dropped.
const MAXIMUM_QUEUED_ATTESTATIONS: usize = 16_384;
@@ -93,6 +101,8 @@ pub const RECONSTRUCTION_DEADLINE: (u64, u64) = (1, 4);
pub enum ReprocessQueueMessage {
/// A block that has been received early and we should queue for later processing.
EarlyBlock(QueuedGossipBlock),
/// An execution payload envelope that references a block not yet in fork choice.
UnknownBlockEnvelope(QueuedGossipEnvelope),
/// A gossip block for hash `X` is being imported, we should queue the rpc block for the same
/// hash until the gossip block is imported.
RpcBlock(QueuedRpcBlock),
@@ -120,6 +130,7 @@ pub enum ReprocessQueueMessage {
/// Events sent by the scheduler once they are ready for re-processing.
pub enum ReadyWork {
Block(QueuedGossipBlock),
Envelope(QueuedGossipEnvelope),
RpcBlock(QueuedRpcBlock),
IgnoredRpcBlock(IgnoredRpcBlock),
Unaggregate(QueuedUnaggregate),
@@ -157,6 +168,13 @@ pub struct QueuedGossipBlock {
pub process_fn: AsyncFn,
}
/// An execution payload envelope that arrived early and has been queued for later import.
pub struct QueuedGossipEnvelope {
pub beacon_block_slot: Slot,
pub beacon_block_root: Hash256,
pub process_fn: AsyncFn,
}
/// A block that arrived for processing when the same block was being imported over gossip.
/// It is queued for later import.
pub struct QueuedRpcBlock {
@@ -209,6 +227,8 @@ impl<E: EthSpec> From<QueuedBackfillBatch> for WorkEvent<E> {
enum InboundEvent {
/// A gossip block that was queued for later processing and is ready for import.
ReadyGossipBlock(QueuedGossipBlock),
/// An envelope whose block has been imported and is now ready for processing.
ReadyEnvelope(Hash256),
/// A rpc block that was queued because the same gossip block was being imported
/// will now be retried for import.
ReadyRpcBlock(QueuedRpcBlock),
@@ -234,6 +254,8 @@ struct ReprocessQueue<S> {
/* Queues */
/// Queue to manage scheduled early blocks.
gossip_block_delay_queue: DelayQueue<QueuedGossipBlock>,
/// Queue to manage envelope timeouts (keyed by block root).
envelope_delay_queue: DelayQueue<Hash256>,
/// Queue to manage scheduled early blocks.
rpc_block_delay_queue: DelayQueue<QueuedRpcBlock>,
/// Queue to manage scheduled attestations.
@@ -246,6 +268,8 @@ struct ReprocessQueue<S> {
/* Queued items */
/// Queued blocks.
queued_gossip_block_roots: HashSet<Hash256>,
/// Queued envelopes awaiting their block, keyed by block root.
awaiting_envelopes_per_root: HashMap<Hash256, (QueuedGossipEnvelope, DelayKey)>,
/// Queued aggregated attestations.
queued_aggregates: FnvHashMap<usize, (QueuedAggregate, DelayKey)>,
/// Queued attestations.
@@ -266,6 +290,7 @@ struct ReprocessQueue<S> {
next_attestation: usize,
next_lc_update: usize,
early_block_debounce: TimeLatch,
envelope_delay_debounce: TimeLatch,
rpc_block_debounce: TimeLatch,
attestation_delay_debounce: TimeLatch,
lc_update_delay_debounce: TimeLatch,
@@ -315,6 +340,13 @@ impl<S: SlotClock> Stream for ReprocessQueue<S> {
Poll::Ready(None) | Poll::Pending => (),
}
match self.envelope_delay_queue.poll_expired(cx) {
Poll::Ready(Some(block_root)) => {
return Poll::Ready(Some(InboundEvent::ReadyEnvelope(block_root.into_inner())));
}
Poll::Ready(None) | Poll::Pending => (),
}
match self.rpc_block_delay_queue.poll_expired(cx) {
Poll::Ready(Some(queued_block)) => {
return Poll::Ready(Some(InboundEvent::ReadyRpcBlock(queued_block.into_inner())));
@@ -418,11 +450,13 @@ impl<S: SlotClock> ReprocessQueue<S> {
work_reprocessing_rx,
ready_work_tx,
gossip_block_delay_queue: DelayQueue::new(),
envelope_delay_queue: DelayQueue::new(),
rpc_block_delay_queue: DelayQueue::new(),
attestations_delay_queue: DelayQueue::new(),
lc_updates_delay_queue: DelayQueue::new(),
column_reconstructions_delay_queue: DelayQueue::new(),
queued_gossip_block_roots: HashSet::new(),
awaiting_envelopes_per_root: HashMap::new(),
queued_lc_updates: FnvHashMap::default(),
queued_aggregates: FnvHashMap::default(),
queued_unaggregates: FnvHashMap::default(),
@@ -433,6 +467,7 @@ impl<S: SlotClock> ReprocessQueue<S> {
next_attestation: 0,
next_lc_update: 0,
early_block_debounce: TimeLatch::default(),
envelope_delay_debounce: TimeLatch::default(),
rpc_block_debounce: TimeLatch::default(),
attestation_delay_debounce: TimeLatch::default(),
lc_update_delay_debounce: TimeLatch::default(),
@@ -498,6 +533,39 @@ impl<S: SlotClock> ReprocessQueue<S> {
}
}
}
// An envelope that references an unknown block. Queue it until the block is
// imported, or until the timeout expires.
InboundEvent::Msg(UnknownBlockEnvelope(queued_envelope)) => {
let block_root = queued_envelope.beacon_block_root;
// Don't add the same envelope to the queue twice. This prevents DoS attacks.
if self.awaiting_envelopes_per_root.contains_key(&block_root) {
return;
}
// Check to ensure this won't over-fill the queue.
if self.awaiting_envelopes_per_root.len() >= MAXIMUM_QUEUED_ENVELOPES {
if self.envelope_delay_debounce.elapsed() {
warn!(
queue_size = MAXIMUM_QUEUED_ENVELOPES,
msg = "system resources may be saturated",
"Envelope delay queue is full"
);
}
// Drop the envelope.
return;
}
// Register the timeout.
let delay_key = self.envelope_delay_queue.insert(
block_root,
self.slot_clock.slot_duration() * QUEUED_ENVELOPE_DELAY_SLOTS,
);
// Store the envelope keyed by block root.
self.awaiting_envelopes_per_root
.insert(block_root, (queued_envelope, delay_key));
}
// A rpc block arrived for processing at the same time when a gossip block
// for the same block hash is being imported. We wait for `QUEUED_RPC_BLOCK_DELAY`
// and then send the rpc block back for processing assuming the gossip import
@@ -647,6 +715,23 @@ impl<S: SlotClock> ReprocessQueue<S> {
block_root,
parent_root,
}) => {
// Unqueue the envelope we have for this root, if any.
if let Some((envelope, delay_key)) =
self.awaiting_envelopes_per_root.remove(&block_root)
{
self.envelope_delay_queue.remove(&delay_key);
if self
.ready_work_tx
.try_send(ReadyWork::Envelope(envelope))
.is_err()
{
error!(
?block_root,
"Failed to send envelope for reprocessing after block import"
);
}
}
// Unqueue the attestations we have for this root, if any.
if let Some(queued_ids) = self.awaiting_attestations_per_root.remove(&block_root) {
let mut sent_count = 0;
@@ -802,6 +887,21 @@ impl<S: SlotClock> ReprocessQueue<S> {
error!("Failed to pop queued block");
}
}
// An envelope's timeout has expired. Send it for processing regardless of
// whether the block has been imported.
InboundEvent::ReadyEnvelope(block_root) => {
if let Some((envelope, _delay_key)) =
self.awaiting_envelopes_per_root.remove(&block_root)
{
if self
.ready_work_tx
.try_send(ReadyWork::Envelope(envelope))
.is_err()
{
error!(?block_root, "Failed to send envelope after timeout");
}
}
}
InboundEvent::ReadyAttestation(queued_id) => {
metrics::inc_counter(
&metrics::BEACON_PROCESSOR_REPROCESSING_QUEUE_EXPIRED_ATTESTATIONS,
@@ -941,6 +1041,11 @@ impl<S: SlotClock> ReprocessQueue<S> {
&[GOSSIP_BLOCKS],
self.gossip_block_delay_queue.len() as i64,
);
metrics::set_gauge_vec(
&metrics::BEACON_PROCESSOR_REPROCESSING_QUEUE_TOTAL,
&[GOSSIP_ENVELOPES],
self.awaiting_envelopes_per_root.len() as i64,
);
metrics::set_gauge_vec(
&metrics::BEACON_PROCESSOR_REPROCESSING_QUEUE_TOTAL,
&[RPC_BLOCKS],

View File

@@ -49,8 +49,8 @@ use beacon_processor::work_reprocessing_queue::QueuedColumnReconstruction;
use beacon_processor::{
DuplicateCache, GossipAggregatePackage, GossipAttestationBatch,
work_reprocessing_queue::{
QueuedAggregate, QueuedGossipBlock, QueuedLightClientUpdate, QueuedUnaggregate,
ReprocessQueueMessage,
QueuedAggregate, QueuedGossipBlock, QueuedGossipEnvelope, QueuedLightClientUpdate,
QueuedUnaggregate, ReprocessQueueMessage,
},
};
@@ -3347,7 +3347,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
// TODO(gloas) update metrics to note how early the envelope arrived
let inner_self = self.clone();
let _process_fn = Box::pin(async move {
let process_fn = Box::pin(async move {
inner_self
.process_gossip_verified_execution_payload_envelope(
peer_id,
@@ -3356,7 +3356,27 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
.await;
});
// TODO(gloas) send to reprocess queue
if self
.beacon_processor_send
.try_send(WorkEvent {
drop_during_sync: false,
work: Work::Reprocess(ReprocessQueueMessage::UnknownBlockEnvelope(
QueuedGossipEnvelope {
beacon_block_slot: envelope_slot,
beacon_block_root,
process_fn,
},
)),
})
.is_err()
{
error!(
%envelope_slot,
?beacon_block_root,
location = "envelope gossip",
"Failed to defer envelope import"
);
}
None
}
Ok(_) => Some(verified_envelope),