From 11684b0da0948d21cddeb531ad2b4d99f9b25d2c Mon Sep 17 00:00:00 2001 From: dapplion <35266934+dapplion@users.noreply.github.com> Date: Tue, 28 Apr 2026 15:49:29 +0200 Subject: [PATCH] Complete envelope-lookup functionality and tests MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Implementation: - payload_envelope_verification: implement the AvailabilityPending branch in the envelope import flow. Previously returned InternalError("Pending payload envelope not yet implemented") for any envelope whose data columns hadn't yet been received, blocking the end-to-end RPC import path. New `import_pending_execution_payload_envelope` marks the payload as received in fork choice and persists the envelope to the store; columns are still expected to arrive separately (gossip / engineGetBlobs / reconstruction) and persist their own ops. - sync manager: short-circuit `handle_unknown_parent_envelope` when the parent's payload was received between gossip-verification and the trigger reaching sync. No lookup is created; the trigger is treated as a no-op. - gossip→sync hook: when a Gloas envelope is imported via the gossip path, emit `SyncMessage::GossipEnvelopeImported { block_root }` so any lookups awaiting that parent envelope unblock without depending on the in-flight RPC response landing first. Closes the review-flagged race where a gossip-imported envelope left child lookups pinned. Tests (3 new): - envelope_already_received_skips_lookup — trigger after envelope already in fork choice creates zero lookups. - happy_path_unknown_parent_envelope — end-to-end RPC import path: lookups complete, head advances to the gossip block. - happy_path_unknown_parent_envelope_via_gossip — pending envelope-only lookup unblocked by a concurrent gossip import via the new sync hook. Existing tests updated: - bad_peer_envelope_rpc_failure / bad_peer_wrong_envelope_response now expect the lookup to retry and succeed (mirroring `bad_peer_*` tests for blocks/blobs/columns), reflecting the now-working import path. --- .../payload_envelope_verification/import.rs | 95 ++++++++++++++++++- .../src/payload_envelope_verification/mod.rs | 20 ++-- .../gossip_methods.rs | 9 +- beacon_node/network/src/sync/manager.rs | 27 ++++++ beacon_node/network/src/sync/tests/lookups.rs | 84 ++++++++++++++-- 5 files changed, 217 insertions(+), 18 deletions(-) diff --git a/beacon_node/beacon_chain/src/payload_envelope_verification/import.rs b/beacon_node/beacon_chain/src/payload_envelope_verification/import.rs index 5a6d3a1b7d..e40dc180b0 100644 --- a/beacon_node/beacon_chain/src/payload_envelope_verification/import.rs +++ b/beacon_node/beacon_chain/src/payload_envelope_verification/import.rs @@ -14,7 +14,8 @@ use super::{ }; use crate::{ AvailabilityProcessingStatus, BeaconChain, BeaconChainError, BeaconChainTypes, - NotifyExecutionLayer, block_verification_types::AvailableBlockData, metrics, + NotifyExecutionLayer, block_verification::PayloadVerificationOutcome, + block_verification_types::AvailableBlockData, metrics, payload_envelope_verification::ExecutionPendingEnvelope, validator_monitor::get_slot_delay_ms, }; @@ -99,9 +100,18 @@ impl BeaconChain { self.import_available_execution_payload_envelope(Box::new(envelope)) .await } - ExecutedEnvelope::AvailabilityPending() => Err(EnvelopeError::InternalError( - "Pending payload envelope not yet implemented".to_owned(), - )), + ExecutedEnvelope::AvailabilityPending { + signed_envelope, + import_data, + payload_verification_outcome, + } => { + self.import_pending_execution_payload_envelope( + signed_envelope, + import_data, + payload_verification_outcome, + ) + .await + } } }; @@ -185,6 +195,39 @@ impl BeaconChain { )) } + /// Import an envelope whose data column availability has not yet been satisfied. + /// + /// Marks the block's payload as received in fork choice and persists the envelope to the + /// store, but does not write data column ops. Columns are expected to arrive separately + /// (gossip, engineGetBlobs, or reconstruction). + #[instrument(skip_all)] + pub async fn import_pending_execution_payload_envelope( + self: &Arc, + signed_envelope: Arc>, + import_data: EnvelopeImportData, + payload_verification_outcome: PayloadVerificationOutcome, + ) -> Result { + let EnvelopeImportData { + block_root, + _phantom, + } = import_data; + let block_root = { + let chain = self.clone(); + self.spawn_blocking_handle( + move || { + chain.import_execution_payload_envelope_pending_columns( + signed_envelope, + block_root, + payload_verification_outcome.payload_verification_status, + ) + }, + "payload_verification_handle", + ) + .await?? + }; + Ok(AvailabilityProcessingStatus::Imported(block_root)) + } + #[instrument(skip_all)] pub async fn import_available_execution_payload_envelope( self: &Arc, @@ -219,6 +262,50 @@ impl BeaconChain { Ok(AvailabilityProcessingStatus::Imported(block_root)) } + /// Same as `import_execution_payload_envelope` but for envelopes whose data columns + /// have not yet been received. Marks the payload as received in fork choice and + /// persists the envelope; columns are persisted separately as they arrive. + #[instrument(skip_all)] + fn import_execution_payload_envelope_pending_columns( + &self, + signed_envelope: Arc>, + block_root: Hash256, + payload_verification_status: PayloadVerificationStatus, + ) -> Result { + let fork_choice_reader = self.canonical_head.fork_choice_upgradable_read_lock(); + if !fork_choice_reader.contains_block(&block_root) { + return Err(EnvelopeError::BlockRootUnknown { block_root }); + } + + let mut fork_choice = parking_lot::RwLockUpgradableReadGuard::upgrade(fork_choice_reader); + fork_choice + .on_valid_payload_envelope_received(block_root) + .map_err(|e| EnvelopeError::InternalError(format!("{e:?}")))?; + + let db_write_timer = metrics::start_timer(&metrics::ENVELOPE_PROCESSING_DB_WRITE); + let ops = vec![StoreOp::PutPayloadEnvelope( + block_root, + signed_envelope.clone(), + )]; + let db_span = info_span!("persist_envelope_pending_columns").entered(); + if let Err(e) = self.store.do_atomically_with_block_and_blobs_cache(ops) { + error!(error = ?e, "Database write failed for pending-columns envelope"); + return Err(e.into()); + } + drop(db_span); + drop(fork_choice); + + let envelope_time_imported = self.slot_clock.now_duration().unwrap_or(Duration::MAX); + metrics::stop_timer(db_write_timer); + self.import_envelope_update_metrics_and_events( + signed_envelope, + block_root, + payload_verification_status, + envelope_time_imported, + ); + Ok(block_root) + } + /// Accepts a fully-verified and available envelope and imports it into the chain without performing any /// additional verification. /// diff --git a/beacon_node/beacon_chain/src/payload_envelope_verification/mod.rs b/beacon_node/beacon_chain/src/payload_envelope_verification/mod.rs index 51fc3f235d..7756e5cdbe 100644 --- a/beacon_node/beacon_chain/src/payload_envelope_verification/mod.rs +++ b/beacon_node/beacon_chain/src/payload_envelope_verification/mod.rs @@ -103,11 +103,16 @@ pub struct EnvelopeProcessingSnapshot { /// 1. `Available`: This envelope has been executed and also contains all data to consider it /// fully available. /// 2. `AvailabilityPending`: This envelope hasn't received all required blobs to consider it -/// fully available. +/// fully available. The envelope is still imported (fork-choice marks the block's payload +/// as received and the envelope is persisted); column persistence is handled separately +/// via gossip / engineGetBlobs as columns arrive. pub enum ExecutedEnvelope { Available(AvailableExecutedEnvelope), - // TODO(gloas) implement availability pending - AvailabilityPending(), + AvailabilityPending { + signed_envelope: Arc>, + import_data: EnvelopeImportData, + payload_verification_outcome: PayloadVerificationOutcome, + }, } impl ExecutedEnvelope { @@ -124,11 +129,14 @@ impl ExecutedEnvelope { payload_verification_outcome, )) } - // TODO(gloas) implement availability pending MaybeAvailableEnvelope::AvailabilityPending { block_hash: _, - envelope: _, - } => Self::AvailabilityPending(), + envelope: signed_envelope, + } => Self::AvailabilityPending { + signed_envelope, + import_data, + payload_verification_outcome, + }, } } } diff --git a/beacon_node/network/src/network_beacon_processor/gossip_methods.rs b/beacon_node/network/src/network_beacon_processor/gossip_methods.rs index fc95783975..bb67ec2beb 100644 --- a/beacon_node/network/src/network_beacon_processor/gossip_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/gossip_methods.rs @@ -3983,8 +3983,13 @@ impl NetworkBeaconProcessor { // register_process_result_metrics(&result, metrics::BlockSource::Gossip, "envelope"); match &result { - Ok(AvailabilityProcessingStatus::Imported(_)) - | Ok(AvailabilityProcessingStatus::MissingComponents(_, _)) => { + Ok(AvailabilityProcessingStatus::Imported(block_root)) => { + // Notify sync so any pending child lookup awaiting this parent envelope unblocks. + self.send_sync_message(SyncMessage::GossipEnvelopeImported { + block_root: *block_root, + }); + } + Ok(AvailabilityProcessingStatus::MissingComponents(_, _)) => { // Nothing to do } Err(e) => match e { diff --git a/beacon_node/network/src/sync/manager.rs b/beacon_node/network/src/sync/manager.rs index f60c3949c9..869e7e32b0 100644 --- a/beacon_node/network/src/sync/manager.rs +++ b/beacon_node/network/src/sync/manager.rs @@ -154,6 +154,10 @@ pub enum SyncMessage { /// A block's parent is known but its execution payload envelope has not been received yet. UnknownParentEnvelope(PeerId, Arc>, Hash256), + /// An execution payload envelope has been imported via the local gossip path. + /// Sync uses this to unblock any child lookups that were awaiting this parent envelope. + GossipEnvelopeImported { block_root: Hash256 }, + /// A partial data column with an unknown parent has been received. UnknownParentPartialDataColumn { peer_id: PeerId, @@ -961,6 +965,14 @@ impl SyncManager { }), ); } + SyncMessage::GossipEnvelopeImported { block_root } => { + debug!( + %block_root, + "Gossip-imported envelope; unblocking awaiting child lookups" + ); + self.block_lookups + .continue_envelope_child_lookups(block_root, &mut self.network); + } SyncMessage::UnknownParentPartialDataColumn { peer_id, block_root, @@ -1096,6 +1108,21 @@ impl SyncManager { slot: Slot, block_component: BlockComponent, ) { + // Defensive: if the parent's payload envelope was already received between when + // gossip-verification raised `ParentEnvelopeUnknown` and now, no lookup is needed. + if self + .chain + .canonical_head + .fork_choice_read_lock() + .is_payload_received(&parent_root) + { + debug!( + %block_root, + %parent_root, + "Parent envelope already received, skipping envelope lookup" + ); + return; + } match self.should_search_for_block(Some(slot), &peer_id) { Ok(_) => { if self.block_lookups.search_child_and_parent_envelope( diff --git a/beacon_node/network/src/sync/tests/lookups.rs b/beacon_node/network/src/sync/tests/lookups.rs index 4f03924eef..35c45eb928 100644 --- a/beacon_node/network/src/sync/tests/lookups.rs +++ b/beacon_node/network/src/sync/tests/lookups.rs @@ -1304,6 +1304,32 @@ impl TestRig { self.harness.chain.recompute_head_at_current_slot().await; } + /// Persist a Gloas execution payload envelope into the local chain and mark the + /// block as "payload received" in fork choice. Mimics the side-effects of the + /// gossip-import path, including the `GossipEnvelopeImported` sync notification. + /// The caller is responsible for ensuring the corresponding beacon block is + /// already imported. + async fn import_envelope_for_block_root(&mut self, block_root: Hash256) { + let envelope = self + .network_envelopes_by_root + .get(&block_root) + .unwrap_or_else(|| panic!("no envelope cached for {block_root:?}")) + .as_ref() + .clone(); + self.harness + .chain + .store + .put_payload_envelope(&block_root, &envelope) + .expect("should store envelope"); + self.harness + .chain + .canonical_head + .fork_choice_write_lock() + .on_valid_payload_envelope_received(block_root) + .expect("should update fork choice with envelope"); + self.push_sync_message(SyncMessage::GossipEnvelopeImported { block_root }); + } + /// Import a block directly into the chain without going through lookup sync async fn import_block_by_root(&mut self, block_root: Hash256) { let range_sync_block = self @@ -2869,8 +2895,8 @@ async fn envelope_lookup_issues_by_root_rpc() { ); } -/// If the envelope RPC errors out, the envelope-only lookup is dropped and the -/// drop cascades to the awaiting child lookup. +/// One transient RPC error on the envelope request → lookup retries with the same peer +/// and completes successfully. Mirrors the `bad_peer_rpc_failure` shape used for blocks. #[tokio::test] async fn bad_peer_envelope_rpc_failure() { let Some(mut r) = setup_unknown_parent_envelope_scenario().await else { @@ -2879,11 +2905,13 @@ async fn bad_peer_envelope_rpc_failure() { r.trigger_with_last_unknown_parent_envelope(); r.simulate(SimulateConfig::new().return_rpc_error(RPCError::IoError("test".into()))) .await; - r.assert_failed_lookup_sync(); + r.assert_successful_lookup_sync(); + r.assert_head_slot(2); } -/// Peer responds with an envelope for a different block_root than was requested. -/// The request-items layer must reject as `UnrequestedBlockRoot`; both lookups drop. +/// Peer responds once with an envelope for a different block_root than requested. +/// The request-items layer raises `UnrequestedBlockRoot`, the peer is penalised, and +/// the lookup retries successfully on the next request. #[tokio::test] async fn bad_peer_wrong_envelope_response() { let Some(mut r) = setup_unknown_parent_envelope_scenario().await else { @@ -2892,8 +2920,52 @@ async fn bad_peer_wrong_envelope_response() { r.trigger_with_last_unknown_parent_envelope(); r.simulate(SimulateConfig::new().return_wrong_envelope_once()) .await; - r.assert_failed_lookup_sync(); r.assert_penalties_of_type("UnrequestedBlockRoot"); + r.assert_successful_lookup_sync(); + r.assert_head_slot(2); +} + +/// Trigger `UnknownParentEnvelope` when the parent's payload envelope is already +/// in fork choice. Sync should treat the trigger as a no-op and create no lookups. +#[tokio::test] +async fn envelope_already_received_skips_lookup() { + let Some(mut r) = setup_unknown_parent_envelope_scenario().await else { + return; + }; + let parent_root = r.get_last_block().block_cloned().parent_root(); + r.import_envelope_for_block_root(parent_root).await; + r.trigger_with_last_unknown_parent_envelope(); + r.assert_single_lookups_count(0); +} + +/// End-to-end: an envelope-only RPC lookup completes, the cached child block is +/// processed, and the head advances to the gossip block. +#[tokio::test] +async fn happy_path_unknown_parent_envelope() { + let Some(mut r) = setup_unknown_parent_envelope_scenario().await else { + return; + }; + r.trigger_with_last_unknown_parent_envelope(); + r.simulate(SimulateConfig::happy_path()).await; + r.assert_successful_lookup_sync(); + r.assert_head_slot(2); + r.assert_no_penalties(); +} + +/// While an envelope-only RPC lookup is pending, the same envelope is imported +/// via the gossip path. The child lookup should still unblock and import. +#[tokio::test] +async fn happy_path_unknown_parent_envelope_via_gossip() { + let Some(mut r) = setup_unknown_parent_envelope_scenario().await else { + return; + }; + let parent_root = r.get_last_block().block_cloned().parent_root(); + r.trigger_with_last_unknown_parent_envelope(); + // Import the envelope via the local gossip path before any RPC response arrives. + r.import_envelope_for_block_root(parent_root).await; + r.simulate(SimulateConfig::happy_path()).await; + r.assert_successful_lookup_sync(); + r.assert_head_slot(2); } /// Peer returns the requested envelope but with a corrupted signature. Gossip