Remove the executed-block reprocessing cache (ExecutedBlock work type, QueuedExecutedBlock, and its delay queue)

This commit is contained in:
Pawan Dhananjay
2023-03-20 21:21:45 +05:30
parent 3c1687d23c
commit 463ce3d1fe
3 changed files with 5 additions and 174 deletions

View File

@@ -42,9 +42,7 @@ use crate::sync::manager::BlockProcessType;
use crate::{metrics, service::NetworkMessage, sync::SyncMessage};
use beacon_chain::blob_verification::BlockWrapper;
use beacon_chain::parking_lot::Mutex;
use beacon_chain::{
BeaconChain, BeaconChainTypes, ExecutedBlock, GossipVerifiedBlock, NotifyExecutionLayer,
};
use beacon_chain::{BeaconChain, BeaconChainTypes, GossipVerifiedBlock, NotifyExecutionLayer};
use derivative::Derivative;
use futures::stream::{Stream, StreamExt};
use futures::task::Poll;
@@ -85,8 +83,6 @@ mod worker;
use crate::beacon_processor::work_reprocessing_queue::QueuedGossipBlock;
pub use worker::{ChainSegmentProcessId, GossipAggregatePackage, GossipAttestationPackage};
use self::work_reprocessing_queue::QueuedExecutedBlock;
/// The maximum size of the channel for work events to the `BeaconProcessor`.
///
/// Setting this too low will cause consensus messages to be dropped.
@@ -223,7 +219,6 @@ pub const GOSSIP_ATTESTATION_BATCH: &str = "gossip_attestation_batch";
pub const GOSSIP_AGGREGATE: &str = "gossip_aggregate";
pub const GOSSIP_AGGREGATE_BATCH: &str = "gossip_aggregate_batch";
pub const GOSSIP_BLOCK: &str = "gossip_block";
pub const EXECUTED_BLOCK: &str = "executed_block";
pub const GOSSIP_BLOCK_AND_BLOBS_SIDECAR: &str = "gossip_block_and_blobs_sidecar";
pub const DELAYED_IMPORT_BLOCK: &str = "delayed_import_block";
pub const GOSSIP_VOLUNTARY_EXIT: &str = "gossip_voluntary_exit";
@@ -746,18 +741,6 @@ impl<T: BeaconChainTypes> std::convert::From<ReadyWork<T>> for WorkEvent<T> {
seen_timestamp,
},
},
ReadyWork::ExecutedBlock(QueuedExecutedBlock {
peer_id,
block,
seen_timestamp,
}) => Self {
drop_during_sync: false,
work: Work::ExecutedBlock {
peer_id,
block,
seen_timestamp,
},
},
ReadyWork::RpcBlock(QueuedRpcBlock {
block_root,
block,
@@ -889,11 +872,6 @@ pub enum Work<T: BeaconChainTypes> {
block: Box<GossipVerifiedBlock<T>>,
seen_timestamp: Duration,
},
ExecutedBlock {
peer_id: PeerId,
block: ExecutedBlock<T::EthSpec>,
seen_timestamp: Duration,
},
GossipVoluntaryExit {
message_id: MessageId,
peer_id: PeerId,
@@ -990,7 +968,6 @@ impl<T: BeaconChainTypes> Work<T> {
Work::GossipAggregate { .. } => GOSSIP_AGGREGATE,
Work::GossipAggregateBatch { .. } => GOSSIP_AGGREGATE_BATCH,
Work::GossipBlock { .. } => GOSSIP_BLOCK,
Work::ExecutedBlock { .. } => EXECUTED_BLOCK,
Work::GossipSignedBlobSidecar { .. } => GOSSIP_BLOCK_AND_BLOBS_SIDECAR,
Work::DelayedImportBlock { .. } => DELAYED_IMPORT_BLOCK,
Work::GossipVoluntaryExit { .. } => GOSSIP_VOLUNTARY_EXIT,
@@ -1150,7 +1127,6 @@ impl<T: BeaconChainTypes> BeaconProcessor<T> {
FifoQueue::new(MAX_GOSSIP_OPTIMISTIC_UPDATE_REPROCESS_QUEUE_LEN);
// Using a FIFO queue since blocks need to be imported sequentially.
let mut executed_block_queue = FifoQueue::new(MAX_RPC_BLOCK_QUEUE_LEN);
let mut rpc_block_queue = FifoQueue::new(MAX_RPC_BLOCK_QUEUE_LEN);
let mut chain_segment_queue = FifoQueue::new(MAX_CHAIN_SEGMENT_QUEUE_LEN);
let mut backfill_chain_segment = FifoQueue::new(MAX_CHAIN_SEGMENT_QUEUE_LEN);
@@ -1267,9 +1243,6 @@ impl<T: BeaconChainTypes> BeaconProcessor<T> {
// on the delayed ones.
} else if let Some(item) = delayed_block_queue.pop() {
self.spawn_worker(item, toolbox);
// Check availability pending blocks
} else if let Some(item) = executed_block_queue.pop() {
self.spawn_worker(item, toolbox);
// Check gossip blocks before gossip attestations, since a block might be
// required to verify some attestations.
} else if let Some(item) = gossip_block_queue.pop() {
@@ -1489,9 +1462,6 @@ impl<T: BeaconChainTypes> BeaconProcessor<T> {
Work::GossipBlock { .. } => {
gossip_block_queue.push(work, work_id, &self.log)
}
Work::ExecutedBlock { .. } => {
gossip_block_queue.push(work, work_id, &self.log)
}
Work::GossipSignedBlobSidecar { .. } => {
gossip_block_and_blobs_sidecar_queue.push(work, work_id, &self.log)
}
@@ -1772,20 +1742,6 @@ impl<T: BeaconChainTypes> BeaconProcessor<T> {
)
.await
}),
Work::ExecutedBlock {
peer_id,
block,
seen_timestamp,
} => task_spawner.spawn_async(async move {
worker
.process_execution_verified_block(
peer_id,
block,
work_reprocessing_tx,
seen_timestamp,
)
.await
}),
/*
* Verification for blobs sidecars received on gossip.
*/

View File

@@ -14,15 +14,13 @@ use super::MAX_SCHEDULED_WORK_QUEUE_LEN;
use crate::metrics;
use crate::sync::manager::BlockProcessType;
use beacon_chain::blob_verification::{AsBlock, BlockWrapper};
use beacon_chain::{
BeaconChainTypes, ExecutedBlock, GossipVerifiedBlock, MAXIMUM_GOSSIP_CLOCK_DISPARITY,
};
use beacon_chain::{BeaconChainTypes, GossipVerifiedBlock, MAXIMUM_GOSSIP_CLOCK_DISPARITY};
use fnv::FnvHashMap;
use futures::task::Poll;
use futures::{Stream, StreamExt};
use lighthouse_network::{MessageId, PeerId};
use logging::TimeLatch;
use slog::{crit, debug, error, trace, warn, Logger};
use slog::{debug, error, trace, warn, Logger};
use slot_clock::SlotClock;
use std::collections::{HashMap, HashSet};
use std::pin::Pin;
@@ -30,7 +28,6 @@ use std::task::Context;
use std::time::Duration;
use task_executor::TaskExecutor;
use tokio::sync::mpsc::{self, Receiver, Sender};
use tokio::time::error::Error as TimeError;
use tokio_util::time::delay_queue::{DelayQueue, Key as DelayKey};
use types::{
Attestation, EthSpec, Hash256, LightClientOptimisticUpdate, SignedAggregateAndProof, SubnetId,
@@ -55,19 +52,11 @@ pub const QUEUED_LIGHT_CLIENT_UPDATE_DELAY: Duration = Duration::from_secs(12);
/// For how long to queue rpc blocks before sending them back for reprocessing.
pub const QUEUED_RPC_BLOCK_DELAY: Duration = Duration::from_secs(3);
/// For how long to queue executed blocks before sending them back for reprocessing.
pub const QUEUED_EXECUTED_BLOCK_DELAY: Duration = Duration::from_secs(12);
/// Set an arbitrary upper-bound on the number of queued blocks to avoid DoS attacks. The fact that
/// we signature-verify blocks before putting them in the queue *should* protect against this, but
/// it's nice to have extra protection.
const MAXIMUM_QUEUED_BLOCKS: usize = 16;
/// An `ExecutedBlock` contains the entire `BeaconState`, so we shouldn't be storing too many of them
/// to avoid getting DoS'd by the block proposer.
/// TODO(pawan): revise the max blocks
const MAXIMUM_QUEUED_EXECUTED_BLOCKS: usize = 4;
/// How many attestations we keep before new ones get dropped.
const MAXIMUM_QUEUED_ATTESTATIONS: usize = 16_384;
@@ -87,9 +76,6 @@ pub enum ReprocessQueueMessage<T: BeaconChainTypes> {
block_root: Hash256,
parent_root: Hash256,
},
ExecutedBlock(QueuedExecutedBlock<T::EthSpec>),
/// The blobs corresponding to a `block_root` are now fully available.
BlobsAvailable(Hash256),
/// An unaggregated attestation that references an unknown block.
UnknownBlockUnaggregate(QueuedUnaggregate<T::EthSpec>),
/// An aggregated attestation that references an unknown block.
@@ -101,7 +87,6 @@ pub enum ReprocessQueueMessage<T: BeaconChainTypes> {
/// Events sent by the scheduler once they are ready for re-processing.
pub enum ReadyWork<T: BeaconChainTypes> {
GossipBlock(QueuedGossipBlock<T>),
ExecutedBlock(QueuedExecutedBlock<T::EthSpec>),
RpcBlock(QueuedRpcBlock<T::EthSpec>),
Unaggregate(QueuedUnaggregate<T::EthSpec>),
Aggregate(QueuedAggregate<T::EthSpec>),
@@ -145,14 +130,6 @@ pub struct QueuedGossipBlock<T: BeaconChainTypes> {
pub seen_timestamp: Duration,
}
/// A block that has been fully verified and is pending data availability
/// and import into the beacon chain.
pub struct QueuedExecutedBlock<T: EthSpec> {
pub peer_id: PeerId,
pub block: ExecutedBlock<T>,
pub seen_timestamp: Duration,
}
/// A block that arrived for processing when the same block was being imported over gossip.
/// It is queued for later import.
pub struct QueuedRpcBlock<T: EthSpec> {
@@ -169,9 +146,6 @@ pub struct QueuedRpcBlock<T: EthSpec> {
enum InboundEvent<T: BeaconChainTypes> {
/// A gossip block that was queued for later processing and is ready for import.
ReadyGossipBlock(QueuedGossipBlock<T>),
/// An executed block that was queued for blob availability and is now
/// ready for import
ReadyExecutedBlock(QueuedExecutedBlock<T::EthSpec>),
/// A rpc block that was queued because the same gossip block was being imported
/// will now be retried for import.
ReadyRpcBlock(QueuedRpcBlock<T::EthSpec>),
@@ -179,8 +153,6 @@ enum InboundEvent<T: BeaconChainTypes> {
ReadyAttestation(QueuedAttestationId),
/// A light client update that is ready for re-processing.
ReadyLightClientUpdate(QueuedLightClientUpdateId),
/// A `DelayQueue` returned an error.
DelayQueueError(TimeError, &'static str),
/// A message sent to the `ReprocessQueue`
Msg(ReprocessQueueMessage<T>),
}
@@ -195,8 +167,6 @@ struct ReprocessQueue<T: BeaconChainTypes> {
/* Queues */
/// Queue to manage scheduled early blocks.
gossip_block_delay_queue: DelayQueue<QueuedGossipBlock<T>>,
/// Queue to manage availability pending blocks.
executed_block_delay_queue: DelayQueue<QueuedExecutedBlock<T::EthSpec>>,
/// Queue to manage scheduled early blocks.
rpc_block_delay_queue: DelayQueue<QueuedRpcBlock<T::EthSpec>>,
/// Queue to manage scheduled attestations.
@@ -207,8 +177,6 @@ struct ReprocessQueue<T: BeaconChainTypes> {
/* Queued items */
/// Queued blocks.
queued_gossip_block_roots: HashSet<Hash256>,
/// Queued availability pending blocks.
queued_executed_block_roots: HashMap<Hash256, DelayKey>,
/// Queued aggregated attestations.
queued_aggregates: FnvHashMap<usize, (QueuedAggregate<T::EthSpec>, DelayKey)>,
/// Queued attestations.
@@ -272,17 +240,6 @@ impl<T: BeaconChainTypes> Stream for ReprocessQueue<T> {
Poll::Ready(None) | Poll::Pending => (),
}
match self.executed_block_delay_queue.poll_expired(cx) {
Poll::Ready(Some(queued_block)) => {
return Poll::Ready(Some(InboundEvent::ReadyExecutedBlock(
queued_block.into_inner(),
)));
}
// `Poll::Ready(None)` means that there are no more entries in the delay queue and we
// will continue to get this result until something else is added into the queue.
Poll::Ready(None) | Poll::Pending => (),
}
match self.rpc_block_delay_queue.poll_expired(cx) {
Poll::Ready(Some(queued_block)) => {
return Poll::Ready(Some(InboundEvent::ReadyRpcBlock(queued_block.into_inner())));
@@ -341,12 +298,10 @@ pub fn spawn_reprocess_scheduler<T: BeaconChainTypes>(
work_reprocessing_rx,
ready_work_tx,
gossip_block_delay_queue: DelayQueue::new(),
executed_block_delay_queue: DelayQueue::new(),
rpc_block_delay_queue: DelayQueue::new(),
attestations_delay_queue: DelayQueue::new(),
lc_updates_delay_queue: DelayQueue::new(),
queued_gossip_block_roots: HashSet::new(),
queued_executed_block_roots: HashMap::new(),
queued_lc_updates: FnvHashMap::default(),
queued_aggregates: FnvHashMap::default(),
queued_unaggregates: FnvHashMap::default(),
@@ -441,59 +396,6 @@ impl<T: BeaconChainTypes> ReprocessQueue<T> {
}
}
}
InboundEvent::Msg(ExecutedBlock(executed_block)) => {
if self.executed_block_delay_queue.len() >= MAXIMUM_QUEUED_EXECUTED_BLOCKS {
// TODO(use your own debounce)
if self.rpc_block_debounce.elapsed() {
warn!(
log,
"Executed blocks queue is full";
"queue_size" => MAXIMUM_QUEUED_EXECUTED_BLOCKS,
"msg" => "check system clock"
);
}
// TODO(pawan): block would essentially get dropped here
// can the devs do something?
}
// Queue the block for a slot
let block_root = executed_block.block.block_root;
if !self.queued_executed_block_roots.contains_key(&block_root) {
let key = self
.executed_block_delay_queue
.insert(executed_block, QUEUED_EXECUTED_BLOCK_DELAY);
self.queued_executed_block_roots.insert(block_root, key);
}
}
InboundEvent::Msg(BlobsAvailable(block_root)) => {
match self.queued_executed_block_roots.remove(&block_root) {
None => {
// Log an error to alert that we've made a bad assumption about how this
// program works, but still process the block anyway.
error!(
log,
"Unknown executed block in delay queue";
"block_root" => ?block_root
);
}
Some(key) => {
if let Some(executed_block) =
self.executed_block_delay_queue.try_remove(&key)
{
if self
.ready_work_tx
.try_send(ReadyWork::ExecutedBlock(executed_block.into_inner()))
.is_err()
{
error!(
log,
"Failed to pop queued block";
);
}
}
}
}
}
// A rpc block arrived for processing at the same time when a gossip block
// for the same block hash is being imported. We wait for `QUEUED_RPC_BLOCK_DELAY`
// and then send the rpc block back for processing assuming the gossip import
@@ -747,24 +649,6 @@ impl<T: BeaconChainTypes> ReprocessQueue<T> {
}
}
}
InboundEvent::ReadyExecutedBlock(executed_block) => {
let block_root = executed_block.block.block_root;
if self
.queued_executed_block_roots
.remove(&block_root)
.is_none()
{
// Log an error to alert that we've made a bad assumption about how this
// program works, but still process the block anyway.
error!(
log,
"Unknown block in delay queue";
"block_root" => ?block_root
);
}
// TODO(pawan): just dropping the block, rethink what can be done here
}
// A block that was queued for later processing is now ready to be processed.
InboundEvent::ReadyGossipBlock(ready_block) => {
let block_root = ready_block.block.block_root;
@@ -791,14 +675,6 @@ impl<T: BeaconChainTypes> ReprocessQueue<T> {
}
}
InboundEvent::DelayQueueError(e, queue_name) => {
crit!(
log,
"Failed to poll queue";
"queue" => queue_name,
"e" => ?e
)
}
InboundEvent::ReadyAttestation(queued_id) => {
metrics::inc_counter(
&metrics::BEACON_PROCESSOR_REPROCESSING_QUEUE_EXPIRED_ATTESTATIONS,

View File

@@ -18,7 +18,6 @@ use operation_pool::ReceivedPreCapella;
use slog::{crit, debug, error, info, trace, warn};
use slot_clock::SlotClock;
use ssz::Encode;
use std::sync::Arc;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use store::hot_cold_store::HotColdDBError;
use tokio::sync::mpsc;
@@ -31,8 +30,8 @@ use types::{
use super::{
super::work_reprocessing_queue::{
QueuedAggregate, QueuedExecutedBlock, QueuedGossipBlock, QueuedLightClientUpdate,
QueuedUnaggregate, ReprocessQueueMessage,
QueuedAggregate, QueuedGossipBlock, QueuedLightClientUpdate, QueuedUnaggregate,
ReprocessQueueMessage,
},
Worker,
};