From c8a0c9e37932d67ee74da255a796d367725ea93b Mon Sep 17 00:00:00 2001 From: dapplion <35266934+dapplion@users.noreply.github.com> Date: Mon, 26 May 2025 19:04:50 -0500 Subject: [PATCH] Remove CustodyByRoot and CustodyByRange types --- beacon_node/network/src/sync/manager.rs | 7 +- .../network/src/sync/network_context.rs | 68 +++++++++++-------- 2 files changed, 41 insertions(+), 34 deletions(-) diff --git a/beacon_node/network/src/sync/manager.rs b/beacon_node/network/src/sync/manager.rs index adcc177b8b..b5e936d7e8 100644 --- a/beacon_node/network/src/sync/manager.rs +++ b/beacon_node/network/src/sync/manager.rs @@ -36,8 +36,7 @@ use super::backfill_sync::{BackFillSync, ProcessResult, SyncStart}; use super::block_lookups::BlockLookups; use super::network_context::{ - CustodyByRangeResult, CustodyByRootResult, RangeBlockComponent, RangeRequestId, RpcEvent, - SyncNetworkContext, + CustodyRequestResult, RangeBlockComponent, RangeRequestId, RpcEvent, SyncNetworkContext, }; use super::peer_sampling::{Sampling, SamplingConfig, SamplingResult}; use super::peer_sync_info::{remote_sync_type, PeerSyncType}; @@ -1236,7 +1235,7 @@ impl SyncManager { fn on_custody_by_range_result( &mut self, id: CustodyByRangeRequestId, - result: CustodyByRangeResult, + result: CustodyRequestResult, ) { // TODO(das): Improve the type of RangeBlockComponent::CustodyColumns, not // not have to pass a PeerGroup in case of error @@ -1259,7 +1258,7 @@ impl SyncManager { fn on_custody_by_root_result( &mut self, requester: CustodyRequester, - response: CustodyByRootResult, + response: CustodyRequestResult, ) { self.block_lookups .on_download_response::>( diff --git a/beacon_node/network/src/sync/network_context.rs b/beacon_node/network/src/sync/network_context.rs index 458ff755d2..ce2f91a391 100644 --- a/beacon_node/network/src/sync/network_context.rs +++ b/beacon_node/network/src/sync/network_context.rs @@ -1,8 +1,8 @@ //! Provides network functionality for the Syncing thread. This fundamentally wraps a network //! channel and stores a global RPC ID to perform requests. -use self::custody_by_range::{ActiveCustodyByRangeRequest, CustodyByRangeRequestResult}; -use self::custody_by_root::{ActiveCustodyByRootRequest, CustodyByRootRequestResult}; +use self::custody_by_range::ActiveCustodyByRangeRequest; +use self::custody_by_root::ActiveCustodyByRootRequest; pub use self::requests::{BlocksByRootSingleRequest, DataColumnsByRootSingleBlockRequest}; use super::manager::BlockProcessType; use super::range_sync::BatchPeers; @@ -75,12 +75,12 @@ impl RpcEvent { pub type RpcResponseResult = Result<(T, Duration), RpcResponseError>; +/// Duration = latest seen timestamp of all received data columns pub type RpcResponseBatchResult = Result<(T, PeerGroup, Duration), RpcResponseError>; -/// Duration = latest seen timestamp of all received data columns -pub type CustodyByRootResult = RpcResponseBatchResult>; - -pub type CustodyByRangeResult = RpcResponseBatchResult>; +/// Common result type for `custody_by_root` and `custody_by_range` requests. The peers are part of +/// the `Ok` response since they are not known until the entire request succeeds. +pub type CustodyRequestResult = RpcResponseBatchResult>; #[derive(Debug, Clone)] pub enum RpcResponseError { @@ -1102,7 +1102,7 @@ impl SyncNetworkContext { /// attempt. pub fn continue_custody_by_root_requests( &mut self, - ) -> Vec<(CustodyRequester, CustodyByRootResult)> { + ) -> Vec<(CustodyRequester, CustodyRequestResult)> { let ids = self .custody_by_root_requests .keys() @@ -1116,7 +1116,10 @@ impl SyncNetworkContext { .custody_by_root_requests .remove(&id) .expect("key of hashmap"); - let result = request.continue_requests(self); + let result = request + .continue_requests(self) + .map_err(Into::::into) + .transpose(); self.handle_custody_by_root_result(id, request, result) .map(|result| (id, result)) }) @@ -1128,7 +1131,7 @@ impl SyncNetworkContext { /// attempt. pub fn continue_custody_by_range_requests( &mut self, - ) -> Vec<(CustodyByRangeRequestId, CustodyByRangeResult)> { + ) -> Vec<(CustodyByRangeRequestId, CustodyRequestResult)> { let ids = self .custody_by_range_requests .keys() @@ -1142,7 +1145,10 @@ impl SyncNetworkContext { .custody_by_range_requests .remove(&id) .expect("key of hashmap"); - let result = request.continue_requests(self); + let result = request + .continue_requests(self) + .map_err(Into::::into) + .transpose(); self.handle_custody_by_range_result(id, request, result) .map(|result| (id, result)) }) @@ -1313,7 +1319,7 @@ impl SyncNetworkContext { req_id: DataColumnsByRootRequestId, peer_id: PeerId, resp: RpcResponseResult>>>, - ) -> Option> { + ) -> Option> { let span = span!( Level::INFO, "SyncNetworkContext", @@ -1331,7 +1337,10 @@ impl SyncNetworkContext { return None; }; - let result = request.on_data_column_downloaded(peer_id, req_id, resp, self); + let result = request + .on_data_column_downloaded(peer_id, req_id, resp, self) + .map_err(Into::::into) + .transpose(); self.handle_custody_by_root_result(id.requester, request, result) } @@ -1340,8 +1349,8 @@ impl SyncNetworkContext { &mut self, id: CustodyRequester, request: ActiveCustodyByRootRequest, - result: CustodyByRootRequestResult, - ) -> Option> { + result: Option>, + ) -> Option> { let span = span!( Level::INFO, "SyncNetworkContext", @@ -1350,19 +1359,17 @@ impl SyncNetworkContext { let _enter = span.enter(); match &result { - Ok(Some((columns, peer_group, _))) => { + Some(Ok((columns, peer_group, _))) => { debug!(%id, count = columns.len(), peers = ?peer_group, "Custody by root request success, removing") } - Err(e) => { + Some(Err(e)) => { debug!(%id, error = ?e, "Custody by root request failure, removing") } - Ok(None) => { + None => { self.custody_by_root_requests.insert(id, request); } } - // Convert a result from internal format of `ActiveCustodyRequest` (error first to use ?) to - // an Option first to use in an `if let Some() { act on result }` block. - result.map_err(Into::::into).transpose() + result } /// Insert a downloaded column into an active custody request. Then make progress on the @@ -1379,7 +1386,7 @@ impl SyncNetworkContext { req_id: DataColumnsByRangeRequestId, peer_id: PeerId, resp: RpcResponseResult>>>, - ) -> Option> { + ) -> Option> { // Note: need to remove the request to borrow self again below. Otherwise we can't // do nested requests let Some(mut request) = self.custody_by_range_requests.remove(&id) else { @@ -1390,7 +1397,10 @@ impl SyncNetworkContext { return None; }; - let result = request.on_data_column_downloaded(peer_id, req_id, resp, self); + let result = request + .on_data_column_downloaded(peer_id, req_id, resp, self) + .map_err(Into::::into) + .transpose(); self.handle_custody_by_range_result(id, request, result) } @@ -1399,25 +1409,23 @@ impl SyncNetworkContext { &mut self, id: CustodyByRangeRequestId, request: ActiveCustodyByRangeRequest, - result: CustodyByRangeRequestResult, - ) -> Option> { + result: Option>, + ) -> Option> { match &result { - Ok(Some((columns, _peer_group, _))) => { + Some(Ok((columns, _peer_group, _))) => { // Don't log the peer_group here, it's very long (could be up to 128 peers). If you // want to trace which peer sent the column at index X, search for the log: // `Sync RPC request sent method="DataColumnsByRange" ...` debug!(%id, count = columns.len(), "Custody by range request success, removing") } - Err(e) => { + Some(Err(e)) => { debug!(%id, error = ?e, "Custody by range request failure, removing") } - Ok(None) => { + None => { self.custody_by_range_requests.insert(id, request); } } - // Convert a result from internal format of `ActiveCustodyRequest` (error first to use ?) to - // an Option first to use in an `if let Some() { act on result }` block. - result.map_err(Into::::into).transpose() + result } /// Processes the result of an `*_by_range` RPC request issued by a