diff --git a/beacon_node/network/src/sync/manager.rs b/beacon_node/network/src/sync/manager.rs index a2f4792928..deedc1448f 100644 --- a/beacon_node/network/src/sync/manager.rs +++ b/beacon_node/network/src/sync/manager.rs @@ -366,7 +366,7 @@ impl SyncManager { // check if the parent of this block isn't in our failed cache. If it is, this // chain should be dropped and the peer downscored. if self.failed_chains.contains(&block.message.parent_root) { - debug!(self.log, "Parent chain ignored due to past failure"; "block" => format!("{:?}", block.message.parent_root), "slot" => block.message.slot); + debug!(self.log, "Parent chain ignored due to past failure"; "block" => ?block.message.parent_root, "slot" => block.message.slot); if !parent_request.downloaded_blocks.is_empty() { // Add the root block to failed chains self.failed_chains @@ -392,7 +392,7 @@ impl SyncManager { // This can be allowed as some clients may implement pruning. We mildly // tolerate this behaviour. if !single_block_request.block_returned { - warn!(self.log, "Peer didn't respond with a block it referenced"; "referenced_block_hash" => format!("{}", single_block_request.hash), "peer_id" => format!("{}", peer_id)); + warn!(self.log, "Peer didn't respond with a block it referenced"; "referenced_block_hash" => %single_block_request.hash, "peer_id" => %peer_id); self.network .report_peer(peer_id, PeerAction::MidToleranceError); } @@ -433,7 +433,7 @@ impl SyncManager { error!( self.log, "Failed to send sync block to processor"; - "error" => format!("{:?}", e) + "error" => ?e ); return None; } @@ -465,7 +465,7 @@ impl SyncManager { if expected_block_hash != block.canonical_root() { // The peer that sent this, sent us the wrong block. // We do not tolerate this behaviour. The peer is instantly disconnected and banned. - warn!(self.log, "Peer sent incorrect block for single block lookup"; "peer_id" => format!("{}", peer_id)); + warn!(self.log, "Peer sent incorrect block for single block lookup"; "peer_id" => %peer_id); self.network.goodbye_peer(peer_id, GoodbyeReason::Fault); return; } @@ -478,7 +478,7 @@ impl SyncManager { // we have the correct block, try and process it match block_result { Ok(block_root) => { - info!(self.log, "Processed block"; "block" => format!("{}", block_root)); + info!(self.log, "Processed block"; "block" => %block_root); match self.chain.fork_choice() { Ok(()) => trace!( @@ -489,7 +489,7 @@ impl SyncManager { Err(e) => error!( self.log, "Fork choice failed"; - "error" => format!("{:?}", e), + "error" => ?e, "location" => "single block" ), } @@ -502,10 +502,10 @@ impl SyncManager { trace!(self.log, "Single block lookup already known"); } Err(BlockError::BeaconChainError(e)) => { - warn!(self.log, "Unexpected block processing error"; "error" => format!("{:?}", e)); + warn!(self.log, "Unexpected block processing error"; "error" => ?e); } outcome => { - warn!(self.log, "Single block lookup failed"; "outcome" => format!("{:?}", outcome)); + warn!(self.log, "Single block lookup failed"; "outcome" => ?outcome); // This could be a range of errors. But we couldn't process the block. // For now we consider this a mid tolerance error. self.network @@ -542,7 +542,7 @@ impl SyncManager { if self.failed_chains.contains(&block.message.parent_root) || self.failed_chains.contains(&block_root) { - debug!(self.log, "Block is from a past failed chain. Dropping"; "block_root" => format!("{:?}", block_root), "block_slot" => block.message.slot); + debug!(self.log, "Block is from a past failed chain. Dropping"; "block_root" => ?block_root, "block_slot" => block.message.slot); return; } @@ -559,7 +559,7 @@ impl SyncManager { } } - debug!(self.log, "Unknown block received. Starting a parent lookup"; "block_slot" => block.message.slot, "block_hash" => format!("{}", block.canonical_root())); + debug!(self.log, "Unknown block received. Starting a parent lookup"; "block_slot" => block.message.slot, "block_hash" => %block.canonical_root()); let parent_request = ParentRequests { downloaded_blocks: vec![block], @@ -636,10 +636,10 @@ impl SyncManager { let head_slot = sync_info.head_slot; let finalized_epoch = sync_info.finalized_epoch; if peer_info.sync_status.update_synced(sync_info.into()) { - debug!(self.log, "Peer transitioned sync state"; "new_state" => "synced", "peer_id" => format!("{}", peer_id), "head_slot" => head_slot, "finalized_epoch" => finalized_epoch); + debug!(self.log, "Peer transitioned sync state"; "new_state" => "synced", "peer_id" => %peer_id, "head_slot" => head_slot, "finalized_epoch" => finalized_epoch); } } else { - crit!(self.log, "Status'd peer is unknown"; "peer_id" => format!("{}", peer_id)); + crit!(self.log, "Status'd peer is unknown"; "peer_id" => %peer_id); } self.update_sync_state(); } @@ -650,10 +650,10 @@ impl SyncManager { let head_slot = sync_info.head_slot; let finalized_epoch = sync_info.finalized_epoch; if peer_info.sync_status.update_advanced(sync_info.into()) { - debug!(self.log, "Peer transitioned sync state"; "new_state" => "advanced", "peer_id" => format!("{}", peer_id), "head_slot" => head_slot, "finalized_epoch" => finalized_epoch); + debug!(self.log, "Peer transitioned sync state"; "new_state" => "advanced", "peer_id" => %peer_id, "head_slot" => head_slot, "finalized_epoch" => finalized_epoch); } } else { - crit!(self.log, "Status'd peer is unknown"; "peer_id" => format!("{}", peer_id)); + crit!(self.log, "Status'd peer is unknown"; "peer_id" => %peer_id); } self.update_sync_state(); } @@ -664,10 +664,10 @@ impl SyncManager { let head_slot = sync_info.head_slot; let finalized_epoch = sync_info.finalized_epoch; if peer_info.sync_status.update_behind(sync_info.into()) { - debug!(self.log, "Peer transitioned sync state"; "new_state" => "behind", "peer_id" => format!("{}", peer_id), "head_slot" => head_slot, "finalized_epoch" => finalized_epoch); + debug!(self.log, "Peer transitioned sync state"; "new_state" => "behind", "peer_id" => %peer_id, "head_slot" => head_slot, "finalized_epoch" => finalized_epoch); } } else { - crit!(self.log, "Status'd peer is unknown"; "peer_id" => format!("{}", peer_id)); + crit!(self.log, "Status'd peer is unknown"; "peer_id" => %peer_id); } self.update_sync_state(); } @@ -675,7 +675,7 @@ impl SyncManager { /// Updates the global sync state and logs any changes. fn update_sync_state(&mut self) { if let Some((old_state, new_state)) = self.network_globals.update_sync_state() { - info!(self.log, "Sync state updated"; "old_state" => format!("{}", old_state), "new_state" => format!("{}",new_state)); + info!(self.log, "Sync state updated"; "old_state" => %old_state, "new_state" => %new_state); // If we have become synced - Subscribe to all the core subnet topics if new_state == eth2_libp2p::types::SyncState::Synced { self.network.subscribe_core_topics(); @@ -715,9 +715,9 @@ impl SyncManager { let peer = parent_request.last_submitted_peer.clone(); warn!(self.log, "Peer sent invalid parent."; - "peer_id" => format!("{:?}",peer), - "received_block" => format!("{}", block_hash), - "expected_parent" => format!("{}", expected_hash), + "peer_id" => %peer, + "received_block" => %block_hash, + "expected_parent" => %expected_hash, ); // We try again, but downvote the peer. @@ -772,7 +772,7 @@ impl SyncManager { error!( self.log, "Failed to send chain segment to processor"; - "error" => format!("{:?}", e) + "error" => ?e ); } } @@ -782,9 +782,9 @@ impl SyncManager { // us the last block warn!( self.log, "Invalid parent chain"; - "score_adjustment" => PeerAction::MidToleranceError.to_string(), - "outcome" => format!("{:?}", outcome), - "last_peer" => parent_request.last_submitted_peer.to_string(), + "score_adjustment" => %PeerAction::MidToleranceError, + "outcome" => ?outcome, + "last_peer" => %parent_request.last_submitted_peer, ); // Add this chain to cache of failed chains @@ -827,7 +827,7 @@ impl SyncManager { }; debug!(self.log, "Parent import failed"; - "block" => format!("{:?}",parent_request.downloaded_blocks[0].canonical_root()), + "block" => ?parent_request.downloaded_blocks[0].canonical_root(), "ancestors_found" => parent_request.downloaded_blocks.len(), "reason" => error ); diff --git a/beacon_node/network/src/sync/network_context.rs b/beacon_node/network/src/sync/network_context.rs index 715344eb18..c9b0ee058d 100644 --- a/beacon_node/network/src/sync/network_context.rs +++ b/beacon_node/network/src/sync/network_context.rs @@ -87,7 +87,7 @@ impl SyncNetworkContext { request: BlocksByRangeRequest, chain_id: ChainId, batch_id: BatchId, - ) -> Result<(), &'static str> { + ) -> Result { trace!( self.log, "Sending BlocksByRange Request"; @@ -97,7 +97,7 @@ impl SyncNetworkContext { ); let req_id = self.send_rpc_request(peer_id, Request::BlocksByRange(request))?; self.range_requests.insert(req_id, (chain_id, batch_id)); - Ok(()) + Ok(req_id) } pub fn blocks_by_range_response( diff --git a/beacon_node/network/src/sync/range_sync/batch.rs b/beacon_node/network/src/sync/range_sync/batch.rs index 532dafd2e4..aa863576fc 100644 --- a/beacon_node/network/src/sync/range_sync/batch.rs +++ b/beacon_node/network/src/sync/range_sync/batch.rs @@ -1,3 +1,4 @@ +use crate::sync::RequestId; use eth2_libp2p::rpc::methods::BlocksByRangeRequest; use eth2_libp2p::PeerId; use ssz::Encode; @@ -32,7 +33,7 @@ pub enum BatchState { /// The batch has failed either downloading or processing, but can be requested again. AwaitingDownload, /// The batch is being downloaded. - Downloading(PeerId, Vec>), + Downloading(PeerId, Vec>, RequestId), /// The batch has been completely downloaded and is ready for processing. AwaitingProcessing(PeerId, Vec>), /// The batch is being processed. @@ -99,7 +100,7 @@ impl BatchInfo { pub fn current_peer(&self) -> Option<&PeerId> { match &self.state { BatchState::AwaitingDownload | BatchState::Failed => None, - BatchState::Downloading(peer_id, _) + BatchState::Downloading(peer_id, _, _) | BatchState::AwaitingProcessing(peer_id, _) | BatchState::Processing(Attempt { peer_id, .. }) | BatchState::AwaitingValidation(Attempt { peer_id, .. }) => Some(&peer_id), @@ -126,9 +127,9 @@ impl BatchInfo { /// Adds a block to a downloading batch. pub fn add_block(&mut self, block: SignedBeaconBlock) { match self.state.poison() { - BatchState::Downloading(peer, mut blocks) => { + BatchState::Downloading(peer, mut blocks, req_id) => { blocks.push(block); - self.state = BatchState::Downloading(peer, blocks) + self.state = BatchState::Downloading(peer, blocks, req_id) } other => unreachable!("Add block for batch in wrong state: {:?}", other), } @@ -148,7 +149,7 @@ impl BatchInfo { ), > { match self.state.poison() { - BatchState::Downloading(peer, blocks) => { + BatchState::Downloading(peer, blocks, _request_id) => { // verify that blocks are in range if let Some(last_slot) = blocks.last().map(|b| b.slot()) { // the batch is non-empty @@ -189,7 +190,7 @@ impl BatchInfo { #[must_use = "Batch may have failed"] pub fn download_failed(&mut self) -> &BatchState { match self.state.poison() { - BatchState::Downloading(peer, _) => { + BatchState::Downloading(peer, _, _request_id) => { // register the attempt and check if the batch can be tried again self.failed_download_attempts.push(peer); self.state = if self.failed_download_attempts.len() @@ -206,10 +207,10 @@ impl BatchInfo { } } - pub fn start_downloading_from_peer(&mut self, peer: PeerId) { + pub fn start_downloading_from_peer(&mut self, peer: PeerId, request_id: RequestId) { match self.state.poison() { BatchState::AwaitingDownload => { - self.state = BatchState::Downloading(peer, Vec::new()); + self.state = BatchState::Downloading(peer, Vec::new(), request_id); } other => unreachable!("Starting download for batch in wrong state: {:?}", other), } @@ -333,9 +334,13 @@ impl std::fmt::Debug for BatchState { BatchState::AwaitingProcessing(ref peer, ref blocks) => { write!(f, "AwaitingProcessing({}, {} blocks)", peer, blocks.len()) } - BatchState::Downloading(peer, blocks) => { - write!(f, "Downloading({}, {} blocks)", peer, blocks.len()) - } + BatchState::Downloading(peer, blocks, request_id) => write!( + f, + "Downloading({}, {} blocks, {})", + peer, + blocks.len(), + request_id + ), BatchState::Poisoned => f.write_str("Poisoned"), } } diff --git a/beacon_node/network/src/sync/range_sync/chain.rs b/beacon_node/network/src/sync/range_sync/chain.rs index 4decdc212a..755f9e0e00 100644 --- a/beacon_node/network/src/sync/range_sync/chain.rs +++ b/beacon_node/network/src/sync/range_sync/chain.rs @@ -1,7 +1,7 @@ use super::batch::{BatchInfo, BatchState}; use crate::beacon_processor::ProcessId; use crate::beacon_processor::WorkEvent as BeaconWorkEvent; -use crate::sync::{network_context::SyncNetworkContext, BatchProcessResult}; +use crate::sync::{network_context::SyncNetworkContext, BatchProcessResult, RequestId}; use beacon_chain::{BeaconChain, BeaconChainTypes}; use eth2_libp2p::{PeerAction, PeerId}; use fnv::FnvHashMap; @@ -75,9 +75,9 @@ pub struct SyncingChain { /// If a block is imported for this batch, the chain advances to this point. optimistic_start: Option, - /// When a batch for an optimistic start fails processing, it is stored to avoid trying it - /// again due to chain stopping/re-starting on chain switching. - failed_optimistic_starts: HashSet, + /// When a batch for an optimistic start is tried (either successful or not), it is stored to + /// avoid trying it again due to chain stopping/re-starting on chain switching. + attempted_optimistic_starts: HashSet, /// The current state of the chain. pub state: ChainSyncingState, @@ -135,7 +135,7 @@ impl SyncingChain { to_be_downloaded: start_epoch, processing_target: start_epoch, optimistic_start: None, - failed_optimistic_starts: HashSet::default(), + attempted_optimistic_starts: HashSet::default(), state: ChainSyncingState::Stopped, current_processing_batch: None, beacon_processor_send, @@ -200,7 +200,8 @@ impl SyncingChain { &mut self, network: &mut SyncNetworkContext, batch_id: BatchId, - peer_id: PeerId, + peer_id: &PeerId, + request_id: RequestId, beacon_block: Option>, ) -> ProcessingResult { // check if we have this batch @@ -213,9 +214,14 @@ impl SyncingChain { Some(batch) => { // A batch could be retried without the peer failing the request (disconnecting/ // sending an error /timeout) if the peer is removed from the chain for other - // reasons. Check that this block belongs to the expected peer - if Some(&peer_id) != batch.current_peer() { - return ProcessingResult::KeepChain; + // reasons. Check that this block belongs to the expected peer, and that the + // request_id matches + if let BatchState::Downloading(expected_peer, _, expected_request_id) = + batch.state() + { + if expected_peer != peer_id || expected_request_id != &request_id { + return ProcessingResult::KeepChain; + } } batch } @@ -228,11 +234,9 @@ impl SyncingChain { } else { // A stream termination has been sent. This batch has ended. Process a completed batch. // Remove the request from the peer's active batches - let peer = batch - .current_peer() - .expect("Batch is downloading from a peer"); + self.peers - .get_mut(peer) + .get_mut(peer_id) .unwrap_or_else(|| panic!("Batch is registered for the peer")) .remove(&batch_id); @@ -442,6 +446,8 @@ impl SyncingChain { // blocks. if *was_non_empty { self.advance_chain(network, batch_id); + // we register so that on chain switching we don't try it again + self.attempted_optimistic_starts.insert(batch_id); } else if let Some(epoch) = self.optimistic_start { // check if this batch corresponds to an optimistic batch. In this case, we // reject it as an optimistic candidate since the batch was empty @@ -520,9 +526,8 @@ impl SyncingChain { redownload: bool, reason: &str, ) -> ProcessingResult { - if let Some(epoch) = self.optimistic_start { - self.optimistic_start = None; - self.failed_optimistic_starts.insert(epoch); + if let Some(epoch) = self.optimistic_start.take() { + self.attempted_optimistic_starts.insert(epoch); // if this batch is inside the current processing range, keep it, otherwise drop // it. NOTE: this is done to prevent non-sequential batches coming from optimistic // starts from filling up the buffer size @@ -628,7 +633,7 @@ impl SyncingChain { self.start_epoch = validating_epoch; self.to_be_downloaded = self.to_be_downloaded.max(validating_epoch); if self.batches.contains_key(&self.to_be_downloaded) { - // if a chain is advanced by Range beyond the previous `seld.to_be_downloaded`, we + // if a chain is advanced by Range beyond the previous `self.to_be_downloaded`, we // won't have this batch, so we need to request it. self.to_be_downloaded += EPOCHS_PER_BATCH; } @@ -732,8 +737,8 @@ impl SyncingChain { // advance the chain to the new validating epoch self.advance_chain(network, validating_epoch); if self.optimistic_start.is_none() - && optimistic_epoch > self.start_epoch - && !self.failed_optimistic_starts.contains(&optimistic_epoch) + && optimistic_epoch > self.processing_target + && !self.attempted_optimistic_starts.contains(&optimistic_epoch) { self.optimistic_start = Some(optimistic_epoch); } @@ -782,21 +787,21 @@ impl SyncingChain { &mut self, network: &mut SyncNetworkContext, batch_id: BatchId, - peer_id: PeerId, + peer_id: &PeerId, + request_id: RequestId, ) -> ProcessingResult { if let Some(batch) = self.batches.get_mut(&batch_id) { // A batch could be retried without the peer failing the request (disconnecting/ // sending an error /timeout) if the peer is removed from the chain for other // reasons. Check that this block belongs to the expected peer - if Some(&peer_id) != batch.current_peer() { - return ProcessingResult::KeepChain; + if let BatchState::Downloading(expected_peer, _, expected_request_id) = batch.state() { + if expected_peer != peer_id || expected_request_id != &request_id { + return ProcessingResult::KeepChain; + } } debug!(self.log, "Batch failed. RPC Error"; "batch_epoch" => batch_id); - let failed_peer = batch - .current_peer() - .expect("Batch is downloading from a peer"); self.peers - .get_mut(failed_peer) + .get_mut(peer_id) .expect("Peer belongs to the chain") .remove(&batch_id); if let BatchState::Failed = batch.download_failed() { @@ -851,10 +856,10 @@ impl SyncingChain { ) -> ProcessingResult { if let Some(batch) = self.batches.get_mut(&batch_id) { let request = batch.to_blocks_by_range_request(); - // inform the batch about the new request - batch.start_downloading_from_peer(peer.clone()); match network.blocks_by_range_request(peer.clone(), request, self.id, batch_id) { - Ok(()) => { + Ok(request_id) => { + // inform the batch about the new request + batch.start_downloading_from_peer(peer.clone(), request_id); if self .optimistic_start .map(|epoch| epoch == batch_id) @@ -876,6 +881,7 @@ impl SyncingChain { warn!(self.log, "Could not send batch request"; "batch_id" => batch_id, "error" => e, &batch); // register the failed download and check if the batch can be retried + batch.start_downloading_from_peer(peer.clone(), 1); // fake request_id is not relevant self.peers .get_mut(&peer) .expect("peer belongs to the peer pool") diff --git a/beacon_node/network/src/sync/range_sync/range.rs b/beacon_node/network/src/sync/range_sync/range.rs index 6847838e04..48a9bd5d40 100644 --- a/beacon_node/network/src/sync/range_sync/range.rs +++ b/beacon_node/network/src/sync/range_sync/range.rs @@ -214,7 +214,7 @@ impl RangeSync { { // check if this chunk removes the chain match self.chains.call_by_id(chain_id, |chain| { - chain.on_block_response(network, batch_id, peer_id, beacon_block) + chain.on_block_response(network, batch_id, &peer_id, request_id, beacon_block) }) { Ok((removed_chain, sync_type)) => { if let Some(removed_chain) = removed_chain { @@ -228,7 +228,7 @@ impl RangeSync { } } } else { - warn!(self.log, "Response/Error for non registered request"; "request_id" => request_id) + debug!(self.log, "Response/Error for non registered request"; "request_id" => request_id) } } @@ -337,7 +337,7 @@ impl RangeSync { if let Some((chain_id, batch_id)) = network.blocks_by_range_response(request_id, true) { // check that this request is pending match self.chains.call_by_id(chain_id, |chain| { - chain.inject_error(network, batch_id, peer_id) + chain.inject_error(network, batch_id, &peer_id, request_id) }) { Ok((removed_chain, sync_type)) => { if let Some(removed_chain) = removed_chain {