Altair networking (#2300)

## Issue Addressed

Resolves #2278 

## Proposed Changes

Implements the networking components for the Altair hard fork https://github.com/ethereum/eth2.0-specs/blob/dev/specs/altair/p2p-interface.md

## Additional Info

This PR acts as the base branch for networking changes and tracks https://github.com/sigp/lighthouse/pull/2279 . Changes to gossip, rpc and discovery can be separate PRs to be merged here for ease of review.

Co-authored-by: realbigsean <seananderson33@gmail.com>
This commit is contained in:
Pawan Dhananjay
2021-08-04 01:44:57 +00:00
parent 6a620a31da
commit e8c0d1f19b
51 changed files with 4038 additions and 1354 deletions

View File

@@ -1,429 +0,0 @@
use super::*;
use beacon_chain::{
builder::{BeaconChainBuilder, Witness},
eth1_chain::CachingEth1Backend,
};
use futures::Stream;
use genesis::{generate_deterministic_keypairs, interop_genesis_state};
use lazy_static::lazy_static;
use matches::assert_matches;
use slog::Logger;
use sloggers::{null::NullLoggerBuilder, Build};
use slot_clock::{SlotClock, SystemTimeSlotClock};
use std::time::{Duration, SystemTime};
use store::config::StoreConfig;
use store::{HotColdDB, MemoryStore};
use types::{CommitteeIndex, EthSpec, MinimalEthSpec};
const SLOT_DURATION_MILLIS: u64 = 400;
type TestBeaconChainType = Witness<
SystemTimeSlotClock,
CachingEth1Backend<MinimalEthSpec>,
MinimalEthSpec,
MemoryStore<MinimalEthSpec>,
MemoryStore<MinimalEthSpec>,
>;
pub struct TestBeaconChain {
chain: Arc<BeaconChain<TestBeaconChainType>>,
}
impl TestBeaconChain {
pub fn new_with_system_clock() -> Self {
let spec = MinimalEthSpec::default_spec();
let keypairs = generate_deterministic_keypairs(1);
let log = get_logger();
let store =
HotColdDB::open_ephemeral(StoreConfig::default(), spec.clone(), log.clone()).unwrap();
let (shutdown_tx, _) = futures::channel::mpsc::channel(1);
let chain = Arc::new(
BeaconChainBuilder::new(MinimalEthSpec)
.logger(log.clone())
.custom_spec(spec.clone())
.store(Arc::new(store))
.genesis_state(
interop_genesis_state::<MinimalEthSpec>(&keypairs, 0, &spec)
.expect("should generate interop state"),
)
.expect("should build state using recent genesis")
.dummy_eth1_backend()
.expect("should build dummy backend")
.slot_clock(SystemTimeSlotClock::new(
Slot::new(0),
Duration::from_secs(recent_genesis_time()),
Duration::from_millis(SLOT_DURATION_MILLIS),
))
.shutdown_sender(shutdown_tx)
.monitor_validators(true, vec![], log)
.build()
.expect("should build"),
);
Self { chain }
}
}
pub fn recent_genesis_time() -> u64 {
SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.unwrap()
.as_secs()
}
fn get_logger() -> Logger {
NullLoggerBuilder.build().expect("logger should build")
}
lazy_static! {
static ref CHAIN: TestBeaconChain = TestBeaconChain::new_with_system_clock();
}
fn get_attestation_service() -> AttestationService<TestBeaconChainType> {
let log = get_logger();
let config = NetworkConfig::default();
let beacon_chain = CHAIN.chain.clone();
AttestationService::new(beacon_chain, &config, &log)
}
fn get_subscription(
validator_index: u64,
attestation_committee_index: CommitteeIndex,
slot: Slot,
committee_count_at_slot: u64,
) -> ValidatorSubscription {
let is_aggregator = true;
ValidatorSubscription {
validator_index,
attestation_committee_index,
slot,
committee_count_at_slot,
is_aggregator,
}
}
fn get_subscriptions(
validator_count: u64,
slot: Slot,
committee_count_at_slot: u64,
) -> Vec<ValidatorSubscription> {
(0..validator_count)
.map(|validator_index| {
get_subscription(
validator_index,
validator_index,
slot,
committee_count_at_slot,
)
})
.collect()
}
// gets a number of events from the subscription service, or returns none if it times out after a number
// of slots
async fn get_events<S: Stream<Item = AttServiceMessage> + Unpin>(
stream: &mut S,
num_events: Option<usize>,
num_slots_before_timeout: u32,
) -> Vec<AttServiceMessage> {
let mut events = Vec::new();
let collect_stream_fut = async {
loop {
if let Some(result) = stream.next().await {
events.push(result);
if let Some(num) = num_events {
if events.len() == num {
return;
}
}
}
}
};
tokio::select! {
_ = collect_stream_fut => {events}
_ = tokio::time::sleep(
Duration::from_millis(SLOT_DURATION_MILLIS) * num_slots_before_timeout,
) => { events }
}
}
#[tokio::test]
async fn subscribe_current_slot_wait_for_unsubscribe() {
// subscription config
let validator_index = 1;
let committee_index = 1;
// Keep a low subscription slot so that there are no additional subnet discovery events.
let subscription_slot = 0;
let committee_count = 1;
// create the attestation service and subscriptions
let mut attestation_service = get_attestation_service();
let current_slot = attestation_service
.beacon_chain
.slot_clock
.now()
.expect("Could not get current slot");
let subscriptions = vec![get_subscription(
validator_index,
committee_index,
current_slot + Slot::new(subscription_slot),
committee_count,
)];
// submit the subscriptions
attestation_service
.validator_subscriptions(subscriptions)
.unwrap();
// not enough time for peer discovery, just subscribe, unsubscribe
let subnet_id = SubnetId::compute_subnet::<MinimalEthSpec>(
current_slot + Slot::new(subscription_slot),
committee_index,
committee_count,
&attestation_service.beacon_chain.spec,
)
.unwrap();
let expected = vec![
AttServiceMessage::Subscribe(subnet_id),
AttServiceMessage::Unsubscribe(subnet_id),
];
// Wait for 1 slot duration to get the unsubscription event
let events = get_events(&mut attestation_service, None, 1).await;
assert_matches!(
events[..3],
[
AttServiceMessage::DiscoverPeers(_),
AttServiceMessage::Subscribe(_any1),
AttServiceMessage::EnrAdd(_any3)
]
);
// If the long lived and short lived subnets are the same, there should be no more events
// as we don't resubscribe already subscribed subnets.
if !attestation_service.random_subnets.contains(&subnet_id) {
assert_eq!(expected[..], events[3..]);
}
// Should be subscribed to only 1 long lived subnet after unsubscription.
assert_eq!(attestation_service.subscription_count(), 1);
}
/// Test to verify that we are not unsubscribing to a subnet before a required subscription.
#[tokio::test]
async fn test_same_subnet_unsubscription() {
// subscription config
let validator_index = 1;
let committee_count = 1;
// Makes 2 validator subscriptions to the same subnet but at different slots.
// There should be just 1 unsubscription event for the later slot subscription (subscription_slot2).
let subscription_slot1 = 0;
let subscription_slot2 = 1;
let com1 = 1;
let com2 = 0;
// create the attestation service and subscriptions
let mut attestation_service = get_attestation_service();
let current_slot = attestation_service
.beacon_chain
.slot_clock
.now()
.expect("Could not get current slot");
let sub1 = get_subscription(
validator_index,
com1,
current_slot + Slot::new(subscription_slot1),
committee_count,
);
let sub2 = get_subscription(
validator_index,
com2,
current_slot + Slot::new(subscription_slot2),
committee_count,
);
let subnet_id1 = SubnetId::compute_subnet::<MinimalEthSpec>(
current_slot + Slot::new(subscription_slot1),
com1,
committee_count,
&attestation_service.beacon_chain.spec,
)
.unwrap();
let subnet_id2 = SubnetId::compute_subnet::<MinimalEthSpec>(
current_slot + Slot::new(subscription_slot2),
com2,
committee_count,
&attestation_service.beacon_chain.spec,
)
.unwrap();
// Assert that subscriptions are different but their subnet is the same
assert_ne!(sub1, sub2);
assert_eq!(subnet_id1, subnet_id2);
// submit the subscriptions
attestation_service
.validator_subscriptions(vec![sub1, sub2])
.unwrap();
// Unsubscription event should happen at slot 2 (since subnet id's are the same, unsubscription event should be at higher slot + 1)
// Get all events for 1 slot duration (unsubscription event should happen after 2 slot durations).
let events = get_events(&mut attestation_service, None, 1).await;
assert_matches!(
events[..3],
[
AttServiceMessage::DiscoverPeers(_),
AttServiceMessage::Subscribe(_any1),
AttServiceMessage::EnrAdd(_any3)
]
);
let expected = AttServiceMessage::Subscribe(subnet_id1);
// Should be still subscribed to 1 long lived and 1 short lived subnet if both are different.
if !attestation_service.random_subnets.contains(&subnet_id1) {
assert_eq!(expected, events[3]);
assert_eq!(attestation_service.subscription_count(), 2);
} else {
assert_eq!(attestation_service.subscription_count(), 1);
}
// Get event for 1 more slot duration, we should get the unsubscribe event now.
let unsubscribe_event = get_events(&mut attestation_service, None, 1).await;
// If the long lived and short lived subnets are different, we should get an unsubscription event.
if !attestation_service.random_subnets.contains(&subnet_id1) {
assert_eq!(
[AttServiceMessage::Unsubscribe(subnet_id1)],
unsubscribe_event[..]
);
}
// Should be subscribed to only 1 long lived subnet after unsubscription.
assert_eq!(attestation_service.subscription_count(), 1);
}
#[tokio::test]
async fn subscribe_all_random_subnets() {
let attestation_subnet_count = MinimalEthSpec::default_spec().attestation_subnet_count;
let subscription_slot = 10;
let subscription_count = attestation_subnet_count;
let committee_count = 1;
// create the attestation service and subscriptions
let mut attestation_service = get_attestation_service();
let current_slot = attestation_service
.beacon_chain
.slot_clock
.now()
.expect("Could not get current slot");
let subscriptions = get_subscriptions(
subscription_count,
current_slot + subscription_slot,
committee_count,
);
// submit the subscriptions
attestation_service
.validator_subscriptions(subscriptions)
.unwrap();
let events = get_events(&mut attestation_service, None, 3).await;
let mut discover_peer_count = 0;
let mut enr_add_count = 0;
let mut unexpected_msg_count = 0;
for event in &events {
match event {
AttServiceMessage::DiscoverPeers(_) => discover_peer_count += 1,
AttServiceMessage::Subscribe(_any_subnet) => {}
AttServiceMessage::EnrAdd(_any_subnet) => enr_add_count += 1,
_ => unexpected_msg_count += 1,
}
}
// The bulk discovery request length should be equal to validator_count
let bulk_discovery_event = events.last().unwrap();
if let AttServiceMessage::DiscoverPeers(d) = bulk_discovery_event {
assert_eq!(d.len(), attestation_subnet_count as usize);
} else {
panic!("Unexpected event {:?}", bulk_discovery_event);
}
// 64 `DiscoverPeer` requests of length 1 corresponding to random subnets
// and 1 `DiscoverPeer` request corresponding to bulk subnet discovery.
assert_eq!(discover_peer_count, subscription_count + 1);
assert_eq!(attestation_service.subscription_count(), 64);
assert_eq!(enr_add_count, 64);
assert_eq!(unexpected_msg_count, 0);
// test completed successfully
}
#[tokio::test]
async fn subscribe_all_random_subnets_plus_one() {
let attestation_subnet_count = MinimalEthSpec::default_spec().attestation_subnet_count;
let subscription_slot = 10;
// the 65th subscription should result in no more messages than the previous scenario
let subscription_count = attestation_subnet_count + 1;
let committee_count = 1;
// create the attestation service and subscriptions
let mut attestation_service = get_attestation_service();
let current_slot = attestation_service
.beacon_chain
.slot_clock
.now()
.expect("Could not get current slot");
let subscriptions = get_subscriptions(
subscription_count,
current_slot + subscription_slot,
committee_count,
);
// submit the subscriptions
attestation_service
.validator_subscriptions(subscriptions)
.unwrap();
let events = get_events(&mut attestation_service, None, 3).await;
let mut discover_peer_count = 0;
let mut enr_add_count = 0;
let mut unexpected_msg_count = 0;
for event in &events {
match event {
AttServiceMessage::DiscoverPeers(_) => discover_peer_count += 1,
AttServiceMessage::Subscribe(_any_subnet) => {}
AttServiceMessage::EnrAdd(_any_subnet) => enr_add_count += 1,
_ => unexpected_msg_count += 1,
}
}
// The bulk discovery request length shouldn't exceed max attestation_subnet_count
let bulk_discovery_event = events.last().unwrap();
if let AttServiceMessage::DiscoverPeers(d) = bulk_discovery_event {
assert_eq!(d.len(), attestation_subnet_count as usize);
} else {
panic!("Unexpected event {:?}", bulk_discovery_event);
}
// 64 `DiscoverPeer` requests of length 1 corresponding to random subnets
// and 1 `DiscoverPeer` request corresponding to the bulk subnet discovery.
// For the 65th subscription, the call to `subscribe_to_random_subnets` is not made because we are at capacity.
assert_eq!(discover_peer_count, 64 + 1);
assert_eq!(attestation_service.subscription_count(), 64);
assert_eq!(enr_add_count, 64);
assert_eq!(unexpected_msg_count, 0);
}

View File

@@ -57,7 +57,8 @@ use task_executor::TaskExecutor;
use tokio::sync::{mpsc, oneshot};
use types::{
Attestation, AttesterSlashing, Hash256, ProposerSlashing, SignedAggregateAndProof,
SignedBeaconBlock, SignedVoluntaryExit, SubnetId,
SignedBeaconBlock, SignedContributionAndProof, SignedVoluntaryExit, SubnetId,
SyncCommitteeMessage, SyncSubnetId,
};
use work_reprocessing_queue::{
spawn_reprocess_scheduler, QueuedAggregate, QueuedBlock, QueuedUnaggregate, ReadyWork,
@@ -121,6 +122,14 @@ const MAX_GOSSIP_PROPOSER_SLASHING_QUEUE_LEN: usize = 4_096;
/// before we start dropping them.
const MAX_GOSSIP_ATTESTER_SLASHING_QUEUE_LEN: usize = 4_096;
/// The maximum number of queued `SyncCommitteeMessage` objects that will be stored before we start dropping
/// them.
const MAX_SYNC_MESSAGE_QUEUE_LEN: usize = 2048;
/// The maximum number of queued `SignedContributionAndProof` objects that will be stored before we
/// start dropping them.
const MAX_SYNC_CONTRIBUTION_QUEUE_LEN: usize = 1024;
/// The maximum number of queued `SignedBeaconBlock` objects received from the network RPC that
/// will be stored before we start dropping them.
const MAX_RPC_BLOCK_QUEUE_LEN: usize = 1_024;
@@ -160,6 +169,8 @@ pub const DELAYED_IMPORT_BLOCK: &str = "delayed_import_block";
pub const GOSSIP_VOLUNTARY_EXIT: &str = "gossip_voluntary_exit";
pub const GOSSIP_PROPOSER_SLASHING: &str = "gossip_proposer_slashing";
pub const GOSSIP_ATTESTER_SLASHING: &str = "gossip_attester_slashing";
pub const GOSSIP_SYNC_SIGNATURE: &str = "gossip_sync_signature";
pub const GOSSIP_SYNC_CONTRIBUTION: &str = "gossip_sync_contribution";
pub const RPC_BLOCK: &str = "rpc_block";
pub const CHAIN_SEGMENT: &str = "chain_segment";
pub const STATUS_PROCESSING: &str = "status_processing";
@@ -327,6 +338,44 @@ impl<T: BeaconChainTypes> WorkEvent<T> {
}
}
/// Create a new `Work` event for some sync committee signature.
pub fn gossip_sync_signature(
message_id: MessageId,
peer_id: PeerId,
sync_signature: SyncCommitteeMessage,
subnet_id: SyncSubnetId,
seen_timestamp: Duration,
) -> Self {
Self {
drop_during_sync: true,
work: Work::GossipSyncSignature {
message_id,
peer_id,
sync_signature: Box::new(sync_signature),
subnet_id,
seen_timestamp,
},
}
}
/// Create a new `Work` event for some sync committee contribution.
pub fn gossip_sync_contribution(
message_id: MessageId,
peer_id: PeerId,
sync_contribution: SignedContributionAndProof<T::EthSpec>,
seen_timestamp: Duration,
) -> Self {
Self {
drop_during_sync: true,
work: Work::GossipSyncContribution {
message_id,
peer_id,
sync_contribution: Box::new(sync_contribution),
seen_timestamp,
},
}
}
/// Create a new `Work` event for some exit.
pub fn gossip_voluntary_exit(
message_id: MessageId,
@@ -553,6 +602,19 @@ pub enum Work<T: BeaconChainTypes> {
peer_id: PeerId,
attester_slashing: Box<AttesterSlashing<T::EthSpec>>,
},
GossipSyncSignature {
message_id: MessageId,
peer_id: PeerId,
sync_signature: Box<SyncCommitteeMessage>,
subnet_id: SyncSubnetId,
seen_timestamp: Duration,
},
GossipSyncContribution {
message_id: MessageId,
peer_id: PeerId,
sync_contribution: Box<SignedContributionAndProof<T::EthSpec>>,
seen_timestamp: Duration,
},
RpcBlock {
block: Box<SignedBeaconBlock<T::EthSpec>>,
result_tx: BlockResultSender<T::EthSpec>,
@@ -588,6 +650,8 @@ impl<T: BeaconChainTypes> Work<T> {
Work::GossipVoluntaryExit { .. } => GOSSIP_VOLUNTARY_EXIT,
Work::GossipProposerSlashing { .. } => GOSSIP_PROPOSER_SLASHING,
Work::GossipAttesterSlashing { .. } => GOSSIP_ATTESTER_SLASHING,
Work::GossipSyncSignature { .. } => GOSSIP_SYNC_SIGNATURE,
Work::GossipSyncContribution { .. } => GOSSIP_SYNC_CONTRIBUTION,
Work::RpcBlock { .. } => RPC_BLOCK,
Work::ChainSegment { .. } => CHAIN_SEGMENT,
Work::Status { .. } => STATUS_PROCESSING,
@@ -730,6 +794,9 @@ impl<T: BeaconChainTypes> BeaconProcessor<T> {
let mut unknown_block_attestation_queue =
LifoQueue::new(MAX_UNAGGREGATED_ATTESTATION_REPROCESS_QUEUE_LEN);
let mut sync_message_queue = LifoQueue::new(MAX_SYNC_MESSAGE_QUEUE_LEN);
let mut sync_contribution_queue = LifoQueue::new(MAX_SYNC_CONTRIBUTION_QUEUE_LEN);
// Using a FIFO queue for voluntary exits since it prevents exit censoring. I don't have
// a strong feeling about queue type for exits.
let mut gossip_voluntary_exit_queue = FifoQueue::new(MAX_GOSSIP_EXIT_QUEUE_LEN);
@@ -859,6 +926,12 @@ impl<T: BeaconChainTypes> BeaconProcessor<T> {
self.spawn_worker(item, toolbox);
} else if let Some(item) = attestation_queue.pop() {
self.spawn_worker(item, toolbox);
// Check sync committee messages after attestations as their rewards are lesser
// and they don't influence fork choice.
} else if let Some(item) = sync_contribution_queue.pop() {
self.spawn_worker(item, toolbox);
} else if let Some(item) = sync_message_queue.pop() {
self.spawn_worker(item, toolbox);
// Aggregates and unaggregates queued for re-processing are older and we
// care about fresher ones, so check those first.
} else if let Some(item) = unknown_block_aggregate_queue.pop() {
@@ -952,6 +1025,10 @@ impl<T: BeaconChainTypes> BeaconProcessor<T> {
Work::GossipAttesterSlashing { .. } => {
gossip_attester_slashing_queue.push(work, work_id, &self.log)
}
Work::GossipSyncSignature { .. } => sync_message_queue.push(work),
Work::GossipSyncContribution { .. } => {
sync_contribution_queue.push(work)
}
Work::RpcBlock { .. } => rpc_block_queue.push(work, work_id, &self.log),
Work::ChainSegment { .. } => {
chain_segment_queue.push(work, work_id, &self.log)
@@ -985,6 +1062,14 @@ impl<T: BeaconChainTypes> BeaconProcessor<T> {
&metrics::BEACON_PROCESSOR_AGGREGATED_ATTESTATION_QUEUE_TOTAL,
aggregate_queue.len() as i64,
);
metrics::set_gauge(
&metrics::BEACON_PROCESSOR_SYNC_MESSAGE_QUEUE_TOTAL,
sync_message_queue.len() as i64,
);
metrics::set_gauge(
&metrics::BEACON_PROCESSOR_SYNC_CONTRIBUTION_QUEUE_TOTAL,
sync_contribution_queue.len() as i64,
);
metrics::set_gauge(
&metrics::BEACON_PROCESSOR_GOSSIP_BLOCK_QUEUE_TOTAL,
gossip_block_queue.len() as i64,
@@ -1188,6 +1273,36 @@ impl<T: BeaconChainTypes> BeaconProcessor<T> {
peer_id,
*attester_slashing,
),
/*
* Sync committee message verification.
*/
Work::GossipSyncSignature {
message_id,
peer_id,
sync_signature,
subnet_id,
seen_timestamp,
} => worker.process_gossip_sync_committee_signature(
message_id,
peer_id,
*sync_signature,
subnet_id,
seen_timestamp,
),
/*
* Syn contribution verification.
*/
Work::GossipSyncContribution {
message_id,
peer_id,
sync_contribution,
seen_timestamp,
} => worker.process_sync_committee_contribution(
message_id,
peer_id,
*sync_contribution,
seen_timestamp,
),
/*
* Verification for beacon blocks received during syncing via RPC.
*/

View File

@@ -9,8 +9,12 @@ use beacon_chain::test_utils::{
};
use beacon_chain::{BeaconChain, MAXIMUM_GOSSIP_CLOCK_DISPARITY};
use environment::{null_logger, Environment, EnvironmentBuilder};
use eth2_libp2p::discv5::enr::{CombinedKey, EnrBuilder};
use eth2_libp2p::{rpc::methods::MetaData, types::EnrBitfield, MessageId, NetworkGlobals, PeerId};
use eth2_libp2p::{
discv5::enr::{CombinedKey, EnrBuilder},
rpc::methods::{MetaData, MetaDataV2},
types::{EnrAttestationBitfield, EnrSyncCommitteeBitfield},
MessageId, NetworkGlobals, PeerId,
};
use slot_clock::SlotClock;
use std::cmp;
use std::iter::Iterator;
@@ -163,10 +167,11 @@ impl TestRig {
let (sync_tx, _sync_rx) = mpsc::unbounded_channel();
// Default metadata
let meta_data = MetaData {
let meta_data = MetaData::V2(MetaDataV2 {
seq_number: SEQ_NUMBER,
attnets: EnrBitfield::<MainnetEthSpec>::default(),
};
attnets: EnrAttestationBitfield::<MainnetEthSpec>::default(),
syncnets: EnrSyncCommitteeBitfield::<MainnetEthSpec>::default(),
});
let enr_key = CombinedKey::generate_secp256k1();
let enr = EnrBuilder::new("v4").build(&enr_key).unwrap();
let network_globals = Arc::new(NetworkGlobals::new(

View File

@@ -3,6 +3,7 @@ use crate::{metrics, service::NetworkMessage, sync::SyncMessage};
use beacon_chain::{
attestation_verification::{Error as AttnError, SignatureVerifiedAttestation},
observed_operations::ObservationOutcome,
sync_committee_verification::Error as SyncCommitteeError,
validator_monitor::get_block_delay_ms,
BeaconChainError, BeaconChainTypes, BlockError, ForkChoiceError, GossipVerifiedBlock,
};
@@ -14,7 +15,8 @@ use std::time::{Duration, SystemTime, UNIX_EPOCH};
use tokio::sync::mpsc;
use types::{
Attestation, AttesterSlashing, EthSpec, Hash256, ProposerSlashing, SignedAggregateAndProof,
SignedBeaconBlock, SignedVoluntaryExit, SubnetId,
SignedBeaconBlock, SignedContributionAndProof, SignedVoluntaryExit, SubnetId,
SyncCommitteeMessage, SyncSubnetId,
};
use super::{
@@ -688,6 +690,131 @@ impl<T: BeaconChainTypes> Worker<T> {
}
}
/// Process the sync committee signature received from the gossip network and:
///
/// - If it passes gossip propagation criteria, tell the network thread to forward it.
/// - Attempt to add it to the naive aggregation pool.
///
/// Raises a log if there are errors.
pub fn process_gossip_sync_committee_signature(
self,
message_id: MessageId,
peer_id: PeerId,
sync_signature: SyncCommitteeMessage,
subnet_id: SyncSubnetId,
_seen_timestamp: Duration,
) {
let sync_signature = match self
.chain
.verify_sync_committee_message_for_gossip(sync_signature, subnet_id)
{
Ok(sync_signature) => sync_signature,
Err(e) => {
self.handle_sync_committee_message_failure(
peer_id,
message_id,
"sync_signature",
e,
);
return;
}
};
/*TODO:
// Register the sync signature with any monitored validators.
self.chain
.validator_monitor
.read()
.register_gossip_unaggregated_attestation(
seen_timestamp,
attestation.indexed_attestation(),
&self.chain.slot_clock,
);
*/
// Indicate to the `Network` service that this message is valid and can be
// propagated on the gossip network.
self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Accept);
metrics::inc_counter(&metrics::BEACON_PROCESSOR_SYNC_MESSAGE_VERIFIED_TOTAL);
if let Err(e) = self
.chain
.add_to_naive_sync_aggregation_pool(sync_signature)
{
debug!(
self.log,
"Sync committee signature invalid for agg pool";
"reason" => ?e,
"peer" => %peer_id,
)
}
metrics::inc_counter(&metrics::BEACON_PROCESSOR_SYNC_MESSAGE_IMPORTED_TOTAL);
}
/// Process the sync committee contribution received from the gossip network and:
///
/// - If it passes gossip propagation criteria, tell the network thread to forward it.
/// - Attempt to add it to the block inclusion pool.
///
/// Raises a log if there are errors.
pub fn process_sync_committee_contribution(
self,
message_id: MessageId,
peer_id: PeerId,
sync_contribution: SignedContributionAndProof<T::EthSpec>,
_seen_timestamp: Duration,
) {
let sync_contribution = match self
.chain
.verify_sync_contribution_for_gossip(sync_contribution)
{
Ok(sync_contribution) => sync_contribution,
Err(e) => {
// Report the failure to gossipsub
self.handle_sync_committee_message_failure(
peer_id,
message_id,
"sync_contribution",
e,
);
return;
}
};
// Indicate to the `Network` service that this message is valid and can be
// propagated on the gossip network.
self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Accept);
/* TODO
// Register the attestation with any monitored validators.
self.chain
.validator_monitor
.read()
.register_gossip_aggregated_attestation(
seen_timestamp,
aggregate.aggregate(),
aggregate.indexed_attestation(),
&self.chain.slot_clock,
);
metrics::inc_counter(&metrics::BEACON_PROCESSOR_AGGREGATED_ATTESTATION_VERIFIED_TOTAL);
*/
if let Err(e) = self
.chain
.add_contribution_to_block_inclusion_pool(sync_contribution)
{
debug!(
self.log,
"Sync contribution invalid for op pool";
"reason" => ?e,
"peer" => %peer_id,
)
}
metrics::inc_counter(&metrics::BEACON_PROCESSOR_SYNC_CONTRIBUTION_IMPORTED_TOTAL);
}
/// Handle an error whilst verifying an `Attestation` or `SignedAggregateAndProof` from the
/// network.
fn handle_attestation_verification_failure(
@@ -740,8 +867,7 @@ impl<T: BeaconChainTypes> Worker<T> {
/*
* The aggregate had no signatures and is therefore worthless.
*
* Whilst we don't gossip this attestation, this act is **not** a clear
* violation of the spec nor indication of fault.
* This is forbidden by the p2p spec. Reject the message.
*
*/
self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Reject);
@@ -1079,4 +1205,242 @@ impl<T: BeaconChainTypes> Worker<T> {
"type" => ?attestation_type,
);
}
/// Handle an error whilst verifying a `SyncCommitteeMessage` or `SignedContributionAndProof` from the
/// network.
pub fn handle_sync_committee_message_failure(
&self,
peer_id: PeerId,
message_id: MessageId,
message_type: &str,
error: SyncCommitteeError,
) {
metrics::register_sync_committee_error(&error);
match &error {
SyncCommitteeError::FutureSlot { .. } | SyncCommitteeError::PastSlot { .. } => {
/*
* These errors can be triggered by a mismatch between our slot and the peer.
*
*
* The peer has published an invalid consensus message, _only_ if we trust our own clock.
*/
trace!(
self.log,
"Sync committee message is not within the last MAXIMUM_GOSSIP_CLOCK_DISPARITY slots";
"peer_id" => %peer_id,
"type" => ?message_type,
);
// Peers that are slow or not to spec can spam us with these messages draining our
// bandwidth. We therefore penalize these peers when they do this.
self.gossip_penalize_peer(peer_id, PeerAction::LowToleranceError);
// Do not propagate these messages.
self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Ignore);
}
SyncCommitteeError::EmptyAggregationBitfield => {
/*
* The aggregate had no signatures and is therefore worthless.
*
* This is forbidden by the p2p spec. Reject the message.
*
*/
self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Reject);
self.gossip_penalize_peer(peer_id, PeerAction::LowToleranceError);
}
SyncCommitteeError::InvalidSelectionProof { .. }
| SyncCommitteeError::InvalidSignature => {
/*
* These errors are caused by invalid signatures.
*
* The peer has published an invalid consensus message.
*/
self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Reject);
self.gossip_penalize_peer(peer_id, PeerAction::LowToleranceError);
}
SyncCommitteeError::AggregatorNotInCommittee { .. }
| SyncCommitteeError::AggregatorPubkeyUnknown(_) => {
/*
* The aggregator is not in the committee for the given `ContributionAndSync` OR
The aggregator index was higher than any known validator index
*
* The peer has published an invalid consensus message.
*/
self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Reject);
self.gossip_penalize_peer(peer_id, PeerAction::LowToleranceError);
}
SyncCommitteeError::SyncContributionAlreadyKnown(_)
| SyncCommitteeError::AggregatorAlreadyKnown(_) => {
/*
* The sync committee message already been observed on the network or in
* a block.
*
* The peer is not necessarily faulty.
*/
trace!(
self.log,
"Sync committee message is already known";
"peer_id" => %peer_id,
"type" => ?message_type,
);
self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Ignore);
return;
}
SyncCommitteeError::UnknownValidatorIndex(_) => {
/*
* The aggregator index (or similar field) was higher than the maximum
* possible number of validators.
*
* The peer has published an invalid consensus message.
*/
debug!(
self.log,
"Validation Index too high";
"peer_id" => %peer_id,
"type" => ?message_type,
);
self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Reject);
self.gossip_penalize_peer(peer_id, PeerAction::LowToleranceError);
}
SyncCommitteeError::UnknownValidatorPubkey(_) => {
debug!(
self.log,
"Validator pubkey is unknown";
"peer_id" => %peer_id,
"type" => ?message_type,
);
self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Reject);
self.gossip_penalize_peer(peer_id, PeerAction::LowToleranceError);
}
SyncCommitteeError::InvalidSubnetId { received, expected } => {
/*
* The sync committee message was received on an incorrect subnet id.
*/
debug!(
self.log,
"Received sync committee message on incorrect subnet";
"expected" => ?expected,
"received" => ?received,
);
self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Reject);
self.gossip_penalize_peer(peer_id, PeerAction::LowToleranceError);
}
SyncCommitteeError::Invalid(_) => {
/*
* The sync committee message failed the state_processing verification.
*
* The peer has published an invalid consensus message.
*/
self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Reject);
self.gossip_penalize_peer(peer_id, PeerAction::LowToleranceError);
}
SyncCommitteeError::PriorSyncCommitteeMessageKnown { .. } => {
/*
* We have already seen a sync committee message from this validator for this epoch.
*
* The peer is not necessarily faulty.
*/
debug!(
self.log,
"Prior sync committee message known";
"peer_id" => %peer_id,
"type" => ?message_type,
);
// We still penalize the peer slightly. We don't want this to be a recurring
// behaviour.
self.gossip_penalize_peer(peer_id, PeerAction::HighToleranceError);
self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Ignore);
return;
}
SyncCommitteeError::BeaconChainError(e) => {
/*
* Lighthouse hit an unexpected error whilst processing the sync committee message. It
* should be impossible to trigger a `BeaconChainError` from the network,
* so we have a bug.
*
* It's not clear if the message is invalid/malicious.
*/
error!(
self.log,
"Unable to validate sync committee message";
"peer_id" => %peer_id,
"error" => ?e,
);
self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Ignore);
// Penalize the peer slightly
self.gossip_penalize_peer(peer_id, PeerAction::HighToleranceError);
}
SyncCommitteeError::BeaconStateError(e) => {
/*
* Lighthouse hit an unexpected error whilst processing the sync committee message. It
* should be impossible to trigger a `BeaconStateError` from the network,
* so we have a bug.
*
* It's not clear if the message is invalid/malicious.
*/
error!(
self.log,
"Unable to validate sync committee message";
"peer_id" => %peer_id,
"error" => ?e,
);
self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Ignore);
// Penalize the peer slightly
self.gossip_penalize_peer(peer_id, PeerAction::HighToleranceError);
}
SyncCommitteeError::ContributionError(e) => {
error!(
self.log,
"Error while processing sync contribution";
"peer_id" => %peer_id,
"error" => ?e,
);
self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Ignore);
// Penalize the peer slightly
self.gossip_penalize_peer(peer_id, PeerAction::HighToleranceError);
}
SyncCommitteeError::SyncCommitteeError(e) => {
error!(
self.log,
"Error while processing sync committee message";
"peer_id" => %peer_id,
"error" => ?e,
);
self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Ignore);
// Penalize the peer slightly
self.gossip_penalize_peer(peer_id, PeerAction::HighToleranceError);
}
SyncCommitteeError::ArithError(e) => {
/*
This would most likely imply incompatible configs or an invalid message.
*/
error!(
self.log,
"Arithematic error while processing sync committee message";
"peer_id" => %peer_id,
"error" => ?e,
);
self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Ignore);
self.gossip_penalize_peer(peer_id, PeerAction::LowToleranceError);
}
SyncCommitteeError::InvalidSubcommittee { .. } => {
/*
The subcommittee index is higher than `SYNC_COMMITTEE_SUBNET_COUNT`. This would imply
an invalid message.
*/
self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Reject);
self.gossip_penalize_peer(peer_id, PeerAction::LowToleranceError);
}
}
debug!(
self.log,
"Invalid sync committee message from network";
"reason" => ?error,
"peer_id" => %peer_id,
"type" => ?message_type,
);
}
}

