mirror of
https://github.com/sigp/lighthouse.git
synced 2026-06-30 19:34:37 +00:00
Bump warp and begin axum migration (#9001)
- Bump `warp` to 0.4. This unifies `warp` and `axum` onto the same `http`, `hyper`, `h2`, `rustls`, etc versions. - Create `axum_utils` which contain common functions and types - Begins migration of all HTTP API servers from warp to axum Co-Authored-By: Mac L <mjladson@pm.me>
This commit is contained in:
@@ -13,6 +13,8 @@ testing = ["dep:deposit_contract", "dep:doppelganger_service", "dep:tempfile"]
|
||||
|
||||
[dependencies]
|
||||
account_utils = { workspace = true }
|
||||
axum = { workspace = true }
|
||||
axum_utils = { workspace = true }
|
||||
beacon_node_fallback = { workspace = true }
|
||||
bls = { workspace = true }
|
||||
deposit_contract = { workspace = true, optional = true }
|
||||
@@ -42,6 +44,7 @@ sysinfo = { workspace = true }
|
||||
system_health = { workspace = true }
|
||||
task_executor = { workspace = true }
|
||||
tempfile = { workspace = true, optional = true }
|
||||
thiserror = { workspace = true }
|
||||
tokio = { workspace = true }
|
||||
tokio-stream = { workspace = true }
|
||||
tracing = { workspace = true }
|
||||
|
||||
@@ -9,23 +9,18 @@ mod keystores;
|
||||
mod remotekeys;
|
||||
mod tests;
|
||||
|
||||
pub use api_secret::PK_FILENAME;
|
||||
|
||||
use graffiti::{delete_graffiti, get_graffiti, set_graffiti};
|
||||
|
||||
use create_signed_voluntary_exit::create_signed_voluntary_exit;
|
||||
use graffiti_file::{GraffitiFile, determine_graffiti};
|
||||
use lighthouse_validator_store::LighthouseValidatorStore;
|
||||
use validator_store::ValidatorStore;
|
||||
pub use api_secret::{ApiSecret, PK_FILENAME};
|
||||
|
||||
use account_utils::{
|
||||
mnemonic_from_phrase,
|
||||
validator_definitions::{SigningDefinition, ValidatorDefinition, Web3SignerDefinition},
|
||||
};
|
||||
pub use api_secret::ApiSecret;
|
||||
use axum::Router;
|
||||
use axum_utils::server::Server;
|
||||
use beacon_node_fallback::CandidateInfo;
|
||||
use bls::{PublicKey, PublicKeyBytes};
|
||||
use core::convert::Infallible;
|
||||
use create_signed_voluntary_exit::create_signed_voluntary_exit;
|
||||
use create_validator::{
|
||||
create_validators_mnemonic, create_validators_web3signer, get_voting_password_storage,
|
||||
};
|
||||
@@ -37,7 +32,10 @@ use eth2::lighthouse_vc::{
|
||||
UpdateCandidatesRequest, UpdateCandidatesResponse,
|
||||
},
|
||||
};
|
||||
use graffiti::{delete_graffiti, get_graffiti, set_graffiti};
|
||||
use graffiti_file::{GraffitiFile, determine_graffiti};
|
||||
use health_metrics::observe::Observe;
|
||||
use lighthouse_validator_store::LighthouseValidatorStore;
|
||||
use lighthouse_version::version_with_platform;
|
||||
use logging::SSELoggingComponents;
|
||||
use logging::crit;
|
||||
@@ -54,26 +52,27 @@ use sysinfo::{System, SystemExt};
|
||||
use system_health::observe_system_health_vc;
|
||||
use task_executor::TaskExecutor;
|
||||
use tokio_stream::{StreamExt, wrappers::BroadcastStream};
|
||||
use tracing::{info, warn};
|
||||
use tracing::{error, info, warn};
|
||||
use types::{ChainSpec, ConfigAndPreset, EthSpec};
|
||||
use validator_dir::Builder as ValidatorDirBuilder;
|
||||
use validator_services::block_service::BlockService;
|
||||
use validator_store::ValidatorStore;
|
||||
use warp::{Filter, reply::Response, sse::Event};
|
||||
use warp_utils::reject::convert_rejection;
|
||||
use warp_utils::task::blocking_json_task;
|
||||
|
||||
#[derive(Debug)]
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
pub enum Error {
|
||||
Warp(warp::Error),
|
||||
#[error("Builder error: {0}")]
|
||||
Builder(#[from] axum_utils::server::BuilderError),
|
||||
#[error("Server error: {0}")]
|
||||
Server(#[from] axum_utils::server::ServerError),
|
||||
#[error("Warp error: {0}")]
|
||||
Warp(#[from] warp::Error),
|
||||
#[error("{0}")]
|
||||
Other(String),
|
||||
}
|
||||
|
||||
impl From<warp::Error> for Error {
|
||||
fn from(e: warp::Error) -> Self {
|
||||
Error::Warp(e)
|
||||
}
|
||||
}
|
||||
|
||||
impl From<String> for Error {
|
||||
fn from(e: String) -> Self {
|
||||
Error::Other(e)
|
||||
@@ -148,7 +147,7 @@ impl Default for Config {
|
||||
///
|
||||
/// Returns an error if the server is unable to bind or there is another error during
|
||||
/// configuration.
|
||||
pub fn serve<T: 'static + SlotClock + Clone, E: EthSpec>(
|
||||
pub async fn serve<T: 'static + SlotClock + Clone, E: EthSpec>(
|
||||
ctx: Arc<Context<T, E>>,
|
||||
shutdown: impl Future<Output = ()> + Send + Sync + 'static,
|
||||
) -> Result<(SocketAddr, impl Future<Output = ()>), Error> {
|
||||
@@ -1399,20 +1398,33 @@ pub fn serve<T: 'static + SlotClock + Clone, E: EthSpec>(
|
||||
.recover(warp_utils::reject::handle_rejection)
|
||||
// Add a `Server` header.
|
||||
.map(|reply| warp::reply::with_header(reply, "Server", &version_with_platform()))
|
||||
.with(cors_builder.build());
|
||||
.with(cors_builder.build())
|
||||
.boxed();
|
||||
|
||||
let (listening_socket, server) = warp::serve(routes).try_bind_with_graceful_shutdown(
|
||||
SocketAddr::new(config.listen_addr, config.listen_port),
|
||||
async {
|
||||
shutdown.await;
|
||||
},
|
||||
)?;
|
||||
let axum_router = Router::new().fallback_service(warp::service(routes));
|
||||
|
||||
let address = SocketAddr::new(config.listen_addr, config.listen_port);
|
||||
|
||||
let server = Server::builder(axum_router, address).build().await?;
|
||||
|
||||
let (address, server) = server.serve_with_shutdown(shutdown).await?;
|
||||
|
||||
info!(
|
||||
listen_address = listening_socket.to_string(),
|
||||
listen_address = %address,
|
||||
?api_token_path,
|
||||
"HTTP API started"
|
||||
);
|
||||
|
||||
Ok((listening_socket, server))
|
||||
let server_future = async move {
|
||||
match server.await {
|
||||
Ok(()) => {
|
||||
info!("HTTP API server stopped");
|
||||
}
|
||||
Err(e) => {
|
||||
error!(error = ?e, "HTTP API server error");
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
Ok((address, server_future))
|
||||
}
|
||||
|
||||
@@ -149,7 +149,7 @@ impl ApiTester {
|
||||
// It's not really interesting why this triggered, just that it happened.
|
||||
let _ = shutdown_rx.await;
|
||||
};
|
||||
let (listening_socket, server) = super::serve::<_, E>(ctx, server_shutdown).unwrap();
|
||||
let (listening_socket, server) = super::serve::<_, E>(ctx, server_shutdown).await.unwrap();
|
||||
|
||||
tokio::spawn(server);
|
||||
|
||||
|
||||
@@ -135,7 +135,9 @@ impl ApiTester {
|
||||
});
|
||||
let ctx = context.clone();
|
||||
let (listening_socket, server) =
|
||||
super::serve::<_, E>(ctx, test_runtime.task_executor.exit()).unwrap();
|
||||
super::serve::<_, E>(ctx, test_runtime.task_executor.exit())
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
tokio::spawn(server);
|
||||
|
||||
|
||||
@@ -5,6 +5,8 @@ edition = { workspace = true }
|
||||
authors = ["Sigma Prime <contact@sigmaprime.io>"]
|
||||
|
||||
[dependencies]
|
||||
axum = { workspace = true }
|
||||
axum_utils = { workspace = true }
|
||||
health_metrics = { workspace = true }
|
||||
lighthouse_validator_store = { workspace = true }
|
||||
lighthouse_version = { workspace = true }
|
||||
@@ -14,9 +16,8 @@ metrics = { workspace = true }
|
||||
parking_lot = { workspace = true }
|
||||
serde = { workspace = true }
|
||||
slot_clock = { workspace = true }
|
||||
thiserror = { workspace = true }
|
||||
tracing = { workspace = true }
|
||||
types = { workspace = true }
|
||||
validator_metrics = { workspace = true }
|
||||
validator_services = { workspace = true }
|
||||
warp = { workspace = true }
|
||||
warp_utils = { workspace = true }
|
||||
|
||||
@@ -2,8 +2,15 @@
|
||||
//!
|
||||
//! For other endpoints, see the `http_api` crate.
|
||||
|
||||
use axum::{
|
||||
Router,
|
||||
extract::State,
|
||||
http::{Method, StatusCode, header},
|
||||
response::IntoResponse,
|
||||
routing::get,
|
||||
};
|
||||
use axum_utils::{Server, cors::build_cors_layer, middleware::add_server_header};
|
||||
use lighthouse_validator_store::LighthouseValidatorStore;
|
||||
use lighthouse_version::version_with_platform;
|
||||
use logging::crit;
|
||||
use malloc_utils::scrape_allocator_metrics;
|
||||
use parking_lot::RwLock;
|
||||
@@ -13,21 +20,20 @@ use std::future::Future;
|
||||
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
|
||||
use std::sync::Arc;
|
||||
use std::time::{SystemTime, UNIX_EPOCH};
|
||||
use tracing::info;
|
||||
use tracing::{error, info};
|
||||
use types::EthSpec;
|
||||
use validator_services::duties_service::DutiesService;
|
||||
use warp::{Filter, http::Response};
|
||||
|
||||
#[derive(Debug)]
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
pub enum Error {
|
||||
Warp(#[allow(dead_code)] warp::Error),
|
||||
Other(#[allow(dead_code)] String),
|
||||
}
|
||||
|
||||
impl From<warp::Error> for Error {
|
||||
fn from(e: warp::Error) -> Self {
|
||||
Error::Warp(e)
|
||||
}
|
||||
#[error("Builder error: {0}")]
|
||||
Builder(#[from] axum_utils::server::BuilderError),
|
||||
#[error("Server error: {0}")]
|
||||
Server(#[from] axum_utils::server::ServerError),
|
||||
#[error("CORS error: {0}")]
|
||||
Cors(#[from] axum_utils::cors::CorsError),
|
||||
#[error("{0}")]
|
||||
Other(String),
|
||||
}
|
||||
|
||||
impl From<String> for Error {
|
||||
@@ -90,26 +96,12 @@ impl Default for Config {
|
||||
///
|
||||
/// Returns an error if the server is unable to bind or there is another error during
|
||||
/// configuration.
|
||||
pub fn serve<E: EthSpec>(
|
||||
pub async fn serve<E: EthSpec>(
|
||||
ctx: Arc<Context<E>>,
|
||||
shutdown: impl Future<Output = ()> + Send + Sync + 'static,
|
||||
) -> Result<(SocketAddr, impl Future<Output = ()>), Error> {
|
||||
let config = &ctx.config;
|
||||
|
||||
// Configure CORS.
|
||||
let cors_builder = {
|
||||
let builder = warp::cors()
|
||||
.allow_method("GET")
|
||||
.allow_headers(vec!["Content-Type"]);
|
||||
|
||||
warp_utils::cors::set_builder_origins(
|
||||
builder,
|
||||
config.allow_origin.as_deref(),
|
||||
(config.listen_addr, config.listen_port),
|
||||
)?
|
||||
};
|
||||
|
||||
// Sanity check.
|
||||
if !config.enabled {
|
||||
crit!("Cannot start disabled metrics HTTP server");
|
||||
return Err(Error::Other(
|
||||
@@ -117,46 +109,52 @@ pub fn serve<E: EthSpec>(
|
||||
));
|
||||
}
|
||||
|
||||
let inner_ctx = ctx.clone();
|
||||
let routes = warp::get()
|
||||
.and(warp::path("metrics"))
|
||||
.map(move || inner_ctx.clone())
|
||||
.and_then(|ctx: Arc<Context<E>>| async move {
|
||||
Ok::<_, warp::Rejection>(
|
||||
gather_prometheus_metrics(&ctx)
|
||||
.map(|body| {
|
||||
Response::builder()
|
||||
.status(200)
|
||||
.header("Content-Type", "text/plain")
|
||||
.body(body)
|
||||
.unwrap()
|
||||
})
|
||||
.unwrap_or_else(|e| {
|
||||
Response::builder()
|
||||
.status(500)
|
||||
.header("Content-Type", "text/plain")
|
||||
.body(format!("Unable to gather metrics: {:?}", e))
|
||||
.unwrap()
|
||||
}),
|
||||
)
|
||||
})
|
||||
// Add a `Server` header.
|
||||
.map(|reply| warp::reply::with_header(reply, "Server", &version_with_platform()))
|
||||
.with(cors_builder.build());
|
||||
let cors_layer = build_cors_layer(
|
||||
config.allow_origin.as_deref(),
|
||||
config.listen_addr,
|
||||
config.listen_port,
|
||||
)?
|
||||
.allow_methods([Method::GET])
|
||||
.allow_headers([header::CONTENT_TYPE]);
|
||||
|
||||
let (listening_socket, server) = warp::serve(routes).try_bind_with_graceful_shutdown(
|
||||
SocketAddr::new(config.listen_addr, config.listen_port),
|
||||
async {
|
||||
shutdown.await;
|
||||
},
|
||||
)?;
|
||||
let server_header: header::HeaderValue = lighthouse_version::version_with_platform()
|
||||
.parse()
|
||||
.map_err(|e| Error::Other(format!("invalid version header value: {e}")))?;
|
||||
|
||||
let router = Router::new()
|
||||
.route("/metrics", get(metrics_handler::<E>))
|
||||
.with_state(ctx.clone())
|
||||
.layer(add_server_header(server_header))
|
||||
.layer(cors_layer);
|
||||
|
||||
let address = SocketAddr::new(config.listen_addr, config.listen_port);
|
||||
let server = Server::builder(router, address).build().await?;
|
||||
|
||||
let (address, server) = server.serve_with_shutdown(shutdown).await?;
|
||||
|
||||
info!(
|
||||
listen_address = listening_socket.to_string(),
|
||||
listen_address = %address,
|
||||
"Metrics HTTP server started"
|
||||
);
|
||||
|
||||
Ok((listening_socket, server))
|
||||
let server_future = async move {
|
||||
if let Err(e) = server.await {
|
||||
error!(error = ?e, "Metrics HTTP server error");
|
||||
}
|
||||
};
|
||||
|
||||
Ok((address, server_future))
|
||||
}
|
||||
|
||||
async fn metrics_handler<E: EthSpec>(State(ctx): State<Arc<Context<E>>>) -> impl IntoResponse {
|
||||
match gather_prometheus_metrics(&ctx) {
|
||||
Ok(body) => (StatusCode::OK, [(header::CONTENT_TYPE, "text/plain")], body),
|
||||
Err(e) => (
|
||||
StatusCode::INTERNAL_SERVER_ERROR,
|
||||
[(header::CONTENT_TYPE, "text/plain")],
|
||||
format!("Unable to gather metrics: {:?}", e),
|
||||
),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn gather_prometheus_metrics<E: EthSpec>(
|
||||
|
||||
@@ -153,6 +153,7 @@ impl<E: EthSpec> ProductionValidatorClient<E> {
|
||||
let exit = context.executor.exit();
|
||||
|
||||
let (_listen_addr, server) = validator_http_metrics::serve(ctx.clone(), exit)
|
||||
.await
|
||||
.map_err(|e| format!("Unable to start metrics API server: {:?}", e))?;
|
||||
|
||||
context
|
||||
@@ -622,6 +623,7 @@ impl<E: EthSpec> ProductionValidatorClient<E> {
|
||||
let exit = self.context.executor.exit();
|
||||
|
||||
let (listen_addr, server) = validator_http_api::serve::<_, E>(ctx, exit)
|
||||
.await
|
||||
.map_err(|e| format!("Unable to start HTTP API server: {:?}", e))?;
|
||||
|
||||
self.context
|
||||
|
||||
Reference in New Issue
Block a user