Add test for ActiveSamplingRequest (#6307)

* Add test for ActiveSamplingRequest

* Fix the column_indexes field from the requested ones to the responded ones

* Fix clippy errors

* Move tests to tests.rs

* Fix unused import

* Fix clippy error

* Merge branch 'unstable' into fork/add-test-for-active-sampling-request

# Conflicts:
#	beacon_node/network/Cargo.toml
#	beacon_node/network/src/sync/sampling.rs

* Merge branch 'unstable' into fork/add-test-for-active-sampling-request
This commit is contained in:
Akihito Nakano
2024-10-04 12:00:32 +09:00
committed by GitHub
parent f3a5e256da
commit 8cf686f5c1
5 changed files with 145 additions and 1 deletions

1
Cargo.lock generated
View File

@@ -5640,6 +5640,7 @@ dependencies = [
"async-channel",
"beacon_chain",
"beacon_processor",
"bls",
"delay_map",
"derivative",
"error-chain",

View File

@@ -15,6 +15,7 @@ eth2 = { workspace = true }
gossipsub = { workspace = true }
eth2_network_config = { workspace = true }
kzg = { workspace = true }
bls = { workspace = true }
[dependencies]
alloy-primitives = { workspace = true }

View File

@@ -310,6 +310,13 @@ impl TestRig {
);
}
fn expect_active_sampling(&mut self, block_root: &Hash256) {
assert!(self
.sync_manager
.active_sampling_requests()
.contains(block_root));
}
fn expect_clean_finished_sampling(&mut self) {
self.expect_empty_network();
self.expect_sampling_result_work();
@@ -1090,6 +1097,11 @@ impl TestRig {
.unwrap_or_else(|e| panic!("Expected sampling result work: {e}"))
}
fn expect_no_work_event(&mut self) {
self.drain_processor_rx();
assert!(self.network_rx_queue.is_empty());
}
fn expect_no_penalty_for(&mut self, peer_id: PeerId) {
self.drain_network_rx();
let downscore_events = self
@@ -1290,6 +1302,16 @@ impl TestRig {
imported: false,
});
}
fn assert_sampling_request_status(
&self,
block_root: Hash256,
ongoing: &Vec<ColumnIndex>,
no_peers: &Vec<ColumnIndex>,
) {
self.sync_manager
.assert_sampling_request_status(block_root, ongoing, no_peers)
}
}
#[test]
@@ -2023,6 +2045,76 @@ fn sampling_avoid_retrying_same_peer() {
r.expect_empty_network();
}
#[test]
fn sampling_batch_requests() {
let Some(mut r) = TestRig::test_setup_after_peerdas() else {
return;
};
let _supernode = r.new_connected_supernode_peer();
let (block, data_columns) = r.rand_block_and_data_columns();
let block_root = block.canonical_root();
r.trigger_sample_block(block_root, block.slot());
// Retrieve the sample request, which should be batched.
let (sync_request_id, column_indexes) = r
.expect_only_data_columns_by_root_requests(block_root, 1)
.pop()
.unwrap();
assert_eq!(column_indexes.len(), SAMPLING_REQUIRED_SUCCESSES);
r.assert_sampling_request_status(block_root, &column_indexes, &vec![]);
// Resolve the request.
r.complete_valid_sampling_column_requests(
vec![(sync_request_id, column_indexes.clone())],
data_columns,
);
r.expect_clean_finished_sampling();
}
#[test]
fn sampling_batch_requests_not_enough_responses_returned() {
let Some(mut r) = TestRig::test_setup_after_peerdas() else {
return;
};
let _supernode = r.new_connected_supernode_peer();
let (block, data_columns) = r.rand_block_and_data_columns();
let block_root = block.canonical_root();
r.trigger_sample_block(block_root, block.slot());
// Retrieve the sample request, which should be batched.
let (sync_request_id, column_indexes) = r
.expect_only_data_columns_by_root_requests(block_root, 1)
.pop()
.unwrap();
assert_eq!(column_indexes.len(), SAMPLING_REQUIRED_SUCCESSES);
// The request status should be set to Sampling.
r.assert_sampling_request_status(block_root, &column_indexes, &vec![]);
// Split the indexes to simulate the case where the supernode doesn't have the requested column.
let (_column_indexes_supernode_does_not_have, column_indexes_to_complete) =
column_indexes.split_at(1);
// Complete the requests but only partially, so a NotEnoughResponsesReturned error occurs.
let data_columns_to_complete = data_columns
.iter()
.filter(|d| column_indexes_to_complete.contains(&d.index))
.cloned()
.collect::<Vec<_>>();
r.complete_data_columns_by_root_request(
(sync_request_id, column_indexes.clone()),
&data_columns_to_complete,
);
// The request status should be set to NoPeers since the supernode, the only peer, returned not enough responses.
r.assert_sampling_request_status(block_root, &vec![], &column_indexes);
// The sampling request stalls.
r.expect_empty_network();
r.expect_no_work_event();
r.expect_active_sampling(&block_root);
}
#[test]
fn custody_lookup_happy_path() {
let Some(mut r) = TestRig::test_setup_after_peerdas() else {

View File

@@ -71,6 +71,9 @@ use std::time::Duration;
use tokio::sync::mpsc;
use types::{BlobSidecar, DataColumnSidecar, EthSpec, Hash256, SignedBeaconBlock, Slot};
#[cfg(test)]
use types::ColumnIndex;
/// The number of slots ahead of us that is allowed before requesting a long-range (batch) Sync
/// from a peer. If a peer is within this tolerance (forwards or backwards), it is treated as a
/// fully sync'd peer.
@@ -334,6 +337,17 @@ impl<T: BeaconChainTypes> SyncManager<T> {
self.sampling.active_sampling_requests()
}
#[cfg(test)]
pub(crate) fn assert_sampling_request_status(
&self,
block_root: Hash256,
ongoing: &Vec<ColumnIndex>,
no_peers: &Vec<ColumnIndex>,
) {
self.sampling
.assert_sampling_request_status(block_root, ongoing, no_peers);
}
fn network_globals(&self) -> &NetworkGlobals<T::EthSpec> {
self.network.network_globals()
}

View File

@@ -42,6 +42,18 @@ impl<T: BeaconChainTypes> Sampling<T> {
self.requests.values().map(|r| r.block_root).collect()
}
#[cfg(test)]
pub fn assert_sampling_request_status(
&self,
block_root: Hash256,
ongoing: &Vec<ColumnIndex>,
no_peers: &Vec<ColumnIndex>,
) {
let requester = SamplingRequester::ImportedBlock(block_root);
let active_sampling_request = self.requests.get(&requester).unwrap();
active_sampling_request.assert_sampling_request_status(ongoing, no_peers);
}
/// Create a new sampling request for a known block
///
/// ### Returns
@@ -220,6 +232,21 @@ impl<T: BeaconChainTypes> ActiveSamplingRequest<T> {
}
}
#[cfg(test)]
pub fn assert_sampling_request_status(
&self,
ongoing: &Vec<ColumnIndex>,
no_peers: &Vec<ColumnIndex>,
) {
for idx in ongoing {
assert!(self.column_requests.get(idx).unwrap().is_ongoing());
}
for idx in no_peers {
assert!(self.column_requests.get(idx).unwrap().is_no_peers());
}
}
/// Insert a downloaded column into an active sampling request. Then make progress on the
/// entire request.
///
@@ -253,10 +280,14 @@ impl<T: BeaconChainTypes> ActiveSamplingRequest<T> {
match resp {
Ok((mut resp_data_columns, seen_timestamp)) => {
let resp_column_indexes = resp_data_columns
.iter()
.map(|r| r.index)
.collect::<Vec<_>>();
debug!(self.log,
"Sample download success";
"block_root" => %self.block_root,
"column_indexes" => ?column_indexes,
"column_indexes" => ?resp_column_indexes,
"count" => resp_data_columns.len()
);
metrics::inc_counter_vec(&metrics::SAMPLE_DOWNLOAD_RESULT, &[metrics::SUCCESS]);
@@ -598,6 +629,11 @@ mod request {
}
}
#[cfg(test)]
pub(crate) fn is_no_peers(&self) -> bool {
matches!(self.status, Status::NoPeers)
}
pub(crate) fn choose_peer<T: BeaconChainTypes>(
&mut self,
cx: &SyncNetworkContext<T>,