From 381044abe7d54371c47faa30673bc944c19046da Mon Sep 17 00:00:00 2001 From: realbigsean Date: Mon, 24 Apr 2023 12:27:49 -0400 Subject: [PATCH] add peer usefulness enum --- beacon_node/beacon_chain/src/beacon_chain.rs | 10 ++-- beacon_node/http_api/src/publish_blocks.rs | 2 +- .../beacon_processor/worker/gossip_methods.rs | 8 +-- .../beacon_processor/worker/sync_methods.rs | 2 +- .../network/src/sync/block_lookups/mod.rs | 50 ++++++++++++------- .../src/sync/block_lookups/parent_lookup.rs | 22 ++++---- .../sync/block_lookups/single_block_lookup.rs | 20 ++++++++ beacon_node/network/src/sync/manager.rs | 36 +++++++------ 8 files changed, 95 insertions(+), 55 deletions(-) diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index 3ba6d0759a..f4993c2677 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -184,7 +184,7 @@ pub enum WhenSlotSkipped { #[derive(Debug, PartialEq)] pub enum AvailabilityProcessingStatus { - MissingParts(Slot, Hash256), + MissingComponents(Slot, Hash256), Imported(Hash256), } @@ -2668,7 +2668,7 @@ impl BeaconChain { AvailabilityProcessingStatus::Imported(_) => { // The block was imported successfully. } - AvailabilityProcessingStatus::MissingParts(slot, block_root) => { + AvailabilityProcessingStatus::MissingComponents(slot, block_root) => { return ChainSegmentResult::Failed { imported_blocks, error: BlockError::MissingBlockParts(slot, block_root), @@ -2919,9 +2919,9 @@ impl BeaconChain { Availability::Available(block) => { self.import_available_block(block, count_unrealized).await } - Availability::MissingParts(block_root) => { - Ok(AvailabilityProcessingStatus::MissingParts(slot, block_root)) - } + Availability::MissingParts(block_root) => Ok( + AvailabilityProcessingStatus::MissingComponents(slot, block_root), + ), } } diff --git a/beacon_node/http_api/src/publish_blocks.rs b/beacon_node/http_api/src/publish_blocks.rs index f830b194ec..dcbe809724 100644 --- a/beacon_node/http_api/src/publish_blocks.rs +++ b/beacon_node/http_api/src/publish_blocks.rs @@ -140,7 +140,7 @@ pub async fn publish_block( Ok(()) } - Ok(AvailabilityProcessingStatus::MissingParts(_, block_root)) => { + Ok(AvailabilityProcessingStatus::MissingComponents(_, block_root)) => { let msg = format!("Missing parts of block with root {:?}", block_root); error!( log, diff --git a/beacon_node/network/src/beacon_processor/worker/gossip_methods.rs b/beacon_node/network/src/beacon_processor/worker/gossip_methods.rs index 3ee3990e5e..9b76b25e59 100644 --- a/beacon_node/network/src/beacon_processor/worker/gossip_methods.rs +++ b/beacon_node/network/src/beacon_processor/worker/gossip_methods.rs @@ -762,8 +762,8 @@ impl Worker { //TODO(sean) add metrics and logging self.chain.recompute_head_at_current_slot().await; } - Ok(AvailabilityProcessingStatus::MissingParts(slot, block_hash)) => { - self.send_sync_message(SyncMessage::UnknownBlockHashFromGossipBlob( + Ok(AvailabilityProcessingStatus::MissingComponents(slot, block_hash)) => { + self.send_sync_message(SyncMessage::MissingGossipBlockComponents( slot, peer_id, block_hash, )); } @@ -1133,9 +1133,9 @@ impl Worker { self.chain.recompute_head_at_current_slot().await; } - Ok(AvailabilityProcessingStatus::MissingParts(slot, block_root)) => { + Ok(AvailabilityProcessingStatus::MissingComponents(slot, block_root)) => { // make rpc request for blob - self.send_sync_message(SyncMessage::UnknownBlockHashFromGossipBlob( + self.send_sync_message(SyncMessage::MissingGossipBlockComponents( slot, peer_id, block_root, )); } diff --git a/beacon_node/network/src/beacon_processor/worker/sync_methods.rs b/beacon_node/network/src/beacon_processor/worker/sync_methods.rs index d650b3ff0a..1946111067 100644 --- a/beacon_node/network/src/beacon_processor/worker/sync_methods.rs +++ b/beacon_node/network/src/beacon_processor/worker/sync_methods.rs @@ -17,8 +17,8 @@ use beacon_chain::{ use beacon_chain::{AvailabilityProcessingStatus, CountUnrealized}; use lighthouse_network::PeerAction; use slog::{debug, error, info, warn}; -use ssz_types::FixedVector; use slot_clock::SlotClock; +use ssz_types::FixedVector; use std::sync::Arc; use std::time::{SystemTime, UNIX_EPOCH}; use tokio::sync::mpsc; diff --git a/beacon_node/network/src/sync/block_lookups/mod.rs b/beacon_node/network/src/sync/block_lookups/mod.rs index ff787869e8..b3a834b7bb 100644 --- a/beacon_node/network/src/sync/block_lookups/mod.rs +++ b/beacon_node/network/src/sync/block_lookups/mod.rs @@ -85,6 +85,12 @@ pub enum ResponseType { Blob, } +#[derive(Debug, Copy, Clone)] +pub enum PeerShouldHave { + BlockAndBlobs, + Neither, +} + impl BlockLookups { pub fn new( da_checker: Arc>, @@ -104,8 +110,14 @@ impl BlockLookups { /* Lookup requests */ - pub fn search_block(&mut self, hash: Hash256, peer_id: PeerId, cx: &mut SyncNetworkContext) { - self.search_block_with(|_| {}, hash, peer_id, cx) + pub fn search_block( + &mut self, + hash: Hash256, + peer_id: PeerId, + peer_usefulness: PeerShouldHave, + cx: &mut SyncNetworkContext, + ) { + self.search_block_with(|_| {}, hash, peer_id, peer_usefulness, cx) } /// Searches for a single block hash. If the blocks parent is unknown, a chain of blocks is @@ -115,6 +127,7 @@ impl BlockLookups { cache_fn: impl Fn(&mut SingleBlockLookup), hash: Hash256, peer_id: PeerId, + peer_usefulness: PeerShouldHave, cx: &mut SyncNetworkContext, ) { // Do not re-request a block that is already being requested @@ -122,19 +135,15 @@ impl BlockLookups { .single_block_lookups .iter_mut() .any(|(_, _, single_block_request)| { - if single_block_request.requested_block_root == hash { - single_block_request.block_request_state.add_peer(&peer_id); - single_block_request.blob_request_state.add_peer(&peer_id); - return true; - } - false + single_block_request.add_peer_if_useful(&hash, &peer_id, peer_usefulness) }) { return; } if self.parent_lookups.iter_mut().any(|parent_req| { - parent_req.add_peer(&hash, &peer_id) || parent_req.contains_block(&hash) + parent_req.add_peer_if_useful(&hash, &peer_id, peer_usefulness) + || parent_req.contains_block(&hash) }) { // If the block was already downloaded, or is being downloaded in this moment, do not // request it. @@ -197,6 +206,7 @@ impl BlockLookups { }, block_root, peer_id, + PeerShouldHave::Neither, cx, ); } @@ -214,6 +224,7 @@ impl BlockLookups { }, block_root, peer_id, + PeerShouldHave::Neither, cx, ); } @@ -228,6 +239,9 @@ impl BlockLookups { peer_id: PeerId, cx: &mut SyncNetworkContext, ) { + // Gossip blocks or blobs shouldn't be propogated if parents are unavailable. + let peer_usefulness = PeerShouldHave::BlockAndBlobs; + // If this block or it's parent is part of a known failed chain, ignore it. if self.failed_chains.contains(&parent_root) || self.failed_chains.contains(&block_root) { debug!(self.log, "Block is from a past failed chain. Dropping"; @@ -238,7 +252,8 @@ impl BlockLookups { // Make sure this block is not already downloaded, and that neither it or its parent is // being searched for. if self.parent_lookups.iter_mut().any(|parent_req| { - parent_req.contains_block(&block_root) || parent_req.add_peer(&block_root, &peer_id) + parent_req.contains_block(&block_root) + || parent_req.add_peer_if_useful(&block_root, &peer_id, peer_usefulness) }) { // we are already searching for this block, ignore it return; @@ -560,7 +575,7 @@ impl BlockLookups { } } LookupDownloadStatus::SearchBlock(block_root) => { - self.search_block(block_root, peer_id, cx); + self.search_block(block_root, peer_id, PeerShouldHave::BlockAndBlobs, cx); self.parent_lookups.push(parent_lookup) } } @@ -654,7 +669,7 @@ impl BlockLookups { } } LookupDownloadStatus::SearchBlock(block_root) => { - self.search_block(block_root, peer_id, cx); + self.search_block(block_root, peer_id, PeerShouldHave::BlockAndBlobs, cx); self.parent_lookups.push(parent_lookup) } } @@ -938,8 +953,9 @@ impl BlockLookups { trace!(self.log, "Single block processing succeeded"; "block" => %root); true } - AvailabilityProcessingStatus::MissingParts(_, block_root) => { - self.search_block(block_root, peer_id, cx); + AvailabilityProcessingStatus::MissingComponents(_, block_root) => { + // At this point we don't know what the peer *should* have. + self.search_block(block_root, peer_id, PeerShouldHave::Neither, cx); false } }, @@ -1080,7 +1096,7 @@ impl BlockLookups { AvailabilityProcessingStatus::Imported(hash) => { trace!(self.log, "Parent block processing succeeded"; &parent_lookup) } - AvailabilityProcessingStatus::MissingParts(_, block_root) => { + AvailabilityProcessingStatus::MissingComponents(_, block_root) => { trace!(self.log, "Parent missing parts, triggering single block lookup "; &parent_lookup) } }, @@ -1098,11 +1114,11 @@ impl BlockLookups { } match result { - BlockPartProcessingResult::Ok(AvailabilityProcessingStatus::MissingParts( + BlockPartProcessingResult::Ok(AvailabilityProcessingStatus::MissingComponents( _, block_root, )) => { - self.search_block(block_root, peer_id, cx); + self.search_block(block_root, peer_id, PeerShouldHave::BlockAndBlobs, cx); } BlockPartProcessingResult::Err(BlockError::ParentUnknown(block)) => { parent_lookup.add_block_wrapper(block); diff --git a/beacon_node/network/src/sync/block_lookups/parent_lookup.rs b/beacon_node/network/src/sync/block_lookups/parent_lookup.rs index 5166d8fc29..6cd1fedc40 100644 --- a/beacon_node/network/src/sync/block_lookups/parent_lookup.rs +++ b/beacon_node/network/src/sync/block_lookups/parent_lookup.rs @@ -1,5 +1,5 @@ use super::single_block_lookup::{LookupRequestError, LookupVerifyError, SingleBlockLookup}; -use super::DownloadedBlocks; +use super::{DownloadedBlocks, PeerShouldHave}; use crate::sync::block_lookups::{single_block_lookup, RootBlockTuple}; use crate::sync::{ manager::{Id, SLOT_IMPORT_TOLERANCE}, @@ -45,7 +45,7 @@ pub enum ParentVerifyError { ExtraBlobsReturned, InvalidIndex(u64), PreviousFailure { parent_root: Hash256 }, - AvailabilityCheck, //TODO(sean) wrap the underlying error + AvailabilityCheck, } #[derive(Debug, PartialEq, Eq)] @@ -348,18 +348,14 @@ impl ParentLookup { self.current_parent_request.failed_attempts() } - //TODO(sean) fix this up - pub fn add_peer(&mut self, block_root: &Hash256, peer_id: &PeerId) -> bool { - if block_root == &self.chain_hash { - return false; - } + pub fn add_peer_if_useful( + &mut self, + block_root: &Hash256, + peer_id: &PeerId, + peer_usefulness: PeerShouldHave, + ) -> bool { self.current_parent_request - .block_request_state - .add_peer(peer_id); - self.current_parent_request - .blob_request_state - .add_peer(peer_id); - true + .add_peer_if_useful(block_root, peer_id, peer_usefulness) } //TODO(sean) fix this up diff --git a/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs b/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs index f87751029d..3a05cc3bac 100644 --- a/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs +++ b/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs @@ -14,6 +14,8 @@ use strum::IntoStaticStr; use types::blob_sidecar::BlobIdentifier; use types::{BlobSidecar, EthSpec, SignedBeaconBlock}; +use super::PeerShouldHave; + pub struct SingleBlockLookup { pub requested_block_root: Hash256, pub requested_ids: Vec, @@ -372,6 +374,24 @@ impl SingleBlockLookup bool { + if *block_root != self.requested_block_root { + return false; + } + match peer_usefulness { + PeerShouldHave::BlockAndBlobs => { + self.block_request_state.add_peer(peer_id); + self.blob_request_state.add_peer(peer_id); + } + PeerShouldHave::Neither => {} + } + true + } } impl SingleLookupRequestState { diff --git a/beacon_node/network/src/sync/manager.rs b/beacon_node/network/src/sync/manager.rs index 2da1c934cb..42de376424 100644 --- a/beacon_node/network/src/sync/manager.rs +++ b/beacon_node/network/src/sync/manager.rs @@ -34,7 +34,7 @@ //! search for the block and subsequently search for parents if needed. use super::backfill_sync::{BackFillSync, ProcessResult, SyncStart}; -use super::block_lookups::BlockLookups; +use super::block_lookups::{BlockLookups, PeerShouldHave}; use super::network_context::{BlockOrBlob, SyncNetworkContext}; use super::peer_sync_info::{remote_sync_type, PeerSyncType}; use super::range_sync::{RangeSync, RangeSyncType, EPOCHS_PER_BATCH}; @@ -133,7 +133,7 @@ pub enum SyncMessage { /// A peer has sent a blob that references a block that is unknown. This triggers the /// manager to attempt to find the block matching the unknown hash when the specified delay expires. - UnknownBlockHashFromGossipBlob(Slot, PeerId, Hash256), + MissingGossipBlockComponents(Slot, PeerId, Hash256), /// A peer has disconnected. Disconnect(PeerId), @@ -651,14 +651,14 @@ impl SyncManager { peer_id, &mut self.network, ); + self.block_lookups.search_parent( + block_slot, + block_root, + parent_root, + peer_id, + &mut self.network, + ); } - self.block_lookups.search_parent( - block_slot, - block_root, - parent_root, - peer_id, - &mut self.network, - ); } } SyncMessage::BlobParentUnknown(peer_id, blob) => { @@ -694,11 +694,15 @@ impl SyncManager { SyncMessage::UnknownBlockHashFromAttestation(peer_id, block_hash) => { // If we are not synced, ignore this block. if self.synced_and_connected(&peer_id) { - self.block_lookups - .search_block(block_hash, peer_id, &mut self.network); + self.block_lookups.search_block( + block_hash, + peer_id, + PeerShouldHave::BlockAndBlobs, + &mut self.network, + ); } } - SyncMessage::UnknownBlockHashFromGossipBlob(slot, peer_id, block_hash) => { + SyncMessage::MissingGossipBlockComponents(slot, peer_id, block_hash) => { // If we are not synced, ignore this block. if self.synced_and_connected(&peer_id) { if self.should_delay_lookup(slot) { @@ -709,8 +713,12 @@ impl SyncManager { "block_root" => ?block_hash, "error" => ?e); } } else { - self.block_lookups - .search_block(block_hash, peer_id, &mut self.network) + self.block_lookups.search_block( + block_hash, + peer_id, + PeerShouldHave::Neither, + &mut self.network, + ) } } }