This commit is contained in:
Michael Sproul
2025-05-08 20:52:50 +10:00
parent 2e50c2daea
commit 4d118a6ff2
18 changed files with 338 additions and 321 deletions

View File

@@ -21,7 +21,7 @@ pub async fn create_signed_voluntary_exit<T: 'static + SlotClock + Clone, E: Eth
};
let pubkey_bytes = PublicKeyBytes::from(pubkey);
if !validator_store.has_validator(&pubkey_bytes) {
if !validator_store.has_validator(&pubkey_bytes).await {
return Err(warp_utils::reject::custom_not_found(format!(
"{} is disabled or not managed by this validator client",
pubkey_bytes.as_hex_string()
@@ -30,6 +30,7 @@ pub async fn create_signed_voluntary_exit<T: 'static + SlotClock + Clone, E: Eth
let validator_index = validator_store
.validator_index(&pubkey_bytes)
.await
.ok_or_else(|| {
warp_utils::reject::custom_not_found(format!(
"The validator index for {} is not known. The validator client \

View File

@@ -4,13 +4,13 @@ use slot_clock::SlotClock;
use std::sync::Arc;
use types::{graffiti::GraffitiString, EthSpec, Graffiti};
pub fn get_graffiti<T: 'static + SlotClock + Clone, E: EthSpec>(
pub async fn get_graffiti<T: 'static + SlotClock + Clone, E: EthSpec>(
validator_pubkey: PublicKey,
validator_store: Arc<LighthouseValidatorStore<T, E>>,
graffiti_flag: Option<Graffiti>,
) -> Result<Graffiti, warp::Rejection> {
let initialized_validators_rw_lock = validator_store.initialized_validators();
let initialized_validators = initialized_validators_rw_lock.read();
let initialized_validators = initialized_validators_rw_lock.read().await;
match initialized_validators.validator(&validator_pubkey.compress()) {
None => Err(warp_utils::reject::custom_not_found(
"The key was not found on the server".to_string(),
@@ -26,13 +26,13 @@ pub fn get_graffiti<T: 'static + SlotClock + Clone, E: EthSpec>(
}
}
pub fn set_graffiti<T: 'static + SlotClock + Clone, E: EthSpec>(
pub async fn set_graffiti<T: 'static + SlotClock + Clone, E: EthSpec>(
validator_pubkey: PublicKey,
graffiti: GraffitiString,
validator_store: Arc<LighthouseValidatorStore<T, E>>,
) -> Result<(), warp::Rejection> {
let initialized_validators_rw_lock = validator_store.initialized_validators();
let mut initialized_validators = initialized_validators_rw_lock.write();
let mut initialized_validators = initialized_validators_rw_lock.write().await;
match initialized_validators.validator(&validator_pubkey.compress()) {
None => Err(warp_utils::reject::custom_not_found(
"The key was not found on the server, nothing to update".to_string(),
@@ -53,12 +53,12 @@ pub fn set_graffiti<T: 'static + SlotClock + Clone, E: EthSpec>(
}
}
pub fn delete_graffiti<T: 'static + SlotClock + Clone, E: EthSpec>(
pub async fn delete_graffiti<T: 'static + SlotClock + Clone, E: EthSpec>(
validator_pubkey: PublicKey,
validator_store: Arc<LighthouseValidatorStore<T, E>>,
) -> Result<(), warp::Rejection> {
let initialized_validators_rw_lock = validator_store.initialized_validators();
let mut initialized_validators = initialized_validators_rw_lock.write();
let mut initialized_validators = initialized_validators_rw_lock.write().await;
match initialized_validators.validator(&validator_pubkey.compress()) {
None => Err(warp_utils::reject::custom_not_found(
"The key was not found on the server, nothing to delete".to_string(),

View File

@@ -16,7 +16,6 @@ use slot_clock::SlotClock;
use std::path::PathBuf;
use std::sync::Arc;
use task_executor::TaskExecutor;
use tokio::runtime::Handle;
use tracing::{info, warn};
use types::{EthSpec, PublicKeyBytes};
use validator_dir::{keystore_password_path, Builder as ValidatorDirBuilder};
@@ -24,11 +23,11 @@ use warp::Rejection;
use warp_utils::reject::{custom_bad_request, custom_server_error};
use zeroize::Zeroizing;
pub fn list<T: SlotClock + 'static, E: EthSpec>(
pub async fn list<T: SlotClock + 'static, E: EthSpec>(
validator_store: Arc<LighthouseValidatorStore<T, E>>,
) -> ListKeystoresResponse {
let initialized_validators_rwlock = validator_store.initialized_validators();
let initialized_validators = initialized_validators_rwlock.read();
let initialized_validators = initialized_validators_rwlock.read().await;
let keystores = initialized_validators
.validator_definitions()
@@ -58,12 +57,12 @@ pub fn list<T: SlotClock + 'static, E: EthSpec>(
ListKeystoresResponse { data: keystores }
}
pub fn import<T: SlotClock + 'static, E: EthSpec>(
pub async fn import<T: SlotClock + 'static, E: EthSpec>(
request: ImportKeystoresRequest,
validator_dir: PathBuf,
secrets_dir: Option<PathBuf>,
validator_store: Arc<LighthouseValidatorStore<T, E>>,
task_executor: TaskExecutor,
_task_executor: TaskExecutor,
) -> Result<ImportKeystoresResponse, Rejection> {
// Check request validity. This is the only cases in which we should return a 4xx code.
if request.keystores.len() != request.passwords.len() {
@@ -115,7 +114,7 @@ pub fn import<T: SlotClock + 'static, E: EthSpec>(
ImportKeystoreStatus::Error,
format!("slashing protection import failed: {:?}", e),
)
} else if let Some(handle) = task_executor.handle() {
} else {
// Import the keystore.
match import_single_keystore::<_, E>(
keystore,
@@ -123,8 +122,9 @@ pub fn import<T: SlotClock + 'static, E: EthSpec>(
validator_dir.clone(),
secrets_dir.clone(),
&validator_store,
handle,
) {
)
.await
{
Ok(status) => Status::ok(status),
Err(e) => {
warn!(
@@ -135,11 +135,6 @@ pub fn import<T: SlotClock + 'static, E: EthSpec>(
Status::error(ImportKeystoreStatus::Error, e)
}
}
} else {
Status::error(
ImportKeystoreStatus::Error,
"validator client shutdown".into(),
)
};
statuses.push(status);
}
@@ -159,13 +154,12 @@ pub fn import<T: SlotClock + 'static, E: EthSpec>(
Ok(ImportKeystoresResponse { data: statuses })
}
fn import_single_keystore<T: SlotClock + 'static, E: EthSpec>(
async fn import_single_keystore<T: SlotClock + 'static, E: EthSpec>(
keystore: Keystore,
password: Zeroizing<String>,
validator_dir_path: PathBuf,
secrets_dir: Option<PathBuf>,
validator_store: &LighthouseValidatorStore<T, E>,
handle: Handle,
) -> Result<ImportKeystoreStatus, String> {
// Check if the validator key already exists, erroring if it is a remote signer validator.
let pubkey = keystore
@@ -174,6 +168,7 @@ fn import_single_keystore<T: SlotClock + 'static, E: EthSpec>(
if let Some(def) = validator_store
.initialized_validators()
.read()
.await
.validator_definitions()
.iter()
.find(|def| def.voting_public_key == pubkey)
@@ -215,8 +210,8 @@ fn import_single_keystore<T: SlotClock + 'static, E: EthSpec>(
let voting_keystore_path = validator_dir.voting_keystore_path();
drop(validator_dir);
handle
.block_on(validator_store.add_validator_keystore(
validator_store
.add_validator_keystore(
voting_keystore_path,
password_storage,
true,
@@ -226,18 +221,18 @@ fn import_single_keystore<T: SlotClock + 'static, E: EthSpec>(
None,
None,
None,
))
)
.await
.map_err(|e| format!("failed to initialize validator: {:?}", e))?;
Ok(ImportKeystoreStatus::Imported)
}
pub fn delete<T: SlotClock + 'static, E: EthSpec>(
pub async fn delete<T: SlotClock + 'static, E: EthSpec>(
request: DeleteKeystoresRequest,
validator_store: Arc<LighthouseValidatorStore<T, E>>,
task_executor: TaskExecutor,
) -> Result<DeleteKeystoresResponse, Rejection> {
let export_response = export(request, validator_store, task_executor)?;
let export_response = export(request, validator_store).await?;
// Check the status is Deleted to confirm deletion is successful, then only display the log
let successful_deletion = export_response
@@ -263,49 +258,40 @@ pub fn delete<T: SlotClock + 'static, E: EthSpec>(
})
}
pub fn export<T: SlotClock + 'static, E: EthSpec>(
pub async fn export<T: SlotClock + 'static, E: EthSpec>(
request: DeleteKeystoresRequest,
validator_store: Arc<LighthouseValidatorStore<T, E>>,
task_executor: TaskExecutor,
) -> Result<ExportKeystoresResponse, Rejection> {
// Remove from initialized validators.
let initialized_validators_rwlock = validator_store.initialized_validators();
let mut initialized_validators = initialized_validators_rwlock.write();
let mut initialized_validators = initialized_validators_rwlock.write().await;
let mut responses = request
.pubkeys
.iter()
.map(|pubkey_bytes| {
match delete_single_keystore(
pubkey_bytes,
&mut initialized_validators,
task_executor.clone(),
) {
Ok(status) => status,
Err(error) => {
warn!(
pubkey = ?pubkey_bytes,
?error,
"Error deleting keystore"
);
SingleExportKeystoresResponse {
status: Status::error(DeleteKeystoreStatus::Error, error),
validating_keystore: None,
validating_keystore_password: None,
}
}
let mut responses = vec![];
for pubkey_bytes in &request.pubkeys {
match delete_single_keystore(pubkey_bytes, &mut initialized_validators).await {
Ok(status) => responses.push(status),
Err(error) => {
warn!(
pubkey = ?pubkey_bytes,
?error,
"Error deleting keystore"
);
responses.push(SingleExportKeystoresResponse {
status: Status::error(DeleteKeystoreStatus::Error, error),
validating_keystore: None,
validating_keystore_password: None,
})
}
})
.collect::<Vec<_>>();
}
}
// Use `update_validators` to update the key cache. It is safe to let the key cache get a bit out
// of date as it resets when it can't be decrypted. We update it just a single time to avoid
// continually resetting it after each key deletion.
if let Some(handle) = task_executor.handle() {
handle
.block_on(initialized_validators.update_validators())
.map_err(|e| custom_server_error(format!("unable to update key cache: {:?}", e)))?;
}
initialized_validators
.update_validators()
.await
.map_err(|e| custom_server_error(format!("unable to update key cache: {:?}", e)))?;
// Export the slashing protection data.
let slashing_protection = validator_store
@@ -332,38 +318,35 @@ pub fn export<T: SlotClock + 'static, E: EthSpec>(
})
}
fn delete_single_keystore(
async fn delete_single_keystore(
pubkey_bytes: &PublicKeyBytes,
initialized_validators: &mut InitializedValidators,
task_executor: TaskExecutor,
) -> Result<SingleExportKeystoresResponse, String> {
if let Some(handle) = task_executor.handle() {
let pubkey = pubkey_bytes
.decompress()
.map_err(|e| format!("invalid pubkey, {:?}: {:?}", pubkey_bytes, e))?;
let pubkey = pubkey_bytes
.decompress()
.map_err(|e| format!("invalid pubkey, {:?}: {:?}", pubkey_bytes, e))?;
match handle.block_on(initialized_validators.delete_definition_and_keystore(&pubkey, true))
{
Ok(Some(keystore_and_password)) => Ok(SingleExportKeystoresResponse {
status: Status::ok(DeleteKeystoreStatus::Deleted),
validating_keystore: Some(KeystoreJsonStr(keystore_and_password.keystore)),
validating_keystore_password: keystore_and_password.password,
}),
Ok(None) => Ok(SingleExportKeystoresResponse {
status: Status::ok(DeleteKeystoreStatus::Deleted),
match initialized_validators
.delete_definition_and_keystore(&pubkey, true)
.await
{
Ok(Some(keystore_and_password)) => Ok(SingleExportKeystoresResponse {
status: Status::ok(DeleteKeystoreStatus::Deleted),
validating_keystore: Some(KeystoreJsonStr(keystore_and_password.keystore)),
validating_keystore_password: keystore_and_password.password,
}),
Ok(None) => Ok(SingleExportKeystoresResponse {
status: Status::ok(DeleteKeystoreStatus::Deleted),
validating_keystore: None,
validating_keystore_password: None,
}),
Err(e) => match e {
Error::ValidatorNotInitialized(_) => Ok(SingleExportKeystoresResponse {
status: Status::ok(DeleteKeystoreStatus::NotFound),
validating_keystore: None,
validating_keystore_password: None,
}),
Err(e) => match e {
Error::ValidatorNotInitialized(_) => Ok(SingleExportKeystoresResponse {
status: Status::ok(DeleteKeystoreStatus::NotFound),
validating_keystore: None,
validating_keystore_password: None,
}),
_ => Err(format!("unable to disable and delete: {:?}", e)),
},
}
} else {
Err("validator client shutdown".into())
_ => Err(format!("unable to disable and delete: {:?}", e)),
},
}
}

View File

@@ -54,7 +54,7 @@ use types::{ChainSpec, ConfigAndPreset, EthSpec};
use validator_dir::Builder as ValidatorDirBuilder;
use validator_services::block_service::BlockService;
use warp::{sse::Event, Filter};
use warp_utils::task::blocking_json_task;
use warp_utils::task::{async_json_task, blocking_json_task};
#[derive(Debug)]
pub enum Error {
@@ -320,10 +320,11 @@ pub fn serve<T: 'static + SlotClock + Clone, E: EthSpec>(
.and(warp::path::end())
.and(validator_store_filter.clone())
.then(|validator_store: Arc<LighthouseValidatorStore<T, E>>| {
blocking_json_task(move || {
async_json_task(async move {
let validators = validator_store
.initialized_validators()
.read()
.await
.validator_definitions()
.iter()
.map(|def| api_types::ValidatorData {
@@ -345,10 +346,11 @@ pub fn serve<T: 'static + SlotClock + Clone, E: EthSpec>(
.and(validator_store_filter.clone())
.then(
|validator_pubkey: PublicKey, validator_store: Arc<LighthouseValidatorStore<T, E>>| {
blocking_json_task(move || {
async_json_task(async move {
let validator = validator_store
.initialized_validators()
.read()
.await
.validator_definitions()
.iter()
.find(|def| def.voting_public_key == validator_pubkey)
@@ -397,11 +399,12 @@ pub fn serve<T: 'static + SlotClock + Clone, E: EthSpec>(
|validator_store: Arc<LighthouseValidatorStore<T, E>>,
graffiti_file: Option<GraffitiFile>,
graffiti_flag: Option<Graffiti>| {
blocking_json_task(move || {
async_json_task(async move {
let mut result = HashMap::new();
for (key, graffiti_definition) in validator_store
.initialized_validators()
.read()
.await
.get_all_validators_graffiti()
{
let graffiti = determine_graffiti(
@@ -697,12 +700,13 @@ pub fn serve<T: 'static + SlotClock + Clone, E: EthSpec>(
.and(graffiti_file_filter.clone())
.and(task_executor_filter.clone())
.then(
// FIXME(sproul): do we need task_executor for anything here?
|validator_pubkey: PublicKey,
body: api_types::ValidatorPatchRequest,
validator_store: Arc<LighthouseValidatorStore<T, E>>,
graffiti_file: Option<GraffitiFile>,
task_executor: TaskExecutor| {
blocking_json_task(move || {
_task_executor: TaskExecutor| {
async_json_task(async move {
if body.graffiti.is_some() && graffiti_file.is_some() {
return Err(warp_utils::reject::custom_bad_request(
"Unable to update graffiti as the \"--graffiti-file\" flag is set"
@@ -711,8 +715,8 @@ pub fn serve<T: 'static + SlotClock + Clone, E: EthSpec>(
}
let maybe_graffiti = body.graffiti.clone().map(Into::into);
let initialized_validators_rw_lock = validator_store.initialized_validators();
let initialized_validators = initialized_validators_rw_lock.upgradable_read();
let initialized_validators_lock = validator_store.initialized_validators();
let mut initialized_validators = initialized_validators_lock.write().await;
// Do not make any changes if all fields are identical or unchanged.
fn equal_or_none<T: PartialEq>(
@@ -770,38 +774,24 @@ pub fn serve<T: 'static + SlotClock + Clone, E: EthSpec>(
Ok(())
}
(Some(_), _) => {
// Upgrade read lock only in the case where a write is actually
// required.
let mut initialized_validators_write =
parking_lot::RwLockUpgradableReadGuard::upgrade(
initialized_validators,
);
if let Some(handle) = task_executor.handle() {
handle
.block_on(
initialized_validators_write
.set_validator_definition_fields(
&validator_pubkey,
body.enabled,
body.gas_limit,
body.builder_proposals,
body.builder_boost_factor,
body.prefer_builder_proposals,
body.graffiti,
),
)
.map_err(|e| {
warp_utils::reject::custom_server_error(format!(
"unable to set validator status: {:?}",
e
))
})?;
Ok(())
} else {
Err(warp_utils::reject::custom_server_error(
"Lighthouse shutting down".into(),
))
}
initialized_validators
.set_validator_definition_fields(
&validator_pubkey,
body.enabled,
body.gas_limit,
body.builder_proposals,
body.builder_boost_factor,
body.prefer_builder_proposals,
body.graffiti,
)
.await
.map_err(|e| {
warp_utils::reject::custom_server_error(format!(
"unable to set validator status: {:?}",
e
))
})?;
Ok(())
}
}
})
@@ -827,10 +817,10 @@ pub fn serve<T: 'static + SlotClock + Clone, E: EthSpec>(
.and(warp::body::json())
.and(validator_store_filter.clone())
.and(task_executor_filter.clone())
.then(move |request, validator_store, task_executor| {
blocking_json_task(move || {
.then(move |request, validator_store, _task_executor| {
async_json_task(async move {
if allow_keystore_export {
keystores::export(request, validator_store, task_executor)
keystores::export(request, validator_store).await
} else {
Err(warp_utils::reject::custom_bad_request(
"keystore export is disabled".to_string(),
@@ -853,10 +843,11 @@ pub fn serve<T: 'static + SlotClock + Clone, E: EthSpec>(
.and(validator_store_filter.clone())
.then(
|validator_pubkey: PublicKey, validator_store: Arc<LighthouseValidatorStore<T, E>>| {
blocking_json_task(move || {
async_json_task(async move {
if validator_store
.initialized_validators()
.read()
.await
.is_enabled(&validator_pubkey)
.is_none()
{
@@ -867,6 +858,7 @@ pub fn serve<T: 'static + SlotClock + Clone, E: EthSpec>(
}
validator_store
.get_fee_recipient(&PublicKeyBytes::from(&validator_pubkey))
.await
.map(|fee_recipient| {
GenericResponse::from(GetFeeRecipientResponse {
pubkey: PublicKeyBytes::from(validator_pubkey.clone()),
@@ -894,10 +886,11 @@ pub fn serve<T: 'static + SlotClock + Clone, E: EthSpec>(
|validator_pubkey: PublicKey,
request: api_types::UpdateFeeRecipientRequest,
validator_store: Arc<LighthouseValidatorStore<T, E>>| {
blocking_json_task(move || {
async_json_task(async move {
if validator_store
.initialized_validators()
.read()
.await
.is_enabled(&validator_pubkey)
.is_none()
{
@@ -909,6 +902,7 @@ pub fn serve<T: 'static + SlotClock + Clone, E: EthSpec>(
validator_store
.initialized_validators()
.write()
.await
.set_validator_fee_recipient(&validator_pubkey, request.ethaddress)
.map_err(|e| {
warp_utils::reject::custom_server_error(format!(
@@ -919,7 +913,7 @@ pub fn serve<T: 'static + SlotClock + Clone, E: EthSpec>(
})
},
)
.map(|reply| warp::reply::with_status(reply, warp::http::StatusCode::ACCEPTED));
.map(|response| warp::reply::with_status(response, warp::http::StatusCode::ACCEPTED));
// DELETE /eth/v1/validator/{pubkey}/feerecipient
let delete_fee_recipient = eth_v1
@@ -930,10 +924,11 @@ pub fn serve<T: 'static + SlotClock + Clone, E: EthSpec>(
.and(validator_store_filter.clone())
.then(
|validator_pubkey: PublicKey, validator_store: Arc<LighthouseValidatorStore<T, E>>| {
blocking_json_task(move || {
async_json_task(async move {
if validator_store
.initialized_validators()
.read()
.await
.is_enabled(&validator_pubkey)
.is_none()
{
@@ -945,6 +940,7 @@ pub fn serve<T: 'static + SlotClock + Clone, E: EthSpec>(
validator_store
.initialized_validators()
.write()
.await
.delete_validator_fee_recipient(&validator_pubkey)
.map_err(|e| {
warp_utils::reject::custom_server_error(format!(
@@ -966,10 +962,11 @@ pub fn serve<T: 'static + SlotClock + Clone, E: EthSpec>(
.and(validator_store_filter.clone())
.then(
|validator_pubkey: PublicKey, validator_store: Arc<LighthouseValidatorStore<T, E>>| {
blocking_json_task(move || {
async_json_task(async move {
if validator_store
.initialized_validators()
.read()
.await
.is_enabled(&validator_pubkey)
.is_none()
{
@@ -981,7 +978,8 @@ pub fn serve<T: 'static + SlotClock + Clone, E: EthSpec>(
Ok(GenericResponse::from(GetGasLimitResponse {
pubkey: PublicKeyBytes::from(validator_pubkey.clone()),
gas_limit: validator_store
.get_gas_limit(&PublicKeyBytes::from(&validator_pubkey)),
.get_gas_limit(&PublicKeyBytes::from(&validator_pubkey))
.await,
}))
})
},
@@ -999,10 +997,11 @@ pub fn serve<T: 'static + SlotClock + Clone, E: EthSpec>(
|validator_pubkey: PublicKey,
request: api_types::UpdateGasLimitRequest,
validator_store: Arc<LighthouseValidatorStore<T, E>>| {
blocking_json_task(move || {
async_json_task(async move {
if validator_store
.initialized_validators()
.read()
.await
.is_enabled(&validator_pubkey)
.is_none()
{
@@ -1014,6 +1013,7 @@ pub fn serve<T: 'static + SlotClock + Clone, E: EthSpec>(
validator_store
.initialized_validators()
.write()
.await
.set_validator_gas_limit(&validator_pubkey, request.gas_limit)
.map_err(|e| {
warp_utils::reject::custom_server_error(format!(
@@ -1035,10 +1035,11 @@ pub fn serve<T: 'static + SlotClock + Clone, E: EthSpec>(
.and(validator_store_filter.clone())
.then(
|validator_pubkey: PublicKey, validator_store: Arc<LighthouseValidatorStore<T, E>>| {
blocking_json_task(move || {
async_json_task(async move {
if validator_store
.initialized_validators()
.read()
.await
.is_enabled(&validator_pubkey)
.is_none()
{
@@ -1050,6 +1051,7 @@ pub fn serve<T: 'static + SlotClock + Clone, E: EthSpec>(
validator_store
.initialized_validators()
.write()
.await
.delete_validator_gas_limit(&validator_pubkey)
.map_err(|e| {
warp_utils::reject::custom_server_error(format!(
@@ -1109,8 +1111,9 @@ pub fn serve<T: 'static + SlotClock + Clone, E: EthSpec>(
|pubkey: PublicKey,
validator_store: Arc<LighthouseValidatorStore<T, E>>,
graffiti_flag: Option<Graffiti>| {
blocking_json_task(move || {
let graffiti = get_graffiti(pubkey.clone(), validator_store, graffiti_flag)?;
async_json_task(async move {
let graffiti =
get_graffiti(pubkey.clone(), validator_store, graffiti_flag).await?;
Ok(GenericResponse::from(GetGraffitiResponse {
pubkey: pubkey.into(),
graffiti,
@@ -1133,14 +1136,14 @@ pub fn serve<T: 'static + SlotClock + Clone, E: EthSpec>(
query: SetGraffitiRequest,
validator_store: Arc<LighthouseValidatorStore<T, E>>,
graffiti_file: Option<GraffitiFile>| {
blocking_json_task(move || {
async_json_task(async move {
if graffiti_file.is_some() {
return Err(warp_utils::reject::invalid_auth(
"Unable to update graffiti as the \"--graffiti-file\" flag is set"
.to_string(),
));
}
set_graffiti(pubkey.clone(), query.graffiti, validator_store)
set_graffiti(pubkey.clone(), query.graffiti, validator_store).await
})
},
)
@@ -1158,14 +1161,14 @@ pub fn serve<T: 'static + SlotClock + Clone, E: EthSpec>(
|pubkey: PublicKey,
validator_store: Arc<LighthouseValidatorStore<T, E>>,
graffiti_file: Option<GraffitiFile>| {
blocking_json_task(move || {
async_json_task(async move {
if graffiti_file.is_some() {
return Err(warp_utils::reject::invalid_auth(
"Unable to delete graffiti as the \"--graffiti-file\" flag is set"
.to_string(),
));
}
delete_graffiti(pubkey.clone(), validator_store)
delete_graffiti(pubkey.clone(), validator_store).await
})
},
)
@@ -1174,7 +1177,7 @@ pub fn serve<T: 'static + SlotClock + Clone, E: EthSpec>(
// GET /eth/v1/keystores
let get_std_keystores = std_keystores.and(validator_store_filter.clone()).then(
|validator_store: Arc<LighthouseValidatorStore<T, E>>| {
blocking_json_task(move || Ok(keystores::list(validator_store)))
async_json_task(async move { Ok(keystores::list(validator_store).await) })
},
);
@@ -1188,7 +1191,7 @@ pub fn serve<T: 'static + SlotClock + Clone, E: EthSpec>(
.then(
move |request, validator_dir, secrets_dir, validator_store, task_executor| {
let secrets_dir = store_passwords_in_secrets_dir.then_some(secrets_dir);
blocking_json_task(move || {
async_json_task(async move {
keystores::import::<_, E>(
request,
validator_dir,
@@ -1196,6 +1199,7 @@ pub fn serve<T: 'static + SlotClock + Clone, E: EthSpec>(
validator_store,
task_executor,
)
.await
})
},
);
@@ -1204,15 +1208,14 @@ pub fn serve<T: 'static + SlotClock + Clone, E: EthSpec>(
let delete_std_keystores = std_keystores
.and(warp::body::json())
.and(validator_store_filter.clone())
.and(task_executor_filter.clone())
.then(|request, validator_store, task_executor| {
blocking_json_task(move || keystores::delete(request, validator_store, task_executor))
.then(|request, validator_store| {
async_json_task(async move { keystores::delete(request, validator_store).await })
});
// GET /eth/v1/remotekeys
let get_std_remotekeys = std_remotekeys.and(validator_store_filter.clone()).then(
|validator_store: Arc<LighthouseValidatorStore<T, E>>| {
blocking_json_task(move || Ok(remotekeys::list(validator_store)))
async_json_task(async move { Ok(remotekeys::list(validator_store).await) })
},
);
@@ -1220,20 +1223,18 @@ pub fn serve<T: 'static + SlotClock + Clone, E: EthSpec>(
let post_std_remotekeys = std_remotekeys
.and(warp::body::json())
.and(validator_store_filter.clone())
.and(task_executor_filter.clone())
.then(|request, validator_store, task_executor| {
blocking_json_task(move || {
remotekeys::import::<_, E>(request, validator_store, task_executor)
})
.then(|request, validator_store| {
async_json_task(
async move { remotekeys::import::<_, E>(request, validator_store).await },
)
});
// DELETE /eth/v1/remotekeys
let delete_std_remotekeys = std_remotekeys
.and(warp::body::json())
.and(validator_store_filter)
.and(task_executor_filter)
.then(|request, validator_store, task_executor| {
blocking_json_task(move || remotekeys::delete(request, validator_store, task_executor))
.then(|request, validator_store| {
async_json_task(async move { remotekeys::delete(request, validator_store).await })
});
// Subscribe to get VC logs via Server side events

View File

@@ -11,19 +11,17 @@ use initialized_validators::{Error, InitializedValidators};
use lighthouse_validator_store::LighthouseValidatorStore;
use slot_clock::SlotClock;
use std::sync::Arc;
use task_executor::TaskExecutor;
use tokio::runtime::Handle;
use tracing::{info, warn};
use types::{EthSpec, PublicKeyBytes};
use url::Url;
use warp::Rejection;
use warp_utils::reject::custom_server_error;
pub fn list<T: SlotClock + 'static, E: EthSpec>(
pub async fn list<T: SlotClock + 'static, E: EthSpec>(
validator_store: Arc<LighthouseValidatorStore<T, E>>,
) -> ListRemotekeysResponse {
let initialized_validators_rwlock = validator_store.initialized_validators();
let initialized_validators = initialized_validators_rwlock.read();
let initialized_validators = initialized_validators_rwlock.read().await;
let keystores = initialized_validators
.validator_definitions()
@@ -48,10 +46,9 @@ pub fn list<T: SlotClock + 'static, E: EthSpec>(
ListRemotekeysResponse { data: keystores }
}
pub fn import<T: SlotClock + 'static, E: EthSpec>(
pub async fn import<T: SlotClock + 'static, E: EthSpec>(
request: ImportRemotekeysRequest,
validator_store: Arc<LighthouseValidatorStore<T, E>>,
task_executor: TaskExecutor,
) -> Result<ImportRemotekeysResponse, Rejection> {
info!(
count = request.remote_keys.len(),
@@ -61,40 +58,33 @@ pub fn import<T: SlotClock + 'static, E: EthSpec>(
let mut statuses = Vec::with_capacity(request.remote_keys.len());
for remotekey in request.remote_keys {
let status = if let Some(handle) = task_executor.handle() {
// Import the keystore.
match import_single_remotekey::<_, E>(
remotekey.pubkey,
remotekey.url,
&validator_store,
handle,
) {
Ok(status) => Status::ok(status),
Err(e) => {
warn!(
pubkey = remotekey.pubkey.to_string(),
error = ?e,
"Error importing keystore, skipped"
);
Status::error(ImportRemotekeyStatus::Error, e)
}
// Import the keystore.
let status = match import_single_remotekey::<_, E>(
remotekey.pubkey,
remotekey.url,
&validator_store,
)
.await
{
Ok(status) => Status::ok(status),
Err(e) => {
warn!(
pubkey = remotekey.pubkey.to_string(),
error = ?e,
"Error importing keystore, skipped"
);
Status::error(ImportRemotekeyStatus::Error, e)
}
} else {
Status::error(
ImportRemotekeyStatus::Error,
"validator client shutdown".into(),
)
};
statuses.push(status);
}
Ok(ImportRemotekeysResponse { data: statuses })
}
fn import_single_remotekey<T: SlotClock + 'static, E: EthSpec>(
async fn import_single_remotekey<T: SlotClock + 'static, E: EthSpec>(
pubkey: PublicKeyBytes,
url: String,
validator_store: &LighthouseValidatorStore<T, E>,
handle: Handle,
) -> Result<ImportRemotekeyStatus, String> {
if let Err(url_err) = Url::parse(&url) {
return Err(format!("failed to parse remotekey URL: {}", url_err));
@@ -107,6 +97,7 @@ fn import_single_remotekey<T: SlotClock + 'static, E: EthSpec>(
if let Some(def) = validator_store
.initialized_validators()
.read()
.await
.validator_definitions()
.iter()
.find(|def| def.voting_public_key == pubkey)
@@ -138,17 +129,17 @@ fn import_single_remotekey<T: SlotClock + 'static, E: EthSpec>(
client_identity_password: None,
}),
};
handle
.block_on(validator_store.add_validator(web3signer_validator))
validator_store
.add_validator(web3signer_validator)
.await
.map_err(|e| format!("failed to initialize validator: {:?}", e))?;
Ok(ImportRemotekeyStatus::Imported)
}
pub fn delete<T: SlotClock + 'static, E: EthSpec>(
pub async fn delete<T: SlotClock + 'static, E: EthSpec>(
request: DeleteRemotekeysRequest,
validator_store: Arc<LighthouseValidatorStore<T, E>>,
task_executor: TaskExecutor,
) -> Result<DeleteRemotekeysResponse, Rejection> {
info!(
count = request.pubkeys.len(),
@@ -156,61 +147,51 @@ pub fn delete<T: SlotClock + 'static, E: EthSpec>(
);
// Remove from initialized validators.
let initialized_validators_rwlock = validator_store.initialized_validators();
let mut initialized_validators = initialized_validators_rwlock.write();
let mut initialized_validators = initialized_validators_rwlock.write().await;
let statuses = request
.pubkeys
.iter()
.map(|pubkey_bytes| {
match delete_single_remotekey(
pubkey_bytes,
&mut initialized_validators,
task_executor.clone(),
) {
Ok(status) => Status::ok(status),
Err(error) => {
warn!(
pubkey = ?pubkey_bytes,
?error,
"Error deleting keystore"
);
Status::error(DeleteRemotekeyStatus::Error, error)
}
let mut statuses = vec![];
for pubkey_bytes in &request.pubkeys {
match delete_single_remotekey(pubkey_bytes, &mut initialized_validators).await {
Ok(status) => statuses.push(Status::ok(status)),
Err(error) => {
warn!(
pubkey = ?pubkey_bytes,
?error,
"Error deleting keystore"
);
statuses.push(Status::error(DeleteRemotekeyStatus::Error, error))
}
})
.collect::<Vec<_>>();
}
}
// Use `update_validators` to update the key cache. It is safe to let the key cache get a bit out
// of date as it resets when it can't be decrypted. We update it just a single time to avoid
// continually resetting it after each key deletion.
if let Some(handle) = task_executor.handle() {
handle
.block_on(initialized_validators.update_validators())
.map_err(|e| custom_server_error(format!("unable to update key cache: {:?}", e)))?;
}
initialized_validators
.update_validators()
.await
.map_err(|e| custom_server_error(format!("unable to update key cache: {:?}", e)))?;
Ok(DeleteRemotekeysResponse { data: statuses })
}
fn delete_single_remotekey(
async fn delete_single_remotekey(
pubkey_bytes: &PublicKeyBytes,
initialized_validators: &mut InitializedValidators,
task_executor: TaskExecutor,
) -> Result<DeleteRemotekeyStatus, String> {
if let Some(handle) = task_executor.handle() {
let pubkey = pubkey_bytes
.decompress()
.map_err(|e| format!("invalid pubkey, {:?}: {:?}", pubkey_bytes, e))?;
let pubkey = pubkey_bytes
.decompress()
.map_err(|e| format!("invalid pubkey, {:?}: {:?}", pubkey_bytes, e))?;
match handle.block_on(initialized_validators.delete_definition_and_keystore(&pubkey, false))
{
Ok(_) => Ok(DeleteRemotekeyStatus::Deleted),
Err(e) => match e {
Error::ValidatorNotInitialized(_) => Ok(DeleteRemotekeyStatus::NotFound),
_ => Err(format!("unable to disable and delete: {:?}", e)),
},
}
} else {
Err("validator client shutdown".into())
match initialized_validators
.delete_definition_and_keystore(&pubkey, false)
.await
{
Ok(_) => Ok(DeleteRemotekeyStatus::Deleted),
Err(e) => match e {
Error::ValidatorNotInitialized(_) => Ok(DeleteRemotekeyStatus::NotFound),
_ => Err(format!("unable to disable and delete: {:?}", e)),
},
}
}

View File

@@ -15,7 +15,6 @@ use eth2_keystore::KeystoreBuilder;
use initialized_validators::key_cache::{KeyCache, CACHE_FILENAME};
use initialized_validators::{InitializedValidators, OnDecryptFailure};
use lighthouse_validator_store::{Config as ValidatorStoreConfig, LighthouseValidatorStore};
use parking_lot::RwLock;
use sensitive_url::SensitiveUrl;
use slashing_protection::{SlashingDatabase, SLASHING_PROTECTION_FILENAME};
use slot_clock::{SlotClock, TestingSlotClock};
@@ -25,7 +24,7 @@ use std::sync::Arc;
use std::time::Duration;
use task_executor::test_utils::TestRuntime;
use tempfile::{tempdir, TempDir};
use tokio::sync::oneshot;
use tokio::sync::{oneshot, RwLock};
use validator_services::block_service::BlockService;
use zeroize::Zeroizing;
@@ -114,6 +113,7 @@ impl ApiTester {
validator_store
.register_all_in_doppelganger_protection_if_enabled()
.await
.expect("Should attach doppelganger service");
let initialized_validators = validator_store.initialized_validators();
@@ -189,6 +189,7 @@ impl ApiTester {
self.initialized_validators
.read()
.await
.decrypt_key_cache(key_cache, &mut <_>::default(), OnDecryptFailure::Error)
.await
.expect("key cache should decypt");
@@ -280,27 +281,27 @@ impl ApiTester {
self
}
pub fn vals_total(&self) -> usize {
self.initialized_validators.read().num_total()
pub async fn vals_total(&self) -> usize {
self.initialized_validators.read().await.num_total()
}
pub fn vals_enabled(&self) -> usize {
self.initialized_validators.read().num_enabled()
pub async fn vals_enabled(&self) -> usize {
self.initialized_validators.read().await.num_enabled()
}
pub fn assert_enabled_validators_count(self, count: usize) -> Self {
assert_eq!(self.vals_enabled(), count);
pub async fn assert_enabled_validators_count(self, count: usize) -> Self {
assert_eq!(self.vals_enabled().await, count);
self
}
pub fn assert_validators_count(self, count: usize) -> Self {
assert_eq!(self.vals_total(), count);
pub async fn assert_validators_count(self, count: usize) -> Self {
assert_eq!(self.vals_total().await, count);
self
}
pub async fn create_hd_validators(self, s: HdValidatorScenario) -> Self {
let initial_vals = self.vals_total();
let initial_enabled_vals = self.vals_enabled();
let initial_vals = self.vals_total().await;
let initial_enabled_vals = self.vals_enabled().await;
let validators = (0..s.count)
.map(|i| ValidatorRequest {
@@ -346,15 +347,15 @@ impl ApiTester {
};
assert_eq!(response.len(), s.count);
assert_eq!(self.vals_total(), initial_vals + s.count);
assert_eq!(self.vals_total().await, initial_vals + s.count);
assert_eq!(
self.vals_enabled(),
self.vals_enabled().await,
initial_enabled_vals + s.count - s.disabled.len()
);
let server_vals = self.client.get_lighthouse_validators().await.unwrap().data;
assert_eq!(server_vals.len(), self.vals_total());
assert_eq!(server_vals.len(), self.vals_total().await);
// Ensure the server lists all of these newly created validators.
for validator in &response {
@@ -423,8 +424,8 @@ impl ApiTester {
}
pub async fn create_keystore_validators(self, s: KeystoreValidatorScenario) -> Self {
let initial_vals = self.vals_total();
let initial_enabled_vals = self.vals_enabled();
let initial_vals = self.vals_total().await;
let initial_enabled_vals = self.vals_enabled().await;
let password = random_password();
let keypair = Keypair::random();
@@ -479,12 +480,15 @@ impl ApiTester {
let num_enabled = s.enabled as usize;
assert_eq!(self.vals_total(), initial_vals + 1);
assert_eq!(self.vals_enabled(), initial_enabled_vals + num_enabled);
assert_eq!(self.vals_total().await, initial_vals + 1);
assert_eq!(
self.vals_enabled().await,
initial_enabled_vals + num_enabled
);
let server_vals = self.client.get_lighthouse_validators().await.unwrap().data;
assert_eq!(server_vals.len(), self.vals_total());
assert_eq!(server_vals.len(), self.vals_total().await);
assert_eq!(response.voting_pubkey, keypair.pk.into());
assert_eq!(response.enabled, s.enabled);
@@ -493,8 +497,8 @@ impl ApiTester {
}
pub async fn create_web3signer_validators(self, s: Web3SignerValidatorScenario) -> Self {
let initial_vals = self.vals_total();
let initial_enabled_vals = self.vals_enabled();
let initial_vals = self.vals_total().await;
let initial_enabled_vals = self.vals_enabled().await;
let request: Vec<_> = (0..s.count)
.map(|i| {
@@ -523,11 +527,11 @@ impl ApiTester {
.await
.unwrap();
assert_eq!(self.vals_total(), initial_vals + s.count);
assert_eq!(self.vals_total().await, initial_vals + s.count);
if s.enabled {
assert_eq!(self.vals_enabled(), initial_enabled_vals + s.count);
assert_eq!(self.vals_enabled().await, initial_enabled_vals + s.count);
} else {
assert_eq!(self.vals_enabled(), initial_enabled_vals);
assert_eq!(self.vals_enabled().await, initial_enabled_vals);
};
self
@@ -552,6 +556,7 @@ impl ApiTester {
assert_eq!(
self.initialized_validators
.read()
.await
.is_enabled(&validator.voting_pubkey.decompress().unwrap())
.unwrap(),
enabled
@@ -606,7 +611,9 @@ impl ApiTester {
let validator = &self.client.get_lighthouse_validators().await.unwrap().data[index];
assert_eq!(
self.validator_store.get_gas_limit(&validator.voting_pubkey),
self.validator_store
.get_gas_limit(&validator.voting_pubkey)
.await,
gas_limit
);
@@ -637,7 +644,8 @@ impl ApiTester {
assert_eq!(
self.validator_store
.get_builder_proposals_testing_only(&validator.voting_pubkey),
.get_builder_proposals_testing_only(&validator.voting_pubkey)
.await,
builder_proposals
);

View File

@@ -19,7 +19,6 @@ use eth2::{
};
use eth2_keystore::KeystoreBuilder;
use lighthouse_validator_store::{Config as ValidatorStoreConfig, LighthouseValidatorStore};
use parking_lot::RwLock;
use sensitive_url::SensitiveUrl;
use slashing_protection::{SlashingDatabase, SLASHING_PROTECTION_FILENAME};
use slot_clock::{SlotClock, TestingSlotClock};
@@ -30,6 +29,7 @@ use std::sync::Arc;
use std::time::Duration;
use task_executor::test_utils::TestRuntime;
use tempfile::{tempdir, TempDir};
use tokio::sync::RwLock;
use types::graffiti::GraffitiString;
use validator_store::ValidatorStore;
use zeroize::Zeroizing;
@@ -104,6 +104,7 @@ impl ApiTester {
validator_store
.register_all_in_doppelganger_protection_if_enabled()
.await
.expect("Should attach doppelganger service");
let initialized_validators = validator_store.initialized_validators();
@@ -243,27 +244,27 @@ impl ApiTester {
self
}
pub fn vals_total(&self) -> usize {
self.initialized_validators.read().num_total()
pub async fn vals_total(&self) -> usize {
self.initialized_validators.read().await.num_total()
}
pub fn vals_enabled(&self) -> usize {
self.initialized_validators.read().num_enabled()
pub async fn vals_enabled(&self) -> usize {
self.initialized_validators.read().await.num_enabled()
}
pub fn assert_enabled_validators_count(self, count: usize) -> Self {
assert_eq!(self.vals_enabled(), count);
pub async fn assert_enabled_validators_count(self, count: usize) -> Self {
assert_eq!(self.vals_enabled().await, count);
self
}
pub fn assert_validators_count(self, count: usize) -> Self {
assert_eq!(self.vals_total(), count);
pub async fn assert_validators_count(self, count: usize) -> Self {
assert_eq!(self.vals_total().await, count);
self
}
pub async fn create_hd_validators(self, s: HdValidatorScenario) -> Self {
let initial_vals = self.vals_total();
let initial_enabled_vals = self.vals_enabled();
let initial_vals = self.vals_total().await;
let initial_enabled_vals = self.vals_enabled().await;
let validators = (0..s.count)
.map(|i| ValidatorRequest {
@@ -309,15 +310,15 @@ impl ApiTester {
};
assert_eq!(response.len(), s.count);
assert_eq!(self.vals_total(), initial_vals + s.count);
assert_eq!(self.vals_total().await, initial_vals + s.count);
assert_eq!(
self.vals_enabled(),
self.vals_enabled().await,
initial_enabled_vals + s.count - s.disabled.len()
);
let server_vals = self.client.get_lighthouse_validators().await.unwrap().data;
assert_eq!(server_vals.len(), self.vals_total());
assert_eq!(server_vals.len(), self.vals_total().await);
// Ensure the server lists all of these newly created validators.
for validator in &response {
@@ -386,8 +387,8 @@ impl ApiTester {
}
pub async fn create_keystore_validators(self, s: KeystoreValidatorScenario) -> Self {
let initial_vals = self.vals_total();
let initial_enabled_vals = self.vals_enabled();
let initial_vals = self.vals_total().await;
let initial_enabled_vals = self.vals_enabled().await;
let password = random_password();
let keypair = Keypair::random();
@@ -442,12 +443,15 @@ impl ApiTester {
let num_enabled = s.enabled as usize;
assert_eq!(self.vals_total(), initial_vals + 1);
assert_eq!(self.vals_enabled(), initial_enabled_vals + num_enabled);
assert_eq!(self.vals_total().await, initial_vals + 1);
assert_eq!(
self.vals_enabled().await,
initial_enabled_vals + num_enabled
);
let server_vals = self.client.get_lighthouse_validators().await.unwrap().data;
assert_eq!(server_vals.len(), self.vals_total());
assert_eq!(server_vals.len(), self.vals_total().await);
assert_eq!(response.voting_pubkey, keypair.pk.into());
assert_eq!(response.enabled, s.enabled);
@@ -486,11 +490,11 @@ impl ApiTester {
.await
.unwrap();
assert_eq!(self.vals_total(), initial_vals + s.count);
assert_eq!(self.vals_total().await, initial_vals + s.count);
if s.enabled {
assert_eq!(self.vals_enabled(), initial_enabled_vals + s.count);
assert_eq!(self.vals_enabled().await, initial_enabled_vals + s.count);
} else {
assert_eq!(self.vals_enabled(), initial_enabled_vals);
assert_eq!(self.vals_enabled().await, initial_enabled_vals);
};
self
@@ -501,6 +505,7 @@ impl ApiTester {
// manually setting validator index in `ValidatorStore`
self.initialized_validators
.write()
.await
.set_index(&validator.voting_pubkey, 0);
let expected_exit_epoch = maybe_epoch.unwrap_or_else(|| self.get_current_epoch());
@@ -542,6 +547,7 @@ impl ApiTester {
assert_eq!(
self.initialized_validators
.read()
.await
.is_enabled(&validator.voting_pubkey.decompress().unwrap())
.unwrap(),
enabled
@@ -596,7 +602,9 @@ impl ApiTester {
let validator = &self.client.get_lighthouse_validators().await.unwrap().data[index];
assert_eq!(
self.validator_store.get_gas_limit(&validator.voting_pubkey),
self.validator_store
.get_gas_limit(&validator.voting_pubkey)
.await,
gas_limit
);
@@ -669,7 +677,8 @@ impl ApiTester {
assert_eq!(
self.validator_store
.get_builder_proposals_testing_only(&validator.voting_pubkey),
.get_builder_proposals_testing_only(&validator.voting_pubkey)
.await,
builder_proposals
);
@@ -685,7 +694,8 @@ impl ApiTester {
assert_eq!(
self.validator_store
.get_builder_boost_factor_testing_only(&validator.voting_pubkey),
.get_builder_boost_factor_testing_only(&validator.voting_pubkey)
.await,
builder_boost_factor
);
@@ -701,17 +711,22 @@ impl ApiTester {
assert_eq!(
self.validator_store
.determine_builder_boost_factor(&validator.voting_pubkey),
.determine_builder_boost_factor(&validator.voting_pubkey)
.await,
builder_boost_factor
);
self
}
pub fn assert_default_builder_boost_factor(self, builder_boost_factor: Option<u64>) -> Self {
pub async fn assert_default_builder_boost_factor(
self,
builder_boost_factor: Option<u64>,
) -> Self {
assert_eq!(
self.validator_store
.determine_builder_boost_factor(&PublicKeyBytes::empty()),
.determine_builder_boost_factor(&PublicKeyBytes::empty())
.await,
builder_boost_factor
);
@@ -727,7 +742,8 @@ impl ApiTester {
assert_eq!(
self.validator_store
.get_prefer_builder_proposals_testing_only(&validator.voting_pubkey),
.get_prefer_builder_proposals_testing_only(&validator.voting_pubkey)
.await,
prefer_builder_proposals
);
@@ -757,7 +773,9 @@ impl ApiTester {
let validator = &self.client.get_lighthouse_validators().await.unwrap().data[index];
let graffiti_str = GraffitiString::from_str(graffiti).unwrap();
assert_eq!(
self.validator_store.graffiti(&validator.voting_pubkey),
self.validator_store
.graffiti(&validator.voting_pubkey)
.await,
Some(graffiti_str.into())
);

View File

@@ -2152,6 +2152,7 @@ async fn import_remotekey_web3signer_enabled() {
let web3_vals = tester
.initialized_validators
.read()
.await
.validator_definitions()
.to_vec();
@@ -2171,7 +2172,7 @@ async fn import_remotekey_web3signer_enabled() {
assert_eq!(tester.vals_total(), 1);
assert_eq!(tester.vals_enabled(), 1);
{
let vals = tester.initialized_validators.read();
let vals = tester.initialized_validators.read().await;
let remote_vals = vals.validator_definitions();
// Web3signer should not be overwritten since it is enabled.