diff --git a/beacon_node/network/src/beacon_processor/mod.rs b/beacon_node/network/src/beacon_processor/mod.rs index 66447845dc..240bf24285 100644 --- a/beacon_node/network/src/beacon_processor/mod.rs +++ b/beacon_node/network/src/beacon_processor/mod.rs @@ -42,9 +42,7 @@ use crate::sync::manager::BlockProcessType; use crate::{metrics, service::NetworkMessage, sync::SyncMessage}; use beacon_chain::blob_verification::BlockWrapper; use beacon_chain::parking_lot::Mutex; -use beacon_chain::{ - BeaconChain, BeaconChainTypes, ExecutedBlock, GossipVerifiedBlock, NotifyExecutionLayer, -}; +use beacon_chain::{BeaconChain, BeaconChainTypes, GossipVerifiedBlock, NotifyExecutionLayer}; use derivative::Derivative; use futures::stream::{Stream, StreamExt}; use futures::task::Poll; @@ -85,8 +83,6 @@ mod worker; use crate::beacon_processor::work_reprocessing_queue::QueuedGossipBlock; pub use worker::{ChainSegmentProcessId, GossipAggregatePackage, GossipAttestationPackage}; -use self::work_reprocessing_queue::QueuedExecutedBlock; - /// The maximum size of the channel for work events to the `BeaconProcessor`. /// /// Setting this too low will cause consensus messages to be dropped. @@ -223,7 +219,6 @@ pub const GOSSIP_ATTESTATION_BATCH: &str = "gossip_attestation_batch"; pub const GOSSIP_AGGREGATE: &str = "gossip_aggregate"; pub const GOSSIP_AGGREGATE_BATCH: &str = "gossip_aggregate_batch"; pub const GOSSIP_BLOCK: &str = "gossip_block"; -pub const EXECUTED_BLOCK: &str = "executed_block"; pub const GOSSIP_BLOCK_AND_BLOBS_SIDECAR: &str = "gossip_block_and_blobs_sidecar"; pub const DELAYED_IMPORT_BLOCK: &str = "delayed_import_block"; pub const GOSSIP_VOLUNTARY_EXIT: &str = "gossip_voluntary_exit"; @@ -746,18 +741,6 @@ impl std::convert::From> for WorkEvent { seen_timestamp, }, }, - ReadyWork::ExecutedBlock(QueuedExecutedBlock { - peer_id, - block, - seen_timestamp, - }) => Self { - drop_during_sync: false, - work: Work::ExecutedBlock { - peer_id, - block, - seen_timestamp, - }, - }, ReadyWork::RpcBlock(QueuedRpcBlock { block_root, block, @@ -889,11 +872,6 @@ pub enum Work { block: Box>, seen_timestamp: Duration, }, - ExecutedBlock { - peer_id: PeerId, - block: ExecutedBlock, - seen_timestamp: Duration, - }, GossipVoluntaryExit { message_id: MessageId, peer_id: PeerId, @@ -990,7 +968,6 @@ impl Work { Work::GossipAggregate { .. } => GOSSIP_AGGREGATE, Work::GossipAggregateBatch { .. } => GOSSIP_AGGREGATE_BATCH, Work::GossipBlock { .. } => GOSSIP_BLOCK, - Work::ExecutedBlock { .. } => EXECUTED_BLOCK, Work::GossipSignedBlobSidecar { .. } => GOSSIP_BLOCK_AND_BLOBS_SIDECAR, Work::DelayedImportBlock { .. } => DELAYED_IMPORT_BLOCK, Work::GossipVoluntaryExit { .. } => GOSSIP_VOLUNTARY_EXIT, @@ -1150,7 +1127,6 @@ impl BeaconProcessor { FifoQueue::new(MAX_GOSSIP_OPTIMISTIC_UPDATE_REPROCESS_QUEUE_LEN); // Using a FIFO queue since blocks need to be imported sequentially. - let mut executed_block_queue = FifoQueue::new(MAX_RPC_BLOCK_QUEUE_LEN); let mut rpc_block_queue = FifoQueue::new(MAX_RPC_BLOCK_QUEUE_LEN); let mut chain_segment_queue = FifoQueue::new(MAX_CHAIN_SEGMENT_QUEUE_LEN); let mut backfill_chain_segment = FifoQueue::new(MAX_CHAIN_SEGMENT_QUEUE_LEN); @@ -1267,9 +1243,6 @@ impl BeaconProcessor { // on the delayed ones. } else if let Some(item) = delayed_block_queue.pop() { self.spawn_worker(item, toolbox); - // Check availability pending blocks - } else if let Some(item) = executed_block_queue.pop() { - self.spawn_worker(item, toolbox); // Check gossip blocks before gossip attestations, since a block might be // required to verify some attestations. } else if let Some(item) = gossip_block_queue.pop() { @@ -1489,9 +1462,6 @@ impl BeaconProcessor { Work::GossipBlock { .. } => { gossip_block_queue.push(work, work_id, &self.log) } - Work::ExecutedBlock { .. } => { - gossip_block_queue.push(work, work_id, &self.log) - } Work::GossipSignedBlobSidecar { .. } => { gossip_block_and_blobs_sidecar_queue.push(work, work_id, &self.log) } @@ -1772,20 +1742,6 @@ impl BeaconProcessor { ) .await }), - Work::ExecutedBlock { - peer_id, - block, - seen_timestamp, - } => task_spawner.spawn_async(async move { - worker - .process_execution_verified_block( - peer_id, - block, - work_reprocessing_tx, - seen_timestamp, - ) - .await - }), /* * Verification for blobs sidecars received on gossip. */ diff --git a/beacon_node/network/src/beacon_processor/work_reprocessing_queue.rs b/beacon_node/network/src/beacon_processor/work_reprocessing_queue.rs index 82111fa6f1..e344a61132 100644 --- a/beacon_node/network/src/beacon_processor/work_reprocessing_queue.rs +++ b/beacon_node/network/src/beacon_processor/work_reprocessing_queue.rs @@ -14,15 +14,13 @@ use super::MAX_SCHEDULED_WORK_QUEUE_LEN; use crate::metrics; use crate::sync::manager::BlockProcessType; use beacon_chain::blob_verification::{AsBlock, BlockWrapper}; -use beacon_chain::{ - BeaconChainTypes, ExecutedBlock, GossipVerifiedBlock, MAXIMUM_GOSSIP_CLOCK_DISPARITY, -}; +use beacon_chain::{BeaconChainTypes, GossipVerifiedBlock, MAXIMUM_GOSSIP_CLOCK_DISPARITY}; use fnv::FnvHashMap; use futures::task::Poll; use futures::{Stream, StreamExt}; use lighthouse_network::{MessageId, PeerId}; use logging::TimeLatch; -use slog::{crit, debug, error, trace, warn, Logger}; +use slog::{debug, error, trace, warn, Logger}; use slot_clock::SlotClock; use std::collections::{HashMap, HashSet}; use std::pin::Pin; @@ -30,7 +28,6 @@ use std::task::Context; use std::time::Duration; use task_executor::TaskExecutor; use tokio::sync::mpsc::{self, Receiver, Sender}; -use tokio::time::error::Error as TimeError; use tokio_util::time::delay_queue::{DelayQueue, Key as DelayKey}; use types::{ Attestation, EthSpec, Hash256, LightClientOptimisticUpdate, SignedAggregateAndProof, SubnetId, @@ -55,19 +52,11 @@ pub const QUEUED_LIGHT_CLIENT_UPDATE_DELAY: Duration = Duration::from_secs(12); /// For how long to queue rpc blocks before sending them back for reprocessing. pub const QUEUED_RPC_BLOCK_DELAY: Duration = Duration::from_secs(3); -/// For how long to queue executed blocks before sending them back for reprocessing. -pub const QUEUED_EXECUTED_BLOCK_DELAY: Duration = Duration::from_secs(12); - /// Set an arbitrary upper-bound on the number of queued blocks to avoid DoS attacks. The fact that /// we signature-verify blocks before putting them in the queue *should* protect against this, but /// it's nice to have extra protection. const MAXIMUM_QUEUED_BLOCKS: usize = 16; -/// An `ExecutedBlock` contains the entire `BeaconState`, so we shouldn't be storing too many of them -/// to avoid getting DoS'd by the block proposer. -/// TODO(pawan): revise the max blocks -const MAXIMUM_QUEUED_EXECUTED_BLOCKS: usize = 4; - /// How many attestations we keep before new ones get dropped. const MAXIMUM_QUEUED_ATTESTATIONS: usize = 16_384; @@ -87,9 +76,6 @@ pub enum ReprocessQueueMessage { block_root: Hash256, parent_root: Hash256, }, - ExecutedBlock(QueuedExecutedBlock), - /// The blobs corresponding to a `block_root` are now fully available. - BlobsAvailable(Hash256), /// An unaggregated attestation that references an unknown block. UnknownBlockUnaggregate(QueuedUnaggregate), /// An aggregated attestation that references an unknown block. @@ -101,7 +87,6 @@ pub enum ReprocessQueueMessage { /// Events sent by the scheduler once they are ready for re-processing. pub enum ReadyWork { GossipBlock(QueuedGossipBlock), - ExecutedBlock(QueuedExecutedBlock), RpcBlock(QueuedRpcBlock), Unaggregate(QueuedUnaggregate), Aggregate(QueuedAggregate), @@ -145,14 +130,6 @@ pub struct QueuedGossipBlock { pub seen_timestamp: Duration, } -/// A block that has been fully verified and is pending data availability -/// and import into the beacon chain. -pub struct QueuedExecutedBlock { - pub peer_id: PeerId, - pub block: ExecutedBlock, - pub seen_timestamp: Duration, -} - /// A block that arrived for processing when the same block was being imported over gossip. /// It is queued for later import. pub struct QueuedRpcBlock { @@ -169,9 +146,6 @@ pub struct QueuedRpcBlock { enum InboundEvent { /// A gossip block that was queued for later processing and is ready for import. ReadyGossipBlock(QueuedGossipBlock), - /// An executed block that was queued for blob availability and is now - /// ready for import - ReadyExecutedBlock(QueuedExecutedBlock), /// A rpc block that was queued because the same gossip block was being imported /// will now be retried for import. ReadyRpcBlock(QueuedRpcBlock), @@ -179,8 +153,6 @@ enum InboundEvent { ReadyAttestation(QueuedAttestationId), /// A light client update that is ready for re-processing. ReadyLightClientUpdate(QueuedLightClientUpdateId), - /// A `DelayQueue` returned an error. - DelayQueueError(TimeError, &'static str), /// A message sent to the `ReprocessQueue` Msg(ReprocessQueueMessage), } @@ -195,8 +167,6 @@ struct ReprocessQueue { /* Queues */ /// Queue to manage scheduled early blocks. gossip_block_delay_queue: DelayQueue>, - /// Queue to manage availability pending blocks. - executed_block_delay_queue: DelayQueue>, /// Queue to manage scheduled early blocks. rpc_block_delay_queue: DelayQueue>, /// Queue to manage scheduled attestations. @@ -207,8 +177,6 @@ struct ReprocessQueue { /* Queued items */ /// Queued blocks. queued_gossip_block_roots: HashSet, - /// Queued availability pending blocks. - queued_executed_block_roots: HashMap, /// Queued aggregated attestations. queued_aggregates: FnvHashMap, DelayKey)>, /// Queued attestations. @@ -272,17 +240,6 @@ impl Stream for ReprocessQueue { Poll::Ready(None) | Poll::Pending => (), } - match self.executed_block_delay_queue.poll_expired(cx) { - Poll::Ready(Some(queued_block)) => { - return Poll::Ready(Some(InboundEvent::ReadyExecutedBlock( - queued_block.into_inner(), - ))); - } - // `Poll::Ready(None)` means that there are no more entries in the delay queue and we - // will continue to get this result until something else is added into the queue. - Poll::Ready(None) | Poll::Pending => (), - } - match self.rpc_block_delay_queue.poll_expired(cx) { Poll::Ready(Some(queued_block)) => { return Poll::Ready(Some(InboundEvent::ReadyRpcBlock(queued_block.into_inner()))); @@ -341,12 +298,10 @@ pub fn spawn_reprocess_scheduler( work_reprocessing_rx, ready_work_tx, gossip_block_delay_queue: DelayQueue::new(), - executed_block_delay_queue: DelayQueue::new(), rpc_block_delay_queue: DelayQueue::new(), attestations_delay_queue: DelayQueue::new(), lc_updates_delay_queue: DelayQueue::new(), queued_gossip_block_roots: HashSet::new(), - queued_executed_block_roots: HashMap::new(), queued_lc_updates: FnvHashMap::default(), queued_aggregates: FnvHashMap::default(), queued_unaggregates: FnvHashMap::default(), @@ -441,59 +396,6 @@ impl ReprocessQueue { } } } - InboundEvent::Msg(ExecutedBlock(executed_block)) => { - if self.executed_block_delay_queue.len() >= MAXIMUM_QUEUED_EXECUTED_BLOCKS { - // TODO(use your own debounce) - if self.rpc_block_debounce.elapsed() { - warn!( - log, - "Executed blocks queue is full"; - "queue_size" => MAXIMUM_QUEUED_EXECUTED_BLOCKS, - "msg" => "check system clock" - ); - } - // TODO(pawan): block would essentially get dropped here - // can the devs do something? - } - // Queue the block for a slot - let block_root = executed_block.block.block_root; - if !self.queued_executed_block_roots.contains_key(&block_root) { - let key = self - .executed_block_delay_queue - .insert(executed_block, QUEUED_EXECUTED_BLOCK_DELAY); - - self.queued_executed_block_roots.insert(block_root, key); - } - } - InboundEvent::Msg(BlobsAvailable(block_root)) => { - match self.queued_executed_block_roots.remove(&block_root) { - None => { - // Log an error to alert that we've made a bad assumption about how this - // program works, but still process the block anyway. - error!( - log, - "Unknown executed block in delay queue"; - "block_root" => ?block_root - ); - } - Some(key) => { - if let Some(executed_block) = - self.executed_block_delay_queue.try_remove(&key) - { - if self - .ready_work_tx - .try_send(ReadyWork::ExecutedBlock(executed_block.into_inner())) - .is_err() - { - error!( - log, - "Failed to pop queued block"; - ); - } - } - } - } - } // A rpc block arrived for processing at the same time when a gossip block // for the same block hash is being imported. We wait for `QUEUED_RPC_BLOCK_DELAY` // and then send the rpc block back for processing assuming the gossip import @@ -747,24 +649,6 @@ impl ReprocessQueue { } } } - InboundEvent::ReadyExecutedBlock(executed_block) => { - let block_root = executed_block.block.block_root; - - if self - .queued_executed_block_roots - .remove(&block_root) - .is_none() - { - // Log an error to alert that we've made a bad assumption about how this - // program works, but still process the block anyway. - error!( - log, - "Unknown block in delay queue"; - "block_root" => ?block_root - ); - } - // TODO(pawan): just dropping the block, rethink what can be done here - } // A block that was queued for later processing is now ready to be processed. InboundEvent::ReadyGossipBlock(ready_block) => { let block_root = ready_block.block.block_root; @@ -791,14 +675,6 @@ impl ReprocessQueue { } } - InboundEvent::DelayQueueError(e, queue_name) => { - crit!( - log, - "Failed to poll queue"; - "queue" => queue_name, - "e" => ?e - ) - } InboundEvent::ReadyAttestation(queued_id) => { metrics::inc_counter( &metrics::BEACON_PROCESSOR_REPROCESSING_QUEUE_EXPIRED_ATTESTATIONS, diff --git a/beacon_node/network/src/beacon_processor/worker/gossip_methods.rs b/beacon_node/network/src/beacon_processor/worker/gossip_methods.rs index 27b7104d99..f60843ab9b 100644 --- a/beacon_node/network/src/beacon_processor/worker/gossip_methods.rs +++ b/beacon_node/network/src/beacon_processor/worker/gossip_methods.rs @@ -18,7 +18,6 @@ use operation_pool::ReceivedPreCapella; use slog::{crit, debug, error, info, trace, warn}; use slot_clock::SlotClock; use ssz::Encode; -use std::sync::Arc; use std::time::{Duration, SystemTime, UNIX_EPOCH}; use store::hot_cold_store::HotColdDBError; use tokio::sync::mpsc; @@ -31,8 +30,8 @@ use types::{ use super::{ super::work_reprocessing_queue::{ - QueuedAggregate, QueuedExecutedBlock, QueuedGossipBlock, QueuedLightClientUpdate, - QueuedUnaggregate, ReprocessQueueMessage, + QueuedAggregate, QueuedGossipBlock, QueuedLightClientUpdate, QueuedUnaggregate, + ReprocessQueueMessage, }, Worker, };