diff --git a/beacon_node/beacon_processor/src/lib.rs b/beacon_node/beacon_processor/src/lib.rs index ce3851ea54..cd58558b12 100644 --- a/beacon_node/beacon_processor/src/lib.rs +++ b/beacon_node/beacon_processor/src/lib.rs @@ -41,8 +41,8 @@ pub use crate::scheduler::BeaconProcessorQueueLengths; use crate::scheduler::work_queue::WorkQueues; use crate::work_reprocessing_queue::{ - QueuedBackfillBatch, QueuedColumnReconstruction, QueuedGossipBlock, QueuedGossipEnvelope, - ReprocessQueueMessage, + QueuedBackfillBatch, QueuedColumnReconstruction, QueuedGossipBlock, QueuedGossipDataColumn, + QueuedGossipEnvelope, ReprocessQueueMessage, }; use futures::stream::{Stream, StreamExt}; use futures::task::Poll; @@ -304,6 +304,10 @@ impl From for WorkEvent { work: Work::ColumnReconstruction(process_fn), } } + ReadyWork::DataColumn(QueuedGossipDataColumn { process_fn, .. }) => Self { + drop_during_sync: true, + work: Work::UnknownBlockAttestation { process_fn }, + }, } } } diff --git a/beacon_node/beacon_processor/src/scheduler/work_reprocessing_queue.rs b/beacon_node/beacon_processor/src/scheduler/work_reprocessing_queue.rs index b1fa56af01..1b875162c6 100644 --- a/beacon_node/beacon_processor/src/scheduler/work_reprocessing_queue.rs +++ b/beacon_node/beacon_processor/src/scheduler/work_reprocessing_queue.rs @@ -123,6 +123,8 @@ pub enum ReprocessQueueMessage { UnknownLightClientOptimisticUpdate(QueuedLightClientUpdate), /// A new backfill batch that needs to be scheduled for processing. BackfillSync(QueuedBackfillBatch), + /// A gossip data column that references an unknown block. + UnknownBlockDataColumn(QueuedGossipDataColumn), /// A delayed column reconstruction that needs checking DelayColumnReconstruction(QueuedColumnReconstruction), } @@ -138,6 +140,7 @@ pub enum ReadyWork { LightClientUpdate(QueuedLightClientUpdate), BackfillSync(QueuedBackfillBatch), ColumnReconstruction(QueuedColumnReconstruction), + DataColumn(QueuedGossipDataColumn), } /// An Attestation for which the corresponding block was not seen while processing, queued for @@ -200,6 +203,12 @@ pub struct QueuedColumnReconstruction { pub process_fn: AsyncFn, } +/// A gossip data column that references an unknown block, queued for later reprocessing. +pub struct QueuedGossipDataColumn { + pub beacon_block_root: Hash256, + pub process_fn: BlockingFn, +} + impl TryFrom> for QueuedBackfillBatch { type Error = WorkEvent; @@ -240,6 +249,8 @@ enum InboundEvent { ReadyBackfillSync(QueuedBackfillBatch), /// A column reconstruction that was queued is ready for processing. ReadyColumnReconstruction(QueuedColumnReconstruction), + /// A gossip data column that timed out waiting for its block. + ReadyDataColumn(usize), /// A message sent to the `ReprocessQueue` Msg(ReprocessQueueMessage), } @@ -264,6 +275,7 @@ struct ReprocessQueue { lc_updates_delay_queue: DelayQueue, /// Queue to manage scheduled column reconstructions. column_reconstructions_delay_queue: DelayQueue, + data_columns_delay_queue: DelayQueue, /* Queued items */ /// Queued blocks. @@ -284,10 +296,15 @@ struct ReprocessQueue { queued_column_reconstructions: HashMap>, /// Queued backfill batches queued_backfill_batches: Vec, + /// Queued gossip data columns awaiting their block. + queued_gossip_data_columns: FnvHashMap, + /// Data columns per block root. + awaiting_data_columns_per_root: HashMap>, /* Aux */ /// Next attestation id, used for both aggregated and unaggregated attestations next_attestation: usize, + next_data_column: usize, next_lc_update: usize, early_block_debounce: TimeLatch, envelope_delay_debounce: TimeLatch, @@ -387,6 +404,13 @@ impl Stream for ReprocessQueue { Poll::Ready(None) | Poll::Pending => (), } + match self.data_columns_delay_queue.poll_expired(cx) { + Poll::Ready(Some(col_id)) => { + return Poll::Ready(Some(InboundEvent::ReadyDataColumn(col_id.into_inner()))); + } + Poll::Ready(None) | Poll::Pending => (), + } + if let Some(next_backfill_batch_event) = self.next_backfill_batch_event.as_mut() { match next_backfill_batch_event.as_mut().poll(cx) { Poll::Ready(_) => { @@ -455,6 +479,7 @@ impl ReprocessQueue { attestations_delay_queue: DelayQueue::new(), lc_updates_delay_queue: DelayQueue::new(), column_reconstructions_delay_queue: DelayQueue::new(), + data_columns_delay_queue: DelayQueue::new(), queued_gossip_block_roots: HashSet::new(), awaiting_envelopes_per_root: HashMap::new(), queued_lc_updates: FnvHashMap::default(), @@ -464,7 +489,10 @@ impl ReprocessQueue { awaiting_lc_updates_per_parent_root: HashMap::new(), queued_backfill_batches: Vec::new(), queued_column_reconstructions: HashMap::new(), + queued_gossip_data_columns: FnvHashMap::default(), + awaiting_data_columns_per_root: HashMap::new(), next_attestation: 0, + next_data_column: 0, next_lc_update: 0, early_block_debounce: TimeLatch::default(), envelope_delay_debounce: TimeLatch::default(), @@ -688,6 +716,29 @@ impl ReprocessQueue { self.next_attestation += 1; } + InboundEvent::Msg(UnknownBlockDataColumn(queued_data_column)) => { + if self.queued_gossip_data_columns.len() >= MAXIMUM_QUEUED_ATTESTATIONS { + return; + } + + let col_id = self.next_data_column; + + let delay_key = self + .data_columns_delay_queue + .insert(col_id, QUEUED_ATTESTATION_DELAY); + + // Register this column for the corresponding block root. + self.awaiting_data_columns_per_root + .entry(queued_data_column.beacon_block_root) + .or_default() + .push(col_id); + + // Store the column and its info. + self.queued_gossip_data_columns + .insert(col_id, (queued_data_column, delay_key)); + + self.next_data_column += 1; + } InboundEvent::Msg(UnknownLightClientOptimisticUpdate( queued_light_client_optimistic_update, )) => { @@ -800,6 +851,29 @@ impl ReprocessQueue { ); } } + + // Unqueue the data columns we have for this root, if any. + if let Some(queued_ids) = + self.awaiting_data_columns_per_root.remove(&block_root) + { + for col_id in queued_ids { + if let Some((data_column, delay_key)) = + self.queued_gossip_data_columns.remove(&col_id) + { + self.data_columns_delay_queue.remove(&delay_key); + if self + .ready_work_tx + .try_send(ReadyWork::DataColumn(data_column)) + .is_err() + { + error!( + ?block_root, + "Failed to send data column for reprocessing" + ); + } + } + } + } } InboundEvent::Msg(NewLightClientOptimisticUpdate { parent_root }) => { // Unqueue the light client optimistic updates we have for this root, if any. @@ -1053,6 +1127,31 @@ impl ReprocessQueue { ); } } + InboundEvent::ReadyDataColumn(col_id) => { + if let Some((data_column, _)) = self.queued_gossip_data_columns.remove(&col_id) { + // Clean up the per-root index. + let root = data_column.beacon_block_root; + if let Entry::Occupied(mut entry) = + self.awaiting_data_columns_per_root.entry(root) + { + let ids = entry.get_mut(); + ids.retain(|&id| id != col_id); + if ids.is_empty() { + entry.remove_entry(); + } + } + if self + .ready_work_tx + .try_send(ReadyWork::DataColumn(data_column)) + .is_err() + { + error!( + hint = "system may be overloaded", + "Ignored expired gossip data column" + ); + } + } + } } metrics::set_gauge_vec( diff --git a/beacon_node/network/src/network_beacon_processor/gossip_methods.rs b/beacon_node/network/src/network_beacon_processor/gossip_methods.rs index 3e8845f017..5dce67ec55 100644 --- a/beacon_node/network/src/network_beacon_processor/gossip_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/gossip_methods.rs @@ -64,8 +64,8 @@ use beacon_processor::work_reprocessing_queue::QueuedColumnReconstruction; use beacon_processor::{ DuplicateCache, GossipAggregatePackage, GossipAttestationBatch, work_reprocessing_queue::{ - QueuedAggregate, QueuedGossipBlock, QueuedGossipEnvelope, QueuedLightClientUpdate, - QueuedUnaggregate, ReprocessQueueMessage, + QueuedAggregate, QueuedGossipBlock, QueuedGossipDataColumn, QueuedGossipEnvelope, + QueuedLightClientUpdate, QueuedUnaggregate, ReprocessQueueMessage, }, }; @@ -728,19 +728,46 @@ impl NetworkBeaconProcessor { .. } => { debug!( - action = "ignoring", + action = "queuing for reprocessing", %unknown_block_root, "Unknown block root for column" ); - // TODO(gloas): wire this into proper lookup sync. Sending - // `UnknownBlockHashFromAttestation` here is a Fulu-shaped fallback that - // mixes column processing with the attestation lookup path and is not - // the right primitive for Gloas column lookups. self.propagate_validation_result( - message_id, + message_id.clone(), peer_id, MessageAcceptance::Ignore, ); + + // Queue the column for reprocessing when the block arrives. + let processor = self.clone(); + let reprocess_msg = ReprocessQueueMessage::UnknownBlockDataColumn( + QueuedGossipDataColumn { + beacon_block_root: unknown_block_root, + process_fn: Box::new(move || { + // Re-dispatch through the normal gossip column processing path. + let _ = processor.send_gossip_data_column_sidecar( + message_id, + peer_id, + subnet_id, + column_sidecar, + seen_duration, + ); + }), + }, + ); + if self + .beacon_processor_send + .try_send(WorkEvent { + drop_during_sync: false, + work: Work::Reprocess(reprocess_msg), + }) + .is_err() + { + debug!( + %unknown_block_root, + "Failed to queue data column for reprocessing" + ); + } } GossipDataColumnError::InvalidVariant | GossipDataColumnError::PubkeyCacheTimeout