Retry custody requests after peer metadata updates (#6975)

Closes https://github.com/sigp/lighthouse/issues/6895

We need sync to retry custody requests when a peer's custody group count (CGC) updates. A higher CGC can increase a data column subnet's peer count from 0 to 1, allowing requests to proceed.


  Add a new sync event `SyncMessage::UpdatedPeerCgc`. It is sent by the router when a metadata response updates a peer's known CGC.
This commit is contained in:
Lion - dapplion
2025-05-09 05:27:17 -03:00
committed by GitHub
parent 4b9c16fc71
commit a497ec601c
5 changed files with 106 additions and 8 deletions

View File

@@ -712,8 +712,9 @@ impl<E: EthSpec> PeerManager<E> {
}
/// Received a metadata response from a peer.
pub fn meta_data_response(&mut self, peer_id: &PeerId, meta_data: MetaData<E>) {
pub fn meta_data_response(&mut self, peer_id: &PeerId, meta_data: MetaData<E>) -> bool {
let mut invalid_meta_data = false;
let mut updated_cgc = false;
if let Some(peer_info) = self.network_globals.peers.write().peer_info_mut(peer_id) {
if let Some(known_meta_data) = &peer_info.meta_data() {
@@ -729,12 +730,16 @@ impl<E: EthSpec> PeerManager<E> {
debug!(%peer_id, new_seq_no = meta_data.seq_number(), "Obtained peer's metadata");
}
let known_custody_group_count = peer_info
.meta_data()
.and_then(|meta_data| meta_data.custody_group_count().copied().ok());
let custody_group_count_opt = meta_data.custody_group_count().copied().ok();
peer_info.set_meta_data(meta_data);
if self.network_globals.spec.is_peer_das_scheduled() {
// Gracefully ignore metadata/v2 peers. Potentially downscore after PeerDAS to
// prioritize PeerDAS peers.
// Gracefully ignore metadata/v2 peers.
// We only send metadata v3 requests when PeerDAS is scheduled
if let Some(custody_group_count) = custody_group_count_opt {
match self.compute_peer_custody_groups(peer_id, custody_group_count) {
Ok(custody_groups) => {
@@ -755,6 +760,8 @@ impl<E: EthSpec> PeerManager<E> {
})
.collect();
peer_info.set_custody_subnets(custody_subnets);
updated_cgc = Some(custody_group_count) != known_custody_group_count;
}
Err(err) => {
debug!(
@@ -777,6 +784,8 @@ impl<E: EthSpec> PeerManager<E> {
if invalid_meta_data {
self.goodbye_peer(peer_id, GoodbyeReason::Fault, ReportSource::PeerManager)
}
updated_cgc
}
/// Updates the gossipsub scores for all known peers in gossipsub.
@@ -1487,6 +1496,15 @@ impl<E: EthSpec> PeerManager<E> {
pub fn remove_trusted_peer(&mut self, enr: Enr) {
self.trusted_peers.remove(&enr);
}
#[cfg(test)]
/// Test-only helper: the number of custody subnets currently recorded for
/// `peer_id`, or `None` when the peer is unknown to the peer DB.
fn custody_subnet_count_for_peer(&self, peer_id: &PeerId) -> Option<usize> {
    let peers = self.network_globals.peers.read();
    peers
        .peer_info(peer_id)
        .map(|info| info.custody_subnets_iter().count())
}
}
enum ConnectingType {
@@ -1507,8 +1525,9 @@ enum ConnectingType {
#[cfg(test)]
mod tests {
use super::*;
use crate::rpc::MetaDataV3;
use crate::NetworkConfig;
use types::MainnetEthSpec as E;
use types::{ChainSpec, ForkName, MainnetEthSpec as E};
async fn build_peer_manager(target_peer_count: usize) -> PeerManager<E> {
build_peer_manager_with_trusted_peers(vec![], target_peer_count).await
@@ -1517,6 +1536,15 @@ mod tests {
/// Build a `PeerManager` with the given trusted peers, using the default
/// mainnet chain spec. Delegates to the fully-parameterised builder.
async fn build_peer_manager_with_trusted_peers(
    trusted_peers: Vec<PeerId>,
    target_peer_count: usize,
) -> PeerManager<E> {
    build_peer_manager_with_opts(trusted_peers, target_peer_count, Arc::new(E::default_spec()))
        .await
}
async fn build_peer_manager_with_opts(
trusted_peers: Vec<PeerId>,
target_peer_count: usize,
spec: Arc<ChainSpec>,
) -> PeerManager<E> {
let config = config::Config {
target_peer_count,
@@ -1527,7 +1555,6 @@ mod tests {
target_peers: target_peer_count,
..Default::default()
});
let spec = Arc::new(E::default_spec());
let globals = NetworkGlobals::new_test_globals(trusted_peers, network_config, spec);
PeerManager::new(config, Arc::new(globals)).unwrap()
}
@@ -1878,6 +1905,44 @@ mod tests {
assert!(peers_should_have_removed.is_empty());
}
#[tokio::test]
/// A METADATA response carrying a custody group count should populate the
/// peer's custody subnets; only a new or changed CGC value reports `true`.
async fn test_peer_manager_update_custody_subnets() {
    // PeerDAS activates at Fulu, so build the manager against a Fulu genesis spec.
    let spec = Arc::new(ForkName::Fulu.make_genesis_spec(E::default_spec()));
    let mut peer_manager = build_peer_manager_with_opts(vec![], 1, spec).await;

    let peer_id = PeerId::from_public_key(&Keypair::generate_secp256k1().public());
    peer_manager.inject_connect_ingoing(
        &peer_id,
        Multiaddr::empty().with_p2p(peer_id).unwrap(),
        None,
    );

    // Before any metadata arrives, the peer must have zero custody subnets.
    assert_eq!(peer_manager.custody_subnet_count_for_peer(&peer_id), Some(0));

    // First metadata response: custody subnets get set and the CGC is
    // reported as updated.
    let peer_cgc = 4;
    let meta_data = MetaData::V3(MetaDataV3 {
        seq_number: 0,
        attnets: Default::default(),
        syncnets: Default::default(),
        custody_group_count: peer_cgc,
    });
    assert!(peer_manager.meta_data_response(&peer_id, meta_data.clone()));
    assert_eq!(
        peer_manager.custody_subnet_count_for_peer(&peer_id),
        Some(peer_cgc as usize)
    );

    // Re-sending identical metadata must not report a CGC update, and the
    // subnet count is unchanged.
    assert!(!peer_manager.meta_data_response(&peer_id, meta_data));
    assert_eq!(
        peer_manager.custody_subnet_count_for_peer(&peer_id),
        Some(peer_cgc as usize)
    );
}
#[tokio::test]
/// Test the pruning logic to remove grouped subnet peers
async fn test_peer_manager_prune_grouped_subnet_peers() {

View File

@@ -103,6 +103,8 @@ pub enum NetworkEvent<E: EthSpec> {
StatusPeer(PeerId),
NewListenAddr(Multiaddr),
ZeroListeners,
/// A peer has an updated custody group count from MetaData.
PeerUpdatedCustodyGroupCount(PeerId),
}
pub type Gossipsub = gossipsub::Behaviour<SnappyTransform, SubscriptionFilter>;
@@ -1655,7 +1657,7 @@ impl<E: EthSpec> Network<E> {
return None;
}
// The METADATA and PING RPC responses are handled within the behaviour and not propagated
// The PING RPC responses are handled within the behaviour and not propagated
match event.message {
Err(handler_err) => {
match handler_err {
@@ -1858,9 +1860,11 @@ impl<E: EthSpec> Network<E> {
None
}
RpcSuccessResponse::MetaData(meta_data) => {
self.peer_manager_mut()
let updated_cgc = self
.peer_manager_mut()
.meta_data_response(&peer_id, meta_data.as_ref().clone());
None
// Send event after calling into peer_manager so the PeerDB is updated.
updated_cgc.then(|| NetworkEvent::PeerUpdatedCustodyGroupCount(peer_id))
}
/* Network propagated protocols */
RpcSuccessResponse::Status(msg) => {

View File

@@ -73,6 +73,8 @@ pub enum RouterMessage<E: EthSpec> {
PubsubMessage(MessageId, PeerId, PubsubMessage<E>, bool),
/// The peer manager has requested we re-status a peer.
StatusPeer(PeerId),
/// The peer has an updated custody group count from METADATA.
PeerUpdatedCustodyGroupCount(PeerId),
}
impl<T: BeaconChainTypes> Router<T> {
@@ -155,6 +157,10 @@ impl<T: BeaconChainTypes> Router<T> {
RouterMessage::PeerDisconnected(peer_id) => {
self.send_to_sync(SyncMessage::Disconnect(peer_id));
}
// A peer has updated CGC
RouterMessage::PeerUpdatedCustodyGroupCount(peer_id) => {
self.send_to_sync(SyncMessage::UpdatedPeerCgc(peer_id));
}
RouterMessage::RPCRequestReceived {
peer_id,
inbound_request_id,

View File

@@ -485,6 +485,9 @@ impl<T: BeaconChainTypes> NetworkService<T> {
NetworkEvent::PeerDisconnected(peer_id) => {
self.send_to_router(RouterMessage::PeerDisconnected(peer_id));
}
NetworkEvent::PeerUpdatedCustodyGroupCount(peer_id) => {
self.send_to_router(RouterMessage::PeerUpdatedCustodyGroupCount(peer_id));
}
NetworkEvent::RequestReceived {
peer_id,
inbound_request_id,

View File

@@ -106,6 +106,9 @@ pub enum SyncMessage<E: EthSpec> {
head_slot: Option<Slot>,
},
/// Peer manager has received a MetaData of a peer with a new or updated CGC value.
UpdatedPeerCgc(PeerId),
/// A block has been received from the RPC.
RpcBlock {
sync_request_id: SyncRequestId,
@@ -476,6 +479,16 @@ impl<T: BeaconChainTypes> SyncManager<T> {
}
}
/// A peer's custody group count changed — retry work that may have been
/// stalled waiting for custody-subnet peers.
fn updated_peer_cgc(&mut self, _peer_id: PeerId) {
    // Custody-by-root lookups blocked on missing peers may now have one.
    let results = self.network.continue_custody_by_root_requests();
    for (request_id, outcome) in results {
        self.on_custody_by_root_result(request_id, outcome);
    }
    // Range sync may likewise have been waiting on peer availability.
    self.range_sync.resume(&mut self.network);
}
/// Handles RPC errors related to requests that were emitted from the sync manager.
fn inject_error(&mut self, peer_id: PeerId, sync_request_id: SyncRequestId, error: RPCError) {
trace!("Sync manager received a failed RPC");
@@ -748,6 +761,13 @@ impl<T: BeaconChainTypes> SyncManager<T> {
} => {
self.add_peers_force_range_sync(&peers, head_root, head_slot);
}
SyncMessage::UpdatedPeerCgc(peer_id) => {
debug!(
peer_id = ?peer_id,
"Received updated peer CGC message"
);
self.updated_peer_cgc(peer_id);
}
SyncMessage::RpcBlock {
sync_request_id,
peer_id,