Complete backfill restart

This commit is contained in:
dapplion
2025-04-10 19:51:35 -03:00
parent 630c79d110
commit b18bb4e8d0
9 changed files with 125 additions and 19 deletions

View File

@@ -36,6 +36,8 @@ pub enum HistoricalBlockError {
IndexOutOfBounds,
/// Internal store error
StoreError(StoreError),
/// Internal error
InternalError(String),
}
impl From<StoreError> for HistoricalBlockError {
@@ -45,6 +47,37 @@ impl From<StoreError> for HistoricalBlockError {
}
impl<T: BeaconChainTypes> BeaconChain<T> {
/// Advance the store's anchor `oldest_block_slot` forward to `new_oldest_block_slot`.
///
/// Used when restarting backfill sync over a segment of blocks that has already been
/// imported, so that backfill re-fetches them from `new_oldest_block_slot` onwards.
///
/// If `new_oldest_block_slot` is not strictly greater than the current anchor's
/// `oldest_block_slot`, this is a no-op: the batch can be imported without touching the
/// anchor.
///
/// # Errors
///
/// Returns `HistoricalBlockError::InternalError` if the block root at
/// `new_oldest_block_slot` cannot be read or is missing, and propagates any store error
/// from the compare-and-set write of the new anchor info.
pub fn reset_anchor_oldest_block(
    &self,
    new_oldest_block_slot: Slot,
) -> Result<(), HistoricalBlockError> {
    let prev_anchor = self.store.get_anchor_info();

    // Guard: nothing to do unless the anchor actually moves forward. The batch can be
    // imported without updating the anchor.
    if new_oldest_block_slot <= prev_anchor.oldest_block_slot {
        return Ok(());
    }

    let new_oldest_parent_root = self
        .block_root_at_slot(new_oldest_block_slot, crate::WhenSlotSkipped::Prev)
        .map_err(|e| {
            HistoricalBlockError::InternalError(format!(
                "Error reading block root at slot: {e:?}"
            ))
        })?
        // The block at `new_oldest_block_slot` must already be imported since it's gte
        // current `oldest_block_slot`. `ok_or_else` defers building the error message so
        // no allocation happens on the happy path.
        .ok_or_else(|| {
            HistoricalBlockError::InternalError(format!(
                "Missing historical block root at slot {new_oldest_block_slot}"
            ))
        })?;

    let new_anchor =
        prev_anchor.as_increased_oldest_block(new_oldest_block_slot, new_oldest_parent_root);
    // Compare-and-set guards against a concurrent anchor mutation between the read above
    // and this write.
    self.store
        .compare_and_set_anchor_info_with_write(prev_anchor, new_anchor)?;
    debug!(%new_oldest_block_slot, ?new_oldest_parent_root, "Mutated anchor info to advance oldest block");

    Ok(())
}
/// Store a batch of historical blocks in the database.
///
/// The `blocks` should be given in slot-ascending order. One of the blocks should have a block

View File

@@ -592,6 +592,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
self: &Arc<Self>,
process_id: ChainSegmentProcessId,
blocks: Vec<RpcBlock<T::EthSpec>>,
reset_anchor_new_oldest_block_slot: Option<Slot>,
) -> Result<(), Error<T::EthSpec>> {
let is_backfill = matches!(&process_id, ChainSegmentProcessId::BackSyncBatchId { .. });
debug!(blocks = blocks.len(), id = ?process_id, "Batch sending for process");
@@ -609,7 +610,12 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
NotifyExecutionLayer::Yes
};
processor
.process_chain_segment(process_id, blocks, notify_execution_layer)
.process_chain_segment(
process_id,
blocks,
notify_execution_layer,
reset_anchor_new_oldest_block_slot,
)
.await;
};
let process_fn = Box::pin(process_fn);

View File

