mirror of
https://github.com/sigp/lighthouse.git
synced 2026-05-31 21:27:12 +00:00
Complete envelope-lookup functionality and tests
Implementation:
- payload_envelope_verification: implement the AvailabilityPending branch
in the envelope import flow. Previously returned
InternalError("Pending payload envelope not yet implemented") for any
envelope whose data columns hadn't yet been received, blocking the
end-to-end RPC import path. New `import_pending_execution_payload_envelope`
marks the payload as received in fork choice and persists the envelope to
the store; columns are still expected to arrive separately (gossip /
engineGetBlobs / reconstruction) and persist their own ops.
- sync manager: short-circuit `handle_unknown_parent_envelope` when the
parent's payload was received between gossip-verification and the trigger
reaching sync. No lookup is created; the trigger is treated as a no-op.
- gossip→sync hook: when a Gloas envelope is imported via the gossip path,
emit `SyncMessage::GossipEnvelopeImported { block_root }` so any lookups
awaiting that parent envelope unblock without depending on the in-flight
RPC response landing first. Closes the review-flagged race where a
gossip-imported envelope left child lookups pinned.
Tests (3 new):
- envelope_already_received_skips_lookup — trigger after envelope already
in fork choice creates zero lookups.
- happy_path_unknown_parent_envelope — end-to-end RPC import path: lookups
complete, head advances to the gossip block.
- happy_path_unknown_parent_envelope_via_gossip — pending envelope-only
lookup unblocked by a concurrent gossip import via the new sync hook.
Existing tests updated:
- bad_peer_envelope_rpc_failure / bad_peer_wrong_envelope_response now
expect the lookup to retry and succeed (mirroring `bad_peer_*` tests for
blocks/blobs/columns), reflecting the now-working import path.
This commit is contained in:
@@ -14,7 +14,8 @@ use super::{
|
||||
};
|
||||
use crate::{
|
||||
AvailabilityProcessingStatus, BeaconChain, BeaconChainError, BeaconChainTypes,
|
||||
NotifyExecutionLayer, block_verification_types::AvailableBlockData, metrics,
|
||||
NotifyExecutionLayer, block_verification::PayloadVerificationOutcome,
|
||||
block_verification_types::AvailableBlockData, metrics,
|
||||
payload_envelope_verification::ExecutionPendingEnvelope, validator_monitor::get_slot_delay_ms,
|
||||
};
|
||||
|
||||
@@ -99,9 +100,18 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
|
||||
self.import_available_execution_payload_envelope(Box::new(envelope))
|
||||
.await
|
||||
}
|
||||
ExecutedEnvelope::AvailabilityPending() => Err(EnvelopeError::InternalError(
|
||||
"Pending payload envelope not yet implemented".to_owned(),
|
||||
)),
|
||||
ExecutedEnvelope::AvailabilityPending {
|
||||
signed_envelope,
|
||||
import_data,
|
||||
payload_verification_outcome,
|
||||
} => {
|
||||
self.import_pending_execution_payload_envelope(
|
||||
signed_envelope,
|
||||
import_data,
|
||||
payload_verification_outcome,
|
||||
)
|
||||
.await
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
@@ -185,6 +195,39 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
|
||||
))
|
||||
}
|
||||
|
||||
/// Import an envelope whose data column availability has not yet been satisfied.
|
||||
///
|
||||
/// Marks the block's payload as received in fork choice and persists the envelope to the
|
||||
/// store, but does not write data column ops. Columns are expected to arrive separately
|
||||
/// (gossip, engineGetBlobs, or reconstruction).
|
||||
#[instrument(skip_all)]
|
||||
pub async fn import_pending_execution_payload_envelope(
|
||||
self: &Arc<Self>,
|
||||
signed_envelope: Arc<SignedExecutionPayloadEnvelope<T::EthSpec>>,
|
||||
import_data: EnvelopeImportData<T::EthSpec>,
|
||||
payload_verification_outcome: PayloadVerificationOutcome,
|
||||
) -> Result<AvailabilityProcessingStatus, EnvelopeError> {
|
||||
let EnvelopeImportData {
|
||||
block_root,
|
||||
_phantom,
|
||||
} = import_data;
|
||||
let block_root = {
|
||||
let chain = self.clone();
|
||||
self.spawn_blocking_handle(
|
||||
move || {
|
||||
chain.import_execution_payload_envelope_pending_columns(
|
||||
signed_envelope,
|
||||
block_root,
|
||||
payload_verification_outcome.payload_verification_status,
|
||||
)
|
||||
},
|
||||
"payload_verification_handle",
|
||||
)
|
||||
.await??
|
||||
};
|
||||
Ok(AvailabilityProcessingStatus::Imported(block_root))
|
||||
}
|
||||
|
||||
#[instrument(skip_all)]
|
||||
pub async fn import_available_execution_payload_envelope(
|
||||
self: &Arc<Self>,
|
||||
@@ -219,6 +262,50 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
|
||||
Ok(AvailabilityProcessingStatus::Imported(block_root))
|
||||
}
|
||||
|
||||
/// Same as `import_execution_payload_envelope` but for envelopes whose data columns
|
||||
/// have not yet been received. Marks the payload as received in fork choice and
|
||||
/// persists the envelope; columns are persisted separately as they arrive.
|
||||
#[instrument(skip_all)]
|
||||
fn import_execution_payload_envelope_pending_columns(
|
||||
&self,
|
||||
signed_envelope: Arc<SignedExecutionPayloadEnvelope<T::EthSpec>>,
|
||||
block_root: Hash256,
|
||||
payload_verification_status: PayloadVerificationStatus,
|
||||
) -> Result<Hash256, EnvelopeError> {
|
||||
let fork_choice_reader = self.canonical_head.fork_choice_upgradable_read_lock();
|
||||
if !fork_choice_reader.contains_block(&block_root) {
|
||||
return Err(EnvelopeError::BlockRootUnknown { block_root });
|
||||
}
|
||||
|
||||
let mut fork_choice = parking_lot::RwLockUpgradableReadGuard::upgrade(fork_choice_reader);
|
||||
fork_choice
|
||||
.on_valid_payload_envelope_received(block_root)
|
||||
.map_err(|e| EnvelopeError::InternalError(format!("{e:?}")))?;
|
||||
|
||||
let db_write_timer = metrics::start_timer(&metrics::ENVELOPE_PROCESSING_DB_WRITE);
|
||||
let ops = vec![StoreOp::PutPayloadEnvelope(
|
||||
block_root,
|
||||
signed_envelope.clone(),
|
||||
)];
|
||||
let db_span = info_span!("persist_envelope_pending_columns").entered();
|
||||
if let Err(e) = self.store.do_atomically_with_block_and_blobs_cache(ops) {
|
||||
error!(error = ?e, "Database write failed for pending-columns envelope");
|
||||
return Err(e.into());
|
||||
}
|
||||
drop(db_span);
|
||||
drop(fork_choice);
|
||||
|
||||
let envelope_time_imported = self.slot_clock.now_duration().unwrap_or(Duration::MAX);
|
||||
metrics::stop_timer(db_write_timer);
|
||||
self.import_envelope_update_metrics_and_events(
|
||||
signed_envelope,
|
||||
block_root,
|
||||
payload_verification_status,
|
||||
envelope_time_imported,
|
||||
);
|
||||
Ok(block_root)
|
||||
}
|
||||
|
||||
/// Accepts a fully-verified and available envelope and imports it into the chain without performing any
|
||||
/// additional verification.
|
||||
///
|
||||
|
||||
@@ -103,11 +103,16 @@ pub struct EnvelopeProcessingSnapshot<E: EthSpec> {
|
||||
/// 1. `Available`: This envelope has been executed and also contains all data to consider it
|
||||
/// fully available.
|
||||
/// 2. `AvailabilityPending`: This envelope hasn't received all required blobs to consider it
|
||||
/// fully available.
|
||||
/// fully available. The envelope is still imported (fork-choice marks the block's payload
|
||||
/// as received and the envelope is persisted); column persistence is handled separately
|
||||
/// via gossip / engineGetBlobs as columns arrive.
|
||||
pub enum ExecutedEnvelope<E: EthSpec> {
|
||||
Available(AvailableExecutedEnvelope<E>),
|
||||
// TODO(gloas) implement availability pending
|
||||
AvailabilityPending(),
|
||||
AvailabilityPending {
|
||||
signed_envelope: Arc<SignedExecutionPayloadEnvelope<E>>,
|
||||
import_data: EnvelopeImportData<E>,
|
||||
payload_verification_outcome: PayloadVerificationOutcome,
|
||||
},
|
||||
}
|
||||
|
||||
impl<E: EthSpec> ExecutedEnvelope<E> {
|
||||
@@ -124,11 +129,14 @@ impl<E: EthSpec> ExecutedEnvelope<E> {
|
||||
payload_verification_outcome,
|
||||
))
|
||||
}
|
||||
// TODO(gloas) implement availability pending
|
||||
MaybeAvailableEnvelope::AvailabilityPending {
|
||||
block_hash: _,
|
||||
envelope: _,
|
||||
} => Self::AvailabilityPending(),
|
||||
envelope: signed_envelope,
|
||||
} => Self::AvailabilityPending {
|
||||
signed_envelope,
|
||||
import_data,
|
||||
payload_verification_outcome,
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -3983,8 +3983,13 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
|
||||
// register_process_result_metrics(&result, metrics::BlockSource::Gossip, "envelope");
|
||||
|
||||
match &result {
|
||||
Ok(AvailabilityProcessingStatus::Imported(_))
|
||||
| Ok(AvailabilityProcessingStatus::MissingComponents(_, _)) => {
|
||||
Ok(AvailabilityProcessingStatus::Imported(block_root)) => {
|
||||
// Notify sync so any pending child lookup awaiting this parent envelope unblocks.
|
||||
self.send_sync_message(SyncMessage::GossipEnvelopeImported {
|
||||
block_root: *block_root,
|
||||
});
|
||||
}
|
||||
Ok(AvailabilityProcessingStatus::MissingComponents(_, _)) => {
|
||||
// Nothing to do
|
||||
}
|
||||
Err(e) => match e {
|
||||
|
||||
@@ -154,6 +154,10 @@ pub enum SyncMessage<E: EthSpec> {
|
||||
/// A block's parent is known but its execution payload envelope has not been received yet.
|
||||
UnknownParentEnvelope(PeerId, Arc<SignedBeaconBlock<E>>, Hash256),
|
||||
|
||||
/// An execution payload envelope has been imported via the local gossip path.
|
||||
/// Sync uses this to unblock any child lookups that were awaiting this parent envelope.
|
||||
GossipEnvelopeImported { block_root: Hash256 },
|
||||
|
||||
/// A partial data column with an unknown parent has been received.
|
||||
UnknownParentPartialDataColumn {
|
||||
peer_id: PeerId,
|
||||
@@ -961,6 +965,14 @@ impl<T: BeaconChainTypes> SyncManager<T> {
|
||||
}),
|
||||
);
|
||||
}
|
||||
SyncMessage::GossipEnvelopeImported { block_root } => {
|
||||
debug!(
|
||||
%block_root,
|
||||
"Gossip-imported envelope; unblocking awaiting child lookups"
|
||||
);
|
||||
self.block_lookups
|
||||
.continue_envelope_child_lookups(block_root, &mut self.network);
|
||||
}
|
||||
SyncMessage::UnknownParentPartialDataColumn {
|
||||
peer_id,
|
||||
block_root,
|
||||
@@ -1096,6 +1108,21 @@ impl<T: BeaconChainTypes> SyncManager<T> {
|
||||
slot: Slot,
|
||||
block_component: BlockComponent<T::EthSpec>,
|
||||
) {
|
||||
// Defensive: if the parent's payload envelope was already received between when
|
||||
// gossip-verification raised `ParentEnvelopeUnknown` and now, no lookup is needed.
|
||||
if self
|
||||
.chain
|
||||
.canonical_head
|
||||
.fork_choice_read_lock()
|
||||
.is_payload_received(&parent_root)
|
||||
{
|
||||
debug!(
|
||||
%block_root,
|
||||
%parent_root,
|
||||
"Parent envelope already received, skipping envelope lookup"
|
||||
);
|
||||
return;
|
||||
}
|
||||
match self.should_search_for_block(Some(slot), &peer_id) {
|
||||
Ok(_) => {
|
||||
if self.block_lookups.search_child_and_parent_envelope(
|
||||
|
||||
@@ -1304,6 +1304,32 @@ impl TestRig {
|
||||
self.harness.chain.recompute_head_at_current_slot().await;
|
||||
}
|
||||
|
||||
/// Persist a Gloas execution payload envelope into the local chain and mark the
|
||||
/// block as "payload received" in fork choice. Mimics the side-effects of the
|
||||
/// gossip-import path, including the `GossipEnvelopeImported` sync notification.
|
||||
/// The caller is responsible for ensuring the corresponding beacon block is
|
||||
/// already imported.
|
||||
async fn import_envelope_for_block_root(&mut self, block_root: Hash256) {
|
||||
let envelope = self
|
||||
.network_envelopes_by_root
|
||||
.get(&block_root)
|
||||
.unwrap_or_else(|| panic!("no envelope cached for {block_root:?}"))
|
||||
.as_ref()
|
||||
.clone();
|
||||
self.harness
|
||||
.chain
|
||||
.store
|
||||
.put_payload_envelope(&block_root, &envelope)
|
||||
.expect("should store envelope");
|
||||
self.harness
|
||||
.chain
|
||||
.canonical_head
|
||||
.fork_choice_write_lock()
|
||||
.on_valid_payload_envelope_received(block_root)
|
||||
.expect("should update fork choice with envelope");
|
||||
self.push_sync_message(SyncMessage::GossipEnvelopeImported { block_root });
|
||||
}
|
||||
|
||||
/// Import a block directly into the chain without going through lookup sync
|
||||
async fn import_block_by_root(&mut self, block_root: Hash256) {
|
||||
let range_sync_block = self
|
||||
@@ -2869,8 +2895,8 @@ async fn envelope_lookup_issues_by_root_rpc() {
|
||||
);
|
||||
}
|
||||
|
||||
/// If the envelope RPC errors out, the envelope-only lookup is dropped and the
|
||||
/// drop cascades to the awaiting child lookup.
|
||||
/// One transient RPC error on the envelope request → lookup retries with the same peer
|
||||
/// and completes successfully. Mirrors the `bad_peer_rpc_failure` shape used for blocks.
|
||||
#[tokio::test]
|
||||
async fn bad_peer_envelope_rpc_failure() {
|
||||
let Some(mut r) = setup_unknown_parent_envelope_scenario().await else {
|
||||
@@ -2879,11 +2905,13 @@ async fn bad_peer_envelope_rpc_failure() {
|
||||
r.trigger_with_last_unknown_parent_envelope();
|
||||
r.simulate(SimulateConfig::new().return_rpc_error(RPCError::IoError("test".into())))
|
||||
.await;
|
||||
r.assert_failed_lookup_sync();
|
||||
r.assert_successful_lookup_sync();
|
||||
r.assert_head_slot(2);
|
||||
}
|
||||
|
||||
/// Peer responds with an envelope for a different block_root than was requested.
|
||||
/// The request-items layer must reject as `UnrequestedBlockRoot`; both lookups drop.
|
||||
/// Peer responds once with an envelope for a different block_root than requested.
|
||||
/// The request-items layer raises `UnrequestedBlockRoot`, the peer is penalised, and
|
||||
/// the lookup retries successfully on the next request.
|
||||
#[tokio::test]
|
||||
async fn bad_peer_wrong_envelope_response() {
|
||||
let Some(mut r) = setup_unknown_parent_envelope_scenario().await else {
|
||||
@@ -2892,8 +2920,52 @@ async fn bad_peer_wrong_envelope_response() {
|
||||
r.trigger_with_last_unknown_parent_envelope();
|
||||
r.simulate(SimulateConfig::new().return_wrong_envelope_once())
|
||||
.await;
|
||||
r.assert_failed_lookup_sync();
|
||||
r.assert_penalties_of_type("UnrequestedBlockRoot");
|
||||
r.assert_successful_lookup_sync();
|
||||
r.assert_head_slot(2);
|
||||
}
|
||||
|
||||
/// Trigger `UnknownParentEnvelope` when the parent's payload envelope is already
|
||||
/// in fork choice. Sync should treat the trigger as a no-op and create no lookups.
|
||||
#[tokio::test]
|
||||
async fn envelope_already_received_skips_lookup() {
|
||||
let Some(mut r) = setup_unknown_parent_envelope_scenario().await else {
|
||||
return;
|
||||
};
|
||||
let parent_root = r.get_last_block().block_cloned().parent_root();
|
||||
r.import_envelope_for_block_root(parent_root).await;
|
||||
r.trigger_with_last_unknown_parent_envelope();
|
||||
r.assert_single_lookups_count(0);
|
||||
}
|
||||
|
||||
/// End-to-end: an envelope-only RPC lookup completes, the cached child block is
|
||||
/// processed, and the head advances to the gossip block.
|
||||
#[tokio::test]
|
||||
async fn happy_path_unknown_parent_envelope() {
|
||||
let Some(mut r) = setup_unknown_parent_envelope_scenario().await else {
|
||||
return;
|
||||
};
|
||||
r.trigger_with_last_unknown_parent_envelope();
|
||||
r.simulate(SimulateConfig::happy_path()).await;
|
||||
r.assert_successful_lookup_sync();
|
||||
r.assert_head_slot(2);
|
||||
r.assert_no_penalties();
|
||||
}
|
||||
|
||||
/// While an envelope-only RPC lookup is pending, the same envelope is imported
|
||||
/// via the gossip path. The child lookup should still unblock and import.
|
||||
#[tokio::test]
|
||||
async fn happy_path_unknown_parent_envelope_via_gossip() {
|
||||
let Some(mut r) = setup_unknown_parent_envelope_scenario().await else {
|
||||
return;
|
||||
};
|
||||
let parent_root = r.get_last_block().block_cloned().parent_root();
|
||||
r.trigger_with_last_unknown_parent_envelope();
|
||||
// Import the envelope via the local gossip path before any RPC response arrives.
|
||||
r.import_envelope_for_block_root(parent_root).await;
|
||||
r.simulate(SimulateConfig::happy_path()).await;
|
||||
r.assert_successful_lookup_sync();
|
||||
r.assert_head_slot(2);
|
||||
}
|
||||
|
||||
/// Peer returns the requested envelope but with a corrupted signature. Gossip
|
||||
|
||||
Reference in New Issue
Block a user