fix up sync manager delay message logic

This commit is contained in:
realbigsean
2023-04-24 11:09:14 -04:00
parent 03de817abe
commit 93df0f50e6

View File

@@ -47,6 +47,7 @@ use beacon_chain::blob_verification::AsBlock;
use beacon_chain::blob_verification::BlockWrapper;
use beacon_chain::{
AvailabilityProcessingStatus, BeaconChain, BeaconChainTypes, BlockError, EngineState,
MAXIMUM_GOSSIP_CLOCK_DISPARITY,
};
use futures::StreamExt;
use lighthouse_network::rpc::methods::MAX_REQUEST_BLOCKS;
@@ -72,6 +73,9 @@ use types::{BlobSidecar, EthSpec, Hash256, SignedBeaconBlock, Slot};
/// gossip if no peers are further than this range ahead of us that we have not already downloaded
/// blocks for.
pub const SLOT_IMPORT_TOLERANCE: usize = 32;
/// The maximum number of messages the delay queue can handle in a single slot before messages are
/// dropped.
pub const DELAY_QUEUE_CHANNEL_SIZE: usize = 128;
pub type Id = u32;
@@ -233,7 +237,7 @@ pub fn spawn<T: BeaconChainTypes>(
// generate the message channel
let (sync_send, sync_recv) = mpsc::unbounded_channel::<SyncMessage<T::EthSpec>>();
let (delayed_lookups_send, mut delayed_lookups_recv) =
mpsc::channel::<SyncMessage<T::EthSpec>>(512); //TODO(sean) what's a reasonable size for this channel? h
mpsc::channel::<SyncMessage<T::EthSpec>>(DELAY_QUEUE_CHANNEL_SIZE);
// create an instance of the SyncManager
let mut sync_manager = SyncManager {
@@ -287,14 +291,9 @@ pub fn spawn<T: BeaconChainTypes>(
sleep(sleep_duration).await;
while let next = delayed_lookups_recv.try_recv() {
match next {
Ok(msg) => {
if let Err(e) = sync_send.send(msg) {
warn!(log, "Failed to send delayed lookup message"; "error" => ?e);
}
}
Err(_) => break,
while let Ok(msg) = delayed_lookups_recv.try_recv() {
if let Err(e) = sync_send.send(msg) {
warn!(log, "Failed to send delayed lookup message"; "error" => ?e);
}
}
}
@@ -637,24 +636,13 @@ impl<T: BeaconChainTypes> SyncManager<T> {
if self.synced_and_connected_within_tolerance(block_slot, &peer_id) {
let parent_root = block.parent_root();
//TODO(sean) what about early blocks
let slot = match self.chain.slot_clock.now() {
Some(slot) => slot,
None => {
error!(
self.log,
"Could not read slot clock, dropping unknown block message"
);
return;
}
};
if block_slot == slot {
if self.should_delay_lookup(block_slot) {
if let Err(e) = self
.delayed_lookups
.try_send(SyncMessage::UnknownBlock(peer_id, block, block_root))
{
warn!(self.log, "Delayed lookups dropped for block"; "block_root" => ?block_root);
warn!(self.log, "Delayed lookups dropped for block"; "block_root" => ?block_root, "error" => ?e);
}
} else {
self.block_lookups.search_current_unknown_parent(
@@ -679,24 +667,13 @@ impl<T: BeaconChainTypes> SyncManager<T> {
if self.synced_and_connected_within_tolerance(blob_slot, &peer_id) {
let block_root = blob.block_root;
let parent_root = blob.block_parent_root;
//TODO(sean) what about early blocks
let slot = match self.chain.slot_clock.now() {
Some(slot) => slot,
None => {
error!(
self.log,
"Could not read slot clock, dropping unknown blob parent message"
);
return;
}
};
if blob_slot == slot {
if self.should_delay_lookup(blob_slot) {
if let Err(e) = self
.delayed_lookups
.try_send(SyncMessage::BlobParentUnknown(peer_id, blob))
{
warn!(self.log, "Delayed lookups dropped for blob"; "block_root" => ?block_root);
warn!(self.log, "Delayed lookups dropped for blob"; "block_root" => ?block_root, "error" => ?e);
}
} else {
self.block_lookups.search_current_unknown_blob_parent(
@@ -724,24 +701,12 @@ impl<T: BeaconChainTypes> SyncManager<T> {
SyncMessage::UnknownBlockHashFromGossipBlob(slot, peer_id, block_hash) => {
// If we are not synced, ignore this block.
if self.synced_and_connected(&peer_id) {
//TODO(sean) what about early gossip messages?
let current_slot = match self.chain.slot_clock.now() {
Some(slot) => slot,
None => {
error!(
self.log,
"Could not read slot clock, dropping unknown block message"
);
return;
}
};
if slot == current_slot {
if self.should_delay_lookup(slot) {
if let Err(e) = self.delayed_lookups.try_send(
SyncMessage::UnknownBlockHashFromAttestation(peer_id, block_hash),
) {
warn!(self.log, "Delayed lookup dropped for block referenced by a blob";
"block_root" => ?block_hash);
"block_root" => ?block_hash, "error" => ?e);
}
} else {
self.block_lookups
@@ -804,6 +769,32 @@ impl<T: BeaconChainTypes> SyncManager<T> {
}
}
fn should_delay_lookup(&mut self, slot: Slot) -> bool {
let earliest_slot = self
.chain
.slot_clock
.now_with_past_tolerance(MAXIMUM_GOSSIP_CLOCK_DISPARITY);
let latest_slot = self
.chain
.slot_clock
.now_with_future_tolerance(MAXIMUM_GOSSIP_CLOCK_DISPARITY);
let should_delay_lookup =
if let (Some(earliest_slot), Some(latest_slot)) = (earliest_slot, latest_slot) {
let msg_for_current_slot = slot >= earliest_slot && slot <= latest_slot;
let delay_threshold_unmet = self
.chain
.slot_clock
.seconds_from_current_slot_start()
.map_or(false, |secs_into_slot| {
secs_into_slot < self.chain.slot_clock.unagg_attestation_production_delay()
});
msg_for_current_slot && delay_threshold_unmet
} else {
false
};
should_delay_lookup
}
fn synced_and_connected_within_tolerance(
&mut self,
block_slot: Slot,