diff --git a/Cargo.lock b/Cargo.lock index ff87c32783..6bd4ee2cb0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -9790,6 +9790,7 @@ dependencies = [ "parking_lot 0.12.3", "serde", "slot_clock", + "tokio", "tracing", "types", "validator_metrics", diff --git a/common/warp_utils/src/task.rs b/common/warp_utils/src/task.rs index e2fa4ebc36..0be3e4e1bb 100644 --- a/common/warp_utils/src/task.rs +++ b/common/warp_utils/src/task.rs @@ -1,5 +1,6 @@ use crate::reject::convert_rejection; use serde::Serialize; +use std::future::Future; use warp::reply::{Reply, Response}; /// A convenience wrapper around `blocking_task`. @@ -38,3 +39,13 @@ where convert_rejection(result).await } + +/// Async variant of `blocking_json_task`, which handles JSONification and error conversion. +pub async fn async_json_task(fut: F) -> Response +where + F: Future>, + T: Serialize + Send + 'static, +{ + let result = fut.await.map(|response| warp::reply::json(&response)); + convert_rejection(result).await +} diff --git a/validator_client/http_api/src/create_signed_voluntary_exit.rs b/validator_client/http_api/src/create_signed_voluntary_exit.rs index b536a6aa7a..2048f819f3 100644 --- a/validator_client/http_api/src/create_signed_voluntary_exit.rs +++ b/validator_client/http_api/src/create_signed_voluntary_exit.rs @@ -21,7 +21,7 @@ pub async fn create_signed_voluntary_exit( +pub async fn get_graffiti( validator_pubkey: PublicKey, validator_store: Arc>, graffiti_flag: Option, ) -> Result { let initialized_validators_rw_lock = validator_store.initialized_validators(); - let initialized_validators = initialized_validators_rw_lock.read(); + let initialized_validators = initialized_validators_rw_lock.read().await; match initialized_validators.validator(&validator_pubkey.compress()) { None => Err(warp_utils::reject::custom_not_found( "The key was not found on the server".to_string(), @@ -26,13 +26,13 @@ pub fn get_graffiti( } } -pub fn set_graffiti( +pub async fn set_graffiti( validator_pubkey: PublicKey, graffiti: GraffitiString, validator_store: Arc>, ) -> Result<(), warp::Rejection> { let initialized_validators_rw_lock = validator_store.initialized_validators(); - let mut initialized_validators = initialized_validators_rw_lock.write(); + let mut initialized_validators = initialized_validators_rw_lock.write().await; match initialized_validators.validator(&validator_pubkey.compress()) { None => Err(warp_utils::reject::custom_not_found( "The key was not found on the server, nothing to update".to_string(), @@ -53,12 +53,12 @@ pub fn set_graffiti( } } -pub fn delete_graffiti( +pub async fn delete_graffiti( validator_pubkey: PublicKey, validator_store: Arc>, ) -> Result<(), warp::Rejection> { let initialized_validators_rw_lock = validator_store.initialized_validators(); - let mut initialized_validators = initialized_validators_rw_lock.write(); + let mut initialized_validators = initialized_validators_rw_lock.write().await; match initialized_validators.validator(&validator_pubkey.compress()) { None => Err(warp_utils::reject::custom_not_found( "The key was not found on the server, nothing to delete".to_string(), diff --git a/validator_client/http_api/src/keystores.rs b/validator_client/http_api/src/keystores.rs index 302b21d7d8..3db85c7d2a 100644 --- a/validator_client/http_api/src/keystores.rs +++ b/validator_client/http_api/src/keystores.rs @@ -16,7 +16,6 @@ use slot_clock::SlotClock; use std::path::PathBuf; use std::sync::Arc; use task_executor::TaskExecutor; -use tokio::runtime::Handle; use tracing::{info, warn}; use types::{EthSpec, PublicKeyBytes}; use validator_dir::{keystore_password_path, Builder as ValidatorDirBuilder}; @@ -24,11 +23,11 @@ use warp::Rejection; use warp_utils::reject::{custom_bad_request, custom_server_error}; use zeroize::Zeroizing; -pub fn list( +pub async fn list( validator_store: Arc>, ) -> ListKeystoresResponse { let initialized_validators_rwlock = validator_store.initialized_validators(); - let initialized_validators = initialized_validators_rwlock.read(); + let initialized_validators = initialized_validators_rwlock.read().await; let keystores = initialized_validators .validator_definitions() @@ -58,12 +57,12 @@ pub fn list( ListKeystoresResponse { data: keystores } } -pub fn import( +pub async fn import( request: ImportKeystoresRequest, validator_dir: PathBuf, secrets_dir: Option, validator_store: Arc>, - task_executor: TaskExecutor, + _task_executor: TaskExecutor, ) -> Result { // Check request validity. This is the only cases in which we should return a 4xx code. if request.keystores.len() != request.passwords.len() { @@ -115,7 +114,7 @@ pub fn import( ImportKeystoreStatus::Error, format!("slashing protection import failed: {:?}", e), ) - } else if let Some(handle) = task_executor.handle() { + } else { // Import the keystore. match import_single_keystore::<_, E>( keystore, @@ -123,8 +122,9 @@ pub fn import( validator_dir.clone(), secrets_dir.clone(), &validator_store, - handle, - ) { + ) + .await + { Ok(status) => Status::ok(status), Err(e) => { warn!( @@ -135,11 +135,6 @@ pub fn import( Status::error(ImportKeystoreStatus::Error, e) } } - } else { - Status::error( - ImportKeystoreStatus::Error, - "validator client shutdown".into(), - ) }; statuses.push(status); } @@ -159,13 +154,12 @@ pub fn import( Ok(ImportKeystoresResponse { data: statuses }) } -fn import_single_keystore( +async fn import_single_keystore( keystore: Keystore, password: Zeroizing, validator_dir_path: PathBuf, secrets_dir: Option, validator_store: &LighthouseValidatorStore, - handle: Handle, ) -> Result { // Check if the validator key already exists, erroring if it is a remote signer validator. let pubkey = keystore @@ -174,6 +168,7 @@ fn import_single_keystore( if let Some(def) = validator_store .initialized_validators() .read() + .await .validator_definitions() .iter() .find(|def| def.voting_public_key == pubkey) @@ -215,8 +210,8 @@ fn import_single_keystore( let voting_keystore_path = validator_dir.voting_keystore_path(); drop(validator_dir); - handle - .block_on(validator_store.add_validator_keystore( + validator_store + .add_validator_keystore( voting_keystore_path, password_storage, true, @@ -226,18 +221,18 @@ fn import_single_keystore( None, None, None, - )) + ) + .await .map_err(|e| format!("failed to initialize validator: {:?}", e))?; Ok(ImportKeystoreStatus::Imported) } -pub fn delete( +pub async fn delete( request: DeleteKeystoresRequest, validator_store: Arc>, - task_executor: TaskExecutor, ) -> Result { - let export_response = export(request, validator_store, task_executor)?; + let export_response = export(request, validator_store).await?; // Check the status is Deleted to confirm deletion is successful, then only display the log let successful_deletion = export_response @@ -263,49 +258,40 @@ pub fn delete( }) } -pub fn export( +pub async fn export( request: DeleteKeystoresRequest, validator_store: Arc>, - task_executor: TaskExecutor, ) -> Result { // Remove from initialized validators. let initialized_validators_rwlock = validator_store.initialized_validators(); - let mut initialized_validators = initialized_validators_rwlock.write(); + let mut initialized_validators = initialized_validators_rwlock.write().await; - let mut responses = request - .pubkeys - .iter() - .map(|pubkey_bytes| { - match delete_single_keystore( - pubkey_bytes, - &mut initialized_validators, - task_executor.clone(), - ) { - Ok(status) => status, - Err(error) => { - warn!( - pubkey = ?pubkey_bytes, - ?error, - "Error deleting keystore" - ); - SingleExportKeystoresResponse { - status: Status::error(DeleteKeystoreStatus::Error, error), - validating_keystore: None, - validating_keystore_password: None, - } - } + let mut responses = vec![]; + for pubkey_bytes in &request.pubkeys { + match delete_single_keystore(pubkey_bytes, &mut initialized_validators).await { + Ok(status) => responses.push(status), + Err(error) => { + warn!( + pubkey = ?pubkey_bytes, + ?error, + "Error deleting keystore" + ); + responses.push(SingleExportKeystoresResponse { + status: Status::error(DeleteKeystoreStatus::Error, error), + validating_keystore: None, + validating_keystore_password: None, + }) } - }) - .collect::>(); + } + } // Use `update_validators` to update the key cache. It is safe to let the key cache get a bit out // of date as it resets when it can't be decrypted. We update it just a single time to avoid // continually resetting it after each key deletion. - if let Some(handle) = task_executor.handle() { - handle - .block_on(initialized_validators.update_validators()) - .map_err(|e| custom_server_error(format!("unable to update key cache: {:?}", e)))?; - } + initialized_validators + .update_validators() + .await + .map_err(|e| custom_server_error(format!("unable to update key cache: {:?}", e)))?; // Export the slashing protection data. let slashing_protection = validator_store @@ -332,38 +318,35 @@ pub fn export( }) } -fn delete_single_keystore( +async fn delete_single_keystore( pubkey_bytes: &PublicKeyBytes, initialized_validators: &mut InitializedValidators, - task_executor: TaskExecutor, ) -> Result { - if let Some(handle) = task_executor.handle() { - let pubkey = pubkey_bytes - .decompress() - .map_err(|e| format!("invalid pubkey, {:?}: {:?}", pubkey_bytes, e))?; + let pubkey = pubkey_bytes + .decompress() + .map_err(|e| format!("invalid pubkey, {:?}: {:?}", pubkey_bytes, e))?; - match handle.block_on(initialized_validators.delete_definition_and_keystore(&pubkey, true)) - { - Ok(Some(keystore_and_password)) => Ok(SingleExportKeystoresResponse { - status: Status::ok(DeleteKeystoreStatus::Deleted), - validating_keystore: Some(KeystoreJsonStr(keystore_and_password.keystore)), - validating_keystore_password: keystore_and_password.password, - }), - Ok(None) => Ok(SingleExportKeystoresResponse { - status: Status::ok(DeleteKeystoreStatus::Deleted), + match initialized_validators + .delete_definition_and_keystore(&pubkey, true) + .await + { + Ok(Some(keystore_and_password)) => Ok(SingleExportKeystoresResponse { + status: Status::ok(DeleteKeystoreStatus::Deleted), + validating_keystore: Some(KeystoreJsonStr(keystore_and_password.keystore)), + validating_keystore_password: keystore_and_password.password, + }), + Ok(None) => Ok(SingleExportKeystoresResponse { + status: Status::ok(DeleteKeystoreStatus::Deleted), + validating_keystore: None, + validating_keystore_password: None, + }), + Err(e) => match e { + Error::ValidatorNotInitialized(_) => Ok(SingleExportKeystoresResponse { + status: Status::ok(DeleteKeystoreStatus::NotFound), validating_keystore: None, validating_keystore_password: None, }), - Err(e) => match e { - Error::ValidatorNotInitialized(_) => Ok(SingleExportKeystoresResponse { - status: Status::ok(DeleteKeystoreStatus::NotFound), - validating_keystore: None, - validating_keystore_password: None, - }), - _ => Err(format!("unable to disable and delete: {:?}", e)), - }, - } - } else { - Err("validator client shutdown".into()) + _ => Err(format!("unable to disable and delete: {:?}", e)), + }, } } diff --git a/validator_client/http_api/src/lib.rs b/validator_client/http_api/src/lib.rs index aebe179567..ce0ffa5da9 100644 --- a/validator_client/http_api/src/lib.rs +++ b/validator_client/http_api/src/lib.rs @@ -54,7 +54,7 @@ use types::{ChainSpec, ConfigAndPreset, EthSpec}; use validator_dir::Builder as ValidatorDirBuilder; use validator_services::block_service::BlockService; use warp::{sse::Event, Filter}; -use warp_utils::task::blocking_json_task; +use warp_utils::task::{async_json_task, blocking_json_task}; #[derive(Debug)] pub enum Error { @@ -320,10 +320,11 @@ pub fn serve( .and(warp::path::end()) .and(validator_store_filter.clone()) .then(|validator_store: Arc>| { - blocking_json_task(move || { + async_json_task(async move { let validators = validator_store .initialized_validators() .read() + .await .validator_definitions() .iter() .map(|def| api_types::ValidatorData { @@ -345,10 +346,11 @@ pub fn serve( .and(validator_store_filter.clone()) .then( |validator_pubkey: PublicKey, validator_store: Arc>| { - blocking_json_task(move || { + async_json_task(async move { let validator = validator_store .initialized_validators() .read() + .await .validator_definitions() .iter() .find(|def| def.voting_public_key == validator_pubkey) @@ -397,11 +399,12 @@ pub fn serve( |validator_store: Arc>, graffiti_file: Option, graffiti_flag: Option| { - blocking_json_task(move || { + async_json_task(async move { let mut result = HashMap::new(); for (key, graffiti_definition) in validator_store .initialized_validators() .read() + .await .get_all_validators_graffiti() { let graffiti = determine_graffiti( @@ -697,12 +700,13 @@ pub fn serve( .and(graffiti_file_filter.clone()) .and(task_executor_filter.clone()) .then( + // FIXME(sproul): do we need task_executor for anything here? |validator_pubkey: PublicKey, body: api_types::ValidatorPatchRequest, validator_store: Arc>, graffiti_file: Option, - task_executor: TaskExecutor| { - blocking_json_task(move || { + _task_executor: TaskExecutor| { + async_json_task(async move { if body.graffiti.is_some() && graffiti_file.is_some() { return Err(warp_utils::reject::custom_bad_request( "Unable to update graffiti as the \"--graffiti-file\" flag is set" @@ -711,8 +715,8 @@ pub fn serve( } let maybe_graffiti = body.graffiti.clone().map(Into::into); - let initialized_validators_rw_lock = validator_store.initialized_validators(); - let initialized_validators = initialized_validators_rw_lock.upgradable_read(); + let initialized_validators_lock = validator_store.initialized_validators(); + let mut initialized_validators = initialized_validators_lock.write().await; // Do not make any changes if all fields are identical or unchanged. fn equal_or_none( @@ -770,38 +774,24 @@ pub fn serve( Ok(()) } (Some(_), _) => { - // Upgrade read lock only in the case where a write is actually - // required. - let mut initialized_validators_write = - parking_lot::RwLockUpgradableReadGuard::upgrade( - initialized_validators, - ); - if let Some(handle) = task_executor.handle() { - handle - .block_on( - initialized_validators_write - .set_validator_definition_fields( - &validator_pubkey, - body.enabled, - body.gas_limit, - body.builder_proposals, - body.builder_boost_factor, - body.prefer_builder_proposals, - body.graffiti, - ), - ) - .map_err(|e| { - warp_utils::reject::custom_server_error(format!( - "unable to set validator status: {:?}", - e - )) - })?; - Ok(()) - } else { - Err(warp_utils::reject::custom_server_error( - "Lighthouse shutting down".into(), - )) - } + initialized_validators + .set_validator_definition_fields( + &validator_pubkey, + body.enabled, + body.gas_limit, + body.builder_proposals, + body.builder_boost_factor, + body.prefer_builder_proposals, + body.graffiti, + ) + .await + .map_err(|e| { + warp_utils::reject::custom_server_error(format!( + "unable to set validator status: {:?}", + e + )) + })?; + Ok(()) } } }) @@ -827,10 +817,10 @@ pub fn serve( .and(warp::body::json()) .and(validator_store_filter.clone()) .and(task_executor_filter.clone()) - .then(move |request, validator_store, task_executor| { - blocking_json_task(move || { + .then(move |request, validator_store, _task_executor| { + async_json_task(async move { if allow_keystore_export { - keystores::export(request, validator_store, task_executor) + keystores::export(request, validator_store).await } else { Err(warp_utils::reject::custom_bad_request( "keystore export is disabled".to_string(), @@ -853,10 +843,11 @@ pub fn serve( .and(validator_store_filter.clone()) .then( |validator_pubkey: PublicKey, validator_store: Arc>| { - blocking_json_task(move || { + async_json_task(async move { if validator_store .initialized_validators() .read() + .await .is_enabled(&validator_pubkey) .is_none() { @@ -867,6 +858,7 @@ pub fn serve( } validator_store .get_fee_recipient(&PublicKeyBytes::from(&validator_pubkey)) + .await .map(|fee_recipient| { GenericResponse::from(GetFeeRecipientResponse { pubkey: PublicKeyBytes::from(validator_pubkey.clone()), @@ -894,10 +886,11 @@ pub fn serve( |validator_pubkey: PublicKey, request: api_types::UpdateFeeRecipientRequest, validator_store: Arc>| { - blocking_json_task(move || { + async_json_task(async move { if validator_store .initialized_validators() .read() + .await .is_enabled(&validator_pubkey) .is_none() { @@ -909,6 +902,7 @@ pub fn serve( validator_store .initialized_validators() .write() + .await .set_validator_fee_recipient(&validator_pubkey, request.ethaddress) .map_err(|e| { warp_utils::reject::custom_server_error(format!( @@ -919,7 +913,7 @@ pub fn serve( }) }, ) - .map(|reply| warp::reply::with_status(reply, warp::http::StatusCode::ACCEPTED)); + .map(|response| warp::reply::with_status(response, warp::http::StatusCode::ACCEPTED)); // DELETE /eth/v1/validator/{pubkey}/feerecipient let delete_fee_recipient = eth_v1 @@ -930,10 +924,11 @@ pub fn serve( .and(validator_store_filter.clone()) .then( |validator_pubkey: PublicKey, validator_store: Arc>| { - blocking_json_task(move || { + async_json_task(async move { if validator_store .initialized_validators() .read() + .await .is_enabled(&validator_pubkey) .is_none() { @@ -945,6 +940,7 @@ pub fn serve( validator_store .initialized_validators() .write() + .await .delete_validator_fee_recipient(&validator_pubkey) .map_err(|e| { warp_utils::reject::custom_server_error(format!( @@ -966,10 +962,11 @@ pub fn serve( .and(validator_store_filter.clone()) .then( |validator_pubkey: PublicKey, validator_store: Arc>| { - blocking_json_task(move || { + async_json_task(async move { if validator_store .initialized_validators() .read() + .await .is_enabled(&validator_pubkey) .is_none() { @@ -981,7 +978,8 @@ pub fn serve( Ok(GenericResponse::from(GetGasLimitResponse { pubkey: PublicKeyBytes::from(validator_pubkey.clone()), gas_limit: validator_store - .get_gas_limit(&PublicKeyBytes::from(&validator_pubkey)), + .get_gas_limit(&PublicKeyBytes::from(&validator_pubkey)) + .await, })) }) }, @@ -999,10 +997,11 @@ pub fn serve( |validator_pubkey: PublicKey, request: api_types::UpdateGasLimitRequest, validator_store: Arc>| { - blocking_json_task(move || { + async_json_task(async move { if validator_store .initialized_validators() .read() + .await .is_enabled(&validator_pubkey) .is_none() { @@ -1014,6 +1013,7 @@ pub fn serve( validator_store .initialized_validators() .write() + .await .set_validator_gas_limit(&validator_pubkey, request.gas_limit) .map_err(|e| { warp_utils::reject::custom_server_error(format!( @@ -1035,10 +1035,11 @@ pub fn serve( .and(validator_store_filter.clone()) .then( |validator_pubkey: PublicKey, validator_store: Arc>| { - blocking_json_task(move || { + async_json_task(async move { if validator_store .initialized_validators() .read() + .await .is_enabled(&validator_pubkey) .is_none() { @@ -1050,6 +1051,7 @@ pub fn serve( validator_store .initialized_validators() .write() + .await .delete_validator_gas_limit(&validator_pubkey) .map_err(|e| { warp_utils::reject::custom_server_error(format!( @@ -1109,8 +1111,9 @@ pub fn serve( |pubkey: PublicKey, validator_store: Arc>, graffiti_flag: Option| { - blocking_json_task(move || { - let graffiti = get_graffiti(pubkey.clone(), validator_store, graffiti_flag)?; + async_json_task(async move { + let graffiti = + get_graffiti(pubkey.clone(), validator_store, graffiti_flag).await?; Ok(GenericResponse::from(GetGraffitiResponse { pubkey: pubkey.into(), graffiti, @@ -1133,14 +1136,14 @@ pub fn serve( query: SetGraffitiRequest, validator_store: Arc>, graffiti_file: Option| { - blocking_json_task(move || { + async_json_task(async move { if graffiti_file.is_some() { return Err(warp_utils::reject::invalid_auth( "Unable to update graffiti as the \"--graffiti-file\" flag is set" .to_string(), )); } - set_graffiti(pubkey.clone(), query.graffiti, validator_store) + set_graffiti(pubkey.clone(), query.graffiti, validator_store).await }) }, ) @@ -1158,14 +1161,14 @@ pub fn serve( |pubkey: PublicKey, validator_store: Arc>, graffiti_file: Option| { - blocking_json_task(move || { + async_json_task(async move { if graffiti_file.is_some() { return Err(warp_utils::reject::invalid_auth( "Unable to delete graffiti as the \"--graffiti-file\" flag is set" .to_string(), )); } - delete_graffiti(pubkey.clone(), validator_store) + delete_graffiti(pubkey.clone(), validator_store).await }) }, ) @@ -1174,7 +1177,7 @@ pub fn serve( // GET /eth/v1/keystores let get_std_keystores = std_keystores.and(validator_store_filter.clone()).then( |validator_store: Arc>| { - blocking_json_task(move || Ok(keystores::list(validator_store))) + async_json_task(async move { Ok(keystores::list(validator_store).await) }) }, ); @@ -1188,7 +1191,7 @@ pub fn serve( .then( move |request, validator_dir, secrets_dir, validator_store, task_executor| { let secrets_dir = store_passwords_in_secrets_dir.then_some(secrets_dir); - blocking_json_task(move || { + async_json_task(async move { keystores::import::<_, E>( request, validator_dir, @@ -1196,6 +1199,7 @@ pub fn serve( validator_store, task_executor, ) + .await }) }, ); @@ -1204,15 +1208,14 @@ pub fn serve( let delete_std_keystores = std_keystores .and(warp::body::json()) .and(validator_store_filter.clone()) - .and(task_executor_filter.clone()) - .then(|request, validator_store, task_executor| { - blocking_json_task(move || keystores::delete(request, validator_store, task_executor)) + .then(|request, validator_store| { + async_json_task(async move { keystores::delete(request, validator_store).await }) }); // GET /eth/v1/remotekeys let get_std_remotekeys = std_remotekeys.and(validator_store_filter.clone()).then( |validator_store: Arc>| { - blocking_json_task(move || Ok(remotekeys::list(validator_store))) + async_json_task(async move { Ok(remotekeys::list(validator_store).await) }) }, ); @@ -1220,20 +1223,18 @@ pub fn serve( let post_std_remotekeys = std_remotekeys .and(warp::body::json()) .and(validator_store_filter.clone()) - .and(task_executor_filter.clone()) - .then(|request, validator_store, task_executor| { - blocking_json_task(move || { - remotekeys::import::<_, E>(request, validator_store, task_executor) - }) + .then(|request, validator_store| { + async_json_task( + async move { remotekeys::import::<_, E>(request, validator_store).await }, + ) }); // DELETE /eth/v1/remotekeys let delete_std_remotekeys = std_remotekeys .and(warp::body::json()) .and(validator_store_filter) - .and(task_executor_filter) - .then(|request, validator_store, task_executor| { - blocking_json_task(move || remotekeys::delete(request, validator_store, task_executor)) + .then(|request, validator_store| { + async_json_task(async move { remotekeys::delete(request, validator_store).await }) }); // Subscribe to get VC logs via Server side events diff --git a/validator_client/http_api/src/remotekeys.rs b/validator_client/http_api/src/remotekeys.rs index 5aa63baac3..4d6f59fba6 100644 --- a/validator_client/http_api/src/remotekeys.rs +++ b/validator_client/http_api/src/remotekeys.rs @@ -11,19 +11,17 @@ use initialized_validators::{Error, InitializedValidators}; use lighthouse_validator_store::LighthouseValidatorStore; use slot_clock::SlotClock; use std::sync::Arc; -use task_executor::TaskExecutor; -use tokio::runtime::Handle; use tracing::{info, warn}; use types::{EthSpec, PublicKeyBytes}; use url::Url; use warp::Rejection; use warp_utils::reject::custom_server_error; -pub fn list( +pub async fn list( validator_store: Arc>, ) -> ListRemotekeysResponse { let initialized_validators_rwlock = validator_store.initialized_validators(); - let initialized_validators = initialized_validators_rwlock.read(); + let initialized_validators = initialized_validators_rwlock.read().await; let keystores = initialized_validators .validator_definitions() @@ -48,10 +46,9 @@ pub fn list( ListRemotekeysResponse { data: keystores } } -pub fn import( +pub async fn import( request: ImportRemotekeysRequest, validator_store: Arc>, - task_executor: TaskExecutor, ) -> Result { info!( count = request.remote_keys.len(), @@ -61,40 +58,33 @@ pub fn import( let mut statuses = Vec::with_capacity(request.remote_keys.len()); for remotekey in request.remote_keys { - let status = if let Some(handle) = task_executor.handle() { - // Import the keystore. - match import_single_remotekey::<_, E>( - remotekey.pubkey, - remotekey.url, - &validator_store, - handle, - ) { - Ok(status) => Status::ok(status), - Err(e) => { - warn!( - pubkey = remotekey.pubkey.to_string(), - error = ?e, - "Error importing keystore, skipped" - ); - Status::error(ImportRemotekeyStatus::Error, e) - } + // Import the keystore. + let status = match import_single_remotekey::<_, E>( + remotekey.pubkey, + remotekey.url, + &validator_store, + ) + .await + { + Ok(status) => Status::ok(status), + Err(e) => { + warn!( + pubkey = remotekey.pubkey.to_string(), + error = ?e, + "Error importing keystore, skipped" + ); + Status::error(ImportRemotekeyStatus::Error, e) } - } else { - Status::error( - ImportRemotekeyStatus::Error, - "validator client shutdown".into(), - ) }; statuses.push(status); } Ok(ImportRemotekeysResponse { data: statuses }) } -fn import_single_remotekey( +async fn import_single_remotekey( pubkey: PublicKeyBytes, url: String, validator_store: &LighthouseValidatorStore, - handle: Handle, ) -> Result { if let Err(url_err) = Url::parse(&url) { return Err(format!("failed to parse remotekey URL: {}", url_err)); @@ -107,6 +97,7 @@ fn import_single_remotekey( if let Some(def) = validator_store .initialized_validators() .read() + .await .validator_definitions() .iter() .find(|def| def.voting_public_key == pubkey) @@ -138,17 +129,17 @@ fn import_single_remotekey( client_identity_password: None, }), }; - handle - .block_on(validator_store.add_validator(web3signer_validator)) + validator_store + .add_validator(web3signer_validator) + .await .map_err(|e| format!("failed to initialize validator: {:?}", e))?; Ok(ImportRemotekeyStatus::Imported) } -pub fn delete( +pub async fn delete( request: DeleteRemotekeysRequest, validator_store: Arc>, - task_executor: TaskExecutor, ) -> Result { info!( count = request.pubkeys.len(), @@ -156,61 +147,51 @@ pub fn delete( ); // Remove from initialized validators. let initialized_validators_rwlock = validator_store.initialized_validators(); - let mut initialized_validators = initialized_validators_rwlock.write(); + let mut initialized_validators = initialized_validators_rwlock.write().await; - let statuses = request - .pubkeys - .iter() - .map(|pubkey_bytes| { - match delete_single_remotekey( - pubkey_bytes, - &mut initialized_validators, - task_executor.clone(), - ) { - Ok(status) => Status::ok(status), - Err(error) => { - warn!( - pubkey = ?pubkey_bytes, - ?error, - "Error deleting keystore" - ); - Status::error(DeleteRemotekeyStatus::Error, error) - } + let mut statuses = vec![]; + + for pubkey_bytes in &request.pubkeys { + match delete_single_remotekey(pubkey_bytes, &mut initialized_validators).await { + Ok(status) => statuses.push(Status::ok(status)), + Err(error) => { + warn!( + pubkey = ?pubkey_bytes, + ?error, + "Error deleting keystore" + ); + statuses.push(Status::error(DeleteRemotekeyStatus::Error, error)) } - }) - .collect::>(); + } + } // Use `update_validators` to update the key cache. It is safe to let the key cache get a bit out // of date as it resets when it can't be decrypted. We update it just a single time to avoid // continually resetting it after each key deletion. - if let Some(handle) = task_executor.handle() { - handle - .block_on(initialized_validators.update_validators()) - .map_err(|e| custom_server_error(format!("unable to update key cache: {:?}", e)))?; - } + initialized_validators + .update_validators() + .await + .map_err(|e| custom_server_error(format!("unable to update key cache: {:?}", e)))?; Ok(DeleteRemotekeysResponse { data: statuses }) } -fn delete_single_remotekey( +async fn delete_single_remotekey( pubkey_bytes: &PublicKeyBytes, initialized_validators: &mut InitializedValidators, - task_executor: TaskExecutor, ) -> Result { - if let Some(handle) = task_executor.handle() { - let pubkey = pubkey_bytes - .decompress() - .map_err(|e| format!("invalid pubkey, {:?}: {:?}", pubkey_bytes, e))?; + let pubkey = pubkey_bytes + .decompress() + .map_err(|e| format!("invalid pubkey, {:?}: {:?}", pubkey_bytes, e))?; - match handle.block_on(initialized_validators.delete_definition_and_keystore(&pubkey, false)) - { - Ok(_) => Ok(DeleteRemotekeyStatus::Deleted), - Err(e) => match e { - Error::ValidatorNotInitialized(_) => Ok(DeleteRemotekeyStatus::NotFound), - _ => Err(format!("unable to disable and delete: {:?}", e)), - }, - } - } else { - Err("validator client shutdown".into()) + match initialized_validators + .delete_definition_and_keystore(&pubkey, false) + .await + { + Ok(_) => Ok(DeleteRemotekeyStatus::Deleted), + Err(e) => match e { + Error::ValidatorNotInitialized(_) => Ok(DeleteRemotekeyStatus::NotFound), + _ => Err(format!("unable to disable and delete: {:?}", e)), + }, } } diff --git a/validator_client/http_api/src/test_utils.rs b/validator_client/http_api/src/test_utils.rs index 08447a82ce..39c1c3345d 100644 --- a/validator_client/http_api/src/test_utils.rs +++ b/validator_client/http_api/src/test_utils.rs @@ -15,7 +15,6 @@ use eth2_keystore::KeystoreBuilder; use initialized_validators::key_cache::{KeyCache, CACHE_FILENAME}; use initialized_validators::{InitializedValidators, OnDecryptFailure}; use lighthouse_validator_store::{Config as ValidatorStoreConfig, LighthouseValidatorStore}; -use parking_lot::RwLock; use sensitive_url::SensitiveUrl; use slashing_protection::{SlashingDatabase, SLASHING_PROTECTION_FILENAME}; use slot_clock::{SlotClock, TestingSlotClock}; @@ -25,7 +24,7 @@ use std::sync::Arc; use std::time::Duration; use task_executor::test_utils::TestRuntime; use tempfile::{tempdir, TempDir}; -use tokio::sync::oneshot; +use tokio::sync::{oneshot, RwLock}; use validator_services::block_service::BlockService; use zeroize::Zeroizing; @@ -114,6 +113,7 @@ impl ApiTester { validator_store .register_all_in_doppelganger_protection_if_enabled() + .await .expect("Should attach doppelganger service"); let initialized_validators = validator_store.initialized_validators(); @@ -189,6 +189,7 @@ impl ApiTester { self.initialized_validators .read() + .await .decrypt_key_cache(key_cache, &mut <_>::default(), OnDecryptFailure::Error) .await .expect("key cache should decypt"); @@ -280,27 +281,27 @@ impl ApiTester { self } - pub fn vals_total(&self) -> usize { - self.initialized_validators.read().num_total() + pub async fn vals_total(&self) -> usize { + self.initialized_validators.read().await.num_total() } - pub fn vals_enabled(&self) -> usize { - self.initialized_validators.read().num_enabled() + pub async fn vals_enabled(&self) -> usize { + self.initialized_validators.read().await.num_enabled() } - pub fn assert_enabled_validators_count(self, count: usize) -> Self { - assert_eq!(self.vals_enabled(), count); + pub async fn assert_enabled_validators_count(self, count: usize) -> Self { + assert_eq!(self.vals_enabled().await, count); self } - pub fn assert_validators_count(self, count: usize) -> Self { - assert_eq!(self.vals_total(), count); + pub async fn assert_validators_count(self, count: usize) -> Self { + assert_eq!(self.vals_total().await, count); self } pub async fn create_hd_validators(self, s: HdValidatorScenario) -> Self { - let initial_vals = self.vals_total(); - let initial_enabled_vals = self.vals_enabled(); + let initial_vals = self.vals_total().await; + let initial_enabled_vals = self.vals_enabled().await; let validators = (0..s.count) .map(|i| ValidatorRequest { @@ -346,15 +347,15 @@ impl ApiTester { }; assert_eq!(response.len(), s.count); - assert_eq!(self.vals_total(), initial_vals + s.count); + assert_eq!(self.vals_total().await, initial_vals + s.count); assert_eq!( - self.vals_enabled(), + self.vals_enabled().await, initial_enabled_vals + s.count - s.disabled.len() ); let server_vals = self.client.get_lighthouse_validators().await.unwrap().data; - assert_eq!(server_vals.len(), self.vals_total()); + assert_eq!(server_vals.len(), self.vals_total().await); // Ensure the server lists all of these newly created validators. for validator in &response { @@ -423,8 +424,8 @@ impl ApiTester { } pub async fn create_keystore_validators(self, s: KeystoreValidatorScenario) -> Self { - let initial_vals = self.vals_total(); - let initial_enabled_vals = self.vals_enabled(); + let initial_vals = self.vals_total().await; + let initial_enabled_vals = self.vals_enabled().await; let password = random_password(); let keypair = Keypair::random(); @@ -479,12 +480,15 @@ impl ApiTester { let num_enabled = s.enabled as usize; - assert_eq!(self.vals_total(), initial_vals + 1); - assert_eq!(self.vals_enabled(), initial_enabled_vals + num_enabled); + assert_eq!(self.vals_total().await, initial_vals + 1); + assert_eq!( + self.vals_enabled().await, + initial_enabled_vals + num_enabled + ); let server_vals = self.client.get_lighthouse_validators().await.unwrap().data; - assert_eq!(server_vals.len(), self.vals_total()); + assert_eq!(server_vals.len(), self.vals_total().await); assert_eq!(response.voting_pubkey, keypair.pk.into()); assert_eq!(response.enabled, s.enabled); @@ -493,8 +497,8 @@ impl ApiTester { } pub async fn create_web3signer_validators(self, s: Web3SignerValidatorScenario) -> Self { - let initial_vals = self.vals_total(); - let initial_enabled_vals = self.vals_enabled(); + let initial_vals = self.vals_total().await; + let initial_enabled_vals = self.vals_enabled().await; let request: Vec<_> = (0..s.count) .map(|i| { @@ -523,11 +527,11 @@ impl ApiTester { .await .unwrap(); - assert_eq!(self.vals_total(), initial_vals + s.count); + assert_eq!(self.vals_total().await, initial_vals + s.count); if s.enabled { - assert_eq!(self.vals_enabled(), initial_enabled_vals + s.count); + assert_eq!(self.vals_enabled().await, initial_enabled_vals + s.count); } else { - assert_eq!(self.vals_enabled(), initial_enabled_vals); + assert_eq!(self.vals_enabled().await, initial_enabled_vals); }; self @@ -552,6 +556,7 @@ impl ApiTester { assert_eq!( self.initialized_validators .read() + .await .is_enabled(&validator.voting_pubkey.decompress().unwrap()) .unwrap(), enabled @@ -606,7 +611,9 @@ impl ApiTester { let validator = &self.client.get_lighthouse_validators().await.unwrap().data[index]; assert_eq!( - self.validator_store.get_gas_limit(&validator.voting_pubkey), + self.validator_store + .get_gas_limit(&validator.voting_pubkey) + .await, gas_limit ); @@ -637,7 +644,8 @@ impl ApiTester { assert_eq!( self.validator_store - .get_builder_proposals_testing_only(&validator.voting_pubkey), + .get_builder_proposals_testing_only(&validator.voting_pubkey) + .await, builder_proposals ); diff --git a/validator_client/http_api/src/tests.rs b/validator_client/http_api/src/tests.rs index 4b1a3c0059..dde8ca61df 100644 --- a/validator_client/http_api/src/tests.rs +++ b/validator_client/http_api/src/tests.rs @@ -19,7 +19,6 @@ use eth2::{ }; use eth2_keystore::KeystoreBuilder; use lighthouse_validator_store::{Config as ValidatorStoreConfig, LighthouseValidatorStore}; -use parking_lot::RwLock; use sensitive_url::SensitiveUrl; use slashing_protection::{SlashingDatabase, SLASHING_PROTECTION_FILENAME}; use slot_clock::{SlotClock, TestingSlotClock}; @@ -30,6 +29,7 @@ use std::sync::Arc; use std::time::Duration; use task_executor::test_utils::TestRuntime; use tempfile::{tempdir, TempDir}; +use tokio::sync::RwLock; use types::graffiti::GraffitiString; use validator_store::ValidatorStore; use zeroize::Zeroizing; @@ -104,6 +104,7 @@ impl ApiTester { validator_store .register_all_in_doppelganger_protection_if_enabled() + .await .expect("Should attach doppelganger service"); let initialized_validators = validator_store.initialized_validators(); @@ -243,27 +244,27 @@ impl ApiTester { self } - pub fn vals_total(&self) -> usize { - self.initialized_validators.read().num_total() + pub async fn vals_total(&self) -> usize { + self.initialized_validators.read().await.num_total() } - pub fn vals_enabled(&self) -> usize { - self.initialized_validators.read().num_enabled() + pub async fn vals_enabled(&self) -> usize { + self.initialized_validators.read().await.num_enabled() } - pub fn assert_enabled_validators_count(self, count: usize) -> Self { - assert_eq!(self.vals_enabled(), count); + pub async fn assert_enabled_validators_count(self, count: usize) -> Self { + assert_eq!(self.vals_enabled().await, count); self } - pub fn assert_validators_count(self, count: usize) -> Self { - assert_eq!(self.vals_total(), count); + pub async fn assert_validators_count(self, count: usize) -> Self { + assert_eq!(self.vals_total().await, count); self } pub async fn create_hd_validators(self, s: HdValidatorScenario) -> Self { - let initial_vals = self.vals_total(); - let initial_enabled_vals = self.vals_enabled(); + let initial_vals = self.vals_total().await; + let initial_enabled_vals = self.vals_enabled().await; let validators = (0..s.count) .map(|i| ValidatorRequest { @@ -309,15 +310,15 @@ impl ApiTester { }; assert_eq!(response.len(), s.count); - assert_eq!(self.vals_total(), initial_vals + s.count); + assert_eq!(self.vals_total().await, initial_vals + s.count); assert_eq!( - self.vals_enabled(), + self.vals_enabled().await, initial_enabled_vals + s.count - s.disabled.len() ); let server_vals = self.client.get_lighthouse_validators().await.unwrap().data; - assert_eq!(server_vals.len(), self.vals_total()); + assert_eq!(server_vals.len(), self.vals_total().await); // Ensure the server lists all of these newly created validators. for validator in &response { @@ -386,8 +387,8 @@ impl ApiTester { } pub async fn create_keystore_validators(self, s: KeystoreValidatorScenario) -> Self { - let initial_vals = self.vals_total(); - let initial_enabled_vals = self.vals_enabled(); + let initial_vals = self.vals_total().await; + let initial_enabled_vals = self.vals_enabled().await; let password = random_password(); let keypair = Keypair::random(); @@ -442,12 +443,15 @@ impl ApiTester { let num_enabled = s.enabled as usize; - assert_eq!(self.vals_total(), initial_vals + 1); - assert_eq!(self.vals_enabled(), initial_enabled_vals + num_enabled); + assert_eq!(self.vals_total().await, initial_vals + 1); + assert_eq!( + self.vals_enabled().await, + initial_enabled_vals + num_enabled + ); let server_vals = self.client.get_lighthouse_validators().await.unwrap().data; - assert_eq!(server_vals.len(), self.vals_total()); + assert_eq!(server_vals.len(), self.vals_total().await); assert_eq!(response.voting_pubkey, keypair.pk.into()); assert_eq!(response.enabled, s.enabled); @@ -486,11 +490,11 @@ impl ApiTester { .await .unwrap(); - assert_eq!(self.vals_total(), initial_vals + s.count); + assert_eq!(self.vals_total().await, initial_vals + s.count); if s.enabled { - assert_eq!(self.vals_enabled(), initial_enabled_vals + s.count); + assert_eq!(self.vals_enabled().await, initial_enabled_vals + s.count); } else { - assert_eq!(self.vals_enabled(), initial_enabled_vals); + assert_eq!(self.vals_enabled().await, initial_enabled_vals); }; self @@ -501,6 +505,7 @@ impl ApiTester { // manually setting validator index in `ValidatorStore` self.initialized_validators .write() + .await .set_index(&validator.voting_pubkey, 0); let expected_exit_epoch = maybe_epoch.unwrap_or_else(|| self.get_current_epoch()); @@ -542,6 +547,7 @@ impl ApiTester { assert_eq!( self.initialized_validators .read() + .await .is_enabled(&validator.voting_pubkey.decompress().unwrap()) .unwrap(), enabled @@ -596,7 +602,9 @@ impl ApiTester { let validator = &self.client.get_lighthouse_validators().await.unwrap().data[index]; assert_eq!( - self.validator_store.get_gas_limit(&validator.voting_pubkey), + self.validator_store + .get_gas_limit(&validator.voting_pubkey) + .await, gas_limit ); @@ -669,7 +677,8 @@ impl ApiTester { assert_eq!( self.validator_store - .get_builder_proposals_testing_only(&validator.voting_pubkey), + .get_builder_proposals_testing_only(&validator.voting_pubkey) + .await, builder_proposals ); @@ -685,7 +694,8 @@ impl ApiTester { assert_eq!( self.validator_store - .get_builder_boost_factor_testing_only(&validator.voting_pubkey), + .get_builder_boost_factor_testing_only(&validator.voting_pubkey) + .await, builder_boost_factor ); @@ -701,17 +711,22 @@ impl ApiTester { assert_eq!( self.validator_store - .determine_builder_boost_factor(&validator.voting_pubkey), + .determine_builder_boost_factor(&validator.voting_pubkey) + .await, builder_boost_factor ); self } - pub fn assert_default_builder_boost_factor(self, builder_boost_factor: Option) -> Self { + pub async fn assert_default_builder_boost_factor( + self, + builder_boost_factor: Option, + ) -> Self { assert_eq!( self.validator_store - .determine_builder_boost_factor(&PublicKeyBytes::empty()), + .determine_builder_boost_factor(&PublicKeyBytes::empty()) + .await, builder_boost_factor ); @@ -727,7 +742,8 @@ impl ApiTester { assert_eq!( self.validator_store - .get_prefer_builder_proposals_testing_only(&validator.voting_pubkey), + .get_prefer_builder_proposals_testing_only(&validator.voting_pubkey) + .await, prefer_builder_proposals ); @@ -757,7 +773,9 @@ impl ApiTester { let validator = &self.client.get_lighthouse_validators().await.unwrap().data[index]; let graffiti_str = GraffitiString::from_str(graffiti).unwrap(); assert_eq!( - self.validator_store.graffiti(&validator.voting_pubkey), + self.validator_store + .graffiti(&validator.voting_pubkey) + .await, Some(graffiti_str.into()) ); diff --git a/validator_client/http_api/src/tests/keystores.rs b/validator_client/http_api/src/tests/keystores.rs index 37f7513f37..123deb9635 100644 --- a/validator_client/http_api/src/tests/keystores.rs +++ b/validator_client/http_api/src/tests/keystores.rs @@ -2152,6 +2152,7 @@ async fn import_remotekey_web3signer_enabled() { let web3_vals = tester .initialized_validators .read() + .await .validator_definitions() .to_vec(); @@ -2171,7 +2172,7 @@ async fn import_remotekey_web3signer_enabled() { assert_eq!(tester.vals_total(), 1); assert_eq!(tester.vals_enabled(), 1); { - let vals = tester.initialized_validators.read(); + let vals = tester.initialized_validators.read().await; let remote_vals = vals.validator_definitions(); // Web3signer should not be overwritten since it is enabled. diff --git a/validator_client/http_metrics/Cargo.toml b/validator_client/http_metrics/Cargo.toml index 24cbff7cde..e313e501b6 100644 --- a/validator_client/http_metrics/Cargo.toml +++ b/validator_client/http_metrics/Cargo.toml @@ -14,6 +14,7 @@ metrics = { workspace = true } parking_lot = { workspace = true } serde = { workspace = true } slot_clock = { workspace = true } +tokio = { workspace = true } tracing = { workspace = true } types = { workspace = true } validator_metrics = { workspace = true } diff --git a/validator_client/http_metrics/src/lib.rs b/validator_client/http_metrics/src/lib.rs index 7441939957..8e61f45389 100644 --- a/validator_client/http_metrics/src/lib.rs +++ b/validator_client/http_metrics/src/lib.rs @@ -6,13 +6,13 @@ use lighthouse_validator_store::LighthouseValidatorStore; use lighthouse_version::version_with_platform; use logging::crit; use malloc_utils::scrape_allocator_metrics; -use parking_lot::RwLock; use serde::{Deserialize, Serialize}; use slot_clock::{SlotClock, SystemTimeSlotClock}; use std::future::Future; use std::net::{IpAddr, Ipv4Addr, SocketAddr}; use std::sync::Arc; use std::time::{SystemTime, UNIX_EPOCH}; +use tokio::sync::RwLock; use tracing::info; use types::EthSpec; use validator_services::duties_service::DutiesService; @@ -124,6 +124,7 @@ pub fn serve( .and_then(|ctx: Arc>| async move { Ok::<_, warp::Rejection>( gather_prometheus_metrics(&ctx) + .await .map(|body| { Response::builder() .status(200) @@ -159,7 +160,7 @@ pub fn serve( Ok((listening_socket, server)) } -pub fn gather_prometheus_metrics( +pub async fn gather_prometheus_metrics( ctx: &Context, ) -> std::result::Result { use validator_metrics::*; @@ -167,7 +168,7 @@ pub fn gather_prometheus_metrics( let encoder = TextEncoder::new(); { - let shared = ctx.shared.read(); + let shared = ctx.shared.read().await; if let Some(genesis_time) = shared.genesis_time { if let Ok(now) = SystemTime::now().duration_since(UNIX_EPOCH) { @@ -184,17 +185,17 @@ pub fn gather_prometheus_metrics( set_int_gauge( &PROPOSER_COUNT, &[CURRENT_EPOCH], - duties_service.proposer_count(current_epoch) as i64, + duties_service.proposer_count(current_epoch).await as i64, ); set_int_gauge( &ATTESTER_COUNT, &[CURRENT_EPOCH], - duties_service.attester_count(current_epoch) as i64, + duties_service.attester_count(current_epoch).await as i64, ); set_int_gauge( &ATTESTER_COUNT, &[NEXT_EPOCH], - duties_service.attester_count(next_epoch) as i64, + duties_service.attester_count(next_epoch).await as i64, ); } } diff --git a/validator_client/src/lib.rs b/validator_client/src/lib.rs index 100f896f8e..2a11505c74 100644 --- a/validator_client/src/lib.rs +++ b/validator_client/src/lib.rs @@ -22,7 +22,6 @@ use eth2::{reqwest::ClientBuilder, BeaconNodeHttpClient, StatusCode, Timeouts}; use initialized_validators::Error::UnableToOpenVotingKeystore; use lighthouse_validator_store::LighthouseValidatorStore; use notifier::spawn_notifier; -use parking_lot::RwLock; use reqwest::Certificate; use slot_clock::SlotClock; use slot_clock::SystemTimeSlotClock; @@ -32,6 +31,7 @@ use std::net::SocketAddr; use std::path::Path; use std::sync::Arc; use std::time::{SystemTime, UNIX_EPOCH}; +use tokio::sync::RwLock; use tokio::{ sync::mpsc, time::{sleep, Duration}, @@ -390,7 +390,7 @@ impl ProductionValidatorClient { // Update the metrics server. if let Some(ctx) = &validator_metrics_ctx { - ctx.shared.write().genesis_time = Some(genesis_time); + ctx.shared.write().await.genesis_time = Some(genesis_time); } let slot_clock = SystemTimeSlotClock::new( @@ -426,10 +426,12 @@ impl ProductionValidatorClient { )); // Ensure all validators are registered in doppelganger protection. - validator_store.register_all_in_doppelganger_protection_if_enabled()?; + validator_store + .register_all_in_doppelganger_protection_if_enabled() + .await?; info!( - voting_validators = validator_store.num_voting_validators(), + voting_validators = validator_store.num_voting_validators().await, "Loaded validator keypair store" ); @@ -437,7 +439,9 @@ impl ProductionValidatorClient { // oversized from having not been pruned (by a prior version) we don't want to prune // concurrently, as it will hog the lock and cause the attestation service to spew CRITs. if let Some(slot) = slot_clock.now() { - validator_store.prune_slashing_protection_db(slot.epoch(E::slots_per_epoch()), true); + validator_store + .prune_slashing_protection_db(slot.epoch(E::slots_per_epoch()), true) + .await; } let duties_service = Arc::new( @@ -455,8 +459,8 @@ impl ProductionValidatorClient { // Update the metrics server. if let Some(ctx) = &validator_metrics_ctx { - ctx.shared.write().validator_store = Some(validator_store.clone()); - ctx.shared.write().duties_service = Some(duties_service.clone()); + ctx.shared.write().await.validator_store = Some(validator_store.clone()); + ctx.shared.write().await.duties_service = Some(duties_service.clone()); } let mut block_service_builder = BlockServiceBuilder::new() diff --git a/validator_client/src/notifier.rs b/validator_client/src/notifier.rs index 05f1c919d2..f72b802de4 100644 --- a/validator_client/src/notifier.rs +++ b/validator_client/src/notifier.rs @@ -104,10 +104,10 @@ async fn notify( if let Some(slot) = duties_service.slot_clock.now() { let epoch = slot.epoch(E::slots_per_epoch()); - let total_validators = duties_service.total_validator_count(); - let proposing_validators = duties_service.proposer_count(epoch); - let attesting_validators = duties_service.attester_count(epoch); - let doppelganger_detecting_validators = duties_service.doppelganger_detecting_count(); + let total_validators = duties_service.total_validator_count().await; + let proposing_validators = duties_service.proposer_count(epoch).await; + let attesting_validators = duties_service.attester_count(epoch).await; + let doppelganger_detecting_validators = duties_service.doppelganger_detecting_count().await; if doppelganger_detecting_validators > 0 { info!( diff --git a/validator_client/validator_services/src/duties_service.rs b/validator_client/validator_services/src/duties_service.rs index 5505f1b452..d41835a952 100644 --- a/validator_client/validator_services/src/duties_service.rs +++ b/validator_client/validator_services/src/duties_service.rs @@ -658,7 +658,8 @@ async fn poll_validator_indices( ); duties_service .validator_store - .set_validator_index(&pubkey, response.data.index); + .set_validator_index(&pubkey, response.data.index) + .await; duties_service .unknown_validator_next_poll_slots @@ -756,7 +757,7 @@ async fn poll_beacon_attesters PreparationService Result<(), String> { - let preparation_data = self.collect_preparation_data(spec); + let preparation_data = self.collect_preparation_data(spec).await; if !preparation_data.is_empty() { self.publish_preparation_data(preparation_data).await?; } @@ -262,7 +262,7 @@ impl PreparationService Vec { + async fn collect_preparation_data(&self, spec: &ChainSpec) -> Vec { self.collect_proposal_data(|pubkey, proposal_data| { if let Some(fee_recipient) = proposal_data.fee_recipient { Some(ProposerPreparationData { @@ -281,9 +281,10 @@ impl PreparationService Vec { + async fn collect_validator_registration_keys(&self) -> Vec { self.collect_proposal_data(|pubkey, proposal_data| { // Ignore fee recipients for keys without indices, they are inactive. proposal_data.validator_index?; @@ -300,6 +301,7 @@ impl PreparationService(&self, map_fn: G) -> Vec @@ -311,13 +313,15 @@ impl PreparationService PreparationService Result<(), String> { - let registration_keys = self.collect_validator_registration_keys(); + let registration_keys = self.collect_validator_registration_keys().await; let mut changed_keys = vec![]; diff --git a/validator_client/validator_store/src/lib.rs b/validator_client/validator_store/src/lib.rs index cbcbfda1c4..d356f7b342 100644 --- a/validator_client/validator_store/src/lib.rs +++ b/validator_client/validator_store/src/lib.rs @@ -83,7 +83,7 @@ pub trait ValidatorStore: Send + Sync { fn get_fee_recipient( &self, validator_pubkey: &PublicKeyBytes, - ) -> impl Future>; + ) -> impl Future> + Send + Sync; /// Translate the `builder_proposals`, `builder_boost_factor` and /// `prefer_builder_proposals` to a boost factor, if available. diff --git a/validator_manager/src/move_validators.rs b/validator_manager/src/move_validators.rs index abac071673..0ea2f01e8d 100644 --- a/validator_manager/src/move_validators.rs +++ b/validator_manager/src/move_validators.rs @@ -964,7 +964,7 @@ mod test { // helps test that we don't delete a password file if it's in use by // another validator. if let Some(primary_index) = self.reuse_password_files { - let mut initialized_validators = src_vc.initialized_validators.write(); + let mut initialized_validators = src_vc.initialized_validators.write().await; let definitions = initialized_validators.as_mut_slice_testing_only(); // Find the path of the "primary" definition. let primary_path = definitions @@ -1001,6 +1001,7 @@ mod test { let passwords = src_vc .initialized_validators .write() + .await .delete_passwords_from_validator_definitions() .unwrap();