Update Syncing logic (#1042)

* Prevent duplicate parent block lookups

* Updates logic for handling re-status'd peers

* Allow block lookup if the block is close to head

* Correct ordering of sync logs

* Remove comments in block processor, clean up sim
This commit is contained in:
Age Manning
2020-04-22 23:58:10 +10:00
committed by GitHub
parent aacec7a4a7
commit 0b82e9f8a9
14 changed files with 386 additions and 201 deletions

View File

@@ -35,16 +35,15 @@
use super::block_processor::{spawn_block_processor, BatchProcessResult, ProcessId};
use super::network_context::SyncNetworkContext;
use super::peer_sync_info::{PeerSyncInfo, PeerSyncType};
use super::range_sync::{BatchId, ChainId, RangeSync};
use crate::router::processor::PeerSyncInfo;
use crate::service::NetworkMessage;
use beacon_chain::{BeaconChain, BeaconChainTypes, BlockProcessingOutcome};
use eth2_libp2p::rpc::{methods::*, RequestId};
use eth2_libp2p::types::NetworkGlobals;
use eth2_libp2p::{PeerId, PeerSyncStatus};
use eth2_libp2p::PeerId;
use fnv::FnvHashMap;
use futures::prelude::*;
use rand::seq::SliceRandom;
use slog::{crit, debug, error, info, trace, warn, Logger};
use smallvec::SmallVec;
use std::boxed::Box;
@@ -56,7 +55,7 @@ use types::{EthSpec, Hash256, SignedBeaconBlock, Slot};
/// The number of slots ahead of us that is allowed before requesting a long-range (batch) Sync
/// from a peer. If a peer is within this tolerance (forwards or backwards), it is treated as a
/// fully sync'd peer.
const SLOT_IMPORT_TOLERANCE: usize = 20;
pub const SLOT_IMPORT_TOLERANCE: usize = 20;
/// How many attempts we make to find a parent of a block before we give up trying.
const PARENT_FAIL_TOLERANCE: usize = 5;
/// The maximum depth we will search for a parent block. In principle we should have sync'd any
@@ -153,7 +152,7 @@ pub struct SyncManager<T: BeaconChainTypes> {
/// received or not.
///
/// The flag allows us to determine if the peer returned data or sent us nothing.
single_block_lookups: FnvHashMap<RequestId, (Hash256, bool)>,
single_block_lookups: FnvHashMap<RequestId, SingleBlockRequest>,
/// The logger for the import manager.
log: Logger,
@@ -162,6 +161,23 @@ pub struct SyncManager<T: BeaconChainTypes> {
sync_send: mpsc::UnboundedSender<SyncMessage<T::EthSpec>>,
}
/// State tracked for one outstanding single-block lookup.
struct SingleBlockRequest {
    /// Root hash of the block that was requested.
    pub hash: Hash256,
    /// `true` once the peer has returned a block for this request; `false` means nothing has
    /// been received (an empty response if still `false` when the stream terminates).
    pub block_returned: bool,
}

impl SingleBlockRequest {
    /// Builds a lookup for `hash` that has not yet had a block returned.
    pub fn new(hash: Hash256) -> Self {
        SingleBlockRequest {
            block_returned: false,
            hash,
        }
    }
}
/// Spawns a new `SyncManager` thread which has a weak reference to underlying beacon
/// chain. This allows the chain to be
/// dropped during the syncing process which will gracefully end the `SyncManager`.
@@ -225,7 +241,7 @@ impl<T: BeaconChainTypes> SyncManager<T> {
/// ours that we consider it fully sync'd with respect to our current chain.
fn add_peer(&mut self, peer_id: PeerId, remote: PeerSyncInfo) {
// ensure the beacon chain still exists
let local = match PeerSyncInfo::from_chain(&self.chain) {
let local_peer_info = match PeerSyncInfo::from_chain(&self.chain) {
Some(local) => local,
None => {
return error!(
@@ -236,29 +252,33 @@ impl<T: BeaconChainTypes> SyncManager<T> {
}
};
// If a peer is within SLOT_IMPORT_TOLERANCE from our head slot, ignore a batch/range sync,
// consider it a fully-sync'd peer.
if remote.head_slot.sub(local.head_slot).as_usize() < SLOT_IMPORT_TOLERANCE {
trace!(self.log, "Peer synced to our head found";
"peer" => format!("{:?}", peer_id),
"peer_head_slot" => remote.head_slot,
"local_head_slot" => local.head_slot,
);
self.synced_peer(&peer_id, remote.head_slot);
// notify the range sync that a peer has been added
self.range_sync.fully_synced_peer_found();
return;
}
// Check if the peer is significantly behind us. If within `SLOT_IMPORT_TOLERANCE`
// treat them as a fully synced peer. If not, ignore them in the sync process
if local.head_slot.sub(remote.head_slot).as_usize() < SLOT_IMPORT_TOLERANCE {
// Add the peer to our RangeSync
self.range_sync
.add_peer(&mut self.network, peer_id.clone(), remote);
self.synced_peer(&peer_id, remote.head_slot);
} else {
self.behind_peer(&peer_id, remote.head_slot);
// Classify the remote peer relative to our own chain view and dispatch accordingly.
match local_peer_info.peer_sync_type(&remote) {
    PeerSyncType::FullySynced => {
        trace!(self.log, "Peer synced to our head found";
            "peer" => format!("{:?}", peer_id),
            "peer_head_slot" => remote.head_slot,
            "local_head_slot" => local_peer_info.head_slot,
        );
        self.synced_peer(&peer_id, remote);
        // notify the range sync that a peer has been added
        self.range_sync.fully_synced_peer_found();
    }
    PeerSyncType::Advanced => {
        // Each log key is paired with the matching side: `remote_*` values come from the
        // peer's status, `local_*` values from our own chain info.
        trace!(self.log, "Useful peer for sync found";
            "peer" => format!("{:?}", peer_id),
            "peer_head_slot" => remote.head_slot,
            "local_head_slot" => local_peer_info.head_slot,
            "remote_finalized_epoch" => remote.finalized_epoch,
            "local_finalized_epoch" => local_peer_info.finalized_epoch,
        );
        // Add the peer to our RangeSync
        self.range_sync
            .add_peer(&mut self.network, peer_id.clone(), remote);
        self.advanced_peer(&peer_id, remote);
    }
    PeerSyncType::Behind => {
        // The peer is not added to range sync; only its sync status is recorded.
        self.behind_peer(&peer_id, remote);
    }
}
}
@@ -280,12 +300,10 @@ impl<T: BeaconChainTypes> SyncManager<T> {
// check if this is a single block lookup - i.e we were searching for a specific hash
let mut single_block_hash = None;
if let Some((block_hash, data_received)) =
self.single_block_lookups.get_mut(&request_id)
{
if let Some(block_request) = self.single_block_lookups.get_mut(&request_id) {
// update the state of the lookup indicating a block was received from the peer
*data_received = true;
single_block_hash = Some(block_hash.clone());
block_request.block_returned = true;
single_block_hash = Some(block_request.hash.clone());
}
if let Some(block_hash) = single_block_hash {
self.single_block_lookup_response(peer_id, block, block_hash);
@@ -316,12 +334,10 @@ impl<T: BeaconChainTypes> SyncManager<T> {
// this is a stream termination
// stream termination for a single block lookup, remove the key
if let Some((block_hash, data_received)) =
self.single_block_lookups.remove(&request_id)
{
if let Some(single_block_request) = self.single_block_lookups.remove(&request_id) {
// the peer didn't respond with a block that it referenced
if !data_received {
warn!(self.log, "Peer didn't respond with a block it referenced"; "referenced_block_hash" => format!("{}", block_hash), "peer_id" => format!("{}", peer_id));
if !single_block_request.block_returned {
warn!(self.log, "Peer didn't respond with a block it referenced"; "referenced_block_hash" => format!("{}", single_block_request.hash), "peer_id" => format!("{}", peer_id));
self.network.downvote_peer(peer_id);
}
return;
@@ -410,9 +426,24 @@ impl<T: BeaconChainTypes> SyncManager<T> {
/// A block has been sent to us that has an unknown parent. This begins a parent lookup search
/// to find the parent or chain of parents that match our current chain.
fn add_unknown_block(&mut self, peer_id: PeerId, block: SignedBeaconBlock<T::EthSpec>) {
// If we are not synced ignore the block
// If we are not synced or within SLOT_IMPORT_TOLERANCE of the block, ignore
if !self.network_globals.sync_state.read().is_synced() {
return;
let head_slot = self
.chain
.head_info()
.map(|info| info.slot)
.unwrap_or_else(|_| Slot::from(0u64));
let unknown_block_slot = block.message.slot;
// If the block is far from our current head (ahead or behind), ignore it. If it's within the
// slot tolerance of our current head, regardless of the syncing state, fetch it.
if (head_slot >= unknown_block_slot
&& head_slot.sub(unknown_block_slot).as_usize() > SLOT_IMPORT_TOLERANCE)
|| (head_slot < unknown_block_slot
&& unknown_block_slot.sub(head_slot).as_usize() > SLOT_IMPORT_TOLERANCE)
{
return;
}
}
// Make sure this block is not already being searched for
@@ -446,13 +477,23 @@ impl<T: BeaconChainTypes> SyncManager<T> {
return;
}
// Do not re-request a block that is already being requested
if self
.single_block_lookups
.values()
.find(|single_block_request| single_block_request.hash == block_hash)
.is_some()
{
return;
}
let request = BlocksByRootRequest {
block_roots: vec![block_hash],
};
if let Ok(request_id) = self.network.blocks_by_root_request(peer_id, request) {
self.single_block_lookups
.insert(request_id, (block_hash, false));
.insert(request_id, SingleBlockRequest::new(block_hash));
}
}
@@ -488,16 +529,10 @@ impl<T: BeaconChainTypes> SyncManager<T> {
}
/// Updates the syncing state of a peer to be synced.
fn synced_peer(&mut self, peer_id: &PeerId, status_head_slot: Slot) {
fn synced_peer(&mut self, peer_id: &PeerId, sync_info: PeerSyncInfo) {
if let Some(peer_info) = self.network_globals.peers.write().peer_info_mut(peer_id) {
match peer_info.sync_status {
PeerSyncStatus::Synced { .. } => {
peer_info.sync_status = PeerSyncStatus::Synced { status_head_slot }
} // just update block
PeerSyncStatus::Behind { .. } | PeerSyncStatus::Unknown => {
peer_info.sync_status = PeerSyncStatus::Synced { status_head_slot };
debug!(self.log, "Peer transitioned to synced status"; "peer_id" => format!("{}", peer_id));
}
if peer_info.sync_status.update_synced(sync_info.into()) {
debug!(self.log, "Peer transitioned to synced status"; "peer_id" => format!("{}", peer_id));
}
} else {
crit!(self.log, "Status'd peer is unknown"; "peer_id" => format!("{}", peer_id));
@@ -506,21 +541,24 @@ impl<T: BeaconChainTypes> SyncManager<T> {
}
/// Updates the syncing state of a peer to be behind.
fn behind_peer(&mut self, peer_id: &PeerId, status_head_slot: Slot) {
fn advanced_peer(&mut self, peer_id: &PeerId, sync_info: PeerSyncInfo) {
if let Some(peer_info) = self.network_globals.peers.write().peer_info_mut(peer_id) {
match peer_info.sync_status {
PeerSyncStatus::Synced { .. } => {
debug!(self.log, "Peer transitioned to from synced state to behind"; "peer_id" => format!("{}", peer_id), "head_slot" => status_head_slot);
peer_info.sync_status = PeerSyncStatus::Behind { status_head_slot }
}
PeerSyncStatus::Behind { .. } => {
peer_info.sync_status = PeerSyncStatus::Behind { status_head_slot }
} // just update
let advanced_slot = sync_info.head_slot;
if peer_info.sync_status.update_ahead(sync_info.into()) {
debug!(self.log, "Peer transitioned to from synced state to ahead"; "peer_id" => format!("{}", peer_id), "head_slot" => advanced_slot);
}
} else {
crit!(self.log, "Status'd peer is unknown"; "peer_id" => format!("{}", peer_id));
}
self.update_sync_state();
}
PeerSyncStatus::Unknown => {
debug!(self.log, "Peer transitioned to behind sync status"; "peer_id" => format!("{}", peer_id), "head_slot" => status_head_slot);
peer_info.sync_status = PeerSyncStatus::Behind { status_head_slot }
}
/// Updates the syncing state of a peer to be behind.
fn behind_peer(&mut self, peer_id: &PeerId, sync_info: PeerSyncInfo) {
if let Some(peer_info) = self.network_globals.peers.write().peer_info_mut(peer_id) {
let behind_slot = sync_info.head_slot;
if peer_info.sync_status.update_behind(sync_info.into()) {
debug!(self.log, "Peer transitioned to from synced state to behind"; "peer_id" => format!("{}", peer_id), "head_slot" => behind_slot);
}
} else {
crit!(self.log, "Status'd peer is unknown"; "peer_id" => format!("{}", peer_id));
@@ -665,20 +703,10 @@ impl<T: BeaconChainTypes> SyncManager<T> {
let request = BlocksByRootRequest {
block_roots: vec![parent_hash],
};
// select a random fully synced peer to attempt to download the parent block
let available_peers = self
.network_globals
.peers
.read()
.synced_peers()
.cloned()
.collect::<Vec<_>>();
let peer_id = if let Some(peer_id) = available_peers.choose(&mut rand::thread_rng()) {
(*peer_id).clone()
} else {
// there were no peers to choose from. We drop the lookup request
return;
};
// We continue to search for the chain of blocks from the same peer. Other peers are not
// guaranteed to have this chain of blocks.
let peer_id = parent_request.last_submitted_peer.clone();
if let Ok(request_id) = self.network.blocks_by_root_request(peer_id, request) {
// if the request was successful add the queue back into self