upgrade to libp2p 0.52 (#4431)

## Issue Addressed

Upgrade libp2p to v0.52

## Proposed Changes
- **Workflows**: remove installation of `protoc`
- **Book**: remove installation of `protoc`
- **`Dockerfile`s and `cross`**: remove the custom base `Dockerfile` for cross, since it's no longer needed. Remove `protoc` from the remaining `Dockerfile`s
- **Upgrade `discv5` to `v0.3.1`:** this release has some cool stuff in it: it no longer needs `protoc`, and IP updates on cold start are faster
- **Upgrade `prometheus` to `0.21.0`:** it no longer needs encoding checks
- **Things that look like refactors:** a bunch of API types were renamed and need to be accessed in a different (clearer) way
- **Lighthouse network**
	- connection limits are now a behaviour (see the sketch after this list)
	- banned peers no longer exist at the swarm level, only at the behaviour level
	- `connection_event_buffer_size` is now handled per connection, with a buffer size of 4
	- `mplex` is deprecated and was removed
	- the RPC handler now logs the peer it belongs to
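
A minimal sketch of the connection-limits change (assumed names and limit values, not Lighthouse's actual struct): in libp2p 0.52, limits that used to be configured on the swarm are enforced by a dedicated behaviour composed into the top-level `NetworkBehaviour`:

```rust
use libp2p::connection_limits::{self, ConnectionLimits};
use libp2p::swarm::NetworkBehaviour;

// Hypothetical composed behaviour; requires the `connection-limits`
// and `macros` features of the `libp2p` crate.
#[derive(NetworkBehaviour)]
struct ExampleBehaviour {
    // Denies new connections once the configured limits are reached.
    limits: connection_limits::Behaviour,
}

fn build_limits() -> connection_limits::Behaviour {
    // Example numbers only.
    connection_limits::Behaviour::new(
        ConnectionLimits::default()
            .with_max_pending_incoming(Some(32))
            .with_max_established_incoming(Some(64)),
    )
}
```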

## Additional Info

Tried to keep behaviour as unchanged as possible. However, there are many improvements we can make _after_ this upgrade:
- Smart connection limits: connection limits have so far been checked purely by count; we can now use information about the incoming peer to decide whether we want it
- More powerful peer management: dial attempts from other behaviours can be rejected early
- Incoming connections can be rejected early (see the first sketch below)
- Banning can be returned exclusively to peer management: making use of this, we should no longer get connections to banned peers (see the second sketch below)
- TCP NAT updates: we might be able to take advantage of confirmed external addresses to verify TCP ports/IPs
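
A rough sketch of the early-rejection idea, under assumed names (`screen_inbound` and the ban set are illustrative, not Lighthouse code): in libp2p 0.52 a behaviour's `handle_pending_inbound_connection` can return `ConnectionDenied` before any handler is created, and a helper like this could carry that decision:

```rust
use libp2p::swarm::ConnectionDenied;
use libp2p::Multiaddr;
use std::collections::HashSet;

// Hypothetical helper mirroring what a behaviour could do inside
// `handle_pending_inbound_connection`: decide from the remote address
// alone whether to deny the connection before a handler is built.
fn screen_inbound(
    banned: &HashSet<Multiaddr>,
    remote_addr: &Multiaddr,
) -> Result<(), ConnectionDenied> {
    if banned.contains(remote_addr) {
        return Err(ConnectionDenied::new("address is banned"));
    }
    Ok(())
}
```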
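For the banning point, libp2p 0.52 ships an `allow_block_list` behaviour that could take over what swarm-level banning used to do; a minimal sketch of the idea (assumed integration, not what this PR implements):

```rust
use libp2p::allow_block_list::{self, BlockedPeers};
use libp2p::PeerId;

// Blocking becomes plain behaviour state: blocked peers have their
// active connections closed and future connections denied.
type BannedPeers = allow_block_list::Behaviour<BlockedPeers>;

fn ban_peer(banned: &mut BannedPeers, peer: PeerId) {
    banned.block_peer(peer);
}
```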


Co-authored-by: Age Manning <Age@AgeManning.com>
Co-authored-by: Akihito Nakano <sora.akatsuki@gmail.com>
Commit ff9b09d964 (parent 73764d0dd2) by Divma, 2023-08-02 00:59:34 +00:00
35 changed files with 1594 additions and 2408 deletions

View File

@@ -3,21 +3,21 @@
use super::methods::{GoodbyeReason, RPCCodedResponse, RPCResponseErrorCode, ResponseTermination};
use super::outbound::OutboundRequestContainer;
use super::protocol::{max_rpc_size, InboundRequest, Protocol, RPCError, RPCProtocol};
use super::protocol::{
max_rpc_size, InboundOutput, InboundRequest, Protocol, RPCError, RPCProtocol,
};
use super::{RPCReceived, RPCSend, ReqId};
use crate::rpc::outbound::{OutboundFramed, OutboundRequest};
use crate::rpc::protocol::InboundFramed;
use fnv::FnvHashMap;
use futures::prelude::*;
use futures::{Sink, SinkExt};
use libp2p::core::upgrade::{
InboundUpgrade, NegotiationError, OutboundUpgrade, ProtocolError, UpgradeError,
};
use libp2p::swarm::handler::{
ConnectionHandler, ConnectionHandlerEvent, ConnectionHandlerUpgrErr, KeepAlive,
ConnectionEvent, ConnectionHandler, ConnectionHandlerEvent, DialUpgradeError,
FullyNegotiatedInbound, FullyNegotiatedOutbound, KeepAlive, StreamUpgradeError,
SubstreamProtocol,
};
use libp2p::swarm::NegotiatedSubstream;
use libp2p::swarm::Stream;
use slog::{crit, debug, trace, warn};
use smallvec::SmallVec;
use std::{
@@ -47,7 +47,7 @@ const MAX_INBOUND_SUBSTREAMS: usize = 32;
#[derive(Debug, Clone, Copy, Hash, Eq, PartialEq)]
pub struct SubstreamId(usize);
type InboundSubstream<TSpec> = InboundFramed<NegotiatedSubstream, TSpec>;
type InboundSubstream<TSpec> = InboundFramed<Stream, TSpec>;
/// Events the handler emits to the behaviour.
pub type HandlerEvent<Id, T> = Result<RPCReceived<Id, T>, HandlerErr<Id>>;
@@ -195,12 +195,12 @@ pub enum OutboundSubstreamState<TSpec: EthSpec> {
/// handler because GOODBYE requests can be handled and responses dropped instantly.
RequestPendingResponse {
/// The framed negotiated substream.
substream: Box<OutboundFramed<NegotiatedSubstream, TSpec>>,
substream: Box<OutboundFramed<Stream, TSpec>>,
/// Keeps track of the actual request sent.
request: OutboundRequest<TSpec>,
},
/// Closing an outbound substream.
Closing(Box<OutboundFramed<NegotiatedSubstream, TSpec>>),
Closing(Box<OutboundFramed<Stream, TSpec>>),
/// Temporary state during processing
Poisoned,
}
@@ -212,7 +212,7 @@ where
pub fn new(
listen_protocol: SubstreamProtocol<RPCProtocol<TSpec>, ()>,
fork_context: Arc<ForkContext>,
log: &slog::Logger,
log: slog::Logger,
) -> Self {
RPCHandler {
listen_protocol,
@@ -230,7 +230,7 @@ where
outbound_io_error_retries: 0,
fork_context,
waker: None,
log: log.clone(),
log,
}
}
@@ -315,8 +315,8 @@ where
TSpec: EthSpec,
Id: ReqId,
{
type InEvent = RPCSend<Id, TSpec>;
type OutEvent = HandlerEvent<Id, TSpec>;
type FromBehaviour = RPCSend<Id, TSpec>;
type ToBehaviour = HandlerEvent<Id, TSpec>;
type Error = RPCError;
type InboundProtocol = RPCProtocol<TSpec>;
type OutboundProtocol = OutboundRequestContainer<TSpec>;
@@ -327,121 +327,7 @@ where
self.listen_protocol.clone()
}
fn inject_fully_negotiated_outbound(
&mut self,
out: <Self::OutboundProtocol as OutboundUpgrade<NegotiatedSubstream>>::Output,
request_info: Self::OutboundOpenInfo,
) {
self.dial_negotiated -= 1;
let (id, request) = request_info;
let proto = request.versioned_protocol().protocol();
// accept outbound connections only if the handler is not deactivated
if matches!(self.state, HandlerState::Deactivated) {
self.events_out.push(Err(HandlerErr::Outbound {
error: RPCError::Disconnected,
proto,
id,
}));
}
// add the stream to substreams if we expect a response, otherwise drop the stream.
let expected_responses = request.expected_responses();
if expected_responses > 0 {
// new outbound request. Store the stream and tag the output.
let delay_key = self.outbound_substreams_delay.insert(
self.current_outbound_substream_id,
Duration::from_secs(RESPONSE_TIMEOUT),
);
let awaiting_stream = OutboundSubstreamState::RequestPendingResponse {
substream: Box::new(out),
request,
};
let expected_responses = if expected_responses > 1 {
// Currently enforced only for multiple responses
Some(expected_responses)
} else {
None
};
if self
.outbound_substreams
.insert(
self.current_outbound_substream_id,
OutboundInfo {
state: awaiting_stream,
delay_key,
proto,
remaining_chunks: expected_responses,
req_id: id,
},
)
.is_some()
{
crit!(self.log, "Duplicate outbound substream id"; "id" => self.current_outbound_substream_id);
}
self.current_outbound_substream_id.0 += 1;
}
}
fn inject_fully_negotiated_inbound(
&mut self,
substream: <Self::InboundProtocol as InboundUpgrade<NegotiatedSubstream>>::Output,
_info: Self::InboundOpenInfo,
) {
// only accept new peer requests when active
if !matches!(self.state, HandlerState::Active) {
return;
}
let (req, substream) = substream;
let expected_responses = req.expected_responses();
// store requests that expect responses
if expected_responses > 0 {
if self.inbound_substreams.len() < MAX_INBOUND_SUBSTREAMS {
// Store the stream and tag the output.
let delay_key = self.inbound_substreams_delay.insert(
self.current_inbound_substream_id,
Duration::from_secs(RESPONSE_TIMEOUT),
);
let awaiting_stream = InboundState::Idle(substream);
self.inbound_substreams.insert(
self.current_inbound_substream_id,
InboundInfo {
state: awaiting_stream,
pending_items: VecDeque::with_capacity(std::cmp::min(
expected_responses,
128,
) as usize),
delay_key: Some(delay_key),
protocol: req.versioned_protocol().protocol(),
request_start_time: Instant::now(),
remaining_chunks: expected_responses,
},
);
} else {
self.events_out.push(Err(HandlerErr::Inbound {
id: self.current_inbound_substream_id,
proto: req.versioned_protocol().protocol(),
error: RPCError::HandlerRejected,
}));
return self.shutdown(None);
}
}
// If we received a goodbye, shutdown the connection.
if let InboundRequest::Goodbye(_) = req {
self.shutdown(None);
}
self.events_out.push(Ok(RPCReceived::Request(
self.current_inbound_substream_id,
req,
)));
self.current_inbound_substream_id.0 += 1;
}
fn inject_event(&mut self, rpc_event: Self::InEvent) {
fn on_behaviour_event(&mut self, rpc_event: Self::FromBehaviour) {
match rpc_event {
RPCSend::Request(id, req) => self.send_request(id, req),
RPCSend::Response(inbound_id, response) => self.send_response(inbound_id, response),
@@ -453,56 +339,6 @@ where
}
}
fn inject_dial_upgrade_error(
&mut self,
request_info: Self::OutboundOpenInfo,
error: ConnectionHandlerUpgrErr<
<Self::OutboundProtocol as OutboundUpgrade<NegotiatedSubstream>>::Error,
>,
) {
let (id, req) = request_info;
if let ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Apply(RPCError::IoError(_))) = error
{
self.outbound_io_error_retries += 1;
if self.outbound_io_error_retries < IO_ERROR_RETRIES {
self.send_request(id, req);
return;
}
}
// This dialing is now considered failed
self.dial_negotiated -= 1;
self.outbound_io_error_retries = 0;
// map the error
let error = match error {
ConnectionHandlerUpgrErr::Timer => RPCError::InternalError("Timer failed"),
ConnectionHandlerUpgrErr::Timeout => RPCError::NegotiationTimeout,
ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Apply(e)) => e,
ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select(NegotiationError::Failed)) => {
RPCError::UnsupportedProtocol
}
ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select(
NegotiationError::ProtocolError(e),
)) => match e {
ProtocolError::IoError(io_err) => RPCError::IoError(io_err.to_string()),
ProtocolError::InvalidProtocol => {
RPCError::InternalError("Protocol was deemed invalid")
}
ProtocolError::InvalidMessage | ProtocolError::TooManyProtocols => {
// Peer is sending invalid data during the negotiation phase, not
// participating in the protocol
RPCError::InvalidData("Invalid message during negotiation".to_string())
}
},
};
self.events_out.push(Err(HandlerErr::Outbound {
error,
proto: req.versioned_protocol().protocol(),
id,
}));
}
fn connection_keep_alive(&self) -> KeepAlive {
// Check that we don't have outbound items pending for dialing, nor dialing, nor
// established. Also check that there are no established inbound substreams.
@@ -535,7 +371,7 @@ where
ConnectionHandlerEvent<
Self::OutboundProtocol,
Self::OutboundOpenInfo,
Self::OutEvent,
Self::ToBehaviour,
Self::Error,
>,
> {
@@ -548,7 +384,9 @@ where
}
// return any events that need to be reported
if !self.events_out.is_empty() {
return Poll::Ready(ConnectionHandlerEvent::Custom(self.events_out.remove(0)));
return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
self.events_out.remove(0),
));
} else {
self.events_out.shrink_to_fit();
}
@@ -612,7 +450,9 @@ where
error: RPCError::StreamTimeout,
};
// notify the user
return Poll::Ready(ConnectionHandlerEvent::Custom(Err(outbound_err)));
return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(Err(
outbound_err,
)));
} else {
crit!(self.log, "timed out substream not in the books"; "stream_id" => outbound_id.get_ref());
}
@@ -872,7 +712,7 @@ where
}),
};
return Poll::Ready(ConnectionHandlerEvent::Custom(received));
return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(received));
}
Poll::Ready(None) => {
// stream closed
@@ -887,7 +727,7 @@ where
// notify the application error
if request.expected_responses() > 1 {
// return an end of stream result
return Poll::Ready(ConnectionHandlerEvent::Custom(Ok(
return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(Ok(
RPCReceived::EndOfStream(request_id, request.stream_termination()),
)));
}
@@ -898,7 +738,9 @@ where
proto: request.versioned_protocol().protocol(),
error: RPCError::IncompleteStream,
};
return Poll::Ready(ConnectionHandlerEvent::Custom(Err(outbound_err)));
return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(Err(
outbound_err,
)));
}
Poll::Pending => {
entry.get_mut().state =
@@ -914,7 +756,9 @@ where
error: e,
};
entry.remove_entry();
return Poll::Ready(ConnectionHandlerEvent::Custom(Err(outbound_err)));
return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(Err(
outbound_err,
)));
}
},
OutboundSubstreamState::Closing(mut substream) => {
@@ -940,7 +784,7 @@ where
};
if let Some(termination) = termination {
return Poll::Ready(ConnectionHandlerEvent::Custom(Ok(
return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(Ok(
RPCReceived::EndOfStream(request_id, termination),
)));
}
@@ -989,6 +833,207 @@ where
Poll::Pending
}
fn on_connection_event(
&mut self,
event: ConnectionEvent<
Self::InboundProtocol,
Self::OutboundProtocol,
Self::InboundOpenInfo,
Self::OutboundOpenInfo,
>,
) {
match event {
ConnectionEvent::FullyNegotiatedInbound(FullyNegotiatedInbound {
protocol,
info: _,
}) => self.on_fully_negotiated_inbound(protocol),
ConnectionEvent::FullyNegotiatedOutbound(FullyNegotiatedOutbound {
protocol,
info,
}) => self.on_fully_negotiated_outbound(protocol, info),
ConnectionEvent::DialUpgradeError(DialUpgradeError { info, error }) => {
self.on_dial_upgrade_error(info, error)
}
ConnectionEvent::ListenUpgradeError(libp2p::swarm::handler::ListenUpgradeError {
info: _,
error: _, /* RPCError */
}) => {
// This is going to be removed in the next libp2p release. I think it's fine to do
// nothing.
}
ConnectionEvent::LocalProtocolsChange(_) => {
// This shouldn't affect this handler, we will still negotiate streams if we support
// the protocol as usual.
}
ConnectionEvent::RemoteProtocolsChange(_) => {
// This shouldn't affect this handler, we will still negotiate streams if we support
// the protocol as usual.
}
ConnectionEvent::AddressChange(_) => {
// We don't care about these changes as they have no bearing on our RPC internal
// logic.
}
}
}
}
impl<Id, TSpec: EthSpec> RPCHandler<Id, TSpec>
where
Id: ReqId,
TSpec: EthSpec,
{
fn on_fully_negotiated_inbound(&mut self, substream: InboundOutput<Stream, TSpec>) {
// only accept new peer requests when active
if !matches!(self.state, HandlerState::Active) {
return;
}
let (req, substream) = substream;
let expected_responses = req.expected_responses();
// store requests that expect responses
if expected_responses > 0 {
if self.inbound_substreams.len() < MAX_INBOUND_SUBSTREAMS {
// Store the stream and tag the output.
let delay_key = self.inbound_substreams_delay.insert(
self.current_inbound_substream_id,
Duration::from_secs(RESPONSE_TIMEOUT),
);
let awaiting_stream = InboundState::Idle(substream);
self.inbound_substreams.insert(
self.current_inbound_substream_id,
InboundInfo {
state: awaiting_stream,
pending_items: VecDeque::with_capacity(std::cmp::min(
expected_responses,
128,
) as usize),
delay_key: Some(delay_key),
protocol: req.versioned_protocol().protocol(),
request_start_time: Instant::now(),
remaining_chunks: expected_responses,
},
);
} else {
self.events_out.push(Err(HandlerErr::Inbound {
id: self.current_inbound_substream_id,
proto: req.versioned_protocol().protocol(),
error: RPCError::HandlerRejected,
}));
return self.shutdown(None);
}
}
// If we received a goodbye, shutdown the connection.
if let InboundRequest::Goodbye(_) = req {
self.shutdown(None);
}
self.events_out.push(Ok(RPCReceived::Request(
self.current_inbound_substream_id,
req,
)));
self.current_inbound_substream_id.0 += 1;
}
fn on_fully_negotiated_outbound(
&mut self,
substream: OutboundFramed<Stream, TSpec>,
(id, request): (Id, OutboundRequest<TSpec>),
) {
self.dial_negotiated -= 1;
// Reset any io-retries counter.
self.outbound_io_error_retries = 0;
let proto = request.versioned_protocol().protocol();
// accept outbound connections only if the handler is not deactivated
if matches!(self.state, HandlerState::Deactivated) {
self.events_out.push(Err(HandlerErr::Outbound {
error: RPCError::Disconnected,
proto,
id,
}));
}
// add the stream to substreams if we expect a response, otherwise drop the stream.
let expected_responses = request.expected_responses();
if expected_responses > 0 {
// new outbound request. Store the stream and tag the output.
let delay_key = self.outbound_substreams_delay.insert(
self.current_outbound_substream_id,
Duration::from_secs(RESPONSE_TIMEOUT),
);
let awaiting_stream = OutboundSubstreamState::RequestPendingResponse {
substream: Box::new(substream),
request,
};
let expected_responses = if expected_responses > 1 {
// Currently enforced only for multiple responses
Some(expected_responses)
} else {
None
};
if self
.outbound_substreams
.insert(
self.current_outbound_substream_id,
OutboundInfo {
state: awaiting_stream,
delay_key,
proto,
remaining_chunks: expected_responses,
req_id: id,
},
)
.is_some()
{
crit!(self.log, "Duplicate outbound substream id"; "id" => self.current_outbound_substream_id);
}
self.current_outbound_substream_id.0 += 1;
}
}
fn on_dial_upgrade_error(
&mut self,
request_info: (Id, OutboundRequest<TSpec>),
error: StreamUpgradeError<RPCError>,
) {
let (id, req) = request_info;
// map the error
let error = match error {
StreamUpgradeError::Timeout => RPCError::NegotiationTimeout,
StreamUpgradeError::Apply(RPCError::IoError(e)) => {
self.outbound_io_error_retries += 1;
if self.outbound_io_error_retries < IO_ERROR_RETRIES {
self.send_request(id, req);
return;
}
RPCError::IoError(e)
}
StreamUpgradeError::NegotiationFailed => RPCError::UnsupportedProtocol,
StreamUpgradeError::Io(io_err) => {
self.outbound_io_error_retries += 1;
if self.outbound_io_error_retries < IO_ERROR_RETRIES {
self.send_request(id, req);
return;
}
RPCError::IoError(io_err.to_string())
}
StreamUpgradeError::Apply(other) => other,
};
// This dialing is now considered failed
self.dial_negotiated -= 1;
self.outbound_io_error_retries = 0;
self.events_out.push(Err(HandlerErr::Outbound {
error,
proto: req.versioned_protocol().protocol(),
id,
}));
}
}
impl slog::Value for SubstreamId {

View File

@@ -6,11 +6,11 @@
use futures::future::FutureExt;
use handler::{HandlerEvent, RPCHandler};
use libp2p::core::connection::ConnectionId;
use libp2p::swarm::{
handler::ConnectionHandler, NetworkBehaviour, NetworkBehaviourAction, NotifyHandler,
PollParameters, SubstreamProtocol,
handler::ConnectionHandler, ConnectionId, NetworkBehaviour, NotifyHandler, PollParameters,
ToSwarm,
};
use libp2p::swarm::{FromSwarm, SubstreamProtocol, THandlerInEvent};
use libp2p::PeerId;
use rate_limiter::{RPCRateLimiter as RateLimiter, RateLimitedErr};
use slog::{crit, debug, o};
@@ -21,7 +21,7 @@ use types::{EthSpec, ForkContext};
pub(crate) use handler::HandlerErr;
pub(crate) use methods::{MetaData, MetaDataV1, MetaDataV2, Ping, RPCCodedResponse, RPCResponse};
pub(crate) use protocol::{InboundRequest, RPCProtocol};
pub(crate) use protocol::InboundRequest;
pub use handler::SubstreamId;
pub use methods::{
@@ -32,6 +32,7 @@ pub(crate) use outbound::OutboundRequest;
pub use protocol::{max_rpc_size, Protocol, RPCError};
use self::config::{InboundRateLimiterConfig, OutboundRateLimiterConfig};
use self::protocol::RPCProtocol;
use self::self_limiter::SelfRateLimiter;
pub(crate) mod codec;
@@ -104,8 +105,7 @@ pub struct RPCMessage<Id, TSpec: EthSpec> {
pub event: HandlerEvent<Id, TSpec>,
}
type BehaviourAction<Id, TSpec> =
NetworkBehaviourAction<RPCMessage<Id, TSpec>, RPCHandler<Id, TSpec>>;
type BehaviourAction<Id, TSpec> = ToSwarm<RPCMessage<Id, TSpec>, RPCSend<Id, TSpec>>;
/// Implements the libp2p `NetworkBehaviour` trait and therefore manages network-level
/// logic.
@@ -161,7 +161,7 @@ impl<Id: ReqId, TSpec: EthSpec> RPC<Id, TSpec> {
id: (ConnectionId, SubstreamId),
event: RPCCodedResponse<TSpec>,
) {
self.events.push(NetworkBehaviourAction::NotifyHandler {
self.events.push(ToSwarm::NotifyHandler {
peer_id,
handler: NotifyHandler::One(id.0),
event: RPCSend::Response(id.1, event),
@@ -181,7 +181,7 @@ impl<Id: ReqId, TSpec: EthSpec> RPC<Id, TSpec> {
}
}
} else {
NetworkBehaviourAction::NotifyHandler {
ToSwarm::NotifyHandler {
peer_id,
handler: NotifyHandler::Any,
event: RPCSend::Request(request_id, req),
@@ -194,7 +194,7 @@ impl<Id: ReqId, TSpec: EthSpec> RPC<Id, TSpec> {
/// Lighthouse wishes to disconnect from this peer by sending a Goodbye message. This
/// gracefully terminates the RPC behaviour with a goodbye message.
pub fn shutdown(&mut self, peer_id: PeerId, id: Id, reason: GoodbyeReason) {
self.events.push(NetworkBehaviourAction::NotifyHandler {
self.events.push(ToSwarm::NotifyHandler {
peer_id,
handler: NotifyHandler::Any,
event: RPCSend::Shutdown(id, reason),
@@ -208,29 +208,83 @@ where
Id: ReqId,
{
type ConnectionHandler = RPCHandler<Id, TSpec>;
type OutEvent = RPCMessage<Id, TSpec>;
type ToSwarm = RPCMessage<Id, TSpec>;
fn new_handler(&mut self) -> Self::ConnectionHandler {
RPCHandler::new(
SubstreamProtocol::new(
RPCProtocol {
fork_context: self.fork_context.clone(),
max_rpc_size: max_rpc_size(&self.fork_context),
enable_light_client_server: self.enable_light_client_server,
phantom: PhantomData,
},
(),
),
self.fork_context.clone(),
&self.log,
)
fn handle_established_inbound_connection(
&mut self,
_connection_id: ConnectionId,
peer_id: PeerId,
_local_addr: &libp2p::Multiaddr,
_remote_addr: &libp2p::Multiaddr,
) -> Result<libp2p::swarm::THandler<Self>, libp2p::swarm::ConnectionDenied> {
let protocol = SubstreamProtocol::new(
RPCProtocol {
fork_context: self.fork_context.clone(),
max_rpc_size: max_rpc_size(&self.fork_context),
enable_light_client_server: self.enable_light_client_server,
phantom: PhantomData,
},
(),
);
// NOTE: this is needed because PeerIds have interior mutability.
let peer_repr = peer_id.to_string();
let log = self.log.new(slog::o!("peer_id" => peer_repr));
let handler = RPCHandler::new(protocol, self.fork_context.clone(), log);
Ok(handler)
}
fn inject_event(
fn handle_established_outbound_connection(
&mut self,
_connection_id: ConnectionId,
peer_id: PeerId,
_addr: &libp2p::Multiaddr,
_role_override: libp2p::core::Endpoint,
) -> Result<libp2p::swarm::THandler<Self>, libp2p::swarm::ConnectionDenied> {
let protocol = SubstreamProtocol::new(
RPCProtocol {
fork_context: self.fork_context.clone(),
max_rpc_size: max_rpc_size(&self.fork_context),
enable_light_client_server: self.enable_light_client_server,
phantom: PhantomData,
},
(),
);
// NOTE: this is needed because PeerIds have interior mutability.
let peer_repr = peer_id.to_string();
let log = self.log.new(slog::o!("peer_id" => peer_repr));
let handler = RPCHandler::new(protocol, self.fork_context.clone(), log);
Ok(handler)
}
fn on_swarm_event(&mut self, event: FromSwarm<Self::ConnectionHandler>) {
match event {
FromSwarm::ConnectionClosed(_)
| FromSwarm::ConnectionEstablished(_)
| FromSwarm::AddressChange(_)
| FromSwarm::DialFailure(_)
| FromSwarm::ListenFailure(_)
| FromSwarm::NewListener(_)
| FromSwarm::NewListenAddr(_)
| FromSwarm::ExpiredListenAddr(_)
| FromSwarm::ListenerError(_)
| FromSwarm::ListenerClosed(_)
| FromSwarm::NewExternalAddrCandidate(_)
| FromSwarm::ExternalAddrExpired(_)
| FromSwarm::ExternalAddrConfirmed(_) => {
// The RPC behaviour does not act on these swarm events. We use a comprehensive match
// statement to ensure future events are dealt with appropriately.
}
}
}
fn on_connection_handler_event(
&mut self,
peer_id: PeerId,
conn_id: ConnectionId,
event: <Self::ConnectionHandler as ConnectionHandler>::OutEvent,
event: <Self::ConnectionHandler as ConnectionHandler>::ToBehaviour,
) {
if let Ok(RPCReceived::Request(ref id, ref req)) = event {
if let Some(limiter) = self.limiter.as_mut() {
@@ -238,12 +292,11 @@ where
match limiter.allows(&peer_id, req) {
Ok(()) => {
// send the event to the user
self.events
.push(NetworkBehaviourAction::GenerateEvent(RPCMessage {
peer_id,
conn_id,
event,
}))
self.events.push(ToSwarm::GenerateEvent(RPCMessage {
peer_id,
conn_id,
event,
}))
}
Err(RateLimitedErr::TooLarge) => {
// we set the batch sizes, so this is a coding/config err for most protocols
@@ -281,20 +334,18 @@ where
}
} else {
// No rate limiting, send the event to the user
self.events
.push(NetworkBehaviourAction::GenerateEvent(RPCMessage {
peer_id,
conn_id,
event,
}))
}
} else {
self.events
.push(NetworkBehaviourAction::GenerateEvent(RPCMessage {
self.events.push(ToSwarm::GenerateEvent(RPCMessage {
peer_id,
conn_id,
event,
}));
}))
}
} else {
self.events.push(ToSwarm::GenerateEvent(RPCMessage {
peer_id,
conn_id,
event,
}));
}
}
@@ -302,7 +353,7 @@ where
&mut self,
cx: &mut Context,
_: &mut impl PollParameters,
) -> Poll<NetworkBehaviourAction<Self::OutEvent, Self::ConnectionHandler>> {
) -> Poll<ToSwarm<Self::ToSwarm, THandlerInEvent<Self>>> {
// let the rate limiter prune.
if let Some(limiter) = self.limiter.as_mut() {
let _ = limiter.poll_unpin(cx);

View File

@@ -7,7 +7,7 @@ use crate::rpc::{
use futures::future::BoxFuture;
use futures::prelude::{AsyncRead, AsyncWrite};
use futures::{FutureExt, StreamExt};
use libp2p::core::{InboundUpgrade, ProtocolName, UpgradeInfo};
use libp2p::core::{InboundUpgrade, UpgradeInfo};
use ssz::Encode;
use ssz_types::VariableList;
use std::io;
@@ -313,6 +313,12 @@ pub struct ProtocolId {
protocol_id: String,
}
impl AsRef<str> for ProtocolId {
fn as_ref(&self) -> &str {
self.protocol_id.as_ref()
}
}
impl ProtocolId {
/// Returns min and max size for messages of given protocol id requests.
pub fn rpc_request_limits(&self) -> RpcLimits {
@@ -407,12 +413,6 @@ impl ProtocolId {
}
}
impl ProtocolName for ProtocolId {
fn protocol_name(&self) -> &[u8] {
self.protocol_id.as_bytes()
}
}
/* Inbound upgrade */
// The inbound protocol reads the request, decodes it and returns the stream to the protocol

View File

@@ -64,7 +64,7 @@ impl<Id: ReqId, TSpec: EthSpec> SelfRateLimiter<Id, TSpec> {
}
/// Checks if the rate limiter allows the request. If it's allowed, returns the
/// [`NetworkBehaviourAction`] that should be emitted. When not allowed, the request is delayed
/// [`ToSwarm`] that should be emitted. When not allowed, the request is delayed
/// until it can be sent.
pub fn allows(
&mut self,
@@ -95,7 +95,7 @@ impl<Id: ReqId, TSpec: EthSpec> SelfRateLimiter<Id, TSpec> {
}
/// Auxiliary function to deal with self rate limiting outcomes. If the rate limiter allows the
/// request, the [`NetworkBehaviourAction`] that should be emitted is returned. If the request
/// request, the [`ToSwarm`] that should be emitted is returned. If the request
/// should be delayed, it's returned with the duration to wait.
fn try_send_request(
limiter: &mut RateLimiter,