From c8f69b5a736db41c4ed38d0eca78b2629c8c2930 Mon Sep 17 00:00:00 2001 From: Eitan Seri- Levi Date: Sun, 5 Apr 2026 02:15:57 -0700 Subject: [PATCH] Temp dart throws --- .../network_beacon_processor/sync_methods.rs | 57 +++++++++++++------ .../network/src/sync/backfill_sync/mod.rs | 3 +- beacon_node/network/src/sync/manager.rs | 39 ++++++++++++- .../network/src/sync/range_sync/chain.rs | 17 +++++- .../src/sync/range_sync/chain_collection.rs | 26 ++++++++- .../network/src/sync/range_sync/range.rs | 9 +++ 6 files changed, 129 insertions(+), 22 deletions(-) diff --git a/beacon_node/network/src/network_beacon_processor/sync_methods.rs b/beacon_node/network/src/network_beacon_processor/sync_methods.rs index 57d3d7d220..3f98b6c28f 100644 --- a/beacon_node/network/src/network_beacon_processor/sync_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/sync_methods.rs @@ -41,6 +41,7 @@ pub enum ChainSegmentProcessId { BackSyncBatchId(Epoch), } +/// Returned when a chain segment import fails. /// Returned when a chain segment import fails. struct ChainSegmentFailed { /// To be displayed in logs. @@ -49,6 +50,13 @@ struct ChainSegmentFailed { peer_action: Option, } +/// Result of processing a batch of blocks. +enum BlockBatchResult { + Ok { imported_blocks: usize }, + ParentEnvelopeUnknown { parent_root: Hash256 }, + Err { imported_blocks: usize, failed: Option }, +} + impl NetworkBeaconProcessor { /// Returns an async closure which processes a beacon block received via RPC. /// @@ -633,7 +641,7 @@ impl NetworkBeaconProcessor { .process_blocks(downloaded_blocks.iter(), notify_execution_layer) .await { - (imported_blocks, Ok(_)) => { + BlockBatchResult::Ok { imported_blocks } => { debug!( batch_epoch = %epoch, first_block_slot = start_slot, @@ -647,17 +655,27 @@ impl NetworkBeaconProcessor { imported_blocks, } } - (imported_blocks, Err(e)) => { - debug!( - batch_epoch = %epoch, - first_block_slot = start_slot, - chain = chain_id, - last_block_slot = end_slot, - imported_blocks, - error = %e.message, - service = "sync", - "Batch processing failed"); - match e.peer_action { + BlockBatchResult::ParentEnvelopeUnknown { parent_root } => { + warn!( + batch_epoch = %epoch, + ?parent_root, + "Batch processing paused: parent envelope unknown" + ); + BatchProcessResult::ParentEnvelopeUnknown { parent_root } + } + BlockBatchResult::Err { imported_blocks, failed } => { + if let Some(e) = &failed { + debug!( + batch_epoch = %epoch, + first_block_slot = start_slot, + chain = chain_id, + last_block_slot = end_slot, + imported_blocks, + error = %e.message, + service = "sync", + "Batch processing failed"); + } + match failed.and_then(|e| e.peer_action) { Some(penalty) => BatchProcessResult::FaultyFailure { imported_blocks, penalty, @@ -758,7 +776,7 @@ impl NetworkBeaconProcessor { &self, downloaded_blocks: impl Iterator>, notify_execution_layer: NotifyExecutionLayer, - ) -> (usize, Result<(), ChainSegmentFailed>) { + ) -> BlockBatchResult { let blocks: Vec<_> = downloaded_blocks.cloned().collect(); match self .chain @@ -770,18 +788,25 @@ impl NetworkBeaconProcessor { if !imported_blocks.is_empty() { self.chain.recompute_head_at_current_slot().await; } - (imported_blocks.len(), Ok(())) + BlockBatchResult::Ok { imported_blocks: imported_blocks.len() } } ChainSegmentResult::Failed { imported_blocks, error, } => { metrics::inc_counter(&metrics::BEACON_PROCESSOR_CHAIN_SEGMENT_FAILED_TOTAL); - let r = self.handle_failed_chain_segment(error); if !imported_blocks.is_empty() { self.chain.recompute_head_at_current_slot().await; } - (imported_blocks.len(), r) + // Intercept ParentEnvelopeUnknown before normal error handling. + if let BlockError::ParentEnvelopeUnknown { parent_root } = error { + return BlockBatchResult::ParentEnvelopeUnknown { parent_root }; + } + let r = self.handle_failed_chain_segment(error); + BlockBatchResult::Err { + imported_blocks: imported_blocks.len(), + failed: r.err(), + } } } } diff --git a/beacon_node/network/src/sync/backfill_sync/mod.rs b/beacon_node/network/src/sync/backfill_sync/mod.rs index 29beb96e5a..4fef78a47a 100644 --- a/beacon_node/network/src/sync/backfill_sync/mod.rs +++ b/beacon_node/network/src/sync/backfill_sync/mod.rs @@ -672,7 +672,8 @@ impl BackFillSync { } } } - BatchProcessResult::NonFaultyFailure => { + BatchProcessResult::NonFaultyFailure + | BatchProcessResult::ParentEnvelopeUnknown { .. } => { if let Err(e) = batch.processing_completed(BatchProcessingResult::NonFaultyFailure) { self.fail_sync(BackFillError::BatchInvalidState(batch_id, e.0))?; diff --git a/beacon_node/network/src/sync/manager.rs b/beacon_node/network/src/sync/manager.rs index c1c1029446..da67dd5dc4 100644 --- a/beacon_node/network/src/sync/manager.rs +++ b/beacon_node/network/src/sync/manager.rs @@ -233,6 +233,11 @@ pub enum BatchProcessResult { penalty: PeerAction, }, NonFaultyFailure, + /// The batch processing failed because the parent block's execution payload envelope + /// is not yet available. The chain should pause until the envelope is fetched. + ParentEnvelopeUnknown { + parent_root: Hash256, + }, } /// The result of processing multiple data columns. @@ -972,9 +977,20 @@ impl SyncManager { SyncMessage::BlockComponentProcessed { process_type, result, - } => self - .block_lookups - .on_processing_result(process_type, result, &mut self.network), + } => { + // If a payload envelope was successfully imported, resume any range + // sync chains that were waiting for it. + if let BlockProcessType::SinglePayloadEnvelope { block_root, .. } = &process_type { + if matches!(&result, BlockProcessingResult::Ok(_)) { + self.range_sync.resume_chains_awaiting_envelope( + *block_root, + &mut self.network, + ); + } + } + self.block_lookups + .on_processing_result(process_type, result, &mut self.network) + } SyncMessage::GossipBlockProcessResult { block_root, imported, @@ -985,6 +1001,23 @@ impl SyncManager { ), SyncMessage::BatchProcessed { sync_type, result } => match sync_type { ChainSegmentProcessId::RangeBatchId(chain_id, epoch) => { + // If the batch failed due to a missing parent envelope, trigger + // an envelope lookup before pausing the chain. + if let BatchProcessResult::ParentEnvelopeUnknown { parent_root } = &result { + let peers: Vec<_> = self + .network + .network_globals() + .peers + .read() + .synced_peers() + .cloned() + .collect(); + let _ = self.block_lookups.search_parent_envelope_of_child( + *parent_root, + &peers, + &mut self.network, + ); + } self.range_sync.handle_block_process_result( &mut self.network, chain_id, diff --git a/beacon_node/network/src/sync/range_sync/chain.rs b/beacon_node/network/src/sync/range_sync/chain.rs index d533d8ed0d..bd2c7af385 100644 --- a/beacon_node/network/src/sync/range_sync/chain.rs +++ b/beacon_node/network/src/sync/range_sync/chain.rs @@ -156,6 +156,8 @@ pub enum ChainSyncingState { Stopped, /// The chain is undergoing syncing. Syncing, + /// The chain is paused waiting for a parent envelope to be fetched. + AwaitingEnvelope { parent_root: Hash256 }, } impl SyncingChain { @@ -639,6 +641,19 @@ impl SyncingChain { // Simply re-download all batches in `AwaitingDownload` state. self.attempt_send_awaiting_download_batches(network, "non-faulty-failure") } + BatchProcessResult::ParentEnvelopeUnknown { parent_root } => { + batch.processing_completed(BatchProcessingResult::NonFaultyFailure)?; + + // Pause the chain until the missing parent envelope is fetched. + debug!( + ?parent_root, + "Chain paused: awaiting parent envelope" + ); + self.state = ChainSyncingState::AwaitingEnvelope { + parent_root: *parent_root, + }; + Ok(KeepChain) + } } } @@ -1175,7 +1190,7 @@ impl SyncingChain { pub fn is_syncing(&self) -> bool { match self.state { ChainSyncingState::Syncing => true, - ChainSyncingState::Stopped => false, + ChainSyncingState::Stopped | ChainSyncingState::AwaitingEnvelope { .. } => false, } } diff --git a/beacon_node/network/src/sync/range_sync/chain_collection.rs b/beacon_node/network/src/sync/range_sync/chain_collection.rs index a087fdecdf..7a642f1e02 100644 --- a/beacon_node/network/src/sync/range_sync/chain_collection.rs +++ b/beacon_node/network/src/sync/range_sync/chain_collection.rs @@ -3,7 +3,7 @@ //! Each chain type is stored in it's own map. A variety of helper functions are given along with //! this struct to simplify the logic of the other layers of sync. -use super::chain::{ChainId, ProcessingResult, RemoveChain, SyncingChain}; +use super::chain::{ChainId, ChainSyncingState, ProcessingResult, RemoveChain, SyncingChain}; use super::sync_type::RangeSyncType; use crate::metrics; use crate::sync::batch::BatchMetricsState; @@ -562,6 +562,30 @@ impl ChainCollection { } } + /// Resume any chains that were paused waiting for the given parent envelope. + pub fn resume_chains_awaiting_envelope( + &mut self, + parent_root: Hash256, + network: &mut SyncNetworkContext, + ) { + for chain in self + .finalized_chains + .values_mut() + .chain(self.head_chains.values_mut()) + { + if chain.state + == (ChainSyncingState::AwaitingEnvelope { parent_root }) + { + debug!( + ?parent_root, + "Resuming chain after parent envelope received" + ); + chain.state = ChainSyncingState::Syncing; + let _ = chain.resume(network); + } + } + } + fn update_metrics(&self) { metrics::set_gauge_vec( &metrics::SYNCING_CHAINS_COUNT, diff --git a/beacon_node/network/src/sync/range_sync/range.rs b/beacon_node/network/src/sync/range_sync/range.rs index 6509ac3cb3..80915df86a 100644 --- a/beacon_node/network/src/sync/range_sync/range.rs +++ b/beacon_node/network/src/sync/range_sync/range.rs @@ -266,6 +266,15 @@ where } } + /// Resume any chains that were paused waiting for the given parent envelope. + pub fn resume_chains_awaiting_envelope( + &mut self, + parent_root: Hash256, + network: &mut SyncNetworkContext, + ) { + self.chains.resume_chains_awaiting_envelope(parent_root, network); + } + /// A peer has disconnected. This removes the peer from any ongoing chains and mappings. A /// disconnected peer could remove a chain pub fn peer_disconnect(&mut self, network: &mut SyncNetworkContext, peer_id: &PeerId) {