From fa8154e3da5bcca16f18a5b5f75070b99eff8afd Mon Sep 17 00:00:00 2001
From: divma
Date: Sun, 26 Apr 2020 23:18:09 -0500
Subject: [PATCH] Ensure batches align to epoch boundaries (#1021)

* Ensure batches align to epoch boundaries

* Clean up range_sync logs
---
 .../network/src/sync/range_sync/batch.rs    |  7 ++-
 .../network/src/sync/range_sync/chain.rs    | 63 ++++++++++++-------
 .../src/sync/range_sync/chain_collection.rs | 44 +++++++------
 .../network/src/sync/range_sync/range.rs    | 36 ++++++-----
 4 files changed, 87 insertions(+), 63 deletions(-)

diff --git a/beacon_node/network/src/sync/range_sync/batch.rs b/beacon_node/network/src/sync/range_sync/batch.rs
index 58df693399..bd8b604e3f 100644
--- a/beacon_node/network/src/sync/range_sync/batch.rs
+++ b/beacon_node/network/src/sync/range_sync/batch.rs
@@ -1,4 +1,4 @@
-use super::chain::BLOCKS_PER_BATCH;
+use super::chain::EPOCHS_PER_BATCH;
 use eth2_libp2p::rpc::methods::*;
 use eth2_libp2p::rpc::RequestId;
 use eth2_libp2p::PeerId;
@@ -76,7 +76,10 @@ impl<T: EthSpec> Batch<T> {
     pub fn to_blocks_by_range_request(&self) -> BlocksByRangeRequest {
         BlocksByRangeRequest {
             start_slot: self.start_slot.into(),
-            count: std::cmp::min(BLOCKS_PER_BATCH, self.end_slot.sub(self.start_slot).into()),
+            count: std::cmp::min(
+                T::slots_per_epoch() * EPOCHS_PER_BATCH,
+                self.end_slot.sub(self.start_slot).into(),
+            ),
             step: 1,
         }
     }
diff --git a/beacon_node/network/src/sync/range_sync/chain.rs b/beacon_node/network/src/sync/range_sync/chain.rs
index 6ea2dd5968..424c3a7e88 100644
--- a/beacon_node/network/src/sync/range_sync/chain.rs
+++ b/beacon_node/network/src/sync/range_sync/chain.rs
@@ -10,14 +10,15 @@ use slog::{crit, debug, warn};
 use std::collections::HashSet;
 use std::sync::Arc;
 use tokio::sync::mpsc;
-use types::{Hash256, SignedBeaconBlock, Slot};
+use types::{Epoch, EthSpec, Hash256, SignedBeaconBlock, Slot};
 
-/// Blocks are downloaded in batches from peers. This constant specifies how many blocks per batch
-/// is requested. There is a timeout for each batch request. If this value is too high, we will
-/// downvote peers with poor bandwidth. This can be set arbitrarily high, in which case the
+/// Blocks are downloaded in batches from peers. This constant specifies how many epochs' worth of
+/// blocks are requested per batch _at most_. A batch may request fewer blocks to account for
+/// already requested slots. There is a timeout for each batch request. If this value is too high,
+/// we will downvote peers with poor bandwidth. This can be set arbitrarily high, in which case the
 /// responder will fill the response up to the max request size, assuming they have the bandwidth
 /// to do so.
-pub const BLOCKS_PER_BATCH: u64 = 64;
+pub const EPOCHS_PER_BATCH: u64 = 2;
 
 /// The number of times to retry a batch before the chain is considered failed and removed.
 const MAX_BATCH_RETRIES: u8 = 5;
@@ -49,7 +50,7 @@ pub struct SyncingChain<T: BeaconChainTypes> {
     id: ChainId,
 
     /// The original start slot when this chain was initialised.
-    pub start_slot: Slot,
+    pub start_epoch: Epoch,
 
     /// The target head slot.
     pub target_head_slot: Slot,
@@ -80,8 +81,7 @@ pub struct SyncingChain<T: BeaconChainTypes> {
     /// The current state of the chain.
     pub state: ChainSyncingState,
 
-    /// A random id given to a batch process request. This is None if there is no ongoing batch
-    /// process.
+    /// The current processing batch, if any.
     current_processing_batch: Option<Batch<T::EthSpec>>,
 
     /// A send channel to the sync manager. This is given to the batch processor thread to report
@@ -106,7 +106,7 @@ pub enum ChainSyncingState {
 impl<T: BeaconChainTypes> SyncingChain<T> {
     pub fn new(
         id: u64,
-        start_slot: Slot,
+        start_epoch: Epoch,
         target_head_slot: Slot,
         target_head_root: Hash256,
         peer_id: PeerId,
@@ -119,7 +119,7 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
         SyncingChain {
             id,
-            start_slot,
+            start_epoch,
             target_head_slot,
             target_head_root,
             pending_batches: PendingBatches::new(),
@@ -138,8 +138,13 @@
     /// Returns the latest slot number that has been processed.
     fn current_processed_slot(&self) -> Slot {
-        self.start_slot
-            .saturating_add(self.to_be_processed_id.saturating_sub(1u64) * BLOCKS_PER_BATCH)
+        self.start_epoch
+            .start_slot(T::EthSpec::slots_per_epoch())
+            .saturating_add(
+                self.to_be_processed_id.saturating_sub(1u64)
+                    * T::EthSpec::slots_per_epoch()
+                    * EPOCHS_PER_BATCH,
+            )
     }
 
     /// A batch of blocks has been received. This function gets run on all chains and should
@@ -539,7 +544,7 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
     pub fn start_syncing(
         &mut self,
         network: &mut SyncNetworkContext<T::EthSpec>,
-        local_finalized_slot: Slot,
+        local_finalized_epoch: Epoch,
     ) {
         // A local finalized slot is provided as other chains may have made
         // progress whilst this chain was Stopped or paused. If so, update the `processed_batch_id` to
@@ -550,11 +555,17 @@
         // to start from this point and re-index all subsequent batches starting from one
         // (effectively creating a new chain).
 
-        if local_finalized_slot > self.current_processed_slot() {
+        let local_finalized_slot = local_finalized_epoch.start_slot(T::EthSpec::slots_per_epoch());
+        let current_processed_slot = self.current_processed_slot();
+
+        if local_finalized_slot > current_processed_slot {
+            // Advance the chain to account for already downloaded blocks.
+            self.start_epoch = local_finalized_epoch;
+
             debug!(self.log, "Updating chain's progress";
                 "chain_id" => self.id,
-                "prev_completed_slot" => self.current_processed_slot(),
-                "new_completed_slot" => local_finalized_slot.as_u64());
+                "prev_completed_slot" => current_processed_slot,
+                "new_completed_slot" => self.current_processed_slot());
             // Re-index batches
             *self.to_be_downloaded_id = 1;
             *self.to_be_processed_id = 1;
@@ -706,6 +717,9 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
     /// Returns the next required batch from the chain if it exists. If there are no more batches
     /// required, `None` is returned.
     fn get_next_batch(&mut self, peer_id: PeerId) -> Option<Batch<T::EthSpec>> {
+        let slots_per_epoch = T::EthSpec::slots_per_epoch();
+        let blocks_per_batch = slots_per_epoch * EPOCHS_PER_BATCH;
+
         // only request batches up to the buffer size limit
         if self
             .completed_batches
@@ -716,16 +730,23 @@
             return None;
         }
 
+        let batch_start_slot = self.start_epoch.start_slot(slots_per_epoch)
+            + self.to_be_downloaded_id.saturating_sub(1) * blocks_per_batch;
+
         // don't request batches beyond the target head slot
-        let batch_start_slot =
-            self.start_slot + self.to_be_downloaded_id.saturating_sub(1) * BLOCKS_PER_BATCH;
         if batch_start_slot > self.target_head_slot {
             return None;
         }
-        // truncate the batch to the target head of the chain
+
+        // truncate the batch to the epoch containing the target head of the chain
         let batch_end_slot = std::cmp::min(
-            batch_start_slot + BLOCKS_PER_BATCH,
-            self.target_head_slot.saturating_add(1u64),
+            // request either a batch containing the max number of blocks per batch
+            batch_start_slot + blocks_per_batch,
+            // or one extending to the end of the epoch that contains the `target_head_slot`
+            self.target_head_slot
+                .saturating_add(slots_per_epoch)
+                .epoch(slots_per_epoch)
+                .start_slot(slots_per_epoch),
         );
 
         let batch_id = self.to_be_downloaded_id;
diff --git a/beacon_node/network/src/sync/range_sync/chain_collection.rs b/beacon_node/network/src/sync/range_sync/chain_collection.rs
index 5c93aa45a3..becfd7df24 100644
--- a/beacon_node/network/src/sync/range_sync/chain_collection.rs
+++ b/beacon_node/network/src/sync/range_sync/chain_collection.rs
@@ -13,7 +13,7 @@ use slog::{debug, error, info};
 use std::sync::Arc;
 use tokio::sync::mpsc;
 use types::EthSpec;
-use types::{Hash256, Slot};
+use types::{Epoch, Hash256, Slot};
 
 /// The state of the long range/batch sync.
 #[derive(Clone)]
@@ -207,7 +207,7 @@ impl<T: BeaconChainTypes> ChainCollection<T> {
     /// This removes any out-dated chains, swaps to any higher priority finalized chains and
     /// updates the state of the collection.
     pub fn update_finalized(&mut self, network: &mut SyncNetworkContext<T::EthSpec>) {
-        let local_slot = {
+        let local_epoch = {
             let local = match PeerSyncInfo::from_chain(&self.beacon_chain) {
                 Some(local) => local,
                 None => {
@@ -219,9 +219,7 @@
                 }
             };
 
-            local
-                .finalized_epoch
-                .start_slot(T::EthSpec::slots_per_epoch())
+            local.finalized_epoch
         };
 
         // Remove any outdated finalized chains
@@ -242,11 +240,11 @@
             }) {
             // A chain has more peers. Swap the syncing chain
-            debug!(self.log, "Switching finalized chains to sync"; "new_target_root" => format!("{}", chain.target_head_root), "new_end_slot" => chain.target_head_slot, "new_start_slot"=> chain.start_slot);
+            debug!(self.log, "Switching finalized chains to sync"; "new_target_root" => format!("{}", chain.target_head_root), "new_end_slot" => chain.target_head_slot, "new_start_epoch"=> local_epoch);
 
             // update the state to a new finalized state
             let state = RangeSyncState::Finalized {
-                start_slot: chain.start_slot,
+                start_slot: chain.start_epoch.start_slot(T::EthSpec::slots_per_epoch()),
                 head_slot: chain.target_head_slot,
                 head_root: chain.target_head_root,
             };
@@ -255,7 +253,7 @@
             // Stop the current chain from syncing
             self.finalized_chains[index].stop_syncing();
             // Start the new chain
-            self.finalized_chains[new_index].start_syncing(network, local_slot);
+            self.finalized_chains[new_index].start_syncing(network, local_epoch);
            }
        } else if let Some(chain) = self
            .finalized_chains
@@ -263,10 +261,10 @@
            .max_by_key(|chain| chain.peer_pool.len())
        {
            // There is no currently syncing finalization chain, starting the one with the most peers
-            debug!(self.log, "New finalized chain started syncing"; "new_target_root" => format!("{}", chain.target_head_root), "new_end_slot" => chain.target_head_slot, "new_start_slot"=> chain.start_slot);
-            chain.start_syncing(network, local_slot);
+            debug!(self.log, "New finalized chain started syncing"; "new_target_root" => format!("{}", chain.target_head_root), "new_end_slot" => chain.target_head_slot, "new_start_epoch"=> chain.start_epoch);
+            chain.start_syncing(network, local_epoch);
            let state = RangeSyncState::Finalized {
-                start_slot: chain.start_slot,
+                start_slot: chain.start_epoch.start_slot(T::EthSpec::slots_per_epoch()),
                head_slot: chain.target_head_slot,
                head_root: chain.target_head_root,
            };
@@ -279,17 +277,17 @@
            // for the syncing API, we find the minimal start_slot and the maximum
            // target_slot of all head chains to report back.
-            let (min_slot, max_slot) = self.head_chains.iter().fold(
-                (Slot::from(0u64), Slot::from(0u64)),
+            let (min_epoch, max_slot) = self.head_chains.iter().fold(
+                (Epoch::from(0u64), Slot::from(0u64)),
                |(min, max), chain| {
                    (
-                        std::cmp::min(min, chain.start_slot),
+                        std::cmp::min(min, chain.start_epoch),
                        std::cmp::max(max, chain.target_head_slot),
                    )
                },
            );
            let head_state = RangeSyncState::Head {
-                start_slot: min_slot,
+                start_slot: min_epoch.start_slot(T::EthSpec::slots_per_epoch()),
                head_slot: max_slot,
            };
            self.state = head_state;
@@ -300,7 +298,7 @@
    /// Add a new finalized chain to the collection.
    pub fn new_finalized_chain(
        &mut self,
-        local_finalized_slot: Slot,
+        local_finalized_epoch: Epoch,
        target_head: Hash256,
        target_slot: Slot,
        peer_id: PeerId,
@@ -309,7 +307,7 @@
        let chain_id = rand::random();
        self.finalized_chains.push(SyncingChain::new(
            chain_id,
-            local_finalized_slot,
+            local_finalized_epoch,
            target_slot,
            target_head,
            peer_id,
@@ -324,7 +322,7 @@
    pub fn new_head_chain(
        &mut self,
        network: &mut SyncNetworkContext<T::EthSpec>,
-        remote_finalized_slot: Slot,
+        remote_finalized_epoch: Epoch,
        target_head: Hash256,
        target_slot: Slot,
        peer_id: PeerId,
@@ -340,7 +338,7 @@
        let chain_id = rand::random();
        let mut new_head_chain = SyncingChain::new(
            chain_id,
-            remote_finalized_slot,
+            remote_finalized_epoch,
            target_slot,
            target_head,
            peer_id,
@@ -349,7 +347,7 @@
            self.log.clone(),
        );
        // All head chains can sync simultaneously
-        new_head_chain.start_syncing(network, remote_finalized_slot);
+        new_head_chain.start_syncing(network, remote_finalized_epoch);
        self.head_chains.push(new_head_chain);
    }
@@ -434,7 +432,7 @@
                .fork_choice
                .contains_block(&chain.target_head_root)
            {
-                debug!(log_ref, "Purging out of finalized chain"; "start_slot" => chain.start_slot, "end_slot" => chain.target_head_slot);
+                debug!(log_ref, "Purging out of date finalized chain"; "start_epoch" => chain.start_epoch, "end_slot" => chain.target_head_slot);
                chain.status_peers(network);
                false
            } else {
@@ -447,7 +445,7 @@
                .fork_choice
                .contains_block(&chain.target_head_root)
            {
-                debug!(log_ref, "Purging out of date head chain"; "start_slot" => chain.start_slot, "end_slot" => chain.target_head_slot);
+                debug!(log_ref, "Purging out of date head chain"; "start_epoch" => chain.start_epoch, "end_slot" => chain.target_head_slot);
                chain.status_peers(network);
                false
            } else {
@@ -483,7 +481,7 @@
            chain
        };
 
-        debug!(self.log, "Chain was removed"; "start_slot" => chain.start_slot.as_u64(), "end_slot" => chain.target_head_slot.as_u64());
+        debug!(self.log, "Chain was removed"; "start_epoch" => chain.start_epoch, "end_slot" => chain.target_head_slot);
 
        // update the state
        self.update_finalized(network);
diff --git a/beacon_node/network/src/sync/range_sync/range.rs b/beacon_node/network/src/sync/range_sync/range.rs
index 21ffea34dd..59c789f819 100644
--- a/beacon_node/network/src/sync/range_sync/range.rs
+++ b/beacon_node/network/src/sync/range_sync/range.rs
@@ -11,10 +11,10 @@
 //! ## Finalized chain sync
 //!
 //! This occurs when a peer connects that claims to have a finalized head slot that is greater
-//! than our own. In this case, we form a chain from our last finalized slot, to their claimed
+//! than our own. In this case, we form a chain from our last finalized epoch to their claimed
 //! finalized slot. Any peer that also claims to have this last finalized slot is added to a pool
-//! of peers from which batches of blocks may be downloaded. Blocks are downloaded until
-//! the finalized slot of the chain is reached. Once reached, all peers within the pool are sent a
+//! of peers from which batches of blocks may be downloaded. Blocks are downloaded until the
+//! finalized slot of the chain is reached. Once reached, all peers within the pool are sent a
 //! STATUS message to potentially start a head chain sync, or check if further finalized chains
 //! need to be downloaded.
 //!
@@ -26,11 +26,11 @@
 //!
 //! ## Head Chain Sync
 //!
-//! If a peer joins and there is no active finalized chains being synced, and it's head is
-//! beyond our `SLOT_IMPORT_TOLERANCE` a chain is formed starting from this peers finalized slot
-//! (this has been necessarily downloaded by our node, otherwise we would start a finalized chain
-//! sync) to this peers head slot. Any other peers that match this head slot and head root, are
-//! added to this chain's peer pool, which will be downloaded in parallel.
+//! If a peer joins and there are no active finalized chains being synced, and its head is beyond
+//! our `SLOT_IMPORT_TOLERANCE`, a chain is formed starting from this peer's finalized epoch (this
+//! has necessarily been downloaded by our node, otherwise we would start a finalized chain sync)
+//! to this peer's head slot. Any other peers that match this head slot and head root are added to
+//! this chain's peer pool, which will be downloaded in parallel.
 //!
 //! Unlike finalized chains, head chains can be synced in parallel.
 //!
@@ -65,7 +65,7 @@ pub struct RangeSync<T: BeaconChainTypes> {
     /// A collection of chains that need to be downloaded. This stores any head or finalized chains
     /// that need to be downloaded.
     chains: ChainCollection<T>,
-    /// Peers that join whilst a finalized chain is being download, sit in this set. Once the
+    /// Peers that join whilst a finalized chain is being downloaded sit in this set. Once the
     /// finalized chain(s) complete, these peer's get STATUS'ed to update their head slot before
     /// the head chains are formed and downloaded.
     awaiting_head_peers: HashSet<PeerId>,
@@ -162,7 +162,7 @@
            .chains
            .get_finalized_mut(remote_info.finalized_root, remote_finalized_slot)
        {
-            debug!(self.log, "Finalized chain exists, adding peer"; "peer_id" => format!("{:?}", peer_id), "target_root" => format!("{}", chain.target_head_root), "end_slot" => chain.target_head_slot, "start_slot"=> chain.start_slot);
+            debug!(self.log, "Finalized chain exists, adding peer"; "peer_id" => format!("{:?}", peer_id), "target_root" => format!("{}", chain.target_head_root), "end_slot" => chain.target_head_slot, "start_epoch"=> chain.start_epoch);
 
            // add the peer to the chain's peer pool
            chain.add_peer(network, peer_id);
@@ -174,10 +174,10 @@
        } else {
            // there is no finalized chain that matches this peer's last finalized target
            // create a new finalized chain
-            debug!(self.log, "New finalized chain added to sync"; "peer_id" => format!("{:?}", peer_id), "start_slot" => local_finalized_slot.as_u64(), "end_slot" => remote_finalized_slot.as_u64(), "finalized_root" => format!("{}", remote_info.finalized_root));
+            debug!(self.log, "New finalized chain added to sync"; "peer_id" => format!("{:?}", peer_id), "start_epoch" => local_info.finalized_epoch, "end_slot" => remote_finalized_slot, "finalized_root" => format!("{}", remote_info.finalized_root));
 
            self.chains.new_finalized_chain(
-                local_finalized_slot,
+                local_info.finalized_epoch,
                remote_info.finalized_root,
                remote_finalized_slot,
                peer_id,
@@ -218,11 +218,13 @@
            chain.add_peer(network, peer_id);
        } else {
            // There are no other head chains that match this peer's status, create a new one, and
-            let start_slot = std::cmp::min(local_info.head_slot, remote_finalized_slot);
-            debug!(self.log, "Creating a new syncing head chain"; "head_root" => format!("{}",remote_info.head_root), "start_slot" => start_slot, "head_slot" => remote_info.head_slot, "peer_id" => format!("{:?}", peer_id));
+            let start_epoch = std::cmp::min(local_info.head_slot, remote_finalized_slot)
+                .epoch(T::EthSpec::slots_per_epoch());
+            debug!(self.log, "Creating a new syncing head chain"; "head_root" => format!("{}",remote_info.head_root), "start_epoch" => start_epoch, "head_slot" => remote_info.head_slot, "peer_id" => format!("{:?}", peer_id));
+
            self.chains.new_head_chain(
                network,
-                start_slot,
+                start_epoch,
                remote_info.head_root,
                remote_info.head_slot,
                peer_id,
@@ -288,7 +290,7 @@
            }) {
                Some((index, ProcessingResult::RemoveChain)) => {
                    let chain = self.chains.remove_finalized_chain(index);
-                    debug!(self.log, "Finalized chain removed"; "start_slot" => chain.start_slot.as_u64(), "end_slot" => chain.target_head_slot.as_u64());
+                    debug!(self.log, "Finalized chain removed"; "start_epoch" => chain.start_epoch, "end_slot" => chain.target_head_slot);
                    // update the state of the collection
                    self.chains.update_finalized(network);
@@ -325,7 +327,7 @@
            }) {
                Some((index, ProcessingResult::RemoveChain)) => {
                    let chain = self.chains.remove_head_chain(index);
-                    debug!(self.log, "Head chain completed"; "start_slot" => chain.start_slot.as_u64(), "end_slot" => chain.target_head_slot.as_u64());
+                    debug!(self.log, "Head chain completed"; "start_epoch" => chain.start_epoch, "end_slot" => chain.target_head_slot);
                    // the chain is complete, re-status it's peers and remove it
                    chain.status_peers(network);
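
Note (not part of the patch): the new request sizing in
`Batch::to_blocks_by_range_request` is easy to sanity-check with concrete
numbers. The standalone sketch below mirrors only the `count` arithmetic;
`SLOTS_PER_EPOCH = 32` stands in for `T::slots_per_epoch()` (the mainnet
value) and the helper name `blocks_by_range_count` is hypothetical.

    const SLOTS_PER_EPOCH: u64 = 32; // stand-in for T::slots_per_epoch()
    const EPOCHS_PER_BATCH: u64 = 2;

    // Mirrors the patched `count`: at most EPOCHS_PER_BATCH epochs of blocks,
    // truncated to the batch's own slot span.
    fn blocks_by_range_count(start_slot: u64, end_slot: u64) -> u64 {
        std::cmp::min(SLOTS_PER_EPOCH * EPOCHS_PER_BATCH, end_slot - start_slot)
    }

    fn main() {
        // A full batch spans two epochs: 64 slots on mainnet.
        assert_eq!(blocks_by_range_count(0, 64), 64);
        // A final, shorter batch is truncated rather than over-requested.
        assert_eq!(blocks_by_range_count(64, 96), 32);
    }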
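
Note (not part of the patch): in chain.rs, `current_processed_slot` and the
re-indexing in `start_syncing` interact as follows. Plain `u64`s stand in for
the `Slot` and `Epoch` newtypes, and the free function is a hypothetical
stand-in for the method.

    const SLOTS_PER_EPOCH: u64 = 32; // stand-in for T::EthSpec::slots_per_epoch()
    const EPOCHS_PER_BATCH: u64 = 2;

    // Mirrors `current_processed_slot`: batch ids are 1-based, so a chain whose
    // next batch to process is id 1 has completed nothing past its start epoch.
    fn current_processed_slot(start_epoch: u64, to_be_processed_id: u64) -> u64 {
        start_epoch * SLOTS_PER_EPOCH
            + to_be_processed_id.saturating_sub(1) * SLOTS_PER_EPOCH * EPOCHS_PER_BATCH
    }

    fn main() {
        // A chain started at epoch 10 with two batches processed (next id 3)
        // has completed slots up to 320 + 2 * 64 = 448.
        assert_eq!(current_processed_slot(10, 3), 448);
        // If the local node has since finalized epoch 20 (slot 640 > 448),
        // `start_syncing` advances `start_epoch` to 20 and re-indexes batches
        // from 1, so later batches are computed from the new epoch boundary.
        assert!(20 * SLOTS_PER_EPOCH > current_processed_slot(10, 3));
    }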
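
Note (not part of the patch): the boundary math in `get_next_batch` is what
enforces the PR title: every batch starts on an epoch boundary, and the final
batch is clamped to the first epoch boundary past `target_head_slot` rather
than to `target_head_slot + 1` as before. A sketch with plain `u64`s; the
function name is hypothetical.

    const SLOTS_PER_EPOCH: u64 = 32;
    const EPOCHS_PER_BATCH: u64 = 2;

    // Returns the (start_slot, end_slot) span the next batch would request,
    // or None once the chain has passed its target head.
    fn next_batch_span(
        start_epoch: u64,
        to_be_downloaded_id: u64,
        target_head_slot: u64,
    ) -> Option<(u64, u64)> {
        let blocks_per_batch = SLOTS_PER_EPOCH * EPOCHS_PER_BATCH;
        let batch_start_slot =
            start_epoch * SLOTS_PER_EPOCH + (to_be_downloaded_id - 1) * blocks_per_batch;
        if batch_start_slot > target_head_slot {
            return None;
        }
        let batch_end_slot = std::cmp::min(
            batch_start_slot + blocks_per_batch,
            // round target_head_slot up to the next epoch boundary, mirroring
            // `.saturating_add(slots_per_epoch).epoch(..).start_slot(..)`
            (target_head_slot + SLOTS_PER_EPOCH) / SLOTS_PER_EPOCH * SLOTS_PER_EPOCH,
        );
        Some((batch_start_slot, batch_end_slot))
    }

    fn main() {
        // Target head at slot 100 (epoch 3): batches cover [0, 64) and
        // [64, 128), ending exactly on the boundary of epoch 4, never at an
        // arbitrary mid-epoch slot such as 101.
        assert_eq!(next_batch_span(0, 1, 100), Some((0, 64)));
        assert_eq!(next_batch_span(0, 2, 100), Some((64, 128)));
        assert_eq!(next_batch_span(0, 3, 100), None);
    }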