Refactor block processing results, some sync logic

This commit is contained in:
Paul Hauner
2019-06-12 23:54:16 -04:00
parent b9e832216b
commit 4c0724fba6
5 changed files with 215 additions and 359 deletions

View File

@@ -1,6 +1,6 @@
use super::import_queue::ImportQueue;
use crate::message_handler::NetworkContext;
use beacon_chain::{BeaconChain, BeaconChainTypes, BlockProcessingOutcome, InvalidBlock};
use beacon_chain::{BeaconChain, BeaconChainTypes, BlockProcessingOutcome};
use eth2_libp2p::rpc::methods::*;
use eth2_libp2p::rpc::{RPCRequest, RPCResponse, RequestId};
use eth2_libp2p::PeerId;
@@ -9,7 +9,6 @@ use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
use store::Store;
use tree_hash::TreeHash;
use types::{
Attestation, BeaconBlock, BeaconBlockBody, BeaconBlockHeader, Epoch, EthSpec, Hash256, Slot,
};
@@ -24,6 +23,9 @@ const QUEUE_STALE_SECS: u64 = 600;
/// Otherwise we queue it.
const FUTURE_SLOT_TOLERANCE: u64 = 1;
const SHOULD_FORWARD_GOSSIP_BLOCK: bool = true;
const SHOULD_NOT_FORWARD_GOSSIP_BLOCK: bool = false;
/// Keeps track of syncing information for known connected peers.
#[derive(Clone, Copy, Debug)]
pub struct PeerSyncInfo {
@@ -393,6 +395,7 @@ impl<T: BeaconChainTypes> SimpleSync<T> {
.collect();
roots.reverse();
roots.dedup();
let headers: Vec<BeaconBlockHeader> = roots
.into_iter()
@@ -509,6 +512,8 @@ impl<T: BeaconChainTypes> SimpleSync<T> {
/// Process a gossip message declaring a new block.
///
/// Attempts to apply to block to the beacon chain. May queue the block for later processing.
///
/// Returns a `bool` which, if `true`, indicates we should forward the block to our peers.
pub fn on_block_gossip(
&mut self,
@@ -516,133 +521,35 @@ impl<T: BeaconChainTypes> SimpleSync<T> {
block: BeaconBlock,
network: &mut NetworkContext,
) -> bool {
// Ignore any block from a finalized slot.
if self.slot_is_finalized(block.slot) {
debug!(
self.log, "IgnoredFinalizedBlock";
"source" => "gossip",
"msg" => "chain is finalized at block slot",
"block_slot" => block.slot,
);
return false;
}
if let Some(outcome) =
self.process_block(peer_id.clone(), block.clone(), network, &"gossip")
{
match outcome {
BlockProcessingOutcome::Processed => SHOULD_FORWARD_GOSSIP_BLOCK,
BlockProcessingOutcome::ParentUnknown { .. } => {
self.import_queue
.enqueue_full_blocks(vec![block], peer_id.clone());
let block_root = Hash256::from_slice(&block.tree_hash_root());
// Ignore any block that the chain already knows about.
if self.chain_has_seen_block(&block_root) {
// TODO: Age confirm that we shouldn't forward a block if we already know of it.
return false;
}
match self.chain.process_block(block.clone()) {
Ok(BlockProcessingOutcome::InvalidBlock(InvalidBlock::ParentUnknown { .. })) => {
// The block was valid and we processed it successfully.
debug!(
self.log, "ParentBlockUnknown";
"source" => "gossip",
"parent_root" => format!("{}", block.previous_block_root),
"peer" => format!("{:?}", peer_id),
);
// Queue the block for later processing.
self.import_queue
.enqueue_full_blocks(vec![block], peer_id.clone());
// Send a hello to learn of the clients best slot so we can then sync the require
// parent(s).
network.send_rpc_request(
peer_id.clone(),
RPCRequest::Hello(hello_message(&self.chain)),
);
// Forward the block onto our peers.
//
// Note: this may need to be changed if we decide to only forward blocks if we have
// all required info.
true
}
Ok(BlockProcessingOutcome::InvalidBlock(InvalidBlock::FutureSlot {
present_slot,
block_slot,
})) => {
if block_slot - present_slot > FUTURE_SLOT_TOLERANCE {
// The block is too far in the future, drop it.
warn!(
self.log, "FutureBlock";
"source" => "gossip",
"msg" => "block for future slot rejected, check your time",
"present_slot" => present_slot,
"block_slot" => block_slot,
"FUTURE_SLOT_TOLERANCE" => FUTURE_SLOT_TOLERANCE,
"peer" => format!("{:?}", peer_id),
);
// Do not forward the block around to peers.
false
} else {
// The block is in the future, but not too far.
warn!(
self.log, "QueuedFutureBlock";
"source" => "gossip",
"msg" => "queuing future block, check your time",
"present_slot" => present_slot,
"block_slot" => block_slot,
"FUTURE_SLOT_TOLERANCE" => FUTURE_SLOT_TOLERANCE,
"peer" => format!("{:?}", peer_id),
);
// Queue the block for later processing.
self.import_queue.enqueue_full_blocks(vec![block], peer_id);
// Forward the block around to peers.
true
SHOULD_FORWARD_GOSSIP_BLOCK
}
}
Ok(outcome) => {
if outcome.is_invalid() {
// The peer has sent a block which is fundamentally invalid.
warn!(
self.log, "InvalidBlock";
"source" => "gossip",
"msg" => "peer sent objectively invalid block",
"outcome" => format!("{:?}", outcome),
"peer" => format!("{:?}", peer_id),
);
// Disconnect the peer
network.disconnect(peer_id, GoodbyeReason::Fault);
// Do not forward the block to peers.
false
} else if outcome.sucessfully_processed() {
// The block was valid and we processed it successfully.
info!(
self.log, "ImportedBlock";
"source" => "gossip",
"peer" => format!("{:?}", peer_id),
);
// Forward the block to peers
true
} else {
// The block wasn't necessarily invalid but we didn't process it successfully.
// This condition shouldn't be reached.
error!(
self.log, "BlockProcessingFailure";
"source" => "gossip",
"msg" => "unexpected condition in processing block.",
"outcome" => format!("{:?}", outcome),
);
// Do not forward the block on.
false
BlockProcessingOutcome::FutureSlot {
present_slot,
block_slot,
} if present_slot + FUTURE_SLOT_TOLERANCE >= block_slot => {
self.import_queue
.enqueue_full_blocks(vec![block], peer_id.clone());
SHOULD_FORWARD_GOSSIP_BLOCK
}
}
Err(e) => {
// We encountered an error whilst processing the block.
// Note: known blocks are forwarded on the gossip network.
//
// Blocks should not be able to trigger errors, instead they should be flagged as
// invalid.
error!(
self.log, "BlockProcessingError";
"msg" => "internal error in processing block.",
"source" => "gossip",
"error" => format!("{:?}", e),
);
// Do not forward the block to peers.
false
// We rely upon the lower layers (libp2p) to stop loops occuring from re-gossiped
// blocks.
BlockProcessingOutcome::BlockIsAlreadyKnown => SHOULD_FORWARD_GOSSIP_BLOCK,
_ => SHOULD_NOT_FORWARD_GOSSIP_BLOCK,
}
} else {
SHOULD_NOT_FORWARD_GOSSIP_BLOCK
}
}
@@ -669,57 +576,32 @@ impl<T: BeaconChainTypes> SimpleSync<T> {
/// the queue.
pub fn process_import_queue(&mut self, network: &mut NetworkContext) {
let mut successful = 0;
let mut invalid = 0;
let mut errored = 0;
// Loop through all of the complete blocks in the queue.
for (block_root, block, sender) in self.import_queue.complete_blocks() {
let slot = block.slot;
let parent_root = block.previous_block_root;
let processing_result = self.process_block(sender, block.clone(), network, &"gossip");
match self.chain.process_block(block) {
Ok(outcome) => {
if outcome.is_invalid() {
invalid += 1;
warn!(
self.log,
"InvalidBlock";
"sender_peer_id" => format!("{:?}", sender.clone()),
"block_root" => format!("{}", block_root),
"reason" => format!("{:?}", outcome),
);
network.disconnect(sender, GoodbyeReason::Fault);
} else if outcome.sucessfully_processed() {
successful += 1;
self.import_queue.remove(block_root);
} else {
debug!(
self.log,
"ProcessImportQueue";
"msg" => "Block not imported",
"outcome" => format!("{:?}", outcome),
"block_slot" => format!("{:?}", slot),
"parent_root" => format!("{}", parent_root),
"peer" => format!("{:?}", sender),
);
}
}
Err(e) => {
errored += 1;
error!(self.log, "BlockProcessingError"; "error" => format!("{:?}", e));
}
let should_dequeue = match processing_result {
Some(BlockProcessingOutcome::ParentUnknown { .. }) => false,
Some(BlockProcessingOutcome::FutureSlot {
present_slot,
block_slot,
}) if present_slot + FUTURE_SLOT_TOLERANCE >= block_slot => false,
_ => true,
};
if processing_result == Some(BlockProcessingOutcome::Processed) {
successful += 1;
}
if should_dequeue {
self.import_queue.remove(block_root);
}
}
if successful > 0 {
info!(self.log, "Imported {} blocks", successful)
}
if invalid > 0 {
warn!(self.log, "Rejected {} invalid blocks", invalid)
}
if errored > 0 {
warn!(self.log, "Failed to process {} blocks", errored)
}
}
/// Request some `BeaconBlockRoots` from the remote peer.
@@ -791,17 +673,128 @@ impl<T: BeaconChainTypes> SimpleSync<T> {
})
}
/// Returns `true` if the given slot is finalized in our chain.
fn slot_is_finalized(&self, slot: Slot) -> bool {
slot <= hello_message(&self.chain)
.latest_finalized_epoch
.start_slot(T::EthSpec::slots_per_epoch())
}
/// Generates our current state in the form of a HELLO RPC message.
pub fn generate_hello(&self) -> HelloMessage {
hello_message(&self.chain)
}
/// Processes the `block` that was received from `peer_id`.
///
/// If the block was submitted to the beacon chain without internal error, `Some(outcome)` is
/// returned, otherwise `None` is returned. Note: `Some(_)` does not necessarily indicate that
/// the block was successfully processed or valid.
///
/// This function performs the following duties:
///
/// - Attempting to import the block into the beacon chain.
/// - Logging
/// - Requesting unavailable blocks (e.g., if parent is unknown).
/// - Disconnecting faulty nodes.
///
/// This function does not remove processed blocks from the import queue.
fn process_block(
&mut self,
peer_id: PeerId,
block: BeaconBlock,
network: &mut NetworkContext,
source: &str,
) -> Option<BlockProcessingOutcome> {
let processing_result = self.chain.process_block(block.clone());
if let Ok(outcome) = processing_result {
match outcome {
BlockProcessingOutcome::Processed => {
info!(
self.log, "Imported block from network";
"source" => source,
"slot" => block.slot,
"peer" => format!("{:?}", peer_id),
);
}
BlockProcessingOutcome::ParentUnknown { parent } => {
// The block was valid and we processed it successfully.
debug!(
self.log, "ParentBlockUnknown";
"source" => source,
"parent_root" => format!("{}", parent),
"peer" => format!("{:?}", peer_id),
);
// Send a hello to learn of the clients best slot so we can then sync the require
// parent(s).
network.send_rpc_request(
peer_id.clone(),
RPCRequest::Hello(hello_message(&self.chain)),
);
// Explicitly request the parent block from the peer.
//
// It is likely that this is duplicate work, given we already send a hello
// request. However, I believe there are some edge-cases where the hello
// message doesn't suffice, so we perform this request as well.
self.request_block_headers(
peer_id,
BeaconBlockHeadersRequest {
start_root: parent,
start_slot: block.slot - 1,
max_headers: 1,
skip_slots: 0,
},
network,
)
}
BlockProcessingOutcome::FutureSlot {
present_slot,
block_slot,
} => {
if present_slot + FUTURE_SLOT_TOLERANCE >= block_slot {
// The block is too far in the future, drop it.
warn!(
self.log, "FutureBlock";
"source" => source,
"msg" => "block for future slot rejected, check your time",
"present_slot" => present_slot,
"block_slot" => block_slot,
"FUTURE_SLOT_TOLERANCE" => FUTURE_SLOT_TOLERANCE,
"peer" => format!("{:?}", peer_id),
);
network.disconnect(peer_id, GoodbyeReason::Fault);
} else {
// The block is in the future, but not too far.
debug!(
self.log, "QueuedFutureBlock";
"source" => source,
"msg" => "queuing future block, check your time",
"present_slot" => present_slot,
"block_slot" => block_slot,
"FUTURE_SLOT_TOLERANCE" => FUTURE_SLOT_TOLERANCE,
"peer" => format!("{:?}", peer_id),
);
}
}
_ => {
debug!(
self.log, "InvalidBlock";
"source" => source,
"msg" => "peer sent invalid block",
"outcome" => format!("{:?}", outcome),
"peer" => format!("{:?}", peer_id),
);
}
}
Some(outcome)
} else {
error!(
self.log, "BlockProcessingFailure";
"source" => source,
"msg" => "unexpected condition in processing block.",
"outcome" => format!("{:?}", processing_result)
);
None
}
}
}
/// Build a `HelloMessage` representing the state of the given `beacon_chain`.