Merge branch 'unstable' of https://github.com/sigp/lighthouse into gloas-payload-processing

This commit is contained in:
Eitan Seri-Levi
2026-02-11 23:35:28 -08:00
36 changed files with 509 additions and 120 deletions

View File

@@ -507,6 +507,30 @@ pub static SYNC_UNKNOWN_NETWORK_REQUESTS: LazyLock<Result<IntCounterVec>> = Lazy
&["type"],
)
});
/// Counter of successfully completed sync RPC requests, labelled by protocol
/// (e.g. "blocks_by_range"). Lazily registered on first access.
pub static SYNC_RPC_REQUEST_SUCCESSES: LazyLock<Result<IntCounterVec>> = LazyLock::new(|| {
    let name = "sync_rpc_requests_success_total";
    let help = "Total count of sync RPC requests successes";
    // Single "protocol" label distinguishes the RPC method being counted.
    try_create_int_counter_vec(name, help, &["protocol"])
});
pub static SYNC_RPC_REQUEST_ERRORS: LazyLock<Result<IntCounterVec>> = LazyLock::new(|| {
try_create_int_counter_vec(
"sync_rpc_requests_error_total",
"Total count of sync RPC requests errors",
&["protocol", "error"],
)
});
/// Histogram of wall-clock time (seconds) to complete a successful sync RPC
/// request, labelled by protocol. Buckets span 1 ms to 2 s, matching typical
/// ReqResp round-trip latencies. Lazily registered on first access.
pub static SYNC_RPC_REQUEST_TIME: LazyLock<Result<HistogramVec>> = LazyLock::new(|| {
    try_create_histogram_vec_with_buckets(
        "sync_rpc_request_duration_sec",
        // Fixed typo in the help string: "requesst" -> "request".
        "Time to complete a successful sync RPC request",
        Ok(vec![
            0.001, 0.005, 0.01, 0.025, 0.05, 0.075, 0.1, 0.25, 0.5, 1.0, 2.0,
        ]),
        &["protocol"],
    )
});
/*
* Block Delay Metrics

View File

@@ -1430,7 +1430,7 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
}
})
});
self.on_rpc_response_result(id, "BlocksByRoot", resp, peer_id, |_| 1)
self.on_rpc_response_result(resp, peer_id)
}
pub(crate) fn on_single_blob_response(
@@ -1459,7 +1459,7 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
}
})
});
self.on_rpc_response_result(id, "BlobsByRoot", resp, peer_id, |_| 1)
self.on_rpc_response_result(resp, peer_id)
}
#[allow(clippy::type_complexity)]
@@ -1472,7 +1472,7 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
let resp = self
.data_columns_by_root_requests
.on_response(id, rpc_event);
self.on_rpc_response_result(id, "DataColumnsByRoot", resp, peer_id, |_| 1)
self.on_rpc_response_result(resp, peer_id)
}
#[allow(clippy::type_complexity)]
@@ -1483,7 +1483,7 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
rpc_event: RpcEvent<Arc<SignedBeaconBlock<T::EthSpec>>>,
) -> Option<RpcResponseResult<Vec<Arc<SignedBeaconBlock<T::EthSpec>>>>> {
let resp = self.blocks_by_range_requests.on_response(id, rpc_event);
self.on_rpc_response_result(id, "BlocksByRange", resp, peer_id, |b| b.len())
self.on_rpc_response_result(resp, peer_id)
}
#[allow(clippy::type_complexity)]
@@ -1494,7 +1494,7 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
rpc_event: RpcEvent<Arc<BlobSidecar<T::EthSpec>>>,
) -> Option<RpcResponseResult<Vec<Arc<BlobSidecar<T::EthSpec>>>>> {
let resp = self.blobs_by_range_requests.on_response(id, rpc_event);
self.on_rpc_response_result(id, "BlobsByRangeRequest", resp, peer_id, |b| b.len())
self.on_rpc_response_result(resp, peer_id)
}
#[allow(clippy::type_complexity)]
@@ -1507,36 +1507,15 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
let resp = self
.data_columns_by_range_requests
.on_response(id, rpc_event);
self.on_rpc_response_result(id, "DataColumnsByRange", resp, peer_id, |d| d.len())
self.on_rpc_response_result(resp, peer_id)
}
fn on_rpc_response_result<I: std::fmt::Display, R, F: FnOnce(&R) -> usize>(
/// Common handler for consistent scoring of RpcResponseError
fn on_rpc_response_result<R>(
&mut self,
id: I,
method: &'static str,
resp: Option<RpcResponseResult<R>>,
peer_id: PeerId,
get_count: F,
) -> Option<RpcResponseResult<R>> {
match &resp {
None => {}
Some(Ok((v, _))) => {
debug!(
%id,
method,
count = get_count(v),
"Sync RPC request completed"
);
}
Some(Err(e)) => {
debug!(
%id,
method,
error = ?e,
"Sync RPC request error"
);
}
}
if let Some(Err(RpcResponseError::VerifyError(e))) = &resp {
self.report_peer(peer_id, PeerAction::LowToleranceError, e.into());
}

View File

@@ -1,10 +1,11 @@
use std::time::Instant;
use std::{collections::hash_map::Entry, hash::Hash};
use beacon_chain::validator_monitor::timestamp_now;
use fnv::FnvHashMap;
use lighthouse_network::PeerId;
use strum::IntoStaticStr;
use tracing::Span;
use tracing::{Span, debug};
use types::{Hash256, Slot};
pub use blobs_by_range::BlobsByRangeRequestItems;
@@ -18,7 +19,7 @@ pub use data_columns_by_root::{
use crate::metrics;
use super::{RpcEvent, RpcResponseResult};
use super::{RpcEvent, RpcResponseError, RpcResponseResult};
mod blobs_by_range;
mod blobs_by_root;
@@ -51,6 +52,7 @@ struct ActiveRequest<T: ActiveRequestItems> {
peer_id: PeerId,
// Error if the request terminates before receiving max expected responses
expect_max_responses: bool,
start_instant: Instant,
span: Span,
}
@@ -60,7 +62,7 @@ enum State<T> {
Errored,
}
impl<K: Eq + Hash, T: ActiveRequestItems> ActiveRequests<K, T> {
impl<K: Copy + Eq + Hash + std::fmt::Display, T: ActiveRequestItems> ActiveRequests<K, T> {
pub fn new(name: &'static str) -> Self {
Self {
requests: <_>::default(),
@@ -83,6 +85,7 @@ impl<K: Eq + Hash, T: ActiveRequestItems> ActiveRequests<K, T> {
state: State::Active(items),
peer_id,
expect_max_responses,
start_instant: Instant::now(),
span,
},
);
@@ -112,7 +115,7 @@ impl<K: Eq + Hash, T: ActiveRequestItems> ActiveRequests<K, T> {
return None;
};
match rpc_event {
let result = match rpc_event {
// Handler of a success ReqResp chunk. Adds the item to the request accumulator.
// `ActiveRequestItems` validates the item before appending to its internal state.
RpcEvent::Response(item, seen_timestamp) => {
@@ -126,7 +129,7 @@ impl<K: Eq + Hash, T: ActiveRequestItems> ActiveRequests<K, T> {
Ok(true) => {
let items = items.consume();
request.state = State::CompletedEarly;
Some(Ok((items, seen_timestamp)))
Some(Ok((items, seen_timestamp, request.start_instant.elapsed())))
}
// Received item, but we are still expecting more
Ok(false) => None,
@@ -163,7 +166,11 @@ impl<K: Eq + Hash, T: ActiveRequestItems> ActiveRequests<K, T> {
}
.into()))
} else {
Some(Ok((items.consume(), timestamp_now())))
Some(Ok((
items.consume(),
timestamp_now(),
request.start_instant.elapsed(),
)))
}
}
// Items already returned, ignore stream termination
@@ -188,7 +195,41 @@ impl<K: Eq + Hash, T: ActiveRequestItems> ActiveRequests<K, T> {
State::Errored => None,
}
}
}
};
result.map(|result| match result {
Ok((items, seen_timestamp, duration)) => {
metrics::inc_counter_vec(&metrics::SYNC_RPC_REQUEST_SUCCESSES, &[self.name]);
metrics::observe_timer_vec(&metrics::SYNC_RPC_REQUEST_TIME, &[self.name], duration);
debug!(
%id,
method = self.name,
count = items.len(),
"Sync RPC request completed"
);
Ok((items, seen_timestamp))
}
Err(e) => {
let err_str: &'static str = match &e {
RpcResponseError::RpcError(e) => e.into(),
RpcResponseError::VerifyError(e) => e.into(),
RpcResponseError::CustodyRequestError(_) => "CustodyRequestError",
RpcResponseError::BlockComponentCouplingError(_) => {
"BlockComponentCouplingError"
}
};
metrics::inc_counter_vec(&metrics::SYNC_RPC_REQUEST_ERRORS, &[self.name, err_str]);
debug!(
%id,
method = self.name,
error = ?e,
"Sync RPC request error"
);
Err(e)
}
})
}
pub fn active_requests_of_peer(&self, peer_id: &PeerId) -> Vec<&K> {