Ignored sync jobs 2 (#3317)

## Issue Addressed

Duplicate of #3269. Making this since @divagant-martian opened the previous PR and she can't approve her own PR 😄 


Co-authored-by: Diva M <divma@protonmail.com>
This commit is contained in:
Pawan Dhananjay
2022-07-15 07:31:20 +00:00
parent 98a9626ef5
commit 28b0ff27ff
8 changed files with 396 additions and 99 deletions

View File

@@ -66,7 +66,7 @@ use types::{
SyncCommitteeMessage, SyncSubnetId,
};
use work_reprocessing_queue::{
spawn_reprocess_scheduler, QueuedAggregate, QueuedUnaggregate, ReadyWork,
spawn_reprocess_scheduler, QueuedAggregate, QueuedRpcBlock, QueuedUnaggregate, ReadyWork,
};
use worker::{Toolbox, Worker};
@@ -75,7 +75,7 @@ mod tests;
mod work_reprocessing_queue;
mod worker;
use crate::beacon_processor::work_reprocessing_queue::QueuedBlock;
use crate::beacon_processor::work_reprocessing_queue::QueuedGossipBlock;
pub use worker::{
ChainSegmentProcessId, FailureMode, GossipAggregatePackage, GossipAttestationPackage,
};
@@ -501,6 +501,7 @@ impl<T: BeaconChainTypes> WorkEvent<T> {
block,
seen_timestamp,
process_type,
should_process: true,
},
}
}
@@ -565,7 +566,7 @@ impl<T: BeaconChainTypes> WorkEvent<T> {
impl<T: BeaconChainTypes> std::convert::From<ReadyWork<T>> for WorkEvent<T> {
fn from(ready_work: ReadyWork<T>) -> Self {
match ready_work {
ReadyWork::Block(QueuedBlock {
ReadyWork::Block(QueuedGossipBlock {
peer_id,
block,
seen_timestamp,
@@ -577,6 +578,20 @@ impl<T: BeaconChainTypes> std::convert::From<ReadyWork<T>> for WorkEvent<T> {
seen_timestamp,
},
},
ReadyWork::RpcBlock(QueuedRpcBlock {
block,
seen_timestamp,
process_type,
should_process,
}) => Self {
drop_during_sync: false,
work: Work::RpcBlock {
block,
seen_timestamp,
process_type,
should_process,
},
},
ReadyWork::Unaggregate(QueuedUnaggregate {
peer_id,
message_id,
@@ -695,6 +710,7 @@ pub enum Work<T: BeaconChainTypes> {
block: Arc<SignedBeaconBlock<T::EthSpec>>,
seen_timestamp: Duration,
process_type: BlockProcessType,
should_process: bool,
},
ChainSegment {
process_id: ChainSegmentProcessId,
@@ -1521,12 +1537,14 @@ impl<T: BeaconChainTypes> BeaconProcessor<T> {
block,
seen_timestamp,
process_type,
should_process,
} => task_spawner.spawn_async(worker.process_rpc_block(
block,
seen_timestamp,
process_type,
work_reprocessing_tx,
duplicate_cache,
should_process,
)),
/*
* Verification for a chain segment (multiple blocks).

View File

@@ -1,7 +1,9 @@
#![cfg(not(debug_assertions))] // Tests are too slow in debug.
#![cfg(test)]
use crate::beacon_processor::work_reprocessing_queue::QUEUED_ATTESTATION_DELAY;
use crate::beacon_processor::work_reprocessing_queue::{
QUEUED_ATTESTATION_DELAY, QUEUED_RPC_BLOCK_DELAY,
};
use crate::beacon_processor::*;
use crate::{service::NetworkMessage, sync::SyncMessage};
use beacon_chain::test_utils::{
@@ -54,6 +56,7 @@ struct TestRig {
work_journal_rx: mpsc::Receiver<&'static str>,
_network_rx: mpsc::UnboundedReceiver<NetworkMessage<E>>,
_sync_rx: mpsc::UnboundedReceiver<SyncMessage<E>>,
duplicate_cache: DuplicateCache,
_harness: BeaconChainHarness<T>,
}
@@ -185,6 +188,7 @@ impl TestRig {
let (work_journal_tx, work_journal_rx) = mpsc::channel(16_364);
let duplicate_cache = DuplicateCache::default();
BeaconProcessor {
beacon_chain: Arc::downgrade(&chain),
network_tx,
@@ -193,7 +197,7 @@ impl TestRig {
executor,
max_workers: cmp::max(1, num_cpus::get()),
current_workers: 0,
importing_blocks: Default::default(),
importing_blocks: duplicate_cache.clone(),
log: log.clone(),
}
.spawn_manager(beacon_processor_rx, Some(work_journal_tx));
@@ -211,6 +215,7 @@ impl TestRig {
work_journal_rx,
_network_rx,
_sync_rx,
duplicate_cache,
_harness: harness,
}
}
@@ -246,6 +251,15 @@ impl TestRig {
self.beacon_processor_tx.try_send(event).unwrap();
}
pub fn enqueue_single_lookup_rpc_block(&self) {
let event = WorkEvent::rpc_beacon_block(
self.next_block.clone(),
std::time::Duration::default(),
BlockProcessType::SingleBlock { id: 1 },
);
self.beacon_processor_tx.try_send(event).unwrap();
}
pub fn enqueue_unaggregated_attestation(&self) {
let (attestation, subnet_id) = self.attestations.first().unwrap().clone();
self.beacon_processor_tx
@@ -828,3 +842,33 @@ async fn import_misc_gossip_ops() {
"op pool should have one more exit"
);
}
/// Ensure that rpc block going to the reprocessing queue flow
/// works when the duplicate cache handle is held by another task.
#[tokio::test]
async fn test_rpc_block_reprocessing() {
let mut rig = TestRig::new(SMALL_CHAIN).await;
let next_block_root = rig.next_block.canonical_root();
// Insert the next block into the duplicate cache manually
let handle = rig.duplicate_cache.check_and_insert(next_block_root);
rig.enqueue_single_lookup_rpc_block();
rig.assert_event_journal(&[RPC_BLOCK, WORKER_FREED, NOTHING_TO_DO])
.await;
// next_block shouldn't be processed since it couldn't get the
// duplicate cache handle
assert_ne!(next_block_root, rig.head_root());
drop(handle);
// The block should arrive at the beacon processor again after
// the specified delay.
tokio::time::sleep(QUEUED_RPC_BLOCK_DELAY).await;
rig.assert_event_journal(&[RPC_BLOCK]).await;
// Add an extra delay for block processing
tokio::time::sleep(Duration::from_millis(10)).await;
// head should update to next block now since the duplicate
// cache handle was dropped.
assert_eq!(next_block_root, rig.head_root());
}

View File

@@ -12,6 +12,7 @@
//! block will be re-queued until their block is imported, or until they expire.
use super::MAX_SCHEDULED_WORK_QUEUE_LEN;
use crate::metrics;
use crate::sync::manager::BlockProcessType;
use beacon_chain::{BeaconChainTypes, GossipVerifiedBlock, MAXIMUM_GOSSIP_CLOCK_DISPARITY};
use fnv::FnvHashMap;
use futures::task::Poll;
@@ -22,16 +23,18 @@ use slog::{crit, debug, error, warn, Logger};
use slot_clock::SlotClock;
use std::collections::{HashMap, HashSet};
use std::pin::Pin;
use std::sync::Arc;
use std::task::Context;
use std::time::Duration;
use task_executor::TaskExecutor;
use tokio::sync::mpsc::{self, Receiver, Sender};
use tokio::time::error::Error as TimeError;
use tokio_util::time::delay_queue::{DelayQueue, Key as DelayKey};
use types::{Attestation, EthSpec, Hash256, SignedAggregateAndProof, SubnetId};
use types::{Attestation, EthSpec, Hash256, SignedAggregateAndProof, SignedBeaconBlock, SubnetId};
const TASK_NAME: &str = "beacon_processor_reprocess_queue";
const BLOCKS: &str = "blocks";
const GOSSIP_BLOCKS: &str = "gossip_blocks";
const RPC_BLOCKS: &str = "rpc_blocks";
const ATTESTATIONS: &str = "attestations";
/// Queue blocks for re-processing with an `ADDITIONAL_QUEUED_BLOCK_DELAY` after the slot starts.
@@ -41,6 +44,9 @@ const ADDITIONAL_QUEUED_BLOCK_DELAY: Duration = Duration::from_millis(5);
/// For how long to queue aggregated and unaggregated attestations for re-processing.
pub const QUEUED_ATTESTATION_DELAY: Duration = Duration::from_secs(12);
/// For how long to queue rpc blocks before sending them back for reprocessing.
pub const QUEUED_RPC_BLOCK_DELAY: Duration = Duration::from_secs(3);
/// Set an arbitrary upper-bound on the number of queued blocks to avoid DoS attacks. The fact that
/// we signature-verify blocks before putting them in the queue *should* protect against this, but
/// it's nice to have extra protection.
@@ -52,7 +58,10 @@ const MAXIMUM_QUEUED_ATTESTATIONS: usize = 16_384;
/// Messages that the scheduler can receive.
pub enum ReprocessQueueMessage<T: BeaconChainTypes> {
/// A block that has been received early and we should queue for later processing.
EarlyBlock(QueuedBlock<T>),
EarlyBlock(QueuedGossipBlock<T>),
/// A gossip block for hash `X` is being imported, we should queue the rpc block for the same
/// hash until the gossip block is imported.
RpcBlock(QueuedRpcBlock<T::EthSpec>),
/// A block that was successfully processed. We use this to handle attestations for unknown
/// blocks.
BlockImported(Hash256),
@@ -64,7 +73,8 @@ pub enum ReprocessQueueMessage<T: BeaconChainTypes> {
/// Events sent by the scheduler once they are ready for re-processing.
pub enum ReadyWork<T: BeaconChainTypes> {
Block(QueuedBlock<T>),
Block(QueuedGossipBlock<T>),
RpcBlock(QueuedRpcBlock<T::EthSpec>),
Unaggregate(QueuedUnaggregate<T::EthSpec>),
Aggregate(QueuedAggregate<T::EthSpec>),
}
@@ -90,16 +100,30 @@ pub struct QueuedAggregate<T: EthSpec> {
}
/// A block that arrived early and has been queued for later import.
pub struct QueuedBlock<T: BeaconChainTypes> {
pub struct QueuedGossipBlock<T: BeaconChainTypes> {
pub peer_id: PeerId,
pub block: Box<GossipVerifiedBlock<T>>,
pub seen_timestamp: Duration,
}
/// A block that arrived for processing when the same block was being imported over gossip.
/// It is queued for later import.
pub struct QueuedRpcBlock<T: EthSpec> {
pub block: Arc<SignedBeaconBlock<T>>,
pub process_type: BlockProcessType,
pub seen_timestamp: Duration,
/// Indicates if the beacon chain should process this block or not.
/// We use this to ignore block processing when rpc block queues are full.
pub should_process: bool,
}
/// Unifies the different messages processed by the block delay queue.
enum InboundEvent<T: BeaconChainTypes> {
/// A block that was queued for later processing and is ready for import.
ReadyBlock(QueuedBlock<T>),
/// A gossip block that was queued for later processing and is ready for import.
ReadyGossipBlock(QueuedGossipBlock<T>),
/// A rpc block that was queued because the same gossip block was being imported
/// will now be retried for import.
ReadyRpcBlock(QueuedRpcBlock<T::EthSpec>),
/// An aggregated or unaggregated attestation is ready for re-processing.
ReadyAttestation(QueuedAttestationId),
/// A `DelayQueue` returned an error.
@@ -117,13 +141,15 @@ struct ReprocessQueue<T: BeaconChainTypes> {
/* Queues */
/// Queue to manage scheduled early blocks.
block_delay_queue: DelayQueue<QueuedBlock<T>>,
gossip_block_delay_queue: DelayQueue<QueuedGossipBlock<T>>,
/// Queue to manage scheduled early blocks.
rpc_block_delay_queue: DelayQueue<QueuedRpcBlock<T::EthSpec>>,
/// Queue to manage scheduled attestations.
attestations_delay_queue: DelayQueue<QueuedAttestationId>,
/* Queued items */
/// Queued blocks.
queued_block_roots: HashSet<Hash256>,
queued_gossip_block_roots: HashSet<Hash256>,
/// Queued aggregated attestations.
queued_aggregates: FnvHashMap<usize, (QueuedAggregate<T::EthSpec>, DelayKey)>,
/// Queued attestations.
@@ -135,6 +161,7 @@ struct ReprocessQueue<T: BeaconChainTypes> {
/// Next attestation id, used for both aggregated and unaggregated attestations
next_attestation: usize,
early_block_debounce: TimeLatch,
rpc_block_debounce: TimeLatch,
attestation_delay_debounce: TimeLatch,
}
@@ -167,12 +194,26 @@ impl<T: BeaconChainTypes> Stream for ReprocessQueue<T> {
//
// The sequential nature of blockchains means it is generally better to try and import all
// existing blocks before new ones.
match self.block_delay_queue.poll_expired(cx) {
match self.gossip_block_delay_queue.poll_expired(cx) {
Poll::Ready(Some(Ok(queued_block))) => {
return Poll::Ready(Some(InboundEvent::ReadyBlock(queued_block.into_inner())));
return Poll::Ready(Some(InboundEvent::ReadyGossipBlock(
queued_block.into_inner(),
)));
}
Poll::Ready(Some(Err(e))) => {
return Poll::Ready(Some(InboundEvent::DelayQueueError(e, "block_queue")));
return Poll::Ready(Some(InboundEvent::DelayQueueError(e, "gossip_block_queue")));
}
// `Poll::Ready(None)` means that there are no more entries in the delay queue and we
// will continue to get this result until something else is added into the queue.
Poll::Ready(None) | Poll::Pending => (),
}
match self.rpc_block_delay_queue.poll_expired(cx) {
Poll::Ready(Some(Ok(queued_block))) => {
return Poll::Ready(Some(InboundEvent::ReadyRpcBlock(queued_block.into_inner())));
}
Poll::Ready(Some(Err(e))) => {
return Poll::Ready(Some(InboundEvent::DelayQueueError(e, "rpc_block_queue")));
}
// `Poll::Ready(None)` means that there are no more entries in the delay queue and we
// will continue to get this result until something else is added into the queue.
@@ -219,14 +260,16 @@ pub fn spawn_reprocess_scheduler<T: BeaconChainTypes>(
let mut queue = ReprocessQueue {
work_reprocessing_rx,
ready_work_tx,
block_delay_queue: DelayQueue::new(),
gossip_block_delay_queue: DelayQueue::new(),
rpc_block_delay_queue: DelayQueue::new(),
attestations_delay_queue: DelayQueue::new(),
queued_block_roots: HashSet::new(),
queued_gossip_block_roots: HashSet::new(),
queued_aggregates: FnvHashMap::default(),
queued_unaggregates: FnvHashMap::default(),
awaiting_attestations_per_root: HashMap::new(),
next_attestation: 0,
early_block_debounce: TimeLatch::default(),
rpc_block_debounce: TimeLatch::default(),
attestation_delay_debounce: TimeLatch::default(),
};
@@ -259,13 +302,13 @@ impl<T: BeaconChainTypes> ReprocessQueue<T> {
let block_root = early_block.block.block_root;
// Don't add the same block to the queue twice. This prevents DoS attacks.
if self.queued_block_roots.contains(&block_root) {
if self.queued_gossip_block_roots.contains(&block_root) {
return;
}
if let Some(duration_till_slot) = slot_clock.duration_to_slot(block_slot) {
// Check to ensure this won't over-fill the queue.
if self.queued_block_roots.len() >= MAXIMUM_QUEUED_BLOCKS {
if self.queued_gossip_block_roots.len() >= MAXIMUM_QUEUED_BLOCKS {
if self.early_block_debounce.elapsed() {
warn!(
log,
@@ -278,10 +321,10 @@ impl<T: BeaconChainTypes> ReprocessQueue<T> {
return;
}
self.queued_block_roots.insert(block_root);
self.queued_gossip_block_roots.insert(block_root);
// Queue the block until the start of the appropriate slot, plus
// `ADDITIONAL_QUEUED_BLOCK_DELAY`.
self.block_delay_queue.insert(
self.gossip_block_delay_queue.insert(
early_block,
duration_till_slot + ADDITIONAL_QUEUED_BLOCK_DELAY,
);
@@ -311,6 +354,58 @@ impl<T: BeaconChainTypes> ReprocessQueue<T> {
}
}
}
// A rpc block arrived for processing at the same time when a gossip block
// for the same block hash is being imported. We wait for `QUEUED_RPC_BLOCK_DELAY`
// and then send the rpc block back for processing assuming the gossip import
// has completed by then.
InboundEvent::Msg(RpcBlock(mut rpc_block)) => {
// Check to ensure this won't over-fill the queue.
if self.rpc_block_delay_queue.len() >= MAXIMUM_QUEUED_BLOCKS {
if self.rpc_block_debounce.elapsed() {
warn!(
log,
"RPC blocks queue is full";
"queue_size" => MAXIMUM_QUEUED_BLOCKS,
"msg" => "check system clock"
);
}
// Return the block to the beacon processor signalling to
// ignore processing for this block
rpc_block.should_process = false;
if self
.ready_work_tx
.try_send(ReadyWork::RpcBlock(rpc_block))
.is_err()
{
error!(
log,
"Failed to send rpc block to beacon processor";
);
}
return;
}
// Queue the block for 1/4th of a slot
self.rpc_block_delay_queue
.insert(rpc_block, QUEUED_RPC_BLOCK_DELAY);
}
InboundEvent::ReadyRpcBlock(queued_rpc_block) => {
debug!(
log,
"Sending rpc block for reprocessing";
"block_root" => %queued_rpc_block.block.canonical_root()
);
if self
.ready_work_tx
.try_send(ReadyWork::RpcBlock(queued_rpc_block))
.is_err()
{
error!(
log,
"Failed to send rpc block to beacon processor";
);
}
}
InboundEvent::Msg(UnknownBlockAggregate(queued_aggregate)) => {
if self.attestations_delay_queue.len() >= MAXIMUM_QUEUED_ATTESTATIONS {
if self.attestation_delay_debounce.elapsed() {
@@ -423,10 +518,10 @@ impl<T: BeaconChainTypes> ReprocessQueue<T> {
}
}
// A block that was queued for later processing is now ready to be processed.
InboundEvent::ReadyBlock(ready_block) => {
InboundEvent::ReadyGossipBlock(ready_block) => {
let block_root = ready_block.block.block_root;
if !self.queued_block_roots.remove(&block_root) {
if !self.queued_gossip_block_roots.remove(&block_root) {
// Log an error to alert that we've made a bad assumption about how this
// program works, but still process the block anyway.
error!(
@@ -499,8 +594,13 @@ impl<T: BeaconChainTypes> ReprocessQueue<T> {
metrics::set_gauge_vec(
&metrics::BEACON_PROCESSOR_REPROCESSING_QUEUE_TOTAL,
&[BLOCKS],
self.block_delay_queue.len() as i64,
&[GOSSIP_BLOCKS],
self.gossip_block_delay_queue.len() as i64,
);
metrics::set_gauge_vec(
&metrics::BEACON_PROCESSOR_REPROCESSING_QUEUE_TOTAL,
&[RPC_BLOCKS],
self.rpc_block_delay_queue.len() as i64,
);
metrics::set_gauge_vec(
&metrics::BEACON_PROCESSOR_REPROCESSING_QUEUE_TOTAL,

View File

@@ -25,7 +25,7 @@ use types::{
use super::{
super::work_reprocessing_queue::{
QueuedAggregate, QueuedBlock, QueuedUnaggregate, ReprocessQueueMessage,
QueuedAggregate, QueuedGossipBlock, QueuedUnaggregate, ReprocessQueueMessage,
},
Worker,
};
@@ -857,7 +857,7 @@ impl<T: BeaconChainTypes> Worker<T> {
metrics::inc_counter(&metrics::BEACON_PROCESSOR_GOSSIP_BLOCK_REQUEUED_TOTAL);
if reprocess_tx
.try_send(ReprocessQueueMessage::EarlyBlock(QueuedBlock {
.try_send(ReprocessQueueMessage::EarlyBlock(QueuedGossipBlock {
peer_id,
block: Box::new(verified_block),
seen_timestamp: seen_duration,

View File

@@ -1,6 +1,7 @@
use std::time::Duration;
use super::{super::work_reprocessing_queue::ReprocessQueueMessage, Worker};
use crate::beacon_processor::work_reprocessing_queue::QueuedRpcBlock;
use crate::beacon_processor::worker::FUTURE_SLOT_TOLERANCE;
use crate::beacon_processor::DuplicateCache;
use crate::metrics;
@@ -53,16 +54,37 @@ impl<T: BeaconChainTypes> Worker<T> {
process_type: BlockProcessType,
reprocess_tx: mpsc::Sender<ReprocessQueueMessage<T>>,
duplicate_cache: DuplicateCache,
should_process: bool,
) {
if !should_process {
// Sync handles these results
self.send_sync_message(SyncMessage::BlockProcessed {
process_type,
result: crate::sync::manager::BlockProcessResult::Ignored,
});
return;
}
// Check if the block is already being imported through another source
let handle = match duplicate_cache.check_and_insert(block.canonical_root()) {
Some(handle) => handle,
None => {
// Sync handles these results
self.send_sync_message(SyncMessage::BlockProcessed {
debug!(
self.log,
"Gossip block is being processed";
"action" => "sending rpc block to reprocessing queue",
"block_root" => %block.canonical_root(),
);
// Send message to work reprocess queue to retry the block
let reprocess_msg = ReprocessQueueMessage::RpcBlock(QueuedRpcBlock {
block: block.clone(),
process_type,
result: Err(BlockError::BlockIsAlreadyKnown),
seen_timestamp,
should_process: true,
});
if reprocess_tx.try_send(reprocess_msg).is_err() {
error!(self.log, "Failed to inform block import"; "source" => "rpc", "block_root" => %block.canonical_root())
};
return;
}
};
@@ -95,7 +117,7 @@ impl<T: BeaconChainTypes> Worker<T> {
// Sync handles these results
self.send_sync_message(SyncMessage::BlockProcessed {
process_type,
result: result.map(|_| ()),
result: result.into(),
});
// Drop the handle to remove the entry from the cache