diff --git a/Cargo.lock b/Cargo.lock index 4615c74712..7722ecfb1a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3285,6 +3285,7 @@ dependencies = [ "lazy_static", "lighthouse_metrics", "matches", + "num_cpus", "parking_lot 0.11.0", "rand 0.7.3", "rest_types", diff --git a/beacon_node/network/Cargo.toml b/beacon_node/network/Cargo.toml index cd5aa3b609..da1739672a 100644 --- a/beacon_node/network/Cargo.toml +++ b/beacon_node/network/Cargo.toml @@ -39,3 +39,4 @@ lazy_static = "1.4.0" lighthouse_metrics = { path = "../../common/lighthouse_metrics" } environment = { path = "../../lighthouse/environment" } itertools = "0.9.0" +num_cpus = "1.13.0" diff --git a/beacon_node/network/src/metrics.rs b/beacon_node/network/src/metrics.rs index b2f789e36a..b0039763c3 100644 --- a/beacon_node/network/src/metrics.rs +++ b/beacon_node/network/src/metrics.rs @@ -1,3 +1,4 @@ +use beacon_chain::attestation_verification::Error as AttnError; pub use lighthouse_metrics::*; lazy_static! { @@ -44,4 +45,215 @@ lazy_static! { "network_subnet_subscriptions_aggregator_total", "Count of validator subscription requests where the subscriber is an aggregator." ); + + /* + * Gossip processor + */ + pub static ref GOSSIP_PROCESSOR_WORKERS_SPAWNED_TOTAL: Result = try_create_int_counter( + "gossip_processor_workers_spawned_total", + "The number of workers ever spawned by the gossip processing pool." + ); + pub static ref GOSSIP_PROCESSOR_WORKERS_ACTIVE_TOTAL: Result = try_create_int_gauge( + "gossip_processor_workers_active_total", + "Count of active workers in the gossip processing pool." + ); + pub static ref GOSSIP_PROCESSOR_WORK_EVENTS_TOTAL: Result = try_create_int_counter( + "gossip_processor_work_events_total", + "Count of work events processed by the gossip processor manager." + ); + pub static ref GOSSIP_PROCESSOR_WORK_EVENTS_IGNORED_TOTAL: Result = try_create_int_counter( + "gossip_processor_work_events_ignored_total", + "Count of work events processed by the gossip processor manager." 
+ ); + pub static ref GOSSIP_PROCESSOR_IDLE_EVENTS_TOTAL: Result = try_create_int_counter( + "gossip_processor_idle_events_total", + "Count of idle events processed by the gossip processor manager." + ); + pub static ref GOSSIP_PROCESSOR_EVENT_HANDLING_SECONDS: Result = try_create_histogram( + "gossip_processor_event_handling_seconds", + "Time spent handling a new message and allocating it to a queue or worker." + ); + pub static ref GOSSIP_PROCESSOR_WORKER_TIME: Result = try_create_histogram( + "gossip_processor_worker_time", + "Time taken for a worker to fully process some parcel of work." + ); + pub static ref GOSSIP_PROCESSOR_UNAGGREGATED_ATTESTATION_QUEUE_TOTAL: Result = try_create_int_gauge( + "gossip_processor_unaggregated_attestation_queue_total", + "Count of unagg. attestations waiting to be processed." + ); + pub static ref GOSSIP_PROCESSOR_UNAGGREGATED_ATTESTATION_WORKER_TIME: Result = try_create_histogram( + "gossip_processor_unaggregated_attestation_worker_time", + "Time taken for a worker to fully process an unaggregated attestation." + ); + pub static ref GOSSIP_PROCESSOR_UNAGGREGATED_ATTESTATION_VERIFIED_TOTAL: Result = try_create_int_counter( + "gossip_processor_unaggregated_attestation_verified_total", + "Total number of unaggregated attestations verified for gossip." + ); + pub static ref GOSSIP_PROCESSOR_UNAGGREGATED_ATTESTATION_IMPORTED_TOTAL: Result = try_create_int_counter( + "gossip_processor_unaggregated_attestation_imported_total", + "Total number of unaggregated attestations imported to fork choice, etc." + ); + pub static ref GOSSIP_PROCESSOR_AGGREGATED_ATTESTATION_QUEUE_TOTAL: Result = try_create_int_gauge( + "gossip_processor_aggregated_attestation_queue_total", + "Count of agg. attestations waiting to be processed." 
+ ); + pub static ref GOSSIP_PROCESSOR_AGGREGATED_ATTESTATION_WORKER_TIME: Result = try_create_histogram( + "gossip_processor_aggregated_attestation_worker_time", + "Time taken for a worker to fully process an aggregated attestation." + ); + pub static ref GOSSIP_PROCESSOR_AGGREGATED_ATTESTATION_VERIFIED_TOTAL: Result = try_create_int_counter( + "gossip_processor_aggregated_attestation_verified_total", + "Total number of aggregated attestations verified for gossip." + ); + pub static ref GOSSIP_PROCESSOR_AGGREGATED_ATTESTATION_IMPORTED_TOTAL: Result = try_create_int_counter( + "gossip_processor_aggregated_attestation_imported_total", + "Total number of aggregated attestations imported to fork choice, etc." + ); + + /* + * Attestation Errors + */ + pub static ref GOSSIP_ATTESTATION_ERROR_FUTURE_EPOCH: Result = try_create_int_counter( + "gossip_attestation_error_future_epoch", + "Count of a specific error type (see metric name)" + ); + pub static ref GOSSIP_ATTESTATION_ERROR_PAST_EPOCH: Result = try_create_int_counter( + "gossip_attestation_error_past_epoch", + "Count of a specific error type (see metric name)" + ); + pub static ref GOSSIP_ATTESTATION_ERROR_FUTURE_SLOT: Result = try_create_int_counter( + "gossip_attestation_error_future_slot", + "Count of a specific error type (see metric name)" + ); + pub static ref GOSSIP_ATTESTATION_ERROR_PAST_SLOT: Result = try_create_int_counter( + "gossip_attestation_error_past_slot", + "Count of a specific error type (see metric name)" + ); + pub static ref GOSSIP_ATTESTATION_ERROR_INVALID_SELECTION_PROOF: Result = try_create_int_counter( + "gossip_attestation_error_invalid_selection_proof", + "Count of a specific error type (see metric name)" + ); + pub static ref GOSSIP_ATTESTATION_ERROR_INVALID_SIGNATURE: Result = try_create_int_counter( + "gossip_attestation_error_invalid_signature", + "Count of a specific error type (see metric name)" + ); + pub static ref GOSSIP_ATTESTATION_ERROR_EMPTY_AGGREGATION_BITFIELD: Result = 
try_create_int_counter( + "gossip_attestation_error_empty_aggregation_bitfield", + "Count of a specific error type (see metric name)" + ); + pub static ref GOSSIP_ATTESTATION_ERROR_AGGREGATOR_PUBKEY_UNKNOWN: Result = try_create_int_counter( + "gossip_attestation_error_aggregator_pubkey_unknown", + "Count of a specific error type (see metric name)" + ); + pub static ref GOSSIP_ATTESTATION_ERROR_AGGREGATOR_NOT_IN_COMMITTEE: Result = try_create_int_counter( + "gossip_attestation_error_aggregator_not_in_committee", + "Count of a specific error type (see metric name)" + ); + pub static ref GOSSIP_ATTESTATION_ERROR_ATTESTATION_ALREADY_KNOWN: Result = try_create_int_counter( + "gossip_attestation_error_attestation_already_known", + "Count of a specific error type (see metric name)" + ); + pub static ref GOSSIP_ATTESTATION_ERROR_AGGREGATOR_ALREADY_KNOWN: Result = try_create_int_counter( + "gossip_attestation_error_aggregator_already_known", + "Count of a specific error type (see metric name)" + ); + pub static ref GOSSIP_ATTESTATION_ERROR_PRIOR_ATTESTATION_KNOWN: Result = try_create_int_counter( + "gossip_attestation_error_prior_attestation_known", + "Count of a specific error type (see metric name)" + ); + pub static ref GOSSIP_ATTESTATION_ERROR_VALIDATOR_INDEX_TOO_HIGH: Result = try_create_int_counter( + "gossip_attestation_error_validator_index_too_high", + "Count of a specific error type (see metric name)" + ); + pub static ref GOSSIP_ATTESTATION_ERROR_UNKNOWN_HEAD_BLOCK: Result = try_create_int_counter( + "gossip_attestation_error_unknown_head_block", + "Count of a specific error type (see metric name)" + ); + pub static ref GOSSIP_ATTESTATION_ERROR_UNKNOWN_TARGET_ROOT: Result = try_create_int_counter( + "gossip_attestation_error_unknown_target_root", + "Count of a specific error type (see metric name)" + ); + pub static ref GOSSIP_ATTESTATION_ERROR_BAD_TARGET_EPOCH: Result = try_create_int_counter( + "gossip_attestation_error_bad_target_epoch", + "Count of a specific 
error type (see metric name)" + ); + pub static ref GOSSIP_ATTESTATION_ERROR_NO_COMMITTEE_FOR_SLOT_AND_INDEX: Result = try_create_int_counter( + "gossip_attestation_error_no_committee_for_slot_and_index", + "Count of a specific error type (see metric name)" + ); + pub static ref GOSSIP_ATTESTATION_ERROR_NOT_EXACTLY_ONE_AGGREGATION_BIT_SET: Result = try_create_int_counter( + "gossip_attestation_error_not_exactly_one_aggregation_bit_set", + "Count of a specific error type (see metric name)" + ); + pub static ref GOSSIP_ATTESTATION_ERROR_ATTESTS_TO_FUTURE_BLOCK: Result = try_create_int_counter( + "gossip_attestation_error_attests_to_future_block", + "Count of a specific error type (see metric name)" + ); + pub static ref GOSSIP_ATTESTATION_ERROR_INVALID_SUBNET_ID: Result = try_create_int_counter( + "gossip_attestation_error_invalid_subnet_id", + "Count of a specific error type (see metric name)" + ); + pub static ref GOSSIP_ATTESTATION_ERROR_INVALID_STATE_PROCESSING: Result = try_create_int_counter( + "gossip_attestation_error_invalid_state_processing", + "Count of a specific error type (see metric name)" + ); + pub static ref GOSSIP_ATTESTATION_ERROR_BEACON_CHAIN_ERROR: Result = try_create_int_counter( + "gossip_attestation_error_beacon_chain_error", + "Count of a specific error type (see metric name)" + ); +} + +pub fn register_attestation_error(error: &AttnError) { + match error { + AttnError::FutureEpoch { .. } => inc_counter(&GOSSIP_ATTESTATION_ERROR_FUTURE_EPOCH), + AttnError::PastEpoch { .. } => inc_counter(&GOSSIP_ATTESTATION_ERROR_PAST_EPOCH), + AttnError::FutureSlot { .. } => inc_counter(&GOSSIP_ATTESTATION_ERROR_FUTURE_SLOT), + AttnError::PastSlot { .. } => inc_counter(&GOSSIP_ATTESTATION_ERROR_PAST_SLOT), + AttnError::InvalidSelectionProof { .. 
} => { + inc_counter(&GOSSIP_ATTESTATION_ERROR_INVALID_SELECTION_PROOF) + } + AttnError::InvalidSignature => inc_counter(&GOSSIP_ATTESTATION_ERROR_INVALID_SIGNATURE), + AttnError::EmptyAggregationBitfield => { + inc_counter(&GOSSIP_ATTESTATION_ERROR_EMPTY_AGGREGATION_BITFIELD) + } + AttnError::AggregatorPubkeyUnknown(_) => { + inc_counter(&GOSSIP_ATTESTATION_ERROR_AGGREGATOR_PUBKEY_UNKNOWN) + } + AttnError::AggregatorNotInCommittee { .. } => { + inc_counter(&GOSSIP_ATTESTATION_ERROR_AGGREGATOR_NOT_IN_COMMITTEE) + } + AttnError::AttestationAlreadyKnown { .. } => { + inc_counter(&GOSSIP_ATTESTATION_ERROR_ATTESTATION_ALREADY_KNOWN) + } + AttnError::AggregatorAlreadyKnown(_) => { + inc_counter(&GOSSIP_ATTESTATION_ERROR_AGGREGATOR_ALREADY_KNOWN) + } + AttnError::PriorAttestationKnown { .. } => { + inc_counter(&GOSSIP_ATTESTATION_ERROR_PRIOR_ATTESTATION_KNOWN) + } + AttnError::ValidatorIndexTooHigh(_) => { + inc_counter(&GOSSIP_ATTESTATION_ERROR_VALIDATOR_INDEX_TOO_HIGH) + } + AttnError::UnknownHeadBlock { .. } => { + inc_counter(&GOSSIP_ATTESTATION_ERROR_UNKNOWN_HEAD_BLOCK) + } + AttnError::UnknownTargetRoot(_) => { + inc_counter(&GOSSIP_ATTESTATION_ERROR_UNKNOWN_TARGET_ROOT) + } + AttnError::BadTargetEpoch => inc_counter(&GOSSIP_ATTESTATION_ERROR_BAD_TARGET_EPOCH), + AttnError::NoCommitteeForSlotAndIndex { .. } => { + inc_counter(&GOSSIP_ATTESTATION_ERROR_NO_COMMITTEE_FOR_SLOT_AND_INDEX) + } + AttnError::NotExactlyOneAggregationBitSet(_) => { + inc_counter(&GOSSIP_ATTESTATION_ERROR_NOT_EXACTLY_ONE_AGGREGATION_BIT_SET) + } + AttnError::AttestsToFutureBlock { .. } => { + inc_counter(&GOSSIP_ATTESTATION_ERROR_ATTESTS_TO_FUTURE_BLOCK) + } + AttnError::InvalidSubnetId { .. 
} => { + inc_counter(&GOSSIP_ATTESTATION_ERROR_INVALID_SUBNET_ID) + } + AttnError::Invalid(_) => inc_counter(&GOSSIP_ATTESTATION_ERROR_INVALID_STATE_PROCESSING), + AttnError::BeaconChainError(_) => inc_counter(&GOSSIP_ATTESTATION_ERROR_BEACON_CHAIN_ERROR), + } } diff --git a/beacon_node/network/src/router/gossip_processor.rs b/beacon_node/network/src/router/gossip_processor.rs new file mode 100644 index 0000000000..f9f229ba77 --- /dev/null +++ b/beacon_node/network/src/router/gossip_processor.rs @@ -0,0 +1,842 @@ +//! Provides the `GossipProcessor`, a multi-threaded processor for messages received on the network +//! that need to be processed by the `BeaconChain`. +//! +//! Uses `tokio` tasks (instead of raw threads) to provide the following tasks: +//! +//! - A "manager" task, which either spawns worker tasks or enqueues work. +//! - One or more "worker" tasks which perform time-intensive work on the `BeaconChain`. +//! +//! ## Purpose +//! +//! The purpose of the `GossipProcessor` is to provide two things: +//! +//! 1. Moving long-running, blocking tasks off the main `tokio` executor. +//! 2. A fixed-length buffer for consensus messages. +//! +//! (1) ensures that we don't clog up the networking stack with long-running tasks, potentially +//! causing timeouts. (2) means that we can easily and explicitly reject messages when we're +//! overloaded and also distribute load across time. +//! +//! ## Detail +//! +//! There is a single "manager" thread which listens to two event channels. These events are either: +//! +//! - A new parcel of work (work event). +//! - Indication that a worker has finished a parcel of work (worker idle). +//! +//! Then, there is a maximum of `n` "worker" blocking threads, where `n` is the CPU count. +//! +//! Whenever the manager receives a new parcel of work, it either: +//! +//! - Provides it to a newly-spawned worker task (if we are not already at `n` workers). +//! - Adds it to a queue. +//! +//!
Whenever the manager receives a notification that a worker has finished a parcel of work, it +//! checks the queues to see if there are more parcels of work that can be spawned in a new worker +//! task. + +use crate::{metrics, service::NetworkMessage, sync::SyncMessage}; +use beacon_chain::{ + attestation_verification::Error as AttnError, BeaconChain, BeaconChainError, BeaconChainTypes, + ForkChoiceError, +}; +use environment::TaskExecutor; +use eth2_libp2p::{MessageId, NetworkGlobals, PeerId}; +use slog::{crit, debug, error, trace, warn, Logger}; +use std::collections::VecDeque; +use std::sync::Arc; +use std::time::{Duration, Instant}; +use tokio::sync::mpsc; +use types::{Attestation, EthSpec, Hash256, SignedAggregateAndProof, SubnetId}; + +/// The maximum size of the channel for work events to the `GossipProcessor`. +/// +/// Setting this too low will cause consensus messages to be dropped. +const MAX_WORK_EVENT_QUEUE_LEN: usize = 16_384; + +/// The maximum size of the channel for idle events to the `GossipProcessor`. +/// +/// Setting this too low will prevent new workers from being spawned. It *should* only need to be +/// set to the CPU count, but we set it high to be safe. +const MAX_IDLE_QUEUE_LEN: usize = 16_384; + +/// The maximum number of queued `Attestation` objects that will be stored before we start dropping +/// them. +const MAX_UNAGGREGATED_ATTESTATION_QUEUE_LEN: usize = 16_384; + +/// The maximum number of queued `SignedAggregateAndProof` objects that will be stored before we +/// start dropping them. +const MAX_AGGREGATED_ATTESTATION_QUEUE_LEN: usize = 1_024; + +/// The name of the manager tokio task. +const MANAGER_TASK_NAME: &str = "beacon_gossip_processor_manager"; +/// The name of the worker tokio tasks. +const WORKER_TASK_NAME: &str = "beacon_gossip_processor_worker"; + +/// The minimum interval between log messages indicating that a queue is full. 
+const LOG_DEBOUNCE_INTERVAL: Duration = Duration::from_secs(30); + +/// A queued item from gossip, awaiting processing. +struct QueueItem { + message_id: MessageId, + peer_id: PeerId, + item: T, +} + +/// A simple last-in-first-out queue with a maximum length. +struct LifoQueue { + queue: VecDeque>, + max_length: usize, +} + +impl LifoQueue { + /// Create a new, empty queue with the given length. + pub fn new(max_length: usize) -> Self { + Self { + queue: VecDeque::default(), + max_length, + } + } + + /// Add a new item to the queue. + pub fn push(&mut self, item: QueueItem) { + if self.queue.len() == self.max_length { + self.queue.pop_back(); + } + self.queue.push_front(item); + } + + /// Remove the next item from the queue. + pub fn pop(&mut self) -> Option> { + self.queue.pop_front() + } + + /// Returns `true` if the queue is full. + pub fn is_full(&self) -> bool { + self.queue.len() >= self.max_length + } + + /// Returns the current length of the queue. + pub fn len(&self) -> usize { + self.queue.len() + } +} + +/// An event to be processed by the manager task. +#[derive(Debug, PartialEq)] +pub struct WorkEvent { + message_id: MessageId, + peer_id: PeerId, + work: Work, +} + +impl WorkEvent { + /// Create a new `Work` event for some unaggregated attestation. + pub fn unaggregated_attestation( + message_id: MessageId, + peer_id: PeerId, + attestation: Attestation, + subnet_id: SubnetId, + should_import: bool, + ) -> Self { + Self { + message_id, + peer_id, + work: Work::Attestation(Box::new((attestation, subnet_id, should_import))), + } + } + + /// Create a new `Work` event for some aggregated attestation. + pub fn aggregated_attestation( + message_id: MessageId, + peer_id: PeerId, + aggregate: SignedAggregateAndProof, + ) -> Self { + Self { + message_id, + peer_id, + work: Work::Aggregate(Box::new(aggregate)), + } + } +} + +/// A consensus message from gossip which requires processing. 
+#[derive(Debug, PartialEq)] +pub enum Work { + Attestation(Box<(Attestation, SubnetId, bool)>), + Aggregate(Box>), +} + +/// Provides de-bounce functionality for logging. +#[derive(Default)] +struct TimeLatch(Option); + +impl TimeLatch { + /// Only returns true once every `LOG_DEBOUNCE_INTERVAL`. + fn elapsed(&mut self) -> bool { + let now = Instant::now(); + + let is_elapsed = self.0.map_or(false, |elapse_time| now > elapse_time); + + if is_elapsed || self.0.is_none() { + self.0 = Some(now + LOG_DEBOUNCE_INTERVAL); + } + + is_elapsed + } +} + +/// A multi-threaded processor for messages received on the network +/// that need to be processed by the `BeaconChain`. +/// +/// See module level documentation for more information. +pub struct GossipProcessor { + pub beacon_chain: Arc>, + pub network_tx: mpsc::UnboundedSender>, + pub sync_tx: mpsc::UnboundedSender>, + pub network_globals: Arc>, + pub executor: TaskExecutor, + pub max_workers: usize, + pub current_workers: usize, + pub log: Logger, +} + +impl GossipProcessor { + /// Spawns the "manager" task which checks the receiver end of the returned `Sender` for + /// messages which contain some new work which will be: + /// + /// - Performed immediately, if a worker is available. + /// - Queued for later processing, if no worker is currently available. + /// + /// Only `self.max_workers` will ever be spawned at one time. Each worker is a `tokio` task + /// started with `spawn_blocking`.
+ pub fn spawn_manager(mut self) -> mpsc::Sender> { + let (event_tx, mut event_rx) = + mpsc::channel::>(MAX_WORK_EVENT_QUEUE_LEN); + let (idle_tx, mut idle_rx) = mpsc::channel::<()>(MAX_IDLE_QUEUE_LEN); + + let mut aggregate_queue = LifoQueue::new(MAX_AGGREGATED_ATTESTATION_QUEUE_LEN); + let mut aggregate_debounce = TimeLatch::default(); + + let mut attestation_queue = LifoQueue::new(MAX_UNAGGREGATED_ATTESTATION_QUEUE_LEN); + let mut attestation_debounce = TimeLatch::default(); + + let executor = self.executor.clone(); + + // The manager future will run on the non-blocking executor and delegate tasks to worker + // threads on the blocking executor. + let manager_future = async move { + loop { + // Listen to both the event and idle channels, acting on whichever is ready + // first. + // + // Set `work_event = Some(event)` if there is new work to be done. Otherwise sets + // `event = None` if it was a worker becoming idle. + let work_event = tokio::select! { + // A worker has finished some work. + new_idle_opt = idle_rx.recv() => { + if new_idle_opt.is_some() { + metrics::inc_counter(&metrics::GOSSIP_PROCESSOR_IDLE_EVENTS_TOTAL); + self.current_workers = self.current_workers.saturating_sub(1); + None + } else { + // Exit if all idle senders have been dropped. + // + // This shouldn't happen since this function holds a sender. + crit!( + self.log, + "Gossip processor stopped"; + "msg" => "all idle senders dropped" + ); + break + } + }, + // There is a new piece of work to be handled. + new_work_event_opt = event_rx.recv() => { + if let Some(new_work_event) = new_work_event_opt { + metrics::inc_counter(&metrics::GOSSIP_PROCESSOR_WORK_EVENTS_TOTAL); + Some(new_work_event) + } else { + // Exit if all event senders have been dropped. + // + // This should happen when the client shuts down. 
+ debug!( + self.log, + "Gossip processor stopped"; + "msg" => "all event senders dropped" + ); + break + } + } + }; + + let _event_timer = + metrics::start_timer(&metrics::GOSSIP_PROCESSOR_EVENT_HANDLING_SECONDS); + + let can_spawn = self.current_workers < self.max_workers; + + match work_event { + // There is no new work event, but we are able to spawn a new worker. + None if can_spawn => { + // Check the aggregates, *then* the unaggregates since we assume that + // aggregates are more valuable to local validators and effectively + // give us more information with less signature verification time. + if let Some(item) = aggregate_queue.pop() { + self.spawn_worker( + idle_tx.clone(), + item.message_id, + item.peer_id, + Work::Aggregate(item.item), + ); + } else if let Some(item) = attestation_queue.pop() { + self.spawn_worker( + idle_tx.clone(), + item.message_id, + item.peer_id, + Work::Attestation(item.item), + ); + } + } + // There is no new work event and we are unable to spawn a new worker. + // + // I cannot see any good reason why this would happen. + None => { + warn!( + self.log, + "Unexpected gossip processor condition"; + "msg" => "no new work and cannot spawn worker" + ); + } + // There is a new work event, but the chain is syncing. Ignore it. + Some(WorkEvent { .. }) + if self.network_globals.sync_state.read().is_syncing() => + { + metrics::inc_counter(&metrics::GOSSIP_PROCESSOR_WORK_EVENTS_IGNORED_TOTAL); + trace!( + self.log, + "Gossip processor skipping work"; + "msg" => "chain is syncing" + ); + } + // There is a new work event and the chain is not syncing. Process it. 
+ Some(WorkEvent { + message_id, + peer_id, + work, + }) => match work { + Work::Attestation(_) if can_spawn => { + self.spawn_worker(idle_tx.clone(), message_id, peer_id, work) + } + Work::Attestation(attestation) => attestation_queue.push(QueueItem { + message_id, + peer_id, + item: attestation, + }), + Work::Aggregate(_) if can_spawn => { + self.spawn_worker(idle_tx.clone(), message_id, peer_id, work) + } + Work::Aggregate(aggregate) => aggregate_queue.push(QueueItem { + message_id, + peer_id, + item: aggregate, + }), + }, + } + + metrics::set_gauge( + &metrics::GOSSIP_PROCESSOR_WORKERS_ACTIVE_TOTAL, + self.current_workers as i64, + ); + metrics::set_gauge( + &metrics::GOSSIP_PROCESSOR_UNAGGREGATED_ATTESTATION_QUEUE_TOTAL, + attestation_queue.len() as i64, + ); + metrics::set_gauge( + &metrics::GOSSIP_PROCESSOR_AGGREGATED_ATTESTATION_QUEUE_TOTAL, + aggregate_queue.len() as i64, + ); + + if aggregate_queue.is_full() && aggregate_debounce.elapsed() { + error!( + self.log, + "Aggregate attestation queue full"; + "msg" => "the system has insufficient resources for load", + "queue_len" => aggregate_queue.max_length, + ) + } + + if attestation_queue.is_full() && attestation_debounce.elapsed() { + error!( + self.log, + "Attestation queue full"; + "msg" => "the system has insufficient resources for load", + "queue_len" => attestation_queue.max_length, + ) + } + } + }; + + // Spawn on the non-blocking executor. + executor.spawn(manager_future, MANAGER_TASK_NAME); + + event_tx + } + + /// Spawns a blocking worker thread to process some `Work`. + /// + /// Sends a message on `idle_tx` when the work is complete and the task is stopping.
+ fn spawn_worker( + &mut self, + mut idle_tx: mpsc::Sender<()>, + message_id: MessageId, + peer_id: PeerId, + work: Work, + ) { + let worker_timer = metrics::start_timer(&metrics::GOSSIP_PROCESSOR_WORKER_TIME); + metrics::inc_counter(&metrics::GOSSIP_PROCESSOR_WORKERS_SPAWNED_TOTAL); + + self.current_workers = self.current_workers.saturating_add(1); + let chain = self.beacon_chain.clone(); + let network_tx = self.network_tx.clone(); + let sync_tx = self.sync_tx.clone(); + let log = self.log.clone(); + let executor = self.executor.clone(); + + executor.spawn_blocking( + move || { + let _worker_timer = worker_timer; + + // We use this closure pattern to avoid using a `return` that prevents the + // `idle_tx` message from sending. + let handler = || { + match work { + /* + * Unaggregated attestation verification. + */ + Work::Attestation(boxed_tuple) => { + let (attestation, subnet_id, should_import) = *boxed_tuple; + + let _attestation_timer = metrics::start_timer( + &metrics::GOSSIP_PROCESSOR_UNAGGREGATED_ATTESTATION_WORKER_TIME, + ); + metrics::inc_counter( + &metrics::GOSSIP_PROCESSOR_UNAGGREGATED_ATTESTATION_VERIFIED_TOTAL, + ); + + let beacon_block_root = attestation.data.beacon_block_root; + + let attestation = match chain + .verify_unaggregated_attestation_for_gossip(attestation, subnet_id) + { + Ok(attestation) => attestation, + Err(e) => { + handle_attestation_verification_failure( + &log, + sync_tx, + peer_id.clone(), + beacon_block_root, + "unaggregated", + e, + ); + return; + } + }; + + // Indicate to the `Network` service that this message is valid and can be + // propagated on the gossip network. 
+ propagate_gossip_message(network_tx, message_id, peer_id.clone(), &log); + + if !should_import { + return; + } + + metrics::inc_counter( + &metrics::GOSSIP_PROCESSOR_UNAGGREGATED_ATTESTATION_IMPORTED_TOTAL, + ); + + if let Err(e) = chain.apply_attestation_to_fork_choice(&attestation) { + match e { + BeaconChainError::ForkChoiceError( + ForkChoiceError::InvalidAttestation(e), + ) => debug!( + log, + "Attestation invalid for fork choice"; + "reason" => format!("{:?}", e), + "peer" => peer_id.to_string(), + "beacon_block_root" => format!("{:?}", beacon_block_root) + ), + e => error!( + log, + "Error applying attestation to fork choice"; + "reason" => format!("{:?}", e), + "peer" => peer_id.to_string(), + "beacon_block_root" => format!("{:?}", beacon_block_root) + ), + } + } + + if let Err(e) = chain.add_to_naive_aggregation_pool(attestation) { + debug!( + log, + "Attestation invalid for agg pool"; + "reason" => format!("{:?}", e), + "peer" => peer_id.to_string(), + "beacon_block_root" => format!("{:?}", beacon_block_root) + ) + } + } + /* + * Aggregated attestation verification. + */ + Work::Aggregate(boxed_aggregate) => { + let _attestation_timer = metrics::start_timer( + &metrics::GOSSIP_PROCESSOR_AGGREGATED_ATTESTATION_WORKER_TIME, + ); + metrics::inc_counter( + &metrics::GOSSIP_PROCESSOR_AGGREGATED_ATTESTATION_VERIFIED_TOTAL, + ); + + let beacon_block_root = + boxed_aggregate.message.aggregate.data.beacon_block_root; + + let aggregate = match chain + .verify_aggregated_attestation_for_gossip(*boxed_aggregate) + { + Ok(aggregate) => aggregate, + Err(e) => { + handle_attestation_verification_failure( + &log, + sync_tx, + peer_id.clone(), + beacon_block_root, + "aggregated", + e, + ); + return; + } + }; + + // Indicate to the `Network` service that this message is valid and can be + // propagated on the gossip network. 
+ propagate_gossip_message(network_tx, message_id, peer_id.clone(), &log); + + metrics::inc_counter( + &metrics::GOSSIP_PROCESSOR_AGGREGATED_ATTESTATION_IMPORTED_TOTAL, + ); + + if let Err(e) = chain.apply_attestation_to_fork_choice(&aggregate) { + match e { + BeaconChainError::ForkChoiceError( + ForkChoiceError::InvalidAttestation(e), + ) => debug!( + log, + "Aggregate invalid for fork choice"; + "reason" => format!("{:?}", e), + "peer" => peer_id.to_string(), + "beacon_block_root" => format!("{:?}", beacon_block_root) + ), + e => error!( + log, + "Error applying aggregate to fork choice"; + "reason" => format!("{:?}", e), + "peer" => peer_id.to_string(), + "beacon_block_root" => format!("{:?}", beacon_block_root) + ), + } + } + + if let Err(e) = chain.add_to_block_inclusion_pool(aggregate) { + debug!( + log, + "Attestation invalid for op pool"; + "reason" => format!("{:?}", e), + "peer" => peer_id.to_string(), + "beacon_block_root" => format!("{:?}", beacon_block_root) + ) + } + } + }; + }; + handler(); + + idle_tx.try_send(()).unwrap_or_else(|e| { + crit!( + log, + "Unable to free worker"; + "msg" => "failed to send idle_tx message", + "error" => e.to_string() + ) + }); + }, + WORKER_TASK_NAME, + ); + } +} + +/// Send a message on `network_tx` that the `message_id` sent by `peer_id` should be propagated on +/// the gossip network. +/// +/// Creates a log if there is an internal error. +fn propagate_gossip_message( + network_tx: mpsc::UnboundedSender>, + message_id: MessageId, + peer_id: PeerId, + log: &Logger, +) { + network_tx + .send(NetworkMessage::Validate { + propagation_source: peer_id, + message_id, + }) + .unwrap_or_else(|_| { + warn!( + log, + "Could not send propagation request to the network service" + ) + }); +} + +/// Handle an error whilst verifying an `Attestation` or `SignedAggregateAndProof` from the +/// network.
+pub fn handle_attestation_verification_failure( + log: &Logger, + sync_tx: mpsc::UnboundedSender>, + peer_id: PeerId, + beacon_block_root: Hash256, + attestation_type: &str, + error: AttnError, +) { + metrics::register_attestation_error(&error); + match &error { + AttnError::FutureEpoch { .. } + | AttnError::PastEpoch { .. } + | AttnError::FutureSlot { .. } + | AttnError::PastSlot { .. } => { + /* + * These errors can be triggered by a mismatch between our slot and the peer. + * + * + * The peer has published an invalid consensus message, _only_ if we trust our own clock. + */ + } + AttnError::InvalidSelectionProof { .. } | AttnError::InvalidSignature => { + /* + * These errors are caused by invalid signatures. + * + * The peer has published an invalid consensus message. + */ + } + AttnError::EmptyAggregationBitfield => { + /* + * The aggregate had no signatures and is therefore worthless. + * + * Whilst we don't gossip this attestation, this act is **not** a clear + * violation of the spec nor indication of fault. + * + * This may change soon. Reference: + * + * https://github.com/ethereum/eth2.0-specs/pull/1732 + */ + } + AttnError::AggregatorPubkeyUnknown(_) => { + /* + * The aggregator index was higher than any known validator index. This is + * possible in two cases: + * + * 1. The attestation is malformed + * 2. The attestation attests to a beacon_block_root that we do not know. + * + * It should be impossible to reach (2) without triggering + * `AttnError::UnknownHeadBlock`, so we can safely assume the peer is + * faulty. + * + * The peer has published an invalid consensus message. + */ + } + AttnError::AggregatorNotInCommittee { .. } => { + /* + * The aggregator index was higher than any known validator index. This is + * possible in two cases: + * + * 1. The attestation is malformed + * 2. The attestation attests to a beacon_block_root that we do not know. 
+ * + * It should be impossible to reach (2) without triggering + * `AttnError::UnknownHeadBlock`, so we can safely assume the peer is + * faulty. + * + * The peer has published an invalid consensus message. + */ + } + AttnError::AttestationAlreadyKnown { .. } => { + /* + * The aggregate attestation has already been observed on the network or in + * a block. + * + * The peer is not necessarily faulty. + */ + trace!( + log, + "Attestation already known"; + "peer_id" => peer_id.to_string(), + "block" => format!("{}", beacon_block_root), + "type" => format!("{:?}", attestation_type), + ); + return; + } + AttnError::AggregatorAlreadyKnown(_) => { + /* + * There has already been an aggregate attestation seen from this + * aggregator index. + * + * The peer is not necessarily faulty. + */ + trace!( + log, + "Aggregator already known"; + "peer_id" => peer_id.to_string(), + "block" => format!("{}", beacon_block_root), + "type" => format!("{:?}", attestation_type), + ); + return; + } + AttnError::PriorAttestationKnown { .. } => { + /* + * We have already seen an attestation from this validator for this epoch. + * + * The peer is not necessarily faulty. + */ + trace!( + log, + "Prior attestation known"; + "peer_id" => peer_id.to_string(), + "block" => format!("{}", beacon_block_root), + "type" => format!("{:?}", attestation_type), + ); + return; + } + AttnError::ValidatorIndexTooHigh(_) => { + /* + * The aggregator index (or similar field) was higher than the maximum + * possible number of validators. + * + * The peer has published an invalid consensus message. + */ + } + AttnError::UnknownHeadBlock { beacon_block_root } => { + // Note: its a little bit unclear as to whether or not this block is unknown or + // just old. 
See: + // + // https://github.com/sigp/lighthouse/issues/1039 + + // TODO: Maintain this attestation and re-process once sync completes + debug!( + log, + "Attestation for unknown block"; + "peer_id" => peer_id.to_string(), + "block" => format!("{}", beacon_block_root) + ); + // we don't know the block, get the sync manager to handle the block lookup + sync_tx + .send(SyncMessage::UnknownBlockHash(peer_id, *beacon_block_root)) + .unwrap_or_else(|_| { + warn!( + log, + "Failed to send to sync service"; + "msg" => "UnknownBlockHash" + ) + }); + return; + } + AttnError::UnknownTargetRoot(_) => { + /* + * The block indicated by the target root is not known to us. + * + * We should always get `AttnError::UnknwonHeadBlock` before we get this + * error, so this means we can get this error if: + * + * 1. The target root does not represent a valid block. + * 2. We do not have the target root in our DB. + * + * For (2), we should only be processing attestations when we should have + * all the available information. Note: if we do a weak-subjectivity sync + * it's possible that this situation could occur, but I think it's + * unlikely. For now, we will declare this to be an invalid message> + * + * The peer has published an invalid consensus message. + */ + } + AttnError::BadTargetEpoch => { + /* + * The aggregator index (or similar field) was higher than the maximum + * possible number of validators. + * + * The peer has published an invalid consensus message. + */ + } + AttnError::NoCommitteeForSlotAndIndex { .. } => { + /* + * It is not possible to attest this the given committee in the given slot. + * + * The peer has published an invalid consensus message. + */ + } + AttnError::NotExactlyOneAggregationBitSet(_) => { + /* + * The unaggregated attestation doesn't have only one signature. + * + * The peer has published an invalid consensus message. + */ + } + AttnError::AttestsToFutureBlock { .. 
} => { + /* + * The beacon_block_root is from a higher slot than the attestation. + * + * The peer has published an invalid consensus message. + */ + } + + AttnError::InvalidSubnetId { received, expected } => { + /* + * The attestation was received on an incorrect subnet id. + */ + debug!( + log, + "Received attestation on incorrect subnet"; + "expected" => format!("{:?}", expected), + "received" => format!("{:?}", received), + ) + } + AttnError::Invalid(_) => { + /* + * The attestation failed the state_processing verification. + * + * The peer has published an invalid consensus message. + */ + } + AttnError::BeaconChainError(e) => { + /* + * Lighthouse hit an unexpected error whilst processing the attestation. It + * should be impossible to trigger a `BeaconChainError` from the network, + * so we have a bug. + * + * It's not clear if the message is invalid/malicious. + */ + error!( + log, + "Unable to validate aggregate"; + "peer_id" => peer_id.to_string(), + "error" => format!("{:?}", e), + ); + } + } + + debug!( + log, + "Invalid attestation from network"; + "reason" => format!("{:?}", error), + "block" => format!("{}", beacon_block_root), + "peer_id" => peer_id.to_string(), + "type" => format!("{:?}", attestation_type), + ); +} diff --git a/beacon_node/network/src/router/mod.rs b/beacon_node/network/src/router/mod.rs index e487330f8d..bcb02fbbee 100644 --- a/beacon_node/network/src/router/mod.rs +++ b/beacon_node/network/src/router/mod.rs @@ -5,6 +5,7 @@ //! syncing-related responses to the Sync manager. #![allow(clippy::unit_arg)] +pub mod gossip_processor; pub mod processor; use crate::error; @@ -215,29 +216,17 @@ impl Router { match gossip_message { // Attestations should never reach the router. 
PubsubMessage::AggregateAndProofAttestation(aggregate_and_proof) => { - if let Some(gossip_verified) = self - .processor - .verify_aggregated_attestation_for_gossip(peer_id.clone(), *aggregate_and_proof) - { - self.propagate_message(id, peer_id.clone()); - self.processor - .import_aggregated_attestation(peer_id, gossip_verified); - } + self.processor + .on_aggregated_attestation_gossip(id, peer_id, *aggregate_and_proof); } PubsubMessage::Attestation(subnet_attestation) => { - if let Some(gossip_verified) = - self.processor.verify_unaggregated_attestation_for_gossip( - peer_id.clone(), - subnet_attestation.1.clone(), - subnet_attestation.0, - ) - { - self.propagate_message(id, peer_id.clone()); - if should_process { - self.processor - .import_unaggregated_attestation(peer_id, gossip_verified); - } - } + self.processor.on_unaggregated_attestation_gossip( + id, + peer_id, + subnet_attestation.1.clone(), + subnet_attestation.0, + should_process, + ); } PubsubMessage::BeaconBlock(block) => { match self.processor.should_forward_block(block) { diff --git a/beacon_node/network/src/router/processor.rs b/beacon_node/network/src/router/processor.rs index 9d11e78544..52f97fff81 100644 --- a/beacon_node/network/src/router/processor.rs +++ b/beacon_node/network/src/router/processor.rs @@ -1,20 +1,19 @@ +use super::gossip_processor::{GossipProcessor, WorkEvent as GossipWorkEvent}; use crate::service::NetworkMessage; use crate::sync::{PeerSyncInfo, SyncMessage}; use beacon_chain::{ - attestation_verification::{ - Error as AttnError, SignatureVerifiedAttestation, VerifiedAggregatedAttestation, - VerifiedUnaggregatedAttestation, - }, - observed_operations::ObservationOutcome, - BeaconChain, BeaconChainError, BeaconChainTypes, BlockError, ForkChoiceError, + observed_operations::ObservationOutcome, BeaconChain, BeaconChainTypes, BlockError, GossipVerifiedBlock, }; use eth2_libp2p::rpc::*; -use eth2_libp2p::{NetworkGlobals, PeerAction, PeerId, PeerRequestId, Request, Response}; +use 
eth2_libp2p::{ + MessageId, NetworkGlobals, PeerAction, PeerId, PeerRequestId, Request, Response, +}; use itertools::process_results; use slog::{debug, error, o, trace, warn}; use ssz::Encode; use state_processing::SigVerifiedOp; +use std::cmp; use std::sync::Arc; use tokio::sync::mpsc; use types::{ @@ -37,6 +36,8 @@ pub struct Processor { sync_send: mpsc::UnboundedSender>, /// A network context to return and handle RPC requests. network: HandlerNetworkContext, + /// A multi-threaded, non-blocking processor for consensus gossip messages. + gossip_processor_send: mpsc::Sender>, /// The `RPCHandler` logger. log: slog::Logger, } @@ -54,17 +55,30 @@ impl Processor { // spawn the sync thread let sync_send = crate::sync::manager::spawn( - executor, + executor.clone(), beacon_chain.clone(), - network_globals, + network_globals.clone(), network_send.clone(), sync_logger, ); + let gossip_processor_send = GossipProcessor { + beacon_chain: beacon_chain.clone(), + network_tx: network_send.clone(), + sync_tx: sync_send.clone(), + network_globals, + executor, + max_workers: cmp::max(1, num_cpus::get()), + current_workers: 0, + log: log.clone(), + } + .spawn_manager(); + Processor { chain: beacon_chain, sync_send, network: HandlerNetworkContext::new(network_send, log.clone()), + gossip_processor_send, log: log.clone(), } } @@ -584,347 +598,50 @@ impl Processor { true } - /// Handle an error whilst verifying an `Attestation` or `SignedAggregateAndProof` from the - /// network. - pub fn handle_attestation_verification_failure( - &mut self, - peer_id: PeerId, - beacon_block_root: Hash256, - attestation_type: &str, - error: AttnError, - ) { - debug!( - self.log, - "Invalid attestation from network"; - "reason" => format!("{:?}", error), - "block" => format!("{}", beacon_block_root), - "peer_id" => peer_id.to_string(), - "type" => format!("{:?}", attestation_type), - ); - - match error { - AttnError::FutureEpoch { .. } - | AttnError::PastEpoch { .. } - | AttnError::FutureSlot { .. 
} - | AttnError::PastSlot { .. } => { - /* - * These errors can be triggered by a mismatch between our slot and the peer. - * - * - * The peer has published an invalid consensus message, _only_ if we trust our own clock. - */ - } - AttnError::InvalidSelectionProof { .. } | AttnError::InvalidSignature => { - /* - * These errors are caused by invalid signatures. - * - * The peer has published an invalid consensus message. - */ - } - AttnError::EmptyAggregationBitfield => { - /* - * The aggregate had no signatures and is therefore worthless. - * - * Whilst we don't gossip this attestation, this act is **not** a clear - * violation of the spec nor indication of fault. - * - * This may change soon. Reference: - * - * https://github.com/ethereum/eth2.0-specs/pull/1732 - */ - } - AttnError::AggregatorPubkeyUnknown(_) => { - /* - * The aggregator index was higher than any known validator index. This is - * possible in two cases: - * - * 1. The attestation is malformed - * 2. The attestation attests to a beacon_block_root that we do not know. - * - * It should be impossible to reach (2) without triggering - * `AttnError::UnknownHeadBlock`, so we can safely assume the peer is - * faulty. - * - * The peer has published an invalid consensus message. - */ - } - AttnError::AggregatorNotInCommittee { .. } => { - /* - * The aggregator index was higher than any known validator index. This is - * possible in two cases: - * - * 1. The attestation is malformed - * 2. The attestation attests to a beacon_block_root that we do not know. - * - * It should be impossible to reach (2) without triggering - * `AttnError::UnknownHeadBlock`, so we can safely assume the peer is - * faulty. - * - * The peer has published an invalid consensus message. - */ - } - AttnError::AttestationAlreadyKnown { .. } => { - /* - * The aggregate attestation has already been observed on the network or in - * a block. - * - * The peer is not necessarily faulty. 
- */ - } - AttnError::AggregatorAlreadyKnown(_) => { - /* - * There has already been an aggregate attestation seen from this - * aggregator index. - * - * The peer is not necessarily faulty. - */ - } - AttnError::PriorAttestationKnown { .. } => { - /* - * We have already seen an attestation from this validator for this epoch. - * - * The peer is not necessarily faulty. - */ - } - AttnError::ValidatorIndexTooHigh(_) => { - /* - * The aggregator index (or similar field) was higher than the maximum - * possible number of validators. - * - * The peer has published an invalid consensus message. - */ - } - AttnError::UnknownHeadBlock { beacon_block_root } => { - // Note: its a little bit unclear as to whether or not this block is unknown or - // just old. See: - // - // https://github.com/sigp/lighthouse/issues/1039 - - // TODO: Maintain this attestation and re-process once sync completes - debug!( - self.log, - "Attestation for unknown block"; - "peer_id" => peer_id.to_string(), - "block" => format!("{}", beacon_block_root) - ); - // we don't know the block, get the sync manager to handle the block lookup - self.send_to_sync(SyncMessage::UnknownBlockHash(peer_id, beacon_block_root)); - } - AttnError::UnknownTargetRoot(_) => { - /* - * The block indicated by the target root is not known to us. - * - * We should always get `AttnError::UnknwonHeadBlock` before we get this - * error, so this means we can get this error if: - * - * 1. The target root does not represent a valid block. - * 2. We do not have the target root in our DB. - * - * For (2), we should only be processing attestations when we should have - * all the available information. Note: if we do a weak-subjectivity sync - * it's possible that this situation could occur, but I think it's - * unlikely. For now, we will declare this to be an invalid message> - * - * The peer has published an invalid consensus message. 
- */ - } - AttnError::BadTargetEpoch => { - /* - * The aggregator index (or similar field) was higher than the maximum - * possible number of validators. - * - * The peer has published an invalid consensus message. - */ - } - AttnError::NoCommitteeForSlotAndIndex { .. } => { - /* - * It is not possible to attest this the given committee in the given slot. - * - * The peer has published an invalid consensus message. - */ - } - AttnError::NotExactlyOneAggregationBitSet(_) => { - /* - * The unaggregated attestation doesn't have only one signature. - * - * The peer has published an invalid consensus message. - */ - } - AttnError::AttestsToFutureBlock { .. } => { - /* - * The beacon_block_root is from a higher slot than the attestation. - * - * The peer has published an invalid consensus message. - */ - } - - AttnError::InvalidSubnetId { received, expected } => { - /* - * The attestation was received on an incorrect subnet id. - */ - debug!( - self.log, - "Received attestation on incorrect subnet"; - "expected" => format!("{:?}", expected), - "received" => format!("{:?}", received), - ) - } - AttnError::Invalid(_) => { - /* - * The attestation failed the state_processing verification. - * - * The peer has published an invalid consensus message. - */ - } - AttnError::BeaconChainError(e) => { - /* - * Lighthouse hit an unexpected error whilst processing the attestation. It - * should be impossible to trigger a `BeaconChainError` from the network, - * so we have a bug. - * - * It's not clear if the message is invalid/malicious. - */ - error!( - self.log, - "Unable to validate aggregate"; - "peer_id" => peer_id.to_string(), - "error" => format!("{:?}", e), - ); - } - } - } - - pub fn verify_aggregated_attestation_for_gossip( - &mut self, - peer_id: PeerId, - aggregate_and_proof: SignedAggregateAndProof, - ) -> Option> { - // This is provided to the error handling function to assist with debugging. 
- let beacon_block_root = aggregate_and_proof.message.aggregate.data.beacon_block_root; - - self.chain - .verify_aggregated_attestation_for_gossip(aggregate_and_proof) - .map_err(|e| { - self.handle_attestation_verification_failure( - peer_id, - beacon_block_root, - "aggregated", - e, - ) - }) - .ok() - } - - pub fn import_aggregated_attestation( - &mut self, - peer_id: PeerId, - verified_attestation: VerifiedAggregatedAttestation, - ) { - // This is provided to the error handling function to assist with debugging. - let beacon_block_root = verified_attestation.attestation().data.beacon_block_root; - - self.apply_attestation_to_fork_choice( - peer_id.clone(), - beacon_block_root, - &verified_attestation, - ); - - if let Err(e) = self.chain.add_to_block_inclusion_pool(verified_attestation) { - debug!( - self.log, - "Attestation invalid for op pool"; - "reason" => format!("{:?}", e), - "peer" => peer_id.to_string(), - "beacon_block_root" => format!("{:?}", beacon_block_root) - ) - } - } - - pub fn verify_unaggregated_attestation_for_gossip( + pub fn on_unaggregated_attestation_gossip( &mut self, + message_id: MessageId, peer_id: PeerId, unaggregated_attestation: Attestation, subnet_id: SubnetId, - ) -> Option> { - // This is provided to the error handling function to assist with debugging. 
- let beacon_block_root = unaggregated_attestation.data.beacon_block_root; - - self.chain - .verify_unaggregated_attestation_for_gossip(unaggregated_attestation, subnet_id) - .map_err(|e| { - self.handle_attestation_verification_failure( - peer_id, - beacon_block_root, - "unaggregated", - e, + should_process: bool, + ) { + self.gossip_processor_send + .try_send(GossipWorkEvent::unaggregated_attestation( + message_id, + peer_id, + unaggregated_attestation, + subnet_id, + should_process, + )) + .unwrap_or_else(|e| { + error!( + &self.log, + "Unable to send to gossip processor"; + "type" => "unaggregated attestation gossip", + "error" => e.to_string(), ) }) - .ok() } - pub fn import_unaggregated_attestation( + pub fn on_aggregated_attestation_gossip( &mut self, + message_id: MessageId, peer_id: PeerId, - verified_attestation: VerifiedUnaggregatedAttestation, + aggregate: SignedAggregateAndProof, ) { - // This is provided to the error handling function to assist with debugging. - let beacon_block_root = verified_attestation.attestation().data.beacon_block_root; - - self.apply_attestation_to_fork_choice( - peer_id.clone(), - beacon_block_root, - &verified_attestation, - ); - - if let Err(e) = self - .chain - .add_to_naive_aggregation_pool(verified_attestation) - { - debug!( - self.log, - "Attestation invalid for agg pool"; - "reason" => format!("{:?}", e), - "peer" => peer_id.to_string(), - "beacon_block_root" => format!("{:?}", beacon_block_root) - ) - } - } - - /// Apply the attestation to fork choice, suppressing errors. - /// - /// We suppress the errors when adding an attestation to fork choice since the spec - /// permits gossiping attestations that are invalid to be applied to fork choice. - /// - /// An attestation that is invalid for fork choice can still be included in a block. 
- /// - /// Reference: - /// https://github.com/ethereum/eth2.0-specs/issues/1408#issuecomment-617599260 - fn apply_attestation_to_fork_choice<'a>( - &self, - peer_id: PeerId, - beacon_block_root: Hash256, - attestation: &'a impl SignatureVerifiedAttestation, - ) { - if let Err(e) = self.chain.apply_attestation_to_fork_choice(attestation) { - match e { - BeaconChainError::ForkChoiceError(ForkChoiceError::InvalidAttestation(e)) => { - debug!( - self.log, - "Attestation invalid for fork choice"; - "reason" => format!("{:?}", e), - "peer" => peer_id.to_string(), - "beacon_block_root" => format!("{:?}", beacon_block_root) - ) - } - e => error!( - self.log, - "Error applying attestation to fork choice"; - "reason" => format!("{:?}", e), - "peer" => peer_id.to_string(), - "beacon_block_root" => format!("{:?}", beacon_block_root) - ), - } - } + self.gossip_processor_send + .try_send(GossipWorkEvent::aggregated_attestation( + message_id, peer_id, aggregate, + )) + .unwrap_or_else(|e| { + error!( + &self.log, + "Unable to send to gossip processor"; + "type" => "aggregated attestation gossip", + "error" => e.to_string(), + ) + }) } /// Verify a voluntary exit before gossiping or processing it.