track information about peer source

This commit is contained in:
realbigsean
2023-05-02 12:28:32 -04:00
parent 8edefb7e0d
commit 56b2365e17
5 changed files with 1081 additions and 751 deletions

View File

@@ -11,6 +11,7 @@ use std::fmt::Debug;
use std::sync::Arc;
use std::time::Duration;
use store::Hash256;
use strum::Display;
use types::blob_sidecar::FixedBlobSidecarList;
use types::{BlobSidecar, SignedBeaconBlock, Slot};
@@ -87,10 +88,24 @@ pub enum ResponseType {
Blob,
}
#[derive(Debug, Copy, Clone)]
pub enum PeerShouldHave {
BlockAndBlobs,
Neither,
#[derive(Debug, PartialEq, Eq, Hash, Clone, Copy, Display)]
pub enum PeerSource {
Attestation(PeerId),
Gossip(PeerId),
}
impl PeerSource {
fn as_peer_id(&self) -> &PeerId {
match self {
PeerSource::Attestation(id) => id,
PeerSource::Gossip(id) => id,
}
}
fn to_peer_id(self) -> PeerId {
match self {
PeerSource::Attestation(id) => id,
PeerSource::Gossip(id) => id,
}
}
}
#[derive(Debug, Copy, Clone)]
@@ -121,11 +136,10 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
pub fn search_block(
&mut self,
hash: Hash256,
peer_id: PeerId,
peer_usefulness: PeerShouldHave,
peer_source: PeerSource,
cx: &mut SyncNetworkContext<T>,
) {
self.search_block_with(|_| {}, hash, peer_id, peer_usefulness, cx)
self.search_block_with(|_| {}, hash, peer_source, cx)
}
/// Searches for a single block hash. If the blocks parent is unknown, a chain of blocks is
@@ -134,8 +148,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
&mut self,
cache_fn: impl Fn(&mut SingleBlockLookup<SINGLE_BLOCK_LOOKUP_MAX_ATTEMPTS, T>),
hash: Hash256,
peer_id: PeerId,
peer_usefulness: PeerShouldHave,
peer_source: PeerSource,
cx: &mut SyncNetworkContext<T>,
) {
// Do not re-request a block that is already being requested
@@ -143,15 +156,14 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
.single_block_lookups
.iter_mut()
.any(|(_, _, single_block_request)| {
single_block_request.add_peer_if_useful(&hash, &peer_id, peer_usefulness)
single_block_request.add_peer_if_useful(&hash, peer_source)
})
{
return;
}
if self.parent_lookups.iter_mut().any(|parent_req| {
parent_req.add_peer_if_useful(&hash, &peer_id, peer_usefulness)
|| parent_req.contains_block(&hash)
parent_req.add_peer_if_useful(&hash, peer_source) || parent_req.contains_block(&hash)
}) {
// If the block was already downloaded, or is being downloaded in this moment, do not
// request it.
@@ -170,12 +182,12 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
debug!(
self.log,
"Searching for block";
"peer_id" => %peer_id,
"peer_id" => %peer_source,
"block" => %hash
);
let mut single_block_request =
SingleBlockLookup::new(hash, peer_id, self.da_checker.clone());
SingleBlockLookup::new(hash, peer_source, self.da_checker.clone());
cache_fn(&mut single_block_request);
let block_request_id =
@@ -213,8 +225,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
let _ = request.add_block_wrapper(block_root, block.clone());
},
block_root,
peer_id,
PeerShouldHave::Neither,
PeerSource::Gossip(peer_id),
cx,
);
}
@@ -231,8 +242,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
let _ = request.add_blob(blob.clone());
},
block_root,
peer_id,
PeerShouldHave::Neither,
PeerSource::Gossip(peer_id),
cx,
);
}
@@ -248,7 +258,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
cx: &mut SyncNetworkContext<T>,
) {
// Gossip blocks or blobs shouldn't be propogated if parents are unavailable.
let peer_usefulness = PeerShouldHave::BlockAndBlobs;
let peer_source = PeerSource::Attestation(peer_id);
// If this block or it's parent is part of a known failed chain, ignore it.
if self.failed_chains.contains(&parent_root) || self.failed_chains.contains(&block_root) {
@@ -261,7 +271,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
// being searched for.
if self.parent_lookups.iter_mut().any(|parent_req| {
parent_req.contains_block(&block_root)
|| parent_req.add_peer_if_useful(&block_root, &peer_id, peer_usefulness)
|| parent_req.add_peer_if_useful(&block_root, peer_source)
}) {
// we are already searching for this block, ignore it
return;
@@ -276,8 +286,12 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
return;
}
let parent_lookup =
ParentLookup::new(block_root, parent_root, peer_id, self.da_checker.clone());
let parent_lookup = ParentLookup::new(
block_root,
parent_root,
peer_source,
self.da_checker.clone(),
);
self.request_parent_block_and_blobs(parent_lookup, cx);
}
@@ -518,8 +532,14 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
}
}
Ok(LookupDownloadStatus::SearchBlock(block_root)) => {
self.search_block(block_root, peer_id, PeerShouldHave::BlockAndBlobs, cx);
self.parent_lookups.push(parent_lookup)
if let Some(peer_source) =
parent_lookup.peer_source(ResponseType::Block, peer_id)
{
self.search_block(block_root, peer_source, cx);
self.parent_lookups.push(parent_lookup)
} else {
warn!(self.log, "Response from untracked peer"; "peer_id" => %peer_id, "block_root" => ?block_root);
}
}
Err(e) => {}
}
@@ -614,8 +634,14 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
}
}
LookupDownloadStatus::SearchBlock(block_root) => {
self.search_block(block_root, peer_id, PeerShouldHave::BlockAndBlobs, cx);
self.parent_lookups.push(parent_lookup)
if let Some(peer_source) =
parent_lookup.peer_source(ResponseType::Block, peer_id)
{
self.search_block(block_root, peer_source, cx);
self.parent_lookups.push(parent_lookup)
} else {
warn!(self.log, "Response from untracked peer"; "peer_id" => %peer_id, "block_root" => ?block_root);
}
}
}
}
@@ -834,8 +860,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
ShouldRemoveLookup::True
}
AvailabilityProcessingStatus::MissingComponents(_, block_root) => {
// At this point we don't know what the peer *should* have.
self.search_block(block_root, peer_id, PeerShouldHave::Neither, cx);
self.search_block(block_root, peer_id, cx);
ShouldRemoveLookup::False
}
},
@@ -862,7 +887,13 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
ShouldRemoveLookup::True
}
BlockError::ParentUnknown(block) => {
self.search_parent(block.slot(), root, block.parent_root(), peer_id, cx);
self.search_parent(
block.slot(),
root,
block.parent_root(),
peer_id.to_peer_id(),
cx,
);
ShouldRemoveLookup::False
}
ref e @ BlockError::ExecutionPayloadError(ref epe) if !epe.penalize_peer() => {
@@ -879,7 +910,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
other => {
warn!(self.log, "Peer sent invalid block in single block lookup"; "root" => %root, "error" => ?other, "peer_id" => %peer_id);
cx.report_peer(
peer_id,
peer_id.to_peer_id(),
PeerAction::MidToleranceError,
"single_block_failure",
);
@@ -888,7 +919,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
request_id_ref,
request_ref,
response_type,
&peer_id,
peer_id.as_peer_id(),
cx,
&self.log,
)
@@ -961,7 +992,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
_,
block_root,
)) => {
self.search_block(block_root, peer_id, PeerShouldHave::BlockAndBlobs, cx);
self.search_block(block_root, peer_id, cx);
}
BlockProcessingResult::Err(BlockError::ParentUnknown(block)) => {
parent_lookup.add_block_wrapper(block);
@@ -1024,7 +1055,11 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
// This currently can be a host of errors. We permit this due to the partial
// ambiguity.
cx.report_peer(peer_id, PeerAction::MidToleranceError, "parent_request_err");
cx.report_peer(
peer_id.to_peer_id(),
PeerAction::MidToleranceError,
"parent_request_err",
);
// Try again if possible
match response_type {
@@ -1115,8 +1150,8 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
self.failed_chains.insert(chain_hash);
let mut all_peers = request.block_request_state.used_peers.clone();
all_peers.extend(request.blob_request_state.used_peers);
for peer_id in all_peers {
cx.report_peer(peer_id, penalty, "parent_chain_failure")
for peer_source in all_peers {
cx.report_peer(peer_source, penalty, "parent_chain_failure")
}
}
BatchProcessResult::NonFaultyFailure => {

View File

@@ -1,5 +1,5 @@
use super::single_block_lookup::{LookupRequestError, LookupVerifyError, SingleBlockLookup};
use super::{DownloadedBlocks, PeerShouldHave, ResponseType};
use super::{DownloadedBlocks, PeerSource, ResponseType};
use crate::sync::block_lookups::{single_block_lookup, RootBlobsTuple, RootBlockTuple};
use crate::sync::{
manager::{Id, SLOT_IMPORT_TOLERANCE},
@@ -76,7 +76,7 @@ impl<T: BeaconChainTypes> ParentLookup<T> {
pub fn new(
block_root: Hash256,
parent_root: Hash256,
peer_id: PeerId,
peer_id: PeerSource,
da_checker: Arc<DataAvailabilityChecker<T::EthSpec, T::SlotClock>>,
) -> Self {
let current_parent_request = SingleBlockLookup::new(parent_root, peer_id, da_checker);
@@ -321,14 +321,9 @@ impl<T: BeaconChainTypes> ParentLookup<T> {
.failed_attempts()
}
pub fn add_peer_if_useful(
&mut self,
block_root: &Hash256,
peer_id: &PeerId,
peer_usefulness: PeerShouldHave,
) -> bool {
pub fn add_peer_if_useful(&mut self, block_root: &Hash256, peer_source: PeerSource) -> bool {
self.current_parent_request
.add_peer_if_useful(block_root, peer_id, peer_usefulness)
.add_peer_if_useful(block_root, peer_source)
}
pub fn used_peers(&self, response_type: ResponseType) -> impl Iterator<Item = &PeerId> + '_ {
@@ -345,6 +340,15 @@ impl<T: BeaconChainTypes> ParentLookup<T> {
.iter(),
}
}
pub(crate) fn peer_source(
&self,
response_type: ResponseType,
peer_id: PeerId,
) -> Option<PeerSource> {
self.current_parent_request
.peer_source(response_type, peer_id)
}
}
impl From<LookupVerifyError> for ParentVerifyError {

View File

@@ -14,7 +14,7 @@ use strum::IntoStaticStr;
use types::blob_sidecar::{BlobIdentifier, FixedBlobSidecarList};
use types::{BlobSidecar, SignedBeaconBlock};
use super::{PeerShouldHave, ResponseType};
use super::{PeerSource, ResponseType};
pub struct SingleBlockLookup<const MAX_ATTEMPTS: u8, T: BeaconChainTypes> {
pub requested_block_root: Hash256,
@@ -33,21 +33,23 @@ pub struct SingleBlockLookup<const MAX_ATTEMPTS: u8, T: BeaconChainTypes> {
pub struct SingleLookupRequestState<const MAX_ATTEMPTS: u8> {
/// State of this request.
pub state: State,
/// Peers that should have this block.
/// Peers that should have this block or blob.
pub available_peers: HashSet<PeerId>,
/// Peers that mar or may not have this block or blob.
pub potential_peers: HashSet<PeerId>,
/// Peers from which we have requested this block.
pub used_peers: HashSet<PeerId>,
/// How many times have we attempted to process this block.
/// How many times have we attempted to process this block or blob.
failed_processing: u8,
/// How many times have we attempted to download this block.
/// How many times have we attempted to download this block or blob.
failed_downloading: u8,
}
#[derive(Debug, PartialEq, Eq)]
pub enum State {
AwaitingDownload,
Downloading { peer_id: PeerId },
Processing { peer_id: PeerId },
Downloading { peer_id: PeerSource },
Processing { peer_id: PeerSource },
}
#[derive(Debug, PartialEq, Eq, IntoStaticStr)]
@@ -74,7 +76,7 @@ pub enum LookupRequestError {
impl<const MAX_ATTEMPTS: u8, T: BeaconChainTypes> SingleBlockLookup<MAX_ATTEMPTS, T> {
pub fn new(
requested_block_root: Hash256,
peer_id: PeerId,
peer_source: PeerSource,
da_checker: Arc<DataAvailabilityChecker<T::EthSpec, T::SlotClock>>,
) -> Self {
Self {
@@ -82,8 +84,8 @@ impl<const MAX_ATTEMPTS: u8, T: BeaconChainTypes> SingleBlockLookup<MAX_ATTEMPTS
requested_ids: <_>::default(),
downloaded_block: None,
downloaded_blobs: <_>::default(),
block_request_state: SingleLookupRequestState::new(peer_id),
blob_request_state: SingleLookupRequestState::new(peer_id),
block_request_state: SingleLookupRequestState::new(peer_source),
blob_request_state: SingleLookupRequestState::new(peer_source),
da_checker,
}
}
@@ -277,7 +279,6 @@ impl<const MAX_ATTEMPTS: u8, T: BeaconChainTypes> SingleBlockLookup<MAX_ATTEMPTS
},
State::Processing { peer_id: _ } => match blob {
Some(_) => {
dbg!("here");
// We sent the blob for processing and received an extra blob.
self.blob_request_state.register_failure_downloading();
Err(LookupVerifyError::ExtraBlobsReturned)
@@ -317,8 +318,26 @@ impl<const MAX_ATTEMPTS: u8, T: BeaconChainTypes> SingleBlockLookup<MAX_ATTEMPTS
let request = BlocksByRootRequest {
block_roots: VariableList::from(vec![self.requested_block_root]),
};
self.block_request_state.state = State::Downloading { peer_id };
self.block_request_state.used_peers.insert(peer_id);
let peer_source = PeerSource::Attestation(peer_id);
self.block_request_state.state = State::Downloading {
peer_id: peer_source,
};
Ok(Some((peer_id, request)))
} else if let Some(&peer_id) = self
.block_request_state
.potential_peers
.iter()
.choose(&mut rand::thread_rng())
{
let request = BlocksByRootRequest {
block_roots: VariableList::from(vec![self.requested_block_root]),
};
self.block_request_state.used_peers.insert(peer_id);
let peer_source = PeerSource::Gossip(peer_id);
self.block_request_state.state = State::Downloading {
peer_id: peer_source,
};
Ok(Some((peer_id, request)))
} else {
Err(LookupRequestError::NoPeers)
@@ -353,46 +372,96 @@ impl<const MAX_ATTEMPTS: u8, T: BeaconChainTypes> SingleBlockLookup<MAX_ATTEMPTS
let request = BlobsByRootRequest {
blob_ids: VariableList::from(missing_ids.clone()),
};
self.requested_ids = missing_ids;
self.blob_request_state.state = State::Downloading { peer_id };
self.blob_request_state.used_peers.insert(peer_id);
let peer_source = PeerSource::Attestation(peer_id);
self.requested_ids = missing_ids;
self.blob_request_state.state = State::Downloading {
peer_id: peer_source,
};
Ok(Some((peer_id, request)))
} else if let Some(&peer_id) = self
.blob_request_state
.potential_peers
.iter()
.choose(&mut rand::thread_rng())
{
let request = BlobsByRootRequest {
blob_ids: VariableList::from(missing_ids.clone()),
};
self.blob_request_state.used_peers.insert(peer_id);
let peer_source = PeerSource::Gossip(peer_id);
self.requested_ids = missing_ids;
self.blob_request_state.state = State::Downloading {
peer_id: peer_source,
};
Ok(Some((peer_id, request)))
} else {
Err(LookupRequestError::NoPeers)
}
}
pub fn add_peer_if_useful(
&mut self,
block_root: &Hash256,
peer_id: &PeerId,
peer_usefulness: PeerShouldHave,
) -> bool {
pub fn add_peer_if_useful(&mut self, block_root: &Hash256, peer_source: PeerSource) -> bool {
if *block_root != self.requested_block_root {
return false;
}
match peer_usefulness {
PeerShouldHave::BlockAndBlobs => {
self.block_request_state.add_peer(peer_id);
self.blob_request_state.add_peer(peer_id);
match peer_source {
PeerSource::Attestation(peer_id) => {
self.block_request_state.add_peer(&peer_id);
self.blob_request_state.add_peer(&peer_id);
}
PeerSource::Gossip(peer_id) => {
self.block_request_state.add_potential_peer(&peer_id);
self.blob_request_state.add_potential_peer(&peer_id);
}
PeerShouldHave::Neither => {}
}
true
}
pub fn processing_peer(&self, response_type: ResponseType) -> Result<PeerId, ()> {
pub fn processing_peer(&self, response_type: ResponseType) -> Result<PeerSource, ()> {
match response_type {
ResponseType::Block => self.block_request_state.processing_peer(),
ResponseType::Blob => self.blob_request_state.processing_peer(),
}
}
pub(crate) fn peer_source(
&self,
response_type: ResponseType,
peer_id: PeerId,
) -> Option<PeerSource> {
match response_type {
ResponseType::Block => {
if self.block_request_state.available_peers.contains(&peer_id) {
Some(PeerSource::Attestation(peer_id))
} else if self.block_request_state.potential_peers.contains(&peer_id) {
Some(PeerSource::Gossip(peer_id))
} else {
None
}
}
ResponseType::Blob => {
if self.blob_request_state.available_peers.contains(&peer_id) {
Some(PeerSource::Attestation(peer_id))
} else if self.blob_request_state.potential_peers.contains(&peer_id) {
Some(PeerSource::Gossip(peer_id))
} else {
None
}
}
}
}
}
impl<const MAX_ATTEMPTS: u8> SingleLookupRequestState<MAX_ATTEMPTS> {
pub fn new(peer_id: PeerId) -> Self {
pub fn new(peer_source: PeerSource) -> Self {
let (available_peers, potential_peers) = match peer_source {
PeerSource::Attestation(peer_id) => (HashSet::from([peer_id]), HashSet::default()),
PeerSource::Gossip(peer_id) => (HashSet::default(), HashSet::from([peer_id])),
};
Self {
state: State::AwaitingDownload,
available_peers: HashSet::from([peer_id]),
available_peers,
potential_peers,
used_peers: HashSet::default(),
failed_processing: 0,
failed_downloading: 0,
@@ -417,15 +486,23 @@ impl<const MAX_ATTEMPTS: u8> SingleLookupRequestState<MAX_ATTEMPTS> {
self.failed_processing + self.failed_downloading
}
pub fn add_peer(&mut self, peer_id: &PeerId) -> bool {
self.available_peers.insert(*peer_id)
pub fn add_peer(&mut self, peer_id: &PeerId) {
self.potential_peers.remove(peer_id);
self.available_peers.insert(*peer_id);
}
pub fn add_potential_peer(&mut self, peer_id: &PeerId) {
if self.available_peers.contains(peer_id) {
self.potential_peers.insert(*peer_id);
}
}
/// If a peer disconnects, this request could be failed. If so, an error is returned
pub fn check_peer_disconnected(&mut self, dc_peer_id: &PeerId) -> Result<(), ()> {
self.available_peers.remove(dc_peer_id);
self.potential_peers.remove(dc_peer_id);
if let State::Downloading { peer_id } = &self.state {
if peer_id == dc_peer_id {
if peer_id.as_peer_id() == dc_peer_id {
// Peer disconnected before providing a block
self.register_failure_downloading();
return Err(());
@@ -434,7 +511,7 @@ impl<const MAX_ATTEMPTS: u8> SingleLookupRequestState<MAX_ATTEMPTS> {
Ok(())
}
pub fn processing_peer(&self) -> Result<PeerId, ()> {
pub fn processing_peer(&self) -> Result<PeerSource, ()> {
if let State::Processing { peer_id } = &self.state {
Ok(*peer_id)
} else {

File diff suppressed because it is too large Load Diff

View File

@@ -34,7 +34,7 @@
//! search for the block and subsequently search for parents if needed.
use super::backfill_sync::{BackFillSync, ProcessResult, SyncStart};
use super::block_lookups::{BlockLookups, PeerShouldHave};
use super::block_lookups::{BlockLookups, PeerSource};
use super::network_context::{BlockOrBlob, SyncNetworkContext};
use super::peer_sync_info::{remote_sync_type, PeerSyncType};
use super::range_sync::{RangeSync, RangeSyncType, EPOCHS_PER_BATCH};
@@ -700,8 +700,7 @@ impl<T: BeaconChainTypes> SyncManager<T> {
if self.synced_and_connected(&peer_id) {
self.block_lookups.search_block(
block_hash,
peer_id,
PeerShouldHave::BlockAndBlobs,
PeerSource::Attestation(peer_id),
&mut self.network,
);
}
@@ -719,8 +718,7 @@ impl<T: BeaconChainTypes> SyncManager<T> {
} else {
self.block_lookups.search_block(
block_hash,
peer_id,
PeerShouldHave::Neither,
PeerSource::Gossip(peer_id),
&mut self.network,
)
}