diff --git a/beacon_node/network/src/sync/range_sync/batch_processing.rs b/beacon_node/network/src/sync/block_processor.rs similarity index 66% rename from beacon_node/network/src/sync/range_sync/batch_processing.rs rename to beacon_node/network/src/sync/block_processor.rs index 484bab9bac..cfb82eb8fb 100644 --- a/beacon_node/network/src/sync/range_sync/batch_processing.rs +++ b/beacon_node/network/src/sync/block_processor.rs @@ -1,12 +1,23 @@ -use super::batch::Batch; use crate::message_processor::FUTURE_SLOT_TOLERANCE; use crate::sync::manager::SyncMessage; +use crate::sync::range_sync::BatchId; use beacon_chain::{BeaconChain, BeaconChainTypes, BlockProcessingOutcome}; +use eth2_libp2p::PeerId; use slog::{debug, error, trace, warn}; use std::sync::{Arc, Weak}; use tokio::sync::mpsc; +use types::SignedBeaconBlock; -/// The result of attempting to process a batch of blocks. +/// Id associated to a block processing request, either a batch or a single block. +#[derive(Clone, Debug, PartialEq)] +pub enum ProcessId { + /// Processing Id of a range syncing batch. + RangeBatchId(BatchId), + /// Processing Id of the parent lookup of a block + ParentLookup(PeerId), +} + +/// The result of a block processing request. // TODO: When correct batch error handling occurs, we will include an error type. #[derive(Debug)] pub enum BatchProcessResult { @@ -16,46 +27,81 @@ pub enum BatchProcessResult { Failed, } -// TODO: Refactor to async fn, with stable futures -pub fn spawn_batch_processor( +/// Spawns a thread handling the block processing of a request: range syncing or parent lookup. +pub fn spawn_block_processor( chain: Weak>, - process_id: u64, - batch: Batch, + process_id: ProcessId, + downloaded_blocks: Vec>, mut sync_send: mpsc::UnboundedSender>, log: slog::Logger, ) { std::thread::spawn(move || { - debug!(log, "Processing batch"; "id" => *batch.id); - let result = match process_batch(chain, &batch, &log) { - Ok(_) => BatchProcessResult::Success, - Err(_) => BatchProcessResult::Failed, - }; + match process_id { + // this a request from the range sync + ProcessId::RangeBatchId(batch_id) => { + debug!(log, "Processing batch"; "id" => *batch_id, "blocks" => downloaded_blocks.len()); + let result = match process_blocks(chain, downloaded_blocks.iter(), &log) { + Ok(_) => { + debug!(log, "Batch processed"; "id" => *batch_id ); + BatchProcessResult::Success + } + Err(e) => { + debug!(log, "Batch processing failed"; "id" => *batch_id, "error" => e); + BatchProcessResult::Failed + } + }; - debug!(log, "Batch processed"; "id" => *batch.id, "result" => format!("{:?}", result)); - - sync_send - .try_send(SyncMessage::BatchProcessed { - process_id, - batch: Box::new(batch), - result, - }) - .unwrap_or_else(|_| { - debug!( - log, - "Batch result could not inform sync. Likely shutting down." - ); - }); + let msg = SyncMessage::BatchProcessed { + batch_id: batch_id, + downloaded_blocks: downloaded_blocks, + result, + }; + sync_send.try_send(msg).unwrap_or_else(|_| { + debug!( + log, + "Block processor could not inform range sync result. Likely shutting down." + ); + }); + } + // this a parent lookup request from the sync manager + ProcessId::ParentLookup(peer_id) => { + debug!(log, "Processing parent lookup"; "last_peer_id" => format!("{}", peer_id), "blocks" => downloaded_blocks.len()); + // parent blocks are ordered from highest slot to lowest, so we need to process in + // reverse + match process_blocks(chain, downloaded_blocks.iter().rev(), &log) { + Err(e) => { + warn!(log, "Parent lookup failed"; "last_peer_id" => format!("{}", peer_id), "error" => e); + sync_send + .try_send(SyncMessage::ParentLookupFailed(peer_id)) + .unwrap_or_else(|_| { + // on failure, inform to downvote the peer + debug!( + log, + "Block processor could not inform parent lookup result. Likely shutting down." + ); + }); + } + Ok(_) => { + debug!(log, "Parent lookup processed successfully"); + } + } + } + } }); } -// Helper function to process block batches which only consumes the chain and blocks to process -fn process_batch( +/// Helper function to process blocks batches which only consumes the chain and blocks to process. +fn process_blocks< + 'a, + T: BeaconChainTypes, + I: Iterator>, +>( chain: Weak>, - batch: &Batch, + downloaded_blocks: I, log: &slog::Logger, ) -> Result<(), String> { let mut successful_block_import = false; - for block in &batch.downloaded_blocks { + for block in downloaded_blocks { if let Some(chain) = chain.upgrade() { let processing_result = chain.process_block(block.clone()); @@ -72,6 +118,7 @@ fn process_batch( } BlockProcessingOutcome::ParentUnknown { parent, .. } => { // blocks should be sequential and all parents should exist + // this is a failure if blocks do not have parents warn!( log, "Parent block is unknown"; "parent_root" => format!("{}", parent), diff --git a/beacon_node/network/src/sync/manager.rs b/beacon_node/network/src/sync/manager.rs index ae542b324c..9fac59497b 100644 --- a/beacon_node/network/src/sync/manager.rs +++ b/beacon_node/network/src/sync/manager.rs @@ -33,8 +33,9 @@ //! if an attestation references an unknown block) this manager can search for the block and //! subsequently search for parents if needed. +use super::block_processor::{spawn_block_processor, BatchProcessResult, ProcessId}; use super::network_context::SyncNetworkContext; -use super::range_sync::{Batch, BatchProcessResult, RangeSync}; +use super::range_sync::{BatchId, RangeSync}; use crate::message_processor::PeerSyncInfo; use crate::service::NetworkMessage; use beacon_chain::{BeaconChain, BeaconChainTypes, BlockProcessingOutcome}; @@ -99,10 +100,13 @@ pub enum SyncMessage { /// A batch has been processed by the block processor thread. BatchProcessed { - process_id: u64, - batch: Box>, + batch_id: BatchId, + downloaded_blocks: Vec>, result: BatchProcessResult, }, + + /// A parent lookup has failed for a block given by this `peer_id`. + ParentLookupFailed(PeerId), } /// Maintains a sequential list of parents to lookup and the lookup's current state. @@ -172,6 +176,9 @@ pub struct SyncManager { /// The logger for the import manager. log: Logger, + + /// The sending part of input_channel + sync_send: mpsc::UnboundedSender>, } /// Spawns a new `SyncManager` thread which has a weak reference to underlying beacon @@ -202,6 +209,7 @@ pub fn spawn( single_block_lookups: FnvHashMap::default(), full_peers: HashSet::new(), log: log.clone(), + sync_send: sync_send.clone(), }; // spawn the sync manager thread @@ -590,8 +598,6 @@ impl SyncManager { // If the last block in the queue has an unknown parent, we continue the parent // lookup-search. - let total_blocks_to_process = parent_request.downloaded_blocks.len(); - if let Some(chain) = self.chain.upgrade() { let newest_block = parent_request .downloaded_blocks @@ -606,7 +612,15 @@ impl SyncManager { return; } Ok(BlockProcessingOutcome::Processed { .. }) - | Ok(BlockProcessingOutcome::BlockIsAlreadyKnown { .. }) => {} + | Ok(BlockProcessingOutcome::BlockIsAlreadyKnown { .. }) => { + spawn_block_processor( + self.chain.clone(), + ProcessId::ParentLookup(parent_request.last_submitted_peer.clone()), + parent_request.downloaded_blocks, + self.sync_send.clone(), + self.log.clone(), + ); + } Ok(outcome) => { // all else we consider the chain a failure and downvote the peer that sent // us the last block @@ -634,64 +648,6 @@ impl SyncManager { // chain doesn't exist, drop the parent queue and return return; } - - //TODO: Shift this to a block processing thread - - // the last received block has been successfully processed, process all other blocks in the - // chain - while let Some(block) = parent_request.downloaded_blocks.pop() { - // check if the chain exists - if let Some(chain) = self.chain.upgrade() { - match chain.process_block(block) { - Ok(BlockProcessingOutcome::Processed { .. }) - | Ok(BlockProcessingOutcome::BlockIsAlreadyKnown { .. }) => {} // continue to the next block - - // all else is considered a failure - Ok(outcome) => { - // the previous blocks have failed, notify the user the chain lookup has - // failed and drop the parent queue - debug!( - self.log, "Invalid parent chain. Past blocks failure"; - "outcome" => format!("{:?}", outcome), - "peer" => format!("{:?}", parent_request.last_submitted_peer), - ); - self.network - .downvote_peer(parent_request.last_submitted_peer.clone()); - break; - } - Err(e) => { - warn!( - self.log, "Parent chain processing error."; - "error" => format!("{:?}", e) - ); - self.network - .downvote_peer(parent_request.last_submitted_peer.clone()); - break; - } - } - } else { - // chain doesn't exist, end the processing - break; - } - } - - // at least one block has been processed, run fork-choice - if let Some(chain) = self.chain.upgrade() { - match chain.fork_choice() { - Ok(()) => trace!( - self.log, - "Fork choice success"; - "block_imports" => total_blocks_to_process - parent_request.downloaded_blocks.len(), - "location" => "parent request" - ), - Err(e) => error!( - self.log, - "Fork choice failed"; - "error" => format!("{:?}", e), - "location" => "parent request" - ), - }; - } } } @@ -782,17 +738,20 @@ impl Future for SyncManager { self.inject_error(peer_id, request_id); } SyncMessage::BatchProcessed { - process_id, - batch, + batch_id, + downloaded_blocks, result, } => { self.range_sync.handle_block_process_result( &mut self.network, - process_id, - *batch, + batch_id, + downloaded_blocks, result, ); } + SyncMessage::ParentLookupFailed(peer_id) => { + self.network.downvote_peer(peer_id); + } }, Ok(Async::NotReady) => break, Ok(Async::Ready(None)) => { diff --git a/beacon_node/network/src/sync/mod.rs b/beacon_node/network/src/sync/mod.rs index e9bc70e557..57d9ee393a 100644 --- a/beacon_node/network/src/sync/mod.rs +++ b/beacon_node/network/src/sync/mod.rs @@ -1,6 +1,7 @@ //! Syncing for lighthouse. //! //! Stores the various syncing methods for the beacon chain. +mod block_processor; pub mod manager; mod network_context; mod range_sync; diff --git a/beacon_node/network/src/sync/range_sync/chain.rs b/beacon_node/network/src/sync/range_sync/chain.rs index e548134f71..88061378a7 100644 --- a/beacon_node/network/src/sync/range_sync/chain.rs +++ b/beacon_node/network/src/sync/range_sync/chain.rs @@ -1,5 +1,5 @@ use super::batch::{Batch, BatchId, PendingBatches}; -use super::batch_processing::{spawn_batch_processor, BatchProcessResult}; +use crate::sync::block_processor::{spawn_block_processor, BatchProcessResult, ProcessId}; use crate::sync::network_context::SyncNetworkContext; use crate::sync::SyncMessage; use beacon_chain::{BeaconChain, BeaconChainTypes}; @@ -76,7 +76,7 @@ pub struct SyncingChain { /// A random id given to a batch process request. This is None if there is no ongoing batch /// process. - current_processing_id: Option, + current_processing_batch: Option>, /// A send channel to the sync manager. This is given to the batch processor thread to report /// back once batch processing has completed. @@ -120,7 +120,7 @@ impl SyncingChain { to_be_downloaded_id: BatchId(1), to_be_processed_id: BatchId(1), state: ChainSyncingState::Stopped, - current_processing_id: None, + current_processing_batch: None, sync_send, chain, log, @@ -167,15 +167,16 @@ impl SyncingChain { // An entire batch of blocks has been received. This functions checks to see if it can be processed, // remove any batches waiting to be verified and if this chain is syncing, request new // blocks for the peer. - debug!(self.log, "Completed batch received"; "id"=> *batch.id, "blocks"=>batch.downloaded_blocks.len(), "awaiting_batches" => self.completed_batches.len()); + debug!(self.log, "Completed batch received"; "id"=> *batch.id, "blocks" => &batch.downloaded_blocks.len(), "awaiting_batches" => self.completed_batches.len()); // verify the range of received blocks // Note that the order of blocks is verified in block processing if let Some(last_slot) = batch.downloaded_blocks.last().map(|b| b.slot()) { // the batch is non-empty - if batch.start_slot > batch.downloaded_blocks[0].slot() || batch.end_slot < last_slot { + let first_slot = batch.downloaded_blocks[0].slot(); + if batch.start_slot > first_slot || batch.end_slot < last_slot { warn!(self.log, "BlocksByRange response returned out of range blocks"; - "response_initial_slot" => batch.downloaded_blocks[0].slot(), + "response_initial_slot" => first_slot, "requested_initial_slot" => batch.start_slot); network.downvote_peer(batch.current_peer); self.to_be_processed_id = batch.id; // reset the id back to here, when incrementing, it will check against completed batches @@ -218,7 +219,7 @@ impl SyncingChain { } // Only process one batch at a time - if self.current_processing_id.is_some() { + if self.current_processing_batch.is_some() { return; } @@ -238,14 +239,14 @@ impl SyncingChain { } /// Sends a batch to the batch processor. - fn process_batch(&mut self, batch: Batch) { - // only spawn one instance at a time - let processing_id: u64 = rand::random(); - self.current_processing_id = Some(processing_id); - spawn_batch_processor( + fn process_batch(&mut self, mut batch: Batch) { + let downloaded_blocks = std::mem::replace(&mut batch.downloaded_blocks, Vec::new()); + let batch_id = ProcessId::RangeBatchId(batch.id.clone()); + self.current_processing_batch = Some(batch); + spawn_block_processor( self.chain.clone(), - processing_id, - batch, + batch_id, + downloaded_blocks, self.sync_send.clone(), self.log.clone(), ); @@ -256,30 +257,41 @@ impl SyncingChain { pub fn on_batch_process_result( &mut self, network: &mut SyncNetworkContext, - processing_id: u64, - batch: &mut Option>, + batch_id: BatchId, + downloaded_blocks: &mut Option>>, result: &BatchProcessResult, ) -> Option { - if Some(processing_id) != self.current_processing_id { - // batch process doesn't belong to this chain + if let Some(current_batch) = &self.current_processing_batch { + if current_batch.id != batch_id { + // batch process does not belong to this chain + return None; + } + // Continue. This is our processing request + } else { + // not waiting on a processing result return None; } - // Consume the batch option - let batch = batch.take().or_else(|| { + // claim the result by consuming the option + let downloaded_blocks = downloaded_blocks.take().or_else(|| { + // if taken by another chain, we are no longer waiting on a result. + self.current_processing_batch = None; crit!(self.log, "Processed batch taken by another chain"); None })?; + // No longer waiting on a processing result + let mut batch = self.current_processing_batch.take().unwrap(); + // These are the blocks of this batch + batch.downloaded_blocks = downloaded_blocks; + // double check batches are processed in order TODO: Remove for prod if batch.id != self.to_be_processed_id { crit!(self.log, "Batch processed out of order"; - "processed_batch_id" => *batch.id, - "expected_id" => *self.to_be_processed_id); + "processed_batch_id" => *batch.id, + "expected_id" => *self.to_be_processed_id); } - self.current_processing_id = None; - let res = match result { BatchProcessResult::Success => { *self.to_be_processed_id += 1; diff --git a/beacon_node/network/src/sync/range_sync/mod.rs b/beacon_node/network/src/sync/range_sync/mod.rs index 28cff24e31..5d7b17c07a 100644 --- a/beacon_node/network/src/sync/range_sync/mod.rs +++ b/beacon_node/network/src/sync/range_sync/mod.rs @@ -2,11 +2,10 @@ //! peers. mod batch; -mod batch_processing; mod chain; mod chain_collection; mod range; pub use batch::Batch; -pub use batch_processing::BatchProcessResult; +pub use batch::BatchId; pub use range::RangeSync; diff --git a/beacon_node/network/src/sync/range_sync/range.rs b/beacon_node/network/src/sync/range_sync/range.rs index 59c6ff598d..d09ef4e25c 100644 --- a/beacon_node/network/src/sync/range_sync/range.rs +++ b/beacon_node/network/src/sync/range_sync/range.rs @@ -41,8 +41,9 @@ use super::chain::ProcessingResult; use super::chain_collection::{ChainCollection, SyncState}; -use super::{Batch, BatchProcessResult}; +use super::BatchId; use crate::message_processor::PeerSyncInfo; +use crate::sync::block_processor::BatchProcessResult; use crate::sync::manager::SyncMessage; use crate::sync::network_context::SyncNetworkContext; use beacon_chain::{BeaconChain, BeaconChainTypes}; @@ -130,8 +131,8 @@ impl RangeSync { }, None => { return warn!(self.log, - "Beacon chain dropped. Peer not considered for sync"; - "peer_id" => format!("{:?}", peer_id)); + "Beacon chain dropped. Peer not considered for sync"; + "peer_id" => format!("{:?}", peer_id)); } }; @@ -256,15 +257,15 @@ impl RangeSync { pub fn handle_block_process_result( &mut self, network: &mut SyncNetworkContext, - processing_id: u64, - batch: Batch, + batch_id: BatchId, + downloaded_blocks: Vec>, result: BatchProcessResult, ) { - // build an option for passing the batch to each chain - let mut batch = Some(batch); + // build an option for passing the downloaded_blocks to each chain + let mut downloaded_blocks = Some(downloaded_blocks); match self.chains.finalized_request(|chain| { - chain.on_batch_process_result(network, processing_id, &mut batch, &result) + chain.on_batch_process_result(network, batch_id, &mut downloaded_blocks, &result) }) { Some((index, ProcessingResult::RemoveChain)) => { let chain = self.chains.remove_finalized_chain(index); @@ -293,7 +294,12 @@ impl RangeSync { Some((_, ProcessingResult::KeepChain)) => {} None => { match self.chains.head_request(|chain| { - chain.on_batch_process_result(network, processing_id, &mut batch, &result) + chain.on_batch_process_result( + network, + batch_id, + &mut downloaded_blocks, + &result, + ) }) { Some((index, ProcessingResult::RemoveChain)) => { let chain = self.chains.remove_head_chain(index); @@ -308,7 +314,7 @@ impl RangeSync { None => { // This can happen if a chain gets purged due to being out of date whilst a // batch process is in progress. - debug!(self.log, "No chains match the block processing id"; "id" => processing_id); + debug!(self.log, "No chains match the block processing id"; "id" => *batch_id); } } }