From 8cf686f5c11cbad19727885d07df5abfddeddb0f Mon Sep 17 00:00:00 2001 From: Akihito Nakano Date: Fri, 4 Oct 2024 12:00:32 +0900 Subject: [PATCH] Add test for ActiveSamplingRequest (#6307) * Add test for ActiveSamplingRequest * Fix the column_indexes field from the requested ones to the responded ones * Fix clippy errors * Move tests to tests.rs * Fix unused import * Fix clippy error * Merge branch 'unstable' into fork/add-test-for-active-sampling-request # Conflicts: # beacon_node/network/Cargo.toml # beacon_node/network/src/sync/sampling.rs * Merge branch 'unstable' into fork/add-test-for-active-sampling-request --- Cargo.lock | 1 + beacon_node/network/Cargo.toml | 1 + .../network/src/sync/block_lookups/tests.rs | 92 +++++++++++++++++++ beacon_node/network/src/sync/manager.rs | 14 +++ beacon_node/network/src/sync/peer_sampling.rs | 38 +++++++- 5 files changed, 145 insertions(+), 1 deletion(-) diff --git a/Cargo.lock b/Cargo.lock index 44ca67e9b4..3a063e7e0e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5640,6 +5640,7 @@ dependencies = [ "async-channel", "beacon_chain", "beacon_processor", + "bls", "delay_map", "derivative", "error-chain", diff --git a/beacon_node/network/Cargo.toml b/beacon_node/network/Cargo.toml index 4df1761732..6d61bffe3d 100644 --- a/beacon_node/network/Cargo.toml +++ b/beacon_node/network/Cargo.toml @@ -15,6 +15,7 @@ eth2 = { workspace = true } gossipsub = { workspace = true } eth2_network_config = { workspace = true } kzg = { workspace = true } +bls = { workspace = true } [dependencies] alloy-primitives = { workspace = true } diff --git a/beacon_node/network/src/sync/block_lookups/tests.rs b/beacon_node/network/src/sync/block_lookups/tests.rs index 151333a2ef..cd4609e147 100644 --- a/beacon_node/network/src/sync/block_lookups/tests.rs +++ b/beacon_node/network/src/sync/block_lookups/tests.rs @@ -310,6 +310,13 @@ impl TestRig { ); } + fn expect_active_sampling(&mut self, block_root: &Hash256) { + assert!(self + .sync_manager + .active_sampling_requests() + .contains(block_root)); + } + fn expect_clean_finished_sampling(&mut self) { self.expect_empty_network(); self.expect_sampling_result_work(); @@ -1090,6 +1097,11 @@ impl TestRig { .unwrap_or_else(|e| panic!("Expected sampling result work: {e}")) } + fn expect_no_work_event(&mut self) { + self.drain_processor_rx(); + assert!(self.network_rx_queue.is_empty()); + } + fn expect_no_penalty_for(&mut self, peer_id: PeerId) { self.drain_network_rx(); let downscore_events = self @@ -1290,6 +1302,16 @@ impl TestRig { imported: false, }); } + + fn assert_sampling_request_status( + &self, + block_root: Hash256, + ongoing: &Vec, + no_peers: &Vec, + ) { + self.sync_manager + .assert_sampling_request_status(block_root, ongoing, no_peers) + } } #[test] @@ -2023,6 +2045,76 @@ fn sampling_avoid_retrying_same_peer() { r.expect_empty_network(); } +#[test] +fn sampling_batch_requests() { + let Some(mut r) = TestRig::test_setup_after_peerdas() else { + return; + }; + let _supernode = r.new_connected_supernode_peer(); + let (block, data_columns) = r.rand_block_and_data_columns(); + let block_root = block.canonical_root(); + r.trigger_sample_block(block_root, block.slot()); + + // Retrieve the sample request, which should be batched. + let (sync_request_id, column_indexes) = r + .expect_only_data_columns_by_root_requests(block_root, 1) + .pop() + .unwrap(); + assert_eq!(column_indexes.len(), SAMPLING_REQUIRED_SUCCESSES); + r.assert_sampling_request_status(block_root, &column_indexes, &vec![]); + + // Resolve the request. + r.complete_valid_sampling_column_requests( + vec![(sync_request_id, column_indexes.clone())], + data_columns, + ); + r.expect_clean_finished_sampling(); +} + +#[test] +fn sampling_batch_requests_not_enough_responses_returned() { + let Some(mut r) = TestRig::test_setup_after_peerdas() else { + return; + }; + let _supernode = r.new_connected_supernode_peer(); + let (block, data_columns) = r.rand_block_and_data_columns(); + let block_root = block.canonical_root(); + r.trigger_sample_block(block_root, block.slot()); + + // Retrieve the sample request, which should be batched. + let (sync_request_id, column_indexes) = r + .expect_only_data_columns_by_root_requests(block_root, 1) + .pop() + .unwrap(); + assert_eq!(column_indexes.len(), SAMPLING_REQUIRED_SUCCESSES); + + // The request status should be set to Sampling. + r.assert_sampling_request_status(block_root, &column_indexes, &vec![]); + + // Split the indexes to simulate the case where the supernode doesn't have the requested column. + let (_column_indexes_supernode_does_not_have, column_indexes_to_complete) = + column_indexes.split_at(1); + + // Complete the requests but only partially, so a NotEnoughResponsesReturned error occurs. + let data_columns_to_complete = data_columns + .iter() + .filter(|d| column_indexes_to_complete.contains(&d.index)) + .cloned() + .collect::>(); + r.complete_data_columns_by_root_request( + (sync_request_id, column_indexes.clone()), + &data_columns_to_complete, + ); + + // The request status should be set to NoPeers since the supernode, the only peer, returned not enough responses. + r.assert_sampling_request_status(block_root, &vec![], &column_indexes); + + // The sampling request stalls. + r.expect_empty_network(); + r.expect_no_work_event(); + r.expect_active_sampling(&block_root); +} + #[test] fn custody_lookup_happy_path() { let Some(mut r) = TestRig::test_setup_after_peerdas() else { diff --git a/beacon_node/network/src/sync/manager.rs b/beacon_node/network/src/sync/manager.rs index c1f4fe54fb..708c4308b8 100644 --- a/beacon_node/network/src/sync/manager.rs +++ b/beacon_node/network/src/sync/manager.rs @@ -71,6 +71,9 @@ use std::time::Duration; use tokio::sync::mpsc; use types::{BlobSidecar, DataColumnSidecar, EthSpec, Hash256, SignedBeaconBlock, Slot}; +#[cfg(test)] +use types::ColumnIndex; + /// The number of slots ahead of us that is allowed before requesting a long-range (batch) Sync /// from a peer. If a peer is within this tolerance (forwards or backwards), it is treated as a /// fully sync'd peer. @@ -334,6 +337,17 @@ impl SyncManager { self.sampling.active_sampling_requests() } + #[cfg(test)] + pub(crate) fn assert_sampling_request_status( + &self, + block_root: Hash256, + ongoing: &Vec, + no_peers: &Vec, + ) { + self.sampling + .assert_sampling_request_status(block_root, ongoing, no_peers); + } + fn network_globals(&self) -> &NetworkGlobals { self.network.network_globals() } diff --git a/beacon_node/network/src/sync/peer_sampling.rs b/beacon_node/network/src/sync/peer_sampling.rs index 4d0fa509cd..086fb0ec8d 100644 --- a/beacon_node/network/src/sync/peer_sampling.rs +++ b/beacon_node/network/src/sync/peer_sampling.rs @@ -42,6 +42,18 @@ impl Sampling { self.requests.values().map(|r| r.block_root).collect() } + #[cfg(test)] + pub fn assert_sampling_request_status( + &self, + block_root: Hash256, + ongoing: &Vec, + no_peers: &Vec, + ) { + let requester = SamplingRequester::ImportedBlock(block_root); + let active_sampling_request = self.requests.get(&requester).unwrap(); + active_sampling_request.assert_sampling_request_status(ongoing, no_peers); + } + /// Create a new sampling request for a known block /// /// ### Returns @@ -220,6 +232,21 @@ impl ActiveSamplingRequest { } } + #[cfg(test)] + pub fn assert_sampling_request_status( + &self, + ongoing: &Vec, + no_peers: &Vec, + ) { + for idx in ongoing { + assert!(self.column_requests.get(idx).unwrap().is_ongoing()); + } + + for idx in no_peers { + assert!(self.column_requests.get(idx).unwrap().is_no_peers()); + } + } + /// Insert a downloaded column into an active sampling request. Then make progress on the /// entire request. /// @@ -253,10 +280,14 @@ impl ActiveSamplingRequest { match resp { Ok((mut resp_data_columns, seen_timestamp)) => { + let resp_column_indexes = resp_data_columns + .iter() + .map(|r| r.index) + .collect::>(); debug!(self.log, "Sample download success"; "block_root" => %self.block_root, - "column_indexes" => ?column_indexes, + "column_indexes" => ?resp_column_indexes, "count" => resp_data_columns.len() ); metrics::inc_counter_vec(&metrics::SAMPLE_DOWNLOAD_RESULT, &[metrics::SUCCESS]); @@ -598,6 +629,11 @@ mod request { } } + #[cfg(test)] + pub(crate) fn is_no_peers(&self) -> bool { + matches!(self.status, Status::NoPeers) + } + pub(crate) fn choose_peer( &mut self, cx: &SyncNetworkContext,