add handling of failed batches that imported blocks (#996)

This commit is contained in:
divma
2020-04-13 08:23:44 -05:00
committed by GitHub
parent 7bf1ea2356
commit fa9daa488d
6 changed files with 302 additions and 144 deletions

View File

@@ -1,7 +1,7 @@
use crate::router::processor::FUTURE_SLOT_TOLERANCE;
use crate::sync::manager::SyncMessage;
use crate::sync::range_sync::BatchId;
use beacon_chain::{BeaconChain, BeaconChainTypes, BlockError};
use beacon_chain::{BeaconChain, BeaconChainTypes, BlockError, ChainSegmentResult};
use eth2_libp2p::PeerId;
use slog::{crit, debug, error, trace, warn};
use std::sync::{Arc, Weak};
@@ -25,6 +25,8 @@ pub enum BatchProcessResult {
Success,
/// The batch processing failed.
Failed,
/// The batch processing failed but managed to import at least one block.
Partial,
}
/// Spawns a thread handling the block processing of a request: range syncing or parent lookup.
@@ -41,11 +43,16 @@ pub fn spawn_block_processor<T: BeaconChainTypes>(
ProcessId::RangeBatchId(batch_id) => {
debug!(log, "Processing batch"; "id" => *batch_id, "blocks" => downloaded_blocks.len());
let result = match process_blocks(chain, downloaded_blocks.iter(), &log) {
Ok(_) => {
(_, Ok(_)) => {
debug!(log, "Batch processed"; "id" => *batch_id );
BatchProcessResult::Success
}
Err(e) => {
(imported_blocks, Err(e)) if imported_blocks > 0 => {
debug!(log, "Batch processing failed but imported some blocks";
"id" => *batch_id, "error" => e, "imported_blocks"=> imported_blocks);
BatchProcessResult::Partial
}
(_, Err(e)) => {
debug!(log, "Batch processing failed"; "id" => *batch_id, "error" => e);
BatchProcessResult::Failed
}
@@ -65,11 +72,15 @@ pub fn spawn_block_processor<T: BeaconChainTypes>(
}
// this a parent lookup request from the sync manager
ProcessId::ParentLookup(peer_id) => {
debug!(log, "Processing parent lookup"; "last_peer_id" => format!("{}", peer_id), "blocks" => downloaded_blocks.len());
debug!(
log, "Processing parent lookup";
"last_peer_id" => format!("{}", peer_id),
"blocks" => downloaded_blocks.len()
);
// parent blocks are ordered from highest slot to lowest, so we need to process in
// reverse
match process_blocks(chain, downloaded_blocks.iter().rev(), &log) {
Err(e) => {
(_, Err(e)) => {
warn!(log, "Parent lookup failed"; "last_peer_id" => format!("{}", peer_id), "error" => e);
sync_send
.try_send(SyncMessage::ParentLookupFailed(peer_id))
@@ -81,7 +92,7 @@ pub fn spawn_block_processor<T: BeaconChainTypes>(
);
});
}
Ok(_) => {
(_, Ok(_)) => {
debug!(log, "Parent lookup processed successfully");
}
}
@@ -101,98 +112,39 @@ fn process_blocks<
chain: Weak<BeaconChain<T>>,
downloaded_blocks: I,
log: &slog::Logger,
) -> Result<(), String> {
) -> (usize, Result<(), String>) {
if let Some(chain) = chain.upgrade() {
let blocks = downloaded_blocks.cloned().collect::<Vec<_>>();
match chain.process_chain_segment(blocks) {
Ok(roots) => {
if roots.is_empty() {
let (imported_blocks, r) = match chain.process_chain_segment(blocks) {
ChainSegmentResult::Successful { imported_blocks } => {
if imported_blocks == 0 {
debug!(log, "All blocks already known");
} else {
debug!(
log, "Imported blocks from network";
"count" => roots.len(),
"count" => imported_blocks,
);
// Batch completed successfully with at least one block, run fork choice.
// TODO: Verify this logic
run_fork_choice(chain, log);
}
(imported_blocks, Ok(()))
}
Err(BlockError::ParentUnknown(parent)) => {
// blocks should be sequential and all parents should exist
warn!(
log, "Parent block is unknown";
"parent_root" => format!("{}", parent),
);
return Err(format!("Block has an unknown parent: {}", parent));
ChainSegmentResult::Failed {
imported_blocks,
error,
} => {
let r = handle_failed_chain_segment(chain, imported_blocks, error, log);
(imported_blocks, r)
}
Err(BlockError::BlockIsAlreadyKnown) => {
// TODO: Check handling of this
crit!(log, "Unknown handling of block error");
}
Err(BlockError::FutureSlot {
present_slot,
block_slot,
}) => {
if present_slot + FUTURE_SLOT_TOLERANCE >= block_slot {
// The block is too far in the future, drop it.
warn!(
log, "Block is ahead of our slot clock";
"msg" => "block for future slot rejected, check your time",
"present_slot" => present_slot,
"block_slot" => block_slot,
"FUTURE_SLOT_TOLERANCE" => FUTURE_SLOT_TOLERANCE,
);
} else {
// The block is in the future, but not too far.
debug!(
log, "Block is slightly ahead of our slot clock, ignoring.";
"present_slot" => present_slot,
"block_slot" => block_slot,
"FUTURE_SLOT_TOLERANCE" => FUTURE_SLOT_TOLERANCE,
);
}
return Err(format!(
"Block with slot {} is higher than the current slot {}",
block_slot, present_slot
));
}
Err(BlockError::WouldRevertFinalizedSlot { .. }) => {
//TODO: Check handling. Run fork choice?
debug!(
log, "Finalized or earlier block processed";
);
// block reached our finalized slot or was earlier, move to the next block
// TODO: How does this logic happen for the chain segment. We would want to
// continue processing in this case.
}
Err(BlockError::GenesisBlock) => {
debug!(
log, "Genesis block was processed";
);
// TODO: Similarly here. Prefer to continue processing.
}
Err(BlockError::BeaconChainError(e)) => {
// TODO: Run fork choice?
warn!(
log, "BlockProcessingFailure";
"msg" => "unexpected condition in processing block.",
"outcome" => format!("{:?}", e)
);
return Err(format!("Internal error whilst processing block: {:?}", e));
}
other => {
// TODO: Run fork choice?
warn!(
log, "Invalid block received";
"msg" => "peer sent invalid block",
"outcome" => format!("{:?}", other),
);
return Err(format!("Peer sent invalid block. Reason: {:?}", other));
}
}
};
return (imported_blocks, r);
}
Ok(())
(0, Ok(()))
}
/// Runs fork-choice on a given chain. This is used during block processing after one successful
@@ -212,3 +164,109 @@ fn run_fork_choice<T: BeaconChainTypes>(chain: Arc<BeaconChain<T>>, log: &slog::
),
}
}
/// Helper function to handle a `BlockError` from `process_chain_segment`
fn handle_failed_chain_segment<T: BeaconChainTypes>(
chain: Arc<BeaconChain<T>>,
imported_blocks: usize,
error: BlockError,
log: &slog::Logger,
) -> Result<(), String> {
match error {
BlockError::ParentUnknown(parent) => {
// blocks should be sequential and all parents should exist
warn!(
log, "Parent block is unknown";
"parent_root" => format!("{}", parent),
);
// NOTE: logic from master. TODO: check
if imported_blocks > 0 {
run_fork_choice(chain, log);
}
Err(format!("Block has an unknown parent: {}", parent))
}
BlockError::BlockIsAlreadyKnown => {
// TODO: Check handling of this
crit!(log, "Unknown handling of block error");
Ok(())
}
BlockError::FutureSlot {
present_slot,
block_slot,
} => {
if present_slot + FUTURE_SLOT_TOLERANCE >= block_slot {
// The block is too far in the future, drop it.
warn!(
log, "Block is ahead of our slot clock";
"msg" => "block for future slot rejected, check your time",
"present_slot" => present_slot,
"block_slot" => block_slot,
"FUTURE_SLOT_TOLERANCE" => FUTURE_SLOT_TOLERANCE,
);
// NOTE: logic from master. TODO: check
if imported_blocks > 0 {
run_fork_choice(chain, log);
}
} else {
// The block is in the future, but not too far.
debug!(
log, "Block is slightly ahead of our slot clock, ignoring.";
"present_slot" => present_slot,
"block_slot" => block_slot,
"FUTURE_SLOT_TOLERANCE" => FUTURE_SLOT_TOLERANCE,
);
}
Err(format!(
"Block with slot {} is higher than the current slot {}",
block_slot, present_slot
))
}
BlockError::WouldRevertFinalizedSlot { .. } => {
//TODO: Check handling. Run fork choice?
debug!(
log, "Finalized or earlier block processed";
);
// block reached our finalized slot or was earlier, move to the next block
// TODO: How does this logic happen for the chain segment. We would want to
// continue processing in this case.
Ok(())
}
BlockError::GenesisBlock => {
debug!(
log, "Genesis block was processed";
);
// TODO: Similarly here. Prefer to continue processing.
Ok(())
}
BlockError::BeaconChainError(e) => {
// TODO: Run fork choice?
warn!(
log, "BlockProcessingFailure";
"msg" => "unexpected condition in processing block.",
"outcome" => format!("{:?}", e)
);
Err(format!("Internal error whilst processing block: {:?}", e))
}
other => {
// TODO: Run fork choice?
// NOTE: logic from master. TODO: check
if imported_blocks > 0 {
run_fork_choice(chain, log);
}
warn!(
log, "Invalid block received";
"msg" => "peer sent invalid block",
"outcome" => format!("{:?}", other),
);
Err(format!("Peer sent invalid block. Reason: {:?}", other))
}
}
}

View File

@@ -52,7 +52,7 @@ pub struct SyncingChain<T: BeaconChainTypes> {
pub target_head_root: Hash256,
/// The batches that are currently awaiting a response from a peer. An RPC request for these
/// have been sent.
/// has been sent.
pub pending_batches: PendingBatches<T::EthSpec>,
/// The batches that have been downloaded and are awaiting processing and/or validation.
@@ -299,40 +299,7 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
// If the processed batch was not empty, we can validate previous invalidated
// blocks
if !batch.downloaded_blocks.is_empty() {
// Remove any batches awaiting validation.
//
// All blocks in processed_batches should be prior batches. As the current
// batch has been processed with blocks in it, all previous batches are valid.
//
// If a previous batch has been validated and it had been re-processed, downvote
// the original peer.
while !self.processed_batches.is_empty() {
let processed_batch = self.processed_batches.remove(0);
if *processed_batch.id >= *batch.id {
crit!(self.log, "A processed batch had a greater id than the current process id";
"processed_id" => *processed_batch.id,
"current_id" => *batch.id);
}
if let Some(prev_hash) = processed_batch.original_hash {
// The validated batch has been re-processed
if prev_hash != processed_batch.hash() {
// The re-downloaded version was different
if processed_batch.current_peer != processed_batch.original_peer {
// A new peer sent the correct batch, the previous peer did not
// downvote the original peer
//
// If the same peer corrected it's mistake, we allow it.... for
// now.
debug!(self.log, "Re-processed batch validated. Downvoting original peer";
"batch_id" => *processed_batch.id,
"original_peer" => format!("{}",processed_batch.original_peer),
"new_peer" => format!("{}", processed_batch.current_peer));
network.downvote_peer(processed_batch.original_peer);
}
}
}
}
self.mark_processed_batches_as_valid(network, &batch);
}
// Add the current batch to processed batches to be verified in the future. We are
@@ -360,6 +327,32 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
ProcessingResult::KeepChain
}
}
BatchProcessResult::Partial => {
warn!(self.log, "Batch processing failed but at least one block was imported";
"id" => *batch.id, "peer" => format!("{}", batch.current_peer)
);
// At least one block was successfully verified and imported, so we can be sure all
// previous batches are valid and we only need to download the current failed
// batch.
self.mark_processed_batches_as_valid(network, &batch);
// check that we have not exceeded the re-process retry counter
if batch.reprocess_retries > INVALID_BATCH_LOOKUP_ATTEMPTS {
// if a batch has exceeded the invalid batch lookup attempts limit, it means
// that it is likely all peers in this chain are are sending invalid batches
// repeatedly and are either malicious or faulty. We drop the chain and
// downvote all peers.
warn!(self.log, "Batch failed to download. Dropping chain and downvoting peers"; "id"=> *batch.id);
for peer_id in self.peer_pool.drain() {
network.downvote_peer(peer_id);
}
ProcessingResult::RemoveChain
} else {
// Handle this invalid batch, that is within the re-process retries limit.
self.handle_invalid_batch(network, batch);
ProcessingResult::KeepChain
}
}
BatchProcessResult::Failed => {
warn!(self.log, "Batch processing failed"; "id" => *batch.id, "peer" => format!("{}", batch.current_peer));
// The batch processing failed
@@ -367,7 +360,7 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
// is invalid. We need to find out which and downvote the peer that has sent us
// an invalid batch.
// check that we have no exceeded the re-process retry counter
// check that we have not exceeded the re-process retry counter
if batch.reprocess_retries > INVALID_BATCH_LOOKUP_ATTEMPTS {
// if a batch has exceeded the invalid batch lookup attempts limit, it means
// that it is likely all peers in this chain are are sending invalid batches
@@ -389,6 +382,49 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
Some(res)
}
/// Removes any batches awaiting validation.
///
/// All blocks in `processed_batches` should be prior batches. As the `last_batch` has been
/// processed with blocks in it, all previous batches are valid.
///
/// If a previous batch has been validated and it had been re-processed, downvote
/// the original peer.
fn mark_processed_batches_as_valid(
&mut self,
network: &mut SyncNetworkContext<T::EthSpec>,
last_batch: &Batch<T::EthSpec>,
) {
while !self.processed_batches.is_empty() {
let processed_batch = self.processed_batches.remove(0);
if *processed_batch.id >= *last_batch.id {
crit!(self.log, "A processed batch had a greater id than the current process id";
"processed_id" => *processed_batch.id,
"current_id" => *last_batch.id);
}
if let Some(prev_hash) = processed_batch.original_hash {
// The validated batch has been re-processed
if prev_hash != processed_batch.hash() {
// The re-downloaded version was different
if processed_batch.current_peer != processed_batch.original_peer {
// A new peer sent the correct batch, the previous peer did not
// downvote the original peer
//
// If the same peer corrected it's mistake, we allow it.... for
// now.
debug!(
self.log, "Re-processed batch validated. Downvoting original peer";
"batch_id" => *processed_batch.id,
"original_peer" => format!("{}",processed_batch.original_peer),
"new_peer" => format!("{}", processed_batch.current_peer)
);
network.downvote_peer(processed_batch.original_peer);
}
}
}
}
}
/// An invalid batch has been received that could not be processed.
///
/// These events occur when a peer as successfully responded with blocks, but the blocks we