From f96a3282b5072b6f30e16b9e94cccbfe64d7a3a7 Mon Sep 17 00:00:00 2001 From: Paul Hauner Date: Fri, 22 Mar 2019 11:39:16 +1100 Subject: [PATCH] Pass first sync test --- beacon_node/network/src/sync/simple_sync.rs | 85 ++++++++++----------- beacon_node/network/tests/tests.rs | 62 ++++++++++----- 2 files changed, 81 insertions(+), 66 deletions(-) diff --git a/beacon_node/network/src/sync/simple_sync.rs b/beacon_node/network/src/sync/simple_sync.rs index 6a40ecf60e..0b2f736c1d 100644 --- a/beacon_node/network/src/sync/simple_sync.rs +++ b/beacon_node/network/src/sync/simple_sync.rs @@ -1,7 +1,5 @@ use crate::beacon_chain::BeaconChain; use crate::message_handler::NetworkContext; -use crate::service::NetworkMessage; -use crossbeam_channel::Sender; use eth2_libp2p::rpc::methods::*; use eth2_libp2p::rpc::{RPCRequest, RPCResponse}; use eth2_libp2p::PeerId; @@ -10,14 +8,13 @@ use std::collections::HashMap; use std::sync::Arc; use types::{Epoch, Hash256, Slot}; -type NetworkSender = Sender; - /// The number of slots that we can import blocks ahead of us, before going into full Sync mode. const SLOT_IMPORT_TOLERANCE: u64 = 100; /// Keeps track of syncing information for known connected peers. -#[derive(Clone, Copy)] +#[derive(Clone, Copy, Debug)] pub struct PeerSyncInfo { + network_id: u8, latest_finalized_root: Hash256, latest_finalized_epoch: Epoch, best_root: Hash256, @@ -25,25 +22,24 @@ pub struct PeerSyncInfo { } impl PeerSyncInfo { - fn is_on_chain(&self, chain: &Arc) -> bool { - // TODO: make useful. - true + fn is_on_same_chain(&self, other: Self) -> bool { + self.network_id == other.network_id } - fn has_higher_finalized_epoch(&self, chain: &Arc) -> bool { - self.latest_finalized_epoch > chain.get_state().finalized_epoch + fn has_higher_finalized_epoch_than(&self, other: Self) -> bool { + self.latest_finalized_epoch > other.latest_finalized_epoch } - fn has_higher_best_slot(&self, chain: &Arc) -> bool { - self.latest_finalized_epoch > chain.get_state().finalized_epoch + fn has_higher_best_slot_than(&self, other: Self) -> bool { + self.best_slot > other.best_slot } - pub fn status(&self, chain: &Arc) -> PeerStatus { - if self.has_higher_finalized_epoch(chain) { + pub fn status_compared_to(&self, other: Self) -> PeerStatus { + if self.has_higher_finalized_epoch_than(other) { PeerStatus::HigherFinalizedEpoch - } else if !self.is_on_chain(chain) { - PeerStatus::HigherFinalizedEpoch - } else if self.has_higher_best_slot(chain) { + } else if !self.is_on_same_chain(other) { + PeerStatus::OnDifferentChain + } else if self.has_higher_best_slot_than(other) { PeerStatus::HigherBestSlot } else { PeerStatus::NotInteresting @@ -62,6 +58,7 @@ pub enum PeerStatus { impl From for PeerSyncInfo { fn from(hello: HelloMessage) -> PeerSyncInfo { PeerSyncInfo { + network_id: hello.network_id, latest_finalized_root: hello.latest_finalized_root, latest_finalized_epoch: hello.latest_finalized_epoch, best_root: hello.best_root, @@ -70,6 +67,12 @@ impl From for PeerSyncInfo { } } +impl From<&Arc> for PeerSyncInfo { + fn from(chain: &Arc) -> PeerSyncInfo { + Self::from(chain.hello_message()) + } +} + /// The current syncing state. #[derive(PartialEq)] pub enum SyncState { @@ -88,12 +91,6 @@ pub struct SimpleSync { known_peers: HashMap, /// The current state of the syncing protocol. state: SyncState, - /// The network id, for quick HELLO RPC message lookup. - network_id: u8, - /// The latest epoch of the syncing chain. - latest_finalized_epoch: Epoch, - /// The latest block of the syncing chain. - latest_slot: Slot, /// Sync logger. log: slog::Logger, } @@ -106,9 +103,6 @@ impl SimpleSync { chain: beacon_chain.clone(), known_peers: HashMap::new(), state: SyncState::Idle, - network_id: beacon_chain.get_spec().network_id, - latest_finalized_epoch: state.finalized_epoch, - latest_slot: state.slot - 1, //TODO: Build latest block function into Beacon chain and correct this log: sync_logger, } } @@ -133,40 +127,39 @@ impl SimpleSync { pub fn on_hello(&mut self, peer_id: PeerId, hello: HelloMessage, network: &mut NetworkContext) { let spec = self.chain.get_spec(); + let remote = PeerSyncInfo::from(hello); + let local = PeerSyncInfo::from(&self.chain); + let remote_status = remote.status_compared_to(local); + // network id must match - if hello.network_id != self.network_id { - debug!(self.log, "Bad network id. Peer: {:?}", peer_id); - network.disconnect(peer_id); - return; + if remote_status != PeerStatus::OnDifferentChain { + debug!(self.log, "Handshake successful. Peer: {:?}", peer_id); + self.known_peers.insert(peer_id.clone(), remote); } - let peer = PeerSyncInfo::from(hello); - debug!(self.log, "Handshake successful. Peer: {:?}", peer_id); - self.known_peers.insert(peer_id.clone(), peer); - - debug!( - self.log, - "Peer hello. Status: {:?}", - peer.status(&self.chain) - ); - - match peer.status(&self.chain) { + match remote_status { PeerStatus::OnDifferentChain => { debug!(self.log, "Peer is on different chain. Peer: {:?}", peer_id); network.disconnect(peer_id); } PeerStatus::HigherFinalizedEpoch => { - let start_slot = peer.latest_finalized_epoch.start_slot(spec.slots_per_epoch); - let required_slots = start_slot - self.chain.slot(); + let start_slot = remote + .latest_finalized_epoch + .start_slot(spec.slots_per_epoch); + let required_slots = start_slot - local.best_slot; self.request_block_roots(peer_id, start_slot, required_slots.as_u64(), network); } PeerStatus::HigherBestSlot => { - let start_slot = peer.best_slot; - let required_slots = start_slot - self.chain.slot(); + let required_slots = remote.best_slot - local.best_slot; - self.request_block_roots(peer_id, start_slot, required_slots.as_u64(), network); + self.request_block_roots( + peer_id, + local.best_slot, + required_slots.as_u64(), + network, + ); } PeerStatus::NotInteresting => {} } diff --git a/beacon_node/network/tests/tests.rs b/beacon_node/network/tests/tests.rs index 7941ffb991..dea57982e7 100644 --- a/beacon_node/network/tests/tests.rs +++ b/beacon_node/network/tests/tests.rs @@ -1,4 +1,3 @@ -use beacon_chain::test_utils::TestingBeaconChainBuilder; use crossbeam_channel::{unbounded, Receiver, RecvTimeoutError, Sender}; use eth2_libp2p::rpc::methods::*; use eth2_libp2p::rpc::{RPCMethod, RPCRequest, RPCResponse}; @@ -9,7 +8,6 @@ use network::service::{NetworkMessage, OutgoingMessage}; use sloggers::terminal::{Destination, TerminalLoggerBuilder}; use sloggers::types::Severity; use sloggers::Build; -use std::sync::Arc; use std::time::Duration; use test_harness::BeaconChainHarness; use tokio::runtime::TaskExecutor; @@ -19,26 +17,40 @@ pub struct SyncNode { pub id: usize, sender: Sender, receiver: Receiver, + harness: BeaconChainHarness, } impl SyncNode { - pub fn new( + fn from_beacon_state_builder( id: usize, executor: &TaskExecutor, - chain: Arc, + state_builder: TestingBeaconStateBuilder, + spec: &ChainSpec, logger: slog::Logger, ) -> Self { + let harness = BeaconChainHarness::from_beacon_state_builder(state_builder, spec.clone()); + let (network_sender, network_receiver) = unbounded(); - let message_handler_sender = - MessageHandler::spawn(chain, network_sender, executor, logger).unwrap(); + let message_handler_sender = MessageHandler::spawn( + harness.beacon_chain.clone(), + network_sender, + executor, + logger, + ) + .unwrap(); Self { id, sender: message_handler_sender, receiver: network_receiver, + harness, } } + fn increment_beacon_chain_slot(&mut self) { + self.harness.increment_beacon_chain_slot(); + } + fn send(&self, message: HandlerMessage) { self.sender.send(message).unwrap(); } @@ -47,7 +59,11 @@ impl SyncNode { self.receiver.recv_timeout(Duration::from_millis(500)) } - fn recv_rpc_response(&self) -> Result { + fn hello_message(&self) -> HelloMessage { + self.harness.beacon_chain.hello_message() + } + + fn _recv_rpc_response(&self) -> Result { let network_message = self.recv()?; Ok(match network_message { NetworkMessage::Send( @@ -108,12 +124,6 @@ impl SyncMaster { } } - pub fn build_blocks(&mut self, blocks: usize) { - for _ in 0..blocks { - self.harness.advance_chain_with_block(); - } - } - pub fn response_id(&mut self, node: &SyncNode) -> u64 { let id = self.response_ids[node.id]; self.response_ids[node.id] += 1; @@ -169,11 +179,11 @@ fn test_setup( let mut nodes = Vec::with_capacity(node_count); for id in 0..node_count { - let local_chain = TestingBeaconChainBuilder::from(state_builder.clone()).build(&spec); - let node = SyncNode::new( + let node = SyncNode::from_beacon_state_builder( id, &runtime.executor(), - Arc::new(local_chain), + state_builder.clone(), + &spec, logger.clone(), ); @@ -185,6 +195,15 @@ fn test_setup( (runtime, master, nodes) } +pub fn build_blocks(blocks: usize, master: &mut SyncMaster, nodes: &mut Vec) { + for _ in 0..blocks { + master.harness.advance_chain_with_block(); + for i in 0..nodes.len() { + nodes[i].increment_beacon_chain_slot(); + } + } +} + #[test] fn first_test() { let logger = get_logger(); @@ -195,17 +214,20 @@ fn first_test() { let state_builder = TestingBeaconStateBuilder::from_default_keypairs_file_if_exists(validator_count, &spec); - let (runtime, mut master, nodes) = test_setup(state_builder, node_count, &spec, logger.clone()); + let (runtime, mut master, mut nodes) = + test_setup(state_builder, node_count, &spec, logger.clone()); - master.build_blocks(10); + let original_node_slot = nodes[0].hello_message().best_slot; + + build_blocks(2, &mut master, &mut nodes); master.do_hello_with(&nodes[0]); assert_sent_block_root_request( &nodes[0], BeaconBlockRootsRequest { - start_slot: Slot::new(1), - count: 10, + start_slot: original_node_slot, + count: 2, }, );