Tidy signing, reduce ForkService duplication

This commit is contained in:
Paul Hauner
2019-11-23 17:02:39 +11:00
parent b9967048ea
commit f76f97a3fd
13 changed files with 119 additions and 153 deletions

View File

@@ -1,6 +1,4 @@
use crate::{
duties_service::DutiesService, fork_service::ForkService, validator_store::ValidatorStore,
};
use crate::{duties_service::DutiesService, validator_store::ValidatorStore};
use environment::RuntimeContext;
use exit_future::Signal;
use futures::{Future, Stream};
@@ -11,16 +9,15 @@ use std::collections::HashMap;
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::timer::Interval;
use types::{ChainSpec, CommitteeIndex, EthSpec, Fork, Slot};
use types::{ChainSpec, CommitteeIndex, EthSpec, Slot};
/// Delay this period of time after the slot starts. This allows the node to process the new slot.
const TIME_DELAY_FROM_SLOT: Duration = Duration::from_millis(100);
#[derive(Clone)]
pub struct AttestationServiceBuilder<T: Clone, E: EthSpec> {
fork_service: Option<ForkService<T, E>>,
duties_service: Option<DutiesService<T, E>>,
validator_store: Option<ValidatorStore<E>>,
validator_store: Option<ValidatorStore<T, E>>,
slot_clock: Option<T>,
beacon_node: Option<RemoteBeaconNode<E>>,
context: Option<RuntimeContext<E>>,
@@ -30,7 +27,6 @@ pub struct AttestationServiceBuilder<T: Clone, E: EthSpec> {
impl<T: SlotClock + Clone + 'static, E: EthSpec> AttestationServiceBuilder<T, E> {
pub fn new() -> Self {
Self {
fork_service: None,
duties_service: None,
validator_store: None,
slot_clock: None,
@@ -39,17 +35,12 @@ impl<T: SlotClock + Clone + 'static, E: EthSpec> AttestationServiceBuilder<T, E>
}
}
pub fn fork_service(mut self, service: ForkService<T, E>) -> Self {
self.fork_service = Some(service);
self
}
pub fn duties_service(mut self, service: DutiesService<T, E>) -> Self {
self.duties_service = Some(service);
self
}
pub fn validator_store(mut self, store: ValidatorStore<E>) -> Self {
pub fn validator_store(mut self, store: ValidatorStore<T, E>) -> Self {
self.validator_store = Some(store);
self
}
@@ -72,9 +63,6 @@ impl<T: SlotClock + Clone + 'static, E: EthSpec> AttestationServiceBuilder<T, E>
pub fn build(self) -> Result<AttestationService<T, E>, String> {
Ok(AttestationService {
inner: Arc::new(Inner {
fork_service: self
.fork_service
.ok_or_else(|| "Cannot build AttestationService without fork_service")?,
duties_service: self
.duties_service
.ok_or_else(|| "Cannot build AttestationService without duties_service")?,
@@ -97,8 +85,7 @@ impl<T: SlotClock + Clone + 'static, E: EthSpec> AttestationServiceBuilder<T, E>
pub struct Inner<T: Clone, E: EthSpec> {
duties_service: DutiesService<T, E>,
fork_service: ForkService<T, E>,
validator_store: ValidatorStore<E>,
validator_store: ValidatorStore<T, E>,
slot_clock: T,
beacon_node: RemoteBeaconNode<E>,
context: RuntimeContext<E>,
@@ -178,10 +165,6 @@ impl<T: SlotClock + Clone + 'static, E: EthSpec> AttestationService<T, E> {
.slot_clock
.now()
.ok_or_else(|| "Failed to read slot clock".to_string())?;
let fork = inner
.fork_service
.fork()
.ok_or_else(|| "Failed to get Fork".to_string())?;
let mut committee_indices: HashMap<CommitteeIndex, Vec<ValidatorDuty>> = HashMap::new();
@@ -206,7 +189,6 @@ impl<T: SlotClock + Clone + 'static, E: EthSpec> AttestationService<T, E> {
slot,
committee_index,
validator_duties,
fork.clone(),
));
});
@@ -218,7 +200,6 @@ impl<T: SlotClock + Clone + 'static, E: EthSpec> AttestationService<T, E> {
slot: Slot,
committee_index: CommitteeIndex,
validator_duties: Vec<ValidatorDuty>,
fork: Fork,
) -> impl Future<Item = (), Error = ()> {
let inner_1 = self.inner.clone();
let inner_2 = self.inner.clone();
@@ -250,7 +231,6 @@ impl<T: SlotClock + Clone + 'static, E: EthSpec> AttestationService<T, E> {
&duty.validator_pubkey,
validator_committee_position,
attestation,
&fork,
)
.ok_or_else(|| "Unable to sign attestation".to_string())
} else {

View File

@@ -1,6 +1,4 @@
use crate::{
duties_service::DutiesService, fork_service::ForkService, validator_store::ValidatorStore,
};
use crate::{duties_service::DutiesService, validator_store::ValidatorStore};
use environment::RuntimeContext;
use exit_future::Signal;
use futures::{stream, Future, IntoFuture, Stream};
@@ -17,9 +15,8 @@ const TIME_DELAY_FROM_SLOT: Duration = Duration::from_millis(100);
#[derive(Clone)]
pub struct BlockServiceBuilder<T: Clone, E: EthSpec> {
fork_service: Option<ForkService<T, E>>,
duties_service: Option<DutiesService<T, E>>,
validator_store: Option<ValidatorStore<E>>,
validator_store: Option<ValidatorStore<T, E>>,
slot_clock: Option<Arc<T>>,
beacon_node: Option<RemoteBeaconNode<E>>,
context: Option<RuntimeContext<E>>,
@@ -29,7 +26,6 @@ pub struct BlockServiceBuilder<T: Clone, E: EthSpec> {
impl<T: SlotClock + Clone + 'static, E: EthSpec> BlockServiceBuilder<T, E> {
pub fn new() -> Self {
Self {
fork_service: None,
duties_service: None,
validator_store: None,
slot_clock: None,
@@ -38,17 +34,12 @@ impl<T: SlotClock + Clone + 'static, E: EthSpec> BlockServiceBuilder<T, E> {
}
}
pub fn fork_service(mut self, service: ForkService<T, E>) -> Self {
self.fork_service = Some(service);
self
}
pub fn duties_service(mut self, service: DutiesService<T, E>) -> Self {
self.duties_service = Some(service);
self
}
pub fn validator_store(mut self, store: ValidatorStore<E>) -> Self {
pub fn validator_store(mut self, store: ValidatorStore<T, E>) -> Self {
self.validator_store = Some(store);
self
}
@@ -70,9 +61,6 @@ impl<T: SlotClock + Clone + 'static, E: EthSpec> BlockServiceBuilder<T, E> {
pub fn build(self) -> Result<BlockService<T, E>, String> {
Ok(BlockService {
fork_service: self
.fork_service
.ok_or_else(|| "Cannot build BlockService without fork_service")?,
duties_service: self
.duties_service
.ok_or_else(|| "Cannot build BlockService without duties_service")?,
@@ -95,8 +83,7 @@ impl<T: SlotClock + Clone + 'static, E: EthSpec> BlockServiceBuilder<T, E> {
#[derive(Clone)]
pub struct BlockService<T: Clone, E: EthSpec> {
duties_service: DutiesService<T, E>,
fork_service: ForkService<T, E>,
validator_store: ValidatorStore<E>,
validator_store: ValidatorStore<T, E>,
slot_clock: Arc<T>,
beacon_node: RemoteBeaconNode<E>,
context: RuntimeContext<E>,
@@ -167,29 +154,17 @@ impl<T: SlotClock + Clone + 'static, E: EthSpec> BlockService<T, E> {
let service_3 = service.clone();
block_producers.next().map(move |validator_pubkey| {
service_2
.fork_service
.fork()
.ok_or_else(|| "Fork is unknown, unable to sign".to_string())
.and_then(|fork| {
service_1
.validator_store
.randao_reveal(
&validator_pubkey,
slot.epoch(E::slots_per_epoch()),
&fork,
)
.map(|randao_reveal| (fork, randao_reveal))
.ok_or_else(|| "Unable to produce randao reveal".to_string())
})
service_1
.validator_store
.randao_reveal(&validator_pubkey, slot.epoch(E::slots_per_epoch()))
.ok_or_else(|| "Unable to produce randao reveal".to_string())
.into_future()
.and_then(move |(fork, randao_reveal)| {
.and_then(move |randao_reveal| {
service_1
.beacon_node
.http
.validator()
.produce_block(slot, randao_reveal)
.map(|block| (fork, block))
.map_err(|e| {
format!(
"Error from beacon node when producing block: {:?}",
@@ -197,10 +172,10 @@ impl<T: SlotClock + Clone + 'static, E: EthSpec> BlockService<T, E> {
)
})
})
.and_then(move |(fork, block)| {
.and_then(move |block| {
service_2
.validator_store
.sign_block(&validator_pubkey, block, &fork)
.sign_block(&validator_pubkey, block)
.ok_or_else(|| "Unable to sign block".to_string())
})
.and_then(move |block| {

View File

@@ -124,7 +124,7 @@ impl DutiesStore {
#[derive(Clone)]
pub struct DutiesServiceBuilder<T: Clone, E: EthSpec> {
store: Option<Arc<DutiesStore>>,
validator_store: Option<ValidatorStore<E>>,
validator_store: Option<ValidatorStore<T, E>>,
slot_clock: Option<Arc<T>>,
beacon_node: Option<RemoteBeaconNode<E>>,
context: Option<RuntimeContext<E>>,
@@ -142,7 +142,7 @@ impl<T: SlotClock + Clone + 'static, E: EthSpec> DutiesServiceBuilder<T, E> {
}
}
pub fn validator_store(mut self, store: ValidatorStore<E>) -> Self {
pub fn validator_store(mut self, store: ValidatorStore<T, E>) -> Self {
self.validator_store = Some(store);
self
}
@@ -184,7 +184,7 @@ impl<T: SlotClock + Clone + 'static, E: EthSpec> DutiesServiceBuilder<T, E> {
#[derive(Clone)]
pub struct DutiesService<T: Clone, E: EthSpec> {
store: Arc<DutiesStore>,
validator_store: ValidatorStore<E>,
validator_store: ValidatorStore<T, E>,
slot_clock: Arc<T>,
beacon_node: RemoteBeaconNode<E>,
context: RuntimeContext<E>,

View File

@@ -120,26 +120,35 @@ impl<T: EthSpec> ProductionValidatorClient<T> {
Duration::from_millis(context.eth2_config.spec.milliseconds_per_slot),
);
let validator_store: ValidatorStore<T> = match &config.key_source {
// Load pre-existing validators from the data dir.
//
// Use the `account_manager` to generate these files.
KeySource::Disk => ValidatorStore::load_from_disk(
config.data_dir.clone(),
context.eth2_config.spec.clone(),
log_3.clone(),
)?,
// Generate ephemeral insecure keypairs for testing purposes.
//
// Do not use in production.
KeySource::TestingKeypairRange(range) => {
ValidatorStore::insecure_ephemeral_validators(
range.clone(),
let fork_service = ForkServiceBuilder::new()
.slot_clock(slot_clock.clone())
.beacon_node(beacon_node.clone())
.runtime_context(context.service_context("fork"))
.build()?;
let validator_store: ValidatorStore<SystemTimeSlotClock, T> =
match &config.key_source {
// Load pre-existing validators from the data dir.
//
// Use the `account_manager` to generate these files.
KeySource::Disk => ValidatorStore::load_from_disk(
config.data_dir.clone(),
context.eth2_config.spec.clone(),
fork_service.clone(),
log_3.clone(),
)?
}
};
)?,
// Generate ephemeral insecure keypairs for testing purposes.
//
// Do not use in production.
KeySource::TestingKeypairRange(range) => {
ValidatorStore::insecure_ephemeral_validators(
range.clone(),
context.eth2_config.spec.clone(),
fork_service.clone(),
log_3.clone(),
)?
}
};
info!(
log_3,
@@ -154,15 +163,8 @@ impl<T: EthSpec> ProductionValidatorClient<T> {
.runtime_context(context.service_context("duties"))
.build()?;
let fork_service = ForkServiceBuilder::new()
.slot_clock(slot_clock.clone())
.beacon_node(beacon_node.clone())
.runtime_context(context.service_context("fork"))
.build()?;
let block_service = BlockServiceBuilder::new()
.duties_service(duties_service.clone())
.fork_service(fork_service.clone())
.slot_clock(slot_clock.clone())
.validator_store(validator_store.clone())
.beacon_node(beacon_node.clone())
@@ -171,7 +173,6 @@ impl<T: EthSpec> ProductionValidatorClient<T> {
let attestation_service = AttestationServiceBuilder::new()
.duties_service(duties_service.clone())
.fork_service(fork_service.clone())
.slot_clock(slot_clock)
.validator_store(validator_store)
.beacon_node(beacon_node)

View File

@@ -1,7 +1,9 @@
use crate::fork_service::ForkService;
use crate::validator_directory::{ValidatorDirectory, ValidatorDirectoryBuilder};
use parking_lot::RwLock;
use rayon::prelude::*;
use slog::{error, Logger};
use slot_clock::SlotClock;
use std::collections::HashMap;
use std::fs::read_dir;
use std::iter::FromIterator;
@@ -10,22 +12,28 @@ use std::ops::Range;
use std::path::PathBuf;
use std::sync::Arc;
use tempdir::TempDir;
use tree_hash::{SignedRoot, TreeHash};
use tree_hash::TreeHash;
use types::{
Attestation, BeaconBlock, ChainSpec, Domain, Epoch, EthSpec, Fork, PublicKey, Signature,
};
#[derive(Clone)]
pub struct ValidatorStore<E> {
pub struct ValidatorStore<T, E: EthSpec> {
validators: Arc<RwLock<HashMap<PublicKey, ValidatorDirectory>>>,
spec: Arc<ChainSpec>,
log: Logger,
temp_dir: Option<Arc<TempDir>>,
fork_service: ForkService<T, E>,
_phantom: PhantomData<E>,
}
impl<E: EthSpec> ValidatorStore<E> {
pub fn load_from_disk(base_dir: PathBuf, spec: ChainSpec, log: Logger) -> Result<Self, String> {
impl<T: SlotClock + Clone + 'static, E: EthSpec> ValidatorStore<T, E> {
pub fn load_from_disk(
base_dir: PathBuf,
spec: ChainSpec,
fork_service: ForkService<T, E>,
log: Logger,
) -> Result<Self, String> {
let validator_iter = read_dir(&base_dir)
.map_err(|e| format!("Failed to read base directory: {:?}", e))?
.filter_map(|validator_dir| {
@@ -60,6 +68,7 @@ impl<E: EthSpec> ValidatorStore<E> {
spec: Arc::new(spec),
log,
temp_dir: None,
fork_service,
_phantom: PhantomData,
})
}
@@ -67,6 +76,7 @@ impl<E: EthSpec> ValidatorStore<E> {
pub fn insecure_ephemeral_validators(
range: Range<usize>,
spec: ChainSpec,
fork_service: ForkService<T, E>,
log: Logger,
) -> Result<Self, String> {
let temp_dir = TempDir::new("insecure_validator")
@@ -100,6 +110,7 @@ impl<E: EthSpec> ValidatorStore<E> {
spec: Arc::new(spec),
log,
temp_dir: Some(Arc::new(temp_dir)),
fork_service,
_phantom: PhantomData,
})
}
@@ -116,22 +127,27 @@ impl<E: EthSpec> ValidatorStore<E> {
self.validators.read().len()
}
pub fn randao_reveal(
&self,
validator_pubkey: &PublicKey,
epoch: Epoch,
fork: &Fork,
) -> Option<Signature> {
fn fork(&self) -> Option<Fork> {
if self.fork_service.fork().is_none() {
error!(
self.log,
"Unable to get Fork for signing";
);
}
self.fork_service.fork()
}
pub fn randao_reveal(&self, validator_pubkey: &PublicKey, epoch: Epoch) -> Option<Signature> {
// TODO: check this against the slot clock to make sure it's not an early reveal?
self.validators
.read()
.get(validator_pubkey)
.and_then(|validator_dir| {
validator_dir.voting_keypair.as_ref().map(|voting_keypair| {
let message = epoch.tree_hash_root();
let domain = self.spec.get_domain(epoch, Domain::Randao, &fork);
Signature::new(&message, domain, &voting_keypair.sk)
})
let voting_keypair = validator_dir.voting_keypair.as_ref()?;
let message = epoch.tree_hash_root();
let domain = self.spec.get_domain(epoch, Domain::Randao, &self.fork()?);
Some(Signature::new(&message, domain, &voting_keypair.sk))
})
}
@@ -139,20 +155,15 @@ impl<E: EthSpec> ValidatorStore<E> {
&self,
validator_pubkey: &PublicKey,
mut block: BeaconBlock<E>,
fork: &Fork,
) -> Option<BeaconBlock<E>> {
// TODO: check for slashing.
self.validators
.read()
.get(validator_pubkey)
.and_then(|validator_dir| {
validator_dir.voting_keypair.as_ref().map(|voting_keypair| {
let epoch = block.slot.epoch(E::slots_per_epoch());
let message = block.signed_root();
let domain = self.spec.get_domain(epoch, Domain::BeaconProposer, &fork);
block.signature = Signature::new(&message, domain, &voting_keypair.sk);
block
})
let voting_keypair = validator_dir.voting_keypair.as_ref()?;
block.sign(&voting_keypair.sk, &self.fork()?, &self.spec);
Some(block)
})
}
@@ -161,34 +172,31 @@ impl<E: EthSpec> ValidatorStore<E> {
validator_pubkey: &PublicKey,
validator_committee_position: usize,
mut attestation: Attestation<E>,
fork: &Fork,
) -> Option<Attestation<E>> {
// TODO: check for slashing.
self.validators
.read()
.get(validator_pubkey)
.and_then(|validator_dir| {
validator_dir
.voting_keypair
.as_ref()
.and_then(|voting_keypair| {
attestation
.sign(
&voting_keypair.sk,
validator_committee_position,
fork,
&self.spec,
)
.map_err(|e| {
error!(
self.log,
"Error whilst signing attestation";
"error" => format!("{:?}", e)
)
})
.map(|()| attestation)
.ok()
let voting_keypair = validator_dir.voting_keypair.as_ref()?;
attestation
.sign(
&voting_keypair.sk,
validator_committee_position,
&self.fork()?,
&self.spec,
)
.map_err(|e| {
error!(
self.log,
"Error whilst signing attestation";
"error" => format!("{:?}", e)
)
})
.ok()?;
Some(attestation)
})
}
}