diff --git a/beacon_node/beacon_processor/src/lib.rs b/beacon_node/beacon_processor/src/lib.rs index 33a00bfa49..c33f4840e0 100644 --- a/beacon_node/beacon_processor/src/lib.rs +++ b/beacon_node/beacon_processor/src/lib.rs @@ -41,7 +41,8 @@ pub use crate::scheduler::BeaconProcessorQueueLengths; use crate::scheduler::work_queue::WorkQueues; use crate::work_reprocessing_queue::{ - QueuedBackfillBatch, QueuedColumnReconstruction, QueuedGossipBlock, ReprocessQueueMessage, + QueuedBackfillBatch, QueuedColumnReconstruction, QueuedGossipBlock, QueuedGossipEnvelope, + ReprocessQueueMessage, }; use futures::stream::{Stream, StreamExt}; use futures::task::Poll; @@ -242,6 +243,18 @@ impl From for WorkEvent { process_fn, }, }, + ReadyWork::Envelope(QueuedGossipEnvelope { + beacon_block_slot, + beacon_block_root, + process_fn, + }) => Self { + drop_during_sync: false, + work: Work::DelayedImportEnvelope { + beacon_block_slot, + beacon_block_root, + process_fn, + }, + }, ReadyWork::RpcBlock(QueuedRpcBlock { beacon_block_root, process_fn, @@ -384,6 +397,11 @@ pub enum Work { beacon_block_root: Hash256, process_fn: AsyncFn, }, + DelayedImportEnvelope { + beacon_block_slot: Slot, + beacon_block_root: Hash256, + process_fn: AsyncFn, + }, GossipVoluntaryExit(BlockingFn), GossipProposerSlashing(BlockingFn), GossipAttesterSlashing(BlockingFn), @@ -447,6 +465,7 @@ pub enum WorkType { GossipBlobSidecar, GossipDataColumnSidecar, DelayedImportBlock, + DelayedImportEnvelope, GossipVoluntaryExit, GossipProposerSlashing, GossipAttesterSlashing, @@ -498,6 +517,7 @@ impl Work { Work::GossipBlobSidecar(_) => WorkType::GossipBlobSidecar, Work::GossipDataColumnSidecar(_) => WorkType::GossipDataColumnSidecar, Work::DelayedImportBlock { .. } => WorkType::DelayedImportBlock, + Work::DelayedImportEnvelope { .. } => WorkType::DelayedImportEnvelope, Work::GossipVoluntaryExit(_) => WorkType::GossipVoluntaryExit, Work::GossipProposerSlashing(_) => WorkType::GossipProposerSlashing, Work::GossipAttesterSlashing(_) => WorkType::GossipAttesterSlashing, @@ -793,6 +813,8 @@ impl BeaconProcessor { // on the delayed ones. } else if let Some(item) = work_queues.delayed_block_queue.pop() { Some(item) + } else if let Some(item) = work_queues.delayed_envelope_queue.pop() { + Some(item) // Check gossip blocks and payloads before gossip attestations, since a block might be // required to verify some attestations. } else if let Some(item) = work_queues.gossip_block_queue.pop() { @@ -1111,6 +1133,9 @@ impl BeaconProcessor { Work::DelayedImportBlock { .. } => { work_queues.delayed_block_queue.push(work, work_id) } + Work::DelayedImportEnvelope { .. } => { + work_queues.delayed_envelope_queue.push(work, work_id) + } Work::GossipVoluntaryExit { .. } => { work_queues.gossip_voluntary_exit_queue.push(work, work_id) } @@ -1238,6 +1263,7 @@ impl BeaconProcessor { work_queues.gossip_data_column_queue.len() } WorkType::DelayedImportBlock => work_queues.delayed_block_queue.len(), + WorkType::DelayedImportEnvelope => work_queues.delayed_envelope_queue.len(), WorkType::GossipVoluntaryExit => { work_queues.gossip_voluntary_exit_queue.len() } @@ -1435,6 +1461,11 @@ impl BeaconProcessor { beacon_block_slot: _, beacon_block_root: _, process_fn, + } + | Work::DelayedImportEnvelope { + beacon_block_slot: _, + beacon_block_root: _, + process_fn, } => task_spawner.spawn_async(process_fn), Work::RpcBlock { process_fn, diff --git a/beacon_node/beacon_processor/src/scheduler/work_queue.rs b/beacon_node/beacon_processor/src/scheduler/work_queue.rs index 934659b304..e48c776b6d 100644 --- a/beacon_node/beacon_processor/src/scheduler/work_queue.rs +++ b/beacon_node/beacon_processor/src/scheduler/work_queue.rs @@ -127,6 +127,7 @@ pub struct BeaconProcessorQueueLengths { gossip_blob_queue: usize, gossip_data_column_queue: usize, delayed_block_queue: usize, + delayed_envelope_queue: usize, status_queue: usize, block_brange_queue: usize, block_broots_queue: usize, @@ -197,6 +198,7 @@ impl BeaconProcessorQueueLengths { gossip_blob_queue: 1024, gossip_data_column_queue: 1024, delayed_block_queue: 1024, + delayed_envelope_queue: 1024, status_queue: 1024, block_brange_queue: 1024, block_broots_queue: 1024, @@ -250,6 +252,7 @@ pub struct WorkQueues { pub gossip_blob_queue: FifoQueue>, pub gossip_data_column_queue: FifoQueue>, pub delayed_block_queue: FifoQueue>, + pub delayed_envelope_queue: FifoQueue>, pub status_queue: FifoQueue>, pub block_brange_queue: FifoQueue>, pub block_broots_queue: FifoQueue>, @@ -315,6 +318,7 @@ impl WorkQueues { let gossip_blob_queue = FifoQueue::new(queue_lengths.gossip_blob_queue); let gossip_data_column_queue = FifoQueue::new(queue_lengths.gossip_data_column_queue); let delayed_block_queue = FifoQueue::new(queue_lengths.delayed_block_queue); + let delayed_envelope_queue = FifoQueue::new(queue_lengths.delayed_envelope_queue); let status_queue = FifoQueue::new(queue_lengths.status_queue); let block_brange_queue = FifoQueue::new(queue_lengths.block_brange_queue); @@ -375,6 +379,7 @@ impl WorkQueues { gossip_blob_queue, gossip_data_column_queue, delayed_block_queue, + delayed_envelope_queue, status_queue, block_brange_queue, block_broots_queue, diff --git a/beacon_node/beacon_processor/src/scheduler/work_reprocessing_queue.rs b/beacon_node/beacon_processor/src/scheduler/work_reprocessing_queue.rs index c99388287c..6cb82a2906 100644 --- a/beacon_node/beacon_processor/src/scheduler/work_reprocessing_queue.rs +++ b/beacon_node/beacon_processor/src/scheduler/work_reprocessing_queue.rs @@ -35,6 +35,7 @@ use types::{EthSpec, Hash256, Slot}; const TASK_NAME: &str = "beacon_processor_reprocess_queue"; const GOSSIP_BLOCKS: &str = "gossip_blocks"; +const GOSSIP_ENVELOPES: &str = "gossip_envelopes"; const RPC_BLOCKS: &str = "rpc_blocks"; const ATTESTATIONS: &str = "attestations"; const ATTESTATIONS_PER_ROOT: &str = "attestations_per_root"; @@ -51,6 +52,10 @@ pub const QUEUED_ATTESTATION_DELAY: Duration = Duration::from_secs(12); /// For how long to queue light client updates for re-processing. pub const QUEUED_LIGHT_CLIENT_UPDATE_DELAY: Duration = Duration::from_secs(12); +/// Envelope timeout as a multiplier of slot duration. Envelopes waiting for their block will be +/// sent for processing after this many slots worth of time, even if the block hasn't arrived. +const QUEUED_ENVELOPE_DELAY_SLOTS: u32 = 1; + /// For how long to queue rpc blocks before sending them back for reprocessing. pub const QUEUED_RPC_BLOCK_DELAY: Duration = Duration::from_secs(4); @@ -65,6 +70,9 @@ pub const QUEUED_RECONSTRUCTION_DELAY: Duration = Duration::from_millis(150); /// it's nice to have extra protection. const MAXIMUM_QUEUED_BLOCKS: usize = 16; +/// Set an arbitrary upper-bound on the number of queued envelopes to avoid DoS attacks. +const MAXIMUM_QUEUED_ENVELOPES: usize = 16; + /// How many attestations we keep before new ones get dropped. const MAXIMUM_QUEUED_ATTESTATIONS: usize = 16_384; @@ -93,6 +101,8 @@ pub const RECONSTRUCTION_DEADLINE: (u64, u64) = (1, 4); pub enum ReprocessQueueMessage { /// A block that has been received early and we should queue for later processing. EarlyBlock(QueuedGossipBlock), + /// An execution payload envelope that references a block not yet in fork choice. + UnknownBlockEnvelope(QueuedGossipEnvelope), /// A gossip block for hash `X` is being imported, we should queue the rpc block for the same /// hash until the gossip block is imported. RpcBlock(QueuedRpcBlock), @@ -120,6 +130,7 @@ pub enum ReprocessQueueMessage { /// Events sent by the scheduler once they are ready for re-processing. pub enum ReadyWork { Block(QueuedGossipBlock), + Envelope(QueuedGossipEnvelope), RpcBlock(QueuedRpcBlock), IgnoredRpcBlock(IgnoredRpcBlock), Unaggregate(QueuedUnaggregate), @@ -157,6 +168,13 @@ pub struct QueuedGossipBlock { pub process_fn: AsyncFn, } +/// An execution payload envelope that arrived early and has been queued for later import. +pub struct QueuedGossipEnvelope { + pub beacon_block_slot: Slot, + pub beacon_block_root: Hash256, + pub process_fn: AsyncFn, +} + /// A block that arrived for processing when the same block was being imported over gossip. /// It is queued for later import. pub struct QueuedRpcBlock { @@ -209,6 +227,8 @@ impl From for WorkEvent { enum InboundEvent { /// A gossip block that was queued for later processing and is ready for import. ReadyGossipBlock(QueuedGossipBlock), + /// An envelope whose block has been imported and is now ready for processing. + ReadyEnvelope(Hash256), /// A rpc block that was queued because the same gossip block was being imported /// will now be retried for import. ReadyRpcBlock(QueuedRpcBlock), @@ -234,6 +254,8 @@ struct ReprocessQueue { /* Queues */ /// Queue to manage scheduled early blocks. gossip_block_delay_queue: DelayQueue, + /// Queue to manage envelope timeouts (keyed by block root). + envelope_delay_queue: DelayQueue, /// Queue to manage scheduled early blocks. rpc_block_delay_queue: DelayQueue, /// Queue to manage scheduled attestations. @@ -246,6 +268,8 @@ struct ReprocessQueue { /* Queued items */ /// Queued blocks. queued_gossip_block_roots: HashSet, + /// Queued envelopes awaiting their block, keyed by block root. + awaiting_envelopes_per_root: HashMap, /// Queued aggregated attestations. queued_aggregates: FnvHashMap, /// Queued attestations. @@ -266,6 +290,7 @@ struct ReprocessQueue { next_attestation: usize, next_lc_update: usize, early_block_debounce: TimeLatch, + envelope_delay_debounce: TimeLatch, rpc_block_debounce: TimeLatch, attestation_delay_debounce: TimeLatch, lc_update_delay_debounce: TimeLatch, @@ -315,6 +340,13 @@ impl Stream for ReprocessQueue { Poll::Ready(None) | Poll::Pending => (), } + match self.envelope_delay_queue.poll_expired(cx) { + Poll::Ready(Some(block_root)) => { + return Poll::Ready(Some(InboundEvent::ReadyEnvelope(block_root.into_inner()))); + } + Poll::Ready(None) | Poll::Pending => (), + } + match self.rpc_block_delay_queue.poll_expired(cx) { Poll::Ready(Some(queued_block)) => { return Poll::Ready(Some(InboundEvent::ReadyRpcBlock(queued_block.into_inner()))); @@ -418,11 +450,13 @@ impl ReprocessQueue { work_reprocessing_rx, ready_work_tx, gossip_block_delay_queue: DelayQueue::new(), + envelope_delay_queue: DelayQueue::new(), rpc_block_delay_queue: DelayQueue::new(), attestations_delay_queue: DelayQueue::new(), lc_updates_delay_queue: DelayQueue::new(), column_reconstructions_delay_queue: DelayQueue::new(), queued_gossip_block_roots: HashSet::new(), + awaiting_envelopes_per_root: HashMap::new(), queued_lc_updates: FnvHashMap::default(), queued_aggregates: FnvHashMap::default(), queued_unaggregates: FnvHashMap::default(), @@ -433,6 +467,7 @@ impl ReprocessQueue { next_attestation: 0, next_lc_update: 0, early_block_debounce: TimeLatch::default(), + envelope_delay_debounce: TimeLatch::default(), rpc_block_debounce: TimeLatch::default(), attestation_delay_debounce: TimeLatch::default(), lc_update_delay_debounce: TimeLatch::default(), @@ -498,6 +533,39 @@ impl ReprocessQueue { } } } + // An envelope that references an unknown block. Queue it until the block is + // imported, or until the timeout expires. + InboundEvent::Msg(UnknownBlockEnvelope(queued_envelope)) => { + let block_root = queued_envelope.beacon_block_root; + + // Don't add the same envelope to the queue twice. This prevents DoS attacks. + if self.awaiting_envelopes_per_root.contains_key(&block_root) { + return; + } + + // Check to ensure this won't over-fill the queue. + if self.awaiting_envelopes_per_root.len() >= MAXIMUM_QUEUED_ENVELOPES { + if self.envelope_delay_debounce.elapsed() { + warn!( + queue_size = MAXIMUM_QUEUED_ENVELOPES, + msg = "system resources may be saturated", + "Envelope delay queue is full" + ); + } + // Drop the envelope. + return; + } + + // Register the timeout. + let delay_key = self.envelope_delay_queue.insert( + block_root, + self.slot_clock.slot_duration() * QUEUED_ENVELOPE_DELAY_SLOTS, + ); + + // Store the envelope keyed by block root. + self.awaiting_envelopes_per_root + .insert(block_root, (queued_envelope, delay_key)); + } // A rpc block arrived for processing at the same time when a gossip block // for the same block hash is being imported. We wait for `QUEUED_RPC_BLOCK_DELAY` // and then send the rpc block back for processing assuming the gossip import @@ -647,6 +715,23 @@ impl ReprocessQueue { block_root, parent_root, }) => { + // Unqueue the envelope we have for this root, if any. + if let Some((envelope, delay_key)) = + self.awaiting_envelopes_per_root.remove(&block_root) + { + self.envelope_delay_queue.remove(&delay_key); + if self + .ready_work_tx + .try_send(ReadyWork::Envelope(envelope)) + .is_err() + { + error!( + ?block_root, + "Failed to send envelope for reprocessing after block import" + ); + } + } + // Unqueue the attestations we have for this root, if any. if let Some(queued_ids) = self.awaiting_attestations_per_root.remove(&block_root) { let mut sent_count = 0; @@ -802,6 +887,21 @@ impl ReprocessQueue { error!("Failed to pop queued block"); } } + // An envelope's timeout has expired. Send it for processing regardless of + // whether the block has been imported. + InboundEvent::ReadyEnvelope(block_root) => { + if let Some((envelope, _delay_key)) = + self.awaiting_envelopes_per_root.remove(&block_root) + { + if self + .ready_work_tx + .try_send(ReadyWork::Envelope(envelope)) + .is_err() + { + error!(?block_root, "Failed to send envelope after timeout"); + } + } + } InboundEvent::ReadyAttestation(queued_id) => { metrics::inc_counter( &metrics::BEACON_PROCESSOR_REPROCESSING_QUEUE_EXPIRED_ATTESTATIONS, @@ -941,6 +1041,11 @@ impl ReprocessQueue { &[GOSSIP_BLOCKS], self.gossip_block_delay_queue.len() as i64, ); + metrics::set_gauge_vec( + &metrics::BEACON_PROCESSOR_REPROCESSING_QUEUE_TOTAL, + &[GOSSIP_ENVELOPES], + self.awaiting_envelopes_per_root.len() as i64, + ); metrics::set_gauge_vec( &metrics::BEACON_PROCESSOR_REPROCESSING_QUEUE_TOTAL, &[RPC_BLOCKS], diff --git a/beacon_node/network/src/network_beacon_processor/gossip_methods.rs b/beacon_node/network/src/network_beacon_processor/gossip_methods.rs index f8636f5429..ce6c69ff0c 100644 --- a/beacon_node/network/src/network_beacon_processor/gossip_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/gossip_methods.rs @@ -49,8 +49,8 @@ use beacon_processor::work_reprocessing_queue::QueuedColumnReconstruction; use beacon_processor::{ DuplicateCache, GossipAggregatePackage, GossipAttestationBatch, work_reprocessing_queue::{ - QueuedAggregate, QueuedGossipBlock, QueuedLightClientUpdate, QueuedUnaggregate, - ReprocessQueueMessage, + QueuedAggregate, QueuedGossipBlock, QueuedGossipEnvelope, QueuedLightClientUpdate, + QueuedUnaggregate, ReprocessQueueMessage, }, }; @@ -3347,7 +3347,7 @@ impl NetworkBeaconProcessor { // TODO(gloas) update metrics to note how early the envelope arrived let inner_self = self.clone(); - let _process_fn = Box::pin(async move { + let process_fn = Box::pin(async move { inner_self .process_gossip_verified_execution_payload_envelope( peer_id, @@ -3356,7 +3356,27 @@ impl NetworkBeaconProcessor { .await; }); - // TODO(gloas) send to reprocess queue + if self + .beacon_processor_send + .try_send(WorkEvent { + drop_during_sync: false, + work: Work::Reprocess(ReprocessQueueMessage::UnknownBlockEnvelope( + QueuedGossipEnvelope { + beacon_block_slot: envelope_slot, + beacon_block_root, + process_fn, + }, + )), + }) + .is_err() + { + error!( + %envelope_slot, + ?beacon_block_root, + location = "envelope gossip", + "Failed to defer envelope import" + ); + } None } Ok(_) => Some(verified_envelope),