diff --git a/beacon_node/beacon_chain/src/metrics.rs b/beacon_node/beacon_chain/src/metrics.rs index 775f5a3df0..1ad325ebe1 100644 --- a/beacon_node/beacon_chain/src/metrics.rs +++ b/beacon_node/beacon_chain/src/metrics.rs @@ -22,7 +22,7 @@ pub const VALIDATOR_MONITOR_ATTESTATION_SIMULATOR_SOURCE_ATTESTER_MISS_TOTAL: &s "validator_monitor_attestation_simulator_source_attester_miss_total"; /* -* Execution Payload Envelope Procsesing +* Execution Payload Envelope Processing */ pub static ENVELOPE_PROCESSING_REQUESTS: LazyLock> = LazyLock::new(|| { diff --git a/beacon_node/beacon_chain/src/payload_envelope_verification/execution_pending_envelope.rs b/beacon_node/beacon_chain/src/payload_envelope_verification/execution_pending_envelope.rs index 6e4e7105e1..4a2b152703 100644 --- a/beacon_node/beacon_chain/src/payload_envelope_verification/execution_pending_envelope.rs +++ b/beacon_node/beacon_chain/src/payload_envelope_verification/execution_pending_envelope.rs @@ -5,7 +5,7 @@ use state_processing::{ VerifySignatures, envelope_processing::{VerifyStateRoot, process_execution_payload_envelope}, }; -use types::{EthSpec, SignedExecutionPayloadEnvelope}; +use types::EthSpec; use crate::{ BeaconChain, BeaconChainError, BeaconChainTypes, NotifyExecutionLayer, @@ -18,24 +18,14 @@ use crate::{ }, }; -pub trait IntoExecutionPendingEnvelope: Sized { - fn into_execution_pending_envelope( - self, - chain: &Arc>, - notify_execution_layer: NotifyExecutionLayer, - ) -> Result, EnvelopeError>; - - fn envelope(&self) -> &Arc>; -} - pub struct ExecutionPendingEnvelope { pub signed_envelope: MaybeAvailableEnvelope, pub import_data: EnvelopeImportData, pub payload_verification_handle: PayloadVerificationHandle, } -impl IntoExecutionPendingEnvelope for GossipVerifiedEnvelope { - fn into_execution_pending_envelope( +impl GossipVerifiedEnvelope { + pub fn into_execution_pending_envelope( self, chain: &Arc>, notify_execution_layer: NotifyExecutionLayer, @@ -116,25 +106,4 @@ impl IntoExecutionPendingEnvelope for GossipVerifiedEnve payload_verification_handle, }) } - - fn envelope(&self) -> &Arc> { - &self.signed_envelope - } -} - -impl IntoExecutionPendingEnvelope - for Arc> -{ - fn into_execution_pending_envelope( - self, - chain: &Arc>, - notify_execution_layer: NotifyExecutionLayer, - ) -> Result, EnvelopeError> { - GossipVerifiedEnvelope::new(self, &chain.gossip_verification_context())? - .into_execution_pending_envelope(chain, notify_execution_layer) - } - - fn envelope(&self) -> &Arc> { - self - } } diff --git a/beacon_node/beacon_chain/src/payload_envelope_verification/gossip_verified_envelope.rs b/beacon_node/beacon_chain/src/payload_envelope_verification/gossip_verified_envelope.rs index 8c8ee57fb4..7b33d519e5 100644 --- a/beacon_node/beacon_chain/src/payload_envelope_verification/gossip_verified_envelope.rs +++ b/beacon_node/beacon_chain/src/payload_envelope_verification/gossip_verified_envelope.rs @@ -248,7 +248,7 @@ impl BeaconChain { /// /// ## Errors /// - /// Returns an `Err` if the given envelope was invalid, or an error was encountered during + /// Returns an `Err` if the given envelope was invalid, or an error was encountered during verification. pub async fn verify_envelope_for_gossip( self: &Arc, envelope: Arc>, diff --git a/beacon_node/beacon_chain/src/payload_envelope_verification/import.rs b/beacon_node/beacon_chain/src/payload_envelope_verification/import.rs index 9387156fc2..39dc1a4451 100644 --- a/beacon_node/beacon_chain/src/payload_envelope_verification/import.rs +++ b/beacon_node/beacon_chain/src/payload_envelope_verification/import.rs @@ -10,14 +10,14 @@ use types::{BeaconState, BlockImportSource, Hash256, SignedBeaconBlock, Slot}; use super::{ AvailableEnvelope, AvailableExecutedEnvelope, EnvelopeError, EnvelopeImportData, - ExecutedEnvelope, IntoExecutionPendingEnvelope, MaybeAvailableEnvelope, + ExecutedEnvelope, gossip_verified_envelope::GossipVerifiedEnvelope, }; use crate::{ AvailabilityProcessingStatus, BeaconChain, BeaconChainError, BeaconChainTypes, NotifyExecutionLayer, block_verification_types::{AsBlock, AvailableBlockData}, metrics, - payload_envelope_verification::ExecutionPendingEnvelope, + payload_envelope_verification::{ExecutionPendingEnvelope, MaybeAvailableEnvelope}, validator_monitor::{get_slot_delay_ms, timestamp_now}, }; use eth2::types::{EventKind, SseExecutionPayloadAvailable}; @@ -26,25 +26,20 @@ impl BeaconChain { /// Returns `Ok(block_root)` if the given `unverified_envelope` was successfully verified and /// imported into the chain. /// - /// Items that implement `IntoExecutionPendingEnvelope` include: - /// - /// - `GossipVerifiedEnvelope` - /// - TODO(gloas) implement for envelopes recieved over RPC - /// /// ## Errors /// - /// Returns an `Err` if the given block was invalid, or an error was encountered during + /// Returns an `Err` if the given payload envelope was invalid, or an error was encountered during /// verification. #[instrument(skip_all, fields(block_root = ?block_root, block_source = %block_source))] - pub async fn process_execution_payload_envelope>( + pub async fn process_execution_payload_envelope( self: &Arc, block_root: Hash256, - unverified_envelope: P, + unverified_envelope: GossipVerifiedEnvelope, notify_execution_layer: NotifyExecutionLayer, block_source: BlockImportSource, publish_fn: impl FnOnce() -> Result<(), EnvelopeError>, ) -> Result { - let block_slot = unverified_envelope.envelope().slot(); + let block_slot = unverified_envelope.signed_envelope.slot(); // Set observed time if not already set. Usually this should be set by gossip or RPC, // but just in case we set it again here (useful for tests). @@ -110,9 +105,9 @@ impl BeaconChain { } }; - // Verify and import the block. + // Verify and import the payload envelope. match import_envelope.await { - // The block was successfully verified and imported. Yay. + // The payload envelope was successfully verified and imported. Yay. Ok(status @ AvailabilityProcessingStatus::Imported(block_root)) => { info!( ?block_root, @@ -126,7 +121,7 @@ impl BeaconChain { Ok(status) } Ok(status @ AvailabilityProcessingStatus::MissingComponents(slot, block_root)) => { - debug!(?block_root, %slot, "Beacon block awaiting blobs"); + debug!(?block_root, %slot, "Payload envelope awaiting blobs"); Ok(status) } @@ -139,7 +134,7 @@ impl BeaconChain { ); } _ => { - // There was an error whilst attempting to verify and import the block. The block might + // There was an error whilst attempting to verify and import the payload envelope. It might // be partially verified or partially imported. crit!( error = ?e, @@ -149,7 +144,7 @@ impl BeaconChain { }; Err(EnvelopeError::BeaconChainError(e)) } - // The block failed verification. + // The payload envelope failed verification. Err(other) => { warn!( reason = other.to_string(), @@ -165,7 +160,7 @@ impl BeaconChain { /// /// An error is returned if the verification handle couldn't be awaited. #[instrument(skip_all, level = "debug")] - pub async fn into_executed_payload_envelope( + async fn into_executed_payload_envelope( self: Arc, pending_envelope: ExecutionPendingEnvelope, ) -> Result, EnvelopeError> { @@ -278,6 +273,8 @@ impl BeaconChain { return Err(EnvelopeError::BlockRootUnknown { block_root }); } + + // TODO(gloas) no fork choice logic yet // Take an exclusive write-lock on fork choice. It's very important to prevent deadlocks by // avoiding taking other locks whilst holding this lock. let mut fork_choice = parking_lot::RwLockUpgradableReadGuard::upgrade(fork_choice_reader); @@ -349,6 +346,7 @@ impl BeaconChain { error = ?e, "Database write failed!" ); + return Err(e.into()); // TODO(gloas) handle db write failure // return Err(self // .handle_import_block_db_write_error(fork_choice) diff --git a/beacon_node/beacon_chain/src/payload_envelope_verification/mod.rs b/beacon_node/beacon_chain/src/payload_envelope_verification/mod.rs index 1a5827c433..12019a436d 100644 --- a/beacon_node/beacon_chain/src/payload_envelope_verification/mod.rs +++ b/beacon_node/beacon_chain/src/payload_envelope_verification/mod.rs @@ -3,27 +3,18 @@ //! types, starting at a `SignedExecutionPayloadEnvelope` and finishing with an `AvailableExecutedEnvelope` (see //! diagram below). //! -//! // TODO(gloas) we might want to update this diagram to include `AvailabelExecutedEnvelope` //! ```ignore -//! START -//! | -//! ▼ //! SignedExecutionPayloadEnvelope //! | -//! |--------------- -//! | | -//! | ▼ -//! | GossipVerifiedEnvelope -//! | | -//! |--------------- +//! ▼ +//! GossipVerifiedEnvelope //! | //! ▼ //! ExecutionPendingEnvelope //! | //! await -//! | //! ▼ -//! END +//! ExecutedEnvelope //! //! ``` @@ -48,7 +39,7 @@ pub mod gossip_verified_envelope; pub mod import; mod payload_notifier; -pub use execution_pending_envelope::{ExecutionPendingEnvelope, IntoExecutionPendingEnvelope}; +pub use execution_pending_envelope::ExecutionPendingEnvelope; #[derive(PartialEq)] pub struct EnvelopeImportData { @@ -63,7 +54,7 @@ pub struct AvailableEnvelope { execution_block_hash: ExecutionBlockHash, envelope: Arc>, columns: DataColumnSidecarList, - /// Timestamp at which this block first became available (UNIX timestamp, time since 1970). + /// Timestamp at which this envelope first became available (UNIX timestamp, time since 1970). columns_available_timestamp: Option, pub spec: Arc, } @@ -111,7 +102,7 @@ pub enum MaybeAvailableEnvelope { }, } -/// This snapshot is to be used for verifying a envelope of the block. +/// This snapshot is to be used for verifying a payload envelope. #[derive(Debug, Clone)] pub struct EnvelopeProcessingSnapshot { /// This state is equivalent to the `self.beacon_block.state_root()` before applying the envelope. @@ -183,54 +174,41 @@ impl AvailableExecutedEnvelope { #[derive(Debug)] pub enum EnvelopeError { /// The envelope's block root is unknown. - BlockRootUnknown { - block_root: Hash256, - }, + BlockRootUnknown { block_root: Hash256 }, /// The signature is invalid. BadSignature, /// The builder index doesn't match the committed bid - BuilderIndexMismatch { - committed_bid: u64, - envelope: u64, - }, - // The envelope slot doesn't match the block - SlotMismatch { - block: Slot, - envelope: Slot, - }, - // The validator index is unknown - UnknownValidator { - builder_index: u64, - }, - // The block hash doesn't match the committed bid + BuilderIndexMismatch { committed_bid: u64, envelope: u64 }, + /// The envelope slot doesn't match the block + SlotMismatch { block: Slot, envelope: Slot }, + /// The validator index is unknown + UnknownValidator { builder_index: u64 }, + /// The block hash doesn't match the committed bid BlockHashMismatch { committed_bid: ExecutionBlockHash, envelope: ExecutionBlockHash, }, - // The block's proposer_index does not match the locally computed proposer - IncorrectBlockProposer { - block: u64, - local_shuffling: u64, - }, - // The slot belongs to a block that is from a slot prior than - // the most recently finalized slot + /// The block's proposer_index does not match the locally computed proposer + IncorrectBlockProposer { block: u64, local_shuffling: u64 }, + /// The slot belongs to a block that is from a slot prior than + /// to most recently finalized slot PriorToFinalization { payload_slot: Slot, latest_finalized_slot: Slot, }, - // Some Beacon Chain Error + /// Some Beacon Chain Error BeaconChainError(Arc), - // Some Beacon State error + /// Some Beacon State error BeaconStateError(BeaconStateError), - // Some BlockProcessingError (for electra operations) + /// Some BlockProcessingError (for electra operations) BlockProcessingError(BlockProcessingError), - // Some EnvelopeProcessingError + /// Some EnvelopeProcessingError EnvelopeProcessingError(EnvelopeProcessingError), - // Error verifying the execution payload + /// Error verifying the execution payload ExecutionPayloadError(ExecutionPayloadError), - // An error from block-level checks reused during envelope import + /// An error from block-level checks reused during envelope import BlockError(BlockError), - // Internal error + /// Internal error InternalError(String), } diff --git a/beacon_node/beacon_processor/src/lib.rs b/beacon_node/beacon_processor/src/lib.rs index 33a00bfa49..c33f4840e0 100644 --- a/beacon_node/beacon_processor/src/lib.rs +++ b/beacon_node/beacon_processor/src/lib.rs @@ -41,7 +41,8 @@ pub use crate::scheduler::BeaconProcessorQueueLengths; use crate::scheduler::work_queue::WorkQueues; use crate::work_reprocessing_queue::{ - QueuedBackfillBatch, QueuedColumnReconstruction, QueuedGossipBlock, ReprocessQueueMessage, + QueuedBackfillBatch, QueuedColumnReconstruction, QueuedGossipBlock, QueuedGossipEnvelope, + ReprocessQueueMessage, }; use futures::stream::{Stream, StreamExt}; use futures::task::Poll; @@ -242,6 +243,18 @@ impl From for WorkEvent { process_fn, }, }, + ReadyWork::Envelope(QueuedGossipEnvelope { + beacon_block_slot, + beacon_block_root, + process_fn, + }) => Self { + drop_during_sync: false, + work: Work::DelayedImportEnvelope { + beacon_block_slot, + beacon_block_root, + process_fn, + }, + }, ReadyWork::RpcBlock(QueuedRpcBlock { beacon_block_root, process_fn, @@ -384,6 +397,11 @@ pub enum Work { beacon_block_root: Hash256, process_fn: AsyncFn, }, + DelayedImportEnvelope { + beacon_block_slot: Slot, + beacon_block_root: Hash256, + process_fn: AsyncFn, + }, GossipVoluntaryExit(BlockingFn), GossipProposerSlashing(BlockingFn), GossipAttesterSlashing(BlockingFn), @@ -447,6 +465,7 @@ pub enum WorkType { GossipBlobSidecar, GossipDataColumnSidecar, DelayedImportBlock, + DelayedImportEnvelope, GossipVoluntaryExit, GossipProposerSlashing, GossipAttesterSlashing, @@ -498,6 +517,7 @@ impl Work { Work::GossipBlobSidecar(_) => WorkType::GossipBlobSidecar, Work::GossipDataColumnSidecar(_) => WorkType::GossipDataColumnSidecar, Work::DelayedImportBlock { .. } => WorkType::DelayedImportBlock, + Work::DelayedImportEnvelope { .. } => WorkType::DelayedImportEnvelope, Work::GossipVoluntaryExit(_) => WorkType::GossipVoluntaryExit, Work::GossipProposerSlashing(_) => WorkType::GossipProposerSlashing, Work::GossipAttesterSlashing(_) => WorkType::GossipAttesterSlashing, @@ -793,6 +813,8 @@ impl BeaconProcessor { // on the delayed ones. } else if let Some(item) = work_queues.delayed_block_queue.pop() { Some(item) + } else if let Some(item) = work_queues.delayed_envelope_queue.pop() { + Some(item) // Check gossip blocks and payloads before gossip attestations, since a block might be // required to verify some attestations. } else if let Some(item) = work_queues.gossip_block_queue.pop() { @@ -1111,6 +1133,9 @@ impl BeaconProcessor { Work::DelayedImportBlock { .. } => { work_queues.delayed_block_queue.push(work, work_id) } + Work::DelayedImportEnvelope { .. } => { + work_queues.delayed_envelope_queue.push(work, work_id) + } Work::GossipVoluntaryExit { .. } => { work_queues.gossip_voluntary_exit_queue.push(work, work_id) } @@ -1238,6 +1263,7 @@ impl BeaconProcessor { work_queues.gossip_data_column_queue.len() } WorkType::DelayedImportBlock => work_queues.delayed_block_queue.len(), + WorkType::DelayedImportEnvelope => work_queues.delayed_envelope_queue.len(), WorkType::GossipVoluntaryExit => { work_queues.gossip_voluntary_exit_queue.len() } @@ -1435,6 +1461,11 @@ impl BeaconProcessor { beacon_block_slot: _, beacon_block_root: _, process_fn, + } + | Work::DelayedImportEnvelope { + beacon_block_slot: _, + beacon_block_root: _, + process_fn, } => task_spawner.spawn_async(process_fn), Work::RpcBlock { process_fn, diff --git a/beacon_node/beacon_processor/src/scheduler/work_queue.rs b/beacon_node/beacon_processor/src/scheduler/work_queue.rs index 934659b304..e48c776b6d 100644 --- a/beacon_node/beacon_processor/src/scheduler/work_queue.rs +++ b/beacon_node/beacon_processor/src/scheduler/work_queue.rs @@ -127,6 +127,7 @@ pub struct BeaconProcessorQueueLengths { gossip_blob_queue: usize, gossip_data_column_queue: usize, delayed_block_queue: usize, + delayed_envelope_queue: usize, status_queue: usize, block_brange_queue: usize, block_broots_queue: usize, @@ -197,6 +198,7 @@ impl BeaconProcessorQueueLengths { gossip_blob_queue: 1024, gossip_data_column_queue: 1024, delayed_block_queue: 1024, + delayed_envelope_queue: 1024, status_queue: 1024, block_brange_queue: 1024, block_broots_queue: 1024, @@ -250,6 +252,7 @@ pub struct WorkQueues { pub gossip_blob_queue: FifoQueue>, pub gossip_data_column_queue: FifoQueue>, pub delayed_block_queue: FifoQueue>, + pub delayed_envelope_queue: FifoQueue>, pub status_queue: FifoQueue>, pub block_brange_queue: FifoQueue>, pub block_broots_queue: FifoQueue>, @@ -315,6 +318,7 @@ impl WorkQueues { let gossip_blob_queue = FifoQueue::new(queue_lengths.gossip_blob_queue); let gossip_data_column_queue = FifoQueue::new(queue_lengths.gossip_data_column_queue); let delayed_block_queue = FifoQueue::new(queue_lengths.delayed_block_queue); + let delayed_envelope_queue = FifoQueue::new(queue_lengths.delayed_envelope_queue); let status_queue = FifoQueue::new(queue_lengths.status_queue); let block_brange_queue = FifoQueue::new(queue_lengths.block_brange_queue); @@ -375,6 +379,7 @@ impl WorkQueues { gossip_blob_queue, gossip_data_column_queue, delayed_block_queue, + delayed_envelope_queue, status_queue, block_brange_queue, block_broots_queue, diff --git a/beacon_node/beacon_processor/src/scheduler/work_reprocessing_queue.rs b/beacon_node/beacon_processor/src/scheduler/work_reprocessing_queue.rs index c99388287c..6cb82a2906 100644 --- a/beacon_node/beacon_processor/src/scheduler/work_reprocessing_queue.rs +++ b/beacon_node/beacon_processor/src/scheduler/work_reprocessing_queue.rs @@ -35,6 +35,7 @@ use types::{EthSpec, Hash256, Slot}; const TASK_NAME: &str = "beacon_processor_reprocess_queue"; const GOSSIP_BLOCKS: &str = "gossip_blocks"; +const GOSSIP_ENVELOPES: &str = "gossip_envelopes"; const RPC_BLOCKS: &str = "rpc_blocks"; const ATTESTATIONS: &str = "attestations"; const ATTESTATIONS_PER_ROOT: &str = "attestations_per_root"; @@ -51,6 +52,10 @@ pub const QUEUED_ATTESTATION_DELAY: Duration = Duration::from_secs(12); /// For how long to queue light client updates for re-processing. pub const QUEUED_LIGHT_CLIENT_UPDATE_DELAY: Duration = Duration::from_secs(12); +/// Envelope timeout as a multiplier of slot duration. Envelopes waiting for their block will be +/// sent for processing after this many slots worth of time, even if the block hasn't arrived. +const QUEUED_ENVELOPE_DELAY_SLOTS: u32 = 1; + /// For how long to queue rpc blocks before sending them back for reprocessing. pub const QUEUED_RPC_BLOCK_DELAY: Duration = Duration::from_secs(4); @@ -65,6 +70,9 @@ pub const QUEUED_RECONSTRUCTION_DELAY: Duration = Duration::from_millis(150); /// it's nice to have extra protection. const MAXIMUM_QUEUED_BLOCKS: usize = 16; +/// Set an arbitrary upper-bound on the number of queued envelopes to avoid DoS attacks. +const MAXIMUM_QUEUED_ENVELOPES: usize = 16; + /// How many attestations we keep before new ones get dropped. const MAXIMUM_QUEUED_ATTESTATIONS: usize = 16_384; @@ -93,6 +101,8 @@ pub const RECONSTRUCTION_DEADLINE: (u64, u64) = (1, 4); pub enum ReprocessQueueMessage { /// A block that has been received early and we should queue for later processing. EarlyBlock(QueuedGossipBlock), + /// An execution payload envelope that references a block not yet in fork choice. + UnknownBlockEnvelope(QueuedGossipEnvelope), /// A gossip block for hash `X` is being imported, we should queue the rpc block for the same /// hash until the gossip block is imported. RpcBlock(QueuedRpcBlock), @@ -120,6 +130,7 @@ pub enum ReprocessQueueMessage { /// Events sent by the scheduler once they are ready for re-processing. pub enum ReadyWork { Block(QueuedGossipBlock), + Envelope(QueuedGossipEnvelope), RpcBlock(QueuedRpcBlock), IgnoredRpcBlock(IgnoredRpcBlock), Unaggregate(QueuedUnaggregate), @@ -157,6 +168,13 @@ pub struct QueuedGossipBlock { pub process_fn: AsyncFn, } +/// An execution payload envelope that arrived early and has been queued for later import. +pub struct QueuedGossipEnvelope { + pub beacon_block_slot: Slot, + pub beacon_block_root: Hash256, + pub process_fn: AsyncFn, +} + /// A block that arrived for processing when the same block was being imported over gossip. /// It is queued for later import. pub struct QueuedRpcBlock { @@ -209,6 +227,8 @@ impl From for WorkEvent { enum InboundEvent { /// A gossip block that was queued for later processing and is ready for import. ReadyGossipBlock(QueuedGossipBlock), + /// An envelope whose block has been imported and is now ready for processing. + ReadyEnvelope(Hash256), /// A rpc block that was queued because the same gossip block was being imported /// will now be retried for import. ReadyRpcBlock(QueuedRpcBlock), @@ -234,6 +254,8 @@ struct ReprocessQueue { /* Queues */ /// Queue to manage scheduled early blocks. gossip_block_delay_queue: DelayQueue, + /// Queue to manage envelope timeouts (keyed by block root). + envelope_delay_queue: DelayQueue, /// Queue to manage scheduled early blocks. rpc_block_delay_queue: DelayQueue, /// Queue to manage scheduled attestations. @@ -246,6 +268,8 @@ struct ReprocessQueue { /* Queued items */ /// Queued blocks. queued_gossip_block_roots: HashSet, + /// Queued envelopes awaiting their block, keyed by block root. + awaiting_envelopes_per_root: HashMap, /// Queued aggregated attestations. queued_aggregates: FnvHashMap, /// Queued attestations. @@ -266,6 +290,7 @@ struct ReprocessQueue { next_attestation: usize, next_lc_update: usize, early_block_debounce: TimeLatch, + envelope_delay_debounce: TimeLatch, rpc_block_debounce: TimeLatch, attestation_delay_debounce: TimeLatch, lc_update_delay_debounce: TimeLatch, @@ -315,6 +340,13 @@ impl Stream for ReprocessQueue { Poll::Ready(None) | Poll::Pending => (), } + match self.envelope_delay_queue.poll_expired(cx) { + Poll::Ready(Some(block_root)) => { + return Poll::Ready(Some(InboundEvent::ReadyEnvelope(block_root.into_inner()))); + } + Poll::Ready(None) | Poll::Pending => (), + } + match self.rpc_block_delay_queue.poll_expired(cx) { Poll::Ready(Some(queued_block)) => { return Poll::Ready(Some(InboundEvent::ReadyRpcBlock(queued_block.into_inner()))); @@ -418,11 +450,13 @@ impl ReprocessQueue { work_reprocessing_rx, ready_work_tx, gossip_block_delay_queue: DelayQueue::new(), + envelope_delay_queue: DelayQueue::new(), rpc_block_delay_queue: DelayQueue::new(), attestations_delay_queue: DelayQueue::new(), lc_updates_delay_queue: DelayQueue::new(), column_reconstructions_delay_queue: DelayQueue::new(), queued_gossip_block_roots: HashSet::new(), + awaiting_envelopes_per_root: HashMap::new(), queued_lc_updates: FnvHashMap::default(), queued_aggregates: FnvHashMap::default(), queued_unaggregates: FnvHashMap::default(), @@ -433,6 +467,7 @@ impl ReprocessQueue { next_attestation: 0, next_lc_update: 0, early_block_debounce: TimeLatch::default(), + envelope_delay_debounce: TimeLatch::default(), rpc_block_debounce: TimeLatch::default(), attestation_delay_debounce: TimeLatch::default(), lc_update_delay_debounce: TimeLatch::default(), @@ -498,6 +533,39 @@ impl ReprocessQueue { } } } + // An envelope that references an unknown block. Queue it until the block is + // imported, or until the timeout expires. + InboundEvent::Msg(UnknownBlockEnvelope(queued_envelope)) => { + let block_root = queued_envelope.beacon_block_root; + + // Don't add the same envelope to the queue twice. This prevents DoS attacks. + if self.awaiting_envelopes_per_root.contains_key(&block_root) { + return; + } + + // Check to ensure this won't over-fill the queue. + if self.awaiting_envelopes_per_root.len() >= MAXIMUM_QUEUED_ENVELOPES { + if self.envelope_delay_debounce.elapsed() { + warn!( + queue_size = MAXIMUM_QUEUED_ENVELOPES, + msg = "system resources may be saturated", + "Envelope delay queue is full" + ); + } + // Drop the envelope. + return; + } + + // Register the timeout. + let delay_key = self.envelope_delay_queue.insert( + block_root, + self.slot_clock.slot_duration() * QUEUED_ENVELOPE_DELAY_SLOTS, + ); + + // Store the envelope keyed by block root. + self.awaiting_envelopes_per_root + .insert(block_root, (queued_envelope, delay_key)); + } // A rpc block arrived for processing at the same time when a gossip block // for the same block hash is being imported. We wait for `QUEUED_RPC_BLOCK_DELAY` // and then send the rpc block back for processing assuming the gossip import @@ -647,6 +715,23 @@ impl ReprocessQueue { block_root, parent_root, }) => { + // Unqueue the envelope we have for this root, if any. + if let Some((envelope, delay_key)) = + self.awaiting_envelopes_per_root.remove(&block_root) + { + self.envelope_delay_queue.remove(&delay_key); + if self + .ready_work_tx + .try_send(ReadyWork::Envelope(envelope)) + .is_err() + { + error!( + ?block_root, + "Failed to send envelope for reprocessing after block import" + ); + } + } + // Unqueue the attestations we have for this root, if any. if let Some(queued_ids) = self.awaiting_attestations_per_root.remove(&block_root) { let mut sent_count = 0; @@ -802,6 +887,21 @@ impl ReprocessQueue { error!("Failed to pop queued block"); } } + // An envelope's timeout has expired. Send it for processing regardless of + // whether the block has been imported. + InboundEvent::ReadyEnvelope(block_root) => { + if let Some((envelope, _delay_key)) = + self.awaiting_envelopes_per_root.remove(&block_root) + { + if self + .ready_work_tx + .try_send(ReadyWork::Envelope(envelope)) + .is_err() + { + error!(?block_root, "Failed to send envelope after timeout"); + } + } + } InboundEvent::ReadyAttestation(queued_id) => { metrics::inc_counter( &metrics::BEACON_PROCESSOR_REPROCESSING_QUEUE_EXPIRED_ATTESTATIONS, @@ -941,6 +1041,11 @@ impl ReprocessQueue { &[GOSSIP_BLOCKS], self.gossip_block_delay_queue.len() as i64, ); + metrics::set_gauge_vec( + &metrics::BEACON_PROCESSOR_REPROCESSING_QUEUE_TOTAL, + &[GOSSIP_ENVELOPES], + self.awaiting_envelopes_per_root.len() as i64, + ); metrics::set_gauge_vec( &metrics::BEACON_PROCESSOR_REPROCESSING_QUEUE_TOTAL, &[RPC_BLOCKS], diff --git a/beacon_node/network/src/network_beacon_processor/gossip_methods.rs b/beacon_node/network/src/network_beacon_processor/gossip_methods.rs index f8636f5429..ce6c69ff0c 100644 --- a/beacon_node/network/src/network_beacon_processor/gossip_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/gossip_methods.rs @@ -49,8 +49,8 @@ use beacon_processor::work_reprocessing_queue::QueuedColumnReconstruction; use beacon_processor::{ DuplicateCache, GossipAggregatePackage, GossipAttestationBatch, work_reprocessing_queue::{ - QueuedAggregate, QueuedGossipBlock, QueuedLightClientUpdate, QueuedUnaggregate, - ReprocessQueueMessage, + QueuedAggregate, QueuedGossipBlock, QueuedGossipEnvelope, QueuedLightClientUpdate, + QueuedUnaggregate, ReprocessQueueMessage, }, }; @@ -3347,7 +3347,7 @@ impl NetworkBeaconProcessor { // TODO(gloas) update metrics to note how early the envelope arrived let inner_self = self.clone(); - let _process_fn = Box::pin(async move { + let process_fn = Box::pin(async move { inner_self .process_gossip_verified_execution_payload_envelope( peer_id, @@ -3356,7 +3356,27 @@ impl NetworkBeaconProcessor { .await; }); - // TODO(gloas) send to reprocess queue + if self + .beacon_processor_send + .try_send(WorkEvent { + drop_during_sync: false, + work: Work::Reprocess(ReprocessQueueMessage::UnknownBlockEnvelope( + QueuedGossipEnvelope { + beacon_block_slot: envelope_slot, + beacon_block_root, + process_fn, + }, + )), + }) + .is_err() + { + error!( + %envelope_slot, + ?beacon_block_root, + location = "envelope gossip", + "Failed to defer envelope import" + ); + } None } Ok(_) => Some(verified_envelope), diff --git a/validator_client/validator_services/src/block_service.rs b/validator_client/validator_services/src/block_service.rs index 27b5659f1d..f6d3bc910f 100644 --- a/validator_client/validator_services/src/block_service.rs +++ b/validator_client/validator_services/src/block_service.rs @@ -10,7 +10,6 @@ use std::fmt::Debug; use std::future::Future; use std::ops::Deref; use std::sync::Arc; -use std::thread::sleep; use std::time::Duration; use task_executor::TaskExecutor; use tokio::sync::mpsc; @@ -622,8 +621,6 @@ impl BlockService { ) .await?; - sleep(Duration::from_secs(4)); - // TODO(gloas) we only need to fetch, sign and publish the envelope in the local building case. // Right now we always default to local building. Once we implement trustless/trusted builder logic // we should check the bid for index == BUILDER_INDEX_SELF_BUILD