diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index aa5509fd2e..eb2df89c1b 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -79,6 +79,19 @@ pub enum AttestationType { Aggregated, } +/// The result of a chain segment processing. +#[derive(Debug)] +pub enum ChainSegmentResult { + /// Processing this chain segment finished successfully. + Successful { imported_blocks: usize }, + /// There was an error processing this chain segment. Before the error, some blocks could + /// have been imported. + Failed { + imported_blocks: usize, + error: BlockError, + }, +} + /// The accepted clock drift for nodes gossiping blocks and attestations (spec v0.11.0). See: /// /// https://github.com/ethereum/eth2.0-specs/blob/v0.11.0/specs/phase0/p2p-interface.md#configuration @@ -1307,8 +1320,9 @@ impl BeaconChain { pub fn process_chain_segment( &self, chain_segment: Vec>, - ) -> Result, BlockError> { + ) -> ChainSegmentResult { let mut filtered_chain_segment = Vec::with_capacity(chain_segment.len()); + let mut imported_blocks = 0; // Produce a list of the parent root and slot of the child of each block. // @@ -1329,12 +1343,18 @@ impl BeaconChain { // Without this check it would be possible to have a block verified using the // incorrect shuffling. That would be bad, mmkay. if block_root != *child_parent_root { - return Err(BlockError::NonLinearParentRoots); + return ChainSegmentResult::Failed { + imported_blocks, + error: BlockError::NonLinearParentRoots, + }; } - // Ensure that the slots are strictly increasing throughout the chain segement. + // Ensure that the slots are strictly increasing throughout the chain segment. if *child_slot <= block.slot() { - return Err(BlockError::NonLinearSlots); + return ChainSegmentResult::Failed { + imported_blocks, + error: BlockError::NonLinearSlots, + }; } } @@ -1348,7 +1368,10 @@ impl BeaconChain { // If there was an error whilst determining if the block was invalid, return that // error. Err(BlockError::BeaconChainError(e)) => { - return Err(BlockError::BeaconChainError(e)) + return ChainSegmentResult::Failed { + imported_blocks, + error: BlockError::BeaconChainError(e), + } } // If the block was decided to be irrelevant for any other reason, don't include // this block or any of it's children in the filtered chain segment. @@ -1356,8 +1379,6 @@ impl BeaconChain { } } - let mut roots = Vec::with_capacity(filtered_chain_segment.len()); - while !filtered_chain_segment.is_empty() { // Determine the epoch of the first block in the remaining segment. let start_epoch = filtered_chain_segment @@ -1383,15 +1404,31 @@ impl BeaconChain { std::mem::swap(&mut blocks, &mut filtered_chain_segment); // Verify the signature of the blocks, returning early if the signature is invalid. - let signature_verified_blocks = signature_verify_chain_segment(blocks, self)?; + let signature_verified_blocks = match signature_verify_chain_segment(blocks, self) { + Ok(blocks) => blocks, + Err(error) => { + return ChainSegmentResult::Failed { + imported_blocks, + error, + } + } + }; // Import the blocks into the chain. for signature_verified_block in signature_verified_blocks { - roots.push(self.process_block(signature_verified_block)?); + match self.process_block(signature_verified_block) { + Ok(_) => imported_blocks += 1, + Err(error) => { + return ChainSegmentResult::Failed { + imported_blocks, + error, + } + } + } } } - Ok(roots) + ChainSegmentResult::Successful { imported_blocks } } /// Returns `Ok(GossipVerifiedBlock)` if the supplied `block` should be forwarded onto the @@ -2024,7 +2061,7 @@ impl BeaconChain { Ok(dump) } - /// Gets the current EnrForkId. + /// Gets the current `EnrForkId`. pub fn enr_fork_id(&self) -> EnrForkId { // If we are unable to read the slot clock we assume that it is prior to genesis and // therefore use the genesis slot. @@ -2081,3 +2118,12 @@ impl From for Error { Error::BeaconStateError(e) } } + +impl ChainSegmentResult { + pub fn to_block_error(self) -> Result<(), BlockError> { + match self { + ChainSegmentResult::Failed { error, .. } => Err(error), + ChainSegmentResult::Successful { .. } => Ok(()), + } + } +} diff --git a/beacon_node/beacon_chain/src/block_verification.rs b/beacon_node/beacon_chain/src/block_verification.rs index 4b71386943..679dfc667a 100644 --- a/beacon_node/beacon_chain/src/block_verification.rs +++ b/beacon_node/beacon_chain/src/block_verification.rs @@ -103,7 +103,7 @@ pub enum BlockError { InvalidSignature, /// The provided block is from an earlier slot than its parent. BlockIsNotLaterThanParent { block_slot: Slot, state_slot: Slot }, - /// At least one block in the chain segement did not have it's parent root set to the root of + /// At least one block in the chain segment did not have it's parent root set to the root of /// the prior block. NonLinearParentRoots, /// The slots of the blocks in the chain segment were not strictly increasing. I.e., a child @@ -153,7 +153,7 @@ impl From for BlockError { /// /// ## Errors /// -/// The given `chain_segement` must span no more than two epochs, otherwise an error will be +/// The given `chain_segment` must span no more than two epochs, otherwise an error will be /// returned. pub fn signature_verify_chain_segment( chain_segment: Vec<(Hash256, SignedBeaconBlock)>, @@ -592,7 +592,7 @@ fn check_block_against_finalized_slot( } } -/// Performs simple, cheap checks to ensure that the block is relevant to imported. +/// Performs simple, cheap checks to ensure that the block is relevant to be imported. /// /// `Ok(block_root)` is returned if the block passes these checks and should progress with /// verification (viz., it is relevant). diff --git a/beacon_node/beacon_chain/src/lib.rs b/beacon_node/beacon_chain/src/lib.rs index c9b84e8794..c4ff203082 100644 --- a/beacon_node/beacon_chain/src/lib.rs +++ b/beacon_node/beacon_chain/src/lib.rs @@ -21,7 +21,8 @@ mod timeout_rw_lock; mod validator_pubkey_cache; pub use self::beacon_chain::{ - AttestationProcessingOutcome, AttestationType, BeaconChain, BeaconChainTypes, StateSkipConfig, + AttestationProcessingOutcome, AttestationType, BeaconChain, BeaconChainTypes, + ChainSegmentResult, StateSkipConfig, }; pub use self::beacon_snapshot::BeaconSnapshot; pub use self::errors::{BeaconChainError, BlockProductionError}; diff --git a/beacon_node/beacon_chain/tests/import_chain_segment_tests.rs b/beacon_node/beacon_chain/tests/import_chain_segment_tests.rs index ead944315c..bbefe5be32 100644 --- a/beacon_node/beacon_chain/tests/import_chain_segment_tests.rs +++ b/beacon_node/beacon_chain/tests/import_chain_segment_tests.rs @@ -121,11 +121,13 @@ fn chain_segment_full_segment() { harness .chain .process_chain_segment(vec![]) + .to_block_error() .expect("should import empty chain segment"); harness .chain .process_chain_segment(blocks.clone()) + .to_block_error() .expect("should import chain segment"); harness.chain.fork_choice().expect("should run fork choice"); @@ -156,6 +158,7 @@ fn chain_segment_varying_chunk_size() { harness .chain .process_chain_segment(chunk.to_vec()) + .to_block_error() .expect(&format!( "should import chain segment of len {}", chunk_size @@ -191,7 +194,10 @@ fn chain_segment_non_linear_parent_roots() { blocks.remove(2); assert_eq!( - harness.chain.process_chain_segment(blocks.clone()), + harness + .chain + .process_chain_segment(blocks.clone()) + .to_block_error(), Err(BlockError::NonLinearParentRoots), "should not import chain with missing parent" ); @@ -203,7 +209,10 @@ fn chain_segment_non_linear_parent_roots() { blocks[3].message.parent_root = Hash256::zero(); assert_eq!( - harness.chain.process_chain_segment(blocks.clone()), + harness + .chain + .process_chain_segment(blocks.clone()) + .to_block_error(), Err(BlockError::NonLinearParentRoots), "should not import chain with a broken parent root link" ); @@ -225,7 +234,10 @@ fn chain_segment_non_linear_slots() { blocks[3].message.slot = Slot::new(0); assert_eq!( - harness.chain.process_chain_segment(blocks.clone()), + harness + .chain + .process_chain_segment(blocks.clone()) + .to_block_error(), Err(BlockError::NonLinearSlots), "should not import chain with a parent that has a lower slot than its child" ); @@ -238,7 +250,10 @@ fn chain_segment_non_linear_slots() { blocks[3].message.slot = blocks[2].message.slot; assert_eq!( - harness.chain.process_chain_segment(blocks.clone()), + harness + .chain + .process_chain_segment(blocks.clone()) + .to_block_error(), Err(BlockError::NonLinearSlots), "should not import chain with a parent that has an equal slot to its child" ); @@ -264,6 +279,7 @@ fn invalid_signatures() { harness .chain .process_chain_segment(ancestor_blocks) + .to_block_error() .expect("should import all blocks prior to the one being tested"); // For the given snapshots, test the following: @@ -273,7 +289,7 @@ fn invalid_signatures() { // `SignedBeaconBlock` directly. // - The `verify_block_for_gossip` function does _not_ return an error. // - The `process_block` function returns `InvalidSignature` when verifying the - // GossipVerifiedBlock. + // `GossipVerifiedBlock`. let assert_invalid_signature = |snapshots: &[BeaconSnapshot], item: &str| { let blocks = snapshots .iter() @@ -282,7 +298,7 @@ fn invalid_signatures() { // Ensure the block will be rejected if imported in a chain segment. assert_eq!( - harness.chain.process_chain_segment(blocks), + harness.chain.process_chain_segment(blocks).to_block_error(), Err(BlockError::InvalidSignature), "should not import chain segment with an invalid {} signature", item @@ -321,7 +337,7 @@ fn invalid_signatures() { .collect(); // Ensure the block will be rejected if imported in a chain segment. assert_eq!( - harness.chain.process_chain_segment(blocks), + harness.chain.process_chain_segment(blocks).to_block_error(), Err(BlockError::InvalidSignature), "should not import chain segment with an invalid gossip signature", ); @@ -455,7 +471,8 @@ fn invalid_signatures() { .map(|snapshot| snapshot.beacon_block.clone()) .collect(); assert!( - harness.chain.process_chain_segment(blocks) != Err(BlockError::InvalidSignature), + harness.chain.process_chain_segment(blocks).to_block_error() + != Err(BlockError::InvalidSignature), "should not throw an invalid signature error for a bad deposit signature" ); @@ -516,7 +533,7 @@ fn gossip_verification() { harness .chain .process_block(gossip_verified) - .expect("should import valid gossip verfied block"); + .expect("should import valid gossip verified block"); } /* diff --git a/beacon_node/network/src/sync/block_processor.rs b/beacon_node/network/src/sync/block_processor.rs index 25f53da25d..77d77cfe07 100644 --- a/beacon_node/network/src/sync/block_processor.rs +++ b/beacon_node/network/src/sync/block_processor.rs @@ -1,7 +1,7 @@ use crate::router::processor::FUTURE_SLOT_TOLERANCE; use crate::sync::manager::SyncMessage; use crate::sync::range_sync::BatchId; -use beacon_chain::{BeaconChain, BeaconChainTypes, BlockError}; +use beacon_chain::{BeaconChain, BeaconChainTypes, BlockError, ChainSegmentResult}; use eth2_libp2p::PeerId; use slog::{crit, debug, error, trace, warn}; use std::sync::{Arc, Weak}; @@ -25,6 +25,8 @@ pub enum BatchProcessResult { Success, /// The batch processing failed. Failed, + /// The batch processing failed but managed to import at least one block. + Partial, } /// Spawns a thread handling the block processing of a request: range syncing or parent lookup. @@ -41,11 +43,16 @@ pub fn spawn_block_processor( ProcessId::RangeBatchId(batch_id) => { debug!(log, "Processing batch"; "id" => *batch_id, "blocks" => downloaded_blocks.len()); let result = match process_blocks(chain, downloaded_blocks.iter(), &log) { - Ok(_) => { + (_, Ok(_)) => { debug!(log, "Batch processed"; "id" => *batch_id ); BatchProcessResult::Success } - Err(e) => { + (imported_blocks, Err(e)) if imported_blocks > 0 => { + debug!(log, "Batch processing failed but imported some blocks"; + "id" => *batch_id, "error" => e, "imported_blocks"=> imported_blocks); + BatchProcessResult::Partial + } + (_, Err(e)) => { debug!(log, "Batch processing failed"; "id" => *batch_id, "error" => e); BatchProcessResult::Failed } @@ -65,11 +72,15 @@ pub fn spawn_block_processor( } // this a parent lookup request from the sync manager ProcessId::ParentLookup(peer_id) => { - debug!(log, "Processing parent lookup"; "last_peer_id" => format!("{}", peer_id), "blocks" => downloaded_blocks.len()); + debug!( + log, "Processing parent lookup"; + "last_peer_id" => format!("{}", peer_id), + "blocks" => downloaded_blocks.len() + ); // parent blocks are ordered from highest slot to lowest, so we need to process in // reverse match process_blocks(chain, downloaded_blocks.iter().rev(), &log) { - Err(e) => { + (_, Err(e)) => { warn!(log, "Parent lookup failed"; "last_peer_id" => format!("{}", peer_id), "error" => e); sync_send .try_send(SyncMessage::ParentLookupFailed(peer_id)) @@ -81,7 +92,7 @@ pub fn spawn_block_processor( ); }); } - Ok(_) => { + (_, Ok(_)) => { debug!(log, "Parent lookup processed successfully"); } } @@ -101,98 +112,39 @@ fn process_blocks< chain: Weak>, downloaded_blocks: I, log: &slog::Logger, -) -> Result<(), String> { +) -> (usize, Result<(), String>) { if let Some(chain) = chain.upgrade() { let blocks = downloaded_blocks.cloned().collect::>(); - match chain.process_chain_segment(blocks) { - Ok(roots) => { - if roots.is_empty() { + let (imported_blocks, r) = match chain.process_chain_segment(blocks) { + ChainSegmentResult::Successful { imported_blocks } => { + if imported_blocks == 0 { debug!(log, "All blocks already known"); } else { debug!( log, "Imported blocks from network"; - "count" => roots.len(), + "count" => imported_blocks, ); // Batch completed successfully with at least one block, run fork choice. // TODO: Verify this logic run_fork_choice(chain, log); } + + (imported_blocks, Ok(())) } - Err(BlockError::ParentUnknown(parent)) => { - // blocks should be sequential and all parents should exist - warn!( - log, "Parent block is unknown"; - "parent_root" => format!("{}", parent), - ); - return Err(format!("Block has an unknown parent: {}", parent)); + ChainSegmentResult::Failed { + imported_blocks, + error, + } => { + let r = handle_failed_chain_segment(chain, imported_blocks, error, log); + + (imported_blocks, r) } - Err(BlockError::BlockIsAlreadyKnown) => { - // TODO: Check handling of this - crit!(log, "Unknown handling of block error"); - } - Err(BlockError::FutureSlot { - present_slot, - block_slot, - }) => { - if present_slot + FUTURE_SLOT_TOLERANCE >= block_slot { - // The block is too far in the future, drop it. - warn!( - log, "Block is ahead of our slot clock"; - "msg" => "block for future slot rejected, check your time", - "present_slot" => present_slot, - "block_slot" => block_slot, - "FUTURE_SLOT_TOLERANCE" => FUTURE_SLOT_TOLERANCE, - ); - } else { - // The block is in the future, but not too far. - debug!( - log, "Block is slightly ahead of our slot clock, ignoring."; - "present_slot" => present_slot, - "block_slot" => block_slot, - "FUTURE_SLOT_TOLERANCE" => FUTURE_SLOT_TOLERANCE, - ); - } - return Err(format!( - "Block with slot {} is higher than the current slot {}", - block_slot, present_slot - )); - } - Err(BlockError::WouldRevertFinalizedSlot { .. }) => { - //TODO: Check handling. Run fork choice? - debug!( - log, "Finalized or earlier block processed"; - ); - // block reached our finalized slot or was earlier, move to the next block - // TODO: How does this logic happen for the chain segment. We would want to - // continue processing in this case. - } - Err(BlockError::GenesisBlock) => { - debug!( - log, "Genesis block was processed"; - ); - // TODO: Similarly here. Prefer to continue processing. - } - Err(BlockError::BeaconChainError(e)) => { - // TODO: Run fork choice? - warn!( - log, "BlockProcessingFailure"; - "msg" => "unexpected condition in processing block.", - "outcome" => format!("{:?}", e) - ); - return Err(format!("Internal error whilst processing block: {:?}", e)); - } - other => { - // TODO: Run fork choice? - warn!( - log, "Invalid block received"; - "msg" => "peer sent invalid block", - "outcome" => format!("{:?}", other), - ); - return Err(format!("Peer sent invalid block. Reason: {:?}", other)); - } - } + }; + + return (imported_blocks, r); } - Ok(()) + + (0, Ok(())) } /// Runs fork-choice on a given chain. This is used during block processing after one successful @@ -212,3 +164,109 @@ fn run_fork_choice(chain: Arc>, log: &slog:: ), } } + +/// Helper function to handle a `BlockError` from `process_chain_segment` +fn handle_failed_chain_segment( + chain: Arc>, + imported_blocks: usize, + error: BlockError, + log: &slog::Logger, +) -> Result<(), String> { + match error { + BlockError::ParentUnknown(parent) => { + // blocks should be sequential and all parents should exist + warn!( + log, "Parent block is unknown"; + "parent_root" => format!("{}", parent), + ); + + // NOTE: logic from master. TODO: check + if imported_blocks > 0 { + run_fork_choice(chain, log); + } + + Err(format!("Block has an unknown parent: {}", parent)) + } + BlockError::BlockIsAlreadyKnown => { + // TODO: Check handling of this + crit!(log, "Unknown handling of block error"); + + Ok(()) + } + BlockError::FutureSlot { + present_slot, + block_slot, + } => { + if present_slot + FUTURE_SLOT_TOLERANCE >= block_slot { + // The block is too far in the future, drop it. + warn!( + log, "Block is ahead of our slot clock"; + "msg" => "block for future slot rejected, check your time", + "present_slot" => present_slot, + "block_slot" => block_slot, + "FUTURE_SLOT_TOLERANCE" => FUTURE_SLOT_TOLERANCE, + ); + // NOTE: logic from master. TODO: check + if imported_blocks > 0 { + run_fork_choice(chain, log); + } + } else { + // The block is in the future, but not too far. + debug!( + log, "Block is slightly ahead of our slot clock, ignoring."; + "present_slot" => present_slot, + "block_slot" => block_slot, + "FUTURE_SLOT_TOLERANCE" => FUTURE_SLOT_TOLERANCE, + ); + } + + Err(format!( + "Block with slot {} is higher than the current slot {}", + block_slot, present_slot + )) + } + BlockError::WouldRevertFinalizedSlot { .. } => { + //TODO: Check handling. Run fork choice? + debug!( + log, "Finalized or earlier block processed"; + ); + // block reached our finalized slot or was earlier, move to the next block + // TODO: How does this logic happen for the chain segment. We would want to + // continue processing in this case. + + Ok(()) + } + BlockError::GenesisBlock => { + debug!( + log, "Genesis block was processed"; + ); + // TODO: Similarly here. Prefer to continue processing. + + Ok(()) + } + BlockError::BeaconChainError(e) => { + // TODO: Run fork choice? + warn!( + log, "BlockProcessingFailure"; + "msg" => "unexpected condition in processing block.", + "outcome" => format!("{:?}", e) + ); + + Err(format!("Internal error whilst processing block: {:?}", e)) + } + other => { + // TODO: Run fork choice? + // NOTE: logic from master. TODO: check + if imported_blocks > 0 { + run_fork_choice(chain, log); + } + warn!( + log, "Invalid block received"; + "msg" => "peer sent invalid block", + "outcome" => format!("{:?}", other), + ); + + Err(format!("Peer sent invalid block. Reason: {:?}", other)) + } + } +} diff --git a/beacon_node/network/src/sync/range_sync/chain.rs b/beacon_node/network/src/sync/range_sync/chain.rs index 95abc099ed..bc6216db20 100644 --- a/beacon_node/network/src/sync/range_sync/chain.rs +++ b/beacon_node/network/src/sync/range_sync/chain.rs @@ -52,7 +52,7 @@ pub struct SyncingChain { pub target_head_root: Hash256, /// The batches that are currently awaiting a response from a peer. An RPC request for these - /// have been sent. + /// has been sent. pub pending_batches: PendingBatches, /// The batches that have been downloaded and are awaiting processing and/or validation. @@ -299,40 +299,7 @@ impl SyncingChain { // If the processed batch was not empty, we can validate previous invalidated // blocks if !batch.downloaded_blocks.is_empty() { - // Remove any batches awaiting validation. - // - // All blocks in processed_batches should be prior batches. As the current - // batch has been processed with blocks in it, all previous batches are valid. - // - // If a previous batch has been validated and it had been re-processed, downvote - // the original peer. - while !self.processed_batches.is_empty() { - let processed_batch = self.processed_batches.remove(0); - if *processed_batch.id >= *batch.id { - crit!(self.log, "A processed batch had a greater id than the current process id"; - "processed_id" => *processed_batch.id, - "current_id" => *batch.id); - } - - if let Some(prev_hash) = processed_batch.original_hash { - // The validated batch has been re-processed - if prev_hash != processed_batch.hash() { - // The re-downloaded version was different - if processed_batch.current_peer != processed_batch.original_peer { - // A new peer sent the correct batch, the previous peer did not - // downvote the original peer - // - // If the same peer corrected it's mistake, we allow it.... for - // now. - debug!(self.log, "Re-processed batch validated. Downvoting original peer"; - "batch_id" => *processed_batch.id, - "original_peer" => format!("{}",processed_batch.original_peer), - "new_peer" => format!("{}", processed_batch.current_peer)); - network.downvote_peer(processed_batch.original_peer); - } - } - } - } + self.mark_processed_batches_as_valid(network, &batch); } // Add the current batch to processed batches to be verified in the future. We are @@ -360,6 +327,32 @@ impl SyncingChain { ProcessingResult::KeepChain } } + BatchProcessResult::Partial => { + warn!(self.log, "Batch processing failed but at least one block was imported"; + "id" => *batch.id, "peer" => format!("{}", batch.current_peer) + ); + // At least one block was successfully verified and imported, so we can be sure all + // previous batches are valid and we only need to download the current failed + // batch. + self.mark_processed_batches_as_valid(network, &batch); + + // check that we have not exceeded the re-process retry counter + if batch.reprocess_retries > INVALID_BATCH_LOOKUP_ATTEMPTS { + // if a batch has exceeded the invalid batch lookup attempts limit, it means + // that it is likely all peers in this chain are are sending invalid batches + // repeatedly and are either malicious or faulty. We drop the chain and + // downvote all peers. + warn!(self.log, "Batch failed to download. Dropping chain and downvoting peers"; "id"=> *batch.id); + for peer_id in self.peer_pool.drain() { + network.downvote_peer(peer_id); + } + ProcessingResult::RemoveChain + } else { + // Handle this invalid batch, that is within the re-process retries limit. + self.handle_invalid_batch(network, batch); + ProcessingResult::KeepChain + } + } BatchProcessResult::Failed => { warn!(self.log, "Batch processing failed"; "id" => *batch.id, "peer" => format!("{}", batch.current_peer)); // The batch processing failed @@ -367,7 +360,7 @@ impl SyncingChain { // is invalid. We need to find out which and downvote the peer that has sent us // an invalid batch. - // check that we have no exceeded the re-process retry counter + // check that we have not exceeded the re-process retry counter if batch.reprocess_retries > INVALID_BATCH_LOOKUP_ATTEMPTS { // if a batch has exceeded the invalid batch lookup attempts limit, it means // that it is likely all peers in this chain are are sending invalid batches @@ -389,6 +382,49 @@ impl SyncingChain { Some(res) } + /// Removes any batches awaiting validation. + /// + /// All blocks in `processed_batches` should be prior batches. As the `last_batch` has been + /// processed with blocks in it, all previous batches are valid. + /// + /// If a previous batch has been validated and it had been re-processed, downvote + /// the original peer. + fn mark_processed_batches_as_valid( + &mut self, + network: &mut SyncNetworkContext, + last_batch: &Batch, + ) { + while !self.processed_batches.is_empty() { + let processed_batch = self.processed_batches.remove(0); + if *processed_batch.id >= *last_batch.id { + crit!(self.log, "A processed batch had a greater id than the current process id"; + "processed_id" => *processed_batch.id, + "current_id" => *last_batch.id); + } + + if let Some(prev_hash) = processed_batch.original_hash { + // The validated batch has been re-processed + if prev_hash != processed_batch.hash() { + // The re-downloaded version was different + if processed_batch.current_peer != processed_batch.original_peer { + // A new peer sent the correct batch, the previous peer did not + // downvote the original peer + // + // If the same peer corrected it's mistake, we allow it.... for + // now. + debug!( + self.log, "Re-processed batch validated. Downvoting original peer"; + "batch_id" => *processed_batch.id, + "original_peer" => format!("{}",processed_batch.original_peer), + "new_peer" => format!("{}", processed_batch.current_peer) + ); + network.downvote_peer(processed_batch.original_peer); + } + } + } + } + } + /// An invalid batch has been received that could not be processed. /// /// These events occur when a peer as successfully responded with blocks, but the blocks we