Make single block lookup respect earliest_available_slot for column requests (#9447)

Single block lookups do not respect the `earliest_available_slot` peers sent. This causes us to potentially request columns from peers that do not custody columns yet (but will soon).


  Pass down the block's slot and only consider peers where `earliest_available_slot <= block_slot` for custody column requests.


Co-Authored-By: Daniel Knopik <daniel@dknopik.de>
This commit is contained in:
Daniel Knopik
2026-06-17 01:54:00 +02:00
committed by GitHub
parent e0ff3b5709
commit 9de2e9e6e1
5 changed files with 121 additions and 33 deletions

View File

@@ -257,17 +257,9 @@ impl<E: EthSpec> PeerDB<E> {
.iter() .iter()
.filter(move |(_, info)| { .filter(move |(_, info)| {
info.is_connected() info.is_connected()
&& match info.sync_status() { && info.is_synced_or_advanced_with_available_slot(
SyncStatus::Synced { info } => { epoch.start_slot(E::slots_per_epoch()),
info.has_slot(epoch.start_slot(E::slots_per_epoch())) )
}
SyncStatus::Advanced { info } => {
info.has_slot(epoch.start_slot(E::slots_per_epoch()))
}
SyncStatus::IrrelevantPeer
| SyncStatus::Behind { .. }
| SyncStatus::Unknown => false,
}
}) })
.map(|(peer_id, _)| peer_id) .map(|(peer_id, _)| peer_id)
} }
@@ -301,10 +293,11 @@ impl<E: EthSpec> PeerDB<E> {
} }
/// Returns an iterator of all good gossipsub peers that are supposed to be custodying /// Returns an iterator of all good gossipsub peers that are supposed to be custodying
/// the given subnet id. /// the given subnet id, with data available at the given slot.
pub fn good_custody_subnet_peer( pub fn good_custody_subnet_peer(
&self, &self,
subnet: DataColumnSubnetId, subnet: DataColumnSubnetId,
slot: Slot,
) -> impl Iterator<Item = &PeerId> { ) -> impl Iterator<Item = &PeerId> {
self.peers self.peers
.iter() .iter()
@@ -314,7 +307,7 @@ impl<E: EthSpec> PeerDB<E> {
info.is_connected() info.is_connected()
&& info.is_good_gossipsub_peer() && info.is_good_gossipsub_peer()
&& is_custody_subnet_peer && is_custody_subnet_peer
&& info.is_synced_or_advanced() && info.is_synced_or_advanced_with_available_slot(slot)
}) })
.map(|(peer_id, _)| peer_id) .map(|(peer_id, _)| peer_id)
} }
@@ -330,14 +323,9 @@ impl<E: EthSpec> PeerDB<E> {
let good_sync_peers_for_epoch = self.peers.values().filter(|&info| { let good_sync_peers_for_epoch = self.peers.values().filter(|&info| {
info.is_connected() info.is_connected()
&& match info.sync_status() { && info.is_synced_or_advanced_with_available_slot(
SyncStatus::Synced { info } | SyncStatus::Advanced { info } => { epoch.start_slot(E::slots_per_epoch()),
info.has_slot(epoch.start_slot(E::slots_per_epoch())) )
}
SyncStatus::IrrelevantPeer
| SyncStatus::Behind { .. }
| SyncStatus::Unknown => false,
}
}); });
for info in good_sync_peers_for_epoch { for info in good_sync_peers_for_epoch {
@@ -2211,6 +2199,89 @@ mod tests {
); );
} }
#[test]
fn test_good_custody_subnet_peer_respects_earliest_available_slot() {
let mut pdb = get_db();
let subnet = DataColumnSubnetId::new(0);
let request_slot = Slot::new(10);
fn sync_info(earliest_available_slot: Option<Slot>) -> SyncInfo {
SyncInfo {
head_slot: Slot::new(100),
head_root: Hash256::ZERO,
finalized_epoch: Epoch::new(0),
finalized_root: Hash256::ZERO,
earliest_available_slot,
}
}
let add_custody_peer = |pdb: &mut PeerDB<M>, sync_status: SyncStatus| {
let peer_id = PeerId::random();
pdb.connect_ingoing(&peer_id, "/ip4/0.0.0.0".parse().unwrap(), None);
pdb.__set_custody_subnets(&peer_id, HashSet::from([subnet]))
.unwrap();
pdb.update_sync_status(&peer_id, sync_status);
peer_id
};
let peer_with_data = add_custody_peer(
&mut pdb,
SyncStatus::Synced {
info: sync_info(Some(Slot::new(5))),
},
);
let peer_at_boundary = add_custody_peer(
&mut pdb,
SyncStatus::Synced {
info: sync_info(Some(request_slot)),
},
);
let peer_pruned = add_custody_peer(
&mut pdb,
SyncStatus::Synced {
info: sync_info(Some(Slot::new(11))),
},
);
let peer_no_eas = add_custody_peer(
&mut pdb,
SyncStatus::Synced {
info: sync_info(None),
},
);
let peer_behind = add_custody_peer(
&mut pdb,
SyncStatus::Behind {
info: sync_info(Some(Slot::new(0))),
},
);
let good_peers = pdb
.good_custody_subnet_peer(subnet, request_slot)
.copied()
.collect::<HashSet<_>>();
assert!(
good_peers.contains(&peer_with_data),
"peer with earliest_available_slot before the request slot should be returned"
);
assert!(
good_peers.contains(&peer_at_boundary),
"peer with earliest_available_slot equal to the request slot should be returned"
);
assert!(
!good_peers.contains(&peer_pruned),
"peer with earliest_available_slot after the request slot should be excluded"
);
assert!(
good_peers.contains(&peer_no_eas),
"peer without an advertised earliest_available_slot should be returned"
);
assert!(
!good_peers.contains(&peer_behind),
"behind peer should be excluded regardless of earliest_available_slot"
);
}
#[test] #[test]
fn test_disable_peer_scoring() { fn test_disable_peer_scoring() {
let peer = PeerId::random(); let peer = PeerId::random();

View File

@@ -15,7 +15,7 @@ use std::collections::HashSet;
use std::net::IpAddr; use std::net::IpAddr;
use std::time::Instant; use std::time::Instant;
use strum::AsRefStr; use strum::AsRefStr;
use types::{DataColumnSubnetId, EthSpec}; use types::{DataColumnSubnetId, EthSpec, Slot};
/// Information about a given connected peer. /// Information about a given connected peer.
#[derive(Clone, Debug, Serialize)] #[derive(Clone, Debug, Serialize)]
@@ -339,6 +339,14 @@ impl<E: EthSpec> PeerInfo<E> {
) )
} }
/// Checks if the peer is synced or advanced, and has data available for the given slot.
pub fn is_synced_or_advanced_with_available_slot(&self, slot: Slot) -> bool {
match &self.sync_status {
SyncStatus::Synced { info } | SyncStatus::Advanced { info } => info.has_slot(slot),
SyncStatus::IrrelevantPeer | SyncStatus::Behind { .. } | SyncStatus::Unknown => false,
}
}
/// Checks if the status is connected. /// Checks if the status is connected.
pub fn is_dialing(&self) -> bool { pub fn is_dialing(&self) -> bool {
matches!(self.connection_status, PeerConnectionStatus::Dialing { .. }) matches!(self.connection_status, PeerConnectionStatus::Dialing { .. })

View File

@@ -11,7 +11,7 @@ use std::collections::HashSet;
use std::sync::Arc; use std::sync::Arc;
use tracing::{debug, error}; use tracing::{debug, error};
use types::data::{compute_subnets_from_custody_group, get_custody_groups}; use types::data::{compute_subnets_from_custody_group, get_custody_groups};
use types::{ChainSpec, ColumnIndex, DataColumnSubnetId, EthSpec}; use types::{ChainSpec, ColumnIndex, DataColumnSubnetId, EthSpec, Slot};
pub struct NetworkGlobals<E: EthSpec> { pub struct NetworkGlobals<E: EthSpec> {
/// The current local ENR. /// The current local ENR.
@@ -196,14 +196,19 @@ impl<E: EthSpec> NetworkGlobals<E> {
/// Returns a connected peer that: /// Returns a connected peer that:
/// 1. is connected /// 1. is connected
/// 2. assigned to custody the column based on it's `custody_subnet_count` from ENR or metadata /// 2. assigned to custody the column based on it's `custody_subnet_count` from ENR or metadata
/// 3. has a good score /// 3. has data available past the given slot
pub fn custody_peers_for_column(&self, column_index: ColumnIndex) -> Vec<PeerId> { /// 4. has a good score
pub fn custody_peers_for_column(
&self,
column_index: ColumnIndex,
block_slot: Slot,
) -> Vec<PeerId> {
self.peers self.peers
.read() .read()
.good_custody_subnet_peer(DataColumnSubnetId::from_column_index( .good_custody_subnet_peer(
column_index, DataColumnSubnetId::from_column_index(column_index, &self.spec),
&self.spec, block_slot,
)) )
.cloned() .cloned()
.collect::<Vec<_>>() .collect::<Vec<_>>()
} }

View File

@@ -397,9 +397,9 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
.collect() .collect()
} }
pub fn get_custodial_peers(&self, column_index: ColumnIndex) -> Vec<PeerId> { pub fn get_custodial_peers(&self, column_index: ColumnIndex, block_slot: Slot) -> Vec<PeerId> {
self.network_globals() self.network_globals()
.custody_peers_for_column(column_index) .custody_peers_for_column(column_index, block_slot)
} }
pub fn network_globals(&self) -> &NetworkGlobals<T::EthSpec> { pub fn network_globals(&self) -> &NetworkGlobals<T::EthSpec> {
@@ -1161,6 +1161,7 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
let requester = CustodyRequester(id); let requester = CustodyRequester(id);
let mut request = ActiveCustodyRequest::new( let mut request = ActiveCustodyRequest::new(
block_root, block_root,
block_slot,
CustodyId { requester }, CustodyId { requester },
&custody_indexes_to_fetch, &custody_indexes_to_fetch,
lookup_peers, lookup_peers,

View File

@@ -13,7 +13,7 @@ use std::hash::{BuildHasher, RandomState};
use std::time::{Duration, Instant}; use std::time::{Duration, Instant};
use std::{collections::HashMap, marker::PhantomData, sync::Arc}; use std::{collections::HashMap, marker::PhantomData, sync::Arc};
use tracing::{Span, debug, debug_span, warn}; use tracing::{Span, debug, debug_span, warn};
use types::{DataColumnSidecar, Hash256, data::ColumnIndex}; use types::{DataColumnSidecar, Hash256, Slot, data::ColumnIndex};
use types::{DataColumnSidecarList, EthSpec}; use types::{DataColumnSidecarList, EthSpec};
use super::{LookupRequestResult, PeerGroup, RpcResponseResult, SyncNetworkContext}; use super::{LookupRequestResult, PeerGroup, RpcResponseResult, SyncNetworkContext};
@@ -22,6 +22,7 @@ const MAX_STALE_NO_PEERS_DURATION: Duration = Duration::from_secs(30);
pub struct ActiveCustodyRequest<T: BeaconChainTypes> { pub struct ActiveCustodyRequest<T: BeaconChainTypes> {
block_root: Hash256, block_root: Hash256,
block_slot: Slot,
custody_id: CustodyId, custody_id: CustodyId,
/// List of column indices this request needs to download to complete successfully /// List of column indices this request needs to download to complete successfully
column_requests: FnvHashMap<ColumnIndex, ColumnRequest<T::EthSpec>>, column_requests: FnvHashMap<ColumnIndex, ColumnRequest<T::EthSpec>>,
@@ -62,6 +63,7 @@ pub type CustodyRequestResult<E> = Result<Option<DownloadResult<DataColumnSideca
impl<T: BeaconChainTypes> ActiveCustodyRequest<T> { impl<T: BeaconChainTypes> ActiveCustodyRequest<T> {
pub(crate) fn new( pub(crate) fn new(
block_root: Hash256, block_root: Hash256,
block_slot: Slot,
custody_id: CustodyId, custody_id: CustodyId,
column_indices: &[ColumnIndex], column_indices: &[ColumnIndex],
lookup_peers: Arc<RwLock<HashSet<PeerId>>>, lookup_peers: Arc<RwLock<HashSet<PeerId>>>,
@@ -73,6 +75,7 @@ impl<T: BeaconChainTypes> ActiveCustodyRequest<T> {
); );
Self { Self {
block_root, block_root,
block_slot,
custody_id, custody_id,
column_requests: HashMap::from_iter( column_requests: HashMap::from_iter(
column_indices column_indices
@@ -365,7 +368,7 @@ impl<T: BeaconChainTypes> ActiveCustodyRequest<T> {
// We draw from the total set of peers, but prioritize those peers who we have // We draw from the total set of peers, but prioritize those peers who we have
// received an attestation or a block from (`lookup_peers`), as the `lookup_peers` may take // received an attestation or a block from (`lookup_peers`), as the `lookup_peers` may take
// time to build up and we are likely to not find any column peers initially. // time to build up and we are likely to not find any column peers initially.
let custodial_peers = cx.get_custodial_peers(column_index); let custodial_peers = cx.get_custodial_peers(column_index, self.block_slot);
let mut prioritized_peers = custodial_peers let mut prioritized_peers = custodial_peers
.iter() .iter()
.filter(|peer| { .filter(|peer| {