Merge remote-tracking branch 'origin/master' into spec-v0.12

This commit is contained in:
Michael Sproul
2020-06-26 12:08:46 +10:00
34 changed files with 196 additions and 227 deletions

View File

@@ -70,7 +70,7 @@ pub enum DelegateIn<TSpec: EthSpec> {
pub enum DelegateOut<TSpec: EthSpec> {
Gossipsub(<GossipHandler as ProtocolsHandler>::OutEvent),
RPC(<RPCHandler<TSpec> as ProtocolsHandler>::OutEvent),
Identify(<IdentifyHandler as ProtocolsHandler>::OutEvent),
Identify(Box<<IdentifyHandler as ProtocolsHandler>::OutEvent>),
}
/// Wrapper around the `ProtocolsHandler::Error` types of the handlers.
@@ -342,7 +342,9 @@ impl<TSpec: EthSpec> ProtocolsHandler for DelegatingHandler<TSpec> {
match self.identify_handler.poll(cx) {
Poll::Ready(ProtocolsHandlerEvent::Custom(event)) => {
return Poll::Ready(ProtocolsHandlerEvent::Custom(DelegateOut::Identify(event)));
return Poll::Ready(ProtocolsHandlerEvent::Custom(DelegateOut::Identify(
Box::new(event),
)));
}
Poll::Ready(ProtocolsHandlerEvent::Close(event)) => {
return Poll::Ready(ProtocolsHandlerEvent::Close(DelegateError::Identify(event)));

View File

@@ -42,7 +42,7 @@ pub enum BehaviourHandlerIn<TSpec: EthSpec> {
}
pub enum BehaviourHandlerOut<TSpec: EthSpec> {
Delegate(DelegateOut<TSpec>),
Delegate(Box<DelegateOut<TSpec>>),
// TODO: replace custom with events to send
Custom,
}
@@ -119,7 +119,7 @@ impl<TSpec: EthSpec> ProtocolsHandler for BehaviourHandler<TSpec> {
match self.delegate.poll(cx) {
Poll::Ready(ProtocolsHandlerEvent::Custom(event)) => {
return Poll::Ready(ProtocolsHandlerEvent::Custom(
BehaviourHandlerOut::Delegate(event),
BehaviourHandlerOut::Delegate(Box::new(event)),
))
}
Poll::Ready(ProtocolsHandlerEvent::Close(err)) => {

View File

@@ -160,10 +160,10 @@ impl<TSpec: EthSpec> NetworkBehaviour for Behaviour<TSpec> {
) {
match event {
// Events comming from the handler, redirected to each behaviour
BehaviourHandlerOut::Delegate(delegate) => match delegate {
BehaviourHandlerOut::Delegate(delegate) => match *delegate {
DelegateOut::Gossipsub(ev) => self.gossipsub.inject_event(peer_id, conn_id, ev),
DelegateOut::RPC(ev) => self.eth2_rpc.inject_event(peer_id, conn_id, ev),
DelegateOut::Identify(ev) => self.identify.inject_event(peer_id, conn_id, ev),
DelegateOut::Identify(ev) => self.identify.inject_event(peer_id, conn_id, *ev),
},
/* Custom events sent BY the handler */
BehaviourHandlerOut::Custom => {

View File

@@ -56,6 +56,9 @@ pub struct Config {
/// Client version
pub client_version: String,
/// Disables the discovery protocol from starting.
pub disable_discovery: bool,
/// List of extra topics to initially subscribe to as strings.
pub topics: Vec<GossipKind>,
}
@@ -104,7 +107,7 @@ impl Default for Config {
.request_retries(2)
.enr_peer_update_min(2) // prevents NAT's should be raised for mainnet
.query_parallelism(5)
.query_timeout(Duration::from_secs(60))
.query_timeout(Duration::from_secs(30))
.query_peer_timeout(Duration::from_secs(2))
.ip_limit() // limits /24 IP's in buckets.
.ping_interval(Duration::from_secs(300))
@@ -125,6 +128,7 @@ impl Default for Config {
boot_nodes: vec![],
libp2p_nodes: vec![],
client_version: version::version(),
disable_discovery: false,
topics,
}
}

View File

@@ -51,7 +51,7 @@ const FIND_NODE_QUERY_CLOSEST_PEERS: usize = 16;
pub enum DiscoveryEvent {
/// A query has completed. The first parameter is the `min_ttl` of the peers if it is specified
/// and the second parameter are the discovered peers.
QueryResult(Option<Instant>, Box<Vec<Enr>>),
QueryResult(Option<Instant>, Vec<Enr>),
/// This indicates that our local UDP socketaddr has been updated and we should inform libp2p.
SocketUpdated(SocketAddr),
}
@@ -112,10 +112,12 @@ enum EventStream {
),
/// The future has completed.
Present(mpsc::Receiver<Discv5Event>),
// The future has failed, there are no events from discv5.
Failed,
// The future has failed or discv5 has been disabled. There are no events from discv5.
InActive,
}
/// The main discovery service. This can be disabled via CLI arguements. When disabled the
/// underlying processes are not started, but this struct still maintains our current ENR.
pub struct Discovery<TSpec: EthSpec> {
/// A collection of seen live ENRs for quick lookup and to map peer-id's to ENRs.
cached_enrs: LruCache<PeerId, Enr>,
@@ -145,6 +147,10 @@ pub struct Discovery<TSpec: EthSpec> {
/// The discv5 event stream.
event_stream: EventStream,
/// Indicates if the discovery service has been started. When the service is disabled, this is
/// always false.
started: bool,
/// Logger for the discovery behaviour.
log: slog::Logger,
}
@@ -196,12 +202,16 @@ impl<TSpec: EthSpec> Discovery<TSpec> {
});
}
// Start the discv5 service.
discv5.start(listen_socket);
debug!(log, "Discovery service started");
// Start the discv5 service and obtain an event stream
let event_stream = if !config.disable_discovery {
discv5.start(listen_socket);
debug!(log, "Discovery service started");
EventStream::Awaiting(Box::pin(discv5.event_stream()))
} else {
EventStream::InActive
};
// Obtain the event stream
let event_stream = EventStream::Awaiting(Box::pin(discv5.event_stream()));
Ok(Self {
cached_enrs: LruCache::new(50),
@@ -211,6 +221,7 @@ impl<TSpec: EthSpec> Discovery<TSpec> {
active_queries: FuturesUnordered::new(),
discv5,
event_stream,
started: !config.disable_discovery,
log,
enr_dir,
})
@@ -223,10 +234,11 @@ impl<TSpec: EthSpec> Discovery<TSpec> {
/// This adds a new `FindPeers` query to the queue if one doesn't already exist.
pub fn discover_peers(&mut self) {
// If we are in the process of a query, don't bother queuing a new one.
if self.find_peer_active {
// If the discv5 service isn't running or we are in the process of a query, don't bother queuing a new one.
if !self.started || self.find_peer_active {
return;
}
// If there is not already a find peer's query queued, add one
let query = QueryType::FindPeers;
if !self.queued_queries.contains(&query) {
@@ -239,54 +251,11 @@ impl<TSpec: EthSpec> Discovery<TSpec> {
/// Processes a request to search for more peers on a subnet.
pub fn discover_subnet_peers(&mut self, subnet_id: SubnetId, min_ttl: Option<Instant>) {
self.add_subnet_query(subnet_id, min_ttl, 0);
}
/// Adds a subnet query if one doesn't exist. If a subnet query already exists, this
/// updates the min_ttl field.
fn add_subnet_query(&mut self, subnet_id: SubnetId, min_ttl: Option<Instant>, retries: usize) {
// remove the entry and complete the query if greater than the maximum search count
if retries >= MAX_DISCOVERY_RETRY {
debug!(
self.log,
"Subnet peer discovery did not find sufficient peers. Reached max retry limit"
);
// If the discv5 service isn't running, ignore queries
if !self.started {
return;
}
// Search through any queued requests and update the timeout if a query for this subnet
// already exists
let mut found = false;
for query in self.queued_queries.iter_mut() {
if let QueryType::Subnet {
subnet_id: ref mut q_subnet_id,
min_ttl: ref mut q_min_ttl,
retries: ref mut q_retries,
} = query
{
if *q_subnet_id == subnet_id {
if *q_min_ttl < min_ttl {
*q_min_ttl = min_ttl;
}
// update the number of retries
*q_retries = retries;
// mimic an `Iter::Find()` and short-circuit the loop
found = true;
break;
}
}
}
if !found {
// Set up the query and add it to the queue
let query = QueryType::Subnet {
subnet_id,
min_ttl,
retries,
};
// update the metrics and insert into the queue.
metrics::set_gauge(&metrics::DISCOVERY_QUEUE, self.queued_queries.len() as i64);
self.queued_queries.push_back(query);
}
self.add_subnet_query(subnet_id, min_ttl, 0);
}
/// Add an ENR to the routing table of the discovery mechanism.
@@ -295,7 +264,7 @@ impl<TSpec: EthSpec> Discovery<TSpec> {
self.cached_enrs.put(enr.peer_id(), enr.clone());
if let Err(e) = self.discv5.add_enr(enr) {
warn!(
debug!(
self.log,
"Could not add peer to the local routing table";
"error" => format!("{}", e)
@@ -359,7 +328,7 @@ impl<TSpec: EthSpec> Discovery<TSpec> {
.enr_insert(BITFIELD_ENR_KEY, current_bitfield.as_ssz_bytes());
// replace the global version
*self.network_globals.local_enr.write() = self.discv5.local_enr().clone();
*self.network_globals.local_enr.write() = self.discv5.local_enr();
Ok(())
}
@@ -391,11 +360,58 @@ impl<TSpec: EthSpec> Discovery<TSpec> {
});
// replace the global version with discovery version
*self.network_globals.local_enr.write() = self.discv5.local_enr().clone();
*self.network_globals.local_enr.write() = self.discv5.local_enr();
}
/* Internal Functions */
/// Adds a subnet query if one doesn't exist. If a subnet query already exists, this
/// updates the min_ttl field.
fn add_subnet_query(&mut self, subnet_id: SubnetId, min_ttl: Option<Instant>, retries: usize) {
// remove the entry and complete the query if greater than the maximum search count
if retries >= MAX_DISCOVERY_RETRY {
debug!(
self.log,
"Subnet peer discovery did not find sufficient peers. Reached max retry limit"
);
return;
}
// Search through any queued requests and update the timeout if a query for this subnet
// already exists
let mut found = false;
for query in self.queued_queries.iter_mut() {
if let QueryType::Subnet {
subnet_id: ref mut q_subnet_id,
min_ttl: ref mut q_min_ttl,
retries: ref mut q_retries,
} = query
{
if *q_subnet_id == subnet_id {
if *q_min_ttl < min_ttl {
*q_min_ttl = min_ttl;
}
// update the number of retries
*q_retries = retries;
// mimic an `Iter::Find()` and short-circuit the loop
found = true;
break;
}
}
}
if !found {
// Set up the query and add it to the queue
let query = QueryType::Subnet {
subnet_id,
min_ttl,
retries,
};
// update the metrics and insert into the queue.
metrics::set_gauge(&metrics::DISCOVERY_QUEUE, self.queued_queries.len() as i64);
self.queued_queries.push_back(query);
}
}
/// Consume the discovery queue and initiate queries when applicable.
///
/// This also sanitizes the queue removing out-dated queries.
@@ -572,6 +588,10 @@ impl<TSpec: EthSpec> Discovery<TSpec> {
// Main execution loop to be driven by the peer manager.
pub fn poll(&mut self, cx: &mut Context) -> Poll<DiscoveryEvent> {
if !self.started {
return Poll::Pending;
}
// Process the query queue
self.process_queue();
@@ -582,7 +602,7 @@ impl<TSpec: EthSpec> Discovery<TSpec> {
self.cached_enrs.put(enr.peer_id(), enr);
}
// return the result to the peer manager
return Poll::Ready(DiscoveryEvent::QueryResult(min_ttl, Box::new(result)));
return Poll::Ready(DiscoveryEvent::QueryResult(min_ttl, result));
}
// Process the server event stream
@@ -594,12 +614,12 @@ impl<TSpec: EthSpec> Discovery<TSpec> {
Ok(stream) => self.event_stream = EventStream::Present(stream),
Err(e) => {
slog::crit!(self.log, "Discv5 event stream failed"; "error" => e.to_string());
self.event_stream = EventStream::Failed;
self.event_stream = EventStream::InActive;
}
}
}
}
EventStream::Failed => {} // ignore checking the stream
EventStream::InActive => {} // ignore checking the stream
EventStream::Present(ref mut stream) => {
while let Ok(event) = stream.try_recv() {
match event {

View File

@@ -422,7 +422,7 @@ impl<TSpec: EthSpec> PeerManager<TSpec> {
/// with a new `PeerId` which involves a discovery routing table lookup. We could dial the
/// multiaddr here, however this could relate to duplicate PeerId's etc. If the lookup
/// proves resource constraining, we should switch to multiaddr dialling here.
fn peers_discovered(&mut self, peers: Vec<Enr>, min_ttl: Option<Instant>) {
fn peers_discovered(&mut self, peers: &[Enr], min_ttl: Option<Instant>) {
for enr in peers {
let peer_id = enr.peer_id();
@@ -623,7 +623,7 @@ impl<TSpec: EthSpec> Stream for PeerManager<TSpec> {
match event {
DiscoveryEvent::SocketUpdated(socket_addr) => self.socket_updated(socket_addr),
DiscoveryEvent::QueryResult(min_ttl, peers) => {
self.peers_discovered(*peers, min_ttl)
self.peers_discovered(&peers, min_ttl)
}
}
}

View File

@@ -169,7 +169,7 @@ where
/// A response has been sent, pending writing.
ResponsePendingSend {
/// The substream used to send the response
substream: InboundFramed<NegotiatedSubstream, TSpec>,
substream: Box<InboundFramed<NegotiatedSubstream, TSpec>>,
/// The message that is attempting to be sent.
message: RPCCodedResponse<TSpec>,
/// Whether a stream termination is requested. If true the stream will be closed after
@@ -180,7 +180,7 @@ where
/// A response has been sent, pending flush.
ResponsePendingFlush {
/// The substream used to send the response
substream: InboundFramed<NegotiatedSubstream, TSpec>,
substream: Box<InboundFramed<NegotiatedSubstream, TSpec>>,
/// Whether a stream termination is requested. If true the stream will be closed after
/// this send. Otherwise it will transition to an idle state until a stream termination is
/// requested or a timeout is reached.
@@ -188,9 +188,9 @@ where
},
/// The response stream is idle and awaiting input from the application to send more chunked
/// responses.
ResponseIdle(InboundFramed<NegotiatedSubstream, TSpec>),
ResponseIdle(Box<InboundFramed<NegotiatedSubstream, TSpec>>),
/// The substream is attempting to shutdown.
Closing(InboundFramed<NegotiatedSubstream, TSpec>),
Closing(Box<InboundFramed<NegotiatedSubstream, TSpec>>),
/// Temporary state during processing
Poisoned,
}
@@ -201,12 +201,12 @@ pub enum OutboundSubstreamState<TSpec: EthSpec> {
/// handler because GOODBYE requests can be handled and responses dropped instantly.
RequestPendingResponse {
/// The framed negotiated substream.
substream: OutboundFramed<NegotiatedSubstream, TSpec>,
substream: Box<OutboundFramed<NegotiatedSubstream, TSpec>>,
/// Keeps track of the actual request sent.
request: RPCRequest<TSpec>,
},
/// Closing an outbound substream>
Closing(OutboundFramed<NegotiatedSubstream, TSpec>),
Closing(Box<OutboundFramed<NegotiatedSubstream, TSpec>>),
/// Temporary state during processing
Poisoned,
}
@@ -326,7 +326,7 @@ where
if matches!(self.state, HandlerState::Active) {
debug!(self.log, "Starting handler shutdown"; "unsent_queued_requests" => self.dial_queue.len());
// we now drive to completion communications already dialed/established
for (id, req) in self.dial_queue.pop() {
while let Some((id, req)) = self.dial_queue.pop() {
self.pending_errors.push(HandlerErr::Outbound {
id,
proto: req.protocol(),
@@ -551,7 +551,7 @@ where
self.current_inbound_substream_id,
Duration::from_secs(RESPONSE_TIMEOUT),
);
let awaiting_stream = InboundSubstreamState::ResponseIdle(substream);
let awaiting_stream = InboundSubstreamState::ResponseIdle(Box::new(substream));
self.inbound_substreams.insert(
self.current_inbound_substream_id,
(awaiting_stream, Some(delay_key), req.protocol()),
@@ -593,7 +593,7 @@ where
Duration::from_secs(RESPONSE_TIMEOUT),
);
let awaiting_stream = OutboundSubstreamState::RequestPendingResponse {
substream: out,
substream: Box::new(out),
request,
};
let expected_responses = if expected_responses > 1 {
@@ -833,7 +833,7 @@ where
// await flush
entry.get_mut().0 =
InboundSubstreamState::ResponsePendingFlush {
substream,
substream: substream,
closing,
};
drive_stream_further = true;
@@ -853,7 +853,7 @@ where
} else {
// check for queued chunks and update the stream
entry.get_mut().0 = apply_queued_responses(
substream,
*substream,
&mut self
.queued_outbound_items
.get_mut(&request_id),
@@ -908,7 +908,7 @@ where
} else {
// check for queued chunks and update the stream
entry.get_mut().0 = apply_queued_responses(
substream,
*substream,
&mut self
.queued_outbound_items
.get_mut(&request_id),
@@ -942,7 +942,7 @@ where
InboundSubstreamState::ResponseIdle(substream) => {
if !deactivated {
entry.get_mut().0 = apply_queued_responses(
substream,
*substream,
&mut self.queued_outbound_items.get_mut(&request_id),
&mut drive_stream_further,
);
@@ -1190,10 +1190,10 @@ fn apply_queued_responses<TSpec: EthSpec>(
match queue.remove(0) {
RPCCodedResponse::StreamTermination(_) => {
// close the stream if this is a stream termination
InboundSubstreamState::Closing(substream)
InboundSubstreamState::Closing(Box::new(substream))
}
chunk => InboundSubstreamState::ResponsePendingSend {
substream,
substream: Box::new(substream),
message: chunk,
closing: false,
},
@@ -1201,7 +1201,7 @@ fn apply_queued_responses<TSpec: EthSpec>(
}
_ => {
// no items queued set to idle
InboundSubstreamState::ResponseIdle(substream)
InboundSubstreamState::ResponseIdle(Box::new(substream))
}
}
}

View File

@@ -106,7 +106,12 @@ impl<TSpec: EthSpec> Service<TSpec> {
));
info!(log, "Libp2p Service"; "peer_id" => format!("{:?}", enr.peer_id()));
debug!(log, "Attempting to open listening ports"; "address" => format!("{}", config.listen_address), "tcp_port" => config.libp2p_port, "udp_port" => config.discovery_port);
let discovery_string = if config.disable_discovery {
"None".into()
} else {
config.discovery_port.to_string()
};
debug!(log, "Attempting to open listening ports"; "address" => format!("{}", config.listen_address), "tcp_port" => config.libp2p_port, "udp_port" => discovery_string);
let mut swarm = {
// Set up the transport - tcp/ws with noise and yamux/mplex