From 304fb05e444de1ba7688d6cc971f02e08d87a30b Mon Sep 17 00:00:00 2001 From: divma Date: Wed, 14 Jul 2021 05:24:08 +0000 Subject: [PATCH] Maintain attestations that reference unknown blocks (#2319) ## Issue Addressed #635 ## Proposed Changes - Keep attestations that reference a block we have not seen for 30secs before being re processed - If we do import the block before that time elapses, it is reprocessed in that moment - The first time it fails, do nothing wrt to gossipsub propagation or peer downscoring. If after being re processed it fails, downscore with a `LowToleranceError` and ignore the message. --- .../src/attestation_verification.rs | 61 ++- beacon_node/beacon_chain/src/beacon_chain.rs | 8 +- .../tests/attestation_verification.rs | 4 +- beacon_node/beacon_chain/tests/store_tests.rs | 2 +- beacon_node/beacon_chain/tests/tests.rs | 2 +- beacon_node/http_api/src/lib.rs | 8 +- .../src/beacon_processor/block_delay_queue.rs | 210 -------- .../network/src/beacon_processor/mod.rs | 216 ++++++-- .../network/src/beacon_processor/tests.rs | 380 ++++++++++++- .../work_reprocessing_queue.rs | 500 ++++++++++++++++++ .../beacon_processor/worker/gossip_methods.rs | 184 +++++-- .../src/beacon_processor/worker/mod.rs | 4 +- .../beacon_processor/worker/sync_methods.rs | 16 +- beacon_node/network/src/metrics.rs | 26 + consensus/types/src/eth_spec.rs | 2 +- 15 files changed, 1267 insertions(+), 356 deletions(-) delete mode 100644 beacon_node/network/src/beacon_processor/block_delay_queue.rs create mode 100644 beacon_node/network/src/beacon_processor/work_reprocessing_queue.rs diff --git a/beacon_node/beacon_chain/src/attestation_verification.rs b/beacon_node/beacon_chain/src/attestation_verification.rs index 552d466024..197480b6ad 100644 --- a/beacon_node/beacon_chain/src/attestation_verification.rs +++ b/beacon_node/beacon_chain/src/attestation_verification.rs @@ -375,7 +375,7 @@ impl VerifiedAggregatedAttestation { pub fn verify( signed_aggregate: SignedAggregateAndProof, 
chain: &BeaconChain, - ) -> Result { + ) -> Result)> { Self::verify_slashable(signed_aggregate, chain) .map(|verified_aggregate| { if let Some(slasher) = chain.slasher.as_ref() { @@ -383,7 +383,9 @@ impl VerifiedAggregatedAttestation { } verified_aggregate }) - .map_err(|slash_info| process_slash_info(slash_info, chain)) + .map_err(|(slash_info, original_aggregate)| { + (process_slash_info(slash_info, chain), original_aggregate) + }) } /// Run the checks that happen before an indexed attestation is constructed. @@ -509,17 +511,31 @@ impl VerifiedAggregatedAttestation { } /// Verify the attestation, producing extra information about whether it might be slashable. + // NOTE: Clippy considers the return too complex. This tuple is not used elsewhere so it is not + // worth creating an alias. + #[allow(clippy::type_complexity)] pub fn verify_slashable( signed_aggregate: SignedAggregateAndProof, chain: &BeaconChain, - ) -> Result> { + ) -> Result< + Self, + ( + AttestationSlashInfo, + SignedAggregateAndProof, + ), + > { use AttestationSlashInfo::*; let attestation = &signed_aggregate.message.aggregate; let aggregator_index = signed_aggregate.message.aggregator_index; let attestation_root = match Self::verify_early_checks(&signed_aggregate, chain) { Ok(root) => root, - Err(e) => return Err(SignatureNotChecked(signed_aggregate.message.aggregate, e)), + Err(e) => { + return Err(( + SignatureNotChecked(signed_aggregate.message.aggregate.clone(), e), + signed_aggregate, + )) + } }; let indexed_attestation = @@ -546,7 +562,12 @@ impl VerifiedAggregatedAttestation { .map_err(|e| BeaconChainError::from(e).into()) }) { Ok(indexed_attestation) => indexed_attestation, - Err(e) => return Err(SignatureNotChecked(signed_aggregate.message.aggregate, e)), + Err(e) => { + return Err(( + SignatureNotChecked(signed_aggregate.message.aggregate.clone(), e), + signed_aggregate, + )) + } }; // Ensure that all signatures are valid. 
@@ -560,11 +581,11 @@ impl VerifiedAggregatedAttestation { } }) { - return Err(SignatureInvalid(e)); + return Err((SignatureInvalid(e), signed_aggregate)); } if let Err(e) = Self::verify_late_checks(&signed_aggregate, attestation_root, chain) { - return Err(SignatureValid(indexed_attestation, e)); + return Err((SignatureValid(indexed_attestation, e), signed_aggregate)); } Ok(VerifiedAggregatedAttestation { @@ -715,7 +736,7 @@ impl VerifiedUnaggregatedAttestation { attestation: Attestation, subnet_id: Option, chain: &BeaconChain, - ) -> Result { + ) -> Result)> { Self::verify_slashable(attestation, subnet_id, chain) .map(|verified_unaggregated| { if let Some(slasher) = chain.slasher.as_ref() { @@ -723,26 +744,31 @@ impl VerifiedUnaggregatedAttestation { } verified_unaggregated }) - .map_err(|slash_info| process_slash_info(slash_info, chain)) + .map_err(|(slash_info, original_attestation)| { + (process_slash_info(slash_info, chain), original_attestation) + }) } /// Verify the attestation, producing extra information about whether it might be slashable. + // NOTE: Clippy considers the return too complex. This tuple is not used elsewhere so it is not + // worth creating an alias. 
+ #[allow(clippy::type_complexity)] pub fn verify_slashable( attestation: Attestation, subnet_id: Option, chain: &BeaconChain, - ) -> Result> { + ) -> Result, Attestation)> { use AttestationSlashInfo::*; if let Err(e) = Self::verify_early_checks(&attestation, chain) { - return Err(SignatureNotChecked(attestation, e)); + return Err((SignatureNotChecked(attestation.clone(), e), attestation)); } let (indexed_attestation, committees_per_slot) = match obtain_indexed_attestation_and_committees_per_slot(chain, &attestation) { Ok(x) => x, Err(e) => { - return Err(SignatureNotChecked(attestation, e)); + return Err((SignatureNotChecked(attestation.clone(), e), attestation)); } }; @@ -754,16 +780,21 @@ impl VerifiedUnaggregatedAttestation { chain, ) { Ok(t) => t, - Err(e) => return Err(SignatureNotCheckedIndexed(indexed_attestation, e)), + Err(e) => { + return Err(( + SignatureNotCheckedIndexed(indexed_attestation, e), + attestation, + )) + } }; // The aggregate signature of the attestation is valid. 
if let Err(e) = verify_attestation_signature(chain, &indexed_attestation) { - return Err(SignatureInvalid(e)); + return Err((SignatureInvalid(e), attestation)); } if let Err(e) = Self::verify_late_checks(&attestation, validator_index, chain) { - return Err(SignatureValid(indexed_attestation, e)); + return Err((SignatureValid(indexed_attestation, e), attestation)); } Ok(Self { diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index 227813380d..7cb23d1c0c 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -1225,7 +1225,8 @@ impl BeaconChain { &self, unaggregated_attestation: Attestation, subnet_id: Option, - ) -> Result, AttestationError> { + ) -> Result, (AttestationError, Attestation)> + { metrics::inc_counter(&metrics::UNAGGREGATED_ATTESTATION_PROCESSING_REQUESTS); let _timer = metrics::start_timer(&metrics::UNAGGREGATED_ATTESTATION_GOSSIP_VERIFICATION_TIMES); @@ -1249,7 +1250,10 @@ impl BeaconChain { pub fn verify_aggregated_attestation_for_gossip( &self, signed_aggregate: SignedAggregateAndProof, - ) -> Result, AttestationError> { + ) -> Result< + VerifiedAggregatedAttestation, + (AttestationError, SignedAggregateAndProof), + > { metrics::inc_counter(&metrics::AGGREGATED_ATTESTATION_PROCESSING_REQUESTS); let _timer = metrics::start_timer(&metrics::AGGREGATED_ATTESTATION_GOSSIP_VERIFICATION_TIMES); diff --git a/beacon_node/beacon_chain/tests/attestation_verification.rs b/beacon_node/beacon_chain/tests/attestation_verification.rs index 2d5b0c81fa..101823b8fa 100644 --- a/beacon_node/beacon_chain/tests/attestation_verification.rs +++ b/beacon_node/beacon_chain/tests/attestation_verification.rs @@ -223,7 +223,7 @@ fn aggregated_gossip_verification() { .expect(&format!( "{} should error during verify_aggregated_attestation_for_gossip", $desc - )), + )).0, $( $error ) |+ $( if $guard )? 
), "case: {}", @@ -606,7 +606,7 @@ fn unaggregated_gossip_verification() { .expect(&format!( "{} should error during verify_unaggregated_attestation_for_gossip", $desc - )), + )).0, $( $error ) |+ $( if $guard )? ), "case: {}", diff --git a/beacon_node/beacon_chain/tests/store_tests.rs b/beacon_node/beacon_chain/tests/store_tests.rs index 4d526e72b1..4b687fa04d 100644 --- a/beacon_node/beacon_chain/tests/store_tests.rs +++ b/beacon_node/beacon_chain/tests/store_tests.rs @@ -332,7 +332,7 @@ fn epoch_boundary_state_attestation_processing() { { checked_pre_fin = true; assert!(matches!( - res.err().unwrap(), + res.err().unwrap().0, AttnError::PastSlot { attestation_slot, earliest_permissible_slot, diff --git a/beacon_node/beacon_chain/tests/tests.rs b/beacon_node/beacon_chain/tests/tests.rs index 2740d566a8..dcb5dc5295 100644 --- a/beacon_node/beacon_chain/tests/tests.rs +++ b/beacon_node/beacon_chain/tests/tests.rs @@ -537,7 +537,7 @@ fn attestations_with_increasing_slots() { if expected_attestation_slot < expected_earliest_permissible_slot { assert!(matches!( - res.err().unwrap(), + res.err().unwrap().0, AttnError::PastSlot { attestation_slot, earliest_permissible_slot, diff --git a/beacon_node/http_api/src/lib.rs b/beacon_node/http_api/src/lib.rs index 25dcbcf0ed..10c5668f0e 100644 --- a/beacon_node/http_api/src/lib.rs +++ b/beacon_node/http_api/src/lib.rs @@ -1790,8 +1790,8 @@ pub fn serve( let mut failures = Vec::new(); // Verify that all messages in the post are valid before processing further - for (index, aggregate) in aggregates.as_slice().iter().enumerate() { - match chain.verify_aggregated_attestation_for_gossip(aggregate.clone()) { + for (index, aggregate) in aggregates.into_iter().enumerate() { + match chain.verify_aggregated_attestation_for_gossip(aggregate) { Ok(verified_aggregate) => { messages.push(PubsubMessage::AggregateAndProofAttestation(Box::new( verified_aggregate.aggregate().clone(), @@ -1816,8 +1816,8 @@ pub fn serve( // It's reasonably likely 
that two different validators produce // identical aggregates, especially if they're using the same beacon // node. - Err(AttnError::AttestationAlreadyKnown(_)) => continue, - Err(e) => { + Err((AttnError::AttestationAlreadyKnown(_), _)) => continue, + Err((e, aggregate)) => { error!(log, "Failure verifying aggregate and proofs"; "error" => format!("{:?}", e), diff --git a/beacon_node/network/src/beacon_processor/block_delay_queue.rs b/beacon_node/network/src/beacon_processor/block_delay_queue.rs deleted file mode 100644 index c259d95fdc..0000000000 --- a/beacon_node/network/src/beacon_processor/block_delay_queue.rs +++ /dev/null @@ -1,210 +0,0 @@ -//! Provides a mechanism which queues blocks for later processing when they arrive too early. -//! -//! When the `beacon_processor::Worker` imports a block that is acceptably early (i.e., within the -//! gossip propagation tolerance) it will send it to this queue where it will be placed in a -//! `DelayQueue` until the slot arrives. Once the block has been determined to be ready, it will be -//! sent back out on a channel to be processed by the `BeaconProcessor` again. -//! -//! There is the edge-case where the slot arrives before this queue manages to process it. In that -//! case, the block will be sent off for immediate processing (skipping the `DelayQueue`). 
-use super::MAX_DELAYED_BLOCK_QUEUE_LEN; -use beacon_chain::{BeaconChainTypes, GossipVerifiedBlock}; -use eth2_libp2p::PeerId; -use futures::stream::{Stream, StreamExt}; -use futures::task::Poll; -use slog::{crit, debug, error, Logger}; -use slot_clock::SlotClock; -use std::collections::HashSet; -use std::pin::Pin; -use std::task::Context; -use std::time::Duration; -use task_executor::TaskExecutor; -use tokio::sync::mpsc::{self, Receiver, Sender}; -use tokio::time::error::Error as TimeError; -use tokio_util::time::DelayQueue; - -const TASK_NAME: &str = "beacon_processor_block_delay_queue"; - -/// Queue blocks for re-processing with an `ADDITIONAL_DELAY` after the slot starts. This is to -/// account for any slight drift in the system clock. -const ADDITIONAL_DELAY: Duration = Duration::from_millis(5); - -/// Set an arbitrary upper-bound on the number of queued blocks to avoid DoS attacks. The fact that -/// we signature-verify blocks before putting them in the queue *should* protect against this, but -/// it's nice to have extra protection. -const MAXIMUM_QUEUED_BLOCKS: usize = 16; - -/// A block that arrived early and has been queued for later import. -pub struct QueuedBlock { - pub peer_id: PeerId, - pub block: GossipVerifiedBlock, - pub seen_timestamp: Duration, -} - -/// Unifies the different messages processed by the block delay queue. -enum InboundEvent { - /// A block that has been received early that we should queue for later processing. - EarlyBlock(QueuedBlock), - /// A block that was queued for later processing and is ready for import. - ReadyBlock(QueuedBlock), - /// The `DelayQueue` returned an error. - DelayQueueError(TimeError), -} - -/// Combines the `DelayQueue` and `Receiver` streams into a single stream. -/// -/// This struct has a similar purpose to `tokio::select!`, however it allows for more fine-grained -/// control (specifically in the ordering of event processing). 
-struct InboundEvents { - pub delay_queue: DelayQueue>, - early_blocks_rx: Receiver>, -} - -impl Stream for InboundEvents { - type Item = InboundEvent; - - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - // Poll for expired blocks *before* we try to process new blocks. - // - // The sequential nature of blockchains means it is generally better to try and import all - // existing blocks before new ones. - match self.delay_queue.poll_expired(cx) { - Poll::Ready(Some(Ok(queued_block))) => { - return Poll::Ready(Some(InboundEvent::ReadyBlock(queued_block.into_inner()))); - } - Poll::Ready(Some(Err(e))) => { - return Poll::Ready(Some(InboundEvent::DelayQueueError(e))); - } - // `Poll::Ready(None)` means that there are no more entries in the delay queue and we - // will continue to get this result until something else is added into the queue. - Poll::Ready(None) | Poll::Pending => (), - } - - match self.early_blocks_rx.poll_recv(cx) { - Poll::Ready(Some(queued_block)) => { - return Poll::Ready(Some(InboundEvent::EarlyBlock(queued_block))); - } - Poll::Ready(None) => { - return Poll::Ready(None); - } - Poll::Pending => {} - } - - Poll::Pending - } -} - -/// Spawn a queue which will accept blocks via the returned `Sender`, potentially queue them until -/// their slot arrives, then send them back out via `ready_blocks_tx`. -pub fn spawn_block_delay_queue( - ready_blocks_tx: Sender>, - executor: &TaskExecutor, - slot_clock: T::SlotClock, - log: Logger, -) -> Sender> { - let (early_blocks_tx, early_blocks_rx): (_, Receiver>) = - mpsc::channel(MAX_DELAYED_BLOCK_QUEUE_LEN); - - let queue_future = async move { - let mut queued_block_roots = HashSet::new(); - - let mut inbound_events = InboundEvents { - early_blocks_rx, - delay_queue: DelayQueue::new(), - }; - - loop { - match inbound_events.next().await { - // Some block has been indicated as "early" and should be processed when the - // appropriate slot arrives. 
- Some(InboundEvent::EarlyBlock(early_block)) => { - let block_slot = early_block.block.block.slot(); - let block_root = early_block.block.block_root; - - // Don't add the same block to the queue twice. This prevents DoS attacks. - if queued_block_roots.contains(&block_root) { - continue; - } - - if let Some(duration_till_slot) = slot_clock.duration_to_slot(block_slot) { - // Check to ensure this won't over-fill the queue. - if queued_block_roots.len() >= MAXIMUM_QUEUED_BLOCKS { - error!( - log, - "Early blocks queue is full"; - "queue_size" => MAXIMUM_QUEUED_BLOCKS, - "msg" => "check system clock" - ); - // Drop the block. - continue; - } - - queued_block_roots.insert(block_root); - // Queue the block until the start of the appropriate slot, plus - // `ADDITIONAL_DELAY`. - inbound_events - .delay_queue - .insert(early_block, duration_till_slot + ADDITIONAL_DELAY); - } else { - // If there is no duration till the next slot, check to see if the slot - // has already arrived. If it has already arrived, send it out for - // immediate processing. - // - // If we can't read the slot or the slot hasn't arrived, simply drop the - // block. - // - // This logic is slightly awkward since `SlotClock::duration_to_slot` - // doesn't distinguish between a slot that has already arrived and an - // error reading the slot clock. - if let Some(now) = slot_clock.now() { - if block_slot <= now && ready_blocks_tx.try_send(early_block).is_err() { - error!( - log, - "Failed to send block"; - ); - } - } - } - } - // A block that was queued for later processing is now ready to be processed. - Some(InboundEvent::ReadyBlock(ready_block)) => { - let block_root = ready_block.block.block_root; - - if !queued_block_roots.remove(&block_root) { - // Log an error to alert that we've made a bad assumption about how this - // program works, but still process the block anyway. 
- error!( - log, - "Unknown block in delay queue"; - "block_root" => ?block_root - ); - } - - if ready_blocks_tx.try_send(ready_block).is_err() { - error!( - log, - "Failed to pop queued block"; - ); - } - } - Some(InboundEvent::DelayQueueError(e)) => crit!( - log, - "Failed to poll block delay queue"; - "e" => ?e - ), - None => { - debug!( - log, - "Block delay queue stopped"; - "msg" => "shutting down" - ); - break; - } - } - } - }; - - executor.spawn(queue_future, TASK_NAME); - - early_blocks_tx -} diff --git a/beacon_node/network/src/beacon_processor/mod.rs b/beacon_node/network/src/beacon_processor/mod.rs index 04c1724269..fb8fbc9dcd 100644 --- a/beacon_node/network/src/beacon_processor/mod.rs +++ b/beacon_node/network/src/beacon_processor/mod.rs @@ -5,6 +5,7 @@ //! //! - A "manager" task, which either spawns worker tasks or enqueues work. //! - One or more "worker" tasks which perform time-intensive work on the `BeaconChain`. +//! - A task managing the scheduling of work that needs to be re-processed. //! //! ## Purpose //! @@ -19,10 +20,12 @@ //! //! ## Detail //! -//! There is a single "manager" thread who listens to two event channels. These events are either: +//! There is a single "manager" thread who listens to three event channels. These events are +//! either: //! //! - A new parcel of work (work event). //! - Indication that a worker has finished a parcel of work (worker idle). +//! - A work ready for reprocessing (work event). //! //! Then, there is a maximum of `n` "worker" blocking threads, where `n` is the CPU count. //! 
@@ -37,7 +40,6 @@ use crate::{metrics, service::NetworkMessage, sync::SyncMessage}; use beacon_chain::{BeaconChain, BeaconChainTypes, BlockError, GossipVerifiedBlock}; -use block_delay_queue::{spawn_block_delay_queue, QueuedBlock}; use eth2_libp2p::{ rpc::{BlocksByRangeRequest, BlocksByRootRequest, StatusMessage}, MessageId, NetworkGlobals, PeerId, PeerRequestId, @@ -57,11 +59,14 @@ use types::{ Attestation, AttesterSlashing, Hash256, ProposerSlashing, SignedAggregateAndProof, SignedBeaconBlock, SignedVoluntaryExit, SubnetId, }; +use work_reprocessing_queue::{ + spawn_reprocess_scheduler, QueuedAggregate, QueuedBlock, QueuedUnaggregate, ReadyWork, +}; use worker::{Toolbox, Worker}; -mod block_delay_queue; mod tests; +mod work_reprocessing_queue; mod worker; pub use worker::ProcessId; @@ -77,14 +82,25 @@ pub const MAX_WORK_EVENT_QUEUE_LEN: usize = 16_384; /// set to the CPU count, but we set it high to be safe. const MAX_IDLE_QUEUE_LEN: usize = 16_384; +/// The maximum size of the channel for re-processing work events. +const MAX_SCHEDULED_WORK_QUEUE_LEN: usize = 16_384; + /// The maximum number of queued `Attestation` objects that will be stored before we start dropping /// them. const MAX_UNAGGREGATED_ATTESTATION_QUEUE_LEN: usize = 16_384; +/// The maximum number of queued `Attestation` objects that will be stored before we start dropping +/// them. +const MAX_UNAGGREGATED_ATTESTATION_REPROCESS_QUEUE_LEN: usize = 8_192; + /// The maximum number of queued `SignedAggregateAndProof` objects that will be stored before we /// start dropping them. const MAX_AGGREGATED_ATTESTATION_QUEUE_LEN: usize = 1_024; +/// The maximum number of queued `SignedAggregateAndProof` objects that will be stored before we +/// start dropping them. +const MAX_AGGREGATED_ATTESTATION_REPROCESS_QUEUE_LEN: usize = 1_024; + /// The maximum number of queued `SignedBeaconBlock` objects received on gossip that will be stored /// before we start dropping them. 
const MAX_GOSSIP_BLOCK_QUEUE_LEN: usize = 1_024; @@ -127,6 +143,7 @@ const MAX_BLOCKS_BY_ROOTS_QUEUE_LEN: usize = 1_024; /// The name of the manager tokio task. const MANAGER_TASK_NAME: &str = "beacon_processor_manager"; + /// The name of the worker tokio tasks. const WORKER_TASK_NAME: &str = "beacon_processor_worker"; @@ -148,6 +165,8 @@ pub const CHAIN_SEGMENT: &str = "chain_segment"; pub const STATUS_PROCESSING: &str = "status_processing"; pub const BLOCKS_BY_RANGE_REQUEST: &str = "blocks_by_range_request"; pub const BLOCKS_BY_ROOTS_REQUEST: &str = "blocks_by_roots_request"; +pub const UNKNOWN_BLOCK_ATTESTATION: &str = "unknown_block_attestation"; +pub const UNKNOWN_BLOCK_AGGREGATE: &str = "unknown_block_aggregate"; /// Used to send/receive results from a rpc block import in a blocking task. pub type BlockResultSender = oneshot::Sender>>; @@ -308,22 +327,6 @@ impl WorkEvent { } } - /// Create a new `Work` event for some block that was delayed for later processing. - pub fn delayed_import_beacon_block( - peer_id: PeerId, - block: Box>, - seen_timestamp: Duration, - ) -> Self { - Self { - drop_during_sync: false, - work: Work::DelayedImportBlock { - peer_id, - block, - seen_timestamp, - }, - } - } - /// Create a new `Work` event for some exit. 
pub fn gossip_voluntary_exit( message_id: MessageId, @@ -442,6 +445,57 @@ impl WorkEvent { } } +impl std::convert::From> for WorkEvent { + fn from(ready_work: ReadyWork) -> Self { + match ready_work { + ReadyWork::Block(QueuedBlock { + peer_id, + block, + seen_timestamp, + }) => Self { + drop_during_sync: false, + work: Work::DelayedImportBlock { + peer_id, + block: Box::new(block), + seen_timestamp, + }, + }, + ReadyWork::Unaggregate(QueuedUnaggregate { + peer_id, + message_id, + attestation, + subnet_id, + should_import, + seen_timestamp, + }) => Self { + drop_during_sync: true, + work: Work::UnknownBlockAttestation { + message_id, + peer_id, + attestation, + subnet_id, + should_import, + seen_timestamp, + }, + }, + ReadyWork::Aggregate(QueuedAggregate { + peer_id, + message_id, + attestation, + seen_timestamp, + }) => Self { + drop_during_sync: true, + work: Work::UnknownBlockAggregate { + message_id, + peer_id, + aggregate: attestation, + seen_timestamp, + }, + }, + } + } +} + /// A consensus message (or multiple) from the network that requires processing. #[derive(Debug)] pub enum Work { @@ -453,12 +507,26 @@ pub enum Work { should_import: bool, seen_timestamp: Duration, }, + UnknownBlockAttestation { + message_id: MessageId, + peer_id: PeerId, + attestation: Box>, + subnet_id: SubnetId, + should_import: bool, + seen_timestamp: Duration, + }, GossipAggregate { message_id: MessageId, peer_id: PeerId, aggregate: Box>, seen_timestamp: Duration, }, + UnknownBlockAggregate { + message_id: MessageId, + peer_id: PeerId, + aggregate: Box>, + seen_timestamp: Duration, + }, GossipBlock { message_id: MessageId, peer_id: PeerId, @@ -525,6 +593,8 @@ impl Work { Work::Status { .. } => STATUS_PROCESSING, Work::BlocksByRangeRequest { .. } => BLOCKS_BY_RANGE_REQUEST, Work::BlocksByRootsRequest { .. } => BLOCKS_BY_ROOTS_REQUEST, + Work::UnknownBlockAttestation { .. } => UNKNOWN_BLOCK_ATTESTATION, + Work::UnknownBlockAggregate { .. 
} => UNKNOWN_BLOCK_AGGREGATE, } } } @@ -554,8 +624,8 @@ enum InboundEvent { WorkerIdle, /// There is new work to be done. WorkEvent(WorkEvent), - /// A block that was delayed for import at a later slot has become ready. - QueuedBlock(Box>), + /// A work event that was queued for re-processing has become ready. + ReprocessingWork(WorkEvent), } /// Combines the various incoming event streams for the `BeaconProcessor` into a single stream. @@ -567,8 +637,8 @@ struct InboundEvents { idle_rx: mpsc::Receiver<()>, /// Used by upstream processes to send new work to the `BeaconProcessor`. event_rx: mpsc::Receiver>, - /// Used internally for queuing blocks for processing once their slot arrives. - post_delay_block_queue_rx: mpsc::Receiver>, + /// Used internally for queuing work ready to be re-processed. + reprocess_work_rx: mpsc::Receiver>, } impl Stream for InboundEvents { @@ -589,9 +659,9 @@ impl Stream for InboundEvents { // Poll for delayed blocks before polling for new work. It might be the case that a delayed // block is required to successfully process some new work. - match self.post_delay_block_queue_rx.poll_recv(cx) { - Poll::Ready(Some(queued_block)) => { - return Poll::Ready(Some(InboundEvent::QueuedBlock(Box::new(queued_block)))); + match self.reprocess_work_rx.poll_recv(cx) { + Poll::Ready(Some(ready_work)) => { + return Poll::Ready(Some(InboundEvent::ReprocessingWork(ready_work.into()))); } Poll::Ready(None) => { return Poll::Ready(None); @@ -643,7 +713,7 @@ impl BeaconProcessor { pub fn spawn_manager( mut self, event_rx: mpsc::Receiver>, - work_journal_tx: Option>, + work_journal_tx: Option>, ) { // Used by workers to communicate that they are finished a task. 
let (idle_tx, idle_rx) = mpsc::channel::<()>(MAX_IDLE_QUEUE_LEN); @@ -655,6 +725,10 @@ impl BeaconProcessor { let mut aggregate_debounce = TimeLatch::default(); let mut attestation_queue = LifoQueue::new(MAX_UNAGGREGATED_ATTESTATION_QUEUE_LEN); let mut attestation_debounce = TimeLatch::default(); + let mut unknown_block_aggregate_queue = + LifoQueue::new(MAX_AGGREGATED_ATTESTATION_REPROCESS_QUEUE_LEN); + let mut unknown_block_attestation_queue = + LifoQueue::new(MAX_UNAGGREGATED_ATTESTATION_REPROCESS_QUEUE_LEN); // Using a FIFO queue for voluntary exits since it prevents exit censoring. I don't have // a strong feeling about queue type for exits. @@ -677,14 +751,13 @@ impl BeaconProcessor { let mut bbrange_queue = FifoQueue::new(MAX_BLOCKS_BY_RANGE_QUEUE_LEN); let mut bbroots_queue = FifoQueue::new(MAX_BLOCKS_BY_ROOTS_QUEUE_LEN); - // The delayed block queues are used to re-queue blocks for processing at a later time if - // they're received early. - let (post_delay_block_queue_tx, post_delay_block_queue_rx) = - mpsc::channel(MAX_DELAYED_BLOCK_QUEUE_LEN); - let pre_delay_block_queue_tx = { + // Channels for sending work to the re-process scheduler (`work_reprocessing_tx`) and to + // receive them back once they are ready (`ready_work_rx`). 
+ let (ready_work_tx, ready_work_rx) = mpsc::channel(MAX_SCHEDULED_WORK_QUEUE_LEN); + let work_reprocessing_tx = { if let Some(chain) = self.beacon_chain.upgrade() { - spawn_block_delay_queue( - post_delay_block_queue_tx, + spawn_reprocess_scheduler( + ready_work_tx, &self.executor, chain.slot_clock.clone(), self.log.clone(), @@ -704,7 +777,7 @@ impl BeaconProcessor { let mut inbound_events = InboundEvents { idle_rx, event_rx, - post_delay_block_queue_rx, + reprocess_work_rx: ready_work_rx, }; loop { @@ -713,14 +786,8 @@ impl BeaconProcessor { self.current_workers = self.current_workers.saturating_sub(1); None } - Some(InboundEvent::WorkEvent(event)) => Some(event), - Some(InboundEvent::QueuedBlock(queued_block)) => { - Some(WorkEvent::delayed_import_beacon_block( - queued_block.peer_id, - Box::new(queued_block.block), - queued_block.seen_timestamp, - )) - } + Some(InboundEvent::WorkEvent(event)) + | Some(InboundEvent::ReprocessingWork(event)) => Some(event), None => { debug!( self.log, @@ -750,7 +817,7 @@ impl BeaconProcessor { // We don't care if this message was successfully sent, we only use the journal // during testing. - let _ = work_journal_tx.try_send(id.to_string()); + let _ = work_journal_tx.try_send(id); } let can_spawn = self.current_workers < self.max_workers; @@ -766,7 +833,7 @@ impl BeaconProcessor { None if can_spawn => { let toolbox = Toolbox { idle_tx: idle_tx.clone(), - delayed_block_tx: pre_delay_block_queue_tx.clone(), + work_reprocessing_tx: work_reprocessing_tx.clone(), }; // Check for chain segments first, they're the most efficient way to get @@ -792,6 +859,12 @@ impl BeaconProcessor { self.spawn_worker(item, toolbox); } else if let Some(item) = attestation_queue.pop() { self.spawn_worker(item, toolbox); + // Aggregates and unaggregates queued for re-processing are older and we + // care about fresher ones, so check those first. 
+ } else if let Some(item) = unknown_block_aggregate_queue.pop() { + self.spawn_worker(item, toolbox); + } else if let Some(item) = unknown_block_attestation_queue.pop() { + self.spawn_worker(item, toolbox); // Check RPC methods next. Status messages are needed for sync so // prioritize them over syncing requests from other peers (BlocksByRange // and BlocksByRoot) @@ -820,7 +893,7 @@ impl BeaconProcessor { if let Some(work_journal_tx) = &work_journal_tx { // We don't care if this message was successfully sent, we only use the journal // during testing. - let _ = work_journal_tx.try_send(NOTHING_TO_DO.to_string()); + let _ = work_journal_tx.try_send(NOTHING_TO_DO); } } } @@ -857,7 +930,7 @@ impl BeaconProcessor { let work_id = work.str_id(); let toolbox = Toolbox { idle_tx: idle_tx.clone(), - delayed_block_tx: pre_delay_block_queue_tx.clone(), + work_reprocessing_tx: work_reprocessing_tx.clone(), }; match work { @@ -890,6 +963,12 @@ impl BeaconProcessor { Work::BlocksByRootsRequest { .. } => { bbroots_queue.push(work, work_id, &self.log) } + Work::UnknownBlockAttestation { .. } => { + unknown_block_attestation_queue.push(work) + } + Work::UnknownBlockAggregate { .. } => { + unknown_block_aggregate_queue.push(work) + } } } } @@ -960,7 +1039,7 @@ impl BeaconProcessor { /// Sends an message on `idle_tx` when the work is complete and the task is stopping. fn spawn_worker(&mut self, work: Work, toolbox: Toolbox) { let idle_tx = toolbox.idle_tx; - let delayed_block_tx = toolbox.delayed_block_tx; + let work_reprocessing_tx = toolbox.work_reprocessing_tx; // Wrap the `idle_tx` in a struct that will fire the idle message whenever it is dropped. 
// @@ -1031,6 +1110,7 @@ impl BeaconProcessor { *attestation, subnet_id, should_import, + Some(work_reprocessing_tx), seen_timestamp, ), /* @@ -1045,6 +1125,7 @@ impl BeaconProcessor { message_id, peer_id, *aggregate, + Some(work_reprocessing_tx), seen_timestamp, ), /* @@ -1059,7 +1140,7 @@ impl BeaconProcessor { message_id, peer_id, *block, - delayed_block_tx, + work_reprocessing_tx, seen_timestamp, ), /* @@ -1069,7 +1150,12 @@ impl BeaconProcessor { peer_id, block, seen_timestamp, - } => worker.process_gossip_verified_block(peer_id, *block, seen_timestamp), + } => worker.process_gossip_verified_block( + peer_id, + *block, + work_reprocessing_tx, + seen_timestamp, + ), /* * Voluntary exits received on gossip. */ @@ -1106,7 +1192,7 @@ impl BeaconProcessor { * Verification for beacon blocks received during syncing via RPC. */ Work::RpcBlock { block, result_tx } => { - worker.process_rpc_block(*block, result_tx) + worker.process_rpc_block(*block, result_tx, work_reprocessing_tx) } /* * Verification for a chain segment (multiple blocks). @@ -1134,6 +1220,34 @@ impl BeaconProcessor { request_id, request, } => worker.handle_blocks_by_root_request(peer_id, request_id, request), + Work::UnknownBlockAttestation { + message_id, + peer_id, + attestation, + subnet_id, + should_import, + seen_timestamp, + } => worker.process_gossip_attestation( + message_id, + peer_id, + *attestation, + subnet_id, + should_import, + None, // Do not allow this attestation to be re-processed beyond this point. 
+ seen_timestamp, + ), + Work::UnknownBlockAggregate { + message_id, + peer_id, + aggregate, + seen_timestamp, + } => worker.process_gossip_aggregate( + message_id, + peer_id, + *aggregate, + None, + seen_timestamp, + ), }; trace!( diff --git a/beacon_node/network/src/beacon_processor/tests.rs b/beacon_node/network/src/beacon_processor/tests.rs index c177d1c17b..50050c5be5 100644 --- a/beacon_node/network/src/beacon_processor/tests.rs +++ b/beacon_node/network/src/beacon_processor/tests.rs @@ -1,6 +1,7 @@ #![cfg(not(debug_assertions))] // Tests are too slow in debug. #![cfg(test)] +use crate::beacon_processor::work_reprocessing_queue::QUEUED_ATTESTATION_DELAY; use crate::beacon_processor::*; use crate::{service::NetworkMessage, sync::SyncMessage}; use beacon_chain::test_utils::{ @@ -42,11 +43,13 @@ struct TestRig { chain: Arc>, next_block: SignedBeaconBlock, attestations: Vec<(Attestation, SubnetId)>, + next_block_attestations: Vec<(Attestation, SubnetId)>, + next_block_aggregate_attestations: Vec>, attester_slashing: AttesterSlashing, proposer_slashing: ProposerSlashing, voluntary_exit: SignedVoluntaryExit, beacon_processor_tx: mpsc::Sender>, - work_journal_rx: mpsc::Receiver, + work_journal_rx: mpsc::Receiver<&'static str>, _network_rx: mpsc::UnboundedReceiver>, _sync_rx: mpsc::UnboundedReceiver>, environment: Option>, @@ -90,7 +93,7 @@ impl TestRig { "precondition: current slot is one after head" ); - let (next_block, _next_state) = + let (next_block, next_state) = harness.make_block(head.beacon_state.clone(), harness.chain.slot().unwrap()); let head_state_root = head.beacon_state_root(); @@ -111,6 +114,35 @@ impl TestRig { "precondition: attestations for testing" ); + let next_block_attestations = harness + .get_unaggregated_attestations( + &AttestationStrategy::AllValidators, + &next_state, + next_block.state_root(), + next_block.canonical_root(), + next_block.slot(), + ) + .into_iter() + .flatten() + .collect::>(); + + let next_block_aggregate_attestations = 
harness + .make_attestations( + &harness.get_all_validators(), + &next_state, + next_block.state_root(), + next_block.canonical_root().into(), + next_block.slot(), + ) + .into_iter() + .filter_map(|(_, aggregate_opt)| aggregate_opt) + .collect::>(); + + assert!( + !next_block_attestations.is_empty(), + "precondition: attestation for next block are not empty" + ); + let attester_slashing = harness.make_attester_slashing(vec![0, 1]); let proposer_slashing = harness.make_proposer_slashing(2); let voluntary_exit = harness.make_voluntary_exit(3, harness.chain.epoch().unwrap()); @@ -174,6 +206,8 @@ impl TestRig { chain, next_block, attestations, + next_block_attestations, + next_block_aggregate_attestations, attester_slashing, proposer_slashing, voluntary_exit, @@ -185,6 +219,10 @@ impl TestRig { } } + pub fn head_root(&self) -> Hash256 { + self.chain.head().unwrap().beacon_block_root + } + pub fn enqueue_gossip_block(&self) { self.beacon_processor_tx .try_send(WorkEvent::gossip_beacon_block( @@ -196,6 +234,11 @@ impl TestRig { .unwrap(); } + pub fn enqueue_rpc_block(&self) { + let (event, _rx) = WorkEvent::rpc_beacon_block(Box::new(self.next_block.clone())); + self.beacon_processor_tx.try_send(event).unwrap(); + } + pub fn enqueue_unaggregated_attestation(&self) { let (attestation, subnet_id) = self.attestations.first().unwrap().clone(); self.beacon_processor_tx @@ -240,6 +283,36 @@ impl TestRig { .unwrap(); } + pub fn enqueue_next_block_unaggregated_attestation(&self) { + let (attestation, subnet_id) = self.next_block_attestations.first().unwrap().clone(); + self.beacon_processor_tx + .try_send(WorkEvent::unaggregated_attestation( + junk_message_id(), + junk_peer_id(), + attestation, + subnet_id, + true, + Duration::from_secs(0), + )) + .unwrap(); + } + + pub fn enqueue_next_block_aggregated_attestation(&self) { + let aggregate = self + .next_block_aggregate_attestations + .first() + .unwrap() + .clone(); + self.beacon_processor_tx + 
.try_send(WorkEvent::aggregated_attestation( + junk_message_id(), + junk_peer_id(), + aggregate, + Duration::from_secs(0), + )) + .unwrap(); + } + fn runtime(&mut self) -> Arc { self.environment .as_mut() @@ -265,27 +338,37 @@ impl TestRig { }) } - /// Assert that the `BeaconProcessor` event journal is as `expected`. + /// Checks that the `BeaconProcessor` event journal contains the `expected` events in the given + /// order with a matching number of `WORKER_FREED` events in between. `NOTHING_TO_DO` events + /// are ignored. /// - /// ## Note - /// - /// We won't attempt to listen for any more than `expected.len()` events. As such, it makes sense - /// to use the `NOTHING_TO_DO` event to ensure that execution has completed. - pub fn assert_event_journal(&mut self, expected: &[&str]) { - let events = self.runtime().block_on(async { - let mut events = vec![]; + /// Given the described logic, `expected` must not contain `WORKER_FREED` or `NOTHING_TO_DO` + /// events. + pub fn assert_event_journal_contains_ordered(&mut self, expected: &[&str]) { + assert!(expected + .iter() + .all(|ev| ev != &WORKER_FREED && ev != &NOTHING_TO_DO)); + + let (events, worker_freed_remaining) = self.runtime().block_on(async { + let mut events = Vec::with_capacity(expected.len()); + let mut worker_freed_remaining = expected.len(); let drain_future = async { loop { match self.work_journal_rx.recv().await { - Some(event) => { - events.push(event); - - // Break as soon as we collect the desired number of events. - if events.len() >= expected.len() { + Some(event) if event == WORKER_FREED => { + worker_freed_remaining -= 1; + if worker_freed_remaining == 0 { + // Break when all expected events are finished. break; } } + Some(event) if event == NOTHING_TO_DO => { + // Ignore these. + } + Some(event) => { + events.push(event); + } None => break, } } @@ -294,9 +377,53 @@ impl TestRig { // Drain the expected number of events from the channel, or time out and give up. tokio::select! 
{ _ = tokio::time::sleep(STANDARD_TIMEOUT) => panic!( - "timeout ({:?}) expired waiting for events. expected {:?} but got {:?}", + "Timeout ({:?}) expired waiting for events. Expected {:?} but got {:?} waiting for {} `WORKER_FREED` events.", STANDARD_TIMEOUT, expected, + events, + worker_freed_remaining, + ), + _ = drain_future => {}, + } + + (events, worker_freed_remaining) + }); + + assert_eq!(events, expected); + assert_eq!(worker_freed_remaining, 0); + } + + pub fn assert_event_journal(&mut self, expected: &[&str]) { + self.assert_event_journal_with_timeout(expected, STANDARD_TIMEOUT); + } + + /// Assert that the `BeaconProcessor` event journal is as `expected`. + /// + /// ## Note + /// + /// We won't attempt to listen for any more than `expected.len()` events. As such, it makes sense + /// to use the `NOTHING_TO_DO` event to ensure that execution has completed. + pub fn assert_event_journal_with_timeout(&mut self, expected: &[&str], timeout: Duration) { + let events = self.runtime().block_on(async { + let mut events = Vec::with_capacity(expected.len()); + + let drain_future = async { + while let Some(event) = self.work_journal_rx.recv().await { + events.push(event); + + // Break as soon as we collect the desired number of events. + if events.len() >= expected.len() { + break; + } + } + }; + + // Drain the expected number of events from the channel, or time out and give up. + tokio::select! { + _ = tokio::time::sleep(timeout) => panic!( + "Timeout ({:?}) expired waiting for events. Expected {:?} but got {:?}", + timeout, + expected, events ), _ = drain_future => {}, @@ -305,13 +432,7 @@ impl TestRig { events }); - assert_eq!( - events, - expected - .into_iter() - .map(|s| s.to_string()) - .collect::>() - ); + assert_eq!(events, expected); } } @@ -353,18 +474,18 @@ fn import_gossip_block_acceptably_early() { // processing. 
// // If this causes issues we might be able to make the block delay queue add a longer delay for - // processing, instead of just MAXIMUM_GOSSIP_CLOCK_DISPARITY. Speak to @paulhauner if this test + // processing, instead of just ADDITIONAL_QUEUED_BLOCK_DELAY. Speak to @paulhauner if this test // starts failing. rig.chain.slot_clock.set_slot(rig.next_block.slot().into()); assert!( - rig.chain.head().unwrap().beacon_block_root != rig.next_block.canonical_root(), + rig.head_root() != rig.next_block.canonical_root(), "block not yet imported" ); rig.assert_event_journal(&[DELAYED_IMPORT_BLOCK, WORKER_FREED, NOTHING_TO_DO]); assert_eq!( - rig.chain.head().unwrap().beacon_block_root, + rig.head_root(), rig.next_block.canonical_root(), "block should be imported and become head" ); @@ -395,12 +516,12 @@ fn import_gossip_block_unacceptably_early() { rig.assert_event_journal(&[GOSSIP_BLOCK, WORKER_FREED, NOTHING_TO_DO]); - // Waiting for 5 seconds is a bit arbtirary, however it *should* be long enough to ensure the + // Waiting for 5 seconds is a bit arbitrary, however it *should* be long enough to ensure the // block isn't imported. rig.assert_no_events_for(Duration::from_secs(5)); assert!( - rig.chain.head().unwrap().beacon_block_root != rig.next_block.canonical_root(), + rig.head_root() != rig.next_block.canonical_root(), "block should not be imported" ); } @@ -421,7 +542,7 @@ fn import_gossip_block_at_current_slot() { rig.assert_event_journal(&[GOSSIP_BLOCK, WORKER_FREED, NOTHING_TO_DO]); assert_eq!( - rig.chain.head().unwrap().beacon_block_root, + rig.head_root(), rig.next_block.canonical_root(), "block should be imported and become head" ); @@ -445,6 +566,207 @@ fn import_gossip_attestation() { ); } +enum BlockImportMethod { + Gossip, + Rpc, +} + +/// Ensure that attestations that reference an unknown block get properly re-queued and +/// re-processed upon importing the block. 
+fn attestation_to_unknown_block_processed(import_method: BlockImportMethod) { + let mut rig = TestRig::new(SMALL_CHAIN); + + // Send the attestation but not the block, and check that it was not imported. + + let initial_attns = rig.chain.naive_aggregation_pool.read().num_attestations(); + + rig.enqueue_next_block_unaggregated_attestation(); + + rig.assert_event_journal(&[GOSSIP_ATTESTATION, WORKER_FREED, NOTHING_TO_DO]); + + assert_eq!( + rig.chain.naive_aggregation_pool.read().num_attestations(), + initial_attns, + "Attestation should not have been included." + ); + + // Send the block and ensure that the attestation is received back and imported. + + let block_event = match import_method { + BlockImportMethod::Gossip => { + rig.enqueue_gossip_block(); + GOSSIP_BLOCK + } + BlockImportMethod::Rpc => { + rig.enqueue_rpc_block(); + RPC_BLOCK + } + }; + + rig.assert_event_journal_contains_ordered(&[block_event, UNKNOWN_BLOCK_ATTESTATION]); + + // Run fork choice, since it isn't run when processing an RPC block. At runtime it is the + // responsibility of the sync manager to do this. + rig.chain.fork_choice().unwrap(); + + assert_eq!( + rig.head_root(), + rig.next_block.canonical_root(), + "Block should be imported and become head." + ); + + assert_eq!( + rig.chain.naive_aggregation_pool.read().num_attestations(), + initial_attns + 1, + "Attestation should have been included." + ); +} + +#[test] +fn attestation_to_unknown_block_processed_after_gossip_block() { + attestation_to_unknown_block_processed(BlockImportMethod::Gossip) +} + +#[test] +fn attestation_to_unknown_block_processed_after_rpc_block() { + attestation_to_unknown_block_processed(BlockImportMethod::Rpc) +} + +/// Ensure that attestations that reference an unknown block get properly re-queued and +/// re-processed upon importing the block. +fn aggregate_attestation_to_unknown_block(import_method: BlockImportMethod) { + let mut rig = TestRig::new(SMALL_CHAIN); + + // Empty the op pool. 
+ rig.chain + .op_pool + .prune_attestations(u64::max_value().into()); + assert_eq!(rig.chain.op_pool.num_attestations(), 0); + + // Send the attestation but not the block, and check that it was not imported. + + let initial_attns = rig.chain.op_pool.num_attestations(); + + rig.enqueue_next_block_aggregated_attestation(); + + rig.assert_event_journal(&[GOSSIP_AGGREGATE, WORKER_FREED, NOTHING_TO_DO]); + + assert_eq!( + rig.chain.op_pool.num_attestations(), + initial_attns, + "Attestation should not have been included." + ); + + // Send the block and ensure that the attestation is received back and imported. + + let block_event = match import_method { + BlockImportMethod::Gossip => { + rig.enqueue_gossip_block(); + GOSSIP_BLOCK + } + BlockImportMethod::Rpc => { + rig.enqueue_rpc_block(); + RPC_BLOCK + } + }; + + rig.assert_event_journal_contains_ordered(&[block_event, UNKNOWN_BLOCK_AGGREGATE]); + + // Run fork choice, since it isn't run when processing an RPC block. At runtime it is the + // responsibility of the sync manager to do this. + rig.chain.fork_choice().unwrap(); + + assert_eq!( + rig.head_root(), + rig.next_block.canonical_root(), + "Block should be imported and become head." + ); + + assert_eq!( + rig.chain.op_pool.num_attestations(), + initial_attns + 1, + "Attestation should have been included." + ); +} + +#[test] +fn aggregate_attestation_to_unknown_block_processed_after_gossip_block() { + aggregate_attestation_to_unknown_block(BlockImportMethod::Gossip) +} + +#[test] +fn aggregate_attestation_to_unknown_block_processed_after_rpc_block() { + aggregate_attestation_to_unknown_block(BlockImportMethod::Rpc) +} + +/// Ensure that attestations that reference an unknown block get properly re-queued and re-processed +/// when the block is not seen. +#[test] +fn requeue_unknown_block_gossip_attestation_without_import() { + let mut rig = TestRig::new(SMALL_CHAIN); + + // Send the attestation but not the block, and check that it was not imported. 
+
+ let initial_attns = rig.chain.naive_aggregation_pool.read().num_attestations();
+
+ rig.enqueue_next_block_unaggregated_attestation();
+
+ rig.assert_event_journal(&[GOSSIP_ATTESTATION, WORKER_FREED, NOTHING_TO_DO]);
+
+ assert_eq!(
+ rig.chain.naive_aggregation_pool.read().num_attestations(),
+ initial_attns,
+ "Attestation should not have been included."
+ );
+
+ // Ensure that the attestation is received back but not imported.
+
+ rig.assert_event_journal_with_timeout(
+ &[UNKNOWN_BLOCK_ATTESTATION, WORKER_FREED, NOTHING_TO_DO],
+ Duration::from_secs(1) + QUEUED_ATTESTATION_DELAY,
+ );
+
+ assert_eq!(
+ rig.chain.naive_aggregation_pool.read().num_attestations(),
+ initial_attns,
+ "Attestation should not have been included."
+ );
+}
+
+/// Ensure that aggregates that reference an unknown block get properly re-queued and re-processed
+/// when the block is not seen.
+#[test]
+fn requeue_unknown_block_gossip_aggregated_attestation_without_import() {
+ let mut rig = TestRig::new(SMALL_CHAIN);
+
+ // Send the attestation but not the block, and check that it was not imported.
+
+ let initial_attns = rig.chain.op_pool.num_attestations();
+
+ rig.enqueue_next_block_aggregated_attestation();
+
+ rig.assert_event_journal(&[GOSSIP_AGGREGATE, WORKER_FREED, NOTHING_TO_DO]);
+
+ assert_eq!(
+ rig.chain.naive_aggregation_pool.read().num_attestations(),
+ initial_attns,
+ "Attestation should not have been included."
+ );
+
+ // Ensure that the attestation is received back but not imported.
+
+ rig.assert_event_journal_with_timeout(
+ &[UNKNOWN_BLOCK_AGGREGATE, WORKER_FREED, NOTHING_TO_DO],
+ Duration::from_secs(1) + QUEUED_ATTESTATION_DELAY,
+ );
+
+ assert_eq!(
+ rig.chain.op_pool.num_attestations(),
+ initial_attns,
+ "Attestation should not have been included."
+ );
+}
+
+/// Ensure a bunch of valid operations can be imported. 
#[test] fn import_misc_gossip_ops() { diff --git a/beacon_node/network/src/beacon_processor/work_reprocessing_queue.rs b/beacon_node/network/src/beacon_processor/work_reprocessing_queue.rs new file mode 100644 index 0000000000..b950168052 --- /dev/null +++ b/beacon_node/network/src/beacon_processor/work_reprocessing_queue.rs @@ -0,0 +1,500 @@ +//! Provides a mechanism which queues work for later processing. +//! +//! When the `beacon_processor::Worker` imports a block that is acceptably early (i.e., within the +//! gossip propagation tolerance) it will send it to this queue where it will be placed in a +//! `DelayQueue` until the slot arrives. Once the block has been determined to be ready, it will be +//! sent back out on a channel to be processed by the `BeaconProcessor` again. +//! +//! There is the edge-case where the slot arrives before this queue manages to process it. In that +//! case, the block will be sent off for immediate processing (skipping the `DelayQueue`). +//! +//! Aggregated and unaggregated attestations that failed verification due to referencing an unknown +//! block will be re-queued until their block is imported, or until they expire. 
+use super::MAX_SCHEDULED_WORK_QUEUE_LEN; +use crate::metrics; +use beacon_chain::{BeaconChainTypes, GossipVerifiedBlock, MAXIMUM_GOSSIP_CLOCK_DISPARITY}; +use eth2_libp2p::{MessageId, PeerId}; +use fnv::FnvHashMap; +use futures::task::Poll; +use futures::{Stream, StreamExt}; +use slog::{crit, debug, error, Logger}; +use slot_clock::SlotClock; +use std::collections::{HashMap, HashSet}; +use std::pin::Pin; +use std::task::Context; +use std::time::Duration; +use task_executor::TaskExecutor; +use tokio::sync::mpsc::{self, Receiver, Sender}; +use tokio::time::error::Error as TimeError; +use tokio_util::time::delay_queue::{DelayQueue, Key as DelayKey}; +use types::{Attestation, EthSpec, Hash256, SignedAggregateAndProof, SubnetId}; + +const TASK_NAME: &str = "beacon_processor_reprocess_queue"; +const BLOCKS: &str = "blocks"; +const ATTESTATIONS: &str = "attestations"; + +/// Queue blocks for re-processing with an `ADDITIONAL_QUEUED_BLOCK_DELAY` after the slot starts. +/// This is to account for any slight drift in the system clock. +const ADDITIONAL_QUEUED_BLOCK_DELAY: Duration = Duration::from_millis(5); + +/// For how long to queue aggregated and unaggregated attestations for re-processing. +pub const QUEUED_ATTESTATION_DELAY: Duration = Duration::from_secs(12); + +/// Set an arbitrary upper-bound on the number of queued blocks to avoid DoS attacks. The fact that +/// we signature-verify blocks before putting them in the queue *should* protect against this, but +/// it's nice to have extra protection. +const MAXIMUM_QUEUED_BLOCKS: usize = 16; + +/// How many attestations we keep before new ones get dropped. +const MAXIMUM_QUEUED_ATTESTATIONS: usize = 1_024; + +/// Messages that the scheduler can receive. +pub enum ReprocessQueueMessage { + /// A block that has been received early and we should queue for later processing. + EarlyBlock(QueuedBlock), + /// A block that was successfully processed. We use this to handle attestations for unknown + /// blocks. 
+ BlockImported(Hash256), + /// An unaggregated attestation that references an unknown block. + UnknownBlockUnaggregate(QueuedUnaggregate), + /// An aggregated attestation that references an unknown block. + UnknownBlockAggregate(QueuedAggregate), +} + +/// Events sent by the scheduler once they are ready for re-processing. +pub enum ReadyWork { + Block(QueuedBlock), + Unaggregate(QueuedUnaggregate), + Aggregate(QueuedAggregate), +} + +/// An Attestation for which the corresponding block was not seen while processing, queued for +/// later. +pub struct QueuedUnaggregate { + pub peer_id: PeerId, + pub message_id: MessageId, + pub attestation: Box>, + pub subnet_id: SubnetId, + pub should_import: bool, + pub seen_timestamp: Duration, +} + +/// An aggregated attestation for which the corresponding block was not seen while processing, queued for +/// later. +pub struct QueuedAggregate { + pub peer_id: PeerId, + pub message_id: MessageId, + pub attestation: Box>, + pub seen_timestamp: Duration, +} + +/// A block that arrived early and has been queued for later import. +pub struct QueuedBlock { + pub peer_id: PeerId, + pub block: GossipVerifiedBlock, + pub seen_timestamp: Duration, +} + +/// Unifies the different messages processed by the block delay queue. +enum InboundEvent { + /// A block that was queued for later processing and is ready for import. + ReadyBlock(QueuedBlock), + /// An aggregated or unaggregated attestation is ready for re-processing. + ReadyAttestation(QueuedAttestationId), + /// A `DelayQueue` returned an error. + DelayQueueError(TimeError, &'static str), + /// A message sent to the `ReprocessQueue` + Msg(ReprocessQueueMessage), +} + +/// Manages scheduling works that need to be later re-processed. +struct ReprocessQueue { + /// Receiver of messages relevant to schedule works for reprocessing. 
+ work_reprocessing_rx: Receiver>, + /// Sender of works once they become ready + ready_work_tx: Sender>, + + /* Queues */ + /// Queue to manage scheduled early blocks. + block_delay_queue: DelayQueue>, + /// Queue to manage scheduled attestations. + attestations_delay_queue: DelayQueue, + + /* Queued items */ + /// Queued blocks. + queued_block_roots: HashSet, + /// Queued aggregated attestations. + queued_aggregates: FnvHashMap, DelayKey)>, + /// Queued attestations. + queued_unaggregates: FnvHashMap, DelayKey)>, + /// Attestations (aggregated and unaggregated) per root. + awaiting_attestations_per_root: HashMap>, + + /* Aux */ + /// Next attestation id, used for both aggregated and unaggregated attestations + next_attestation: usize, +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +enum QueuedAttestationId { + Aggregate(usize), + Unaggregate(usize), +} + +impl QueuedAggregate { + pub fn beacon_block_root(&self) -> &Hash256 { + &self.attestation.message.aggregate.data.beacon_block_root + } +} + +impl QueuedUnaggregate { + pub fn beacon_block_root(&self) -> &Hash256 { + &self.attestation.data.beacon_block_root + } +} + +impl Stream for ReprocessQueue { + type Item = InboundEvent; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + // NOTE: implementing `Stream` is not necessary but allows to maintain the future selection + // order fine-grained and separate from the logic of handling each message, which is nice. + + // Poll for expired blocks *before* we try to process new blocks. + // + // The sequential nature of blockchains means it is generally better to try and import all + // existing blocks before new ones. 
+ match self.block_delay_queue.poll_expired(cx) { + Poll::Ready(Some(Ok(queued_block))) => { + return Poll::Ready(Some(InboundEvent::ReadyBlock(queued_block.into_inner()))); + } + Poll::Ready(Some(Err(e))) => { + return Poll::Ready(Some(InboundEvent::DelayQueueError(e, "block_queue"))); + } + // `Poll::Ready(None)` means that there are no more entries in the delay queue and we + // will continue to get this result until something else is added into the queue. + Poll::Ready(None) | Poll::Pending => (), + } + + match self.attestations_delay_queue.poll_expired(cx) { + Poll::Ready(Some(Ok(attestation_id))) => { + return Poll::Ready(Some(InboundEvent::ReadyAttestation( + attestation_id.into_inner(), + ))); + } + Poll::Ready(Some(Err(e))) => { + return Poll::Ready(Some(InboundEvent::DelayQueueError(e, "attestations_queue"))); + } + // `Poll::Ready(None)` means that there are no more entries in the delay queue and we + // will continue to get this result until something else is added into the queue. + Poll::Ready(None) | Poll::Pending => (), + } + + // Last empty the messages channel. + match self.work_reprocessing_rx.poll_recv(cx) { + Poll::Ready(Some(message)) => return Poll::Ready(Some(InboundEvent::Msg(message))), + Poll::Ready(None) | Poll::Pending => {} + } + + Poll::Pending + } +} + +/// Starts the job that manages scheduling works that need re-processing. The returned `Sender` +/// gives the communicating channel to receive those works. Once a work is ready, it is sent back +/// via `ready_work_tx`. +pub fn spawn_reprocess_scheduler( + ready_work_tx: Sender>, + executor: &TaskExecutor, + slot_clock: T::SlotClock, + log: Logger, +) -> Sender> { + let (work_reprocessing_tx, work_reprocessing_rx) = mpsc::channel(MAX_SCHEDULED_WORK_QUEUE_LEN); + // Basic sanity check. 
+ assert!(ADDITIONAL_QUEUED_BLOCK_DELAY < MAXIMUM_GOSSIP_CLOCK_DISPARITY); + + let mut queue = ReprocessQueue { + work_reprocessing_rx, + ready_work_tx, + block_delay_queue: DelayQueue::new(), + attestations_delay_queue: DelayQueue::new(), + queued_block_roots: HashSet::new(), + queued_aggregates: FnvHashMap::default(), + queued_unaggregates: FnvHashMap::default(), + awaiting_attestations_per_root: HashMap::new(), + next_attestation: 0, + }; + + executor.spawn( + async move { + while let Some(msg) = queue.next().await { + queue.handle_message(msg, &slot_clock, &log); + } + + debug!( + log, + "Re-process queue stopped"; + "msg" => "shutting down" + ); + }, + TASK_NAME, + ); + + work_reprocessing_tx +} + +impl ReprocessQueue { + fn handle_message(&mut self, msg: InboundEvent, slot_clock: &T::SlotClock, log: &Logger) { + use ReprocessQueueMessage::*; + match msg { + // Some block has been indicated as "early" and should be processed when the + // appropriate slot arrives. + InboundEvent::Msg(EarlyBlock(early_block)) => { + let block_slot = early_block.block.block.slot(); + let block_root = early_block.block.block_root; + + // Don't add the same block to the queue twice. This prevents DoS attacks. + if self.queued_block_roots.contains(&block_root) { + return; + } + + if let Some(duration_till_slot) = slot_clock.duration_to_slot(block_slot) { + // Check to ensure this won't over-fill the queue. + if self.queued_block_roots.len() >= MAXIMUM_QUEUED_BLOCKS { + error!( + log, + "Early blocks queue is full"; + "queue_size" => MAXIMUM_QUEUED_BLOCKS, + "msg" => "check system clock" + ); + // Drop the block. + return; + } + + self.queued_block_roots.insert(block_root); + // Queue the block until the start of the appropriate slot, plus + // `ADDITIONAL_QUEUED_BLOCK_DELAY`. 
+ self.block_delay_queue.insert( + early_block, + duration_till_slot + ADDITIONAL_QUEUED_BLOCK_DELAY, + ); + } else { + // If there is no duration till the next slot, check to see if the slot + // has already arrived. If it has already arrived, send it out for + // immediate processing. + // + // If we can't read the slot or the slot hasn't arrived, simply drop the + // block. + // + // This logic is slightly awkward since `SlotClock::duration_to_slot` + // doesn't distinguish between a slot that has already arrived and an + // error reading the slot clock. + if let Some(now) = slot_clock.now() { + if block_slot <= now + && self + .ready_work_tx + .try_send(ReadyWork::Block(early_block)) + .is_err() + { + error!( + log, + "Failed to send block"; + ); + } + } + } + } + InboundEvent::Msg(UnknownBlockAggregate(queued_aggregate)) => { + if self.attestations_delay_queue.len() >= MAXIMUM_QUEUED_ATTESTATIONS { + error!( + log, + "Aggregate attestation delay queue is full"; + "queue_size" => MAXIMUM_QUEUED_ATTESTATIONS, + "msg" => "check system clock" + ); + // Drop the attestation. + return; + } + + let att_id = QueuedAttestationId::Aggregate(self.next_attestation); + + // Register the delay. + let delay_key = self + .attestations_delay_queue + .insert(att_id, QUEUED_ATTESTATION_DELAY); + + // Register this attestation for the corresponding root. + self.awaiting_attestations_per_root + .entry(*queued_aggregate.beacon_block_root()) + .or_default() + .push(att_id); + + // Store the attestation and its info. + self.queued_aggregates + .insert(self.next_attestation, (queued_aggregate, delay_key)); + + self.next_attestation += 1; + } + InboundEvent::Msg(UnknownBlockUnaggregate(queued_unaggregate)) => { + if self.attestations_delay_queue.len() >= MAXIMUM_QUEUED_ATTESTATIONS { + error!( + log, + "Attestation delay queue is full"; + "queue_size" => MAXIMUM_QUEUED_ATTESTATIONS, + "msg" => "check system clock" + ); + // Drop the attestation. 
+ return; + } + + let att_id = QueuedAttestationId::Unaggregate(self.next_attestation); + + // Register the delay. + let delay_key = self + .attestations_delay_queue + .insert(att_id, QUEUED_ATTESTATION_DELAY); + + // Register this attestation for the corresponding root. + self.awaiting_attestations_per_root + .entry(*queued_unaggregate.beacon_block_root()) + .or_default() + .push(att_id); + + // Store the attestation and its info. + self.queued_unaggregates + .insert(self.next_attestation, (queued_unaggregate, delay_key)); + + self.next_attestation += 1; + } + InboundEvent::Msg(BlockImported(root)) => { + // Unqueue the attestations we have for this root, if any. + if let Some(queued_ids) = self.awaiting_attestations_per_root.remove(&root) { + for id in queued_ids { + metrics::inc_counter( + &metrics::BEACON_PROCESSOR_REPROCESSING_QUEUE_MATCHED_ATTESTATIONS, + ); + + if let Some((work, delay_key)) = match id { + QueuedAttestationId::Aggregate(id) => self + .queued_aggregates + .remove(&id) + .map(|(aggregate, delay_key)| { + (ReadyWork::Aggregate(aggregate), delay_key) + }), + QueuedAttestationId::Unaggregate(id) => self + .queued_unaggregates + .remove(&id) + .map(|(unaggregate, delay_key)| { + (ReadyWork::Unaggregate(unaggregate), delay_key) + }), + } { + // Remove the delay. + self.attestations_delay_queue.remove(&delay_key); + + // Send the work. + if self.ready_work_tx.try_send(work).is_err() { + error!( + log, + "Failed to send scheduled attestation"; + ); + } + } else { + // There is a mismatch between the attestation ids registered for this + // root and the queued attestations. This should never happen. + error!( + log, + "Unknown queued attestation for block root"; + "block_root" => ?root, + "att_id" => ?id, + ); + } + } + } + } + // A block that was queued for later processing is now ready to be processed. 
+ InboundEvent::ReadyBlock(ready_block) => { + let block_root = ready_block.block.block_root; + + if !self.queued_block_roots.remove(&block_root) { + // Log an error to alert that we've made a bad assumption about how this + // program works, but still process the block anyway. + error!( + log, + "Unknown block in delay queue"; + "block_root" => ?block_root + ); + } + + if self + .ready_work_tx + .try_send(ReadyWork::Block(ready_block)) + .is_err() + { + error!( + log, + "Failed to pop queued block"; + ); + } + } + InboundEvent::DelayQueueError(e, queue_name) => { + crit!( + log, + "Failed to poll queue"; + "queue" => queue_name, + "e" => ?e + ) + } + InboundEvent::ReadyAttestation(queued_id) => { + metrics::inc_counter( + &metrics::BEACON_PROCESSOR_REPROCESSING_QUEUE_EXPIRED_ATTESTATIONS, + ); + + if let Some((root, work)) = match queued_id { + QueuedAttestationId::Aggregate(id) => { + self.queued_aggregates + .remove(&id) + .map(|(aggregate, _delay_key)| { + ( + *aggregate.beacon_block_root(), + ReadyWork::Aggregate(aggregate), + ) + }) + } + QueuedAttestationId::Unaggregate(id) => self + .queued_unaggregates + .remove(&id) + .map(|(unaggregate, _delay_key)| { + ( + *unaggregate.beacon_block_root(), + ReadyWork::Unaggregate(unaggregate), + ) + }), + } { + if self.ready_work_tx.try_send(work).is_err() { + error!( + log, + "Failed to send scheduled attestation"; + ); + } + + if let Some(queued_atts) = self.awaiting_attestations_per_root.get_mut(&root) { + if let Some(index) = queued_atts.iter().position(|&id| id == queued_id) { + queued_atts.swap_remove(index); + } + } + } + } + } + + metrics::set_gauge_vec( + &metrics::BEACON_PROCESSOR_REPROCESSING_QUEUE_TOTAL, + &[BLOCKS], + self.block_delay_queue.len() as i64, + ); + metrics::set_gauge_vec( + &metrics::BEACON_PROCESSOR_REPROCESSING_QUEUE_TOTAL, + &[ATTESTATIONS], + self.attestations_delay_queue.len() as i64, + ); + } +} diff --git a/beacon_node/network/src/beacon_processor/worker/gossip_methods.rs 
b/beacon_node/network/src/beacon_processor/worker/gossip_methods.rs index fcf63fc27c..0040a996f8 100644 --- a/beacon_node/network/src/beacon_processor/worker/gossip_methods.rs +++ b/beacon_node/network/src/beacon_processor/worker/gossip_methods.rs @@ -13,11 +13,48 @@ use ssz::Encode; use std::time::{Duration, SystemTime, UNIX_EPOCH}; use tokio::sync::mpsc; use types::{ - Attestation, AttesterSlashing, Hash256, ProposerSlashing, SignedAggregateAndProof, + Attestation, AttesterSlashing, EthSpec, Hash256, ProposerSlashing, SignedAggregateAndProof, SignedBeaconBlock, SignedVoluntaryExit, SubnetId, }; -use super::{super::block_delay_queue::QueuedBlock, Worker}; +use super::{ + super::work_reprocessing_queue::{ + QueuedAggregate, QueuedBlock, QueuedUnaggregate, ReprocessQueueMessage, + }, + Worker, +}; + +/// Data for an aggregated or unaggregated attestation that failed verification. +enum FailedAtt { + Unaggregate { + attestation: Box>, + subnet_id: SubnetId, + should_import: bool, + seen_timestamp: Duration, + }, + Aggregate { + attestation: Box>, + seen_timestamp: Duration, + }, +} + +impl FailedAtt { + pub fn root(&self) -> &Hash256 { + match self { + FailedAtt::Unaggregate { attestation, .. } => &attestation.data.beacon_block_root, + FailedAtt::Aggregate { attestation, .. } => { + &attestation.message.aggregate.data.beacon_block_root + } + } + } + + pub fn kind(&self) -> &'static str { + match self { + FailedAtt::Unaggregate { .. } => "unaggregated", + FailedAtt::Aggregate { .. } => "aggregated", + } + } +} impl Worker { /* Auxiliary functions */ @@ -59,6 +96,7 @@ impl Worker { /// - Attempt to add it to the naive aggregation pool. /// /// Raises a log if there are errors. 
+ #[allow(clippy::too_many_arguments)] pub fn process_gossip_attestation( self, message_id: MessageId, @@ -66,6 +104,7 @@ impl Worker { attestation: Attestation, subnet_id: SubnetId, should_import: bool, + reprocess_tx: Option>>, seen_timestamp: Duration, ) { let beacon_block_root = attestation.data.beacon_block_root; @@ -75,12 +114,17 @@ impl Worker { .verify_unaggregated_attestation_for_gossip(attestation, Some(subnet_id)) { Ok(attestation) => attestation, - Err(e) => { + Err((e, attestation)) => { self.handle_attestation_verification_failure( peer_id, message_id, - beacon_block_root, - "unaggregated", + FailedAtt::Unaggregate { + attestation: Box::new(attestation), + subnet_id, + should_import, + seen_timestamp, + }, + reprocess_tx, e, ); return; @@ -153,6 +197,7 @@ impl Worker { message_id: MessageId, peer_id: PeerId, aggregate: SignedAggregateAndProof, + reprocess_tx: Option>>, seen_timestamp: Duration, ) { let beacon_block_root = aggregate.message.aggregate.data.beacon_block_root; @@ -162,13 +207,16 @@ impl Worker { .verify_aggregated_attestation_for_gossip(aggregate) { Ok(aggregate) => aggregate, - Err(e) => { + Err((e, attestation)) => { // Report the failure to gossipsub self.handle_attestation_verification_failure( peer_id, message_id, - beacon_block_root, - "aggregated", + FailedAtt::Aggregate { + attestation: Box::new(attestation), + seen_timestamp, + }, + reprocess_tx, e, ); return; @@ -238,7 +286,7 @@ impl Worker { message_id: MessageId, peer_id: PeerId, block: SignedBeaconBlock, - delayed_import_tx: mpsc::Sender>, + reprocess_tx: mpsc::Sender>, seen_duration: Duration, ) { // Log metrics to track delay from other nodes on the network. 
@@ -361,12 +409,12 @@ impl Worker { metrics::inc_counter(&metrics::BEACON_PROCESSOR_GOSSIP_BLOCK_REQUEUED_TOTAL); - if delayed_import_tx - .try_send(QueuedBlock { + if reprocess_tx + .try_send(ReprocessQueueMessage::EarlyBlock(QueuedBlock { peer_id, block: verified_block, seen_timestamp: seen_duration, - }) + })) .is_err() { error!( @@ -378,7 +426,12 @@ impl Worker { ) } } - Ok(_) => self.process_gossip_verified_block(peer_id, verified_block, seen_duration), + Ok(_) => self.process_gossip_verified_block( + peer_id, + verified_block, + reprocess_tx, + seen_duration, + ), Err(e) => { error!( self.log, @@ -399,24 +452,34 @@ impl Worker { self, peer_id: PeerId, verified_block: GossipVerifiedBlock, + reprocess_tx: mpsc::Sender>, // This value is not used presently, but it might come in handy for debugging. _seen_duration: Duration, ) { let block = Box::new(verified_block.block.clone()); match self.chain.process_block(verified_block) { - Ok(_block_root) => { + Ok(block_root) => { metrics::inc_counter(&metrics::BEACON_PROCESSOR_GOSSIP_BLOCK_IMPORTED_TOTAL); + if reprocess_tx + .try_send(ReprocessQueueMessage::BlockImported(block_root)) + .is_err() + { + error!( + self.log, + "Failed to inform block import"; + "source" => "gossip", + "block_root" => %block_root, + ) + }; + trace!( self.log, "Gossipsub block processed"; "peer_id" => %peer_id ); - // The `MessageHandler` would be the place to put this, however it doesn't seem - // to have a reference to the `BeaconChain`. I will leave this for future - // works. match self.chain.fork_choice() { Ok(()) => trace!( self.log, @@ -627,14 +690,16 @@ impl Worker { /// Handle an error whilst verifying an `Attestation` or `SignedAggregateAndProof` from the /// network. 
- pub fn handle_attestation_verification_failure( + fn handle_attestation_verification_failure( &self, peer_id: PeerId, message_id: MessageId, - beacon_block_root: Hash256, - attestation_type: &str, + failed_att: FailedAtt, + reprocess_tx: Option>>, error: AttnError, ) { + let beacon_block_root = failed_att.root(); + let attestation_type = failed_att.kind(); metrics::register_attestation_error(&error); match &error { AttnError::FutureEpoch { .. } @@ -796,30 +861,76 @@ impl Worker { self.gossip_penalize_peer(peer_id, PeerAction::LowToleranceError); } AttnError::UnknownHeadBlock { beacon_block_root } => { - // Note: its a little bit unclear as to whether or not this block is unknown or - // just old. See: - // - // https://github.com/sigp/lighthouse/issues/1039 - - // TODO: Maintain this attestation and re-process once sync completes - // TODO: We then score based on whether we can download the block and re-process. trace!( self.log, "Attestation for unknown block"; "peer_id" => %peer_id, "block" => %beacon_block_root ); - // we don't know the block, get the sync manager to handle the block lookup - self.sync_tx - .send(SyncMessage::UnknownBlockHash(peer_id, *beacon_block_root)) - .unwrap_or_else(|_| { - warn!( + if let Some(sender) = reprocess_tx { + // We don't know the block, get the sync manager to handle the block lookup, and + // send the attestation to be scheduled for re-processing. 
+ self.sync_tx + .send(SyncMessage::UnknownBlockHash(peer_id, *beacon_block_root)) + .unwrap_or_else(|_| { + warn!( + self.log, + "Failed to send to sync service"; + "msg" => "UnknownBlockHash" + ) + }); + let msg = match failed_att { + FailedAtt::Aggregate { + attestation, + seen_timestamp, + } => { + metrics::inc_counter( + &metrics::BEACON_PROCESSOR_AGGREGATED_ATTESTATION_REQUEUED_TOTAL, + ); + ReprocessQueueMessage::UnknownBlockAggregate(QueuedAggregate { + peer_id, + message_id, + attestation, + seen_timestamp, + }) + } + FailedAtt::Unaggregate { + attestation, + subnet_id, + should_import, + seen_timestamp, + } => { + metrics::inc_counter( + &metrics::BEACON_PROCESSOR_UNAGGREGATED_ATTESTATION_REQUEUED_TOTAL, + ); + ReprocessQueueMessage::UnknownBlockUnaggregate(QueuedUnaggregate { + peer_id, + message_id, + attestation, + subnet_id, + should_import, + seen_timestamp, + }) + } + }; + + if sender.try_send(msg).is_err() { + error!( self.log, - "Failed to send to sync service"; - "msg" => "UnknownBlockHash" + "Failed to send attestation for re-processing"; ) - }); - self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Ignore); + } + } else { + // We shouldn't make any further attempts to process this attestation. + // Downscore the peer. + self.gossip_penalize_peer(peer_id, PeerAction::LowToleranceError); + self.propagate_validation_result( + message_id, + peer_id, + MessageAcceptance::Ignore, + ); + } + return; } AttnError::UnknownTargetRoot(_) => { @@ -879,7 +990,6 @@ impl Worker { self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Reject); self.gossip_penalize_peer(peer_id, PeerAction::LowToleranceError); } - AttnError::InvalidSubnetId { received, expected } => { /* * The attestation was received on an incorrect subnet id. 
diff --git a/beacon_node/network/src/beacon_processor/worker/mod.rs b/beacon_node/network/src/beacon_processor/worker/mod.rs index 1ac5a863c5..e79889274f 100644 --- a/beacon_node/network/src/beacon_processor/worker/mod.rs +++ b/beacon_node/network/src/beacon_processor/worker/mod.rs @@ -1,4 +1,4 @@ -use super::QueuedBlock; +use super::work_reprocessing_queue::ReprocessQueueMessage; use crate::{service::NetworkMessage, sync::SyncMessage}; use beacon_chain::{BeaconChain, BeaconChainTypes}; use slog::{error, Logger}; @@ -46,5 +46,5 @@ impl Worker { /// Contains the necessary items for a worker to do their job. pub struct Toolbox { pub idle_tx: mpsc::Sender<()>, - pub delayed_block_tx: mpsc::Sender>, + pub work_reprocessing_tx: mpsc::Sender>, } diff --git a/beacon_node/network/src/beacon_processor/worker/sync_methods.rs b/beacon_node/network/src/beacon_processor/worker/sync_methods.rs index 5db81131e1..db2b8db75e 100644 --- a/beacon_node/network/src/beacon_processor/worker/sync_methods.rs +++ b/beacon_node/network/src/beacon_processor/worker/sync_methods.rs @@ -1,4 +1,4 @@ -use super::Worker; +use super::{super::work_reprocessing_queue::ReprocessQueueMessage, Worker}; use crate::beacon_processor::worker::FUTURE_SLOT_TOLERANCE; use crate::beacon_processor::BlockResultSender; use crate::metrics; @@ -7,6 +7,7 @@ use crate::sync::{BatchProcessResult, ChainId}; use beacon_chain::{BeaconChainTypes, BlockError, ChainSegmentResult}; use eth2_libp2p::PeerId; use slog::{crit, debug, error, info, trace, warn}; +use tokio::sync::mpsc; use types::{Epoch, Hash256, SignedBeaconBlock}; /// Id associated to a block processing request, either a batch or a single block. 
@@ -27,6 +28,7 @@ impl Worker { self, block: SignedBeaconBlock, result_tx: BlockResultSender, + reprocess_tx: mpsc::Sender>, ) { let slot = block.slot(); let block_result = self.chain.process_block(block); @@ -40,6 +42,18 @@ impl Worker { "slot" => slot, "hash" => %root ); + + if reprocess_tx + .try_send(ReprocessQueueMessage::BlockImported(*root)) + .is_err() + { + error!( + self.log, + "Failed to inform block import"; + "source" => "rpc", + "block_root" => %root, + ) + }; } if result_tx.send(block_result).is_err() { diff --git a/beacon_node/network/src/metrics.rs b/beacon_node/network/src/metrics.rs index f04cf8d7ed..a201a97d83 100644 --- a/beacon_node/network/src/metrics.rs +++ b/beacon_node/network/src/metrics.rs @@ -301,6 +301,10 @@ lazy_static! { "beacon_processor_unaggregated_attestation_imported_total", "Total number of unaggregated attestations imported to fork choice, etc." ); + pub static ref BEACON_PROCESSOR_UNAGGREGATED_ATTESTATION_REQUEUED_TOTAL: Result = try_create_int_counter( + "beacon_processor_unaggregated_attestation_requeued_total", + "Total number of unaggregated attestations that referenced an unknown block and were re-queued." + ); // Aggregated attestations. pub static ref BEACON_PROCESSOR_AGGREGATED_ATTESTATION_QUEUE_TOTAL: Result = try_create_int_gauge( "beacon_processor_aggregated_attestation_queue_total", @@ -314,6 +318,10 @@ lazy_static! { "beacon_processor_aggregated_attestation_imported_total", "Total number of aggregated attestations imported to fork choice, etc." ); + pub static ref BEACON_PROCESSOR_AGGREGATED_ATTESTATION_REQUEUED_TOTAL: Result = try_create_int_counter( + "beacon_processor_aggregated_attestation_requeued_total", + "Total number of aggregated attestations that referenced an unknown block and were re-queued." + ); } lazy_static! { @@ -370,6 +378,24 @@ lazy_static! 
{ "beacon_block_gossip_slot_start_delay_time", "Duration between when the block is received and the start of the slot it belongs to.", ); + + /* + * Attestation reprocessing queue metrics. + */ + pub static ref BEACON_PROCESSOR_REPROCESSING_QUEUE_TOTAL: Result = + try_create_int_gauge_vec( + "beacon_processor_reprocessing_queue_total", + "Count of items in a reprocessing queue.", + &["type"] + ); + pub static ref BEACON_PROCESSOR_REPROCESSING_QUEUE_EXPIRED_ATTESTATIONS: Result = try_create_int_counter( + "beacon_processor_reprocessing_queue_expired_attestations", + "Number of queued attestations which have expired before a matching block has been found" + ); + pub static ref BEACON_PROCESSOR_REPROCESSING_QUEUE_MATCHED_ATTESTATIONS: Result = try_create_int_counter( + "beacon_processor_reprocessing_queue_matched_attestations", + "Number of queued attestations where as matching block has been imported" + ); } pub fn register_attestation_error(error: &AttnError) { diff --git a/consensus/types/src/eth_spec.rs b/consensus/types/src/eth_spec.rs index dbf70f78e3..8db6d10706 100644 --- a/consensus/types/src/eth_spec.rs +++ b/consensus/types/src/eth_spec.rs @@ -52,7 +52,7 @@ pub trait EthSpec: 'static + Default + Sync + Send + Clone + Debug + PartialEq + /* * Misc */ - type MaxValidatorsPerCommittee: Unsigned + Clone + Sync + Send + Debug + PartialEq + Eq + Unpin; + type MaxValidatorsPerCommittee: Unsigned + Clone + Sync + Send + Debug + PartialEq + Eq; /* * Time parameters */