feat(focil): retry envelope on transient EL errors

When engine_newPayloadV6 fails with a transient error (e.g. Besu's
ConcurrentModificationException), queue the envelope for retry instead
of permanently rejecting it. Matches Lodestar's behavior of retrying
on the next BlockImported event.

- Add RetryEnvelope variant to ReprocessQueueMessage
- On BlockImported, immediately dispatch any pending retry envelopes
- Fallback timeout of 1 slot in case no block arrives
- Max 3 retries per envelope to prevent infinite loops
- Only retry non-penalizing EL errors (transient failures)
This commit is contained in:
Devnet Bot
2026-05-06 14:29:10 +00:00
parent 2388accc78
commit 39b6f58bc2
2 changed files with 205 additions and 1 deletions

View File

@@ -65,6 +65,13 @@ pub const QUEUED_SAMPLING_REQUESTS_DELAY: Duration = Duration::from_secs(12);
/// For how long to queue delayed column reconstruction. /// For how long to queue delayed column reconstruction.
pub const QUEUED_RECONSTRUCTION_DELAY: Duration = Duration::from_millis(150); pub const QUEUED_RECONSTRUCTION_DELAY: Duration = Duration::from_millis(150);
/// Maximum number of times an envelope can be retried for transient EL errors.
const MAX_ENVELOPE_RETRIES: u8 = 3;
/// Fallback timeout multiplier for retry envelopes (in slots). If no BlockImported event
/// triggers the retry within this duration, the envelope is dispatched anyway.
const RETRY_ENVELOPE_TIMEOUT_SLOTS: u32 = 1;
/// Set an arbitrary upper-bound on the number of queued blocks to avoid DoS attacks. The fact that /// Set an arbitrary upper-bound on the number of queued blocks to avoid DoS attacks. The fact that
/// we signature-verify blocks before putting them in the queue *should* protect against this, but /// we signature-verify blocks before putting them in the queue *should* protect against this, but
/// it's nice to have extra protection. /// it's nice to have extra protection.
@@ -105,6 +112,8 @@ pub enum ReprocessQueueMessage {
EarlyEnvelope(QueuedGossipEnvelope), EarlyEnvelope(QueuedGossipEnvelope),
/// An execution payload envelope that references a block not yet in fork choice. /// An execution payload envelope that references a block not yet in fork choice.
UnknownBlockForEnvelope(QueuedGossipEnvelope), UnknownBlockForEnvelope(QueuedGossipEnvelope),
/// An execution payload envelope whose EL verification failed transiently and should be retried.
RetryEnvelope(QueuedGossipEnvelope),
/// A gossip block for hash `X` is being imported, we should queue the rpc block for the same /// A gossip block for hash `X` is being imported, we should queue the rpc block for the same
/// hash until the gossip block is imported. /// hash until the gossip block is imported.
RpcBlock(QueuedRpcBlock), RpcBlock(QueuedRpcBlock),
@@ -244,6 +253,8 @@ enum InboundEvent {
ReadyBackfillSync(QueuedBackfillBatch), ReadyBackfillSync(QueuedBackfillBatch),
/// A column reconstruction that was queued is ready for processing. /// A column reconstruction that was queued is ready for processing.
ReadyColumnReconstruction(QueuedColumnReconstruction), ReadyColumnReconstruction(QueuedColumnReconstruction),
/// A retry envelope's fallback timeout expired; dispatch it regardless.
ReadyRetryEnvelope(Hash256),
/// A message sent to the `ReprocessQueue` /// A message sent to the `ReprocessQueue`
Msg(ReprocessQueueMessage), Msg(ReprocessQueueMessage),
} }
@@ -288,6 +299,11 @@ struct ReprocessQueue<S> {
awaiting_lc_updates_per_parent_root: HashMap<Hash256, Vec<QueuedLightClientUpdateId>>, awaiting_lc_updates_per_parent_root: HashMap<Hash256, Vec<QueuedLightClientUpdateId>>,
/// Column reconstruction per block root. /// Column reconstruction per block root.
queued_column_reconstructions: HashMap<Hash256, DelayKey>, queued_column_reconstructions: HashMap<Hash256, DelayKey>,
/// Envelopes awaiting retry after a transient EL error, keyed by block root.
/// Dispatched when a `BlockImported` event fires, or after a fallback timeout.
retry_envelopes_per_root: HashMap<Hash256, (QueuedGossipEnvelope, DelayKey, u8)>,
/// Delay queue for retry envelope fallback timeouts (keyed by block root).
retry_envelope_delay_queue: DelayQueue<Hash256>,
/// Queued backfill batches /// Queued backfill batches
queued_backfill_batches: Vec<QueuedBackfillBatch>, queued_backfill_batches: Vec<QueuedBackfillBatch>,
@@ -403,6 +419,15 @@ impl<S: SlotClock> Stream for ReprocessQueue<S> {
Poll::Ready(None) | Poll::Pending => (), Poll::Ready(None) | Poll::Pending => (),
} }
match self.retry_envelope_delay_queue.poll_expired(cx) {
Poll::Ready(Some(block_root)) => {
return Poll::Ready(Some(InboundEvent::ReadyRetryEnvelope(
block_root.into_inner(),
)));
}
Poll::Ready(None) | Poll::Pending => (),
}
if let Some(next_backfill_batch_event) = self.next_backfill_batch_event.as_mut() { if let Some(next_backfill_batch_event) = self.next_backfill_batch_event.as_mut() {
match next_backfill_batch_event.as_mut().poll(cx) { match next_backfill_batch_event.as_mut().poll(cx) {
Poll::Ready(_) => { Poll::Ready(_) => {
@@ -481,6 +506,8 @@ impl<S: SlotClock> ReprocessQueue<S> {
awaiting_lc_updates_per_parent_root: HashMap::new(), awaiting_lc_updates_per_parent_root: HashMap::new(),
queued_backfill_batches: Vec::new(), queued_backfill_batches: Vec::new(),
queued_column_reconstructions: HashMap::new(), queued_column_reconstructions: HashMap::new(),
retry_envelopes_per_root: HashMap::new(),
retry_envelope_delay_queue: DelayQueue::new(),
next_attestation: 0, next_attestation: 0,
next_lc_update: 0, next_lc_update: 0,
early_block_debounce: TimeLatch::default(), early_block_debounce: TimeLatch::default(),
@@ -628,6 +655,56 @@ impl<S: SlotClock> ReprocessQueue<S> {
// for the same block hash is being imported. We wait for `QUEUED_RPC_BLOCK_DELAY` // for the same block hash is being imported. We wait for `QUEUED_RPC_BLOCK_DELAY`
// and then send the rpc block back for processing assuming the gossip import // and then send the rpc block back for processing assuming the gossip import
// has completed by then. // has completed by then.
InboundEvent::Msg(RetryEnvelope(queued_envelope)) => {
let block_root = queued_envelope.beacon_block_root;
// Check if we already have a retry pending for this root.
if let Some((_existing, _delay_key, count)) =
self.retry_envelopes_per_root.get(&block_root)
{
if *count >= MAX_ENVELOPE_RETRIES {
warn!(
?block_root,
retries = *count,
"Envelope exceeded max retries for transient EL error, dropping"
);
return;
}
}
// Determine retry count from any prior attempt.
let retry_count = self
.retry_envelopes_per_root
.get(&block_root)
.map_or(1, |(_, _, c)| c + 1);
// Remove any existing entry (and its delay key) before reinserting.
if let Some((_old_envelope, old_delay_key, _)) =
self.retry_envelopes_per_root.remove(&block_root)
{
self.retry_envelope_delay_queue.remove(&old_delay_key);
}
debug!(
?block_root,
retry_count,
"Queuing envelope for retry after transient EL error (waiting for BlockImported)"
);
// Register a fallback timeout of 1 slot duration.
let fallback_timeout =
self.slot_clock.slot_duration() * RETRY_ENVELOPE_TIMEOUT_SLOTS;
let delay_key = self
.retry_envelope_delay_queue
.insert(block_root, fallback_timeout);
self.retry_envelopes_per_root
.insert(block_root, (queued_envelope, delay_key, retry_count));
}
// A rpc block arrived for processing at the same time when a gossip block
// for the same block hash is being imported. We wait for `QUEUED_RPC_BLOCK_DELAY`
// and then send the rpc block back for processing assuming the gossip import
// has completed by then.
InboundEvent::Msg(RpcBlock(rpc_block)) => { InboundEvent::Msg(RpcBlock(rpc_block)) => {
// Check to ensure this won't over-fill the queue. // Check to ensure this won't over-fill the queue.
if self.rpc_block_delay_queue.len() >= MAXIMUM_QUEUED_BLOCKS { if self.rpc_block_delay_queue.len() >= MAXIMUM_QUEUED_BLOCKS {
@@ -790,6 +867,28 @@ impl<S: SlotClock> ReprocessQueue<S> {
} }
} }
// Dispatch any retry envelopes waiting on this block root.
if let Some((envelope, delay_key, retry_count)) =
self.retry_envelopes_per_root.remove(&block_root)
{
self.retry_envelope_delay_queue.remove(&delay_key);
debug!(
?block_root,
retry_count,
"Dispatching retry envelope after BlockImported"
);
if self
.ready_work_tx
.try_send(ReadyWork::Envelope(envelope))
.is_err()
{
error!(
?block_root,
"Failed to send retry envelope for reprocessing after block import"
);
}
}
// Unqueue the attestations we have for this root, if any. // Unqueue the attestations we have for this root, if any.
if let Some(queued_ids) = self.awaiting_attestations_per_root.remove(&block_root) { if let Some(queued_ids) = self.awaiting_attestations_per_root.remove(&block_root) {
let mut sent_count = 0; let mut sent_count = 0;
@@ -1107,6 +1206,29 @@ impl<S: SlotClock> ReprocessQueue<S> {
); );
} }
} }
// Fallback timeout for a retry envelope — dispatch it even though no
// BlockImported arrived.
InboundEvent::ReadyRetryEnvelope(block_root) => {
if let Some((envelope, _delay_key, retry_count)) =
self.retry_envelopes_per_root.remove(&block_root)
{
debug!(
?block_root,
retry_count,
"Retry envelope fallback timeout expired, dispatching"
);
if self
.ready_work_tx
.try_send(ReadyWork::Envelope(envelope))
.is_err()
{
error!(
?block_root,
"Failed to send retry envelope after fallback timeout"
);
}
}
}
} }
metrics::set_gauge_vec( metrics::set_gauge_vec(

View File

@@ -4030,6 +4030,9 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
) { ) {
let _processing_start_time = Instant::now(); let _processing_start_time = Instant::now();
let beacon_block_root = verified_envelope.signed_envelope.beacon_block_root(); let beacon_block_root = verified_envelope.signed_envelope.beacon_block_root();
let envelope_slot = verified_envelope.signed_envelope.slot();
// Keep a clone of the raw envelope in case we need to retry on transient EL errors.
let raw_envelope = verified_envelope.signed_envelope.clone();
#[allow(clippy::result_large_err)] #[allow(clippy::result_large_err)]
let result = self let result = self
@@ -4052,7 +4055,86 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
// Nothing to do // Nothing to do
} }
Err(e) => match e { Err(e) => match e {
EnvelopeError::ExecutionPayloadError(epe) if !epe.penalize_peer() => {} // Transient EL error — queue for retry on next BlockImported.
EnvelopeError::ExecutionPayloadError(epe) if !epe.penalize_peer() => {
warn!(
?beacon_block_root,
error = ?epe,
"Transient EL error during envelope import, queuing for retry"
);
let chain = self.chain.clone();
let process_fn = Box::pin(async move {
// Re-verify and re-import the envelope from scratch.
match chain.verify_envelope_for_gossip(raw_envelope).await {
Ok(re_verified) => {
let re_block_root =
re_verified.signed_envelope.beacon_block_root();
let result = chain
.process_execution_payload_envelope(
re_block_root,
re_verified,
NotifyExecutionLayer::Yes,
BlockImportSource::Gossip,
|| Ok(()),
)
.await;
match &result {
Ok(_) => {
debug!(
?re_block_root,
"Retry envelope imported successfully"
);
}
Err(e) => {
warn!(
?re_block_root,
error = ?e,
"Retry envelope failed on re-import"
);
// On repeated transient failure, the envelope will be
// retried again via the reprocess queue (up to max
// retries), handled by the RetryEnvelope message handler.
if let EnvelopeError::ExecutionPayloadError(epe) = e {
if !epe.penalize_peer() {
// Could retry again, but we let the
// reprocess queue handle max retry logic.
}
}
}
}
}
Err(e) => {
debug!(
error = ?e,
"Retry envelope failed re-verification"
);
}
}
});
if self
.beacon_processor_send
.try_send(WorkEvent {
drop_during_sync: false,
work: Work::Reprocess(
ReprocessQueueMessage::RetryEnvelope(
QueuedGossipEnvelope {
beacon_block_slot: envelope_slot,
beacon_block_root,
process_fn,
},
),
),
})
.is_err()
{
error!(
?beacon_block_root,
"Failed to queue envelope for EL retry"
);
}
}
EnvelopeError::BadSignature EnvelopeError::BadSignature
| EnvelopeError::BuilderIndexMismatch { .. } | EnvelopeError::BuilderIndexMismatch { .. }
| EnvelopeError::SlotMismatch { .. } | EnvelopeError::SlotMismatch { .. }