From b383f7af536329ef99989fe3390b83659d47339c Mon Sep 17 00:00:00 2001 From: dapplion <35266934+dapplion@users.noreply.github.com> Date: Mon, 26 May 2025 18:37:20 -0500 Subject: [PATCH] More comments --- beacon_node/network/src/sync/manager.rs | 4 +- .../network/src/sync/network_context.rs | 158 ++++++++++-------- .../block_components_by_range.rs | 11 +- 3 files changed, 100 insertions(+), 73 deletions(-) diff --git a/beacon_node/network/src/sync/manager.rs b/beacon_node/network/src/sync/manager.rs index 0cf17c7b89..adcc177b8b 100644 --- a/beacon_node/network/src/sync/manager.rs +++ b/beacon_node/network/src/sync/manager.rs @@ -1301,11 +1301,11 @@ impl SyncManager { range_request_id: ComponentsByRangeRequestId, range_block_component: RangeBlockComponent, ) { - if let Some(resp) = self + if let Some(result) = self .network .on_block_components_by_range_response(range_request_id, range_block_component) { - match resp { + match result { Ok((blocks, batch_peers)) => { match range_request_id.requester { RangeRequestId::RangeSync { chain_id, batch_id } => { diff --git a/beacon_node/network/src/sync/network_context.rs b/beacon_node/network/src/sync/network_context.rs index d7ad9d3eb7..3bc8192437 100644 --- a/beacon_node/network/src/sync/network_context.rs +++ b/beacon_node/network/src/sync/network_context.rs @@ -507,75 +507,6 @@ impl SyncNetworkContext { Ok(id.id) } - /// Received a blocks by range or blobs by range response for a request that couples blocks ' - /// and blobs. - #[allow(clippy::type_complexity)] - pub fn on_block_components_by_range_response( - &mut self, - id: ComponentsByRangeRequestId, - range_block_component: RangeBlockComponent, - ) -> Option>, BatchPeers), RpcResponseError>> { - // Note: need to remove the request to borrow self again below. 
Otherwise we can't - // do nested requests - let Some(mut request) = self.block_components_by_range_requests.remove(&id) else { - metrics::inc_counter_vec( - &metrics::SYNC_UNKNOWN_NETWORK_REQUESTS, - &["block_components_by_range"], - ); - return None; - }; - - let result = match range_block_component { - RangeBlockComponent::Block(req_id, resp, peer_id) => resp.and_then(|(blocks, _)| { - request - .on_blocks_by_range_result(req_id, blocks, peer_id, self) - .map_err(Into::::into) - }), - RangeBlockComponent::Blob(req_id, resp, peer_id) => resp.and_then(|(blobs, _)| { - request - .on_blobs_by_range_result(req_id, blobs, peer_id, self) - .map_err(Into::::into) - }), - RangeBlockComponent::CustodyColumns(req_id, resp, peers) => { - resp.and_then(|(custody_columns, _)| { - request - .on_custody_by_range_result(req_id, custody_columns, peers, self) - .map_err(Into::::into) - }) - } - }; - - let result = result.transpose(); - - // Convert a result from internal format of `ActiveCustodyRequest` (error first to use ?) to - // an Option first to use in an `if let Some() { act on result }` block. - match result.as_ref() { - Some(Ok((blocks, peer_group))) => { - let blocks_with_data = blocks - .iter() - .filter(|block| block.as_block().has_data()) - .count(); - // Don't log the peer_group here, it's very long (could be up to 128 peers). 
If you - // want to trace which peer sent the column at index X, search for the log: - // `Sync RPC request sent method="DataColumnsByRange" ...` - debug!( - %id, - blocks = blocks.len(), - blocks_with_data, - block_peer = ?peer_group.block(), - "Block components by range request success, removing" - ) - } - Some(Err(e)) => { - debug!(%id, error = ?e, "Block components by range request failure, removing" ) - } - None => { - self.block_components_by_range_requests.insert(id, request); - } - } - result - } - /// Request block of `block_root` if necessary by checking: /// - If the da_checker has a pending block from gossip or a previous request /// @@ -1220,6 +1151,8 @@ impl SyncNetworkContext { // Request handlers + /// Processes a single `RpcEvent` blocks_by_root RPC request. + /// Same logic as [`on_blocks_by_range_response`] but it converts a `Vec` into a `Block` pub(crate) fn on_single_block_response( &mut self, id: SingleLookupReqId, @@ -1242,6 +1175,8 @@ impl SyncNetworkContext { self.on_rpc_response_result(id, "BlocksByRoot", resp, peer_id, |_| 1) } + /// Processes a single `RpcEvent` blobs_by_root RPC request. + /// Same logic as [`on_blocks_by_range_response`] pub(crate) fn on_single_blob_response( &mut self, id: SingleLookupReqId, @@ -1271,6 +1206,8 @@ impl SyncNetworkContext { self.on_rpc_response_result(id, "BlobsByRoot", resp, peer_id, |_| 1) } + /// Processes a single `RpcEvent` for a data_columns_by_root RPC request. + /// Same logic as [`on_blocks_by_range_response`] #[allow(clippy::type_complexity)] pub(crate) fn on_data_columns_by_root_response( &mut self, @@ -1284,6 +1221,10 @@ impl SyncNetworkContext { self.on_rpc_response_result(id, "DataColumnsByRoot", resp, peer_id, |_| 1) } + /// Processes a single `RpcEvent` for a blocks_by_range RPC request. 
+ /// - If the event completes the request, it returns `Some(Ok)` with a vec of blocks + /// - If the event is an error it fails the request and returns `Some(Err)` + /// - else it appends the response chunk to the active request state and returns `None` #[allow(clippy::type_complexity)] pub(crate) fn on_blocks_by_range_response( &mut self, @@ -1295,6 +1236,8 @@ impl SyncNetworkContext { self.on_rpc_response_result(id, "BlocksByRange", resp, peer_id, |b| b.len()) } + /// Processes a single `RpcEvent` for a blobs_by_range RPC request. + /// Same logic as [`on_blocks_by_range_response`] #[allow(clippy::type_complexity)] pub(crate) fn on_blobs_by_range_response( &mut self, @@ -1306,6 +1249,8 @@ impl SyncNetworkContext { self.on_rpc_response_result(id, "BlobsByRangeRequest", resp, peer_id, |b| b.len()) } + /// Processes a single `RpcEvent` for a data_columns_by_range RPC request. + /// Same logic as [`on_blocks_by_range_response`] #[allow(clippy::type_complexity)] pub(crate) fn on_data_columns_by_range_response( &mut self, @@ -1319,6 +1264,8 @@ impl SyncNetworkContext { self.on_rpc_response_result(id, "DataColumnsByRange", resp, peer_id, |d| d.len()) } + /// Common logic for `on_*_response` handlers. Ensures we have consistent logging and metrics + /// and peer reporting for all request types. fn on_rpc_response_result usize>( &mut self, id: I, @@ -1475,6 +1422,79 @@ impl SyncNetworkContext { result } + /// Processes the result of an `*_by_range` RPC request issued by a + /// block_components_by_range_request. + /// + /// - If the result completes the request, it returns `Some(Ok)` with a vec of coupled RpcBlocks + /// - If the result fails the request, it returns `Some(Err)`. Note that a failed request may + /// not fail the block_components_by_range_request as it implements retries. 
+ /// - else it appends the result to the active request state and returns `None` + #[allow(clippy::type_complexity)] + pub fn on_block_components_by_range_response( + &mut self, + id: ComponentsByRangeRequestId, + range_block_component: RangeBlockComponent, + ) -> Option>, BatchPeers), RpcResponseError>> { + // Note: need to remove the request to borrow self again below. Otherwise we can't + // do nested requests + let Some(mut request) = self.block_components_by_range_requests.remove(&id) else { + metrics::inc_counter_vec( + &metrics::SYNC_UNKNOWN_NETWORK_REQUESTS, + &["block_components_by_range"], + ); + return None; + }; + + let result = match range_block_component { + RangeBlockComponent::Block(req_id, resp, peer_id) => resp.and_then(|(blocks, _)| { + request + .on_blocks_by_range_result(req_id, blocks, peer_id, self) + .map_err(Into::::into) + }), + RangeBlockComponent::Blob(req_id, resp, peer_id) => resp.and_then(|(blobs, _)| { + request + .on_blobs_by_range_result(req_id, blobs, peer_id, self) + .map_err(Into::::into) + }), + RangeBlockComponent::CustodyColumns(req_id, resp, peers) => { + resp.and_then(|(custody_columns, _)| { + request + .on_custody_by_range_result(req_id, custody_columns, peers, self) + .map_err(Into::::into) + }) + } + } + // Convert a result from internal format of `ActiveCustodyRequest` (error first to use ?) to + // an Option first to use in an `if let Some() { act on result }` block. + .transpose(); + + match result.as_ref() { + Some(Ok((blocks, peer_group))) => { + let blocks_with_data = blocks + .iter() + .filter(|block| block.as_block().has_data()) + .count(); + // Don't log the peer_group here, it's very long (could be up to 128 peers). 
If you + // want to trace which peer sent the column at index X, search for the log: + // `Sync RPC request sent method="DataColumnsByRange" ...` + debug!( + %id, + blocks = blocks.len(), + blocks_with_data, + block_peer = ?peer_group.block(), + "Block components by range request success, removing" + ) + } + Some(Err(e)) => { + debug!(%id, error = ?e, "Block components by range request failure, removing" ) + } + None => { + self.block_components_by_range_requests.insert(id, request); + } + } + result + } + pub fn send_block_for_processing( &self, id: Id, diff --git a/beacon_node/network/src/sync/network_context/block_components_by_range.rs b/beacon_node/network/src/sync/network_context/block_components_by_range.rs index 45e5091665..7c8e59eb97 100644 --- a/beacon_node/network/src/sync/network_context/block_components_by_range.rs +++ b/beacon_node/network/src/sync/network_context/block_components_by_range.rs @@ -19,6 +19,10 @@ use types::{ SignedBeaconBlock, Slot, }; +/// Given a `BlocksByRangeRequest` (a range of slots) fetches all necessary data to return +/// potentially available RpcBlocks. +/// +/// See [`State`] for the set of `*_by_range` it may issue depending on the fork. pub struct BlockComponentsByRangeRequest { id: ComponentsByRangeRequestId, peers: Arc>>, @@ -31,13 +35,16 @@ enum State { blocks_by_range_request: ByRangeRequest>>>, }, - // Two single concurrent requests for block + blobs + // Two single concurrent requests for block + blobs. As of now we request blocks and blobs from + // the same peer, so we can attribute coupling errors to the same unique peer. DenebEnabled { blocks_by_range_request: ByRangeRequest>>>, blobs_by_range_request: ByRangeRequest>>>, }, - // Request blocks first, then columns + // Request blocks first, then columns. Assuming the block peer is honest we can attribute + // custody failures to the peers serving us columns. 
We want to get rid of the honest block + // peer assumption in the future, see https://github.com/sigp/lighthouse/issues/6258 FuluEnabled(FuluEnabledState), }