diff --git a/beacon_node/lighthouse_network/src/types/globals.rs b/beacon_node/lighthouse_network/src/types/globals.rs index c64ee2bc73..acbe5b51de 100644 --- a/beacon_node/lighthouse_network/src/types/globals.rs +++ b/beacon_node/lighthouse_network/src/types/globals.rs @@ -156,6 +156,10 @@ impl NetworkGlobals { .add_latest_update((update_start_slot, cgc)) } + pub fn prune_cgc_updates_older_than(&self, slot: Slot) { + self.cgc_updates.write().prune_updates_older_than(slot); + } + pub fn dump_cgc_updates(&self) -> CGCUpdates { self.cgc_updates.read().clone() } diff --git a/beacon_node/network/src/router.rs b/beacon_node/network/src/router.rs index 7376244501..3683046e18 100644 --- a/beacon_node/network/src/router.rs +++ b/beacon_node/network/src/router.rs @@ -28,7 +28,7 @@ use std::time::{Duration, SystemTime, UNIX_EPOCH}; use tokio::sync::mpsc; use tokio_stream::wrappers::UnboundedReceiverStream; use tracing::{debug, error, info_span, trace, warn, Instrument}; -use types::{BlobSidecar, DataColumnSidecar, EthSpec, ForkContext, SignedBeaconBlock}; +use types::{BlobSidecar, DataColumnSidecar, EthSpec, ForkContext, SignedBeaconBlock, Slot}; /// Handles messages from the network and routes them to the appropriate service to be handled. pub struct Router { @@ -75,6 +75,8 @@ pub enum RouterMessage { PubsubMessage(MessageId, PeerId, PubsubMessage, bool), /// The peer manager has requested we re-status a peer. 
StatusPeer(PeerId), + /// Trigger backfill sync restart + BackfillSyncRestart(Slot), } impl Router { @@ -181,6 +183,9 @@ impl Router { RouterMessage::PubsubMessage(id, peer_id, gossip, should_process) => { self.handle_gossip(id, peer_id, gossip, should_process); } + RouterMessage::BackfillSyncRestart(slot) => { + self.send_to_sync(SyncMessage::BackfillSyncRestart(slot)); + } } } diff --git a/beacon_node/network/src/service.rs b/beacon_node/network/src/service.rs index c9871b40eb..0a65c75c4a 100644 --- a/beacon_node/network/src/service.rs +++ b/beacon_node/network/src/service.rs @@ -34,7 +34,7 @@ use tokio::sync::mpsc; use tokio::time::Sleep; use tracing::{debug, error, info, info_span, trace, warn, Instrument}; use types::{ - ChainSpec, Epoch, EthSpec, ForkContext, Slot, SubnetId, SyncCommitteeSubscription, + ChainSpec, Epoch, EthSpec, ForkContext, ForkName, Slot, SubnetId, SyncCommitteeSubscription, SyncSubnetId, Unsigned, ValidatorSubscription, }; @@ -850,6 +850,14 @@ impl NetworkService { } fn on_cgc_update_interval(&mut self) { + // Skip running this function if Fulu is not scheduled. But run it before the fork to start + // announcing the CGC ahead of the fork. 
+ let fulu_fork_epoch = match self.beacon_chain.spec.fork_epoch(ForkName::Fulu) { + None => return, + Some(epoch) if epoch == Epoch::max_value() => return, + Some(epoch) => epoch, + }; + let prev_cgc = self.network_globals.custody_group_count(Slot::max_value()); let Ok(clock_epoch) = self.beacon_chain.epoch() else { return; @@ -973,33 +981,47 @@ impl NetworkService { // with CGC 128 // let oldest_block_slot = self.beacon_chain.store.get_anchor_info().oldest_block_slot; + // TODO(das): use min_epochs_for_data_columns + let last_pruned_epoch = clock_epoch.saturating_sub(Epoch::new( + self.beacon_chain.spec.min_epochs_for_blob_sidecars_requests, + )); + let last_pruned_slot = last_pruned_epoch.start_slot(T::EthSpec::slots_per_epoch()); + let fulu_fork_slot = fulu_fork_epoch.start_slot(T::EthSpec::slots_per_epoch()); + let oldest_relevant_slot = std::cmp::max( + oldest_block_slot, + std::cmp::max(last_pruned_slot, fulu_fork_slot), + ); + let finalized_slot = self.beacon_chain.finalized_slot(); - let cgc_at_oldest_block_slot = self.network_globals.custody_group_count(oldest_block_slot); + + let cgc_at_oldest_relevant_slot = self + .network_globals + .custody_group_count(oldest_relevant_slot); let cgc_at_finalized_slot = self.network_globals.custody_group_count(finalized_slot); + + let backfill_started_recently = + finalized_slot.saturating_sub(oldest_block_slot) < MAX_SLOT_DISTANCE_BACKFILL_RESTART; + let backfill_finished = oldest_block_slot == Slot::new(0); + // TODO(das): If we support a decreasing CGC we must consider the min value between this two // slots. - if cgc_at_oldest_block_slot < cgc_at_finalized_slot { - let backfill_started_recently = finalized_slot.saturating_sub(oldest_block_slot) - < MAX_SLOT_DISTANCE_BACKFILL_RESTART; - // Note: we don't check if backfill finished. If it did because we are close to genesis, - // we want to restart it anyway to backfill with the CGC. 
The only condition to NOT - // restart is if backfill went too far and thus we would waste too much bandwidth - // fetching the blocks again. - if backfill_started_recently { - // We need backfill sync to fetch batches with `CGC_f = cgc_at_finalized_slot`. Then - // `custody_group_count(oldest_block_slot) should now return `CGC_f`. So we have to - // delete the CGC updates with `update.slot < finalized_slot` - todo!(); - } + // + // Skip if backfill has finished. State reconstruction may have already started and we could + // mess with the DB. For real networks the Fulu fork is way ahead of genesis so it won't affect them. + if cgc_at_oldest_relevant_slot < cgc_at_finalized_slot + && backfill_started_recently + && !backfill_finished + { + // We need backfill sync to fetch batches with `CGC_f = cgc_at_finalized_slot`. Then + // `custody_group_count(oldest_block_slot)` should now return `CGC_f`. So we have to + // delete the CGC updates with `update.slot < finalized_slot` + self.network_globals + .prune_cgc_updates_older_than(finalized_slot); + self.send_to_router(RouterMessage::BackfillSyncRestart(finalized_slot)); } // Schedule an advertise CGC update for later - // TODO(das): use min_epochs_for_data_columns - let last_pruned_epoch = - clock_epoch - Epoch::new(self.beacon_chain.spec.min_epochs_for_blob_sidecars_requests); - let cgc_to_announce = self - .network_globals - .custody_group_count(last_pruned_epoch.start_slot(T::EthSpec::slots_per_epoch())); + let cgc_to_announce = cgc_at_oldest_relevant_slot; // update_enr_cgc updates the NetworkGlobals ENR match self.libp2p.update_enr_cgc(cgc_to_announce) { diff --git a/beacon_node/network/src/sync/backfill_sync/mod.rs b/beacon_node/network/src/sync/backfill_sync/mod.rs index 509caf7316..012df722ed 100644 --- a/beacon_node/network/src/sync/backfill_sync/mod.rs +++ b/beacon_node/network/src/sync/backfill_sync/mod.rs @@ -208,6 +208,37 @@ impl BackFillSync { } } + /// Restarts backfill sync, clearing its state + #[must_use = 
"A failure here indicates the backfill sync has failed and the global sync state should be updated"] + #[instrument(parent = None, + level = "info", + fields(service = "backfill_sync"), + name = "backfill_sync", + skip_all + )] + pub fn restart( + &mut self, + network: &mut SyncNetworkContext, + ) -> Result { + match self.state() { + // Reset and start again + BackFillState::Syncing => { + self.reset_sync(); + self.set_state(BackFillState::Paused); + self.start(network) + } + // Reset, but keep paused + BackFillState::Paused => { + self.reset_sync(); + Ok(SyncStart::NotSyncing) + } + // Ignore a restart if completed + BackFillState::Completed => Ok(SyncStart::NotSyncing), + // Already reset, no need to do anything + BackFillState::Failed => Ok(SyncStart::NotSyncing), + } + } + /// Starts or resumes syncing. /// /// If resuming is successful, reports back the current syncing metrics. @@ -486,6 +517,24 @@ impl BackFillSync { // Set the state self.set_state(BackFillState::Failed); + self.reset_sync(); + + // Emit the log here + error!(?error, "Backfill sync failed"); + + // Return the error, kinda weird pattern, but I want to use + // `self.fail_chain(_)?` in other parts of the code. + Err(error) + } + + /// This resets past variables, to allow for a fresh start when resuming. + #[instrument(parent = None, + level = "info", + fields(service = "backfill_sync"), + name = "backfill_sync", + skip_all + )] + fn reset_sync(&mut self) { // Remove all batches and active requests and participating peers. self.batches.clear(); self.active_requests.clear(); @@ -499,13 +548,6 @@ impl BackFillSync { self.current_processing_batch = None; // NOTE: Lets keep validated_batches for posterity - - // Emit the log here - error!(?error, "Backfill sync failed"); - - // Return the error, kinda weird pattern, but I want to use - // `self.fail_chain(_)?` in other parts of the code. - Err(error) } /// Processes the batch with the given id. 
diff --git a/beacon_node/network/src/sync/manager.rs b/beacon_node/network/src/sync/manager.rs index 9a48e9aa5d..41f86745e1 100644 --- a/beacon_node/network/src/sync/manager.rs +++ b/beacon_node/network/src/sync/manager.rs @@ -177,6 +177,9 @@ pub enum SyncMessage { /// A block from gossip has completed processing, GossipBlockProcessResult { block_root: Hash256, imported: bool }, + + /// Network service asks backfill sync to restart after increasing the oldest_block_slot + BackfillSyncRestart(Slot), } /// The type of processing specified for a received block. @@ -896,6 +899,13 @@ impl SyncManager { self.on_sampling_result(requester, result) } } + SyncMessage::BackfillSyncRestart(slot) => { + if let Err(e) = self.backfill_sync.restart(&mut self.network) { + error!(error = ?e, "Error on backfill sync restart"); + } else { + debug!(%slot, "Received backfill sync restart event"); + } + } } } diff --git a/consensus/types/src/custody.rs b/consensus/types/src/custody.rs index f038a8589d..8550290fd7 100644 --- a/consensus/types/src/custody.rs +++ b/consensus/types/src/custody.rs @@ -34,6 +34,10 @@ impl CGCUpdates { .map_err(|e| format!("Updates list full: {e:?}")) } + pub fn prune_updates_older_than(&mut self, slot: Slot) { + todo!("{slot}"); + } + pub fn iter(&self) -> impl Iterator + '_ { std::iter::once((Slot::new(0), self.initial_value)).chain(self.updates.iter().copied()) }