Temp dart throws

This commit is contained in:
Eitan Seri- Levi
2026-04-05 02:15:57 -07:00
parent 0d36ee0fbe
commit c8f69b5a73
6 changed files with 129 additions and 22 deletions

View File

@@ -41,6 +41,7 @@ pub enum ChainSegmentProcessId {
BackSyncBatchId(Epoch),
}
/// Returned when a chain segment import fails.
/// Returned when a chain segment import fails.
struct ChainSegmentFailed {
/// To be displayed in logs.
@@ -49,6 +50,13 @@ struct ChainSegmentFailed {
peer_action: Option<PeerAction>,
}
/// Result of processing a batch of blocks.
enum BlockBatchResult {
Ok { imported_blocks: usize },
ParentEnvelopeUnknown { parent_root: Hash256 },
Err { imported_blocks: usize, failed: Option<ChainSegmentFailed> },
}
impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
/// Returns an async closure which processes a beacon block received via RPC.
///
@@ -633,7 +641,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
.process_blocks(downloaded_blocks.iter(), notify_execution_layer)
.await
{
(imported_blocks, Ok(_)) => {
BlockBatchResult::Ok { imported_blocks } => {
debug!(
batch_epoch = %epoch,
first_block_slot = start_slot,
@@ -647,17 +655,27 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
imported_blocks,
}
}
(imported_blocks, Err(e)) => {
debug!(
batch_epoch = %epoch,
first_block_slot = start_slot,
chain = chain_id,
last_block_slot = end_slot,
imported_blocks,
error = %e.message,
service = "sync",
"Batch processing failed");
match e.peer_action {
BlockBatchResult::ParentEnvelopeUnknown { parent_root } => {
warn!(
batch_epoch = %epoch,
?parent_root,
"Batch processing paused: parent envelope unknown"
);
BatchProcessResult::ParentEnvelopeUnknown { parent_root }
}
BlockBatchResult::Err { imported_blocks, failed } => {
if let Some(e) = &failed {
debug!(
batch_epoch = %epoch,
first_block_slot = start_slot,
chain = chain_id,
last_block_slot = end_slot,
imported_blocks,
error = %e.message,
service = "sync",
"Batch processing failed");
}
match failed.and_then(|e| e.peer_action) {
Some(penalty) => BatchProcessResult::FaultyFailure {
imported_blocks,
penalty,
@@ -758,7 +776,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
&self,
downloaded_blocks: impl Iterator<Item = &'a RangeSyncBlock<T::EthSpec>>,
notify_execution_layer: NotifyExecutionLayer,
) -> (usize, Result<(), ChainSegmentFailed>) {
) -> BlockBatchResult {
let blocks: Vec<_> = downloaded_blocks.cloned().collect();
match self
.chain
@@ -770,18 +788,25 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
if !imported_blocks.is_empty() {
self.chain.recompute_head_at_current_slot().await;
}
(imported_blocks.len(), Ok(()))
BlockBatchResult::Ok { imported_blocks: imported_blocks.len() }
}
ChainSegmentResult::Failed {
imported_blocks,
error,
} => {
metrics::inc_counter(&metrics::BEACON_PROCESSOR_CHAIN_SEGMENT_FAILED_TOTAL);
let r = self.handle_failed_chain_segment(error);
if !imported_blocks.is_empty() {
self.chain.recompute_head_at_current_slot().await;
}
(imported_blocks.len(), r)
// Intercept ParentEnvelopeUnknown before normal error handling.
if let BlockError::ParentEnvelopeUnknown { parent_root } = error {
return BlockBatchResult::ParentEnvelopeUnknown { parent_root };
}
let r = self.handle_failed_chain_segment(error);
BlockBatchResult::Err {
imported_blocks: imported_blocks.len(),
failed: r.err(),
}
}
}
}

View File

@@ -672,7 +672,8 @@ impl<T: BeaconChainTypes> BackFillSync<T> {
}
}
}
BatchProcessResult::NonFaultyFailure => {
BatchProcessResult::NonFaultyFailure
| BatchProcessResult::ParentEnvelopeUnknown { .. } => {
if let Err(e) = batch.processing_completed(BatchProcessingResult::NonFaultyFailure)
{
self.fail_sync(BackFillError::BatchInvalidState(batch_id, e.0))?;

View File

@@ -233,6 +233,11 @@ pub enum BatchProcessResult {
penalty: PeerAction,
},
NonFaultyFailure,
/// The batch processing failed because the parent block's execution payload envelope
/// is not yet available. The chain should pause until the envelope is fetched.
ParentEnvelopeUnknown {
parent_root: Hash256,
},
}
/// The result of processing multiple data columns.
@@ -972,9 +977,20 @@ impl<T: BeaconChainTypes> SyncManager<T> {
SyncMessage::BlockComponentProcessed {
process_type,
result,
} => self
.block_lookups
.on_processing_result(process_type, result, &mut self.network),
} => {
// If a payload envelope was successfully imported, resume any range
// sync chains that were waiting for it.
if let BlockProcessType::SinglePayloadEnvelope { block_root, .. } = &process_type {
if matches!(&result, BlockProcessingResult::Ok(_)) {
self.range_sync.resume_chains_awaiting_envelope(
*block_root,
&mut self.network,
);
}
}
self.block_lookups
.on_processing_result(process_type, result, &mut self.network)
}
SyncMessage::GossipBlockProcessResult {
block_root,
imported,
@@ -985,6 +1001,23 @@ impl<T: BeaconChainTypes> SyncManager<T> {
),
SyncMessage::BatchProcessed { sync_type, result } => match sync_type {
ChainSegmentProcessId::RangeBatchId(chain_id, epoch) => {
// If the batch failed due to a missing parent envelope, trigger
// an envelope lookup before pausing the chain.
if let BatchProcessResult::ParentEnvelopeUnknown { parent_root } = &result {
let peers: Vec<_> = self
.network
.network_globals()
.peers
.read()
.synced_peers()
.cloned()
.collect();
let _ = self.block_lookups.search_parent_envelope_of_child(
*parent_root,
&peers,
&mut self.network,
);
}
self.range_sync.handle_block_process_result(
&mut self.network,
chain_id,

View File

@@ -156,6 +156,8 @@ pub enum ChainSyncingState {
Stopped,
/// The chain is undergoing syncing.
Syncing,
/// The chain is paused waiting for a parent envelope to be fetched.
AwaitingEnvelope { parent_root: Hash256 },
}
impl<T: BeaconChainTypes> SyncingChain<T> {
@@ -639,6 +641,19 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
// Simply re-download all batches in `AwaitingDownload` state.
self.attempt_send_awaiting_download_batches(network, "non-faulty-failure")
}
BatchProcessResult::ParentEnvelopeUnknown { parent_root } => {
batch.processing_completed(BatchProcessingResult::NonFaultyFailure)?;
// Pause the chain until the missing parent envelope is fetched.
debug!(
?parent_root,
"Chain paused: awaiting parent envelope"
);
self.state = ChainSyncingState::AwaitingEnvelope {
parent_root: *parent_root,
};
Ok(KeepChain)
}
}
}
@@ -1175,7 +1190,7 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
pub fn is_syncing(&self) -> bool {
match self.state {
ChainSyncingState::Syncing => true,
ChainSyncingState::Stopped => false,
ChainSyncingState::Stopped | ChainSyncingState::AwaitingEnvelope { .. } => false,
}
}

View File

@@ -3,7 +3,7 @@
//! Each chain type is stored in it's own map. A variety of helper functions are given along with
//! this struct to simplify the logic of the other layers of sync.
use super::chain::{ChainId, ProcessingResult, RemoveChain, SyncingChain};
use super::chain::{ChainId, ChainSyncingState, ProcessingResult, RemoveChain, SyncingChain};
use super::sync_type::RangeSyncType;
use crate::metrics;
use crate::sync::batch::BatchMetricsState;
@@ -562,6 +562,30 @@ impl<T: BeaconChainTypes> ChainCollection<T> {
}
}
/// Resume any chains that were paused waiting for the given parent envelope.
pub fn resume_chains_awaiting_envelope(
&mut self,
parent_root: Hash256,
network: &mut SyncNetworkContext<T>,
) {
for chain in self
.finalized_chains
.values_mut()
.chain(self.head_chains.values_mut())
{
if chain.state
== (ChainSyncingState::AwaitingEnvelope { parent_root })
{
debug!(
?parent_root,
"Resuming chain after parent envelope received"
);
chain.state = ChainSyncingState::Syncing;
let _ = chain.resume(network);
}
}
}
fn update_metrics(&self) {
metrics::set_gauge_vec(
&metrics::SYNCING_CHAINS_COUNT,

View File

@@ -266,6 +266,15 @@ where
}
}
/// Resume any chains that were paused waiting for the given parent envelope.
pub fn resume_chains_awaiting_envelope(
&mut self,
parent_root: Hash256,
network: &mut SyncNetworkContext<T>,
) {
self.chains.resume_chains_awaiting_envelope(parent_root, network);
}
/// A peer has disconnected. This removes the peer from any ongoing chains and mappings. A
/// disconnected peer could remove a chain
pub fn peer_disconnect(&mut self, network: &mut SyncNetworkContext<T>, peer_id: &PeerId) {