mirror of
https://github.com/sigp/lighthouse.git
synced 2026-03-09 19:51:47 +00:00
Resolves #6767 This PR implements a basic version of validator custody. - It introduces a new `CustodyContext` object which contains info regarding the number of validators attached to a node and the custody count they contribute to the cgc. - The `CustodyContext` is added in the da_checker and has methods for returning the current cgc and the number of columns to sample at head. Note that the logic for returning the cgc existed previously in the network globals. - To estimate the number of validators attached, we use the `beacon_committee_subscriptions` endpoint. This might overestimate the number of validators actually publishing attestations from the node in the case of multi BN setups. We could also potentially use the `publish_attestations` endpoint to get a more conservative estimate at a later point. - Anytime there's a change in the `custody_group_count` due to addition/removal of validators, the custody context should send an event on a broadcast channel. The only subscriber for the channel exists in the network service, which simply subscribes to more subnets. There can be additional subscribers in sync that will start a backfill once the cgc changes. TODO - [ ] **NOT REQUIRED:** Currently, the logic only handles an increase in validator count and does not handle a decrease. We should ideally unsubscribe from subnets when the cgc has decreased. - [ ] **NOT REQUIRED:** Add a service in the `CustodyContext` that emits an event once `MIN_EPOCHS_FOR_BLOB_SIDECARS_REQUESTS` passes after updating the current cgc. This event should be picked up by a subscriber which updates the enr and metadata. - [x] Add more tests
279 lines
8.8 KiB
Rust
279 lines
8.8 KiB
Rust
#![cfg(test)]
|
|
use lighthouse_network::service::Network as LibP2PService;
|
|
use lighthouse_network::Enr;
|
|
use lighthouse_network::EnrExt;
|
|
use lighthouse_network::Multiaddr;
|
|
use lighthouse_network::{NetworkConfig, NetworkEvent};
|
|
use std::sync::Arc;
|
|
use std::sync::Weak;
|
|
use tokio::runtime::Runtime;
|
|
use tracing::{debug, error, info_span, Instrument};
|
|
use tracing_subscriber::EnvFilter;
|
|
use types::{
|
|
ChainSpec, EnrForkId, Epoch, EthSpec, FixedBytesExtended, ForkContext, ForkName, Hash256,
|
|
MinimalEthSpec, Slot,
|
|
};
|
|
|
|
type E = MinimalEthSpec;
|
|
|
|
use lighthouse_network::rpc::config::InboundRateLimiterConfig;
|
|
use tempfile::Builder as TempBuilder;
|
|
|
|
/// Returns a dummy fork context
|
|
pub fn fork_context(fork_name: ForkName) -> ForkContext {
|
|
let mut chain_spec = E::default_spec();
|
|
let altair_fork_epoch = Epoch::new(1);
|
|
let bellatrix_fork_epoch = Epoch::new(2);
|
|
let capella_fork_epoch = Epoch::new(3);
|
|
let deneb_fork_epoch = Epoch::new(4);
|
|
let electra_fork_epoch = Epoch::new(5);
|
|
let fulu_fork_epoch = Epoch::new(6);
|
|
|
|
chain_spec.altair_fork_epoch = Some(altair_fork_epoch);
|
|
chain_spec.bellatrix_fork_epoch = Some(bellatrix_fork_epoch);
|
|
chain_spec.capella_fork_epoch = Some(capella_fork_epoch);
|
|
chain_spec.deneb_fork_epoch = Some(deneb_fork_epoch);
|
|
chain_spec.electra_fork_epoch = Some(electra_fork_epoch);
|
|
chain_spec.fulu_fork_epoch = Some(fulu_fork_epoch);
|
|
|
|
let current_slot = match fork_name {
|
|
ForkName::Base => Slot::new(0),
|
|
ForkName::Altair => altair_fork_epoch.start_slot(E::slots_per_epoch()),
|
|
ForkName::Bellatrix => bellatrix_fork_epoch.start_slot(E::slots_per_epoch()),
|
|
ForkName::Capella => capella_fork_epoch.start_slot(E::slots_per_epoch()),
|
|
ForkName::Deneb => deneb_fork_epoch.start_slot(E::slots_per_epoch()),
|
|
ForkName::Electra => electra_fork_epoch.start_slot(E::slots_per_epoch()),
|
|
ForkName::Fulu => fulu_fork_epoch.start_slot(E::slots_per_epoch()),
|
|
};
|
|
ForkContext::new::<E>(current_slot, Hash256::zero(), &chain_spec)
|
|
}
|
|
|
|
/// A libp2p service under test, bundled with the sender half of its exit
/// channel so the service's executor stays alive as long as the instance does.
pub struct Libp2pInstance(
    LibP2PService<E>,
    #[allow(dead_code)]
    // This field is held for lifetime purposes and may not be used directly,
    // hence the `#[allow(dead_code)]` attribute.
    async_channel::Sender<()>,
);
|
|
|
|
// Allow tests to call `LibP2PService` methods directly on a `Libp2pInstance`.
impl std::ops::Deref for Libp2pInstance {
    type Target = LibP2PService<E>;
    fn deref(&self) -> &Self::Target {
        &self.0
    }
}
|
|
|
|
// Mutable counterpart of the `Deref` impl: lets tests drive the inner
// service (e.g. `next_event`, `testing_dial`) through the wrapper.
impl std::ops::DerefMut for Libp2pInstance {
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.0
    }
}
|
|
|
|
#[allow(unused)]
|
|
pub fn build_tracing_subscriber(level: &str, enabled: bool) {
|
|
if enabled {
|
|
tracing_subscriber::fmt()
|
|
.with_env_filter(EnvFilter::try_new(level).unwrap())
|
|
.try_init()
|
|
.unwrap();
|
|
}
|
|
}
|
|
|
|
pub fn build_config(
|
|
mut boot_nodes: Vec<Enr>,
|
|
disable_peer_scoring: bool,
|
|
inbound_rate_limiter: Option<InboundRateLimiterConfig>,
|
|
) -> Arc<NetworkConfig> {
|
|
let mut config = NetworkConfig::default();
|
|
|
|
// Find unused ports by using the 0 port.
|
|
let port = 0;
|
|
|
|
let random_path: u16 = rand::random();
|
|
let path = TempBuilder::new()
|
|
.prefix(&format!("libp2p_test_{}", random_path))
|
|
.tempdir()
|
|
.unwrap();
|
|
|
|
config.set_ipv4_listening_address(std::net::Ipv4Addr::UNSPECIFIED, port, port, port);
|
|
config.enr_address = (Some(std::net::Ipv4Addr::LOCALHOST), None);
|
|
config.boot_nodes_enr.append(&mut boot_nodes);
|
|
config.network_dir = path.into_path();
|
|
config.disable_peer_scoring = disable_peer_scoring;
|
|
config.inbound_rate_limiter_config = inbound_rate_limiter;
|
|
Arc::new(config)
|
|
}
|
|
|
|
pub async fn build_libp2p_instance(
|
|
rt: Weak<Runtime>,
|
|
boot_nodes: Vec<Enr>,
|
|
fork_name: ForkName,
|
|
chain_spec: Arc<ChainSpec>,
|
|
service_name: String,
|
|
disable_peer_scoring: bool,
|
|
inbound_rate_limiter: Option<InboundRateLimiterConfig>,
|
|
) -> Libp2pInstance {
|
|
let config = build_config(boot_nodes, disable_peer_scoring, inbound_rate_limiter);
|
|
// launch libp2p service
|
|
|
|
let (signal, exit) = async_channel::bounded(1);
|
|
let (shutdown_tx, _) = futures::channel::mpsc::channel(1);
|
|
let executor = task_executor::TaskExecutor::new(rt, exit, shutdown_tx, service_name);
|
|
let custody_group_count = chain_spec.custody_requirement;
|
|
let libp2p_context = lighthouse_network::Context {
|
|
config,
|
|
enr_fork_id: EnrForkId::default(),
|
|
fork_context: Arc::new(fork_context(fork_name)),
|
|
chain_spec,
|
|
libp2p_registry: None,
|
|
};
|
|
Libp2pInstance(
|
|
LibP2PService::new(executor, libp2p_context, custody_group_count)
|
|
.await
|
|
.expect("should build libp2p instance")
|
|
.0,
|
|
signal,
|
|
)
|
|
}
|
|
|
|
/// Returns the node's local ENR.
#[allow(dead_code)]
pub fn get_enr(node: &LibP2PService<E>) -> Enr {
    node.local_enr()
}
|
|
|
|
// Protocol for the node pair connection.
pub enum Protocol {
    // Wait for / dial a TCP listen address.
    Tcp,
    // Wait for / dial a QUIC (v1) listen address.
    Quic,
}
|
|
|
|
// Constructs a pair of nodes with separate loggers. The sender dials the receiver.
|
|
// This returns a (sender, receiver) pair.
|
|
#[allow(dead_code)]
|
|
pub async fn build_node_pair(
|
|
rt: Weak<Runtime>,
|
|
fork_name: ForkName,
|
|
spec: Arc<ChainSpec>,
|
|
protocol: Protocol,
|
|
disable_peer_scoring: bool,
|
|
inbound_rate_limiter: Option<InboundRateLimiterConfig>,
|
|
) -> (Libp2pInstance, Libp2pInstance) {
|
|
let mut sender = build_libp2p_instance(
|
|
rt.clone(),
|
|
vec![],
|
|
fork_name,
|
|
spec.clone(),
|
|
"sender".to_string(),
|
|
disable_peer_scoring,
|
|
inbound_rate_limiter.clone(),
|
|
)
|
|
.await;
|
|
let mut receiver = build_libp2p_instance(
|
|
rt,
|
|
vec![],
|
|
fork_name,
|
|
spec.clone(),
|
|
"receiver".to_string(),
|
|
disable_peer_scoring,
|
|
inbound_rate_limiter,
|
|
)
|
|
.await;
|
|
|
|
// let the two nodes set up listeners
|
|
let sender_fut = async {
|
|
loop {
|
|
if let NetworkEvent::NewListenAddr(addr) = sender.next_event().await {
|
|
// Only end once we've listened on the protocol we care about
|
|
match protocol {
|
|
Protocol::Tcp => {
|
|
if addr.iter().any(|multiaddr_proto| {
|
|
matches!(multiaddr_proto, libp2p::multiaddr::Protocol::Tcp(_))
|
|
}) {
|
|
return addr;
|
|
}
|
|
}
|
|
Protocol::Quic => {
|
|
if addr.iter().any(|multiaddr_proto| {
|
|
matches!(multiaddr_proto, libp2p::multiaddr::Protocol::QuicV1)
|
|
}) {
|
|
return addr;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
.instrument(info_span!("Sender", who = "sender"));
|
|
let receiver_fut = async {
|
|
loop {
|
|
if let NetworkEvent::NewListenAddr(addr) = receiver.next_event().await {
|
|
match protocol {
|
|
Protocol::Tcp => {
|
|
if addr.iter().any(|multiaddr_proto| {
|
|
matches!(multiaddr_proto, libp2p::multiaddr::Protocol::Tcp(_))
|
|
}) {
|
|
return addr;
|
|
}
|
|
}
|
|
Protocol::Quic => {
|
|
if addr.iter().any(|multiaddr_proto| {
|
|
matches!(multiaddr_proto, libp2p::multiaddr::Protocol::QuicV1)
|
|
}) {
|
|
return addr;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
.instrument(info_span!("Receiver", who = "receiver"));
|
|
|
|
let joined = futures::future::join(sender_fut, receiver_fut);
|
|
|
|
let receiver_multiaddr = joined.await.1;
|
|
|
|
match sender.testing_dial(receiver_multiaddr.clone()) {
|
|
Ok(()) => {
|
|
debug!(address = ?receiver_multiaddr, "Sender dialed receiver")
|
|
}
|
|
Err(_) => error!("Dialing failed"),
|
|
};
|
|
(sender, receiver)
|
|
}
|
|
|
|
// Returns `n` peers in a linear topology
|
|
#[allow(dead_code)]
|
|
pub async fn build_linear(
|
|
rt: Weak<Runtime>,
|
|
n: usize,
|
|
fork_name: ForkName,
|
|
spec: Arc<ChainSpec>,
|
|
) -> Vec<Libp2pInstance> {
|
|
let mut nodes = Vec::with_capacity(n);
|
|
for _ in 0..n {
|
|
nodes.push(
|
|
build_libp2p_instance(
|
|
rt.clone(),
|
|
vec![],
|
|
fork_name,
|
|
spec.clone(),
|
|
"linear".to_string(),
|
|
false,
|
|
None,
|
|
)
|
|
.await,
|
|
);
|
|
}
|
|
|
|
let multiaddrs: Vec<Multiaddr> = nodes
|
|
.iter()
|
|
.map(|x| get_enr(x).multiaddr()[1].clone())
|
|
.collect();
|
|
for i in 0..n - 1 {
|
|
match nodes[i].testing_dial(multiaddrs[i + 1].clone()) {
|
|
Ok(()) => debug!("Connected"),
|
|
Err(_) => error!("Failed to connect"),
|
|
};
|
|
}
|
|
nodes
|
|
}
|