Improve single-node testnet support and Arc NetworkConfig/ChainSpec (#6396)

* Arc ChainSpec and NetworkConfig

* Fix release tests

* Fix lint

* Merge remote-tracking branch 'origin/unstable' into single-node-testnet
This commit is contained in:
Michael Sproul
2024-09-24 10:16:18 +10:00
committed by GitHub
parent d84df5799c
commit 1447eeb40b
66 changed files with 340 additions and 250 deletions

View File

@@ -1215,10 +1215,11 @@ mod tests {
}
async fn build_discovery() -> Discovery<E> {
let spec = ChainSpec::default();
let spec = Arc::new(ChainSpec::default());
let keypair = secp256k1::Keypair::generate();
let mut config = NetworkConfig::default();
config.set_listening_addr(crate::ListenAddress::unused_v4_ports());
let config = Arc::new(config);
let enr_key: CombinedKey = CombinedKey::from_secp256k1(&keypair);
let enr: Enr = build_enr::<E>(&enr_key, &config, &EnrForkId::default(), &spec).unwrap();
let log = build_log(slog::Level::Debug, false);
@@ -1232,6 +1233,7 @@ mod tests {
vec![],
false,
&log,
config.clone(),
spec.clone(),
);
let keypair = keypair.into();

View File

@@ -4,9 +4,7 @@ use crate::discovery::enr_ext::EnrExt;
use crate::discovery::peer_id_to_node_id;
use crate::rpc::{GoodbyeReason, MetaData, Protocol, RPCError, RPCResponseErrorCode};
use crate::service::TARGET_SUBNET_PEERS;
use crate::{error, metrics, Gossipsub};
use crate::{NetworkGlobals, PeerId};
use crate::{Subnet, SubnetDiscovery};
use crate::{error, metrics, Gossipsub, NetworkGlobals, PeerId, Subnet, SubnetDiscovery};
use delay_map::HashSetDelay;
use discv5::Enr;
use libp2p::identify::Info as IdentifyInfo;
@@ -1452,6 +1450,7 @@ enum ConnectingType {
#[cfg(test)]
mod tests {
use super::*;
use crate::NetworkConfig;
use slog::{o, Drain};
use types::MainnetEthSpec as E;
@@ -1468,15 +1467,7 @@ mod tests {
}
async fn build_peer_manager(target_peer_count: usize) -> PeerManager<E> {
let config = config::Config {
target_peer_count,
discovery_enabled: false,
..Default::default()
};
let log = build_log(slog::Level::Debug, false);
let spec = E::default_spec();
let globals = NetworkGlobals::new_test_globals(vec![], &log, spec);
PeerManager::new(config, Arc::new(globals), &log).unwrap()
build_peer_manager_with_trusted_peers(vec![], target_peer_count).await
}
async fn build_peer_manager_with_trusted_peers(
@@ -1488,9 +1479,13 @@ mod tests {
discovery_enabled: false,
..Default::default()
};
let network_config = Arc::new(NetworkConfig {
target_peers: target_peer_count,
..Default::default()
});
let log = build_log(slog::Level::Debug, false);
let spec = E::default_spec();
let globals = NetworkGlobals::new_test_globals(trusted_peers, &log, spec);
let spec = Arc::new(E::default_spec());
let globals = NetworkGlobals::new_test_globals(trusted_peers, &log, network_config, spec);
PeerManager::new(config, Arc::new(globals), &log).unwrap()
}

View File

@@ -166,7 +166,7 @@ impl<E: EthSpec> Network<E> {
&config,
&ctx.enr_fork_id,
&log,
ctx.chain_spec,
&ctx.chain_spec,
)?;
// Construct the metadata
let custody_subnet_count = if ctx.chain_spec.is_peer_das_scheduled() {
@@ -186,6 +186,7 @@ impl<E: EthSpec> Network<E> {
trusted_peers,
config.disable_peer_scoring,
&log,
config.clone(),
ctx.chain_spec.clone(),
);
Arc::new(globals)
@@ -209,7 +210,7 @@ impl<E: EthSpec> Network<E> {
E::slots_per_epoch(),
);
let score_settings = PeerScoreSettings::new(ctx.chain_spec, gs_config.mesh_n());
let score_settings = PeerScoreSettings::new(&ctx.chain_spec, gs_config.mesh_n());
let gossip_cache = {
let slot_duration = std::time::Duration::from_secs(ctx.chain_spec.seconds_per_slot);
@@ -346,7 +347,7 @@ impl<E: EthSpec> Network<E> {
&config,
network_globals.clone(),
&log,
ctx.chain_spec,
&ctx.chain_spec,
)
.await?;
// start searching for peers

View File

@@ -30,10 +30,10 @@ pub const MAX_CONNECTIONS_PER_PEER: u32 = 1;
pub const METADATA_FILENAME: &str = "metadata";
pub struct Context<'a> {
pub config: &'a NetworkConfig,
pub config: Arc<NetworkConfig>,
pub enr_fork_id: EnrForkId,
pub fork_context: Arc<ForkContext>,
pub chain_spec: &'a ChainSpec,
pub chain_spec: Arc<ChainSpec>,
pub libp2p_registry: Option<&'a mut Registry>,
}

View File

@@ -2,12 +2,11 @@
use crate::peer_manager::peerdb::PeerDB;
use crate::rpc::{MetaData, MetaDataV3};
use crate::types::{BackFillState, SyncState};
use crate::Client;
use crate::EnrExt;
use crate::{Enr, GossipTopic, Multiaddr, PeerId};
use crate::{Client, Enr, EnrExt, GossipTopic, Multiaddr, NetworkConfig, PeerId};
use itertools::Itertools;
use parking_lot::RwLock;
use std::collections::HashSet;
use std::sync::Arc;
use types::{ChainSpec, ColumnIndex, DataColumnSubnetId, EthSpec};
pub struct NetworkGlobals<E: EthSpec> {
@@ -30,7 +29,10 @@ pub struct NetworkGlobals<E: EthSpec> {
/// The computed custody subnets and columns is stored to avoid re-computing.
pub custody_subnets: Vec<DataColumnSubnetId>,
pub custody_columns: Vec<ColumnIndex>,
pub spec: ChainSpec,
/// Network-related configuration. Immutable after initialization.
pub config: Arc<NetworkConfig>,
/// Ethereum chain configuration. Immutable after initialization.
pub spec: Arc<ChainSpec>,
}
impl<E: EthSpec> NetworkGlobals<E> {
@@ -40,7 +42,8 @@ impl<E: EthSpec> NetworkGlobals<E> {
trusted_peers: Vec<PeerId>,
disable_peer_scoring: bool,
log: &slog::Logger,
spec: ChainSpec,
config: Arc<NetworkConfig>,
spec: Arc<ChainSpec>,
) -> Self {
let (custody_subnets, custody_columns) = if spec.is_peer_das_scheduled() {
let custody_subnet_count = local_metadata
@@ -75,6 +78,7 @@ impl<E: EthSpec> NetworkGlobals<E> {
backfill_state: RwLock::new(BackFillState::NotRequired),
custody_subnets,
custody_columns,
config,
spec,
}
}
@@ -160,7 +164,8 @@ impl<E: EthSpec> NetworkGlobals<E> {
pub fn new_test_globals(
trusted_peers: Vec<PeerId>,
log: &slog::Logger,
spec: ChainSpec,
config: Arc<NetworkConfig>,
spec: Arc<ChainSpec>,
) -> NetworkGlobals<E> {
let metadata = MetaData::V3(MetaDataV3 {
seq_number: 0,
@@ -168,20 +173,21 @@ impl<E: EthSpec> NetworkGlobals<E> {
syncnets: Default::default(),
custody_subnet_count: spec.custody_requirement,
});
Self::new_test_globals_with_metadata(trusted_peers, metadata, log, spec)
Self::new_test_globals_with_metadata(trusted_peers, metadata, log, config, spec)
}
pub(crate) fn new_test_globals_with_metadata(
trusted_peers: Vec<PeerId>,
metadata: MetaData<E>,
log: &slog::Logger,
spec: ChainSpec,
config: Arc<NetworkConfig>,
spec: Arc<ChainSpec>,
) -> NetworkGlobals<E> {
use crate::CombinedKeyExt;
let keypair = libp2p::identity::secp256k1::Keypair::generate();
let enr_key: discv5::enr::CombinedKey = discv5::enr::CombinedKey::from_secp256k1(&keypair);
let enr = discv5::enr::Enr::builder().build(&enr_key).unwrap();
NetworkGlobals::new(enr, metadata, trusted_peers, false, log, spec)
NetworkGlobals::new(enr, metadata, trusted_peers, false, log, config, spec)
}
}
@@ -198,9 +204,15 @@ mod test {
let custody_subnet_count = spec.data_column_sidecar_subnet_count / 2;
let metadata = get_metadata(custody_subnet_count);
let config = Arc::new(NetworkConfig::default());
let globals =
NetworkGlobals::<E>::new_test_globals_with_metadata(vec![], metadata, &log, spec);
let globals = NetworkGlobals::<E>::new_test_globals_with_metadata(
vec![],
metadata,
&log,
config,
Arc::new(spec),
);
assert_eq!(globals.custody_subnets.len(), custody_subnet_count as usize);
}
@@ -213,9 +225,15 @@ mod test {
let custody_subnet_count = spec.data_column_sidecar_subnet_count / 2;
let custody_columns_count = spec.number_of_columns / 2;
let metadata = get_metadata(custody_subnet_count);
let config = Arc::new(NetworkConfig::default());
let globals =
NetworkGlobals::<E>::new_test_globals_with_metadata(vec![], metadata, &log, spec);
let globals = NetworkGlobals::<E>::new_test_globals_with_metadata(
vec![],
metadata,
&log,
config,
Arc::new(spec),
);
assert_eq!(globals.custody_columns.len(), custody_columns_count);
}

View File

@@ -91,6 +91,14 @@ impl SyncState {
pub fn is_synced(&self) -> bool {
matches!(self, SyncState::Synced | SyncState::BackFillSyncing { .. })
}
/// Returns true if the node is *stalled*, i.e. has no synced peers.
///
/// Usually this state is treated as unsynced, except in some places where we make an exception
/// for single-node testnets where having 0 peers is desired.
pub fn is_stalled(&self) -> bool {
matches!(self, SyncState::Stalled)
}
}
impl std::fmt::Display for SyncState {

View File

@@ -76,7 +76,7 @@ pub fn build_log(level: slog::Level, enabled: bool) -> slog::Logger {
}
}
pub fn build_config(mut boot_nodes: Vec<Enr>) -> NetworkConfig {
pub fn build_config(mut boot_nodes: Vec<Enr>) -> Arc<NetworkConfig> {
let mut config = NetworkConfig::default();
// Find unused ports by using the 0 port.
@@ -92,7 +92,7 @@ pub fn build_config(mut boot_nodes: Vec<Enr>) -> NetworkConfig {
config.enr_address = (Some(std::net::Ipv4Addr::LOCALHOST), None);
config.boot_nodes_enr.append(&mut boot_nodes);
config.network_dir = path.into_path();
config
Arc::new(config)
}
pub async fn build_libp2p_instance(
@@ -100,7 +100,7 @@ pub async fn build_libp2p_instance(
boot_nodes: Vec<Enr>,
log: slog::Logger,
fork_name: ForkName,
spec: &ChainSpec,
chain_spec: Arc<ChainSpec>,
) -> Libp2pInstance {
let config = build_config(boot_nodes);
// launch libp2p service
@@ -109,10 +109,10 @@ pub async fn build_libp2p_instance(
let (shutdown_tx, _) = futures::channel::mpsc::channel(1);
let executor = task_executor::TaskExecutor::new(rt, exit, log.clone(), shutdown_tx);
let libp2p_context = lighthouse_network::Context {
config: &config,
config,
enr_fork_id: EnrForkId::default(),
fork_context: Arc::new(fork_context(fork_name)),
chain_spec: spec,
chain_spec,
libp2p_registry: None,
};
Libp2pInstance(
@@ -142,14 +142,16 @@ pub async fn build_node_pair(
rt: Weak<Runtime>,
log: &slog::Logger,
fork_name: ForkName,
spec: &ChainSpec,
spec: Arc<ChainSpec>,
protocol: Protocol,
) -> (Libp2pInstance, Libp2pInstance) {
let sender_log = log.new(o!("who" => "sender"));
let receiver_log = log.new(o!("who" => "receiver"));
let mut sender = build_libp2p_instance(rt.clone(), vec![], sender_log, fork_name, spec).await;
let mut receiver = build_libp2p_instance(rt, vec![], receiver_log, fork_name, spec).await;
let mut sender =
build_libp2p_instance(rt.clone(), vec![], sender_log, fork_name, spec.clone()).await;
let mut receiver =
build_libp2p_instance(rt, vec![], receiver_log, fork_name, spec.clone()).await;
// let the two nodes set up listeners
let sender_fut = async {
@@ -218,11 +220,13 @@ pub async fn build_linear(
log: slog::Logger,
n: usize,
fork_name: ForkName,
spec: &ChainSpec,
spec: Arc<ChainSpec>,
) -> Vec<Libp2pInstance> {
let mut nodes = Vec::with_capacity(n);
for _ in 0..n {
nodes.push(build_libp2p_instance(rt.clone(), vec![], log.clone(), fork_name, spec).await);
nodes.push(
build_libp2p_instance(rt.clone(), vec![], log.clone(), fork_name, spec.clone()).await,
);
}
let multiaddrs: Vec<Multiaddr> = nodes

View File

@@ -61,7 +61,7 @@ fn test_tcp_status_rpc() {
let log = common::build_log(log_level, enable_logging);
let spec = E::default_spec();
let spec = Arc::new(E::default_spec());
rt.block_on(async {
// get sender/receiver
@@ -69,7 +69,7 @@ fn test_tcp_status_rpc() {
Arc::downgrade(&rt),
&log,
ForkName::Base,
&spec,
spec,
Protocol::Tcp,
)
.await;
@@ -163,7 +163,7 @@ fn test_tcp_blocks_by_range_chunked_rpc() {
let rt = Arc::new(Runtime::new().unwrap());
let spec = E::default_spec();
let spec = Arc::new(E::default_spec());
rt.block_on(async {
// get sender/receiver
@@ -171,7 +171,7 @@ fn test_tcp_blocks_by_range_chunked_rpc() {
Arc::downgrade(&rt),
&log,
ForkName::Bellatrix,
&spec,
spec.clone(),
Protocol::Tcp,
)
.await;
@@ -179,8 +179,6 @@ fn test_tcp_blocks_by_range_chunked_rpc() {
// BlocksByRange Request
let rpc_request = Request::BlocksByRange(BlocksByRangeRequest::new(0, messages_to_send));
let spec = E::default_spec();
// BlocksByRange Response
let full_block = BeaconBlock::Base(BeaconBlockBase::<E>::full(&spec));
let signed_full_block = SignedBeaconBlock::from_block(full_block, Signature::empty());
@@ -300,12 +298,12 @@ fn test_blobs_by_range_chunked_rpc() {
rt.block_on(async {
// get sender/receiver
let spec = E::default_spec();
let spec = Arc::new(E::default_spec());
let (mut sender, mut receiver) = common::build_node_pair(
Arc::downgrade(&rt),
&log,
ForkName::Deneb,
&spec,
spec.clone(),
Protocol::Tcp,
)
.await;
@@ -410,7 +408,7 @@ fn test_tcp_blocks_by_range_over_limit() {
let rt = Arc::new(Runtime::new().unwrap());
let spec = E::default_spec();
let spec = Arc::new(E::default_spec());
rt.block_on(async {
// get sender/receiver
@@ -418,7 +416,7 @@ fn test_tcp_blocks_by_range_over_limit() {
Arc::downgrade(&rt),
&log,
ForkName::Bellatrix,
&spec,
spec.clone(),
Protocol::Tcp,
)
.await;
@@ -502,7 +500,7 @@ fn test_tcp_blocks_by_range_chunked_rpc_terminates_correctly() {
let rt = Arc::new(Runtime::new().unwrap());
let spec = E::default_spec();
let spec = Arc::new(E::default_spec());
rt.block_on(async {
// get sender/receiver
@@ -510,7 +508,7 @@ fn test_tcp_blocks_by_range_chunked_rpc_terminates_correctly() {
Arc::downgrade(&rt),
&log,
ForkName::Base,
&spec,
spec.clone(),
Protocol::Tcp,
)
.await;
@@ -519,7 +517,6 @@ fn test_tcp_blocks_by_range_chunked_rpc_terminates_correctly() {
let rpc_request = Request::BlocksByRange(BlocksByRangeRequest::new(0, messages_to_send));
// BlocksByRange Response
let spec = E::default_spec();
let empty_block = BeaconBlock::empty(&spec);
let empty_signed = SignedBeaconBlock::from_block(empty_block, Signature::empty());
let rpc_response = Response::BlocksByRange(Some(Arc::new(empty_signed)));
@@ -631,7 +628,7 @@ fn test_tcp_blocks_by_range_single_empty_rpc() {
let log = common::build_log(log_level, enable_logging);
let rt = Arc::new(Runtime::new().unwrap());
let spec = E::default_spec();
let spec = Arc::new(E::default_spec());
rt.block_on(async {
// get sender/receiver
@@ -639,7 +636,7 @@ fn test_tcp_blocks_by_range_single_empty_rpc() {
Arc::downgrade(&rt),
&log,
ForkName::Base,
&spec,
spec.clone(),
Protocol::Tcp,
)
.await;
@@ -648,7 +645,6 @@ fn test_tcp_blocks_by_range_single_empty_rpc() {
let rpc_request = Request::BlocksByRange(BlocksByRangeRequest::new(0, 10));
// BlocksByRange Response
let spec = E::default_spec();
let empty_block = BeaconBlock::empty(&spec);
let empty_signed = SignedBeaconBlock::from_block(empty_block, Signature::empty());
let rpc_response = Response::BlocksByRange(Some(Arc::new(empty_signed)));
@@ -739,7 +735,7 @@ fn test_tcp_blocks_by_root_chunked_rpc() {
let messages_to_send = 6;
let log = common::build_log(log_level, enable_logging);
let spec = E::default_spec();
let spec = Arc::new(E::default_spec());
let rt = Arc::new(Runtime::new().unwrap());
// get sender/receiver
@@ -748,7 +744,7 @@ fn test_tcp_blocks_by_root_chunked_rpc() {
Arc::downgrade(&rt),
&log,
ForkName::Bellatrix,
&spec,
spec.clone(),
Protocol::Tcp,
)
.await;
@@ -877,7 +873,7 @@ fn test_tcp_blocks_by_root_chunked_rpc_terminates_correctly() {
let extra_messages_to_send: u64 = 10;
let log = common::build_log(log_level, enable_logging);
let spec = E::default_spec();
let spec = Arc::new(E::default_spec());
let rt = Arc::new(Runtime::new().unwrap());
// get sender/receiver
@@ -886,7 +882,7 @@ fn test_tcp_blocks_by_root_chunked_rpc_terminates_correctly() {
Arc::downgrade(&rt),
&log,
ForkName::Base,
&spec,
spec.clone(),
Protocol::Tcp,
)
.await;
@@ -1016,12 +1012,12 @@ fn goodbye_test(log_level: Level, enable_logging: bool, protocol: Protocol) {
let rt = Arc::new(Runtime::new().unwrap());
let spec = E::default_spec();
let spec = Arc::new(E::default_spec());
// get sender/receiver
rt.block_on(async {
let (mut sender, mut receiver) =
common::build_node_pair(Arc::downgrade(&rt), &log, ForkName::Base, &spec, protocol)
common::build_node_pair(Arc::downgrade(&rt), &log, ForkName::Base, spec, protocol)
.await;
// build the sender future