Temp fixes

This commit is contained in:
Eitan Seri- Levi
2026-04-06 00:16:02 -07:00
parent c8f69b5a73
commit f7cf8fca8d
7 changed files with 69 additions and 132 deletions

View File

@@ -41,7 +41,6 @@ pub enum ChainSegmentProcessId {
BackSyncBatchId(Epoch),
}
/// Returned when a chain segment import fails.
/// Returned when a chain segment import fails.
struct ChainSegmentFailed {
/// To be displayed in logs.
@@ -50,13 +49,6 @@ struct ChainSegmentFailed {
peer_action: Option<PeerAction>,
}
/// Result of processing a batch of blocks.
enum BlockBatchResult {
Ok { imported_blocks: usize },
ParentEnvelopeUnknown { parent_root: Hash256 },
Err { imported_blocks: usize, failed: Option<ChainSegmentFailed> },
}
impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
/// Returns an async closure which processes a beacon block received via RPC.
///
@@ -641,7 +633,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
.process_blocks(downloaded_blocks.iter(), notify_execution_layer)
.await
{
BlockBatchResult::Ok { imported_blocks } => {
(imported_blocks, Ok(_)) => {
debug!(
batch_epoch = %epoch,
first_block_slot = start_slot,
@@ -655,27 +647,17 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
imported_blocks,
}
}
BlockBatchResult::ParentEnvelopeUnknown { parent_root } => {
warn!(
batch_epoch = %epoch,
?parent_root,
"Batch processing paused: parent envelope unknown"
);
BatchProcessResult::ParentEnvelopeUnknown { parent_root }
}
BlockBatchResult::Err { imported_blocks, failed } => {
if let Some(e) = &failed {
debug!(
batch_epoch = %epoch,
first_block_slot = start_slot,
chain = chain_id,
last_block_slot = end_slot,
imported_blocks,
error = %e.message,
service = "sync",
"Batch processing failed");
}
match failed.and_then(|e| e.peer_action) {
(imported_blocks, Err(e)) => {
debug!(
batch_epoch = %epoch,
first_block_slot = start_slot,
chain = chain_id,
last_block_slot = end_slot,
imported_blocks,
error = %e.message,
service = "sync",
"Batch processing failed");
match e.peer_action {
Some(penalty) => BatchProcessResult::FaultyFailure {
imported_blocks,
penalty,
@@ -776,7 +758,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
&self,
downloaded_blocks: impl Iterator<Item = &'a RangeSyncBlock<T::EthSpec>>,
notify_execution_layer: NotifyExecutionLayer,
) -> BlockBatchResult {
) -> (usize, Result<(), ChainSegmentFailed>) {
let blocks: Vec<_> = downloaded_blocks.cloned().collect();
match self
.chain
@@ -788,25 +770,18 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
if !imported_blocks.is_empty() {
self.chain.recompute_head_at_current_slot().await;
}
BlockBatchResult::Ok { imported_blocks: imported_blocks.len() }
(imported_blocks.len(), Ok(()))
}
ChainSegmentResult::Failed {
imported_blocks,
error,
} => {
metrics::inc_counter(&metrics::BEACON_PROCESSOR_CHAIN_SEGMENT_FAILED_TOTAL);
let r = self.handle_failed_chain_segment(error);
if !imported_blocks.is_empty() {
self.chain.recompute_head_at_current_slot().await;
}
// Intercept ParentEnvelopeUnknown before normal error handling.
if let BlockError::ParentEnvelopeUnknown { parent_root } = error {
return BlockBatchResult::ParentEnvelopeUnknown { parent_root };
}
let r = self.handle_failed_chain_segment(error);
BlockBatchResult::Err {
imported_blocks: imported_blocks.len(),
failed: r.err(),
}
(imported_blocks.len(), r)
}
}
}

View File

@@ -672,8 +672,7 @@ impl<T: BeaconChainTypes> BackFillSync<T> {
}
}
}
BatchProcessResult::NonFaultyFailure
| BatchProcessResult::ParentEnvelopeUnknown { .. } => {
BatchProcessResult::NonFaultyFailure => {
if let Err(e) = batch.processing_completed(BatchProcessingResult::NonFaultyFailure)
{
self.fail_sync(BackFillError::BatchInvalidState(batch_id, e.0))?;

View File

@@ -233,11 +233,6 @@ pub enum BatchProcessResult {
penalty: PeerAction,
},
NonFaultyFailure,
/// The batch processing failed because the parent block's execution payload envelope
/// is not yet available. The chain should pause until the envelope is fetched.
ParentEnvelopeUnknown {
parent_root: Hash256,
},
}
/// The result of processing multiple data columns.
@@ -977,20 +972,9 @@ impl<T: BeaconChainTypes> SyncManager<T> {
SyncMessage::BlockComponentProcessed {
process_type,
result,
} => {
// If a payload envelope was successfully imported, resume any range
// sync chains that were waiting for it.
if let BlockProcessType::SinglePayloadEnvelope { block_root, .. } = &process_type {
if matches!(&result, BlockProcessingResult::Ok(_)) {
self.range_sync.resume_chains_awaiting_envelope(
*block_root,
&mut self.network,
);
}
}
self.block_lookups
.on_processing_result(process_type, result, &mut self.network)
}
} => self
.block_lookups
.on_processing_result(process_type, result, &mut self.network),
SyncMessage::GossipBlockProcessResult {
block_root,
imported,
@@ -1001,23 +985,6 @@ impl<T: BeaconChainTypes> SyncManager<T> {
),
SyncMessage::BatchProcessed { sync_type, result } => match sync_type {
ChainSegmentProcessId::RangeBatchId(chain_id, epoch) => {
// If the batch failed due to a missing parent envelope, trigger
// an envelope lookup before pausing the chain.
if let BatchProcessResult::ParentEnvelopeUnknown { parent_root } = &result {
let peers: Vec<_> = self
.network
.network_globals()
.peers
.read()
.synced_peers()
.cloned()
.collect();
let _ = self.block_lookups.search_parent_envelope_of_child(
*parent_root,
&peers,
&mut self.network,
);
}
self.range_sync.handle_block_process_result(
&mut self.network,
chain_id,

View File

@@ -156,8 +156,6 @@ pub enum ChainSyncingState {
Stopped,
/// The chain is undergoing syncing.
Syncing,
/// The chain is paused waiting for a parent envelope to be fetched.
AwaitingEnvelope { parent_root: Hash256 },
}
impl<T: BeaconChainTypes> SyncingChain<T> {
@@ -641,19 +639,6 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
// Simply re-download all batches in `AwaitingDownload` state.
self.attempt_send_awaiting_download_batches(network, "non-faulty-failure")
}
BatchProcessResult::ParentEnvelopeUnknown { parent_root } => {
batch.processing_completed(BatchProcessingResult::NonFaultyFailure)?;
// Pause the chain until the missing parent envelope is fetched.
debug!(
?parent_root,
"Chain paused: awaiting parent envelope"
);
self.state = ChainSyncingState::AwaitingEnvelope {
parent_root: *parent_root,
};
Ok(KeepChain)
}
}
}
@@ -1190,7 +1175,7 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
pub fn is_syncing(&self) -> bool {
match self.state {
ChainSyncingState::Syncing => true,
ChainSyncingState::Stopped | ChainSyncingState::AwaitingEnvelope { .. } => false,
ChainSyncingState::Stopped => false,
}
}

View File

@@ -3,7 +3,7 @@
//! Each chain type is stored in it's own map. A variety of helper functions are given along with
//! this struct to simplify the logic of the other layers of sync.
use super::chain::{ChainId, ChainSyncingState, ProcessingResult, RemoveChain, SyncingChain};
use super::chain::{ChainId, ProcessingResult, RemoveChain, SyncingChain};
use super::sync_type::RangeSyncType;
use crate::metrics;
use crate::sync::batch::BatchMetricsState;
@@ -562,30 +562,6 @@ impl<T: BeaconChainTypes> ChainCollection<T> {
}
}
/// Resume any chains that were paused waiting for the given parent envelope.
pub fn resume_chains_awaiting_envelope(
&mut self,
parent_root: Hash256,
network: &mut SyncNetworkContext<T>,
) {
for chain in self
.finalized_chains
.values_mut()
.chain(self.head_chains.values_mut())
{
if chain.state
== (ChainSyncingState::AwaitingEnvelope { parent_root })
{
debug!(
?parent_root,
"Resuming chain after parent envelope received"
);
chain.state = ChainSyncingState::Syncing;
let _ = chain.resume(network);
}
}
}
fn update_metrics(&self) {
metrics::set_gauge_vec(
&metrics::SYNCING_CHAINS_COUNT,

View File

@@ -266,15 +266,6 @@ where
}
}
/// Resume any chains that were paused waiting for the given parent envelope.
pub fn resume_chains_awaiting_envelope(
&mut self,
parent_root: Hash256,
network: &mut SyncNetworkContext<T>,
) {
self.chains.resume_chains_awaiting_envelope(parent_root, network);
}
/// A peer has disconnected. This removes the peer from any ongoing chains and mappings. A
/// disconnected peer could remove a chain
pub fn peer_disconnect(&mut self, network: &mut SyncNetworkContext<T>, peer_id: &PeerId) {