From 86a18e72c4e52c651271433a9495671fba4326bc Mon Sep 17 00:00:00 2001 From: divma Date: Sun, 4 Oct 2020 23:49:14 +0000 Subject: [PATCH] Sync fixes (#1716) ## Issue Addressed chain state inconsistencies ## Proposed Changes - a batch can be fake-failed by Range if it needs to move a peer to another chain. The peer will still send blocks/ errors / produce timeouts for those requests, so check when we get a response from the RPC that the request id matches, instead of only the peer, since a re-request can be directed to the same peer. - if an optimistic batch succeeds, store the attempt to avoid trying it again when quickly switching chains. Also, use it only if ahead of our current target, instead of the segment's start epoch --- beacon_node/network/src/sync/manager.rs | 50 +++++++-------- .../network/src/sync/network_context.rs | 4 +- .../network/src/sync/range_sync/batch.rs | 27 ++++---- .../network/src/sync/range_sync/chain.rs | 64 ++++++++++--------- .../network/src/sync/range_sync/range.rs | 6 +- 5 files changed, 81 insertions(+), 70 deletions(-) diff --git a/beacon_node/network/src/sync/manager.rs b/beacon_node/network/src/sync/manager.rs index a2f4792928..deedc1448f 100644 --- a/beacon_node/network/src/sync/manager.rs +++ b/beacon_node/network/src/sync/manager.rs @@ -366,7 +366,7 @@ impl SyncManager { // check if the parent of this block isn't in our failed cache. If it is, this // chain should be dropped and the peer downscored. if self.failed_chains.contains(&block.message.parent_root) { - debug!(self.log, "Parent chain ignored due to past failure"; "block" => format!("{:?}", block.message.parent_root), "slot" => block.message.slot); + debug!(self.log, "Parent chain ignored due to past failure"; "block" => ?block.message.parent_root, "slot" => block.message.slot); if !parent_request.downloaded_blocks.is_empty() { // Add the root block to failed chains self.failed_chains @@ -392,7 +392,7 @@ impl SyncManager { // This can be allowed as some clients may implement pruning. We mildly // tolerate this behaviour. if !single_block_request.block_returned { - warn!(self.log, "Peer didn't respond with a block it referenced"; "referenced_block_hash" => format!("{}", single_block_request.hash), "peer_id" => format!("{}", peer_id)); + warn!(self.log, "Peer didn't respond with a block it referenced"; "referenced_block_hash" => %single_block_request.hash, "peer_id" => %peer_id); self.network .report_peer(peer_id, PeerAction::MidToleranceError); } @@ -433,7 +433,7 @@ impl SyncManager { error!( self.log, "Failed to send sync block to processor"; - "error" => format!("{:?}", e) + "error" => ?e ); return None; } @@ -465,7 +465,7 @@ impl SyncManager { if expected_block_hash != block.canonical_root() { // The peer that sent this, sent us the wrong block. // We do not tolerate this behaviour. The peer is instantly disconnected and banned. - warn!(self.log, "Peer sent incorrect block for single block lookup"; "peer_id" => format!("{}", peer_id)); + warn!(self.log, "Peer sent incorrect block for single block lookup"; "peer_id" => %peer_id); self.network.goodbye_peer(peer_id, GoodbyeReason::Fault); return; } @@ -478,7 +478,7 @@ impl SyncManager { // we have the correct block, try and process it match block_result { Ok(block_root) => { - info!(self.log, "Processed block"; "block" => format!("{}", block_root)); + info!(self.log, "Processed block"; "block" => %block_root); match self.chain.fork_choice() { Ok(()) => trace!( @@ -489,7 +489,7 @@ impl SyncManager { Err(e) => error!( self.log, "Fork choice failed"; - "error" => format!("{:?}", e), + "error" => ?e, "location" => "single block" ), } @@ -502,10 +502,10 @@ impl SyncManager { trace!(self.log, "Single block lookup already known"); } Err(BlockError::BeaconChainError(e)) => { - warn!(self.log, "Unexpected block processing error"; "error" => format!("{:?}", e)); + warn!(self.log, "Unexpected block processing error"; "error" => ?e); } outcome => { - warn!(self.log, "Single block lookup failed"; "outcome" => format!("{:?}", outcome)); + warn!(self.log, "Single block lookup failed"; "outcome" => ?outcome); // This could be a range of errors. But we couldn't process the block. // For now we consider this a mid tolerance error. self.network @@ -542,7 +542,7 @@ impl SyncManager { if self.failed_chains.contains(&block.message.parent_root) || self.failed_chains.contains(&block_root) { - debug!(self.log, "Block is from a past failed chain. Dropping"; "block_root" => format!("{:?}", block_root), "block_slot" => block.message.slot); + debug!(self.log, "Block is from a past failed chain. Dropping"; "block_root" => ?block_root, "block_slot" => block.message.slot); return; } @@ -559,7 +559,7 @@ impl SyncManager { } } - debug!(self.log, "Unknown block received. Starting a parent lookup"; "block_slot" => block.message.slot, "block_hash" => format!("{}", block.canonical_root())); + debug!(self.log, "Unknown block received. Starting a parent lookup"; "block_slot" => block.message.slot, "block_hash" => %block.canonical_root()); let parent_request = ParentRequests { downloaded_blocks: vec![block], @@ -636,10 +636,10 @@ impl SyncManager { let head_slot = sync_info.head_slot; let finalized_epoch = sync_info.finalized_epoch; if peer_info.sync_status.update_synced(sync_info.into()) { - debug!(self.log, "Peer transitioned sync state"; "new_state" => "synced", "peer_id" => format!("{}", peer_id), "head_slot" => head_slot, "finalized_epoch" => finalized_epoch); + debug!(self.log, "Peer transitioned sync state"; "new_state" => "synced", "peer_id" => %peer_id, "head_slot" => head_slot, "finalized_epoch" => finalized_epoch); } } else { - crit!(self.log, "Status'd peer is unknown"; "peer_id" => format!("{}", peer_id)); + crit!(self.log, "Status'd peer is unknown"; "peer_id" => %peer_id); } self.update_sync_state(); } @@ -650,10 +650,10 @@ impl SyncManager { let head_slot = sync_info.head_slot; let finalized_epoch = sync_info.finalized_epoch; if peer_info.sync_status.update_advanced(sync_info.into()) { - debug!(self.log, "Peer transitioned sync state"; "new_state" => "advanced", "peer_id" => format!("{}", peer_id), "head_slot" => head_slot, "finalized_epoch" => finalized_epoch); + debug!(self.log, "Peer transitioned sync state"; "new_state" => "advanced", "peer_id" => %peer_id, "head_slot" => head_slot, "finalized_epoch" => finalized_epoch); } } else { - crit!(self.log, "Status'd peer is unknown"; "peer_id" => format!("{}", peer_id)); + crit!(self.log, "Status'd peer is unknown"; "peer_id" => %peer_id); } self.update_sync_state(); } @@ -664,10 +664,10 @@ impl SyncManager { let head_slot = sync_info.head_slot; let finalized_epoch = sync_info.finalized_epoch; if peer_info.sync_status.update_behind(sync_info.into()) { - debug!(self.log, "Peer transitioned sync state"; "new_state" => "behind", "peer_id" => format!("{}", peer_id), "head_slot" => head_slot, "finalized_epoch" => finalized_epoch); + debug!(self.log, "Peer transitioned sync state"; "new_state" => "behind", "peer_id" => %peer_id, "head_slot" => head_slot, "finalized_epoch" => finalized_epoch); } } else { - crit!(self.log, "Status'd peer is unknown"; "peer_id" => format!("{}", peer_id)); + crit!(self.log, "Status'd peer is unknown"; "peer_id" => %peer_id); } self.update_sync_state(); } @@ -675,7 +675,7 @@ impl SyncManager { /// Updates the global sync state and logs any changes. fn update_sync_state(&mut self) { if let Some((old_state, new_state)) = self.network_globals.update_sync_state() { - info!(self.log, "Sync state updated"; "old_state" => format!("{}", old_state), "new_state" => format!("{}",new_state)); + info!(self.log, "Sync state updated"; "old_state" => %old_state, "new_state" => %new_state); // If we have become synced - Subscribe to all the core subnet topics if new_state == eth2_libp2p::types::SyncState::Synced { self.network.subscribe_core_topics(); @@ -715,9 +715,9 @@ impl SyncManager { let peer = parent_request.last_submitted_peer.clone(); warn!(self.log, "Peer sent invalid parent."; - "peer_id" => format!("{:?}",peer), - "received_block" => format!("{}", block_hash), - "expected_parent" => format!("{}", expected_hash), + "peer_id" => %peer, + "received_block" => %block_hash, + "expected_parent" => %expected_hash, ); // We try again, but downvote the peer. @@ -772,7 +772,7 @@ impl SyncManager { error!( self.log, "Failed to send chain segment to processor"; - "error" => format!("{:?}", e) + "error" => ?e ); } } @@ -782,9 +782,9 @@ impl SyncManager { // us the last block warn!( self.log, "Invalid parent chain"; - "score_adjustment" => PeerAction::MidToleranceError.to_string(), - "outcome" => format!("{:?}", outcome), - "last_peer" => parent_request.last_submitted_peer.to_string(), + "score_adjustment" => %PeerAction::MidToleranceError, + "outcome" => ?outcome, + "last_peer" => %parent_request.last_submitted_peer, ); // Add this chain to cache of failed chains @@ -827,7 +827,7 @@ impl SyncManager { }; debug!(self.log, "Parent import failed"; - "block" => format!("{:?}",parent_request.downloaded_blocks[0].canonical_root()), + "block" => ?parent_request.downloaded_blocks[0].canonical_root(), "ancestors_found" => parent_request.downloaded_blocks.len(), "reason" => error ); diff --git a/beacon_node/network/src/sync/network_context.rs b/beacon_node/network/src/sync/network_context.rs index 715344eb18..c9b0ee058d 100644 --- a/beacon_node/network/src/sync/network_context.rs +++ b/beacon_node/network/src/sync/network_context.rs @@ -87,7 +87,7 @@ impl SyncNetworkContext { request: BlocksByRangeRequest, chain_id: ChainId, batch_id: BatchId, - ) -> Result<(), &'static str> { + ) -> Result { trace!( self.log, "Sending BlocksByRange Request"; @@ -97,7 +97,7 @@ impl SyncNetworkContext { ); let req_id = self.send_rpc_request(peer_id, Request::BlocksByRange(request))?; self.range_requests.insert(req_id, (chain_id, batch_id)); - Ok(()) + Ok(req_id) } pub fn blocks_by_range_response( diff --git a/beacon_node/network/src/sync/range_sync/batch.rs b/beacon_node/network/src/sync/range_sync/batch.rs index 532dafd2e4..aa863576fc 100644 --- a/beacon_node/network/src/sync/range_sync/batch.rs +++ b/beacon_node/network/src/sync/range_sync/batch.rs @@ -1,3 +1,4 @@ +use crate::sync::RequestId; use eth2_libp2p::rpc::methods::BlocksByRangeRequest; use eth2_libp2p::PeerId; use ssz::Encode; @@ -32,7 +33,7 @@ pub enum BatchState { /// The batch has failed either downloading or processing, but can be requested again. AwaitingDownload, /// The batch is being downloaded. - Downloading(PeerId, Vec>), + Downloading(PeerId, Vec>, RequestId), /// The batch has been completely downloaded and is ready for processing. AwaitingProcessing(PeerId, Vec>), /// The batch is being processed. @@ -99,7 +100,7 @@ impl BatchInfo { pub fn current_peer(&self) -> Option<&PeerId> { match &self.state { BatchState::AwaitingDownload | BatchState::Failed => None, - BatchState::Downloading(peer_id, _) + BatchState::Downloading(peer_id, _, _) | BatchState::AwaitingProcessing(peer_id, _) | BatchState::Processing(Attempt { peer_id, .. }) | BatchState::AwaitingValidation(Attempt { peer_id, .. }) => Some(&peer_id), @@ -126,9 +127,9 @@ impl BatchInfo { /// Adds a block to a downloading batch. pub fn add_block(&mut self, block: SignedBeaconBlock) { match self.state.poison() { - BatchState::Downloading(peer, mut blocks) => { + BatchState::Downloading(peer, mut blocks, req_id) => { blocks.push(block); - self.state = BatchState::Downloading(peer, blocks) + self.state = BatchState::Downloading(peer, blocks, req_id) } other => unreachable!("Add block for batch in wrong state: {:?}", other), } @@ -148,7 +149,7 @@ impl BatchInfo { ), > { match self.state.poison() { - BatchState::Downloading(peer, blocks) => { + BatchState::Downloading(peer, blocks, _request_id) => { // verify that blocks are in range if let Some(last_slot) = blocks.last().map(|b| b.slot()) { // the batch is non-empty @@ -189,7 +190,7 @@ impl BatchInfo { #[must_use = "Batch may have failed"] pub fn download_failed(&mut self) -> &BatchState { match self.state.poison() { - BatchState::Downloading(peer, _) => { + BatchState::Downloading(peer, _, _request_id) => { // register the attempt and check if the batch can be tried again self.failed_download_attempts.push(peer); self.state = if self.failed_download_attempts.len() @@ -206,10 +207,10 @@ impl BatchInfo { } } - pub fn start_downloading_from_peer(&mut self, peer: PeerId) { + pub fn start_downloading_from_peer(&mut self, peer: PeerId, request_id: RequestId) { match self.state.poison() { BatchState::AwaitingDownload => { - self.state = BatchState::Downloading(peer, Vec::new()); + self.state = BatchState::Downloading(peer, Vec::new(), request_id); } other => unreachable!("Starting download for batch in wrong state: {:?}", other), } @@ -333,9 +334,13 @@ impl std::fmt::Debug for BatchState { BatchState::AwaitingProcessing(ref peer, ref blocks) => { write!(f, "AwaitingProcessing({}, {} blocks)", peer, blocks.len()) } - BatchState::Downloading(peer, blocks) => { - write!(f, "Downloading({}, {} blocks)", peer, blocks.len()) - } + BatchState::Downloading(peer, blocks, request_id) => write!( + f, + "Downloading({}, {} blocks, {})", + peer, + blocks.len(), + request_id + ), BatchState::Poisoned => f.write_str("Poisoned"), } } diff --git a/beacon_node/network/src/sync/range_sync/chain.rs b/beacon_node/network/src/sync/range_sync/chain.rs index 4decdc212a..755f9e0e00 100644 --- a/beacon_node/network/src/sync/range_sync/chain.rs +++ b/beacon_node/network/src/sync/range_sync/chain.rs @@ -1,7 +1,7 @@ use super::batch::{BatchInfo, BatchState}; use crate::beacon_processor::ProcessId; use crate::beacon_processor::WorkEvent as BeaconWorkEvent; -use crate::sync::{network_context::SyncNetworkContext, BatchProcessResult}; +use crate::sync::{network_context::SyncNetworkContext, BatchProcessResult, RequestId}; use beacon_chain::{BeaconChain, BeaconChainTypes}; use eth2_libp2p::{PeerAction, PeerId}; use fnv::FnvHashMap; @@ -75,9 +75,9 @@ pub struct SyncingChain { /// If a block is imported for this batch, the chain advances to this point. optimistic_start: Option, - /// When a batch for an optimistic start fails processing, it is stored to avoid trying it - /// again due to chain stopping/re-starting on chain switching. - failed_optimistic_starts: HashSet, + /// When a batch for an optimistic start is tried (either successful or not), it is stored to + /// avoid trying it again due to chain stopping/re-starting on chain switching. + attempted_optimistic_starts: HashSet, /// The current state of the chain. pub state: ChainSyncingState, @@ -135,7 +135,7 @@ impl SyncingChain { to_be_downloaded: start_epoch, processing_target: start_epoch, optimistic_start: None, - failed_optimistic_starts: HashSet::default(), + attempted_optimistic_starts: HashSet::default(), state: ChainSyncingState::Stopped, current_processing_batch: None, beacon_processor_send, @@ -200,7 +200,8 @@ impl SyncingChain { &mut self, network: &mut SyncNetworkContext, batch_id: BatchId, - peer_id: PeerId, + peer_id: &PeerId, + request_id: RequestId, beacon_block: Option>, ) -> ProcessingResult { // check if we have this batch @@ -213,9 +214,14 @@ impl SyncingChain { Some(batch) => { // A batch could be retried without the peer failing the request (disconnecting/ // sending an error /timeout) if the peer is removed from the chain for other - // reasons. Check that this block belongs to the expected peer - if Some(&peer_id) != batch.current_peer() { - return ProcessingResult::KeepChain; + // reasons. Check that this block belongs to the expected peer, and that the + // request_id matches + if let BatchState::Downloading(expected_peer, _, expected_request_id) = + batch.state() + { + if expected_peer != peer_id || expected_request_id != &request_id { + return ProcessingResult::KeepChain; + } } batch } @@ -228,11 +234,9 @@ impl SyncingChain { } else { // A stream termination has been sent. This batch has ended. Process a completed batch. // Remove the request from the peer's active batches - let peer = batch - .current_peer() - .expect("Batch is downloading from a peer"); + self.peers - .get_mut(peer) + .get_mut(peer_id) .unwrap_or_else(|| panic!("Batch is registered for the peer")) .remove(&batch_id); @@ -442,6 +446,8 @@ impl SyncingChain { // blocks. if *was_non_empty { self.advance_chain(network, batch_id); + // we register so that on chain switching we don't try it again + self.attempted_optimistic_starts.insert(batch_id); } else if let Some(epoch) = self.optimistic_start { // check if this batch corresponds to an optimistic batch. In this case, we // reject it as an optimistic candidate since the batch was empty @@ -520,9 +526,8 @@ impl SyncingChain { redownload: bool, reason: &str, ) -> ProcessingResult { - if let Some(epoch) = self.optimistic_start { - self.optimistic_start = None; - self.failed_optimistic_starts.insert(epoch); + if let Some(epoch) = self.optimistic_start.take() { + self.attempted_optimistic_starts.insert(epoch); // if this batch is inside the current processing range, keep it, otherwise drop // it. NOTE: this is done to prevent non-sequential batches coming from optimistic // starts from filling up the buffer size @@ -628,7 +633,7 @@ impl SyncingChain { self.start_epoch = validating_epoch; self.to_be_downloaded = self.to_be_downloaded.max(validating_epoch); if self.batches.contains_key(&self.to_be_downloaded) { - // if a chain is advanced by Range beyond the previous `seld.to_be_downloaded`, we + // if a chain is advanced by Range beyond the previous `self.to_be_downloaded`, we // won't have this batch, so we need to request it. self.to_be_downloaded += EPOCHS_PER_BATCH; } @@ -732,8 +737,8 @@ impl SyncingChain { // advance the chain to the new validating epoch self.advance_chain(network, validating_epoch); if self.optimistic_start.is_none() - && optimistic_epoch > self.start_epoch - && !self.failed_optimistic_starts.contains(&optimistic_epoch) + && optimistic_epoch > self.processing_target + && !self.attempted_optimistic_starts.contains(&optimistic_epoch) { self.optimistic_start = Some(optimistic_epoch); } @@ -782,21 +787,21 @@ impl SyncingChain { &mut self, network: &mut SyncNetworkContext, batch_id: BatchId, - peer_id: PeerId, + peer_id: &PeerId, + request_id: RequestId, ) -> ProcessingResult { if let Some(batch) = self.batches.get_mut(&batch_id) { // A batch could be retried without the peer failing the request (disconnecting/ // sending an error /timeout) if the peer is removed from the chain for other // reasons. Check that this block belongs to the expected peer - if Some(&peer_id) != batch.current_peer() { - return ProcessingResult::KeepChain; + if let BatchState::Downloading(expected_peer, _, expected_request_id) = batch.state() { + if expected_peer != peer_id || expected_request_id != &request_id { + return ProcessingResult::KeepChain; + } } debug!(self.log, "Batch failed. RPC Error"; "batch_epoch" => batch_id); - let failed_peer = batch - .current_peer() - .expect("Batch is downloading from a peer"); self.peers - .get_mut(failed_peer) + .get_mut(peer_id) .expect("Peer belongs to the chain") .remove(&batch_id); if let BatchState::Failed = batch.download_failed() { @@ -851,10 +856,10 @@ impl SyncingChain { ) -> ProcessingResult { if let Some(batch) = self.batches.get_mut(&batch_id) { let request = batch.to_blocks_by_range_request(); - // inform the batch about the new request - batch.start_downloading_from_peer(peer.clone()); match network.blocks_by_range_request(peer.clone(), request, self.id, batch_id) { - Ok(()) => { + Ok(request_id) => { + // inform the batch about the new request + batch.start_downloading_from_peer(peer.clone(), request_id); if self .optimistic_start .map(|epoch| epoch == batch_id) @@ -876,6 +881,7 @@ impl SyncingChain { warn!(self.log, "Could not send batch request"; "batch_id" => batch_id, "error" => e, &batch); // register the failed download and check if the batch can be retried + batch.start_downloading_from_peer(peer.clone(), 1); // fake request_id is not relevant self.peers .get_mut(&peer) .expect("peer belongs to the peer pool") diff --git a/beacon_node/network/src/sync/range_sync/range.rs b/beacon_node/network/src/sync/range_sync/range.rs index 6847838e04..48a9bd5d40 100644 --- a/beacon_node/network/src/sync/range_sync/range.rs +++ b/beacon_node/network/src/sync/range_sync/range.rs @@ -214,7 +214,7 @@ impl RangeSync { { // check if this chunk removes the chain match self.chains.call_by_id(chain_id, |chain| { - chain.on_block_response(network, batch_id, peer_id, beacon_block) + chain.on_block_response(network, batch_id, &peer_id, request_id, beacon_block) }) { Ok((removed_chain, sync_type)) => { if let Some(removed_chain) = removed_chain { @@ -228,7 +228,7 @@ impl RangeSync { } } } else { - warn!(self.log, "Response/Error for non registered request"; "request_id" => request_id) + debug!(self.log, "Response/Error for non registered request"; "request_id" => request_id) } } @@ -337,7 +337,7 @@ impl RangeSync { if let Some((chain_id, batch_id)) = network.blocks_by_range_response(request_id, true) { // check that this request is pending match self.chains.call_by_id(chain_id, |chain| { - chain.inject_error(network, batch_id, peer_id) + chain.inject_error(network, batch_id, &peer_id, request_id) }) { Ok((removed_chain, sync_type)) => { if let Some(removed_chain) = removed_chain {