From b1f9751a697c9ae21cced6735942f07756fe51ef Mon Sep 17 00:00:00 2001 From: Lion - dapplion <35266934+dapplion@users.noreply.github.com> Date: Wed, 10 Apr 2024 21:05:18 +0900 Subject: [PATCH] Event-based block lookup tests (#5534) * WIP * Initial working version of new sync tests. * Remove sync traits and fix lints. * Reduce internal method visibility and make test method instead. Remove extra beacon chain harness instance created in tests. * Improve `SyncTester` api. * Fix lint. * Test example * Lookup tests using rig * Tests should interface with events only * lint * Skip deneb test pre-deneb * Add more assertions * Remove logging changes * Address @jimmygchen comments * Merge branch 'unstable' of https://github.com/sigp/lighthouse into bn-p2p-tests * remove unused assertions * fix lint --- beacon_node/beacon_chain/src/test_utils.rs | 3 + .../src/peer_manager/peerdb.rs | 15 + beacon_node/network/Cargo.toml | 1 + .../src/network_beacon_processor/mod.rs | 23 +- .../network/src/sync/block_lookups/mod.rs | 18 + .../network/src/sync/block_lookups/tests.rs | 1814 ++++++++--------- beacon_node/network/src/sync/manager.rs | 74 +- .../network/src/sync/range_sync/range.rs | 7 +- 8 files changed, 939 insertions(+), 1016 deletions(-) diff --git a/beacon_node/beacon_chain/src/test_utils.rs b/beacon_node/beacon_chain/src/test_utils.rs index 0a494e1d8a..a73ba87f67 100644 --- a/beacon_node/beacon_chain/src/test_utils.rs +++ b/beacon_node/beacon_chain/src/test_utils.rs @@ -2498,6 +2498,7 @@ pub fn build_log(level: slog::Level, enabled: bool) -> Logger { pub enum NumBlobs { Random, + Number(usize), None, } @@ -2518,6 +2519,7 @@ pub fn generate_rand_block_and_blobs( let payload: &mut FullPayloadDeneb = &mut message.body.execution_payload; let num_blobs = match num_blobs { NumBlobs::Random => rng.gen_range(1..=E::max_blobs_per_block()), + NumBlobs::Number(n) => n, NumBlobs::None => 0, }; let (bundle, transactions) = @@ -2537,6 +2539,7 @@ pub fn generate_rand_block_and_blobs( let payload: &mut FullPayloadElectra = &mut message.body.execution_payload; let num_blobs = match num_blobs { NumBlobs::Random => rng.gen_range(1..=E::max_blobs_per_block()), + NumBlobs::Number(n) => n, NumBlobs::None => 0, }; let (bundle, transactions) = diff --git a/beacon_node/lighthouse_network/src/peer_manager/peerdb.rs b/beacon_node/lighthouse_network/src/peer_manager/peerdb.rs index 978f815d5b..c3e77ae225 100644 --- a/beacon_node/lighthouse_network/src/peer_manager/peerdb.rs +++ b/beacon_node/lighthouse_network/src/peer_manager/peerdb.rs @@ -1,3 +1,4 @@ +use crate::discovery::CombinedKey; use crate::{metrics, multiaddr::Multiaddr, types::Subnet, Enr, Gossipsub, PeerId}; use peer_info::{ConnectionDirection, PeerConnectionStatus, PeerInfo}; use rand::seq::SliceRandom; @@ -671,6 +672,20 @@ impl PeerDB { ); } + /// Updates the connection state. MUST ONLY BE USED IN TESTS. + pub fn __add_connected_peer_testing_only(&mut self, peer_id: &PeerId) -> Option { + let enr_key = CombinedKey::generate_secp256k1(); + let enr = Enr::builder().build(&enr_key).unwrap(); + self.update_connection_state( + peer_id, + NewConnectionState::Connected { + enr: Some(enr), + seen_address: Multiaddr::empty(), + direction: ConnectionDirection::Outgoing, + }, + ) + } + /// The connection state of the peer has been changed. Modify the peer in the db to ensure all /// variables are in sync with libp2p. /// Updating the state can lead to a `BanOperation` which needs to be processed via the peer diff --git a/beacon_node/network/Cargo.toml b/beacon_node/network/Cargo.toml index d3d711884b..406015360e 100644 --- a/beacon_node/network/Cargo.toml +++ b/beacon_node/network/Cargo.toml @@ -58,3 +58,4 @@ environment = { workspace = true } disable-backfill = [] fork_from_env = ["beacon_chain/fork_from_env"] portable = ["beacon_chain/portable"] +test_logger = [] diff --git a/beacon_node/network/src/network_beacon_processor/mod.rs b/beacon_node/network/src/network_beacon_processor/mod.rs index ce368d5d6d..6872a712c9 100644 --- a/beacon_node/network/src/network_beacon_processor/mod.rs +++ b/beacon_node/network/src/network_beacon_processor/mod.rs @@ -3,16 +3,13 @@ use crate::{ sync::{manager::BlockProcessType, SyncMessage}, }; use beacon_chain::block_verification_types::RpcBlock; -use beacon_chain::{ - builder::Witness, eth1_chain::CachingEth1Backend, test_utils::BeaconChainHarness, BeaconChain, -}; +use beacon_chain::{builder::Witness, eth1_chain::CachingEth1Backend, BeaconChain}; use beacon_chain::{BeaconChainTypes, NotifyExecutionLayer}; use beacon_processor::{ work_reprocessing_queue::ReprocessQueueMessage, BeaconProcessorChannels, BeaconProcessorSend, DuplicateCache, GossipAggregatePackage, GossipAttestationPackage, Work, WorkEvent as BeaconWorkEvent, }; -use environment::null_logger; use lighthouse_network::rpc::methods::{BlobsByRangeRequest, BlobsByRootRequest}; use lighthouse_network::{ rpc::{BlocksByRangeRequest, BlocksByRootRequest, LightClientBootstrapRequest, StatusMessage}, @@ -24,7 +21,6 @@ use std::path::PathBuf; use std::sync::Arc; use std::time::Duration; use store::MemoryStore; -use task_executor::test_utils::TestRuntime; use task_executor::TaskExecutor; use tokio::sync::mpsc::{self, error::TrySendError}; use types::*; @@ -667,6 +663,9 @@ impl NetworkBeaconProcessor> { // processor (but not much else). pub fn null_for_testing( network_globals: Arc>, + chain: Arc>>, + executor: TaskExecutor, + log: Logger, ) -> (Self, mpsc::Receiver>) { let BeaconProcessorChannels { beacon_processor_tx, @@ -677,27 +676,17 @@ impl NetworkBeaconProcessor> { let (network_tx, _network_rx) = mpsc::unbounded_channel(); let (sync_tx, _sync_rx) = mpsc::unbounded_channel(); - let log = null_logger().unwrap(); - let harness: BeaconChainHarness> = - BeaconChainHarness::builder(E::default()) - .spec(E::default_spec()) - .deterministic_keypairs(8) - .logger(log.clone()) - .fresh_ephemeral_store() - .mock_execution_layer() - .build(); - let runtime = TestRuntime::default(); let network_beacon_processor = Self { beacon_processor_send: beacon_processor_tx, duplicate_cache: DuplicateCache::default(), - chain: harness.chain, + chain, network_tx, sync_tx, reprocess_tx: work_reprocessing_tx, network_globals, invalid_block_storage: InvalidBlockStorage::Disabled, - executor: runtime.task_executor.clone(), + executor, log, }; diff --git a/beacon_node/network/src/sync/block_lookups/mod.rs b/beacon_node/network/src/sync/block_lookups/mod.rs index 76efc728fd..f3c4a768ff 100644 --- a/beacon_node/network/src/sync/block_lookups/mod.rs +++ b/beacon_node/network/src/sync/block_lookups/mod.rs @@ -76,6 +76,24 @@ impl BlockLookups { } } + #[cfg(test)] + pub(crate) fn active_single_lookups(&self) -> Vec { + self.single_block_lookups.keys().cloned().collect() + } + + #[cfg(test)] + pub(crate) fn active_parent_lookups(&self) -> Vec { + self.parent_lookups + .iter() + .map(|r| r.chain_hash()) + .collect::>() + } + + #[cfg(test)] + pub(crate) fn failed_chains_contains(&mut self, chain_hash: &Hash256) -> bool { + self.failed_chains.contains(chain_hash) + } + /* Lookup requests */ /// Creates a lookup for the block with the given `block_root` and immediately triggers it. diff --git a/beacon_node/network/src/sync/block_lookups/tests.rs b/beacon_node/network/src/sync/block_lookups/tests.rs index 2822cb7ba0..2d237ee797 100644 --- a/beacon_node/network/src/sync/block_lookups/tests.rs +++ b/beacon_node/network/src/sync/block_lookups/tests.rs @@ -1,7 +1,8 @@ use crate::network_beacon_processor::NetworkBeaconProcessor; use crate::service::RequestId; -use crate::sync::manager::{RequestId as SyncId, SingleLookupReqId}; +use crate::sync::manager::{RequestId as SyncRequestId, SingleLookupReqId, SyncManager}; +use crate::sync::SyncMessage; use crate::NetworkMessage; use std::sync::Arc; @@ -15,29 +16,65 @@ use beacon_chain::test_utils::{ }; use beacon_processor::WorkEvent; use lighthouse_network::rpc::RPCResponseErrorCode; +use lighthouse_network::types::SyncState; use lighthouse_network::{NetworkGlobals, Request}; use slot_clock::{ManualSlotClock, SlotClock, TestingSlotClock}; use store::MemoryStore; use tokio::sync::mpsc; use types::{ test_utils::{SeedableRng, XorShiftRng}, - BlobSidecar, EthSpec, ForkName, MinimalEthSpec as E, SignedBeaconBlock, + BlobSidecar, ForkName, MinimalEthSpec as E, SignedBeaconBlock, }; type T = Witness, E, MemoryStore, MemoryStore>; +/// This test utility enables integration testing of Lighthouse sync components. +/// +/// It covers the following: +/// 1. Sending `SyncMessage` to `SyncManager` to trigger `RangeSync`, `BackFillSync` and `BlockLookups` behaviours. +/// 2. Making assertions on `WorkEvent`s received from sync +/// 3. Making assertion on `NetworkMessage` received from sync (Outgoing RPC requests). +/// +/// The test utility covers testing the interactions from and to `SyncManager`. In diagram form: +/// +-----------------+ +/// | BeaconProcessor | +/// +---------+-------+ +/// ^ | +/// | | +/// WorkEvent | | SyncMsg +/// | | (Result) +/// | v +/// +--------+ +-----+-----------+ +----------------+ +/// | Router +----------->| SyncManager +------------>| NetworkService | +/// +--------+ SyncMsg +-----------------+ NetworkMsg +----------------+ +/// (RPC resp) | - RangeSync | (RPC req) +/// +-----------------+ +/// | - BackFillSync | +/// +-----------------+ +/// | - BlockLookups | +/// +-----------------+ struct TestRig { + /// Receiver for `BeaconProcessor` events (e.g. block processing results). beacon_processor_rx: mpsc::Receiver>, + /// Receiver for `NetworkMessage` (e.g. outgoing RPC requests from sync) network_rx: mpsc::UnboundedReceiver>, + /// Stores all `NetworkMessage`s received from `network_recv`. (e.g. outgoing RPC requests) + network_rx_queue: Vec>, + /// To send `SyncMessage`. For sending RPC responses or block processing results to sync. + sync_manager: SyncManager, + /// To manipulate sync state and peer connection status + network_globals: Arc>, + /// `rng` for generating test blocks and blobs. rng: XorShiftRng, - harness: BeaconChainHarness, + fork_name: ForkName, } const D: Duration = Duration::new(0, 0); impl TestRig { - fn test_setup(enable_log: bool) -> (BlockLookups, SyncNetworkContext, Self) { - let log = build_log(slog::Level::Debug, enable_log); + fn test_setup() -> Self { + let enable_log = cfg!(feature = "test_logger"); + let log = build_log(slog::Level::Trace, enable_log); // Initialise a new beacon chain let harness = BeaconChainHarness::>::builder(E) @@ -56,91 +93,399 @@ impl TestRig { let (network_tx, network_rx) = mpsc::unbounded_channel(); let globals = Arc::new(NetworkGlobals::new_test_globals(Vec::new(), &log)); - let (network_beacon_processor, beacon_processor_rx) = - NetworkBeaconProcessor::null_for_testing(globals); + let (beacon_processor, beacon_processor_rx) = NetworkBeaconProcessor::null_for_testing( + globals, + chain.clone(), + harness.runtime.task_executor.clone(), + log.clone(), + ); + + let (_sync_send, sync_recv) = mpsc::unbounded_channel::>(); + + let fork_name = chain.spec.fork_name_at_slot::(chain.slot().unwrap()); + + // All current tests expect synced and EL online state + beacon_processor + .network_globals + .set_sync_state(SyncState::Synced); + let rng = XorShiftRng::from_seed([42; 16]); - let rig = TestRig { + TestRig { beacon_processor_rx, network_rx, + network_rx_queue: vec![], rng, - harness, - }; - - let bl = BlockLookups::new( - chain.data_availability_checker.clone(), - log.new(slog::o!("component" => "block_lookups")), - ); - let cx = { - SyncNetworkContext::new( - network_tx, - Arc::new(network_beacon_processor), + network_globals: beacon_processor.network_globals.clone(), + sync_manager: SyncManager::new( chain, - log.new(slog::o!("component" => "network_context")), - ) - }; - - (bl, cx, rig) + network_tx, + beacon_processor.into(), + sync_recv, + log.clone(), + ), + fork_name, + } } - fn rand_block(&mut self, fork_name: ForkName) -> SignedBeaconBlock { - self.rand_block_and_blobs(fork_name, NumBlobs::None).0 + fn test_setup_after_deneb() -> Option { + let r = Self::test_setup(); + if r.after_deneb() { + Some(r) + } else { + None + } + } + + fn after_deneb(&self) -> bool { + matches!(self.fork_name, ForkName::Deneb | ForkName::Electra) + } + + fn trigger_unknown_parent_block(&mut self, peer_id: PeerId, block: Arc>) { + let block_root = block.canonical_root(); + self.send_sync_message(SyncMessage::UnknownParentBlock( + peer_id, + RpcBlock::new_without_blobs(Some(block_root), block), + block_root, + )) + } + + fn trigger_unknown_parent_blob(&mut self, peer_id: PeerId, blob: BlobSidecar) { + self.send_sync_message(SyncMessage::UnknownParentBlob(peer_id, blob.into())); + } + + fn trigger_unknown_block_from_attestation(&mut self, block_root: Hash256, peer_id: PeerId) { + self.send_sync_message(SyncMessage::UnknownBlockHashFromAttestation( + peer_id, block_root, + )); + } + + fn rand_block(&mut self) -> SignedBeaconBlock { + self.rand_block_and_blobs(NumBlobs::None).0 } fn rand_block_and_blobs( &mut self, - fork_name: ForkName, num_blobs: NumBlobs, ) -> (SignedBeaconBlock, Vec>) { + let fork_name = self.fork_name; let rng = &mut self.rng; generate_rand_block_and_blobs::(fork_name, num_blobs, rng) } + pub fn rand_block_and_parent( + &mut self, + ) -> (SignedBeaconBlock, SignedBeaconBlock, Hash256, Hash256) { + let parent = self.rand_block(); + let parent_root = parent.canonical_root(); + let mut block = self.rand_block(); + *block.message_mut().parent_root_mut() = parent_root; + let block_root = block.canonical_root(); + (parent, block, parent_root, block_root) + } + + fn send_sync_message(&mut self, sync_message: SyncMessage) { + self.sync_manager.handle_message(sync_message); + } + + fn active_single_lookups_count(&self) -> usize { + self.sync_manager.active_single_lookups().len() + } + + fn active_parent_lookups(&self) -> Vec { + self.sync_manager.active_parent_lookups() + } + + fn active_parent_lookups_count(&self) -> usize { + self.sync_manager.active_parent_lookups().len() + } + + fn failed_chains_contains(&mut self, chain_hash: &Hash256) -> bool { + self.sync_manager.failed_chains_contains(chain_hash) + } + #[track_caller] - fn expect_lookup_request(&mut self, response_type: ResponseType) -> SingleLookupReqId { - match response_type { - ResponseType::Block => match self.network_rx.try_recv() { - Ok(NetworkMessage::SendRequest { - peer_id: _, - request: Request::BlocksByRoot(_request), - request_id: RequestId::Sync(SyncId::SingleBlock { id }), - }) => id, - other => { - panic!("Expected block request, found {:?}", other); - } - }, - ResponseType::Blob => match self.network_rx.try_recv() { - Ok(NetworkMessage::SendRequest { - peer_id: _, - request: Request::BlobsByRoot(_request), - request_id: RequestId::Sync(SyncId::SingleBlob { id }), - }) => id, - other => { - panic!("Expected blob request, found {:?}", other); - } + fn assert_parent_lookups_consistency(&self) { + let hashes = self.active_parent_lookups(); + let expected = hashes.len(); + assert_eq!( + expected, + hashes + .into_iter() + .collect::>() + .len(), + "duplicated chain hashes in parent queue" + ) + } + + fn new_connected_peer(&mut self) -> PeerId { + let peer_id = PeerId::random(); + self.network_globals + .peers + .write() + .__add_connected_peer_testing_only(&peer_id); + peer_id + } + + fn parent_chain_processed(&mut self, chain_hash: Hash256, result: BatchProcessResult) { + self.send_sync_message(SyncMessage::BatchProcessed { + sync_type: ChainSegmentProcessId::ParentLookup(chain_hash), + result, + }) + } + + fn parent_chain_processed_success(&mut self, chain_hash: Hash256) { + self.parent_chain_processed( + chain_hash, + BatchProcessResult::Success { + was_non_empty: true, }, + ) + } + + fn parent_block_processed(&mut self, chain_hash: Hash256, result: BlockProcessingResult) { + self.send_sync_message(SyncMessage::BlockComponentProcessed { + process_type: BlockProcessType::ParentLookup { chain_hash }, + result, + }); + } + + fn parent_block_processed_imported(&mut self, chain_hash: Hash256) { + self.parent_block_processed( + chain_hash, + BlockProcessingResult::Ok(AvailabilityProcessingStatus::Imported(chain_hash)), + ); + } + + fn single_block_component_processed( + &mut self, + id: SingleLookupReqId, + result: BlockProcessingResult, + ) { + self.send_sync_message(SyncMessage::BlockComponentProcessed { + process_type: BlockProcessType::SingleBlock { id: id.id }, + result, + }) + } + + fn single_block_component_processed_imported( + &mut self, + id: SingleLookupReqId, + block_root: Hash256, + ) { + self.single_block_component_processed( + id, + BlockProcessingResult::Ok(AvailabilityProcessingStatus::Imported(block_root)), + ) + } + + fn single_blob_component_processed( + &mut self, + id: SingleLookupReqId, + result: BlockProcessingResult, + ) { + self.send_sync_message(SyncMessage::BlockComponentProcessed { + process_type: BlockProcessType::SingleBlob { id: id.id }, + result, + }) + } + + fn parent_lookup_block_response( + &mut self, + id: SingleLookupReqId, + peer_id: PeerId, + beacon_block: Option>>, + ) { + self.send_sync_message(SyncMessage::RpcBlock { + request_id: SyncRequestId::ParentLookup { id }, + peer_id, + beacon_block, + seen_timestamp: D, + }); + } + + fn single_lookup_block_response( + &mut self, + id: SingleLookupReqId, + peer_id: PeerId, + beacon_block: Option>>, + ) { + self.send_sync_message(SyncMessage::RpcBlock { + request_id: SyncRequestId::SingleBlock { id }, + peer_id, + beacon_block, + seen_timestamp: D, + }); + } + + fn parent_lookup_blob_response( + &mut self, + id: SingleLookupReqId, + peer_id: PeerId, + blob_sidecar: Option>>, + ) { + self.send_sync_message(SyncMessage::RpcBlob { + request_id: SyncRequestId::ParentLookupBlob { id }, + peer_id, + blob_sidecar, + seen_timestamp: D, + }); + } + + fn single_lookup_blob_response( + &mut self, + id: SingleLookupReqId, + peer_id: PeerId, + blob_sidecar: Option>>, + ) { + self.send_sync_message(SyncMessage::RpcBlob { + request_id: SyncRequestId::SingleBlob { id }, + peer_id, + blob_sidecar, + seen_timestamp: D, + }); + } + + fn parent_lookup_failed(&mut self, id: SingleLookupReqId, peer_id: PeerId, error: RPCError) { + self.send_sync_message(SyncMessage::RpcError { + peer_id, + request_id: SyncRequestId::ParentLookup { id }, + error, + }) + } + + fn parent_lookup_failed_unavailable(&mut self, id: SingleLookupReqId, peer_id: PeerId) { + self.parent_lookup_failed( + id, + peer_id, + RPCError::ErrorResponse( + RPCResponseErrorCode::ResourceUnavailable, + "older than deneb".into(), + ), + ); + } + + fn single_lookup_failed(&mut self, id: SingleLookupReqId, peer_id: PeerId, error: RPCError) { + self.send_sync_message(SyncMessage::RpcError { + peer_id, + request_id: SyncRequestId::SingleBlock { id }, + error, + }) + } + + fn peer_disconnected(&mut self, peer_id: PeerId) { + self.send_sync_message(SyncMessage::Disconnect(peer_id)); + } + + fn drain_network_rx(&mut self) { + while let Ok(event) = self.network_rx.try_recv() { + self.network_rx_queue.push(event); + } + } + + fn pop_received_network_event) -> Option>( + &mut self, + predicate_transform: F, + ) -> Result { + self.drain_network_rx(); + + if let Some(index) = self + .network_rx_queue + .iter() + .position(|x| predicate_transform(x).is_some()) + { + // Transform the item, knowing that it won't be None because we checked it in the position predicate. + let transformed = predicate_transform(&self.network_rx_queue[index]).unwrap(); + self.network_rx_queue.remove(index); + Ok(transformed) + } else { + Err(format!("current network messages {:?}", self.network_rx_queue).to_string()) } } #[track_caller] - fn expect_parent_request(&mut self, response_type: ResponseType) -> SingleLookupReqId { - match response_type { - ResponseType::Block => match self.network_rx.try_recv() { - Ok(NetworkMessage::SendRequest { - peer_id: _, - request: Request::BlocksByRoot(_request), - request_id: RequestId::Sync(SyncId::ParentLookup { id }), - }) => id, - other => panic!("Expected parent request, found {:?}", other), - }, - ResponseType::Blob => match self.network_rx.try_recv() { - Ok(NetworkMessage::SendRequest { - peer_id: _, - request: Request::BlobsByRoot(_request), - request_id: RequestId::Sync(SyncId::ParentLookupBlob { id }), - }) => id, - other => panic!("Expected parent blobs request, found {:?}", other), - }, + fn expect_block_lookup_request(&mut self, for_block: Hash256) -> SingleLookupReqId { + self.pop_received_network_event(|ev| match ev { + NetworkMessage::SendRequest { + peer_id: _, + request: Request::BlocksByRoot(request), + request_id: RequestId::Sync(SyncRequestId::SingleBlock { id }), + } if request.block_roots().to_vec().contains(&for_block) => Some(*id), + _ => None, + }) + .unwrap_or_else(|e| panic!("Expected block request for {for_block:?}: {e}")) + } + + #[track_caller] + fn expect_blob_lookup_request(&mut self, for_block: Hash256) -> SingleLookupReqId { + self.pop_received_network_event(|ev| match ev { + NetworkMessage::SendRequest { + peer_id: _, + request: Request::BlobsByRoot(request), + request_id: RequestId::Sync(SyncRequestId::SingleBlob { id }), + } if request + .blob_ids + .to_vec() + .iter() + .any(|r| r.block_root == for_block) => + { + Some(*id) + } + _ => None, + }) + .unwrap_or_else(|e| panic!("Expected blob request for {for_block:?}: {e}")) + } + + #[track_caller] + fn expect_block_parent_request(&mut self, for_block: Hash256) -> SingleLookupReqId { + self.pop_received_network_event(|ev| match ev { + NetworkMessage::SendRequest { + peer_id: _, + request: Request::BlocksByRoot(request), + request_id: RequestId::Sync(SyncRequestId::ParentLookup { id }), + } if request.block_roots().to_vec().contains(&for_block) => Some(*id), + _ => None, + }) + .unwrap_or_else(|e| panic!("Expected block parent request for {for_block:?}: {e}")) + } + + #[track_caller] + fn expect_blob_parent_request(&mut self, for_block: Hash256) -> SingleLookupReqId { + self.pop_received_network_event(|ev| match ev { + NetworkMessage::SendRequest { + peer_id: _, + request: Request::BlobsByRoot(request), + request_id: RequestId::Sync(SyncRequestId::ParentLookupBlob { id }), + } if request + .blob_ids + .to_vec() + .iter() + .all(|r| r.block_root == for_block) => + { + Some(*id) + } + _ => None, + }) + .unwrap_or_else(|e| panic!("Expected blob parent request for {for_block:?}: {e}")) + } + + fn expect_lookup_request_block_and_blobs(&mut self, block_root: Hash256) -> SingleLookupReqId { + let id = self.expect_block_lookup_request(block_root); + // If we're in deneb, a blob request should have been triggered as well, + // we don't require a response because we're generateing 0-blob blocks in this test. + if self.after_deneb() { + let _ = self.expect_blob_lookup_request(block_root); } + id + } + + fn expect_parent_request_block_and_blobs(&mut self, block_root: Hash256) -> SingleLookupReqId { + let id = self.expect_block_parent_request(block_root); + // If we're in deneb, a blob request should have been triggered as well, + // we don't require a response because we're generateing 0-blob blocks in this test. + if self.after_deneb() { + let _ = self.expect_blob_parent_request(block_root); + } + id } #[track_caller] @@ -161,6 +506,23 @@ impl TestRig { } } + fn expect_no_penalty_for(&mut self, peer_id: PeerId) { + self.drain_network_rx(); + let downscore_events = self + .network_rx_queue + .iter() + .filter_map(|ev| match ev { + NetworkMessage::ReportPeer { + peer_id: p_id, msg, .. + } if p_id == &peer_id => Some(msg), + _ => None, + }) + .collect::>(); + if !downscore_events.is_empty() { + panic!("Some downscore events for {peer_id}: {downscore_events:?}"); + } + } + #[track_caller] fn expect_parent_chain_process(&mut self) { match self.beacon_processor_rx.try_recv() { @@ -173,10 +535,10 @@ impl TestRig { #[track_caller] fn expect_empty_network(&mut self) { - assert_eq!( - self.network_rx.try_recv().expect_err("must err"), - mpsc::error::TryRecvError::Empty - ); + self.drain_network_rx(); + if !self.network_rx_queue.is_empty() { + panic!("expected no network events: {:#?}", self.network_rx_queue); + } } #[track_caller] @@ -188,988 +550,538 @@ impl TestRig { } #[track_caller] - pub fn expect_penalty(&mut self) { - match self.network_rx.try_recv() { - Ok(NetworkMessage::ReportPeer { .. }) => {} - other => panic!("Expected peer penalty, found {:?}", other), - } - } - - pub fn block_with_parent( - &mut self, - parent_root: Hash256, - fork_name: ForkName, - ) -> SignedBeaconBlock { - let mut block = self.rand_block(fork_name); - *block.message_mut().parent_root_mut() = parent_root; - block + pub fn expect_penalty(&mut self, peer_id: PeerId) { + self.pop_received_network_event(|ev| match ev { + NetworkMessage::ReportPeer { peer_id: p_id, .. } if p_id == &peer_id => Some(()), + _ => None, + }) + .unwrap_or_else(|_| { + panic!( + "Expected peer penalty for {peer_id}: {:#?}", + self.network_rx_queue + ) + }) } pub fn block_with_parent_and_blobs( &mut self, parent_root: Hash256, - fork_name: ForkName, num_blobs: NumBlobs, ) -> (SignedBeaconBlock, Vec>) { - let (mut block, mut blobs) = self.rand_block_and_blobs(fork_name, num_blobs); + let (mut block, mut blobs) = self.rand_block_and_blobs(num_blobs); *block.message_mut().parent_root_mut() = parent_root; blobs.iter_mut().for_each(|blob| { blob.signed_block_header = block.signed_block_header(); }); (block, blobs) } + + pub fn rand_blockchain(&mut self, depth: usize) -> Vec>> { + let mut blocks = Vec::>>::with_capacity(depth); + while blocks.len() < depth { + let parent = blocks + .last() + .map(|b| b.canonical_root()) + .unwrap_or_else(Hash256::random); + let mut block = self.rand_block(); + *block.message_mut().parent_root_mut() = parent; + blocks.push(block.into()); + } + blocks + } } #[test] fn test_single_block_lookup_happy_path() { - let response_type = ResponseType::Block; - let (mut bl, mut cx, mut rig) = TestRig::test_setup(false); - let fork_name = rig - .harness - .spec - .fork_name_at_slot::(rig.harness.chain.slot().unwrap()); - - let block = rig.rand_block(fork_name); - let peer_id = PeerId::random(); + let mut rig = TestRig::test_setup(); + let block = rig.rand_block(); + let peer_id = rig.new_connected_peer(); let block_root = block.canonical_root(); // Trigger the request - bl.search_block(block_root, &[peer_id], &mut cx); - let id = rig.expect_lookup_request(response_type); - // If we're in deneb, a blob request should have been triggered as well, - // we don't require a response because we're generateing 0-blob blocks in this test. - if matches!(fork_name, ForkName::Deneb | ForkName::Electra) { - let _ = rig.expect_lookup_request(ResponseType::Blob); - } + rig.trigger_unknown_block_from_attestation(block_root, peer_id); + let id = rig.expect_lookup_request_block_and_blobs(block_root); // The peer provides the correct block, should not be penalized. Now the block should be sent // for processing. - bl.single_lookup_response::>( - id, - peer_id, - Some(block.into()), - D, - &cx, - ); + rig.single_lookup_block_response(id, peer_id, Some(block.into())); rig.expect_empty_network(); - rig.expect_block_process(response_type); + rig.expect_block_process(ResponseType::Block); // The request should still be active. - assert_eq!(bl.single_block_lookups.len(), 1); + assert_eq!(rig.active_single_lookups_count(), 1); // Send the stream termination. Peer should have not been penalized, and the request removed // after processing. - bl.single_lookup_response::>(id, peer_id, None, D, &cx); - bl.single_block_component_processed::>( - id.id, - BlockProcessingResult::Ok(AvailabilityProcessingStatus::Imported(block_root)), - &mut cx, - ); + rig.single_lookup_block_response(id, peer_id, None); + rig.single_block_component_processed_imported(id, block_root); rig.expect_empty_network(); - assert_eq!(bl.single_block_lookups.len(), 0); + assert_eq!(rig.active_single_lookups_count(), 0); } #[test] fn test_single_block_lookup_empty_response() { - let response_type = ResponseType::Block; - let (mut bl, mut cx, mut rig) = TestRig::test_setup(false); - let fork_name = rig - .harness - .spec - .fork_name_at_slot::(rig.harness.chain.slot().unwrap()); + let mut rig = TestRig::test_setup(); let block_hash = Hash256::random(); - let peer_id = PeerId::random(); + let peer_id = rig.new_connected_peer(); // Trigger the request - bl.search_block(block_hash, &[peer_id], &mut cx); - let id = rig.expect_lookup_request(response_type); - // If we're in deneb, a blob request should have been triggered as well, - // we don't require a response because we're generateing 0-blob blocks in this test. - if matches!(fork_name, ForkName::Deneb | ForkName::Electra) { - let _ = rig.expect_lookup_request(ResponseType::Blob); - } + rig.trigger_unknown_block_from_attestation(block_hash, peer_id); + let id = rig.expect_lookup_request_block_and_blobs(block_hash); // The peer does not have the block. It should be penalized. - bl.single_lookup_response::>(id, peer_id, None, D, &cx); - rig.expect_penalty(); + rig.single_lookup_block_response(id, peer_id, None); + rig.expect_penalty(peer_id); - rig.expect_lookup_request(response_type); // it should be retried + rig.expect_block_lookup_request(block_hash); // it should be retried } #[test] fn test_single_block_lookup_wrong_response() { - let response_type = ResponseType::Block; - let (mut bl, mut cx, mut rig) = TestRig::test_setup(false); + let mut rig = TestRig::test_setup(); - let fork_name = rig - .harness - .spec - .fork_name_at_slot::(rig.harness.chain.slot().unwrap()); let block_hash = Hash256::random(); - let peer_id = PeerId::random(); + let peer_id = rig.new_connected_peer(); // Trigger the request - bl.search_block(block_hash, &[peer_id], &mut cx); - let id = rig.expect_lookup_request(response_type); - // If we're in deneb, a blob request should have been triggered as well, - // we don't require a response because we're generateing 0-blob blocks in this test. - if matches!(fork_name, ForkName::Deneb | ForkName::Electra) { - let _ = rig.expect_lookup_request(ResponseType::Blob); - } + rig.trigger_unknown_block_from_attestation(block_hash, peer_id); + let id = rig.expect_lookup_request_block_and_blobs(block_hash); // Peer sends something else. It should be penalized. - let bad_block = rig.rand_block(fork_name); - bl.single_lookup_response::>( - id, - peer_id, - Some(bad_block.into()), - D, - &cx, - ); - rig.expect_penalty(); - rig.expect_lookup_request(response_type); // should be retried + let bad_block = rig.rand_block(); + rig.single_lookup_block_response(id, peer_id, Some(bad_block.into())); + rig.expect_penalty(peer_id); + rig.expect_block_lookup_request(block_hash); // should be retried // Send the stream termination. This should not produce an additional penalty. - bl.single_lookup_response::>(id, peer_id, None, D, &cx); + rig.single_lookup_block_response(id, peer_id, None); rig.expect_empty_network(); } #[test] fn test_single_block_lookup_failure() { - let response_type = ResponseType::Block; - let (mut bl, mut cx, mut rig) = TestRig::test_setup(false); + let mut rig = TestRig::test_setup(); - let fork_name = rig - .harness - .spec - .fork_name_at_slot::(rig.harness.chain.slot().unwrap()); let block_hash = Hash256::random(); - let peer_id = PeerId::random(); + let peer_id = rig.new_connected_peer(); // Trigger the request - bl.search_block(block_hash, &[peer_id], &mut cx); - let id = rig.expect_lookup_request(response_type); - // If we're in deneb, a blob request should have been triggered as well, - // we don't require a response because we're generateing 0-blob blocks in this test. - if matches!(fork_name, ForkName::Deneb | ForkName::Electra) { - let _ = rig.expect_lookup_request(ResponseType::Blob); - } + rig.trigger_unknown_block_from_attestation(block_hash, peer_id); + let id = rig.expect_lookup_request_block_and_blobs(block_hash); // The request fails. RPC failures are handled elsewhere so we should not penalize the peer. - bl.single_block_lookup_failed::>( - id, - &peer_id, - &cx, - RPCError::UnsupportedProtocol, - ); - rig.expect_lookup_request(response_type); + rig.single_lookup_failed(id, peer_id, RPCError::UnsupportedProtocol); + rig.expect_block_lookup_request(block_hash); rig.expect_empty_network(); } #[test] fn test_single_block_lookup_becomes_parent_request() { - let response_type = ResponseType::Block; - let (mut bl, mut cx, mut rig) = TestRig::test_setup(false); + let mut rig = TestRig::test_setup(); - let fork_name = rig - .harness - .spec - .fork_name_at_slot::(rig.harness.chain.slot().unwrap()); - let block = Arc::new(rig.rand_block(fork_name)); - let peer_id = PeerId::random(); + let block = Arc::new(rig.rand_block()); + let block_root = block.canonical_root(); + let parent_root = block.parent_root(); + let peer_id = rig.new_connected_peer(); // Trigger the request - bl.search_block(block.canonical_root(), &[peer_id], &mut cx); - let id = rig.expect_lookup_request(response_type); - // If we're in deneb, a blob request should have been triggered as well, - // we don't require a response because we're generateing 0-blob blocks in this test. - if matches!(fork_name, ForkName::Deneb | ForkName::Electra) { - let _ = rig.expect_lookup_request(ResponseType::Blob); - } + rig.trigger_unknown_block_from_attestation(block.canonical_root(), peer_id); + let id = rig.expect_lookup_request_block_and_blobs(block_root); // The peer provides the correct block, should not be penalized. Now the block should be sent // for processing. - bl.single_lookup_response::>( - id, - peer_id, - Some(block.clone()), - D, - &cx, - ); + rig.single_lookup_block_response(id, peer_id, Some(block.clone())); rig.expect_empty_network(); - rig.expect_block_process(response_type); + rig.expect_block_process(ResponseType::Block); // The request should still be active. - assert_eq!(bl.single_block_lookups.len(), 1); + assert_eq!(rig.active_single_lookups_count(), 1); // Send the stream termination. Peer should have not been penalized, and the request moved to a // parent request after processing. - bl.single_block_component_processed::>( - id.id, + rig.single_block_component_processed( + id, BlockError::ParentUnknown(RpcBlock::new_without_blobs(None, block)).into(), - &mut cx, ); - assert_eq!(bl.single_block_lookups.len(), 1); - rig.expect_parent_request(response_type); - // If we're in deneb, a blob request should have been triggered as well, - // we don't require a response because we're generateing 0-blob blocks in this test. - if matches!(fork_name, ForkName::Deneb | ForkName::Electra) { - let _ = rig.expect_parent_request(ResponseType::Blob); - } + assert_eq!(rig.active_single_lookups_count(), 1); + rig.expect_parent_request_block_and_blobs(parent_root); rig.expect_empty_network(); - assert_eq!(bl.parent_lookups.len(), 1); + assert_eq!(rig.active_parent_lookups_count(), 1); } #[test] fn test_parent_lookup_happy_path() { - let response_type = ResponseType::Block; - let (mut bl, mut cx, mut rig) = TestRig::test_setup(false); + let mut rig = TestRig::test_setup(); - let fork_name = rig - .harness - .spec - .fork_name_at_slot::(rig.harness.chain.slot().unwrap()); - let parent = rig.rand_block(fork_name); - let block = rig.block_with_parent(parent.canonical_root(), fork_name); - let chain_hash = block.canonical_root(); - let peer_id = PeerId::random(); - let block_root = block.canonical_root(); - let parent_root = block.parent_root(); - let slot = block.slot(); + let (parent, block, parent_root, block_root) = rig.rand_block_and_parent(); + let peer_id = rig.new_connected_peer(); // Trigger the request - bl.search_parent(slot, block_root, parent_root, peer_id, &mut cx); - let id = rig.expect_parent_request(response_type); - // If we're in deneb, a blob request should have been triggered as well, - // we don't require a response because we're generateing 0-blob blocks in this test. - if matches!(fork_name, ForkName::Deneb | ForkName::Electra) { - let _ = rig.expect_parent_request(ResponseType::Blob); - } + rig.trigger_unknown_parent_block(peer_id, block.into()); + let id = rig.expect_parent_request_block_and_blobs(parent_root); // Peer sends the right block, it should be sent for processing. Peer should not be penalized. - bl.parent_lookup_response::>( - id, - peer_id, - Some(parent.into()), - D, - &cx, - ); - rig.expect_block_process(response_type); + rig.parent_lookup_block_response(id, peer_id, Some(parent.into())); + rig.expect_block_process(ResponseType::Block); rig.expect_empty_network(); // Processing succeeds, now the rest of the chain should be sent for processing. - bl.parent_block_processed( - chain_hash, + rig.parent_block_processed( + block_root, BlockError::BlockIsAlreadyKnown(block_root).into(), - &mut cx, ); rig.expect_parent_chain_process(); - let process_result = BatchProcessResult::Success { - was_non_empty: true, - }; - bl.parent_chain_processed(chain_hash, process_result, &cx); - assert_eq!(bl.parent_lookups.len(), 0); + rig.parent_chain_processed_success(block_root); + assert_eq!(rig.active_parent_lookups_count(), 0); } #[test] fn test_parent_lookup_wrong_response() { - let response_type = ResponseType::Block; - let (mut bl, mut cx, mut rig) = TestRig::test_setup(false); + let mut rig = TestRig::test_setup(); - let fork_name = rig - .harness - .spec - .fork_name_at_slot::(rig.harness.chain.slot().unwrap()); - let parent = rig.rand_block(fork_name); - let block = rig.block_with_parent(parent.canonical_root(), fork_name); - let chain_hash = block.canonical_root(); - let peer_id = PeerId::random(); - let block_root = block.canonical_root(); - let parent_root = block.parent_root(); - let slot = block.slot(); + let (parent, block, parent_root, block_root) = rig.rand_block_and_parent(); + let peer_id = rig.new_connected_peer(); // Trigger the request - bl.search_parent(slot, block_root, parent_root, peer_id, &mut cx); - let id1 = rig.expect_parent_request(response_type); - // If we're in deneb, a blob request should have been triggered as well, - // we don't require a response because we're generateing 0-blob blocks in this test. - if matches!(fork_name, ForkName::Deneb | ForkName::Electra) { - let _ = rig.expect_parent_request(ResponseType::Blob); - } + rig.trigger_unknown_parent_block(peer_id, block.into()); + let id1 = rig.expect_parent_request_block_and_blobs(parent_root); // Peer sends the wrong block, peer should be penalized and the block re-requested. - let bad_block = rig.rand_block(fork_name); - bl.parent_lookup_response::>( - id1, - peer_id, - Some(bad_block.into()), - D, - &cx, - ); - rig.expect_penalty(); - let id2 = rig.expect_parent_request(response_type); + let bad_block = rig.rand_block(); + rig.parent_lookup_block_response(id1, peer_id, Some(bad_block.into())); + rig.expect_penalty(peer_id); + let id2 = rig.expect_block_parent_request(parent_root); // Send the stream termination for the first request. This should not produce extra penalties. - bl.parent_lookup_response::>(id1, peer_id, None, D, &cx); + rig.parent_lookup_block_response(id1, peer_id, None); rig.expect_empty_network(); // Send the right block this time. - bl.parent_lookup_response::>( - id2, - peer_id, - Some(parent.into()), - D, - &cx, - ); - rig.expect_block_process(response_type); + rig.parent_lookup_block_response(id2, peer_id, Some(parent.into())); + rig.expect_block_process(ResponseType::Block); // Processing succeeds, now the rest of the chain should be sent for processing. - bl.parent_block_processed( - chain_hash, - BlockProcessingResult::Ok(AvailabilityProcessingStatus::Imported(block_root)), - &mut cx, - ); + rig.parent_block_processed_imported(block_root); rig.expect_parent_chain_process(); - let process_result = BatchProcessResult::Success { - was_non_empty: true, - }; - bl.parent_chain_processed(chain_hash, process_result, &cx); - assert_eq!(bl.parent_lookups.len(), 0); + rig.parent_chain_processed_success(block_root); + assert_eq!(rig.active_parent_lookups_count(), 0); } #[test] fn test_parent_lookup_empty_response() { - let response_type = ResponseType::Block; - let (mut bl, mut cx, mut rig) = TestRig::test_setup(false); + let mut rig = TestRig::test_setup(); - let fork_name = rig - .harness - .spec - .fork_name_at_slot::(rig.harness.chain.slot().unwrap()); - let parent = rig.rand_block(fork_name); - let block = rig.block_with_parent(parent.canonical_root(), fork_name); - let chain_hash = block.canonical_root(); - let peer_id = PeerId::random(); - let block_root = block.canonical_root(); - let parent_root = block.parent_root(); - let slot = block.slot(); + let (parent, block, parent_root, block_root) = rig.rand_block_and_parent(); + let peer_id = rig.new_connected_peer(); // Trigger the request - bl.search_parent(slot, block_root, parent_root, peer_id, &mut cx); - let id1 = rig.expect_parent_request(response_type); - // If we're in deneb, a blob request should have been triggered as well, - // we don't require a response because we're generateing 0-blob blocks in this test. - if matches!(fork_name, ForkName::Deneb | ForkName::Electra) { - let _ = rig.expect_parent_request(ResponseType::Blob); - } + rig.trigger_unknown_parent_block(peer_id, block.into()); + let id1 = rig.expect_parent_request_block_and_blobs(parent_root); // Peer sends an empty response, peer should be penalized and the block re-requested. - bl.parent_lookup_response::>(id1, peer_id, None, D, &cx); - rig.expect_penalty(); - let id2 = rig.expect_parent_request(response_type); + rig.parent_lookup_block_response(id1, peer_id, None); + rig.expect_penalty(peer_id); + let id2 = rig.expect_block_parent_request(parent_root); // Send the right block this time. - bl.parent_lookup_response::>( - id2, - peer_id, - Some(parent.into()), - D, - &cx, - ); - rig.expect_block_process(response_type); + rig.parent_lookup_block_response(id2, peer_id, Some(parent.into())); + rig.expect_block_process(ResponseType::Block); // Processing succeeds, now the rest of the chain should be sent for processing. - bl.parent_block_processed( - chain_hash, - BlockProcessingResult::Ok(AvailabilityProcessingStatus::Imported(block_root)), - &mut cx, - ); + rig.parent_block_processed_imported(block_root); rig.expect_parent_chain_process(); - let process_result = BatchProcessResult::Success { - was_non_empty: true, - }; - bl.parent_chain_processed(chain_hash, process_result, &cx); - assert_eq!(bl.parent_lookups.len(), 0); + rig.parent_chain_processed_success(block_root); + assert_eq!(rig.active_parent_lookups_count(), 0); } #[test] fn test_parent_lookup_rpc_failure() { - let response_type = ResponseType::Block; - let (mut bl, mut cx, mut rig) = TestRig::test_setup(false); + let mut rig = TestRig::test_setup(); - let fork_name = rig - .harness - .spec - .fork_name_at_slot::(rig.harness.chain.slot().unwrap()); - let parent = rig.rand_block(fork_name); - let block = rig.block_with_parent(parent.canonical_root(), fork_name); - let chain_hash = block.canonical_root(); - let peer_id = PeerId::random(); - let block_root = block.canonical_root(); - let parent_root = block.parent_root(); - let slot = block.slot(); + let (parent, block, parent_root, block_root) = rig.rand_block_and_parent(); + let peer_id = rig.new_connected_peer(); // Trigger the request - bl.search_parent(slot, block_root, parent_root, peer_id, &mut cx); - let id1 = rig.expect_parent_request(response_type); - // If we're in deneb, a blob request should have been triggered as well, - // we don't require a response because we're generateing 0-blob blocks in this test. - if matches!(fork_name, ForkName::Deneb | ForkName::Electra) { - let _ = rig.expect_parent_request(ResponseType::Blob); - } + rig.trigger_unknown_parent_block(peer_id, block.into()); + let id1 = rig.expect_parent_request_block_and_blobs(parent_root); // The request fails. It should be tried again. - bl.parent_lookup_failed::>( - id1, - peer_id, - &cx, - RPCError::ErrorResponse( - RPCResponseErrorCode::ResourceUnavailable, - "older than deneb".into(), - ), - ); - let id2 = rig.expect_parent_request(response_type); + rig.parent_lookup_failed_unavailable(id1, peer_id); + let id2 = rig.expect_block_parent_request(parent_root); // Send the right block this time. - bl.parent_lookup_response::>( - id2, - peer_id, - Some(parent.into()), - D, - &cx, - ); - rig.expect_block_process(response_type); + rig.parent_lookup_block_response(id2, peer_id, Some(parent.into())); + rig.expect_block_process(ResponseType::Block); // Processing succeeds, now the rest of the chain should be sent for processing. - bl.parent_block_processed( - chain_hash, - BlockProcessingResult::Ok(AvailabilityProcessingStatus::Imported(block_root)), - &mut cx, - ); + rig.parent_block_processed_imported(block_root); rig.expect_parent_chain_process(); - let process_result = BatchProcessResult::Success { - was_non_empty: true, - }; - bl.parent_chain_processed(chain_hash, process_result, &cx); - assert_eq!(bl.parent_lookups.len(), 0); + rig.parent_chain_processed_success(block_root); + assert_eq!(rig.active_parent_lookups_count(), 0); } #[test] fn test_parent_lookup_too_many_attempts() { - let response_type = ResponseType::Block; - let (mut bl, mut cx, mut rig) = TestRig::test_setup(false); + let mut rig = TestRig::test_setup(); - let fork_name = rig - .harness - .spec - .fork_name_at_slot::(rig.harness.chain.slot().unwrap()); - let parent = rig.rand_block(fork_name); - let block = rig.block_with_parent(parent.canonical_root(), fork_name); - let peer_id = PeerId::random(); - let block_root = block.canonical_root(); + let block = rig.rand_block(); let parent_root = block.parent_root(); - let slot = block.slot(); + let peer_id = rig.new_connected_peer(); // Trigger the request - bl.search_parent(slot, block_root, parent_root, peer_id, &mut cx); + rig.trigger_unknown_parent_block(peer_id, block.into()); for i in 1..=parent_lookup::PARENT_FAIL_TOLERANCE { - let id = rig.expect_parent_request(response_type); - // If we're in deneb, a blob request should have been triggered as well, - // we don't require a response because we're generateing 0-blob blocks in this test. - if matches!(fork_name, ForkName::Deneb | ForkName::Electra) && i == 1 { - let _ = rig.expect_parent_request(ResponseType::Blob); + let id = rig.expect_block_parent_request(parent_root); + // Blobs are only requested in the first iteration as this test only retries blocks + if rig.after_deneb() && i == 1 { + let _ = rig.expect_blob_parent_request(parent_root); } - match i % 2 { - // make sure every error is accounted for - 0 => { - // The request fails. It should be tried again. - bl.parent_lookup_failed::>( - id, - peer_id, - &cx, - RPCError::ErrorResponse( - RPCResponseErrorCode::ResourceUnavailable, - "older than deneb".into(), - ), - ); - } - _ => { - // Send a bad block this time. It should be tried again. - let bad_block = rig.rand_block(fork_name); - bl.parent_lookup_response::>( - id, - peer_id, - Some(bad_block.into()), - D, - &cx, - ); - // Send the stream termination - // Note, previously we would send the same lookup id with a stream terminator, - // we'd ignore it because we'd intrepret it as an unrequested response, since - // we already got one response for the block. I'm not sure what the intent is - // for having this stream terminator line in this test at all. Receiving an invalid - // block and a stream terminator with the same Id now results in two failed attempts, - // I'm unsure if this is how it should behave? - // - bl.parent_lookup_response::>(id, peer_id, None, D, &cx); - rig.expect_penalty(); - } - } - if i < parent_lookup::PARENT_FAIL_TOLERANCE { - assert_eq!( - bl.parent_lookups[0] - .current_parent_request - .block_request_state - .state - .failed_attempts(), - dbg!(i) - ); + if i % 2 == 0 { + // make sure every error is accounted for + // The request fails. It should be tried again. + rig.parent_lookup_failed_unavailable(id, peer_id); + } else { + // Send a bad block this time. It should be tried again. + let bad_block = rig.rand_block(); + rig.parent_lookup_block_response(id, peer_id, Some(bad_block.into())); + // Send the stream termination + + // Note, previously we would send the same lookup id with a stream terminator, + // we'd ignore it because we'd intrepret it as an unrequested response, since + // we already got one response for the block. I'm not sure what the intent is + // for having this stream terminator line in this test at all. Receiving an invalid + // block and a stream terminator with the same Id now results in two failed attempts, + // I'm unsure if this is how it should behave? + // + rig.parent_lookup_block_response(id, peer_id, None); + rig.expect_penalty(peer_id); } } - assert_eq!(bl.parent_lookups.len(), 0); + assert_eq!(rig.active_parent_lookups_count(), 0); } #[test] fn test_parent_lookup_too_many_download_attempts_no_blacklist() { - let response_type = ResponseType::Block; - let (mut bl, mut cx, mut rig) = TestRig::test_setup(false); + let mut rig = TestRig::test_setup(); - let fork_name = rig - .harness - .spec - .fork_name_at_slot::(rig.harness.chain.slot().unwrap()); - let parent = rig.rand_block(fork_name); - let block = rig.block_with_parent(parent.canonical_root(), fork_name); - let block_hash = block.canonical_root(); - let peer_id = PeerId::random(); - let block_root = block.canonical_root(); - let parent_root = block.parent_root(); - let slot = block.slot(); + let (parent, block, parent_root, block_root) = rig.rand_block_and_parent(); + let peer_id = rig.new_connected_peer(); // Trigger the request - bl.search_parent(slot, block_root, parent_root, peer_id, &mut cx); + rig.trigger_unknown_parent_block(peer_id, block.into()); for i in 1..=parent_lookup::PARENT_FAIL_TOLERANCE { - assert!(!bl.failed_chains.contains(&block_hash)); - let id = rig.expect_parent_request(response_type); - // If we're in deneb, a blob request should have been triggered as well, - // we don't require a response because we're generateing 0-blob blocks in this test. - if matches!(fork_name, ForkName::Deneb | ForkName::Electra) && i == 1 { - let _ = rig.expect_parent_request(ResponseType::Blob); + assert!(!rig.failed_chains_contains(&block_root)); + let id = rig.expect_block_parent_request(parent_root); + // Blobs are only requested in the first iteration as this test only retries blocks + if rig.after_deneb() && i == 1 { + let _ = rig.expect_blob_parent_request(parent_root); } if i % 2 != 0 { // The request fails. It should be tried again. - bl.parent_lookup_failed::>( - id, - peer_id, - &cx, - RPCError::ErrorResponse( - RPCResponseErrorCode::ResourceUnavailable, - "older than deneb".into(), - ), - ); + rig.parent_lookup_failed_unavailable(id, peer_id); } else { // Send a bad block this time. It should be tried again. - let bad_block = rig.rand_block(fork_name); - bl.parent_lookup_response::>( - id, - peer_id, - Some(bad_block.into()), - D, - &cx, - ); - rig.expect_penalty(); - } - if i < parent_lookup::PARENT_FAIL_TOLERANCE { - assert_eq!( - bl.parent_lookups[0] - .current_parent_request - .block_request_state - .state - .failed_attempts(), - dbg!(i) - ); + let bad_block = rig.rand_block(); + rig.parent_lookup_block_response(id, peer_id, Some(bad_block.into())); + rig.expect_penalty(peer_id); } } - assert_eq!(bl.parent_lookups.len(), 0); - assert!(!bl.failed_chains.contains(&block_hash)); - assert!(!bl.failed_chains.contains(&parent.canonical_root())); + assert_eq!(rig.active_parent_lookups_count(), 0); + assert!(!rig.failed_chains_contains(&block_root)); + assert!(!rig.failed_chains_contains(&parent.canonical_root())); } #[test] fn test_parent_lookup_too_many_processing_attempts_must_blacklist() { - let response_type = ResponseType::Block; const PROCESSING_FAILURES: u8 = parent_lookup::PARENT_FAIL_TOLERANCE / 2 + 1; - let (mut bl, mut cx, mut rig) = TestRig::test_setup(false); - let fork_name = rig - .harness - .spec - .fork_name_at_slot::(rig.harness.chain.slot().unwrap()); - - let parent = Arc::new(rig.rand_block(fork_name)); - let block = rig.block_with_parent(parent.canonical_root(), fork_name); - let peer_id = PeerId::random(); - let block_root = block.canonical_root(); - let parent_root = block.parent_root(); - let slot = block.slot(); + let mut rig = TestRig::test_setup(); + let (parent, block, parent_root, block_root) = rig.rand_block_and_parent(); + let peer_id = rig.new_connected_peer(); // Trigger the request - bl.search_parent(slot, block_root, parent_root, peer_id, &mut cx); + rig.trigger_unknown_parent_block(peer_id, block.into()); // Fail downloading the block for i in 0..(parent_lookup::PARENT_FAIL_TOLERANCE - PROCESSING_FAILURES) { - let id = rig.expect_parent_request(response_type); - // If we're in deneb, a blob request should have been triggered as well, - // we don't require a response because we're generateing 0-blob blocks in this test. - if matches!(fork_name, ForkName::Deneb | ForkName::Electra) && i == 0 { - let _ = rig.expect_parent_request(ResponseType::Blob); + let id = rig.expect_block_parent_request(parent_root); + // Blobs are only requested in the first iteration as this test only retries blocks + if rig.after_deneb() && i == 0 { + let _ = rig.expect_blob_parent_request(parent_root); } // The request fails. It should be tried again. - bl.parent_lookup_failed::>( - id, - peer_id, - &cx, - RPCError::ErrorResponse( - RPCResponseErrorCode::ResourceUnavailable, - "older than deneb".into(), - ), - ); + rig.parent_lookup_failed_unavailable(id, peer_id); } // Now fail processing a block in the parent request for i in 0..PROCESSING_FAILURES { - let id = dbg!(rig.expect_parent_request(response_type)); - if matches!(fork_name, ForkName::Deneb | ForkName::Electra) && i != 0 { - let _ = rig.expect_parent_request(ResponseType::Blob); + let id = rig.expect_block_parent_request(parent_root); + // Blobs are only requested in the first iteration as this test only retries blocks + if rig.after_deneb() && i != 0 { + let _ = rig.expect_blob_parent_request(parent_root); } - // If we're in deneb, a blob request should have been triggered as well, - // we don't require a response because we're generateing 0-blob blocks in this test. - assert!(!bl.failed_chains.contains(&block_root)); + assert!(!rig.failed_chains_contains(&block_root)); // send the right parent but fail processing - bl.parent_lookup_response::>( - id, - peer_id, - Some(parent.clone()), - D, - &cx, - ); - bl.parent_block_processed(block_root, BlockError::InvalidSignature.into(), &mut cx); - bl.parent_lookup_response::>(id, peer_id, None, D, &cx); - rig.expect_penalty(); + rig.parent_lookup_block_response(id, peer_id, Some(parent.clone().into())); + rig.parent_block_processed(block_root, BlockError::InvalidSignature.into()); + rig.parent_lookup_block_response(id, peer_id, None); + rig.expect_penalty(peer_id); } - assert!(bl.failed_chains.contains(&block_root)); - assert_eq!(bl.parent_lookups.len(), 0); + assert!(rig.failed_chains_contains(&block_root)); + assert_eq!(rig.active_parent_lookups_count(), 0); } #[test] fn test_parent_lookup_too_deep() { - let response_type = ResponseType::Block; - let (mut bl, mut cx, mut rig) = TestRig::test_setup(false); - let fork_name = rig - .harness - .spec - .fork_name_at_slot::(rig.harness.chain.slot().unwrap()); - let mut blocks = - Vec::>>::with_capacity(parent_lookup::PARENT_DEPTH_TOLERANCE); - while blocks.len() < parent_lookup::PARENT_DEPTH_TOLERANCE { - let parent = blocks - .last() - .map(|b| b.canonical_root()) - .unwrap_or_else(Hash256::random); - let block = Arc::new(rig.block_with_parent(parent, fork_name)); - blocks.push(block); - } + let mut rig = TestRig::test_setup(); + let mut blocks = rig.rand_blockchain(parent_lookup::PARENT_DEPTH_TOLERANCE); - let peer_id = PeerId::random(); + let peer_id = rig.new_connected_peer(); let trigger_block = blocks.pop().unwrap(); let chain_hash = trigger_block.canonical_root(); - let trigger_block_root = trigger_block.canonical_root(); - let trigger_parent_root = trigger_block.parent_root(); - let trigger_slot = trigger_block.slot(); - bl.search_parent( - trigger_slot, - trigger_block_root, - trigger_parent_root, - peer_id, - &mut cx, - ); + rig.trigger_unknown_parent_block(peer_id, trigger_block); for block in blocks.into_iter().rev() { - let id = rig.expect_parent_request(response_type); - // If we're in deneb, a blob request should have been triggered as well, - // we don't require a response because we're generateing 0-blob blocks in this test. - if matches!(fork_name, ForkName::Deneb | ForkName::Electra) { - let _ = rig.expect_parent_request(ResponseType::Blob); - } + let id = rig.expect_parent_request_block_and_blobs(block.canonical_root()); // the block - bl.parent_lookup_response::>( - id, - peer_id, - Some(block.clone()), - D, - &cx, - ); + rig.parent_lookup_block_response(id, peer_id, Some(block.clone())); // the stream termination - bl.parent_lookup_response::>(id, peer_id, None, D, &cx); + rig.parent_lookup_block_response(id, peer_id, None); // the processing request - rig.expect_block_process(response_type); + rig.expect_block_process(ResponseType::Block); // the processing result - bl.parent_block_processed( + rig.parent_block_processed( chain_hash, BlockError::ParentUnknown(RpcBlock::new_without_blobs(None, block)).into(), - &mut cx, ) } - rig.expect_penalty(); - assert!(bl.failed_chains.contains(&chain_hash)); + rig.expect_penalty(peer_id); + assert!(rig.failed_chains_contains(&chain_hash)); } #[test] fn test_parent_lookup_disconnection() { - let (mut bl, mut cx, mut rig) = TestRig::test_setup(false); - let peer_id = PeerId::random(); - let fork_name = rig - .harness - .spec - .fork_name_at_slot::(rig.harness.chain.slot().unwrap()); - let trigger_block = rig.rand_block(fork_name); - let trigger_block_root = trigger_block.canonical_root(); - let trigger_parent_root = trigger_block.parent_root(); - let trigger_slot = trigger_block.slot(); - bl.search_parent( - trigger_slot, - trigger_block_root, - trigger_parent_root, - peer_id, - &mut cx, - ); + let mut rig = TestRig::test_setup(); + let peer_id = rig.new_connected_peer(); + let trigger_block = rig.rand_block(); + rig.trigger_unknown_parent_block(peer_id, trigger_block.into()); - bl.peer_disconnected(&peer_id, &mut cx); - assert!(bl.parent_lookups.is_empty()); + rig.peer_disconnected(peer_id); + assert_eq!(rig.active_parent_lookups_count(), 0); } #[test] fn test_single_block_lookup_ignored_response() { - let response_type = ResponseType::Block; - let (mut bl, mut cx, mut rig) = TestRig::test_setup(false); + let mut rig = TestRig::test_setup(); - let fork_name = rig - .harness - .spec - .fork_name_at_slot::(rig.harness.chain.slot().unwrap()); - let block = rig.rand_block(fork_name); - let peer_id = PeerId::random(); + let block = rig.rand_block(); + let peer_id = rig.new_connected_peer(); // Trigger the request - bl.search_block(block.canonical_root(), &[peer_id], &mut cx); - let id = rig.expect_lookup_request(response_type); - // If we're in deneb, a blob request should have been triggered as well, - // we don't require a response because we're generateing 0-blob blocks in this test. - if matches!(fork_name, ForkName::Deneb | ForkName::Electra) { - let _ = rig.expect_lookup_request(ResponseType::Blob); - } + rig.trigger_unknown_block_from_attestation(block.canonical_root(), peer_id); + let id = rig.expect_lookup_request_block_and_blobs(block.canonical_root()); // The peer provides the correct block, should not be penalized. Now the block should be sent // for processing. - bl.single_lookup_response::>( - id, - peer_id, - Some(block.into()), - D, - &cx, - ); + rig.single_lookup_block_response(id, peer_id, Some(block.into())); rig.expect_empty_network(); - rig.expect_block_process(response_type); + rig.expect_block_process(ResponseType::Block); // The request should still be active. - assert_eq!(bl.single_block_lookups.len(), 1); + assert_eq!(rig.active_single_lookups_count(), 1); // Send the stream termination. Peer should have not been penalized, and the request removed // after processing. - bl.single_lookup_response::>(id, peer_id, None, D, &cx); + rig.single_lookup_block_response(id, peer_id, None); // Send an Ignored response, the request should be dropped - bl.single_block_component_processed::>( - id.id, - BlockProcessingResult::Ignored, - &mut cx, - ); + rig.single_block_component_processed(id, BlockProcessingResult::Ignored); rig.expect_empty_network(); - assert_eq!(bl.single_block_lookups.len(), 0); + assert_eq!(rig.active_single_lookups_count(), 0); } #[test] fn test_parent_lookup_ignored_response() { - let response_type = ResponseType::Block; - let (mut bl, mut cx, mut rig) = TestRig::test_setup(false); + let mut rig = TestRig::test_setup(); - let fork_name = rig - .harness - .spec - .fork_name_at_slot::(rig.harness.chain.slot().unwrap()); - let parent = rig.rand_block(fork_name); - let block = rig.block_with_parent(parent.canonical_root(), fork_name); - let chain_hash = block.canonical_root(); - let peer_id = PeerId::random(); - let block_root = block.canonical_root(); - let parent_root = block.parent_root(); - let slot = block.slot(); + let (parent, block, parent_root, block_root) = rig.rand_block_and_parent(); + let peer_id = rig.new_connected_peer(); // Trigger the request - bl.search_parent(slot, block_root, parent_root, peer_id, &mut cx); - let id = rig.expect_parent_request(response_type); - - // If we're in deneb, a blob request should have been triggered as well, - // we don't require a response because we're generateing 0-blob blocks in this test. - if matches!(fork_name, ForkName::Deneb | ForkName::Electra) { - let _ = rig.expect_parent_request(ResponseType::Blob); - } + rig.trigger_unknown_parent_block(peer_id, block.into()); + let id = rig.expect_parent_request_block_and_blobs(parent_root); // Peer sends the right block, it should be sent for processing. Peer should not be penalized. - bl.parent_lookup_response::>( - id, - peer_id, - Some(parent.into()), - D, - &cx, - ); - rig.expect_block_process(response_type); + rig.parent_lookup_block_response(id, peer_id, Some(parent.into())); + rig.expect_block_process(ResponseType::Block); rig.expect_empty_network(); // Return an Ignored result. The request should be dropped - bl.parent_block_processed(chain_hash, BlockProcessingResult::Ignored, &mut cx); + rig.parent_block_processed(block_root, BlockProcessingResult::Ignored); rig.expect_empty_network(); - assert_eq!(bl.parent_lookups.len(), 0); + assert_eq!(rig.active_parent_lookups_count(), 0); } /// This is a regression test. #[test] fn test_same_chain_race_condition() { - let response_type = ResponseType::Block; - let (mut bl, mut cx, mut rig) = TestRig::test_setup(true); + let mut rig = TestRig::test_setup(); - let fork_name = rig - .harness - .spec - .fork_name_at_slot::(rig.harness.chain.slot().unwrap()); - #[track_caller] - fn parent_lookups_consistency(bl: &BlockLookups) { - let hashes: Vec<_> = bl - .parent_lookups - .iter() - .map(|req| req.chain_hash()) - .collect(); - let expected = hashes.len(); - assert_eq!( - expected, - hashes - .into_iter() - .collect::>() - .len(), - "duplicated chain hashes in parent queue" - ) - } // if we use one or two blocks it will match on the hash or the parent hash, so make a longer // chain. let depth = 4; - let mut blocks = Vec::>>::with_capacity(depth); - while blocks.len() < depth { - let parent = blocks - .last() - .map(|b| b.canonical_root()) - .unwrap_or_else(Hash256::random); - let block = Arc::new(rig.block_with_parent(parent, fork_name)); - blocks.push(block); - } - - let peer_id = PeerId::random(); + let mut blocks = rig.rand_blockchain(depth); + let peer_id = rig.new_connected_peer(); let trigger_block = blocks.pop().unwrap(); let chain_hash = trigger_block.canonical_root(); - let trigger_block_root = trigger_block.canonical_root(); - let trigger_parent_root = trigger_block.parent_root(); - let trigger_slot = trigger_block.slot(); - bl.search_parent( - trigger_slot, - trigger_block_root, - trigger_parent_root, - peer_id, - &mut cx, - ); + rig.trigger_unknown_parent_block(peer_id, trigger_block.clone()); for (i, block) in blocks.into_iter().rev().enumerate() { - let id = rig.expect_parent_request(response_type); - // If we're in deneb, a blob request should have been triggered as well, - // we don't require a response because we're generateing 0-blob blocks in this test. - if matches!(fork_name, ForkName::Deneb | ForkName::Electra) { - let _ = rig.expect_parent_request(ResponseType::Blob); - } + let id = rig.expect_parent_request_block_and_blobs(block.canonical_root()); // the block - bl.parent_lookup_response::>( - id, - peer_id, - Some(block.clone()), - D, - &cx, - ); + rig.parent_lookup_block_response(id, peer_id, Some(block.clone())); // the stream termination - bl.parent_lookup_response::>(id, peer_id, None, D, &cx); + rig.parent_lookup_block_response(id, peer_id, None); // the processing request - rig.expect_block_process(response_type); + rig.expect_block_process(ResponseType::Block); // the processing result if i + 2 == depth { // one block was removed - bl.parent_block_processed( + rig.parent_block_processed( chain_hash, BlockError::BlockIsAlreadyKnown(block.canonical_root()).into(), - &mut cx, ) } else { - bl.parent_block_processed( + rig.parent_block_processed( chain_hash, BlockError::ParentUnknown(RpcBlock::new_without_blobs(None, block)).into(), - &mut cx, ) } - parent_lookups_consistency(&bl) + rig.assert_parent_lookups_consistency(); } // Processing succeeds, now the rest of the chain should be sent for processing. rig.expect_parent_chain_process(); // Try to get this block again while the chain is being processed. We should not request it again. - let peer_id = PeerId::random(); - let trigger_block_root = trigger_block.canonical_root(); - let trigger_parent_root = trigger_block.parent_root(); - let trigger_slot = trigger_block.slot(); - bl.search_parent( - trigger_slot, - trigger_block_root, - trigger_parent_root, - peer_id, - &mut cx, - ); - parent_lookups_consistency(&bl); + let peer_id = rig.new_connected_peer(); + rig.trigger_unknown_parent_block(peer_id, trigger_block); + rig.assert_parent_lookups_consistency(); - let process_result = BatchProcessResult::Success { - was_non_empty: true, - }; - bl.parent_chain_processed(chain_hash, process_result, &cx); - assert_eq!(bl.parent_lookups.len(), 0); + rig.parent_chain_processed_success(chain_hash); + assert_eq!(rig.active_parent_lookups_count(), 0); } mod deneb_only { use super::*; + use crate::sync::SyncMessage; use beacon_chain::data_availability_checker::AvailabilityCheckError; use ssz_types::VariableList; - use std::ops::IndexMut; - use std::str::FromStr; struct DenebTester { - bl: BlockLookups, - cx: SyncNetworkContext, rig: TestRig, block: Arc>, blobs: Vec>>, @@ -1204,15 +1116,10 @@ mod deneb_only { impl DenebTester { fn new(request_trigger: RequestTrigger) -> Option { - let fork_name = get_fork_name(); - if !matches!(fork_name, ForkName::Deneb) { + let Some(mut rig) = TestRig::test_setup_after_deneb() else { return None; - } - let (mut bl, mut cx, mut rig) = TestRig::test_setup(false); - rig.harness.chain.slot_clock.set_slot( - E::slots_per_epoch() * rig.harness.spec.deneb_fork_epoch.unwrap().as_u64(), - ); - let (block, blobs) = rig.rand_block_and_blobs(fork_name, NumBlobs::Random); + }; + let (block, blobs) = rig.rand_block_and_blobs(NumBlobs::Random); let mut block = Arc::new(block); let mut blobs = blobs.into_iter().map(Arc::new).collect::>(); let slot = block.slot(); @@ -1230,7 +1137,7 @@ mod deneb_only { // Create the next block. let (child_block, child_blobs) = - rig.block_with_parent_and_blobs(parent_root, get_fork_name(), NumBlobs::Random); + rig.block_with_parent_and_blobs(parent_root, NumBlobs::Random); let mut child_block = Arc::new(child_block); let mut child_blobs = child_blobs.into_iter().map(Arc::new).collect::>(); @@ -1239,32 +1146,32 @@ mod deneb_only { std::mem::swap(&mut child_blobs, &mut blobs); } let block_root = block.canonical_root(); - let parent_root = block.parent_root(); - let peer_id = PeerId::random(); + let peer_id = rig.new_connected_peer(); // Trigger the request let (block_req_id, blob_req_id, parent_block_req_id, parent_blob_req_id) = match request_trigger { RequestTrigger::AttestationUnknownBlock => { - bl.search_block(block_root, &[peer_id], &mut cx); - let block_req_id = rig.expect_lookup_request(ResponseType::Block); - let blob_req_id = rig.expect_lookup_request(ResponseType::Blob); + rig.send_sync_message(SyncMessage::UnknownBlockHashFromAttestation( + peer_id, block_root, + )); + let block_req_id = rig.expect_block_lookup_request(block_root); + let blob_req_id = rig.expect_blob_lookup_request(block_root); (Some(block_req_id), Some(blob_req_id), None, None) } RequestTrigger::GossipUnknownParentBlock { .. } => { - bl.search_child_block( + rig.send_sync_message(SyncMessage::UnknownParentBlock( + peer_id, + RpcBlock::new_without_blobs(Some(block_root), block.clone()), block_root, - ChildComponents::new(block_root, Some(block.clone()), None), - &[peer_id], - &mut cx, - ); + )); - let blob_req_id = rig.expect_lookup_request(ResponseType::Blob); - rig.expect_empty_network(); // expect no block request - bl.search_parent(slot, block_root, parent_root, peer_id, &mut cx); - let parent_block_req_id = rig.expect_parent_request(ResponseType::Block); - let parent_blob_req_id = rig.expect_parent_request(ResponseType::Blob); + let parent_root = block.parent_root(); + let blob_req_id = rig.expect_blob_lookup_request(block_root); + let parent_block_req_id = rig.expect_block_parent_request(parent_root); + let parent_blob_req_id = rig.expect_blob_parent_request(parent_root); + rig.expect_empty_network(); // expect no more requests ( None, Some(blob_req_id), @@ -1274,23 +1181,14 @@ mod deneb_only { } RequestTrigger::GossipUnknownParentBlob { .. } => { let single_blob = blobs.first().cloned().unwrap(); - let child_root = single_blob.block_root(); + let parent_root = single_blob.block_parent_root(); + rig.send_sync_message(SyncMessage::UnknownParentBlob(peer_id, single_blob)); - let mut lookup_blobs = FixedBlobSidecarList::default(); - *lookup_blobs.index_mut(0) = Some(single_blob); - bl.search_child_block( - child_root, - ChildComponents::new(child_root, None, Some(lookup_blobs)), - &[peer_id], - &mut cx, - ); - - let block_req_id = rig.expect_lookup_request(ResponseType::Block); - let blobs_req_id = rig.expect_lookup_request(ResponseType::Blob); - rig.expect_empty_network(); // expect no block request - bl.search_parent(slot, child_root, parent_root, peer_id, &mut cx); - let parent_block_req_id = rig.expect_parent_request(ResponseType::Block); - let parent_blob_req_id = rig.expect_parent_request(ResponseType::Blob); + let block_req_id = rig.expect_block_lookup_request(block_root); + let blobs_req_id = rig.expect_blob_lookup_request(block_root); + let parent_block_req_id = rig.expect_block_parent_request(parent_root); + let parent_blob_req_id = rig.expect_blob_parent_request(parent_root); + rig.expect_empty_network(); // expect no more requests ( Some(block_req_id), Some(blobs_req_id), @@ -1301,8 +1199,6 @@ mod deneb_only { }; Some(Self { - bl, - cx, rig, block, blobs, @@ -1324,15 +1220,13 @@ mod deneb_only { self.rig.expect_empty_network(); let block = self.parent_block.pop_front().unwrap().clone(); let _ = self.unknown_parent_block.insert(block.clone()); - self.bl.parent_lookup_response::>( + self.rig.parent_lookup_block_response( self.parent_block_req_id.expect("parent request id"), self.peer_id, Some(block), - D, - &self.cx, ); - assert_eq!(self.bl.parent_lookups.len(), 1); + assert_eq!(self.rig.active_parent_lookups_count(), 1); self } @@ -1340,24 +1234,18 @@ mod deneb_only { let blobs = self.parent_blobs.pop_front().unwrap(); let _ = self.unknown_parent_blobs.insert(blobs.clone()); for blob in &blobs { - self.bl - .parent_lookup_response::>( - self.parent_blob_req_id.expect("parent blob request id"), - self.peer_id, - Some(blob.clone()), - D, - &self.cx, - ); - assert_eq!(self.bl.parent_lookups.len(), 1); - } - self.bl - .parent_lookup_response::>( - self.parent_blob_req_id.expect("blob request id"), + self.rig.parent_lookup_blob_response( + self.parent_blob_req_id.expect("parent blob request id"), self.peer_id, - None, - D, - &self.cx, + Some(blob.clone()), ); + assert_eq!(self.rig.active_parent_lookups_count(), 1); + } + self.rig.parent_lookup_blob_response( + self.parent_blob_req_id.expect("blob request id"), + self.peer_id, + None, + ); self } @@ -1367,48 +1255,39 @@ mod deneb_only { me.rig.expect_block_process(ResponseType::Block); // The request should still be active. - assert_eq!(me.bl.single_block_lookups.len(), 1); + assert_eq!(me.rig.active_single_lookups_count(), 1); me } fn block_response(mut self) -> Self { // The peer provides the correct block, should not be penalized. Now the block should be sent // for processing. - self.bl - .single_lookup_response::>( - self.block_req_id.expect("block request id"), - self.peer_id, - Some(self.block.clone()), - D, - &self.cx, - ); + self.rig.single_lookup_block_response( + self.block_req_id.expect("block request id"), + self.peer_id, + Some(self.block.clone()), + ); self.rig.expect_empty_network(); // The request should still be active. - assert_eq!(self.bl.single_block_lookups.len(), 1); + assert_eq!(self.rig.active_single_lookups_count(), 1); self } fn blobs_response(mut self) -> Self { for blob in &self.blobs { - self.bl - .single_lookup_response::>( - self.blob_req_id.expect("blob request id"), - self.peer_id, - Some(blob.clone()), - D, - &self.cx, - ); - assert_eq!(self.bl.single_block_lookups.len(), 1); - } - self.bl - .single_lookup_response::>( + self.rig.single_lookup_blob_response( self.blob_req_id.expect("blob request id"), self.peer_id, - None, - D, - &self.cx, + Some(blob.clone()), ); + assert_eq!(self.rig.active_single_lookups_count(), 1); + } + self.rig.single_lookup_blob_response( + self.blob_req_id.expect("blob request id"), + self.peer_id, + None, + ); self } @@ -1426,183 +1305,171 @@ mod deneb_only { } fn empty_block_response(mut self) -> Self { - self.bl - .single_lookup_response::>( - self.block_req_id.expect("block request id"), - self.peer_id, - None, - D, - &self.cx, - ); + self.rig.single_lookup_block_response( + self.block_req_id.expect("block request id"), + self.peer_id, + None, + ); self } fn empty_blobs_response(mut self) -> Self { - self.bl - .single_lookup_response::>( - self.blob_req_id.expect("blob request id"), - self.peer_id, - None, - D, - &self.cx, - ); + self.rig.single_lookup_blob_response( + self.blob_req_id.expect("blob request id"), + self.peer_id, + None, + ); self } fn empty_parent_block_response(mut self) -> Self { - self.bl.parent_lookup_response::>( + self.rig.parent_lookup_block_response( self.parent_block_req_id.expect("block request id"), self.peer_id, None, - D, - &self.cx, ); self } fn empty_parent_blobs_response(mut self) -> Self { - self.bl - .parent_lookup_response::>( - self.parent_blob_req_id.expect("blob request id"), - self.peer_id, - None, - D, - &self.cx, - ); + self.rig.parent_lookup_blob_response( + self.parent_blob_req_id.expect("blob request id"), + self.peer_id, + None, + ); self } fn block_imported(mut self) -> Self { // Missing blobs should be the request is not removed, the outstanding blobs request should // mean we do not send a new request. - self.bl - .single_block_component_processed::>( - self.block_req_id.expect("block request id").id, - BlockProcessingResult::Ok(AvailabilityProcessingStatus::Imported( - self.block_root, - )), - &mut self.cx, - ); + self.rig.single_block_component_processed( + self.block_req_id.expect("block request id"), + BlockProcessingResult::Ok(AvailabilityProcessingStatus::Imported(self.block_root)), + ); self.rig.expect_empty_network(); - assert_eq!(self.bl.single_block_lookups.len(), 0); + assert_eq!(self.rig.active_single_lookups_count(), 0); self } fn parent_block_imported(mut self) -> Self { - self.bl.parent_block_processed( + self.rig.parent_block_processed( self.block_root, BlockProcessingResult::Ok(AvailabilityProcessingStatus::Imported(self.block_root)), - &mut self.cx, ); self.rig.expect_empty_network(); - assert_eq!(self.bl.parent_lookups.len(), 0); + assert_eq!(self.rig.active_parent_lookups_count(), 0); self } fn parent_block_unknown_parent(mut self) -> Self { let block = self.unknown_parent_block.take().unwrap(); + // Now this block is the one we expect requests from + self.block = block.clone(); let block = RpcBlock::new( Some(block.canonical_root()), block, self.unknown_parent_blobs.take().map(VariableList::from), ) .unwrap(); - self.bl.parent_block_processed( + self.rig.parent_block_processed( self.block_root, BlockProcessingResult::Err(BlockError::ParentUnknown(block)), - &mut self.cx, ); - assert_eq!(self.bl.parent_lookups.len(), 1); + assert_eq!(self.rig.active_parent_lookups_count(), 1); self } fn invalid_parent_processed(mut self) -> Self { - self.bl.parent_block_processed( + self.rig.parent_block_processed( self.block_root, BlockProcessingResult::Err(BlockError::ProposalSignatureInvalid), - &mut self.cx, ); - assert_eq!(self.bl.parent_lookups.len(), 1); + assert_eq!(self.rig.active_parent_lookups_count(), 1); self } fn invalid_block_processed(mut self) -> Self { - self.bl - .single_block_component_processed::>( - self.block_req_id.expect("block request id").id, - BlockProcessingResult::Err(BlockError::ProposalSignatureInvalid), - &mut self.cx, - ); - assert_eq!(self.bl.single_block_lookups.len(), 1); + self.rig.single_block_component_processed( + self.block_req_id.expect("block request id"), + BlockProcessingResult::Err(BlockError::ProposalSignatureInvalid), + ); + assert_eq!(self.rig.active_single_lookups_count(), 1); self } fn invalid_blob_processed(mut self) -> Self { - self.bl - .single_block_component_processed::>( - self.blob_req_id.expect("blob request id").id, - BlockProcessingResult::Err(BlockError::AvailabilityCheck( - AvailabilityCheckError::KzgVerificationFailed, - )), - &mut self.cx, - ); - assert_eq!(self.bl.single_block_lookups.len(), 1); + self.rig.single_block_component_processed( + self.blob_req_id.expect("blob request id"), + BlockProcessingResult::Err(BlockError::AvailabilityCheck( + AvailabilityCheckError::KzgVerificationFailed, + )), + ); + assert_eq!(self.rig.active_single_lookups_count(), 1); self } fn missing_components_from_block_request(mut self) -> Self { - self.bl - .single_block_component_processed::>( - self.block_req_id.expect("block request id").id, - BlockProcessingResult::Ok(AvailabilityProcessingStatus::MissingComponents( - self.slot, - self.block_root, - )), - &mut self.cx, - ); - assert_eq!(self.bl.single_block_lookups.len(), 1); + self.rig.single_block_component_processed( + self.block_req_id.expect("block request id"), + BlockProcessingResult::Ok(AvailabilityProcessingStatus::MissingComponents( + self.slot, + self.block_root, + )), + ); + assert_eq!(self.rig.active_single_lookups_count(), 1); self } fn missing_components_from_blob_request(mut self) -> Self { - self.bl - .single_block_component_processed::>( - self.blob_req_id.expect("blob request id").id, - BlockProcessingResult::Ok(AvailabilityProcessingStatus::MissingComponents( - self.slot, - self.block_root, - )), - &mut self.cx, - ); - assert_eq!(self.bl.single_block_lookups.len(), 1); + self.rig.single_blob_component_processed( + self.blob_req_id.expect("blob request id"), + BlockProcessingResult::Ok(AvailabilityProcessingStatus::MissingComponents( + self.slot, + self.block_root, + )), + ); + assert_eq!(self.rig.active_single_lookups_count(), 1); self } fn expect_penalty(mut self) -> Self { - self.rig.expect_penalty(); + self.rig.expect_penalty(self.peer_id); self } fn expect_no_penalty(mut self) -> Self { self.rig.expect_empty_network(); self } + fn expect_no_penalty_and_no_requests(mut self) -> Self { + self.rig.expect_empty_network(); + self + } fn expect_block_request(mut self) -> Self { - let id = self.rig.expect_lookup_request(ResponseType::Block); + let id = self + .rig + .expect_block_lookup_request(self.block.canonical_root()); self.block_req_id = Some(id); self } fn expect_blobs_request(mut self) -> Self { - let id = self.rig.expect_lookup_request(ResponseType::Blob); + let id = self + .rig + .expect_blob_lookup_request(self.block.canonical_root()); self.blob_req_id = Some(id); self } fn expect_parent_block_request(mut self) -> Self { - let id = self.rig.expect_parent_request(ResponseType::Block); + let id = self + .rig + .expect_block_parent_request(self.block.parent_root()); self.parent_block_req_id = Some(id); self } fn expect_parent_blobs_request(mut self) -> Self { - let id = self.rig.expect_parent_request(ResponseType::Blob); + let id = self + .rig + .expect_blob_parent_request(self.block.parent_root()); self.parent_blob_req_id = Some(id); self } @@ -1632,30 +1499,12 @@ mod deneb_only { self } fn search_parent_dup(mut self) -> Self { - self.bl.search_parent( - self.slot, - self.block_root, - self.block.parent_root(), - self.peer_id, - &mut self.cx, - ); + self.rig + .trigger_unknown_parent_block(self.peer_id, self.block.clone()); self } } - fn get_fork_name() -> ForkName { - ForkName::from_str( - &std::env::var(beacon_chain::test_utils::FORK_NAME_ENV_VAR).unwrap_or_else(|e| { - panic!( - "{} env var must be defined when using fork_from_env: {:?}", - beacon_chain::test_utils::FORK_NAME_ENV_VAR, - e - ) - }), - ) - .unwrap() - } - #[test] fn single_block_and_blob_lookup_block_returned_first_attestation() { let Some(tester) = DenebTester::new(RequestTrigger::AttestationUnknownBlock) else { @@ -1727,9 +1576,7 @@ mod deneb_only { tester .blobs_response() .blobs_response_was_valid() - .expect_no_penalty() - .expect_no_block_request() - .expect_no_blobs_request() + .expect_no_penalty_and_no_requests() .missing_components_from_blob_request() .empty_block_response() .expect_penalty() @@ -1751,9 +1598,7 @@ mod deneb_only { .expect_no_blobs_request() .blobs_response() .missing_components_from_blob_request() - .expect_no_penalty() - .expect_no_block_request() - .expect_no_block_request(); + .expect_no_penalty_and_no_requests(); } #[test] @@ -1814,9 +1659,7 @@ mod deneb_only { .invalidate_blobs_too_few() .blobs_response() .blobs_response_was_valid() - .expect_no_penalty() - .expect_no_blobs_request() - .expect_no_block_request() + .expect_no_penalty_and_no_requests() .block_response_triggering_process(); } @@ -1903,9 +1746,7 @@ mod deneb_only { tester .blobs_response() - .expect_no_penalty() - .expect_no_block_request() - .expect_no_blobs_request() + .expect_no_penalty_and_no_requests() .parent_block_response() .parent_blob_response() .expect_block_process() @@ -1946,9 +1787,7 @@ mod deneb_only { tester .blobs_response() .empty_parent_blobs_response() - .expect_no_penalty() - .expect_no_blobs_request() - .expect_no_block_request() + .expect_no_penalty_and_no_requests() .parent_block_response() .expect_penalty() .expect_parent_blobs_request() @@ -2026,9 +1865,7 @@ mod deneb_only { tester .block_response() - .expect_no_penalty() - .expect_no_block_request() - .expect_no_blobs_request() + .expect_no_penalty_and_no_requests() .parent_block_response() .parent_blob_response() .expect_block_process() @@ -2069,9 +1906,7 @@ mod deneb_only { tester .block_response() .empty_parent_blobs_response() - .expect_no_penalty() - .expect_no_blobs_request() - .expect_no_block_request() + .expect_no_penalty_and_no_requests() .parent_block_response() .expect_penalty() .expect_parent_blobs_request() @@ -2133,4 +1968,29 @@ mod deneb_only { .expect_no_blobs_request() .expect_no_block_request(); } + + #[test] + fn no_peer_penalty_when_rpc_response_already_known_from_gossip() { + let Some(mut r) = TestRig::test_setup_after_deneb() else { + return; + }; + let (block, blobs) = r.rand_block_and_blobs(NumBlobs::Number(2)); + let parent_root = block.parent_root(); + let blob_0 = blobs[0].clone(); + let blob_1 = blobs[1].clone(); + let peer_a = r.new_connected_peer(); + let peer_b = r.new_connected_peer(); + // Send unknown parent block lookup + r.trigger_unknown_parent_block(peer_a, block.into()); + // Expect network request for blobs + let id = r.expect_blob_parent_request(parent_root); + // Peer responses with blob 0 + r.single_lookup_blob_response(id, peer_a, Some(blob_0.into())); + // Blob 1 is received via gossip unknown parent blob from a different peer + r.trigger_unknown_parent_blob(peer_b, blob_1.clone()); + // Original peer sends blob 1 via RPC + r.single_lookup_blob_response(id, peer_a, Some(blob_1.into())); + // Assert no downscore event for original peer + r.expect_no_penalty_for(peer_a); + } } diff --git a/beacon_node/network/src/sync/manager.rs b/beacon_node/network/src/sync/manager.rs index e95649806b..6cb354bedd 100644 --- a/beacon_node/network/src/sync/manager.rs +++ b/beacon_node/network/src/sync/manager.rs @@ -233,24 +233,13 @@ pub fn spawn( ); // create an instance of the SyncManager - let network_globals = beacon_processor.network_globals.clone(); - let mut sync_manager = SyncManager { - chain: beacon_chain.clone(), - input_channel: sync_recv, - network: SyncNetworkContext::new( - network_send, - beacon_processor.clone(), - beacon_chain.clone(), - log.clone(), - ), - range_sync: RangeSync::new(beacon_chain.clone(), log.clone()), - backfill_sync: BackFillSync::new(beacon_chain.clone(), network_globals, log.clone()), - block_lookups: BlockLookups::new( - beacon_chain.data_availability_checker.clone(), - log.clone(), - ), - log: log.clone(), - }; + let mut sync_manager = SyncManager::new( + beacon_chain, + network_send, + beacon_processor, + sync_recv, + log.clone(), + ); // spawn the sync manager thread debug!(log, "Sync Manager started"); @@ -258,6 +247,48 @@ pub fn spawn( } impl SyncManager { + pub(crate) fn new( + beacon_chain: Arc>, + network_send: mpsc::UnboundedSender>, + beacon_processor: Arc>, + sync_recv: mpsc::UnboundedReceiver>, + log: slog::Logger, + ) -> Self { + let network_globals = beacon_processor.network_globals.clone(); + Self { + chain: beacon_chain.clone(), + input_channel: sync_recv, + network: SyncNetworkContext::new( + network_send, + beacon_processor.clone(), + beacon_chain.clone(), + log.clone(), + ), + range_sync: RangeSync::new(beacon_chain.clone(), log.clone()), + backfill_sync: BackFillSync::new(beacon_chain.clone(), network_globals, log.clone()), + block_lookups: BlockLookups::new( + beacon_chain.data_availability_checker.clone(), + log.clone(), + ), + log: log.clone(), + } + } + + #[cfg(test)] + pub(crate) fn active_single_lookups(&self) -> Vec { + self.block_lookups.active_single_lookups() + } + + #[cfg(test)] + pub(crate) fn active_parent_lookups(&self) -> Vec { + self.block_lookups.active_parent_lookups() + } + + #[cfg(test)] + pub(crate) fn failed_chains_contains(&mut self, chain_hash: &Hash256) -> bool { + self.block_lookups.failed_chains_contains(chain_hash) + } + fn network_globals(&self) -> &NetworkGlobals { self.network.network_globals() } @@ -597,7 +628,7 @@ impl SyncManager { } } - fn handle_message(&mut self, sync_message: SyncMessage) { + pub(crate) fn handle_message(&mut self, sync_message: SyncMessage) { match sync_message { SyncMessage::AddPeer(peer_id, info) => { self.add_peer(peer_id, info); @@ -648,11 +679,12 @@ impl SyncManager { ChildComponents::new(block_root, None, Some(blobs)), ); } - SyncMessage::UnknownBlockHashFromAttestation(peer_id, block_hash) => { + SyncMessage::UnknownBlockHashFromAttestation(peer_id, block_root) => { // If we are not synced, ignore this block. if self.synced_and_connected(&peer_id) { + debug!(self.log, "Received sync_message"; "message" => "UnknownBlockHashFromAttestation", "block_root" => %block_root); self.block_lookups - .search_block(block_hash, &[peer_id], &mut self.network); + .search_block(block_root, &[peer_id], &mut self.network); } } SyncMessage::Disconnect(peer_id) => { diff --git a/beacon_node/network/src/sync/range_sync/range.rs b/beacon_node/network/src/sync/range_sync/range.rs index 8fbc186a1b..a159eb4541 100644 --- a/beacon_node/network/src/sync/range_sync/range.rs +++ b/beacon_node/network/src/sync/range_sync/range.rs @@ -637,7 +637,12 @@ mod tests { let (network_tx, network_rx) = mpsc::unbounded_channel(); let globals = Arc::new(NetworkGlobals::new_test_globals(Vec::new(), &log)); let (network_beacon_processor, beacon_processor_rx) = - NetworkBeaconProcessor::null_for_testing(globals.clone()); + NetworkBeaconProcessor::null_for_testing( + globals.clone(), + chain.clone(), + harness.runtime.task_executor.clone(), + log.clone(), + ); let cx = SyncNetworkContext::new( network_tx, Arc::new(network_beacon_processor),