//! This module contains the logic for Lighthouse's backfill sync.
//!
//! This kind of sync occurs when a trusted state is provided to the client. The client
//! will perform a [`RangeSync`] to the latest head from the trusted state, such that the
//! client can perform its duties right away. Once completed, a backfill sync occurs, where all old
//! blocks (from genesis) are downloaded in order to keep a consistent history.
//!
//! If a batch fails, the backfill sync cannot progress. In this scenario, we mark the backfill
//! sync as failed, log an error and attempt to retry once a new peer joins the node.

use crate::metrics;
use crate::network_beacon_processor::ChainSegmentProcessId;
use crate::sync::batch::{
    BatchConfig, BatchId, BatchInfo, BatchMetricsState, BatchOperationOutcome,
    BatchProcessingResult, BatchState,
};
use crate::sync::block_sidecar_coupling::CouplingError;
use crate::sync::manager::BatchProcessResult;
use crate::sync::network_context::{
    RangeRequestId, RpcRequestSendError, RpcResponseError, SyncNetworkContext,
};
use beacon_chain::block_verification_types::RpcBlock;
use beacon_chain::{BeaconChain, BeaconChainTypes};
use lighthouse_network::service::api_types::Id;
use lighthouse_network::types::{BackFillState, NetworkGlobals};
use lighthouse_network::{PeerAction, PeerId};
use logging::crit;
use std::collections::hash_map::DefaultHasher;
use std::collections::{
    HashSet,
    btree_map::{BTreeMap, Entry},
};
use std::hash::{Hash, Hasher};
use std::marker::PhantomData;
use std::sync::Arc;
use strum::IntoEnumIterator;
use tracing::{debug, error, info, warn};
use types::{ColumnIndex, Epoch, EthSpec};

/// Blocks are downloaded in batches from peers. This constant specifies how many epochs worth of
/// blocks per batch are requested _at most_. A batch may request fewer blocks to account for
/// already requested slots. There is a timeout for each batch request. If this value is too high,
/// we will negatively report peers with poor bandwidth. This can be set arbitrarily high, in which
/// case the responder will fill the response up to the max request size, assuming they have the
/// bandwidth to do so.
pub const BACKFILL_EPOCHS_PER_BATCH: u64 = 1;

/// The maximum number of batches to queue before requesting more.
/// Only batches that are downloading or awaiting processing count towards this limit.
const BACKFILL_BATCH_BUFFER_SIZE: u8 = 5;

/// The number of times to retry a batch before it is considered failed.
const MAX_BATCH_DOWNLOAD_ATTEMPTS: u8 = 10;

/// Invalid batches are attempted to be re-downloaded from other peers. If a batch cannot be processed
/// after `MAX_BATCH_PROCESSING_ATTEMPTS` times, it is considered faulty.
const MAX_BATCH_PROCESSING_ATTEMPTS: u8 = 10;

/// The blocks downloaded for a single backfill batch.
type RpcBlocks<E> = Vec<RpcBlock<E>>;

/// A batch tracked by backfill sync, using the backfill-specific retry limits of
/// [`BackFillBatchConfig`].
type BackFillBatchInfo<E> = BatchInfo<E, BackFillBatchConfig<E>, RpcBlocks<E>>;

/// All batches known to backfill sync, keyed and ordered by their starting epoch (`BatchId`).
type BackFillSyncBatches<E> = BTreeMap<BatchId, BackFillBatchInfo<E>>;

/// Custom configuration for the batch object.
struct BackFillBatchConfig<E: EthSpec> {
    marker: PhantomData<E>,
}

impl<E: EthSpec> BatchConfig for BackFillBatchConfig<E> {
    /// Download attempts allowed per batch before it is considered failed.
    fn max_batch_download_attempts() -> u8 {
        MAX_BATCH_DOWNLOAD_ATTEMPTS
    }
    /// Processing attempts allowed per batch before it is considered faulty.
    fn max_batch_processing_attempts() -> u8 {
        MAX_BATCH_PROCESSING_ATTEMPTS
    }
    /// Hashes the downloaded data with `DefaultHasher`. Presumably used by `BatchInfo` to
    /// fingerprint attempts so that re-downloads with different content can be told apart
    /// (see the `attempt.hash` comparison in `advance_chain`). TODO(review): confirm.
    fn batch_attempt_hash<D: Hash>(data: &D) -> u64 {
        let mut hasher = DefaultHasher::new();
        data.hash(&mut hasher);
        hasher.finish()
    }
}

/// Return type when attempting to start the backfill sync process.
pub enum SyncStart {
    /// The chain started syncing or is already syncing.
    Syncing {
        /// The number of slots that have been processed so far.
        completed: usize,
        /// The number of slots still to be processed.
        remaining: usize,
    },
    /// The chain didn't start syncing.
    NotSyncing,
}

/// A standard result from calling public functions on [`BackFillSync`].
pub enum ProcessResult {
    /// The call was successful.
    Successful,
    /// The call resulted in completing the backfill sync.
    SyncCompleted,
}

