#![cfg(not(debug_assertions))] #![cfg(test)] use crate::persisted_dht::load_dht; use crate::{NetworkConfig, NetworkService}; use beacon_chain::BeaconChainTypes; use beacon_chain::test_utils::BeaconChainHarness; use beacon_processor::{BeaconProcessorChannels, BeaconProcessorConfig}; use futures::StreamExt; use lighthouse_network::identity::secp256k1; use lighthouse_network::types::{GossipEncoding, GossipKind}; use lighthouse_network::{Enr, GossipTopic}; use std::str::FromStr; use std::sync::Arc; use tokio::runtime::Runtime; use types::{Epoch, EthSpec, MinimalEthSpec, SubnetId}; impl NetworkService { fn get_topic_params(&self, topic: GossipTopic) -> Option<&gossipsub::TopicScoreParams> { self.libp2p.get_topic_params(topic) } } #[test] fn test_dht_persistence() { let beacon_chain = BeaconChainHarness::builder(MinimalEthSpec) .default_spec() .deterministic_keypairs(8) .fresh_ephemeral_store() .build() .chain; let store = beacon_chain.store.clone(); let enr1 = Enr::from_str("enr:-IS4QHCYrYZbAKWCBRlAy5zzaDZXJBGkcnh4MHcBFZntXNFrdvJjX04jRzjzCBOonrkTfj499SZuOh8R33Ls8RRcy5wBgmlkgnY0gmlwhH8AAAGJc2VjcDI1NmsxoQPKY0yuDUmstAHYpMa2_oxVtw0RW_QAdpzBQA8yWM0xOIN1ZHCCdl8").unwrap(); let enr2 = Enr::from_str("enr:-IS4QJ2d11eu6dC7E7LoXeLMgMP3kom1u3SE8esFSWvaHoo0dP1jg8O3-nx9ht-EO3CmG7L6OkHcMmoIh00IYWB92QABgmlkgnY0gmlwhH8AAAGJc2VjcDI1NmsxoQIB_c-jQMOXsbjWkbN-Oj99H57gfId5pfb4wa1qxwV4CIN1ZHCCIyk").unwrap(); let enrs = vec![enr1, enr2]; let runtime = Arc::new(Runtime::new().unwrap()); let (signal, exit) = async_channel::bounded(1); let (shutdown_tx, _) = futures::channel::mpsc::channel(1); let executor = task_executor::TaskExecutor::new(Arc::downgrade(&runtime), exit, shutdown_tx); let mut config = NetworkConfig::default(); config.set_ipv4_listening_address(std::net::Ipv4Addr::UNSPECIFIED, 21212, 21212, 21213); config.discv5_config.table_filter = |_| true; // Do not ignore local IPs config.upnp_enabled = false; config.boot_nodes_enr = enrs.clone(); let config = Arc::new(config); runtime.block_on(async move { // Create a new network service which implicitly gets dropped at the // end of the block. let BeaconProcessorChannels { beacon_processor_tx, beacon_processor_rx: _beacon_processor_rx, } = <_>::default(); let _network_service = NetworkService::start( beacon_chain.clone(), config, executor, None, beacon_processor_tx, secp256k1::Keypair::generate().into(), ) .await .unwrap(); drop(signal); }); let raw_runtime = Arc::try_unwrap(runtime).unwrap(); raw_runtime.shutdown_timeout(tokio::time::Duration::from_secs(300)); // Load the persisted dht from the store let persisted_enrs = load_dht(store); assert!( persisted_enrs.contains(&enrs[0]), "should have persisted the first ENR to store" ); assert!( persisted_enrs.contains(&enrs[1]), "should have persisted the second ENR to store" ); } // Test removing topic weight on old topics when a fork happens. #[test] fn test_removing_topic_weight_on_old_topics() { let runtime = Arc::new(Runtime::new().unwrap()); // Capella spec let mut spec = MinimalEthSpec::default_spec(); spec.altair_fork_epoch = Some(Epoch::new(0)); spec.bellatrix_fork_epoch = Some(Epoch::new(0)); spec.capella_fork_epoch = Some(Epoch::new(1)); // Build beacon chain. let beacon_chain = BeaconChainHarness::builder(MinimalEthSpec) .spec(spec.clone().into()) .deterministic_keypairs(8) .fresh_ephemeral_store() .mock_execution_layer() .build() .chain; let (next_fork_epoch, _) = beacon_chain.duration_to_next_digest().expect("next fork"); assert_eq!(Some(next_fork_epoch), spec.capella_fork_epoch); // Build network service. let (mut network_service, network_globals, _network_senders) = runtime.block_on(async { let (_, exit) = async_channel::bounded(1); let (shutdown_tx, _) = futures::channel::mpsc::channel(1); let executor = task_executor::TaskExecutor::new(Arc::downgrade(&runtime), exit, shutdown_tx); let mut config = NetworkConfig::default(); config.set_ipv4_listening_address(std::net::Ipv4Addr::UNSPECIFIED, 21214, 21214, 21215); config.discv5_config.table_filter = |_| true; // Do not ignore local IPs config.upnp_enabled = false; let config = Arc::new(config); let beacon_processor_channels = BeaconProcessorChannels::new(&BeaconProcessorConfig::default()); NetworkService::build( beacon_chain.clone(), config, executor.clone(), None, beacon_processor_channels.beacon_processor_tx, secp256k1::Keypair::generate().into(), ) .await .unwrap() }); // Subscribe to the topics. runtime.block_on(async { while network_globals.gossipsub_subscriptions.read().len() < 2 { if let Some(msg) = network_service.subnet_service.next().await { network_service.on_subnet_service_msg(msg); } } }); // Make sure the service is subscribed to the topics. let (old_topic1, old_topic2) = { let mut subnets = SubnetId::compute_attestation_subnets( network_globals.local_enr().node_id().raw(), &spec, ) .collect::>(); assert_eq!(2, subnets.len()); let old_fork_digest = beacon_chain.enr_fork_id().fork_digest; let old_topic1 = GossipTopic::new( GossipKind::Attestation(subnets.pop().unwrap()), GossipEncoding::SSZSnappy, old_fork_digest, ); let old_topic2 = GossipTopic::new( GossipKind::Attestation(subnets.pop().unwrap()), GossipEncoding::SSZSnappy, old_fork_digest, ); (old_topic1, old_topic2) }; let subscriptions = network_globals.gossipsub_subscriptions.read().clone(); assert_eq!(2, subscriptions.len()); assert!(subscriptions.contains(&old_topic1)); assert!(subscriptions.contains(&old_topic2)); let old_topic_params1 = network_service .get_topic_params(old_topic1.clone()) .expect("topic score params"); assert!(old_topic_params1.topic_weight > 0.0); let old_topic_params2 = network_service .get_topic_params(old_topic2.clone()) .expect("topic score params"); assert!(old_topic_params2.topic_weight > 0.0); // Advance slot to the next fork for _ in 0..MinimalEthSpec::slots_per_epoch() { beacon_chain.slot_clock.advance_slot(); } runtime.block_on(async { network_service.update_next_fork_digest(); }); // Check that topic_weight on the old topics has been zeroed. let old_topic_params1 = network_service .get_topic_params(old_topic1) .expect("topic score params"); assert_eq!(0.0, old_topic_params1.topic_weight); let old_topic_params2 = network_service .get_topic_params(old_topic2) .expect("topic score params"); assert_eq!(0.0, old_topic_params2.topic_weight); }