diff --git a/beacon_node/network/src/sync/block_lookups/mod.rs b/beacon_node/network/src/sync/block_lookups/mod.rs index 3bb4ece4ae..1c8c9ab9c7 100644 --- a/beacon_node/network/src/sync/block_lookups/mod.rs +++ b/beacon_node/network/src/sync/block_lookups/mod.rs @@ -11,6 +11,7 @@ use std::fmt::Debug; use std::sync::Arc; use std::time::Duration; use store::Hash256; +use strum::Display; use types::blob_sidecar::FixedBlobSidecarList; use types::{BlobSidecar, SignedBeaconBlock, Slot}; @@ -87,10 +88,24 @@ pub enum ResponseType { Blob, } -#[derive(Debug, Copy, Clone)] -pub enum PeerShouldHave { - BlockAndBlobs, - Neither, +#[derive(Debug, PartialEq, Eq, Hash, Clone, Copy, Display)] +pub enum PeerSource { + Attestation(PeerId), + Gossip(PeerId), +} +impl PeerSource { + fn as_peer_id(&self) -> &PeerId { + match self { + PeerSource::Attestation(id) => id, + PeerSource::Gossip(id) => id, + } + } + fn to_peer_id(self) -> PeerId { + match self { + PeerSource::Attestation(id) => id, + PeerSource::Gossip(id) => id, + } + } } #[derive(Debug, Copy, Clone)] @@ -121,11 +136,10 @@ impl BlockLookups { pub fn search_block( &mut self, hash: Hash256, - peer_id: PeerId, - peer_usefulness: PeerShouldHave, + peer_source: PeerSource, cx: &mut SyncNetworkContext, ) { - self.search_block_with(|_| {}, hash, peer_id, peer_usefulness, cx) + self.search_block_with(|_| {}, hash, peer_source, cx) } /// Searches for a single block hash. If the blocks parent is unknown, a chain of blocks is @@ -134,8 +148,7 @@ impl BlockLookups { &mut self, cache_fn: impl Fn(&mut SingleBlockLookup), hash: Hash256, - peer_id: PeerId, - peer_usefulness: PeerShouldHave, + peer_source: PeerSource, cx: &mut SyncNetworkContext, ) { // Do not re-request a block that is already being requested @@ -143,15 +156,14 @@ impl BlockLookups { .single_block_lookups .iter_mut() .any(|(_, _, single_block_request)| { - single_block_request.add_peer_if_useful(&hash, &peer_id, peer_usefulness) + single_block_request.add_peer_if_useful(&hash, peer_source) }) { return; } if self.parent_lookups.iter_mut().any(|parent_req| { - parent_req.add_peer_if_useful(&hash, &peer_id, peer_usefulness) - || parent_req.contains_block(&hash) + parent_req.add_peer_if_useful(&hash, peer_source) || parent_req.contains_block(&hash) }) { // If the block was already downloaded, or is being downloaded in this moment, do not // request it. @@ -170,12 +182,12 @@ impl BlockLookups { debug!( self.log, "Searching for block"; - "peer_id" => %peer_id, + "peer_id" => %peer_source, "block" => %hash ); let mut single_block_request = - SingleBlockLookup::new(hash, peer_id, self.da_checker.clone()); + SingleBlockLookup::new(hash, peer_source, self.da_checker.clone()); cache_fn(&mut single_block_request); let block_request_id = @@ -213,8 +225,7 @@ impl BlockLookups { let _ = request.add_block_wrapper(block_root, block.clone()); }, block_root, - peer_id, - PeerShouldHave::Neither, + PeerSource::Gossip(peer_id), cx, ); } @@ -231,8 +242,7 @@ impl BlockLookups { let _ = request.add_blob(blob.clone()); }, block_root, - peer_id, - PeerShouldHave::Neither, + PeerSource::Gossip(peer_id), cx, ); } @@ -248,7 +258,7 @@ impl BlockLookups { cx: &mut SyncNetworkContext, ) { // Gossip blocks or blobs shouldn't be propogated if parents are unavailable. - let peer_usefulness = PeerShouldHave::BlockAndBlobs; + let peer_source = PeerSource::Attestation(peer_id); // If this block or it's parent is part of a known failed chain, ignore it. if self.failed_chains.contains(&parent_root) || self.failed_chains.contains(&block_root) { @@ -261,7 +271,7 @@ impl BlockLookups { // being searched for. if self.parent_lookups.iter_mut().any(|parent_req| { parent_req.contains_block(&block_root) - || parent_req.add_peer_if_useful(&block_root, &peer_id, peer_usefulness) + || parent_req.add_peer_if_useful(&block_root, peer_source) }) { // we are already searching for this block, ignore it return; @@ -276,8 +286,12 @@ impl BlockLookups { return; } - let parent_lookup = - ParentLookup::new(block_root, parent_root, peer_id, self.da_checker.clone()); + let parent_lookup = ParentLookup::new( + block_root, + parent_root, + peer_source, + self.da_checker.clone(), + ); self.request_parent_block_and_blobs(parent_lookup, cx); } @@ -518,8 +532,14 @@ impl BlockLookups { } } Ok(LookupDownloadStatus::SearchBlock(block_root)) => { - self.search_block(block_root, peer_id, PeerShouldHave::BlockAndBlobs, cx); - self.parent_lookups.push(parent_lookup) + if let Some(peer_source) = + parent_lookup.peer_source(ResponseType::Block, peer_id) + { + self.search_block(block_root, peer_source, cx); + self.parent_lookups.push(parent_lookup) + } else { + warn!(self.log, "Response from untracked peer"; "peer_id" => %peer_id, "block_root" => ?block_root); + } } Err(e) => {} } @@ -614,8 +634,14 @@ impl BlockLookups { } } LookupDownloadStatus::SearchBlock(block_root) => { - self.search_block(block_root, peer_id, PeerShouldHave::BlockAndBlobs, cx); - self.parent_lookups.push(parent_lookup) + if let Some(peer_source) = + parent_lookup.peer_source(ResponseType::Block, peer_id) + { + self.search_block(block_root, peer_source, cx); + self.parent_lookups.push(parent_lookup) + } else { + warn!(self.log, "Response from untracked peer"; "peer_id" => %peer_id, "block_root" => ?block_root); + } } } } @@ -834,8 +860,7 @@ impl BlockLookups { ShouldRemoveLookup::True } AvailabilityProcessingStatus::MissingComponents(_, block_root) => { - // At this point we don't know what the peer *should* have. - self.search_block(block_root, peer_id, PeerShouldHave::Neither, cx); + self.search_block(block_root, peer_id, cx); ShouldRemoveLookup::False } }, @@ -862,7 +887,13 @@ impl BlockLookups { ShouldRemoveLookup::True } BlockError::ParentUnknown(block) => { - self.search_parent(block.slot(), root, block.parent_root(), peer_id, cx); + self.search_parent( + block.slot(), + root, + block.parent_root(), + peer_id.to_peer_id(), + cx, + ); ShouldRemoveLookup::False } ref e @ BlockError::ExecutionPayloadError(ref epe) if !epe.penalize_peer() => { @@ -879,7 +910,7 @@ impl BlockLookups { other => { warn!(self.log, "Peer sent invalid block in single block lookup"; "root" => %root, "error" => ?other, "peer_id" => %peer_id); cx.report_peer( - peer_id, + peer_id.to_peer_id(), PeerAction::MidToleranceError, "single_block_failure", ); @@ -888,7 +919,7 @@ impl BlockLookups { request_id_ref, request_ref, response_type, - &peer_id, + peer_id.as_peer_id(), cx, &self.log, ) @@ -961,7 +992,7 @@ impl BlockLookups { _, block_root, )) => { - self.search_block(block_root, peer_id, PeerShouldHave::BlockAndBlobs, cx); + self.search_block(block_root, peer_id, cx); } BlockProcessingResult::Err(BlockError::ParentUnknown(block)) => { parent_lookup.add_block_wrapper(block); @@ -1024,7 +1055,11 @@ impl BlockLookups { // This currently can be a host of errors. We permit this due to the partial // ambiguity. - cx.report_peer(peer_id, PeerAction::MidToleranceError, "parent_request_err"); + cx.report_peer( + peer_id.to_peer_id(), + PeerAction::MidToleranceError, + "parent_request_err", + ); // Try again if possible match response_type { @@ -1115,8 +1150,8 @@ impl BlockLookups { self.failed_chains.insert(chain_hash); let mut all_peers = request.block_request_state.used_peers.clone(); all_peers.extend(request.blob_request_state.used_peers); - for peer_id in all_peers { - cx.report_peer(peer_id, penalty, "parent_chain_failure") + for peer_source in all_peers { + cx.report_peer(peer_source, penalty, "parent_chain_failure") } } BatchProcessResult::NonFaultyFailure => { diff --git a/beacon_node/network/src/sync/block_lookups/parent_lookup.rs b/beacon_node/network/src/sync/block_lookups/parent_lookup.rs index 657982b17a..b65f36e588 100644 --- a/beacon_node/network/src/sync/block_lookups/parent_lookup.rs +++ b/beacon_node/network/src/sync/block_lookups/parent_lookup.rs @@ -1,5 +1,5 @@ use super::single_block_lookup::{LookupRequestError, LookupVerifyError, SingleBlockLookup}; -use super::{DownloadedBlocks, PeerShouldHave, ResponseType}; +use super::{DownloadedBlocks, PeerSource, ResponseType}; use crate::sync::block_lookups::{single_block_lookup, RootBlobsTuple, RootBlockTuple}; use crate::sync::{ manager::{Id, SLOT_IMPORT_TOLERANCE}, @@ -76,7 +76,7 @@ impl ParentLookup { pub fn new( block_root: Hash256, parent_root: Hash256, - peer_id: PeerId, + peer_id: PeerSource, da_checker: Arc>, ) -> Self { let current_parent_request = SingleBlockLookup::new(parent_root, peer_id, da_checker); @@ -321,14 +321,9 @@ impl ParentLookup { .failed_attempts() } - pub fn add_peer_if_useful( - &mut self, - block_root: &Hash256, - peer_id: &PeerId, - peer_usefulness: PeerShouldHave, - ) -> bool { + pub fn add_peer_if_useful(&mut self, block_root: &Hash256, peer_source: PeerSource) -> bool { self.current_parent_request - .add_peer_if_useful(block_root, peer_id, peer_usefulness) + .add_peer_if_useful(block_root, peer_source) } pub fn used_peers(&self, response_type: ResponseType) -> impl Iterator + '_ { @@ -345,6 +340,15 @@ impl ParentLookup { .iter(), } } + + pub(crate) fn peer_source( + &self, + response_type: ResponseType, + peer_id: PeerId, + ) -> Option { + self.current_parent_request + .peer_source(response_type, peer_id) + } } impl From for ParentVerifyError { diff --git a/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs b/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs index ec47faa372..60f38f7ee4 100644 --- a/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs +++ b/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs @@ -14,7 +14,7 @@ use strum::IntoStaticStr; use types::blob_sidecar::{BlobIdentifier, FixedBlobSidecarList}; use types::{BlobSidecar, SignedBeaconBlock}; -use super::{PeerShouldHave, ResponseType}; +use super::{PeerSource, ResponseType}; pub struct SingleBlockLookup { pub requested_block_root: Hash256, @@ -33,21 +33,23 @@ pub struct SingleBlockLookup { pub struct SingleLookupRequestState { /// State of this request. pub state: State, - /// Peers that should have this block. + /// Peers that should have this block or blob. pub available_peers: HashSet, + /// Peers that mar or may not have this block or blob. + pub potential_peers: HashSet, /// Peers from which we have requested this block. pub used_peers: HashSet, - /// How many times have we attempted to process this block. + /// How many times have we attempted to process this block or blob. failed_processing: u8, - /// How many times have we attempted to download this block. + /// How many times have we attempted to download this block or blob. failed_downloading: u8, } #[derive(Debug, PartialEq, Eq)] pub enum State { AwaitingDownload, - Downloading { peer_id: PeerId }, - Processing { peer_id: PeerId }, + Downloading { peer_id: PeerSource }, + Processing { peer_id: PeerSource }, } #[derive(Debug, PartialEq, Eq, IntoStaticStr)] @@ -74,7 +76,7 @@ pub enum LookupRequestError { impl SingleBlockLookup { pub fn new( requested_block_root: Hash256, - peer_id: PeerId, + peer_source: PeerSource, da_checker: Arc>, ) -> Self { Self { @@ -82,8 +84,8 @@ impl SingleBlockLookup::default(), downloaded_block: None, downloaded_blobs: <_>::default(), - block_request_state: SingleLookupRequestState::new(peer_id), - blob_request_state: SingleLookupRequestState::new(peer_id), + block_request_state: SingleLookupRequestState::new(peer_source), + blob_request_state: SingleLookupRequestState::new(peer_source), da_checker, } } @@ -277,7 +279,6 @@ impl SingleBlockLookup match blob { Some(_) => { - dbg!("here"); // We sent the blob for processing and received an extra blob. self.blob_request_state.register_failure_downloading(); Err(LookupVerifyError::ExtraBlobsReturned) @@ -317,8 +318,26 @@ impl SingleBlockLookup SingleBlockLookup bool { + + pub fn add_peer_if_useful(&mut self, block_root: &Hash256, peer_source: PeerSource) -> bool { if *block_root != self.requested_block_root { return false; } - match peer_usefulness { - PeerShouldHave::BlockAndBlobs => { - self.block_request_state.add_peer(peer_id); - self.blob_request_state.add_peer(peer_id); + match peer_source { + PeerSource::Attestation(peer_id) => { + self.block_request_state.add_peer(&peer_id); + self.blob_request_state.add_peer(&peer_id); + } + PeerSource::Gossip(peer_id) => { + self.block_request_state.add_potential_peer(&peer_id); + self.blob_request_state.add_potential_peer(&peer_id); } - PeerShouldHave::Neither => {} } true } - pub fn processing_peer(&self, response_type: ResponseType) -> Result { + pub fn processing_peer(&self, response_type: ResponseType) -> Result { match response_type { ResponseType::Block => self.block_request_state.processing_peer(), ResponseType::Blob => self.blob_request_state.processing_peer(), } } + + pub(crate) fn peer_source( + &self, + response_type: ResponseType, + peer_id: PeerId, + ) -> Option { + match response_type { + ResponseType::Block => { + if self.block_request_state.available_peers.contains(&peer_id) { + Some(PeerSource::Attestation(peer_id)) + } else if self.block_request_state.potential_peers.contains(&peer_id) { + Some(PeerSource::Gossip(peer_id)) + } else { + None + } + } + ResponseType::Blob => { + if self.blob_request_state.available_peers.contains(&peer_id) { + Some(PeerSource::Attestation(peer_id)) + } else if self.blob_request_state.potential_peers.contains(&peer_id) { + Some(PeerSource::Gossip(peer_id)) + } else { + None + } + } + } + } } impl SingleLookupRequestState { - pub fn new(peer_id: PeerId) -> Self { + pub fn new(peer_source: PeerSource) -> Self { + let (available_peers, potential_peers) = match peer_source { + PeerSource::Attestation(peer_id) => (HashSet::from([peer_id]), HashSet::default()), + PeerSource::Gossip(peer_id) => (HashSet::default(), HashSet::from([peer_id])), + }; Self { state: State::AwaitingDownload, - available_peers: HashSet::from([peer_id]), + available_peers, + potential_peers, used_peers: HashSet::default(), failed_processing: 0, failed_downloading: 0, @@ -417,15 +486,23 @@ impl SingleLookupRequestState { self.failed_processing + self.failed_downloading } - pub fn add_peer(&mut self, peer_id: &PeerId) -> bool { - self.available_peers.insert(*peer_id) + pub fn add_peer(&mut self, peer_id: &PeerId) { + self.potential_peers.remove(peer_id); + self.available_peers.insert(*peer_id); + } + + pub fn add_potential_peer(&mut self, peer_id: &PeerId) { + if self.available_peers.contains(peer_id) { + self.potential_peers.insert(*peer_id); + } } /// If a peer disconnects, this request could be failed. If so, an error is returned pub fn check_peer_disconnected(&mut self, dc_peer_id: &PeerId) -> Result<(), ()> { self.available_peers.remove(dc_peer_id); + self.potential_peers.remove(dc_peer_id); if let State::Downloading { peer_id } = &self.state { - if peer_id == dc_peer_id { + if peer_id.as_peer_id() == dc_peer_id { // Peer disconnected before providing a block self.register_failure_downloading(); return Err(()); @@ -434,7 +511,7 @@ impl SingleLookupRequestState { Ok(()) } - pub fn processing_peer(&self) -> Result { + pub fn processing_peer(&self) -> Result { if let State::Processing { peer_id } = &self.state { Ok(*peer_id) } else { diff --git a/beacon_node/network/src/sync/block_lookups/tests.rs b/beacon_node/network/src/sync/block_lookups/tests.rs index fc129f2afe..d8743c97be 100644 --- a/beacon_node/network/src/sync/block_lookups/tests.rs +++ b/beacon_node/network/src/sync/block_lookups/tests.rs @@ -101,7 +101,13 @@ impl TestRig { // get random number between 0 and Max Blobs let mut payload: &mut FullPayloadDeneb = &mut message.body.execution_payload; let num_blobs = match num_blobs { - NumBlobs::Random => rand::random::() % E::max_blobs_per_block(), + NumBlobs::Random => { + let mut num_blobs = rand::random::() % E::max_blobs_per_block(); + if num_blobs == 0 { + num_blobs += 1; + } + num_blobs + } NumBlobs::None => 0, }; let (bundle, transactions) = execution_layer::test_utils::generate_random_blobs::( @@ -851,7 +857,6 @@ fn test_parent_lookup_too_deep() { // If we're in deneb, a blob request should have been triggered as well, // we don't require a response because we're generateing 0-blob blocks in this test. if matches!(fork_name, ForkName::Deneb) { - dbg!("here"); let _ = rig.expect_parent_request(ResponseType::Blob); } // the block @@ -1113,7 +1118,7 @@ mod deneb_only { } #[test] - fn test_single_block_lookup_happy_path() { + fn test_single_block_and_blob_lookup_happy_path() { let fork_name = get_fork_name(); if !matches!(fork_name, ForkName::Deneb) { return; @@ -1178,672 +1183,883 @@ mod deneb_only { assert_eq!(bl.single_block_lookups.len(), 0); } - // #[test] - // fn test_single_block_lookup_empty_response() { - // let response_type = ResponseType::Block; - // let (mut bl, mut cx, mut rig) = TestRig::test_setup(false); - // - // let block_hash = Hash256::random(); - // let peer_id = PeerId::random(); - // - // // Trigger the request - // bl.search_block(block_hash, peer_id, PeerShouldHave::BlockAndBlobs, &mut cx); - // let id = rig.expect_block_request(response_type); - // - // // The peer does not have the block. It should be penalized. - // bl.single_block_lookup_response(id, peer_id, None, D, &mut cx); - // rig.expect_penalty(); - // - // rig.expect_block_request(response_type); // it should be retried - // } - // - // #[test] - // fn test_single_block_lookup_wrong_response() { - // let fork_name = ForkName::Deneb; - // let response_type = ResponseType::Block; - // let (mut bl, mut cx, mut rig) = TestRig::test_setup(false); - // - // let block_hash = Hash256::random(); - // let peer_id = PeerId::random(); - // - // // Trigger the request - // bl.search_block(block_hash, peer_id, PeerShouldHave::BlockAndBlobs, &mut cx); - // let id = rig.expect_block_request(response_type); - // - // // Peer sends something else. It should be penalized. - // let bad_block = rig.rand_block(fork_name); - // bl.single_block_lookup_response(id, peer_id, Some(bad_block.into()), D, &mut cx); - // rig.expect_penalty(); - // rig.expect_block_request(response_type); // should be retried - // - // // Send the stream termination. This should not produce an additional penalty. - // bl.single_block_lookup_response(id, peer_id, None, D, &mut cx); - // rig.expect_empty_network(); - // } - // - // #[test] - // fn test_single_block_lookup_failure() { - // let response_type = ResponseType::Block; - // let (mut bl, mut cx, mut rig) = TestRig::test_setup(false); - // - // let block_hash = Hash256::random(); - // let peer_id = PeerId::random(); - // - // // Trigger the request - // bl.search_block(block_hash, peer_id, PeerShouldHave::BlockAndBlobs, &mut cx); - // let id = rig.expect_block_request(response_type); - // - // // The request fails. RPC failures are handled elsewhere so we should not penalize the peer. - // bl.single_block_lookup_failed(id, &peer_id, &mut cx, RPCError::UnsupportedProtocol); - // rig.expect_block_request(response_type); - // rig.expect_empty_network(); - // } - // - // #[test] - // fn test_single_block_lookup_becomes_parent_request() { - // let fork_name = ForkName::Deneb; - // let response_type = ResponseType::Block; - // let (mut bl, mut cx, mut rig) = TestRig::test_setup(false); - // - // let block = Arc::new(rig.rand_block(fork_name)); - // let peer_id = PeerId::random(); - // - // // Trigger the request - // bl.search_block( - // block.canonical_root(), - // peer_id, - // PeerShouldHave::BlockAndBlobs, - // &mut cx, - // ); - // let id = rig.expect_block_request(response_type); - // - // // The peer provides the correct block, should not be penalized. Now the block should be sent - // // for processing. - // bl.single_block_lookup_response(id, peer_id, Some(block.clone()), D, &mut cx); - // rig.expect_empty_network(); - // rig.expect_block_process(response_type); - // - // // The request should still be active. - // assert_eq!(bl.single_block_lookups.len(), 1); - // - // // Send the stream termination. Peer should have not been penalized, and the request moved to a - // // parent request after processing. - // bl.single_block_processed( - // id, - // BlockError::ParentUnknown(block.into()).into(), - // response_type, - // &mut cx, - // ); - // assert_eq!(bl.single_block_lookups.len(), 1); - // rig.expect_parent_request(response_type); - // rig.expect_empty_network(); - // assert_eq!(bl.parent_lookups.len(), 1); - // } - // - // #[test] - // fn test_parent_lookup_happy_path() { - // let fork_name = ForkName::Deneb; - // let response_type = ResponseType::Block; - // let (mut bl, mut cx, mut rig) = TestRig::test_setup(false); - // - // let parent = rig.rand_block(fork_name); - // let block = rig.block_with_parent(parent.canonical_root(), fork_name); - // let chain_hash = block.canonical_root(); - // let peer_id = PeerId::random(); - // let block_root = block.canonical_root(); - // let parent_root = block.parent_root(); - // let slot = block.slot(); - // - // // Trigger the request - // bl.search_parent(slot, block_root, parent_root, peer_id, &mut cx); - // let id = rig.expect_parent_request(response_type); - // - // // Peer sends the right block, it should be sent for processing. Peer should not be penalized. - // bl.parent_lookup_response(id, peer_id, Some(parent.into()), D, &mut cx); - // rig.expect_block_process(response_type); - // rig.expect_empty_network(); - // - // // Processing succeeds, now the rest of the chain should be sent for processing. - // bl.parent_block_processed( - // chain_hash, - // BlockError::BlockIsAlreadyKnown.into(), - // response_type, - // &mut cx, - // ); - // rig.expect_parent_chain_process(); - // let process_result = BatchProcessResult::Success { - // was_non_empty: true, - // }; - // bl.parent_chain_processed(chain_hash, process_result, &mut cx); - // assert_eq!(bl.parent_lookups.len(), 0); - // } - // - // #[test] - // fn test_parent_lookup_wrong_response() { - // let fork_name = ForkName::Deneb; - // let response_type = ResponseType::Block; - // let (mut bl, mut cx, mut rig) = TestRig::test_setup(false); - // - // let parent = rig.rand_block(fork_name); - // let block = rig.block_with_parent(parent.canonical_root(), fork_name); - // let chain_hash = block.canonical_root(); - // let peer_id = PeerId::random(); - // let block_root = block.canonical_root(); - // let parent_root = block.parent_root(); - // let slot = block.slot(); - // - // // Trigger the request - // bl.search_parent(slot, block_root, parent_root, peer_id, &mut cx); - // let id1 = rig.expect_parent_request(response_type); - // - // // Peer sends the wrong block, peer should be penalized and the block re-requested. - // let bad_block = rig.rand_block(fork_name); - // bl.parent_lookup_response(id1, peer_id, Some(bad_block.into()), D, &mut cx); - // rig.expect_penalty(); - // let id2 = rig.expect_parent_request(response_type); - // - // // Send the stream termination for the first request. This should not produce extra penalties. - // bl.parent_lookup_response(id1, peer_id, None, D, &mut cx); - // rig.expect_empty_network(); - // - // // Send the right block this time. - // bl.parent_lookup_response(id2, peer_id, Some(parent.into()), D, &mut cx); - // rig.expect_block_process(response_type); - // - // // Processing succeeds, now the rest of the chain should be sent for processing. - // bl.parent_block_processed( - // chain_hash, - // BlockProcessingResult::Ok(AvailabilityProcessingStatus::Imported(block_root)), - // response_type, - // &mut cx, - // ); - // rig.expect_parent_chain_process(); - // let process_result = BatchProcessResult::Success { - // was_non_empty: true, - // }; - // bl.parent_chain_processed(chain_hash, process_result, &mut cx); - // assert_eq!(bl.parent_lookups.len(), 0); - // } - // - // #[test] - // fn test_parent_lookup_empty_response() { - // let fork_name = ForkName::Deneb; - // let response_type = ResponseType::Block; - // let (mut bl, mut cx, mut rig) = TestRig::test_setup(false); - // - // let parent = rig.rand_block(fork_name); - // let block = rig.block_with_parent(parent.canonical_root(), fork_name); - // let chain_hash = block.canonical_root(); - // let peer_id = PeerId::random(); - // let block_root = block.canonical_root(); - // let parent_root = block.parent_root(); - // let slot = block.slot(); - // - // // Trigger the request - // bl.search_parent(slot, block_root, parent_root, peer_id, &mut cx); - // let id1 = rig.expect_parent_request(response_type); - // - // // Peer sends an empty response, peer should be penalized and the block re-requested. - // bl.parent_lookup_response(id1, peer_id, None, D, &mut cx); - // rig.expect_penalty(); - // let id2 = rig.expect_parent_request(response_type); - // - // // Send the right block this time. - // bl.parent_lookup_response(id2, peer_id, Some(parent.into()), D, &mut cx); - // rig.expect_block_process(response_type); - // - // // Processing succeeds, now the rest of the chain should be sent for processing. - // bl.parent_block_processed( - // chain_hash, - // BlockProcessingResult::Ok(AvailabilityProcessingStatus::Imported(block_root)), - // response_type, - // &mut cx, - // ); - // rig.expect_parent_chain_process(); - // let process_result = BatchProcessResult::Success { - // was_non_empty: true, - // }; - // bl.parent_chain_processed(chain_hash, process_result, &mut cx); - // assert_eq!(bl.parent_lookups.len(), 0); - // } - // - // #[test] - // fn test_parent_lookup_rpc_failure() { - // let fork_name = ForkName::Deneb; - // let response_type = ResponseType::Block; - // let (mut bl, mut cx, mut rig) = TestRig::test_setup(false); - // - // let parent = rig.rand_block(fork_name); - // let block = rig.block_with_parent(parent.canonical_root(), fork_name); - // let chain_hash = block.canonical_root(); - // let peer_id = PeerId::random(); - // let block_root = block.canonical_root(); - // let parent_root = block.parent_root(); - // let slot = block.slot(); - // - // // Trigger the request - // bl.search_parent(slot, block_root, parent_root, peer_id, &mut cx); - // let id1 = rig.expect_parent_request(response_type); - // - // // The request fails. It should be tried again. - // bl.parent_lookup_failed( - // id1, - // peer_id, - // &mut cx, - // RPCError::ErrorResponse( - // RPCResponseErrorCode::ResourceUnavailable, - // "older than deneb".into(), - // ), - // ); - // let id2 = rig.expect_parent_request(response_type); - // - // // Send the right block this time. - // bl.parent_lookup_response(id2, peer_id, Some(parent.into()), D, &mut cx); - // rig.expect_block_process(response_type); - // - // // Processing succeeds, now the rest of the chain should be sent for processing. - // bl.parent_block_processed( - // chain_hash, - // BlockProcessingResult::Ok(AvailabilityProcessingStatus::Imported(block_root)), - // response_type, - // &mut cx, - // ); - // rig.expect_parent_chain_process(); - // let process_result = BatchProcessResult::Success { - // was_non_empty: true, - // }; - // bl.parent_chain_processed(chain_hash, process_result, &mut cx); - // assert_eq!(bl.parent_lookups.len(), 0); - // } - // - // #[test] - // fn test_parent_lookup_too_many_attempts() { - // let fork_name = ForkName::Deneb; - // let response_type = ResponseType::Block; - // let (mut bl, mut cx, mut rig) = TestRig::test_setup(false); - // - // let parent = rig.rand_block(fork_name); - // let block = rig.block_with_parent(parent.canonical_root(), fork_name); - // let peer_id = PeerId::random(); - // let block_root = block.canonical_root(); - // let parent_root = block.parent_root(); - // let slot = block.slot(); - // - // // Trigger the request - // bl.search_parent(slot, block_root, parent_root, peer_id, &mut cx); - // for i in 1..=parent_lookup::PARENT_FAIL_TOLERANCE { - // let id = rig.expect_parent_request(response_type); - // match i % 2 { - // // make sure every error is accounted for - // 0 => { - // // The request fails. It should be tried again. - // bl.parent_lookup_failed( - // id, - // peer_id, - // &mut cx, - // RPCError::ErrorResponse( - // RPCResponseErrorCode::ResourceUnavailable, - // "older than deneb".into(), - // ), - // ); - // } - // _ => { - // // Send a bad block this time. It should be tried again. - // let bad_block = rig.rand_block(fork_name); - // bl.parent_lookup_response(id, peer_id, Some(bad_block.into()), D, &mut cx); - // // Send the stream termination - // bl.parent_lookup_response(id, peer_id, None, D, &mut cx); - // rig.expect_penalty(); - // } - // } - // if i < parent_lookup::PARENT_FAIL_TOLERANCE { - // assert_eq!(bl.parent_lookups[0].failed_block_attempts(), dbg!(i)); - // } - // } - // - // assert_eq!(bl.parent_lookups.len(), 0); - // } - // - // #[test] - // fn test_parent_lookup_too_many_download_attempts_no_blacklist() { - // let fork_name = ForkName::Deneb; - // let response_type = ResponseType::Block; - // let (mut bl, mut cx, mut rig) = TestRig::test_setup(false); - // - // let parent = rig.rand_block(fork_name); - // let block = rig.block_with_parent(parent.canonical_root(), fork_name); - // let block_hash = block.canonical_root(); - // let peer_id = PeerId::random(); - // let block_root = block.canonical_root(); - // let parent_root = block.parent_root(); - // let slot = block.slot(); - // - // // Trigger the request - // bl.search_parent(slot, block_root, parent_root, peer_id, &mut cx); - // for i in 1..=parent_lookup::PARENT_FAIL_TOLERANCE { - // assert!(!bl.failed_chains.contains(&block_hash)); - // let id = rig.expect_parent_request(response_type); - // if i % 2 != 0 { - // // The request fails. It should be tried again. - // bl.parent_lookup_failed( - // id, - // peer_id, - // &mut cx, - // RPCError::ErrorResponse( - // RPCResponseErrorCode::ResourceUnavailable, - // "older than deneb".into(), - // ), - // ); - // } else { - // // Send a bad block this time. It should be tried again. - // let bad_block = rig.rand_block(fork_name); - // bl.parent_lookup_response(id, peer_id, Some(bad_block.into()), D, &mut cx); - // rig.expect_penalty(); - // } - // if i < parent_lookup::PARENT_FAIL_TOLERANCE { - // assert_eq!(bl.parent_lookups[0].failed_block_attempts(), dbg!(i)); - // } - // } - // - // assert_eq!(bl.parent_lookups.len(), 0); - // assert!(!bl.failed_chains.contains(&block_hash)); - // assert!(!bl.failed_chains.contains(&parent.canonical_root())); - // } - // - // #[test] - // fn test_parent_lookup_too_many_processing_attempts_must_blacklist() { - // let fork_name = ForkName::Deneb; - // let response_type = ResponseType::Block; - // const PROCESSING_FAILURES: u8 = parent_lookup::PARENT_FAIL_TOLERANCE / 2 + 1; - // let (mut bl, mut cx, mut rig) = TestRig::test_setup(false); - // - // let parent = Arc::new(rig.rand_block(fork_name)); - // let block = rig.block_with_parent(parent.canonical_root(), fork_name); - // let peer_id = PeerId::random(); - // let block_root = block.canonical_root(); - // let parent_root = block.parent_root(); - // let slot = block.slot(); - // - // // Trigger the request - // bl.search_parent(slot, block_root, parent_root, peer_id, &mut cx); - // - // // Fail downloading the block - // for _ in 0..(parent_lookup::PARENT_FAIL_TOLERANCE - PROCESSING_FAILURES) { - // let id = rig.expect_parent_request(response_type); - // // The request fails. It should be tried again. - // bl.parent_lookup_failed( - // id, - // peer_id, - // &mut cx, - // RPCError::ErrorResponse( - // RPCResponseErrorCode::ResourceUnavailable, - // "older than deneb".into(), - // ), - // ); - // } - // - // // Now fail processing a block in the parent request - // for _ in 0..PROCESSING_FAILURES { - // let id = dbg!(rig.expect_parent_request(response_type)); - // assert!(!bl.failed_chains.contains(&block_root)); - // // send the right parent but fail processing - // bl.parent_lookup_response(id, peer_id, Some(parent.clone()), D, &mut cx); - // bl.parent_block_processed( - // block_root, - // BlockError::InvalidSignature.into(), - // response_type, - // &mut cx, - // ); - // bl.parent_lookup_response(id, peer_id, None, D, &mut cx); - // rig.expect_penalty(); - // } - // - // assert!(bl.failed_chains.contains(&block_root)); - // assert_eq!(bl.parent_lookups.len(), 0); - // } - // - // #[test] - // fn test_parent_lookup_too_deep() { - // let fork_name = ForkName::Deneb; - // let response_type = ResponseType::Block; - // let (mut bl, mut cx, mut rig) = TestRig::test_setup(false); - // let mut blocks = - // Vec::>>::with_capacity(parent_lookup::PARENT_DEPTH_TOLERANCE); - // while blocks.len() < parent_lookup::PARENT_DEPTH_TOLERANCE { - // let parent = blocks - // .last() - // .map(|b| b.canonical_root()) - // .unwrap_or_else(Hash256::random); - // let block = Arc::new(rig.block_with_parent(parent, fork_name)); - // blocks.push(block); - // } - // - // let peer_id = PeerId::random(); - // let trigger_block = blocks.pop().unwrap(); - // let chain_hash = trigger_block.canonical_root(); - // let trigger_block_root = trigger_block.canonical_root(); - // let trigger_parent_root = trigger_block.parent_root(); - // let trigger_slot = trigger_block.slot(); - // bl.search_parent( - // trigger_slot, - // trigger_block_root, - // trigger_parent_root, - // peer_id, - // &mut cx, - // ); - // - // for block in blocks.into_iter().rev() { - // let id = rig.expect_parent_request(response_type); - // // the block - // bl.parent_lookup_response(id, peer_id, Some(block.clone()), D, &mut cx); - // // the stream termination - // bl.parent_lookup_response(id, peer_id, None, D, &mut cx); - // // the processing request - // rig.expect_block_process(response_type); - // // the processing result - // bl.parent_block_processed( - // chain_hash, - // BlockError::ParentUnknown(block.into()).into(), - // response_type, - // &mut cx, - // ) - // } - // - // rig.expect_penalty(); - // assert!(bl.failed_chains.contains(&chain_hash)); - // } - // - // #[test] - // fn test_parent_lookup_disconnection() { - // let fork_name = ForkName::Deneb; - // let (mut bl, mut cx, mut rig) = TestRig::test_setup(false); - // let peer_id = PeerId::random(); - // let trigger_block = rig.rand_block(fork_name); - // let trigger_block_root = trigger_block.canonical_root(); - // let trigger_parent_root = trigger_block.parent_root(); - // let trigger_slot = trigger_block.slot(); - // bl.search_parent( - // trigger_slot, - // trigger_block_root, - // trigger_parent_root, - // peer_id, - // &mut cx, - // ); - // - // bl.peer_disconnected(&peer_id, &mut cx); - // assert!(bl.parent_lookups.is_empty()); - // } - // - // #[test] - // fn test_single_block_lookup_ignored_response() { - // let fork_name = ForkName::Deneb; - // let response_type = ResponseType::Block; - // let (mut bl, mut cx, mut rig) = TestRig::test_setup(false); - // - // let block = rig.rand_block(fork_name); - // let peer_id = PeerId::random(); - // - // // Trigger the request - // bl.search_block( - // block.canonical_root(), - // peer_id, - // PeerShouldHave::BlockAndBlobs, - // &mut cx, - // ); - // let id = rig.expect_block_request(response_type); - // - // // The peer provides the correct block, should not be penalized. Now the block should be sent - // // for processing. - // bl.single_block_lookup_response(id, peer_id, Some(block.into()), D, &mut cx); - // rig.expect_empty_network(); - // rig.expect_block_process(response_type); - // - // // The request should still be active. - // assert_eq!(bl.single_block_lookups.len(), 1); - // - // // Send the stream termination. Peer should have not been penalized, and the request removed - // // after processing. - // bl.single_block_lookup_response(id, peer_id, None, D, &mut cx); - // // Send an Ignored response, the request should be dropped - // bl.single_block_processed(id, BlockProcessingResult::Ignored, response_type, &mut cx); - // rig.expect_empty_network(); - // assert_eq!(bl.single_block_lookups.len(), 0); - // } - // - // #[test] - // fn test_parent_lookup_ignored_response() { - // let fork_name = ForkName::Deneb; - // let response_type = ResponseType::Block; - // let (mut bl, mut cx, mut rig) = TestRig::test_setup(false); - // - // let parent = rig.rand_block(fork_name); - // let block = rig.block_with_parent(parent.canonical_root(), fork_name); - // let chain_hash = block.canonical_root(); - // let peer_id = PeerId::random(); - // let block_root = block.canonical_root(); - // let parent_root = block.parent_root(); - // let slot = block.slot(); - // - // // Trigger the request - // bl.search_parent(slot, block_root, parent_root, peer_id, &mut cx); - // let id = rig.expect_parent_request(response_type); - // - // // Peer sends the right block, it should be sent for processing. Peer should not be penalized. - // bl.parent_lookup_response(id, peer_id, Some(parent.into()), D, &mut cx); - // rig.expect_block_process(response_type); - // rig.expect_empty_network(); - // - // // Return an Ignored result. The request should be dropped - // bl.parent_block_processed( - // chain_hash, - // BlockProcessingResult::Ignored, - // response_type, - // &mut cx, - // ); - // rig.expect_empty_network(); - // assert_eq!(bl.parent_lookups.len(), 0); - // } - // - // /// This is a regression test. - // #[test] - // fn test_same_chain_race_condition() { - // let fork_name = ForkName::Deneb; - // let response_type = ResponseType::Block; - // let (mut bl, mut cx, mut rig) = TestRig::test_setup(true); - // - // #[track_caller] - // fn parent_lookups_consistency(bl: &BlockLookups) { - // let hashes: Vec<_> = bl - // .parent_lookups - // .iter() - // .map(|req| req.chain_hash()) - // .collect(); - // let expected = hashes.len(); - // assert_eq!( - // expected, - // hashes - // .into_iter() - // .collect::>() - // .len(), - // "duplicated chain hashes in parent queue" - // ) - // } - // // if we use one or two blocks it will match on the hash or the parent hash, so make a longer - // // chain. - // let depth = 4; - // let mut blocks = Vec::>>::with_capacity(depth); - // while blocks.len() < depth { - // let parent = blocks - // .last() - // .map(|b| b.canonical_root()) - // .unwrap_or_else(Hash256::random); - // let block = Arc::new(rig.block_with_parent(parent, fork_name)); - // blocks.push(block); - // } - // - // let peer_id = PeerId::random(); - // let trigger_block = blocks.pop().unwrap(); - // let chain_hash = trigger_block.canonical_root(); - // let trigger_block_root = trigger_block.canonical_root(); - // let trigger_parent_root = trigger_block.parent_root(); - // let trigger_slot = trigger_block.slot(); - // bl.search_parent( - // trigger_slot, - // trigger_block_root, - // trigger_parent_root, - // peer_id, - // &mut cx, - // ); - // - // for (i, block) in blocks.into_iter().rev().enumerate() { - // let id = rig.expect_parent_request(response_type); - // // the block - // bl.parent_lookup_response(id, peer_id, Some(block.clone()), D, &mut cx); - // // the stream termination - // bl.parent_lookup_response(id, peer_id, None, D, &mut cx); - // // the processing request - // rig.expect_block_process(response_type); - // // the processing result - // if i + 2 == depth { - // // one block was removed - // bl.parent_block_processed( - // chain_hash, - // BlockError::BlockIsAlreadyKnown.into(), - // response_type, - // &mut cx, - // ) - // } else { - // bl.parent_block_processed( - // chain_hash, - // BlockError::ParentUnknown(block.into()).into(), - // response_type, - // &mut cx, - // ) - // } - // parent_lookups_consistency(&bl) - // } - // - // // Processing succeeds, now the rest of the chain should be sent for processing. - // rig.expect_parent_chain_process(); - // - // // Try to get this block again while the chain is being processed. We should not request it again. - // let peer_id = PeerId::random(); - // let trigger_block_root = trigger_block.canonical_root(); - // let trigger_parent_root = trigger_block.parent_root(); - // let trigger_slot = trigger_block.slot(); - // bl.search_parent( - // trigger_slot, - // trigger_block_root, - // trigger_parent_root, - // peer_id, - // &mut cx, - // ); - // parent_lookups_consistency(&bl); - // - // let process_result = BatchProcessResult::Success { - // was_non_empty: true, - // }; - // bl.parent_chain_processed(chain_hash, process_result, &mut cx); - // assert_eq!(bl.parent_lookups.len(), 0); - // } + #[test] + fn test_single_block_and_blob_lookup_empty_response() { + let response_type = ResponseType::Block; + let fork_name = get_fork_name(); + if !matches!(fork_name, ForkName::Deneb) { + return; + } + let (mut bl, mut cx, mut rig) = TestRig::test_setup(false); + + let block_hash = Hash256::random(); + let peer_id = PeerId::random(); + + // Trigger the request + bl.search_block(block_hash, peer_id, PeerShouldHave::BlockAndBlobs, &mut cx); + let id = rig.expect_block_request(response_type); + let blob_id = rig.expect_block_request(ResponseType::Blob); + + // The peer does not have the block. It should be penalized. + bl.single_block_lookup_response(id, peer_id, None, D, &mut cx); + rig.expect_penalty(); + + rig.expect_block_request(response_type); // it should be retried + rig.expect_empty_network(); // there should be no blob retry + + bl.single_blob_lookup_response(blob_id, peer_id, None, D, &mut cx); + rig.expect_empty_network(); // there should be no penalty or retry, we don't know + // whether we should have blobs + } + + #[test] + fn test_single_blob_lookup_empty_response() { + let response_type = ResponseType::Block; + let fork_name = get_fork_name(); + if !matches!(fork_name, ForkName::Deneb) { + return; + } + let (mut bl, mut cx, mut rig) = TestRig::test_setup(false); + + let block_hash = Hash256::random(); + let peer_id = PeerId::random(); + + // Trigger the request + bl.search_block(block_hash, peer_id, PeerShouldHave::BlockAndBlobs, &mut cx); + let id = rig.expect_block_request(response_type); + let _ = rig.expect_block_request(ResponseType::Blob); + + // The peer does not have the block. It should be penalized. + bl.single_blob_lookup_response(id, peer_id, None, D, &mut cx); + rig.expect_empty_network(); // there should be no penalty or retry, we don't know + // whether we should have blobs + } + + #[test] + fn test_single_block_response_then_empty_blob_response() { + let fork_name = get_fork_name(); + if !matches!(fork_name, ForkName::Deneb) { + return; + } + let (mut bl, mut cx, mut rig) = TestRig::test_setup(false); + rig.harness + .chain + .slot_clock + .set_slot(E::slots_per_epoch() * rig.harness.spec.deneb_fork_epoch.unwrap().as_u64()); + + let (block, _) = rig.rand_block_and_blobs(fork_name, NumBlobs::Random); + let slot = block.slot(); + let peer_id = PeerId::random(); + let block_root = block.canonical_root(); + + // Trigger the request + bl.search_block(block_root, peer_id, PeerShouldHave::BlockAndBlobs, &mut cx); + let block_id = rig.expect_block_request(ResponseType::Block); + let blob_id = rig.expect_block_request(ResponseType::Blob); + + // The peer provides the correct block, should not be penalized. Now the block should be sent + // for processing. + bl.single_block_lookup_response(block_id, peer_id, Some(block.into()), D, &mut cx); + rig.expect_empty_network(); + rig.expect_block_process(ResponseType::Block); + + // The request should still be active. + assert_eq!(bl.single_block_lookups.len(), 1); + + // Send the stream termination. Peer should have not been penalized. + bl.single_block_lookup_response(block_id, peer_id, None, D, &mut cx); + // Missing blobs should be the request is not removed, the outstanding blobs request should + // mean we do not send a new request. + bl.single_block_processed( + block_id, + BlockProcessingResult::Ok(AvailabilityProcessingStatus::MissingComponents( + slot, block_root, + )), + ResponseType::Block, + &mut cx, + ); + rig.expect_empty_network(); + assert_eq!(bl.single_block_lookups.len(), 1); + + // The peer does not have the block. It should be penalized. + bl.single_blob_lookup_response(blob_id, peer_id, None, D, &mut cx); + rig.expect_penalty(); + + rig.expect_block_request(ResponseType::Blob); // it should be retried + rig.expect_empty_network(); + } + + #[test] + fn test_single_block_lookup_wrong_response() { + let response_type = ResponseType::Block; + let fork_name = get_fork_name(); + if !matches!(fork_name, ForkName::Deneb) { + return; + } + let (mut bl, mut cx, mut rig) = TestRig::test_setup(false); + + let fork_name = rig + .harness + .spec + .fork_name_at_slot::(rig.harness.chain.slot().unwrap()); + let block_hash = Hash256::random(); + let peer_id = PeerId::random(); + + // Trigger the request + bl.search_block(block_hash, peer_id, PeerShouldHave::BlockAndBlobs, &mut cx); + let id = rig.expect_block_request(response_type); + let blob_id = rig.expect_block_request(ResponseType::Blob); + + // Peer sends something else. It should be penalized. + let bad_block = rig.rand_block(fork_name); + bl.single_block_lookup_response(id, peer_id, Some(bad_block.into()), D, &mut cx); + rig.expect_penalty(); + rig.expect_block_request(response_type); // should be retried + + // Send the stream termination. This should not produce an additional penalty. + bl.single_block_lookup_response(id, peer_id, None, D, &mut cx); + rig.expect_empty_network(); + } + + #[test] + fn test_single_block_lookup_failure() { + let response_type = ResponseType::Block; + let (mut bl, mut cx, mut rig) = TestRig::test_setup(false); + + let fork_name = rig + .harness + .spec + .fork_name_at_slot::(rig.harness.chain.slot().unwrap()); + let block_hash = Hash256::random(); + let peer_id = PeerId::random(); + + // Trigger the request + bl.search_block(block_hash, peer_id, PeerShouldHave::BlockAndBlobs, &mut cx); + let id = rig.expect_block_request(response_type); + // If we're in deneb, a blob request should have been triggered as well, + // we don't require a response because we're generateing 0-blob blocks in this test. + if matches!(fork_name, ForkName::Deneb) { + let _ = rig.expect_block_request(ResponseType::Blob); + } + + // The request fails. RPC failures are handled elsewhere so we should not penalize the peer. + bl.single_block_lookup_failed(id, &peer_id, &mut cx, RPCError::UnsupportedProtocol); + rig.expect_block_request(response_type); + rig.expect_empty_network(); + } + + #[test] + fn test_single_block_lookup_becomes_parent_request() { + let response_type = ResponseType::Block; + let (mut bl, mut cx, mut rig) = TestRig::test_setup(false); + + let fork_name = rig + .harness + .spec + .fork_name_at_slot::(rig.harness.chain.slot().unwrap()); + let block = Arc::new(rig.rand_block(fork_name)); + let peer_id = PeerId::random(); + + // Trigger the request + bl.search_block( + block.canonical_root(), + peer_id, + PeerShouldHave::BlockAndBlobs, + &mut cx, + ); + let id = rig.expect_block_request(response_type); + // If we're in deneb, a blob request should have been triggered as well, + // we don't require a response because we're generateing 0-blob blocks in this test. + if matches!(fork_name, ForkName::Deneb) { + let _ = rig.expect_block_request(ResponseType::Blob); + } + + // The peer provides the correct block, should not be penalized. Now the block should be sent + // for processing. + bl.single_block_lookup_response(id, peer_id, Some(block.clone()), D, &mut cx); + rig.expect_empty_network(); + rig.expect_block_process(response_type); + + // The request should still be active. + assert_eq!(bl.single_block_lookups.len(), 1); + + // Send the stream termination. Peer should have not been penalized, and the request moved to a + // parent request after processing. + bl.single_block_processed( + id, + BlockError::ParentUnknown(block.into()).into(), + response_type, + &mut cx, + ); + assert_eq!(bl.single_block_lookups.len(), 1); + rig.expect_parent_request(response_type); + // If we're in deneb, a blob request should have been triggered as well, + // we don't require a response because we're generateing 0-blob blocks in this test. + if matches!(fork_name, ForkName::Deneb) { + let _ = rig.expect_parent_request(ResponseType::Blob); + } + rig.expect_empty_network(); + assert_eq!(bl.parent_lookups.len(), 1); + } + + #[test] + fn test_parent_lookup_happy_path() { + let response_type = ResponseType::Block; + let (mut bl, mut cx, mut rig) = TestRig::test_setup(false); + + let fork_name = rig + .harness + .spec + .fork_name_at_slot::(rig.harness.chain.slot().unwrap()); + let parent = rig.rand_block(fork_name); + let block = rig.block_with_parent(parent.canonical_root(), fork_name); + let chain_hash = block.canonical_root(); + let peer_id = PeerId::random(); + let block_root = block.canonical_root(); + let parent_root = block.parent_root(); + let slot = block.slot(); + + // Trigger the request + bl.search_parent(slot, block_root, parent_root, peer_id, &mut cx); + let id = rig.expect_parent_request(response_type); + // If we're in deneb, a blob request should have been triggered as well, + // we don't require a response because we're generateing 0-blob blocks in this test. + if matches!(fork_name, ForkName::Deneb) { + let _ = rig.expect_parent_request(ResponseType::Blob); + } + + // Peer sends the right block, it should be sent for processing. Peer should not be penalized. + bl.parent_lookup_response(id, peer_id, Some(parent.into()), D, &mut cx); + rig.expect_block_process(response_type); + rig.expect_empty_network(); + + // Processing succeeds, now the rest of the chain should be sent for processing. + bl.parent_block_processed( + chain_hash, + BlockError::BlockIsAlreadyKnown.into(), + response_type, + &mut cx, + ); + rig.expect_parent_chain_process(); + let process_result = BatchProcessResult::Success { + was_non_empty: true, + }; + bl.parent_chain_processed(chain_hash, process_result, &mut cx); + assert_eq!(bl.parent_lookups.len(), 0); + } + + #[test] + fn test_parent_lookup_wrong_response() { + let response_type = ResponseType::Block; + let (mut bl, mut cx, mut rig) = TestRig::test_setup(false); + + let fork_name = rig + .harness + .spec + .fork_name_at_slot::(rig.harness.chain.slot().unwrap()); + let parent = rig.rand_block(fork_name); + let block = rig.block_with_parent(parent.canonical_root(), fork_name); + let chain_hash = block.canonical_root(); + let peer_id = PeerId::random(); + let block_root = block.canonical_root(); + let parent_root = block.parent_root(); + let slot = block.slot(); + + // Trigger the request + bl.search_parent(slot, block_root, parent_root, peer_id, &mut cx); + let id1 = rig.expect_parent_request(response_type); + // If we're in deneb, a blob request should have been triggered as well, + // we don't require a response because we're generateing 0-blob blocks in this test. + if matches!(fork_name, ForkName::Deneb) { + let _ = rig.expect_parent_request(ResponseType::Blob); + } + + // Peer sends the wrong block, peer should be penalized and the block re-requested. + let bad_block = rig.rand_block(fork_name); + bl.parent_lookup_response(id1, peer_id, Some(bad_block.into()), D, &mut cx); + rig.expect_penalty(); + let id2 = rig.expect_parent_request(response_type); + + // Send the stream termination for the first request. This should not produce extra penalties. + bl.parent_lookup_response(id1, peer_id, None, D, &mut cx); + rig.expect_empty_network(); + + // Send the right block this time. + bl.parent_lookup_response(id2, peer_id, Some(parent.into()), D, &mut cx); + rig.expect_block_process(response_type); + + // Processing succeeds, now the rest of the chain should be sent for processing. + bl.parent_block_processed( + chain_hash, + BlockProcessingResult::Ok(AvailabilityProcessingStatus::Imported(block_root)), + response_type, + &mut cx, + ); + rig.expect_parent_chain_process(); + let process_result = BatchProcessResult::Success { + was_non_empty: true, + }; + bl.parent_chain_processed(chain_hash, process_result, &mut cx); + assert_eq!(bl.parent_lookups.len(), 0); + } + + #[test] + fn test_parent_lookup_empty_response() { + let response_type = ResponseType::Block; + let (mut bl, mut cx, mut rig) = TestRig::test_setup(false); + + let fork_name = rig + .harness + .spec + .fork_name_at_slot::(rig.harness.chain.slot().unwrap()); + let parent = rig.rand_block(fork_name); + let block = rig.block_with_parent(parent.canonical_root(), fork_name); + let chain_hash = block.canonical_root(); + let peer_id = PeerId::random(); + let block_root = block.canonical_root(); + let parent_root = block.parent_root(); + let slot = block.slot(); + + // Trigger the request + bl.search_parent(slot, block_root, parent_root, peer_id, &mut cx); + let id1 = rig.expect_parent_request(response_type); + // If we're in deneb, a blob request should have been triggered as well, + // we don't require a response because we're generateing 0-blob blocks in this test. + if matches!(fork_name, ForkName::Deneb) { + let _ = rig.expect_parent_request(ResponseType::Blob); + } + + // Peer sends an empty response, peer should be penalized and the block re-requested. + bl.parent_lookup_response(id1, peer_id, None, D, &mut cx); + rig.expect_penalty(); + let id2 = rig.expect_parent_request(response_type); + + // Send the right block this time. + bl.parent_lookup_response(id2, peer_id, Some(parent.into()), D, &mut cx); + rig.expect_block_process(response_type); + + // Processing succeeds, now the rest of the chain should be sent for processing. + bl.parent_block_processed( + chain_hash, + BlockProcessingResult::Ok(AvailabilityProcessingStatus::Imported(block_root)), + response_type, + &mut cx, + ); + rig.expect_parent_chain_process(); + let process_result = BatchProcessResult::Success { + was_non_empty: true, + }; + bl.parent_chain_processed(chain_hash, process_result, &mut cx); + assert_eq!(bl.parent_lookups.len(), 0); + } + + #[test] + fn test_parent_lookup_rpc_failure() { + let response_type = ResponseType::Block; + let (mut bl, mut cx, mut rig) = TestRig::test_setup(false); + + let fork_name = rig + .harness + .spec + .fork_name_at_slot::(rig.harness.chain.slot().unwrap()); + let parent = rig.rand_block(fork_name); + let block = rig.block_with_parent(parent.canonical_root(), fork_name); + let chain_hash = block.canonical_root(); + let peer_id = PeerId::random(); + let block_root = block.canonical_root(); + let parent_root = block.parent_root(); + let slot = block.slot(); + + // Trigger the request + bl.search_parent(slot, block_root, parent_root, peer_id, &mut cx); + let id1 = rig.expect_parent_request(response_type); + // If we're in deneb, a blob request should have been triggered as well, + // we don't require a response because we're generateing 0-blob blocks in this test. + if matches!(fork_name, ForkName::Deneb) { + let _ = rig.expect_parent_request(ResponseType::Blob); + } + + // The request fails. It should be tried again. + bl.parent_lookup_failed( + id1, + peer_id, + &mut cx, + RPCError::ErrorResponse( + RPCResponseErrorCode::ResourceUnavailable, + "older than deneb".into(), + ), + ); + let id2 = rig.expect_parent_request(response_type); + + // Send the right block this time. + bl.parent_lookup_response(id2, peer_id, Some(parent.into()), D, &mut cx); + rig.expect_block_process(response_type); + + // Processing succeeds, now the rest of the chain should be sent for processing. + bl.parent_block_processed( + chain_hash, + BlockProcessingResult::Ok(AvailabilityProcessingStatus::Imported(block_root)), + response_type, + &mut cx, + ); + rig.expect_parent_chain_process(); + let process_result = BatchProcessResult::Success { + was_non_empty: true, + }; + bl.parent_chain_processed(chain_hash, process_result, &mut cx); + assert_eq!(bl.parent_lookups.len(), 0); + } + + #[test] + fn test_parent_lookup_too_many_attempts() { + let response_type = ResponseType::Block; + let (mut bl, mut cx, mut rig) = TestRig::test_setup(false); + + let fork_name = rig + .harness + .spec + .fork_name_at_slot::(rig.harness.chain.slot().unwrap()); + let parent = rig.rand_block(fork_name); + let block = rig.block_with_parent(parent.canonical_root(), fork_name); + let peer_id = PeerId::random(); + let block_root = block.canonical_root(); + let parent_root = block.parent_root(); + let slot = block.slot(); + + // Trigger the request + bl.search_parent(slot, block_root, parent_root, peer_id, &mut cx); + for i in 1..=parent_lookup::PARENT_FAIL_TOLERANCE { + let id = rig.expect_parent_request(response_type); + // If we're in deneb, a blob request should have been triggered as well, + // we don't require a response because we're generateing 0-blob blocks in this test. + if matches!(fork_name, ForkName::Deneb) && i == 1 { + let _ = rig.expect_parent_request(ResponseType::Blob); + } + match i % 2 { + // make sure every error is accounted for + 0 => { + // The request fails. It should be tried again. + bl.parent_lookup_failed( + id, + peer_id, + &mut cx, + RPCError::ErrorResponse( + RPCResponseErrorCode::ResourceUnavailable, + "older than deneb".into(), + ), + ); + } + _ => { + // Send a bad block this time. It should be tried again. + let bad_block = rig.rand_block(fork_name); + bl.parent_lookup_response(id, peer_id, Some(bad_block.into()), D, &mut cx); + // Send the stream termination + bl.parent_lookup_response(id, peer_id, None, D, &mut cx); + rig.expect_penalty(); + } + } + if i < parent_lookup::PARENT_FAIL_TOLERANCE { + assert_eq!(bl.parent_lookups[0].failed_block_attempts(), dbg!(i)); + } + } + + assert_eq!(bl.parent_lookups.len(), 0); + } + + #[test] + fn test_parent_lookup_too_many_download_attempts_no_blacklist() { + let response_type = ResponseType::Block; + let (mut bl, mut cx, mut rig) = TestRig::test_setup(false); + + let fork_name = rig + .harness + .spec + .fork_name_at_slot::(rig.harness.chain.slot().unwrap()); + let parent = rig.rand_block(fork_name); + let block = rig.block_with_parent(parent.canonical_root(), fork_name); + let block_hash = block.canonical_root(); + let peer_id = PeerId::random(); + let block_root = block.canonical_root(); + let parent_root = block.parent_root(); + let slot = block.slot(); + + // Trigger the request + bl.search_parent(slot, block_root, parent_root, peer_id, &mut cx); + for i in 1..=parent_lookup::PARENT_FAIL_TOLERANCE { + assert!(!bl.failed_chains.contains(&block_hash)); + let id = rig.expect_parent_request(response_type); + // If we're in deneb, a blob request should have been triggered as well, + // we don't require a response because we're generateing 0-blob blocks in this test. + if matches!(fork_name, ForkName::Deneb) && i == 1 { + let _ = rig.expect_parent_request(ResponseType::Blob); + } + if i % 2 != 0 { + // The request fails. It should be tried again. + bl.parent_lookup_failed( + id, + peer_id, + &mut cx, + RPCError::ErrorResponse( + RPCResponseErrorCode::ResourceUnavailable, + "older than deneb".into(), + ), + ); + } else { + // Send a bad block this time. It should be tried again. + let bad_block = rig.rand_block(fork_name); + bl.parent_lookup_response(id, peer_id, Some(bad_block.into()), D, &mut cx); + rig.expect_penalty(); + } + if i < parent_lookup::PARENT_FAIL_TOLERANCE { + assert_eq!(bl.parent_lookups[0].failed_block_attempts(), dbg!(i)); + } + } + + assert_eq!(bl.parent_lookups.len(), 0); + assert!(!bl.failed_chains.contains(&block_hash)); + assert!(!bl.failed_chains.contains(&parent.canonical_root())); + } + + #[test] + fn test_parent_lookup_too_many_processing_attempts_must_blacklist() { + let response_type = ResponseType::Block; + const PROCESSING_FAILURES: u8 = parent_lookup::PARENT_FAIL_TOLERANCE / 2 + 1; + let (mut bl, mut cx, mut rig) = TestRig::test_setup(false); + let fork_name = rig + .harness + .spec + .fork_name_at_slot::(rig.harness.chain.slot().unwrap()); + + let parent = Arc::new(rig.rand_block(fork_name)); + let block = rig.block_with_parent(parent.canonical_root(), fork_name); + let peer_id = PeerId::random(); + let block_root = block.canonical_root(); + let parent_root = block.parent_root(); + let slot = block.slot(); + + // Trigger the request + bl.search_parent(slot, block_root, parent_root, peer_id, &mut cx); + + // Fail downloading the block + for i in 0..(parent_lookup::PARENT_FAIL_TOLERANCE - PROCESSING_FAILURES) { + let id = rig.expect_parent_request(response_type); + // If we're in deneb, a blob request should have been triggered as well, + // we don't require a response because we're generateing 0-blob blocks in this test. + if matches!(fork_name, ForkName::Deneb) && i == 0 { + let _ = rig.expect_parent_request(ResponseType::Blob); + } + // The request fails. It should be tried again. + bl.parent_lookup_failed( + id, + peer_id, + &mut cx, + RPCError::ErrorResponse( + RPCResponseErrorCode::ResourceUnavailable, + "older than deneb".into(), + ), + ); + } + + // Now fail processing a block in the parent request + for _ in 0..PROCESSING_FAILURES { + let id = dbg!(rig.expect_parent_request(response_type)); + // If we're in deneb, a blob request should have been triggered as well, + // we don't require a response because we're generateing 0-blob blocks in this test. + assert!(!bl.failed_chains.contains(&block_root)); + // send the right parent but fail processing + bl.parent_lookup_response(id, peer_id, Some(parent.clone()), D, &mut cx); + bl.parent_block_processed( + block_root, + BlockError::InvalidSignature.into(), + response_type, + &mut cx, + ); + bl.parent_lookup_response(id, peer_id, None, D, &mut cx); + rig.expect_penalty(); + } + + assert!(bl.failed_chains.contains(&block_root)); + assert_eq!(bl.parent_lookups.len(), 0); + } + + #[test] + fn test_parent_lookup_too_deep() { + let response_type = ResponseType::Block; + let (mut bl, mut cx, mut rig) = TestRig::test_setup(false); + let fork_name = rig + .harness + .spec + .fork_name_at_slot::(rig.harness.chain.slot().unwrap()); + let mut blocks = + Vec::>>::with_capacity(parent_lookup::PARENT_DEPTH_TOLERANCE); + while blocks.len() < parent_lookup::PARENT_DEPTH_TOLERANCE { + let parent = blocks + .last() + .map(|b| b.canonical_root()) + .unwrap_or_else(Hash256::random); + let block = Arc::new(rig.block_with_parent(parent, fork_name)); + blocks.push(block); + } + + let peer_id = PeerId::random(); + let trigger_block = blocks.pop().unwrap(); + let chain_hash = trigger_block.canonical_root(); + let trigger_block_root = trigger_block.canonical_root(); + let trigger_parent_root = trigger_block.parent_root(); + let trigger_slot = trigger_block.slot(); + bl.search_parent( + trigger_slot, + trigger_block_root, + trigger_parent_root, + peer_id, + &mut cx, + ); + + for block in blocks.into_iter().rev() { + let id = rig.expect_parent_request(response_type); + // If we're in deneb, a blob request should have been triggered as well, + // we don't require a response because we're generateing 0-blob blocks in this test. + if matches!(fork_name, ForkName::Deneb) { + let _ = rig.expect_parent_request(ResponseType::Blob); + } + // the block + bl.parent_lookup_response(id, peer_id, Some(block.clone()), D, &mut cx); + // the stream termination + bl.parent_lookup_response(id, peer_id, None, D, &mut cx); + // the processing request + rig.expect_block_process(response_type); + // the processing result + bl.parent_block_processed( + chain_hash, + BlockError::ParentUnknown(block.into()).into(), + response_type, + &mut cx, + ) + } + + rig.expect_penalty(); + assert!(bl.failed_chains.contains(&chain_hash)); + } + + #[test] + fn test_parent_lookup_disconnection() { + let (mut bl, mut cx, mut rig) = TestRig::test_setup(false); + let peer_id = PeerId::random(); + let fork_name = rig + .harness + .spec + .fork_name_at_slot::(rig.harness.chain.slot().unwrap()); + let trigger_block = rig.rand_block(fork_name); + let trigger_block_root = trigger_block.canonical_root(); + let trigger_parent_root = trigger_block.parent_root(); + let trigger_slot = trigger_block.slot(); + bl.search_parent( + trigger_slot, + trigger_block_root, + trigger_parent_root, + peer_id, + &mut cx, + ); + + bl.peer_disconnected(&peer_id, &mut cx); + assert!(bl.parent_lookups.is_empty()); + } + + #[test] + fn test_single_block_lookup_ignored_response() { + let response_type = ResponseType::Block; + let (mut bl, mut cx, mut rig) = TestRig::test_setup(false); + + let fork_name = rig + .harness + .spec + .fork_name_at_slot::(rig.harness.chain.slot().unwrap()); + let block = rig.rand_block(fork_name); + let peer_id = PeerId::random(); + + // Trigger the request + bl.search_block( + block.canonical_root(), + peer_id, + PeerShouldHave::BlockAndBlobs, + &mut cx, + ); + let id = rig.expect_block_request(response_type); + // If we're in deneb, a blob request should have been triggered as well, + // we don't require a response because we're generateing 0-blob blocks in this test. + if matches!(fork_name, ForkName::Deneb) { + let _ = rig.expect_block_request(ResponseType::Blob); + } + + // The peer provides the correct block, should not be penalized. Now the block should be sent + // for processing. + bl.single_block_lookup_response(id, peer_id, Some(block.into()), D, &mut cx); + rig.expect_empty_network(); + rig.expect_block_process(response_type); + + // The request should still be active. + assert_eq!(bl.single_block_lookups.len(), 1); + + // Send the stream termination. Peer should have not been penalized, and the request removed + // after processing. + bl.single_block_lookup_response(id, peer_id, None, D, &mut cx); + // Send an Ignored response, the request should be dropped + bl.single_block_processed(id, BlockProcessingResult::Ignored, response_type, &mut cx); + rig.expect_empty_network(); + assert_eq!(bl.single_block_lookups.len(), 0); + } + + #[test] + fn test_parent_lookup_ignored_response() { + let response_type = ResponseType::Block; + let (mut bl, mut cx, mut rig) = TestRig::test_setup(false); + + let fork_name = rig + .harness + .spec + .fork_name_at_slot::(rig.harness.chain.slot().unwrap()); + let parent = rig.rand_block(fork_name); + let block = rig.block_with_parent(parent.canonical_root(), fork_name); + let chain_hash = block.canonical_root(); + let peer_id = PeerId::random(); + let block_root = block.canonical_root(); + let parent_root = block.parent_root(); + let slot = block.slot(); + + // Trigger the request + bl.search_parent(slot, block_root, parent_root, peer_id, &mut cx); + let id = rig.expect_parent_request(response_type); + + // If we're in deneb, a blob request should have been triggered as well, + // we don't require a response because we're generateing 0-blob blocks in this test. + if matches!(fork_name, ForkName::Deneb) { + let _ = rig.expect_parent_request(ResponseType::Blob); + } + + // Peer sends the right block, it should be sent for processing. Peer should not be penalized. + bl.parent_lookup_response(id, peer_id, Some(parent.into()), D, &mut cx); + rig.expect_block_process(response_type); + rig.expect_empty_network(); + + // Return an Ignored result. The request should be dropped + bl.parent_block_processed( + chain_hash, + BlockProcessingResult::Ignored, + response_type, + &mut cx, + ); + rig.expect_empty_network(); + assert_eq!(bl.parent_lookups.len(), 0); + } + + /// This is a regression test. + #[test] + fn test_same_chain_race_condition() { + let response_type = ResponseType::Block; + let (mut bl, mut cx, mut rig) = TestRig::test_setup(true); + + let fork_name = rig + .harness + .spec + .fork_name_at_slot::(rig.harness.chain.slot().unwrap()); + #[track_caller] + fn parent_lookups_consistency(bl: &BlockLookups) { + let hashes: Vec<_> = bl + .parent_lookups + .iter() + .map(|req| req.chain_hash()) + .collect(); + let expected = hashes.len(); + assert_eq!( + expected, + hashes + .into_iter() + .collect::>() + .len(), + "duplicated chain hashes in parent queue" + ) + } + // if we use one or two blocks it will match on the hash or the parent hash, so make a longer + // chain. + let depth = 4; + let mut blocks = Vec::>>::with_capacity(depth); + while blocks.len() < depth { + let parent = blocks + .last() + .map(|b| b.canonical_root()) + .unwrap_or_else(Hash256::random); + let block = Arc::new(rig.block_with_parent(parent, fork_name)); + blocks.push(block); + } + + let peer_id = PeerId::random(); + let trigger_block = blocks.pop().unwrap(); + let chain_hash = trigger_block.canonical_root(); + let trigger_block_root = trigger_block.canonical_root(); + let trigger_parent_root = trigger_block.parent_root(); + let trigger_slot = trigger_block.slot(); + bl.search_parent( + trigger_slot, + trigger_block_root, + trigger_parent_root, + peer_id, + &mut cx, + ); + + for (i, block) in blocks.into_iter().rev().enumerate() { + let id = rig.expect_parent_request(response_type); + // If we're in deneb, a blob request should have been triggered as well, + // we don't require a response because we're generateing 0-blob blocks in this test. + if matches!(fork_name, ForkName::Deneb) { + let _ = rig.expect_parent_request(ResponseType::Blob); + } + // the block + bl.parent_lookup_response(id, peer_id, Some(block.clone()), D, &mut cx); + // the stream termination + bl.parent_lookup_response(id, peer_id, None, D, &mut cx); + // the processing request + rig.expect_block_process(response_type); + // the processing result + if i + 2 == depth { + // one block was removed + bl.parent_block_processed( + chain_hash, + BlockError::BlockIsAlreadyKnown.into(), + response_type, + &mut cx, + ) + } else { + bl.parent_block_processed( + chain_hash, + BlockError::ParentUnknown(block.into()).into(), + response_type, + &mut cx, + ) + } + parent_lookups_consistency(&bl) + } + + // Processing succeeds, now the rest of the chain should be sent for processing. + rig.expect_parent_chain_process(); + + // Try to get this block again while the chain is being processed. We should not request it again. + let peer_id = PeerId::random(); + let trigger_block_root = trigger_block.canonical_root(); + let trigger_parent_root = trigger_block.parent_root(); + let trigger_slot = trigger_block.slot(); + bl.search_parent( + trigger_slot, + trigger_block_root, + trigger_parent_root, + peer_id, + &mut cx, + ); + parent_lookups_consistency(&bl); + + let process_result = BatchProcessResult::Success { + was_non_empty: true, + }; + bl.parent_chain_processed(chain_hash, process_result, &mut cx); + assert_eq!(bl.parent_lookups.len(), 0); + } } diff --git a/beacon_node/network/src/sync/manager.rs b/beacon_node/network/src/sync/manager.rs index e645ba9f96..e85a53ab2b 100644 --- a/beacon_node/network/src/sync/manager.rs +++ b/beacon_node/network/src/sync/manager.rs @@ -34,7 +34,7 @@ //! search for the block and subsequently search for parents if needed. use super::backfill_sync::{BackFillSync, ProcessResult, SyncStart}; -use super::block_lookups::{BlockLookups, PeerShouldHave}; +use super::block_lookups::{BlockLookups, PeerSource}; use super::network_context::{BlockOrBlob, SyncNetworkContext}; use super::peer_sync_info::{remote_sync_type, PeerSyncType}; use super::range_sync::{RangeSync, RangeSyncType, EPOCHS_PER_BATCH}; @@ -700,8 +700,7 @@ impl SyncManager { if self.synced_and_connected(&peer_id) { self.block_lookups.search_block( block_hash, - peer_id, - PeerShouldHave::BlockAndBlobs, + PeerSource::Attestation(peer_id), &mut self.network, ); } @@ -719,8 +718,7 @@ impl SyncManager { } else { self.block_lookups.search_block( block_hash, - peer_id, - PeerShouldHave::Neither, + PeerSource::Gossip(peer_id), &mut self.network, ) }