Merge branch 'unstable' into vc-fallback

This commit is contained in:
Mac L
2024-07-16 21:39:40 +10:00
116 changed files with 3449 additions and 3100 deletions

View File

@@ -286,17 +286,21 @@ impl<T: SlotClock + 'static, E: EthSpec> AttestationService<T, E> {
// Then download, sign and publish a `SignedAggregateAndProof` for each
// validator that is elected to aggregate for this `slot` and
// `committee_index`.
self.produce_and_publish_aggregates(&attestation_data, &validator_duties)
.await
.map_err(move |e| {
crit!(
log,
"Error during attestation routine";
"error" => format!("{:?}", e),
"committee_index" => committee_index,
"slot" => slot.as_u64(),
)
})?;
self.produce_and_publish_aggregates(
&attestation_data,
committee_index,
&validator_duties,
)
.await
.map_err(move |e| {
crit!(
log,
"Error during attestation routine";
"error" => format!("{:?}", e),
"committee_index" => committee_index,
"slot" => slot.as_u64(),
)
})?;
}
Ok(())
@@ -440,6 +444,11 @@ impl<T: SlotClock + 'static, E: EthSpec> AttestationService<T, E> {
warn!(log, "No attestations were published");
return Ok(None);
}
let fork_name = self
.context
.eth2_config
.spec
.fork_name_at_slot::<E>(attestation_data.slot);
// Post the attestations to the BN.
match self
@@ -449,9 +458,15 @@ impl<T: SlotClock + 'static, E: EthSpec> AttestationService<T, E> {
&metrics::ATTESTATION_SERVICE_TIMES,
&[metrics::ATTESTATIONS_HTTP_POST],
);
beacon_node
.post_beacon_pool_attestations(attestations)
.await
if fork_name.electra_enabled() {
beacon_node
.post_beacon_pool_attestations_v2(attestations, fork_name)
.await
} else {
beacon_node
.post_beacon_pool_attestations_v1(attestations)
.await
}
})
.await
{
@@ -494,6 +509,7 @@ impl<T: SlotClock + 'static, E: EthSpec> AttestationService<T, E> {
async fn produce_and_publish_aggregates(
&self,
attestation_data: &AttestationData,
committee_index: CommitteeIndex,
validator_duties: &[DutyAndProof],
) -> Result<(), String> {
let log = self.context.log();
@@ -506,6 +522,12 @@ impl<T: SlotClock + 'static, E: EthSpec> AttestationService<T, E> {
return Ok(());
}
let fork_name = self
.context
.eth2_config
.spec
.fork_name_at_slot::<E>(attestation_data.slot);
let aggregated_attestation = &self
.beacon_nodes
.first_success(|beacon_node| async move {
@@ -513,15 +535,32 @@ impl<T: SlotClock + 'static, E: EthSpec> AttestationService<T, E> {
&metrics::ATTESTATION_SERVICE_TIMES,
&[metrics::AGGREGATES_HTTP_GET],
);
beacon_node
.get_validator_aggregate_attestation(
attestation_data.slot,
attestation_data.tree_hash_root(),
)
.await
.map_err(|e| format!("Failed to produce an aggregate attestation: {:?}", e))?
.ok_or_else(|| format!("No aggregate available for {:?}", attestation_data))
.map(|result| result.data)
if fork_name.electra_enabled() {
beacon_node
.get_validator_aggregate_attestation_v2(
attestation_data.slot,
attestation_data.tree_hash_root(),
committee_index,
)
.await
.map_err(|e| {
format!("Failed to produce an aggregate attestation: {:?}", e)
})?
.ok_or_else(|| format!("No aggregate available for {:?}", attestation_data))
.map(|result| result.data)
} else {
beacon_node
.get_validator_aggregate_attestation_v1(
attestation_data.slot,
attestation_data.tree_hash_root(),
)
.await
.map_err(|e| {
format!("Failed to produce an aggregate attestation: {:?}", e)
})?
.ok_or_else(|| format!("No aggregate available for {:?}", attestation_data))
.map(|result| result.data)
}
})
.await
.map_err(|e| e.to_string())?;
@@ -585,9 +624,20 @@ impl<T: SlotClock + 'static, E: EthSpec> AttestationService<T, E> {
&metrics::ATTESTATION_SERVICE_TIMES,
&[metrics::AGGREGATES_HTTP_POST],
);
beacon_node
.post_validator_aggregate_and_proof(signed_aggregate_and_proofs_slice)
.await
if fork_name.electra_enabled() {
beacon_node
.post_validator_aggregate_and_proof_v2(
signed_aggregate_and_proofs_slice,
fork_name,
)
.await
} else {
beacon_node
.post_validator_aggregate_and_proof_v1(
signed_aggregate_and_proofs_slice,
)
.await
}
})
.await
{

View File

@@ -1,85 +1,53 @@
use eth2::lighthouse_vc::{PK_LEN, SECRET_PREFIX as PK_PREFIX};
use filesystem::create_with_600_perms;
use libsecp256k1::{Message, PublicKey, SecretKey};
use rand::thread_rng;
use ring::digest::{digest, SHA256};
use rand::distributions::Alphanumeric;
use rand::{thread_rng, Rng};
use std::fs;
use std::path::{Path, PathBuf};
use warp::Filter;
/// The name of the file which stores the secret key.
///
/// It is purposefully opaque to prevent users confusing it with the "secret" that they need to
/// share with API consumers (which is actually the public key).
pub const SK_FILENAME: &str = ".secp-sk";
/// Length of the raw secret key, in bytes.
pub const SK_LEN: usize = 32;
/// The name of the file which stores the public key.
///
/// For users, this public key is a "secret" that can be shared with API consumers to provide them
/// access to the API. We avoid calling it a "public" key to users, since they should not post this
/// value in a public forum.
/// The name of the file which stores the API token.
pub const PK_FILENAME: &str = "api-token.txt";
/// Contains a `secp256k1` keypair that is saved-to/loaded-from disk on instantiation. The keypair
/// is used for authorization/authentication for requests/responses on the HTTP API.
pub const PK_LEN: usize = 33;
/// Contains a randomly generated string which is used for authorization of requests to the HTTP API.
///
/// Provides convenience functions to ultimately provide:
///
/// - A signature across outgoing HTTP responses, applied to the `Signature` header.
/// - Verification of proof-of-knowledge of the public key in `self` for incoming HTTP requests,
/// via the `Authorization` header.
///
/// The aforementioned scheme was first defined here:
///
/// https://github.com/sigp/lighthouse/issues/1269#issuecomment-649879855
///
/// This scheme has since been tweaked to remove VC response signing and secp256k1 key generation.
/// https://github.com/sigp/lighthouse/issues/5423
pub struct ApiSecret {
pk: PublicKey,
sk: SecretKey,
pk: String,
pk_path: PathBuf,
}
impl ApiSecret {
/// If both the secret and public keys are already on-disk, parse them and ensure they're both
/// from the same keypair.
/// If the public key is already on-disk, use it.
///
/// The provided `dir` is a directory containing two files, `SK_FILENAME` and `PK_FILENAME`.
/// The provided `dir` is a directory containing `PK_FILENAME`.
///
/// If either the secret or public key files are missing on disk, create a new keypair and
/// If the public key file is missing on disk, create a new key and
/// write it to disk (over-writing any existing files).
pub fn create_or_open<P: AsRef<Path>>(dir: P) -> Result<Self, String> {
let sk_path = dir.as_ref().join(SK_FILENAME);
let pk_path = dir.as_ref().join(PK_FILENAME);
if !(sk_path.exists() && pk_path.exists()) {
let sk = SecretKey::random(&mut thread_rng());
let pk = PublicKey::from_secret_key(&sk);
// Create and write the secret key to file with appropriate permissions
create_with_600_perms(
&sk_path,
serde_utils::hex::encode(sk.serialize()).as_bytes(),
)
.map_err(|e| {
format!(
"Unable to create file with permissions for {:?}: {:?}",
sk_path, e
)
})?;
if !pk_path.exists() {
let length = PK_LEN;
let pk: String = thread_rng()
.sample_iter(&Alphanumeric)
.take(length)
.map(char::from)
.collect();
// Create and write the public key to file with appropriate permissions
create_with_600_perms(
&pk_path,
format!(
"{}{}",
PK_PREFIX,
serde_utils::hex::encode(&pk.serialize_compressed()[..])
)
.as_bytes(),
)
.map_err(|e| {
create_with_600_perms(&pk_path, pk.to_string().as_bytes()).map_err(|e| {
format!(
"Unable to create file with permissions for {:?}: {:?}",
pk_path, e
@@ -87,78 +55,18 @@ impl ApiSecret {
})?;
}
let sk = fs::read(&sk_path)
.map_err(|e| format!("cannot read {}: {}", SK_FILENAME, e))
.and_then(|bytes| {
serde_utils::hex::decode(&String::from_utf8_lossy(&bytes))
.map_err(|_| format!("{} should be 0x-prefixed hex", PK_FILENAME))
})
.and_then(|bytes| {
if bytes.len() == SK_LEN {
let mut array = [0; SK_LEN];
array.copy_from_slice(&bytes);
SecretKey::parse(&array).map_err(|e| format!("invalid {}: {}", SK_FILENAME, e))
} else {
Err(format!(
"{} expected {} bytes not {}",
SK_FILENAME,
SK_LEN,
bytes.len()
))
}
})?;
let pk = fs::read(&pk_path)
.map_err(|e| format!("cannot read {}: {}", PK_FILENAME, e))
.and_then(|bytes| {
let hex =
String::from_utf8(bytes).map_err(|_| format!("{} is not utf8", SK_FILENAME))?;
if let Some(stripped) = hex.strip_prefix(PK_PREFIX) {
serde_utils::hex::decode(stripped)
.map_err(|_| format!("{} should be 0x-prefixed hex", SK_FILENAME))
} else {
Err(format!("unable to parse {}", SK_FILENAME))
}
})
.and_then(|bytes| {
if bytes.len() == PK_LEN {
let mut array = [0; PK_LEN];
array.copy_from_slice(&bytes);
PublicKey::parse_compressed(&array)
.map_err(|e| format!("invalid {}: {}", PK_FILENAME, e))
} else {
Err(format!(
"{} expected {} bytes not {}",
PK_FILENAME,
PK_LEN,
bytes.len()
))
}
})?;
.map_err(|e| format!("cannot read {}: {}", PK_FILENAME, e))?
.iter()
.map(|&c| char::from(c))
.collect();
// Ensure that the keys loaded from disk are indeed a pair.
if PublicKey::from_secret_key(&sk) != pk {
fs::remove_file(&sk_path)
.map_err(|e| format!("unable to remove {}: {}", SK_FILENAME, e))?;
fs::remove_file(&pk_path)
.map_err(|e| format!("unable to remove {}: {}", PK_FILENAME, e))?;
return Err(format!(
"{:?} does not match {:?} and the files have been deleted. Please try again.",
sk_path, pk_path
));
}
Ok(Self { pk, sk, pk_path })
}
/// Returns the public key of `self` as a 0x-prefixed hex string.
fn pubkey_string(&self) -> String {
serde_utils::hex::encode(&self.pk.serialize_compressed()[..])
Ok(Self { pk, pk_path })
}
/// Returns the API token.
pub fn api_token(&self) -> String {
format!("{}{}", PK_PREFIX, self.pubkey_string())
self.pk.clone()
}
/// Returns the path for the API token file
@@ -196,16 +104,4 @@ impl ApiSecret {
.untuple_one()
.boxed()
}
/// Returns a closure which produces a signature over some bytes using the secret key in
/// `self`. The signature is a 32-byte hash formatted as a 0x-prefixed string.
pub fn signer(&self) -> impl Fn(&[u8]) -> String + Clone {
let sk = self.sk;
move |input: &[u8]| -> String {
let message =
Message::parse_slice(digest(&SHA256, input).as_ref()).expect("sha256 is 32 bytes");
let (signature, _) = libsecp256k1::sign(&message, &sk);
serde_utils::hex::encode(signature.serialize_der().as_ref())
}
}
}

View File

@@ -47,15 +47,8 @@ use task_executor::TaskExecutor;
use tokio_stream::{wrappers::BroadcastStream, StreamExt};
use types::{ChainSpec, ConfigAndPreset, EthSpec};
use validator_dir::Builder as ValidatorDirBuilder;
use warp::{
http::{
header::{HeaderValue, CONTENT_TYPE},
response::Response,
StatusCode,
},
sse::Event,
Filter,
};
use warp::{sse::Event, Filter};
use warp_utils::task::blocking_json_task;
#[derive(Debug)]
pub enum Error {
@@ -179,9 +172,6 @@ pub fn serve<T: 'static + SlotClock + Clone, E: EthSpec>(
}
};
let signer = ctx.api_secret.signer();
let signer = warp::any().map(move || signer.clone());
let inner_block_service = ctx.block_service.clone();
let block_service_filter = warp::any()
.map(move || inner_block_service.clone())
@@ -284,9 +274,8 @@ pub fn serve<T: 'static + SlotClock + Clone, E: EthSpec>(
let get_node_version = warp::path("lighthouse")
.and(warp::path("version"))
.and(warp::path::end())
.and(signer.clone())
.and_then(|signer| {
blocking_signed_json_task(signer, move || {
.then(|| {
blocking_json_task(move || {
Ok(api_types::GenericResponse::from(api_types::VersionData {
version: version_with_platform(),
}))
@@ -297,9 +286,8 @@ pub fn serve<T: 'static + SlotClock + Clone, E: EthSpec>(
let get_lighthouse_health = warp::path("lighthouse")
.and(warp::path("health"))
.and(warp::path::end())
.and(signer.clone())
.and_then(|signer| {
blocking_signed_json_task(signer, move || {
.then(|| {
blocking_json_task(move || {
eth2::lighthouse::Health::observe()
.map(api_types::GenericResponse::from)
.map_err(warp_utils::reject::custom_bad_request)
@@ -311,9 +299,8 @@ pub fn serve<T: 'static + SlotClock + Clone, E: EthSpec>(
.and(warp::path("spec"))
.and(warp::path::end())
.and(spec_filter.clone())
.and(signer.clone())
.and_then(|spec: Arc<_>, signer| {
blocking_signed_json_task(signer, move || {
.then(|spec: Arc<_>| {
blocking_json_task(move || {
let config = ConfigAndPreset::from_chain_spec::<E>(&spec, None);
Ok(api_types::GenericResponse::from(config))
})
@@ -324,9 +311,8 @@ pub fn serve<T: 'static + SlotClock + Clone, E: EthSpec>(
.and(warp::path("validators"))
.and(warp::path::end())
.and(validator_store_filter.clone())
.and(signer.clone())
.and_then(|validator_store: Arc<ValidatorStore<T, E>>, signer| {
blocking_signed_json_task(signer, move || {
.then(|validator_store: Arc<ValidatorStore<T, E>>| {
blocking_json_task(move || {
let validators = validator_store
.initialized_validators()
.read()
@@ -349,10 +335,9 @@ pub fn serve<T: 'static + SlotClock + Clone, E: EthSpec>(
.and(warp::path::param::<PublicKey>())
.and(warp::path::end())
.and(validator_store_filter.clone())
.and(signer.clone())
.and_then(
|validator_pubkey: PublicKey, validator_store: Arc<ValidatorStore<T, E>>, signer| {
blocking_signed_json_task(signer, move || {
.then(
|validator_pubkey: PublicKey, validator_store: Arc<ValidatorStore<T, E>>| {
blocking_json_task(move || {
let validator = validator_store
.initialized_validators()
.read()
@@ -384,9 +369,8 @@ pub fn serve<T: 'static + SlotClock + Clone, E: EthSpec>(
.and(system_info_filter)
.and(app_start_filter)
.and(validator_dir_filter.clone())
.and(signer.clone())
.and_then(|sysinfo, app_start: std::time::Instant, val_dir, signer| {
blocking_signed_json_task(signer, move || {
.then(|sysinfo, app_start: std::time::Instant, val_dir| {
blocking_json_task(move || {
let app_uptime = app_start.elapsed().as_secs();
Ok(api_types::GenericResponse::from(observe_system_health_vc(
sysinfo, val_dir, app_uptime,
@@ -401,15 +385,13 @@ pub fn serve<T: 'static + SlotClock + Clone, E: EthSpec>(
.and(validator_store_filter.clone())
.and(graffiti_file_filter.clone())
.and(graffiti_flag_filter)
.and(signer.clone())
.and(log_filter.clone())
.and_then(
.then(
|validator_store: Arc<ValidatorStore<T, E>>,
graffiti_file: Option<GraffitiFile>,
graffiti_flag: Option<Graffiti>,
signer,
log| {
blocking_signed_json_task(signer, move || {
blocking_json_task(move || {
let mut result = HashMap::new();
for (key, graffiti_definition) in validator_store
.initialized_validators()
@@ -435,9 +417,8 @@ pub fn serve<T: 'static + SlotClock + Clone, E: EthSpec>(
.and(warp::path("ui"))
.and(warp::path("fallback_health"))
.and(warp::path::end())
.and(signer.clone())
.and(block_service_filter.clone())
.and_then(|signer, block_filter: BlockService<T, E>| async move {
.then(|block_filter: BlockService<T, E>| async move {
let mut result: HashMap<(usize, String), Result<BeaconNodeHealth, CandidateError>> =
HashMap::new();
for node in &*block_filter.beacon_nodes.candidates.read().await {
@@ -455,8 +436,7 @@ pub fn serve<T: 'static + SlotClock + Clone, E: EthSpec>(
}
}
blocking_signed_json_task(signer, move || Ok(api_types::GenericResponse::from(result)))
.await
blocking_json_task(move || Ok(api_types::GenericResponse::from(result))).await
});
// POST lighthouse/validators/
@@ -468,17 +448,15 @@ pub fn serve<T: 'static + SlotClock + Clone, E: EthSpec>(
.and(secrets_dir_filter.clone())
.and(validator_store_filter.clone())
.and(spec_filter.clone())
.and(signer.clone())
.and(task_executor_filter.clone())
.and_then(
.then(
move |body: Vec<api_types::ValidatorRequest>,
validator_dir: PathBuf,
secrets_dir: PathBuf,
validator_store: Arc<ValidatorStore<T, E>>,
spec: Arc<ChainSpec>,
signer,
task_executor: TaskExecutor| {
blocking_signed_json_task(signer, move || {
blocking_json_task(move || {
let secrets_dir = store_passwords_in_secrets_dir.then_some(secrets_dir);
if let Some(handle) = task_executor.handle() {
let (validators, mnemonic) =
@@ -515,17 +493,15 @@ pub fn serve<T: 'static + SlotClock + Clone, E: EthSpec>(
.and(secrets_dir_filter.clone())
.and(validator_store_filter.clone())
.and(spec_filter)
.and(signer.clone())
.and(task_executor_filter.clone())
.and_then(
.then(
move |body: api_types::CreateValidatorsMnemonicRequest,
validator_dir: PathBuf,
secrets_dir: PathBuf,
validator_store: Arc<ValidatorStore<T, E>>,
spec: Arc<ChainSpec>,
signer,
task_executor: TaskExecutor| {
blocking_signed_json_task(signer, move || {
blocking_json_task(move || {
let secrets_dir = store_passwords_in_secrets_dir.then_some(secrets_dir);
if let Some(handle) = task_executor.handle() {
let mnemonic =
@@ -564,16 +540,14 @@ pub fn serve<T: 'static + SlotClock + Clone, E: EthSpec>(
.and(validator_dir_filter.clone())
.and(secrets_dir_filter.clone())
.and(validator_store_filter.clone())
.and(signer.clone())
.and(task_executor_filter.clone())
.and_then(
.then(
move |body: api_types::KeystoreValidatorsPostRequest,
validator_dir: PathBuf,
secrets_dir: PathBuf,
validator_store: Arc<ValidatorStore<T, E>>,
signer,
task_executor: TaskExecutor| {
blocking_signed_json_task(signer, move || {
blocking_json_task(move || {
// Check to ensure the password is correct.
let keypair = body
.keystore
@@ -654,14 +628,12 @@ pub fn serve<T: 'static + SlotClock + Clone, E: EthSpec>(
.and(warp::path::end())
.and(warp::body::json())
.and(validator_store_filter.clone())
.and(signer.clone())
.and(task_executor_filter.clone())
.and_then(
.then(
|body: Vec<api_types::Web3SignerValidatorRequest>,
validator_store: Arc<ValidatorStore<T, E>>,
signer,
task_executor: TaskExecutor| {
blocking_signed_json_task(signer, move || {
blocking_json_task(move || {
if let Some(handle) = task_executor.handle() {
let web3signers: Vec<ValidatorDefinition> = body
.into_iter()
@@ -709,16 +681,14 @@ pub fn serve<T: 'static + SlotClock + Clone, E: EthSpec>(
.and(warp::body::json())
.and(validator_store_filter.clone())
.and(graffiti_file_filter.clone())
.and(signer.clone())
.and(task_executor_filter.clone())
.and_then(
.then(
|validator_pubkey: PublicKey,
body: api_types::ValidatorPatchRequest,
validator_store: Arc<ValidatorStore<T, E>>,
graffiti_file: Option<GraffitiFile>,
signer,
task_executor: TaskExecutor| {
blocking_signed_json_task(signer, move || {
blocking_json_task(move || {
if body.graffiti.is_some() && graffiti_file.is_some() {
return Err(warp_utils::reject::custom_bad_request(
"Unable to update graffiti as the \"--graffiti-file\" flag is set"
@@ -827,10 +797,9 @@ pub fn serve<T: 'static + SlotClock + Clone, E: EthSpec>(
// GET /lighthouse/auth
let get_auth = warp::path("lighthouse").and(warp::path("auth").and(warp::path::end()));
let get_auth = get_auth
.and(signer.clone())
.and(api_token_path_filter)
.and_then(|signer, token_path: PathBuf| {
blocking_signed_json_task(signer, move || {
.then(move |token_path: PathBuf| {
blocking_json_task(move || {
Ok(AuthResponse {
token_path: token_path.display().to_string(),
})
@@ -842,23 +811,20 @@ pub fn serve<T: 'static + SlotClock + Clone, E: EthSpec>(
.and(warp::path("keystores"))
.and(warp::path::end())
.and(warp::body::json())
.and(signer.clone())
.and(validator_store_filter.clone())
.and(task_executor_filter.clone())
.and(log_filter.clone())
.and_then(
move |request, signer, validator_store, task_executor, log| {
blocking_signed_json_task(signer, move || {
if allow_keystore_export {
keystores::export(request, validator_store, task_executor, log)
} else {
Err(warp_utils::reject::custom_bad_request(
"keystore export is disabled".to_string(),
))
}
})
},
);
.then(move |request, validator_store, task_executor, log| {
blocking_json_task(move || {
if allow_keystore_export {
keystores::export(request, validator_store, task_executor, log)
} else {
Err(warp_utils::reject::custom_bad_request(
"keystore export is disabled".to_string(),
))
}
})
});
// Standard key-manager endpoints.
let eth_v1 = warp::path("eth").and(warp::path("v1"));
@@ -872,10 +838,9 @@ pub fn serve<T: 'static + SlotClock + Clone, E: EthSpec>(
.and(warp::path("feerecipient"))
.and(warp::path::end())
.and(validator_store_filter.clone())
.and(signer.clone())
.and_then(
|validator_pubkey: PublicKey, validator_store: Arc<ValidatorStore<T, E>>, signer| {
blocking_signed_json_task(signer, move || {
.then(
|validator_pubkey: PublicKey, validator_store: Arc<ValidatorStore<T, E>>| {
blocking_json_task(move || {
if validator_store
.initialized_validators()
.read()
@@ -912,13 +877,11 @@ pub fn serve<T: 'static + SlotClock + Clone, E: EthSpec>(
.and(warp::body::json())
.and(warp::path::end())
.and(validator_store_filter.clone())
.and(signer.clone())
.and_then(
.then(
|validator_pubkey: PublicKey,
request: api_types::UpdateFeeRecipientRequest,
validator_store: Arc<ValidatorStore<T, E>>,
signer| {
blocking_signed_json_task(signer, move || {
validator_store: Arc<ValidatorStore<T, E>>| {
blocking_json_task(move || {
if validator_store
.initialized_validators()
.read()
@@ -952,10 +915,9 @@ pub fn serve<T: 'static + SlotClock + Clone, E: EthSpec>(
.and(warp::path("feerecipient"))
.and(warp::path::end())
.and(validator_store_filter.clone())
.and(signer.clone())
.and_then(
|validator_pubkey: PublicKey, validator_store: Arc<ValidatorStore<T, E>>, signer| {
blocking_signed_json_task(signer, move || {
.then(
|validator_pubkey: PublicKey, validator_store: Arc<ValidatorStore<T, E>>| {
blocking_json_task(move || {
if validator_store
.initialized_validators()
.read()
@@ -989,10 +951,9 @@ pub fn serve<T: 'static + SlotClock + Clone, E: EthSpec>(
.and(warp::path("gas_limit"))
.and(warp::path::end())
.and(validator_store_filter.clone())
.and(signer.clone())
.and_then(
|validator_pubkey: PublicKey, validator_store: Arc<ValidatorStore<T, E>>, signer| {
blocking_signed_json_task(signer, move || {
.then(
|validator_pubkey: PublicKey, validator_store: Arc<ValidatorStore<T, E>>| {
blocking_json_task(move || {
if validator_store
.initialized_validators()
.read()
@@ -1021,13 +982,11 @@ pub fn serve<T: 'static + SlotClock + Clone, E: EthSpec>(
.and(warp::body::json())
.and(warp::path::end())
.and(validator_store_filter.clone())
.and(signer.clone())
.and_then(
.then(
|validator_pubkey: PublicKey,
request: api_types::UpdateGasLimitRequest,
validator_store: Arc<ValidatorStore<T, E>>,
signer| {
blocking_signed_json_task(signer, move || {
validator_store: Arc<ValidatorStore<T, E>>| {
blocking_json_task(move || {
if validator_store
.initialized_validators()
.read()
@@ -1061,10 +1020,9 @@ pub fn serve<T: 'static + SlotClock + Clone, E: EthSpec>(
.and(warp::path("gas_limit"))
.and(warp::path::end())
.and(validator_store_filter.clone())
.and(signer.clone())
.and_then(
|validator_pubkey: PublicKey, validator_store: Arc<ValidatorStore<T, E>>, signer| {
blocking_signed_json_task(signer, move || {
.then(
|validator_pubkey: PublicKey, validator_store: Arc<ValidatorStore<T, E>>| {
blocking_json_task(move || {
if validator_store
.initialized_validators()
.read()
@@ -1101,17 +1059,15 @@ pub fn serve<T: 'static + SlotClock + Clone, E: EthSpec>(
.and(validator_store_filter.clone())
.and(slot_clock_filter)
.and(log_filter.clone())
.and(signer.clone())
.and(task_executor_filter.clone())
.and_then(
.then(
|pubkey: PublicKey,
query: api_types::VoluntaryExitQuery,
validator_store: Arc<ValidatorStore<T, E>>,
slot_clock: T,
log,
signer,
task_executor: TaskExecutor| {
blocking_signed_json_task(signer, move || {
blocking_json_task(move || {
if let Some(handle) = task_executor.handle() {
let signed_voluntary_exit =
handle.block_on(create_signed_voluntary_exit(
@@ -1139,13 +1095,11 @@ pub fn serve<T: 'static + SlotClock + Clone, E: EthSpec>(
.and(warp::path::end())
.and(validator_store_filter.clone())
.and(graffiti_flag_filter)
.and(signer.clone())
.and_then(
.then(
|pubkey: PublicKey,
validator_store: Arc<ValidatorStore<T, E>>,
graffiti_flag: Option<Graffiti>,
signer| {
blocking_signed_json_task(signer, move || {
graffiti_flag: Option<Graffiti>| {
blocking_json_task(move || {
let graffiti = get_graffiti(pubkey.clone(), validator_store, graffiti_flag)?;
Ok(GenericResponse::from(GetGraffitiResponse {
pubkey: pubkey.into(),
@@ -1164,14 +1118,12 @@ pub fn serve<T: 'static + SlotClock + Clone, E: EthSpec>(
.and(warp::path::end())
.and(validator_store_filter.clone())
.and(graffiti_file_filter.clone())
.and(signer.clone())
.and_then(
.then(
|pubkey: PublicKey,
query: SetGraffitiRequest,
validator_store: Arc<ValidatorStore<T, E>>,
graffiti_file: Option<GraffitiFile>,
signer| {
blocking_signed_json_task(signer, move || {
graffiti_file: Option<GraffitiFile>| {
blocking_json_task(move || {
if graffiti_file.is_some() {
return Err(warp_utils::reject::invalid_auth(
"Unable to update graffiti as the \"--graffiti-file\" flag is set"
@@ -1192,13 +1144,11 @@ pub fn serve<T: 'static + SlotClock + Clone, E: EthSpec>(
.and(warp::path::end())
.and(validator_store_filter.clone())
.and(graffiti_file_filter.clone())
.and(signer.clone())
.and_then(
.then(
|pubkey: PublicKey,
validator_store: Arc<ValidatorStore<T, E>>,
graffiti_file: Option<GraffitiFile>,
signer| {
blocking_signed_json_task(signer, move || {
graffiti_file: Option<GraffitiFile>| {
blocking_json_task(move || {
if graffiti_file.is_some() {
return Err(warp_utils::reject::invalid_auth(
"Unable to delete graffiti as the \"--graffiti-file\" flag is set"
@@ -1212,32 +1162,24 @@ pub fn serve<T: 'static + SlotClock + Clone, E: EthSpec>(
.map(|reply| warp::reply::with_status(reply, warp::http::StatusCode::NO_CONTENT));
// GET /eth/v1/keystores
let get_std_keystores = std_keystores
.and(signer.clone())
.and(validator_store_filter.clone())
.and_then(|signer, validator_store: Arc<ValidatorStore<T, E>>| {
blocking_signed_json_task(signer, move || Ok(keystores::list(validator_store)))
});
let get_std_keystores = std_keystores.and(validator_store_filter.clone()).then(
|validator_store: Arc<ValidatorStore<T, E>>| {
blocking_json_task(move || Ok(keystores::list(validator_store)))
},
);
// POST /eth/v1/keystores
let post_std_keystores = std_keystores
.and(warp::body::json())
.and(signer.clone())
.and(validator_dir_filter)
.and(secrets_dir_filter)
.and(validator_store_filter.clone())
.and(task_executor_filter.clone())
.and(log_filter.clone())
.and_then(
move |request,
signer,
validator_dir,
secrets_dir,
validator_store,
task_executor,
log| {
.then(
move |request, validator_dir, secrets_dir, validator_store, task_executor, log| {
let secrets_dir = store_passwords_in_secrets_dir.then_some(secrets_dir);
blocking_signed_json_task(signer, move || {
blocking_json_task(move || {
keystores::import(
request,
validator_dir,
@@ -1253,33 +1195,30 @@ pub fn serve<T: 'static + SlotClock + Clone, E: EthSpec>(
// DELETE /eth/v1/keystores
let delete_std_keystores = std_keystores
.and(warp::body::json())
.and(signer.clone())
.and(validator_store_filter.clone())
.and(task_executor_filter.clone())
.and(log_filter.clone())
.and_then(|request, signer, validator_store, task_executor, log| {
blocking_signed_json_task(signer, move || {
.then(|request, validator_store, task_executor, log| {
blocking_json_task(move || {
keystores::delete(request, validator_store, task_executor, log)
})
});
// GET /eth/v1/remotekeys
let get_std_remotekeys = std_remotekeys
.and(signer.clone())
.and(validator_store_filter.clone())
.and_then(|signer, validator_store: Arc<ValidatorStore<T, E>>| {
blocking_signed_json_task(signer, move || Ok(remotekeys::list(validator_store)))
});
let get_std_remotekeys = std_remotekeys.and(validator_store_filter.clone()).then(
|validator_store: Arc<ValidatorStore<T, E>>| {
blocking_json_task(move || Ok(remotekeys::list(validator_store)))
},
);
// POST /eth/v1/remotekeys
let post_std_remotekeys = std_remotekeys
.and(warp::body::json())
.and(signer.clone())
.and(validator_store_filter.clone())
.and(task_executor_filter.clone())
.and(log_filter.clone())
.and_then(|request, signer, validator_store, task_executor, log| {
blocking_signed_json_task(signer, move || {
.then(|request, validator_store, task_executor, log| {
blocking_json_task(move || {
remotekeys::import(request, validator_store, task_executor, log)
})
});
@@ -1287,12 +1226,11 @@ pub fn serve<T: 'static + SlotClock + Clone, E: EthSpec>(
// DELETE /eth/v1/remotekeys
let delete_std_remotekeys = std_remotekeys
.and(warp::body::json())
.and(signer)
.and(validator_store_filter)
.and(task_executor_filter)
.and(log_filter.clone())
.and_then(|request, signer, validator_store, task_executor, log| {
blocking_signed_json_task(signer, move || {
.then(|request, validator_store, task_executor, log| {
blocking_json_task(move || {
remotekeys::delete(request, validator_store, task_executor, log)
})
});
@@ -1413,42 +1351,3 @@ pub fn serve<T: 'static + SlotClock + Clone, E: EthSpec>(
Ok((listening_socket, server))
}
/// Executes `func` in blocking tokio task (i.e., where long-running tasks are permitted).
/// JSON-encodes the return value of `func`, using the `signer` function to produce a signature of
/// those bytes.
pub async fn blocking_signed_json_task<S, F, T>(
signer: S,
func: F,
) -> Result<impl warp::Reply, warp::Rejection>
where
S: Fn(&[u8]) -> String,
F: FnOnce() -> Result<T, warp::Rejection> + Send + 'static,
T: Serialize + Send + 'static,
{
warp_utils::task::blocking_task(func)
.await
.map(|func_output| {
let mut response = match serde_json::to_vec(&func_output) {
Ok(body) => {
let mut res = Response::new(body);
res.headers_mut()
.insert(CONTENT_TYPE, HeaderValue::from_static("application/json"));
res
}
Err(_) => Response::builder()
.status(StatusCode::INTERNAL_SERVER_ERROR)
.body(vec![])
.expect("can produce simple response from static values"),
};
let body: &Vec<u8> = response.body();
let signature = signer(body);
let header_value =
HeaderValue::from_str(&signature).expect("hash can be encoded as header");
response.headers_mut().append("Signature", header_value);
response
})
}