Remove VC response signing and fix HTTP error handling (#5529)

* and_then to then
remove expect
move convert_rejection to utils
remove signer from vc api

* remove key

* remove auth header

* revert

* Merge branch 'unstable' of https://github.com/sigp/lighthouse into vc-api-fix

* merge unstable

* revert

* Merge branch 'unstable' of https://github.com/sigp/lighthouse into vc-api-fix

* Merge branch 'unstable' of https://github.com/sigp/lighthouse into vc-api-fix

* refactor blocking json task

* linting

* revert logging

* remove response signing checks in validtor http_api client

* remove notion of public key, prefixes, and simplify token generation

* fmt

* Remove outdated comment on public key
This commit is contained in:
Eitan Seri-Levi
2024-07-16 08:57:58 +01:00
committed by GitHub
parent 77d491bea1
commit bf2f0b02b8
8 changed files with 168 additions and 444 deletions

View File

@@ -97,7 +97,7 @@ use warp::hyper::Body;
use warp::sse::Event; use warp::sse::Event;
use warp::Reply; use warp::Reply;
use warp::{http::Response, Filter, Rejection}; use warp::{http::Response, Filter, Rejection};
use warp_utils::{query::multi_key_query, uor::UnifyingOrFilter}; use warp_utils::{query::multi_key_query, reject::convert_rejection, uor::UnifyingOrFilter};
const API_PREFIX: &str = "eth"; const API_PREFIX: &str = "eth";
@@ -1802,7 +1802,7 @@ pub fn serve<T: BeaconChainTypes>(
) )
.await .await
.map(|()| warp::reply::json(&())); .map(|()| warp::reply::json(&()));
task_spawner::convert_rejection(result).await convert_rejection(result).await
}, },
); );
@@ -3817,12 +3817,12 @@ pub fn serve<T: BeaconChainTypes>(
.await; .await;
if initial_result.is_err() { if initial_result.is_err() {
return task_spawner::convert_rejection(initial_result).await; return convert_rejection(initial_result).await;
} }
// Await a response from the builder without blocking a // Await a response from the builder without blocking a
// `BeaconProcessor` worker. // `BeaconProcessor` worker.
task_spawner::convert_rejection(rx.await.unwrap_or_else(|_| { convert_rejection(rx.await.unwrap_or_else(|_| {
Ok(warp::reply::with_status( Ok(warp::reply::with_status(
warp::reply::json(&"No response from channel"), warp::reply::json(&"No response from channel"),
eth2::StatusCode::INTERNAL_SERVER_ERROR, eth2::StatusCode::INTERNAL_SERVER_ERROR,

View File

@@ -4,6 +4,7 @@ use std::future::Future;
use tokio::sync::{mpsc::error::TrySendError, oneshot}; use tokio::sync::{mpsc::error::TrySendError, oneshot};
use types::EthSpec; use types::EthSpec;
use warp::reply::{Reply, Response}; use warp::reply::{Reply, Response};
use warp_utils::reject::convert_rejection;
/// Maps a request to a queue in the `BeaconProcessor`. /// Maps a request to a queue in the `BeaconProcessor`.
#[derive(Clone, Copy)] #[derive(Clone, Copy)]
@@ -35,24 +36,6 @@ pub struct TaskSpawner<E: EthSpec> {
beacon_processor_send: Option<BeaconProcessorSend<E>>, beacon_processor_send: Option<BeaconProcessorSend<E>>,
} }
/// Convert a warp `Rejection` into a `Response`.
///
/// This function should *always* be used to convert rejections into responses. This prevents warp
/// from trying to backtrack in strange ways. See: https://github.com/sigp/lighthouse/issues/3404
pub async fn convert_rejection<T: Reply>(res: Result<T, warp::Rejection>) -> Response {
match res {
Ok(response) => response.into_response(),
Err(e) => match warp_utils::reject::handle_rejection(e).await {
Ok(reply) => reply.into_response(),
Err(_) => warp::reply::with_status(
warp::reply::json(&"unhandled error"),
eth2::StatusCode::INTERNAL_SERVER_ERROR,
)
.into_response(),
},
}
}
impl<E: EthSpec> TaskSpawner<E> { impl<E: EthSpec> TaskSpawner<E> {
pub fn new(beacon_processor_send: Option<BeaconProcessorSend<E>>) -> Self { pub fn new(beacon_processor_send: Option<BeaconProcessorSend<E>>) -> Self {
Self { Self {

View File

@@ -1,13 +1,10 @@
use super::{types::*, PK_LEN, SECRET_PREFIX}; use super::types::*;
use crate::Error; use crate::Error;
use account_utils::ZeroizeString; use account_utils::ZeroizeString;
use bytes::Bytes;
use libsecp256k1::{Message, PublicKey, Signature};
use reqwest::{ use reqwest::{
header::{HeaderMap, HeaderValue}, header::{HeaderMap, HeaderValue},
IntoUrl, IntoUrl,
}; };
use ring::digest::{digest, SHA256};
use sensitive_url::SensitiveUrl; use sensitive_url::SensitiveUrl;
use serde::{de::DeserializeOwned, Serialize}; use serde::{de::DeserializeOwned, Serialize};
use std::fmt::{self, Display}; use std::fmt::{self, Display};
@@ -24,8 +21,7 @@ use types::graffiti::GraffitiString;
pub struct ValidatorClientHttpClient { pub struct ValidatorClientHttpClient {
client: reqwest::Client, client: reqwest::Client,
server: SensitiveUrl, server: SensitiveUrl,
secret: Option<ZeroizeString>, api_token: Option<ZeroizeString>,
server_pubkey: Option<PublicKey>,
authorization_header: AuthorizationHeader, authorization_header: AuthorizationHeader,
} }
@@ -46,45 +42,13 @@ impl Display for AuthorizationHeader {
} }
} }
/// Parse an API token and return a secp256k1 public key.
///
/// If the token does not start with the Lighthouse token prefix then `Ok(None)` will be returned.
/// An error will be returned if the token looks like a Lighthouse token but doesn't correspond to a
/// valid public key.
pub fn parse_pubkey(secret: &str) -> Result<Option<PublicKey>, Error> {
let secret = if !secret.starts_with(SECRET_PREFIX) {
return Ok(None);
} else {
&secret[SECRET_PREFIX.len()..]
};
serde_utils::hex::decode(secret)
.map_err(|e| Error::InvalidSecret(format!("invalid hex: {:?}", e)))
.and_then(|bytes| {
if bytes.len() != PK_LEN {
return Err(Error::InvalidSecret(format!(
"expected {} bytes not {}",
PK_LEN,
bytes.len()
)));
}
let mut arr = [0; PK_LEN];
arr.copy_from_slice(&bytes);
PublicKey::parse_compressed(&arr)
.map_err(|e| Error::InvalidSecret(format!("invalid secp256k1 pubkey: {:?}", e)))
})
.map(Some)
}
impl ValidatorClientHttpClient { impl ValidatorClientHttpClient {
/// Create a new client pre-initialised with an API token. /// Create a new client pre-initialised with an API token.
pub fn new(server: SensitiveUrl, secret: String) -> Result<Self, Error> { pub fn new(server: SensitiveUrl, secret: String) -> Result<Self, Error> {
Ok(Self { Ok(Self {
client: reqwest::Client::new(), client: reqwest::Client::new(),
server, server,
server_pubkey: parse_pubkey(&secret)?, api_token: Some(secret.into()),
secret: Some(secret.into()),
authorization_header: AuthorizationHeader::Bearer, authorization_header: AuthorizationHeader::Bearer,
}) })
} }
@@ -96,8 +60,7 @@ impl ValidatorClientHttpClient {
Ok(Self { Ok(Self {
client: reqwest::Client::new(), client: reqwest::Client::new(),
server, server,
secret: None, api_token: None,
server_pubkey: None,
authorization_header: AuthorizationHeader::Omit, authorization_header: AuthorizationHeader::Omit,
}) })
} }
@@ -110,15 +73,14 @@ impl ValidatorClientHttpClient {
Ok(Self { Ok(Self {
client, client,
server, server,
server_pubkey: parse_pubkey(&secret)?, api_token: Some(secret.into()),
secret: Some(secret.into()),
authorization_header: AuthorizationHeader::Bearer, authorization_header: AuthorizationHeader::Bearer,
}) })
} }
/// Get a reference to this client's API token, if any. /// Get a reference to this client's API token, if any.
pub fn api_token(&self) -> Option<&ZeroizeString> { pub fn api_token(&self) -> Option<&ZeroizeString> {
self.secret.as_ref() self.api_token.as_ref()
} }
/// Read an API token from the specified `path`, stripping any trailing whitespace. /// Read an API token from the specified `path`, stripping any trailing whitespace.
@@ -128,19 +90,11 @@ impl ValidatorClientHttpClient {
} }
/// Add an authentication token to use when making requests. /// Add an authentication token to use when making requests.
///
/// If the token is Lighthouse-like, a pubkey derivation will be attempted. In the case
/// of failure the token will still be stored, and the client can continue to be used to
/// communicate with non-Lighthouse nodes.
pub fn add_auth_token(&mut self, token: ZeroizeString) -> Result<(), Error> { pub fn add_auth_token(&mut self, token: ZeroizeString) -> Result<(), Error> {
let pubkey_res = parse_pubkey(token.as_str()); self.api_token = Some(token);
self.secret = Some(token);
self.authorization_header = AuthorizationHeader::Bearer; self.authorization_header = AuthorizationHeader::Bearer;
pubkey_res.map(|opt_pubkey| { Ok(())
self.server_pubkey = opt_pubkey;
})
} }
/// Set to `false` to disable sending the `Authorization` header on requests. /// Set to `false` to disable sending the `Authorization` header on requests.
@@ -160,49 +114,17 @@ impl ValidatorClientHttpClient {
self.authorization_header = AuthorizationHeader::Basic; self.authorization_header = AuthorizationHeader::Basic;
} }
async fn signed_body(&self, response: Response) -> Result<Bytes, Error> {
let server_pubkey = self.server_pubkey.as_ref().ok_or(Error::NoServerPubkey)?;
let sig = response
.headers()
.get("Signature")
.ok_or(Error::MissingSignatureHeader)?
.to_str()
.map_err(|_| Error::InvalidSignatureHeader)?
.to_string();
let body = response.bytes().await.map_err(Error::from)?;
let message =
Message::parse_slice(digest(&SHA256, &body).as_ref()).expect("sha256 is 32 bytes");
serde_utils::hex::decode(&sig)
.ok()
.and_then(|bytes| {
let sig = Signature::parse_der(&bytes).ok()?;
Some(libsecp256k1::verify(&message, &sig, server_pubkey))
})
.filter(|is_valid| *is_valid)
.ok_or(Error::InvalidSignatureHeader)?;
Ok(body)
}
async fn signed_json<T: DeserializeOwned>(&self, response: Response) -> Result<T, Error> {
let body = self.signed_body(response).await?;
serde_json::from_slice(&body).map_err(Error::InvalidJson)
}
fn headers(&self) -> Result<HeaderMap, Error> { fn headers(&self) -> Result<HeaderMap, Error> {
let mut headers = HeaderMap::new(); let mut headers = HeaderMap::new();
if self.authorization_header == AuthorizationHeader::Basic if self.authorization_header == AuthorizationHeader::Basic
|| self.authorization_header == AuthorizationHeader::Bearer || self.authorization_header == AuthorizationHeader::Bearer
{ {
let secret = self.secret.as_ref().ok_or(Error::NoToken)?; let auth_header_token = self.api_token().ok_or(Error::NoToken)?;
let header_value = HeaderValue::from_str(&format!( let header_value = HeaderValue::from_str(&format!(
"{} {}", "{} {}",
self.authorization_header, self.authorization_header,
secret.as_str() auth_header_token.as_str()
)) ))
.map_err(|e| { .map_err(|e| {
Error::InvalidSecret(format!("secret is invalid as a header value: {}", e)) Error::InvalidSecret(format!("secret is invalid as a header value: {}", e))
@@ -240,7 +162,8 @@ impl ValidatorClientHttpClient {
async fn get<T: DeserializeOwned, U: IntoUrl>(&self, url: U) -> Result<T, Error> { async fn get<T: DeserializeOwned, U: IntoUrl>(&self, url: U) -> Result<T, Error> {
let response = self.get_response(url).await?; let response = self.get_response(url).await?;
self.signed_json(response).await let body = response.bytes().await.map_err(Error::from)?;
serde_json::from_slice(&body).map_err(Error::InvalidJson)
} }
async fn delete<U: IntoUrl>(&self, url: U) -> Result<(), Error> { async fn delete<U: IntoUrl>(&self, url: U) -> Result<(), Error> {
@@ -263,7 +186,14 @@ impl ValidatorClientHttpClient {
/// Perform a HTTP GET request, returning `None` on a 404 error. /// Perform a HTTP GET request, returning `None` on a 404 error.
async fn get_opt<T: DeserializeOwned, U: IntoUrl>(&self, url: U) -> Result<Option<T>, Error> { async fn get_opt<T: DeserializeOwned, U: IntoUrl>(&self, url: U) -> Result<Option<T>, Error> {
match self.get_response(url).await { match self.get_response(url).await {
Ok(resp) => self.signed_json(resp).await.map(Option::Some), Ok(resp) => {
let body = resp.bytes().await.map(Option::Some)?;
if let Some(body) = body {
serde_json::from_slice(&body).map_err(Error::InvalidJson)
} else {
Ok(None)
}
}
Err(err) => { Err(err) => {
if err.status() == Some(StatusCode::NOT_FOUND) { if err.status() == Some(StatusCode::NOT_FOUND) {
Ok(None) Ok(None)
@@ -297,7 +227,8 @@ impl ValidatorClientHttpClient {
body: &T, body: &T,
) -> Result<V, Error> { ) -> Result<V, Error> {
let response = self.post_with_raw_response(url, body).await?; let response = self.post_with_raw_response(url, body).await?;
self.signed_json(response).await let body = response.bytes().await.map_err(Error::from)?;
serde_json::from_slice(&body).map_err(Error::InvalidJson)
} }
async fn post_with_unsigned_response<T: Serialize, U: IntoUrl, V: DeserializeOwned>( async fn post_with_unsigned_response<T: Serialize, U: IntoUrl, V: DeserializeOwned>(
@@ -319,8 +250,7 @@ impl ValidatorClientHttpClient {
.send() .send()
.await .await
.map_err(Error::from)?; .map_err(Error::from)?;
let response = ok_or_error(response).await?; ok_or_error(response).await?;
self.signed_body(response).await?;
Ok(()) Ok(())
} }

View File

@@ -1,10 +1,3 @@
pub mod http_client; pub mod http_client;
pub mod std_types; pub mod std_types;
pub mod types; pub mod types;
/// The number of bytes in the secp256k1 public key used as the authorization token for the VC API.
pub const PK_LEN: usize = 33;
/// The prefix for the secp256k1 public key when it is used as the authorization token for the VC
/// API.
pub const SECRET_PREFIX: &str = "api-token-";

View File

@@ -2,7 +2,7 @@ use eth2::types::{ErrorMessage, Failure, IndexedErrorMessage};
use std::convert::Infallible; use std::convert::Infallible;
use std::error::Error; use std::error::Error;
use std::fmt; use std::fmt;
use warp::{http::StatusCode, reject::Reject}; use warp::{http::StatusCode, reject::Reject, reply::Response, Reply};
#[derive(Debug)] #[derive(Debug)]
pub struct ServerSentEventError(pub String); pub struct ServerSentEventError(pub String);
@@ -255,3 +255,21 @@ pub async fn handle_rejection(err: warp::Rejection) -> Result<impl warp::Reply,
Ok(warp::reply::with_status(json, code)) Ok(warp::reply::with_status(json, code))
} }
/// Convert a warp `Rejection` into a `Response`.
///
/// This function should *always* be used to convert rejections into responses. This prevents warp
/// from trying to backtrack in strange ways. See: https://github.com/sigp/lighthouse/issues/3404
pub async fn convert_rejection<T: Reply>(res: Result<T, warp::Rejection>) -> Response {
match res {
Ok(response) => response.into_response(),
Err(e) => match handle_rejection(e).await {
Ok(reply) => reply.into_response(),
Err(_) => warp::reply::with_status(
warp::reply::json(&"unhandled error"),
eth2::StatusCode::INTERNAL_SERVER_ERROR,
)
.into_response(),
},
}
}

View File

@@ -1,3 +1,4 @@
use crate::reject::convert_rejection;
use serde::Serialize; use serde::Serialize;
use warp::reply::{Reply, Response}; use warp::reply::{Reply, Response};
@@ -24,14 +25,16 @@ where
} }
/// A convenience wrapper around `blocking_task` for use with `warp` JSON responses. /// A convenience wrapper around `blocking_task` for use with `warp` JSON responses.
pub async fn blocking_json_task<F, T>(func: F) -> Result<Response, warp::Rejection> pub async fn blocking_json_task<F, T>(func: F) -> Response
where where
F: FnOnce() -> Result<T, warp::Rejection> + Send + 'static, F: FnOnce() -> Result<T, warp::Rejection> + Send + 'static,
T: Serialize + Send + 'static, T: Serialize + Send + 'static,
{ {
blocking_response_task(|| { let result = blocking_response_task(|| {
let response = func()?; let response = func()?;
Ok(warp::reply::json(&response)) Ok(warp::reply::json(&response))
}) })
.await .await;
convert_rejection(result).await
} }

View File

@@ -1,85 +1,53 @@
use eth2::lighthouse_vc::{PK_LEN, SECRET_PREFIX as PK_PREFIX};
use filesystem::create_with_600_perms; use filesystem::create_with_600_perms;
use libsecp256k1::{Message, PublicKey, SecretKey}; use rand::distributions::Alphanumeric;
use rand::thread_rng; use rand::{thread_rng, Rng};
use ring::digest::{digest, SHA256};
use std::fs; use std::fs;
use std::path::{Path, PathBuf}; use std::path::{Path, PathBuf};
use warp::Filter; use warp::Filter;
/// The name of the file which stores the secret key. /// The name of the file which stores the API token.
///
/// It is purposefully opaque to prevent users confusing it with the "secret" that they need to
/// share with API consumers (which is actually the public key).
pub const SK_FILENAME: &str = ".secp-sk";
/// Length of the raw secret key, in bytes.
pub const SK_LEN: usize = 32;
/// The name of the file which stores the public key.
///
/// For users, this public key is a "secret" that can be shared with API consumers to provide them
/// access to the API. We avoid calling it a "public" key to users, since they should not post this
/// value in a public forum.
pub const PK_FILENAME: &str = "api-token.txt"; pub const PK_FILENAME: &str = "api-token.txt";
/// Contains a `secp256k1` keypair that is saved-to/loaded-from disk on instantiation. The keypair pub const PK_LEN: usize = 33;
/// is used for authorization/authentication for requests/responses on the HTTP API.
/// Contains a randomly generated string which is used for authorization of requests to the HTTP API.
/// ///
/// Provides convenience functions to ultimately provide: /// Provides convenience functions to ultimately provide:
/// ///
/// - A signature across outgoing HTTP responses, applied to the `Signature` header.
/// - Verification of proof-of-knowledge of the public key in `self` for incoming HTTP requests, /// - Verification of proof-of-knowledge of the public key in `self` for incoming HTTP requests,
/// via the `Authorization` header. /// via the `Authorization` header.
/// ///
/// The aforementioned scheme was first defined here: /// The aforementioned scheme was first defined here:
/// ///
/// https://github.com/sigp/lighthouse/issues/1269#issuecomment-649879855 /// https://github.com/sigp/lighthouse/issues/1269#issuecomment-649879855
///
/// This scheme has since been tweaked to remove VC response signing and secp256k1 key generation.
/// https://github.com/sigp/lighthouse/issues/5423
pub struct ApiSecret { pub struct ApiSecret {
pk: PublicKey, pk: String,
sk: SecretKey,
pk_path: PathBuf, pk_path: PathBuf,
} }
impl ApiSecret { impl ApiSecret {
/// If both the secret and public keys are already on-disk, parse them and ensure they're both /// If the public key is already on-disk, use it.
/// from the same keypair.
/// ///
/// The provided `dir` is a directory containing two files, `SK_FILENAME` and `PK_FILENAME`. /// The provided `dir` is a directory containing `PK_FILENAME`.
/// ///
/// If either the secret or public key files are missing on disk, create a new keypair and /// If the public key file is missing on disk, create a new key and
/// write it to disk (over-writing any existing files). /// write it to disk (over-writing any existing files).
pub fn create_or_open<P: AsRef<Path>>(dir: P) -> Result<Self, String> { pub fn create_or_open<P: AsRef<Path>>(dir: P) -> Result<Self, String> {
let sk_path = dir.as_ref().join(SK_FILENAME);
let pk_path = dir.as_ref().join(PK_FILENAME); let pk_path = dir.as_ref().join(PK_FILENAME);
if !(sk_path.exists() && pk_path.exists()) { if !pk_path.exists() {
let sk = SecretKey::random(&mut thread_rng()); let length = PK_LEN;
let pk = PublicKey::from_secret_key(&sk); let pk: String = thread_rng()
.sample_iter(&Alphanumeric)
// Create and write the secret key to file with appropriate permissions .take(length)
create_with_600_perms( .map(char::from)
&sk_path, .collect();
serde_utils::hex::encode(sk.serialize()).as_bytes(),
)
.map_err(|e| {
format!(
"Unable to create file with permissions for {:?}: {:?}",
sk_path, e
)
})?;
// Create and write the public key to file with appropriate permissions // Create and write the public key to file with appropriate permissions
create_with_600_perms( create_with_600_perms(&pk_path, pk.to_string().as_bytes()).map_err(|e| {
&pk_path,
format!(
"{}{}",
PK_PREFIX,
serde_utils::hex::encode(&pk.serialize_compressed()[..])
)
.as_bytes(),
)
.map_err(|e| {
format!( format!(
"Unable to create file with permissions for {:?}: {:?}", "Unable to create file with permissions for {:?}: {:?}",
pk_path, e pk_path, e
@@ -87,78 +55,18 @@ impl ApiSecret {
})?; })?;
} }
let sk = fs::read(&sk_path)
.map_err(|e| format!("cannot read {}: {}", SK_FILENAME, e))
.and_then(|bytes| {
serde_utils::hex::decode(&String::from_utf8_lossy(&bytes))
.map_err(|_| format!("{} should be 0x-prefixed hex", PK_FILENAME))
})
.and_then(|bytes| {
if bytes.len() == SK_LEN {
let mut array = [0; SK_LEN];
array.copy_from_slice(&bytes);
SecretKey::parse(&array).map_err(|e| format!("invalid {}: {}", SK_FILENAME, e))
} else {
Err(format!(
"{} expected {} bytes not {}",
SK_FILENAME,
SK_LEN,
bytes.len()
))
}
})?;
let pk = fs::read(&pk_path) let pk = fs::read(&pk_path)
.map_err(|e| format!("cannot read {}: {}", PK_FILENAME, e)) .map_err(|e| format!("cannot read {}: {}", PK_FILENAME, e))?
.and_then(|bytes| { .iter()
let hex = .map(|&c| char::from(c))
String::from_utf8(bytes).map_err(|_| format!("{} is not utf8", SK_FILENAME))?; .collect();
if let Some(stripped) = hex.strip_prefix(PK_PREFIX) {
serde_utils::hex::decode(stripped)
.map_err(|_| format!("{} should be 0x-prefixed hex", SK_FILENAME))
} else {
Err(format!("unable to parse {}", SK_FILENAME))
}
})
.and_then(|bytes| {
if bytes.len() == PK_LEN {
let mut array = [0; PK_LEN];
array.copy_from_slice(&bytes);
PublicKey::parse_compressed(&array)
.map_err(|e| format!("invalid {}: {}", PK_FILENAME, e))
} else {
Err(format!(
"{} expected {} bytes not {}",
PK_FILENAME,
PK_LEN,
bytes.len()
))
}
})?;
// Ensure that the keys loaded from disk are indeed a pair. Ok(Self { pk, pk_path })
if PublicKey::from_secret_key(&sk) != pk {
fs::remove_file(&sk_path)
.map_err(|e| format!("unable to remove {}: {}", SK_FILENAME, e))?;
fs::remove_file(&pk_path)
.map_err(|e| format!("unable to remove {}: {}", PK_FILENAME, e))?;
return Err(format!(
"{:?} does not match {:?} and the files have been deleted. Please try again.",
sk_path, pk_path
));
}
Ok(Self { pk, sk, pk_path })
}
/// Returns the public key of `self` as a 0x-prefixed hex string.
fn pubkey_string(&self) -> String {
serde_utils::hex::encode(&self.pk.serialize_compressed()[..])
} }
/// Returns the API token. /// Returns the API token.
pub fn api_token(&self) -> String { pub fn api_token(&self) -> String {
format!("{}{}", PK_PREFIX, self.pubkey_string()) self.pk.clone()
} }
/// Returns the path for the API token file /// Returns the path for the API token file
@@ -196,16 +104,4 @@ impl ApiSecret {
.untuple_one() .untuple_one()
.boxed() .boxed()
} }
/// Returns a closure which produces a signature over some bytes using the secret key in
/// `self`. The signature is a 32-byte hash formatted as a 0x-prefixed string.
pub fn signer(&self) -> impl Fn(&[u8]) -> String + Clone {
let sk = self.sk;
move |input: &[u8]| -> String {
let message =
Message::parse_slice(digest(&SHA256, input).as_ref()).expect("sha256 is 32 bytes");
let (signature, _) = libsecp256k1::sign(&message, &sk);
serde_utils::hex::encode(signature.serialize_der().as_ref())
}
}
} }

View File

@@ -45,15 +45,8 @@ use task_executor::TaskExecutor;
use tokio_stream::{wrappers::BroadcastStream, StreamExt}; use tokio_stream::{wrappers::BroadcastStream, StreamExt};
use types::{ChainSpec, ConfigAndPreset, EthSpec}; use types::{ChainSpec, ConfigAndPreset, EthSpec};
use validator_dir::Builder as ValidatorDirBuilder; use validator_dir::Builder as ValidatorDirBuilder;
use warp::{ use warp::{sse::Event, Filter};
http::{ use warp_utils::task::blocking_json_task;
header::{HeaderValue, CONTENT_TYPE},
response::Response,
StatusCode,
},
sse::Event,
Filter,
};
#[derive(Debug)] #[derive(Debug)]
pub enum Error { pub enum Error {
@@ -176,9 +169,6 @@ pub fn serve<T: 'static + SlotClock + Clone, E: EthSpec>(
} }
}; };
let signer = ctx.api_secret.signer();
let signer = warp::any().map(move || signer.clone());
let inner_validator_store = ctx.validator_store.clone(); let inner_validator_store = ctx.validator_store.clone();
let validator_store_filter = warp::any() let validator_store_filter = warp::any()
.map(move || inner_validator_store.clone()) .map(move || inner_validator_store.clone())
@@ -270,9 +260,8 @@ pub fn serve<T: 'static + SlotClock + Clone, E: EthSpec>(
let get_node_version = warp::path("lighthouse") let get_node_version = warp::path("lighthouse")
.and(warp::path("version")) .and(warp::path("version"))
.and(warp::path::end()) .and(warp::path::end())
.and(signer.clone()) .then(|| {
.and_then(|signer| { blocking_json_task(move || {
blocking_signed_json_task(signer, move || {
Ok(api_types::GenericResponse::from(api_types::VersionData { Ok(api_types::GenericResponse::from(api_types::VersionData {
version: version_with_platform(), version: version_with_platform(),
})) }))
@@ -283,9 +272,8 @@ pub fn serve<T: 'static + SlotClock + Clone, E: EthSpec>(
let get_lighthouse_health = warp::path("lighthouse") let get_lighthouse_health = warp::path("lighthouse")
.and(warp::path("health")) .and(warp::path("health"))
.and(warp::path::end()) .and(warp::path::end())
.and(signer.clone()) .then(|| {
.and_then(|signer| { blocking_json_task(move || {
blocking_signed_json_task(signer, move || {
eth2::lighthouse::Health::observe() eth2::lighthouse::Health::observe()
.map(api_types::GenericResponse::from) .map(api_types::GenericResponse::from)
.map_err(warp_utils::reject::custom_bad_request) .map_err(warp_utils::reject::custom_bad_request)
@@ -297,9 +285,8 @@ pub fn serve<T: 'static + SlotClock + Clone, E: EthSpec>(
.and(warp::path("spec")) .and(warp::path("spec"))
.and(warp::path::end()) .and(warp::path::end())
.and(spec_filter.clone()) .and(spec_filter.clone())
.and(signer.clone()) .then(|spec: Arc<_>| {
.and_then(|spec: Arc<_>, signer| { blocking_json_task(move || {
blocking_signed_json_task(signer, move || {
let config = ConfigAndPreset::from_chain_spec::<E>(&spec, None); let config = ConfigAndPreset::from_chain_spec::<E>(&spec, None);
Ok(api_types::GenericResponse::from(config)) Ok(api_types::GenericResponse::from(config))
}) })
@@ -310,9 +297,8 @@ pub fn serve<T: 'static + SlotClock + Clone, E: EthSpec>(
.and(warp::path("validators")) .and(warp::path("validators"))
.and(warp::path::end()) .and(warp::path::end())
.and(validator_store_filter.clone()) .and(validator_store_filter.clone())
.and(signer.clone()) .then(|validator_store: Arc<ValidatorStore<T, E>>| {
.and_then(|validator_store: Arc<ValidatorStore<T, E>>, signer| { blocking_json_task(move || {
blocking_signed_json_task(signer, move || {
let validators = validator_store let validators = validator_store
.initialized_validators() .initialized_validators()
.read() .read()
@@ -335,10 +321,9 @@ pub fn serve<T: 'static + SlotClock + Clone, E: EthSpec>(
.and(warp::path::param::<PublicKey>()) .and(warp::path::param::<PublicKey>())
.and(warp::path::end()) .and(warp::path::end())
.and(validator_store_filter.clone()) .and(validator_store_filter.clone())
.and(signer.clone()) .then(
.and_then( |validator_pubkey: PublicKey, validator_store: Arc<ValidatorStore<T, E>>| {
|validator_pubkey: PublicKey, validator_store: Arc<ValidatorStore<T, E>>, signer| { blocking_json_task(move || {
blocking_signed_json_task(signer, move || {
let validator = validator_store let validator = validator_store
.initialized_validators() .initialized_validators()
.read() .read()
@@ -370,9 +355,8 @@ pub fn serve<T: 'static + SlotClock + Clone, E: EthSpec>(
.and(system_info_filter) .and(system_info_filter)
.and(app_start_filter) .and(app_start_filter)
.and(validator_dir_filter.clone()) .and(validator_dir_filter.clone())
.and(signer.clone()) .then(|sysinfo, app_start: std::time::Instant, val_dir| {
.and_then(|sysinfo, app_start: std::time::Instant, val_dir, signer| { blocking_json_task(move || {
blocking_signed_json_task(signer, move || {
let app_uptime = app_start.elapsed().as_secs(); let app_uptime = app_start.elapsed().as_secs();
Ok(api_types::GenericResponse::from(observe_system_health_vc( Ok(api_types::GenericResponse::from(observe_system_health_vc(
sysinfo, val_dir, app_uptime, sysinfo, val_dir, app_uptime,
@@ -387,15 +371,13 @@ pub fn serve<T: 'static + SlotClock + Clone, E: EthSpec>(
.and(validator_store_filter.clone()) .and(validator_store_filter.clone())
.and(graffiti_file_filter.clone()) .and(graffiti_file_filter.clone())
.and(graffiti_flag_filter) .and(graffiti_flag_filter)
.and(signer.clone())
.and(log_filter.clone()) .and(log_filter.clone())
.and_then( .then(
|validator_store: Arc<ValidatorStore<T, E>>, |validator_store: Arc<ValidatorStore<T, E>>,
graffiti_file: Option<GraffitiFile>, graffiti_file: Option<GraffitiFile>,
graffiti_flag: Option<Graffiti>, graffiti_flag: Option<Graffiti>,
signer,
log| { log| {
blocking_signed_json_task(signer, move || { blocking_json_task(move || {
let mut result = HashMap::new(); let mut result = HashMap::new();
for (key, graffiti_definition) in validator_store for (key, graffiti_definition) in validator_store
.initialized_validators() .initialized_validators()
@@ -425,17 +407,15 @@ pub fn serve<T: 'static + SlotClock + Clone, E: EthSpec>(
.and(secrets_dir_filter.clone()) .and(secrets_dir_filter.clone())
.and(validator_store_filter.clone()) .and(validator_store_filter.clone())
.and(spec_filter.clone()) .and(spec_filter.clone())
.and(signer.clone())
.and(task_executor_filter.clone()) .and(task_executor_filter.clone())
.and_then( .then(
move |body: Vec<api_types::ValidatorRequest>, move |body: Vec<api_types::ValidatorRequest>,
validator_dir: PathBuf, validator_dir: PathBuf,
secrets_dir: PathBuf, secrets_dir: PathBuf,
validator_store: Arc<ValidatorStore<T, E>>, validator_store: Arc<ValidatorStore<T, E>>,
spec: Arc<ChainSpec>, spec: Arc<ChainSpec>,
signer,
task_executor: TaskExecutor| { task_executor: TaskExecutor| {
blocking_signed_json_task(signer, move || { blocking_json_task(move || {
let secrets_dir = store_passwords_in_secrets_dir.then_some(secrets_dir); let secrets_dir = store_passwords_in_secrets_dir.then_some(secrets_dir);
if let Some(handle) = task_executor.handle() { if let Some(handle) = task_executor.handle() {
let (validators, mnemonic) = let (validators, mnemonic) =
@@ -472,17 +452,15 @@ pub fn serve<T: 'static + SlotClock + Clone, E: EthSpec>(
.and(secrets_dir_filter.clone()) .and(secrets_dir_filter.clone())
.and(validator_store_filter.clone()) .and(validator_store_filter.clone())
.and(spec_filter) .and(spec_filter)
.and(signer.clone())
.and(task_executor_filter.clone()) .and(task_executor_filter.clone())
.and_then( .then(
move |body: api_types::CreateValidatorsMnemonicRequest, move |body: api_types::CreateValidatorsMnemonicRequest,
validator_dir: PathBuf, validator_dir: PathBuf,
secrets_dir: PathBuf, secrets_dir: PathBuf,
validator_store: Arc<ValidatorStore<T, E>>, validator_store: Arc<ValidatorStore<T, E>>,
spec: Arc<ChainSpec>, spec: Arc<ChainSpec>,
signer,
task_executor: TaskExecutor| { task_executor: TaskExecutor| {
blocking_signed_json_task(signer, move || { blocking_json_task(move || {
let secrets_dir = store_passwords_in_secrets_dir.then_some(secrets_dir); let secrets_dir = store_passwords_in_secrets_dir.then_some(secrets_dir);
if let Some(handle) = task_executor.handle() { if let Some(handle) = task_executor.handle() {
let mnemonic = let mnemonic =
@@ -521,16 +499,14 @@ pub fn serve<T: 'static + SlotClock + Clone, E: EthSpec>(
.and(validator_dir_filter.clone()) .and(validator_dir_filter.clone())
.and(secrets_dir_filter.clone()) .and(secrets_dir_filter.clone())
.and(validator_store_filter.clone()) .and(validator_store_filter.clone())
.and(signer.clone())
.and(task_executor_filter.clone()) .and(task_executor_filter.clone())
.and_then( .then(
move |body: api_types::KeystoreValidatorsPostRequest, move |body: api_types::KeystoreValidatorsPostRequest,
validator_dir: PathBuf, validator_dir: PathBuf,
secrets_dir: PathBuf, secrets_dir: PathBuf,
validator_store: Arc<ValidatorStore<T, E>>, validator_store: Arc<ValidatorStore<T, E>>,
signer,
task_executor: TaskExecutor| { task_executor: TaskExecutor| {
blocking_signed_json_task(signer, move || { blocking_json_task(move || {
// Check to ensure the password is correct. // Check to ensure the password is correct.
let keypair = body let keypair = body
.keystore .keystore
@@ -611,14 +587,12 @@ pub fn serve<T: 'static + SlotClock + Clone, E: EthSpec>(
.and(warp::path::end()) .and(warp::path::end())
.and(warp::body::json()) .and(warp::body::json())
.and(validator_store_filter.clone()) .and(validator_store_filter.clone())
.and(signer.clone())
.and(task_executor_filter.clone()) .and(task_executor_filter.clone())
.and_then( .then(
|body: Vec<api_types::Web3SignerValidatorRequest>, |body: Vec<api_types::Web3SignerValidatorRequest>,
validator_store: Arc<ValidatorStore<T, E>>, validator_store: Arc<ValidatorStore<T, E>>,
signer,
task_executor: TaskExecutor| { task_executor: TaskExecutor| {
blocking_signed_json_task(signer, move || { blocking_json_task(move || {
if let Some(handle) = task_executor.handle() { if let Some(handle) = task_executor.handle() {
let web3signers: Vec<ValidatorDefinition> = body let web3signers: Vec<ValidatorDefinition> = body
.into_iter() .into_iter()
@@ -666,16 +640,14 @@ pub fn serve<T: 'static + SlotClock + Clone, E: EthSpec>(
.and(warp::body::json()) .and(warp::body::json())
.and(validator_store_filter.clone()) .and(validator_store_filter.clone())
.and(graffiti_file_filter.clone()) .and(graffiti_file_filter.clone())
.and(signer.clone())
.and(task_executor_filter.clone()) .and(task_executor_filter.clone())
.and_then( .then(
|validator_pubkey: PublicKey, |validator_pubkey: PublicKey,
body: api_types::ValidatorPatchRequest, body: api_types::ValidatorPatchRequest,
validator_store: Arc<ValidatorStore<T, E>>, validator_store: Arc<ValidatorStore<T, E>>,
graffiti_file: Option<GraffitiFile>, graffiti_file: Option<GraffitiFile>,
signer,
task_executor: TaskExecutor| { task_executor: TaskExecutor| {
blocking_signed_json_task(signer, move || { blocking_json_task(move || {
if body.graffiti.is_some() && graffiti_file.is_some() { if body.graffiti.is_some() && graffiti_file.is_some() {
return Err(warp_utils::reject::custom_bad_request( return Err(warp_utils::reject::custom_bad_request(
"Unable to update graffiti as the \"--graffiti-file\" flag is set" "Unable to update graffiti as the \"--graffiti-file\" flag is set"
@@ -784,10 +756,9 @@ pub fn serve<T: 'static + SlotClock + Clone, E: EthSpec>(
// GET /lighthouse/auth // GET /lighthouse/auth
let get_auth = warp::path("lighthouse").and(warp::path("auth").and(warp::path::end())); let get_auth = warp::path("lighthouse").and(warp::path("auth").and(warp::path::end()));
let get_auth = get_auth let get_auth = get_auth
.and(signer.clone())
.and(api_token_path_filter) .and(api_token_path_filter)
.and_then(|signer, token_path: PathBuf| { .then(move |token_path: PathBuf| {
blocking_signed_json_task(signer, move || { blocking_json_task(move || {
Ok(AuthResponse { Ok(AuthResponse {
token_path: token_path.display().to_string(), token_path: token_path.display().to_string(),
}) })
@@ -799,13 +770,11 @@ pub fn serve<T: 'static + SlotClock + Clone, E: EthSpec>(
.and(warp::path("keystores")) .and(warp::path("keystores"))
.and(warp::path::end()) .and(warp::path::end())
.and(warp::body::json()) .and(warp::body::json())
.and(signer.clone())
.and(validator_store_filter.clone()) .and(validator_store_filter.clone())
.and(task_executor_filter.clone()) .and(task_executor_filter.clone())
.and(log_filter.clone()) .and(log_filter.clone())
.and_then( .then(move |request, validator_store, task_executor, log| {
move |request, signer, validator_store, task_executor, log| { blocking_json_task(move || {
blocking_signed_json_task(signer, move || {
if allow_keystore_export { if allow_keystore_export {
keystores::export(request, validator_store, task_executor, log) keystores::export(request, validator_store, task_executor, log)
} else { } else {
@@ -814,8 +783,7 @@ pub fn serve<T: 'static + SlotClock + Clone, E: EthSpec>(
)) ))
} }
}) })
}, });
);
// Standard key-manager endpoints. // Standard key-manager endpoints.
let eth_v1 = warp::path("eth").and(warp::path("v1")); let eth_v1 = warp::path("eth").and(warp::path("v1"));
@@ -829,10 +797,9 @@ pub fn serve<T: 'static + SlotClock + Clone, E: EthSpec>(
.and(warp::path("feerecipient")) .and(warp::path("feerecipient"))
.and(warp::path::end()) .and(warp::path::end())
.and(validator_store_filter.clone()) .and(validator_store_filter.clone())
.and(signer.clone()) .then(
.and_then( |validator_pubkey: PublicKey, validator_store: Arc<ValidatorStore<T, E>>| {
|validator_pubkey: PublicKey, validator_store: Arc<ValidatorStore<T, E>>, signer| { blocking_json_task(move || {
blocking_signed_json_task(signer, move || {
if validator_store if validator_store
.initialized_validators() .initialized_validators()
.read() .read()
@@ -869,13 +836,11 @@ pub fn serve<T: 'static + SlotClock + Clone, E: EthSpec>(
.and(warp::body::json()) .and(warp::body::json())
.and(warp::path::end()) .and(warp::path::end())
.and(validator_store_filter.clone()) .and(validator_store_filter.clone())
.and(signer.clone()) .then(
.and_then(
|validator_pubkey: PublicKey, |validator_pubkey: PublicKey,
request: api_types::UpdateFeeRecipientRequest, request: api_types::UpdateFeeRecipientRequest,
validator_store: Arc<ValidatorStore<T, E>>, validator_store: Arc<ValidatorStore<T, E>>| {
signer| { blocking_json_task(move || {
blocking_signed_json_task(signer, move || {
if validator_store if validator_store
.initialized_validators() .initialized_validators()
.read() .read()
@@ -909,10 +874,9 @@ pub fn serve<T: 'static + SlotClock + Clone, E: EthSpec>(
.and(warp::path("feerecipient")) .and(warp::path("feerecipient"))
.and(warp::path::end()) .and(warp::path::end())
.and(validator_store_filter.clone()) .and(validator_store_filter.clone())
.and(signer.clone()) .then(
.and_then( |validator_pubkey: PublicKey, validator_store: Arc<ValidatorStore<T, E>>| {
|validator_pubkey: PublicKey, validator_store: Arc<ValidatorStore<T, E>>, signer| { blocking_json_task(move || {
blocking_signed_json_task(signer, move || {
if validator_store if validator_store
.initialized_validators() .initialized_validators()
.read() .read()
@@ -946,10 +910,9 @@ pub fn serve<T: 'static + SlotClock + Clone, E: EthSpec>(
.and(warp::path("gas_limit")) .and(warp::path("gas_limit"))
.and(warp::path::end()) .and(warp::path::end())
.and(validator_store_filter.clone()) .and(validator_store_filter.clone())
.and(signer.clone()) .then(
.and_then( |validator_pubkey: PublicKey, validator_store: Arc<ValidatorStore<T, E>>| {
|validator_pubkey: PublicKey, validator_store: Arc<ValidatorStore<T, E>>, signer| { blocking_json_task(move || {
blocking_signed_json_task(signer, move || {
if validator_store if validator_store
.initialized_validators() .initialized_validators()
.read() .read()
@@ -978,13 +941,11 @@ pub fn serve<T: 'static + SlotClock + Clone, E: EthSpec>(
.and(warp::body::json()) .and(warp::body::json())
.and(warp::path::end()) .and(warp::path::end())
.and(validator_store_filter.clone()) .and(validator_store_filter.clone())
.and(signer.clone()) .then(
.and_then(
|validator_pubkey: PublicKey, |validator_pubkey: PublicKey,
request: api_types::UpdateGasLimitRequest, request: api_types::UpdateGasLimitRequest,
validator_store: Arc<ValidatorStore<T, E>>, validator_store: Arc<ValidatorStore<T, E>>| {
signer| { blocking_json_task(move || {
blocking_signed_json_task(signer, move || {
if validator_store if validator_store
.initialized_validators() .initialized_validators()
.read() .read()
@@ -1018,10 +979,9 @@ pub fn serve<T: 'static + SlotClock + Clone, E: EthSpec>(
.and(warp::path("gas_limit")) .and(warp::path("gas_limit"))
.and(warp::path::end()) .and(warp::path::end())
.and(validator_store_filter.clone()) .and(validator_store_filter.clone())
.and(signer.clone()) .then(
.and_then( |validator_pubkey: PublicKey, validator_store: Arc<ValidatorStore<T, E>>| {
|validator_pubkey: PublicKey, validator_store: Arc<ValidatorStore<T, E>>, signer| { blocking_json_task(move || {
blocking_signed_json_task(signer, move || {
if validator_store if validator_store
.initialized_validators() .initialized_validators()
.read() .read()
@@ -1058,17 +1018,15 @@ pub fn serve<T: 'static + SlotClock + Clone, E: EthSpec>(
.and(validator_store_filter.clone()) .and(validator_store_filter.clone())
.and(slot_clock_filter) .and(slot_clock_filter)
.and(log_filter.clone()) .and(log_filter.clone())
.and(signer.clone())
.and(task_executor_filter.clone()) .and(task_executor_filter.clone())
.and_then( .then(
|pubkey: PublicKey, |pubkey: PublicKey,
query: api_types::VoluntaryExitQuery, query: api_types::VoluntaryExitQuery,
validator_store: Arc<ValidatorStore<T, E>>, validator_store: Arc<ValidatorStore<T, E>>,
slot_clock: T, slot_clock: T,
log, log,
signer,
task_executor: TaskExecutor| { task_executor: TaskExecutor| {
blocking_signed_json_task(signer, move || { blocking_json_task(move || {
if let Some(handle) = task_executor.handle() { if let Some(handle) = task_executor.handle() {
let signed_voluntary_exit = let signed_voluntary_exit =
handle.block_on(create_signed_voluntary_exit( handle.block_on(create_signed_voluntary_exit(
@@ -1096,13 +1054,11 @@ pub fn serve<T: 'static + SlotClock + Clone, E: EthSpec>(
.and(warp::path::end()) .and(warp::path::end())
.and(validator_store_filter.clone()) .and(validator_store_filter.clone())
.and(graffiti_flag_filter) .and(graffiti_flag_filter)
.and(signer.clone()) .then(
.and_then(
|pubkey: PublicKey, |pubkey: PublicKey,
validator_store: Arc<ValidatorStore<T, E>>, validator_store: Arc<ValidatorStore<T, E>>,
graffiti_flag: Option<Graffiti>, graffiti_flag: Option<Graffiti>| {
signer| { blocking_json_task(move || {
blocking_signed_json_task(signer, move || {
let graffiti = get_graffiti(pubkey.clone(), validator_store, graffiti_flag)?; let graffiti = get_graffiti(pubkey.clone(), validator_store, graffiti_flag)?;
Ok(GenericResponse::from(GetGraffitiResponse { Ok(GenericResponse::from(GetGraffitiResponse {
pubkey: pubkey.into(), pubkey: pubkey.into(),
@@ -1121,14 +1077,12 @@ pub fn serve<T: 'static + SlotClock + Clone, E: EthSpec>(
.and(warp::path::end()) .and(warp::path::end())
.and(validator_store_filter.clone()) .and(validator_store_filter.clone())
.and(graffiti_file_filter.clone()) .and(graffiti_file_filter.clone())
.and(signer.clone()) .then(
.and_then(
|pubkey: PublicKey, |pubkey: PublicKey,
query: SetGraffitiRequest, query: SetGraffitiRequest,
validator_store: Arc<ValidatorStore<T, E>>, validator_store: Arc<ValidatorStore<T, E>>,
graffiti_file: Option<GraffitiFile>, graffiti_file: Option<GraffitiFile>| {
signer| { blocking_json_task(move || {
blocking_signed_json_task(signer, move || {
if graffiti_file.is_some() { if graffiti_file.is_some() {
return Err(warp_utils::reject::invalid_auth( return Err(warp_utils::reject::invalid_auth(
"Unable to update graffiti as the \"--graffiti-file\" flag is set" "Unable to update graffiti as the \"--graffiti-file\" flag is set"
@@ -1149,13 +1103,11 @@ pub fn serve<T: 'static + SlotClock + Clone, E: EthSpec>(
.and(warp::path::end()) .and(warp::path::end())
.and(validator_store_filter.clone()) .and(validator_store_filter.clone())
.and(graffiti_file_filter.clone()) .and(graffiti_file_filter.clone())
.and(signer.clone()) .then(
.and_then(
|pubkey: PublicKey, |pubkey: PublicKey,
validator_store: Arc<ValidatorStore<T, E>>, validator_store: Arc<ValidatorStore<T, E>>,
graffiti_file: Option<GraffitiFile>, graffiti_file: Option<GraffitiFile>| {
signer| { blocking_json_task(move || {
blocking_signed_json_task(signer, move || {
if graffiti_file.is_some() { if graffiti_file.is_some() {
return Err(warp_utils::reject::invalid_auth( return Err(warp_utils::reject::invalid_auth(
"Unable to delete graffiti as the \"--graffiti-file\" flag is set" "Unable to delete graffiti as the \"--graffiti-file\" flag is set"
@@ -1169,32 +1121,24 @@ pub fn serve<T: 'static + SlotClock + Clone, E: EthSpec>(
.map(|reply| warp::reply::with_status(reply, warp::http::StatusCode::NO_CONTENT)); .map(|reply| warp::reply::with_status(reply, warp::http::StatusCode::NO_CONTENT));
// GET /eth/v1/keystores // GET /eth/v1/keystores
let get_std_keystores = std_keystores let get_std_keystores = std_keystores.and(validator_store_filter.clone()).then(
.and(signer.clone()) |validator_store: Arc<ValidatorStore<T, E>>| {
.and(validator_store_filter.clone()) blocking_json_task(move || Ok(keystores::list(validator_store)))
.and_then(|signer, validator_store: Arc<ValidatorStore<T, E>>| { },
blocking_signed_json_task(signer, move || Ok(keystores::list(validator_store))) );
});
// POST /eth/v1/keystores // POST /eth/v1/keystores
let post_std_keystores = std_keystores let post_std_keystores = std_keystores
.and(warp::body::json()) .and(warp::body::json())
.and(signer.clone())
.and(validator_dir_filter) .and(validator_dir_filter)
.and(secrets_dir_filter) .and(secrets_dir_filter)
.and(validator_store_filter.clone()) .and(validator_store_filter.clone())
.and(task_executor_filter.clone()) .and(task_executor_filter.clone())
.and(log_filter.clone()) .and(log_filter.clone())
.and_then( .then(
move |request, move |request, validator_dir, secrets_dir, validator_store, task_executor, log| {
signer,
validator_dir,
secrets_dir,
validator_store,
task_executor,
log| {
let secrets_dir = store_passwords_in_secrets_dir.then_some(secrets_dir); let secrets_dir = store_passwords_in_secrets_dir.then_some(secrets_dir);
blocking_signed_json_task(signer, move || { blocking_json_task(move || {
keystores::import( keystores::import(
request, request,
validator_dir, validator_dir,
@@ -1210,33 +1154,30 @@ pub fn serve<T: 'static + SlotClock + Clone, E: EthSpec>(
// DELETE /eth/v1/keystores // DELETE /eth/v1/keystores
let delete_std_keystores = std_keystores let delete_std_keystores = std_keystores
.and(warp::body::json()) .and(warp::body::json())
.and(signer.clone())
.and(validator_store_filter.clone()) .and(validator_store_filter.clone())
.and(task_executor_filter.clone()) .and(task_executor_filter.clone())
.and(log_filter.clone()) .and(log_filter.clone())
.and_then(|request, signer, validator_store, task_executor, log| { .then(|request, validator_store, task_executor, log| {
blocking_signed_json_task(signer, move || { blocking_json_task(move || {
keystores::delete(request, validator_store, task_executor, log) keystores::delete(request, validator_store, task_executor, log)
}) })
}); });
// GET /eth/v1/remotekeys // GET /eth/v1/remotekeys
let get_std_remotekeys = std_remotekeys let get_std_remotekeys = std_remotekeys.and(validator_store_filter.clone()).then(
.and(signer.clone()) |validator_store: Arc<ValidatorStore<T, E>>| {
.and(validator_store_filter.clone()) blocking_json_task(move || Ok(remotekeys::list(validator_store)))
.and_then(|signer, validator_store: Arc<ValidatorStore<T, E>>| { },
blocking_signed_json_task(signer, move || Ok(remotekeys::list(validator_store))) );
});
// POST /eth/v1/remotekeys // POST /eth/v1/remotekeys
let post_std_remotekeys = std_remotekeys let post_std_remotekeys = std_remotekeys
.and(warp::body::json()) .and(warp::body::json())
.and(signer.clone())
.and(validator_store_filter.clone()) .and(validator_store_filter.clone())
.and(task_executor_filter.clone()) .and(task_executor_filter.clone())
.and(log_filter.clone()) .and(log_filter.clone())
.and_then(|request, signer, validator_store, task_executor, log| { .then(|request, validator_store, task_executor, log| {
blocking_signed_json_task(signer, move || { blocking_json_task(move || {
remotekeys::import(request, validator_store, task_executor, log) remotekeys::import(request, validator_store, task_executor, log)
}) })
}); });
@@ -1244,12 +1185,11 @@ pub fn serve<T: 'static + SlotClock + Clone, E: EthSpec>(
// DELETE /eth/v1/remotekeys // DELETE /eth/v1/remotekeys
let delete_std_remotekeys = std_remotekeys let delete_std_remotekeys = std_remotekeys
.and(warp::body::json()) .and(warp::body::json())
.and(signer)
.and(validator_store_filter) .and(validator_store_filter)
.and(task_executor_filter) .and(task_executor_filter)
.and(log_filter.clone()) .and(log_filter.clone())
.and_then(|request, signer, validator_store, task_executor, log| { .then(|request, validator_store, task_executor, log| {
blocking_signed_json_task(signer, move || { blocking_json_task(move || {
remotekeys::delete(request, validator_store, task_executor, log) remotekeys::delete(request, validator_store, task_executor, log)
}) })
}); });
@@ -1369,42 +1309,3 @@ pub fn serve<T: 'static + SlotClock + Clone, E: EthSpec>(
Ok((listening_socket, server)) Ok((listening_socket, server))
} }
/// Executes `func` in blocking tokio task (i.e., where long-running tasks are permitted).
/// JSON-encodes the return value of `func`, using the `signer` function to produce a signature of
/// those bytes.
pub async fn blocking_signed_json_task<S, F, T>(
signer: S,
func: F,
) -> Result<impl warp::Reply, warp::Rejection>
where
S: Fn(&[u8]) -> String,
F: FnOnce() -> Result<T, warp::Rejection> + Send + 'static,
T: Serialize + Send + 'static,
{
warp_utils::task::blocking_task(func)
.await
.map(|func_output| {
let mut response = match serde_json::to_vec(&func_output) {
Ok(body) => {
let mut res = Response::new(body);
res.headers_mut()
.insert(CONTENT_TYPE, HeaderValue::from_static("application/json"));
res
}
Err(_) => Response::builder()
.status(StatusCode::INTERNAL_SERVER_ERROR)
.body(vec![])
.expect("can produce simple response from static values"),
};
let body: &Vec<u8> = response.body();
let signature = signer(body);
let header_value =
HeaderValue::from_str(&signature).expect("hash can be encoded as header");
response.headers_mut().append("Signature", header_value);
response
})
}