diff --git a/beacon_node/lighthouse_network/src/peer_manager/peerdb.rs b/beacon_node/lighthouse_network/src/peer_manager/peerdb.rs index 0a338bb011..693fdebb69 100644 --- a/beacon_node/lighthouse_network/src/peer_manager/peerdb.rs +++ b/beacon_node/lighthouse_network/src/peer_manager/peerdb.rs @@ -257,17 +257,9 @@ impl PeerDB { .iter() .filter(move |(_, info)| { info.is_connected() - && match info.sync_status() { - SyncStatus::Synced { info } => { - info.has_slot(epoch.start_slot(E::slots_per_epoch())) - } - SyncStatus::Advanced { info } => { - info.has_slot(epoch.start_slot(E::slots_per_epoch())) - } - SyncStatus::IrrelevantPeer - | SyncStatus::Behind { .. } - | SyncStatus::Unknown => false, - } + && info.is_synced_or_advanced_with_available_slot( + epoch.start_slot(E::slots_per_epoch()), + ) }) .map(|(peer_id, _)| peer_id) } @@ -301,10 +293,11 @@ impl PeerDB { } /// Returns an iterator of all good gossipsub peers that are supposed to be custodying - /// the given subnet id. + /// the given subnet id, with data available at the given slot. pub fn good_custody_subnet_peer( &self, subnet: DataColumnSubnetId, + slot: Slot, ) -> impl Iterator { self.peers .iter() @@ -314,7 +307,7 @@ impl PeerDB { info.is_connected() && info.is_good_gossipsub_peer() && is_custody_subnet_peer - && info.is_synced_or_advanced() + && info.is_synced_or_advanced_with_available_slot(slot) }) .map(|(peer_id, _)| peer_id) } @@ -330,14 +323,9 @@ impl PeerDB { let good_sync_peers_for_epoch = self.peers.values().filter(|&info| { info.is_connected() - && match info.sync_status() { - SyncStatus::Synced { info } | SyncStatus::Advanced { info } => { - info.has_slot(epoch.start_slot(E::slots_per_epoch())) - } - SyncStatus::IrrelevantPeer - | SyncStatus::Behind { .. } - | SyncStatus::Unknown => false, - } + && info.is_synced_or_advanced_with_available_slot( + epoch.start_slot(E::slots_per_epoch()), + ) }); for info in good_sync_peers_for_epoch { @@ -2211,6 +2199,89 @@ mod tests { ); } + #[test] + fn test_good_custody_subnet_peer_respects_earliest_available_slot() { + let mut pdb = get_db(); + let subnet = DataColumnSubnetId::new(0); + let request_slot = Slot::new(10); + + fn sync_info(earliest_available_slot: Option) -> SyncInfo { + SyncInfo { + head_slot: Slot::new(100), + head_root: Hash256::ZERO, + finalized_epoch: Epoch::new(0), + finalized_root: Hash256::ZERO, + earliest_available_slot, + } + } + + let add_custody_peer = |pdb: &mut PeerDB, sync_status: SyncStatus| { + let peer_id = PeerId::random(); + pdb.connect_ingoing(&peer_id, "/ip4/0.0.0.0".parse().unwrap(), None); + pdb.__set_custody_subnets(&peer_id, HashSet::from([subnet])) + .unwrap(); + pdb.update_sync_status(&peer_id, sync_status); + peer_id + }; + + let peer_with_data = add_custody_peer( + &mut pdb, + SyncStatus::Synced { + info: sync_info(Some(Slot::new(5))), + }, + ); + let peer_at_boundary = add_custody_peer( + &mut pdb, + SyncStatus::Synced { + info: sync_info(Some(request_slot)), + }, + ); + let peer_pruned = add_custody_peer( + &mut pdb, + SyncStatus::Synced { + info: sync_info(Some(Slot::new(11))), + }, + ); + let peer_no_eas = add_custody_peer( + &mut pdb, + SyncStatus::Synced { + info: sync_info(None), + }, + ); + let peer_behind = add_custody_peer( + &mut pdb, + SyncStatus::Behind { + info: sync_info(Some(Slot::new(0))), + }, + ); + + let good_peers = pdb + .good_custody_subnet_peer(subnet, request_slot) + .copied() + .collect::>(); + + assert!( + good_peers.contains(&peer_with_data), + "peer with earliest_available_slot before the request slot should be returned" + ); + assert!( + good_peers.contains(&peer_at_boundary), + "peer with earliest_available_slot equal to the request slot should be returned" + ); + assert!( + !good_peers.contains(&peer_pruned), + "peer with earliest_available_slot after the request slot should be excluded" + ); + assert!( + good_peers.contains(&peer_no_eas), + "peer without an advertised earliest_available_slot should be returned" + ); + assert!( + !good_peers.contains(&peer_behind), + "behind peer should be excluded regardless of earliest_available_slot" + ); + } + #[test] fn test_disable_peer_scoring() { let peer = PeerId::random(); diff --git a/beacon_node/lighthouse_network/src/peer_manager/peerdb/peer_info.rs b/beacon_node/lighthouse_network/src/peer_manager/peerdb/peer_info.rs index 8ad7d10a88..658a6355e3 100644 --- a/beacon_node/lighthouse_network/src/peer_manager/peerdb/peer_info.rs +++ b/beacon_node/lighthouse_network/src/peer_manager/peerdb/peer_info.rs @@ -15,7 +15,7 @@ use std::collections::HashSet; use std::net::IpAddr; use std::time::Instant; use strum::AsRefStr; -use types::{DataColumnSubnetId, EthSpec}; +use types::{DataColumnSubnetId, EthSpec, Slot}; /// Information about a given connected peer. #[derive(Clone, Debug, Serialize)] @@ -339,6 +339,14 @@ impl PeerInfo { ) } + /// Checks if the peer is synced or advanced, and has data available for the given slot. + pub fn is_synced_or_advanced_with_available_slot(&self, slot: Slot) -> bool { + match &self.sync_status { + SyncStatus::Synced { info } | SyncStatus::Advanced { info } => info.has_slot(slot), + SyncStatus::IrrelevantPeer | SyncStatus::Behind { .. } | SyncStatus::Unknown => false, + } + } + /// Checks if the status is connected. pub fn is_dialing(&self) -> bool { matches!(self.connection_status, PeerConnectionStatus::Dialing { .. }) diff --git a/beacon_node/lighthouse_network/src/types/globals.rs b/beacon_node/lighthouse_network/src/types/globals.rs index df8dbdc559..1f770a5847 100644 --- a/beacon_node/lighthouse_network/src/types/globals.rs +++ b/beacon_node/lighthouse_network/src/types/globals.rs @@ -11,7 +11,7 @@ use std::collections::HashSet; use std::sync::Arc; use tracing::{debug, error}; use types::data::{compute_subnets_from_custody_group, get_custody_groups}; -use types::{ChainSpec, ColumnIndex, DataColumnSubnetId, EthSpec}; +use types::{ChainSpec, ColumnIndex, DataColumnSubnetId, EthSpec, Slot}; pub struct NetworkGlobals { /// The current local ENR. @@ -196,14 +196,19 @@ impl NetworkGlobals { /// Returns a connected peer that: /// 1. is connected /// 2. assigned to custody the column based on it's `custody_subnet_count` from ENR or metadata - /// 3. has a good score - pub fn custody_peers_for_column(&self, column_index: ColumnIndex) -> Vec { + /// 3. has data available past the given slot + /// 4. has a good score + pub fn custody_peers_for_column( + &self, + column_index: ColumnIndex, + block_slot: Slot, + ) -> Vec { self.peers .read() - .good_custody_subnet_peer(DataColumnSubnetId::from_column_index( - column_index, - &self.spec, - )) + .good_custody_subnet_peer( + DataColumnSubnetId::from_column_index(column_index, &self.spec), + block_slot, + ) .cloned() .collect::>() } diff --git a/beacon_node/network/src/sync/network_context.rs b/beacon_node/network/src/sync/network_context.rs index 8e8abd4fa6..d2ced9fd9d 100644 --- a/beacon_node/network/src/sync/network_context.rs +++ b/beacon_node/network/src/sync/network_context.rs @@ -397,9 +397,9 @@ impl SyncNetworkContext { .collect() } - pub fn get_custodial_peers(&self, column_index: ColumnIndex) -> Vec { + pub fn get_custodial_peers(&self, column_index: ColumnIndex, block_slot: Slot) -> Vec { self.network_globals() - .custody_peers_for_column(column_index) + .custody_peers_for_column(column_index, block_slot) } pub fn network_globals(&self) -> &NetworkGlobals { @@ -1161,6 +1161,7 @@ impl SyncNetworkContext { let requester = CustodyRequester(id); let mut request = ActiveCustodyRequest::new( block_root, + block_slot, CustodyId { requester }, &custody_indexes_to_fetch, lookup_peers, diff --git a/beacon_node/network/src/sync/network_context/custody.rs b/beacon_node/network/src/sync/network_context/custody.rs index b1a4b52867..3518eecf09 100644 --- a/beacon_node/network/src/sync/network_context/custody.rs +++ b/beacon_node/network/src/sync/network_context/custody.rs @@ -13,7 +13,7 @@ use std::hash::{BuildHasher, RandomState}; use std::time::{Duration, Instant}; use std::{collections::HashMap, marker::PhantomData, sync::Arc}; use tracing::{Span, debug, debug_span, warn}; -use types::{DataColumnSidecar, Hash256, data::ColumnIndex}; +use types::{DataColumnSidecar, Hash256, Slot, data::ColumnIndex}; use types::{DataColumnSidecarList, EthSpec}; use super::{LookupRequestResult, PeerGroup, RpcResponseResult, SyncNetworkContext}; @@ -22,6 +22,7 @@ const MAX_STALE_NO_PEERS_DURATION: Duration = Duration::from_secs(30); pub struct ActiveCustodyRequest { block_root: Hash256, + block_slot: Slot, custody_id: CustodyId, /// List of column indices this request needs to download to complete successfully column_requests: FnvHashMap>, @@ -62,6 +63,7 @@ pub type CustodyRequestResult = Result ActiveCustodyRequest { pub(crate) fn new( block_root: Hash256, + block_slot: Slot, custody_id: CustodyId, column_indices: &[ColumnIndex], lookup_peers: Arc>>, @@ -73,6 +75,7 @@ impl ActiveCustodyRequest { ); Self { block_root, + block_slot, custody_id, column_requests: HashMap::from_iter( column_indices @@ -365,7 +368,7 @@ impl ActiveCustodyRequest { // We draw from the total set of peers, but prioritize those peers who we have // received an attestation or a block from (`lookup_peers`), as the `lookup_peers` may take // time to build up and we are likely to not find any column peers initially. - let custodial_peers = cx.get_custodial_peers(column_index); + let custodial_peers = cx.get_custodial_peers(column_index, self.block_slot); let mut prioritized_peers = custodial_peers .iter() .filter(|peer| {