resolve merge conflicts

This commit is contained in:
Eitan Seri-Levi
2026-04-30 01:51:26 +02:00
544 changed files with 48684 additions and 18351 deletions

File diff suppressed because it is too large Load Diff

View File

@@ -1 +1,4 @@
pub mod work_queue;
pub mod work_reprocessing_queue;
pub use work_queue::BeaconProcessorQueueLengths;

View File

@@ -0,0 +1,448 @@
use crate::Work;
use logging::TimeLatch;
use std::collections::VecDeque;
use tracing::error;
use types::{BeaconState, ChainSpec, EthSpec, RelativeEpoch};
/// Over-provision queues based on active validator count by some factor. The beacon chain has
/// strict churns that prevent the validator set size from changing rapidly. By over-provisioning
/// slightly, we don't need to adjust the queues during the lifetime of a process.
const ACTIVE_VALIDATOR_COUNT_OVERPROVISION_PERCENT: usize = 110;
/// Minimum size of dynamically sized queues. Due to integer division we don't want 0 length queues
/// as the processor won't process that message type. 128 is an arbitrary value value >= 1 that
/// seems reasonable.
const MIN_QUEUE_LEN: usize = 128;
/// A simple first-in-first-out queue with a maximum length.
pub struct FifoQueue<T> {
queue: VecDeque<T>,
max_length: usize,
}
impl<T> FifoQueue<T> {
/// Create a new, empty queue with the given length.
pub fn new(max_length: usize) -> Self {
Self {
queue: VecDeque::default(),
max_length,
}
}
/// Add a new item to the queue.
///
/// Drops `item` if the queue is full.
pub fn push(&mut self, item: T, item_desc: &str) {
if self.queue.len() == self.max_length {
error!(
queue = item_desc,
queue_len = self.max_length,
msg = "the system has insufficient resources for load",
"Work queue is full",
)
} else {
self.queue.push_back(item);
}
}
/// Remove the next item from the queue.
pub fn pop(&mut self) -> Option<T> {
self.queue.pop_front()
}
/// Returns the current length of the queue.
pub fn len(&self) -> usize {
self.queue.len()
}
pub fn is_empty(&self) -> bool {
self.queue.is_empty()
}
}
/// A simple last-in-first-out queue with a maximum length.
pub struct LifoQueue<T> {
queue: VecDeque<T>,
pub max_length: usize,
}
impl<T> LifoQueue<T> {
/// Create a new, empty queue with the given length.
pub fn new(max_length: usize) -> Self {
Self {
queue: VecDeque::default(),
max_length,
}
}
/// Add a new item to the front of the queue.
///
/// If the queue is full, the item at the back of the queue is dropped.
pub fn push(&mut self, item: T) {
if self.queue.len() == self.max_length {
self.queue.pop_back();
}
self.queue.push_front(item);
}
/// Remove the next item from the queue.
pub fn pop(&mut self) -> Option<T> {
self.queue.pop_front()
}
/// Returns `true` if the queue is full.
pub fn is_full(&self) -> bool {
self.queue.len() >= self.max_length
}
/// Returns the current length of the queue.
pub fn len(&self) -> usize {
self.queue.len()
}
pub fn is_empty(&self) -> bool {
self.queue.is_empty()
}
}
/// Maximum number of queued items that will be stored before dropping them
pub struct BeaconProcessorQueueLengths {
aggregate_queue: usize,
attestation_queue: usize,
unknown_block_aggregate_queue: usize,
unknown_block_attestation_queue: usize,
sync_message_queue: usize,
sync_contribution_queue: usize,
gossip_voluntary_exit_queue: usize,
gossip_proposer_slashing_queue: usize,
gossip_attester_slashing_queue: usize,
unknown_light_client_update_queue: usize,
rpc_block_queue: usize,
rpc_blob_queue: usize,
rpc_custody_column_queue: usize,
column_reconstruction_queue: usize,
chain_segment_queue: usize,
backfill_chain_segment: usize,
gossip_block_queue: usize,
gossip_blob_queue: usize,
gossip_data_column_queue: usize,
gossip_partial_data_column_queue: usize,
delayed_block_queue: usize,
delayed_envelope_queue: usize,
status_queue: usize,
block_brange_queue: usize,
block_broots_queue: usize,
blob_broots_queue: usize,
blob_brange_queue: usize,
dcbroots_queue: usize,
dcbrange_queue: usize,
payload_envelopes_brange_queue: usize,
payload_envelopes_broots_queue: usize,
gossip_bls_to_execution_change_queue: usize,
gossip_execution_payload_queue: usize,
gossip_execution_payload_bid_queue: usize,
gossip_payload_attestation_queue: usize,
gossip_proposer_preferences_queue: usize,
lc_bootstrap_queue: usize,
lc_rpc_optimistic_update_queue: usize,
lc_rpc_finality_update_queue: usize,
lc_gossip_finality_update_queue: usize,
lc_gossip_optimistic_update_queue: usize,
lc_update_range_queue: usize,
// TODO(focil) pick proper values
gossip_inclusion_list_queue: usize,
api_request_p0_queue: usize,
api_request_p1_queue: usize,
}
impl BeaconProcessorQueueLengths {
pub fn from_state<E: EthSpec>(
state: &BeaconState<E>,
spec: &ChainSpec,
) -> Result<Self, String> {
let active_validator_count =
match state.get_cached_active_validator_indices(RelativeEpoch::Current) {
Ok(indices) => indices.len(),
Err(_) => state
.get_active_validator_indices(state.current_epoch(), spec)
.map_err(|e| format!("Error computing active indices: {:?}", e))?
.len(),
};
let active_validator_count =
(ACTIVE_VALIDATOR_COUNT_OVERPROVISION_PERCENT * active_validator_count) / 100;
let slots_per_epoch = E::slots_per_epoch() as usize;
Ok(Self {
aggregate_queue: 4096,
unknown_block_aggregate_queue: 1024,
// Capacity for a full slot's worth of attestations if subscribed to all subnets
attestation_queue: std::cmp::max(
active_validator_count / slots_per_epoch,
MIN_QUEUE_LEN,
),
// Capacity for a full slot's worth of attestations if subscribed to all subnets
unknown_block_attestation_queue: std::cmp::max(
active_validator_count / slots_per_epoch,
MIN_QUEUE_LEN,
),
sync_message_queue: 2048,
sync_contribution_queue: 1024,
gossip_voluntary_exit_queue: 4096,
gossip_proposer_slashing_queue: 4096,
gossip_attester_slashing_queue: 4096,
unknown_light_client_update_queue: 128,
rpc_block_queue: 1024,
rpc_blob_queue: 1024,
// We don't request more than `PARENT_DEPTH_TOLERANCE` (32) lookups, so we can limit
// this queue size. With 48 max blobs per block, each column sidecar list could be up to 12MB.
rpc_custody_column_queue: 64,
column_reconstruction_queue: 1,
chain_segment_queue: 64,
backfill_chain_segment: 64,
gossip_block_queue: 1024,
gossip_blob_queue: 1024,
gossip_data_column_queue: 1024,
gossip_partial_data_column_queue: 1024,
delayed_block_queue: 1024,
delayed_envelope_queue: 1024,
status_queue: 1024,
block_brange_queue: 1024,
block_broots_queue: 1024,
blob_broots_queue: 1024,
blob_brange_queue: 1024,
dcbroots_queue: 1024,
dcbrange_queue: 1024,
payload_envelopes_brange_queue: 1024,
payload_envelopes_broots_queue: 1024,
gossip_bls_to_execution_change_queue: 16384,
// TODO(EIP-7732): verify 1024 is preferable. I used same value as `gossip_block_queue` and `gossip_blob_queue`
gossip_execution_payload_queue: 1024,
// TODO(EIP-7732) how big should this queue be?
gossip_execution_payload_bid_queue: 1024,
// PTC size ~512 per slot, buffer 2-3 slots for reorgs and processing delays (512 * 3 = 1536)
// TODO(EIP-7732): verify if this is preferable queue length or otherwise
gossip_payload_attestation_queue: 1536,
// TODO(EIP-7732): verify if this is preferable queue length
gossip_proposer_preferences_queue: 1024,
lc_gossip_finality_update_queue: 1024,
lc_gossip_optimistic_update_queue: 1024,
lc_bootstrap_queue: 1024,
lc_rpc_optimistic_update_queue: 512,
lc_rpc_finality_update_queue: 512,
lc_update_range_queue: 512,
gossip_inclusion_list_queue: 64,
api_request_p0_queue: 1024,
api_request_p1_queue: 1024,
})
}
}
pub struct WorkQueues<E: EthSpec> {
pub aggregate_queue: LifoQueue<Work<E>>,
pub aggregate_debounce: TimeLatch,
pub attestation_queue: LifoQueue<Work<E>>,
pub attestation_to_convert_queue: LifoQueue<Work<E>>,
pub attestation_debounce: TimeLatch,
pub unknown_block_aggregate_queue: LifoQueue<Work<E>>,
pub unknown_block_attestation_queue: LifoQueue<Work<E>>,
pub sync_message_queue: LifoQueue<Work<E>>,
pub sync_contribution_queue: LifoQueue<Work<E>>,
pub gossip_voluntary_exit_queue: FifoQueue<Work<E>>,
pub gossip_proposer_slashing_queue: FifoQueue<Work<E>>,
pub gossip_attester_slashing_queue: FifoQueue<Work<E>>,
pub unknown_light_client_update_queue: FifoQueue<Work<E>>,
pub rpc_block_queue: FifoQueue<Work<E>>,
pub rpc_blob_queue: FifoQueue<Work<E>>,
pub rpc_custody_column_queue: FifoQueue<Work<E>>,
pub column_reconstruction_queue: LifoQueue<Work<E>>,
pub chain_segment_queue: FifoQueue<Work<E>>,
pub backfill_chain_segment: FifoQueue<Work<E>>,
pub gossip_block_queue: FifoQueue<Work<E>>,
pub gossip_blob_queue: FifoQueue<Work<E>>,
pub gossip_data_column_queue: FifoQueue<Work<E>>,
pub gossip_partial_data_column_queue: FifoQueue<Work<E>>,
pub delayed_block_queue: FifoQueue<Work<E>>,
pub delayed_envelope_queue: FifoQueue<Work<E>>,
pub status_queue: FifoQueue<Work<E>>,
pub block_brange_queue: FifoQueue<Work<E>>,
pub block_broots_queue: FifoQueue<Work<E>>,
pub payload_envelopes_brange_queue: FifoQueue<Work<E>>,
pub payload_envelopes_broots_queue: FifoQueue<Work<E>>,
pub blob_broots_queue: FifoQueue<Work<E>>,
pub blob_brange_queue: FifoQueue<Work<E>>,
pub dcbroots_queue: FifoQueue<Work<E>>,
pub dcbrange_queue: FifoQueue<Work<E>>,
pub gossip_bls_to_execution_change_queue: FifoQueue<Work<E>>,
pub gossip_execution_payload_queue: FifoQueue<Work<E>>,
pub gossip_execution_payload_bid_queue: FifoQueue<Work<E>>,
pub gossip_payload_attestation_queue: FifoQueue<Work<E>>,
pub gossip_proposer_preferences_queue: FifoQueue<Work<E>>,
pub lc_gossip_finality_update_queue: FifoQueue<Work<E>>,
pub lc_gossip_optimistic_update_queue: FifoQueue<Work<E>>,
pub lc_bootstrap_queue: FifoQueue<Work<E>>,
pub lc_rpc_optimistic_update_queue: FifoQueue<Work<E>>,
pub lc_rpc_finality_update_queue: FifoQueue<Work<E>>,
pub lc_update_range_queue: FifoQueue<Work<E>>,
pub gossip_inclusion_list_queue: FifoQueue<Work<E>>,
pub api_request_p0_queue: FifoQueue<Work<E>>,
pub api_request_p1_queue: FifoQueue<Work<E>>,
}
impl<E: EthSpec> WorkQueues<E> {
pub fn new(queue_lengths: BeaconProcessorQueueLengths) -> Self {
// Using LIFO queues for attestations since validator profits rely upon getting fresh
// attestations into blocks. Additionally, later attestations contain more information than
// earlier ones, so we consider them more valuable.
let aggregate_queue = LifoQueue::new(queue_lengths.aggregate_queue);
let aggregate_debounce = TimeLatch::default();
let attestation_queue = LifoQueue::new(queue_lengths.attestation_queue);
let attestation_to_convert_queue = LifoQueue::new(queue_lengths.attestation_queue);
let attestation_debounce = TimeLatch::default();
let unknown_block_aggregate_queue =
LifoQueue::new(queue_lengths.unknown_block_aggregate_queue);
let unknown_block_attestation_queue =
LifoQueue::new(queue_lengths.unknown_block_attestation_queue);
let sync_message_queue = LifoQueue::new(queue_lengths.sync_message_queue);
let sync_contribution_queue = LifoQueue::new(queue_lengths.sync_contribution_queue);
// Using a FIFO queue for voluntary exits since it prevents exit censoring. I don't have
// a strong feeling about queue type for exits.
let gossip_voluntary_exit_queue = FifoQueue::new(queue_lengths.gossip_voluntary_exit_queue);
// Using a FIFO queue for slashing to prevent people from flushing their slashings from the
// queues with lots of junk messages.
let gossip_proposer_slashing_queue =
FifoQueue::new(queue_lengths.gossip_proposer_slashing_queue);
let gossip_attester_slashing_queue =
FifoQueue::new(queue_lengths.gossip_attester_slashing_queue);
// Using a FIFO queue for light client updates to maintain sequence order.
let unknown_light_client_update_queue =
FifoQueue::new(queue_lengths.unknown_light_client_update_queue);
// Using a FIFO queue since blocks need to be imported sequentially.
let rpc_block_queue = FifoQueue::new(queue_lengths.rpc_block_queue);
let rpc_blob_queue = FifoQueue::new(queue_lengths.rpc_blob_queue);
let rpc_custody_column_queue = FifoQueue::new(queue_lengths.rpc_custody_column_queue);
let column_reconstruction_queue = LifoQueue::new(queue_lengths.column_reconstruction_queue);
let chain_segment_queue = FifoQueue::new(queue_lengths.chain_segment_queue);
let backfill_chain_segment = FifoQueue::new(queue_lengths.backfill_chain_segment);
let gossip_block_queue = FifoQueue::new(queue_lengths.gossip_block_queue);
let gossip_blob_queue = FifoQueue::new(queue_lengths.gossip_blob_queue);
let gossip_data_column_queue = FifoQueue::new(queue_lengths.gossip_data_column_queue);
let gossip_partial_data_column_queue =
FifoQueue::new(queue_lengths.gossip_partial_data_column_queue);
let delayed_block_queue = FifoQueue::new(queue_lengths.delayed_block_queue);
let delayed_envelope_queue = FifoQueue::new(queue_lengths.delayed_envelope_queue);
let status_queue = FifoQueue::new(queue_lengths.status_queue);
let block_brange_queue = FifoQueue::new(queue_lengths.block_brange_queue);
let block_broots_queue = FifoQueue::new(queue_lengths.block_broots_queue);
let blob_broots_queue = FifoQueue::new(queue_lengths.blob_broots_queue);
let blob_brange_queue = FifoQueue::new(queue_lengths.blob_brange_queue);
let dcbroots_queue = FifoQueue::new(queue_lengths.dcbroots_queue);
let dcbrange_queue = FifoQueue::new(queue_lengths.dcbrange_queue);
let payload_envelopes_brange_queue =
FifoQueue::new(queue_lengths.payload_envelopes_brange_queue);
let payload_envelopes_broots_queue =
FifoQueue::new(queue_lengths.payload_envelopes_broots_queue);
let gossip_bls_to_execution_change_queue =
FifoQueue::new(queue_lengths.gossip_bls_to_execution_change_queue);
let gossip_execution_payload_queue =
FifoQueue::new(queue_lengths.gossip_execution_payload_queue);
let gossip_execution_payload_bid_queue =
FifoQueue::new(queue_lengths.gossip_execution_payload_bid_queue);
let gossip_payload_attestation_queue =
FifoQueue::new(queue_lengths.gossip_payload_attestation_queue);
let gossip_proposer_preferences_queue =
FifoQueue::new(queue_lengths.gossip_proposer_preferences_queue);
let lc_gossip_optimistic_update_queue =
FifoQueue::new(queue_lengths.lc_gossip_optimistic_update_queue);
let lc_gossip_finality_update_queue =
FifoQueue::new(queue_lengths.lc_gossip_finality_update_queue);
let lc_bootstrap_queue = FifoQueue::new(queue_lengths.lc_bootstrap_queue);
let lc_rpc_optimistic_update_queue =
FifoQueue::new(queue_lengths.lc_rpc_optimistic_update_queue);
let lc_rpc_finality_update_queue =
FifoQueue::new(queue_lengths.lc_rpc_finality_update_queue);
let lc_update_range_queue: FifoQueue<Work<E>> =
FifoQueue::new(queue_lengths.lc_update_range_queue);
let gossip_inclusion_list_queue =
FifoQueue::new(queue_lengths.gossip_inclusion_list_queue);
let api_request_p0_queue = FifoQueue::new(queue_lengths.api_request_p0_queue);
let api_request_p1_queue = FifoQueue::new(queue_lengths.api_request_p1_queue);
WorkQueues {
aggregate_queue,
aggregate_debounce,
attestation_queue,
attestation_to_convert_queue,
attestation_debounce,
unknown_block_aggregate_queue,
unknown_block_attestation_queue,
sync_message_queue,
sync_contribution_queue,
gossip_voluntary_exit_queue,
gossip_proposer_slashing_queue,
gossip_attester_slashing_queue,
unknown_light_client_update_queue,
rpc_block_queue,
rpc_blob_queue,
rpc_custody_column_queue,
chain_segment_queue,
column_reconstruction_queue,
backfill_chain_segment,
gossip_block_queue,
gossip_blob_queue,
gossip_data_column_queue,
gossip_partial_data_column_queue,
delayed_block_queue,
delayed_envelope_queue,
status_queue,
block_brange_queue,
block_broots_queue,
blob_broots_queue,
blob_brange_queue,
dcbroots_queue,
dcbrange_queue,
payload_envelopes_brange_queue,
payload_envelopes_broots_queue,
gossip_bls_to_execution_change_queue,
gossip_execution_payload_queue,
gossip_execution_payload_bid_queue,
gossip_payload_attestation_queue,
gossip_proposer_preferences_queue,
lc_gossip_optimistic_update_queue,
lc_gossip_finality_update_queue,
lc_bootstrap_queue,
lc_rpc_optimistic_update_queue,
lc_rpc_finality_update_queue,
lc_update_range_queue,
gossip_inclusion_list_queue,
api_request_p0_queue,
api_request_p1_queue,
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use types::{BeaconState, ChainSpec, Eth1Data, ForkName, MainnetEthSpec};
#[test]
fn min_queue_len() {
// State with no validators.
let spec = ForkName::latest().make_genesis_spec(ChainSpec::mainnet());
let genesis_time = 0;
let state = BeaconState::<MainnetEthSpec>::new(genesis_time, Eth1Data::default(), &spec);
assert_eq!(state.validators().len(), 0);
let queue_lengths = BeaconProcessorQueueLengths::from_state(&state, &spec).unwrap();
assert_eq!(queue_lengths.attestation_queue, MIN_QUEUE_LEN);
assert_eq!(queue_lengths.unknown_block_attestation_queue, MIN_QUEUE_LEN);
}
}

View File

@@ -35,6 +35,7 @@ use types::{EthSpec, Hash256, Slot};
const TASK_NAME: &str = "beacon_processor_reprocess_queue";
const GOSSIP_BLOCKS: &str = "gossip_blocks";
const GOSSIP_ENVELOPES: &str = "gossip_envelopes";
const RPC_BLOCKS: &str = "rpc_blocks";
const ATTESTATIONS: &str = "attestations";
const ATTESTATIONS_PER_ROOT: &str = "attestations_per_root";
@@ -51,6 +52,10 @@ pub const QUEUED_ATTESTATION_DELAY: Duration = Duration::from_secs(12);
/// For how long to queue light client updates for re-processing.
pub const QUEUED_LIGHT_CLIENT_UPDATE_DELAY: Duration = Duration::from_secs(12);
/// Envelope timeout as a multiplier of slot duration. Envelopes waiting for their block will be
/// sent for processing after this many slots worth of time, even if the block hasn't arrived.
const QUEUED_ENVELOPE_DELAY_SLOTS: u32 = 1;
/// For how long to queue rpc blocks before sending them back for reprocessing.
pub const QUEUED_RPC_BLOCK_DELAY: Duration = Duration::from_secs(4);
@@ -65,6 +70,9 @@ pub const QUEUED_RECONSTRUCTION_DELAY: Duration = Duration::from_millis(150);
/// it's nice to have extra protection.
const MAXIMUM_QUEUED_BLOCKS: usize = 16;
/// Set an arbitrary upper-bound on the number of queued envelopes to avoid DoS attacks.
const MAXIMUM_QUEUED_ENVELOPES: usize = 16;
/// How many attestations we keep before new ones get dropped.
const MAXIMUM_QUEUED_ATTESTATIONS: usize = 16_384;
@@ -93,6 +101,8 @@ pub const RECONSTRUCTION_DEADLINE: (u64, u64) = (1, 4);
pub enum ReprocessQueueMessage {
/// A block that has been received early and we should queue for later processing.
EarlyBlock(QueuedGossipBlock),
/// An execution payload envelope that references a block not yet in fork choice.
UnknownBlockForEnvelope(QueuedGossipEnvelope),
/// A gossip block for hash `X` is being imported, we should queue the rpc block for the same
/// hash until the gossip block is imported.
RpcBlock(QueuedRpcBlock),
@@ -120,6 +130,7 @@ pub enum ReprocessQueueMessage {
/// Events sent by the scheduler once they are ready for re-processing.
pub enum ReadyWork {
Block(QueuedGossipBlock),
Envelope(QueuedGossipEnvelope),
RpcBlock(QueuedRpcBlock),
IgnoredRpcBlock(IgnoredRpcBlock),
Unaggregate(QueuedUnaggregate),
@@ -157,6 +168,13 @@ pub struct QueuedGossipBlock {
pub process_fn: AsyncFn,
}
/// An execution payload envelope that arrived early and has been queued for later import.
pub struct QueuedGossipEnvelope {
pub beacon_block_slot: Slot,
pub beacon_block_root: Hash256,
pub process_fn: AsyncFn,
}
/// A block that arrived for processing when the same block was being imported over gossip.
/// It is queued for later import.
pub struct QueuedRpcBlock {
@@ -209,6 +227,8 @@ impl<E: EthSpec> From<QueuedBackfillBatch> for WorkEvent<E> {
enum InboundEvent {
/// A gossip block that was queued for later processing and is ready for import.
ReadyGossipBlock(QueuedGossipBlock),
/// An envelope whose block has been imported and is now ready for processing.
ReadyEnvelope(Hash256),
/// A rpc block that was queued because the same gossip block was being imported
/// will now be retried for import.
ReadyRpcBlock(QueuedRpcBlock),
@@ -234,6 +254,8 @@ struct ReprocessQueue<S> {
/* Queues */
/// Queue to manage scheduled early blocks.
gossip_block_delay_queue: DelayQueue<QueuedGossipBlock>,
/// Queue to manage envelope timeouts (keyed by block root).
envelope_delay_queue: DelayQueue<Hash256>,
/// Queue to manage scheduled early blocks.
rpc_block_delay_queue: DelayQueue<QueuedRpcBlock>,
/// Queue to manage scheduled attestations.
@@ -246,6 +268,8 @@ struct ReprocessQueue<S> {
/* Queued items */
/// Queued blocks.
queued_gossip_block_roots: HashSet<Hash256>,
/// Queued envelopes awaiting their block, keyed by block root.
awaiting_envelopes_per_root: HashMap<Hash256, (QueuedGossipEnvelope, DelayKey)>,
/// Queued aggregated attestations.
queued_aggregates: FnvHashMap<usize, (QueuedAggregate, DelayKey)>,
/// Queued attestations.
@@ -266,6 +290,7 @@ struct ReprocessQueue<S> {
next_attestation: usize,
next_lc_update: usize,
early_block_debounce: TimeLatch,
envelope_delay_debounce: TimeLatch,
rpc_block_debounce: TimeLatch,
attestation_delay_debounce: TimeLatch,
lc_update_delay_debounce: TimeLatch,
@@ -315,6 +340,13 @@ impl<S: SlotClock> Stream for ReprocessQueue<S> {
Poll::Ready(None) | Poll::Pending => (),
}
match self.envelope_delay_queue.poll_expired(cx) {
Poll::Ready(Some(block_root)) => {
return Poll::Ready(Some(InboundEvent::ReadyEnvelope(block_root.into_inner())));
}
Poll::Ready(None) | Poll::Pending => (),
}
match self.rpc_block_delay_queue.poll_expired(cx) {
Poll::Ready(Some(queued_block)) => {
return Poll::Ready(Some(InboundEvent::ReadyRpcBlock(queued_block.into_inner())));
@@ -418,11 +450,13 @@ impl<S: SlotClock> ReprocessQueue<S> {
work_reprocessing_rx,
ready_work_tx,
gossip_block_delay_queue: DelayQueue::new(),
envelope_delay_queue: DelayQueue::new(),
rpc_block_delay_queue: DelayQueue::new(),
attestations_delay_queue: DelayQueue::new(),
lc_updates_delay_queue: DelayQueue::new(),
column_reconstructions_delay_queue: DelayQueue::new(),
queued_gossip_block_roots: HashSet::new(),
awaiting_envelopes_per_root: HashMap::new(),
queued_lc_updates: FnvHashMap::default(),
queued_aggregates: FnvHashMap::default(),
queued_unaggregates: FnvHashMap::default(),
@@ -433,6 +467,7 @@ impl<S: SlotClock> ReprocessQueue<S> {
next_attestation: 0,
next_lc_update: 0,
early_block_debounce: TimeLatch::default(),
envelope_delay_debounce: TimeLatch::default(),
rpc_block_debounce: TimeLatch::default(),
attestation_delay_debounce: TimeLatch::default(),
lc_update_delay_debounce: TimeLatch::default(),
@@ -498,6 +533,52 @@ impl<S: SlotClock> ReprocessQueue<S> {
}
}
}
// An envelope that references an unknown block. Queue it until the block is
// imported, or until the timeout expires.
InboundEvent::Msg(UnknownBlockForEnvelope(queued_envelope)) => {
let block_root = queued_envelope.beacon_block_root;
// TODO(gloas): Perform lightweight pre-validation before queuing
// (e.g. verify builder signature) to prevent unsigned garbage from
// consuming queue slots.
// Don't add the same envelope to the queue twice. This prevents DoS attacks.
if self.awaiting_envelopes_per_root.contains_key(&block_root) {
trace!(
?block_root,
"Duplicate envelope for same block root, dropping"
);
return;
}
// When the queue is full, evict the oldest entry to make room for newer envelopes.
if self.awaiting_envelopes_per_root.len() >= MAXIMUM_QUEUED_ENVELOPES {
if self.envelope_delay_debounce.elapsed() {
warn!(
queue_size = MAXIMUM_QUEUED_ENVELOPES,
msg = "system resources may be saturated",
"Envelope delay queue is full, evicting oldest entry"
);
}
if let Some(oldest_root) =
self.awaiting_envelopes_per_root.keys().next().copied()
&& let Some((_envelope, delay_key)) =
self.awaiting_envelopes_per_root.remove(&oldest_root)
{
self.envelope_delay_queue.remove(&delay_key);
}
}
// Register the timeout.
let delay_key = self.envelope_delay_queue.insert(
block_root,
self.slot_clock.slot_duration() * QUEUED_ENVELOPE_DELAY_SLOTS,
);
// Store the envelope keyed by block root.
self.awaiting_envelopes_per_root
.insert(block_root, (queued_envelope, delay_key));
}
// A rpc block arrived for processing at the same time when a gossip block
// for the same block hash is being imported. We wait for `QUEUED_RPC_BLOCK_DELAY`
// and then send the rpc block back for processing assuming the gossip import
@@ -647,6 +728,23 @@ impl<S: SlotClock> ReprocessQueue<S> {
block_root,
parent_root,
}) => {
// Unqueue the envelope we have for this root, if any.
if let Some((envelope, delay_key)) =
self.awaiting_envelopes_per_root.remove(&block_root)
{
self.envelope_delay_queue.remove(&delay_key);
if self
.ready_work_tx
.try_send(ReadyWork::Envelope(envelope))
.is_err()
{
error!(
?block_root,
"Failed to send envelope for reprocessing after block import"
);
}
}
// Unqueue the attestations we have for this root, if any.
if let Some(queued_ids) = self.awaiting_attestations_per_root.remove(&block_root) {
let mut sent_count = 0;
@@ -802,6 +900,25 @@ impl<S: SlotClock> ReprocessQueue<S> {
error!("Failed to pop queued block");
}
}
// An envelope's timeout has expired. Send it for processing regardless of
// whether the block has been imported.
InboundEvent::ReadyEnvelope(block_root) => {
if let Some((envelope, _delay_key)) =
self.awaiting_envelopes_per_root.remove(&block_root)
{
debug!(
?block_root,
"Envelope timed out waiting for block, sending for processing"
);
if self
.ready_work_tx
.try_send(ReadyWork::Envelope(envelope))
.is_err()
{
error!(?block_root, "Failed to send envelope after timeout");
}
}
}
InboundEvent::ReadyAttestation(queued_id) => {
metrics::inc_counter(
&metrics::BEACON_PROCESSOR_REPROCESSING_QUEUE_EXPIRED_ATTESTATIONS,
@@ -941,6 +1058,11 @@ impl<S: SlotClock> ReprocessQueue<S> {
&[GOSSIP_BLOCKS],
self.gossip_block_delay_queue.len() as i64,
);
metrics::set_gauge_vec(
&metrics::BEACON_PROCESSOR_REPROCESSING_QUEUE_TOTAL,
&[GOSSIP_ENVELOPES],
self.awaiting_envelopes_per_root.len() as i64,
);
metrics::set_gauge_vec(
&metrics::BEACON_PROCESSOR_REPROCESSING_QUEUE_TOTAL,
&[RPC_BLOCKS],
@@ -1339,4 +1461,163 @@ mod tests {
assert_eq!(reconstruction.block_root, block_root);
}
}
// Test that envelopes are properly cleaned up from `awaiting_envelopes_per_root` on timeout.
#[tokio::test]
async fn prune_awaiting_envelopes_per_root() {
create_test_tracing_subscriber();
let mut queue = test_queue();
// Pause time so it only advances manually
tokio::time::pause();
let beacon_block_root = Hash256::repeat_byte(0xaf);
// Insert an envelope.
let msg = ReprocessQueueMessage::UnknownBlockForEnvelope(QueuedGossipEnvelope {
beacon_block_slot: Slot::new(1),
beacon_block_root,
process_fn: Box::pin(async {}),
});
// Process the event to enter it into the delay queue.
queue.handle_message(InboundEvent::Msg(msg));
// Check that it is queued.
assert_eq!(queue.awaiting_envelopes_per_root.len(), 1);
assert!(
queue
.awaiting_envelopes_per_root
.contains_key(&beacon_block_root)
);
// Advance time to expire the envelope.
advance_time(
&queue.slot_clock,
queue.slot_clock.slot_duration() * QUEUED_ENVELOPE_DELAY_SLOTS * 2,
)
.await;
let ready_msg = queue.next().await.unwrap();
assert!(matches!(ready_msg, InboundEvent::ReadyEnvelope(_)));
queue.handle_message(ready_msg);
// The entry for the block root should be gone.
assert!(queue.awaiting_envelopes_per_root.is_empty());
}
#[tokio::test]
async fn envelope_released_on_block_imported() {
create_test_tracing_subscriber();
let mut queue = test_queue();
// Pause time so it only advances manually
tokio::time::pause();
let beacon_block_root = Hash256::repeat_byte(0xaf);
let parent_root = Hash256::repeat_byte(0xab);
// Insert an envelope.
let msg = ReprocessQueueMessage::UnknownBlockForEnvelope(QueuedGossipEnvelope {
beacon_block_slot: Slot::new(1),
beacon_block_root,
process_fn: Box::pin(async {}),
});
// Process the event to enter it into the delay queue.
queue.handle_message(InboundEvent::Msg(msg));
// Check that it is queued.
assert_eq!(queue.awaiting_envelopes_per_root.len(), 1);
// Simulate block import.
let imported = ReprocessQueueMessage::BlockImported {
block_root: beacon_block_root,
parent_root,
};
queue.handle_message(InboundEvent::Msg(imported));
// The entry for the block root should be gone.
assert!(queue.awaiting_envelopes_per_root.is_empty());
// Delay queue entry should also be cancelled.
assert_eq!(queue.envelope_delay_queue.len(), 0);
}
#[tokio::test]
async fn envelope_dedup_drops_second() {
create_test_tracing_subscriber();
let mut queue = test_queue();
// Pause time so it only advances manually
tokio::time::pause();
let beacon_block_root = Hash256::repeat_byte(0xaf);
// Insert an envelope.
let msg1 = ReprocessQueueMessage::UnknownBlockForEnvelope(QueuedGossipEnvelope {
beacon_block_slot: Slot::new(1),
beacon_block_root,
process_fn: Box::pin(async {}),
});
let msg2 = ReprocessQueueMessage::UnknownBlockForEnvelope(QueuedGossipEnvelope {
beacon_block_slot: Slot::new(1),
beacon_block_root,
process_fn: Box::pin(async {}),
});
// Process both events.
queue.handle_message(InboundEvent::Msg(msg1));
queue.handle_message(InboundEvent::Msg(msg2));
// Only one should be queued.
assert_eq!(queue.awaiting_envelopes_per_root.len(), 1);
assert_eq!(queue.envelope_delay_queue.len(), 1);
}
#[tokio::test]
async fn envelope_capacity_evicts_oldest() {
create_test_tracing_subscriber();
let mut queue = test_queue();
// Pause time so it only advances manually
tokio::time::pause();
// Fill the queue to capacity.
for i in 0..MAXIMUM_QUEUED_ENVELOPES {
let block_root = Hash256::repeat_byte(i as u8);
let msg = ReprocessQueueMessage::UnknownBlockForEnvelope(QueuedGossipEnvelope {
beacon_block_slot: Slot::new(1),
beacon_block_root: block_root,
process_fn: Box::pin(async {}),
});
queue.handle_message(InboundEvent::Msg(msg));
}
assert_eq!(
queue.awaiting_envelopes_per_root.len(),
MAXIMUM_QUEUED_ENVELOPES
);
// One more should evict the oldest and insert the new one.
let overflow_root = Hash256::repeat_byte(0xff);
let msg = ReprocessQueueMessage::UnknownBlockForEnvelope(QueuedGossipEnvelope {
beacon_block_slot: Slot::new(1),
beacon_block_root: overflow_root,
process_fn: Box::pin(async {}),
});
queue.handle_message(InboundEvent::Msg(msg));
// Queue should still be at capacity, with the new root present.
assert_eq!(
queue.awaiting_envelopes_per_root.len(),
MAXIMUM_QUEUED_ENVELOPES
);
assert!(
queue
.awaiting_envelopes_per_root
.contains_key(&overflow_root)
);
}
}