Compare commits

...

8 Commits

Author SHA1 Message Date
Eitan Seri-Levi
9438ed7add test fixes 2025-05-10 22:07:46 -07:00
Eitan Seri-Levi
f5d801ce6b fmt 2025-05-10 11:51:53 -07:00
Eitan Seri-Levi
47aca45013 subte tracing thing 2025-05-10 11:46:57 -07:00
Michael Sproul
4d118a6ff2 More WIP 2025-05-08 20:52:50 +10:00
Michael Sproul
2e50c2daea WIP 2025-05-08 16:14:50 +10:00
Michael Sproul
eaca4dfa47 Grinding through cases 2025-05-07 15:36:05 +10:00
Michael Sproul
bd57ddf4c7 WIP async locks 2025-05-07 15:14:08 +10:00
Michael Sproul
50c7b2b8e9 Stuff 2025-05-07 15:13:01 +10:00
23 changed files with 723 additions and 510 deletions

1
Cargo.lock generated
View File

@@ -9790,6 +9790,7 @@ dependencies = [
"parking_lot 0.12.3",
"serde",
"slot_clock",
"tokio",
"tracing",
"types",
"validator_metrics",

View File

@@ -1,5 +1,6 @@
use crate::reject::convert_rejection;
use serde::Serialize;
use std::future::Future;
use warp::reply::{Reply, Response};
/// A convenience wrapper around `blocking_task`.
@@ -38,3 +39,13 @@ where
convert_rejection(result).await
}
/// Async variant of `blocking_json_task`, which handles JSONification and error conversion.
pub async fn async_json_task<F, T>(fut: F) -> Response
where
    F: Future<Output = Result<T, warp::Rejection>>,
    T: Serialize + Send + 'static,
{
    // Await the future, serialize a successful value to a JSON reply, and
    // convert any rejection into a response in one expression.
    convert_rejection(fut.await.map(|body| warp::reply::json(&body))).await
}

View File

