mirror of
https://github.com/sigp/lighthouse.git
synced 2026-03-17 20:02:43 +00:00
Merge remote-tracking branch 'origin/unstable' into tree-states-update
This commit is contained in:
@@ -3,231 +3,58 @@
|
||||
//! Currently supported strategies:
|
||||
//! - UPnP
|
||||
|
||||
use crate::{NetworkConfig, NetworkMessage};
|
||||
use if_addrs::get_if_addrs;
|
||||
use slog::{debug, info};
|
||||
use std::net::{IpAddr, SocketAddr, SocketAddrV4};
|
||||
use tokio::sync::mpsc;
|
||||
use types::EthSpec;
|
||||
use anyhow::{bail, Context, Error};
|
||||
use igd_next::{aio::tokio as igd, PortMappingProtocol};
|
||||
use slog::debug;
|
||||
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
|
||||
use std::time::Duration;
|
||||
use tokio::time::sleep;
|
||||
|
||||
/// Configuration required to construct the UPnP port mappings.
|
||||
pub struct UPnPConfig {
|
||||
/// The local TCP port.
|
||||
tcp_port: u16,
|
||||
/// The local UDP discovery port.
|
||||
disc_port: u16,
|
||||
/// The local UDP quic port.
|
||||
quic_port: u16,
|
||||
/// Whether discovery is enabled or not.
|
||||
disable_discovery: bool,
|
||||
/// Whether quic is enabled or not.
|
||||
disable_quic_support: bool,
|
||||
}
|
||||
/// The duration in seconds of a port mapping on the gateway.
|
||||
const MAPPING_DURATION: u32 = 3600;
|
||||
|
||||
/// Contains mappings that managed to be established.
|
||||
#[derive(Default, Debug)]
|
||||
pub struct EstablishedUPnPMappings {
|
||||
/// A TCP port mapping for libp2p.
|
||||
pub tcp_port: Option<u16>,
|
||||
/// A UDP port for the QUIC libp2p transport.
|
||||
pub udp_quic_port: Option<u16>,
|
||||
/// A UDP port for discv5.
|
||||
pub udp_disc_port: Option<u16>,
|
||||
}
|
||||
/// Renew the Mapping every half of `MAPPING_DURATION` to avoid the port being unmapped.
|
||||
const MAPPING_TIMEOUT: u64 = MAPPING_DURATION as u64 / 2;
|
||||
|
||||
impl EstablishedUPnPMappings {
|
||||
/// Returns true if at least one value is set.
|
||||
pub fn is_some(&self) -> bool {
|
||||
self.tcp_port.is_some() || self.udp_quic_port.is_some() || self.udp_disc_port.is_some()
|
||||
}
|
||||
|
||||
// Iterator over the UDP ports
|
||||
pub fn udp_ports(&self) -> impl Iterator<Item = &u16> {
|
||||
self.udp_quic_port.iter().chain(self.udp_disc_port.iter())
|
||||
}
|
||||
}
|
||||
|
||||
impl UPnPConfig {
|
||||
pub fn from_config(config: &NetworkConfig) -> Option<Self> {
|
||||
config.listen_addrs().v4().map(|v4_addr| UPnPConfig {
|
||||
tcp_port: v4_addr.tcp_port,
|
||||
disc_port: v4_addr.disc_port,
|
||||
quic_port: v4_addr.quic_port,
|
||||
disable_discovery: config.disable_discovery,
|
||||
disable_quic_support: config.disable_quic_support,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
/// Attempts to construct external port mappings with UPnP.
|
||||
pub fn construct_upnp_mappings<T: EthSpec>(
|
||||
config: UPnPConfig,
|
||||
network_send: mpsc::UnboundedSender<NetworkMessage<T>>,
|
||||
/// Attempts to map Discovery external port mappings with UPnP.
|
||||
pub async fn construct_upnp_mappings(
|
||||
addr: Ipv4Addr,
|
||||
port: u16,
|
||||
log: slog::Logger,
|
||||
) {
|
||||
info!(log, "UPnP Attempting to initialise routes");
|
||||
match igd_next::search_gateway(Default::default()) {
|
||||
Err(e) => info!(log, "UPnP not available"; "error" => %e),
|
||||
Ok(gateway) => {
|
||||
// Need to find the local listening address matched with the router subnet
|
||||
let interfaces = match get_if_addrs() {
|
||||
Ok(v) => v,
|
||||
Err(e) => {
|
||||
info!(log, "UPnP failed to get local interfaces"; "error" => %e);
|
||||
return;
|
||||
}
|
||||
};
|
||||
let local_ip = interfaces.iter().find_map(|interface| {
|
||||
// Just use the first IP of the first interface that is not a loopback and not an
|
||||
// ipv6 address.
|
||||
if !interface.is_loopback() {
|
||||
interface.ip().is_ipv4().then(|| interface.ip())
|
||||
} else {
|
||||
None
|
||||
}
|
||||
});
|
||||
) -> Result<(), Error> {
|
||||
let gateway = igd::search_gateway(Default::default())
|
||||
.await
|
||||
.context("Gateway does not support UPnP")?;
|
||||
|
||||
let local_ip = match local_ip {
|
||||
None => {
|
||||
info!(log, "UPnP failed to find local IP address");
|
||||
return;
|
||||
}
|
||||
Some(v) => v,
|
||||
};
|
||||
let external_address = gateway
|
||||
.get_external_ip()
|
||||
.await
|
||||
.context("Could not access gateway's external ip")?;
|
||||
|
||||
debug!(log, "UPnP Local IP Discovered"; "ip" => ?local_ip);
|
||||
|
||||
let mut mappings = EstablishedUPnPMappings::default();
|
||||
|
||||
match local_ip {
|
||||
IpAddr::V4(address) => {
|
||||
let libp2p_socket = SocketAddrV4::new(address, config.tcp_port);
|
||||
let external_ip = gateway.get_external_ip();
|
||||
// We add specific port mappings rather than getting the router to arbitrary assign
|
||||
// one.
|
||||
// I've found this to be more reliable. If multiple users are behind a single
|
||||
// router, they should ideally try to set different port numbers.
|
||||
mappings.tcp_port = add_port_mapping(
|
||||
&gateway,
|
||||
igd_next::PortMappingProtocol::TCP,
|
||||
libp2p_socket,
|
||||
"tcp",
|
||||
&log,
|
||||
).map(|_| {
|
||||
let external_socket = external_ip.as_ref().map(|ip| SocketAddr::new(*ip, config.tcp_port)).map_err(|_| ());
|
||||
info!(log, "UPnP TCP route established"; "external_socket" => format!("{}:{}", external_socket.as_ref().map(|ip| ip.to_string()).unwrap_or_else(|_| "".into()), config.tcp_port));
|
||||
config.tcp_port
|
||||
}).ok();
|
||||
|
||||
let set_udp_mapping = |udp_port| {
|
||||
let udp_socket = SocketAddrV4::new(address, udp_port);
|
||||
add_port_mapping(
|
||||
&gateway,
|
||||
igd_next::PortMappingProtocol::UDP,
|
||||
udp_socket,
|
||||
"udp",
|
||||
&log,
|
||||
).map(|_| {
|
||||
info!(log, "UPnP UDP route established"; "external_socket" => format!("{}:{}", external_ip.as_ref().map(|ip| ip.to_string()).unwrap_or_else(|_| "".into()), udp_port));
|
||||
})
|
||||
};
|
||||
|
||||
// Set the discovery UDP port mapping
|
||||
if !config.disable_discovery && set_udp_mapping(config.disc_port).is_ok() {
|
||||
mappings.udp_disc_port = Some(config.disc_port);
|
||||
}
|
||||
|
||||
// Set the quic UDP port mapping
|
||||
if !config.disable_quic_support && set_udp_mapping(config.quic_port).is_ok() {
|
||||
mappings.udp_quic_port = Some(config.quic_port);
|
||||
}
|
||||
|
||||
// report any updates to the network service.
|
||||
if mappings.is_some() {
|
||||
network_send.send(NetworkMessage::UPnPMappingEstablished{ mappings })
|
||||
.unwrap_or_else(|e| debug!(log, "Could not send message to the network service"; "error" => %e));
|
||||
}
|
||||
}
|
||||
_ => debug!(log, "UPnP no routes constructed. IPv6 not supported"),
|
||||
}
|
||||
}
|
||||
let is_private = match external_address {
|
||||
IpAddr::V4(ipv4) => ipv4.is_private(),
|
||||
IpAddr::V6(ipv6) => ipv6.is_loopback() || ipv6.is_unspecified(),
|
||||
};
|
||||
}
|
||||
|
||||
/// Sets up a port mapping for a protocol returning the mapped port if successful.
|
||||
fn add_port_mapping(
|
||||
gateway: &igd_next::Gateway,
|
||||
protocol: igd_next::PortMappingProtocol,
|
||||
socket: SocketAddrV4,
|
||||
protocol_string: &'static str,
|
||||
log: &slog::Logger,
|
||||
) -> Result<(), ()> {
|
||||
// We add specific port mappings rather than getting the router to arbitrary assign
|
||||
// one.
|
||||
// I've found this to be more reliable. If multiple users are behind a single
|
||||
// router, they should ideally try to set different port numbers.
|
||||
let mapping_string = &format!("lighthouse-{}", protocol_string);
|
||||
for _ in 0..2 {
|
||||
match gateway.add_port(
|
||||
protocol,
|
||||
socket.port(),
|
||||
SocketAddr::V4(socket),
|
||||
0,
|
||||
mapping_string,
|
||||
) {
|
||||
Err(e) => {
|
||||
match e {
|
||||
igd_next::AddPortError::PortInUse => {
|
||||
// Try and remove and re-create
|
||||
debug!(log, "UPnP port in use, attempting to remap"; "protocol" => protocol_string, "port" => socket.port());
|
||||
match gateway.remove_port(protocol, socket.port()) {
|
||||
Ok(()) => {
|
||||
debug!(log, "UPnP Removed port mapping"; "protocol" => protocol_string, "port" => socket.port())
|
||||
}
|
||||
Err(e) => {
|
||||
debug!(log, "UPnP Port remove failure"; "protocol" => protocol_string, "port" => socket.port(), "error" => %e);
|
||||
return Err(());
|
||||
}
|
||||
}
|
||||
}
|
||||
e => {
|
||||
info!(log, "UPnP TCP route not set"; "error" => %e);
|
||||
return Err(());
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(_) => {
|
||||
return Ok(());
|
||||
}
|
||||
}
|
||||
if is_private {
|
||||
bail!(
|
||||
"Gateway's external address is a private address: {}",
|
||||
external_address
|
||||
);
|
||||
}
|
||||
Err(())
|
||||
}
|
||||
|
||||
/// Removes the specified TCP and UDP port mappings.
|
||||
pub fn remove_mappings(mappings: &EstablishedUPnPMappings, log: &slog::Logger) {
|
||||
if mappings.is_some() {
|
||||
debug!(log, "Removing UPnP port mappings");
|
||||
match igd_next::search_gateway(Default::default()) {
|
||||
Ok(gateway) => {
|
||||
if let Some(tcp_port) = mappings.tcp_port {
|
||||
match gateway.remove_port(igd_next::PortMappingProtocol::TCP, tcp_port) {
|
||||
Ok(()) => debug!(log, "UPnP Removed TCP port mapping"; "port" => tcp_port),
|
||||
Err(e) => {
|
||||
debug!(log, "UPnP Failed to remove TCP port mapping"; "port" => tcp_port, "error" => %e)
|
||||
}
|
||||
}
|
||||
}
|
||||
for udp_port in mappings.udp_ports() {
|
||||
match gateway.remove_port(igd_next::PortMappingProtocol::UDP, *udp_port) {
|
||||
Ok(()) => debug!(log, "UPnP Removed UDP port mapping"; "port" => udp_port),
|
||||
Err(e) => {
|
||||
debug!(log, "UPnP Failed to remove UDP port mapping"; "port" => udp_port, "error" => %e)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(e) => debug!(log, "UPnP failed to remove mappings"; "error" => %e),
|
||||
}
|
||||
loop {
|
||||
gateway
|
||||
.add_port(
|
||||
PortMappingProtocol::UDP,
|
||||
port,
|
||||
SocketAddr::new(IpAddr::V4(addr), port),
|
||||
MAPPING_DURATION,
|
||||
"Lighthouse Discovery port",
|
||||
)
|
||||
.await
|
||||
.with_context(|| format!("Could not UPnP map port: {} on the gateway", port))?;
|
||||
debug!(log, "Discovery UPnP port mapped"; "port" => %port);
|
||||
sleep(Duration::from_secs(MAPPING_TIMEOUT)).await;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
use super::sync::manager::RequestId as SyncId;
|
||||
use crate::nat::EstablishedUPnPMappings;
|
||||
use crate::nat;
|
||||
use crate::network_beacon_processor::InvalidBlockStorage;
|
||||
use crate::persisted_dht::{clear_dht, load_dht, persist_dht};
|
||||
use crate::router::{Router, RouterMessage};
|
||||
@@ -27,6 +27,7 @@ use lighthouse_network::{
|
||||
MessageId, NetworkEvent, NetworkGlobals, PeerId,
|
||||
};
|
||||
use slog::{crit, debug, error, info, o, trace, warn};
|
||||
use std::collections::BTreeSet;
|
||||
use std::{collections::HashSet, pin::Pin, sync::Arc, time::Duration};
|
||||
use store::HotColdDB;
|
||||
use strum::IntoStaticStr;
|
||||
@@ -94,11 +95,6 @@ pub enum NetworkMessage<T: EthSpec> {
|
||||
/// The result of the validation
|
||||
validation_result: MessageAcceptance,
|
||||
},
|
||||
/// Called if UPnP managed to establish an external port mapping.
|
||||
UPnPMappingEstablished {
|
||||
/// The mappings that were established.
|
||||
mappings: EstablishedUPnPMappings,
|
||||
},
|
||||
/// Reports a peer to the peer manager for performing an action.
|
||||
ReportPeer {
|
||||
peer_id: PeerId,
|
||||
@@ -124,7 +120,7 @@ pub enum NetworkMessage<T: EthSpec> {
|
||||
pub enum ValidatorSubscriptionMessage {
|
||||
/// Subscribes a list of validators to specific slots for attestation duties.
|
||||
AttestationSubscribe {
|
||||
subscriptions: Vec<ValidatorSubscription>,
|
||||
subscriptions: BTreeSet<ValidatorSubscription>,
|
||||
},
|
||||
SyncCommitteeSubscribe {
|
||||
subscriptions: Vec<SyncCommitteeSubscription>,
|
||||
@@ -188,9 +184,6 @@ pub struct NetworkService<T: BeaconChainTypes> {
|
||||
store: Arc<HotColdDB<T::EthSpec, T::HotStore, T::ColdStore>>,
|
||||
/// A collection of global variables, accessible outside of the network service.
|
||||
network_globals: Arc<NetworkGlobals<T::EthSpec>>,
|
||||
/// Stores potentially created UPnP mappings to be removed on shutdown. (TCP port and UDP
|
||||
/// ports).
|
||||
upnp_mappings: EstablishedUPnPMappings,
|
||||
/// A delay that expires when a new fork takes place.
|
||||
next_fork_update: Pin<Box<OptionFuture<Sleep>>>,
|
||||
/// A delay that expires when we need to subscribe to a new fork's topics.
|
||||
@@ -237,22 +230,24 @@ impl<T: BeaconChainTypes> NetworkService<T> {
|
||||
"Backfill is disabled. DO NOT RUN IN PRODUCTION"
|
||||
);
|
||||
|
||||
// try and construct UPnP port mappings if required.
|
||||
if let Some(upnp_config) = crate::nat::UPnPConfig::from_config(config) {
|
||||
let upnp_log = network_log.new(o!("service" => "UPnP"));
|
||||
let upnp_network_send = network_senders.network_send();
|
||||
if config.upnp_enabled {
|
||||
executor.spawn_blocking(
|
||||
move || {
|
||||
crate::nat::construct_upnp_mappings(
|
||||
upnp_config,
|
||||
upnp_network_send,
|
||||
upnp_log,
|
||||
)
|
||||
},
|
||||
"UPnP",
|
||||
);
|
||||
}
|
||||
if let (true, false, Some(v4)) = (
|
||||
config.upnp_enabled,
|
||||
config.disable_discovery,
|
||||
config.listen_addrs().v4(),
|
||||
) {
|
||||
let nw = network_log.clone();
|
||||
let v4 = v4.clone();
|
||||
executor.spawn(
|
||||
async move {
|
||||
info!(nw, "UPnP Attempting to initialise routes");
|
||||
if let Err(e) =
|
||||
nat::construct_upnp_mappings(v4.addr, v4.disc_port, nw.clone()).await
|
||||
{
|
||||
info!(nw, "Could not UPnP map Discovery port"; "error" => %e);
|
||||
}
|
||||
},
|
||||
"UPnP",
|
||||
);
|
||||
}
|
||||
|
||||
// get a reference to the beacon chain store
|
||||
@@ -358,7 +353,6 @@ impl<T: BeaconChainTypes> NetworkService<T> {
|
||||
router_send,
|
||||
store,
|
||||
network_globals: network_globals.clone(),
|
||||
upnp_mappings: EstablishedUPnPMappings::default(),
|
||||
next_fork_update,
|
||||
next_fork_subscriptions,
|
||||
next_unsubscribe,
|
||||
@@ -636,21 +630,6 @@ impl<T: BeaconChainTypes> NetworkService<T> {
|
||||
} => {
|
||||
self.libp2p.send_error_response(peer_id, id, error, reason);
|
||||
}
|
||||
NetworkMessage::UPnPMappingEstablished { mappings } => {
|
||||
self.upnp_mappings = mappings;
|
||||
// If there is an external TCP port update, modify our local ENR.
|
||||
if let Some(tcp_port) = self.upnp_mappings.tcp_port {
|
||||
if let Err(e) = self.libp2p.discovery_mut().update_enr_tcp_port(tcp_port) {
|
||||
warn!(self.log, "Failed to update ENR"; "error" => e);
|
||||
}
|
||||
}
|
||||
// If there is an external QUIC port update, modify our local ENR.
|
||||
if let Some(quic_port) = self.upnp_mappings.udp_quic_port {
|
||||
if let Err(e) = self.libp2p.discovery_mut().update_enr_quic_port(quic_port) {
|
||||
warn!(self.log, "Failed to update ENR"; "error" => e);
|
||||
}
|
||||
}
|
||||
}
|
||||
NetworkMessage::ValidationResult {
|
||||
propagation_source,
|
||||
message_id,
|
||||
@@ -805,7 +784,7 @@ impl<T: BeaconChainTypes> NetworkService<T> {
|
||||
ValidatorSubscriptionMessage::AttestationSubscribe { subscriptions } => {
|
||||
if let Err(e) = self
|
||||
.attestation_service
|
||||
.validator_subscriptions(subscriptions)
|
||||
.validator_subscriptions(subscriptions.into_iter())
|
||||
{
|
||||
warn!(self.log, "Attestation validator subscription failed"; "error" => e);
|
||||
}
|
||||
@@ -1009,10 +988,6 @@ impl<T: BeaconChainTypes> Drop for NetworkService<T> {
|
||||
"Saved DHT state";
|
||||
),
|
||||
}
|
||||
|
||||
// attempt to remove port mappings
|
||||
crate::nat::remove_mappings(&self.upnp_mappings, &self.log);
|
||||
|
||||
info!(self.log, "Network service shutdown");
|
||||
}
|
||||
}
|
||||
|
||||
@@ -62,7 +62,7 @@ mod tests {
|
||||
|
||||
let runtime = Arc::new(Runtime::new().unwrap());
|
||||
|
||||
let (signal, exit) = exit_future::signal();
|
||||
let (signal, exit) = async_channel::bounded(1);
|
||||
let (shutdown_tx, _) = futures::channel::mpsc::channel(1);
|
||||
let executor = task_executor::TaskExecutor::new(
|
||||
Arc::downgrade(&runtime),
|
||||
@@ -139,7 +139,7 @@ mod tests {
|
||||
|
||||
// Build network service.
|
||||
let (mut network_service, network_globals, _network_senders) = runtime.block_on(async {
|
||||
let (_, exit) = exit_future::signal();
|
||||
let (_, exit) = async_channel::bounded(1);
|
||||
let (shutdown_tx, _) = futures::channel::mpsc::channel(1);
|
||||
let executor = task_executor::TaskExecutor::new(
|
||||
Arc::downgrade(&runtime),
|
||||
|
||||
@@ -196,7 +196,7 @@ impl<T: BeaconChainTypes> AttestationService<T> {
|
||||
/// safely dropped.
|
||||
pub fn validator_subscriptions(
|
||||
&mut self,
|
||||
subscriptions: Vec<ValidatorSubscription>,
|
||||
subscriptions: impl Iterator<Item = ValidatorSubscription>,
|
||||
) -> Result<(), String> {
|
||||
// If the node is in a proposer-only state, we ignore all subnet subscriptions.
|
||||
if self.proposer_only {
|
||||
@@ -227,7 +227,6 @@ impl<T: BeaconChainTypes> AttestationService<T> {
|
||||
warn!(self.log,
|
||||
"Failed to compute subnet id for validator subscription";
|
||||
"error" => ?e,
|
||||
"validator_index" => subscription.validator_index
|
||||
);
|
||||
continue;
|
||||
}
|
||||
@@ -257,13 +256,11 @@ impl<T: BeaconChainTypes> AttestationService<T> {
|
||||
warn!(self.log,
|
||||
"Subscription to subnet error";
|
||||
"error" => e,
|
||||
"validator_index" => subscription.validator_index,
|
||||
);
|
||||
} else {
|
||||
trace!(self.log,
|
||||
"Subscribed to subnet for aggregator duties";
|
||||
"exact_subnet" => ?exact_subnet,
|
||||
"validator_index" => subscription.validator_index
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -180,14 +180,12 @@ mod attestation_service {
|
||||
use super::*;
|
||||
|
||||
fn get_subscription(
|
||||
validator_index: u64,
|
||||
attestation_committee_index: CommitteeIndex,
|
||||
slot: Slot,
|
||||
committee_count_at_slot: u64,
|
||||
is_aggregator: bool,
|
||||
) -> ValidatorSubscription {
|
||||
ValidatorSubscription {
|
||||
validator_index,
|
||||
attestation_committee_index,
|
||||
slot,
|
||||
committee_count_at_slot,
|
||||
@@ -204,7 +202,6 @@ mod attestation_service {
|
||||
(0..validator_count)
|
||||
.map(|validator_index| {
|
||||
get_subscription(
|
||||
validator_index,
|
||||
validator_index,
|
||||
slot,
|
||||
committee_count_at_slot,
|
||||
@@ -217,7 +214,6 @@ mod attestation_service {
|
||||
#[tokio::test]
|
||||
async fn subscribe_current_slot_wait_for_unsubscribe() {
|
||||
// subscription config
|
||||
let validator_index = 1;
|
||||
let committee_index = 1;
|
||||
// Keep a low subscription slot so that there are no additional subnet discovery events.
|
||||
let subscription_slot = 0;
|
||||
@@ -233,7 +229,6 @@ mod attestation_service {
|
||||
.expect("Could not get current slot");
|
||||
|
||||
let subscriptions = vec![get_subscription(
|
||||
validator_index,
|
||||
committee_index,
|
||||
current_slot + Slot::new(subscription_slot),
|
||||
committee_count,
|
||||
@@ -242,7 +237,7 @@ mod attestation_service {
|
||||
|
||||
// submit the subscriptions
|
||||
attestation_service
|
||||
.validator_subscriptions(subscriptions)
|
||||
.validator_subscriptions(subscriptions.into_iter())
|
||||
.unwrap();
|
||||
|
||||
// not enough time for peer discovery, just subscribe, unsubscribe
|
||||
@@ -293,7 +288,6 @@ mod attestation_service {
|
||||
#[tokio::test]
|
||||
async fn test_same_subnet_unsubscription() {
|
||||
// subscription config
|
||||
let validator_index = 1;
|
||||
let committee_count = 1;
|
||||
let subnets_per_node = MainnetEthSpec::default_spec().subnets_per_node as usize;
|
||||
|
||||
@@ -313,7 +307,6 @@ mod attestation_service {
|
||||
.expect("Could not get current slot");
|
||||
|
||||
let sub1 = get_subscription(
|
||||
validator_index,
|
||||
com1,
|
||||
current_slot + Slot::new(subscription_slot1),
|
||||
committee_count,
|
||||
@@ -321,7 +314,6 @@ mod attestation_service {
|
||||
);
|
||||
|
||||
let sub2 = get_subscription(
|
||||
validator_index,
|
||||
com2,
|
||||
current_slot + Slot::new(subscription_slot2),
|
||||
committee_count,
|
||||
@@ -350,7 +342,7 @@ mod attestation_service {
|
||||
|
||||
// submit the subscriptions
|
||||
attestation_service
|
||||
.validator_subscriptions(vec![sub1, sub2])
|
||||
.validator_subscriptions(vec![sub1, sub2].into_iter())
|
||||
.unwrap();
|
||||
|
||||
// Unsubscription event should happen at slot 2 (since subnet id's are the same, unsubscription event should be at higher slot + 1)
|
||||
@@ -431,7 +423,7 @@ mod attestation_service {
|
||||
|
||||
// submit the subscriptions
|
||||
attestation_service
|
||||
.validator_subscriptions(subscriptions)
|
||||
.validator_subscriptions(subscriptions.into_iter())
|
||||
.unwrap();
|
||||
|
||||
let events = get_events(&mut attestation_service, Some(131), 10).await;
|
||||
@@ -501,7 +493,7 @@ mod attestation_service {
|
||||
|
||||
// submit the subscriptions
|
||||
attestation_service
|
||||
.validator_subscriptions(subscriptions)
|
||||
.validator_subscriptions(subscriptions.into_iter())
|
||||
.unwrap();
|
||||
|
||||
let events = get_events(&mut attestation_service, None, 3).await;
|
||||
@@ -538,7 +530,6 @@ mod attestation_service {
|
||||
#[tokio::test]
|
||||
async fn test_subscribe_same_subnet_several_slots_apart() {
|
||||
// subscription config
|
||||
let validator_index = 1;
|
||||
let committee_count = 1;
|
||||
let subnets_per_node = MainnetEthSpec::default_spec().subnets_per_node as usize;
|
||||
|
||||
@@ -558,7 +549,6 @@ mod attestation_service {
|
||||
.expect("Could not get current slot");
|
||||
|
||||
let sub1 = get_subscription(
|
||||
validator_index,
|
||||
com1,
|
||||
current_slot + Slot::new(subscription_slot1),
|
||||
committee_count,
|
||||
@@ -566,7 +556,6 @@ mod attestation_service {
|
||||
);
|
||||
|
||||
let sub2 = get_subscription(
|
||||
validator_index,
|
||||
com2,
|
||||
current_slot + Slot::new(subscription_slot2),
|
||||
committee_count,
|
||||
@@ -595,7 +584,7 @@ mod attestation_service {
|
||||
|
||||
// submit the subscriptions
|
||||
attestation_service
|
||||
.validator_subscriptions(vec![sub1, sub2])
|
||||
.validator_subscriptions(vec![sub1, sub2].into_iter())
|
||||
.unwrap();
|
||||
|
||||
// Unsubscription event should happen at the end of the slot.
|
||||
@@ -668,7 +657,7 @@ mod attestation_service {
|
||||
|
||||
// submit the subscriptions
|
||||
attestation_service
|
||||
.validator_subscriptions(subscriptions)
|
||||
.validator_subscriptions(subscriptions.into_iter())
|
||||
.unwrap();
|
||||
|
||||
// There should only be the same subscriptions as there are in the specification,
|
||||
|
||||
Reference in New Issue
Block a user