diff --git a/beacon_node/eth2-libp2p/src/peer_manager/peer_sync_status.rs b/beacon_node/eth2-libp2p/src/peer_manager/peer_sync_status.rs index 0ce4f6ce9c..9d0e6c1cfa 100644 --- a/beacon_node/eth2-libp2p/src/peer_manager/peer_sync_status.rs +++ b/beacon_node/eth2-libp2p/src/peer_manager/peer_sync_status.rs @@ -57,7 +57,7 @@ impl PeerSyncStatus { let new_state = PeerSyncStatus::Synced { info }; match self { - PeerSyncStatus::Synced { .. } => { + PeerSyncStatus::Synced { .. } | PeerSyncStatus::Unknown => { *self = new_state; false // state was not updated } @@ -70,11 +70,11 @@ impl PeerSyncStatus { /// Updates the sync state given a peer that is further ahead in the chain than us. /// Returns true if the state has changed. - pub fn update_ahead(&mut self, info: SyncInfo) -> bool { + pub fn update_advanced(&mut self, info: SyncInfo) -> bool { let new_state = PeerSyncStatus::Advanced { info }; match self { - PeerSyncStatus::Advanced { .. } => { + PeerSyncStatus::Advanced { .. } | PeerSyncStatus::Unknown => { *self = new_state; false // state was not updated } @@ -91,7 +91,7 @@ impl PeerSyncStatus { let new_state = PeerSyncStatus::Behind { info }; match self { - PeerSyncStatus::Behind { .. } => { + PeerSyncStatus::Behind { .. } | PeerSyncStatus::Unknown => { *self = new_state; false // state was not updated } diff --git a/beacon_node/network/src/router/processor.rs b/beacon_node/network/src/router/processor.rs index d3c3d98cc9..d309ee9bb1 100644 --- a/beacon_node/network/src/router/processor.rs +++ b/beacon_node/network/src/router/processor.rs @@ -145,7 +145,7 @@ impl Processor { /// Process a `Status` response from a peer. pub fn on_status_response(&mut self, peer_id: PeerId, status: StatusMessage) { - trace!( + debug!( self.log, "Received Status Response"; "peer" => format!("{:?}", peer_id), diff --git a/beacon_node/network/src/sync/manager.rs b/beacon_node/network/src/sync/manager.rs index c67ef22c40..ab603df3ac 100644 --- a/beacon_node/network/src/sync/manager.rs +++ b/beacon_node/network/src/sync/manager.rs @@ -268,8 +268,8 @@ impl SyncManager { "peer" => format!("{:?}", peer_id), "peer_head_slot" => remote.head_slot, "local_head_slot" => local_peer_info.head_slot, - "remote_finalized_epoch" => local_peer_info.finalized_epoch, - "local_finalized_epoch" => remote.finalized_epoch, + "peer_finalized_epoch" => remote.finalized_epoch, + "local_finalized_epoch" => local_peer_info.finalized_epoch, ); // Add the peer to our RangeSync self.range_sync @@ -528,11 +528,14 @@ impl SyncManager { self.update_sync_state(); } + // TODO: Group these functions into one. /// Updates the syncing state of a peer to be synced. fn synced_peer(&mut self, peer_id: &PeerId, sync_info: PeerSyncInfo) { if let Some(peer_info) = self.network_globals.peers.write().peer_info_mut(peer_id) { + let head_slot = sync_info.head_slot; + let finalized_epoch = sync_info.finalized_epoch; if peer_info.sync_status.update_synced(sync_info.into()) { - debug!(self.log, "Peer transitioned to synced status"; "peer_id" => format!("{}", peer_id)); + debug!(self.log, "Peer transitioned sync state"; "new_state" => "synced", "peer_id" => format!("{}", peer_id), "head_slot" => head_slot, "finalized_epoch" => finalized_epoch); } } else { crit!(self.log, "Status'd peer is unknown"; "peer_id" => format!("{}", peer_id)); @@ -543,9 +546,10 @@ impl SyncManager { /// Updates the syncing state of a peer to be behind. fn advanced_peer(&mut self, peer_id: &PeerId, sync_info: PeerSyncInfo) { if let Some(peer_info) = self.network_globals.peers.write().peer_info_mut(peer_id) { - let advanced_slot = sync_info.head_slot; - if peer_info.sync_status.update_ahead(sync_info.into()) { - debug!(self.log, "Peer transitioned to from synced state to ahead"; "peer_id" => format!("{}", peer_id), "head_slot" => advanced_slot); + let head_slot = sync_info.head_slot; + let finalized_epoch = sync_info.finalized_epoch; + if peer_info.sync_status.update_advanced(sync_info.into()) { + debug!(self.log, "Peer transitioned sync state"; "new_state" => "advanced", "peer_id" => format!("{}", peer_id), "head_slot" => head_slot, "finalized_epoch" => finalized_epoch); } } else { crit!(self.log, "Status'd peer is unknown"; "peer_id" => format!("{}", peer_id)); @@ -556,9 +560,10 @@ impl SyncManager { /// Updates the syncing state of a peer to be behind. fn behind_peer(&mut self, peer_id: &PeerId, sync_info: PeerSyncInfo) { if let Some(peer_info) = self.network_globals.peers.write().peer_info_mut(peer_id) { - let behind_slot = sync_info.head_slot; + let head_slot = sync_info.head_slot; + let finalized_epoch = sync_info.finalized_epoch; if peer_info.sync_status.update_behind(sync_info.into()) { - debug!(self.log, "Peer transitioned to from synced state to behind"; "peer_id" => format!("{}", peer_id), "head_slot" => behind_slot); + debug!(self.log, "Peer transitioned sync state"; "new_state" => "behind", "peer_id" => format!("{}", peer_id), "head_slot" => head_slot, "finalized_epoch" => finalized_epoch); } } else { crit!(self.log, "Status'd peer is unknown"; "peer_id" => format!("{}", peer_id)); diff --git a/beacon_node/network/src/sync/peer_sync_info.rs b/beacon_node/network/src/sync/peer_sync_info.rs index 724aa567dd..f03d1a1dfb 100644 --- a/beacon_node/network/src/sync/peer_sync_info.rs +++ b/beacon_node/network/src/sync/peer_sync_info.rs @@ -7,7 +7,6 @@ use std::ops::Sub; use std::sync::Arc; use types::{Epoch, Hash256, Slot}; -/// /// Keeps track of syncing information for known connected peers. #[derive(Clone, Copy, Debug)] pub struct PeerSyncInfo { @@ -66,7 +65,7 @@ impl PeerSyncInfo { PeerSyncType::FullySynced } // if not, check if the peer is ahead of our chain - else if self.is_ahead_peer(remote_peer_sync_info) { + else if self.is_advanced_peer(remote_peer_sync_info) { PeerSyncType::Advanced } else { // the peer must be behind and not useful @@ -102,7 +101,7 @@ impl PeerSyncInfo { /// 1) The peer could have a head slot that is greater /// than SLOT_IMPORT_TOLERANCE of our current head. /// 2) The peer has a greater finalized slot/epoch than our own. - fn is_ahead_peer(&self, remote: &PeerSyncInfo) -> bool { + fn is_advanced_peer(&self, remote: &PeerSyncInfo) -> bool { if remote.head_slot.sub(self.head_slot).as_usize() > SLOT_IMPORT_TOLERANCE || self.finalized_epoch < remote.finalized_epoch { diff --git a/beacon_node/network/src/sync/range_sync/chain.rs b/beacon_node/network/src/sync/range_sync/chain.rs index 2f19e3673a..6ea2dd5968 100644 --- a/beacon_node/network/src/sync/range_sync/chain.rs +++ b/beacon_node/network/src/sync/range_sync/chain.rs @@ -634,10 +634,7 @@ impl SyncingChain { ) -> ProcessingResult { batch.retries += 1; - // TODO: Handle partially downloaded batches. Update this when building a new batch - // processor thread. - - if batch.retries > MAX_BATCH_RETRIES { + if batch.retries > MAX_BATCH_RETRIES || self.peer_pool.is_empty() { // chain is unrecoverable, remove it ProcessingResult::RemoveChain } else { diff --git a/beacon_node/network/src/sync/range_sync/mod.rs b/beacon_node/network/src/sync/range_sync/mod.rs index 069fe712bf..77eb17f156 100644 --- a/beacon_node/network/src/sync/range_sync/mod.rs +++ b/beacon_node/network/src/sync/range_sync/mod.rs @@ -5,6 +5,7 @@ mod batch; mod chain; mod chain_collection; mod range; +mod sync_type; pub use batch::Batch; pub use batch::BatchId; diff --git a/beacon_node/network/src/sync/range_sync/range.rs b/beacon_node/network/src/sync/range_sync/range.rs index 70efa4dce9..21ffea34dd 100644 --- a/beacon_node/network/src/sync/range_sync/range.rs +++ b/beacon_node/network/src/sync/range_sync/range.rs @@ -41,6 +41,7 @@ use super::chain::{ChainId, ProcessingResult}; use super::chain_collection::{ChainCollection, RangeSyncState}; +use super::sync_type::RangeSyncType; use super::BatchId; use crate::sync::block_processor::BatchProcessResult; use crate::sync::manager::SyncMessage; @@ -112,7 +113,7 @@ impl RangeSync { &mut self, network: &mut SyncNetworkContext, peer_id: PeerId, - remote: PeerSyncInfo, + remote_info: PeerSyncInfo, ) { // evaluate which chain to sync from @@ -131,93 +132,106 @@ impl RangeSync { }; // convenience variables - let remote_finalized_slot = remote + let remote_finalized_slot = remote_info .finalized_epoch .start_slot(T::EthSpec::slots_per_epoch()); let local_finalized_slot = local_info .finalized_epoch .start_slot(T::EthSpec::slots_per_epoch()); - // remove peer from any chains - self.remove_peer(network, &peer_id); + // NOTE: A peer that has been re-status'd may now exist in multiple finalized chains. // remove any out-of-date chains self.chains.purge_outdated_chains(network); - if remote_finalized_slot > local_info.head_slot - && !self - .beacon_chain - .fork_choice - .contains_block(&remote.finalized_root) - { - debug!(self.log, "Finalization sync peer joined"; "peer_id" => format!("{:?}", peer_id)); - // Finalized chain search + // determine which kind of sync to perform and set up the chains + match RangeSyncType::new(&self.beacon_chain, &local_info, &remote_info) { + RangeSyncType::Finalized => { + // Finalized chain search + debug!(self.log, "Finalization sync peer joined"; "peer_id" => format!("{:?}", peer_id)); - // Note: We keep current head chains. These can continue syncing whilst we complete - // this new finalized chain. + // remove the peer from the awaiting_head_peers list if it exists + self.awaiting_head_peers.remove(&peer_id); - // If a finalized chain already exists that matches, add this peer to the chain's peer - // pool. - if let Some(chain) = self - .chains - .get_finalized_mut(remote.finalized_root, remote_finalized_slot) - { - debug!(self.log, "Finalized chain exists, adding peer"; "peer_id" => format!("{:?}", peer_id), "target_root" => format!("{}", chain.target_head_root), "end_slot" => chain.target_head_slot, "start_slot"=> chain.start_slot); + // Note: We keep current head chains. These can continue syncing whilst we complete + // this new finalized chain. - // add the peer to the chain's peer pool - chain.add_peer(network, peer_id); + // If a finalized chain already exists that matches, add this peer to the chain's peer + // pool. + if let Some(chain) = self + .chains + .get_finalized_mut(remote_info.finalized_root, remote_finalized_slot) + { + debug!(self.log, "Finalized chain exists, adding peer"; "peer_id" => format!("{:?}", peer_id), "target_root" => format!("{}", chain.target_head_root), "end_slot" => chain.target_head_slot, "start_slot"=> chain.start_slot); - // check if the new peer's addition will favour a new syncing chain. - self.chains.update_finalized(network); - self.chains.update_sync_state(); - } else { - // there is no finalized chain that matches this peer's last finalized target - // create a new finalized chain - debug!(self.log, "New finalized chain added to sync"; "peer_id" => format!("{:?}", peer_id), "start_slot" => local_finalized_slot.as_u64(), "end_slot" => remote_finalized_slot.as_u64(), "finalized_root" => format!("{}", remote.finalized_root)); + // add the peer to the chain's peer pool + chain.add_peer(network, peer_id); - self.chains.new_finalized_chain( - local_finalized_slot, - remote.finalized_root, - remote_finalized_slot, - peer_id, - self.sync_send.clone(), - ); + // check if the new peer's addition will favour a new syncing chain. + self.chains.update_finalized(network); + // update the global sync state if necessary + self.chains.update_sync_state(); + } else { + // there is no finalized chain that matches this peer's last finalized target + // create a new finalized chain + debug!(self.log, "New finalized chain added to sync"; "peer_id" => format!("{:?}", peer_id), "start_slot" => local_finalized_slot.as_u64(), "end_slot" => remote_finalized_slot.as_u64(), "finalized_root" => format!("{}", remote_info.finalized_root)); + + self.chains.new_finalized_chain( + local_finalized_slot, + remote_info.finalized_root, + remote_finalized_slot, + peer_id, + self.sync_send.clone(), + ); + self.chains.update_finalized(network); + // update the global sync state + self.chains.update_sync_state(); + } + } + RangeSyncType::Head => { + // This peer requires a head chain sync + + if self.chains.is_finalizing_sync() { + // If there are finalized chains to sync, finish these first, before syncing head + // chains. This allows us to re-sync all known peers + trace!(self.log, "Waiting for finalized sync to complete"; "peer_id" => format!("{:?}", peer_id)); + // store the peer to re-status after all finalized chains complete + self.awaiting_head_peers.insert(peer_id); + return; + } + + // if the peer existed in any other head chain, remove it. + self.remove_peer(network, &peer_id); + + // The new peer has the same finalized (earlier filters should prevent a peer with an + // earlier finalized chain from reaching here). + debug!(self.log, "New peer added for recent head sync"; "peer_id" => format!("{:?}", peer_id)); + + // search if there is a matching head chain, then add the peer to the chain + if let Some(chain) = self + .chains + .get_head_mut(remote_info.head_root, remote_info.head_slot) + { + debug!(self.log, "Adding peer to the existing head chain peer pool"; "head_root" => format!("{}",remote_info.head_root), "head_slot" => remote_info.head_slot, "peer_id" => format!("{:?}", peer_id)); + + // add the peer to the head's pool + chain.add_peer(network, peer_id); + } else { + // There are no other head chains that match this peer's status, create a new one, and + let start_slot = std::cmp::min(local_info.head_slot, remote_finalized_slot); + debug!(self.log, "Creating a new syncing head chain"; "head_root" => format!("{}",remote_info.head_root), "start_slot" => start_slot, "head_slot" => remote_info.head_slot, "peer_id" => format!("{:?}", peer_id)); + self.chains.new_head_chain( + network, + start_slot, + remote_info.head_root, + remote_info.head_slot, + peer_id, + self.sync_send.clone(), + ); + } self.chains.update_finalized(network); self.chains.update_sync_state(); } - } else { - if self.chains.is_finalizing_sync() { - // If there are finalized chains to sync, finish these first, before syncing head - // chains. This allows us to re-sync all known peers - trace!(self.log, "Waiting for finalized sync to complete"; "peer_id" => format!("{:?}", peer_id)); - return; - } - - // The new peer has the same finalized (earlier filters should prevent a peer with an - // earlier finalized chain from reaching here). - debug!(self.log, "New peer added for recent head sync"; "peer_id" => format!("{:?}", peer_id)); - - // search if there is a matching head chain, then add the peer to the chain - if let Some(chain) = self.chains.get_head_mut(remote.head_root, remote.head_slot) { - debug!(self.log, "Adding peer to the existing head chain peer pool"; "head_root" => format!("{}",remote.head_root), "head_slot" => remote.head_slot, "peer_id" => format!("{:?}", peer_id)); - - // add the peer to the head's pool - chain.add_peer(network, peer_id); - } else { - // There are no other head chains that match this peer's status, create a new one, and - let start_slot = std::cmp::min(local_info.head_slot, remote_finalized_slot); - debug!(self.log, "Creating a new syncing head chain"; "head_root" => format!("{}",remote.head_root), "start_slot" => start_slot, "head_slot" => remote.head_slot, "peer_id" => format!("{:?}", peer_id)); - self.chains.new_head_chain( - network, - start_slot, - remote.head_root, - remote.head_slot, - peer_id, - self.sync_send.clone(), - ); - } - self.chains.update_finalized(network); - self.chains.update_sync_state(); } } diff --git a/beacon_node/network/src/sync/range_sync/sync_type.rs b/beacon_node/network/src/sync/range_sync/sync_type.rs new file mode 100644 index 0000000000..4b08b8b046 --- /dev/null +++ b/beacon_node/network/src/sync/range_sync/sync_type.rs @@ -0,0 +1,40 @@ +//! Contains logic about identifying which Sync to perform given PeerSyncInfo of ourselves and +//! of a remote. + +use crate::sync::PeerSyncInfo; +use beacon_chain::{BeaconChain, BeaconChainTypes}; +use std::sync::Arc; + +/// The type of Range sync that should be done relative to our current state. +pub enum RangeSyncType { + /// A finalized chain sync should be started with this peer. + Finalized, + /// A head chain sync should be started with this peer. + Head, +} + +impl RangeSyncType { + /// Determines the type of sync given our local `PeerSyncInfo` and the remote's + /// `PeerSyncInfo`. + pub fn new( + chain: &Arc>, + local_info: &PeerSyncInfo, + remote_info: &PeerSyncInfo, + ) -> RangeSyncType { + // Check for finalized chain sync + // + // The condition is: + // - The remotes finalized epoch is greater than our current finalized epoch and we have + // not seen the finalized hash before. + + if remote_info.finalized_epoch > local_info.finalized_epoch + && !chain + .fork_choice + .contains_block(&remote_info.finalized_root) + { + RangeSyncType::Finalized + } else { + RangeSyncType::Head + } + } +}