mirror of
https://github.com/sigp/lighthouse.git
synced 2026-03-09 19:51:47 +00:00
## Issue Addressed NA ## Proposed Changes Reverts #2345 in the interests of getting v1.4.0 out this week. Once we have released that, we can go back to testing this again. ## Additional Info NA
This commit is contained in:
@@ -15,6 +15,7 @@ slog-term = "2.6.0"
|
||||
slog-async = "2.5.0"
|
||||
logging = { path = "../../common/logging" }
|
||||
environment = { path = "../../lighthouse/environment" }
|
||||
discv5 = { version = "0.1.0-beta.3" }
|
||||
|
||||
[dependencies]
|
||||
beacon_chain = { path = "../beacon_chain" }
|
||||
|
||||
@@ -7,8 +7,8 @@ use beacon_chain::{
|
||||
test_utils::{AttestationStrategy, BeaconChainHarness, BlockStrategy, EphemeralHarnessType},
|
||||
BeaconChain, MAXIMUM_GOSSIP_CLOCK_DISPARITY,
|
||||
};
|
||||
use discv5::enr::{CombinedKey, EnrBuilder};
|
||||
use environment::{null_logger, Environment, EnvironmentBuilder};
|
||||
use eth2_libp2p::discv5::enr::{CombinedKey, EnrBuilder};
|
||||
use eth2_libp2p::{rpc::methods::MetaData, types::EnrBitfield, MessageId, NetworkGlobals, PeerId};
|
||||
use slot_clock::SlotClock;
|
||||
use std::cmp;
|
||||
|
||||
@@ -27,13 +27,6 @@ pub fn persist_dht<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>>(
|
||||
store.put_item(&DHT_DB_KEY, &PersistedDht { enrs })
|
||||
}
|
||||
|
||||
/// Attempts to clear any DHT entries.
|
||||
pub fn clear_dht<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>>(
|
||||
store: Arc<HotColdDB<E, Hot, Cold>>,
|
||||
) -> Result<(), store::Error> {
|
||||
store.hot_db.delete::<PersistedDht>(&DHT_DB_KEY)
|
||||
}
|
||||
|
||||
/// Wrapper around DHT for persistence to disk.
|
||||
pub struct PersistedDht {
|
||||
pub enrs: Vec<Enr>,
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
use crate::persisted_dht::{clear_dht, load_dht, persist_dht};
|
||||
use crate::persisted_dht::{load_dht, persist_dht};
|
||||
use crate::router::{Router, RouterMessage};
|
||||
use crate::{
|
||||
attestation_service::{AttServiceMessage, AttestationService},
|
||||
@@ -178,7 +178,7 @@ impl<T: BeaconChainTypes> NetworkService<T> {
|
||||
"Loading peers into the routing table"; "peers" => enrs_to_load.len()
|
||||
);
|
||||
for enr in enrs_to_load {
|
||||
libp2p.swarm.behaviour_mut().add_enr(enr.clone());
|
||||
libp2p.swarm.add_enr(enr.clone());
|
||||
}
|
||||
}
|
||||
|
||||
@@ -251,7 +251,7 @@ fn spawn_service<T: BeaconChainTypes>(
|
||||
.map(|gauge| gauge.reset());
|
||||
}
|
||||
metrics::update_gossip_metrics::<T::EthSpec>(
|
||||
&service.libp2p.swarm.behaviour_mut().gs(),
|
||||
&service.libp2p.swarm.gs(),
|
||||
&service.network_globals,
|
||||
);
|
||||
// update sync metrics
|
||||
@@ -287,7 +287,8 @@ fn spawn_service<T: BeaconChainTypes>(
|
||||
})
|
||||
)
|
||||
}).unwrap_or(None) {
|
||||
if service.libp2p.swarm.behaviour_mut().update_gossipsub_parameters(active_validators, slot).is_err() {
|
||||
if (*service.libp2p.swarm)
|
||||
.update_gossipsub_parameters(active_validators, slot).is_err() {
|
||||
error!(
|
||||
service.log,
|
||||
"Failed to update gossipsub parameters";
|
||||
@@ -313,7 +314,7 @@ fn spawn_service<T: BeaconChainTypes>(
|
||||
service.upnp_mappings = (tcp_socket.map(|s| s.port()), udp_socket.map(|s| s.port()));
|
||||
// If there is an external TCP port update, modify our local ENR.
|
||||
if let Some(tcp_socket) = tcp_socket {
|
||||
if let Err(e) = service.libp2p.swarm.behaviour_mut().peer_manager().discovery_mut().update_enr_tcp_port(tcp_socket.port()) {
|
||||
if let Err(e) = service.libp2p.swarm.peer_manager().discovery_mut().update_enr_tcp_port(tcp_socket.port()) {
|
||||
warn!(service.log, "Failed to update ENR"; "error" => e);
|
||||
}
|
||||
}
|
||||
@@ -321,7 +322,7 @@ fn spawn_service<T: BeaconChainTypes>(
|
||||
// UPnP mappings
|
||||
if !service.discovery_auto_update {
|
||||
if let Some(udp_socket) = udp_socket {
|
||||
if let Err(e) = service.libp2p.swarm.behaviour_mut().peer_manager().discovery_mut().update_enr_udp_socket(udp_socket) {
|
||||
if let Err(e) = service.libp2p.swarm.peer_manager().discovery_mut().update_enr_udp_socket(udp_socket) {
|
||||
warn!(service.log, "Failed to update ENR"; "error" => e);
|
||||
}
|
||||
}
|
||||
@@ -340,7 +341,6 @@ fn spawn_service<T: BeaconChainTypes>(
|
||||
service
|
||||
.libp2p
|
||||
.swarm
|
||||
.behaviour_mut()
|
||||
.report_message_validation_result(
|
||||
&propagation_source, message_id, validation_result
|
||||
);
|
||||
@@ -359,7 +359,7 @@ fn spawn_service<T: BeaconChainTypes>(
|
||||
"topics" => ?topic_kinds
|
||||
);
|
||||
metrics::expose_publish_metrics(&messages);
|
||||
service.libp2p.swarm.behaviour_mut().publish(messages);
|
||||
service.libp2p.swarm.publish(messages);
|
||||
}
|
||||
NetworkMessage::ReportPeer { peer_id, action, source } => service.libp2p.report_peer(&peer_id, action, source),
|
||||
NetworkMessage::GoodbyePeer { peer_id, reason, source } => service.libp2p.goodbye_peer(&peer_id, reason, source),
|
||||
@@ -375,7 +375,7 @@ fn spawn_service<T: BeaconChainTypes>(
|
||||
let already_subscribed = service.network_globals.gossipsub_subscriptions.read().clone();
|
||||
let already_subscribed = already_subscribed.iter().map(|x| x.kind()).collect::<std::collections::HashSet<_>>();
|
||||
for topic_kind in eth2_libp2p::types::CORE_TOPICS.iter().filter(|topic| already_subscribed.get(topic).is_none()) {
|
||||
if service.libp2p.swarm.behaviour_mut().subscribe_kind(topic_kind.clone()) {
|
||||
if service.libp2p.swarm.subscribe_kind(topic_kind.clone()) {
|
||||
subscribed_topics.push(topic_kind.clone());
|
||||
} else {
|
||||
warn!(service.log, "Could not subscribe to topic"; "topic" => %topic_kind);
|
||||
@@ -387,9 +387,9 @@ fn spawn_service<T: BeaconChainTypes>(
|
||||
for subnet_id in 0..<<T as BeaconChainTypes>::EthSpec as EthSpec>::SubnetBitfieldLength::to_u64() {
|
||||
let subnet_id = SubnetId::new(subnet_id);
|
||||
let topic_kind = eth2_libp2p::types::GossipKind::Attestation(subnet_id);
|
||||
if service.libp2p.swarm.behaviour_mut().subscribe_kind(topic_kind.clone()) {
|
||||
if service.libp2p.swarm.subscribe_kind(topic_kind.clone()) {
|
||||
// Update the ENR bitfield.
|
||||
service.libp2p.swarm.behaviour_mut().update_enr_subnet(subnet_id, true);
|
||||
service.libp2p.swarm.update_enr_subnet(subnet_id, true);
|
||||
subscribed_topics.push(topic_kind.clone());
|
||||
} else {
|
||||
warn!(service.log, "Could not subscribe to topic"; "topic" => %topic_kind);
|
||||
@@ -407,19 +407,19 @@ fn spawn_service<T: BeaconChainTypes>(
|
||||
Some(attestation_service_message) = service.attestation_service.next() => {
|
||||
match attestation_service_message {
|
||||
AttServiceMessage::Subscribe(subnet_id) => {
|
||||
service.libp2p.swarm.behaviour_mut().subscribe_to_subnet(subnet_id);
|
||||
service.libp2p.swarm.subscribe_to_subnet(subnet_id);
|
||||
}
|
||||
AttServiceMessage::Unsubscribe(subnet_id) => {
|
||||
service.libp2p.swarm.behaviour_mut().unsubscribe_from_subnet(subnet_id);
|
||||
service.libp2p.swarm.unsubscribe_from_subnet(subnet_id);
|
||||
}
|
||||
AttServiceMessage::EnrAdd(subnet_id) => {
|
||||
service.libp2p.swarm.behaviour_mut().update_enr_subnet(subnet_id, true);
|
||||
service.libp2p.swarm.update_enr_subnet(subnet_id, true);
|
||||
}
|
||||
AttServiceMessage::EnrRemove(subnet_id) => {
|
||||
service.libp2p.swarm.behaviour_mut().update_enr_subnet(subnet_id, false);
|
||||
service.libp2p.swarm.update_enr_subnet(subnet_id, false);
|
||||
}
|
||||
AttServiceMessage::DiscoverPeers(subnets_to_discover) => {
|
||||
service.libp2p.swarm.behaviour_mut().discover_subnet_peers(subnets_to_discover);
|
||||
service.libp2p.swarm.discover_subnet_peers(subnets_to_discover);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -541,7 +541,6 @@ fn spawn_service<T: BeaconChainTypes>(
|
||||
service
|
||||
.libp2p
|
||||
.swarm
|
||||
.behaviour_mut()
|
||||
.update_fork_version(service.beacon_chain.enr_fork_id());
|
||||
service.next_fork_update = next_fork_delay(&service.beacon_chain);
|
||||
}
|
||||
@@ -567,16 +566,12 @@ fn next_fork_delay<T: BeaconChainTypes>(
|
||||
impl<T: BeaconChainTypes> Drop for NetworkService<T> {
|
||||
fn drop(&mut self) {
|
||||
// network thread is terminating
|
||||
let enrs = self.libp2p.swarm.behaviour_mut().enr_entries();
|
||||
let enrs = self.libp2p.swarm.enr_entries();
|
||||
debug!(
|
||||
self.log,
|
||||
"Persisting DHT to store";
|
||||
"Number of peers" => enrs.len(),
|
||||
);
|
||||
if let Err(e) = clear_dht::<T::EthSpec, T::HotStore, T::ColdStore>(self.store.clone()) {
|
||||
error!(self.log, "Failed to clear old DHT entries"; "error" => ?e);
|
||||
}
|
||||
// Still try to update new entries
|
||||
match persist_dht::<T::EthSpec, T::HotStore, T::ColdStore>(self.store.clone(), enrs) {
|
||||
Err(e) => error!(
|
||||
self.log,
|
||||
|
||||
Reference in New Issue
Block a user