From 2acf75785c316a8ab9745976f075a2b271406773 Mon Sep 17 00:00:00 2001 From: divma Date: Tue, 20 Oct 2020 22:34:18 +0000 Subject: [PATCH] More sync updates (#1791) ## Issue Addressed #1614 and a couple of sync-stalling problems, the most important is a cyclic dependency between the sync manager and the peer manager --- .../eth2_libp2p/src/peer_manager/mod.rs | 26 +- beacon_node/eth2_libp2p/src/types/globals.rs | 23 +- .../eth2_libp2p/src/types/sync_state.rs | 10 +- beacon_node/http_api/src/lib.rs | 6 +- beacon_node/network/src/sync/manager.rs | 52 +- .../network/src/sync/range_sync/batch.rs | 8 + .../network/src/sync/range_sync/chain.rs | 126 +++-- .../src/sync/range_sync/chain_collection.rs | 473 ++++++++---------- .../network/src/sync/range_sync/mod.rs | 1 + .../network/src/sync/range_sync/range.rs | 141 ++---- 10 files changed, 397 insertions(+), 469 deletions(-) diff --git a/beacon_node/eth2_libp2p/src/peer_manager/mod.rs b/beacon_node/eth2_libp2p/src/peer_manager/mod.rs index 60af724649..dc8786f816 100644 --- a/beacon_node/eth2_libp2p/src/peer_manager/mod.rs +++ b/beacon_node/eth2_libp2p/src/peer_manager/mod.rs @@ -35,8 +35,8 @@ use score::{PeerAction, ScoreState}; use std::collections::HashMap; /// The time in seconds between re-status's peers. const STATUS_INTERVAL: u64 = 300; -/// The time in seconds between PING events. We do not send a ping if the other peer as PING'd us within -/// this time frame (Seconds) +/// The time in seconds between PING events. We do not send a ping if the other peer has PING'd us +/// within this time frame (Seconds) const PING_INTERVAL: u64 = 30; /// The heartbeat performs regular updates such as updating reputations and performing discovery @@ -831,20 +831,16 @@ impl Stream for PeerManager { } } - // We don't want to update peers during syncing, since this may result in a new chain being - // synced which leads to inefficient re-downloads of blocks. - if !self.network_globals.is_syncing() { - loop { - match self.status_peers.poll_next_unpin(cx) { - Poll::Ready(Some(Ok(peer_id))) => { - self.status_peers.insert(peer_id.clone()); - self.events.push(PeerManagerEvent::Status(peer_id)) - } - Poll::Ready(Some(Err(e))) => { - error!(self.log, "Failed to check for peers to ping"; "error" => e.to_string()) - } - Poll::Ready(None) | Poll::Pending => break, + loop { + match self.status_peers.poll_next_unpin(cx) { + Poll::Ready(Some(Ok(peer_id))) => { + self.status_peers.insert(peer_id.clone()); + self.events.push(PeerManagerEvent::Status(peer_id)) } + Poll::Ready(Some(Err(e))) => { + error!(self.log, "Failed to check for peers to ping"; "error" => e.to_string()) + } + Poll::Ready(None) | Poll::Pending => break, } } diff --git a/beacon_node/eth2_libp2p/src/types/globals.rs b/beacon_node/eth2_libp2p/src/types/globals.rs index e2bf851de2..746bb335b9 100644 --- a/beacon_node/eth2_libp2p/src/types/globals.rs +++ b/beacon_node/eth2_libp2p/src/types/globals.rs @@ -110,25 +110,8 @@ impl NetworkGlobals { /// Updates the syncing state of the node. /// - /// If there is a new state, the old state and the new states are returned. - pub fn update_sync_state(&self) -> Option<(SyncState, SyncState)> { - let mut result = None; - // if we are in a range sync, nothing changes. Range sync will update this. - if !self.is_syncing() { - let new_state = self - .peers - .read() - .synced_peers() - .next() - .map(|_| SyncState::Synced) - .unwrap_or_else(|| SyncState::Stalled); - - let mut peer_state = self.sync_state.write(); - if new_state != *peer_state { - result = Some((peer_state.clone(), new_state.clone())); - } - *peer_state = new_state; - } - result + /// The old state is returned + pub fn set_sync_state(&self, new_state: SyncState) -> SyncState { + std::mem::replace(&mut *self.sync_state.write(), new_state) } } diff --git a/beacon_node/eth2_libp2p/src/types/sync_state.rs b/beacon_node/eth2_libp2p/src/types/sync_state.rs index 94e066cfff..c217ca1ff6 100644 --- a/beacon_node/eth2_libp2p/src/types/sync_state.rs +++ b/beacon_node/eth2_libp2p/src/types/sync_state.rs @@ -1,19 +1,15 @@ use serde::{Deserialize, Serialize}; -use types::{Hash256, Slot}; +use types::Slot; /// The current state of the node. #[derive(Clone, Debug, Serialize, Deserialize)] pub enum SyncState { /// The node is performing a long-range (batch) sync over a finalized chain. /// In this state, parent lookups are disabled. - SyncingFinalized { - start_slot: Slot, - head_slot: Slot, - head_root: Hash256, - }, + SyncingFinalized { start_slot: Slot, target_slot: Slot }, /// The node is performing a long-range (batch) sync over one or many head chains. /// In this state parent lookups are disabled. - SyncingHead { start_slot: Slot, head_slot: Slot }, + SyncingHead { start_slot: Slot, target_slot: Slot }, /// The node is up to date with all known peers and is connected to at least one /// fully synced peer. In this state, parent lookups are enabled. Synced, diff --git a/beacon_node/http_api/src/lib.rs b/beacon_node/http_api/src/lib.rs index 1b52cbd2c6..ee619142ba 100644 --- a/beacon_node/http_api/src/lib.rs +++ b/beacon_node/http_api/src/lib.rs @@ -290,7 +290,7 @@ pub fn serve( .and_then( |network_globals: Arc>, chain: Arc>| async move { match *network_globals.sync_state.read() { - SyncState::SyncingFinalized { head_slot, .. } => { + SyncState::SyncingFinalized { target_slot, .. } => { let current_slot = chain .slot_clock .now_or_genesis() @@ -302,12 +302,12 @@ pub fn serve( let tolerance = SYNC_TOLERANCE_EPOCHS * T::EthSpec::slots_per_epoch(); - if head_slot + tolerance >= current_slot { + if target_slot + tolerance >= current_slot { Ok(()) } else { Err(warp_utils::reject::not_synced(format!( "head slot is {}, current slot is {}", - head_slot, current_slot + target_slot, current_slot ))) } } diff --git a/beacon_node/network/src/sync/manager.rs b/beacon_node/network/src/sync/manager.rs index 4f59c6cff6..b60489e3a5 100644 --- a/beacon_node/network/src/sync/manager.rs +++ b/beacon_node/network/src/sync/manager.rs @@ -35,13 +35,13 @@ use super::network_context::SyncNetworkContext; use super::peer_sync_info::{PeerSyncInfo, PeerSyncType}; -use super::range_sync::{ChainId, RangeSync, EPOCHS_PER_BATCH}; +use super::range_sync::{ChainId, RangeSync, RangeSyncType, EPOCHS_PER_BATCH}; use super::RequestId; use crate::beacon_processor::{ProcessId, WorkEvent as BeaconWorkEvent}; use crate::service::NetworkMessage; use beacon_chain::{BeaconChain, BeaconChainTypes, BlockError}; use eth2_libp2p::rpc::{methods::MAX_REQUEST_BLOCKS, BlocksByRootRequest, GoodbyeReason}; -use eth2_libp2p::types::NetworkGlobals; +use eth2_libp2p::types::{NetworkGlobals, SyncState}; use eth2_libp2p::{PeerAction, PeerId}; use fnv::FnvHashMap; use lru_cache::LRUCache; @@ -176,11 +176,11 @@ pub struct SyncManager { /// The flag allows us to determine if the peer returned data or sent us nothing. single_block_lookups: FnvHashMap, - /// The logger for the import manager. - log: Logger, - /// A multi-threaded, non-blocking processor for applying messages to the beacon chain. beacon_processor_send: mpsc::Sender>, + + /// The logger for the import manager. + log: Logger, } /// Object representing a single block lookup request. @@ -222,7 +222,6 @@ pub fn spawn( let mut sync_manager = SyncManager { range_sync: RangeSync::new( beacon_chain.clone(), - network_globals.clone(), beacon_processor_send.clone(), log.clone(), ), @@ -233,8 +232,8 @@ pub fn spawn( parent_queue: SmallVec::new(), failed_chains: LRUCache::new(500), single_block_lookups: FnvHashMap::default(), - log: log.clone(), beacon_processor_send, + log: log.clone(), }; // spawn the sync manager thread @@ -276,8 +275,6 @@ impl SyncManager { "local_head_slot" => local_peer_info.head_slot, ); self.synced_peer(&peer_id, remote); - // notify the range sync that a peer has been added - self.range_sync.fully_synced_peer_found(&mut self.network); } PeerSyncType::Advanced => { trace!(self.log, "Useful peer for sync found"; @@ -302,8 +299,6 @@ impl SyncManager { // the second case { self.synced_peer(&peer_id, remote); - // notify the range sync that a peer has been added - self.range_sync.fully_synced_peer_found(&mut self.network); } else { // Add the peer to our RangeSync self.range_sync @@ -673,10 +668,41 @@ impl SyncManager { /// Updates the global sync state and logs any changes. fn update_sync_state(&mut self) { - if let Some((old_state, new_state)) = self.network_globals.update_sync_state() { + let new_state: SyncState = match self.range_sync.state() { + Err(e) => { + debug!(self.log, "Error getting range sync state"; "error" => %e); + return; + } + Ok(state) => match state { + None => { + // no range sync, decide if we are stalled or synced + self.network_globals + .peers + .read() + .synced_peers() + .next() + .map(|_| SyncState::Synced) + .unwrap_or_else(|| SyncState::Stalled) + } + Some((RangeSyncType::Finalized, start_slot, target_slot)) => { + SyncState::SyncingFinalized { + start_slot, + target_slot, + } + } + Some((RangeSyncType::Head, start_slot, target_slot)) => SyncState::SyncingHead { + start_slot, + target_slot, + }, + }, + }; + + let old_state = self.network_globals.set_sync_state(new_state); + let new_state = self.network_globals.sync_state.read(); + if !new_state.eq(&old_state) { info!(self.log, "Sync state updated"; "old_state" => %old_state, "new_state" => %new_state); // If we have become synced - Subscribe to all the core subnet topics - if new_state == eth2_libp2p::types::SyncState::Synced { + if new_state.is_synced() { self.network.subscribe_core_topics(); } } diff --git a/beacon_node/network/src/sync/range_sync/batch.rs b/beacon_node/network/src/sync/range_sync/batch.rs index b51b20897c..b9dc04ccd4 100644 --- a/beacon_node/network/src/sync/range_sync/batch.rs +++ b/beacon_node/network/src/sync/range_sync/batch.rs @@ -98,6 +98,14 @@ impl BatchInfo { peers } + /// Verifies if an incomming block belongs to this batch. + pub fn is_expecting_block(&self, peer_id: &PeerId, request_id: &RequestId) -> bool { + if let BatchState::Downloading(expected_peer, _, expected_id) = &self.state { + return peer_id == expected_peer && expected_id == request_id; + } + false + } + pub fn current_peer(&self) -> Option<&PeerId> { match &self.state { BatchState::AwaitingDownload | BatchState::Failed => None, diff --git a/beacon_node/network/src/sync/range_sync/chain.rs b/beacon_node/network/src/sync/range_sync/chain.rs index 864ac6124c..dacb12de1d 100644 --- a/beacon_node/network/src/sync/range_sync/chain.rs +++ b/beacon_node/network/src/sync/range_sync/chain.rs @@ -2,14 +2,13 @@ use super::batch::{BatchInfo, BatchState}; use crate::beacon_processor::ProcessId; use crate::beacon_processor::WorkEvent as BeaconWorkEvent; use crate::sync::{network_context::SyncNetworkContext, BatchProcessResult, RequestId}; -use beacon_chain::{BeaconChain, BeaconChainTypes}; +use beacon_chain::BeaconChainTypes; use eth2_libp2p::{PeerAction, PeerId}; use fnv::FnvHashMap; use rand::seq::SliceRandom; use slog::{crit, debug, o, warn}; use std::collections::{btree_map::Entry, BTreeMap, HashSet}; use std::hash::{Hash, Hasher}; -use std::sync::Arc; use tokio::sync::mpsc::Sender; use types::{Epoch, EthSpec, Hash256, SignedBeaconBlock, Slot}; @@ -87,9 +86,6 @@ pub struct SyncingChain { /// A multi-threaded, non-blocking processor for applying messages to the beacon chain. beacon_processor_send: Sender>, - /// A reference to the underlying beacon chain. - chain: Arc>, - /// The chain's log. log: slog::Logger, } @@ -116,7 +112,6 @@ impl SyncingChain { target_head_root: Hash256, peer_id: PeerId, beacon_processor_send: Sender>, - chain: Arc>, log: &slog::Logger, ) -> Self { let mut peers = FnvHashMap::default(); @@ -138,7 +133,6 @@ impl SyncingChain { state: ChainSyncingState::Stopped, current_processing_batch: None, beacon_processor_send, - chain, log: log.new(o!("chain" => id)), } } @@ -163,17 +157,17 @@ impl SyncingChain { if let Some(batch_ids) = self.peers.remove(peer_id) { // fail the batches for id in batch_ids { - if let BatchState::Failed = self - .batches - .get_mut(&id) - .expect("registered batch exists") - .download_failed(&self.log) - { - return ProcessingResult::RemoveChain; - } - if let ProcessingResult::RemoveChain = self.retry_batch_download(network, id) { - // drop the chain early - return ProcessingResult::RemoveChain; + if let Some(batch) = self.batches.get_mut(&id) { + if let BatchState::Failed = batch.download_failed(&self.log) { + return ProcessingResult::RemoveChain; + } + if let ProcessingResult::RemoveChain = self.retry_batch_download(network, id) { + // drop the chain early + return ProcessingResult::RemoveChain; + } + } else { + debug!(self.log, "Batch not found while removing peer"; + "peer" => %peer_id, "batch" => "id") } } } @@ -215,12 +209,8 @@ impl SyncingChain { // sending an error /timeout) if the peer is removed from the chain for other // reasons. Check that this block belongs to the expected peer, and that the // request_id matches - if let BatchState::Downloading(expected_peer, _, expected_request_id) = - batch.state() - { - if expected_peer != peer_id || expected_request_id != &request_id { - return ProcessingResult::KeepChain; - } + if !batch.is_expecting_block(peer_id, &request_id) { + return ProcessingResult::KeepChain; } batch } @@ -275,7 +265,13 @@ impl SyncingChain { return ProcessingResult::KeepChain; } - let batch = self.batches.get_mut(&batch_id).expect("Batch exists"); + let batch = match self.batches.get_mut(&batch_id) { + Some(batch) => batch, + None => { + debug!(self.log, "Processing unknown batch"; "batch" => %batch_id); + return ProcessingResult::RemoveChain; + } + }; // NOTE: We send empty batches to the processor in order to trigger the block processor // result callback. This is done, because an empty batch could end a chain and the logic @@ -340,10 +336,8 @@ impl SyncingChain { // - Poisoned -> this is an intermediate state that should never be reached // - AwaitingDownload -> A recoverable failed batch should have been // re-requested. - unreachable!( - "Optimistic batch indicates inconsistent chain state: {:?}", - state - ) + crit!(self.log, "Optimistic batch indicates inconsistent chain state"; "state" => ?state); + return ProcessingResult::RemoveChain; } BatchState::AwaitingValidation(_) => { // This is possible due to race conditions, and tho it would be considered @@ -352,7 +346,7 @@ impl SyncingChain { // candidate. If the batch was empty the chain rejects it; if it was non // empty the chain is advanced to this point (so that the old optimistic // batch is now the processing target) - crit!(self.log, "Optimistic batch should never be Awaiting Validation"; "batch" => epoch); + debug!(self.log, "Optimistic batch should never be Awaiting Validation"; "batch" => epoch); None } } @@ -436,10 +430,13 @@ impl SyncingChain { match result { BatchProcessResult::Success(was_non_empty) => { - let batch = self - .batches - .get_mut(&batch_id) - .expect("Chain was expecting a known batch"); + let batch = match self.batches.get_mut(&batch_id) { + Some(batch) => batch, + None => { + debug!(self.log, "Current processing batch not found"; "batch" => batch_id); + return ProcessingResult::RemoveChain; + } + }; let _ = batch.processing_completed(true, &self.log); // If the processed batch was not empty, we can validate previous unvalidated // blocks. @@ -479,13 +476,19 @@ impl SyncingChain { } } BatchProcessResult::Failed(imported_blocks) => { - let batch = self - .batches - .get_mut(&batch_id) - .expect("Chain was expecting a known batch"); - let peer = batch - .current_peer() - .expect("batch is processing blocks from a peer"); + let (batch, peer) = match self.batches.get_mut(&batch_id) { + Some(batch) => match batch.current_peer().cloned() { + Some(peer) => (batch, peer), + None => { + debug!(self.log, "Current processing has no peer"; "batch" => batch_id); + return ProcessingResult::RemoveChain; + } + }, + None => { + debug!(self.log, "Current processing batch not found"; "batch" => batch_id); + return ProcessingResult::RemoveChain; + } + }; debug!(self.log, "Batch processing failed"; "imported_blocks" => imported_blocks, "batch_epoch" => batch_id, "peer" => %peer, "client" => %network.client_type(&peer)); if let BatchState::Failed = batch.processing_completed(false, &self.log) { @@ -610,12 +613,13 @@ impl SyncingChain { active_batches.remove(&id); } } - BatchState::Failed | BatchState::Poisoned | BatchState::AwaitingDownload => { - unreachable!("batch indicates inconsistent chain state while advancing chain") - } + BatchState::Failed | BatchState::Poisoned | BatchState::AwaitingDownload => crit!( + self.log, + "batch indicates inconsistent chain state while advancing chain" + ), BatchState::AwaitingProcessing(..) => {} BatchState::Processing(_) => { - assert_eq!( + debug_assert_eq!( id, self.current_processing_batch.expect( "A batch in a processing state means the chain is processing it" @@ -770,11 +774,6 @@ impl SyncingChain { } } - /// Sends a STATUS message to all peers in the peer pool. - pub fn status_peers(&self, network: &mut SyncNetworkContext) { - network.status_peers(self.chain.clone(), self.peers.keys().cloned()); - } - /// An RPC error has occurred. /// /// If the batch exists it is re-requested. @@ -789,16 +788,13 @@ impl SyncingChain { // A batch could be retried without the peer failing the request (disconnecting/ // sending an error /timeout) if the peer is removed from the chain for other // reasons. Check that this block belongs to the expected peer - if let BatchState::Downloading(expected_peer, _, expected_request_id) = batch.state() { - if expected_peer != peer_id || expected_request_id != &request_id { - return ProcessingResult::KeepChain; - } + if !batch.is_expecting_block(peer_id, &request_id) { + return ProcessingResult::KeepChain; } debug!(self.log, "Batch failed. RPC Error"; "batch_epoch" => batch_id); - self.peers - .get_mut(peer_id) - .expect("Peer belongs to the chain") - .remove(&batch_id); + if let Some(active_requests) = self.peers.get_mut(peer_id) { + active_requests.remove(&batch_id); + } if let BatchState::Failed = batch.download_failed(&self.log) { return ProcessingResult::RemoveChain; } @@ -865,11 +861,14 @@ impl SyncingChain { debug!(self.log, "Requesting batch"; "epoch" => batch_id, &batch); } // register the batch for this peer - self.peers + return self + .peers .get_mut(&peer) - .expect("peer belongs to the peer pool") - .insert(batch_id); - return ProcessingResult::KeepChain; + .map(|requests| { + requests.insert(batch_id); + ProcessingResult::KeepChain + }) + .unwrap_or(ProcessingResult::RemoveChain); } Err(e) => { // NOTE: under normal conditions this shouldn't happen but we handle it anyway @@ -879,8 +878,7 @@ impl SyncingChain { batch.start_downloading_from_peer(peer.clone(), 1, &self.log); // fake request_id is not relevant self.peers .get_mut(&peer) - .expect("peer belongs to the peer pool") - .remove(&batch_id); + .map(|request| request.remove(&batch_id)); if let BatchState::Failed = batch.download_failed(&self.log) { return ProcessingResult::RemoveChain; } else { diff --git a/beacon_node/network/src/sync/range_sync/chain_collection.rs b/beacon_node/network/src/sync/range_sync/chain_collection.rs index fdfa4b8ebd..1b92b5eed6 100644 --- a/beacon_node/network/src/sync/range_sync/chain_collection.rs +++ b/beacon_node/network/src/sync/range_sync/chain_collection.rs @@ -3,16 +3,18 @@ //! Each chain type is stored in it's own map. A variety of helper functions are given along with //! this struct to simplify the logic of the other layers of sync. -use super::chain::{ChainId, ChainSyncingState, ProcessingResult, SyncingChain}; +use super::chain::{ChainId, ProcessingResult, SyncingChain}; use super::sync_type::RangeSyncType; use crate::beacon_processor::WorkEvent as BeaconWorkEvent; use crate::sync::network_context::SyncNetworkContext; use crate::sync::PeerSyncInfo; use beacon_chain::{BeaconChain, BeaconChainTypes}; -use eth2_libp2p::{types::SyncState, NetworkGlobals, PeerId}; +use eth2_libp2p::PeerId; use fnv::FnvHashMap; -use slog::{crit, debug, error, info, trace}; +use slog::{debug, error}; +use smallvec::SmallVec; use std::collections::hash_map::Entry; +use std::collections::HashMap; use std::sync::Arc; use tokio::sync::mpsc; use types::EthSpec; @@ -25,66 +27,17 @@ const PARALLEL_HEAD_CHAINS: usize = 2; #[derive(Clone)] pub enum RangeSyncState { /// A finalized chain is being synced. - Finalized { - /// The start of the finalized chain. - start_slot: Slot, - /// The target head slot of the finalized chain. - head_slot: Slot, - /// The target head root of the finalized chain. - head_root: Hash256, - }, + Finalized(u64), /// There are no finalized chains and we are syncing one more head chains. - Head { - /// The last finalized checkpoint for all head chains. - start_slot: Slot, - /// The largest known slot to sync to. - head_slot: Slot, - }, + Head(SmallVec<[u64; PARALLEL_HEAD_CHAINS]>), /// There are no head or finalized chains and no long range sync is in progress. Idle, } -impl PartialEq for RangeSyncState { - fn eq(&self, other: &Self) -> bool { - match (self, other) { - (RangeSyncState::Finalized { .. }, RangeSyncState::Finalized { .. }) => true, - (RangeSyncState::Head { .. }, RangeSyncState::Head { .. }) => true, - (RangeSyncState::Idle, RangeSyncState::Idle) => true, - _ => false, - } - } -} - -impl Into for RangeSyncState { - fn into(self) -> SyncState { - match self { - RangeSyncState::Finalized { - start_slot, - head_slot, - head_root, - } => SyncState::SyncingFinalized { - start_slot, - head_slot, - head_root, - }, - RangeSyncState::Head { - start_slot, - head_slot, - } => SyncState::SyncingHead { - start_slot, - head_slot, - }, - RangeSyncState::Idle => SyncState::Stalled, // this should never really be used - } - } -} - /// A collection of finalized and head chains currently being processed. pub struct ChainCollection { /// The beacon chain for processing. beacon_chain: Arc>, - /// A reference to the global network parameters. - network_globals: Arc>, /// The set of finalized chains being synced. finalized_chains: FnvHashMap>, /// The set of head chains being synced. @@ -96,14 +49,9 @@ pub struct ChainCollection { } impl ChainCollection { - pub fn new( - beacon_chain: Arc>, - network_globals: Arc>, - log: slog::Logger, - ) -> Self { + pub fn new(beacon_chain: Arc>, log: slog::Logger) -> Self { ChainCollection { beacon_chain, - network_globals, finalized_chains: FnvHashMap::default(), head_chains: FnvHashMap::default(), state: RangeSyncState::Idle, @@ -111,82 +59,55 @@ impl ChainCollection { } } - pub fn state(&self) -> &RangeSyncState { - &self.state - } - - /// Updates the global sync state and logs any changes. - pub fn update_sync_state(&mut self, network: &mut SyncNetworkContext) { - // if there is no range sync occurring, the state is either synced or not based on - // connected peers. - - if self.state == RangeSyncState::Idle { - // there is no range sync, let the state of peers determine the global node sync state - let new_state = self - .network_globals - .peers - .read() - .synced_peers() - .next() - .map(|_| SyncState::Synced) - .unwrap_or_else(|| SyncState::Stalled); - let mut peer_state = self.network_globals.sync_state.write(); - if new_state != *peer_state { - info!(self.log, "Sync state updated"; "old_state" => %peer_state, "new_state" => %new_state); - if new_state == SyncState::Synced { - network.subscribe_core_topics(); + /// Updates the Syncing state of the collection after a chain is removed. + fn on_chain_removed(&mut self, id: &ChainId, was_syncing: bool) { + match self.state { + RangeSyncState::Finalized(ref syncing_id) => { + if syncing_id == id { + // the finalized chain that was syncing was removed + debug_assert!(was_syncing); + let syncing_head_ids: SmallVec<[u64; PARALLEL_HEAD_CHAINS]> = self + .head_chains + .iter() + .filter(|(_id, chain)| chain.is_syncing()) + .map(|(id, _)| *id) + .collect(); + self.state = if syncing_head_ids.is_empty() { + RangeSyncState::Idle + } else { + RangeSyncState::Head(syncing_head_ids) + }; + } else { + debug_assert!(!was_syncing); } - *peer_state = new_state; } - } else { - // The state is based on a range sync state, update it - let mut node_sync_state = self.network_globals.sync_state.write(); - let new_state: SyncState = self.state.clone().into(); - if *node_sync_state != new_state { - // we are updating the state, inform the user - info!(self.log, "Sync state updated"; "old_state" => %node_sync_state, "new_state" => %new_state); + RangeSyncState::Head(ref mut syncing_head_ids) => { + if let Some(index) = syncing_head_ids + .iter() + .enumerate() + .find(|(_, &chain_id)| &chain_id == id) + .map(|(i, _)| i) + { + // a syncing head chain was removed + debug_assert!(was_syncing); + syncing_head_ids.swap_remove(index); + if syncing_head_ids.is_empty() { + self.state = RangeSyncState::Idle; + } + } else { + debug_assert!(!was_syncing); + } } - *node_sync_state = new_state; - } - } - - /// A fully synced peer has joined. - /// - /// We could be awaiting a head sync. If we are in the head syncing state, without any head - /// chains, then update the state to idle. - pub fn fully_synced_peer_found(&mut self, network: &mut SyncNetworkContext) { - if let RangeSyncState::Head { .. } = self.state { - if self.head_chains.is_empty() { - // Update the global network state to either synced or stalled. - self.state = RangeSyncState::Idle; - self.update_sync_state(network); + RangeSyncState::Idle => { + // the removed chain should not be syncing + debug_assert!(!was_syncing) } } } - /// After a finalized chain completes this function is called. It ensures the state is set to - /// `SyncState::Head` indicating we are awaiting new peers to connect before we can consider - /// the state as idle. - pub fn set_head_sync(&mut self) { - if let RangeSyncState::Idle = self.state { - let current_slot = self - .beacon_chain - .head_info() - .map(|info| info.slot) - .unwrap_or_else(|_| Slot::from(0u64)); - - // NOTE: This will modify the /node/syncing API to show current slot for all fields - // while we update peers to look for new potentially HEAD chains. - let temp_head_state = RangeSyncState::Head { - start_slot: current_slot, - head_slot: current_slot, - }; - self.state = temp_head_state; - } - } - /// Calls `func` on every chain of the collection. If the result is /// `ProcessingResult::RemoveChain`, the chain is removed and returned. + /// NOTE: `func` must not change the syncing state of a chain. pub fn call_all(&mut self, mut func: F) -> Vec<(SyncingChain, RangeSyncType)> where F: FnMut(&mut SyncingChain) -> ProcessingResult, @@ -211,7 +132,9 @@ impl ChainCollection { RangeSyncType::Finalized => self.finalized_chains.remove(&id), RangeSyncType::Head => self.head_chains.remove(&id), }; - results.push((chain.expect("Chain exits"), sync_type)); + let chain = chain.expect("Chain exists"); + self.on_chain_removed(&id, chain.is_syncing()); + results.push((chain, sync_type)); } results } @@ -220,6 +143,7 @@ impl ChainCollection { /// /// If the function returns `ProcessingResult::RemoveChain`, the chain is removed and returned. /// If the chain is found, its syncing type is returned, or an error otherwise. + /// NOTE: `func` should not change the sync state of a chain. pub fn call_by_id( &mut self, id: ChainId, @@ -231,14 +155,18 @@ impl ChainCollection { if let Entry::Occupied(mut entry) = self.finalized_chains.entry(id) { // Search in our finalized chains first if let ProcessingResult::RemoveChain = func(entry.get_mut()) { - Ok((Some(entry.remove()), RangeSyncType::Finalized)) + let chain = entry.remove(); + self.on_chain_removed(&id, chain.is_syncing()); + Ok((Some(chain), RangeSyncType::Finalized)) } else { Ok((None, RangeSyncType::Finalized)) } } else if let Entry::Occupied(mut entry) = self.head_chains.entry(id) { // Search in our head chains next if let ProcessingResult::RemoveChain = func(entry.get_mut()) { - Ok((Some(entry.remove()), RangeSyncType::Head)) + let chain = entry.remove(); + self.on_chain_removed(&id, chain.is_syncing()); + Ok((Some(chain), RangeSyncType::Head)) } else { Ok((None, RangeSyncType::Head)) } @@ -253,7 +181,12 @@ impl ChainCollection { /// This removes any out-dated chains, swaps to any higher priority finalized chains and /// updates the state of the collection. This starts head chains syncing if any are required to /// do so. - pub fn update(&mut self, network: &mut SyncNetworkContext) { + pub fn update( + &mut self, + network: &mut SyncNetworkContext, + awaiting_head_peers: &mut HashMap, + beacon_processor_send: &mpsc::Sender>, + ) { let (local_finalized_epoch, local_head_epoch) = match PeerSyncInfo::from_chain(&self.beacon_chain) { None => { @@ -270,14 +203,56 @@ impl ChainCollection { }; // Remove any outdated finalized/head chains - self.purge_outdated_chains(network); + self.purge_outdated_chains(awaiting_head_peers); // Choose the best finalized chain if one needs to be selected. self.update_finalized_chains(network, local_finalized_epoch, local_head_epoch); - if self.finalized_syncing_chain().is_none() { + if !matches!(self.state, RangeSyncState::Finalized(_)) { // Handle head syncing chains if there are no finalized chains left. - self.update_head_chains(network, local_finalized_epoch, local_head_epoch); + self.update_head_chains( + network, + local_finalized_epoch, + local_head_epoch, + awaiting_head_peers, + beacon_processor_send, + ); + } + } + + pub fn state(&self) -> Result, String> { + match self.state { + RangeSyncState::Finalized(ref syncing_id) => { + let chain = self + .finalized_chains + .get(syncing_id) + .ok_or(format!("Finalized syncing chain not found: {}", syncing_id))?; + Ok(Some(( + RangeSyncType::Finalized, + chain.start_epoch.start_slot(T::EthSpec::slots_per_epoch()), + chain.target_head_slot, + ))) + } + RangeSyncState::Head(ref syncing_head_ids) => { + let mut range: Option<(Slot, Slot)> = None; + for id in syncing_head_ids { + let chain = self + .head_chains + .get(id) + .ok_or(format!("Head syncing chain not found: {}", id))?; + range = range.map(|(min_start, max_slot)| { + ( + min_start + .min(chain.start_epoch.start_slot(T::EthSpec::slots_per_epoch())), + max_slot.max(chain.target_head_slot), + ) + }); + } + let (start_slot, target_slot) = + range.ok_or_else(|| "Syncing head with empty head ids".to_string())?; + Ok(Some((RangeSyncType::Head, start_slot, target_slot))) + } + RangeSyncState::Idle => Ok(None), } } @@ -290,26 +265,32 @@ impl ChainCollection { local_head_epoch: Epoch, ) { // Find the chain with most peers and check if it is already syncing - if let Some((new_id, peers)) = self + if let Some((mut new_id, peers)) = self .finalized_chains .iter() .max_by_key(|(_, chain)| chain.available_peers()) .map(|(id, chain)| (*id, chain.available_peers())) { - let old_id = self.finalized_syncing_chain().map( - |(currently_syncing_id, currently_syncing_chain)| { - if *currently_syncing_id != new_id - && peers > currently_syncing_chain.available_peers() - { - currently_syncing_chain.stop_syncing(); - // we stop this chain and start syncing the one with more peers - Some(*currently_syncing_id) - } else { - // the best chain is already the syncing chain, advance it if possible - None + let mut old_id = None; + if let RangeSyncState::Finalized(syncing_id) = self.state { + if syncing_id == new_id { + // best chain is already syncing + old_id = Some(None); + } else { + // chains are different, check that they don't have the same number of peers + if let Some(syncing_chain) = self.finalized_chains.get_mut(&syncing_id) { + if syncing_chain.available_peers() > peers { + syncing_chain.stop_syncing(); + old_id = Some(Some(syncing_id)); + } else { + // chains have the same number of peers, pick the currently syncing + // chain to avoid unnecesary switchings and try to advance it + new_id = syncing_id; + old_id = Some(None); + } } - }, - ); + } + } let chain = self .finalized_chains @@ -318,18 +299,15 @@ impl ChainCollection { match old_id { Some(Some(old_id)) => debug!(self.log, "Switching finalized chains"; - "old_id" => old_id, &chain), + "old_id" => old_id, &chain), None => debug!(self.log, "Syncing new chain"; &chain), - Some(None) => trace!(self.log, "Advancing currently syncing chain"), - // this is the same chain. We try to advance it. + Some(None) => { + // this is the same chain. We try to advance it. + } } + // update the state to a new finalized state - let state = RangeSyncState::Finalized { - start_slot: chain.start_epoch.start_slot(T::EthSpec::slots_per_epoch()), - head_slot: chain.target_head_slot, - head_root: chain.target_head_root, - }; - self.state = state; + self.state = RangeSyncState::Finalized(new_id); if let ProcessingResult::RemoveChain = chain.start_syncing(network, local_epoch, local_head_epoch) @@ -337,6 +315,7 @@ impl ChainCollection { // this happens only if sending a batch over the `network` fails a lot error!(self.log, "Chain removed while switching chains"); self.finalized_chains.remove(&new_id); + self.on_chain_removed(&new_id, true); } } } @@ -347,23 +326,47 @@ impl ChainCollection { network: &mut SyncNetworkContext, local_epoch: Epoch, local_head_epoch: Epoch, + awaiting_head_peers: &mut HashMap, + beacon_processor_send: &mpsc::Sender>, ) { - // There are no finalized chains, update the state. + // Include the awaiting head peers + for (peer_id, peer_sync_info) in awaiting_head_peers.drain() { + self.add_peer_or_create_chain( + local_epoch, + peer_sync_info.head_root, + peer_sync_info.head_slot, + peer_id, + RangeSyncType::Head, + beacon_processor_send, + network, + ); + } + if self.head_chains.is_empty() { + // There are no finalized chains, update the state. self.state = RangeSyncState::Idle; return; } - let mut currently_syncing = self - .head_chains - .values() - .filter(|chain| chain.is_syncing()) - .count(); - let mut not_syncing = self.head_chains.len() - currently_syncing; + // NOTE: if switching from Head Syncing to Finalized Syncing, the head chains are allowed + // to continue, so we check for such chains first, and allow them to continue. + let mut syncing_chains = SmallVec::<[u64; PARALLEL_HEAD_CHAINS]>::new(); + for (id, chain) in self.head_chains.iter_mut() { + if chain.is_syncing() { + if syncing_chains.len() < PARALLEL_HEAD_CHAINS { + syncing_chains.push(*id); + } else { + chain.stop_syncing(); + debug!(self.log, "Stopping extra head chain"; "chain" => id); + } + } + } + + let mut not_syncing = self.head_chains.len() - syncing_chains.len(); // Find all head chains that are not currently syncing ordered by peer count. - while currently_syncing <= PARALLEL_HEAD_CHAINS && not_syncing > 0 { + while syncing_chains.len() < PARALLEL_HEAD_CHAINS && not_syncing > 0 { // Find the chain with the most peers and start syncing - if let Some((_id, chain)) = self + if let Some((id, chain)) = self .head_chains .iter_mut() .filter(|(_id, chain)| !chain.is_syncing()) @@ -374,53 +377,21 @@ impl ChainCollection { if let ProcessingResult::RemoveChain = chain.start_syncing(network, local_epoch, local_head_epoch) { - error!(self.log, "Chain removed while switching head chains") + let id = *id; + self.head_chains.remove(&id); + error!(self.log, "Chain removed while switching head chains"; "id" => id); + } else { + syncing_chains.push(*id); } } // update variables - currently_syncing = self - .head_chains - .iter() - .filter(|(_id, chain)| chain.is_syncing()) - .count(); - not_syncing = self.head_chains.len() - currently_syncing; + not_syncing = self.head_chains.len() - syncing_chains.len(); } - // Start - // for the syncing API, we find the minimal start_slot and the maximum - // target_slot of all head chains to report back. - let (min_epoch, max_slot) = self - .head_chains - .values() - .filter(|chain| chain.is_syncing()) - .fold( - (Epoch::from(0u64), Slot::from(0u64)), - |(min, max), chain| { - ( - std::cmp::min(min, chain.start_epoch), - std::cmp::max(max, chain.target_head_slot), - ) - }, - ); - let head_state = RangeSyncState::Head { - start_slot: min_epoch.start_slot(T::EthSpec::slots_per_epoch()), - head_slot: max_slot, + self.state = if syncing_chains.is_empty() { + RangeSyncState::Idle + } else { + RangeSyncState::Head(syncing_chains) }; - self.state = head_state; - } - - /// This is called once a head chain has completed syncing. It removes all non-syncing head - /// chains and re-status their peers. - pub fn clear_head_chains(&mut self, network: &mut SyncNetworkContext) { - let log_ref = &self.log; - self.head_chains.retain(|_id, chain| { - if !chain.is_syncing() { - debug!(log_ref, "Removing old head chain"; &chain); - chain.status_peers(network); - false - } else { - true - } - }); } /// Returns if `true` if any finalized chains exist, `false` otherwise. @@ -430,14 +401,11 @@ impl ChainCollection { /// Removes any outdated finalized or head chains. /// This removes chains with no peers, or chains whose start block slot is less than our current - /// finalized block slot. - pub fn purge_outdated_chains(&mut self, network: &mut SyncNetworkContext) { - // Remove any chains that have no peers - self.finalized_chains - .retain(|_id, chain| chain.available_peers() > 0); - self.head_chains - .retain(|_id, chain| chain.available_peers() > 0); - + /// finalized block slot. Peers that would create outdated chains are removed too. + pub fn purge_outdated_chains( + &mut self, + awaiting_head_peers: &mut HashMap, + ) { let local_info = match PeerSyncInfo::from_chain(&self.beacon_chain) { Some(local) => local, None => { @@ -455,39 +423,50 @@ impl ChainCollection { let beacon_chain = &self.beacon_chain; let log_ref = &self.log; - // Remove chains that are out-dated and re-status their peers - self.finalized_chains.retain(|_id, chain| { - if chain.target_head_slot <= local_finalized_slot - || beacon_chain - .fork_choice - .read() - .contains_block(&chain.target_head_root) + + let is_outdated = |target_slot: &Slot, target_root: &Hash256| { + target_slot <= &local_finalized_slot + || beacon_chain.fork_choice.read().contains_block(target_root) + }; + + // Retain only head peers that remain relevant + awaiting_head_peers.retain(|_peer_id, peer_sync_info| { + !is_outdated(&peer_sync_info.head_slot, &peer_sync_info.head_root) + }); + + // Remove chains that are out-dated + let mut removed_chains = Vec::new(); + self.finalized_chains.retain(|id, chain| { + if is_outdated(&chain.target_head_slot, &chain.target_head_root) + || chain.available_peers() == 0 { debug!(log_ref, "Purging out of finalized chain"; &chain); - chain.status_peers(network); + removed_chains.push((*id, chain.is_syncing())); false } else { true } }); - self.head_chains.retain(|_id, chain| { - if chain.target_head_slot <= local_finalized_slot - || beacon_chain - .fork_choice - .read() - .contains_block(&chain.target_head_root) + self.head_chains.retain(|id, chain| { + if is_outdated(&chain.target_head_slot, &chain.target_head_root) + || chain.available_peers() == 0 { debug!(log_ref, "Purging out of date head chain"; &chain); - chain.status_peers(network); + removed_chains.push((*id, chain.is_syncing())); false } else { true } }); + + // update the state of the collection + for (id, was_syncing) in removed_chains { + self.on_chain_removed(&id, was_syncing); + } } /// Adds a peer to a chain with the given target, or creates a new syncing chain if it doesn't - /// exits. + /// exists. #[allow(clippy::too_many_arguments)] pub fn add_peer_or_create_chain( &mut self, @@ -501,27 +480,20 @@ impl ChainCollection { ) { let id = SyncingChain::::id(&target_head_root, &target_head_slot); let collection = if let RangeSyncType::Finalized = sync_type { - if let Some(chain) = self.head_chains.get(&id) { - // sanity verification for chain duplication / purging issues - crit!(self.log, "Adding known head chain as finalized chain"; chain); - } &mut self.finalized_chains } else { - if let Some(chain) = self.finalized_chains.get(&id) { - // sanity verification for chain duplication / purging issues - crit!(self.log, "Adding known finalized chain as head chain"; chain); - } &mut self.head_chains }; match collection.entry(id) { Entry::Occupied(mut entry) => { let chain = entry.get_mut(); debug!(self.log, "Adding peer to known chain"; "peer_id" => %peer, "sync_type" => ?sync_type, &chain); - assert_eq!(chain.target_head_root, target_head_root); - assert_eq!(chain.target_head_slot, target_head_slot); + debug_assert_eq!(chain.target_head_root, target_head_root); + debug_assert_eq!(chain.target_head_slot, target_head_slot); if let ProcessingResult::RemoveChain = chain.add_peer(network, peer) { debug!(self.log, "Chain removed after adding peer"; "chain" => id); - entry.remove(); + let chain = entry.remove(); + self.on_chain_removed(&id, chain.is_syncing()); } } Entry::Vacant(entry) => { @@ -532,25 +504,12 @@ impl ChainCollection { target_head_root, peer, beacon_processor_send.clone(), - self.beacon_chain.clone(), &self.log, ); - assert_eq!(new_chain.get_id(), id); + debug_assert_eq!(new_chain.get_id(), id); debug!(self.log, "New chain added to sync"; "peer_id" => peer_rpr, "sync_type" => ?sync_type, &new_chain); entry.insert(new_chain); } } } - - /// Returns the index of finalized chain that is currently syncing. Returns `None` if no - /// finalized chain is currently syncing. - fn finalized_syncing_chain(&mut self) -> Option<(&ChainId, &mut SyncingChain)> { - self.finalized_chains.iter_mut().find_map(|(id, chain)| { - if chain.state == ChainSyncingState::Syncing { - Some((id, chain)) - } else { - None - } - }) - } } diff --git a/beacon_node/network/src/sync/range_sync/mod.rs b/beacon_node/network/src/sync/range_sync/mod.rs index 07939569a3..7bebd417c5 100644 --- a/beacon_node/network/src/sync/range_sync/mod.rs +++ b/beacon_node/network/src/sync/range_sync/mod.rs @@ -10,3 +10,4 @@ mod sync_type; pub use batch::BatchInfo; pub use chain::{BatchId, ChainId, EPOCHS_PER_BATCH}; pub use range::RangeSync; +pub use sync_type::RangeSyncType; diff --git a/beacon_node/network/src/sync/range_sync/range.rs b/beacon_node/network/src/sync/range_sync/range.rs index c1ae653a7a..824a38daa8 100644 --- a/beacon_node/network/src/sync/range_sync/range.rs +++ b/beacon_node/network/src/sync/range_sync/range.rs @@ -40,7 +40,7 @@ //! and further batches are requested as current blocks are being processed. use super::chain::ChainId; -use super::chain_collection::{ChainCollection, RangeSyncState}; +use super::chain_collection::ChainCollection; use super::sync_type::RangeSyncType; use crate::beacon_processor::WorkEvent as BeaconWorkEvent; use crate::sync::network_context::SyncNetworkContext; @@ -48,12 +48,12 @@ use crate::sync::BatchProcessResult; use crate::sync::PeerSyncInfo; use crate::sync::RequestId; use beacon_chain::{BeaconChain, BeaconChainTypes}; -use eth2_libp2p::{NetworkGlobals, PeerId}; -use slog::{debug, error, trace, warn}; -use std::collections::HashSet; +use eth2_libp2p::PeerId; +use slog::{debug, error, trace}; +use std::collections::HashMap; use std::sync::Arc; use tokio::sync::mpsc; -use types::{Epoch, EthSpec, SignedBeaconBlock}; +use types::{Epoch, EthSpec, SignedBeaconBlock, Slot}; /// The primary object dealing with long range/batch syncing. This contains all the active and /// non-active chains that need to be processed before the syncing is considered complete. This @@ -61,13 +61,12 @@ use types::{Epoch, EthSpec, SignedBeaconBlock}; pub struct RangeSync { /// The beacon chain for processing. beacon_chain: Arc>, + /// Last known sync info of our useful connected peers. We use this information to create Head + /// chains after all finalized chains have ended. + awaiting_head_peers: HashMap, /// A collection of chains that need to be downloaded. This stores any head or finalized chains /// that need to be downloaded. chains: ChainCollection, - /// Peers that join whilst a finalized chain is being downloaded, sit in this set. Once the - /// finalized chain(s) complete, these peer's get STATUS'ed to update their head slot before - /// the head chains are formed and downloaded. - awaiting_head_peers: HashSet, /// A multi-threaded, non-blocking processor for applying messages to the beacon chain. beacon_processor_send: mpsc::Sender>, /// The syncing logger. @@ -77,29 +76,20 @@ pub struct RangeSync { impl RangeSync { pub fn new( beacon_chain: Arc>, - network_globals: Arc>, beacon_processor_send: mpsc::Sender>, log: slog::Logger, ) -> Self { RangeSync { beacon_chain: beacon_chain.clone(), - chains: ChainCollection::new(beacon_chain, network_globals, log.clone()), - awaiting_head_peers: HashSet::new(), + chains: ChainCollection::new(beacon_chain, log.clone()), + awaiting_head_peers: HashMap::new(), beacon_processor_send, log, } } - /// The `chains` collection stores the current state of syncing. Once a finalized chain - /// completes, it's state is pre-emptively set to `SyncState::Head`. This ensures that - /// during the transition period of finalized to head, the sync manager doesn't start - /// requesting blocks from gossipsub. - /// - /// On re-status, a peer that has no head to download indicates that this state can be set to - /// idle as there are in fact no head chains to download. This function notifies the chain - /// collection that the state can safely be set to idle. - pub fn fully_synced_peer_found(&mut self, network: &mut SyncNetworkContext) { - self.chains.fully_synced_peer_found(network) + pub fn state(&self) -> Result, String> { + self.chains.state() } /// A useful peer has been added. The SyncManager has identified this peer as needing either @@ -133,16 +123,11 @@ impl RangeSync { // NOTE: A peer that has been re-status'd may now exist in multiple finalized chains. - // remove any out-of-date chains - self.chains.purge_outdated_chains(network); - // determine which kind of sync to perform and set up the chains match RangeSyncType::new(&self.beacon_chain, &local_info, &remote_info) { RangeSyncType::Finalized => { // Finalized chain search debug!(self.log, "Finalization sync peer joined"; "peer_id" => %peer_id); - - // remove the peer from the awaiting_head_peers list if it exists self.awaiting_head_peers.remove(&peer_id); // Note: We keep current head chains. These can continue syncing whilst we complete @@ -158,24 +143,27 @@ impl RangeSync { network, ); - self.chains.update(network); - // update the global sync state - self.chains.update_sync_state(network); + self.chains.update( + network, + &mut self.awaiting_head_peers, + &self.beacon_processor_send, + ); } RangeSyncType::Head => { // This peer requires a head chain sync if self.chains.is_finalizing_sync() { // If there are finalized chains to sync, finish these first, before syncing head - // chains. This allows us to re-sync all known peers - trace!(self.log, "Waiting for finalized sync to complete"; "peer_id" => %peer_id); - // store the peer to re-status after all finalized chains complete - self.awaiting_head_peers.insert(peer_id); + // chains. + trace!(self.log, "Waiting for finalized sync to complete"; + "peer_id" => %peer_id, "awaiting_head_peers" => &self.awaiting_head_peers.len()); + self.awaiting_head_peers.insert(peer_id, remote_info); return; } // if the peer existed in any other head chain, remove it. self.remove_peer(network, &peer_id); + self.awaiting_head_peers.remove(&peer_id); // The new peer has the same finalized (earlier filters should prevent a peer with an // earlier finalized chain from reaching here). @@ -191,8 +179,11 @@ impl RangeSync { &self.beacon_processor_send, network, ); - self.chains.update(network); - self.chains.update_sync_state(network); + self.chains.update( + network, + &mut self.awaiting_head_peers, + &self.beacon_processor_send, + ); } } } @@ -217,13 +208,14 @@ impl RangeSync { chain.on_block_response(network, batch_id, &peer_id, request_id, beacon_block) }) { Ok((removed_chain, sync_type)) => { - if let Some(removed_chain) = removed_chain { + if let Some(_removed_chain) = removed_chain { debug!(self.log, "Chain removed after block response"; "sync_type" => ?sync_type, "chain_id" => chain_id); - removed_chain.status_peers(network); // update the state of the collection - self.chains.update(network); - // update the global state and inform the user - self.chains.update_sync_state(network); + self.chains.update( + network, + &mut self.awaiting_head_peers, + &self.beacon_processor_send, + ); } } Err(_) => { @@ -249,42 +241,13 @@ impl RangeSync { Ok((None, _sync_type)) => { // Chain was found and not removed } - Ok((Some(removed_chain), sync_type)) => { + Ok((Some(_removed_chain), sync_type)) => { debug!(self.log, "Chain removed after processing result"; "chain" => chain_id, "sync_type" => ?sync_type); - // Chain ended, re-status its peers - removed_chain.status_peers(network); - match sync_type { - RangeSyncType::Finalized => { - // update the state of the collection - self.chains.update(network); - // set the state to a head sync if there are no finalized chains, to inform - // the manager that we are awaiting a head chain. - self.chains.set_head_sync(); - // Update the global variables - self.chains.update_sync_state(network); - // if there are no more finalized chains, re-status all known peers - // awaiting a head sync - match self.chains.state() { - RangeSyncState::Idle | RangeSyncState::Head { .. } => { - network.status_peers( - self.beacon_chain.clone(), - self.awaiting_head_peers.drain(), - ); - } - RangeSyncState::Finalized { .. } => {} // Have more finalized chains to complete - } - } - RangeSyncType::Head => { - // Remove non-syncing head chains and re-status the peers. This removes a - // build-up of potentially duplicate head chains. Any legitimate head - // chains will be re-established - self.chains.clear_head_chains(network); - // update the state of the collection - self.chains.update(network); - // update the global state and log any change - self.chains.update_sync_state(network); - } - } + self.chains.update( + network, + &mut self.awaiting_head_peers, + &self.beacon_processor_send, + ); } Err(_) => { @@ -305,17 +268,12 @@ impl RangeSync { // remove the peer from any peer pool, failing its batches self.remove_peer(network, peer_id); - - // update the state of the collection - self.chains.update(network); - // update the global state and inform the user - self.chains.update_sync_state(network); } /// When a peer gets removed, both the head and finalized chains need to be searched to check /// which pool the peer is in. The chain may also have a batch or batches awaiting /// for this peer. If so we mark the batch as failed. The batch may then hit it's maximum - /// retries. In this case, we need to remove the chain and re-status all the peers. + /// retries. In this case, we need to remove the chain. fn remove_peer(&mut self, network: &mut SyncNetworkContext, peer_id: &PeerId) { for (removed_chain, sync_type) in self .chains @@ -323,10 +281,12 @@ impl RangeSync { { debug!(self.log, "Chain removed after removing peer"; "sync_type" => ?sync_type, "chain" => removed_chain.get_id()); // update the state of the collection - self.chains.update(network); - // update the global state and inform the user - self.chains.update_sync_state(network); } + self.chains.update( + network, + &mut self.awaiting_head_peers, + &self.beacon_processor_send, + ); } /// An RPC error has occurred. @@ -348,11 +308,12 @@ impl RangeSync { Ok((removed_chain, sync_type)) => { if let Some(removed_chain) = removed_chain { debug!(self.log, "Chain removed on rpc error"; "sync_type" => ?sync_type, "chain" => removed_chain.get_id()); - removed_chain.status_peers(network); // update the state of the collection - self.chains.update(network); - // update the global state and inform the user - self.chains.update_sync_state(network); + self.chains.update( + network, + &mut self.awaiting_head_peers, + &self.beacon_processor_send, + ); } } Err(_) => { @@ -360,7 +321,7 @@ impl RangeSync { } } } else { - warn!(self.log, "Response/Error for non registered request"; "request_id" => request_id) + debug!(self.log, "Response/Error for non registered request"; "request_id" => request_id) } } }