Misc Peer sync info adjustments (#1896)

## Issue Addressed
#1856 

## Proposed Changes
- For clarity, the router's processor now only decides whether a peer is compatible, and either disconnects it or sends it to sync accordingly. It no longer contains any logic regarding how useful the peer is.
- Update peer_sync_info's rules
- Add an `IrrelevantPeer` sync status to account for incompatible peers (maybe this should be "IncompatiblePeer" now that I think about it?). This state is updated upon receiving an internal goodbye in the peer manager.
- Misc code cleanups
- Reduce the need to create `StatusMessage`s (and thus, `Arc` accesses)
- Add missing calls to update the global sync state

The overall effect should be:
- More peers recognized as Behind, and fewer as Unknown
- Peers identified as incompatible
This commit is contained in:
divma
2020-11-13 09:00:10 +00:00
parent 46a06069c6
commit 8a16548715
11 changed files with 222 additions and 379 deletions

View File

@@ -34,14 +34,16 @@
//! search for the block and subsequently search for parents if needed.
use super::network_context::SyncNetworkContext;
use super::peer_sync_info::{PeerSyncInfo, PeerSyncType};
use super::peer_sync_info::{remote_sync_type, PeerSyncType};
use super::range_sync::{ChainId, RangeSync, RangeSyncType, EPOCHS_PER_BATCH};
use super::RequestId;
use crate::beacon_processor::{ProcessId, WorkEvent as BeaconWorkEvent};
use crate::router::processor::status_message;
use crate::service::NetworkMessage;
use beacon_chain::{BeaconChain, BeaconChainTypes, BlockError};
use eth2_libp2p::rpc::{methods::MAX_REQUEST_BLOCKS, BlocksByRootRequest, GoodbyeReason};
use eth2_libp2p::types::{NetworkGlobals, SyncState};
use eth2_libp2p::SyncInfo;
use eth2_libp2p::{PeerAction, PeerId};
use fnv::FnvHashMap;
use lru_cache::LRUCache;
@@ -73,7 +75,7 @@ const PARENT_DEPTH_TOLERANCE: usize = SLOT_IMPORT_TOLERANCE * 2;
/// A message that can be sent to the sync manager thread.
pub enum SyncMessage<T: EthSpec> {
/// A useful peer has been discovered.
AddPeer(PeerId, PeerSyncInfo),
AddPeer(PeerId, SyncInfo),
/// A `BlocksByRange` response has been received.
BlocksByRangeResponse {
@@ -254,62 +256,32 @@ impl<T: BeaconChainTypes> SyncManager<T> {
///
/// If the peer is within the `SLOT_IMPORT_TOLERANCE`, then its head is sufficiently close to
/// ours that we consider it fully sync'd with respect to our current chain.
fn add_peer(&mut self, peer_id: PeerId, remote: PeerSyncInfo) {
fn add_peer(&mut self, peer_id: PeerId, remote: SyncInfo) {
// ensure the beacon chain still exists
let local_peer_info = match PeerSyncInfo::from_chain(&self.chain) {
Some(local) => local,
None => {
return error!(
self.log,
"Failed to get peer sync info";
"msg" => "likely due to head lock contention"
)
let local = match status_message(&self.chain) {
Ok(status) => SyncInfo {
head_slot: status.head_slot,
head_root: status.head_root,
finalized_epoch: status.finalized_epoch,
finalized_root: status.finalized_root,
},
Err(e) => {
return error!(self.log, "Failed to get peer sync info";
"msg" => "likely due to head lock contention", "err" => ?e)
}
};
match local_peer_info.peer_sync_type(&remote) {
PeerSyncType::FullySynced => {
trace!(self.log, "Peer synced to our head found";
"peer" => %peer_id,
"peer_head_slot" => remote.head_slot,
"local_head_slot" => local_peer_info.head_slot,
);
self.synced_peer(&peer_id, remote);
}
PeerSyncType::Advanced => {
trace!(self.log, "Useful peer for sync found";
"peer" => %peer_id,
"peer_head_slot" => remote.head_slot,
"local_head_slot" => local_peer_info.head_slot,
"peer_finalized_epoch" => remote.finalized_epoch,
"local_finalized_epoch" => local_peer_info.finalized_epoch,
);
let sync_type = remote_sync_type(&local, &remote, &self.chain);
// There are few cases to handle here:
//
// - A peer could appear advanced if our fork choice has rejected their version of
// the chain. If we know of their head slot, we consider this peer fully synced.
// - A peer could have just advanced to the next epoch and have a new finalized
// epoch that is currently ahead of ours. If their finalized epoch is ahead of ours
// by one and their head_slot is within the slot tolerance, consider this peer
// fully synced.
// update the state of the peer.
self.update_peer_sync_state(&peer_id, &local, &remote, &sync_type);
if (self.chain.fork_choice.read().contains_block(&remote.head_root)) || // the first case
(remote.finalized_epoch.sub(local_peer_info.finalized_epoch) == 1 && remote.head_slot.sub(local_peer_info.head_slot) < SLOT_IMPORT_TOLERANCE as u64)
// the second case
{
self.synced_peer(&peer_id, remote);
} else {
// Add the peer to our RangeSync
self.range_sync
.add_peer(&mut self.network, peer_id.clone(), remote);
self.advanced_peer(&peer_id, remote);
}
}
PeerSyncType::Behind => {
self.behind_peer(&peer_id, remote);
}
if matches!(sync_type, PeerSyncType::Advanced) {
self.range_sync
.add_peer(&mut self.network, local, peer_id, remote);
}
self.update_sync_state();
}
/// The response to a `BlocksByRoot` request.
@@ -616,6 +588,7 @@ impl<T: BeaconChainTypes> SyncManager<T> {
// otherwise, this is a range sync issue, notify the range sync
self.range_sync
.inject_error(&mut self.network, peer_id, request_id);
self.update_sync_state();
}
fn peer_disconnect(&mut self, peer_id: &PeerId) {
@@ -623,47 +596,25 @@ impl<T: BeaconChainTypes> SyncManager<T> {
self.update_sync_state();
}
// TODO: Group these functions into one for cleaner code.
/// Updates the syncing state of a peer to be synced.
fn synced_peer(&mut self, peer_id: &PeerId, sync_info: PeerSyncInfo) {
/// Updates the syncing state of a peer.
fn update_peer_sync_state(
&mut self,
peer_id: &PeerId,
local_sync_info: &SyncInfo,
remote_sync_info: &SyncInfo,
sync_type: &PeerSyncType,
) {
if let Some(peer_info) = self.network_globals.peers.write().peer_info_mut(peer_id) {
let head_slot = sync_info.head_slot;
let finalized_epoch = sync_info.finalized_epoch;
if peer_info.sync_status.update_synced(sync_info.into()) {
debug!(self.log, "Peer transitioned sync state"; "new_state" => "synced", "peer_id" => %peer_id, "head_slot" => head_slot, "finalized_epoch" => finalized_epoch);
let new_state = sync_type.as_sync_status(remote_sync_info);
let rpr = new_state.to_string();
if peer_info.sync_status.update(new_state) {
debug!(self.log, "Peer transitioned sync state"; "peer_id" => %peer_id, "new_state" => rpr,
"our_head_slot" => local_sync_info.head_slot, "out_finalized_epoch" => local_sync_info.finalized_epoch,
"their_head_slot" => remote_sync_info.head_slot, "their_finalized_epoch" => remote_sync_info.finalized_epoch);
}
} else {
crit!(self.log, "Status'd peer is unknown"; "peer_id" => %peer_id);
}
self.update_sync_state();
}
/// Updates the syncing state of a peer to be advanced.
fn advanced_peer(&mut self, peer_id: &PeerId, sync_info: PeerSyncInfo) {
if let Some(peer_info) = self.network_globals.peers.write().peer_info_mut(peer_id) {
let head_slot = sync_info.head_slot;
let finalized_epoch = sync_info.finalized_epoch;
if peer_info.sync_status.update_advanced(sync_info.into()) {
debug!(self.log, "Peer transitioned sync state"; "new_state" => "advanced", "peer_id" => %peer_id, "head_slot" => head_slot, "finalized_epoch" => finalized_epoch);
}
} else {
crit!(self.log, "Status'd peer is unknown"; "peer_id" => %peer_id);
}
self.update_sync_state();
}
/// Updates the syncing state of a peer to be behind.
fn behind_peer(&mut self, peer_id: &PeerId, sync_info: PeerSyncInfo) {
if let Some(peer_info) = self.network_globals.peers.write().peer_info_mut(peer_id) {
let head_slot = sync_info.head_slot;
let finalized_epoch = sync_info.finalized_epoch;
if peer_info.sync_status.update_behind(sync_info.into()) {
debug!(self.log, "Peer transitioned sync state"; "new_state" => "behind", "peer_id" => %peer_id, "head_slot" => head_slot, "finalized_epoch" => finalized_epoch);
}
} else {
crit!(self.log, "Status'd peer is unknown"; "peer_id" => %peer_id);
}
self.update_sync_state();
}
/// Updates the global sync state and logs any changes.
@@ -921,6 +872,7 @@ impl<T: BeaconChainTypes> SyncManager<T> {
request_id,
beacon_block.map(|b| *b),
);
self.update_sync_state();
}
SyncMessage::BlocksByRootResponse {
peer_id,
@@ -953,6 +905,7 @@ impl<T: BeaconChainTypes> SyncManager<T> {
epoch,
result,
);
self.update_sync_state();
}
SyncMessage::ParentLookupFailed {
chain_head,

View File

@@ -7,7 +7,6 @@ mod peer_sync_info;
mod range_sync;
pub use manager::{BatchProcessResult, SyncMessage};
pub use peer_sync_info::PeerSyncInfo;
pub use range_sync::ChainId;
/// Type of id of rpc requests sent by sync

View File

@@ -63,7 +63,7 @@ impl<T: EthSpec> SyncNetworkContext<T> {
chain: Arc<BeaconChain<U>>,
peers: impl Iterator<Item = PeerId>,
) {
if let Some(status_message) = status_message(&chain) {
if let Ok(status_message) = status_message(&chain) {
for peer_id in peers {
debug!(
self.log,

View File

@@ -1,21 +1,7 @@
use super::manager::SLOT_IMPORT_TOLERANCE;
use crate::router::processor::status_message;
use beacon_chain::{BeaconChain, BeaconChainTypes};
use eth2_libp2p::rpc::*;
use eth2_libp2p::SyncInfo;
use std::ops::Sub;
use std::sync::Arc;
use types::{Epoch, Hash256, Slot};
/// Keeps track of syncing information for known connected peers.
#[derive(Clone, Copy, Debug)]
pub struct PeerSyncInfo {
pub fork_digest: [u8; 4],
pub finalized_root: Hash256,
pub finalized_epoch: Epoch,
pub head_root: Hash256,
pub head_slot: Slot,
}
use eth2_libp2p::{PeerSyncStatus, SyncInfo};
use std::cmp::Ordering;
/// The type of peer relative to our current state.
pub enum PeerSyncType {
@@ -27,82 +13,75 @@ pub enum PeerSyncType {
Behind,
}
impl From<StatusMessage> for PeerSyncInfo {
fn from(status: StatusMessage) -> PeerSyncInfo {
PeerSyncInfo {
fork_digest: status.fork_digest,
finalized_root: status.finalized_root,
finalized_epoch: status.finalized_epoch,
head_root: status.head_root,
head_slot: status.head_slot,
impl PeerSyncType {
/// Builds the `PeerSyncStatus` that corresponds to this sync type.
///
/// The mapping is one-to-one: `FullySynced` becomes `Synced`, `Behind` stays `Behind` and
/// `Advanced` stays `Advanced`. In every case the returned status holds a clone of `info`,
/// so the caller keeps ownership of the `SyncInfo` it passed in.
pub fn as_sync_status(&self, info: &SyncInfo) -> PeerSyncStatus {
match self {
PeerSyncType::FullySynced => PeerSyncStatus::Synced { info: info.clone() },
PeerSyncType::Behind => PeerSyncStatus::Behind { info: info.clone() },
PeerSyncType::Advanced => PeerSyncStatus::Advanced { info: info.clone() },
}
}
}
impl Into<SyncInfo> for PeerSyncInfo {
fn into(self) -> SyncInfo {
SyncInfo {
status_head_slot: self.head_slot,
status_head_root: self.head_root,
status_finalized_epoch: self.finalized_epoch,
status_finalized_root: self.finalized_root,
}
}
}
pub fn remote_sync_type<T: BeaconChainTypes>(
local: &SyncInfo,
remote: &SyncInfo,
chain: &BeaconChain<T>,
) -> PeerSyncType {
// auxiliary variables for clarity: Inclusive boundaries of the range in which we consider a peer's
// head "near" ours.
let near_range_start = local.head_slot - SLOT_IMPORT_TOLERANCE as u64;
let near_range_end = local.head_slot + SLOT_IMPORT_TOLERANCE as u64;
impl PeerSyncInfo {
/// Derives the peer sync information from a beacon chain.
pub fn from_chain<T: BeaconChainTypes>(chain: &Arc<BeaconChain<T>>) -> Option<PeerSyncInfo> {
Some(Self::from(status_message(chain)?))
}
/// Given another peer's `PeerSyncInfo` this will determine how useful that peer is to us in
/// regards to syncing. This returns the peer sync type that can then be handled by the
/// `SyncManager`.
pub fn peer_sync_type(&self, remote_peer_sync_info: &PeerSyncInfo) -> PeerSyncType {
// check if the peer is fully synced with our current chain
if self.is_fully_synced_peer(remote_peer_sync_info) {
PeerSyncType::FullySynced
}
// if not, check if the peer is ahead of our chain
else if self.is_advanced_peer(remote_peer_sync_info) {
PeerSyncType::Advanced
} else {
// the peer must be behind and not useful
match remote.finalized_epoch.cmp(&local.finalized_epoch) {
Ordering::Less => {
// The node has a lower finalized epoch, their chain is not useful to us. There are two
// cases where a node can have a lower finalized epoch:
//
// ## The node is on the same chain
//
// If a node is on the same chain but has a lower finalized epoch, their head must be
// lower than ours. Therefore, we have nothing to request from them.
//
// ## The node is on a fork
//
// If a node is on a fork that has a lower finalized epoch, switching to that fork would
// cause us to revert a finalized block. This is not permitted, therefore we have no
// interest in their blocks.
//
// We keep these peers to allow them to sync from us.
PeerSyncType::Behind
}
}
/// Determines if another peer is fully synced with the current peer.
///
/// A fully synced peer is a peer whose finalized epoch and hash match our own and their
/// head is within SLOT_IMPORT_TOLERANCE of our own.
/// In this case we ignore any batch/range syncing.
fn is_fully_synced_peer(&self, remote: &PeerSyncInfo) -> bool {
// ensure we are on the same chain, with minor differing heads
if remote.finalized_epoch == self.finalized_epoch
&& remote.finalized_root == self.finalized_root
{
// that we are within SLOT_IMPORT_TOLERANCE of our two heads
if (self.head_slot >= remote.head_slot
&& self.head_slot.sub(remote.head_slot).as_usize() <= SLOT_IMPORT_TOLERANCE)
|| (self.head_slot < remote.head_slot
&& remote.head_slot.sub(self.head_slot).as_usize() <= SLOT_IMPORT_TOLERANCE)
Ordering::Equal => {
// NOTE: if a peer has our same `finalized_epoch` with a different `finalized_root`
// they are not considered relevant and won't be propagated to sync.
// Check if the peer is inside the tolerance range to be considered synced.
if remote.head_slot < near_range_start {
PeerSyncType::Behind
} else if remote.head_slot > near_range_end
&& !chain.fork_choice.read().contains_block(&remote.head_root)
{
return true;
// This peer has a head ahead enough of ours and we have no knowledge of their best
// block.
PeerSyncType::Advanced
} else {
// This peer is either in the tolerance range, or ahead of us with an already rejected
// block.
PeerSyncType::FullySynced
}
}
Ordering::Greater => {
if (local.finalized_epoch + 1 == remote.finalized_epoch
&& near_range_start <= remote.head_slot
&& remote.head_slot <= near_range_end)
|| chain.fork_choice.read().contains_block(&remote.head_root)
{
// This peer is near enough to us to be considered synced, or
// we have already synced up to this peer's head
PeerSyncType::FullySynced
} else {
PeerSyncType::Advanced
}
}
false
}
/// Determines if a peer has more knowledge about the current chain than we do.
///
/// There are two conditions here.
/// 1) The peer could have a head slot that is greater
/// than SLOT_IMPORT_TOLERANCE of our current head.
/// 2) The peer has a greater finalized slot/epoch than our own.
fn is_advanced_peer(&self, remote: &PeerSyncInfo) -> bool {
remote.head_slot.sub(self.head_slot).as_usize() > SLOT_IMPORT_TOLERANCE
|| self.finalized_epoch < remote.finalized_epoch
}
}

View File

@@ -7,9 +7,9 @@ use super::chain::{ChainId, ProcessingResult, RemoveChain, SyncingChain};
use super::sync_type::RangeSyncType;
use crate::beacon_processor::WorkEvent as BeaconWorkEvent;
use crate::sync::network_context::SyncNetworkContext;
use crate::sync::PeerSyncInfo;
use beacon_chain::{BeaconChain, BeaconChainTypes};
use eth2_libp2p::PeerId;
use eth2_libp2p::SyncInfo;
use fnv::FnvHashMap;
use slog::{crit, debug, error};
use smallvec::SmallVec;
@@ -185,35 +185,22 @@ impl<T: BeaconChainTypes> ChainCollection<T> {
pub fn update(
&mut self,
network: &mut SyncNetworkContext<T::EthSpec>,
awaiting_head_peers: &mut HashMap<PeerId, PeerSyncInfo>,
local: &SyncInfo,
awaiting_head_peers: &mut HashMap<PeerId, SyncInfo>,
beacon_processor_send: &mpsc::Sender<BeaconWorkEvent<T::EthSpec>>,
) {
let (local_finalized_epoch, local_head_epoch) =
match PeerSyncInfo::from_chain(&self.beacon_chain) {
None => {
return error!(
self.log,
"Failed to get peer sync info";
"msg" => "likely due to head lock contention"
)
}
Some(local) => (
local.finalized_epoch,
local.head_slot.epoch(T::EthSpec::slots_per_epoch()),
),
};
// Remove any outdated finalized/head chains
self.purge_outdated_chains(awaiting_head_peers);
self.purge_outdated_chains(local, awaiting_head_peers);
let local_head_epoch = local.head_slot.epoch(T::EthSpec::slots_per_epoch());
// Choose the best finalized chain if one needs to be selected.
self.update_finalized_chains(network, local_finalized_epoch, local_head_epoch);
self.update_finalized_chains(network, local.finalized_epoch, local_head_epoch);
if !matches!(self.state, RangeSyncState::Finalized(_)) {
// Handle head syncing chains if there are no finalized chains left.
self.update_head_chains(
network,
local_finalized_epoch,
local.finalized_epoch,
local_head_epoch,
awaiting_head_peers,
beacon_processor_send,
@@ -329,7 +316,7 @@ impl<T: BeaconChainTypes> ChainCollection<T> {
network: &mut SyncNetworkContext<T::EthSpec>,
local_epoch: Epoch,
local_head_epoch: Epoch,
awaiting_head_peers: &mut HashMap<PeerId, PeerSyncInfo>,
awaiting_head_peers: &mut HashMap<PeerId, SyncInfo>,
beacon_processor_send: &mpsc::Sender<BeaconWorkEvent<T::EthSpec>>,
) {
// Include the awaiting head peers
@@ -404,19 +391,9 @@ impl<T: BeaconChainTypes> ChainCollection<T> {
/// finalized block slot. Peers that would create outdated chains are removed too.
pub fn purge_outdated_chains(
&mut self,
awaiting_head_peers: &mut HashMap<PeerId, PeerSyncInfo>,
local_info: &SyncInfo,
awaiting_head_peers: &mut HashMap<PeerId, SyncInfo>,
) {
let local_info = match PeerSyncInfo::from_chain(&self.beacon_chain) {
Some(local) => local,
None => {
return error!(
self.log,
"Failed to get peer sync info";
"msg" => "likely due to head lock contention"
)
}
};
let local_finalized_slot = local_info
.finalized_epoch
.start_slot(T::EthSpec::slots_per_epoch());

View File

@@ -43,12 +43,13 @@ use super::chain::{ChainId, RemoveChain, SyncingChain};
use super::chain_collection::ChainCollection;
use super::sync_type::RangeSyncType;
use crate::beacon_processor::WorkEvent as BeaconWorkEvent;
use crate::router::processor::status_message;
use crate::sync::network_context::SyncNetworkContext;
use crate::sync::BatchProcessResult;
use crate::sync::PeerSyncInfo;
use crate::sync::RequestId;
use beacon_chain::{BeaconChain, BeaconChainTypes};
use eth2_libp2p::PeerId;
use eth2_libp2p::SyncInfo;
use slog::{crit, debug, error, trace};
use std::collections::HashMap;
use std::sync::Arc;
@@ -63,7 +64,7 @@ pub struct RangeSync<T: BeaconChainTypes> {
beacon_chain: Arc<BeaconChain<T>>,
/// Last known sync info of our useful connected peers. We use this information to create Head
/// chains after all finalized chains have ended.
awaiting_head_peers: HashMap<PeerId, PeerSyncInfo>,
awaiting_head_peers: HashMap<PeerId, SyncInfo>,
/// A collection of chains that need to be downloaded. This stores any head or finalized chains
/// that need to be downloaded.
chains: ChainCollection<T>,
@@ -100,22 +101,15 @@ impl<T: BeaconChainTypes> RangeSync<T> {
pub fn add_peer(
&mut self,
network: &mut SyncNetworkContext<T::EthSpec>,
local_info: SyncInfo,
peer_id: PeerId,
remote_info: PeerSyncInfo,
remote_info: SyncInfo,
) {
// evaluate which chain to sync from
// determine if we need to run a sync to the nearest finalized state or simply sync to
// its current head
let local_info = match PeerSyncInfo::from_chain(&self.beacon_chain) {
Some(local) => local,
None => {
return error!(self.log, "Failed to get peer sync info";
"msg" => "likely due to head lock contention")
}
};
// convenience variable
let remote_finalized_slot = remote_info
.finalized_epoch
@@ -146,6 +140,7 @@ impl<T: BeaconChainTypes> RangeSync<T> {
self.chains.update(
network,
&local_info,
&mut self.awaiting_head_peers,
&self.beacon_processor_send,
);
@@ -182,6 +177,7 @@ impl<T: BeaconChainTypes> RangeSync<T> {
);
self.chains.update(
network,
&local_info,
&mut self.awaiting_head_peers,
&self.beacon_processor_send,
);
@@ -345,9 +341,23 @@ impl<T: BeaconChainTypes> RangeSync<T> {
network.status_peers(self.beacon_chain.clone(), chain.peers());
let local = match status_message(&self.beacon_chain) {
Ok(status) => SyncInfo {
head_slot: status.head_slot,
head_root: status.head_root,
finalized_epoch: status.finalized_epoch,
finalized_root: status.finalized_root,
},
Err(e) => {
return error!(self.log, "Failed to get peer sync info";
"msg" => "likely due to head lock contention", "err" => ?e)
}
};
// update the state of the collection
self.chains.update(
network,
&local,
&mut self.awaiting_head_peers,
&self.beacon_processor_send,
);

View File

@@ -1,8 +1,8 @@
//! Contains logic about identifying which Sync to perform given PeerSyncInfo of ourselves and
//! of a remote.
use crate::sync::PeerSyncInfo;
use beacon_chain::{BeaconChain, BeaconChainTypes};
use eth2_libp2p::SyncInfo;
use std::sync::Arc;
/// The type of Range sync that should be done relative to our current state.
@@ -19,8 +19,8 @@ impl RangeSyncType {
/// `PeerSyncInfo`.
pub fn new<T: BeaconChainTypes>(
chain: &Arc<BeaconChain<T>>,
local_info: &PeerSyncInfo,
remote_info: &PeerSyncInfo,
local_info: &SyncInfo,
remote_info: &SyncInfo,
) -> RangeSyncType {
// Check for finalized chain sync
//