diff --git a/beacon_node/network/src/beacon_processor/mod.rs b/beacon_node/network/src/beacon_processor/mod.rs index 6f75e1fb23..a08f34f707 100644 --- a/beacon_node/network/src/beacon_processor/mod.rs +++ b/beacon_node/network/src/beacon_processor/mod.rs @@ -66,7 +66,7 @@ use types::{ SyncCommitteeMessage, SyncSubnetId, }; use work_reprocessing_queue::{ - spawn_reprocess_scheduler, QueuedAggregate, QueuedUnaggregate, ReadyWork, + spawn_reprocess_scheduler, QueuedAggregate, QueuedRpcBlock, QueuedUnaggregate, ReadyWork, }; use worker::{Toolbox, Worker}; @@ -75,7 +75,7 @@ mod tests; mod work_reprocessing_queue; mod worker; -use crate::beacon_processor::work_reprocessing_queue::QueuedBlock; +use crate::beacon_processor::work_reprocessing_queue::QueuedGossipBlock; pub use worker::{ ChainSegmentProcessId, FailureMode, GossipAggregatePackage, GossipAttestationPackage, }; @@ -501,6 +501,7 @@ impl WorkEvent { block, seen_timestamp, process_type, + should_process: true, }, } } @@ -565,7 +566,7 @@ impl WorkEvent { impl std::convert::From> for WorkEvent { fn from(ready_work: ReadyWork) -> Self { match ready_work { - ReadyWork::Block(QueuedBlock { + ReadyWork::Block(QueuedGossipBlock { peer_id, block, seen_timestamp, @@ -577,6 +578,20 @@ impl std::convert::From> for WorkEvent { seen_timestamp, }, }, + ReadyWork::RpcBlock(QueuedRpcBlock { + block, + seen_timestamp, + process_type, + should_process, + }) => Self { + drop_during_sync: false, + work: Work::RpcBlock { + block, + seen_timestamp, + process_type, + should_process, + }, + }, ReadyWork::Unaggregate(QueuedUnaggregate { peer_id, message_id, @@ -695,6 +710,7 @@ pub enum Work { block: Arc>, seen_timestamp: Duration, process_type: BlockProcessType, + should_process: bool, }, ChainSegment { process_id: ChainSegmentProcessId, @@ -1521,12 +1537,14 @@ impl BeaconProcessor { block, seen_timestamp, process_type, + should_process, } => task_spawner.spawn_async(worker.process_rpc_block( block, seen_timestamp, process_type, 
work_reprocessing_tx, duplicate_cache, + should_process, )), /* * Verification for a chain segment (multiple blocks). diff --git a/beacon_node/network/src/beacon_processor/tests.rs b/beacon_node/network/src/beacon_processor/tests.rs index a39ca2ec33..d437cf0bed 100644 --- a/beacon_node/network/src/beacon_processor/tests.rs +++ b/beacon_node/network/src/beacon_processor/tests.rs @@ -1,7 +1,9 @@ #![cfg(not(debug_assertions))] // Tests are too slow in debug. #![cfg(test)] -use crate::beacon_processor::work_reprocessing_queue::QUEUED_ATTESTATION_DELAY; +use crate::beacon_processor::work_reprocessing_queue::{ + QUEUED_ATTESTATION_DELAY, QUEUED_RPC_BLOCK_DELAY, +}; use crate::beacon_processor::*; use crate::{service::NetworkMessage, sync::SyncMessage}; use beacon_chain::test_utils::{ @@ -54,6 +56,7 @@ struct TestRig { work_journal_rx: mpsc::Receiver<&'static str>, _network_rx: mpsc::UnboundedReceiver>, _sync_rx: mpsc::UnboundedReceiver>, + duplicate_cache: DuplicateCache, _harness: BeaconChainHarness, } @@ -185,6 +188,7 @@ impl TestRig { let (work_journal_tx, work_journal_rx) = mpsc::channel(16_364); + let duplicate_cache = DuplicateCache::default(); BeaconProcessor { beacon_chain: Arc::downgrade(&chain), network_tx, @@ -193,7 +197,7 @@ impl TestRig { executor, max_workers: cmp::max(1, num_cpus::get()), current_workers: 0, - importing_blocks: Default::default(), + importing_blocks: duplicate_cache.clone(), log: log.clone(), } .spawn_manager(beacon_processor_rx, Some(work_journal_tx)); @@ -211,6 +215,7 @@ impl TestRig { work_journal_rx, _network_rx, _sync_rx, + duplicate_cache, _harness: harness, } } @@ -246,6 +251,15 @@ impl TestRig { self.beacon_processor_tx.try_send(event).unwrap(); } + pub fn enqueue_single_lookup_rpc_block(&self) { + let event = WorkEvent::rpc_beacon_block( + self.next_block.clone(), + std::time::Duration::default(), + BlockProcessType::SingleBlock { id: 1 }, + ); + self.beacon_processor_tx.try_send(event).unwrap(); + } + pub fn 
enqueue_unaggregated_attestation(&self) { let (attestation, subnet_id) = self.attestations.first().unwrap().clone(); self.beacon_processor_tx @@ -828,3 +842,33 @@ async fn import_misc_gossip_ops() { "op pool should have one more exit" ); } + +/// Ensure that the flow of sending an rpc block through the reprocessing queue +/// works when the duplicate cache handle is held by another task. +#[tokio::test] +async fn test_rpc_block_reprocessing() { + let mut rig = TestRig::new(SMALL_CHAIN).await; + let next_block_root = rig.next_block.canonical_root(); + // Insert the next block into the duplicate cache manually + let handle = rig.duplicate_cache.check_and_insert(next_block_root); + rig.enqueue_single_lookup_rpc_block(); + + rig.assert_event_journal(&[RPC_BLOCK, WORKER_FREED, NOTHING_TO_DO]) + .await; + // next_block shouldn't be processed since it couldn't get the + // duplicate cache handle + assert_ne!(next_block_root, rig.head_root()); + + drop(handle); + + // The block should arrive at the beacon processor again after + // the specified delay. + tokio::time::sleep(QUEUED_RPC_BLOCK_DELAY).await; + + rig.assert_event_journal(&[RPC_BLOCK]).await; + // Add an extra delay for block processing + tokio::time::sleep(Duration::from_millis(10)).await; + // head should update to next block now since the duplicate + // cache handle was dropped. + assert_eq!(next_block_root, rig.head_root()); +} diff --git a/beacon_node/network/src/beacon_processor/work_reprocessing_queue.rs b/beacon_node/network/src/beacon_processor/work_reprocessing_queue.rs index 33c15cf06b..efe8d3bf12 100644 --- a/beacon_node/network/src/beacon_processor/work_reprocessing_queue.rs +++ b/beacon_node/network/src/beacon_processor/work_reprocessing_queue.rs @@ -12,6 +12,7 @@ //! block will be re-queued until their block is imported, or until they expire.
use super::MAX_SCHEDULED_WORK_QUEUE_LEN; use crate::metrics; +use crate::sync::manager::BlockProcessType; use beacon_chain::{BeaconChainTypes, GossipVerifiedBlock, MAXIMUM_GOSSIP_CLOCK_DISPARITY}; use fnv::FnvHashMap; use futures::task::Poll; @@ -22,16 +23,18 @@ use slog::{crit, debug, error, warn, Logger}; use slot_clock::SlotClock; use std::collections::{HashMap, HashSet}; use std::pin::Pin; +use std::sync::Arc; use std::task::Context; use std::time::Duration; use task_executor::TaskExecutor; use tokio::sync::mpsc::{self, Receiver, Sender}; use tokio::time::error::Error as TimeError; use tokio_util::time::delay_queue::{DelayQueue, Key as DelayKey}; -use types::{Attestation, EthSpec, Hash256, SignedAggregateAndProof, SubnetId}; +use types::{Attestation, EthSpec, Hash256, SignedAggregateAndProof, SignedBeaconBlock, SubnetId}; const TASK_NAME: &str = "beacon_processor_reprocess_queue"; -const BLOCKS: &str = "blocks"; +const GOSSIP_BLOCKS: &str = "gossip_blocks"; +const RPC_BLOCKS: &str = "rpc_blocks"; const ATTESTATIONS: &str = "attestations"; /// Queue blocks for re-processing with an `ADDITIONAL_QUEUED_BLOCK_DELAY` after the slot starts. @@ -41,6 +44,9 @@ const ADDITIONAL_QUEUED_BLOCK_DELAY: Duration = Duration::from_millis(5); /// For how long to queue aggregated and unaggregated attestations for re-processing. pub const QUEUED_ATTESTATION_DELAY: Duration = Duration::from_secs(12); +/// For how long to queue rpc blocks before sending them back for reprocessing. +pub const QUEUED_RPC_BLOCK_DELAY: Duration = Duration::from_secs(3); + /// Set an arbitrary upper-bound on the number of queued blocks to avoid DoS attacks. The fact that /// we signature-verify blocks before putting them in the queue *should* protect against this, but /// it's nice to have extra protection. @@ -52,7 +58,10 @@ const MAXIMUM_QUEUED_ATTESTATIONS: usize = 16_384; /// Messages that the scheduler can receive. 
pub enum ReprocessQueueMessage { /// A block that has been received early and we should queue for later processing. - EarlyBlock(QueuedBlock), + EarlyBlock(QueuedGossipBlock), + /// A gossip block for hash `X` is being imported; we should queue the rpc block for the same + /// hash until the gossip block is imported. + RpcBlock(QueuedRpcBlock), /// A block that was successfully processed. We use this to handle attestations for unknown /// blocks. BlockImported(Hash256), @@ -64,7 +73,8 @@ pub enum ReprocessQueueMessage { /// Events sent by the scheduler once they are ready for re-processing. pub enum ReadyWork { - Block(QueuedBlock), + Block(QueuedGossipBlock), + RpcBlock(QueuedRpcBlock), Unaggregate(QueuedUnaggregate), Aggregate(QueuedAggregate), } @@ -90,16 +100,30 @@ pub struct QueuedAggregate { } /// A block that arrived early and has been queued for later import. -pub struct QueuedBlock { +pub struct QueuedGossipBlock { pub peer_id: PeerId, pub block: Box>, pub seen_timestamp: Duration, } +/// A block that arrived for processing when the same block was being imported over gossip. +/// It is queued for later import. +pub struct QueuedRpcBlock { + pub block: Arc>, + pub process_type: BlockProcessType, + pub seen_timestamp: Duration, + /// Indicates if the beacon chain should process this block or not. + /// We use this to ignore block processing when rpc block queues are full. + pub should_process: bool, +} + /// Unifies the different messages processed by the block delay queue. enum InboundEvent { - /// A block that was queued for later processing and is ready for import. - ReadyBlock(QueuedBlock), + /// A gossip block that was queued for later processing and is ready for import. + ReadyGossipBlock(QueuedGossipBlock), + /// An rpc block that was queued because the same block was being imported over gossip; + /// it will now be retried for import. + ReadyRpcBlock(QueuedRpcBlock), /// An aggregated or unaggregated attestation is ready for re-processing.
ReadyAttestation(QueuedAttestationId), /// A `DelayQueue` returned an error. @@ -117,13 +141,15 @@ struct ReprocessQueue { /* Queues */ /// Queue to manage scheduled early blocks. - block_delay_queue: DelayQueue>, + gossip_block_delay_queue: DelayQueue>, + /// Queue to manage rpc blocks scheduled for reprocessing. + rpc_block_delay_queue: DelayQueue>, /// Queue to manage scheduled attestations. attestations_delay_queue: DelayQueue, /* Queued items */ /// Queued blocks. - queued_block_roots: HashSet, + queued_gossip_block_roots: HashSet, /// Queued aggregated attestations. queued_aggregates: FnvHashMap, DelayKey)>, /// Queued attestations. @@ -135,6 +161,7 @@ struct ReprocessQueue { /// Next attestation id, used for both aggregated and unaggregated attestations next_attestation: usize, early_block_debounce: TimeLatch, + rpc_block_debounce: TimeLatch, attestation_delay_debounce: TimeLatch, } @@ -167,12 +194,26 @@ impl Stream for ReprocessQueue { // // The sequential nature of blockchains means it is generally better to try and import all // existing blocks before new ones. - match self.block_delay_queue.poll_expired(cx) { + match self.gossip_block_delay_queue.poll_expired(cx) { Poll::Ready(Some(Ok(queued_block))) => { - return Poll::Ready(Some(InboundEvent::ReadyBlock(queued_block.into_inner()))); + return Poll::Ready(Some(InboundEvent::ReadyGossipBlock( + queued_block.into_inner(), + ))); } Poll::Ready(Some(Err(e))) => { - return Poll::Ready(Some(InboundEvent::DelayQueueError(e, "block_queue"))); + return Poll::Ready(Some(InboundEvent::DelayQueueError(e, "gossip_block_queue"))); + } + // `Poll::Ready(None)` means that there are no more entries in the delay queue and we + // will continue to get this result until something else is added into the queue.
+ Poll::Ready(None) | Poll::Pending => (), + } + + match self.rpc_block_delay_queue.poll_expired(cx) { + Poll::Ready(Some(Ok(queued_block))) => { + return Poll::Ready(Some(InboundEvent::ReadyRpcBlock(queued_block.into_inner()))); + } + Poll::Ready(Some(Err(e))) => { + return Poll::Ready(Some(InboundEvent::DelayQueueError(e, "rpc_block_queue"))); } // `Poll::Ready(None)` means that there are no more entries in the delay queue and we // will continue to get this result until something else is added into the queue. @@ -219,14 +260,16 @@ pub fn spawn_reprocess_scheduler( let mut queue = ReprocessQueue { work_reprocessing_rx, ready_work_tx, - block_delay_queue: DelayQueue::new(), + gossip_block_delay_queue: DelayQueue::new(), + rpc_block_delay_queue: DelayQueue::new(), attestations_delay_queue: DelayQueue::new(), - queued_block_roots: HashSet::new(), + queued_gossip_block_roots: HashSet::new(), queued_aggregates: FnvHashMap::default(), queued_unaggregates: FnvHashMap::default(), awaiting_attestations_per_root: HashMap::new(), next_attestation: 0, early_block_debounce: TimeLatch::default(), + rpc_block_debounce: TimeLatch::default(), attestation_delay_debounce: TimeLatch::default(), }; @@ -259,13 +302,13 @@ impl ReprocessQueue { let block_root = early_block.block.block_root; // Don't add the same block to the queue twice. This prevents DoS attacks. - if self.queued_block_roots.contains(&block_root) { + if self.queued_gossip_block_roots.contains(&block_root) { return; } if let Some(duration_till_slot) = slot_clock.duration_to_slot(block_slot) { // Check to ensure this won't over-fill the queue. 
- if self.queued_block_roots.len() >= MAXIMUM_QUEUED_BLOCKS { + if self.queued_gossip_block_roots.len() >= MAXIMUM_QUEUED_BLOCKS { if self.early_block_debounce.elapsed() { warn!( log, @@ -278,10 +321,10 @@ impl ReprocessQueue { return; } - self.queued_block_roots.insert(block_root); + self.queued_gossip_block_roots.insert(block_root); // Queue the block until the start of the appropriate slot, plus // `ADDITIONAL_QUEUED_BLOCK_DELAY`. - self.block_delay_queue.insert( + self.gossip_block_delay_queue.insert( early_block, duration_till_slot + ADDITIONAL_QUEUED_BLOCK_DELAY, ); @@ -311,6 +354,58 @@ impl ReprocessQueue { } } } + // A rpc block arrived for processing at the same time when a gossip block + // for the same block hash is being imported. We wait for `QUEUED_RPC_BLOCK_DELAY` + // and then send the rpc block back for processing assuming the gossip import + // has completed by then. + InboundEvent::Msg(RpcBlock(mut rpc_block)) => { + // Check to ensure this won't over-fill the queue. + if self.rpc_block_delay_queue.len() >= MAXIMUM_QUEUED_BLOCKS { + if self.rpc_block_debounce.elapsed() { + warn!( + log, + "RPC blocks queue is full"; + "queue_size" => MAXIMUM_QUEUED_BLOCKS, + "msg" => "check system clock" + ); + } + // Return the block to the beacon processor signalling to + // ignore processing for this block + rpc_block.should_process = false; + if self + .ready_work_tx + .try_send(ReadyWork::RpcBlock(rpc_block)) + .is_err() + { + error!( + log, + "Failed to send rpc block to beacon processor"; + ); + } + return; + } + + // Queue the block for 1/4th of a slot + self.rpc_block_delay_queue + .insert(rpc_block, QUEUED_RPC_BLOCK_DELAY); + } + InboundEvent::ReadyRpcBlock(queued_rpc_block) => { + debug!( + log, + "Sending rpc block for reprocessing"; + "block_root" => %queued_rpc_block.block.canonical_root() + ); + if self + .ready_work_tx + .try_send(ReadyWork::RpcBlock(queued_rpc_block)) + .is_err() + { + error!( + log, + "Failed to send rpc block to beacon 
processor"; + ); + } + } InboundEvent::Msg(UnknownBlockAggregate(queued_aggregate)) => { if self.attestations_delay_queue.len() >= MAXIMUM_QUEUED_ATTESTATIONS { if self.attestation_delay_debounce.elapsed() { @@ -423,10 +518,10 @@ impl ReprocessQueue { } } // A block that was queued for later processing is now ready to be processed. - InboundEvent::ReadyBlock(ready_block) => { + InboundEvent::ReadyGossipBlock(ready_block) => { let block_root = ready_block.block.block_root; - if !self.queued_block_roots.remove(&block_root) { + if !self.queued_gossip_block_roots.remove(&block_root) { // Log an error to alert that we've made a bad assumption about how this // program works, but still process the block anyway. error!( @@ -499,8 +594,13 @@ impl ReprocessQueue { metrics::set_gauge_vec( &metrics::BEACON_PROCESSOR_REPROCESSING_QUEUE_TOTAL, - &[BLOCKS], - self.block_delay_queue.len() as i64, + &[GOSSIP_BLOCKS], + self.gossip_block_delay_queue.len() as i64, + ); + metrics::set_gauge_vec( + &metrics::BEACON_PROCESSOR_REPROCESSING_QUEUE_TOTAL, + &[RPC_BLOCKS], + self.rpc_block_delay_queue.len() as i64, ); metrics::set_gauge_vec( &metrics::BEACON_PROCESSOR_REPROCESSING_QUEUE_TOTAL, diff --git a/beacon_node/network/src/beacon_processor/worker/gossip_methods.rs b/beacon_node/network/src/beacon_processor/worker/gossip_methods.rs index 56f38c7f22..2dc02a31b3 100644 --- a/beacon_node/network/src/beacon_processor/worker/gossip_methods.rs +++ b/beacon_node/network/src/beacon_processor/worker/gossip_methods.rs @@ -25,7 +25,7 @@ use types::{ use super::{ super::work_reprocessing_queue::{ - QueuedAggregate, QueuedBlock, QueuedUnaggregate, ReprocessQueueMessage, + QueuedAggregate, QueuedGossipBlock, QueuedUnaggregate, ReprocessQueueMessage, }, Worker, }; @@ -857,7 +857,7 @@ impl Worker { metrics::inc_counter(&metrics::BEACON_PROCESSOR_GOSSIP_BLOCK_REQUEUED_TOTAL); if reprocess_tx - .try_send(ReprocessQueueMessage::EarlyBlock(QueuedBlock { + 
.try_send(ReprocessQueueMessage::EarlyBlock(QueuedGossipBlock { peer_id, block: Box::new(verified_block), seen_timestamp: seen_duration, diff --git a/beacon_node/network/src/beacon_processor/worker/sync_methods.rs b/beacon_node/network/src/beacon_processor/worker/sync_methods.rs index 804cfbe463..84e3c95c69 100644 --- a/beacon_node/network/src/beacon_processor/worker/sync_methods.rs +++ b/beacon_node/network/src/beacon_processor/worker/sync_methods.rs @@ -1,6 +1,7 @@ use std::time::Duration; use super::{super::work_reprocessing_queue::ReprocessQueueMessage, Worker}; +use crate::beacon_processor::work_reprocessing_queue::QueuedRpcBlock; use crate::beacon_processor::worker::FUTURE_SLOT_TOLERANCE; use crate::beacon_processor::DuplicateCache; use crate::metrics; @@ -53,16 +54,37 @@ impl Worker { process_type: BlockProcessType, reprocess_tx: mpsc::Sender>, duplicate_cache: DuplicateCache, + should_process: bool, ) { + if !should_process { + // Sync handles these results + self.send_sync_message(SyncMessage::BlockProcessed { + process_type, + result: crate::sync::manager::BlockProcessResult::Ignored, + }); + return; + } // Check if the block is already being imported through another source let handle = match duplicate_cache.check_and_insert(block.canonical_root()) { Some(handle) => handle, None => { - // Sync handles these results - self.send_sync_message(SyncMessage::BlockProcessed { + debug!( + self.log, + "Gossip block is being processed"; + "action" => "sending rpc block to reprocessing queue", + "block_root" => %block.canonical_root(), + ); + // Send message to work reprocess queue to retry the block + let reprocess_msg = ReprocessQueueMessage::RpcBlock(QueuedRpcBlock { + block: block.clone(), process_type, - result: Err(BlockError::BlockIsAlreadyKnown), + seen_timestamp, + should_process: true, }); + + if reprocess_tx.try_send(reprocess_msg).is_err() { + error!(self.log, "Failed to inform block import"; "source" => "rpc", "block_root" => %block.canonical_root()) + 
}; return; } }; @@ -95,7 +117,7 @@ impl Worker { // Sync handles these results self.send_sync_message(SyncMessage::BlockProcessed { process_type, - result: result.map(|_| ()), + result: result.into(), }); // Drop the handle to remove the entry from the cache diff --git a/beacon_node/network/src/sync/block_lookups/mod.rs b/beacon_node/network/src/sync/block_lookups/mod.rs index 99df8e4a66..e32770c592 100644 --- a/beacon_node/network/src/sync/block_lookups/mod.rs +++ b/beacon_node/network/src/sync/block_lookups/mod.rs @@ -19,6 +19,7 @@ use self::{ single_block_lookup::SingleBlockRequest, }; +use super::manager::BlockProcessResult; use super::BatchProcessResult; use super::{ manager::{BlockProcessType, Id}, @@ -247,7 +248,7 @@ impl BlockLookups { | VerifyError::ExtraBlocksReturned => { let e = e.into(); warn!(self.log, "Peer sent invalid response to parent request."; - "peer_id" => %peer_id, "reason" => e); + "peer_id" => %peer_id, "reason" => %e); // We do not tolerate these kinds of errors. We will accept a few but these are signs // of a faulty peer. 
@@ -381,7 +382,7 @@ impl BlockLookups { pub fn single_block_processed( &mut self, id: Id, - result: Result<(), BlockError>, + result: BlockProcessResult, cx: &mut SyncNetworkContext, ) { let mut req = match self.single_block_lookups.remove(&id) { @@ -403,52 +404,62 @@ impl BlockLookups { Err(_) => return, }; - if let Err(e) = &result { - trace!(self.log, "Single block processing failed"; "block" => %root, "error" => %e); - } else { - trace!(self.log, "Single block processing succeeded"; "block" => %root); - } - - if let Err(e) = result { - match e { - BlockError::BlockIsAlreadyKnown => { - // No error here - } - BlockError::BeaconChainError(e) => { - // Internal error - error!(self.log, "Beacon chain error processing single block"; "block_root" => %root, "error" => ?e); - } - BlockError::ParentUnknown(block) => { - self.search_parent(block, peer_id, cx); - } - - e @ BlockError::ExecutionPayloadError(ExecutionPayloadError::RequestFailed(_)) - | e @ BlockError::ExecutionPayloadError( - ExecutionPayloadError::NoExecutionConnection, - ) => { - // These errors indicate that the execution layer is offline - // and failed to validate the execution payload. Do not downscore peer. - debug!( - self.log, - "Single block lookup failed. Execution layer is offline"; - "root" => %root, - "error" => ?e - ); - } - other => { - warn!(self.log, "Peer sent invalid block in single block lookup"; "root" => %root, "error" => ?other, "peer_id" => %peer_id); - cx.report_peer( - peer_id, - PeerAction::MidToleranceError, - "single_block_failure", - ); - - // Try it again if possible. 
- req.register_failure(); - if let Ok((peer_id, request)) = req.request_block() { - if let Ok(request_id) = cx.single_block_lookup_request(peer_id, request) { - // insert with the new id - self.single_block_lookups.insert(request_id, req); + match result { + BlockProcessResult::Ok => { + trace!(self.log, "Single block processing succeeded"; "block" => %root); + } + BlockProcessResult::Ignored => { + // Beacon processor signalled to ignore the block processing result. + // This implies that the cpu is overloaded. Drop the request. + warn!( + self.log, + "Single block processing was ignored, cpu might be overloaded"; + "action" => "dropping single block request" + ); + } + BlockProcessResult::Err(e) => { + trace!(self.log, "Single block processing failed"; "block" => %root, "error" => %e); + match e { + BlockError::BlockIsAlreadyKnown => { + // No error here + } + BlockError::BeaconChainError(e) => { + // Internal error + error!(self.log, "Beacon chain error processing single block"; "block_root" => %root, "error" => ?e); + } + BlockError::ParentUnknown(block) => { + self.search_parent(block, peer_id, cx); + } + e @ BlockError::ExecutionPayloadError(ExecutionPayloadError::RequestFailed( + _, + )) + | e @ BlockError::ExecutionPayloadError( + ExecutionPayloadError::NoExecutionConnection, + ) => { + // These errors indicate that the execution layer is offline + // and failed to validate the execution payload. Do not downscore peer. + debug!( + self.log, + "Single block lookup failed. Execution layer is offline"; + "root" => %root, + "error" => ?e + ); + } + other => { + warn!(self.log, "Peer sent invalid block in single block lookup"; "root" => %root, "error" => ?other, "peer_id" => %peer_id); + cx.report_peer( + peer_id, + PeerAction::MidToleranceError, + "single_block_failure", + ); + // Try it again if possible. 
+ req.register_failure(); + if let Ok((peer_id, request)) = req.request_block() { + if let Ok(request_id) = cx.single_block_lookup_request(peer_id, request) + { + // insert with the new id + self.single_block_lookups.insert(request_id, req); + } } } } @@ -464,7 +475,7 @@ impl BlockLookups { pub fn parent_block_processed( &mut self, chain_hash: Hash256, - result: Result<(), BlockError>, + result: BlockProcessResult, cx: &mut SyncNetworkContext, ) { let (mut parent_lookup, peer_id) = if let Some((pos, peer)) = self @@ -487,20 +498,32 @@ impl BlockLookups { return crit!(self.log, "Process response for a parent lookup request that was not found"; "chain_hash" => %chain_hash); }; - if let Err(e) = &result { - trace!(self.log, "Parent block processing failed"; &parent_lookup, "error" => %e); - } else { - trace!(self.log, "Parent block processing succeeded"; &parent_lookup); + match &result { + BlockProcessResult::Ok => { + trace!(self.log, "Parent block processing succeeded"; &parent_lookup) + } + BlockProcessResult::Err(e) => { + trace!(self.log, "Parent block processing failed"; &parent_lookup, "error" => %e) + } + BlockProcessResult::Ignored => { + trace!( + self.log, + "Parent block processing job was ignored"; + "action" => "re-requesting block", + &parent_lookup + ); + } } match result { - Err(BlockError::ParentUnknown(block)) => { + BlockProcessResult::Err(BlockError::ParentUnknown(block)) => { // need to keep looking for parents // add the block back to the queue and continue the search parent_lookup.add_block(block); self.request_parent(parent_lookup, cx); } - Ok(_) | Err(BlockError::BlockIsAlreadyKnown { .. }) => { + BlockProcessResult::Ok + | BlockProcessResult::Err(BlockError::BlockIsAlreadyKnown { .. 
}) => { let chain_hash = parent_lookup.chain_hash(); let blocks = parent_lookup.chain_blocks(); let process_id = ChainSegmentProcessId::ParentLookup(chain_hash); @@ -521,8 +544,10 @@ impl BlockLookups { } } } - Err(e @ BlockError::ExecutionPayloadError(ExecutionPayloadError::RequestFailed(_))) - | Err( + BlockProcessResult::Err( + e @ BlockError::ExecutionPayloadError(ExecutionPayloadError::RequestFailed(_)), + ) + | BlockProcessResult::Err( e @ BlockError::ExecutionPayloadError(ExecutionPayloadError::NoExecutionConnection), ) => { // These errors indicate that the execution layer is offline @@ -534,7 +559,7 @@ impl BlockLookups { "error" => ?e ); } - Err(outcome) => { + BlockProcessResult::Err(outcome) => { // all else we consider the chain a failure and downvote the peer that sent // us the last block warn!( @@ -551,6 +576,15 @@ impl BlockLookups { // ambiguity. cx.report_peer(peer_id, PeerAction::MidToleranceError, "parent_request_err"); } + BlockProcessResult::Ignored => { + // Beacon processor signalled to ignore the block processing result. + // This implies that the cpu is overloaded. Drop the request. + warn!( + self.log, + "Parent block processing was ignored, cpu might be overloaded"; + "action" => "dropping parent request" + ); + } } metrics::set_gauge( diff --git a/beacon_node/network/src/sync/block_lookups/tests.rs b/beacon_node/network/src/sync/block_lookups/tests.rs index e9c8ac8ca7..352de4e09b 100644 --- a/beacon_node/network/src/sync/block_lookups/tests.rs +++ b/beacon_node/network/src/sync/block_lookups/tests.rs @@ -168,7 +168,7 @@ fn test_single_block_lookup_happy_path() { // Send the stream termination. Peer should have not been penalized, and the request removed // after processing. 
bl.single_block_lookup_response(id, peer_id, None, D, &mut cx); - bl.single_block_processed(id, Ok(()), &mut cx); + bl.single_block_processed(id, Ok(()).into(), &mut cx); rig.expect_empty_network(); assert_eq!(bl.single_block_lookups.len(), 0); } @@ -252,7 +252,11 @@ fn test_single_block_lookup_becomes_parent_request() { // Send the stream termination. Peer should have not been penalized, and the request moved to a // parent request after processing. - bl.single_block_processed(id, Err(BlockError::ParentUnknown(Arc::new(block))), &mut cx); + bl.single_block_processed( + id, + BlockError::ParentUnknown(Arc::new(block)).into(), + &mut cx, + ); assert_eq!(bl.single_block_lookups.len(), 0); rig.expect_parent_request(); rig.expect_empty_network(); @@ -278,7 +282,7 @@ fn test_parent_lookup_happy_path() { rig.expect_empty_network(); // Processing succeeds, now the rest of the chain should be sent for processing. - bl.parent_block_processed(chain_hash, Err(BlockError::BlockIsAlreadyKnown), &mut cx); + bl.parent_block_processed(chain_hash, BlockError::BlockIsAlreadyKnown.into(), &mut cx); rig.expect_parent_chain_process(); bl.parent_chain_processed(chain_hash, BatchProcessResult::Success(true), &mut cx); assert_eq!(bl.parent_queue.len(), 0); @@ -312,7 +316,7 @@ fn test_parent_lookup_wrong_response() { rig.expect_block_process(); // Processing succeeds, now the rest of the chain should be sent for processing. - bl.parent_block_processed(chain_hash, Ok(()), &mut cx); + bl.parent_block_processed(chain_hash, Ok(()).into(), &mut cx); rig.expect_parent_chain_process(); bl.parent_chain_processed(chain_hash, BatchProcessResult::Success(true), &mut cx); assert_eq!(bl.parent_queue.len(), 0); @@ -341,7 +345,7 @@ fn test_parent_lookup_empty_response() { rig.expect_block_process(); // Processing succeeds, now the rest of the chain should be sent for processing. 
- bl.parent_block_processed(chain_hash, Ok(()), &mut cx); + bl.parent_block_processed(chain_hash, Ok(()).into(), &mut cx); rig.expect_parent_chain_process(); bl.parent_chain_processed(chain_hash, BatchProcessResult::Success(true), &mut cx); assert_eq!(bl.parent_queue.len(), 0); @@ -369,7 +373,7 @@ fn test_parent_lookup_rpc_failure() { rig.expect_block_process(); // Processing succeeds, now the rest of the chain should be sent for processing. - bl.parent_block_processed(chain_hash, Ok(()), &mut cx); + bl.parent_block_processed(chain_hash, Ok(()).into(), &mut cx); rig.expect_parent_chain_process(); bl.parent_chain_processed(chain_hash, BatchProcessResult::Success(true), &mut cx); assert_eq!(bl.parent_queue.len(), 0); @@ -440,7 +444,7 @@ fn test_parent_lookup_too_deep() { // the processing result bl.parent_block_processed( chain_hash, - Err(BlockError::ParentUnknown(Arc::new(block))), + BlockError::ParentUnknown(Arc::new(block)).into(), &mut cx, ) } @@ -458,3 +462,56 @@ fn test_parent_lookup_disconnection() { bl.peer_disconnected(&peer_id, &mut cx); assert!(bl.parent_queue.is_empty()); } + +#[test] +fn test_single_block_lookup_ignored_response() { + let (mut bl, mut cx, mut rig) = TestRig::test_setup(None); + + let block = rig.rand_block(); + let peer_id = PeerId::random(); + + // Trigger the request + bl.search_block(block.canonical_root(), peer_id, &mut cx); + let id = rig.expect_block_request(); + + // The peer provides the correct block, should not be penalized. Now the block should be sent + // for processing. + bl.single_block_lookup_response(id, peer_id, Some(Arc::new(block)), D, &mut cx); + rig.expect_empty_network(); + rig.expect_block_process(); + + // The request should still be active. + assert_eq!(bl.single_block_lookups.len(), 1); + + // Send the stream termination. Peer should have not been penalized, and the request removed + // after processing. 
+ bl.single_block_lookup_response(id, peer_id, None, D, &mut cx); + // Send an Ignored response, the request should be dropped + bl.single_block_processed(id, BlockProcessResult::Ignored, &mut cx); + rig.expect_empty_network(); + assert_eq!(bl.single_block_lookups.len(), 0); +} + +#[test] +fn test_parent_lookup_ignored_response() { + let (mut bl, mut cx, mut rig) = TestRig::test_setup(None); + + let parent = rig.rand_block(); + let block = rig.block_with_parent(parent.canonical_root()); + let chain_hash = block.canonical_root(); + let peer_id = PeerId::random(); + + // Trigger the request + bl.search_parent(Arc::new(block), peer_id, &mut cx); + let id = rig.expect_parent_request(); + + // Peer sends the right block, it should be sent for processing. Peer should not be penalized. + bl.parent_lookup_response(id, peer_id, Some(Arc::new(parent)), D, &mut cx); + rig.expect_block_process(); + rig.expect_empty_network(); + + // Return an Ignored result. The request should be dropped + bl.parent_block_processed(chain_hash, BlockProcessResult::Ignored, &mut cx); + rig.expect_empty_network(); + assert_eq!(bl.parent_queue.len(), 0); +} diff --git a/beacon_node/network/src/sync/manager.rs b/beacon_node/network/src/sync/manager.rs index 3e44256655..d0919406b2 100644 --- a/beacon_node/network/src/sync/manager.rs +++ b/beacon_node/network/src/sync/manager.rs @@ -117,7 +117,7 @@ pub enum SyncMessage { /// Block processed BlockProcessed { process_type: BlockProcessType, - result: Result<(), BlockError>, + result: BlockProcessResult, }, } @@ -128,6 +128,13 @@ pub enum BlockProcessType { ParentLookup { chain_hash: Hash256 }, } +#[derive(Debug)] +pub enum BlockProcessResult { + Ok, + Err(BlockError), + Ignored, +} + /// The result of processing multiple blocks (a chain segment). 
#[derive(Debug)] pub enum BatchProcessResult { @@ -620,3 +627,18 @@ impl SyncManager { } } } + +impl From>> for BlockProcessResult { + fn from(result: Result>) -> Self { + match result { + Ok(_) => BlockProcessResult::Ok, + Err(e) => e.into(), + } + } +} + +impl From> for BlockProcessResult { + fn from(e: BlockError) -> Self { + BlockProcessResult::Err(e) + } +}