mirror of
https://github.com/sigp/lighthouse.git
synced 2026-05-07 00:42:42 +00:00
Sync active request byrange ids logs (#6914)
- Re-opened PR from https://github.com/sigp/lighthouse/pull/6869 Writing and running tests I noted that the sync RPC requests are very verbose now. `DataColumnsByRootRequestId { id: 123, requester: Custody(CustodyId { requester: CustodyRequester(SingleLookupReqId { req_id: 121, lookup_id: 101 }) }) }` Since this Id is logged rather often I believe there's value in 1. Making them more succinct for log verbosity 2. Make them a string that's easy to copy and work with elastic Write custom `Display` implementations to render Ids in a more DX format _ DataColumnsByRootRequestId with a block lookup_ ``` 123/Custody/121/Lookup/101 ``` _DataColumnsByRangeRequestId_ ``` 123/122/RangeSync/0/5492900659401505034 ``` - This one will be shorter after https://github.com/sigp/lighthouse/pull/6868 Also made the logs format and text consistent across all methods
This commit is contained in:
@@ -1,15 +1,14 @@
|
|||||||
use std::sync::Arc;
|
|
||||||
|
|
||||||
use libp2p::swarm::ConnectionId;
|
|
||||||
use types::{
|
|
||||||
BlobSidecar, DataColumnSidecar, Epoch, EthSpec, Hash256, LightClientBootstrap,
|
|
||||||
LightClientFinalityUpdate, LightClientOptimisticUpdate, LightClientUpdate, SignedBeaconBlock,
|
|
||||||
};
|
|
||||||
|
|
||||||
use crate::rpc::{
|
use crate::rpc::{
|
||||||
methods::{ResponseTermination, RpcResponse, RpcSuccessResponse, StatusMessage},
|
methods::{ResponseTermination, RpcResponse, RpcSuccessResponse, StatusMessage},
|
||||||
SubstreamId,
|
SubstreamId,
|
||||||
};
|
};
|
||||||
|
use libp2p::swarm::ConnectionId;
|
||||||
|
use std::fmt::{Display, Formatter};
|
||||||
|
use std::sync::Arc;
|
||||||
|
use types::{
|
||||||
|
BlobSidecar, DataColumnSidecar, Epoch, EthSpec, Hash256, LightClientBootstrap,
|
||||||
|
LightClientFinalityUpdate, LightClientOptimisticUpdate, LightClientUpdate, SignedBeaconBlock,
|
||||||
|
};
|
||||||
|
|
||||||
/// Identifier of requests sent by a peer.
|
/// Identifier of requests sent by a peer.
|
||||||
pub type PeerRequestId = (ConnectionId, SubstreamId);
|
pub type PeerRequestId = (ConnectionId, SubstreamId);
|
||||||
@@ -235,9 +234,108 @@ impl slog::Value for RequestId {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// This custom impl reduces log boilerplate not printing `DataColumnsByRootRequestId` on each id log
|
macro_rules! impl_display {
|
||||||
impl std::fmt::Display for DataColumnsByRootRequestId {
|
($structname: ty, $format: literal, $($field:ident),*) => {
|
||||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
impl Display for $structname {
|
||||||
write!(f, "{} {:?}", self.id, self.requester)
|
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
|
||||||
|
write!(f, $format, $(self.$field,)*)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
// Since each request Id is deeply nested with various types, if rendered with Debug on logs they
|
||||||
|
// take too much visual space. This custom Display implementations make the overall Id short while
|
||||||
|
// not losing information
|
||||||
|
impl_display!(BlocksByRangeRequestId, "{}/{}", id, parent_request_id);
|
||||||
|
impl_display!(BlobsByRangeRequestId, "{}/{}", id, parent_request_id);
|
||||||
|
impl_display!(DataColumnsByRangeRequestId, "{}/{}", id, parent_request_id);
|
||||||
|
impl_display!(ComponentsByRangeRequestId, "{}/{}", id, requester);
|
||||||
|
impl_display!(DataColumnsByRootRequestId, "{}/{}", id, requester);
|
||||||
|
impl_display!(SingleLookupReqId, "{}/Lookup/{}", req_id, lookup_id);
|
||||||
|
impl_display!(CustodyId, "{}", requester);
|
||||||
|
impl_display!(SamplingId, "{}/{}", sampling_request_id, id);
|
||||||
|
|
||||||
|
impl Display for DataColumnsByRootRequester {
|
||||||
|
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
|
||||||
|
match self {
|
||||||
|
Self::Custody(id) => write!(f, "Custody/{id}"),
|
||||||
|
Self::Sampling(id) => write!(f, "Sampling/{id}"),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Display for CustodyRequester {
|
||||||
|
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
|
||||||
|
write!(f, "{}", self.0)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Display for RangeRequestId {
|
||||||
|
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
|
||||||
|
match self {
|
||||||
|
Self::RangeSync { chain_id, batch_id } => write!(f, "RangeSync/{batch_id}/{chain_id}"),
|
||||||
|
Self::BackfillSync { batch_id } => write!(f, "BackfillSync/{batch_id}"),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Display for SamplingRequestId {
|
||||||
|
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
|
||||||
|
write!(f, "{}", self.0)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Display for SamplingRequester {
|
||||||
|
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
|
||||||
|
match self {
|
||||||
|
Self::ImportedBlock(block) => write!(f, "ImportedBlock/{block}"),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod tests {
|
||||||
|
use super::*;
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn display_id_data_columns_by_root_custody() {
|
||||||
|
let id = DataColumnsByRootRequestId {
|
||||||
|
id: 123,
|
||||||
|
requester: DataColumnsByRootRequester::Custody(CustodyId {
|
||||||
|
requester: CustodyRequester(SingleLookupReqId {
|
||||||
|
req_id: 121,
|
||||||
|
lookup_id: 101,
|
||||||
|
}),
|
||||||
|
}),
|
||||||
|
};
|
||||||
|
assert_eq!(format!("{id}"), "123/Custody/121/Lookup/101");
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn display_id_data_columns_by_root_sampling() {
|
||||||
|
let id = DataColumnsByRootRequestId {
|
||||||
|
id: 123,
|
||||||
|
requester: DataColumnsByRootRequester::Sampling(SamplingId {
|
||||||
|
id: SamplingRequester::ImportedBlock(Hash256::ZERO),
|
||||||
|
sampling_request_id: SamplingRequestId(101),
|
||||||
|
}),
|
||||||
|
};
|
||||||
|
assert_eq!(format!("{id}"), "123/Sampling/101/ImportedBlock/0x0000000000000000000000000000000000000000000000000000000000000000");
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn display_id_data_columns_by_range() {
|
||||||
|
let id = DataColumnsByRangeRequestId {
|
||||||
|
id: 123,
|
||||||
|
parent_request_id: ComponentsByRangeRequestId {
|
||||||
|
id: 122,
|
||||||
|
requester: RangeRequestId::RangeSync {
|
||||||
|
chain_id: 54,
|
||||||
|
batch_id: Epoch::new(0),
|
||||||
|
},
|
||||||
|
},
|
||||||
|
};
|
||||||
|
assert_eq!(format!("{id}"), "123/122/RangeSync/0/54");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -37,6 +37,7 @@ use requests::{
|
|||||||
use slog::{debug, error, warn};
|
use slog::{debug, error, warn};
|
||||||
use std::collections::hash_map::Entry;
|
use std::collections::hash_map::Entry;
|
||||||
use std::collections::{HashMap, HashSet};
|
use std::collections::{HashMap, HashSet};
|
||||||
|
use std::fmt::Debug;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
use tokio::sync::mpsc;
|
use tokio::sync::mpsc;
|
||||||
@@ -535,17 +536,10 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
let req_id = self.next_id();
|
let id = SingleLookupReqId {
|
||||||
let id = SingleLookupReqId { lookup_id, req_id };
|
lookup_id,
|
||||||
|
req_id: self.next_id(),
|
||||||
debug!(
|
};
|
||||||
self.log,
|
|
||||||
"Sending BlocksByRoot Request";
|
|
||||||
"method" => "BlocksByRoot",
|
|
||||||
"block_root" => ?block_root,
|
|
||||||
"peer" => %peer_id,
|
|
||||||
"id" => ?id
|
|
||||||
);
|
|
||||||
|
|
||||||
let request = BlocksByRootSingleRequest(block_root);
|
let request = BlocksByRootSingleRequest(block_root);
|
||||||
|
|
||||||
@@ -563,6 +557,15 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
|
|||||||
})
|
})
|
||||||
.map_err(|_| RpcRequestSendError::NetworkSendError)?;
|
.map_err(|_| RpcRequestSendError::NetworkSendError)?;
|
||||||
|
|
||||||
|
debug!(
|
||||||
|
self.log,
|
||||||
|
"Sync RPC request sent";
|
||||||
|
"method" => "BlocksByRoot",
|
||||||
|
"block_root" => ?block_root,
|
||||||
|
"peer" => %peer_id,
|
||||||
|
"id" => %id
|
||||||
|
);
|
||||||
|
|
||||||
self.blocks_by_root_requests.insert(
|
self.blocks_by_root_requests.insert(
|
||||||
id,
|
id,
|
||||||
peer_id,
|
peer_id,
|
||||||
@@ -572,7 +575,7 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
|
|||||||
BlocksByRootRequestItems::new(request),
|
BlocksByRootRequestItems::new(request),
|
||||||
);
|
);
|
||||||
|
|
||||||
Ok(LookupRequestResult::RequestSent(req_id))
|
Ok(LookupRequestResult::RequestSent(id.req_id))
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Request necessary blobs for `block_root`. Requests only the necessary blobs by checking:
|
/// Request necessary blobs for `block_root`. Requests only the necessary blobs by checking:
|
||||||
@@ -618,22 +621,14 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
|
|||||||
return Ok(LookupRequestResult::NoRequestNeeded("no indices to fetch"));
|
return Ok(LookupRequestResult::NoRequestNeeded("no indices to fetch"));
|
||||||
}
|
}
|
||||||
|
|
||||||
let req_id = self.next_id();
|
let id = SingleLookupReqId {
|
||||||
let id = SingleLookupReqId { lookup_id, req_id };
|
lookup_id,
|
||||||
|
req_id: self.next_id(),
|
||||||
debug!(
|
};
|
||||||
self.log,
|
|
||||||
"Sending BlobsByRoot Request";
|
|
||||||
"method" => "BlobsByRoot",
|
|
||||||
"block_root" => ?block_root,
|
|
||||||
"blob_indices" => ?indices,
|
|
||||||
"peer" => %peer_id,
|
|
||||||
"id" => ?id
|
|
||||||
);
|
|
||||||
|
|
||||||
let request = BlobsByRootSingleBlockRequest {
|
let request = BlobsByRootSingleBlockRequest {
|
||||||
block_root,
|
block_root,
|
||||||
indices,
|
indices: indices.clone(),
|
||||||
};
|
};
|
||||||
|
|
||||||
// Lookup sync event safety: Refer to `Self::block_lookup_request` `network_send.send` call
|
// Lookup sync event safety: Refer to `Self::block_lookup_request` `network_send.send` call
|
||||||
@@ -645,6 +640,16 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
|
|||||||
})
|
})
|
||||||
.map_err(|_| RpcRequestSendError::NetworkSendError)?;
|
.map_err(|_| RpcRequestSendError::NetworkSendError)?;
|
||||||
|
|
||||||
|
debug!(
|
||||||
|
self.log,
|
||||||
|
"Sync RPC request sent";
|
||||||
|
"method" => "BlobsByRoot",
|
||||||
|
"block_root" => ?block_root,
|
||||||
|
"blob_indices" => ?indices,
|
||||||
|
"peer" => %peer_id,
|
||||||
|
"id" => %id
|
||||||
|
);
|
||||||
|
|
||||||
self.blobs_by_root_requests.insert(
|
self.blobs_by_root_requests.insert(
|
||||||
id,
|
id,
|
||||||
peer_id,
|
peer_id,
|
||||||
@@ -655,7 +660,7 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
|
|||||||
BlobsByRootRequestItems::new(request),
|
BlobsByRootRequestItems::new(request),
|
||||||
);
|
);
|
||||||
|
|
||||||
Ok(LookupRequestResult::RequestSent(req_id))
|
Ok(LookupRequestResult::RequestSent(id.req_id))
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Request to send a single `data_columns_by_root` request to the network.
|
/// Request to send a single `data_columns_by_root` request to the network.
|
||||||
@@ -666,35 +671,35 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
|
|||||||
request: DataColumnsByRootSingleBlockRequest,
|
request: DataColumnsByRootSingleBlockRequest,
|
||||||
expect_max_responses: bool,
|
expect_max_responses: bool,
|
||||||
) -> Result<LookupRequestResult<DataColumnsByRootRequestId>, &'static str> {
|
) -> Result<LookupRequestResult<DataColumnsByRootRequestId>, &'static str> {
|
||||||
let req_id = DataColumnsByRootRequestId {
|
let id = DataColumnsByRootRequestId {
|
||||||
id: self.next_id(),
|
id: self.next_id(),
|
||||||
requester,
|
requester,
|
||||||
};
|
};
|
||||||
debug!(
|
|
||||||
self.log,
|
|
||||||
"Sending DataColumnsByRoot Request";
|
|
||||||
"method" => "DataColumnsByRoot",
|
|
||||||
"block_root" => ?request.block_root,
|
|
||||||
"indices" => ?request.indices,
|
|
||||||
"peer" => %peer_id,
|
|
||||||
"requester" => ?requester,
|
|
||||||
"req_id" => %req_id,
|
|
||||||
);
|
|
||||||
|
|
||||||
self.send_network_msg(NetworkMessage::SendRequest {
|
self.send_network_msg(NetworkMessage::SendRequest {
|
||||||
peer_id,
|
peer_id,
|
||||||
request: RequestType::DataColumnsByRoot(request.clone().into_request(&self.chain.spec)),
|
request: RequestType::DataColumnsByRoot(request.clone().into_request(&self.chain.spec)),
|
||||||
request_id: AppRequestId::Sync(SyncRequestId::DataColumnsByRoot(req_id)),
|
request_id: AppRequestId::Sync(SyncRequestId::DataColumnsByRoot(id)),
|
||||||
})?;
|
})?;
|
||||||
|
|
||||||
|
debug!(
|
||||||
|
self.log,
|
||||||
|
"Sync RPC request sent";
|
||||||
|
"method" => "DataColumnsByRoot",
|
||||||
|
"block_root" => ?request.block_root,
|
||||||
|
"indices" => ?request.indices,
|
||||||
|
"peer" => %peer_id,
|
||||||
|
"id" => %id,
|
||||||
|
);
|
||||||
|
|
||||||
self.data_columns_by_root_requests.insert(
|
self.data_columns_by_root_requests.insert(
|
||||||
req_id,
|
id,
|
||||||
peer_id,
|
peer_id,
|
||||||
expect_max_responses,
|
expect_max_responses,
|
||||||
DataColumnsByRootRequestItems::new(request),
|
DataColumnsByRootRequestItems::new(request),
|
||||||
);
|
);
|
||||||
|
|
||||||
Ok(LookupRequestResult::RequestSent(req_id))
|
Ok(LookupRequestResult::RequestSent(id))
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Request to fetch all needed custody columns of a specific block. This function may not send
|
/// Request to fetch all needed custody columns of a specific block. This function may not send
|
||||||
@@ -727,15 +732,17 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
|
|||||||
return Ok(LookupRequestResult::NoRequestNeeded("no indices to fetch"));
|
return Ok(LookupRequestResult::NoRequestNeeded("no indices to fetch"));
|
||||||
}
|
}
|
||||||
|
|
||||||
let req_id = self.next_id();
|
let id = SingleLookupReqId {
|
||||||
let id = SingleLookupReqId { lookup_id, req_id };
|
lookup_id,
|
||||||
|
req_id: self.next_id(),
|
||||||
|
};
|
||||||
|
|
||||||
debug!(
|
debug!(
|
||||||
self.log,
|
self.log,
|
||||||
"Starting custody columns request";
|
"Starting custody columns request";
|
||||||
"block_root" => ?block_root,
|
"block_root" => ?block_root,
|
||||||
"indices" => ?custody_indexes_to_fetch,
|
"indices" => ?custody_indexes_to_fetch,
|
||||||
"id" => ?id
|
"id" => %id
|
||||||
);
|
);
|
||||||
|
|
||||||
let requester = CustodyRequester(id);
|
let requester = CustodyRequester(id);
|
||||||
@@ -754,7 +761,7 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
|
|||||||
// created cannot return data immediately, it must send some request to the network
|
// created cannot return data immediately, it must send some request to the network
|
||||||
// first. And there must exist some request, `custody_indexes_to_fetch` is not empty.
|
// first. And there must exist some request, `custody_indexes_to_fetch` is not empty.
|
||||||
self.custody_by_root_requests.insert(requester, request);
|
self.custody_by_root_requests.insert(requester, request);
|
||||||
Ok(LookupRequestResult::RequestSent(req_id))
|
Ok(LookupRequestResult::RequestSent(id.req_id))
|
||||||
}
|
}
|
||||||
Err(e) => Err(RpcRequestSendError::CustodyRequestError(e)),
|
Err(e) => Err(RpcRequestSendError::CustodyRequestError(e)),
|
||||||
}
|
}
|
||||||
@@ -770,15 +777,6 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
|
|||||||
id: self.next_id(),
|
id: self.next_id(),
|
||||||
parent_request_id,
|
parent_request_id,
|
||||||
};
|
};
|
||||||
debug!(
|
|
||||||
self.log,
|
|
||||||
"Sending BlocksByRange request";
|
|
||||||
"method" => "BlocksByRange",
|
|
||||||
"count" => request.count(),
|
|
||||||
"epoch" => Slot::new(*request.start_slot()).epoch(T::EthSpec::slots_per_epoch()),
|
|
||||||
"peer" => %peer_id,
|
|
||||||
"id" => ?id,
|
|
||||||
);
|
|
||||||
self.network_send
|
self.network_send
|
||||||
.send(NetworkMessage::SendRequest {
|
.send(NetworkMessage::SendRequest {
|
||||||
peer_id,
|
peer_id,
|
||||||
@@ -787,6 +785,16 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
|
|||||||
})
|
})
|
||||||
.map_err(|_| RpcRequestSendError::NetworkSendError)?;
|
.map_err(|_| RpcRequestSendError::NetworkSendError)?;
|
||||||
|
|
||||||
|
debug!(
|
||||||
|
self.log,
|
||||||
|
"Sync RPC request sent";
|
||||||
|
"method" => "BlocksByRange",
|
||||||
|
"slots" => request.count(),
|
||||||
|
"epoch" => Slot::new(*request.start_slot()).epoch(T::EthSpec::slots_per_epoch()),
|
||||||
|
"peer" => %peer_id,
|
||||||
|
"id" => %id,
|
||||||
|
);
|
||||||
|
|
||||||
self.blocks_by_range_requests.insert(
|
self.blocks_by_range_requests.insert(
|
||||||
id,
|
id,
|
||||||
peer_id,
|
peer_id,
|
||||||
@@ -809,15 +817,6 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
|
|||||||
parent_request_id,
|
parent_request_id,
|
||||||
};
|
};
|
||||||
let request_epoch = Slot::new(request.start_slot).epoch(T::EthSpec::slots_per_epoch());
|
let request_epoch = Slot::new(request.start_slot).epoch(T::EthSpec::slots_per_epoch());
|
||||||
debug!(
|
|
||||||
self.log,
|
|
||||||
"Sending BlobsByRange requests";
|
|
||||||
"method" => "BlobsByRange",
|
|
||||||
"count" => request.count,
|
|
||||||
"epoch" => request_epoch,
|
|
||||||
"peer" => %peer_id,
|
|
||||||
"id" => ?id,
|
|
||||||
);
|
|
||||||
|
|
||||||
// Create the blob request based on the blocks request.
|
// Create the blob request based on the blocks request.
|
||||||
self.network_send
|
self.network_send
|
||||||
@@ -828,6 +827,16 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
|
|||||||
})
|
})
|
||||||
.map_err(|_| RpcRequestSendError::NetworkSendError)?;
|
.map_err(|_| RpcRequestSendError::NetworkSendError)?;
|
||||||
|
|
||||||
|
debug!(
|
||||||
|
self.log,
|
||||||
|
"Sync RPC request sent";
|
||||||
|
"method" => "BlobsByRange",
|
||||||
|
"slots" => request.count,
|
||||||
|
"epoch" => request_epoch,
|
||||||
|
"peer" => %peer_id,
|
||||||
|
"id" => %id,
|
||||||
|
);
|
||||||
|
|
||||||
let max_blobs_per_block = self.chain.spec.max_blobs_per_block(request_epoch);
|
let max_blobs_per_block = self.chain.spec.max_blobs_per_block(request_epoch);
|
||||||
self.blobs_by_range_requests.insert(
|
self.blobs_by_range_requests.insert(
|
||||||
id,
|
id,
|
||||||
@@ -850,16 +859,6 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
|
|||||||
id: self.next_id(),
|
id: self.next_id(),
|
||||||
parent_request_id,
|
parent_request_id,
|
||||||
};
|
};
|
||||||
debug!(
|
|
||||||
self.log,
|
|
||||||
"Sending DataColumnsByRange requests";
|
|
||||||
"method" => "DataColumnsByRange",
|
|
||||||
"count" => request.count,
|
|
||||||
"epoch" => Slot::new(request.start_slot).epoch(T::EthSpec::slots_per_epoch()),
|
|
||||||
"columns" => ?request.columns,
|
|
||||||
"peer" => %peer_id,
|
|
||||||
"id" => ?id,
|
|
||||||
);
|
|
||||||
|
|
||||||
self.send_network_msg(NetworkMessage::SendRequest {
|
self.send_network_msg(NetworkMessage::SendRequest {
|
||||||
peer_id,
|
peer_id,
|
||||||
@@ -868,6 +867,17 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
|
|||||||
})
|
})
|
||||||
.map_err(|_| RpcRequestSendError::NetworkSendError)?;
|
.map_err(|_| RpcRequestSendError::NetworkSendError)?;
|
||||||
|
|
||||||
|
debug!(
|
||||||
|
self.log,
|
||||||
|
"Sync RPC request sent";
|
||||||
|
"method" => "DataColumnsByRange",
|
||||||
|
"slots" => request.count,
|
||||||
|
"epoch" => Slot::new(request.start_slot).epoch(T::EthSpec::slots_per_epoch()),
|
||||||
|
"columns" => ?request.columns,
|
||||||
|
"peer" => %peer_id,
|
||||||
|
"id" => %id,
|
||||||
|
);
|
||||||
|
|
||||||
self.data_columns_by_range_requests.insert(
|
self.data_columns_by_range_requests.insert(
|
||||||
id,
|
id,
|
||||||
peer_id,
|
peer_id,
|
||||||
@@ -1011,8 +1021,8 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
|
|||||||
peer_id: PeerId,
|
peer_id: PeerId,
|
||||||
rpc_event: RpcEvent<Arc<SignedBeaconBlock<T::EthSpec>>>,
|
rpc_event: RpcEvent<Arc<SignedBeaconBlock<T::EthSpec>>>,
|
||||||
) -> Option<RpcResponseResult<Arc<SignedBeaconBlock<T::EthSpec>>>> {
|
) -> Option<RpcResponseResult<Arc<SignedBeaconBlock<T::EthSpec>>>> {
|
||||||
let response = self.blocks_by_root_requests.on_response(id, rpc_event);
|
let resp = self.blocks_by_root_requests.on_response(id, rpc_event);
|
||||||
let response = response.map(|res| {
|
let resp = resp.map(|res| {
|
||||||
res.and_then(|(mut blocks, seen_timestamp)| {
|
res.and_then(|(mut blocks, seen_timestamp)| {
|
||||||
// Enforce that exactly one chunk = one block is returned. ReqResp behavior limits the
|
// Enforce that exactly one chunk = one block is returned. ReqResp behavior limits the
|
||||||
// response count to at most 1.
|
// response count to at most 1.
|
||||||
@@ -1024,10 +1034,7 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
|
|||||||
}
|
}
|
||||||
})
|
})
|
||||||
});
|
});
|
||||||
if let Some(Err(RpcResponseError::VerifyError(e))) = &response {
|
self.on_rpc_response_result(id, "BlocksByRoot", resp, peer_id, |_| 1)
|
||||||
self.report_peer(peer_id, PeerAction::LowToleranceError, e.into());
|
|
||||||
}
|
|
||||||
response
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(crate) fn on_single_blob_response(
|
pub(crate) fn on_single_blob_response(
|
||||||
@@ -1036,8 +1043,8 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
|
|||||||
peer_id: PeerId,
|
peer_id: PeerId,
|
||||||
rpc_event: RpcEvent<Arc<BlobSidecar<T::EthSpec>>>,
|
rpc_event: RpcEvent<Arc<BlobSidecar<T::EthSpec>>>,
|
||||||
) -> Option<RpcResponseResult<FixedBlobSidecarList<T::EthSpec>>> {
|
) -> Option<RpcResponseResult<FixedBlobSidecarList<T::EthSpec>>> {
|
||||||
let response = self.blobs_by_root_requests.on_response(id, rpc_event);
|
let resp = self.blobs_by_root_requests.on_response(id, rpc_event);
|
||||||
let response = response.map(|res| {
|
let resp = resp.map(|res| {
|
||||||
res.and_then(|(blobs, seen_timestamp)| {
|
res.and_then(|(blobs, seen_timestamp)| {
|
||||||
if let Some(max_len) = blobs
|
if let Some(max_len) = blobs
|
||||||
.first()
|
.first()
|
||||||
@@ -1056,10 +1063,7 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
|
|||||||
}
|
}
|
||||||
})
|
})
|
||||||
});
|
});
|
||||||
if let Some(Err(RpcResponseError::VerifyError(e))) = &response {
|
self.on_rpc_response_result(id, "BlobsByRoot", resp, peer_id, |_| 1)
|
||||||
self.report_peer(peer_id, PeerAction::LowToleranceError, e.into());
|
|
||||||
}
|
|
||||||
response
|
|
||||||
}
|
}
|
||||||
|
|
||||||
#[allow(clippy::type_complexity)]
|
#[allow(clippy::type_complexity)]
|
||||||
@@ -1072,7 +1076,7 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
|
|||||||
let resp = self
|
let resp = self
|
||||||
.data_columns_by_root_requests
|
.data_columns_by_root_requests
|
||||||
.on_response(id, rpc_event);
|
.on_response(id, rpc_event);
|
||||||
self.report_rpc_response_errors(resp, peer_id)
|
self.on_rpc_response_result(id, "DataColumnsByRoot", resp, peer_id, |_| 1)
|
||||||
}
|
}
|
||||||
|
|
||||||
#[allow(clippy::type_complexity)]
|
#[allow(clippy::type_complexity)]
|
||||||
@@ -1083,7 +1087,7 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
|
|||||||
rpc_event: RpcEvent<Arc<SignedBeaconBlock<T::EthSpec>>>,
|
rpc_event: RpcEvent<Arc<SignedBeaconBlock<T::EthSpec>>>,
|
||||||
) -> Option<RpcResponseResult<Vec<Arc<SignedBeaconBlock<T::EthSpec>>>>> {
|
) -> Option<RpcResponseResult<Vec<Arc<SignedBeaconBlock<T::EthSpec>>>>> {
|
||||||
let resp = self.blocks_by_range_requests.on_response(id, rpc_event);
|
let resp = self.blocks_by_range_requests.on_response(id, rpc_event);
|
||||||
self.report_rpc_response_errors(resp, peer_id)
|
self.on_rpc_response_result(id, "BlocksByRange", resp, peer_id, |b| b.len())
|
||||||
}
|
}
|
||||||
|
|
||||||
#[allow(clippy::type_complexity)]
|
#[allow(clippy::type_complexity)]
|
||||||
@@ -1094,7 +1098,7 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
|
|||||||
rpc_event: RpcEvent<Arc<BlobSidecar<T::EthSpec>>>,
|
rpc_event: RpcEvent<Arc<BlobSidecar<T::EthSpec>>>,
|
||||||
) -> Option<RpcResponseResult<Vec<Arc<BlobSidecar<T::EthSpec>>>>> {
|
) -> Option<RpcResponseResult<Vec<Arc<BlobSidecar<T::EthSpec>>>>> {
|
||||||
let resp = self.blobs_by_range_requests.on_response(id, rpc_event);
|
let resp = self.blobs_by_range_requests.on_response(id, rpc_event);
|
||||||
self.report_rpc_response_errors(resp, peer_id)
|
self.on_rpc_response_result(id, "BlobsByRangeRequest", resp, peer_id, |b| b.len())
|
||||||
}
|
}
|
||||||
|
|
||||||
#[allow(clippy::type_complexity)]
|
#[allow(clippy::type_complexity)]
|
||||||
@@ -1107,14 +1111,38 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
|
|||||||
let resp = self
|
let resp = self
|
||||||
.data_columns_by_range_requests
|
.data_columns_by_range_requests
|
||||||
.on_response(id, rpc_event);
|
.on_response(id, rpc_event);
|
||||||
self.report_rpc_response_errors(resp, peer_id)
|
self.on_rpc_response_result(id, "DataColumnsByRange", resp, peer_id, |d| d.len())
|
||||||
}
|
}
|
||||||
|
|
||||||
fn report_rpc_response_errors<R>(
|
fn on_rpc_response_result<I: std::fmt::Display, R, F: FnOnce(&R) -> usize>(
|
||||||
&mut self,
|
&mut self,
|
||||||
|
id: I,
|
||||||
|
method: &'static str,
|
||||||
resp: Option<RpcResponseResult<R>>,
|
resp: Option<RpcResponseResult<R>>,
|
||||||
peer_id: PeerId,
|
peer_id: PeerId,
|
||||||
|
get_count: F,
|
||||||
) -> Option<RpcResponseResult<R>> {
|
) -> Option<RpcResponseResult<R>> {
|
||||||
|
match &resp {
|
||||||
|
None => {}
|
||||||
|
Some(Ok((v, _))) => {
|
||||||
|
debug!(
|
||||||
|
self.log,
|
||||||
|
"Sync RPC request completed";
|
||||||
|
"id" => %id,
|
||||||
|
"method" => method,
|
||||||
|
"count" => get_count(v)
|
||||||
|
);
|
||||||
|
}
|
||||||
|
Some(Err(e)) => {
|
||||||
|
debug!(
|
||||||
|
self.log,
|
||||||
|
"Sync RPC request error";
|
||||||
|
"id" => %id,
|
||||||
|
"method" => method,
|
||||||
|
"error" => ?e
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
if let Some(Err(RpcResponseError::VerifyError(e))) = &resp {
|
if let Some(Err(RpcResponseError::VerifyError(e))) = &resp {
|
||||||
self.report_peer(peer_id, PeerAction::LowToleranceError, e.into());
|
self.report_peer(peer_id, PeerAction::LowToleranceError, e.into());
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user