mirror of
https://github.com/sigp/lighthouse.git
synced 2026-03-11 18:04:18 +00:00
Implement hello responses.
This commit is contained in:
@@ -6,7 +6,7 @@ use crossbeam_channel::{unbounded as channel, Sender};
|
||||
use futures::future;
|
||||
use libp2p::{
|
||||
rpc::{RPCRequest, RPCResponse},
|
||||
HelloMessage, PeerId, RPCEvent,
|
||||
PeerId, RPCEvent,
|
||||
};
|
||||
use slog::debug;
|
||||
use slog::warn;
|
||||
@@ -85,7 +85,7 @@ impl MessageHandler {
|
||||
match message {
|
||||
// we have initiated a connection to a peer
|
||||
HandlerMessage::PeerDialed(peer_id) => {
|
||||
self.sync.on_connect(&peer_id, &mut self.network_context);
|
||||
self.sync.on_connect(peer_id, &mut self.network_context);
|
||||
}
|
||||
// we have received an RPC message request/response
|
||||
HandlerMessage::RPC(peer_id, rpc_event) => {
|
||||
@@ -113,7 +113,7 @@ impl MessageHandler {
|
||||
match request {
|
||||
RPCRequest::Hello(hello_message) => {
|
||||
self.sync
|
||||
.on_hello(&peer_id, hello_message, &mut self.network_context)
|
||||
.on_hello(peer_id, hello_message, &mut self.network_context)
|
||||
}
|
||||
// TODO: Handle all requests
|
||||
_ => {}
|
||||
@@ -136,26 +136,13 @@ impl MessageHandler {
|
||||
match response {
|
||||
RPCResponse::Hello(hello_message) => {
|
||||
debug!(self.log, "Hello response received from peer: {:?}", peer_id);
|
||||
self.validate_hello(peer_id, hello_message);
|
||||
self.sync
|
||||
.on_hello(peer_id, hello_message, &mut self.network_context);
|
||||
}
|
||||
// TODO: Handle all responses
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
|
||||
/// Validate a HELLO RPC message.
|
||||
fn validate_hello(&mut self, peer_id: PeerId, message: HelloMessage) {
|
||||
self.sync
|
||||
.on_hello(&peer_id, message.clone(), &mut self.network_context);
|
||||
// validate the peer
|
||||
if !self.sync.validate_peer(peer_id.clone(), message) {
|
||||
debug!(
|
||||
self.log,
|
||||
"Peer dropped due to mismatching HELLO messages: {:?}", peer_id
|
||||
);
|
||||
//TODO: block/ban the peer
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub struct NetworkContext {
|
||||
@@ -179,6 +166,10 @@ impl NetworkContext {
|
||||
}
|
||||
}
|
||||
|
||||
pub fn disconnect(&self, _peer_id: PeerId) {
|
||||
// TODO: disconnect peers.
|
||||
}
|
||||
|
||||
pub fn send_rpc_request(&mut self, peer_id: PeerId, rpc_request: RPCRequest) {
|
||||
let id = self.generate_request_id(&peer_id);
|
||||
self.send_rpc_event(
|
||||
|
||||
@@ -6,7 +6,6 @@ use crossbeam_channel::{unbounded as channel, Sender, TryRecvError};
|
||||
use futures::prelude::*;
|
||||
use futures::sync::oneshot;
|
||||
use futures::Stream;
|
||||
use libp2p::rpc::RPCResponse;
|
||||
use libp2p::RPCEvent;
|
||||
use libp2p::Service as LibP2PService;
|
||||
use libp2p::{Libp2pEvent, PeerId};
|
||||
|
||||
@@ -1,8 +1,9 @@
|
||||
use crate::beacon_chain::BeaconChain;
|
||||
use crate::message_handler::{MessageHandler, NetworkContext};
|
||||
use crate::message_handler::NetworkContext;
|
||||
use crate::service::NetworkMessage;
|
||||
use crossbeam_channel::Sender;
|
||||
use libp2p::rpc::{HelloMessage, RPCMethod, RPCRequest, RPCResponse};
|
||||
use libp2p::rpc::methods::*;
|
||||
use libp2p::rpc::{RPCRequest, RPCResponse};
|
||||
use libp2p::PeerId;
|
||||
use slog::{debug, o};
|
||||
use std::collections::HashMap;
|
||||
@@ -15,6 +16,7 @@ type NetworkSender = Sender<NetworkMessage>;
|
||||
const SLOT_IMPORT_TOLERANCE: u64 = 100;
|
||||
|
||||
/// Keeps track of syncing information for known connected peers.
|
||||
#[derive(Clone, Copy)]
|
||||
pub struct PeerSyncInfo {
|
||||
latest_finalized_root: Hash256,
|
||||
latest_finalized_epoch: Epoch,
|
||||
@@ -23,18 +25,37 @@ pub struct PeerSyncInfo {
|
||||
}
|
||||
|
||||
impl PeerSyncInfo {
|
||||
pub fn is_on_chain(&self, chain: &Arc<BeaconChain>) -> bool {
|
||||
fn is_on_chain(&self, chain: &Arc<BeaconChain>) -> bool {
|
||||
// TODO: make useful.
|
||||
true
|
||||
}
|
||||
|
||||
pub fn has_higher_finalized_epoch(&self, chain: &Arc<BeaconChain>) -> bool {
|
||||
fn has_higher_finalized_epoch(&self, chain: &Arc<BeaconChain>) -> bool {
|
||||
self.latest_finalized_epoch > chain.get_state().finalized_epoch
|
||||
}
|
||||
|
||||
pub fn has_higher_best_slot(&self, chain: &Arc<BeaconChain>) -> bool {
|
||||
fn has_higher_best_slot(&self, chain: &Arc<BeaconChain>) -> bool {
|
||||
self.latest_finalized_epoch > chain.get_state().finalized_epoch
|
||||
}
|
||||
|
||||
pub fn status(&self, chain: &Arc<BeaconChain>) -> PeerStatus {
|
||||
if self.has_higher_finalized_epoch(chain) {
|
||||
PeerStatus::HigherFinalizedEpoch
|
||||
} else if !self.is_on_chain(chain) {
|
||||
PeerStatus::HigherFinalizedEpoch
|
||||
} else if self.has_higher_best_slot(chain) {
|
||||
PeerStatus::HigherBestSlot
|
||||
} else {
|
||||
PeerStatus::NotInteresting
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub enum PeerStatus {
|
||||
OnDifferentChain,
|
||||
HigherFinalizedEpoch,
|
||||
HigherBestSlot,
|
||||
NotInteresting,
|
||||
}
|
||||
|
||||
impl From<HelloMessage> for PeerSyncInfo {
|
||||
@@ -91,16 +112,13 @@ impl SimpleSync {
|
||||
}
|
||||
}
|
||||
|
||||
pub fn on_connect(&self, peer_id: &PeerId, network: &mut NetworkContext) {
|
||||
network.send_rpc_request(
|
||||
peer_id.clone(),
|
||||
RPCRequest::Hello(self.chain.hello_message()),
|
||||
);
|
||||
pub fn on_connect(&self, peer_id: PeerId, network: &mut NetworkContext) {
|
||||
network.send_rpc_request(peer_id, RPCRequest::Hello(self.chain.hello_message()));
|
||||
}
|
||||
|
||||
pub fn on_hello_request(
|
||||
&self,
|
||||
peer_id: &PeerId,
|
||||
&mut self,
|
||||
peer_id: PeerId,
|
||||
hello: HelloMessage,
|
||||
network: &mut NetworkContext,
|
||||
) {
|
||||
@@ -111,97 +129,63 @@ impl SimpleSync {
|
||||
self.on_hello(peer_id, hello, network);
|
||||
}
|
||||
|
||||
pub fn on_hello(&self, peer_id: &PeerId, hello: HelloMessage, network: &mut NetworkContext) {
|
||||
pub fn on_hello(&mut self, peer_id: PeerId, hello: HelloMessage, network: &mut NetworkContext) {
|
||||
let spec = self.chain.get_spec();
|
||||
|
||||
// network id must match
|
||||
if hello.network_id != self.network_id {
|
||||
debug!(self.log, "Bad network id. Peer: {:?}", peer_id);
|
||||
network.disconnect(peer_id);
|
||||
return;
|
||||
}
|
||||
|
||||
let peer = PeerSyncInfo::from(hello);
|
||||
|
||||
/*
|
||||
if peer.has_higher_finalized_epoch(&self.chain) {
|
||||
// we need blocks
|
||||
let peer_slot = peer.latest_finalized_epoch.start_slot(spec.slots_per_epoch);
|
||||
let our_slot = self.chain.finalized_epoch();
|
||||
let required_slots = peer_slot - our_slot;
|
||||
} else {
|
||||
if !peer.is_on_chain(&self.chain) {
|
||||
return (true, responses);
|
||||
}
|
||||
//
|
||||
}
|
||||
*/
|
||||
|
||||
/*
|
||||
// compare latest epoch and finalized root to see if they exist in our chain
|
||||
if peer_info.latest_finalized_epoch <= self.latest_finalized_epoch {
|
||||
// ensure their finalized root is in our chain
|
||||
// TODO: Get the finalized root at hello_message.latest_epoch and ensure they match
|
||||
//if (hello_message.latest_finalized_root == self.chain.get_state() {
|
||||
// return false;
|
||||
// }
|
||||
}
|
||||
|
||||
// the client is valid, add it to our list of known_peers and request sync if required
|
||||
// update peer list if peer already exists
|
||||
let peer_info = PeerSyncInfo::from(hello);
|
||||
|
||||
debug!(self.log, "Handshake successful. Peer: {:?}", peer_id);
|
||||
self.known_peers.insert(peer_id, peer_info);
|
||||
self.known_peers.insert(peer_id.clone(), peer);
|
||||
|
||||
// set state to sync
|
||||
if self.state == SyncState::Idle
|
||||
&& hello_message.best_slot > self.latest_slot + SLOT_IMPORT_TOLERANCE
|
||||
{
|
||||
match peer.status(&self.chain) {
|
||||
PeerStatus::OnDifferentChain => {
|
||||
debug!(self.log, "Peer is on different chain. Peer: {:?}", peer_id);
|
||||
|
||||
network.disconnect(peer_id);
|
||||
}
|
||||
PeerStatus::HigherFinalizedEpoch => {
|
||||
let start_slot = peer.latest_finalized_epoch.start_slot(spec.slots_per_epoch);
|
||||
let required_slots = start_slot - self.chain.slot();
|
||||
|
||||
self.request_block_roots(peer_id, start_slot, required_slots.as_u64(), network);
|
||||
}
|
||||
PeerStatus::HigherBestSlot => {
|
||||
let start_slot = peer.best_slot;
|
||||
let required_slots = start_slot - self.chain.slot();
|
||||
|
||||
self.request_block_roots(peer_id, start_slot, required_slots.as_u64(), network);
|
||||
}
|
||||
PeerStatus::NotInteresting => {}
|
||||
}
|
||||
}
|
||||
|
||||
fn request_block_roots(
|
||||
&mut self,
|
||||
peer_id: PeerId,
|
||||
start_slot: Slot,
|
||||
count: u64,
|
||||
network: &mut NetworkContext,
|
||||
) {
|
||||
// Potentially set state to sync.
|
||||
if self.state == SyncState::Idle && count > SLOT_IMPORT_TOLERANCE {
|
||||
self.state = SyncState::Downloading;
|
||||
//TODO: Start requesting blocks from known peers. Ideally in batches
|
||||
}
|
||||
|
||||
true
|
||||
*/
|
||||
// TODO: handle count > max count.
|
||||
network.send_rpc_request(
|
||||
peer_id.clone(),
|
||||
RPCRequest::BeaconBlockRoots(BeaconBlockRootsRequest { start_slot, count }),
|
||||
);
|
||||
}
|
||||
|
||||
/// Generates our current state in the form of a HELLO RPC message.
|
||||
pub fn generate_hello(&self) -> HelloMessage {
|
||||
self.chain.hello_message()
|
||||
}
|
||||
|
||||
pub fn validate_peer(&mut self, peer_id: PeerId, hello_message: HelloMessage) -> bool {
|
||||
// network id must match
|
||||
if hello_message.network_id != self.network_id {
|
||||
return false;
|
||||
}
|
||||
// compare latest epoch and finalized root to see if they exist in our chain
|
||||
if hello_message.latest_finalized_epoch <= self.latest_finalized_epoch {
|
||||
// ensure their finalized root is in our chain
|
||||
// TODO: Get the finalized root at hello_message.latest_epoch and ensure they match
|
||||
//if (hello_message.latest_finalized_root == self.chain.get_state() {
|
||||
// return false;
|
||||
// }
|
||||
}
|
||||
|
||||
// the client is valid, add it to our list of known_peers and request sync if required
|
||||
// update peer list if peer already exists
|
||||
let peer_info = PeerSyncInfo {
|
||||
latest_finalized_root: hello_message.latest_finalized_root,
|
||||
latest_finalized_epoch: hello_message.latest_finalized_epoch,
|
||||
best_root: hello_message.best_root,
|
||||
best_slot: hello_message.best_slot,
|
||||
};
|
||||
|
||||
debug!(self.log, "Handshake successful. Peer: {:?}", peer_id);
|
||||
self.known_peers.insert(peer_id, peer_info);
|
||||
|
||||
// set state to sync
|
||||
if self.state == SyncState::Idle
|
||||
&& hello_message.best_slot > self.latest_slot + SLOT_IMPORT_TOLERANCE
|
||||
{
|
||||
self.state = SyncState::Downloading;
|
||||
//TODO: Start requesting blocks from known peers. Ideally in batches
|
||||
}
|
||||
|
||||
true
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user