From 4d3f8c51e4c73cfe5f3c434e581600c1eb0841f0 Mon Sep 17 00:00:00 2001 From: Paul Hauner Date: Wed, 19 Aug 2020 10:54:41 +1000 Subject: [PATCH] Refactor into the new Worker struct --- .../network/src/beacon_processor/mod.rs | 656 ++---------------- .../network/src/beacon_processor/worker.rs | 593 ++++++++++++++++ 2 files changed, 651 insertions(+), 598 deletions(-) create mode 100644 beacon_node/network/src/beacon_processor/worker.rs diff --git a/beacon_node/network/src/beacon_processor/mod.rs b/beacon_node/network/src/beacon_processor/mod.rs index ba8243cb3b..2d91b6fa82 100644 --- a/beacon_node/network/src/beacon_processor/mod.rs +++ b/beacon_node/network/src/beacon_processor/mod.rs @@ -36,22 +36,19 @@ //! task. use crate::{metrics, service::NetworkMessage, sync::SyncMessage}; -use beacon_chain::{ - attestation_verification::Error as AttnError, BeaconChain, BeaconChainError, BeaconChainTypes, - BlockError, ForkChoiceError, -}; -use chain_segment::handle_chain_segment; +use beacon_chain::{BeaconChain, BeaconChainTypes, BlockError}; use environment::TaskExecutor; use eth2_libp2p::{MessageId, NetworkGlobals, PeerId}; -use slog::{crit, debug, error, info, trace, warn, Logger}; -use ssz::Encode; +use slog::{crit, debug, error, trace, warn, Logger}; use std::collections::VecDeque; use std::sync::{Arc, Weak}; use std::time::{Duration, Instant}; use tokio::sync::{mpsc, oneshot}; use types::{Attestation, EthSpec, Hash256, SignedAggregateAndProof, SignedBeaconBlock, SubnetId}; +use worker::Worker; mod chain_segment; +mod worker; pub use chain_segment::ProcessId; @@ -365,7 +362,7 @@ impl BeaconProcessor { let executor = self.executor.clone(); - // The manager future will run on the non-blocking executor and delegate tasks to worker + // The manager future will run on the core executor and delegate tasks to worker // threads on the blocking executor. let manager_future = async move { loop { @@ -544,7 +541,7 @@ impl BeaconProcessor { } }; - // Spawn on the non-blocking executor. + // Spawn on the core executor. executor.spawn(manager_future, MANAGER_TASK_NAME); } @@ -574,11 +571,16 @@ impl BeaconProcessor { return; }; - let network_tx = self.network_tx.clone(); - let sync_tx = self.sync_tx.clone(); let log = self.log.clone(); let executor = self.executor.clone(); + let worker = Worker { + chain: chain.clone(), + network_tx: self.network_tx.clone(), + sync_tx: self.sync_tx.clone(), + log: self.log.clone(), + }; + trace!( self.log, "Spawning beacon processor worker"; @@ -589,298 +591,53 @@ impl BeaconProcessor { executor.spawn_blocking( move || { let _worker_timer = worker_timer; - let inner_log = log.clone(); - // We use this closure pattern to avoid using a `return` that prevents the - // `idle_tx` message from sending. - let handler = || { - let log = inner_log.clone(); - match work { - /* - * Unaggregated attestation verification. - */ - Work::GossipAttestation { - message_id, - peer_id, - attestation, - subnet_id, - should_import, - } => { - let beacon_block_root = attestation.data.beacon_block_root; - - let attestation = match chain - .verify_unaggregated_attestation_for_gossip(*attestation, subnet_id) - { - Ok(attestation) => attestation, - Err(e) => { - handle_attestation_verification_failure( - &log, - sync_tx, - peer_id, - beacon_block_root, - "unaggregated", - e, - ); - return; - } - }; - - // Indicate to the `Network` service that this message is valid and can be - // propagated on the gossip network. - propagate_gossip_message(network_tx, message_id, peer_id.clone(), &log); - - if !should_import { - return; - } - - metrics::inc_counter( - &metrics::BEACON_PROCESSOR_UNAGGREGATED_ATTESTATION_VERIFIED_TOTAL, - ); - - if let Err(e) = chain.apply_attestation_to_fork_choice(&attestation) { - match e { - BeaconChainError::ForkChoiceError( - ForkChoiceError::InvalidAttestation(e), - ) => debug!( - log, - "Attestation invalid for fork choice"; - "reason" => format!("{:?}", e), - "peer" => peer_id.to_string(), - "beacon_block_root" => format!("{:?}", beacon_block_root) - ), - e => error!( - log, - "Error applying attestation to fork choice"; - "reason" => format!("{:?}", e), - "peer" => peer_id.to_string(), - "beacon_block_root" => format!("{:?}", beacon_block_root) - ), - } - } - - if let Err(e) = chain.add_to_naive_aggregation_pool(attestation) { - debug!( - log, - "Attestation invalid for agg pool"; - "reason" => format!("{:?}", e), - "peer" => peer_id.to_string(), - "beacon_block_root" => format!("{:?}", beacon_block_root) - ) - } - - metrics::inc_counter( - &metrics::BEACON_PROCESSOR_UNAGGREGATED_ATTESTATION_IMPORTED_TOTAL, - ); - } - /* - * Aggregated attestation verification. - */ - Work::GossipAggregate { - message_id, - peer_id, - aggregate, - } => { - let beacon_block_root = - aggregate.message.aggregate.data.beacon_block_root; - - let aggregate = - match chain.verify_aggregated_attestation_for_gossip(*aggregate) { - Ok(aggregate) => aggregate, - Err(e) => { - handle_attestation_verification_failure( - &log, - sync_tx, - peer_id, - beacon_block_root, - "aggregated", - e, - ); - return; - } - }; - - // Indicate to the `Network` service that this message is valid and can be - // propagated on the gossip network. - propagate_gossip_message(network_tx, message_id, peer_id.clone(), &log); - - metrics::inc_counter( - &metrics::BEACON_PROCESSOR_AGGREGATED_ATTESTATION_VERIFIED_TOTAL, - ); - - if let Err(e) = chain.apply_attestation_to_fork_choice(&aggregate) { - match e { - BeaconChainError::ForkChoiceError( - ForkChoiceError::InvalidAttestation(e), - ) => debug!( - log, - "Aggregate invalid for fork choice"; - "reason" => format!("{:?}", e), - "peer" => peer_id.to_string(), - "beacon_block_root" => format!("{:?}", beacon_block_root) - ), - e => error!( - log, - "Error applying aggregate to fork choice"; - "reason" => format!("{:?}", e), - "peer" => peer_id.to_string(), - "beacon_block_root" => format!("{:?}", beacon_block_root) - ), - } - } - - if let Err(e) = chain.add_to_block_inclusion_pool(aggregate) { - debug!( - log, - "Attestation invalid for op pool"; - "reason" => format!("{:?}", e), - "peer" => peer_id.to_string(), - "beacon_block_root" => format!("{:?}", beacon_block_root) - ) - } - - metrics::inc_counter( - &metrics::BEACON_PROCESSOR_AGGREGATED_ATTESTATION_IMPORTED_TOTAL, - ); - } - /* - * Verification for beacon blocks received on gossip. - */ - Work::GossipBlock { - message_id, - peer_id, - block, - } => { - let verified_block = match chain.verify_block_for_gossip(*block) { - Ok(verified_block) => { - info!( - log, - "New block received"; - "slot" => verified_block.block.slot(), - "hash" => verified_block.block_root.to_string() - ); - propagate_gossip_message( - network_tx, - message_id, - peer_id.clone(), - &log, - ); - verified_block - } - Err(BlockError::ParentUnknown(block)) => { - send_sync_message( - sync_tx, - SyncMessage::UnknownBlock(peer_id, block), - &log, - ); - return; - } - Err(BlockError::BlockIsAlreadyKnown) => { - debug!( - log, - "Gossip block is already known"; - ); - return; - } - Err(e) => { - warn!( - log, - "Could not verify block for gossip"; - "error" => format!("{:?}", e) - ); - return; - } - }; - - metrics::inc_counter( - &metrics::BEACON_PROCESSOR_GOSSIP_BLOCK_VERIFIED_TOTAL, - ); - - let block = Box::new(verified_block.block.clone()); - match chain.process_block(verified_block) { - Ok(_block_root) => { - metrics::inc_counter( - &metrics::BEACON_PROCESSOR_GOSSIP_BLOCK_IMPORTED_TOTAL, - ); - - trace!( - log, - "Gossipsub block processed"; - "peer_id" => peer_id.to_string() - ); - - // TODO: It would be better if we can run this _after_ we publish the block to - // reduce block propagation latency. - // - // The `MessageHandler` would be the place to put this, however it doesn't seem - // to have a reference to the `BeaconChain`. I will leave this for future - // works. - match chain.fork_choice() { - Ok(()) => trace!( - log, - "Fork choice success"; - "location" => "block gossip" - ), - Err(e) => error!( - log, - "Fork choice failed"; - "error" => format!("{:?}", e), - "location" => "block gossip" - ), - } - } - Err(BlockError::ParentUnknown { .. }) => { - // Inform the sync manager to find parents for this block - // This should not occur. It should be checked by `should_forward_block` - error!( - log, - "Block with unknown parent attempted to be processed"; - "peer_id" => peer_id.to_string() - ); - send_sync_message( - sync_tx, - SyncMessage::UnknownBlock(peer_id, block), - &log, - ); - } - other => { - debug!( - log, - "Invalid gossip beacon block"; - "outcome" => format!("{:?}", other), - "block root" => format!("{}", block.canonical_root()), - "block slot" => block.slot() - ); - trace!( - log, - "Invalid gossip beacon block ssz"; - "ssz" => format!("0x{}", hex::encode(block.as_ssz_bytes())), - ); - } - }; - } - /* - * Verification for beacon blocks received during syncing via RPC. - */ - Work::RpcBlock { block, result_tx } => { - let block_result = chain.process_block(*block); - - metrics::inc_counter( - &metrics::BEACON_PROCESSOR_RPC_BLOCK_IMPORTED_TOTAL, - ); - - if result_tx.send(block_result).is_err() { - crit!(log, "Failed return sync block result"); - } - } - /* - * Verification for a chain segment (multiple blocks). - */ - Work::ChainSegment { process_id, blocks } => { - handle_chain_segment(chain, process_id, blocks, sync_tx, log) - } - }; + match work { + /* + * Unaggregated attestation verification. + */ + Work::GossipAttestation { + message_id, + peer_id, + attestation, + subnet_id, + should_import, + } => worker.process_gossip_attestation( + message_id, + peer_id, + *attestation, + subnet_id, + should_import, + ), + /* + * Aggregated attestation verification. + */ + Work::GossipAggregate { + message_id, + peer_id, + aggregate, + } => worker.process_gossip_aggregate(message_id, peer_id, *aggregate), + /* + * Verification for beacon blocks received on gossip. + */ + Work::GossipBlock { + message_id, + peer_id, + block, + } => worker.process_gossip_block(message_id, peer_id, *block), + /* + * Verification for beacon blocks received during syncing via RPC. + */ + Work::RpcBlock { block, result_tx } => { + worker.process_rpc_block(*block, result_tx) + } + /* + * Verification for a chain segment (multiple blocks). + */ + Work::ChainSegment { process_id, blocks } => { + worker.process_chain_segment(process_id, blocks) + } }; - handler(); trace!( log, @@ -902,300 +659,3 @@ impl BeaconProcessor { ); } } - -/// Send a message on `message_tx` that the `message_id` sent by `peer_id` should be propagated on -/// the gossip network. -/// -/// Creates a log if there is an interal error. -fn propagate_gossip_message( - network_tx: mpsc::UnboundedSender>, - message_id: MessageId, - peer_id: PeerId, - log: &Logger, -) { - network_tx - .send(NetworkMessage::Validate { - propagation_source: peer_id, - message_id, - }) - .unwrap_or_else(|_| { - warn!( - log, - "Could not send propagation request to the network service" - ) - }); -} - -/// Send a message to `sync_tx`. -/// -/// Creates a log if there is an interal error. -fn send_sync_message( - sync_tx: mpsc::UnboundedSender>, - message: SyncMessage, - log: &Logger, -) { - sync_tx - .send(message) - .unwrap_or_else(|_| error!(log, "Could not send message to the sync service")); -} - -/// Handle an error whilst verifying an `Attestation` or `SignedAggregateAndProof` from the -/// network. -pub fn handle_attestation_verification_failure( - log: &Logger, - sync_tx: mpsc::UnboundedSender>, - peer_id: PeerId, - beacon_block_root: Hash256, - attestation_type: &str, - error: AttnError, -) { - metrics::register_attestation_error(&error); - match &error { - AttnError::FutureEpoch { .. } - | AttnError::PastEpoch { .. } - | AttnError::FutureSlot { .. } - | AttnError::PastSlot { .. } => { - /* - * These errors can be triggered by a mismatch between our slot and the peer. - * - * - * The peer has published an invalid consensus message, _only_ if we trust our own clock. - */ - } - AttnError::InvalidSelectionProof { .. } | AttnError::InvalidSignature => { - /* - * These errors are caused by invalid signatures. - * - * The peer has published an invalid consensus message. - */ - } - AttnError::EmptyAggregationBitfield => { - /* - * The aggregate had no signatures and is therefore worthless. - * - * Whilst we don't gossip this attestation, this act is **not** a clear - * violation of the spec nor indication of fault. - * - * This may change soon. Reference: - * - * https://github.com/ethereum/eth2.0-specs/pull/1732 - */ - } - AttnError::AggregatorPubkeyUnknown(_) => { - /* - * The aggregator index was higher than any known validator index. This is - * possible in two cases: - * - * 1. The attestation is malformed - * 2. The attestation attests to a beacon_block_root that we do not know. - * - * It should be impossible to reach (2) without triggering - * `AttnError::UnknownHeadBlock`, so we can safely assume the peer is - * faulty. - * - * The peer has published an invalid consensus message. - */ - } - AttnError::AggregatorNotInCommittee { .. } => { - /* - * The aggregator index was higher than any known validator index. This is - * possible in two cases: - * - * 1. The attestation is malformed - * 2. The attestation attests to a beacon_block_root that we do not know. - * - * It should be impossible to reach (2) without triggering - * `AttnError::UnknownHeadBlock`, so we can safely assume the peer is - * faulty. - * - * The peer has published an invalid consensus message. - */ - } - AttnError::AttestationAlreadyKnown { .. } => { - /* - * The aggregate attestation has already been observed on the network or in - * a block. - * - * The peer is not necessarily faulty. - */ - trace!( - log, - "Attestation already known"; - "peer_id" => peer_id.to_string(), - "block" => format!("{}", beacon_block_root), - "type" => format!("{:?}", attestation_type), - ); - return; - } - AttnError::AggregatorAlreadyKnown(_) => { - /* - * There has already been an aggregate attestation seen from this - * aggregator index. - * - * The peer is not necessarily faulty. - */ - trace!( - log, - "Aggregator already known"; - "peer_id" => peer_id.to_string(), - "block" => format!("{}", beacon_block_root), - "type" => format!("{:?}", attestation_type), - ); - return; - } - AttnError::PriorAttestationKnown { .. } => { - /* - * We have already seen an attestation from this validator for this epoch. - * - * The peer is not necessarily faulty. - */ - trace!( - log, - "Prior attestation known"; - "peer_id" => peer_id.to_string(), - "block" => format!("{}", beacon_block_root), - "type" => format!("{:?}", attestation_type), - ); - return; - } - AttnError::ValidatorIndexTooHigh(_) => { - /* - * The aggregator index (or similar field) was higher than the maximum - * possible number of validators. - * - * The peer has published an invalid consensus message. - */ - } - AttnError::UnknownHeadBlock { beacon_block_root } => { - // Note: its a little bit unclear as to whether or not this block is unknown or - // just old. See: - // - // https://github.com/sigp/lighthouse/issues/1039 - - // TODO: Maintain this attestation and re-process once sync completes - debug!( - log, - "Attestation for unknown block"; - "peer_id" => peer_id.to_string(), - "block" => format!("{}", beacon_block_root) - ); - // we don't know the block, get the sync manager to handle the block lookup - sync_tx - .send(SyncMessage::UnknownBlockHash(peer_id, *beacon_block_root)) - .unwrap_or_else(|_| { - warn!( - log, - "Failed to send to sync service"; - "msg" => "UnknownBlockHash" - ) - }); - return; - } - AttnError::UnknownTargetRoot(_) => { - /* - * The block indicated by the target root is not known to us. - * - * We should always get `AttnError::UnknwonHeadBlock` before we get this - * error, so this means we can get this error if: - * - * 1. The target root does not represent a valid block. - * 2. We do not have the target root in our DB. - * - * For (2), we should only be processing attestations when we should have - * all the available information. Note: if we do a weak-subjectivity sync - * it's possible that this situation could occur, but I think it's - * unlikely. For now, we will declare this to be an invalid message> - * - * The peer has published an invalid consensus message. - */ - } - AttnError::BadTargetEpoch => { - /* - * The aggregator index (or similar field) was higher than the maximum - * possible number of validators. - * - * The peer has published an invalid consensus message. - */ - } - AttnError::NoCommitteeForSlotAndIndex { .. } => { - /* - * It is not possible to attest this the given committee in the given slot. - * - * The peer has published an invalid consensus message. - */ - } - AttnError::NotExactlyOneAggregationBitSet(_) => { - /* - * The unaggregated attestation doesn't have only one signature. - * - * The peer has published an invalid consensus message. - */ - } - AttnError::AttestsToFutureBlock { .. } => { - /* - * The beacon_block_root is from a higher slot than the attestation. - * - * The peer has published an invalid consensus message. - */ - } - - AttnError::InvalidSubnetId { received, expected } => { - /* - * The attestation was received on an incorrect subnet id. - */ - debug!( - log, - "Received attestation on incorrect subnet"; - "expected" => format!("{:?}", expected), - "received" => format!("{:?}", received), - ) - } - AttnError::Invalid(_) => { - /* - * The attestation failed the state_processing verification. - * - * The peer has published an invalid consensus message. - */ - } - AttnError::TooManySkippedSlots { - head_block_slot, - attestation_slot, - } => { - /* - * The attestation references a head block that is too far behind the attestation slot. - * - * The message is not necessarily invalid, but we choose to ignore it. - */ - debug!( - log, - "Rejected long skip slot attestation"; - "head_block_slot" => head_block_slot, - "attestation_slot" => attestation_slot, - ) - } - AttnError::BeaconChainError(e) => { - /* - * Lighthouse hit an unexpected error whilst processing the attestation. It - * should be impossible to trigger a `BeaconChainError` from the network, - * so we have a bug. - * - * It's not clear if the message is invalid/malicious. - */ - error!( - log, - "Unable to validate aggregate"; - "peer_id" => peer_id.to_string(), - "error" => format!("{:?}", e), - ); - } - } - - debug!( - log, - "Invalid attestation from network"; - "reason" => format!("{:?}", error), - "block" => format!("{}", beacon_block_root), - "peer_id" => peer_id.to_string(), - "type" => format!("{:?}", attestation_type), - ); -} diff --git a/beacon_node/network/src/beacon_processor/worker.rs b/beacon_node/network/src/beacon_processor/worker.rs new file mode 100644 index 0000000000..185ac7708d --- /dev/null +++ b/beacon_node/network/src/beacon_processor/worker.rs @@ -0,0 +1,593 @@ +use super::{ + chain_segment::{handle_chain_segment, ProcessId}, + BlockResultSender, +}; +use crate::{metrics, service::NetworkMessage, sync::SyncMessage}; +use beacon_chain::{ + attestation_verification::Error as AttnError, BeaconChain, BeaconChainError, BeaconChainTypes, + BlockError, ForkChoiceError, +}; +use eth2_libp2p::{MessageId, PeerId}; +use slog::{crit, debug, error, info, trace, warn, Logger}; +use ssz::Encode; +use std::sync::Arc; +use tokio::sync::mpsc; +use types::{Attestation, Hash256, SignedAggregateAndProof, SignedBeaconBlock, SubnetId}; + +/// Contains the context necessary to import blocks, attestations, etc to the beacon chain. +pub struct Worker { + pub chain: Arc>, + pub network_tx: mpsc::UnboundedSender>, + pub sync_tx: mpsc::UnboundedSender>, + pub log: Logger, +} + +impl Worker { + /// Process the unaggregated attestation received from the gossip network and: + /// + /// - If it passes gossip propagation criteria, tell the network thread to forward it. + /// - Attempt to apply it to fork choice. + /// - Attempt to add it to the naive aggregation pool. + /// + /// Raises a log if there are errors. + pub fn process_gossip_attestation( + self, + message_id: MessageId, + peer_id: PeerId, + attestation: Attestation, + subnet_id: SubnetId, + should_import: bool, + ) { + let beacon_block_root = attestation.data.beacon_block_root; + + let attestation = match self + .chain + .verify_unaggregated_attestation_for_gossip(attestation, subnet_id) + { + Ok(attestation) => attestation, + Err(e) => { + self.handle_attestation_verification_failure( + peer_id, + beacon_block_root, + "unaggregated", + e, + ); + return; + } + }; + + // Indicate to the `Network` service that this message is valid and can be + // propagated on the gossip network. + self.propagate_gossip_message(message_id, peer_id.clone()); + + if !should_import { + return; + } + + metrics::inc_counter(&metrics::BEACON_PROCESSOR_UNAGGREGATED_ATTESTATION_VERIFIED_TOTAL); + + if let Err(e) = self.chain.apply_attestation_to_fork_choice(&attestation) { + match e { + BeaconChainError::ForkChoiceError(ForkChoiceError::InvalidAttestation(e)) => { + debug!( + self.log, + "Attestation invalid for fork choice"; + "reason" => format!("{:?}", e), + "peer" => peer_id.to_string(), + "beacon_block_root" => format!("{:?}", beacon_block_root) + ) + } + e => error!( + self.log, + "Error applying attestation to fork choice"; + "reason" => format!("{:?}", e), + "peer" => peer_id.to_string(), + "beacon_block_root" => format!("{:?}", beacon_block_root) + ), + } + } + + if let Err(e) = self.chain.add_to_naive_aggregation_pool(attestation) { + debug!( + self.log, + "Attestation invalid for agg pool"; + "reason" => format!("{:?}", e), + "peer" => peer_id.to_string(), + "beacon_block_root" => format!("{:?}", beacon_block_root) + ) + } + + metrics::inc_counter(&metrics::BEACON_PROCESSOR_UNAGGREGATED_ATTESTATION_IMPORTED_TOTAL); + } + + /// Process the aggregated attestation received from the gossip network and: + /// + /// - If it passes gossip propagation criteria, tell the network thread to forward it. + /// - Attempt to apply it to fork choice. + /// - Attempt to add it to the block inclusion pool. + /// + /// Raises a log if there are errors. + pub fn process_gossip_aggregate( + self, + message_id: MessageId, + peer_id: PeerId, + aggregate: SignedAggregateAndProof, + ) { + let beacon_block_root = aggregate.message.aggregate.data.beacon_block_root; + + let aggregate = match self + .chain + .verify_aggregated_attestation_for_gossip(aggregate) + { + Ok(aggregate) => aggregate, + Err(e) => { + self.handle_attestation_verification_failure( + peer_id, + beacon_block_root, + "aggregated", + e, + ); + return; + } + }; + + // Indicate to the `Network` service that this message is valid and can be + // propagated on the gossip network. + self.propagate_gossip_message(message_id, peer_id.clone()); + + metrics::inc_counter(&metrics::BEACON_PROCESSOR_AGGREGATED_ATTESTATION_VERIFIED_TOTAL); + + if let Err(e) = self.chain.apply_attestation_to_fork_choice(&aggregate) { + match e { + BeaconChainError::ForkChoiceError(ForkChoiceError::InvalidAttestation(e)) => { + debug!( + self.log, + "Aggregate invalid for fork choice"; + "reason" => format!("{:?}", e), + "peer" => peer_id.to_string(), + "beacon_block_root" => format!("{:?}", beacon_block_root) + ) + } + e => error!( + self.log, + "Error applying aggregate to fork choice"; + "reason" => format!("{:?}", e), + "peer" => peer_id.to_string(), + "beacon_block_root" => format!("{:?}", beacon_block_root) + ), + } + } + + if let Err(e) = self.chain.add_to_block_inclusion_pool(aggregate) { + debug!( + self.log, + "Attestation invalid for op pool"; + "reason" => format!("{:?}", e), + "peer" => peer_id.to_string(), + "beacon_block_root" => format!("{:?}", beacon_block_root) + ) + } + + metrics::inc_counter(&metrics::BEACON_PROCESSOR_AGGREGATED_ATTESTATION_IMPORTED_TOTAL); + } + + /// Process the beacon block received from the gossip network and: + /// + /// - If it passes gossip propagation criteria, tell the network thread to forward it. + /// - Attempt to add it to the beacon chain, informing the sync thread if more blocks need to + /// be downloaded. + /// + /// Raises a log if there are errors. + pub fn process_gossip_block( + self, + message_id: MessageId, + peer_id: PeerId, + block: SignedBeaconBlock, + ) { + let verified_block = match self.chain.verify_block_for_gossip(block) { + Ok(verified_block) => { + info!( + self.log, + "New block received"; + "slot" => verified_block.block.slot(), + "hash" => verified_block.block_root.to_string() + ); + self.propagate_gossip_message(message_id, peer_id.clone()); + verified_block + } + Err(BlockError::ParentUnknown(block)) => { + self.send_sync_message(SyncMessage::UnknownBlock(peer_id, block)); + return; + } + Err(BlockError::BlockIsAlreadyKnown) => { + debug!( + self.log, + "Gossip block is already known"; + ); + return; + } + Err(e) => { + warn!( + self.log, + "Could not verify block for gossip"; + "error" => format!("{:?}", e) + ); + return; + } + }; + + metrics::inc_counter(&metrics::BEACON_PROCESSOR_GOSSIP_BLOCK_VERIFIED_TOTAL); + + let block = Box::new(verified_block.block.clone()); + match self.chain.process_block(verified_block) { + Ok(_block_root) => { + metrics::inc_counter(&metrics::BEACON_PROCESSOR_GOSSIP_BLOCK_IMPORTED_TOTAL); + + trace!( + self.log, + "Gossipsub block processed"; + "peer_id" => peer_id.to_string() + ); + + // TODO: It would be better if we can run this _after_ we publish the block to + // reduce block propagation latency. + // + // The `MessageHandler` would be the place to put this, however it doesn't seem + // to have a reference to the `BeaconChain`. I will leave this for future + // works. + match self.chain.fork_choice() { + Ok(()) => trace!( + self.log, + "Fork choice success"; + "location" => "block gossip" + ), + Err(e) => error!( + self.log, + "Fork choice failed"; + "error" => format!("{:?}", e), + "location" => "block gossip" + ), + } + } + Err(BlockError::ParentUnknown { .. }) => { + // Inform the sync manager to find parents for this block + // This should not occur. It should be checked by `should_forward_block` + error!( + self.log, + "Block with unknown parent attempted to be processed"; + "peer_id" => peer_id.to_string() + ); + self.send_sync_message(SyncMessage::UnknownBlock(peer_id, block)); + } + other => { + debug!( + self.log, + "Invalid gossip beacon block"; + "outcome" => format!("{:?}", other), + "block root" => format!("{}", block.canonical_root()), + "block slot" => block.slot() + ); + trace!( + self.log, + "Invalid gossip beacon block ssz"; + "ssz" => format!("0x{}", hex::encode(block.as_ssz_bytes())), + ); + } + }; + } + + /// Attempt to process a block received from a direct RPC request, returning the processing + /// result on the `result_tx` channel. + /// + /// Raises a log if there are errors publishing the result to the channel. + pub fn process_rpc_block( + self, + block: SignedBeaconBlock, + result_tx: BlockResultSender, + ) { + let block_result = self.chain.process_block(block); + + metrics::inc_counter(&metrics::BEACON_PROCESSOR_RPC_BLOCK_IMPORTED_TOTAL); + + if result_tx.send(block_result).is_err() { + crit!(self.log, "Failed return sync block result"); + } + } + + /// Attempt to import the chain segment (`blocks`) to the beacon chain, informing the sync + /// thread if more blocks are needed to process it. + pub fn process_chain_segment( + self, + process_id: ProcessId, + blocks: Vec>, + ) { + handle_chain_segment(self.chain, process_id, blocks, self.sync_tx, self.log) + } + + /// Send a message on `message_tx` that the `message_id` sent by `peer_id` should be propagated on + /// the gossip network. + /// + /// Creates a log if there is an interal error. + fn propagate_gossip_message(&self, message_id: MessageId, peer_id: PeerId) { + self.network_tx + .send(NetworkMessage::Validate { + propagation_source: peer_id, + message_id, + }) + .unwrap_or_else(|_| { + warn!( + self.log, + "Could not send propagation request to the network service" + ) + }); + } + + /// Send a message to `sync_tx`. + /// + /// Creates a log if there is an interal error. + fn send_sync_message(&self, message: SyncMessage) { + self.sync_tx + .send(message) + .unwrap_or_else(|_| error!(self.log, "Could not send message to the sync service")); + } + + /// Handle an error whilst verifying an `Attestation` or `SignedAggregateAndProof` from the + /// network. + pub fn handle_attestation_verification_failure( + &self, + peer_id: PeerId, + beacon_block_root: Hash256, + attestation_type: &str, + error: AttnError, + ) { + metrics::register_attestation_error(&error); + match &error { + AttnError::FutureEpoch { .. } + | AttnError::PastEpoch { .. } + | AttnError::FutureSlot { .. } + | AttnError::PastSlot { .. } => { + /* + * These errors can be triggered by a mismatch between our slot and the peer. + * + * + * The peer has published an invalid consensus message, _only_ if we trust our own clock. + */ + } + AttnError::InvalidSelectionProof { .. } | AttnError::InvalidSignature => { + /* + * These errors are caused by invalid signatures. + * + * The peer has published an invalid consensus message. + */ + } + AttnError::EmptyAggregationBitfield => { + /* + * The aggregate had no signatures and is therefore worthless. + * + * Whilst we don't gossip this attestation, this act is **not** a clear + * violation of the spec nor indication of fault. + * + * This may change soon. Reference: + * + * https://github.com/ethereum/eth2.0-specs/pull/1732 + */ + } + AttnError::AggregatorPubkeyUnknown(_) => { + /* + * The aggregator index was higher than any known validator index. This is + * possible in two cases: + * + * 1. The attestation is malformed + * 2. The attestation attests to a beacon_block_root that we do not know. + * + * It should be impossible to reach (2) without triggering + * `AttnError::UnknownHeadBlock`, so we can safely assume the peer is + * faulty. + * + * The peer has published an invalid consensus message. + */ + } + AttnError::AggregatorNotInCommittee { .. } => { + /* + * The aggregator index was higher than any known validator index. This is + * possible in two cases: + * + * 1. The attestation is malformed + * 2. The attestation attests to a beacon_block_root that we do not know. + * + * It should be impossible to reach (2) without triggering + * `AttnError::UnknownHeadBlock`, so we can safely assume the peer is + * faulty. + * + * The peer has published an invalid consensus message. + */ + } + AttnError::AttestationAlreadyKnown { .. } => { + /* + * The aggregate attestation has already been observed on the network or in + * a block. + * + * The peer is not necessarily faulty. + */ + trace!( + self.log, + "Attestation already known"; + "peer_id" => peer_id.to_string(), + "block" => format!("{}", beacon_block_root), + "type" => format!("{:?}", attestation_type), + ); + return; + } + AttnError::AggregatorAlreadyKnown(_) => { + /* + * There has already been an aggregate attestation seen from this + * aggregator index. + * + * The peer is not necessarily faulty. + */ + trace!( + self.log, + "Aggregator already known"; + "peer_id" => peer_id.to_string(), + "block" => format!("{}", beacon_block_root), + "type" => format!("{:?}", attestation_type), + ); + return; + } + AttnError::PriorAttestationKnown { .. } => { + /* + * We have already seen an attestation from this validator for this epoch. + * + * The peer is not necessarily faulty. + */ + trace!( + self.log, + "Prior attestation known"; + "peer_id" => peer_id.to_string(), + "block" => format!("{}", beacon_block_root), + "type" => format!("{:?}", attestation_type), + ); + return; + } + AttnError::ValidatorIndexTooHigh(_) => { + /* + * The aggregator index (or similar field) was higher than the maximum + * possible number of validators. + * + * The peer has published an invalid consensus message. + */ + } + AttnError::UnknownHeadBlock { beacon_block_root } => { + // Note: its a little bit unclear as to whether or not this block is unknown or + // just old. See: + // + // https://github.com/sigp/lighthouse/issues/1039 + + // TODO: Maintain this attestation and re-process once sync completes + debug!( + self.log, + "Attestation for unknown block"; + "peer_id" => peer_id.to_string(), + "block" => format!("{}", beacon_block_root) + ); + // we don't know the block, get the sync manager to handle the block lookup + self.sync_tx + .send(SyncMessage::UnknownBlockHash(peer_id, *beacon_block_root)) + .unwrap_or_else(|_| { + warn!( + self.log, + "Failed to send to sync service"; + "msg" => "UnknownBlockHash" + ) + }); + return; + } + AttnError::UnknownTargetRoot(_) => { + /* + * The block indicated by the target root is not known to us. + * + * We should always get `AttnError::UnknwonHeadBlock` before we get this + * error, so this means we can get this error if: + * + * 1. The target root does not represent a valid block. + * 2. We do not have the target root in our DB. + * + * For (2), we should only be processing attestations when we should have + * all the available information. Note: if we do a weak-subjectivity sync + * it's possible that this situation could occur, but I think it's + * unlikely. For now, we will declare this to be an invalid message> + * + * The peer has published an invalid consensus message. + */ + } + AttnError::BadTargetEpoch => { + /* + * The aggregator index (or similar field) was higher than the maximum + * possible number of validators. + * + * The peer has published an invalid consensus message. + */ + } + AttnError::NoCommitteeForSlotAndIndex { .. } => { + /* + * It is not possible to attest this the given committee in the given slot. + * + * The peer has published an invalid consensus message. + */ + } + AttnError::NotExactlyOneAggregationBitSet(_) => { + /* + * The unaggregated attestation doesn't have only one signature. + * + * The peer has published an invalid consensus message. + */ + } + AttnError::AttestsToFutureBlock { .. } => { + /* + * The beacon_block_root is from a higher slot than the attestation. + * + * The peer has published an invalid consensus message. + */ + } + + AttnError::InvalidSubnetId { received, expected } => { + /* + * The attestation was received on an incorrect subnet id. + */ + debug!( + self.log, + "Received attestation on incorrect subnet"; + "expected" => format!("{:?}", expected), + "received" => format!("{:?}", received), + ) + } + AttnError::Invalid(_) => { + /* + * The attestation failed the state_processing verification. + * + * The peer has published an invalid consensus message. + */ + } + AttnError::TooManySkippedSlots { + head_block_slot, + attestation_slot, + } => { + /* + * The attestation references a head block that is too far behind the attestation slot. + * + * The message is not necessarily invalid, but we choose to ignore it. + */ + debug!( + self.log, + "Rejected long skip slot attestation"; + "head_block_slot" => head_block_slot, + "attestation_slot" => attestation_slot, + ) + } + AttnError::BeaconChainError(e) => { + /* + * Lighthouse hit an unexpected error whilst processing the attestation. It + * should be impossible to trigger a `BeaconChainError` from the network, + * so we have a bug. + * + * It's not clear if the message is invalid/malicious. + */ + error!( + self.log, + "Unable to validate aggregate"; + "peer_id" => peer_id.to_string(), + "error" => format!("{:?}", e), + ); + } + } + + debug!( + self.log, + "Invalid attestation from network"; + "reason" => format!("{:?}", error), + "block" => format!("{}", beacon_block_root), + "peer_id" => peer_id.to_string(), + "type" => format!("{:?}", attestation_type), + ); + } +}