mirror of
https://github.com/sigp/lighthouse.git
synced 2026-03-03 00:31:50 +00:00
N/A Extracts (3) from https://github.com/sigp/lighthouse/pull/7946. Prior to peerdas, a batch should never have been in `AwaitingDownload` state because we immediately try to move from `AwaitingDownload` to `Downloading` state by sending batches. This was always possible as long as we had peers in the `SyncingChain` in the pre-peerdas world. However, this is no longer the case as a batch can be stuck waiting in `AwaitingDownload` state if we have no peers to request the columns from. This PR makes `AwaitingDownload` an allowable in-between state. If a batch is found to be in this state, then we attempt to send the batch instead of erroring like before. Note to reviewer: We need to make sure that this doesn't lead to a bunch of batches stuck in `AwaitingDownload` state if the chain can be progressed. Backfill already retries all batches in `AwaitingDownload` state so we just need to make `AwaitingDownload` a valid state during processing and validation. This PR explicitly adds the same logic for forward sync to download batches stuck in `AwaitingDownload`. Apart from that, we also force download of the `processing_target` when sync stops progressing. This is required in cases where `self.batches` has > `BATCH_BUFFER_SIZE` batches that are waiting to get processed but the `processing_batch` has repeatedly failed at download/processing stage. This leads to sync getting stuck and never recovering.
1263 lines
53 KiB
Rust
1263 lines
53 KiB
Rust
//! This module contains the logic for Lighthouse's backfill sync.
|
|
//!
|
|
//! This kind of sync occurs when a trusted state is provided to the client. The client
|
|
//! will perform a [`RangeSync`] to the latest head from the trusted state, such that the
|
|
//! client can perform its duties right away. Once completed, a backfill sync occurs, where all old
|
|
//! blocks (from genesis) are downloaded in order to keep a consistent history.
|
|
//!
|
|
//! If a batch fails, the backfill sync cannot progress. In this scenario, we mark the backfill
|
|
//! sync as failed, log an error and attempt to retry once a new peer joins the node.
|
|
|
|
use crate::network_beacon_processor::ChainSegmentProcessId;
|
|
use crate::sync::block_sidecar_coupling::CouplingError;
|
|
use crate::sync::manager::BatchProcessResult;
|
|
use crate::sync::network_context::{
|
|
RangeRequestId, RpcRequestSendError, RpcResponseError, SyncNetworkContext,
|
|
};
|
|
use crate::sync::range_sync::{
|
|
BatchConfig, BatchId, BatchInfo, BatchOperationOutcome, BatchProcessingResult, BatchState,
|
|
};
|
|
use beacon_chain::block_verification_types::RpcBlock;
|
|
use beacon_chain::{BeaconChain, BeaconChainTypes};
|
|
use lighthouse_network::service::api_types::Id;
|
|
use lighthouse_network::types::{BackFillState, NetworkGlobals};
|
|
use lighthouse_network::{PeerAction, PeerId};
|
|
use logging::crit;
|
|
use std::collections::{
|
|
HashSet,
|
|
btree_map::{BTreeMap, Entry},
|
|
};
|
|
use std::sync::Arc;
|
|
use tracing::{debug, error, info, warn};
|
|
use types::{ColumnIndex, Epoch, EthSpec};
|
|
|
|
/// Upper bound on the number of epochs' worth of blocks requested in a single batch.
///
/// A batch may ask for fewer blocks than this to account for slots that have already been
/// requested. Every batch request is subject to a timeout, so a value that is too high causes
/// peers with poor bandwidth to be reported negatively. The value can nevertheless be set
/// arbitrarily high, in which case the responder fills the response up to the max request size,
/// assuming it has the bandwidth to do so.
pub const BACKFILL_EPOCHS_PER_BATCH: u64 = 1;

/// Maximum number of batches to queue before more are requested.
const BACKFILL_BATCH_BUFFER_SIZE: u8 = 5;

/// Number of times a batch is retried before it is considered failed.
const MAX_BATCH_DOWNLOAD_ATTEMPTS: u8 = 10;

