Files
lighthouse/beacon_node/network/src/sync/range_sync/chain.rs
Michael Sproul affea585f4 Remove CountUnrealized (#4357)
## Issue Addressed

Closes #4332

## Proposed Changes

Remove the `CountUnrealized` type, defaulting unrealized justification to _on_. This fixes the #4332 issue by ensuring that importing the same block to fork choice always results in the same outcome.

Finalized sync speed may be slightly impacted by this change, but that is deemed an acceptable trade-off until the optimisation from #4118 is implemented.

TODO:

- [x] Also check that the block isn't a duplicate before importing
2023-06-16 06:44:31 +00:00

1119 lines
47 KiB
Rust

use super::batch::{BatchInfo, BatchProcessingResult, BatchState};
use crate::beacon_processor::{ChainSegmentProcessId, WorkEvent as BeaconWorkEvent};
use crate::sync::{
manager::Id, network_context::SyncNetworkContext, BatchOperationOutcome, BatchProcessResult,
};
use beacon_chain::BeaconChainTypes;
use fnv::FnvHashMap;
use lighthouse_network::{PeerAction, PeerId};
use rand::seq::SliceRandom;
use slog::{crit, debug, o, warn};
use std::collections::{btree_map::Entry, BTreeMap, HashSet};
use std::hash::{Hash, Hasher};
use std::sync::Arc;
use types::{Epoch, EthSpec, Hash256, SignedBeaconBlock, Slot};
/// Maximum number of epochs' worth of blocks requested in a single batch.
///
/// Blocks are downloaded from peers in batches; a batch may ask for fewer blocks than this to
/// account for slots that have already been requested. Each batch request has a timeout, so a
/// value that is too high causes peers with poor bandwidth to be penalised. The value could be
/// set arbitrarily high, in which case the responder fills the response up to the maximum
/// request size, provided it has the bandwidth to do so.
pub const EPOCHS_PER_BATCH: u64 = 2;

/// Limit on the number of queued (in-flight) batches used to decide whether more batches may be
/// requested.
const BATCH_BUFFER_SIZE: u8 = 5;
/// Outcome of an operation on a `SyncingChain`.
///
/// `Ok(KeepChain)` means the chain should be kept because more work remains. `Err(RemoveChain)`
/// means the chain has completed or failed and must be removed.
///
/// Callers must check this value: a chain that asked to be removed but kept running is left in
/// an inconsistent state.
pub type ProcessingResult = Result<KeepChain, RemoveChain>;
/// The reasons a chain can be removed.
#[derive(Debug)]
pub enum RemoveChain {
    /// The chain has no peers left to download batches from.
    EmptyPeerPool,
    /// The chain has been synced up to its target.
    ChainCompleted,
    /// The chain has failed.
    ChainFailed {
        /// Whether the chain should be blacklisted.
        blacklist: bool,
        /// The batch whose failure caused the chain to fail.
        failing_batch: BatchId,
    },
    /// A batch was found in an unexpected state; the string describes the problem.
    WrongBatchState(String),
    /// The chain was found in an unexpected state; the string describes the problem.
    WrongChainState(String),
}
/// Marker returned when a chain should be kept because further processing is required.
#[derive(Debug)]
pub struct KeepChain;
/// Identifier of a syncing chain.
pub type ChainId = u64;
/// Identifier of a batch: the starting epoch of the batch.
pub type BatchId = Epoch;
/// A chain of blocks to be downloaded.
///
/// Peers that claim to have the target head root are grouped into this chain's peer pool and
/// are queried for batches while the chain is being downloaded.
pub struct SyncingChain<T: BeaconChainTypes> {
    /// Identifier of this chain, derived by hashing the target root and slot
    /// (see `SyncingChain::id`).
    id: ChainId,

    /// First epoch of the chain segment; every earlier epoch has already been validated.
    pub start_epoch: Epoch,

    /// Slot of the head this chain is syncing towards.
    pub target_head_slot: Slot,

    /// Root of the head this chain is syncing towards.
    pub target_head_root: Hash256,

    /// Batches currently in some stage of download or processing, ordered by their id.
    batches: BTreeMap<BatchId, BatchInfo<T::EthSpec>>,

    /// Peers that agree on `target_head_slot` and `target_head_root` as canonical, and can
    /// therefore serve this chain, mapped to the batches currently requested from each of them.
    peers: FnvHashMap<PeerId, HashSet<BatchId>>,

    /// Starting epoch of the next batch that must be downloaded.
    to_be_downloaded: BatchId,

    /// Starting epoch of the next batch to process; moves forward as the chain advances.
    processing_target: BatchId,

    /// Optional optimistic head to sync. If a block is imported for this batch, the chain
    /// advances to this point.
    optimistic_start: Option<BatchId>,

    /// Optimistic starts that have already been tried (successfully or not), so they are not
    /// tried again when the chain is stopped and restarted during chain switching.
    attempted_optimistic_starts: HashSet<BatchId>,

    /// Whether the chain is currently syncing.
    pub state: ChainSyncingState,

    /// The batch currently sent for processing, if any.
    current_processing_batch: Option<BatchId>,

    /// Number of batches this chain has validated.
    validated_batches: u64,

    /// Logger tagged with this chain's id.
    log: slog::Logger,
}
/// Whether a chain is actively syncing.
#[derive(PartialEq, Debug)]
pub enum ChainSyncingState {
    /// The chain is idle and is not being synced.
    Stopped,
    /// The chain is actively being synced.
    Syncing,
}
impl<T: BeaconChainTypes> SyncingChain<T> {
    /// Computes the identifier of a chain from its target head root and slot.
    ///
    /// Chains with the same target always get the same id. The id comes from the standard
    /// library's `DefaultHasher`, whose algorithm is not guaranteed to be stable across Rust
    /// releases, so it should not be persisted.
    pub fn id(target_root: &Hash256, target_slot: &Slot) -> u64 {
        let mut hasher = std::collections::hash_map::DefaultHasher::new();
        (target_root, target_slot).hash(&mut hasher);
        hasher.finish()
    }

    /// Creates a new chain in the `Stopped` state.
    ///
    /// Downloading and processing both start at `start_epoch`. The peer pool initially contains
    /// only `peer_id`, with no active requests. The chain's logger is tagged with the chain id.
    pub fn new(
        start_epoch: Epoch,
        target_head_slot: Slot,
        target_head_root: Hash256,
        peer_id: PeerId,
        log: &slog::Logger,
    ) -> Self {
        let mut peers = FnvHashMap::default();
        peers.insert(peer_id, Default::default());

        let id = SyncingChain::<T>::id(&target_head_root, &target_head_slot);

        SyncingChain {
            id,
            start_epoch,
            target_head_slot,
            target_head_root,
            batches: BTreeMap::new(),
            peers,
            to_be_downloaded: start_epoch,
            processing_target: start_epoch,
            optimistic_start: None,
            attempted_optimistic_starts: HashSet::default(),
            state: ChainSyncingState::Stopped,
            current_processing_batch: None,
            validated_batches: 0,
            log: log.new(o!("chain" => id)),
        }
    }

    /// Returns the number of peers this chain can request batches from.
    pub fn available_peers(&self) -> usize {
        self.peers.len()
    }

    /// Returns the chain's id.
    pub fn get_id(&self) -> ChainId {
        self.id
    }

    /// Returns an iterator over the peers currently syncing this chain.
    pub fn peers(&self) -> impl Iterator<Item = PeerId> + '_ {
        self.peers.keys().cloned()
    }

    /// Returns the progress of the chain in epochs (validated batches times batch size).
    pub fn validated_epochs(&self) -> u64 {
        // Saturating, consistent with how `validated_batches` itself is incremented.
        self.validated_batches.saturating_mul(EPOCHS_PER_BATCH)
    }

    /// Removes a peer from the chain.
    ///
    /// Every batch the peer was downloading is marked as a failed download and re-requested from
    /// another peer.
    ///
    /// Returns `Err(RemoveChain::ChainFailed { .. })` if marking a batch as failed reports
    /// `BatchOperationOutcome::Failed`, and `Err(RemoveChain::EmptyPeerPool)` if no peers remain.
    /// Errors from re-requesting batches are propagated.
    pub fn remove_peer(
        &mut self,
        peer_id: &PeerId,
        network: &mut SyncNetworkContext<T>,
    ) -> ProcessingResult {
        if let Some(batch_ids) = self.peers.remove(peer_id) {
            // fail the batches
            for id in batch_ids {
                if let Some(batch) = self.batches.get_mut(&id) {
                    if let BatchOperationOutcome::Failed { blacklist } =
                        batch.download_failed(true)?
                    {
                        return Err(RemoveChain::ChainFailed {
                            blacklist,
                            failing_batch: id,
                        });
                    }
                    self.retry_batch_download(network, id)?;
                } else {
                    debug!(self.log, "Batch not found while removing peer";
                        "peer" => %peer_id, "batch" => id);
                }
            }
        }

        if self.peers.is_empty() {
            Err(RemoveChain::EmptyPeerPool)
        } else {
            Ok(KeepChain)
        }
    }

    /// Returns the latest slot number that has been processed.
    fn current_processed_slot(&self) -> Slot {
        // the last slot we processed was included in the previous batch, and corresponds to the
        // first slot of the current target epoch
        self.processing_target
            .start_slot(T::EthSpec::slots_per_epoch())
    }

    /// Handles a block (or stream termination) received for a batch of this chain.
    ///
    /// `Some(block)` is added to the batch. `None` signals the end of the response stream, which
    /// completes the batch download. A completed batch triggers more requests and processing of
    /// ready batches. A batch with out-of-range blocks is re-requested, or fails the chain if
    /// the batch reports `BatchOperationOutcome::Failed`.
    ///
    /// Responses for unknown batches, or from a peer/request id the batch is not expecting, are
    /// ignored.
    pub fn on_block_response(
        &mut self,
        network: &mut SyncNetworkContext<T>,
        batch_id: BatchId,
        peer_id: &PeerId,
        request_id: Id,
        beacon_block: Option<Arc<SignedBeaconBlock<T::EthSpec>>>,
    ) -> ProcessingResult {
        // check if we have this batch
        let batch = match self.batches.get_mut(&batch_id) {
            None => {
                debug!(self.log, "Received a block for unknown batch"; "epoch" => batch_id);
                // A batch might get removed when the chain advances, so this is non fatal.
                return Ok(KeepChain);
            }
            Some(batch) => {
                // A batch could be retried without the peer failing the request (disconnecting/
                // sending an error /timeout) if the peer is removed from the chain for other
                // reasons. Check that this block belongs to the expected peer, and that the
                // request_id matches
                if !batch.is_expecting_block(peer_id, &request_id) {
                    return Ok(KeepChain);
                }
                batch
            }
        };

        if let Some(block) = beacon_block {
            // This is not a stream termination, simply add the block to the request
            batch.add_block(block)?;
            Ok(KeepChain)
        } else {
            // A stream termination has been sent. This batch has ended. Process a completed batch.
            // Remove the request from the peer's active batches
            if let Some(active_requests) = self.peers.get_mut(peer_id) {
                active_requests.remove(&batch_id);
            }

            match batch.download_completed() {
                Ok(received) => {
                    let awaiting_batches = batch_id
                        .saturating_sub(self.optimistic_start.unwrap_or(self.processing_target))
                        / EPOCHS_PER_BATCH;
                    debug!(self.log, "Completed batch received"; "epoch" => batch_id, "blocks" => received, "awaiting_batches" => awaiting_batches);

                    // pre-emptively request more blocks from peers whilst we process current blocks,
                    self.request_batches(network)?;
                    self.process_completed_batches(network)
                }
                Err(result) => {
                    let (expected_boundary, received_boundary, outcome) = result?;
                    warn!(self.log, "Batch received out of range blocks"; "expected_boundary" => expected_boundary, "received_boundary" => received_boundary,
                        "peer_id" => %peer_id, batch);

                    if let BatchOperationOutcome::Failed { blacklist } = outcome {
                        return Err(RemoveChain::ChainFailed {
                            blacklist,
                            failing_batch: batch_id,
                        });
                    }
                    // this batch can't be used, so we need to request it again.
                    self.retry_batch_download(network, batch_id)
                }
            }
        }
    }

    /// Sends the batch with the given id to the beacon processor.
    ///
    /// Does nothing unless the chain is syncing and no other batch is being processed. Also
    /// does nothing if `processor_channel_if_enabled` returns no channel. The batch must exist
    /// and be ready for processing. If the processor channel rejects the work, the batch is
    /// failed as a non-faulty processing failure so it gets re-downloaded.
    fn process_batch(
        &mut self,
        network: &mut SyncNetworkContext<T>,
        batch_id: BatchId,
    ) -> ProcessingResult {
        // Only process batches if this chain is Syncing, and only one at a time
        if self.state != ChainSyncingState::Syncing || self.current_processing_batch.is_some() {
            return Ok(KeepChain);
        }

        let beacon_processor_send = match network.processor_channel_if_enabled() {
            Some(channel) => channel,
            None => return Ok(KeepChain),
        };

        let batch = match self.batches.get_mut(&batch_id) {
            Some(batch) => batch,
            None => {
                return Err(RemoveChain::WrongChainState(format!(
                    "Trying to process a batch that does not exist: {}",
                    batch_id
                )));
            }
        };

        // NOTE: We send empty batches to the processor in order to trigger the block processor
        // result callback. This is done, because an empty batch could end a chain and the logic
        // for removing chains and checking completion is in the callback.

        let blocks = batch.start_processing()?;
        let process_id = ChainSegmentProcessId::RangeBatchId(self.id, batch_id);
        self.current_processing_batch = Some(batch_id);

        if let Err(e) =
            beacon_processor_send.try_send(BeaconWorkEvent::chain_segment(process_id, blocks))
        {
            // Log the batch that failed to be sent. This may be the optimistic start rather
            // than `processing_target`.
            crit!(self.log, "Failed to send chain segment to processor."; "msg" => "process_batch",
                "error" => %e, "batch" => batch_id);
            // This is unlikely to happen but it would stall syncing since the batch now has no
            // blocks to continue, and the chain is expecting a processing result that won't
            // arrive. To mitigate this, (fake) fail this processing so that the batch is
            // re-downloaded.
            self.on_batch_process_result(network, batch_id, &BatchProcessResult::NonFaultyFailure)
        } else {
            Ok(KeepChain)
        }
    }

    /// Processes the next ready batch, prioritizing the optimistic start over the processing
    /// target.
    ///
    /// Does nothing unless the chain is syncing and no batch is currently being processed. If
    /// the processing target is already awaiting validation, the target is moved forward and
    /// more batches are requested.
    fn process_completed_batches(
        &mut self,
        network: &mut SyncNetworkContext<T>,
    ) -> ProcessingResult {
        // Only process batches if this chain is Syncing and only process one batch at a time
        if self.state != ChainSyncingState::Syncing || self.current_processing_batch.is_some() {
            return Ok(KeepChain);
        }

        // Find the id of the batch we are going to process.
        //
        // First try our optimistic start, if any. If this batch is ready, we process it. If the
        // batch has not already been completed, check the current chain target.
        if let Some(epoch) = self.optimistic_start {
            if let Some(batch) = self.batches.get(&epoch) {
                let state = batch.state();
                match state {
                    BatchState::AwaitingProcessing(..) => {
                        // this batch is ready
                        debug!(self.log, "Processing optimistic start"; "epoch" => epoch);
                        return self.process_batch(network, epoch);
                    }
                    BatchState::Downloading(..) => {
                        // The optimistic batch is being downloaded. We wait for this before
                        // attempting to process other batches.
                        return Ok(KeepChain);
                    }
                    BatchState::Poisoned => unreachable!("Poisoned batch"),
                    BatchState::Processing(_)
                    | BatchState::AwaitingDownload
                    | BatchState::Failed => {
                        // these are all inconsistent states:
                        // - Processing -> `self.current_processing_batch` is None
                        // - Failed -> non recoverable batch. For an optimistic batch, it should
                        //   have been removed
                        // - AwaitingDownload -> A recoverable failed batch should have been
                        //   re-requested.
                        return Err(RemoveChain::WrongChainState(format!(
                            "Optimistic batch indicates inconsistent chain state: {:?}",
                            state
                        )));
                    }
                    BatchState::AwaitingValidation(_) => {
                        // If an optimistic start is given to the chain after the corresponding
                        // batch has been requested and processed we can land here. We drop the
                        // optimistic candidate since we can't conclude whether the batch included
                        // blocks or not at this point
                        debug!(self.log, "Dropping optimistic candidate"; "batch" => epoch);
                        self.optimistic_start = None;
                    }
                }
            }
        }

        // if the optimistic target can't be processed, check the processing target
        if let Some(batch) = self.batches.get(&self.processing_target) {
            let state = batch.state();
            match state {
                BatchState::AwaitingProcessing(..) => {
                    return self.process_batch(network, self.processing_target);
                }
                BatchState::Downloading(..) => {
                    // Batch is not ready, nothing to process
                }
                BatchState::Poisoned => unreachable!("Poisoned batch"),
                BatchState::Failed | BatchState::AwaitingDownload | BatchState::Processing(_) => {
                    // these are all inconsistent states:
                    // - Failed -> non recoverable batch. Chain should have been removed
                    // - AwaitingDownload -> A recoverable failed batch should have been
                    //   re-requested.
                    // - Processing -> `self.current_processing_batch` is None
                    return Err(RemoveChain::WrongChainState(format!(
                        "Robust target batch indicates inconsistent chain state: {:?}",
                        state
                    )));
                }
                BatchState::AwaitingValidation(_) => {
                    // we can land here if an empty optimistic batch succeeds processing and is
                    // inside the download buffer (between `self.processing_target` and
                    // `self.to_be_downloaded`). In this case, eventually the chain advances to the
                    // batch (`self.processing_target` reaches this point).
                    debug!(self.log, "Chain encountered a robust batch awaiting validation"; "batch" => self.processing_target);

                    self.processing_target += EPOCHS_PER_BATCH;
                    if self.to_be_downloaded <= self.processing_target {
                        self.to_be_downloaded = self.processing_target + EPOCHS_PER_BATCH;
                    }
                    self.request_batches(network)?;
                }
            }
        } else {
            return Err(RemoveChain::WrongChainState(format!(
                "Batch not found for current processing target {}",
                self.processing_target
            )));
        }
        Ok(KeepChain)
    }

    /// Handles the result of the beacon processor for a batch of this chain.
    ///
    /// Results for a batch other than `current_processing_batch` (or when no batch is being
    /// processed) are ignored. On success the chain may advance and, if the target head slot
    /// was reached, it returns `Err(RemoveChain::ChainCompleted)`. A faulty failure penalizes
    /// the responsible peer and re-requests potentially invalid batches, or drops the chain if
    /// the batch reports `BatchOperationOutcome::Failed`. A non-faulty failure simply
    /// re-downloads the batch.
    pub fn on_batch_process_result(
        &mut self,
        network: &mut SyncNetworkContext<T>,
        batch_id: BatchId,
        result: &BatchProcessResult,
    ) -> ProcessingResult {
        // the first two cases are possible if the chain advances while waiting for a processing
        // result
        let batch = match &self.current_processing_batch {
            Some(processing_id) if *processing_id != batch_id => {
                debug!(self.log, "Unexpected batch result";
                    "batch_epoch" => batch_id, "expected_batch_epoch" => processing_id);
                return Ok(KeepChain);
            }
            None => {
                debug!(self.log, "Chain was not expecting a batch result";
                    "batch_epoch" => batch_id);
                return Ok(KeepChain);
            }
            _ => {
                // batch_id matches, continue
                self.current_processing_batch = None;
                self.batches.get_mut(&batch_id).ok_or_else(|| {
                    RemoveChain::WrongChainState(format!(
                        "Current processing batch not found: {}",
                        batch_id
                    ))
                })?
            }
        };

        let peer = batch.current_peer().cloned().ok_or_else(|| {
            RemoveChain::WrongBatchState(format!(
                "Processing target is in wrong state: {:?}",
                batch.state(),
            ))
        })?;

        // Log the process result and the batch for debugging purposes.
        debug!(self.log, "Batch processing result"; "result" => ?result, &batch,
            "batch_epoch" => batch_id, "client" => %network.client_type(&peer));

        // We consider three cases. Batch was successfully processed, Batch failed processing due
        // to a faulty peer, or batch failed processing but the peer can't be deemed faulty.
        match result {
            BatchProcessResult::Success { was_non_empty } => {
                batch.processing_completed(BatchProcessingResult::Success)?;

                if *was_non_empty {
                    // If the processed batch was not empty, we can validate previous unvalidated
                    // blocks.
                    self.advance_chain(network, batch_id);
                    // we register so that on chain switching we don't try it again
                    self.attempted_optimistic_starts.insert(batch_id);
                } else if self.optimistic_start == Some(batch_id) {
                    // check if this batch corresponds to an optimistic batch. In this case, we
                    // reject it as an optimistic candidate since the batch was empty
                    self.reject_optimistic_batch(
                        network,
                        false, /* do not re-request */
                        "batch was empty",
                    )?;
                }

                if batch_id == self.processing_target {
                    self.processing_target += EPOCHS_PER_BATCH;
                }

                // check if the chain has completed syncing
                if self.current_processed_slot() >= self.target_head_slot {
                    // chain is completed
                    Err(RemoveChain::ChainCompleted)
                } else {
                    // chain is not completed
                    // attempt to request more batches
                    self.request_batches(network)?;
                    // attempt to process more batches
                    self.process_completed_batches(network)
                }
            }
            BatchProcessResult::FaultyFailure {
                imported_blocks,
                penalty,
            } => {
                // Penalize the peer appropriately.
                network.report_peer(peer, *penalty, "faulty_batch");

                // Check if this batch is allowed to continue
                match batch.processing_completed(BatchProcessingResult::FaultyFailure)? {
                    BatchOperationOutcome::Continue => {
                        // Chain can continue. Check if it can be moved forward.
                        if *imported_blocks {
                            // At least one block was successfully verified and imported, so we can be sure all
                            // previous batches are valid and we only need to download the current failed
                            // batch.
                            self.advance_chain(network, batch_id);
                        }
                        // Handle this invalid batch, that is within the re-process retries limit.
                        self.handle_invalid_batch(network, batch_id)
                    }
                    BatchOperationOutcome::Failed { blacklist } => {
                        // Check that we have not exceeded the re-process retry counter,
                        // If a batch has exceeded the invalid batch lookup attempts limit, it means
                        // that it is likely all peers in this chain are sending invalid batches
                        // repeatedly and are either malicious or faulty. We drop the chain and
                        // report all peers.
                        // There are some edge cases with forks that could land us in this situation.
                        // This should be unlikely, so we tolerate these errors, but not often.
                        warn!(
                            self.log,
                            "Batch failed to download. Dropping chain scoring peers";
                            "score_adjustment" => %penalty,
                            "batch_epoch"=> batch_id,
                        );

                        for (peer, _) in self.peers.drain() {
                            network.report_peer(peer, *penalty, "faulty_chain");
                        }
                        Err(RemoveChain::ChainFailed {
                            blacklist,
                            failing_batch: batch_id,
                        })
                    }
                }
            }
            BatchProcessResult::NonFaultyFailure => {
                batch.processing_completed(BatchProcessingResult::NonFaultyFailure)?;

                // Simply redownload the batch.
                self.retry_batch_download(network, batch_id)
            }
        }
    }

    /// Drops the current optimistic start (if any) as a candidate and records it as attempted.
    ///
    /// A rejected batch inside the current download range (`< to_be_downloaded`) is kept as a
    /// regular batch and re-requested when `redownload` is true. A batch outside that range is
    /// removed. `reason` is only used for logging.
    fn reject_optimistic_batch(
        &mut self,
        network: &mut SyncNetworkContext<T>,
        redownload: bool,
        reason: &str,
    ) -> ProcessingResult {
        if let Some(epoch) = self.optimistic_start.take() {
            self.attempted_optimistic_starts.insert(epoch);
            // if this batch is inside the current processing range, keep it, otherwise drop
            // it. NOTE: this is done to prevent non-sequential batches coming from optimistic
            // starts from filling up the buffer size
            if epoch < self.to_be_downloaded {
                debug!(self.log, "Rejected optimistic batch left for future use"; "epoch" => %epoch, "reason" => reason);
                // this batch is now treated as any other batch, and re-requested for future use
                if redownload {
                    return self.retry_batch_download(network, epoch);
                }
            } else {
                debug!(self.log, "Rejected optimistic batch"; "epoch" => %epoch, "reason" => reason);
                self.batches.remove(&epoch);
            }
        }

        Ok(KeepChain)
    }

    /// Removes any batches previous to the given `validating_epoch` and updates the current
    /// boundaries of the chain.
    ///
    /// The `validating_epoch` must align with batch boundaries.
    ///
    /// If a previous batch has been validated and it had been re-processed, penalize the original
    /// peer.
    fn advance_chain(&mut self, network: &mut SyncNetworkContext<T>, validating_epoch: Epoch) {
        // make sure this epoch produces an advancement
        if validating_epoch <= self.start_epoch {
            return;
        }

        // safety check for batch boundaries
        if validating_epoch % EPOCHS_PER_BATCH != self.start_epoch % EPOCHS_PER_BATCH {
            crit!(self.log, "Validating Epoch is not aligned");
            return;
        }

        // batches in the range [BatchId, ..) (not yet validated)
        let remaining_batches = self.batches.split_off(&validating_epoch);
        // batches less than `validating_epoch`
        let removed_batches = std::mem::replace(&mut self.batches, remaining_batches);

        for (id, batch) in removed_batches {
            self.validated_batches = self.validated_batches.saturating_add(1);
            // only for batches awaiting validation can we be sure the last attempt is
            // right, and thus, that any different attempt is wrong
            match batch.state() {
                BatchState::AwaitingValidation(ref processed_attempt) => {
                    for attempt in batch.attempts() {
                        // The validated batch has been re-processed
                        if attempt.hash != processed_attempt.hash {
                            // The re-downloaded version was different
                            if processed_attempt.peer_id != attempt.peer_id {
                                // A different peer sent the correct batch, the previous peer did not
                                // We negatively score the original peer.
                                let action = PeerAction::LowToleranceError;
                                debug!(self.log, "Re-processed batch validated. Scoring original peer";
                                    "batch_epoch" => id, "score_adjustment" => %action,
                                    "original_peer" => %attempt.peer_id, "new_peer" => %processed_attempt.peer_id
                                );
                                network.report_peer(
                                    attempt.peer_id,
                                    action,
                                    "batch_reprocessed_original_peer",
                                );
                            } else {
                                // The same peer corrected its previous mistake. There was an error, so we
                                // negative score the original peer.
                                let action = PeerAction::MidToleranceError;
                                debug!(self.log, "Re-processed batch validated by the same peer";
                                    "batch_epoch" => id, "score_adjustment" => %action,
                                    "original_peer" => %attempt.peer_id, "new_peer" => %processed_attempt.peer_id
                                );
                                network.report_peer(
                                    attempt.peer_id,
                                    action,
                                    "batch_reprocessed_same_peer",
                                );
                            }
                        }
                    }
                }
                BatchState::Downloading(peer, ..) => {
                    // remove this batch from the peer's active requests
                    if let Some(active_batches) = self.peers.get_mut(peer) {
                        active_batches.remove(&id);
                    }
                }
                BatchState::Failed | BatchState::Poisoned | BatchState::AwaitingDownload => crit!(
                    self.log,
                    "batch indicates inconsistent chain state while advancing chain"
                ),
                BatchState::AwaitingProcessing(..) => {}
                BatchState::Processing(_) => {
                    debug!(self.log, "Advancing chain while processing a batch"; "batch" => id, batch);
                    if let Some(processing_id) = self.current_processing_batch {
                        if id <= processing_id {
                            self.current_processing_batch = None;
                        }
                    }
                }
            }
        }

        self.processing_target = self.processing_target.max(validating_epoch);
        let old_start = self.start_epoch;
        self.start_epoch = validating_epoch;
        self.to_be_downloaded = self.to_be_downloaded.max(validating_epoch);
        if self.batches.contains_key(&self.to_be_downloaded) {
            // if a chain is advanced by Range beyond the previous `self.to_be_downloaded`, we
            // won't have this batch, so we need to request it.
            self.to_be_downloaded += EPOCHS_PER_BATCH;
        }
        if let Some(epoch) = self.optimistic_start {
            if epoch <= validating_epoch {
                self.optimistic_start = None;
            }
        }
        debug!(self.log, "Chain advanced"; "previous_start" => old_start,
            "new_start" => self.start_epoch, "processing_target" => self.processing_target);
    }

    /// An invalid batch has been received that could not be processed, but that can be retried.
    ///
    /// These events occur when a peer has successfully responded with blocks, but the blocks we
    /// have received are incorrect or invalid. This indicates the peer has not performed as
    /// intended and can result in downvoting a peer.
    fn handle_invalid_batch(
        &mut self,
        network: &mut SyncNetworkContext<T>,
        batch_id: BatchId,
    ) -> ProcessingResult {
        // The current batch could not be processed, indicating either the current or previous
        // batches are invalid.

        // The previous batch could be incomplete due to the block sizes being too large to fit in
        // a single RPC request or there could be consecutive empty batches which are not supposed
        // to be there

        // The current (sub-optimal) strategy is to simply re-request all batches that could
        // potentially be faulty. If a batch returns a different result than the original and
        // results in successful processing, we downvote the original peer that sent us the batch.

        if self.optimistic_start == Some(batch_id) {
            // This is the optimistic batch: reject this epoch as an optimistic candidate and try
            // to re-download it. Since it is the optimistic batch, we can't consider previous
            // batches as invalid.
            return self.reject_optimistic_batch(network, true, "batch was invalid");
        }

        // this is our robust `processing_target`. All previous batches must be awaiting
        // validation
        let mut redownload_queue = Vec::new();

        for (id, batch) in self.batches.range_mut(..batch_id) {
            if let BatchOperationOutcome::Failed { blacklist } = batch.validation_failed()? {
                // remove the chain early
                return Err(RemoveChain::ChainFailed {
                    blacklist,
                    failing_batch: *id,
                });
            }
            redownload_queue.push(*id);
        }

        // no batch maxed out its process attempts, so now the chain's volatile progress must be
        // reset
        self.processing_target = self.start_epoch;

        for id in redownload_queue {
            self.retry_batch_download(network, id)?;
        }
        // finally, re-request the failed batch.
        self.retry_batch_download(network, batch_id)
    }

    /// Stops the chain: no new batches are requested or sent for processing until syncing is
    /// started again.
    pub fn stop_syncing(&mut self) {
        self.state = ChainSyncingState::Stopped;
    }

    /// Starts (or resumes) syncing this chain.
    ///
    /// This could be a new chain, or an old chain that is being resumed. The chain is first
    /// advanced to the batch containing `local_finalized_epoch`. Then `optimistic_start_epoch`,
    /// aligned to a batch boundary, is set as the optimistic start if none is set, it lies
    /// beyond the processing target, and it has not been attempted before.
    pub fn start_syncing(
        &mut self,
        network: &mut SyncNetworkContext<T>,
        local_finalized_epoch: Epoch,
        optimistic_start_epoch: Epoch,
    ) -> ProcessingResult {
        // To avoid dropping local progress, the chain is advanced with respect to its batch
        // boundaries rather than to the exact epochs given.
        let align = |epoch| {
            // start_epoch + (number of batches in between)*length_of_batch
            self.start_epoch + ((epoch - self.start_epoch) / EPOCHS_PER_BATCH) * EPOCHS_PER_BATCH
        };
        // get the *aligned* epoch that produces a batch containing the `local_finalized_epoch`
        let validating_epoch = align(local_finalized_epoch);
        // align the optimistic_start too.
        let optimistic_epoch = align(optimistic_start_epoch);

        // advance the chain to the new validating epoch
        self.advance_chain(network, validating_epoch);
        if self.optimistic_start.is_none()
            && optimistic_epoch > self.processing_target
            && !self.attempted_optimistic_starts.contains(&optimistic_epoch)
        {
            self.optimistic_start = Some(optimistic_epoch);
        }

        // update the state
        self.state = ChainSyncingState::Syncing;

        // begin requesting blocks from the peer pool, until all peers are exhausted.
        self.request_batches(network)?;

        // start processing batches if needed
        self.process_completed_batches(network)
    }

    /// Add a peer to the chain.
    ///
    /// If the peer is (or becomes) idle, this tries to request batches. Requests only happen
    /// while the chain is syncing.
    pub fn add_peer(
        &mut self,
        network: &mut SyncNetworkContext<T>,
        peer_id: PeerId,
    ) -> ProcessingResult {
        // add the peer without overwriting its active requests
        if self.peers.entry(peer_id).or_default().is_empty() {
            // Either new or not, this peer is idle, try to request more batches
            self.request_batches(network)
        } else {
            Ok(KeepChain)
        }
    }

    /// An RPC error has occurred.
    ///
    /// If the batch exists and was expecting a response from this peer/request, it is marked as
    /// a failed download and re-requested. If the batch then reports
    /// `BatchOperationOutcome::Failed`, the chain fails.
    pub fn inject_error(
        &mut self,
        network: &mut SyncNetworkContext<T>,
        batch_id: BatchId,
        peer_id: &PeerId,
        request_id: Id,
    ) -> ProcessingResult {
        if let Some(batch) = self.batches.get_mut(&batch_id) {
            // A batch could be retried without the peer failing the request (disconnecting/
            // sending an error /timeout) if the peer is removed from the chain for other
            // reasons. Check that this block belongs to the expected peer
            if !batch.is_expecting_block(peer_id, &request_id) {
                return Ok(KeepChain);
            }
            debug!(self.log, "Batch failed. RPC Error"; "batch_epoch" => batch_id);
            if let Some(active_requests) = self.peers.get_mut(peer_id) {
                active_requests.remove(&batch_id);
            }
            if let BatchOperationOutcome::Failed { blacklist } = batch.download_failed(true)? {
                return Err(RemoveChain::ChainFailed {
                    blacklist,
                    failing_batch: batch_id,
                });
            }
            self.retry_batch_download(network, batch_id)
        } else {
            // this could be an error for an old batch, removed when the chain advances
            Ok(KeepChain)
        }
    }

    /// Sends and registers the request of a batch awaiting download.
    ///
    /// The peer is chosen by preferring peers that have not previously failed this batch, then
    /// peers with fewer active requests. Returns `Err(RemoveChain::EmptyPeerPool)` if the chain
    /// has no peers. Unknown batches are ignored.
    pub fn retry_batch_download(
        &mut self,
        network: &mut SyncNetworkContext<T>,
        batch_id: BatchId,
    ) -> ProcessingResult {
        let batch = match self.batches.get_mut(&batch_id) {
            Some(batch) => batch,
            None => return Ok(KeepChain),
        };

        // Find a peer to request the batch
        let failed_peers = batch.failed_peers();

        // Smallest (previously_failed, active_requests, peer_id) tuple wins. The peer id is the
        // final tie-breaker, so this picks exactly the first element a full sort would.
        let new_peer = self
            .peers
            .iter()
            .map(|(peer, requests)| (failed_peers.contains(peer), requests.len(), *peer))
            .min()
            .map(|(_, _, peer)| peer);

        if let Some(peer) = new_peer {
            self.send_batch(network, batch_id, peer)
        } else {
            // If we are here the chain has no more peers
            Err(RemoveChain::EmptyPeerPool)
        }
    }

    /// Requests the batch assigned to the given id from a given peer.
    ///
    /// On success the request is registered with both the batch and the peer. If the request
    /// cannot be sent, the attempt is recorded as a failed download and the batch is retried
    /// with another peer, or the chain fails if the batch reports
    /// `BatchOperationOutcome::Failed`. Unknown batches are ignored.
    pub fn send_batch(
        &mut self,
        network: &mut SyncNetworkContext<T>,
        batch_id: BatchId,
        peer: PeerId,
    ) -> ProcessingResult {
        if let Some(batch) = self.batches.get_mut(&batch_id) {
            let request = batch.to_blocks_by_range_request();
            match network.blocks_by_range_request(peer, request, self.id, batch_id) {
                Ok(request_id) => {
                    // inform the batch about the new request
                    batch.start_downloading_from_peer(peer, request_id)?;
                    if self.optimistic_start == Some(batch_id) {
                        debug!(self.log, "Requesting optimistic batch"; "epoch" => batch_id, &batch);
                    } else {
                        debug!(self.log, "Requesting batch"; "epoch" => batch_id, &batch);
                    }
                    // register the batch for this peer
                    return match self.peers.get_mut(&peer) {
                        Some(requests) => {
                            requests.insert(batch_id);
                            Ok(KeepChain)
                        }
                        None => Err(RemoveChain::WrongChainState(format!(
                            "Sending batch to a peer that is not in the chain: {}",
                            peer
                        ))),
                    };
                }
                Err(e) => {
                    // NOTE: under normal conditions this shouldn't happen but we handle it anyway
                    warn!(self.log, "Could not send batch request";
                        "batch_id" => batch_id, "error" => e, &batch);
                    // register the failed download and check if the batch can be retried
                    batch.start_downloading_from_peer(peer, 1)?; // fake request_id is not relevant
                    if let Some(requests) = self.peers.get_mut(&peer) {
                        requests.remove(&batch_id);
                    }
                    match batch.download_failed(true)? {
                        BatchOperationOutcome::Failed { blacklist } => {
                            return Err(RemoveChain::ChainFailed {
                                blacklist,
                                failing_batch: batch_id,
                            })
                        }
                        BatchOperationOutcome::Continue => {
                            return self.retry_batch_download(network, batch_id)
                        }
                    }
                }
            }
        }

        Ok(KeepChain)
    }

    /// Returns true if this chain is currently syncing.
    pub fn is_syncing(&self) -> bool {
        match self.state {
            ChainSyncingState::Syncing => true,
            ChainSyncingState::Stopped => false,
        }
    }

    /// Kickstarts the chain by sending for processing batches that are ready and requesting more
    /// batches if needed.
    pub fn resume(
        &mut self,
        network: &mut SyncNetworkContext<T>,
    ) -> Result<KeepChain, RemoveChain> {
        // Request more batches if needed.
        self.request_batches(network)?;
        // If there is any batch ready for processing, send it.
        self.process_completed_batches(network)
    }

    /// Requests the next required batches from idle peers, if the chain is syncing.
    ///
    /// Idle peers (in random order) are assigned new batches until no further batch can be
    /// included or every idle peer has a request. While an optimistic start is set, only its
    /// batch is requested (if it does not exist yet) and no other batches are requested.
    fn request_batches(&mut self, network: &mut SyncNetworkContext<T>) -> ProcessingResult {
        if !matches!(self.state, ChainSyncingState::Syncing) {
            return Ok(KeepChain);
        }

        // find the next pending batch and request it from the peer

        // randomize the peers for load balancing
        let mut rng = rand::thread_rng();
        let mut idle_peers = self
            .peers
            .iter()
            .filter_map(|(peer, requests)| {
                if requests.is_empty() {
                    Some(*peer)
                } else {
                    None
                }
            })
            .collect::<Vec<_>>();
        idle_peers.shuffle(&mut rng);

        // check if we have the batch for our optimistic start. If not, request it first.
        // We wait for this batch before requesting any other batches.
        if let Some(epoch) = self.optimistic_start {
            if let Entry::Vacant(entry) = self.batches.entry(epoch) {
                if let Some(peer) = idle_peers.pop() {
                    let optimistic_batch = BatchInfo::new(&epoch, EPOCHS_PER_BATCH);
                    entry.insert(optimistic_batch);
                    self.send_batch(network, epoch, peer)?;
                }
            }
            return Ok(KeepChain);
        }

        while let Some(peer) = idle_peers.pop() {
            if let Some(batch_id) = self.include_next_batch() {
                // send the batch
                self.send_batch(network, batch_id, peer)?;
            } else {
                // No more batches, simply stop
                return Ok(KeepChain);
            }
        }

        Ok(KeepChain)
    }

    /// Creates the next required batch of the chain and returns its id.
    ///
    /// Returns `None` if no more batches are required (the next batch would start at or beyond
    /// the target head slot) or if the download buffer is full.
    fn include_next_batch(&mut self) -> Option<BatchId> {
        // don't request batches beyond the target head slot
        if self
            .to_be_downloaded
            .start_slot(T::EthSpec::slots_per_epoch())
            >= self.target_head_slot
        {
            return None;
        }
        // only request batches up to the buffer size limit
        // NOTE: we don't count batches in the AwaitingValidation state, to prevent stalling sync
        // if the current processing window is contained in a long range of skip slots.
        let in_buffer = |batch: &BatchInfo<T::EthSpec>| {
            matches!(
                batch.state(),
                BatchState::Downloading(..) | BatchState::AwaitingProcessing(..)
            )
        };
        if self
            .batches
            .iter()
            .filter(|&(_epoch, batch)| in_buffer(batch))
            .count()
            > BATCH_BUFFER_SIZE as usize
        {
            return None;
        }

        let batch_id = self.to_be_downloaded;
        // this batch could have been included already being an optimistic batch
        match self.batches.entry(batch_id) {
            Entry::Occupied(_) => {
                // this batch doesn't need downloading, let this same function decide the next batch
                self.to_be_downloaded += EPOCHS_PER_BATCH;
                self.include_next_batch()
            }
            Entry::Vacant(entry) => {
                entry.insert(BatchInfo::new(&batch_id, EPOCHS_PER_BATCH));
                self.to_be_downloaded += EPOCHS_PER_BATCH;
                Some(batch_id)
            }
        }
    }
}
impl<T: BeaconChainTypes> slog::KV for &mut SyncingChain<T> {
    /// Delegates to the `slog::KV` implementation of `SyncingChain`, so that a mutable
    /// reference to a chain can be handed directly to the logging macros.
    fn serialize(
        &self,
        record: &slog::Record,
        serializer: &mut dyn slog::Serializer,
    ) -> slog::Result {
        let chain: &SyncingChain<T> = &**self;
        <SyncingChain<T> as slog::KV>::serialize(chain, record, serializer)
    }
}
impl<T: BeaconChainTypes> slog::KV for SyncingChain<T> {
    /// Emits a structured summary of the chain, in this order: its id, the epoch range it
    /// covers, the target root, the current processing target, the batch and peer counts, the
    /// validated batch count and the syncing state.
    fn serialize(
        &self,
        record: &slog::Record,
        serializer: &mut dyn slog::Serializer,
    ) -> slog::Result {
        use slog::Value;

        // Epoch containing the target head slot; computing it has no side effects.
        let target_epoch = self
            .target_head_slot
            .epoch(T::EthSpec::slots_per_epoch());

        serializer.emit_u64("id", self.id)?;
        Value::serialize(&self.start_epoch, record, "from", serializer)?;
        Value::serialize(&target_epoch, record, "to", serializer)?;
        serializer.emit_arguments("end_root", &format_args!("{}", self.target_head_root))?;
        Value::serialize(&self.processing_target, record, "current_target", serializer)?;
        serializer.emit_usize("batches", self.batches.len())?;
        serializer.emit_usize("peers", self.peers.len())?;
        serializer.emit_u64("validated_batches", self.validated_batches)?;
        serializer.emit_arguments("state", &format_args!("{:?}", self.state))?;

        Ok(())
    }
}
use super::batch::WrongState as WrongBatchState;
impl From<WrongBatchState> for RemoveChain {
fn from(err: WrongBatchState) -> Self {
RemoveChain::WrongBatchState(err.0)
}
}
impl RemoveChain {
    /// Returns true when the chain was removed because of an inconsistent internal state
    /// (a batch or the chain itself in an unexpected state), rather than for an expected reason.
    pub fn is_critical(&self) -> bool {
        match self {
            RemoveChain::WrongBatchState(_) | RemoveChain::WrongChainState(_) => true,
            RemoveChain::EmptyPeerPool
            | RemoveChain::ChainCompleted
            | RemoveChain::ChainFailed { .. } => false,
        }
    }
}