mirror of
https://github.com/sigp/lighthouse.git
synced 2026-03-15 02:42:38 +00:00
More metrics + RPC tweaks (#2041)
## Issue Addressed
NA
## Proposed Changes
This was mostly done to find the reason why LH was dropping peers from Nimbus. It proved to be useful so I think it's worth it. But there is also some functional stuff here
- Add metrics for rpc errors per client, error type and direction
- Add metrics for downscoring events per source type, client and penalty type
- Add metrics for gossip validation results per client for non-accepted messages
- Make the RPC handler return errors and requests/responses in the order we see them
- Allow a small burst for the Ping rate limit, from 1 every 5 seconds to 2 every 10 seconds
- Send rate limiting errors with a particular code and use that same code to identify them. I picked something different from 128, since that is most likely what other clients are using for their own errors
- Remove some unused code in the `PeerAction` and the rpc handler
- Remove the unused variant `RateLimited`. This was never produced directly, since the only way to get the request's protocol is via the handler. The handler, upon receiving from LH a response with an error (rate limited in this case), emits this event with the missing info (it was always like this; just pointing out that we do downscore rate limiting errors regardless of the change)
Metrics for Nimbus looked like this:
Downscoring events: `increase(libp2p_peer_actions_per_client{client="Nimbus"}[5m])`

RPC Errors: `increase(libp2p_rpc_errors_per_client{client="Nimbus"}[5m])`

Unaccepted gossip message: `increase(gossipsub_unaccepted_messages_per_client{client="Nimbus"}[5m])`

This commit is contained in:
@@ -49,6 +49,9 @@ type InboundProcessingOutput<TSpec> = (
|
||||
u64, /* Chunks remaining to be sent after this processing finishes */
|
||||
);
|
||||
|
||||
/// Events the handler emits to the behaviour.
|
||||
type HandlerEvent<T> = Result<RPCReceived<T>, HandlerErr>;
|
||||
|
||||
/// An error encountered by the handler.
|
||||
pub enum HandlerErr {
|
||||
/// An error occurred for this peer's request. This can occur during protocol negotiation,
|
||||
@@ -82,11 +85,8 @@ where
|
||||
/// The upgrade for inbound substreams.
|
||||
listen_protocol: SubstreamProtocol<RPCProtocol<TSpec>, ()>,
|
||||
|
||||
/// Errors occurring on outbound and inbound connections queued for reporting back.
|
||||
pending_errors: Vec<HandlerErr>,
|
||||
|
||||
/// Queue of events to produce in `poll()`.
|
||||
events_out: SmallVec<[RPCReceived<TSpec>; 4]>,
|
||||
events_out: SmallVec<[HandlerEvent<TSpec>; 4]>,
|
||||
|
||||
/// Queue of outbound substreams to open.
|
||||
dial_queue: SmallVec<[(RequestId, RPCRequest<TSpec>); 4]>,
|
||||
@@ -203,7 +203,6 @@ where
|
||||
) -> Self {
|
||||
RPCHandler {
|
||||
listen_protocol,
|
||||
pending_errors: Vec::new(),
|
||||
events_out: SmallVec::new(),
|
||||
dial_queue: SmallVec::new(),
|
||||
dial_negotiated: 0,
|
||||
@@ -220,22 +219,6 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns a reference to the listen protocol configuration.
|
||||
///
|
||||
/// > **Note**: If you modify the protocol, modifications will only applies to future inbound
|
||||
/// > substreams, not the ones already being negotiated.
|
||||
pub fn listen_protocol_ref(&self) -> &SubstreamProtocol<RPCProtocol<TSpec>, ()> {
|
||||
&self.listen_protocol
|
||||
}
|
||||
|
||||
/// Returns a mutable reference to the listen protocol configuration.
|
||||
///
|
||||
/// > **Note**: If you modify the protocol, modifications will only apply to future inbound
|
||||
/// > substreams, not the ones already being negotiated.
|
||||
pub fn listen_protocol_mut(&mut self) -> &mut SubstreamProtocol<RPCProtocol<TSpec>, ()> {
|
||||
&mut self.listen_protocol
|
||||
}
|
||||
|
||||
/// Initiates the handler's shutdown process, sending an optional last message to the peer.
|
||||
pub fn shutdown(&mut self, final_msg: Option<(RequestId, RPCRequest<TSpec>)>) {
|
||||
if matches!(self.state, HandlerState::Active) {
|
||||
@@ -244,11 +227,11 @@ where
|
||||
}
|
||||
// we now drive to completion communications already dialed/established
|
||||
while let Some((id, req)) = self.dial_queue.pop() {
|
||||
self.pending_errors.push(HandlerErr::Outbound {
|
||||
id,
|
||||
proto: req.protocol(),
|
||||
self.events_out.push(Err(HandlerErr::Outbound {
|
||||
error: RPCError::HandlerRejected,
|
||||
})
|
||||
proto: req.protocol(),
|
||||
id,
|
||||
}));
|
||||
}
|
||||
|
||||
// Queue our final message, if any
|
||||
@@ -268,13 +251,11 @@ where
|
||||
HandlerState::Active => {
|
||||
self.dial_queue.push((id, req));
|
||||
}
|
||||
_ => {
|
||||
self.pending_errors.push(HandlerErr::Outbound {
|
||||
id,
|
||||
proto: req.protocol(),
|
||||
error: RPCError::HandlerRejected,
|
||||
});
|
||||
}
|
||||
_ => self.events_out.push(Err(HandlerErr::Outbound {
|
||||
error: RPCError::HandlerRejected,
|
||||
proto: req.protocol(),
|
||||
id,
|
||||
})),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -296,12 +277,11 @@ where
|
||||
|
||||
// If the response we are sending is an error, report back for handling
|
||||
if let RPCCodedResponse::Error(ref code, ref reason) = response {
|
||||
let err = HandlerErr::Inbound {
|
||||
id: inbound_id,
|
||||
proto: inbound_info.protocol,
|
||||
self.events_out.push(Err(HandlerErr::Inbound {
|
||||
error: RPCError::ErrorResponse(*code, reason.to_string()),
|
||||
};
|
||||
self.pending_errors.push(err);
|
||||
proto: inbound_info.protocol,
|
||||
id: inbound_id,
|
||||
}));
|
||||
}
|
||||
|
||||
if matches!(self.state, HandlerState::Deactivated) {
|
||||
@@ -319,7 +299,7 @@ where
|
||||
TSpec: EthSpec,
|
||||
{
|
||||
type InEvent = RPCSend<TSpec>;
|
||||
type OutEvent = Result<RPCReceived<TSpec>, HandlerErr>;
|
||||
type OutEvent = HandlerEvent<TSpec>;
|
||||
type Error = RPCError;
|
||||
type InboundProtocol = RPCProtocol<TSpec>;
|
||||
type OutboundProtocol = RPCRequest<TSpec>;
|
||||
@@ -363,8 +343,10 @@ where
|
||||
);
|
||||
}
|
||||
|
||||
self.events_out
|
||||
.push(RPCReceived::Request(self.current_inbound_substream_id, req));
|
||||
self.events_out.push(Ok(RPCReceived::Request(
|
||||
self.current_inbound_substream_id,
|
||||
req,
|
||||
)));
|
||||
self.current_inbound_substream_id.0 += 1;
|
||||
}
|
||||
|
||||
@@ -379,12 +361,11 @@ where
|
||||
|
||||
// accept outbound connections only if the handler is not deactivated
|
||||
if matches!(self.state, HandlerState::Deactivated) {
|
||||
self.pending_errors.push(HandlerErr::Outbound {
|
||||
id,
|
||||
proto,
|
||||
self.events_out.push(Err(HandlerErr::Outbound {
|
||||
error: RPCError::HandlerRejected,
|
||||
});
|
||||
return;
|
||||
proto,
|
||||
id,
|
||||
}));
|
||||
}
|
||||
|
||||
// add the stream to substreams if we expect a response, otherwise drop the stream.
|
||||
@@ -474,11 +455,11 @@ where
|
||||
}
|
||||
},
|
||||
};
|
||||
self.pending_errors.push(HandlerErr::Outbound {
|
||||
id,
|
||||
proto: req.protocol(),
|
||||
self.events_out.push(Err(HandlerErr::Outbound {
|
||||
error,
|
||||
});
|
||||
proto: req.protocol(),
|
||||
id,
|
||||
}));
|
||||
}
|
||||
|
||||
fn connection_keep_alive(&self) -> KeepAlive {
|
||||
@@ -490,7 +471,6 @@ where
|
||||
self.dial_queue.is_empty()
|
||||
&& self.outbound_substreams.is_empty()
|
||||
&& self.inbound_substreams.is_empty()
|
||||
&& self.pending_errors.is_empty()
|
||||
&& self.events_out.is_empty()
|
||||
&& self.dial_negotiated == 0
|
||||
}
|
||||
@@ -518,15 +498,9 @@ where
|
||||
Self::Error,
|
||||
>,
|
||||
> {
|
||||
// report failures
|
||||
if !self.pending_errors.is_empty() {
|
||||
let err_info = self.pending_errors.remove(0);
|
||||
return Poll::Ready(ProtocolsHandlerEvent::Custom(Err(err_info)));
|
||||
}
|
||||
|
||||
// return any events that need to be reported
|
||||
if !self.events_out.is_empty() {
|
||||
return Poll::Ready(ProtocolsHandlerEvent::Custom(Ok(self.events_out.remove(0))));
|
||||
return Poll::Ready(ProtocolsHandlerEvent::Custom(self.events_out.remove(0)));
|
||||
} else {
|
||||
self.events_out.shrink_to_fit();
|
||||
}
|
||||
@@ -547,11 +521,11 @@ where
|
||||
if let Some(info) = self.inbound_substreams.get_mut(inbound_id.get_ref()) {
|
||||
// the delay has been removed
|
||||
info.delay_key = None;
|
||||
self.pending_errors.push(HandlerErr::Inbound {
|
||||
id: *inbound_id.get_ref(),
|
||||
proto: info.protocol,
|
||||
self.events_out.push(Err(HandlerErr::Inbound {
|
||||
error: RPCError::StreamTimeout,
|
||||
});
|
||||
proto: info.protocol,
|
||||
id: *inbound_id.get_ref(),
|
||||
}));
|
||||
|
||||
if info.pending_items.last().map(|l| l.close_after()) == Some(false) {
|
||||
// if the last chunk does not close the stream, append an error
|
||||
@@ -636,20 +610,20 @@ where
|
||||
self.inbound_substreams_delay.remove(delay_key);
|
||||
}
|
||||
if let Err(error) = res {
|
||||
self.pending_errors.push(HandlerErr::Inbound {
|
||||
id: *id,
|
||||
self.events_out.push(Err(HandlerErr::Inbound {
|
||||
error,
|
||||
proto: info.protocol,
|
||||
});
|
||||
id: *id,
|
||||
}));
|
||||
}
|
||||
if info.pending_items.last().map(|l| l.close_after()) == Some(false)
|
||||
{
|
||||
// if the request was still active, report back to cancel it
|
||||
self.pending_errors.push(HandlerErr::Inbound {
|
||||
id: *id,
|
||||
proto: info.protocol,
|
||||
self.events_out.push(Err(HandlerErr::Inbound {
|
||||
error: RPCError::HandlerRejected,
|
||||
});
|
||||
proto: info.protocol,
|
||||
id: *id,
|
||||
}));
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -662,11 +636,11 @@ where
|
||||
info.remaining_chunks = new_remaining_chunks;
|
||||
// report any error
|
||||
for error in errors {
|
||||
self.pending_errors.push(HandlerErr::Inbound {
|
||||
id: *id,
|
||||
self.events_out.push(Err(HandlerErr::Inbound {
|
||||
error,
|
||||
proto: info.protocol,
|
||||
})
|
||||
id: *id,
|
||||
}))
|
||||
}
|
||||
if remove {
|
||||
substreams_to_remove.push(*id);
|
||||
@@ -740,11 +714,11 @@ where
|
||||
} if deactivated => {
|
||||
// the handler is deactivated. Close the stream
|
||||
entry.get_mut().state = OutboundSubstreamState::Closing(substream);
|
||||
self.pending_errors.push(HandlerErr::Outbound {
|
||||
id: entry.get().req_id,
|
||||
proto: entry.get().proto,
|
||||
self.events_out.push(Err(HandlerErr::Outbound {
|
||||
error: RPCError::HandlerRejected,
|
||||
})
|
||||
proto: entry.get().proto,
|
||||
id: entry.get().req_id,
|
||||
}))
|
||||
}
|
||||
OutboundSubstreamState::RequestPendingResponse {
|
||||
mut substream,
|
||||
|
||||
@@ -285,6 +285,7 @@ impl<T: EthSpec> RPCCodedResponse<T> {
|
||||
let code = match response_code {
|
||||
1 => RPCResponseErrorCode::InvalidRequest,
|
||||
2 => RPCResponseErrorCode::ServerError,
|
||||
139 => RPCResponseErrorCode::RateLimited,
|
||||
_ => RPCResponseErrorCode::Unknown,
|
||||
};
|
||||
RPCCodedResponse::Error(code, err)
|
||||
@@ -318,7 +319,7 @@ impl RPCResponseErrorCode {
|
||||
RPCResponseErrorCode::InvalidRequest => 1,
|
||||
RPCResponseErrorCode::ServerError => 2,
|
||||
RPCResponseErrorCode::Unknown => 255,
|
||||
RPCResponseErrorCode::RateLimited => 128,
|
||||
RPCResponseErrorCode::RateLimited => 139,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -105,7 +105,7 @@ impl<TSpec: EthSpec> RPC<TSpec> {
|
||||
let log = log.new(o!("service" => "libp2p_rpc"));
|
||||
let limiter = RPCRateLimiterBuilder::new()
|
||||
.n_every(Protocol::MetaData, 2, Duration::from_secs(5))
|
||||
.one_every(Protocol::Ping, Duration::from_secs(5))
|
||||
.n_every(Protocol::Ping, 2, Duration::from_secs(10))
|
||||
.n_every(Protocol::Status, 5, Duration::from_secs(15))
|
||||
.one_every(Protocol::Goodbye, Duration::from_secs(10))
|
||||
.n_every(
|
||||
@@ -261,7 +261,7 @@ where
|
||||
(conn_id, *id),
|
||||
RPCCodedResponse::Error(
|
||||
RPCResponseErrorCode::RateLimited,
|
||||
format!("Rate limited: wait {:?}", wait_time).into(),
|
||||
format!("Wait {:?}", wait_time).into(),
|
||||
),
|
||||
);
|
||||
}
|
||||
|
||||
@@ -493,8 +493,6 @@ pub enum RPCError {
|
||||
NegotiationTimeout,
|
||||
/// Handler rejected this request.
|
||||
HandlerRejected,
|
||||
/// The request exceeds the rate limit.
|
||||
RateLimited,
|
||||
}
|
||||
|
||||
impl From<ssz::DecodeError> for RPCError {
|
||||
@@ -533,7 +531,6 @@ impl std::fmt::Display for RPCError {
|
||||
RPCError::InternalError(ref err) => write!(f, "Internal error: {}", err),
|
||||
RPCError::NegotiationTimeout => write!(f, "Negotiation timeout"),
|
||||
RPCError::HandlerRejected => write!(f, "Handler rejected the request"),
|
||||
RPCError::RateLimited => write!(f, "Request exceeds the rate limit"),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -552,7 +549,6 @@ impl std::error::Error for RPCError {
|
||||
RPCError::ErrorResponse(_, _) => None,
|
||||
RPCError::NegotiationTimeout => None,
|
||||
RPCError::HandlerRejected => None,
|
||||
RPCError::RateLimited => None,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -569,3 +565,27 @@ impl<TSpec: EthSpec> std::fmt::Display for RPCRequest<TSpec> {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl RPCError {
|
||||
/// Get a `str` representation of the error.
|
||||
/// Used for metrics.
|
||||
pub fn as_static_str(&self) -> &'static str {
|
||||
match self {
|
||||
RPCError::SSZDecodeError { .. } => "decode_error",
|
||||
RPCError::IoError { .. } => "io_error",
|
||||
RPCError::ErrorResponse(ref code, ..) => match code {
|
||||
RPCResponseErrorCode::RateLimited => "rate_limited",
|
||||
RPCResponseErrorCode::InvalidRequest => "invalid_request",
|
||||
RPCResponseErrorCode::ServerError => "server_error",
|
||||
RPCResponseErrorCode::Unknown => "unknown_response_code",
|
||||
},
|
||||
RPCError::StreamTimeout => "stream_timeout",
|
||||
RPCError::UnsupportedProtocol => "unsupported_protocol",
|
||||
RPCError::IncompleteStream => "incomplete_stream",
|
||||
RPCError::InvalidData => "invalid_data",
|
||||
RPCError::InternalError { .. } => "internal_error",
|
||||
RPCError::NegotiationTimeout => "negotiation_timeout",
|
||||
RPCError::HandlerRejected => "handler_rejected",
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user