From 0b82e9f8a979bc5182b2f349573c4f564975651c Mon Sep 17 00:00:00 2001 From: Age Manning Date: Wed, 22 Apr 2020 23:58:10 +1000 Subject: [PATCH] Update Syncing logic (#1042) * Prevent duplicate parent block lookups * Updates logic for handling re-status'd peers * Allow block lookup if the block is close to head * Correct ordering of sync logs * Remove comments in block processer, clean up sim --- beacon_node/eth2-libp2p/src/lib.rs | 2 +- .../eth2-libp2p/src/peer_manager/mod.rs | 4 +- .../eth2-libp2p/src/peer_manager/peer_info.rs | 20 +- .../src/peer_manager/peer_sync_status.rs | 104 ++++++++++ .../eth2-libp2p/src/peer_manager/peerdb.rs | 5 +- beacon_node/network/src/router/processor.rs | 30 +-- .../network/src/sync/block_processor.rs | 56 +----- beacon_node/network/src/sync/manager.rs | 184 ++++++++++-------- beacon_node/network/src/sync/mod.rs | 2 + .../network/src/sync/peer_sync_info.rs | 114 +++++++++++ .../src/sync/range_sync/chain_collection.rs | 19 +- .../network/src/sync/range_sync/range.rs | 19 +- tests/simulator/src/cli.rs | 12 +- tests/simulator/src/sync_sim.rs | 16 +- 14 files changed, 386 insertions(+), 201 deletions(-) create mode 100644 beacon_node/eth2-libp2p/src/peer_manager/peer_sync_status.rs create mode 100644 beacon_node/network/src/sync/peer_sync_info.rs diff --git a/beacon_node/eth2-libp2p/src/lib.rs b/beacon_node/eth2-libp2p/src/lib.rs index a0d6af1b95..9230a4afb0 100644 --- a/beacon_node/eth2-libp2p/src/lib.rs +++ b/beacon_node/eth2-libp2p/src/lib.rs @@ -20,6 +20,6 @@ pub use config::Config as NetworkConfig; pub use libp2p::gossipsub::{MessageId, Topic, TopicHash}; pub use libp2p::{multiaddr, Multiaddr}; pub use libp2p::{PeerId, Swarm}; -pub use peer_manager::{PeerDB, PeerInfo, PeerSyncStatus}; +pub use peer_manager::{PeerDB, PeerInfo, PeerSyncStatus, SyncInfo}; pub use rpc::RPCEvent; pub use service::{Service, NETWORK_KEY_FILENAME}; diff --git a/beacon_node/eth2-libp2p/src/peer_manager/mod.rs b/beacon_node/eth2-libp2p/src/peer_manager/mod.rs 
index aa75b871fc..0b9d1b5c4a 100644 --- a/beacon_node/eth2-libp2p/src/peer_manager/mod.rs +++ b/beacon_node/eth2-libp2p/src/peer_manager/mod.rs @@ -16,9 +16,11 @@ use types::EthSpec; mod client; mod peer_info; +mod peer_sync_status; mod peerdb; -pub use peer_info::{PeerInfo, PeerSyncStatus}; +pub use peer_info::PeerInfo; +pub use peer_sync_status::{PeerSyncStatus, SyncInfo}; /// The minimum reputation before a peer is disconnected. // Most likely this needs tweaking const _MINIMUM_REPUTATION_BEFORE_BAN: Rep = 20; diff --git a/beacon_node/eth2-libp2p/src/peer_manager/peer_info.rs b/beacon_node/eth2-libp2p/src/peer_manager/peer_info.rs index e1fad2eeb0..611766a160 100644 --- a/beacon_node/eth2-libp2p/src/peer_manager/peer_info.rs +++ b/beacon_node/eth2-libp2p/src/peer_manager/peer_info.rs @@ -1,5 +1,6 @@ use super::client::Client; use super::peerdb::{Rep, DEFAULT_REPUTATION}; +use super::PeerSyncStatus; use crate::rpc::MetaData; use crate::Multiaddr; use serde::{ @@ -7,7 +8,7 @@ use serde::{ Serialize, }; use std::time::Instant; -use types::{EthSpec, Slot, SubnetId}; +use types::{EthSpec, SubnetId}; use PeerConnectionStatus::*; /// Information about a given connected peer. @@ -130,23 +131,6 @@ impl Serialize for PeerConnectionStatus { } } -#[derive(Clone, Debug, Serialize)] -/// The current sync status of the peer. -pub enum PeerSyncStatus { - /// At the current state as our node or ahead of us. - Synced { - /// The last known head slot from the peer's handshake. - status_head_slot: Slot, - }, - /// Is behind our current head and not useful for block downloads. - Behind { - /// The last known head slot from the peer's handshake. - status_head_slot: Slot, - }, - /// Not currently known as a STATUS handshake has not occurred. 
- Unknown, -} - impl Default for PeerConnectionStatus { fn default() -> Self { PeerConnectionStatus::Dialing { diff --git a/beacon_node/eth2-libp2p/src/peer_manager/peer_sync_status.rs b/beacon_node/eth2-libp2p/src/peer_manager/peer_sync_status.rs new file mode 100644 index 0000000000..0ce4f6ce9c --- /dev/null +++ b/beacon_node/eth2-libp2p/src/peer_manager/peer_sync_status.rs @@ -0,0 +1,104 @@ +//! Handles individual sync status for peers. + +use serde::Serialize; +use types::{Epoch, Hash256, Slot}; + +#[derive(Clone, Debug, Serialize)] +/// The current sync status of the peer. +pub enum PeerSyncStatus { + /// At the current state as our node or ahead of us. + Synced { info: SyncInfo }, + /// The peer has greater knowledge about the canonical chain than we do. + Advanced { info: SyncInfo }, + /// Is behind our current head and not useful for block downloads. + Behind { info: SyncInfo }, + /// Not currently known as a STATUS handshake has not occurred. + Unknown, +} + +/// This is stored inside the PeerSyncStatus and is very similar to `PeerSyncInfo` in the +/// `Network` crate. +#[derive(Clone, Debug, Serialize)] +pub struct SyncInfo { + pub status_head_slot: Slot, + pub status_head_root: Hash256, + pub status_finalized_epoch: Epoch, + pub status_finalized_root: Hash256, +} + +impl PeerSyncStatus { + /// Returns true if the peer has advanced knowledge of the chain. + pub fn is_advanced(&self) -> bool { + match self { + PeerSyncStatus::Advanced { .. } => true, + _ => false, + } + } + + /// Returns true if the peer is up to date with the current chain. + pub fn is_synced(&self) -> bool { + match self { + PeerSyncStatus::Synced { .. } => true, + _ => false, + } + } + + /// Returns true if the peer is behind the current chain. + pub fn is_behind(&self) -> bool { + match self { + PeerSyncStatus::Behind { .. } => true, + _ => false, + } + } + + /// Updates the sync state given a fully synced peer. + /// Returns true if the state has changed. 
+ pub fn update_synced(&mut self, info: SyncInfo) -> bool { + let new_state = PeerSyncStatus::Synced { info }; + + match self { + PeerSyncStatus::Synced { .. } => { + *self = new_state; + false // state was not updated + } + _ => { + *self = new_state; + true + } + } + } + + /// Updates the sync state given a peer that is further ahead in the chain than us. + /// Returns true if the state has changed. + pub fn update_ahead(&mut self, info: SyncInfo) -> bool { + let new_state = PeerSyncStatus::Advanced { info }; + + match self { + PeerSyncStatus::Advanced { .. } => { + *self = new_state; + false // state was not updated + } + _ => { + *self = new_state; + true + } + } + } + + /// Updates the sync state given a peer that is behind us in the chain. + /// Returns true if the state has changed. + pub fn update_behind(&mut self, info: SyncInfo) -> bool { + let new_state = PeerSyncStatus::Behind { info }; + + match self { + PeerSyncStatus::Behind { .. } => { + *self = new_state; + false // state was not updated + } + _ => { + *self = new_state; + true + } + } + } +} diff --git a/beacon_node/eth2-libp2p/src/peer_manager/peerdb.rs b/beacon_node/eth2-libp2p/src/peer_manager/peerdb.rs index 091233b3e2..066fa3736b 100644 --- a/beacon_node/eth2-libp2p/src/peer_manager/peerdb.rs +++ b/beacon_node/eth2-libp2p/src/peer_manager/peerdb.rs @@ -1,4 +1,5 @@ -use super::peer_info::{PeerConnectionStatus, PeerInfo, PeerSyncStatus}; +use super::peer_info::{PeerConnectionStatus, PeerInfo}; +use super::peer_sync_status::PeerSyncStatus; use crate::rpc::methods::MetaData; use crate::PeerId; use slog::{crit, warn}; @@ -101,7 +102,7 @@ impl PeerDB { self.peers .iter() .filter(|(_, info)| { - if let PeerSyncStatus::Synced { .. 
} = info.sync_status { + if info.sync_status.is_synced() || info.sync_status.is_advanced() { return info.connection_status.is_connected(); } false diff --git a/beacon_node/network/src/router/processor.rs b/beacon_node/network/src/router/processor.rs index d0458e7776..d3c3d98cc9 100644 --- a/beacon_node/network/src/router/processor.rs +++ b/beacon_node/network/src/router/processor.rs @@ -1,5 +1,5 @@ use crate::service::NetworkMessage; -use crate::sync::SyncMessage; +use crate::sync::{PeerSyncInfo, SyncMessage}; use beacon_chain::{ AttestationProcessingOutcome, AttestationType, BeaconChain, BeaconChainTypes, BlockError, BlockProcessingOutcome, GossipVerifiedBlock, @@ -23,34 +23,6 @@ use types::{ /// Otherwise we queue it. pub(crate) const FUTURE_SLOT_TOLERANCE: u64 = 1; -/// Keeps track of syncing information for known connected peers. -#[derive(Clone, Copy, Debug)] -pub struct PeerSyncInfo { - fork_digest: [u8; 4], - pub finalized_root: Hash256, - pub finalized_epoch: Epoch, - pub head_root: Hash256, - pub head_slot: Slot, -} - -impl From for PeerSyncInfo { - fn from(status: StatusMessage) -> PeerSyncInfo { - PeerSyncInfo { - fork_digest: status.fork_digest, - finalized_root: status.finalized_root, - finalized_epoch: status.finalized_epoch, - head_root: status.head_root, - head_slot: status.head_slot, - } - } -} - -impl PeerSyncInfo { - pub fn from_chain(chain: &Arc>) -> Option { - Some(Self::from(status_message(chain)?)) - } -} - /// Processes validated messages from the network. It relays necessary data to the syncing thread /// and processes blocks from the pubsub network. 
pub struct Processor { diff --git a/beacon_node/network/src/sync/block_processor.rs b/beacon_node/network/src/sync/block_processor.rs index cc546ea081..8c53869e40 100644 --- a/beacon_node/network/src/sync/block_processor.rs +++ b/beacon_node/network/src/sync/block_processor.rs @@ -3,7 +3,7 @@ use crate::sync::manager::SyncMessage; use crate::sync::range_sync::{BatchId, ChainId}; use beacon_chain::{BeaconChain, BeaconChainTypes, BlockError, ChainSegmentResult}; use eth2_libp2p::PeerId; -use slog::{crit, debug, error, trace, warn}; +use slog::{debug, error, trace, warn}; use std::sync::{Arc, Weak}; use tokio::sync::mpsc; use types::SignedBeaconBlock; @@ -103,8 +103,6 @@ pub fn spawn_block_processor( } /// Helper function to process blocks batches which only consumes the chain and blocks to process. -// TODO: Verify the fork choice logic and the correct error handling from `process_chain_segment`. -// Ensure fork-choice doesn't need to be run during the failed errors. fn process_blocks< 'a, T: BeaconChainTypes, @@ -126,7 +124,6 @@ fn process_blocks< "count" => imported_blocks, ); // Batch completed successfully with at least one block, run fork choice. 
- // TODO: Verify this logic run_fork_choice(chain, log); } @@ -136,8 +133,10 @@ fn process_blocks< imported_blocks, error, } => { - let r = handle_failed_chain_segment(chain, imported_blocks, error, log); - + let r = handle_failed_chain_segment(error, log); + if imported_blocks > 0 { + run_fork_choice(chain, log); + } (imported_blocks, r) } }; @@ -167,31 +166,16 @@ fn run_fork_choice(chain: Arc>, log: &slog:: } /// Helper function to handle a `BlockError` from `process_chain_segment` -fn handle_failed_chain_segment( - chain: Arc>, - imported_blocks: usize, - error: BlockError, - log: &slog::Logger, -) -> Result<(), String> { +fn handle_failed_chain_segment(error: BlockError, log: &slog::Logger) -> Result<(), String> { match error { BlockError::ParentUnknown(parent) => { // blocks should be sequential and all parents should exist - warn!( - log, "Parent block is unknown"; - "parent_root" => format!("{}", parent), - ); - - // NOTE: logic from master. TODO: check - if imported_blocks > 0 { - run_fork_choice(chain, log); - } Err(format!("Block has an unknown parent: {}", parent)) } BlockError::BlockIsAlreadyKnown => { - // TODO: Check handling of this - crit!(log, "Unknown handling of block error"); - + // This can happen for many reasons. Head sync's can download multiples and parent + // lookups can download blocks before range sync Ok(()) } BlockError::FutureSlot { @@ -207,10 +191,6 @@ fn handle_failed_chain_segment( "block_slot" => block_slot, "FUTURE_SLOT_TOLERANCE" => FUTURE_SLOT_TOLERANCE, ); - // NOTE: logic from master. TODO: check - if imported_blocks > 0 { - run_fork_choice(chain, log); - } } else { // The block is in the future, but not too far. debug!( @@ -227,26 +207,15 @@ fn handle_failed_chain_segment( )) } BlockError::WouldRevertFinalizedSlot { .. } => { - //TODO: Check handling. Run fork choice? 
- debug!( - log, "Finalized or earlier block processed"; - ); - // block reached our finalized slot or was earlier, move to the next block - // TODO: How does this logic happen for the chain segment. We would want to - // continue processing in this case. + debug!( log, "Finalized or earlier block processed";); Ok(()) } BlockError::GenesisBlock => { - debug!( - log, "Genesis block was processed"; - ); - // TODO: Similarly here. Prefer to continue processing. - + debug!(log, "Genesis block was processed"); Ok(()) } BlockError::BeaconChainError(e) => { - // TODO: Run fork choice? warn!( log, "BlockProcessingFailure"; "msg" => "unexpected condition in processing block.", @@ -256,11 +225,6 @@ fn handle_failed_chain_segment( Err(format!("Internal error whilst processing block: {:?}", e)) } other => { - // TODO: Run fork choice? - // NOTE: logic from master. TODO: check - if imported_blocks > 0 { - run_fork_choice(chain, log); - } warn!( log, "Invalid block received"; "msg" => "peer sent invalid block", diff --git a/beacon_node/network/src/sync/manager.rs b/beacon_node/network/src/sync/manager.rs index db826b54c4..c67ef22c40 100644 --- a/beacon_node/network/src/sync/manager.rs +++ b/beacon_node/network/src/sync/manager.rs @@ -35,16 +35,15 @@ use super::block_processor::{spawn_block_processor, BatchProcessResult, ProcessId}; use super::network_context::SyncNetworkContext; +use super::peer_sync_info::{PeerSyncInfo, PeerSyncType}; use super::range_sync::{BatchId, ChainId, RangeSync}; -use crate::router::processor::PeerSyncInfo; use crate::service::NetworkMessage; use beacon_chain::{BeaconChain, BeaconChainTypes, BlockProcessingOutcome}; use eth2_libp2p::rpc::{methods::*, RequestId}; use eth2_libp2p::types::NetworkGlobals; -use eth2_libp2p::{PeerId, PeerSyncStatus}; +use eth2_libp2p::PeerId; use fnv::FnvHashMap; use futures::prelude::*; -use rand::seq::SliceRandom; use slog::{crit, debug, error, info, trace, warn, Logger}; use smallvec::SmallVec; use std::boxed::Box; @@ 
-56,7 +55,7 @@ use types::{EthSpec, Hash256, SignedBeaconBlock, Slot}; /// The number of slots ahead of us that is allowed before requesting a long-range (batch) Sync /// from a peer. If a peer is within this tolerance (forwards or backwards), it is treated as a /// fully sync'd peer. -const SLOT_IMPORT_TOLERANCE: usize = 20; +pub const SLOT_IMPORT_TOLERANCE: usize = 20; /// How many attempts we try to find a parent of a block before we give up trying . const PARENT_FAIL_TOLERANCE: usize = 5; /// The maximum depth we will search for a parent block. In principle we should have sync'd any @@ -153,7 +152,7 @@ pub struct SyncManager { /// received or not. /// /// The flag allows us to determine if the peer returned data or sent us nothing. - single_block_lookups: FnvHashMap, + single_block_lookups: FnvHashMap, /// The logger for the import manager. log: Logger, @@ -162,6 +161,23 @@ pub struct SyncManager { sync_send: mpsc::UnboundedSender>, } +/// Object representing a single block lookup request. +struct SingleBlockRequest { + /// The hash of the requested block. + pub hash: Hash256, + /// Whether a block was received from this request, or the peer returned an empty response. + pub block_returned: bool, +} + +impl SingleBlockRequest { + pub fn new(hash: Hash256) -> Self { + Self { + hash, + block_returned: false, + } + } +} + /// Spawns a new `SyncManager` thread which has a weak reference to underlying beacon /// chain. This allows the chain to be /// dropped during the syncing process which will gracefully end the `SyncManager`. @@ -225,7 +241,7 @@ impl SyncManager { /// ours that we consider it fully sync'd with respect to our current chain. 
fn add_peer(&mut self, peer_id: PeerId, remote: PeerSyncInfo) { // ensure the beacon chain still exists - let local = match PeerSyncInfo::from_chain(&self.chain) { + let local_peer_info = match PeerSyncInfo::from_chain(&self.chain) { Some(local) => local, None => { return error!( @@ -236,29 +252,33 @@ impl SyncManager { } }; - // If a peer is within SLOT_IMPORT_TOLERANCE from our head slot, ignore a batch/range sync, - // consider it a fully-sync'd peer. - if remote.head_slot.sub(local.head_slot).as_usize() < SLOT_IMPORT_TOLERANCE { - trace!(self.log, "Peer synced to our head found"; - "peer" => format!("{:?}", peer_id), - "peer_head_slot" => remote.head_slot, - "local_head_slot" => local.head_slot, - ); - self.synced_peer(&peer_id, remote.head_slot); - // notify the range sync that a peer has been added - self.range_sync.fully_synced_peer_found(); - return; - } - - // Check if the peer is significantly behind us. If within `SLOT_IMPORT_TOLERANCE` - // treat them as a fully synced peer. 
If not, ignore them in the sync process - if local.head_slot.sub(remote.head_slot).as_usize() < SLOT_IMPORT_TOLERANCE { - // Add the peer to our RangeSync - self.range_sync - .add_peer(&mut self.network, peer_id.clone(), remote); - self.synced_peer(&peer_id, remote.head_slot); - } else { - self.behind_peer(&peer_id, remote.head_slot); + match local_peer_info.peer_sync_type(&remote) { + PeerSyncType::FullySynced => { + trace!(self.log, "Peer synced to our head found"; + "peer" => format!("{:?}", peer_id), + "peer_head_slot" => remote.head_slot, + "local_head_slot" => local_peer_info.head_slot, + ); + self.synced_peer(&peer_id, remote); + // notify the range sync that a peer has been added + self.range_sync.fully_synced_peer_found(); + } + PeerSyncType::Advanced => { + trace!(self.log, "Useful peer for sync found"; + "peer" => format!("{:?}", peer_id), + "peer_head_slot" => remote.head_slot, + "local_head_slot" => local_peer_info.head_slot, + "remote_finalized_epoch" => remote.finalized_epoch, + "local_finalized_epoch" => local_peer_info.finalized_epoch, + ); + // Add the peer to our RangeSync + self.range_sync + .add_peer(&mut self.network, peer_id.clone(), remote); + self.advanced_peer(&peer_id, remote); + } + PeerSyncType::Behind => { + self.behind_peer(&peer_id, remote); + } } } @@ -280,12 +300,10 @@ impl SyncManager { // check if this is a single block lookup - i.e we were searching for a specific hash let mut single_block_hash = None; - if let Some((block_hash, data_received)) = - self.single_block_lookups.get_mut(&request_id) - { + if let Some(block_request) = self.single_block_lookups.get_mut(&request_id) { // update the state of the lookup indicating a block was received from the peer - *data_received = true; - single_block_hash = Some(block_hash.clone()); + block_request.block_returned = true; + single_block_hash = Some(block_request.hash.clone()); } if let Some(block_hash) = single_block_hash { self.single_block_lookup_response(peer_id, block, block_hash); 
@@ -316,12 +334,10 @@ impl SyncManager { // this is a stream termination // stream termination for a single block lookup, remove the key - if let Some((block_hash, data_received)) = - self.single_block_lookups.remove(&request_id) - { + if let Some(single_block_request) = self.single_block_lookups.remove(&request_id) { // the peer didn't respond with a block that it referenced - if !data_received { - warn!(self.log, "Peer didn't respond with a block it referenced"; "referenced_block_hash" => format!("{}", block_hash), "peer_id" => format!("{}", peer_id)); + if !single_block_request.block_returned { + warn!(self.log, "Peer didn't respond with a block it referenced"; "referenced_block_hash" => format!("{}", single_block_request.hash), "peer_id" => format!("{}", peer_id)); self.network.downvote_peer(peer_id); } return; @@ -410,9 +426,24 @@ impl SyncManager { /// A block has been sent to us that has an unknown parent. This begins a parent lookup search /// to find the parent or chain of parents that match our current chain. fn add_unknown_block(&mut self, peer_id: PeerId, block: SignedBeaconBlock) { - // If we are not synced ignore the block + // If we are not synced, ignore blocks outside SLOT_IMPORT_TOLERANCE of our head if !self.network_globals.sync_state.read().is_synced() { - return; + let head_slot = self + .chain + .head_info() + .map(|info| info.slot) + .unwrap_or_else(|_| Slot::from(0u64)); + let unknown_block_slot = block.message.slot; + + // if the block is far from our head (ahead or behind), ignore it. If it's within the + // slot tolerance of our current head, regardless of the syncing state, fetch it. 
+ if (head_slot >= unknown_block_slot + && head_slot.sub(unknown_block_slot).as_usize() > SLOT_IMPORT_TOLERANCE) + || (head_slot < unknown_block_slot + && unknown_block_slot.sub(head_slot).as_usize() > SLOT_IMPORT_TOLERANCE) + { + return; + } } // Make sure this block is not already being searched for @@ -446,13 +477,23 @@ impl SyncManager { return; } + // Do not re-request a block that is already being requested + if self + .single_block_lookups + .values() + .find(|single_block_request| single_block_request.hash == block_hash) + .is_some() + { + return; + } + let request = BlocksByRootRequest { block_roots: vec![block_hash], }; if let Ok(request_id) = self.network.blocks_by_root_request(peer_id, request) { self.single_block_lookups - .insert(request_id, (block_hash, false)); + .insert(request_id, SingleBlockRequest::new(block_hash)); } } @@ -488,16 +529,10 @@ impl SyncManager { } /// Updates the syncing state of a peer to be synced. - fn synced_peer(&mut self, peer_id: &PeerId, status_head_slot: Slot) { + fn synced_peer(&mut self, peer_id: &PeerId, sync_info: PeerSyncInfo) { if let Some(peer_info) = self.network_globals.peers.write().peer_info_mut(peer_id) { - match peer_info.sync_status { - PeerSyncStatus::Synced { .. } => { - peer_info.sync_status = PeerSyncStatus::Synced { status_head_slot } - } // just update block - PeerSyncStatus::Behind { .. } | PeerSyncStatus::Unknown => { - peer_info.sync_status = PeerSyncStatus::Synced { status_head_slot }; - debug!(self.log, "Peer transitioned to synced status"; "peer_id" => format!("{}", peer_id)); - } + if peer_info.sync_status.update_synced(sync_info.into()) { + debug!(self.log, "Peer transitioned to synced status"; "peer_id" => format!("{}", peer_id)); } } else { crit!(self.log, "Status'd peer is unknown"; "peer_id" => format!("{}", peer_id)); @@ -506,21 +541,24 @@ impl SyncManager { } /// Updates the syncing state of a peer to be behind. 
- fn behind_peer(&mut self, peer_id: &PeerId, status_head_slot: Slot) { + fn advanced_peer(&mut self, peer_id: &PeerId, sync_info: PeerSyncInfo) { if let Some(peer_info) = self.network_globals.peers.write().peer_info_mut(peer_id) { - match peer_info.sync_status { - PeerSyncStatus::Synced { .. } => { - debug!(self.log, "Peer transitioned to from synced state to behind"; "peer_id" => format!("{}", peer_id), "head_slot" => status_head_slot); - peer_info.sync_status = PeerSyncStatus::Behind { status_head_slot } - } - PeerSyncStatus::Behind { .. } => { - peer_info.sync_status = PeerSyncStatus::Behind { status_head_slot } - } // just update + let advanced_slot = sync_info.head_slot; + if peer_info.sync_status.update_ahead(sync_info.into()) { + debug!(self.log, "Peer transitioned to advanced sync status"; "peer_id" => format!("{}", peer_id), "head_slot" => advanced_slot); + } + } else { + crit!(self.log, "Status'd peer is unknown"; "peer_id" => format!("{}", peer_id)); + } + self.update_sync_state(); + } - PeerSyncStatus::Unknown => { - debug!(self.log, "Peer transitioned to behind sync status"; "peer_id" => format!("{}", peer_id), "head_slot" => status_head_slot); - peer_info.sync_status = PeerSyncStatus::Behind { status_head_slot } - } + /// Updates the syncing state of a peer to be behind. 
+ fn behind_peer(&mut self, peer_id: &PeerId, sync_info: PeerSyncInfo) { + if let Some(peer_info) = self.network_globals.peers.write().peer_info_mut(peer_id) { + let behind_slot = sync_info.head_slot; + if peer_info.sync_status.update_behind(sync_info.into()) { + debug!(self.log, "Peer transitioned to behind sync status"; "peer_id" => format!("{}", peer_id), "head_slot" => behind_slot); } } else { crit!(self.log, "Status'd peer is unknown"; "peer_id" => format!("{}", peer_id)); @@ -665,20 +703,10 @@ impl SyncManager { let request = BlocksByRootRequest { block_roots: vec![parent_hash], }; - // select a random fully synced peer to attempt to download the parent block - let available_peers = self - .network_globals - .peers - .read() - .synced_peers() - .cloned() - .collect::>(); - let peer_id = if let Some(peer_id) = available_peers.choose(&mut rand::thread_rng()) { - (*peer_id).clone() - } else { - // there were no peers to choose from. We drop the lookup request - return; - }; + + // We continue to search for the chain of blocks from the same peer. Other peers are not + // guaranteed to have this chain of blocks. 
+ let peer_id = parent_request.last_submitted_peer.clone(); if let Ok(request_id) = self.network.blocks_by_root_request(peer_id, request) { // if the request was successful add the queue back into self diff --git a/beacon_node/network/src/sync/mod.rs b/beacon_node/network/src/sync/mod.rs index 26274ef97f..2e68dc6e81 100644 --- a/beacon_node/network/src/sync/mod.rs +++ b/beacon_node/network/src/sync/mod.rs @@ -4,6 +4,8 @@ mod block_processor; pub mod manager; mod network_context; +mod peer_sync_info; mod range_sync; pub use manager::SyncMessage; +pub use peer_sync_info::PeerSyncInfo; diff --git a/beacon_node/network/src/sync/peer_sync_info.rs b/beacon_node/network/src/sync/peer_sync_info.rs new file mode 100644 index 0000000000..724aa567dd --- /dev/null +++ b/beacon_node/network/src/sync/peer_sync_info.rs @@ -0,0 +1,114 @@ +use super::manager::SLOT_IMPORT_TOLERANCE; +use crate::router::processor::status_message; +use beacon_chain::{BeaconChain, BeaconChainTypes}; +use eth2_libp2p::rpc::methods::*; +use eth2_libp2p::SyncInfo; +use std::ops::Sub; +use std::sync::Arc; +use types::{Epoch, Hash256, Slot}; + +/// +/// Keeps track of syncing information for known connected peers. +#[derive(Clone, Copy, Debug)] +pub struct PeerSyncInfo { + pub fork_digest: [u8; 4], + pub finalized_root: Hash256, + pub finalized_epoch: Epoch, + pub head_root: Hash256, + pub head_slot: Slot, +} + +/// The type of peer relative to our current state. +pub enum PeerSyncType { + /// The peer is on our chain and is fully synced with respect to our chain. + FullySynced, + /// The peer has a greater knowledge of the chain than us that warrants a full sync. + Advanced, + /// A peer is behind in the sync and not useful to us for downloading blocks. 
+ Behind, +} + +impl From for PeerSyncInfo { + fn from(status: StatusMessage) -> PeerSyncInfo { + PeerSyncInfo { + fork_digest: status.fork_digest, + finalized_root: status.finalized_root, + finalized_epoch: status.finalized_epoch, + head_root: status.head_root, + head_slot: status.head_slot, + } + } +} + +impl Into for PeerSyncInfo { + fn into(self) -> SyncInfo { + SyncInfo { + status_head_slot: self.head_slot, + status_head_root: self.head_root, + status_finalized_epoch: self.finalized_epoch, + status_finalized_root: self.finalized_root, + } + } +} + +impl PeerSyncInfo { + /// Derives the peer sync information from a beacon chain. + pub fn from_chain(chain: &Arc>) -> Option { + Some(Self::from(status_message(chain)?)) + } + + /// Given another peer's `PeerSyncInfo` this will determine how useful that peer is for us in + /// regards to syncing. This returns the peer sync type that can then be handled by the + /// `SyncManager`. + pub fn peer_sync_type(&self, remote_peer_sync_info: &PeerSyncInfo) -> PeerSyncType { + // check if the peer is fully synced with our current chain + if self.is_fully_synced_peer(remote_peer_sync_info) { + PeerSyncType::FullySynced + } + // if not, check if the peer is ahead of our chain + else if self.is_ahead_peer(remote_peer_sync_info) { + PeerSyncType::Advanced + } else { + // the peer must be behind and not useful + PeerSyncType::Behind + } + } + + /// Determines if another peer is fully synced with the current peer. + /// + /// A fully synced peer is a peer whose finalized epoch and hash match our own and their + /// head is within SLOT_IMPORT_TOLERANCE of our own. + /// In this case we ignore any batch/range syncing. 
+ fn is_fully_synced_peer(&self, remote: &PeerSyncInfo) -> bool { + // ensure we are on the same chain, with minor differing heads + if remote.finalized_epoch == self.finalized_epoch + && remote.finalized_root == self.finalized_root + { + // that we are within SLOT_IMPORT_TOLERANCE of our two heads + if (self.head_slot >= remote.head_slot + && self.head_slot.sub(remote.head_slot).as_usize() <= SLOT_IMPORT_TOLERANCE) + || (self.head_slot < remote.head_slot) + && remote.head_slot.sub(self.head_slot).as_usize() <= SLOT_IMPORT_TOLERANCE + { + return true; + } + } + false + } + + /// Determines if a peer has more knowledge about the current chain than we do. + /// + /// There are two conditions here. + /// 1) The peer could have a head slot that is greater + /// than SLOT_IMPORT_TOLERANCE of our current head. + /// 2) The peer has a greater finalized slot/epoch than our own. + fn is_ahead_peer(&self, remote: &PeerSyncInfo) -> bool { + if remote.head_slot.sub(self.head_slot).as_usize() > SLOT_IMPORT_TOLERANCE + || self.finalized_epoch < remote.finalized_epoch + { + true + } else { + false + } + } +} diff --git a/beacon_node/network/src/sync/range_sync/chain_collection.rs b/beacon_node/network/src/sync/range_sync/chain_collection.rs index 4fed36ae9a..5c93aa45a3 100644 --- a/beacon_node/network/src/sync/range_sync/chain_collection.rs +++ b/beacon_node/network/src/sync/range_sync/chain_collection.rs @@ -4,9 +4,9 @@ //! with this struct to to simplify the logic of the other layers of sync. use super::chain::{ChainSyncingState, SyncingChain}; -use crate::router::processor::PeerSyncInfo; use crate::sync::manager::SyncMessage; use crate::sync::network_context::SyncNetworkContext; +use crate::sync::PeerSyncInfo; use beacon_chain::{BeaconChain, BeaconChainTypes}; use eth2_libp2p::{types::SyncState, NetworkGlobals, PeerId}; use slog::{debug, error, info}; @@ -110,10 +110,9 @@ impl ChainCollection { } /// Updates the global sync state and logs any changes. 
- fn update_sync_state(&mut self, state: RangeSyncState) { + pub fn update_sync_state(&mut self) { // if there is no range sync occurring, the state is either synced or not based on // connected peers. - self.state = state; if self.state == RangeSyncState::Idle { // there is no range sync, let the state of peers determine the global node sync state @@ -150,7 +149,8 @@ impl ChainCollection { if let RangeSyncState::Head { .. } = self.state { if self.head_chains.is_empty() { // Update the global network state to either synced or stalled. - self.update_sync_state(RangeSyncState::Idle); + self.state = RangeSyncState::Idle; + self.update_sync_state(); } } } @@ -165,13 +165,14 @@ impl ChainCollection { .head_info() .map(|info| info.slot) .unwrap_or_else(|_| Slot::from(0u64)); + // NOTE: This will modify the /node/syncing API to show current slot for all fields // while we update peers to look for new potentially HEAD chains. let temp_head_state = RangeSyncState::Head { start_slot: current_slot, head_slot: current_slot, }; - self.update_sync_state(temp_head_state); + self.state = temp_head_state; } } @@ -249,7 +250,7 @@ impl ChainCollection { head_slot: chain.target_head_slot, head_root: chain.target_head_root, }; - self.update_sync_state(state); + self.state = state; // Stop the current chain from syncing self.finalized_chains[index].stop_syncing(); @@ -269,11 +270,11 @@ impl ChainCollection { head_slot: chain.target_head_slot, head_root: chain.target_head_root, }; - self.update_sync_state(state); + self.state = state; } else { // There are no finalized chains, update the state. if self.head_chains.is_empty() { - self.update_sync_state(RangeSyncState::Idle); + self.state = RangeSyncState::Idle; } else { // for the syncing API, we find the minimal start_slot and the maximum // target_slot of all head chains to report back. 
@@ -291,7 +292,7 @@ impl ChainCollection { start_slot: min_slot, head_slot: max_slot, }; - self.update_sync_state(head_state); + self.state = head_state; } } } diff --git a/beacon_node/network/src/sync/range_sync/range.rs b/beacon_node/network/src/sync/range_sync/range.rs index 0525909ef6..70efa4dce9 100644 --- a/beacon_node/network/src/sync/range_sync/range.rs +++ b/beacon_node/network/src/sync/range_sync/range.rs @@ -42,10 +42,10 @@ use super::chain::{ChainId, ProcessingResult}; use super::chain_collection::{ChainCollection, RangeSyncState}; use super::BatchId; -use crate::router::processor::PeerSyncInfo; use crate::sync::block_processor::BatchProcessResult; use crate::sync::manager::SyncMessage; use crate::sync::network_context::SyncNetworkContext; +use crate::sync::PeerSyncInfo; use beacon_chain::{BeaconChain, BeaconChainTypes}; use eth2_libp2p::rpc::RequestId; use eth2_libp2p::{NetworkGlobals, PeerId}; @@ -169,6 +169,7 @@ impl RangeSync { // check if the new peer's addition will favour a new syncing chain. 
self.chains.update_finalized(network); + self.chains.update_sync_state(); } else { // there is no finalized chain that matches this peer's last finalized target // create a new finalized chain @@ -182,6 +183,7 @@ impl RangeSync { self.sync_send.clone(), ); self.chains.update_finalized(network); + self.chains.update_sync_state(); } } else { if self.chains.is_finalizing_sync() { @@ -215,6 +217,7 @@ impl RangeSync { ); } self.chains.update_finalized(network); + self.chains.update_sync_state(); } } @@ -272,15 +275,17 @@ impl RangeSync { Some((index, ProcessingResult::RemoveChain)) => { let chain = self.chains.remove_finalized_chain(index); debug!(self.log, "Finalized chain removed"; "start_slot" => chain.start_slot.as_u64(), "end_slot" => chain.target_head_slot.as_u64()); - // the chain is complete, re-status it's peers - chain.status_peers(network); - // update the state of the collection self.chains.update_finalized(network); - // set the state to a head sync, to inform the manager that we are awaiting a + // the chain is complete, re-status it's peers + chain.status_peers(network); + + // set the state to a head sync if there are no finalized chains, to inform the manager that we are awaiting a // head chain. self.chains.set_head_sync(); + // Update the global variables + self.chains.update_sync_state(); // if there are no more finalized chains, re-status all known peers awaiting a head // sync @@ -312,6 +317,8 @@ impl RangeSync { // update the state of the collection self.chains.update_finalized(network); + // update the global state and log any change + self.chains.update_sync_state(); } Some((_, ProcessingResult::KeepChain)) => {} None => { @@ -339,6 +346,8 @@ impl RangeSync { // update the state of the collection self.chains.update_finalized(network); + // update the global state and inform the user + self.chains.update_sync_state(); } /// When a peer gets removed, both the head and finalized chains need to be searched to check which pool the peer is in. 
The chain may also have a batch or batches awaiting diff --git a/tests/simulator/src/cli.rs b/tests/simulator/src/cli.rs index 4f3277fc11..444e670104 100644 --- a/tests/simulator/src/cli.rs +++ b/tests/simulator/src/cli.rs @@ -78,27 +78,31 @@ pub fn cli_app<'a, 'b>() -> App<'a, 'b> { .short("s") .long("speedup") .takes_value(true) - .help("Speed up factor for eth1 blocks and slot production (default 15)"), + .default_value("15") + .help("Speed up factor for eth1 blocks and slot production"), ) .arg( Arg::with_name("initial_delay") .short("i") .long("initial_delay") .takes_value(true) - .help("Epoch delay for new beacon node to start syncing (default 50)"), + .default_value("5") + .help("Epoch delay for new beacon node to start syncing"), ) .arg( Arg::with_name("sync_timeout") .long("sync_timeout") .takes_value(true) - .help("Number of epochs after which newly added beacon nodes must be synced (default 10)"), + .default_value("10") + .help("Number of epochs after which newly added beacon nodes must be synced"), ) .arg( Arg::with_name("strategy") .long("strategy") .takes_value(true) + .default_value("all") .possible_values(&["one-node", "two-nodes", "mixed", "all"]) - .help("Sync verification strategy to run. 
(default all)"), + .help("Sync verification strategy to run."), ), ) } diff --git a/tests/simulator/src/sync_sim.rs b/tests/simulator/src/sync_sim.rs index 5f5d70e7f6..16a62fc327 100644 --- a/tests/simulator/src/sync_sim.rs +++ b/tests/simulator/src/sync_sim.rs @@ -12,14 +12,14 @@ use tokio::timer::Interval; use types::{Epoch, EthSpec}; pub fn run_syncing_sim(matches: &ArgMatches) -> Result<(), String> { - let initial_delay = value_t!(matches, "initial_delay", u64).unwrap_or(50); - let sync_delay = value_t!(matches, "sync_delay", u64).unwrap_or(10); - let speed_up_factor = value_t!(matches, "speedup", u64).unwrap_or(15); - let strategy = value_t!(matches, "strategy", String).unwrap_or("all".into()); + let initial_delay = value_t!(matches, "initial_delay", u64).unwrap(); + let sync_timeout = value_t!(matches, "sync_timeout", u64).unwrap(); + let speed_up_factor = value_t!(matches, "speedup", u64).unwrap(); + let strategy = value_t!(matches, "strategy", String).unwrap(); println!("Syncing Simulator:"); println!(" initial_delay:{}", initial_delay); - println!(" sync delay:{}", sync_delay); + println!(" sync timeout: {}", sync_timeout); println!(" speed up factor:{}", speed_up_factor); println!(" strategy:{}", strategy); @@ -29,7 +29,7 @@ pub fn run_syncing_sim(matches: &ArgMatches) -> Result<(), String> { syncing_sim( speed_up_factor, initial_delay, - sync_delay, + sync_timeout, strategy, log_level, log_format, @@ -39,7 +39,7 @@ pub fn run_syncing_sim(matches: &ArgMatches) -> Result<(), String> { fn syncing_sim( speed_up_factor: u64, initial_delay: u64, - sync_delay: u64, + sync_timeout: u64, strategy: String, log_level: &str, log_format: Option<&str>, @@ -108,7 +108,7 @@ fn syncing_sim( beacon_config.clone(), slot_duration, initial_delay, - sync_delay, + sync_timeout, )) .join(final_future) .map(|_| network)