Maintain peers across all data column subnets (#7915)

Closes:
- #7865
- #7855

Changes extracted from earlier PR #7876

This PR fixes two main things with a few other improvements mentioned below:
- Prevent Lighthouse from repeatedly sending `DataColumnByRoot` requests to an unsynced peer, causing lookup sync to get stuck
- Allows Lighthouse to send discovery requests if there isn't enough **synced** peers in the required sampling subnets - this fixes the stuck sync scenario where there isn't enough usable peers in sampling subnet but no discovery is attempted.


  - Make peer discovery queries if custody subnet peer count drops below the minimum threshold
- Update peer pruning logic to prioritise uniform distribution across all data column subnets and avoid pruning sampling peers if the count is below the target threshold (2)
- Check sync status when making discovery requests, to make sure we don't ignore requests if there isn't enough synced peers in the required sampling subnets
- Optimise some of the `PeerDB` functions checking custody peers
- Only send lookup requests to peers that are synced or advanced
This commit is contained in:
Jimmy Chen
2025-09-04 15:36:20 +10:00
committed by GitHub
parent 76adedff27
commit c2a92f1a8c
6 changed files with 974 additions and 336 deletions

View File

@@ -1223,7 +1223,7 @@ impl<E: EthSpec> Discovery<E> {
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::*; use super::*;
use crate::rpc::methods::{MetaData, MetaDataV2}; use crate::rpc::methods::{MetaData, MetaDataV3};
use libp2p::identity::secp256k1; use libp2p::identity::secp256k1;
use types::{BitVector, MinimalEthSpec, SubnetId}; use types::{BitVector, MinimalEthSpec, SubnetId};
@@ -1248,10 +1248,11 @@ mod tests {
.unwrap(); .unwrap();
let globals = NetworkGlobals::new( let globals = NetworkGlobals::new(
enr, enr,
MetaData::V2(MetaDataV2 { MetaData::V3(MetaDataV3 {
seq_number: 0, seq_number: 0,
attnets: Default::default(), attnets: Default::default(),
syncnets: Default::default(), syncnets: Default::default(),
custody_group_count: spec.custody_requirement,
}), }),
vec![], vec![],
false, false,

File diff suppressed because it is too large Load Diff

View File

@@ -300,6 +300,7 @@ impl<E: EthSpec> PeerDB<E> {
.filter(move |(_, info)| { .filter(move |(_, info)| {
// We check both the metadata and gossipsub data as we only want to count long-lived subscribed peers // We check both the metadata and gossipsub data as we only want to count long-lived subscribed peers
info.is_connected() info.is_connected()
&& info.is_synced_or_advanced()
&& info.on_subnet_metadata(&subnet) && info.on_subnet_metadata(&subnet)
&& info.on_subnet_gossipsub(&subnet) && info.on_subnet_gossipsub(&subnet)
&& info.is_good_gossipsub_peer() && info.is_good_gossipsub_peer()
@@ -318,40 +319,69 @@ impl<E: EthSpec> PeerDB<E> {
.filter(move |(_, info)| { .filter(move |(_, info)| {
// The custody_subnets hashset can be populated via enr or metadata // The custody_subnets hashset can be populated via enr or metadata
let is_custody_subnet_peer = info.is_assigned_to_custody_subnet(&subnet); let is_custody_subnet_peer = info.is_assigned_to_custody_subnet(&subnet);
info.is_connected() && info.is_good_gossipsub_peer() && is_custody_subnet_peer info.is_connected()
&& info.is_good_gossipsub_peer()
&& is_custody_subnet_peer
&& info.is_synced_or_advanced()
}) })
.map(|(peer_id, _)| peer_id) .map(|(peer_id, _)| peer_id)
} }
/// Returns an iterator of all peers that are supposed to be custodying /// Checks if there is at least one good peer for each specified custody subnet for the given epoch.
/// the given subnet id. /// A "good" peer is one that is both connected and synced (or advanced) for the specified epoch.
pub fn good_range_sync_custody_subnet_peers( pub fn has_good_custody_range_sync_peer(
&self, &self,
subnet: DataColumnSubnetId, subnets: &HashSet<DataColumnSubnetId>,
) -> impl Iterator<Item = &PeerId> { epoch: Epoch,
self.peers
.iter()
.filter(move |(_, info)| {
// The custody_subnets hashset can be populated via enr or metadata
info.is_connected() && info.is_assigned_to_custody_subnet(&subnet)
})
.map(|(peer_id, _)| peer_id)
}
/// Returns `true` if the given peer is assigned to the given subnet.
/// else returns `false`
///
/// Returns `false` if peer doesn't exist in peerdb.
pub fn is_good_range_sync_custody_subnet_peer(
&self,
subnet: DataColumnSubnetId,
peer: &PeerId,
) -> bool { ) -> bool {
if let Some(info) = self.peers.get(peer) { let mut remaining_subnets = subnets.clone();
info.is_connected() && info.is_assigned_to_custody_subnet(&subnet)
} else { let good_sync_peers_for_epoch = self.peers.values().filter(|&info| {
info.is_connected()
&& match info.sync_status() {
SyncStatus::Synced { info } | SyncStatus::Advanced { info } => {
info.has_slot(epoch.end_slot(E::slots_per_epoch()))
}
SyncStatus::IrrelevantPeer
| SyncStatus::Behind { .. }
| SyncStatus::Unknown => false,
}
});
for info in good_sync_peers_for_epoch {
for subnet in info.custody_subnets_iter() {
if remaining_subnets.remove(subnet) && remaining_subnets.is_empty() {
return true;
}
}
}
false false
} }
/// Checks if there are sufficient good peers for a single custody subnet.
/// A "good" peer is one that is both connected and synced (or advanced).
pub fn has_good_peers_in_custody_subnet(
&self,
subnet: &DataColumnSubnetId,
target_peers: usize,
) -> bool {
let mut peer_count = 0usize;
for info in self
.peers
.values()
.filter(|info| info.is_connected() && info.is_synced_or_advanced())
{
if info.is_assigned_to_custody_subnet(subnet) {
peer_count += 1;
}
if peer_count >= target_peers {
return true;
}
}
false
} }
/// Gives the ids of all known disconnected peers. /// Gives the ids of all known disconnected peers.

View File

@@ -174,19 +174,6 @@ impl<E: EthSpec> PeerInfo<E> {
self.subnets.iter() self.subnets.iter()
} }
/// Returns the number of long lived subnets a peer is subscribed to.
// NOTE: This currently excludes sync committee subnets
pub fn long_lived_subnet_count(&self) -> usize {
if let Some(meta_data) = self.meta_data.as_ref() {
return meta_data.attnets().num_set_bits();
} else if let Some(enr) = self.enr.as_ref()
&& let Ok(attnets) = enr.attestation_bitfield::<E>()
{
return attnets.num_set_bits();
}
0
}
/// Returns an iterator over the long-lived subnets if it has any. /// Returns an iterator over the long-lived subnets if it has any.
pub fn long_lived_subnets(&self) -> Vec<Subnet> { pub fn long_lived_subnets(&self) -> Vec<Subnet> {
let mut long_lived_subnets = Vec::new(); let mut long_lived_subnets = Vec::new();
@@ -222,6 +209,13 @@ impl<E: EthSpec> PeerInfo<E> {
} }
} }
} }
long_lived_subnets.extend(
self.custody_subnets
.iter()
.map(|&id| Subnet::DataColumn(id)),
);
long_lived_subnets long_lived_subnets
} }
@@ -240,6 +234,11 @@ impl<E: EthSpec> PeerInfo<E> {
self.custody_subnets.iter() self.custody_subnets.iter()
} }
/// Returns the number of custody subnets this peer is assigned to.
pub fn custody_subnet_count(&self) -> usize {
self.custody_subnets.len()
}
/// Returns true if the peer is connected to a long-lived subnet. /// Returns true if the peer is connected to a long-lived subnet.
pub fn has_long_lived_subnet(&self) -> bool { pub fn has_long_lived_subnet(&self) -> bool {
// Check the meta_data // Check the meta_data
@@ -262,6 +261,17 @@ impl<E: EthSpec> PeerInfo<E> {
{ {
return true; return true;
} }
// Check if the peer has custody subnets populated and the peer is subscribed to any of
// its custody subnets
let subscribed_to_any_custody_subnets = self
.custody_subnets
.iter()
.any(|subnet_id| self.subnets.contains(&Subnet::DataColumn(*subnet_id)));
if subscribed_to_any_custody_subnets {
return true;
}
false false
} }
@@ -318,6 +328,14 @@ impl<E: EthSpec> PeerInfo<E> {
) )
} }
/// Checks if the peer is synced or advanced.
pub fn is_synced_or_advanced(&self) -> bool {
matches!(
self.sync_status,
SyncStatus::Synced { .. } | SyncStatus::Advanced { .. }
)
}
/// Checks if the status is connected. /// Checks if the status is connected.
pub fn is_dialing(&self) -> bool { pub fn is_dialing(&self) -> bool {
matches!(self.connection_status, PeerConnectionStatus::Dialing { .. }) matches!(self.connection_status, PeerConnectionStatus::Dialing { .. })
@@ -645,3 +663,50 @@ impl From<PeerConnectionStatus> for PeerState {
} }
} }
} }
#[cfg(test)]
mod tests {
use super::*;
use crate::types::Subnet;
use types::{DataColumnSubnetId, MainnetEthSpec};
type E = MainnetEthSpec;
fn create_test_peer_info() -> PeerInfo<E> {
PeerInfo::default()
}
#[test]
fn test_has_long_lived_subnet_empty_custody_subnets() {
let peer_info = create_test_peer_info();
// peer has no custody subnets or subscribed to any subnets hence return false
assert!(!peer_info.has_long_lived_subnet());
}
#[test]
fn test_has_long_lived_subnet_empty_subnets_with_custody_subnets() {
let mut peer_info = create_test_peer_info();
peer_info.custody_subnets.insert(DataColumnSubnetId::new(1));
peer_info.custody_subnets.insert(DataColumnSubnetId::new(2));
// Peer has custody subnets but isn't subscribed to any hence return false
assert!(!peer_info.has_long_lived_subnet());
}
#[test]
fn test_has_long_lived_subnet_subscribed_to_custody_subnets() {
let mut peer_info = create_test_peer_info();
peer_info.custody_subnets.insert(DataColumnSubnetId::new(1));
peer_info.custody_subnets.insert(DataColumnSubnetId::new(2));
peer_info.custody_subnets.insert(DataColumnSubnetId::new(3));
peer_info
.subnets
.insert(Subnet::DataColumn(DataColumnSubnetId::new(1)));
peer_info
.subnets
.insert(Subnet::DataColumn(DataColumnSubnetId::new(2)));
// Missing DataColumnSubnetId::new(3) - but peer is subscribed to some custody subnets
// Peer is subscribed to any custody subnets - return true
assert!(peer_info.has_long_lived_subnet());
}
}

