Global Sync access (#994)

* Connect sync logic to network globals

* Add further sync info to sync status

* Build new syncing HTTP API methods

* Fix bug in updating sync state

* Highest slot is current slot

* Update book for syncing API
This commit is contained in:
Age Manning
2020-04-14 18:17:35 +10:00
committed by GitHub
parent db7847c34a
commit e5874f4565
22 changed files with 818 additions and 399 deletions

View File

@@ -39,20 +39,19 @@ use super::range_sync::{BatchId, RangeSync};
use crate::router::processor::PeerSyncInfo;
use crate::service::NetworkMessage;
use beacon_chain::{BeaconChain, BeaconChainTypes, BlockProcessingOutcome};
use eth2_libp2p::rpc::methods::*;
use eth2_libp2p::rpc::RequestId;
use eth2_libp2p::PeerId;
use eth2_libp2p::rpc::{methods::*, RequestId};
use eth2_libp2p::types::NetworkGlobals;
use eth2_libp2p::{PeerId, PeerSyncStatus};
use fnv::FnvHashMap;
use futures::prelude::*;
use rand::seq::SliceRandom;
use slog::{crit, debug, error, info, trace, warn, Logger};
use smallvec::SmallVec;
use std::boxed::Box;
use std::collections::HashSet;
use std::ops::Sub;
use std::sync::Weak;
use std::sync::Arc;
use tokio::sync::{mpsc, oneshot};
use types::{EthSpec, Hash256, SignedBeaconBlock};
use types::{EthSpec, Hash256, SignedBeaconBlock, Slot};
/// The number of slots ahead of us that is allowed before requesting a long-range (batch) Sync
/// from a peer. If a peer is within this tolerance (forwards or backwards), it is treated as a
@@ -126,32 +125,16 @@ struct ParentRequests<T: EthSpec> {
pending: Option<RequestId>,
}
#[derive(PartialEq, Debug, Clone)]
/// The current state of the `ImportManager`.
enum ManagerState {
/// The manager is performing a long-range (batch) sync. In this mode, parent lookups are
/// disabled.
Syncing,
/// The manager is up to date with all known peers and is connected to at least one
/// fully-syncing peer. In this state, parent lookups are enabled.
Regular,
/// No useful peers are connected. Long-range sync's cannot proceed and we have no useful
/// peers to download parents for. More peers need to be connected before we can proceed.
Stalled,
}
/// The primary object for handling and driving all the current syncing logic. It maintains the
/// current state of the syncing process, the number of useful peers, downloaded blocks and
/// controls the logic behind both the long-range (batch) sync and the on-going potential parent
/// look-up of blocks.
pub struct SyncManager<T: BeaconChainTypes> {
/// A weak reference to the underlying beacon chain.
chain: Weak<BeaconChain<T>>,
/// A reference to the underlying beacon chain.
chain: Arc<BeaconChain<T>>,
/// The current state of the import manager.
state: ManagerState,
/// A reference to the network globals and peer-db.
network_globals: Arc<NetworkGlobals<T::EthSpec>>,
/// A receiving channel sent by the message processor thread.
input_channel: mpsc::UnboundedReceiver<SyncMessage<T::EthSpec>>,
@@ -171,9 +154,6 @@ pub struct SyncManager<T: BeaconChainTypes> {
/// The flag allows us to determine if the peer returned data or sent us nothing.
single_block_lookups: FnvHashMap<RequestId, (Hash256, bool)>,
/// The collection of known, connected, fully-sync'd peers.
full_peers: HashSet<PeerId>,
/// The logger for the import manager.
log: Logger,
@@ -186,7 +166,8 @@ pub struct SyncManager<T: BeaconChainTypes> {
/// dropped during the syncing process which will gracefully end the `SyncManager`.
pub fn spawn<T: BeaconChainTypes>(
executor: &tokio::runtime::TaskExecutor,
beacon_chain: Weak<BeaconChain<T>>,
beacon_chain: Arc<BeaconChain<T>>,
network_globals: Arc<NetworkGlobals<T::EthSpec>>,
network_send: mpsc::UnboundedSender<NetworkMessage<T::EthSpec>>,
log: slog::Logger,
) -> (
@@ -200,14 +181,18 @@ pub fn spawn<T: BeaconChainTypes>(
// create an instance of the SyncManager
let sync_manager = SyncManager {
chain: beacon_chain.clone(),
state: ManagerState::Stalled,
input_channel: sync_recv,
range_sync: RangeSync::new(
beacon_chain.clone(),
network_globals.clone(),
sync_send.clone(),
log.clone(),
),
network: SyncNetworkContext::new(network_send, log.clone()),
range_sync: RangeSync::new(beacon_chain, sync_send.clone(), log.clone()),
chain: beacon_chain,
network_globals,
input_channel: sync_recv,
parent_queue: SmallVec::new(),
single_block_lookups: FnvHashMap::default(),
full_peers: HashSet::new(),
log: log.clone(),
sync_send: sync_send.clone(),
};
@@ -239,17 +224,7 @@ impl<T: BeaconChainTypes> SyncManager<T> {
/// ours that we consider it fully sync'd with respect to our current chain.
fn add_peer(&mut self, peer_id: PeerId, remote: PeerSyncInfo) {
// ensure the beacon chain still exists
let chain = match self.chain.upgrade() {
Some(chain) => chain,
None => {
warn!(self.log,
"Beacon chain dropped. Peer not considered for sync";
"peer_id" => format!("{:?}", peer_id));
return;
}
};
let local = match PeerSyncInfo::from_chain(&chain) {
let local = match PeerSyncInfo::from_chain(&self.chain) {
Some(local) => local,
None => {
return error!(
@@ -263,12 +238,12 @@ impl<T: BeaconChainTypes> SyncManager<T> {
// If a peer is within SLOT_IMPORT_TOLERANCE from our head slot, ignore a batch/range sync,
// consider it a fully-sync'd peer.
if remote.head_slot.sub(local.head_slot).as_usize() < SLOT_IMPORT_TOLERANCE {
trace!(self.log, "Ignoring full sync with peer";
trace!(self.log, "Peer synced to our head found";
"peer" => format!("{:?}", peer_id),
"peer_head_slot" => remote.head_slot,
"local_head_slot" => local.head_slot,
);
self.add_full_peer(peer_id);
self.synced_peer(&peer_id, remote.head_slot);
// notify the range sync that a peer has been added
self.range_sync.fully_synced_peer_found();
return;
@@ -277,19 +252,14 @@ impl<T: BeaconChainTypes> SyncManager<T> {
// Check if the peer is significantly behind us. If within `SLOT_IMPORT_TOLERANCE`
// treat them as a fully synced peer. If not, ignore them in the sync process
if local.head_slot.sub(remote.head_slot).as_usize() < SLOT_IMPORT_TOLERANCE {
self.add_full_peer(peer_id.clone());
self.synced_peer(&peer_id, remote.head_slot);
} else {
debug!(
self.log,
"Out of sync peer connected";
"peer" => format!("{:?}", peer_id),
);
self.behind_peer(&peer_id, remote.head_slot);
return;
}
// Add the peer to our RangeSync
self.range_sync.add_peer(&mut self.network, peer_id, remote);
self.update_state();
}
/// The response to a `BlocksByRoot` request.
@@ -398,43 +368,41 @@ impl<T: BeaconChainTypes> SyncManager<T> {
}
// we have the correct block, try and process it
if let Some(chain) = self.chain.upgrade() {
match BlockProcessingOutcome::shim(chain.process_block(block.clone())) {
Ok(outcome) => {
match outcome {
BlockProcessingOutcome::Processed { block_root } => {
info!(self.log, "Processed block"; "block" => format!("{}", block_root));
match BlockProcessingOutcome::shim(self.chain.process_block(block.clone())) {
Ok(outcome) => {
match outcome {
BlockProcessingOutcome::Processed { block_root } => {
info!(self.log, "Processed block"; "block" => format!("{}", block_root));
match chain.fork_choice() {
Ok(()) => trace!(
self.log,
"Fork choice success";
"location" => "single block"
),
Err(e) => error!(
self.log,
"Fork choice failed";
"error" => format!("{:?}", e),
"location" => "single block"
),
}
}
BlockProcessingOutcome::ParentUnknown { .. } => {
// We don't know of the blocks parent, begin a parent lookup search
self.add_unknown_block(peer_id, block);
}
BlockProcessingOutcome::BlockIsAlreadyKnown => {
trace!(self.log, "Single block lookup already known");
}
_ => {
warn!(self.log, "Single block lookup failed"; "outcome" => format!("{:?}", outcome));
self.network.downvote_peer(peer_id);
match self.chain.fork_choice() {
Ok(()) => trace!(
self.log,
"Fork choice success";
"location" => "single block"
),
Err(e) => error!(
self.log,
"Fork choice failed";
"error" => format!("{:?}", e),
"location" => "single block"
),
}
}
BlockProcessingOutcome::ParentUnknown { .. } => {
// We don't know of the blocks parent, begin a parent lookup search
self.add_unknown_block(peer_id, block);
}
BlockProcessingOutcome::BlockIsAlreadyKnown => {
trace!(self.log, "Single block lookup already known");
}
_ => {
warn!(self.log, "Single block lookup failed"; "outcome" => format!("{:?}", outcome));
self.network.downvote_peer(peer_id);
}
}
Err(e) => {
warn!(self.log, "Unexpected block processing error"; "error" => format!("{:?}", e));
}
}
Err(e) => {
warn!(self.log, "Unexpected block processing error"; "error" => format!("{:?}", e));
}
}
}
@@ -442,8 +410,8 @@ impl<T: BeaconChainTypes> SyncManager<T> {
/// A block has been sent to us that has an unknown parent. This begins a parent lookup search
/// to find the parent or chain of parents that match our current chain.
fn add_unknown_block(&mut self, peer_id: PeerId, block: SignedBeaconBlock<T::EthSpec>) {
// If we are not in regular sync mode, ignore this block
if self.state != ManagerState::Regular {
// If we are not synced ignore the block
if !self.network_globals.sync_state.read().is_synced() {
return;
}
@@ -473,8 +441,8 @@ impl<T: BeaconChainTypes> SyncManager<T> {
/// A request to search for a block hash has been received. This function begins a BlocksByRoot
/// request to find the requested block.
fn search_for_block(&mut self, peer_id: PeerId, block_hash: Hash256) {
// If we are not in regular sync mode, ignore this block
if self.state != ManagerState::Regular {
// If we are not synced, ignore this block
if !self.network_globals.sync_state.read().is_synced() {
return;
}
@@ -516,42 +484,60 @@ impl<T: BeaconChainTypes> SyncManager<T> {
fn peer_disconnect(&mut self, peer_id: &PeerId) {
self.range_sync.peer_disconnect(&mut self.network, peer_id);
self.full_peers.remove(peer_id);
self.update_state();
self.update_sync_state();
}
fn add_full_peer(&mut self, peer_id: PeerId) {
debug!(
self.log, "Fully synced peer added";
"peer" => format!("{:?}", peer_id),
);
self.full_peers.insert(peer_id);
/// Updates the syncing state of a peer to be synced.
fn synced_peer(&mut self, peer_id: &PeerId, status_head_slot: Slot) {
if let Some(peer_info) = self.network_globals.peers.write().peer_info_mut(peer_id) {
match peer_info.sync_status {
PeerSyncStatus::Synced { .. } => {
peer_info.sync_status = PeerSyncStatus::Synced { status_head_slot }
} // just update block
PeerSyncStatus::Behind { .. } | PeerSyncStatus::Unknown => {
peer_info.sync_status = PeerSyncStatus::Synced { status_head_slot };
debug!(self.log, "Peer transitioned to synced status"; "peer_id" => format!("{}", peer_id));
}
}
} else {
crit!(self.log, "Status'd peer is unknown"; "peer_id" => format!("{}", peer_id));
}
self.update_sync_state();
}
/// Updates the syncing state of a peer to be behind.
fn behind_peer(&mut self, peer_id: &PeerId, status_head_slot: Slot) {
if let Some(peer_info) = self.network_globals.peers.write().peer_info_mut(peer_id) {
match peer_info.sync_status {
PeerSyncStatus::Synced { .. } => {
debug!(self.log, "Peer transitioned to from synced state to behind"; "peer_id" => format!("{}", peer_id), "head_slot" => status_head_slot);
peer_info.sync_status = PeerSyncStatus::Behind { status_head_slot }
}
PeerSyncStatus::Behind { .. } => {
peer_info.sync_status = PeerSyncStatus::Behind { status_head_slot }
} // just update
PeerSyncStatus::Unknown => {
debug!(self.log, "Peer transitioned to behind sync status"; "peer_id" => format!("{}", peer_id), "head_slot" => status_head_slot);
peer_info.sync_status = PeerSyncStatus::Behind { status_head_slot }
}
}
} else {
crit!(self.log, "Status'd peer is unknown"; "peer_id" => format!("{}", peer_id));
}
self.update_sync_state();
}
/// Updates the global sync state and logs any changes.
fn update_sync_state(&mut self) {
if let Some((old_state, new_state)) = self.network_globals.update_sync_state() {
info!(self.log, "Sync state updated"; "old_state" => format!("{}", old_state), "new_state" => format!("{}",new_state));
}
}
/* Processing State Functions */
// These functions are called in the main poll function to transition the state of the sync
// manager
/// Updates the syncing state of the `SyncManager`.
fn update_state(&mut self) {
let previous_state = self.state.clone();
self.state = {
if self.range_sync.is_syncing() {
ManagerState::Syncing
} else if !self.full_peers.is_empty() {
ManagerState::Regular
} else {
ManagerState::Stalled
}
};
if self.state != previous_state {
info!(self.log, "Syncing state updated";
"old_state" => format!("{:?}", previous_state),
"new_state" => format!("{:?}", self.state),
);
}
}
/// A new block has been received for a parent lookup query, process it.
fn process_parent_request(&mut self, mut parent_request: ParentRequests<T::EthSpec>) {
// verify the last added block is the parent of the last requested block
@@ -598,55 +584,50 @@ impl<T: BeaconChainTypes> SyncManager<T> {
// If the last block in the queue has an unknown parent, we continue the parent
// lookup-search.
if let Some(chain) = self.chain.upgrade() {
let newest_block = parent_request
.downloaded_blocks
.pop()
.expect("There is always at least one block in the queue");
match BlockProcessingOutcome::shim(chain.process_block(newest_block.clone())) {
Ok(BlockProcessingOutcome::ParentUnknown { .. }) => {
// need to keep looking for parents
// add the block back to the queue and continue the search
parent_request.downloaded_blocks.push(newest_block);
self.request_parent(parent_request);
return;
}
Ok(BlockProcessingOutcome::Processed { .. })
| Ok(BlockProcessingOutcome::BlockIsAlreadyKnown { .. }) => {
spawn_block_processor(
self.chain.clone(),
ProcessId::ParentLookup(parent_request.last_submitted_peer.clone()),
parent_request.downloaded_blocks,
self.sync_send.clone(),
self.log.clone(),
);
}
Ok(outcome) => {
// all else we consider the chain a failure and downvote the peer that sent
// us the last block
warn!(
self.log, "Invalid parent chain. Downvoting peer";
"outcome" => format!("{:?}", outcome),
"last_peer" => format!("{:?}", parent_request.last_submitted_peer),
);
self.network
.downvote_peer(parent_request.last_submitted_peer.clone());
return;
}
Err(e) => {
warn!(
self.log, "Parent chain processing error. Downvoting peer";
"error" => format!("{:?}", e),
"last_peer" => format!("{:?}", parent_request.last_submitted_peer),
);
self.network
.downvote_peer(parent_request.last_submitted_peer.clone());
return;
}
let newest_block = parent_request
.downloaded_blocks
.pop()
.expect("There is always at least one block in the queue");
match BlockProcessingOutcome::shim(self.chain.process_block(newest_block.clone())) {
Ok(BlockProcessingOutcome::ParentUnknown { .. }) => {
// need to keep looking for parents
// add the block back to the queue and continue the search
parent_request.downloaded_blocks.push(newest_block);
self.request_parent(parent_request);
return;
}
Ok(BlockProcessingOutcome::Processed { .. })
| Ok(BlockProcessingOutcome::BlockIsAlreadyKnown { .. }) => {
spawn_block_processor(
Arc::downgrade(&self.chain),
ProcessId::ParentLookup(parent_request.last_submitted_peer.clone()),
parent_request.downloaded_blocks,
self.sync_send.clone(),
self.log.clone(),
);
}
Ok(outcome) => {
// all else we consider the chain a failure and downvote the peer that sent
// us the last block
warn!(
self.log, "Invalid parent chain. Downvoting peer";
"outcome" => format!("{:?}", outcome),
"last_peer" => format!("{:?}", parent_request.last_submitted_peer),
);
self.network
.downvote_peer(parent_request.last_submitted_peer.clone());
return;
}
Err(e) => {
warn!(
self.log, "Parent chain processing error. Downvoting peer";
"error" => format!("{:?}", e),
"last_peer" => format!("{:?}", parent_request.last_submitted_peer),
);
self.network
.downvote_peer(parent_request.last_submitted_peer.clone());
return;
}
} else {
// chain doesn't exist, drop the parent queue and return
return;
}
}
}
@@ -678,9 +659,15 @@ impl<T: BeaconChainTypes> SyncManager<T> {
block_roots: vec![parent_hash],
};
// select a random fully synced peer to attempt to download the parent block
let available_peers = self.full_peers.iter().collect::<Vec<_>>();
let available_peers = self
.network_globals
.peers
.read()
.synced_peers()
.cloned()
.collect::<Vec<_>>();
let peer_id = if let Some(peer_id) = available_peers.choose(&mut rand::thread_rng()) {
(**peer_id).clone()
(*peer_id).clone()
} else {
// there were no peers to choose from. We drop the lookup request
return;
@@ -763,9 +750,6 @@ impl<T: BeaconChainTypes> Future for SyncManager<T> {
}
}
// update the state of the manager
self.update_state();
Ok(Async::NotReady)
}
}

View File

@@ -8,7 +8,7 @@ use eth2_libp2p::rpc::methods::*;
use eth2_libp2p::rpc::{RPCEvent, RPCRequest, RequestId};
use eth2_libp2p::PeerId;
use slog::{debug, trace, warn};
use std::sync::Weak;
use std::sync::Arc;
use tokio::sync::mpsc;
use types::EthSpec;
@@ -34,24 +34,22 @@ impl<T: EthSpec> SyncNetworkContext<T> {
pub fn status_peer<U: BeaconChainTypes>(
&mut self,
chain: Weak<BeaconChain<U>>,
chain: Arc<BeaconChain<U>>,
peer_id: PeerId,
) {
if let Some(chain) = chain.upgrade() {
if let Some(status_message) = status_message(&chain) {
debug!(
self.log,
"Sending Status Request";
"peer" => format!("{:?}", peer_id),
"fork_digest" => format!("{:?}", status_message.fork_digest),
"finalized_root" => format!("{:?}", status_message.finalized_root),
"finalized_epoch" => format!("{:?}", status_message.finalized_epoch),
"head_root" => format!("{}", status_message.head_root),
"head_slot" => format!("{}", status_message.head_slot),
);
if let Some(status_message) = status_message(&chain) {
debug!(
self.log,
"Sending Status Request";
"peer" => format!("{:?}", peer_id),
"fork_digest" => format!("{:?}", status_message.fork_digest),
"finalized_root" => format!("{:?}", status_message.finalized_root),
"finalized_epoch" => format!("{:?}", status_message.finalized_epoch),
"head_root" => format!("{}", status_message.head_root),
"head_slot" => format!("{}", status_message.head_slot),
);
let _ = self.send_rpc_request(peer_id, RPCRequest::Status(status_message));
}
let _ = self.send_rpc_request(peer_id, RPCRequest::Status(status_message));
}
}

View File

@@ -8,7 +8,7 @@ use eth2_libp2p::PeerId;
use rand::prelude::*;
use slog::{crit, debug, warn};
use std::collections::HashSet;
use std::sync::Weak;
use std::sync::Arc;
use tokio::sync::mpsc;
use types::{Hash256, SignedBeaconBlock, Slot};
@@ -82,7 +82,8 @@ pub struct SyncingChain<T: BeaconChainTypes> {
/// back once batch processing has completed.
sync_send: mpsc::UnboundedSender<SyncMessage<T::EthSpec>>,
chain: Weak<BeaconChain<T>>,
/// A reference to the underlying beacon chain.
chain: Arc<BeaconChain<T>>,
/// A reference to the sync logger.
log: slog::Logger,
@@ -103,7 +104,7 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
target_head_root: Hash256,
peer_id: PeerId,
sync_send: mpsc::UnboundedSender<SyncMessage<T::EthSpec>>,
chain: Weak<BeaconChain<T>>,
chain: Arc<BeaconChain<T>>,
log: slog::Logger,
) -> Self {
let mut peer_pool = HashSet::new();
@@ -244,7 +245,7 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
let batch_id = ProcessId::RangeBatchId(batch.id.clone());
self.current_processing_batch = Some(batch);
spawn_block_processor(
self.chain.clone(),
Arc::downgrade(&self.chain.clone()),
batch_id,
downloaded_blocks,
self.sync_send.clone(),

View File

@@ -8,48 +8,138 @@ use crate::router::processor::PeerSyncInfo;
use crate::sync::manager::SyncMessage;
use crate::sync::network_context::SyncNetworkContext;
use beacon_chain::{BeaconChain, BeaconChainTypes};
use eth2_libp2p::PeerId;
use slog::{debug, error, warn};
use std::sync::Weak;
use eth2_libp2p::{types::SyncState, NetworkGlobals, PeerId};
use slog::{debug, error, info};
use std::sync::Arc;
use tokio::sync::mpsc;
use types::EthSpec;
use types::{Hash256, Slot};
/// The state of the long range/batch sync.
pub enum SyncState {
#[derive(Clone)]
pub enum RangeSyncState {
/// A finalized chain is being synced.
Finalized,
Finalized {
/// The start of the finalized chain.
start_slot: Slot,
/// The target head slot of the finalized chain.
head_slot: Slot,
/// The target head root of the finalized chain.
head_root: Hash256,
},
/// There are no finalized chains and we are syncing one more head chains.
Head,
Head {
/// The last finalized checkpoint for all head chains.
start_slot: Slot,
/// The largest known slot to sync to.
head_slot: Slot,
},
/// There are no head or finalized chains and no long range sync is in progress.
Idle,
}
impl PartialEq for RangeSyncState {
fn eq(&self, other: &Self) -> bool {
match (self, other) {
(RangeSyncState::Finalized { .. }, RangeSyncState::Finalized { .. }) => true,
(RangeSyncState::Head { .. }, RangeSyncState::Head { .. }) => true,
(RangeSyncState::Idle, RangeSyncState::Idle) => true,
_ => false,
}
}
}
impl Into<SyncState> for RangeSyncState {
fn into(self) -> SyncState {
match self {
RangeSyncState::Finalized {
start_slot,
head_slot,
head_root,
} => SyncState::SyncingFinalized {
start_slot,
head_slot,
head_root,
},
RangeSyncState::Head {
start_slot,
head_slot,
} => SyncState::SyncingHead {
start_slot,
head_slot,
},
RangeSyncState::Idle => SyncState::Stalled, // this should never really be used
}
}
}
/// A collection of finalized and head chains currently being processed.
pub struct ChainCollection<T: BeaconChainTypes> {
/// The beacon chain for processing.
beacon_chain: Weak<BeaconChain<T>>,
beacon_chain: Arc<BeaconChain<T>>,
/// A reference to the global network parameters.
network_globals: Arc<NetworkGlobals<T::EthSpec>>,
/// The set of finalized chains being synced.
finalized_chains: Vec<SyncingChain<T>>,
/// The set of head chains being synced.
head_chains: Vec<SyncingChain<T>>,
/// The current sync state of the process.
sync_state: SyncState,
state: RangeSyncState,
/// Logger for the collection.
log: slog::Logger,
}
impl<T: BeaconChainTypes> ChainCollection<T> {
pub fn new(beacon_chain: Weak<BeaconChain<T>>) -> Self {
pub fn new(
beacon_chain: Arc<BeaconChain<T>>,
network_globals: Arc<NetworkGlobals<T::EthSpec>>,
log: slog::Logger,
) -> Self {
ChainCollection {
sync_state: SyncState::Idle,
beacon_chain,
network_globals,
finalized_chains: Vec::new(),
head_chains: Vec::new(),
beacon_chain,
state: RangeSyncState::Idle,
log,
}
}
/// The current syncing state.
pub fn sync_state(&self) -> &SyncState {
&self.sync_state
pub fn state(&self) -> &RangeSyncState {
&self.state
}
/// Updates the global sync state and logs any changes.
fn update_sync_state(&mut self, state: RangeSyncState) {
// if there is no range sync occurring, the state is either synced or not based on
// connected peers.
self.state = state;
if self.state == RangeSyncState::Idle {
// there is no range sync, let the state of peers determine the global node sync state
let new_state = self
.network_globals
.peers
.read()
.synced_peers()
.next()
.map(|_| SyncState::Synced)
.unwrap_or_else(|| SyncState::Stalled);
let mut peer_state = self.network_globals.sync_state.write();
if new_state != *peer_state {
info!(self.log, "Sync state updated"; "old_state" => format!("{}",peer_state), "new_state" => format!("{}",new_state));
}
*peer_state = new_state;
} else {
// The state is based on a range sync state, update it
let mut node_sync_state = self.network_globals.sync_state.write();
let new_state: SyncState = self.state.clone().into();
if *node_sync_state != new_state {
// we are updating the state, inform the user
info!(self.log, "Sync state updated"; "old_state" => format!("{}",node_sync_state), "new_state" => format!("{}",new_state));
}
*node_sync_state = new_state;
}
}
/// A fully synced peer has joined.
@@ -57,9 +147,10 @@ impl<T: BeaconChainTypes> ChainCollection<T> {
/// We could be awaiting a head sync. If we are in the head syncing state, without any head
/// chains, then update the state to idle.
pub fn fully_synced_peer_found(&mut self) {
if let SyncState::Head = self.sync_state {
if let RangeSyncState::Head { .. } = self.state {
if self.head_chains.is_empty() {
self.sync_state = SyncState::Idle;
// Update the global network state to either synced or stalled.
self.update_sync_state(RangeSyncState::Idle);
}
}
}
@@ -68,8 +159,19 @@ impl<T: BeaconChainTypes> ChainCollection<T> {
/// `SyncState::Head` indicating we are awaiting new peers to connect before we can consider
/// the state as idle.
pub fn set_head_sync(&mut self) {
if let SyncState::Idle = self.sync_state {
self.sync_state = SyncState::Head;
if let RangeSyncState::Idle = self.state {
let current_slot = self
.beacon_chain
.head_info()
.map(|info| info.slot)
.unwrap_or_else(|_| Slot::from(0u64));
// NOTE: This will modify the /node/syncing API to show current slot for all fields
// while we update peers to look for new potentially HEAD chains.
let temp_head_state = RangeSyncState::Head {
start_slot: current_slot,
head_slot: current_slot,
};
self.update_sync_state(temp_head_state);
}
}
@@ -103,36 +205,26 @@ impl<T: BeaconChainTypes> ChainCollection<T> {
///
/// This removes any out-dated chains, swaps to any higher priority finalized chains and
/// updates the state of the collection.
pub fn update_finalized(
&mut self,
network: &mut SyncNetworkContext<T::EthSpec>,
log: &slog::Logger,
) {
let local_slot = match self.beacon_chain.upgrade() {
Some(chain) => {
let local = match PeerSyncInfo::from_chain(&chain) {
Some(local) => local,
None => {
return error!(
log,
"Failed to get peer sync info";
"msg" => "likely due to head lock contention"
)
}
};
pub fn update_finalized(&mut self, network: &mut SyncNetworkContext<T::EthSpec>) {
let local_slot = {
let local = match PeerSyncInfo::from_chain(&self.beacon_chain) {
Some(local) => local,
None => {
return error!(
self.log,
"Failed to get peer sync info";
"msg" => "likely due to head lock contention"
)
}
};
local
.finalized_epoch
.start_slot(T::EthSpec::slots_per_epoch())
}
None => {
warn!(log, "Beacon chain dropped. Chains not updated");
return;
}
local
.finalized_epoch
.start_slot(T::EthSpec::slots_per_epoch())
};
// Remove any outdated finalized chains
self.purge_outdated_chains(network, log);
self.purge_outdated_chains(network);
// Check if any chains become the new syncing chain
if let Some(index) = self.finalized_syncing_index() {
@@ -149,13 +241,20 @@ impl<T: BeaconChainTypes> ChainCollection<T> {
})
{
// A chain has more peers. Swap the syncing chain
debug!(log, "Switching finalized chains to sync"; "new_target_root" => format!("{}", chain.target_head_root), "new_end_slot" => chain.target_head_slot, "new_start_slot"=> chain.start_slot);
debug!(self.log, "Switching finalized chains to sync"; "new_target_root" => format!("{}", chain.target_head_root), "new_end_slot" => chain.target_head_slot, "new_start_slot"=> chain.start_slot);
// update the state to a new finalized state
let state = RangeSyncState::Finalized {
start_slot: chain.start_slot,
head_slot: chain.target_head_slot,
head_root: chain.target_head_root,
};
self.update_sync_state(state);
// Stop the current chain from syncing
self.finalized_chains[index].stop_syncing();
// Start the new chain
self.finalized_chains[new_index].start_syncing(network, local_slot);
self.sync_state = SyncState::Finalized;
}
} else if let Some(chain) = self
.finalized_chains
@@ -163,15 +262,36 @@ impl<T: BeaconChainTypes> ChainCollection<T> {
.max_by_key(|chain| chain.peer_pool.len())
{
// There is no currently syncing finalization chain, starting the one with the most peers
debug!(log, "New finalized chain started syncing"; "new_target_root" => format!("{}", chain.target_head_root), "new_end_slot" => chain.target_head_slot, "new_start_slot"=> chain.start_slot);
debug!(self.log, "New finalized chain started syncing"; "new_target_root" => format!("{}", chain.target_head_root), "new_end_slot" => chain.target_head_slot, "new_start_slot"=> chain.start_slot);
chain.start_syncing(network, local_slot);
self.sync_state = SyncState::Finalized;
let state = RangeSyncState::Finalized {
start_slot: chain.start_slot,
head_slot: chain.target_head_slot,
head_root: chain.target_head_root,
};
self.update_sync_state(state);
} else {
// There are no finalized chains, update the state.
if self.head_chains.is_empty() {
self.sync_state = SyncState::Idle;
self.update_sync_state(RangeSyncState::Idle);
} else {
self.sync_state = SyncState::Head;
// for the syncing API, we find the minimal start_slot and the maximum
// target_slot of all head chains to report back.
let (min_slot, max_slot) = self.head_chains.iter().fold(
(Slot::from(0u64), Slot::from(0u64)),
|(min, max), chain| {
(
std::cmp::min(min, chain.start_slot),
std::cmp::max(max, chain.target_head_slot),
)
},
);
let head_state = RangeSyncState::Head {
start_slot: min_slot,
head_slot: max_slot,
};
self.update_sync_state(head_state);
}
}
}
@@ -184,7 +304,6 @@ impl<T: BeaconChainTypes> ChainCollection<T> {
target_slot: Slot,
peer_id: PeerId,
sync_send: mpsc::UnboundedSender<SyncMessage<T::EthSpec>>,
log: &slog::Logger,
) {
self.finalized_chains.push(SyncingChain::new(
local_finalized_slot,
@@ -193,7 +312,7 @@ impl<T: BeaconChainTypes> ChainCollection<T> {
peer_id,
sync_send,
self.beacon_chain.clone(),
log.clone(),
self.log.clone(),
));
}
@@ -207,7 +326,6 @@ impl<T: BeaconChainTypes> ChainCollection<T> {
target_slot: Slot,
peer_id: PeerId,
sync_send: mpsc::UnboundedSender<SyncMessage<T::EthSpec>>,
log: &slog::Logger,
) {
// remove the peer from any other head chains
@@ -223,7 +341,7 @@ impl<T: BeaconChainTypes> ChainCollection<T> {
peer_id,
sync_send,
self.beacon_chain.clone(),
log.clone(),
self.log.clone(),
);
// All head chains can sync simultaneously
new_head_chain.start_syncing(network, remote_finalized_slot);
@@ -281,29 +399,20 @@ impl<T: BeaconChainTypes> ChainCollection<T> {
///
/// This removes chains with no peers, or chains whose start block slot is less than our current
/// finalized block slot.
pub fn purge_outdated_chains(
&mut self,
network: &mut SyncNetworkContext<T::EthSpec>,
log: &slog::Logger,
) {
pub fn purge_outdated_chains(&mut self, network: &mut SyncNetworkContext<T::EthSpec>) {
// Remove any chains that have no peers
self.finalized_chains
.retain(|chain| !chain.peer_pool.is_empty());
self.head_chains.retain(|chain| !chain.peer_pool.is_empty());
let (beacon_chain, local_info) = match self.beacon_chain.upgrade() {
Some(chain) => match PeerSyncInfo::from_chain(&chain) {
Some(local) => (chain, local),
None => {
return error!(
log,
"Failed to get peer sync info";
"msg" => "likely due to head lock contention"
)
}
},
let local_info = match PeerSyncInfo::from_chain(&self.beacon_chain) {
Some(local) => local,
None => {
return;
return error!(
self.log,
"Failed to get peer sync info";
"msg" => "likely due to head lock contention"
)
}
};
@@ -311,6 +420,8 @@ impl<T: BeaconChainTypes> ChainCollection<T> {
.finalized_epoch
.start_slot(T::EthSpec::slots_per_epoch());
let beacon_chain = &self.beacon_chain;
let log_ref = &self.log;
// Remove chains that are out-dated and re-status their peers
self.finalized_chains.retain(|chain| {
if chain.target_head_slot <= local_finalized_slot
@@ -318,7 +429,7 @@ impl<T: BeaconChainTypes> ChainCollection<T> {
.fork_choice
.contains_block(&chain.target_head_root)
{
debug!(log, "Purging out of finalized chain"; "start_slot" => chain.start_slot, "end_slot" => chain.target_head_slot);
debug!(log_ref, "Purging out of finalized chain"; "start_slot" => chain.start_slot, "end_slot" => chain.target_head_slot);
chain.status_peers(network);
false
} else {
@@ -331,7 +442,7 @@ impl<T: BeaconChainTypes> ChainCollection<T> {
.fork_choice
.contains_block(&chain.target_head_root)
{
debug!(log, "Purging out of date head chain"; "start_slot" => chain.start_slot, "end_slot" => chain.target_head_slot);
debug!(log_ref, "Purging out of date head chain"; "start_slot" => chain.start_slot, "end_slot" => chain.target_head_slot);
chain.status_peers(network);
false
} else {
@@ -355,12 +466,7 @@ impl<T: BeaconChainTypes> ChainCollection<T> {
/// finalized chain length, indicates a head chain.
///
/// This will re-status the chains peers on removal. The index must exist.
pub fn remove_chain(
&mut self,
network: &mut SyncNetworkContext<T::EthSpec>,
index: usize,
log: &slog::Logger,
) {
pub fn remove_chain(&mut self, network: &mut SyncNetworkContext<T::EthSpec>, index: usize) {
let chain = if index >= self.finalized_chains.len() {
let index = index - self.finalized_chains.len();
let chain = self.head_chains.swap_remove(index);
@@ -372,10 +478,10 @@ impl<T: BeaconChainTypes> ChainCollection<T> {
chain
};
debug!(log, "Chain was removed"; "start_slot" => chain.start_slot.as_u64(), "end_slot" => chain.target_head_slot.as_u64());
debug!(self.log, "Chain was removed"; "start_slot" => chain.start_slot.as_u64(), "end_slot" => chain.target_head_slot.as_u64());
// update the state
self.update_finalized(network, log);
self.update_finalized(network);
}
/// Returns the index of finalized chain that is currently syncing. Returns `None` if no

View File

@@ -40,7 +40,7 @@
//! and further batches are requested as current blocks are being processed.
use super::chain::ProcessingResult;
use super::chain_collection::{ChainCollection, SyncState};
use super::chain_collection::{ChainCollection, RangeSyncState};
use super::BatchId;
use crate::router::processor::PeerSyncInfo;
use crate::sync::block_processor::BatchProcessResult;
@@ -48,10 +48,10 @@ use crate::sync::manager::SyncMessage;
use crate::sync::network_context::SyncNetworkContext;
use beacon_chain::{BeaconChain, BeaconChainTypes};
use eth2_libp2p::rpc::RequestId;
use eth2_libp2p::PeerId;
use slog::{debug, error, trace, warn};
use eth2_libp2p::{NetworkGlobals, PeerId};
use slog::{debug, error, trace};
use std::collections::HashSet;
use std::sync::Weak;
use std::sync::Arc;
use tokio::sync::mpsc;
use types::{EthSpec, SignedBeaconBlock};
@@ -60,7 +60,7 @@ use types::{EthSpec, SignedBeaconBlock};
/// holds the current state of the long range sync.
pub struct RangeSync<T: BeaconChainTypes> {
/// The beacon chain for processing.
beacon_chain: Weak<BeaconChain<T>>,
beacon_chain: Arc<BeaconChain<T>>,
/// A collection of chains that need to be downloaded. This stores any head or finalized chains
/// that need to be downloaded.
chains: ChainCollection<T>,
@@ -77,13 +77,14 @@ pub struct RangeSync<T: BeaconChainTypes> {
impl<T: BeaconChainTypes> RangeSync<T> {
pub fn new(
beacon_chain: Weak<BeaconChain<T>>,
beacon_chain: Arc<BeaconChain<T>>,
network_globals: Arc<NetworkGlobals<T::EthSpec>>,
sync_send: mpsc::UnboundedSender<SyncMessage<T::EthSpec>>,
log: slog::Logger,
) -> Self {
RangeSync {
beacon_chain: beacon_chain.clone(),
chains: ChainCollection::new(beacon_chain),
chains: ChainCollection::new(beacon_chain, network_globals, log.clone()),
awaiting_head_peers: HashSet::new(),
sync_send,
log,
@@ -118,21 +119,14 @@ impl<T: BeaconChainTypes> RangeSync<T> {
// determine if we need to run a sync to the nearest finalized state or simply sync to
// its current head
let (chain, local_info) = match self.beacon_chain.upgrade() {
Some(chain) => match PeerSyncInfo::from_chain(&chain) {
Some(local) => (chain, local),
None => {
return error!(
self.log,
"Failed to get peer sync info";
"msg" => "likely due to head lock contention"
)
}
},
let local_info = match PeerSyncInfo::from_chain(&self.beacon_chain) {
Some(local) => local,
None => {
return warn!(self.log,
"Beacon chain dropped. Peer not considered for sync";
"peer_id" => format!("{:?}", peer_id));
return error!(
self.log,
"Failed to get peer sync info";
"msg" => "likely due to head lock contention"
)
}
};
@@ -148,10 +142,13 @@ impl<T: BeaconChainTypes> RangeSync<T> {
self.remove_peer(network, &peer_id);
// remove any out-of-date chains
self.chains.purge_outdated_chains(network, &self.log);
self.chains.purge_outdated_chains(network);
if remote_finalized_slot > local_info.head_slot
&& !chain.fork_choice.contains_block(&remote.finalized_root)
&& !self
.beacon_chain
.fork_choice
.contains_block(&remote.finalized_root)
{
debug!(self.log, "Finalization sync peer joined"; "peer_id" => format!("{:?}", peer_id));
// Finalized chain search
@@ -171,7 +168,7 @@ impl<T: BeaconChainTypes> RangeSync<T> {
chain.add_peer(network, peer_id);
// check if the new peer's addition will favour a new syncing chain.
self.chains.update_finalized(network, &self.log);
self.chains.update_finalized(network);
} else {
// there is no finalized chain that matches this peer's last finalized target
// create a new finalized chain
@@ -183,9 +180,8 @@ impl<T: BeaconChainTypes> RangeSync<T> {
remote_finalized_slot,
peer_id,
self.sync_send.clone(),
&self.log,
);
self.chains.update_finalized(network, &self.log);
self.chains.update_finalized(network);
}
} else {
if self.chains.is_finalizing_sync() {
@@ -216,10 +212,9 @@ impl<T: BeaconChainTypes> RangeSync<T> {
remote.head_slot,
peer_id,
self.sync_send.clone(),
&self.log,
);
}
self.chains.update_finalized(network, &self.log);
self.chains.update_finalized(network);
}
}
@@ -274,7 +269,7 @@ impl<T: BeaconChainTypes> RangeSync<T> {
chain.status_peers(network);
// update the state of the collection
self.chains.update_finalized(network, &self.log);
self.chains.update_finalized(network);
// set the state to a head sync, to inform the manager that we are awaiting a
// head chain.
@@ -282,13 +277,13 @@ impl<T: BeaconChainTypes> RangeSync<T> {
// if there are no more finalized chains, re-status all known peers awaiting a head
// sync
match self.chains.sync_state() {
SyncState::Idle | SyncState::Head => {
match self.chains.state() {
RangeSyncState::Idle | RangeSyncState::Head { .. } => {
for peer_id in self.awaiting_head_peers.drain() {
network.status_peer(self.beacon_chain.clone(), peer_id);
}
}
SyncState::Finalized => {} // Have more finalized chains to complete
RangeSyncState::Finalized { .. } => {} // Have more finalized chains to complete
}
}
Some((_, ProcessingResult::KeepChain)) => {}
@@ -308,7 +303,7 @@ impl<T: BeaconChainTypes> RangeSync<T> {
chain.status_peers(network);
// update the state of the collection
self.chains.update_finalized(network, &self.log);
self.chains.update_finalized(network);
}
Some((_, ProcessingResult::KeepChain)) => {}
None => {
@@ -321,15 +316,6 @@ impl<T: BeaconChainTypes> RangeSync<T> {
}
}
/// Public method to indicate the current state of the long range sync.
pub fn is_syncing(&self) -> bool {
match self.chains.sync_state() {
SyncState::Finalized => true,
SyncState::Head => true,
SyncState::Idle => false,
}
}
/// A peer has disconnected. This removes the peer from any ongoing chains and mappings. A
/// disconnected peer could remove a chain
pub fn peer_disconnect(
@@ -344,7 +330,7 @@ impl<T: BeaconChainTypes> RangeSync<T> {
self.remove_peer(network, peer_id);
// update the state of the collection
self.chains.update_finalized(network, &self.log);
self.chains.update_finalized(network);
}
/// When a peer gets removed, both the head and finalized chains need to be searched to check which pool the peer is in. The chain may also have a batch or batches awaiting
@@ -370,7 +356,7 @@ impl<T: BeaconChainTypes> RangeSync<T> {
{
// the chain needed to be removed
debug!(self.log, "Chain being removed due to failed batch");
self.chains.remove_chain(network, index, &self.log);
self.chains.remove_chain(network, index);
}
}
@@ -392,7 +378,7 @@ impl<T: BeaconChainTypes> RangeSync<T> {
Some((_, ProcessingResult::KeepChain)) => {} // error handled chain persists
Some((index, ProcessingResult::RemoveChain)) => {
debug!(self.log, "Chain being removed due to RPC error");
self.chains.remove_chain(network, index, &self.log)
self.chains.remove_chain(network, index)
}
None => {} // request wasn't in the finalized chains, check the head chains
}