Remove total_requests_per_peer

This commit is contained in:
dapplion
2025-06-11 11:21:12 +02:00
parent 28d9d8b8e2
commit 7a03578795
5 changed files with 5 additions and 45 deletions

View File

@@ -26,7 +26,7 @@ use logging::crit;
use parking_lot::RwLock; use parking_lot::RwLock;
use std::collections::{ use std::collections::{
btree_map::{BTreeMap, Entry}, btree_map::{BTreeMap, Entry},
HashMap, HashSet, HashSet,
}; };
use std::sync::Arc; use std::sync::Arc;
use tracing::{debug, error, info, instrument, warn}; use tracing::{debug, error, info, instrument, warn};
@@ -932,8 +932,6 @@ impl<T: BeaconChainTypes> BackFillSync<T> {
RangeRequestId::BackfillSync { batch_id }, RangeRequestId::BackfillSync { batch_id },
self.peers.clone(), self.peers.clone(),
&failed_peers, &failed_peers,
// Does not track total requests per peer for now
&HashMap::new(),
) { ) {
Ok(request_id) => { Ok(request_id) => {
// inform the batch about the new request // inform the batch about the new request

View File

@@ -480,21 +480,14 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
requester: RangeRequestId, requester: RangeRequestId,
peers: Arc<RwLock<HashSet<PeerId>>>, peers: Arc<RwLock<HashSet<PeerId>>>,
peers_to_deprioritize: &HashSet<PeerId>, peers_to_deprioritize: &HashSet<PeerId>,
total_requests_per_peer: &HashMap<PeerId, usize>,
) -> Result<Id, RpcRequestSendError> { ) -> Result<Id, RpcRequestSendError> {
let id = ComponentsByRangeRequestId { let id = ComponentsByRangeRequestId {
id: self.next_id(), id: self.next_id(),
requester, requester,
}; };
let req = BlockComponentsByRangeRequest::new( let req =
id, BlockComponentsByRangeRequest::new(id, request, peers, peers_to_deprioritize, self)?;
request,
peers,
peers_to_deprioritize,
total_requests_per_peer,
self,
)?;
self.block_components_by_range_requests.insert(id, req); self.block_components_by_range_requests.insert(id, req);

View File

@@ -105,7 +105,6 @@ impl<T: BeaconChainTypes> BlockComponentsByRangeRequest<T> {
request: BlocksByRangeRequest, request: BlocksByRangeRequest,
peers: Arc<RwLock<HashSet<PeerId>>>, peers: Arc<RwLock<HashSet<PeerId>>>,
peers_to_deprioritize: &HashSet<PeerId>, peers_to_deprioritize: &HashSet<PeerId>,
total_requests_per_peer: &HashMap<PeerId, usize>,
cx: &mut SyncNetworkContext<T>, cx: &mut SyncNetworkContext<T>,
) -> Result<Self, RpcRequestSendError> { ) -> Result<Self, RpcRequestSendError> {
// Induces a compile time panic if this doesn't hold true. // Induces a compile time panic if this doesn't hold true.
@@ -129,19 +128,13 @@ impl<T: BeaconChainTypes> BlockComponentsByRangeRequest<T> {
( (
// If contains -> 1 (order after), not contains -> 0 (order first) // If contains -> 1 (order after), not contains -> 0 (order first)
peers_to_deprioritize.contains(peer), peers_to_deprioritize.contains(peer),
// TODO(das): Should we use active_request_count_by_peer?
// Prefer peers with less overall requests
// active_request_count_by_peer.get(peer).copied().unwrap_or(0),
// Prefer peers with less total cumulative requests, so we fetch data from a
// diverse set of peers
total_requests_per_peer.get(peer).copied().unwrap_or(0),
// Random factor to break ties, otherwise the PeerID breaks ties // Random factor to break ties, otherwise the PeerID breaks ties
rand::random::<u32>(), rand::random::<u32>(),
peer, peer,
) )
}) })
.min() .min()
.map(|(_, _, _, peer)| *peer) .map(|(_, _, peer)| *peer)
else { else {
// When a peer disconnects and is removed from the SyncingChain peer set, if the set // When a peer disconnects and is removed from the SyncingChain peer set, if the set
// reaches zero the SyncingChain is removed. // reaches zero the SyncingChain is removed.

View File

@@ -1,6 +1,5 @@
use crate::sync::network_context::PeerGroup; use crate::sync::network_context::PeerGroup;
use beacon_chain::block_verification_types::RpcBlock; use beacon_chain::block_verification_types::RpcBlock;
use itertools::Itertools;
use lighthouse_network::rpc::methods::BlocksByRangeRequest; use lighthouse_network::rpc::methods::BlocksByRangeRequest;
use lighthouse_network::service::api_types::Id; use lighthouse_network::service::api_types::Id;
use lighthouse_network::PeerId; use lighthouse_network::PeerId;
@@ -46,12 +45,6 @@ impl BatchPeers {
pub fn column(&self, index: &ColumnIndex) -> Option<&PeerId> { pub fn column(&self, index: &ColumnIndex) -> Option<&PeerId> {
self.column_peers.of_index(&((*index) as usize)) self.column_peers.of_index(&((*index) as usize))
} }
pub fn iter_unique_peers(&self) -> impl Iterator<Item = &PeerId> {
std::iter::once(&self.block_peer)
.chain(self.column_peers.all())
.unique()
}
} }
/// Allows customisation of the above constants used in other sync methods such as BackFillSync. /// Allows customisation of the above constants used in other sync methods such as BackFillSync.

View File

@@ -11,7 +11,7 @@ use lighthouse_network::service::api_types::Id;
use lighthouse_network::{PeerAction, PeerId}; use lighthouse_network::{PeerAction, PeerId};
use logging::crit; use logging::crit;
use parking_lot::RwLock; use parking_lot::RwLock;
use std::collections::{btree_map::Entry, BTreeMap, HashMap, HashSet}; use std::collections::{btree_map::Entry, BTreeMap, HashSet};
use std::sync::Arc; use std::sync::Arc;
use strum::IntoStaticStr; use strum::IntoStaticStr;
use tracing::{debug, instrument, warn}; use tracing::{debug, instrument, warn};
@@ -90,15 +90,8 @@ pub struct SyncingChain<T: BeaconChainTypes> {
/// The peers that agree on the `target_head_slot` and `target_head_root` as a canonical chain /// The peers that agree on the `target_head_slot` and `target_head_root` as a canonical chain
/// and thus available to download this chain from. /// and thus available to download this chain from.
///
/// Also, for each peer tracks the total requests done per peer as part of this SyncingChain
/// `HashMap<peer, total_requests_per_peer>`
peers: Arc<RwLock<HashSet<PeerId>>>, peers: Arc<RwLock<HashSet<PeerId>>>,
/// Tracks the total requests done to each peer for this SyncingChain. Forces us to fetch data
/// from all peers to prevent eclipse attacks
requests_per_peer: HashMap<PeerId, usize>,
/// Starting epoch of the next batch that needs to be downloaded. /// Starting epoch of the next batch that needs to be downloaded.
to_be_downloaded: BatchId, to_be_downloaded: BatchId,
@@ -160,7 +153,6 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
target_head_root, target_head_root,
batches: BTreeMap::new(), batches: BTreeMap::new(),
peers: Arc::new(RwLock::new(HashSet::from_iter([peer_id]))), peers: Arc::new(RwLock::new(HashSet::from_iter([peer_id]))),
requests_per_peer: HashMap::from_iter([(peer_id, <_>::default())]),
to_be_downloaded: start_epoch, to_be_downloaded: start_epoch,
processing_target: start_epoch, processing_target: start_epoch,
optimistic_start: None, optimistic_start: None,
@@ -221,7 +213,6 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
#[instrument(parent = None,level = "info", fields(chain = self.id , service = "range_sync"), skip_all)] #[instrument(parent = None,level = "info", fields(chain = self.id , service = "range_sync"), skip_all)]
pub fn remove_peer(&mut self, peer_id: &PeerId) -> ProcessingResult { pub fn remove_peer(&mut self, peer_id: &PeerId) -> ProcessingResult {
self.peers.write().remove(peer_id); self.peers.write().remove(peer_id);
self.requests_per_peer.remove(peer_id);
if self.peers.read().is_empty() { if self.peers.read().is_empty() {
Err(RemoveChain::EmptyPeerPool) Err(RemoveChain::EmptyPeerPool)
@@ -250,12 +241,6 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
request_id: Id, request_id: Id,
blocks: Vec<RpcBlock<T::EthSpec>>, blocks: Vec<RpcBlock<T::EthSpec>>,
) -> ProcessingResult { ) -> ProcessingResult {
// Account for one more requests to this peer
// TODO(das): this code assumes that we do a single request per peer per RpcBlock
for peer in batch_peers.iter_unique_peers() {
*self.requests_per_peer.entry(*peer).or_default() += 1;
}
// check if we have this batch // check if we have this batch
let batch = match self.batches.get_mut(&batch_id) { let batch = match self.batches.get_mut(&batch_id) {
None => { None => {
@@ -873,7 +858,6 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
peer_id: PeerId, peer_id: PeerId,
) -> ProcessingResult { ) -> ProcessingResult {
self.peers.write().insert(peer_id); self.peers.write().insert(peer_id);
self.requests_per_peer.insert(peer_id, <_>::default());
self.request_batches(network) self.request_batches(network)
} }
@@ -955,7 +939,6 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
}, },
self.peers.clone(), self.peers.clone(),
&failed_peers, &failed_peers,
&self.requests_per_peer,
) { ) {
Ok(request_id) => { Ok(request_id) => {
// inform the batch about the new request // inform the batch about the new request