diff --git a/beacon_node/beacon_processor/src/lib.rs b/beacon_node/beacon_processor/src/lib.rs index 724c41cfc9..a6c76beb31 100644 --- a/beacon_node/beacon_processor/src/lib.rs +++ b/beacon_node/beacon_processor/src/lib.rs @@ -421,7 +421,11 @@ pub enum Work { IgnoredRpcBlock { process_fn: BlockingFn, }, - ChainSegment(AsyncFn), + ChainSegment { + process_fn: AsyncFn, + /// (chain_id, batch_epoch) for test observability + process_id: (u32, u64), + }, ChainSegmentBackfill(BlockingFn), Status(BlockingFn), BlocksByRangeRequest(AsyncFn), @@ -1473,7 +1477,7 @@ impl BeaconProcessor { } => task_spawner.spawn_blocking(move || { process_batch(aggregates); }), - Work::ChainSegment(process_fn) => task_spawner.spawn_async(async move { + Work::ChainSegment { process_fn, .. } => task_spawner.spawn_async(async move { process_fn.await; }), Work::UnknownBlockAttestation { process_fn } diff --git a/beacon_node/network/src/network_beacon_processor/mod.rs b/beacon_node/network/src/network_beacon_processor/mod.rs index f74e7dacfb..b3d6874b8a 100644 --- a/beacon_node/network/src/network_beacon_processor/mod.rs +++ b/beacon_node/network/src/network_beacon_processor/mod.rs @@ -620,11 +620,14 @@ impl NetworkBeaconProcessor { // Back-sync batches are dispatched with a different `Work` variant so // they can be rate-limited. let work = match process_id { - ChainSegmentProcessId::RangeBatchId(_, _) => { + ChainSegmentProcessId::RangeBatchId(chain_id, epoch) => { let process_fn = async move { processor.process_chain_segment(process_id, blocks).await; }; - Work::ChainSegment(Box::pin(process_fn)) + Work::ChainSegment { + process_fn: Box::pin(process_fn), + process_id: (chain_id, epoch.as_u64()), + } } ChainSegmentProcessId::BackSyncBatchId(_) => { let process_fn = diff --git a/beacon_node/network/src/sync/tests/lookups.rs b/beacon_node/network/src/sync/tests/lookups.rs index cd872df887..a26996ec5e 100644 --- a/beacon_node/network/src/sync/tests/lookups.rs +++ b/beacon_node/network/src/sync/tests/lookups.rs @@ -1,16 +1,18 @@ use super::*; use crate::NetworkMessage; -use crate::network_beacon_processor::{InvalidBlockStorage, NetworkBeaconProcessor}; +use crate::network_beacon_processor::{ + ChainSegmentProcessId, InvalidBlockStorage, NetworkBeaconProcessor, +}; use crate::sync::block_lookups::{BlockLookupSummary, PARENT_DEPTH_TOLERANCE}; use crate::sync::{ SyncMessage, - manager::{BlockProcessType, BlockProcessingResult, SyncManager}, + manager::{BatchProcessResult, BlockProcessType, BlockProcessingResult, SyncManager}, }; use beacon_chain::blob_verification::KzgVerifiedBlob; use beacon_chain::block_verification_types::LookupBlock; use beacon_chain::custody_context::NodeCustodyType; use beacon_chain::{ - AvailabilityProcessingStatus, BlockError, NotifyExecutionLayer, + AvailabilityProcessingStatus, BlockError, EngineState, NotifyExecutionLayer, block_verification_types::{AsBlock, AvailableBlockData}, data_availability_checker::Availability, test_utils::{ @@ -23,7 +25,7 @@ use educe::Educe; use itertools::Itertools; use lighthouse_network::discovery::CombinedKey; use lighthouse_network::{ - NetworkConfig, NetworkGlobals, PeerId, + NetworkConfig, NetworkGlobals, PeerAction, PeerId, rpc::{RPCError, RequestType}, service::api_types::{AppRequestId, SyncRequestId}, types::SyncState, @@ -64,14 +66,33 @@ pub struct SimulateConfig { Option Option + Send + Sync>>, // Import a block directly before processing it (for simulating race conditions) import_block_before_process: HashSet, + /// Number of range batch processing attempts that return FaultyFailure + range_faulty_failures: usize, + /// Number of range batch processing attempts that return NonFaultyFailure + range_non_faulty_failures: usize, + /// Number of BlocksByRange requests that return empty (no blocks) + return_no_range_blocks_n_times: usize, + /// Number of DataColumnsByRange requests that return empty (no columns) + return_no_range_columns_n_times: usize, + /// Number of DataColumnsByRange requests that return columns with unrequested indices + return_wrong_range_column_indices_n_times: usize, + /// Number of DataColumnsByRange requests that return columns with unrequested slots + return_wrong_range_column_slots_n_times: usize, + /// Number of DataColumnsByRange requests that return fewer columns than requested + /// (drops half the columns). Triggers CouplingError::DataColumnPeerFailure → retry_partial_batch + return_partial_range_columns_n_times: usize, + /// Set EE offline at start, bring back online after this many BlocksByRange responses + ee_offline_for_n_range_responses: Option, + /// Disconnect all peers after this many successful BlocksByRange responses. + successful_range_responses_before_disconnect: Option, } impl SimulateConfig { - fn new() -> Self { + pub(super) fn new() -> Self { Self::default() } - fn happy_path() -> Self { + pub(super) fn happy_path() -> Self { Self::default() } @@ -111,7 +132,7 @@ impl SimulateConfig { self } - fn return_rpc_error(mut self, error: RPCError) -> Self { + pub(super) fn return_rpc_error(mut self, error: RPCError) -> Self { self.return_rpc_error = Some(error); self } @@ -133,6 +154,51 @@ impl SimulateConfig { self.import_block_before_process.insert(block_root); self } + + pub(super) fn with_range_faulty_failures(mut self, n: usize) -> Self { + self.range_faulty_failures = n; + self + } + + pub(super) fn with_range_non_faulty_failures(mut self, n: usize) -> Self { + self.range_non_faulty_failures = n; + self + } + + pub(super) fn with_no_range_blocks_n_times(mut self, n: usize) -> Self { + self.return_no_range_blocks_n_times = n; + self + } + + pub(super) fn with_no_range_columns_n_times(mut self, n: usize) -> Self { + self.return_no_range_columns_n_times = n; + self + } + + pub(super) fn with_wrong_range_column_indices_n_times(mut self, n: usize) -> Self { + self.return_wrong_range_column_indices_n_times = n; + self + } + + pub(super) fn with_wrong_range_column_slots_n_times(mut self, n: usize) -> Self { + self.return_wrong_range_column_slots_n_times = n; + self + } + + pub(super) fn with_partial_range_columns_n_times(mut self, n: usize) -> Self { + self.return_partial_range_columns_n_times = n; + self + } + + pub(super) fn with_ee_offline_for_n_range_responses(mut self, n: usize) -> Self { + self.ee_offline_for_n_range_responses = Some(n); + self + } + + pub(super) fn with_disconnect_after_range_requests(mut self, n: usize) -> Self { + self.successful_range_responses_before_disconnect = Some(n); + self + } } fn genesis_fork() -> ForkName { @@ -256,6 +322,7 @@ impl TestRig { }) } + #[allow(dead_code)] pub fn with_custody_type(node_custody_type: NodeCustodyType) -> Self { Self::new(TestRigConfig { fulu_test_type: FuluTestType::WeFullnodeThemSupernode, @@ -267,13 +334,23 @@ impl TestRig { /// /// Processes events from sync_rx (sink), beacon processor, and network queues in fixed /// priority order each tick. Handles completed work before pulling new requests. - async fn simulate(&mut self, complete_strategy: SimulateConfig) { + pub(super) async fn simulate(&mut self, complete_strategy: SimulateConfig) { self.complete_strategy = complete_strategy; self.log(&format!( "Running simulate with config {:?}", self.complete_strategy )); + // Set EE offline at the start if configured + if self + .complete_strategy + .ee_offline_for_n_range_responses + .is_some() + { + self.sync_manager + .update_execution_engine_state(EngineState::Offline); + } + let mut i = 0; loop { @@ -352,9 +429,34 @@ impl TestRig { process_fn.await } } - Work::RpcBlobs { process_fn } - | Work::RpcCustodyColumn(process_fn) - | Work::ChainSegment(process_fn) => process_fn.await, + Work::RpcBlobs { process_fn } | Work::RpcCustodyColumn(process_fn) => { + process_fn.await + } + Work::ChainSegment { + process_fn, + process_id: (chain_id, batch_epoch), + } => { + let sync_type = + ChainSegmentProcessId::RangeBatchId(chain_id, batch_epoch.into()); + if self.complete_strategy.range_faulty_failures > 0 { + self.complete_strategy.range_faulty_failures -= 1; + self.push_sync_message(SyncMessage::BatchProcessed { + sync_type, + result: BatchProcessResult::FaultyFailure { + imported_blocks: 0, + penalty: PeerAction::LowToleranceError, + }, + }); + } else if self.complete_strategy.range_non_faulty_failures > 0 { + self.complete_strategy.range_non_faulty_failures -= 1; + self.push_sync_message(SyncMessage::BatchProcessed { + sync_type, + result: BatchProcessResult::NonFaultyFailure, + }); + } else { + process_fn.await; + } + } Work::Reprocess(_) => {} // ignore other => panic!("Unsupported Work event {}", other.str_id()), } @@ -573,15 +675,50 @@ impl TestRig { if self.complete_strategy.skip_by_range_routes { return; } - let blocks = (*req.start_slot()..req.start_slot() + req.count()) - .filter_map(|slot| { - self.network_blocks_by_slot - .get(&Slot::new(slot)) - .map(|block| block.block_cloned()) - }) - .collect::>(); - self.send_rpc_blocks_response(req_id, peer_id, &blocks); + // Check if we should disconnect all peers instead of continuing + if let Some(ref mut remaining) = self + .complete_strategy + .successful_range_responses_before_disconnect + { + if *remaining == 0 { + // Disconnect all peers — remaining responses become "late" + for peer in self.get_connected_peers() { + self.peer_disconnected(peer); + } + return; + } else { + *remaining -= 1; + } + } + + // Return empty response N times to simulate peer returning no blocks + if self.complete_strategy.return_no_range_blocks_n_times > 0 { + self.complete_strategy.return_no_range_blocks_n_times -= 1; + self.send_rpc_blocks_response(req_id, peer_id, &[]); + } else { + let blocks = (*req.start_slot()..req.start_slot() + req.count()) + .filter_map(|slot| { + self.network_blocks_by_slot + .get(&Slot::new(slot)) + .map(|block| block.block_cloned()) + }) + .collect::>(); + self.send_rpc_blocks_response(req_id, peer_id, &blocks); + } + + // Bring EE back online after N range responses + if let Some(ref mut remaining) = + self.complete_strategy.ee_offline_for_n_range_responses + { + if *remaining == 0 { + self.sync_manager + .update_execution_engine_state(EngineState::Online); + self.complete_strategy.ee_offline_for_n_range_responses = None; + } else { + *remaining -= 1; + } + } } (RequestType::BlobsByRange(req), AppRequestId::Sync(req_id)) => { @@ -605,10 +742,80 @@ impl TestRig { if self.complete_strategy.skip_by_range_routes { return; } - // Note: This function is permissive, blocks may have zero columns and it won't - // error. Some caveats: - // - The genesis block never has columns - // - Some blocks may not have columns as the blob count is random + + // Return empty columns N times + if self.complete_strategy.return_no_range_columns_n_times > 0 { + self.complete_strategy.return_no_range_columns_n_times -= 1; + self.send_rpc_columns_response(req_id, peer_id, &[]); + return; + } + + // Return columns with unrequested indices N times. + // Note: for supernodes this returns no columns since they custody all indices. + if self + .complete_strategy + .return_wrong_range_column_indices_n_times + > 0 + { + self.complete_strategy + .return_wrong_range_column_indices_n_times -= 1; + let wrong_columns = (req.start_slot..req.start_slot + req.count) + .filter_map(|slot| self.network_blocks_by_slot.get(&Slot::new(slot))) + .filter_map(|block| block.block_data().data_columns()) + .flat_map(|columns| { + columns + .into_iter() + .filter(|c| !req.columns.contains(c.index())) + }) + .collect::>(); + self.send_rpc_columns_response(req_id, peer_id, &wrong_columns); + return; + } + + // Return columns from an out-of-range slot N times + if self + .complete_strategy + .return_wrong_range_column_slots_n_times + > 0 + { + self.complete_strategy + .return_wrong_range_column_slots_n_times -= 1; + // Get a column from a slot AFTER the requested range + let wrong_slot = req.start_slot + req.count; + let wrong_columns = self + .network_blocks_by_slot + .get(&Slot::new(wrong_slot)) + .and_then(|block| block.block_data().data_columns()) + .into_iter() + .flat_map(|columns| { + columns + .into_iter() + .filter(|c| req.columns.contains(c.index())) + }) + .collect::>(); + self.send_rpc_columns_response(req_id, peer_id, &wrong_columns); + return; + } + + // Return only half the requested columns N times — triggers CouplingError + if self.complete_strategy.return_partial_range_columns_n_times > 0 { + self.complete_strategy.return_partial_range_columns_n_times -= 1; + let columns = (req.start_slot..req.start_slot + req.count) + .filter_map(|slot| self.network_blocks_by_slot.get(&Slot::new(slot))) + .filter_map(|block| block.block_data().data_columns()) + .flat_map(|columns| { + columns + .into_iter() + .filter(|c| req.columns.contains(c.index())) + }) + .enumerate() + .filter(|(i, _)| i % 2 == 0) // keep every other column + .map(|(_, c)| c) + .collect::>(); + self.send_rpc_columns_response(req_id, peer_id, &columns); + return; + } + let columns = (req.start_slot..req.start_slot + req.count) .filter_map(|slot| self.network_blocks_by_slot.get(&Slot::new(slot))) .filter_map(|block| block.block_data().data_columns()) @@ -726,7 +933,7 @@ impl TestRig { // Preparation steps /// Returns the block root of the tip of the built chain - async fn build_chain(&mut self, block_count: usize) -> Hash256 { + pub(super) async fn build_chain(&mut self, block_count: usize) -> Hash256 { let mut blocks = vec![]; // Initialise a new beacon chain @@ -947,6 +1154,30 @@ impl TestRig { self.trigger_with_last_block(); } + /// Import blocks for slots 1..=up_to_slot into the local chain (advance local head) + pub(super) async fn import_blocks_up_to_slot(&mut self, up_to_slot: u64) { + for slot in 1..=up_to_slot { + let rpc_block = self + .network_blocks_by_slot + .get(&Slot::new(slot)) + .unwrap_or_else(|| panic!("No block at slot {slot}")) + .clone(); + let block_root = rpc_block.canonical_root(); + self.harness + .chain + .process_block( + block_root, + rpc_block, + NotifyExecutionLayer::Yes, + BlockImportSource::Gossip, + || Ok(()), + ) + .await + .unwrap(); + } + self.harness.chain.recompute_head_at_current_slot().await; + } + /// Import a block directly into the chain without going through lookup sync async fn import_block_by_root(&mut self, block_root: Hash256) { let range_sync_block = self @@ -1000,23 +1231,32 @@ impl TestRig { // Post-test assertions - fn head_slot(&self) -> Slot { + pub(super) fn head_slot(&self) -> Slot { self.harness.chain.head().head_slot() } - fn assert_head_slot(&self, slot: u64) { + pub(super) fn assert_head_slot(&self, slot: u64) { assert_eq!(self.head_slot(), Slot::new(slot), "Unexpected head slot"); } - fn max_known_slot(&self) -> Slot { + pub(super) fn max_known_slot(&self) -> Slot { self.network_blocks_by_slot .keys() .max() .copied() - .expect("no blocks") + .unwrap_or_default() } - fn assert_penalties(&self, expected_penalties: &[&'static str]) { + pub(super) fn finalized_epoch(&self) -> types::Epoch { + self.harness + .chain + .canonical_head + .cached_head() + .finalized_checkpoint() + .epoch + } + + pub(super) fn assert_penalties(&self, expected_penalties: &[&'static str]) { let penalties = self .penalties .iter() @@ -1034,7 +1274,7 @@ impl TestRig { } } - fn assert_penalties_of_type(&self, expected_penalty: &'static str) { + pub(super) fn assert_penalties_of_type(&self, expected_penalty: &'static str) { if self.penalties.is_empty() { panic!("No penalties but expected some of type {expected_penalty}"); } @@ -1051,7 +1291,7 @@ impl TestRig { } } - fn assert_no_penalties(&mut self) { + pub(super) fn assert_no_penalties(&mut self) { if !self.penalties.is_empty() { panic!("Some downscore events: {:?}", self.penalties); } @@ -1102,7 +1342,7 @@ impl TestRig { } /// Assert there is at least one range sync chain created and that all sync chains completed - fn assert_successful_range_sync(&self) { + pub(super) fn assert_successful_range_sync(&self) { assert!( self.range_sync_chains_added() > 0, "No created range sync chains" @@ -1425,6 +1665,7 @@ impl TestRig { } } + #[allow(dead_code)] pub fn pop_received_processor_event) -> Option>( &mut self, predicate_transform: F, diff --git a/beacon_node/network/src/sync/tests/range.rs b/beacon_node/network/src/sync/tests/range.rs index c19ee8eb6d..891d9d1e97 100644 --- a/beacon_node/network/src/sync/tests/range.rs +++ b/beacon_node/network/src/sync/tests/range.rs @@ -1,110 +1,47 @@ +//! Range sync tests for `BlocksByRange`, `BlobsByRange`, `DataColumnsByRange`. +//! +//! Tests follow the pattern from `lookups.rs`: +//! ```ignore +//! async fn test_name() { +//! let mut r = TestRig::default(); +//! r.setup_xyz().await; +//! r.simulate(SimulateConfig::happy_path()).await; +//! r.assert_range_sync_completed(); +//! } +//! ``` +//! +//! Rules: +//! - Tests must be succinct and readable (3-10 lines per test body) +//! - All complex logic lives in helpers (setup, SimulateConfig, assert) +//! - Test bodies must not manually grab requests, send SyncMessages, or do anything overly specific +//! - All tests use `simulate()` if they need peers to fulfill requests +//! - Extend `SimulateConfig` for new range-specific behaviors +//! - Extend `simulate()` to support by_range methods + +use super::lookups::SimulateConfig; use super::*; -use crate::network_beacon_processor::ChainSegmentProcessId; use crate::status::ToStatusMessage; use crate::sync::SyncMessage; use crate::sync::manager::SLOT_IMPORT_TOLERANCE; -use crate::sync::network_context::RangeRequestId; use crate::sync::range_sync::RangeSyncType; -use beacon_chain::BeaconChain; -use beacon_chain::block_verification_types::AvailableBlockData; -use beacon_chain::custody_context::NodeCustodyType; -use beacon_chain::data_column_verification::CustodyDataColumn; -use beacon_chain::test_utils::{AttestationStrategy, BlockStrategy}; -use beacon_chain::{EngineState, NotifyExecutionLayer, block_verification_types::RangeSyncBlock}; -use beacon_processor::WorkType; -use lighthouse_network::rpc::RequestType; -use lighthouse_network::rpc::methods::{ - BlobsByRangeRequest, DataColumnsByRangeRequest, OldBlocksByRangeRequest, - OldBlocksByRangeRequestV2, StatusMessageV2, -}; -use lighthouse_network::service::api_types::{ - AppRequestId, BlobsByRangeRequestId, BlocksByRangeRequestId, DataColumnsByRangeRequestId, - SyncRequestId, -}; +use lighthouse_network::rpc::RPCError; +use lighthouse_network::rpc::methods::StatusMessageV2; use lighthouse_network::{PeerId, SyncInfo}; -use std::time::Duration; -use types::{ - BlobSidecarList, BlockImportSource, Epoch, EthSpec, Hash256, MinimalEthSpec as E, - SignedBeaconBlock, SignedBeaconBlockHash, Slot, -}; +use types::{Epoch, EthSpec, Hash256, MinimalEthSpec as E, Slot}; -const D: Duration = Duration::new(0, 0); - -pub(crate) enum DataSidecars { - Blobs(BlobSidecarList), - DataColumns(Vec>), -} - -enum ByRangeDataRequestIds { - PreDeneb, - PrePeerDAS(BlobsByRangeRequestId, PeerId), - PostPeerDAS(Vec<(DataColumnsByRangeRequestId, PeerId)>), -} - -/// Sync tests are usually written in the form: -/// - Do some action -/// - Expect a request to be sent -/// - Complete the above request -/// -/// To make writting tests succint, the machinery in this testing rig automatically identifies -/// _which_ request to complete. Picking the right request is critical for tests to pass, so this -/// filter allows better expressivity on the criteria to identify the right request. -#[derive(Default, Debug, Clone)] -struct RequestFilter { - peer: Option, - epoch: Option, -} - -impl RequestFilter { - fn peer(mut self, peer: PeerId) -> Self { - self.peer = Some(peer); - self - } - - fn epoch(mut self, epoch: u64) -> Self { - self.epoch = Some(epoch); - self - } -} - -fn filter() -> RequestFilter { - RequestFilter::default() -} +/// MinimalEthSpec has 8 slots per epoch +const SLOTS_PER_EPOCH: usize = 8; impl TestRig { - /// Produce a head peer with an advanced head fn add_head_peer(&mut self) -> PeerId { - self.add_head_peer_with_root(Hash256::random()) - } - - /// Produce a head peer with an advanced head - fn add_head_peer_with_root(&mut self, head_root: Hash256) -> PeerId { let local_info = self.local_info(); self.add_supernode_peer(SyncInfo { - head_root, + head_root: Hash256::random(), head_slot: local_info.head_slot + 1 + Slot::new(SLOT_IMPORT_TOLERANCE as u64), ..local_info }) } - // Produce a finalized peer with an advanced finalized epoch - fn add_finalized_peer(&mut self) -> PeerId { - self.add_finalized_peer_with_root(Hash256::random()) - } - - // Produce a finalized peer with an advanced finalized epoch - fn add_finalized_peer_with_root(&mut self, finalized_root: Hash256) -> PeerId { - let local_info = self.local_info(); - let finalized_epoch = local_info.finalized_epoch + 2; - self.add_supernode_peer(SyncInfo { - finalized_epoch, - finalized_root, - head_slot: finalized_epoch.start_slot(E::slots_per_epoch()), - head_root: Hash256::random(), - earliest_available_slot: None, - }) - } - fn finalized_remote_info_advanced_by(&self, advanced_epochs: Epoch) -> SyncInfo { let local_info = self.local_info(); let finalized_epoch = local_info.finalized_epoch + advanced_epochs; @@ -142,11 +79,7 @@ impl TestRig { } fn add_supernode_peer(&mut self, remote_info: SyncInfo) -> PeerId { - // Create valid peer known to network globals - // TODO(fulu): Using supernode peers to ensure we have peer across all column - // subnets for syncing. Should add tests connecting to full node peers. let peer_id = self.new_connected_supernode_peer(); - // Send peer to sync self.send_sync_message(SyncMessage::AddPeer(peer_id, remote_info)); peer_id } @@ -184,450 +117,362 @@ impl TestRig { ) } - #[track_caller] - fn assert_chain_segments(&mut self, count: usize) { - for i in 0..count { - self.pop_received_processor_event(|ev| { - (ev.work_type() == beacon_processor::WorkType::ChainSegment).then_some(()) - }) - .unwrap_or_else(|e| panic!("Expect ChainSegment work event count {i}: {e:?}")); - } + // -- Setup helpers -- + + /// Head sync: peers whose finalized root/epoch match ours (known to fork choice), + /// but whose head is ahead. Only head chain is created. + async fn setup_head_sync(&mut self) { + self.build_chain(SLOTS_PER_EPOCH).await; + self.add_head_peer(); + self.assert_state(RangeSyncType::Head); } - fn update_execution_engine_state(&mut self, state: EngineState) { - self.log(&format!("execution engine state updated: {state:?}")); - self.sync_manager.update_execution_engine_state(state); + /// Finalized sync: peers whose finalized epoch is advanced and head == finalized start slot. + /// Returns the remote SyncInfo (needed for blacklist tests). + async fn setup_finalized_sync(&mut self) -> SyncInfo { + let advanced_epochs = 5; + self.build_chain(advanced_epochs * SLOTS_PER_EPOCH).await; + let remote_info = self.finalized_remote_info_advanced_by((advanced_epochs as u64).into()); + self.add_fullnode_peers(remote_info.clone(), 100); + self.add_supernode_peer(remote_info.clone()); + self.assert_state(RangeSyncType::Finalized); + remote_info } - fn find_blocks_by_range_request( - &mut self, - request_filter: RequestFilter, - ) -> ((BlocksByRangeRequestId, PeerId), ByRangeDataRequestIds) { - let filter_f = |peer: PeerId, start_slot: u64| { - if let Some(expected_epoch) = request_filter.epoch { - let epoch = Slot::new(start_slot).epoch(E::slots_per_epoch()).as_u64(); - if epoch != expected_epoch { - return false; - } - } - if let Some(expected_peer) = request_filter.peer - && peer != expected_peer - { - return false; - } - - true + /// Finalized-to-head: peers whose finalized is advanced AND head is beyond finalized. + /// After finalized sync completes, head chains are created from awaiting_head_peers. + async fn setup_finalized_and_head_sync(&mut self) { + let finalized_epochs = 5; + let head_epochs = 7; + self.build_chain(head_epochs * SLOTS_PER_EPOCH).await; + let local_info = self.local_info(); + let finalized_epoch = local_info.finalized_epoch + Epoch::new(finalized_epochs as u64); + let head_slot = Slot::new((head_epochs * SLOTS_PER_EPOCH) as u64); + let remote_info = SyncInfo { + finalized_epoch, + finalized_root: Hash256::random(), + head_slot, + head_root: Hash256::random(), + earliest_available_slot: None, }; - - let block_req = self - .pop_received_network_event(|ev| match ev { - NetworkMessage::SendRequest { - peer_id, - request: - RequestType::BlocksByRange(OldBlocksByRangeRequest::V2( - OldBlocksByRangeRequestV2 { start_slot, .. }, - )), - app_request_id: AppRequestId::Sync(SyncRequestId::BlocksByRange(id)), - } if filter_f(*peer_id, *start_slot) => Some((*id, *peer_id)), - _ => None, - }) - .unwrap_or_else(|e| { - panic!("Should have a BlocksByRange request, filter {request_filter:?}: {e:?}") - }); - - let by_range_data_requests = if self.is_after_fulu() { - let mut data_columns_requests = vec![]; - while let Ok(data_columns_request) = self.pop_received_network_event(|ev| match ev { - NetworkMessage::SendRequest { - peer_id, - request: - RequestType::DataColumnsByRange(DataColumnsByRangeRequest { - start_slot, .. - }), - app_request_id: AppRequestId::Sync(SyncRequestId::DataColumnsByRange(id)), - } if filter_f(*peer_id, *start_slot) => Some((*id, *peer_id)), - _ => None, - }) { - data_columns_requests.push(data_columns_request); - } - if data_columns_requests.is_empty() { - panic!("Found zero DataColumnsByRange requests, filter {request_filter:?}"); - } - ByRangeDataRequestIds::PostPeerDAS(data_columns_requests) - } else if self.is_after_deneb() { - let (id, peer) = self - .pop_received_network_event(|ev| match ev { - NetworkMessage::SendRequest { - peer_id, - request: RequestType::BlobsByRange(BlobsByRangeRequest { start_slot, .. }), - app_request_id: AppRequestId::Sync(SyncRequestId::BlobsByRange(id)), - } if filter_f(*peer_id, *start_slot) => Some((*id, *peer_id)), - _ => None, - }) - .unwrap_or_else(|e| { - panic!("Should have a blobs by range request, filter {request_filter:?}: {e:?}") - }); - ByRangeDataRequestIds::PrePeerDAS(id, peer) - } else { - ByRangeDataRequestIds::PreDeneb - }; - - (block_req, by_range_data_requests) + self.add_fullnode_peers(remote_info.clone(), 100); + self.add_supernode_peer(remote_info); + self.assert_state(RangeSyncType::Finalized); } - fn find_and_complete_blocks_by_range_request( - &mut self, - request_filter: RequestFilter, - ) -> RangeRequestId { - let ((blocks_req_id, block_peer), by_range_data_request_ids) = - self.find_blocks_by_range_request(request_filter); - - // Complete the request with a single stream termination - self.log(&format!( - "Completing BlocksByRange request {blocks_req_id:?} with empty stream" - )); - self.send_sync_message(SyncMessage::RpcBlock { - sync_request_id: SyncRequestId::BlocksByRange(blocks_req_id), - peer_id: block_peer, - beacon_block: None, - seen_timestamp: D, - }); - - match by_range_data_request_ids { - ByRangeDataRequestIds::PreDeneb => {} - ByRangeDataRequestIds::PrePeerDAS(id, peer_id) => { - // Complete the request with a single stream termination - self.log(&format!( - "Completing BlobsByRange request {id:?} with empty stream" - )); - self.send_sync_message(SyncMessage::RpcBlob { - sync_request_id: SyncRequestId::BlobsByRange(id), - peer_id, - blob_sidecar: None, - seen_timestamp: D, - }); - } - ByRangeDataRequestIds::PostPeerDAS(data_column_req_ids) => { - // Complete the request with a single stream termination - for (id, peer_id) in data_column_req_ids { - self.log(&format!( - "Completing DataColumnsByRange request {id:?} with empty stream" - )); - self.send_sync_message(SyncMessage::RpcDataColumn { - sync_request_id: SyncRequestId::DataColumnsByRange(id), - peer_id, - data_column: None, - seen_timestamp: D, - }); - } - } - } - - blocks_req_id.parent_request_id.requester + /// Finalized sync with only 1 fullnode peer (insufficient custody coverage). + /// Returns remote_info to pass to `add_remaining_finalized_peers`. + async fn setup_finalized_sync_insufficient_peers(&mut self) -> SyncInfo { + let advanced_epochs = 5; + self.build_chain(advanced_epochs * SLOTS_PER_EPOCH).await; + let remote_info = self.finalized_remote_info_advanced_by((advanced_epochs as u64).into()); + self.add_fullnode_peer(remote_info.clone()); + self.assert_state(RangeSyncType::Finalized); + remote_info } - fn find_and_complete_processing_chain_segment(&mut self, id: ChainSegmentProcessId) { - self.pop_received_processor_event(|ev| { - (ev.work_type() == WorkType::ChainSegment).then_some(()) - }) - .unwrap_or_else(|e| panic!("Expected chain segment work event: {e}")); - - self.log(&format!( - "Completing ChainSegment processing work {id:?} with success" - )); - self.send_sync_message(SyncMessage::BatchProcessed { - sync_type: id, - result: crate::sync::BatchProcessResult::Success { - sent_blocks: 8, - imported_blocks: 8, - }, - }); - } - - fn complete_and_process_range_sync_until( - &mut self, - last_epoch: u64, - request_filter: RequestFilter, - ) { - for epoch in 0..last_epoch { - // Note: In this test we can't predict the block peer - let id = - self.find_and_complete_blocks_by_range_request(request_filter.clone().epoch(epoch)); - if let RangeRequestId::RangeSync { batch_id, .. } = id { - assert_eq!(batch_id.as_u64(), epoch, "Unexpected batch_id"); - } else { - panic!("unexpected RangeRequestId {id:?}"); - } - - let id = match id { - RangeRequestId::RangeSync { chain_id, batch_id } => { - ChainSegmentProcessId::RangeBatchId(chain_id, batch_id) - } - RangeRequestId::BackfillSync { batch_id } => { - ChainSegmentProcessId::BackSyncBatchId(batch_id) - } - }; - - self.find_and_complete_processing_chain_segment(id); - if epoch < last_epoch - 1 { - self.assert_state(RangeSyncType::Finalized); - } else { - self.assert_no_chains_exist(); - self.assert_no_failed_chains(); - } - } - } - - async fn create_canonical_block(&mut self) -> (SignedBeaconBlock, Option>) { - self.harness.advance_slot(); - - let block_root = self - .harness - .extend_chain( - 1, - BlockStrategy::OnCanonicalHead, - AttestationStrategy::AllValidators, - ) + /// Finalized sync where local node already has blocks up to `local_epochs`. + /// Triggers optimistic start: the chain tries to download a batch at the local head + /// epoch concurrently with sequential processing from the start. + async fn setup_finalized_sync_with_local_head(&mut self, local_epochs: usize) { + let target_epochs = local_epochs + 3; // target beyond local head + self.build_chain(target_epochs * SLOTS_PER_EPOCH).await; + self.import_blocks_up_to_slot((local_epochs * SLOTS_PER_EPOCH) as u64) .await; - - let store = &self.harness.chain.store; - let block = store.get_full_block(&block_root).unwrap().unwrap(); - let fork = block.fork_name_unchecked(); - - let data_sidecars = if fork.fulu_enabled() { - store - .get_data_columns(&block_root, fork) - .unwrap() - .map(|columns| { - columns - .into_iter() - .map(CustodyDataColumn::from_asserted_custody) - .collect() - }) - .map(DataSidecars::DataColumns) - } else if fork.deneb_enabled() { - store - .get_blobs(&block_root) - .unwrap() - .blobs() - .map(DataSidecars::Blobs) - } else { - None - }; - - (block, data_sidecars) + let remote_info = self.finalized_remote_info_advanced_by((target_epochs as u64).into()); + self.add_fullnode_peers(remote_info.clone(), 100); + self.add_supernode_peer(remote_info); + self.assert_state(RangeSyncType::Finalized); } - async fn remember_block( - &mut self, - (block, data_sidecars): (SignedBeaconBlock, Option>), - ) { - // This code is kind of duplicated from Harness::process_block, but takes sidecars directly. - let block_root = block.canonical_root(); - self.harness.set_current_slot(block.slot()); - let _: SignedBeaconBlockHash = self - .harness - .chain - .process_block( - block_root, - build_range_sync_block(block.into(), &data_sidecars, self.harness.chain.clone()), - NotifyExecutionLayer::Yes, - BlockImportSource::RangeSync, - || Ok(()), - ) - .await - .unwrap() - .try_into() - .unwrap(); - self.harness.chain.recompute_head_at_current_slot().await; + /// Add enough peers to cover all custody columns (same chain as insufficient setup) + fn add_remaining_finalized_peers(&mut self, remote_info: SyncInfo) { + self.add_fullnode_peers(remote_info.clone(), 100); + self.add_supernode_peer(remote_info); + } + + // -- Assert helpers -- + + /// Assert range sync completed: chains created and removed, all blocks ingested, + /// finalized epoch advanced, no penalties, no leftover events. + fn assert_range_sync_completed(&mut self) { + self.assert_successful_range_sync(); + self.assert_no_failed_chains(); + assert_eq!( + self.head_slot(), + self.max_known_slot(), + "Head slot should match the last built block (all blocks ingested)" + ); + assert!( + self.finalized_epoch() > types::Epoch::new(0), + "Finalized epoch should have advanced past genesis, got {}", + self.finalized_epoch() + ); + self.assert_no_penalties(); + self.assert_empty_network(); + self.assert_empty_processor(); + } + + /// Assert head sync completed (no finalization expected for short ranges) + fn assert_head_sync_completed(&mut self) { + self.assert_successful_range_sync(); + self.assert_no_failed_chains(); + assert_eq!( + self.head_slot(), + self.max_known_slot(), + "Head slot should match the last built block (all blocks ingested)" + ); + self.assert_no_penalties(); + } + + /// Assert chain was removed and peers received faulty_chain penalty + fn assert_range_sync_chain_failed(&mut self) { + self.assert_no_chains_exist(); + assert!( + self.penalties.iter().any(|p| p.msg == "faulty_chain"), + "Expected faulty_chain penalty, got {:?}", + self.penalties + ); + } + + /// Assert range sync removed chains (e.g., all peers disconnected) + fn assert_range_sync_chain_removed(&mut self) { + self.assert_no_chains_exist(); + } + + /// Assert a new peer with a blacklisted root gets disconnected + fn assert_peer_blacklisted(&mut self, remote_info: SyncInfo) { + let new_peer = self.add_supernode_peer(remote_info); + self.pop_received_network_event(|ev| match ev { + NetworkMessage::GoodbyePeer { peer_id, .. } if *peer_id == new_peer => Some(()), + _ => None, + }) + .expect("Peer with blacklisted root should receive Goodbye"); } } -fn build_range_sync_block( - block: Arc>, - data_sidecars: &Option>, - chain: Arc>, -) -> RangeSyncBlock { - match data_sidecars { - Some(DataSidecars::Blobs(blobs)) => { - let block_data = AvailableBlockData::new_with_blobs(blobs.clone()); - RangeSyncBlock::new( - block, - block_data, - &chain.data_availability_checker, - chain.spec.clone(), - ) - .unwrap() - } - Some(DataSidecars::DataColumns(columns)) => { - let block_data = AvailableBlockData::new_with_data_columns( - columns - .iter() - .map(|c| c.as_data_column().clone()) - .collect::>(), - ); - RangeSyncBlock::new( - block, - block_data, - &chain.data_availability_checker, - chain.spec.clone(), - ) - .unwrap() - } - // Block has no data, expects zero columns - None => RangeSyncBlock::new( - block, - AvailableBlockData::NoData, - &chain.data_availability_checker, - chain.spec.clone(), - ) - .unwrap(), - } -} - -#[test] -fn head_chain_removed_while_finalized_syncing() { - // NOTE: this is a regression test. - // Added in PR https://github.com/sigp/lighthouse/pull/2821 - let mut rig = TestRig::default(); - - // Get a peer with an advanced head - let head_peer = rig.add_head_peer(); - rig.assert_state(RangeSyncType::Head); - - // Sync should have requested a batch, grab the request. - let _ = rig.find_blocks_by_range_request(filter().peer(head_peer)); - - // Now get a peer with an advanced finalized epoch. - let finalized_peer = rig.add_finalized_peer(); - rig.assert_state(RangeSyncType::Finalized); - - // Sync should have requested a batch, grab the request - let _ = rig.find_blocks_by_range_request(filter().peer(finalized_peer)); - - // Fail the head chain by disconnecting the peer. - rig.peer_disconnected(head_peer); - rig.assert_state(RangeSyncType::Finalized); -} +// ============================================================================================ +// Tests +// ============================================================================================ +/// Head sync: single peer slightly ahead → download batches → all blocks ingested. #[tokio::test] -async fn state_update_while_purging() { - // NOTE: this is a regression test. - // Added in PR https://github.com/sigp/lighthouse/pull/2827 - let mut rig = TestRig::with_custody_type(NodeCustodyType::SemiSupernode); - - // Create blocks on a separate harness - // SemiSupernode ensures enough columns are stored for sampling + custody RPC block validation - let mut rig_2 = TestRig::with_custody_type(NodeCustodyType::SemiSupernode); - // Need to create blocks that can be inserted into the fork-choice and fit the "known - // conditions" below. - let head_peer_block = rig_2.create_canonical_block().await; - let head_peer_root = head_peer_block.0.canonical_root(); - let finalized_peer_block = rig_2.create_canonical_block().await; - let finalized_peer_root = finalized_peer_block.0.canonical_root(); - - // Get a peer with an advanced head - let head_peer = rig.add_head_peer_with_root(head_peer_root); - rig.assert_state(RangeSyncType::Head); - - // Sync should have requested a batch, grab the request. - let _ = rig.find_blocks_by_range_request(filter().peer(head_peer)); - - // Now get a peer with an advanced finalized epoch. - let finalized_peer = rig.add_finalized_peer_with_root(finalized_peer_root); - rig.assert_state(RangeSyncType::Finalized); - - // Sync should have requested a batch, grab the request - let _ = rig.find_blocks_by_range_request(filter().peer(finalized_peer)); - - // Now the chain knows both chains target roots. - rig.remember_block(head_peer_block).await; - rig.remember_block(finalized_peer_block).await; - - // Add an additional peer to the second chain to make range update it's status - rig.add_finalized_peer(); -} - -#[test] -fn pause_and_resume_on_ee_offline() { - let mut rig = TestRig::default(); - - // add some peers - let peer1 = rig.add_head_peer(); - // make the ee offline - rig.update_execution_engine_state(EngineState::Offline); - // send the response to the request - rig.find_and_complete_blocks_by_range_request(filter().peer(peer1).epoch(0)); - // the beacon processor shouldn't have received any work - rig.assert_empty_processor(); - - // while the ee is offline, more peers might arrive. Add a new finalized peer. - let _peer2 = rig.add_finalized_peer(); - - // send the response to the request - // Don't filter requests and the columns requests may be sent to peer1 or peer2 - // We need to filter by epoch, because the previous batch eagerly sent requests for the next - // epoch for the other batch. So we can either filter by epoch of by sync type. - rig.find_and_complete_blocks_by_range_request(filter().epoch(0)); - // the beacon processor shouldn't have received any work - rig.assert_empty_processor(); - // make the beacon processor available again. - // update_execution_engine_state implicitly calls resume - // now resume range, we should have two processing requests in the beacon processor. - rig.update_execution_engine_state(EngineState::Online); - - // The head chain and finalized chain (2) should be in the processing queue - rig.assert_chain_segments(2); -} - -/// To attempt to finalize the peer's status finalized checkpoint we synced to its finalized epoch + -/// 2 epochs + 1 slot. -const EXTRA_SYNCED_EPOCHS: u64 = 2 + 1; - -#[test] -fn finalized_sync_enough_global_custody_peers_few_chain_peers() { - // Run for all forks +async fn head_sync_completes() { let mut r = TestRig::default(); - - let advanced_epochs: u64 = 2; - let remote_info = r.finalized_remote_info_advanced_by(advanced_epochs.into()); - - // Generate enough peers and supernodes to cover all custody columns - let peer_count = 100; - r.add_fullnode_peers(remote_info.clone(), peer_count); - r.add_supernode_peer(remote_info); - r.assert_state(RangeSyncType::Finalized); - - let last_epoch = advanced_epochs + EXTRA_SYNCED_EPOCHS; - r.complete_and_process_range_sync_until(last_epoch, filter()); + r.setup_head_sync().await; + r.simulate(SimulateConfig::happy_path()).await; + r.assert_head_sync_completed(); + r.assert_head_slot(SLOTS_PER_EPOCH as u64); } -#[test] -fn finalized_sync_not_enough_custody_peers_on_start() { +/// Peers with advanced finalized AND head beyond finalized. Finalized sync completes first, +/// then head chains are created from awaiting_head_peers to sync the remaining gap. +#[tokio::test] +async fn finalized_to_head_transition() { + let mut r = TestRig::default(); + r.setup_finalized_and_head_sync().await; + r.simulate(SimulateConfig::happy_path()).await; + r.assert_range_sync_completed(); + r.assert_head_slot(7 * SLOTS_PER_EPOCH as u64); +} + +/// Finalized sync happy path: all batches download and process, head advances to target, +/// finalized epoch advances past genesis. +#[tokio::test] +async fn finalized_sync_completes() { + let mut r = TestRig::default(); + r.setup_finalized_sync().await; + r.simulate(SimulateConfig::happy_path()).await; + r.assert_range_sync_completed(); + r.assert_head_slot(5 * SLOTS_PER_EPOCH as u64); +} + +/// First BlocksByRange request gets an RPC error. Batch retries from another peer, +/// sync completes with no penalties (RPC errors are not penalized). +#[tokio::test] +async fn batch_rpc_error_retries() { + let mut r = TestRig::default(); + r.setup_finalized_sync().await; + r.simulate(SimulateConfig::happy_path().return_rpc_error(RPCError::UnsupportedProtocol)) + .await; + r.assert_range_sync_completed(); +} + +/// Peer returns zero blocks for a BlocksByRange request. Batch retries, sync completes. +#[tokio::test] +async fn batch_peer_returns_empty_then_succeeds() { + let mut r = TestRig::default(); + r.setup_finalized_sync().await; + r.simulate(SimulateConfig::happy_path().with_no_range_blocks_n_times(1)) + .await; + r.assert_successful_range_sync(); +} + +/// Peer returns zero columns for a DataColumnsByRange request. Batch retries, sync completes. +/// Only exercises column logic on fulu+. +#[tokio::test] +async fn batch_peer_returns_no_columns_then_succeeds() { + let mut r = TestRig::default(); + r.setup_finalized_sync().await; + r.simulate(SimulateConfig::happy_path().with_no_range_columns_n_times(1)) + .await; + r.assert_successful_range_sync(); +} + +/// Peer returns columns with indices it wasn't asked for → UnrequestedIndex verify error. +/// Batch retries from another peer, sync completes. +#[tokio::test] +async fn batch_peer_returns_wrong_column_indices_then_succeeds() { + let mut r = TestRig::default(); + r.setup_finalized_sync().await; + r.simulate(SimulateConfig::happy_path().with_wrong_range_column_indices_n_times(1)) + .await; + r.assert_successful_range_sync(); +} + +/// Peer returns columns from a slot outside the requested range → UnrequestedSlot verify error. +/// Batch retries from another peer, sync completes. +#[tokio::test] +async fn batch_peer_returns_wrong_column_slots_then_succeeds() { + let mut r = TestRig::default(); + r.setup_finalized_sync().await; + r.simulate(SimulateConfig::happy_path().with_wrong_range_column_slots_n_times(1)) + .await; + r.assert_successful_range_sync(); +} + +/// PeerDAS: peer returns only half the requested columns. Block-sidecar coupling detects +/// missing columns → CouplingError::DataColumnPeerFailure → retry_partial_batch from other peers. +#[tokio::test] +async fn batch_peer_returns_partial_columns_then_succeeds() { let mut r = TestRig::default(); - // Only run post-PeerDAS if !r.fork_name.fulu_enabled() { return; } - - let advanced_epochs: u64 = 2; - let remote_info = r.finalized_remote_info_advanced_by(advanced_epochs.into()); - - // Unikely that the single peer we added has enough columns for us. Tests are deterministic and - // this error should never be hit - r.add_fullnode_peer(remote_info.clone()); - r.assert_state(RangeSyncType::Finalized); - - // Because we don't have enough peers on all columns we haven't sent any request. - // NOTE: There's a small chance that this single peer happens to custody exactly the set we - // expect, in that case the test will fail. Find a way to make the test deterministic. - r.assert_empty_network(); - - // Generate enough peers and supernodes to cover all custody columns - let peer_count = 100; - r.add_fullnode_peers(remote_info.clone(), peer_count); - r.add_supernode_peer(remote_info); - - let last_epoch = advanced_epochs + EXTRA_SYNCED_EPOCHS; - r.complete_and_process_range_sync_until(last_epoch, filter()); + r.setup_finalized_sync().await; + r.simulate(SimulateConfig::happy_path().with_partial_range_columns_n_times(1)) + .await; + r.assert_successful_range_sync(); +} + +/// Batch processing returns NonFaultyFailure (e.g. transient error). Batch goes back to +/// AwaitingDownload, retries without penalty, sync completes. +#[tokio::test] +async fn batch_non_faulty_failure_retries() { + let mut r = TestRig::default(); + r.setup_finalized_sync().await; + r.simulate(SimulateConfig::happy_path().with_range_non_faulty_failures(1)) + .await; + r.assert_range_sync_completed(); +} + +/// Batch processing returns FaultyFailure once. Peer penalized with "faulty_batch", +/// batch redownloaded from a different peer, sync completes. +#[tokio::test] +async fn batch_faulty_failure_redownloads() { + let mut r = TestRig::default(); + r.setup_finalized_sync().await; + r.simulate(SimulateConfig::happy_path().with_range_faulty_failures(1)) + .await; + r.assert_successful_range_sync(); + r.assert_penalties_of_type("faulty_batch"); +} + +/// Batch processing fails MAX_BATCH_PROCESSING_ATTEMPTS (3) times with FaultyFailure. +/// Chain removed, all peers penalized with "faulty_chain". +#[tokio::test] +async fn batch_max_failures_removes_chain() { + let mut r = TestRig::default(); + r.setup_finalized_sync().await; + r.simulate(SimulateConfig::happy_path().with_range_faulty_failures(3)) + .await; + r.assert_range_sync_chain_failed(); +} + +/// Chain fails via max faulty retries → finalized root added to failed_chains LRU. +/// A new peer advertising the same finalized root gets disconnected with GoodbyeReason. +#[tokio::test] +async fn failed_chain_blacklisted() { + let mut r = TestRig::default(); + let remote_info = r.setup_finalized_sync().await; + r.simulate(SimulateConfig::happy_path().with_range_faulty_failures(3)) + .await; + r.assert_range_sync_chain_failed(); + r.assert_peer_blacklisted(remote_info); +} + +/// All peers disconnect before any request is fulfilled → chain removed (EmptyPeerPool). +#[tokio::test] +async fn all_peers_disconnect_removes_chain() { + let mut r = TestRig::default(); + r.setup_finalized_sync().await; + r.simulate(SimulateConfig::happy_path().with_disconnect_after_range_requests(0)) + .await; + r.assert_range_sync_chain_removed(); +} + +/// Peers disconnect after 1 request is served. Remaining in-flight responses arrive +/// for a chain that no longer exists — verified as a no-op (no crash). +#[tokio::test] +async fn late_response_for_removed_chain() { + let mut r = TestRig::default(); + r.setup_finalized_sync().await; + r.simulate(SimulateConfig::happy_path().with_disconnect_after_range_requests(1)) + .await; + r.assert_range_sync_chain_removed(); +} + +/// Execution engine goes offline at sync start. Batch responses complete but processing +/// is paused. After 2 responses, EE comes back online, queued batches process, sync completes. +#[tokio::test] +async fn ee_offline_then_online_resumes_sync() { + let mut r = TestRig::default(); + r.setup_finalized_sync().await; + r.simulate(SimulateConfig::happy_path().with_ee_offline_for_n_range_responses(2)) + .await; + r.assert_range_sync_completed(); +} + +/// Local node already has blocks up to epoch 3. Finalized sync starts targeting epoch 6. +/// The chain uses optimistic start: downloads a batch at the local head epoch concurrently +/// with sequential processing from the start. All blocks ingested. +#[tokio::test] +async fn finalized_sync_with_local_head_partial() { + let mut r = TestRig::default(); + r.setup_finalized_sync_with_local_head(3).await; + r.simulate(SimulateConfig::happy_path()).await; + r.assert_range_sync_completed(); +} + +/// Local node has all blocks except the last one. Finalized sync only needs to fill the +/// final gap. Tests optimistic start where local head is near the target. +#[tokio::test] +async fn finalized_sync_with_local_head_near_target() { + let mut r = TestRig::default(); + let target_epochs = 5; + let local_slots = (target_epochs * SLOTS_PER_EPOCH) - 1; // all blocks except last + r.build_chain(target_epochs * SLOTS_PER_EPOCH).await; + r.import_blocks_up_to_slot(local_slots as u64).await; + let remote_info = r.finalized_remote_info_advanced_by((target_epochs as u64).into()); + r.add_fullnode_peers(remote_info.clone(), 100); + r.add_supernode_peer(remote_info); + r.assert_state(RangeSyncType::Finalized); + r.simulate(SimulateConfig::happy_path()).await; + r.assert_range_sync_completed(); + r.assert_head_slot((target_epochs * SLOTS_PER_EPOCH) as u64); +} + +/// PeerDAS only: single fullnode peer doesn't cover all custody columns → no requests sent. +/// Once enough fullnodes + a supernode arrive, sync proceeds and completes. +#[tokio::test] +async fn not_enough_custody_peers_then_peers_arrive() { + let mut r = TestRig::default(); + if !r.fork_name.fulu_enabled() { + return; + } + let remote_info = r.setup_finalized_sync_insufficient_peers().await; + r.assert_empty_network(); + r.add_remaining_finalized_peers(remote_info); + r.simulate(SimulateConfig::happy_path()).await; + r.assert_range_sync_completed(); } diff --git a/scripts/range-sync-coverage.sh b/scripts/range-sync-coverage.sh new file mode 100755 index 0000000000..df438c0c7f --- /dev/null +++ b/scripts/range-sync-coverage.sh @@ -0,0 +1,136 @@ +#!/bin/bash +# Aggregate range sync test coverage across all forks +# Usage: ./scripts/range-sync-coverage.sh [--html] +set -e + +REPO_ROOT="$(cd "$(dirname "$0")/.." && pwd)" +cd "$REPO_ROOT" + +TARGET_DIR="${CARGO_TARGET_DIR:-/mnt/ssd/builds/lighthouse-range-sync-tests}" +FORKS=(base altair bellatrix capella deneb electra fulu) +LCOV_DIR="/tmp/range-cov-forks" +MERGED="/tmp/range-cov-merged.lcov" + +rm -rf "$LCOV_DIR" +mkdir -p "$LCOV_DIR" + +echo "=== Running coverage for each fork ===" +for fork in "${FORKS[@]}"; do + echo "--- $fork ---" + CARGO_TARGET_DIR="$TARGET_DIR" FORK_NAME="$fork" \ + cargo llvm-cov --features "network/fake_crypto,network/fork_from_env" \ + -p network --lib --lcov --output-path "$LCOV_DIR/$fork.lcov" \ + -- "sync::tests::range" 2>&1 | grep -E "test result|running" +done + +echo "" +echo "=== Merging lcov files ===" + +# Merge all lcov files: for each source file, take max hit count per line +python3 - "$LCOV_DIR" "$MERGED" << 'PYEOF' +import sys, os, glob +from collections import defaultdict + +lcov_dir = sys.argv[1] +output = sys.argv[2] + +# Parse all lcov files: file -> line -> max hits +coverage = defaultdict(lambda: defaultdict(int)) +fn_coverage = defaultdict(lambda: defaultdict(int)) +current_sf = None + +for lcov_file in sorted(glob.glob(os.path.join(lcov_dir, "*.lcov"))): + with open(lcov_file) as f: + for line in f: + line = line.strip() + if line.startswith("SF:"): + current_sf = line[3:] + elif line.startswith("DA:") and current_sf: + parts = line[3:].split(",") + lineno = int(parts[0]) + hits = int(parts[1]) + coverage[current_sf][lineno] = max(coverage[current_sf][lineno], hits) + elif line.startswith("FNDA:") and current_sf: + parts = line[5:].split(",", 1) + hits = int(parts[0]) + fn_name = parts[1] + fn_coverage[current_sf][fn_name] = max(fn_coverage[current_sf][fn_name], hits) + +# Write merged lcov +with open(output, "w") as f: + for sf in sorted(coverage.keys()): + f.write(f"SF:{sf}\n") + for fn_name, hits in sorted(fn_coverage.get(sf, {}).items()): + f.write(f"FNDA:{hits},{fn_name}\n") + for lineno in sorted(coverage[sf].keys()): + f.write(f"DA:{lineno},{coverage[sf][lineno]}\n") + total = len(coverage[sf]) + covered = sum(1 for h in coverage[sf].values() if h > 0) + f.write(f"LH:{covered}\n") + f.write(f"LF:{total}\n") + f.write("end_of_record\n") + +print(f"Merged {len(glob.glob(os.path.join(lcov_dir, '*.lcov')))} lcov files -> {output}") +PYEOF + +echo "" +echo "=== Range sync coverage (merged across all forks) ===" + +# Extract and display range sync files +python3 - "$MERGED" << 'PYEOF' +import sys +from collections import defaultdict + +current_sf = None +files = {} # short_name -> (total_lines, covered_lines) +lines = defaultdict(dict) + +with open(sys.argv[1]) as f: + for line in f: + line = line.strip() + if line.startswith("SF:"): + current_sf = line[3:] + elif line.startswith("DA:") and current_sf: + parts = line[3:].split(",") + lineno, hits = int(parts[0]), int(parts[1]) + lines[current_sf][lineno] = hits + +# Filter to range sync files +targets = [ + "range_sync/chain.rs", + "range_sync/chain_collection.rs", + "range_sync/range.rs", + "requests/blocks_by_range.rs", + "requests/blobs_by_range.rs", + "requests/data_columns_by_range.rs", +] + +print(f"{'File':<45} {'Lines':>6} {'Covered':>8} {'Missed':>7} {'Coverage':>9}") +print("-" * 80) + +total_all = 0 +covered_all = 0 + +for sf in sorted(lines.keys()): + short = sf.split("sync/")[-1] if "sync/" in sf else sf.split("/")[-1] + if not any(t in sf for t in targets): + continue + total = len(lines[sf]) + covered = sum(1 for h in lines[sf].values() if h > 0) + missed = total - covered + pct = covered / total * 100 if total > 0 else 0 + total_all += total + covered_all += covered + print(f"{short:<45} {total:>6} {covered:>8} {missed:>7} {pct:>8.1f}%") + +print("-" * 80) +pct_all = covered_all / total_all * 100 if total_all > 0 else 0 +print(f"{'TOTAL':<45} {total_all:>6} {covered_all:>8} {total_all - covered_all:>7} {pct_all:>8.1f}%") +PYEOF + +if [ "$1" = "--html" ]; then + echo "" + echo "=== Generating HTML report ===" + genhtml "$MERGED" -o /tmp/range-cov-html --ignore-errors source 2>/dev/null + echo "HTML report: /tmp/range-cov-html/index.html" +fi