From 52722b7b2ee627b76af82ee7357437b01e6ea2c0 Mon Sep 17 00:00:00 2001 From: dapplion <35266934+dapplion@users.noreply.github.com> Date: Tue, 27 May 2025 14:13:31 -0500 Subject: [PATCH] Resolve TODO(das) --- .../network/src/sync/backfill_sync/mod.rs | 3 -- .../network/src/sync/block_lookups/mod.rs | 2 +- .../network/src/sync/network_context.rs | 47 +++++++++---------- .../block_components_by_range.rs | 7 +-- .../sync/network_context/custody_by_range.rs | 27 +++++------ .../sync/network_context/custody_by_root.rs | 3 +- .../network/src/sync/range_sync/batch.rs | 13 ++--- beacon_node/network/src/sync/tests/range.rs | 7 +-- 8 files changed, 48 insertions(+), 61 deletions(-) diff --git a/beacon_node/network/src/sync/backfill_sync/mod.rs b/beacon_node/network/src/sync/backfill_sync/mod.rs index 45b9c61641..e4bf1d93ef 100644 --- a/beacon_node/network/src/sync/backfill_sync/mod.rs +++ b/beacon_node/network/src/sync/backfill_sync/mod.rs @@ -948,9 +948,6 @@ impl BackFillSync { return Ok(()); } Err(e) => match e { - // TODO(das): block_components_by_range requests can now hang out indefinitely. - // Is that fine? Maybe we should fail the requests from the network_context - // level without involving the BackfillSync itself. RpcRequestSendError::InternalError(e) => { // NOTE: under normal conditions this shouldn't happen but we handle it anyway warn!(%batch_id, error = ?e, %batch,"Could not send batch request"); diff --git a/beacon_node/network/src/sync/block_lookups/mod.rs b/beacon_node/network/src/sync/block_lookups/mod.rs index 2c59f710d0..f676068326 100644 --- a/beacon_node/network/src/sync/block_lookups/mod.rs +++ b/beacon_node/network/src/sync/block_lookups/mod.rs @@ -724,7 +724,7 @@ impl BlockLookups { // Collect all peers that sent a column that was invalid. Must // run .unique as a single peer can send multiple invalid // columns. 
Penalize once to avoid insta-bans - .flat_map(|(index, _)| peer_group.of_index((*index) as usize)) + .flat_map(|(index, _)| peer_group.of_index(&(*index as usize))) .unique() .collect(), _ => peer_group.all().collect(), diff --git a/beacon_node/network/src/sync/network_context.rs b/beacon_node/network/src/sync/network_context.rs index f4db7e2256..61f223d938 100644 --- a/beacon_node/network/src/sync/network_context.rs +++ b/beacon_node/network/src/sync/network_context.rs @@ -48,7 +48,7 @@ use tokio::sync::mpsc; use tracing::{debug, error, span, warn, Level}; use types::blob_sidecar::FixedBlobSidecarList; use types::{ - BlobSidecar, ChainSpec, ColumnIndex, DataColumnSidecar, DataColumnSidecarList, Epoch, EthSpec, + BlobSidecar, ChainSpec, ColumnIndex, DataColumnSidecar, DataColumnSidecarList, EthSpec, ForkContext, Hash256, SignedBeaconBlock, SignedBeaconBlockHeader, Slot, }; @@ -124,42 +124,41 @@ pub struct PeerGroup { /// Peers group by which indexed section of the block component they served. For example: /// - PeerA served = [blob index 0, blob index 2] /// - PeerA served = [blob index 1] - peers: HashMap>, + peers: HashMap, } impl PeerGroup { + pub fn empty() -> Self { + Self { + peers: HashMap::new(), + } + } + /// Return a peer group where a single peer returned all parts of a block component. For /// example, a block has a single component (the block = index 0/1). 
pub fn from_single(peer: PeerId) -> Self { Self { - peers: HashMap::from_iter([(peer, vec![0])]), + peers: HashMap::from_iter([(0, peer)]), } } - pub fn from_set(peers: HashMap>) -> Self { + pub fn from_set(peer_to_indices: HashMap>) -> Self { + let mut peers = HashMap::new(); + for (peer, indices) in peer_to_indices { + for index in indices { + peers.insert(index, peer); + } + } Self { peers } } pub fn all(&self) -> impl Iterator + '_ { - self.peers.keys() + self.peers.values() } - pub fn of_index(&self, index: usize) -> impl Iterator + '_ { - self.peers.iter().filter_map(move |(peer, indices)| { - if indices.contains(&index) { - Some(peer) - } else { - None - } - }) + pub fn of_index(&self, index: &usize) -> Option<&PeerId> { + self.peers.get(index) } - pub fn as_reversed_map(&self) -> HashMap { - // TODO(das): should we change PeerGroup to hold this map? - let mut index_to_peer = HashMap::::new(); - for (peer, indices) in self.peers.iter() { - for &index in indices { - index_to_peer.insert(index as u64, *peer); - } - } - index_to_peer + pub fn as_map(&self) -> &HashMap { + &self.peers } } @@ -953,7 +952,7 @@ impl SyncNetworkContext { &mut self, parent_id: ComponentsByRangeRequestId, blocks_with_data: Vec, - epoch: Epoch, + request: BlocksByRangeRequest, column_indices: Vec, lookup_peers: Arc>>, ) -> Result { @@ -970,7 +969,7 @@ impl SyncNetworkContext { let mut request = ActiveCustodyByRangeRequest::new( id, - epoch, + request, blocks_with_data, &column_indices, lookup_peers, diff --git a/beacon_node/network/src/sync/network_context/block_components_by_range.rs b/beacon_node/network/src/sync/network_context/block_components_by_range.rs index 00f64f2e39..fc08bcdb9c 100644 --- a/beacon_node/network/src/sync/network_context/block_components_by_range.rs +++ b/beacon_node/network/src/sync/network_context/block_components_by_range.rs @@ -144,7 +144,6 @@ impl BlockComponentsByRangeRequest { else { // When a peer disconnects and is removed from the SyncingChain peer 
set, if the set // reaches zero the SyncingChain is removed. - // TODO(das): add test for this. return Err(RpcRequestSendError::InternalError( "A batch peer set should never be empty".to_string(), )); @@ -270,8 +269,7 @@ impl BlockComponentsByRangeRequest { .send_custody_by_range_request( self.id, blocks_with_data, - Slot::new(*self.request.start_slot()) - .epoch(T::EthSpec::slots_per_epoch()), + self.request.clone(), column_indices, self.peers.clone(), ) @@ -309,8 +307,7 @@ impl BlockComponentsByRangeRequest { .copied() .collect(); - let peer_group = - BatchPeers::new(*block_peer, column_peers.as_reversed_map()); + let peer_group = BatchPeers::new(*block_peer, column_peers.clone()); let rpc_blocks = couple_blocks_fulu( blocks.to_vec(), columns.to_vec(), diff --git a/beacon_node/network/src/sync/network_context/custody_by_range.rs b/beacon_node/network/src/sync/network_context/custody_by_range.rs index 6b4d233188..18dea2070f 100644 --- a/beacon_node/network/src/sync/network_context/custody_by_range.rs +++ b/beacon_node/network/src/sync/network_context/custody_by_range.rs @@ -3,7 +3,7 @@ use crate::sync::network_context::RpcResponseError; use beacon_chain::validator_monitor::timestamp_now; use beacon_chain::BeaconChainTypes; use fnv::FnvHashMap; -use lighthouse_network::rpc::methods::DataColumnsByRangeRequest; +use lighthouse_network::rpc::{methods::DataColumnsByRangeRequest, BlocksByRangeRequest}; use lighthouse_network::service::api_types::{ CustodyByRangeRequestId, DataColumnsByRangeRequestId, }; @@ -16,8 +16,8 @@ use std::time::{Duration, Instant}; use std::{collections::HashMap, marker::PhantomData, sync::Arc}; use tracing::{debug, warn}; use types::{ - data_column_sidecar::ColumnIndex, DataColumnSidecar, DataColumnSidecarList, Epoch, EthSpec, - Hash256, SignedBeaconBlockHeader, Slot, + data_column_sidecar::ColumnIndex, DataColumnSidecar, DataColumnSidecarList, Hash256, + SignedBeaconBlockHeader, Slot, }; use super::{PeerGroup, RpcResponseResult, 
SyncNetworkContext}; @@ -28,8 +28,7 @@ const REQUEST_EXPIRY_SECONDS: u64 = 300; pub struct ActiveCustodyByRangeRequest { start_time: Instant, id: CustodyByRangeRequestId, - // TODO(das): Pass a better type for the by_range request - epoch: Epoch, + request: BlocksByRangeRequest, /// Blocks that we expect peers to serve data columns for blocks_with_data: Vec, /// List of column indices this request needs to download to complete successfully @@ -74,7 +73,7 @@ enum ColumnResponseError { impl ActiveCustodyByRangeRequest { pub(crate) fn new( id: CustodyByRangeRequestId, - epoch: Epoch, + request: BlocksByRangeRequest, blocks_with_data: Vec, column_indices: &[ColumnIndex], lookup_peers: Arc>>, @@ -82,7 +81,7 @@ impl ActiveCustodyByRangeRequest { Self { start_time: Instant::now(), id, - epoch, + request, blocks_with_data, column_requests: HashMap::from_iter( column_indices @@ -350,7 +349,6 @@ impl ActiveCustodyByRangeRequest { }) .collect::, _>>()? // Flatten Vec> to Vec - // TODO(das): maybe not optimal for the coupling logic later .into_iter() .flatten() .collect(); @@ -375,8 +373,9 @@ impl ActiveCustodyByRangeRequest { return Err(Error::TooManyDownloadErrors(last_error)); } - // TODO(das): When is a fork and only a subset of your peers know about a block, we should - // only query the peers on that fork. Should this case be handled? How to handle it? + // TODO(das): We should only query peers that are likely to know about this block. + // For by_range requests, only peers in the SyncingChain peer set. Else consider a + // fallback to the peers that are synced up to the epoch we want to query. 
let custodial_peers = cx.get_custodial_peers(*column_index); // We draw from the total set of peers, but prioritize those peers who we have @@ -433,12 +432,8 @@ impl ActiveCustodyByRangeRequest { .send_data_columns_by_range_request( peer_id, DataColumnsByRangeRequest { - // TODO(das): generalize with constants from batch - start_slot: self - .epoch - .start_slot(T::EthSpec::slots_per_epoch()) - .as_u64(), - count: T::EthSpec::slots_per_epoch(), + start_slot: *self.request.start_slot(), + count: *self.request.count(), columns: indices.clone(), }, self.id, diff --git a/beacon_node/network/src/sync/network_context/custody_by_root.rs b/beacon_node/network/src/sync/network_context/custody_by_root.rs index 489b9c3b11..1ca2a55a13 100644 --- a/beacon_node/network/src/sync/network_context/custody_by_root.rs +++ b/beacon_node/network/src/sync/network_context/custody_by_root.rs @@ -21,7 +21,8 @@ use super::{LookupRequestResult, PeerGroup, RpcResponseResult, SyncNetworkContex const FAILED_PEERS_CACHE_EXPIRY_SECONDS: u64 = 5; const REQUEST_EXPIRY_SECONDS: u64 = 300; -/// TODO(das): this attempt count is nested into the existing lookup request count. +/// TODO(das): Reconsider this retry count, it was chosen as a placeholder value. 
Each +/// `custody_by_*` request is already retried multiple times inside of a lookup or batch const MAX_CUSTODY_COLUMN_DOWNLOAD_ATTEMPTS: usize = 3; pub struct ActiveCustodyByRootRequest { diff --git a/beacon_node/network/src/sync/range_sync/batch.rs b/beacon_node/network/src/sync/range_sync/batch.rs index 81f33352f5..8ee9748ebc 100644 --- a/beacon_node/network/src/sync/range_sync/batch.rs +++ b/beacon_node/network/src/sync/range_sync/batch.rs @@ -1,9 +1,10 @@ +use crate::sync::network_context::PeerGroup; use beacon_chain::block_verification_types::RpcBlock; use itertools::Itertools; use lighthouse_network::rpc::methods::BlocksByRangeRequest; use lighthouse_network::service::api_types::Id; use lighthouse_network::PeerId; -use std::collections::{HashMap, HashSet}; +use std::collections::HashSet; use std::fmt; use std::hash::{Hash, Hasher}; use std::ops::Sub; @@ -22,17 +23,17 @@ const MAX_BATCH_PROCESSING_ATTEMPTS: u8 = 3; #[derive(Clone, Debug)] pub struct BatchPeers { block_peer: PeerId, - column_peers: HashMap, + column_peers: PeerGroup, } impl BatchPeers { pub fn new_from_block_peer(block_peer: PeerId) -> Self { Self { block_peer, - column_peers: <_>::default(), + column_peers: PeerGroup::empty(), } } - pub fn new(block_peer: PeerId, column_peers: HashMap) -> Self { + pub fn new(block_peer: PeerId, column_peers: PeerGroup) -> Self { Self { block_peer, column_peers, @@ -44,12 +45,12 @@ impl BatchPeers { } pub fn column(&self, index: &ColumnIndex) -> Option<&PeerId> { - self.column_peers.get(index) + self.column_peers.of_index(&((*index) as usize)) } pub fn iter_unique_peers(&self) -> impl Iterator { std::iter::once(&self.block_peer) - .chain(self.column_peers.values()) + .chain(self.column_peers.all()) .unique() } } diff --git a/beacon_node/network/src/sync/tests/range.rs b/beacon_node/network/src/sync/tests/range.rs index 1fb19e15ef..09c99d07d8 100644 --- a/beacon_node/network/src/sync/tests/range.rs +++ b/beacon_node/network/src/sync/tests/range.rs @@ -188,8 +188,6 
@@ struct CompleteConfig { } impl CompleteConfig { - // TODO(das): add tests where blocks don't have data - fn custody_failure_at_index(mut self, index: u64) -> Self { self.custody_failure_at_index = Some(index); self @@ -1192,15 +1190,14 @@ fn finalized_sync_permanent_custody_peer_failure() { // Find the requests first to assert that this is the only request that exists r.expect_no_data_columns_by_range_requests(filter().epoch(0)); - // complete this one request without the custody failure now r.complete_data_by_range_request( reqs, complete().custody_failure_at_index(column_index_to_fail), ); } - // TODO(das): send batch 1 for completing processing and check that SyncingChain processed batch - // 1 successfully + // custody_by_range request is still active waiting for a new peer to connect + r.expect_active_block_components_by_range_request_on_custody_step(); } #[test]