Subscribe to altair gossip topics 2 slots before fork (#2532)

## Issue Addressed

N/A

## Proposed Changes

Add a fork_digest to `ForkContext` only if it is set in the config.
Reject gossip messages on post fork topics before the fork happens.

Edit: Instead of rejecting gossip messages on post fork topics, we now subscribe to post fork topics 2 slots before the fork.

Co-authored-by: Age Manning <Age@AgeManning.com>
This commit is contained in:
Pawan Dhananjay
2021-09-17 01:11:16 +00:00
parent acdcea9663
commit 64ad2af100
7 changed files with 145 additions and 24 deletions

View File

@@ -25,14 +25,16 @@ use task_executor::ShutdownReason;
use tokio::sync::mpsc;
use tokio::time::Sleep;
use types::{
EthSpec, ForkContext, ForkName, RelativeEpoch, SubnetId, SyncCommitteeSubscription,
SyncSubnetId, Unsigned, ValidatorSubscription,
ChainSpec, EthSpec, ForkContext, ForkName, RelativeEpoch, Slot, SubnetId,
SyncCommitteeSubscription, SyncSubnetId, Unsigned, ValidatorSubscription,
};
mod tests;
/// The interval (in seconds) that various network metrics will update.
const METRIC_UPDATE_INTERVAL: u64 = 1;
/// Number of slots before the fork when we should subscribe to the new fork topics.
const SUBSCRIBE_DELAY_SLOTS: u64 = 2;
/// Delay after a fork where we unsubscribe from pre-fork topics.
const UNSUBSCRIBE_DELAY_EPOCHS: u64 = 2;
@@ -129,6 +131,8 @@ pub struct NetworkService<T: BeaconChainTypes> {
discovery_auto_update: bool,
/// A delay that expires when a new fork takes place.
next_fork_update: Pin<Box<OptionFuture<Sleep>>>,
/// A delay that expires when we need to subscribe to a new fork's topics.
next_fork_subscriptions: Pin<Box<OptionFuture<Sleep>>>,
/// A delay that expires when we need to unsubscribe from old fork topics.
next_unsubscribe: Pin<Box<OptionFuture<Sleep>>>,
/// Subscribe to all the subnets once synced.
@@ -179,6 +183,7 @@ impl<T: BeaconChainTypes> NetworkService<T> {
// keep track of when our fork_id needs to be updated
let next_fork_update = Box::pin(next_fork_delay(&beacon_chain).into());
let next_fork_subscriptions = Box::pin(next_fork_subscriptions_delay(&beacon_chain).into());
let next_unsubscribe = Box::pin(None.into());
let current_slot = beacon_chain
@@ -192,6 +197,8 @@ impl<T: BeaconChainTypes> NetworkService<T> {
&beacon_chain.spec,
));
debug!(network_log, "Current fork"; "fork_name" => ?fork_context.current_fork());
// launch libp2p service
let (network_globals, mut libp2p) = LibP2PService::new(
executor.clone(),
@@ -254,6 +261,7 @@ impl<T: BeaconChainTypes> NetworkService<T> {
upnp_mappings: (None, None),
discovery_auto_update: config.discv5_config.enr_update,
next_fork_update,
next_fork_subscriptions,
next_unsubscribe,
subscribe_all_subnets: config.subscribe_all_subnets,
shutdown_after_sync: config.shutdown_after_sync,
@@ -274,12 +282,23 @@ impl<T: BeaconChainTypes> NetworkService<T> {
/// digests since we should be subscribed to post fork topics before the fork.
pub fn required_gossip_fork_digests(&self) -> Vec<[u8; 4]> {
let fork_context = &self.fork_context;
let spec = &self.beacon_chain.spec;
match fork_context.current_fork() {
ForkName::Base => {
if fork_context.fork_exists(ForkName::Altair) {
fork_context.all_fork_digests()
} else {
vec![fork_context.genesis_context_bytes()]
// If we are SUBSCRIBE_DELAY_SLOTS before the fork slot, subscribe only to Base,
// else subscribe to Base and Altair.
let current_slot = self.beacon_chain.slot().unwrap_or(spec.genesis_slot);
match spec.next_fork_epoch::<T::EthSpec>(current_slot) {
Some((_, fork_epoch)) => {
if current_slot.saturating_add(Slot::new(SUBSCRIBE_DELAY_SLOTS))
>= fork_epoch.start_slot(T::EthSpec::slots_per_epoch())
{
fork_context.all_fork_digests()
} else {
vec![fork_context.genesis_context_bytes()]
}
}
None => vec![fork_context.genesis_context_bytes()],
}
}
ForkName::Altair => vec![fork_context
@@ -619,6 +638,7 @@ fn spawn_service<T: BeaconChainTypes>(
} => {
// Update prometheus metrics.
metrics::expose_receive_metrics(&message);
match message {
// attestation information gets processed in the attestation service
PubsubMessage::Attestation(ref subnet_and_attestation) => {
@@ -671,7 +691,7 @@ fn spawn_service<T: BeaconChainTypes>(
if let Some(new_fork_name) = fork_context.from_context_bytes(new_enr_fork_id.fork_digest) {
info!(
service.log,
"Updating enr fork version";
"Transitioned to new fork";
"old_fork" => ?fork_context.current_fork(),
"new_fork" => ?new_fork_name,
);
@@ -701,6 +721,18 @@ fn spawn_service<T: BeaconChainTypes>(
info!(service.log, "Unsubscribed from old fork topics");
service.next_unsubscribe = Box::pin(None.into());
}
Some(_) = &mut service.next_fork_subscriptions => {
if let Some((fork_name, _)) = service.beacon_chain.duration_to_next_fork() {
let fork_version = service.beacon_chain.spec.fork_version_for_name(fork_name);
let fork_digest = ChainSpec::compute_fork_digest(fork_version, service.beacon_chain.genesis_validators_root);
info!(service.log, "Subscribing to new fork topics");
service.libp2p.swarm.behaviour_mut().subscribe_new_fork_topics(fork_digest);
}
else {
error!(service.log, "Fork subscription scheduled but no fork scheduled");
}
service.next_fork_subscriptions = Box::pin(next_fork_subscriptions_delay(&service.beacon_chain).into());
}
}
metrics::update_bandwidth_metrics(service.libp2p.bandwidth.clone());
}
@@ -717,6 +749,22 @@ fn next_fork_delay<T: BeaconChainTypes>(
.map(|(_, until_fork)| tokio::time::sleep(until_fork))
}
/// Returns a `Sleep` that triggers `SUBSCRIBE_DELAY_SLOTS` before the next fork.
/// Returns `None` if there are no scheduled forks or we are already past `current_slot + SUBSCRIBE_DELAY_SLOTS > fork_slot`.
fn next_fork_subscriptions_delay<T: BeaconChainTypes>(
beacon_chain: &BeaconChain<T>,
) -> Option<tokio::time::Sleep> {
if let Some((_, duration_to_fork)) = beacon_chain.duration_to_next_fork() {
let duration_to_subscription = duration_to_fork.saturating_sub(Duration::from_secs(
beacon_chain.spec.seconds_per_slot * SUBSCRIBE_DELAY_SLOTS,
));
if !duration_to_subscription.is_zero() {
return Some(tokio::time::sleep(duration_to_subscription));
}
}
None
}
impl<T: BeaconChainTypes> Drop for NetworkService<T> {
fn drop(&mut self) {
// network thread is terminating