From e5874f4565485921175f8aaeb5b813becb7a8379 Mon Sep 17 00:00:00 2001 From: Age Manning Date: Tue, 14 Apr 2020 18:17:35 +1000 Subject: [PATCH] Global Sync access (#994) * Connect sync logic to network globals * Add further sync info to sync status * Build new syncing HTTP API methods * Fix bug in updating sync state * Highest slot is current slot * Update book for syncing API --- beacon_node/eth2-libp2p/Cargo.toml | 2 +- beacon_node/eth2-libp2p/src/lib.rs | 2 +- .../eth2-libp2p/src/peer_manager/mod.rs | 18 +- .../eth2-libp2p/src/peer_manager/peer_info.rs | 24 +- .../eth2-libp2p/src/peer_manager/peerdb.rs | 103 +++--- beacon_node/eth2-libp2p/src/types/globals.rs | 38 ++ beacon_node/eth2-libp2p/src/types/mod.rs | 2 + .../eth2-libp2p/src/types/sync_state.rs | 66 ++++ beacon_node/network/src/router/mod.rs | 11 +- beacon_node/network/src/router/processor.rs | 19 +- beacon_node/network/src/service.rs | 22 +- beacon_node/network/src/sync/manager.rs | 330 +++++++++--------- .../network/src/sync/network_context.rs | 30 +- .../network/src/sync/range_sync/chain.rs | 9 +- .../src/sync/range_sync/chain_collection.rs | 268 +++++++++----- .../network/src/sync/range_sync/range.rs | 76 ++-- beacon_node/rest_api/src/node.rs | 41 +++ beacon_node/rest_api/src/router.rs | 16 +- book/src/http.md | 1 + book/src/http_node.md | 104 ++++++ eth2/utils/rest_types/src/lib.rs | 3 + eth2/utils/rest_types/src/node.rs | 32 ++ 22 files changed, 818 insertions(+), 399 deletions(-) create mode 100644 beacon_node/eth2-libp2p/src/types/sync_state.rs create mode 100644 book/src/http_node.md create mode 100644 eth2/utils/rest_types/src/node.rs diff --git a/beacon_node/eth2-libp2p/Cargo.toml b/beacon_node/eth2-libp2p/Cargo.toml index cf74cbed96..bf2e78fa85 100644 --- a/beacon_node/eth2-libp2p/Cargo.toml +++ b/beacon_node/eth2-libp2p/Cargo.toml @@ -12,7 +12,7 @@ libp2p = { git = "https://github.com/SigP/rust-libp2p", rev = "4e3003d5283040fe types = { path = "../../eth2/types" } hashmap_delay = { path = 
"../../eth2/utils/hashmap_delay" } eth2_ssz_types = { path = "../../eth2/utils/ssz_types" } -serde = "1.0.102" +serde = { version = "1.0.102", features = ["derive"] } serde_derive = "1.0.102" eth2_ssz = "0.1.2" eth2_ssz_derive = "0.1.0" diff --git a/beacon_node/eth2-libp2p/src/lib.rs b/beacon_node/eth2-libp2p/src/lib.rs index e751760a40..98f3ffc9f3 100644 --- a/beacon_node/eth2-libp2p/src/lib.rs +++ b/beacon_node/eth2-libp2p/src/lib.rs @@ -20,6 +20,6 @@ pub use config::Config as NetworkConfig; pub use libp2p::gossipsub::{MessageId, Topic, TopicHash}; pub use libp2p::{multiaddr, Multiaddr}; pub use libp2p::{PeerId, Swarm}; -pub use peer_manager::{PeerDB, PeerInfo}; +pub use peer_manager::{PeerDB, PeerInfo, PeerSyncStatus}; pub use rpc::RPCEvent; pub use service::Service; diff --git a/beacon_node/eth2-libp2p/src/peer_manager/mod.rs b/beacon_node/eth2-libp2p/src/peer_manager/mod.rs index 78bb22493f..16974dacf8 100644 --- a/beacon_node/eth2-libp2p/src/peer_manager/mod.rs +++ b/beacon_node/eth2-libp2p/src/peer_manager/mod.rs @@ -15,7 +15,7 @@ use types::EthSpec; mod peer_info; mod peerdb; -pub use peer_info::PeerInfo; +pub use peer_info::{PeerInfo, PeerSyncStatus}; /// The minimum reputation before a peer is disconnected. 
// Most likely this needs tweaking const MINIMUM_REPUTATION_BEFORE_BAN: Rep = 20; @@ -196,14 +196,14 @@ impl PeerManager { pub fn connect_ingoing(&mut self, peer_id: &PeerId) -> bool { self.update_reputations(); let mut peerdb = self.network_globals.peers.write(); - peerdb.new_peer(peer_id); if !peerdb.connection_status(peer_id).is_banned() { peerdb.connect_ingoing(peer_id); + // start a ping and status timer for the peer + self.ping_peers.insert(peer_id.clone()); + self.status_peers.insert(peer_id.clone()); + return true; } - // start a ping and status timer for the peer - self.ping_peers.insert(peer_id.clone()); - self.status_peers.insert(peer_id.clone()); false } @@ -213,14 +213,14 @@ impl PeerManager { pub fn connect_outgoing(&mut self, peer_id: &PeerId) -> bool { self.update_reputations(); let mut peerdb = self.network_globals.peers.write(); - peerdb.new_peer(peer_id); if !peerdb.connection_status(peer_id).is_banned() { peerdb.connect_outgoing(peer_id); + // start a ping and status timer for the peer + self.ping_peers.insert(peer_id.clone()); + self.status_peers.insert(peer_id.clone()); + return true; } - // start a ping and status timer for the peer - self.ping_peers.insert(peer_id.clone()); - self.status_peers.insert(peer_id.clone()); false } diff --git a/beacon_node/eth2-libp2p/src/peer_manager/peer_info.rs b/beacon_node/eth2-libp2p/src/peer_manager/peer_info.rs index 8a6ca98cd0..584520ba0a 100644 --- a/beacon_node/eth2-libp2p/src/peer_manager/peer_info.rs +++ b/beacon_node/eth2-libp2p/src/peer_manager/peer_info.rs @@ -1,7 +1,7 @@ use super::peerdb::{Rep, DEFAULT_REPUTATION}; use crate::rpc::MetaData; use std::time::Instant; -use types::{EthSpec, SubnetId}; +use types::{EthSpec, Slot, SubnetId}; use PeerConnectionStatus::*; /// Information about a given connected peer. @@ -17,7 +17,7 @@ pub struct PeerInfo { pub connection_status: PeerConnectionStatus, /// The current syncing state of the peer. The state may be determined after it's initial /// connection. 
- pub syncing_status: PeerSyncingStatus, + pub sync_status: PeerSyncStatus, /// The ENR subnet bitfield of the peer. This may be determined after it's initial /// connection. pub meta_data: Option>, @@ -33,7 +33,7 @@ impl Default for PeerInfo { _version: vec![0], }, connection_status: Default::default(), - syncing_status: PeerSyncingStatus::Unknown, + sync_status: PeerSyncStatus::Unknown, meta_data: None, } } @@ -98,14 +98,18 @@ pub enum PeerConnectionStatus { }, } -#[derive(Debug, Clone)] -pub enum PeerSyncingStatus { - /// At the current state as our node. - Synced, - /// The peer is further ahead than our node and useful for block downloads. - Ahead, +#[derive(Debug, Clone, PartialEq)] +pub enum PeerSyncStatus { + /// At the current state as our node or ahead of us. + Synced { + /// The last known head slot from the peer's handshake. + status_head_slot: Slot, + }, /// Is behind our current head and not useful for block downloads. - Behind, + Behind { + /// The last known head slot from the peer's handshake. + status_head_slot: Slot, + }, /// Not currently known as a STATUS handshake has not occurred. Unknown, } diff --git a/beacon_node/eth2-libp2p/src/peer_manager/peerdb.rs b/beacon_node/eth2-libp2p/src/peer_manager/peerdb.rs index f2c792c2ea..7fd9aeae8e 100644 --- a/beacon_node/eth2-libp2p/src/peer_manager/peerdb.rs +++ b/beacon_node/eth2-libp2p/src/peer_manager/peerdb.rs @@ -1,7 +1,7 @@ -use super::peer_info::{PeerConnectionStatus, PeerInfo}; +use super::peer_info::{PeerConnectionStatus, PeerInfo, PeerSyncStatus}; use crate::rpc::methods::MetaData; use crate::PeerId; -use slog::warn; +use slog::{crit, warn}; use std::collections::HashMap; use types::{EthSpec, SubnetId}; @@ -31,6 +31,9 @@ impl PeerDB { peers: HashMap::new(), } } + + /* Getters */ + /// Gives the reputation of a peer, or DEFAULT_REPUTATION if it is unknown. 
pub fn reputation(&self, peer_id: &PeerId) -> Rep { self.peers @@ -53,6 +56,15 @@ impl PeerDB { self.peers.get_mut(peer_id) } + /// Returns true if the peer is synced at least to our current head. + pub fn peer_synced(&self, peer_id: &PeerId) -> bool { + match self.peers.get(peer_id).map(|info| &info.sync_status) { + Some(PeerSyncStatus::Synced { .. }) => true, + Some(_) => false, + None => false, + } + } + /// Gives the ids of all known connected peers. pub fn connected_peers(&self) -> impl Iterator { self.peers @@ -61,6 +73,19 @@ impl PeerDB { .map(|(peer_id, _)| peer_id) } + /// Gives the `peer_id` of all known connected and synced peers. + pub fn synced_peers(&self) -> impl Iterator { + self.peers + .iter() + .filter(|(_, info)| { + if let PeerSyncStatus::Synced { .. } = info.sync_status { + return info.connection_status.is_connected(); + } + false + }) + .map(|(peer_id, _)| peer_id) + } + /// Gives an iterator of all peers on a given subnet. pub fn peers_on_subnet(&self, subnet_id: &SubnetId) -> impl Iterator { let subnet_id_filter = subnet_id.clone(); @@ -115,6 +140,16 @@ impl PeerDB { .map(|(id, _)| id) } + /// Gets the connection status of the peer. + pub fn connection_status(&self, peer_id: &PeerId) -> PeerConnectionStatus { + self.peer_info(peer_id) + .map_or(PeerConnectionStatus::default(), |info| { + info.connection_status.clone() + }) + } + + /* Setters */ + /// Sets a peer as connected with an ingoing connection pub fn connect_ingoing(&mut self, peer_id: &PeerId) { let info = self @@ -128,15 +163,6 @@ impl PeerDB { info.connection_status.connect_ingoing(); } - /// Add the meta data of a peer. 
- pub fn add_metadata(&mut self, peer_id: &PeerId, meta_data: MetaData) { - if let Some(peer_info) = self.peers.get_mut(peer_id) { - peer_info.meta_data = Some(meta_data); - } else { - warn!(self.log, "Tried to add meta data for a non-existant peer"; "peer_id" => format!("{}", peer_id)); - } - } - /// Sets a peer as connected with an outgoing connection pub fn connect_outgoing(&mut self, peer_id: &PeerId) { let info = self @@ -197,31 +223,35 @@ impl PeerDB { info.connection_status.ban(); } - /// Inserts a new peer with the default PeerInfo if it is not already present - /// Returns if the peer was new to the PeerDB - pub fn new_peer(&mut self, peer_id: &PeerId) -> bool { - if !self.peers.contains_key(peer_id) { - self.peers.insert(peer_id.clone(), Default::default()); - return true; + /// Add the meta data of a peer. + pub fn add_metadata(&mut self, peer_id: &PeerId, meta_data: MetaData) { + if let Some(peer_info) = self.peers.get_mut(peer_id) { + peer_info.meta_data = Some(meta_data); + } else { + warn!(self.log, "Tried to add meta data for a non-existant peer"; "peer_id" => format!("{}", peer_id)); } - false } - /// Sets the reputation of peer + /// Sets the reputation of peer. pub fn set_reputation(&mut self, peer_id: &PeerId, rep: Rep) { - let log_ref = &self.log; - self.peers - .entry(peer_id.clone()) - .or_insert_with(|| { - warn!(log_ref, "Setting the reputation of an unknown peer"; - "peer_id" => format!("{:?}",peer_id)); - PeerInfo::default() - }) - .reputation = rep; + if let Some(peer_info) = self.peers.get_mut(peer_id) { + peer_info.reputation = rep; + } else { + crit!(self.log, "Tried to modify reputation for an unknown peer"; "peer_id" => format!("{}",peer_id)); + } + } + + /// Sets the syncing status of a peer. 
+ pub fn set_sync_status(&mut self, peer_id: &PeerId, sync_status: PeerSyncStatus) { + if let Some(peer_info) = self.peers.get_mut(peer_id) { + peer_info.sync_status = sync_status; + } else { + crit!(self.log, "Tried to the sync status for an unknown peer"; "peer_id" => format!("{}",peer_id)); + } } /// Adds to a peer's reputation by `change`. If the reputation exceeds Rep's - /// upper (lower) bounds, it stays at the maximum (minimum) value + /// upper (lower) bounds, it stays at the maximum (minimum) value. pub fn add_reputation(&mut self, peer_id: &PeerId, change: Rep) { let log_ref = &self.log; let info = self.peers.entry(peer_id.clone()).or_insert_with(|| { @@ -231,13 +261,6 @@ impl PeerDB { }); info.reputation = info.reputation.saturating_add(change); } - - pub fn connection_status(&self, peer_id: &PeerId) -> PeerConnectionStatus { - self.peer_info(peer_id) - .map_or(PeerConnectionStatus::default(), |info| { - info.connection_status.clone() - }) - } } #[cfg(test)] @@ -355,9 +378,6 @@ mod tests { let p0 = PeerId::random(); let p1 = PeerId::random(); let p2 = PeerId::random(); - pdb.new_peer(&p0); - pdb.new_peer(&p1); - pdb.new_peer(&p2); pdb.connect_ingoing(&p0); pdb.connect_ingoing(&p1); pdb.connect_ingoing(&p2); @@ -378,9 +398,6 @@ mod tests { let p0 = PeerId::random(); let p1 = PeerId::random(); let p2 = PeerId::random(); - pdb.new_peer(&p0); - pdb.new_peer(&p1); - pdb.new_peer(&p2); pdb.connect_ingoing(&p0); pdb.connect_ingoing(&p1); pdb.connect_ingoing(&p2); @@ -401,7 +418,7 @@ mod tests { let random_peer = PeerId::random(); - pdb.new_peer(&random_peer); + pdb.connect_ingoing(&random_peer); assert_eq!(pdb.n_dc, pdb.disconnected_peers().count()); pdb.connect_ingoing(&random_peer); diff --git a/beacon_node/eth2-libp2p/src/types/globals.rs b/beacon_node/eth2-libp2p/src/types/globals.rs index 1dce8129f4..0be9d4cb44 100644 --- a/beacon_node/eth2-libp2p/src/types/globals.rs +++ b/beacon_node/eth2-libp2p/src/types/globals.rs @@ -1,6 +1,7 @@ //! 
A collection of variables that are accessible outside of the network thread itself. use crate::peer_manager::PeerDB; use crate::rpc::methods::MetaData; +use crate::types::SyncState; use crate::{discovery::enr::Eth2Enr, Enr, GossipTopic, Multiaddr, PeerId}; use parking_lot::RwLock; use std::collections::HashSet; @@ -24,6 +25,8 @@ pub struct NetworkGlobals { pub peers: RwLock>, /// The current gossipsub topic subscriptions. pub gossipsub_subscriptions: RwLock>, + /// The current sync status of the node. + pub sync_state: RwLock, } impl NetworkGlobals { @@ -45,6 +48,7 @@ impl NetworkGlobals { listen_port_udp: AtomicU16::new(udp_port), peers: RwLock::new(PeerDB::new(log)), gossipsub_subscriptions: RwLock::new(HashSet::new()), + sync_state: RwLock::new(SyncState::Stalled), } } @@ -78,4 +82,38 @@ impl NetworkGlobals { pub fn connected_peers(&self) -> usize { self.peers.read().connected_peers().count() } + + /// Returns true if the node is syncing. + pub fn is_syncing(&self) -> bool { + self.sync_state.read().is_syncing() + } + + /// Returns the current sync state of the node. + pub fn sync_state(&self) -> SyncState { + self.sync_state.read().clone() + } + + /// Updates the syncing state of the node. + /// + /// If the state changes, the old and new states are returned. + pub fn update_sync_state(&self) -> Option<(SyncState, SyncState)> { + let mut result = None; + // if we are in a range sync, nothing changes. Range sync will update this. 
+ if !self.is_syncing() { + let new_state = self + .peers + .read() + .synced_peers() + .next() + .map(|_| SyncState::Synced) + .unwrap_or_else(|| SyncState::Stalled); + + let mut peer_state = self.sync_state.write(); + if new_state != *peer_state { + result = Some((peer_state.clone(), new_state.clone())); + } + *peer_state = new_state; + } + result + } } diff --git a/beacon_node/eth2-libp2p/src/types/mod.rs b/beacon_node/eth2-libp2p/src/types/mod.rs index 410df08df6..94d24bad6e 100644 --- a/beacon_node/eth2-libp2p/src/types/mod.rs +++ b/beacon_node/eth2-libp2p/src/types/mod.rs @@ -1,6 +1,7 @@ pub mod error; mod globals; mod pubsub; +mod sync_state; mod topics; use types::{BitVector, EthSpec}; @@ -12,4 +13,5 @@ pub type Enr = libp2p::discv5::enr::Enr; pub use globals::NetworkGlobals; pub use pubsub::PubsubMessage; +pub use sync_state::SyncState; pub use topics::{GossipEncoding, GossipKind, GossipTopic}; diff --git a/beacon_node/eth2-libp2p/src/types/sync_state.rs b/beacon_node/eth2-libp2p/src/types/sync_state.rs new file mode 100644 index 0000000000..572d33a31b --- /dev/null +++ b/beacon_node/eth2-libp2p/src/types/sync_state.rs @@ -0,0 +1,66 @@ +use serde::{Deserialize, Serialize}; +use types::{Hash256, Slot}; + +/// The current state of the node. +#[derive(Clone, Debug, Serialize, Deserialize)] +pub enum SyncState { + /// The node is performing a long-range (batch) sync over a finalized chain. + /// In this state, parent lookups are disabled. + SyncingFinalized { + start_slot: Slot, + head_slot: Slot, + head_root: Hash256, + }, + /// The node is performing a long-range (batch) sync over one or many head chains. + /// In this state parent lookups are disabled. + SyncingHead { start_slot: Slot, head_slot: Slot }, + /// The node is up to date with all known peers and is connected to at least one + /// fully synced peer. In this state, parent lookups are enabled. + Synced, + /// No useful peers are connected. 
Long-range syncs cannot proceed and we have no useful + /// peers to download parents for. More peers need to be connected before we can proceed. + Stalled, +} + +impl PartialEq for SyncState { + fn eq(&self, other: &Self) -> bool { + match (self, other) { + (SyncState::SyncingFinalized { .. }, SyncState::SyncingFinalized { .. }) => true, + (SyncState::SyncingHead { .. }, SyncState::SyncingHead { .. }) => true, + (SyncState::Synced, SyncState::Synced) => true, + (SyncState::Stalled, SyncState::Stalled) => true, + _ => false, + } + } +} + +impl SyncState { + /// Returns a boolean indicating the node is currently performing a long-range sync. + pub fn is_syncing(&self) -> bool { + match self { + SyncState::SyncingFinalized { .. } => true, + SyncState::SyncingHead { .. } => true, + SyncState::Synced => false, + SyncState::Stalled => false, + } + } + + /// Returns true if the node is synced. + pub fn is_synced(&self) -> bool { + match self { + SyncState::Synced => true, + _ => false, + } + } +} + +impl std::fmt::Display for SyncState { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + SyncState::SyncingFinalized { .. } => write!(f, "Syncing Finalized Chain"), + SyncState::SyncingHead { .. } => write!(f, "Syncing Head Chain"), + SyncState::Synced { .. } => write!(f, "Synced"), + SyncState::Stalled { .. 
} => write!(f, "Stalled"), + } + } +} diff --git a/beacon_node/network/src/router/mod.rs b/beacon_node/network/src/router/mod.rs index e987a71682..073b38cc2f 100644 --- a/beacon_node/network/src/router/mod.rs +++ b/beacon_node/network/src/router/mod.rs @@ -11,7 +11,7 @@ use crate::service::NetworkMessage; use beacon_chain::{AttestationType, BeaconChain, BeaconChainTypes}; use eth2_libp2p::{ rpc::{RPCError, RPCErrorResponse, RPCRequest, RPCResponse, RequestId, ResponseTermination}, - MessageId, PeerId, PubsubMessage, RPCEvent, + MessageId, NetworkGlobals, PeerId, PubsubMessage, RPCEvent, }; use futures::future::Future; use futures::stream::Stream; @@ -55,6 +55,7 @@ impl Router { /// Initializes and runs the Router. pub fn spawn( beacon_chain: Arc>, + network_globals: Arc>, network_send: mpsc::UnboundedSender>, executor: &tokio::runtime::TaskExecutor, log: slog::Logger, @@ -65,7 +66,13 @@ impl Router { let (handler_send, handler_recv) = mpsc::unbounded_channel(); // Initialise a message instance, which itself spawns the syncing thread. 
- let processor = Processor::new(executor, beacon_chain, network_send.clone(), &log); + let processor = Processor::new( + executor, + beacon_chain, + network_globals, + network_send.clone(), + &log, + ); // generate the Message handler let mut handler = Router { diff --git a/beacon_node/network/src/router/processor.rs b/beacon_node/network/src/router/processor.rs index 2907851a55..f113adf70a 100644 --- a/beacon_node/network/src/router/processor.rs +++ b/beacon_node/network/src/router/processor.rs @@ -6,7 +6,7 @@ use beacon_chain::{ }; use eth2_libp2p::rpc::methods::*; use eth2_libp2p::rpc::{RPCEvent, RPCRequest, RPCResponse, RequestId}; -use eth2_libp2p::PeerId; +use eth2_libp2p::{NetworkGlobals, PeerId}; use slog::{debug, error, o, trace, warn}; use ssz::Encode; use std::sync::Arc; @@ -70,6 +70,7 @@ impl Processor { pub fn new( executor: &tokio::runtime::TaskExecutor, beacon_chain: Arc>, + network_globals: Arc>, network_send: mpsc::UnboundedSender>, log: &slog::Logger, ) -> Self { @@ -78,7 +79,8 @@ impl Processor { // spawn the sync thread let (sync_send, _sync_exit) = crate::sync::manager::spawn( executor, - Arc::downgrade(&beacon_chain), + beacon_chain.clone(), + network_globals, network_send.clone(), sync_logger, ); @@ -170,7 +172,16 @@ impl Processor { /// Process a `Status` response from a peer. pub fn on_status_response(&mut self, peer_id: PeerId, status: StatusMessage) { - trace!(self.log, "StatusResponse"; "peer" => format!("{:?}", peer_id)); + trace!( + self.log, + "Received Status Response"; + "peer" => format!("{:?}", peer_id), + "fork_digest" => format!("{:?}", status.fork_digest), + "finalized_root" => format!("{:?}", status.finalized_root), + "finalized_epoch" => format!("{:?}", status.finalized_epoch), + "head_root" => format!("{}", status.head_root), + "head_slot" => format!("{}", status.head_slot), + ); // Process the status message, without sending back another status. 
self.process_status(peer_id, status); @@ -268,7 +279,7 @@ impl Processor { .exists::>(&remote.head_root) .unwrap_or_else(|_| false) { - trace!( + debug!( self.log, "Peer with known chain found"; "peer" => format!("{:?}", peer_id), "remote_head_slot" => remote.head_slot, diff --git a/beacon_node/network/src/service.rs b/beacon_node/network/src/service.rs index 6db0a21a77..a88e743a95 100644 --- a/beacon_node/network/src/service.rs +++ b/beacon_node/network/src/service.rs @@ -65,15 +65,8 @@ impl NetworkService { )> { // build the network channel let (network_send, network_recv) = mpsc::unbounded_channel::>(); - // Get a reference to the beacon chain store + // get a reference to the beacon chain store let store = beacon_chain.store.clone(); - // launch the router task - let router_send = Router::spawn( - beacon_chain.clone(), - network_send.clone(), - executor, - network_log.clone(), - )?; let propagation_percentage = config.propagation_percentage; @@ -95,7 +88,18 @@ impl NetworkService { // This is currently used to obtain the listening addresses from the libp2p service. 
let initial_delay = Delay::new(Instant::now() + Duration::from_secs(1)); - // create the attestation service + // launch derived network services + + // router task + let router_send = Router::spawn( + beacon_chain.clone(), + network_globals.clone(), + network_send.clone(), + executor, + network_log.clone(), + )?; + + // attestation service let attestation_service = AttestationService::new(beacon_chain.clone(), network_globals.clone(), &network_log); diff --git a/beacon_node/network/src/sync/manager.rs b/beacon_node/network/src/sync/manager.rs index 5da5ba49b3..5b9b5f4e50 100644 --- a/beacon_node/network/src/sync/manager.rs +++ b/beacon_node/network/src/sync/manager.rs @@ -39,20 +39,19 @@ use super::range_sync::{BatchId, RangeSync}; use crate::router::processor::PeerSyncInfo; use crate::service::NetworkMessage; use beacon_chain::{BeaconChain, BeaconChainTypes, BlockProcessingOutcome}; -use eth2_libp2p::rpc::methods::*; -use eth2_libp2p::rpc::RequestId; -use eth2_libp2p::PeerId; +use eth2_libp2p::rpc::{methods::*, RequestId}; +use eth2_libp2p::types::NetworkGlobals; +use eth2_libp2p::{PeerId, PeerSyncStatus}; use fnv::FnvHashMap; use futures::prelude::*; use rand::seq::SliceRandom; use slog::{crit, debug, error, info, trace, warn, Logger}; use smallvec::SmallVec; use std::boxed::Box; -use std::collections::HashSet; use std::ops::Sub; -use std::sync::Weak; +use std::sync::Arc; use tokio::sync::{mpsc, oneshot}; -use types::{EthSpec, Hash256, SignedBeaconBlock}; +use types::{EthSpec, Hash256, SignedBeaconBlock, Slot}; /// The number of slots ahead of us that is allowed before requesting a long-range (batch) Sync /// from a peer. If a peer is within this tolerance (forwards or backwards), it is treated as a @@ -126,32 +125,16 @@ struct ParentRequests { pending: Option, } -#[derive(PartialEq, Debug, Clone)] -/// The current state of the `ImportManager`. -enum ManagerState { - /// The manager is performing a long-range (batch) sync. 
In this mode, parent lookups are - /// disabled. - Syncing, - - /// The manager is up to date with all known peers and is connected to at least one - /// fully-syncing peer. In this state, parent lookups are enabled. - Regular, - - /// No useful peers are connected. Long-range sync's cannot proceed and we have no useful - /// peers to download parents for. More peers need to be connected before we can proceed. - Stalled, -} - /// The primary object for handling and driving all the current syncing logic. It maintains the /// current state of the syncing process, the number of useful peers, downloaded blocks and /// controls the logic behind both the long-range (batch) sync and the on-going potential parent /// look-up of blocks. pub struct SyncManager { - /// A weak reference to the underlying beacon chain. - chain: Weak>, + /// A reference to the underlying beacon chain. + chain: Arc>, - /// The current state of the import manager. - state: ManagerState, + /// A reference to the network globals and peer-db. + network_globals: Arc>, /// A receiving channel sent by the message processor thread. input_channel: mpsc::UnboundedReceiver>, @@ -171,9 +154,6 @@ pub struct SyncManager { /// The flag allows us to determine if the peer returned data or sent us nothing. single_block_lookups: FnvHashMap, - /// The collection of known, connected, fully-sync'd peers. - full_peers: HashSet, - /// The logger for the import manager. log: Logger, @@ -186,7 +166,8 @@ pub struct SyncManager { /// dropped during the syncing process which will gracefully end the `SyncManager`. 
pub fn spawn( executor: &tokio::runtime::TaskExecutor, - beacon_chain: Weak>, + beacon_chain: Arc>, + network_globals: Arc>, network_send: mpsc::UnboundedSender>, log: slog::Logger, ) -> ( @@ -200,14 +181,18 @@ pub fn spawn( // create an instance of the SyncManager let sync_manager = SyncManager { - chain: beacon_chain.clone(), - state: ManagerState::Stalled, - input_channel: sync_recv, + range_sync: RangeSync::new( + beacon_chain.clone(), + network_globals.clone(), + sync_send.clone(), + log.clone(), + ), network: SyncNetworkContext::new(network_send, log.clone()), - range_sync: RangeSync::new(beacon_chain, sync_send.clone(), log.clone()), + chain: beacon_chain, + network_globals, + input_channel: sync_recv, parent_queue: SmallVec::new(), single_block_lookups: FnvHashMap::default(), - full_peers: HashSet::new(), log: log.clone(), sync_send: sync_send.clone(), }; @@ -239,17 +224,7 @@ impl SyncManager { /// ours that we consider it fully sync'd with respect to our current chain. fn add_peer(&mut self, peer_id: PeerId, remote: PeerSyncInfo) { // ensure the beacon chain still exists - let chain = match self.chain.upgrade() { - Some(chain) => chain, - None => { - warn!(self.log, - "Beacon chain dropped. Peer not considered for sync"; - "peer_id" => format!("{:?}", peer_id)); - return; - } - }; - - let local = match PeerSyncInfo::from_chain(&chain) { + let local = match PeerSyncInfo::from_chain(&self.chain) { Some(local) => local, None => { return error!( @@ -263,12 +238,12 @@ impl SyncManager { // If a peer is within SLOT_IMPORT_TOLERANCE from our head slot, ignore a batch/range sync, // consider it a fully-sync'd peer. 
if remote.head_slot.sub(local.head_slot).as_usize() < SLOT_IMPORT_TOLERANCE { - trace!(self.log, "Ignoring full sync with peer"; + trace!(self.log, "Peer synced to our head found"; "peer" => format!("{:?}", peer_id), "peer_head_slot" => remote.head_slot, "local_head_slot" => local.head_slot, ); - self.add_full_peer(peer_id); + self.synced_peer(&peer_id, remote.head_slot); // notify the range sync that a peer has been added self.range_sync.fully_synced_peer_found(); return; @@ -277,19 +252,14 @@ impl SyncManager { // Check if the peer is significantly behind us. If within `SLOT_IMPORT_TOLERANCE` // treat them as a fully synced peer. If not, ignore them in the sync process if local.head_slot.sub(remote.head_slot).as_usize() < SLOT_IMPORT_TOLERANCE { - self.add_full_peer(peer_id.clone()); + self.synced_peer(&peer_id, remote.head_slot); } else { - debug!( - self.log, - "Out of sync peer connected"; - "peer" => format!("{:?}", peer_id), - ); + self.behind_peer(&peer_id, remote.head_slot); return; } // Add the peer to our RangeSync self.range_sync.add_peer(&mut self.network, peer_id, remote); - self.update_state(); } /// The response to a `BlocksByRoot` request. 
@@ -398,43 +368,41 @@ impl SyncManager { } // we have the correct block, try and process it - if let Some(chain) = self.chain.upgrade() { - match BlockProcessingOutcome::shim(chain.process_block(block.clone())) { - Ok(outcome) => { - match outcome { - BlockProcessingOutcome::Processed { block_root } => { - info!(self.log, "Processed block"; "block" => format!("{}", block_root)); + match BlockProcessingOutcome::shim(self.chain.process_block(block.clone())) { + Ok(outcome) => { + match outcome { + BlockProcessingOutcome::Processed { block_root } => { + info!(self.log, "Processed block"; "block" => format!("{}", block_root)); - match chain.fork_choice() { - Ok(()) => trace!( - self.log, - "Fork choice success"; - "location" => "single block" - ), - Err(e) => error!( - self.log, - "Fork choice failed"; - "error" => format!("{:?}", e), - "location" => "single block" - ), - } - } - BlockProcessingOutcome::ParentUnknown { .. } => { - // We don't know of the blocks parent, begin a parent lookup search - self.add_unknown_block(peer_id, block); - } - BlockProcessingOutcome::BlockIsAlreadyKnown => { - trace!(self.log, "Single block lookup already known"); - } - _ => { - warn!(self.log, "Single block lookup failed"; "outcome" => format!("{:?}", outcome)); - self.network.downvote_peer(peer_id); + match self.chain.fork_choice() { + Ok(()) => trace!( + self.log, + "Fork choice success"; + "location" => "single block" + ), + Err(e) => error!( + self.log, + "Fork choice failed"; + "error" => format!("{:?}", e), + "location" => "single block" + ), } } + BlockProcessingOutcome::ParentUnknown { .. 
} => { + // We don't know of the blocks parent, begin a parent lookup search + self.add_unknown_block(peer_id, block); + } + BlockProcessingOutcome::BlockIsAlreadyKnown => { + trace!(self.log, "Single block lookup already known"); + } + _ => { + warn!(self.log, "Single block lookup failed"; "outcome" => format!("{:?}", outcome)); + self.network.downvote_peer(peer_id); + } } - Err(e) => { - warn!(self.log, "Unexpected block processing error"; "error" => format!("{:?}", e)); - } + } + Err(e) => { + warn!(self.log, "Unexpected block processing error"; "error" => format!("{:?}", e)); } } } @@ -442,8 +410,8 @@ impl SyncManager { /// A block has been sent to us that has an unknown parent. This begins a parent lookup search /// to find the parent or chain of parents that match our current chain. fn add_unknown_block(&mut self, peer_id: PeerId, block: SignedBeaconBlock) { - // If we are not in regular sync mode, ignore this block - if self.state != ManagerState::Regular { + // If we are not synced ignore the block + if !self.network_globals.sync_state.read().is_synced() { return; } @@ -473,8 +441,8 @@ impl SyncManager { /// A request to search for a block hash has been received. This function begins a BlocksByRoot /// request to find the requested block. 
fn search_for_block(&mut self, peer_id: PeerId, block_hash: Hash256) { - // If we are not in regular sync mode, ignore this block - if self.state != ManagerState::Regular { + // If we are not synced, ignore this block + if !self.network_globals.sync_state.read().is_synced() { return; } @@ -516,42 +484,60 @@ impl SyncManager { fn peer_disconnect(&mut self, peer_id: &PeerId) { self.range_sync.peer_disconnect(&mut self.network, peer_id); - self.full_peers.remove(peer_id); - self.update_state(); + self.update_sync_state(); } - fn add_full_peer(&mut self, peer_id: PeerId) { - debug!( - self.log, "Fully synced peer added"; - "peer" => format!("{:?}", peer_id), - ); - self.full_peers.insert(peer_id); + /// Updates the syncing state of a peer to be synced. + fn synced_peer(&mut self, peer_id: &PeerId, status_head_slot: Slot) { + if let Some(peer_info) = self.network_globals.peers.write().peer_info_mut(peer_id) { + match peer_info.sync_status { + PeerSyncStatus::Synced { .. } => { + peer_info.sync_status = PeerSyncStatus::Synced { status_head_slot } + } // just update block + PeerSyncStatus::Behind { .. } | PeerSyncStatus::Unknown => { + peer_info.sync_status = PeerSyncStatus::Synced { status_head_slot }; + debug!(self.log, "Peer transitioned to synced status"; "peer_id" => format!("{}", peer_id)); + } + } + } else { + crit!(self.log, "Status'd peer is unknown"; "peer_id" => format!("{}", peer_id)); + } + self.update_sync_state(); } + /// Updates the syncing state of a peer to be behind. + fn behind_peer(&mut self, peer_id: &PeerId, status_head_slot: Slot) { + if let Some(peer_info) = self.network_globals.peers.write().peer_info_mut(peer_id) { + match peer_info.sync_status { + PeerSyncStatus::Synced { .. } => { + debug!(self.log, "Peer transitioned to from synced state to behind"; "peer_id" => format!("{}", peer_id), "head_slot" => status_head_slot); + peer_info.sync_status = PeerSyncStatus::Behind { status_head_slot } + } + PeerSyncStatus::Behind { .. 
} => { + peer_info.sync_status = PeerSyncStatus::Behind { status_head_slot } + } // just update + + PeerSyncStatus::Unknown => { + debug!(self.log, "Peer transitioned to behind sync status"; "peer_id" => format!("{}", peer_id), "head_slot" => status_head_slot); + peer_info.sync_status = PeerSyncStatus::Behind { status_head_slot } + } + } + } else { + crit!(self.log, "Status'd peer is unknown"; "peer_id" => format!("{}", peer_id)); + } + self.update_sync_state(); + } + + /// Updates the global sync state and logs any changes. + fn update_sync_state(&mut self) { + if let Some((old_state, new_state)) = self.network_globals.update_sync_state() { + info!(self.log, "Sync state updated"; "old_state" => format!("{}", old_state), "new_state" => format!("{}",new_state)); + } + } /* Processing State Functions */ // These functions are called in the main poll function to transition the state of the sync // manager - /// Updates the syncing state of the `SyncManager`. - fn update_state(&mut self) { - let previous_state = self.state.clone(); - self.state = { - if self.range_sync.is_syncing() { - ManagerState::Syncing - } else if !self.full_peers.is_empty() { - ManagerState::Regular - } else { - ManagerState::Stalled - } - }; - if self.state != previous_state { - info!(self.log, "Syncing state updated"; - "old_state" => format!("{:?}", previous_state), - "new_state" => format!("{:?}", self.state), - ); - } - } - /// A new block has been received for a parent lookup query, process it. fn process_parent_request(&mut self, mut parent_request: ParentRequests) { // verify the last added block is the parent of the last requested block @@ -598,55 +584,50 @@ impl SyncManager { // If the last block in the queue has an unknown parent, we continue the parent // lookup-search. 
- if let Some(chain) = self.chain.upgrade() { - let newest_block = parent_request - .downloaded_blocks - .pop() - .expect("There is always at least one block in the queue"); - match BlockProcessingOutcome::shim(chain.process_block(newest_block.clone())) { - Ok(BlockProcessingOutcome::ParentUnknown { .. }) => { - // need to keep looking for parents - // add the block back to the queue and continue the search - parent_request.downloaded_blocks.push(newest_block); - self.request_parent(parent_request); - return; - } - Ok(BlockProcessingOutcome::Processed { .. }) - | Ok(BlockProcessingOutcome::BlockIsAlreadyKnown { .. }) => { - spawn_block_processor( - self.chain.clone(), - ProcessId::ParentLookup(parent_request.last_submitted_peer.clone()), - parent_request.downloaded_blocks, - self.sync_send.clone(), - self.log.clone(), - ); - } - Ok(outcome) => { - // all else we consider the chain a failure and downvote the peer that sent - // us the last block - warn!( - self.log, "Invalid parent chain. Downvoting peer"; - "outcome" => format!("{:?}", outcome), - "last_peer" => format!("{:?}", parent_request.last_submitted_peer), - ); - self.network - .downvote_peer(parent_request.last_submitted_peer.clone()); - return; - } - Err(e) => { - warn!( - self.log, "Parent chain processing error. Downvoting peer"; - "error" => format!("{:?}", e), - "last_peer" => format!("{:?}", parent_request.last_submitted_peer), - ); - self.network - .downvote_peer(parent_request.last_submitted_peer.clone()); - return; - } + let newest_block = parent_request + .downloaded_blocks + .pop() + .expect("There is always at least one block in the queue"); + match BlockProcessingOutcome::shim(self.chain.process_block(newest_block.clone())) { + Ok(BlockProcessingOutcome::ParentUnknown { .. 
}) => { + // need to keep looking for parents + // add the block back to the queue and continue the search + parent_request.downloaded_blocks.push(newest_block); + self.request_parent(parent_request); + return; + } + Ok(BlockProcessingOutcome::Processed { .. }) + | Ok(BlockProcessingOutcome::BlockIsAlreadyKnown { .. }) => { + spawn_block_processor( + Arc::downgrade(&self.chain), + ProcessId::ParentLookup(parent_request.last_submitted_peer.clone()), + parent_request.downloaded_blocks, + self.sync_send.clone(), + self.log.clone(), + ); + } + Ok(outcome) => { + // all else we consider the chain a failure and downvote the peer that sent + // us the last block + warn!( + self.log, "Invalid parent chain. Downvoting peer"; + "outcome" => format!("{:?}", outcome), + "last_peer" => format!("{:?}", parent_request.last_submitted_peer), + ); + self.network + .downvote_peer(parent_request.last_submitted_peer.clone()); + return; + } + Err(e) => { + warn!( + self.log, "Parent chain processing error. Downvoting peer"; + "error" => format!("{:?}", e), + "last_peer" => format!("{:?}", parent_request.last_submitted_peer), + ); + self.network + .downvote_peer(parent_request.last_submitted_peer.clone()); + return; } - } else { - // chain doesn't exist, drop the parent queue and return - return; } } } @@ -678,9 +659,15 @@ impl SyncManager { block_roots: vec![parent_hash], }; // select a random fully synced peer to attempt to download the parent block - let available_peers = self.full_peers.iter().collect::>(); + let available_peers = self + .network_globals + .peers + .read() + .synced_peers() + .cloned() + .collect::>(); let peer_id = if let Some(peer_id) = available_peers.choose(&mut rand::thread_rng()) { - (**peer_id).clone() + (*peer_id).clone() } else { // there were no peers to choose from. 
We drop the lookup request return; @@ -763,9 +750,6 @@ impl Future for SyncManager { } } - // update the state of the manager - self.update_state(); - Ok(Async::NotReady) } } diff --git a/beacon_node/network/src/sync/network_context.rs b/beacon_node/network/src/sync/network_context.rs index c89b964b44..b28164b6d5 100644 --- a/beacon_node/network/src/sync/network_context.rs +++ b/beacon_node/network/src/sync/network_context.rs @@ -8,7 +8,7 @@ use eth2_libp2p::rpc::methods::*; use eth2_libp2p::rpc::{RPCEvent, RPCRequest, RequestId}; use eth2_libp2p::PeerId; use slog::{debug, trace, warn}; -use std::sync::Weak; +use std::sync::Arc; use tokio::sync::mpsc; use types::EthSpec; @@ -34,24 +34,22 @@ impl SyncNetworkContext { pub fn status_peer( &mut self, - chain: Weak>, + chain: Arc>, peer_id: PeerId, ) { - if let Some(chain) = chain.upgrade() { - if let Some(status_message) = status_message(&chain) { - debug!( - self.log, - "Sending Status Request"; - "peer" => format!("{:?}", peer_id), - "fork_digest" => format!("{:?}", status_message.fork_digest), - "finalized_root" => format!("{:?}", status_message.finalized_root), - "finalized_epoch" => format!("{:?}", status_message.finalized_epoch), - "head_root" => format!("{}", status_message.head_root), - "head_slot" => format!("{}", status_message.head_slot), - ); + if let Some(status_message) = status_message(&chain) { + debug!( + self.log, + "Sending Status Request"; + "peer" => format!("{:?}", peer_id), + "fork_digest" => format!("{:?}", status_message.fork_digest), + "finalized_root" => format!("{:?}", status_message.finalized_root), + "finalized_epoch" => format!("{:?}", status_message.finalized_epoch), + "head_root" => format!("{}", status_message.head_root), + "head_slot" => format!("{}", status_message.head_slot), + ); - let _ = self.send_rpc_request(peer_id, RPCRequest::Status(status_message)); - } + let _ = self.send_rpc_request(peer_id, RPCRequest::Status(status_message)); } } diff --git 
a/beacon_node/network/src/sync/range_sync/chain.rs b/beacon_node/network/src/sync/range_sync/chain.rs index bc6216db20..0edb431163 100644 --- a/beacon_node/network/src/sync/range_sync/chain.rs +++ b/beacon_node/network/src/sync/range_sync/chain.rs @@ -8,7 +8,7 @@ use eth2_libp2p::PeerId; use rand::prelude::*; use slog::{crit, debug, warn}; use std::collections::HashSet; -use std::sync::Weak; +use std::sync::Arc; use tokio::sync::mpsc; use types::{Hash256, SignedBeaconBlock, Slot}; @@ -82,7 +82,8 @@ pub struct SyncingChain { /// back once batch processing has completed. sync_send: mpsc::UnboundedSender>, - chain: Weak>, + /// A reference to the underlying beacon chain. + chain: Arc>, /// A reference to the sync logger. log: slog::Logger, @@ -103,7 +104,7 @@ impl SyncingChain { target_head_root: Hash256, peer_id: PeerId, sync_send: mpsc::UnboundedSender>, - chain: Weak>, + chain: Arc>, log: slog::Logger, ) -> Self { let mut peer_pool = HashSet::new(); @@ -244,7 +245,7 @@ impl SyncingChain { let batch_id = ProcessId::RangeBatchId(batch.id.clone()); self.current_processing_batch = Some(batch); spawn_block_processor( - self.chain.clone(), + Arc::downgrade(&self.chain.clone()), batch_id, downloaded_blocks, self.sync_send.clone(), diff --git a/beacon_node/network/src/sync/range_sync/chain_collection.rs b/beacon_node/network/src/sync/range_sync/chain_collection.rs index 8ab2627ee0..7ce4f0552e 100644 --- a/beacon_node/network/src/sync/range_sync/chain_collection.rs +++ b/beacon_node/network/src/sync/range_sync/chain_collection.rs @@ -8,48 +8,138 @@ use crate::router::processor::PeerSyncInfo; use crate::sync::manager::SyncMessage; use crate::sync::network_context::SyncNetworkContext; use beacon_chain::{BeaconChain, BeaconChainTypes}; -use eth2_libp2p::PeerId; -use slog::{debug, error, warn}; -use std::sync::Weak; +use eth2_libp2p::{types::SyncState, NetworkGlobals, PeerId}; +use slog::{debug, error, info}; +use std::sync::Arc; use tokio::sync::mpsc; use types::EthSpec; use 
types::{Hash256, Slot}; /// The state of the long range/batch sync. -pub enum SyncState { +#[derive(Clone)] +pub enum RangeSyncState { /// A finalized chain is being synced. - Finalized, + Finalized { + /// The start of the finalized chain. + start_slot: Slot, + /// The target head slot of the finalized chain. + head_slot: Slot, + /// The target head root of the finalized chain. + head_root: Hash256, + }, /// There are no finalized chains and we are syncing one more head chains. - Head, + Head { + /// The last finalized checkpoint for all head chains. + start_slot: Slot, + /// The largest known slot to sync to. + head_slot: Slot, + }, /// There are no head or finalized chains and no long range sync is in progress. Idle, } +impl PartialEq for RangeSyncState { + fn eq(&self, other: &Self) -> bool { + match (self, other) { + (RangeSyncState::Finalized { .. }, RangeSyncState::Finalized { .. }) => true, + (RangeSyncState::Head { .. }, RangeSyncState::Head { .. }) => true, + (RangeSyncState::Idle, RangeSyncState::Idle) => true, + _ => false, + } + } +} + +impl Into for RangeSyncState { + fn into(self) -> SyncState { + match self { + RangeSyncState::Finalized { + start_slot, + head_slot, + head_root, + } => SyncState::SyncingFinalized { + start_slot, + head_slot, + head_root, + }, + RangeSyncState::Head { + start_slot, + head_slot, + } => SyncState::SyncingHead { + start_slot, + head_slot, + }, + RangeSyncState::Idle => SyncState::Stalled, // this should never really be used + } + } +} + /// A collection of finalized and head chains currently being processed. pub struct ChainCollection { /// The beacon chain for processing. - beacon_chain: Weak>, + beacon_chain: Arc>, + /// A reference to the global network parameters. + network_globals: Arc>, /// The set of finalized chains being synced. finalized_chains: Vec>, /// The set of head chains being synced. head_chains: Vec>, /// The current sync state of the process. 
- sync_state: SyncState, + state: RangeSyncState, + /// Logger for the collection. + log: slog::Logger, } impl ChainCollection { - pub fn new(beacon_chain: Weak>) -> Self { + pub fn new( + beacon_chain: Arc>, + network_globals: Arc>, + log: slog::Logger, + ) -> Self { ChainCollection { - sync_state: SyncState::Idle, + beacon_chain, + network_globals, finalized_chains: Vec::new(), head_chains: Vec::new(), - beacon_chain, + state: RangeSyncState::Idle, + log, } } - /// The current syncing state. - pub fn sync_state(&self) -> &SyncState { - &self.sync_state + pub fn state(&self) -> &RangeSyncState { + &self.state + } + + /// Updates the global sync state and logs any changes. + fn update_sync_state(&mut self, state: RangeSyncState) { + // if there is no range sync occurring, the state is either synced or not based on + // connected peers. + self.state = state; + + if self.state == RangeSyncState::Idle { + // there is no range sync, let the state of peers determine the global node sync state + let new_state = self + .network_globals + .peers + .read() + .synced_peers() + .next() + .map(|_| SyncState::Synced) + .unwrap_or_else(|| SyncState::Stalled); + let mut peer_state = self.network_globals.sync_state.write(); + if new_state != *peer_state { + info!(self.log, "Sync state updated"; "old_state" => format!("{}",peer_state), "new_state" => format!("{}",new_state)); + } + *peer_state = new_state; + } else { + // The state is based on a range sync state, update it + let mut node_sync_state = self.network_globals.sync_state.write(); + let new_state: SyncState = self.state.clone().into(); + if *node_sync_state != new_state { + // we are updating the state, inform the user + info!(self.log, "Sync state updated"; "old_state" => format!("{}",node_sync_state), "new_state" => format!("{}",new_state)); + } + *node_sync_state = new_state; + } } /// A fully synced peer has joined. @@ -57,9 +147,10 @@ impl ChainCollection { /// We could be awaiting a head sync. 
If we are in the head syncing state, without any head /// chains, then update the state to idle. pub fn fully_synced_peer_found(&mut self) { - if let SyncState::Head = self.sync_state { + if let RangeSyncState::Head { .. } = self.state { if self.head_chains.is_empty() { - self.sync_state = SyncState::Idle; + // Update the global network state to either synced or stalled. + self.update_sync_state(RangeSyncState::Idle); } } } @@ -68,8 +159,19 @@ impl ChainCollection { /// `SyncState::Head` indicating we are awaiting new peers to connect before we can consider /// the state as idle. pub fn set_head_sync(&mut self) { - if let SyncState::Idle = self.sync_state { - self.sync_state = SyncState::Head; + if let RangeSyncState::Idle = self.state { + let current_slot = self + .beacon_chain + .head_info() + .map(|info| info.slot) + .unwrap_or_else(|_| Slot::from(0u64)); + // NOTE: This will modify the /node/syncing API to show current slot for all fields + // while we update peers to look for new potentially HEAD chains. + let temp_head_state = RangeSyncState::Head { + start_slot: current_slot, + head_slot: current_slot, + }; + self.update_sync_state(temp_head_state); } } @@ -103,36 +205,26 @@ impl ChainCollection { /// /// This removes any out-dated chains, swaps to any higher priority finalized chains and /// updates the state of the collection. 
- pub fn update_finalized( - &mut self, - network: &mut SyncNetworkContext, - log: &slog::Logger, - ) { - let local_slot = match self.beacon_chain.upgrade() { - Some(chain) => { - let local = match PeerSyncInfo::from_chain(&chain) { - Some(local) => local, - None => { - return error!( - log, - "Failed to get peer sync info"; - "msg" => "likely due to head lock contention" - ) - } - }; + pub fn update_finalized(&mut self, network: &mut SyncNetworkContext) { + let local_slot = { + let local = match PeerSyncInfo::from_chain(&self.beacon_chain) { + Some(local) => local, + None => { + return error!( + self.log, + "Failed to get peer sync info"; + "msg" => "likely due to head lock contention" + ) + } + }; - local - .finalized_epoch - .start_slot(T::EthSpec::slots_per_epoch()) - } - None => { - warn!(log, "Beacon chain dropped. Chains not updated"); - return; - } + local + .finalized_epoch + .start_slot(T::EthSpec::slots_per_epoch()) }; // Remove any outdated finalized chains - self.purge_outdated_chains(network, log); + self.purge_outdated_chains(network); // Check if any chains become the new syncing chain if let Some(index) = self.finalized_syncing_index() { @@ -149,13 +241,20 @@ impl ChainCollection { }) { // A chain has more peers. 
Swap the syncing chain - debug!(log, "Switching finalized chains to sync"; "new_target_root" => format!("{}", chain.target_head_root), "new_end_slot" => chain.target_head_slot, "new_start_slot"=> chain.start_slot); + debug!(self.log, "Switching finalized chains to sync"; "new_target_root" => format!("{}", chain.target_head_root), "new_end_slot" => chain.target_head_slot, "new_start_slot"=> chain.start_slot); + + // update the state to a new finalized state + let state = RangeSyncState::Finalized { + start_slot: chain.start_slot, + head_slot: chain.target_head_slot, + head_root: chain.target_head_root, + }; + self.update_sync_state(state); // Stop the current chain from syncing self.finalized_chains[index].stop_syncing(); // Start the new chain self.finalized_chains[new_index].start_syncing(network, local_slot); - self.sync_state = SyncState::Finalized; } } else if let Some(chain) = self .finalized_chains @@ -163,15 +262,36 @@ impl ChainCollection { .max_by_key(|chain| chain.peer_pool.len()) { // There is no currently syncing finalization chain, starting the one with the most peers - debug!(log, "New finalized chain started syncing"; "new_target_root" => format!("{}", chain.target_head_root), "new_end_slot" => chain.target_head_slot, "new_start_slot"=> chain.start_slot); + debug!(self.log, "New finalized chain started syncing"; "new_target_root" => format!("{}", chain.target_head_root), "new_end_slot" => chain.target_head_slot, "new_start_slot"=> chain.start_slot); chain.start_syncing(network, local_slot); - self.sync_state = SyncState::Finalized; + let state = RangeSyncState::Finalized { + start_slot: chain.start_slot, + head_slot: chain.target_head_slot, + head_root: chain.target_head_root, + }; + self.update_sync_state(state); } else { // There are no finalized chains, update the state. 
if self.head_chains.is_empty() { - self.sync_state = SyncState::Idle; + self.update_sync_state(RangeSyncState::Idle); } else { - self.sync_state = SyncState::Head; + // for the syncing API, we find the minimal start_slot and the maximum + // target_slot of all head chains to report back. + + let (min_slot, max_slot) = self.head_chains.iter().fold( + (Slot::from(0u64), Slot::from(0u64)), + |(min, max), chain| { + ( + std::cmp::min(min, chain.start_slot), + std::cmp::max(max, chain.target_head_slot), + ) + }, + ); + let head_state = RangeSyncState::Head { + start_slot: min_slot, + head_slot: max_slot, + }; + self.update_sync_state(head_state); } } } @@ -184,7 +304,6 @@ impl ChainCollection { target_slot: Slot, peer_id: PeerId, sync_send: mpsc::UnboundedSender>, - log: &slog::Logger, ) { self.finalized_chains.push(SyncingChain::new( local_finalized_slot, @@ -193,7 +312,7 @@ impl ChainCollection { peer_id, sync_send, self.beacon_chain.clone(), - log.clone(), + self.log.clone(), )); } @@ -207,7 +326,6 @@ impl ChainCollection { target_slot: Slot, peer_id: PeerId, sync_send: mpsc::UnboundedSender>, - log: &slog::Logger, ) { // remove the peer from any other head chains @@ -223,7 +341,7 @@ impl ChainCollection { peer_id, sync_send, self.beacon_chain.clone(), - log.clone(), + self.log.clone(), ); // All head chains can sync simultaneously new_head_chain.start_syncing(network, remote_finalized_slot); @@ -281,29 +399,20 @@ impl ChainCollection { /// /// This removes chains with no peers, or chains whose start block slot is less than our current /// finalized block slot. 
- pub fn purge_outdated_chains( - &mut self, - network: &mut SyncNetworkContext, - log: &slog::Logger, - ) { + pub fn purge_outdated_chains(&mut self, network: &mut SyncNetworkContext) { // Remove any chains that have no peers self.finalized_chains .retain(|chain| !chain.peer_pool.is_empty()); self.head_chains.retain(|chain| !chain.peer_pool.is_empty()); - let (beacon_chain, local_info) = match self.beacon_chain.upgrade() { - Some(chain) => match PeerSyncInfo::from_chain(&chain) { - Some(local) => (chain, local), - None => { - return error!( - log, - "Failed to get peer sync info"; - "msg" => "likely due to head lock contention" - ) - } - }, + let local_info = match PeerSyncInfo::from_chain(&self.beacon_chain) { + Some(local) => local, None => { - return; + return error!( + self.log, + "Failed to get peer sync info"; + "msg" => "likely due to head lock contention" + ) } }; @@ -311,6 +420,8 @@ impl ChainCollection { .finalized_epoch .start_slot(T::EthSpec::slots_per_epoch()); + let beacon_chain = &self.beacon_chain; + let log_ref = &self.log; // Remove chains that are out-dated and re-status their peers self.finalized_chains.retain(|chain| { if chain.target_head_slot <= local_finalized_slot @@ -318,7 +429,7 @@ impl ChainCollection { .fork_choice .contains_block(&chain.target_head_root) { - debug!(log, "Purging out of finalized chain"; "start_slot" => chain.start_slot, "end_slot" => chain.target_head_slot); + debug!(log_ref, "Purging out of finalized chain"; "start_slot" => chain.start_slot, "end_slot" => chain.target_head_slot); chain.status_peers(network); false } else { @@ -331,7 +442,7 @@ impl ChainCollection { .fork_choice .contains_block(&chain.target_head_root) { - debug!(log, "Purging out of date head chain"; "start_slot" => chain.start_slot, "end_slot" => chain.target_head_slot); + debug!(log_ref, "Purging out of date head chain"; "start_slot" => chain.start_slot, "end_slot" => chain.target_head_slot); chain.status_peers(network); false } else { @@ -355,12 
+466,7 @@ impl ChainCollection { /// finalized chain length, indicates a head chain. /// /// This will re-status the chains peers on removal. The index must exist. - pub fn remove_chain( - &mut self, - network: &mut SyncNetworkContext, - index: usize, - log: &slog::Logger, - ) { + pub fn remove_chain(&mut self, network: &mut SyncNetworkContext, index: usize) { let chain = if index >= self.finalized_chains.len() { let index = index - self.finalized_chains.len(); let chain = self.head_chains.swap_remove(index); @@ -372,10 +478,10 @@ impl ChainCollection { chain }; - debug!(log, "Chain was removed"; "start_slot" => chain.start_slot.as_u64(), "end_slot" => chain.target_head_slot.as_u64()); + debug!(self.log, "Chain was removed"; "start_slot" => chain.start_slot.as_u64(), "end_slot" => chain.target_head_slot.as_u64()); // update the state - self.update_finalized(network, log); + self.update_finalized(network); } /// Returns the index of finalized chain that is currently syncing. Returns `None` if no diff --git a/beacon_node/network/src/sync/range_sync/range.rs b/beacon_node/network/src/sync/range_sync/range.rs index 7e81af782b..1288e4e96c 100644 --- a/beacon_node/network/src/sync/range_sync/range.rs +++ b/beacon_node/network/src/sync/range_sync/range.rs @@ -40,7 +40,7 @@ //! and further batches are requested as current blocks are being processed. 
use super::chain::ProcessingResult; -use super::chain_collection::{ChainCollection, SyncState}; +use super::chain_collection::{ChainCollection, RangeSyncState}; use super::BatchId; use crate::router::processor::PeerSyncInfo; use crate::sync::block_processor::BatchProcessResult; @@ -48,10 +48,10 @@ use crate::sync::manager::SyncMessage; use crate::sync::network_context::SyncNetworkContext; use beacon_chain::{BeaconChain, BeaconChainTypes}; use eth2_libp2p::rpc::RequestId; -use eth2_libp2p::PeerId; -use slog::{debug, error, trace, warn}; +use eth2_libp2p::{NetworkGlobals, PeerId}; +use slog::{debug, error, trace}; use std::collections::HashSet; -use std::sync::Weak; +use std::sync::Arc; use tokio::sync::mpsc; use types::{EthSpec, SignedBeaconBlock}; @@ -60,7 +60,7 @@ use types::{EthSpec, SignedBeaconBlock}; /// holds the current state of the long range sync. pub struct RangeSync { /// The beacon chain for processing. - beacon_chain: Weak>, + beacon_chain: Arc>, /// A collection of chains that need to be downloaded. This stores any head or finalized chains /// that need to be downloaded. 
chains: ChainCollection, @@ -77,13 +77,14 @@ pub struct RangeSync { impl RangeSync { pub fn new( - beacon_chain: Weak>, + beacon_chain: Arc>, + network_globals: Arc>, sync_send: mpsc::UnboundedSender>, log: slog::Logger, ) -> Self { RangeSync { beacon_chain: beacon_chain.clone(), - chains: ChainCollection::new(beacon_chain), + chains: ChainCollection::new(beacon_chain, network_globals, log.clone()), awaiting_head_peers: HashSet::new(), sync_send, log, @@ -118,21 +119,14 @@ impl RangeSync { // determine if we need to run a sync to the nearest finalized state or simply sync to // its current head - let (chain, local_info) = match self.beacon_chain.upgrade() { - Some(chain) => match PeerSyncInfo::from_chain(&chain) { - Some(local) => (chain, local), - None => { - return error!( - self.log, - "Failed to get peer sync info"; - "msg" => "likely due to head lock contention" - ) - } - }, + let local_info = match PeerSyncInfo::from_chain(&self.beacon_chain) { + Some(local) => local, None => { - return warn!(self.log, - "Beacon chain dropped. Peer not considered for sync"; - "peer_id" => format!("{:?}", peer_id)); + return error!( + self.log, + "Failed to get peer sync info"; + "msg" => "likely due to head lock contention" + ) } }; @@ -148,10 +142,13 @@ impl RangeSync { self.remove_peer(network, &peer_id); // remove any out-of-date chains - self.chains.purge_outdated_chains(network, &self.log); + self.chains.purge_outdated_chains(network); if remote_finalized_slot > local_info.head_slot - && !chain.fork_choice.contains_block(&remote.finalized_root) + && !self + .beacon_chain + .fork_choice + .contains_block(&remote.finalized_root) { debug!(self.log, "Finalization sync peer joined"; "peer_id" => format!("{:?}", peer_id)); // Finalized chain search @@ -171,7 +168,7 @@ impl RangeSync { chain.add_peer(network, peer_id); // check if the new peer's addition will favour a new syncing chain. 
- self.chains.update_finalized(network, &self.log); + self.chains.update_finalized(network); } else { // there is no finalized chain that matches this peer's last finalized target // create a new finalized chain @@ -183,9 +180,8 @@ impl RangeSync { remote_finalized_slot, peer_id, self.sync_send.clone(), - &self.log, ); - self.chains.update_finalized(network, &self.log); + self.chains.update_finalized(network); } } else { if self.chains.is_finalizing_sync() { @@ -216,10 +212,9 @@ impl RangeSync { remote.head_slot, peer_id, self.sync_send.clone(), - &self.log, ); } - self.chains.update_finalized(network, &self.log); + self.chains.update_finalized(network); } } @@ -274,7 +269,7 @@ impl RangeSync { chain.status_peers(network); // update the state of the collection - self.chains.update_finalized(network, &self.log); + self.chains.update_finalized(network); // set the state to a head sync, to inform the manager that we are awaiting a // head chain. @@ -282,13 +277,13 @@ impl RangeSync { // if there are no more finalized chains, re-status all known peers awaiting a head // sync - match self.chains.sync_state() { - SyncState::Idle | SyncState::Head => { + match self.chains.state() { + RangeSyncState::Idle | RangeSyncState::Head { .. } => { for peer_id in self.awaiting_head_peers.drain() { network.status_peer(self.beacon_chain.clone(), peer_id); } } - SyncState::Finalized => {} // Have more finalized chains to complete + RangeSyncState::Finalized { .. } => {} // Have more finalized chains to complete } } Some((_, ProcessingResult::KeepChain)) => {} @@ -308,7 +303,7 @@ impl RangeSync { chain.status_peers(network); // update the state of the collection - self.chains.update_finalized(network, &self.log); + self.chains.update_finalized(network); } Some((_, ProcessingResult::KeepChain)) => {} None => { @@ -321,15 +316,6 @@ impl RangeSync { } } - /// Public method to indicate the current state of the long range sync. 
- pub fn is_syncing(&self) -> bool { - match self.chains.sync_state() { - SyncState::Finalized => true, - SyncState::Head => true, - SyncState::Idle => false, - } - } - /// A peer has disconnected. This removes the peer from any ongoing chains and mappings. A /// disconnected peer could remove a chain pub fn peer_disconnect( @@ -344,7 +330,7 @@ impl RangeSync { self.remove_peer(network, peer_id); // update the state of the collection - self.chains.update_finalized(network, &self.log); + self.chains.update_finalized(network); } /// When a peer gets removed, both the head and finalized chains need to be searched to check which pool the peer is in. The chain may also have a batch or batches awaiting @@ -370,7 +356,7 @@ impl RangeSync { { // the chain needed to be removed debug!(self.log, "Chain being removed due to failed batch"); - self.chains.remove_chain(network, index, &self.log); + self.chains.remove_chain(network, index); } } @@ -392,7 +378,7 @@ impl RangeSync { Some((_, ProcessingResult::KeepChain)) => {} // error handled chain persists Some((index, ProcessingResult::RemoveChain)) => { debug!(self.log, "Chain being removed due to RPC error"); - self.chains.remove_chain(network, index, &self.log) + self.chains.remove_chain(network, index) } None => {} // request wasn't in the finalized chains, check the head chains } diff --git a/beacon_node/rest_api/src/node.rs b/beacon_node/rest_api/src/node.rs index 228a7ef0d3..2995957e02 100644 --- a/beacon_node/rest_api/src/node.rs +++ b/beacon_node/rest_api/src/node.rs @@ -1,9 +1,50 @@ use crate::response_builder::ResponseBuilder; use crate::ApiResult; +use eth2_libp2p::{types::SyncState, NetworkGlobals}; use hyper::{Body, Request}; +use rest_types::{SyncingResponse, SyncingStatus}; +use std::sync::Arc; +use types::{EthSpec, Slot}; use version; /// Read the version string from the current Lighthouse build. 
pub fn get_version(req: Request) -> ApiResult { ResponseBuilder::new(&req)?.body_no_ssz(&version::version()) } + +pub fn syncing( + req: Request, + network: Arc>, + current_slot: Slot, +) -> ApiResult { + let (starting_slot, highest_slot) = match network.sync_state() { + SyncState::SyncingFinalized { + start_slot, + head_slot, + .. + } + | SyncState::SyncingHead { + start_slot, + head_slot, + } => (start_slot, head_slot), + SyncState::Synced | SyncState::Stalled => (Slot::from(0u64), current_slot), + }; + + let sync_status = SyncingStatus { + starting_slot, + current_slot, + highest_slot, + }; + + ResponseBuilder::new(&req)?.body(&SyncingResponse { + is_syncing: network.is_syncing(), + sync_status, + }) +} + +pub fn lighthouse_syncing( + req: Request, + network: Arc>, +) -> ApiResult { + ResponseBuilder::new(&req)?.body_no_ssz(&network.sync_state()) +} diff --git a/beacon_node/rest_api/src/router.rs b/beacon_node/rest_api/src/router.rs index f086cd1ff7..3c5c7dc7b0 100644 --- a/beacon_node/rest_api/src/router.rs +++ b/beacon_node/rest_api/src/router.rs @@ -11,6 +11,7 @@ use slog::debug; use std::path::PathBuf; use std::sync::Arc; use std::time::Instant; +use types::Slot; fn into_boxfut(item: F) -> BoxFut where @@ -44,7 +45,20 @@ pub fn route( // Methods for Client (&Method::GET, "/node/version") => into_boxfut(node::get_version(req)), (&Method::GET, "/node/syncing") => { - into_boxfut(helpers::implementation_pending_response(req)) + // inform the current slot, or set to 0 + let current_slot = beacon_chain + .head_info() + .map(|info| info.slot) + .unwrap_or_else(|_| Slot::from(0u64)); + + into_boxfut(node::syncing::( + req, + network_globals, + current_slot, + )) + } + (&Method::GET, "/node/lighthouse_syncing") => { + into_boxfut(node::lighthouse_syncing::(req, network_globals)) } // Methods for Network diff --git a/book/src/http.md b/book/src/http.md index 042881af05..fe164fd05f 100644 --- a/book/src/http.md +++ b/book/src/http.md @@ -14,6 +14,7 @@ detail: Endpoint 
| Description | | --- | -- | +[`/node`](./http_node.md) | General information about the beacon node. [`/beacon`](./http_beacon.md) | General information about the beacon chain. [`/validator`](./http_validator.md) | Provides functionality to validator clients. [`/consensus`](./http_consensus.md) | Proof-of-stake voting statistics. diff --git a/book/src/http_node.md b/book/src/http_node.md new file mode 100644 index 0000000000..4e4768bb8e --- /dev/null +++ b/book/src/http_node.md @@ -0,0 +1,104 @@ +# Lighthouse REST API: `/node` + +The `/node` endpoints provide information about the Lighthouse beacon node. + +## Endpoints + +HTTP Path | Description | +| --- | -- | +[`/node/version`](#nodeversion) | Get the node's version. +[`/node/syncing`](#nodesyncing) | Get the node's syncing status. +[`/node/lighthouse_syncing`](#nodelighthouse_syncing) | Get the node's syncing status (Lighthouse specific). + + +## `/node/version` + +Requests the beacon node's version. + +### HTTP Specification + +| Property | Specification | +| --- |--- | +Path | `/node/version` +Method | GET +JSON Encoding | String +Query Parameters | None +Typical Responses | 200 + +### Example Response + +```json +"Lighthouse-0.2.0-unstable" +``` + +## `/node/syncing` + +Requests the syncing status of the beacon node. + +### HTTP Specification + +| Property | Specification | +| --- |--- | +Path | `/node/syncing` +Method | GET +JSON Encoding | Object +Query Parameters | None +Typical Responses | 200 + +### Example Response + +```json +{ + "is_syncing": true, + "sync_status": { + "starting_slot": 0, + "current_slot": 100, + "highest_slot": 200 + } +} +``` + +## `/node/lighthouse_syncing` + +Requests the syncing state of a Lighthouse beacon node. Lighthouse has a +custom sync protocol; this request gets Lighthouse-specific sync information.
+ +### HTTP Specification + +| Property | Specification | +| --- |--- | +Path | `/node/lighthouse_syncing` +Method | GET +JSON Encoding | Object +Query Parameters | None +Typical Responses | 200 + +### Example Response + +If the node is undergoing a finalization sync: +```json +{ + "SyncingFinalized": { + "start_slot": 10, + "head_slot": 20, + "head_root": "0x74020d0e3c3c02d2ea6279d5760f7d0dd376c4924beaaec4d5c0cefd1c0c4465" + } +} +``` + +If the node is undergoing a head chain sync: +```json +{ + "SyncingHead": { + "start_slot": 0, + "head_slot": 1195 + } +} +``` + +If the node is synced: +```json + +"Synced" + +``` diff --git a/eth2/utils/rest_types/src/lib.rs b/eth2/utils/rest_types/src/lib.rs index 9fc257a8c7..2c834f6e71 100644 --- a/eth2/utils/rest_types/src/lib.rs +++ b/eth2/utils/rest_types/src/lib.rs @@ -4,6 +4,7 @@ mod beacon; mod consensus; +mod node; mod validator; pub use beacon::{ @@ -16,3 +17,5 @@ pub use validator::{ }; pub use consensus::{IndividualVote, IndividualVotesRequest, IndividualVotesResponse}; + +pub use node::{SyncingResponse, SyncingStatus}; diff --git a/eth2/utils/rest_types/src/node.rs b/eth2/utils/rest_types/src/node.rs new file mode 100644 index 0000000000..ecacacc1ce --- /dev/null +++ b/eth2/utils/rest_types/src/node.rs @@ -0,0 +1,32 @@ +//! Collection of types for the `/node` HTTP API. +use serde::{Deserialize, Serialize}; +use ssz_derive::{Decode, Encode}; +use types::Slot; + +#[derive(Clone, Debug, PartialEq, Serialize, Deserialize, Encode, Decode)] +/// The current syncing status of the node. +pub struct SyncingStatus { + /// The starting slot of sync. + /// + /// For a finalized sync, this is the start slot of the current finalized syncing + /// chain. + /// + /// For head sync, this is the last finalized slot. + pub starting_slot: Slot, + /// The current slot. + pub current_slot: Slot, + /// The highest known slot for the current syncing chain. + /// + /// For a finalized sync, the target finalized slot.
+ /// For head sync, this is the highest known slot of all head chains. + pub highest_slot: Slot, +} + +#[derive(Clone, Debug, PartialEq, Serialize, Deserialize, Encode, Decode)] +/// The response for the /node/syncing HTTP GET. +pub struct SyncingResponse { + /// Is the node syncing. + pub is_syncing: bool, + /// The current sync status. + pub sync_status: SyncingStatus, +}