Add more range sync tests (#6872)

Currently we have very poor coverage of range sync with unit tests. With the event driven test framework we could cover much more ground and be confident when modifying the code.


  Add two basic cases:
- Happy path, complete a finalized sync for 2 epochs
- Post-PeerDAS case where we start without enough custody peers and later we find enough

⚠️  If you have ideas for more test cases, please let me know! I'll write them
This commit is contained in:
Lion - dapplion
2025-02-10 04:55:22 -03:00
committed by GitHub
parent 62a0f25f97
commit d5a03c9d86
9 changed files with 223 additions and 33 deletions

View File

@@ -344,6 +344,16 @@ impl<T: BeaconChainTypes> SyncManager<T> {
self.range_sync.state()
}
/// Test-only accessor: exposes the current range sync state so tests can
/// assert which sync chains (if any) are active.
#[cfg(test)]
pub(crate) fn range_sync_state(&self) -> super::range_sync::SyncChainStatus {
self.range_sync.state()
}
/// Test-only accessor: returns the roots of chains that range sync has
/// recorded as failed. The `__` prefix signals it exists purely for tests.
#[cfg(test)]
pub(crate) fn __range_failed_chains(&mut self) -> Vec<Hash256> {
self.range_sync.__failed_chains()
}
#[cfg(test)]
pub(crate) fn get_failed_chains(&mut self) -> Vec<Hash256> {
self.block_lookups.get_failed_chains()
@@ -368,11 +378,6 @@ impl<T: BeaconChainTypes> SyncManager<T> {
self.sampling.get_request_status(block_root, index)
}
#[cfg(test)]
pub(crate) fn range_sync_state(&self) -> super::range_sync::SyncChainStatus {
self.range_sync.state()
}
#[cfg(test)]
pub(crate) fn update_execution_engine_state(&mut self, state: EngineState) {
self.handle_new_execution_engine_state(state);

View File

@@ -477,7 +477,7 @@ impl<T: BeaconChainTypes> ChainCollection<T> {
.find(|(_, chain)| chain.has_same_target(target_head_slot, target_head_root))
{
Some((&id, chain)) => {
debug!(self.log, "Adding peer to known chain"; "peer_id" => %peer, "sync_type" => ?sync_type, &chain);
debug!(self.log, "Adding peer to known chain"; "peer_id" => %peer, "sync_type" => ?sync_type, "id" => id);
debug_assert_eq!(chain.target_head_root, target_head_root);
debug_assert_eq!(chain.target_head_slot, target_head_slot);
if let Err(remove_reason) = chain.add_peer(network, peer) {

View File

@@ -94,6 +94,11 @@ where
}
}
/// Test-only accessor: returns the block roots currently held in the
/// failed-chains cache, so tests can assert that no chain failed.
#[cfg(test)]
pub(crate) fn __failed_chains(&mut self) -> Vec<Hash256> {
self.failed_chains.keys().copied().collect()
}
/// Returns the aggregate status of the chain collection.
pub fn state(&self) -> SyncChainStatus {
self.chains.state()
}

View File

@@ -27,6 +27,7 @@ use beacon_chain::{
PayloadVerificationOutcome, PayloadVerificationStatus,
};
use beacon_processor::WorkEvent;
use lighthouse_network::discovery::CombinedKey;
use lighthouse_network::{
rpc::{RPCError, RequestType, RpcErrorResponse},
service::api_types::{
@@ -39,18 +40,16 @@ use lighthouse_network::{
use slog::info;
use slot_clock::{SlotClock, TestingSlotClock};
use tokio::sync::mpsc;
use types::ForkContext;
use types::{
data_column_sidecar::ColumnIndex,
test_utils::{SeedableRng, TestRandom, XorShiftRng},
BeaconState, BeaconStateBase, BlobSidecar, DataColumnSidecar, EthSpec, ForkName, Hash256,
MinimalEthSpec as E, SignedBeaconBlock, Slot,
BeaconState, BeaconStateBase, BlobSidecar, DataColumnSidecar, EthSpec, ForkContext, ForkName,
Hash256, MinimalEthSpec as E, SignedBeaconBlock, Slot,
};
const D: Duration = Duration::new(0, 0);
const PARENT_FAIL_TOLERANCE: u8 = SINGLE_BLOCK_LOOKUP_MAX_ATTEMPTS;
const SAMPLING_REQUIRED_SUCCESSES: usize = 2;
type DCByRootIds = Vec<DCByRootId>;
type DCByRootId = (SyncRequestId, Vec<ColumnIndex>);
@@ -117,7 +116,9 @@ impl TestRig {
let spec = chain.spec.clone();
let rng = XorShiftRng::from_seed([42; 16]);
// deterministic seed
let rng = ChaCha20Rng::from_seed([0u8; 32]);
TestRig {
beacon_processor_rx,
beacon_processor_rx_queue: vec![],
@@ -154,7 +155,7 @@ impl TestRig {
}
}
fn test_setup_after_fulu() -> Option<Self> {
pub fn test_setup_after_fulu() -> Option<Self> {
let r = Self::test_setup();
if r.fork_name.fulu_enabled() {
Some(r)
@@ -369,20 +370,26 @@ impl TestRig {
}
pub fn new_connected_peer(&mut self) -> PeerId {
let key = self.determinstic_key();
self.network_globals
.peers
.write()
.__add_connected_peer_testing_only(false, &self.harness.spec)
.__add_connected_peer_testing_only(false, &self.harness.spec, key)
}
pub fn new_connected_supernode_peer(&mut self) -> PeerId {
let key = self.determinstic_key();
self.network_globals
.peers
.write()
.__add_connected_peer_testing_only(true, &self.harness.spec)
.__add_connected_peer_testing_only(true, &self.harness.spec, key)
}
fn new_connected_peers_for_peerdas(&mut self) {
/// Derives a fresh secp256k1 key from the rig's seeded RNG so that peer
/// identities are reproducible across test runs.
// NOTE(review): name is missing an "i" — should be `deterministic_key`;
// kept as-is since callers reference this spelling.
fn determinstic_key(&mut self) -> CombinedKey {
k256::ecdsa::SigningKey::random(&mut self.rng).into()
}
pub fn new_connected_peers_for_peerdas(&mut self) {
// Enough sampling peers with few columns
for _ in 0..100 {
self.new_connected_peer();
@@ -1113,7 +1120,7 @@ impl TestRig {
}
#[track_caller]
fn expect_empty_network(&mut self) {
pub fn expect_empty_network(&mut self) {
self.drain_network_rx();
if !self.network_rx_queue.is_empty() {
let n = self.network_rx_queue.len();

View File

@@ -7,12 +7,13 @@ use beacon_chain::eth1_chain::CachingEth1Backend;
use beacon_chain::test_utils::{BeaconChainHarness, EphemeralHarnessType};
use beacon_processor::WorkEvent;
use lighthouse_network::NetworkGlobals;
use rand_chacha::ChaCha20Rng;
use slog::Logger;
use slot_clock::ManualSlotClock;
use std::sync::Arc;
use store::MemoryStore;
use tokio::sync::mpsc;
use types::{test_utils::XorShiftRng, ChainSpec, ForkName, MinimalEthSpec as E};
use types::{ChainSpec, ForkName, MinimalEthSpec as E};
mod lookups;
mod range;
@@ -61,7 +62,7 @@ struct TestRig {
/// Beacon chain harness
harness: BeaconChainHarness<EphemeralHarnessType<E>>,
/// `rng` for generating test blocks and blobs.
rng: XorShiftRng,
rng: ChaCha20Rng,
fork_name: ForkName,
log: Logger,
spec: Arc<ChainSpec>,

View File

@@ -1,11 +1,14 @@
use super::*;
use crate::network_beacon_processor::ChainSegmentProcessId;
use crate::status::ToStatusMessage;
use crate::sync::manager::SLOT_IMPORT_TOLERANCE;
use crate::sync::network_context::RangeRequestId;
use crate::sync::range_sync::RangeSyncType;
use crate::sync::SyncMessage;
use beacon_chain::data_column_verification::CustodyDataColumn;
use beacon_chain::test_utils::{AttestationStrategy, BlockStrategy};
use beacon_chain::{block_verification_types::RpcBlock, EngineState, NotifyExecutionLayer};
use beacon_processor::WorkType;
use lighthouse_network::rpc::methods::{
BlobsByRangeRequest, DataColumnsByRangeRequest, OldBlocksByRangeRequest,
OldBlocksByRangeRequestV2,
@@ -18,8 +21,8 @@ use lighthouse_network::service::api_types::{
use lighthouse_network::{PeerId, SyncInfo};
use std::time::Duration;
use types::{
BlobSidecarList, BlockImportSource, EthSpec, Hash256, MinimalEthSpec as E, SignedBeaconBlock,
SignedBeaconBlockHash, Slot,
BlobSidecarList, BlockImportSource, Epoch, EthSpec, Hash256, MinimalEthSpec as E,
SignedBeaconBlock, SignedBeaconBlockHash, Slot,
};
const D: Duration = Duration::new(0, 0);
@@ -43,7 +46,7 @@ enum ByRangeDataRequestIds {
/// To make writing tests succinct, the machinery in this testing rig automatically identifies
/// _which_ request to complete. Picking the right request is critical for tests to pass, so this
/// filter allows better expressivity on the criteria to identify the right request.
#[derive(Default)]
#[derive(Default, Debug, Clone)]
struct RequestFilter {
peer: Option<PeerId>,
epoch: Option<u64>,
@@ -74,7 +77,7 @@ impl TestRig {
/// Produce a head peer with an advanced head
fn add_head_peer_with_root(&mut self, head_root: Hash256) -> PeerId {
let local_info = self.local_info();
self.add_peer(SyncInfo {
self.add_random_peer(SyncInfo {
head_root,
head_slot: local_info.head_slot + 1 + Slot::new(SLOT_IMPORT_TOLERANCE as u64),
..local_info
@@ -90,7 +93,7 @@ impl TestRig {
fn add_finalized_peer_with_root(&mut self, finalized_root: Hash256) -> PeerId {
let local_info = self.local_info();
let finalized_epoch = local_info.finalized_epoch + 2;
self.add_peer(SyncInfo {
self.add_random_peer(SyncInfo {
finalized_epoch,
finalized_root,
head_slot: finalized_epoch.start_slot(E::slots_per_epoch()),
@@ -98,6 +101,17 @@ impl TestRig {
})
}
/// Builds a remote `SyncInfo` whose finalized checkpoint is `advanced_epochs`
/// ahead of our local finalized epoch. Roots are random, so each call
/// describes a distinct (unknown-to-us) chain; the head slot is the first
/// slot of the advanced finalized epoch.
fn finalized_remote_info_advanced_by(&self, advanced_epochs: Epoch) -> SyncInfo {
let local_info = self.local_info();
let finalized_epoch = local_info.finalized_epoch + advanced_epochs;
SyncInfo {
finalized_epoch,
finalized_root: Hash256::random(),
head_slot: finalized_epoch.start_slot(E::slots_per_epoch()),
head_root: Hash256::random(),
}
}
fn local_info(&self) -> SyncInfo {
let StatusMessage {
fork_digest: _,
@@ -114,28 +128,59 @@ impl TestRig {
}
}
fn add_peer(&mut self, remote_info: SyncInfo) -> PeerId {
/// Connects a new non-supernode peer to the network globals and informs sync
/// about it with the given remote status. Returns the new peer's id.
fn add_random_peer_not_supernode(&mut self, remote_info: SyncInfo) -> PeerId {
let peer_id = self.new_connected_peer();
self.send_sync_message(SyncMessage::AddPeer(peer_id, remote_info));
peer_id
}
fn add_random_peer(&mut self, remote_info: SyncInfo) -> PeerId {
// Create valid peer known to network globals
// TODO(fulu): Using supernode peers to ensure we have peers across all column
// subnets for syncing. Should add tests connecting to full node peers.
let peer_id = self.new_connected_supernode_peer();
// Send peer to sync
self.send_sync_message(SyncMessage::AddPeer(peer_id, remote_info.clone()));
self.send_sync_message(SyncMessage::AddPeer(peer_id, remote_info));
peer_id
}
/// Connects `count` fresh peers and registers each with sync under the same
/// (cloned) remote status.
fn add_random_peers(&mut self, remote_info: SyncInfo, count: usize) {
for _ in 0..count {
let peer = self.new_connected_peer();
self.add_peer(peer, remote_info.clone());
}
}
/// Notifies sync that an already-connected peer has the given remote status.
fn add_peer(&mut self, peer: PeerId, remote_info: SyncInfo) {
self.send_sync_message(SyncMessage::AddPeer(peer, remote_info));
}
fn assert_state(&self, state: RangeSyncType) {
assert_eq!(
self.sync_manager
.range_sync_state()
.expect("State is ok")
.expect("Range should be syncing")
.expect("Range should be syncing, there are no chains")
.0,
state,
"not expected range sync state"
);
}
/// Panics if range sync still tracks any chain; used after a sync is
/// expected to have fully completed.
fn assert_no_chains_exist(&self) {
if let Some(chain) = self.sync_manager.get_range_sync_chains().unwrap() {
panic!("There still exists a chain {chain:?}");
}
}
/// Asserts that range sync has recorded no permanently failed chains.
fn assert_no_failed_chains(&mut self) {
assert_eq!(
self.sync_manager.__range_failed_chains(),
Vec::<Hash256>::new(),
"Expected no failed chains"
)
}
#[track_caller]
fn expect_chain_segments(&mut self, count: usize) {
for i in 0..count {
@@ -170,7 +215,7 @@ impl TestRig {
true
};
let block_req_id = self
let block_req = self
.pop_received_network_event(|ev| match ev {
NetworkMessage::SendRequest {
peer_id,
@@ -182,7 +227,9 @@ impl TestRig {
} if filter_f(*peer_id, *start_slot) => Some((*id, *peer_id)),
_ => None,
})
.expect("Should have a blocks by range request");
.unwrap_or_else(|e| {
panic!("Should have a BlocksByRange request, filter {request_filter:?}: {e:?}")
});
let by_range_data_requests = if self.after_fulu() {
let mut data_columns_requests = vec![];
@@ -200,7 +247,7 @@ impl TestRig {
data_columns_requests.push(data_columns_request);
}
if data_columns_requests.is_empty() {
panic!("Found zero DataColumnsByRange requests");
panic!("Found zero DataColumnsByRange requests, filter {request_filter:?}");
}
ByRangeDataRequestIds::PostPeerDAS(data_columns_requests)
} else if self.after_deneb() {
@@ -213,16 +260,21 @@ impl TestRig {
} if filter_f(*peer_id, *start_slot) => Some((*id, *peer_id)),
_ => None,
})
.expect("Should have a blobs by range request");
.unwrap_or_else(|e| {
panic!("Should have a blobs by range request, filter {request_filter:?}: {e:?}")
});
ByRangeDataRequestIds::PrePeerDAS(id, peer)
} else {
ByRangeDataRequestIds::PreDeneb
};
(block_req_id, by_range_data_requests)
(block_req, by_range_data_requests)
}
fn find_and_complete_blocks_by_range_request(&mut self, request_filter: RequestFilter) {
fn find_and_complete_blocks_by_range_request(
&mut self,
request_filter: RequestFilter,
) -> RangeRequestId {
let ((blocks_req_id, block_peer), by_range_data_request_ids) =
self.find_blocks_by_range_request(request_filter);
@@ -266,6 +318,60 @@ impl TestRig {
}
}
}
blocks_req_id.parent_request_id.requester
}
/// Pops the pending `ChainSegment` work event from the beacon processor
/// queue (panicking if none exists) and replies to sync with a successful
/// `BatchProcessed` result for batch `id`.
fn find_and_complete_processing_chain_segment(&mut self, id: ChainSegmentProcessId) {
// Consume the ChainSegment work event; its absence means the batch was
// never sent for processing, which fails the test.
self.pop_received_processor_event(|ev| {
(ev.work_type() == WorkType::ChainSegment).then_some(())
})
.unwrap_or_else(|e| panic!("Expected chain segment work event: {e}"));
self.log(&format!(
"Completing ChainSegment processing work {id:?} with success"
));
self.send_sync_message(SyncMessage::BatchProcessed {
sync_type: id,
// 8 blocks — presumably one epoch's worth under the minimal spec
// (8 slots per epoch); TODO confirm against batch sizing.
result: crate::sync::BatchProcessResult::Success {
sent_blocks: 8,
imported_blocks: 8,
},
});
}
fn complete_and_process_range_sync_until(
&mut self,
last_epoch: u64,
request_filter: RequestFilter,
) {
for epoch in 0..last_epoch {
// Note: In this test we can't predict the block peer
let id =
self.find_and_complete_blocks_by_range_request(request_filter.clone().epoch(epoch));
if let RangeRequestId::RangeSync { batch_id, .. } = id {
assert_eq!(batch_id.as_u64(), epoch, "Unexpected batch_id");
} else {
panic!("unexpected RangeRequestId {id:?}");
}
let id = match id {
RangeRequestId::RangeSync { chain_id, batch_id } => {
ChainSegmentProcessId::RangeBatchId(chain_id, batch_id)
}
RangeRequestId::BackfillSync { batch_id } => {
ChainSegmentProcessId::BackSyncBatchId(batch_id)
}
};
self.find_and_complete_processing_chain_segment(id);
if epoch < last_epoch - 1 {
self.assert_state(RangeSyncType::Finalized);
} else {
self.assert_no_chains_exist();
self.assert_no_failed_chains();
}
}
}
async fn create_canonical_block(&mut self) -> (SignedBeaconBlock<E>, Option<DataSidecars<E>>) {
@@ -442,3 +548,65 @@ fn pause_and_resume_on_ee_offline() {
// The head chain and finalized chain (2) should be in the processing queue
rig.expect_chain_segments(2);
}
/// To attempt to finalize the peer's status finalized checkpoint, we sync to its finalized
/// epoch + 2 epochs + 1 slot.
const EXTRA_SYNCED_EPOCHS: u64 = 2 + 1;
/// Happy path: a finalized sync against a remote checkpoint 2 epochs ahead
/// completes fully. Runs across all forks; global custody peers satisfy
/// column queries while only a few peers are added to the chain itself.
#[test]
fn finalized_sync_enough_global_custody_peers_few_chain_peers() {
// Run for all forks
let mut r = TestRig::test_setup();
// This test creates enough global custody peers to satisfy column queries but only adds a
// few peers to the chain
r.new_connected_peers_for_peerdas();
let advanced_epochs: u64 = 2;
let remote_info = r.finalized_remote_info_advanced_by(advanced_epochs.into());
// Current prioritization only sends batches to idle peers, so we need enough peers for each
// batch
// TODO: Test this with a single peer in the chain, it should still work
r.add_random_peers(
remote_info,
(advanced_epochs + EXTRA_SYNCED_EPOCHS) as usize,
);
r.assert_state(RangeSyncType::Finalized);
let last_epoch = advanced_epochs + EXTRA_SYNCED_EPOCHS;
r.complete_and_process_range_sync_until(last_epoch, filter());
}
/// Post-PeerDAS case: sync starts with a single non-supernode peer, so no
/// by-range requests can be issued (not enough custody peers on all
/// columns). Once enough custody peers join the global pool, the finalized
/// sync resumes and completes.
#[test]
fn finalized_sync_not_enough_custody_peers_on_start() {
let mut r = TestRig::test_setup();
// Only run post-PeerDAS
if !r.fork_name.fulu_enabled() {
return;
}
let advanced_epochs: u64 = 2;
let remote_info = r.finalized_remote_info_advanced_by(advanced_epochs.into());
// Unlikely that the single peer we added has enough columns for us. Tests are deterministic
// and this error should never be hit
r.add_random_peer_not_supernode(remote_info.clone());
r.assert_state(RangeSyncType::Finalized);
// Because we don't have enough peers on all columns we haven't sent any request.
// NOTE: There's a small chance that this single peer happens to custody exactly the set we
// expect, in that case the test will fail. Find a way to make the test deterministic.
r.expect_empty_network();
// Generate enough peers and supernodes to cover all custody columns
r.new_connected_peers_for_peerdas();
// Note: not necessary to add these peers to the chain, as we draw from the global pool
// We still need to add enough peers to trigger batch downloads with idle peers. Same issue as
// the test above.
r.add_random_peers(
remote_info,
(advanced_epochs + EXTRA_SYNCED_EPOCHS - 1) as usize,
);
let last_epoch = advanced_epochs + EXTRA_SYNCED_EPOCHS;
r.complete_and_process_range_sync_until(last_epoch, filter());
}