diff --git a/beacon_node/beacon_chain/src/historical_blocks.rs b/beacon_node/beacon_chain/src/historical_blocks.rs index ee51964910..db3e4058b2 100644 --- a/beacon_node/beacon_chain/src/historical_blocks.rs +++ b/beacon_node/beacon_chain/src/historical_blocks.rs @@ -36,6 +36,8 @@ pub enum HistoricalBlockError { IndexOutOfBounds, /// Internal store error StoreError(StoreError), + /// Internal error + InternalError(String), } impl From<StoreError> for HistoricalBlockError { @@ -45,6 +47,37 @@ impl From<StoreError> for HistoricalBlockError { } impl<T: BeaconChainTypes> BeaconChain<T> { + pub fn reset_anchor_oldest_block( + &self, + new_oldest_block_slot: Slot, + ) -> Result<(), HistoricalBlockError> { + let prev_anchor = self.store.get_anchor_info(); + + if new_oldest_block_slot > prev_anchor.oldest_block_slot { + let new_oldest_parent_root = self + .block_root_at_slot(new_oldest_block_slot, crate::WhenSlotSkipped::Prev) + .map_err(|e| { + HistoricalBlockError::InternalError(format!( + "Error reading block root at slot: {e:?}" + )) + })? + // The block at `new_oldest_block_slot` must already be imported since it's gte + // current `oldest_block_slot`. + .ok_or(HistoricalBlockError::InternalError(format!( + "Missing historical block root at slot {new_oldest_block_slot}" + )))?; + let new_anchor = prev_anchor + .as_increased_oldest_block(new_oldest_block_slot, new_oldest_parent_root); + self.store + .compare_and_set_anchor_info_with_write(prev_anchor, new_anchor)?; + debug!(%new_oldest_block_slot, ?new_oldest_parent_root, "Mutated anchor info to advance oldest block"); + } else { + // This batch can be imported, no need to update anchor + } + + Ok(()) + } + /// Store a batch of historical blocks in the database. /// /// The `blocks` should be given in slot-ascending order.
One of the blocks should have a block diff --git a/beacon_node/network/src/network_beacon_processor/mod.rs b/beacon_node/network/src/network_beacon_processor/mod.rs index b74a5acf6a..ee9ecf013e 100644 --- a/beacon_node/network/src/network_beacon_processor/mod.rs +++ b/beacon_node/network/src/network_beacon_processor/mod.rs @@ -592,6 +592,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> { self: &Arc<Self>, process_id: ChainSegmentProcessId, blocks: Vec<RpcBlock<T::EthSpec>>, + reset_anchor_new_oldest_block_slot: Option<Slot>, ) -> Result<(), Error> { let is_backfill = matches!(&process_id, ChainSegmentProcessId::BackSyncBatchId { .. }); debug!(blocks = blocks.len(), id = ?process_id, "Batch sending for process"); @@ -609,7 +610,12 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> { NotifyExecutionLayer::Yes }; processor - .process_chain_segment(process_id, blocks, notify_execution_layer) + .process_chain_segment( + process_id, + blocks, + notify_execution_layer, + reset_anchor_new_oldest_block_slot, + ) .await; }; let process_fn = Box::pin(process_fn); diff --git a/beacon_node/network/src/network_beacon_processor/sync_methods.rs b/beacon_node/network/src/network_beacon_processor/sync_methods.rs index a8779fb8c7..f9e6f909fa 100644 --- a/beacon_node/network/src/network_beacon_processor/sync_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/sync_methods.rs @@ -25,7 +25,7 @@ use tokio::sync::mpsc; use tracing::{debug, error, info, warn}; use types::beacon_block_body::format_kzg_commitments; use types::blob_sidecar::FixedBlobSidecarList; -use types::{BlockImportSource, DataColumnSidecar, DataColumnSidecarList, Epoch, Hash256}; +use types::{BlockImportSource, DataColumnSidecar, DataColumnSidecarList, Epoch, Hash256, Slot}; /// Id associated to a batch processing request, either a sync batch or a parent lookup.
#[derive(Clone, Debug, PartialEq)] @@ -438,6 +438,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> { sync_type: ChainSegmentProcessId, downloaded_blocks: Vec<RpcBlock<T::EthSpec>>, notify_execution_layer: NotifyExecutionLayer, + reset_anchor_new_oldest_block_slot: Option<Slot>, ) { let result = match sync_type { // this a request from the range sync @@ -498,7 +499,9 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> { .map(|wrapped| wrapped.n_data_columns()) .sum::<usize>(); - match self.process_backfill_blocks(downloaded_blocks) { + match self + .process_backfill_blocks(downloaded_blocks, reset_anchor_new_oldest_block_slot) + { (imported_blocks, Ok(_)) => { debug!( batch_epoch = %epoch, @@ -586,6 +589,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> { fn process_backfill_blocks( &self, downloaded_blocks: Vec<RpcBlock<T::EthSpec>>, + reset_anchor_new_oldest_block_slot: Option<Slot>, ) -> (usize, Result<(), ChainSegmentFailed>) { let total_blocks = downloaded_blocks.len(); let available_blocks = match self @@ -636,6 +640,23 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> { ); } + // TODO(das): If `reset_anchor_new_oldest_block_slot` does not get set for some reason, + // backfill sync will continue as usual but importing blocks from the previous start, + // leaving a CGC gap in the DB. I would like to have stronger assurances that this is + // working as expected. The issue is the `blocks_to_import` filtered vec in + // `import_historical_block_batch`. + if let Some(new_oldest_block_slot) = reset_anchor_new_oldest_block_slot { + if let Err(e) = self.chain.reset_anchor_oldest_block(new_oldest_block_slot) { + return ( + 0, + Err(ChainSegmentFailed { + peer_action: None, + message: format!("Failed to reset anchor oldest block: {e:?}"), + }), + ); + } + } + match self.chain.import_historical_block_batch(available_blocks) { Ok(imported_blocks) => { metrics::inc_counter( @@ -690,6 +711,11 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> { warn!(error = ?e, "Backfill batch processing error"); // This is an internal error, don't penalize the peer.
None + } + HistoricalBlockError::InternalError(e) => { + warn!(error = e, "Backfill batch processing error"); + // This is an internal error, don't penalize the peer. + None } // // Do not use a fallback match, handle all errors explicitly }; diff --git a/beacon_node/network/src/router.rs b/beacon_node/network/src/router.rs index 3683046e18..1a6da74100 100644 --- a/beacon_node/network/src/router.rs +++ b/beacon_node/network/src/router.rs @@ -28,7 +28,7 @@ use std::time::{Duration, SystemTime, UNIX_EPOCH}; use tokio::sync::mpsc; use tokio_stream::wrappers::UnboundedReceiverStream; use tracing::{debug, error, info_span, trace, warn, Instrument}; -use types::{BlobSidecar, DataColumnSidecar, EthSpec, ForkContext, SignedBeaconBlock, Slot}; +use types::{BlobSidecar, DataColumnSidecar, Epoch, EthSpec, ForkContext, SignedBeaconBlock}; /// Handles messages from the network and routes them to the appropriate service to be handled. pub struct Router<T: BeaconChainTypes> { @@ -76,7 +76,7 @@ pub enum RouterMessage<E: EthSpec> { /// The peer manager has requested we re-status a peer. StatusPeer(PeerId), /// Trigger backfill sync restart - BackfillSyncRestart(Slot), + BackfillSyncRestart(Epoch), } impl<T: BeaconChainTypes> Router<T> { @@ -183,8 +183,8 @@ impl<T: BeaconChainTypes> Router<T> { RouterMessage::PubsubMessage(id, peer_id, gossip, should_process) => { self.handle_gossip(id, peer_id, gossip, should_process); } - RouterMessage::BackfillSyncRestart(slot) => { - self.send_to_sync(SyncMessage::BackfillSyncRestart(slot)); + RouterMessage::BackfillSyncRestart(epoch) => { + self.send_to_sync(SyncMessage::BackfillSyncRestart(epoch)); } } } diff --git a/beacon_node/network/src/service.rs b/beacon_node/network/src/service.rs index eb3ccca462..c6a7967b0b 100644 --- a/beacon_node/network/src/service.rs +++ b/beacon_node/network/src/service.rs @@ -1098,7 +1098,9 @@ impl<T: BeaconChainTypes> NetworkService<T> { // `finalized_slot`.
self.network_globals .prune_cgc_updates_older_than(finalized_slot); - self.send_to_router(RouterMessage::BackfillSyncRestart(finalized_slot)); + self.send_to_router(RouterMessage::BackfillSyncRestart( + finalized_slot.epoch(T::EthSpec::slots_per_epoch()), + )); info!(slot = %finalized_slot, "Restarting backfill sync to fetch custody columns"); metrics::inc_counter(&metrics::BACKFILL_RESTARTED_FOR_CGC); diff --git a/beacon_node/network/src/sync/backfill_sync/mod.rs b/beacon_node/network/src/sync/backfill_sync/mod.rs index 012df722ed..e3737c58f9 100644 --- a/beacon_node/network/src/sync/backfill_sync/mod.rs +++ b/beacon_node/network/src/sync/backfill_sync/mod.rs @@ -108,6 +108,10 @@ pub struct BackFillSync<T: BeaconChainTypes> { /// This only gets refreshed from the beacon chain if we enter a failed state. current_start: BatchId, + /// If Some it will reset the anchor oldest block pointer to this epoch. Used in PeerDAS to + /// restart backfill over a segment of blocks already imported. + restart_epoch: Option<Epoch>, + /// Starting epoch of the batch that needs to be processed next. /// This is incremented as the chain advances.
processing_target: BatchId, @@ -179,6 +183,7 @@ impl<T: BeaconChainTypes> BackFillSync<T> { active_requests: HashMap::new(), processing_target: current_start, current_start, + restart_epoch: None, last_batch_downloaded: false, to_be_downloaded: current_start, network_globals, @@ -219,7 +224,13 @@ impl<T: BeaconChainTypes> BackFillSync<T> { pub fn restart( &mut self, network: &mut SyncNetworkContext<T>, + new_start: Epoch, ) -> Result<SyncStart, BackFillError> { + self.current_start = new_start; + self.processing_target = new_start; + self.to_be_downloaded = new_start; + self.restart_epoch = Some(new_start); + match self.state() { // Reset and start again BackFillState::Syncing => { @@ -593,10 +604,23 @@ impl<T: BeaconChainTypes> BackFillSync<T> { let process_id = ChainSegmentProcessId::BackSyncBatchId(batch_id); self.current_processing_batch = Some(batch_id); - if let Err(e) = network - .beacon_processor() - .send_chain_segment(process_id, blocks) - { + // TODO(das): This mechanism can fail silently. But at the same time we don't want to keep + // re-writing the anchor everytime. It must happen once.
+ let reset_anchor_new_oldest_block_slot = if let Some(restart_epoch) = self.restart_epoch { + if restart_epoch == batch_id { + Some(restart_epoch.start_slot(T::EthSpec::slots_per_epoch())) + } else { + None + } + } else { + None + }; + + if let Err(e) = network.beacon_processor().send_chain_segment( + process_id, + blocks, + reset_anchor_new_oldest_block_slot, + ) { crit!( msg = "process_batch", error = %e, diff --git a/beacon_node/network/src/sync/manager.rs b/beacon_node/network/src/sync/manager.rs index 41f86745e1..1fd01ed556 100644 --- a/beacon_node/network/src/sync/manager.rs +++ b/beacon_node/network/src/sync/manager.rs @@ -71,7 +71,7 @@ use std::time::Duration; use tokio::sync::mpsc; use tracing::{debug, error, info, info_span, trace, warn, Instrument}; use types::{ - BlobSidecar, DataColumnSidecar, EthSpec, ForkContext, Hash256, SignedBeaconBlock, Slot, + BlobSidecar, DataColumnSidecar, Epoch, EthSpec, ForkContext, Hash256, SignedBeaconBlock, Slot, }; #[cfg(test)] @@ -178,8 +179,9 @@ pub enum SyncMessage<E: EthSpec> { /// A block from gossip has completed processing, GossipBlockProcessResult { block_root: Hash256, imported: bool }, - /// Network service asks backfill sync to restart after increasing the oldest_block_slot - BackfillSyncRestart(Slot), + /// Network service asks backfill sync to restart after increasing the oldest_block_slot. Must + /// start fetching batches from `epoch`. + BackfillSyncRestart(Epoch), } /// The type of processing specified for a received block.
@@ -899,11 +900,11 @@ impl<T: BeaconChainTypes> SyncManager<T> { self.on_sampling_result(requester, result) } } - SyncMessage::BackfillSyncRestart(slot) => { - if let Err(e) = self.backfill_sync.restart(&mut self.network) { + SyncMessage::BackfillSyncRestart(start_epoch) => { + if let Err(e) = self.backfill_sync.restart(&mut self.network, start_epoch) { error!(error = ?e, "Error on backfill sync restart"); } else { - debug!(%slot, "Received backfill sync restart event"); + debug!(%start_epoch, "Received backfill sync restart event"); } } } diff --git a/beacon_node/network/src/sync/range_sync/chain.rs b/beacon_node/network/src/sync/range_sync/chain.rs index 767630804d..1681336833 100644 --- a/beacon_node/network/src/sync/range_sync/chain.rs +++ b/beacon_node/network/src/sync/range_sync/chain.rs @@ -333,7 +333,7 @@ impl<T: BeaconChainTypes> SyncingChain<T> { let process_id = ChainSegmentProcessId::RangeBatchId(self.id, batch_id); self.current_processing_batch = Some(batch_id); - if let Err(e) = beacon_processor.send_chain_segment(process_id, blocks) { + if let Err(e) = beacon_processor.send_chain_segment(process_id, blocks, None) { crit!(msg = "process_batch",error = %e, batch = ?self.processing_target, "Failed to send chain segment to processor."); // This is unlikely to happen but it would stall syncing since the batch now has no // blocks to continue, and the chain is expecting a processing result that won't diff --git a/beacon_node/store/src/metadata.rs b/beacon_node/store/src/metadata.rs index fa7e475ea2..3369dae897 100644 --- a/beacon_node/store/src/metadata.rs +++ b/beacon_node/store/src/metadata.rs @@ -177,6 +177,20 @@ impl AnchorInfo { pub fn full_state_pruning_enabled(&self) -> bool { self.state_lower_limit == 0 && self.state_upper_limit == STATE_UPPER_LIMIT_NO_RETAIN } + + pub fn as_increased_oldest_block( + &self, + oldest_block_slot: Slot, + oldest_block_parent: Hash256, + ) -> Self { + Self { + anchor_slot: self.anchor_slot, + oldest_block_slot, + oldest_block_parent, + state_upper_limit:
self.state_upper_limit, + state_lower_limit: self.state_lower_limit, + } + } } impl StoreItem for AnchorInfo {