Modularize validator store (#6705)

- Create trait `ValidatorStore` with all functions used by the `validator_services`
- Make `validator_services` generic on `S: ValidatorStore`
- Introduce `LighthouseValidatorStore`, which has identical functionality to the old `ValidatorStore`
- Remove dependencies (especially `environment`) from `validator_services` and `beacon_node_fallback` in order to be able to cleanly use them in Anchor
This commit is contained in:
Daniel Knopik
2025-05-07 05:43:33 +02:00
committed by GitHub
parent beb0ce68bd
commit 3d92e3663b
42 changed files with 2010 additions and 1622 deletions

49
Cargo.lock generated
View File

@@ -879,14 +879,13 @@ name = "beacon_node_fallback"
version = "0.1.0"
dependencies = [
"clap",
"environment",
"eth2",
"futures",
"itertools 0.10.5",
"logging",
"serde",
"slot_clock",
"strum",
"task_executor",
"tokio",
"tracing",
"types",
@@ -2364,6 +2363,7 @@ dependencies = [
"tokio",
"tracing",
"types",
"validator_store",
]
[[package]]
@@ -5635,6 +5635,32 @@ dependencies = [
"unused_port",
]
[[package]]
name = "lighthouse_validator_store"
version = "0.1.0"
dependencies = [
"account_utils",
"beacon_node_fallback",
"doppelganger_service",
"either",
"environment",
"eth2",
"futures",
"initialized_validators",
"logging",
"parking_lot 0.12.3",
"serde",
"signing_method",
"slashing_protection",
"slot_clock",
"task_executor",
"tokio",
"tracing",
"types",
"validator_metrics",
"validator_store",
]
[[package]]
name = "lighthouse_version"
version = "0.1.0"
@@ -9667,6 +9693,7 @@ dependencies = [
"graffiti_file",
"hyper 1.6.0",
"initialized_validators",
"lighthouse_validator_store",
"metrics",
"monitoring_api",
"parking_lot 0.12.3",
@@ -9722,6 +9749,7 @@ dependencies = [
"health_metrics",
"initialized_validators",
"itertools 0.10.5",
"lighthouse_validator_store",
"lighthouse_version",
"logging",
"parking_lot 0.12.3",
@@ -9754,6 +9782,7 @@ name = "validator_http_metrics"
version = "0.1.0"
dependencies = [
"health_metrics",
"lighthouse_validator_store",
"lighthouse_version",
"logging",
"malloc_utils",
@@ -9765,7 +9794,6 @@ dependencies = [
"types",
"validator_metrics",
"validator_services",
"validator_store",
"warp",
"warp_utils",
]
@@ -9808,9 +9836,7 @@ version = "0.1.0"
dependencies = [
"beacon_node_fallback",
"bls",
"doppelganger_service",
"either",
"environment",
"eth2",
"futures",
"graffiti_file",
@@ -9818,6 +9844,7 @@ dependencies = [
"parking_lot 0.12.3",
"safe_arith",
"slot_clock",
"task_executor",
"tokio",
"tracing",
"tree_hash",
@@ -9830,19 +9857,8 @@ dependencies = [
name = "validator_store"
version = "0.1.0"
dependencies = [
"account_utils",
"doppelganger_service",
"initialized_validators",
"logging",
"parking_lot 0.12.3",
"serde",
"signing_method",
"slashing_protection",
"slot_clock",
"task_executor",
"tracing",
"types",
"validator_metrics",
]
[[package]]
@@ -10100,6 +10116,7 @@ dependencies = [
"eth2_network_config",
"futures",
"initialized_validators",
"lighthouse_validator_store",
"logging",
"parking_lot 0.12.3",
"reqwest",

View File

@@ -96,11 +96,11 @@ members = [
"validator_client/http_api",
"validator_client/http_metrics",
"validator_client/initialized_validators",
"validator_client/lighthouse_validator_store",
"validator_client/signing_method",
"validator_client/slashing_protection",
"validator_client/validator_metrics",
"validator_client/validator_services",
"validator_client/validator_store",
"validator_manager",
]
@@ -228,7 +228,6 @@ compare_fields = { path = "common/compare_fields" }
deposit_contract = { path = "common/deposit_contract" }
directory = { path = "common/directory" }
doppelganger_service = { path = "validator_client/doppelganger_service" }
validator_services = { path = "validator_client/validator_services" }
environment = { path = "lighthouse/environment" }
eth1 = { path = "beacon_node/eth1" }
eth1_test_rig = { path = "testing/eth1_test_rig" }
@@ -250,6 +249,7 @@ int_to_bytes = { path = "consensus/int_to_bytes" }
kzg = { path = "crypto/kzg" }
metrics = { path = "common/metrics" }
lighthouse_network = { path = "beacon_node/lighthouse_network" }
lighthouse_validator_store = { path = "validator_client/lighthouse_validator_store" }
lighthouse_version = { path = "common/lighthouse_version" }
workspace_members = { path = "common/workspace_members" }
lockfile = { path = "common/lockfile" }
@@ -281,6 +281,7 @@ validator_dir = { path = "common/validator_dir" }
validator_http_api = { path = "validator_client/http_api" }
validator_http_metrics = { path = "validator_client/http_metrics" }
validator_metrics = { path = "validator_client/validator_metrics" }
validator_services = { path = "validator_client/validator_services" }
validator_store = { path = "validator_client/validator_store" }
validator_test_rig = { path = "testing/validator_test_rig" }
warp_utils = { path = "common/warp_utils" }

View File

@@ -16,7 +16,7 @@ use super::{
Signature, SignedRoot,
};
#[derive(Debug, PartialEq)]
#[derive(Debug, PartialEq, Clone)]
pub enum Error {
SszTypesError(ssz_types::Error),
BitfieldError(ssz::BitfieldError),

View File

@@ -85,6 +85,7 @@ pub trait AbstractExecPayload<E: EthSpec>:
+ TryInto<Self::Deneb>
+ TryInto<Self::Electra>
+ TryInto<Self::Fulu>
+ Sync
{
type Ref<'a>: ExecPayload<E>
+ Copy
@@ -97,23 +98,28 @@ pub trait AbstractExecPayload<E: EthSpec>:
type Bellatrix: OwnedExecPayload<E>
+ Into<Self>
+ for<'a> From<Cow<'a, ExecutionPayloadBellatrix<E>>>
+ TryFrom<ExecutionPayloadHeaderBellatrix<E>>;
+ TryFrom<ExecutionPayloadHeaderBellatrix<E>>
+ Sync;
type Capella: OwnedExecPayload<E>
+ Into<Self>
+ for<'a> From<Cow<'a, ExecutionPayloadCapella<E>>>
+ TryFrom<ExecutionPayloadHeaderCapella<E>>;
+ TryFrom<ExecutionPayloadHeaderCapella<E>>
+ Sync;
type Deneb: OwnedExecPayload<E>
+ Into<Self>
+ for<'a> From<Cow<'a, ExecutionPayloadDeneb<E>>>
+ TryFrom<ExecutionPayloadHeaderDeneb<E>>;
+ TryFrom<ExecutionPayloadHeaderDeneb<E>>
+ Sync;
type Electra: OwnedExecPayload<E>
+ Into<Self>
+ for<'a> From<Cow<'a, ExecutionPayloadElectra<E>>>
+ TryFrom<ExecutionPayloadHeaderElectra<E>>;
+ TryFrom<ExecutionPayloadHeaderElectra<E>>
+ Sync;
type Fulu: OwnedExecPayload<E>
+ Into<Self>
+ for<'a> From<Cow<'a, ExecutionPayloadFulu<E>>>
+ TryFrom<ExecutionPayloadHeaderFulu<E>>;
+ TryFrom<ExecutionPayloadHeaderFulu<E>>
+ Sync;
}
#[superstruct(

View File

@@ -14,6 +14,7 @@ eth2_keystore = { workspace = true }
eth2_network_config = { workspace = true }
futures = { workspace = true }
initialized_validators = { workspace = true }
lighthouse_validator_store = { workspace = true }
logging = { workspace = true }
parking_lot = { workspace = true }
reqwest = { workspace = true }

View File

@@ -25,6 +25,7 @@ mod tests {
use initialized_validators::{
load_pem_certificate, load_pkcs12_identity, InitializedValidators,
};
use lighthouse_validator_store::LighthouseValidatorStore;
use parking_lot::Mutex;
use reqwest::Client;
use serde::Serialize;
@@ -44,7 +45,7 @@ mod tests {
use tokio::time::sleep;
use types::{attestation::AttestationBase, *};
use url::Url;
use validator_store::{Error as ValidatorStoreError, ValidatorStore};
use validator_store::{Error as ValidatorStoreError, SignedBlock, ValidatorStore};
/// If we are unable to reach the Web3Signer HTTP API within this timeout then we will
/// assume it failed to start.
@@ -73,6 +74,7 @@ mod tests {
impl SignedObject for Signature {}
impl SignedObject for Attestation<E> {}
impl SignedObject for SignedBeaconBlock<E> {}
impl SignedObject for SignedBlock<E> {}
impl SignedObject for SignedAggregateAndProof<E> {}
impl SignedObject for SelectionProof {}
impl SignedObject for SyncSelectionProof {}
@@ -301,7 +303,7 @@ mod tests {
/// A testing rig which holds a `ValidatorStore`.
struct ValidatorStoreRig {
validator_store: Arc<ValidatorStore<TestingSlotClock, E>>,
validator_store: Arc<LighthouseValidatorStore<TestingSlotClock, E>>,
_validator_dir: TempDir,
runtime: Arc<tokio::runtime::Runtime>,
_runtime_shutdown: async_channel::Sender<()>,
@@ -352,12 +354,12 @@ mod tests {
let slot_clock =
TestingSlotClock::new(Slot::new(0), Duration::from_secs(0), Duration::from_secs(1));
let config = validator_store::Config {
let config = lighthouse_validator_store::Config {
enable_web3signer_slashing_protection: slashing_protection_config.local,
..Default::default()
};
let validator_store = ValidatorStore::<_, E>::new(
let validator_store = LighthouseValidatorStore::<_, E>::new(
initialized_validators,
slashing_protection,
Hash256::repeat_byte(42),
@@ -481,7 +483,7 @@ mod tests {
generate_sig: F,
) -> Self
where
F: Fn(PublicKeyBytes, Arc<ValidatorStore<TestingSlotClock, E>>) -> R,
F: Fn(PublicKeyBytes, Arc<LighthouseValidatorStore<TestingSlotClock, E>>) -> R,
R: Future<Output = S>,
// We use the `SignedObject` trait to white-list objects for comparison. This avoids
// accidentally comparing something meaningless like a `()`.
@@ -516,8 +518,8 @@ mod tests {
web3signer_should_sign: bool,
) -> Self
where
F: Fn(PublicKeyBytes, Arc<ValidatorStore<TestingSlotClock, E>>) -> R,
R: Future<Output = Result<(), ValidatorStoreError>>,
F: Fn(PublicKeyBytes, Arc<LighthouseValidatorStore<TestingSlotClock, E>>) -> R,
R: Future<Output = Result<(), lighthouse_validator_store::Error>>,
{
for validator_rig in &self.validator_rigs {
let result =
@@ -591,10 +593,10 @@ mod tests {
.assert_signatures_match("beacon_block_base", |pubkey, validator_store| {
let spec = spec.clone();
async move {
let block = BeaconBlock::Base(BeaconBlockBase::empty(&spec));
let block = BeaconBlock::<E>::Base(BeaconBlockBase::empty(&spec));
let block_slot = block.slot();
validator_store
.sign_block(pubkey, block, block_slot)
.sign_block(pubkey, block.into(), block_slot)
.await
.unwrap()
}
@@ -664,7 +666,11 @@ mod tests {
let mut altair_block = BeaconBlockAltair::empty(&spec);
altair_block.slot = altair_fork_slot;
validator_store
.sign_block(pubkey, BeaconBlock::Altair(altair_block), altair_fork_slot)
.sign_block(
pubkey,
BeaconBlock::<E>::Altair(altair_block).into(),
altair_fork_slot,
)
.await
.unwrap()
}
@@ -749,7 +755,7 @@ mod tests {
validator_store
.sign_block(
pubkey,
BeaconBlock::Bellatrix(bellatrix_block),
BeaconBlock::<E>::Bellatrix(bellatrix_block).into(),
bellatrix_fork_slot,
)
.await
@@ -805,7 +811,7 @@ mod tests {
};
let first_block = || {
let mut bellatrix_block = BeaconBlockBellatrix::empty(&spec);
let mut bellatrix_block = BeaconBlockBellatrix::<E>::empty(&spec);
bellatrix_block.slot = bellatrix_fork_slot;
BeaconBlock::Bellatrix(bellatrix_block)
};
@@ -871,7 +877,7 @@ mod tests {
let block = first_block();
let slot = block.slot();
validator_store
.sign_block(pubkey, block, slot)
.sign_block(pubkey, block.into(), slot)
.await
.unwrap()
})
@@ -882,7 +888,7 @@ mod tests {
let block = double_vote_block();
let slot = block.slot();
validator_store
.sign_block(pubkey, block, slot)
.sign_block(pubkey, block.into(), slot)
.await
.map(|_| ())
},

View File

@@ -22,6 +22,7 @@ fdlimit = "0.3.0"
graffiti_file = { workspace = true }
hyper = { workspace = true }
initialized_validators = { workspace = true }
lighthouse_validator_store = { workspace = true }
metrics = { workspace = true }
monitoring_api = { workspace = true }
parking_lot = { workspace = true }

View File

@@ -10,18 +10,17 @@ path = "src/lib.rs"
[dependencies]
clap = { workspace = true }
environment = { workspace = true }
eth2 = { workspace = true }
futures = { workspace = true }
itertools = { workspace = true }
serde = { workspace = true }
slot_clock = { workspace = true }
strum = { workspace = true }
task_executor = { workspace = true }
tokio = { workspace = true }
tracing = { workspace = true }
types = { workspace = true }
validator_metrics = { workspace = true }
[dev-dependencies]
logging = { workspace = true }
validator_test_rig = { workspace = true }

View File

@@ -8,7 +8,6 @@ use beacon_node_health::{
IsOptimistic, SyncDistanceTier,
};
use clap::ValueEnum;
use environment::RuntimeContext;
use eth2::BeaconNodeHttpClient;
use futures::future;
use serde::{ser::SerializeStruct, Deserialize, Serialize, Serializer};
@@ -17,11 +16,11 @@ use std::cmp::Ordering;
use std::fmt;
use std::fmt::Debug;
use std::future::Future;
use std::marker::PhantomData;
use std::sync::Arc;
use std::time::{Duration, Instant};
use std::vec::Vec;
use strum::EnumVariantNames;
use task_executor::TaskExecutor;
use tokio::{sync::RwLock, time::sleep};
use tracing::{debug, error, warn};
use types::{ChainSpec, Config as ConfigSpec, EthSpec, Slot};
@@ -61,17 +60,16 @@ pub struct LatencyMeasurement {
///
/// See `SLOT_LOOKAHEAD` for information about when this should run.
pub fn start_fallback_updater_service<T: SlotClock + 'static, E: EthSpec>(
context: RuntimeContext<E>,
beacon_nodes: Arc<BeaconNodeFallback<T, E>>,
executor: TaskExecutor,
beacon_nodes: Arc<BeaconNodeFallback<T>>,
) -> Result<(), &'static str> {
let executor = context.executor;
if beacon_nodes.slot_clock.is_none() {
return Err("Cannot start fallback updater without slot clock");
}
let future = async move {
loop {
beacon_nodes.update_all_candidates().await;
beacon_nodes.update_all_candidates::<E>().await;
let sleep_time = beacon_nodes
.slot_clock
@@ -186,29 +184,27 @@ impl Serialize for CandidateInfo {
/// Represents a `BeaconNodeHttpClient` inside a `BeaconNodeFallback` that may or may not be used
/// for a query.
#[derive(Clone, Debug)]
pub struct CandidateBeaconNode<E> {
pub struct CandidateBeaconNode {
pub index: usize,
pub beacon_node: BeaconNodeHttpClient,
pub health: Arc<RwLock<Result<BeaconNodeHealth, CandidateError>>>,
_phantom: PhantomData<E>,
}
impl<E: EthSpec> PartialEq for CandidateBeaconNode<E> {
impl PartialEq for CandidateBeaconNode {
fn eq(&self, other: &Self) -> bool {
self.index == other.index && self.beacon_node == other.beacon_node
}
}
impl<E: EthSpec> Eq for CandidateBeaconNode<E> {}
impl Eq for CandidateBeaconNode {}
impl<E: EthSpec> CandidateBeaconNode<E> {
impl CandidateBeaconNode {
/// Instantiate a new node.
pub fn new(beacon_node: BeaconNodeHttpClient, index: usize) -> Self {
Self {
index,
beacon_node,
health: Arc::new(RwLock::new(Err(CandidateError::Uninitialized))),
_phantom: PhantomData,
}
}
@@ -217,13 +213,13 @@ impl<E: EthSpec> CandidateBeaconNode<E> {
*self.health.read().await
}
pub async fn refresh_health<T: SlotClock>(
pub async fn refresh_health<E: EthSpec, T: SlotClock>(
&self,
distance_tiers: &BeaconNodeSyncDistanceTiers,
slot_clock: Option<&T>,
spec: &ChainSpec,
) -> Result<(), CandidateError> {
if let Err(e) = self.is_compatible(spec).await {
if let Err(e) = self.is_compatible::<E>(spec).await {
*self.health.write().await = Err(e);
return Err(e);
}
@@ -287,7 +283,7 @@ impl<E: EthSpec> CandidateBeaconNode<E> {
}
/// Checks if the node has the correct specification.
async fn is_compatible(&self, spec: &ChainSpec) -> Result<(), CandidateError> {
async fn is_compatible<E: EthSpec>(&self, spec: &ChainSpec) -> Result<(), CandidateError> {
let config = self
.beacon_node
.get_config_spec::<ConfigSpec>()
@@ -372,17 +368,17 @@ impl<E: EthSpec> CandidateBeaconNode<E> {
/// behaviour, where the failure of one candidate results in the next candidate receiving an
/// identical query.
#[derive(Clone, Debug)]
pub struct BeaconNodeFallback<T, E> {
pub candidates: Arc<RwLock<Vec<CandidateBeaconNode<E>>>>,
pub struct BeaconNodeFallback<T> {
pub candidates: Arc<RwLock<Vec<CandidateBeaconNode>>>,
distance_tiers: BeaconNodeSyncDistanceTiers,
slot_clock: Option<T>,
broadcast_topics: Vec<ApiTopic>,
spec: Arc<ChainSpec>,
}
impl<T: SlotClock, E: EthSpec> BeaconNodeFallback<T, E> {
impl<T: SlotClock> BeaconNodeFallback<T> {
pub fn new(
candidates: Vec<CandidateBeaconNode<E>>,
candidates: Vec<CandidateBeaconNode>,
config: Config,
broadcast_topics: Vec<ApiTopic>,
spec: Arc<ChainSpec>,
@@ -464,7 +460,7 @@ impl<T: SlotClock, E: EthSpec> BeaconNodeFallback<T, E> {
/// It is possible for a node to return an unsynced status while continuing to serve
/// low quality responses. To route around this it's best to poll all connected beacon nodes.
/// A previous implementation of this function polled only the unavailable BNs.
pub async fn update_all_candidates(&self) {
pub async fn update_all_candidates<E: EthSpec>(&self) {
// Clone the vec, so we release the read lock immediately.
// `candidate.health` is behind an Arc<RwLock>, so this would still allow us to mutate the values.
let candidates = self.candidates.read().await.clone();
@@ -472,7 +468,7 @@ impl<T: SlotClock, E: EthSpec> BeaconNodeFallback<T, E> {
let mut nodes = Vec::with_capacity(candidates.len());
for candidate in candidates.iter() {
futures.push(candidate.refresh_health(
futures.push(candidate.refresh_health::<E, T>(
&self.distance_tiers,
self.slot_clock.as_ref(),
&self.spec,
@@ -675,7 +671,7 @@ impl<T: SlotClock, E: EthSpec> BeaconNodeFallback<T, E> {
}
/// Helper functions to allow sorting candidate nodes by health.
async fn sort_nodes_by_health<E: EthSpec>(nodes: &mut Vec<CandidateBeaconNode<E>>) {
async fn sort_nodes_by_health(nodes: &mut Vec<CandidateBeaconNode>) {
// Fetch all health values.
let health_results: Vec<Result<BeaconNodeHealth, CandidateError>> =
future::join_all(nodes.iter().map(|node| node.health())).await;
@@ -693,7 +689,7 @@ async fn sort_nodes_by_health<E: EthSpec>(nodes: &mut Vec<CandidateBeaconNode<E>
});
// Reorder candidates based on the sorted indices.
let sorted_nodes: Vec<CandidateBeaconNode<E>> = indices_with_health
let sorted_nodes: Vec<CandidateBeaconNode> = indices_with_health
.into_iter()
.map(|(index, _)| nodes[index].clone())
.collect();
@@ -752,7 +748,7 @@ mod tests {
let optimistic_status = IsOptimistic::No;
let execution_status = ExecutionEngineHealth::Healthy;
fn new_candidate(index: usize) -> CandidateBeaconNode<E> {
fn new_candidate(index: usize) -> CandidateBeaconNode {
let beacon_node = BeaconNodeHttpClient::new(
SensitiveUrl::parse(&format!("http://example_{index}.com")).unwrap(),
Timeouts::set_all(Duration::from_secs(index as u64)),
@@ -859,21 +855,21 @@ mod tests {
async fn new_mock_beacon_node(
index: usize,
spec: &ChainSpec,
) -> (MockBeaconNode<E>, CandidateBeaconNode<E>) {
) -> (MockBeaconNode<E>, CandidateBeaconNode) {
let mut mock_beacon_node = MockBeaconNode::<E>::new().await;
mock_beacon_node.mock_config_spec(spec);
let beacon_node =
CandidateBeaconNode::<E>::new(mock_beacon_node.beacon_api_client.clone(), index);
CandidateBeaconNode::new(mock_beacon_node.beacon_api_client.clone(), index);
(mock_beacon_node, beacon_node)
}
fn create_beacon_node_fallback(
candidates: Vec<CandidateBeaconNode<E>>,
candidates: Vec<CandidateBeaconNode>,
topics: Vec<ApiTopic>,
spec: Arc<ChainSpec>,
) -> BeaconNodeFallback<TestingSlotClock, E> {
) -> BeaconNodeFallback<TestingSlotClock> {
let mut beacon_node_fallback =
BeaconNodeFallback::new(candidates, Config::default(), topics, spec);
@@ -929,7 +925,7 @@ mod tests {
sync_distance: Slot::new(0),
});
beacon_node_fallback.update_all_candidates().await;
beacon_node_fallback.update_all_candidates::<E>().await;
let candidates = beacon_node_fallback.candidates.read().await;
assert_eq!(

View File

@@ -15,6 +15,7 @@ task_executor = { workspace = true }
tokio = { workspace = true }
tracing = { workspace = true }
types = { workspace = true }
validator_store = { workspace = true }
[dev-dependencies]
futures = { workspace = true }

View File

@@ -42,68 +42,7 @@ use task_executor::ShutdownReason;
use tokio::time::sleep;
use tracing::{error, info};
use types::{Epoch, EthSpec, PublicKeyBytes, Slot};
/// A wrapper around `PublicKeyBytes` which encodes information about the status of a validator
/// pubkey with regards to doppelganger protection.
#[derive(Debug, PartialEq)]
pub enum DoppelgangerStatus {
/// Doppelganger protection has approved this for signing.
///
/// This is because the service has waited some period of time to
/// detect other instances of this key on the network.
SigningEnabled(PublicKeyBytes),
/// Doppelganger protection is still waiting to detect other instances.
///
/// Do not use this pubkey for signing slashable messages!!
///
/// However, it can safely be used for other non-slashable operations (e.g., collecting duties
/// or subscribing to subnets).
SigningDisabled(PublicKeyBytes),
/// This pubkey is unknown to the doppelganger service.
///
/// This represents a serious internal error in the program. This validator will be permanently
/// disabled!
UnknownToDoppelganger(PublicKeyBytes),
}
impl DoppelgangerStatus {
/// Only return a pubkey if it is explicitly safe for doppelganger protection.
///
/// If `Some(pubkey)` is returned, doppelganger has declared it safe for signing.
///
/// ## Note
///
/// "Safe" is only best-effort by doppelganger. There is no guarantee that a doppelganger
/// doesn't exist.
pub fn only_safe(self) -> Option<PublicKeyBytes> {
match self {
DoppelgangerStatus::SigningEnabled(pubkey) => Some(pubkey),
DoppelgangerStatus::SigningDisabled(_) => None,
DoppelgangerStatus::UnknownToDoppelganger(_) => None,
}
}
/// Returns a key regardless of whether or not doppelganger has approved it. Such a key might be
/// used for signing non-slashable messages, duties collection or other activities.
///
/// If the validator is unknown to doppelganger then `None` will be returned.
pub fn ignored(self) -> Option<PublicKeyBytes> {
match self {
DoppelgangerStatus::SigningEnabled(pubkey) => Some(pubkey),
DoppelgangerStatus::SigningDisabled(pubkey) => Some(pubkey),
DoppelgangerStatus::UnknownToDoppelganger(_) => None,
}
}
/// Only return a pubkey if it will not be used for signing due to doppelganger detection.
pub fn only_unsafe(self) -> Option<PublicKeyBytes> {
match self {
DoppelgangerStatus::SigningEnabled(_) => None,
DoppelgangerStatus::SigningDisabled(pubkey) => Some(pubkey),
DoppelgangerStatus::UnknownToDoppelganger(pubkey) => Some(pubkey),
}
}
}
use validator_store::{DoppelgangerStatus, ValidatorStore};
struct LivenessResponses {
current_epoch_responses: Vec<LivenessResponseData>,
@@ -114,13 +53,6 @@ struct LivenessResponses {
/// validators on the network.
pub const DEFAULT_REMAINING_DETECTION_EPOCHS: u64 = 1;
/// This crate cannot depend on ValidatorStore as validator_store depends on this crate and
/// initialises the doppelganger protection. For this reason, we abstract the validator store
/// functions this service needs through the following trait
pub trait DoppelgangerValidatorStore {
fn get_validator_index(&self, pubkey: &PublicKeyBytes) -> Option<u64>;
}
/// Store the per-validator status of doppelganger checking.
#[derive(Debug, PartialEq)]
pub struct DoppelgangerState {
@@ -163,8 +95,8 @@ impl DoppelgangerState {
/// If the BN fails to respond to either of these requests, simply return an empty response.
/// This behaviour is to help prevent spurious failures on the BN from needlessly preventing
/// doppelganger progression.
async fn beacon_node_liveness<T: 'static + SlotClock, E: EthSpec>(
beacon_nodes: Arc<BeaconNodeFallback<T, E>>,
async fn beacon_node_liveness<T: 'static + SlotClock>(
beacon_nodes: Arc<BeaconNodeFallback<T>>,
current_epoch: Epoch,
validator_indices: Vec<u64>,
) -> LivenessResponses {
@@ -280,20 +212,20 @@ impl DoppelgangerService {
service: Arc<Self>,
context: RuntimeContext<E>,
validator_store: Arc<V>,
beacon_nodes: Arc<BeaconNodeFallback<T, E>>,
beacon_nodes: Arc<BeaconNodeFallback<T>>,
slot_clock: T,
) -> Result<(), String>
where
E: EthSpec,
T: 'static + SlotClock,
V: DoppelgangerValidatorStore + Send + Sync + 'static,
V: ValidatorStore<E = E> + Send + Sync + 'static,
{
// Define the `get_index` function as one that uses the validator store.
let get_index = move |pubkey| validator_store.get_validator_index(&pubkey);
let get_index = move |pubkey| validator_store.validator_index(&pubkey);
// Define the `get_liveness` function as one that queries the beacon node API.
let get_liveness = move |current_epoch, validator_indices| {
beacon_node_liveness(beacon_nodes.clone(), current_epoch, validator_indices)
beacon_node_liveness::<T>(beacon_nodes.clone(), current_epoch, validator_indices)
};
let mut shutdown_sender = context.executor.shutdown_sender();
@@ -378,17 +310,18 @@ impl DoppelgangerService {
///
/// Validators added during the genesis epoch will not have doppelganger protection applied to
/// them.
pub fn register_new_validator<E: EthSpec, T: SlotClock>(
pub fn register_new_validator<T: SlotClock>(
&self,
validator: PublicKeyBytes,
slot_clock: &T,
slots_per_epoch: u64,
) -> Result<(), String> {
let current_epoch = slot_clock
// If registering before genesis, use the genesis slot.
.now_or_genesis()
.ok_or_else(|| "Unable to read slot clock when registering validator".to_string())?
.epoch(E::slots_per_epoch());
let genesis_epoch = slot_clock.genesis_slot().epoch(E::slots_per_epoch());
.epoch(slots_per_epoch);
let genesis_epoch = slot_clock.genesis_slot().epoch(slots_per_epoch);
let remaining_epochs = if current_epoch <= genesis_epoch {
// Disable doppelganger protection when the validator was initialized before genesis.
@@ -673,6 +606,7 @@ mod test {
test_utils::{SeedableRng, TestRandom, XorShiftRng},
MainnetEthSpec,
};
use validator_store::DoppelgangerStatus;
const DEFAULT_VALIDATORS: usize = 8;
@@ -773,7 +707,7 @@ mod test {
.expect("index should exist");
self.doppelganger
.register_new_validator::<E, _>(pubkey, &self.slot_clock)
.register_new_validator(pubkey, &self.slot_clock, E::slots_per_epoch())
.unwrap();
self.doppelganger
.doppelganger_states

View File

@@ -16,13 +16,14 @@ deposit_contract = { workspace = true }
directory = { workspace = true }
dirs = { workspace = true }
doppelganger_service = { workspace = true }
eth2 = { workspace = true }
eth2_keystore = { workspace = true }
eth2 = { workspace = true }
eth2_keystore = { workspace = true }
ethereum_serde_utils = { workspace = true }
filesystem = { workspace = true }
graffiti_file = { workspace = true }
health_metrics = { workspace = true }
initialized_validators = { workspace = true }
lighthouse_validator_store = { workspace = true }
lighthouse_version = { workspace = true }
logging = { workspace = true }
parking_lot = { workspace = true }
@@ -32,19 +33,19 @@ serde = { workspace = true }
serde_json = { workspace = true }
signing_method = { workspace = true }
slashing_protection = { workspace = true }
slot_clock = { workspace = true }
sysinfo = { workspace = true }
system_health = { workspace = true }
task_executor = { workspace = true }
tempfile = { workspace = true }
tokio = { workspace = true }
tokio-stream = { workspace = true }
tracing = { workspace = true }
types = { workspace = true }
url = { workspace = true }
validator_dir = { workspace = true }
validator_services = { workspace = true }
validator_store = { workspace = true }
slot_clock = { workspace = true }
sysinfo = { workspace = true }
system_health = { workspace = true }
task_executor = { workspace = true }
tempfile = { workspace = true }
tokio = { workspace = true }
tokio-stream = { workspace = true }
tracing = { workspace = true }
types = { workspace = true }
url = { workspace = true }
validator_dir = { workspace = true }
validator_services = { workspace = true }
validator_store = { workspace = true }
warp = { workspace = true }
warp_utils = { workspace = true }
zeroize = { workspace = true }

View File

@@ -1,5 +1,6 @@
use bls::{PublicKey, PublicKeyBytes};
use eth2::types::GenericResponse;
use lighthouse_validator_store::LighthouseValidatorStore;
use slot_clock::SlotClock;
use std::sync::Arc;
use tracing::info;
@@ -9,7 +10,7 @@ use validator_store::ValidatorStore;
pub async fn create_signed_voluntary_exit<T: 'static + SlotClock + Clone, E: EthSpec>(
pubkey: PublicKey,
maybe_epoch: Option<Epoch>,
validator_store: Arc<ValidatorStore<T, E>>,
validator_store: Arc<LighthouseValidatorStore<T, E>>,
slot_clock: T,
) -> Result<GenericResponse<SignedVoluntaryExit>, warp::Rejection> {
let epoch = match maybe_epoch {

View File

@@ -5,12 +5,11 @@ use account_utils::{
random_mnemonic, random_password,
};
use eth2::lighthouse_vc::types::{self as api_types};
use lighthouse_validator_store::LighthouseValidatorStore;
use slot_clock::SlotClock;
use std::path::{Path, PathBuf};
use types::ChainSpec;
use types::EthSpec;
use types::{ChainSpec, EthSpec};
use validator_dir::{keystore_password_path, Builder as ValidatorDirBuilder};
use validator_store::ValidatorStore;
use zeroize::Zeroizing;
/// Create some validator EIP-2335 keystores and store them on disk. Then, enroll the validators in
@@ -30,7 +29,7 @@ pub async fn create_validators_mnemonic<P: AsRef<Path>, T: 'static + SlotClock,
validator_requests: &[api_types::ValidatorRequest],
validator_dir: P,
secrets_dir: Option<PathBuf>,
validator_store: &ValidatorStore<T, E>,
validator_store: &LighthouseValidatorStore<T, E>,
spec: &ChainSpec,
) -> Result<(Vec<api_types::CreatedValidator>, Mnemonic), warp::Rejection> {
let mnemonic = mnemonic_opt.unwrap_or_else(random_mnemonic);
@@ -178,7 +177,7 @@ pub async fn create_validators_mnemonic<P: AsRef<Path>, T: 'static + SlotClock,
pub async fn create_validators_web3signer<T: 'static + SlotClock, E: EthSpec>(
validators: Vec<ValidatorDefinition>,
validator_store: &ValidatorStore<T, E>,
validator_store: &LighthouseValidatorStore<T, E>,
) -> Result<(), warp::Rejection> {
for validator in validators {
validator_store

View File

@@ -1,12 +1,12 @@
use bls::PublicKey;
use lighthouse_validator_store::LighthouseValidatorStore;
use slot_clock::SlotClock;
use std::sync::Arc;
use types::{graffiti::GraffitiString, EthSpec, Graffiti};
use validator_store::ValidatorStore;
pub fn get_graffiti<T: 'static + SlotClock + Clone, E: EthSpec>(
validator_pubkey: PublicKey,
validator_store: Arc<ValidatorStore<T, E>>,
validator_store: Arc<LighthouseValidatorStore<T, E>>,
graffiti_flag: Option<Graffiti>,
) -> Result<Graffiti, warp::Rejection> {
let initialized_validators_rw_lock = validator_store.initialized_validators();
@@ -29,7 +29,7 @@ pub fn get_graffiti<T: 'static + SlotClock + Clone, E: EthSpec>(
pub fn set_graffiti<T: 'static + SlotClock + Clone, E: EthSpec>(
validator_pubkey: PublicKey,
graffiti: GraffitiString,
validator_store: Arc<ValidatorStore<T, E>>,
validator_store: Arc<LighthouseValidatorStore<T, E>>,
) -> Result<(), warp::Rejection> {
let initialized_validators_rw_lock = validator_store.initialized_validators();
let mut initialized_validators = initialized_validators_rw_lock.write();
@@ -55,7 +55,7 @@ pub fn set_graffiti<T: 'static + SlotClock + Clone, E: EthSpec>(
pub fn delete_graffiti<T: 'static + SlotClock + Clone, E: EthSpec>(
validator_pubkey: PublicKey,
validator_store: Arc<ValidatorStore<T, E>>,
validator_store: Arc<LighthouseValidatorStore<T, E>>,
) -> Result<(), warp::Rejection> {
let initialized_validators_rw_lock = validator_store.initialized_validators();
let mut initialized_validators = initialized_validators_rw_lock.write();

View File

@@ -10,6 +10,7 @@ use eth2::lighthouse_vc::{
};
use eth2_keystore::Keystore;
use initialized_validators::{Error, InitializedValidators};
use lighthouse_validator_store::LighthouseValidatorStore;
use signing_method::SigningMethod;
use slot_clock::SlotClock;
use std::path::PathBuf;
@@ -19,13 +20,12 @@ use tokio::runtime::Handle;
use tracing::{info, warn};
use types::{EthSpec, PublicKeyBytes};
use validator_dir::{keystore_password_path, Builder as ValidatorDirBuilder};
use validator_store::ValidatorStore;
use warp::Rejection;
use warp_utils::reject::{custom_bad_request, custom_server_error};
use zeroize::Zeroizing;
pub fn list<T: SlotClock + 'static, E: EthSpec>(
validator_store: Arc<ValidatorStore<T, E>>,
validator_store: Arc<LighthouseValidatorStore<T, E>>,
) -> ListKeystoresResponse {
let initialized_validators_rwlock = validator_store.initialized_validators();
let initialized_validators = initialized_validators_rwlock.read();
@@ -62,7 +62,7 @@ pub fn import<T: SlotClock + 'static, E: EthSpec>(
request: ImportKeystoresRequest,
validator_dir: PathBuf,
secrets_dir: Option<PathBuf>,
validator_store: Arc<ValidatorStore<T, E>>,
validator_store: Arc<LighthouseValidatorStore<T, E>>,
task_executor: TaskExecutor,
) -> Result<ImportKeystoresResponse, Rejection> {
// Check request validity. This is the only cases in which we should return a 4xx code.
@@ -117,7 +117,7 @@ pub fn import<T: SlotClock + 'static, E: EthSpec>(
)
} else if let Some(handle) = task_executor.handle() {
// Import the keystore.
match import_single_keystore(
match import_single_keystore::<_, E>(
keystore,
password,
validator_dir.clone(),
@@ -164,7 +164,7 @@ fn import_single_keystore<T: SlotClock + 'static, E: EthSpec>(
password: Zeroizing<String>,
validator_dir_path: PathBuf,
secrets_dir: Option<PathBuf>,
validator_store: &ValidatorStore<T, E>,
validator_store: &LighthouseValidatorStore<T, E>,
handle: Handle,
) -> Result<ImportKeystoreStatus, String> {
// Check if the validator key already exists, erroring if it is a remote signer validator.
@@ -234,7 +234,7 @@ fn import_single_keystore<T: SlotClock + 'static, E: EthSpec>(
pub fn delete<T: SlotClock + 'static, E: EthSpec>(
request: DeleteKeystoresRequest,
validator_store: Arc<ValidatorStore<T, E>>,
validator_store: Arc<LighthouseValidatorStore<T, E>>,
task_executor: TaskExecutor,
) -> Result<DeleteKeystoresResponse, Rejection> {
let export_response = export(request, validator_store, task_executor)?;
@@ -265,7 +265,7 @@ pub fn delete<T: SlotClock + 'static, E: EthSpec>(
pub fn export<T: SlotClock + 'static, E: EthSpec>(
request: DeleteKeystoresRequest,
validator_store: Arc<ValidatorStore<T, E>>,
validator_store: Arc<LighthouseValidatorStore<T, E>>,
task_executor: TaskExecutor,
) -> Result<ExportKeystoresResponse, Rejection> {
// Remove from initialized validators.

View File

@@ -13,6 +13,7 @@ use graffiti::{delete_graffiti, get_graffiti, set_graffiti};
use create_signed_voluntary_exit::create_signed_voluntary_exit;
use graffiti_file::{determine_graffiti, GraffitiFile};
use lighthouse_validator_store::LighthouseValidatorStore;
use validator_store::ValidatorStore;
use account_utils::{
@@ -41,7 +42,6 @@ use serde::{Deserialize, Serialize};
use slot_clock::SlotClock;
use std::collections::HashMap;
use std::future::Future;
use std::marker::PhantomData;
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use std::path::PathBuf;
use std::sync::Arc;
@@ -77,11 +77,11 @@ impl From<String> for Error {
/// A wrapper around all the items required to spawn the HTTP server.
///
/// The server will gracefully handle the case where any fields are `None`.
pub struct Context<T: SlotClock, E: EthSpec> {
pub struct Context<T: SlotClock, E> {
pub task_executor: TaskExecutor,
pub api_secret: ApiSecret,
pub block_service: Option<BlockService<T, E>>,
pub validator_store: Option<Arc<ValidatorStore<T, E>>>,
pub block_service: Option<BlockService<LighthouseValidatorStore<T, E>, T>>,
pub validator_store: Option<Arc<LighthouseValidatorStore<T, E>>>,
pub validator_dir: Option<PathBuf>,
pub secrets_dir: Option<PathBuf>,
pub graffiti_file: Option<GraffitiFile>,
@@ -90,7 +90,6 @@ pub struct Context<T: SlotClock, E: EthSpec> {
pub config: Config,
pub sse_logging_components: Option<SSELoggingComponents>,
pub slot_clock: T,
pub _phantom: PhantomData<E>,
}
/// Configuration for the HTTP server.
@@ -320,7 +319,7 @@ pub fn serve<T: 'static + SlotClock + Clone, E: EthSpec>(
.and(warp::path("validators"))
.and(warp::path::end())
.and(validator_store_filter.clone())
.then(|validator_store: Arc<ValidatorStore<T, E>>| {
.then(|validator_store: Arc<LighthouseValidatorStore<T, E>>| {
blocking_json_task(move || {
let validators = validator_store
.initialized_validators()
@@ -345,7 +344,7 @@ pub fn serve<T: 'static + SlotClock + Clone, E: EthSpec>(
.and(warp::path::end())
.and(validator_store_filter.clone())
.then(
|validator_pubkey: PublicKey, validator_store: Arc<ValidatorStore<T, E>>| {
|validator_pubkey: PublicKey, validator_store: Arc<LighthouseValidatorStore<T, E>>| {
blocking_json_task(move || {
let validator = validator_store
.initialized_validators()
@@ -395,7 +394,7 @@ pub fn serve<T: 'static + SlotClock + Clone, E: EthSpec>(
.and(graffiti_file_filter.clone())
.and(graffiti_flag_filter)
.then(
|validator_store: Arc<ValidatorStore<T, E>>,
|validator_store: Arc<LighthouseValidatorStore<T, E>>,
graffiti_file: Option<GraffitiFile>,
graffiti_flag: Option<Graffiti>| {
blocking_json_task(move || {
@@ -424,33 +423,35 @@ pub fn serve<T: 'static + SlotClock + Clone, E: EthSpec>(
.and(warp::path("health"))
.and(warp::path::end())
.and(block_service_filter.clone())
.then(|block_filter: BlockService<T, E>| async move {
let mut result: HashMap<String, Vec<CandidateInfo>> = HashMap::new();
.then(
|block_filter: BlockService<LighthouseValidatorStore<T, E>, T>| async move {
let mut result: HashMap<String, Vec<CandidateInfo>> = HashMap::new();
let mut beacon_nodes = Vec::new();
for node in &*block_filter.beacon_nodes.candidates.read().await {
beacon_nodes.push(CandidateInfo {
index: node.index,
endpoint: node.beacon_node.to_string(),
health: *node.health.read().await,
});
}
result.insert("beacon_nodes".to_string(), beacon_nodes);
if let Some(proposer_nodes_list) = &block_filter.proposer_nodes {
let mut proposer_nodes = Vec::new();
for node in &*proposer_nodes_list.candidates.read().await {
proposer_nodes.push(CandidateInfo {
let mut beacon_nodes = Vec::new();
for node in &*block_filter.beacon_nodes.candidates.read().await {
beacon_nodes.push(CandidateInfo {
index: node.index,
endpoint: node.beacon_node.to_string(),
health: *node.health.read().await,
});
}
result.insert("proposer_nodes".to_string(), proposer_nodes);
}
result.insert("beacon_nodes".to_string(), beacon_nodes);
blocking_json_task(move || Ok(api_types::GenericResponse::from(result))).await
});
if let Some(proposer_nodes_list) = &block_filter.proposer_nodes {
let mut proposer_nodes = Vec::new();
for node in &*proposer_nodes_list.candidates.read().await {
proposer_nodes.push(CandidateInfo {
index: node.index,
endpoint: node.beacon_node.to_string(),
health: *node.health.read().await,
});
}
result.insert("proposer_nodes".to_string(), proposer_nodes);
}
blocking_json_task(move || Ok(api_types::GenericResponse::from(result))).await
},
);
// POST lighthouse/validators/
let post_validators = warp::path("lighthouse")
@@ -466,14 +467,14 @@ pub fn serve<T: 'static + SlotClock + Clone, E: EthSpec>(
move |body: Vec<api_types::ValidatorRequest>,
validator_dir: PathBuf,
secrets_dir: PathBuf,
validator_store: Arc<ValidatorStore<T, E>>,
validator_store: Arc<LighthouseValidatorStore<T, E>>,
spec: Arc<ChainSpec>,
task_executor: TaskExecutor| {
blocking_json_task(move || {
let secrets_dir = store_passwords_in_secrets_dir.then_some(secrets_dir);
if let Some(handle) = task_executor.handle() {
let (validators, mnemonic) =
handle.block_on(create_validators_mnemonic(
handle.block_on(create_validators_mnemonic::<_, _, E>(
None,
None,
&body,
@@ -511,7 +512,7 @@ pub fn serve<T: 'static + SlotClock + Clone, E: EthSpec>(
move |body: api_types::CreateValidatorsMnemonicRequest,
validator_dir: PathBuf,
secrets_dir: PathBuf,
validator_store: Arc<ValidatorStore<T, E>>,
validator_store: Arc<LighthouseValidatorStore<T, E>>,
spec: Arc<ChainSpec>,
task_executor: TaskExecutor| {
blocking_json_task(move || {
@@ -525,7 +526,7 @@ pub fn serve<T: 'static + SlotClock + Clone, E: EthSpec>(
))
})?;
let (validators, _mnemonic) =
handle.block_on(create_validators_mnemonic(
handle.block_on(create_validators_mnemonic::<_, _, E>(
Some(mnemonic),
Some(body.key_derivation_path_offset),
&body.validators,
@@ -558,7 +559,7 @@ pub fn serve<T: 'static + SlotClock + Clone, E: EthSpec>(
move |body: api_types::KeystoreValidatorsPostRequest,
validator_dir: PathBuf,
secrets_dir: PathBuf,
validator_store: Arc<ValidatorStore<T, E>>,
validator_store: Arc<LighthouseValidatorStore<T, E>>,
task_executor: TaskExecutor| {
blocking_json_task(move || {
// Check to ensure the password is correct.
@@ -644,7 +645,7 @@ pub fn serve<T: 'static + SlotClock + Clone, E: EthSpec>(
.and(task_executor_filter.clone())
.then(
|body: Vec<api_types::Web3SignerValidatorRequest>,
validator_store: Arc<ValidatorStore<T, E>>,
validator_store: Arc<LighthouseValidatorStore<T, E>>,
task_executor: TaskExecutor| {
blocking_json_task(move || {
if let Some(handle) = task_executor.handle() {
@@ -672,7 +673,7 @@ pub fn serve<T: 'static + SlotClock + Clone, E: EthSpec>(
),
})
.collect();
handle.block_on(create_validators_web3signer(
handle.block_on(create_validators_web3signer::<_, E>(
web3signers,
&validator_store,
))?;
@@ -698,7 +699,7 @@ pub fn serve<T: 'static + SlotClock + Clone, E: EthSpec>(
.then(
|validator_pubkey: PublicKey,
body: api_types::ValidatorPatchRequest,
validator_store: Arc<ValidatorStore<T, E>>,
validator_store: Arc<LighthouseValidatorStore<T, E>>,
graffiti_file: Option<GraffitiFile>,
task_executor: TaskExecutor| {
blocking_json_task(move || {
@@ -851,7 +852,7 @@ pub fn serve<T: 'static + SlotClock + Clone, E: EthSpec>(
.and(warp::path::end())
.and(validator_store_filter.clone())
.then(
|validator_pubkey: PublicKey, validator_store: Arc<ValidatorStore<T, E>>| {
|validator_pubkey: PublicKey, validator_store: Arc<LighthouseValidatorStore<T, E>>| {
blocking_json_task(move || {
if validator_store
.initialized_validators()
@@ -892,7 +893,7 @@ pub fn serve<T: 'static + SlotClock + Clone, E: EthSpec>(
.then(
|validator_pubkey: PublicKey,
request: api_types::UpdateFeeRecipientRequest,
validator_store: Arc<ValidatorStore<T, E>>| {
validator_store: Arc<LighthouseValidatorStore<T, E>>| {
blocking_json_task(move || {
if validator_store
.initialized_validators()
@@ -928,7 +929,7 @@ pub fn serve<T: 'static + SlotClock + Clone, E: EthSpec>(
.and(warp::path::end())
.and(validator_store_filter.clone())
.then(
|validator_pubkey: PublicKey, validator_store: Arc<ValidatorStore<T, E>>| {
|validator_pubkey: PublicKey, validator_store: Arc<LighthouseValidatorStore<T, E>>| {
blocking_json_task(move || {
if validator_store
.initialized_validators()
@@ -964,7 +965,7 @@ pub fn serve<T: 'static + SlotClock + Clone, E: EthSpec>(
.and(warp::path::end())
.and(validator_store_filter.clone())
.then(
|validator_pubkey: PublicKey, validator_store: Arc<ValidatorStore<T, E>>| {
|validator_pubkey: PublicKey, validator_store: Arc<LighthouseValidatorStore<T, E>>| {
blocking_json_task(move || {
if validator_store
.initialized_validators()
@@ -997,7 +998,7 @@ pub fn serve<T: 'static + SlotClock + Clone, E: EthSpec>(
.then(
|validator_pubkey: PublicKey,
request: api_types::UpdateGasLimitRequest,
validator_store: Arc<ValidatorStore<T, E>>| {
validator_store: Arc<LighthouseValidatorStore<T, E>>| {
blocking_json_task(move || {
if validator_store
.initialized_validators()
@@ -1033,7 +1034,7 @@ pub fn serve<T: 'static + SlotClock + Clone, E: EthSpec>(
.and(warp::path::end())
.and(validator_store_filter.clone())
.then(
|validator_pubkey: PublicKey, validator_store: Arc<ValidatorStore<T, E>>| {
|validator_pubkey: PublicKey, validator_store: Arc<LighthouseValidatorStore<T, E>>| {
blocking_json_task(move || {
if validator_store
.initialized_validators()
@@ -1074,13 +1075,13 @@ pub fn serve<T: 'static + SlotClock + Clone, E: EthSpec>(
.then(
|pubkey: PublicKey,
query: api_types::VoluntaryExitQuery,
validator_store: Arc<ValidatorStore<T, E>>,
validator_store: Arc<LighthouseValidatorStore<T, E>>,
slot_clock: T,
task_executor: TaskExecutor| {
blocking_json_task(move || {
if let Some(handle) = task_executor.handle() {
let signed_voluntary_exit =
handle.block_on(create_signed_voluntary_exit(
handle.block_on(create_signed_voluntary_exit::<T, E>(
pubkey,
query.epoch,
validator_store,
@@ -1106,7 +1107,7 @@ pub fn serve<T: 'static + SlotClock + Clone, E: EthSpec>(
.and(graffiti_flag_filter)
.then(
|pubkey: PublicKey,
validator_store: Arc<ValidatorStore<T, E>>,
validator_store: Arc<LighthouseValidatorStore<T, E>>,
graffiti_flag: Option<Graffiti>| {
blocking_json_task(move || {
let graffiti = get_graffiti(pubkey.clone(), validator_store, graffiti_flag)?;
@@ -1130,7 +1131,7 @@ pub fn serve<T: 'static + SlotClock + Clone, E: EthSpec>(
.then(
|pubkey: PublicKey,
query: SetGraffitiRequest,
validator_store: Arc<ValidatorStore<T, E>>,
validator_store: Arc<LighthouseValidatorStore<T, E>>,
graffiti_file: Option<GraffitiFile>| {
blocking_json_task(move || {
if graffiti_file.is_some() {
@@ -1155,7 +1156,7 @@ pub fn serve<T: 'static + SlotClock + Clone, E: EthSpec>(
.and(graffiti_file_filter.clone())
.then(
|pubkey: PublicKey,
validator_store: Arc<ValidatorStore<T, E>>,
validator_store: Arc<LighthouseValidatorStore<T, E>>,
graffiti_file: Option<GraffitiFile>| {
blocking_json_task(move || {
if graffiti_file.is_some() {
@@ -1172,7 +1173,7 @@ pub fn serve<T: 'static + SlotClock + Clone, E: EthSpec>(
// GET /eth/v1/keystores
let get_std_keystores = std_keystores.and(validator_store_filter.clone()).then(
|validator_store: Arc<ValidatorStore<T, E>>| {
|validator_store: Arc<LighthouseValidatorStore<T, E>>| {
blocking_json_task(move || Ok(keystores::list(validator_store)))
},
);
@@ -1188,7 +1189,7 @@ pub fn serve<T: 'static + SlotClock + Clone, E: EthSpec>(
move |request, validator_dir, secrets_dir, validator_store, task_executor| {
let secrets_dir = store_passwords_in_secrets_dir.then_some(secrets_dir);
blocking_json_task(move || {
keystores::import(
keystores::import::<_, E>(
request,
validator_dir,
secrets_dir,
@@ -1210,7 +1211,7 @@ pub fn serve<T: 'static + SlotClock + Clone, E: EthSpec>(
// GET /eth/v1/remotekeys
let get_std_remotekeys = std_remotekeys.and(validator_store_filter.clone()).then(
|validator_store: Arc<ValidatorStore<T, E>>| {
|validator_store: Arc<LighthouseValidatorStore<T, E>>| {
blocking_json_task(move || Ok(remotekeys::list(validator_store)))
},
);
@@ -1221,7 +1222,9 @@ pub fn serve<T: 'static + SlotClock + Clone, E: EthSpec>(
.and(validator_store_filter.clone())
.and(task_executor_filter.clone())
.then(|request, validator_store, task_executor| {
blocking_json_task(move || remotekeys::import(request, validator_store, task_executor))
blocking_json_task(move || {
remotekeys::import::<_, E>(request, validator_store, task_executor)
})
});
// DELETE /eth/v1/remotekeys

View File

@@ -8,6 +8,7 @@ use eth2::lighthouse_vc::std_types::{
ListRemotekeysResponse, SingleListRemotekeysResponse, Status,
};
use initialized_validators::{Error, InitializedValidators};
use lighthouse_validator_store::LighthouseValidatorStore;
use slot_clock::SlotClock;
use std::sync::Arc;
use task_executor::TaskExecutor;
@@ -15,12 +16,11 @@ use tokio::runtime::Handle;
use tracing::{info, warn};
use types::{EthSpec, PublicKeyBytes};
use url::Url;
use validator_store::ValidatorStore;
use warp::Rejection;
use warp_utils::reject::custom_server_error;
pub fn list<T: SlotClock + 'static, E: EthSpec>(
validator_store: Arc<ValidatorStore<T, E>>,
validator_store: Arc<LighthouseValidatorStore<T, E>>,
) -> ListRemotekeysResponse {
let initialized_validators_rwlock = validator_store.initialized_validators();
let initialized_validators = initialized_validators_rwlock.read();
@@ -50,7 +50,7 @@ pub fn list<T: SlotClock + 'static, E: EthSpec>(
pub fn import<T: SlotClock + 'static, E: EthSpec>(
request: ImportRemotekeysRequest,
validator_store: Arc<ValidatorStore<T, E>>,
validator_store: Arc<LighthouseValidatorStore<T, E>>,
task_executor: TaskExecutor,
) -> Result<ImportRemotekeysResponse, Rejection> {
info!(
@@ -63,8 +63,12 @@ pub fn import<T: SlotClock + 'static, E: EthSpec>(
for remotekey in request.remote_keys {
let status = if let Some(handle) = task_executor.handle() {
// Import the keystore.
match import_single_remotekey(remotekey.pubkey, remotekey.url, &validator_store, handle)
{
match import_single_remotekey::<_, E>(
remotekey.pubkey,
remotekey.url,
&validator_store,
handle,
) {
Ok(status) => Status::ok(status),
Err(e) => {
warn!(
@@ -89,7 +93,7 @@ pub fn import<T: SlotClock + 'static, E: EthSpec>(
fn import_single_remotekey<T: SlotClock + 'static, E: EthSpec>(
pubkey: PublicKeyBytes,
url: String,
validator_store: &ValidatorStore<T, E>,
validator_store: &LighthouseValidatorStore<T, E>,
handle: Handle,
) -> Result<ImportRemotekeyStatus, String> {
if let Err(url_err) = Url::parse(&url) {
@@ -143,7 +147,7 @@ fn import_single_remotekey<T: SlotClock + 'static, E: EthSpec>(
pub fn delete<T: SlotClock + 'static, E: EthSpec>(
request: DeleteRemotekeysRequest,
validator_store: Arc<ValidatorStore<T, E>>,
validator_store: Arc<LighthouseValidatorStore<T, E>>,
task_executor: TaskExecutor,
) -> Result<DeleteRemotekeysResponse, Rejection> {
info!(

View File

@@ -14,19 +14,19 @@ use eth2::{
use eth2_keystore::KeystoreBuilder;
use initialized_validators::key_cache::{KeyCache, CACHE_FILENAME};
use initialized_validators::{InitializedValidators, OnDecryptFailure};
use lighthouse_validator_store::{Config as ValidatorStoreConfig, LighthouseValidatorStore};
use parking_lot::RwLock;
use sensitive_url::SensitiveUrl;
use slashing_protection::{SlashingDatabase, SLASHING_PROTECTION_FILENAME};
use slot_clock::{SlotClock, TestingSlotClock};
use std::future::Future;
use std::marker::PhantomData;
use std::net::{IpAddr, Ipv4Addr};
use std::sync::Arc;
use std::time::Duration;
use task_executor::test_utils::TestRuntime;
use tempfile::{tempdir, TempDir};
use tokio::sync::oneshot;
use validator_store::{Config as ValidatorStoreConfig, ValidatorStore};
use validator_services::block_service::BlockService;
use zeroize::Zeroizing;
pub const PASSWORD_BYTES: &[u8] = &[42, 50, 37];
@@ -54,7 +54,7 @@ pub struct Web3SignerValidatorScenario {
pub struct ApiTester {
pub client: ValidatorClientHttpClient,
pub initialized_validators: Arc<RwLock<InitializedValidators>>,
pub validator_store: Arc<ValidatorStore<TestingSlotClock, E>>,
pub validator_store: Arc<LighthouseValidatorStore<TestingSlotClock, E>>,
pub url: SensitiveUrl,
pub api_token: String,
pub test_runtime: TestRuntime,
@@ -101,7 +101,7 @@ impl ApiTester {
let test_runtime = TestRuntime::default();
let validator_store = Arc::new(ValidatorStore::<_, E>::new(
let validator_store = Arc::new(LighthouseValidatorStore::new(
initialized_validators,
slashing_protection,
Hash256::repeat_byte(42),
@@ -121,7 +121,7 @@ impl ApiTester {
let context = Arc::new(Context {
task_executor: test_runtime.task_executor.clone(),
api_secret,
block_service: None,
block_service: None::<BlockService<LighthouseValidatorStore<_, _>, _>>,
validator_dir: Some(validator_dir.path().into()),
secrets_dir: Some(secrets_dir.path().into()),
validator_store: Some(validator_store.clone()),
@@ -131,7 +131,6 @@ impl ApiTester {
config: http_config,
sse_logging_components: None,
slot_clock,
_phantom: PhantomData,
});
let ctx = context;
let (shutdown_tx, shutdown_rx) = oneshot::channel();
@@ -139,7 +138,7 @@ impl ApiTester {
// It's not really interesting why this triggered, just that it happened.
let _ = shutdown_rx.await;
};
let (listening_socket, server) = super::serve(ctx, server_shutdown).unwrap();
let (listening_socket, server) = super::serve::<_, E>(ctx, server_shutdown).unwrap();
tokio::spawn(server);
@@ -638,7 +637,7 @@ impl ApiTester {
assert_eq!(
self.validator_store
.get_builder_proposals(&validator.voting_pubkey),
.get_builder_proposals_testing_only(&validator.voting_pubkey),
builder_proposals
);

View File

@@ -18,12 +18,12 @@ use eth2::{
Error as ApiError,
};
use eth2_keystore::KeystoreBuilder;
use lighthouse_validator_store::{Config as ValidatorStoreConfig, LighthouseValidatorStore};
use parking_lot::RwLock;
use sensitive_url::SensitiveUrl;
use slashing_protection::{SlashingDatabase, SLASHING_PROTECTION_FILENAME};
use slot_clock::{SlotClock, TestingSlotClock};
use std::future::Future;
use std::marker::PhantomData;
use std::net::{IpAddr, Ipv4Addr};
use std::str::FromStr;
use std::sync::Arc;
@@ -31,7 +31,7 @@ use std::time::Duration;
use task_executor::test_utils::TestRuntime;
use tempfile::{tempdir, TempDir};
use types::graffiti::GraffitiString;
use validator_store::{Config as ValidatorStoreConfig, ValidatorStore};
use validator_store::ValidatorStore;
use zeroize::Zeroizing;
const PASSWORD_BYTES: &[u8] = &[42, 50, 37];
@@ -42,7 +42,7 @@ type E = MainnetEthSpec;
struct ApiTester {
client: ValidatorClientHttpClient,
initialized_validators: Arc<RwLock<InitializedValidators>>,
validator_store: Arc<ValidatorStore<TestingSlotClock, E>>,
validator_store: Arc<LighthouseValidatorStore<TestingSlotClock, E>>,
url: SensitiveUrl,
slot_clock: TestingSlotClock,
_validator_dir: TempDir,
@@ -91,7 +91,7 @@ impl ApiTester {
let test_runtime = TestRuntime::default();
let validator_store = Arc::new(ValidatorStore::<_, E>::new(
let validator_store = Arc::new(LighthouseValidatorStore::<_, E>::new(
initialized_validators,
slashing_protection,
Hash256::repeat_byte(42),
@@ -129,11 +129,10 @@ impl ApiTester {
},
sse_logging_components: None,
slot_clock: slot_clock.clone(),
_phantom: PhantomData,
});
let ctx = context.clone();
let (listening_socket, server) =
super::serve(ctx, test_runtime.task_executor.exit()).unwrap();
super::serve::<_, E>(ctx, test_runtime.task_executor.exit()).unwrap();
tokio::spawn(server);
@@ -670,7 +669,7 @@ impl ApiTester {
assert_eq!(
self.validator_store
.get_builder_proposals(&validator.voting_pubkey),
.get_builder_proposals_testing_only(&validator.voting_pubkey),
builder_proposals
);
@@ -686,7 +685,7 @@ impl ApiTester {
assert_eq!(
self.validator_store
.get_builder_boost_factor(&validator.voting_pubkey),
.get_builder_boost_factor_testing_only(&validator.voting_pubkey),
builder_boost_factor
);
@@ -702,7 +701,7 @@ impl ApiTester {
assert_eq!(
self.validator_store
.determine_validator_builder_boost_factor(&validator.voting_pubkey),
.determine_builder_boost_factor(&validator.voting_pubkey),
builder_boost_factor
);
@@ -712,7 +711,7 @@ impl ApiTester {
pub fn assert_default_builder_boost_factor(self, builder_boost_factor: Option<u64>) -> Self {
assert_eq!(
self.validator_store
.determine_default_builder_boost_factor(),
.determine_builder_boost_factor(&PublicKeyBytes::empty()),
builder_boost_factor
);
@@ -728,7 +727,7 @@ impl ApiTester {
assert_eq!(
self.validator_store
.get_prefer_builder_proposals(&validator.voting_pubkey),
.get_prefer_builder_proposals_testing_only(&validator.voting_pubkey),
prefer_builder_proposals
);
@@ -1159,7 +1158,7 @@ async fn validator_derived_builder_boost_factor_with_process_defaults() {
})
.await
.assert_default_builder_boost_factor(Some(80))
.assert_validator_derived_builder_boost_factor(0, None)
.assert_validator_derived_builder_boost_factor(0, Some(80))
.await
.set_builder_proposals(0, false)
.await

View File

@@ -8,12 +8,13 @@ use eth2::lighthouse_vc::{
types::Web3SignerValidatorRequest,
};
use itertools::Itertools;
use lighthouse_validator_store::DEFAULT_GAS_LIMIT;
use rand::{rngs::SmallRng, Rng, SeedableRng};
use slashing_protection::interchange::{Interchange, InterchangeMetadata};
use std::{collections::HashMap, path::Path};
use tokio::runtime::Handle;
use types::{attestation::AttestationBase, Address};
use validator_store::DEFAULT_GAS_LIMIT;
use validator_store::ValidatorStore;
use zeroize::Zeroizing;
fn new_keystore(password: Zeroizing<String>) -> Keystore {

View File

@@ -6,6 +6,7 @@ authors = ["Sigma Prime <contact@sigmaprime.io>"]
[dependencies]
health_metrics = { workspace = true }
lighthouse_validator_store = { workspace = true }
lighthouse_version = { workspace = true }
logging = { workspace = true }
malloc_utils = { workspace = true }
@@ -17,6 +18,5 @@ tracing = { workspace = true }
types = { workspace = true }
validator_metrics = { workspace = true }
validator_services = { workspace = true }
validator_store = { workspace = true }
warp = { workspace = true }
warp_utils = { workspace = true }

View File

@@ -2,6 +2,7 @@
//!
//! For other endpoints, see the `http_api` crate.
use lighthouse_validator_store::LighthouseValidatorStore;
use lighthouse_version::version_with_platform;
use logging::crit;
use malloc_utils::scrape_allocator_metrics;
@@ -15,7 +16,6 @@ use std::time::{SystemTime, UNIX_EPOCH};
use tracing::info;
use types::EthSpec;
use validator_services::duties_service::DutiesService;
use validator_store::ValidatorStore;
use warp::{http::Response, Filter};
#[derive(Debug)]
@@ -36,17 +36,19 @@ impl From<String> for Error {
}
}
type ValidatorStore<E> = LighthouseValidatorStore<SystemTimeSlotClock, E>;
/// Contains objects which have shared access from inside/outside of the metrics server.
pub struct Shared<E: EthSpec> {
pub validator_store: Option<Arc<ValidatorStore<SystemTimeSlotClock, E>>>,
pub duties_service: Option<Arc<DutiesService<SystemTimeSlotClock, E>>>,
pub struct Shared<E> {
pub validator_store: Option<Arc<ValidatorStore<E>>>,
pub duties_service: Option<Arc<DutiesService<ValidatorStore<E>, SystemTimeSlotClock>>>,
pub genesis_time: Option<u64>,
}
/// A wrapper around all the items required to spawn the HTTP server.
///
/// The server will gracefully handle the case where any fields are `None`.
pub struct Context<E: EthSpec> {
pub struct Context<E> {
pub config: Config,
pub shared: RwLock<Shared<E>>,
}

View File

@@ -0,0 +1,30 @@
[package]
name = "lighthouse_validator_store"
version = "0.1.0"
edition = { workspace = true }
authors = ["Sigma Prime <contact@sigmaprime.io>"]
[dependencies]
account_utils = { workspace = true }
beacon_node_fallback = { workspace = true }
doppelganger_service = { workspace = true }
either = { workspace = true }
environment = { workspace = true }
eth2 = { workspace = true }
initialized_validators = { workspace = true }
logging = { workspace = true }
parking_lot = { workspace = true }
serde = { workspace = true }
signing_method = { workspace = true }
slashing_protection = { workspace = true }
slot_clock = { workspace = true }
task_executor = { workspace = true }
tokio = { workspace = true }
tracing = { workspace = true }
types = { workspace = true }
validator_metrics = { workspace = true }
validator_store = { workspace = true }
[dev-dependencies]
futures = { workspace = true }
logging = { workspace = true }

File diff suppressed because it is too large Load Diff

View File

@@ -12,7 +12,7 @@ use std::sync::Arc;
use task_executor::TaskExecutor;
use types::*;
use url::Url;
use web3signer::{ForkInfo, SigningRequest, SigningResponse};
use web3signer::{ForkInfo, MessageType, SigningRequest, SigningResponse};
pub use web3signer::Web3SignerObject;
@@ -152,8 +152,13 @@ impl SigningMethod {
genesis_validators_root,
});
self.get_signature_from_root(signable_message, signing_root, executor, fork_info)
.await
self.get_signature_from_root::<E, Payload>(
signable_message,
signing_root,
executor,
fork_info,
)
.await
}
pub async fn get_signature_from_root<E: EthSpec, Payload: AbstractExecPayload<E>>(
@@ -227,11 +232,7 @@ impl SigningMethod {
// Determine the Web3Signer message type.
let message_type = object.message_type();
if matches!(
object,
Web3SignerObject::Deposit { .. } | Web3SignerObject::ValidatorRegistration(_)
) && fork_info.is_some()
if matches!(message_type, MessageType::ValidatorRegistration) && fork_info.is_some()
{
return Err(Error::GenesisForkVersionRequired);
}

View File

@@ -27,7 +27,7 @@ pub const SLASHING_PROTECTION_FILENAME: &str = "slashing_protection.sqlite";
/// The attestation or block is not safe to sign.
///
/// This could be because it's slashable, or because an error occurred.
#[derive(PartialEq, Debug)]
#[derive(PartialEq, Debug, Clone)]
pub enum NotSafe {
UnregisteredValidator(PublicKeyBytes),
DisabledValidator(PublicKeyBytes),

View File

@@ -10,7 +10,7 @@ pub struct SignedAttestation {
}
/// Reasons why an attestation may be slashable (or invalid).
#[derive(PartialEq, Debug)]
#[derive(PartialEq, Debug, Clone)]
pub enum InvalidAttestation {
/// The attestation has the same target epoch as an attestation from the DB (enclosed).
DoubleVote(SignedAttestation),

View File

@@ -9,7 +9,7 @@ pub struct SignedBlock {
}
/// Reasons why a block may be slashable.
#[derive(PartialEq, Debug)]
#[derive(PartialEq, Debug, Clone)]
pub enum InvalidBlock {
DoubleBlockProposal(SignedBlock),
SlotViolatesLowerBound { block_slot: Slot, bound_slot: Slot },

View File

@@ -10,6 +10,7 @@ use directory::{
use eth2::types::Graffiti;
use graffiti_file::GraffitiFile;
use initialized_validators::Config as InitializedValidatorsConfig;
use lighthouse_validator_store::Config as ValidatorStoreConfig;
use sensitive_url::SensitiveUrl;
use serde::{Deserialize, Serialize};
use std::fs;
@@ -20,7 +21,6 @@ use tracing::{info, warn};
use types::GRAFFITI_BYTES_LEN;
use validator_http_api::{self, PK_FILENAME};
use validator_http_metrics;
use validator_store::Config as ValidatorStoreConfig;
pub const DEFAULT_BEACON_NODE: &str = "http://localhost:5052/";

View File

@@ -15,7 +15,7 @@ pub const SLOT_DELAY_DENOMINATOR: u32 = 12;
pub fn start_latency_service<T: SlotClock + 'static, E: EthSpec>(
context: RuntimeContext<E>,
slot_clock: T,
beacon_nodes: Arc<BeaconNodeFallback<T, E>>,
beacon_nodes: Arc<BeaconNodeFallback<T>>,
) {
let future = async move {
loop {

View File

@@ -20,6 +20,7 @@ use doppelganger_service::DoppelgangerService;
use environment::RuntimeContext;
use eth2::{reqwest::ClientBuilder, BeaconNodeHttpClient, StatusCode, Timeouts};
use initialized_validators::Error::UnableToOpenVotingKeystore;
use lighthouse_validator_store::LighthouseValidatorStore;
use notifier::spawn_notifier;
use parking_lot::RwLock;
use reqwest::Certificate;
@@ -27,7 +28,6 @@ use slot_clock::SlotClock;
use slot_clock::SystemTimeSlotClock;
use std::fs::File;
use std::io::Read;
use std::marker::PhantomData;
use std::net::SocketAddr;
use std::path::Path;
use std::sync::Arc;
@@ -42,12 +42,11 @@ use validator_http_api::ApiSecret;
use validator_services::{
attestation_service::{AttestationService, AttestationServiceBuilder},
block_service::{BlockService, BlockServiceBuilder},
duties_service::{self, DutiesService},
duties_service::{self, DutiesService, DutiesServiceBuilder},
preparation_service::{PreparationService, PreparationServiceBuilder},
sync::SyncDutiesMap,
sync_committee_service::SyncCommitteeService,
};
use validator_store::ValidatorStore;
use validator_store::ValidatorStore as ValidatorStoreTrait;
/// The interval between attempts to contact the beacon node during startup.
const RETRY_DELAY: Duration = Duration::from_secs(2);
@@ -72,20 +71,22 @@ const HTTP_GET_VALIDATOR_BLOCK_TIMEOUT_QUOTIENT: u32 = 4;
const DOPPELGANGER_SERVICE_NAME: &str = "doppelganger";
type ValidatorStore<E> = LighthouseValidatorStore<SystemTimeSlotClock, E>;
#[derive(Clone)]
pub struct ProductionValidatorClient<E: EthSpec> {
context: RuntimeContext<E>,
duties_service: Arc<DutiesService<SystemTimeSlotClock, E>>,
block_service: BlockService<SystemTimeSlotClock, E>,
attestation_service: AttestationService<SystemTimeSlotClock, E>,
sync_committee_service: SyncCommitteeService<SystemTimeSlotClock, E>,
duties_service: Arc<DutiesService<ValidatorStore<E>, SystemTimeSlotClock>>,
block_service: BlockService<ValidatorStore<E>, SystemTimeSlotClock>,
attestation_service: AttestationService<ValidatorStore<E>, SystemTimeSlotClock>,
sync_committee_service: SyncCommitteeService<ValidatorStore<E>, SystemTimeSlotClock>,
doppelganger_service: Option<Arc<DoppelgangerService>>,
preparation_service: PreparationService<SystemTimeSlotClock, E>,
validator_store: Arc<ValidatorStore<SystemTimeSlotClock, E>>,
preparation_service: PreparationService<ValidatorStore<E>, SystemTimeSlotClock>,
validator_store: Arc<ValidatorStore<E>>,
slot_clock: SystemTimeSlotClock,
http_api_listen_addr: Option<SocketAddr>,
config: Config,
beacon_nodes: Arc<BeaconNodeFallback<SystemTimeSlotClock, E>>,
beacon_nodes: Arc<BeaconNodeFallback<SystemTimeSlotClock>>,
genesis_time: u64,
}
@@ -367,14 +368,14 @@ impl<E: EthSpec> ProductionValidatorClient<E> {
// Initialize the number of connected, avaliable beacon nodes to 0.
set_gauge(&validator_metrics::AVAILABLE_BEACON_NODES_COUNT, 0);
let mut beacon_nodes: BeaconNodeFallback<_, E> = BeaconNodeFallback::new(
let mut beacon_nodes: BeaconNodeFallback<_> = BeaconNodeFallback::new(
candidates,
config.beacon_node_fallback,
config.broadcast_topics.clone(),
context.eth2_config.spec.clone(),
);
let mut proposer_nodes: BeaconNodeFallback<_, E> = BeaconNodeFallback::new(
let mut proposer_nodes: BeaconNodeFallback<_> = BeaconNodeFallback::new(
proposer_candidates,
config.beacon_node_fallback,
config.broadcast_topics.clone(),
@@ -383,7 +384,7 @@ impl<E: EthSpec> ProductionValidatorClient<E> {
// Perform some potentially long-running initialization tasks.
let (genesis_time, genesis_validators_root) = tokio::select! {
tuple = init_from_beacon_node(&beacon_nodes, &proposer_nodes) => tuple?,
tuple = init_from_beacon_node::<E>(&beacon_nodes, &proposer_nodes) => tuple?,
() = context.executor.exit() => return Err("Shutting down".to_string())
};
@@ -402,10 +403,10 @@ impl<E: EthSpec> ProductionValidatorClient<E> {
proposer_nodes.set_slot_clock(slot_clock.clone());
let beacon_nodes = Arc::new(beacon_nodes);
start_fallback_updater_service(context.clone(), beacon_nodes.clone())?;
start_fallback_updater_service::<_, E>(context.executor.clone(), beacon_nodes.clone())?;
let proposer_nodes = Arc::new(proposer_nodes);
start_fallback_updater_service(context.clone(), proposer_nodes.clone())?;
start_fallback_updater_service::<_, E>(context.executor.clone(), proposer_nodes.clone())?;
let doppelganger_service = if config.enable_doppelganger_protection {
Some(Arc::new(DoppelgangerService::default()))
@@ -413,7 +414,7 @@ impl<E: EthSpec> ProductionValidatorClient<E> {
None
};
let validator_store = Arc::new(ValidatorStore::new(
let validator_store = Arc::new(LighthouseValidatorStore::new(
validators,
slashing_protection,
genesis_validators_root,
@@ -439,21 +440,18 @@ impl<E: EthSpec> ProductionValidatorClient<E> {
validator_store.prune_slashing_protection_db(slot.epoch(E::slots_per_epoch()), true);
}
let duties_context = context.service_context("duties".into());
let duties_service = Arc::new(DutiesService {
attesters: <_>::default(),
proposers: <_>::default(),
sync_duties: SyncDutiesMap::new(config.distributed),
slot_clock: slot_clock.clone(),
beacon_nodes: beacon_nodes.clone(),
validator_store: validator_store.clone(),
unknown_validator_next_poll_slots: <_>::default(),
spec: context.eth2_config.spec.clone(),
context: duties_context,
enable_high_validator_count_metrics: config.enable_high_validator_count_metrics,
distributed: config.distributed,
disable_attesting: config.disable_attesting,
});
let duties_service = Arc::new(
DutiesServiceBuilder::new()
.slot_clock(slot_clock.clone())
.beacon_nodes(beacon_nodes.clone())
.validator_store(validator_store.clone())
.spec(context.eth2_config.spec.clone())
.executor(context.executor.clone())
.enable_high_validator_count_metrics(config.enable_high_validator_count_metrics)
.distributed(config.distributed)
.disable_attesting(config.disable_attesting)
.build()?,
);
// Update the metrics server.
if let Some(ctx) = &validator_metrics_ctx {
@@ -465,7 +463,8 @@ impl<E: EthSpec> ProductionValidatorClient<E> {
.slot_clock(slot_clock.clone())
.validator_store(validator_store.clone())
.beacon_nodes(beacon_nodes.clone())
.runtime_context(context.service_context("block".into()))
.executor(context.executor.clone())
.chain_spec(context.eth2_config.spec.clone())
.graffiti(config.graffiti)
.graffiti_file(config.graffiti_file.clone());
@@ -481,7 +480,8 @@ impl<E: EthSpec> ProductionValidatorClient<E> {
.slot_clock(slot_clock.clone())
.validator_store(validator_store.clone())
.beacon_nodes(beacon_nodes.clone())
.runtime_context(context.service_context("attestation".into()))
.executor(context.executor.clone())
.chain_spec(context.eth2_config.spec.clone())
.disable(config.disable_attesting)
.build()?;
@@ -489,7 +489,7 @@ impl<E: EthSpec> ProductionValidatorClient<E> {
.slot_clock(slot_clock.clone())
.validator_store(validator_store.clone())
.beacon_nodes(beacon_nodes.clone())
.runtime_context(context.service_context("preparation".into()))
.executor(context.executor.clone())
.builder_registration_timestamp_override(config.builder_registration_timestamp_override)
.validator_registration_batch_size(config.validator_registration_batch_size)
.build()?;
@@ -499,7 +499,7 @@ impl<E: EthSpec> ProductionValidatorClient<E> {
validator_store.clone(),
slot_clock.clone(),
beacon_nodes.clone(),
context.service_context("sync_committee".into()),
context.executor.clone(),
);
Ok(Self {
@@ -542,12 +542,11 @@ impl<E: EthSpec> ProductionValidatorClient<E> {
config: self.config.http_api.clone(),
sse_logging_components: self.context.sse_logging_components.clone(),
slot_clock: self.slot_clock.clone(),
_phantom: PhantomData,
});
let exit = self.context.executor.exit();
let (listen_addr, server) = validator_http_api::serve(ctx, exit)
let (listen_addr, server) = validator_http_api::serve::<_, E>(ctx, exit)
.map_err(|e| format!("Unable to start HTTP API server: {:?}", e))?;
self.context
@@ -615,12 +614,12 @@ impl<E: EthSpec> ProductionValidatorClient<E> {
}
async fn init_from_beacon_node<E: EthSpec>(
beacon_nodes: &BeaconNodeFallback<SystemTimeSlotClock, E>,
proposer_nodes: &BeaconNodeFallback<SystemTimeSlotClock, E>,
beacon_nodes: &BeaconNodeFallback<SystemTimeSlotClock>,
proposer_nodes: &BeaconNodeFallback<SystemTimeSlotClock>,
) -> Result<(u64, Hash256), String> {
loop {
beacon_nodes.update_all_candidates().await;
proposer_nodes.update_all_candidates().await;
beacon_nodes.update_all_candidates::<E>().await;
proposer_nodes.update_all_candidates::<E>().await;
let num_available = beacon_nodes.num_available().await;
let num_total = beacon_nodes.num_total().await;
@@ -697,8 +696,8 @@ async fn init_from_beacon_node<E: EthSpec>(
Ok((genesis.genesis_time, genesis.genesis_validators_root))
}
async fn wait_for_genesis<E: EthSpec>(
beacon_nodes: &BeaconNodeFallback<SystemTimeSlotClock, E>,
async fn wait_for_genesis(
beacon_nodes: &BeaconNodeFallback<SystemTimeSlotClock>,
genesis_time: u64,
) -> Result<(), String> {
let now = SystemTime::now()
@@ -740,8 +739,8 @@ async fn wait_for_genesis<E: EthSpec>(
/// Request the version from the node, looping back and trying again on failure. Exit once the node
/// has been contacted.
async fn poll_whilst_waiting_for_genesis<E: EthSpec>(
beacon_nodes: &BeaconNodeFallback<SystemTimeSlotClock, E>,
async fn poll_whilst_waiting_for_genesis(
beacon_nodes: &BeaconNodeFallback<SystemTimeSlotClock>,
genesis_time: Duration,
) -> Result<(), String> {
loop {

View File

@@ -1,4 +1,5 @@
use crate::{DutiesService, ProductionValidatorClient};
use lighthouse_validator_store::LighthouseValidatorStore;
use metrics::set_gauge;
use slot_clock::SlotClock;
use tokio::time::{sleep, Duration};
@@ -32,7 +33,9 @@ pub fn spawn_notifier<E: EthSpec>(client: &ProductionValidatorClient<E>) -> Resu
}
/// Performs a single notification routine.
async fn notify<T: SlotClock + 'static, E: EthSpec>(duties_service: &DutiesService<T, E>) {
async fn notify<T: SlotClock + 'static, E: EthSpec>(
duties_service: &DutiesService<LighthouseValidatorStore<T, E>, T>,
) {
let (candidate_info, num_available, num_synced) =
duties_service.beacon_nodes.get_notifier_info().await;
let num_total = candidate_info.len();

View File

@@ -6,10 +6,8 @@ authors = ["Sigma Prime <contact@sigmaprime.io>"]
[dependencies]
beacon_node_fallback = { workspace = true }
bls = { workspace = true }
doppelganger_service = { workspace = true }
bls = { workspace = true }
either = { workspace = true }
environment = { workspace = true }
eth2 = { workspace = true }
futures = { workspace = true }
graffiti_file = { workspace = true }
@@ -17,6 +15,7 @@ logging = { workspace = true }
parking_lot = { workspace = true }
safe_arith = { workspace = true }
slot_clock = { workspace = true }
task_executor = { workspace = true }
tokio = { workspace = true }
tracing = { workspace = true }
tree_hash = { workspace = true }

View File

@@ -1,13 +1,13 @@
use crate::duties_service::{DutiesService, DutyAndProof};
use beacon_node_fallback::{ApiTopic, BeaconNodeFallback};
use either::Either;
use environment::RuntimeContext;
use futures::future::join_all;
use logging::crit;
use slot_clock::SlotClock;
use std::collections::HashMap;
use std::ops::Deref;
use std::sync::Arc;
use task_executor::TaskExecutor;
use tokio::time::{sleep, sleep_until, Duration, Instant};
use tracing::{debug, error, info, trace, warn};
use tree_hash::TreeHash;
@@ -16,33 +16,35 @@ use validator_store::{Error as ValidatorStoreError, ValidatorStore};
/// Builds an `AttestationService`.
#[derive(Default)]
pub struct AttestationServiceBuilder<T: SlotClock + 'static, E: EthSpec> {
duties_service: Option<Arc<DutiesService<T, E>>>,
validator_store: Option<Arc<ValidatorStore<T, E>>>,
pub struct AttestationServiceBuilder<S: ValidatorStore, T: SlotClock + 'static> {
duties_service: Option<Arc<DutiesService<S, T>>>,
validator_store: Option<Arc<S>>,
slot_clock: Option<T>,
beacon_nodes: Option<Arc<BeaconNodeFallback<T, E>>>,
context: Option<RuntimeContext<E>>,
beacon_nodes: Option<Arc<BeaconNodeFallback<T>>>,
executor: Option<TaskExecutor>,
chain_spec: Option<Arc<ChainSpec>>,
disable: bool,
}
impl<T: SlotClock + 'static, E: EthSpec> AttestationServiceBuilder<T, E> {
impl<S: ValidatorStore + 'static, T: SlotClock + 'static> AttestationServiceBuilder<S, T> {
pub fn new() -> Self {
Self {
duties_service: None,
validator_store: None,
slot_clock: None,
beacon_nodes: None,
context: None,
executor: None,
chain_spec: None,
disable: false,
}
}
pub fn duties_service(mut self, service: Arc<DutiesService<T, E>>) -> Self {
pub fn duties_service(mut self, service: Arc<DutiesService<S, T>>) -> Self {
self.duties_service = Some(service);
self
}
pub fn validator_store(mut self, store: Arc<ValidatorStore<T, E>>) -> Self {
pub fn validator_store(mut self, store: Arc<S>) -> Self {
self.validator_store = Some(store);
self
}
@@ -52,13 +54,18 @@ impl<T: SlotClock + 'static, E: EthSpec> AttestationServiceBuilder<T, E> {
self
}
pub fn beacon_nodes(mut self, beacon_nodes: Arc<BeaconNodeFallback<T, E>>) -> Self {
pub fn beacon_nodes(mut self, beacon_nodes: Arc<BeaconNodeFallback<T>>) -> Self {
self.beacon_nodes = Some(beacon_nodes);
self
}
pub fn runtime_context(mut self, context: RuntimeContext<E>) -> Self {
self.context = Some(context);
pub fn executor(mut self, executor: TaskExecutor) -> Self {
self.executor = Some(executor);
self
}
pub fn chain_spec(mut self, chain_spec: Arc<ChainSpec>) -> Self {
self.chain_spec = Some(chain_spec);
self
}
@@ -67,7 +74,7 @@ impl<T: SlotClock + 'static, E: EthSpec> AttestationServiceBuilder<T, E> {
self
}
pub fn build(self) -> Result<AttestationService<T, E>, String> {
pub fn build(self) -> Result<AttestationService<S, T>, String> {
Ok(AttestationService {
inner: Arc::new(Inner {
duties_service: self
@@ -82,9 +89,12 @@ impl<T: SlotClock + 'static, E: EthSpec> AttestationServiceBuilder<T, E> {
beacon_nodes: self
.beacon_nodes
.ok_or("Cannot build AttestationService without beacon_nodes")?,
context: self
.context
.ok_or("Cannot build AttestationService without runtime_context")?,
executor: self
.executor
.ok_or("Cannot build AttestationService without executor")?,
chain_spec: self
.chain_spec
.ok_or("Cannot build AttestationService without chain_spec")?,
disable: self.disable,
}),
})
@@ -92,12 +102,13 @@ impl<T: SlotClock + 'static, E: EthSpec> AttestationServiceBuilder<T, E> {
}
/// Helper to minimise `Arc` usage.
pub struct Inner<T, E: EthSpec> {
duties_service: Arc<DutiesService<T, E>>,
validator_store: Arc<ValidatorStore<T, E>>,
pub struct Inner<S, T> {
duties_service: Arc<DutiesService<S, T>>,
validator_store: Arc<S>,
slot_clock: T,
beacon_nodes: Arc<BeaconNodeFallback<T, E>>,
context: RuntimeContext<E>,
beacon_nodes: Arc<BeaconNodeFallback<T>>,
executor: TaskExecutor,
chain_spec: Arc<ChainSpec>,
disable: bool,
}
@@ -106,11 +117,11 @@ pub struct Inner<T, E: EthSpec> {
/// If any validators are on the same committee, a single attestation will be downloaded and
/// returned to the beacon node. This attestation will have a signature from each of the
/// validators.
pub struct AttestationService<T, E: EthSpec> {
inner: Arc<Inner<T, E>>,
pub struct AttestationService<S, T> {
inner: Arc<Inner<S, T>>,
}
impl<T, E: EthSpec> Clone for AttestationService<T, E> {
impl<S, T> Clone for AttestationService<S, T> {
fn clone(&self) -> Self {
Self {
inner: self.inner.clone(),
@@ -118,15 +129,15 @@ impl<T, E: EthSpec> Clone for AttestationService<T, E> {
}
}
impl<T, E: EthSpec> Deref for AttestationService<T, E> {
type Target = Inner<T, E>;
impl<S, T> Deref for AttestationService<S, T> {
type Target = Inner<S, T>;
fn deref(&self) -> &Self::Target {
self.inner.deref()
}
}
impl<T: SlotClock + 'static, E: EthSpec> AttestationService<T, E> {
impl<S: ValidatorStore + 'static, T: SlotClock + 'static> AttestationService<S, T> {
/// Starts the service which periodically produces attestations.
pub fn start_update_service(self, spec: &ChainSpec) -> Result<(), String> {
if self.disable {
@@ -145,7 +156,7 @@ impl<T: SlotClock + 'static, E: EthSpec> AttestationService<T, E> {
"Attestation production service started"
);
let executor = self.context.executor.clone();
let executor = self.executor.clone();
let interval_fut = async move {
loop {
@@ -205,7 +216,7 @@ impl<T: SlotClock + 'static, E: EthSpec> AttestationService<T, E> {
.into_iter()
.for_each(|(committee_index, validator_duties)| {
// Spawn a separate task for each attestation.
self.inner.context.executor.spawn_ignoring_error(
self.inner.executor.spawn_ignoring_error(
self.clone().publish_attestations_and_aggregates(
slot,
committee_index,
@@ -332,7 +343,7 @@ impl<T: SlotClock + 'static, E: EthSpec> AttestationService<T, E> {
.slot_clock
.now()
.ok_or("Unable to determine current slot from clock")?
.epoch(E::slots_per_epoch());
.epoch(S::E::slots_per_epoch());
let attestation_data = self
.beacon_nodes
@@ -357,7 +368,7 @@ impl<T: SlotClock + 'static, E: EthSpec> AttestationService<T, E> {
let attestation_data = attestation_data_ref;
// Ensure that the attestation matches the duties.
if !duty.match_attestation_data::<E>(attestation_data, &self.context.eth2_config.spec) {
if !duty.match_attestation_data::<S::E>(attestation_data, &self.chain_spec) {
crit!(
validator = ?duty.pubkey,
duty_slot = %duty.slot,
@@ -369,14 +380,14 @@ impl<T: SlotClock + 'static, E: EthSpec> AttestationService<T, E> {
return None;
}
let mut attestation = match Attestation::<E>::empty_for_signing(
let mut attestation = match Attestation::empty_for_signing(
duty.committee_index,
duty.committee_length as usize,
attestation_data.slot,
attestation_data.beacon_block_root,
attestation_data.source,
attestation_data.target,
&self.context.eth2_config.spec,
&self.chain_spec,
) {
Ok(attestation) => attestation,
Err(err) => {
@@ -439,10 +450,8 @@ impl<T: SlotClock + 'static, E: EthSpec> AttestationService<T, E> {
return Ok(None);
}
let fork_name = self
.context
.eth2_config
.spec
.fork_name_at_slot::<E>(attestation_data.slot);
.chain_spec
.fork_name_at_slot::<S::E>(attestation_data.slot);
// Post the attestations to the BN.
match self
@@ -476,7 +485,7 @@ impl<T: SlotClock + 'static, E: EthSpec> AttestationService<T, E> {
.collect::<Vec<_>>();
beacon_node
.post_beacon_pool_attestations_v2::<E>(
.post_beacon_pool_attestations_v2::<S::E>(
Either::Right(single_attestations),
fork_name,
)
@@ -538,10 +547,8 @@ impl<T: SlotClock + 'static, E: EthSpec> AttestationService<T, E> {
}
let fork_name = self
.context
.eth2_config
.spec
.fork_name_at_slot::<E>(attestation_data.slot);
.chain_spec
.fork_name_at_slot::<S::E>(attestation_data.slot);
let aggregated_attestation = &self
.beacon_nodes
@@ -585,7 +592,7 @@ impl<T: SlotClock + 'static, E: EthSpec> AttestationService<T, E> {
let duty = &duty_and_proof.duty;
let selection_proof = duty_and_proof.selection_proof.as_ref()?;
if !duty.match_attestation_data::<E>(attestation_data, &self.context.eth2_config.spec) {
if !duty.match_attestation_data::<S::E>(attestation_data, &self.chain_spec) {
crit!("Inconsistent validator duties during signing");
return None;
}
@@ -689,11 +696,11 @@ impl<T: SlotClock + 'static, E: EthSpec> AttestationService<T, E> {
/// Start the task at `pruning_instant` to avoid interference with other tasks.
fn spawn_slashing_protection_pruning_task(&self, slot: Slot, pruning_instant: Instant) {
let attestation_service = self.clone();
let executor = self.inner.context.executor.clone();
let current_epoch = slot.epoch(E::slots_per_epoch());
let executor = self.inner.executor.clone();
let current_epoch = slot.epoch(S::E::slots_per_epoch());
// Wait for `pruning_instant` in a regular task, and then switch to a blocking one.
self.inner.context.executor.spawn(
self.inner.executor.spawn(
async move {
sleep_until(pruning_instant).await;

View File

@@ -1,6 +1,5 @@
use beacon_node_fallback::{ApiTopic, BeaconNodeFallback, Error as FallbackError, Errors};
use bls::SignatureBytes;
use environment::RuntimeContext;
use eth2::types::{FullBlockContents, PublishBlockRequest};
use eth2::{BeaconNodeHttpClient, StatusCode};
use graffiti_file::{determine_graffiti, GraffitiFile};
@@ -11,11 +10,12 @@ use std::future::Future;
use std::ops::Deref;
use std::sync::Arc;
use std::time::Duration;
use task_executor::TaskExecutor;
use tokio::sync::mpsc;
use tracing::{debug, error, info, trace, warn};
use types::{
BlindedBeaconBlock, BlockType, EthSpec, Graffiti, PublicKeyBytes, SignedBlindedBeaconBlock,
Slot,
BlindedBeaconBlock, BlockType, ChainSpec, EthSpec, Graffiti, PublicKeyBytes,
SignedBlindedBeaconBlock, Slot,
};
use validator_store::{Error as ValidatorStoreError, ValidatorStore};
@@ -45,30 +45,32 @@ impl From<Errors<BlockError>> for BlockError {
/// Builds a `BlockService`.
#[derive(Default)]
pub struct BlockServiceBuilder<T, E: EthSpec> {
validator_store: Option<Arc<ValidatorStore<T, E>>>,
pub struct BlockServiceBuilder<S, T> {
validator_store: Option<Arc<S>>,
slot_clock: Option<Arc<T>>,
beacon_nodes: Option<Arc<BeaconNodeFallback<T, E>>>,
proposer_nodes: Option<Arc<BeaconNodeFallback<T, E>>>,
context: Option<RuntimeContext<E>>,
beacon_nodes: Option<Arc<BeaconNodeFallback<T>>>,
proposer_nodes: Option<Arc<BeaconNodeFallback<T>>>,
executor: Option<TaskExecutor>,
chain_spec: Option<Arc<ChainSpec>>,
graffiti: Option<Graffiti>,
graffiti_file: Option<GraffitiFile>,
}
impl<T: SlotClock + 'static, E: EthSpec> BlockServiceBuilder<T, E> {
impl<S: ValidatorStore, T: SlotClock + 'static> BlockServiceBuilder<S, T> {
pub fn new() -> Self {
Self {
validator_store: None,
slot_clock: None,
beacon_nodes: None,
proposer_nodes: None,
context: None,
executor: None,
chain_spec: None,
graffiti: None,
graffiti_file: None,
}
}
pub fn validator_store(mut self, store: Arc<ValidatorStore<T, E>>) -> Self {
pub fn validator_store(mut self, store: Arc<S>) -> Self {
self.validator_store = Some(store);
self
}
@@ -78,18 +80,23 @@ impl<T: SlotClock + 'static, E: EthSpec> BlockServiceBuilder<T, E> {
self
}
pub fn beacon_nodes(mut self, beacon_nodes: Arc<BeaconNodeFallback<T, E>>) -> Self {
pub fn beacon_nodes(mut self, beacon_nodes: Arc<BeaconNodeFallback<T>>) -> Self {
self.beacon_nodes = Some(beacon_nodes);
self
}
pub fn proposer_nodes(mut self, proposer_nodes: Arc<BeaconNodeFallback<T, E>>) -> Self {
pub fn proposer_nodes(mut self, proposer_nodes: Arc<BeaconNodeFallback<T>>) -> Self {
self.proposer_nodes = Some(proposer_nodes);
self
}
pub fn runtime_context(mut self, context: RuntimeContext<E>) -> Self {
self.context = Some(context);
pub fn executor(mut self, executor: TaskExecutor) -> Self {
self.executor = Some(executor);
self
}
pub fn chain_spec(mut self, chain_spec: Arc<ChainSpec>) -> Self {
self.chain_spec = Some(chain_spec);
self
}
@@ -103,7 +110,7 @@ impl<T: SlotClock + 'static, E: EthSpec> BlockServiceBuilder<T, E> {
self
}
pub fn build(self) -> Result<BlockService<T, E>, String> {
pub fn build(self) -> Result<BlockService<S, T>, String> {
Ok(BlockService {
inner: Arc::new(Inner {
validator_store: self
@@ -115,9 +122,12 @@ impl<T: SlotClock + 'static, E: EthSpec> BlockServiceBuilder<T, E> {
beacon_nodes: self
.beacon_nodes
.ok_or("Cannot build BlockService without beacon_node")?,
context: self
.context
.ok_or("Cannot build BlockService without runtime_context")?,
executor: self
.executor
.ok_or("Cannot build BlockService without executor")?,
chain_spec: self
.chain_spec
.ok_or("Cannot build BlockService without chain_spec")?,
proposer_nodes: self.proposer_nodes,
graffiti: self.graffiti,
graffiti_file: self.graffiti_file,
@@ -128,12 +138,12 @@ impl<T: SlotClock + 'static, E: EthSpec> BlockServiceBuilder<T, E> {
// Combines a set of non-block-proposing `beacon_nodes` and only-block-proposing
// `proposer_nodes`.
pub struct ProposerFallback<T, E: EthSpec> {
beacon_nodes: Arc<BeaconNodeFallback<T, E>>,
proposer_nodes: Option<Arc<BeaconNodeFallback<T, E>>>,
pub struct ProposerFallback<T> {
beacon_nodes: Arc<BeaconNodeFallback<T>>,
proposer_nodes: Option<Arc<BeaconNodeFallback<T>>>,
}
impl<T: SlotClock, E: EthSpec> ProposerFallback<T, E> {
impl<T: SlotClock> ProposerFallback<T> {
// Try `func` on `self.proposer_nodes` first. If that doesn't work, try `self.beacon_nodes`.
pub async fn request_proposers_first<F, Err, R>(&self, func: F) -> Result<(), Errors<Err>>
where
@@ -178,22 +188,23 @@ impl<T: SlotClock, E: EthSpec> ProposerFallback<T, E> {
}
/// Helper to minimise `Arc` usage.
pub struct Inner<T, E: EthSpec> {
validator_store: Arc<ValidatorStore<T, E>>,
pub struct Inner<S, T> {
validator_store: Arc<S>,
slot_clock: Arc<T>,
pub beacon_nodes: Arc<BeaconNodeFallback<T, E>>,
pub proposer_nodes: Option<Arc<BeaconNodeFallback<T, E>>>,
context: RuntimeContext<E>,
pub beacon_nodes: Arc<BeaconNodeFallback<T>>,
pub proposer_nodes: Option<Arc<BeaconNodeFallback<T>>>,
executor: TaskExecutor,
chain_spec: Arc<ChainSpec>,
graffiti: Option<Graffiti>,
graffiti_file: Option<GraffitiFile>,
}
/// Attempts to produce attestations for any block producer(s) at the start of the epoch.
pub struct BlockService<T, E: EthSpec> {
inner: Arc<Inner<T, E>>,
pub struct BlockService<S, T> {
inner: Arc<Inner<S, T>>,
}
impl<T, E: EthSpec> Clone for BlockService<T, E> {
impl<S, T> Clone for BlockService<S, T> {
fn clone(&self) -> Self {
Self {
inner: self.inner.clone(),
@@ -201,8 +212,8 @@ impl<T, E: EthSpec> Clone for BlockService<T, E> {
}
}
impl<T, E: EthSpec> Deref for BlockService<T, E> {
type Target = Inner<T, E>;
impl<S, T> Deref for BlockService<S, T> {
type Target = Inner<S, T>;
fn deref(&self) -> &Self::Target {
self.inner.deref()
@@ -215,14 +226,14 @@ pub struct BlockServiceNotification {
pub block_proposers: Vec<PublicKeyBytes>,
}
impl<T: SlotClock + 'static, E: EthSpec> BlockService<T, E> {
impl<S: ValidatorStore + 'static, T: SlotClock + 'static> BlockService<S, T> {
pub fn start_update_service(
self,
mut notification_rx: mpsc::Receiver<BlockServiceNotification>,
) -> Result<(), String> {
info!("Block production service started");
let executor = self.inner.context.executor.clone();
let executor = self.inner.executor.clone();
executor.spawn(
async move {
@@ -258,7 +269,7 @@ impl<T: SlotClock + 'static, E: EthSpec> BlockService<T, E> {
return Ok(());
}
if slot == self.context.eth2_config.spec.genesis_slot {
if slot == self.chain_spec.genesis_slot {
debug!(
proposers = format!("{:?}", notification.block_proposers),
"Not producing block at genesis slot"
@@ -285,9 +296,11 @@ impl<T: SlotClock + 'static, E: EthSpec> BlockService<T, E> {
}
for validator_pubkey in proposers {
let builder_boost_factor = self.get_builder_boost_factor(&validator_pubkey);
let builder_boost_factor = self
.validator_store
.determine_builder_boost_factor(&validator_pubkey);
let service = self.clone();
self.inner.context.executor.spawn(
self.inner.executor.spawn(
async move {
let result = service
.publish_block(slot, validator_pubkey, builder_boost_factor)
@@ -314,30 +327,35 @@ impl<T: SlotClock + 'static, E: EthSpec> BlockService<T, E> {
#[allow(clippy::too_many_arguments)]
async fn sign_and_publish_block(
&self,
proposer_fallback: ProposerFallback<T, E>,
proposer_fallback: ProposerFallback<T>,
slot: Slot,
graffiti: Option<Graffiti>,
validator_pubkey: &PublicKeyBytes,
unsigned_block: UnsignedBlock<E>,
unsigned_block: UnsignedBlock<S::E>,
) -> Result<(), BlockError> {
let signing_timer = validator_metrics::start_timer(&validator_metrics::BLOCK_SIGNING_TIMES);
let res = match unsigned_block {
let (block, maybe_blobs) = match unsigned_block {
UnsignedBlock::Full(block_contents) => {
let (block, maybe_blobs) = block_contents.deconstruct();
self.validator_store
.sign_block(*validator_pubkey, block, slot)
.await
.map(|b| SignedBlock::Full(PublishBlockRequest::new(Arc::new(b), maybe_blobs)))
(block.into(), maybe_blobs)
}
UnsignedBlock::Blinded(block) => self
.validator_store
.sign_block(*validator_pubkey, block, slot)
.await
.map(Arc::new)
.map(SignedBlock::Blinded),
UnsignedBlock::Blinded(block) => (block.into(), None),
};
let res = self
.validator_store
.sign_block(*validator_pubkey, block, slot)
.await
.map(|block| match block {
validator_store::SignedBlock::Full(block) => {
SignedBlock::Full(PublishBlockRequest::new(Arc::new(block), maybe_blobs))
}
validator_store::SignedBlock::Blinded(block) => {
SignedBlock::Blinded(Arc::new(block))
}
});
let signed_block = match res {
Ok(block) => block,
Err(ValidatorStoreError::UnknownPubkey(pubkey)) => {
@@ -404,7 +422,7 @@ impl<T: SlotClock + 'static, E: EthSpec> BlockService<T, E> {
let randao_reveal = match self
.validator_store
.randao_reveal(validator_pubkey, slot.epoch(E::slots_per_epoch()))
.randao_reveal(validator_pubkey, slot.epoch(S::E::slots_per_epoch()))
.await
{
Ok(signature) => signature.into(),
@@ -487,7 +505,7 @@ impl<T: SlotClock + 'static, E: EthSpec> BlockService<T, E> {
async fn publish_signed_block_contents(
&self,
signed_block: &SignedBlock<E>,
signed_block: &SignedBlock<S::E>,
beacon_node: BeaconNodeHttpClient,
) -> Result<(), BlockError> {
let slot = signed_block.slot();
@@ -523,9 +541,9 @@ impl<T: SlotClock + 'static, E: EthSpec> BlockService<T, E> {
graffiti: Option<Graffiti>,
proposer_index: Option<u64>,
builder_boost_factor: Option<u64>,
) -> Result<UnsignedBlock<E>, BlockError> {
) -> Result<UnsignedBlock<S::E>, BlockError> {
let (block_response, _) = beacon_node
.get_validator_blocks_v3::<E>(
.get_validator_blocks_v3::<S::E>(
slot,
randao_reveal_ref,
graffiti.as_ref(),
@@ -553,36 +571,6 @@ impl<T: SlotClock + 'static, E: EthSpec> BlockService<T, E> {
Ok::<_, BlockError>(unsigned_block)
}
/// Returns the builder boost factor of the given public key.
/// The priority order for fetching this value is:
///
/// 1. validator_definitions.yml
/// 2. process level flag
fn get_builder_boost_factor(&self, validator_pubkey: &PublicKeyBytes) -> Option<u64> {
    // The per-validator setting wins; only fall back to the process-wide
    // default when no per-validator value is configured.
    //
    // A factor of exactly 100 is the neutral value: surface it as `None`
    // so callers skip calculations that could lose information.
    self.validator_store
        .determine_validator_builder_boost_factor(validator_pubkey)
        .or_else(|| {
            self.validator_store
                .determine_default_builder_boost_factor()
        })
        .filter(|&builder_boost_factor| builder_boost_factor != 100)
}
}
pub enum UnsignedBlock<E: EthSpec> {

View File

@@ -10,8 +10,6 @@ use crate::block_service::BlockServiceNotification;
use crate::sync::poll_sync_committee_duties;
use crate::sync::SyncDutiesMap;
use beacon_node_fallback::{ApiTopic, BeaconNodeFallback};
use doppelganger_service::DoppelgangerStatus;
use environment::RuntimeContext;
use eth2::types::{
AttesterData, BeaconCommitteeSubscription, DutiesResponse, ProposerData, StateId, ValidatorId,
};
@@ -24,11 +22,12 @@ use std::collections::{hash_map, BTreeMap, HashMap, HashSet};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::Duration;
use task_executor::TaskExecutor;
use tokio::{sync::mpsc::Sender, time::sleep};
use tracing::{debug, error, info, warn};
use types::{ChainSpec, Epoch, EthSpec, Hash256, PublicKeyBytes, SelectionProof, Slot};
use validator_metrics::{get_int_gauge, set_int_gauge, ATTESTATION_DUTY};
use validator_store::{Error as ValidatorStoreError, ValidatorStore};
use validator_store::{DoppelgangerStatus, Error as ValidatorStoreError, ValidatorStore};
/// Only retain `HISTORICAL_DUTIES_EPOCHS` duties prior to the current epoch.
const HISTORICAL_DUTIES_EPOCHS: u64 = 2;
@@ -87,16 +86,16 @@ const _: () = assert!(ATTESTATION_SUBSCRIPTION_OFFSETS[0] > MIN_ATTESTATION_SUBS
// The info in the enum variants is displayed in logging, clippy thinks it's dead code.
#[derive(Debug)]
pub enum Error {
pub enum Error<T> {
UnableToReadSlotClock,
FailedToDownloadAttesters(#[allow(dead_code)] String),
FailedToProduceSelectionProof(#[allow(dead_code)] ValidatorStoreError),
FailedToProduceSelectionProof(#[allow(dead_code)] ValidatorStoreError<T>),
InvalidModulo(#[allow(dead_code)] ArithError),
Arith(#[allow(dead_code)] ArithError),
SyncDutiesNotFound(#[allow(dead_code)] u64),
}
impl From<ArithError> for Error {
impl<T> From<ArithError> for Error<T> {
fn from(e: ArithError) -> Self {
Self::Arith(e)
}
@@ -125,11 +124,11 @@ pub struct SubscriptionSlots {
/// Create a selection proof for `duty`.
///
/// Return `Ok(None)` if the attesting validator is not an aggregator.
async fn make_selection_proof<T: SlotClock + 'static, E: EthSpec>(
async fn make_selection_proof<S: ValidatorStore + 'static>(
duty: &AttesterData,
validator_store: &ValidatorStore<T, E>,
validator_store: &S,
spec: &ChainSpec,
) -> Result<Option<SelectionProof>, Error> {
) -> Result<Option<SelectionProof>, Error<S::Error>> {
let selection_proof = validator_store
.produce_selection_proof(duty.pubkey, duty.slot)
.await
@@ -205,25 +204,132 @@ type DependentRoot = Hash256;
type AttesterMap = HashMap<PublicKeyBytes, HashMap<Epoch, (DependentRoot, DutyAndProof)>>;
type ProposerMap = HashMap<Epoch, (DependentRoot, Vec<ProposerData>)>;
/// Builder for [`DutiesService`].
///
/// All component fields must be supplied via the setter methods before
/// calling `build`; the boolean flags default to `false`.
pub struct DutiesServiceBuilder<S, T> {
    /// Provides the canonical list of locally-managed validators.
    validator_store: Option<Arc<S>>,
    /// Tracks the current slot.
    slot_clock: Option<T>,
    /// Provides HTTP access to remote beacon nodes.
    beacon_nodes: Option<Arc<BeaconNodeFallback<T>>>,
    /// The runtime for spawning tasks.
    executor: Option<TaskExecutor>,
    /// The current chain spec.
    spec: Option<Arc<ChainSpec>>,
    /// Whether we permit large validator counts in the metrics.
    enable_high_validator_count_metrics: bool,
    /// If this validator is running in distributed mode.
    distributed: bool,
    /// Whether attestation duties should be skipped entirely.
    disable_attesting: bool,
}
impl<S, T> Default for DutiesServiceBuilder<S, T> {
fn default() -> Self {
Self::new()
}
}
impl<S, T> DutiesServiceBuilder<S, T> {
    /// Create a builder with every required component unset and all flags
    /// disabled.
    pub fn new() -> Self {
        Self {
            validator_store: None,
            slot_clock: None,
            beacon_nodes: None,
            executor: None,
            spec: None,
            enable_high_validator_count_metrics: false,
            distributed: false,
            disable_attesting: false,
        }
    }

    /// Set the store providing the canonical list of locally-managed
    /// validators. Required.
    pub fn validator_store(mut self, store: Arc<S>) -> Self {
        self.validator_store = Some(store);
        self
    }

    /// Set the clock used to track the current slot. Required.
    pub fn slot_clock(mut self, clock: T) -> Self {
        self.slot_clock = Some(clock);
        self
    }

    /// Set the fallback client providing HTTP access to remote beacon
    /// nodes. Required.
    pub fn beacon_nodes(mut self, nodes: Arc<BeaconNodeFallback<T>>) -> Self {
        self.beacon_nodes = Some(nodes);
        self
    }

    /// Set the task executor used to spawn background work. Required.
    pub fn executor(mut self, task_executor: TaskExecutor) -> Self {
        self.executor = Some(task_executor);
        self
    }

    /// Set the chain spec. Required.
    pub fn spec(mut self, chain_spec: Arc<ChainSpec>) -> Self {
        self.spec = Some(chain_spec);
        self
    }

    /// Permit large validator counts to be reported in the metrics.
    pub fn enable_high_validator_count_metrics(mut self, enable: bool) -> Self {
        self.enable_high_validator_count_metrics = enable;
        self
    }

    /// Mark this validator client as running in distributed mode.
    pub fn distributed(mut self, distributed: bool) -> Self {
        self.distributed = distributed;
        self
    }

    /// Disable attestation duties entirely.
    pub fn disable_attesting(mut self, disable_attesting: bool) -> Self {
        self.disable_attesting = disable_attesting;
        self
    }

    /// Assemble the [`DutiesService`].
    ///
    /// # Errors
    ///
    /// Returns a descriptive `String` if any required component
    /// (`validator_store`, `slot_clock`, `beacon_nodes`, `executor`,
    /// `spec`) was never supplied.
    pub fn build(self) -> Result<DutiesService<S, T>, String> {
        // Resolve all required components up front; the first missing one
        // (in declaration order) determines the error returned.
        let validator_store = self
            .validator_store
            .ok_or("Cannot build DutiesService without validator_store")?;
        let slot_clock = self
            .slot_clock
            .ok_or("Cannot build DutiesService without slot_clock")?;
        let beacon_nodes = self
            .beacon_nodes
            .ok_or("Cannot build DutiesService without beacon_nodes")?;
        let executor = self
            .executor
            .ok_or("Cannot build DutiesService without executor")?;
        let spec = self.spec.ok_or("Cannot build DutiesService without spec")?;
        Ok(DutiesService {
            attesters: Default::default(),
            proposers: Default::default(),
            sync_duties: SyncDutiesMap::new(self.distributed),
            validator_store,
            unknown_validator_next_poll_slots: Default::default(),
            slot_clock,
            beacon_nodes,
            executor,
            spec,
            enable_high_validator_count_metrics: self.enable_high_validator_count_metrics,
            distributed: self.distributed,
            disable_attesting: self.disable_attesting,
        })
    }
}
/// See the module-level documentation.
pub struct DutiesService<T, E: EthSpec> {
pub struct DutiesService<S, T> {
/// Maps a validator public key to their duties for each epoch.
pub attesters: RwLock<AttesterMap>,
/// Maps an epoch to all *local* proposers in this epoch. Notably, this does not contain
/// proposals for any validators which are not registered locally.
pub proposers: RwLock<ProposerMap>,
/// Map from validator index to sync committee duties.
pub sync_duties: SyncDutiesMap<E>,
pub sync_duties: SyncDutiesMap,
/// Provides the canonical list of locally-managed validators.
pub validator_store: Arc<ValidatorStore<T, E>>,
pub validator_store: Arc<S>,
/// Maps unknown validator pubkeys to the next slot time when a poll should be conducted again.
pub unknown_validator_next_poll_slots: RwLock<HashMap<PublicKeyBytes, Slot>>,
/// Tracks the current slot.
pub slot_clock: T,
/// Provides HTTP access to remote beacon nodes.
pub beacon_nodes: Arc<BeaconNodeFallback<T, E>>,
pub beacon_nodes: Arc<BeaconNodeFallback<T>>,
/// The runtime for spawning tasks.
pub context: RuntimeContext<E>,
pub executor: TaskExecutor,
/// The current chain spec.
pub spec: Arc<ChainSpec>,
//// Whether we permit large validator counts in the metrics.
@@ -233,7 +339,7 @@ pub struct DutiesService<T, E: EthSpec> {
pub disable_attesting: bool,
}
impl<T: SlotClock + 'static, E: EthSpec> DutiesService<T, E> {
impl<S: ValidatorStore, T: SlotClock + 'static> DutiesService<S, T> {
/// Returns the total number of validators known to the duties service.
pub fn total_validator_count(&self) -> usize {
self.validator_store.num_voting_validators()
@@ -284,7 +390,7 @@ impl<T: SlotClock + 'static, E: EthSpec> DutiesService<T, E> {
/// It is possible that multiple validators have an identical proposal slot, however that is
/// likely the result of heavy forking (lol) or inconsistent beacon node connections.
pub fn block_proposers(&self, slot: Slot) -> HashSet<PublicKeyBytes> {
let epoch = slot.epoch(E::slots_per_epoch());
let epoch = slot.epoch(S::E::slots_per_epoch());
// Only collect validators that are considered safe in terms of doppelganger protection.
let signing_pubkeys: HashSet<_> = self
@@ -309,7 +415,7 @@ impl<T: SlotClock + 'static, E: EthSpec> DutiesService<T, E> {
/// Returns all `ValidatorDuty` for the given `slot`.
pub fn attesters(&self, slot: Slot) -> Vec<DutyAndProof> {
let epoch = slot.epoch(E::slots_per_epoch());
let epoch = slot.epoch(S::E::slots_per_epoch());
// Only collect validators that are considered safe in terms of doppelganger protection.
let signing_pubkeys: HashSet<_> = self
@@ -347,15 +453,15 @@ impl<T: SlotClock + 'static, E: EthSpec> DutiesService<T, E> {
/// process every slot, which has the chance of creating a theoretically unlimited backlog of tasks.
/// It was a conscious decision to choose to drop tasks on an overloaded/latent system rather than
/// overload it even more.
pub fn start_update_service<T: SlotClock + 'static, E: EthSpec>(
core_duties_service: Arc<DutiesService<T, E>>,
pub fn start_update_service<S: ValidatorStore + 'static, T: SlotClock + 'static>(
core_duties_service: Arc<DutiesService<S, T>>,
mut block_service_tx: Sender<BlockServiceNotification>,
) {
/*
* Spawn the task which updates the map of pubkey to validator index.
*/
let duties_service = core_duties_service.clone();
core_duties_service.context.executor.spawn(
core_duties_service.executor.spawn(
async move {
loop {
// Run this poll before the wait, this should hopefully download all the indices
@@ -378,7 +484,7 @@ pub fn start_update_service<T: SlotClock + 'static, E: EthSpec>(
* Spawn the task which keeps track of local block proposal duties.
*/
let duties_service = core_duties_service.clone();
core_duties_service.context.executor.spawn(
core_duties_service.executor.spawn(
async move {
loop {
if let Some(duration) = duties_service.slot_clock.duration_to_next_slot() {
@@ -411,7 +517,7 @@ pub fn start_update_service<T: SlotClock + 'static, E: EthSpec>(
* Spawn the task which keeps track of local attestation duties.
*/
let duties_service = core_duties_service.clone();
core_duties_service.context.executor.spawn(
core_duties_service.executor.spawn(
async move {
loop {
if let Some(duration) = duties_service.slot_clock.duration_to_next_slot() {
@@ -436,7 +542,7 @@ pub fn start_update_service<T: SlotClock + 'static, E: EthSpec>(
// Spawn the task which keeps track of local sync committee duties.
let duties_service = core_duties_service.clone();
core_duties_service.context.executor.spawn(
core_duties_service.executor.spawn(
async move {
loop {
if let Err(e) = poll_sync_committee_duties(&duties_service).await {
@@ -466,8 +572,8 @@ pub fn start_update_service<T: SlotClock + 'static, E: EthSpec>(
/// Iterate through all the voting pubkeys in the `ValidatorStore` and attempt to learn any unknown
/// validator indices.
async fn poll_validator_indices<T: SlotClock + 'static, E: EthSpec>(
duties_service: &DutiesService<T, E>,
async fn poll_validator_indices<S: ValidatorStore, T: SlotClock + 'static>(
duties_service: &DutiesService<S, T>,
) {
let _timer = validator_metrics::start_timer_vec(
&validator_metrics::DUTIES_SERVICE_TIMES,
@@ -486,16 +592,14 @@ async fn poll_validator_indices<T: SlotClock + 'static, E: EthSpec>(
// This is on its own line to avoid some weirdness with locks and if statements.
let is_known = duties_service
.validator_store
.initialized_validators()
.read()
.get_index(&pubkey)
.validator_index(&pubkey)
.is_some();
if !is_known {
let current_slot_opt = duties_service.slot_clock.now();
if let Some(current_slot) = current_slot_opt {
let is_first_slot_of_epoch = current_slot % E::slots_per_epoch() == 0;
let is_first_slot_of_epoch = current_slot % S::E::slots_per_epoch() == 0;
// Query an unknown validator later if it was queried within the last epoch, or if
// the current slot is the first slot of an epoch.
@@ -546,9 +650,7 @@ async fn poll_validator_indices<T: SlotClock + 'static, E: EthSpec>(
);
duties_service
.validator_store
.initialized_validators()
.write()
.set_index(&pubkey, response.data.index);
.set_validator_index(&pubkey, response.data.index);
duties_service
.unknown_validator_next_poll_slots
@@ -559,7 +661,7 @@ async fn poll_validator_indices<T: SlotClock + 'static, E: EthSpec>(
// the beacon chain.
Ok(None) => {
if let Some(current_slot) = current_slot_opt {
let next_poll_slot = current_slot.saturating_add(E::slots_per_epoch());
let next_poll_slot = current_slot.saturating_add(S::E::slots_per_epoch());
duties_service
.unknown_validator_next_poll_slots
.write()
@@ -590,9 +692,9 @@ async fn poll_validator_indices<T: SlotClock + 'static, E: EthSpec>(
/// 2. As above, but for the next-epoch.
/// 3. Push out any attestation subnet subscriptions to the BN.
/// 4. Prune old entries from `duties_service.attesters`.
async fn poll_beacon_attesters<T: SlotClock + 'static, E: EthSpec>(
duties_service: &Arc<DutiesService<T, E>>,
) -> Result<(), Error> {
async fn poll_beacon_attesters<S: ValidatorStore + 'static, T: SlotClock + 'static>(
duties_service: &Arc<DutiesService<S, T>>,
) -> Result<(), Error<S::Error>> {
let current_epoch_timer = validator_metrics::start_timer_vec(
&validator_metrics::DUTIES_SERVICE_TIMES,
&[validator_metrics::UPDATE_ATTESTERS_CURRENT_EPOCH],
@@ -602,7 +704,7 @@ async fn poll_beacon_attesters<T: SlotClock + 'static, E: EthSpec>(
.slot_clock
.now()
.ok_or(Error::UnableToReadSlotClock)?;
let current_epoch = current_slot.epoch(E::slots_per_epoch());
let current_epoch = current_slot.epoch(S::E::slots_per_epoch());
let next_epoch = current_epoch + 1;
// Collect *all* pubkeys, even those undergoing doppelganger protection.
@@ -616,10 +718,8 @@ async fn poll_beacon_attesters<T: SlotClock + 'static, E: EthSpec>(
let local_indices = {
let mut local_indices = Vec::with_capacity(local_pubkeys.len());
let vals_ref = duties_service.validator_store.initialized_validators();
let vals = vals_ref.read();
for &pubkey in &local_pubkeys {
if let Some(validator_index) = vals.get_index(&pubkey) {
if let Some(validator_index) = duties_service.validator_store.validator_index(&pubkey) {
local_indices.push(validator_index)
}
}
@@ -643,7 +743,7 @@ async fn poll_beacon_attesters<T: SlotClock + 'static, E: EthSpec>(
)
}
update_per_validator_duty_metrics::<T, E>(duties_service, current_epoch, current_slot);
update_per_validator_duty_metrics(duties_service, current_epoch, current_slot);
drop(current_epoch_timer);
let next_epoch_timer = validator_metrics::start_timer_vec(
@@ -664,7 +764,7 @@ async fn poll_beacon_attesters<T: SlotClock + 'static, E: EthSpec>(
)
}
update_per_validator_duty_metrics::<T, E>(duties_service, next_epoch, current_slot);
update_per_validator_duty_metrics(duties_service, next_epoch, current_slot);
drop(next_epoch_timer);
let subscriptions_timer = validator_metrics::start_timer_vec(
@@ -685,7 +785,7 @@ async fn poll_beacon_attesters<T: SlotClock + 'static, E: EthSpec>(
* std::cmp::max(
1,
local_pubkeys.len() * ATTESTATION_SUBSCRIPTION_OFFSETS.len()
/ E::slots_per_epoch() as usize,
/ S::E::slots_per_epoch() as usize,
)
/ overallocation_denominator;
let mut subscriptions = Vec::with_capacity(num_expected_subscriptions);
@@ -781,12 +881,12 @@ async fn poll_beacon_attesters<T: SlotClock + 'static, E: EthSpec>(
/// For the given `local_indices` and `local_pubkeys`, download the duties for the given `epoch` and
/// store them in `duties_service.attesters`.
async fn poll_beacon_attesters_for_epoch<T: SlotClock + 'static, E: EthSpec>(
duties_service: &Arc<DutiesService<T, E>>,
async fn poll_beacon_attesters_for_epoch<S: ValidatorStore + 'static, T: SlotClock + 'static>(
duties_service: &Arc<DutiesService<S, T>>,
epoch: Epoch,
local_indices: &[u64],
local_pubkeys: &HashSet<PublicKeyBytes>,
) -> Result<(), Error> {
) -> Result<(), Error<S::Error>> {
// No need to bother the BN if we don't have any validators.
if local_indices.is_empty() {
debug!(
@@ -930,7 +1030,7 @@ async fn poll_beacon_attesters_for_epoch<T: SlotClock + 'static, E: EthSpec>(
// Spawn the background task to compute selection proofs.
let subservice = duties_service.clone();
duties_service.context.executor.spawn(
duties_service.executor.spawn(
async move {
fill_in_selection_proofs(subservice, new_duties, dependent_root).await;
},
@@ -941,8 +1041,8 @@ async fn poll_beacon_attesters_for_epoch<T: SlotClock + 'static, E: EthSpec>(
}
/// Get a filtered list of local validators for which we don't already know their duties for that epoch
fn get_uninitialized_validators<T: SlotClock + 'static, E: EthSpec>(
duties_service: &Arc<DutiesService<T, E>>,
fn get_uninitialized_validators<S: ValidatorStore, T: SlotClock + 'static>(
duties_service: &Arc<DutiesService<S, T>>,
epoch: &Epoch,
local_pubkeys: &HashSet<PublicKeyBytes>,
) -> Vec<u64> {
@@ -958,8 +1058,8 @@ fn get_uninitialized_validators<T: SlotClock + 'static, E: EthSpec>(
.collect::<Vec<_>>()
}
fn update_per_validator_duty_metrics<T: SlotClock + 'static, E: EthSpec>(
duties_service: &Arc<DutiesService<T, E>>,
fn update_per_validator_duty_metrics<S: ValidatorStore, T: SlotClock + 'static>(
duties_service: &Arc<DutiesService<S, T>>,
epoch: Epoch,
current_slot: Slot,
) {
@@ -974,14 +1074,14 @@ fn update_per_validator_duty_metrics<T: SlotClock + 'static, E: EthSpec>(
get_int_gauge(&ATTESTATION_DUTY, &[&validator_index.to_string()])
{
let existing_slot = Slot::new(existing_slot_gauge.get() as u64);
let existing_epoch = existing_slot.epoch(E::slots_per_epoch());
let existing_epoch = existing_slot.epoch(S::E::slots_per_epoch());
// First condition ensures that we switch to the next epoch duty slot
// once the current epoch duty slot passes.
// Second condition is to ensure that next epoch duties don't override
// current epoch duties.
if existing_slot < current_slot
|| (duty_slot.epoch(E::slots_per_epoch()) <= existing_epoch
|| (duty_slot.epoch(S::E::slots_per_epoch()) <= existing_epoch
&& duty_slot > current_slot
&& duty_slot != existing_slot)
{
@@ -999,11 +1099,11 @@ fn update_per_validator_duty_metrics<T: SlotClock + 'static, E: EthSpec>(
}
}
async fn post_validator_duties_attester<T: SlotClock + 'static, E: EthSpec>(
duties_service: &Arc<DutiesService<T, E>>,
async fn post_validator_duties_attester<S: ValidatorStore, T: SlotClock + 'static>(
duties_service: &Arc<DutiesService<S, T>>,
epoch: Epoch,
validator_indices: &[u64],
) -> Result<DutiesResponse<Vec<AttesterData>>, Error> {
) -> Result<DutiesResponse<Vec<AttesterData>>, Error<S::Error>> {
duties_service
.beacon_nodes
.first_success(|beacon_node| async move {
@@ -1023,8 +1123,8 @@ async fn post_validator_duties_attester<T: SlotClock + 'static, E: EthSpec>(
///
/// Duties are computed in batches each slot. If a re-org is detected then the process will
/// terminate early as it is assumed the selection proofs from `duties` are no longer relevant.
async fn fill_in_selection_proofs<T: SlotClock + 'static, E: EthSpec>(
duties_service: Arc<DutiesService<T, E>>,
async fn fill_in_selection_proofs<S: ValidatorStore + 'static, T: SlotClock + 'static>(
duties_service: Arc<DutiesService<S, T>>,
duties: Vec<AttesterData>,
dependent_root: Hash256,
) {
@@ -1075,7 +1175,7 @@ async fn fill_in_selection_proofs<T: SlotClock + 'static, E: EthSpec>(
.then(|duty| async {
let opt_selection_proof = make_selection_proof(
&duty,
&duties_service.validator_store,
duties_service.validator_store.as_ref(),
&duties_service.spec,
)
.await?;
@@ -1114,7 +1214,7 @@ async fn fill_in_selection_proofs<T: SlotClock + 'static, E: EthSpec>(
};
let attester_map = attesters.entry(duty.pubkey).or_default();
let epoch = duty.slot.epoch(E::slots_per_epoch());
let epoch = duty.slot.epoch(S::E::slots_per_epoch());
match attester_map.entry(epoch) {
hash_map::Entry::Occupied(mut entry) => {
// No need to update duties for which no proof was computed.
@@ -1191,10 +1291,10 @@ async fn fill_in_selection_proofs<T: SlotClock + 'static, E: EthSpec>(
/// through the slow path every time. I.e., the proposal will only happen after we've been able to
/// download and process the duties from the BN. This means it is very important to ensure this
/// function is as fast as possible.
async fn poll_beacon_proposers<T: SlotClock + 'static, E: EthSpec>(
duties_service: &DutiesService<T, E>,
async fn poll_beacon_proposers<S: ValidatorStore, T: SlotClock + 'static>(
duties_service: &DutiesService<S, T>,
block_service_tx: &mut Sender<BlockServiceNotification>,
) -> Result<(), Error> {
) -> Result<(), Error<S::Error>> {
let _timer = validator_metrics::start_timer_vec(
&validator_metrics::DUTIES_SERVICE_TIMES,
&[validator_metrics::UPDATE_PROPOSERS],
@@ -1204,17 +1304,17 @@ async fn poll_beacon_proposers<T: SlotClock + 'static, E: EthSpec>(
.slot_clock
.now()
.ok_or(Error::UnableToReadSlotClock)?;
let current_epoch = current_slot.epoch(E::slots_per_epoch());
let current_epoch = current_slot.epoch(S::E::slots_per_epoch());
// Notify the block proposal service for any proposals that we have in our cache.
//
// See the function-level documentation for more information.
let initial_block_proposers = duties_service.block_proposers(current_slot);
notify_block_production_service(
notify_block_production_service::<S>(
current_slot,
&initial_block_proposers,
block_service_tx,
&duties_service.validator_store,
duties_service.validator_store.as_ref(),
)
.await;
@@ -1296,11 +1396,11 @@ async fn poll_beacon_proposers<T: SlotClock + 'static, E: EthSpec>(
//
// See the function-level documentation for more reasoning about this behaviour.
if !additional_block_producers.is_empty() {
notify_block_production_service(
notify_block_production_service::<S>(
current_slot,
&additional_block_producers,
block_service_tx,
&duties_service.validator_store,
duties_service.validator_store.as_ref(),
)
.await;
debug!(
@@ -1321,11 +1421,11 @@ async fn poll_beacon_proposers<T: SlotClock + 'static, E: EthSpec>(
}
/// Notify the block service if it should produce a block.
async fn notify_block_production_service<T: SlotClock + 'static, E: EthSpec>(
async fn notify_block_production_service<S: ValidatorStore>(
current_slot: Slot,
block_proposers: &HashSet<PublicKeyBytes>,
block_service_tx: &mut Sender<BlockServiceNotification>,
validator_store: &ValidatorStore<T, E>,
validator_store: &S,
) {
let non_doppelganger_proposers = block_proposers
.iter()

View File

@@ -1,7 +1,5 @@
use beacon_node_fallback::{ApiTopic, BeaconNodeFallback};
use bls::PublicKeyBytes;
use doppelganger_service::DoppelgangerStatus;
use environment::RuntimeContext;
use parking_lot::RwLock;
use slot_clock::SlotClock;
use std::collections::HashMap;
@@ -9,13 +7,16 @@ use std::hash::Hash;
use std::ops::Deref;
use std::sync::Arc;
use std::time::{SystemTime, UNIX_EPOCH};
use task_executor::TaskExecutor;
use tokio::time::{sleep, Duration};
use tracing::{debug, error, info, warn};
use types::{
Address, ChainSpec, EthSpec, ProposerPreparationData, SignedValidatorRegistrationData,
ValidatorRegistrationData,
};
use validator_store::{Error as ValidatorStoreError, ProposalData, ValidatorStore};
use validator_store::{
DoppelgangerStatus, Error as ValidatorStoreError, ProposalData, ValidatorStore,
};
/// Number of epochs before the Bellatrix hard fork to begin posting proposer preparations.
const PROPOSER_PREPARATION_LOOKAHEAD_EPOCHS: u64 = 2;
@@ -25,28 +26,28 @@ const EPOCHS_PER_VALIDATOR_REGISTRATION_SUBMISSION: u64 = 1;
/// Builds an `PreparationService`.
#[derive(Default)]
pub struct PreparationServiceBuilder<T: SlotClock + 'static, E: EthSpec> {
validator_store: Option<Arc<ValidatorStore<T, E>>>,
pub struct PreparationServiceBuilder<S: ValidatorStore, T: SlotClock + 'static> {
validator_store: Option<Arc<S>>,
slot_clock: Option<T>,
beacon_nodes: Option<Arc<BeaconNodeFallback<T, E>>>,
context: Option<RuntimeContext<E>>,
beacon_nodes: Option<Arc<BeaconNodeFallback<T>>>,
executor: Option<TaskExecutor>,
builder_registration_timestamp_override: Option<u64>,
validator_registration_batch_size: Option<usize>,
}
impl<T: SlotClock + 'static, E: EthSpec> PreparationServiceBuilder<T, E> {
impl<S: ValidatorStore, T: SlotClock + 'static> PreparationServiceBuilder<S, T> {
pub fn new() -> Self {
Self {
validator_store: None,
slot_clock: None,
beacon_nodes: None,
context: None,
executor: None,
builder_registration_timestamp_override: None,
validator_registration_batch_size: None,
}
}
pub fn validator_store(mut self, store: Arc<ValidatorStore<T, E>>) -> Self {
pub fn validator_store(mut self, store: Arc<S>) -> Self {
self.validator_store = Some(store);
self
}
@@ -56,13 +57,13 @@ impl<T: SlotClock + 'static, E: EthSpec> PreparationServiceBuilder<T, E> {
self
}
pub fn beacon_nodes(mut self, beacon_nodes: Arc<BeaconNodeFallback<T, E>>) -> Self {
pub fn beacon_nodes(mut self, beacon_nodes: Arc<BeaconNodeFallback<T>>) -> Self {
self.beacon_nodes = Some(beacon_nodes);
self
}
pub fn runtime_context(mut self, context: RuntimeContext<E>) -> Self {
self.context = Some(context);
pub fn executor(mut self, executor: TaskExecutor) -> Self {
self.executor = Some(executor);
self
}
@@ -82,7 +83,7 @@ impl<T: SlotClock + 'static, E: EthSpec> PreparationServiceBuilder<T, E> {
self
}
pub fn build(self) -> Result<PreparationService<T, E>, String> {
pub fn build(self) -> Result<PreparationService<S, T>, String> {
Ok(PreparationService {
inner: Arc::new(Inner {
validator_store: self
@@ -94,9 +95,9 @@ impl<T: SlotClock + 'static, E: EthSpec> PreparationServiceBuilder<T, E> {
beacon_nodes: self
.beacon_nodes
.ok_or("Cannot build PreparationService without beacon_nodes")?,
context: self
.context
.ok_or("Cannot build PreparationService without runtime_context")?,
executor: self
.executor
.ok_or("Cannot build PreparationService without executor")?,
builder_registration_timestamp_override: self
.builder_registration_timestamp_override,
validator_registration_batch_size: self.validator_registration_batch_size.ok_or(
@@ -109,11 +110,11 @@ impl<T: SlotClock + 'static, E: EthSpec> PreparationServiceBuilder<T, E> {
}
/// Helper to minimise `Arc` usage.
pub struct Inner<T, E: EthSpec> {
validator_store: Arc<ValidatorStore<T, E>>,
pub struct Inner<S, T> {
validator_store: Arc<S>,
slot_clock: T,
beacon_nodes: Arc<BeaconNodeFallback<T, E>>,
context: RuntimeContext<E>,
beacon_nodes: Arc<BeaconNodeFallback<T>>,
executor: TaskExecutor,
builder_registration_timestamp_override: Option<u64>,
// Used to track unpublished validator registration changes.
validator_registration_cache:
@@ -145,11 +146,11 @@ impl From<ValidatorRegistrationData> for ValidatorRegistrationKey {
}
/// Attempts to produce proposer preparations for all known validators at the beginning of each epoch.
pub struct PreparationService<T, E: EthSpec> {
inner: Arc<Inner<T, E>>,
pub struct PreparationService<S, T> {
inner: Arc<Inner<S, T>>,
}
impl<T, E: EthSpec> Clone for PreparationService<T, E> {
impl<S, T> Clone for PreparationService<S, T> {
fn clone(&self) -> Self {
Self {
inner: self.inner.clone(),
@@ -157,15 +158,15 @@ impl<T, E: EthSpec> Clone for PreparationService<T, E> {
}
}
impl<T, E: EthSpec> Deref for PreparationService<T, E> {
type Target = Inner<T, E>;
impl<S, T> Deref for PreparationService<S, T> {
type Target = Inner<S, T>;
fn deref(&self) -> &Self::Target {
self.inner.deref()
}
}
impl<T: SlotClock + 'static, E: EthSpec> PreparationService<T, E> {
impl<S: ValidatorStore + 'static, T: SlotClock + 'static> PreparationService<S, T> {
pub fn start_update_service(self, spec: &ChainSpec) -> Result<(), String> {
self.clone().start_validator_registration_service(spec)?;
self.start_proposer_prepare_service(spec)
@@ -176,7 +177,7 @@ impl<T: SlotClock + 'static, E: EthSpec> PreparationService<T, E> {
let slot_duration = Duration::from_secs(spec.seconds_per_slot);
info!("Proposer preparation service started");
let executor = self.context.executor.clone();
let executor = self.executor.clone();
let spec = spec.clone();
let interval_fut = async move {
@@ -215,7 +216,7 @@ impl<T: SlotClock + 'static, E: EthSpec> PreparationService<T, E> {
let spec = spec.clone();
let slot_duration = Duration::from_secs(spec.seconds_per_slot);
let executor = self.context.executor.clone();
let executor = self.executor.clone();
let validator_registration_fut = async move {
loop {
@@ -243,10 +244,9 @@ impl<T: SlotClock + 'static, E: EthSpec> PreparationService<T, E> {
/// This avoids spamming the BN with preparations before the Bellatrix fork epoch, which may
/// cause errors if it doesn't support the preparation API.
fn should_publish_at_current_slot(&self, spec: &ChainSpec) -> bool {
let current_epoch = self
.slot_clock
.now()
.map_or(E::genesis_epoch(), |slot| slot.epoch(E::slots_per_epoch()));
let current_epoch = self.slot_clock.now().map_or(S::E::genesis_epoch(), |slot| {
slot.epoch(S::E::slots_per_epoch())
});
spec.bellatrix_fork_epoch.is_some_and(|fork_epoch| {
current_epoch + PROPOSER_PREPARATION_LOOKAHEAD_EPOCHS >= fork_epoch
})
@@ -367,7 +367,8 @@ impl<T: SlotClock + 'static, E: EthSpec> PreparationService<T, E> {
// Check if any have changed or it's been `EPOCHS_PER_VALIDATOR_REGISTRATION_SUBMISSION`.
if let Some(slot) = self.slot_clock.now() {
if slot % (E::slots_per_epoch() * EPOCHS_PER_VALIDATOR_REGISTRATION_SUBMISSION) == 0 {
if slot % (S::E::slots_per_epoch() * EPOCHS_PER_VALIDATOR_REGISTRATION_SUBMISSION) == 0
{
self.publish_validator_registration_data(registration_keys)
.await?;
} else if !changed_keys.is_empty() {

View File

@@ -1,15 +1,13 @@
use crate::duties_service::{DutiesService, Error};
use doppelganger_service::DoppelgangerStatus;
use futures::future::join_all;
use logging::crit;
use parking_lot::{MappedRwLockReadGuard, RwLock, RwLockReadGuard, RwLockWriteGuard};
use slot_clock::SlotClock;
use std::collections::{HashMap, HashSet};
use std::marker::PhantomData;
use std::sync::Arc;
use tracing::{debug, info, warn};
use types::{ChainSpec, EthSpec, PublicKeyBytes, Slot, SyncDuty, SyncSelectionProof, SyncSubnetId};
use validator_store::Error as ValidatorStoreError;
use validator_store::{DoppelgangerStatus, Error as ValidatorStoreError, ValidatorStore};
/// Number of epochs in advance to compute selection proofs when not in `distributed` mode.
pub const AGGREGATION_PRE_COMPUTE_EPOCHS: u64 = 2;
@@ -28,12 +26,11 @@ pub const AGGREGATION_PRE_COMPUTE_SLOTS_DISTRIBUTED: u64 = 1;
/// 2. One-at-a-time locking. For the innermost locks on the aggregator duties, all of the functions
/// in this file take care to only lock one validator at a time. We never hold a lock while
/// trying to obtain another one (hence no lock ordering issues).
pub struct SyncDutiesMap<E: EthSpec> {
pub struct SyncDutiesMap {
/// Map from sync committee period to duties for members of that sync committee.
committees: RwLock<HashMap<u64, CommitteeDuties>>,
/// Whether we are in `distributed` mode and using reduced lookahead for aggregate pre-compute.
distributed: bool,
_phantom: PhantomData<E>,
}
/// Duties for a single sync committee period.
@@ -81,12 +78,11 @@ pub struct SlotDuties {
pub aggregators: HashMap<SyncSubnetId, Vec<(u64, PublicKeyBytes, SyncSelectionProof)>>,
}
impl<E: EthSpec> SyncDutiesMap<E> {
impl SyncDutiesMap {
pub fn new(distributed: bool) -> Self {
Self {
committees: RwLock::new(HashMap::new()),
distributed,
_phantom: PhantomData,
}
}
@@ -104,7 +100,7 @@ impl<E: EthSpec> SyncDutiesMap<E> {
}
/// Number of slots in advance to compute selection proofs
fn aggregation_pre_compute_slots(&self) -> u64 {
fn aggregation_pre_compute_slots<E: EthSpec>(&self) -> u64 {
if self.distributed {
AGGREGATION_PRE_COMPUTE_SLOTS_DISTRIBUTED
} else {
@@ -117,7 +113,7 @@ impl<E: EthSpec> SyncDutiesMap<E> {
/// Return the slot up to which proofs should be pre-computed, as well as a vec of
/// `(previous_pre_compute_slot, sync_duty)` pairs for all validators which need to have proofs
/// computed. See `fill_in_aggregation_proofs` for the actual calculation.
fn prepare_for_aggregator_pre_compute(
fn prepare_for_aggregator_pre_compute<E: EthSpec>(
&self,
committee_period: u64,
current_slot: Slot,
@@ -127,7 +123,7 @@ impl<E: EthSpec> SyncDutiesMap<E> {
current_slot,
first_slot_of_period::<E>(committee_period, spec),
);
let pre_compute_lookahead_slots = self.aggregation_pre_compute_slots();
let pre_compute_lookahead_slots = self.aggregation_pre_compute_slots::<E>();
let pre_compute_slot = std::cmp::min(
current_slot + pre_compute_lookahead_slots,
last_slot_of_period::<E>(committee_period, spec),
@@ -187,7 +183,7 @@ impl<E: EthSpec> SyncDutiesMap<E> {
/// Get duties for all validators for the given `wall_clock_slot`.
///
/// This is the entry-point for the sync committee service.
pub fn get_duties_for_slot(
pub fn get_duties_for_slot<E: EthSpec>(
&self,
wall_clock_slot: Slot,
spec: &ChainSpec,
@@ -284,16 +280,16 @@ fn last_slot_of_period<E: EthSpec>(sync_committee_period: u64, spec: &ChainSpec)
first_slot_of_period::<E>(sync_committee_period + 1, spec) - 1
}
pub async fn poll_sync_committee_duties<T: SlotClock + 'static, E: EthSpec>(
duties_service: &Arc<DutiesService<T, E>>,
) -> Result<(), Error> {
pub async fn poll_sync_committee_duties<S: ValidatorStore + 'static, T: SlotClock + 'static>(
duties_service: &Arc<DutiesService<S, T>>,
) -> Result<(), Error<S::Error>> {
let sync_duties = &duties_service.sync_duties;
let spec = &duties_service.spec;
let current_slot = duties_service
.slot_clock
.now()
.ok_or(Error::UnableToReadSlotClock)?;
let current_epoch = current_slot.epoch(E::slots_per_epoch());
let current_epoch = current_slot.epoch(S::E::slots_per_epoch());
// If the Altair fork is yet to be activated, do not attempt to poll for duties.
if spec
@@ -317,10 +313,8 @@ pub async fn poll_sync_committee_duties<T: SlotClock + 'static, E: EthSpec>(
let local_indices = {
let mut local_indices = Vec::with_capacity(local_pubkeys.len());
let vals_ref = duties_service.validator_store.initialized_validators();
let vals = vals_ref.read();
for &pubkey in &local_pubkeys {
if let Some(validator_index) = vals.get_index(&pubkey) {
if let Some(validator_index) = duties_service.validator_store.validator_index(&pubkey) {
local_indices.push(validator_index)
}
}
@@ -342,11 +336,15 @@ pub async fn poll_sync_committee_duties<T: SlotClock + 'static, E: EthSpec>(
// Pre-compute aggregator selection proofs for the current period.
let (current_pre_compute_slot, new_pre_compute_duties) = sync_duties
.prepare_for_aggregator_pre_compute(current_sync_committee_period, current_slot, spec);
.prepare_for_aggregator_pre_compute::<S::E>(
current_sync_committee_period,
current_slot,
spec,
);
if !new_pre_compute_duties.is_empty() {
let sub_duties_service = duties_service.clone();
duties_service.context.executor.spawn(
duties_service.executor.spawn(
async move {
fill_in_aggregation_proofs(
sub_duties_service,
@@ -379,18 +377,22 @@ pub async fn poll_sync_committee_duties<T: SlotClock + 'static, E: EthSpec>(
}
// Pre-compute aggregator selection proofs for the next period.
let aggregate_pre_compute_lookahead_slots = sync_duties.aggregation_pre_compute_slots();
let aggregate_pre_compute_lookahead_slots = sync_duties.aggregation_pre_compute_slots::<S::E>();
if (current_slot + aggregate_pre_compute_lookahead_slots)
.epoch(E::slots_per_epoch())
.epoch(S::E::slots_per_epoch())
.sync_committee_period(spec)?
== next_sync_committee_period
{
let (pre_compute_slot, new_pre_compute_duties) = sync_duties
.prepare_for_aggregator_pre_compute(next_sync_committee_period, current_slot, spec);
.prepare_for_aggregator_pre_compute::<S::E>(
next_sync_committee_period,
current_slot,
spec,
);
if !new_pre_compute_duties.is_empty() {
let sub_duties_service = duties_service.clone();
duties_service.context.executor.spawn(
duties_service.executor.spawn(
async move {
fill_in_aggregation_proofs(
sub_duties_service,
@@ -409,11 +411,11 @@ pub async fn poll_sync_committee_duties<T: SlotClock + 'static, E: EthSpec>(
Ok(())
}
pub async fn poll_sync_committee_duties_for_period<T: SlotClock + 'static, E: EthSpec>(
duties_service: &Arc<DutiesService<T, E>>,
pub async fn poll_sync_committee_duties_for_period<S: ValidatorStore, T: SlotClock + 'static>(
duties_service: &Arc<DutiesService<S, T>>,
local_indices: &[u64],
sync_committee_period: u64,
) -> Result<(), Error> {
) -> Result<(), Error<S::Error>> {
let spec = &duties_service.spec;
// no local validators don't need to poll for sync committee
@@ -496,8 +498,8 @@ pub async fn poll_sync_committee_duties_for_period<T: SlotClock + 'static, E: Et
Ok(())
}
pub async fn fill_in_aggregation_proofs<T: SlotClock + 'static, E: EthSpec>(
duties_service: Arc<DutiesService<T, E>>,
pub async fn fill_in_aggregation_proofs<S: ValidatorStore, T: SlotClock + 'static>(
duties_service: Arc<DutiesService<S, T>>,
pre_compute_duties: &[(Slot, SyncDuty)],
sync_committee_period: u64,
current_slot: Slot,
@@ -519,7 +521,7 @@ pub async fn fill_in_aggregation_proofs<T: SlotClock + 'static, E: EthSpec>(
continue;
}
let subnet_ids = match duty.subnet_ids::<E>() {
let subnet_ids = match duty.subnet_ids::<S::E>() {
Ok(subnet_ids) => subnet_ids,
Err(e) => {
crit!(
@@ -564,7 +566,7 @@ pub async fn fill_in_aggregation_proofs<T: SlotClock + 'static, E: EthSpec>(
}
};
match proof.is_aggregator::<E>() {
match proof.is_aggregator::<S::E>() {
Ok(true) => {
debug!(
validator_index = duty.validator_index,

View File

@@ -1,6 +1,5 @@
use crate::duties_service::DutiesService;
use beacon_node_fallback::{ApiTopic, BeaconNodeFallback};
use environment::RuntimeContext;
use eth2::types::BlockId;
use futures::future::join_all;
use futures::future::FutureExt;
@@ -10,6 +9,7 @@ use std::collections::HashMap;
use std::ops::Deref;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use task_executor::TaskExecutor;
use tokio::time::{sleep, sleep_until, Duration, Instant};
use tracing::{debug, error, info, trace, warn};
use types::{
@@ -20,11 +20,11 @@ use validator_store::{Error as ValidatorStoreError, ValidatorStore};
pub const SUBSCRIPTION_LOOKAHEAD_EPOCHS: u64 = 4;
pub struct SyncCommitteeService<T: SlotClock + 'static, E: EthSpec> {
inner: Arc<Inner<T, E>>,
pub struct SyncCommitteeService<S: ValidatorStore, T: SlotClock + 'static> {
inner: Arc<Inner<S, T>>,
}
impl<T: SlotClock + 'static, E: EthSpec> Clone for SyncCommitteeService<T, E> {
impl<S: ValidatorStore, T: SlotClock + 'static> Clone for SyncCommitteeService<S, T> {
fn clone(&self) -> Self {
Self {
inner: self.inner.clone(),
@@ -32,33 +32,33 @@ impl<T: SlotClock + 'static, E: EthSpec> Clone for SyncCommitteeService<T, E> {
}
}
impl<T: SlotClock + 'static, E: EthSpec> Deref for SyncCommitteeService<T, E> {
type Target = Inner<T, E>;
impl<S: ValidatorStore, T: SlotClock + 'static> Deref for SyncCommitteeService<S, T> {
type Target = Inner<S, T>;
fn deref(&self) -> &Self::Target {
self.inner.deref()
}
}
pub struct Inner<T: SlotClock + 'static, E: EthSpec> {
duties_service: Arc<DutiesService<T, E>>,
validator_store: Arc<ValidatorStore<T, E>>,
pub struct Inner<S: ValidatorStore, T: SlotClock + 'static> {
duties_service: Arc<DutiesService<S, T>>,
validator_store: Arc<S>,
slot_clock: T,
beacon_nodes: Arc<BeaconNodeFallback<T, E>>,
context: RuntimeContext<E>,
beacon_nodes: Arc<BeaconNodeFallback<T>>,
executor: TaskExecutor,
/// Boolean to track whether the service has posted subscriptions to the BN at least once.
///
/// This acts as a latch that fires once upon start-up, and then never again.
first_subscription_done: AtomicBool,
}
impl<T: SlotClock + 'static, E: EthSpec> SyncCommitteeService<T, E> {
impl<S: ValidatorStore + 'static, T: SlotClock + 'static> SyncCommitteeService<S, T> {
pub fn new(
duties_service: Arc<DutiesService<T, E>>,
validator_store: Arc<ValidatorStore<T, E>>,
duties_service: Arc<DutiesService<S, T>>,
validator_store: Arc<S>,
slot_clock: T,
beacon_nodes: Arc<BeaconNodeFallback<T, E>>,
context: RuntimeContext<E>,
beacon_nodes: Arc<BeaconNodeFallback<T>>,
executor: TaskExecutor,
) -> Self {
Self {
inner: Arc::new(Inner {
@@ -66,7 +66,7 @@ impl<T: SlotClock + 'static, E: EthSpec> SyncCommitteeService<T, E> {
validator_store,
slot_clock,
beacon_nodes,
context,
executor,
first_subscription_done: AtomicBool::new(false),
}),
}
@@ -80,7 +80,7 @@ impl<T: SlotClock + 'static, E: EthSpec> SyncCommitteeService<T, E> {
.spec
.altair_fork_epoch
.and_then(|fork_epoch| {
let current_epoch = self.slot_clock.now()?.epoch(E::slots_per_epoch());
let current_epoch = self.slot_clock.now()?.epoch(S::E::slots_per_epoch());
Some(current_epoch >= fork_epoch)
})
.unwrap_or(false)
@@ -103,7 +103,7 @@ impl<T: SlotClock + 'static, E: EthSpec> SyncCommitteeService<T, E> {
"Sync committee service started"
);
let executor = self.context.executor.clone();
let executor = self.executor.clone();
let interval_fut = async move {
loop {
@@ -156,7 +156,7 @@ impl<T: SlotClock + 'static, E: EthSpec> SyncCommitteeService<T, E> {
let Some(slot_duties) = self
.duties_service
.sync_duties
.get_duties_for_slot(slot, &self.duties_service.spec)
.get_duties_for_slot::<S::E>(slot, &self.duties_service.spec)
else {
debug!("No duties known for slot {}", slot);
return Ok(());
@@ -202,7 +202,7 @@ impl<T: SlotClock + 'static, E: EthSpec> SyncCommitteeService<T, E> {
// Spawn one task to publish all of the sync committee signatures.
let validator_duties = slot_duties.duties;
let service = self.clone();
self.inner.context.executor.spawn(
self.inner.executor.spawn(
async move {
service
.publish_sync_committee_signatures(slot, block_root, validator_duties)
@@ -214,7 +214,7 @@ impl<T: SlotClock + 'static, E: EthSpec> SyncCommitteeService<T, E> {
let aggregators = slot_duties.aggregators;
let service = self.clone();
self.inner.context.executor.spawn(
self.inner.executor.spawn(
async move {
service
.publish_sync_committee_aggregates(
@@ -316,7 +316,7 @@ impl<T: SlotClock + 'static, E: EthSpec> SyncCommitteeService<T, E> {
) {
for (subnet_id, subnet_aggregators) in aggregators {
let service = self.clone();
self.inner.context.executor.spawn(
self.inner.executor.spawn(
async move {
service
.publish_sync_committee_aggregate_for_subnet(
@@ -354,7 +354,7 @@ impl<T: SlotClock + 'static, E: EthSpec> SyncCommitteeService<T, E> {
};
beacon_node
.get_validator_sync_committee_contribution::<E>(&sync_contribution_data)
.get_validator_sync_committee_contribution(&sync_contribution_data)
.await
})
.await
@@ -440,7 +440,7 @@ impl<T: SlotClock + 'static, E: EthSpec> SyncCommitteeService<T, E> {
fn spawn_subscription_tasks(&self) {
let service = self.clone();
self.inner.context.executor.spawn(
self.inner.executor.spawn(
async move {
service.publish_subscriptions().await.unwrap_or_else(|e| {
error!(
@@ -463,10 +463,10 @@ impl<T: SlotClock + 'static, E: EthSpec> SyncCommitteeService<T, E> {
// At the start of every epoch during the current period, re-post the subscriptions
// to the beacon node. This covers the case where the BN has forgotten the subscriptions
// due to a restart, or where the VC has switched to a fallback BN.
let current_period = sync_period_of_slot::<E>(slot, spec)?;
let current_period = sync_period_of_slot::<S::E>(slot, spec)?;
if !self.first_subscription_done.load(Ordering::Relaxed)
|| slot.as_u64() % E::slots_per_epoch() == 0
|| slot.as_u64() % S::E::slots_per_epoch() == 0
{
duty_slots.push((slot, current_period));
}
@@ -474,9 +474,9 @@ impl<T: SlotClock + 'static, E: EthSpec> SyncCommitteeService<T, E> {
// Near the end of the current period, push subscriptions for the next period to the
// beacon node. We aggressively push every slot in the lead-up, as this is the main way
// that we want to ensure that the BN is subscribed (well in advance).
let lookahead_slot = slot + SUBSCRIPTION_LOOKAHEAD_EPOCHS * E::slots_per_epoch();
let lookahead_slot = slot + SUBSCRIPTION_LOOKAHEAD_EPOCHS * S::E::slots_per_epoch();
let lookahead_period = sync_period_of_slot::<E>(lookahead_slot, spec)?;
let lookahead_period = sync_period_of_slot::<S::E>(lookahead_slot, spec)?;
if lookahead_period > current_period {
duty_slots.push((lookahead_slot, lookahead_period));
@@ -494,7 +494,7 @@ impl<T: SlotClock + 'static, E: EthSpec> SyncCommitteeService<T, E> {
match self
.duties_service
.sync_duties
.get_duties_for_slot(duty_slot, spec)
.get_duties_for_slot::<S::E>(duty_slot, spec)
{
Some(duties) => subscriptions.extend(subscriptions_from_sync_duties(
duties.duties,

View File

@@ -4,21 +4,6 @@ version = "0.1.0"
edition = { workspace = true }
authors = ["Sigma Prime <contact@sigmaprime.io>"]
[lib]
name = "validator_store"
path = "src/lib.rs"
[dependencies]
account_utils = { workspace = true }
doppelganger_service = { workspace = true }
initialized_validators = { workspace = true }
logging = { workspace = true }
parking_lot = { workspace = true }
serde = { workspace = true }
signing_method = { workspace = true }
slashing_protection = { workspace = true }
slot_clock = { workspace = true }
task_executor = { workspace = true }
tracing = { workspace = true }
types = { workspace = true }
validator_metrics = { workspace = true }

File diff suppressed because it is too large Load Diff