@@ -221,7 +221,10 @@ impl DoppelgangerService {
V: ValidatorStore<E = E> + Send + Sync + 'static,
{
// Define the `get_index` function as one that uses the validator store.
let get_index = move |pubkey| validator_store.validator_index(&pubkey);
let get_index = move |pubkey| {
let inner_store = validator_store.clone();
async move { inner_store.clone().validator_index(&pubkey).await }
};
// Define the `get_liveness` function as one that queries the beacon node API.
let get_liveness = move |current_epoch, validator_indices| {
@@ -263,7 +266,7 @@ impl DoppelgangerService {
if let Some(slot) = slot_clock.now() {
if let Err(e) = service
.detect_doppelgangers::<E, _, _, _, _>(
.detect_doppelgangers::<E, _, _, _, _, _>(
slot,
&get_index,
&get_liveness,
@@ -355,7 +358,7 @@ impl DoppelgangerService {
/// This function is relatively complex when it comes to generic parameters. This is to allow
/// for simple unit testing. Using these generics, we can test the `DoppelgangerService` without
/// needing a BN API or a `ValidatorStore`.
async fn detect_doppelgangers<E, I, L, F, S>(
async fn detect_doppelgangers<E, I, L, F, G, S>(
&self,
request_slot: Slot,
get_index: &I,
@@ -364,13 +367,14 @@ impl DoppelgangerService {
) -> Result<(), String>
where
E: EthSpec,
I: Fn(PublicKeyBytes) -> Option<u64>,
I: Fn(PublicKeyBytes) -> G,
L: Fn(Epoch, Vec<u64>) -> F,
F: Future<Output = LivenessResponses>,
G: Future<Output = Option<u64>>,
S: FnMut(),
{
// Get all validators with active doppelganger protection.
let indices_map = self.compute_detection_indices_map(get_index);
let indices_map = self.compute_detection_indices_map(get_index).await;
if indices_map.is_empty() {
// Nothing to do.
@@ -397,9 +401,13 @@ impl DoppelgangerService {
/// further doppelganger checks.
///
/// Any validator with an unknown index will be omitted from these results.
fn compute_detection_indices_map<F>(&self, get_index: &F) -> HashMap<u64, PublicKeyBytes>
async fn compute_detection_indices_map<F, G>(
&self,
get_index: &F,
) -> HashMap<u64, PublicKeyBytes>
where
F: Fn(PublicKeyBytes) -> Option<u64>,
F: Fn(PublicKeyBytes) -> G,
G: Future<Output = Option<u64>>,
{
let detection_pubkeys = self
.doppelganger_states
@@ -421,7 +429,7 @@ impl DoppelgangerService {
// any other locks. That is why this is a separate loop to the one that generates
// `detection_pubkeys`.
for pubkey in detection_pubkeys {
if let Some(index) = get_index(pubkey) {
if let Some(index) = get_index(pubkey).await {
indices_map.insert(index, pubkey);
}
}
@@ -718,7 +726,7 @@ mod test {
self
}
pub fn assert_all_enabled(self) -> Self {
pub async fn assert_all_enabled(self) -> Self {
/*
* 1. Ensure all validators have the correct status.
*/
@@ -736,7 +744,11 @@ mod test {
let pubkey_to_index = self.pubkey_to_index_map();
let generated_map = self
.doppelganger
.compute_detection_indices_map(&|pubkey| pubkey_to_index.get(&pubkey).copied());
.compute_detection_indices_map(&|pubkey| {
let inner_map = pubkey_to_index.clone();
async move { inner_map.get(&pubkey).copied() }
})
.await;
assert!(
generated_map.is_empty(),
"there should be no indices for detection if all validators are enabled"
@@ -745,7 +757,7 @@ mod test {
self
}
pub fn assert_all_disabled(self) -> Self {
pub async fn assert_all_disabled(self) -> Self {
/*
* 1. Ensure all validators have the correct status.
*/
@@ -763,7 +775,11 @@ mod test {
let pubkey_to_index = self.pubkey_to_index_map();
let generated_map = self
.doppelganger
.compute_detection_indices_map(&|pubkey| pubkey_to_index.get(&pubkey).copied());
.compute_detection_indices_map(&|pubkey| {
let inner_map = pubkey_to_index.clone();
async move { inner_map.get(&pubkey).copied() }
})
.await;
assert_eq!(
pubkey_to_index.len(),
@@ -833,14 +849,15 @@ mod test {
}
}
#[test]
fn enabled_in_genesis_epoch() {
#[tokio::test]
async fn enabled_in_genesis_epoch() {
for slot in genesis_epoch().slot_iter(E::slots_per_epoch()) {
TestBuilder::default()
.build()
.set_slot(slot)
.register_all_in_doppelganger_protection_if_enabled()
.assert_all_enabled()
.await
.assert_all_states(&DoppelgangerState {
next_check_epoch: genesis_epoch() + 1,
remaining_epochs: 0,
@@ -848,8 +865,8 @@ mod test {
}
}
#[test]
fn disabled_after_genesis_epoch() {
#[tokio::test]
async fn disabled_after_genesis_epoch() {
let epoch = genesis_epoch() + 1;
for slot in epoch.slot_iter(E::slots_per_epoch()) {
@@ -858,6 +875,7 @@ mod test {
.set_slot(slot)
.register_all_in_doppelganger_protection_if_enabled()
.assert_all_disabled()
.await
.assert_all_states(&DoppelgangerState {
next_check_epoch: epoch + 1,
remaining_epochs: DEFAULT_REMAINING_DETECTION_EPOCHS,
@@ -865,8 +883,8 @@ mod test {
}
}
#[test]
fn unregistered_validator() {
#[tokio::test]
async fn unregistered_validator() {
// Non-genesis epoch
let epoch = genesis_epoch() + 2;
@@ -930,9 +948,12 @@ mod test {
// Create a simulated validator store that can resolve pubkeys to indices.
let pubkey_to_index = self.pubkey_to_index_map();
let get_index = |pubkey| pubkey_to_index.get(&pubkey).copied();
let get_index = |pubkey| {
let inner_map = pubkey_to_index.clone();
async move { inner_map.get(&pubkey).copied() }
};
block_on(self.doppelganger.detect_doppelgangers::<E, _, _, _, _>(
block_on(self.doppelganger.detect_doppelgangers::<E, _, _, _, _, _>(
slot,
&get_index,
&get_liveness,
@@ -950,8 +971,8 @@ mod test {
}
}
#[test]
fn detect_at_genesis() {
#[tokio::test]
async fn detect_at_genesis() {
let epoch = genesis_epoch();
let slot = epoch.start_slot(E::slots_per_epoch());
@@ -961,6 +982,7 @@ mod test {
.register_all_in_doppelganger_protection_if_enabled()
// All validators should have signing enabled since it's the genesis epoch.
.assert_all_enabled()
.await
.simulate_detect_doppelgangers(
slot,
ShouldShutdown::No,
@@ -973,10 +995,10 @@ mod test {
},
)
// All validators should be enabled.
.assert_all_enabled();
.assert_all_enabled().await;
}
fn detect_after_genesis_test<F>(mutate_responses: F)
async fn detect_after_genesis_test<F>(mutate_responses: F)
where
F: Fn(&mut LivenessResponses),
{
@@ -991,6 +1013,7 @@ mod test {
.set_slot(starting_slot)
.register_all_in_doppelganger_protection_if_enabled()
.assert_all_disabled()
.await
// First, simulate a check where there are no doppelgangers.
.simulate_detect_doppelgangers(
checking_slot,
@@ -1006,6 +1029,7 @@ mod test {
)
// All validators should be disabled since they started after genesis.
.assert_all_disabled()
.await
// Now, simulate a check where we apply `mutate_responses` which *must* create some
// doppelgangers.
.simulate_detect_doppelgangers(
@@ -1025,6 +1049,7 @@ mod test {
)
// All validators should still be disabled.
.assert_all_disabled()
.await
// The states of all validators should be jammed with `u64::MAX`.
.assert_all_states(&DoppelgangerState {
next_check_epoch: starting_epoch + 1,
@@ -1032,22 +1057,24 @@ mod test {
});
}
#[test]
fn detect_after_genesis_with_current_epoch_doppelganger() {
#[tokio::test]
async fn detect_after_genesis_with_current_epoch_doppelganger() {
detect_after_genesis_test(|liveness_responses| {
liveness_responses.current_epoch_responses[0].is_live = true
})
.await
}
#[test]
fn detect_after_genesis_with_previous_epoch_doppelganger() {
#[tokio::test]
async fn detect_after_genesis_with_previous_epoch_doppelganger() {
detect_after_genesis_test(|liveness_responses| {
liveness_responses.previous_epoch_responses[0].is_live = true
})
.await
}
#[test]
fn register_prior_to_genesis() {
#[tokio::test]
async fn register_prior_to_genesis() {
let prior_to_genesis = GENESIS_TIME.checked_sub(SLOT_DURATION).unwrap();
TestBuilder::default()
@@ -1057,8 +1084,8 @@ mod test {
.register_all_in_doppelganger_protection_if_enabled();
}
#[test]
fn detect_doppelganger_in_starting_epoch() {
#[tokio::test]
async fn detect_doppelganger_in_starting_epoch() {
let epoch = genesis_epoch() + 1;
let slot = epoch.start_slot(E::slots_per_epoch());
@@ -1067,6 +1094,7 @@ mod test {
.set_slot(slot)
.register_all_in_doppelganger_protection_if_enabled()
.assert_all_disabled()
.await
// First, simulate a check where there is a doppelganger in the starting epoch.
//
// This should *not* cause a shutdown since we don't declare a doppelganger in the
@@ -1088,14 +1116,15 @@ mod test {
},
)
.assert_all_disabled()
.await
.assert_all_states(&DoppelgangerState {
next_check_epoch: epoch + 1,
remaining_epochs: DEFAULT_REMAINING_DETECTION_EPOCHS,
});
}
#[test]
fn no_doppelgangers_for_adequate_time() {
#[tokio::test]
async fn no_doppelgangers_for_adequate_time() {
let initial_epoch = genesis_epoch() + 42;
let initial_slot = initial_epoch.start_slot(E::slots_per_epoch());
let activation_slot =
@@ -1105,7 +1134,8 @@ mod test {
.build()
.set_slot(initial_slot)
.register_all_in_doppelganger_protection_if_enabled()
.assert_all_disabled();
.assert_all_disabled()
.await;
for slot in initial_slot.as_u64()..=activation_slot.as_u64() {
let slot = Slot::new(slot);
@@ -1151,22 +1181,23 @@ mod test {
scenario = scenario.assert_all_states(&expected_state);
scenario = if slot < activation_slot {
scenario.assert_all_disabled()
scenario.assert_all_disabled().await
} else {
scenario.assert_all_enabled()
scenario.assert_all_enabled().await
};
}
scenario
.assert_all_enabled()
.await
.assert_all_states(&DoppelgangerState {
next_check_epoch: activation_slot.epoch(E::slots_per_epoch()),
remaining_epochs: 0,
});
}
#[test]
fn time_skips_forward_no_doppelgangers() {
#[tokio::test]
async fn time_skips_forward_no_doppelgangers() {
let initial_epoch = genesis_epoch() + 1;
let initial_slot = initial_epoch.start_slot(E::slots_per_epoch());
let skipped_forward_epoch = initial_epoch + 42;
@@ -1177,6 +1208,7 @@ mod test {
.set_slot(initial_slot)
.register_all_in_doppelganger_protection_if_enabled()
.assert_all_disabled()
.await
// First, simulate a check in the initialization epoch.
.simulate_detect_doppelgangers(
initial_slot,
@@ -1189,6 +1221,7 @@ mod test {
},
)
.assert_all_disabled()
.await
.assert_all_states(&DoppelgangerState {
next_check_epoch: initial_epoch + 1,
remaining_epochs: DEFAULT_REMAINING_DETECTION_EPOCHS,
@@ -1211,8 +1244,8 @@ mod test {
});
}
#[test]
fn time_skips_forward_with_doppelgangers() {
#[tokio::test]
async fn time_skips_forward_with_doppelgangers() {
let initial_epoch = genesis_epoch() + 1;
let initial_slot = initial_epoch.start_slot(E::slots_per_epoch());
let skipped_forward_epoch = initial_epoch + 42;
@@ -1223,6 +1256,7 @@ mod test {
.set_slot(initial_slot)
.register_all_in_doppelganger_protection_if_enabled()
.assert_all_disabled()
.await
// First, simulate a check in the initialization epoch.
.simulate_detect_doppelgangers(
initial_slot,
@@ -1235,6 +1269,7 @@ mod test {
},
)
.assert_all_disabled()
.await
.assert_all_states(&DoppelgangerState {
next_check_epoch: initial_epoch + 1,
remaining_epochs: DEFAULT_REMAINING_DETECTION_EPOCHS,
@@ -1261,8 +1296,8 @@ mod test {
});
}
#[test]
fn time_skips_backward() {
#[tokio::test]
async fn time_skips_backward() {
let initial_epoch = genesis_epoch() + 42;
let initial_slot = initial_epoch.start_slot(E::slots_per_epoch());
let skipped_backward_epoch = initial_epoch - 12;
@@ -1273,6 +1308,7 @@ mod test {
.set_slot(initial_slot)
.register_all_in_doppelganger_protection_if_enabled()
.assert_all_disabled()
.await
// First, simulate a check in the initialization epoch.
.simulate_detect_doppelgangers(
initial_slot,
@@ -1285,6 +1321,7 @@ mod test {
},
)
.assert_all_disabled()
.await
.assert_all_states(&DoppelgangerState {
next_check_epoch: initial_epoch + 1,
remaining_epochs: DEFAULT_REMAINING_DETECTION_EPOCHS,
@@ -1301,6 +1338,7 @@ mod test {
},
)
.assert_all_disabled()
.await
// When time skips backward we should *not* allow doppelganger advancement.
.assert_all_states(&DoppelgangerState {
next_check_epoch: initial_epoch + 1,
@@ -1308,8 +1346,8 @@ mod test {
});
}
#[test]
fn staggered_entry() {
#[tokio::test]
async fn staggered_entry() {
let early_epoch = genesis_epoch() + 42;
let early_slot = early_epoch.start_slot(E::slots_per_epoch());
let early_activation_slot =
@@ -1330,7 +1368,8 @@ mod test {
.register_validators(&early_validators)
.set_slot(late_slot)
.register_validators(&late_validators)
.assert_all_disabled();
.assert_all_disabled()
.await;
for slot in early_slot.as_u64()..=late_activation_slot.as_u64() {
let slot = Slot::new(slot);
@@ -1368,6 +1407,6 @@ mod test {
}
}
scenario.assert_all_enabled();
scenario.assert_all_enabled().await;
}
}

View File

@@ -21,7 +21,7 @@ pub async fn create_signed_voluntary_exit<T: 'static + SlotClock + Clone, E: Eth
};
let pubkey_bytes = PublicKeyBytes::from(pubkey);
if !validator_store.has_validator(&pubkey_bytes) {
if !validator_store.has_validator(&pubkey_bytes).await {
return Err(warp_utils::reject::custom_not_found(format!(
"{} is disabled or not managed by this validator client",
pubkey_bytes.as_hex_string()
@@ -30,6 +30,7 @@ pub async fn create_signed_voluntary_exit<T: 'static + SlotClock + Clone, E: Eth
let validator_index = validator_store
.validator_index(&pubkey_bytes)
.await
.ok_or_else(|| {
warp_utils::reject::custom_not_found(format!(
"The validator index for {} is not known. The validator client \

View File

@@ -4,13 +4,13 @@ use slot_clock::SlotClock;
use std::sync::Arc;
use types::{graffiti::GraffitiString, EthSpec, Graffiti};
pub fn get_graffiti<T: 'static + SlotClock + Clone, E: EthSpec>(
pub async fn get_graffiti<T: 'static + SlotClock + Clone, E: EthSpec>(
validator_pubkey: PublicKey,
validator_store: Arc<LighthouseValidatorStore<T, E>>,
graffiti_flag: Option<Graffiti>,
) -> Result<Graffiti, warp::Rejection> {
let initialized_validators_rw_lock = validator_store.initialized_validators();
let initialized_validators = initialized_validators_rw_lock.read();
let initialized_validators = initialized_validators_rw_lock.read().await;
match initialized_validators.validator(&validator_pubkey.compress()) {
None => Err(warp_utils::reject::custom_not_found(
"The key was not found on the server".to_string(),
@@ -26,13 +26,13 @@ pub fn get_graffiti<T: 'static + SlotClock + Clone, E: EthSpec>(
}
}
pub fn set_graffiti<T: 'static + SlotClock + Clone, E: EthSpec>(
pub async fn set_graffiti<T: 'static + SlotClock + Clone, E: EthSpec>(
validator_pubkey: PublicKey,
graffiti: GraffitiString,
validator_store: Arc<LighthouseValidatorStore<T, E>>,
) -> Result<(), warp::Rejection> {
let initialized_validators_rw_lock = validator_store.initialized_validators();
let mut initialized_validators = initialized_validators_rw_lock.write();
let mut initialized_validators = initialized_validators_rw_lock.write().await;
match initialized_validators.validator(&validator_pubkey.compress()) {
None => Err(warp_utils::reject::custom_not_found(
"The key was not found on the server, nothing to update".to_string(),
@@ -53,12 +53,12 @@ pub fn set_graffiti<T: 'static + SlotClock + Clone, E: EthSpec>(
}
}
pub fn delete_graffiti<T: 'static + SlotClock + Clone, E: EthSpec>(
pub async fn delete_graffiti<T: 'static + SlotClock + Clone, E: EthSpec>(
validator_pubkey: PublicKey,
validator_store: Arc<LighthouseValidatorStore<T, E>>,
) -> Result<(), warp::Rejection> {
let initialized_validators_rw_lock = validator_store.initialized_validators();
let mut initialized_validators = initialized_validators_rw_lock.write();
let mut initialized_validators = initialized_validators_rw_lock.write().await;
match initialized_validators.validator(&validator_pubkey.compress()) {
None => Err(warp_utils::reject::custom_not_found(
"The key was not found on the server, nothing to delete".to_string(),

View File

@@ -16,7 +16,6 @@ use slot_clock::SlotClock;
use std::path::PathBuf;
use std::sync::Arc;
use task_executor::TaskExecutor;
use tokio::runtime::Handle;
use tracing::{info, warn};
use types::{EthSpec, PublicKeyBytes};
use validator_dir::{keystore_password_path, Builder as ValidatorDirBuilder};
@@ -24,11 +23,11 @@ use warp::Rejection;
use warp_utils::reject::{custom_bad_request, custom_server_error};
use zeroize::Zeroizing;
pub fn list<T: SlotClock + 'static, E: EthSpec>(
pub async fn list<T: SlotClock + 'static, E: EthSpec>(
validator_store: Arc<LighthouseValidatorStore<T, E>>,
) -> ListKeystoresResponse {
let initialized_validators_rwlock = validator_store.initialized_validators();
let initialized_validators = initialized_validators_rwlock.read();
let initialized_validators = initialized_validators_rwlock.read().await;
let keystores = initialized_validators
.validator_definitions()
@@ -58,12 +57,12 @@ pub fn list<T: SlotClock + 'static, E: EthSpec>(
ListKeystoresResponse { data: keystores }
}
pub fn import<T: SlotClock + 'static, E: EthSpec>(
pub async fn import<T: SlotClock + 'static, E: EthSpec>(
request: ImportKeystoresRequest,
validator_dir: PathBuf,
secrets_dir: Option<PathBuf>,
validator_store: Arc<LighthouseValidatorStore<T, E>>,
task_executor: TaskExecutor,
_task_executor: TaskExecutor,
) -> Result<ImportKeystoresResponse, Rejection> {
// Check request validity. This is the only case in which we should return a 4xx code.
if request.keystores.len() != request.passwords.len() {
@@ -115,7 +114,7 @@ pub fn import<T: SlotClock + 'static, E: EthSpec>(
ImportKeystoreStatus::Error,
format!("slashing protection import failed: {:?}", e),
)
} else if let Some(handle) = task_executor.handle() {
} else {
// Import the keystore.
match import_single_keystore::<_, E>(
keystore,
@@ -123,8 +122,9 @@ pub fn import<T: SlotClock + 'static, E: EthSpec>(
validator_dir.clone(),
secrets_dir.clone(),
&validator_store,
handle,
) {
)
.await
{
Ok(status) => Status::ok(status),
Err(e) => {
warn!(
@@ -135,11 +135,6 @@ pub fn import<T: SlotClock + 'static, E: EthSpec>(
Status::error(ImportKeystoreStatus::Error, e)
}
}
} else {
Status::error(
ImportKeystoreStatus::Error,
"validator client shutdown".into(),
)
};
statuses.push(status);
}
@@ -159,13 +154,12 @@ pub fn import<T: SlotClock + 'static, E: EthSpec>(
Ok(ImportKeystoresResponse { data: statuses })
}
fn import_single_keystore<T: SlotClock + 'static, E: EthSpec>(
async fn import_single_keystore<T: SlotClock + 'static, E: EthSpec>(
keystore: Keystore,
password: Zeroizing<String>,
validator_dir_path: PathBuf,
secrets_dir: Option<PathBuf>,
validator_store: &LighthouseValidatorStore<T, E>,
handle: Handle,
) -> Result<ImportKeystoreStatus, String> {
// Check if the validator key already exists, erroring if it is a remote signer validator.
let pubkey = keystore
@@ -174,6 +168,7 @@ fn import_single_keystore<T: SlotClock + 'static, E: EthSpec>(
if let Some(def) = validator_store
.initialized_validators()
.read()
.await
.validator_definitions()
.iter()
.find(|def| def.voting_public_key == pubkey)
@@ -215,8 +210,8 @@ fn import_single_keystore<T: SlotClock + 'static, E: EthSpec>(
let voting_keystore_path = validator_dir.voting_keystore_path();
drop(validator_dir);
handle
.block_on(validator_store.add_validator_keystore(
validator_store
.add_validator_keystore(
voting_keystore_path,
password_storage,
true,
@@ -226,18 +221,18 @@ fn import_single_keystore<T: SlotClock + 'static, E: EthSpec>(
None,
None,
None,
))
)
.await
.map_err(|e| format!("failed to initialize validator: {:?}", e))?;
Ok(ImportKeystoreStatus::Imported)
}
pub fn delete<T: SlotClock + 'static, E: EthSpec>(
pub async fn delete<T: SlotClock + 'static, E: EthSpec>(
request: DeleteKeystoresRequest,
validator_store: Arc<LighthouseValidatorStore<T, E>>,
task_executor: TaskExecutor,
) -> Result<DeleteKeystoresResponse, Rejection> {
let export_response = export(request, validator_store, task_executor)?;
let export_response = export(request, validator_store).await?;
// Check the status is Deleted to confirm deletion is successful, then only display the log
let successful_deletion = export_response
@@ -263,49 +258,40 @@ pub fn delete<T: SlotClock + 'static, E: EthSpec>(
})
}
pub fn export<T: SlotClock + 'static, E: EthSpec>(
pub async fn export<T: SlotClock + 'static, E: EthSpec>(
request: DeleteKeystoresRequest,
validator_store: Arc<LighthouseValidatorStore<T, E>>,
task_executor: TaskExecutor,
) -> Result<ExportKeystoresResponse, Rejection> {
// Remove from initialized validators.
let initialized_validators_rwlock = validator_store.initialized_validators();
let mut initialized_validators = initialized_validators_rwlock.write();
let mut initialized_validators = initialized_validators_rwlock.write().await;
let mut responses = request
.pubkeys
.iter()
.map(|pubkey_bytes| {
match delete_single_keystore(
pubkey_bytes,
&mut initialized_validators,
task_executor.clone(),
) {
Ok(status) => status,
Err(error) => {
warn!(
pubkey = ?pubkey_bytes,
?error,
"Error deleting keystore"
);
SingleExportKeystoresResponse {
status: Status::error(DeleteKeystoreStatus::Error, error),
validating_keystore: None,
validating_keystore_password: None,
}
}
let mut responses = vec![];
for pubkey_bytes in &request.pubkeys {
match delete_single_keystore(pubkey_bytes, &mut initialized_validators).await {
Ok(status) => responses.push(status),
Err(error) => {
warn!(
pubkey = ?pubkey_bytes,
?error,
"Error deleting keystore"
);
responses.push(SingleExportKeystoresResponse {
status: Status::error(DeleteKeystoreStatus::Error, error),
validating_keystore: None,
validating_keystore_password: None,
})
}
})
.collect::<Vec<_>>();
}
}
// Use `update_validators` to update the key cache. It is safe to let the key cache get a bit out
// of date as it resets when it can't be decrypted. We update it just a single time to avoid
// continually resetting it after each key deletion.
if let Some(handle) = task_executor.handle() {
handle
.block_on(initialized_validators.update_validators())
.map_err(|e| custom_server_error(format!("unable to update key cache: {:?}", e)))?;
}
initialized_validators
.update_validators()
.await
.map_err(|e| custom_server_error(format!("unable to update key cache: {:?}", e)))?;
// Export the slashing protection data.
let slashing_protection = validator_store
@@ -332,38 +318,35 @@ pub fn export<T: SlotClock + 'static, E: EthSpec>(
})
}
fn delete_single_keystore(
async fn delete_single_keystore(
pubkey_bytes: &PublicKeyBytes,
initialized_validators: &mut InitializedValidators,
task_executor: TaskExecutor,
) -> Result<SingleExportKeystoresResponse, String> {
if let Some(handle) = task_executor.handle() {
let pubkey = pubkey_bytes
.decompress()
.map_err(|e| format!("invalid pubkey, {:?}: {:?}", pubkey_bytes, e))?;
let pubkey = pubkey_bytes
.decompress()
.map_err(|e| format!("invalid pubkey, {:?}: {:?}", pubkey_bytes, e))?;
match handle.block_on(initialized_validators.delete_definition_and_keystore(&pubkey, true))
{
Ok(Some(keystore_and_password)) => Ok(SingleExportKeystoresResponse {
status: Status::ok(DeleteKeystoreStatus::Deleted),
validating_keystore: Some(KeystoreJsonStr(keystore_and_password.keystore)),
validating_keystore_password: keystore_and_password.password,
}),
Ok(None) => Ok(SingleExportKeystoresResponse {
status: Status::ok(DeleteKeystoreStatus::Deleted),
match initialized_validators
.delete_definition_and_keystore(&pubkey, true)
.await
{
Ok(Some(keystore_and_password)) => Ok(SingleExportKeystoresResponse {
status: Status::ok(DeleteKeystoreStatus::Deleted),
validating_keystore: Some(KeystoreJsonStr(keystore_and_password.keystore)),
validating_keystore_password: keystore_and_password.password,
}),
Ok(None) => Ok(SingleExportKeystoresResponse {
status: Status::ok(DeleteKeystoreStatus::Deleted),
validating_keystore: None,
validating_keystore_password: None,
}),
Err(e) => match e {
Error::ValidatorNotInitialized(_) => Ok(SingleExportKeystoresResponse {
status: Status::ok(DeleteKeystoreStatus::NotFound),
validating_keystore: None,
validating_keystore_password: None,
}),
Err(e) => match e {
Error::ValidatorNotInitialized(_) => Ok(SingleExportKeystoresResponse {
status: Status::ok(DeleteKeystoreStatus::NotFound),
validating_keystore: None,
validating_keystore_password: None,
}),
_ => Err(format!("unable to disable and delete: {:?}", e)),
},
}
} else {
Err("validator client shutdown".into())
_ => Err(format!("unable to disable and delete: {:?}", e)),
},
}
}

View File

@@ -54,7 +54,7 @@ use types::{ChainSpec, ConfigAndPreset, EthSpec};
use validator_dir::Builder as ValidatorDirBuilder;
use validator_services::block_service::BlockService;
use warp::{sse::Event, Filter};
use warp_utils::task::blocking_json_task;
use warp_utils::task::{async_json_task, blocking_json_task};
#[derive(Debug)]
pub enum Error {
@@ -320,10 +320,11 @@ pub fn serve<T: 'static + SlotClock + Clone, E: EthSpec>(
.and(warp::path::end())
.and(validator_store_filter.clone())
.then(|validator_store: Arc<LighthouseValidatorStore<T, E>>| {
blocking_json_task(move || {
async_json_task(async move {
let validators = validator_store
.initialized_validators()
.read()
.await
.validator_definitions()
.iter()
.map(|def| api_types::ValidatorData {
@@ -345,10 +346,11 @@ pub fn serve<T: 'static + SlotClock + Clone, E: EthSpec>(
.and(validator_store_filter.clone())
.then(
|validator_pubkey: PublicKey, validator_store: Arc<LighthouseValidatorStore<T, E>>| {
blocking_json_task(move || {
async_json_task(async move {
let validator = validator_store
.initialized_validators()
.read()
.await
.validator_definitions()
.iter()
.find(|def| def.voting_public_key == validator_pubkey)
@@ -397,11 +399,12 @@ pub fn serve<T: 'static + SlotClock + Clone, E: EthSpec>(
|validator_store: Arc<LighthouseValidatorStore<T, E>>,
graffiti_file: Option<GraffitiFile>,
graffiti_flag: Option<Graffiti>| {
blocking_json_task(move || {
async_json_task(async move {
let mut result = HashMap::new();
for (key, graffiti_definition) in validator_store
.initialized_validators()
.read()
.await
.get_all_validators_graffiti()
{
let graffiti = determine_graffiti(
@@ -697,12 +700,13 @@ pub fn serve<T: 'static + SlotClock + Clone, E: EthSpec>(
.and(graffiti_file_filter.clone())
.and(task_executor_filter.clone())
.then(
// FIXME(sproul): do we need task_executor for anything here?
|validator_pubkey: PublicKey,
body: api_types::ValidatorPatchRequest,
validator_store: Arc<LighthouseValidatorStore<T, E>>,
graffiti_file: Option<GraffitiFile>,
task_executor: TaskExecutor| {
blocking_json_task(move || {
_task_executor: TaskExecutor| {
async_json_task(async move {
if body.graffiti.is_some() && graffiti_file.is_some() {
return Err(warp_utils::reject::custom_bad_request(
"Unable to update graffiti as the \"--graffiti-file\" flag is set"
@@ -711,8 +715,8 @@ pub fn serve<T: 'static + SlotClock + Clone, E: EthSpec>(
}
let maybe_graffiti = body.graffiti.clone().map(Into::into);
let initialized_validators_rw_lock = validator_store.initialized_validators();
let initialized_validators = initialized_validators_rw_lock.upgradable_read();
let initialized_validators_lock = validator_store.initialized_validators();
let mut initialized_validators = initialized_validators_lock.write().await;
// Do not make any changes if all fields are identical or unchanged.
fn equal_or_none<T: PartialEq>(
@@ -770,38 +774,24 @@ pub fn serve<T: 'static + SlotClock + Clone, E: EthSpec>(
Ok(())
}
(Some(_), _) => {
// Upgrade read lock only in the case where a write is actually
// required.
let mut initialized_validators_write =
parking_lot::RwLockUpgradableReadGuard::upgrade(
initialized_validators,
);
if let Some(handle) = task_executor.handle() {
handle
.block_on(
initialized_validators_write
.set_validator_definition_fields(
&validator_pubkey,
body.enabled,
body.gas_limit,
body.builder_proposals,
body.builder_boost_factor,
body.prefer_builder_proposals,
body.graffiti,
),
)
.map_err(|e| {
warp_utils::reject::custom_server_error(format!(
"unable to set validator status: {:?}",
e
))
})?;
Ok(())
} else {
Err(warp_utils::reject::custom_server_error(
"Lighthouse shutting down".into(),
))
}
initialized_validators
.set_validator_definition_fields(
&validator_pubkey,
body.enabled,
body.gas_limit,
body.builder_proposals,
body.builder_boost_factor,
body.prefer_builder_proposals,
body.graffiti,
)
.await
.map_err(|e| {
warp_utils::reject::custom_server_error(format!(
"unable to set validator status: {:?}",
e
))
})?;
Ok(())
}
}
})
@@ -827,10 +817,10 @@ pub fn serve<T: 'static + SlotClock + Clone, E: EthSpec>(
.and(warp::body::json())
.and(validator_store_filter.clone())
.and(task_executor_filter.clone())
.then(move |request, validator_store, task_executor| {
blocking_json_task(move || {
.then(move |request, validator_store, _task_executor| {
async_json_task(async move {
if allow_keystore_export {
keystores::export(request, validator_store, task_executor)
keystores::export(request, validator_store).await
} else {
Err(warp_utils::reject::custom_bad_request(
"keystore export is disabled".to_string(),
@@ -853,10 +843,11 @@ pub fn serve<T: 'static + SlotClock + Clone, E: EthSpec>(
.and(validator_store_filter.clone())
.then(
|validator_pubkey: PublicKey, validator_store: Arc<LighthouseValidatorStore<T, E>>| {
blocking_json_task(move || {
async_json_task(async move {
if validator_store
.initialized_validators()
.read()
.await
.is_enabled(&validator_pubkey)
.is_none()
{
@@ -867,6 +858,7 @@ pub fn serve<T: 'static + SlotClock + Clone, E: EthSpec>(
}
validator_store
.get_fee_recipient(&PublicKeyBytes::from(&validator_pubkey))
.await
.map(|fee_recipient| {
GenericResponse::from(GetFeeRecipientResponse {
pubkey: PublicKeyBytes::from(validator_pubkey.clone()),
@@ -894,10 +886,11 @@ pub fn serve<T: 'static + SlotClock + Clone, E: EthSpec>(
|validator_pubkey: PublicKey,
request: api_types::UpdateFeeRecipientRequest,
validator_store: Arc<LighthouseValidatorStore<T, E>>| {
blocking_json_task(move || {
async_json_task(async move {
if validator_store
.initialized_validators()
.read()
.await
.is_enabled(&validator_pubkey)
.is_none()
{
@@ -909,6 +902,7 @@ pub fn serve<T: 'static + SlotClock + Clone, E: EthSpec>(
validator_store
.initialized_validators()
.write()
.await
.set_validator_fee_recipient(&validator_pubkey, request.ethaddress)
.map_err(|e| {
warp_utils::reject::custom_server_error(format!(
@@ -919,7 +913,7 @@ pub fn serve<T: 'static + SlotClock + Clone, E: EthSpec>(
})
},
)
.map(|reply| warp::reply::with_status(reply, warp::http::StatusCode::ACCEPTED));
.map(|response| warp::reply::with_status(response, warp::http::StatusCode::ACCEPTED));
// DELETE /eth/v1/validator/{pubkey}/feerecipient
let delete_fee_recipient = eth_v1
@@ -930,10 +924,11 @@ pub fn serve<T: 'static + SlotClock + Clone, E: EthSpec>(
.and(validator_store_filter.clone())
.then(
|validator_pubkey: PublicKey, validator_store: Arc<LighthouseValidatorStore<T, E>>| {
blocking_json_task(move || {
async_json_task(async move {
if validator_store
.initialized_validators()
.read()
.await
.is_enabled(&validator_pubkey)
.is_none()
{
@@ -945,6 +940,7 @@ pub fn serve<T: 'static + SlotClock + Clone, E: EthSpec>(
validator_store
.initialized_validators()
.write()
.await
.delete_validator_fee_recipient(&validator_pubkey)
.map_err(|e| {
warp_utils::reject::custom_server_error(format!(
@@ -966,10 +962,11 @@ pub fn serve<T: 'static + SlotClock + Clone, E: EthSpec>(
.and(validator_store_filter.clone())
.then(
|validator_pubkey: PublicKey, validator_store: Arc<LighthouseValidatorStore<T, E>>| {
blocking_json_task(move || {
async_json_task(async move {
if validator_store
.initialized_validators()
.read()
.await
.is_enabled(&validator_pubkey)
.is_none()
{
@@ -981,7 +978,8 @@ pub fn serve<T: 'static + SlotClock + Clone, E: EthSpec>(
Ok(GenericResponse::from(GetGasLimitResponse {
pubkey: PublicKeyBytes::from(validator_pubkey.clone()),
gas_limit: validator_store
.get_gas_limit(&PublicKeyBytes::from(&validator_pubkey)),
.get_gas_limit(&PublicKeyBytes::from(&validator_pubkey))
.await,
}))
})
},
@@ -999,10 +997,11 @@ pub fn serve<T: 'static + SlotClock + Clone, E: EthSpec>(
|validator_pubkey: PublicKey,
request: api_types::UpdateGasLimitRequest,
validator_store: Arc<LighthouseValidatorStore<T, E>>| {
blocking_json_task(move || {
async_json_task(async move {
if validator_store
.initialized_validators()
.read()
.await
.is_enabled(&validator_pubkey)
.is_none()
{
@@ -1014,6 +1013,7 @@ pub fn serve<T: 'static + SlotClock + Clone, E: EthSpec>(
validator_store
.initialized_validators()
.write()
.await
.set_validator_gas_limit(&validator_pubkey, request.gas_limit)
.map_err(|e| {
warp_utils::reject::custom_server_error(format!(
@@ -1035,10 +1035,11 @@ pub fn serve<T: 'static + SlotClock + Clone, E: EthSpec>(
.and(validator_store_filter.clone())
.then(
|validator_pubkey: PublicKey, validator_store: Arc<LighthouseValidatorStore<T, E>>| {
blocking_json_task(move || {
async_json_task(async move {
if validator_store
.initialized_validators()
.read()
.await
.is_enabled(&validator_pubkey)
.is_none()
{
@@ -1050,6 +1051,7 @@ pub fn serve<T: 'static + SlotClock + Clone, E: EthSpec>(
validator_store
.initialized_validators()
.write()
.await
.delete_validator_gas_limit(&validator_pubkey)
.map_err(|e| {
warp_utils::reject::custom_server_error(format!(
@@ -1109,8 +1111,9 @@ pub fn serve<T: 'static + SlotClock + Clone, E: EthSpec>(
|pubkey: PublicKey,
validator_store: Arc<LighthouseValidatorStore<T, E>>,
graffiti_flag: Option<Graffiti>| {
blocking_json_task(move || {
let graffiti = get_graffiti(pubkey.clone(), validator_store, graffiti_flag)?;
async_json_task(async move {
let graffiti =
get_graffiti(pubkey.clone(), validator_store, graffiti_flag).await?;
Ok(GenericResponse::from(GetGraffitiResponse {
pubkey: pubkey.into(),
graffiti,
@@ -1133,14 +1136,14 @@ pub fn serve<T: 'static + SlotClock + Clone, E: EthSpec>(
query: SetGraffitiRequest,
validator_store: Arc<LighthouseValidatorStore<T, E>>,
graffiti_file: Option<GraffitiFile>| {
blocking_json_task(move || {
async_json_task(async move {
if graffiti_file.is_some() {
return Err(warp_utils::reject::invalid_auth(
"Unable to update graffiti as the \"--graffiti-file\" flag is set"
.to_string(),
));
}
set_graffiti(pubkey.clone(), query.graffiti, validator_store)
set_graffiti(pubkey.clone(), query.graffiti, validator_store).await
})
},
)
@@ -1158,14 +1161,14 @@ pub fn serve<T: 'static + SlotClock + Clone, E: EthSpec>(
|pubkey: PublicKey,
validator_store: Arc<LighthouseValidatorStore<T, E>>,
graffiti_file: Option<GraffitiFile>| {
blocking_json_task(move || {
async_json_task(async move {
if graffiti_file.is_some() {
return Err(warp_utils::reject::invalid_auth(
"Unable to delete graffiti as the \"--graffiti-file\" flag is set"
.to_string(),
));
}
delete_graffiti(pubkey.clone(), validator_store)
delete_graffiti(pubkey.clone(), validator_store).await
})
},
)
@@ -1174,7 +1177,7 @@ pub fn serve<T: 'static + SlotClock + Clone, E: EthSpec>(
// GET /eth/v1/keystores
let get_std_keystores = std_keystores.and(validator_store_filter.clone()).then(
|validator_store: Arc<LighthouseValidatorStore<T, E>>| {
blocking_json_task(move || Ok(keystores::list(validator_store)))
async_json_task(async move { Ok(keystores::list(validator_store).await) })
},
);
@@ -1188,7 +1191,7 @@ pub fn serve<T: 'static + SlotClock + Clone, E: EthSpec>(
.then(
move |request, validator_dir, secrets_dir, validator_store, task_executor| {
let secrets_dir = store_passwords_in_secrets_dir.then_some(secrets_dir);
blocking_json_task(move || {
async_json_task(async move {
keystores::import::<_, E>(
request,
validator_dir,
@@ -1196,6 +1199,7 @@ pub fn serve<T: 'static + SlotClock + Clone, E: EthSpec>(
validator_store,
task_executor,
)
.await
})
},
);
@@ -1204,15 +1208,14 @@ pub fn serve<T: 'static + SlotClock + Clone, E: EthSpec>(
let delete_std_keystores = std_keystores
.and(warp::body::json())
.and(validator_store_filter.clone())
.and(task_executor_filter.clone())
.then(|request, validator_store, task_executor| {
blocking_json_task(move || keystores::delete(request, validator_store, task_executor))
.then(|request, validator_store| {
async_json_task(async move { keystores::delete(request, validator_store).await })
});
// GET /eth/v1/remotekeys
let get_std_remotekeys = std_remotekeys.and(validator_store_filter.clone()).then(
|validator_store: Arc<LighthouseValidatorStore<T, E>>| {
blocking_json_task(move || Ok(remotekeys::list(validator_store)))
async_json_task(async move { Ok(remotekeys::list(validator_store).await) })
},
);
@@ -1220,20 +1223,18 @@ pub fn serve<T: 'static + SlotClock + Clone, E: EthSpec>(
let post_std_remotekeys = std_remotekeys
.and(warp::body::json())
.and(validator_store_filter.clone())
.and(task_executor_filter.clone())
.then(|request, validator_store, task_executor| {
blocking_json_task(move || {
remotekeys::import::<_, E>(request, validator_store, task_executor)
})
.then(|request, validator_store| {
async_json_task(
async move { remotekeys::import::<_, E>(request, validator_store).await },
)
});
// DELETE /eth/v1/remotekeys
let delete_std_remotekeys = std_remotekeys
.and(warp::body::json())
.and(validator_store_filter)
.and(task_executor_filter)
.then(|request, validator_store, task_executor| {
blocking_json_task(move || remotekeys::delete(request, validator_store, task_executor))
.then(|request, validator_store| {
async_json_task(async move { remotekeys::delete(request, validator_store).await })
});
// Subscribe to get VC logs via Server side events

View File

@@ -11,19 +11,17 @@ use initialized_validators::{Error, InitializedValidators};
use lighthouse_validator_store::LighthouseValidatorStore;
use slot_clock::SlotClock;
use std::sync::Arc;
use task_executor::TaskExecutor;
use tokio::runtime::Handle;
use tracing::{info, warn};
use types::{EthSpec, PublicKeyBytes};
use url::Url;
use warp::Rejection;
use warp_utils::reject::custom_server_error;
pub fn list<T: SlotClock + 'static, E: EthSpec>(
pub async fn list<T: SlotClock + 'static, E: EthSpec>(
validator_store: Arc<LighthouseValidatorStore<T, E>>,
) -> ListRemotekeysResponse {
let initialized_validators_rwlock = validator_store.initialized_validators();
let initialized_validators = initialized_validators_rwlock.read();
let initialized_validators = initialized_validators_rwlock.read().await;
let keystores = initialized_validators
.validator_definitions()
@@ -48,10 +46,9 @@ pub fn list<T: SlotClock + 'static, E: EthSpec>(
ListRemotekeysResponse { data: keystores }
}
pub fn import<T: SlotClock + 'static, E: EthSpec>(
pub async fn import<T: SlotClock + 'static, E: EthSpec>(
request: ImportRemotekeysRequest,
validator_store: Arc<LighthouseValidatorStore<T, E>>,
task_executor: TaskExecutor,
) -> Result<ImportRemotekeysResponse, Rejection> {
info!(
count = request.remote_keys.len(),
@@ -61,40 +58,33 @@ pub fn import<T: SlotClock + 'static, E: EthSpec>(
let mut statuses = Vec::with_capacity(request.remote_keys.len());
for remotekey in request.remote_keys {
let status = if let Some(handle) = task_executor.handle() {
// Import the keystore.
match import_single_remotekey::<_, E>(
remotekey.pubkey,
remotekey.url,
&validator_store,
handle,
) {
Ok(status) => Status::ok(status),
Err(e) => {
warn!(
pubkey = remotekey.pubkey.to_string(),
error = ?e,
"Error importing keystore, skipped"
);
Status::error(ImportRemotekeyStatus::Error, e)
}
// Import the keystore.
let status = match import_single_remotekey::<_, E>(
remotekey.pubkey,
remotekey.url,
&validator_store,
)
.await
{
Ok(status) => Status::ok(status),
Err(e) => {
warn!(
pubkey = remotekey.pubkey.to_string(),
error = ?e,
"Error importing keystore, skipped"
);
Status::error(ImportRemotekeyStatus::Error, e)
}
} else {
Status::error(
ImportRemotekeyStatus::Error,
"validator client shutdown".into(),
)
};
statuses.push(status);
}
Ok(ImportRemotekeysResponse { data: statuses })
}
fn import_single_remotekey<T: SlotClock + 'static, E: EthSpec>(
async fn import_single_remotekey<T: SlotClock + 'static, E: EthSpec>(
pubkey: PublicKeyBytes,
url: String,
validator_store: &LighthouseValidatorStore<T, E>,
handle: Handle,
) -> Result<ImportRemotekeyStatus, String> {
if let Err(url_err) = Url::parse(&url) {
return Err(format!("failed to parse remotekey URL: {}", url_err));
@@ -107,6 +97,7 @@ fn import_single_remotekey<T: SlotClock + 'static, E: EthSpec>(
if let Some(def) = validator_store
.initialized_validators()
.read()
.await
.validator_definitions()
.iter()
.find(|def| def.voting_public_key == pubkey)
@@ -138,17 +129,17 @@ fn import_single_remotekey<T: SlotClock + 'static, E: EthSpec>(
client_identity_password: None,
}),
};
handle
.block_on(validator_store.add_validator(web3signer_validator))
validator_store
.add_validator(web3signer_validator)
.await
.map_err(|e| format!("failed to initialize validator: {:?}", e))?;
Ok(ImportRemotekeyStatus::Imported)
}
pub fn delete<T: SlotClock + 'static, E: EthSpec>(
pub async fn delete<T: SlotClock + 'static, E: EthSpec>(
request: DeleteRemotekeysRequest,
validator_store: Arc<LighthouseValidatorStore<T, E>>,
task_executor: TaskExecutor,
) -> Result<DeleteRemotekeysResponse, Rejection> {
info!(
count = request.pubkeys.len(),
@@ -156,61 +147,51 @@ pub fn delete<T: SlotClock + 'static, E: EthSpec>(
);
// Remove from initialized validators.
let initialized_validators_rwlock = validator_store.initialized_validators();
let mut initialized_validators = initialized_validators_rwlock.write();
let mut initialized_validators = initialized_validators_rwlock.write().await;
let statuses = request
.pubkeys
.iter()
.map(|pubkey_bytes| {
match delete_single_remotekey(
pubkey_bytes,
&mut initialized_validators,
task_executor.clone(),
) {
Ok(status) => Status::ok(status),
Err(error) => {
warn!(
pubkey = ?pubkey_bytes,
?error,
"Error deleting keystore"
);
Status::error(DeleteRemotekeyStatus::Error, error)
}
let mut statuses = vec![];
for pubkey_bytes in &request.pubkeys {
match delete_single_remotekey(pubkey_bytes, &mut initialized_validators).await {
Ok(status) => statuses.push(Status::ok(status)),
Err(error) => {
warn!(
pubkey = ?pubkey_bytes,
?error,
"Error deleting keystore"
);
statuses.push(Status::error(DeleteRemotekeyStatus::Error, error))
}
})
.collect::<Vec<_>>();
}
}
// Use `update_validators` to update the key cache. It is safe to let the key cache get a bit out
// of date as it resets when it can't be decrypted. We update it just a single time to avoid
// continually resetting it after each key deletion.
if let Some(handle) = task_executor.handle() {
handle
.block_on(initialized_validators.update_validators())
.map_err(|e| custom_server_error(format!("unable to update key cache: {:?}", e)))?;
}
initialized_validators
.update_validators()
.await
.map_err(|e| custom_server_error(format!("unable to update key cache: {:?}", e)))?;
Ok(DeleteRemotekeysResponse { data: statuses })
}
fn delete_single_remotekey(
async fn delete_single_remotekey(
pubkey_bytes: &PublicKeyBytes,
initialized_validators: &mut InitializedValidators,
task_executor: TaskExecutor,
) -> Result<DeleteRemotekeyStatus, String> {
if let Some(handle) = task_executor.handle() {
let pubkey = pubkey_bytes
.decompress()
.map_err(|e| format!("invalid pubkey, {:?}: {:?}", pubkey_bytes, e))?;
let pubkey = pubkey_bytes
.decompress()
.map_err(|e| format!("invalid pubkey, {:?}: {:?}", pubkey_bytes, e))?;
match handle.block_on(initialized_validators.delete_definition_and_keystore(&pubkey, false))
{
Ok(_) => Ok(DeleteRemotekeyStatus::Deleted),
Err(e) => match e {
Error::ValidatorNotInitialized(_) => Ok(DeleteRemotekeyStatus::NotFound),
_ => Err(format!("unable to disable and delete: {:?}", e)),
},
}
} else {
Err("validator client shutdown".into())
match initialized_validators
.delete_definition_and_keystore(&pubkey, false)
.await
{
Ok(_) => Ok(DeleteRemotekeyStatus::Deleted),
Err(e) => match e {
Error::ValidatorNotInitialized(_) => Ok(DeleteRemotekeyStatus::NotFound),
_ => Err(format!("unable to disable and delete: {:?}", e)),
},
}
}

View File

@@ -15,7 +15,6 @@ use eth2_keystore::KeystoreBuilder;
use initialized_validators::key_cache::{KeyCache, CACHE_FILENAME};
use initialized_validators::{InitializedValidators, OnDecryptFailure};
use lighthouse_validator_store::{Config as ValidatorStoreConfig, LighthouseValidatorStore};
use parking_lot::RwLock;
use sensitive_url::SensitiveUrl;
use slashing_protection::{SlashingDatabase, SLASHING_PROTECTION_FILENAME};
use slot_clock::{SlotClock, TestingSlotClock};
@@ -25,7 +24,7 @@ use std::sync::Arc;
use std::time::Duration;
use task_executor::test_utils::TestRuntime;
use tempfile::{tempdir, TempDir};
use tokio::sync::oneshot;
use tokio::sync::{oneshot, RwLock};
use validator_services::block_service::BlockService;
use zeroize::Zeroizing;
@@ -114,6 +113,7 @@ impl ApiTester {
validator_store
.register_all_in_doppelganger_protection_if_enabled()
.await
.expect("Should attach doppelganger service");
let initialized_validators = validator_store.initialized_validators();
@@ -189,6 +189,7 @@ impl ApiTester {
self.initialized_validators
.read()
.await
.decrypt_key_cache(key_cache, &mut <_>::default(), OnDecryptFailure::Error)
.await
.expect("key cache should decypt");
@@ -280,27 +281,27 @@ impl ApiTester {
self
}
pub fn vals_total(&self) -> usize {
self.initialized_validators.read().num_total()
pub async fn vals_total(&self) -> usize {
self.initialized_validators.read().await.num_total()
}
pub fn vals_enabled(&self) -> usize {
self.initialized_validators.read().num_enabled()
pub async fn vals_enabled(&self) -> usize {
self.initialized_validators.read().await.num_enabled()
}
pub fn assert_enabled_validators_count(self, count: usize) -> Self {
assert_eq!(self.vals_enabled(), count);
pub async fn assert_enabled_validators_count(self, count: usize) -> Self {
assert_eq!(self.vals_enabled().await, count);
self
}
pub fn assert_validators_count(self, count: usize) -> Self {
assert_eq!(self.vals_total(), count);
pub async fn assert_validators_count(self, count: usize) -> Self {
assert_eq!(self.vals_total().await, count);
self
}
pub async fn create_hd_validators(self, s: HdValidatorScenario) -> Self {
let initial_vals = self.vals_total();
let initial_enabled_vals = self.vals_enabled();
let initial_vals = self.vals_total().await;
let initial_enabled_vals = self.vals_enabled().await;
let validators = (0..s.count)
.map(|i| ValidatorRequest {
@@ -346,15 +347,15 @@ impl ApiTester {
};
assert_eq!(response.len(), s.count);
assert_eq!(self.vals_total(), initial_vals + s.count);
assert_eq!(self.vals_total().await, initial_vals + s.count);
assert_eq!(
self.vals_enabled(),
self.vals_enabled().await,
initial_enabled_vals + s.count - s.disabled.len()
);
let server_vals = self.client.get_lighthouse_validators().await.unwrap().data;
assert_eq!(server_vals.len(), self.vals_total());
assert_eq!(server_vals.len(), self.vals_total().await);
// Ensure the server lists all of these newly created validators.
for validator in &response {
@@ -423,8 +424,8 @@ impl ApiTester {
}
pub async fn create_keystore_validators(self, s: KeystoreValidatorScenario) -> Self {
let initial_vals = self.vals_total();
let initial_enabled_vals = self.vals_enabled();
let initial_vals = self.vals_total().await;
let initial_enabled_vals = self.vals_enabled().await;
let password = random_password();
let keypair = Keypair::random();
@@ -479,12 +480,15 @@ impl ApiTester {
let num_enabled = s.enabled as usize;
assert_eq!(self.vals_total(), initial_vals + 1);
assert_eq!(self.vals_enabled(), initial_enabled_vals + num_enabled);
assert_eq!(self.vals_total().await, initial_vals + 1);
assert_eq!(
self.vals_enabled().await,
initial_enabled_vals + num_enabled
);
let server_vals = self.client.get_lighthouse_validators().await.unwrap().data;
assert_eq!(server_vals.len(), self.vals_total());
assert_eq!(server_vals.len(), self.vals_total().await);
assert_eq!(response.voting_pubkey, keypair.pk.into());
assert_eq!(response.enabled, s.enabled);
@@ -493,8 +497,8 @@ impl ApiTester {
}
pub async fn create_web3signer_validators(self, s: Web3SignerValidatorScenario) -> Self {
let initial_vals = self.vals_total();
let initial_enabled_vals = self.vals_enabled();
let initial_vals = self.vals_total().await;
let initial_enabled_vals = self.vals_enabled().await;
let request: Vec<_> = (0..s.count)
.map(|i| {
@@ -523,11 +527,11 @@ impl ApiTester {
.await
.unwrap();
assert_eq!(self.vals_total(), initial_vals + s.count);
assert_eq!(self.vals_total().await, initial_vals + s.count);
if s.enabled {
assert_eq!(self.vals_enabled(), initial_enabled_vals + s.count);
assert_eq!(self.vals_enabled().await, initial_enabled_vals + s.count);
} else {
assert_eq!(self.vals_enabled(), initial_enabled_vals);
assert_eq!(self.vals_enabled().await, initial_enabled_vals);
};
self
@@ -552,6 +556,7 @@ impl ApiTester {
assert_eq!(
self.initialized_validators
.read()
.await
.is_enabled(&validator.voting_pubkey.decompress().unwrap())
.unwrap(),
enabled
@@ -606,7 +611,9 @@ impl ApiTester {
let validator = &self.client.get_lighthouse_validators().await.unwrap().data[index];
assert_eq!(
self.validator_store.get_gas_limit(&validator.voting_pubkey),
self.validator_store
.get_gas_limit(&validator.voting_pubkey)
.await,
gas_limit
);
@@ -637,7 +644,8 @@ impl ApiTester {
assert_eq!(
self.validator_store
.get_builder_proposals_testing_only(&validator.voting_pubkey),
.get_builder_proposals_testing_only(&validator.voting_pubkey)
.await,
builder_proposals
);

View File

@@ -19,7 +19,6 @@ use eth2::{
};
use eth2_keystore::KeystoreBuilder;
use lighthouse_validator_store::{Config as ValidatorStoreConfig, LighthouseValidatorStore};
use parking_lot::RwLock;
use sensitive_url::SensitiveUrl;
use slashing_protection::{SlashingDatabase, SLASHING_PROTECTION_FILENAME};
use slot_clock::{SlotClock, TestingSlotClock};
@@ -30,6 +29,7 @@ use std::sync::Arc;
use std::time::Duration;
use task_executor::test_utils::TestRuntime;
use tempfile::{tempdir, TempDir};
use tokio::sync::RwLock;
use types::graffiti::GraffitiString;
use validator_store::ValidatorStore;
use zeroize::Zeroizing;
@@ -104,6 +104,7 @@ impl ApiTester {
validator_store
.register_all_in_doppelganger_protection_if_enabled()
.await
.expect("Should attach doppelganger service");
let initialized_validators = validator_store.initialized_validators();
@@ -243,27 +244,27 @@ impl ApiTester {
self
}
pub fn vals_total(&self) -> usize {
self.initialized_validators.read().num_total()
pub async fn vals_total(&self) -> usize {
self.initialized_validators.read().await.num_total()
}
pub fn vals_enabled(&self) -> usize {
self.initialized_validators.read().num_enabled()
pub async fn vals_enabled(&self) -> usize {
self.initialized_validators.read().await.num_enabled()
}
pub fn assert_enabled_validators_count(self, count: usize) -> Self {
assert_eq!(self.vals_enabled(), count);
pub async fn assert_enabled_validators_count(self, count: usize) -> Self {
assert_eq!(self.vals_enabled().await, count);
self
}
pub fn assert_validators_count(self, count: usize) -> Self {
assert_eq!(self.vals_total(), count);
pub async fn assert_validators_count(self, count: usize) -> Self {
assert_eq!(self.vals_total().await, count);
self
}
pub async fn create_hd_validators(self, s: HdValidatorScenario) -> Self {
let initial_vals = self.vals_total();
let initial_enabled_vals = self.vals_enabled();
let initial_vals = self.vals_total().await;
let initial_enabled_vals = self.vals_enabled().await;
let validators = (0..s.count)
.map(|i| ValidatorRequest {
@@ -309,15 +310,15 @@ impl ApiTester {
};
assert_eq!(response.len(), s.count);
assert_eq!(self.vals_total(), initial_vals + s.count);
assert_eq!(self.vals_total().await, initial_vals + s.count);
assert_eq!(
self.vals_enabled(),
self.vals_enabled().await,
initial_enabled_vals + s.count - s.disabled.len()
);
let server_vals = self.client.get_lighthouse_validators().await.unwrap().data;
assert_eq!(server_vals.len(), self.vals_total());
assert_eq!(server_vals.len(), self.vals_total().await);
// Ensure the server lists all of these newly created validators.
for validator in &response {
@@ -386,8 +387,8 @@ impl ApiTester {
}
pub async fn create_keystore_validators(self, s: KeystoreValidatorScenario) -> Self {
let initial_vals = self.vals_total();
let initial_enabled_vals = self.vals_enabled();
let initial_vals = self.vals_total().await;
let initial_enabled_vals = self.vals_enabled().await;
let password = random_password();
let keypair = Keypair::random();
@@ -442,12 +443,15 @@ impl ApiTester {
let num_enabled = s.enabled as usize;
assert_eq!(self.vals_total(), initial_vals + 1);
assert_eq!(self.vals_enabled(), initial_enabled_vals + num_enabled);
assert_eq!(self.vals_total().await, initial_vals + 1);
assert_eq!(
self.vals_enabled().await,
initial_enabled_vals + num_enabled
);
let server_vals = self.client.get_lighthouse_validators().await.unwrap().data;
assert_eq!(server_vals.len(), self.vals_total());
assert_eq!(server_vals.len(), self.vals_total().await);
assert_eq!(response.voting_pubkey, keypair.pk.into());
assert_eq!(response.enabled, s.enabled);
@@ -456,8 +460,8 @@ impl ApiTester {
}
pub async fn create_web3signer_validators(self, s: Web3SignerValidatorScenario) -> Self {
let initial_vals = self.vals_total();
let initial_enabled_vals = self.vals_enabled();
let initial_vals = self.vals_total().await;
let initial_enabled_vals = self.vals_enabled().await;
let request: Vec<_> = (0..s.count)
.map(|i| {
@@ -486,11 +490,11 @@ impl ApiTester {
.await
.unwrap();
assert_eq!(self.vals_total(), initial_vals + s.count);
assert_eq!(self.vals_total().await, initial_vals + s.count);
if s.enabled {
assert_eq!(self.vals_enabled(), initial_enabled_vals + s.count);
assert_eq!(self.vals_enabled().await, initial_enabled_vals + s.count);
} else {
assert_eq!(self.vals_enabled(), initial_enabled_vals);
assert_eq!(self.vals_enabled().await, initial_enabled_vals);
};
self
@@ -501,6 +505,7 @@ impl ApiTester {
// manually setting validator index in `ValidatorStore`
self.initialized_validators
.write()
.await
.set_index(&validator.voting_pubkey, 0);
let expected_exit_epoch = maybe_epoch.unwrap_or_else(|| self.get_current_epoch());
@@ -542,6 +547,7 @@ impl ApiTester {
assert_eq!(
self.initialized_validators
.read()
.await
.is_enabled(&validator.voting_pubkey.decompress().unwrap())
.unwrap(),
enabled
@@ -596,7 +602,9 @@ impl ApiTester {
let validator = &self.client.get_lighthouse_validators().await.unwrap().data[index];
assert_eq!(
self.validator_store.get_gas_limit(&validator.voting_pubkey),
self.validator_store
.get_gas_limit(&validator.voting_pubkey)
.await,
gas_limit
);
@@ -669,7 +677,8 @@ impl ApiTester {
assert_eq!(
self.validator_store
.get_builder_proposals_testing_only(&validator.voting_pubkey),
.get_builder_proposals_testing_only(&validator.voting_pubkey)
.await,
builder_proposals
);
@@ -685,7 +694,8 @@ impl ApiTester {
assert_eq!(
self.validator_store
.get_builder_boost_factor_testing_only(&validator.voting_pubkey),
.get_builder_boost_factor_testing_only(&validator.voting_pubkey)
.await,
builder_boost_factor
);
@@ -701,17 +711,22 @@ impl ApiTester {
assert_eq!(
self.validator_store
.determine_builder_boost_factor(&validator.voting_pubkey),
.determine_builder_boost_factor(&validator.voting_pubkey)
.await,
builder_boost_factor
);
self
}
pub fn assert_default_builder_boost_factor(self, builder_boost_factor: Option<u64>) -> Self {
pub async fn assert_default_builder_boost_factor(
self,
builder_boost_factor: Option<u64>,
) -> Self {
assert_eq!(
self.validator_store
.determine_builder_boost_factor(&PublicKeyBytes::empty()),
.determine_builder_boost_factor(&PublicKeyBytes::empty())
.await,
builder_boost_factor
);
@@ -727,7 +742,8 @@ impl ApiTester {
assert_eq!(
self.validator_store
.get_prefer_builder_proposals_testing_only(&validator.voting_pubkey),
.get_prefer_builder_proposals_testing_only(&validator.voting_pubkey)
.await,
prefer_builder_proposals
);
@@ -757,7 +773,9 @@ impl ApiTester {
let validator = &self.client.get_lighthouse_validators().await.unwrap().data[index];
let graffiti_str = GraffitiString::from_str(graffiti).unwrap();
assert_eq!(
self.validator_store.graffiti(&validator.voting_pubkey),
self.validator_store
.graffiti(&validator.voting_pubkey)
.await,
Some(graffiti_str.into())
);
@@ -977,7 +995,9 @@ async fn hd_validator_creation() {
ApiTester::new()
.await
.assert_enabled_validators_count(0)
.await
.assert_validators_count(0)
.await
.create_hd_validators(HdValidatorScenario {
count: 2,
specify_mnemonic: true,
@@ -986,7 +1006,9 @@ async fn hd_validator_creation() {
})
.await
.assert_enabled_validators_count(2)
.await
.assert_validators_count(2)
.await
.create_hd_validators(HdValidatorScenario {
count: 1,
specify_mnemonic: false,
@@ -995,7 +1017,9 @@ async fn hd_validator_creation() {
})
.await
.assert_enabled_validators_count(2)
.await
.assert_validators_count(3)
.await
.create_hd_validators(HdValidatorScenario {
count: 0,
specify_mnemonic: true,
@@ -1004,7 +1028,9 @@ async fn hd_validator_creation() {
})
.await
.assert_enabled_validators_count(2)
.assert_validators_count(3);
.await
.assert_validators_count(3)
.await;
}
#[tokio::test]
@@ -1019,7 +1045,9 @@ async fn validator_exit() {
})
.await
.assert_enabled_validators_count(2)
.await
.assert_validators_count(2)
.await
.test_sign_voluntary_exits(0, None)
.await
.test_sign_voluntary_exits(0, Some(Epoch::new(256)))
@@ -1038,15 +1066,21 @@ async fn validator_enabling() {
})
.await
.assert_enabled_validators_count(2)
.await
.assert_validators_count(2)
.await
.set_validator_enabled(0, false)
.await
.assert_enabled_validators_count(1)
.await
.assert_validators_count(2)
.await
.set_validator_enabled(0, true)
.await
.assert_enabled_validators_count(2)
.assert_validators_count(2);
.await
.assert_validators_count(2)
.await;
}
#[tokio::test]
@@ -1061,7 +1095,9 @@ async fn validator_gas_limit() {
})
.await
.assert_enabled_validators_count(2)
.await
.assert_validators_count(2)
.await
.set_gas_limit(0, 500)
.await
.assert_gas_limit(0, 500)
@@ -1070,12 +1106,15 @@ async fn validator_gas_limit() {
.set_validator_enabled(0, false)
.await
.assert_enabled_validators_count(1)
.await
.assert_validators_count(2)
.await
.set_gas_limit(0, 1000)
.await
.set_validator_enabled(0, true)
.await
.assert_enabled_validators_count(2)
.await
.assert_gas_limit(0, 1000)
.await;
}
@@ -1092,19 +1131,24 @@ async fn validator_builder_proposals() {
})
.await
.assert_enabled_validators_count(2)
.await
.assert_validators_count(2)
.await
.set_builder_proposals(0, true)
.await
// Test setting builder proposals while the validator is disabled
.set_validator_enabled(0, false)
.await
.assert_enabled_validators_count(1)
.await
.assert_validators_count(2)
.await
.set_builder_proposals(0, false)
.await
.set_validator_enabled(0, true)
.await
.assert_enabled_validators_count(2)
.await
.assert_builder_proposals(0, false)
.await;
}
@@ -1121,19 +1165,24 @@ async fn validator_builder_boost_factor() {
})
.await
.assert_enabled_validators_count(2)
.await
.assert_validators_count(2)
.await
.set_builder_boost_factor(0, 120)
.await
// Test setting builder proposals while the validator is disabled
.set_validator_enabled(0, false)
.await
.assert_enabled_validators_count(1)
.await
.assert_validators_count(2)
.await
.set_builder_boost_factor(0, 80)
.await
.set_validator_enabled(0, true)
.await
.assert_enabled_validators_count(2)
.await
.assert_builder_boost_factor(0, Some(80))
.await;
}
@@ -1158,6 +1207,7 @@ async fn validator_derived_builder_boost_factor_with_process_defaults() {
})
.await
.assert_default_builder_boost_factor(Some(80))
.await
.assert_validator_derived_builder_boost_factor(0, Some(80))
.await
.set_builder_proposals(0, false)
@@ -1184,7 +1234,8 @@ async fn validator_builder_boost_factor_global_builder_proposals_true() {
};
ApiTester::new_with_config(config)
.await
.assert_default_builder_boost_factor(None);
.assert_default_builder_boost_factor(None)
.await;
}
#[tokio::test]
@@ -1197,7 +1248,8 @@ async fn validator_builder_boost_factor_global_builder_proposals_false() {
};
ApiTester::new_with_config(config)
.await
.assert_default_builder_boost_factor(Some(0));
.assert_default_builder_boost_factor(Some(0))
.await;
}
#[tokio::test]
@@ -1210,7 +1262,8 @@ async fn validator_builder_boost_factor_global_prefer_builder_proposals_true() {
};
ApiTester::new_with_config(config)
.await
.assert_default_builder_boost_factor(Some(u64::MAX));
.assert_default_builder_boost_factor(Some(u64::MAX))
.await;
}
#[tokio::test]
@@ -1223,7 +1276,8 @@ async fn validator_builder_boost_factor_global_prefer_builder_proposals_true_ove
};
ApiTester::new_with_config(config)
.await
.assert_default_builder_boost_factor(Some(u64::MAX));
.assert_default_builder_boost_factor(Some(u64::MAX))
.await;
}
#[tokio::test]
@@ -1238,19 +1292,24 @@ async fn prefer_builder_proposals_validator() {
})
.await
.assert_enabled_validators_count(2)
.await
.assert_validators_count(2)
.await
.set_prefer_builder_proposals(0, false)
.await
// Test setting builder proposals while the validator is disabled
.set_validator_enabled(0, false)
.await
.assert_enabled_validators_count(1)
.await
.assert_validators_count(2)
.await
.set_prefer_builder_proposals(0, true)
.await
.set_validator_enabled(0, true)
.await
.assert_enabled_validators_count(2)
.await
.assert_prefer_builder_proposals(0, true)
.await;
}
@@ -1267,7 +1326,9 @@ async fn validator_graffiti() {
})
.await
.assert_enabled_validators_count(2)
.await
.assert_validators_count(2)
.await
.set_graffiti(0, "Mr F was here")
.await
.assert_graffiti(0, "Mr F was here")
@@ -1276,12 +1337,15 @@ async fn validator_graffiti() {
.set_validator_enabled(0, false)
.await
.assert_enabled_validators_count(1)
.await
.assert_validators_count(2)
.await
.set_graffiti(0, "Mr F was here again")
.await
.set_validator_enabled(0, true)
.await
.assert_enabled_validators_count(2)
.await
.assert_graffiti(0, "Mr F was here again")
.await;
}
@@ -1298,7 +1362,9 @@ async fn validator_graffiti_api() {
})
.await
.assert_enabled_validators_count(2)
.await
.assert_validators_count(2)
.await
.set_graffiti(0, "Mr F was here")
.await
.test_get_graffiti(0, "Mr F was here")
@@ -1316,28 +1382,36 @@ async fn keystore_validator_creation() {
ApiTester::new()
.await
.assert_enabled_validators_count(0)
.await
.assert_validators_count(0)
.await
.create_keystore_validators(KeystoreValidatorScenario {
correct_password: true,
enabled: true,
})
.await
.assert_enabled_validators_count(1)
.await
.assert_validators_count(1)
.await
.create_keystore_validators(KeystoreValidatorScenario {
correct_password: false,
enabled: true,
})
.await
.assert_enabled_validators_count(1)
.await
.assert_validators_count(1)
.await
.create_keystore_validators(KeystoreValidatorScenario {
correct_password: true,
enabled: false,
})
.await
.assert_enabled_validators_count(1)
.assert_validators_count(2);
.await
.assert_validators_count(2)
.await;
}
#[tokio::test]
@@ -1345,12 +1419,16 @@ async fn web3signer_validator_creation() {
ApiTester::new()
.await
.assert_enabled_validators_count(0)
.await
.assert_validators_count(0)
.await
.create_web3signer_validators(Web3SignerValidatorScenario {
count: 1,
enabled: true,
})
.await
.assert_enabled_validators_count(1)
.assert_validators_count(1);
.await
.assert_validators_count(1)
.await;
}

View File

@@ -2092,8 +2092,8 @@ async fn import_remotekey_web3signer_disabled() {
.unwrap();
// 1 validator imported.
assert_eq!(tester.vals_total(), 1);
assert_eq!(tester.vals_enabled(), 0);
assert_eq!(tester.vals_total().await, 1);
assert_eq!(tester.vals_enabled().await, 0);
// Import remotekeys.
let import_res = tester
@@ -2109,8 +2109,8 @@ async fn import_remotekey_web3signer_disabled() {
);
// Still only one validator. Web3signer is overwritten by remotekey.
assert_eq!(tester.vals_total(), 1);
assert_eq!(tester.vals_enabled(), 1);
assert_eq!(tester.vals_total().await, 1);
assert_eq!(tester.vals_enabled().await, 1);
// Remotekey overwrites web3signer.
let expected_responses = vec![SingleListRemotekeysResponse {
@@ -2147,11 +2147,12 @@ async fn import_remotekey_web3signer_enabled() {
.unwrap();
// 1 validator imported.
assert_eq!(tester.vals_total(), 1);
assert_eq!(tester.vals_enabled(), 1);
assert_eq!(tester.vals_total().await, 1);
assert_eq!(tester.vals_enabled().await, 1);
let web3_vals = tester
.initialized_validators
.read()
.await
.validator_definitions()
.to_vec();
@@ -2168,10 +2169,10 @@ async fn import_remotekey_web3signer_enabled() {
all_with_status(1, ImportRemotekeyStatus::Duplicate),
);
assert_eq!(tester.vals_total(), 1);
assert_eq!(tester.vals_enabled(), 1);
assert_eq!(tester.vals_total().await, 1);
assert_eq!(tester.vals_enabled().await, 1);
{
let vals = tester.initialized_validators.read();
let vals = tester.initialized_validators.read().await;
let remote_vals = vals.validator_definitions();
// Web3signer should not be overwritten since it is enabled.

View File

@@ -14,6 +14,7 @@ metrics = { workspace = true }
parking_lot = { workspace = true }
serde = { workspace = true }
slot_clock = { workspace = true }
tokio = { workspace = true }
tracing = { workspace = true }
types = { workspace = true }
validator_metrics = { workspace = true }

View File

@@ -6,13 +6,13 @@ use lighthouse_validator_store::LighthouseValidatorStore;
use lighthouse_version::version_with_platform;
use logging::crit;
use malloc_utils::scrape_allocator_metrics;
use parking_lot::RwLock;
use serde::{Deserialize, Serialize};
use slot_clock::{SlotClock, SystemTimeSlotClock};
use std::future::Future;
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use std::sync::Arc;
use std::time::{SystemTime, UNIX_EPOCH};
use tokio::sync::RwLock;
use tracing::info;
use types::EthSpec;
use validator_services::duties_service::DutiesService;
@@ -124,6 +124,7 @@ pub fn serve<E: EthSpec>(
.and_then(|ctx: Arc<Context<E>>| async move {
Ok::<_, warp::Rejection>(
gather_prometheus_metrics(&ctx)
.await
.map(|body| {
Response::builder()
.status(200)
@@ -159,7 +160,7 @@ pub fn serve<E: EthSpec>(
Ok((listening_socket, server))
}
pub fn gather_prometheus_metrics<E: EthSpec>(
pub async fn gather_prometheus_metrics<E: EthSpec>(
ctx: &Context<E>,
) -> std::result::Result<String, String> {
use validator_metrics::*;
@@ -167,7 +168,7 @@ pub fn gather_prometheus_metrics<E: EthSpec>(
let encoder = TextEncoder::new();
{
let shared = ctx.shared.read();
let shared = ctx.shared.read().await;
if let Some(genesis_time) = shared.genesis_time {
if let Ok(now) = SystemTime::now().duration_since(UNIX_EPOCH) {
@@ -184,17 +185,17 @@ pub fn gather_prometheus_metrics<E: EthSpec>(
set_int_gauge(
&PROPOSER_COUNT,
&[CURRENT_EPOCH],
duties_service.proposer_count(current_epoch) as i64,
duties_service.proposer_count(current_epoch).await as i64,
);
set_int_gauge(
&ATTESTER_COUNT,
&[CURRENT_EPOCH],
duties_service.attester_count(current_epoch) as i64,
duties_service.attester_count(current_epoch).await as i64,
);
set_int_gauge(
&ATTESTER_COUNT,
&[NEXT_EPOCH],
duties_service.attester_count(next_epoch) as i64,
duties_service.attester_count(next_epoch).await as i64,
);
}
}

View File

@@ -2,7 +2,6 @@ use account_utils::validator_definitions::{PasswordStorage, ValidatorDefinition}
use doppelganger_service::DoppelgangerService;
use initialized_validators::InitializedValidators;
use logging::crit;
use parking_lot::{Mutex, RwLock};
use serde::{Deserialize, Serialize};
use signing_method::Error as SigningError;
use signing_method::{SignableMessage, SigningContext, SigningMethod};
@@ -14,6 +13,7 @@ use std::marker::PhantomData;
use std::path::Path;
use std::sync::Arc;
use task_executor::TaskExecutor;
use tokio::sync::{Mutex, RwLock};
use tracing::{error, info, warn};
use types::{
graffiti::GraffitiString, AbstractExecPayload, Address, AggregateAndProof, Attestation,
@@ -112,9 +112,9 @@ impl<T: SlotClock + 'static, E: EthSpec> LighthouseValidatorStore<T, E> {
/// duplicate validators operating on the network at the same time.
///
/// This function has no effect if doppelganger protection is disabled.
pub fn register_all_in_doppelganger_protection_if_enabled(&self) -> Result<(), String> {
pub async fn register_all_in_doppelganger_protection_if_enabled(&self) -> Result<(), String> {
if let Some(doppelganger_service) = &self.doppelganger_service {
for pubkey in self.validators.read().iter_voting_pubkeys() {
for pubkey in self.validators.read().await.iter_voting_pubkeys() {
doppelganger_service.register_new_validator(
*pubkey,
&self.slot_clock,
@@ -136,9 +136,10 @@ impl<T: SlotClock + 'static, E: EthSpec> LighthouseValidatorStore<T, E> {
}
/// Indicates if the `voting_public_key` exists in self and is enabled.
pub fn has_validator(&self, voting_public_key: &PublicKeyBytes) -> bool {
pub async fn has_validator(&self, voting_public_key: &PublicKeyBytes) -> bool {
self.validators
.read()
.await
.validator(voting_public_key)
.is_some()
}
@@ -182,8 +183,6 @@ impl<T: SlotClock + 'static, E: EthSpec> LighthouseValidatorStore<T, E> {
/// - Adding the validator definition to the YAML file, saving it to the filesystem.
/// - Enabling the validator with the slashing protection database.
/// - If `enable == true`, starting to perform duties for the validator.
// FIXME: ignore this clippy lint until the validator store is refactored to use async locks
#[allow(clippy::await_holding_lock)]
pub async fn add_validator(
&self,
validator_def: ValidatorDefinition,
@@ -204,6 +203,7 @@ impl<T: SlotClock + 'static, E: EthSpec> LighthouseValidatorStore<T, E> {
self.validators
.write()
.await
.add_definition_replace_disabled(validator_def.clone())
.await
.map_err(|e| format!("Unable to add definition: {:?}", e))?;
@@ -213,12 +213,13 @@ impl<T: SlotClock + 'static, E: EthSpec> LighthouseValidatorStore<T, E> {
/// Returns doppelganger statuses for all enabled validators.
#[allow(clippy::needless_collect)] // Collect is required to avoid holding a lock.
pub fn doppelganger_statuses(&self) -> Vec<DoppelgangerStatus> {
pub async fn doppelganger_statuses(&self) -> Vec<DoppelgangerStatus> {
// Collect all the pubkeys first to avoid interleaving locks on `self.validators` and
// `self.doppelganger_service`.
let pubkeys = self
.validators
.read()
.await
.iter_voting_pubkeys()
.cloned()
.collect::<Vec<_>>();
@@ -241,13 +242,14 @@ impl<T: SlotClock + 'static, E: EthSpec> LighthouseValidatorStore<T, E> {
/// Returns a `SigningMethod` for `validator_pubkey` *only if* that validator is considered safe
/// by doppelganger protection.
fn doppelganger_checked_signing_method(
async fn doppelganger_checked_signing_method(
&self,
validator_pubkey: PublicKeyBytes,
) -> Result<Arc<SigningMethod>, Error> {
if self.doppelganger_protection_allows_signing(validator_pubkey) {
self.validators
.read()
.await
.signing_method(&validator_pubkey)
.ok_or(Error::UnknownPubkey(validator_pubkey))
} else {
@@ -261,12 +263,13 @@ impl<T: SlotClock + 'static, E: EthSpec> LighthouseValidatorStore<T, E> {
/// ## Warning
///
/// This method should only be used for signing non-slashable messages.
fn doppelganger_bypassed_signing_method(
async fn doppelganger_bypassed_signing_method(
&self,
validator_pubkey: PublicKeyBytes,
) -> Result<Arc<SigningMethod>, Error> {
self.validators
.read()
.await
.signing_method(&validator_pubkey)
.ok_or(Error::UnknownPubkey(validator_pubkey))
}
@@ -310,9 +313,10 @@ impl<T: SlotClock + 'static, E: EthSpec> LighthouseValidatorStore<T, E> {
/// Returns the suggested_fee_recipient from `validator_definitions.yml` if any.
/// This has been pulled into a private function so the read lock is dropped easily
fn suggested_fee_recipient(&self, validator_pubkey: &PublicKeyBytes) -> Option<Address> {
async fn suggested_fee_recipient(&self, validator_pubkey: &PublicKeyBytes) -> Option<Address> {
self.validators
.read()
.await
.suggested_fee_recipient(validator_pubkey)
}
@@ -322,8 +326,8 @@ impl<T: SlotClock + 'static, E: EthSpec> LighthouseValidatorStore<T, E> {
/// 1. validator_definitions.yml
/// 2. process level gas limit
/// 3. `DEFAULT_GAS_LIMIT`
pub fn get_gas_limit(&self, validator_pubkey: &PublicKeyBytes) -> u64 {
self.get_gas_limit_defaulting(self.validators.read().gas_limit(validator_pubkey))
pub async fn get_gas_limit(&self, validator_pubkey: &PublicKeyBytes) -> u64 {
self.get_gas_limit_defaulting(self.validators.read().await.gas_limit(validator_pubkey))
}
fn get_gas_limit_defaulting(&self, gas_limit: Option<u64>) -> u64 {
@@ -344,11 +348,17 @@ impl<T: SlotClock + 'static, E: EthSpec> LighthouseValidatorStore<T, E> {
///
/// This function is currently only used in tests because in prod it is translated and combined
/// with other flags into a builder boost factor (see `determine_builder_boost_factor`).
pub fn get_builder_proposals_testing_only(&self, validator_pubkey: &PublicKeyBytes) -> bool {
pub async fn get_builder_proposals_testing_only(
&self,
validator_pubkey: &PublicKeyBytes,
) -> bool {
// If there is a `suggested_fee_recipient` in the validator definitions yaml
// file, use that value.
self.get_builder_proposals_defaulting(
self.validators.read().builder_proposals(validator_pubkey),
self.validators
.read()
.await
.builder_proposals(validator_pubkey),
)
}
@@ -365,12 +375,13 @@ impl<T: SlotClock + 'static, E: EthSpec> LighthouseValidatorStore<T, E> {
///
/// This function is currently only used in tests because in prod it is translated and combined
/// with other flags into a builder boost factor (see `determine_builder_boost_factor`).
pub fn get_builder_boost_factor_testing_only(
pub async fn get_builder_boost_factor_testing_only(
&self,
validator_pubkey: &PublicKeyBytes,
) -> Option<u64> {
self.validators
.read()
.await
.builder_boost_factor(validator_pubkey)
.or(self.builder_boost_factor)
}
@@ -383,12 +394,13 @@ impl<T: SlotClock + 'static, E: EthSpec> LighthouseValidatorStore<T, E> {
///
/// This function is currently only used in tests because in prod it is translated and combined
/// with other flags into a builder boost factor (see `determine_builder_boost_factor`).
pub fn get_prefer_builder_proposals_testing_only(
pub async fn get_prefer_builder_proposals_testing_only(
&self,
validator_pubkey: &PublicKeyBytes,
) -> bool {
self.validators
.read()
.await
.prefer_builder_proposals(validator_pubkey)
.unwrap_or(self.prefer_builder_proposals)
}
@@ -458,7 +470,9 @@ impl<T: SlotClock + 'static, E: EthSpec> LighthouseValidatorStore<T, E> {
let signing_context = self.signing_context(Domain::BeaconProposer, signing_epoch);
let domain_hash = signing_context.domain_hash(&self.spec);
let signing_method = self.doppelganger_checked_signing_method(validator_pubkey)?;
let signing_method = self
.doppelganger_checked_signing_method(validator_pubkey)
.await?;
// Check for slashing conditions.
let slashing_status = if signing_method
@@ -532,7 +546,9 @@ impl<T: SlotClock + 'static, E: EthSpec> LighthouseValidatorStore<T, E> {
) -> Result<SignedVoluntaryExit, Error> {
let signing_epoch = voluntary_exit.epoch;
let signing_context = self.signing_context(Domain::VoluntaryExit, signing_epoch);
let signing_method = self.doppelganger_bypassed_signing_method(validator_pubkey)?;
let signing_method = self
.doppelganger_bypassed_signing_method(validator_pubkey)
.await?;
let signature = signing_method
.get_signature::<E, BlindedPayload<E>>(
@@ -565,8 +581,8 @@ impl<T: SlotClock + 'static, E: EthSpec> ValidatorStore for LighthouseValidatorS
///
/// - Unknown.
/// - Known, but with an unknown index.
fn validator_index(&self, pubkey: &PublicKeyBytes) -> Option<u64> {
self.validators.read().get_index(pubkey)
async fn validator_index(&self, pubkey: &PublicKeyBytes) -> Option<u64> {
self.validators.read().await.get_index(pubkey)
}
/// Returns all voting pubkeys for all enabled validators.
@@ -579,7 +595,7 @@ impl<T: SlotClock + 'static, E: EthSpec> ValidatorStore for LighthouseValidatorS
/// - `DoppelgangerStatus::ignored`: returns all the pubkeys from `only_safe` *plus* those still
/// undergoing protection. This is useful for collecting duties or other non-signing tasks.
#[allow(clippy::needless_collect)] // Collect is required to avoid holding a lock.
fn voting_pubkeys<I, F>(&self, filter_func: F) -> I
async fn voting_pubkeys<I, F>(&self, filter_func: F) -> I
where
I: FromIterator<PublicKeyBytes>,
F: Fn(DoppelgangerStatus) -> Option<PublicKeyBytes>,
@@ -589,6 +605,7 @@ impl<T: SlotClock + 'static, E: EthSpec> ValidatorStore for LighthouseValidatorS
let pubkeys = self
.validators
.read()
.await
.iter_voting_pubkeys()
.cloned()
.collect::<Vec<_>>();
@@ -621,22 +638,22 @@ impl<T: SlotClock + 'static, E: EthSpec> ValidatorStore for LighthouseValidatorS
})
}
fn num_voting_validators(&self) -> usize {
self.validators.read().num_enabled()
async fn num_voting_validators(&self) -> usize {
self.validators.read().await.num_enabled()
}
fn graffiti(&self, validator_pubkey: &PublicKeyBytes) -> Option<Graffiti> {
self.validators.read().graffiti(validator_pubkey)
async fn graffiti(&self, validator_pubkey: &PublicKeyBytes) -> Option<Graffiti> {
self.validators.read().await.graffiti(validator_pubkey)
}
/// Returns the fee recipient for the given public key. The priority order for fetching
/// the fee recipient is:
/// 1. validator_definitions.yml
/// 2. process level fee recipient
fn get_fee_recipient(&self, validator_pubkey: &PublicKeyBytes) -> Option<Address> {
async fn get_fee_recipient(&self, validator_pubkey: &PublicKeyBytes) -> Option<Address> {
// If there is a `suggested_fee_recipient` in the validator definitions yaml
// file, use that value.
self.get_fee_recipient_defaulting(self.suggested_fee_recipient(validator_pubkey))
self.get_fee_recipient_defaulting(self.suggested_fee_recipient(validator_pubkey).await)
}
/// Translate the per validator and per process `builder_proposals`, `builder_boost_factor` and
@@ -651,29 +668,30 @@ impl<T: SlotClock + 'static, E: EthSpec> ValidatorStore for LighthouseValidatorS
/// - If `builder_proposals` is set to false, set boost factor to 0 to indicate a preference for
/// local payloads.
/// - Else return `None` to indicate no preference between builder and local payloads.
fn determine_builder_boost_factor(&self, validator_pubkey: &PublicKeyBytes) -> Option<u64> {
async fn determine_builder_boost_factor(
&self,
validator_pubkey: &PublicKeyBytes,
) -> Option<u64> {
let validator_prefer_builder_proposals = self
.validators
.read()
.await
.prefer_builder_proposals(validator_pubkey);
if matches!(validator_prefer_builder_proposals, Some(true)) {
return Some(u64::MAX);
}
let factor = self
.validators
.read()
let validators = self.validators.read().await;
let factor = validators
.builder_boost_factor(validator_pubkey)
.or_else(|| {
if matches!(
self.validators.read().builder_proposals(validator_pubkey),
Some(false)
) {
if matches!(validators.builder_proposals(validator_pubkey), Some(false)) {
return Some(0);
}
None
});
drop(validators);
factor
.or_else(|| {
@@ -705,7 +723,9 @@ impl<T: SlotClock + 'static, E: EthSpec> ValidatorStore for LighthouseValidatorS
validator_pubkey: PublicKeyBytes,
signing_epoch: Epoch,
) -> Result<Signature, Error> {
let signing_method = self.doppelganger_checked_signing_method(validator_pubkey)?;
let signing_method = self
.doppelganger_checked_signing_method(validator_pubkey)
.await?;
let signing_context = self.signing_context(Domain::Randao, signing_epoch);
let signature = signing_method
@@ -720,9 +740,10 @@ impl<T: SlotClock + 'static, E: EthSpec> ValidatorStore for LighthouseValidatorS
Ok(signature)
}
fn set_validator_index(&self, validator_pubkey: &PublicKeyBytes, index: u64) {
async fn set_validator_index(&self, validator_pubkey: &PublicKeyBytes, index: u64) {
self.initialized_validators()
.write()
.await
.set_index(validator_pubkey, index);
}
@@ -760,7 +781,9 @@ impl<T: SlotClock + 'static, E: EthSpec> ValidatorStore for LighthouseValidatorS
}
// Get the signing method and check doppelganger protection.
let signing_method = self.doppelganger_checked_signing_method(validator_pubkey)?;
let signing_method = self
.doppelganger_checked_signing_method(validator_pubkey)
.await?;
// Checking for slashing conditions.
let signing_epoch = attestation.data().target.epoch;
@@ -842,8 +865,9 @@ impl<T: SlotClock + 'static, E: EthSpec> ValidatorStore for LighthouseValidatorS
let domain_hash = self.spec.get_builder_domain();
let signing_root = validator_registration_data.signing_root(domain_hash);
let signing_method =
self.doppelganger_bypassed_signing_method(validator_registration_data.pubkey)?;
let signing_method = self
.doppelganger_bypassed_signing_method(validator_registration_data.pubkey)
.await?;
let signature = signing_method
.get_signature_from_root::<E, BlindedPayload<E>>(
SignableMessage::ValidatorRegistration(&validator_registration_data),
@@ -881,7 +905,9 @@ impl<T: SlotClock + 'static, E: EthSpec> ValidatorStore for LighthouseValidatorS
let message =
AggregateAndProof::from_attestation(aggregator_index, aggregate, selection_proof);
let signing_method = self.doppelganger_checked_signing_method(validator_pubkey)?;
let signing_method = self
.doppelganger_checked_signing_method(validator_pubkey)
.await?;
let signature = signing_method
.get_signature::<E, BlindedPayload<E>>(
SignableMessage::SignedAggregateAndProof(message.to_ref()),
@@ -918,7 +944,9 @@ impl<T: SlotClock + 'static, E: EthSpec> ValidatorStore for LighthouseValidatorS
//
// As long as we disallow `SignedAggregateAndProof` then these selection proofs will never
// be published on the network.
let signing_method = self.doppelganger_bypassed_signing_method(validator_pubkey)?;
let signing_method = self
.doppelganger_bypassed_signing_method(validator_pubkey)
.await?;
let signature = signing_method
.get_signature::<E, BlindedPayload<E>>(
@@ -950,7 +978,9 @@ impl<T: SlotClock + 'static, E: EthSpec> ValidatorStore for LighthouseValidatorS
self.signing_context(Domain::SyncCommitteeSelectionProof, signing_epoch);
// Bypass `with_validator_signing_method`: sync committee messages are not slashable.
let signing_method = self.doppelganger_bypassed_signing_method(*validator_pubkey)?;
let signing_method = self
.doppelganger_bypassed_signing_method(*validator_pubkey)
.await?;
validator_metrics::inc_counter_vec(
&validator_metrics::SIGNED_SYNC_SELECTION_PROOFS_TOTAL,
@@ -986,7 +1016,9 @@ impl<T: SlotClock + 'static, E: EthSpec> ValidatorStore for LighthouseValidatorS
let signing_context = self.signing_context(Domain::SyncCommittee, signing_epoch);
// Bypass `with_validator_signing_method`: sync committee messages are not slashable.
let signing_method = self.doppelganger_bypassed_signing_method(*validator_pubkey)?;
let signing_method = self
.doppelganger_bypassed_signing_method(*validator_pubkey)
.await?;
let signature = signing_method
.get_signature::<E, BlindedPayload<E>>(
@@ -1025,7 +1057,9 @@ impl<T: SlotClock + 'static, E: EthSpec> ValidatorStore for LighthouseValidatorS
let signing_context = self.signing_context(Domain::ContributionAndProof, signing_epoch);
// Bypass `with_validator_signing_method`: sync committee messages are not slashable.
let signing_method = self.doppelganger_bypassed_signing_method(aggregator_pubkey)?;
let signing_method = self
.doppelganger_bypassed_signing_method(aggregator_pubkey)
.await?;
let message = ContributionAndProof {
aggregator_index,
@@ -1056,10 +1090,10 @@ impl<T: SlotClock + 'static, E: EthSpec> ValidatorStore for LighthouseValidatorS
/// This function will only do actual pruning periodically, so it should usually be
/// cheap to call. The `first_run` flag can be used to print a more verbose message when pruning
/// runs.
fn prune_slashing_protection_db(&self, current_epoch: Epoch, first_run: bool) {
async fn prune_slashing_protection_db(&self, current_epoch: Epoch, first_run: bool) {
// Attempt to prune every SLASHING_PROTECTION_HISTORY_EPOCHs, with a tolerance for
// missing the epoch that aligns exactly.
let mut last_prune = self.slashing_protection_last_prune.lock();
let mut last_prune = self.slashing_protection_last_prune.lock().await;
if current_epoch / SLASHING_PROTECTION_HISTORY_EPOCHS
<= *last_prune / SLASHING_PROTECTION_HISTORY_EPOCHS
{
@@ -1082,7 +1116,7 @@ impl<T: SlotClock + 'static, E: EthSpec> ValidatorStore for LighthouseValidatorS
let new_min_target_epoch = current_epoch.saturating_sub(SLASHING_PROTECTION_HISTORY_EPOCHS);
let new_min_slot = new_min_target_epoch.start_slot(E::slots_per_epoch());
let all_pubkeys: Vec<_> = self.voting_pubkeys(DoppelgangerStatus::ignored);
let all_pubkeys: Vec<_> = self.voting_pubkeys(DoppelgangerStatus::ignored).await;
if let Err(e) = self
.slashing_protection
@@ -1114,9 +1148,10 @@ impl<T: SlotClock + 'static, E: EthSpec> ValidatorStore for LighthouseValidatorS
/// Returns `ProposalData` for the provided `pubkey` if it exists in `InitializedValidators`.
/// `ProposalData` fields include defaulting logic described in `get_fee_recipient_defaulting`,
/// `get_gas_limit_defaulting`, and `get_builder_proposals_defaulting`.
fn proposal_data(&self, pubkey: &PublicKeyBytes) -> Option<ProposalData> {
async fn proposal_data(&self, pubkey: &PublicKeyBytes) -> Option<ProposalData> {
self.validators
.read()
.await
.validator(pubkey)
.map(|validator| ProposalData {
validator_index: validator.get_index(),

View File

@@ -22,7 +22,6 @@ use eth2::{reqwest::ClientBuilder, BeaconNodeHttpClient, StatusCode, Timeouts};
use initialized_validators::Error::UnableToOpenVotingKeystore;
use lighthouse_validator_store::LighthouseValidatorStore;
use notifier::spawn_notifier;
use parking_lot::RwLock;
use reqwest::Certificate;
use slot_clock::SlotClock;
use slot_clock::SystemTimeSlotClock;
@@ -32,6 +31,7 @@ use std::net::SocketAddr;
use std::path::Path;
use std::sync::Arc;
use std::time::{SystemTime, UNIX_EPOCH};
use tokio::sync::RwLock;
use tokio::{
sync::mpsc,
time::{sleep, Duration},
@@ -390,7 +390,7 @@ impl<E: EthSpec> ProductionValidatorClient<E> {
// Update the metrics server.
if let Some(ctx) = &validator_metrics_ctx {
ctx.shared.write().genesis_time = Some(genesis_time);
ctx.shared.write().await.genesis_time = Some(genesis_time);
}
let slot_clock = SystemTimeSlotClock::new(
@@ -426,18 +426,21 @@ impl<E: EthSpec> ProductionValidatorClient<E> {
));
// Ensure all validators are registered in doppelganger protection.
validator_store.register_all_in_doppelganger_protection_if_enabled()?;
validator_store
.register_all_in_doppelganger_protection_if_enabled()
.await?;
info!(
voting_validators = validator_store.num_voting_validators(),
"Loaded validator keypair store"
);
let voting_validators = validator_store.num_voting_validators().await;
info!(voting_validators, "Loaded validator keypair store");
// Perform pruning of the slashing protection database on start-up. In case the database is
// oversized from having not been pruned (by a prior version) we don't want to prune
// concurrently, as it will hog the lock and cause the attestation service to spew CRITs.
if let Some(slot) = slot_clock.now() {
validator_store.prune_slashing_protection_db(slot.epoch(E::slots_per_epoch()), true);
validator_store
.prune_slashing_protection_db(slot.epoch(E::slots_per_epoch()), true)
.await;
}
let duties_service = Arc::new(
@@ -455,8 +458,8 @@ impl<E: EthSpec> ProductionValidatorClient<E> {
// Update the metrics server.
if let Some(ctx) = &validator_metrics_ctx {
ctx.shared.write().validator_store = Some(validator_store.clone());
ctx.shared.write().duties_service = Some(duties_service.clone());
ctx.shared.write().await.validator_store = Some(validator_store.clone());
ctx.shared.write().await.duties_service = Some(duties_service.clone());
}
let mut block_service_builder = BlockServiceBuilder::new()

View File

@@ -104,10 +104,10 @@ async fn notify<T: SlotClock + 'static, E: EthSpec>(
if let Some(slot) = duties_service.slot_clock.now() {
let epoch = slot.epoch(E::slots_per_epoch());
let total_validators = duties_service.total_validator_count();
let proposing_validators = duties_service.proposer_count(epoch);
let attesting_validators = duties_service.attester_count(epoch);
let doppelganger_detecting_validators = duties_service.doppelganger_detecting_count();
let total_validators = duties_service.total_validator_count().await;
let proposing_validators = duties_service.proposer_count(epoch).await;
let attesting_validators = duties_service.attester_count(epoch).await;
let doppelganger_detecting_validators = duties_service.doppelganger_detecting_count().await;
if doppelganger_detecting_validators > 0 {
info!(

View File

@@ -163,7 +163,7 @@ impl<S: ValidatorStore + 'static, T: SlotClock + 'static> AttestationService<S,
if let Some(duration_to_next_slot) = self.slot_clock.duration_to_next_slot() {
sleep(duration_to_next_slot + slot_duration / 3).await;
if let Err(e) = self.spawn_attestation_tasks(slot_duration) {
if let Err(e) = self.spawn_attestation_tasks(slot_duration).await {
crit!(error = e, "Failed to spawn attestation tasks")
} else {
trace!("Spawned attestation tasks");
@@ -183,7 +183,7 @@ impl<S: ValidatorStore + 'static, T: SlotClock + 'static> AttestationService<S,
/// For each each required attestation, spawn a new task that downloads, signs and uploads the
/// attestation to the beacon node.
fn spawn_attestation_tasks(&self, slot_duration: Duration) -> Result<(), String> {
async fn spawn_attestation_tasks(&self, slot_duration: Duration) -> Result<(), String> {
let slot = self.slot_clock.now().ok_or("Failed to read slot clock")?;
let duration_to_next_slot = self
.slot_clock
@@ -197,16 +197,16 @@ impl<S: ValidatorStore + 'static, T: SlotClock + 'static> AttestationService<S,
.checked_sub(slot_duration / 3)
.unwrap_or_else(|| Duration::from_secs(0));
let duties_by_committee_index: HashMap<CommitteeIndex, Vec<DutyAndProof>> = self
.duties_service
.attesters(slot)
.into_iter()
.fold(HashMap::new(), |mut map, duty_and_proof| {
map.entry(duty_and_proof.duty.committee_index)
.or_default()
.push(duty_and_proof);
map
});
let duties_by_committee_index: HashMap<CommitteeIndex, Vec<DutyAndProof>> =
self.duties_service.attesters(slot).await.into_iter().fold(
HashMap::new(),
|mut map, duty_and_proof| {
map.entry(duty_and_proof.duty.committee_index)
.or_default()
.push(duty_and_proof);
map
},
);
// For each committee index for this slot:
//
@@ -704,11 +704,12 @@ impl<S: ValidatorStore + 'static, T: SlotClock + 'static> AttestationService<S,
async move {
sleep_until(pruning_instant).await;
executor.spawn_blocking(
move || {
executor.spawn(
async move {
attestation_service
.validator_store
.prune_slashing_protection_db(current_epoch, false)
.await
},
"slashing_protection_pruning",
)

View File

@@ -296,12 +296,12 @@ impl<S: ValidatorStore + 'static, T: SlotClock + 'static> BlockService<S, T> {
}
for validator_pubkey in proposers {
let builder_boost_factor = self
.validator_store
.determine_builder_boost_factor(&validator_pubkey);
let service = self.clone();
self.inner.executor.spawn(
async move {
let builder_boost_factor = service
.validator_store
.determine_builder_boost_factor(&validator_pubkey).await;
let result = service
.publish_block(slot, validator_pubkey, builder_boost_factor)
.await;
@@ -448,13 +448,16 @@ impl<S: ValidatorStore + 'static, T: SlotClock + 'static> BlockService<S, T> {
let graffiti = determine_graffiti(
&validator_pubkey,
self.graffiti_file.clone(),
self.validator_store.graffiti(&validator_pubkey),
self.validator_store.graffiti(&validator_pubkey).await,
self.graffiti,
);
let randao_reveal_ref = &randao_reveal;
let self_ref = &self;
let proposer_index = self.validator_store.validator_index(&validator_pubkey);
let proposer_index = self
.validator_store
.validator_index(&validator_pubkey)
.await;
let proposer_fallback = ProposerFallback {
beacon_nodes: self.beacon_nodes.clone(),
proposer_nodes: self.proposer_nodes.clone(),

View File

@@ -341,16 +341,17 @@ pub struct DutiesService<S, T> {
impl<S: ValidatorStore, T: SlotClock + 'static> DutiesService<S, T> {
/// Returns the total number of validators known to the duties service.
pub fn total_validator_count(&self) -> usize {
self.validator_store.num_voting_validators()
pub async fn total_validator_count(&self) -> usize {
self.validator_store.num_voting_validators().await
}
/// Returns the total number of validators that should propose in the given epoch.
pub fn proposer_count(&self, epoch: Epoch) -> usize {
pub async fn proposer_count(&self, epoch: Epoch) -> usize {
// Only collect validators that are considered safe in terms of doppelganger protection.
let signing_pubkeys: HashSet<_> = self
.validator_store
.voting_pubkeys(DoppelgangerStatus::only_safe);
.voting_pubkeys(DoppelgangerStatus::only_safe)
.await;
self.proposers
.read()
@@ -364,11 +365,12 @@ impl<S: ValidatorStore, T: SlotClock + 'static> DutiesService<S, T> {
}
/// Returns the total number of validators that should attest in the given epoch.
pub fn attester_count(&self, epoch: Epoch) -> usize {
pub async fn attester_count(&self, epoch: Epoch) -> usize {
// Only collect validators that are considered safe in terms of doppelganger protection.
let signing_pubkeys: HashSet<_> = self
.validator_store
.voting_pubkeys(DoppelgangerStatus::only_safe);
.voting_pubkeys(DoppelgangerStatus::only_safe)
.await;
self.attesters
.read()
.iter()
@@ -379,9 +381,10 @@ impl<S: ValidatorStore, T: SlotClock + 'static> DutiesService<S, T> {
}
/// Returns the total number of validators that are in a doppelganger detection period.
pub fn doppelganger_detecting_count(&self) -> usize {
pub async fn doppelganger_detecting_count(&self) -> usize {
self.validator_store
.voting_pubkeys::<HashSet<_>, _>(DoppelgangerStatus::only_unsafe)
.await
.len()
}
@@ -389,13 +392,14 @@ impl<S: ValidatorStore, T: SlotClock + 'static> DutiesService<S, T> {
///
/// It is possible that multiple validators have an identical proposal slot, however that is
/// likely the result of heavy forking (lol) or inconsistent beacon node connections.
pub fn block_proposers(&self, slot: Slot) -> HashSet<PublicKeyBytes> {
pub async fn block_proposers(&self, slot: Slot) -> HashSet<PublicKeyBytes> {
let epoch = slot.epoch(S::E::slots_per_epoch());
// Only collect validators that are considered safe in terms of doppelganger protection.
let signing_pubkeys: HashSet<_> = self
.validator_store
.voting_pubkeys(DoppelgangerStatus::only_safe);
.voting_pubkeys(DoppelgangerStatus::only_safe)
.await;
self.proposers
.read()
@@ -414,13 +418,14 @@ impl<S: ValidatorStore, T: SlotClock + 'static> DutiesService<S, T> {
}
/// Returns all `ValidatorDuty` for the given `slot`.
pub fn attesters(&self, slot: Slot) -> Vec<DutyAndProof> {
pub async fn attesters(&self, slot: Slot) -> Vec<DutyAndProof> {
let epoch = slot.epoch(S::E::slots_per_epoch());
// Only collect validators that are considered safe in terms of doppelganger protection.
let signing_pubkeys: HashSet<_> = self
.validator_store
.voting_pubkeys(DoppelgangerStatus::only_safe);
.voting_pubkeys(DoppelgangerStatus::only_safe)
.await;
self.attesters
.read()
@@ -436,9 +441,9 @@ impl<S: ValidatorStore, T: SlotClock + 'static> DutiesService<S, T> {
}
/// Returns `true` if we should collect per validator metrics and `false` otherwise.
pub fn per_validator_metrics(&self) -> bool {
pub async fn per_validator_metrics(&self) -> bool {
self.enable_high_validator_count_metrics
|| self.total_validator_count() <= VALIDATOR_METRICS_MIN_COUNT
|| self.total_validator_count().await <= VALIDATOR_METRICS_MIN_COUNT
}
}
@@ -586,13 +591,15 @@ async fn poll_validator_indices<S: ValidatorStore, T: SlotClock + 'static>(
// collect those indices.
let all_pubkeys: Vec<_> = duties_service
.validator_store
.voting_pubkeys(DoppelgangerStatus::ignored);
.voting_pubkeys(DoppelgangerStatus::ignored)
.await;
for pubkey in all_pubkeys {
// This is on its own line to avoid some weirdness with locks and if statements.
let is_known = duties_service
.validator_store
.validator_index(&pubkey)
.await
.is_some();
if !is_known {
@@ -634,6 +641,7 @@ async fn poll_validator_indices<S: ValidatorStore, T: SlotClock + 'static>(
let fee_recipient = duties_service
.validator_store
.get_fee_recipient(&pubkey)
.await
.map(|fr| fr.to_string())
.unwrap_or_else(|| {
"Fee recipient for validator not set in validator_definitions.yml \
@@ -650,7 +658,8 @@ async fn poll_validator_indices<S: ValidatorStore, T: SlotClock + 'static>(
);
duties_service
.validator_store
.set_validator_index(&pubkey, response.data.index);
.set_validator_index(&pubkey, response.data.index)
.await;
duties_service
.unknown_validator_next_poll_slots
@@ -713,13 +722,18 @@ async fn poll_beacon_attesters<S: ValidatorStore + 'static, T: SlotClock + 'stat
// and get more information about other running instances.
let local_pubkeys: HashSet<_> = duties_service
.validator_store
.voting_pubkeys(DoppelgangerStatus::ignored);
.voting_pubkeys(DoppelgangerStatus::ignored)
.await;
let local_indices = {
let mut local_indices = Vec::with_capacity(local_pubkeys.len());
for &pubkey in &local_pubkeys {
if let Some(validator_index) = duties_service.validator_store.validator_index(&pubkey) {
if let Some(validator_index) = duties_service
.validator_store
.validator_index(&pubkey)
.await
{
local_indices.push(validator_index)
}
}
@@ -743,7 +757,7 @@ async fn poll_beacon_attesters<S: ValidatorStore + 'static, T: SlotClock + 'stat
)
}
update_per_validator_duty_metrics(duties_service, current_epoch, current_slot);
update_per_validator_duty_metrics(duties_service, current_epoch, current_slot).await;
drop(current_epoch_timer);
let next_epoch_timer = validator_metrics::start_timer_vec(
@@ -764,7 +778,7 @@ async fn poll_beacon_attesters<S: ValidatorStore + 'static, T: SlotClock + 'stat
)
}
update_per_validator_duty_metrics(duties_service, next_epoch, current_slot);
update_per_validator_duty_metrics(duties_service, next_epoch, current_slot).await;
drop(next_epoch_timer);
let subscriptions_timer = validator_metrics::start_timer_vec(
@@ -906,7 +920,7 @@ async fn poll_beacon_attesters_for_epoch<S: ValidatorStore + 'static, T: SlotClo
// determine whether validator duties need to be updated. This is to ensure that we don't
// request for extra data unless necessary in order to save on network bandwidth.
let uninitialized_validators =
get_uninitialized_validators(duties_service, &epoch, local_pubkeys);
get_uninitialized_validators(duties_service, &epoch, local_pubkeys).await;
let initial_indices_to_request = if !uninitialized_validators.is_empty() {
uninitialized_validators.as_slice()
} else {
@@ -940,11 +954,15 @@ async fn poll_beacon_attesters_for_epoch<S: ValidatorStore + 'static, T: SlotClo
// Make a request for all indices that require updating which we have not already made a request
// for.
let indices_to_request = validators_to_update
.iter()
.filter_map(|pubkey| duties_service.validator_store.validator_index(pubkey))
.filter(|validator_index| !initial_indices_to_request.contains(validator_index))
.collect::<Vec<_>>();
let mut indices_to_request = vec![];
for pubkey in &validators_to_update {
if let Some(validator_index) = duties_service.validator_store.validator_index(pubkey).await
{
if !initial_indices_to_request.contains(&validator_index) {
indices_to_request.push(validator_index);
}
}
}
// Filter the initial duties by their relevance so that we don't hit the warning below about
// overwriting duties. There was previously a bug here.
@@ -1041,29 +1059,39 @@ async fn poll_beacon_attesters_for_epoch<S: ValidatorStore + 'static, T: SlotClo
}
/// Get a filtered list of local validators for which we don't already know their duties for that epoch
fn get_uninitialized_validators<S: ValidatorStore, T: SlotClock + 'static>(
async fn get_uninitialized_validators<S: ValidatorStore, T: SlotClock + 'static>(
duties_service: &Arc<DutiesService<S, T>>,
epoch: &Epoch,
local_pubkeys: &HashSet<PublicKeyBytes>,
) -> Vec<u64> {
let attesters = duties_service.attesters.read();
local_pubkeys
.iter()
.filter(|pubkey| {
attesters
.get(pubkey)
.is_none_or(|duties| !duties.contains_key(epoch))
})
.filter_map(|pubkey| duties_service.validator_store.validator_index(pubkey))
.collect::<Vec<_>>()
// Allocate a temporary vec to prevent holding two locks simultaneously (deadlock risk if we use
// a different lock order in another function).
let uninitialized_pubkeys = {
let attesters = duties_service.attesters.read();
local_pubkeys
.iter()
.filter(|pubkey| {
attesters
.get(pubkey)
.is_none_or(|duties| !duties.contains_key(epoch))
})
.collect::<Vec<_>>()
};
let mut uninitialized_indices = Vec::with_capacity(uninitialized_pubkeys.len());
for pubkey in uninitialized_pubkeys {
if let Some(index) = duties_service.validator_store.validator_index(pubkey).await {
uninitialized_indices.push(index);
}
}
uninitialized_indices
}
fn update_per_validator_duty_metrics<S: ValidatorStore, T: SlotClock + 'static>(
async fn update_per_validator_duty_metrics<S: ValidatorStore, T: SlotClock + 'static>(
duties_service: &Arc<DutiesService<S, T>>,
epoch: Epoch,
current_slot: Slot,
) {
if duties_service.per_validator_metrics() {
if duties_service.per_validator_metrics().await {
let attesters = duties_service.attesters.read();
attesters.values().for_each(|attester_duties_by_epoch| {
if let Some((_, duty_and_proof)) = attester_duties_by_epoch.get(&epoch) {
@@ -1309,7 +1337,7 @@ async fn poll_beacon_proposers<S: ValidatorStore, T: SlotClock + 'static>(
// Notify the block proposal service for any proposals that we have in our cache.
//
// See the function-level documentation for more information.
let initial_block_proposers = duties_service.block_proposers(current_slot);
let initial_block_proposers = duties_service.block_proposers(current_slot).await;
notify_block_production_service::<S>(
current_slot,
&initial_block_proposers,
@@ -1324,7 +1352,8 @@ async fn poll_beacon_proposers<S: ValidatorStore, T: SlotClock + 'static>(
// doppelganger finishes.
let local_pubkeys: HashSet<_> = duties_service
.validator_store
.voting_pubkeys(DoppelgangerStatus::ignored);
.voting_pubkeys(DoppelgangerStatus::ignored)
.await;
// Only download duties and push out additional block production events if we have some
// validators.
@@ -1387,6 +1416,7 @@ async fn poll_beacon_proposers<S: ValidatorStore, T: SlotClock + 'static>(
// which were not included in the initial notification to the `BlockService`.
let additional_block_producers = duties_service
.block_proposers(current_slot)
.await
.difference(&initial_block_proposers)
.copied()
.collect::<HashSet<PublicKeyBytes>>();

View File

@@ -254,7 +254,7 @@ impl<S: ValidatorStore + 'static, T: SlotClock + 'static> PreparationService<S,
/// Prepare proposer preparations and send to beacon node
async fn prepare_proposers_and_publish(&self, spec: &ChainSpec) -> Result<(), String> {
let preparation_data = self.collect_preparation_data(spec);
let preparation_data = self.collect_preparation_data(spec).await;
if !preparation_data.is_empty() {
self.publish_preparation_data(preparation_data).await?;
}
@@ -262,7 +262,7 @@ impl<S: ValidatorStore + 'static, T: SlotClock + 'static> PreparationService<S,
Ok(())
}
fn collect_preparation_data(&self, spec: &ChainSpec) -> Vec<ProposerPreparationData> {
async fn collect_preparation_data(&self, spec: &ChainSpec) -> Vec<ProposerPreparationData> {
self.collect_proposal_data(|pubkey, proposal_data| {
if let Some(fee_recipient) = proposal_data.fee_recipient {
Some(ProposerPreparationData {
@@ -281,9 +281,10 @@ impl<S: ValidatorStore + 'static, T: SlotClock + 'static> PreparationService<S,
None
}
})
.await
}
fn collect_validator_registration_keys(&self) -> Vec<ValidatorRegistrationKey> {
async fn collect_validator_registration_keys(&self) -> Vec<ValidatorRegistrationKey> {
self.collect_proposal_data(|pubkey, proposal_data| {
// Ignore fee recipients for keys without indices, they are inactive.
proposal_data.validator_index?;
@@ -300,23 +301,27 @@ impl<S: ValidatorStore + 'static, T: SlotClock + 'static> PreparationService<S,
})
})
})
.await
}
fn collect_proposal_data<G, U>(&self, map_fn: G) -> Vec<U>
async fn collect_proposal_data<G, U>(&self, map_fn: G) -> Vec<U>
where
G: Fn(PublicKeyBytes, ProposalData) -> Option<U>,
{
let all_pubkeys: Vec<_> = self
.validator_store
.voting_pubkeys(DoppelgangerStatus::ignored);
.voting_pubkeys(DoppelgangerStatus::ignored)
.await;
all_pubkeys
.into_iter()
.filter_map(|pubkey| {
let proposal_data = self.validator_store.proposal_data(&pubkey)?;
map_fn(pubkey, proposal_data)
})
.collect()
let mut results = Vec::with_capacity(all_pubkeys.len());
for pubkey in all_pubkeys {
if let Some(proposal_data) = self.validator_store.proposal_data(&pubkey).await {
if let Some(u) = map_fn(pubkey, proposal_data) {
results.push(u);
}
}
}
results
}
async fn publish_preparation_data(
@@ -349,7 +354,7 @@ impl<S: ValidatorStore + 'static, T: SlotClock + 'static> PreparationService<S,
/// Register validators with builders, used in the blinded block proposal flow.
async fn register_validators(&self) -> Result<(), String> {
let registration_keys = self.collect_validator_registration_keys();
let registration_keys = self.collect_validator_registration_keys().await;
let mut changed_keys = vec![];

View File

@@ -308,13 +308,18 @@ pub async fn poll_sync_committee_duties<S: ValidatorStore + 'static, T: SlotCloc
// protection.
let local_pubkeys: HashSet<_> = duties_service
.validator_store
.voting_pubkeys(DoppelgangerStatus::ignored);
.voting_pubkeys(DoppelgangerStatus::ignored)
.await;
let local_indices = {
let mut local_indices = Vec::with_capacity(local_pubkeys.len());
for &pubkey in &local_pubkeys {
if let Some(validator_index) = duties_service.validator_store.validator_index(&pubkey) {
if let Some(validator_index) = duties_service
.validator_store
.validator_index(&pubkey)
.await
{
local_indices.push(validator_index)
}
}

View File

@@ -46,7 +46,10 @@ pub trait ValidatorStore: Send + Sync {
///
/// - Unknown.
/// - Known, but with an unknown index.
fn validator_index(&self, pubkey: &PublicKeyBytes) -> Option<u64>;
fn validator_index(
&self,
pubkey: &PublicKeyBytes,
) -> impl Future<Output = Option<u64>> + Send + Sync;
/// Returns all voting pubkeys for all enabled validators.
///
@@ -57,23 +60,30 @@ pub trait ValidatorStore: Send + Sync {
/// protection and are safe-enough to sign messages.
/// - `DoppelgangerStatus::ignored`: returns all the pubkeys from `only_safe` *plus* those still
/// undergoing protection. This is useful for collecting duties or other non-signing tasks.
fn voting_pubkeys<I, F>(&self, filter_func: F) -> I
fn voting_pubkeys<I, F>(&self, filter_func: F) -> impl Future<Output = I> + Send + Sync
where
I: FromIterator<PublicKeyBytes>,
F: Fn(DoppelgangerStatus) -> Option<PublicKeyBytes>;
F: Fn(DoppelgangerStatus) -> Option<PublicKeyBytes> + Send + Sync;
/// Check if the `validator_pubkey` is permitted by the doppleganger protection to sign
/// messages.
fn doppelganger_protection_allows_signing(&self, validator_pubkey: PublicKeyBytes) -> bool;
fn num_voting_validators(&self) -> usize;
fn graffiti(&self, validator_pubkey: &PublicKeyBytes) -> Option<Graffiti>;
fn num_voting_validators(&self) -> impl Future<Output = usize> + Send + Sync;
fn graffiti(
&self,
validator_pubkey: &PublicKeyBytes,
) -> impl Future<Output = Option<Graffiti>> + Send + Sync;
/// Returns the fee recipient for the given public key. The priority order for fetching
/// the fee recipient is:
/// 1. validator_definitions.yml
/// 2. process level fee recipient
fn get_fee_recipient(&self, validator_pubkey: &PublicKeyBytes) -> Option<Address>;
fn get_fee_recipient(
&self,
validator_pubkey: &PublicKeyBytes,
) -> impl Future<Output = Option<Address>> + Send + Sync;
/// Translate the `builder_proposals`, `builder_boost_factor` and
/// `prefer_builder_proposals` to a boost factor, if available.
@@ -83,7 +93,10 @@ pub trait ValidatorStore: Send + Sync {
/// - If `builder_proposals` is set to false, set boost factor to 0 to indicate a preference for
/// local payloads.
/// - Else return `None` to indicate no preference between builder and local payloads.
fn determine_builder_boost_factor(&self, validator_pubkey: &PublicKeyBytes) -> Option<u64>;
fn determine_builder_boost_factor(
&self,
validator_pubkey: &PublicKeyBytes,
) -> impl Future<Output = Option<u64>> + Send + Sync;
fn randao_reveal(
&self,
@@ -91,7 +104,11 @@ pub trait ValidatorStore: Send + Sync {
signing_epoch: Epoch,
) -> impl Future<Output = Result<Signature, Error<Self::Error>>> + Send;
fn set_validator_index(&self, validator_pubkey: &PublicKeyBytes, index: u64);
fn set_validator_index(
&self,
validator_pubkey: &PublicKeyBytes,
index: u64,
) -> impl Future<Output = ()> + Send + Sync;
fn sign_block(
&self,
@@ -162,12 +179,19 @@ pub trait ValidatorStore: Send + Sync {
/// This function will only do actual pruning periodically, so it should usually be
/// cheap to call. The `first_run` flag can be used to print a more verbose message when pruning
/// runs.
fn prune_slashing_protection_db(&self, current_epoch: Epoch, first_run: bool);
fn prune_slashing_protection_db(
&self,
current_epoch: Epoch,
first_run: bool,
) -> impl Future<Output = ()> + Send + Sync;
/// Returns `ProposalData` for the provided `pubkey` if it exists in `InitializedValidators`.
/// `ProposalData` fields include defaulting logic described in `get_fee_recipient_defaulting`,
/// `get_gas_limit_defaulting`, and `get_builder_proposals_defaulting`.
fn proposal_data(&self, pubkey: &PublicKeyBytes) -> Option<ProposalData>;
fn proposal_data(
&self,
pubkey: &PublicKeyBytes,
) -> impl Future<Output = Option<ProposalData>> + Send + Sync;
}
#[derive(Clone, Debug, PartialEq)]

View File

@@ -964,7 +964,7 @@ mod test {
// helps test that we don't delete a password file if it's in use by
// another validator.
if let Some(primary_index) = self.reuse_password_files {
let mut initialized_validators = src_vc.initialized_validators.write();
let mut initialized_validators = src_vc.initialized_validators.write().await;
let definitions = initialized_validators.as_mut_slice_testing_only();
// Find the path of the "primary" definition.
let primary_path = definitions
@@ -1001,6 +1001,7 @@ mod test {
let passwords = src_vc
.initialized_validators
.write()
.await
.delete_passwords_from_validator_definitions()
.unwrap();