From 2469bde6b1cb54ba65614e30ccf6bce865179cd7 Mon Sep 17 00:00:00 2001 From: divma Date: Wed, 22 Apr 2020 06:17:56 -0500 Subject: [PATCH] Add chain_id in range syncing to avoid wrong dispatching of batch results (#1037) --- .../network/src/sync/block_processor.rs | 11 +-- beacon_node/network/src/sync/manager.rs | 5 +- .../network/src/sync/range_sync/chain.rs | 70 +++++++++++++------ .../src/sync/range_sync/chain_collection.rs | 4 ++ .../network/src/sync/range_sync/mod.rs | 1 + .../network/src/sync/range_sync/range.rs | 12 +++- 6 files changed, 75 insertions(+), 28 deletions(-) diff --git a/beacon_node/network/src/sync/block_processor.rs b/beacon_node/network/src/sync/block_processor.rs index 77d77cfe07..cc546ea081 100644 --- a/beacon_node/network/src/sync/block_processor.rs +++ b/beacon_node/network/src/sync/block_processor.rs @@ -1,6 +1,6 @@ use crate::router::processor::FUTURE_SLOT_TOLERANCE; use crate::sync::manager::SyncMessage; -use crate::sync::range_sync::BatchId; +use crate::sync::range_sync::{BatchId, ChainId}; use beacon_chain::{BeaconChain, BeaconChainTypes, BlockError, ChainSegmentResult}; use eth2_libp2p::PeerId; use slog::{crit, debug, error, trace, warn}; @@ -12,7 +12,7 @@ use types::SignedBeaconBlock; #[derive(Clone, Debug, PartialEq)] pub enum ProcessId { /// Processing Id of a range syncing batch. - RangeBatchId(BatchId), + RangeBatchId(ChainId, BatchId), /// Processing Id of the parent lookup of a block ParentLookup(PeerId), } @@ -40,7 +40,7 @@ pub fn spawn_block_processor( std::thread::spawn(move || { match process_id { // this a request from the range sync - ProcessId::RangeBatchId(batch_id) => { + ProcessId::RangeBatchId(chain_id, batch_id) => { debug!(log, "Processing batch"; "id" => *batch_id, "blocks" => downloaded_blocks.len()); let result = match process_blocks(chain, downloaded_blocks.iter(), &log) { (_, Ok(_)) => { @@ -59,8 +59,9 @@ pub fn spawn_block_processor( }; let msg = SyncMessage::BatchProcessed { - batch_id: batch_id, - downloaded_blocks: downloaded_blocks, + chain_id, + batch_id, + downloaded_blocks, result, }; sync_send.try_send(msg).unwrap_or_else(|_| { diff --git a/beacon_node/network/src/sync/manager.rs b/beacon_node/network/src/sync/manager.rs index 9e06150e80..db826b54c4 100644 --- a/beacon_node/network/src/sync/manager.rs +++ b/beacon_node/network/src/sync/manager.rs @@ -35,7 +35,7 @@ use super::block_processor::{spawn_block_processor, BatchProcessResult, ProcessId}; use super::network_context::SyncNetworkContext; -use super::range_sync::{BatchId, RangeSync}; +use super::range_sync::{BatchId, ChainId, RangeSync}; use crate::router::processor::PeerSyncInfo; use crate::service::NetworkMessage; use beacon_chain::{BeaconChain, BeaconChainTypes, BlockProcessingOutcome}; @@ -99,6 +99,7 @@ pub enum SyncMessage { /// A batch has been processed by the block processor thread. BatchProcessed { + chain_id: ChainId, batch_id: BatchId, downloaded_blocks: Vec>, result: BatchProcessResult, @@ -731,12 +732,14 @@ impl Future for SyncManager { self.inject_error(peer_id, request_id); } SyncMessage::BatchProcessed { + chain_id, batch_id, downloaded_blocks, result, } => { self.range_sync.handle_block_process_result( &mut self.network, + chain_id, batch_id, downloaded_blocks, result, diff --git a/beacon_node/network/src/sync/range_sync/chain.rs b/beacon_node/network/src/sync/range_sync/chain.rs index 0edb431163..2f19e3673a 100644 --- a/beacon_node/network/src/sync/range_sync/chain.rs +++ b/beacon_node/network/src/sync/range_sync/chain.rs @@ -38,10 +38,16 @@ pub enum ProcessingResult { RemoveChain, } +/// A chain identifier +pub type ChainId = u64; + /// A chain of blocks that need to be downloaded. Peers who claim to contain the target head /// root are grouped into the peer pool and queried for batches when downloading the /// chain. pub struct SyncingChain { + /// A random id used to identify this chain. + id: ChainId, + /// The original start slot when this chain was initialised. pub start_slot: Slot, @@ -99,6 +105,7 @@ pub enum ChainSyncingState { impl SyncingChain { pub fn new( + id: u64, start_slot: Slot, target_head_slot: Slot, target_head_root: Hash256, @@ -111,6 +118,7 @@ impl SyncingChain { peer_pool.insert(peer_id); SyncingChain { + id, start_slot, target_head_slot, target_head_root, @@ -242,11 +250,11 @@ impl SyncingChain { /// Sends a batch to the batch processor. fn process_batch(&mut self, mut batch: Batch) { let downloaded_blocks = std::mem::replace(&mut batch.downloaded_blocks, Vec::new()); - let batch_id = ProcessId::RangeBatchId(batch.id.clone()); + let process_id = ProcessId::RangeBatchId(self.id.clone(), batch.id.clone()); self.current_processing_batch = Some(batch); spawn_block_processor( Arc::downgrade(&self.chain.clone()), - batch_id, + process_id, downloaded_blocks, self.sync_send.clone(), self.log.clone(), @@ -258,26 +266,36 @@ impl SyncingChain { pub fn on_batch_process_result( &mut self, network: &mut SyncNetworkContext, + chain_id: ChainId, batch_id: BatchId, downloaded_blocks: &mut Option>>, result: &BatchProcessResult, ) -> Option { - if let Some(current_batch) = &self.current_processing_batch { - if current_batch.id != batch_id { - // batch process does not belong to this chain + if chain_id != self.id { + // the result does not belong to this chain + return None; + } + match &self.current_processing_batch { + Some(current_batch) if current_batch.id != batch_id => { + debug!(self.log, "Unexpected batch result"; + "chain_id" => self.id, "batch_id" => *batch_id, "expected_batch_id" => *current_batch.id); return None; } - // Continue. This is our processing request - } else { - // not waiting on a processing result - return None; + None => { + debug!(self.log, "Chain was not expecting a batch result"; + "chain_id" => self.id, "batch_id" => *batch_id); + return None; + } + _ => { + // chain_id and batch_id match, continue + } } // claim the result by consuming the option let downloaded_blocks = downloaded_blocks.take().or_else(|| { // if taken by another chain, we are no longer waiting on a result. self.current_processing_batch = None; - crit!(self.log, "Processed batch taken by another chain"); + crit!(self.log, "Processed batch taken by another chain"; "chain_id" => self.id); None })?; @@ -289,6 +307,7 @@ impl SyncingChain { // double check batches are processed in order TODO: Remove for prod if batch.id != self.to_be_processed_id { crit!(self.log, "Batch processed out of order"; + "chain_id" => self.id, "processed_batch_id" => *batch.id, "expected_id" => *self.to_be_processed_id); } @@ -330,7 +349,7 @@ impl SyncingChain { } BatchProcessResult::Partial => { warn!(self.log, "Batch processing failed but at least one block was imported"; - "id" => *batch.id, "peer" => format!("{}", batch.current_peer) + "chain_id" => self.id, "id" => *batch.id, "peer" => format!("{}", batch.current_peer) ); // At least one block was successfully verified and imported, so we can be sure all // previous batches are valid and we only need to download the current failed @@ -343,7 +362,8 @@ impl SyncingChain { // that it is likely all peers in this chain are are sending invalid batches // repeatedly and are either malicious or faulty. We drop the chain and // downvote all peers. - warn!(self.log, "Batch failed to download. Dropping chain and downvoting peers"; "id"=> *batch.id); + warn!(self.log, "Batch failed to download. Dropping chain and downvoting peers"; + "chain_id" => self.id, "id"=> *batch.id); for peer_id in self.peer_pool.drain() { network.downvote_peer(peer_id); } @@ -355,7 +375,8 @@ impl SyncingChain { } } BatchProcessResult::Failed => { - warn!(self.log, "Batch processing failed"; "id" => *batch.id, "peer" => format!("{}", batch.current_peer)); + warn!(self.log, "Batch processing failed"; + "chain_id" => self.id,"id" => *batch.id, "peer" => format!("{}", batch.current_peer)); // The batch processing failed // This could be because this batch is invalid, or a previous invalidated batch // is invalid. We need to find out which and downvote the peer that has sent us @@ -367,7 +388,8 @@ impl SyncingChain { // that it is likely all peers in this chain are are sending invalid batches // repeatedly and are either malicious or faulty. We drop the chain and // downvote all peers. - warn!(self.log, "Batch failed to download. Dropping chain and downvoting peers"; "id"=> *batch.id); + warn!(self.log, "Batch failed to download. Dropping chain and downvoting peers"; + "chain_id" => self.id, "id"=> *batch.id); for peer_id in self.peer_pool.drain() { network.downvote_peer(peer_id); } @@ -399,8 +421,9 @@ impl SyncingChain { let processed_batch = self.processed_batches.remove(0); if *processed_batch.id >= *last_batch.id { crit!(self.log, "A processed batch had a greater id than the current process id"; - "processed_id" => *processed_batch.id, - "current_id" => *last_batch.id); + "chain_id" => self.id, + "processed_id" => *processed_batch.id, + "current_id" => *last_batch.id); } if let Some(prev_hash) = processed_batch.original_hash { @@ -415,9 +438,10 @@ impl SyncingChain { // now. debug!( self.log, "Re-processed batch validated. Downvoting original peer"; - "batch_id" => *processed_batch.id, - "original_peer" => format!("{}",processed_batch.original_peer), - "new_peer" => format!("{}", processed_batch.current_peer) + "chain_id" => self.id, + "batch_id" => *processed_batch.id, + "original_peer" => format!("{}",processed_batch.original_peer), + "new_peer" => format!("{}", processed_batch.current_peer) ); network.downvote_peer(processed_batch.original_peer); } @@ -494,6 +518,7 @@ impl SyncingChain { batch.current_peer = new_peer.clone(); debug!(self.log, "Re-requesting batch"; + "chain_id" => self.id, "start_slot" => batch.start_slot, "end_slot" => batch.end_slot, "id" => *batch.id, @@ -527,6 +552,7 @@ impl SyncingChain { if local_finalized_slot > self.current_processed_slot() { debug!(self.log, "Updating chain's progress"; + "chain_id" => self.id, "prev_completed_slot" => self.current_processed_slot(), "new_completed_slot" => local_finalized_slot.as_u64()); // Re-index batches @@ -554,7 +580,8 @@ impl SyncingChain { self.peer_pool.insert(peer_id.clone()); // do not request blocks if the chain is not syncing if let ChainSyncingState::Stopped = self.state { - debug!(self.log, "Peer added to a non-syncing chain"; "peer_id" => format!("{}", peer_id)); + debug!(self.log, "Peer added to a non-syncing chain"; + "chain_id" => self.id, "peer_id" => format!("{}", peer_id)); return; } @@ -583,6 +610,7 @@ impl SyncingChain { ) -> Option { if let Some(batch) = self.pending_batches.remove(request_id) { warn!(self.log, "Batch failed. RPC Error"; + "chain_id" => self.id, "id" => *batch.id, "retries" => batch.retries, "peer" => format!("{:?}", peer_id)); @@ -623,6 +651,7 @@ impl SyncingChain { batch.current_peer = new_peer.clone(); debug!(self.log, "Re-Requesting batch"; + "chain_id" => self.id, "start_slot" => batch.start_slot, "end_slot" => batch.end_slot, "id" => *batch.id, @@ -647,6 +676,7 @@ impl SyncingChain { if let Some(peer_id) = self.get_next_peer() { if let Some(batch) = self.get_next_batch(peer_id) { debug!(self.log, "Requesting batch"; + "chain_id" => self.id, "start_slot" => batch.start_slot, "end_slot" => batch.end_slot, "id" => *batch.id, diff --git a/beacon_node/network/src/sync/range_sync/chain_collection.rs b/beacon_node/network/src/sync/range_sync/chain_collection.rs index 7ce4f0552e..4fed36ae9a 100644 --- a/beacon_node/network/src/sync/range_sync/chain_collection.rs +++ b/beacon_node/network/src/sync/range_sync/chain_collection.rs @@ -305,7 +305,9 @@ impl ChainCollection { peer_id: PeerId, sync_send: mpsc::UnboundedSender>, ) { + let chain_id = rand::random(); self.finalized_chains.push(SyncingChain::new( + chain_id, local_finalized_slot, target_slot, target_head, @@ -334,7 +336,9 @@ impl ChainCollection { }); self.head_chains.retain(|chain| !chain.peer_pool.is_empty()); + let chain_id = rand::random(); let mut new_head_chain = SyncingChain::new( + chain_id, remote_finalized_slot, target_slot, target_head, diff --git a/beacon_node/network/src/sync/range_sync/mod.rs b/beacon_node/network/src/sync/range_sync/mod.rs index 5d7b17c07a..069fe712bf 100644 --- a/beacon_node/network/src/sync/range_sync/mod.rs +++ b/beacon_node/network/src/sync/range_sync/mod.rs @@ -8,4 +8,5 @@ mod range; pub use batch::Batch; pub use batch::BatchId; +pub use chain::ChainId; pub use range::RangeSync; diff --git a/beacon_node/network/src/sync/range_sync/range.rs b/beacon_node/network/src/sync/range_sync/range.rs index 1288e4e96c..0525909ef6 100644 --- a/beacon_node/network/src/sync/range_sync/range.rs +++ b/beacon_node/network/src/sync/range_sync/range.rs @@ -39,7 +39,7 @@ //! Each chain is downloaded in batches of blocks. The batched blocks are processed sequentially //! and further batches are requested as current blocks are being processed. -use super::chain::ProcessingResult; +use super::chain::{ChainId, ProcessingResult}; use super::chain_collection::{ChainCollection, RangeSyncState}; use super::BatchId; use crate::router::processor::PeerSyncInfo; @@ -252,6 +252,7 @@ impl RangeSync { pub fn handle_block_process_result( &mut self, network: &mut SyncNetworkContext, + chain_id: ChainId, batch_id: BatchId, downloaded_blocks: Vec>, result: BatchProcessResult, @@ -260,7 +261,13 @@ impl RangeSync { let mut downloaded_blocks = Some(downloaded_blocks); match self.chains.finalized_request(|chain| { - chain.on_batch_process_result(network, batch_id, &mut downloaded_blocks, &result) + chain.on_batch_process_result( + network, + chain_id, + batch_id, + &mut downloaded_blocks, + &result, + ) }) { Some((index, ProcessingResult::RemoveChain)) => { let chain = self.chains.remove_finalized_chain(index); @@ -291,6 +298,7 @@ impl RangeSync { match self.chains.head_request(|chain| { chain.on_batch_process_result( network, + chain_id, batch_id, &mut downloaded_blocks, &result,