/// Number of processing attempts after which a batch is considered faulty.
///
/// Invalid batches are re-downloaded from other peers between attempts.
const MAX_BATCH_PROCESSING_ATTEMPTS: u8 = 10;
|
|
|
|
/// Custom configuration for the batch object.
|
|
struct BackFillBatchConfig {}
|
|
|
|
impl BatchConfig for BackFillBatchConfig {
|
|
fn max_batch_download_attempts() -> u8 {
|
|
MAX_BATCH_DOWNLOAD_ATTEMPTS
|
|
}
|
|
fn max_batch_processing_attempts() -> u8 {
|
|
MAX_BATCH_PROCESSING_ATTEMPTS
|
|
}
|
|
fn batch_attempt_hash<E: EthSpec>(blocks: &[RpcBlock<E>]) -> u64 {
|
|
use std::collections::hash_map::DefaultHasher;
|
|
use std::hash::{Hash, Hasher};
|
|
let mut hasher = DefaultHasher::new();
|
|
blocks.hash(&mut hasher);
|
|
hasher.finish()
|
|
}
|
|
}
|
|
|
|
/// Return type when attempting to start the backfill sync process.
///
/// Derives the common value-type traits so the result can be logged with `{:?}` and compared
/// directly by callers and tests.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SyncStart {
    /// The chain started syncing or is already syncing.
    Syncing {
        /// The number of slots that have been processed so far.
        completed: usize,
        /// The number of slots still to be processed.
        remaining: usize,
    },
    /// The chain didn't start syncing.
    NotSyncing,
}
|
|
|
|
/// A standard result from calling public functions on [`BackFillSync`].
///
/// Derives the common value-type traits so the result can be logged with `{:?}` and compared
/// directly by callers and tests.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ProcessResult {
    /// The call was successful.
    Successful,
    /// The call resulted in completing the backfill sync.
    SyncCompleted,
}
|
|
|
|
/// The ways a backfill sync can fail.
|
|
// The info in the enum variants is displayed in logging, clippy thinks it's dead code.
|
|
#[derive(Debug)]
|
|
pub enum BackFillError {
|
|
/// A batch failed to be downloaded.
|
|
BatchDownloadFailed(#[allow(dead_code)] BatchId),
|
|
/// A batch could not be processed.
|
|
BatchProcessingFailed(#[allow(dead_code)] BatchId),
|
|
/// A batch entered an invalid state.
|
|
BatchInvalidState(#[allow(dead_code)] BatchId, #[allow(dead_code)] String),
|
|
/// The sync algorithm entered an invalid state.
|
|
InvalidSyncState(#[allow(dead_code)] String),
|
|
/// The chain became paused.
|
|
Paused,
|
|
}
|
|
|
|
pub struct BackFillSync<T: BeaconChainTypes> {
|
|
/// Keeps track of the current progress of the backfill.
|
|
/// This only gets refreshed from the beacon chain if we enter a failed state.
|
|
current_start: BatchId,
|
|
|
|
/// Starting epoch of the batch that needs to be processed next.
|
|
/// This is incremented as the chain advances.
|
|
processing_target: BatchId,
|
|
|
|
/// Starting epoch of the next batch that needs to be downloaded.
|
|
to_be_downloaded: BatchId,
|
|
|
|
/// Keeps track if we have requested the final batch.
|
|
last_batch_downloaded: bool,
|
|
|
|
/// Sorted map of batches undergoing some kind of processing.
|
|
batches: BTreeMap<BatchId, BatchInfo<T::EthSpec, BackFillBatchConfig>>,
|
|
|
|
/// The current processing batch, if any.
|
|
current_processing_batch: Option<BatchId>,
|
|
|
|
/// Batches validated by this chain.
|
|
validated_batches: u64,
|
|
|
|
/// We keep track of peers that are participating in the backfill sync. Unlike RangeSync,
|
|
/// BackFillSync uses all synced peers to download the chain from. If BackFillSync fails, we don't
|
|
/// want to penalize all our synced peers, so we use this variable to keep track of peers that
|
|
/// have participated and only penalize these peers if backfill sync fails.
|
|
participating_peers: HashSet<PeerId>,
|
|
|
|
/// When a backfill sync fails, we keep track of whether a new fully synced peer has joined.
|
|
/// This signifies that we are able to attempt to restart a failed chain.
|
|
restart_failed_sync: bool,
|
|
|
|
/// Reference to the beacon chain to obtain initial starting points for the backfill sync.
|
|
beacon_chain: Arc<BeaconChain<T>>,
|
|
|
|
/// Reference to the network globals in order to obtain valid peers to backfill blocks from
|
|
/// (i.e synced peers).
|
|
network_globals: Arc<NetworkGlobals<T::EthSpec>>,
|
|
}
|
|
|
|
impl<T: BeaconChainTypes> BackFillSync<T> {
|
|
pub fn new(
|
|
beacon_chain: Arc<BeaconChain<T>>,
|
|
network_globals: Arc<NetworkGlobals<T::EthSpec>>,
|
|
) -> Self {
|
|
// Determine if backfill is enabled or not.
|
|
// If, for some reason a backfill has already been completed (or we've used a trusted
|
|
// genesis root) then backfill has been completed.
|
|
let anchor_info = beacon_chain.store.get_anchor_info();
|
|
let (state, current_start) =
|
|
if anchor_info.block_backfill_complete(beacon_chain.genesis_backfill_slot) {
|
|
(BackFillState::Completed, Epoch::new(0))
|
|
} else {
|
|
(
|
|
BackFillState::Paused,
|
|
anchor_info
|
|
.oldest_block_slot
|
|
.epoch(T::EthSpec::slots_per_epoch()),
|
|
)
|
|
};
|
|
|
|
let bfs = BackFillSync {
|
|
batches: BTreeMap::new(),
|
|
processing_target: current_start,
|
|
current_start,
|
|
last_batch_downloaded: false,
|
|
to_be_downloaded: current_start,
|
|
network_globals,
|
|
current_processing_batch: None,
|
|
validated_batches: 0,
|
|
participating_peers: HashSet::new(),
|
|
restart_failed_sync: false,
|
|
beacon_chain,
|
|
};
|
|
|
|
// Update the global network state with the current backfill state.
|
|
bfs.set_state(state);
|
|
bfs
|
|
}
|
|
|
|
/// Pauses the backfill sync if it's currently syncing.
|
|
pub fn pause(&mut self) {
|
|
if let BackFillState::Syncing = self.state() {
|
|
debug!(processed_epochs = %self.validated_batches, to_be_processed = %self.current_start,"Backfill sync paused");
|
|
self.set_state(BackFillState::Paused);
|
|
}
|
|
}
|
|
|
|
/// Starts or resumes syncing.
|
|
///
|
|
/// If resuming is successful, reports back the current syncing metrics.
|
|
#[must_use = "A failure here indicates the backfill sync has failed and the global sync state should be updated"]
|
|
pub fn start(
|
|
&mut self,
|
|
network: &mut SyncNetworkContext<T>,
|
|
) -> Result<SyncStart, BackFillError> {
|
|
match self.state() {
|
|
BackFillState::Syncing => {} // already syncing ignore.
|
|
BackFillState::Paused => {
|
|
if self
|
|
.network_globals
|
|
.peers
|
|
.read()
|
|
.synced_peers_for_epoch(self.to_be_downloaded, None)
|
|
.next()
|
|
.is_some()
|
|
// backfill can't progress if we do not have peers in the required subnets post peerdas.
|
|
&& self.good_peers_on_sampling_subnets(self.to_be_downloaded, network)
|
|
{
|
|
// If there are peers to resume with, begin the resume.
|
|
debug!(start_epoch = ?self.current_start, awaiting_batches = self.batches.len(), processing_target = ?self.processing_target, "Resuming backfill sync");
|
|
self.set_state(BackFillState::Syncing);
|
|
// Resume any previously failed batches.
|
|
self.resume_batches(network)?;
|
|
// begin requesting blocks from the peer pool, until all peers are exhausted.
|
|
self.request_batches(network)?;
|
|
|
|
// start processing batches if needed
|
|
self.process_completed_batches(network)?;
|
|
} else {
|
|
return Ok(SyncStart::NotSyncing);
|
|
}
|
|
}
|
|
BackFillState::Failed => {
|
|
// Attempt to recover from a failed sync. All local variables should be reset and
|
|
// cleared already for a fresh start.
|
|
// We only attempt to restart a failed backfill sync if a new synced peer has been
|
|
// added.
|
|
if !self.restart_failed_sync {
|
|
return Ok(SyncStart::NotSyncing);
|
|
}
|
|
|
|
self.set_state(BackFillState::Syncing);
|
|
|
|
// Obtain a new start slot, from the beacon chain and handle possible errors.
|
|
if let Err(e) = self.reset_start_epoch() {
|
|
// This infallible match exists to force us to update this code if a future
|
|
// refactor of `ResetEpochError` adds a variant.
|
|
let ResetEpochError::SyncCompleted = e;
|
|
error!("Backfill sync completed whilst in failed status");
|
|
self.set_state(BackFillState::Completed);
|
|
return Err(BackFillError::InvalidSyncState(String::from(
|
|
"chain completed",
|
|
)));
|
|
}
|
|
|
|
debug!(start_epoch = %self.current_start, "Resuming a failed backfill sync");
|
|
|
|
// begin requesting blocks from the peer pool, until all peers are exhausted.
|
|
self.request_batches(network)?;
|
|
}
|
|
BackFillState::Completed => return Ok(SyncStart::NotSyncing),
|
|
}
|
|
|
|
Ok(SyncStart::Syncing {
|
|
completed: (self.validated_batches
|
|
* BACKFILL_EPOCHS_PER_BATCH
|
|
* T::EthSpec::slots_per_epoch()) as usize,
|
|
remaining: self
|
|
.current_start
|
|
.start_slot(T::EthSpec::slots_per_epoch())
|
|
.saturating_sub(self.beacon_chain.genesis_backfill_slot)
|
|
.as_usize(),
|
|
})
|
|
}
|
|
|
|
/// A fully synced peer has joined us.
|
|
/// If we are in a failed state, update a local variable to indicate we are able to restart
|
|
/// the failed sync on the next attempt.
|
|
pub fn fully_synced_peer_joined(&mut self) {
|
|
if matches!(self.state(), BackFillState::Failed) {
|
|
self.restart_failed_sync = true;
|
|
}
|
|
}
|
|
|
|
/// A peer has disconnected.
|
|
/// If the peer has active batches, those are considered failed and re-requested.
|
|
#[must_use = "A failure here indicates the backfill sync has failed and the global sync state should be updated"]
|
|
pub fn peer_disconnected(&mut self, peer_id: &PeerId) -> Result<(), BackFillError> {
|
|
if matches!(self.state(), BackFillState::Failed) {
|
|
return Ok(());
|
|
}
|
|
|
|
// Remove the peer from the participation list
|
|
self.participating_peers.remove(peer_id);
|
|
Ok(())
|
|
}
|
|
|
|
/// An RPC error has occurred.
|
|
///
|
|
/// If the batch exists it is re-requested.
|
|
#[must_use = "A failure here indicates the backfill sync has failed and the global sync state should be updated"]
|
|
pub fn inject_error(
|
|
&mut self,
|
|
network: &mut SyncNetworkContext<T>,
|
|
batch_id: BatchId,
|
|
peer_id: &PeerId,
|
|
request_id: Id,
|
|
err: RpcResponseError,
|
|
) -> Result<(), BackFillError> {
|
|
if let Some(batch) = self.batches.get_mut(&batch_id) {
|
|
if let RpcResponseError::BlockComponentCouplingError(coupling_error) = &err {
|
|
match coupling_error {
|
|
CouplingError::DataColumnPeerFailure {
|
|
error,
|
|
faulty_peers,
|
|
action,
|
|
exceeded_retries,
|
|
} => {
|
|
debug!(?batch_id, error, "Block components coupling error");
|
|
// Note: we don't fail the batch here because a `CouplingError` is
|
|
// recoverable by requesting from other honest peers.
|
|
let mut failed_columns = HashSet::new();
|
|
let mut failed_peers = HashSet::new();
|
|
for (column, peer) in faulty_peers {
|
|
failed_columns.insert(*column);
|
|
failed_peers.insert(*peer);
|
|
}
|
|
for peer in failed_peers.iter() {
|
|
network.report_peer(*peer, *action, "failed to return columns");
|
|
}
|
|
|
|
// Only retry if peer failure **and** retries have been exceeded
|
|
if !*exceeded_retries {
|
|
return self.retry_partial_batch(
|
|
network,
|
|
batch_id,
|
|
request_id,
|
|
failed_columns,
|
|
failed_peers,
|
|
);
|
|
}
|
|
}
|
|
CouplingError::BlobPeerFailure(msg) => {
|
|
tracing::debug!(?batch_id, msg, "Blob peer failure");
|
|
}
|
|
CouplingError::InternalError(msg) => {
|
|
error!(?batch_id, msg, "Block components coupling internal error");
|
|
}
|
|
}
|
|
}
|
|
// A batch could be retried without the peer failing the request (disconnecting/
|
|
// sending an error /timeout) if the peer is removed from the chain for other
|
|
// reasons. Check that this block belongs to the expected peer
|
|
// TODO(das): removed peer_id matching as the node may request a different peer for data
|
|
// columns.
|
|
if !batch.is_expecting_block(&request_id) {
|
|
return Ok(());
|
|
}
|
|
debug!(batch_epoch = %batch_id, error = ?err, "Batch download failed");
|
|
match batch.download_failed(Some(*peer_id)) {
|
|
Err(e) => self.fail_sync(BackFillError::BatchInvalidState(batch_id, e.0)),
|
|
Ok(BatchOperationOutcome::Failed { blacklist: _ }) => {
|
|
self.fail_sync(BackFillError::BatchDownloadFailed(batch_id))
|
|
}
|
|
Ok(BatchOperationOutcome::Continue) => self.send_batch(network, batch_id),
|
|
}
|
|
} else {
|
|
// this could be an error for an old batch, removed when the chain advances
|
|
Ok(())
|
|
}
|
|
}
|
|
|
|
/// A block has been received for a batch relating to this backfilling chain.
|
|
/// If the block correctly completes the batch it will be processed if possible.
|
|
/// If this returns an error, the backfill sync has failed and will be restarted once new peers
|
|
/// join the system.
|
|
/// The sync manager should update the global sync state on failure.
|
|
#[must_use = "A failure here indicates the backfill sync has failed and the global sync state should be updated"]
|
|
pub fn on_block_response(
|
|
&mut self,
|
|
network: &mut SyncNetworkContext<T>,
|
|
batch_id: BatchId,
|
|
peer_id: &PeerId,
|
|
request_id: Id,
|
|
blocks: Vec<RpcBlock<T::EthSpec>>,
|
|
) -> Result<ProcessResult, BackFillError> {
|
|
// check if we have this batch
|
|
let Some(batch) = self.batches.get_mut(&batch_id) else {
|
|
if !matches!(self.state(), BackFillState::Failed) {
|
|
// A batch might get removed when the chain advances, so this is non fatal.
|
|
debug!(epoch = %batch_id, "Received a block for unknown batch");
|
|
}
|
|
return Ok(ProcessResult::Successful);
|
|
};
|
|
|
|
// A batch could be retried without the peer failing the request (disconnecting/
|
|
// sending an error /timeout) if the peer is removed from the chain for other
|
|
// reasons. Check that this block belongs to the expected peer, and that the
|
|
// request_id matches
|
|
if !batch.is_expecting_block(&request_id) {
|
|
return Ok(ProcessResult::Successful);
|
|
}
|
|
|
|
match batch.download_completed(blocks, *peer_id) {
|
|
Ok(received) => {
|
|
let awaiting_batches =
|
|
self.processing_target.saturating_sub(batch_id) / BACKFILL_EPOCHS_PER_BATCH;
|
|
debug!(
|
|
epoch = %batch_id,
|
|
blocks = received,
|
|
%awaiting_batches,
|
|
"Completed batch received"
|
|
);
|
|
|
|
// pre-emptively request more blocks from peers whilst we process current blocks,
|
|
self.request_batches(network)?;
|
|
self.process_completed_batches(network)
|
|
}
|
|
Err(e) => {
|
|
self.fail_sync(BackFillError::BatchInvalidState(batch_id, e.0))?;
|
|
Ok(ProcessResult::Successful)
|
|
}
|
|
}
|
|
}
|
|
|
|
/// The syncing process has failed.
|
|
///
|
|
/// This resets past variables, to allow for a fresh start when resuming.
|
|
fn fail_sync(&mut self, error: BackFillError) -> Result<(), BackFillError> {
|
|
// Some errors shouldn't fail the chain.
|
|
if matches!(error, BackFillError::Paused) {
|
|
return Ok(());
|
|
}
|
|
|
|
// Set the state
|
|
self.set_state(BackFillState::Failed);
|
|
// Remove all batches and active requests and participating peers.
|
|
self.batches.clear();
|
|
self.participating_peers.clear();
|
|
self.restart_failed_sync = false;
|
|
|
|
// Reset all downloading and processing targets
|
|
self.processing_target = self.current_start;
|
|
self.to_be_downloaded = self.current_start;
|
|
self.last_batch_downloaded = false;
|
|
self.current_processing_batch = None;
|
|
|
|
// NOTE: Lets keep validated_batches for posterity
|
|
|
|
// Emit the log here
|
|
error!(?error, "Backfill sync failed");
|
|
|
|
// Return the error, kinda weird pattern, but I want to use
|
|
// `self.fail_chain(_)?` in other parts of the code.
|
|
Err(error)
|
|
}
|
|
|
|
/// Processes the batch with the given id.
|
|
/// The batch must exist and be ready for processing
|
|
fn process_batch(
|
|
&mut self,
|
|
network: &mut SyncNetworkContext<T>,
|
|
batch_id: BatchId,
|
|
) -> Result<ProcessResult, BackFillError> {
|
|
// Only process batches if this chain is Syncing, and only one at a time
|
|
if self.state() != BackFillState::Syncing || self.current_processing_batch.is_some() {
|
|
return Ok(ProcessResult::Successful);
|
|
}
|
|
|
|
let Some(batch) = self.batches.get_mut(&batch_id) else {
|
|
return self
|
|
.fail_sync(BackFillError::InvalidSyncState(format!(
|
|
"Trying to process a batch that does not exist: {}",
|
|
batch_id
|
|
)))
|
|
.map(|_| ProcessResult::Successful);
|
|
};
|
|
|
|
// NOTE: We send empty batches to the processor in order to trigger the block processor
|
|
// result callback. This is done, because an empty batch could end a chain and the logic
|
|
// for removing chains and checking completion is in the callback.
|
|
|
|
let (blocks, _) = match batch.start_processing() {
|
|
Err(e) => {
|
|
return self
|
|
.fail_sync(BackFillError::BatchInvalidState(batch_id, e.0))
|
|
.map(|_| ProcessResult::Successful);
|
|
}
|
|
Ok(v) => v,
|
|
};
|
|
|
|
let process_id = ChainSegmentProcessId::BackSyncBatchId(batch_id);
|
|
self.current_processing_batch = Some(batch_id);
|
|
|
|
if let Err(e) = network
|
|
.beacon_processor()
|
|
.send_chain_segment(process_id, blocks)
|
|
{
|
|
crit!(
|
|
msg = "process_batch",
|
|
error = %e,
|
|
batch = ?self.processing_target,
|
|
"Failed to send backfill segment to processor."
|
|
);
|
|
// This is unlikely to happen but it would stall syncing since the batch now has no
|
|
// blocks to continue, and the chain is expecting a processing result that won't
|
|
// arrive. To mitigate this, (fake) fail this processing so that the batch is
|
|
// re-downloaded.
|
|
self.on_batch_process_result(network, batch_id, &BatchProcessResult::NonFaultyFailure)
|
|
} else {
|
|
Ok(ProcessResult::Successful)
|
|
}
|
|
}
|
|
|
|
/// The block processor has completed processing a batch. This function handles the result
|
|
/// of the batch processor.
|
|
/// If an error is returned the BackFill sync has failed.
|
|
#[must_use = "A failure here indicates the backfill sync has failed and the global sync state should be updated"]
|
|
pub fn on_batch_process_result(
|
|
&mut self,
|
|
network: &mut SyncNetworkContext<T>,
|
|
batch_id: BatchId,
|
|
result: &BatchProcessResult,
|
|
) -> Result<ProcessResult, BackFillError> {
|
|
// The first two cases are possible in regular sync, should not occur in backfill, but we
|
|
// keep this logic for handling potential processing race conditions.
|
|
// result
|
|
let batch = match &self.current_processing_batch {
|
|
Some(processing_id) if *processing_id != batch_id => {
|
|
debug!(
|
|
batch_epoch = %batch_id.as_u64(),
|
|
expected_batch_epoch = processing_id.as_u64(),
|
|
"Unexpected batch result"
|
|
);
|
|
return Ok(ProcessResult::Successful);
|
|
}
|
|
None => {
|
|
debug!(%batch_id, "Chain was not expecting a batch result");
|
|
return Ok(ProcessResult::Successful);
|
|
}
|
|
_ => {
|
|
// batch_id matches, continue
|
|
self.current_processing_batch = None;
|
|
|
|
match self.batches.get_mut(&batch_id) {
|
|
Some(batch) => batch,
|
|
None => {
|
|
// This is an error. Fail the sync algorithm.
|
|
return self
|
|
.fail_sync(BackFillError::InvalidSyncState(format!(
|
|
"Current processing batch not found: {}",
|
|
batch_id
|
|
)))
|
|
.map(|_| ProcessResult::Successful);
|
|
}
|
|
}
|
|
}
|
|
};
|
|
|
|
let Some(peer) = batch.processing_peer() else {
|
|
self.fail_sync(BackFillError::BatchInvalidState(
|
|
batch_id,
|
|
String::from("Peer does not exist"),
|
|
))?;
|
|
return Ok(ProcessResult::Successful);
|
|
};
|
|
|
|
debug!(
|
|
?result,
|
|
%batch,
|
|
batch_epoch = %batch_id,
|
|
%peer,
|
|
client = %network.client_type(peer),
|
|
"Backfill batch processed"
|
|
);
|
|
|
|
match result {
|
|
BatchProcessResult::Success {
|
|
imported_blocks, ..
|
|
} => {
|
|
if let Err(e) = batch.processing_completed(BatchProcessingResult::Success) {
|
|
self.fail_sync(BackFillError::BatchInvalidState(batch_id, e.0))?;
|
|
}
|
|
// If the processed batch was not empty, we can validate previous unvalidated
|
|
// blocks.
|
|
if *imported_blocks > 0 {
|
|
self.advance_chain(network, batch_id);
|
|
}
|
|
|
|
if batch_id == self.processing_target {
|
|
self.processing_target = self
|
|
.processing_target
|
|
.saturating_sub(BACKFILL_EPOCHS_PER_BATCH);
|
|
}
|
|
|
|
// check if the chain has completed syncing
|
|
if self.check_completed() {
|
|
// chain is completed
|
|
info!(
|
|
blocks_processed = self.validated_batches * T::EthSpec::slots_per_epoch(),
|
|
"Backfill sync completed"
|
|
);
|
|
self.set_state(BackFillState::Completed);
|
|
Ok(ProcessResult::SyncCompleted)
|
|
} else {
|
|
// chain is not completed
|
|
// attempt to request more batches
|
|
self.request_batches(network)?;
|
|
// attempt to process more batches
|
|
self.process_completed_batches(network)
|
|
}
|
|
}
|
|
BatchProcessResult::FaultyFailure {
|
|
imported_blocks,
|
|
penalty,
|
|
} => {
|
|
match batch.processing_completed(BatchProcessingResult::FaultyFailure) {
|
|
Err(e) => {
|
|
// Batch was in the wrong state
|
|
self.fail_sync(BackFillError::BatchInvalidState(batch_id, e.0))
|
|
.map(|_| ProcessResult::Successful)
|
|
}
|
|
Ok(BatchOperationOutcome::Failed { blacklist: _ }) => {
|
|
// check that we have not exceeded the re-process retry counter
|
|
// If a batch has exceeded the invalid batch lookup attempts limit, it means
|
|
// that it is likely all peers are sending invalid batches
|
|
// repeatedly and are either malicious or faulty. We stop the backfill sync and
|
|
// report all synced peers that have participated.
|
|
warn!(
|
|
score_adjustment = %penalty,
|
|
batch_epoch = %batch_id,
|
|
"Backfill batch failed to download. Penalizing peers"
|
|
);
|
|
|
|
for peer in self.participating_peers.drain() {
|
|
// TODO(das): `participating_peers` only includes block peers. Should we
|
|
// penalize the custody column peers too?
|
|
network.report_peer(peer, *penalty, "backfill_batch_failed");
|
|
}
|
|
self.fail_sync(BackFillError::BatchProcessingFailed(batch_id))
|
|
.map(|_| ProcessResult::Successful)
|
|
}
|
|
|
|
Ok(BatchOperationOutcome::Continue) => {
|
|
// chain can continue. Check if it can be progressed
|
|
if *imported_blocks > 0 {
|
|
// At least one block was successfully verified and imported, then we can be sure all
|
|
// previous batches are valid and we only need to download the current failed
|
|
// batch.
|
|
self.advance_chain(network, batch_id);
|
|
}
|
|
// Handle this invalid batch, that is within the re-process retries limit.
|
|
self.handle_invalid_batch(network, batch_id)
|
|
.map(|_| ProcessResult::Successful)
|
|
}
|
|
}
|
|
}
|
|
BatchProcessResult::NonFaultyFailure => {
|
|
if let Err(e) = batch.processing_completed(BatchProcessingResult::NonFaultyFailure)
|
|
{
|
|
self.fail_sync(BackFillError::BatchInvalidState(batch_id, e.0))?;
|
|
}
|
|
self.send_batch(network, batch_id)?;
|
|
Ok(ProcessResult::Successful)
|
|
}
|
|
}
|
|
}
|
|
|
|
/// Processes the next ready batch.
|
|
fn process_completed_batches(
|
|
&mut self,
|
|
network: &mut SyncNetworkContext<T>,
|
|
) -> Result<ProcessResult, BackFillError> {
|
|
// Only process batches if backfill is syncing and only process one batch at a time
|
|
if self.state() != BackFillState::Syncing || self.current_processing_batch.is_some() {
|
|
return Ok(ProcessResult::Successful);
|
|
}
|
|
|
|
// Find the id of the batch we are going to process.
|
|
if let Some(batch) = self.batches.get(&self.processing_target) {
|
|
let state = batch.state();
|
|
match state {
|
|
BatchState::AwaitingProcessing(..) => {
|
|
return self.process_batch(network, self.processing_target);
|
|
}
|
|
BatchState::Downloading(..) => {
|
|
// Batch is not ready, nothing to process
|
|
}
|
|
BatchState::Poisoned => unreachable!("Poisoned batch"),
|
|
// Batches can be in `AwaitingDownload` state if there weren't good data column subnet
|
|
// peers to send the request to.
|
|
BatchState::AwaitingDownload => return Ok(ProcessResult::Successful),
|
|
BatchState::Failed | BatchState::Processing(_) => {
|
|
// these are all inconsistent states:
|
|
// - Failed -> non recoverable batch. Chain should have been removed
|
|
// - Processing -> `self.current_processing_batch` is None
|
|
self.fail_sync(BackFillError::InvalidSyncState(String::from(
|
|
"Invalid expected batch state",
|
|
)))?;
|
|
return Ok(ProcessResult::Successful);
|
|
}
|
|
BatchState::AwaitingValidation(_) => {
|
|
// TODO: I don't think this state is possible, log a CRIT just in case.
|
|
// If this is not observed, add it to the failed state branch above.
|
|
crit!(
|
|
batch = ?self.processing_target,
|
|
"Chain encountered a robust batch awaiting validation"
|
|
);
|
|
|
|
self.processing_target -= BACKFILL_EPOCHS_PER_BATCH;
|
|
if self.to_be_downloaded >= self.processing_target {
|
|
self.to_be_downloaded = self.processing_target - BACKFILL_EPOCHS_PER_BATCH;
|
|
}
|
|
self.request_batches(network)?;
|
|
}
|
|
}
|
|
} else {
|
|
self.fail_sync(BackFillError::InvalidSyncState(format!(
|
|
"Batch not found for current processing target {}",
|
|
self.processing_target
|
|
)))?;
|
|
return Ok(ProcessResult::Successful);
|
|
}
|
|
Ok(ProcessResult::Successful)
|
|
}
|
|
|
|
/// Removes any batches previous to the given `validating_epoch` and updates the current
|
|
/// boundaries of the chain.
|
|
///
|
|
/// The `validating_epoch` must align with batch boundaries.
|
|
///
|
|
/// If a previous batch has been validated and it had been re-processed, penalize the original
|
|
/// peer.
|
|
fn advance_chain(&mut self, network: &mut SyncNetworkContext<T>, validating_epoch: Epoch) {
|
|
// make sure this epoch produces an advancement
|
|
if validating_epoch >= self.current_start {
|
|
return;
|
|
}
|
|
|
|
// We can now validate higher batches that the current batch. Here we remove all
|
|
// batches that are higher than the current batch. We add on an extra
|
|
// `BACKFILL_EPOCHS_PER_BATCH` as `split_off` is inclusive.
|
|
let removed_batches = self
|
|
.batches
|
|
.split_off(&(validating_epoch + BACKFILL_EPOCHS_PER_BATCH));
|
|
|
|
for (id, batch) in removed_batches.into_iter() {
|
|
self.validated_batches = self.validated_batches.saturating_add(1);
|
|
// only for batches awaiting validation can we be sure the last attempt is
|
|
// right, and thus, that any different attempt is wrong
|
|
match batch.state() {
|
|
BatchState::AwaitingValidation(processed_attempt) => {
|
|
for attempt in batch.attempts() {
|
|
// The validated batch has been re-processed
|
|
if attempt.hash != processed_attempt.hash {
|
|
// The re-downloaded version was different.
|
|
if processed_attempt.peer_id != attempt.peer_id {
|
|
// A different peer sent the correct batch, the previous peer did not
|
|
// We negatively score the original peer.
|
|
let action = PeerAction::LowToleranceError;
|
|
debug!(
|
|
batch_epoch = ?id,
|
|
score_adjustment = %action,
|
|
original_peer = %attempt.peer_id,
|
|
new_peer = %processed_attempt.peer_id,
|
|
"Re-processed batch validated. Scoring original peer"
|
|
);
|
|
network.report_peer(
|
|
attempt.peer_id,
|
|
action,
|
|
"backfill_reprocessed_original_peer",
|
|
);
|
|
} else {
|
|
// The same peer corrected it's previous mistake. There was an error, so we
|
|
// negative score the original peer.
|
|
let action = PeerAction::MidToleranceError;
|
|
debug!(
|
|
batch_epoch = ?id,
|
|
score_adjustment = %action,
|
|
original_peer = %attempt.peer_id,
|
|
new_peer = %processed_attempt.peer_id,
|
|
"Re-processed batch validated by the same peer"
|
|
);
|
|
network.report_peer(
|
|
attempt.peer_id,
|
|
action,
|
|
"backfill_reprocessed_same_peer",
|
|
);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
BatchState::Downloading(..) => {}
|
|
BatchState::AwaitingDownload => return,
|
|
BatchState::Failed | BatchState::Poisoned => {
|
|
crit!("batch indicates inconsistent chain state while advancing chain")
|
|
}
|
|
BatchState::AwaitingProcessing(..) => {}
|
|
BatchState::Processing(_) => {
|
|
debug!(batch = %id, %batch, "Advancing chain while processing a batch");
|
|
if let Some(processing_id) = self.current_processing_batch
|
|
&& id >= processing_id
|
|
{
|
|
self.current_processing_batch = None;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
self.processing_target = self.processing_target.min(validating_epoch);
|
|
self.current_start = validating_epoch;
|
|
self.to_be_downloaded = self.to_be_downloaded.min(validating_epoch);
|
|
if self.batches.contains_key(&self.to_be_downloaded) {
|
|
// if a chain is advanced by Range beyond the previous `self.to_be_downloaded`, we
|
|
// won't have this batch, so we need to request it.
|
|
self.to_be_downloaded -= BACKFILL_EPOCHS_PER_BATCH;
|
|
}
|
|
debug!(?validating_epoch, processing_target = ?self.processing_target, "Backfill advanced");
|
|
}
|
|
|
|
/// An invalid batch has been received that could not be processed, but that can be retried.
|
|
///
|
|
/// These events occur when a peer has successfully responded with blocks, but the blocks we
|
|
/// have received are incorrect or invalid. This indicates the peer has not performed as
|
|
/// intended and can result in downvoting a peer.
|
|
fn handle_invalid_batch(
|
|
&mut self,
|
|
network: &mut SyncNetworkContext<T>,
|
|
batch_id: BatchId,
|
|
) -> Result<(), BackFillError> {
|
|
// The current batch could not be processed, indicating either the current or previous
|
|
// batches are invalid.
|
|
|
|
// The previous batch could be incomplete due to the block sizes being too large to fit in
|
|
// a single RPC request or there could be consecutive empty batches which are not supposed
|
|
// to be there
|
|
|
|
// The current (sub-optimal) strategy is to simply re-request all batches that could
|
|
// potentially be faulty. If a batch returns a different result than the original and
|
|
// results in successful processing, we downvote the original peer that sent us the batch.
|
|
|
|
// this is our robust `processing_target`. All previous batches must be awaiting
|
|
// validation
|
|
let mut redownload_queue = Vec::new();
|
|
|
|
for (id, batch) in self
|
|
.batches
|
|
.iter_mut()
|
|
.filter(|&(&id, ref _batch)| id > batch_id)
|
|
{
|
|
match batch
|
|
.validation_failed()
|
|
.map_err(|e| BackFillError::BatchInvalidState(batch_id, e.0))?
|
|
{
|
|
BatchOperationOutcome::Failed { blacklist: _ } => {
|
|
// Batch has failed and cannot be redownloaded.
|
|
return self.fail_sync(BackFillError::BatchProcessingFailed(batch_id));
|
|
}
|
|
BatchOperationOutcome::Continue => {
|
|
redownload_queue.push(*id);
|
|
}
|
|
}
|
|
}
|
|
|
|
// no batch maxed out it process attempts, so now the chain's volatile progress must be
|
|
// reset
|
|
self.processing_target = self.current_start;
|
|
|
|
for id in redownload_queue {
|
|
self.send_batch(network, id)?;
|
|
}
|
|
// finally, re-request the failed batch.
|
|
self.send_batch(network, batch_id)
|
|
}
|
|
|
|
/// Requests the batch assigned to the given id from a given peer.
|
|
fn send_batch(
|
|
&mut self,
|
|
network: &mut SyncNetworkContext<T>,
|
|
batch_id: BatchId,
|
|
) -> Result<(), BackFillError> {
|
|
if matches!(self.state(), BackFillState::Paused) {
|
|
return Err(BackFillError::Paused);
|
|
}
|
|
if let Some(batch) = self.batches.get_mut(&batch_id) {
|
|
debug!(?batch_id, "Sending backfill batch");
|
|
let synced_peers = self
|
|
.network_globals
|
|
.peers
|
|
.read()
|
|
.synced_peers_for_epoch(batch_id, None)
|
|
.cloned()
|
|
.collect::<HashSet<_>>();
|
|
|
|
let (request, is_blob_batch) = batch.to_blocks_by_range_request();
|
|
let failed_peers = batch.failed_peers();
|
|
match network.block_components_by_range_request(
|
|
is_blob_batch,
|
|
request,
|
|
RangeRequestId::BackfillSync { batch_id },
|
|
&synced_peers,
|
|
&failed_peers,
|
|
) {
|
|
Ok(request_id) => {
|
|
// inform the batch about the new request
|
|
if let Err(e) = batch.start_downloading(request_id) {
|
|
return self.fail_sync(BackFillError::BatchInvalidState(batch_id, e.0));
|
|
}
|
|
debug!(epoch = %batch_id, %batch, "Requesting batch");
|
|
|
|
return Ok(());
|
|
}
|
|
Err(e) => match e {
|
|
RpcRequestSendError::NoPeer(no_peer) => {
|
|
// If we are here the chain has no more synced peers
|
|
info!(
|
|
"reason" = format!("insufficient_synced_peers({no_peer:?})"),
|
|
"Backfill sync paused"
|
|
);
|
|
self.set_state(BackFillState::Paused);
|
|
return Err(BackFillError::Paused);
|
|
}
|
|
RpcRequestSendError::InternalError(e) => {
|
|
// NOTE: under normal conditions this shouldn't happen but we handle it anyway
|
|
warn!(%batch_id, error = ?e, %batch,"Could not send batch request");
|
|
// register the failed download and check if the batch can be retried
|
|
if let Err(e) = batch.start_downloading(1) {
|
|
return self.fail_sync(BackFillError::BatchInvalidState(batch_id, e.0));
|
|
}
|
|
|
|
match batch.download_failed(None) {
|
|
Err(e) => {
|
|
self.fail_sync(BackFillError::BatchInvalidState(batch_id, e.0))?
|
|
}
|
|
Ok(BatchOperationOutcome::Failed { blacklist: _ }) => {
|
|
self.fail_sync(BackFillError::BatchDownloadFailed(batch_id))?
|
|
}
|
|
Ok(BatchOperationOutcome::Continue) => {
|
|
return self.send_batch(network, batch_id);
|
|
}
|
|
}
|
|
}
|
|
},
|
|
}
|
|
}
|
|
|
|
Ok(())
|
|
}
|
|
|
|
/// Retries partial column requests within the batch by creating new requests for the failed columns.
|
|
pub fn retry_partial_batch(
|
|
&mut self,
|
|
network: &mut SyncNetworkContext<T>,
|
|
batch_id: BatchId,
|
|
id: Id,
|
|
failed_columns: HashSet<ColumnIndex>,
|
|
mut failed_peers: HashSet<PeerId>,
|
|
) -> Result<(), BackFillError> {
|
|
if let Some(batch) = self.batches.get_mut(&batch_id) {
|
|
failed_peers.extend(&batch.failed_peers());
|
|
let req = batch.to_blocks_by_range_request().0;
|
|
|
|
let synced_peers = network
|
|
.network_globals()
|
|
.peers
|
|
.read()
|
|
.synced_peers_for_epoch(batch_id, None)
|
|
.cloned()
|
|
.collect::<HashSet<_>>();
|
|
|
|
match network.retry_columns_by_range(
|
|
id,
|
|
&synced_peers,
|
|
&failed_peers,
|
|
req,
|
|
&failed_columns,
|
|
) {
|
|
Ok(_) => {
|
|
debug!(
|
|
?batch_id,
|
|
id, "Retried column requests from different peers"
|
|
);
|
|
return Ok(());
|
|
}
|
|
Err(e) => {
|
|
debug!(?batch_id, id, e, "Failed to retry partial batch");
|
|
}
|
|
}
|
|
} else {
|
|
return Err(BackFillError::InvalidSyncState(
|
|
"Batch should exist to be retried".to_string(),
|
|
));
|
|
}
|
|
Ok(())
|
|
}
|
|
|
|
/// When resuming a chain, this function searches for batches that need to be re-downloaded and
|
|
/// transitions their state to redownload the batch.
|
|
fn resume_batches(&mut self, network: &mut SyncNetworkContext<T>) -> Result<(), BackFillError> {
|
|
let batch_ids_to_retry = self
|
|
.batches
|
|
.iter()
|
|
.filter_map(|(batch_id, batch)| {
|
|
// In principle there should only ever be on of these, and we could terminate the
|
|
// loop early, however the processing is negligible and we continue the search
|
|
// for robustness to handle potential future modification
|
|
if matches!(batch.state(), BatchState::AwaitingDownload) {
|
|
Some(*batch_id)
|
|
} else {
|
|
None
|
|
}
|
|
})
|
|
.collect::<Vec<_>>();
|
|
|
|
for batch_id in batch_ids_to_retry {
|
|
self.send_batch(network, batch_id)?;
|
|
}
|
|
Ok(())
|
|
}
|
|
|
|
/// Attempts to request the next required batches from the peer pool if the chain is syncing. It will exhaust the peer
|
|
/// pool and left over batches until the batch buffer is reached or all peers are exhausted.
|
|
fn request_batches(
|
|
&mut self,
|
|
network: &mut SyncNetworkContext<T>,
|
|
) -> Result<(), BackFillError> {
|
|
if !matches!(self.state(), BackFillState::Syncing) {
|
|
return Ok(());
|
|
}
|
|
|
|
// find the next pending batch and request it from the peer
|
|
// Note: for this function to not infinite loop we must:
|
|
// - If `include_next_batch` returns Some we MUST increase the count of batches that are
|
|
// accounted in the `BACKFILL_BATCH_BUFFER_SIZE` limit in the `matches!` statement of
|
|
// that function.
|
|
while let Some(batch_id) = self.include_next_batch(network) {
|
|
// send the batch
|
|
self.send_batch(network, batch_id)?;
|
|
}
|
|
|
|
// No more batches, simply stop
|
|
Ok(())
|
|
}
|
|
|
|
/// Creates the next required batch from the chain. If there are no more batches required,
|
|
/// `false` is returned.
|
|
fn include_next_batch(&mut self, network: &mut SyncNetworkContext<T>) -> Option<BatchId> {
|
|
// don't request batches beyond genesis;
|
|
if self.last_batch_downloaded {
|
|
return None;
|
|
}
|
|
|
|
// only request batches up to the buffer size limit
|
|
// NOTE: we don't count batches in the AwaitingValidation state, to prevent stalling sync
|
|
// if the current processing window is contained in a long range of skip slots.
|
|
let in_buffer = |batch: &BatchInfo<T::EthSpec, BackFillBatchConfig>| {
|
|
matches!(
|
|
batch.state(),
|
|
BatchState::Downloading(..) | BatchState::AwaitingProcessing(..)
|
|
)
|
|
};
|
|
if self
|
|
.batches
|
|
.iter()
|
|
.filter(|&(_epoch, batch)| in_buffer(batch))
|
|
.count()
|
|
> BACKFILL_BATCH_BUFFER_SIZE as usize
|
|
{
|
|
return None;
|
|
}
|
|
|
|
if !self.good_peers_on_sampling_subnets(self.to_be_downloaded, network) {
|
|
debug!("Waiting for peers to be available on custody column subnets");
|
|
return None;
|
|
}
|
|
|
|
let batch_id = self.to_be_downloaded;
|
|
// this batch could have been included already being an optimistic batch
|
|
match self.batches.entry(batch_id) {
|
|
Entry::Occupied(_) => {
|
|
// this batch doesn't need downloading, let this same function decide the next batch
|
|
if self.would_complete(batch_id) {
|
|
self.last_batch_downloaded = true;
|
|
}
|
|
|
|
self.to_be_downloaded = self
|
|
.to_be_downloaded
|
|
.saturating_sub(BACKFILL_EPOCHS_PER_BATCH);
|
|
self.include_next_batch(network)
|
|
}
|
|
Entry::Vacant(entry) => {
|
|
let batch_type = network.batch_type(batch_id);
|
|
entry.insert(BatchInfo::new(
|
|
&batch_id,
|
|
BACKFILL_EPOCHS_PER_BATCH,
|
|
batch_type,
|
|
));
|
|
if self.would_complete(batch_id) {
|
|
self.last_batch_downloaded = true;
|
|
}
|
|
self.to_be_downloaded = self
|
|
.to_be_downloaded
|
|
.saturating_sub(BACKFILL_EPOCHS_PER_BATCH);
|
|
Some(batch_id)
|
|
}
|
|
}
|
|
}
|
|
|
|
/// Checks all sampling column subnets for peers. Returns `true` if there is at least one peer in
|
|
/// every sampling column subnet.
|
|
///
|
|
/// Returns `true` if peerdas isn't enabled for the epoch.
|
|
fn good_peers_on_sampling_subnets(
|
|
&self,
|
|
epoch: Epoch,
|
|
network: &SyncNetworkContext<T>,
|
|
) -> bool {
|
|
if network.chain.spec.is_peer_das_enabled_for_epoch(epoch) {
|
|
// Require peers on all sampling column subnets before sending batches
|
|
network
|
|
.network_globals()
|
|
.sampling_subnets()
|
|
.iter()
|
|
.all(|subnet_id| {
|
|
let min_peer_count = 1;
|
|
network
|
|
.network_globals()
|
|
.peers
|
|
.read()
|
|
.has_good_peers_in_custody_subnet(subnet_id, min_peer_count)
|
|
})
|
|
} else {
|
|
true
|
|
}
|
|
}
|
|
|
|
/// Resets the start epoch based on the beacon chain.
|
|
///
|
|
/// This errors if the beacon chain indicates that backfill sync has already completed or is
|
|
/// not required.
|
|
fn reset_start_epoch(&mut self) -> Result<(), ResetEpochError> {
|
|
let anchor_info = self.beacon_chain.store.get_anchor_info();
|
|
if anchor_info.block_backfill_complete(self.beacon_chain.genesis_backfill_slot) {
|
|
Err(ResetEpochError::SyncCompleted)
|
|
} else {
|
|
self.current_start = anchor_info
|
|
.oldest_block_slot
|
|
.epoch(T::EthSpec::slots_per_epoch());
|
|
Ok(())
|
|
}
|
|
}
|
|
|
|
/// Checks with the beacon chain if backfill sync has completed.
|
|
fn check_completed(&mut self) -> bool {
|
|
if self.would_complete(self.current_start) {
|
|
// Check that the beacon chain agrees
|
|
let anchor_info = self.beacon_chain.store.get_anchor_info();
|
|
// Conditions that we have completed a backfill sync
|
|
if anchor_info.block_backfill_complete(self.beacon_chain.genesis_backfill_slot) {
|
|
return true;
|
|
} else {
|
|
error!("Backfill out of sync with beacon chain");
|
|
}
|
|
}
|
|
false
|
|
}
|
|
|
|
/// Checks if backfill would complete by syncing to `start_epoch`.
|
|
fn would_complete(&self, start_epoch: Epoch) -> bool {
|
|
start_epoch
|
|
<= self
|
|
.beacon_chain
|
|
.genesis_backfill_slot
|
|
.epoch(T::EthSpec::slots_per_epoch())
|
|
}
|
|
|
|
/// Updates the global network state indicating the current state of a backfill sync.
|
|
fn set_state(&self, state: BackFillState) {
|
|
*self.network_globals.backfill_state.write() = state;
|
|
}
|
|
|
|
/// Returns a copy of the backfill sync state currently held in the shared network globals.
fn state(&self) -> BackFillState {
    let current = self.network_globals.backfill_state.read();
    current.clone()
}
|
|
}
|
|
|
|
/// Reasons why the backfill start epoch could not be reset from the beacon chain.
enum ResetEpochError {
    /// Backfill has already been completed for this chain.
    SyncCompleted,
}
|
|
|
|
#[cfg(test)]
mod tests {
    use super::*;
    use beacon_chain::test_utils::BeaconChainHarness;
    use bls::Hash256;
    use lighthouse_network::{NetworkConfig, SyncInfo, SyncStatus};
    use rand_08::SeedableRng;
    use rand_08::prelude::StdRng;
    use types::MinimalEthSpec;

    /// Regression test: `request_batches` must terminate when a single synced peer is
    /// connected and backfill is in the `Syncing` state.
    #[test]
    fn request_batches_should_not_loop_infinitely() {
        let harness = BeaconChainHarness::builder(MinimalEthSpec)
            .default_spec()
            .deterministic_keypairs(4)
            .fresh_ephemeral_store()
            .build();
        let chain = harness.chain.clone();
        let slots_per_epoch = MinimalEthSpec::slots_per_epoch();

        let globals = Arc::new(NetworkGlobals::new_test_globals(
            vec![],
            Arc::new(NetworkConfig::default()),
            chain.spec.clone(),
        ));

        // Register one connected peer, using a deterministically seeded key.
        let mut rng = StdRng::seed_from_u64(0xDEADBEEF0BAD5EEDu64);
        let signing_key = k256::ecdsa::SigningKey::random(&mut rng);
        let peer_id = globals.peers.write().__add_connected_peer_testing_only(
            true,
            &chain.spec,
            signing_key.into(),
        );

        // Mark that peer as synced, with its head 2 epochs ahead of its finalized epoch.
        let finalized_epoch = Epoch::new(40);
        let head_epoch = finalized_epoch + 2;
        let head_slot = head_epoch.start_slot(slots_per_epoch) + 1;
        let sync_status = SyncStatus::Synced {
            info: SyncInfo {
                head_slot,
                head_root: Hash256::random(),
                finalized_epoch,
                finalized_root: Hash256::random(),
                earliest_available_slot: None,
            },
        };
        globals
            .peers
            .write()
            .update_sync_status(&peer_id, sync_status);

        let mut network = SyncNetworkContext::new_for_testing(
            chain.clone(),
            globals.clone(),
            harness.runtime.task_executor.clone(),
        );

        let mut backfill = BackFillSync::new(chain, globals);
        backfill.set_state(BackFillState::Syncing);

        // The result is irrelevant here; the test only has to return. Should this ever loop
        // without end, the test would overflow the stack pretty quickly.
        let _ = backfill.request_batches(&mut network);
    }
}
|