use super::import_queue::{ImportQueue, PartialBeaconBlockCompletion}; use crate::message_handler::NetworkContext; use beacon_chain::{BeaconChain, BeaconChainTypes, BlockProcessingOutcome}; use eth2_libp2p::rpc::methods::*; use eth2_libp2p::rpc::{RPCRequest, RPCResponse, RequestId}; use eth2_libp2p::PeerId; use slog::{debug, error, info, o, trace, warn}; use ssz::Encode; use std::collections::HashMap; use std::sync::Arc; use std::time::Duration; use store::Store; use types::{ Attestation, BeaconBlock, BeaconBlockBody, BeaconBlockHeader, Epoch, EthSpec, Hash256, Slot, }; /// The number of slots that we can import blocks ahead of us, before going into full Sync mode. const SLOT_IMPORT_TOLERANCE: u64 = 100; /// The amount of seconds a block (or partial block) may exist in the import queue. const QUEUE_STALE_SECS: u64 = 100; /// If a block is more than `FUTURE_SLOT_TOLERANCE` slots ahead of our slot clock, we drop it. /// Otherwise we queue it. const FUTURE_SLOT_TOLERANCE: u64 = 1; const SHOULD_FORWARD_GOSSIP_BLOCK: bool = true; const SHOULD_NOT_FORWARD_GOSSIP_BLOCK: bool = false; /// Keeps track of syncing information for known connected peers. #[derive(Clone, Copy, Debug)] pub struct PeerSyncInfo { network_id: u8, chain_id: u64, latest_finalized_root: Hash256, latest_finalized_epoch: Epoch, best_root: Hash256, best_slot: Slot, } impl From for PeerSyncInfo { fn from(hello: HelloMessage) -> PeerSyncInfo { PeerSyncInfo { network_id: hello.network_id, chain_id: hello.chain_id, latest_finalized_root: hello.latest_finalized_root, latest_finalized_epoch: hello.latest_finalized_epoch, best_root: hello.best_root, best_slot: hello.best_slot, } } } impl From<&Arc>> for PeerSyncInfo { fn from(chain: &Arc>) -> PeerSyncInfo { Self::from(hello_message(chain)) } } /// The current syncing state. #[derive(PartialEq)] pub enum SyncState { Idle, Downloading, _Stopped, } /// Simple Syncing protocol. pub struct SimpleSync { /// A reference to the underlying beacon chain. chain: Arc>, /// A mapping of Peers to their respective PeerSyncInfo. known_peers: HashMap, /// A queue to allow importing of blocks import_queue: ImportQueue, /// The current state of the syncing protocol. state: SyncState, log: slog::Logger, } impl SimpleSync { /// Instantiate a `SimpleSync` instance, with no peers and an empty queue. pub fn new(beacon_chain: Arc>, log: &slog::Logger) -> Self { let sync_logger = log.new(o!("Service"=> "Sync")); let queue_item_stale_time = Duration::from_secs(QUEUE_STALE_SECS); let import_queue = ImportQueue::new(beacon_chain.clone(), queue_item_stale_time, log.clone()); SimpleSync { chain: beacon_chain.clone(), known_peers: HashMap::new(), import_queue, state: SyncState::Idle, log: sync_logger, } } /// Handle a `Goodbye` message from a peer. /// /// Removes the peer from `known_peers`. pub fn on_goodbye(&mut self, peer_id: PeerId, reason: GoodbyeReason) { info!( self.log, "PeerGoodbye"; "peer" => format!("{:?}", peer_id), "reason" => format!("{:?}", reason), ); self.known_peers.remove(&peer_id); } /// Handle a peer disconnect. /// /// Removes the peer from `known_peers`. pub fn on_disconnect(&mut self, peer_id: PeerId) { info!( self.log, "Peer Disconnected"; "peer" => format!("{:?}", peer_id), ); self.known_peers.remove(&peer_id); } /// Handle the connection of a new peer. /// /// Sends a `Hello` message to the peer. pub fn on_connect(&self, peer_id: PeerId, network: &mut NetworkContext) { info!(self.log, "PeerConnected"; "peer" => format!("{:?}", peer_id)); network.send_rpc_request(peer_id, RPCRequest::Hello(hello_message(&self.chain))); } /// Handle a `Hello` request. /// /// Processes the `HelloMessage` from the remote peer and sends back our `Hello`. pub fn on_hello_request( &mut self, peer_id: PeerId, request_id: RequestId, hello: HelloMessage, network: &mut NetworkContext, ) { debug!(self.log, "HelloRequest"; "peer" => format!("{:?}", peer_id)); // Say hello back. network.send_rpc_response( peer_id.clone(), request_id, RPCResponse::Hello(hello_message(&self.chain)), ); self.process_hello(peer_id, hello, network); } /// Process a `Hello` response from a peer. pub fn on_hello_response( &mut self, peer_id: PeerId, hello: HelloMessage, network: &mut NetworkContext, ) { debug!(self.log, "HelloResponse"; "peer" => format!("{:?}", peer_id)); // Process the hello message, without sending back another hello. self.process_hello(peer_id, hello, network); } /// Process a `Hello` message, requesting new blocks if appropriate. /// /// Disconnects the peer if required. fn process_hello( &mut self, peer_id: PeerId, hello: HelloMessage, network: &mut NetworkContext, ) { let remote = PeerSyncInfo::from(hello); let local = PeerSyncInfo::from(&self.chain); let start_slot = |epoch: Epoch| epoch.start_slot(T::EthSpec::slots_per_epoch()); if local.network_id != remote.network_id { // The node is on a different network, disconnect them. info!( self.log, "HandshakeFailure"; "peer" => format!("{:?}", peer_id), "reason" => "network_id" ); network.disconnect(peer_id.clone(), GoodbyeReason::IrrelevantNetwork); } else if remote.latest_finalized_epoch <= local.latest_finalized_epoch && remote.latest_finalized_root != Hash256::zero() && local.latest_finalized_root != Hash256::zero() && (self.root_at_slot(start_slot(remote.latest_finalized_epoch)) != Some(remote.latest_finalized_root)) { // The remotes finalized epoch is less than or greater than ours, but the block root is // different to the one in our chain. // // Therefore, the node is on a different chain and we should not communicate with them. info!( self.log, "HandshakeFailure"; "peer" => format!("{:?}", peer_id), "reason" => "different finalized chain" ); network.disconnect(peer_id.clone(), GoodbyeReason::IrrelevantNetwork); } else if remote.latest_finalized_epoch < local.latest_finalized_epoch { // The node has a lower finalized epoch, their chain is not useful to us. There are two // cases where a node can have a lower finalized epoch: // // ## The node is on the same chain // // If a node is on the same chain but has a lower finalized epoch, their head must be // lower than ours. Therefore, we have nothing to request from them. // // ## The node is on a fork // // If a node is on a fork that has a lower finalized epoch, switching to that fork would // cause us to revert a finalized block. This is not permitted, therefore we have no // interest in their blocks. debug!( self.log, "NaivePeer"; "peer" => format!("{:?}", peer_id), "reason" => "lower finalized epoch" ); } else if self .chain .store .exists::>(&remote.best_root) .unwrap_or_else(|_| false) { // If the node's best-block is already known to us, we have nothing to request. debug!( self.log, "NaivePeer"; "peer" => format!("{:?}", peer_id), "reason" => "best block is known" ); } else { // The remote node has an equal or great finalized epoch and we don't know it's head. // // Therefore, there are some blocks between the local finalized epoch and the remote // head that are worth downloading. debug!( self.log, "UsefulPeer"; "peer" => format!("{:?}", peer_id), "local_finalized_epoch" => local.latest_finalized_epoch, "remote_latest_finalized_epoch" => remote.latest_finalized_epoch, ); let start_slot = local .latest_finalized_epoch .start_slot(T::EthSpec::slots_per_epoch()); let required_slots = remote.best_slot - start_slot; self.request_block_roots( peer_id, BeaconBlockRootsRequest { start_slot, count: required_slots.as_u64(), }, network, ); } } fn root_at_slot(&self, target_slot: Slot) -> Option { self.chain .rev_iter_block_roots() .find(|(_root, slot)| *slot == target_slot) .map(|(root, _slot)| root) } /// Handle a `BeaconBlockRoots` request from the peer. pub fn on_beacon_block_roots_request( &mut self, peer_id: PeerId, request_id: RequestId, req: BeaconBlockRootsRequest, network: &mut NetworkContext, ) { debug!( self.log, "BlockRootsRequest"; "peer" => format!("{:?}", peer_id), "count" => req.count, "start_slot" => req.start_slot, ); let mut roots: Vec = self .chain .rev_iter_block_roots() .take_while(|(_root, slot)| req.start_slot <= *slot) .filter(|(_root, slot)| *slot < req.start_slot + req.count) .map(|(block_root, slot)| BlockRootSlot { slot, block_root }) .collect(); if roots.len() as u64 != req.count { debug!( self.log, "BlockRootsRequest"; "peer" => format!("{:?}", peer_id), "msg" => "Failed to return all requested hashes", "start_slot" => req.start_slot, "current_slot" => self.chain.present_slot(), "requested" => req.count, "returned" => roots.len(), ); } roots.reverse(); roots.dedup_by_key(|brs| brs.block_root); network.send_rpc_response( peer_id, request_id, RPCResponse::BeaconBlockRoots(BeaconBlockRootsResponse { roots }), ) } /// Handle a `BeaconBlockRoots` response from the peer. pub fn on_beacon_block_roots_response( &mut self, peer_id: PeerId, res: BeaconBlockRootsResponse, network: &mut NetworkContext, ) { debug!( self.log, "BlockRootsResponse"; "peer" => format!("{:?}", peer_id), "count" => res.roots.len(), ); if res.roots.is_empty() { warn!( self.log, "Peer returned empty block roots response"; "peer_id" => format!("{:?}", peer_id) ); return; } // The wire protocol specifies that slots must be in ascending order. if !res.slots_are_ascending() { warn!( self.log, "Peer returned block roots response with bad slot ordering"; "peer_id" => format!("{:?}", peer_id) ); return; } let new_roots = self .import_queue .enqueue_block_roots(&res.roots, peer_id.clone()); // No new roots means nothing to do. // // This check protects against future panics. if new_roots.is_empty() { return; } // Determine the first (earliest) and last (latest) `BlockRootSlot` items. // // This logic relies upon slots to be in ascending order, which is enforced earlier. let first = new_roots.first().expect("Non-empty list must have first"); let last = new_roots.last().expect("Non-empty list must have last"); // Request all headers between the earliest and latest new `BlockRootSlot` items. self.request_block_headers( peer_id, BeaconBlockHeadersRequest { start_root: first.block_root, start_slot: first.slot, max_headers: (last.slot - first.slot + 1).as_u64(), skip_slots: 0, }, network, ) } /// Handle a `BeaconBlockHeaders` request from the peer. pub fn on_beacon_block_headers_request( &mut self, peer_id: PeerId, request_id: RequestId, req: BeaconBlockHeadersRequest, network: &mut NetworkContext, ) { debug!( self.log, "BlockHeadersRequest"; "peer" => format!("{:?}", peer_id), "count" => req.max_headers, ); let count = req.max_headers; // Collect the block roots. let mut roots: Vec = self .chain .rev_iter_block_roots() .take_while(|(_root, slot)| req.start_slot <= *slot) .filter(|(_root, slot)| *slot < req.start_slot + count) .map(|(root, _slot)| root) .collect(); roots.reverse(); roots.dedup(); let headers: Vec = roots .into_iter() .step_by(req.skip_slots as usize + 1) .filter_map(|root| { let block = self .chain .store .get::>(&root) .ok()?; Some(block?.block_header()) }) .collect(); // ssz-encode the headers let headers = headers.as_ssz_bytes(); network.send_rpc_response( peer_id, request_id, RPCResponse::BeaconBlockHeaders(BeaconBlockHeadersResponse { headers }), ) } /// Handle a `BeaconBlockHeaders` response from the peer. pub fn on_beacon_block_headers_response( &mut self, peer_id: PeerId, headers: Vec, network: &mut NetworkContext, ) { debug!( self.log, "BlockHeadersResponse"; "peer" => format!("{:?}", peer_id), "count" => headers.len(), ); if headers.is_empty() { warn!( self.log, "Peer returned empty block headers response. PeerId: {:?}", peer_id ); return; } // Enqueue the headers, obtaining a list of the roots of the headers which were newly added // to the queue. let block_roots = self.import_queue.enqueue_headers(headers, peer_id.clone()); if !block_roots.is_empty() { self.request_block_bodies(peer_id, BeaconBlockBodiesRequest { block_roots }, network); } } /// Handle a `BeaconBlockBodies` request from the peer. pub fn on_beacon_block_bodies_request( &mut self, peer_id: PeerId, request_id: RequestId, req: BeaconBlockBodiesRequest, network: &mut NetworkContext, ) { let block_bodies: Vec> = req .block_roots .iter() .filter_map(|root| { if let Ok(Some(block)) = self.chain.store.get::>(root) { Some(block.body) } else { debug!( self.log, "Peer requested unknown block"; "peer" => format!("{:?}", peer_id), "request_root" => format!("{:}", root), ); None } }) .collect(); debug!( self.log, "BlockBodiesRequest"; "peer" => format!("{:?}", peer_id), "requested" => req.block_roots.len(), "returned" => block_bodies.len(), ); let bytes = block_bodies.as_ssz_bytes(); network.send_rpc_response( peer_id, request_id, RPCResponse::BeaconBlockBodies(BeaconBlockBodiesResponse { block_bodies: bytes, block_roots: None, }), ) } /// Handle a `BeaconBlockBodies` response from the peer. pub fn on_beacon_block_bodies_response( &mut self, peer_id: PeerId, res: DecodedBeaconBlockBodiesResponse, network: &mut NetworkContext, ) { debug!( self.log, "BlockBodiesResponse"; "peer" => format!("{:?}", peer_id), "count" => res.block_bodies.len(), ); if !res.block_bodies.is_empty() { // Import all blocks to queue let last_root = self .import_queue .enqueue_bodies(res.block_bodies, peer_id.clone()); // Attempt to process all received bodies by recursively processing the latest block if let Some(root) = last_root { if let Some(BlockProcessingOutcome::Processed { .. }) = self.attempt_process_partial_block(peer_id, root, network, &"rpc") { // If processing is successful remove from `import_queue` self.import_queue.remove(root); } } } // Clear out old entries self.import_queue.remove_stale(); } /// Process a gossip message declaring a new block. /// /// Attempts to apply to block to the beacon chain. May queue the block for later processing. /// /// Returns a `bool` which, if `true`, indicates we should forward the block to our peers. pub fn on_block_gossip( &mut self, peer_id: PeerId, block: BeaconBlock, network: &mut NetworkContext, ) -> bool { if let Some(outcome) = self.process_block(peer_id.clone(), block.clone(), network, &"gossip") { match outcome { BlockProcessingOutcome::Processed { .. } => SHOULD_FORWARD_GOSSIP_BLOCK, BlockProcessingOutcome::ParentUnknown { parent } => { // Add this block to the queue self.import_queue .enqueue_full_blocks(vec![block.clone()], peer_id.clone()); debug!( self.log, "RequestParentBlock"; "parent_root" => format!("{}", parent), "parent_slot" => block.slot - 1, "peer" => format!("{:?}", peer_id), ); // Request roots between parent and start of finality from peer. let start_slot = self .chain .head() .beacon_state .finalized_checkpoint .epoch .start_slot(T::EthSpec::slots_per_epoch()); self.request_block_roots( peer_id, BeaconBlockRootsRequest { // Request blocks between `latest_finalized_slot` and the `block` start_slot, count: block.slot.as_u64() - start_slot.as_u64(), }, network, ); // Clean the stale entries from the queue. self.import_queue.remove_stale(); SHOULD_FORWARD_GOSSIP_BLOCK } BlockProcessingOutcome::FutureSlot { present_slot, block_slot, } if present_slot + FUTURE_SLOT_TOLERANCE >= block_slot => { self.import_queue .enqueue_full_blocks(vec![block], peer_id.clone()); SHOULD_FORWARD_GOSSIP_BLOCK } // Note: known blocks are forwarded on the gossip network. // // We rely upon the lower layers (libp2p) to stop loops occurring from re-gossiped // blocks. BlockProcessingOutcome::BlockIsAlreadyKnown => SHOULD_FORWARD_GOSSIP_BLOCK, _ => SHOULD_NOT_FORWARD_GOSSIP_BLOCK, } } else { SHOULD_NOT_FORWARD_GOSSIP_BLOCK } } /// Process a gossip message declaring a new attestation. /// /// Not currently implemented. pub fn on_attestation_gossip( &mut self, _peer_id: PeerId, msg: Attestation, _network: &mut NetworkContext, ) { match self.chain.process_attestation(msg) { Ok(outcome) => info!( self.log, "Processed attestation"; "source" => "gossip", "outcome" => format!("{:?}", outcome) ), Err(e) => { warn!(self.log, "InvalidAttestation"; "source" => "gossip", "error" => format!("{:?}", e)) } } } /// Request some `BeaconBlockRoots` from the remote peer. fn request_block_roots( &mut self, peer_id: PeerId, req: BeaconBlockRootsRequest, network: &mut NetworkContext, ) { // Potentially set state to sync. if self.state == SyncState::Idle && req.count > SLOT_IMPORT_TOLERANCE { debug!(self.log, "Entering downloading sync state."); self.state = SyncState::Downloading; } debug!( self.log, "RPCRequest(BeaconBlockRoots)"; "count" => req.count, "peer" => format!("{:?}", peer_id) ); // TODO: handle count > max count. network.send_rpc_request(peer_id.clone(), RPCRequest::BeaconBlockRoots(req)); } /// Request some `BeaconBlockHeaders` from the remote peer. fn request_block_headers( &mut self, peer_id: PeerId, req: BeaconBlockHeadersRequest, network: &mut NetworkContext, ) { debug!( self.log, "RPCRequest(BeaconBlockHeaders)"; "max_headers" => req.max_headers, "peer" => format!("{:?}", peer_id) ); network.send_rpc_request(peer_id.clone(), RPCRequest::BeaconBlockHeaders(req)); } /// Request some `BeaconBlockBodies` from the remote peer. fn request_block_bodies( &mut self, peer_id: PeerId, req: BeaconBlockBodiesRequest, network: &mut NetworkContext, ) { debug!( self.log, "RPCRequest(BeaconBlockBodies)"; "count" => req.block_roots.len(), "peer" => format!("{:?}", peer_id) ); network.send_rpc_request(peer_id.clone(), RPCRequest::BeaconBlockBodies(req)); } /// Returns `true` if `self.chain` has not yet processed this block. pub fn chain_has_seen_block(&self, block_root: &Hash256) -> bool { !self .chain .is_new_block_root(&block_root) .unwrap_or_else(|_| { error!(self.log, "Unable to determine if block is new."); false }) } /// Generates our current state in the form of a HELLO RPC message. pub fn generate_hello(&self) -> HelloMessage { hello_message(&self.chain) } /// Helper function to attempt to process a partial block. /// /// If the block can be completed recursively call `process_block` /// else request missing parts. fn attempt_process_partial_block( &mut self, peer_id: PeerId, block_root: Hash256, network: &mut NetworkContext, source: &str, ) -> Option { match self.import_queue.attempt_complete_block(block_root) { PartialBeaconBlockCompletion::MissingBody => { // Unable to complete the block because the block body is missing. debug!( self.log, "RequestParentBody"; "source" => source, "block_root" => format!("{}", block_root), "peer" => format!("{:?}", peer_id), ); // Request the block body from the peer. self.request_block_bodies( peer_id, BeaconBlockBodiesRequest { block_roots: vec![block_root], }, network, ); None } PartialBeaconBlockCompletion::MissingHeader(slot) => { // Unable to complete the block because the block header is missing. debug!( self.log, "RequestParentHeader"; "source" => source, "block_root" => format!("{}", block_root), "peer" => format!("{:?}", peer_id), ); // Request the block header from the peer. self.request_block_headers( peer_id, BeaconBlockHeadersRequest { start_root: block_root, start_slot: slot, max_headers: 1, skip_slots: 0, }, network, ); None } PartialBeaconBlockCompletion::MissingRoot => { // The `block_root` is not known to the queue. debug!( self.log, "MissingParentRoot"; "source" => source, "block_root" => format!("{}", block_root), "peer" => format!("{:?}", peer_id), ); // Do nothing. None } PartialBeaconBlockCompletion::Complete(block) => { // The block exists in the queue, attempt to process it trace!( self.log, "AttemptProcessParent"; "source" => source, "block_root" => format!("{}", block_root), "parent_slot" => block.slot, "peer" => format!("{:?}", peer_id), ); self.process_block(peer_id.clone(), block, network, source) } } } /// Processes the `block` that was received from `peer_id`. /// /// If the block was submitted to the beacon chain without internal error, `Some(outcome)` is /// returned, otherwise `None` is returned. Note: `Some(_)` does not necessarily indicate that /// the block was successfully processed or valid. /// /// This function performs the following duties: /// /// - Attempting to import the block into the beacon chain. /// - Logging /// - Requesting unavailable blocks (e.g., if parent is unknown). /// - Disconnecting faulty nodes. /// /// This function does not remove processed blocks from the import queue. fn process_block( &mut self, peer_id: PeerId, block: BeaconBlock, network: &mut NetworkContext, source: &str, ) -> Option { let processing_result = self.chain.process_block(block.clone()); if let Ok(outcome) = processing_result { match outcome { BlockProcessingOutcome::Processed { block_root } => { // The block was valid and we processed it successfully. debug!( self.log, "Imported block from network"; "source" => source, "slot" => block.slot, "block_root" => format!("{}", block_root), "peer" => format!("{:?}", peer_id), ); } BlockProcessingOutcome::ParentUnknown { parent } => { // The parent has not been processed trace!( self.log, "ParentBlockUnknown"; "source" => source, "parent_root" => format!("{}", parent), "baby_block_slot" => block.slot, "peer" => format!("{:?}", peer_id), ); // If the parent is in the `import_queue` attempt to complete it then process it. // All other cases leave `parent` in `import_queue` and return original outcome. if let Some(BlockProcessingOutcome::Processed { .. }) = self.attempt_process_partial_block(peer_id, parent, network, source) { // If processing parent is successful, re-process block and remove parent from queue self.import_queue.remove(parent); // Attempt to process `block` again match self.chain.process_block(block) { Ok(outcome) => return Some(outcome), Err(_) => return None, } } } BlockProcessingOutcome::FutureSlot { present_slot, block_slot, } => { if present_slot + FUTURE_SLOT_TOLERANCE >= block_slot { // The block is too far in the future, drop it. warn!( self.log, "FutureBlock"; "source" => source, "msg" => "block for future slot rejected, check your time", "present_slot" => present_slot, "block_slot" => block_slot, "FUTURE_SLOT_TOLERANCE" => FUTURE_SLOT_TOLERANCE, "peer" => format!("{:?}", peer_id), ); network.disconnect(peer_id, GoodbyeReason::Fault); } else { // The block is in the future, but not too far. debug!( self.log, "QueuedFutureBlock"; "source" => source, "msg" => "queuing future block, check your time", "present_slot" => present_slot, "block_slot" => block_slot, "FUTURE_SLOT_TOLERANCE" => FUTURE_SLOT_TOLERANCE, "peer" => format!("{:?}", peer_id), ); } } _ => { debug!( self.log, "InvalidBlock"; "source" => source, "msg" => "peer sent invalid block", "outcome" => format!("{:?}", outcome), "peer" => format!("{:?}", peer_id), ); } } Some(outcome) } else { error!( self.log, "BlockProcessingFailure"; "source" => source, "msg" => "unexpected condition in processing block.", "outcome" => format!("{:?}", processing_result) ); None } } } /// Build a `HelloMessage` representing the state of the given `beacon_chain`. fn hello_message(beacon_chain: &BeaconChain) -> HelloMessage { let spec = &beacon_chain.spec; let state = &beacon_chain.head().beacon_state; HelloMessage { network_id: spec.network_id, //TODO: Correctly define the chain id chain_id: spec.network_id as u64, latest_finalized_root: state.finalized_checkpoint.root, latest_finalized_epoch: state.finalized_checkpoint.epoch, best_root: beacon_chain.head().beacon_block_root, best_slot: state.slot, } }