Merged age-validator-client into luke's changes on validator_client, and fixed all the merge conflicts.

This commit is contained in:
Luke Anderson
2019-03-28 20:55:07 +11:00
parent c9e8fe53bc
commit ba71e8adca
28 changed files with 540 additions and 258 deletions

View File

@@ -25,6 +25,7 @@ use tokio::prelude::*;
use tokio::runtime::Builder;
use tokio::timer::Interval;
use tokio_timer::clock::Clock;
use types::test_utils::generate_deterministic_keypairs;
use types::{Epoch, Fork, Slot};
use std::thread;
@@ -45,8 +46,6 @@ pub struct Service {
slot_clock: SystemTimeSlotClock,
/// The current slot we are processing.
current_slot: Slot,
/// Duration until the next slot. This is used for initializing the tokio timer interval.
duration_to_next_slot: Duration,
/// The number of slots per epoch to allow for converting slots to epochs.
slots_per_epoch: u64,
// GRPC Clients
@@ -107,6 +106,7 @@ impl Service {
// build requisite objects to form Self
let genesis_time = node_info.get_genesis_time();
let genesis_slot = Slot::from(node_info.get_genesis_slot());
info!(log,"Beacon node connected"; "Node Version" => node_info.version.clone(), "Chain ID" => node_info.chain_id, "Genesis time" => genesis_time);
@@ -142,46 +142,21 @@ impl Service {
};
// build the validator slot clock
let slot_clock = SystemTimeSlotClock::new(genesis_time, config.spec.seconds_per_slot)
.expect("Unable to instantiate SystemTimeSlotClock.");
let slot_clock =
SystemTimeSlotClock::new(genesis_slot, genesis_time, config.spec.seconds_per_slot)
.expect("Unable to instantiate SystemTimeSlotClock.");
let current_slot = slot_clock
.present_slot()
.map_err(|e| ErrorKind::SlotClockError(e))?
.expect("Genesis must be in the future");
// calculate the duration to the next slot
let duration_to_next_slot = {
let seconds_per_slot = config.spec.seconds_per_slot;
let syslot_time = SystemTime::now();
let duration_since_epoch = syslot_time
.duration_since(SystemTime::UNIX_EPOCH)
.map_err(|e| ErrorKind::SystemTimeError(e.to_string()))?;
let duration_since_genesis = duration_since_epoch
.checked_sub(Duration::from_secs(genesis_time))
.expect("Genesis must be in the future. Checked on connection");
let elapsed_slots = duration_since_epoch
.as_secs()
.checked_div(seconds_per_slot as u64)
.expect("Seconds per slot should not be 0");
// the duration to the next slot
Duration::from_secs(
(elapsed_slots + 1)
.checked_mul(seconds_per_slot)
.expect("Next slot time should not overflow u64"),
)
.checked_sub(duration_since_genesis)
.expect("This should never saturate")
};
Ok(Self {
connected_node_version: node_info.version,
chain_id: node_info.chain_id as u16,
fork,
slot_clock,
current_slot,
duration_to_next_slot,
slots_per_epoch: config.spec.slots_per_epoch,
beacon_block_client,
validator_client,
@@ -204,15 +179,18 @@ impl Service {
.build()
.map_err(|e| format!("Tokio runtime failed: {}", e))?;
let duration_to_next_slot = service
.slot_clock
.duration_to_next_slot()
.map_err(|e| format!("System clock error: {:?}", e))?
.expect("Cannot start before genesis");
// set up the validator work interval - start at next slot and proceed every slot
let interval = {
// Set the interval to start at the next slot, and every slot after
let slot_duration = Duration::from_secs(config.spec.seconds_per_slot);
//TODO: Handle checked add correctly
Interval::new(
Instant::now() + service.duration_to_next_slot,
slot_duration,
)
Interval::new(Instant::now() + duration_to_next_slot, slot_duration)
};
/* kick off core service */
@@ -221,11 +199,14 @@ impl Service {
// TODO: keypairs are randomly generated; they should be loaded from a file or generated.
// https://github.com/sigp/lighthouse/issues/160
/* In future, load generated keys from disk.
let keypairs = match config.fetch_keys(&log.clone()) {
Some(kps) => kps,
None => panic!("No key pairs found, cannot start validator client without. Try running ./account_manager generate first.")
};
*/
let keypairs = Arc::new(generate_deterministic_keypairs(8));
/* build requisite objects to pass to core thread */
@@ -243,71 +224,75 @@ impl Service {
});
// run the core thread
runtime
.block_on(interval.for_each(move |_| {
let log = service.log.clone();
runtime.block_on(
interval
.for_each(move |_| {
let log = service.log.clone();
/* get the current slot and epoch */
let current_slot = match service.slot_clock.present_slot() {
Err(e) => {
error!(log, "SystemTimeError {:?}", e);
return Ok(());
}
Ok(slot) => slot.expect("Genesis is in the future"),
};
let current_epoch = current_slot.epoch(service.slots_per_epoch);
debug_assert!(
current_slot > service.current_slot,
"The Timer should poll a new slot"
);
info!(log, "Processing slot: {}", current_slot.as_u64());
/* check for new duties */
let cloned_log = log.clone();
let cloned_manager = manager.clone();
tokio::spawn(futures::future::poll_fn(move || {
cloned_manager.run_update(current_epoch.clone(), cloned_log.clone())
}));
/* execute any specified duties */
if let Some(work) = manager.get_current_work(current_slot) {
for (keypair, work_type) in work {
if work_type.produce_block {
// TODO: Produce a beacon block in a new thread
/* get the current slot and epoch */
let current_slot = match service.slot_clock.present_slot() {
Err(e) => {
error!(log, "SystemTimeError {:?}", e);
return Ok(());
}
if work_type.attestation_duty.is_some() {
// available AttestationDuty info
let attestation_duty =
work_type.attestation_duty.expect("Cannot be None");
let attester_grpc_client =
Arc::new(
AttestationGrpcClient::new(
service.attester_client.clone()
)
);
let signer = Arc::new(AttesterLocalSigner::new(keypair.clone()));
let attester =
Attester::new(
attester_grpc_client,
signer);
let mut attester_service = AttesterService {
attester,
poll_interval_millis: POLL_INTERVAL_MILLIS,
log: log.clone(),
};
attester_service.run();
Ok(slot) => slot.expect("Genesis is in the future"),
};
let current_epoch = current_slot.epoch(service.slots_per_epoch);
debug_assert!(
current_slot > service.current_slot,
"The Timer should poll a new slot"
);
info!(log, "Processing slot: {}", current_slot.as_u64());
/* check for new duties */
let cloned_manager = manager.clone();
let cloned_log = log.clone();
// spawn a new thread separate to the runtime
std::thread::spawn(move || {
cloned_manager.run_update(current_epoch.clone(), cloned_log.clone());
dbg!("Finished thread");
});
/* execute any specified duties */
if let Some(work) = manager.get_current_work(current_slot) {
for (keypair, work_type) in work {
if work_type.produce_block {
// TODO: Produce a beacon block in a new thread
}
if work_type.attestation_duty.is_some() {
// available AttestationDuty info
let attestation_duty =
work_type.attestation_duty.expect("Cannot be None");
let attester_grpc_client =
Arc::new(
AttestationGrpcClient::new(
service.attester_client.clone()
)
);
let signer = Arc::new(AttesterLocalSigner::new(keypair.clone()));
let attester =
Attester::new(
attester_grpc_client,
signer);
let mut attester_service = AttesterService {
attester,
poll_interval_millis: POLL_INTERVAL_MILLIS,
log: log.clone(),
};
attester_service.run();
}
}
}
}
Ok(())
}))
.map_err(|e| format!("Service thread failed: {:?}", e))?;
Ok(())
})
.map_err(|e| format!("Service thread failed: {:?}", e)),
);
// completed a slot process
Ok(())