@@ -25,7 +25,7 @@ use tokio::sync::mpsc;
use tracing::{debug, error, info, warn};
use types::beacon_block_body::format_kzg_commitments;
use types::blob_sidecar::FixedBlobSidecarList;
use types::{BlockImportSource, DataColumnSidecar, DataColumnSidecarList, Epoch, Hash256};
use types::{BlockImportSource, DataColumnSidecar, DataColumnSidecarList, Epoch, Hash256, Slot};
/// Id associated to a batch processing request, either a sync batch or a parent lookup.
#[derive(Clone, Debug, PartialEq)]
@@ -438,6 +438,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
sync_type: ChainSegmentProcessId,
downloaded_blocks: Vec<RpcBlock<T::EthSpec>>,
notify_execution_layer: NotifyExecutionLayer,
reset_anchor_new_oldest_block_slot: Option<Slot>,
) {
let result = match sync_type {
// this a request from the range sync
@@ -498,7 +499,9 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
.map(|wrapped| wrapped.n_data_columns())
.sum::<usize>();
match self.process_backfill_blocks(downloaded_blocks) {
match self
.process_backfill_blocks(downloaded_blocks, reset_anchor_new_oldest_block_slot)
{
(imported_blocks, Ok(_)) => {
debug!(
batch_epoch = %epoch,
@@ -586,6 +589,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
fn process_backfill_blocks(
&self,
downloaded_blocks: Vec<RpcBlock<T::EthSpec>>,
reset_anchor_new_oldest_block_slot: Option<Slot>,
) -> (usize, Result<(), ChainSegmentFailed>) {
let total_blocks = downloaded_blocks.len();
let available_blocks = match self
@@ -636,6 +640,23 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
);
}
// TODO(das): If `reset_anchor_new_oldest_block_slot` does not get set for some reason,
// backfill sync will continue as usual but will import blocks from the previous start,
// leaving a CGC gap in the DB. I would like to have stronger assurances that this is
// working as expected. The issue is the `blocks_to_import` filtered vec in
// `import_historical_block_batch`.
if let Some(new_oldest_block_slot) = reset_anchor_new_oldest_block_slot {
if let Err(e) = self.chain.reset_anchor_oldest_block(new_oldest_block_slot) {
return (
0,
Err(ChainSegmentFailed {
peer_action: None,
message: format!("Failed to reset anchor oldest block: {e:?}"),
}),
);
}
}
match self.chain.import_historical_block_batch(available_blocks) {
Ok(imported_blocks) => {
metrics::inc_counter(
@@ -690,6 +711,11 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
warn!(error = ?e, "Backfill batch processing error");
// This is an internal error, don't penalize the peer.
None
}
HistoricalBlockError::InternalError(e) => {
warn!(error = e, "Backfill batch processing error");
// This is an internal error, don't penalize the peer.
None
} //
// Do not use a fallback match, handle all errors explicitly
};

View File

@@ -28,7 +28,7 @@ use std::time::{Duration, SystemTime, UNIX_EPOCH};
use tokio::sync::mpsc;
use tokio_stream::wrappers::UnboundedReceiverStream;
use tracing::{debug, error, info_span, trace, warn, Instrument};
use types::{BlobSidecar, DataColumnSidecar, EthSpec, ForkContext, SignedBeaconBlock, Slot};
use types::{BlobSidecar, DataColumnSidecar, Epoch, EthSpec, ForkContext, SignedBeaconBlock};
/// Handles messages from the network and routes them to the appropriate service to be handled.
pub struct Router<T: BeaconChainTypes> {
@@ -76,7 +76,7 @@ pub enum RouterMessage<E: EthSpec> {
/// The peer manager has requested we re-status a peer.
StatusPeer(PeerId),
/// Trigger backfill sync restart
BackfillSyncRestart(Slot),
BackfillSyncRestart(Epoch),
}
impl<T: BeaconChainTypes> Router<T> {
@@ -183,8 +183,8 @@ impl<T: BeaconChainTypes> Router<T> {
RouterMessage::PubsubMessage(id, peer_id, gossip, should_process) => {
self.handle_gossip(id, peer_id, gossip, should_process);
}
RouterMessage::BackfillSyncRestart(slot) => {
self.send_to_sync(SyncMessage::BackfillSyncRestart(slot));
RouterMessage::BackfillSyncRestart(epoch) => {
self.send_to_sync(SyncMessage::BackfillSyncRestart(epoch));
}
}
}

View File

@@ -1098,7 +1098,9 @@ impl<T: BeaconChainTypes> NetworkService<T> {
// `finalized_slot`.
self.network_globals
.prune_cgc_updates_older_than(finalized_slot);
self.send_to_router(RouterMessage::BackfillSyncRestart(finalized_slot));
self.send_to_router(RouterMessage::BackfillSyncRestart(
finalized_slot.epoch(T::EthSpec::slots_per_epoch()),
));
info!(slot = %finalized_slot, "Restarting backfill sync to fetch custody columns");
metrics::inc_counter(&metrics::BACKFILL_RESTARTED_FOR_CGC);

View File

@@ -108,6 +108,10 @@ pub struct BackFillSync<T: BeaconChainTypes> {
/// This only gets refreshed from the beacon chain if we enter a failed state.
current_start: BatchId,
/// If Some it will reset the anchor oldest block pointer to this epoch. Used in PeerDAS to
/// restart backfill over a segment of blocks already imported.
restart_epoch: Option<Epoch>,
/// Starting epoch of the batch that needs to be processed next.
/// This is incremented as the chain advances.
processing_target: BatchId,
@@ -179,6 +183,7 @@ impl<T: BeaconChainTypes> BackFillSync<T> {
active_requests: HashMap::new(),
processing_target: current_start,
current_start,
restart_epoch: None,
last_batch_downloaded: false,
to_be_downloaded: current_start,
network_globals,
@@ -219,7 +224,13 @@ impl<T: BeaconChainTypes> BackFillSync<T> {
pub fn restart(
&mut self,
network: &mut SyncNetworkContext<T>,
new_start: Epoch,
) -> Result<SyncStart, BackFillError> {
self.current_start = new_start;
self.processing_target = new_start;
self.to_be_downloaded = new_start;
self.restart_epoch = Some(new_start);
match self.state() {
// Reset and start again
BackFillState::Syncing => {
@@ -593,10 +604,23 @@ impl<T: BeaconChainTypes> BackFillSync<T> {
let process_id = ChainSegmentProcessId::BackSyncBatchId(batch_id);
self.current_processing_batch = Some(batch_id);
if let Err(e) = network
.beacon_processor()
.send_chain_segment(process_id, blocks)
{
// TODO(das): This mechanism can fail silently. But at the same time we don't want to keep
// re-writing the anchor every time. It must happen once.
let reset_anchor_new_oldest_block_slot = if let Some(restart_epoch) = self.restart_epoch {
if restart_epoch == batch_id {
Some(restart_epoch.start_slot(T::EthSpec::slots_per_epoch()))
} else {
None
}
} else {
None
};
if let Err(e) = network.beacon_processor().send_chain_segment(
process_id,
blocks,
reset_anchor_new_oldest_block_slot,
) {
crit!(
msg = "process_batch",
error = %e,

View File

@@ -71,7 +71,7 @@ use std::time::Duration;
use tokio::sync::mpsc;
use tracing::{debug, error, info, info_span, trace, warn, Instrument};
use types::{
BlobSidecar, DataColumnSidecar, EthSpec, ForkContext, Hash256, SignedBeaconBlock, Slot,
BlobSidecar, DataColumnSidecar, Epoch, EthSpec, ForkContext, Hash256, SignedBeaconBlock, Slot,
};
#[cfg(test)]
@@ -178,8 +178,9 @@ pub enum SyncMessage<E: EthSpec> {
/// A block from gossip has completed processing,
GossipBlockProcessResult { block_root: Hash256, imported: bool },
/// Network service asks backfill sync to restart after increasing the oldest_block_slot
BackfillSyncRestart(Slot),
/// Network service asks backfill sync to restart after increasing the oldest_block_slot. Must
/// start fetching batches from `epoch`.
BackfillSyncRestart(Epoch),
}
/// The type of processing specified for a received block.
@@ -899,11 +900,11 @@ impl<T: BeaconChainTypes> SyncManager<T> {
self.on_sampling_result(requester, result)
}
}
SyncMessage::BackfillSyncRestart(slot) => {
if let Err(e) = self.backfill_sync.restart(&mut self.network) {
SyncMessage::BackfillSyncRestart(start_epoch) => {
if let Err(e) = self.backfill_sync.restart(&mut self.network, start_epoch) {
error!(error = ?e, "Error on backfill sync restart");
} else {
debug!(%slot, "Received backfill sync restart event");
debug!(%start_epoch, "Received backfill sync restart event");
}
}
}

View File

@@ -333,7 +333,7 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
let process_id = ChainSegmentProcessId::RangeBatchId(self.id, batch_id);
self.current_processing_batch = Some(batch_id);
if let Err(e) = beacon_processor.send_chain_segment(process_id, blocks) {
if let Err(e) = beacon_processor.send_chain_segment(process_id, blocks, None) {
crit!(msg = "process_batch",error = %e, batch = ?self.processing_target, "Failed to send chain segment to processor.");
// This is unlikely to happen but it would stall syncing since the batch now has no
// blocks to continue, and the chain is expecting a processing result that won't

View File

@@ -177,6 +177,20 @@ impl AnchorInfo {
pub fn full_state_pruning_enabled(&self) -> bool {
self.state_lower_limit == 0 && self.state_upper_limit == STATE_UPPER_LIMIT_NO_RETAIN
}
/// Return a copy of this anchor whose oldest-block pointer has been moved forward.
///
/// Only `oldest_block_slot` and `oldest_block_parent` are replaced with the
/// caller-supplied values; the anchor slot and both state pruning limits are carried
/// over from `self` unchanged.
pub fn as_increased_oldest_block(
    &self,
    oldest_block_slot: Slot,
    oldest_block_parent: Hash256,
) -> Self {
    Self {
        // New oldest-block pointer supplied by the caller.
        oldest_block_slot,
        oldest_block_parent,
        // Everything else is inherited from the existing anchor.
        anchor_slot: self.anchor_slot,
        state_lower_limit: self.state_lower_limit,
        state_upper_limit: self.state_upper_limit,
    }
}
}
impl StoreItem for AnchorInfo {