add delayed processing logic and combine some requests

This commit is contained in:
realbigsean
2023-04-14 16:50:41 -04:00
parent 2f8c8852ba
commit 8618c301b5
9 changed files with 445 additions and 519 deletions

View File

@@ -54,11 +54,14 @@ use lighthouse_network::types::{NetworkGlobals, SyncState};
use lighthouse_network::SyncInfo;
use lighthouse_network::{PeerAction, PeerId};
use slog::{crit, debug, error, info, trace, warn, Logger};
use slot_clock::SlotClock;
use std::boxed::Box;
use std::ops::Sub;
use std::sync::mpsc::TryRecvError;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::mpsc;
use tokio::time::sleep;
use types::blob_sidecar::BlobIdentifier;
use types::{BlobSidecar, EthSpec, Hash256, SignedBeaconBlock, Slot};
@@ -125,17 +128,7 @@ pub enum SyncMessage<T: EthSpec> {
/// A peer has sent a blob that references a block that is unknown. This triggers the
/// manager to attempt to find the block matching the unknown hash when the specified delay expires.
UnknownBlockHashFromGossipBlob(PeerId, Hash256, Duration),
/// A peer has sent us a block that we haven't received all the blobs for. This triggers
/// the manager to attempt to find the pending blobs for the given block root when the specified
/// delay expires.
MissingBlobs {
peer_id: PeerId,
block_root: Hash256,
pending_blobs: Vec<BlobIdentifier>,
search_delay: Duration,
},
UnknownBlockHashFromGossipBlob(Slot, PeerId, Hash256),
/// A peer has disconnected.
Disconnect(PeerId),
@@ -214,6 +207,8 @@ pub struct SyncManager<T: BeaconChainTypes> {
block_lookups: BlockLookups<T>,
delayed_lookups: mpsc::Sender<SyncMessage<T::EthSpec>>,
/// The logger for the import manager.
log: Logger,
}
@@ -235,6 +230,8 @@ pub fn spawn<T: BeaconChainTypes>(
);
// generate the message channel
let (sync_send, sync_recv) = mpsc::unbounded_channel::<SyncMessage<T::EthSpec>>();
let (delayed_lookups_send, mut delayed_lookups_recv) =
mpsc::channel::<SyncMessage<T::EthSpec>>(512); //TODO(sean) what's a reasonable size for this channel? h
// create an instance of the SyncManager
let mut sync_manager = SyncManager {
@@ -251,9 +248,53 @@ pub fn spawn<T: BeaconChainTypes>(
range_sync: RangeSync::new(beacon_chain.clone(), log.clone()),
backfill_sync: BackFillSync::new(beacon_chain, network_globals, log.clone()),
block_lookups: BlockLookups::new(log.clone()),
delayed_lookups: delayed_lookups_send,
log: log.clone(),
};
executor.spawn(
async move {
let slot_duration = slot_clock.slot_duration();
// TODO(sean) think about what this should be
let delay = beacon_chain.slot_clock.unagg_attestation_production_delay();
loop {
let sleep_duration = match (
beacon_chain.slot_clock.duration_to_next_slot(),
beacon_chain.slot_clock.seconds_from_current_slot_start(),
) {
(Some(duration_to_next_slot), Some(seconds_from_current_slot_start)) => {
if seconds_from_current_slot_start > delay {
duration_to_next_slot + delay
} else {
delay - seconds_from_current_slot_start
}
}
_ => {
error!(log, "Failed to read slot clock");
// If we can't read the slot clock, just wait another slot.
sleep(slot_duration).await;
continue;
}
};
sleep(sleep_duration).await;
while let next = delayed_lookups_recv.try_recv() {
match next {
Ok(msg) => {
if let Err(e) = sync_send.send(msg) {
warn!(log, "Failed to send delayed lookup message"; "error" => ?e);
}
}
Err(_) => break,
}
}
}
},
"delayed_lookups",
);
// spawn the sync manager thread
debug!(log, "Sync Manager started");
executor.spawn(async move { Box::pin(sync_manager.main()).await }, "sync");
@@ -603,8 +644,25 @@ impl<T: BeaconChainTypes> SyncManager<T> {
if self.network_globals.peers.read().is_connected(&peer_id)
&& self.network.is_execution_engine_online()
{
self.block_lookups
.search_parent(block_root, block, peer_id, &mut self.network);
let parent_root = block.parent_root();
//TODO(sean) what about early blocks
if block.slot() == self.chain.slot_clock.now() {
self.delayed_lookups
.send(SyncMessage::UnknownBlock(peer_id, block, block_root));
} else {
self.block_lookups.search_current_unknown_parent(
block_root,
block,
peer_id,
&mut self.network,
);
}
self.block_lookups.search_parent(
block_root,
parent_root,
peer_id,
&mut self.network,
);
}
}
SyncMessage::UnknownBlockHashFromAttestation(peer_id, block_hash) => {
@@ -612,35 +670,21 @@ impl<T: BeaconChainTypes> SyncManager<T> {
if self.synced_and_connected(&peer_id) {
self.block_lookups
.search_block(block_hash, peer_id, &mut self.network);
//TODO(sean) we could always request all blobs at this point
}
}
SyncMessage::UnknownBlockHashFromGossipBlob(peer_id, block_hash, delay) => {
SyncMessage::UnknownBlockHashFromGossipBlob(slot, peer_id, block_hash) => {
// If we are not synced, ignore this block.
if self.synced_and_connected(&peer_id) {
self.block_lookups.search_block_delayed(
peer_id,
block_hash,
delay,
&mut self.network,
);
}
}
SyncMessage::MissingBlobs {
peer_id,
block_root,
pending_blobs,
search_delay,
} => {
// If we are not synced, ignore these blobs.
if self.synced_and_connected(&peer_id) {
self.block_lookups.search_blobs_delayed(
peer_id,
block_root,
pending_blobs,
search_delay,
&mut self.network,
);
//TODO(sean) what about early gossip messages?
if Some(slot) == self.chain.slot_clock.now() {
self.delayed_lookups
.send(SyncMessage::UnknownBlockHashFromAttestation(
peer_id, block_hash,
))
} else {
self.block_lookups
.search_block(block_hash, peer_id, &mut self.network)
}
}
}
SyncMessage::Disconnect(peer_id) => {
@@ -835,7 +879,7 @@ impl<T: BeaconChainTypes> SyncManager<T> {
seen_timestamp: Duration,
) {
match request_id {
RequestId::SingleBlock { id } => self.block_lookups.single_lookup_blob_response(
RequestId::SingleBlock { id } => self.block_lookups.single_blob_lookup_response(
id,
peer_id,
blob,