add peer usefulness enum

This commit is contained in:
realbigsean
2023-04-24 12:27:49 -04:00
parent 93df0f50e6
commit 381044abe7
8 changed files with 95 additions and 55 deletions

View File

@@ -762,8 +762,8 @@ impl<T: BeaconChainTypes> Worker<T> {
//TODO(sean) add metrics and logging
self.chain.recompute_head_at_current_slot().await;
}
Ok(AvailabilityProcessingStatus::MissingParts(slot, block_hash)) => {
self.send_sync_message(SyncMessage::UnknownBlockHashFromGossipBlob(
Ok(AvailabilityProcessingStatus::MissingComponents(slot, block_hash)) => {
self.send_sync_message(SyncMessage::MissingGossipBlockComponents(
slot, peer_id, block_hash,
));
}
@@ -1133,9 +1133,9 @@ impl<T: BeaconChainTypes> Worker<T> {
self.chain.recompute_head_at_current_slot().await;
}
Ok(AvailabilityProcessingStatus::MissingParts(slot, block_root)) => {
Ok(AvailabilityProcessingStatus::MissingComponents(slot, block_root)) => {
// make rpc request for blob
self.send_sync_message(SyncMessage::UnknownBlockHashFromGossipBlob(
self.send_sync_message(SyncMessage::MissingGossipBlockComponents(
slot, peer_id, block_root,
));
}

View File

@@ -17,8 +17,8 @@ use beacon_chain::{
use beacon_chain::{AvailabilityProcessingStatus, CountUnrealized};
use lighthouse_network::PeerAction;
use slog::{debug, error, info, warn};
use ssz_types::FixedVector;
use slot_clock::SlotClock;
use ssz_types::FixedVector;
use std::sync::Arc;
use std::time::{SystemTime, UNIX_EPOCH};
use tokio::sync::mpsc;

View File

@@ -85,6 +85,12 @@ pub enum ResponseType {
Blob,
}
#[derive(Debug, Copy, Clone)]
pub enum PeerShouldHave {
BlockAndBlobs,
Neither,
}
impl<T: BeaconChainTypes> BlockLookups<T> {
pub fn new(
da_checker: Arc<DataAvailabilityChecker<T::EthSpec, T::SlotClock>>,
@@ -104,8 +110,14 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
/* Lookup requests */
pub fn search_block(&mut self, hash: Hash256, peer_id: PeerId, cx: &mut SyncNetworkContext<T>) {
self.search_block_with(|_| {}, hash, peer_id, cx)
pub fn search_block(
&mut self,
hash: Hash256,
peer_id: PeerId,
peer_usefulness: PeerShouldHave,
cx: &mut SyncNetworkContext<T>,
) {
self.search_block_with(|_| {}, hash, peer_id, peer_usefulness, cx)
}
/// Searches for a single block hash. If the blocks parent is unknown, a chain of blocks is
@@ -115,6 +127,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
cache_fn: impl Fn(&mut SingleBlockLookup<SINGLE_BLOCK_LOOKUP_MAX_ATTEMPTS, T>),
hash: Hash256,
peer_id: PeerId,
peer_usefulness: PeerShouldHave,
cx: &mut SyncNetworkContext<T>,
) {
// Do not re-request a block that is already being requested
@@ -122,19 +135,15 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
.single_block_lookups
.iter_mut()
.any(|(_, _, single_block_request)| {
if single_block_request.requested_block_root == hash {
single_block_request.block_request_state.add_peer(&peer_id);
single_block_request.blob_request_state.add_peer(&peer_id);
return true;
}
false
single_block_request.add_peer_if_useful(&hash, &peer_id, peer_usefulness)
})
{
return;
}
if self.parent_lookups.iter_mut().any(|parent_req| {
parent_req.add_peer(&hash, &peer_id) || parent_req.contains_block(&hash)
parent_req.add_peer_if_useful(&hash, &peer_id, peer_usefulness)
|| parent_req.contains_block(&hash)
}) {
// If the block was already downloaded, or is being downloaded in this moment, do not
// request it.
@@ -197,6 +206,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
},
block_root,
peer_id,
PeerShouldHave::Neither,
cx,
);
}
@@ -214,6 +224,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
},
block_root,
peer_id,
PeerShouldHave::Neither,
cx,
);
}
@@ -228,6 +239,9 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
peer_id: PeerId,
cx: &mut SyncNetworkContext<T>,
) {
// Gossip blocks or blobs shouldn't be propogated if parents are unavailable.
let peer_usefulness = PeerShouldHave::BlockAndBlobs;
// If this block or it's parent is part of a known failed chain, ignore it.
if self.failed_chains.contains(&parent_root) || self.failed_chains.contains(&block_root) {
debug!(self.log, "Block is from a past failed chain. Dropping";
@@ -238,7 +252,8 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
// Make sure this block is not already downloaded, and that neither it or its parent is
// being searched for.
if self.parent_lookups.iter_mut().any(|parent_req| {
parent_req.contains_block(&block_root) || parent_req.add_peer(&block_root, &peer_id)
parent_req.contains_block(&block_root)
|| parent_req.add_peer_if_useful(&block_root, &peer_id, peer_usefulness)
}) {
// we are already searching for this block, ignore it
return;
@@ -560,7 +575,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
}
}
LookupDownloadStatus::SearchBlock(block_root) => {
self.search_block(block_root, peer_id, cx);
self.search_block(block_root, peer_id, PeerShouldHave::BlockAndBlobs, cx);
self.parent_lookups.push(parent_lookup)
}
}
@@ -654,7 +669,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
}
}
LookupDownloadStatus::SearchBlock(block_root) => {
self.search_block(block_root, peer_id, cx);
self.search_block(block_root, peer_id, PeerShouldHave::BlockAndBlobs, cx);
self.parent_lookups.push(parent_lookup)
}
}
@@ -938,8 +953,9 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
trace!(self.log, "Single block processing succeeded"; "block" => %root);
true
}
AvailabilityProcessingStatus::MissingParts(_, block_root) => {
self.search_block(block_root, peer_id, cx);
AvailabilityProcessingStatus::MissingComponents(_, block_root) => {
// At this point we don't know what the peer *should* have.
self.search_block(block_root, peer_id, PeerShouldHave::Neither, cx);
false
}
},
@@ -1080,7 +1096,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
AvailabilityProcessingStatus::Imported(hash) => {
trace!(self.log, "Parent block processing succeeded"; &parent_lookup)
}
AvailabilityProcessingStatus::MissingParts(_, block_root) => {
AvailabilityProcessingStatus::MissingComponents(_, block_root) => {
trace!(self.log, "Parent missing parts, triggering single block lookup "; &parent_lookup)
}
},
@@ -1098,11 +1114,11 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
}
match result {
BlockPartProcessingResult::Ok(AvailabilityProcessingStatus::MissingParts(
BlockPartProcessingResult::Ok(AvailabilityProcessingStatus::MissingComponents(
_,
block_root,
)) => {
self.search_block(block_root, peer_id, cx);
self.search_block(block_root, peer_id, PeerShouldHave::BlockAndBlobs, cx);
}
BlockPartProcessingResult::Err(BlockError::ParentUnknown(block)) => {
parent_lookup.add_block_wrapper(block);

View File

@@ -1,5 +1,5 @@
use super::single_block_lookup::{LookupRequestError, LookupVerifyError, SingleBlockLookup};
use super::DownloadedBlocks;
use super::{DownloadedBlocks, PeerShouldHave};
use crate::sync::block_lookups::{single_block_lookup, RootBlockTuple};
use crate::sync::{
manager::{Id, SLOT_IMPORT_TOLERANCE},
@@ -45,7 +45,7 @@ pub enum ParentVerifyError {
ExtraBlobsReturned,
InvalidIndex(u64),
PreviousFailure { parent_root: Hash256 },
AvailabilityCheck, //TODO(sean) wrap the underlying error
AvailabilityCheck,
}
#[derive(Debug, PartialEq, Eq)]
@@ -348,18 +348,14 @@ impl<T: BeaconChainTypes> ParentLookup<T> {
self.current_parent_request.failed_attempts()
}
//TODO(sean) fix this up
pub fn add_peer(&mut self, block_root: &Hash256, peer_id: &PeerId) -> bool {
if block_root == &self.chain_hash {
return false;
}
pub fn add_peer_if_useful(
&mut self,
block_root: &Hash256,
peer_id: &PeerId,
peer_usefulness: PeerShouldHave,
) -> bool {
self.current_parent_request
.block_request_state
.add_peer(peer_id);
self.current_parent_request
.blob_request_state
.add_peer(peer_id);
true
.add_peer_if_useful(block_root, peer_id, peer_usefulness)
}
//TODO(sean) fix this up

View File

@@ -14,6 +14,8 @@ use strum::IntoStaticStr;
use types::blob_sidecar::BlobIdentifier;
use types::{BlobSidecar, EthSpec, SignedBeaconBlock};
use super::PeerShouldHave;
pub struct SingleBlockLookup<const MAX_ATTEMPTS: u8, T: BeaconChainTypes> {
pub requested_block_root: Hash256,
pub requested_ids: Vec<BlobIdentifier>,
@@ -372,6 +374,24 @@ impl<const MAX_ATTEMPTS: u8, T: BeaconChainTypes> SingleBlockLookup<MAX_ATTEMPTS
Err(LookupRequestError::NoPeers)
}
}
pub fn add_peer_if_useful(
&mut self,
block_root: &Hash256,
peer_id: &PeerId,
peer_usefulness: PeerShouldHave,
) -> bool {
if *block_root != self.requested_block_root {
return false;
}
match peer_usefulness {
PeerShouldHave::BlockAndBlobs => {
self.block_request_state.add_peer(peer_id);
self.blob_request_state.add_peer(peer_id);
}
PeerShouldHave::Neither => {}
}
true
}
}
impl<const MAX_ATTEMPTS: u8> SingleLookupRequestState<MAX_ATTEMPTS> {

View File

@@ -34,7 +34,7 @@
//! search for the block and subsequently search for parents if needed.
use super::backfill_sync::{BackFillSync, ProcessResult, SyncStart};
use super::block_lookups::BlockLookups;
use super::block_lookups::{BlockLookups, PeerShouldHave};
use super::network_context::{BlockOrBlob, SyncNetworkContext};
use super::peer_sync_info::{remote_sync_type, PeerSyncType};
use super::range_sync::{RangeSync, RangeSyncType, EPOCHS_PER_BATCH};
@@ -133,7 +133,7 @@ pub enum SyncMessage<T: EthSpec> {
/// A peer has sent a blob that references a block that is unknown. This triggers the
/// manager to attempt to find the block matching the unknown hash when the specified delay expires.
UnknownBlockHashFromGossipBlob(Slot, PeerId, Hash256),
MissingGossipBlockComponents(Slot, PeerId, Hash256),
/// A peer has disconnected.
Disconnect(PeerId),
@@ -651,14 +651,14 @@ impl<T: BeaconChainTypes> SyncManager<T> {
peer_id,
&mut self.network,
);
self.block_lookups.search_parent(
block_slot,
block_root,
parent_root,
peer_id,
&mut self.network,
);
}
self.block_lookups.search_parent(
block_slot,
block_root,
parent_root,
peer_id,
&mut self.network,
);
}
}
SyncMessage::BlobParentUnknown(peer_id, blob) => {
@@ -694,11 +694,15 @@ impl<T: BeaconChainTypes> SyncManager<T> {
SyncMessage::UnknownBlockHashFromAttestation(peer_id, block_hash) => {
// If we are not synced, ignore this block.
if self.synced_and_connected(&peer_id) {
self.block_lookups
.search_block(block_hash, peer_id, &mut self.network);
self.block_lookups.search_block(
block_hash,
peer_id,
PeerShouldHave::BlockAndBlobs,
&mut self.network,
);
}
}
SyncMessage::UnknownBlockHashFromGossipBlob(slot, peer_id, block_hash) => {
SyncMessage::MissingGossipBlockComponents(slot, peer_id, block_hash) => {
// If we are not synced, ignore this block.
if self.synced_and_connected(&peer_id) {
if self.should_delay_lookup(slot) {
@@ -709,8 +713,12 @@ impl<T: BeaconChainTypes> SyncManager<T> {
"block_root" => ?block_hash, "error" => ?e);
}
} else {
self.block_lookups
.search_block(block_hash, peer_id, &mut self.network)
self.block_lookups.search_block(
block_hash,
peer_id,
PeerShouldHave::Neither,
&mut self.network,
)
}
}
}