Files
lighthouse/common/monitoring_api/src/lib.rs
Mac L 3066f0bef2 Prepare sensitive_url for crates.io (#8223)
Another good candidate for publishing separately from Lighthouse is `sensitive_url` as it's a general utility crate and not related to Ethereum. This PR prepares it to be spun out into its own crate.


  I've made the `full` field on `SensitiveUrl` private and instead provided an explicit getter called `.expose_full()`. It's a bit ugly for the diff but I prefer the explicit nature of the getter.
I've also added some extra tests and doc strings along with feature gating `Serialize` and `Deserialize` implementations behind the `serde` feature.


Co-Authored-By: Mac L <mjladson@pm.me>
2025-11-05 07:46:32 +00:00

216 lines
7.6 KiB
Rust

mod gather;
mod types;
use std::{path::PathBuf, time::Duration};
use eth2::lighthouse::SystemHealth;
use gather::{gather_beacon_metrics, gather_validator_metrics};
use health_metrics::observe::Observe;
use reqwest::{IntoUrl, Response};
pub use reqwest::{StatusCode, Url};
use sensitive_url::SensitiveUrl;
use serde::{Deserialize, Serialize};
use task_executor::TaskExecutor;
use tokio::time::{Instant, interval_at};
use tracing::{debug, error, info};
use types::*;
pub use types::ProcessType;
/// Duration after which we collect and send metrics to remote endpoint.
pub const DEFAULT_UPDATE_DURATION: u64 = 60;
/// Timeout for HTTP requests.
pub const TIMEOUT_DURATION: u64 = 5;
#[derive(Debug)]
pub enum Error {
/// The `reqwest` client raised an error.
Reqwest(reqwest::Error),
/// The supplied URL is badly formatted. It should look something like `http://127.0.0.1:5052`.
InvalidUrl(SensitiveUrl),
SystemMetricsFailed(String),
BeaconMetricsFailed(String),
ValidatorMetricsFailed(String),
/// The server returned an error message where the body was able to be parsed.
ServerMessage(ErrorMessage),
/// The server returned an error message where the body was unable to be parsed.
StatusCode(StatusCode),
}
impl std::fmt::Display for Error {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match &self {
Error::Reqwest(e) => write!(f, "Reqwest error: {}", e),
// Print the debug value
e => write!(f, "{:?}", e),
}
}
}
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct Config {
/// Endpoint
pub monitoring_endpoint: String,
/// Path for the hot database required for fetching beacon db size metrics.
/// Note: not relevant for validator and system metrics.
pub db_path: Option<PathBuf>,
/// Path for the cold database required for fetching beacon db size metrics.
/// Note: not relevant for validator and system metrics.
pub freezer_db_path: Option<PathBuf>,
/// User-defined update period in seconds.
pub update_period_secs: Option<u64>,
}
#[derive(Clone)]
pub struct MonitoringHttpClient {
client: reqwest::Client,
/// Path to the hot database. Required for getting db size metrics
db_path: Option<PathBuf>,
/// Path to the freezer database.
freezer_db_path: Option<PathBuf>,
update_period: Duration,
monitoring_endpoint: SensitiveUrl,
}
impl MonitoringHttpClient {
pub fn new(config: &Config) -> Result<Self, String> {
Ok(Self {
client: reqwest::Client::new(),
db_path: config.db_path.clone(),
freezer_db_path: config.freezer_db_path.clone(),
update_period: Duration::from_secs(
config.update_period_secs.unwrap_or(DEFAULT_UPDATE_DURATION),
),
monitoring_endpoint: SensitiveUrl::parse(&config.monitoring_endpoint)
.map_err(|e| format!("Invalid monitoring endpoint: {:?}", e))?,
})
}
/// Perform a HTTP POST request.
async fn post<T: Serialize, U: IntoUrl>(&self, url: U, body: &T) -> Result<(), Error> {
let response = self
.client
.post(url)
.json(body)
.timeout(Duration::from_secs(TIMEOUT_DURATION))
.send()
.await
.map_err(Error::Reqwest)?;
ok_or_error(response).await?;
Ok(())
}
/// Creates a task which periodically sends the provided process metrics
/// to the configured remote endpoint.
pub fn auto_update(self, executor: TaskExecutor, processes: Vec<ProcessType>) {
let mut interval = interval_at(
// Have some initial delay for the metrics to get initialized
Instant::now() + Duration::from_secs(25),
self.update_period,
);
info!(
endpoint = %self.monitoring_endpoint,
update_period = format!("{}s", self.update_period.as_secs()),
"Starting monitoring API"
);
let update_future = async move {
loop {
interval.tick().await;
match self.send_metrics(&processes).await {
Ok(()) => {
debug!(endpoint = %self.monitoring_endpoint, "Metrics sent to remote server");
}
Err(e) => {
error!(error = %e, "Failed to send metrics to remote endpoint")
}
}
}
};
executor.spawn(update_future, "monitoring_api");
}
/// Gets beacon metrics and updates the metrics struct
pub fn get_beacon_metrics(&self) -> Result<MonitoringMetrics, Error> {
let db_path = self.db_path.as_ref().ok_or_else(|| {
Error::BeaconMetricsFailed("Beacon metrics require db path".to_string())
})?;
let freezer_db_path = self.freezer_db_path.as_ref().ok_or_else(|| {
Error::BeaconMetricsFailed("Beacon metrics require freezer db path".to_string())
})?;
let metrics =
gather_beacon_metrics(db_path, freezer_db_path).map_err(Error::BeaconMetricsFailed)?;
Ok(MonitoringMetrics {
metadata: Metadata::new(ProcessType::BeaconNode),
process_metrics: Process::Beacon(metrics),
})
}
/// Gets validator process metrics by querying the validator metrics endpoint
pub fn get_validator_metrics(&self) -> Result<MonitoringMetrics, Error> {
let metrics = gather_validator_metrics().map_err(Error::BeaconMetricsFailed)?;
Ok(MonitoringMetrics {
metadata: Metadata::new(ProcessType::Validator),
process_metrics: Process::Validator(metrics),
})
}
/// Gets system metrics by observing capturing the SystemHealth metrics.
pub fn get_system_metrics(&self) -> Result<MonitoringMetrics, Error> {
let system_health = SystemHealth::observe().map_err(Error::SystemMetricsFailed)?;
Ok(MonitoringMetrics {
metadata: Metadata::new(ProcessType::System),
process_metrics: Process::System(system_health.into()),
})
}
/// Return metric based on process type.
pub async fn get_metrics(
&self,
process_type: &ProcessType,
) -> Result<MonitoringMetrics, Error> {
match process_type {
ProcessType::BeaconNode => self.get_beacon_metrics(),
ProcessType::System => self.get_system_metrics(),
ProcessType::Validator => self.get_validator_metrics(),
}
}
/// Send metrics to the remote endpoint
pub async fn send_metrics(&self, processes: &[ProcessType]) -> Result<(), Error> {
let mut metrics = Vec::new();
for process in processes {
match self.get_metrics(process).await {
Err(e) => error!(
process_type = ?process,
error = %e,
"Failed to get metrics"
),
Ok(metric) => metrics.push(metric),
}
}
info!(
endpoint = %self.monitoring_endpoint,
"Sending metrics to remote endpoint"
);
self.post(self.monitoring_endpoint.expose_full().clone(), &metrics)
.await
}
}
/// Returns `Ok(response)` if the response is a `200 OK` response. Otherwise, creates an
/// appropriate error message.
async fn ok_or_error(response: Response) -> Result<Response, Error> {
let status = response.status();
if status == StatusCode::OK {
Ok(response)
} else if let Ok(message) = response.json().await {
Err(Error::ServerMessage(message))
} else {
Err(Error::StatusCode(status))
}
}