From d727e55abe26bbb1452341152bd01654a2dae022 Mon Sep 17 00:00:00 2001 From: divma Date: Thu, 19 Nov 2020 23:33:44 +0000 Subject: [PATCH] Move some rpc processing to the beacon_processor (#1936) ## Issue Addressed `BlocksByRange` requests were the main culprit of a series of timeouts on peers' requests in general because they produce a build-up in the router's processor. Those were moved to the blocking executor, but a task is being spawned for each; this is also not ideal since the amount of resources we give to those is not controlled. ## Proposed Changes - Move `BlocksByRange` and `BlocksByRoots` to the `beacon_processor`. The processor crafts the responses and sends them. - Also move the processing of `StatusMessage`s from other peers. This is a fast operation but it can also build up and won't scale if we keep it in the router (processing one at a time). These don't need to send an answer, so there is no harm in processing them "later" if that were to happen. Sending responses to status requests is still in the router, so we answer as soon as we see them. - Some "extras" that are basically cleanup: - Split the `Worker` logic into sync methods (chain processing and rpc blocks), gossip methods (the majority of methods) and rpc methods (the new ones). - Move the `status_message` function previously provided by the router's processor to a more central place since it is used by the router, sync, network_context and beacon_processor. - Some spelling fixes. ## Additional Info What's left to decide/test more thoroughly is the length of the queues and the priority rules. @paulhauner suggested at some point to put status above attestations, and @AgeManning had described the importance of "protecting gossipsub", so my solution is leaving status requests in the router and RPC methods below attestations. Slashings and Exits are at the end. 
--- .../src/beacon_processor/chain_segment.rs | 218 ---------- .../network/src/beacon_processor/mod.rs | 140 ++++++- .../{worker.rs => worker/gossip_methods.rs} | 224 ++++------ .../src/beacon_processor/worker/mod.rs | 43 ++ .../beacon_processor/worker/rpc_methods.rs | 251 +++++++++++ .../beacon_processor/worker/sync_methods.rs | 229 ++++++++++ beacon_node/network/src/lib.rs | 1 + beacon_node/network/src/router/mod.rs | 2 +- beacon_node/network/src/router/processor.rs | 391 +++--------------- beacon_node/network/src/status.rs | 29 ++ beacon_node/network/src/sync/manager.rs | 4 +- .../network/src/sync/network_context.rs | 4 +- .../network/src/sync/range_sync/range.rs | 7 +- 13 files changed, 824 insertions(+), 719 deletions(-) delete mode 100644 beacon_node/network/src/beacon_processor/chain_segment.rs rename beacon_node/network/src/beacon_processor/{worker.rs => worker/gossip_methods.rs} (85%) create mode 100644 beacon_node/network/src/beacon_processor/worker/mod.rs create mode 100644 beacon_node/network/src/beacon_processor/worker/rpc_methods.rs create mode 100644 beacon_node/network/src/beacon_processor/worker/sync_methods.rs create mode 100644 beacon_node/network/src/status.rs diff --git a/beacon_node/network/src/beacon_processor/chain_segment.rs b/beacon_node/network/src/beacon_processor/chain_segment.rs deleted file mode 100644 index 47e14f5e2a..0000000000 --- a/beacon_node/network/src/beacon_processor/chain_segment.rs +++ /dev/null @@ -1,218 +0,0 @@ -use crate::metrics; -use crate::router::processor::FUTURE_SLOT_TOLERANCE; -use crate::sync::manager::SyncMessage; -use crate::sync::{BatchProcessResult, ChainId}; -use beacon_chain::{BeaconChain, BeaconChainTypes, BlockError, ChainSegmentResult}; -use eth2_libp2p::PeerId; -use slog::{debug, error, trace, warn}; -use std::sync::Arc; -use tokio::sync::mpsc; -use types::{Epoch, EthSpec, Hash256, SignedBeaconBlock}; - -/// Id associated to a block processing request, either a batch or a single block. 
-#[derive(Clone, Debug, PartialEq)] -pub enum ProcessId { - /// Processing Id of a range syncing batch. - RangeBatchId(ChainId, Epoch), - /// Processing Id of the parent lookup of a block. - ParentLookup(PeerId, Hash256), -} - -pub fn handle_chain_segment( - chain: Arc>, - process_id: ProcessId, - downloaded_blocks: Vec>, - sync_send: mpsc::UnboundedSender>, - log: slog::Logger, -) { - match process_id { - // this a request from the range sync - ProcessId::RangeBatchId(chain_id, epoch) => { - let start_slot = downloaded_blocks.first().map(|b| b.message.slot.as_u64()); - let end_slot = downloaded_blocks.last().map(|b| b.message.slot.as_u64()); - let sent_blocks = downloaded_blocks.len(); - - let result = match process_blocks(chain, downloaded_blocks.iter(), &log) { - (_, Ok(_)) => { - debug!(log, "Batch processed"; "batch_epoch" => epoch, "first_block_slot" => start_slot, "chain" => chain_id, - "last_block_slot" => end_slot, "processed_blocks" => sent_blocks, "service"=> "sync"); - BatchProcessResult::Success(sent_blocks > 0) - } - (imported_blocks, Err(e)) => { - debug!(log, "Batch processing failed"; "batch_epoch" => epoch, "first_block_slot" => start_slot, "chain" => chain_id, - "last_block_slot" => end_slot, "error" => e, "imported_blocks" => imported_blocks, "service" => "sync"); - BatchProcessResult::Failed(imported_blocks > 0) - } - }; - - let msg = SyncMessage::BatchProcessed { - chain_id, - epoch, - result, - }; - sync_send.send(msg).unwrap_or_else(|_| { - debug!( - log, - "Block processor could not inform range sync result. Likely shutting down." 
- ); - }); - } - // this is a parent lookup request from the sync manager - ProcessId::ParentLookup(peer_id, chain_head) => { - debug!( - log, "Processing parent lookup"; - "last_peer_id" => format!("{}", peer_id), - "blocks" => downloaded_blocks.len() - ); - // parent blocks are ordered from highest slot to lowest, so we need to process in - // reverse - match process_blocks(chain, downloaded_blocks.iter().rev(), &log) { - (_, Err(e)) => { - debug!(log, "Parent lookup failed"; "last_peer_id" => %peer_id, "error" => e); - sync_send - .send(SyncMessage::ParentLookupFailed{peer_id, chain_head}) - .unwrap_or_else(|_| { - // on failure, inform to downvote the peer - debug!( - log, - "Block processor could not inform parent lookup result. Likely shutting down." - ); - }); - } - (_, Ok(_)) => { - debug!(log, "Parent lookup processed successfully"); - } - } - } - } -} - -/// Helper function to process blocks batches which only consumes the chain and blocks to process. -fn process_blocks< - 'a, - T: BeaconChainTypes, - I: Iterator>, ->( - chain: Arc>, - downloaded_blocks: I, - log: &slog::Logger, -) -> (usize, Result<(), String>) { - let blocks = downloaded_blocks.cloned().collect::>(); - match chain.process_chain_segment(blocks) { - ChainSegmentResult::Successful { imported_blocks } => { - metrics::inc_counter(&metrics::BEACON_PROCESSOR_CHAIN_SEGMENT_SUCCESS_TOTAL); - if imported_blocks > 0 { - // Batch completed successfully with at least one block, run fork choice. - run_fork_choice(chain, log); - } - - (imported_blocks, Ok(())) - } - ChainSegmentResult::Failed { - imported_blocks, - error, - } => { - metrics::inc_counter(&metrics::BEACON_PROCESSOR_CHAIN_SEGMENT_FAILED_TOTAL); - let r = handle_failed_chain_segment(error, log); - if imported_blocks > 0 { - run_fork_choice(chain, log); - } - (imported_blocks, r) - } - } -} - -/// Runs fork-choice on a given chain. This is used during block processing after one successful -/// block import. 
-fn run_fork_choice(chain: Arc>, log: &slog::Logger) { - match chain.fork_choice() { - Ok(()) => trace!( - log, - "Fork choice success"; - "location" => "batch processing" - ), - Err(e) => error!( - log, - "Fork choice failed"; - "error" => ?e, - "location" => "batch import error" - ), - } -} - -/// Helper function to handle a `BlockError` from `process_chain_segment` -fn handle_failed_chain_segment( - error: BlockError, - log: &slog::Logger, -) -> Result<(), String> { - match error { - BlockError::ParentUnknown(block) => { - // blocks should be sequential and all parents should exist - - Err(format!( - "Block has an unknown parent: {}", - block.parent_root() - )) - } - BlockError::BlockIsAlreadyKnown => { - // This can happen for many reasons. Head sync's can download multiples and parent - // lookups can download blocks before range sync - Ok(()) - } - BlockError::FutureSlot { - present_slot, - block_slot, - } => { - if present_slot + FUTURE_SLOT_TOLERANCE >= block_slot { - // The block is too far in the future, drop it. - warn!( - log, "Block is ahead of our slot clock"; - "msg" => "block for future slot rejected, check your time", - "present_slot" => present_slot, - "block_slot" => block_slot, - "FUTURE_SLOT_TOLERANCE" => FUTURE_SLOT_TOLERANCE, - ); - } else { - // The block is in the future, but not too far. - debug!( - log, "Block is slightly ahead of our slot clock, ignoring."; - "present_slot" => present_slot, - "block_slot" => block_slot, - "FUTURE_SLOT_TOLERANCE" => FUTURE_SLOT_TOLERANCE, - ); - } - - Err(format!( - "Block with slot {} is higher than the current slot {}", - block_slot, present_slot - )) - } - BlockError::WouldRevertFinalizedSlot { .. 
} => { - debug!( log, "Finalized or earlier block processed";); - - Ok(()) - } - BlockError::GenesisBlock => { - debug!(log, "Genesis block was processed"); - Ok(()) - } - BlockError::BeaconChainError(e) => { - warn!( - log, "BlockProcessingFailure"; - "msg" => "unexpected condition in processing block.", - "outcome" => ?e, - ); - - Err(format!("Internal error whilst processing block: {:?}", e)) - } - other => { - debug!( - log, "Invalid block received"; - "msg" => "peer sent invalid block", - "outcome" => %other, - ); - - Err(format!("Peer sent invalid block. Reason: {:?}", other)) - } - } -} diff --git a/beacon_node/network/src/beacon_processor/mod.rs b/beacon_node/network/src/beacon_processor/mod.rs index b23b40e542..d616ac0ad5 100644 --- a/beacon_node/network/src/beacon_processor/mod.rs +++ b/beacon_node/network/src/beacon_processor/mod.rs @@ -1,4 +1,4 @@ -//! Provides the `BeaconProcessor`, a mutli-threaded processor for messages received on the network +//! Provides the `BeaconProcessor`, a multi-threaded processor for messages received on the network //! that need to be processed by the `BeaconChain`. //! //! Uses `tokio` tasks (instead of raw threads) to provide the following tasks: @@ -26,7 +26,7 @@ //! //! Then, there is a maximum of `n` "worker" blocking threads, where `n` is the CPU count. //! -//! Whenever the manager receives a new parcel of work, it either: +//! Whenever the manager receives a new parcel of work, it is either: //! //! - Provided to a newly-spawned worker tasks (if we are not already at `n` workers). //! - Added to a queue. 
@@ -37,7 +37,10 @@ use crate::{metrics, service::NetworkMessage, sync::SyncMessage}; use beacon_chain::{BeaconChain, BeaconChainTypes, BlockError}; -use eth2_libp2p::{MessageId, NetworkGlobals, PeerId}; +use eth2_libp2p::{ + rpc::{BlocksByRangeRequest, BlocksByRootRequest, StatusMessage}, + MessageId, NetworkGlobals, PeerId, PeerRequestId, +}; use slog::{crit, debug, error, trace, warn, Logger}; use std::collections::VecDeque; use std::sync::{Arc, Weak}; @@ -48,12 +51,12 @@ use types::{ Attestation, AttesterSlashing, EthSpec, Hash256, ProposerSlashing, SignedAggregateAndProof, SignedBeaconBlock, SignedVoluntaryExit, SubnetId, }; + use worker::Worker; -mod chain_segment; mod worker; -pub use chain_segment::ProcessId; +pub use worker::ProcessId; /// The maximum size of the channel for work events to the `BeaconProcessor`. /// @@ -98,10 +101,22 @@ const MAX_RPC_BLOCK_QUEUE_LEN: usize = 1_024; /// be stored before we start dropping them. const MAX_CHAIN_SEGMENT_QUEUE_LEN: usize = 64; +/// The maximum number of queued `StatusMessage` objects received from the network RPC that will be +/// stored before we start dropping them. +const MAX_STATUS_QUEUE_LEN: usize = 1_024; + +/// The maximum number of queued `BlocksByRangeRequest` objects received from the network RPC that +/// will be stored before we start dropping them. +const MAX_BLOCKS_BY_RANGE_QUEUE_LEN: usize = 1_024; + +/// The maximum number of queued `BlocksByRootRequest` objects received from the network RPC that +/// will be stored before we start dropping them. +const MAX_BLOCKS_BY_ROOTS_QUEUE_LEN: usize = 1_024; + /// The name of the manager tokio task. -const MANAGER_TASK_NAME: &str = "beacon_gossip_processor_manager"; +const MANAGER_TASK_NAME: &str = "beacon_processor_manager"; /// The name of the worker tokio tasks. 
-const WORKER_TASK_NAME: &str = "beacon_gossip_processor_worker"; +const WORKER_TASK_NAME: &str = "beacon_processor_worker"; /// The minimum interval between log messages indicating that a queue is full. const LOG_DEBOUNCE_INTERVAL: Duration = Duration::from_secs(30); @@ -132,7 +147,7 @@ impl FifoQueue { if self.queue.len() == self.max_length { error!( log, - "Block queue full"; + "Work queue is full"; "msg" => "the system has insufficient resources for load", "queue_len" => self.max_length, "queue" => item_desc, @@ -320,6 +335,51 @@ impl WorkEvent { work: Work::ChainSegment { process_id, blocks }, } } + + /// Create a new work event to process `StatusMessage`s from the RPC network. + pub fn status_message(peer_id: PeerId, message: StatusMessage) -> Self { + Self { + drop_during_sync: false, + work: Work::Status { peer_id, message }, + } + } + + /// Create a new work event to process `BlocksByRangeRequest`s from the RPC network. + pub fn blocks_by_range_request( + peer_id: PeerId, + request_id: PeerRequestId, + request: BlocksByRangeRequest, + ) -> Self { + Self { + drop_during_sync: false, + work: Work::BlocksByRangeRequest { + peer_id, + request_id, + request, + }, + } + } + + /// Create a new work event to process `BlocksByRootRequest`s from the RPC network. + pub fn blocks_by_roots_request( + peer_id: PeerId, + request_id: PeerRequestId, + request: BlocksByRootRequest, + ) -> Self { + Self { + drop_during_sync: false, + work: Work::BlocksByRootsRequest { + peer_id, + request_id, + request, + }, + } + } + + /// Get a `str` representation of the type of work this `WorkEvent` contains. + pub fn work_type(&self) -> &'static str { + self.work.str_id() + } } /// A consensus message (or multiple) from the network that requires processing. 
@@ -365,6 +425,20 @@ pub enum Work { process_id: ProcessId, blocks: Vec>, }, + Status { + peer_id: PeerId, + message: StatusMessage, + }, + BlocksByRangeRequest { + peer_id: PeerId, + request_id: PeerRequestId, + request: BlocksByRangeRequest, + }, + BlocksByRootsRequest { + peer_id: PeerId, + request_id: PeerRequestId, + request: BlocksByRootRequest, + }, } impl Work { @@ -379,6 +453,9 @@ impl Work { Work::GossipAttesterSlashing { .. } => "gossip_attester_slashing", Work::RpcBlock { .. } => "rpc_block", Work::ChainSegment { .. } => "chain_segment", + Work::Status { .. } => "status_processing", + Work::BlocksByRangeRequest { .. } => "blocks_by_range_request", + Work::BlocksByRootsRequest { .. } => "blocks_by_roots_request", } } } @@ -453,6 +530,10 @@ impl BeaconProcessor { let mut chain_segment_queue = FifoQueue::new(MAX_CHAIN_SEGMENT_QUEUE_LEN); let mut gossip_block_queue = FifoQueue::new(MAX_GOSSIP_BLOCK_QUEUE_LEN); + let mut status_queue = FifoQueue::new(MAX_STATUS_QUEUE_LEN); + let mut bbrange_queue = FifoQueue::new(MAX_BLOCKS_BY_RANGE_QUEUE_LEN); + let mut bbroots_queue = FifoQueue::new(MAX_BLOCKS_BY_ROOTS_QUEUE_LEN); + let executor = self.executor.clone(); // The manager future will run on the core executor and delegate tasks to worker @@ -534,14 +615,22 @@ impl BeaconProcessor { // required to verify some attestations. } else if let Some(item) = gossip_block_queue.pop() { self.spawn_worker(idle_tx.clone(), item); - // Check the aggregates, *then* the unaggregates - // since we assume that aggregates are more valuable to local validators - // and effectively give us more information with less signature - // verification time. + // Check the aggregates, *then* the unaggregates since we assume that + // aggregates are more valuable to local validators and effectively give us + // more information with less signature verification time. 
} else if let Some(item) = aggregate_queue.pop() { self.spawn_worker(idle_tx.clone(), item); } else if let Some(item) = attestation_queue.pop() { self.spawn_worker(idle_tx.clone(), item); + // Check RPC methods next. Status messages are needed for sync so + // prioritize them over syncing requests from other peers (BlocksByRange + // and BlocksByRoot) + } else if let Some(item) = status_queue.pop() { + self.spawn_worker(idle_tx.clone(), item); + } else if let Some(item) = bbrange_queue.pop() { + self.spawn_worker(idle_tx.clone(), item); + } else if let Some(item) = bbroots_queue.pop() { + self.spawn_worker(idle_tx.clone(), item); // Check slashings after all other consensus messages so we prioritize // following head. // @@ -606,6 +695,13 @@ impl BeaconProcessor { Work::ChainSegment { .. } => { chain_segment_queue.push(work, work_id, &self.log) } + Work::Status { .. } => status_queue.push(work, work_id, &self.log), + Work::BlocksByRangeRequest { .. } => { + bbrange_queue.push(work, work_id, &self.log) + } + Work::BlocksByRootsRequest { .. } => { + bbroots_queue.push(work, work_id, &self.log) + } } } } @@ -804,6 +900,26 @@ impl BeaconProcessor { Work::ChainSegment { process_id, blocks } => { worker.process_chain_segment(process_id, blocks) } + /* + * Processing of Status Messages. + */ + Work::Status { peer_id, message } => worker.process_status(peer_id, message), + /* + * Processing of range syncing requests from other peers. + */ + Work::BlocksByRangeRequest { + peer_id, + request_id, + request, + } => worker.handle_blocks_by_range_request(peer_id, request_id, request), + /* + * Processing of blocks by roots requests from other peers. 
+ */ + Work::BlocksByRootsRequest { + peer_id, + request_id, + request, + } => worker.handle_blocks_by_root_request(peer_id, request_id, request), }; trace!( diff --git a/beacon_node/network/src/beacon_processor/worker.rs b/beacon_node/network/src/beacon_processor/worker/gossip_methods.rs similarity index 85% rename from beacon_node/network/src/beacon_processor/worker.rs rename to beacon_node/network/src/beacon_processor/worker/gossip_methods.rs index 9d54975142..d16722782d 100644 --- a/beacon_node/network/src/beacon_processor/worker.rs +++ b/beacon_node/network/src/beacon_processor/worker/gossip_methods.rs @@ -1,31 +1,47 @@ -use super::{ - chain_segment::{handle_chain_segment, ProcessId}, - BlockResultSender, -}; use crate::{metrics, service::NetworkMessage, sync::SyncMessage}; use beacon_chain::{ attestation_verification::Error as AttnError, observed_operations::ObservationOutcome, - BeaconChain, BeaconChainError, BeaconChainTypes, BlockError, ForkChoiceError, + BeaconChainError, BeaconChainTypes, BlockError, ForkChoiceError, }; use eth2_libp2p::{MessageAcceptance, MessageId, PeerAction, PeerId}; -use slog::{crit, debug, error, info, trace, warn, Logger}; +use slog::{debug, error, info, trace, warn}; use ssz::Encode; -use std::sync::Arc; -use tokio::sync::mpsc; use types::{ Attestation, AttesterSlashing, Hash256, ProposerSlashing, SignedAggregateAndProof, SignedBeaconBlock, SignedVoluntaryExit, SubnetId, }; -/// Contains the context necessary to import blocks, attestations, etc to the beacon chain. -pub struct Worker { - pub chain: Arc>, - pub network_tx: mpsc::UnboundedSender>, - pub sync_tx: mpsc::UnboundedSender>, - pub log: Logger, -} +use super::Worker; impl Worker { + /* Auxiliary functions */ + + /// Penalizes a peer for misbehaviour. 
+ fn penalize_peer(&self, peer_id: PeerId, action: PeerAction) { + self.send_network_message(NetworkMessage::ReportPeer { peer_id, action }) + } + + /// Send a message on `message_tx` that the `message_id` sent by `peer_id` should be propagated on + /// the gossip network. + /// + /// Creates a log if there is an internal error. + /// Propagates the result of the validation for the given message to the network. If the result + /// is valid the message gets forwarded to other peers. + fn propagate_validation_result( + &self, + message_id: MessageId, + propagation_source: PeerId, + validation_result: MessageAcceptance, + ) { + self.send_network_message(NetworkMessage::ValidationResult { + propagation_source, + message_id, + validation_result, + }) + } + + /* Processing functions */ + /// Process the unaggregated attestation received from the gossip network and: /// /// - If it passes gossip propagation criteria, tell the network thread to forward it. @@ -76,17 +92,17 @@ impl Worker { debug!( self.log, "Attestation invalid for fork choice"; - "reason" => format!("{:?}", e), - "peer" => peer_id.to_string(), - "beacon_block_root" => format!("{:?}", beacon_block_root) + "reason" => ?e, + "peer" => %peer_id, + "beacon_block_root" => ?beacon_block_root ) } e => error!( self.log, "Error applying attestation to fork choice"; - "reason" => format!("{:?}", e), - "peer" => peer_id.to_string(), - "beacon_block_root" => format!("{:?}", beacon_block_root) + "reason" => ?e, + "peer" => %peer_id, + "beacon_block_root" => ?beacon_block_root ), } } @@ -95,9 +111,9 @@ impl Worker { debug!( self.log, "Attestation invalid for agg pool"; - "reason" => format!("{:?}", e), - "peer" => peer_id.to_string(), - "beacon_block_root" => format!("{:?}", beacon_block_root) + "reason" => ?e, + "peer" => %peer_id, + "beacon_block_root" => ?beacon_block_root ) } @@ -149,17 +165,17 @@ impl Worker { debug!( self.log, "Aggregate invalid for fork choice"; - "reason" => format!("{:?}", e), - "peer" => 
peer_id.to_string(), - "beacon_block_root" => format!("{:?}", beacon_block_root) + "reason" => ?e, + "peer" => %peer_id, + "beacon_block_root" => ?beacon_block_root ) } e => error!( self.log, "Error applying aggregate to fork choice"; - "reason" => format!("{:?}", e), - "peer" => peer_id.to_string(), - "beacon_block_root" => format!("{:?}", beacon_block_root) + "reason" => ?e, + "peer" => %peer_id, + "beacon_block_root" => ?beacon_block_root ), } } @@ -168,9 +184,9 @@ impl Worker { debug!( self.log, "Attestation invalid for op pool"; - "reason" => format!("{:?}", e), - "peer" => peer_id.to_string(), - "beacon_block_root" => format!("{:?}", beacon_block_root) + "reason" => ?e, + "peer" => %peer_id, + "beacon_block_root" => ?beacon_block_root ) } @@ -255,7 +271,7 @@ impl Worker { trace!( self.log, "Gossipsub block processed"; - "peer_id" => peer_id.to_string() + "peer_id" => %peer_id ); // The `MessageHandler` would be the place to put this, however it doesn't seem @@ -270,7 +286,7 @@ impl Worker { Err(e) => error!( self.log, "Fork choice failed"; - "error" => format!("{:?}", e), + "error" => ?e, "location" => "block gossip" ), } @@ -281,7 +297,7 @@ impl Worker { error!( self.log, "Block with unknown parent attempted to be processed"; - "peer_id" => peer_id.to_string() + "peer_id" => %peer_id ); self.send_sync_message(SyncMessage::UnknownBlock(peer_id, block)); } @@ -323,7 +339,7 @@ impl Worker { self.log, "Dropping exit for already exiting validator"; "validator_index" => validator_index, - "peer" => peer_id.to_string() + "peer" => %peer_id ); return; } @@ -332,8 +348,8 @@ impl Worker { self.log, "Dropping invalid exit"; "validator_index" => validator_index, - "peer" => peer_id.to_string(), - "error" => format!("{:?}", e) + "peer" => %peer_id, + "error" => ?e ); // These errors occur due to a fault in the beacon chain. It is not necessarily // the fault on the peer. 
@@ -377,7 +393,7 @@ impl Worker { "Dropping proposer slashing"; "reason" => "Already seen a proposer slashing for that validator", "validator_index" => validator_index, - "peer" => peer_id.to_string() + "peer" => %peer_id ); self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Ignore); return; @@ -389,8 +405,8 @@ impl Worker { self.log, "Dropping invalid proposer slashing"; "validator_index" => validator_index, - "peer" => peer_id.to_string(), - "error" => format!("{:?}", e) + "peer" => %peer_id, + "error" => ?e ); self.propagate_validation_result( message_id, @@ -430,7 +446,7 @@ impl Worker { self.log, "Dropping attester slashing"; "reason" => "Slashings already known for all slashed validators", - "peer" => peer_id.to_string() + "peer" => %peer_id ); self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Ignore); return; @@ -439,8 +455,8 @@ impl Worker { debug!( self.log, "Dropping invalid attester slashing"; - "peer" => peer_id.to_string(), - "error" => format!("{:?}", e) + "peer" => %peer_id, + "error" => ?e ); self.propagate_validation_result( message_id, @@ -458,7 +474,7 @@ impl Worker { self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Accept); if let Err(e) = self.chain.import_attester_slashing(slashing) { - debug!(self.log, "Error importing attester slashing"; "error" => format!("{:?}", e)); + debug!(self.log, "Error importing attester slashing"; "error" => ?e); metrics::inc_counter(&metrics::BEACON_PROCESSOR_ATTESTER_SLASHING_ERROR_TOTAL); } else { debug!(self.log, "Successfully imported attester slashing"); @@ -466,82 +482,6 @@ impl Worker { } } - /// Attempt to process a block received from a direct RPC request, returning the processing - /// result on the `result_tx` channel. - /// - /// Raises a log if there are errors publishing the result to the channel. 
- pub fn process_rpc_block( - self, - block: SignedBeaconBlock, - result_tx: BlockResultSender, - ) { - let block_result = self.chain.process_block(block); - - metrics::inc_counter(&metrics::BEACON_PROCESSOR_RPC_BLOCK_IMPORTED_TOTAL); - - if result_tx.send(block_result).is_err() { - crit!(self.log, "Failed return sync block result"); - } - } - - /// Attempt to import the chain segment (`blocks`) to the beacon chain, informing the sync - /// thread if more blocks are needed to process it. - pub fn process_chain_segment( - self, - process_id: ProcessId, - blocks: Vec>, - ) { - handle_chain_segment(self.chain, process_id, blocks, self.sync_tx, self.log) - } - - /// Send a message on `message_tx` that the `message_id` sent by `peer_id` should be propagated on - /// the gossip network. - /// - /// Creates a log if there is an interal error. - /// Propagates the result of the validation fot the given message to the network. If the result - /// is valid the message gets forwarded to other peers. - fn propagate_validation_result( - &self, - message_id: MessageId, - propagation_source: PeerId, - validation_result: MessageAcceptance, - ) { - self.network_tx - .send(NetworkMessage::ValidationResult { - propagation_source, - message_id, - validation_result, - }) - .unwrap_or_else(|_| { - warn!( - self.log, - "Could not send propagation request to the network service" - ) - }); - } - - /// Penalizes a peer for misbehaviour. - fn penalize_peer(&self, peer_id: PeerId, action: PeerAction) { - self.network_tx - .send(NetworkMessage::ReportPeer { peer_id, action }) - .unwrap_or_else(|_| { - warn!( - self.log, - "Could not send peer action to the network service" - ) - }); - } - - /// Send a message to `sync_tx`. - /// - /// Creates a log if there is an interal error. 
- fn send_sync_message(&self, message: SyncMessage) { - self.sync_tx.send(message).unwrap_or_else(|e| { - error!(self.log, "Could not send message to the sync service"; - "error" => %e) - }); - } - /// Handle an error whilst verifying an `Attestation` or `SignedAggregateAndProof` from the /// network. pub fn handle_attestation_verification_failure( @@ -567,9 +507,9 @@ impl Worker { trace!( self.log, "Attestation is not within the last ATTESTATION_PROPAGATION_SLOT_RANGE slots"; - "peer_id" => peer_id.to_string(), - "block" => format!("{}", beacon_block_root), - "type" => format!("{:?}", attestation_type), + "peer_id" => %peer_id, + "block" => %beacon_block_root, + "type" => ?attestation_type, ); self.propagate_validation_result( message_id, @@ -657,9 +597,9 @@ impl Worker { trace!( self.log, "Attestation already known"; - "peer_id" => peer_id.to_string(), - "block" => format!("{}", beacon_block_root), - "type" => format!("{:?}", attestation_type), + "peer_id" => %peer_id, + "block" => %beacon_block_root, + "type" => ?attestation_type, ); self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Ignore); return; @@ -674,9 +614,9 @@ impl Worker { trace!( self.log, "Aggregator already known"; - "peer_id" => peer_id.to_string(), - "block" => format!("{}", beacon_block_root), - "type" => format!("{:?}", attestation_type), + "peer_id" => %peer_id, + "block" => %beacon_block_root, + "type" => ?attestation_type, ); self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Ignore); return; @@ -690,9 +630,9 @@ impl Worker { trace!( self.log, "Prior attestation known"; - "peer_id" => peer_id.to_string(), - "block" => format!("{}", beacon_block_root), - "type" => format!("{:?}", attestation_type), + "peer_id" => %peer_id, + "block" => %beacon_block_root, + "type" => ?attestation_type, ); self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Ignore); return; @@ -721,8 +661,8 @@ impl Worker { debug!( self.log, "Attestation for 
unknown block"; - "peer_id" => peer_id.to_string(), - "block" => format!("{}", beacon_block_root) + "peer_id" => %peer_id, + "block" => %beacon_block_root ); // we don't know the block, get the sync manager to handle the block lookup self.sync_tx @@ -909,8 +849,8 @@ impl Worker { error!( self.log, "Unable to validate aggregate"; - "peer_id" => peer_id.to_string(), - "error" => format!("{:?}", e), + "peer_id" => %peer_id, + "error" => ?e, ); self.propagate_validation_result( message_id, @@ -925,10 +865,10 @@ impl Worker { debug!( self.log, "Invalid attestation from network"; - "reason" => format!("{:?}", error), - "block" => format!("{}", beacon_block_root), - "peer_id" => peer_id.to_string(), - "type" => format!("{:?}", attestation_type), + "reason" => ?error, + "block" => %beacon_block_root, + "peer_id" => %peer_id, + "type" => ?attestation_type, ); } } diff --git a/beacon_node/network/src/beacon_processor/worker/mod.rs b/beacon_node/network/src/beacon_processor/worker/mod.rs new file mode 100644 index 0000000000..40a863303a --- /dev/null +++ b/beacon_node/network/src/beacon_processor/worker/mod.rs @@ -0,0 +1,43 @@ +use crate::{service::NetworkMessage, sync::SyncMessage}; +use beacon_chain::{BeaconChain, BeaconChainTypes}; +use slog::{error, Logger}; +use std::sync::Arc; +use tokio::sync::mpsc; + +mod gossip_methods; +mod rpc_methods; +mod sync_methods; + +pub use sync_methods::ProcessId; + +pub(crate) const FUTURE_SLOT_TOLERANCE: u64 = 1; + +/// Contains the context necessary to import blocks, attestations, etc to the beacon chain. +pub struct Worker { + pub chain: Arc>, + pub network_tx: mpsc::UnboundedSender>, + pub sync_tx: mpsc::UnboundedSender>, + pub log: Logger, +} + +impl Worker { + /// Send a message to `sync_tx`. + /// + /// Creates a log if there is an internal error. 
+ fn send_sync_message(&self, message: SyncMessage) { + self.sync_tx.send(message).unwrap_or_else(|e| { + error!(self.log, "Could not send message to the sync service"; + "error" => %e) + }); + } + + /// Send a message to `network_tx`. + /// + /// Creates a log if there is an internal error. + fn send_network_message(&self, message: NetworkMessage) { + self.network_tx.send(message).unwrap_or_else(|e| { + error!(self.log, "Could not send message to the network service"; + "error" => %e) + }); + } +} diff --git a/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs b/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs new file mode 100644 index 0000000000..42796ee2ad --- /dev/null +++ b/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs @@ -0,0 +1,251 @@ +use crate::beacon_processor::worker::FUTURE_SLOT_TOLERANCE; +use crate::service::NetworkMessage; +use crate::status::ToStatusMessage; +use crate::sync::SyncMessage; +use beacon_chain::{BeaconChainError, BeaconChainTypes}; +use eth2_libp2p::rpc::StatusMessage; +use eth2_libp2p::rpc::*; +use eth2_libp2p::{PeerId, PeerRequestId, Response, SyncInfo}; +use itertools::process_results; +use slog::{debug, error, warn}; +use slot_clock::SlotClock; +use types::{Epoch, EthSpec, Hash256, Slot}; + +use super::Worker; + +impl Worker { + /* Auxiliary functions */ + + /// Disconnects and ban's a peer, sending a Goodbye request with the associated reason. + pub fn goodbye_peer(&self, peer_id: PeerId, reason: GoodbyeReason) { + self.send_network_message(NetworkMessage::GoodbyePeer { peer_id, reason }); + } + + pub fn send_response( + &self, + peer_id: PeerId, + response: Response, + id: PeerRequestId, + ) { + self.send_network_message(NetworkMessage::SendResponse { + peer_id, + id, + response, + }) + } + + /* Processing functions */ + + /// Process a `Status` message to determine if a peer is relevant to us. If the peer is + /// irrelevant the reason is returned. 
+ fn check_peer_relevance( + &self, + remote: &StatusMessage, + ) -> Result, BeaconChainError> { + let local = self.chain.status_message()?; + let start_slot = |epoch: Epoch| epoch.start_slot(T::EthSpec::slots_per_epoch()); + + let irrelevant_reason = if local.fork_digest != remote.fork_digest { + // The node is on a different network/fork + Some(format!( + "Incompatible forks Ours:{} Theirs:{}", + hex::encode(local.fork_digest), + hex::encode(remote.fork_digest) + )) + } else if remote.head_slot + > self + .chain + .slot() + .unwrap_or_else(|_| self.chain.slot_clock.genesis_slot()) + + FUTURE_SLOT_TOLERANCE + { + // The remote's head is on a slot that is significantly ahead of what we consider the + // current slot. This could be because they are using a different genesis time, or that + // their or our system's clock is incorrect. + Some("Different system clocks or genesis time".to_string()) + } else if remote.finalized_epoch <= local.finalized_epoch + && remote.finalized_root != Hash256::zero() + && local.finalized_root != Hash256::zero() + && self + .chain + .root_at_slot(start_slot(remote.finalized_epoch)) + .map(|root_opt| root_opt != Some(remote.finalized_root))? + { + // The remote's finalized epoch is less than or equal to ours, but the block root is + // different to the one in our chain. Therefore, the node is on a different chain and we + // should not communicate with them. 
+ Some("Different finalized chain".to_string()) + } else { + None + }; + + Ok(irrelevant_reason) + } + + pub fn process_status(&self, peer_id: PeerId, status: StatusMessage) { + match self.check_peer_relevance(&status) { + Ok(Some(irrelevant_reason)) => { + debug!(self.log, "Handshake Failure"; "peer" => %peer_id, "reason" => irrelevant_reason); + self.goodbye_peer(peer_id, GoodbyeReason::IrrelevantNetwork); + } + Ok(None) => { + let info = SyncInfo { + head_slot: status.head_slot, + head_root: status.head_root, + finalized_epoch: status.finalized_epoch, + finalized_root: status.finalized_root, + }; + self.send_sync_message(SyncMessage::AddPeer(peer_id, info)); + } + Err(e) => error!(self.log, "Could not process status message"; "error" => ?e), + } + } + + /// Handle a `BlocksByRoot` request from the peer. + pub fn handle_blocks_by_root_request( + &self, + peer_id: PeerId, + request_id: PeerRequestId, + request: BlocksByRootRequest, + ) { + let mut send_block_count = 0; + for root in request.block_roots.iter() { + if let Ok(Some(block)) = self.chain.store.get_block(root) { + self.send_response( + peer_id.clone(), + Response::BlocksByRoot(Some(Box::new(block))), + request_id, + ); + send_block_count += 1; + } else { + debug!(self.log, "Peer requested unknown block"; + "peer" => %peer_id, + "request_root" => ?root); + } + } + debug!(self.log, "Received BlocksByRoot Request"; + "peer" => %peer_id, + "requested" => request.block_roots.len(), + "returned" => send_block_count); + + // send stream termination + self.send_response(peer_id, Response::BlocksByRoot(None), request_id); + } + + /// Handle a `BlocksByRange` request from the peer. 
+ pub fn handle_blocks_by_range_request( + &self, + peer_id: PeerId, + request_id: PeerRequestId, + mut req: BlocksByRangeRequest, + ) { + debug!(self.log, "Received BlocksByRange Request"; + "peer_id" => %peer_id, + "count" => req.count, + "start_slot" => req.start_slot, + "step" => req.step); + + // Should not send more than max request blocks + if req.count > MAX_REQUEST_BLOCKS { + req.count = MAX_REQUEST_BLOCKS; + } + if req.step == 0 { + self.goodbye_peer(peer_id, GoodbyeReason::Fault); + return warn!(self.log, "Peer sent invalid range request"; "error" => "Step sent was 0"); + } + + let forwards_block_root_iter = match self + .chain + .forwards_iter_block_roots(Slot::from(req.start_slot)) + { + Ok(iter) => iter, + Err(e) => return error!(self.log, "Unable to obtain root iter"; "error" => ?e), + }; + + // Pick out the required blocks, ignoring skip-slots and stepping by the step parameter. + // + // NOTE: We don't mind if req.count * req.step overflows as it just ends the iterator early and + // the peer will get less blocks. + // The step parameter is quadratically weighted in the filter, so large values should be + // prevented before reaching this point. 
+ let mut last_block_root = None; + let maybe_block_roots = process_results(forwards_block_root_iter, |iter| { + iter.take_while(|(_, slot)| { + slot.as_u64() < req.start_slot.saturating_add(req.count * req.step) + }) + // map skip slots to None + .map(|(root, _)| { + let result = if Some(root) == last_block_root { + None + } else { + Some(root) + }; + last_block_root = Some(root); + result + }) + .step_by(req.step as usize) + .collect::>>() + }); + + let block_roots = match maybe_block_roots { + Ok(block_roots) => block_roots, + Err(e) => return error!(self.log, "Error during iteration over blocks"; "error" => ?e), + }; + + // remove all skip slots + let block_roots = block_roots + .into_iter() + .filter_map(|root| root) + .collect::>(); + + let mut blocks_sent = 0; + for root in block_roots { + if let Ok(Some(block)) = self.chain.store.get_block(&root) { + // Due to skip slots, blocks could be out of the range, we ensure they are in the + // range before sending + if block.slot() >= req.start_slot + && block.slot() < req.start_slot + req.count * req.step + { + blocks_sent += 1; + self.send_network_message(NetworkMessage::SendResponse { + peer_id: peer_id.clone(), + response: Response::BlocksByRange(Some(Box::new(block))), + id: request_id, + }); + } + } else { + error!(self.log, "Block in the chain is not in the store"; + "request_root" => ?root); + } + } + + let current_slot = self + .chain + .slot() + .unwrap_or_else(|_| self.chain.slot_clock.genesis_slot()); + + if blocks_sent < (req.count as usize) { + debug!(self.log, "BlocksByRange Response sent"; + "peer" => %peer_id, + "msg" => "Failed to return all requested blocks", + "start_slot" => req.start_slot, + "current_slot" => current_slot, + "requested" => req.count, + "returned" => blocks_sent); + } else { + debug!(self.log, "BlocksByRange Response sent"; + "peer" => %peer_id, + "start_slot" => req.start_slot, + "current_slot" => current_slot, + "requested" => req.count, + "returned" => blocks_sent); + } + + 
// send the stream terminator + self.send_network_message(NetworkMessage::SendResponse { + peer_id, + response: Response::BlocksByRange(None), + id: request_id, + }); + } +} diff --git a/beacon_node/network/src/beacon_processor/worker/sync_methods.rs b/beacon_node/network/src/beacon_processor/worker/sync_methods.rs new file mode 100644 index 0000000000..4eb8be09a5 --- /dev/null +++ b/beacon_node/network/src/beacon_processor/worker/sync_methods.rs @@ -0,0 +1,229 @@ +use super::Worker; +use crate::beacon_processor::worker::FUTURE_SLOT_TOLERANCE; +use crate::beacon_processor::BlockResultSender; +use crate::metrics; +use crate::sync::manager::SyncMessage; +use crate::sync::{BatchProcessResult, ChainId}; +use beacon_chain::{BeaconChainTypes, BlockError, ChainSegmentResult}; +use eth2_libp2p::PeerId; +use slog::{crit, debug, error, trace, warn}; +use types::{Epoch, Hash256, SignedBeaconBlock}; + +/// Id associated to a block processing request, either a batch or a single block. +#[derive(Clone, Debug, PartialEq)] +pub enum ProcessId { + /// Processing Id of a range syncing batch. + RangeBatchId(ChainId, Epoch), + /// Processing Id of the parent lookup of a block. + ParentLookup(PeerId, Hash256), +} + +impl Worker { + /// Attempt to process a block received from a direct RPC request, returning the processing + /// result on the `result_tx` channel. + /// + /// Raises a log if there are errors publishing the result to the channel. + pub fn process_rpc_block( + self, + block: SignedBeaconBlock, + result_tx: BlockResultSender, + ) { + let block_result = self.chain.process_block(block); + + metrics::inc_counter(&metrics::BEACON_PROCESSOR_RPC_BLOCK_IMPORTED_TOTAL); + + if result_tx.send(block_result).is_err() { + crit!(self.log, "Failed return sync block result"); + } + } + + /// Attempt to import the chain segment (`blocks`) to the beacon chain, informing the sync + /// thread if more blocks are needed to process it. 
+ pub fn process_chain_segment( + &self, + process_id: ProcessId, + downloaded_blocks: Vec>, + ) { + match process_id { + // this a request from the range sync + ProcessId::RangeBatchId(chain_id, epoch) => { + let start_slot = downloaded_blocks.first().map(|b| b.message.slot.as_u64()); + let end_slot = downloaded_blocks.last().map(|b| b.message.slot.as_u64()); + let sent_blocks = downloaded_blocks.len(); + + let result = match self.process_blocks(downloaded_blocks.iter()) { + (_, Ok(_)) => { + debug!(self.log, "Batch processed"; + "batch_epoch" => epoch, + "first_block_slot" => start_slot, + "chain" => chain_id, + "last_block_slot" => end_slot, + "processed_blocks" => sent_blocks, + "service"=> "sync"); + BatchProcessResult::Success(sent_blocks > 0) + } + (imported_blocks, Err(e)) => { + debug!(self.log, "Batch processing failed"; + "batch_epoch" => epoch, + "first_block_slot" => start_slot, + "chain" => chain_id, + "last_block_slot" => end_slot, + "imported_blocks" => imported_blocks, + "error" => e, + "service" => "sync"); + BatchProcessResult::Failed(imported_blocks > 0) + } + }; + + self.send_sync_message(SyncMessage::BatchProcessed { + chain_id, + epoch, + result, + }); + } + // this is a parent lookup request from the sync manager + ProcessId::ParentLookup(peer_id, chain_head) => { + debug!( + self.log, "Processing parent lookup"; + "last_peer_id" => %peer_id, + "blocks" => downloaded_blocks.len() + ); + // parent blocks are ordered from highest slot to lowest, so we need to process in + // reverse + match self.process_blocks(downloaded_blocks.iter().rev()) { + (_, Err(e)) => { + debug!(self.log, "Parent lookup failed"; "last_peer_id" => %peer_id, "error" => e); + self.send_sync_message(SyncMessage::ParentLookupFailed { + peer_id, + chain_head, + }) + } + (_, Ok(_)) => { + debug!(self.log, "Parent lookup processed successfully"); + } + } + } + } + } + + /// Helper function to process blocks batches which only consumes the chain and blocks to process. 
+ fn process_blocks<'a>( + &self, + downloaded_blocks: impl Iterator>, + ) -> (usize, Result<(), String>) { + let blocks = downloaded_blocks.cloned().collect::>(); + match self.chain.process_chain_segment(blocks) { + ChainSegmentResult::Successful { imported_blocks } => { + metrics::inc_counter(&metrics::BEACON_PROCESSOR_CHAIN_SEGMENT_SUCCESS_TOTAL); + if imported_blocks > 0 { + // Batch completed successfully with at least one block, run fork choice. + self.run_fork_choice(); + } + + (imported_blocks, Ok(())) + } + ChainSegmentResult::Failed { + imported_blocks, + error, + } => { + metrics::inc_counter(&metrics::BEACON_PROCESSOR_CHAIN_SEGMENT_FAILED_TOTAL); + let r = self.handle_failed_chain_segment(error); + if imported_blocks > 0 { + self.run_fork_choice(); + } + (imported_blocks, r) + } + } + } + + /// Runs fork-choice on a given chain. This is used during block processing after one successful + /// block import. + fn run_fork_choice(&self) { + match self.chain.fork_choice() { + Ok(()) => trace!( + self.log, + "Fork choice success"; + "location" => "batch processing" + ), + Err(e) => error!( + self.log, + "Fork choice failed"; + "error" => ?e, + "location" => "batch import error" + ), + } + } + + /// Helper function to handle a `BlockError` from `process_chain_segment` + fn handle_failed_chain_segment(&self, error: BlockError) -> Result<(), String> { + match error { + BlockError::ParentUnknown(block) => { + // blocks should be sequential and all parents should exist + + Err(format!( + "Block has an unknown parent: {}", + block.parent_root() + )) + } + BlockError::BlockIsAlreadyKnown => { + // This can happen for many reasons. Head sync's can download multiples and parent + // lookups can download blocks before range sync + Ok(()) + } + BlockError::FutureSlot { + present_slot, + block_slot, + } => { + if present_slot + FUTURE_SLOT_TOLERANCE >= block_slot { + // The block is too far in the future, drop it. 
+ warn!( + self.log, "Block is ahead of our slot clock"; + "msg" => "block for future slot rejected, check your time", + "present_slot" => present_slot, + "block_slot" => block_slot, + "FUTURE_SLOT_TOLERANCE" => FUTURE_SLOT_TOLERANCE, + ); + } else { + // The block is in the future, but not too far. + debug!( + self.log, "Block is slightly ahead of our slot clock, ignoring."; + "present_slot" => present_slot, + "block_slot" => block_slot, + "FUTURE_SLOT_TOLERANCE" => FUTURE_SLOT_TOLERANCE, + ); + } + + Err(format!( + "Block with slot {} is higher than the current slot {}", + block_slot, present_slot + )) + } + BlockError::WouldRevertFinalizedSlot { .. } => { + debug!(self.log, "Finalized or earlier block processed";); + Ok(()) + } + BlockError::GenesisBlock => { + debug!(self.log, "Genesis block was processed"); + Ok(()) + } + BlockError::BeaconChainError(e) => { + warn!( + self.log, "BlockProcessingFailure"; + "msg" => "unexpected condition in processing block.", + "outcome" => ?e, + ); + + Err(format!("Internal error whilst processing block: {:?}", e)) + } + other => { + debug!( + self.log, "Invalid block received"; + "msg" => "peer sent invalid block", + "outcome" => %other, + ); + + Err(format!("Peer sent invalid block. Reason: {:?}", other)) + } + } + } +} diff --git a/beacon_node/network/src/lib.rs b/beacon_node/network/src/lib.rs index 5ac74e2edd..f6aa777ab9 100644 --- a/beacon_node/network/src/lib.rs +++ b/beacon_node/network/src/lib.rs @@ -11,6 +11,7 @@ mod metrics; mod nat; mod persisted_dht; mod router; +mod status; mod sync; pub use eth2_libp2p::NetworkConfig; diff --git a/beacon_node/network/src/router/mod.rs b/beacon_node/network/src/router/mod.rs index d26d779974..68d5303194 100644 --- a/beacon_node/network/src/router/mod.rs +++ b/beacon_node/network/src/router/mod.rs @@ -5,7 +5,7 @@ //! syncing-related responses to the Sync manager. 
#![allow(clippy::unit_arg)] -pub mod processor; +mod processor; use crate::error; use crate::service::NetworkMessage; diff --git a/beacon_node/network/src/router/processor.rs b/beacon_node/network/src/router/processor.rs index 4bc97d041c..54d66c342b 100644 --- a/beacon_node/network/src/router/processor.rs +++ b/beacon_node/network/src/router/processor.rs @@ -6,23 +6,17 @@ use crate::sync::SyncMessage; use beacon_chain::{BeaconChain, BeaconChainError, BeaconChainTypes}; use eth2_libp2p::rpc::*; use eth2_libp2p::{ - MessageId, NetworkGlobals, PeerAction, PeerId, PeerRequestId, Request, Response, SyncInfo, + MessageId, NetworkGlobals, PeerAction, PeerId, PeerRequestId, Request, Response, }; -use itertools::process_results; use slog::{debug, error, o, trace, warn}; -use slot_clock::SlotClock; use std::cmp; use std::sync::Arc; use tokio::sync::mpsc; use types::{ - Attestation, AttesterSlashing, ChainSpec, Epoch, EthSpec, Hash256, ProposerSlashing, - SignedAggregateAndProof, SignedBeaconBlock, SignedVoluntaryExit, Slot, SubnetId, + Attestation, AttesterSlashing, ChainSpec, EthSpec, ProposerSlashing, SignedAggregateAndProof, + SignedBeaconBlock, SignedVoluntaryExit, SubnetId, }; -/// If a block is more than `FUTURE_SLOT_TOLERANCE` slots ahead of our slot clock, we drop it. -/// Otherwise we queue it. -pub(crate) const FUTURE_SLOT_TOLERANCE: u64 = 1; - /// Processes validated messages from the network. It relays necessary data to the syncing thread /// and processes blocks from the pubsub network. pub struct Processor { @@ -34,8 +28,6 @@ pub struct Processor { network: HandlerNetworkContext, /// A multi-threaded, non-blocking processor for applying messages to the beacon chain. beacon_processor_send: mpsc::Sender>, - /// The current task executor. - executor: task_executor::TaskExecutor, /// The `RPCHandler` logger. 
log: slog::Logger, } @@ -68,7 +60,7 @@ impl Processor { network_tx: network_send.clone(), sync_tx: sync_send.clone(), network_globals, - executor: executor.clone(), + executor, max_workers: cmp::max(1, num_cpus::get()), current_workers: 0, log: log.clone(), @@ -80,7 +72,6 @@ impl Processor { sync_send, network: HandlerNetworkContext::new(network_send, log.clone()), beacon_processor_send, - executor, log: log.new(o!("service" => "router")), } } @@ -144,127 +135,25 @@ impl Processor { ); } - if let Err(e) = self.process_status(peer_id, status) { - error!(self.log, "Could not process status message"; "error" => format!("{:?}", e)); - } + self.send_beacon_processor_work(BeaconWorkEvent::status_message(peer_id, status)) } /// Process a `Status` response from a peer. pub fn on_status_response(&mut self, peer_id: PeerId, status: StatusMessage) { debug!(self.log, "Received Status Response"; "peer_id" => %peer_id, &status); - - // Process the status message, without sending back another status. - if let Err(e) = self.process_status(peer_id, status) { - error!(self.log, "Could not process status message"; "error" => format!("{:?}", e)); - } - } - - /// Process a `Status` message to determine if a peer is relevant to us. 
Irrelevant peers are - /// disconnected; relevant peers are sent to the SyncManager - fn process_status( - &mut self, - peer_id: PeerId, - remote: StatusMessage, - ) -> Result<(), BeaconChainError> { - let local = status_message(&self.chain)?; - let start_slot = |epoch: Epoch| epoch.start_slot(T::EthSpec::slots_per_epoch()); - - let irrelevant_reason = if local.fork_digest != remote.fork_digest { - // The node is on a different network/fork - Some(format!( - "Incompatible forks Ours:{} Theirs:{}", - hex::encode(local.fork_digest), - hex::encode(remote.fork_digest) - )) - } else if remote.head_slot - > self - .chain - .slot() - .unwrap_or_else(|_| self.chain.slot_clock.genesis_slot()) - + FUTURE_SLOT_TOLERANCE - { - // The remote's head is on a slot that is significantly ahead of what we consider the - // current slot. This could be because they are using a different genesis time, or that - // their or our system's clock is incorrect. - Some("Different system clocks or genesis time".to_string()) - } else if remote.finalized_epoch <= local.finalized_epoch - && remote.finalized_root != Hash256::zero() - && local.finalized_root != Hash256::zero() - && self - .chain - .root_at_slot(start_slot(remote.finalized_epoch)) - .map(|root_opt| root_opt != Some(remote.finalized_root))? - { - // The remote's finalized epoch is less than or equal to ours, but the block root is - // different to the one in our chain. Therefore, the node is on a different chain and we - // should not communicate with them. 
- Some("Different finalized chain".to_string()) - } else { - None - }; - - if let Some(irrelevant_reason) = irrelevant_reason { - debug!(self.log, "Handshake Failure"; "peer" => %peer_id, "reason" => irrelevant_reason); - self.network - .goodbye_peer(peer_id, GoodbyeReason::IrrelevantNetwork); - } else { - let info = SyncInfo { - head_slot: remote.head_slot, - head_root: remote.head_root, - finalized_epoch: remote.finalized_epoch, - finalized_root: remote.finalized_root, - }; - self.send_to_sync(SyncMessage::AddPeer(peer_id, info)); - } - - Ok(()) + self.send_beacon_processor_work(BeaconWorkEvent::status_message(peer_id, status)) } /// Handle a `BlocksByRoot` request from the peer. pub fn on_blocks_by_root_request( - &self, + &mut self, peer_id: PeerId, request_id: PeerRequestId, request: BlocksByRootRequest, ) { - let chain = self.chain.clone(); - let mut network = self.network.clone(); - let log = self.log.clone(); - - // Shift the db reads to a blocking thread. - self.executor.spawn_blocking( - move || { - let mut send_block_count = 0; - for root in request.block_roots.iter() { - if let Ok(Some(block)) = chain.store.get_block(root) { - network.send_response( - peer_id.clone(), - Response::BlocksByRoot(Some(Box::new(block))), - request_id, - ); - send_block_count += 1; - } else { - debug!( - log, - "Peer requested unknown block"; - "peer" => peer_id.to_string(), - "request_root" => format!("{:}", root), - ); - } - } - debug!( - log, - "Received BlocksByRoot Request"; - "peer" => peer_id.to_string(), - "requested" => request.block_roots.len(), - "returned" => send_block_count, - ); - - // send stream termination - network.send_response(peer_id, Response::BlocksByRoot(None), request_id); - }, - "blocks_by_root_request", - ); + self.send_beacon_processor_work(BeaconWorkEvent::blocks_by_roots_request( + peer_id, request_id, request, + )) } /// Handle a `BlocksByRange` request from the peer. 
@@ -272,144 +161,11 @@ impl Processor { &mut self, peer_id: PeerId, request_id: PeerRequestId, - mut req: BlocksByRangeRequest, + req: BlocksByRangeRequest, ) { - let chain = self.chain.clone(); - let mut network = self.network.clone(); - let log = self.log.clone(); - - // Shift the db reads to a blocking thread. - self.executor.spawn_blocking(move || { - - debug!( - log, - "Received BlocksByRange Request"; - "peer_id" => %peer_id, - "count" => req.count, - "start_slot" => req.start_slot, - "step" => req.step, - ); - - // Should not send more than max request blocks - if req.count > MAX_REQUEST_BLOCKS { - req.count = MAX_REQUEST_BLOCKS; - } - if req.step == 0 { - warn!(log, - "Peer sent invalid range request"; - "error" => "Step sent was 0"); - network.goodbye_peer(peer_id, GoodbyeReason::Fault); - return; - } - - let forwards_block_root_iter = match - chain - .forwards_iter_block_roots(Slot::from(req.start_slot)) - { - Ok(iter) => iter, - Err(e) => { - return error!( - log, - "Unable to obtain root iter"; - "error" => format!("{:?}", e) - ) - } - }; - - // Pick out the required blocks, ignoring skip-slots and stepping by the step parameter. - // - // NOTE: We don't mind if req.count * req.step overflows as it just ends the iterator early and - // the peer will get less blocks. - // The step parameter is quadratically weighted in the filter, so large values should be - // prevented before reaching this point. 
- let mut last_block_root = None; - let maybe_block_roots = process_results(forwards_block_root_iter, |iter| { - iter.take_while(|(_, slot)| { - slot.as_u64() < req.start_slot.saturating_add(req.count * req.step) - }) - // map skip slots to None - .map(|(root, _)| { - let result = if Some(root) == last_block_root { - None - } else { - Some(root) - }; - last_block_root = Some(root); - result - }) - .step_by(req.step as usize) - .collect::>>() - }); - - let block_roots = match maybe_block_roots { - Ok(block_roots) => block_roots, - Err(e) => { - error!(log, "Error during iteration over blocks"; "error" => format!("{:?}", e)); - return; - } - }; - - // remove all skip slots - let block_roots = block_roots - .into_iter() - .filter_map(|root| root) - .collect::>(); - - let mut blocks_sent = 0; - for root in block_roots { - if let Ok(Some(block)) = chain.store.get_block(&root) { - // Due to skip slots, blocks could be out of the range, we ensure they are in the - // range before sending - if block.slot() >= req.start_slot - && block.slot() < req.start_slot + req.count * req.step - { - blocks_sent += 1; - network.send_response( - peer_id.clone(), - Response::BlocksByRange(Some(Box::new(block))), - request_id, - ); - } - } else { - error!( - log, - "Block in the chain is not in the store"; - "request_root" => format!("{:}", root), - ); - } - } - - let current_slot = - chain - .slot() - .unwrap_or_else(|_| chain.slot_clock.genesis_slot()); - - if blocks_sent < (req.count as usize) { - debug!( - log, - "BlocksByRange Response Sent"; - "peer" => peer_id.to_string(), - "msg" => "Failed to return all requested blocks", - "start_slot" => req.start_slot, - "current_slot" => current_slot, - "requested" => req.count, - "returned" => blocks_sent); - } else { - debug!( - log, - "Sending BlocksByRange Response"; - "peer" => peer_id.to_string(), - "start_slot" => req.start_slot, - "current_slot" => current_slot, - "requested" => req.count, - "returned" => blocks_sent); - } - - // send 
the stream terminator - network - .send_response(peer_id, Response::BlocksByRange(None), request_id); - - }, "blocks_by_range_request"); + self.send_beacon_processor_work(BeaconWorkEvent::blocks_by_range_request( + peer_id, request_id, req, + )) } /// Handle a `BlocksByRange` response from the peer. @@ -478,18 +234,9 @@ impl Processor { peer_id: PeerId, block: Box>, ) { - self.beacon_processor_send - .try_send(BeaconWorkEvent::gossip_beacon_block( - message_id, peer_id, block, - )) - .unwrap_or_else(|e| { - error!( - &self.log, - "Unable to send to gossip processor"; - "type" => "block gossip", - "error" => e.to_string(), - ) - }) + self.send_beacon_processor_work(BeaconWorkEvent::gossip_beacon_block( + message_id, peer_id, block, + )) } pub fn on_unaggregated_attestation_gossip( @@ -500,22 +247,13 @@ impl Processor { subnet_id: SubnetId, should_process: bool, ) { - self.beacon_processor_send - .try_send(BeaconWorkEvent::unaggregated_attestation( - message_id, - peer_id, - unaggregated_attestation, - subnet_id, - should_process, - )) - .unwrap_or_else(|e| { - error!( - &self.log, - "Unable to send to gossip processor"; - "type" => "unaggregated attestation gossip", - "error" => e.to_string(), - ) - }) + self.send_beacon_processor_work(BeaconWorkEvent::unaggregated_attestation( + message_id, + peer_id, + unaggregated_attestation, + subnet_id, + should_process, + )) } pub fn on_aggregated_attestation_gossip( @@ -524,18 +262,9 @@ impl Processor { peer_id: PeerId, aggregate: SignedAggregateAndProof, ) { - self.beacon_processor_send - .try_send(BeaconWorkEvent::aggregated_attestation( - message_id, peer_id, aggregate, - )) - .unwrap_or_else(|e| { - error!( - &self.log, - "Unable to send to gossip processor"; - "type" => "aggregated attestation gossip", - "error" => e.to_string(), - ) - }) + self.send_beacon_processor_work(BeaconWorkEvent::aggregated_attestation( + message_id, peer_id, aggregate, + )) } pub fn on_voluntary_exit_gossip( @@ -544,20 +273,11 @@ impl 
Processor { peer_id: PeerId, voluntary_exit: Box, ) { - self.beacon_processor_send - .try_send(BeaconWorkEvent::gossip_voluntary_exit( - message_id, - peer_id, - voluntary_exit, - )) - .unwrap_or_else(|e| { - error!( - &self.log, - "Unable to send to gossip processor"; - "type" => "voluntary exit gossip", - "error" => e.to_string(), - ) - }) + self.send_beacon_processor_work(BeaconWorkEvent::gossip_voluntary_exit( + message_id, + peer_id, + voluntary_exit, + )) } pub fn on_proposer_slashing_gossip( @@ -566,20 +286,11 @@ impl Processor { peer_id: PeerId, proposer_slashing: Box, ) { - self.beacon_processor_send - .try_send(BeaconWorkEvent::gossip_proposer_slashing( - message_id, - peer_id, - proposer_slashing, - )) - .unwrap_or_else(|e| { - error!( - &self.log, - "Unable to send to gossip processor"; - "type" => "proposer slashing gossip", - "error" => e.to_string(), - ) - }) + self.send_beacon_processor_work(BeaconWorkEvent::gossip_proposer_slashing( + message_id, + peer_id, + proposer_slashing, + )) } pub fn on_attester_slashing_gossip( @@ -588,19 +299,23 @@ impl Processor { peer_id: PeerId, attester_slashing: Box>, ) { + self.send_beacon_processor_work(BeaconWorkEvent::gossip_attester_slashing( + message_id, + peer_id, + attester_slashing, + )) + } + + fn send_beacon_processor_work(&mut self, work: BeaconWorkEvent) { self.beacon_processor_send - .try_send(BeaconWorkEvent::gossip_attester_slashing( - message_id, - peer_id, - attester_slashing, - )) + .try_send(work) .unwrap_or_else(|e| { - error!( - &self.log, - "Unable to send to gossip processor"; - "type" => "attester slashing gossip", - "error" => e.to_string(), - ) + let work_type = match &e { + mpsc::error::TrySendError::Closed(work) + | mpsc::error::TrySendError::Full(work) => work.work_type(), + }; + error!(&self.log, "Unable to send message to the beacon processor"; + "error" => %e, "type" => work_type) }) } } @@ -648,7 +363,7 @@ impl HandlerNetworkContext { } /// Disconnects and ban's a peer, sending a 
Goodbye request with the associated reason. - pub fn goodbye_peer(&mut self, peer_id: PeerId, reason: GoodbyeReason) { + pub fn _goodbye_peer(&mut self, peer_id: PeerId, reason: GoodbyeReason) { self.inform_network(NetworkMessage::GoodbyePeer { peer_id, reason }); } diff --git a/beacon_node/network/src/status.rs b/beacon_node/network/src/status.rs new file mode 100644 index 0000000000..41cc990edf --- /dev/null +++ b/beacon_node/network/src/status.rs @@ -0,0 +1,29 @@ +use beacon_chain::{BeaconChain, BeaconChainError, BeaconChainTypes}; +use types::ChainSpec; + +use eth2_libp2p::rpc::StatusMessage; +/// Trait to produce a `StatusMessage` representing the state of the given `beacon_chain`. +/// +/// NOTE: The purpose of this is simply to obtain a `StatusMessage` from the `BeaconChain` without +/// polluting/coupling the type with RPC concepts. +pub trait ToStatusMessage { + fn status_message(&self) -> Result; +} + +impl ToStatusMessage for BeaconChain { + fn status_message(&self) -> Result { + let head_info = self.head_info()?; + let genesis_validators_root = self.genesis_validators_root; + + let fork_digest = + ChainSpec::compute_fork_digest(head_info.fork.current_version, genesis_validators_root); + + Ok(StatusMessage { + fork_digest, + finalized_root: head_info.finalized_checkpoint.root, + finalized_epoch: head_info.finalized_checkpoint.epoch, + head_root: head_info.block_root, + head_slot: head_info.slot, + }) + } +} diff --git a/beacon_node/network/src/sync/manager.rs b/beacon_node/network/src/sync/manager.rs index 5c2a01e2b0..8e558f8146 100644 --- a/beacon_node/network/src/sync/manager.rs +++ b/beacon_node/network/src/sync/manager.rs @@ -38,8 +38,8 @@ use super::peer_sync_info::{remote_sync_type, PeerSyncType}; use super::range_sync::{ChainId, RangeSync, RangeSyncType, EPOCHS_PER_BATCH}; use super::RequestId; use crate::beacon_processor::{ProcessId, WorkEvent as BeaconWorkEvent}; -use crate::router::processor::status_message; use crate::service::NetworkMessage; 
+use crate::status::ToStatusMessage; use beacon_chain::{BeaconChain, BeaconChainTypes, BlockError}; use eth2_libp2p::rpc::{methods::MAX_REQUEST_BLOCKS, BlocksByRootRequest, GoodbyeReason}; use eth2_libp2p::types::{NetworkGlobals, SyncState}; @@ -258,7 +258,7 @@ impl SyncManager { /// ours that we consider it fully sync'd with respect to our current chain. fn add_peer(&mut self, peer_id: PeerId, remote: SyncInfo) { // ensure the beacon chain still exists - let local = match status_message(&self.chain) { + let local = match self.chain.status_message() { Ok(status) => SyncInfo { head_slot: status.head_slot, head_root: status.head_root, diff --git a/beacon_node/network/src/sync/network_context.rs b/beacon_node/network/src/sync/network_context.rs index aa000939db..2529bde65c 100644 --- a/beacon_node/network/src/sync/network_context.rs +++ b/beacon_node/network/src/sync/network_context.rs @@ -3,8 +3,8 @@ use super::range_sync::{BatchId, ChainId}; use super::RequestId as SyncRequestId; -use crate::router::processor::status_message; use crate::service::NetworkMessage; +use crate::status::ToStatusMessage; use beacon_chain::{BeaconChain, BeaconChainTypes}; use eth2_libp2p::rpc::{BlocksByRangeRequest, BlocksByRootRequest, GoodbyeReason, RequestId}; use eth2_libp2p::{Client, NetworkGlobals, PeerAction, PeerId, Request}; @@ -63,7 +63,7 @@ impl SyncNetworkContext { chain: Arc>, peers: impl Iterator, ) { - if let Ok(status_message) = status_message(&chain) { + if let Ok(status_message) = &chain.status_message() { for peer_id in peers { debug!( self.log, diff --git a/beacon_node/network/src/sync/range_sync/range.rs b/beacon_node/network/src/sync/range_sync/range.rs index 9075804f87..a89911d990 100644 --- a/beacon_node/network/src/sync/range_sync/range.rs +++ b/beacon_node/network/src/sync/range_sync/range.rs @@ -43,10 +43,9 @@ use super::chain::{ChainId, RemoveChain, SyncingChain}; use super::chain_collection::ChainCollection; use super::sync_type::RangeSyncType; use 
crate::beacon_processor::WorkEvent as BeaconWorkEvent; -use crate::router::processor::status_message; +use crate::status::ToStatusMessage; use crate::sync::network_context::SyncNetworkContext; -use crate::sync::BatchProcessResult; -use crate::sync::RequestId; +use crate::sync::{BatchProcessResult, RequestId}; use beacon_chain::{BeaconChain, BeaconChainTypes}; use eth2_libp2p::PeerId; use eth2_libp2p::SyncInfo; @@ -341,7 +340,7 @@ impl RangeSync { network.status_peers(self.beacon_chain.clone(), chain.peers()); - let local = match status_message(&self.beacon_chain) { + let local = match self.beacon_chain.status_message() { Ok(status) => SyncInfo { head_slot: status.head_slot, head_root: status.head_root,