Super/Silky smooth syncs (#816)

* Initial block processing thread design

* Correct compilation issues

* Increase logging and request from all given peers

* Patch peer request bug

* Adds fork choice to block processing

* Adds logging for bug isolation

* Patch syncing for chains with skip-slots

* Bump block processing error logs

* Improve logging for attestation processing

* Randomize peer selection during sync

* Resuming chains restarts from local finalized slot

* Downgrades Arc batches to Rc batches

* Add clippy fixes

* Downgrade Rc<Batch> to Option<Batch> to pass processed batches to chains

* Add reviewers suggestions
This commit is contained in:
Age Manning
2020-01-23 17:30:49 +11:00
committed by GitHub
parent f8cff3bd2e
commit fdb6e28f94
10 changed files with 759 additions and 477 deletions

View File

@@ -1,28 +1,35 @@
use crate::message_processor::FUTURE_SLOT_TOLERANCE;
use super::batch::{Batch, BatchId, PendingBatches};
use super::batch_processing::{spawn_batch_processor, BatchProcessResult};
use crate::sync::network_context::SyncNetworkContext;
use crate::sync::range_sync::batch::{Batch, PendingBatches};
use beacon_chain::{BeaconChain, BeaconChainTypes, BlockProcessingOutcome};
use eth2_libp2p::rpc::methods::*;
use crate::sync::SyncMessage;
use beacon_chain::{BeaconChain, BeaconChainTypes};
use eth2_libp2p::rpc::RequestId;
use eth2_libp2p::PeerId;
use slog::{crit, debug, error, trace, warn, Logger};
use rand::prelude::*;
use slog::{crit, debug, warn};
use std::collections::HashSet;
use std::ops::Sub;
use std::sync::Weak;
use types::{BeaconBlock, EthSpec, Hash256, Slot};
use tokio::sync::mpsc;
use types::{BeaconBlock, Hash256, Slot};
/// Blocks are downloaded in batches from peers. This constant specifies how many blocks per batch
/// is requested. There is a timeout for each batch request. If this value is too high, we will
/// downvote peers with poor bandwidth. This can be set arbitrarily high, in which case the
/// responder will fill the response up to the max request size, assuming they have the bandwidth
/// to do so.
//TODO: Make this dynamic based on peer's bandwidth
//TODO: This is lower due to current thread design. Modify once rebuilt.
const BLOCKS_PER_BATCH: u64 = 25;
pub const BLOCKS_PER_BATCH: u64 = 50;
/// The number of times to retry a batch before the chain is considered failed and removed.
const MAX_BATCH_RETRIES: u8 = 5;
/// The maximum number of batches to queue before requesting more.
const BATCH_BUFFER_SIZE: u8 = 5;
/// Invalid batches are attempted to be re-downloaded from other peers. If they cannot be processed
/// after `INVALID_BATCH_LOOKUP_ATTEMPTS` times, the chain is considered faulty and all peers will
/// be downvoted.
const _INVALID_BATCH_LOOKUP_ATTEMPTS: u8 = 3;
/// A return type for functions that act on a `Chain` which informs the caller whether the chain
/// has been completed and should be removed or to be kept if further processing is
/// required.
@@ -31,32 +38,6 @@ pub enum ProcessingResult {
RemoveChain,
}
impl<T: EthSpec> Eq for Batch<T> {}
impl<T: EthSpec> Batch<T> {
fn new(id: u64, start_slot: Slot, end_slot: Slot, head_root: Hash256, peer_id: PeerId) -> Self {
Batch {
id,
start_slot,
end_slot,
head_root,
_original_peer: peer_id.clone(),
current_peer: peer_id,
retries: 0,
downloaded_blocks: Vec::new(),
}
}
fn to_blocks_by_range_request(&self) -> BlocksByRangeRequest {
BlocksByRangeRequest {
head_block_root: self.head_root,
start_slot: self.start_slot.into(),
count: std::cmp::min(BLOCKS_PER_BATCH, self.end_slot.sub(self.start_slot).into()),
step: 1,
}
}
}
/// A chain of blocks that need to be downloaded. Peers who claim to contain the target head
/// root are grouped into the peer pool and queried for batches when downloading the
/// chain.
@@ -77,21 +58,37 @@ pub struct SyncingChain<T: BeaconChainTypes> {
/// The batches that have been downloaded and are awaiting processing and/or validation.
completed_batches: Vec<Batch<T::EthSpec>>,
/// Batches that have been processed and awaiting validation before being removed.
processed_batches: Vec<Batch<T::EthSpec>>,
/// The peers that agree on the `target_head_slot` and `target_head_root` as a canonical chain
/// and thus available to download this chain from.
pub peer_pool: HashSet<PeerId>,
/// The next batch_id that needs to be downloaded.
to_be_downloaded_id: u64,
to_be_downloaded_id: BatchId,
/// The next batch id that needs to be processed.
to_be_processed_id: u64,
to_be_processed_id: BatchId,
/// The last batch id that was processed.
last_processed_id: u64,
last_processed_id: BatchId,
/// The current state of the chain.
pub state: ChainSyncingState,
/// A random id given to a batch process request. This is None if there is no ongoing batch
/// process.
current_processing_id: Option<u64>,
/// A send channel to the sync manager. This is given to the batch processor thread to report
/// back once batch processing has completed.
sync_send: mpsc::UnboundedSender<SyncMessage<T::EthSpec>>,
chain: Weak<BeaconChain<T>>,
/// A reference to the sync logger.
log: slog::Logger,
}
#[derive(PartialEq)]
@@ -100,8 +97,6 @@ pub enum ChainSyncingState {
Stopped,
/// The chain is undergoing syncing.
Syncing,
/// The chain is temporarily paused whilst an error is rectified.
_Paused,
}
impl<T: BeaconChainTypes> SyncingChain<T> {
@@ -110,6 +105,9 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
target_head_slot: Slot,
target_head_root: Hash256,
peer_id: PeerId,
sync_send: mpsc::UnboundedSender<SyncMessage<T::EthSpec>>,
chain: Weak<BeaconChain<T>>,
log: slog::Logger,
) -> Self {
let mut peer_pool = HashSet::new();
peer_pool.insert(peer_id);
@@ -120,11 +118,16 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
target_head_root,
pending_batches: PendingBatches::new(),
completed_batches: Vec::new(),
processed_batches: Vec::new(),
peer_pool,
to_be_downloaded_id: 1,
to_be_processed_id: 1,
last_processed_id: 0,
to_be_downloaded_id: BatchId(1),
to_be_processed_id: BatchId(1),
last_processed_id: BatchId(0),
state: ChainSyncingState::Stopped,
current_processing_id: None,
sync_send,
chain,
log,
}
}
@@ -136,49 +139,45 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
/// batch.
pub fn on_block_response(
&mut self,
chain: &Weak<BeaconChain<T>>,
network: &mut SyncNetworkContext,
request_id: RequestId,
beacon_block: &Option<BeaconBlock<T::EthSpec>>,
log: &slog::Logger,
) -> Option<ProcessingResult> {
) -> Option<()> {
if let Some(block) = beacon_block {
// This is not a stream termination, simply add the block to the request
self.pending_batches.add_block(request_id, block.clone())?;
Some(ProcessingResult::KeepChain)
self.pending_batches.add_block(request_id, block.clone())
} else {
// A stream termination has been sent. This batch has ended. Process a completed batch.
let batch = self.pending_batches.remove(request_id)?;
Some(self.process_completed_batch(chain.clone(), network, batch, log))
self.handle_completed_batch(network, batch);
Some(())
}
}
/// A completed batch has been received, process the batch.
/// This will return `ProcessingResult::KeepChain` if the chain has not completed or
/// failed indicating that further batches are required.
fn process_completed_batch(
fn handle_completed_batch(
&mut self,
chain: Weak<BeaconChain<T>>,
network: &mut SyncNetworkContext,
batch: Batch<T::EthSpec>,
log: &slog::Logger,
) -> ProcessingResult {
) {
// An entire batch of blocks has been received. This functions checks to see if it can be processed,
// remove any batches waiting to be verified and if this chain is syncing, request new
// blocks for the peer.
debug!(log, "Completed batch received"; "id"=>batch.id, "blocks"=>batch.downloaded_blocks.len(), "awaiting_batches" => self.completed_batches.len());
debug!(self.log, "Completed batch received"; "id"=> *batch.id, "blocks"=>batch.downloaded_blocks.len(), "awaiting_batches" => self.completed_batches.len());
// verify the range of received blocks
// Note that the order of blocks is verified in block processing
if let Some(last_slot) = batch.downloaded_blocks.last().map(|b| b.slot) {
// the batch is non-empty
if batch.start_slot > batch.downloaded_blocks[0].slot || batch.end_slot < last_slot {
warn!(log, "BlocksByRange response returned out of range blocks";
warn!(self.log, "BlocksByRange response returned out of range blocks";
"response_initial_slot" => batch.downloaded_blocks[0].slot,
"requested_initial_slot" => batch.start_slot);
network.downvote_peer(batch.current_peer);
self.to_be_processed_id = batch.id; // reset the id back to here, when incrementing, it will check against completed batches
return ProcessingResult::KeepChain;
return;
}
}
@@ -200,138 +199,138 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
// already be processed but not verified and therefore have Id's less than
// `self.to_be_processed_id`.
//TODO: Run the processing of blocks in a separate thread. Build a queue of completed
//blocks here, manage the queue and process them in another thread as they become
//available.
// pre-emptively request more blocks from peers whilst we process current blocks,
self.request_batches(network);
if self.state == ChainSyncingState::Syncing {
// pre-emptively request more blocks from peers whilst we process current blocks,
if !self.send_range_request(network, log) {
debug!(log, "No peer available for next batch.")
}
// Try and process any completed batches. This will spawn a new task to process any blocks
// that are ready to be processed.
self.process_completed_batches();
}
/// Tries to process any batches if there are any available and we are not currently processing
/// other batches.
fn process_completed_batches(&mut self) {
// Only process batches if this chain is Syncing
if self.state != ChainSyncingState::Syncing {
return;
}
// Try and process batches sequentially in the ordered list.
let current_process_id = self.to_be_processed_id;
// keep track of the number of successful batches to decide whether to run fork choice
let mut successful_block_process = false;
// Only process one batch at a time
if self.current_processing_id.is_some() {
return;
}
for batch in self
.completed_batches
.iter()
.filter(|batch| batch.id >= current_process_id)
// Check if there is a batch ready to be processed
while !self.completed_batches.is_empty()
&& self.completed_batches[0].id == self.to_be_processed_id
{
if batch.id != self.to_be_processed_id {
// there are no batches to be processed at the moment
break;
}
let batch = self.completed_batches.remove(0);
if batch.downloaded_blocks.is_empty() {
// the batch was empty, progress to the next block
self.to_be_processed_id += 1;
// The batch was empty, consider this processed and move to the next batch
self.processed_batches.push(batch);
*self.to_be_processed_id += 1;
continue;
}
// process the batch
// Keep track of successful batches. Run fork choice after all waiting batches have
// been processed.
debug!(log, "Processing batch"; "batch_id" => batch.id);
match process_batch(chain.clone(), batch, log) {
Ok(_) => {
// batch was successfully processed
self.last_processed_id = self.to_be_processed_id;
self.to_be_processed_id += 1;
successful_block_process = true;
}
Err(e) => {
warn!(log, "Block processing error"; "error"=> format!("{:?}", e));
if successful_block_process {
if let Some(chain) = chain.upgrade() {
match chain.fork_choice() {
Ok(()) => trace!(
log,
"Fork choice success";
"location" => "batch import error"
),
Err(e) => error!(
log,
"Fork choice failed";
"error" => format!("{:?}", e),
"location" => "batch import error"
),
}
}
}
// batch processing failed
// this could be because this batch is invalid, or a previous invalidated batch
// is invalid. We need to find out which and downvote the peer that has sent us
// an invalid batch.
// firstly remove any validated batches
return self.handle_invalid_batch(chain, network);
}
}
}
// If we have processed batches, run fork choice
if successful_block_process {
if let Some(chain) = chain.upgrade() {
match chain.fork_choice() {
Ok(()) => trace!(
log,
"Fork choice success";
"location" => "batch import success"
),
Err(e) => error!(
log,
"Fork choice failed";
"error" => format!("{:?}", e),
"location" => "batch import success"
),
}
}
}
// remove any validated batches
let last_processed_id = self.last_processed_id;
self.completed_batches
.retain(|batch| batch.id >= last_processed_id);
// check if the chain has completed syncing
if self.start_slot + self.last_processed_id * BLOCKS_PER_BATCH >= self.target_head_slot {
// chain is completed
ProcessingResult::RemoveChain
} else {
// chain is not completed
ProcessingResult::KeepChain
// send the batch to the batch processor thread
return self.process_batch(batch);
}
}
/// An invalid batch has been received that could not be processed.
fn handle_invalid_batch(
/// Sends a batch to the batch processor.
fn process_batch(&mut self, batch: Batch<T::EthSpec>) {
// only spawn one instance at a time
let processing_id: u64 = rand::random();
self.current_processing_id = Some(processing_id);
spawn_batch_processor(
self.chain.clone(),
processing_id,
batch,
self.sync_send.clone(),
self.log.clone(),
);
}
/// The block processor has completed processing a batch. This function handles the result
/// of the batch processor.
pub fn on_batch_process_result(
&mut self,
_chain: Weak<BeaconChain<T>>,
network: &mut SyncNetworkContext,
) -> ProcessingResult {
// The current batch could not be processed, indicating either the current or previous
// batches are invalid
// The previous batch could be
// incomplete due to the block sizes being too large to fit in a single RPC
// request or there could be consecutive empty batches which are not supposed to be there
// Address these two cases individually.
// Firstly, check if the past batch is invalid.
//
//TODO: Implement this logic
// Currently just fail the chain, and drop all associated peers, removing them from the
// peer pool, to prevent re-status
for peer_id in self.peer_pool.drain() {
network.downvote_peer(peer_id);
processing_id: u64,
batch: &mut Option<Batch<T::EthSpec>>,
result: &BatchProcessResult,
) -> Option<ProcessingResult> {
if Some(processing_id) != self.current_processing_id {
// batch process doesn't belong to this chain
return None;
}
ProcessingResult::RemoveChain
// Consume the batch option
let batch = batch.take().or_else(|| {
crit!(self.log, "Processed batch taken by another chain");
None
})?;
// double check batches are processed in order TODO: Remove for prod
if batch.id != self.to_be_processed_id {
crit!(self.log, "Batch processed out of order";
"processed_batch_id" => *batch.id,
"expected_id" => *self.to_be_processed_id);
}
self.current_processing_id = None;
let res = match result {
BatchProcessResult::Success => {
*self.to_be_processed_id += 1;
// This variable accounts for skip slots and batches that were not actually
// processed due to having no blocks.
self.last_processed_id = batch.id;
// Remove any validate batches awaiting validation.
// Only batches that have blocks are processed here, therefore all previous batches
// have been correct.
let last_processed_id = self.last_processed_id;
self.processed_batches
.retain(|batch| batch.id.0 >= last_processed_id.0);
// add the current batch to processed batches to be verified in the future. We are
// only uncertain about this batch, if it has not returned all blocks.
if batch.downloaded_blocks.len() < BLOCKS_PER_BATCH as usize {
self.processed_batches.push(batch);
}
// check if the chain has completed syncing
if self.start_slot + *self.last_processed_id * BLOCKS_PER_BATCH
>= self.target_head_slot
{
// chain is completed
ProcessingResult::RemoveChain
} else {
// chain is not completed
// attempt to request more batches
self.request_batches(network);
// attempt to process more batches
self.process_completed_batches();
// keep the chain
ProcessingResult::KeepChain
}
}
BatchProcessResult::Failed => {
// batch processing failed
// this could be because this batch is invalid, or a previous invalidated batch
// is invalid. We need to find out which and downvote the peer that has sent us
// an invalid batch.
// firstly remove any validated batches
self.handle_invalid_batch(network, batch)
}
};
Some(res)
}
pub fn stop_syncing(&mut self) {
@@ -342,154 +341,66 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
/// This chain has been requested to start syncing.
///
/// This could be new chain, or an old chain that is being resumed.
pub fn start_syncing(
&mut self,
network: &mut SyncNetworkContext,
local_finalized_slot: Slot,
log: &slog::Logger,
) {
pub fn start_syncing(&mut self, network: &mut SyncNetworkContext, local_finalized_slot: Slot) {
// A local finalized slot is provided as other chains may have made
// progress whilst this chain was Stopped or paused. If so, update the `processed_batch_id` to
// accommodate potentially downloaded batches from other chains. Also prune any old batches
// awaiting processing
// Only important if the local head is more than a batch worth of blocks ahead of
// what this chain believes is downloaded
let batches_ahead = local_finalized_slot
.as_u64()
.saturating_sub(self.start_slot.as_u64() + self.last_processed_id * BLOCKS_PER_BATCH)
/ BLOCKS_PER_BATCH;
// If the local finalized epoch is ahead of our current processed chain, update the chain
// to start from this point and re-index all subsequent batches starting from one
// (effectively creating a new chain).
if batches_ahead != 0 {
// there are `batches_ahead` whole batches that have been downloaded by another
// chain. Set the current processed_batch_id to this value.
debug!(log, "Updating chains processed batches"; "old_completed_slot" => self.start_slot + self.last_processed_id*BLOCKS_PER_BATCH, "new_completed_slot" => self.start_slot + (self.last_processed_id + batches_ahead)*BLOCKS_PER_BATCH);
self.last_processed_id += batches_ahead;
if local_finalized_slot.as_u64()
> self
.start_slot
.as_u64()
.saturating_add(*self.last_processed_id * BLOCKS_PER_BATCH)
{
debug!(self.log, "Updating chain's progress";
"prev_completed_slot" => self.start_slot + *self.last_processed_id*BLOCKS_PER_BATCH,
"new_completed_slot" => local_finalized_slot.as_u64());
// Re-index batches
*self.last_processed_id = 0;
*self.to_be_downloaded_id = 1;
*self.to_be_processed_id = 1;
if self.start_slot + self.last_processed_id * BLOCKS_PER_BATCH
> self.target_head_slot.as_u64()
{
crit!(
log,
"Current head slot is above the target head";
"target_head_slot" => self.target_head_slot.as_u64(),
"new_start" => self.start_slot + self.last_processed_id * BLOCKS_PER_BATCH,
);
return;
}
// update the `to_be_downloaded_id`
if self.to_be_downloaded_id < self.last_processed_id {
self.to_be_downloaded_id = self.last_processed_id;
}
let last_processed_id = self.last_processed_id;
self.completed_batches
.retain(|batch| batch.id >= last_processed_id.saturating_sub(1));
// remove any completed or processed batches
self.completed_batches.clear();
self.processed_batches.clear();
}
// Now begin requesting blocks from the peer pool, until all peers are exhausted.
while self.send_range_request(network, log) {}
self.state = ChainSyncingState::Syncing;
// start processing batches if needed
self.process_completed_batches();
// begin requesting blocks from the peer pool, until all peers are exhausted.
self.request_batches(network);
}
/// Add a peer to the chain.
///
/// If the chain is active, this starts requesting batches from this peer.
pub fn add_peer(
&mut self,
network: &mut SyncNetworkContext,
peer_id: PeerId,
log: &slog::Logger,
) {
pub fn add_peer(&mut self, network: &mut SyncNetworkContext, peer_id: PeerId) {
self.peer_pool.insert(peer_id.clone());
// do not request blocks if the chain is not syncing
if let ChainSyncingState::Stopped = self.state {
debug!(log, "Peer added to a non-syncing chain"; "peer_id" => format!("{:?}", peer_id));
debug!(self.log, "Peer added to a non-syncing chain"; "peer_id" => format!("{:?}", peer_id));
return;
}
// find the next batch and request it from the peer
self.send_range_request(network, log);
// find the next batch and request it from any peers if we need to
self.request_batches(network);
}
/// Sends a STATUS message to all peers in the peer pool.
pub fn status_peers(&self, chain: Weak<BeaconChain<T>>, network: &mut SyncNetworkContext) {
pub fn status_peers(&self, network: &mut SyncNetworkContext) {
for peer_id in self.peer_pool.iter() {
network.status_peer(chain.clone(), peer_id.clone());
network.status_peer(self.chain.clone(), peer_id.clone());
}
}
/// Requests the next required batch from a peer. Returns true, if there was a peer available
/// to send a request and there are batches to request, false otherwise.
fn send_range_request(&mut self, network: &mut SyncNetworkContext, log: &slog::Logger) -> bool {
// find the next pending batch and request it from the peer
if let Some(peer_id) = self.get_next_peer() {
if let Some(batch) = self.get_next_batch(peer_id) {
debug!(log, "Requesting batch"; "start_slot" => batch.start_slot, "end_slot" => batch.end_slot, "id" => batch.id, "peer" => format!("{:?}", batch.current_peer), "head_root"=> format!("{}", batch.head_root));
// send the batch
self.send_batch(network, batch);
return true;
}
}
false
}
/// Returns a peer if there exists a peer which does not currently have a pending request.
///
/// This is used to create the next request.
fn get_next_peer(&self) -> Option<PeerId> {
for peer in self.peer_pool.iter() {
if self.pending_batches.peer_is_idle(peer) {
return Some(peer.clone());
}
}
None
}
/// Requests the provided batch from the provided peer.
fn send_batch(&mut self, network: &mut SyncNetworkContext, batch: Batch<T::EthSpec>) {
let request = batch.to_blocks_by_range_request();
if let Ok(request_id) = network.blocks_by_range_request(batch.current_peer.clone(), request)
{
// add the batch to pending list
self.pending_batches.insert(request_id, batch);
}
}
/// Returns the next required batch from the chain if it exists. If there are no more batches
/// required, `None` is returned.
fn get_next_batch(&mut self, peer_id: PeerId) -> Option<Batch<T::EthSpec>> {
let batch_start_slot =
self.start_slot + self.to_be_downloaded_id.saturating_sub(1) * BLOCKS_PER_BATCH;
if batch_start_slot > self.target_head_slot {
return None;
}
let batch_end_slot = std::cmp::min(
batch_start_slot + BLOCKS_PER_BATCH,
self.target_head_slot.saturating_add(1u64),
);
let batch_id = self.to_be_downloaded_id;
// find the next batch id. The largest of the next sequential idea, of the next uncompleted
// id
let max_completed_id =
self.completed_batches
.iter()
.fold(0, |max, batch| if batch.id > max { batch.id } else { max });
self.to_be_downloaded_id =
std::cmp::max(self.to_be_downloaded_id + 1, max_completed_id + 1);
Some(Batch::new(
batch_id,
batch_start_slot,
batch_end_slot,
self.target_head_root,
peer_id,
))
}
/// An RPC error has occurred.
///
/// Checks if the request_id is associated with this chain. If so, attempts to re-request the
@@ -501,18 +412,21 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
network: &mut SyncNetworkContext,
peer_id: &PeerId,
request_id: RequestId,
log: &slog::Logger,
) -> Option<ProcessingResult> {
if let Some(batch) = self.pending_batches.remove(request_id) {
warn!(log, "Batch failed. RPC Error"; "id" => batch.id, "retries" => batch.retries, "peer" => format!("{:?}", peer_id));
warn!(self.log, "Batch failed. RPC Error";
"id" => *batch.id,
"retries" => batch.retries,
"peer" => format!("{:?}", peer_id));
Some(self.failed_batch(network, batch, log))
Some(self.failed_batch(network, batch))
} else {
None
}
}
/// A batch has failed.
/// A batch has failed. This occurs when a network timeout happens or the peer didn't respond.
/// These events do not indicate a malicious peer, more likely simple networking issues.
///
/// Attempts to re-request from another peer in the peer pool (if possible) and returns
/// `ProcessingResult::RemoveChain` if the number of retries on the batch exceeds
@@ -521,7 +435,6 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
&mut self,
network: &mut SyncNetworkContext,
mut batch: Batch<T::EthSpec>,
log: &Logger,
) -> ProcessingResult {
batch.retries += 1;
@@ -541,116 +454,152 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
.unwrap_or_else(|| current_peer);
batch.current_peer = new_peer.clone();
debug!(log, "Re-Requesting batch"; "start_slot" => batch.start_slot, "end_slot" => batch.end_slot, "id" => batch.id, "peer" => format!("{:?}", batch.current_peer), "head_root"=> format!("{}", batch.head_root));
debug!(self.log, "Re-Requesting batch";
"start_slot" => batch.start_slot,
"end_slot" => batch.end_slot,
"id" => *batch.id,
"peer" => format!("{:?}", batch.current_peer),
"head_root"=> format!("{}", batch.head_root));
self.send_batch(network, batch);
ProcessingResult::KeepChain
}
}
}
// Helper function to process block batches which only consumes the chain and blocks to process
fn process_batch<T: BeaconChainTypes>(
chain: Weak<BeaconChain<T>>,
batch: &Batch<T::EthSpec>,
log: &Logger,
) -> Result<(), String> {
for block in &batch.downloaded_blocks {
if let Some(chain) = chain.upgrade() {
let processing_result = chain.process_block(block.clone());
/// An invalid batch has been received that could not be processed.
///
/// These events occur when a peer as successfully responded with blocks, but the blocks we
/// have received are incorrect or invalid. This indicates the peer has not performed as
/// intended and can result in downvoting a peer.
fn handle_invalid_batch(
&mut self,
network: &mut SyncNetworkContext,
_batch: Batch<T::EthSpec>,
) -> ProcessingResult {
// The current batch could not be processed, indicating either the current or previous
// batches are invalid
if let Ok(outcome) = processing_result {
match outcome {
BlockProcessingOutcome::Processed { block_root } => {
// The block was valid and we processed it successfully.
trace!(
log, "Imported block from network";
"slot" => block.slot,
"block_root" => format!("{}", block_root),
);
}
BlockProcessingOutcome::ParentUnknown { parent } => {
// blocks should be sequential and all parents should exist
trace!(
log, "Parent block is unknown";
"parent_root" => format!("{}", parent),
"baby_block_slot" => block.slot,
);
return Err(format!(
"Block at slot {} has an unknown parent.",
block.slot
));
}
BlockProcessingOutcome::BlockIsAlreadyKnown => {
// this block is already known to us, move to the next
debug!(
log, "Imported a block that is already known";
"block_slot" => block.slot,
);
}
BlockProcessingOutcome::FutureSlot {
present_slot,
block_slot,
} => {
if present_slot + FUTURE_SLOT_TOLERANCE >= block_slot {
// The block is too far in the future, drop it.
trace!(
log, "Block is ahead of our slot clock";
"msg" => "block for future slot rejected, check your time",
"present_slot" => present_slot,
"block_slot" => block_slot,
"FUTURE_SLOT_TOLERANCE" => FUTURE_SLOT_TOLERANCE,
);
return Err(format!(
"Block at slot {} is too far in the future",
block.slot
));
} else {
// The block is in the future, but not too far.
trace!(
log, "Block is slightly ahead of our slot clock, ignoring.";
"present_slot" => present_slot,
"block_slot" => block_slot,
"FUTURE_SLOT_TOLERANCE" => FUTURE_SLOT_TOLERANCE,
);
}
}
BlockProcessingOutcome::WouldRevertFinalizedSlot { .. } => {
trace!(
log, "Finalized or earlier block processed";
"outcome" => format!("{:?}", outcome),
);
// block reached our finalized slot or was earlier, move to the next block
}
BlockProcessingOutcome::GenesisBlock => {
trace!(
log, "Genesis block was processed";
"outcome" => format!("{:?}", outcome),
);
}
_ => {
warn!(
log, "Invalid block received";
"msg" => "peer sent invalid block",
"outcome" => format!("{:?}", outcome),
);
return Err(format!("Invalid block at slot {}", block.slot));
}
}
} else {
warn!(
log, "BlockProcessingFailure";
"msg" => "unexpected condition in processing block.",
"outcome" => format!("{:?}", processing_result)
);
return Err(format!(
"Unexpected block processing error: {:?}",
processing_result
));
}
} else {
return Ok(()); // terminate early due to dropped beacon chain
// The previous batch could be incomplete due to the block sizes being too large to fit in
// a single RPC request or there could be consecutive empty batches which are not supposed
// to be there
// The current (sub-optimal) strategy is to simply re-request all batches that could
// potentially be faulty. If a batch returns a different result than the original and
// results in successful processing, we downvote the original peer that sent us the batch.
// If all batches return the same result, we try this process INVALID_BATCH_LOOKUP_ATTEMPTS
// times before considering the entire chain invalid and downvoting all peers.
// Firstly, check if there are any past batches that could be invalid.
if !self.processed_batches.is_empty() {
// try and re-download this batch from other peers
}
//TODO: Implement this logic
// Currently just fail the chain, and drop all associated peers, removing them from the
// peer pool, to prevent re-status
for peer_id in self.peer_pool.drain() {
network.downvote_peer(peer_id);
}
ProcessingResult::RemoveChain
}
/// Attempts to request the next required batches from the peer pool if the chain is syncing.
/// It will exhaust the peer pool and left over batches until the batch buffer is reached or
/// all peers are exhausted.
fn request_batches(&mut self, network: &mut SyncNetworkContext) {
if let ChainSyncingState::Syncing = self.state {
while self.send_range_request(network) {}
}
}
Ok(())
/// Requests the next required batch from a peer. Returns true, if there was a peer available
/// to send a request and there are batches to request, false otherwise.
fn send_range_request(&mut self, network: &mut SyncNetworkContext) -> bool {
// find the next pending batch and request it from the peer
if let Some(peer_id) = self.get_next_peer() {
if let Some(batch) = self.get_next_batch(peer_id) {
debug!(self.log, "Requesting batch"; "start_slot" => batch.start_slot, "end_slot" => batch.end_slot, "id" => *batch.id, "peer" => format!("{:?}", batch.current_peer), "head_root"=> format!("{}", batch.head_root));
// send the batch
self.send_batch(network, batch);
return true;
}
}
false
}
/// Returns a peer if there exists a peer which does not currently have a pending request.
///
/// This is used to create the next request.
fn get_next_peer(&self) -> Option<PeerId> {
// randomize the peers for load balancing
let mut rng = rand::thread_rng();
let mut peers = self.peer_pool.iter().collect::<Vec<_>>();
peers.shuffle(&mut rng);
for peer in peers {
if self.pending_batches.peer_is_idle(peer) {
return Some(peer.clone());
}
}
None
}
/// Returns the next required batch from the chain if it exists. If there are no more batches
/// required, `None` is returned.
fn get_next_batch(&mut self, peer_id: PeerId) -> Option<Batch<T::EthSpec>> {
// only request batches up to the buffer size limit
if self
.completed_batches
.len()
.saturating_add(self.pending_batches.len())
> BATCH_BUFFER_SIZE as usize
{
return None;
}
// don't request batches beyond the target head slot
let batch_start_slot =
self.start_slot + self.to_be_downloaded_id.saturating_sub(1) * BLOCKS_PER_BATCH;
if batch_start_slot > self.target_head_slot {
return None;
}
// truncate the batch to the target head of the chain
let batch_end_slot = std::cmp::min(
batch_start_slot + BLOCKS_PER_BATCH,
self.target_head_slot.saturating_add(1u64),
);
let batch_id = self.to_be_downloaded_id;
// Find the next batch id. The largest of the next sequential id, or the next uncompleted
// id
let max_completed_id = self
.completed_batches
.iter()
.last()
.map(|x| x.id.0)
.unwrap_or_else(|| 0);
// TODO: Check if this is necessary
self.to_be_downloaded_id = BatchId(std::cmp::max(
self.to_be_downloaded_id.0 + 1,
max_completed_id + 1,
));
Some(Batch::new(
batch_id,
batch_start_slot,
batch_end_slot,
self.target_head_root,
peer_id,
))
}
/// Requests the provided batch from the provided peer.
fn send_batch(&mut self, network: &mut SyncNetworkContext, batch: Batch<T::EthSpec>) {
let request = batch.to_blocks_by_range_request();
if let Ok(request_id) = network.blocks_by_range_request(batch.current_peer.clone(), request)
{
// add the batch to pending list
self.pending_batches.insert(request_id, batch);
}
}
}