mirror of
https://github.com/sigp/lighthouse.git
synced 2026-07-03 12:54:27 +00:00
Unknown block for envelope (#8992)
Add a queue that allows us to reprocess an envelope when it arrives over gossip references a unknown block root. When the block is finally imported, we immediately reprocess the queued envelope. Note that we don't trigger a block lookup sync. Incoming attestations for this block root will already trigger a lookup for us. I think thats good enough Co-Authored-By: Eitan Seri- Levi <eserilev@gmail.com>
This commit is contained in:
@@ -41,7 +41,8 @@
|
|||||||
pub use crate::scheduler::BeaconProcessorQueueLengths;
|
pub use crate::scheduler::BeaconProcessorQueueLengths;
|
||||||
use crate::scheduler::work_queue::WorkQueues;
|
use crate::scheduler::work_queue::WorkQueues;
|
||||||
use crate::work_reprocessing_queue::{
|
use crate::work_reprocessing_queue::{
|
||||||
QueuedBackfillBatch, QueuedColumnReconstruction, QueuedGossipBlock, ReprocessQueueMessage,
|
QueuedBackfillBatch, QueuedColumnReconstruction, QueuedGossipBlock, QueuedGossipEnvelope,
|
||||||
|
ReprocessQueueMessage,
|
||||||
};
|
};
|
||||||
use futures::stream::{Stream, StreamExt};
|
use futures::stream::{Stream, StreamExt};
|
||||||
use futures::task::Poll;
|
use futures::task::Poll;
|
||||||
@@ -242,6 +243,18 @@ impl<E: EthSpec> From<ReadyWork> for WorkEvent<E> {
|
|||||||
process_fn,
|
process_fn,
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
|
ReadyWork::Envelope(QueuedGossipEnvelope {
|
||||||
|
beacon_block_slot,
|
||||||
|
beacon_block_root,
|
||||||
|
process_fn,
|
||||||
|
}) => Self {
|
||||||
|
drop_during_sync: false,
|
||||||
|
work: Work::DelayedImportEnvelope {
|
||||||
|
beacon_block_slot,
|
||||||
|
beacon_block_root,
|
||||||
|
process_fn,
|
||||||
|
},
|
||||||
|
},
|
||||||
ReadyWork::RpcBlock(QueuedRpcBlock {
|
ReadyWork::RpcBlock(QueuedRpcBlock {
|
||||||
beacon_block_root,
|
beacon_block_root,
|
||||||
process_fn,
|
process_fn,
|
||||||
@@ -384,6 +397,11 @@ pub enum Work<E: EthSpec> {
|
|||||||
beacon_block_root: Hash256,
|
beacon_block_root: Hash256,
|
||||||
process_fn: AsyncFn,
|
process_fn: AsyncFn,
|
||||||
},
|
},
|
||||||
|
DelayedImportEnvelope {
|
||||||
|
beacon_block_slot: Slot,
|
||||||
|
beacon_block_root: Hash256,
|
||||||
|
process_fn: AsyncFn,
|
||||||
|
},
|
||||||
GossipVoluntaryExit(BlockingFn),
|
GossipVoluntaryExit(BlockingFn),
|
||||||
GossipProposerSlashing(BlockingFn),
|
GossipProposerSlashing(BlockingFn),
|
||||||
GossipAttesterSlashing(BlockingFn),
|
GossipAttesterSlashing(BlockingFn),
|
||||||
@@ -447,6 +465,7 @@ pub enum WorkType {
|
|||||||
GossipBlobSidecar,
|
GossipBlobSidecar,
|
||||||
GossipDataColumnSidecar,
|
GossipDataColumnSidecar,
|
||||||
DelayedImportBlock,
|
DelayedImportBlock,
|
||||||
|
DelayedImportEnvelope,
|
||||||
GossipVoluntaryExit,
|
GossipVoluntaryExit,
|
||||||
GossipProposerSlashing,
|
GossipProposerSlashing,
|
||||||
GossipAttesterSlashing,
|
GossipAttesterSlashing,
|
||||||
@@ -498,6 +517,7 @@ impl<E: EthSpec> Work<E> {
|
|||||||
Work::GossipBlobSidecar(_) => WorkType::GossipBlobSidecar,
|
Work::GossipBlobSidecar(_) => WorkType::GossipBlobSidecar,
|
||||||
Work::GossipDataColumnSidecar(_) => WorkType::GossipDataColumnSidecar,
|
Work::GossipDataColumnSidecar(_) => WorkType::GossipDataColumnSidecar,
|
||||||
Work::DelayedImportBlock { .. } => WorkType::DelayedImportBlock,
|
Work::DelayedImportBlock { .. } => WorkType::DelayedImportBlock,
|
||||||
|
Work::DelayedImportEnvelope { .. } => WorkType::DelayedImportEnvelope,
|
||||||
Work::GossipVoluntaryExit(_) => WorkType::GossipVoluntaryExit,
|
Work::GossipVoluntaryExit(_) => WorkType::GossipVoluntaryExit,
|
||||||
Work::GossipProposerSlashing(_) => WorkType::GossipProposerSlashing,
|
Work::GossipProposerSlashing(_) => WorkType::GossipProposerSlashing,
|
||||||
Work::GossipAttesterSlashing(_) => WorkType::GossipAttesterSlashing,
|
Work::GossipAttesterSlashing(_) => WorkType::GossipAttesterSlashing,
|
||||||
@@ -793,6 +813,8 @@ impl<E: EthSpec> BeaconProcessor<E> {
|
|||||||
// on the delayed ones.
|
// on the delayed ones.
|
||||||
} else if let Some(item) = work_queues.delayed_block_queue.pop() {
|
} else if let Some(item) = work_queues.delayed_block_queue.pop() {
|
||||||
Some(item)
|
Some(item)
|
||||||
|
} else if let Some(item) = work_queues.delayed_envelope_queue.pop() {
|
||||||
|
Some(item)
|
||||||
// Check gossip blocks and payloads before gossip attestations, since a block might be
|
// Check gossip blocks and payloads before gossip attestations, since a block might be
|
||||||
// required to verify some attestations.
|
// required to verify some attestations.
|
||||||
} else if let Some(item) = work_queues.gossip_block_queue.pop() {
|
} else if let Some(item) = work_queues.gossip_block_queue.pop() {
|
||||||
@@ -1111,6 +1133,9 @@ impl<E: EthSpec> BeaconProcessor<E> {
|
|||||||
Work::DelayedImportBlock { .. } => {
|
Work::DelayedImportBlock { .. } => {
|
||||||
work_queues.delayed_block_queue.push(work, work_id)
|
work_queues.delayed_block_queue.push(work, work_id)
|
||||||
}
|
}
|
||||||
|
Work::DelayedImportEnvelope { .. } => {
|
||||||
|
work_queues.delayed_envelope_queue.push(work, work_id)
|
||||||
|
}
|
||||||
Work::GossipVoluntaryExit { .. } => {
|
Work::GossipVoluntaryExit { .. } => {
|
||||||
work_queues.gossip_voluntary_exit_queue.push(work, work_id)
|
work_queues.gossip_voluntary_exit_queue.push(work, work_id)
|
||||||
}
|
}
|
||||||
@@ -1238,6 +1263,7 @@ impl<E: EthSpec> BeaconProcessor<E> {
|
|||||||
work_queues.gossip_data_column_queue.len()
|
work_queues.gossip_data_column_queue.len()
|
||||||
}
|
}
|
||||||
WorkType::DelayedImportBlock => work_queues.delayed_block_queue.len(),
|
WorkType::DelayedImportBlock => work_queues.delayed_block_queue.len(),
|
||||||
|
WorkType::DelayedImportEnvelope => work_queues.delayed_envelope_queue.len(),
|
||||||
WorkType::GossipVoluntaryExit => {
|
WorkType::GossipVoluntaryExit => {
|
||||||
work_queues.gossip_voluntary_exit_queue.len()
|
work_queues.gossip_voluntary_exit_queue.len()
|
||||||
}
|
}
|
||||||
@@ -1435,6 +1461,11 @@ impl<E: EthSpec> BeaconProcessor<E> {
|
|||||||
beacon_block_slot: _,
|
beacon_block_slot: _,
|
||||||
beacon_block_root: _,
|
beacon_block_root: _,
|
||||||
process_fn,
|
process_fn,
|
||||||
|
}
|
||||||
|
| Work::DelayedImportEnvelope {
|
||||||
|
beacon_block_slot: _,
|
||||||
|
beacon_block_root: _,
|
||||||
|
process_fn,
|
||||||
} => task_spawner.spawn_async(process_fn),
|
} => task_spawner.spawn_async(process_fn),
|
||||||
Work::RpcBlock {
|
Work::RpcBlock {
|
||||||
process_fn,
|
process_fn,
|
||||||
|
|||||||
@@ -127,6 +127,7 @@ pub struct BeaconProcessorQueueLengths {
|
|||||||
gossip_blob_queue: usize,
|
gossip_blob_queue: usize,
|
||||||
gossip_data_column_queue: usize,
|
gossip_data_column_queue: usize,
|
||||||
delayed_block_queue: usize,
|
delayed_block_queue: usize,
|
||||||
|
delayed_envelope_queue: usize,
|
||||||
status_queue: usize,
|
status_queue: usize,
|
||||||
block_brange_queue: usize,
|
block_brange_queue: usize,
|
||||||
block_broots_queue: usize,
|
block_broots_queue: usize,
|
||||||
@@ -197,6 +198,7 @@ impl BeaconProcessorQueueLengths {
|
|||||||
gossip_blob_queue: 1024,
|
gossip_blob_queue: 1024,
|
||||||
gossip_data_column_queue: 1024,
|
gossip_data_column_queue: 1024,
|
||||||
delayed_block_queue: 1024,
|
delayed_block_queue: 1024,
|
||||||
|
delayed_envelope_queue: 1024,
|
||||||
status_queue: 1024,
|
status_queue: 1024,
|
||||||
block_brange_queue: 1024,
|
block_brange_queue: 1024,
|
||||||
block_broots_queue: 1024,
|
block_broots_queue: 1024,
|
||||||
@@ -250,6 +252,7 @@ pub struct WorkQueues<E: EthSpec> {
|
|||||||
pub gossip_blob_queue: FifoQueue<Work<E>>,
|
pub gossip_blob_queue: FifoQueue<Work<E>>,
|
||||||
pub gossip_data_column_queue: FifoQueue<Work<E>>,
|
pub gossip_data_column_queue: FifoQueue<Work<E>>,
|
||||||
pub delayed_block_queue: FifoQueue<Work<E>>,
|
pub delayed_block_queue: FifoQueue<Work<E>>,
|
||||||
|
pub delayed_envelope_queue: FifoQueue<Work<E>>,
|
||||||
pub status_queue: FifoQueue<Work<E>>,
|
pub status_queue: FifoQueue<Work<E>>,
|
||||||
pub block_brange_queue: FifoQueue<Work<E>>,
|
pub block_brange_queue: FifoQueue<Work<E>>,
|
||||||
pub block_broots_queue: FifoQueue<Work<E>>,
|
pub block_broots_queue: FifoQueue<Work<E>>,
|
||||||
@@ -315,6 +318,7 @@ impl<E: EthSpec> WorkQueues<E> {
|
|||||||
let gossip_blob_queue = FifoQueue::new(queue_lengths.gossip_blob_queue);
|
let gossip_blob_queue = FifoQueue::new(queue_lengths.gossip_blob_queue);
|
||||||
let gossip_data_column_queue = FifoQueue::new(queue_lengths.gossip_data_column_queue);
|
let gossip_data_column_queue = FifoQueue::new(queue_lengths.gossip_data_column_queue);
|
||||||
let delayed_block_queue = FifoQueue::new(queue_lengths.delayed_block_queue);
|
let delayed_block_queue = FifoQueue::new(queue_lengths.delayed_block_queue);
|
||||||
|
let delayed_envelope_queue = FifoQueue::new(queue_lengths.delayed_envelope_queue);
|
||||||
|
|
||||||
let status_queue = FifoQueue::new(queue_lengths.status_queue);
|
let status_queue = FifoQueue::new(queue_lengths.status_queue);
|
||||||
let block_brange_queue = FifoQueue::new(queue_lengths.block_brange_queue);
|
let block_brange_queue = FifoQueue::new(queue_lengths.block_brange_queue);
|
||||||
@@ -375,6 +379,7 @@ impl<E: EthSpec> WorkQueues<E> {
|
|||||||
gossip_blob_queue,
|
gossip_blob_queue,
|
||||||
gossip_data_column_queue,
|
gossip_data_column_queue,
|
||||||
delayed_block_queue,
|
delayed_block_queue,
|
||||||
|
delayed_envelope_queue,
|
||||||
status_queue,
|
status_queue,
|
||||||
block_brange_queue,
|
block_brange_queue,
|
||||||
block_broots_queue,
|
block_broots_queue,
|
||||||
|
|||||||
@@ -35,6 +35,7 @@ use types::{EthSpec, Hash256, Slot};
|
|||||||
|
|
||||||
const TASK_NAME: &str = "beacon_processor_reprocess_queue";
|
const TASK_NAME: &str = "beacon_processor_reprocess_queue";
|
||||||
const GOSSIP_BLOCKS: &str = "gossip_blocks";
|
const GOSSIP_BLOCKS: &str = "gossip_blocks";
|
||||||
|
const GOSSIP_ENVELOPES: &str = "gossip_envelopes";
|
||||||
const RPC_BLOCKS: &str = "rpc_blocks";
|
const RPC_BLOCKS: &str = "rpc_blocks";
|
||||||
const ATTESTATIONS: &str = "attestations";
|
const ATTESTATIONS: &str = "attestations";
|
||||||
const ATTESTATIONS_PER_ROOT: &str = "attestations_per_root";
|
const ATTESTATIONS_PER_ROOT: &str = "attestations_per_root";
|
||||||
@@ -51,6 +52,10 @@ pub const QUEUED_ATTESTATION_DELAY: Duration = Duration::from_secs(12);
|
|||||||
/// For how long to queue light client updates for re-processing.
|
/// For how long to queue light client updates for re-processing.
|
||||||
pub const QUEUED_LIGHT_CLIENT_UPDATE_DELAY: Duration = Duration::from_secs(12);
|
pub const QUEUED_LIGHT_CLIENT_UPDATE_DELAY: Duration = Duration::from_secs(12);
|
||||||
|
|
||||||
|
/// Envelope timeout as a multiplier of slot duration. Envelopes waiting for their block will be
|
||||||
|
/// sent for processing after this many slots worth of time, even if the block hasn't arrived.
|
||||||
|
const QUEUED_ENVELOPE_DELAY_SLOTS: u32 = 1;
|
||||||
|
|
||||||
/// For how long to queue rpc blocks before sending them back for reprocessing.
|
/// For how long to queue rpc blocks before sending them back for reprocessing.
|
||||||
pub const QUEUED_RPC_BLOCK_DELAY: Duration = Duration::from_secs(4);
|
pub const QUEUED_RPC_BLOCK_DELAY: Duration = Duration::from_secs(4);
|
||||||
|
|
||||||
@@ -65,6 +70,9 @@ pub const QUEUED_RECONSTRUCTION_DELAY: Duration = Duration::from_millis(150);
|
|||||||
/// it's nice to have extra protection.
|
/// it's nice to have extra protection.
|
||||||
const MAXIMUM_QUEUED_BLOCKS: usize = 16;
|
const MAXIMUM_QUEUED_BLOCKS: usize = 16;
|
||||||
|
|
||||||
|
/// Set an arbitrary upper-bound on the number of queued envelopes to avoid DoS attacks.
|
||||||
|
const MAXIMUM_QUEUED_ENVELOPES: usize = 16;
|
||||||
|
|
||||||
/// How many attestations we keep before new ones get dropped.
|
/// How many attestations we keep before new ones get dropped.
|
||||||
const MAXIMUM_QUEUED_ATTESTATIONS: usize = 16_384;
|
const MAXIMUM_QUEUED_ATTESTATIONS: usize = 16_384;
|
||||||
|
|
||||||
@@ -93,6 +101,8 @@ pub const RECONSTRUCTION_DEADLINE: (u64, u64) = (1, 4);
|
|||||||
pub enum ReprocessQueueMessage {
|
pub enum ReprocessQueueMessage {
|
||||||
/// A block that has been received early and we should queue for later processing.
|
/// A block that has been received early and we should queue for later processing.
|
||||||
EarlyBlock(QueuedGossipBlock),
|
EarlyBlock(QueuedGossipBlock),
|
||||||
|
/// An execution payload envelope that references a block not yet in fork choice.
|
||||||
|
UnknownBlockForEnvelope(QueuedGossipEnvelope),
|
||||||
/// A gossip block for hash `X` is being imported, we should queue the rpc block for the same
|
/// A gossip block for hash `X` is being imported, we should queue the rpc block for the same
|
||||||
/// hash until the gossip block is imported.
|
/// hash until the gossip block is imported.
|
||||||
RpcBlock(QueuedRpcBlock),
|
RpcBlock(QueuedRpcBlock),
|
||||||
@@ -120,6 +130,7 @@ pub enum ReprocessQueueMessage {
|
|||||||
/// Events sent by the scheduler once they are ready for re-processing.
|
/// Events sent by the scheduler once they are ready for re-processing.
|
||||||
pub enum ReadyWork {
|
pub enum ReadyWork {
|
||||||
Block(QueuedGossipBlock),
|
Block(QueuedGossipBlock),
|
||||||
|
Envelope(QueuedGossipEnvelope),
|
||||||
RpcBlock(QueuedRpcBlock),
|
RpcBlock(QueuedRpcBlock),
|
||||||
IgnoredRpcBlock(IgnoredRpcBlock),
|
IgnoredRpcBlock(IgnoredRpcBlock),
|
||||||
Unaggregate(QueuedUnaggregate),
|
Unaggregate(QueuedUnaggregate),
|
||||||
@@ -157,6 +168,13 @@ pub struct QueuedGossipBlock {
|
|||||||
pub process_fn: AsyncFn,
|
pub process_fn: AsyncFn,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// An execution payload envelope that arrived early and has been queued for later import.
|
||||||
|
pub struct QueuedGossipEnvelope {
|
||||||
|
pub beacon_block_slot: Slot,
|
||||||
|
pub beacon_block_root: Hash256,
|
||||||
|
pub process_fn: AsyncFn,
|
||||||
|
}
|
||||||
|
|
||||||
/// A block that arrived for processing when the same block was being imported over gossip.
|
/// A block that arrived for processing when the same block was being imported over gossip.
|
||||||
/// It is queued for later import.
|
/// It is queued for later import.
|
||||||
pub struct QueuedRpcBlock {
|
pub struct QueuedRpcBlock {
|
||||||
@@ -209,6 +227,8 @@ impl<E: EthSpec> From<QueuedBackfillBatch> for WorkEvent<E> {
|
|||||||
enum InboundEvent {
|
enum InboundEvent {
|
||||||
/// A gossip block that was queued for later processing and is ready for import.
|
/// A gossip block that was queued for later processing and is ready for import.
|
||||||
ReadyGossipBlock(QueuedGossipBlock),
|
ReadyGossipBlock(QueuedGossipBlock),
|
||||||
|
/// An envelope whose block has been imported and is now ready for processing.
|
||||||
|
ReadyEnvelope(Hash256),
|
||||||
/// A rpc block that was queued because the same gossip block was being imported
|
/// A rpc block that was queued because the same gossip block was being imported
|
||||||
/// will now be retried for import.
|
/// will now be retried for import.
|
||||||
ReadyRpcBlock(QueuedRpcBlock),
|
ReadyRpcBlock(QueuedRpcBlock),
|
||||||
@@ -234,6 +254,8 @@ struct ReprocessQueue<S> {
|
|||||||
/* Queues */
|
/* Queues */
|
||||||
/// Queue to manage scheduled early blocks.
|
/// Queue to manage scheduled early blocks.
|
||||||
gossip_block_delay_queue: DelayQueue<QueuedGossipBlock>,
|
gossip_block_delay_queue: DelayQueue<QueuedGossipBlock>,
|
||||||
|
/// Queue to manage envelope timeouts (keyed by block root).
|
||||||
|
envelope_delay_queue: DelayQueue<Hash256>,
|
||||||
/// Queue to manage scheduled early blocks.
|
/// Queue to manage scheduled early blocks.
|
||||||
rpc_block_delay_queue: DelayQueue<QueuedRpcBlock>,
|
rpc_block_delay_queue: DelayQueue<QueuedRpcBlock>,
|
||||||
/// Queue to manage scheduled attestations.
|
/// Queue to manage scheduled attestations.
|
||||||
@@ -246,6 +268,8 @@ struct ReprocessQueue<S> {
|
|||||||
/* Queued items */
|
/* Queued items */
|
||||||
/// Queued blocks.
|
/// Queued blocks.
|
||||||
queued_gossip_block_roots: HashSet<Hash256>,
|
queued_gossip_block_roots: HashSet<Hash256>,
|
||||||
|
/// Queued envelopes awaiting their block, keyed by block root.
|
||||||
|
awaiting_envelopes_per_root: HashMap<Hash256, (QueuedGossipEnvelope, DelayKey)>,
|
||||||
/// Queued aggregated attestations.
|
/// Queued aggregated attestations.
|
||||||
queued_aggregates: FnvHashMap<usize, (QueuedAggregate, DelayKey)>,
|
queued_aggregates: FnvHashMap<usize, (QueuedAggregate, DelayKey)>,
|
||||||
/// Queued attestations.
|
/// Queued attestations.
|
||||||
@@ -266,6 +290,7 @@ struct ReprocessQueue<S> {
|
|||||||
next_attestation: usize,
|
next_attestation: usize,
|
||||||
next_lc_update: usize,
|
next_lc_update: usize,
|
||||||
early_block_debounce: TimeLatch,
|
early_block_debounce: TimeLatch,
|
||||||
|
envelope_delay_debounce: TimeLatch,
|
||||||
rpc_block_debounce: TimeLatch,
|
rpc_block_debounce: TimeLatch,
|
||||||
attestation_delay_debounce: TimeLatch,
|
attestation_delay_debounce: TimeLatch,
|
||||||
lc_update_delay_debounce: TimeLatch,
|
lc_update_delay_debounce: TimeLatch,
|
||||||
@@ -315,6 +340,13 @@ impl<S: SlotClock> Stream for ReprocessQueue<S> {
|
|||||||
Poll::Ready(None) | Poll::Pending => (),
|
Poll::Ready(None) | Poll::Pending => (),
|
||||||
}
|
}
|
||||||
|
|
||||||
|
match self.envelope_delay_queue.poll_expired(cx) {
|
||||||
|
Poll::Ready(Some(block_root)) => {
|
||||||
|
return Poll::Ready(Some(InboundEvent::ReadyEnvelope(block_root.into_inner())));
|
||||||
|
}
|
||||||
|
Poll::Ready(None) | Poll::Pending => (),
|
||||||
|
}
|
||||||
|
|
||||||
match self.rpc_block_delay_queue.poll_expired(cx) {
|
match self.rpc_block_delay_queue.poll_expired(cx) {
|
||||||
Poll::Ready(Some(queued_block)) => {
|
Poll::Ready(Some(queued_block)) => {
|
||||||
return Poll::Ready(Some(InboundEvent::ReadyRpcBlock(queued_block.into_inner())));
|
return Poll::Ready(Some(InboundEvent::ReadyRpcBlock(queued_block.into_inner())));
|
||||||
@@ -418,11 +450,13 @@ impl<S: SlotClock> ReprocessQueue<S> {
|
|||||||
work_reprocessing_rx,
|
work_reprocessing_rx,
|
||||||
ready_work_tx,
|
ready_work_tx,
|
||||||
gossip_block_delay_queue: DelayQueue::new(),
|
gossip_block_delay_queue: DelayQueue::new(),
|
||||||
|
envelope_delay_queue: DelayQueue::new(),
|
||||||
rpc_block_delay_queue: DelayQueue::new(),
|
rpc_block_delay_queue: DelayQueue::new(),
|
||||||
attestations_delay_queue: DelayQueue::new(),
|
attestations_delay_queue: DelayQueue::new(),
|
||||||
lc_updates_delay_queue: DelayQueue::new(),
|
lc_updates_delay_queue: DelayQueue::new(),
|
||||||
column_reconstructions_delay_queue: DelayQueue::new(),
|
column_reconstructions_delay_queue: DelayQueue::new(),
|
||||||
queued_gossip_block_roots: HashSet::new(),
|
queued_gossip_block_roots: HashSet::new(),
|
||||||
|
awaiting_envelopes_per_root: HashMap::new(),
|
||||||
queued_lc_updates: FnvHashMap::default(),
|
queued_lc_updates: FnvHashMap::default(),
|
||||||
queued_aggregates: FnvHashMap::default(),
|
queued_aggregates: FnvHashMap::default(),
|
||||||
queued_unaggregates: FnvHashMap::default(),
|
queued_unaggregates: FnvHashMap::default(),
|
||||||
@@ -433,6 +467,7 @@ impl<S: SlotClock> ReprocessQueue<S> {
|
|||||||
next_attestation: 0,
|
next_attestation: 0,
|
||||||
next_lc_update: 0,
|
next_lc_update: 0,
|
||||||
early_block_debounce: TimeLatch::default(),
|
early_block_debounce: TimeLatch::default(),
|
||||||
|
envelope_delay_debounce: TimeLatch::default(),
|
||||||
rpc_block_debounce: TimeLatch::default(),
|
rpc_block_debounce: TimeLatch::default(),
|
||||||
attestation_delay_debounce: TimeLatch::default(),
|
attestation_delay_debounce: TimeLatch::default(),
|
||||||
lc_update_delay_debounce: TimeLatch::default(),
|
lc_update_delay_debounce: TimeLatch::default(),
|
||||||
@@ -498,6 +533,52 @@ impl<S: SlotClock> ReprocessQueue<S> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
// An envelope that references an unknown block. Queue it until the block is
|
||||||
|
// imported, or until the timeout expires.
|
||||||
|
InboundEvent::Msg(UnknownBlockForEnvelope(queued_envelope)) => {
|
||||||
|
let block_root = queued_envelope.beacon_block_root;
|
||||||
|
|
||||||
|
// TODO(gloas): Perform lightweight pre-validation before queuing
|
||||||
|
// (e.g. verify builder signature) to prevent unsigned garbage from
|
||||||
|
// consuming queue slots.
|
||||||
|
|
||||||
|
// Don't add the same envelope to the queue twice. This prevents DoS attacks.
|
||||||
|
if self.awaiting_envelopes_per_root.contains_key(&block_root) {
|
||||||
|
trace!(
|
||||||
|
?block_root,
|
||||||
|
"Duplicate envelope for same block root, dropping"
|
||||||
|
);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
// When the queue is full, evict the oldest entry to make room for newer envelopes.
|
||||||
|
if self.awaiting_envelopes_per_root.len() >= MAXIMUM_QUEUED_ENVELOPES {
|
||||||
|
if self.envelope_delay_debounce.elapsed() {
|
||||||
|
warn!(
|
||||||
|
queue_size = MAXIMUM_QUEUED_ENVELOPES,
|
||||||
|
msg = "system resources may be saturated",
|
||||||
|
"Envelope delay queue is full, evicting oldest entry"
|
||||||
|
);
|
||||||
|
}
|
||||||
|
if let Some(oldest_root) =
|
||||||
|
self.awaiting_envelopes_per_root.keys().next().copied()
|
||||||
|
&& let Some((_envelope, delay_key)) =
|
||||||
|
self.awaiting_envelopes_per_root.remove(&oldest_root)
|
||||||
|
{
|
||||||
|
self.envelope_delay_queue.remove(&delay_key);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Register the timeout.
|
||||||
|
let delay_key = self.envelope_delay_queue.insert(
|
||||||
|
block_root,
|
||||||
|
self.slot_clock.slot_duration() * QUEUED_ENVELOPE_DELAY_SLOTS,
|
||||||
|
);
|
||||||
|
|
||||||
|
// Store the envelope keyed by block root.
|
||||||
|
self.awaiting_envelopes_per_root
|
||||||
|
.insert(block_root, (queued_envelope, delay_key));
|
||||||
|
}
|
||||||
// A rpc block arrived for processing at the same time when a gossip block
|
// A rpc block arrived for processing at the same time when a gossip block
|
||||||
// for the same block hash is being imported. We wait for `QUEUED_RPC_BLOCK_DELAY`
|
// for the same block hash is being imported. We wait for `QUEUED_RPC_BLOCK_DELAY`
|
||||||
// and then send the rpc block back for processing assuming the gossip import
|
// and then send the rpc block back for processing assuming the gossip import
|
||||||
@@ -647,6 +728,23 @@ impl<S: SlotClock> ReprocessQueue<S> {
|
|||||||
block_root,
|
block_root,
|
||||||
parent_root,
|
parent_root,
|
||||||
}) => {
|
}) => {
|
||||||
|
// Unqueue the envelope we have for this root, if any.
|
||||||
|
if let Some((envelope, delay_key)) =
|
||||||
|
self.awaiting_envelopes_per_root.remove(&block_root)
|
||||||
|
{
|
||||||
|
self.envelope_delay_queue.remove(&delay_key);
|
||||||
|
if self
|
||||||
|
.ready_work_tx
|
||||||
|
.try_send(ReadyWork::Envelope(envelope))
|
||||||
|
.is_err()
|
||||||
|
{
|
||||||
|
error!(
|
||||||
|
?block_root,
|
||||||
|
"Failed to send envelope for reprocessing after block import"
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Unqueue the attestations we have for this root, if any.
|
// Unqueue the attestations we have for this root, if any.
|
||||||
if let Some(queued_ids) = self.awaiting_attestations_per_root.remove(&block_root) {
|
if let Some(queued_ids) = self.awaiting_attestations_per_root.remove(&block_root) {
|
||||||
let mut sent_count = 0;
|
let mut sent_count = 0;
|
||||||
@@ -802,6 +900,25 @@ impl<S: SlotClock> ReprocessQueue<S> {
|
|||||||
error!("Failed to pop queued block");
|
error!("Failed to pop queued block");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
// An envelope's timeout has expired. Send it for processing regardless of
|
||||||
|
// whether the block has been imported.
|
||||||
|
InboundEvent::ReadyEnvelope(block_root) => {
|
||||||
|
if let Some((envelope, _delay_key)) =
|
||||||
|
self.awaiting_envelopes_per_root.remove(&block_root)
|
||||||
|
{
|
||||||
|
debug!(
|
||||||
|
?block_root,
|
||||||
|
"Envelope timed out waiting for block, sending for processing"
|
||||||
|
);
|
||||||
|
if self
|
||||||
|
.ready_work_tx
|
||||||
|
.try_send(ReadyWork::Envelope(envelope))
|
||||||
|
.is_err()
|
||||||
|
{
|
||||||
|
error!(?block_root, "Failed to send envelope after timeout");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
InboundEvent::ReadyAttestation(queued_id) => {
|
InboundEvent::ReadyAttestation(queued_id) => {
|
||||||
metrics::inc_counter(
|
metrics::inc_counter(
|
||||||
&metrics::BEACON_PROCESSOR_REPROCESSING_QUEUE_EXPIRED_ATTESTATIONS,
|
&metrics::BEACON_PROCESSOR_REPROCESSING_QUEUE_EXPIRED_ATTESTATIONS,
|
||||||
@@ -941,6 +1058,11 @@ impl<S: SlotClock> ReprocessQueue<S> {
|
|||||||
&[GOSSIP_BLOCKS],
|
&[GOSSIP_BLOCKS],
|
||||||
self.gossip_block_delay_queue.len() as i64,
|
self.gossip_block_delay_queue.len() as i64,
|
||||||
);
|
);
|
||||||
|
metrics::set_gauge_vec(
|
||||||
|
&metrics::BEACON_PROCESSOR_REPROCESSING_QUEUE_TOTAL,
|
||||||
|
&[GOSSIP_ENVELOPES],
|
||||||
|
self.awaiting_envelopes_per_root.len() as i64,
|
||||||
|
);
|
||||||
metrics::set_gauge_vec(
|
metrics::set_gauge_vec(
|
||||||
&metrics::BEACON_PROCESSOR_REPROCESSING_QUEUE_TOTAL,
|
&metrics::BEACON_PROCESSOR_REPROCESSING_QUEUE_TOTAL,
|
||||||
&[RPC_BLOCKS],
|
&[RPC_BLOCKS],
|
||||||
@@ -1339,4 +1461,163 @@ mod tests {
|
|||||||
assert_eq!(reconstruction.block_root, block_root);
|
assert_eq!(reconstruction.block_root, block_root);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Test that envelopes are properly cleaned up from `awaiting_envelopes_per_root` on timeout.
|
||||||
|
#[tokio::test]
|
||||||
|
async fn prune_awaiting_envelopes_per_root() {
|
||||||
|
create_test_tracing_subscriber();
|
||||||
|
|
||||||
|
let mut queue = test_queue();
|
||||||
|
|
||||||
|
// Pause time so it only advances manually
|
||||||
|
tokio::time::pause();
|
||||||
|
|
||||||
|
let beacon_block_root = Hash256::repeat_byte(0xaf);
|
||||||
|
|
||||||
|
// Insert an envelope.
|
||||||
|
let msg = ReprocessQueueMessage::UnknownBlockForEnvelope(QueuedGossipEnvelope {
|
||||||
|
beacon_block_slot: Slot::new(1),
|
||||||
|
beacon_block_root,
|
||||||
|
process_fn: Box::pin(async {}),
|
||||||
|
});
|
||||||
|
|
||||||
|
// Process the event to enter it into the delay queue.
|
||||||
|
queue.handle_message(InboundEvent::Msg(msg));
|
||||||
|
|
||||||
|
// Check that it is queued.
|
||||||
|
assert_eq!(queue.awaiting_envelopes_per_root.len(), 1);
|
||||||
|
assert!(
|
||||||
|
queue
|
||||||
|
.awaiting_envelopes_per_root
|
||||||
|
.contains_key(&beacon_block_root)
|
||||||
|
);
|
||||||
|
|
||||||
|
// Advance time to expire the envelope.
|
||||||
|
advance_time(
|
||||||
|
&queue.slot_clock,
|
||||||
|
queue.slot_clock.slot_duration() * QUEUED_ENVELOPE_DELAY_SLOTS * 2,
|
||||||
|
)
|
||||||
|
.await;
|
||||||
|
let ready_msg = queue.next().await.unwrap();
|
||||||
|
assert!(matches!(ready_msg, InboundEvent::ReadyEnvelope(_)));
|
||||||
|
queue.handle_message(ready_msg);
|
||||||
|
|
||||||
|
// The entry for the block root should be gone.
|
||||||
|
assert!(queue.awaiting_envelopes_per_root.is_empty());
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn envelope_released_on_block_imported() {
|
||||||
|
create_test_tracing_subscriber();
|
||||||
|
|
||||||
|
let mut queue = test_queue();
|
||||||
|
|
||||||
|
// Pause time so it only advances manually
|
||||||
|
tokio::time::pause();
|
||||||
|
|
||||||
|
let beacon_block_root = Hash256::repeat_byte(0xaf);
|
||||||
|
let parent_root = Hash256::repeat_byte(0xab);
|
||||||
|
|
||||||
|
// Insert an envelope.
|
||||||
|
let msg = ReprocessQueueMessage::UnknownBlockForEnvelope(QueuedGossipEnvelope {
|
||||||
|
beacon_block_slot: Slot::new(1),
|
||||||
|
beacon_block_root,
|
||||||
|
process_fn: Box::pin(async {}),
|
||||||
|
});
|
||||||
|
|
||||||
|
// Process the event to enter it into the delay queue.
|
||||||
|
queue.handle_message(InboundEvent::Msg(msg));
|
||||||
|
|
||||||
|
// Check that it is queued.
|
||||||
|
assert_eq!(queue.awaiting_envelopes_per_root.len(), 1);
|
||||||
|
|
||||||
|
// Simulate block import.
|
||||||
|
let imported = ReprocessQueueMessage::BlockImported {
|
||||||
|
block_root: beacon_block_root,
|
||||||
|
parent_root,
|
||||||
|
};
|
||||||
|
queue.handle_message(InboundEvent::Msg(imported));
|
||||||
|
|
||||||
|
// The entry for the block root should be gone.
|
||||||
|
assert!(queue.awaiting_envelopes_per_root.is_empty());
|
||||||
|
// Delay queue entry should also be cancelled.
|
||||||
|
assert_eq!(queue.envelope_delay_queue.len(), 0);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn envelope_dedup_drops_second() {
|
||||||
|
create_test_tracing_subscriber();
|
||||||
|
|
||||||
|
let mut queue = test_queue();
|
||||||
|
|
||||||
|
// Pause time so it only advances manually
|
||||||
|
tokio::time::pause();
|
||||||
|
|
||||||
|
let beacon_block_root = Hash256::repeat_byte(0xaf);
|
||||||
|
|
||||||
|
// Insert an envelope.
|
||||||
|
let msg1 = ReprocessQueueMessage::UnknownBlockForEnvelope(QueuedGossipEnvelope {
|
||||||
|
beacon_block_slot: Slot::new(1),
|
||||||
|
beacon_block_root,
|
||||||
|
process_fn: Box::pin(async {}),
|
||||||
|
});
|
||||||
|
let msg2 = ReprocessQueueMessage::UnknownBlockForEnvelope(QueuedGossipEnvelope {
|
||||||
|
beacon_block_slot: Slot::new(1),
|
||||||
|
beacon_block_root,
|
||||||
|
process_fn: Box::pin(async {}),
|
||||||
|
});
|
||||||
|
|
||||||
|
// Process both events.
|
||||||
|
queue.handle_message(InboundEvent::Msg(msg1));
|
||||||
|
queue.handle_message(InboundEvent::Msg(msg2));
|
||||||
|
|
||||||
|
// Only one should be queued.
|
||||||
|
assert_eq!(queue.awaiting_envelopes_per_root.len(), 1);
|
||||||
|
assert_eq!(queue.envelope_delay_queue.len(), 1);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn envelope_capacity_evicts_oldest() {
|
||||||
|
create_test_tracing_subscriber();
|
||||||
|
|
||||||
|
let mut queue = test_queue();
|
||||||
|
|
||||||
|
// Pause time so it only advances manually
|
||||||
|
tokio::time::pause();
|
||||||
|
|
||||||
|
// Fill the queue to capacity.
|
||||||
|
for i in 0..MAXIMUM_QUEUED_ENVELOPES {
|
||||||
|
let block_root = Hash256::repeat_byte(i as u8);
|
||||||
|
let msg = ReprocessQueueMessage::UnknownBlockForEnvelope(QueuedGossipEnvelope {
|
||||||
|
beacon_block_slot: Slot::new(1),
|
||||||
|
beacon_block_root: block_root,
|
||||||
|
process_fn: Box::pin(async {}),
|
||||||
|
});
|
||||||
|
queue.handle_message(InboundEvent::Msg(msg));
|
||||||
|
}
|
||||||
|
assert_eq!(
|
||||||
|
queue.awaiting_envelopes_per_root.len(),
|
||||||
|
MAXIMUM_QUEUED_ENVELOPES
|
||||||
|
);
|
||||||
|
|
||||||
|
// One more should evict the oldest and insert the new one.
|
||||||
|
let overflow_root = Hash256::repeat_byte(0xff);
|
||||||
|
let msg = ReprocessQueueMessage::UnknownBlockForEnvelope(QueuedGossipEnvelope {
|
||||||
|
beacon_block_slot: Slot::new(1),
|
||||||
|
beacon_block_root: overflow_root,
|
||||||
|
process_fn: Box::pin(async {}),
|
||||||
|
});
|
||||||
|
queue.handle_message(InboundEvent::Msg(msg));
|
||||||
|
|
||||||
|
// Queue should still be at capacity, with the new root present.
|
||||||
|
assert_eq!(
|
||||||
|
queue.awaiting_envelopes_per_root.len(),
|
||||||
|
MAXIMUM_QUEUED_ENVELOPES
|
||||||
|
);
|
||||||
|
assert!(
|
||||||
|
queue
|
||||||
|
.awaiting_envelopes_per_root
|
||||||
|
.contains_key(&overflow_root)
|
||||||
|
);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -20,7 +20,9 @@ use beacon_chain::{
|
|||||||
};
|
};
|
||||||
use beacon_chain::{
|
use beacon_chain::{
|
||||||
blob_verification::{GossipBlobError, GossipVerifiedBlob},
|
blob_verification::{GossipBlobError, GossipVerifiedBlob},
|
||||||
payload_envelope_verification::gossip_verified_envelope::GossipVerifiedEnvelope,
|
payload_envelope_verification::{
|
||||||
|
EnvelopeError, gossip_verified_envelope::GossipVerifiedEnvelope,
|
||||||
|
},
|
||||||
};
|
};
|
||||||
use beacon_processor::{Work, WorkEvent};
|
use beacon_processor::{Work, WorkEvent};
|
||||||
use lighthouse_network::{Client, MessageAcceptance, MessageId, PeerAction, PeerId, ReportSource};
|
use lighthouse_network::{Client, MessageAcceptance, MessageId, PeerAction, PeerId, ReportSource};
|
||||||
@@ -49,8 +51,8 @@ use beacon_processor::work_reprocessing_queue::QueuedColumnReconstruction;
|
|||||||
use beacon_processor::{
|
use beacon_processor::{
|
||||||
DuplicateCache, GossipAggregatePackage, GossipAttestationBatch,
|
DuplicateCache, GossipAggregatePackage, GossipAttestationBatch,
|
||||||
work_reprocessing_queue::{
|
work_reprocessing_queue::{
|
||||||
QueuedAggregate, QueuedGossipBlock, QueuedLightClientUpdate, QueuedUnaggregate,
|
QueuedAggregate, QueuedGossipBlock, QueuedGossipEnvelope, QueuedLightClientUpdate,
|
||||||
ReprocessQueueMessage,
|
QueuedUnaggregate, ReprocessQueueMessage,
|
||||||
},
|
},
|
||||||
};
|
};
|
||||||
|
|
||||||
@@ -3332,6 +3334,61 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
|
|||||||
|
|
||||||
verified_envelope
|
verified_envelope
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Err(EnvelopeError::BlockRootUnknown { block_root }) => {
|
||||||
|
let envelope_slot = envelope.slot();
|
||||||
|
|
||||||
|
debug!(
|
||||||
|
?block_root,
|
||||||
|
%envelope_slot,
|
||||||
|
"Envelope references unknown block, deferring to reprocess queue"
|
||||||
|
);
|
||||||
|
|
||||||
|
self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Ignore);
|
||||||
|
|
||||||
|
let inner_self = self.clone();
|
||||||
|
let chain = self.chain.clone();
|
||||||
|
let process_fn = Box::pin(async move {
|
||||||
|
match chain.verify_envelope_for_gossip(envelope).await {
|
||||||
|
Ok(verified_envelope) => {
|
||||||
|
inner_self
|
||||||
|
.process_gossip_verified_execution_payload_envelope(
|
||||||
|
peer_id,
|
||||||
|
verified_envelope,
|
||||||
|
)
|
||||||
|
.await;
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
debug!(
|
||||||
|
error = ?e,
|
||||||
|
"Deferred envelope failed verification"
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
if self
|
||||||
|
.beacon_processor_send
|
||||||
|
.try_send(WorkEvent {
|
||||||
|
drop_during_sync: false,
|
||||||
|
work: Work::Reprocess(ReprocessQueueMessage::UnknownBlockForEnvelope(
|
||||||
|
QueuedGossipEnvelope {
|
||||||
|
beacon_block_slot: envelope_slot,
|
||||||
|
beacon_block_root: block_root,
|
||||||
|
process_fn,
|
||||||
|
},
|
||||||
|
)),
|
||||||
|
})
|
||||||
|
.is_err()
|
||||||
|
{
|
||||||
|
error!(
|
||||||
|
%envelope_slot,
|
||||||
|
?block_root,
|
||||||
|
"Failed to defer envelope import"
|
||||||
|
);
|
||||||
|
}
|
||||||
|
return None;
|
||||||
|
}
|
||||||
// TODO(gloas) penalize peers accordingly
|
// TODO(gloas) penalize peers accordingly
|
||||||
Err(_) => return None,
|
Err(_) => return None,
|
||||||
};
|
};
|
||||||
|
|||||||
@@ -2090,3 +2090,8 @@ async fn test_data_columns_by_range_no_duplicates_with_skip_slots() {
|
|||||||
unique_roots.len(),
|
unique_roots.len(),
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TODO(ePBS): Add integration tests for envelope deferral (UnknownBlockForEnvelope):
|
||||||
|
// 1. Gossip envelope arrives before its block → queued via UnknownBlockForEnvelope
|
||||||
|
// 2. Block imported → envelope released and processed successfully
|
||||||
|
// 3. Timeout path → envelope released and re-verified
|
||||||
|
|||||||
Reference in New Issue
Block a user