use super::*; use beacon_chain::{ builder::{BeaconChainBuilder, Witness}, eth1_chain::CachingEth1Backend, }; use futures::Stream; use genesis::{generate_deterministic_keypairs, interop_genesis_state}; use lazy_static::lazy_static; use matches::assert_matches; use slog::Logger; use sloggers::{null::NullLoggerBuilder, Build}; use slot_clock::{SlotClock, SystemTimeSlotClock}; use std::time::{Duration, SystemTime}; use store::config::StoreConfig; use store::{HotColdDB, MemoryStore}; use types::{CommitteeIndex, EthSpec, MinimalEthSpec}; const SLOT_DURATION_MILLIS: u64 = 400; type TestBeaconChainType = Witness< SystemTimeSlotClock, CachingEth1Backend, MinimalEthSpec, MemoryStore, MemoryStore, >; pub struct TestBeaconChain { chain: Arc>, } impl TestBeaconChain { pub fn new_with_system_clock() -> Self { let spec = MinimalEthSpec::default_spec(); let keypairs = generate_deterministic_keypairs(1); let log = get_logger(); let store = HotColdDB::open_ephemeral(StoreConfig::default(), spec.clone(), log.clone()).unwrap(); let (shutdown_tx, _) = futures::channel::mpsc::channel(1); let chain = Arc::new( BeaconChainBuilder::new(MinimalEthSpec) .logger(log.clone()) .custom_spec(spec.clone()) .store(Arc::new(store)) .genesis_state( interop_genesis_state::(&keypairs, 0, &spec) .expect("should generate interop state"), ) .expect("should build state using recent genesis") .dummy_eth1_backend() .expect("should build dummy backend") .slot_clock(SystemTimeSlotClock::new( Slot::new(0), Duration::from_secs(recent_genesis_time()), Duration::from_millis(SLOT_DURATION_MILLIS), )) .shutdown_sender(shutdown_tx) .monitor_validators(true, vec![], log) .build() .expect("should build"), ); Self { chain } } } pub fn recent_genesis_time() -> u64 { SystemTime::now() .duration_since(SystemTime::UNIX_EPOCH) .unwrap() .as_secs() } fn get_logger() -> Logger { NullLoggerBuilder.build().expect("logger should build") } lazy_static! { static ref CHAIN: TestBeaconChain = TestBeaconChain::new_with_system_clock(); } fn get_attestation_service() -> AttestationService { let log = get_logger(); let config = NetworkConfig::default(); let beacon_chain = CHAIN.chain.clone(); AttestationService::new(beacon_chain, &config, &log) } fn get_subscription( validator_index: u64, attestation_committee_index: CommitteeIndex, slot: Slot, committee_count_at_slot: u64, ) -> ValidatorSubscription { let is_aggregator = true; ValidatorSubscription { validator_index, attestation_committee_index, slot, committee_count_at_slot, is_aggregator, } } fn get_subscriptions( validator_count: u64, slot: Slot, committee_count_at_slot: u64, ) -> Vec { (0..validator_count) .map(|validator_index| { get_subscription( validator_index, validator_index, slot, committee_count_at_slot, ) }) .collect() } // gets a number of events from the subscription service, or returns none if it times out after a number // of slots async fn get_events + Unpin>( stream: &mut S, num_events: Option, num_slots_before_timeout: u32, ) -> Vec { let mut events = Vec::new(); let collect_stream_fut = async { loop { if let Some(result) = stream.next().await { events.push(result); if let Some(num) = num_events { if events.len() == num { return; } } } } }; tokio::select! { _ = collect_stream_fut => {return events} _ = tokio::time::sleep( Duration::from_millis(SLOT_DURATION_MILLIS) * num_slots_before_timeout, ) => { return events; } } } #[tokio::test] async fn subscribe_current_slot_wait_for_unsubscribe() { // subscription config let validator_index = 1; let committee_index = 1; // Keep a low subscription slot so that there are no additional subnet discovery events. let subscription_slot = 0; let committee_count = 1; // create the attestation service and subscriptions let mut attestation_service = get_attestation_service(); let current_slot = attestation_service .beacon_chain .slot_clock .now() .expect("Could not get current slot"); let subscriptions = vec![get_subscription( validator_index, committee_index, current_slot + Slot::new(subscription_slot), committee_count, )]; // submit the subscriptions attestation_service .validator_subscriptions(subscriptions) .unwrap(); // not enough time for peer discovery, just subscribe, unsubscribe let subnet_id = SubnetId::compute_subnet::( current_slot + Slot::new(subscription_slot), committee_index, committee_count, &attestation_service.beacon_chain.spec, ) .unwrap(); let expected = vec![ AttServiceMessage::Subscribe(subnet_id), AttServiceMessage::Unsubscribe(subnet_id), ]; // Wait for 1 slot duration to get the unsubscription event let events = get_events(&mut attestation_service, None, 1).await; assert_matches!( events[..3], [ AttServiceMessage::DiscoverPeers(_), AttServiceMessage::Subscribe(_any1), AttServiceMessage::EnrAdd(_any3) ] ); // If the long lived and short lived subnets are the same, there should be no more events // as we don't resubscribe already subscribed subnets. if !attestation_service.random_subnets.contains(&subnet_id) { assert_eq!(expected[..], events[3..]); } // Should be subscribed to only 1 long lived subnet after unsubscription. assert_eq!(attestation_service.subscription_count(), 1); } /// Test to verify that we are not unsubscribing to a subnet before a required subscription. #[tokio::test] async fn test_same_subnet_unsubscription() { // subscription config let validator_index = 1; let committee_count = 1; // Makes 2 validator subscriptions to the same subnet but at different slots. // There should be just 1 unsubscription event for the later slot subscription (subscription_slot2). let subscription_slot1 = 0; let subscription_slot2 = 1; let com1 = 1; let com2 = 0; // create the attestation service and subscriptions let mut attestation_service = get_attestation_service(); let current_slot = attestation_service .beacon_chain .slot_clock .now() .expect("Could not get current slot"); let sub1 = get_subscription( validator_index, com1, current_slot + Slot::new(subscription_slot1), committee_count, ); let sub2 = get_subscription( validator_index, com2, current_slot + Slot::new(subscription_slot2), committee_count, ); let subnet_id1 = SubnetId::compute_subnet::( current_slot + Slot::new(subscription_slot1), com1, committee_count, &attestation_service.beacon_chain.spec, ) .unwrap(); let subnet_id2 = SubnetId::compute_subnet::( current_slot + Slot::new(subscription_slot2), com2, committee_count, &attestation_service.beacon_chain.spec, ) .unwrap(); // Assert that subscriptions are different but their subnet is the same assert_ne!(sub1, sub2); assert_eq!(subnet_id1, subnet_id2); // submit the subscriptions attestation_service .validator_subscriptions(vec![sub1, sub2]) .unwrap(); // Unsubscription event should happen at slot 2 (since subnet id's are the same, unsubscription event should be at higher slot + 1) // Get all events for 1 slot duration (unsubscription event should happen after 2 slot durations). let events = get_events(&mut attestation_service, None, 1).await; assert_matches!( events[..3], [ AttServiceMessage::DiscoverPeers(_), AttServiceMessage::Subscribe(_any1), AttServiceMessage::EnrAdd(_any3) ] ); let expected = AttServiceMessage::Subscribe(subnet_id1); // Should be still subscribed to 1 long lived and 1 short lived subnet if both are different. if !attestation_service.random_subnets.contains(&subnet_id1) { assert_eq!(expected, events[3]); assert_eq!(attestation_service.subscription_count(), 2); } else { assert_eq!(attestation_service.subscription_count(), 1); } // Get event for 1 more slot duration, we should get the unsubscribe event now. let unsubscribe_event = get_events(&mut attestation_service, None, 1).await; // If the long lived and short lived subnets are different, we should get an unsubscription event. if !attestation_service.random_subnets.contains(&subnet_id1) { assert_eq!( [AttServiceMessage::Unsubscribe(subnet_id1)], unsubscribe_event[..] ); } // Should be subscribed to only 1 long lived subnet after unsubscription. assert_eq!(attestation_service.subscription_count(), 1); } #[tokio::test] async fn subscribe_all_random_subnets() { let attestation_subnet_count = MinimalEthSpec::default_spec().attestation_subnet_count; let subscription_slot = 10; let subscription_count = attestation_subnet_count; let committee_count = 1; // create the attestation service and subscriptions let mut attestation_service = get_attestation_service(); let current_slot = attestation_service .beacon_chain .slot_clock .now() .expect("Could not get current slot"); let subscriptions = get_subscriptions( subscription_count, current_slot + subscription_slot, committee_count, ); // submit the subscriptions attestation_service .validator_subscriptions(subscriptions) .unwrap(); let events = get_events(&mut attestation_service, None, 3).await; let mut discover_peer_count = 0; let mut enr_add_count = 0; let mut unexpected_msg_count = 0; for event in &events { match event { AttServiceMessage::DiscoverPeers(_) => discover_peer_count += 1, AttServiceMessage::Subscribe(_any_subnet) => {} AttServiceMessage::EnrAdd(_any_subnet) => enr_add_count += 1, _ => unexpected_msg_count += 1, } } // The bulk discovery request length should be equal to validator_count let bulk_discovery_event = events.last().unwrap(); if let AttServiceMessage::DiscoverPeers(d) = bulk_discovery_event { assert_eq!(d.len(), attestation_subnet_count as usize); } else { panic!("Unexpected event {:?}", bulk_discovery_event); } // 64 `DiscoverPeer` requests of length 1 corresponding to random subnets // and 1 `DiscoverPeer` request corresponding to bulk subnet discovery. assert_eq!(discover_peer_count, subscription_count + 1); assert_eq!(attestation_service.subscription_count(), 64); assert_eq!(enr_add_count, 64); assert_eq!(unexpected_msg_count, 0); // test completed successfully } #[tokio::test] async fn subscribe_all_random_subnets_plus_one() { let attestation_subnet_count = MinimalEthSpec::default_spec().attestation_subnet_count; let subscription_slot = 10; // the 65th subscription should result in no more messages than the previous scenario let subscription_count = attestation_subnet_count + 1; let committee_count = 1; // create the attestation service and subscriptions let mut attestation_service = get_attestation_service(); let current_slot = attestation_service .beacon_chain .slot_clock .now() .expect("Could not get current slot"); let subscriptions = get_subscriptions( subscription_count, current_slot + subscription_slot, committee_count, ); // submit the subscriptions attestation_service .validator_subscriptions(subscriptions) .unwrap(); let events = get_events(&mut attestation_service, None, 3).await; let mut discover_peer_count = 0; let mut enr_add_count = 0; let mut unexpected_msg_count = 0; for event in &events { match event { AttServiceMessage::DiscoverPeers(_) => discover_peer_count += 1, AttServiceMessage::Subscribe(_any_subnet) => {} AttServiceMessage::EnrAdd(_any_subnet) => enr_add_count += 1, _ => unexpected_msg_count += 1, } } // The bulk discovery request length shouldn't exceed max attestation_subnet_count let bulk_discovery_event = events.last().unwrap(); if let AttServiceMessage::DiscoverPeers(d) = bulk_discovery_event { assert_eq!(d.len(), attestation_subnet_count as usize); } else { panic!("Unexpected event {:?}", bulk_discovery_event); } // 64 `DiscoverPeer` requests of length 1 corresponding to random subnets // and 1 `DiscoverPeer` request corresponding to the bulk subnet discovery. // For the 65th subscription, the call to `subscribe_to_random_subnets` is not made because we are at capacity. assert_eq!(discover_peer_count, 64 + 1); assert_eq!(attestation_service.subscription_count(), 64); assert_eq!(enr_add_count, 64); assert_eq!(unexpected_msg_count, 0); }