Merge latest master

Age Manning
2019-07-16 23:26:31 +10:00
74 changed files with 4211 additions and 786 deletions

View File

@@ -2,19 +2,20 @@ use crate::error;
use crate::service::{NetworkMessage, OutgoingMessage};
use crate::sync::SimpleSync;
use beacon_chain::{BeaconChain, BeaconChainTypes};
use crossbeam_channel::{unbounded as channel, Sender};
use eth2_libp2p::rpc::methods::*;
use eth2_libp2p::{
behaviour::PubsubMessage,
rpc::{RPCError, RPCErrorResponse, RPCRequest, RPCResponse, RequestId},
PeerId, RPCEvent,
};
use futures::future;
use futures::future::Future;
use futures::stream::Stream;
use slog::{debug, error, warn};
use ssz::Decode;
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Instant;
use tokio::sync::mpsc;
/// Handles messages received from the network and client and organises syncing.
pub struct MessageHandler<T: BeaconChainTypes> {
@@ -45,13 +46,13 @@ impl<T: BeaconChainTypes + 'static> MessageHandler<T> {
/// Initializes and runs the MessageHandler.
pub fn spawn(
beacon_chain: Arc<BeaconChain<T>>,
network_send: crossbeam_channel::Sender<NetworkMessage>,
network_send: mpsc::UnboundedSender<NetworkMessage>,
executor: &tokio::runtime::TaskExecutor,
log: slog::Logger,
) -> error::Result<Sender<HandlerMessage>> {
) -> error::Result<mpsc::UnboundedSender<HandlerMessage>> {
debug!(log, "Service starting");
let (handler_send, handler_recv) = channel();
let (handler_send, handler_recv) = mpsc::unbounded_channel();
// Initialise sync and begin processing in thread
// generate the Message handler
@@ -66,13 +67,13 @@ impl<T: BeaconChainTypes + 'static> MessageHandler<T> {
// spawn handler task
// TODO: Handle manual termination of thread
executor.spawn(future::poll_fn(move || -> Result<_, _> {
loop {
handler.handle_message(handler_recv.recv().map_err(|_| {
executor.spawn(
handler_recv
.for_each(move |msg| Ok(handler.handle_message(msg)))
.map_err(move |_| {
debug!(log, "Network message handler terminated.");
})?);
}
}));
}),
);
Ok(handler_send)
}
@@ -302,7 +303,7 @@ impl<T: BeaconChainTypes + 'static> MessageHandler<T> {
pub struct NetworkContext {
/// The network channel to relay messages to the Network service.
network_send: crossbeam_channel::Sender<NetworkMessage>,
network_send: mpsc::UnboundedSender<NetworkMessage>,
/// A mapping of peers and the RPC id we have sent an RPC request to.
outstanding_outgoing_request_ids: HashMap<(PeerId, RequestId), Instant>,
/// Stores the next `RequestId` we should include on an outgoing `RPCRequest` to a `PeerId`.
@@ -312,7 +313,7 @@ pub struct NetworkContext {
}
impl NetworkContext {
pub fn new(network_send: crossbeam_channel::Sender<NetworkMessage>, log: slog::Logger) -> Self {
pub fn new(network_send: mpsc::UnboundedSender<NetworkMessage>, log: slog::Logger) -> Self {
Self {
network_send,
outstanding_outgoing_request_ids: HashMap::new(),
@@ -348,13 +349,13 @@ impl NetworkContext {
);
}
fn send_rpc_event(&self, peer_id: PeerId, rpc_event: RPCEvent) {
fn send_rpc_event(&mut self, peer_id: PeerId, rpc_event: RPCEvent) {
self.send(peer_id, OutgoingMessage::RPC(rpc_event))
}
fn send(&self, peer_id: PeerId, outgoing_message: OutgoingMessage) {
fn send(&mut self, peer_id: PeerId, outgoing_message: OutgoingMessage) {
self.network_send
.send(NetworkMessage::Send(peer_id, outgoing_message))
.try_send(NetworkMessage::Send(peer_id, outgoing_message))
.unwrap_or_else(|_| {
warn!(
self.log,

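The change in this file replaces blocking crossbeam channels with tokio's unbounded mpsc channels and drives the message handler as a futures Stream instead of a thread blocking on recv(). A minimal sketch of that pattern, assuming tokio 0.1 and futures 0.1 (the async stack in use here); DemoMessage and the printouts are illustrative stand-ins, not Lighthouse types:

use futures::{Future, Stream};
use tokio::sync::mpsc;

#[derive(Debug)]
enum DemoMessage {
    Ping,
}

fn main() {
    let (mut tx, rx) = mpsc::unbounded_channel::<DemoMessage>();

    // The receiver is a futures `Stream`, so the handler loop becomes a
    // `for_each` future spawned on the runtime rather than a blocking loop.
    let handler = rx
        .for_each(|msg| {
            println!("handled {:?}", msg);
            Ok(())
        })
        .map_err(|_| ());

    // `try_send` never blocks on an unbounded channel; it only fails once the
    // receiver has been dropped.
    tx.try_send(DemoMessage::Ping).expect("receiver is alive");
    drop(tx);

    tokio::run(handler);
}

Because try_send on an unbounded sender can only fail when the receiving task is gone, the callers above log a warning on error rather than retrying.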
View File

@@ -2,24 +2,23 @@ use crate::error;
use crate::message_handler::{HandlerMessage, MessageHandler};
use crate::NetworkConfig;
use beacon_chain::{BeaconChain, BeaconChainTypes};
use crossbeam_channel::{unbounded as channel, Sender, TryRecvError};
use eth2_libp2p::Service as LibP2PService;
use eth2_libp2p::Topic;
use eth2_libp2p::{Libp2pEvent, PeerId};
use eth2_libp2p::{PubsubMessage, RPCEvent};
use futures::prelude::*;
use futures::sync::oneshot;
use futures::Stream;
use slog::{debug, info, o, trace};
use std::marker::PhantomData;
use std::sync::Arc;
use tokio::runtime::TaskExecutor;
use tokio::sync::{mpsc, oneshot};
/// Service that handles communication between internal services and the eth2_libp2p network service.
pub struct Service<T: BeaconChainTypes> {
//libp2p_service: Arc<Mutex<LibP2PService>>,
_libp2p_exit: oneshot::Sender<()>,
network_send: crossbeam_channel::Sender<NetworkMessage>,
network_send: mpsc::UnboundedSender<NetworkMessage>,
_phantom: PhantomData<T>, //message_handler: MessageHandler,
//message_handler_send: Sender<HandlerMessage>
}
@@ -30,9 +29,9 @@ impl<T: BeaconChainTypes + 'static> Service<T> {
config: &NetworkConfig,
executor: &TaskExecutor,
log: slog::Logger,
) -> error::Result<(Arc<Self>, Sender<NetworkMessage>)> {
) -> error::Result<(Arc<Self>, mpsc::UnboundedSender<NetworkMessage>)> {
// build the network channel
let (network_send, network_recv) = channel::<NetworkMessage>();
let (network_send, network_recv) = mpsc::unbounded_channel::<NetworkMessage>();
// launch message handler thread
let message_handler_log = log.new(o!("Service" => "MessageHandler"));
let message_handler_send = MessageHandler::spawn(
@@ -64,9 +63,9 @@ impl<T: BeaconChainTypes + 'static> Service<T> {
}
// TODO: Testing only
pub fn send_message(&self) {
pub fn send_message(&mut self) {
self.network_send
.send(NetworkMessage::Send(
.try_send(NetworkMessage::Send(
PeerId::random(),
OutgoingMessage::NotifierTest,
))
@@ -76,12 +75,12 @@ impl<T: BeaconChainTypes + 'static> Service<T> {
fn spawn_service(
libp2p_service: LibP2PService,
network_recv: crossbeam_channel::Receiver<NetworkMessage>,
message_handler_send: crossbeam_channel::Sender<HandlerMessage>,
network_recv: mpsc::UnboundedReceiver<NetworkMessage>,
message_handler_send: mpsc::UnboundedSender<HandlerMessage>,
executor: &TaskExecutor,
log: slog::Logger,
) -> error::Result<oneshot::Sender<()>> {
let (network_exit, exit_rx) = oneshot::channel();
) -> error::Result<tokio::sync::oneshot::Sender<()>> {
let (network_exit, exit_rx) = tokio::sync::oneshot::channel();
// spawn on the current executor
executor.spawn(
@@ -105,31 +104,67 @@ fn spawn_service(
//TODO: Potentially handle channel errors
fn network_service(
mut libp2p_service: LibP2PService,
network_recv: crossbeam_channel::Receiver<NetworkMessage>,
message_handler_send: crossbeam_channel::Sender<HandlerMessage>,
mut network_recv: mpsc::UnboundedReceiver<NetworkMessage>,
mut message_handler_send: mpsc::UnboundedSender<HandlerMessage>,
log: slog::Logger,
) -> impl futures::Future<Item = (), Error = eth2_libp2p::error::Error> {
futures::future::poll_fn(move || -> Result<_, eth2_libp2p::error::Error> {
// poll the swarm
loop {
// only end the loop once both major polls are not ready.
let mut not_ready_count = 0;
while not_ready_count < 2 {
not_ready_count = 0;
// poll the network channel
match network_recv.poll() {
Ok(Async::Ready(Some(message))) => {
match message {
// TODO: Testing message - remove
NetworkMessage::Send(peer_id, outgoing_message) => {
match outgoing_message {
OutgoingMessage::RPC(rpc_event) => {
trace!(log, "Sending RPC Event: {:?}", rpc_event);
//TODO: Make swarm private
//TODO: Implement correct peer id topic message handling
libp2p_service.swarm.send_rpc(peer_id, rpc_event);
}
OutgoingMessage::NotifierTest => {
// debug!(log, "Received message from notifier");
}
};
}
NetworkMessage::Publish { topics, message } => {
debug!(log, "Sending pubsub message"; "topics" => format!("{:?}",topics));
libp2p_service.swarm.publish(topics, *message);
}
}
}
Ok(Async::NotReady) => not_ready_count += 1,
Ok(Async::Ready(None)) => {
return Err(eth2_libp2p::error::Error::from("Network channel closed"));
}
Err(_) => {
return Err(eth2_libp2p::error::Error::from("Network channel error"));
}
}
// poll the swarm
match libp2p_service.poll() {
Ok(Async::Ready(Some(event))) => match event {
Libp2pEvent::RPC(peer_id, rpc_event) => {
trace!(log, "RPC Event: RPC message received: {:?}", rpc_event);
message_handler_send
.send(HandlerMessage::RPC(peer_id, rpc_event))
.map_err(|_| "Failed to send rpc to handler")?;
.try_send(HandlerMessage::RPC(peer_id, rpc_event))
.map_err(|_| "Failed to send RPC to handler")?;
}
Libp2pEvent::PeerDialed(peer_id) => {
debug!(log, "Peer Dialed: {:?}", peer_id);
message_handler_send
.send(HandlerMessage::PeerDialed(peer_id))
.try_send(HandlerMessage::PeerDialed(peer_id))
.map_err(|_| "Failed to send PeerDialed to handler")?;
}
Libp2pEvent::PeerDisconnected(peer_id) => {
debug!(log, "Peer Disconnected: {:?}", peer_id);
message_handler_send
.send(HandlerMessage::PeerDisconnected(peer_id))
.try_send(HandlerMessage::PeerDisconnected(peer_id))
.map_err(|_| "Failed to send PeerDisconnected to handler")?;
}
Libp2pEvent::PubsubMessage {
@@ -138,43 +173,13 @@ fn network_service(
//TODO: Decide if we need to propagate the topic upwards. (Potentially for
//attestations)
message_handler_send
.send(HandlerMessage::PubsubMessage(source, message))
.try_send(HandlerMessage::PubsubMessage(source, message))
.map_err(|_| "Failed to send pubsub message to handler")?;
}
},
Ok(Async::Ready(None)) => unreachable!("Stream never ends"),
Ok(Async::NotReady) => break,
Err(_) => break,
}
}
// poll the network channel
// TODO: refactor - combine poll_fn's?
loop {
match network_recv.try_recv() {
// TODO: Testing message - remove
Ok(NetworkMessage::Send(peer_id, outgoing_message)) => {
match outgoing_message {
OutgoingMessage::RPC(rpc_event) => {
trace!(log, "Sending RPC Event: {:?}", rpc_event);
//TODO: Make swarm private
//TODO: Implement correct peer id topic message handling
libp2p_service.swarm.send_rpc(peer_id, rpc_event);
}
OutgoingMessage::NotifierTest => {
// debug!(log, "Received message from notifier");
}
};
}
Ok(NetworkMessage::Publish { topics, message }) => {
debug!(log, "Sending pubsub message on topics {:?}", topics);
libp2p_service.swarm.publish(topics, *message);
}
Err(TryRecvError::Empty) => break,
Err(TryRecvError::Disconnected) => {
return Err(eth2_libp2p::error::Error::from(
"Network channel disconnected",
));
}
Ok(Async::NotReady) => not_ready_count += 1,
Err(_) => not_ready_count += 1,
}
}
Ok(Async::NotReady)

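The rewritten network_service future polls both the network command channel and the libp2p swarm inside a single poll_fn, yielding only once both report NotReady in the same pass. A condensed sketch of that polling discipline, assuming futures 0.1 and tokio 0.1; the two unbounded receivers and their String payloads stand in for the real channel and swarm:

use futures::{future, Async, Future, Poll, Stream};
use tokio::sync::mpsc;

fn combined_loop(
    mut commands: mpsc::UnboundedReceiver<String>,
    mut events: mpsc::UnboundedReceiver<String>,
) -> impl Future<Item = (), Error = ()> {
    future::poll_fn(move || -> Poll<(), ()> {
        // Only yield once both sources report `NotReady` in the same pass.
        let mut not_ready_count = 0;
        while not_ready_count < 2 {
            not_ready_count = 0;

            // Drain the command channel first.
            match commands.poll() {
                Ok(Async::Ready(Some(cmd))) => println!("command: {}", cmd),
                Ok(Async::Ready(None)) => return Err(()), // channel closed
                Ok(Async::NotReady) => not_ready_count += 1,
                Err(_) => return Err(()),
            }

            // Then give the second source a chance in the same pass.
            match events.poll() {
                Ok(Async::Ready(Some(ev))) => println!("event: {}", ev),
                Ok(Async::Ready(None)) => return Err(()),
                Ok(Async::NotReady) => not_ready_count += 1,
                Err(_) => not_ready_count += 1,
            }
        }
        Ok(Async::NotReady)
    })
}

fn main() {
    let (mut cmd_tx, cmd_rx) = mpsc::unbounded_channel();
    let (mut ev_tx, ev_rx) = mpsc::unbounded_channel();
    cmd_tx.try_send("send_rpc".to_string()).unwrap();
    ev_tx.try_send("peer_dialed".to_string()).unwrap();
    // Dropping the senders closes both channels, so the loop finishes after
    // draining the two queued messages.
    drop(cmd_tx);
    drop(ev_tx);
    tokio::run(combined_loop(cmd_rx, ev_rx));
}

Counting NotReady results rather than breaking after the first poll keeps draining whichever source still has work queued, matching the not_ready_count loop above.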
View File

@@ -1,7 +1,8 @@
use beacon_chain::{BeaconChain, BeaconChainTypes};
use eth2_libp2p::rpc::methods::*;
use eth2_libp2p::PeerId;
use slog::{debug, error};
use slog::error;
use std::collections::HashMap;
use std::sync::Arc;
use std::time::{Duration, Instant};
use tree_hash::TreeHash;
@@ -22,7 +23,7 @@ use types::{BeaconBlock, BeaconBlockBody, BeaconBlockHeader, Hash256, Slot};
pub struct ImportQueue<T: BeaconChainTypes> {
pub chain: Arc<BeaconChain<T>>,
/// Partially imported blocks, keyed by the root of `BeaconBlockBody`.
pub partials: Vec<PartialBeaconBlock>,
partials: HashMap<Hash256, PartialBeaconBlock>,
/// Time before a queue entry is considered stale.
pub stale_time: Duration,
/// Logging
@@ -34,41 +35,33 @@ impl<T: BeaconChainTypes> ImportQueue<T> {
pub fn new(chain: Arc<BeaconChain<T>>, stale_time: Duration, log: slog::Logger) -> Self {
Self {
chain,
partials: vec![],
partials: HashMap::new(),
stale_time,
log,
}
}
/// Completes all possible partials into `BeaconBlock` and returns them, sorted by increasing
/// slot number. Does not delete the partials from the queue, this must be done manually.
///
/// Returns `(queue_index, block, sender)`:
///
/// - `block_root`: may be used to remove the entry if it is successfully processed.
/// - `block`: the completed block.
/// - `sender`: the `PeerId` that provided the `BeaconBlockBody` which completed the partial.
pub fn complete_blocks(&self) -> Vec<(Hash256, BeaconBlock, PeerId)> {
let mut complete: Vec<(Hash256, BeaconBlock, PeerId)> = self
.partials
.iter()
.filter_map(|partial| partial.clone().complete())
.collect();
/// Returns `true` if the `BlockRoot` is found in the `import_queue`.
pub fn contains_block_root(&self, block_root: Hash256) -> bool {
self.partials.contains_key(&block_root)
}
// Sort the completable partials to be in ascending slot order.
complete.sort_unstable_by(|a, b| a.1.slot.partial_cmp(&b.1.slot).unwrap());
complete
/// Attempts to complete the `BlockRoot` if it is found in the `import_queue`.
///
/// Returns a `PartialBeaconBlockCompletion` describing the result.
/// Does not remove the `block_root` from the `import_queue`.
pub fn attempt_complete_block(&self, block_root: Hash256) -> PartialBeaconBlockCompletion {
if let Some(partial) = self.partials.get(&block_root) {
partial.attempt_complete()
} else {
PartialBeaconBlockCompletion::MissingRoot
}
}
/// Removes the first `PartialBeaconBlock` with a matching `block_root`, returning the partial
/// if it exists.
pub fn remove(&mut self, block_root: Hash256) -> Option<PartialBeaconBlock> {
let position = self
.partials
.iter()
.position(|p| p.block_root == block_root)?;
Some(self.partials.remove(position))
self.partials.remove(&block_root)
}
/// Flushes all stale entries from the queue.
@@ -76,31 +69,10 @@ impl<T: BeaconChainTypes> ImportQueue<T> {
/// An entry is stale if it has an `inserted` time that is more than `self.stale_time` in the
/// past.
pub fn remove_stale(&mut self) {
let stale_indices: Vec<usize> = self
.partials
.iter()
.enumerate()
.filter_map(|(i, partial)| {
if partial.inserted + self.stale_time <= Instant::now() {
Some(i)
} else {
None
}
})
.collect();
let stale_time = self.stale_time;
if !stale_indices.is_empty() {
debug!(
self.log,
"ImportQueue removing stale entries";
"stale_items" => stale_indices.len(),
"stale_time_seconds" => self.stale_time.as_secs()
);
}
stale_indices.iter().for_each(|&i| {
self.partials.remove(i);
});
self.partials
.retain(|_, partial| partial.inserted + stale_time > Instant::now())
}
/// Returns `true` if `self.chain` has not yet processed this block.
@@ -122,27 +94,32 @@ impl<T: BeaconChainTypes> ImportQueue<T> {
block_roots: &[BlockRootSlot],
sender: PeerId,
) -> Vec<BlockRootSlot> {
let new_roots: Vec<BlockRootSlot> = block_roots
// TODO: This will currently not return a `BlockRootSlot` if this root exists but there is no header.
// It would be more robust if it did.
let new_block_root_slots: Vec<BlockRootSlot> = block_roots
.iter()
// Ignore any roots already stored in the queue.
.filter(|brs| !self.contains_block_root(brs.block_root))
// Ignore any roots already processed by the chain.
.filter(|brs| self.chain_has_not_seen_block(&brs.block_root))
// Ignore any roots already stored in the queue.
.filter(|brs| !self.partials.iter().any(|p| p.block_root == brs.block_root))
.cloned()
.collect();
new_roots.iter().for_each(|brs| {
self.partials.push(PartialBeaconBlock {
slot: brs.slot,
block_root: brs.block_root,
sender: sender.clone(),
header: None,
body: None,
inserted: Instant::now(),
})
});
self.partials.extend(
new_block_root_slots
.iter()
.map(|brs| PartialBeaconBlock {
slot: brs.slot,
block_root: brs.block_root,
sender: sender.clone(),
header: None,
body: None,
inserted: Instant::now(),
})
.map(|partial| (partial.block_root, partial)),
);
new_roots
new_block_root_slots
}
/// Adds the `headers` to the `partials` queue. Returns a list of `Hash256` block roots for
@@ -152,12 +129,8 @@ impl<T: BeaconChainTypes> ImportQueue<T> {
/// the queue and its block root is included in the output.
///
/// If a `header` is already in the queue, but not yet processed by the chain the block root is
/// included in the output and the `inserted` time for the partial record is set to
/// not included in the output and the `inserted` time for the partial record is set to
/// `Instant::now()`. Updating the `inserted` time stops the partial from becoming stale.
///
/// Presently the queue enforces that a `BeaconBlockHeader` _must_ be received before its
/// `BeaconBlockBody`. This is not a natural requirement and we could enhance the queue to lift
/// this restraint.
pub fn enqueue_headers(
&mut self,
headers: Vec<BeaconBlockHeader>,
@@ -169,8 +142,10 @@ impl<T: BeaconChainTypes> ImportQueue<T> {
let block_root = Hash256::from_slice(&header.canonical_root()[..]);
if self.chain_has_not_seen_block(&block_root) {
self.insert_header(block_root, header, sender.clone());
required_bodies.push(block_root)
if !self.insert_header(block_root, header, sender.clone()) {
// If the body is missing, request it from the peer.
required_bodies.push(block_root);
}
}
}
@@ -180,10 +155,17 @@ impl<T: BeaconChainTypes> ImportQueue<T> {
/// If there is a matching `header` for this `body`, adds it to the queue.
///
/// If there is no `header` for the `body`, the body is simply discarded.
pub fn enqueue_bodies(&mut self, bodies: Vec<BeaconBlockBody>, sender: PeerId) {
pub fn enqueue_bodies(
&mut self,
bodies: Vec<BeaconBlockBody>,
sender: PeerId,
) -> Option<Hash256> {
let mut last_block_hash = None;
for body in bodies {
self.insert_body(body, sender.clone());
last_block_hash = self.insert_body(body, sender.clone());
}
last_block_hash
}
pub fn enqueue_full_blocks(&mut self, blocks: Vec<BeaconBlock>, sender: PeerId) {
@@ -196,54 +178,55 @@ impl<T: BeaconChainTypes> ImportQueue<T> {
///
/// If the header already exists, the `inserted` time is set to `now` and no other
/// modifications are made.
fn insert_header(&mut self, block_root: Hash256, header: BeaconBlockHeader, sender: PeerId) {
if let Some(i) = self
.partials
.iter()
.position(|p| p.block_root == block_root)
{
// Case 1: there already exists a partial with a matching block root.
//
// The `inserted` time is set to now and the header is replaced, regardless of whether
// it existed or not.
self.partials[i].header = Some(header);
self.partials[i].inserted = Instant::now();
} else {
// Case 2: there was no partial with a matching block root.
//
// A new partial is added. This case permits adding a header without already known the
// root.
self.partials.push(PartialBeaconBlock {
/// Returns `true` if the `body` already exists.
fn insert_header(
&mut self,
block_root: Hash256,
header: BeaconBlockHeader,
sender: PeerId,
) -> bool {
let mut exists = false;
self.partials
.entry(block_root)
.and_modify(|partial| {
partial.header = Some(header.clone());
partial.inserted = Instant::now();
if partial.body.is_some() {
exists = true;
}
})
.or_insert_with(|| PartialBeaconBlock {
slot: header.slot,
block_root,
header: Some(header),
body: None,
inserted: Instant::now(),
sender,
})
}
});
exists
}
/// Updates an existing partial with the `body`.
///
/// If there is no header for the `body`, the body is simply discarded.
///
/// If the body already existed, the `inserted` time is set to `now`.
fn insert_body(&mut self, body: BeaconBlockBody, sender: PeerId) {
///
/// Returns the block hash of the inserted body
fn insert_body(&mut self, body: BeaconBlockBody, sender: PeerId) -> Option<Hash256> {
let body_root = Hash256::from_slice(&body.tree_hash_root()[..]);
let mut last_root = None;
self.partials.iter_mut().for_each(|mut p| {
self.partials.iter_mut().for_each(|(root, mut p)| {
if let Some(header) = &mut p.header {
if body_root == header.block_body_root {
p.inserted = Instant::now();
if p.body.is_none() {
p.body = Some(body.clone());
p.sender = sender.clone();
}
p.body = Some(body.clone());
p.sender = sender.clone();
last_root = Some(*root);
}
}
});
last_root
}
/// Updates an existing `partial` with the completed block, or adds a new (complete) partial.
@@ -261,15 +244,10 @@ impl<T: BeaconChainTypes> ImportQueue<T> {
sender,
};
if let Some(i) = self
.partials
.iter()
.position(|p| p.block_root == block_root)
{
self.partials[i] = partial;
} else {
self.partials.push(partial)
}
self.partials
.entry(block_root)
.and_modify(|existing_partial| *existing_partial = partial.clone())
.or_insert(partial);
}
}
@@ -290,13 +268,33 @@ pub struct PartialBeaconBlock {
}
impl PartialBeaconBlock {
/// Consumes `self` and returns a fully built `BeaconBlock`, its root and the `sender`
/// `PeerId`, if enough information exists to complete the block. Otherwise, returns `None`.
pub fn complete(self) -> Option<(Hash256, BeaconBlock, PeerId)> {
Some((
self.block_root,
self.header?.into_block(self.body?),
self.sender,
))
/// Attempts to build a block.
///
/// Does not consume the `PartialBeaconBlock`.
pub fn attempt_complete(&self) -> PartialBeaconBlockCompletion {
if self.header.is_none() {
PartialBeaconBlockCompletion::MissingHeader(self.slot)
} else if self.body.is_none() {
PartialBeaconBlockCompletion::MissingBody
} else {
PartialBeaconBlockCompletion::Complete(
self.header
.clone()
.unwrap()
.into_block(self.body.clone().unwrap()),
)
}
}
}
/// The result of trying to convert a `PartialBeaconBlock` into a `BeaconBlock`.
pub enum PartialBeaconBlockCompletion {
/// The partial contains a valid BeaconBlock.
Complete(BeaconBlock),
/// The partial does not exist.
MissingRoot,
/// The partial contains a `BeaconBlockRoot` but no `BeaconBlockHeader`.
MissingHeader(Slot),
/// The partial contains a `BeaconBlockRoot` and `BeaconBlockHeader` but no `BeaconBlockBody`.
MissingBody,
}

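With the queue keyed by block root, the header upsert becomes a HashMap entry() call and stale eviction becomes retain. A small sketch of those two operations on a simplified type, assuming only the standard library; Partial and the u64 roots are stand-ins for PartialBeaconBlock and Hash256:

use std::collections::HashMap;
use std::time::{Duration, Instant};

struct Partial {
    header: Option<u64>,
    body: Option<u64>,
    inserted: Instant,
}

/// Insert or refresh a header; returns true only if the body is already known.
fn upsert_header(queue: &mut HashMap<u64, Partial>, root: u64, header: u64) -> bool {
    let mut body_exists = false;
    queue
        .entry(root)
        // Existing partial: replace the header and refresh the insertion time.
        .and_modify(|p| {
            p.header = Some(header);
            p.inserted = Instant::now();
            body_exists = p.body.is_some();
        })
        // Unknown root: create a new partial holding only the header.
        .or_insert_with(|| Partial {
            header: Some(header),
            body: None,
            inserted: Instant::now(),
        });
    body_exists
}

/// Evict entries older than `stale_time` in one pass.
fn remove_stale(queue: &mut HashMap<u64, Partial>, stale_time: Duration) {
    queue.retain(|_, p| p.inserted + stale_time > Instant::now());
}

fn main() {
    let mut queue = HashMap::new();
    // A fresh root has no body yet, so the caller should request one.
    assert!(!upsert_header(&mut queue, 1, 42));
    assert!(queue.get(&1).unwrap().header.is_some());
    remove_stale(&mut queue, Duration::from_secs(600));
    assert_eq!(queue.len(), 1);
}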
View File

@@ -1,4 +1,4 @@
use super::import_queue::ImportQueue;
use super::import_queue::{ImportQueue, PartialBeaconBlockCompletion};
use crate::message_handler::NetworkContext;
use beacon_chain::{BeaconChain, BeaconChainTypes, BlockProcessingOutcome};
use eth2_libp2p::rpc::methods::*;
@@ -18,7 +18,7 @@ use types::{
const SLOT_IMPORT_TOLERANCE: u64 = 100;
/// The amount of seconds a block (or partial block) may exist in the import queue.
const QUEUE_STALE_SECS: u64 = 600;
const QUEUE_STALE_SECS: u64 = 100;
/// If a block is more than `FUTURE_SLOT_TOLERANCE` slots ahead of our slot clock, we drop it.
/// Otherwise we queue it.
@@ -75,7 +75,6 @@ pub struct SimpleSync<T: BeaconChainTypes> {
import_queue: ImportQueue<T>,
/// The current state of the syncing protocol.
state: SyncState,
/// Sync logger.
log: slog::Logger,
}
@@ -174,96 +173,105 @@ impl<T: BeaconChainTypes> SimpleSync<T> {
hello: HelloMessage,
network: &mut NetworkContext,
) {
let spec = &self.chain.spec;
let remote = PeerSyncInfo::from(hello);
let local = PeerSyncInfo::from(&self.chain);
// Disconnect nodes who are on a different network.
let start_slot = |epoch: Epoch| epoch.start_slot(T::EthSpec::slots_per_epoch());
if local.network_id != remote.network_id {
// The node is on a different network, disconnect them.
info!(
self.log, "HandshakeFailure";
"peer" => format!("{:?}", peer_id),
"reason" => "network_id"
);
network.disconnect(peer_id.clone(), GoodbyeReason::IrreleventNetwork);
// Disconnect nodes if our finalized epoch is greater than theirs, and their finalized
// epoch is not in our chain. Viz., they are on another chain.
//
// If the local or remote have a `latest_finalized_root == ZERO_HASH`, skips checks about
// the finalized_root. The logic is awkward and I think we're better without it.
} else if (local.latest_finalized_epoch >= remote.latest_finalized_epoch)
&& (!self
.chain
.rev_iter_block_roots(local.best_slot)
.any(|(root, _slot)| root == remote.latest_finalized_root))
&& (local.latest_finalized_root != spec.zero_hash)
&& (remote.latest_finalized_root != spec.zero_hash)
} else if remote.latest_finalized_epoch <= local.latest_finalized_epoch
&& remote.latest_finalized_root != self.chain.spec.zero_hash
&& local.latest_finalized_root != self.chain.spec.zero_hash
&& (self.root_at_slot(start_slot(remote.latest_finalized_epoch))
!= Some(remote.latest_finalized_root))
{
// The remote's finalized epoch is less than or equal to ours, but the block root is
// different from the one in our chain.
//
// Therefore, the node is on a different chain and we should not communicate with them.
info!(
self.log, "HandshakeFailure";
"peer" => format!("{:?}", peer_id),
"reason" => "wrong_finalized_chain"
"reason" => "different finalized chain"
);
network.disconnect(peer_id.clone(), GoodbyeReason::IrreleventNetwork);
// Process handshakes from peers that seem to be on our chain.
} else {
info!(self.log, "HandshakeSuccess"; "peer" => format!("{:?}", peer_id));
self.known_peers.insert(peer_id.clone(), remote);
// If we have equal or better finalized epochs and best slots, we require nothing else from
// this peer.
} else if remote.latest_finalized_epoch < local.latest_finalized_epoch {
// The node has a lower finalized epoch, their chain is not useful to us. There are two
// cases where a node can have a lower finalized epoch:
//
// We make an exception when our best slot is 0. Best slot does not indicate whether or
// not there is a block at slot zero.
if (remote.latest_finalized_epoch <= local.latest_finalized_epoch)
&& (remote.best_slot <= local.best_slot)
&& (local.best_slot > 0)
{
debug!(self.log, "Peer is naive"; "peer" => format!("{:?}", peer_id));
return;
}
// ## The node is on the same chain
//
// If a node is on the same chain but has a lower finalized epoch, their head must be
// lower than ours. Therefore, we have nothing to request from them.
//
// ## The node is on a fork
//
// If a node is on a fork that has a lower finalized epoch, switching to that fork would
// cause us to revert a finalized block. This is not permitted, therefore we have no
// interest in their blocks.
debug!(
self.log,
"NaivePeer";
"peer" => format!("{:?}", peer_id),
"reason" => "lower finalized epoch"
);
} else if self
.chain
.store
.exists::<BeaconBlock>(&remote.best_root)
.unwrap_or_else(|_| false)
{
// If the node's best-block is already known to us, we have nothing to request.
debug!(
self.log,
"NaivePeer";
"peer" => format!("{:?}", peer_id),
"reason" => "best block is known"
);
} else {
// The remote node has an equal or greater finalized epoch and we don't know its head.
//
// Therefore, there are some blocks between the local finalized epoch and the remote
// head that are worth downloading.
debug!(
self.log, "UsefulPeer";
"peer" => format!("{:?}", peer_id),
"local_finalized_epoch" => local.latest_finalized_epoch,
"remote_latest_finalized_epoch" => remote.latest_finalized_epoch,
);
// If the remote has a higher finalized epoch, request all block roots from our finalized
// epoch through to its best slot.
if remote.latest_finalized_epoch > local.latest_finalized_epoch {
debug!(self.log, "Peer has high finalized epoch"; "peer" => format!("{:?}", peer_id));
let start_slot = local
.latest_finalized_epoch
.start_slot(T::EthSpec::slots_per_epoch());
let required_slots = remote.best_slot - start_slot;
let start_slot = local
.latest_finalized_epoch
.start_slot(T::EthSpec::slots_per_epoch());
let required_slots = remote.best_slot - start_slot;
self.request_block_roots(
peer_id,
BeaconBlockRootsRequest {
start_slot,
count: required_slots.into(),
},
network,
);
// If the remote has a greater best slot, request the roots between our best slot and their
// best slot.
} else if remote.best_slot > local.best_slot {
debug!(self.log, "Peer has higher best slot"; "peer" => format!("{:?}", peer_id));
let start_slot = local
.latest_finalized_epoch
.start_slot(T::EthSpec::slots_per_epoch());
let required_slots = remote.best_slot - start_slot;
self.request_block_roots(
peer_id,
BeaconBlockRootsRequest {
start_slot,
count: required_slots.into(),
},
network,
);
} else {
debug!(self.log, "Nothing to request from peer"; "peer" => format!("{:?}", peer_id));
}
self.request_block_roots(
peer_id,
BeaconBlockRootsRequest {
start_slot,
count: required_slots.as_u64(),
},
network,
);
}
}
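A condensed restatement of the decision order in the new hello-handling logic, using simplified stand-in types (SyncInfo, u64 roots, an illustrative SLOTS_PER_EPOCH); the zero-hash special cases from the real condition are omitted, so this is a sketch of the flow rather than the actual Lighthouse API:

const SLOTS_PER_EPOCH: u64 = 8; // illustrative value only

struct SyncInfo {
    network_id: u8,
    finalized_epoch: u64,
    finalized_root: u64,
    best_slot: u64,
    best_root_known: bool,
}

enum HandshakeAction {
    Disconnect(&'static str),
    Ignore(&'static str),
    RequestBlockRoots { start_slot: u64, count: u64 },
}

fn on_hello(local: &SyncInfo, remote: &SyncInfo, root_at_remote_finalized: Option<u64>) -> HandshakeAction {
    if local.network_id != remote.network_id {
        // Different network: disconnect.
        HandshakeAction::Disconnect("network_id")
    } else if remote.finalized_epoch <= local.finalized_epoch
        && root_at_remote_finalized != Some(remote.finalized_root)
    {
        // Same-or-lower finalized epoch but a finalized root we don't have:
        // the peer is on a different chain.
        HandshakeAction::Disconnect("different finalized chain")
    } else if remote.finalized_epoch < local.finalized_epoch {
        // Lower finalized epoch on our own chain: nothing to request.
        HandshakeAction::Ignore("lower finalized epoch")
    } else if remote.best_root_known {
        // We already have the peer's head block.
        HandshakeAction::Ignore("best block is known")
    } else {
        // Equal-or-higher finalized epoch and an unknown head: request every
        // block root from our finalized epoch up to the peer's best slot.
        let start_slot = local.finalized_epoch * SLOTS_PER_EPOCH;
        HandshakeAction::RequestBlockRoots {
            start_slot,
            count: remote.best_slot.saturating_sub(start_slot),
        }
    }
}

fn main() {
    let local = SyncInfo {
        network_id: 1,
        finalized_epoch: 3,
        finalized_root: 7,
        best_slot: 40,
        best_root_known: false,
    };
    let remote = SyncInfo {
        finalized_epoch: 4,
        finalized_root: 9,
        best_slot: 50,
        ..local
    };
    // A peer with a higher finalized epoch and an unknown head is useful.
    match on_hello(&local, &remote, None) {
        HandshakeAction::RequestBlockRoots { start_slot, count } => {
            assert_eq!((start_slot, count), (24, 26))
        }
        HandshakeAction::Disconnect(r) | HandshakeAction::Ignore(r) => panic!("unexpected: {}", r),
    }
}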
fn root_at_slot(&self, target_slot: Slot) -> Option<Hash256> {
self.chain
.rev_iter_best_block_roots(target_slot)
.take(1)
.find(|(_root, slot)| *slot == target_slot)
.map(|(root, _slot)| root)
}
/// Handle a `BeaconBlockRoots` request from the peer.
pub fn on_beacon_block_roots_request(
&mut self,
@@ -282,18 +290,19 @@ impl<T: BeaconChainTypes> SimpleSync<T> {
let mut roots: Vec<BlockRootSlot> = self
.chain
.rev_iter_block_roots(req.start_slot + req.count)
.skip(1)
.rev_iter_best_block_roots(req.start_slot + req.count)
.take(req.count as usize)
.map(|(block_root, slot)| BlockRootSlot { slot, block_root })
.collect();
if roots.len() as u64 != req.count {
debug!(
warn!(
self.log,
"BlockRootsRequest";
"peer" => format!("{:?}", peer_id),
"msg" => "Failed to return all requested hashes",
"start_slot" => req.start_slot,
"current_slot" => self.chain.current_state().slot,
"requested" => req.count,
"returned" => roots.len(),
);
@@ -395,7 +404,7 @@ impl<T: BeaconChainTypes> SimpleSync<T> {
// unnecessary block deserialization when `req.skip_slots > 0`.
let mut roots: Vec<Hash256> = self
.chain
.rev_iter_block_roots(req.start_slot + (count - 1))
.rev_iter_best_block_roots(req.start_slot + count)
.take(count as usize)
.map(|(root, _slot)| root)
.collect();
@@ -454,7 +463,9 @@ impl<T: BeaconChainTypes> SimpleSync<T> {
.import_queue
.enqueue_headers(res.headers, peer_id.clone());
self.request_block_bodies(peer_id, BeaconBlockBodiesRequest { block_roots }, network);
if !block_roots.is_empty() {
self.request_block_bodies(peer_id, BeaconBlockBodiesRequest { block_roots }, network);
}
}
/// Handle a `BeaconBlockBodies` request from the peer.
@@ -522,14 +533,26 @@ impl<T: BeaconChainTypes> SimpleSync<T> {
"count" => res.block_bodies.len(),
);
self.import_queue
.enqueue_bodies(res.block_bodies, peer_id.clone());
if !res.block_bodies.is_empty() {
// Import all blocks to queue
let last_root = self
.import_queue
.enqueue_bodies(res.block_bodies, peer_id.clone());
// Attempt to process all received bodies by recursively processing the latest block
if let Some(root) = last_root {
match self.attempt_process_partial_block(peer_id, root, network, &"rpc") {
Some(BlockProcessingOutcome::Processed { block_root: _ }) => {
// If processing is successful remove from `import_queue`
self.import_queue.remove(root);
}
_ => {}
}
}
}
// Clear out old entries
self.import_queue.remove_stale();
// Import blocks, if possible.
self.process_import_queue(network);
}
/// Process a gossip message declaring a new block.
@@ -548,15 +571,37 @@ impl<T: BeaconChainTypes> SimpleSync<T> {
{
match outcome {
BlockProcessingOutcome::Processed { .. } => SHOULD_FORWARD_GOSSIP_BLOCK,
BlockProcessingOutcome::ParentUnknown { .. } => {
BlockProcessingOutcome::ParentUnknown { parent } => {
// Add this block to the queue
self.import_queue
.enqueue_full_blocks(vec![block], peer_id.clone());
trace!(
self.log,
"NewGossipBlock";
.enqueue_full_blocks(vec![block.clone()], peer_id.clone());
debug!(
self.log, "RequestParentBlock";
"parent_root" => format!("{}", parent),
"parent_slot" => block.slot - 1,
"peer" => format!("{:?}", peer_id),
);
// Request roots between parent and start of finality from peer.
let start_slot = self
.chain
.head()
.beacon_state
.finalized_epoch
.start_slot(T::EthSpec::slots_per_epoch());
self.request_block_roots(
peer_id,
BeaconBlockRootsRequest {
// Request blocks between `latest_finalized_slot` and the `block`
start_slot,
count: block.slot.as_u64() - start_slot.as_u64(),
},
network,
);
// Clean the stale entries from the queue.
self.import_queue.remove_stale();
SHOULD_FORWARD_GOSSIP_BLOCK
}
@@ -598,40 +643,6 @@ impl<T: BeaconChainTypes> SimpleSync<T> {
}
}
/// Iterate through the `import_queue` and process any complete blocks.
///
/// If a block is successfully processed it is removed from the queue, otherwise it remains in
/// the queue.
pub fn process_import_queue(&mut self, network: &mut NetworkContext) {
let mut successful = 0;
// Loop through all of the complete blocks in the queue.
for (block_root, block, sender) in self.import_queue.complete_blocks() {
let processing_result = self.process_block(sender, block.clone(), network, &"gossip");
let should_dequeue = match processing_result {
Some(BlockProcessingOutcome::ParentUnknown { .. }) => false,
Some(BlockProcessingOutcome::FutureSlot {
present_slot,
block_slot,
}) if present_slot + FUTURE_SLOT_TOLERANCE >= block_slot => false,
_ => true,
};
if processing_result == Some(BlockProcessingOutcome::Processed { block_root }) {
successful += 1;
}
if should_dequeue {
self.import_queue.remove(block_root);
}
}
if successful > 0 {
info!(self.log, "Imported {} blocks", successful)
}
}
/// Request some `BeaconBlockRoots` from the remote peer.
fn request_block_roots(
&mut self,
@@ -706,6 +717,89 @@ impl<T: BeaconChainTypes> SimpleSync<T> {
hello_message(&self.chain)
}
/// Helper function to attempt to process a partial block.
///
/// If the block can be completed recursively call `process_block`
/// else request missing parts.
fn attempt_process_partial_block(
&mut self,
peer_id: PeerId,
block_root: Hash256,
network: &mut NetworkContext,
source: &str,
) -> Option<BlockProcessingOutcome> {
match self.import_queue.attempt_complete_block(block_root) {
PartialBeaconBlockCompletion::MissingBody => {
// Unable to complete the block because the block body is missing.
debug!(
self.log, "RequestParentBody";
"source" => source,
"block_root" => format!("{}", block_root),
"peer" => format!("{:?}", peer_id),
);
// Request the block body from the peer.
self.request_block_bodies(
peer_id,
BeaconBlockBodiesRequest {
block_roots: vec![block_root],
},
network,
);
None
}
PartialBeaconBlockCompletion::MissingHeader(slot) => {
// Unable to complete the block because the block header is missing.
debug!(
self.log, "RequestParentHeader";
"source" => source,
"block_root" => format!("{}", block_root),
"peer" => format!("{:?}", peer_id),
);
// Request the block header from the peer.
self.request_block_headers(
peer_id,
BeaconBlockHeadersRequest {
start_root: block_root,
start_slot: slot,
max_headers: 1,
skip_slots: 0,
},
network,
);
None
}
PartialBeaconBlockCompletion::MissingRoot => {
// The `block_root` is not known to the queue.
debug!(
self.log, "MissingParentRoot";
"source" => source,
"block_root" => format!("{}", block_root),
"peer" => format!("{:?}", peer_id),
);
// Do nothing.
None
}
PartialBeaconBlockCompletion::Complete(block) => {
// The block exists in the queue, attempt to process it
trace!(
self.log, "AttemptProcessParent";
"source" => source,
"block_root" => format!("{}", block_root),
"parent_slot" => block.slot,
"peer" => format!("{:?}", peer_id),
);
self.process_block(peer_id.clone(), block, network, source)
}
}
}
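attempt_process_partial_block and the ParentUnknown arm of process_block (below) form a small mutual recursion: a block with an unknown parent first tries to complete and process the parent out of the import queue, then re-processes itself. A self-contained toy model of that control flow, not the Lighthouse API; u64 roots and a Vec chain are stand-ins, and the request-on-missing branches are reduced to comments:

use std::collections::HashMap;

#[derive(Clone)]
struct Block {
    root: u64,
    parent: Option<u64>, // `None` models "parent already in the chain"
}

enum Completion {
    Complete(Block),
    Missing,
}

struct Queue(HashMap<u64, Block>);

impl Queue {
    fn attempt_complete(&self, root: u64) -> Completion {
        match self.0.get(&root) {
            Some(block) => Completion::Complete(block.clone()),
            None => Completion::Missing,
        }
    }
}

fn attempt_process_partial(root: u64, queue: &mut Queue, chain: &mut Vec<u64>) -> bool {
    match queue.attempt_complete(root) {
        // In the real code the missing-header/body cases send RPC requests.
        Completion::Missing => false,
        Completion::Complete(block) => process_block(block, queue, chain),
    }
}

fn process_block(block: Block, queue: &mut Queue, chain: &mut Vec<u64>) -> bool {
    match block.parent {
        Some(parent) if !chain.contains(&parent) => {
            // Parent unknown: recurse on the parent, then retry this block.
            if attempt_process_partial(parent, queue, chain) {
                queue.0.remove(&parent);
                process_block(block, queue, chain)
            } else {
                false
            }
        }
        _ => {
            chain.push(block.root);
            true
        }
    }
}

fn main() {
    let mut chain = vec![];
    let mut queue = Queue(HashMap::new());
    // The parent (root 1) arrived earlier and is waiting in the queue.
    queue.0.insert(1, Block { root: 1, parent: None });
    // The child (root 2) arrives now; processing it pulls the parent through.
    let child = Block { root: 2, parent: Some(1) };
    assert!(process_block(child, &mut queue, &mut chain));
    assert_eq!(chain, vec![1, 2]);
}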
/// Processes the `block` that was received from `peer_id`.
///
/// If the block was submitted to the beacon chain without internal error, `Some(outcome)` is
@@ -732,7 +826,8 @@ impl<T: BeaconChainTypes> SimpleSync<T> {
if let Ok(outcome) = processing_result {
match outcome {
BlockProcessingOutcome::Processed { block_root } => {
info!(
// The block was valid and we processed it successfully.
debug!(
self.log, "Imported block from network";
"source" => source,
"slot" => block.slot,
@@ -741,36 +836,30 @@ impl<T: BeaconChainTypes> SimpleSync<T> {
);
}
BlockProcessingOutcome::ParentUnknown { parent } => {
// The block was valid and we processed it successfully.
debug!(
// The parent has not been processed
trace!(
self.log, "ParentBlockUnknown";
"source" => source,
"parent_root" => format!("{}", parent),
"baby_block_slot" => block.slot,
"peer" => format!("{:?}", peer_id),
);
// Send a hello to learn of the client's best slot so we can then sync the required
// parent(s).
network.send_rpc_request(
peer_id.clone(),
RPCRequest::Hello(hello_message(&self.chain)),
);
// If the parent is in the `import_queue` attempt to complete it then process it.
match self.attempt_process_partial_block(peer_id, parent, network, source) {
// If processing parent is successful, re-process block and remove parent from queue
Some(BlockProcessingOutcome::Processed { block_root: _ }) => {
self.import_queue.remove(parent);
// Explicitly request the parent block from the peer.
//
// It is likely that this is duplicate work, given we already send a hello
// request. However, I believe there are some edge-cases where the hello
// message doesn't suffice, so we perform this request as well.
self.request_block_headers(
peer_id,
BeaconBlockHeadersRequest {
start_root: parent,
start_slot: block.slot - 1,
max_headers: 1,
skip_slots: 0,
},
network,
)
// Attempt to process `block` again
match self.chain.process_block(block) {
Ok(outcome) => return Some(outcome),
Err(_) => return None,
}
}
// All other cases leave `parent` in `import_queue` and return original outcome.
_ => {}
}
}
BlockProcessingOutcome::FutureSlot {
present_slot,