Merge in latest master

This commit is contained in:
Age Manning
2019-11-29 13:23:03 +11:00
14 changed files with 490 additions and 109 deletions

View File

@@ -195,9 +195,6 @@ impl<T: BeaconChainTypes> MessageHandler<T> {
}
}
}
RPCResponse::Goodbye => {
// A goodbye was successfully sent, ignore it
}
}
}
RPCErrorResponse::StreamTermination(response_type) => {

View File

@@ -4,15 +4,12 @@ use crate::NetworkConfig;
use beacon_chain::{BeaconChain, BeaconChainTypes};
use core::marker::PhantomData;
use eth2_libp2p::Service as LibP2PService;
use eth2_libp2p::{
rpc::{RPCErrorResponse, RPCRequest, RPCResponse},
ConnectedPoint, Enr, Libp2pEvent, Multiaddr, NetworkBehaviour, PeerId, Swarm, Topic,
};
use eth2_libp2p::{rpc::RPCRequest, Enr, Libp2pEvent, Multiaddr, PeerId, Swarm, Topic};
use eth2_libp2p::{PubsubMessage, RPCEvent};
use futures::prelude::*;
use futures::Stream;
use parking_lot::{Mutex, MutexGuard};
use slog::{debug, info, trace, warn};
use parking_lot::Mutex;
use slog::{debug, info, trace};
use std::sync::Arc;
use tokio::runtime::TaskExecutor;
use tokio::sync::{mpsc, oneshot};
@@ -158,9 +155,6 @@ fn network_service(
propagation_percentage: Option<u8>,
) -> impl futures::Future<Item = (), Error = eth2_libp2p::error::Error> {
futures::future::poll_fn(move || -> Result<_, eth2_libp2p::error::Error> {
// keep a list of peers to disconnect, once all channels are processed, remove the peers.
let mut peers_to_ban = Vec::new();
// processes the network channel before processing the libp2p swarm
loop {
// poll the network channel
@@ -218,7 +212,7 @@ fn network_service(
}
}
NetworkMessage::Disconnect { peer_id } => {
peers_to_ban.push(peer_id);
libp2p_service.lock().disconnect_and_ban_peer(peer_id);
}
},
Ok(Async::NotReady) => break,
@@ -233,21 +227,15 @@ fn network_service(
loop {
// poll the swarm
match libp2p_service.lock().poll() {
let mut locked_service = libp2p_service.lock();
match locked_service.poll() {
Ok(Async::Ready(Some(event))) => match event {
Libp2pEvent::RPC(peer_id, rpc_event) => {
trace!(log, "Received RPC"; "rpc" => format!("{}", rpc_event));
// if we received or sent a Goodbye message, drop and ban the peer
match rpc_event {
RPCEvent::Request(_, RPCRequest::Goodbye(_))
| RPCEvent::Response(
_,
RPCErrorResponse::Success(RPCResponse::Goodbye),
) => {
peers_to_ban.push(peer_id.clone());
}
_ => {}
// if we received a Goodbye message, drop and ban the peer
if let RPCEvent::Request(_, RPCRequest::Goodbye(_)) = rpc_event {
locked_service.disconnect_and_ban_peer(peer_id.clone());
};
message_handler_send
.try_send(HandlerMessage::RPC(peer_id, rpc_event))
@@ -283,32 +271,10 @@ fn network_service(
}
}
while !peers_to_ban.is_empty() {
let service = libp2p_service.lock();
disconnect_peer(service, peers_to_ban.pop().expect("element exists"), &log);
}
Ok(Async::NotReady)
})
}
fn disconnect_peer(mut service: MutexGuard<LibP2PService>, peer_id: PeerId, log: &slog::Logger) {
warn!(log, "Disconnecting and banning peer"; "peer_id" => format!("{:?}", peer_id));
Swarm::ban_peer_id(&mut service.swarm, peer_id.clone());
// TODO: Correctly notify protocols of the disconnect
// TOOD: Also remove peer from the DHT: https://github.com/sigp/lighthouse/issues/629
let dummy_connected_point = ConnectedPoint::Dialer {
address: "/ip4/0.0.0.0"
.parse::<Multiaddr>()
.expect("valid multiaddr"),
};
service
.swarm
.inject_disconnected(&peer_id, dummy_connected_point);
// inform the behaviour that the peer has been banned
service.swarm.peer_banned(peer_id);
}
/// Types of messages that the network service can receive.
#[derive(Debug)]
pub enum NetworkMessage {