Merge remote-tracking branch 'origin/network-clean' into eth1-deploy

This commit is contained in:
Paul Hauner
2019-11-29 17:09:01 +11:00
13 changed files with 58 additions and 81 deletions

View File

@@ -173,11 +173,11 @@ impl<TSubstream: AsyncRead + AsyncWrite> NetworkBehaviourEventProcess<IdentifyEv
info.listen_addrs.truncate(MAX_IDENTIFY_ADDRESSES);
}
debug!(self.log, "Identified Peer"; "Peer" => format!("{}", peer_id),
"Protocol Version" => info.protocol_version,
"Agent Version" => info.agent_version,
"Listening Addresses" => format!("{:?}", info.listen_addrs),
"Observed Address" => format!("{:?}", observed_addr),
"Protocols" => format!("{:?}", info.protocols)
"protocol_version" => info.protocol_version,
"agent_version" => info.agent_version,
"listening_ addresses" => format!("{:?}", info.listen_addrs),
"observed_address" => format!("{:?}", observed_addr),
"protocols" => format!("{:?}", info.protocols)
);
}
IdentifyEvent::Sent { .. } => {}
@@ -210,7 +210,7 @@ impl<TSubstream: AsyncRead + AsyncWrite> Behaviour<TSubstream> {
/// Publishes a message on the pubsub (gossipsub) behaviour.
pub fn publish(&mut self, topics: &[Topic], message: PubsubMessage) {
let message_data = message.to_data();
let message_data = message.into_data();
for topic in topics {
self.gossipsub.publish(topic, message_data.clone());
}
@@ -295,7 +295,7 @@ impl PubsubMessage {
* Also note that a message can be associated with many topics. As soon as one of the topics is
* known we match. If none of the topics are known we return an unknown state.
*/
fn from_topics(topics: &Vec<TopicHash>, data: Vec<u8>) -> Self {
fn from_topics(topics: &[TopicHash], data: Vec<u8>) -> Self {
for topic in topics {
// compare the prefix and postfix, then match on the topic
let topic_parts: Vec<&str> = topic.as_str().split('/').collect();
@@ -316,7 +316,7 @@ impl PubsubMessage {
PubsubMessage::Unknown(data)
}
fn to_data(self) -> Vec<u8> {
fn into_data(self) -> Vec<u8> {
match self {
PubsubMessage::Block(data)
| PubsubMessage::Attestation(data)

View File

@@ -1,3 +1,6 @@
#![allow(clippy::type_complexity)]
#![allow(clippy::cognitive_complexity)]
use super::methods::{RPCErrorResponse, RequestId};
use super::protocol::{RPCError, RPCProtocol, RPCRequest};
use super::RPCEvent;
@@ -174,7 +177,6 @@ where
}
/// Opens an outbound substream with a request.
#[inline]
pub fn send_request(&mut self, rpc_event: RPCEvent) {
self.keep_alive = KeepAlive::Yes;
@@ -194,12 +196,10 @@ where
type OutboundProtocol = RPCRequest;
type OutboundOpenInfo = RPCEvent; // Keep track of the id and the request
#[inline]
fn listen_protocol(&self) -> SubstreamProtocol<Self::InboundProtocol> {
self.listen_protocol.clone()
}
#[inline]
fn inject_fully_negotiated_inbound(
&mut self,
out: <RPCProtocol as InboundUpgrade<TSubstream>>::Output,
@@ -225,7 +225,6 @@ where
self.current_substream_id += 1;
}
#[inline]
fn inject_fully_negotiated_outbound(
&mut self,
out: <RPCRequest as OutboundUpgrade<TSubstream>>::Output,
@@ -263,14 +262,11 @@ where
// Note: If the substream has closed due to inactivity, or the substream is in the
// wrong state a response will fail silently.
#[inline]
fn inject_event(&mut self, rpc_event: Self::InEvent) {
match rpc_event {
RPCEvent::Request(_, _) => self.send_request(rpc_event),
RPCEvent::Response(rpc_id, response) => {
// check if the stream matching the response still exists
trace!(self.log, "Checking for outbound stream");
// variables indicating if the response is an error response or a multi-part
// response
let res_is_error = response.is_error();
@@ -280,7 +276,6 @@ where
Some((substream_state, _)) => {
match std::mem::replace(substream_state, InboundSubstreamState::Poisoned) {
InboundSubstreamState::ResponseIdle(substream) => {
trace!(self.log, "Stream is idle, sending message"; "message" => format!("{}", response));
// close the stream if there is no response
if let RPCErrorResponse::StreamTermination(_) = response {
trace!(self.log, "Stream termination sent. Ending the stream");
@@ -298,7 +293,6 @@ where
if res_is_multiple =>
{
// the stream is in use, add the request to a pending queue
trace!(self.log, "Adding message to queue"; "message" => format!("{}", response));
(*self
.queued_outbound_items
.entry(rpc_id)
@@ -338,7 +332,6 @@ where
}
}
#[inline]
fn inject_dial_upgrade_error(
&mut self,
_: Self::OutboundOpenInfo,
@@ -351,7 +344,6 @@ where
}
}
#[inline]
fn connection_keep_alive(&self) -> KeepAlive {
self.keep_alive
}
@@ -384,7 +376,6 @@ where
.poll()
.map_err(|_| ProtocolsHandlerUpgrErr::Timer)?
{
trace!(self.log, "Closing expired inbound stream");
self.inbound_substreams.remove(stream_id.get_ref());
}
@@ -394,7 +385,6 @@ where
.poll()
.map_err(|_| ProtocolsHandlerUpgrErr::Timer)?
{
trace!(self.log, "Closing expired outbound stream");
self.outbound_substreams.remove(stream_id.get_ref());
}
@@ -403,7 +393,7 @@ where
// Drain all queued items until all messages have been processed for this stream
// TODO Improve this code logic
let mut new_items_to_send = true;
while new_items_to_send == true {
while new_items_to_send {
new_items_to_send = false;
match self.inbound_substreams.entry(request_id) {
Entry::Occupied(mut entry) => {
@@ -418,7 +408,6 @@ where
match substream.poll() {
Ok(Async::Ready(raw_substream)) => {
// completed the send
trace!(self.log, "RPC message sent");
// close the stream if required
if closing {
@@ -426,7 +415,6 @@ where
InboundSubstreamState::Closing(raw_substream)
} else {
// check for queued chunks and update the stream
trace!(self.log, "Checking for queued items");
entry.get_mut().0 = apply_queued_responses(
raw_substream,
&mut self
@@ -454,7 +442,6 @@ where
};
}
InboundSubstreamState::ResponseIdle(substream) => {
trace!(self.log, "Idle stream searching queue");
entry.get_mut().0 = apply_queued_responses(
substream,
&mut self.queued_outbound_items.get_mut(&request_id),
@@ -500,12 +487,11 @@ where
request,
} => match substream.poll() {
Ok(Async::Ready(Some(response))) => {
trace!(self.log, "Message received"; "message" => format!("{}", response));
if request.multiple_responses() {
entry.get_mut().0 =
OutboundSubstreamState::RequestPendingResponse {
substream,
request: request,
request,
};
let delay_key = &entry.get().1;
self.outbound_substreams_delay

View File

@@ -77,7 +77,7 @@ impl<TSubstream> RPC<TSubstream> {
RPC {
events: Vec::new(),
marker: PhantomData,
log: log,
log,
}
}

View File

@@ -1,3 +1,5 @@
#![allow(clippy::type_complexity)]
use super::methods::*;
use crate::rpc::{
codec::{

View File

@@ -141,14 +141,7 @@ impl Service {
topics.push(topic_builder(ATTESTER_SLASHING_TOPIC));
// Add any topics specified by the user
topics.append(
&mut config
.topics
.iter()
.cloned()
.map(|s| Topic::new(s))
.collect(),
);
topics.append(&mut config.topics.iter().cloned().map(Topic::new).collect());
let mut subscribed_topics = vec![];
for topic in topics {