/// The ways a backfill sync can fail.
// The info in the enum variants is displayed in logging, clippy thinks it's dead code.
#[derive(Debug)]
pub enum BackFillError {
    /// A batch failed to be downloaded.
    BatchDownloadFailed(#[allow(dead_code)] BatchId),
    /// A batch could not be processed.
    BatchProcessingFailed(#[allow(dead_code)] BatchId),
    /// A batch entered an invalid state.
    BatchInvalidState(#[allow(dead_code)] BatchId, #[allow(dead_code)] String),
    /// The sync algorithm entered an invalid state.
    InvalidSyncState(#[allow(dead_code)] String),
    /// The chain became paused.
    /// `fail_sync` treats this variant as non-fatal and does not reset the sync.
    Paused,
}

/// State machine that downloads and imports historical blocks backwards (from the oldest
/// stored block towards `genesis_backfill_slot`), one batch of `BACKFILL_EPOCHS_PER_BATCH`
/// epochs at a time.
pub struct BackFillSync<T: BeaconChainTypes> {
    /// Keeps track of the current progress of the backfill.
    /// It is moved down by `advance_chain` as batches are validated, and is only re-read from the
    /// beacon chain when restarting from a failed state.
    current_start: BatchId,

    /// Starting epoch of the batch that needs to be processed next.
    /// This is decremented (towards genesis) as the backfill progresses.
    processing_target: BatchId,

    /// Starting epoch of the next batch that needs to be downloaded.
    /// This is decremented (towards genesis) as new batches are created.
    to_be_downloaded: BatchId,

    /// Keeps track if we have requested the final batch.
    last_batch_downloaded: bool,

    /// Sorted map of batches undergoing some kind of processing.
    batches: BackFillSyncBatches<T::EthSpec>,

    /// The current processing batch, if any.
    current_processing_batch: Option<BatchId>,

    /// Batches validated by this chain.
    /// Deliberately not reset when the sync fails.
    validated_batches: u64,

    /// We keep track of peers that are participating in the backfill sync. Unlike RangeSync,
    /// BackFillSync uses all synced peers to download the chain from. If BackFillSync fails, we don't
    /// want to penalize all our synced peers, so we use this variable to keep track of peers that
    /// have participated and only penalize these peers if backfill sync fails.
    /// NOTE(review): the sync paths around this set only remove, clear or drain it; confirm
    /// where peers are inserted, otherwise the failure penalty never reaches anyone.
    participating_peers: HashSet<PeerId>,

    /// When a backfill sync fails, we keep track of whether a new fully synced peer has joined.
    /// This signifies that we are able to attempt to restart a failed chain.
    restart_failed_sync: bool,

    /// Reference to the beacon chain to obtain initial starting points for the backfill sync.
    beacon_chain: Arc<BeaconChain<T>>,

    /// Reference to the network globals in order to obtain valid peers to backfill blocks from
    /// (i.e synced peers).
    network_globals: Arc<NetworkGlobals<T::EthSpec>>,
}

impl<T: BeaconChainTypes> BackFillSync<T> {
    /// Creates the backfill sync.
    ///
    /// The store's anchor decides the initial state. If block backfill is already complete with
    /// respect to `genesis_backfill_slot`, the state is `Completed` at epoch 0. Otherwise it is
    /// `Paused` at the epoch of the oldest stored block. The initial state is published
    /// through `set_state`.
    pub fn new(
        beacon_chain: Arc<BeaconChain<T>>,
        network_globals: Arc<NetworkGlobals<T::EthSpec>>,
    ) -> Self {
        // Determine if backfill is enabled or not.
        // If, for some reason a backfill has already been completed (or we've used a trusted
        // genesis root) then backfill has been completed.

        let anchor_info = beacon_chain.store.get_anchor_info();
        let (state, current_start) =
            if anchor_info.block_backfill_complete(beacon_chain.genesis_backfill_slot) {
                (BackFillState::Completed, Epoch::new(0))
            } else {
                (
                    BackFillState::Paused,
                    anchor_info
                        .oldest_block_slot
                        .epoch(T::EthSpec::slots_per_epoch()),
                )
            };

        let bfs = BackFillSync {
            batches: BTreeMap::new(),
            processing_target: current_start,
            current_start,
            last_batch_downloaded: false,
            to_be_downloaded: current_start,
            network_globals,
            current_processing_batch: None,
            validated_batches: 0,
            participating_peers: HashSet::new(),
            restart_failed_sync: false,
            beacon_chain,
        };

        // Update the global network state with the current backfill state.
        bfs.set_state(state);
        bfs
    }

    /// Pauses the backfill sync if it's currently syncing.
    /// Any other state is left untouched.
    pub fn pause(&mut self) {
        if let BackFillState::Syncing = self.state() {
            debug!(
                processed_epochs = %self.validated_batches,
                to_be_processed = %self.current_start,"Backfill sync paused"
            );
            self.set_state(BackFillState::Paused);
        }
    }

    /// Starts or resumes syncing.
    ///
    /// If resuming is successful, reports back the current syncing metrics.
    ///
    /// Behaviour per current state:
    /// - `Syncing`: nothing to do; the metrics are reported.
    /// - `Paused`: resumes only if there is a synced peer for `to_be_downloaded` and (post
    ///   PeerDAS) good peers on every sampling subnet; otherwise returns `NotSyncing`.
    /// - `Failed`: restarts only after `fully_synced_peer_joined` set `restart_failed_sync`,
    ///   re-reading the start epoch via `reset_start_epoch`.
    /// - `Completed`: returns `NotSyncing`.
    ///
    /// # Errors
    /// Returns `InvalidSyncState("chain completed")` if a failed sync turns out to be complete.
    /// It also propagates failures from resuming, requesting or processing batches.
    #[must_use = "A failure here indicates the backfill sync has failed and the global sync state should be updated"]
    pub fn start(
        &mut self,
        network: &mut SyncNetworkContext<T>,
    ) -> Result<SyncStart, BackFillError> {
        match self.state() {
            BackFillState::Syncing => {} // already syncing ignore.
            BackFillState::Paused => {
                if self
                    .network_globals
                    .peers
                    .read()
                    .synced_peers_for_epoch(self.to_be_downloaded)
                    .next()
                    .is_some()
                    // backfill can't progress if we do not have peers in the required subnets post peerdas.
                    && self.good_peers_on_sampling_subnets(self.to_be_downloaded, network)
                {
                    // If there are peers to resume with, begin the resume.
                    debug!(
                        start_epoch = ?self.current_start,
                        awaiting_batches = self.batches.len(),
                        processing_target = ?self.processing_target,
                        "Resuming backfill sync"
                    );
                    self.set_state(BackFillState::Syncing);
                    // Resume any previously failed batches.
                    self.resume_batches(network)?;
                    // begin requesting blocks from the peer pool, until all peers are exhausted.
                    self.request_batches(network)?;

                    // start processing batches if needed
                    self.process_completed_batches(network)?;
                } else {
                    return Ok(SyncStart::NotSyncing);
                }
            }
            BackFillState::Failed => {
                // Attempt to recover from a failed sync. All local variables should be reset and
                // cleared already for a fresh start.
                // We only attempt to restart a failed backfill sync if a new synced peer has been
                // added.
                if !self.restart_failed_sync {
                    return Ok(SyncStart::NotSyncing);
                }

                self.set_state(BackFillState::Syncing);

                // Obtain a new start slot, from the beacon chain and handle possible errors.
                if let Err(e) = self.reset_start_epoch() {
                    // This infallible match exists to force us to update this code if a future
                    // refactor of `ResetEpochError` adds a variant.
                    let ResetEpochError::SyncCompleted = e;
                    error!("Backfill sync completed whilst in failed status");
                    self.set_state(BackFillState::Completed);
                    return Err(BackFillError::InvalidSyncState(String::from(
                        "chain completed",
                    )));
                }

                debug!(start_epoch = %self.current_start, "Resuming a failed backfill sync");

                // begin requesting blocks from the peer pool, until all peers are exhausted.
                self.request_batches(network)?;
            }
            BackFillState::Completed => return Ok(SyncStart::NotSyncing),
        }

        Ok(SyncStart::Syncing {
            // Slots covered by the batches validated so far.
            completed: (self.validated_batches
                * BACKFILL_EPOCHS_PER_BATCH
                * T::EthSpec::slots_per_epoch()) as usize,
            // Slots between the backfill target and the current start epoch.
            remaining: self
                .current_start
                .start_slot(T::EthSpec::slots_per_epoch())
                .saturating_sub(self.beacon_chain.genesis_backfill_slot)
                .as_usize(),
        })
    }

    /// A fully synced peer has joined us.
    /// If we are in a failed state, update a local variable to indicate we are able to restart
    /// the failed sync on the next attempt.
    pub fn fully_synced_peer_joined(&mut self) {
        if matches!(self.state(), BackFillState::Failed) {
            self.restart_failed_sync = true;
        }
    }

    /// A peer has disconnected.
    ///
    /// Only removes the peer from `participating_peers` (unless the sync has failed). It does not
    /// touch batches; presumably any in-flight request to the peer is reported separately through
    /// `inject_error`. TODO(review): confirm. Currently always returns `Ok(())`.
    #[must_use = "A failure here indicates the backfill sync has failed and the global sync state should be updated"]
    pub fn peer_disconnected(&mut self, peer_id: &PeerId) -> Result<(), BackFillError> {
        if matches!(self.state(), BackFillState::Failed) {
            return Ok(());
        }

        // Remove the peer from the participation list
        self.participating_peers.remove(peer_id);
        Ok(())
    }

    /// An RPC error has occurred.
    ///
    /// If the batch exists it is re-requested. More precisely:
    /// - A data-column coupling error whose retries are not exhausted only re-requests the failed
    ///   columns (see `retry_partial_batch`).
    /// - Errors for a request id the batch is no longer expecting are ignored.
    /// - Otherwise the download is registered as failed. The batch is re-sent if it still has
    ///   download attempts left; if not, the sync fails.
    ///
    /// Errors for unknown batches (e.g. removed as the chain advanced) are ignored.
    #[must_use = "A failure here indicates the backfill sync has failed and the global sync state should be updated"]
    pub fn inject_error(
        &mut self,
        network: &mut SyncNetworkContext<T>,
        batch_id: BatchId,
        peer_id: &PeerId,
        request_id: Id,
        err: RpcResponseError,
    ) -> Result<(), BackFillError> {
        if let Some(batch) = self.batches.get_mut(&batch_id) {
            if let RpcResponseError::BlockComponentCouplingError(coupling_error) = &err {
                match coupling_error {
                    CouplingError::DataColumnPeerFailure {
                        error,
                        faulty_peers,
                        exceeded_retries,
                    } => {
                        debug!(?batch_id, error, "Block components coupling error");
                        // Note: we don't fail the batch here because a `CouplingError` is
                        // recoverable by requesting from other honest peers.
                        let mut failed_columns = HashSet::new();
                        let mut failed_peers = HashSet::new();
                        for (column, peer) in faulty_peers {
                            failed_columns.insert(*column);
                            failed_peers.insert(*peer);
                        }
                        // Only retry if peer failure **and** retries haven't been exceeded
                        if !*exceeded_retries {
                            return self.retry_partial_batch(
                                network,
                                batch_id,
                                request_id,
                                failed_columns,
                                failed_peers,
                            );
                        }
                    }
                    CouplingError::BlobPeerFailure(msg) => {
                        debug!(?batch_id, msg, "Blob peer failure");
                    }
                    CouplingError::InternalError(msg) => {
                        error!(?batch_id, msg, "Block components coupling internal error");
                    }
                }
            }
            // A batch could be retried without the peer failing the request (disconnecting/
            // sending an error /timeout) if the peer is removed from the chain for other
            // reasons. Check that this error belongs to the request the batch is expecting.
            // TODO(das): removed peer_id matching as the node may request a different peer for data
            // columns.
            if !batch.is_expecting_request_id(&request_id) {
                return Ok(());
            }
            debug!(batch_epoch = %batch_id, error = ?err, "Batch download failed");
            match batch.download_failed(Some(*peer_id)) {
                Err(e) => self.fail_sync(BackFillError::BatchInvalidState(batch_id, e.0)),
                Ok(BatchOperationOutcome::Failed { blacklist: _ }) => {
                    self.fail_sync(BackFillError::BatchDownloadFailed(batch_id))
                }
                Ok(BatchOperationOutcome::Continue) => self.send_batch(network, batch_id),
            }
        } else {
            // this could be an error for an old batch, removed when the chain advances
            Ok(())
        }
    }

    /// A block has been received for a batch relating to this backfilling chain.
    /// If the block correctly completes the batch it will be processed if possible.
    /// Responses for unknown batches, or for a request id the batch is no longer expecting, are
    /// ignored.
    /// If this returns an error, the backfill sync has failed and will be restarted once new peers
    /// join the system.
    /// The sync manager should update the global sync state on failure.
#[must_use = "A failure here indicates the backfill sync has failed and the global sync state should be updated"] pub fn on_block_response( &mut self, network: &mut SyncNetworkContext, batch_id: BatchId, peer_id: &PeerId, request_id: Id, blocks: Vec>, ) -> Result { // check if we have this batch let Some(batch) = self.batches.get_mut(&batch_id) else { if !matches!(self.state(), BackFillState::Failed) { // A batch might get removed when the chain advances, so this is non fatal. debug!(epoch = %batch_id, "Received a block for unknown batch"); } return Ok(ProcessResult::Successful); }; // A batch could be retried without the peer failing the request (disconnecting/ // sending an error /timeout) if the peer is removed from the chain for other // reasons. Check that this block belongs to the expected peer, and that the // request_id matches if !batch.is_expecting_request_id(&request_id) { return Ok(ProcessResult::Successful); } let received = blocks.len(); match batch.download_completed(blocks, *peer_id) { Ok(_) => { let awaiting_batches = self.processing_target.saturating_sub(batch_id) / BACKFILL_EPOCHS_PER_BATCH; debug!( epoch = %batch_id, blocks = received, %awaiting_batches, "Completed batch received" ); // pre-emptively request more blocks from peers whilst we process current blocks, self.request_batches(network)?; self.process_completed_batches(network) } Err(e) => { self.fail_sync(BackFillError::BatchInvalidState(batch_id, e.0))?; Ok(ProcessResult::Successful) } } } /// The syncing process has failed. /// /// This resets past variables, to allow for a fresh start when resuming. fn fail_sync(&mut self, error: BackFillError) -> Result<(), BackFillError> { // Some errors shouldn't fail the chain. if matches!(error, BackFillError::Paused) { return Ok(()); } // Set the state self.set_state(BackFillState::Failed); // Remove all batches and active requests and participating peers. 
self.batches.clear(); self.participating_peers.clear(); self.restart_failed_sync = false; // Reset all downloading and processing targets self.processing_target = self.current_start; self.to_be_downloaded = self.current_start; self.last_batch_downloaded = false; self.current_processing_batch = None; // NOTE: Lets keep validated_batches for posterity // Emit the log here error!(?error, "Backfill sync failed"); // Return the error, kinda weird pattern, but I want to use // `self.fail_chain(_)?` in other parts of the code. Err(error) } /// Processes the batch with the given id. /// The batch must exist and be ready for processing fn process_batch( &mut self, network: &mut SyncNetworkContext, batch_id: BatchId, ) -> Result { // Only process batches if this chain is Syncing, and only one at a time if self.state() != BackFillState::Syncing || self.current_processing_batch.is_some() { return Ok(ProcessResult::Successful); } let Some(batch) = self.batches.get_mut(&batch_id) else { return self .fail_sync(BackFillError::InvalidSyncState(format!( "Trying to process a batch that does not exist: {}", batch_id ))) .map(|_| ProcessResult::Successful); }; // NOTE: We send empty batches to the processor in order to trigger the block processor // result callback. This is done, because an empty batch could end a chain and the logic // for removing chains and checking completion is in the callback. let (blocks, _) = match batch.start_processing() { Err(e) => { return self .fail_sync(BackFillError::BatchInvalidState(batch_id, e.0)) .map(|_| ProcessResult::Successful); } Ok(v) => v, }; let process_id = ChainSegmentProcessId::BackSyncBatchId(batch_id); self.current_processing_batch = Some(batch_id); if let Err(e) = network .beacon_processor() .send_chain_segment(process_id, blocks) { crit!( msg = "process_batch", error = %e, batch = ?self.processing_target, "Failed to send backfill segment to processor." 
); // This is unlikely to happen but it would stall syncing since the batch now has no // blocks to continue, and the chain is expecting a processing result that won't // arrive. To mitigate this, (fake) fail this processing so that the batch is // re-downloaded. self.on_batch_process_result(network, batch_id, &BatchProcessResult::NonFaultyFailure) } else { Ok(ProcessResult::Successful) } } /// The block processor has completed processing a batch. This function handles the result /// of the batch processor. /// If an error is returned the BackFill sync has failed. #[must_use = "A failure here indicates the backfill sync has failed and the global sync state should be updated"] pub fn on_batch_process_result( &mut self, network: &mut SyncNetworkContext, batch_id: BatchId, result: &BatchProcessResult, ) -> Result { // The first two cases are possible in regular sync, should not occur in backfill, but we // keep this logic for handling potential processing race conditions. // result let batch = match &self.current_processing_batch { Some(processing_id) if *processing_id != batch_id => { debug!( batch_epoch = %batch_id.as_u64(), expected_batch_epoch = processing_id.as_u64(), "Unexpected batch result" ); return Ok(ProcessResult::Successful); } None => { debug!(%batch_id, "Chain was not expecting a batch result"); return Ok(ProcessResult::Successful); } _ => { // batch_id matches, continue self.current_processing_batch = None; match self.batches.get_mut(&batch_id) { Some(batch) => batch, None => { // This is an error. Fail the sync algorithm. 
return self .fail_sync(BackFillError::InvalidSyncState(format!( "Current processing batch not found: {}", batch_id ))) .map(|_| ProcessResult::Successful); } } } }; let Some(peer) = batch.processing_peer() else { self.fail_sync(BackFillError::BatchInvalidState( batch_id, String::from("Peer does not exist"), ))?; return Ok(ProcessResult::Successful); }; debug!( ?result, %batch, batch_epoch = %batch_id, %peer, client = %network.client_type(peer), "Backfill batch processed" ); match result { BatchProcessResult::Success { imported_blocks, .. } => { if let Err(e) = batch.processing_completed(BatchProcessingResult::Success) { self.fail_sync(BackFillError::BatchInvalidState(batch_id, e.0))?; } // If the processed batch was not empty, we can validate previous unvalidated // blocks. if *imported_blocks > 0 { self.advance_chain(network, batch_id); } if batch_id == self.processing_target { self.processing_target = self .processing_target .saturating_sub(BACKFILL_EPOCHS_PER_BATCH); } // check if the chain has completed syncing if self.check_completed() { // chain is completed info!( blocks_processed = self.validated_batches * T::EthSpec::slots_per_epoch(), "Backfill sync completed" ); self.set_state(BackFillState::Completed); Ok(ProcessResult::SyncCompleted) } else { // chain is not completed // attempt to request more batches self.request_batches(network)?; // attempt to process more batches self.process_completed_batches(network) } } BatchProcessResult::FaultyFailure { imported_blocks, penalty, } => { match batch.processing_completed(BatchProcessingResult::FaultyFailure) { Err(e) => { // Batch was in the wrong state self.fail_sync(BackFillError::BatchInvalidState(batch_id, e.0)) .map(|_| ProcessResult::Successful) } Ok(BatchOperationOutcome::Failed { blacklist: _ }) => { // check that we have not exceeded the re-process retry counter // If a batch has exceeded the invalid batch lookup attempts limit, it means // that it is likely all peers are sending invalid batches // 
repeatedly and are either malicious or faulty. We stop the backfill sync and // report all synced peers that have participated. warn!( score_adjustment = %penalty, batch_epoch = %batch_id, "Backfill batch failed to download. Penalizing peers" ); for peer in self.participating_peers.drain() { // TODO(das): `participating_peers` only includes block peers. Should we // penalize the custody column peers too? network.report_peer(peer, *penalty, "backfill_batch_failed"); } self.fail_sync(BackFillError::BatchProcessingFailed(batch_id)) .map(|_| ProcessResult::Successful) } Ok(BatchOperationOutcome::Continue) => { // chain can continue. Check if it can be progressed if *imported_blocks > 0 { // At least one block was successfully verified and imported, then we can be sure all // previous batches are valid and we only need to download the current failed // batch. self.advance_chain(network, batch_id); } // Handle this invalid batch, that is within the re-process retries limit. self.handle_invalid_batch(network, batch_id) .map(|_| ProcessResult::Successful) } } } BatchProcessResult::NonFaultyFailure => { if let Err(e) = batch.processing_completed(BatchProcessingResult::NonFaultyFailure) { self.fail_sync(BackFillError::BatchInvalidState(batch_id, e.0))?; } self.send_batch(network, batch_id)?; Ok(ProcessResult::Successful) } } } /// Processes the next ready batch. fn process_completed_batches( &mut self, network: &mut SyncNetworkContext, ) -> Result { // Only process batches if backfill is syncing and only process one batch at a time if self.state() != BackFillState::Syncing || self.current_processing_batch.is_some() { return Ok(ProcessResult::Successful); } // Find the id of the batch we are going to process. if let Some(batch) = self.batches.get(&self.processing_target) { let state = batch.state(); match state { BatchState::AwaitingProcessing(..) => { return self.process_batch(network, self.processing_target); } BatchState::Downloading(..) 
=> { // Batch is not ready, nothing to process } BatchState::Poisoned => unreachable!("Poisoned batch"), // Batches can be in `AwaitingDownload` state if there weren't good data column subnet // peers to send the request to. BatchState::AwaitingDownload => return Ok(ProcessResult::Successful), BatchState::Failed | BatchState::Processing(_) => { // these are all inconsistent states: // - Failed -> non recoverable batch. Chain should have been removed // - Processing -> `self.current_processing_batch` is None self.fail_sync(BackFillError::InvalidSyncState(String::from( "Invalid expected batch state", )))?; return Ok(ProcessResult::Successful); } BatchState::AwaitingValidation(_) => { // TODO: I don't think this state is possible, log a CRIT just in case. // If this is not observed, add it to the failed state branch above. crit!( batch = ?self.processing_target, "Chain encountered a robust batch awaiting validation" ); self.processing_target -= BACKFILL_EPOCHS_PER_BATCH; if self.to_be_downloaded >= self.processing_target { self.to_be_downloaded = self.processing_target - BACKFILL_EPOCHS_PER_BATCH; } self.request_batches(network)?; } } } else { self.fail_sync(BackFillError::InvalidSyncState(format!( "Batch not found for current processing target {}", self.processing_target )))?; return Ok(ProcessResult::Successful); } Ok(ProcessResult::Successful) } /// Removes any batches previous to the given `validating_epoch` and updates the current /// boundaries of the chain. /// /// The `validating_epoch` must align with batch boundaries. /// /// If a previous batch has been validated and it had been re-processed, penalize the original /// peer. fn advance_chain(&mut self, network: &mut SyncNetworkContext, validating_epoch: Epoch) { // make sure this epoch produces an advancement if validating_epoch >= self.current_start { return; } // We can now validate higher batches that the current batch. Here we remove all // batches that are higher than the current batch. 
We add on an extra // `BACKFILL_EPOCHS_PER_BATCH` as `split_off` is inclusive. let removed_batches = self .batches .split_off(&(validating_epoch + BACKFILL_EPOCHS_PER_BATCH)); for (id, batch) in removed_batches.into_iter() { self.validated_batches = self.validated_batches.saturating_add(1); // only for batches awaiting validation can we be sure the last attempt is // right, and thus, that any different attempt is wrong match batch.state() { BatchState::AwaitingValidation(processed_attempt) => { for attempt in batch.attempts() { // The validated batch has been re-processed if attempt.hash != processed_attempt.hash { // The re-downloaded version was different. if processed_attempt.peer_id != attempt.peer_id { // A different peer sent the correct batch, the previous peer did not // We negatively score the original peer. let action = PeerAction::LowToleranceError; debug!( batch_epoch = ?id, score_adjustment = %action, original_peer = %attempt.peer_id, new_peer = %processed_attempt.peer_id, "Re-processed batch validated. Scoring original peer" ); network.report_peer( attempt.peer_id, action, "backfill_reprocessed_original_peer", ); } else { // The same peer corrected it's previous mistake. There was an error, so we // negative score the original peer. let action = PeerAction::MidToleranceError; debug!( batch_epoch = ?id, score_adjustment = %action, original_peer = %attempt.peer_id, new_peer = %processed_attempt.peer_id, "Re-processed batch validated by the same peer" ); network.report_peer( attempt.peer_id, action, "backfill_reprocessed_same_peer", ); } } } } BatchState::Downloading(..) => {} BatchState::AwaitingDownload => return, BatchState::Failed | BatchState::Poisoned => { crit!("batch indicates inconsistent chain state while advancing chain") } BatchState::AwaitingProcessing(..) 
=> {} BatchState::Processing(_) => { debug!(batch = %id, %batch, "Advancing chain while processing a batch"); if let Some(processing_id) = self.current_processing_batch && id >= processing_id { self.current_processing_batch = None; } } } } self.processing_target = self.processing_target.min(validating_epoch); self.current_start = validating_epoch; self.to_be_downloaded = self.to_be_downloaded.min(validating_epoch); if self.batches.contains_key(&self.to_be_downloaded) { // if a chain is advanced by Range beyond the previous `self.to_be_downloaded`, we // won't have this batch, so we need to request it. self.to_be_downloaded -= BACKFILL_EPOCHS_PER_BATCH; } debug!(?validating_epoch, processing_target = ?self.processing_target, "Backfill advanced"); } /// An invalid batch has been received that could not be processed, but that can be retried. /// /// These events occur when a peer has successfully responded with blocks, but the blocks we /// have received are incorrect or invalid. This indicates the peer has not performed as /// intended and can result in downvoting a peer. fn handle_invalid_batch( &mut self, network: &mut SyncNetworkContext, batch_id: BatchId, ) -> Result<(), BackFillError> { // The current batch could not be processed, indicating either the current or previous // batches are invalid. // The previous batch could be incomplete due to the block sizes being too large to fit in // a single RPC request or there could be consecutive empty batches which are not supposed // to be there // The current (sub-optimal) strategy is to simply re-request all batches that could // potentially be faulty. If a batch returns a different result than the original and // results in successful processing, we downvote the original peer that sent us the batch. // this is our robust `processing_target`. 
All previous batches must be awaiting // validation let mut redownload_queue = Vec::new(); for (id, batch) in self .batches .iter_mut() .filter(|&(&id, ref _batch)| id > batch_id) { match batch .validation_failed() .map_err(|e| BackFillError::BatchInvalidState(batch_id, e.0))? { BatchOperationOutcome::Failed { blacklist: _ } => { // Batch has failed and cannot be redownloaded. return self.fail_sync(BackFillError::BatchProcessingFailed(batch_id)); } BatchOperationOutcome::Continue => { redownload_queue.push(*id); } } } // no batch maxed out it process attempts, so now the chain's volatile progress must be // reset self.processing_target = self.current_start; for id in redownload_queue { self.send_batch(network, id)?; } // finally, re-request the failed batch. self.send_batch(network, batch_id) } /// Requests the batch assigned to the given id from a given peer. fn send_batch( &mut self, network: &mut SyncNetworkContext, batch_id: BatchId, ) -> Result<(), BackFillError> { if matches!(self.state(), BackFillState::Paused) { return Err(BackFillError::Paused); } if let Some(batch) = self.batches.get_mut(&batch_id) { debug!(?batch_id, "Sending backfill batch"); let synced_peers = self .network_globals .peers .read() .synced_peers_for_epoch(batch_id) .cloned() .collect::>(); let (request, is_blob_batch) = batch.to_blocks_by_range_request(); let failed_peers = batch.failed_peers(); match network.block_components_by_range_request( is_blob_batch, request, RangeRequestId::BackfillSync { batch_id }, &synced_peers, &synced_peers, // All synced peers have imported up to the finalized slot so they must have their custody columns available &failed_peers, ) { Ok(request_id) => { // inform the batch about the new request if let Err(e) = batch.start_downloading(request_id) { return self.fail_sync(BackFillError::BatchInvalidState(batch_id, e.0)); } debug!(epoch = %batch_id, %batch, "Requesting batch"); return Ok(()); } Err(e) => match e { RpcRequestSendError::NoPeer(no_peer) => { // If 
we are here the chain has no more synced peers info!( "reason" = format!("insufficient_synced_peers({no_peer:?})"), "Backfill sync paused" ); self.set_state(BackFillState::Paused); return Err(BackFillError::Paused); } RpcRequestSendError::InternalError(e) => { // NOTE: under normal conditions this shouldn't happen but we handle it anyway warn!(%batch_id, error = ?e, %batch,"Could not send batch request"); // register the failed download and check if the batch can be retried if let Err(e) = batch.start_downloading(1) { return self.fail_sync(BackFillError::BatchInvalidState(batch_id, e.0)); } match batch.download_failed(None) { Err(e) => { self.fail_sync(BackFillError::BatchInvalidState(batch_id, e.0))? } Ok(BatchOperationOutcome::Failed { blacklist: _ }) => { self.fail_sync(BackFillError::BatchDownloadFailed(batch_id))? } Ok(BatchOperationOutcome::Continue) => { return self.send_batch(network, batch_id); } } } }, } } Ok(()) } /// Retries partial column requests within the batch by creating new requests for the failed columns. 
pub fn retry_partial_batch( &mut self, network: &mut SyncNetworkContext, batch_id: BatchId, id: Id, failed_columns: HashSet, mut failed_peers: HashSet, ) -> Result<(), BackFillError> { if let Some(batch) = self.batches.get_mut(&batch_id) { failed_peers.extend(&batch.failed_peers()); let req = batch.to_blocks_by_range_request().0; let synced_peers = network .network_globals() .peers .read() .synced_peers_for_epoch(batch_id) .cloned() .collect::>(); match network.retry_columns_by_range( id, &synced_peers, &failed_peers, req, &failed_columns, ) { Ok(_) => { debug!( ?batch_id, id, "Retried column requests from different peers" ); return Ok(()); } Err(e) => { debug!(?batch_id, id, e, "Failed to retry partial batch"); } } } else { return Err(BackFillError::InvalidSyncState( "Batch should exist to be retried".to_string(), )); } Ok(()) } /// When resuming a chain, this function searches for batches that need to be re-downloaded and /// transitions their state to redownload the batch. fn resume_batches(&mut self, network: &mut SyncNetworkContext) -> Result<(), BackFillError> { let batch_ids_to_retry = self .batches .iter() .filter_map(|(batch_id, batch)| { // In principle there should only ever be on of these, and we could terminate the // loop early, however the processing is negligible and we continue the search // for robustness to handle potential future modification if matches!(batch.state(), BatchState::AwaitingDownload) { Some(*batch_id) } else { None } }) .collect::>(); for batch_id in batch_ids_to_retry { self.send_batch(network, batch_id)?; } Ok(()) } /// Attempts to request the next required batches from the peer pool if the chain is syncing. It will exhaust the peer /// pool and left over batches until the batch buffer is reached or all peers are exhausted. 
fn request_batches( &mut self, network: &mut SyncNetworkContext, ) -> Result<(), BackFillError> { if !matches!(self.state(), BackFillState::Syncing) { return Ok(()); } // find the next pending batch and request it from the peer // Note: for this function to not infinite loop we must: // - If `include_next_batch` returns Some we MUST increase the count of batches that are // accounted in the `BACKFILL_BATCH_BUFFER_SIZE` limit in the `matches!` statement of // that function. while let Some(batch_id) = self.include_next_batch(network) { // send the batch self.send_batch(network, batch_id)?; } // No more batches, simply stop Ok(()) } /// Creates the next required batch from the chain. If there are no more batches required, /// `false` is returned. fn include_next_batch(&mut self, network: &mut SyncNetworkContext) -> Option { // don't request batches beyond genesis; if self.last_batch_downloaded { return None; } // only request batches up to the buffer size limit // NOTE: we don't count batches in the AwaitingValidation state, to prevent stalling sync // if the current processing window is contained in a long range of skip slots. let in_buffer = |batch: &BackFillBatchInfo| { matches!( batch.state(), BatchState::Downloading(..) | BatchState::AwaitingProcessing(..) 
) }; if self .batches .iter() .filter(|&(_epoch, batch)| in_buffer(batch)) .count() >= BACKFILL_BATCH_BUFFER_SIZE as usize { return None; } if !self.good_peers_on_sampling_subnets(self.to_be_downloaded, network) { debug!("Waiting for peers to be available on custody column subnets"); return None; } let batch_id = self.to_be_downloaded; // this batch could have been included already being an optimistic batch match self.batches.entry(batch_id) { Entry::Occupied(_) => { // this batch doesn't need downloading, let this same function decide the next batch if self.would_complete(batch_id) { self.last_batch_downloaded = true; } self.to_be_downloaded = self .to_be_downloaded .saturating_sub(BACKFILL_EPOCHS_PER_BATCH); self.include_next_batch(network) } Entry::Vacant(entry) => { let batch_type = network.batch_type(batch_id); entry.insert(BatchInfo::new( &batch_id, BACKFILL_EPOCHS_PER_BATCH, batch_type, )); if self.would_complete(batch_id) { self.last_batch_downloaded = true; } self.to_be_downloaded = self .to_be_downloaded .saturating_sub(BACKFILL_EPOCHS_PER_BATCH); Some(batch_id) } } } /// Checks all sampling column subnets for peers. Returns `true` if there is at least one peer in /// every sampling column subnet. /// /// Returns `true` if peerdas isn't enabled for the epoch. fn good_peers_on_sampling_subnets( &self, epoch: Epoch, network: &SyncNetworkContext, ) -> bool { if network.chain.spec.is_peer_das_enabled_for_epoch(epoch) { // Require peers on all sampling column subnets before sending batches network .network_globals() .sampling_subnets() .iter() .all(|subnet_id| { let min_peer_count = 1; network .network_globals() .peers .read() .has_good_peers_in_custody_subnet(subnet_id, min_peer_count) }) } else { true } } /// Resets the start epoch based on the beacon chain. /// /// This errors if the beacon chain indicates that backfill sync has already completed or is /// not required. 
fn reset_start_epoch(&mut self) -> Result<(), ResetEpochError> {
    let anchor_info = self.beacon_chain.store.get_anchor_info();
    // Nothing to reset if the store already reports the backfill as finished.
    if anchor_info.block_backfill_complete(self.beacon_chain.genesis_backfill_slot) {
        return Err(ResetEpochError::SyncCompleted);
    }

    // Restart from the epoch containing the oldest block the store currently holds.
    self.current_start = anchor_info
        .oldest_block_slot
        .epoch(T::EthSpec::slots_per_epoch());
    Ok(())
}

/// Returns `true` only when our own progress says backfill is done AND the beacon chain's
/// anchor info confirms it; logs an error if the two disagree.
fn check_completed(&mut self) -> bool {
    if !self.would_complete(self.current_start) {
        return false;
    }

    // Our bookkeeping says we are done; confirm with the beacon chain.
    let anchor_info = self.beacon_chain.store.get_anchor_info();
    let chain_agrees =
        anchor_info.block_backfill_complete(self.beacon_chain.genesis_backfill_slot);
    if !chain_agrees {
        error!("Backfill out of sync with beacon chain");
    }
    chain_agrees
}

/// Returns `true` if syncing back to `start_epoch` reaches (or passes) the epoch of the
/// chain's genesis backfill slot.
fn would_complete(&self, start_epoch: Epoch) -> bool {
    let target_epoch = self
        .beacon_chain
        .genesis_backfill_slot
        .epoch(T::EthSpec::slots_per_epoch());
    start_epoch <= target_epoch
}

/// Publishes, for every batch metrics state, how many backfill batches are currently in it.
pub fn register_metrics(&self) {
    BatchMetricsState::iter().for_each(|metrics_state| {
        let batches_in_state = self
            .batches
            .values()
            .filter(|batch| batch.state().metrics_state() == metrics_state)
            .count();
        metrics::set_gauge_vec(
            &metrics::SYNCING_CHAIN_BATCHES,
            &["backfill", metrics_state.into()],
            batches_in_state as i64,
        );
    });
}

/// Updates the global network state indicating the current state of a backfill sync.
fn set_state(&self, state: BackFillState) {
    let mut backfill_state = self.network_globals.backfill_state.write();
    *backfill_state = state;
}

/// Returns a copy of the backfill state currently stored in the network globals.
fn state(&self) -> BackFillState {
    let backfill_state = self.network_globals.backfill_state.read();
    backfill_state.clone()
}
}

