diff --git a/beacon_node/beacon_chain/src/payload_envelope_verification/payload_notifier.rs b/beacon_node/beacon_chain/src/payload_envelope_verification/payload_notifier.rs index dc82a8d469..5583213785 100644 --- a/beacon_node/beacon_chain/src/payload_envelope_verification/payload_notifier.rs +++ b/beacon_node/beacon_chain/src/payload_envelope_verification/payload_notifier.rs @@ -3,8 +3,8 @@ use std::sync::Arc; use execution_layer::{NewPayloadRequest, NewPayloadRequestGloas}; use fork_choice::PayloadVerificationStatus; use state_processing::per_block_processing::deneb::kzg_commitment_to_versioned_hash; -use tracing::warn; -use types::{SignedBeaconBlock, SignedExecutionPayloadEnvelope}; +use tracing::{info, warn}; +use types::{BeaconBlockRef, SignedBeaconBlock, SignedExecutionPayloadEnvelope}; use crate::{ BeaconChain, BeaconChainTypes, BlockError, NotifyExecutionLayer, @@ -86,12 +86,25 @@ impl PayloadNotifier { .map(kzg_commitment_to_versioned_hash) .collect(); + // Heze and Gloas share identical payload wire formats; only the engine API + // method version differs (V6 for Heze, V5 for Gloas). Set is_heze_fork so + // the dispatch in http.rs calls engine_newPayloadV6 for Heze blocks. + let block_fork = block.message().fork_name_unchecked(); + let is_heze_fork = matches!(block.message(), BeaconBlockRef::Heze(_)); + info!( + ?block_fork, + is_heze_fork, + slot = ?envelope.message.slot(), + "[FOCIL DEBUG] build_new_payload_request fork check" + ); + Ok(NewPayloadRequest::Gloas(NewPayloadRequestGloas { execution_payload: &envelope.message.payload, versioned_hashes, parent_beacon_block_root: envelope.message.parent_beacon_block_root, execution_requests: &envelope.message.execution_requests, il_transactions: Default::default(), + is_heze_fork, })) } } diff --git a/beacon_node/beacon_chain/src/test_utils.rs b/beacon_node/beacon_chain/src/test_utils.rs index fb1a67a1eb..fed06dce0f 100644 --- a/beacon_node/beacon_chain/src/test_utils.rs +++ b/beacon_node/beacon_chain/src/test_utils.rs @@ -2900,6 +2900,7 @@ where parent_beacon_block_root: block.message().parent_root(), execution_requests: &signed_envelope.message.execution_requests, il_transactions: Default::default(), + is_heze_fork: false, }); self.chain diff --git a/beacon_node/beacon_processor/src/scheduler/work_reprocessing_queue.rs b/beacon_node/beacon_processor/src/scheduler/work_reprocessing_queue.rs index 38306b3bb6..56509c93f8 100644 --- a/beacon_node/beacon_processor/src/scheduler/work_reprocessing_queue.rs +++ b/beacon_node/beacon_processor/src/scheduler/work_reprocessing_queue.rs @@ -101,6 +101,8 @@ pub const RECONSTRUCTION_DEADLINE: (u64, u64) = (1, 4); pub enum ReprocessQueueMessage { /// A block that has been received early and we should queue for later processing. EarlyBlock(QueuedGossipBlock), + /// An execution payload envelope that arrived before its slot and should be queued for later processing. + EarlyEnvelope(QueuedGossipEnvelope), /// An execution payload envelope that references a block not yet in fork choice. UnknownBlockForEnvelope(QueuedGossipEnvelope), /// A gossip block for hash `X` is being imported, we should queue the rpc block for the same @@ -227,6 +229,8 @@ impl From for WorkEvent { enum InboundEvent { /// A gossip block that was queued for later processing and is ready for import. ReadyGossipBlock(QueuedGossipBlock), + /// An early envelope that has been queued until its slot arrives and is now ready for import. + ReadyEarlyEnvelope(QueuedGossipEnvelope), /// An envelope whose block has been imported and is now ready for processing. ReadyEnvelope(Hash256), /// A rpc block that was queued because the same gossip block was being imported @@ -254,6 +258,8 @@ struct ReprocessQueue { /* Queues */ /// Queue to manage scheduled early blocks. gossip_block_delay_queue: DelayQueue, + /// Queue to manage early envelopes (arrived before their slot). + early_envelope_delay_queue: DelayQueue, /// Queue to manage envelope timeouts (keyed by block root). envelope_delay_queue: DelayQueue, /// Queue to manage scheduled early blocks. @@ -290,6 +296,7 @@ struct ReprocessQueue { next_attestation: usize, next_lc_update: usize, early_block_debounce: TimeLatch, + early_envelope_debounce: TimeLatch, envelope_delay_debounce: TimeLatch, rpc_block_debounce: TimeLatch, attestation_delay_debounce: TimeLatch, @@ -340,6 +347,15 @@ impl Stream for ReprocessQueue { Poll::Ready(None) | Poll::Pending => (), } + match self.early_envelope_delay_queue.poll_expired(cx) { + Poll::Ready(Some(queued_envelope)) => { + return Poll::Ready(Some(InboundEvent::ReadyEarlyEnvelope( + queued_envelope.into_inner(), + ))); + } + Poll::Ready(None) | Poll::Pending => (), + } + match self.envelope_delay_queue.poll_expired(cx) { Poll::Ready(Some(block_root)) => { return Poll::Ready(Some(InboundEvent::ReadyEnvelope(block_root.into_inner()))); @@ -450,6 +466,7 @@ impl ReprocessQueue { work_reprocessing_rx, ready_work_tx, gossip_block_delay_queue: DelayQueue::new(), + early_envelope_delay_queue: DelayQueue::new(), envelope_delay_queue: DelayQueue::new(), rpc_block_delay_queue: DelayQueue::new(), attestations_delay_queue: DelayQueue::new(), @@ -467,6 +484,7 @@ impl ReprocessQueue { next_attestation: 0, next_lc_update: 0, early_block_debounce: TimeLatch::default(), + early_envelope_debounce: TimeLatch::default(), envelope_delay_debounce: TimeLatch::default(), rpc_block_debounce: TimeLatch::default(), attestation_delay_debounce: TimeLatch::default(), @@ -533,6 +551,33 @@ impl ReprocessQueue { } } } + // An envelope that arrived before its slot. Queue it until the appropriate slot arrives. + InboundEvent::Msg(EarlyEnvelope(early_envelope)) => { + let envelope_slot = early_envelope.beacon_block_slot; + let block_root = early_envelope.beacon_block_root; + + if let Some(duration_till_slot) = self.slot_clock.duration_to_slot(envelope_slot) { + self.early_envelope_delay_queue.insert( + early_envelope, + duration_till_slot + ADDITIONAL_QUEUED_BLOCK_DELAY, + ); + } else { + // Slot has already arrived; dispatch immediately if possible. + if let Some(now) = self.slot_clock.now() + && envelope_slot <= now + { + if self + .ready_work_tx + .try_send(ReadyWork::Envelope(early_envelope)) + .is_err() + { + error!(?block_root, "Failed to send early envelope for immediate processing"); + } + } else { + debug!(?block_root, %envelope_slot, "Dropping early envelope, cannot determine slot timing"); + } + } + } // An envelope that references an unknown block. Queue it until the block is // imported, or until the timeout expires. InboundEvent::Msg(UnknownBlockForEnvelope(queued_envelope)) => { @@ -900,6 +945,17 @@ impl ReprocessQueue { error!("Failed to pop queued block"); } } + // An early envelope whose slot has now arrived; dispatch for processing. + InboundEvent::ReadyEarlyEnvelope(ready_envelope) => { + let block_root = ready_envelope.beacon_block_root; + if self + .ready_work_tx + .try_send(ReadyWork::Envelope(ready_envelope)) + .is_err() + { + error!(?block_root, "Failed to send early envelope after slot arrived"); + } + } // An envelope's timeout has expired. Send it for processing regardless of // whether the block has been imported. InboundEvent::ReadyEnvelope(block_root) => { diff --git a/beacon_node/execution_layer/src/engine_api/new_payload_request.rs b/beacon_node/execution_layer/src/engine_api/new_payload_request.rs index b26908ead1..b51c079f66 100644 --- a/beacon_node/execution_layer/src/engine_api/new_payload_request.rs +++ b/beacon_node/execution_layer/src/engine_api/new_payload_request.rs @@ -54,6 +54,10 @@ pub struct NewPayloadRequest<'block, E: EthSpec> { pub execution_requests: &'block ExecutionRequests, #[superstruct(only(Heze, Gloas))] pub il_transactions: Transactions, + /// When true, this Gloas-shaped request must use engine_newPayloadV6 (Heze fork). + /// Gloas and Heze have identical payload wire formats; only the method version differs. + #[superstruct(only(Gloas))] + pub is_heze_fork: bool, } impl<'block, E: EthSpec> NewPayloadRequest<'block, E> { diff --git a/beacon_node/execution_layer/src/test_utils/handle_rpc.rs b/beacon_node/execution_layer/src/test_utils/handle_rpc.rs index 94cf6afed1..0f783c3373 100644 --- a/beacon_node/execution_layer/src/test_utils/handle_rpc.rs +++ b/beacon_node/execution_layer/src/test_utils/handle_rpc.rs @@ -129,11 +129,17 @@ pub async fn handle_rpc( }) .map_err(|s| (s, BAD_PARAMS_ERROR_CODE))?, ENGINE_NEW_PAYLOAD_V5 => { + // V5 is for Gloas; fall back to Heze for backward compat get_param::>(params, 0) .map(|jep| JsonExecutionPayload::Gloas(jep)) + .or_else(|_| { + get_param::>(params, 0) + .map(|jep| JsonExecutionPayload::Heze(jep)) + }) .map_err(|s| (s, BAD_PARAMS_ERROR_CODE))? } ENGINE_NEW_PAYLOAD_V6 => { + // V6 is for Heze (Bogota EL fork) get_param::>(params, 0) .map(|jep| JsonExecutionPayload::Heze(jep)) .map_err(|s| (s, BAD_PARAMS_ERROR_CODE))? diff --git a/beacon_node/network/src/network_beacon_processor/gossip_methods.rs b/beacon_node/network/src/network_beacon_processor/gossip_methods.rs index 654c5813a8..826e4a8c50 100644 --- a/beacon_node/network/src/network_beacon_processor/gossip_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/gossip_methods.rs @@ -3978,7 +3978,7 @@ impl NetworkBeaconProcessor { // TODO(gloas) update metrics to note how early the envelope arrived let inner_self = self.clone(); - let _process_fn = Box::pin(async move { + let process_fn = Box::pin(async move { inner_self .process_gossip_verified_execution_payload_envelope( peer_id, @@ -3987,7 +3987,26 @@ impl NetworkBeaconProcessor { .await; }); - // TODO(gloas) send to reprocess queue + if self + .beacon_processor_send + .try_send(WorkEvent { + drop_during_sync: false, + work: Work::Reprocess(ReprocessQueueMessage::EarlyEnvelope( + QueuedGossipEnvelope { + beacon_block_slot: envelope_slot, + beacon_block_root, + process_fn, + }, + )), + }) + .is_err() + { + error!( + %envelope_slot, + ?beacon_block_root, + "Failed to defer early envelope import" + ); + } None } Ok(_) => Some(verified_envelope),