Reprocess queue for data columns referencing unknown blocks

This commit is contained in:
Eitan Seri-Levi
2026-05-22 14:44:05 +03:00
parent 60abd4b5b9
commit 9f4e3f367a
3 changed files with 140 additions and 10 deletions

View File

@@ -41,8 +41,8 @@
pub use crate::scheduler::BeaconProcessorQueueLengths; pub use crate::scheduler::BeaconProcessorQueueLengths;
use crate::scheduler::work_queue::WorkQueues; use crate::scheduler::work_queue::WorkQueues;
use crate::work_reprocessing_queue::{ use crate::work_reprocessing_queue::{
QueuedBackfillBatch, QueuedColumnReconstruction, QueuedGossipBlock, QueuedGossipEnvelope, QueuedBackfillBatch, QueuedColumnReconstruction, QueuedGossipBlock, QueuedGossipDataColumn,
ReprocessQueueMessage, QueuedGossipEnvelope, ReprocessQueueMessage,
}; };
use futures::stream::{Stream, StreamExt}; use futures::stream::{Stream, StreamExt};
use futures::task::Poll; use futures::task::Poll;
@@ -304,6 +304,10 @@ impl<E: EthSpec> From<ReadyWork> for WorkEvent<E> {
work: Work::ColumnReconstruction(process_fn), work: Work::ColumnReconstruction(process_fn),
} }
} }
ReadyWork::DataColumn(QueuedGossipDataColumn { process_fn, .. }) => Self {
drop_during_sync: true,
work: Work::UnknownBlockAttestation { process_fn },
},
} }
} }
} }

View File

