Admin add/remove peer (#7198)

N/A


  Adds endpoints to add and remove trusted peers from the http api. The added peers are trusted peers so they won't be disconnected for bad scores. We try to maintain a connection to the peer in case they disconnect from us by trying to dial it every heartbeat.
This commit is contained in:
Pawan Dhananjay
2025-03-28 05:59:09 -07:00
committed by GitHub
parent a5ea05ce2a
commit 54aef2d043
10 changed files with 217 additions and 7 deletions

View File

@@ -53,7 +53,7 @@ use eth2::types::{
use eth2::{CONSENSUS_VERSION_HEADER, CONTENT_TYPE_HEADER, SSZ_CONTENT_TYPE_HEADER};
use health_metrics::observe::Observe;
use lighthouse_network::rpc::methods::MetaData;
use lighthouse_network::{types::SyncState, EnrExt, NetworkGlobals, PeerId, PubsubMessage};
use lighthouse_network::{types::SyncState, Enr, EnrExt, NetworkGlobals, PeerId, PubsubMessage};
use lighthouse_version::version_with_platform;
use logging::SSELoggingComponents;
use network::{NetworkMessage, NetworkSenders, ValidatorSubscriptionMessage};
@@ -72,6 +72,7 @@ use std::future::Future;
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use std::path::PathBuf;
use std::pin::Pin;
use std::str::FromStr;
use std::sync::Arc;
use sysinfo::{System, SystemExt};
use system_health::{observe_nat, observe_system_health_bn};
@@ -3676,7 +3677,7 @@ pub fn serve<T: BeaconChainTypes>(
.and(task_spawner_filter.clone())
.and(chain_filter.clone())
.and(warp_utils::json::json())
.and(network_tx_filter)
.and(network_tx_filter.clone())
.and(log_filter.clone())
.then(
|not_synced_filter: Result<(), Rejection>,
@@ -4122,6 +4123,77 @@ pub fn serve<T: BeaconChainTypes>(
},
);
// POST lighthouse/add_peer
let post_lighthouse_add_peer = warp::path("lighthouse")
.and(warp::path("add_peer"))
.and(warp::path::end())
.and(warp_utils::json::json())
.and(task_spawner_filter.clone())
.and(network_globals.clone())
.and(network_tx_filter.clone())
.and(log_filter.clone())
.then(
|request_data: api_types::AdminPeer,
task_spawner: TaskSpawner<T::EthSpec>,
network_globals: Arc<NetworkGlobals<T::EthSpec>>,
network_tx: UnboundedSender<NetworkMessage<T::EthSpec>>,
log: Logger| {
task_spawner.blocking_json_task(Priority::P0, move || {
let enr = Enr::from_str(&request_data.enr).map_err(|e| {
warp_utils::reject::custom_bad_request(format!("invalid enr error {}", e))
})?;
info!(
log,
"Adding trusted peer";
"peer_id" => %enr.peer_id(),
"multiaddr" => ?enr.multiaddr()
);
network_globals.add_trusted_peer(enr.clone());
publish_network_message(&network_tx, NetworkMessage::ConnectTrustedPeer(enr))?;
Ok(())
})
},
);
// POST lighthouse/remove_peer
let post_lighthouse_remove_peer = warp::path("lighthouse")
.and(warp::path("remove_peer"))
.and(warp::path::end())
.and(warp_utils::json::json())
.and(task_spawner_filter.clone())
.and(network_globals.clone())
.and(network_tx_filter.clone())
.and(log_filter.clone())
.then(
|request_data: api_types::AdminPeer,
task_spawner: TaskSpawner<T::EthSpec>,
network_globals: Arc<NetworkGlobals<T::EthSpec>>,
network_tx: UnboundedSender<NetworkMessage<T::EthSpec>>,
log: Logger| {
task_spawner.blocking_json_task(Priority::P0, move || {
let enr = Enr::from_str(&request_data.enr).map_err(|e| {
warp_utils::reject::custom_bad_request(format!("invalid enr error {}", e))
})?;
info!(
log,
"Removing trusted peer";
"peer_id" => %enr.peer_id(),
"multiaddr" => ?enr.multiaddr()
);
network_globals.remove_trusted_peer(enr.clone());
publish_network_message(
&network_tx,
NetworkMessage::DisconnectTrustedPeer(enr),
)?;
Ok(())
})
},
);
// POST lighthouse/liveness
let post_lighthouse_liveness = warp::path("lighthouse")
.and(warp::path("liveness"))
@@ -4896,6 +4968,8 @@ pub fn serve<T: BeaconChainTypes>(
.uor(post_lighthouse_ui_validator_info)
.uor(post_lighthouse_finalize)
.uor(post_lighthouse_compaction)
.uor(post_lighthouse_add_peer)
.uor(post_lighthouse_remove_peer)
.recover(warp_utils::reject::handle_rejection),
),
)

View File

@@ -5768,6 +5768,27 @@ impl ApiTester {
self
}
pub async fn test_post_lighthouse_add_remove_peer(self) -> Self {
let trusted_peers = self.ctx.network_globals.as_ref().unwrap().trusted_peers();
// Check that there aren't any trusted peers on startup
assert!(trusted_peers.is_empty());
let enr = AdminPeer {enr: "enr:-QESuEDpVVjo8dmDuneRhLnXdIGY3e9NQiaG4sJR3GS-VMQCQDsmBYoQhJRaPeZzPlTsZj2F8v-iV4lKJEYIRIyztqexHodhdHRuZXRziAwAAAAAAAAAhmNsaWVudNiKTGlnaHRob3VzZYw3LjAuMC1iZXRhLjSEZXRoMpDS8Zl_YAAJEAAIAAAAAAAAgmlkgnY0gmlwhIe11XmDaXA2kCoBBPkAOitZAAAAAAAAAAKEcXVpY4IjKYVxdWljNoIjg4lzZWNwMjU2azGhA43ihEr9BUVVnIHIfFqBR3Izs4YRHHPsTqIbUgEb3Hc8iHN5bmNuZXRzD4N0Y3CCIyiEdGNwNoIjgoN1ZHCCIyiEdWRwNoIjgg".to_string()};
self.client
.post_lighthouse_add_peer(enr.clone())
.await
.unwrap();
let trusted_peers = self.ctx.network_globals.as_ref().unwrap().trusted_peers();
// Should have 1 trusted peer
assert_eq!(trusted_peers.len(), 1);
self.client.post_lighthouse_remove_peer(enr).await.unwrap();
let trusted_peers = self.ctx.network_globals.as_ref().unwrap().trusted_peers();
// Should be empty after removing
assert!(trusted_peers.is_empty());
self
}
pub async fn test_post_lighthouse_liveness(self) -> Self {
let epoch = self.chain.epoch().unwrap();
let head_state = self.chain.head_beacon_state_cloned();
@@ -7334,6 +7355,8 @@ async fn lighthouse_endpoints() {
.test_post_lighthouse_database_reconstruct()
.await
.test_post_lighthouse_liveness()
.await
.test_post_lighthouse_add_remove_peer()
.await;
}

View File

@@ -114,6 +114,7 @@ pub struct PeerManager<E: EthSpec> {
metrics_enabled: bool,
/// Keeps track of whether the QUIC protocol is enabled or not.
quic_enabled: bool,
trusted_peers: HashSet<Enr>,
/// The logger associated with the `PeerManager`.
log: slog::Logger,
}
@@ -195,6 +196,7 @@ impl<E: EthSpec> PeerManager<E> {
discovery_enabled,
metrics_enabled,
quic_enabled,
trusted_peers: Default::default(),
log: log.clone(),
})
}
@@ -894,7 +896,7 @@ impl<E: EthSpec> PeerManager<E> {
}
// Gracefully disconnects a peer without banning them.
fn disconnect_peer(&mut self, peer_id: PeerId, reason: GoodbyeReason) {
pub fn disconnect_peer(&mut self, peer_id: PeerId, reason: GoodbyeReason) {
self.events
.push(PeerManagerEvent::DisconnectPeer(peer_id, reason));
self.network_globals
@@ -943,6 +945,13 @@ impl<E: EthSpec> PeerManager<E> {
}
}
fn maintain_trusted_peers(&mut self) {
let trusted_peers = self.trusted_peers.clone();
for trusted_peer in trusted_peers {
self.dial_peer(trusted_peer);
}
}
/// This function checks the status of our current peers and optionally requests a discovery
/// query if we need to find more peers to maintain the current number of peers
fn maintain_peer_count(&mut self, dialing_peers: usize) {
@@ -1234,6 +1243,7 @@ impl<E: EthSpec> PeerManager<E> {
fn heartbeat(&mut self) {
// Optionally run a discovery query if we need more peers.
self.maintain_peer_count(0);
self.maintain_trusted_peers();
// Cleans up the connection state of dialing peers.
// Libp2p dials peer-ids, but sometimes the response is from another peer-id or libp2p
@@ -1470,6 +1480,14 @@ impl<E: EthSpec> PeerManager<E> {
)
})
}
pub fn add_trusted_peer(&mut self, enr: Enr) {
self.trusted_peers.insert(enr);
}
pub fn remove_trusted_peer(&mut self, enr: Enr) {
self.trusted_peers.remove(&enr);
}
}
enum ConnectingType {

View File

@@ -9,7 +9,7 @@ use std::net::IpAddr;
use std::time::Instant;
use std::{cmp::Ordering, fmt::Display};
use std::{
collections::{HashMap, HashSet},
collections::{hash_map::Entry, HashMap, HashSet},
fmt::Formatter,
};
use sync_status::SyncStatus;
@@ -79,6 +79,33 @@ impl<E: EthSpec> PeerDB<E> {
self.peers.iter()
}
pub fn set_trusted_peer(&mut self, enr: Enr) {
match self.peers.entry(enr.peer_id()) {
Entry::Occupied(mut info) => {
let entry = info.get_mut();
entry.score = Score::max_score();
entry.is_trusted = true;
}
Entry::Vacant(entry) => {
entry.insert(PeerInfo::trusted_peer_info());
}
}
}
pub fn unset_trusted_peer(&mut self, enr: Enr) {
if let Some(info) = self.peers.get_mut(&enr.peer_id()) {
info.is_trusted = false;
info.score = Score::default();
}
}
pub fn trusted_peers(&self) -> Vec<PeerId> {
self.peers
.iter()
.filter_map(|(id, info)| if info.is_trusted { Some(*id) } else { None })
.collect()
}
/// Gives the ids of all known peers.
pub fn peer_ids(&self) -> impl Iterator<Item = &PeerId> {
self.peers.keys()

View File

@@ -21,7 +21,7 @@ use PeerConnectionStatus::*;
#[serde(bound = "E: EthSpec")]
pub struct PeerInfo<E: EthSpec> {
/// The peers reputation
score: Score,
pub(crate) score: Score,
/// Client managing this peer
client: Client,
/// Connection status of this peer
@@ -50,7 +50,7 @@ pub struct PeerInfo<E: EthSpec> {
#[serde(skip)]
min_ttl: Option<Instant>,
/// Is the peer a trusted peer.
is_trusted: bool,
pub(crate) is_trusted: bool,
/// Direction of the first connection of the last (or current) connected session with this peer.
/// None if this peer was never connected.
connection_direction: Option<ConnectionDirection>,

View File

@@ -1236,6 +1236,21 @@ impl<E: EthSpec> Network<E> {
}
}
/// Adds the given `enr` to the trusted peers mapping and tries to dial it
/// every heartbeat to maintain the connection.
pub fn dial_trusted_peer(&mut self, enr: Enr) {
self.peer_manager_mut().add_trusted_peer(enr.clone());
self.peer_manager_mut().dial_peer(enr);
}
/// Remove the given peer from the trusted peers mapping if it exists and disconnect
/// from it.
pub fn remove_trusted_peer(&mut self, enr: Enr) {
self.peer_manager_mut().remove_trusted_peer(enr.clone());
self.peer_manager_mut()
.disconnect_peer(enr.peer_id(), GoodbyeReason::TooManyPeers);
}
/* Sub-behaviour event handling functions */
/// Handle a gossipsub event.

View File

@@ -162,6 +162,18 @@ impl<E: EthSpec> NetworkGlobals<E> {
.unwrap_or_default()
}
pub fn add_trusted_peer(&self, enr: Enr) {
self.peers.write().set_trusted_peer(enr);
}
pub fn remove_trusted_peer(&self, enr: Enr) {
self.peers.write().unset_trusted_peer(enr);
}
pub fn trusted_peers(&self) -> Vec<PeerId> {
self.peers.read().trusted_peers()
}
/// Updates the syncing state of the node.
///
/// The old state is returned

View File

@@ -14,6 +14,7 @@ use futures::StreamExt;
use lighthouse_network::rpc::{RequestId, RequestType};
use lighthouse_network::service::Network;
use lighthouse_network::types::GossipKind;
use lighthouse_network::Enr;
use lighthouse_network::{prometheus_client::registry::Registry, MessageAcceptance};
use lighthouse_network::{
rpc::{GoodbyeReason, RpcErrorResponse},
@@ -101,6 +102,10 @@ pub enum NetworkMessage<E: EthSpec> {
reason: GoodbyeReason,
source: ReportSource,
},
/// Connect to a trusted peer and try to maintain the connection.
ConnectTrustedPeer(Enr),
/// Disconnect from a trusted peer and remove it from the `trusted_peers` mapping.
DisconnectTrustedPeer(Enr),
}
/// Messages triggered by validators that may trigger a subscription to a subnet.
@@ -688,6 +693,12 @@ impl<T: BeaconChainTypes> NetworkService<T> {
reason,
source,
} => self.libp2p.goodbye_peer(&peer_id, reason, source),
NetworkMessage::ConnectTrustedPeer(enr) => {
self.libp2p.dial_trusted_peer(enr);
}
NetworkMessage::DisconnectTrustedPeer(enr) => {
self.libp2p.remove_trusted_peer(enr);
}
NetworkMessage::SubscribeCoreTopics => {
if self.subscribed_core_topics() {
return;

View File

@@ -9,7 +9,8 @@ mod sync_committee_rewards;
use crate::{
types::{
DepositTreeSnapshot, Epoch, EthSpec, FinalizedExecutionBlock, GenericResponse, ValidatorId,
AdminPeer, DepositTreeSnapshot, Epoch, EthSpec, FinalizedExecutionBlock, GenericResponse,
ValidatorId,
},
BeaconNodeHttpClient, DepositData, Error, Eth1Data, Hash256, Slot,
};
@@ -406,6 +407,30 @@ impl BeaconNodeHttpClient {
self.post_with_response(path, &()).await
}
/// `POST lighthouse/add_peer`
pub async fn post_lighthouse_add_peer(&self, req: AdminPeer) -> Result<(), Error> {
let mut path = self.server.full.clone();
path.path_segments_mut()
.map_err(|()| Error::InvalidUrl(self.server.clone()))?
.push("lighthouse")
.push("add_peer");
self.post_with_response(path, &req).await
}
/// `POST lighthouse/remove_peer`
pub async fn post_lighthouse_remove_peer(&self, req: AdminPeer) -> Result<(), Error> {
let mut path = self.server.full.clone();
path.path_segments_mut()
.map_err(|()| Error::InvalidUrl(self.server.clone()))?
.push("lighthouse")
.push("remove_peer");
self.post_with_response(path, &req).await
}
/*
Analysis endpoints.
*/

View File

@@ -1431,6 +1431,11 @@ pub struct ManualFinalizationRequestData {
pub block_root: Hash256,
}
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct AdminPeer {
pub enr: String,
}
#[derive(Debug, Serialize, Deserialize)]
pub struct LivenessRequestData {
pub epoch: Epoch,