From cd7b6da88ed52a604e9ce3cf1d5985aff4f94a13 Mon Sep 17 00:00:00 2001 From: Age Manning Date: Tue, 3 Sep 2019 00:34:41 +1000 Subject: [PATCH] Updates syncing, corrects CLI variables --- beacon_node/beacon_chain/src/beacon_chain.rs | 11 +- beacon_node/eth2-libp2p/src/discovery.rs | 10 +- beacon_node/eth2-libp2p/src/service.rs | 19 +- beacon_node/network/src/sync/manager.rs | 616 +++++++++++-------- beacon_node/network/src/sync/simple_sync.rs | 5 +- beacon_node/src/main.rs | 13 +- 6 files changed, 374 insertions(+), 300 deletions(-) diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index 6380d03b3e..a142816aed 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -442,6 +442,15 @@ impl BeaconChain { None } + /// Returns the block canonical root of the current canonical chain at a given slot. + /// + /// Returns None if a block doesn't exist at the slot. + pub fn root_at_slot(&self, target_slot: Slot) -> Option { + self.rev_iter_block_roots() + .find(|(_root, slot)| *slot == target_slot) + .map(|(root, _slot)| root) + } + /// Reads the slot clock (see `self.read_slot_clock()` and returns the number of slots since /// genesis. pub fn slots_since_genesis(&self) -> Option { @@ -1006,7 +1015,7 @@ impl BeaconChain { }; // Load the parent blocks state from the database, returning an error if it is not found. - // It is an error because if know the parent block we should also know the parent state. + // It is an error because if we know the parent block we should also know the parent state. let parent_state_root = parent_block.state_root; let parent_state = self .store diff --git a/beacon_node/eth2-libp2p/src/discovery.rs b/beacon_node/eth2-libp2p/src/discovery.rs index 6c80a85968..4a8aba2b1b 100644 --- a/beacon_node/eth2-libp2p/src/discovery.rs +++ b/beacon_node/eth2-libp2p/src/discovery.rs @@ -341,13 +341,9 @@ fn save_enr_to_disc(dir: &Path, enr: &Enr, log: &slog::Logger) { } Err(e) => { warn!( - log, - <<<<<<< HEAD - "Could not write ENR to file"; "file" => format!("{:?}{:?}",dir, ENR_FILENAME), "error" => format!("{}", e) - ======= - "Could not write ENR to file"; "File" => format!("{:?}{:?}",dir, ENR_FILENAME), "Error" => format!("{}", e) - >>>>>>> interop - ); + log, + "Could not write ENR to file"; "file" => format!("{:?}{:?}",dir, ENR_FILENAME), "error" => format!("{}", e) + ); } } } diff --git a/beacon_node/eth2-libp2p/src/service.rs b/beacon_node/eth2-libp2p/src/service.rs index 589106f48e..1ea1723b68 100644 --- a/beacon_node/eth2-libp2p/src/service.rs +++ b/beacon_node/eth2-libp2p/src/service.rs @@ -82,17 +82,10 @@ impl Service { // attempt to connect to user-input libp2p nodes for multiaddr in config.libp2p_nodes { match Swarm::dial_addr(&mut swarm, multiaddr.clone()) { -<<<<<<< HEAD Ok(()) => debug!(log, "Dialing libp2p peer"; "address" => format!("{}", multiaddr)), Err(err) => debug!( log, - "Could not connect to peer"; "address" => format!("{}", multiaddr), "Error" => format!("{:?}", err) -======= - Ok(()) => debug!(log, "Dialing libp2p peer"; "Address" => format!("{}", multiaddr)), - Err(err) => debug!( - log, - "Could not connect to peer"; "Address" => format!("{}", multiaddr), "Error" => format!("{:?}", err) ->>>>>>> interop + "Could not connect to peer"; "address" => format!("{}", multiaddr), "error" => format!("{:?}", err) ), }; } @@ -129,7 +122,6 @@ impl Service { let mut subscribed_topics = vec![]; for topic in topics { if swarm.subscribe(topic.clone()) { -<<<<<<< HEAD trace!(log, "Subscribed to topic"; "topic" => format!("{}", topic)); subscribed_topics.push(topic); } else { @@ -137,15 +129,6 @@ impl Service { } } info!(log, "Subscribed to topics"; "topics" => format!("{:?}", subscribed_topics.iter().map(|t| format!("{}", t)).collect::>())); -======= - trace!(log, "Subscribed to topic"; "Topic" => format!("{}", topic)); - subscribed_topics.push(topic); - } else { - warn!(log, "Could not subscribe to topic"; "Topic" => format!("{}", topic)); - } - } - info!(log, "Subscribed to topics"; "Topics" => format!("{:?}", subscribed_topics.iter().map(|t| format!("{}", t)).collect::>())); ->>>>>>> interop Ok(Service { local_peer_id, diff --git a/beacon_node/network/src/sync/manager.rs b/beacon_node/network/src/sync/manager.rs index 9b2d780f45..a48b43ad7b 100644 --- a/beacon_node/network/src/sync/manager.rs +++ b/beacon_node/network/src/sync/manager.rs @@ -80,6 +80,9 @@ const PARENT_FAIL_TOLERANCE: usize = 3; /// canonical chain to its head once the peer connects. A chain should not appear where it's depth /// is further back than the most recent head slot. const PARENT_DEPTH_TOLERANCE: usize = SLOT_IMPORT_TOLERANCE * 2; +/// The number of empty batches we tolerate before dropping the peer. This prevents endless +/// requests to peers who never return blocks. +const EMPTY_BATCH_TOLERANCE: usize = 100; #[derive(PartialEq)] /// The current state of a block or batches lookup. @@ -95,6 +98,19 @@ enum BlockRequestsState { Failed, } +/// The state of batch requests. +enum SyncDirection { + /// The batch has just been initialised and we need to check to see if a backward sync is + /// required on first batch response. + Initial, + /// We are syncing forwards, the next batch should contain higher slot numbers than is + /// predecessor. + Forwards, + /// We are syncing backwards and looking for a common ancestor chain before we can start + /// processing the downloaded blocks. + Backwards, +} + /// `BlockRequests` keep track of the long-range (batch) sync process per peer. struct BlockRequests { /// The peer's head slot and the target of this batch download. @@ -104,10 +120,13 @@ struct BlockRequests { target_head_root: Hash256, /// The blocks that we have currently downloaded from the peer that are yet to be processed. downloaded_blocks: Vec>, + /// The number of empty batches we have consecutively received. If a peer returns more than + /// EMPTY_BATCHES_TOLERANCE, they are dropped. + consecutive_empty_batches: usize, /// The current state of this batch request. state: BlockRequestsState, - /// Specifies whether the current state is syncing forwards or backwards. - forward_sync: bool, + /// Specifies the current direction of this batch request. + sync_direction: SyncDirection, /// The current `start_slot` of the batched block request. current_start_slot: Slot, } @@ -129,10 +148,13 @@ struct ParentRequests { impl BlockRequests { /// Gets the next start slot for a batch and transitions the state to a Queued state. fn update_start_slot(&mut self) { - if self.forward_sync { - self.current_start_slot += Slot::from(MAX_BLOCKS_PER_REQUEST); - } else { - self.current_start_slot -= Slot::from(MAX_BLOCKS_PER_REQUEST); + match self.sync_direction { + SyncDirection::Initial | SyncDirection::Forwards => { + self.current_start_slot += Slot::from(MAX_BLOCKS_PER_REQUEST); + } + SyncDirection::Backwards => { + self.current_start_slot -= Slot::from(MAX_BLOCKS_PER_REQUEST); + } } self.state = BlockRequestsState::Queued; } @@ -175,6 +197,8 @@ pub(crate) enum ImportManagerOutcome { /// controls the logic behind both the long-range (batch) sync and the on-going potential parent /// look-up of blocks. pub struct ImportManager { + /// List of events to be processed externally. + event_queue: SmallVec<[ImportManagerOutcome; 20]>, /// A weak reference to the underlying beacon chain. chain: Weak>, /// The current state of the import manager. @@ -200,6 +224,7 @@ impl ImportManager { /// dropped during the syncing process. The syncing handles this termination gracefully. pub fn new(beacon_chain: Arc>, log: &slog::Logger) -> Self { ImportManager { + event_queue: SmallVec::new(), chain: Arc::downgrade(&beacon_chain), state: ManagerState::Regular, import_queue: HashMap::new(), @@ -253,7 +278,7 @@ impl ImportManager { // Check if the peer is significantly is behind us. If within `SLOT_IMPORT_TOLERANCE` // treat them as a fully synced peer. If not, ignore them in the sync process if local.head_slot.sub(remote.head_slot).as_usize() < SLOT_IMPORT_TOLERANCE { - self.add_full_peer(peer_id); + self.add_full_peer(peer_id.clone()); } else { debug!( self.log, @@ -275,9 +300,10 @@ impl ImportManager { let block_requests = BlockRequests { target_head_slot: remote.head_slot, // this should be larger than the current head. It is checked in the SyncManager before add_peer is called target_head_root: remote.head_root, + consecutive_empty_batches: 0, downloaded_blocks: Vec::new(), state: BlockRequestsState::Queued, - forward_sync: true, + sync_direction: SyncDirection::Initial, current_start_slot: chain.best_slot(), }; self.import_queue.insert(peer_id, block_requests); @@ -291,6 +317,16 @@ impl ImportManager { request_id: RequestId, mut blocks: Vec>, ) { + // ensure the underlying chain still exists + let chain = match self.chain.upgrade() { + Some(chain) => chain, + None => { + debug!(self.log, "Chain dropped. Sync terminating"); + self.event_queue.clear(); + return; + } + }; + // find the request associated with this response let block_requests = match self .import_queue @@ -315,10 +351,19 @@ impl ImportManager { if blocks.is_empty() { debug!(self.log, "BeaconBlocks response was empty"; "request_id" => request_id); - block_requests.update_start_slot(); + block_requests.consecutive_empty_batches += 1; + if block_requests.consecutive_empty_batches >= EMPTY_BATCH_TOLERANCE { + warn!(self.log, "Peer returned too many empty block batches"; + "peer" => format!("{:?}", peer_id)); + block_requests.state = BlockRequestsState::Failed; + } else { + block_requests.update_start_slot(); + } return; } + block_requests.consecutive_empty_batches = 0; + // verify the range of received blocks // Note that the order of blocks is verified in block processing let last_sent_slot = blocks[blocks.len() - 1].slot; @@ -328,83 +373,90 @@ impl ImportManager { .add(MAX_BLOCKS_PER_REQUEST) < last_sent_slot { - //TODO: Downvote peer - add a reason to failed - dbg!(&blocks); warn!(self.log, "BeaconBlocks response returned out of range blocks"; "request_id" => request_id, "response_initial_slot" => blocks[0].slot, "requested_initial_slot" => block_requests.current_start_slot); + self.event_queue + .push(ImportManagerOutcome::DownvotePeer(peer_id)); // consider this sync failed block_requests.state = BlockRequestsState::Failed; return; } // Determine if more blocks need to be downloaded. There are a few cases: - // - We have downloaded a batch from our head_slot, which has not reached the remotes head - // (target head). Therefore we need to download another sequential batch. - // - The latest batch includes blocks that greater than or equal to the target_head slot, - // which means we have caught up to their head. We then check to see if the first - // block downloaded matches our head. If so, we are on the same chain and can process - // the blocks. If not we need to sync back further until we are on the same chain. So - // request more blocks. - // - We are syncing backwards (from our head slot) and need to check if we are on the same - // chain. If so, process the blocks, if not, request more blocks all the way up to - // our last finalized slot. + // - We are in initial sync mode - We have requested blocks and need to determine if this + // is part of a known chain to determine the whether to start syncing backwards or continue + // syncing forwards. + // - We are syncing backwards and need to verify if we have found a common ancestor in + // order to start processing the downloaded blocks. + // - We are syncing forwards. We mark this as complete and check if any further blocks are + // required to download when processing the batch. - if block_requests.forward_sync { - // append blocks if syncing forward - block_requests.downloaded_blocks.append(&mut blocks); - } else { - // prepend blocks if syncing backwards - block_requests.downloaded_blocks.splice(..0, blocks); - } + match block_requests.sync_direction { + SyncDirection::Initial => { + block_requests.downloaded_blocks.append(&mut blocks); - // does the batch contain the target_head_slot - let last_element_index = block_requests.downloaded_blocks.len() - 1; - if block_requests.downloaded_blocks[last_element_index].slot - >= block_requests.target_head_slot - || !block_requests.forward_sync - { - // if the batch is on our chain, this is complete and we can then process. - // Otherwise start backwards syncing until we reach a common chain. - let earliest_slot = block_requests.downloaded_blocks[0].slot; - //TODO: Decide which is faster. Reading block from db and comparing or calculating - //the hash tree root and comparing. - if Some(block_requests.downloaded_blocks[0].canonical_root()) - == root_at_slot(&self.chain, earliest_slot) - { - block_requests.state = BlockRequestsState::Complete; - return; + // this batch is the first batch downloaded. Check if we can process or if we need + // to backwards search. + + //TODO: Decide which is faster. Reading block from db and comparing or calculating + //the hash tree root and comparing. + let earliest_slot = block_requests.downloaded_blocks[0].slot; + if Some(block_requests.downloaded_blocks[0].canonical_root()) + == chain.root_at_slot(earliest_slot) + { + // we have a common head, start processing and begin a forwards sync + block_requests.sync_direction = SyncDirection::Forwards; + block_requests.state = BlockRequestsState::ReadyToProcess; + return; + } + // no common head, begin a backwards search + block_requests.sync_direction = SyncDirection::Backwards; + block_requests.current_start_slot = + std::cmp::min(chain.best_slot(), block_requests.downloaded_blocks[0].slot); + block_requests.update_start_slot(); } - - // not on the same chain, request blocks backwards - let state = &self.chain.head().beacon_state; - let local_finalized_slot = state - .finalized_checkpoint - .epoch - .start_slot(T::EthSpec::slots_per_epoch()); - - // check that the request hasn't failed by having no common chain - if local_finalized_slot >= block_requests.current_start_slot { - warn!(self.log, "Peer returned an unknown chain."; "request_id" => request_id); - block_requests.state = BlockRequestsState::Failed; - return; + SyncDirection::Forwards => { + // continue processing all blocks forwards, verify the end in the processing + block_requests.downloaded_blocks.append(&mut blocks); + block_requests.state = BlockRequestsState::ReadyToProcess; } + SyncDirection::Backwards => { + block_requests.downloaded_blocks.splice(..0, blocks); - // if this is a forward sync, then we have reached the head without a common chain - // and we need to start syncing backwards. - if block_requests.forward_sync { - // Start a backwards sync by requesting earlier blocks - block_requests.forward_sync = false; - block_requests.current_start_slot = std::cmp::min( - self.chain.best_slot(), - block_requests.downloaded_blocks[0].slot, - ); + // verify the request hasn't failed by having no common ancestor chain + // get our local finalized_slot + let local_finalized_slot = { + let state = &chain.head().beacon_state; + state + .finalized_checkpoint + .epoch + .start_slot(T::EthSpec::slots_per_epoch()) + }; + + if local_finalized_slot >= block_requests.current_start_slot { + warn!(self.log, "Peer returned an unknown chain."; "request_id" => request_id); + block_requests.state = BlockRequestsState::Failed; + return; + } + + // check if we have reached a common chain ancestor + let earliest_slot = block_requests.downloaded_blocks[0].slot; + if Some(block_requests.downloaded_blocks[0].canonical_root()) + == chain.root_at_slot(earliest_slot) + { + // we have a common head, start processing and begin a forwards sync + block_requests.sync_direction = SyncDirection::Forwards; + block_requests.state = BlockRequestsState::ReadyToProcess; + return; + } + + // no common chain, haven't passed last_finalized_head, so continue backwards + // search + block_requests.update_start_slot(); } } - - // update the start slot and re-queue the batch - block_requests.update_start_slot(); } pub fn recent_blocks_response( @@ -447,7 +499,7 @@ impl ImportManager { } // queue for processing - parent_request.state = BlockRequestsState::Complete; + parent_request.state = BlockRequestsState::ReadyToProcess; } pub fn _inject_error(_peer_id: PeerId, _id: RequestId) { @@ -500,29 +552,41 @@ impl ImportManager { pub(crate) fn poll(&mut self) -> ImportManagerOutcome { loop { + //TODO: Optimize the lookups. Potentially keep state of whether each of these functions + //need to be called. + + // only break once everything has been processed + let mut re_run = false; + + // only process batch requests if there are any + if !self.import_queue.is_empty() { + // process potential block requests + self.process_potential_block_requests(); + + // process any complete long-range batches + re_run = self.process_complete_batches(); + } + + // only process parent objects if we are in regular sync + if let ManagerState::Regular = self.state { + // process any parent block lookup-requests + self.process_parent_requests(); + + // process any complete parent lookups + re_run = self.process_complete_parent_requests(); + } + + // return any queued events + if !self.event_queue.is_empty() { + let event = self.event_queue.remove(0); + self.event_queue.shrink_to_fit(); + return event; + } + // update the state of the manager self.update_state(); - // process potential block requests - if let Some(outcome) = self.process_potential_block_requests() { - return outcome; - } - - // process any complete long-range batches - if let Some(outcome) = self.process_complete_batches() { - return outcome; - } - - // process any parent block lookup-requests - if let Some(outcome) = self.process_parent_requests() { - return outcome; - } - - // process any complete parent lookups - let (re_run, outcome) = self.process_complete_parent_requests(); - if let Some(outcome) = outcome { - return outcome; - } else if !re_run { + if !re_run { break; } } @@ -549,11 +613,11 @@ impl ImportManager { } } - fn process_potential_block_requests(&mut self) -> Option { + fn process_potential_block_requests(&mut self) { // check if an outbound request is required // Managing a fixed number of outbound requests is maintained at the RPC protocol libp2p - // layer and not needed here. - // If any in queued state we submit a request. + // layer and not needed here. Therefore we create many outbound requests and let the RPC + // handle the number of simultaneous requests. Request all queued objects. // remove any failed batches let debug_log = &self.log; @@ -585,56 +649,84 @@ impl ImportManager { count: MAX_BLOCKS_PER_REQUEST, step: 0, }; - return Some(ImportManagerOutcome::RequestBlocks { + self.event_queue.push(ImportManagerOutcome::RequestBlocks { peer_id: peer_id.clone(), request, request_id, }); } - - None } - fn process_complete_batches(&mut self) -> Option { - let completed_batches = self - .import_queue - .iter() - .filter(|(_peer, block_requests)| block_requests.state == BlockRequestsState::Complete) - .map(|(peer, _)| peer) - .cloned() - .collect::>(); - for peer_id in completed_batches { - let block_requests = self.import_queue.remove(&peer_id).expect("key exists"); - match self.process_blocks(block_requests.downloaded_blocks.clone()) { - Ok(()) => { - //TODO: Verify it's impossible to have empty downloaded_blocks - let last_element = block_requests.downloaded_blocks.len() - 1; - debug!(self.log, "Blocks processed successfully"; - "peer" => format!("{:?}", peer_id), - "start_slot" => block_requests.downloaded_blocks[0].slot, - "end_slot" => block_requests.downloaded_blocks[last_element].slot, - "no_blocks" => last_element + 1, - ); - // Re-HELLO to ensure we are up to the latest head - return Some(ImportManagerOutcome::Hello(peer_id)); - } - Err(e) => { - let last_element = block_requests.downloaded_blocks.len() - 1; - warn!(self.log, "Block processing failed"; + fn process_complete_batches(&mut self) -> bool { + // flag to indicate if the manager can be switched to idle or not + let mut re_run = false; + + // create reference variables to be moved into subsequent closure + let chain_ref = self.chain.clone(); + let log_ref = &self.log; + let event_queue_ref = &mut self.event_queue; + + self.import_queue.retain(|peer_id, block_requests| { + // check that the chain still exists + if let Some(chain) = chain_ref.upgrade() { + let downloaded_blocks = + std::mem::replace(&mut block_requests.downloaded_blocks, Vec::new()); + let last_element = block_requests.downloaded_blocks.len() - 1; + let start_slot = block_requests.downloaded_blocks[0].slot; + let end_slot = block_requests.downloaded_blocks[last_element].slot; + + match process_blocks(chain, downloaded_blocks, log_ref) { + Ok(()) => { + debug!(log_ref, "Blocks processed successfully"; "peer" => format!("{:?}", peer_id), - "start_slot" => block_requests.downloaded_blocks[0].slot, - "end_slot" => block_requests.downloaded_blocks[last_element].slot, + "start_slot" => start_slot, + "end_slot" => end_slot, "no_blocks" => last_element + 1, - "error" => format!("{:?}", e), - ); - return Some(ImportManagerOutcome::DownvotePeer(peer_id)); + ); + + // check if the batch is complete, by verifying if we have reached the + // target head + if end_slot >= block_requests.target_head_slot { + // Completed, re-hello the peer to ensure we are up to the latest head + event_queue_ref.push(ImportManagerOutcome::Hello(peer_id.clone())); + // remove the request + false + } else { + // have not reached the end, queue another batch + block_requests.update_start_slot(); + re_run = true; + // keep the batch + true + } + } + Err(e) => { + warn!(log_ref, "Block processing failed"; + "peer" => format!("{:?}", peer_id), + "start_slot" => start_slot, + "end_slot" => end_slot, + "no_blocks" => last_element + 1, + "error" => format!("{:?}", e), + ); + event_queue_ref.push(ImportManagerOutcome::DownvotePeer(peer_id.clone())); + false + } } + } else { + // chain no longer exists, empty the queue and return + event_queue_ref.clear(); + return false; } - } - None + }); + + re_run } - fn process_parent_requests(&mut self) -> Option { + fn process_parent_requests(&mut self) { + // check to make sure there are peers to search for the parent from + if self.full_peers.is_empty() { + return; + } + // remove any failed requests let debug_log = &self.log; self.parent_queue.retain(|parent_request| { @@ -649,11 +741,6 @@ impl ImportManager { } }); - // check to make sure there are peers to search for the parent from - if self.full_peers.is_empty() { - return None; - } - // check if parents need to be searched for for parent_request in self.parent_queue.iter_mut() { if parent_request.failed_attempts >= PARENT_FAIL_TOLERANCE { @@ -677,23 +764,21 @@ impl ImportManager { // select a random fully synced peer to attempt to download the parent block let peer_id = self.full_peers.iter().next().expect("List is not empty"); - return Some(ImportManagerOutcome::RecentRequest(peer_id.clone(), req)); + self.event_queue + .push(ImportManagerOutcome::RecentRequest(peer_id.clone(), req)); } } - - None } - fn process_complete_parent_requests(&mut self) -> (bool, Option) { - // flag to determine if there is more process to drive or if the manager can be switched to - // an idle state + fn process_complete_parent_requests(&mut self) -> bool { + // returned value indicating whether the manager can be switched to idle or not let mut re_run = false; // Find any parent_requests ready to be processed for completed_request in self .parent_queue .iter_mut() - .filter(|req| req.state == BlockRequestsState::Complete) + .filter(|req| req.state == BlockRequestsState::ReadyToProcess) { // verify the last added block is the parent of the last requested block let last_index = completed_request.downloaded_blocks.len() - 1; @@ -711,7 +796,9 @@ impl ImportManager { "received_block" => format!("{}", block_hash), "expected_parent" => format!("{}", expected_hash), ); - return (true, Some(ImportManagerOutcome::DownvotePeer(peer))); + re_run = true; + self.event_queue + .push(ImportManagerOutcome::DownvotePeer(peer)); } // try and process the list of blocks up to the requested block @@ -720,154 +807,153 @@ impl ImportManager { .downloaded_blocks .pop() .expect("Block must exist exist"); - match self.chain.process_block(block.clone()) { - Ok(BlockProcessingOutcome::ParentUnknown { parent: _ }) => { - // need to keep looking for parents - completed_request.downloaded_blocks.push(block); - completed_request.state = BlockRequestsState::Queued; - re_run = true; - break; - } - Ok(BlockProcessingOutcome::Processed { block_root: _ }) => {} - Ok(outcome) => { - // it's a future slot or an invalid block, remove it and try again - completed_request.failed_attempts += 1; - trace!( - self.log, "Invalid parent block"; - "outcome" => format!("{:?}", outcome), - "peer" => format!("{:?}", completed_request.last_submitted_peer), - ); - completed_request.state = BlockRequestsState::Queued; - re_run = true; - return ( - re_run, - Some(ImportManagerOutcome::DownvotePeer( + + // check if the chain exists + if let Some(chain) = self.chain.upgrade() { + match chain.process_block(block.clone()) { + Ok(BlockProcessingOutcome::ParentUnknown { parent: _ }) => { + // need to keep looking for parents + completed_request.downloaded_blocks.push(block); + completed_request.state = BlockRequestsState::Queued; + re_run = true; + break; + } + Ok(BlockProcessingOutcome::Processed { block_root: _ }) => {} + Ok(outcome) => { + // it's a future slot or an invalid block, remove it and try again + completed_request.failed_attempts += 1; + trace!( + self.log, "Invalid parent block"; + "outcome" => format!("{:?}", outcome), + "peer" => format!("{:?}", completed_request.last_submitted_peer), + ); + completed_request.state = BlockRequestsState::Queued; + re_run = true; + self.event_queue.push(ImportManagerOutcome::DownvotePeer( completed_request.last_submitted_peer.clone(), - )), - ); - } - Err(e) => { - completed_request.failed_attempts += 1; - warn!( - self.log, "Parent processing error"; - "error" => format!("{:?}", e) - ); - completed_request.state = BlockRequestsState::Queued; - re_run = true; - return ( - re_run, - Some(ImportManagerOutcome::DownvotePeer( + )); + return re_run; + } + Err(e) => { + completed_request.failed_attempts += 1; + warn!( + self.log, "Parent processing error"; + "error" => format!("{:?}", e) + ); + completed_request.state = BlockRequestsState::Queued; + re_run = true; + self.event_queue.push(ImportManagerOutcome::DownvotePeer( completed_request.last_submitted_peer.clone(), - )), - ); + )); + return re_run; + } } + } else { + // chain doesn't exist - clear the event queue and return + self.event_queue.clear(); + return false; } } } - // remove any full completed and processed parent chains + // remove any fully processed parent chains self.parent_queue.retain(|req| { - if req.state == BlockRequestsState::Complete { + if req.state == BlockRequestsState::ReadyToProcess { false } else { true } }); - (re_run, None) + re_run } +} - fn process_blocks(&mut self, blocks: Vec>) -> Result<(), String> { - for block in blocks { - let processing_result = self.chain.process_block(block.clone()); +// Helper function to process blocks +fn process_blocks( + chain: Arc>, + blocks: Vec>, + log: &Logger, +) -> Result<(), String> { + for block in blocks { + let processing_result = chain.process_block(block.clone()); - if let Ok(outcome) = processing_result { - match outcome { - BlockProcessingOutcome::Processed { block_root } => { - // The block was valid and we processed it successfully. + if let Ok(outcome) = processing_result { + match outcome { + BlockProcessingOutcome::Processed { block_root } => { + // The block was valid and we processed it successfully. + trace!( + log, "Imported block from network"; + "slot" => block.slot, + "block_root" => format!("{}", block_root), + ); + } + BlockProcessingOutcome::ParentUnknown { parent } => { + // blocks should be sequential and all parents should exist + trace!( + log, "ParentBlockUnknown"; + "parent_root" => format!("{}", parent), + "baby_block_slot" => block.slot, + ); + return Err(format!( + "Block at slot {} has an unknown parent.", + block.slot + )); + } + BlockProcessingOutcome::FutureSlot { + present_slot, + block_slot, + } => { + if present_slot + FUTURE_SLOT_TOLERANCE >= block_slot { + // The block is too far in the future, drop it. trace!( - self.log, "Imported block from network"; - "slot" => block.slot, - "block_root" => format!("{}", block_root), - ); - } - BlockProcessingOutcome::ParentUnknown { parent } => { - // blocks should be sequential and all parents should exist - trace!( - self.log, "ParentBlockUnknown"; - "parent_root" => format!("{}", parent), - "baby_block_slot" => block.slot, + log, "FutureBlock"; + "msg" => "block for future slot rejected, check your time", + "present_slot" => present_slot, + "block_slot" => block_slot, + "FUTURE_SLOT_TOLERANCE" => FUTURE_SLOT_TOLERANCE, ); return Err(format!( - "Block at slot {} has an unknown parent.", + "Block at slot {} is too far in the future", block.slot )); - } - BlockProcessingOutcome::FutureSlot { - present_slot, - block_slot, - } => { - if present_slot + FUTURE_SLOT_TOLERANCE >= block_slot { - // The block is too far in the future, drop it. - trace!( - self.log, "FutureBlock"; - "msg" => "block for future slot rejected, check your time", - "present_slot" => present_slot, - "block_slot" => block_slot, - "FUTURE_SLOT_TOLERANCE" => FUTURE_SLOT_TOLERANCE, - ); - return Err(format!( - "Block at slot {} is too far in the future", - block.slot - )); - } else { - // The block is in the future, but not too far. - trace!( - self.log, "QueuedFutureBlock"; - "msg" => "queuing future block, check your time", - "present_slot" => present_slot, - "block_slot" => block_slot, - "FUTURE_SLOT_TOLERANCE" => FUTURE_SLOT_TOLERANCE, - ); - } - } - BlockProcessingOutcome::FinalizedSlot => { + } else { + // The block is in the future, but not too far. trace!( - self.log, "Finalized or earlier block processed"; - "outcome" => format!("{:?}", outcome), + log, "QueuedFutureBlock"; + "msg" => "queuing future block, check your time", + "present_slot" => present_slot, + "block_slot" => block_slot, + "FUTURE_SLOT_TOLERANCE" => FUTURE_SLOT_TOLERANCE, ); - // block reached our finalized slot or was earlier, move to the next block - } - _ => { - trace!( - self.log, "InvalidBlock"; - "msg" => "peer sent invalid block", - "outcome" => format!("{:?}", outcome), - ); - return Err(format!("Invalid block at slot {}", block.slot)); } } - } else { - trace!( - self.log, "BlockProcessingFailure"; - "msg" => "unexpected condition in processing block.", - "outcome" => format!("{:?}", processing_result) - ); - return Err(format!( - "Unexpected block processing error: {:?}", - processing_result - )); + BlockProcessingOutcome::FinalizedSlot => { + trace!( + log, "Finalized or earlier block processed"; + "outcome" => format!("{:?}", outcome), + ); + // block reached our finalized slot or was earlier, move to the next block + } + _ => { + trace!( + log, "InvalidBlock"; + "msg" => "peer sent invalid block", + "outcome" => format!("{:?}", outcome), + ); + return Err(format!("Invalid block at slot {}", block.slot)); + } } + } else { + trace!( + log, "BlockProcessingFailure"; + "msg" => "unexpected condition in processing block.", + "outcome" => format!("{:?}", processing_result) + ); + return Err(format!( + "Unexpected block processing error: {:?}", + processing_result + )); } - Ok(()) } -} - -fn root_at_slot( - chain: &Arc>, - target_slot: Slot, -) -> Option { - chain - .rev_iter_block_roots() - .find(|(_root, slot)| *slot == target_slot) - .map(|(root, _slot)| root) + Ok(()) } diff --git a/beacon_node/network/src/sync/simple_sync.rs b/beacon_node/network/src/sync/simple_sync.rs index dd857d8c39..36947082eb 100644 --- a/beacon_node/network/src/sync/simple_sync.rs +++ b/beacon_node/network/src/sync/simple_sync.rs @@ -6,7 +6,6 @@ use eth2_libp2p::rpc::{RPCEvent, RPCRequest, RPCResponse, RequestId}; use eth2_libp2p::PeerId; use slog::{debug, info, o, trace, warn}; use ssz::Encode; -use std::ops::Sub; use std::sync::Arc; use store::Store; use tokio::sync::mpsc; @@ -190,7 +189,7 @@ impl SimpleSync { trace!( self.log, "Out of date or potentially sync'd peer found"; "peer" => format!("{:?}", peer_id), - "remote_head_slot" => remote.head_slot + "remote_head_slot" => remote.head_slot, "remote_latest_finalized_epoch" => remote.finalized_epoch, ); @@ -386,7 +385,7 @@ impl SimpleSync { "peer" => format!("{:?}", peer_id), "msg" => "Failed to return all requested hashes", "start_slot" => req.start_slot, - "current_slot" => self.chain.present_slot(), + "current_slot" => self.chain.best_slot(), "requested" => req.count, "returned" => blocks.len(), ); diff --git a/beacon_node/src/main.rs b/beacon_node/src/main.rs index 26537c6f76..ea801cd8bc 100644 --- a/beacon_node/src/main.rs +++ b/beacon_node/src/main.rs @@ -33,14 +33,14 @@ fn main() { .arg( Arg::with_name("logfile") .long("logfile") - .value_name("logfile") + .value_name("FILE") .help("File path where output will be written.") .takes_value(true), ) .arg( Arg::with_name("network-dir") .long("network-dir") - .value_name("NETWORK-DIR") + .value_name("DIR") .help("Data directory for network keys.") .takes_value(true) .global(true) @@ -83,7 +83,7 @@ fn main() { Arg::with_name("boot-nodes") .long("boot-nodes") .allow_hyphen_values(true) - .value_name("BOOTNODES") + .value_name("ENR-LIST") .help("One or more comma-delimited base64-encoded ENR's to bootstrap the p2p network.") .takes_value(true), ) @@ -128,13 +128,14 @@ fn main() { .arg( Arg::with_name("rpc-address") .long("rpc-address") - .value_name("Address") + .value_name("ADDRESS") .help("Listen address for RPC endpoint.") .takes_value(true), ) .arg( Arg::with_name("rpc-port") .long("rpc-port") + .value_name("PORT") .help("Listen port for RPC endpoint.") .conflicts_with("port-bump") .takes_value(true), @@ -149,14 +150,14 @@ fn main() { .arg( Arg::with_name("api-address") .long("api-address") - .value_name("APIADDRESS") + .value_name("ADDRESS") .help("Set the listen address for the RESTful HTTP API server.") .takes_value(true), ) .arg( Arg::with_name("api-port") .long("api-port") - .value_name("APIPORT") + .value_name("PORT") .help("Set the listen TCP port for the RESTful HTTP API server.") .conflicts_with("port-bump") .takes_value(true),