Wait before column reconstruction (#7588)

This commit is contained in:
Daniel Knopik
2025-06-13 20:19:06 +02:00
committed by GitHub
parent a65f78222d
commit ccd99c138c
4 changed files with 110 additions and 3 deletions

View File

@@ -39,7 +39,7 @@
//! task.
use crate::work_reprocessing_queue::{
QueuedBackfillBatch, QueuedGossipBlock, ReprocessQueueMessage,
QueuedBackfillBatch, QueuedColumnReconstruction, QueuedGossipBlock, ReprocessQueueMessage,
};
use futures::stream::{Stream, StreamExt};
use futures::task::Poll;
@@ -117,6 +117,7 @@ pub struct BeaconProcessorQueueLengths {
rpc_custody_column_queue: usize,
rpc_verify_data_column_queue: usize,
sampling_result_queue: usize,
column_reconstruction_queue: usize,
chain_segment_queue: usize,
backfill_chain_segment: usize,
gossip_block_queue: usize,
@@ -184,6 +185,7 @@ impl BeaconProcessorQueueLengths {
rpc_verify_data_column_queue: 1000,
unknown_block_sampling_request_queue: 16384,
sampling_result_queue: 1000,
column_reconstruction_queue: 64,
chain_segment_queue: 64,
backfill_chain_segment: 64,
gossip_block_queue: 1024,
@@ -498,6 +500,12 @@ impl<E: EthSpec> From<ReadyWork> for WorkEvent<E> {
drop_during_sync: false,
work: Work::ChainSegmentBackfill(process_fn),
},
ReadyWork::ColumnReconstruction(QueuedColumnReconstruction { process_fn, .. }) => {
Self {
drop_during_sync: true,
work: Work::ColumnReconstruction(process_fn),
}
}
}
}
}
@@ -619,6 +627,7 @@ pub enum Work<E: EthSpec> {
RpcCustodyColumn(AsyncFn),
RpcVerifyDataColumn(AsyncFn),
SamplingResult(AsyncFn),
ColumnReconstruction(AsyncFn),
IgnoredRpcBlock {
process_fn: BlockingFn,
},
@@ -674,6 +683,7 @@ pub enum WorkType {
RpcCustodyColumn,
RpcVerifyDataColumn,
SamplingResult,
ColumnReconstruction,
IgnoredRpcBlock,
ChainSegment,
ChainSegmentBackfill,
@@ -725,6 +735,7 @@ impl<E: EthSpec> Work<E> {
Work::RpcCustodyColumn { .. } => WorkType::RpcCustodyColumn,
Work::RpcVerifyDataColumn { .. } => WorkType::RpcVerifyDataColumn,
Work::SamplingResult { .. } => WorkType::SamplingResult,
Work::ColumnReconstruction(_) => WorkType::ColumnReconstruction,
Work::IgnoredRpcBlock { .. } => WorkType::IgnoredRpcBlock,
Work::ChainSegment { .. } => WorkType::ChainSegment,
Work::ChainSegmentBackfill(_) => WorkType::ChainSegmentBackfill,
@@ -891,6 +902,8 @@ impl<E: EthSpec> BeaconProcessor<E> {
FifoQueue::new(queue_lengths.rpc_verify_data_column_queue);
// TODO(das): the sampling_request_queue is never read
let mut sampling_result_queue = FifoQueue::new(queue_lengths.sampling_result_queue);
let mut column_reconstruction_queue =
FifoQueue::new(queue_lengths.column_reconstruction_queue);
let mut unknown_block_sampling_request_queue =
FifoQueue::new(queue_lengths.unknown_block_sampling_request_queue);
let mut chain_segment_queue = FifoQueue::new(queue_lengths.chain_segment_queue);
@@ -1072,6 +1085,8 @@ impl<E: EthSpec> BeaconProcessor<E> {
Some(item)
} else if let Some(item) = gossip_data_column_queue.pop() {
Some(item)
} else if let Some(item) = column_reconstruction_queue.pop() {
Some(item)
// Check the priority 0 API requests after blocks and blobs, but before attestations.
} else if let Some(item) = api_request_p0_queue.pop() {
Some(item)
@@ -1371,6 +1386,9 @@ impl<E: EthSpec> BeaconProcessor<E> {
rpc_verify_data_column_queue.push(work, work_id)
}
Work::SamplingResult(_) => sampling_result_queue.push(work, work_id),
Work::ColumnReconstruction(_) => {
column_reconstruction_queue.push(work, work_id)
}
Work::ChainSegment { .. } => chain_segment_queue.push(work, work_id),
Work::ChainSegmentBackfill { .. } => {
backfill_chain_segment.push(work, work_id)
@@ -1460,6 +1478,7 @@ impl<E: EthSpec> BeaconProcessor<E> {
WorkType::RpcCustodyColumn => rpc_custody_column_queue.len(),
WorkType::RpcVerifyDataColumn => rpc_verify_data_column_queue.len(),
WorkType::SamplingResult => sampling_result_queue.len(),
WorkType::ColumnReconstruction => column_reconstruction_queue.len(),
WorkType::ChainSegment => chain_segment_queue.len(),
WorkType::ChainSegmentBackfill => backfill_chain_segment.len(),
WorkType::Status => status_queue.len(),
@@ -1602,7 +1621,8 @@ impl<E: EthSpec> BeaconProcessor<E> {
| Work::RpcBlobs { process_fn }
| Work::RpcCustodyColumn(process_fn)
| Work::RpcVerifyDataColumn(process_fn)
| Work::SamplingResult(process_fn) => task_spawner.spawn_async(process_fn),
| Work::SamplingResult(process_fn)
| Work::ColumnReconstruction(process_fn) => task_spawner.spawn_async(process_fn),
Work::IgnoredRpcBlock { process_fn } => task_spawner.spawn_blocking(process_fn),
Work::GossipBlock(work)
| Work::GossipBlobSidecar(work)

View File

@@ -19,6 +19,7 @@ use itertools::Itertools;
use logging::crit;
use logging::TimeLatch;
use slot_clock::SlotClock;
use std::collections::hash_map::Entry;
use std::collections::{HashMap, HashSet};
use std::future::Future;
use std::pin::Pin;
@@ -54,6 +55,9 @@ pub const QUEUED_RPC_BLOCK_DELAY: Duration = Duration::from_secs(4);
/// For how long to queue sampling requests for reprocessing.
pub const QUEUED_SAMPLING_REQUESTS_DELAY: Duration = Duration::from_secs(12);
/// For how long to queue delayed column reconstruction.
pub const QUEUED_RECONSTRUCTION_DELAY: Duration = Duration::from_millis(150);
/// Set an arbitrary upper-bound on the number of queued blocks to avoid DoS attacks. The fact that
/// we signature-verify blocks before putting them in the queue *should* protect against this, but
/// it's nice to have extra protection.
@@ -109,6 +113,8 @@ pub enum ReprocessQueueMessage {
UnknownBlockSamplingRequest(QueuedSamplingRequest),
/// A new backfill batch that needs to be scheduled for processing.
BackfillSync(QueuedBackfillBatch),
/// A delayed column reconstruction that needs checking
DelayColumnReconstruction(QueuedColumnReconstruction),
}
/// Events sent by the scheduler once they are ready for re-processing.
@@ -121,6 +127,7 @@ pub enum ReadyWork {
LightClientUpdate(QueuedLightClientUpdate),
SamplingRequest(QueuedSamplingRequest),
BackfillSync(QueuedBackfillBatch),
ColumnReconstruction(QueuedColumnReconstruction),
}
/// An Attestation for which the corresponding block was not seen while processing, queued for
@@ -176,6 +183,11 @@ pub struct IgnoredRpcBlock {
/// A backfill batch work that has been queued for processing later.
pub struct QueuedBackfillBatch(pub AsyncFn);
pub struct QueuedColumnReconstruction {
pub block_root: Hash256,
pub process_fn: AsyncFn,
}
impl<E: EthSpec> TryFrom<WorkEvent<E>> for QueuedBackfillBatch {
type Error = WorkEvent<E>;
@@ -212,6 +224,8 @@ enum InboundEvent {
ReadyLightClientUpdate(QueuedLightClientUpdateId),
/// A backfill batch that was queued is ready for processing.
ReadyBackfillSync(QueuedBackfillBatch),
/// A column reconstruction that was queued is ready for processing.
ReadyColumnReconstruction(QueuedColumnReconstruction),
/// A message sent to the `ReprocessQueue`
Msg(ReprocessQueueMessage),
}
@@ -234,6 +248,8 @@ struct ReprocessQueue<S> {
lc_updates_delay_queue: DelayQueue<QueuedLightClientUpdateId>,
/// Queue to manage scheduled sampling requests
sampling_requests_delay_queue: DelayQueue<QueuedSamplingRequestId>,
/// Queue to manage scheduled column reconstructions.
column_reconstructions_delay_queue: DelayQueue<QueuedColumnReconstruction>,
/* Queued items */
/// Queued blocks.
@@ -252,6 +268,8 @@ struct ReprocessQueue<S> {
queued_sampling_requests: FnvHashMap<usize, (QueuedSamplingRequest, DelayKey)>,
/// Sampling requests per block root.
awaiting_sampling_requests_per_block_root: HashMap<Hash256, Vec<QueuedSamplingRequestId>>,
/// Column reconstruction per block root.
queued_column_reconstructions: HashMap<Hash256, DelayKey>,
/// Queued backfill batches
queued_backfill_batches: Vec<QueuedBackfillBatch>,
@@ -343,6 +361,15 @@ impl<S: SlotClock> Stream for ReprocessQueue<S> {
Poll::Ready(None) | Poll::Pending => (),
}
match self.column_reconstructions_delay_queue.poll_expired(cx) {
Poll::Ready(Some(reconstruction)) => {
return Poll::Ready(Some(InboundEvent::ReadyColumnReconstruction(
reconstruction.into_inner(),
)));
}
Poll::Ready(None) | Poll::Pending => (),
}
if let Some(next_backfill_batch_event) = self.next_backfill_batch_event.as_mut() {
match next_backfill_batch_event.as_mut().poll(cx) {
Poll::Ready(_) => {
@@ -410,6 +437,7 @@ impl<S: SlotClock> ReprocessQueue<S> {
attestations_delay_queue: DelayQueue::new(),
lc_updates_delay_queue: DelayQueue::new(),
sampling_requests_delay_queue: <_>::default(),
column_reconstructions_delay_queue: DelayQueue::new(),
queued_gossip_block_roots: HashSet::new(),
queued_lc_updates: FnvHashMap::default(),
queued_aggregates: FnvHashMap::default(),
@@ -419,6 +447,7 @@ impl<S: SlotClock> ReprocessQueue<S> {
awaiting_lc_updates_per_parent_root: HashMap::new(),
awaiting_sampling_requests_per_block_root: <_>::default(),
queued_backfill_batches: Vec::new(),
queued_column_reconstructions: HashMap::new(),
next_attestation: 0,
next_lc_update: 0,
next_sampling_request_update: 0,
@@ -817,6 +846,21 @@ impl<S: SlotClock> ReprocessQueue<S> {
self.recompute_next_backfill_batch_event();
}
}
InboundEvent::Msg(DelayColumnReconstruction(request)) => {
match self.queued_column_reconstructions.entry(request.block_root) {
Entry::Occupied(key) => {
// Push back the reattempted reconstruction
self.column_reconstructions_delay_queue
.reset(key.get(), QUEUED_RECONSTRUCTION_DELAY)
}
Entry::Vacant(vacant) => {
let delay_key = self
.column_reconstructions_delay_queue
.insert(request, QUEUED_RECONSTRUCTION_DELAY);
vacant.insert(delay_key);
}
}
}
// A block that was queued for later processing is now ready to be processed.
InboundEvent::ReadyGossipBlock(ready_block) => {
let block_root = ready_block.beacon_block_root;
@@ -940,6 +984,20 @@ impl<S: SlotClock> ReprocessQueue<S> {
_ => crit!("Unexpected return from try_send error"),
}
}
InboundEvent::ReadyColumnReconstruction(column_reconstruction) => {
self.queued_column_reconstructions
.remove(&column_reconstruction.block_root);
if self
.ready_work_tx
.try_send(ReadyWork::ColumnReconstruction(column_reconstruction))
.is_err()
{
error!(
hint = "system may be overloaded",
"Ignored scheduled column reconstruction"
);
}
}
}
metrics::set_gauge_vec(