From 668513b67ee2bbe1f6eb93832a6340d0a084d0b8 Mon Sep 17 00:00:00 2001 From: divma Date: Thu, 22 Oct 2020 00:26:06 +0000 Subject: [PATCH] Sync state adjustments (#1804) Check for advanced peers and the state of the chain with respect to the clock slot to decide whether a chain is synced or transitioning to a head sync. Also fixes a bug that prevented getting the right state while syncing heads --- .../eth2_libp2p/src/peer_manager/peerdb.rs | 13 ++++ beacon_node/network/src/router/processor.rs | 6 +- beacon_node/network/src/sync/manager.rs | 37 ++++++++--- .../src/sync/range_sync/chain_collection.rs | 63 ++++++++----------- 4 files changed, 71 insertions(+), 48 deletions(-) diff --git a/beacon_node/eth2_libp2p/src/peer_manager/peerdb.rs b/beacon_node/eth2_libp2p/src/peer_manager/peerdb.rs index 9f9a4296c6..7cbcf92fef 100644 --- a/beacon_node/eth2_libp2p/src/peer_manager/peerdb.rs +++ b/beacon_node/eth2_libp2p/src/peer_manager/peerdb.rs @@ -229,6 +229,19 @@ impl PeerDB { .map(|(peer_id, _)| peer_id) } + /// Gives the `peer_id` of all known connected and advanced peers. + pub fn advanced_peers(&self) -> impl Iterator { + self.peers + .iter() + .filter(|(_, info)| { + if info.sync_status.is_advanced() { + return info.connection_status.is_connected(); + } + false + }) + .map(|(peer_id, _)| peer_id) + } + /// Gives an iterator of all peers on a given subnet. pub fn peers_on_subnet(&self, subnet_id: SubnetId) -> impl Iterator { self.peers diff --git a/beacon_node/network/src/router/processor.rs b/beacon_node/network/src/router/processor.rs index 266b1d45df..52c0d52e6f 100644 --- a/beacon_node/network/src/router/processor.rs +++ b/beacon_node/network/src/router/processor.rs @@ -232,9 +232,9 @@ impl Processor { // because they are using a different genesis time, or that theirs or our system // clock is incorrect. 
debug!( - self.log, "Handshake Failure"; - "peer" => peer_id.to_string(), - "reason" => "different system clocks or genesis time" + self.log, "Handshake Failure"; + "peer" => peer_id.to_string(), + "reason" => "different system clocks or genesis time" ); self.network .goodbye_peer(peer_id, GoodbyeReason::IrrelevantNetwork); diff --git a/beacon_node/network/src/sync/manager.rs b/beacon_node/network/src/sync/manager.rs index b60489e3a5..6b5fdffdae 100644 --- a/beacon_node/network/src/sync/manager.rs +++ b/beacon_node/network/src/sync/manager.rs @@ -670,19 +670,38 @@ impl SyncManager { fn update_sync_state(&mut self) { let new_state: SyncState = match self.range_sync.state() { Err(e) => { - debug!(self.log, "Error getting range sync state"; "error" => %e); + crit!(self.log, "Error getting range sync state"; "error" => %e); return; } Ok(state) => match state { None => { - // no range sync, decide if we are stalled or synced - self.network_globals - .peers - .read() - .synced_peers() - .next() - .map(|_| SyncState::Synced) - .unwrap_or_else(|| SyncState::Stalled) + // no range sync, decide if we are stalled or synced. + // For this we check if there is at least one advanced peer. An advanced peer + // with Idle range is possible since a peer's status is updated periodically. + // If we synced a peer between status messages, most likely the peer has + // advanced and will produce a head chain on re-status. 
Otherwise it will shift + // to being synced + let head = self.chain.best_slot().unwrap_or_else(|_| Slot::new(0)); + let current_slot = self.chain.slot().unwrap_or_else(|_| Slot::new(0)); + + let peers = self.network_globals.peers.read(); + if current_slot >= head + && current_slot.sub(head) <= (SLOT_IMPORT_TOLERANCE as u64) + && head > 0 + { + SyncState::Synced + } else if peers.advanced_peers().next().is_some() { + SyncState::SyncingHead { + start_slot: head, + target_slot: current_slot, + } + } else if peers.synced_peers().next().is_none() { + SyncState::Stalled + } else { + // There are no peers that require syncing and we have at least one synced + // peer + SyncState::Synced + } } Some((RangeSyncType::Finalized, start_slot, target_slot)) => { SyncState::SyncingFinalized { diff --git a/beacon_node/network/src/sync/range_sync/chain_collection.rs b/beacon_node/network/src/sync/range_sync/chain_collection.rs index 1b92b5eed6..4af972ceb0 100644 --- a/beacon_node/network/src/sync/range_sync/chain_collection.rs +++ b/beacon_node/network/src/sync/range_sync/chain_collection.rs @@ -240,13 +240,12 @@ impl ChainCollection { .head_chains .get(id) .ok_or(format!("Head syncing chain not found: {}", id))?; - range = range.map(|(min_start, max_slot)| { - ( - min_start - .min(chain.start_epoch.start_slot(T::EthSpec::slots_per_epoch())), - max_slot.max(chain.target_head_slot), - ) - }); + let start = chain.start_epoch.start_slot(T::EthSpec::slots_per_epoch()); + let target = chain.target_head_slot; + + range = range + .map(|(min_start, max_slot)| (min_start.min(start), max_slot.max(target))) + .or(Some((start, target))); } let (start_slot, target_slot) = range.ok_or_else(|| "Syncing head with empty head ids".to_string())?; @@ -348,45 +347,37 @@ impl ChainCollection { return; } - // NOTE: if switching from Head Syncing to Finalized Syncing, the head chains are allowed - // to continue, so we check for such chains first, and allow them to continue. 
- let mut syncing_chains = SmallVec::<[u64; PARALLEL_HEAD_CHAINS]>::new(); - for (id, chain) in self.head_chains.iter_mut() { - if chain.is_syncing() { - if syncing_chains.len() < PARALLEL_HEAD_CHAINS { - syncing_chains.push(*id); - } else { - chain.stop_syncing(); - debug!(self.log, "Stopping extra head chain"; "chain" => id); - } - } - } + // Order chains by available peers, if two chains have the same number of peers, prefer one + // that is already syncing + let mut preferred_ids = self + .head_chains + .iter() + .map(|(id, chain)| (chain.available_peers(), !chain.is_syncing(), *id)) + .collect::>(); + preferred_ids.sort_unstable(); - let mut not_syncing = self.head_chains.len() - syncing_chains.len(); - // Find all head chains that are not currently syncing ordered by peer count. - while syncing_chains.len() < PARALLEL_HEAD_CHAINS && not_syncing > 0 { - // Find the chain with the most peers and start syncing - if let Some((id, chain)) = self - .head_chains - .iter_mut() - .filter(|(_id, chain)| !chain.is_syncing()) - .max_by_key(|(_id, chain)| chain.available_peers()) - { - // start syncing this chain - debug!(self.log, "New head chain started syncing"; &chain); + let mut syncing_chains = SmallVec::<[u64; PARALLEL_HEAD_CHAINS]>::new(); + for (_, _, id) in preferred_ids { + let chain = self.head_chains.get_mut(&id).expect("known chain"); + if syncing_chains.len() < PARALLEL_HEAD_CHAINS { + // start this chain if it's not already syncing + if !chain.is_syncing() { + debug!(self.log, "New head chain started syncing"; &chain); + } if let ProcessingResult::RemoveChain = chain.start_syncing(network, local_epoch, local_head_epoch) { - let id = *id; self.head_chains.remove(&id); error!(self.log, "Chain removed while switching head chains"; "id" => id); } else { - syncing_chains.push(*id); + syncing_chains.push(id); } + } else { + // stop any other chain + chain.stop_syncing(); } - // update variables - not_syncing = self.head_chains.len() - syncing_chains.len(); } + 
self.state = if syncing_chains.is_empty() { RangeSyncState::Idle } else {