From a7351c00c0354f54981a929cc9bb1f4c31f43322 Mon Sep 17 00:00:00 2001 From: GeemoCandama Date: Tue, 24 Jan 2023 22:17:50 +0000 Subject: [PATCH] light client optimistic update reprocessing (#3799) ## Issue Addressed Currently there is a race between receiving blocks and receiving light client optimistic updates (in unstable), which results in processing errors. This is a continuation of PR #3693 and seeks to progress on issue #3651 ## Proposed Changes Add the parent_root to ReprocessQueueMessage::BlockImported so we can remove blocks from queue when a block arrives that has the same parent root. We use the parent root as opposed to the block_root because the LightClientOptimisticUpdate does not contain the block_root. If light_client_optimistic_update.attested_header.canonical_root() != head_block.message().parent_root() then we queue the update. Otherwise we process immediately. ## Additional Info michaelsproul came up with this idea. The code was heavily based off of the attestation reprocessing. I have not properly tested this to see if it works as intended. --- ...t_client_optimistic_update_verification.rs | 15 ++ .../network/src/beacon_processor/mod.rs | 50 ++++- .../work_reprocessing_queue.rs | 200 +++++++++++++++++- .../beacon_processor/worker/gossip_methods.rs | 122 ++++++++--- .../beacon_processor/worker/sync_methods.rs | 6 +- beacon_node/network/src/metrics.rs | 15 ++ 6 files changed, 371 insertions(+), 37 deletions(-) diff --git a/beacon_node/beacon_chain/src/light_client_optimistic_update_verification.rs b/beacon_node/beacon_chain/src/light_client_optimistic_update_verification.rs index ec9c90e735..20d7181808 100644 --- a/beacon_node/beacon_chain/src/light_client_optimistic_update_verification.rs +++ b/beacon_node/beacon_chain/src/light_client_optimistic_update_verification.rs @@ -2,6 +2,7 @@ use crate::{ beacon_chain::MAXIMUM_GOSSIP_CLOCK_DISPARITY, BeaconChain, BeaconChainError, BeaconChainTypes, }; use derivative::Derivative; +use eth2::types::Hash256; use slot_clock::SlotClock; use std::time::Duration; use strum::AsRefStr; @@ -36,6 +37,8 @@ pub enum Error { SigSlotStartIsNone, /// Failed to construct a LightClientOptimisticUpdate from state. FailedConstructingUpdate, + /// Unknown block with parent root. + UnknownBlockParentRoot(Hash256), /// Beacon chain error occured. BeaconChainError(BeaconChainError), LightClientUpdateError(LightClientUpdateError), @@ -58,6 +61,7 @@ impl From for Error { #[derivative(Clone(bound = "T: BeaconChainTypes"))] pub struct VerifiedLightClientOptimisticUpdate { light_client_optimistic_update: LightClientOptimisticUpdate, + pub parent_root: Hash256, seen_timestamp: Duration, } @@ -107,6 +111,16 @@ impl VerifiedLightClientOptimisticUpdate { None => return Err(Error::SigSlotStartIsNone), } + // check if we can process the optimistic update immediately + // otherwise queue + let canonical_root = light_client_optimistic_update + .attested_header + .canonical_root(); + + if canonical_root != head_block.message().parent_root() { + return Err(Error::UnknownBlockParentRoot(canonical_root)); + } + let optimistic_update = LightClientOptimisticUpdate::new(&chain.spec, head_block, &attested_state)?; @@ -119,6 +133,7 @@ impl VerifiedLightClientOptimisticUpdate { Ok(Self { light_client_optimistic_update, + parent_root: canonical_root, seen_timestamp, }) } diff --git a/beacon_node/network/src/beacon_processor/mod.rs b/beacon_node/network/src/beacon_processor/mod.rs index 743a97a29c..8118443a65 100644 --- a/beacon_node/network/src/beacon_processor/mod.rs +++ b/beacon_node/network/src/beacon_processor/mod.rs @@ -67,7 +67,8 @@ use types::{ SignedVoluntaryExit, SubnetId, SyncCommitteeMessage, SyncSubnetId, }; use work_reprocessing_queue::{ - spawn_reprocess_scheduler, QueuedAggregate, QueuedRpcBlock, QueuedUnaggregate, ReadyWork, + spawn_reprocess_scheduler, QueuedAggregate, QueuedLightClientUpdate, QueuedRpcBlock, + QueuedUnaggregate, ReadyWork, }; use worker::{Toolbox, Worker}; @@ -137,6 +138,10 @@ const MAX_GOSSIP_FINALITY_UPDATE_QUEUE_LEN: usize = 1_024; /// before we start dropping them. const MAX_GOSSIP_OPTIMISTIC_UPDATE_QUEUE_LEN: usize = 1_024; +/// The maximum number of queued `LightClientOptimisticUpdate` objects received on gossip that will be stored +/// for reprocessing before we start dropping them. +const MAX_GOSSIP_OPTIMISTIC_UPDATE_REPROCESS_QUEUE_LEN: usize = 128; + /// The maximum number of queued `SyncCommitteeMessage` objects that will be stored before we start dropping /// them. const MAX_SYNC_MESSAGE_QUEUE_LEN: usize = 2048; @@ -213,6 +218,7 @@ pub const BLOCKS_BY_ROOTS_REQUEST: &str = "blocks_by_roots_request"; pub const LIGHT_CLIENT_BOOTSTRAP_REQUEST: &str = "light_client_bootstrap"; pub const UNKNOWN_BLOCK_ATTESTATION: &str = "unknown_block_attestation"; pub const UNKNOWN_BLOCK_AGGREGATE: &str = "unknown_block_aggregate"; +pub const UNKNOWN_LIGHT_CLIENT_UPDATE: &str = "unknown_light_client_update"; /// A simple first-in-first-out queue with a maximum length. struct FifoQueue { @@ -694,6 +700,21 @@ impl std::convert::From> for WorkEvent { seen_timestamp, }, }, + ReadyWork::LightClientUpdate(QueuedLightClientUpdate { + peer_id, + message_id, + light_client_optimistic_update, + seen_timestamp, + .. + }) => Self { + drop_during_sync: true, + work: Work::UnknownLightClientOptimisticUpdate { + message_id, + peer_id, + light_client_optimistic_update, + seen_timestamp, + }, + }, } } } @@ -733,6 +754,12 @@ pub enum Work { aggregate: Box>, seen_timestamp: Duration, }, + UnknownLightClientOptimisticUpdate { + message_id: MessageId, + peer_id: PeerId, + light_client_optimistic_update: Box>, + seen_timestamp: Duration, + }, GossipAggregateBatch { packages: Vec>, }, @@ -845,6 +872,7 @@ impl Work { Work::LightClientBootstrapRequest { .. } => LIGHT_CLIENT_BOOTSTRAP_REQUEST, Work::UnknownBlockAttestation { .. } => UNKNOWN_BLOCK_ATTESTATION, Work::UnknownBlockAggregate { .. } => UNKNOWN_BLOCK_AGGREGATE, + Work::UnknownLightClientOptimisticUpdate { .. } => UNKNOWN_LIGHT_CLIENT_UPDATE, } } } @@ -979,6 +1007,8 @@ impl BeaconProcessor { // Using a FIFO queue for light client updates to maintain sequence order. let mut finality_update_queue = FifoQueue::new(MAX_GOSSIP_FINALITY_UPDATE_QUEUE_LEN); let mut optimistic_update_queue = FifoQueue::new(MAX_GOSSIP_OPTIMISTIC_UPDATE_QUEUE_LEN); + let mut unknown_light_client_update_queue = + FifoQueue::new(MAX_GOSSIP_OPTIMISTIC_UPDATE_REPROCESS_QUEUE_LEN); // Using a FIFO queue since blocks need to be imported sequentially. let mut rpc_block_queue = FifoQueue::new(MAX_RPC_BLOCK_QUEUE_LEN); @@ -1346,6 +1376,9 @@ impl BeaconProcessor { Work::UnknownBlockAggregate { .. } => { unknown_block_aggregate_queue.push(work) } + Work::UnknownLightClientOptimisticUpdate { .. } => { + unknown_light_client_update_queue.push(work, work_id, &self.log) + } } } } @@ -1665,6 +1698,7 @@ impl BeaconProcessor { message_id, peer_id, *light_client_optimistic_update, + Some(work_reprocessing_tx), seen_timestamp, ) }), @@ -1787,6 +1821,20 @@ impl BeaconProcessor { seen_timestamp, ) }), + Work::UnknownLightClientOptimisticUpdate { + message_id, + peer_id, + light_client_optimistic_update, + seen_timestamp, + } => task_spawner.spawn_blocking(move || { + worker.process_gossip_optimistic_update( + message_id, + peer_id, + *light_client_optimistic_update, + None, + seen_timestamp, + ) + }), }; } } diff --git a/beacon_node/network/src/beacon_processor/work_reprocessing_queue.rs b/beacon_node/network/src/beacon_processor/work_reprocessing_queue.rs index 2aeec11c32..8c568a7eef 100644 --- a/beacon_node/network/src/beacon_processor/work_reprocessing_queue.rs +++ b/beacon_node/network/src/beacon_processor/work_reprocessing_queue.rs @@ -19,7 +19,7 @@ use futures::task::Poll; use futures::{Stream, StreamExt}; use lighthouse_network::{MessageId, PeerId}; use logging::TimeLatch; -use slog::{crit, debug, error, warn, Logger}; +use slog::{crit, debug, error, trace, warn, Logger}; use slot_clock::SlotClock; use std::collections::{HashMap, HashSet}; use std::pin::Pin; @@ -30,12 +30,16 @@ use task_executor::TaskExecutor; use tokio::sync::mpsc::{self, Receiver, Sender}; use tokio::time::error::Error as TimeError; use tokio_util::time::delay_queue::{DelayQueue, Key as DelayKey}; -use types::{Attestation, EthSpec, Hash256, SignedAggregateAndProof, SignedBeaconBlock, SubnetId}; +use types::{ + Attestation, EthSpec, Hash256, LightClientOptimisticUpdate, SignedAggregateAndProof, + SignedBeaconBlock, SubnetId, +}; const TASK_NAME: &str = "beacon_processor_reprocess_queue"; const GOSSIP_BLOCKS: &str = "gossip_blocks"; const RPC_BLOCKS: &str = "rpc_blocks"; const ATTESTATIONS: &str = "attestations"; +const LIGHT_CLIENT_UPDATES: &str = "lc_updates"; /// Queue blocks for re-processing with an `ADDITIONAL_QUEUED_BLOCK_DELAY` after the slot starts. /// This is to account for any slight drift in the system clock. @@ -44,6 +48,9 @@ const ADDITIONAL_QUEUED_BLOCK_DELAY: Duration = Duration::from_millis(5); /// For how long to queue aggregated and unaggregated attestations for re-processing. pub const QUEUED_ATTESTATION_DELAY: Duration = Duration::from_secs(12); +/// For how long to queue light client updates for re-processing. +pub const QUEUED_LIGHT_CLIENT_UPDATE_DELAY: Duration = Duration::from_secs(12); + /// For how long to queue rpc blocks before sending them back for reprocessing. pub const QUEUED_RPC_BLOCK_DELAY: Duration = Duration::from_secs(3); @@ -55,6 +62,9 @@ const MAXIMUM_QUEUED_BLOCKS: usize = 16; /// How many attestations we keep before new ones get dropped. const MAXIMUM_QUEUED_ATTESTATIONS: usize = 16_384; +/// How many light client updates we keep before new ones get dropped. +const MAXIMUM_QUEUED_LIGHT_CLIENT_UPDATES: usize = 128; + /// Messages that the scheduler can receive. pub enum ReprocessQueueMessage { /// A block that has been received early and we should queue for later processing. @@ -62,13 +72,18 @@ pub enum ReprocessQueueMessage { /// A gossip block for hash `X` is being imported, we should queue the rpc block for the same /// hash until the gossip block is imported. RpcBlock(QueuedRpcBlock), - /// A block that was successfully processed. We use this to handle attestations for unknown - /// blocks. - BlockImported(Hash256), + /// A block that was successfully processed. We use this to handle attestations and light client updates + /// for unknown blocks. + BlockImported { + block_root: Hash256, + parent_root: Hash256, + }, /// An unaggregated attestation that references an unknown block. UnknownBlockUnaggregate(QueuedUnaggregate), /// An aggregated attestation that references an unknown block. UnknownBlockAggregate(QueuedAggregate), + /// A light client optimistic update that references a parent root that has not been seen as a parent. + UnknownLightClientOptimisticUpdate(QueuedLightClientUpdate), } /// Events sent by the scheduler once they are ready for re-processing. @@ -77,6 +92,7 @@ pub enum ReadyWork { RpcBlock(QueuedRpcBlock), Unaggregate(QueuedUnaggregate), Aggregate(QueuedAggregate), + LightClientUpdate(QueuedLightClientUpdate), } /// An Attestation for which the corresponding block was not seen while processing, queued for @@ -99,6 +115,16 @@ pub struct QueuedAggregate { pub seen_timestamp: Duration, } +/// A light client update for which the corresponding parent block was not seen while processing, +/// queued for later. +pub struct QueuedLightClientUpdate { + pub peer_id: PeerId, + pub message_id: MessageId, + pub light_client_optimistic_update: Box>, + pub parent_root: Hash256, + pub seen_timestamp: Duration, +} + /// A block that arrived early and has been queued for later import. pub struct QueuedGossipBlock { pub peer_id: PeerId, @@ -127,6 +153,8 @@ enum InboundEvent { ReadyRpcBlock(QueuedRpcBlock), /// An aggregated or unaggregated attestation is ready for re-processing. ReadyAttestation(QueuedAttestationId), + /// A light client update that is ready for re-processing. + ReadyLightClientUpdate(QueuedLightClientUpdateId), /// A `DelayQueue` returned an error. DelayQueueError(TimeError, &'static str), /// A message sent to the `ReprocessQueue` @@ -147,6 +175,8 @@ struct ReprocessQueue { rpc_block_delay_queue: DelayQueue>, /// Queue to manage scheduled attestations. attestations_delay_queue: DelayQueue, + /// Queue to manage scheduled light client updates. + lc_updates_delay_queue: DelayQueue, /* Queued items */ /// Queued blocks. @@ -157,15 +187,23 @@ struct ReprocessQueue { queued_unaggregates: FnvHashMap, DelayKey)>, /// Attestations (aggregated and unaggregated) per root. awaiting_attestations_per_root: HashMap>, + /// Queued Light Client Updates. + queued_lc_updates: FnvHashMap, DelayKey)>, + /// Light Client Updates per parent_root. + awaiting_lc_updates_per_parent_root: HashMap>, /* Aux */ /// Next attestation id, used for both aggregated and unaggregated attestations next_attestation: usize, + next_lc_update: usize, early_block_debounce: TimeLatch, rpc_block_debounce: TimeLatch, attestation_delay_debounce: TimeLatch, + lc_update_delay_debounce: TimeLatch, } +pub type QueuedLightClientUpdateId = usize; + #[derive(Debug, Clone, Copy, PartialEq, Eq)] enum QueuedAttestationId { Aggregate(usize), @@ -235,6 +273,20 @@ impl Stream for ReprocessQueue { Poll::Ready(None) | Poll::Pending => (), } + match self.lc_updates_delay_queue.poll_expired(cx) { + Poll::Ready(Some(Ok(lc_id))) => { + return Poll::Ready(Some(InboundEvent::ReadyLightClientUpdate( + lc_id.into_inner(), + ))); + } + Poll::Ready(Some(Err(e))) => { + return Poll::Ready(Some(InboundEvent::DelayQueueError(e, "lc_updates_queue"))); + } + // `Poll::Ready(None)` means that there are no more entries in the delay queue and we + // will continue to get this result until something else is added into the queue. + Poll::Ready(None) | Poll::Pending => (), + } + // Last empty the messages channel. match self.work_reprocessing_rx.poll_recv(cx) { Poll::Ready(Some(message)) => return Poll::Ready(Some(InboundEvent::Msg(message))), @@ -264,14 +316,19 @@ pub fn spawn_reprocess_scheduler( gossip_block_delay_queue: DelayQueue::new(), rpc_block_delay_queue: DelayQueue::new(), attestations_delay_queue: DelayQueue::new(), + lc_updates_delay_queue: DelayQueue::new(), queued_gossip_block_roots: HashSet::new(), + queued_lc_updates: FnvHashMap::default(), queued_aggregates: FnvHashMap::default(), queued_unaggregates: FnvHashMap::default(), awaiting_attestations_per_root: HashMap::new(), + awaiting_lc_updates_per_parent_root: HashMap::new(), next_attestation: 0, + next_lc_update: 0, early_block_debounce: TimeLatch::default(), rpc_block_debounce: TimeLatch::default(), attestation_delay_debounce: TimeLatch::default(), + lc_update_delay_debounce: TimeLatch::default(), }; executor.spawn( @@ -473,9 +530,49 @@ impl ReprocessQueue { self.next_attestation += 1; } - InboundEvent::Msg(BlockImported(root)) => { + InboundEvent::Msg(UnknownLightClientOptimisticUpdate( + queued_light_client_optimistic_update, + )) => { + if self.lc_updates_delay_queue.len() >= MAXIMUM_QUEUED_LIGHT_CLIENT_UPDATES { + if self.lc_update_delay_debounce.elapsed() { + error!( + log, + "Light client updates delay queue is full"; + "queue_size" => MAXIMUM_QUEUED_LIGHT_CLIENT_UPDATES, + "msg" => "check system clock" + ); + } + // Drop the light client update. + return; + } + + let lc_id: QueuedLightClientUpdateId = self.next_lc_update; + + // Register the delay. + let delay_key = self + .lc_updates_delay_queue + .insert(lc_id, QUEUED_LIGHT_CLIENT_UPDATE_DELAY); + + // Register the light client update for the corresponding root. + self.awaiting_lc_updates_per_parent_root + .entry(queued_light_client_optimistic_update.parent_root) + .or_default() + .push(lc_id); + + // Store the light client update and its info. + self.queued_lc_updates.insert( + self.next_lc_update, + (queued_light_client_optimistic_update, delay_key), + ); + + self.next_lc_update += 1; + } + InboundEvent::Msg(BlockImported { + block_root, + parent_root, + }) => { // Unqueue the attestations we have for this root, if any. - if let Some(queued_ids) = self.awaiting_attestations_per_root.remove(&root) { + if let Some(queued_ids) = self.awaiting_attestations_per_root.remove(&block_root) { for id in queued_ids { metrics::inc_counter( &metrics::BEACON_PROCESSOR_REPROCESSING_QUEUE_MATCHED_ATTESTATIONS, @@ -511,12 +608,62 @@ impl ReprocessQueue { error!( log, "Unknown queued attestation for block root"; - "block_root" => ?root, + "block_root" => ?block_root, "att_id" => ?id, ); } } } + // Unqueue the light client optimistic updates we have for this root, if any. + if let Some(queued_lc_id) = self + .awaiting_lc_updates_per_parent_root + .remove(&parent_root) + { + debug!( + log, + "Dequeuing light client optimistic updates"; + "parent_root" => %parent_root, + "count" => queued_lc_id.len(), + ); + + for lc_id in queued_lc_id { + metrics::inc_counter( + &metrics::BEACON_PROCESSOR_REPROCESSING_QUEUE_MATCHED_OPTIMISTIC_UPDATES, + ); + if let Some((work, delay_key)) = self.queued_lc_updates.remove(&lc_id).map( + |(light_client_optimistic_update, delay_key)| { + ( + ReadyWork::LightClientUpdate(light_client_optimistic_update), + delay_key, + ) + }, + ) { + // Remove the delay + self.lc_updates_delay_queue.remove(&delay_key); + + // Send the work + match self.ready_work_tx.try_send(work) { + Ok(_) => trace!( + log, + "reprocessing light client update sent"; + ), + Err(_) => error!( + log, + "Failed to send scheduled light client update"; + ), + } + } else { + // There is a mismatch between the light client update ids registered for this + // root and the queued light client updates. This should never happen. + error!( + log, + "Unknown queued light client update for parent root"; + "parent_root" => ?parent_root, + "lc_id" => ?lc_id, + ); + } + } + } } // A block that was queued for later processing is now ready to be processed. InboundEvent::ReadyGossipBlock(ready_block) => { @@ -591,6 +738,38 @@ impl ReprocessQueue { } } } + InboundEvent::ReadyLightClientUpdate(queued_id) => { + metrics::inc_counter( + &metrics::BEACON_PROCESSOR_REPROCESSING_QUEUE_EXPIRED_OPTIMISTIC_UPDATES, + ); + + if let Some((parent_root, work)) = self.queued_lc_updates.remove(&queued_id).map( + |(queued_lc_update, _delay_key)| { + ( + queued_lc_update.parent_root, + ReadyWork::LightClientUpdate(queued_lc_update), + ) + }, + ) { + if self.ready_work_tx.try_send(work).is_err() { + error!( + log, + "Failed to send scheduled light client optimistic update"; + ); + } + + if let Some(queued_lc_updates) = self + .awaiting_lc_updates_per_parent_root + .get_mut(&parent_root) + { + if let Some(index) = + queued_lc_updates.iter().position(|&id| id == queued_id) + { + queued_lc_updates.swap_remove(index); + } + } + } + } } metrics::set_gauge_vec( @@ -608,5 +787,10 @@ impl ReprocessQueue { &[ATTESTATIONS], self.attestations_delay_queue.len() as i64, ); + metrics::set_gauge_vec( + &metrics::BEACON_PROCESSOR_REPROCESSING_QUEUE_TOTAL, + &[LIGHT_CLIENT_UPDATES], + self.lc_updates_delay_queue.len() as i64, + ); } } diff --git a/beacon_node/network/src/beacon_processor/worker/gossip_methods.rs b/beacon_node/network/src/beacon_processor/worker/gossip_methods.rs index c142359f3e..3601ccb195 100644 --- a/beacon_node/network/src/beacon_processor/worker/gossip_methods.rs +++ b/beacon_node/network/src/beacon_processor/worker/gossip_methods.rs @@ -28,7 +28,8 @@ use types::{ use super::{ super::work_reprocessing_queue::{ - QueuedAggregate, QueuedGossipBlock, QueuedUnaggregate, ReprocessQueueMessage, + QueuedAggregate, QueuedGossipBlock, QueuedLightClientUpdate, QueuedUnaggregate, + ReprocessQueueMessage, }, Worker, }; @@ -953,7 +954,10 @@ impl Worker { metrics::inc_counter(&metrics::BEACON_PROCESSOR_GOSSIP_BLOCK_IMPORTED_TOTAL); if reprocess_tx - .try_send(ReprocessQueueMessage::BlockImported(block_root)) + .try_send(ReprocessQueueMessage::BlockImported { + block_root, + parent_root: block.message().parent_root(), + }) .is_err() { error!( @@ -1330,7 +1334,7 @@ impl Worker { LightClientFinalityUpdateError::InvalidLightClientFinalityUpdate => { debug!( self.log, - "LC invalid finality update"; + "Light client invalid finality update"; "peer" => %peer_id, "error" => ?e, ); @@ -1344,7 +1348,7 @@ impl Worker { LightClientFinalityUpdateError::TooEarly => { debug!( self.log, - "LC finality update too early"; + "Light client finality update too early"; "peer" => %peer_id, "error" => ?e, ); @@ -1357,7 +1361,7 @@ impl Worker { } LightClientFinalityUpdateError::FinalityUpdateAlreadySeen => debug!( self.log, - "LC finality update already seen"; + "Light client finality update already seen"; "peer" => %peer_id, "error" => ?e, ), @@ -1366,7 +1370,7 @@ impl Worker { | LightClientFinalityUpdateError::SigSlotStartIsNone | LightClientFinalityUpdateError::FailedConstructingUpdate => debug!( self.log, - "LC error constructing finality update"; + "Light client error constructing finality update"; "peer" => %peer_id, "error" => ?e, ), @@ -1381,22 +1385,77 @@ impl Worker { message_id: MessageId, peer_id: PeerId, light_client_optimistic_update: LightClientOptimisticUpdate, + reprocess_tx: Option>>, seen_timestamp: Duration, ) { - match self - .chain - .verify_optimistic_update_for_gossip(light_client_optimistic_update, seen_timestamp) - { - Ok(_verified_light_client_optimistic_update) => { + match self.chain.verify_optimistic_update_for_gossip( + light_client_optimistic_update.clone(), + seen_timestamp, + ) { + Ok(verified_light_client_optimistic_update) => { + debug!( + self.log, + "Light client successful optimistic update"; + "peer" => %peer_id, + "parent_root" => %verified_light_client_optimistic_update.parent_root, + ); + self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Accept); } Err(e) => { - metrics::register_optimistic_update_error(&e); match e { - LightClientOptimisticUpdateError::InvalidLightClientOptimisticUpdate => { + LightClientOptimisticUpdateError::UnknownBlockParentRoot(parent_root) => { + metrics::inc_counter( + &metrics::BEACON_PROCESSOR_REPROCESSING_QUEUE_SENT_OPTIMISTIC_UPDATES, + ); debug!( self.log, - "LC invalid optimistic update"; + "Optimistic update for unknown block"; + "peer_id" => %peer_id, + "parent_root" => ?parent_root + ); + + if let Some(sender) = reprocess_tx { + let msg = ReprocessQueueMessage::UnknownLightClientOptimisticUpdate( + QueuedLightClientUpdate { + peer_id, + message_id, + light_client_optimistic_update: Box::new( + light_client_optimistic_update, + ), + parent_root, + seen_timestamp, + }, + ); + + if sender.try_send(msg).is_err() { + error!( + self.log, + "Failed to send optimistic update for re-processing"; + ) + } + } else { + debug!( + self.log, + "Not sending light client update because it had been reprocessed"; + "peer_id" => %peer_id, + "parent_root" => ?parent_root + ); + + self.propagate_validation_result( + message_id, + peer_id, + MessageAcceptance::Ignore, + ); + } + return; + } + LightClientOptimisticUpdateError::InvalidLightClientOptimisticUpdate => { + metrics::register_optimistic_update_error(&e); + + debug!( + self.log, + "Light client invalid optimistic update"; "peer" => %peer_id, "error" => ?e, ); @@ -1408,9 +1467,10 @@ impl Worker { ) } LightClientOptimisticUpdateError::TooEarly => { + metrics::register_optimistic_update_error(&e); debug!( self.log, - "LC optimistic update too early"; + "Light client optimistic update too early"; "peer" => %peer_id, "error" => ?e, ); @@ -1421,21 +1481,29 @@ impl Worker { "light_client_gossip_error", ); } - LightClientOptimisticUpdateError::OptimisticUpdateAlreadySeen => debug!( - self.log, - "LC optimistic update already seen"; - "peer" => %peer_id, - "error" => ?e, - ), + LightClientOptimisticUpdateError::OptimisticUpdateAlreadySeen => { + metrics::register_optimistic_update_error(&e); + + debug!( + self.log, + "Light client optimistic update already seen"; + "peer" => %peer_id, + "error" => ?e, + ) + } LightClientOptimisticUpdateError::BeaconChainError(_) | LightClientOptimisticUpdateError::LightClientUpdateError(_) | LightClientOptimisticUpdateError::SigSlotStartIsNone - | LightClientOptimisticUpdateError::FailedConstructingUpdate => debug!( - self.log, - "LC error constructing optimistic update"; - "peer" => %peer_id, - "error" => ?e, - ), + | LightClientOptimisticUpdateError::FailedConstructingUpdate => { + metrics::register_optimistic_update_error(&e); + + debug!( + self.log, + "Light client error constructing optimistic update"; + "peer" => %peer_id, + "error" => ?e, + ) + } } self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Ignore); } diff --git a/beacon_node/network/src/beacon_processor/worker/sync_methods.rs b/beacon_node/network/src/beacon_processor/worker/sync_methods.rs index 1ec045e97e..6e6e681550 100644 --- a/beacon_node/network/src/beacon_processor/worker/sync_methods.rs +++ b/beacon_node/network/src/beacon_processor/worker/sync_methods.rs @@ -84,6 +84,7 @@ impl Worker { } }; let slot = block.slot(); + let parent_root = block.message().parent_root(); let result = self .chain .process_block( @@ -101,7 +102,10 @@ impl Worker { info!(self.log, "New RPC block received"; "slot" => slot, "hash" => %hash); // Trigger processing for work referencing this block. - let reprocess_msg = ReprocessQueueMessage::BlockImported(hash); + let reprocess_msg = ReprocessQueueMessage::BlockImported { + block_root: hash, + parent_root, + }; if reprocess_tx.try_send(reprocess_msg).is_err() { error!(self.log, "Failed to inform block import"; "source" => "rpc", "block_root" => %hash) }; diff --git a/beacon_node/network/src/metrics.rs b/beacon_node/network/src/metrics.rs index baf00720b0..8dc76877a1 100644 --- a/beacon_node/network/src/metrics.rs +++ b/beacon_node/network/src/metrics.rs @@ -370,6 +370,21 @@ lazy_static! { "Number of queued attestations where as matching block has been imported." ); + /* + * Light client update reprocessing queue metrics. + */ + pub static ref BEACON_PROCESSOR_REPROCESSING_QUEUE_EXPIRED_OPTIMISTIC_UPDATES: Result = try_create_int_counter( + "beacon_processor_reprocessing_queue_expired_optimistic_updates", + "Number of queued light client optimistic updates which have expired before a matching block has been found." + ); + pub static ref BEACON_PROCESSOR_REPROCESSING_QUEUE_MATCHED_OPTIMISTIC_UPDATES: Result = try_create_int_counter( + "beacon_processor_reprocessing_queue_matched_optimistic_updates", + "Number of queued light client optimistic updates where as matching block has been imported." + ); + pub static ref BEACON_PROCESSOR_REPROCESSING_QUEUE_SENT_OPTIMISTIC_UPDATES: Result = try_create_int_counter( + "beacon_processor_reprocessing_queue_sent_optimistic_updates", + "Number of queued light client optimistic updates where as matching block has been imported." + ); } pub fn update_bandwidth_metrics(bandwidth: Arc) {