//! Provides a `RemoteBeaconNode` which interacts with a HTTP API on another Lighthouse (or //! compatible) instance. //! //! Presently, this is only used for testing but it _could_ become a user-facing library. use eth2_config::Eth2Config; use futures::{future, Future, IntoFuture}; use reqwest::{ r#async::{Client, ClientBuilder, Response}, StatusCode, }; use serde::{de::DeserializeOwned, Deserialize, Serialize}; use ssz::Encode; use std::marker::PhantomData; use std::time::Duration; use types::{ Attestation, AttestationData, AttesterSlashing, BeaconBlock, BeaconState, CommitteeIndex, Epoch, EthSpec, Fork, Hash256, ProposerSlashing, PublicKey, PublicKeyBytes, Signature, SignedAggregateAndProof, SignedBeaconBlock, Slot, }; use url::Url; pub use operation_pool::PersistedOperationPool; pub use proto_array_fork_choice::core::ProtoArray; pub use rest_types::{ CanonicalHeadResponse, Committee, HeadBeaconBlock, IndividualVotesRequest, IndividualVotesResponse, ValidatorDutiesRequest, ValidatorDutyBytes, ValidatorRequest, ValidatorResponse, ValidatorSubscription, }; // Setting a long timeout for debug ensures that crypto-heavy operations can still succeed. #[cfg(debug_assertions)] pub const REQUEST_TIMEOUT_SECONDS: u64 = 15; #[cfg(not(debug_assertions))] pub const REQUEST_TIMEOUT_SECONDS: u64 = 5; #[derive(Clone)] /// Connects to a remote Lighthouse (or compatible) node via HTTP. pub struct RemoteBeaconNode { pub http: HttpClient, } impl RemoteBeaconNode { /// Uses the default HTTP timeout. pub fn new(http_endpoint: String) -> Result { Self::new_with_timeout(http_endpoint, Duration::from_secs(REQUEST_TIMEOUT_SECONDS)) } pub fn new_with_timeout(http_endpoint: String, timeout: Duration) -> Result { Ok(Self { http: HttpClient::new(http_endpoint, timeout) .map_err(|e| format!("Unable to create http client: {:?}", e))?, }) } } #[derive(Debug)] pub enum Error { /// Unable to parse a URL. Check the server URL. UrlParseError(url::ParseError), /// The `reqwest` library returned an error. ReqwestError(reqwest::Error), /// There was an error when encoding/decoding an object using serde. SerdeJsonError(serde_json::Error), /// The server responded to the request, however it did not return a 200-type success code. DidNotSucceed { status: StatusCode, body: String }, /// The request input was invalid. InvalidInput, } #[derive(Clone)] pub struct HttpClient { client: Client, url: Url, timeout: Duration, _phantom: PhantomData, } impl HttpClient { /// Creates a new instance (without connecting to the node). pub fn new(server_url: String, timeout: Duration) -> Result { Ok(Self { client: ClientBuilder::new() .timeout(timeout) .build() .expect("should build from static configuration"), url: Url::parse(&server_url)?, timeout: Duration::from_secs(15), _phantom: PhantomData, }) } pub fn beacon(&self) -> Beacon { Beacon(self.clone()) } pub fn validator(&self) -> Validator { Validator(self.clone()) } pub fn spec(&self) -> Spec { Spec(self.clone()) } pub fn node(&self) -> Node { Node(self.clone()) } pub fn advanced(&self) -> Advanced { Advanced(self.clone()) } pub fn consensus(&self) -> Consensus { Consensus(self.clone()) } fn url(&self, path: &str) -> Result { self.url.join(path).map_err(|e| e.into()) } pub fn json_post( &self, url: Url, body: T, ) -> impl Future { self.client .post(&url.to_string()) .json(&body) .send() .map_err(Error::from) } pub fn json_get( &self, mut url: Url, query_pairs: Vec<(String, String)>, ) -> impl Future { query_pairs.into_iter().for_each(|(key, param)| { url.query_pairs_mut().append_pair(&key, ¶m); }); self.client .get(&url.to_string()) .send() .map_err(Error::from) .and_then(|response| error_for_status(response).map_err(Error::from)) .and_then(|mut success| success.json::().map_err(Error::from)) } } /// Returns an `Error` (with a description) if the `response` was not a 200-type success response. /// /// Distinct from `Response::error_for_status` because it includes the body of the response as /// text. This ensures the error message from the server is not discarded. fn error_for_status( mut response: Response, ) -> Box + Send> { let status = response.status(); if status.is_success() { Box::new(future::ok(response)) } else { Box::new(response.text().then(move |text_result| match text_result { Err(e) => Err(Error::ReqwestError(e)), Ok(body) => Err(Error::DidNotSucceed { status, body }), })) } } #[derive(Debug, PartialEq, Clone)] pub enum PublishStatus { /// The object was valid and has been published to the network. Valid, /// The object was not valid and may or may not have been published to the network. Invalid(String), /// The server responded with an unknown status code. The object may or may not have been /// published to the network. Unknown, } impl PublishStatus { /// Returns `true` if `*self == PublishStatus::Valid`. pub fn is_valid(&self) -> bool { *self == PublishStatus::Valid } } /// Provides the functions on the `/validator` endpoint of the node. #[derive(Clone)] pub struct Validator(HttpClient); impl Validator { fn url(&self, path: &str) -> Result { self.0 .url("validator/") .and_then(move |url| url.join(path).map_err(Error::from)) .map_err(Into::into) } /// Produces an unsigned attestation. pub fn produce_attestation( &self, slot: Slot, committee_index: CommitteeIndex, ) -> impl Future, Error = Error> { let query_params = vec![ ("slot".into(), format!("{}", slot)), ("committee_index".into(), format!("{}", committee_index)), ]; let client = self.0.clone(); self.url("attestation") .into_future() .and_then(move |url| client.json_get(url, query_params)) } /// Produces an aggregate attestation. pub fn produce_aggregate_attestation( &self, attestation_data: &AttestationData, ) -> impl Future, Error = Error> { let query_params = vec![( "attestation_data".into(), as_ssz_hex_string(attestation_data), )]; let client = self.0.clone(); self.url("aggregate_attestation") .into_future() .and_then(move |url| client.json_get(url, query_params)) } /// Posts a list of attestations to the beacon node, expecting it to verify it and publish it to the network. pub fn publish_attestations( &self, attestation: Vec>, ) -> impl Future { let client = self.0.clone(); self.url("attestations") .into_future() .and_then(move |url| client.json_post::<_>(url, attestation)) .and_then(|mut response| { response .text() .map(|text| (response, text)) .map_err(Error::from) }) .and_then(|(response, text)| match response.status() { StatusCode::OK => Ok(PublishStatus::Valid), StatusCode::ACCEPTED => Ok(PublishStatus::Invalid(text)), _ => response .error_for_status() .map_err(Error::from) .map(|_| PublishStatus::Unknown), }) } /// Posts a list of signed aggregates and proofs to the beacon node, expecting it to verify it and publish it to the network. pub fn publish_aggregate_and_proof( &self, signed_aggregate_and_proofs: Vec>, ) -> impl Future { let client = self.0.clone(); self.url("aggregate_and_proofs") .into_future() .and_then(move |url| client.json_post::<_>(url, signed_aggregate_and_proofs)) .and_then(|mut response| { response .text() .map(|text| (response, text)) .map_err(Error::from) }) .and_then(|(response, text)| match response.status() { StatusCode::OK => Ok(PublishStatus::Valid), StatusCode::ACCEPTED => Ok(PublishStatus::Invalid(text)), _ => response .error_for_status() .map_err(Error::from) .map(|_| PublishStatus::Unknown), }) } /// Returns the duties required of the given validator pubkeys in the given epoch. pub fn get_duties( &self, epoch: Epoch, validator_pubkeys: &[PublicKey], ) -> impl Future, Error = Error> { let client = self.0.clone(); let bulk_request = ValidatorDutiesRequest { epoch, pubkeys: validator_pubkeys .iter() .map(|pubkey| pubkey.clone().into()) .collect(), }; self.url("duties") .into_future() .and_then(move |url| client.json_post::<_>(url, bulk_request)) .and_then(|response| error_for_status(response).map_err(Error::from)) .and_then(|mut success| success.json().map_err(Error::from)) } /// Posts a block to the beacon node, expecting it to verify it and publish it to the network. pub fn publish_block( &self, block: SignedBeaconBlock, ) -> impl Future { let client = self.0.clone(); self.url("block") .into_future() .and_then(move |url| client.json_post::<_>(url, block)) .and_then(|mut response| { response .text() .map(|text| (response, text)) .map_err(Error::from) }) .and_then(|(response, text)| match response.status() { StatusCode::OK => Ok(PublishStatus::Valid), StatusCode::ACCEPTED => Ok(PublishStatus::Invalid(text)), _ => response .error_for_status() .map_err(Error::from) .map(|_| PublishStatus::Unknown), }) } /// Requests a new (unsigned) block from the beacon node. pub fn produce_block( &self, slot: Slot, randao_reveal: Signature, ) -> impl Future, Error = Error> { let client = self.0.clone(); self.url("block").into_future().and_then(move |url| { client.json_get::>( url, vec![ ("slot".into(), format!("{}", slot.as_u64())), ("randao_reveal".into(), as_ssz_hex_string(&randao_reveal)), ], ) }) } /// Subscribes a list of validators to particular slots for attestation production/publication. pub fn subscribe( &self, subscriptions: Vec, ) -> impl Future { let client = self.0.clone(); self.url("subscribe") .into_future() .and_then(move |url| client.json_post::<_>(url, subscriptions)) .and_then(|mut response| { response .text() .map(|text| (response, text)) .map_err(Error::from) }) .and_then(|(response, text)| match response.status() { StatusCode::OK => Ok(PublishStatus::Valid), StatusCode::ACCEPTED => Ok(PublishStatus::Invalid(text)), _ => response .error_for_status() .map_err(Error::from) .map(|_| PublishStatus::Unknown), }) } } /// Provides the functions on the `/beacon` endpoint of the node. #[derive(Clone)] pub struct Beacon(HttpClient); impl Beacon { fn url(&self, path: &str) -> Result { self.0 .url("beacon/") .and_then(move |url| url.join(path).map_err(Error::from)) .map_err(Into::into) } /// Returns the genesis time. pub fn get_genesis_time(&self) -> impl Future { let client = self.0.clone(); self.url("genesis_time") .into_future() .and_then(move |url| client.json_get(url, vec![])) } /// Returns the genesis validators root. pub fn get_genesis_validators_root(&self) -> impl Future { let client = self.0.clone(); self.url("genesis_validators_root") .into_future() .and_then(move |url| client.json_get(url, vec![])) } /// Returns the fork at the head of the beacon chain. pub fn get_fork(&self) -> impl Future { let client = self.0.clone(); self.url("fork") .into_future() .and_then(move |url| client.json_get(url, vec![])) } /// Returns info about the head of the canonical beacon chain. pub fn get_head(&self) -> impl Future { let client = self.0.clone(); self.url("head") .into_future() .and_then(move |url| client.json_get::(url, vec![])) } /// Returns the set of known beacon chain head blocks. One of these will be the canonical head. pub fn get_heads(&self) -> impl Future, Error = Error> { let client = self.0.clone(); self.url("heads") .into_future() .and_then(move |url| client.json_get(url, vec![])) } /// Returns the block and block root at the given slot. pub fn get_block_by_slot( &self, slot: Slot, ) -> impl Future, Hash256), Error = Error> { self.get_block("slot".to_string(), format!("{}", slot.as_u64())) } /// Returns the block and block root at the given root. pub fn get_block_by_root( &self, root: Hash256, ) -> impl Future, Hash256), Error = Error> { self.get_block("root".to_string(), root_as_string(root)) } /// Returns the block and block root at the given slot. fn get_block( &self, query_key: String, query_param: String, ) -> impl Future, Hash256), Error = Error> { let client = self.0.clone(); self.url("block") .into_future() .and_then(move |url| { client.json_get::>(url, vec![(query_key, query_param)]) }) .map(|response| (response.beacon_block, response.root)) } /// Returns the state and state root at the given slot. pub fn get_state_by_slot( &self, slot: Slot, ) -> impl Future, Hash256), Error = Error> { self.get_state("slot".to_string(), format!("{}", slot.as_u64())) } /// Returns the state and state root at the given root. pub fn get_state_by_root( &self, root: Hash256, ) -> impl Future, Hash256), Error = Error> { self.get_state("root".to_string(), root_as_string(root)) } /// Returns the root of the state at the given slot. pub fn get_state_root(&self, slot: Slot) -> impl Future { let client = self.0.clone(); self.url("state_root").into_future().and_then(move |url| { client.json_get(url, vec![("slot".into(), format!("{}", slot.as_u64()))]) }) } /// Returns the root of the block at the given slot. pub fn get_block_root(&self, slot: Slot) -> impl Future { let client = self.0.clone(); self.url("block_root").into_future().and_then(move |url| { client.json_get(url, vec![("slot".into(), format!("{}", slot.as_u64()))]) }) } /// Returns the state and state root at the given slot. fn get_state( &self, query_key: String, query_param: String, ) -> impl Future, Hash256), Error = Error> { let client = self.0.clone(); self.url("state") .into_future() .and_then(move |url| { client.json_get::>(url, vec![(query_key, query_param)]) }) .map(|response| (response.beacon_state, response.root)) } /// Returns the block and block root at the given slot. /// /// If `state_root` is `Some`, the query will use the given state instead of the default /// canonical head state. pub fn get_validators( &self, validator_pubkeys: Vec, state_root: Option, ) -> impl Future, Error = Error> { let client = self.0.clone(); let bulk_request = ValidatorRequest { state_root, pubkeys: validator_pubkeys .iter() .map(|pubkey| pubkey.clone().into()) .collect(), }; self.url("validators") .into_future() .and_then(move |url| client.json_post::<_>(url, bulk_request)) .and_then(|response| error_for_status(response).map_err(Error::from)) .and_then(|mut success| success.json().map_err(Error::from)) } /// Returns all validators. /// /// If `state_root` is `Some`, the query will use the given state instead of the default /// canonical head state. pub fn get_all_validators( &self, state_root: Option, ) -> impl Future, Error = Error> { let client = self.0.clone(); let query_params = if let Some(state_root) = state_root { vec![("state_root".into(), root_as_string(state_root))] } else { vec![] }; self.url("validators/all") .into_future() .and_then(move |url| client.json_get(url, query_params)) } /// Returns the active validators. /// /// If `state_root` is `Some`, the query will use the given state instead of the default /// canonical head state. pub fn get_active_validators( &self, state_root: Option, ) -> impl Future, Error = Error> { let client = self.0.clone(); let query_params = if let Some(state_root) = state_root { vec![("state_root".into(), root_as_string(state_root))] } else { vec![] }; self.url("validators/active") .into_future() .and_then(move |url| client.json_get(url, query_params)) } /// Returns committees at the given epoch. pub fn get_committees( &self, epoch: Epoch, ) -> impl Future, Error = Error> { let client = self.0.clone(); self.url("committees").into_future().and_then(move |url| { client.json_get(url, vec![("epoch".into(), format!("{}", epoch.as_u64()))]) }) } pub fn proposer_slashing( &self, proposer_slashing: ProposerSlashing, ) -> impl Future { let client = self.0.clone(); self.url("proposer_slashing") .into_future() .and_then(move |url| { client .json_post::<_>(url, proposer_slashing) .and_then(|response| error_for_status(response).map_err(Error::from)) .and_then(|mut success| success.json().map_err(Error::from)) }) } pub fn attester_slashing( &self, attester_slashing: AttesterSlashing, ) -> impl Future { let client = self.0.clone(); self.url("attester_slashing") .into_future() .and_then(move |url| { client .json_post::<_>(url, attester_slashing) .and_then(|response| error_for_status(response).map_err(Error::from)) .and_then(|mut success| success.json().map_err(Error::from)) }) } } /// Provides the functions on the `/spec` endpoint of the node. #[derive(Clone)] pub struct Spec(HttpClient); impl Spec { fn url(&self, path: &str) -> Result { self.0 .url("spec/") .and_then(move |url| url.join(path).map_err(Error::from)) .map_err(Into::into) } pub fn get_eth2_config(&self) -> impl Future { let client = self.0.clone(); self.url("eth2_config") .into_future() .and_then(move |url| client.json_get(url, vec![])) } } /// Provides the functions on the `/node` endpoint of the node. #[derive(Clone)] pub struct Node(HttpClient); impl Node { fn url(&self, path: &str) -> Result { self.0 .url("node/") .and_then(move |url| url.join(path).map_err(Error::from)) .map_err(Into::into) } pub fn get_version(&self) -> impl Future { let client = self.0.clone(); self.url("version") .into_future() .and_then(move |url| client.json_get(url, vec![])) } } /// Provides the functions on the `/advanced` endpoint of the node. #[derive(Clone)] pub struct Advanced(HttpClient); impl Advanced { fn url(&self, path: &str) -> Result { self.0 .url("advanced/") .and_then(move |url| url.join(path).map_err(Error::from)) .map_err(Into::into) } /// Gets the core `ProtoArray` struct from the node. pub fn get_fork_choice(&self) -> impl Future { let client = self.0.clone(); self.url("fork_choice") .into_future() .and_then(move |url| client.json_get(url, vec![])) } /// Gets the core `PersistedOperationPool` struct from the node. pub fn get_operation_pool( &self, ) -> impl Future, Error = Error> { let client = self.0.clone(); self.url("operation_pool") .into_future() .and_then(move |url| client.json_get(url, vec![])) } } /// Provides the functions on the `/consensus` endpoint of the node. #[derive(Clone)] pub struct Consensus(HttpClient); impl Consensus { fn url(&self, path: &str) -> Result { self.0 .url("consensus/") .and_then(move |url| url.join(path).map_err(Error::from)) .map_err(Into::into) } /// Gets a `IndividualVote` for each of the given `pubkeys`. pub fn get_individual_votes( &self, epoch: Epoch, pubkeys: Vec, ) -> impl Future { let client = self.0.clone(); let req_body = IndividualVotesRequest { epoch, pubkeys }; self.url("individual_votes") .into_future() .and_then(move |url| client.json_post::<_>(url, req_body)) .and_then(|response| error_for_status(response).map_err(Error::from)) .and_then(|mut success| success.json().map_err(Error::from)) } /// Gets a `VoteCount` for the given `epoch`. pub fn get_vote_count( &self, epoch: Epoch, ) -> impl Future { let client = self.0.clone(); let query_params = vec![("epoch".into(), format!("{}", epoch.as_u64()))]; self.url("vote_count") .into_future() .and_then(move |url| client.json_get(url, query_params)) } } #[derive(Deserialize)] #[serde(bound = "T: EthSpec")] pub struct BlockResponse { pub beacon_block: SignedBeaconBlock, pub root: Hash256, } #[derive(Deserialize)] #[serde(bound = "T: EthSpec")] pub struct StateResponse { pub beacon_state: BeaconState, pub root: Hash256, } fn root_as_string(root: Hash256) -> String { format!("0x{:?}", root) } fn as_ssz_hex_string(item: &T) -> String { format!("0x{}", hex::encode(item.as_ssz_bytes())) } impl From for Error { fn from(e: reqwest::Error) -> Error { Error::ReqwestError(e) } } impl From for Error { fn from(e: url::ParseError) -> Error { Error::UrlParseError(e) } } impl From for Error { fn from(e: serde_json::Error) -> Error { Error::SerdeJsonError(e) } }