View File

@@ -6,7 +6,6 @@ pub mod error;
#[allow(clippy::mutable_key_type)] // PeerId in hashmaps are no longer permitted by clippy
pub mod service;
mod attestation_service;
mod beacon_processor;
#[allow(clippy::mutable_key_type)] // PeerId in hashmaps are no longer permitted by clippy
mod metrics;
@@ -14,6 +13,7 @@ mod nat;
mod persisted_dht;
mod router;
mod status;
mod subnet_service;
#[allow(clippy::mutable_key_type)] // PeerId in hashmaps are no longer permitted by clippy
mod sync;

View File

@@ -1,4 +1,7 @@
use beacon_chain::attestation_verification::Error as AttnError;
use beacon_chain::{
attestation_verification::Error as AttnError,
sync_committee_verification::Error as SyncCommitteeError,
};
use eth2_libp2p::PubsubMessage;
use eth2_libp2p::{
types::GossipKind, BandwidthSinks, GossipTopic, Gossipsub, NetworkGlobals, TopicHash,
@@ -7,7 +10,10 @@ use fnv::FnvHashMap;
pub use lighthouse_metrics::*;
use std::{collections::HashMap, sync::Arc};
use strum::AsStaticRef;
use types::{subnet_id::subnet_id_to_string, EthSpec};
use types::{
consts::altair::SYNC_COMMITTEE_SUBNET_COUNT, subnet_id::subnet_id_to_string,
sync_subnet_id::sync_subnet_id_to_string, EthSpec,
};
lazy_static! {
@@ -20,15 +26,27 @@ lazy_static! {
&["protocol"]
);
pub static ref GOSSIPSUB_SUBSCRIBED_SUBNET_TOPIC: Result<IntGaugeVec> = try_create_int_gauge_vec(
"gossipsub_subscribed_subnets",
"Subnets currently subscribed to",
pub static ref GOSSIPSUB_SUBSCRIBED_ATTESTATION_SUBNET_TOPIC: Result<IntGaugeVec> = try_create_int_gauge_vec(
"gossipsub_subscribed_attestation_subnets",
"Attestation subnets currently subscribed to",
&["subnet"]
);
pub static ref GOSSIPSUB_SUBSCRIBED_PEERS_SUBNET_TOPIC: Result<IntGaugeVec> = try_create_int_gauge_vec(
"gossipsub_peers_per_subnet_topic_count",
"Peers subscribed per subnet topic",
pub static ref GOSSIPSUB_SUBSCRIBED_SYNC_SUBNET_TOPIC: Result<IntGaugeVec> = try_create_int_gauge_vec(
"gossipsub_subscribed_sync_subnets",
"Sync subnets currently subscribed to",
&["subnet"]
);
pub static ref GOSSIPSUB_SUBSCRIBED_PEERS_ATTESTATION_SUBNET_TOPIC: Result<IntGaugeVec> = try_create_int_gauge_vec(
"gossipsub_peers_per_attestation_subnet_topic_count",
"Peers subscribed per attestation subnet topic",
&["subnet"]
);
pub static ref GOSSIPSUB_SUBSCRIBED_PEERS_SYNC_SUBNET_TOPIC: Result<IntGaugeVec> = try_create_int_gauge_vec(
"gossipsub_peers_per_sync_subnet_topic_count",
"Peers subscribed per sync subnet topic",
&["subnet"]
);
@@ -38,7 +56,13 @@ lazy_static! {
&["topic_hash"]
);
pub static ref MESH_PEERS_PER_SUBNET_TOPIC: Result<IntGaugeVec> = try_create_int_gauge_vec(
pub static ref MESH_PEERS_PER_ATTESTATION_SUBNET_TOPIC: Result<IntGaugeVec> = try_create_int_gauge_vec(
"gossipsub_mesh_peers_per_subnet_topic",
"Mesh peers per subnet topic",
&["subnet"]
);
pub static ref MESH_PEERS_PER_SYNC_SUBNET_TOPIC: Result<IntGaugeVec> = try_create_int_gauge_vec(
"gossipsub_mesh_peers_per_subnet_topic",
"Mesh peers per subnet topic",
&["subnet"]
@@ -50,9 +74,15 @@ lazy_static! {
&["topic_hash"]
);
pub static ref AVG_GOSSIPSUB_PEER_SCORE_PER_SUBNET_TOPIC: Result<GaugeVec> = try_create_float_gauge_vec(
"gossipsub_avg_peer_score_per_subnet_topic",
"Average peer's score per subnet topic",
pub static ref AVG_GOSSIPSUB_PEER_SCORE_PER_ATTESTATION_SUBNET_TOPIC: Result<GaugeVec> = try_create_float_gauge_vec(
"gossipsub_avg_peer_score_per_attestation_subnet_topic",
"Average peer's score per attestation subnet topic",
&["subnet"]
);
pub static ref AVG_GOSSIPSUB_PEER_SCORE_PER_SYNC_SUBNET_TOPIC: Result<GaugeVec> = try_create_float_gauge_vec(
"gossipsub_avg_peer_score_per_sync_subnet_topic",
"Average peer's score per sync committee subnet topic",
&["subnet"]
);
@@ -133,6 +163,14 @@ lazy_static! {
"gossipsub_aggregated_attestations_rx_total",
"Count of gossip aggregated attestations received"
);
pub static ref GOSSIP_SYNC_COMMITTEE_MESSAGE_RX: Result<IntCounter> = try_create_int_counter(
"gossipsub_sync_committee_message_rx_total",
"Count of gossip sync committee messages received"
);
pub static ref GOSSIP_SYNC_COMMITTEE_CONTRIBUTION_RX: Result<IntCounter> = try_create_int_counter(
"gossipsub_sync_committee_contribution_received_total",
"Count of gossip sync committee contributions received"
);
/*
@@ -150,19 +188,35 @@ lazy_static! {
"gossipsub_aggregated_attestations_tx_total",
"Count of gossip aggregated attestations transmitted"
);
pub static ref GOSSIP_SYNC_COMMITTEE_MESSAGE_TX: Result<IntCounter> = try_create_int_counter(
"gossipsub_sync_committee_message_tx_total",
"Count of gossip sync committee messages transmitted"
);
pub static ref GOSSIP_SYNC_COMMITTEE_CONTRIBUTION_TX: Result<IntCounter> = try_create_int_counter(
"gossipsub_sync_committee_contribution_tx_total",
"Count of gossip sync committee contributions transmitted"
);
/*
* Attestation subnet subscriptions
*/
pub static ref SUBNET_SUBSCRIPTION_REQUESTS: Result<IntCounter> = try_create_int_counter(
"gossipsub_subnet_subscriptions_total",
"Count of validator subscription requests."
"gossipsub_attestation_subnet_subscriptions_total",
"Count of validator attestation subscription requests."
);
pub static ref SUBNET_SUBSCRIPTION_AGGREGATOR_REQUESTS: Result<IntCounter> = try_create_int_counter(
"gossipsub_subnet_subscriptions_aggregator_total",
"Count of validator subscription requests where the subscriber is an aggregator."
);
/*
* Sync committee subnet subscriptions
*/
pub static ref SYNC_COMMITTEE_SUBSCRIPTION_REQUESTS: Result<IntCounter> = try_create_int_counter(
"gossipsub_sync_committee_subnet_subscriptions_total",
"Count of validator sync committee subscription requests."
);
/*
* Gossip processor
*/
@@ -322,6 +376,33 @@ lazy_static! {
"beacon_processor_aggregated_attestation_requeued_total",
"Total number of aggregated attestations that referenced an unknown block and were re-queued."
);
// Sync committee messages.
pub static ref BEACON_PROCESSOR_SYNC_MESSAGE_QUEUE_TOTAL: Result<IntGauge> = try_create_int_gauge(
"beacon_processor_sync_message_queue_total",
"Count of sync committee messages waiting to be processed."
);
pub static ref BEACON_PROCESSOR_SYNC_MESSAGE_VERIFIED_TOTAL: Result<IntCounter> = try_create_int_counter(
"beacon_processor_sync_message_verified_total",
"Total number of sync committee messages verified for gossip."
);
pub static ref BEACON_PROCESSOR_SYNC_MESSAGE_IMPORTED_TOTAL: Result<IntCounter> = try_create_int_counter(
"beacon_processor_sync_message_imported_total",
"Total number of sync committee messages imported to fork choice, etc."
);
// Sync contribution.
pub static ref BEACON_PROCESSOR_SYNC_CONTRIBUTION_QUEUE_TOTAL: Result<IntGauge> = try_create_int_gauge(
"beacon_processor_sync_contribution_queue_total",
"Count of sync committee contributions waiting to be processed."
);
pub static ref BEACON_PROCESSOR_SYNC_CONTRIBUTION_VERIFIED_TOTAL: Result<IntCounter> = try_create_int_counter(
"beacon_processor_sync_contribution_verified_total",
"Total number of sync committee contributions verified for gossip."
);
pub static ref BEACON_PROCESSOR_SYNC_CONTRIBUTION_IMPORTED_TOTAL: Result<IntCounter> = try_create_int_counter(
"beacon_processor_sync_contribution_imported_total",
"Total number of sync committee contributions imported to fork choice, etc."
);
}
lazy_static! {
@@ -331,6 +412,12 @@ lazy_static! {
"Gossipsub attestation errors per error type",
&["type"]
);
pub static ref GOSSIP_SYNC_COMMITTEE_ERRORS_PER_TYPE: Result<IntCounterVec> =
try_create_int_counter_vec(
"gossipsub_sync_committee_errors_per_type",
"Gossipsub sync_committee errors per error type",
&["type"]
);
pub static ref INBOUND_LIBP2P_BYTES: Result<IntGauge> =
try_create_int_gauge("libp2p_inbound_bytes", "The inbound bandwidth over libp2p");
pub static ref OUTBOUND_LIBP2P_BYTES: Result<IntGauge> = try_create_int_gauge(
@@ -402,6 +489,10 @@ pub fn register_attestation_error(error: &AttnError) {
inc_counter_vec(&GOSSIP_ATTESTATION_ERRORS_PER_TYPE, &[error.as_ref()]);
}
pub fn register_sync_committee_error(error: &SyncCommitteeError) {
inc_counter_vec(&GOSSIP_SYNC_COMMITTEE_ERRORS_PER_TYPE, &[error.as_ref()]);
}
/// Inspects the `messages` that were being sent to the network and updates Prometheus metrics.
pub fn expose_publish_metrics<T: EthSpec>(messages: &[PubsubMessage<T>]) {
for message in messages {
@@ -417,6 +508,12 @@ pub fn expose_publish_metrics<T: EthSpec>(messages: &[PubsubMessage<T>]) {
PubsubMessage::AggregateAndProofAttestation(_) => {
inc_counter(&GOSSIP_AGGREGATED_ATTESTATIONS_TX)
}
PubsubMessage::SyncCommitteeMessage(_) => {
inc_counter(&GOSSIP_SYNC_COMMITTEE_MESSAGE_TX)
}
PubsubMessage::SignedContributionAndProof(_) => {
inc_counter(&GOSSIP_SYNC_COMMITTEE_CONTRIBUTION_TX)
}
_ => {}
}
}
@@ -430,6 +527,10 @@ pub fn expose_receive_metrics<T: EthSpec>(message: &PubsubMessage<T>) {
PubsubMessage::AggregateAndProofAttestation(_) => {
inc_counter(&GOSSIP_AGGREGATED_ATTESTATIONS_RX)
}
PubsubMessage::SyncCommitteeMessage(_) => inc_counter(&GOSSIP_SYNC_COMMITTEE_MESSAGE_RX),
PubsubMessage::SignedContributionAndProof(_) => {
inc_counter(&GOSSIP_SYNC_COMMITTEE_CONTRIBUTION_RX)
}
_ => {}
}
}
@@ -447,7 +548,10 @@ pub fn update_gossip_metrics<T: EthSpec>(
let _ = AVG_GOSSIPSUB_PEER_SCORE_PER_MAIN_TOPIC
.as_ref()
.map(|gauge| gauge.reset());
let _ = AVG_GOSSIPSUB_PEER_SCORE_PER_SUBNET_TOPIC
let _ = AVG_GOSSIPSUB_PEER_SCORE_PER_ATTESTATION_SUBNET_TOPIC
.as_ref()
.map(|gauge| gauge.reset());
let _ = AVG_GOSSIPSUB_PEER_SCORE_PER_SYNC_SUBNET_TOPIC
.as_ref()
.map(|gauge| gauge.reset());
@@ -478,30 +582,50 @@ pub fn update_gossip_metrics<T: EthSpec>(
// reset the mesh peers, showing all subnets
for subnet_id in 0..T::default_spec().attestation_subnet_count {
let _ = get_int_gauge(
&MESH_PEERS_PER_SUBNET_TOPIC,
&MESH_PEERS_PER_ATTESTATION_SUBNET_TOPIC,
&[subnet_id_to_string(subnet_id)],
)
.map(|v| v.set(0));
let _ = get_int_gauge(
&GOSSIPSUB_SUBSCRIBED_SUBNET_TOPIC,
&GOSSIPSUB_SUBSCRIBED_ATTESTATION_SUBNET_TOPIC,
&[subnet_id_to_string(subnet_id)],
)
.map(|v| v.set(0));
let _ = get_int_gauge(
&GOSSIPSUB_SUBSCRIBED_PEERS_SUBNET_TOPIC,
&GOSSIPSUB_SUBSCRIBED_PEERS_ATTESTATION_SUBNET_TOPIC,
&[subnet_id_to_string(subnet_id)],
)
.map(|v| v.set(0));
}
for subnet_id in 0..SYNC_COMMITTEE_SUBNET_COUNT {
let _ = get_int_gauge(
&MESH_PEERS_PER_SYNC_SUBNET_TOPIC,
&[sync_subnet_id_to_string(subnet_id)],
)
.map(|v| v.set(0));
let _ = get_int_gauge(
&GOSSIPSUB_SUBSCRIBED_SYNC_SUBNET_TOPIC,
&[sync_subnet_id_to_string(subnet_id)],
)
.map(|v| v.set(0));
let _ = get_int_gauge(
&GOSSIPSUB_SUBSCRIBED_PEERS_SYNC_SUBNET_TOPIC,
&[sync_subnet_id_to_string(subnet_id)],
)
.map(|v| v.set(0));
}
// Subnet topics subscribed to
for topic_hash in gossipsub.topics() {
if let Ok(topic) = GossipTopic::decode(topic_hash.as_str()) {
if let GossipKind::Attestation(subnet_id) = topic.kind() {
let _ = get_int_gauge(
&GOSSIPSUB_SUBSCRIBED_SUBNET_TOPIC,
&GOSSIPSUB_SUBSCRIBED_ATTESTATION_SUBNET_TOPIC,
&[subnet_id_to_string(subnet_id.into())],
)
.map(|v| v.set(1));
@@ -519,7 +643,7 @@ pub fn update_gossip_metrics<T: EthSpec>(
match topic.kind() {
GossipKind::Attestation(subnet_id) => {
if let Some(v) = get_int_gauge(
&GOSSIPSUB_SUBSCRIBED_PEERS_SUBNET_TOPIC,
&GOSSIPSUB_SUBSCRIBED_PEERS_ATTESTATION_SUBNET_TOPIC,
&[subnet_id_to_string(subnet_id.into())],
) {
v.inc()
@@ -528,13 +652,31 @@ pub fn update_gossip_metrics<T: EthSpec>(
// average peer scores
if let Some(score) = gossipsub.peer_score(peer_id) {
if let Some(v) = get_gauge(
&AVG_GOSSIPSUB_PEER_SCORE_PER_SUBNET_TOPIC,
&AVG_GOSSIPSUB_PEER_SCORE_PER_ATTESTATION_SUBNET_TOPIC,
&[subnet_id_to_string(subnet_id.into())],
) {
v.add(score)
};
}
}
GossipKind::SyncCommitteeMessage(subnet_id) => {
if let Some(v) = get_int_gauge(
&GOSSIPSUB_SUBSCRIBED_PEERS_SYNC_SUBNET_TOPIC,
&[sync_subnet_id_to_string(subnet_id.into())],
) {
v.inc()
};
// average peer scores
if let Some(score) = gossipsub.peer_score(peer_id) {
if let Some(v) = get_gauge(
&AVG_GOSSIPSUB_PEER_SCORE_PER_SYNC_SUBNET_TOPIC,
&[sync_subnet_id_to_string(subnet_id.into())],
) {
v.add(score)
};
}
}
kind => {
// main topics
if let Some(score) = gossipsub.peer_score(peer_id) {
@@ -557,12 +699,21 @@ pub fn update_gossip_metrics<T: EthSpec>(
GossipKind::Attestation(subnet_id) => {
// average peer scores
if let Some(v) = get_gauge(
&AVG_GOSSIPSUB_PEER_SCORE_PER_SUBNET_TOPIC,
&AVG_GOSSIPSUB_PEER_SCORE_PER_ATTESTATION_SUBNET_TOPIC,
&[subnet_id_to_string(subnet_id.into())],
) {
v.set(v.get() / (*peers as f64))
};
}
GossipKind::SyncCommitteeMessage(subnet_id) => {
// average peer scores
if let Some(v) = get_gauge(
&AVG_GOSSIPSUB_PEER_SCORE_PER_SYNC_SUBNET_TOPIC,
&[sync_subnet_id_to_string(subnet_id.into())],
) {
v.set(v.get() / (*peers as f64))
};
}
kind => {
// main topics
if let Some(v) =
@@ -582,12 +733,20 @@ pub fn update_gossip_metrics<T: EthSpec>(
match topic.kind() {
GossipKind::Attestation(subnet_id) => {
if let Some(v) = get_int_gauge(
&MESH_PEERS_PER_SUBNET_TOPIC,
&MESH_PEERS_PER_ATTESTATION_SUBNET_TOPIC,
&[subnet_id_to_string(subnet_id.into())],
) {
v.set(peers as i64)
};
}
GossipKind::SyncCommitteeMessage(subnet_id) => {
if let Some(v) = get_int_gauge(
&MESH_PEERS_PER_SYNC_SUBNET_TOPIC,
&[sync_subnet_id_to_string(subnet_id.into())],
) {
v.set(peers as i64)
};
}
kind => {
// main topics
if let Some(v) = get_int_gauge(&MESH_PEERS_PER_MAIN_TOPIC, &[kind.as_ref()]) {

View File

@@ -247,6 +247,31 @@ impl<T: BeaconChainTypes> Router<T> {
self.processor
.on_attester_slashing_gossip(id, peer_id, attester_slashing);
}
PubsubMessage::SignedContributionAndProof(contribution_and_proof) => {
trace!(
self.log,
"Received sync committee aggregate";
"peer_id" => %peer_id
);
self.processor.on_sync_committee_contribution_gossip(
id,
peer_id,
*contribution_and_proof,
);
}
PubsubMessage::SyncCommitteeMessage(sync_committtee_msg) => {
trace!(
self.log,
"Received sync committee signature";
"peer_id" => %peer_id
);
self.processor.on_sync_committee_signature_gossip(
id,
peer_id,
sync_committtee_msg.1,
sync_committtee_msg.0,
);
}
}
}
}

View File

@@ -10,10 +10,11 @@ use slog::{debug, error, o, trace, warn};
use std::cmp;
use std::sync::Arc;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use store::SyncCommitteeMessage;
use tokio::sync::mpsc;
use types::{
Attestation, AttesterSlashing, ChainSpec, EthSpec, ProposerSlashing, SignedAggregateAndProof,
SignedBeaconBlock, SignedVoluntaryExit, SubnetId,
Attestation, AttesterSlashing, EthSpec, ProposerSlashing, SignedAggregateAndProof,
SignedBeaconBlock, SignedContributionAndProof, SignedVoluntaryExit, SubnetId, SyncSubnetId,
};
/// Processes validated messages from the network. It relays necessary data to the syncing thread
@@ -309,6 +310,36 @@ impl<T: BeaconChainTypes> Processor<T> {
))
}
pub fn on_sync_committee_signature_gossip(
&mut self,
message_id: MessageId,
peer_id: PeerId,
sync_signature: SyncCommitteeMessage,
subnet_id: SyncSubnetId,
) {
self.send_beacon_processor_work(BeaconWorkEvent::gossip_sync_signature(
message_id,
peer_id,
sync_signature,
subnet_id,
timestamp_now(),
))
}
pub fn on_sync_committee_contribution_gossip(
&mut self,
message_id: MessageId,
peer_id: PeerId,
sync_contribution: SignedContributionAndProof<T::EthSpec>,
) {
self.send_beacon_processor_work(BeaconWorkEvent::gossip_sync_contribution(
message_id,
peer_id,
sync_contribution,
timestamp_now(),
))
}
fn send_beacon_processor_work(&mut self, work: BeaconWorkEvent<T>) {
self.beacon_processor_send
.try_send(work)
@@ -328,10 +359,7 @@ pub(crate) fn status_message<T: BeaconChainTypes>(
beacon_chain: &BeaconChain<T>,
) -> Result<StatusMessage, BeaconChainError> {
let head_info = beacon_chain.head_info()?;
let genesis_validators_root = beacon_chain.genesis_validators_root;
let fork_digest =
ChainSpec::compute_fork_digest(head_info.fork.current_version, genesis_validators_root);
let fork_digest = beacon_chain.enr_fork_id().fork_digest;
Ok(StatusMessage {
fork_digest,

View File

@@ -1,38 +1,51 @@
use crate::persisted_dht::{clear_dht, load_dht, persist_dht};
use crate::router::{Router, RouterMessage};
use crate::subnet_service::SyncCommitteeService;
use crate::{error, metrics};
use crate::{
attestation_service::{AttServiceMessage, AttestationService},
subnet_service::{AttestationService, SubnetServiceMessage},
NetworkConfig,
};
use crate::{error, metrics};
use beacon_chain::{BeaconChain, BeaconChainError, BeaconChainTypes};
use eth2_libp2p::{
rpc::{GoodbyeReason, RPCResponseErrorCode, RequestId},
Libp2pEvent, PeerAction, PeerRequestId, PubsubMessage, ReportSource, Request, Response,
Libp2pEvent, PeerAction, PeerRequestId, PubsubMessage, ReportSource, Request, Response, Subnet,
};
use eth2_libp2p::{
types::{GossipEncoding, GossipTopic},
BehaviourEvent, MessageId, NetworkGlobals, PeerId,
};
use eth2_libp2p::{types::GossipKind, BehaviourEvent, MessageId, NetworkGlobals, PeerId};
use eth2_libp2p::{MessageAcceptance, Service as LibP2PService};
use futures::future::OptionFuture;
use futures::prelude::*;
use slog::{debug, error, info, o, trace, warn};
use std::{net::SocketAddr, sync::Arc, time::Duration};
use slog::{crit, debug, error, info, o, trace, warn};
use std::{net::SocketAddr, pin::Pin, sync::Arc, time::Duration};
use store::HotColdDB;
use task_executor::ShutdownReason;
use tokio::sync::mpsc;
use tokio::time::Sleep;
use types::{EthSpec, RelativeEpoch, SubnetId, Unsigned, ValidatorSubscription};
use types::{
EthSpec, ForkContext, ForkName, RelativeEpoch, SubnetId, SyncCommitteeSubscription,
SyncSubnetId, Unsigned, ValidatorSubscription,
};
mod tests;
/// The interval (in seconds) that various network metrics will update.
const METRIC_UPDATE_INTERVAL: u64 = 1;
/// Delay after a fork where we unsubscribe from pre-fork topics.
const UNSUBSCRIBE_DELAY_EPOCHS: u64 = 2;
/// Types of messages that the network service can receive.
#[derive(Debug)]
pub enum NetworkMessage<T: EthSpec> {
/// Subscribes a list of validators to specific slots for attestation duties.
Subscribe {
AttestationSubscribe {
subscriptions: Vec<ValidatorSubscription>,
},
SyncCommitteeSubscribe {
subscriptions: Vec<SyncCommitteeSubscription>,
},
/// Subscribes the beacon node to the core gossipsub topics. We do this when we are either
/// synced or close to the head slot.
SubscribeCoreTopics,
@@ -97,6 +110,8 @@ pub struct NetworkService<T: BeaconChainTypes> {
libp2p: LibP2PService<T::EthSpec>,
/// An attestation and subnet manager service.
attestation_service: AttestationService<T>,
/// A sync committeee subnet manager service.
sync_committee_service: SyncCommitteeService<T>,
/// The receiver channel for lighthouse to communicate with the network service.
network_recv: mpsc::UnboundedReceiver<NetworkMessage<T::EthSpec>>,
/// The sending channel for the network service to send messages to be routed throughout
@@ -113,7 +128,9 @@ pub struct NetworkService<T: BeaconChainTypes> {
/// update the UDP socket of discovery if the UPnP mappings get established.
discovery_auto_update: bool,
/// A delay that expires when a new fork takes place.
next_fork_update: Option<Sleep>,
next_fork_update: Pin<Box<OptionFuture<Sleep>>>,
/// A delay that expires when we need to unsubscribe from old fork topics.
next_unsubscribe: Pin<Box<OptionFuture<Sleep>>>,
/// Subscribe to all the subnets once synced.
subscribe_all_subnets: bool,
/// A timer for updating various network metrics.
@@ -121,6 +138,7 @@ pub struct NetworkService<T: BeaconChainTypes> {
/// gossipsub_parameter_update timer
gossipsub_parameter_update: tokio::time::Interval,
/// The logger for the network service.
fork_context: Arc<ForkContext>,
log: slog::Logger,
}
@@ -158,7 +176,19 @@ impl<T: BeaconChainTypes> NetworkService<T> {
let enr_fork_id = beacon_chain.enr_fork_id();
// keep track of when our fork_id needs to be updated
let next_fork_update = next_fork_delay(&beacon_chain);
let next_fork_update = Box::pin(next_fork_delay(&beacon_chain).into());
let next_unsubscribe = Box::pin(None.into());
let current_slot = beacon_chain
.slot()
.unwrap_or(beacon_chain.spec.genesis_slot);
// Create a fork context for the given config and genesis validators root
let fork_context = Arc::new(ForkContext::new::<T::EthSpec>(
current_slot,
beacon_chain.genesis_validators_root,
&beacon_chain.spec,
));
// launch libp2p service
let (network_globals, mut libp2p) = LibP2PService::new(
@@ -166,6 +196,7 @@ impl<T: BeaconChainTypes> NetworkService<T> {
config,
enr_fork_id,
&network_log,
fork_context.clone(),
&beacon_chain.spec,
)
.await?;
@@ -193,10 +224,14 @@ impl<T: BeaconChainTypes> NetworkService<T> {
network_log.clone(),
)?;
// attestation service
// attestation subnet service
let attestation_service =
AttestationService::new(beacon_chain.clone(), config, &network_log);
// sync committee subnet service
let sync_committee_service =
SyncCommitteeService::new(beacon_chain.clone(), config, &network_log);
// create a timer for updating network metrics
let metrics_update = tokio::time::interval(Duration::from_secs(METRIC_UPDATE_INTERVAL));
@@ -209,6 +244,7 @@ impl<T: BeaconChainTypes> NetworkService<T> {
beacon_chain,
libp2p,
attestation_service,
sync_committee_service,
network_recv,
router_send,
store,
@@ -216,9 +252,11 @@ impl<T: BeaconChainTypes> NetworkService<T> {
upnp_mappings: (None, None),
discovery_auto_update: config.discv5_config.enr_update,
next_fork_update,
next_unsubscribe,
subscribe_all_subnets: config.subscribe_all_subnets,
metrics_update,
gossipsub_parameter_update,
fork_context,
log: network_log,
};
@@ -226,6 +264,26 @@ impl<T: BeaconChainTypes> NetworkService<T> {
Ok((network_globals, network_send))
}
/// Returns the required fork digests that gossipsub needs to subscribe to based on the current slot.
///
/// For `current_slot < fork_slot`, this function returns both the pre-fork and post-fork
/// digests since we should be subscribed to post fork topics before the fork.
pub fn required_gossip_fork_digests(&self) -> Vec<[u8; 4]> {
let fork_context = &self.fork_context;
match fork_context.current_fork() {
ForkName::Base => {
if fork_context.fork_exists(ForkName::Altair) {
fork_context.all_fork_digests()
} else {
vec![fork_context.genesis_context_bytes()]
}
}
ForkName::Altair => vec![fork_context
.to_context_bytes(ForkName::Altair)
.expect("Altair fork bytes should exist as it's initialized in ForkContext")],
}
}
}
fn spawn_service<T: BeaconChainTypes>(
@@ -363,42 +421,69 @@ fn spawn_service<T: BeaconChainTypes>(
}
NetworkMessage::ReportPeer { peer_id, action, source } => service.libp2p.report_peer(&peer_id, action, source),
NetworkMessage::GoodbyePeer { peer_id, reason, source } => service.libp2p.goodbye_peer(&peer_id, reason, source),
NetworkMessage::Subscribe { subscriptions } => {
NetworkMessage::AttestationSubscribe { subscriptions } => {
if let Err(e) = service
.attestation_service
.validator_subscriptions(subscriptions) {
warn!(service.log, "Validator subscription failed"; "error" => e);
warn!(service.log, "Attestation validator subscription failed"; "error" => e);
}
}
NetworkMessage::SyncCommitteeSubscribe { subscriptions } => {
if let Err(e) = service
.sync_committee_service
.validator_subscriptions(subscriptions) {
warn!(service.log, "Sync committee calidator subscription failed"; "error" => e);
}
}
NetworkMessage::SubscribeCoreTopics => {
let mut subscribed_topics: Vec<GossipKind> = vec![];
let already_subscribed = service.network_globals.gossipsub_subscriptions.read().clone();
let already_subscribed = already_subscribed.iter().map(|x| x.kind()).collect::<std::collections::HashSet<_>>();
for topic_kind in eth2_libp2p::types::CORE_TOPICS.iter().filter(|topic| already_subscribed.get(topic).is_none()) {
if service.libp2p.swarm.behaviour_mut().subscribe_kind(topic_kind.clone()) {
subscribed_topics.push(topic_kind.clone());
} else {
warn!(service.log, "Could not subscribe to topic"; "topic" => %topic_kind);
let mut subscribed_topics: Vec<GossipTopic> = vec![];
for topic_kind in eth2_libp2p::types::CORE_TOPICS.iter() {
for fork_digest in service.required_gossip_fork_digests() {
let topic = GossipTopic::new(topic_kind.clone(), GossipEncoding::default(), fork_digest);
if service.libp2p.swarm.behaviour_mut().subscribe(topic.clone()) {
subscribed_topics.push(topic);
} else {
warn!(service.log, "Could not subscribe to topic"; "topic" => %topic);
}
}
}
// if we are to subscribe to all subnets we do it here
// If we are to subscribe to all subnets we do it here
if service.subscribe_all_subnets {
for subnet_id in 0..<<T as BeaconChainTypes>::EthSpec as EthSpec>::SubnetBitfieldLength::to_u64() {
let subnet_id = SubnetId::new(subnet_id);
let topic_kind = eth2_libp2p::types::GossipKind::Attestation(subnet_id);
if service.libp2p.swarm.behaviour_mut().subscribe_kind(topic_kind.clone()) {
// Update the ENR bitfield.
service.libp2p.swarm.behaviour_mut().update_enr_subnet(subnet_id, true);
subscribed_topics.push(topic_kind.clone());
} else {
warn!(service.log, "Could not subscribe to topic"; "topic" => %topic_kind);
let subnet = Subnet::Attestation(SubnetId::new(subnet_id));
// Update the ENR bitfield
service.libp2p.swarm.behaviour_mut().update_enr_subnet(subnet, true);
for fork_digest in service.required_gossip_fork_digests() {
let topic = GossipTopic::new(subnet.into(), GossipEncoding::default(), fork_digest);
if service.libp2p.swarm.behaviour_mut().subscribe(topic.clone()) {
subscribed_topics.push(topic);
} else {
warn!(service.log, "Could not subscribe to topic"; "topic" => %topic);
}
}
}
for subnet_id in 0..<<T as BeaconChainTypes>::EthSpec as EthSpec>::SyncCommitteeSubnetCount::to_u64() {
let subnet = Subnet::SyncCommittee(SyncSubnetId::new(subnet_id));
// Update the ENR bitfield
service.libp2p.swarm.behaviour_mut().update_enr_subnet(subnet, true);
for fork_digest in service.required_gossip_fork_digests() {
let topic = GossipTopic::new(subnet.into(), GossipEncoding::default(), fork_digest);
if service.libp2p.swarm.behaviour_mut().subscribe(topic.clone()) {
subscribed_topics.push(topic);
} else {
warn!(service.log, "Could not subscribe to topic"; "topic" => %topic);
}
}
}
}
if !subscribed_topics.is_empty() {
info!(service.log, "Subscribed to topics"; "topics" => ?subscribed_topics);
info!(
service.log,
"Subscribed to topics";
"topics" => ?subscribed_topics.into_iter().map(|topic| format!("{}", topic)).collect::<Vec<_>>()
);
}
}
}
@@ -406,19 +491,51 @@ fn spawn_service<T: BeaconChainTypes>(
// process any attestation service events
Some(attestation_service_message) = service.attestation_service.next() => {
match attestation_service_message {
AttServiceMessage::Subscribe(subnet_id) => {
service.libp2p.swarm.behaviour_mut().subscribe_to_subnet(subnet_id);
SubnetServiceMessage::Subscribe(subnet) => {
for fork_digest in service.required_gossip_fork_digests() {
let topic = GossipTopic::new(subnet.into(), GossipEncoding::default(), fork_digest);
service.libp2p.swarm.behaviour_mut().subscribe(topic);
}
}
AttServiceMessage::Unsubscribe(subnet_id) => {
service.libp2p.swarm.behaviour_mut().unsubscribe_from_subnet(subnet_id);
SubnetServiceMessage::Unsubscribe(subnet) => {
for fork_digest in service.required_gossip_fork_digests() {
let topic = GossipTopic::new(subnet.into(), GossipEncoding::default(), fork_digest);
service.libp2p.swarm.behaviour_mut().unsubscribe(topic);
}
}
AttServiceMessage::EnrAdd(subnet_id) => {
service.libp2p.swarm.behaviour_mut().update_enr_subnet(subnet_id, true);
SubnetServiceMessage::EnrAdd(subnet) => {
service.libp2p.swarm.behaviour_mut().update_enr_subnet(subnet, true);
}
AttServiceMessage::EnrRemove(subnet_id) => {
service.libp2p.swarm.behaviour_mut().update_enr_subnet(subnet_id, false);
SubnetServiceMessage::EnrRemove(subnet) => {
service.libp2p.swarm.behaviour_mut().update_enr_subnet(subnet, false);
}
AttServiceMessage::DiscoverPeers(subnets_to_discover) => {
SubnetServiceMessage::DiscoverPeers(subnets_to_discover) => {
service.libp2p.swarm.behaviour_mut().discover_subnet_peers(subnets_to_discover);
}
}
}
// process any sync committee service events
Some(sync_committee_service_message) = service.sync_committee_service.next() => {
match sync_committee_service_message {
SubnetServiceMessage::Subscribe(subnet) => {
for fork_digest in service.required_gossip_fork_digests() {
let topic = GossipTopic::new(subnet.into(), GossipEncoding::default(), fork_digest);
service.libp2p.swarm.behaviour_mut().subscribe(topic);
}
}
SubnetServiceMessage::Unsubscribe(subnet) => {
for fork_digest in service.required_gossip_fork_digests() {
let topic = GossipTopic::new(subnet.into(), GossipEncoding::default(), fork_digest);
service.libp2p.swarm.behaviour_mut().unsubscribe(topic);
}
}
SubnetServiceMessage::EnrAdd(subnet) => {
service.libp2p.swarm.behaviour_mut().update_enr_subnet(subnet, true);
}
SubnetServiceMessage::EnrRemove(subnet) => {
service.libp2p.swarm.behaviour_mut().update_enr_subnet(subnet, false);
}
SubnetServiceMessage::DiscoverPeers(subnets_to_discover) => {
service.libp2p.swarm.behaviour_mut().discover_subnet_peers(subnets_to_discover);
}
}
@@ -532,34 +649,57 @@ fn spawn_service<T: BeaconChainTypes>(
}
}
}
}
Some(_) = &mut service.next_fork_update => {
let new_enr_fork_id = service.beacon_chain.enr_fork_id();
if let Some(delay) = &service.next_fork_update {
if delay.is_elapsed() {
service
.libp2p
.swarm
.behaviour_mut()
.update_fork_version(service.beacon_chain.enr_fork_id());
service.next_fork_update = next_fork_delay(&service.beacon_chain);
let fork_context = &service.fork_context;
if let Some(new_fork_name) = fork_context.from_context_bytes(new_enr_fork_id.fork_digest) {
info!(
service.log,
"Updating enr fork version";
"old_fork" => ?fork_context.current_fork(),
"new_fork" => ?new_fork_name,
);
fork_context.update_current_fork(*new_fork_name);
service
.libp2p
.swarm
.behaviour_mut()
.update_fork_version(new_enr_fork_id.clone());
// Reinitialize the next_fork_update
service.next_fork_update = Box::pin(next_fork_delay(&service.beacon_chain).into());
// Set the next_unsubscribe delay.
let epoch_duration = service.beacon_chain.spec.seconds_per_slot * T::EthSpec::slots_per_epoch();
let unsubscribe_delay = Duration::from_secs(UNSUBSCRIBE_DELAY_EPOCHS * epoch_duration);
service.next_unsubscribe = Box::pin(Some(tokio::time::sleep(unsubscribe_delay)).into());
info!(service.log, "Network will unsubscribe from old fork gossip topics in a few epochs"; "remaining_epochs" => UNSUBSCRIBE_DELAY_EPOCHS);
} else {
crit!(service.log, "Unknown new enr fork id"; "new_fork_id" => ?new_enr_fork_id);
}
}
Some(_) = &mut service.next_unsubscribe => {
let new_enr_fork_id = service.beacon_chain.enr_fork_id();
service.libp2p.swarm.behaviour_mut().unsubscribe_from_fork_topics_except(new_enr_fork_id.fork_digest);
info!(service.log, "Unsubscribed from old fork topics");
service.next_unsubscribe = Box::pin(None.into());
}
}
metrics::update_bandwidth_metrics(service.libp2p.bandwidth.clone());
}
}, "network");
}
/// Returns a `Sleep` that triggers shortly after the next change in the beacon chain fork version.
/// Returns a `Sleep` that triggers after the next change in the beacon chain fork version.
/// If there is no scheduled fork, `None` is returned.
fn next_fork_delay<T: BeaconChainTypes>(
beacon_chain: &BeaconChain<T>,
) -> Option<tokio::time::Sleep> {
beacon_chain.duration_to_next_fork().map(|until_fork| {
// Add a short time-out to start within the new fork period.
let delay = Duration::from_millis(200);
tokio::time::sleep_until(tokio::time::Instant::now() + until_fork + delay)
})
beacon_chain
.duration_to_next_fork()
.map(|(_, until_fork)| tokio::time::sleep(until_fork))
}
impl<T: BeaconChainTypes> Drop for NetworkService<T> {

View File

@@ -1,5 +1,4 @@
use beacon_chain::{BeaconChain, BeaconChainError, BeaconChainTypes};
use types::ChainSpec;
use eth2_libp2p::rpc::StatusMessage;
/// Trait to produce a `StatusMessage` representing the state of the given `beacon_chain`.
@@ -13,10 +12,7 @@ pub trait ToStatusMessage {
impl<T: BeaconChainTypes> ToStatusMessage for BeaconChain<T> {
fn status_message(&self) -> Result<StatusMessage, BeaconChainError> {
let head_info = self.head_info()?;
let genesis_validators_root = self.genesis_validators_root;
let fork_digest =
ChainSpec::compute_fork_digest(head_info.fork.current_version, genesis_validators_root);
let fork_digest = self.enr_fork_id().fork_digest;
Ok(StatusMessage {
fork_digest,

View File

@@ -2,6 +2,7 @@
//! given time. It schedules subscriptions to shard subnets, requests peer discoveries and
//! determines whether attestations should be aggregated and/or passed to the beacon node.
use super::SubnetServiceMessage;
use std::collections::{HashMap, HashSet, VecDeque};
use std::pin::Pin;
use std::sync::Arc;
@@ -13,16 +14,13 @@ use rand::seq::SliceRandom;
use slog::{debug, error, o, trace, warn};
use beacon_chain::{BeaconChain, BeaconChainTypes};
use eth2_libp2p::{NetworkConfig, SubnetDiscovery};
use eth2_libp2p::{NetworkConfig, Subnet, SubnetDiscovery};
use hashset_delay::HashSetDelay;
use slot_clock::SlotClock;
use types::{Attestation, EthSpec, Slot, SubnetId, ValidatorSubscription};
use crate::metrics;
#[cfg(test)]
mod tests;
/// The minimum number of slots ahead that we attempt to discover peers for a subscription. If the
/// slot is less than this number, skip the peer discovery process.
/// Subnet discovery query takes atmost 30 secs, 2 slots take 24s.
@@ -30,7 +28,6 @@ const MIN_PEER_DISCOVERY_SLOT_LOOK_AHEAD: u64 = 2;
/// The time (in slots) before a last seen validator is considered absent and we unsubscribe from the random
/// gossip topics that we subscribed to due to the validator connection.
const LAST_SEEN_VALIDATOR_TIMEOUT: u32 = 150;
// 30 mins at a 12s slot time
/// The fraction of a slot that we subscribe to a subnet before the required slot.
///
/// Note: The time is calculated as `time = seconds_per_slot / ADVANCE_SUBSCRIPTION_TIME`.
@@ -39,46 +36,6 @@ const ADVANCE_SUBSCRIBE_TIME: u32 = 3;
/// 36s at 12s slot time
const DEFAULT_EXPIRATION_TIMEOUT: u32 = 3;
#[derive(Debug, Clone)]
pub enum AttServiceMessage {
/// Subscribe to the specified subnet id.
Subscribe(SubnetId),
/// Unsubscribe to the specified subnet id.
Unsubscribe(SubnetId),
/// Add the `SubnetId` to the ENR bitfield.
EnrAdd(SubnetId),
/// Remove the `SubnetId` from the ENR bitfield.
EnrRemove(SubnetId),
/// Discover peers for a list of `SubnetDiscovery`.
DiscoverPeers(Vec<SubnetDiscovery>),
}
/// Note: This `PartialEq` impl is for use only in tests.
/// The `DiscoverPeers` comparison is good enough for testing only.
#[cfg(test)]
impl PartialEq for AttServiceMessage {
fn eq(&self, other: &AttServiceMessage) -> bool {
match (self, other) {
(AttServiceMessage::Subscribe(a), AttServiceMessage::Subscribe(b)) => a == b,
(AttServiceMessage::Unsubscribe(a), AttServiceMessage::Unsubscribe(b)) => a == b,
(AttServiceMessage::EnrAdd(a), AttServiceMessage::EnrAdd(b)) => a == b,
(AttServiceMessage::EnrRemove(a), AttServiceMessage::EnrRemove(b)) => a == b,
(AttServiceMessage::DiscoverPeers(a), AttServiceMessage::DiscoverPeers(b)) => {
if a.len() != b.len() {
return false;
}
for i in 0..a.len() {
if a[i].subnet_id != b[i].subnet_id || a[i].min_ttl != b[i].min_ttl {
return false;
}
}
true
}
_ => false,
}
}
}
/// A particular subnet at a given slot.
#[derive(PartialEq, Eq, Hash, Clone, Debug)]
pub struct ExactSubnet {
@@ -90,13 +47,13 @@ pub struct ExactSubnet {
pub struct AttestationService<T: BeaconChainTypes> {
/// Queued events to return to the driving service.
events: VecDeque<AttServiceMessage>,
events: VecDeque<SubnetServiceMessage>,
/// A reference to the beacon chain to process received attestations.
beacon_chain: Arc<BeaconChain<T>>,
pub(crate) beacon_chain: Arc<BeaconChain<T>>,
/// The collection of currently subscribed random subnets mapped to their expiry deadline.
random_subnets: HashSetDelay<SubnetId>,
pub(crate) random_subnets: HashSetDelay<SubnetId>,
/// The collection of all currently subscribed subnets (long-lived **and** short-lived).
subscriptions: HashSet<SubnetId>,
@@ -332,7 +289,7 @@ impl<T: BeaconChainTypes> AttestationService<T> {
.duration_to_slot(exact_subnet.slot + 1)
.map(|duration| std::time::Instant::now() + duration);
Some(SubnetDiscovery {
subnet_id: exact_subnet.subnet_id,
subnet: Subnet::Attestation(exact_subnet.subnet_id),
min_ttl,
})
} else {
@@ -349,7 +306,7 @@ impl<T: BeaconChainTypes> AttestationService<T> {
if !discovery_subnets.is_empty() {
self.events
.push_back(AttServiceMessage::DiscoverPeers(discovery_subnets));
.push_back(SubnetServiceMessage::DiscoverPeers(discovery_subnets));
}
Ok(())
}
@@ -474,8 +431,8 @@ impl<T: BeaconChainTypes> AttestationService<T> {
// However, subscribing to random subnets ideally shouldn't happen very often (once in ~27 hours) and
// this makes it easier to deterministically test the attestations service.
self.events
.push_back(AttServiceMessage::DiscoverPeers(vec![SubnetDiscovery {
subnet_id,
.push_back(SubnetServiceMessage::DiscoverPeers(vec![SubnetDiscovery {
subnet: Subnet::Attestation(subnet_id),
min_ttl: None,
}]));
@@ -484,11 +441,14 @@ impl<T: BeaconChainTypes> AttestationService<T> {
self.subscriptions.insert(subnet_id);
debug!(self.log, "Subscribing to random subnet"; "subnet_id" => ?subnet_id);
self.events
.push_back(AttServiceMessage::Subscribe(subnet_id));
.push_back(SubnetServiceMessage::Subscribe(Subnet::Attestation(
subnet_id,
)));
}
// add the subnet to the ENR bitfield
self.events.push_back(AttServiceMessage::EnrAdd(subnet_id));
self.events
.push_back(SubnetServiceMessage::EnrAdd(Subnet::Attestation(subnet_id)));
}
}
@@ -525,7 +485,9 @@ impl<T: BeaconChainTypes> AttestationService<T> {
debug!(self.log, "Subscribing to subnet"; "subnet" => *exact_subnet.subnet_id, "target_slot" => exact_subnet.slot.as_u64());
self.subscriptions.insert(exact_subnet.subnet_id);
self.events
.push_back(AttServiceMessage::Subscribe(exact_subnet.subnet_id));
.push_back(SubnetServiceMessage::Subscribe(Subnet::Attestation(
exact_subnet.subnet_id,
)));
}
}
}
@@ -544,7 +506,9 @@ impl<T: BeaconChainTypes> AttestationService<T> {
self.subscriptions.remove(&exact_subnet.subnet_id);
self.events
.push_back(AttServiceMessage::Unsubscribe(exact_subnet.subnet_id));
.push_back(SubnetServiceMessage::Unsubscribe(Subnet::Attestation(
exact_subnet.subnet_id,
)));
}
/// A random subnet has expired.
@@ -567,12 +531,16 @@ impl<T: BeaconChainTypes> AttestationService<T> {
// we are not at capacity, unsubscribe from the current subnet.
debug!(self.log, "Unsubscribing from random subnet"; "subnet_id" => *subnet_id);
self.events
.push_back(AttServiceMessage::Unsubscribe(subnet_id));
.push_back(SubnetServiceMessage::Unsubscribe(Subnet::Attestation(
subnet_id,
)));
}
// Remove the ENR bitfield bit and choose a new random on from the available subnets
self.events
.push_back(AttServiceMessage::EnrRemove(subnet_id));
.push_back(SubnetServiceMessage::EnrRemove(Subnet::Attestation(
subnet_id,
)));
// Subscribe to a new random subnet
self.subscribe_to_random_subnets(1);
}
@@ -606,19 +574,23 @@ impl<T: BeaconChainTypes> AttestationService<T> {
.any(|s| s.subnet_id == *subnet_id)
{
self.events
.push_back(AttServiceMessage::Unsubscribe(*subnet_id));
.push_back(SubnetServiceMessage::Unsubscribe(Subnet::Attestation(
*subnet_id,
)));
}
// as the long lasting subnet subscription is being removed, remove the subnet_id from
// the ENR bitfield
self.events
.push_back(AttServiceMessage::EnrRemove(*subnet_id));
.push_back(SubnetServiceMessage::EnrRemove(Subnet::Attestation(
*subnet_id,
)));
self.random_subnets.remove(subnet_id);
}
}
}
impl<T: BeaconChainTypes> Stream for AttestationService<T> {
type Item = AttServiceMessage;
type Item = SubnetServiceMessage;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
// update the waker if needed

View File

@@ -0,0 +1,50 @@
pub mod attestation_subnets;
pub mod sync_subnets;
use eth2_libp2p::{Subnet, SubnetDiscovery};
pub use attestation_subnets::AttestationService;
pub use sync_subnets::SyncCommitteeService;
#[cfg(test)]
mod tests;
#[derive(Debug, Clone)]
pub enum SubnetServiceMessage {
/// Subscribe to the specified subnet id.
Subscribe(Subnet),
/// Unsubscribe to the specified subnet id.
Unsubscribe(Subnet),
/// Add the `SubnetId` to the ENR bitfield.
EnrAdd(Subnet),
/// Remove the `SubnetId` from the ENR bitfield.
EnrRemove(Subnet),
/// Discover peers for a list of `SubnetDiscovery`.
DiscoverPeers(Vec<SubnetDiscovery>),
}
/// Note: This `PartialEq` impl is for use only in tests.
/// The `DiscoverPeers` comparison is good enough for testing only.
#[cfg(test)]
impl PartialEq for SubnetServiceMessage {
fn eq(&self, other: &SubnetServiceMessage) -> bool {
match (self, other) {
(SubnetServiceMessage::Subscribe(a), SubnetServiceMessage::Subscribe(b)) => a == b,
(SubnetServiceMessage::Unsubscribe(a), SubnetServiceMessage::Unsubscribe(b)) => a == b,
(SubnetServiceMessage::EnrAdd(a), SubnetServiceMessage::EnrAdd(b)) => a == b,
(SubnetServiceMessage::EnrRemove(a), SubnetServiceMessage::EnrRemove(b)) => a == b,
(SubnetServiceMessage::DiscoverPeers(a), SubnetServiceMessage::DiscoverPeers(b)) => {
if a.len() != b.len() {
return false;
}
for i in 0..a.len() {
if a[i].subnet != b[i].subnet || a[i].min_ttl != b[i].min_ttl {
return false;
}
}
true
}
_ => false,
}
}
}

View File

@@ -0,0 +1,350 @@
//! This service keeps track of which sync committee subnet the beacon node should be subscribed to at any
//! given time. It schedules subscriptions to sync committee subnets and requests peer discoveries.
use std::collections::{hash_map::Entry, HashMap, VecDeque};
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use std::time::Duration;
use futures::prelude::*;
use slog::{debug, error, o, trace, warn};
use super::SubnetServiceMessage;
use beacon_chain::{BeaconChain, BeaconChainTypes};
use eth2_libp2p::{NetworkConfig, Subnet, SubnetDiscovery};
use hashset_delay::HashSetDelay;
use slot_clock::SlotClock;
use types::{Epoch, EthSpec, SyncCommitteeSubscription, SyncSubnetId};
use crate::metrics;
/// The minimum number of slots ahead that we attempt to discover peers for a subscription. If the
/// slot is less than this number, skip the peer discovery process.
/// Subnet discovery query takes atmost 30 secs, 2 slots take 24s.
const MIN_PEER_DISCOVERY_SLOT_LOOK_AHEAD: u64 = 2;
/// A particular subnet at a given slot.
#[derive(PartialEq, Eq, Hash, Clone, Debug)]
pub struct ExactSubnet {
/// The `SyncSubnetId` associated with this subnet.
pub subnet_id: SyncSubnetId,
/// The epoch until which we need to stay subscribed to the subnet.
pub until_epoch: Epoch,
}
pub struct SyncCommitteeService<T: BeaconChainTypes> {
/// Queued events to return to the driving service.
events: VecDeque<SubnetServiceMessage>,
/// A reference to the beacon chain to process received attestations.
pub(crate) beacon_chain: Arc<BeaconChain<T>>,
/// The collection of all currently subscribed subnets.
subscriptions: HashMap<SyncSubnetId, Epoch>,
/// A collection of timeouts for when to unsubscribe from a subnet.
unsubscriptions: HashSetDelay<SyncSubnetId>,
/// The waker for the current thread.
waker: Option<std::task::Waker>,
/// The discovery mechanism of lighthouse is disabled.
discovery_disabled: bool,
/// We are always subscribed to all subnets.
subscribe_all_subnets: bool,
/// The logger for the attestation service.
log: slog::Logger,
}
impl<T: BeaconChainTypes> SyncCommitteeService<T> {
/* Public functions */
pub fn new(
beacon_chain: Arc<BeaconChain<T>>,
config: &NetworkConfig,
log: &slog::Logger,
) -> Self {
let log = log.new(o!("service" => "sync_committee_service"));
let spec = &beacon_chain.spec;
let epoch_duration_secs =
beacon_chain.slot_clock.slot_duration().as_secs() * T::EthSpec::slots_per_epoch();
let default_timeout =
epoch_duration_secs.saturating_mul(spec.epochs_per_sync_committee_period.as_u64());
SyncCommitteeService {
events: VecDeque::with_capacity(10),
beacon_chain,
subscriptions: HashMap::new(),
unsubscriptions: HashSetDelay::new(Duration::from_secs(default_timeout)),
waker: None,
subscribe_all_subnets: config.subscribe_all_subnets,
discovery_disabled: config.disable_discovery,
log,
}
}
/// Return count of all currently subscribed subnets.
#[cfg(test)]
pub fn subscription_count(&self) -> usize {
use types::consts::altair::SYNC_COMMITTEE_SUBNET_COUNT;
if self.subscribe_all_subnets {
SYNC_COMMITTEE_SUBNET_COUNT as usize
} else {
self.subscriptions.len()
}
}
/// Processes a list of sync committee subscriptions.
///
/// This will:
/// - Search for peers for required subnets.
/// - Request subscriptions required subnets.
/// - Build the timeouts for each of these events.
///
/// This returns a result simply for the ergonomics of using ?. The result can be
/// safely dropped.
pub fn validator_subscriptions(
&mut self,
subscriptions: Vec<SyncCommitteeSubscription>,
) -> Result<(), String> {
let mut subnets_to_discover = Vec::new();
for subscription in subscriptions {
metrics::inc_counter(&metrics::SYNC_COMMITTEE_SUBSCRIPTION_REQUESTS);
//NOTE: We assume all subscriptions have been verified before reaching this service
// Registers the validator with the subnet service.
// This will subscribe to long-lived random subnets if required.
trace!(self.log,
"Sync committee subscription";
"subscription" => ?subscription,
);
let subnet_ids = match SyncSubnetId::compute_subnets_for_sync_committee::<T::EthSpec>(
&subscription.sync_committee_indices,
) {
Ok(subnet_ids) => subnet_ids,
Err(e) => {
warn!(self.log,
"Failed to compute subnet id for sync committee subscription";
"error" => ?e,
"validator_index" => subscription.validator_index
);
continue;
}
};
for subnet_id in subnet_ids {
let exact_subnet = ExactSubnet {
subnet_id,
until_epoch: subscription.until_epoch,
};
subnets_to_discover.push(exact_subnet.clone());
if let Err(e) = self.subscribe_to_subnet(exact_subnet.clone()) {
warn!(self.log,
"Subscription to sync subnet error";
"error" => e,
"validator_index" => subscription.validator_index,
);
} else {
trace!(self.log,
"Subscribed to subnet for sync committee duties";
"exact_subnet" => ?exact_subnet,
"validator_index" => subscription.validator_index
);
}
}
}
// If the discovery mechanism isn't disabled, attempt to set up a peer discovery for the
// required subnets.
if !self.discovery_disabled {
if let Err(e) = self.discover_peers_request(subnets_to_discover.iter()) {
warn!(self.log, "Discovery lookup request error"; "error" => e);
};
}
// pre-emptively wake the thread to check for new events
if let Some(waker) = &self.waker {
waker.wake_by_ref();
}
Ok(())
}
/* Internal private functions */
/// Checks if there are currently queued discovery requests and the time required to make the
/// request.
///
/// If there is sufficient time, queues a peer discovery request for all the required subnets.
fn discover_peers_request<'a>(
&mut self,
exact_subnets: impl Iterator<Item = &'a ExactSubnet>,
) -> Result<(), &'static str> {
let current_slot = self
.beacon_chain
.slot_clock
.now()
.ok_or("Could not get the current slot")?;
let slots_per_epoch = T::EthSpec::slots_per_epoch();
let discovery_subnets: Vec<SubnetDiscovery> = exact_subnets
.filter_map(|exact_subnet| {
let until_slot = exact_subnet.until_epoch.end_slot(slots_per_epoch);
// check if there is enough time to perform a discovery lookup
if until_slot >= current_slot.saturating_add(MIN_PEER_DISCOVERY_SLOT_LOOK_AHEAD) {
// if the slot is more than epoch away, add an event to start looking for peers
// add one slot to ensure we keep the peer for the subscription slot
let min_ttl = self
.beacon_chain
.slot_clock
.duration_to_slot(until_slot + 1)
.map(|duration| std::time::Instant::now() + duration);
Some(SubnetDiscovery {
subnet: Subnet::SyncCommittee(exact_subnet.subnet_id),
min_ttl,
})
} else {
// We may want to check the global PeerInfo to see estimated timeouts for each
// peer before they can be removed.
warn!(self.log,
"Not enough time for a discovery search";
"subnet_id" => ?exact_subnet
);
None
}
})
.collect();
if !discovery_subnets.is_empty() {
self.events
.push_back(SubnetServiceMessage::DiscoverPeers(discovery_subnets));
}
Ok(())
}
/// Adds a subscription event and an associated unsubscription event if required.
fn subscribe_to_subnet(&mut self, exact_subnet: ExactSubnet) -> Result<(), &'static str> {
// Return if we have subscribed to all subnets
if self.subscribe_all_subnets {
return Ok(());
}
// Return if we already have a subscription for exact_subnet
if self.subscriptions.get(&exact_subnet.subnet_id) == Some(&exact_subnet.until_epoch) {
return Ok(());
}
// Return if we already have subscription set to expire later than the current request.
if let Some(until_epoch) = self.subscriptions.get(&exact_subnet.subnet_id) {
if *until_epoch >= exact_subnet.until_epoch {
return Ok(());
}
}
// initialise timing variables
let current_slot = self
.beacon_chain
.slot_clock
.now()
.ok_or("Could not get the current slot")?;
let slots_per_epoch = T::EthSpec::slots_per_epoch();
let until_slot = exact_subnet.until_epoch.end_slot(slots_per_epoch);
// Calculate the duration to the unsubscription event.
let expected_end_subscription_duration = if current_slot >= until_slot {
warn!(
self.log,
"Sync committee subscription is past expiration";
"current_slot" => current_slot,
"exact_subnet" => ?exact_subnet,
);
return Ok(());
} else {
let slot_duration = self.beacon_chain.slot_clock.slot_duration();
// the duration until we no longer need this subscription. We assume a single slot is
// sufficient.
self.beacon_chain
.slot_clock
.duration_to_slot(until_slot)
.ok_or("Unable to determine duration to unsubscription slot")?
+ slot_duration
};
if let Entry::Vacant(e) = self.subscriptions.entry(exact_subnet.subnet_id) {
// We are not currently subscribed and have no waiting subscription, create one
debug!(self.log, "Subscribing to subnet"; "subnet" => *exact_subnet.subnet_id, "until_epoch" => ?exact_subnet.until_epoch);
e.insert(exact_subnet.until_epoch);
self.events
.push_back(SubnetServiceMessage::Subscribe(Subnet::SyncCommittee(
exact_subnet.subnet_id,
)));
// add the subnet to the ENR bitfield
self.events
.push_back(SubnetServiceMessage::EnrAdd(Subnet::SyncCommittee(
exact_subnet.subnet_id,
)));
// add an unsubscription event to remove ourselves from the subnet once completed
self.unsubscriptions
.insert_at(exact_subnet.subnet_id, expected_end_subscription_duration);
} else {
// We are already subscribed, extend the unsubscription duration
self.unsubscriptions
.update_timeout(&exact_subnet.subnet_id, expected_end_subscription_duration);
}
Ok(())
}
/// A queued unsubscription is ready.
fn handle_unsubscriptions(&mut self, subnet_id: SyncSubnetId) {
debug!(self.log, "Unsubscribing from subnet"; "subnet" => *subnet_id);
self.subscriptions.remove(&subnet_id);
self.events
.push_back(SubnetServiceMessage::Unsubscribe(Subnet::SyncCommittee(
subnet_id,
)));
self.events
.push_back(SubnetServiceMessage::EnrRemove(Subnet::SyncCommittee(
subnet_id,
)));
}
}
impl<T: BeaconChainTypes> Stream for SyncCommitteeService<T> {
type Item = SubnetServiceMessage;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
// update the waker if needed
if let Some(waker) = &self.waker {
if waker.will_wake(cx.waker()) {
self.waker = Some(cx.waker().clone());
}
} else {
self.waker = Some(cx.waker().clone());
}
// process any un-subscription events
match self.unsubscriptions.poll_next_unpin(cx) {
Poll::Ready(Some(Ok(exact_subnet))) => self.handle_unsubscriptions(exact_subnet),
Poll::Ready(Some(Err(e))) => {
error!(self.log, "Failed to check for subnet unsubscription times"; "error"=> e);
}
Poll::Ready(None) | Poll::Pending => {}
}
// process any generated events
if let Some(event) = self.events.pop_front() {
return Poll::Ready(Some(event));
}
Poll::Pending
}
}

View File

@@ -0,0 +1,573 @@
use super::*;
use beacon_chain::{
builder::{BeaconChainBuilder, Witness},
eth1_chain::CachingEth1Backend,
BeaconChain,
};
use eth2_libp2p::NetworkConfig;
use futures::prelude::*;
use genesis::{generate_deterministic_keypairs, interop_genesis_state};
use lazy_static::lazy_static;
use slog::Logger;
use sloggers::{null::NullLoggerBuilder, Build};
use slot_clock::{SlotClock, SystemTimeSlotClock};
use std::sync::Arc;
use std::time::{Duration, SystemTime};
use store::config::StoreConfig;
use store::{HotColdDB, MemoryStore};
use types::{
CommitteeIndex, Epoch, EthSpec, MainnetEthSpec, Slot, SubnetId, SyncCommitteeSubscription,
SyncSubnetId, ValidatorSubscription,
};
const SLOT_DURATION_MILLIS: u64 = 400;
type TestBeaconChainType = Witness<
SystemTimeSlotClock,
CachingEth1Backend<MainnetEthSpec>,
MainnetEthSpec,
MemoryStore<MainnetEthSpec>,
MemoryStore<MainnetEthSpec>,
>;
pub struct TestBeaconChain {
chain: Arc<BeaconChain<TestBeaconChainType>>,
}
impl TestBeaconChain {
pub fn new_with_system_clock() -> Self {
let spec = MainnetEthSpec::default_spec();
let keypairs = generate_deterministic_keypairs(1);
let log = get_logger();
let store =
HotColdDB::open_ephemeral(StoreConfig::default(), spec.clone(), log.clone()).unwrap();
let (shutdown_tx, _) = futures::channel::mpsc::channel(1);
let chain = Arc::new(
BeaconChainBuilder::new(MainnetEthSpec)
.logger(log.clone())
.custom_spec(spec.clone())
.store(Arc::new(store))
.genesis_state(
interop_genesis_state::<MainnetEthSpec>(&keypairs, 0, &spec)
.expect("should generate interop state"),
)
.expect("should build state using recent genesis")
.dummy_eth1_backend()
.expect("should build dummy backend")
.slot_clock(SystemTimeSlotClock::new(
Slot::new(0),
Duration::from_secs(recent_genesis_time()),
Duration::from_millis(SLOT_DURATION_MILLIS),
))
.shutdown_sender(shutdown_tx)
.monitor_validators(true, vec![], log)
.build()
.expect("should build"),
);
Self { chain }
}
}
pub fn recent_genesis_time() -> u64 {
SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.unwrap()
.as_secs()
}
fn get_logger() -> Logger {
NullLoggerBuilder.build().expect("logger should build")
}
lazy_static! {
static ref CHAIN: TestBeaconChain = TestBeaconChain::new_with_system_clock();
}
fn get_attestation_service() -> AttestationService<TestBeaconChainType> {
let log = get_logger();
let config = NetworkConfig::default();
let beacon_chain = CHAIN.chain.clone();
AttestationService::new(beacon_chain, &config, &log)
}
fn get_sync_committee_service() -> SyncCommitteeService<TestBeaconChainType> {
let log = get_logger();
let config = NetworkConfig::default();
let beacon_chain = CHAIN.chain.clone();
SyncCommitteeService::new(beacon_chain, &config, &log)
}
// gets a number of events from the subscription service, or returns none if it times out after a number
// of slots
async fn get_events<S: Stream<Item = SubnetServiceMessage> + Unpin>(
stream: &mut S,
num_events: Option<usize>,
num_slots_before_timeout: u32,
) -> Vec<SubnetServiceMessage> {
let mut events = Vec::new();
let collect_stream_fut = async {
loop {
if let Some(result) = stream.next().await {
events.push(result);
if let Some(num) = num_events {
if events.len() == num {
return;
}
}
}
}
};
tokio::select! {
_ = collect_stream_fut => events,
_ = tokio::time::sleep(
Duration::from_millis(SLOT_DURATION_MILLIS) * num_slots_before_timeout,
) => events
}
}
mod attestation_service {
use super::*;
fn get_subscription(
validator_index: u64,
attestation_committee_index: CommitteeIndex,
slot: Slot,
committee_count_at_slot: u64,
) -> ValidatorSubscription {
let is_aggregator = true;
ValidatorSubscription {
validator_index,
attestation_committee_index,
slot,
committee_count_at_slot,
is_aggregator,
}
}
fn get_subscriptions(
validator_count: u64,
slot: Slot,
committee_count_at_slot: u64,
) -> Vec<ValidatorSubscription> {
(0..validator_count)
.map(|validator_index| {
get_subscription(
validator_index,
validator_index,
slot,
committee_count_at_slot,
)
})
.collect()
}
#[tokio::test]
async fn subscribe_current_slot_wait_for_unsubscribe() {
// subscription config
let validator_index = 1;
let committee_index = 1;
// Keep a low subscription slot so that there are no additional subnet discovery events.
let subscription_slot = 0;
let committee_count = 1;
// create the attestation service and subscriptions
let mut attestation_service = get_attestation_service();
let current_slot = attestation_service
.beacon_chain
.slot_clock
.now()
.expect("Could not get current slot");
let subscriptions = vec![get_subscription(
validator_index,
committee_index,
current_slot + Slot::new(subscription_slot),
committee_count,
)];
// submit the subscriptions
attestation_service
.validator_subscriptions(subscriptions)
.unwrap();
// not enough time for peer discovery, just subscribe, unsubscribe
let subnet_id = SubnetId::compute_subnet::<MainnetEthSpec>(
current_slot + Slot::new(subscription_slot),
committee_index,
committee_count,
&attestation_service.beacon_chain.spec,
)
.unwrap();
let expected = vec![
SubnetServiceMessage::Subscribe(Subnet::Attestation(subnet_id)),
SubnetServiceMessage::Unsubscribe(Subnet::Attestation(subnet_id)),
];
// Wait for 1 slot duration to get the unsubscription event
let events = get_events(
&mut attestation_service,
Some(5),
(MainnetEthSpec::slots_per_epoch() * 3) as u32,
)
.await;
matches::assert_matches!(
events[..3],
[
SubnetServiceMessage::DiscoverPeers(_),
SubnetServiceMessage::Subscribe(_any1),
SubnetServiceMessage::EnrAdd(_any3)
]
);
// If the long lived and short lived subnets are the same, there should be no more events
// as we don't resubscribe already subscribed subnets.
if !attestation_service.random_subnets.contains(&subnet_id) {
assert_eq!(expected[..], events[3..]);
}
// Should be subscribed to only 1 long lived subnet after unsubscription.
assert_eq!(attestation_service.subscription_count(), 1);
}
/// Test to verify that we are not unsubscribing to a subnet before a required subscription.
#[tokio::test]
async fn test_same_subnet_unsubscription() {
// subscription config
let validator_index = 1;
let committee_count = 1;
// Makes 2 validator subscriptions to the same subnet but at different slots.
// There should be just 1 unsubscription event for the later slot subscription (subscription_slot2).
let subscription_slot1 = 0;
let subscription_slot2 = 1;
let com1 = 1;
let com2 = 0;
// create the attestation service and subscriptions
let mut attestation_service = get_attestation_service();
let current_slot = attestation_service
.beacon_chain
.slot_clock
.now()
.expect("Could not get current slot");
let sub1 = get_subscription(
validator_index,
com1,
current_slot + Slot::new(subscription_slot1),
committee_count,
);
let sub2 = get_subscription(
validator_index,
com2,
current_slot + Slot::new(subscription_slot2),
committee_count,
);
let subnet_id1 = SubnetId::compute_subnet::<MainnetEthSpec>(
current_slot + Slot::new(subscription_slot1),
com1,
committee_count,
&attestation_service.beacon_chain.spec,
)
.unwrap();
let subnet_id2 = SubnetId::compute_subnet::<MainnetEthSpec>(
current_slot + Slot::new(subscription_slot2),
com2,
committee_count,
&attestation_service.beacon_chain.spec,
)
.unwrap();
// Assert that subscriptions are different but their subnet is the same
assert_ne!(sub1, sub2);
assert_eq!(subnet_id1, subnet_id2);
// submit the subscriptions
attestation_service
.validator_subscriptions(vec![sub1, sub2])
.unwrap();
// Unsubscription event should happen at slot 2 (since subnet id's are the same, unsubscription event should be at higher slot + 1)
// Get all events for 1 slot duration (unsubscription event should happen after 2 slot durations).
let events = get_events(&mut attestation_service, None, 1).await;
matches::assert_matches!(
events[..3],
[
SubnetServiceMessage::DiscoverPeers(_),
SubnetServiceMessage::Subscribe(_any1),
SubnetServiceMessage::EnrAdd(_any3)
]
);
let expected = SubnetServiceMessage::Subscribe(Subnet::Attestation(subnet_id1));
// Should be still subscribed to 1 long lived and 1 short lived subnet if both are different.
if !attestation_service.random_subnets.contains(&subnet_id1) {
assert_eq!(expected, events[3]);
assert_eq!(attestation_service.subscription_count(), 2);
} else {
assert_eq!(attestation_service.subscription_count(), 1);
}
// Get event for 1 more slot duration, we should get the unsubscribe event now.
let unsubscribe_event = get_events(&mut attestation_service, None, 1).await;
// If the long lived and short lived subnets are different, we should get an unsubscription event.
if !attestation_service.random_subnets.contains(&subnet_id1) {
assert_eq!(
[SubnetServiceMessage::Unsubscribe(Subnet::Attestation(
subnet_id1
))],
unsubscribe_event[..]
);
}
// Should be subscribed to only 1 long lived subnet after unsubscription.
assert_eq!(attestation_service.subscription_count(), 1);
}
#[tokio::test]
async fn subscribe_all_random_subnets() {
let attestation_subnet_count = MainnetEthSpec::default_spec().attestation_subnet_count;
let subscription_slot = 10;
let subscription_count = attestation_subnet_count;
let committee_count = 1;
// create the attestation service and subscriptions
let mut attestation_service = get_attestation_service();
let current_slot = attestation_service
.beacon_chain
.slot_clock
.now()
.expect("Could not get current slot");
let subscriptions = get_subscriptions(
subscription_count,
current_slot + subscription_slot,
committee_count,
);
// submit the subscriptions
attestation_service
.validator_subscriptions(subscriptions)
.unwrap();
let events = get_events(&mut attestation_service, None, 3).await;
let mut discover_peer_count = 0;
let mut enr_add_count = 0;
let mut unexpected_msg_count = 0;
for event in &events {
match event {
SubnetServiceMessage::DiscoverPeers(_) => discover_peer_count += 1,
SubnetServiceMessage::Subscribe(_any_subnet) => {}
SubnetServiceMessage::EnrAdd(_any_subnet) => enr_add_count += 1,
_ => unexpected_msg_count += 1,
}
}
// The bulk discovery request length should be equal to validator_count
let bulk_discovery_event = events.last().unwrap();
if let SubnetServiceMessage::DiscoverPeers(d) = bulk_discovery_event {
assert_eq!(d.len(), attestation_subnet_count as usize);
} else {
panic!("Unexpected event {:?}", bulk_discovery_event);
}
// 64 `DiscoverPeer` requests of length 1 corresponding to random subnets
// and 1 `DiscoverPeer` request corresponding to bulk subnet discovery.
assert_eq!(discover_peer_count, subscription_count + 1);
assert_eq!(attestation_service.subscription_count(), 64);
assert_eq!(enr_add_count, 64);
assert_eq!(unexpected_msg_count, 0);
// test completed successfully
}
#[tokio::test]
async fn subscribe_all_random_subnets_plus_one() {
let attestation_subnet_count = MainnetEthSpec::default_spec().attestation_subnet_count;
let subscription_slot = 10;
// the 65th subscription should result in no more messages than the previous scenario
let subscription_count = attestation_subnet_count + 1;
let committee_count = 1;
// create the attestation service and subscriptions
let mut attestation_service = get_attestation_service();
let current_slot = attestation_service
.beacon_chain
.slot_clock
.now()
.expect("Could not get current slot");
let subscriptions = get_subscriptions(
subscription_count,
current_slot + subscription_slot,
committee_count,
);
// submit the subscriptions
attestation_service
.validator_subscriptions(subscriptions)
.unwrap();
let events = get_events(&mut attestation_service, None, 3).await;
let mut discover_peer_count = 0;
let mut enr_add_count = 0;
let mut unexpected_msg_count = 0;
for event in &events {
match event {
SubnetServiceMessage::DiscoverPeers(_) => discover_peer_count += 1,
SubnetServiceMessage::Subscribe(_any_subnet) => {}
SubnetServiceMessage::EnrAdd(_any_subnet) => enr_add_count += 1,
_ => unexpected_msg_count += 1,
}
}
// The bulk discovery request length shouldn't exceed max attestation_subnet_count
let bulk_discovery_event = events.last().unwrap();
if let SubnetServiceMessage::DiscoverPeers(d) = bulk_discovery_event {
assert_eq!(d.len(), attestation_subnet_count as usize);
} else {
panic!("Unexpected event {:?}", bulk_discovery_event);
}
// 64 `DiscoverPeer` requests of length 1 corresponding to random subnets
// and 1 `DiscoverPeer` request corresponding to the bulk subnet discovery.
// For the 65th subscription, the call to `subscribe_to_random_subnets` is not made because we are at capacity.
assert_eq!(discover_peer_count, 64 + 1);
assert_eq!(attestation_service.subscription_count(), 64);
assert_eq!(enr_add_count, 64);
assert_eq!(unexpected_msg_count, 0);
}
}
mod sync_committee_service {
use super::*;
#[tokio::test]
async fn subscribe_and_unsubscribe() {
// subscription config
let validator_index = 1;
let until_epoch = Epoch::new(1);
let sync_committee_indices = vec![1];
// create the attestation service and subscriptions
let mut sync_committee_service = get_sync_committee_service();
let subscriptions = vec![SyncCommitteeSubscription {
validator_index,
sync_committee_indices: sync_committee_indices.clone(),
until_epoch,
}];
// submit the subscriptions
sync_committee_service
.validator_subscriptions(subscriptions)
.unwrap();
let subnet_ids = SyncSubnetId::compute_subnets_for_sync_committee::<MainnetEthSpec>(
&sync_committee_indices,
)
.unwrap();
let subnet_id = subnet_ids.iter().next().unwrap();
// Note: the unsubscription event takes 2 epochs (8 * 2 * 0.4 secs = 3.2 secs)
let events = get_events(
&mut sync_committee_service,
Some(5),
(MainnetEthSpec::slots_per_epoch() * 3) as u32, // Have some buffer time before getting 5 events
)
.await;
assert_eq!(
events[..2],
[
SubnetServiceMessage::Subscribe(Subnet::SyncCommittee(*subnet_id)),
SubnetServiceMessage::EnrAdd(Subnet::SyncCommittee(*subnet_id))
]
);
matches::assert_matches!(
events[2..],
[
SubnetServiceMessage::DiscoverPeers(_),
SubnetServiceMessage::Unsubscribe(_),
SubnetServiceMessage::EnrRemove(_),
]
);
// Should be unsubscribed at the end.
assert_eq!(sync_committee_service.subscription_count(), 0);
}
#[tokio::test]
async fn same_subscription_with_lower_until_epoch() {
// subscription config
let validator_index = 1;
let until_epoch = Epoch::new(2);
let sync_committee_indices = vec![1];
// create the attestation service and subscriptions
let mut sync_committee_service = get_sync_committee_service();
let subscriptions = vec![SyncCommitteeSubscription {
validator_index,
sync_committee_indices: sync_committee_indices.clone(),
until_epoch,
}];
// submit the subscriptions
sync_committee_service
.validator_subscriptions(subscriptions)
.unwrap();
// Get all immediate events (won't include unsubscriptions)
let events = get_events(&mut sync_committee_service, None, 1).await;
matches::assert_matches!(
events[..],
[
SubnetServiceMessage::Subscribe(Subnet::SyncCommittee(_)),
SubnetServiceMessage::EnrAdd(Subnet::SyncCommittee(_)),
SubnetServiceMessage::DiscoverPeers(_),
]
);
// Additional subscriptions which shouldn't emit any non-discovery events
// Event 1 is a duplicate of an existing subscription
// Event 2 is the same subscription with lower `until_epoch` than the existing subscription
let subscriptions = vec![
SyncCommitteeSubscription {
validator_index,
sync_committee_indices: sync_committee_indices.clone(),
until_epoch,
},
SyncCommitteeSubscription {
validator_index,
sync_committee_indices: sync_committee_indices.clone(),
until_epoch: until_epoch - 1,
},
];
// submit the subscriptions
sync_committee_service
.validator_subscriptions(subscriptions)
.unwrap();
// Get all immediate events (won't include unsubscriptions)
let events = get_events(&mut sync_committee_service, None, 1).await;
matches::assert_matches!(events[..], [SubnetServiceMessage::DiscoverPeers(_),]);
// Should be unsubscribed at the end.
assert_eq!(sync_committee_service.subscription_count(), 1);
}
}