use super::*; use beacon_chain::{ BeaconChain, builder::{BeaconChainBuilder, Witness}, test_utils::get_kzg, }; use genesis::{DEFAULT_ETH1_BLOCK_HASH, generate_deterministic_keypairs, interop_genesis_state}; use lighthouse_network::NetworkConfig; use rand::SeedableRng; use rand::rngs::StdRng; use slot_clock::{SlotClock, SystemTimeSlotClock}; use std::sync::{Arc, LazyLock}; use std::time::{Duration, SystemTime}; use store::config::StoreConfig; use store::{HotColdDB, MemoryStore}; use task_executor::test_utils::TestRuntime; use tracing_subscriber::EnvFilter; use types::{ CommitteeIndex, Epoch, EthSpec, Hash256, MainnetEthSpec, Slot, SubnetId, SyncCommitteeSubscription, SyncSubnetId, ValidatorSubscription, }; const SLOT_DURATION_MILLIS: u64 = 400; const TEST_LOG_LEVEL: Option<&str> = None; type TestBeaconChainType = Witness< SystemTimeSlotClock, MainnetEthSpec, MemoryStore, MemoryStore, >; pub struct TestBeaconChain { chain: Arc>, _test_runtime: TestRuntime, } impl TestBeaconChain { pub fn new_with_system_clock() -> Self { let spec = Arc::new(MainnetEthSpec::default_spec()); get_tracing_subscriber(TEST_LOG_LEVEL); let keypairs = generate_deterministic_keypairs(1); let store = HotColdDB::open_ephemeral(StoreConfig::default(), spec.clone()).unwrap(); let kzg = get_kzg(&spec); let (shutdown_tx, _) = futures::channel::mpsc::channel(1); let test_runtime = TestRuntime::default(); let chain = Arc::new( BeaconChainBuilder::new(MainnetEthSpec, kzg.clone()) .custom_spec(spec.clone()) .store(Arc::new(store)) .task_executor(test_runtime.task_executor.clone()) .genesis_state( interop_genesis_state::( &keypairs, 0, Hash256::from_slice(DEFAULT_ETH1_BLOCK_HASH), None, &spec, ) .expect("should generate interop state"), ) .expect("should build state using recent genesis") .slot_clock(SystemTimeSlotClock::new( Slot::new(0), Duration::from_secs(recent_genesis_time()), Duration::from_millis(SLOT_DURATION_MILLIS), )) .shutdown_sender(shutdown_tx) .rng(Box::new(StdRng::seed_from_u64(42))) .build() .expect("should build"), ); Self { chain, _test_runtime: test_runtime, } } } pub fn recent_genesis_time() -> u64 { SystemTime::now() .duration_since(SystemTime::UNIX_EPOCH) .unwrap() .as_secs() } fn get_tracing_subscriber(log_level: Option<&str>) { if let Some(level) = log_level { let _ = tracing_subscriber::fmt() .with_env_filter(EnvFilter::try_new(level).unwrap()) .try_init(); } } static CHAIN: LazyLock = LazyLock::new(TestBeaconChain::new_with_system_clock); fn get_subnet_service() -> SubnetService { let config = NetworkConfig::default(); let beacon_chain = CHAIN.chain.clone(); SubnetService::new( beacon_chain, lighthouse_network::discv5::enr::NodeId::random(), &config, ) } // gets a number of events from the subscription service, or returns none if it times out after a // specified duration. async fn get_events_until_timeout + Unpin>( stream: &mut S, num_events: Option, timeout: Duration, ) -> Vec { let mut events = Vec::new(); let sleep = tokio::time::sleep(timeout); futures::pin_mut!(sleep); loop { tokio::select! { Some(event) = stream.next() => { events.push(event); if let Some(num) = num_events && events.len() == num { break; } } _ = sleep.as_mut() => { break; } } } events } // gets a number of events from the subscription service, or returns none if it times out after a number // of slots async fn get_events_until_num_slots + Unpin>( stream: &mut S, num_events: Option, num_slots_before_timeout: u32, ) -> Vec { let timeout = Duration::from_millis(SLOT_DURATION_MILLIS) * num_slots_before_timeout; get_events_until_timeout(stream, num_events, timeout).await } mod test { #[cfg(not(windows))] use crate::subnet_service::MIN_PEER_DISCOVERY_SLOT_LOOK_AHEAD; use super::*; fn get_subscription( attestation_committee_index: CommitteeIndex, slot: Slot, committee_count_at_slot: u64, is_aggregator: bool, ) -> Subscription { Subscription::Attestation(ValidatorSubscription { attestation_committee_index, slot, committee_count_at_slot, is_aggregator, }) } fn get_subscriptions( validator_count: u64, slot: Slot, committee_count_at_slot: u64, is_aggregator: bool, ) -> Vec { (0..validator_count) .map(|validator_index| { get_subscription( validator_index, slot, committee_count_at_slot, is_aggregator, ) }) .collect() } #[tokio::test] async fn subscribe_current_slot_wait_for_unsubscribe() { // subscription config let committee_index = 1; let subnets_per_node = MainnetEthSpec::default_spec().subnets_per_node as usize; // create the attestation service and subscriptions let mut subnet_service = get_subnet_service(); let _events = get_events_until_num_slots(&mut subnet_service, None, 1).await; let current_slot = subnet_service .beacon_chain .slot_clock .now() .expect("Could not get current slot"); // Generate a subnet that isn't in our permanent subnet collection let subscription_slot = current_slot + 1; let mut committee_count = 1; let mut subnet = Subnet::Attestation( SubnetId::compute_subnet::( subscription_slot, committee_index, committee_count, &subnet_service.beacon_chain.spec, ) .unwrap(), ); while subnet_service .permanent_subscriptions() .any(|x| *x == subnet) { committee_count += 1; subnet = Subnet::Attestation( SubnetId::compute_subnet::( subscription_slot, committee_index, committee_count, &subnet_service.beacon_chain.spec, ) .unwrap(), ); } let subscriptions = vec![get_subscription( committee_index, subscription_slot, committee_count, true, )]; // submit the subscriptions subnet_service.validator_subscriptions(subscriptions.into_iter()); // not enough time for peer discovery, just subscribe, unsubscribe let expected = [ SubnetServiceMessage::Subscribe(subnet), SubnetServiceMessage::Unsubscribe(subnet), ]; // Wait for 1 slot duration to get the unsubscription event let events = get_events_until_num_slots( &mut subnet_service, Some(2), (MainnetEthSpec::slots_per_epoch()) as u32, ) .await; assert_eq!(events, expected); // Should be subscribed to only subnets_per_node permananet subnet after unsubscription. assert_eq!( subnet_service.permanent_subscriptions().count(), subnets_per_node ); assert_eq!(subnet_service.subscriptions().count(), 0); } /// Test to verify that we are not unsubscribing to a subnet before a required subscription. #[cfg(not(windows))] #[tokio::test] async fn test_same_subnet_unsubscription() { // subscription config let committee_count = 1; // Makes 2 validator subscriptions to the same subnet but at different slots. // There should be just 1 unsubscription event for the later slot subscription (subscription_slot2). let subscription_slot1 = 0; let subscription_slot2 = 1; let com1 = 1; let com2 = 0; // create the subnet service and subscriptions let mut subnet_service = get_subnet_service(); let _events = get_events_until_num_slots(&mut subnet_service, None, 0).await; let current_slot = subnet_service .beacon_chain .slot_clock .now() .expect("Could not get current slot"); let sub1 = get_subscription( com1, current_slot + Slot::new(subscription_slot1), committee_count, true, ); let sub2 = get_subscription( com2, current_slot + Slot::new(subscription_slot2), committee_count, true, ); let subnet_id1 = SubnetId::compute_subnet::( current_slot + Slot::new(subscription_slot1), com1, committee_count, &subnet_service.beacon_chain.spec, ) .unwrap(); let subnet_id2 = SubnetId::compute_subnet::( current_slot + Slot::new(subscription_slot2), com2, committee_count, &subnet_service.beacon_chain.spec, ) .unwrap(); // Assert that subscriptions are different but their subnet is the same assert_ne!(sub1, sub2); assert_eq!(subnet_id1, subnet_id2); // submit the subscriptions subnet_service.validator_subscriptions(vec![sub1, sub2].into_iter()); // Unsubscription event should happen at slot 2 (since subnet id's are the same, unsubscription event should be at higher slot + 1) let expected = SubnetServiceMessage::Subscribe(Subnet::Attestation(subnet_id1)); if subnet_service.is_subscribed(&Subnet::Attestation(subnet_id1)) { // If we are permanently subscribed to this subnet, we won't see a subscribe message let _ = get_events_until_num_slots(&mut subnet_service, None, 1).await; } else { let subscription = get_events_until_num_slots(&mut subnet_service, None, 1).await; assert_eq!(subscription, [expected]); } // Get event for 1 more slot duration, we should get the unsubscribe event now. let unsubscribe_event = get_events_until_num_slots(&mut subnet_service, None, 1).await; // If the long lived and short lived subnets are different, we should get an unsubscription // event. let expected = SubnetServiceMessage::Unsubscribe(Subnet::Attestation(subnet_id1)); if !subnet_service.is_subscribed(&Subnet::Attestation(subnet_id1)) { assert_eq!([expected], unsubscribe_event[..]); } // Should no longer be subscribed to any short lived subnets after unsubscription. assert_eq!(subnet_service.subscriptions().count(), 0); } #[tokio::test] async fn subscribe_all_subnets() { let attestation_subnet_count = MainnetEthSpec::default_spec().attestation_subnet_count; let subscription_slot = 3; let subscriptions_count = attestation_subnet_count; let committee_count = 1; let subnets_per_node = MainnetEthSpec::default_spec().subnets_per_node as usize; // create the attestation service and subscriptions let mut subnet_service = get_subnet_service(); let current_slot = subnet_service .beacon_chain .slot_clock .now() .expect("Could not get current slot"); let subscriptions = get_subscriptions( subscriptions_count, current_slot + subscription_slot, committee_count, true, ); // submit the subscriptions subnet_service.validator_subscriptions(subscriptions.into_iter()); let events = get_events_until_num_slots(&mut subnet_service, Some(130), 10).await; let mut discover_peer_count = 0; let mut enr_add_count = 0; let mut unsubscribe_event_count = 0; let mut subscription_event_count = 0; for event in &events { match event { SubnetServiceMessage::DiscoverPeers(_) => discover_peer_count += 1, SubnetServiceMessage::Subscribe(_any_subnet) => subscription_event_count += 1, SubnetServiceMessage::EnrAdd(_any_subnet) => enr_add_count += 1, SubnetServiceMessage::Unsubscribe(_) => unsubscribe_event_count += 1, SubnetServiceMessage::EnrRemove(_) => {} } } // There should be a Subscribe Event, an Enr Add event for each // permanent subnet initially. There is a single discovery event for the permanent // subnets. // The next event should be a bulk discovery event. let bulk_discovery_index = subnets_per_node * 2 + 1; // The bulk discovery request length should be equal to validator_count let bulk_discovery_event = &events[bulk_discovery_index]; if let SubnetServiceMessage::DiscoverPeers(d) = bulk_discovery_event { assert_eq!(d.len(), attestation_subnet_count as usize); } else { panic!("Unexpected event {:?}", bulk_discovery_event); } // 64 `DiscoverPeer` requests of length 1 corresponding to deterministic subnets // and 1 `DiscoverPeer` request corresponding to bulk subnet discovery. assert_eq!(discover_peer_count, 1 + 1); assert_eq!(subscription_event_count, attestation_subnet_count); assert_eq!(enr_add_count, subnets_per_node); assert_eq!( unsubscribe_event_count, attestation_subnet_count - subnets_per_node as u64 ); // test completed successfully } #[tokio::test] async fn subscribe_correct_number_of_subnets() { let attestation_subnet_count = MainnetEthSpec::default_spec().attestation_subnet_count; let subscription_slot = 10; let subnets_per_node = MainnetEthSpec::default_spec().subnets_per_node as usize; // the 65th subscription should result in no more messages than the previous scenario let subscriptions_count = attestation_subnet_count + 1; let committee_count = 1; // create the attestation service and subscriptions let mut subnet_service = get_subnet_service(); let current_slot = subnet_service .beacon_chain .slot_clock .now() .expect("Could not get current slot"); let subscriptions = get_subscriptions( subscriptions_count, current_slot + subscription_slot, committee_count, true, ); // submit the subscriptions subnet_service.validator_subscriptions(subscriptions.into_iter()); let events = get_events_until_num_slots(&mut subnet_service, None, 3).await; let mut discover_peer_count = 0; let mut enr_add_count = 0; let mut unexpected_msg_count = 0; for event in &events { match event { SubnetServiceMessage::DiscoverPeers(_) => discover_peer_count += 1, SubnetServiceMessage::Subscribe(_any_subnet) => {} SubnetServiceMessage::EnrAdd(_any_subnet) => enr_add_count += 1, _ => { unexpected_msg_count += 1; println!("{:?}", event); } } } // The bulk discovery request length shouldn't exceed max attestation_subnet_count let bulk_discovery_event = events.last().unwrap(); if let SubnetServiceMessage::DiscoverPeers(d) = bulk_discovery_event { assert_eq!(d.len(), attestation_subnet_count as usize); } else { panic!("Unexpected event {:?}", bulk_discovery_event); } // subnets_per_node `DiscoverPeer` requests of length 1 corresponding to long-lived subnets // and 1 `DiscoverPeer` request corresponding to the bulk subnet discovery. assert_eq!(discover_peer_count, 1 + 1); // Generates a single discovery for permanent // subscriptions and 1 for the subscription assert_eq!(enr_add_count, subnets_per_node); assert_eq!(unexpected_msg_count, 0); } #[cfg(not(windows))] #[tokio::test] async fn test_subscribe_same_subnet_several_slots_apart() { // subscription config let committee_count = 1; // Makes 3 validator subscriptions to the same subnet but at different slots. let subscription_slot1 = 0; let subscription_slot2 = MIN_PEER_DISCOVERY_SLOT_LOOK_AHEAD + 4; let subscription_slot3 = subscription_slot2 * 2; let com1 = MIN_PEER_DISCOVERY_SLOT_LOOK_AHEAD + 4; let com2 = 0; let com3 = CHAIN.chain.spec.attestation_subnet_count - com1; // create the attestation service and subscriptions let mut subnet_service = get_subnet_service(); // Remove permanent events let _events = get_events_until_num_slots(&mut subnet_service, None, 0).await; let current_slot = subnet_service .beacon_chain .slot_clock .now() .expect("Could not get current slot"); let sub1 = get_subscription( com1, current_slot + Slot::new(subscription_slot1), committee_count, true, ); let sub2 = get_subscription( com2, current_slot + Slot::new(subscription_slot2), committee_count, true, ); let sub3 = get_subscription( com3, current_slot + Slot::new(subscription_slot3), committee_count, true, ); let subnet_id1 = SubnetId::compute_subnet::( current_slot + Slot::new(subscription_slot1), com1, committee_count, &subnet_service.beacon_chain.spec, ) .unwrap(); let subnet_id2 = SubnetId::compute_subnet::( current_slot + Slot::new(subscription_slot2), com2, committee_count, &subnet_service.beacon_chain.spec, ) .unwrap(); let subnet_id3 = SubnetId::compute_subnet::( current_slot + Slot::new(subscription_slot3), com3, committee_count, &subnet_service.beacon_chain.spec, ) .unwrap(); // Assert that subscriptions are different but their subnet is the same assert_ne!(sub1, sub2); assert_ne!(sub1, sub3); assert_ne!(sub2, sub3); assert_eq!(subnet_id1, subnet_id2); assert_eq!(subnet_id1, subnet_id3); // submit the subscriptions subnet_service.validator_subscriptions(vec![sub1, sub2, sub3].into_iter()); // Unsubscription event should happen at the end of the slot. // We wait for 2 slots, to avoid timeout issues let events = get_events_until_num_slots(&mut subnet_service, None, 2).await; let expected_subscription = SubnetServiceMessage::Subscribe(Subnet::Attestation(subnet_id1)); let expected_unsubscription = SubnetServiceMessage::Unsubscribe(Subnet::Attestation(subnet_id1)); if !subnet_service.is_subscribed_permanent(&Subnet::Attestation(subnet_id1)) { assert_eq!(expected_subscription, events[0]); assert_eq!(expected_unsubscription, events[2]); } // Check that there are no more subscriptions assert_eq!(subnet_service.subscriptions().count(), 0); println!("{events:?}"); let subscription_slot = current_slot + subscription_slot2 - 1; // one less do to the // advance subscription time let wait_duration = subnet_service .beacon_chain .slot_clock .duration_to_slot(subscription_slot) .unwrap(); let no_events = dbg!(get_events_until_timeout(&mut subnet_service, None, wait_duration).await); assert_eq!(no_events, []); let subscription_end_slot = current_slot + subscription_slot2 + 2; // +1 to get to the end of the duty slot, +1 for the slot to complete let wait_duration = subnet_service .beacon_chain .slot_clock .duration_to_slot(subscription_end_slot) .unwrap(); let second_subscribe_event = get_events_until_timeout(&mut subnet_service, None, wait_duration).await; // If the permanent and short lived subnets are different, we should get an unsubscription event. if !subnet_service.is_subscribed_permanent(&Subnet::Attestation(subnet_id1)) { assert_eq!( [ expected_subscription.clone(), expected_unsubscription.clone(), ], second_subscribe_event[..] ); } let subscription_slot = current_slot + subscription_slot3 - 1; let wait_duration = subnet_service .beacon_chain .slot_clock .duration_to_slot(subscription_slot) .unwrap(); let no_events = dbg!(get_events_until_timeout(&mut subnet_service, None, wait_duration).await); assert_eq!(no_events, []); let subscription_end_slot = current_slot + subscription_slot3 + 2; // +1 to get to the end of the duty slot, +1 for the slot to complete let wait_duration = subnet_service .beacon_chain .slot_clock .duration_to_slot(subscription_end_slot) .unwrap(); let third_subscribe_event = get_events_until_timeout(&mut subnet_service, None, wait_duration).await; if !subnet_service.is_subscribed_permanent(&Subnet::Attestation(subnet_id1)) { assert_eq!( [expected_subscription, expected_unsubscription], third_subscribe_event[..] ); } } #[tokio::test] async fn subscribe_and_unsubscribe_sync_committee() { // subscription config let validator_index = 1; let until_epoch = Epoch::new(1); let sync_committee_indices = vec![1]; // create the attestation service and subscriptions let mut subnet_service = get_subnet_service(); let _events = get_events_until_num_slots(&mut subnet_service, None, 0).await; let subscriptions = std::iter::once(Subscription::SyncCommittee(SyncCommitteeSubscription { validator_index, sync_committee_indices: sync_committee_indices.clone(), until_epoch, })); // submit the subscriptions subnet_service.validator_subscriptions(subscriptions); // Remove permanent subscription events let subnet_ids = SyncSubnetId::compute_subnets_for_sync_committee::( &sync_committee_indices, ) .unwrap(); let subnet_id = subnet_ids.iter().next().unwrap(); // Note: the unsubscription event takes 2 epochs (8 * 2 * 0.4 secs = 3.2 secs) let events = get_events_until_num_slots( &mut subnet_service, Some(5), (MainnetEthSpec::slots_per_epoch() * 3) as u32, // Have some buffer time before getting 5 events ) .await; assert_eq!( events[..2], [ SubnetServiceMessage::Subscribe(Subnet::SyncCommittee(*subnet_id)), SubnetServiceMessage::EnrAdd(Subnet::SyncCommittee(*subnet_id)) ] ); matches::assert_matches!( events[2..], [ SubnetServiceMessage::DiscoverPeers(_), SubnetServiceMessage::Unsubscribe(_), SubnetServiceMessage::EnrRemove(_), ] ); // Should be unsubscribed at the end. assert_eq!(subnet_service.subscriptions().count(), 0); } #[tokio::test] async fn same_subscription_with_lower_until_epoch() { // subscription config let validator_index = 1; let until_epoch = Epoch::new(2); let sync_committee_indices = vec![1]; // create the attestation service and subscriptions let mut subnet_service = get_subnet_service(); // Get the initial events from permanent subnet subscriptions let _events = get_events_until_num_slots(&mut subnet_service, None, 1).await; let subscriptions = std::iter::once(Subscription::SyncCommittee(SyncCommitteeSubscription { validator_index, sync_committee_indices: sync_committee_indices.clone(), until_epoch, })); // submit the subscriptions subnet_service.validator_subscriptions(subscriptions); // Get all immediate events (won't include unsubscriptions) let events = get_events_until_num_slots(&mut subnet_service, None, 1).await; matches::assert_matches!( events[..], [ SubnetServiceMessage::Subscribe(Subnet::SyncCommittee(_)), SubnetServiceMessage::EnrAdd(Subnet::SyncCommittee(_)), SubnetServiceMessage::DiscoverPeers(_), ] ); // Additional subscriptions which shouldn't emit any non-discovery events // Event 1 is a duplicate of an existing subscription // Event 2 is the same subscription with lower `until_epoch` than the existing subscription let subscriptions = vec![ Subscription::SyncCommittee(SyncCommitteeSubscription { validator_index, sync_committee_indices: sync_committee_indices.clone(), until_epoch, }), Subscription::SyncCommittee(SyncCommitteeSubscription { validator_index, sync_committee_indices: sync_committee_indices.clone(), until_epoch: until_epoch - 1, }), ]; // submit the subscriptions subnet_service.validator_subscriptions(subscriptions.into_iter()); // Get all immediate events (won't include unsubscriptions) let events = get_events_until_num_slots(&mut subnet_service, None, 1).await; matches::assert_matches!(events[..], [SubnetServiceMessage::DiscoverPeers(_),]); // Should be unsubscribed at the end. let sync_committee_subscriptions = subnet_service .subscriptions() .filter(|s| matches!(s, Subnet::SyncCommittee(_))) .count(); assert_eq!(sync_committee_subscriptions, 1); } }