Merged Age's changes and ripped out heaps of now obsolete stuff in the validator client.

- Replaced most instances of PublicKey with KeyPair, since they need to be passed into each validator thread now.
 - Pulled out a bunch of FreeAttestations, and replaced with regular Attestations (as per Paul's suggestion)
 - Started generalising pubkeys to 'signers' (though they are still just Keypairs)
 - Added validator_index into a few structs where relevant
 - Removed the SlotClock and DutiesReader from the BlockProducer and Attester services, since this logic is now abstracted to the higher level process.
 - Added a Hash trait to the Keypair (rather than just pubkey) which assumes the Pubkey uniquely defines it.
This commit is contained in:
Luke Anderson
2019-03-28 15:50:57 +11:00
53 changed files with 3198 additions and 616 deletions

View File

@@ -19,6 +19,7 @@ use protos::services_grpc::{
use slog::{debug, error, info, warn};
use slot_clock::{SlotClock, SystemTimeSlotClock};
use std::sync::Arc;
use std::sync::RwLock;
use std::time::{Duration, Instant, SystemTime};
use tokio::prelude::*;
use tokio::runtime::Builder;
@@ -40,12 +41,14 @@ pub struct Service {
chain_id: u16,
/// The fork state we processing on.
fork: Fork,
/// The slot clock keeping track of time.
slot_clock: Arc<SystemTimeSlotClock>,
/// The slot clock for this service.
slot_clock: SystemTimeSlotClock,
/// The current slot we are processing.
current_slot: Slot,
/// Duration until the next slot. This is used for initializing the tokio timer interval.
duration_to_next_slot: Duration,
/// The number of slots per epoch to allow for converting slots to epochs.
slots_per_epoch: u64,
// GRPC Clients
/// The beacon block GRPC client.
beacon_block_client: Arc<BeaconBlockServiceClient>,
@@ -77,7 +80,7 @@ impl Service {
// retrieve node information
let node_info = loop {
let info = match beacon_node_client.info(&Empty::new()) {
match beacon_node_client.info(&Empty::new()) {
Err(e) => {
warn!(log, "Could not connect to node. Error: {}", e);
info!(log, "Retrying in 5 seconds...");
@@ -118,13 +121,6 @@ impl Service {
epoch: Epoch::from(proto_fork.get_epoch()),
};
// build the validator slot clock
let slot_clock = {
let clock = SystemTimeSlotClock::new(genesis_time, config.spec.seconds_per_slot)
.expect("Unable to instantiate SystemTimeSlotClock.");
Arc::new(clock)
};
// initialize the RPC clients
// Beacon node gRPC beacon block endpoints.
@@ -145,6 +141,10 @@ impl Service {
Arc::new(AttestationServiceClient::new(ch))
};
// build the validator slot clock
let slot_clock = SystemTimeSlotClock::new(genesis_time, config.spec.seconds_per_slot)
.expect("Unable to instantiate SystemTimeSlotClock.");
let current_slot = slot_clock
.present_slot()
.map_err(|e| ErrorKind::SlotClockError(e))?
@@ -182,6 +182,7 @@ impl Service {
slot_clock,
current_slot,
duration_to_next_slot,
slots_per_epoch: config.spec.slots_per_epoch,
beacon_block_client,
validator_client,
attester_client,
@@ -192,7 +193,7 @@ impl Service {
/// Initialise the service then run the core thread.
pub fn start(config: ValidatorConfig, log: slog::Logger) -> error_chain::Result<()> {
// connect to the node and retrieve its properties and initialize the gRPC clients
let service = Service::initialize_service(&config, log)?;
let service = Service::initialize_service(&config, log.clone())?;
// we have connected to a node and established its parameters. Spin up the core service
@@ -214,77 +215,140 @@ impl Service {
)
};
// kick off core service
/* kick off core service */
// generate keypairs
// TODO: keypairs are randomly generated; they should be loaded from a file or generated.
// https://github.com/sigp/lighthouse/issues/160
let keypairs = match config.fetch_keys(&log) {
let keypairs = match config.fetch_keys(&log.clone()) {
Some(kps) => kps,
None => panic!("No key pairs found, cannot start validator client without. Try running ./account_manager generate first.")
};
// build requisite objects to pass to core thread.
let duties_map = Arc::new(EpochDutiesMap::new(config.spec.slots_per_epoch));
let epoch_map_for_attester = Arc::new(EpochMap::new(config.spec.slots_per_epoch));
let manager = DutiesManager {
/* build requisite objects to pass to core thread */
// Builds a mapping of Epoch -> Map(Keypair, EpochDuty)
// where EpochDuty contains slot numbers and attestation data that each validator needs to
// produce work on.
let duties_map = RwLock::new(EpochDutiesMap::new(config.spec.slots_per_epoch));
// builds a manager which maintains the list of current duties for all known validators
// and can check when a validator needs to perform a task.
let manager = Arc::new(DutiesManager {
duties_map,
pubkeys: keypairs.iter().map(|keypair| keypair.pk.clone()).collect(),
spec: Arc::new(config.spec),
slot_clock: service.slot_clock.clone(),
signers: keypairs,
beacon_node: service.validator_client.clone(),
};
});
// run the core thread
runtime
.block_on(interval.for_each(move |_| {
// get the current slot
let log = service.log.clone();
/* get the current slot and epoch */
let current_slot = match service.slot_clock.present_slot() {
Err(e) => {
error!(service.log, "SystemTimeError {:?}", e);
error!(log, "SystemTimeError {:?}", e);
return Ok(());
}
Ok(slot) => slot.expect("Genesis is in the future"),
};
let current_epoch = current_slot.epoch(service.slots_per_epoch);
debug_assert!(
current_slot > service.current_slot,
"The Timer should poll a new slot"
);
info!(service.log, "Processing slot: {}", current_slot.as_u64());
info!(log, "Processing slot: {}", current_slot.as_u64());
// check for new duties
// TODO: Convert to its own thread
match manager.update(current_slot) {
Err(error) => {
error!(service.log, "Epoch duties poll error"; "error" => format!("{:?}", error))
/* check for new duties */
let cloned_log = log.clone();
let cloned_manager = manager.clone();
tokio::spawn(futures::future::poll_fn(move || {
cloned_manager.run_update(current_epoch.clone(), cloned_log.clone())
}));
/* execute any specified duties */
if let Some(work) = manager.get_current_work(current_slot) {
for (keypair, work_type) in work {
if work_type.produce_block {
// TODO: Produce a beacon block in a new thread
}
if work_type.attestation_duty.is_some() {
// available AttestationDuty info
let attestation_duty =
work_type.attestation_duty.expect("Cannot be None");
let attester_grpc_client =
Arc::new(
AttestationGrpcClient::new(
service.attester_client.clone()
)
);
let signer = Arc::new(AttesterLocalSigner::new(keypair.clone()));
let attester =
Attester::new(
attester_grpc_client,
signer);
let mut attester_service = AttesterService {
attester,
poll_interval_millis: POLL_INTERVAL_MILLIS,
log: log.clone(),
};
attester_service.run();
}
}
Ok(UpdateOutcome::NoChange(epoch)) => {
debug!(service.log, "No change in duties"; "epoch" => epoch)
}
Ok(UpdateOutcome::DutiesChanged(epoch, duties)) => {
info!(service.log, "Duties changed (potential re-org)"; "epoch" => epoch, "duties" => format!("{:?}", duties))
}
Ok(UpdateOutcome::NewDuties(epoch, duties)) => {
info!(service.log, "New duties obtained"; "epoch" => epoch, "duties" => format!("{:?}", duties))
}
Ok(UpdateOutcome::UnknownValidatorOrEpoch(epoch)) => {
error!(service.log, "Epoch or validator unknown"; "epoch" => epoch)
}
};
}
Ok(())
}))
.map_err(|e| format!("Service thread failed: {:?}", e))?;
// completed a slot process
Ok(())
}
/*
for keypair in keypairs {
info!(self.log, "Starting validator services"; "validator" => keypair.pk.concatenated_hex_id());
// Spawn a new thread to maintain the validator's `EpochDuties`.
let duties_manager_thread = {
let spec = spec.clone();
let duties_map = duties_map.clone();
let slot_clock = self.slot_clock.clone();
let log = self.log.clone();
let beacon_node = self.validator_client.clone();
let pubkey = keypair.pk.clone();
thread::spawn(move || {
let manager = DutiesManager {
duties_map,
pubkey,
spec,
slot_clock,
beacon_node,
};
let mut duties_manager_service = DutiesManagerService {
manager,
poll_interval_millis,
log,
};
duties_manager_service.run();
})
};
let mut threads = vec![];
for keypair in keypairs {
info!(log, "Starting validator services"; "validator" => keypair.pk.concatenated_hex_id());
/*
// Spawn a new thread to maintain the validator's `EpochDuties`.
let duties_manager_thread = {
let spec = spec.clone();
@@ -331,7 +395,6 @@ impl Service {
block_producer_service.run();
})
};
*/
// Spawn a new thread for attestation for the validator.
let attester_thread = {
@@ -358,7 +421,6 @@ impl Service {
Ok(())
}
/*
// Naively wait for all the threads to complete.
for tuple in threads {
let (manager, producer, attester) = tuple;