More comments

This commit is contained in:
dapplion
2025-05-26 18:37:20 -05:00
parent 801659d4ae
commit b383f7af53
3 changed files with 100 additions and 73 deletions

View File

@@ -507,75 +507,6 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
Ok(id.id)
}
/// Received a blocks by range or blobs by range response for a request that couples blocks '
/// and blobs.
#[allow(clippy::type_complexity)]
pub fn on_block_components_by_range_response(
&mut self,
id: ComponentsByRangeRequestId,
range_block_component: RangeBlockComponent<T::EthSpec>,
) -> Option<Result<(Vec<RpcBlock<T::EthSpec>>, BatchPeers), RpcResponseError>> {
// Note: need to remove the request to borrow self again below. Otherwise we can't
// do nested requests
let Some(mut request) = self.block_components_by_range_requests.remove(&id) else {
metrics::inc_counter_vec(
&metrics::SYNC_UNKNOWN_NETWORK_REQUESTS,
&["block_components_by_range"],
);
return None;
};
let result = match range_block_component {
RangeBlockComponent::Block(req_id, resp, peer_id) => resp.and_then(|(blocks, _)| {
request
.on_blocks_by_range_result(req_id, blocks, peer_id, self)
.map_err(Into::<RpcResponseError>::into)
}),
RangeBlockComponent::Blob(req_id, resp, peer_id) => resp.and_then(|(blobs, _)| {
request
.on_blobs_by_range_result(req_id, blobs, peer_id, self)
.map_err(Into::<RpcResponseError>::into)
}),
RangeBlockComponent::CustodyColumns(req_id, resp, peers) => {
resp.and_then(|(custody_columns, _)| {
request
.on_custody_by_range_result(req_id, custody_columns, peers, self)
.map_err(Into::<RpcResponseError>::into)
})
}
};
let result = result.transpose();
// Convert a result from internal format of `ActiveCustodyRequest` (error first to use ?) to
// an Option first to use in an `if let Some() { act on result }` block.
match result.as_ref() {
Some(Ok((blocks, peer_group))) => {
let blocks_with_data = blocks
.iter()
.filter(|block| block.as_block().has_data())
.count();
// Don't log the peer_group here, it's very long (could be up to 128 peers). If you
// want to trace which peer sent the column at index X, search for the log:
// `Sync RPC request sent method="DataColumnsByRange" ...`
debug!(
%id,
blocks = blocks.len(),
blocks_with_data,
block_peer = ?peer_group.block(),
"Block components by range request success, removing"
)
}
Some(Err(e)) => {
debug!(%id, error = ?e, "Block components by range request failure, removing" )
}
None => {
self.block_components_by_range_requests.insert(id, request);
}
}
result
}
/// Request block of `block_root` if necessary by checking:
/// - If the da_checker has a pending block from gossip or a previous request
///
@@ -1220,6 +1151,8 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
// Request handlers
/// Processes a single `RpcEvent` blocks_by_root RPC request.
/// Same logic as [`on_blocks_by_range_response`] but it converts a `Vec<Block>` into a `Block`
pub(crate) fn on_single_block_response(
&mut self,
id: SingleLookupReqId,
@@ -1242,6 +1175,8 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
self.on_rpc_response_result(id, "BlocksByRoot", resp, peer_id, |_| 1)
}
/// Processes a single `RpcEvent` blobs_by_root RPC request.
/// Same logic as [`on_blocks_by_range_response`]
pub(crate) fn on_single_blob_response(
&mut self,
id: SingleLookupReqId,
@@ -1271,6 +1206,8 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
self.on_rpc_response_result(id, "BlobsByRoot", resp, peer_id, |_| 1)
}
/// Processes a single `RpcEvent` for a data_columns_by_root RPC request.
/// Same logic as [`on_blocks_by_range_response`]
#[allow(clippy::type_complexity)]
pub(crate) fn on_data_columns_by_root_response(
&mut self,
@@ -1284,6 +1221,10 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
self.on_rpc_response_result(id, "DataColumnsByRoot", resp, peer_id, |_| 1)
}
/// Processes a single `RpcEvent` for a blocks_by_range RPC request.
/// - If the event completes the request, it returns `Some(Ok)` with a vec of blocks
/// - If the event is an error it fails the request and returns `Some(Err)`
/// - else it appends the response chunk to the active request state and returns `None`
#[allow(clippy::type_complexity)]
pub(crate) fn on_blocks_by_range_response(
&mut self,
@@ -1295,6 +1236,8 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
self.on_rpc_response_result(id, "BlocksByRange", resp, peer_id, |b| b.len())
}
/// Processes a single `RpcEvent` for a blobs_by_range RPC request.
/// Same logic as [`on_blocks_by_range_response`]
#[allow(clippy::type_complexity)]
pub(crate) fn on_blobs_by_range_response(
&mut self,
@@ -1306,6 +1249,8 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
self.on_rpc_response_result(id, "BlobsByRangeRequest", resp, peer_id, |b| b.len())
}
/// Processes a single `RpcEvent` for a data_columns_by_range RPC request.
/// Same logic as [`on_blocks_by_range_response`]
#[allow(clippy::type_complexity)]
pub(crate) fn on_data_columns_by_range_response(
&mut self,
@@ -1319,6 +1264,8 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
self.on_rpc_response_result(id, "DataColumnsByRange", resp, peer_id, |d| d.len())
}
/// Common logic for `on_*_response` handlers. Ensures we have consistent logging and metrics
/// and peer reporting for all request types.
fn on_rpc_response_result<I: std::fmt::Display, R, F: FnOnce(&R) -> usize>(
&mut self,
id: I,
@@ -1475,6 +1422,79 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
result
}
/// Processes the result of an `*_by_range` RPC request issued by a
/// block_components_by_range_request.
///
/// - If the result completes the request, it returns `Some(Ok)` with a vec of coupled RpcBlocks
/// - If the result fails the request, it returns `Some(Err)`. Note that a failed request may
/// not fail the block_components_by_range_request as it implements retries.
/// - else it appends the result to the active request state and returns `None`
#[allow(clippy::type_complexity)]
pub fn on_block_components_by_range_response(
&mut self,
id: ComponentsByRangeRequestId,
range_block_component: RangeBlockComponent<T::EthSpec>,
) -> Option<Result<(Vec<RpcBlock<T::EthSpec>>, BatchPeers), RpcResponseError>> {
// Note: need to remove the request to borrow self again below. Otherwise we can't
// do nested requests
let Some(mut request) = self.block_components_by_range_requests.remove(&id) else {
metrics::inc_counter_vec(
&metrics::SYNC_UNKNOWN_NETWORK_REQUESTS,
&["block_components_by_range"],
);
return None;
};
let result = match range_block_component {
RangeBlockComponent::Block(req_id, resp, peer_id) => resp.and_then(|(blocks, _)| {
request
.on_blocks_by_range_result(req_id, blocks, peer_id, self)
.map_err(Into::<RpcResponseError>::into)
}),
RangeBlockComponent::Blob(req_id, resp, peer_id) => resp.and_then(|(blobs, _)| {
request
.on_blobs_by_range_result(req_id, blobs, peer_id, self)
.map_err(Into::<RpcResponseError>::into)
}),
RangeBlockComponent::CustodyColumns(req_id, resp, peers) => {
resp.and_then(|(custody_columns, _)| {
request
.on_custody_by_range_result(req_id, custody_columns, peers, self)
.map_err(Into::<RpcResponseError>::into)
})
}
}
// Convert a result from internal format of `ActiveCustodyRequest` (error first to use ?) to
// an Option first to use in an `if let Some() { act on result }` block.
.transpose();
match result.as_ref() {
Some(Ok((blocks, peer_group))) => {
let blocks_with_data = blocks
.iter()
.filter(|block| block.as_block().has_data())
.count();
// Don't log the peer_group here, it's very long (could be up to 128 peers). If you
// want to trace which peer sent the column at index X, search for the log:
// `Sync RPC request sent method="DataColumnsByRange" ...`
debug!(
%id,
blocks = blocks.len(),
blocks_with_data,
block_peer = ?peer_group.block(),
"Block components by range request success, removing"
)
}
Some(Err(e)) => {
debug!(%id, error = ?e, "Block components by range request failure, removing" )
}
None => {
self.block_components_by_range_requests.insert(id, request);
}
}
result
}
pub fn send_block_for_processing(
&self,
id: Id,