Files
lighthouse/eth2/utils/remote_beacon_node/src/lib.rs
2019-11-21 11:18:21 +11:00

400 lines
13 KiB
Rust

//! Provides a `RemoteBeaconNode` which interacts with a HTTP API on another Lighthouse (or
//! compatible) instance.
//!
//! Presently, this is only used for testing but it _could_ become a user-facing library.
use futures::{future, Future, IntoFuture};
use reqwest::{
r#async::{Client, ClientBuilder, Response},
StatusCode,
};
use serde::{de::DeserializeOwned, Deserialize, Serialize};
use ssz::Encode;
use std::marker::PhantomData;
use std::net::SocketAddr;
use std::time::Duration;
use types::{
Attestation, BeaconBlock, BeaconState, CommitteeIndex, Epoch, EthSpec, Hash256, PublicKey,
Signature, Slot,
};
use url::Url;
pub use rest_api::{HeadResponse, ValidatorDuty};
pub const REQUEST_TIMEOUT_SECONDS: u64 = 5;
/// Connects to a remote Lighthouse (or compatible) node via HTTP.
pub struct RemoteBeaconNode<E: EthSpec> {
pub http: HttpClient<E>,
}
impl<E: EthSpec> RemoteBeaconNode<E> {
pub fn new(http_endpoint: SocketAddr) -> Result<Self, String> {
Ok(Self {
http: HttpClient::new(format!("http://{}", http_endpoint.to_string()))
.map_err(|e| format!("Unable to create http client: {:?}", e))?,
})
}
}
#[derive(Debug)]
pub enum Error {
/// Unable to parse a URL. Check the server URL.
UrlParseError(url::ParseError),
/// The `reqwest` library returned an error.
ReqwestError(reqwest::Error),
/// There was an error when encoding/decoding an object using serde.
SerdeJsonError(serde_json::Error),
/// The server responded to the request, however it did not return a 200-type success code.
DidNotSucceed { status: StatusCode, body: String },
}
#[derive(Clone)]
pub struct HttpClient<E> {
client: Client,
url: Url,
timeout: Duration,
_phantom: PhantomData<E>,
}
impl<E: EthSpec> HttpClient<E> {
/// Creates a new instance (without connecting to the node).
///
/// The `timeout` is set to 15 seconds.
pub fn new(server_url: String) -> Result<Self, Error> {
Ok(Self {
client: ClientBuilder::new()
.timeout(Duration::from_secs(REQUEST_TIMEOUT_SECONDS))
.build()
.expect("should build from static configuration"),
url: Url::parse(&server_url)?,
timeout: Duration::from_secs(15),
_phantom: PhantomData,
})
}
pub fn beacon(&self) -> Beacon<E> {
Beacon(self.clone())
}
pub fn validator(&self) -> Validator<E> {
Validator(self.clone())
}
fn url(&self, path: &str) -> Result<Url, Error> {
self.url.join(path).map_err(|e| e.into())
}
pub fn json_post<T: Serialize>(
&self,
url: Url,
body: T,
) -> impl Future<Item = Response, Error = Error> {
self.client
.post(&url.to_string())
.json(&body)
.send()
.map_err(Error::from)
}
pub fn json_get<T: DeserializeOwned>(
&self,
mut url: Url,
query_pairs: Vec<(String, String)>,
) -> impl Future<Item = T, Error = Error> {
query_pairs.into_iter().for_each(|(key, param)| {
url.query_pairs_mut().append_pair(&key, &param);
});
self.client
.get(&url.to_string())
.send()
.map_err(Error::from)
.and_then(|response| error_for_status(response).map_err(Error::from))
.and_then(|mut success| success.json::<T>().map_err(Error::from))
}
}
/// Returns an `Error` (with a description) if the `response` was not a 200-type success response.
///
/// Distinct from `Response::error_for_status` because it includes the body of the response as
/// text. This ensures the error message from the server is not discarded.
fn error_for_status(
mut response: Response,
) -> Box<dyn Future<Item = Response, Error = Error> + Send> {
let status = response.status();
if status.is_success() {
Box::new(future::ok(response))
} else {
Box::new(response.text().then(move |text_result| match text_result {
Err(e) => Err(Error::ReqwestError(e)),
Ok(body) => Err(Error::DidNotSucceed { status, body }),
}))
}
}
#[derive(Debug, PartialEq, Clone)]
pub enum PublishStatus {
/// The object was valid and has been published to the network.
Valid,
/// The object was not valid and may or may not have been published to the network.
Invalid(String),
/// The server responsed with an unknown status code. The object may or may not have been
/// published to the network.
Unknown,
}
impl PublishStatus {
/// Returns `true` if `*self == PublishStatus::Valid`.
pub fn is_valid(&self) -> bool {
*self == PublishStatus::Valid
}
}
/// Provides the functions on the `/beacon` endpoint of the node.
#[derive(Clone)]
pub struct Validator<E>(HttpClient<E>);
impl<E: EthSpec> Validator<E> {
fn url(&self, path: &str) -> Result<Url, Error> {
self.0
.url("validator/")
.and_then(move |url| url.join(path).map_err(Error::from))
.map_err(Into::into)
}
/// Produces an unsigned attestation.
pub fn produce_attestation(
&self,
slot: Slot,
committee_index: CommitteeIndex,
) -> impl Future<Item = Attestation<E>, Error = Error> {
let query_params = vec![
("slot".into(), format!("{}", slot)),
("committee_index".into(), format!("{}", committee_index)),
];
let client = self.0.clone();
self.url("attestation")
.into_future()
.and_then(move |url| client.json_get(url, query_params))
}
/// Posts an attestation to the beacon node, expecting it to verify it and publish it to the network.
pub fn publish_attestation(
&self,
attestation: Attestation<E>,
) -> impl Future<Item = PublishStatus, Error = Error> {
let client = self.0.clone();
self.url("attestation")
.into_future()
.and_then(move |url| client.json_post::<_>(url, attestation))
.and_then(|mut response| {
response
.text()
.map(|text| (response, text))
.map_err(Error::from)
})
.and_then(|(response, text)| match response.status() {
StatusCode::OK => Ok(PublishStatus::Valid),
StatusCode::ACCEPTED => Ok(PublishStatus::Invalid(text)),
_ => response
.error_for_status()
.map_err(Error::from)
.map(|_| PublishStatus::Unknown),
})
}
/// Returns the duties required of the given validator pubkeys in the given epoch.
pub fn get_duties(
&self,
epoch: Epoch,
validator_pubkeys: &[PublicKey],
) -> impl Future<Item = Vec<ValidatorDuty>, Error = Error> {
let validator_pubkeys: Vec<String> =
validator_pubkeys.iter().map(pubkey_as_string).collect();
let client = self.0.clone();
self.url("duties").into_future().and_then(move |url| {
let mut query_params = validator_pubkeys
.into_iter()
.map(|pubkey| ("validator_pubkeys".to_string(), pubkey))
.collect::<Vec<_>>();
query_params.push(("epoch".into(), format!("{}", epoch.as_u64())));
client.json_get::<_>(url, query_params)
})
}
/// Posts a block to the beacon node, expecting it to verify it and publish it to the network.
pub fn publish_block(
&self,
block: BeaconBlock<E>,
) -> impl Future<Item = PublishStatus, Error = Error> {
let client = self.0.clone();
self.url("block")
.into_future()
.and_then(move |url| client.json_post::<_>(url, block))
.and_then(|mut response| {
response
.text()
.map(|text| (response, text))
.map_err(Error::from)
})
.and_then(|(response, text)| match response.status() {
StatusCode::OK => Ok(PublishStatus::Valid),
StatusCode::ACCEPTED => Ok(PublishStatus::Invalid(text)),
_ => response
.error_for_status()
.map_err(Error::from)
.map(|_| PublishStatus::Unknown),
})
}
/// Requests a new (unsigned) block from the beacon node.
pub fn produce_block(
&self,
slot: Slot,
randao_reveal: Signature,
) -> impl Future<Item = BeaconBlock<E>, Error = Error> {
let client = self.0.clone();
self.url("block").into_future().and_then(move |url| {
client.json_get::<BeaconBlock<E>>(
url,
vec![
("slot".into(), format!("{}", slot.as_u64())),
("randao_reveal".into(), signature_as_string(&randao_reveal)),
],
)
})
}
}
/// Provides the functions on the `/beacon` endpoint of the node.
#[derive(Clone)]
pub struct Beacon<E>(HttpClient<E>);
impl<E: EthSpec> Beacon<E> {
fn url(&self, path: &str) -> Result<Url, Error> {
self.0
.url("beacon/")
.and_then(move |url| url.join(path).map_err(Error::from))
.map_err(Into::into)
}
pub fn get_head(&self) -> impl Future<Item = HeadResponse, Error = Error> {
let client = self.0.clone();
self.url("head")
.into_future()
.and_then(move |url| client.json_get::<HeadResponse>(url, vec![]))
}
/// Returns the block and block root at the given slot.
pub fn get_block_by_slot(
&self,
slot: Slot,
) -> impl Future<Item = (BeaconBlock<E>, Hash256), Error = Error> {
self.get_block("slot".to_string(), format!("{}", slot.as_u64()))
}
/// Returns the block and block root at the given root.
pub fn get_block_by_root(
&self,
root: Hash256,
) -> impl Future<Item = (BeaconBlock<E>, Hash256), Error = Error> {
self.get_block("root".to_string(), root_as_string(root))
}
/// Returns the block and block root at the given slot.
fn get_block(
&self,
query_key: String,
query_param: String,
) -> impl Future<Item = (BeaconBlock<E>, Hash256), Error = Error> {
let client = self.0.clone();
self.url("block")
.into_future()
.and_then(move |url| {
client.json_get::<BlockResponse<E>>(url, vec![(query_key, query_param)])
})
.map(|response| (response.beacon_block, response.root))
}
/// Returns the state and state root at the given slot.
pub fn get_state_by_slot(
&self,
slot: Slot,
) -> impl Future<Item = (BeaconState<E>, Hash256), Error = Error> {
self.get_state("slot".to_string(), format!("{}", slot.as_u64()))
}
/// Returns the state and state root at the given root.
pub fn get_state_by_root(
&self,
root: Hash256,
) -> impl Future<Item = (BeaconState<E>, Hash256), Error = Error> {
self.get_state("root".to_string(), root_as_string(root))
}
/// Returns the state and state root at the given slot.
fn get_state(
&self,
query_key: String,
query_param: String,
) -> impl Future<Item = (BeaconState<E>, Hash256), Error = Error> {
let client = self.0.clone();
self.url("state")
.into_future()
.and_then(move |url| {
client.json_get::<StateResponse<E>>(url, vec![(query_key, query_param)])
})
.map(|response| (response.beacon_state, response.root))
}
}
#[derive(Deserialize)]
#[serde(bound = "T: EthSpec")]
pub struct BlockResponse<T: EthSpec> {
pub beacon_block: BeaconBlock<T>,
pub root: Hash256,
}
#[derive(Deserialize)]
#[serde(bound = "T: EthSpec")]
pub struct StateResponse<T: EthSpec> {
pub beacon_state: BeaconState<T>,
pub root: Hash256,
}
fn root_as_string(root: Hash256) -> String {
format!("0x{:?}", root)
}
fn signature_as_string(signature: &Signature) -> String {
format!("0x{}", hex::encode(signature.as_ssz_bytes()))
}
fn pubkey_as_string(pubkey: &PublicKey) -> String {
format!("0x{}", hex::encode(pubkey.as_ssz_bytes()))
}
impl From<reqwest::Error> for Error {
fn from(e: reqwest::Error) -> Error {
Error::ReqwestError(e)
}
}
impl From<url::ParseError> for Error {
fn from(e: url::ParseError) -> Error {
Error::UrlParseError(e)
}
}
impl From<serde_json::Error> for Error {
fn from(e: serde_json::Error) -> Error {
Error::SerdeJsonError(e)
}
}