From 21bf3d37cdce46632cfa4e3f5abb194f172c6851 Mon Sep 17 00:00:00 2001 From: Pawan Dhananjay Date: Wed, 5 Oct 2022 02:52:23 -0500 Subject: [PATCH] Reprocess blob sidecar messages --- .../beacon_chain/src/blob_verification.rs | 8 + .../network/src/beacon_processor/mod.rs | 60 +++++-- .../work_reprocessing_queue.rs | 159 +++++++++++++++++- .../beacon_processor/worker/gossip_methods.rs | 80 ++++++++- 4 files changed, 288 insertions(+), 19 deletions(-) diff --git a/beacon_node/beacon_chain/src/blob_verification.rs b/beacon_node/beacon_chain/src/blob_verification.rs index 4d1627567c..18708aa5ff 100644 --- a/beacon_node/beacon_chain/src/blob_verification.rs +++ b/beacon_node/beacon_chain/src/blob_verification.rs @@ -62,6 +62,14 @@ pub enum BlobError { /// be equal to the given sidecar. RepeatSidecar { proposer: u64, slot: Slot }, + /// The `blobs_sidecar.message.beacon_block_root` block is unknown. + /// + /// ## Peer scoring + /// + /// The attestation points to a block we have not yet imported. It's unclear if the attestation + /// is valid or not. + UnknownHeadBlock { beacon_block_root: Hash256 }, + /// There was an error whilst processing the sync contribution. It is not known if it is valid or invalid. /// /// ## Peer scoring diff --git a/beacon_node/network/src/beacon_processor/mod.rs b/beacon_node/network/src/beacon_processor/mod.rs index 947c215b3d..87e092332f 100644 --- a/beacon_node/network/src/beacon_processor/mod.rs +++ b/beacon_node/network/src/beacon_processor/mod.rs @@ -80,6 +80,8 @@ mod worker; use crate::beacon_processor::work_reprocessing_queue::QueuedGossipBlock; pub use worker::{ChainSegmentProcessId, GossipAggregatePackage, GossipAttestationPackage}; +use self::work_reprocessing_queue::QueuedBlobsSidecar; + /// The maximum size of the channel for work events to the `BeaconProcessor`. /// /// Setting this too low will cause consensus messages to be dropped. @@ -116,6 +118,8 @@ const MAX_GOSSIP_BLOCK_QUEUE_LEN: usize = 1_024; //FIXME(sean) verify const MAX_GOSSIP_BLOB_QUEUE_LEN: usize = 1_024; +//FIXME(sean) verify +const MAX_BLOBS_SIDECAR_REPROCESS_QUEUE_LEN: usize = 1_024; /// The maximum number of queued `SignedBeaconBlock` objects received prior to their slot (but /// within acceptable clock disparity) that will be queued before we start dropping them. @@ -206,6 +210,7 @@ pub const BLOCKS_BY_ROOTS_REQUEST: &str = "blocks_by_roots_request"; pub const BLOBS_BY_RANGE_REQUEST: &str = "blobs_by_range_request"; pub const UNKNOWN_BLOCK_ATTESTATION: &str = "unknown_block_attestation"; pub const UNKNOWN_BLOCK_AGGREGATE: &str = "unknown_block_aggregate"; +pub const UNKNOWN_BLOBS_SIDECAR: &str = "unknown_blobs_sidecar"; /// A simple first-in-first-out queue with a maximum length. struct FifoQueue { @@ -413,7 +418,7 @@ impl WorkEvent { pub fn gossip_blobs_sidecar( message_id: MessageId, peer_id: PeerId, - peer_client: Client, + _peer_client: Client, blobs: Arc>, seen_timestamp: Duration, ) -> Self { @@ -422,7 +427,6 @@ impl WorkEvent { work: Work::GossipBlobsSidecar { message_id, peer_id, - peer_client, blobs, seen_timestamp, }, @@ -670,6 +674,20 @@ impl std::convert::From> for WorkEvent { seen_timestamp, }, }, + ReadyWork::BlobsSidecar(QueuedBlobsSidecar { + peer_id, + message_id, + blobs_sidecar, + seen_timestamp, + }) => Self { + drop_during_sync: true, + work: Work::UnknownBlobsSidecar { + message_id, + peer_id, + blobs: blobs_sidecar, + seen_timestamp, + }, + }, } } } @@ -722,7 +740,12 @@ pub enum Work { GossipBlobsSidecar { message_id: MessageId, peer_id: PeerId, - peer_client: Client, + blobs: Arc>, + seen_timestamp: Duration, + }, + UnknownBlobsSidecar { + message_id: MessageId, + peer_id: PeerId, blobs: Arc>, seen_timestamp: Duration, }, @@ -815,6 +838,7 @@ impl Work { Work::BlobsByRangeRequest { .. } => BLOBS_BY_RANGE_REQUEST, Work::UnknownBlockAttestation { .. } => UNKNOWN_BLOCK_ATTESTATION, Work::UnknownBlockAggregate { .. } => UNKNOWN_BLOCK_AGGREGATE, + Work::UnknownBlobsSidecar { .. } => UNKNOWN_BLOBS_SIDECAR, } } } @@ -931,6 +955,7 @@ impl BeaconProcessor { LifoQueue::new(MAX_AGGREGATED_ATTESTATION_REPROCESS_QUEUE_LEN); let mut unknown_block_attestation_queue = LifoQueue::new(MAX_UNAGGREGATED_ATTESTATION_REPROCESS_QUEUE_LEN); + let mut unknown_blobs_sidecar_queue = LifoQueue::new(MAX_BLOBS_SIDECAR_REPROCESS_QUEUE_LEN); let mut sync_message_queue = LifoQueue::new(MAX_SYNC_MESSAGE_QUEUE_LEN); let mut sync_contribution_queue = LifoQueue::new(MAX_SYNC_CONTRIBUTION_QUEUE_LEN); @@ -1312,6 +1337,9 @@ impl BeaconProcessor { Work::UnknownBlockAggregate { .. } => { unknown_block_aggregate_queue.push(work) } + Work::UnknownBlobsSidecar { .. } => { + unknown_blobs_sidecar_queue.push(work) + } } } } @@ -1531,20 +1559,16 @@ impl BeaconProcessor { Work::GossipBlobsSidecar { message_id, peer_id, - peer_client, blobs, seen_timestamp, } => task_spawner.spawn_async(async move { - worker - .process_gossip_blob( - message_id, - peer_id, - peer_client, - blobs, - work_reprocessing_tx, - seen_timestamp, - ) - .await + worker.process_gossip_blob( + message_id, + peer_id, + blobs, + Some(work_reprocessing_tx), + seen_timestamp, + ) }), /* * Import for blocks that we received earlier than their intended slot. @@ -1731,6 +1755,14 @@ impl BeaconProcessor { seen_timestamp, ) }), + Work::UnknownBlobsSidecar { + message_id, + peer_id, + blobs, + seen_timestamp, + } => task_spawner.spawn_blocking(move || { + worker.process_gossip_blob(message_id, peer_id, blobs, None, seen_timestamp) + }), }; } } diff --git a/beacon_node/network/src/beacon_processor/work_reprocessing_queue.rs b/beacon_node/network/src/beacon_processor/work_reprocessing_queue.rs index 2aeec11c32..b08542eeb5 100644 --- a/beacon_node/network/src/beacon_processor/work_reprocessing_queue.rs +++ b/beacon_node/network/src/beacon_processor/work_reprocessing_queue.rs @@ -30,7 +30,10 @@ use task_executor::TaskExecutor; use tokio::sync::mpsc::{self, Receiver, Sender}; use tokio::time::error::Error as TimeError; use tokio_util::time::delay_queue::{DelayQueue, Key as DelayKey}; -use types::{Attestation, EthSpec, Hash256, SignedAggregateAndProof, SignedBeaconBlock, SubnetId}; +use types::{ + Attestation, EthSpec, Hash256, SignedAggregateAndProof, SignedBeaconBlock, SignedBlobsSidecar, + SubnetId, +}; const TASK_NAME: &str = "beacon_processor_reprocess_queue"; const GOSSIP_BLOCKS: &str = "gossip_blocks"; @@ -44,6 +47,10 @@ const ADDITIONAL_QUEUED_BLOCK_DELAY: Duration = Duration::from_millis(5); /// For how long to queue aggregated and unaggregated attestations for re-processing. pub const QUEUED_ATTESTATION_DELAY: Duration = Duration::from_secs(12); +/// For how long to queue blob sidecars for re-processing. +/// TODO: rethink duration +pub const QUEUED_BLOBS_SIDECARS_DELAY: Duration = Duration::from_secs(6); + /// For how long to queue rpc blocks before sending them back for reprocessing. pub const QUEUED_RPC_BLOCK_DELAY: Duration = Duration::from_secs(3); @@ -55,6 +62,10 @@ const MAXIMUM_QUEUED_BLOCKS: usize = 16; /// How many attestations we keep before new ones get dropped. const MAXIMUM_QUEUED_ATTESTATIONS: usize = 16_384; +/// TODO: fix number +/// How many blobs we keep before new ones get dropped. +const MAXIMUM_QUEUED_BLOB_SIDECARS: usize = 16_384; + /// Messages that the scheduler can receive. pub enum ReprocessQueueMessage { /// A block that has been received early and we should queue for later processing. @@ -69,6 +80,8 @@ pub enum ReprocessQueueMessage { UnknownBlockUnaggregate(QueuedUnaggregate), /// An aggregated attestation that references an unknown block. UnknownBlockAggregate(QueuedAggregate), + /// A blob sidecar that references an unknown block. + UnknownBlobSidecar(QueuedBlobsSidecar), } /// Events sent by the scheduler once they are ready for re-processing. @@ -77,6 +90,7 @@ pub enum ReadyWork { RpcBlock(QueuedRpcBlock), Unaggregate(QueuedUnaggregate), Aggregate(QueuedAggregate), + BlobsSidecar(QueuedBlobsSidecar), } /// An Attestation for which the corresponding block was not seen while processing, queued for @@ -118,6 +132,15 @@ pub struct QueuedRpcBlock { pub should_process: bool, } +/// A blob sidecar for which the corresponding block was not seen while processing, queued for +/// later. +pub struct QueuedBlobsSidecar { + pub peer_id: PeerId, + pub message_id: MessageId, + pub blobs_sidecar: Arc>, + pub seen_timestamp: Duration, +} + /// Unifies the different messages processed by the block delay queue. enum InboundEvent { /// A gossip block that was queued for later processing and is ready for import. @@ -127,6 +150,8 @@ enum InboundEvent { ReadyRpcBlock(QueuedRpcBlock), /// An aggregated or unaggregated attestation is ready for re-processing. ReadyAttestation(QueuedAttestationId), + /// A blob sidecar is ready for re-processing. + ReadyBlobsSidecar(QueuedBlobsSidecarId), /// A `DelayQueue` returned an error. DelayQueueError(TimeError, &'static str), /// A message sent to the `ReprocessQueue` @@ -147,6 +172,7 @@ struct ReprocessQueue { rpc_block_delay_queue: DelayQueue>, /// Queue to manage scheduled attestations. attestations_delay_queue: DelayQueue, + blobs_sidecar_delay_queue: DelayQueue, /* Queued items */ /// Queued blocks. @@ -155,15 +181,19 @@ struct ReprocessQueue { queued_aggregates: FnvHashMap, DelayKey)>, /// Queued attestations. queued_unaggregates: FnvHashMap, DelayKey)>, + queued_blob_sidecars: FnvHashMap, DelayKey)>, /// Attestations (aggregated and unaggregated) per root. awaiting_attestations_per_root: HashMap>, + awaiting_blobs_sidecars_per_root: HashMap>, /* Aux */ /// Next attestation id, used for both aggregated and unaggregated attestations next_attestation: usize, + next_sidecar: usize, early_block_debounce: TimeLatch, rpc_block_debounce: TimeLatch, attestation_delay_debounce: TimeLatch, + blobs_sidecar_debounce: TimeLatch, } #[derive(Debug, Clone, Copy, PartialEq, Eq)] @@ -172,6 +202,9 @@ enum QueuedAttestationId { Unaggregate(usize), } +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +struct QueuedBlobsSidecarId(usize); + impl QueuedAggregate { pub fn beacon_block_root(&self) -> &Hash256 { &self.attestation.message.aggregate.data.beacon_block_root @@ -235,6 +268,21 @@ impl Stream for ReprocessQueue { Poll::Ready(None) | Poll::Pending => (), } + match self.blobs_sidecar_delay_queue.poll_expired(cx) { + Poll::Ready(Some(Ok(id))) => { + return Poll::Ready(Some(InboundEvent::ReadyBlobsSidecar(id.into_inner()))); + } + Poll::Ready(Some(Err(e))) => { + return Poll::Ready(Some(InboundEvent::DelayQueueError( + e, + "blobs_sidecar_queue", + ))); + } + // `Poll::Ready(None)` means that there are no more entries in the delay queue and we + // will continue to get this result until something else is added into the queue. + Poll::Ready(None) | Poll::Pending => (), + } + // Last empty the messages channel. match self.work_reprocessing_rx.poll_recv(cx) { Poll::Ready(Some(message)) => return Poll::Ready(Some(InboundEvent::Msg(message))), @@ -264,14 +312,19 @@ pub fn spawn_reprocess_scheduler( gossip_block_delay_queue: DelayQueue::new(), rpc_block_delay_queue: DelayQueue::new(), attestations_delay_queue: DelayQueue::new(), + blobs_sidecar_delay_queue: DelayQueue::new(), queued_gossip_block_roots: HashSet::new(), queued_aggregates: FnvHashMap::default(), queued_unaggregates: FnvHashMap::default(), + queued_blob_sidecars: FnvHashMap::default(), awaiting_attestations_per_root: HashMap::new(), + awaiting_blobs_sidecars_per_root: HashMap::new(), next_attestation: 0, + next_sidecar: 0, early_block_debounce: TimeLatch::default(), rpc_block_debounce: TimeLatch::default(), attestation_delay_debounce: TimeLatch::default(), + blobs_sidecar_debounce: TimeLatch::default(), }; executor.spawn( @@ -473,6 +526,39 @@ impl ReprocessQueue { self.next_attestation += 1; } + InboundEvent::Msg(UnknownBlobSidecar(queued_blob_sidecar)) => { + if self.blobs_sidecar_delay_queue.len() >= MAXIMUM_QUEUED_BLOB_SIDECARS { + if self.blobs_sidecar_debounce.elapsed() { + error!( + log, + "Blobs sidecar queue is full"; + "queue_size" => MAXIMUM_QUEUED_BLOB_SIDECARS, + "msg" => "check system clock" + ); + } + // Drop the attestation. + return; + } + + let id = QueuedBlobsSidecarId(self.next_sidecar); + + // Register the delay. + let delay_key = self + .blobs_sidecar_delay_queue + .insert(id, QUEUED_BLOBS_SIDECARS_DELAY); + + // Register this sidecar for the corresponding root. + self.awaiting_blobs_sidecars_per_root + .entry(queued_blob_sidecar.blobs_sidecar.message.beacon_block_root) + .or_default() + .push(id); + + // Store the blob sidecar and its info. + self.queued_blob_sidecars + .insert(self.next_sidecar, (queued_blob_sidecar, delay_key)); + + self.next_sidecar += 1; + } InboundEvent::Msg(BlockImported(root)) => { // Unqueue the attestations we have for this root, if any. if let Some(queued_ids) = self.awaiting_attestations_per_root.remove(&root) { @@ -517,6 +603,43 @@ impl ReprocessQueue { } } } + // Unqueue the blob sidecars we have for this root, if any. + // TODO: merge the 2 data structures. + if let Some(queued_ids) = self.awaiting_blobs_sidecars_per_root.remove(&root) { + for id in queued_ids { + // metrics::inc_counter( + // &metrics::BEACON_PROCESSOR_REPROCESSING_QUEUE_MATCHED_ATTESTATIONS, + // ); + + if let Some((work, delay_key)) = self + .queued_blob_sidecars + .remove(&id.0) + .map(|(blobs_sidecar, delay_key)| { + (ReadyWork::BlobsSidecar(blobs_sidecar), delay_key) + }) + { + // Remove the delay. + self.blobs_sidecar_delay_queue.remove(&delay_key); + + // Send the work. + if self.ready_work_tx.try_send(work).is_err() { + error!( + log, + "Failed to send scheduled blob sidecar"; + ); + } + } else { + // There is a mismatch between the blob sidecar ids registered for this + // root and the queued blob sidecars. This should never happen. + error!( + log, + "Unknown queued blob sidecar for block root"; + "block_root" => ?root, + "id" => ?id, + ); + } + } + } } // A block that was queued for later processing is now ready to be processed. InboundEvent::ReadyGossipBlock(ready_block) => { @@ -591,6 +714,40 @@ impl ReprocessQueue { } } } + InboundEvent::ReadyBlobsSidecar(queued_blobs_sidecar_id) => { + // metrics::inc_counter( + // &metrics::BEACON_PROCESSOR_REPROCESSING_QUEUE_EXPIRED_ATTESTATIONS, + // ); + + if let Some((root, work)) = self + .queued_blob_sidecars + .remove(&queued_blobs_sidecar_id.0) + .map(|(blobs_sidecar, _delay_key)| { + ( + blobs_sidecar.blobs_sidecar.message.beacon_block_root, + ReadyWork::BlobsSidecar(blobs_sidecar), + ) + }) + { + if self.ready_work_tx.try_send(work).is_err() { + error!( + log, + "Failed to send scheduled attestation"; + ); + } + + if let Some(queued_blob_sidecars) = + self.awaiting_blobs_sidecars_per_root.get_mut(&root) + { + if let Some(index) = queued_blob_sidecars + .iter() + .position(|&id| id == queued_blobs_sidecar_id) + { + queued_blob_sidecars.swap_remove(index); + } + } + } + } } metrics::set_gauge_vec( diff --git a/beacon_node/network/src/beacon_processor/worker/gossip_methods.rs b/beacon_node/network/src/beacon_processor/worker/gossip_methods.rs index 37c5f8c776..b59537a1d2 100644 --- a/beacon_node/network/src/beacon_processor/worker/gossip_methods.rs +++ b/beacon_node/network/src/beacon_processor/worker/gossip_methods.rs @@ -1,3 +1,4 @@ +use crate::beacon_processor::work_reprocessing_queue::QueuedBlobsSidecar; use crate::{metrics, service::NetworkMessage, sync::SyncMessage}; use beacon_chain::store::Error; @@ -696,13 +697,12 @@ impl Worker { } #[allow(clippy::too_many_arguments)] - pub async fn process_gossip_blob( + pub fn process_gossip_blob( self, message_id: MessageId, peer_id: PeerId, - peer_client: Client, blob: Arc>, - reprocess_tx: mpsc::Sender>, + reprocess_tx: Option>>, seen_timestamp: Duration, ) { match self.chain.verify_blobs_sidecar_for_gossip(&blob) { @@ -714,8 +714,9 @@ impl Worker { Err(error) => self.handle_blobs_verification_failure( peer_id, message_id, - Some(reprocess_tx), + reprocess_tx, error, + blob, seen_timestamp, ), }; @@ -2233,7 +2234,78 @@ impl Worker { message_id: MessageId, reprocess_tx: Option>>, error: BlobError, + blobs_sidecar: Arc>, seen_timestamp: Duration, ) { + // TODO: metrics + match &error { + BlobError::FutureSlot { .. } => { + self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Ignore); + } + BlobError::PastSlot { .. } => { + self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Ignore); + } + BlobError::BeaconChainError(e) => { + self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Ignore); + } + BlobError::BlobOutOfRange { blob_index } => { + self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Reject); + } + BlobError::InvalidKZGCommitment => { + self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Reject); + } + BlobError::ProposalSignatureInvalid => { + self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Reject); + } + BlobError::RepeatSidecar { proposer, slot } => { + self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Ignore); + } + BlobError::UnknownHeadBlock { beacon_block_root } => { + debug!( + self.log, + "Blob sidecar for unknown block"; + "peer_id" => %peer_id, + "block" => ?beacon_block_root + ); + if let Some(sender) = reprocess_tx { + // We don't know the block, get the sync manager to handle the block lookup, and + // send the attestation to be scheduled for re-processing. + self.sync_tx + .send(SyncMessage::UnknownBlockHash(peer_id, *beacon_block_root)) + .unwrap_or_else(|_| { + warn!( + self.log, + "Failed to send to sync service"; + "msg" => "UnknownBlockHash" + ) + }); + let msg = ReprocessQueueMessage::UnknownBlobSidecar(QueuedBlobsSidecar { + peer_id, + message_id, + blobs_sidecar, + seen_timestamp, + }); + + if sender.try_send(msg).is_err() { + error!( + self.log, + "Failed to send blob sidecar for re-processing"; + ) + } + } else { + // We shouldn't make any further attempts to process this attestation. + // + // Don't downscore the peer since it's not clear if we requested this head + // block from them or not. + self.propagate_validation_result( + message_id, + peer_id, + MessageAcceptance::Ignore, + ); + } + + return; + } + } } }