From 7a0388ef2aa8ea6de5cfa0be26dd000f011a6484 Mon Sep 17 00:00:00 2001 From: Lion - dapplion <35266934+dapplion@users.noreply.github.com> Date: Mon, 20 Jan 2025 19:31:18 +0700 Subject: [PATCH] Fix custodial peer assumption on lookup custody requests (#6815) * Fix custodial peer assumption on lookup custody requests * lint --- .../network/src/sync/block_lookups/common.rs | 17 ++++--- .../network/src/sync/block_lookups/mod.rs | 13 ++--- .../sync/block_lookups/single_block_lookup.rs | 49 ++++++------------- .../network/src/sync/network_context.rs | 41 ++++++++++++++-- .../src/sync/network_context/custody.rs | 20 ++++++-- 5 files changed, 80 insertions(+), 60 deletions(-) diff --git a/beacon_node/network/src/sync/block_lookups/common.rs b/beacon_node/network/src/sync/block_lookups/common.rs index 5e336d9c38..8eefb2d675 100644 --- a/beacon_node/network/src/sync/block_lookups/common.rs +++ b/beacon_node/network/src/sync/block_lookups/common.rs @@ -9,6 +9,8 @@ use crate::sync::network_context::{LookupRequestResult, SyncNetworkContext}; use beacon_chain::block_verification_types::RpcBlock; use beacon_chain::BeaconChainTypes; use lighthouse_network::service::api_types::Id; +use parking_lot::RwLock; +use std::collections::HashSet; use std::sync::Arc; use types::blob_sidecar::FixedBlobSidecarList; use types::{DataColumnSidecarList, SignedBeaconBlock}; @@ -41,7 +43,7 @@ pub trait RequestState { fn make_request( &self, id: Id, - peer_id: PeerId, + lookup_peers: Arc>>, expected_blobs: usize, cx: &mut SyncNetworkContext, ) -> Result; @@ -76,11 +78,11 @@ impl RequestState for BlockRequestState { fn make_request( &self, id: SingleLookupId, - peer_id: PeerId, + lookup_peers: Arc>>, _: usize, cx: &mut SyncNetworkContext, ) -> Result { - cx.block_lookup_request(id, peer_id, self.requested_block_root) + cx.block_lookup_request(id, lookup_peers, self.requested_block_root) .map_err(LookupRequestError::SendFailedNetwork) } @@ -124,11 +126,11 @@ impl RequestState for BlobRequestState { fn make_request( &self, id: Id, - peer_id: PeerId, + lookup_peers: Arc>>, expected_blobs: usize, cx: &mut SyncNetworkContext, ) -> Result { - cx.blob_lookup_request(id, peer_id, self.block_root, expected_blobs) + cx.blob_lookup_request(id, lookup_peers, self.block_root, expected_blobs) .map_err(LookupRequestError::SendFailedNetwork) } @@ -172,12 +174,11 @@ impl RequestState for CustodyRequestState { fn make_request( &self, id: Id, - // TODO(das): consider selecting peers that have custody but are in this set - _peer_id: PeerId, + lookup_peers: Arc>>, _: usize, cx: &mut SyncNetworkContext, ) -> Result { - cx.custody_lookup_request(id, self.block_root) + cx.custody_lookup_request(id, self.block_root, lookup_peers) .map_err(LookupRequestError::SendFailedNetwork) } diff --git a/beacon_node/network/src/sync/block_lookups/mod.rs b/beacon_node/network/src/sync/block_lookups/mod.rs index 5a11bca481..ac4df42a4e 100644 --- a/beacon_node/network/src/sync/block_lookups/mod.rs +++ b/beacon_node/network/src/sync/block_lookups/mod.rs @@ -153,14 +153,7 @@ impl BlockLookups { pub(crate) fn active_single_lookups(&self) -> Vec { self.single_block_lookups .iter() - .map(|(id, l)| { - ( - *id, - l.block_root(), - l.awaiting_parent(), - l.all_peers().copied().collect(), - ) - }) + .map(|(id, l)| (*id, l.block_root(), l.awaiting_parent(), l.all_peers())) .collect() } @@ -283,7 +276,7 @@ impl BlockLookups { .find(|(_, l)| l.block_root() == parent_chain_tip) { cx.send_sync_message(SyncMessage::AddPeersForceRangeSync { - peers: lookup.all_peers().copied().collect(), + peers: lookup.all_peers(), head_slot: tip_lookup.peek_downloaded_block_slot(), head_root: parent_chain_tip, }); @@ -682,7 +675,7 @@ impl BlockLookups { lookup.continue_requests(cx) } Action::ParentUnknown { parent_root } => { - let peers = lookup.all_peers().copied().collect::>(); + let peers = lookup.all_peers(); lookup.set_awaiting_parent(parent_root); debug!(self.log, "Marking lookup as awaiting parent"; "id" => lookup.id, "block_root" => ?block_root, "parent_root" => ?parent_root); self.search_parent_of_child(parent_root, block_root, &peers, cx); diff --git a/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs b/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs index a096efcbb2..3789dbe91e 100644 --- a/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs +++ b/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs @@ -7,7 +7,7 @@ use crate::sync::network_context::{ use beacon_chain::{BeaconChainTypes, BlockProcessStatus}; use derivative::Derivative; use lighthouse_network::service::api_types::Id; -use rand::seq::IteratorRandom; +use parking_lot::RwLock; use std::collections::HashSet; use std::fmt::Debug; use std::sync::Arc; @@ -33,8 +33,6 @@ pub enum LookupRequestError { /// The failed attempts were primarily due to processing failures. cannot_process: bool, }, - /// No peers left to serve this lookup - NoPeers, /// Error sending event to network SendFailedNetwork(RpcRequestSendError), /// Error sending event to processor @@ -63,9 +61,12 @@ pub struct SingleBlockLookup { pub id: Id, pub block_request_state: BlockRequestState, pub component_requests: ComponentRequests, - /// Peers that claim to have imported this set of block components + /// Peers that claim to have imported this set of block components. This state is shared with + /// the custody request to have an updated view of the peers that claim to have imported the + /// block associated with this lookup. The peer set of a lookup can change rapidly, and faster + /// than the lifetime of a custody request. #[derivative(Debug(format_with = "fmt_peer_set_as_len"))] - peers: HashSet, + peers: Arc>>, block_root: Hash256, awaiting_parent: Option, created: Instant, @@ -92,7 +93,7 @@ impl SingleBlockLookup { id, block_request_state: BlockRequestState::new(requested_block_root), component_requests: ComponentRequests::WaitingForBlock, - peers: HashSet::from_iter(peers.iter().copied()), + peers: Arc::new(RwLock::new(HashSet::from_iter(peers.iter().copied()))), block_root: requested_block_root, awaiting_parent, created: Instant::now(), @@ -282,24 +283,11 @@ impl SingleBlockLookup { return Err(LookupRequestError::TooManyAttempts { cannot_process }); } - let Some(peer_id) = self.use_rand_available_peer() else { - // Allow lookup to not have any peers and do nothing. This is an optimization to not - // lose progress of lookups created from a block with unknown parent before we receive - // attestations for said block. - // Lookup sync event safety: If a lookup requires peers to make progress, and does - // not receive any new peers for some time it will be dropped. If it receives a new - // peer it must attempt to make progress. - R::request_state_mut(self) - .map_err(|e| LookupRequestError::BadState(e.to_owned()))? - .get_state_mut() - .update_awaiting_download_status("no peers"); - return Ok(()); - }; - + let peers = self.peers.clone(); let request = R::request_state_mut(self) .map_err(|e| LookupRequestError::BadState(e.to_owned()))?; - match request.make_request(id, peer_id, expected_blobs, cx)? { + match request.make_request(id, peers, expected_blobs, cx)? { LookupRequestResult::RequestSent(req_id) => { // Lookup sync event safety: If make_request returns `RequestSent`, we are // guaranteed that `BlockLookups::on_download_response` will be called exactly @@ -347,29 +335,24 @@ impl SingleBlockLookup { } /// Get all unique peers that claim to have imported this set of block components - pub fn all_peers(&self) -> impl Iterator + '_ { - self.peers.iter() + pub fn all_peers(&self) -> Vec { + self.peers.read().iter().copied().collect() } /// Add peer to all request states. The peer must be able to serve this request. /// Returns true if the peer was newly inserted into some request state. pub fn add_peer(&mut self, peer_id: PeerId) -> bool { - self.peers.insert(peer_id) + self.peers.write().insert(peer_id) } /// Remove peer from available peers. pub fn remove_peer(&mut self, peer_id: &PeerId) { - self.peers.remove(peer_id); + self.peers.write().remove(peer_id); } /// Returns true if this lookup has zero peers pub fn has_no_peers(&self) -> bool { - self.peers.is_empty() - } - - /// Selects a random peer from available peers if any - fn use_rand_available_peer(&mut self) -> Option { - self.peers.iter().choose(&mut rand::thread_rng()).copied() + self.peers.read().is_empty() } } @@ -688,8 +671,8 @@ impl std::fmt::Debug for State { } fn fmt_peer_set_as_len( - peer_set: &HashSet, + peer_set: &Arc>>, f: &mut std::fmt::Formatter, ) -> Result<(), std::fmt::Error> { - write!(f, "{}", peer_set.len()) + write!(f, "{}", peer_set.read().len()) } diff --git a/beacon_node/network/src/sync/network_context.rs b/beacon_node/network/src/sync/network_context.rs index 0a6bc8961f..f899936128 100644 --- a/beacon_node/network/src/sync/network_context.rs +++ b/beacon_node/network/src/sync/network_context.rs @@ -27,7 +27,8 @@ use lighthouse_network::service::api_types::{ DataColumnsByRootRequester, Id, SingleLookupReqId, SyncRequestId, }; use lighthouse_network::{Client, NetworkGlobals, PeerAction, PeerId, ReportSource}; -use rand::seq::SliceRandom; +use parking_lot::RwLock; +use rand::prelude::IteratorRandom; use rand::thread_rng; pub use requests::LookupVerifyError; use requests::{ @@ -308,8 +309,8 @@ impl SyncNetworkContext { pub fn get_random_custodial_peer(&self, column_index: ColumnIndex) -> Option { self.get_custodial_peers(column_index) + .into_iter() .choose(&mut thread_rng()) - .cloned() } pub fn network_globals(&self) -> &NetworkGlobals { @@ -562,9 +563,24 @@ impl SyncNetworkContext { pub fn block_lookup_request( &mut self, lookup_id: SingleLookupId, - peer_id: PeerId, + lookup_peers: Arc>>, block_root: Hash256, ) -> Result { + let Some(peer_id) = lookup_peers + .read() + .iter() + .choose(&mut rand::thread_rng()) + .copied() + else { + // Allow lookup to not have any peers and do nothing. This is an optimization to not + // lose progress of lookups created from a block with unknown parent before we receive + // attestations for said block. + // Lookup sync event safety: If a lookup requires peers to make progress, and does + // not receive any new peers for some time it will be dropped. If it receives a new + // peer it must attempt to make progress. + return Ok(LookupRequestResult::Pending("no peers")); + }; + match self.chain.get_block_process_status(&block_root) { // Unknown block, continue request to download BlockProcessStatus::Unknown => {} @@ -634,10 +650,25 @@ impl SyncNetworkContext { pub fn blob_lookup_request( &mut self, lookup_id: SingleLookupId, - peer_id: PeerId, + lookup_peers: Arc>>, block_root: Hash256, expected_blobs: usize, ) -> Result { + let Some(peer_id) = lookup_peers + .read() + .iter() + .choose(&mut rand::thread_rng()) + .copied() + else { + // Allow lookup to not have any peers and do nothing. This is an optimization to not + // lose progress of lookups created from a block with unknown parent before we receive + // attestations for said block. + // Lookup sync event safety: If a lookup requires peers to make progress, and does + // not receive any new peers for some time it will be dropped. If it receives a new + // peer it must attempt to make progress. + return Ok(LookupRequestResult::Pending("no peers")); + }; + let imported_blob_indexes = self .chain .data_availability_checker @@ -740,6 +771,7 @@ impl SyncNetworkContext { &mut self, lookup_id: SingleLookupId, block_root: Hash256, + lookup_peers: Arc>>, ) -> Result { let custody_indexes_imported = self .chain @@ -777,6 +809,7 @@ impl SyncNetworkContext { block_root, CustodyId { requester }, &custody_indexes_to_fetch, + lookup_peers, self.log.clone(), ); diff --git a/beacon_node/network/src/sync/network_context/custody.rs b/beacon_node/network/src/sync/network_context/custody.rs index e4bce3dafc..8a29545c21 100644 --- a/beacon_node/network/src/sync/network_context/custody.rs +++ b/beacon_node/network/src/sync/network_context/custody.rs @@ -7,8 +7,10 @@ use fnv::FnvHashMap; use lighthouse_network::service::api_types::{CustodyId, DataColumnsByRootRequester}; use lighthouse_network::PeerId; use lru_cache::LRUTimeCache; +use parking_lot::RwLock; use rand::Rng; use slog::{debug, warn}; +use std::collections::HashSet; use std::time::{Duration, Instant}; use std::{collections::HashMap, marker::PhantomData, sync::Arc}; use types::EthSpec; @@ -32,6 +34,8 @@ pub struct ActiveCustodyRequest { /// Peers that have recently failed to successfully respond to a columns by root request. /// Having a LRUTimeCache allows this request to not have to track disconnecting peers. failed_peers: LRUTimeCache, + /// Set of peers that claim to have imported this block and their custody columns + lookup_peers: Arc>>, /// Logger for the `SyncNetworkContext`. pub log: slog::Logger, _phantom: PhantomData, @@ -64,6 +68,7 @@ impl ActiveCustodyRequest { block_root: Hash256, custody_id: CustodyId, column_indices: &[ColumnIndex], + lookup_peers: Arc>>, log: slog::Logger, ) -> Self { Self { @@ -76,6 +81,7 @@ impl ActiveCustodyRequest { ), active_batch_columns_requests: <_>::default(), failed_peers: LRUTimeCache::new(Duration::from_secs(FAILED_PEERS_CACHE_EXPIRY_SECONDS)), + lookup_peers, log, _phantom: PhantomData, } @@ -215,6 +221,7 @@ impl ActiveCustodyRequest { } let mut columns_to_request_by_peer = HashMap::>::new(); + let lookup_peers = self.lookup_peers.read(); // Need to: // - track how many active requests a peer has for load balancing @@ -244,6 +251,8 @@ impl ActiveCustodyRequest { .iter() .map(|peer| { ( + // Prioritize peers that claim to know have imported this block + if lookup_peers.contains(peer) { 0 } else { 1 }, // De-prioritize peers that have failed to successfully respond to // requests recently self.failed_peers.contains(peer), @@ -257,7 +266,7 @@ impl ActiveCustodyRequest { .collect::>(); priorized_peers.sort_unstable(); - if let Some((_, _, _, peer_id)) = priorized_peers.first() { + if let Some((_, _, _, _, peer_id)) = priorized_peers.first() { columns_to_request_by_peer .entry(*peer_id) .or_default() @@ -283,10 +292,11 @@ impl ActiveCustodyRequest { block_root: self.block_root, indices: indices.clone(), }, - // true = enforce max_requests are returned data_columns_by_root. We only issue requests - // for blocks after we know the block has data, and only request peers after they claim to - // have imported the block+columns and claim to be custodians - true, + // If peer is in the lookup peer set, it claims to have imported the block and + // must have its columns in custody. In that case, set `true = enforce max_requests` + // and downscore if data_columns_by_root does not returned the expected custody + // columns. For the rest of peers, don't downscore if columns are missing. + lookup_peers.contains(&peer_id), ) .map_err(Error::SendFailed)?;