Ensure batches align to epoch boundaries (#1021)

* Ensure batches align to epoch boundaries

* Clean up range_sync logs
This commit is contained in:
divma
2020-04-26 23:18:09 -05:00
committed by GitHub
parent a50ade3ffc
commit fa8154e3da
4 changed files with 87 additions and 63 deletions

View File

@@ -10,14 +10,15 @@ use slog::{crit, debug, warn};
use std::collections::HashSet;
use std::sync::Arc;
use tokio::sync::mpsc;
use types::{Hash256, SignedBeaconBlock, Slot};
use types::{Epoch, EthSpec, Hash256, SignedBeaconBlock, Slot};
/// Blocks are downloaded in batches from peers. This constant specifies how many blocks per batch
/// is requested. There is a timeout for each batch request. If this value is too high, we will
/// downvote peers with poor bandwidth. This can be set arbitrarily high, in which case the
/// Blocks are downloaded in batches from peers. This constant specifies how many epochs worth of
/// blocks per batch are requested _at most_. A batch may request less blocks to account for
/// already requested slots. There is a timeout for each batch request. If this value is too high,
/// we will downvote peers with poor bandwidth. This can be set arbitrarily high, in which case the
/// responder will fill the response up to the max request size, assuming they have the bandwidth
/// to do so.
pub const BLOCKS_PER_BATCH: u64 = 64;
pub const EPOCHS_PER_BATCH: u64 = 2;
/// The number of times to retry a batch before the chain is considered failed and removed.
const MAX_BATCH_RETRIES: u8 = 5;
@@ -49,7 +50,7 @@ pub struct SyncingChain<T: BeaconChainTypes> {
id: ChainId,
/// The original start slot when this chain was initialised.
pub start_slot: Slot,
pub start_epoch: Epoch,
/// The target head slot.
pub target_head_slot: Slot,
@@ -80,8 +81,7 @@ pub struct SyncingChain<T: BeaconChainTypes> {
/// The current state of the chain.
pub state: ChainSyncingState,
/// A random id given to a batch process request. This is None if there is no ongoing batch
/// process.
/// The current processing batch, if any.
current_processing_batch: Option<Batch<T::EthSpec>>,
/// A send channel to the sync manager. This is given to the batch processor thread to report
@@ -106,7 +106,7 @@ pub enum ChainSyncingState {
impl<T: BeaconChainTypes> SyncingChain<T> {
pub fn new(
id: u64,
start_slot: Slot,
start_epoch: Epoch,
target_head_slot: Slot,
target_head_root: Hash256,
peer_id: PeerId,
@@ -119,7 +119,7 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
SyncingChain {
id,
start_slot,
start_epoch,
target_head_slot,
target_head_root,
pending_batches: PendingBatches::new(),
@@ -138,8 +138,13 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
/// Returns the latest slot number that has been processed.
fn current_processed_slot(&self) -> Slot {
self.start_slot
.saturating_add(self.to_be_processed_id.saturating_sub(1u64) * BLOCKS_PER_BATCH)
self.start_epoch
.start_slot(T::EthSpec::slots_per_epoch())
.saturating_add(
self.to_be_processed_id.saturating_sub(1u64)
* T::EthSpec::slots_per_epoch()
* EPOCHS_PER_BATCH,
)
}
/// A batch of blocks has been received. This function gets run on all chains and should
@@ -539,7 +544,7 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
pub fn start_syncing(
&mut self,
network: &mut SyncNetworkContext<T::EthSpec>,
local_finalized_slot: Slot,
local_finalized_epoch: Epoch,
) {
// A local finalized slot is provided as other chains may have made
// progress whilst this chain was Stopped or paused. If so, update the `processed_batch_id` to
@@ -550,11 +555,17 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
// to start from this point and re-index all subsequent batches starting from one
// (effectively creating a new chain).
if local_finalized_slot > self.current_processed_slot() {
let local_finalized_slot = local_finalized_epoch.start_slot(T::EthSpec::slots_per_epoch());
let current_processed_slot = self.current_processed_slot();
if local_finalized_slot > current_processed_slot {
// Advance the chain to account for already downloaded blocks.
self.start_epoch = local_finalized_epoch;
debug!(self.log, "Updating chain's progress";
"chain_id" => self.id,
"prev_completed_slot" => self.current_processed_slot(),
"new_completed_slot" => local_finalized_slot.as_u64());
"prev_completed_slot" => current_processed_slot,
"new_completed_slot" => self.current_processed_slot());
// Re-index batches
*self.to_be_downloaded_id = 1;
*self.to_be_processed_id = 1;
@@ -706,6 +717,9 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
/// Returns the next required batch from the chain if it exists. If there are no more batches
/// required, `None` is returned.
fn get_next_batch(&mut self, peer_id: PeerId) -> Option<Batch<T::EthSpec>> {
let slots_per_epoch = T::EthSpec::slots_per_epoch();
let blocks_per_batch = slots_per_epoch * EPOCHS_PER_BATCH;
// only request batches up to the buffer size limit
if self
.completed_batches
@@ -716,16 +730,23 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
return None;
}
let batch_start_slot = self.start_epoch.start_slot(slots_per_epoch)
+ self.to_be_downloaded_id.saturating_sub(1) * blocks_per_batch;
// don't request batches beyond the target head slot
let batch_start_slot =
self.start_slot + self.to_be_downloaded_id.saturating_sub(1) * BLOCKS_PER_BATCH;
if batch_start_slot > self.target_head_slot {
return None;
}
// truncate the batch to the target head of the chain
// truncate the batch to the epoch containing the target head of the chain
let batch_end_slot = std::cmp::min(
batch_start_slot + BLOCKS_PER_BATCH,
self.target_head_slot.saturating_add(1u64),
// request either a batch containing the max number of blocks per batch
batch_start_slot + blocks_per_batch,
// or a batch of one epoch of blocks, which contains the `target_head_slot`
self.target_head_slot
.saturating_add(slots_per_epoch)
.epoch(slots_per_epoch)
.start_slot(slots_per_epoch),
);
let batch_id = self.to_be_downloaded_id;