Final changes for fusaka-devnet-2 (#7655)

Closes #7467.

This PR primarily addresses [the P2P changes](https://github.com/ethereum/EIPs/pull/9840) in [fusaka-devnet-2](https://fusaka-devnet-2.ethpandaops.io/). Specifically:

* [the new `nfd` parameter added to the `ENR`](https://github.com/ethereum/EIPs/pull/9840)
* [the modified `compute_fork_digest()` changes for every BPO fork](https://github.com/ethereum/EIPs/pull/9840)

90% of this PR was hacked together as fast as possible during the Berlinterop while running between Glamsterdam debates. Luckily, it seems to work. But I was unable to be as careful in avoiding bugs as I usually am. I've cleaned up the things *I remember* wanting to come back and have a closer look at. But still working on this.

Progress:
* [x] get it working on `fusaka-devnet-2`
* [ ] [*optional* disconnect from peers with incorrect `nfd` at the fork boundary](https://github.com/ethereum/consensus-specs/pull/4407) - Can be addressed in a future PR if necessary
* [x] first pass clean-up
* [x] fix up all the broken tests
* [x] final self-review
* [x] more thorough review from people more familiar with affected code
This commit is contained in:
ethDreamer
2025-07-10 16:32:58 -05:00
committed by GitHub
parent 3826fe91f4
commit b43e0b446c
26 changed files with 1047 additions and 581 deletions

View File

@@ -37,8 +37,8 @@ use tokio::sync::mpsc;
use tokio::time::Sleep;
use tracing::{debug, error, info, info_span, trace, warn, Instrument};
use types::{
ChainSpec, EthSpec, ForkContext, Slot, SubnetId, SyncCommitteeSubscription, SyncSubnetId,
Unsigned, ValidatorSubscription,
EthSpec, ForkContext, Slot, SubnetId, SyncCommitteeSubscription, SyncSubnetId, Unsigned,
ValidatorSubscription,
};
mod tests;
@@ -187,11 +187,11 @@ pub struct NetworkService<T: BeaconChainTypes> {
store: Arc<HotColdDB<T::EthSpec, T::HotStore, T::ColdStore>>,
/// A collection of global variables, accessible outside of the network service.
network_globals: Arc<NetworkGlobals<T::EthSpec>>,
/// A delay that expires when a new fork takes place.
next_fork_update: Pin<Box<OptionFuture<Sleep>>>,
/// A delay that expires when we need to subscribe to a new fork's topics.
next_fork_subscriptions: Pin<Box<OptionFuture<Sleep>>>,
/// A delay that expires when we need to unsubscribe from old fork topics.
/// A delay that expires when the fork digest changes.
next_digest_update: Pin<Box<OptionFuture<Sleep>>>,
/// A delay that expires when we need to subscribe to a new set of topics.
next_topic_subscriptions: Pin<Box<OptionFuture<Sleep>>>,
/// A delay that expires when we need to unsubscribe from old topics.
next_unsubscribe: Pin<Box<OptionFuture<Sleep>>>,
/// Shutdown beacon node after sync is complete.
shutdown_after_sync: bool,
@@ -250,8 +250,10 @@ impl<T: BeaconChainTypes> NetworkService<T> {
let enr_fork_id = beacon_chain.enr_fork_id();
// keep track of when our fork_id needs to be updated
let next_fork_update = Box::pin(next_fork_delay(&beacon_chain).into());
let next_fork_subscriptions = Box::pin(next_fork_subscriptions_delay(&beacon_chain).into());
let next_digest_update = Box::pin(next_digest_delay(&beacon_chain).into());
// topics change when the fork digest changes
let next_topic_subscriptions =
Box::pin(next_topic_subscriptions_delay(&beacon_chain).into());
let next_unsubscribe = Box::pin(None.into());
let current_slot = beacon_chain
@@ -265,8 +267,6 @@ impl<T: BeaconChainTypes> NetworkService<T> {
&beacon_chain.spec,
));
debug!(fork_name = ?fork_context.current_fork(), "Current fork");
// construct the libp2p service context
let service_context = Context {
config: config.clone(),
@@ -346,8 +346,8 @@ impl<T: BeaconChainTypes> NetworkService<T> {
router_send,
store,
network_globals: network_globals.clone(),
next_fork_update,
next_fork_subscriptions,
next_digest_update,
next_topic_subscriptions,
next_unsubscribe,
shutdown_after_sync: config.shutdown_after_sync,
metrics_enabled: config.metrics_enabled,
@@ -389,29 +389,16 @@ impl<T: BeaconChainTypes> NetworkService<T> {
let fork_context = &self.fork_context;
let spec = &self.beacon_chain.spec;
let current_slot = self.beacon_chain.slot().unwrap_or(spec.genesis_slot);
let current_fork = fork_context.current_fork();
let current_epoch = current_slot.epoch(T::EthSpec::slots_per_epoch());
let mut result = vec![fork_context
.to_context_bytes(current_fork)
.unwrap_or_else(|| {
panic!(
"{} fork bytes should exist as it's initialized in ForkContext",
current_fork
)
})];
let mut result = vec![fork_context.context_bytes(current_epoch)];
if let Some((next_fork, fork_epoch)) = spec.next_fork_epoch::<T::EthSpec>(current_slot) {
if let Some(next_digest_epoch) = spec.next_digest_epoch(current_epoch) {
if current_slot.saturating_add(Slot::new(SUBSCRIBE_DELAY_SLOTS))
>= fork_epoch.start_slot(T::EthSpec::slots_per_epoch())
>= next_digest_epoch.start_slot(T::EthSpec::slots_per_epoch())
{
let next_fork_context_bytes =
fork_context.to_context_bytes(next_fork).unwrap_or_else(|| {
panic!(
"context bytes should exist as spec.next_fork_epoch({}) returned Some({})",
current_slot, next_fork
)
});
result.push(next_fork_context_bytes);
let next_digest = fork_context.context_bytes(next_digest_epoch);
result.push(next_digest);
}
}
@@ -454,7 +441,7 @@ impl<T: BeaconChainTypes> NetworkService<T> {
event = self.libp2p.next_event() => self.on_libp2p_event(event, &mut shutdown_sender).await,
Some(_) = &mut self.next_fork_update => self.update_next_fork(),
Some(_) = &mut self.next_digest_update => self.update_next_fork_digest(),
Some(_) = &mut self.next_unsubscribe => {
let new_enr_fork_id = self.beacon_chain.enr_fork_id();
@@ -463,13 +450,13 @@ impl<T: BeaconChainTypes> NetworkService<T> {
self.next_unsubscribe = Box::pin(None.into());
}
Some(_) = &mut self.next_fork_subscriptions => {
if let Some((fork_name, _)) = self.beacon_chain.duration_to_next_fork() {
let fork_version = self.beacon_chain.spec.fork_version_for_name(fork_name);
let fork_digest = ChainSpec::compute_fork_digest(fork_version, self.beacon_chain.genesis_validators_root);
Some(_) = &mut self.next_topic_subscriptions => {
if let Some((epoch, _)) = self.beacon_chain.duration_to_next_digest() {
let fork_name = self.beacon_chain.spec.fork_name_at_epoch(epoch);
let fork_digest = self.beacon_chain.compute_fork_digest(epoch);
info!("Subscribing to new fork topics");
self.libp2p.subscribe_new_fork_topics(fork_name, fork_digest);
self.next_fork_subscriptions = Box::pin(None.into());
self.next_topic_subscriptions = Box::pin(None.into());
}
else {
error!( "Fork subscription scheduled but no fork scheduled");
@@ -702,7 +689,7 @@ impl<T: BeaconChainTypes> NetworkService<T> {
let mut subscribed_topics: Vec<GossipTopic> = vec![];
for topic_kind in core_topics_to_subscribe::<T::EthSpec>(
self.fork_context.current_fork(),
self.fork_context.current_fork_name(),
&self.network_globals.as_topic_config(),
&self.fork_context.spec,
) {
@@ -830,31 +817,52 @@ impl<T: BeaconChainTypes> NetworkService<T> {
}
}
fn update_next_fork(&mut self) {
fn update_next_fork_digest(&mut self) {
let new_enr_fork_id = self.beacon_chain.enr_fork_id();
// if we are unable to read the slot clock we assume that it is prior to genesis
let current_epoch = self.beacon_chain.epoch().unwrap_or(
self.beacon_chain
.spec
.genesis_slot
.epoch(T::EthSpec::slots_per_epoch()),
);
let new_fork_digest = new_enr_fork_id.fork_digest;
let fork_context = &self.fork_context;
if let Some(new_fork_name) = fork_context.from_context_bytes(new_fork_digest) {
info!(
old_fork = ?fork_context.current_fork(),
new_fork = ?new_fork_name,
"Transitioned to new fork"
);
fork_context.update_current_fork(*new_fork_name);
if let Some(new_fork_name) = fork_context.get_fork_from_context_bytes(new_fork_digest) {
if fork_context.current_fork_name() == *new_fork_name {
info!(
epoch = ?current_epoch,
"BPO Fork Triggered"
)
} else {
info!(
old_fork = ?fork_context.current_fork_name(),
new_fork = ?new_fork_name,
"Transitioned to new fork"
);
}
fork_context.update_current_fork(*new_fork_name, new_fork_digest, current_epoch);
if self.beacon_chain.spec.is_peer_das_scheduled() {
let next_fork_digest = fork_context
.next_fork_digest()
.unwrap_or_else(|| fork_context.current_fork_digest());
self.libp2p.update_nfd(next_fork_digest);
}
self.libp2p.update_fork_version(new_enr_fork_id);
// Reinitialize the next_fork_update
self.next_fork_update = Box::pin(next_fork_delay(&self.beacon_chain).into());
self.next_digest_update = Box::pin(next_digest_delay(&self.beacon_chain).into());
// Set the next_unsubscribe delay.
let epoch_duration =
self.beacon_chain.spec.seconds_per_slot * T::EthSpec::slots_per_epoch();
let unsubscribe_delay = Duration::from_secs(UNSUBSCRIBE_DELAY_EPOCHS * epoch_duration);
// Update the `next_fork_subscriptions` timer if the next fork is known.
self.next_fork_subscriptions =
Box::pin(next_fork_subscriptions_delay(&self.beacon_chain).into());
// Update the `next_topic_subscriptions` timer if the next change in the fork digest is known.
self.next_topic_subscriptions =
Box::pin(next_topic_subscriptions_delay(&self.beacon_chain).into());
self.next_unsubscribe = Box::pin(Some(tokio::time::sleep(unsubscribe_delay)).into());
info!(
remaining_epochs = UNSUBSCRIBE_DELAY_EPOCHS,
@@ -871,7 +879,7 @@ impl<T: BeaconChainTypes> NetworkService<T> {
fn subscribed_core_topics(&self) -> bool {
let core_topics = core_topics_to_subscribe::<T::EthSpec>(
self.fork_context.current_fork(),
self.fork_context.current_fork_name(),
&self.network_globals.as_topic_config(),
&self.fork_context.spec,
);
@@ -884,23 +892,23 @@ impl<T: BeaconChainTypes> NetworkService<T> {
}
}
/// Returns a `Sleep` that triggers after the next change in the beacon chain fork version.
/// Returns a `Sleep` that triggers after the next change in the fork digest.
/// If there is no scheduled fork, `None` is returned.
fn next_fork_delay<T: BeaconChainTypes>(
fn next_digest_delay<T: BeaconChainTypes>(
beacon_chain: &BeaconChain<T>,
) -> Option<tokio::time::Sleep> {
beacon_chain
.duration_to_next_fork()
.map(|(_, until_fork)| tokio::time::sleep(until_fork))
.duration_to_next_digest()
.map(|(_, until_epoch)| tokio::time::sleep(until_epoch))
}
/// Returns a `Sleep` that triggers `SUBSCRIBE_DELAY_SLOTS` before the next fork.
/// Returns a `Sleep` that triggers `SUBSCRIBE_DELAY_SLOTS` before the next fork digest changes.
/// Returns `None` if there are no scheduled forks or we are already past `current_slot + SUBSCRIBE_DELAY_SLOTS > fork_slot`.
fn next_fork_subscriptions_delay<T: BeaconChainTypes>(
fn next_topic_subscriptions_delay<T: BeaconChainTypes>(
beacon_chain: &BeaconChain<T>,
) -> Option<tokio::time::Sleep> {
if let Some((_, duration_to_fork)) = beacon_chain.duration_to_next_fork() {
let duration_to_subscription = duration_to_fork.saturating_sub(Duration::from_secs(
if let Some((_, duration_to_epoch)) = beacon_chain.duration_to_next_digest() {
let duration_to_subscription = duration_to_epoch.saturating_sub(Duration::from_secs(
beacon_chain.spec.seconds_per_slot * SUBSCRIBE_DELAY_SLOTS,
));
if !duration_to_subscription.is_zero() {

View File

@@ -11,7 +11,7 @@ use lighthouse_network::{Enr, GossipTopic};
use std::str::FromStr;
use std::sync::Arc;
use tokio::runtime::Runtime;
use types::{Epoch, EthSpec, ForkName, MinimalEthSpec, SubnetId};
use types::{Epoch, EthSpec, MinimalEthSpec, SubnetId};
impl<T: BeaconChainTypes> NetworkService<T> {
fn get_topic_params(&self, topic: GossipTopic) -> Option<&gossipsub::TopicScoreParams> {
@@ -106,8 +106,8 @@ fn test_removing_topic_weight_on_old_topics() {
.mock_execution_layer()
.build()
.chain;
let (next_fork_name, _) = beacon_chain.duration_to_next_fork().expect("next fork");
assert_eq!(next_fork_name, ForkName::Capella);
let (next_fork_epoch, _) = beacon_chain.duration_to_next_digest().expect("next fork");
assert_eq!(Some(next_fork_epoch), spec.capella_fork_epoch);
// Build network service.
let (mut network_service, network_globals, _network_senders) = runtime.block_on(async {
@@ -189,9 +189,8 @@ fn test_removing_topic_weight_on_old_topics() {
beacon_chain.slot_clock.advance_slot();
}
// Run `NetworkService::update_next_fork()`.
runtime.block_on(async {
network_service.update_next_fork();
network_service.update_next_fork_digest();
});
// Check that topic_weight on the old topics has been zeroed.

View File

@@ -264,7 +264,7 @@ pub fn spawn<T: BeaconChainTypes>(
fork_context: Arc<ForkContext>,
) {
assert!(
beacon_chain.spec.max_request_blocks(fork_context.current_fork()) as u64 >= T::EthSpec::slots_per_epoch() * EPOCHS_PER_BATCH,
beacon_chain.spec.max_request_blocks(fork_context.current_fork_name()) as u64 >= T::EthSpec::slots_per_epoch() * EPOCHS_PER_BATCH,
"Max blocks that can be requested in a single batch greater than max allowed blocks in a single request"
);

View File

@@ -879,7 +879,7 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
request: RequestType::DataColumnsByRoot(
request
.clone()
.try_into_request(self.fork_context.current_fork(), &self.chain.spec)?,
.try_into_request(self.fork_context.current_fork_name(), &self.chain.spec)?,
),
app_request_id: AppRequestId::Sync(SyncRequestId::DataColumnsByRoot(id)),
})?;