Implement backfill sync restart

This commit is contained in:
dapplion
2025-04-10 12:07:25 -03:00
parent f9d2c1d2d3
commit 8d9bcd966c
6 changed files with 116 additions and 29 deletions

View File

@@ -28,7 +28,7 @@ use std::time::{Duration, SystemTime, UNIX_EPOCH};
use tokio::sync::mpsc;
use tokio_stream::wrappers::UnboundedReceiverStream;
use tracing::{debug, error, info_span, trace, warn, Instrument};
use types::{BlobSidecar, DataColumnSidecar, EthSpec, ForkContext, SignedBeaconBlock};
use types::{BlobSidecar, DataColumnSidecar, EthSpec, ForkContext, SignedBeaconBlock, Slot};
/// Handles messages from the network and routes them to the appropriate service to be handled.
pub struct Router<T: BeaconChainTypes> {
@@ -75,6 +75,8 @@ pub enum RouterMessage<E: EthSpec> {
PubsubMessage(MessageId, PeerId, PubsubMessage<E>, bool),
/// The peer manager has requested we re-status a peer.
StatusPeer(PeerId),
/// Trigger backfill sync restart
BackfillSyncRestart(Slot),
}
impl<T: BeaconChainTypes> Router<T> {
@@ -181,6 +183,9 @@ impl<T: BeaconChainTypes> Router<T> {
RouterMessage::PubsubMessage(id, peer_id, gossip, should_process) => {
self.handle_gossip(id, peer_id, gossip, should_process);
}
RouterMessage::BackfillSyncRestart(slot) => {
self.send_to_sync(SyncMessage::BackfillSyncRestart(slot));
}
}
}

View File

@@ -34,7 +34,7 @@ use tokio::sync::mpsc;
use tokio::time::Sleep;
use tracing::{debug, error, info, info_span, trace, warn, Instrument};
use types::{
ChainSpec, Epoch, EthSpec, ForkContext, Slot, SubnetId, SyncCommitteeSubscription,
ChainSpec, Epoch, EthSpec, ForkContext, ForkName, Slot, SubnetId, SyncCommitteeSubscription,
SyncSubnetId, Unsigned, ValidatorSubscription,
};
@@ -850,6 +850,14 @@ impl<T: BeaconChainTypes> NetworkService<T> {
}
fn on_cgc_update_interval(&mut self) {
// Skip running this function if Fulu is not scheduled. But run it before the fork to start
// announcing the CGC ahead of the fork.
let fulu_fork_epoch = match self.beacon_chain.spec.fork_epoch(ForkName::Fulu) {
None => return,
Some(epoch) if epoch == Epoch::max_value() => return,
Some(epoch) => epoch,
};
let prev_cgc = self.network_globals.custody_group_count(Slot::max_value());
let Ok(clock_epoch) = self.beacon_chain.epoch() else {
return;
@@ -973,33 +981,47 @@ impl<T: BeaconChainTypes> NetworkService<T> {
// with CGC 128
//
let oldest_block_slot = self.beacon_chain.store.get_anchor_info().oldest_block_slot;
// TODO(das): use min_epochs_for_data_columns
let last_pruned_epoch = clock_epoch.saturating_sub(Epoch::new(
self.beacon_chain.spec.min_epochs_for_blob_sidecars_requests,
));
let last_pruned_slot = last_pruned_epoch.start_slot(T::EthSpec::slots_per_epoch());
let fulu_fork_slot = fulu_fork_epoch.start_slot(T::EthSpec::slots_per_epoch());
let oldest_relevant_slot = std::cmp::max(
oldest_block_slot,
std::cmp::max(last_pruned_slot, fulu_fork_slot),
);
let finalized_slot = self.beacon_chain.finalized_slot();
let cgc_at_oldest_block_slot = self.network_globals.custody_group_count(oldest_block_slot);
let cgc_at_oldest_relevant_slot = self
.network_globals
.custody_group_count(oldest_relevant_slot);
let cgc_at_finalized_slot = self.network_globals.custody_group_count(finalized_slot);
let backfill_started_recently =
finalized_slot.saturating_sub(oldest_block_slot) < MAX_SLOT_DISTANCE_BACKFILL_RESTART;
let backfill_finished = oldest_block_slot == Slot::new(0);
// TODO(das): If we support a decreasing CGC we must consider the min value between this two
// slots.
if cgc_at_oldest_block_slot < cgc_at_finalized_slot {
let backfill_started_recently = finalized_slot.saturating_sub(oldest_block_slot)
< MAX_SLOT_DISTANCE_BACKFILL_RESTART;
// Note: we don't check if backfill finished. If it did because we are close to genesis,
// we want to restart it anyway to backfill with the CGC. The only condition to NOT
// restart is if backfill went too far and thus we would waste too much bandwidth
// fetching the blocks again.
if backfill_started_recently {
// We need backfill sync to fetch batches with `CGC_f = cgc_at_finalized_slot`. Then
// `custody_group_count(oldest_block_slot) should now return `CGC_f`. So we have to
// delete the CGC updates with `update.slot < finalized_slot`
todo!();
}
//
// Skip if backfill has finished. State reconstruction may have already started and we could
// mess with the DB. For real networks the Fulu fork is way ahead of genesis, so this case won't affect them.
if cgc_at_oldest_relevant_slot < cgc_at_finalized_slot
&& backfill_started_recently
&& !backfill_finished
{
// We need backfill sync to fetch batches with `CGC_f = cgc_at_finalized_slot`. Then
// `custody_group_count(oldest_block_slot)` should now return `CGC_f`. So we have to
// delete the CGC updates with `update.slot < finalized_slot`.
self.network_globals
.prune_cgc_updates_older_than(finalized_slot);
self.send_to_router(RouterMessage::BackfillSyncRestart(finalized_slot));
}
// Schedule an advertise CGC update for later
// TODO(das): use min_epochs_for_data_columns
let last_pruned_epoch =
clock_epoch - Epoch::new(self.beacon_chain.spec.min_epochs_for_blob_sidecars_requests);
let cgc_to_announce = self
.network_globals
.custody_group_count(last_pruned_epoch.start_slot(T::EthSpec::slots_per_epoch()));
let cgc_to_announce = cgc_at_oldest_relevant_slot;
// update_enr_cgc updates the NetworkGlobals ENR
match self.libp2p.update_enr_cgc(cgc_to_announce) {

View File

@@ -208,6 +208,37 @@ impl<T: BeaconChainTypes> BackFillSync<T> {
}
}
/// Restarts backfill sync, clearing its in-flight state.
///
/// Triggered via `SyncMessage::BackfillSyncRestart` when the network service
/// wants past batches re-fetched (e.g. after a custody group count increase —
/// see the `on_cgc_update_interval` caller). The action depends on the current
/// backfill state:
/// - `Syncing`: reset all state, then start again immediately.
/// - `Paused`: reset state but remain paused (reports `NotSyncing`).
/// - `Completed`: ignored — restarting a finished sync is handled by the caller.
/// - `Failed`: no-op, state was already reset when the failure occurred.
#[must_use = "A failure here indicates the backfill sync has failed and the global sync state should be updated"]
#[instrument(parent = None,
level = "info",
fields(service = "backfill_sync"),
name = "backfill_sync",
skip_all
)]
pub fn restart(
&mut self,
network: &mut SyncNetworkContext<T>,
) -> Result<SyncStart, BackFillError> {
match self.state() {
// Reset and start again
BackFillState::Syncing => {
self.reset_sync();
// Move through `Paused` so `start` observes a resumable state.
self.set_state(BackFillState::Paused);
self.start(network)
}
// Reset, but keep paused
BackFillState::Paused => {
self.reset_sync();
Ok(SyncStart::NotSyncing)
}
// Ignore a restart if completed
BackFillState::Completed => Ok(SyncStart::NotSyncing),
// Already reset, no need to do anything
BackFillState::Failed => Ok(SyncStart::NotSyncing),
}
}
/// Starts or resumes syncing.
///
/// If resuming is successful, reports back the current syncing metrics.
@@ -486,6 +517,24 @@ impl<T: BeaconChainTypes> BackFillSync<T> {
// Set the state
self.set_state(BackFillState::Failed);
self.reset_sync();
// Emit the log here
error!(?error, "Backfill sync failed");
// Return the error, kinda weird pattern, but I want to use
// `self.fail_chain(_)?` in other parts of the code.
Err(error)
}
/// This resets past variables, to allow for a fresh start when resuming.
#[instrument(parent = None,
level = "info",
fields(service = "backfill_sync"),
name = "backfill_sync",
skip_all
)]
fn reset_sync(&mut self) {
// Remove all batches and active requests and participating peers.
self.batches.clear();
self.active_requests.clear();
@@ -499,13 +548,6 @@ impl<T: BeaconChainTypes> BackFillSync<T> {
self.current_processing_batch = None;
// NOTE: Lets keep validated_batches for posterity
// Emit the log here
error!(?error, "Backfill sync failed");
// Return the error, kinda weird pattern, but I want to use
// `self.fail_chain(_)?` in other parts of the code.
Err(error)
}
/// Processes the batch with the given id.

View File

@@ -177,6 +177,9 @@ pub enum SyncMessage<E: EthSpec> {
/// A block from gossip has completed processing,
GossipBlockProcessResult { block_root: Hash256, imported: bool },
/// Network service asks backfill sync to restart after increasing the oldest_block_slot
BackfillSyncRestart(Slot),
}
/// The type of processing specified for a received block.
@@ -896,6 +899,13 @@ impl<T: BeaconChainTypes> SyncManager<T> {
self.on_sampling_result(requester, result)
}
}
SyncMessage::BackfillSyncRestart(slot) => {
if let Err(e) = self.backfill_sync.restart(&mut self.network) {
error!(error = ?e, "Error on backfill sync restart");
} else {
debug!(%slot, "Received backfill sync restart event");
}
}
}
}