Validator client refactor (#618)

* Update to spec v0.9.0

* Update to v0.9.1

* Bump spec tags for v0.9.1

* Formatting, fix CI failures

* Resolve accidental KeyPair merge conflict

* Document new BeaconState functions

* Add `validator` changes from `validator-to-rest`

* Add initial (failing) REST api tests

* Fix signature parsing

* Add more tests

* Refactor http router

* Add working tests for publish beacon block

* Add validator duties tests

* Move account_manager under `lighthouse` binary

* Unify logfile handling in `environment` crate.

* Fix incorrect cache drops in `advance_caches`

* Update fork choice for v0.9.1

* Add `deposit_contract` crate

* Add progress on validator onboarding

* Add unfinished attesation code

* Update account manager CLI

* Write eth1 data file as hex string

* Integrate ValidatorDirectory with validator_client

* Move ValidatorDirectory into validator_client

* Clean up some FIXMEs

* Add beacon_chain_sim

* Fix a few docs/logs

* Expand `beacon_chain_sim`

* Fix spec for `beacon_chain_sim

* More testing for api

* Start work on attestation endpoint

* Reject empty attestations

* Allow attestations to genesis block

* Add working tests for `rest_api` validator endpoint

* Remove grpc from beacon_node

* Start heavy refactor of validator client

- Block production is working

* Prune old validator client files

* Start works on attestation service

* Add attestation service to validator client

* Use full pubkey for validator directories

* Add validator duties post endpoint

* Use par_iter for keypair generation

* Use bulk duties request in validator client

* Add version http endpoint tests

* Add interop keys and startup wait

* Ensure a prompt exit

* Add duties pruning

* Fix compile error in beacon node tests

* Add github workflow

* Modify rust.yaml

* Modify gitlab actions

* Add to CI file

* Add sudo to CI npm install

* Move cargo fmt to own job in tests

* Fix cargo fmt in CI

* Add rustup update before cargo fmt

* Change name of CI job

* Make other CI jobs require cargo fmt

* Add CI badge

* Remove gitlab and travis files

* Add different http timeout for debug

* Update docker file, use makefile in CI

* Use make in the dockerfile, skip the test

* Use the makefile for debug GI test

* Update book

* Tidy grpc and misc things

* Apply discv5 fixes

* Address other minor issues

* Fix warnings

* Attempt fix for addr parsing

* Tidy validator config, CLIs

* Tidy comments

* Tidy signing, reduce ForkService duplication

* Fail if skipping too many slots

* Set default recent genesis time to 0

* Add custom http timeout to validator

* Fix compile bug in node_test_rig

* Remove old bootstrap flag from val CLI

* Update docs

* Tidy val client

* Change val client log levels

* Add comments, more validity checks

* Fix compile error, add comments

* Undo changes to eth2-libp2p/src

* Reduce duplication of keypair generation

* Add more logging for validator duties

* Fix beacon_chain_sim, nitpicks

* Fix compile error, minor nits

* Address Michael's comments
This commit is contained in:
Paul Hauner
2019-11-25 15:48:24 +11:00
committed by GitHub
parent 3ca63cfa83
commit 78d82d9193
100 changed files with 4571 additions and 4032 deletions

View File

@@ -3,24 +3,46 @@
//!
//! Presently, this is only used for testing but it _could_ become a user-facing library.
use futures::{Future, IntoFuture};
use reqwest::r#async::{Client, RequestBuilder};
use serde::Deserialize;
use eth2_config::Eth2Config;
use futures::{future, Future, IntoFuture};
use reqwest::{
r#async::{Client, ClientBuilder, Response},
StatusCode,
};
use serde::{de::DeserializeOwned, Deserialize, Serialize};
use ssz::Encode;
use std::marker::PhantomData;
use std::net::SocketAddr;
use types::{BeaconBlock, BeaconState, EthSpec};
use types::{Hash256, Slot};
use std::time::Duration;
use types::{
Attestation, BeaconBlock, BeaconState, CommitteeIndex, Epoch, EthSpec, Fork, Hash256,
PublicKey, Signature, Slot,
};
use url::Url;
pub use rest_api::{BulkValidatorDutiesRequest, HeadResponse, ValidatorDuty};
// Setting a long timeout for debug ensures that crypto-heavy operations can still succeed.
#[cfg(debug_assertions)]
pub const REQUEST_TIMEOUT_SECONDS: u64 = 15;
#[cfg(not(debug_assertions))]
pub const REQUEST_TIMEOUT_SECONDS: u64 = 5;
#[derive(Clone)]
/// Connects to a remote Lighthouse (or compatible) node via HTTP.
pub struct RemoteBeaconNode<E: EthSpec> {
pub http: HttpClient<E>,
}
impl<E: EthSpec> RemoteBeaconNode<E> {
pub fn new(http_endpoint: SocketAddr) -> Result<Self, String> {
/// Uses the default HTTP timeout.
pub fn new(http_endpoint: String) -> Result<Self, String> {
Self::new_with_timeout(http_endpoint, Duration::from_secs(REQUEST_TIMEOUT_SECONDS))
}
pub fn new_with_timeout(http_endpoint: String, timeout: Duration) -> Result<Self, String> {
Ok(Self {
http: HttpClient::new(format!("http://{}", http_endpoint.to_string()))
http: HttpClient::new(http_endpoint, timeout)
.map_err(|e| format!("Unable to create http client: {:?}", e))?,
})
}
@@ -28,23 +50,34 @@ impl<E: EthSpec> RemoteBeaconNode<E> {
#[derive(Debug)]
pub enum Error {
/// Unable to parse a URL. Check the server URL.
UrlParseError(url::ParseError),
/// The `reqwest` library returned an error.
ReqwestError(reqwest::Error),
/// There was an error when encoding/decoding an object using serde.
SerdeJsonError(serde_json::Error),
/// The server responded to the request, however it did not return a 200-type success code.
DidNotSucceed { status: StatusCode, body: String },
}
#[derive(Clone)]
pub struct HttpClient<E> {
client: Client,
url: Url,
timeout: Duration,
_phantom: PhantomData<E>,
}
impl<E: EthSpec> HttpClient<E> {
/// Creates a new instance (without connecting to the node).
pub fn new(server_url: String) -> Result<Self, Error> {
pub fn new(server_url: String, timeout: Duration) -> Result<Self, Error> {
Ok(Self {
client: Client::new(),
client: ClientBuilder::new()
.timeout(timeout)
.build()
.expect("should build from static configuration"),
url: Url::parse(&server_url)?,
timeout: Duration::from_secs(15),
_phantom: PhantomData,
})
}
@@ -53,13 +86,231 @@ impl<E: EthSpec> HttpClient<E> {
Beacon(self.clone())
}
pub fn validator(&self) -> Validator<E> {
Validator(self.clone())
}
pub fn spec(&self) -> Spec<E> {
Spec(self.clone())
}
pub fn node(&self) -> Node<E> {
Node(self.clone())
}
fn url(&self, path: &str) -> Result<Url, Error> {
self.url.join(path).map_err(|e| e.into())
}
pub fn get(&self, path: &str) -> Result<RequestBuilder, Error> {
self.url(path)
.map(|url| Client::new().get(&url.to_string()))
pub fn json_post<T: Serialize>(
&self,
url: Url,
body: T,
) -> impl Future<Item = Response, Error = Error> {
self.client
.post(&url.to_string())
.json(&body)
.send()
.map_err(Error::from)
}
pub fn json_get<T: DeserializeOwned>(
&self,
mut url: Url,
query_pairs: Vec<(String, String)>,
) -> impl Future<Item = T, Error = Error> {
query_pairs.into_iter().for_each(|(key, param)| {
url.query_pairs_mut().append_pair(&key, &param);
});
self.client
.get(&url.to_string())
.send()
.map_err(Error::from)
.and_then(|response| error_for_status(response).map_err(Error::from))
.and_then(|mut success| success.json::<T>().map_err(Error::from))
}
}
/// Returns an `Error` (with a description) if the `response` was not a 200-type success response.
///
/// Distinct from `Response::error_for_status` because it includes the body of the response as
/// text. This ensures the error message from the server is not discarded.
fn error_for_status(
mut response: Response,
) -> Box<dyn Future<Item = Response, Error = Error> + Send> {
let status = response.status();
if status.is_success() {
Box::new(future::ok(response))
} else {
Box::new(response.text().then(move |text_result| match text_result {
Err(e) => Err(Error::ReqwestError(e)),
Ok(body) => Err(Error::DidNotSucceed { status, body }),
}))
}
}
#[derive(Debug, PartialEq, Clone)]
pub enum PublishStatus {
/// The object was valid and has been published to the network.
Valid,
/// The object was not valid and may or may not have been published to the network.
Invalid(String),
/// The server responsed with an unknown status code. The object may or may not have been
/// published to the network.
Unknown,
}
impl PublishStatus {
/// Returns `true` if `*self == PublishStatus::Valid`.
pub fn is_valid(&self) -> bool {
*self == PublishStatus::Valid
}
}
/// Provides the functions on the `/beacon` endpoint of the node.
#[derive(Clone)]
pub struct Validator<E>(HttpClient<E>);
impl<E: EthSpec> Validator<E> {
fn url(&self, path: &str) -> Result<Url, Error> {
self.0
.url("validator/")
.and_then(move |url| url.join(path).map_err(Error::from))
.map_err(Into::into)
}
/// Produces an unsigned attestation.
pub fn produce_attestation(
&self,
slot: Slot,
committee_index: CommitteeIndex,
) -> impl Future<Item = Attestation<E>, Error = Error> {
let query_params = vec![
("slot".into(), format!("{}", slot)),
("committee_index".into(), format!("{}", committee_index)),
];
let client = self.0.clone();
self.url("attestation")
.into_future()
.and_then(move |url| client.json_get(url, query_params))
}
/// Posts an attestation to the beacon node, expecting it to verify it and publish it to the network.
pub fn publish_attestation(
&self,
attestation: Attestation<E>,
) -> impl Future<Item = PublishStatus, Error = Error> {
let client = self.0.clone();
self.url("attestation")
.into_future()
.and_then(move |url| client.json_post::<_>(url, attestation))
.and_then(|mut response| {
response
.text()
.map(|text| (response, text))
.map_err(Error::from)
})
.and_then(|(response, text)| match response.status() {
StatusCode::OK => Ok(PublishStatus::Valid),
StatusCode::ACCEPTED => Ok(PublishStatus::Invalid(text)),
_ => response
.error_for_status()
.map_err(Error::from)
.map(|_| PublishStatus::Unknown),
})
}
/// Returns the duties required of the given validator pubkeys in the given epoch.
///
/// ## Warning
///
/// This method cannot request large amounts of validator duties because the query string fills
/// up the URL. I have seen requests of 1,024 fail. For large requests, use `get_duties_bulk`.
pub fn get_duties(
&self,
epoch: Epoch,
validator_pubkeys: &[PublicKey],
) -> impl Future<Item = Vec<ValidatorDuty>, Error = Error> {
let validator_pubkeys: Vec<String> =
validator_pubkeys.iter().map(pubkey_as_string).collect();
let client = self.0.clone();
self.url("duties").into_future().and_then(move |url| {
let mut query_params = validator_pubkeys
.into_iter()
.map(|pubkey| ("validator_pubkeys".to_string(), pubkey))
.collect::<Vec<_>>();
query_params.push(("epoch".into(), format!("{}", epoch.as_u64())));
client.json_get::<_>(url, query_params)
})
}
/// Returns the duties required of the given validator pubkeys in the given epoch.
pub fn get_duties_bulk(
&self,
epoch: Epoch,
validator_pubkeys: &[PublicKey],
) -> impl Future<Item = Vec<ValidatorDuty>, Error = Error> {
let client = self.0.clone();
let bulk_request = BulkValidatorDutiesRequest {
epoch,
pubkeys: validator_pubkeys.to_vec(),
};
self.url("duties")
.into_future()
.and_then(move |url| client.json_post::<_>(url, bulk_request))
.and_then(|response| error_for_status(response).map_err(Error::from))
.and_then(|mut success| success.json().map_err(Error::from))
}
/// Posts a block to the beacon node, expecting it to verify it and publish it to the network.
pub fn publish_block(
&self,
block: BeaconBlock<E>,
) -> impl Future<Item = PublishStatus, Error = Error> {
let client = self.0.clone();
self.url("block")
.into_future()
.and_then(move |url| client.json_post::<_>(url, block))
.and_then(|mut response| {
response
.text()
.map(|text| (response, text))
.map_err(Error::from)
})
.and_then(|(response, text)| match response.status() {
StatusCode::OK => Ok(PublishStatus::Valid),
StatusCode::ACCEPTED => Ok(PublishStatus::Invalid(text)),
_ => response
.error_for_status()
.map_err(Error::from)
.map(|_| PublishStatus::Unknown),
})
}
/// Requests a new (unsigned) block from the beacon node.
pub fn produce_block(
&self,
slot: Slot,
randao_reveal: Signature,
) -> impl Future<Item = BeaconBlock<E>, Error = Error> {
let client = self.0.clone();
self.url("block").into_future().and_then(move |url| {
client.json_get::<BeaconBlock<E>>(
url,
vec![
("slot".into(), format!("{}", slot.as_u64())),
("randao_reveal".into(), signature_as_string(&randao_reveal)),
],
)
})
}
}
@@ -75,45 +326,130 @@ impl<E: EthSpec> Beacon<E> {
.map_err(Into::into)
}
pub fn get_genesis_time(&self) -> impl Future<Item = u64, Error = Error> {
let client = self.0.clone();
self.url("genesis_time")
.into_future()
.and_then(move |url| client.json_get(url, vec![]))
}
pub fn get_fork(&self) -> impl Future<Item = Fork, Error = Error> {
let client = self.0.clone();
self.url("fork")
.into_future()
.and_then(move |url| client.json_get(url, vec![]))
}
pub fn get_head(&self) -> impl Future<Item = HeadResponse, Error = Error> {
let client = self.0.clone();
self.url("head")
.into_future()
.and_then(move |url| client.json_get::<HeadResponse>(url, vec![]))
}
/// Returns the block and block root at the given slot.
pub fn block_at_slot(
pub fn get_block_by_slot(
&self,
slot: Slot,
) -> impl Future<Item = (BeaconBlock<E>, Hash256), Error = Error> {
self.get_block("slot".to_string(), format!("{}", slot.as_u64()))
}
/// Returns the block and block root at the given root.
pub fn get_block_by_root(
&self,
root: Hash256,
) -> impl Future<Item = (BeaconBlock<E>, Hash256), Error = Error> {
self.get_block("root".to_string(), root_as_string(root))
}
/// Returns the block and block root at the given slot.
fn get_block(
&self,
query_key: String,
query_param: String,
) -> impl Future<Item = (BeaconBlock<E>, Hash256), Error = Error> {
let client = self.0.clone();
self.url("block")
.into_future()
.and_then(move |mut url| {
url.query_pairs_mut()
.append_pair("slot", &format!("{}", slot.as_u64()));
client.get(&url.to_string())
.and_then(move |url| {
client.json_get::<BlockResponse<E>>(url, vec![(query_key, query_param)])
})
.and_then(|builder| builder.send().map_err(Error::from))
.and_then(|response| response.error_for_status().map_err(Error::from))
.and_then(|mut success| success.json::<BlockResponse<E>>().map_err(Error::from))
.map(|response| (response.beacon_block, response.root))
}
/// Returns the state and state root at the given slot.
pub fn state_at_slot(
pub fn get_state_by_slot(
&self,
slot: Slot,
) -> impl Future<Item = (BeaconState<E>, Hash256), Error = Error> {
self.get_state("slot".to_string(), format!("{}", slot.as_u64()))
}
/// Returns the state and state root at the given root.
pub fn get_state_by_root(
&self,
root: Hash256,
) -> impl Future<Item = (BeaconState<E>, Hash256), Error = Error> {
self.get_state("root".to_string(), root_as_string(root))
}
/// Returns the state and state root at the given slot.
fn get_state(
&self,
query_key: String,
query_param: String,
) -> impl Future<Item = (BeaconState<E>, Hash256), Error = Error> {
let client = self.0.clone();
self.url("state")
.into_future()
.and_then(move |mut url| {
url.query_pairs_mut()
.append_pair("slot", &format!("{}", slot.as_u64()));
client.get(&url.to_string())
.and_then(move |url| {
client.json_get::<StateResponse<E>>(url, vec![(query_key, query_param)])
})
.and_then(|builder| builder.send().map_err(Error::from))
.and_then(|response| response.error_for_status().map_err(Error::from))
.and_then(|mut success| success.json::<StateResponse<E>>().map_err(Error::from))
.map(|response| (response.beacon_state, response.root))
}
}
/// Provides the functions on the `/spec` endpoint of the node.
#[derive(Clone)]
pub struct Spec<E>(HttpClient<E>);
impl<E: EthSpec> Spec<E> {
fn url(&self, path: &str) -> Result<Url, Error> {
self.0
.url("spec/")
.and_then(move |url| url.join(path).map_err(Error::from))
.map_err(Into::into)
}
pub fn get_eth2_config(&self) -> impl Future<Item = Eth2Config, Error = Error> {
let client = self.0.clone();
self.url("eth2_config")
.into_future()
.and_then(move |url| client.json_get(url, vec![]))
}
}
/// Provides the functions on the `/node` endpoint of the node.
#[derive(Clone)]
pub struct Node<E>(HttpClient<E>);
impl<E: EthSpec> Node<E> {
fn url(&self, path: &str) -> Result<Url, Error> {
self.0
.url("node/")
.and_then(move |url| url.join(path).map_err(Error::from))
.map_err(Into::into)
}
pub fn get_version(&self) -> impl Future<Item = String, Error = Error> {
let client = self.0.clone();
self.url("version")
.into_future()
.and_then(move |url| client.json_get(url, vec![]))
}
}
#[derive(Deserialize)]
#[serde(bound = "T: EthSpec")]
pub struct BlockResponse<T: EthSpec> {
@@ -128,6 +464,18 @@ pub struct StateResponse<T: EthSpec> {
pub root: Hash256,
}
fn root_as_string(root: Hash256) -> String {
format!("0x{:?}", root)
}
fn signature_as_string(signature: &Signature) -> String {
format!("0x{}", hex::encode(signature.as_ssz_bytes()))
}
fn pubkey_as_string(pubkey: &PublicKey) -> String {
format!("0x{}", hex::encode(pubkey.as_ssz_bytes()))
}
impl From<reqwest::Error> for Error {
fn from(e: reqwest::Error) -> Error {
Error::ReqwestError(e)
@@ -139,3 +487,9 @@ impl From<url::ParseError> for Error {
Error::UrlParseError(e)
}
}
impl From<serde_json::Error> for Error {
fn from(e: serde_json::Error) -> Error {
Error::SerdeJsonError(e)
}
}