From 93df0f50e6e2c9b4e9f9a5ad8af4533582a58593 Mon Sep 17 00:00:00 2001 From: realbigsean Date: Mon, 24 Apr 2023 11:09:14 -0400 Subject: [PATCH] fix up sync manager delay message logic --- beacon_node/network/src/sync/manager.rs | 89 +++++++++++-------------- 1 file changed, 40 insertions(+), 49 deletions(-) diff --git a/beacon_node/network/src/sync/manager.rs b/beacon_node/network/src/sync/manager.rs index 2b3183e1ef..2da1c934cb 100644 --- a/beacon_node/network/src/sync/manager.rs +++ b/beacon_node/network/src/sync/manager.rs @@ -47,6 +47,7 @@ use beacon_chain::blob_verification::AsBlock; use beacon_chain::blob_verification::BlockWrapper; use beacon_chain::{ AvailabilityProcessingStatus, BeaconChain, BeaconChainTypes, BlockError, EngineState, + MAXIMUM_GOSSIP_CLOCK_DISPARITY, }; use futures::StreamExt; use lighthouse_network::rpc::methods::MAX_REQUEST_BLOCKS; @@ -72,6 +73,9 @@ use types::{BlobSidecar, EthSpec, Hash256, SignedBeaconBlock, Slot}; /// gossip if no peers are further than this range ahead of us that we have not already downloaded /// blocks for. pub const SLOT_IMPORT_TOLERANCE: usize = 32; +/// The maximum number of messages the delay queue can handle in a single slot before messages are +/// dropped. +pub const DELAY_QUEUE_CHANNEL_SIZE: usize = 128; pub type Id = u32; @@ -233,7 +237,7 @@ pub fn spawn( // generate the message channel let (sync_send, sync_recv) = mpsc::unbounded_channel::>(); let (delayed_lookups_send, mut delayed_lookups_recv) = - mpsc::channel::>(512); //TODO(sean) what's a reasonable size for this channel? h + mpsc::channel::>(DELAY_QUEUE_CHANNEL_SIZE); // create an instance of the SyncManager let mut sync_manager = SyncManager { @@ -287,14 +291,9 @@ pub fn spawn( sleep(sleep_duration).await; - while let next = delayed_lookups_recv.try_recv() { - match next { - Ok(msg) => { - if let Err(e) = sync_send.send(msg) { - warn!(log, "Failed to send delayed lookup message"; "error" => ?e); - } - } - Err(_) => break, + while let Ok(msg) = delayed_lookups_recv.try_recv() { + if let Err(e) = sync_send.send(msg) { + warn!(log, "Failed to send delayed lookup message"; "error" => ?e); } } } @@ -637,24 +636,13 @@ impl SyncManager { if self.synced_and_connected_within_tolerance(block_slot, &peer_id) { let parent_root = block.parent_root(); - //TODO(sean) what about early blocks - let slot = match self.chain.slot_clock.now() { - Some(slot) => slot, - None => { - error!( - self.log, - "Could not read slot clock, dropping unknown block message" - ); - return; - } - }; - if block_slot == slot { + if self.should_delay_lookup(block_slot) { if let Err(e) = self .delayed_lookups .try_send(SyncMessage::UnknownBlock(peer_id, block, block_root)) { - warn!(self.log, "Delayed lookups dropped for block"; "block_root" => ?block_root); + warn!(self.log, "Delayed lookups dropped for block"; "block_root" => ?block_root, "error" => ?e); } } else { self.block_lookups.search_current_unknown_parent( @@ -679,24 +667,13 @@ impl SyncManager { if self.synced_and_connected_within_tolerance(blob_slot, &peer_id) { let block_root = blob.block_root; let parent_root = blob.block_parent_root; - //TODO(sean) what about early blocks - let slot = match self.chain.slot_clock.now() { - Some(slot) => slot, - None => { - error!( - self.log, - "Could not read slot clock, dropping unknown blob parent message" - ); - return; - } - }; - if blob_slot == slot { + if self.should_delay_lookup(blob_slot) { if let Err(e) = self .delayed_lookups .try_send(SyncMessage::BlobParentUnknown(peer_id, blob)) { - warn!(self.log, "Delayed lookups dropped for blob"; "block_root" => ?block_root); + warn!(self.log, "Delayed lookups dropped for blob"; "block_root" => ?block_root, "error" => ?e); } } else { self.block_lookups.search_current_unknown_blob_parent( @@ -724,24 +701,12 @@ impl SyncManager { SyncMessage::UnknownBlockHashFromGossipBlob(slot, peer_id, block_hash) => { // If we are not synced, ignore this block. if self.synced_and_connected(&peer_id) { - //TODO(sean) what about early gossip messages? - let current_slot = match self.chain.slot_clock.now() { - Some(slot) => slot, - None => { - error!( - self.log, - "Could not read slot clock, dropping unknown block message" - ); - return; - } - }; - - if slot == current_slot { + if self.should_delay_lookup(slot) { if let Err(e) = self.delayed_lookups.try_send( SyncMessage::UnknownBlockHashFromAttestation(peer_id, block_hash), ) { warn!(self.log, "Delayed lookup dropped for block referenced by a blob"; - "block_root" => ?block_hash); + "block_root" => ?block_hash, "error" => ?e); } } else { self.block_lookups @@ -804,6 +769,32 @@ impl SyncManager { } } + fn should_delay_lookup(&mut self, slot: Slot) -> bool { + let earliest_slot = self + .chain + .slot_clock + .now_with_past_tolerance(MAXIMUM_GOSSIP_CLOCK_DISPARITY); + let latest_slot = self + .chain + .slot_clock + .now_with_future_tolerance(MAXIMUM_GOSSIP_CLOCK_DISPARITY); + let should_delay_lookup = + if let (Some(earliest_slot), Some(latest_slot)) = (earliest_slot, latest_slot) { + let msg_for_current_slot = slot >= earliest_slot && slot <= latest_slot; + let delay_threshold_unmet = self + .chain + .slot_clock + .seconds_from_current_slot_start() + .map_or(false, |secs_into_slot| { + secs_into_slot < self.chain.slot_clock.unagg_attestation_production_delay() + }); + msg_for_current_slot && delay_threshold_unmet + } else { + false + }; + should_delay_lookup + } + fn synced_and_connected_within_tolerance( &mut self, block_slot: Slot,