diff --git a/beacon_node/network/src/sync/block_lookups/mod.rs b/beacon_node/network/src/sync/block_lookups/mod.rs index 5c2bc65229..aa2694769c 100644 --- a/beacon_node/network/src/sync/block_lookups/mod.rs +++ b/beacon_node/network/src/sync/block_lookups/mod.rs @@ -1,4 +1,5 @@ use std::collections::hash_map::Entry; +use std::collections::HashMap; use std::time::Duration; use beacon_chain::{BeaconChainTypes, BlockError}; @@ -13,6 +14,7 @@ use store::{Hash256, SignedBeaconBlock}; use crate::beacon_processor::{ChainSegmentProcessId, WorkEvent}; use crate::metrics; +use self::parent_lookup::PARENT_FAIL_TOLERANCE; use self::{ parent_lookup::{ParentLookup, VerifyError}, single_block_lookup::SingleBlockRequest, @@ -36,8 +38,11 @@ const FAILED_CHAINS_CACHE_EXPIRY_SECONDS: u64 = 60; const SINGLE_BLOCK_LOOKUP_MAX_ATTEMPTS: u8 = 3; pub(crate) struct BlockLookups { - /// A collection of parent block lookups. - parent_queue: SmallVec<[ParentLookup; 3]>, + /// Parent chain lookups being downloaded. + parent_lookups: SmallVec<[ParentLookup; 3]>, + + processing_parent_lookups: + HashMap, SingleBlockRequest)>, /// A cache of failed chain lookups to prevent duplicate searches. failed_chains: LRUTimeCache, @@ -55,7 +60,8 @@ pub(crate) struct BlockLookups { impl BlockLookups { pub fn new(log: Logger) -> Self { Self { - parent_queue: Default::default(), + parent_lookups: Default::default(), + processing_parent_lookups: Default::default(), failed_chains: LRUTimeCache::new(Duration::from_secs( FAILED_CHAINS_CACHE_EXPIRY_SECONDS, )), @@ -78,6 +84,23 @@ impl BlockLookups { return; } + if self.parent_lookups.iter_mut().any(|parent_req| { + parent_req.add_peer(&hash, &peer_id) || parent_req.contains_block(&hash) + }) { + // If the block was already downloaded, or is being downloaded in this moment, do not + // request it. 
+ return; + } + + if self + .processing_parent_lookups + .values() + .any(|(hashes, _last_parent_request)| hashes.contains(&hash)) + { + // we are already processing this block, ignore it. + return; + } + debug!( self.log, "Searching for block"; @@ -118,8 +141,8 @@ impl BlockLookups { // Make sure this block is not already downloaded, and that neither it or its parent is // being searched for. - if self.parent_queue.iter_mut().any(|parent_req| { - parent_req.contains_block(&block) + if self.parent_lookups.iter_mut().any(|parent_req| { + parent_req.contains_block(&block_root) || parent_req.add_peer(&block_root, &peer_id) || parent_req.add_peer(&parent_root, &peer_id) }) { @@ -127,6 +150,15 @@ impl BlockLookups { return; } + if self + .processing_parent_lookups + .values() + .any(|(hashes, _peers)| hashes.contains(&block_root) || hashes.contains(&parent_root)) + { + // we are already processing this block, ignore it. + return; + } + let parent_lookup = ParentLookup::new(block_root, block, peer_id); self.request_parent(parent_lookup, cx); } @@ -207,11 +239,11 @@ impl BlockLookups { cx: &mut SyncNetworkContext, ) { let mut parent_lookup = if let Some(pos) = self - .parent_queue + .parent_lookups .iter() .position(|request| request.pending_response(id)) { - self.parent_queue.remove(pos) + self.parent_lookups.remove(pos) } else { if block.is_some() { debug!(self.log, "Response for a parent lookup request that was not found"; "peer_id" => %peer_id); @@ -233,13 +265,13 @@ impl BlockLookups { ) .is_ok() { - self.parent_queue.push(parent_lookup) + self.parent_lookups.push(parent_lookup) } } Ok(None) => { // Request finished successfully, nothing else to do. It will be removed after the // processing result arrives. 
- self.parent_queue.push(parent_lookup); + self.parent_lookups.push(parent_lookup); } Err(e) => match e { VerifyError::RootMismatch @@ -276,7 +308,7 @@ impl BlockLookups { metrics::set_gauge( &metrics::SYNC_PARENT_BLOCK_LOOKUPS, - self.parent_queue.len() as i64, + self.parent_lookups.len() as i64, ); } @@ -324,11 +356,11 @@ impl BlockLookups { /* Check disconnection for parent lookups */ while let Some(pos) = self - .parent_queue + .parent_lookups .iter_mut() .position(|req| req.check_peer_disconnected(peer_id).is_err()) { - let parent_lookup = self.parent_queue.remove(pos); + let parent_lookup = self.parent_lookups.remove(pos); trace!(self.log, "Parent lookup's peer disconnected"; &parent_lookup); self.request_parent(parent_lookup, cx); } @@ -342,11 +374,11 @@ impl BlockLookups { cx: &mut SyncNetworkContext, ) { if let Some(pos) = self - .parent_queue + .parent_lookups .iter() .position(|request| request.pending_response(id)) { - let mut parent_lookup = self.parent_queue.remove(pos); + let mut parent_lookup = self.parent_lookups.remove(pos); parent_lookup.download_failed(); trace!(self.log, "Parent lookup request failed"; &parent_lookup); self.request_parent(parent_lookup, cx); @@ -355,7 +387,7 @@ impl BlockLookups { }; metrics::set_gauge( &metrics::SYNC_PARENT_BLOCK_LOOKUPS, - self.parent_queue.len() as i64, + self.parent_lookups.len() as i64, ); } @@ -470,7 +502,7 @@ impl BlockLookups { cx: &mut SyncNetworkContext, ) { let (mut parent_lookup, peer_id) = if let Some((pos, peer)) = self - .parent_queue + .parent_lookups .iter() .enumerate() .find_map(|(pos, request)| { @@ -478,7 +510,7 @@ impl BlockLookups { .get_processing_peer(chain_hash) .map(|peer| (pos, peer)) }) { - (self.parent_queue.remove(pos), peer) + (self.parent_lookups.remove(pos), peer) } else { return debug!(self.log, "Process response for a parent lookup request that was not found"; "chain_hash" => %chain_hash); }; @@ -520,13 +552,13 @@ impl BlockLookups { ); } }; - let chain_hash = 
parent_lookup.chain_hash(); - let blocks = parent_lookup.chain_blocks(); + let (chain_hash, blocks, hashes, request) = parent_lookup.parts_for_processing(); let process_id = ChainSegmentProcessId::ParentLookup(chain_hash); match beacon_processor_send.try_send(WorkEvent::chain_segment(process_id, blocks)) { Ok(_) => { - self.parent_queue.push(parent_lookup); + self.processing_parent_lookups + .insert(chain_hash, (hashes, request)); } Err(e) => { error!( @@ -580,7 +612,7 @@ impl BlockLookups { metrics::set_gauge( &metrics::SYNC_PARENT_BLOCK_LOOKUPS, - self.parent_queue.len() as i64, + self.parent_lookups.len() as i64, ); } @@ -590,14 +622,11 @@ impl BlockLookups { result: BatchProcessResult, cx: &mut SyncNetworkContext, ) { - let parent_lookup = if let Some(pos) = self - .parent_queue - .iter() - .position(|request| request.chain_hash() == chain_hash) - { - self.parent_queue.remove(pos) - } else { - return debug!(self.log, "Chain process response for a parent lookup request that was not found"; "chain_hash" => %chain_hash); + let request = match self.processing_parent_lookups.remove(&chain_hash) { + Some((_hashes, request)) => request, + None => { + return debug!(self.log, "Chain process response for a parent lookup request that was not found"; "chain_hash" => %chain_hash, "result" => ?result) + } }; debug!(self.log, "Parent chain processed"; "chain_hash" => %chain_hash, "result" => ?result); @@ -609,8 +638,8 @@ impl BlockLookups { imported_blocks: _, penalty, } => { - self.failed_chains.insert(parent_lookup.chain_hash()); - for &peer_id in parent_lookup.used_peers() { + self.failed_chains.insert(chain_hash); + for peer_id in request.used_peers { cx.report_peer(peer_id, penalty, "parent_chain_failure") } } @@ -621,7 +650,7 @@ impl BlockLookups { metrics::set_gauge( &metrics::SYNC_PARENT_BLOCK_LOOKUPS, - self.parent_queue.len() as i64, + self.parent_lookups.len() as i64, ); } @@ -697,14 +726,14 @@ impl BlockLookups { } Ok(_) => { debug!(self.log, "Requesting parent"; 
&parent_lookup); - self.parent_queue.push(parent_lookup) + self.parent_lookups.push(parent_lookup) } } // We remove and add back again requests so we want this updated regardless of outcome. metrics::set_gauge( &metrics::SYNC_PARENT_BLOCK_LOOKUPS, - self.parent_queue.len() as i64, + self.parent_lookups.len() as i64, ); } @@ -715,6 +744,6 @@ impl BlockLookups { /// Drops all the parent chain requests and returns how many requests were dropped. pub fn drop_parent_chain_requests(&mut self) -> usize { - self.parent_queue.drain(..).len() + self.parent_lookups.drain(..).len() } } diff --git a/beacon_node/network/src/sync/block_lookups/parent_lookup.rs b/beacon_node/network/src/sync/block_lookups/parent_lookup.rs index 38ad59ebc4..a2c2f1d1ce 100644 --- a/beacon_node/network/src/sync/block_lookups/parent_lookup.rs +++ b/beacon_node/network/src/sync/block_lookups/parent_lookup.rs @@ -24,7 +24,7 @@ pub(crate) struct ParentLookup { /// The root of the block triggering this parent request. chain_hash: Hash256, /// The blocks that have currently been downloaded. - downloaded_blocks: Vec>>, + downloaded_blocks: Vec>, /// Request of the last parent. current_parent_request: SingleBlockRequest, /// Id of the last parent request. 
@@ -53,10 +53,10 @@ pub enum RequestError { } impl ParentLookup { - pub fn contains_block(&self, block: &SignedBeaconBlock) -> bool { + pub fn contains_block(&self, block_root: &Hash256) -> bool { self.downloaded_blocks .iter() - .any(|d_block| d_block.as_ref() == block) + .any(|(root, _d_block)| root == block_root) } pub fn new( @@ -68,7 +68,7 @@ impl ParentLookup { Self { chain_hash: block_root, - downloaded_blocks: vec![block], + downloaded_blocks: vec![(block_root, block)], current_parent_request, current_parent_request_id: None, } @@ -100,7 +100,8 @@ impl ParentLookup { pub fn add_block(&mut self, block: Arc>) { let next_parent = block.parent_root(); - self.downloaded_blocks.push(block); + let current_root = self.current_parent_request.hash; + self.downloaded_blocks.push((current_root, block)); self.current_parent_request.hash = next_parent; self.current_parent_request.state = single_block_lookup::State::AwaitingDownload; self.current_parent_request_id = None; @@ -110,6 +111,32 @@ impl ParentLookup { self.current_parent_request_id == Some(req_id) } + /// Consumes the parent request and destructures it into its parts. + #[allow(clippy::type_complexity)] + pub fn parts_for_processing( + self, + ) -> ( + Hash256, + Vec>>, + Vec, + SingleBlockRequest, + ) { + let ParentLookup { + chain_hash, + downloaded_blocks, + current_parent_request, + current_parent_request_id: _, + } = self; + let block_count = downloaded_blocks.len(); + let mut blocks = Vec::with_capacity(block_count); + let mut hashes = Vec::with_capacity(block_count); + for (hash, block) in downloaded_blocks { + blocks.push(block); + hashes.push(hash); + } + (chain_hash, blocks, hashes, current_parent_request) + } + /// Get the parent lookup's chain hash. 
pub fn chain_hash(&self) -> Hash256 { self.chain_hash @@ -125,10 +152,6 @@ impl ParentLookup { self.current_parent_request_id = None; } - pub fn chain_blocks(&mut self) -> Vec>> { - std::mem::take(&mut self.downloaded_blocks) - } - /// Verifies that the received block is what we requested. If so, parent lookup now waits for /// the processing result of the block. pub fn verify_block( diff --git a/beacon_node/network/src/sync/block_lookups/tests.rs b/beacon_node/network/src/sync/block_lookups/tests.rs index 64a1a6e836..8ade622f8d 100644 --- a/beacon_node/network/src/sync/block_lookups/tests.rs +++ b/beacon_node/network/src/sync/block_lookups/tests.rs @@ -259,7 +259,7 @@ fn test_single_block_lookup_becomes_parent_request() { assert_eq!(bl.single_block_lookups.len(), 0); rig.expect_parent_request(); rig.expect_empty_network(); - assert_eq!(bl.parent_queue.len(), 1); + assert_eq!(bl.parent_lookups.len(), 1); } #[test] @@ -287,7 +287,7 @@ fn test_parent_lookup_happy_path() { was_non_empty: true, }; bl.parent_chain_processed(chain_hash, process_result, &mut cx); - assert_eq!(bl.parent_queue.len(), 0); + assert_eq!(bl.parent_lookups.len(), 0); } #[test] @@ -324,7 +324,7 @@ fn test_parent_lookup_wrong_response() { was_non_empty: true, }; bl.parent_chain_processed(chain_hash, process_result, &mut cx); - assert_eq!(bl.parent_queue.len(), 0); + assert_eq!(bl.parent_lookups.len(), 0); } #[test] @@ -356,7 +356,7 @@ fn test_parent_lookup_empty_response() { was_non_empty: true, }; bl.parent_chain_processed(chain_hash, process_result, &mut cx); - assert_eq!(bl.parent_queue.len(), 0); + assert_eq!(bl.parent_lookups.len(), 0); } #[test] @@ -387,7 +387,7 @@ fn test_parent_lookup_rpc_failure() { was_non_empty: true, }; bl.parent_chain_processed(chain_hash, process_result, &mut cx); - assert_eq!(bl.parent_queue.len(), 0); + assert_eq!(bl.parent_lookups.len(), 0); } #[test] @@ -419,11 +419,11 @@ fn test_parent_lookup_too_many_attempts() { } } if i < parent_lookup::PARENT_FAIL_TOLERANCE 
{ - assert_eq!(bl.parent_queue[0].failed_attempts(), dbg!(i)); + assert_eq!(bl.parent_lookups[0].failed_attempts(), dbg!(i)); } } - assert_eq!(bl.parent_queue.len(), 0); + assert_eq!(bl.parent_lookups.len(), 0); } #[test] @@ -450,11 +450,11 @@ fn test_parent_lookup_too_many_download_attempts_no_blacklist() { rig.expect_penalty(); } if i < parent_lookup::PARENT_FAIL_TOLERANCE { - assert_eq!(bl.parent_queue[0].failed_attempts(), dbg!(i)); + assert_eq!(bl.parent_lookups[0].failed_attempts(), dbg!(i)); } } - assert_eq!(bl.parent_queue.len(), 0); + assert_eq!(bl.parent_lookups.len(), 0); assert!(!bl.failed_chains.contains(&block_hash)); assert!(!bl.failed_chains.contains(&parent.canonical_root())); } @@ -491,7 +491,7 @@ fn test_parent_lookup_too_many_processing_attempts_must_blacklist() { } assert!(bl.failed_chains.contains(&block_hash)); - assert_eq!(bl.parent_queue.len(), 0); + assert_eq!(bl.parent_lookups.len(), 0); } #[test] @@ -545,7 +545,7 @@ fn test_parent_lookup_disconnection() { &mut cx, ); bl.peer_disconnected(&peer_id, &mut cx); - assert!(bl.parent_queue.is_empty()); + assert!(bl.parent_lookups.is_empty()); } #[test] @@ -598,5 +598,78 @@ fn test_parent_lookup_ignored_response() { // Return an Ignored result. The request should be dropped bl.parent_block_processed(chain_hash, BlockProcessResult::Ignored, &mut cx); rig.expect_empty_network(); - assert_eq!(bl.parent_queue.len(), 0); + assert_eq!(bl.parent_lookups.len(), 0); +} + +/// This is a regression test. 
+#[test] +fn test_same_chain_race_condition() { + let (mut bl, mut cx, mut rig) = TestRig::test_setup(Some(Level::Debug)); + + #[track_caller] + fn parent_lookups_consistency(bl: &BlockLookups) { + let hashes: Vec<_> = bl + .parent_lookups + .iter() + .map(|req| req.chain_hash()) + .collect(); + let expected = hashes.len(); + assert_eq!( + expected, + hashes + .into_iter() + .collect::>() + .len(), + "duplicated chain hashes in parent queue" + ) + } + // if we use one or two blocks it will match on the hash or the parent hash, so make a longer + // chain. + let depth = 4; + let mut blocks = Vec::>>::with_capacity(depth); + while blocks.len() < depth { + let parent = blocks + .last() + .map(|b| b.canonical_root()) + .unwrap_or_else(Hash256::random); + let block = Arc::new(rig.block_with_parent(parent)); + blocks.push(block); + } + + let peer_id = PeerId::random(); + let trigger_block = blocks.pop().unwrap(); + let chain_hash = trigger_block.canonical_root(); + bl.search_parent(chain_hash, trigger_block.clone(), peer_id, &mut cx); + + for (i, block) in blocks.into_iter().rev().enumerate() { + let id = rig.expect_parent_request(); + // the block + bl.parent_lookup_response(id, peer_id, Some(block.clone()), D, &mut cx); + // the stream termination + bl.parent_lookup_response(id, peer_id, None, D, &mut cx); + // the processing request + rig.expect_block_process(); + // the processing result + if i + 2 == depth { + // one block was removed + bl.parent_block_processed(chain_hash, BlockError::BlockIsAlreadyKnown.into(), &mut cx) + } else { + bl.parent_block_processed(chain_hash, BlockError::ParentUnknown(block).into(), &mut cx) + } + parent_lookups_consistency(&bl) + } + + // Processing succeeds, now the rest of the chain should be sent for processing. + rig.expect_parent_chain_process(); + + // Try to get this block again while the chain is being processed. We should not request it again. 
+ let peer_id = PeerId::random(); + bl.search_parent(chain_hash, trigger_block, peer_id, &mut cx); + parent_lookups_consistency(&bl); + + let process_result = BatchProcessResult::Success { + was_non_empty: true, + }; + bl.parent_chain_processed(chain_hash, process_result, &mut cx); + assert_eq!(bl.parent_lookups.len(), 0); }