diff --git a/beacon_node/http_api/src/lib.rs b/beacon_node/http_api/src/lib.rs index d76f24709a..a33508dde9 100644 --- a/beacon_node/http_api/src/lib.rs +++ b/beacon_node/http_api/src/lib.rs @@ -53,7 +53,7 @@ use eth2::types::{ use eth2::{CONSENSUS_VERSION_HEADER, CONTENT_TYPE_HEADER, SSZ_CONTENT_TYPE_HEADER}; use health_metrics::observe::Observe; use lighthouse_network::rpc::methods::MetaData; -use lighthouse_network::{types::SyncState, EnrExt, NetworkGlobals, PeerId, PubsubMessage}; +use lighthouse_network::{types::SyncState, Enr, EnrExt, NetworkGlobals, PeerId, PubsubMessage}; use lighthouse_version::version_with_platform; use logging::SSELoggingComponents; use network::{NetworkMessage, NetworkSenders, ValidatorSubscriptionMessage}; @@ -72,6 +72,7 @@ use std::future::Future; use std::net::{IpAddr, Ipv4Addr, SocketAddr}; use std::path::PathBuf; use std::pin::Pin; +use std::str::FromStr; use std::sync::Arc; use sysinfo::{System, SystemExt}; use system_health::{observe_nat, observe_system_health_bn}; @@ -3676,7 +3677,7 @@ pub fn serve( .and(task_spawner_filter.clone()) .and(chain_filter.clone()) .and(warp_utils::json::json()) - .and(network_tx_filter) + .and(network_tx_filter.clone()) .and(log_filter.clone()) .then( |not_synced_filter: Result<(), Rejection>, @@ -4122,6 +4123,77 @@ pub fn serve( }, ); + // POST lighthouse/add_peer + let post_lighthouse_add_peer = warp::path("lighthouse") + .and(warp::path("add_peer")) + .and(warp::path::end()) + .and(warp_utils::json::json()) + .and(task_spawner_filter.clone()) + .and(network_globals.clone()) + .and(network_tx_filter.clone()) + .and(log_filter.clone()) + .then( + |request_data: api_types::AdminPeer, + task_spawner: TaskSpawner, + network_globals: Arc>, + network_tx: UnboundedSender>, + log: Logger| { + task_spawner.blocking_json_task(Priority::P0, move || { + let enr = Enr::from_str(&request_data.enr).map_err(|e| { + warp_utils::reject::custom_bad_request(format!("invalid enr error {}", e)) + })?; + info!( + log, + "Adding trusted peer"; + "peer_id" => %enr.peer_id(), + "multiaddr" => ?enr.multiaddr() + ); + network_globals.add_trusted_peer(enr.clone()); + + publish_network_message(&network_tx, NetworkMessage::ConnectTrustedPeer(enr))?; + + Ok(()) + }) + }, + ); + + // POST lighthouse/remove_peer + let post_lighthouse_remove_peer = warp::path("lighthouse") + .and(warp::path("remove_peer")) + .and(warp::path::end()) + .and(warp_utils::json::json()) + .and(task_spawner_filter.clone()) + .and(network_globals.clone()) + .and(network_tx_filter.clone()) + .and(log_filter.clone()) + .then( + |request_data: api_types::AdminPeer, + task_spawner: TaskSpawner, + network_globals: Arc>, + network_tx: UnboundedSender>, + log: Logger| { + task_spawner.blocking_json_task(Priority::P0, move || { + let enr = Enr::from_str(&request_data.enr).map_err(|e| { + warp_utils::reject::custom_bad_request(format!("invalid enr error {}", e)) + })?; + info!( + log, + "Removing trusted peer"; + "peer_id" => %enr.peer_id(), + "multiaddr" => ?enr.multiaddr() + ); + network_globals.remove_trusted_peer(enr.clone()); + + publish_network_message( + &network_tx, + NetworkMessage::DisconnectTrustedPeer(enr), + )?; + + Ok(()) + }) + }, + ); + // POST lighthouse/liveness let post_lighthouse_liveness = warp::path("lighthouse") .and(warp::path("liveness")) @@ -4896,6 +4968,8 @@ pub fn serve( .uor(post_lighthouse_ui_validator_info) .uor(post_lighthouse_finalize) .uor(post_lighthouse_compaction) + .uor(post_lighthouse_add_peer) + .uor(post_lighthouse_remove_peer) .recover(warp_utils::reject::handle_rejection), ), ) diff --git a/beacon_node/http_api/tests/tests.rs b/beacon_node/http_api/tests/tests.rs index a1241f4929..6ddd49bfd9 100644 --- a/beacon_node/http_api/tests/tests.rs +++ b/beacon_node/http_api/tests/tests.rs @@ -5768,6 +5768,27 @@ impl ApiTester { self } + pub async fn test_post_lighthouse_add_remove_peer(self) -> Self { + let trusted_peers = self.ctx.network_globals.as_ref().unwrap().trusted_peers(); + // Check that there aren't any trusted peers on startup + assert!(trusted_peers.is_empty()); + let enr = AdminPeer {enr: "enr:-QESuEDpVVjo8dmDuneRhLnXdIGY3e9NQiaG4sJR3GS-VMQCQDsmBYoQhJRaPeZzPlTsZj2F8v-iV4lKJEYIRIyztqexHodhdHRuZXRziAwAAAAAAAAAhmNsaWVudNiKTGlnaHRob3VzZYw3LjAuMC1iZXRhLjSEZXRoMpDS8Zl_YAAJEAAIAAAAAAAAgmlkgnY0gmlwhIe11XmDaXA2kCoBBPkAOitZAAAAAAAAAAKEcXVpY4IjKYVxdWljNoIjg4lzZWNwMjU2azGhA43ihEr9BUVVnIHIfFqBR3Izs4YRHHPsTqIbUgEb3Hc8iHN5bmNuZXRzD4N0Y3CCIyiEdGNwNoIjgoN1ZHCCIyiEdWRwNoIjgg".to_string()}; + self.client + .post_lighthouse_add_peer(enr.clone()) + .await + .unwrap(); + let trusted_peers = self.ctx.network_globals.as_ref().unwrap().trusted_peers(); + // Should have 1 trusted peer + assert_eq!(trusted_peers.len(), 1); + + self.client.post_lighthouse_remove_peer(enr).await.unwrap(); + let trusted_peers = self.ctx.network_globals.as_ref().unwrap().trusted_peers(); + // Should be empty after removing + assert!(trusted_peers.is_empty()); + + self + } + pub async fn test_post_lighthouse_liveness(self) -> Self { let epoch = self.chain.epoch().unwrap(); let head_state = self.chain.head_beacon_state_cloned(); @@ -7334,6 +7355,8 @@ async fn lighthouse_endpoints() { .test_post_lighthouse_database_reconstruct() .await .test_post_lighthouse_liveness() + .await + .test_post_lighthouse_add_remove_peer() .await; } diff --git a/beacon_node/lighthouse_network/src/peer_manager/mod.rs b/beacon_node/lighthouse_network/src/peer_manager/mod.rs index 07c4be7959..6067d52042 100644 --- a/beacon_node/lighthouse_network/src/peer_manager/mod.rs +++ b/beacon_node/lighthouse_network/src/peer_manager/mod.rs @@ -114,6 +114,7 @@ pub struct PeerManager { metrics_enabled: bool, /// Keeps track of whether the QUIC protocol is enabled or not. quic_enabled: bool, + trusted_peers: HashSet, /// The logger associated with the `PeerManager`. log: slog::Logger, } @@ -195,6 +196,7 @@ impl PeerManager { discovery_enabled, metrics_enabled, quic_enabled, + trusted_peers: Default::default(), log: log.clone(), }) } @@ -894,7 +896,7 @@ impl PeerManager { } // Gracefully disconnects a peer without banning them. - fn disconnect_peer(&mut self, peer_id: PeerId, reason: GoodbyeReason) { + pub fn disconnect_peer(&mut self, peer_id: PeerId, reason: GoodbyeReason) { self.events .push(PeerManagerEvent::DisconnectPeer(peer_id, reason)); self.network_globals @@ -943,6 +945,13 @@ impl PeerManager { } } + fn maintain_trusted_peers(&mut self) { + let trusted_peers = self.trusted_peers.clone(); + for trusted_peer in trusted_peers { + self.dial_peer(trusted_peer); + } + } + /// This function checks the status of our current peers and optionally requests a discovery /// query if we need to find more peers to maintain the current number of peers fn maintain_peer_count(&mut self, dialing_peers: usize) { @@ -1234,6 +1243,7 @@ impl PeerManager { fn heartbeat(&mut self) { // Optionally run a discovery query if we need more peers. self.maintain_peer_count(0); + self.maintain_trusted_peers(); // Cleans up the connection state of dialing peers. // Libp2p dials peer-ids, but sometimes the response is from another peer-id or libp2p @@ -1470,6 +1480,14 @@ impl PeerManager { ) }) } + + pub fn add_trusted_peer(&mut self, enr: Enr) { + self.trusted_peers.insert(enr); + } + + pub fn remove_trusted_peer(&mut self, enr: Enr) { + self.trusted_peers.remove(&enr); + } } enum ConnectingType { diff --git a/beacon_node/lighthouse_network/src/peer_manager/peerdb.rs b/beacon_node/lighthouse_network/src/peer_manager/peerdb.rs index 37cb5df6ea..b692639911 100644 --- a/beacon_node/lighthouse_network/src/peer_manager/peerdb.rs +++ b/beacon_node/lighthouse_network/src/peer_manager/peerdb.rs @@ -9,7 +9,7 @@ use std::net::IpAddr; use std::time::Instant; use std::{cmp::Ordering, fmt::Display}; use std::{ - collections::{HashMap, HashSet}, + collections::{hash_map::Entry, HashMap, HashSet}, fmt::Formatter, }; use sync_status::SyncStatus; @@ -79,6 +79,33 @@ impl PeerDB { self.peers.iter() } + pub fn set_trusted_peer(&mut self, enr: Enr) { + match self.peers.entry(enr.peer_id()) { + Entry::Occupied(mut info) => { + let entry = info.get_mut(); + entry.score = Score::max_score(); + entry.is_trusted = true; + } + Entry::Vacant(entry) => { + entry.insert(PeerInfo::trusted_peer_info()); + } + } + } + + pub fn unset_trusted_peer(&mut self, enr: Enr) { + if let Some(info) = self.peers.get_mut(&enr.peer_id()) { + info.is_trusted = false; + info.score = Score::default(); + } + } + + pub fn trusted_peers(&self) -> Vec { + self.peers + .iter() + .filter_map(|(id, info)| if info.is_trusted { Some(*id) } else { None }) + .collect() + } + /// Gives the ids of all known peers. pub fn peer_ids(&self) -> impl Iterator { self.peers.keys() diff --git a/beacon_node/lighthouse_network/src/peer_manager/peerdb/peer_info.rs b/beacon_node/lighthouse_network/src/peer_manager/peerdb/peer_info.rs index 2e8f462565..05a29ac47e 100644 --- a/beacon_node/lighthouse_network/src/peer_manager/peerdb/peer_info.rs +++ b/beacon_node/lighthouse_network/src/peer_manager/peerdb/peer_info.rs @@ -21,7 +21,7 @@ use PeerConnectionStatus::*; #[serde(bound = "E: EthSpec")] pub struct PeerInfo { /// The peers reputation - score: Score, + pub(crate) score: Score, /// Client managing this peer client: Client, /// Connection status of this peer @@ -50,7 +50,7 @@ pub struct PeerInfo { #[serde(skip)] min_ttl: Option, /// Is the peer a trusted peer. - is_trusted: bool, + pub(crate) is_trusted: bool, /// Direction of the first connection of the last (or current) connected session with this peer. /// None if this peer was never connected. connection_direction: Option, diff --git a/beacon_node/lighthouse_network/src/service/mod.rs b/beacon_node/lighthouse_network/src/service/mod.rs index 8586fd9cd3..06d806ce0b 100644 --- a/beacon_node/lighthouse_network/src/service/mod.rs +++ b/beacon_node/lighthouse_network/src/service/mod.rs @@ -1236,6 +1236,21 @@ impl Network { } } + /// Adds the given `enr` to the trusted peers mapping and tries to dial it + /// every heartbeat to maintain the connection. + pub fn dial_trusted_peer(&mut self, enr: Enr) { + self.peer_manager_mut().add_trusted_peer(enr.clone()); + self.peer_manager_mut().dial_peer(enr); + } + + /// Remove the given peer from the trusted peers mapping if it exists and disconnect + /// from it. + pub fn remove_trusted_peer(&mut self, enr: Enr) { + self.peer_manager_mut().remove_trusted_peer(enr.clone()); + self.peer_manager_mut() + .disconnect_peer(enr.peer_id(), GoodbyeReason::TooManyPeers); + } + /* Sub-behaviour event handling functions */ /// Handle a gossipsub event. diff --git a/beacon_node/lighthouse_network/src/types/globals.rs b/beacon_node/lighthouse_network/src/types/globals.rs index 2800b75133..b63754fd4e 100644 --- a/beacon_node/lighthouse_network/src/types/globals.rs +++ b/beacon_node/lighthouse_network/src/types/globals.rs @@ -162,6 +162,18 @@ impl NetworkGlobals { .unwrap_or_default() } + pub fn add_trusted_peer(&self, enr: Enr) { + self.peers.write().set_trusted_peer(enr); + } + + pub fn remove_trusted_peer(&self, enr: Enr) { + self.peers.write().unset_trusted_peer(enr); + } + + pub fn trusted_peers(&self) -> Vec { + self.peers.read().trusted_peers() + } + /// Updates the syncing state of the node. /// /// The old state is returned diff --git a/beacon_node/network/src/service.rs b/beacon_node/network/src/service.rs index 1b2a681c64..42889169a2 100644 --- a/beacon_node/network/src/service.rs +++ b/beacon_node/network/src/service.rs @@ -14,6 +14,7 @@ use futures::StreamExt; use lighthouse_network::rpc::{RequestId, RequestType}; use lighthouse_network::service::Network; use lighthouse_network::types::GossipKind; +use lighthouse_network::Enr; use lighthouse_network::{prometheus_client::registry::Registry, MessageAcceptance}; use lighthouse_network::{ rpc::{GoodbyeReason, RpcErrorResponse}, @@ -101,6 +102,10 @@ pub enum NetworkMessage { reason: GoodbyeReason, source: ReportSource, }, + /// Connect to a trusted peer and try to maintain the connection. + ConnectTrustedPeer(Enr), + /// Disconnect from a trusted peer and remove it from the `trusted_peers` mapping. + DisconnectTrustedPeer(Enr), } /// Messages triggered by validators that may trigger a subscription to a subnet. @@ -688,6 +693,12 @@ impl NetworkService { reason, source, } => self.libp2p.goodbye_peer(&peer_id, reason, source), + NetworkMessage::ConnectTrustedPeer(enr) => { + self.libp2p.dial_trusted_peer(enr); + } + NetworkMessage::DisconnectTrustedPeer(enr) => { + self.libp2p.remove_trusted_peer(enr); + } NetworkMessage::SubscribeCoreTopics => { if self.subscribed_core_topics() { return; diff --git a/common/eth2/src/lighthouse.rs b/common/eth2/src/lighthouse.rs index badc4857c4..2f61c52476 100644 --- a/common/eth2/src/lighthouse.rs +++ b/common/eth2/src/lighthouse.rs @@ -9,7 +9,8 @@ mod sync_committee_rewards; use crate::{ types::{ - DepositTreeSnapshot, Epoch, EthSpec, FinalizedExecutionBlock, GenericResponse, ValidatorId, + AdminPeer, DepositTreeSnapshot, Epoch, EthSpec, FinalizedExecutionBlock, GenericResponse, + ValidatorId, }, BeaconNodeHttpClient, DepositData, Error, Eth1Data, Hash256, Slot, }; @@ -406,6 +407,30 @@ impl BeaconNodeHttpClient { self.post_with_response(path, &()).await } + /// `POST lighthouse/add_peer` + pub async fn post_lighthouse_add_peer(&self, req: AdminPeer) -> Result<(), Error> { + let mut path = self.server.full.clone(); + + path.path_segments_mut() + .map_err(|()| Error::InvalidUrl(self.server.clone()))? + .push("lighthouse") + .push("add_peer"); + + self.post_with_response(path, &req).await + } + + /// `POST lighthouse/remove_peer` + pub async fn post_lighthouse_remove_peer(&self, req: AdminPeer) -> Result<(), Error> { + let mut path = self.server.full.clone(); + + path.path_segments_mut() + .map_err(|()| Error::InvalidUrl(self.server.clone()))? + .push("lighthouse") + .push("remove_peer"); + + self.post_with_response(path, &req).await + } + /* Analysis endpoints. */ diff --git a/common/eth2/src/types.rs b/common/eth2/src/types.rs index 7d70d242be..dd4f5437ae 100644 --- a/common/eth2/src/types.rs +++ b/common/eth2/src/types.rs @@ -1431,6 +1431,11 @@ pub struct ManualFinalizationRequestData { pub block_root: Hash256, } +#[derive(Debug, Serialize, Deserialize, Clone)] +pub struct AdminPeer { + pub enr: String, +} + #[derive(Debug, Serialize, Deserialize)] pub struct LivenessRequestData { pub epoch: Epoch,