@@ -123,6 +123,8 @@ pub enum ReprocessQueueMessage {
UnknownLightClientOptimisticUpdate(QueuedLightClientUpdate), UnknownLightClientOptimisticUpdate(QueuedLightClientUpdate),
/// A new backfill batch that needs to be scheduled for processing. /// A new backfill batch that needs to be scheduled for processing.
BackfillSync(QueuedBackfillBatch), BackfillSync(QueuedBackfillBatch),
/// A gossip data column that references an unknown block.
UnknownBlockDataColumn(QueuedGossipDataColumn),
/// A delayed column reconstruction that needs checking /// A delayed column reconstruction that needs checking
DelayColumnReconstruction(QueuedColumnReconstruction), DelayColumnReconstruction(QueuedColumnReconstruction),
} }
@@ -138,6 +140,7 @@ pub enum ReadyWork {
LightClientUpdate(QueuedLightClientUpdate), LightClientUpdate(QueuedLightClientUpdate),
BackfillSync(QueuedBackfillBatch), BackfillSync(QueuedBackfillBatch),
ColumnReconstruction(QueuedColumnReconstruction), ColumnReconstruction(QueuedColumnReconstruction),
DataColumn(QueuedGossipDataColumn),
} }
/// An Attestation for which the corresponding block was not seen while processing, queued for /// An Attestation for which the corresponding block was not seen while processing, queued for
@@ -200,6 +203,12 @@ pub struct QueuedColumnReconstruction {
pub process_fn: AsyncFn, pub process_fn: AsyncFn,
} }
/// A gossip data column that references an unknown block, queued for later reprocessing.
pub struct QueuedGossipDataColumn {
pub beacon_block_root: Hash256,
pub process_fn: BlockingFn,
}
impl<E: EthSpec> TryFrom<WorkEvent<E>> for QueuedBackfillBatch { impl<E: EthSpec> TryFrom<WorkEvent<E>> for QueuedBackfillBatch {
type Error = WorkEvent<E>; type Error = WorkEvent<E>;
@@ -240,6 +249,8 @@ enum InboundEvent {
ReadyBackfillSync(QueuedBackfillBatch), ReadyBackfillSync(QueuedBackfillBatch),
/// A column reconstruction that was queued is ready for processing. /// A column reconstruction that was queued is ready for processing.
ReadyColumnReconstruction(QueuedColumnReconstruction), ReadyColumnReconstruction(QueuedColumnReconstruction),
/// A gossip data column that timed out waiting for its block.
ReadyDataColumn(usize),
/// A message sent to the `ReprocessQueue` /// A message sent to the `ReprocessQueue`
Msg(ReprocessQueueMessage), Msg(ReprocessQueueMessage),
} }
@@ -264,6 +275,7 @@ struct ReprocessQueue<S> {
lc_updates_delay_queue: DelayQueue<QueuedLightClientUpdateId>, lc_updates_delay_queue: DelayQueue<QueuedLightClientUpdateId>,
/// Queue to manage scheduled column reconstructions. /// Queue to manage scheduled column reconstructions.
column_reconstructions_delay_queue: DelayQueue<QueuedColumnReconstruction>, column_reconstructions_delay_queue: DelayQueue<QueuedColumnReconstruction>,
data_columns_delay_queue: DelayQueue<usize>,
/* Queued items */ /* Queued items */
/// Queued blocks. /// Queued blocks.
@@ -284,10 +296,15 @@ struct ReprocessQueue<S> {
queued_column_reconstructions: HashMap<Hash256, Option<DelayKey>>, queued_column_reconstructions: HashMap<Hash256, Option<DelayKey>>,
/// Queued backfill batches /// Queued backfill batches
queued_backfill_batches: Vec<QueuedBackfillBatch>, queued_backfill_batches: Vec<QueuedBackfillBatch>,
/// Queued gossip data columns awaiting their block.
queued_gossip_data_columns: FnvHashMap<usize, (QueuedGossipDataColumn, DelayKey)>,
/// Data columns per block root.
awaiting_data_columns_per_root: HashMap<Hash256, Vec<usize>>,
/* Aux */ /* Aux */
/// Next attestation id, used for both aggregated and unaggregated attestations /// Next attestation id, used for both aggregated and unaggregated attestations
next_attestation: usize, next_attestation: usize,
next_data_column: usize,
next_lc_update: usize, next_lc_update: usize,
early_block_debounce: TimeLatch, early_block_debounce: TimeLatch,
envelope_delay_debounce: TimeLatch, envelope_delay_debounce: TimeLatch,
@@ -387,6 +404,13 @@ impl<S: SlotClock> Stream for ReprocessQueue<S> {
Poll::Ready(None) | Poll::Pending => (), Poll::Ready(None) | Poll::Pending => (),
} }
match self.data_columns_delay_queue.poll_expired(cx) {
Poll::Ready(Some(col_id)) => {
return Poll::Ready(Some(InboundEvent::ReadyDataColumn(col_id.into_inner())));
}
Poll::Ready(None) | Poll::Pending => (),
}
if let Some(next_backfill_batch_event) = self.next_backfill_batch_event.as_mut() { if let Some(next_backfill_batch_event) = self.next_backfill_batch_event.as_mut() {
match next_backfill_batch_event.as_mut().poll(cx) { match next_backfill_batch_event.as_mut().poll(cx) {
Poll::Ready(_) => { Poll::Ready(_) => {
@@ -455,6 +479,7 @@ impl<S: SlotClock> ReprocessQueue<S> {
attestations_delay_queue: DelayQueue::new(), attestations_delay_queue: DelayQueue::new(),
lc_updates_delay_queue: DelayQueue::new(), lc_updates_delay_queue: DelayQueue::new(),
column_reconstructions_delay_queue: DelayQueue::new(), column_reconstructions_delay_queue: DelayQueue::new(),
data_columns_delay_queue: DelayQueue::new(),
queued_gossip_block_roots: HashSet::new(), queued_gossip_block_roots: HashSet::new(),
awaiting_envelopes_per_root: HashMap::new(), awaiting_envelopes_per_root: HashMap::new(),
queued_lc_updates: FnvHashMap::default(), queued_lc_updates: FnvHashMap::default(),
@@ -464,7 +489,10 @@ impl<S: SlotClock> ReprocessQueue<S> {
awaiting_lc_updates_per_parent_root: HashMap::new(), awaiting_lc_updates_per_parent_root: HashMap::new(),
queued_backfill_batches: Vec::new(), queued_backfill_batches: Vec::new(),
queued_column_reconstructions: HashMap::new(), queued_column_reconstructions: HashMap::new(),
queued_gossip_data_columns: FnvHashMap::default(),
awaiting_data_columns_per_root: HashMap::new(),
next_attestation: 0, next_attestation: 0,
next_data_column: 0,
next_lc_update: 0, next_lc_update: 0,
early_block_debounce: TimeLatch::default(), early_block_debounce: TimeLatch::default(),
envelope_delay_debounce: TimeLatch::default(), envelope_delay_debounce: TimeLatch::default(),
@@ -688,6 +716,29 @@ impl<S: SlotClock> ReprocessQueue<S> {
self.next_attestation += 1; self.next_attestation += 1;
} }
InboundEvent::Msg(UnknownBlockDataColumn(queued_data_column)) => {
if self.queued_gossip_data_columns.len() >= MAXIMUM_QUEUED_ATTESTATIONS {
return;
}
let col_id = self.next_data_column;
let delay_key = self
.data_columns_delay_queue
.insert(col_id, QUEUED_ATTESTATION_DELAY);
// Register this column for the corresponding block root.
self.awaiting_data_columns_per_root
.entry(queued_data_column.beacon_block_root)
.or_default()
.push(col_id);
// Store the column and its info.
self.queued_gossip_data_columns
.insert(col_id, (queued_data_column, delay_key));
self.next_data_column += 1;
}
InboundEvent::Msg(UnknownLightClientOptimisticUpdate( InboundEvent::Msg(UnknownLightClientOptimisticUpdate(
queued_light_client_optimistic_update, queued_light_client_optimistic_update,
)) => { )) => {
@@ -800,6 +851,29 @@ impl<S: SlotClock> ReprocessQueue<S> {
); );
} }
} }
// Unqueue the data columns we have for this root, if any.
if let Some(queued_ids) =
self.awaiting_data_columns_per_root.remove(&block_root)
{
for col_id in queued_ids {
if let Some((data_column, delay_key)) =
self.queued_gossip_data_columns.remove(&col_id)
{
self.data_columns_delay_queue.remove(&delay_key);
if self
.ready_work_tx
.try_send(ReadyWork::DataColumn(data_column))
.is_err()
{
error!(
?block_root,
"Failed to send data column for reprocessing"
);
}
}
}
}
} }
InboundEvent::Msg(NewLightClientOptimisticUpdate { parent_root }) => { InboundEvent::Msg(NewLightClientOptimisticUpdate { parent_root }) => {
// Unqueue the light client optimistic updates we have for this root, if any. // Unqueue the light client optimistic updates we have for this root, if any.
@@ -1053,6 +1127,31 @@ impl<S: SlotClock> ReprocessQueue<S> {
); );
} }
} }
InboundEvent::ReadyDataColumn(col_id) => {
if let Some((data_column, _)) = self.queued_gossip_data_columns.remove(&col_id) {
// Clean up the per-root index.
let root = data_column.beacon_block_root;
if let Entry::Occupied(mut entry) =
self.awaiting_data_columns_per_root.entry(root)
{
let ids = entry.get_mut();
ids.retain(|&id| id != col_id);
if ids.is_empty() {
entry.remove_entry();
}
}
if self
.ready_work_tx
.try_send(ReadyWork::DataColumn(data_column))
.is_err()
{
error!(
hint = "system may be overloaded",
"Ignored expired gossip data column"
);
}
}
}
} }
metrics::set_gauge_vec( metrics::set_gauge_vec(

View File

@@ -64,8 +64,8 @@ use beacon_processor::work_reprocessing_queue::QueuedColumnReconstruction;
use beacon_processor::{ use beacon_processor::{
DuplicateCache, GossipAggregatePackage, GossipAttestationBatch, DuplicateCache, GossipAggregatePackage, GossipAttestationBatch,
work_reprocessing_queue::{ work_reprocessing_queue::{
QueuedAggregate, QueuedGossipBlock, QueuedGossipEnvelope, QueuedLightClientUpdate, QueuedAggregate, QueuedGossipBlock, QueuedGossipDataColumn, QueuedGossipEnvelope,
QueuedUnaggregate, ReprocessQueueMessage, QueuedLightClientUpdate, QueuedUnaggregate, ReprocessQueueMessage,
}, },
}; };
@@ -728,19 +728,46 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
.. ..
} => { } => {
debug!( debug!(
action = "ignoring", action = "queuing for reprocessing",
%unknown_block_root, %unknown_block_root,
"Unknown block root for column" "Unknown block root for column"
); );
// TODO(gloas): wire this into proper lookup sync. Sending
// `UnknownBlockHashFromAttestation` here is a Fulu-shaped fallback that
// mixes column processing with the attestation lookup path and is not
// the right primitive for Gloas column lookups.
self.propagate_validation_result( self.propagate_validation_result(
message_id, message_id.clone(),
peer_id, peer_id,
MessageAcceptance::Ignore, MessageAcceptance::Ignore,
); );
// Queue the column for reprocessing when the block arrives.
let processor = self.clone();
let reprocess_msg = ReprocessQueueMessage::UnknownBlockDataColumn(
QueuedGossipDataColumn {
beacon_block_root: unknown_block_root,
process_fn: Box::new(move || {
// Re-dispatch through the normal gossip column processing path.
let _ = processor.send_gossip_data_column_sidecar(
message_id,
peer_id,
subnet_id,
column_sidecar,
seen_duration,
);
}),
},
);
if self
.beacon_processor_send
.try_send(WorkEvent {
drop_during_sync: false,
work: Work::Reprocess(reprocess_msg),
})
.is_err()
{
debug!(
%unknown_block_root,
"Failed to queue data column for reprocessing"
);
}
} }
GossipDataColumnError::InvalidVariant GossipDataColumnError::InvalidVariant
| GossipDataColumnError::PubkeyCacheTimeout | GossipDataColumnError::PubkeyCacheTimeout