From cc44a64d152cd408a60e174608f924b75c95506a Mon Sep 17 00:00:00 2001 From: Age Manning Date: Tue, 18 Aug 2020 02:49:24 +0000 Subject: [PATCH] Limit parallelism of head chain sync (#1527) ## Description Currently lighthouse load-balances across peers a single finalized chain. The chain is selected via the most peers. Once synced to the latest finalized epoch Lighthouse creates chains amongst its peers and syncs them all in parallel amongst each peer (grouped by their current head block). This is typically fast and relatively efficient under normal operations. However if the chain has not finalized in a long time, the head chains can grow quite long. Peers' head chains will update every slot as new blocks are added to the head. Syncing all head chains in parallel is a bottleneck and highly inefficient; the resulting block duplication leads to RPC timeouts when attempting to handle all new head chains at once. This PR limits the parallelism of head syncing chains to 2. We now sync at most two head chains at a time. This allows for the possibility of sync progressing alongside a peer being slow and holding up one chain via RPC timeouts. --- .../network/src/sync/range_sync/chain.rs | 8 ++ .../src/sync/range_sync/chain_collection.rs | 123 +++++++++++++----- .../network/src/sync/range_sync/range.rs | 13 +- 3 files changed, 106 insertions(+), 38 deletions(-) diff --git a/beacon_node/network/src/sync/range_sync/chain.rs b/beacon_node/network/src/sync/range_sync/chain.rs index a01e8c3c5b..d79dff469b 100644 --- a/beacon_node/network/src/sync/range_sync/chain.rs +++ b/beacon_node/network/src/sync/range_sync/chain.rs @@ -686,6 +686,14 @@ impl SyncingChain { } } + /// Returns true if this chain is currently syncing. + pub fn is_syncing(&self) -> bool { + match self.state { + ChainSyncingState::Syncing => true, + ChainSyncingState::Stopped => false, + } + } + /// Attempts to request the next required batches from the peer pool if the chain is syncing. 
It will exhaust the peer /// pool and left over batches until the batch buffer is reached or all peers are exhausted. fn request_batches(&mut self, network: &mut SyncNetworkContext) { diff --git a/beacon_node/network/src/sync/range_sync/chain_collection.rs b/beacon_node/network/src/sync/range_sync/chain_collection.rs index ff194d27b9..1543710ccf 100644 --- a/beacon_node/network/src/sync/range_sync/chain_collection.rs +++ b/beacon_node/network/src/sync/range_sync/chain_collection.rs @@ -15,6 +15,9 @@ use tokio::sync::mpsc; use types::EthSpec; use types::{Epoch, Hash256, Slot}; +/// The number of head syncing chains to sync at a time. +const PARALLEL_HEAD_CHAINS: usize = 2; + /// The state of the long range/batch sync. #[derive(Clone)] pub enum RangeSyncState { @@ -205,8 +208,9 @@ impl ChainCollection { /// Updates the state of the chain collection. /// /// This removes any out-dated chains, swaps to any higher priority finalized chains and - /// updates the state of the collection. - pub fn update_finalized(&mut self, network: &mut SyncNetworkContext) { + /// updates the state of the collection. This starts head chains syncing if any are required to + /// do so. + pub fn update(&mut self, network: &mut SyncNetworkContext) { let local_epoch = { let local = match PeerSyncInfo::from_chain(&self.beacon_chain) { Some(local) => local, @@ -222,9 +226,25 @@ impl ChainCollection { local.finalized_epoch }; - // Remove any outdated finalized chains + // Remove any outdated finalized/head chains self.purge_outdated_chains(network); + // Choose the best finalized chain if one needs to be selected. + self.update_finalized_chains(network, local_epoch); + + if self.finalized_syncing_index().is_none() { + // Handle head syncing chains if there are no finalized chains left. + self.update_head_chains(network, local_epoch); + } + } + + /// This looks at all current finalized chains and decides if a new chain should be prioritised + /// or not. 
+ fn update_finalized_chains( + &mut self, + network: &mut SyncNetworkContext, + local_epoch: Epoch, + ) { // Check if any chains become the new syncing chain if let Some(index) = self.finalized_syncing_index() { // There is a current finalized chain syncing @@ -269,32 +289,76 @@ impl ChainCollection { head_root: chain.target_head_root, }; self.state = state; - } else { - // There are no finalized chains, update the state. - if self.head_chains.is_empty() { - self.state = RangeSyncState::Idle; - } else { - // for the syncing API, we find the minimal start_slot and the maximum - // target_slot of all head chains to report back. - - let (min_epoch, max_slot) = self.head_chains.iter().fold( - (Epoch::from(0u64), Slot::from(0u64)), - |(min, max), chain| { - ( - std::cmp::min(min, chain.start_epoch), - std::cmp::max(max, chain.target_head_slot), - ) - }, - ); - let head_state = RangeSyncState::Head { - start_slot: min_epoch.start_slot(T::EthSpec::slots_per_epoch()), - head_slot: max_slot, - }; - self.state = head_state; - } } } + /// Start syncing any head chains if required. + fn update_head_chains( + &mut self, + network: &mut SyncNetworkContext, + local_epoch: Epoch, + ) { + // There are no finalized chains, update the state. + if self.head_chains.is_empty() { + self.state = RangeSyncState::Idle; + return; + } + + let mut currently_syncing = self + .head_chains + .iter() + .filter(|chain| chain.is_syncing()) + .count(); + let mut not_syncing = self.head_chains.len() - currently_syncing; + + // Find all head chains that are not currently syncing ordered by peer count. 
+ while currently_syncing <= PARALLEL_HEAD_CHAINS && not_syncing > 0 { + // Find the chain with the most peers and start syncing + if let Some((_index, chain)) = self + .head_chains + .iter_mut() + .filter(|chain| !chain.is_syncing()) + .enumerate() + .max_by_key(|(_index, chain)| chain.peer_pool.len()) + { + // start syncing this chain + debug!(self.log, "New head chain started syncing"; "new_target_root" => format!("{}", chain.target_head_root), "new_end_slot" => chain.target_head_slot, "new_start_epoch"=> chain.start_epoch); + chain.start_syncing(network, local_epoch); + } + + // update variables + currently_syncing = self + .head_chains + .iter() + .filter(|chain| chain.is_syncing()) + .count(); + not_syncing = self.head_chains.len() - currently_syncing; + } + + // Start + // for the syncing API, we find the minimal start_slot and the maximum + // target_slot of all head chains to report back. + + let (min_epoch, max_slot) = self + .head_chains + .iter() + .filter(|chain| chain.is_syncing()) + .fold( + (Epoch::from(0u64), Slot::from(0u64)), + |(min, max), chain| { + ( + std::cmp::min(min, chain.start_epoch), + std::cmp::max(max, chain.target_head_slot), + ) + }, + ); + let head_state = RangeSyncState::Head { + start_slot: min_epoch.start_slot(T::EthSpec::slots_per_epoch()), + head_slot: max_slot, + }; + self.state = head_state; + } + /// Add a new finalized chain to the collection. 
pub fn new_finalized_chain( &mut self, @@ -321,7 +385,6 @@ impl ChainCollection { #[allow(clippy::too_many_arguments)] pub fn new_head_chain( &mut self, - network: &mut SyncNetworkContext, remote_finalized_epoch: Epoch, target_head: Hash256, target_slot: Slot, @@ -336,7 +399,7 @@ impl ChainCollection { self.head_chains.retain(|chain| !chain.peer_pool.is_empty()); let chain_id = rand::random(); - let mut new_head_chain = SyncingChain::new( + let new_head_chain = SyncingChain::new( chain_id, remote_finalized_epoch, target_slot, @@ -346,8 +409,6 @@ impl ChainCollection { self.beacon_chain.clone(), self.log.clone(), ); - // All head chains can sync simultaneously - new_head_chain.start_syncing(network, remote_finalized_epoch); self.head_chains.push(new_head_chain); } @@ -511,7 +572,7 @@ impl ChainCollection { debug!(self.log, "Chain was removed"; "start_epoch" => chain.start_epoch, "end_slot" => chain.target_head_slot); // update the state - self.update_finalized(network); + self.update(network); } /// Returns the index of finalized chain that is currently syncing. Returns `None` if no diff --git a/beacon_node/network/src/sync/range_sync/range.rs b/beacon_node/network/src/sync/range_sync/range.rs index 4d768b6fc5..f0ee38e104 100644 --- a/beacon_node/network/src/sync/range_sync/range.rs +++ b/beacon_node/network/src/sync/range_sync/range.rs @@ -166,7 +166,7 @@ impl RangeSync { chain.add_peer(network, peer_id); // check if the new peer's addition will favour a new syncing chain. 
- self.chains.update_finalized(network); + self.chains.update(network); // update the global sync state if necessary self.chains.update_sync_state(); } else { @@ -181,7 +181,7 @@ impl RangeSync { peer_id, self.beacon_processor_send.clone(), ); - self.chains.update_finalized(network); + self.chains.update(network); // update the global sync state self.chains.update_sync_state(); } @@ -221,7 +221,6 @@ impl RangeSync { debug!(self.log, "Creating a new syncing head chain"; "head_root" => format!("{}",remote_info.head_root), "start_epoch" => start_epoch, "head_slot" => remote_info.head_slot, "peer_id" => format!("{:?}", peer_id)); self.chains.new_head_chain( - network, start_epoch, remote_info.head_root, remote_info.head_slot, @@ -229,7 +228,7 @@ impl RangeSync { self.beacon_processor_send.clone(), ); } - self.chains.update_finalized(network); + self.chains.update(network); self.chains.update_sync_state(); } } @@ -284,7 +283,7 @@ impl RangeSync { let chain = self.chains.remove_finalized_chain(index); debug!(self.log, "Finalized chain removed"; "start_epoch" => chain.start_epoch, "end_slot" => chain.target_head_slot); // update the state of the collection - self.chains.update_finalized(network); + self.chains.update(network); // the chain is complete, re-status it's peers chain.status_peers(network); @@ -324,7 +323,7 @@ impl RangeSync { chain.status_peers(network); // update the state of the collection - self.chains.update_finalized(network); + self.chains.update(network); // update the global state and log any change self.chains.update_sync_state(); } @@ -353,7 +352,7 @@ impl RangeSync { self.remove_peer(network, peer_id); // update the state of the collection - self.chains.update_finalized(network); + self.chains.update(network); // update the global state and inform the user self.chains.update_sync_state(); }