Fix custodial peer assumption on lookup custody requests (#6815)

* Fix custodial peer assumption on lookup custody requests

* lint
This commit is contained in:
Lion - dapplion
2025-01-20 19:31:18 +07:00
committed by GitHub
parent 6ce33c4d1d
commit 7a0388ef2a
5 changed files with 80 additions and 60 deletions

View File

@@ -9,6 +9,8 @@ use crate::sync::network_context::{LookupRequestResult, SyncNetworkContext};
use beacon_chain::block_verification_types::RpcBlock; use beacon_chain::block_verification_types::RpcBlock;
use beacon_chain::BeaconChainTypes; use beacon_chain::BeaconChainTypes;
use lighthouse_network::service::api_types::Id; use lighthouse_network::service::api_types::Id;
use parking_lot::RwLock;
use std::collections::HashSet;
use std::sync::Arc; use std::sync::Arc;
use types::blob_sidecar::FixedBlobSidecarList; use types::blob_sidecar::FixedBlobSidecarList;
use types::{DataColumnSidecarList, SignedBeaconBlock}; use types::{DataColumnSidecarList, SignedBeaconBlock};
@@ -41,7 +43,7 @@ pub trait RequestState<T: BeaconChainTypes> {
fn make_request( fn make_request(
&self, &self,
id: Id, id: Id,
peer_id: PeerId, lookup_peers: Arc<RwLock<HashSet<PeerId>>>,
expected_blobs: usize, expected_blobs: usize,
cx: &mut SyncNetworkContext<T>, cx: &mut SyncNetworkContext<T>,
) -> Result<LookupRequestResult, LookupRequestError>; ) -> Result<LookupRequestResult, LookupRequestError>;
@@ -76,11 +78,11 @@ impl<T: BeaconChainTypes> RequestState<T> for BlockRequestState<T::EthSpec> {
fn make_request( fn make_request(
&self, &self,
id: SingleLookupId, id: SingleLookupId,
peer_id: PeerId, lookup_peers: Arc<RwLock<HashSet<PeerId>>>,
_: usize, _: usize,
cx: &mut SyncNetworkContext<T>, cx: &mut SyncNetworkContext<T>,
) -> Result<LookupRequestResult, LookupRequestError> { ) -> Result<LookupRequestResult, LookupRequestError> {
cx.block_lookup_request(id, peer_id, self.requested_block_root) cx.block_lookup_request(id, lookup_peers, self.requested_block_root)
.map_err(LookupRequestError::SendFailedNetwork) .map_err(LookupRequestError::SendFailedNetwork)
} }
@@ -124,11 +126,11 @@ impl<T: BeaconChainTypes> RequestState<T> for BlobRequestState<T::EthSpec> {
fn make_request( fn make_request(
&self, &self,
id: Id, id: Id,
peer_id: PeerId, lookup_peers: Arc<RwLock<HashSet<PeerId>>>,
expected_blobs: usize, expected_blobs: usize,
cx: &mut SyncNetworkContext<T>, cx: &mut SyncNetworkContext<T>,
) -> Result<LookupRequestResult, LookupRequestError> { ) -> Result<LookupRequestResult, LookupRequestError> {
cx.blob_lookup_request(id, peer_id, self.block_root, expected_blobs) cx.blob_lookup_request(id, lookup_peers, self.block_root, expected_blobs)
.map_err(LookupRequestError::SendFailedNetwork) .map_err(LookupRequestError::SendFailedNetwork)
} }
@@ -172,12 +174,11 @@ impl<T: BeaconChainTypes> RequestState<T> for CustodyRequestState<T::EthSpec> {
fn make_request( fn make_request(
&self, &self,
id: Id, id: Id,
// TODO(das): consider selecting peers that have custody but are in this set lookup_peers: Arc<RwLock<HashSet<PeerId>>>,
_peer_id: PeerId,
_: usize, _: usize,
cx: &mut SyncNetworkContext<T>, cx: &mut SyncNetworkContext<T>,
) -> Result<LookupRequestResult, LookupRequestError> { ) -> Result<LookupRequestResult, LookupRequestError> {
cx.custody_lookup_request(id, self.block_root) cx.custody_lookup_request(id, self.block_root, lookup_peers)
.map_err(LookupRequestError::SendFailedNetwork) .map_err(LookupRequestError::SendFailedNetwork)
} }

View File

@@ -153,14 +153,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
pub(crate) fn active_single_lookups(&self) -> Vec<BlockLookupSummary> { pub(crate) fn active_single_lookups(&self) -> Vec<BlockLookupSummary> {
self.single_block_lookups self.single_block_lookups
.iter() .iter()
.map(|(id, l)| { .map(|(id, l)| (*id, l.block_root(), l.awaiting_parent(), l.all_peers()))
(
*id,
l.block_root(),
l.awaiting_parent(),
l.all_peers().copied().collect(),
)
})
.collect() .collect()
} }
@@ -283,7 +276,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
.find(|(_, l)| l.block_root() == parent_chain_tip) .find(|(_, l)| l.block_root() == parent_chain_tip)
{ {
cx.send_sync_message(SyncMessage::AddPeersForceRangeSync { cx.send_sync_message(SyncMessage::AddPeersForceRangeSync {
peers: lookup.all_peers().copied().collect(), peers: lookup.all_peers(),
head_slot: tip_lookup.peek_downloaded_block_slot(), head_slot: tip_lookup.peek_downloaded_block_slot(),
head_root: parent_chain_tip, head_root: parent_chain_tip,
}); });
@@ -682,7 +675,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
lookup.continue_requests(cx) lookup.continue_requests(cx)
} }
Action::ParentUnknown { parent_root } => { Action::ParentUnknown { parent_root } => {
let peers = lookup.all_peers().copied().collect::<Vec<_>>(); let peers = lookup.all_peers();
lookup.set_awaiting_parent(parent_root); lookup.set_awaiting_parent(parent_root);
debug!(self.log, "Marking lookup as awaiting parent"; "id" => lookup.id, "block_root" => ?block_root, "parent_root" => ?parent_root); debug!(self.log, "Marking lookup as awaiting parent"; "id" => lookup.id, "block_root" => ?block_root, "parent_root" => ?parent_root);
self.search_parent_of_child(parent_root, block_root, &peers, cx); self.search_parent_of_child(parent_root, block_root, &peers, cx);

View File

@@ -7,7 +7,7 @@ use crate::sync::network_context::{
use beacon_chain::{BeaconChainTypes, BlockProcessStatus}; use beacon_chain::{BeaconChainTypes, BlockProcessStatus};
use derivative::Derivative; use derivative::Derivative;
use lighthouse_network::service::api_types::Id; use lighthouse_network::service::api_types::Id;
use rand::seq::IteratorRandom; use parking_lot::RwLock;
use std::collections::HashSet; use std::collections::HashSet;
use std::fmt::Debug; use std::fmt::Debug;
use std::sync::Arc; use std::sync::Arc;
@@ -33,8 +33,6 @@ pub enum LookupRequestError {
/// The failed attempts were primarily due to processing failures. /// The failed attempts were primarily due to processing failures.
cannot_process: bool, cannot_process: bool,
}, },
/// No peers left to serve this lookup
NoPeers,
/// Error sending event to network /// Error sending event to network
SendFailedNetwork(RpcRequestSendError), SendFailedNetwork(RpcRequestSendError),
/// Error sending event to processor /// Error sending event to processor
@@ -63,9 +61,12 @@ pub struct SingleBlockLookup<T: BeaconChainTypes> {
pub id: Id, pub id: Id,
pub block_request_state: BlockRequestState<T::EthSpec>, pub block_request_state: BlockRequestState<T::EthSpec>,
pub component_requests: ComponentRequests<T::EthSpec>, pub component_requests: ComponentRequests<T::EthSpec>,
/// Peers that claim to have imported this set of block components /// Peers that claim to have imported this set of block components. This state is shared with
/// the custody request to have an updated view of the peers that claim to have imported the
/// block associated with this lookup. The peer set of a lookup can change rapidly, and faster
/// than the lifetime of a custody request.
#[derivative(Debug(format_with = "fmt_peer_set_as_len"))] #[derivative(Debug(format_with = "fmt_peer_set_as_len"))]
peers: HashSet<PeerId>, peers: Arc<RwLock<HashSet<PeerId>>>,
block_root: Hash256, block_root: Hash256,
awaiting_parent: Option<Hash256>, awaiting_parent: Option<Hash256>,
created: Instant, created: Instant,
@@ -92,7 +93,7 @@ impl<T: BeaconChainTypes> SingleBlockLookup<T> {
id, id,
block_request_state: BlockRequestState::new(requested_block_root), block_request_state: BlockRequestState::new(requested_block_root),
component_requests: ComponentRequests::WaitingForBlock, component_requests: ComponentRequests::WaitingForBlock,
peers: HashSet::from_iter(peers.iter().copied()), peers: Arc::new(RwLock::new(HashSet::from_iter(peers.iter().copied()))),
block_root: requested_block_root, block_root: requested_block_root,
awaiting_parent, awaiting_parent,
created: Instant::now(), created: Instant::now(),
@@ -282,24 +283,11 @@ impl<T: BeaconChainTypes> SingleBlockLookup<T> {
return Err(LookupRequestError::TooManyAttempts { cannot_process }); return Err(LookupRequestError::TooManyAttempts { cannot_process });
} }
let Some(peer_id) = self.use_rand_available_peer() else { let peers = self.peers.clone();
// Allow lookup to not have any peers and do nothing. This is an optimization to not
// lose progress of lookups created from a block with unknown parent before we receive
// attestations for said block.
// Lookup sync event safety: If a lookup requires peers to make progress, and does
// not receive any new peers for some time it will be dropped. If it receives a new
// peer it must attempt to make progress.
R::request_state_mut(self)
.map_err(|e| LookupRequestError::BadState(e.to_owned()))?
.get_state_mut()
.update_awaiting_download_status("no peers");
return Ok(());
};
let request = R::request_state_mut(self) let request = R::request_state_mut(self)
.map_err(|e| LookupRequestError::BadState(e.to_owned()))?; .map_err(|e| LookupRequestError::BadState(e.to_owned()))?;
match request.make_request(id, peer_id, expected_blobs, cx)? { match request.make_request(id, peers, expected_blobs, cx)? {
LookupRequestResult::RequestSent(req_id) => { LookupRequestResult::RequestSent(req_id) => {
// Lookup sync event safety: If make_request returns `RequestSent`, we are // Lookup sync event safety: If make_request returns `RequestSent`, we are
// guaranteed that `BlockLookups::on_download_response` will be called exactly // guaranteed that `BlockLookups::on_download_response` will be called exactly
@@ -347,29 +335,24 @@ impl<T: BeaconChainTypes> SingleBlockLookup<T> {
} }
/// Get all unique peers that claim to have imported this set of block components /// Get all unique peers that claim to have imported this set of block components
pub fn all_peers(&self) -> impl Iterator<Item = &PeerId> + '_ { pub fn all_peers(&self) -> Vec<PeerId> {
self.peers.iter() self.peers.read().iter().copied().collect()
} }
/// Add peer to all request states. The peer must be able to serve this request. /// Add peer to all request states. The peer must be able to serve this request.
/// Returns true if the peer was newly inserted into some request state. /// Returns true if the peer was newly inserted into some request state.
pub fn add_peer(&mut self, peer_id: PeerId) -> bool { pub fn add_peer(&mut self, peer_id: PeerId) -> bool {
self.peers.insert(peer_id) self.peers.write().insert(peer_id)
} }
/// Remove peer from available peers. /// Remove peer from available peers.
pub fn remove_peer(&mut self, peer_id: &PeerId) { pub fn remove_peer(&mut self, peer_id: &PeerId) {
self.peers.remove(peer_id); self.peers.write().remove(peer_id);
} }
/// Returns true if this lookup has zero peers /// Returns true if this lookup has zero peers
pub fn has_no_peers(&self) -> bool { pub fn has_no_peers(&self) -> bool {
self.peers.is_empty() self.peers.read().is_empty()
}
/// Selects a random peer from available peers if any
fn use_rand_available_peer(&mut self) -> Option<PeerId> {
self.peers.iter().choose(&mut rand::thread_rng()).copied()
} }
} }
@@ -688,8 +671,8 @@ impl<T: Clone> std::fmt::Debug for State<T> {
} }
fn fmt_peer_set_as_len( fn fmt_peer_set_as_len(
peer_set: &HashSet<PeerId>, peer_set: &Arc<RwLock<HashSet<PeerId>>>,
f: &mut std::fmt::Formatter, f: &mut std::fmt::Formatter,
) -> Result<(), std::fmt::Error> { ) -> Result<(), std::fmt::Error> {
write!(f, "{}", peer_set.len()) write!(f, "{}", peer_set.read().len())
} }

View File

@@ -27,7 +27,8 @@ use lighthouse_network::service::api_types::{
DataColumnsByRootRequester, Id, SingleLookupReqId, SyncRequestId, DataColumnsByRootRequester, Id, SingleLookupReqId, SyncRequestId,
}; };
use lighthouse_network::{Client, NetworkGlobals, PeerAction, PeerId, ReportSource}; use lighthouse_network::{Client, NetworkGlobals, PeerAction, PeerId, ReportSource};
use rand::seq::SliceRandom; use parking_lot::RwLock;
use rand::prelude::IteratorRandom;
use rand::thread_rng; use rand::thread_rng;
pub use requests::LookupVerifyError; pub use requests::LookupVerifyError;
use requests::{ use requests::{
@@ -308,8 +309,8 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
pub fn get_random_custodial_peer(&self, column_index: ColumnIndex) -> Option<PeerId> { pub fn get_random_custodial_peer(&self, column_index: ColumnIndex) -> Option<PeerId> {
self.get_custodial_peers(column_index) self.get_custodial_peers(column_index)
.into_iter()
.choose(&mut thread_rng()) .choose(&mut thread_rng())
.cloned()
} }
pub fn network_globals(&self) -> &NetworkGlobals<T::EthSpec> { pub fn network_globals(&self) -> &NetworkGlobals<T::EthSpec> {
@@ -562,9 +563,24 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
pub fn block_lookup_request( pub fn block_lookup_request(
&mut self, &mut self,
lookup_id: SingleLookupId, lookup_id: SingleLookupId,
peer_id: PeerId, lookup_peers: Arc<RwLock<HashSet<PeerId>>>,
block_root: Hash256, block_root: Hash256,
) -> Result<LookupRequestResult, RpcRequestSendError> { ) -> Result<LookupRequestResult, RpcRequestSendError> {
let Some(peer_id) = lookup_peers
.read()
.iter()
.choose(&mut rand::thread_rng())
.copied()
else {
// Allow lookup to not have any peers and do nothing. This is an optimization to not
// lose progress of lookups created from a block with unknown parent before we receive
// attestations for said block.
// Lookup sync event safety: If a lookup requires peers to make progress, and does
// not receive any new peers for some time it will be dropped. If it receives a new
// peer it must attempt to make progress.
return Ok(LookupRequestResult::Pending("no peers"));
};
match self.chain.get_block_process_status(&block_root) { match self.chain.get_block_process_status(&block_root) {
// Unknown block, continue request to download // Unknown block, continue request to download
BlockProcessStatus::Unknown => {} BlockProcessStatus::Unknown => {}
@@ -634,10 +650,25 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
pub fn blob_lookup_request( pub fn blob_lookup_request(
&mut self, &mut self,
lookup_id: SingleLookupId, lookup_id: SingleLookupId,
peer_id: PeerId, lookup_peers: Arc<RwLock<HashSet<PeerId>>>,
block_root: Hash256, block_root: Hash256,
expected_blobs: usize, expected_blobs: usize,
) -> Result<LookupRequestResult, RpcRequestSendError> { ) -> Result<LookupRequestResult, RpcRequestSendError> {
let Some(peer_id) = lookup_peers
.read()
.iter()
.choose(&mut rand::thread_rng())
.copied()
else {
// Allow lookup to not have any peers and do nothing. This is an optimization to not
// lose progress of lookups created from a block with unknown parent before we receive
// attestations for said block.
// Lookup sync event safety: If a lookup requires peers to make progress, and does
// not receive any new peers for some time it will be dropped. If it receives a new
// peer it must attempt to make progress.
return Ok(LookupRequestResult::Pending("no peers"));
};
let imported_blob_indexes = self let imported_blob_indexes = self
.chain .chain
.data_availability_checker .data_availability_checker
@@ -740,6 +771,7 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
&mut self, &mut self,
lookup_id: SingleLookupId, lookup_id: SingleLookupId,
block_root: Hash256, block_root: Hash256,
lookup_peers: Arc<RwLock<HashSet<PeerId>>>,
) -> Result<LookupRequestResult, RpcRequestSendError> { ) -> Result<LookupRequestResult, RpcRequestSendError> {
let custody_indexes_imported = self let custody_indexes_imported = self
.chain .chain
@@ -777,6 +809,7 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
block_root, block_root,
CustodyId { requester }, CustodyId { requester },
&custody_indexes_to_fetch, &custody_indexes_to_fetch,
lookup_peers,
self.log.clone(), self.log.clone(),
); );

View File

@@ -7,8 +7,10 @@ use fnv::FnvHashMap;
use lighthouse_network::service::api_types::{CustodyId, DataColumnsByRootRequester}; use lighthouse_network::service::api_types::{CustodyId, DataColumnsByRootRequester};
use lighthouse_network::PeerId; use lighthouse_network::PeerId;
use lru_cache::LRUTimeCache; use lru_cache::LRUTimeCache;
use parking_lot::RwLock;
use rand::Rng; use rand::Rng;
use slog::{debug, warn}; use slog::{debug, warn};
use std::collections::HashSet;
use std::time::{Duration, Instant}; use std::time::{Duration, Instant};
use std::{collections::HashMap, marker::PhantomData, sync::Arc}; use std::{collections::HashMap, marker::PhantomData, sync::Arc};
use types::EthSpec; use types::EthSpec;
@@ -32,6 +34,8 @@ pub struct ActiveCustodyRequest<T: BeaconChainTypes> {
/// Peers that have recently failed to successfully respond to a columns by root request. /// Peers that have recently failed to successfully respond to a columns by root request.
/// Having a LRUTimeCache allows this request to not have to track disconnecting peers. /// Having a LRUTimeCache allows this request to not have to track disconnecting peers.
failed_peers: LRUTimeCache<PeerId>, failed_peers: LRUTimeCache<PeerId>,
/// Set of peers that claim to have imported this block and their custody columns
lookup_peers: Arc<RwLock<HashSet<PeerId>>>,
/// Logger for the `SyncNetworkContext`. /// Logger for the `SyncNetworkContext`.
pub log: slog::Logger, pub log: slog::Logger,
_phantom: PhantomData<T>, _phantom: PhantomData<T>,
@@ -64,6 +68,7 @@ impl<T: BeaconChainTypes> ActiveCustodyRequest<T> {
block_root: Hash256, block_root: Hash256,
custody_id: CustodyId, custody_id: CustodyId,
column_indices: &[ColumnIndex], column_indices: &[ColumnIndex],
lookup_peers: Arc<RwLock<HashSet<PeerId>>>,
log: slog::Logger, log: slog::Logger,
) -> Self { ) -> Self {
Self { Self {
@@ -76,6 +81,7 @@ impl<T: BeaconChainTypes> ActiveCustodyRequest<T> {
), ),
active_batch_columns_requests: <_>::default(), active_batch_columns_requests: <_>::default(),
failed_peers: LRUTimeCache::new(Duration::from_secs(FAILED_PEERS_CACHE_EXPIRY_SECONDS)), failed_peers: LRUTimeCache::new(Duration::from_secs(FAILED_PEERS_CACHE_EXPIRY_SECONDS)),
lookup_peers,
log, log,
_phantom: PhantomData, _phantom: PhantomData,
} }
@@ -215,6 +221,7 @@ impl<T: BeaconChainTypes> ActiveCustodyRequest<T> {
} }
let mut columns_to_request_by_peer = HashMap::<PeerId, Vec<ColumnIndex>>::new(); let mut columns_to_request_by_peer = HashMap::<PeerId, Vec<ColumnIndex>>::new();
let lookup_peers = self.lookup_peers.read();
// Need to: // Need to:
// - track how many active requests a peer has for load balancing // - track how many active requests a peer has for load balancing
@@ -244,6 +251,8 @@ impl<T: BeaconChainTypes> ActiveCustodyRequest<T> {
.iter() .iter()
.map(|peer| { .map(|peer| {
( (
// Prioritize peers that claim to have imported this block
if lookup_peers.contains(peer) { 0 } else { 1 },
// De-prioritize peers that have failed to successfully respond to // De-prioritize peers that have failed to successfully respond to
// requests recently // requests recently
self.failed_peers.contains(peer), self.failed_peers.contains(peer),
@@ -257,7 +266,7 @@ impl<T: BeaconChainTypes> ActiveCustodyRequest<T> {
.collect::<Vec<_>>(); .collect::<Vec<_>>();
priorized_peers.sort_unstable(); priorized_peers.sort_unstable();
if let Some((_, _, _, peer_id)) = priorized_peers.first() { if let Some((_, _, _, _, peer_id)) = priorized_peers.first() {
columns_to_request_by_peer columns_to_request_by_peer
.entry(*peer_id) .entry(*peer_id)
.or_default() .or_default()
@@ -283,10 +292,11 @@ impl<T: BeaconChainTypes> ActiveCustodyRequest<T> {
block_root: self.block_root, block_root: self.block_root,
indices: indices.clone(), indices: indices.clone(),
}, },
// true = enforce max_requests are returned data_columns_by_root. We only issue requests // If peer is in the lookup peer set, it claims to have imported the block and
// for blocks after we know the block has data, and only request peers after they claim to // must have its columns in custody. In that case, set `true = enforce max_requests`
// have imported the block+columns and claim to be custodians // and downscore if data_columns_by_root does not return the expected custody
true, // columns. For the rest of peers, don't downscore if columns are missing.
lookup_peers.contains(&peer_id),
) )
.map_err(Error::SendFailed)?; .map_err(Error::SendFailed)?;