Add interop keys and startup wait

This commit is contained in:
Paul Hauner
2019-11-22 17:02:50 +11:00
parent f4b78e6133
commit b79f88ceaa
5 changed files with 203 additions and 304 deletions

View File

@@ -17,22 +17,25 @@ use clap::ArgMatches;
use config::{Config as ClientConfig, KeySource};
use duties_service::{DutiesService, DutiesServiceBuilder};
use environment::RuntimeContext;
use eth2_config::Eth2Config;
use exit_future::Signal;
use fork_service::{ForkService, ForkServiceBuilder};
use futures::{Future, IntoFuture};
use lighthouse_bootstrap::Bootstrapper;
use futures::{
future::{self, loop_fn, Loop},
Future, IntoFuture,
};
use parking_lot::RwLock;
use remote_beacon_node::RemoteBeaconNode;
use slog::{info, Logger};
use slog::{error, info, Logger};
use slot_clock::SlotClock;
use slot_clock::SystemTimeSlotClock;
use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;
use std::time::{Duration, Instant};
use tokio::timer::Delay;
use types::EthSpec;
use validator_store::ValidatorStore;
const RETRY_DELAY: Duration = Duration::from_secs(2);
#[derive(Clone)]
pub struct ProductionValidatorClient<T: EthSpec> {
context: RuntimeContext<T>,
@@ -47,22 +50,13 @@ impl<T: EthSpec> ProductionValidatorClient<T> {
/// Instantiates the validator client, _without_ starting the timers to trigger block
/// and attestation production.
pub fn new_from_cli(
mut context: RuntimeContext<T>,
matches: &ArgMatches,
context: RuntimeContext<T>,
cli_args: &ArgMatches,
) -> impl Future<Item = Self, Error = String> {
let mut log = context.log.clone();
get_configs(&matches, &mut log)
ClientConfig::from_cli(&cli_args)
.into_future()
.map_err(|e| format!("Unable to initialize config: {}", e))
.and_then(|(client_config, eth2_config)| {
// TODO: the eth2 config in the env is being completely ignored.
//
// See https://github.com/sigp/lighthouse/issues/602
context.eth2_config = eth2_config;
Self::new(context, client_config)
})
.and_then(|client_config| Self::new(context, client_config))
}
/// Instantiates the validator client, _without_ starting the timers to trigger block
@@ -71,12 +65,14 @@ impl<T: EthSpec> ProductionValidatorClient<T> {
mut context: RuntimeContext<T>,
client_config: ClientConfig,
) -> impl Future<Item = Self, Error = String> {
let log = context.log.clone();
let log_1 = context.log.clone();
let log_2 = context.log.clone();
let log_3 = context.log.clone();
info!(
log,
log_1,
"Starting validator client";
"datadir" => client_config.full_data_dir().expect("Unable to find datadir").to_str(),
"datadir" => format!("{:?}", client_config.data_dir),
);
format!(
@@ -86,12 +82,18 @@ impl<T: EthSpec> ProductionValidatorClient<T> {
.parse()
.map_err(|e| format!("Unable to parse server address: {:?}", e))
.into_future()
.and_then(|http_server_addr| {
.and_then(move |http_server_addr| {
info!(
log_1,
"Beacon node connection info";
"http_server" => format!("{}", http_server_addr),
);
RemoteBeaconNode::new(http_server_addr)
.map_err(|e| format!("Unable to init beacon node http client: {}", e))
})
.and_then(move |beacon_node| wait_for_node(beacon_node, log_2))
.and_then(|beacon_node| {
// TODO: add loop function to retry if node not online.
beacon_node
.http
.spec()
@@ -131,17 +133,29 @@ impl<T: EthSpec> ProductionValidatorClient<T> {
Duration::from_millis(context.eth2_config.spec.milliseconds_per_slot),
);
dbg!(context.eth2_config.spec.milliseconds_per_slot);
// TODO: fix expect.
let validator_store = ValidatorStore::load_from_disk(
client_config.full_data_dir().expect("Get rid of this."),
context.eth2_config.spec.clone(),
log.clone(),
)?;
let validator_store: ValidatorStore<T> = match &client_config.key_source {
// Load pre-existing validators from the data dir.
//
// Use the `account_manager` to generate these files.
KeySource::Disk => ValidatorStore::load_from_disk(
client_config.data_dir.clone(),
context.eth2_config.spec.clone(),
log_3.clone(),
)?,
// Generate ephemeral insecure keypairs for testing purposes.
//
// Do not use in production.
KeySource::TestingKeypairRange(range) => {
ValidatorStore::insecure_ephemeral_validators(
range.clone(),
context.eth2_config.spec.clone(),
log_3.clone(),
)?
}
};
info!(
log,
log_3,
"Loaded validator keypair store";
"voting_validators" => validator_store.num_voting_validators()
);
@@ -221,134 +235,50 @@ impl<T: EthSpec> ProductionValidatorClient<T> {
}
}
/// Parses the CLI arguments and attempts to load the client and eth2 configuration.
///
/// This is not a pure function, it reads from disk and may contact network servers.
fn get_configs(
cli_args: &ArgMatches,
mut log: &mut Logger,
) -> Result<(ClientConfig, Eth2Config), String> {
let mut client_config = ClientConfig::default();
/// Request the version from the node, looping back and trying again on failure. Exit once the node
/// has been contacted.
fn wait_for_node<E: EthSpec>(
beacon_node: RemoteBeaconNode<E>,
log: Logger,
) -> impl Future<Item = RemoteBeaconNode<E>, Error = String> {
// Try to get the version string from the node, looping until success is returned.
loop_fn(beacon_node.clone(), move |beacon_node| {
let log = log.clone();
beacon_node
.clone()
.http
.node()
.get_version()
.map_err(|e| format!("{:?}", e))
.then(move |result| {
let future: Box<dyn Future<Item = Loop<_, _>, Error = String> + Send> = match result
{
Ok(version) => {
info!(
log,
"Connected to beacon node";
"version" => version,
);
client_config.apply_cli_args(&cli_args, &mut log)?;
Box::new(future::ok(Loop::Break(beacon_node)))
}
Err(e) => {
error!(
log,
"Unable to connect to beacon node";
"error" => format!("{:?}", e),
);
if let Some(server) = cli_args.value_of("server") {
client_config.server = server.to_string();
}
Box::new(
Delay::new(Instant::now() + RETRY_DELAY)
.map_err(|e| format!("Failed to trigger delay: {:?}", e))
.and_then(|_| future::ok(Loop::Continue(beacon_node))),
)
}
};
if let Some(port) = cli_args.value_of("server-http-port") {
client_config.server_http_port = port
.parse::<u16>()
.map_err(|e| format!("Unable to parse HTTP port: {:?}", e))?;
}
if let Some(port) = cli_args.value_of("server-grpc-port") {
client_config.server_grpc_port = port
.parse::<u16>()
.map_err(|e| format!("Unable to parse gRPC port: {:?}", e))?;
}
info!(
*log,
"Beacon node connection info";
"grpc_port" => client_config.server_grpc_port,
"http_port" => client_config.server_http_port,
"server" => &client_config.server,
);
let (client_config, eth2_config) = match cli_args.subcommand() {
("testnet", Some(sub_cli_args)) => {
if cli_args.is_present("eth2-config") && sub_cli_args.is_present("bootstrap") {
return Err(
"Cannot specify --eth2-config and --bootstrap as it may result \
in ambiguity."
.into(),
);
}
process_testnet_subcommand(sub_cli_args, client_config, log)
}
_ => return Err("You must use the testnet command. See '--help'.".into()),
}?;
Ok((client_config, eth2_config))
}
/// Parses the `testnet` CLI subcommand.
///
/// This is not a pure function, it reads from disk and may contact network servers.
fn process_testnet_subcommand(
cli_args: &ArgMatches,
mut client_config: ClientConfig,
log: &Logger,
) -> Result<(ClientConfig, Eth2Config), String> {
let eth2_config = if cli_args.is_present("bootstrap") {
info!(log, "Connecting to bootstrap server");
let bootstrapper = Bootstrapper::connect(
format!(
"http://{}:{}",
client_config.server, client_config.server_http_port
),
&log,
)?;
let eth2_config = bootstrapper.eth2_config()?;
info!(
log,
"Bootstrapped eth2 config via HTTP";
"slot_time_millis" => eth2_config.spec.milliseconds_per_slot,
"spec" => &eth2_config.spec_constants,
);
eth2_config
} else {
match cli_args.value_of("spec") {
Some("mainnet") => Eth2Config::mainnet(),
Some("minimal") => Eth2Config::minimal(),
Some("interop") => Eth2Config::interop(),
_ => return Err("No --spec flag provided. See '--help'.".into()),
}
};
client_config.key_source = match cli_args.subcommand() {
("insecure", Some(sub_cli_args)) => {
let first = sub_cli_args
.value_of("first_validator")
.ok_or_else(|| "No first validator supplied")?
.parse::<usize>()
.map_err(|e| format!("Unable to parse first validator: {:?}", e))?;
let count = sub_cli_args
.value_of("validator_count")
.ok_or_else(|| "No validator count supplied")?
.parse::<usize>()
.map_err(|e| format!("Unable to parse validator count: {:?}", e))?;
info!(
log,
"Generating unsafe testing keys";
"first_validator" => first,
"count" => count
);
KeySource::TestingKeypairRange(first..first + count)
}
("interop-yaml", Some(sub_cli_args)) => {
let path = sub_cli_args
.value_of("path")
.ok_or_else(|| "No yaml path supplied")?
.parse::<PathBuf>()
.map_err(|e| format!("Unable to parse yaml path: {:?}", e))?;
info!(
log,
"Loading keypairs from interop YAML format";
"path" => format!("{:?}", path),
);
KeySource::YamlKeypairs(path)
}
_ => KeySource::Disk,
};
Ok((client_config, eth2_config))
future
})
})
.map(|_| beacon_node)
}