diff --git a/beacon_node/network/src/sync/manager.rs b/beacon_node/network/src/sync/manager.rs index dfafc88405..f21576372d 100644 --- a/beacon_node/network/src/sync/manager.rs +++ b/beacon_node/network/src/sync/manager.rs @@ -50,6 +50,7 @@ use crate::sync::block_lookups::{ BlobRequestState, BlockComponent, BlockRequestState, CustodyRequestState, DownloadResult, }; use crate::sync::network_context::PeerGroup; +use crate::sync::range_sync::BATCH_BUFFER_SIZE; use beacon_chain::block_verification_types::AsBlock; use beacon_chain::validator_monitor::timestamp_now; use beacon_chain::{ @@ -280,6 +281,7 @@ pub fn spawn( sync_recv, SamplingConfig::Default, fork_context, + BATCH_BUFFER_SIZE, ); // spawn the sync manager thread @@ -302,6 +304,7 @@ impl SyncManager { sync_recv: mpsc::UnboundedReceiver>, sampling_config: SamplingConfig, fork_context: Arc, + batch_buffer_size: usize, ) -> Self { let network_globals = beacon_processor.network_globals.clone(); Self { @@ -313,7 +316,7 @@ impl SyncManager { beacon_chain.clone(), fork_context.clone(), ), - range_sync: RangeSync::new(beacon_chain.clone()), + range_sync: RangeSync::new(beacon_chain.clone(), batch_buffer_size), backfill_sync: BackFillSync::new(beacon_chain.clone(), network_globals), block_lookups: BlockLookups::new(), notified_unknown_roots: LRUTimeCache::new(Duration::from_secs( diff --git a/beacon_node/network/src/sync/range_sync/chain.rs b/beacon_node/network/src/sync/range_sync/chain.rs index 76721ec5aa..44b2b1937d 100644 --- a/beacon_node/network/src/sync/range_sync/chain.rs +++ b/beacon_node/network/src/sync/range_sync/chain.rs @@ -26,7 +26,7 @@ use types::{Epoch, EthSpec, Hash256, Slot}; pub const EPOCHS_PER_BATCH: u64 = 1; /// The maximum number of batches to queue before requesting more. 
-const BATCH_BUFFER_SIZE: u8 = 5; +pub const BATCH_BUFFER_SIZE: usize = 5; /// A return type for functions that act on a `Chain` which informs the caller whether the chain /// has been completed and should be removed or to be kept if further processing is @@ -119,6 +119,9 @@ pub struct SyncingChain { /// The current processing batch, if any. current_processing_batch: Option, + + /// The maximum number of batches to queue before requesting more. + batch_buffer_size: usize, } #[derive(PartialEq, Debug)] @@ -147,6 +150,7 @@ impl SyncingChain { target_head_root: Hash256, peer_id: PeerId, chain_type: SyncingChainType, + batch_buffer_size: usize, ) -> Self { SyncingChain { id, @@ -163,6 +167,7 @@ impl SyncingChain { attempted_optimistic_starts: HashSet::default(), state: ChainSyncingState::Stopped, current_processing_batch: None, + batch_buffer_size, } } @@ -1075,7 +1080,7 @@ impl SyncingChain { .iter() .filter(|&(_epoch, batch)| in_buffer(batch)) .count() - > BATCH_BUFFER_SIZE as usize + >= self.batch_buffer_size as usize { return None; } @@ -1105,28 +1110,28 @@ impl SyncingChain { /// batch states. See [BatchState::visualize] for symbol definitions. 
#[instrument(parent = None,level = "info", fields(chain = self.id , service = "range_sync"), skip_all)] fn visualize_batch_state(&self) -> String { - let mut visualization_string = String::with_capacity((BATCH_BUFFER_SIZE * 3) as usize); + let mut visualization_string = String::with_capacity((self.batch_buffer_size * 3) as usize); // Start of the block visualization_string.push('['); - for mut batch_index in 0..BATCH_BUFFER_SIZE { + for mut batch_index in 0..self.batch_buffer_size { if let Some(batch) = self .batches .get(&(self.processing_target + batch_index as u64 * EPOCHS_PER_BATCH)) { visualization_string.push(batch.visualize()); - if batch_index != BATCH_BUFFER_SIZE { + if batch_index != self.batch_buffer_size { // Add a comma in between elements visualization_string.push(','); } } else { // No batch exists, it is on our list to be downloaded // Fill in the rest of the gaps - while batch_index < BATCH_BUFFER_SIZE { + while batch_index < self.batch_buffer_size { visualization_string.push('E'); // Add a comma between the empty batches - if batch_index < BATCH_BUFFER_SIZE.saturating_sub(1) { + if batch_index < self.batch_buffer_size.saturating_sub(1) { visualization_string.push(',') } batch_index += 1; diff --git a/beacon_node/network/src/sync/range_sync/chain_collection.rs b/beacon_node/network/src/sync/range_sync/chain_collection.rs index 454f7c02d1..44ce43d56a 100644 --- a/beacon_node/network/src/sync/range_sync/chain_collection.rs +++ b/beacon_node/network/src/sync/range_sync/chain_collection.rs @@ -51,6 +51,8 @@ pub struct ChainCollection { head_chains: FnvHashMap>, /// The current sync state of the process. state: RangeSyncState, + /// The maximum number of batches to queue before requesting more. 
+ batch_buffer_size: usize, } impl ChainCollection { @@ -61,12 +63,13 @@ impl ChainCollection { .chain(self.head_chains.values()) } - pub fn new(beacon_chain: Arc>) -> Self { + pub fn new(beacon_chain: Arc>, batch_buffer_size: usize) -> Self { ChainCollection { beacon_chain, finalized_chains: FnvHashMap::default(), head_chains: FnvHashMap::default(), state: RangeSyncState::Idle, + batch_buffer_size, } } @@ -504,6 +507,7 @@ impl ChainCollection { target_head_root, peer, sync_type.into(), + self.batch_buffer_size, ); debug!( diff --git a/beacon_node/network/src/sync/range_sync/mod.rs b/beacon_node/network/src/sync/range_sync/mod.rs index 225b536d1d..67479f9a1e 100644 --- a/beacon_node/network/src/sync/range_sync/mod.rs +++ b/beacon_node/network/src/sync/range_sync/mod.rs @@ -10,6 +10,6 @@ mod sync_type; pub use batch::{ BatchConfig, BatchInfo, BatchOperationOutcome, BatchPeers, BatchProcessingResult, BatchState, }; -pub use chain::{BatchId, ChainId, EPOCHS_PER_BATCH}; +pub use chain::{BatchId, ChainId, BATCH_BUFFER_SIZE, EPOCHS_PER_BATCH}; pub use range::RangeSync; pub use sync_type::RangeSyncType; diff --git a/beacon_node/network/src/sync/range_sync/range.rs b/beacon_node/network/src/sync/range_sync/range.rs index 62d1825268..8f52fa7a49 100644 --- a/beacon_node/network/src/sync/range_sync/range.rs +++ b/beacon_node/network/src/sync/range_sync/range.rs @@ -90,10 +90,10 @@ where name = "range_sync", skip_all )] - pub fn new(beacon_chain: Arc>) -> Self { + pub fn new(beacon_chain: Arc>, batch_buffer_size: usize) -> Self { RangeSync { beacon_chain: beacon_chain.clone(), - chains: ChainCollection::new(beacon_chain), + chains: ChainCollection::new(beacon_chain, batch_buffer_size), failed_chains: LRUTimeCache::new(std::time::Duration::from_secs( FAILED_CHAINS_EXPIRY_SECONDS, )), diff --git a/beacon_node/network/src/sync/tests/lookups.rs b/beacon_node/network/src/sync/tests/lookups.rs index f26a467f27..8477b46958 100644 --- a/beacon_node/network/src/sync/tests/lookups.rs 
+++ b/beacon_node/network/src/sync/tests/lookups.rs @@ -2,6 +2,7 @@ use crate::network_beacon_processor::NetworkBeaconProcessor; use crate::sync::block_lookups::{ BlockLookupSummary, PARENT_DEPTH_TOLERANCE, SINGLE_BLOCK_LOOKUP_MAX_ATTEMPTS, }; +use crate::sync::range_sync::BATCH_BUFFER_SIZE; use crate::sync::{ manager::{BlockProcessType, BlockProcessingResult, SyncManager}, peer_sampling::SamplingConfig, @@ -59,16 +60,29 @@ pub enum PeersConfig { SupernodeOnly, } +pub struct TestOptions { + /// If the node created by this test harness is a supernode + pub is_supernode: bool, + /// The maximum number of batches to queue before requesting more. + pub batch_buffer_size: usize, +} + impl TestRig { pub fn test_setup() -> Self { - Self::test_setup_with_options(false) + Self::test_setup_with_options(TestOptions { + is_supernode: false, + batch_buffer_size: BATCH_BUFFER_SIZE, + }) } pub fn test_setup_as_supernode() -> Self { - Self::test_setup_with_options(true) + Self::test_setup_with_options(TestOptions { + is_supernode: true, + batch_buffer_size: BATCH_BUFFER_SIZE, + }) } - fn test_setup_with_options(is_supernode: bool) -> Self { + pub fn test_setup_with_options(options: TestOptions) -> Self { // Use `fork_from_env` logic to set correct fork epochs let spec = test_spec::(); @@ -101,7 +115,7 @@ impl TestRig { Vec::new(), network_config, chain.spec.clone(), - is_supernode, + options.is_supernode, )); let (beacon_processor, beacon_processor_rx) = NetworkBeaconProcessor::null_for_testing( globals, @@ -143,6 +157,7 @@ impl TestRig { required_successes: vec![SAMPLING_REQUIRED_SUCCESSES], }, fork_context, + options.batch_buffer_size, ), harness, fork_name, diff --git a/beacon_node/network/src/sync/tests/range.rs b/beacon_node/network/src/sync/tests/range.rs index 642f92ee66..382965ec97 100644 --- a/beacon_node/network/src/sync/tests/range.rs +++ b/beacon_node/network/src/sync/tests/range.rs @@ -4,12 +4,12 @@ use crate::status::ToStatusMessage; use 
crate::sync::manager::SLOT_IMPORT_TOLERANCE; use crate::sync::network_context::{BlockComponentsByRangeRequestStep, RangeRequestId}; use crate::sync::range_sync::{BatchId, BatchState, RangeSyncType}; +use crate::sync::tests::lookups::TestOptions; use crate::sync::{ChainId, SyncMessage}; use beacon_chain::data_column_verification::CustodyDataColumn; -use beacon_chain::test_utils::{test_spec, AttestationStrategy, BlockStrategy}; +use beacon_chain::test_utils::{AttestationStrategy, BlockStrategy}; use beacon_chain::{block_verification_types::RpcBlock, EngineState, NotifyExecutionLayer}; use beacon_processor::WorkType; -use lighthouse_network::discovery::{peer_id_to_node_id, CombinedKey}; use lighthouse_network::rpc::methods::{ BlobsByRangeRequest, DataColumnsByRangeRequest, OldBlocksByRangeRequest, }; @@ -19,16 +19,13 @@ use lighthouse_network::service::api_types::{ DataColumnsByRangeRequestId, SyncRequestId, }; use lighthouse_network::types::SyncState; -use lighthouse_network::{Enr, EnrExt, PeerId, SyncInfo}; -use rand::SeedableRng; -use rand_chacha::ChaCha20Rng; +use lighthouse_network::{PeerId, SyncInfo}; use std::collections::HashSet; use std::time::Duration; -use types::data_column_custody_group::compute_subnets_for_node; use types::{ - BeaconBlock, BlobSidecarList, BlockImportSource, ColumnIndex, DataColumnSidecar, - DataColumnSubnetId, Epoch, EthSpec, Hash256, KzgCommitment, MinimalEthSpec as E, Signature, - SignedBeaconBlock, SignedBeaconBlockHash, Slot, VariableList, + BeaconBlock, BlobSidecarList, BlockImportSource, ColumnIndex, DataColumnSidecar, Epoch, + EthSpec, Hash256, KzgCommitment, MinimalEthSpec as E, Signature, SignedBeaconBlock, + SignedBeaconBlockHash, Slot, VariableList, }; const D: Duration = Duration::new(0, 0); @@ -93,6 +90,12 @@ struct RequestFilter { column_index: Option, } +const NO_FILTER: RequestFilter = RequestFilter { + peer: None, + epoch: None, + column_index: None, +}; + impl RequestFilter { fn peer(mut self, peer: PeerId) -> Self { 
self.peer = Some(peer); @@ -1094,7 +1097,7 @@ fn finalized_sync_not_enough_custody_peers_on_start(config: Config) { // The SyncingChain has a single peer, so it can issue blocks_by_range requests. However, it // doesn't have enough peers to cover all columns - r.progress_until_no_events(filter(), complete()); + r.progress_until_no_events(NO_FILTER, complete()); r.expect_no_active_rpc_requests(); // Here we have a batch with partially completed block_components_by_range requests. The batch @@ -1108,7 +1111,7 @@ fn finalized_sync_not_enough_custody_peers_on_start(config: Config) { // We still need to add enough peers to trigger batch downloads with idle peers. Same issue as // the test above. - r.progress_until_no_events(filter(), complete()); + r.progress_until_no_events(NO_FILTER, complete()); r.expect_no_active_rpc_requests(); r.expect_no_active_block_components_by_range_requests(); // TOOD(das): For now this tests don't complete sync. We can't track beacon processor Work @@ -1134,7 +1137,7 @@ fn finalized_sync_single_custody_peer_failure() { // Progress all blocks_by_range and columns_by_range requests but respond empty for a single // column index r.progress_until_no_events( - filter(), + NO_FILTER, complete().custody_failure_at_index(column_index_to_fail), ); r.expect_penalties("custody_failure"); @@ -1162,7 +1165,13 @@ fn finalized_sync_single_custody_peer_failure() { #[test] fn finalized_sync_permanent_custody_peer_failure() { - let mut r = TestRig::test_setup(); + let mut r = TestRig::test_setup_with_options(TestOptions { + is_supernode: false, + // The default buffer size is 5, but we want to manually complete only the batch for epoch + // 0. By setting this buffer to 1 sync will create a single batch until it completes. We can + // do better assertions of state assuming there's only one batch and logs are cleaner. 
+ batch_buffer_size: 1, + }); // Only run post-PeerDAS if !r.fork_name.fulu_enabled() { return; } @@ -1192,7 +1201,7 @@ // Some peer had a costudy failure at `column_index` so sync should do a single extra request // for that index and epoch. We want to make sure that the request goes to different peer - // than the attempts before. + // than the peers attempted before. let reqs = r.find_data_by_range_request(filter().epoch(0).column_index(column_index_to_fail)); let req_peer = reqs.peer(); @@ -1212,35 +1221,3 @@ // custody_by_range request is still active waiting for a new peer to connect r.expect_active_block_components_by_range_request_on_custody_step(); } - -#[test] -#[ignore] -fn mine_peerids() { - let spec = test_spec::(); - let mut rng = ChaCha20Rng::from_seed([0u8; 32]); - - let expected_subnets = (0..3) - .map(|i| DataColumnSubnetId::new(i as u64)) - .collect::>(); - - for i in 0..usize::MAX { - let key: CombinedKey = k256::ecdsa::SigningKey::random(&mut rng).into(); - let enr = Enr::builder().build(&key).unwrap(); - let peer_id = enr.peer_id(); - // Use default custody groups count - let node_id = peer_id_to_node_id(&peer_id).expect("convert peer_id to node_id"); - let subnets = compute_subnets_for_node(node_id.raw(), spec.custody_requirement, &spec) - .expect("should compute custody subnets"); - if expected_subnets == subnets { - panic!("{:?}", subnets); - } else { - let matches = expected_subnets - .iter() - .filter(|index| subnets.contains(index)) - .count(); - if matches > 0 { - println!("{i} {:?}", matches); - } - } - } -}