Resolve TODO(das)

This commit is contained in:
dapplion
2025-05-27 14:13:31 -05:00
parent 86ad87eced
commit 52722b7b2e
8 changed files with 48 additions and 61 deletions

View File

@@ -948,9 +948,6 @@ impl<T: BeaconChainTypes> BackFillSync<T> {
return Ok(());
}
Err(e) => match e {
// TODO(das): block_components_by_range requests can now hang out indefinitely.
// Is that fine? Maybe we should fail the requests from the network_context
// level without involving the BackfillSync itself.
RpcRequestSendError::InternalError(e) => {
// NOTE: under normal conditions this shouldn't happen but we handle it anyway
warn!(%batch_id, error = ?e, %batch,"Could not send batch request");

View File

@@ -724,7 +724,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
// Collect all peers that sent a column that was invalid. Must
// run .unique as a single peer can send multiple invalid
// columns. Penalize once to avoid insta-bans
.flat_map(|(index, _)| peer_group.of_index((*index) as usize))
.flat_map(|(index, _)| peer_group.of_index(&(*index as usize)))
.unique()
.collect(),
_ => peer_group.all().collect(),

View File

@@ -48,7 +48,7 @@ use tokio::sync::mpsc;
use tracing::{debug, error, span, warn, Level};
use types::blob_sidecar::FixedBlobSidecarList;
use types::{
BlobSidecar, ChainSpec, ColumnIndex, DataColumnSidecar, DataColumnSidecarList, Epoch, EthSpec,
BlobSidecar, ChainSpec, ColumnIndex, DataColumnSidecar, DataColumnSidecarList, EthSpec,
ForkContext, Hash256, SignedBeaconBlock, SignedBeaconBlockHeader, Slot,
};
@@ -124,42 +124,41 @@ pub struct PeerGroup {
/// Peers grouped by which indexed section of the block component they served. For example:
/// - PeerA served = [blob index 0, blob index 2]
/// - PeerB served = [blob index 1]
peers: HashMap<PeerId, Vec<usize>>,
peers: HashMap<usize, PeerId>,
}
impl PeerGroup {
pub fn empty() -> Self {
Self {
peers: HashMap::new(),
}
}
/// Return a peer group where a single peer returned all parts of a block component. For
/// example, a block has a single component (the block = index 0/1).
pub fn from_single(peer: PeerId) -> Self {
Self {
peers: HashMap::from_iter([(peer, vec![0])]),
peers: HashMap::from_iter([(0, peer)]),
}
}
pub fn from_set(peers: HashMap<PeerId, Vec<usize>>) -> Self {
pub fn from_set(peer_to_indices: HashMap<PeerId, Vec<usize>>) -> Self {
let mut peers = HashMap::new();
for (peer, indices) in peer_to_indices {
for index in indices {
peers.insert(index, peer);
}
}
Self { peers }
}
pub fn all(&self) -> impl Iterator<Item = &PeerId> + '_ {
self.peers.keys()
self.peers.values()
}
pub fn of_index(&self, index: usize) -> impl Iterator<Item = &PeerId> + '_ {
self.peers.iter().filter_map(move |(peer, indices)| {
if indices.contains(&index) {
Some(peer)
} else {
None
}
})
pub fn of_index(&self, index: &usize) -> Option<&PeerId> {
self.peers.get(index)
}
pub fn as_reversed_map(&self) -> HashMap<u64, PeerId> {
// TODO(das): should we change PeerGroup to hold this map?
let mut index_to_peer = HashMap::<u64, PeerId>::new();
for (peer, indices) in self.peers.iter() {
for &index in indices {
index_to_peer.insert(index as u64, *peer);
}
}
index_to_peer
pub fn as_map(&self) -> &HashMap<usize, PeerId> {
&self.peers
}
}
@@ -953,7 +952,7 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
&mut self,
parent_id: ComponentsByRangeRequestId,
blocks_with_data: Vec<SignedBeaconBlockHeader>,
epoch: Epoch,
request: BlocksByRangeRequest,
column_indices: Vec<ColumnIndex>,
lookup_peers: Arc<RwLock<HashSet<PeerId>>>,
) -> Result<CustodyByRangeRequestId, RpcRequestSendError> {
@@ -970,7 +969,7 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
let mut request = ActiveCustodyByRangeRequest::new(
id,
epoch,
request,
blocks_with_data,
&column_indices,
lookup_peers,

View File

@@ -144,7 +144,6 @@ impl<T: BeaconChainTypes> BlockComponentsByRangeRequest<T> {
else {
// When a peer disconnects and is removed from the SyncingChain peer set, if the set
// reaches zero the SyncingChain is removed.
// TODO(das): add test for this.
return Err(RpcRequestSendError::InternalError(
"A batch peer set should never be empty".to_string(),
));
@@ -270,8 +269,7 @@ impl<T: BeaconChainTypes> BlockComponentsByRangeRequest<T> {
.send_custody_by_range_request(
self.id,
blocks_with_data,
Slot::new(*self.request.start_slot())
.epoch(T::EthSpec::slots_per_epoch()),
self.request.clone(),
column_indices,
self.peers.clone(),
)
@@ -309,8 +307,7 @@ impl<T: BeaconChainTypes> BlockComponentsByRangeRequest<T> {
.copied()
.collect();
let peer_group =
BatchPeers::new(*block_peer, column_peers.as_reversed_map());
let peer_group = BatchPeers::new(*block_peer, column_peers.clone());
let rpc_blocks = couple_blocks_fulu(
blocks.to_vec(),
columns.to_vec(),

View File

@@ -3,7 +3,7 @@ use crate::sync::network_context::RpcResponseError;
use beacon_chain::validator_monitor::timestamp_now;
use beacon_chain::BeaconChainTypes;
use fnv::FnvHashMap;
use lighthouse_network::rpc::methods::DataColumnsByRangeRequest;
use lighthouse_network::rpc::{methods::DataColumnsByRangeRequest, BlocksByRangeRequest};
use lighthouse_network::service::api_types::{
CustodyByRangeRequestId, DataColumnsByRangeRequestId,
};
@@ -16,8 +16,8 @@ use std::time::{Duration, Instant};
use std::{collections::HashMap, marker::PhantomData, sync::Arc};
use tracing::{debug, warn};
use types::{
data_column_sidecar::ColumnIndex, DataColumnSidecar, DataColumnSidecarList, Epoch, EthSpec,
Hash256, SignedBeaconBlockHeader, Slot,
data_column_sidecar::ColumnIndex, DataColumnSidecar, DataColumnSidecarList, Hash256,
SignedBeaconBlockHeader, Slot,
};
use super::{PeerGroup, RpcResponseResult, SyncNetworkContext};
@@ -28,8 +28,7 @@ const REQUEST_EXPIRY_SECONDS: u64 = 300;
pub struct ActiveCustodyByRangeRequest<T: BeaconChainTypes> {
start_time: Instant,
id: CustodyByRangeRequestId,
// TODO(das): Pass a better type for the by_range request
epoch: Epoch,
request: BlocksByRangeRequest,
/// Blocks that we expect peers to serve data columns for
blocks_with_data: Vec<SignedBeaconBlockHeader>,
/// List of column indices this request needs to download to complete successfully
@@ -74,7 +73,7 @@ enum ColumnResponseError {
impl<T: BeaconChainTypes> ActiveCustodyByRangeRequest<T> {
pub(crate) fn new(
id: CustodyByRangeRequestId,
epoch: Epoch,
request: BlocksByRangeRequest,
blocks_with_data: Vec<SignedBeaconBlockHeader>,
column_indices: &[ColumnIndex],
lookup_peers: Arc<RwLock<HashSet<PeerId>>>,
@@ -82,7 +81,7 @@ impl<T: BeaconChainTypes> ActiveCustodyByRangeRequest<T> {
Self {
start_time: Instant::now(),
id,
epoch,
request,
blocks_with_data,
column_requests: HashMap::from_iter(
column_indices
@@ -350,7 +349,6 @@ impl<T: BeaconChainTypes> ActiveCustodyByRangeRequest<T> {
})
.collect::<Result<Vec<_>, _>>()?
// Flatten Vec<Vec<Columns>> to Vec<Columns>
// TODO(das): maybe not optimal for the coupling logic later
.into_iter()
.flatten()
.collect();
@@ -375,8 +373,9 @@ impl<T: BeaconChainTypes> ActiveCustodyByRangeRequest<T> {
return Err(Error::TooManyDownloadErrors(last_error));
}
// TODO(das): When there is a fork and only a subset of your peers know about a block, we should
// only query the peers on that fork. Should this case be handled? How to handle it?
// TODO(das): We should only query peers that are likely to know about this block.
// For by_range requests, only peers in the SyncingChain peer set. Else consider a
// fallback to the peers that are synced up to the epoch we want to query.
let custodial_peers = cx.get_custodial_peers(*column_index);
// We draw from the total set of peers, but prioritize those peers who we have
@@ -433,12 +432,8 @@ impl<T: BeaconChainTypes> ActiveCustodyByRangeRequest<T> {
.send_data_columns_by_range_request(
peer_id,
DataColumnsByRangeRequest {
// TODO(das): generalize with constants from batch
start_slot: self
.epoch
.start_slot(T::EthSpec::slots_per_epoch())
.as_u64(),
count: T::EthSpec::slots_per_epoch(),
start_slot: *self.request.start_slot(),
count: *self.request.count(),
columns: indices.clone(),
},
self.id,

View File

@@ -21,7 +21,8 @@ use super::{LookupRequestResult, PeerGroup, RpcResponseResult, SyncNetworkContex
const FAILED_PEERS_CACHE_EXPIRY_SECONDS: u64 = 5;
const REQUEST_EXPIRY_SECONDS: u64 = 300;
/// TODO(das): this attempt count is nested into the existing lookup request count.
/// TODO(das): Reconsider this retry count, it was chosen as a placeholder value. Each
/// `custody_by_*` request is already retried multiple times inside of a lookup or batch
const MAX_CUSTODY_COLUMN_DOWNLOAD_ATTEMPTS: usize = 3;
pub struct ActiveCustodyByRootRequest<T: BeaconChainTypes> {

View File

@@ -1,9 +1,10 @@
use crate::sync::network_context::PeerGroup;
use beacon_chain::block_verification_types::RpcBlock;
use itertools::Itertools;
use lighthouse_network::rpc::methods::BlocksByRangeRequest;
use lighthouse_network::service::api_types::Id;
use lighthouse_network::PeerId;
use std::collections::{HashMap, HashSet};
use std::collections::HashSet;
use std::fmt;
use std::hash::{Hash, Hasher};
use std::ops::Sub;
@@ -22,17 +23,17 @@ const MAX_BATCH_PROCESSING_ATTEMPTS: u8 = 3;
#[derive(Clone, Debug)]
pub struct BatchPeers {
block_peer: PeerId,
column_peers: HashMap<ColumnIndex, PeerId>,
column_peers: PeerGroup,
}
impl BatchPeers {
pub fn new_from_block_peer(block_peer: PeerId) -> Self {
Self {
block_peer,
column_peers: <_>::default(),
column_peers: PeerGroup::empty(),
}
}
pub fn new(block_peer: PeerId, column_peers: HashMap<ColumnIndex, PeerId>) -> Self {
pub fn new(block_peer: PeerId, column_peers: PeerGroup) -> Self {
Self {
block_peer,
column_peers,
@@ -44,12 +45,12 @@ impl BatchPeers {
}
pub fn column(&self, index: &ColumnIndex) -> Option<&PeerId> {
self.column_peers.get(index)
self.column_peers.of_index(&((*index) as usize))
}
pub fn iter_unique_peers(&self) -> impl Iterator<Item = &PeerId> {
std::iter::once(&self.block_peer)
.chain(self.column_peers.values())
.chain(self.column_peers.all())
.unique()
}
}

View File

@@ -188,8 +188,6 @@ struct CompleteConfig {
}
impl CompleteConfig {
// TODO(das): add tests where blocks don't have data
fn custody_failure_at_index(mut self, index: u64) -> Self {
self.custody_failure_at_index = Some(index);
self
@@ -1192,15 +1190,14 @@ fn finalized_sync_permanent_custody_peer_failure() {
// Find the requests first to assert that this is the only request that exists
r.expect_no_data_columns_by_range_requests(filter().epoch(0));
// complete this one request without the custody failure now
r.complete_data_by_range_request(
reqs,
complete().custody_failure_at_index(column_index_to_fail),
);
}
// TODO(das): send batch 1 for completing processing and check that SyncingChain processed batch
// 1 successfully
// custody_by_range request is still active waiting for a new peer to connect
r.expect_active_block_components_by_range_request_on_custody_step();
}
#[test]