/// Error kind for attempting to restart the sync from beacon chain parameters.
enum ResetEpochError {
    /// The chain has already completed.
    SyncCompleted,
}

#[cfg(test)]
mod tests {
    use super::*;
    use beacon_chain::test_utils::BeaconChainHarness;
    use bls::Hash256;
    use lighthouse_network::{NetworkConfig, SyncInfo, SyncStatus};
    use rand_08::SeedableRng;
    use rand_08::prelude::StdRng;
    use types::MinimalEthSpec;

    #[test]
    fn request_batches_should_not_loop_infinitely() {
        let harness = BeaconChainHarness::builder(MinimalEthSpec)
            .default_spec()
            .deterministic_keypairs(4)
            .fresh_ephemeral_store()
            .build();

        let beacon_chain = harness.chain.clone();
        let slots_per_epoch = MinimalEthSpec::slots_per_epoch();

        let network_globals = Arc::new(NetworkGlobals::new_test_globals(
            vec![],
            Arc::new(NetworkConfig::default()),
            beacon_chain.spec.clone(),
        ));

        {
            // Register a single synced peer with a deterministic identity.
            let mut rng = StdRng::seed_from_u64(0xDEADBEEF0BAD5EEDu64);
            let peer_id = network_globals
                .peers
                .write()
                .__add_connected_peer_testing_only(
                    true,
                    &beacon_chain.spec,
                    k256::ecdsa::SigningKey::random(&mut rng).into(),
                );

            // Simulate finalized epoch and head being 2 epochs ahead
            let finalized_epoch = Epoch::new(40);
            let head_epoch = finalized_epoch + 2;
            let head_slot = head_epoch.start_slot(slots_per_epoch) + 1;

            let sync_info = SyncInfo {
                head_slot,
                head_root: Hash256::random(),
                finalized_epoch,
                finalized_root: Hash256::random(),
                earliest_available_slot: None,
            };
            network_globals
                .peers
                .write()
                .update_sync_status(&peer_id, SyncStatus::Synced { info: sync_info });
        }

        let mut network = SyncNetworkContext::new_for_testing(
            beacon_chain.clone(),
            network_globals.clone(),
            harness.runtime.task_executor.clone(),
        );

        let mut backfill = BackFillSync::new(beacon_chain, network_globals);
        backfill.set_state(BackFillState::Syncing);

        // If request_batches never terminated, this call would hang or blow the stack.
        let _ = backfill.request_batches(&mut network);
    }
}