diff --git a/beacon_node/http_api/src/lib.rs b/beacon_node/http_api/src/lib.rs index 368faae612..be11a075f4 100644 --- a/beacon_node/http_api/src/lib.rs +++ b/beacon_node/http_api/src/lib.rs @@ -2922,7 +2922,7 @@ pub fn serve( p2p_addresses, discovery_addresses, metadata: from_meta_data::( - &network_globals.local_metadata, + &network_globals.local_metadata(), &chain.spec, ), })) @@ -4816,11 +4816,7 @@ pub fn serve( Ok(http_server) } -fn from_meta_data( - meta_data: &RwLock>, - spec: &ChainSpec, -) -> api_types::MetaData { - let meta_data = meta_data.read(); +fn from_meta_data(meta_data: &MetaData, spec: &ChainSpec) -> api_types::MetaData { let format_hex = |bytes: &[u8]| format!("0x{}", hex::encode(bytes)); let seq_number = *meta_data.seq_number(); diff --git a/beacon_node/http_api/src/publish_blocks.rs b/beacon_node/http_api/src/publish_blocks.rs index a5cd94536d..bd4fae750c 100644 --- a/beacon_node/http_api/src/publish_blocks.rs +++ b/beacon_node/http_api/src/publish_blocks.rs @@ -137,8 +137,10 @@ pub async fn publish_block>( spawn_build_data_sidecar_task(chain.clone(), block.clone(), unverified_blobs)?; // Gossip verify the block and blobs/data columns separately. - let gossip_verified_block_result = unverified_block - .into_gossip_verified_block(&chain, network_globals.custody_columns_count() as usize); + let gossip_verified_block_result = unverified_block.into_gossip_verified_block( + &chain, + network_globals.custody_columns_count(block.slot()) as usize, + ); let block_root = block_root.unwrap_or_else(|| { gossip_verified_block_result.as_ref().map_or_else( |_| block.canonical_root(), @@ -223,7 +225,7 @@ pub async fn publish_block>( publish_column_sidecars(network_tx, &gossip_verified_columns, &chain).map_err(|_| { warp_utils::reject::custom_server_error("unable to publish data column sidecars".into()) })?; - let sampling_columns_indices = &network_globals.sampling_columns; + let sampling_columns_indices = &network_globals.sampling_columns(block.slot()); let sampling_columns = gossip_verified_columns .into_iter() .flatten() diff --git a/beacon_node/lighthouse_network/src/service/mod.rs b/beacon_node/lighthouse_network/src/service/mod.rs index 3f0b5b96ef..0d5e700913 100644 --- a/beacon_node/lighthouse_network/src/service/mod.rs +++ b/beacon_node/lighthouse_network/src/service/mod.rs @@ -40,7 +40,8 @@ use std::sync::Arc; use std::time::Duration; use tracing::{debug, info, instrument, trace, warn}; use types::{ - consts::altair::SYNC_COMMITTEE_SUBNET_COUNT, EnrForkId, EthSpec, ForkContext, Slot, SubnetId, + consts::altair::SYNC_COMMITTEE_SUBNET_COUNT, EnrForkId, Epoch, EthSpec, ForkContext, Slot, + SubnetId, }; use types::{ChainSpec, ForkName}; use utils::{build_transport, strip_peer_id, Context as ServiceContext}; @@ -819,10 +820,18 @@ impl Network { } } + let fork_epoch = self + .fork_context + .spec + .fork_epoch(new_fork) + .unwrap_or(Epoch::new(0)); + // Subscribe to core topics for the new fork for kind in core_topics_to_subscribe::( new_fork, - &self.network_globals.as_topic_config(), + &self + .network_globals + .as_topic_config(fork_epoch.start_slot(E::slots_per_epoch())), &self.fork_context.spec, ) { let topic = GossipTopic::new(kind, GossipEncoding::default(), new_fork_digest); @@ -1350,20 +1359,14 @@ impl Network { .expect("Local discovery must have sync committee bitfield"); // write lock scope - let mut meta_data_w = self.network_globals.local_metadata.write(); + self.network_globals + .update_metadata_bitfields(local_attnets, local_syncnets); - *meta_data_w.seq_number_mut() += 1; - *meta_data_w.attnets_mut() = local_attnets; - if let Ok(syncnets) = meta_data_w.syncnets_mut() { - *syncnets = local_syncnets; - } - let seq_number = *meta_data_w.seq_number(); - let meta_data = meta_data_w.clone(); - - drop(meta_data_w); - self.eth2_rpc_mut().update_seq_number(seq_number); + let metadata = self.network_globals.local_metadata(); + self.eth2_rpc_mut() + .update_seq_number(*metadata.seq_number()); // Save the updated metadata to disk - utils::save_metadata_to_disk(&self.network_dir, meta_data); + utils::save_metadata_to_disk(&self.network_dir, metadata); } /// Sends a Ping request to the peer. @@ -1411,7 +1414,7 @@ impl Network { request_id: rpc::RequestId, peer_id: PeerId, ) { - let metadata = self.network_globals.local_metadata.read().clone(); + let metadata = self.network_globals.local_metadata(); // The encoder is responsible for sending the negotiated version of the metadata let event = RpcResponse::Success(RpcSuccessResponse::MetaData(Arc::new(metadata))); self.eth2_rpc_mut() diff --git a/beacon_node/lighthouse_network/src/types/globals.rs b/beacon_node/lighthouse_network/src/types/globals.rs index f41f60008e..1db01fb2f2 100644 --- a/beacon_node/lighthouse_network/src/types/globals.rs +++ b/beacon_node/lighthouse_network/src/types/globals.rs @@ -7,11 +7,10 @@ use crate::{Client, Enr, EnrExt, GossipTopic, Multiaddr, NetworkConfig, PeerId}; use parking_lot::RwLock; use std::collections::HashSet; use std::sync::Arc; -use tracing::error; use types::data_column_custody_group::{ compute_columns_for_custody_group, compute_subnets_from_custody_group, get_custody_groups, }; -use types::{ChainSpec, ColumnIndex, DataColumnSubnetId, EthSpec}; +use types::{BitVector, ChainSpec, ColumnIndex, DataColumnSubnetId, EthSpec, Slot}; pub struct NetworkGlobals { /// The current local ENR. @@ -23,7 +22,7 @@ pub struct NetworkGlobals { /// The collection of known peers. pub peers: RwLock>, // The local meta data of our node. - pub local_metadata: RwLock>, + local_metadata: RwLock>, /// The current gossipsub topic subscriptions. pub gossipsub_subscriptions: RwLock>, /// The current sync status of the node. @@ -31,16 +30,20 @@ pub struct NetworkGlobals { /// The current state of the backfill sync. pub backfill_state: RwLock, /// The computed sampling subnets and columns is stored to avoid re-computing. - pub sampling_subnets: HashSet, - pub sampling_columns: HashSet, - /// Constant custody group count (CGC) set at startup - custody_group_count: u64, + all_sampling_subnets: Vec, + all_sampling_columns: Vec, + /// Dynamic custody group count (CGC) + custody_group_count: RwLock, /// Network-related configuration. Immutable after initialization. pub config: Arc, /// Ethereum chain configuration. Immutable after initialization. pub spec: Arc, } +struct CustodyGroupCount { + value: u64, +} + impl NetworkGlobals { pub fn new( enr: Enr, @@ -52,39 +55,23 @@ impl NetworkGlobals { ) -> Self { let node_id = enr.node_id().raw(); - let custody_group_count = match local_metadata.custody_group_count() { - Ok(&cgc) if cgc <= spec.number_of_custody_groups => cgc, - _ => { - if spec.is_peer_das_scheduled() { - error!( - info = "falling back to default custody requirement", - "custody_group_count from metadata is either invalid or not set. This is a bug!" - ); - } - spec.custody_requirement - } - }; - // The below `expect` calls will panic on start up if the chain spec config values used // are invalid - let sampling_size = spec - .sampling_size(custody_group_count) - .expect("should compute node sampling size from valid chain spec"); - let custody_groups = get_custody_groups(node_id, sampling_size, &spec) + let custody_groups = get_custody_groups(node_id, spec.number_of_custody_groups, &spec) .expect("should compute node custody groups"); - let mut sampling_subnets = HashSet::new(); + let mut all_sampling_subnets = vec![]; for custody_index in &custody_groups { let subnets = compute_subnets_from_custody_group(*custody_index, &spec) .expect("should compute custody subnets for node"); - sampling_subnets.extend(subnets); + all_sampling_subnets.extend(subnets); } - let mut sampling_columns = HashSet::new(); + let mut all_sampling_columns = vec![]; for custody_index in &custody_groups { let columns = compute_columns_for_custody_group(*custody_index, &spec) .expect("should compute custody columns for node"); - sampling_columns.extend(columns); + all_sampling_columns.extend(columns); } NetworkGlobals { @@ -96,9 +83,9 @@ impl NetworkGlobals { gossipsub_subscriptions: RwLock::new(HashSet::new()), sync_state: RwLock::new(SyncState::Stalled), backfill_state: RwLock::new(BackFillState::Paused), - sampling_subnets, - sampling_columns, - custody_group_count, + all_sampling_subnets, + all_sampling_columns, + custody_group_count: RwLock::new(CustodyGroupCount { value: 0 }), config, spec, } @@ -106,6 +93,7 @@ impl NetworkGlobals { /// Returns the local ENR from the underlying Discv5 behaviour that external peers may connect /// to. + /// TODO: This contains duplicate metadata. Test who is consuming this method pub fn local_enr(&self) -> Enr { self.local_enr.read().clone() } @@ -115,24 +103,60 @@ impl NetworkGlobals { *self.peer_id.read() } + pub fn local_metadata(&self) -> MetaData { + todo!(); + } + /// Returns the list of `Multiaddr` that the underlying libp2p instance is listening on. pub fn listen_multiaddrs(&self) -> Vec { self.listen_multiaddrs.read().clone() } /// Returns true if this node is configured as a PeerDAS supernode - pub fn is_supernode(&self) -> bool { - self.custody_group_count == self.spec.number_of_custody_groups + pub fn is_supernode(&self, slot: Slot) -> bool { + self.custody_group_count(slot) == self.spec.number_of_custody_groups + } + + pub fn sampling_subnets(&self, slot: Slot) -> &[DataColumnSubnetId] { + let cgc = self.custody_group_count(slot) as usize; + // Returns as many elements as possible, can't panic as it's upper bounded by len + &self.all_sampling_subnets[..self.all_sampling_subnets.len().min(cgc)] + } + + pub fn sampling_columns(&self, slot: Slot) -> &[ColumnIndex] { + let cgc = self.custody_group_count(slot) as usize; + // Returns as many elements as possible, can't panic as it's upper bounded by len + &self.all_sampling_columns[..self.all_sampling_columns.len().min(cgc)] + } + + /// Returns the custody group count (CGC) + fn custody_group_count(&self, slot: Slot) -> u64 { + let cgc = self.custody_group_count.read().value; + todo!("CGC at slot {slot} {cgc}"); } /// Returns the count of custody columns this node must sample for block import - pub fn custody_columns_count(&self) -> u64 { + pub fn custody_columns_count(&self, slot: Slot) -> u64 { // This only panics if the chain spec contains invalid values self.spec - .sampling_size(self.custody_group_count) + .sampling_size(self.custody_group_count(slot)) .expect("should compute node sampling size from valid chain spec") } + pub fn update_metadata_bitfields( + &self, + local_attnets: BitVector, + local_syncnets: BitVector, + ) { + let mut meta_data_w = self.local_metadata.write(); + + *meta_data_w.seq_number_mut() += 1; + *meta_data_w.attnets_mut() = local_attnets; + if let Ok(syncnets) = meta_data_w.syncnets_mut() { + *syncnets = local_syncnets; + } + } + /// Returns the number of libp2p connected peers. pub fn connected_peers(&self) -> usize { self.peers.read().connected_peer_ids().count() @@ -195,12 +219,12 @@ impl NetworkGlobals { } /// Returns the TopicConfig to compute the set of Gossip topics for a given fork - pub fn as_topic_config(&self) -> TopicConfig { + pub fn as_topic_config(&self, slot: Slot) -> TopicConfig { TopicConfig { enable_light_client_server: self.config.enable_light_client_server, subscribe_all_subnets: self.config.subscribe_all_subnets, subscribe_all_data_column_subnets: self.config.subscribe_all_data_column_subnets, - sampling_subnets: &self.sampling_subnets, + sampling_subnets: self.sampling_subnets(slot), } } @@ -249,6 +273,7 @@ mod test { let subnet_sampling_size = spec.sampling_size(custody_group_count).unwrap(); let metadata = get_metadata(custody_group_count); let config = Arc::new(NetworkConfig::default()); + let slot = Slot::new(0); let globals = NetworkGlobals::::new_test_globals_with_metadata( vec![], @@ -257,7 +282,7 @@ mod test { Arc::new(spec), ); assert_eq!( - globals.sampling_subnets.len(), + globals.sampling_subnets(slot).len(), subnet_sampling_size as usize ); } @@ -272,6 +297,7 @@ mod test { let subnet_sampling_size = spec.sampling_size(custody_group_count).unwrap(); let metadata = get_metadata(custody_group_count); let config = Arc::new(NetworkConfig::default()); + let slot = Slot::new(0); let globals = NetworkGlobals::::new_test_globals_with_metadata( vec![], @@ -280,7 +306,7 @@ mod test { Arc::new(spec), ); assert_eq!( - globals.sampling_columns.len(), + globals.sampling_columns(slot).len(), subnet_sampling_size as usize ); } diff --git a/beacon_node/lighthouse_network/src/types/topics.rs b/beacon_node/lighthouse_network/src/types/topics.rs index 56b97303d3..8a8eb6b849 100644 --- a/beacon_node/lighthouse_network/src/types/topics.rs +++ b/beacon_node/lighthouse_network/src/types/topics.rs @@ -1,6 +1,5 @@ use gossipsub::{IdentTopic as Topic, TopicHash}; use serde::{Deserialize, Serialize}; -use std::collections::HashSet; use strum::AsRefStr; use types::{ChainSpec, DataColumnSubnetId, EthSpec, ForkName, SubnetId, SyncSubnetId, Unsigned}; @@ -30,7 +29,7 @@ pub struct TopicConfig<'a> { pub enable_light_client_server: bool, pub subscribe_all_subnets: bool, pub subscribe_all_data_column_subnets: bool, - pub sampling_subnets: &'a HashSet, + pub sampling_subnets: &'a [DataColumnSubnetId], } /// Returns all the topics the node should subscribe at `fork_name` @@ -121,7 +120,7 @@ pub fn is_fork_non_core_topic(topic: &GossipTopic, _fork_name: ForkName) -> bool pub fn all_topics_at_fork(fork: ForkName, spec: &ChainSpec) -> Vec { // Compute the worst case of all forks - let sampling_subnets = HashSet::from_iter(spec.all_data_column_sidecar_subnets()); + let sampling_subnets = spec.all_data_column_sidecar_subnets().collect::>(); let opts = TopicConfig { enable_light_client_server: true, subscribe_all_subnets: true, @@ -512,11 +511,11 @@ mod tests { spec } - fn get_sampling_subnets() -> HashSet { - HashSet::new() + fn get_sampling_subnets() -> Vec { + vec![] } - fn get_topic_config(sampling_subnets: &HashSet) -> TopicConfig { + fn get_topic_config(sampling_subnets: &[DataColumnSubnetId]) -> TopicConfig { TopicConfig { enable_light_client_server: false, subscribe_all_subnets: false, @@ -562,7 +561,7 @@ mod tests { #[test] fn test_core_topics_to_subscribe() { let spec = get_spec(); - let s = HashSet::from_iter([1, 2].map(DataColumnSubnetId::new)); + let s = [1_u64, 2].map(DataColumnSubnetId::new); let mut topic_config = get_topic_config(&s); topic_config.enable_light_client_server = true; let latest_fork = *ForkName::list_all().last().unwrap(); diff --git a/beacon_node/network/src/metrics.rs b/beacon_node/network/src/metrics.rs index 7c38ae9d75..ee104930db 100644 --- a/beacon_node/network/src/metrics.rs +++ b/beacon_node/network/src/metrics.rs @@ -16,6 +16,7 @@ use strum::AsRefStr; use strum::IntoEnumIterator; use types::DataColumnSubnetId; use types::EthSpec; +use types::Slot; pub const SUCCESS: &str = "SUCCESS"; pub const FAILURE: &str = "FAILURE"; @@ -742,7 +743,7 @@ pub fn update_gossip_metrics( } } -pub fn update_sync_metrics(network_globals: &Arc>) { +pub fn update_sync_metrics(network_globals: &Arc>, clock_slot: Slot) { // reset the counts if PEERS_PER_SYNC_TYPE .as_ref() @@ -771,7 +772,7 @@ pub fn update_sync_metrics(network_globals: &Arc>) let all_column_subnets = (0..network_globals.spec.data_column_sidecar_subnet_count).map(DataColumnSubnetId::new); - let custody_column_subnets = network_globals.sampling_subnets.iter(); + let custody_column_subnets = network_globals.sampling_subnets(clock_slot).iter(); // Iterate all subnet values to set to zero the empty entries in peers_per_column_subnet for subnet in all_column_subnets { diff --git a/beacon_node/network/src/network_beacon_processor/gossip_methods.rs b/beacon_node/network/src/network_beacon_processor/gossip_methods.rs index f104bbf1bc..5e7ac7b837 100644 --- a/beacon_node/network/src/network_beacon_processor/gossip_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/gossip_methods.rs @@ -1128,6 +1128,7 @@ impl NetworkBeaconProcessor { ) { let processing_start_time = Instant::now(); let block_root = verified_data_column.block_root(); + let block_slot = verified_data_column.slot(); let data_column_slot = verified_data_column.slot(); let data_column_index = verified_data_column.id().index; @@ -1159,7 +1160,8 @@ impl NetworkBeaconProcessor { "Processed data column, waiting for other components" ); - self.attempt_data_column_reconstruction(block_root).await; + self.attempt_data_column_reconstruction(block_root, block_slot) + .await; } }, Err(BlockError::DuplicateFullyImported(_)) => { @@ -1259,7 +1261,7 @@ impl NetworkBeaconProcessor { .clone() .verify_block_for_gossip( block.clone(), - self.network_globals.custody_columns_count() as usize, + self.network_globals.custody_columns_count(block.slot()) as usize, ) .await; diff --git a/beacon_node/network/src/network_beacon_processor/mod.rs b/beacon_node/network/src/network_beacon_processor/mod.rs index 1329936932..b74a5acf6a 100644 --- a/beacon_node/network/src/network_beacon_processor/mod.rs +++ b/beacon_node/network/src/network_beacon_processor/mod.rs @@ -927,7 +927,7 @@ impl NetworkBeaconProcessor { block_root: Hash256, publish_blobs: bool, ) { - let is_supernode = self.network_globals.is_supernode(); + let is_supernode = self.network_globals.is_supernode(block.slot()); let self_cloned = self.clone(); let publish_fn = move |blobs_or_data_column| { @@ -1008,9 +1008,10 @@ impl NetworkBeaconProcessor { async fn attempt_data_column_reconstruction( self: &Arc, block_root: Hash256, + block_slot: Slot, ) -> Option { // Only supernodes attempt reconstruction - if !self.network_globals.is_supernode() { + if !self.network_globals.is_supernode(block_slot) { return None; } diff --git a/beacon_node/network/src/network_beacon_processor/sync_methods.rs b/beacon_node/network/src/network_beacon_processor/sync_methods.rs index 48ae26c826..a8779fb8c7 100644 --- a/beacon_node/network/src/network_beacon_processor/sync_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/sync_methods.rs @@ -383,8 +383,9 @@ impl NetworkBeaconProcessor { ); // Attempt reconstruction here before notifying sync, to avoid sending out more requests // that we may no longer need. - if let Some(availability) = - self.attempt_data_column_reconstruction(block_root).await + if let Some(availability) = self + .attempt_data_column_reconstruction(block_root, slot) + .await { result = Ok(availability) } diff --git a/beacon_node/network/src/service.rs b/beacon_node/network/src/service.rs index d25e8509a4..68a029babd 100644 --- a/beacon_node/network/src/service.rs +++ b/beacon_node/network/src/service.rs @@ -420,9 +420,9 @@ impl NetworkService { metrics::update_gossip_metrics::( self.libp2p.gossipsub(), &self.network_globals, - ); + ); // update sync metrics - metrics::update_sync_metrics(&self.network_globals); + metrics::update_sync_metrics(&self.network_globals, self.clock_slot()); } _ = self.gossipsub_parameter_update.tick() => self.update_gossipsub_parameters(), @@ -690,7 +690,7 @@ impl NetworkService { let mut subscribed_topics: Vec = vec![]; for topic_kind in core_topics_to_subscribe::( self.fork_context.current_fork(), - &self.network_globals.as_topic_config(), + &self.network_globals.as_topic_config(self.clock_slot()), &self.fork_context.spec, ) { for fork_digest in self.required_gossip_fork_digests() { @@ -843,7 +843,7 @@ impl NetworkService { fn subscribed_core_topics(&self) -> bool { let core_topics = core_topics_to_subscribe::( self.fork_context.current_fork(), - &self.network_globals.as_topic_config(), + &self.network_globals.as_topic_config(self.clock_slot()), &self.fork_context.spec, ); let core_topics: HashSet<&GossipKind> = HashSet::from_iter(&core_topics); @@ -853,6 +853,10 @@ impl NetworkService { core_topics.is_subset(&subscribed_topics) } + + fn clock_slot(&self) -> Slot { + self.beacon_chain.slot().unwrap_or(Slot::new(0)) + } } /// Returns a `Sleep` that triggers after the next change in the beacon chain fork version. diff --git a/beacon_node/network/src/sync/block_lookups/common.rs b/beacon_node/network/src/sync/block_lookups/common.rs index 8eefb2d675..1f2f427580 100644 --- a/beacon_node/network/src/sync/block_lookups/common.rs +++ b/beacon_node/network/src/sync/block_lookups/common.rs @@ -13,7 +13,7 @@ use parking_lot::RwLock; use std::collections::HashSet; use std::sync::Arc; use types::blob_sidecar::FixedBlobSidecarList; -use types::{DataColumnSidecarList, SignedBeaconBlock}; +use types::{DataColumnSidecarList, SignedBeaconBlock, Slot}; use super::single_block_lookup::{ComponentRequests, DownloadResult}; use super::SingleLookupId; @@ -45,6 +45,7 @@ pub trait RequestState { id: Id, lookup_peers: Arc>>, expected_blobs: usize, + block_slot: Slot, cx: &mut SyncNetworkContext, ) -> Result; @@ -80,6 +81,7 @@ impl RequestState for BlockRequestState { id: SingleLookupId, lookup_peers: Arc>>, _: usize, + _: Slot, cx: &mut SyncNetworkContext, ) -> Result { cx.block_lookup_request(id, lookup_peers, self.requested_block_root) @@ -128,6 +130,7 @@ impl RequestState for BlobRequestState { id: Id, lookup_peers: Arc>>, expected_blobs: usize, + _: Slot, cx: &mut SyncNetworkContext, ) -> Result { cx.blob_lookup_request(id, lookup_peers, self.block_root, expected_blobs) @@ -155,7 +158,7 @@ impl RequestState for BlobRequestState { fn request_state_mut(request: &mut SingleBlockLookup) -> Result<&mut Self, &'static str> { match &mut request.component_requests { ComponentRequests::WaitingForBlock => Err("waiting for block"), - ComponentRequests::ActiveBlobRequest(request, _) => Ok(request), + ComponentRequests::ActiveBlobRequest(request, ..) => Ok(request), ComponentRequests::ActiveCustodyRequest { .. } => Err("expecting custody request"), ComponentRequests::NotNeeded { .. } => Err("not needed"), } @@ -176,9 +179,10 @@ impl RequestState for CustodyRequestState { id: Id, lookup_peers: Arc>>, _: usize, + block_slot: Slot, cx: &mut SyncNetworkContext, ) -> Result { - cx.custody_lookup_request(id, self.block_root, lookup_peers) + cx.custody_lookup_request(id, self.block_root, block_slot, lookup_peers) .map_err(LookupRequestError::SendFailedNetwork) } @@ -210,7 +214,7 @@ impl RequestState for CustodyRequestState { match &mut request.component_requests { ComponentRequests::WaitingForBlock => Err("waiting for block"), ComponentRequests::ActiveBlobRequest { .. } => Err("expecting blob request"), - ComponentRequests::ActiveCustodyRequest(request) => Ok(request), + ComponentRequests::ActiveCustodyRequest(request, ..) => Ok(request), ComponentRequests::NotNeeded { .. } => Err("not needed"), } } diff --git a/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs b/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs index 3789dbe91e..39033198e5 100644 --- a/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs +++ b/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs @@ -75,8 +75,8 @@ pub struct SingleBlockLookup { #[derive(Debug)] pub(crate) enum ComponentRequests { WaitingForBlock, - ActiveBlobRequest(BlobRequestState, usize), - ActiveCustodyRequest(CustodyRequestState), + ActiveBlobRequest(BlobRequestState, usize, Slot), + ActiveCustodyRequest(CustodyRequestState, usize, Slot), // When printing in debug this state display the reason why it's not needed #[allow(dead_code)] NotNeeded(&'static str), @@ -161,8 +161,10 @@ impl SingleBlockLookup { self.block_request_state.state.is_processed() && match &self.component_requests { ComponentRequests::WaitingForBlock => false, - ComponentRequests::ActiveBlobRequest(request, _) => request.state.is_processed(), - ComponentRequests::ActiveCustodyRequest(request) => request.state.is_processed(), + ComponentRequests::ActiveBlobRequest(request, ..) => request.state.is_processed(), + ComponentRequests::ActiveCustodyRequest(request, ..) => { + request.state.is_processed() + } ComponentRequests::NotNeeded { .. } => true, } } @@ -176,10 +178,10 @@ impl SingleBlockLookup { // check if the`block_request_state.state.is_awaiting_event(). However we already // checked that above, so `WaitingForBlock => false` is equivalent. ComponentRequests::WaitingForBlock => false, - ComponentRequests::ActiveBlobRequest(request, _) => { + ComponentRequests::ActiveBlobRequest(request, ..) => { request.state.is_awaiting_event() } - ComponentRequests::ActiveCustodyRequest(request) => { + ComponentRequests::ActiveCustodyRequest(request, ..) => { request.state.is_awaiting_event() } ComponentRequests::NotNeeded { .. } => false, @@ -193,7 +195,7 @@ impl SingleBlockLookup { cx: &mut SyncNetworkContext, ) -> Result { // TODO: Check what's necessary to download, specially for blobs - self.continue_request::>(cx, 0)?; + self.continue_request::>(cx, 0, Slot::new(0))?; if let ComponentRequests::WaitingForBlock = self.component_requests { let downloaded_block = self @@ -213,6 +215,7 @@ impl SingleBlockLookup { } }) { let expected_blobs = block.num_expected_blobs(); + let block_slot = block.slot(); let block_epoch = block.slot().epoch(T::EthSpec::slots_per_epoch()); if expected_blobs == 0 { self.component_requests = ComponentRequests::NotNeeded("no data"); @@ -220,10 +223,13 @@ impl SingleBlockLookup { self.component_requests = ComponentRequests::ActiveBlobRequest( BlobRequestState::new(self.block_root), expected_blobs, + block_slot, ); } else if cx.chain.should_fetch_custody_columns(block_epoch) { self.component_requests = ComponentRequests::ActiveCustodyRequest( CustodyRequestState::new(self.block_root), + expected_blobs, + block_slot, ); } else { self.component_requests = ComponentRequests::NotNeeded("outside da window"); @@ -244,11 +250,19 @@ impl SingleBlockLookup { match &self.component_requests { ComponentRequests::WaitingForBlock => {} // do nothing - ComponentRequests::ActiveBlobRequest(_, expected_blobs) => { - self.continue_request::>(cx, *expected_blobs)? + ComponentRequests::ActiveBlobRequest(_, expected_blobs, block_slot) => { + self.continue_request::>( + cx, + *expected_blobs, + *block_slot, + )? } - ComponentRequests::ActiveCustodyRequest(_) => { - self.continue_request::>(cx, 0)? + ComponentRequests::ActiveCustodyRequest(_, expected_blobs, block_slot) => { + self.continue_request::>( + cx, + *expected_blobs, + *block_slot, + )? } ComponentRequests::NotNeeded { .. } => {} // do nothing } @@ -268,6 +282,7 @@ impl SingleBlockLookup { &mut self, cx: &mut SyncNetworkContext, expected_blobs: usize, + block_slot: Slot, ) -> Result<(), LookupRequestError> { let id = self.id; let awaiting_parent = self.awaiting_parent.is_some(); @@ -287,7 +302,7 @@ impl SingleBlockLookup { let request = R::request_state_mut(self) .map_err(|e| LookupRequestError::BadState(e.to_owned()))?; - match request.make_request(id, peers, expected_blobs, cx)? { + match request.make_request(id, peers, expected_blobs, block_slot, cx)? { LookupRequestResult::RequestSent(req_id) => { // Lookup sync event safety: If make_request returns `RequestSent`, we are // guaranteed that `BlockLookups::on_download_response` will be called exactly diff --git a/beacon_node/network/src/sync/network_context.rs b/beacon_node/network/src/sync/network_context.rs index 16fcf93bcf..387d6e2087 100644 --- a/beacon_node/network/src/sync/network_context.rs +++ b/beacon_node/network/src/sync/network_context.rs @@ -394,12 +394,13 @@ impl SyncNetworkContext { id: self.next_id(), requester, }; + let request_start_slot = Slot::new(*request.start_slot()); // Compute custody column peers before sending the blocks_by_range request. If we don't have // enough peers, error here. let data_column_requests = if matches!(batch_type, ByRangeRequestType::BlocksAndColumns) { - let column_indexes = self.network_globals().sampling_columns.clone(); - Some(self.make_columns_by_range_requests(request.clone(), &column_indexes)?) + let column_indexes = self.network_globals().sampling_columns(request_start_slot); + Some(self.make_columns_by_range_requests(request.clone(), column_indexes)?) } else { None }; @@ -430,10 +431,8 @@ impl SyncNetworkContext { Some(( data_column_requests, self.network_globals() - .sampling_columns - .iter() - .cloned() - .collect::>(), + .sampling_columns(request_start_slot) + .to_vec(), )) } else { None @@ -448,7 +447,7 @@ impl SyncNetworkContext { fn make_columns_by_range_requests( &self, request: BlocksByRangeRequest, - custody_indexes: &HashSet, + custody_indexes: &[ColumnIndex], ) -> Result, RpcRequestSendError> { let mut peer_id_to_request_map = HashMap::new(); @@ -763,6 +762,7 @@ impl SyncNetworkContext { &mut self, lookup_id: SingleLookupId, block_root: Hash256, + block_slot: Slot, lookup_peers: Arc>>, ) -> Result { let span = span!( @@ -781,9 +781,9 @@ impl SyncNetworkContext { // Include only the blob indexes not yet imported (received through gossip) let custody_indexes_to_fetch = self .network_globals() - .sampling_columns - .clone() - .into_iter() + .sampling_columns(block_slot) + .iter() + .copied() .filter(|index| !custody_indexes_imported.contains(index)) .collect::>(); diff --git a/beacon_node/network/src/sync/range_sync/chain.rs b/beacon_node/network/src/sync/range_sync/chain.rs index 24045e901b..767630804d 100644 --- a/beacon_node/network/src/sync/range_sync/chain.rs +++ b/beacon_node/network/src/sync/range_sync/chain.rs @@ -1130,7 +1130,7 @@ impl SyncingChain { // Require peers on all sampling column subnets before sending batches let peers_on_all_custody_subnets = network .network_globals() - .sampling_subnets + .sampling_subnets(epoch.start_slot(T::EthSpec::slots_per_epoch())) .iter() .all(|subnet_id| { let peer_count = network diff --git a/beacon_node/network/src/sync/tests/lookups.rs b/beacon_node/network/src/sync/tests/lookups.rs index fe72979930..062a0346cb 100644 --- a/beacon_node/network/src/sync/tests/lookups.rs +++ b/beacon_node/network/src/sync/tests/lookups.rs @@ -1203,11 +1203,12 @@ impl TestRig { payload_verification_status: PayloadVerificationStatus::Verified, is_valid_merge_transition_block: false, }; + let block_slot = block.slot(); let executed_block = AvailabilityPendingExecutedBlock::new( block, import_data, payload_verification_outcome, - self.network_globals.custody_columns_count() as usize, + self.network_globals.custody_columns_count(block_slot) as usize, ); match self .harness