diff --git a/beacon_node/beacon_processor/src/scheduler/work_reprocessing_queue.rs b/beacon_node/beacon_processor/src/scheduler/work_reprocessing_queue.rs index 56509c93f8..d15a3c2752 100644 --- a/beacon_node/beacon_processor/src/scheduler/work_reprocessing_queue.rs +++ b/beacon_node/beacon_processor/src/scheduler/work_reprocessing_queue.rs @@ -65,6 +65,13 @@ pub const QUEUED_SAMPLING_REQUESTS_DELAY: Duration = Duration::from_secs(12); /// For how long to queue delayed column reconstruction. pub const QUEUED_RECONSTRUCTION_DELAY: Duration = Duration::from_millis(150); +/// Maximum number of times an envelope can be retried for transient EL errors. +const MAX_ENVELOPE_RETRIES: u8 = 3; + +/// Fallback timeout multiplier for retry envelopes (in slots). If no BlockImported event +/// triggers the retry within this duration, the envelope is dispatched anyway. +const RETRY_ENVELOPE_TIMEOUT_SLOTS: u32 = 1; + /// Set an arbitrary upper-bound on the number of queued blocks to avoid DoS attacks. The fact that /// we signature-verify blocks before putting them in the queue *should* protect against this, but /// it's nice to have extra protection. @@ -105,6 +112,8 @@ pub enum ReprocessQueueMessage { EarlyEnvelope(QueuedGossipEnvelope), /// An execution payload envelope that references a block not yet in fork choice. UnknownBlockForEnvelope(QueuedGossipEnvelope), + /// An execution payload envelope whose EL verification failed transiently and should be retried. + RetryEnvelope(QueuedGossipEnvelope), /// A gossip block for hash `X` is being imported, we should queue the rpc block for the same /// hash until the gossip block is imported. RpcBlock(QueuedRpcBlock), @@ -244,6 +253,8 @@ enum InboundEvent { ReadyBackfillSync(QueuedBackfillBatch), /// A column reconstruction that was queued is ready for processing. ReadyColumnReconstruction(QueuedColumnReconstruction), + /// A retry envelope's fallback timeout expired; dispatch it regardless. + ReadyRetryEnvelope(Hash256), /// A message sent to the `ReprocessQueue` Msg(ReprocessQueueMessage), } @@ -288,6 +299,11 @@ struct ReprocessQueue { awaiting_lc_updates_per_parent_root: HashMap>, /// Column reconstruction per block root. queued_column_reconstructions: HashMap, + /// Envelopes awaiting retry after a transient EL error, keyed by block root. + /// Dispatched when a `BlockImported` event fires, or after a fallback timeout. + retry_envelopes_per_root: HashMap, + /// Delay queue for retry envelope fallback timeouts (keyed by block root). + retry_envelope_delay_queue: DelayQueue, /// Queued backfill batches queued_backfill_batches: Vec, @@ -403,6 +419,15 @@ impl Stream for ReprocessQueue { Poll::Ready(None) | Poll::Pending => (), } + match self.retry_envelope_delay_queue.poll_expired(cx) { + Poll::Ready(Some(block_root)) => { + return Poll::Ready(Some(InboundEvent::ReadyRetryEnvelope( + block_root.into_inner(), + ))); + } + Poll::Ready(None) | Poll::Pending => (), + } + if let Some(next_backfill_batch_event) = self.next_backfill_batch_event.as_mut() { match next_backfill_batch_event.as_mut().poll(cx) { Poll::Ready(_) => { @@ -481,6 +506,8 @@ impl ReprocessQueue { awaiting_lc_updates_per_parent_root: HashMap::new(), queued_backfill_batches: Vec::new(), queued_column_reconstructions: HashMap::new(), + retry_envelopes_per_root: HashMap::new(), + retry_envelope_delay_queue: DelayQueue::new(), next_attestation: 0, next_lc_update: 0, early_block_debounce: TimeLatch::default(), @@ -628,6 +655,56 @@ impl ReprocessQueue { // for the same block hash is being imported. We wait for `QUEUED_RPC_BLOCK_DELAY` // and then send the rpc block back for processing assuming the gossip import // has completed by then. + InboundEvent::Msg(RetryEnvelope(queued_envelope)) => { + let block_root = queued_envelope.beacon_block_root; + + // Check if we already have a retry pending for this root. + if let Some((_existing, _delay_key, count)) = + self.retry_envelopes_per_root.get(&block_root) + { + if *count >= MAX_ENVELOPE_RETRIES { + warn!( + ?block_root, + retries = *count, + "Envelope exceeded max retries for transient EL error, dropping" + ); + return; + } + } + + // Determine retry count from any prior attempt. + let retry_count = self + .retry_envelopes_per_root + .get(&block_root) + .map_or(1, |(_, _, c)| c + 1); + + // Remove any existing entry (and its delay key) before reinserting. + if let Some((_old_envelope, old_delay_key, _)) = + self.retry_envelopes_per_root.remove(&block_root) + { + self.retry_envelope_delay_queue.remove(&old_delay_key); + } + + debug!( + ?block_root, + retry_count, + "Queuing envelope for retry after transient EL error (waiting for BlockImported)" + ); + + // Register a fallback timeout of 1 slot duration. + let fallback_timeout = + self.slot_clock.slot_duration() * RETRY_ENVELOPE_TIMEOUT_SLOTS; + let delay_key = self + .retry_envelope_delay_queue + .insert(block_root, fallback_timeout); + + self.retry_envelopes_per_root + .insert(block_root, (queued_envelope, delay_key, retry_count)); + } + // A rpc block arrived for processing at the same time when a gossip block + // for the same block hash is being imported. We wait for `QUEUED_RPC_BLOCK_DELAY` + // and then send the rpc block back for processing assuming the gossip import + // has completed by then. InboundEvent::Msg(RpcBlock(rpc_block)) => { // Check to ensure this won't over-fill the queue. if self.rpc_block_delay_queue.len() >= MAXIMUM_QUEUED_BLOCKS { @@ -790,6 +867,28 @@ impl ReprocessQueue { } } + // Dispatch any retry envelopes waiting on this block root. + if let Some((envelope, delay_key, retry_count)) = + self.retry_envelopes_per_root.remove(&block_root) + { + self.retry_envelope_delay_queue.remove(&delay_key); + debug!( + ?block_root, + retry_count, + "Dispatching retry envelope after BlockImported" + ); + if self + .ready_work_tx + .try_send(ReadyWork::Envelope(envelope)) + .is_err() + { + error!( + ?block_root, + "Failed to send retry envelope for reprocessing after block import" + ); + } + } + // Unqueue the attestations we have for this root, if any. if let Some(queued_ids) = self.awaiting_attestations_per_root.remove(&block_root) { let mut sent_count = 0; @@ -1107,6 +1206,29 @@ impl ReprocessQueue { ); } } + // Fallback timeout for a retry envelope — dispatch it even though no + // BlockImported arrived. + InboundEvent::ReadyRetryEnvelope(block_root) => { + if let Some((envelope, _delay_key, retry_count)) = + self.retry_envelopes_per_root.remove(&block_root) + { + debug!( + ?block_root, + retry_count, + "Retry envelope fallback timeout expired, dispatching" + ); + if self + .ready_work_tx + .try_send(ReadyWork::Envelope(envelope)) + .is_err() + { + error!( + ?block_root, + "Failed to send retry envelope after fallback timeout" + ); + } + } + } } metrics::set_gauge_vec( diff --git a/beacon_node/network/src/network_beacon_processor/gossip_methods.rs b/beacon_node/network/src/network_beacon_processor/gossip_methods.rs index 826e4a8c50..255a1a2eac 100644 --- a/beacon_node/network/src/network_beacon_processor/gossip_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/gossip_methods.rs @@ -4030,6 +4030,9 @@ impl NetworkBeaconProcessor { ) { let _processing_start_time = Instant::now(); let beacon_block_root = verified_envelope.signed_envelope.beacon_block_root(); + let envelope_slot = verified_envelope.signed_envelope.slot(); + // Keep a clone of the raw envelope in case we need to retry on transient EL errors. + let raw_envelope = verified_envelope.signed_envelope.clone(); #[allow(clippy::result_large_err)] let result = self @@ -4052,7 +4055,86 @@ impl NetworkBeaconProcessor { // Nothing to do } Err(e) => match e { - EnvelopeError::ExecutionPayloadError(epe) if !epe.penalize_peer() => {} + // Transient EL error — queue for retry on next BlockImported. + EnvelopeError::ExecutionPayloadError(epe) if !epe.penalize_peer() => { + warn!( + ?beacon_block_root, + error = ?epe, + "Transient EL error during envelope import, queuing for retry" + ); + + let chain = self.chain.clone(); + let process_fn = Box::pin(async move { + // Re-verify and re-import the envelope from scratch. + match chain.verify_envelope_for_gossip(raw_envelope).await { + Ok(re_verified) => { + let re_block_root = + re_verified.signed_envelope.beacon_block_root(); + let result = chain + .process_execution_payload_envelope( + re_block_root, + re_verified, + NotifyExecutionLayer::Yes, + BlockImportSource::Gossip, + || Ok(()), + ) + .await; + match &result { + Ok(_) => { + debug!( + ?re_block_root, + "Retry envelope imported successfully" + ); + } + Err(e) => { + warn!( + ?re_block_root, + error = ?e, + "Retry envelope failed on re-import" + ); + // On repeated transient failure, the envelope will be + // retried again via the reprocess queue (up to max + // retries), handled by the RetryEnvelope message handler. + if let EnvelopeError::ExecutionPayloadError(epe) = e { + if !epe.penalize_peer() { + // Could retry again, but we let the + // reprocess queue handle max retry logic. + } + } + } + } + } + Err(e) => { + debug!( + error = ?e, + "Retry envelope failed re-verification" + ); + } + } + }); + + if self + .beacon_processor_send + .try_send(WorkEvent { + drop_during_sync: false, + work: Work::Reprocess( + ReprocessQueueMessage::RetryEnvelope( + QueuedGossipEnvelope { + beacon_block_slot: envelope_slot, + beacon_block_root, + process_fn, + }, + ), + ), + }) + .is_err() + { + error!( + ?beacon_block_root, + "Failed to queue envelope for EL retry" + ); + } + } EnvelopeError::BadSignature | EnvelopeError::BuilderIndexMismatch { .. } | EnvelopeError::SlotMismatch { .. }