View File

@@ -1120,13 +1120,12 @@ impl<T: BeaconChainTypes> BackFillSync<T> {
.sampling_subnets() .sampling_subnets()
.iter() .iter()
.all(|subnet_id| { .all(|subnet_id| {
let peer_count = network let min_peer_count = 1;
network
.network_globals() .network_globals()
.peers .peers
.read() .read()
.good_range_sync_custody_subnet_peers(*subnet_id) .has_good_peers_in_custody_subnet(subnet_id, min_peer_count)
.count();
peer_count > 0
}) })
} else { } else {
true true

View File

@@ -1132,21 +1132,12 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
) -> bool { ) -> bool {
if network.chain.spec.is_peer_das_enabled_for_epoch(epoch) { if network.chain.spec.is_peer_das_enabled_for_epoch(epoch) {
// Require peers on all sampling column subnets before sending batches // Require peers on all sampling column subnets before sending batches
let sampling_subnets = network.network_globals().sampling_subnets();
network network
.network_globals() .network_globals()
.sampling_subnets()
.iter()
.all(|subnet_id| {
let peer_db = network.network_globals().peers.read();
let peer_count = self
.peers .peers
.iter() .read()
.filter(|peer| { .has_good_custody_range_sync_peer(&sampling_subnets, epoch)
peer_db.is_good_range_sync_custody_subnet_peer(*subnet_id, peer)
})
.count();
peer_count > 0
})
} else { } else {
true true
} }