mirror of
https://github.com/sigp/lighthouse.git
synced 2026-05-30 04:37:13 +00:00
wip: early envelope reprocessing and is_heze_fork plumbing
This commit is contained in:
@@ -3,8 +3,8 @@ use std::sync::Arc;
|
||||
use execution_layer::{NewPayloadRequest, NewPayloadRequestGloas};
|
||||
use fork_choice::PayloadVerificationStatus;
|
||||
use state_processing::per_block_processing::deneb::kzg_commitment_to_versioned_hash;
|
||||
use tracing::warn;
|
||||
use types::{SignedBeaconBlock, SignedExecutionPayloadEnvelope};
|
||||
use tracing::{info, warn};
|
||||
use types::{BeaconBlockRef, SignedBeaconBlock, SignedExecutionPayloadEnvelope};
|
||||
|
||||
use crate::{
|
||||
BeaconChain, BeaconChainTypes, BlockError, NotifyExecutionLayer,
|
||||
@@ -86,12 +86,25 @@ impl<T: BeaconChainTypes> PayloadNotifier<T> {
|
||||
.map(kzg_commitment_to_versioned_hash)
|
||||
.collect();
|
||||
|
||||
// Heze and Gloas share identical payload wire formats; only the engine API
|
||||
// method version differs (V6 for Heze, V5 for Gloas). Set is_heze_fork so
|
||||
// the dispatch in http.rs calls engine_newPayloadV6 for Heze blocks.
|
||||
let block_fork = block.message().fork_name_unchecked();
|
||||
let is_heze_fork = matches!(block.message(), BeaconBlockRef::Heze(_));
|
||||
info!(
|
||||
?block_fork,
|
||||
is_heze_fork,
|
||||
slot = ?envelope.message.slot(),
|
||||
"[FOCIL DEBUG] build_new_payload_request fork check"
|
||||
);
|
||||
|
||||
Ok(NewPayloadRequest::Gloas(NewPayloadRequestGloas {
|
||||
execution_payload: &envelope.message.payload,
|
||||
versioned_hashes,
|
||||
parent_beacon_block_root: envelope.message.parent_beacon_block_root,
|
||||
execution_requests: &envelope.message.execution_requests,
|
||||
il_transactions: Default::default(),
|
||||
is_heze_fork,
|
||||
}))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -2900,6 +2900,7 @@ where
|
||||
parent_beacon_block_root: block.message().parent_root(),
|
||||
execution_requests: &signed_envelope.message.execution_requests,
|
||||
il_transactions: Default::default(),
|
||||
is_heze_fork: false,
|
||||
});
|
||||
|
||||
self.chain
|
||||
|
||||
@@ -101,6 +101,8 @@ pub const RECONSTRUCTION_DEADLINE: (u64, u64) = (1, 4);
|
||||
pub enum ReprocessQueueMessage {
|
||||
/// A block that has been received early and we should queue for later processing.
|
||||
EarlyBlock(QueuedGossipBlock),
|
||||
/// An execution payload envelope that arrived before its slot and should be queued for later processing.
|
||||
EarlyEnvelope(QueuedGossipEnvelope),
|
||||
/// An execution payload envelope that references a block not yet in fork choice.
|
||||
UnknownBlockForEnvelope(QueuedGossipEnvelope),
|
||||
/// A gossip block for hash `X` is being imported, we should queue the rpc block for the same
|
||||
@@ -227,6 +229,8 @@ impl<E: EthSpec> From<QueuedBackfillBatch> for WorkEvent<E> {
|
||||
enum InboundEvent {
|
||||
/// A gossip block that was queued for later processing and is ready for import.
|
||||
ReadyGossipBlock(QueuedGossipBlock),
|
||||
/// An early envelope that has been queued until its slot arrives and is now ready for import.
|
||||
ReadyEarlyEnvelope(QueuedGossipEnvelope),
|
||||
/// An envelope whose block has been imported and is now ready for processing.
|
||||
ReadyEnvelope(Hash256),
|
||||
/// A rpc block that was queued because the same gossip block was being imported
|
||||
@@ -254,6 +258,8 @@ struct ReprocessQueue<S> {
|
||||
/* Queues */
|
||||
/// Queue to manage scheduled early blocks.
|
||||
gossip_block_delay_queue: DelayQueue<QueuedGossipBlock>,
|
||||
/// Queue to manage early envelopes (arrived before their slot).
|
||||
early_envelope_delay_queue: DelayQueue<QueuedGossipEnvelope>,
|
||||
/// Queue to manage envelope timeouts (keyed by block root).
|
||||
envelope_delay_queue: DelayQueue<Hash256>,
|
||||
/// Queue to manage scheduled early blocks.
|
||||
@@ -290,6 +296,7 @@ struct ReprocessQueue<S> {
|
||||
next_attestation: usize,
|
||||
next_lc_update: usize,
|
||||
early_block_debounce: TimeLatch,
|
||||
early_envelope_debounce: TimeLatch,
|
||||
envelope_delay_debounce: TimeLatch,
|
||||
rpc_block_debounce: TimeLatch,
|
||||
attestation_delay_debounce: TimeLatch,
|
||||
@@ -340,6 +347,15 @@ impl<S: SlotClock> Stream for ReprocessQueue<S> {
|
||||
Poll::Ready(None) | Poll::Pending => (),
|
||||
}
|
||||
|
||||
match self.early_envelope_delay_queue.poll_expired(cx) {
|
||||
Poll::Ready(Some(queued_envelope)) => {
|
||||
return Poll::Ready(Some(InboundEvent::ReadyEarlyEnvelope(
|
||||
queued_envelope.into_inner(),
|
||||
)));
|
||||
}
|
||||
Poll::Ready(None) | Poll::Pending => (),
|
||||
}
|
||||
|
||||
match self.envelope_delay_queue.poll_expired(cx) {
|
||||
Poll::Ready(Some(block_root)) => {
|
||||
return Poll::Ready(Some(InboundEvent::ReadyEnvelope(block_root.into_inner())));
|
||||
@@ -450,6 +466,7 @@ impl<S: SlotClock> ReprocessQueue<S> {
|
||||
work_reprocessing_rx,
|
||||
ready_work_tx,
|
||||
gossip_block_delay_queue: DelayQueue::new(),
|
||||
early_envelope_delay_queue: DelayQueue::new(),
|
||||
envelope_delay_queue: DelayQueue::new(),
|
||||
rpc_block_delay_queue: DelayQueue::new(),
|
||||
attestations_delay_queue: DelayQueue::new(),
|
||||
@@ -467,6 +484,7 @@ impl<S: SlotClock> ReprocessQueue<S> {
|
||||
next_attestation: 0,
|
||||
next_lc_update: 0,
|
||||
early_block_debounce: TimeLatch::default(),
|
||||
early_envelope_debounce: TimeLatch::default(),
|
||||
envelope_delay_debounce: TimeLatch::default(),
|
||||
rpc_block_debounce: TimeLatch::default(),
|
||||
attestation_delay_debounce: TimeLatch::default(),
|
||||
@@ -533,6 +551,33 @@ impl<S: SlotClock> ReprocessQueue<S> {
|
||||
}
|
||||
}
|
||||
}
|
||||
// An envelope that arrived before its slot. Queue it until the appropriate slot arrives.
|
||||
InboundEvent::Msg(EarlyEnvelope(early_envelope)) => {
|
||||
let envelope_slot = early_envelope.beacon_block_slot;
|
||||
let block_root = early_envelope.beacon_block_root;
|
||||
|
||||
if let Some(duration_till_slot) = self.slot_clock.duration_to_slot(envelope_slot) {
|
||||
self.early_envelope_delay_queue.insert(
|
||||
early_envelope,
|
||||
duration_till_slot + ADDITIONAL_QUEUED_BLOCK_DELAY,
|
||||
);
|
||||
} else {
|
||||
// Slot has already arrived; dispatch immediately if possible.
|
||||
if let Some(now) = self.slot_clock.now()
|
||||
&& envelope_slot <= now
|
||||
{
|
||||
if self
|
||||
.ready_work_tx
|
||||
.try_send(ReadyWork::Envelope(early_envelope))
|
||||
.is_err()
|
||||
{
|
||||
error!(?block_root, "Failed to send early envelope for immediate processing");
|
||||
}
|
||||
} else {
|
||||
debug!(?block_root, %envelope_slot, "Dropping early envelope, cannot determine slot timing");
|
||||
}
|
||||
}
|
||||
}
|
||||
// An envelope that references an unknown block. Queue it until the block is
|
||||
// imported, or until the timeout expires.
|
||||
InboundEvent::Msg(UnknownBlockForEnvelope(queued_envelope)) => {
|
||||
@@ -900,6 +945,17 @@ impl<S: SlotClock> ReprocessQueue<S> {
|
||||
error!("Failed to pop queued block");
|
||||
}
|
||||
}
|
||||
// An early envelope whose slot has now arrived; dispatch for processing.
|
||||
InboundEvent::ReadyEarlyEnvelope(ready_envelope) => {
|
||||
let block_root = ready_envelope.beacon_block_root;
|
||||
if self
|
||||
.ready_work_tx
|
||||
.try_send(ReadyWork::Envelope(ready_envelope))
|
||||
.is_err()
|
||||
{
|
||||
error!(?block_root, "Failed to send early envelope after slot arrived");
|
||||
}
|
||||
}
|
||||
// An envelope's timeout has expired. Send it for processing regardless of
|
||||
// whether the block has been imported.
|
||||
InboundEvent::ReadyEnvelope(block_root) => {
|
||||
|
||||
@@ -54,6 +54,10 @@ pub struct NewPayloadRequest<'block, E: EthSpec> {
|
||||
pub execution_requests: &'block ExecutionRequests<E>,
|
||||
#[superstruct(only(Heze, Gloas))]
|
||||
pub il_transactions: Transactions<E>,
|
||||
/// When true, this Gloas-shaped request must use engine_newPayloadV6 (Heze fork).
|
||||
/// Gloas and Heze have identical payload wire formats; only the method version differs.
|
||||
#[superstruct(only(Gloas))]
|
||||
pub is_heze_fork: bool,
|
||||
}
|
||||
|
||||
impl<'block, E: EthSpec> NewPayloadRequest<'block, E> {
|
||||
|
||||
@@ -129,11 +129,17 @@ pub async fn handle_rpc<E: EthSpec>(
|
||||
})
|
||||
.map_err(|s| (s, BAD_PARAMS_ERROR_CODE))?,
|
||||
ENGINE_NEW_PAYLOAD_V5 => {
|
||||
// V5 is for Gloas; fall back to Heze for backward compat
|
||||
get_param::<JsonExecutionPayloadGloas<E>>(params, 0)
|
||||
.map(|jep| JsonExecutionPayload::Gloas(jep))
|
||||
.or_else(|_| {
|
||||
get_param::<JsonExecutionPayloadHeze<E>>(params, 0)
|
||||
.map(|jep| JsonExecutionPayload::Heze(jep))
|
||||
})
|
||||
.map_err(|s| (s, BAD_PARAMS_ERROR_CODE))?
|
||||
}
|
||||
ENGINE_NEW_PAYLOAD_V6 => {
|
||||
// V6 is for Heze (Bogota EL fork)
|
||||
get_param::<JsonExecutionPayloadHeze<E>>(params, 0)
|
||||
.map(|jep| JsonExecutionPayload::Heze(jep))
|
||||
.map_err(|s| (s, BAD_PARAMS_ERROR_CODE))?
|
||||
|
||||
@@ -3978,7 +3978,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
|
||||
// TODO(gloas) update metrics to note how early the envelope arrived
|
||||
|
||||
let inner_self = self.clone();
|
||||
let _process_fn = Box::pin(async move {
|
||||
let process_fn = Box::pin(async move {
|
||||
inner_self
|
||||
.process_gossip_verified_execution_payload_envelope(
|
||||
peer_id,
|
||||
@@ -3987,7 +3987,26 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
|
||||
.await;
|
||||
});
|
||||
|
||||
// TODO(gloas) send to reprocess queue
|
||||
if self
|
||||
.beacon_processor_send
|
||||
.try_send(WorkEvent {
|
||||
drop_during_sync: false,
|
||||
work: Work::Reprocess(ReprocessQueueMessage::EarlyEnvelope(
|
||||
QueuedGossipEnvelope {
|
||||
beacon_block_slot: envelope_slot,
|
||||
beacon_block_root,
|
||||
process_fn,
|
||||
},
|
||||
)),
|
||||
})
|
||||
.is_err()
|
||||
{
|
||||
error!(
|
||||
%envelope_slot,
|
||||
?beacon_block_root,
|
||||
"Failed to defer early envelope import"
|
||||
);
|
||||
}
|
||||
None
|
||||
}
|
||||
Ok(_) => Some(verified_envelope),
|
||||
|
||||
Reference in New Issue
Block a user