merge unknown parent messages before current slot lookup

This commit is contained in:
realbigsean
2023-05-23 11:33:00 -04:00
parent c59a3fcf42
commit 8ea98c596c
12 changed files with 596 additions and 508 deletions

View File

@@ -58,6 +58,7 @@ use lighthouse_network::{PeerAction, PeerId};
use slog::{crit, debug, error, info, trace, warn, Logger};
use slot_clock::SlotClock;
use std::boxed::Box;
use std::collections::HashMap;
use std::ops::IndexMut;
use std::ops::Sub;
use std::sync::Arc;
@@ -127,8 +128,18 @@ pub enum SyncMessage<T: EthSpec> {
/// A block with an unknown parent has been received.
UnknownBlock(PeerId, BlockWrapper<T>, Hash256),
/// A blob with an unknown parent has been received.
BlobParentUnknown(PeerId, Arc<BlobSidecar<T>>),
/// Used to re-trigger requests after delaying the lookup for the block + blobs in the
/// current slot.
MergedParentUnknown(
Hash256,
Vec<PeerShouldHave>,
Option<Arc<SignedBeaconBlock<T>>>,
Option<FixedBlobSidecarList<T>>,
),
/// A peer has sent an attestation that references a block that is unknown. This triggers the
/// manager to attempt to find the block matching the unknown hash.
UnknownBlockHashFromAttestation(PeerId, Hash256),
@@ -293,11 +304,57 @@ pub fn spawn<T: BeaconChainTypes>(
sleep(sleep_duration).await;
//TODO(sean) aggregate messages for blobs for the same block
let mut merged = HashMap::new();
while let Ok(msg) = delayed_lookups_recv.try_recv() {
match msg {
SyncMessage::BlobParentUnknown(peer_id, blob) => {
let blob_index = blob.index;
if blob_index < T::EthSpec::max_blobs_per_block() as u64 {
let (_, blobs, peers) = merged.entry(blob.block_root).or_insert_with(||{
(None, Some(FixedBlobSidecarList::default()), vec![])
});
if let Some(blobs) = blobs {
*blobs.index_mut(blob_index as usize) = Some(blob);
}
peers.push(PeerShouldHave::Neither(peer_id));
} else {
warn!(log, "Received blob with invalid index"; "index" => blob_index, "peer_id" => %peer_id);
}
}
SyncMessage::UnknownBlock(peer_id, block, root) => {
let (block, blobs) = block.deconstruct();
let (cached_block, cached_blobs, peers) = merged.entry(root).or_insert_with(||{
(None, Some(FixedBlobSidecarList::default()), vec![])
});
if let (Some(cached_blobs), Some( mut blobs)) = (cached_blobs, blobs) {
for blob in blobs.iter_mut() {
if let Some(blob) = blob.take() {
let blob_index = blob.index;
if blob_index < T::EthSpec::max_blobs_per_block() as u64 {
*cached_blobs.index_mut(blob_index as usize) = Some(blob);
} else {
warn!(log, "Received blob with invalid index"; "index" => blob_index, "peer_id" => %peer_id);
}
}
}
}
*cached_block = Some(block);
peers.push(PeerShouldHave::Neither(peer_id));
}
_ => {
if let Err(e) = sync_send.send(msg) {
warn!(log, "Failed to send delayed lookup message"; "error" => ?e);
}
}
}
}
// Send `MergedParentUnknown` messages to the sync manager.
for (root, (block, blobs, peers)) in merged {
let msg = SyncMessage::MergedParentUnknown(root, peers, block, blobs);
if let Err(e) = sync_send.send(msg) {
warn!(log, "Failed to send delayed lookup message"; "error" => ?e);
warn!(log, "Failed to send merged delayed lookup message"; "error" => ?e);
}
}
}
@@ -644,7 +701,13 @@ impl<T: BeaconChainTypes> SyncManager<T> {
if self.synced_and_connected_within_tolerance(block_slot, &peer_id) {
let parent_root = block.parent_root();
self.block_lookups.search_parent(
block_slot,
block_root,
parent_root,
peer_id,
&mut self.network,
);
if self.should_delay_lookup(block_slot) {
if let Err(e) = self
.delayed_lookups
@@ -659,16 +722,9 @@ impl<T: BeaconChainTypes> SyncManager<T> {
block_root,
Some(block),
blobs,
peer_id,
&[PeerShouldHave::Neither(peer_id)],
&mut self.network,
);
self.block_lookups.search_parent(
block_slot,
block_root,
parent_root,
peer_id,
&mut self.network,
);
}
}
}
@@ -679,7 +735,13 @@ impl<T: BeaconChainTypes> SyncManager<T> {
let block_root = blob.block_root;
let parent_root = blob.block_parent_root;
let blob_index = blob.index;
self.block_lookups.search_parent(
blob_slot,
block_root,
parent_root,
peer_id,
&mut self.network,
);
if self.should_delay_lookup(blob_slot) {
if let Err(e) = self
.delayed_lookups
@@ -695,18 +757,21 @@ impl<T: BeaconChainTypes> SyncManager<T> {
block_root,
None,
Some(blobs),
peer_id,
&[PeerShouldHave::Neither(peer_id)],
&mut self.network,
);
}
self.block_lookups.search_parent(
blob_slot,
}
}
SyncMessage::MergedParentUnknown(block_root, peers, block, blobs) => {
self.block_lookups
.search_current_unknown_parent_block_and_blobs(
block_root,
parent_root,
peer_id,
block,
blobs,
peers.as_slice(),
&mut self.network,
);
}
}
SyncMessage::UnknownBlockHashFromAttestation(peer_id, block_hash) => {
// If we are not synced, ignore this block.