Port remote_beacon_node to stable futures

This commit is contained in:
pawanjay176
2020-04-28 17:37:53 +05:30
parent 5227d321f6
commit 5266d214f7
3 changed files with 1096 additions and 1235 deletions

1879
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@@ -7,10 +7,10 @@ edition = "2018"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies] [dependencies]
reqwest = "0.9" reqwest = { version = "0.10", features = ["json"] }
url = "1.2" url = "1.2"
serde = "1.0" serde = "1.0"
futures = "0.1.25" futures = "0.3.4"
types = { path = "../../../eth2/types" } types = { path = "../../../eth2/types" }
rest_types = { path = "../rest_types" } rest_types = { path = "../rest_types" }
hex = "0.3" hex = "0.3"

View File

@@ -4,11 +4,7 @@
//! Presently, this is only used for testing but it _could_ become a user-facing library. //! Presently, this is only used for testing but it _could_ become a user-facing library.
use eth2_config::Eth2Config; use eth2_config::Eth2Config;
use futures::{future, Future, IntoFuture}; use reqwest::{Client, ClientBuilder, Response, StatusCode};
use reqwest::{
r#async::{Client, ClientBuilder, Response},
StatusCode,
};
use serde::{de::DeserializeOwned, Deserialize, Serialize}; use serde::{de::DeserializeOwned, Deserialize, Serialize};
use ssz::Encode; use ssz::Encode;
use std::marker::PhantomData; use std::marker::PhantomData;
@@ -119,33 +115,33 @@ impl<E: EthSpec> HttpClient<E> {
self.url.join(path).map_err(|e| e.into()) self.url.join(path).map_err(|e| e.into())
} }
pub fn json_post<T: Serialize>( pub async fn json_post<T: Serialize>(&self, url: Url, body: T) -> Result<Response, Error> {
&self,
url: Url,
body: T,
) -> impl Future<Item = Response, Error = Error> {
self.client self.client
.post(&url.to_string()) .post(&url.to_string())
.json(&body) .json(&body)
.send() .send()
.await
.map_err(Error::from) .map_err(Error::from)
} }
pub fn json_get<T: DeserializeOwned>( pub async fn json_get<T: DeserializeOwned>(
&self, &self,
mut url: Url, mut url: Url,
query_pairs: Vec<(String, String)>, query_pairs: Vec<(String, String)>,
) -> impl Future<Item = T, Error = Error> { ) -> Result<T, Error> {
query_pairs.into_iter().for_each(|(key, param)| { query_pairs.into_iter().for_each(|(key, param)| {
url.query_pairs_mut().append_pair(&key, &param); url.query_pairs_mut().append_pair(&key, &param);
}); });
self.client let response = self
.client
.get(&url.to_string()) .get(&url.to_string())
.send() .send()
.map_err(Error::from) .await
.and_then(|response| error_for_status(response).map_err(Error::from)) .map_err(Error::from)?;
.and_then(|mut success| success.json::<T>().map_err(Error::from))
let success = error_for_status(response).await.map_err(Error::from)?;
success.json::<T>().await.map_err(Error::from)
} }
} }
@@ -153,18 +149,17 @@ impl<E: EthSpec> HttpClient<E> {
/// ///
/// Distinct from `Response::error_for_status` because it includes the body of the response as /// Distinct from `Response::error_for_status` because it includes the body of the response as
/// text. This ensures the error message from the server is not discarded. /// text. This ensures the error message from the server is not discarded.
fn error_for_status( async fn error_for_status(response: Response) -> Result<Response, Error> {
mut response: Response,
) -> Box<dyn Future<Item = Response, Error = Error> + Send> {
let status = response.status(); let status = response.status();
if status.is_success() { if status.is_success() {
Box::new(future::ok(response)) return Ok(response);
} else { } else {
Box::new(response.text().then(move |text_result| match text_result { let text_result = response.text().await;
match text_result {
Err(e) => Err(Error::ReqwestError(e)), Err(e) => Err(Error::ReqwestError(e)),
Ok(body) => Err(Error::DidNotSucceed { status, body }), Ok(body) => Err(Error::DidNotSucceed { status, body }),
})) }
} }
} }
@@ -199,94 +194,86 @@ impl<E: EthSpec> Validator<E> {
} }
/// Produces an unsigned attestation. /// Produces an unsigned attestation.
pub fn produce_attestation( pub async fn produce_attestation(
&self, &self,
slot: Slot, slot: Slot,
committee_index: CommitteeIndex, committee_index: CommitteeIndex,
) -> impl Future<Item = Attestation<E>, Error = Error> { ) -> Result<Attestation<E>, Error> {
let query_params = vec![ let query_params = vec![
("slot".into(), format!("{}", slot)), ("slot".into(), format!("{}", slot)),
("committee_index".into(), format!("{}", committee_index)), ("committee_index".into(), format!("{}", committee_index)),
]; ];
let client = self.0.clone(); let client = self.0.clone();
self.url("attestation") let url = self.url("attestation")?;
.into_future() client.json_get(url, query_params).await
.and_then(move |url| client.json_get(url, query_params))
} }
/// Produces an aggregate attestation. /// Produces an aggregate attestation.
pub fn produce_aggregate_attestation( pub async fn produce_aggregate_attestation(
&self, &self,
attestation_data: &AttestationData, attestation_data: &AttestationData,
) -> impl Future<Item = Attestation<E>, Error = Error> { ) -> Result<Attestation<E>, Error> {
let query_params = vec![( let query_params = vec![(
"attestation_data".into(), "attestation_data".into(),
as_ssz_hex_string(attestation_data), as_ssz_hex_string(attestation_data),
)]; )];
let client = self.0.clone(); let client = self.0.clone();
self.url("aggregate_attestation") let url = self.url("aggregate_attestation")?;
.into_future() client.json_get(url, query_params).await
.and_then(move |url| client.json_get(url, query_params))
} }
/// Posts a list of attestations to the beacon node, expecting it to verify it and publish it to the network. /// Posts a list of attestations to the beacon node, expecting it to verify it and publish it to the network.
pub fn publish_attestations( pub async fn publish_attestations(
&self, &self,
attestation: Vec<Attestation<E>>, attestation: Vec<Attestation<E>>,
) -> impl Future<Item = PublishStatus, Error = Error> { ) -> Result<PublishStatus, Error> {
let client = self.0.clone(); let client = self.0.clone();
self.url("attestations") let url = self.url("attestations")?;
.into_future() let response = client.json_post::<_>(url, attestation).await?;
.and_then(move |url| client.json_post::<_>(url, attestation))
.and_then(|mut response| { match response.status() {
response
.text()
.map(|text| (response, text))
.map_err(Error::from)
})
.and_then(|(response, text)| match response.status() {
StatusCode::OK => Ok(PublishStatus::Valid), StatusCode::OK => Ok(PublishStatus::Valid),
StatusCode::ACCEPTED => Ok(PublishStatus::Invalid(text)), StatusCode::ACCEPTED => Ok(PublishStatus::Invalid(
response.text().await.map_err(Error::from)?,
)),
_ => response _ => response
.error_for_status() .error_for_status()
.map_err(Error::from) .map_err(Error::from)
.map(|_| PublishStatus::Unknown), .map(|_| PublishStatus::Unknown),
}) }
} }
/// Posts a list of signed aggregates and proofs to the beacon node, expecting it to verify it and publish it to the network. /// Posts a list of signed aggregates and proofs to the beacon node, expecting it to verify it and publish it to the network.
pub fn publish_aggregate_and_proof( pub async fn publish_aggregate_and_proof(
&self, &self,
signed_aggregate_and_proofs: Vec<SignedAggregateAndProof<E>>, signed_aggregate_and_proofs: Vec<SignedAggregateAndProof<E>>,
) -> impl Future<Item = PublishStatus, Error = Error> { ) -> Result<PublishStatus, Error> {
let client = self.0.clone(); let client = self.0.clone();
self.url("aggregate_and_proofs") let url = self.url("aggregate_and_proofs")?;
.into_future() let response = client
.and_then(move |url| client.json_post::<_>(url, signed_aggregate_and_proofs)) .json_post::<_>(url, signed_aggregate_and_proofs)
.and_then(|mut response| { .await?;
response
.text() match response.status() {
.map(|text| (response, text))
.map_err(Error::from)
})
.and_then(|(response, text)| match response.status() {
StatusCode::OK => Ok(PublishStatus::Valid), StatusCode::OK => Ok(PublishStatus::Valid),
StatusCode::ACCEPTED => Ok(PublishStatus::Invalid(text)), StatusCode::ACCEPTED => Ok(PublishStatus::Invalid(
response.text().await.map_err(Error::from)?,
)),
_ => response _ => response
.error_for_status() .error_for_status()
.map_err(Error::from) .map_err(Error::from)
.map(|_| PublishStatus::Unknown), .map(|_| PublishStatus::Unknown),
}) }
} }
/// Returns the duties required of the given validator pubkeys in the given epoch. /// Returns the duties required of the given validator pubkeys in the given epoch.
pub fn get_duties( pub async fn get_duties(
&self, &self,
epoch: Epoch, epoch: Epoch,
validator_pubkeys: &[PublicKey], validator_pubkeys: &[PublicKey],
) -> impl Future<Item = Vec<ValidatorDutyBytes>, Error = Error> { ) -> Result<Vec<ValidatorDutyBytes>, Error> {
let client = self.0.clone(); let client = self.0.clone();
let bulk_request = ValidatorDutiesRequest { let bulk_request = ValidatorDutiesRequest {
@@ -297,79 +284,68 @@ impl<E: EthSpec> Validator<E> {
.collect(), .collect(),
}; };
self.url("duties") let url = self.url("duties")?;
.into_future() let response = client.json_post::<_>(url, bulk_request).await?;
.and_then(move |url| client.json_post::<_>(url, bulk_request)) let success = error_for_status(response).await.map_err(Error::from)?;
.and_then(|response| error_for_status(response).map_err(Error::from)) success.json().await.map_err(Error::from)
.and_then(|mut success| success.json().map_err(Error::from))
} }
/// Posts a block to the beacon node, expecting it to verify it and publish it to the network. /// Posts a block to the beacon node, expecting it to verify it and publish it to the network.
pub fn publish_block( pub async fn publish_block(&self, block: SignedBeaconBlock<E>) -> Result<PublishStatus, Error> {
&self,
block: SignedBeaconBlock<E>,
) -> impl Future<Item = PublishStatus, Error = Error> {
let client = self.0.clone(); let client = self.0.clone();
self.url("block") let url = self.url("block")?;
.into_future() let response = client.json_post::<_>(url, block).await?;
.and_then(move |url| client.json_post::<_>(url, block))
.and_then(|mut response| { match response.status() {
response
.text()
.map(|text| (response, text))
.map_err(Error::from)
})
.and_then(|(response, text)| match response.status() {
StatusCode::OK => Ok(PublishStatus::Valid), StatusCode::OK => Ok(PublishStatus::Valid),
StatusCode::ACCEPTED => Ok(PublishStatus::Invalid(text)), StatusCode::ACCEPTED => Ok(PublishStatus::Invalid(
response.text().await.map_err(Error::from)?,
)),
_ => response _ => response
.error_for_status() .error_for_status()
.map_err(Error::from) .map_err(Error::from)
.map(|_| PublishStatus::Unknown), .map(|_| PublishStatus::Unknown),
}) }
} }
/// Requests a new (unsigned) block from the beacon node. /// Requests a new (unsigned) block from the beacon node.
pub fn produce_block( pub async fn produce_block(
&self, &self,
slot: Slot, slot: Slot,
randao_reveal: Signature, randao_reveal: Signature,
) -> impl Future<Item = BeaconBlock<E>, Error = Error> { ) -> Result<BeaconBlock<E>, Error> {
let client = self.0.clone(); let client = self.0.clone();
self.url("block").into_future().and_then(move |url| { let url = self.url("block")?;
client.json_get::<BeaconBlock<E>>( client
.json_get::<BeaconBlock<E>>(
url, url,
vec![ vec![
("slot".into(), format!("{}", slot.as_u64())), ("slot".into(), format!("{}", slot.as_u64())),
("randao_reveal".into(), as_ssz_hex_string(&randao_reveal)), ("randao_reveal".into(), as_ssz_hex_string(&randao_reveal)),
], ],
) )
}) .await
} }
/// Subscribes a list of validators to particular slots for attestation production/publication. /// Subscribes a list of validators to particular slots for attestation production/publication.
pub fn subscribe( pub async fn subscribe(
&self, &self,
subscriptions: Vec<ValidatorSubscription>, subscriptions: Vec<ValidatorSubscription>,
) -> impl Future<Item = PublishStatus, Error = Error> { ) -> Result<PublishStatus, Error> {
let client = self.0.clone(); let client = self.0.clone();
self.url("subscribe") let url = self.url("subscribe")?;
.into_future() let response = client.json_post::<_>(url, subscriptions).await?;
.and_then(move |url| client.json_post::<_>(url, subscriptions))
.and_then(|mut response| { match response.status() {
response
.text()
.map(|text| (response, text))
.map_err(Error::from)
})
.and_then(|(response, text)| match response.status() {
StatusCode::OK => Ok(PublishStatus::Valid), StatusCode::OK => Ok(PublishStatus::Valid),
StatusCode::ACCEPTED => Ok(PublishStatus::Invalid(text)), StatusCode::ACCEPTED => Ok(PublishStatus::Invalid(
response.text().await.map_err(Error::from)?,
)),
_ => response _ => response
.error_for_status() .error_for_status()
.map_err(Error::from) .map_err(Error::from)
.map(|_| PublishStatus::Unknown), .map(|_| PublishStatus::Unknown),
}) }
} }
} }
@@ -386,120 +362,116 @@ impl<E: EthSpec> Beacon<E> {
} }
/// Returns the genesis time. /// Returns the genesis time.
pub fn get_genesis_time(&self) -> impl Future<Item = u64, Error = Error> { pub async fn get_genesis_time(&self) -> Result<u64, Error> {
let client = self.0.clone(); let client = self.0.clone();
self.url("genesis_time") let url = self.url("genesis_time")?;
.into_future() client.json_get(url, vec![]).await
.and_then(move |url| client.json_get(url, vec![]))
} }
/// Returns the genesis validators root. /// Returns the genesis validators root.
pub fn get_genesis_validators_root(&self) -> impl Future<Item = Hash256, Error = Error> { pub async fn get_genesis_validators_root(&self) -> Result<Hash256, Error> {
let client = self.0.clone(); let client = self.0.clone();
self.url("genesis_validators_root") let url = self.url("genesis_validators_root")?;
.into_future() client.json_get(url, vec![]).await
.and_then(move |url| client.json_get(url, vec![]))
} }
/// Returns the fork at the head of the beacon chain. /// Returns the fork at the head of the beacon chain.
pub fn get_fork(&self) -> impl Future<Item = Fork, Error = Error> { pub async fn get_fork(&self) -> Result<Fork, Error> {
let client = self.0.clone(); let client = self.0.clone();
self.url("fork") let url = self.url("fork")?;
.into_future() client.json_get(url, vec![]).await
.and_then(move |url| client.json_get(url, vec![]))
} }
/// Returns info about the head of the canonical beacon chain. /// Returns info about the head of the canonical beacon chain.
pub fn get_head(&self) -> impl Future<Item = CanonicalHeadResponse, Error = Error> { pub async fn get_head(&self) -> Result<CanonicalHeadResponse, Error> {
let client = self.0.clone(); let client = self.0.clone();
self.url("head") let url = self.url("head")?;
.into_future() client.json_get::<CanonicalHeadResponse>(url, vec![]).await
.and_then(move |url| client.json_get::<CanonicalHeadResponse>(url, vec![]))
} }
/// Returns the set of known beacon chain head blocks. One of these will be the canonical head. /// Returns the set of known beacon chain head blocks. One of these will be the canonical head.
pub fn get_heads(&self) -> impl Future<Item = Vec<HeadBeaconBlock>, Error = Error> { pub async fn get_heads(&self) -> Result<Vec<HeadBeaconBlock>, Error> {
let client = self.0.clone(); let client = self.0.clone();
self.url("heads") let url = self.url("heads")?;
.into_future() client.json_get(url, vec![]).await
.and_then(move |url| client.json_get(url, vec![]))
} }
/// Returns the block and block root at the given slot. /// Returns the block and block root at the given slot.
pub fn get_block_by_slot( pub async fn get_block_by_slot(
&self, &self,
slot: Slot, slot: Slot,
) -> impl Future<Item = (SignedBeaconBlock<E>, Hash256), Error = Error> { ) -> Result<(SignedBeaconBlock<E>, Hash256), Error> {
self.get_block("slot".to_string(), format!("{}", slot.as_u64())) self.get_block("slot".to_string(), format!("{}", slot.as_u64()))
.await
} }
/// Returns the block and block root at the given root. /// Returns the block and block root at the given root.
pub fn get_block_by_root( pub async fn get_block_by_root(
&self, &self,
root: Hash256, root: Hash256,
) -> impl Future<Item = (SignedBeaconBlock<E>, Hash256), Error = Error> { ) -> Result<(SignedBeaconBlock<E>, Hash256), Error> {
self.get_block("root".to_string(), root_as_string(root)) self.get_block("root".to_string(), root_as_string(root))
.await
} }
/// Returns the block and block root at the given slot. /// Returns the block and block root at the given slot.
fn get_block( async fn get_block(
&self, &self,
query_key: String, query_key: String,
query_param: String, query_param: String,
) -> impl Future<Item = (SignedBeaconBlock<E>, Hash256), Error = Error> { ) -> Result<(SignedBeaconBlock<E>, Hash256), Error> {
let client = self.0.clone(); let client = self.0.clone();
self.url("block") let url = self.url("block")?;
.into_future() client
.and_then(move |url| { .json_get::<BlockResponse<E>>(url, vec![(query_key, query_param)])
client.json_get::<BlockResponse<E>>(url, vec![(query_key, query_param)]) .await
})
.map(|response| (response.beacon_block, response.root)) .map(|response| (response.beacon_block, response.root))
} }
/// Returns the state and state root at the given slot. /// Returns the state and state root at the given slot.
pub fn get_state_by_slot( pub async fn get_state_by_slot(&self, slot: Slot) -> Result<(BeaconState<E>, Hash256), Error> {
&self,
slot: Slot,
) -> impl Future<Item = (BeaconState<E>, Hash256), Error = Error> {
self.get_state("slot".to_string(), format!("{}", slot.as_u64())) self.get_state("slot".to_string(), format!("{}", slot.as_u64()))
.await
} }
/// Returns the state and state root at the given root. /// Returns the state and state root at the given root.
pub fn get_state_by_root( pub async fn get_state_by_root(
&self, &self,
root: Hash256, root: Hash256,
) -> impl Future<Item = (BeaconState<E>, Hash256), Error = Error> { ) -> Result<(BeaconState<E>, Hash256), Error> {
self.get_state("root".to_string(), root_as_string(root)) self.get_state("root".to_string(), root_as_string(root))
.await
} }
/// Returns the root of the state at the given slot. /// Returns the root of the state at the given slot.
pub fn get_state_root(&self, slot: Slot) -> impl Future<Item = Hash256, Error = Error> { pub async fn get_state_root(&self, slot: Slot) -> Result<Hash256, Error> {
let client = self.0.clone(); let client = self.0.clone();
self.url("state_root").into_future().and_then(move |url| { let url = self.url("state_root")?;
client.json_get(url, vec![("slot".into(), format!("{}", slot.as_u64()))]) client
}) .json_get(url, vec![("slot".into(), format!("{}", slot.as_u64()))])
.await
} }
/// Returns the root of the block at the given slot. /// Returns the root of the block at the given slot.
pub fn get_block_root(&self, slot: Slot) -> impl Future<Item = Hash256, Error = Error> { pub async fn get_block_root(&self, slot: Slot) -> Result<Hash256, Error> {
let client = self.0.clone(); let client = self.0.clone();
self.url("block_root").into_future().and_then(move |url| { let url = self.url("block_root")?;
client.json_get(url, vec![("slot".into(), format!("{}", slot.as_u64()))]) client
}) .json_get(url, vec![("slot".into(), format!("{}", slot.as_u64()))])
.await
} }
/// Returns the state and state root at the given slot. /// Returns the state and state root at the given slot.
fn get_state( async fn get_state(
&self, &self,
query_key: String, query_key: String,
query_param: String, query_param: String,
) -> impl Future<Item = (BeaconState<E>, Hash256), Error = Error> { ) -> Result<(BeaconState<E>, Hash256), Error> {
let client = self.0.clone(); let client = self.0.clone();
self.url("state") let url = self.url("state")?;
.into_future() client
.and_then(move |url| { .json_get::<StateResponse<E>>(url, vec![(query_key, query_param)])
client.json_get::<StateResponse<E>>(url, vec![(query_key, query_param)]) .await
})
.map(|response| (response.beacon_state, response.root)) .map(|response| (response.beacon_state, response.root))
} }
@@ -507,11 +479,11 @@ impl<E: EthSpec> Beacon<E> {
/// ///
/// If `state_root` is `Some`, the query will use the given state instead of the default /// If `state_root` is `Some`, the query will use the given state instead of the default
/// canonical head state. /// canonical head state.
pub fn get_validators( pub async fn get_validators(
&self, &self,
validator_pubkeys: Vec<PublicKey>, validator_pubkeys: Vec<PublicKey>,
state_root: Option<Hash256>, state_root: Option<Hash256>,
) -> impl Future<Item = Vec<ValidatorResponse>, Error = Error> { ) -> Result<Vec<ValidatorResponse>, Error> {
let client = self.0.clone(); let client = self.0.clone();
let bulk_request = ValidatorRequest { let bulk_request = ValidatorRequest {
@@ -522,21 +494,20 @@ impl<E: EthSpec> Beacon<E> {
.collect(), .collect(),
}; };
self.url("validators") let url = self.url("validators")?;
.into_future() let response = client.json_post::<_>(url, bulk_request).await?;
.and_then(move |url| client.json_post::<_>(url, bulk_request)) let success = error_for_status(response).await.map_err(Error::from)?;
.and_then(|response| error_for_status(response).map_err(Error::from)) success.json().await.map_err(Error::from)
.and_then(|mut success| success.json().map_err(Error::from))
} }
/// Returns all validators. /// Returns all validators.
/// ///
/// If `state_root` is `Some`, the query will use the given state instead of the default /// If `state_root` is `Some`, the query will use the given state instead of the default
/// canonical head state. /// canonical head state.
pub fn get_all_validators( pub async fn get_all_validators(
&self, &self,
state_root: Option<Hash256>, state_root: Option<Hash256>,
) -> impl Future<Item = Vec<ValidatorResponse>, Error = Error> { ) -> Result<Vec<ValidatorResponse>, Error> {
let client = self.0.clone(); let client = self.0.clone();
let query_params = if let Some(state_root) = state_root { let query_params = if let Some(state_root) = state_root {
@@ -545,19 +516,18 @@ impl<E: EthSpec> Beacon<E> {
vec![] vec![]
}; };
self.url("validators/all") let url = self.url("validators/all")?;
.into_future() client.json_get(url, query_params).await
.and_then(move |url| client.json_get(url, query_params))
} }
/// Returns the active validators. /// Returns the active validators.
/// ///
/// If `state_root` is `Some`, the query will use the given state instead of the default /// If `state_root` is `Some`, the query will use the given state instead of the default
/// canonical head state. /// canonical head state.
pub fn get_active_validators( pub async fn get_active_validators(
&self, &self,
state_root: Option<Hash256>, state_root: Option<Hash256>,
) -> impl Future<Item = Vec<ValidatorResponse>, Error = Error> { ) -> Result<Vec<ValidatorResponse>, Error> {
let client = self.0.clone(); let client = self.0.clone();
let query_params = if let Some(state_root) = state_root { let query_params = if let Some(state_root) = state_root {
@@ -566,53 +536,42 @@ impl<E: EthSpec> Beacon<E> {
vec![] vec![]
}; };
self.url("validators/active") let url = self.url("validators/active")?;
.into_future() client.json_get(url, query_params).await
.and_then(move |url| client.json_get(url, query_params))
} }
/// Returns committees at the given epoch. /// Returns committees at the given epoch.
pub fn get_committees( pub async fn get_committees(&self, epoch: Epoch) -> Result<Vec<Committee>, Error> {
&self,
epoch: Epoch,
) -> impl Future<Item = Vec<Committee>, Error = Error> {
let client = self.0.clone(); let client = self.0.clone();
self.url("committees").into_future().and_then(move |url| { let url = self.url("committees")?;
client.json_get(url, vec![("epoch".into(), format!("{}", epoch.as_u64()))]) client
}) .json_get(url, vec![("epoch".into(), format!("{}", epoch.as_u64()))])
.await
} }
pub fn proposer_slashing( pub async fn proposer_slashing(
&self, &self,
proposer_slashing: ProposerSlashing, proposer_slashing: ProposerSlashing,
) -> impl Future<Item = bool, Error = Error> { ) -> Result<bool, Error> {
let client = self.0.clone(); let client = self.0.clone();
self.url("proposer_slashing") let url = self.url("proposer_slashing")?;
.into_future() let response = client.json_post::<_>(url, proposer_slashing).await?;
.and_then(move |url| { let success = error_for_status(response).await.map_err(Error::from)?;
client success.json().await.map_err(Error::from)
.json_post::<_>(url, proposer_slashing)
.and_then(|response| error_for_status(response).map_err(Error::from))
.and_then(|mut success| success.json().map_err(Error::from))
})
} }
pub fn attester_slashing( pub async fn attester_slashing(
&self, &self,
attester_slashing: AttesterSlashing<E>, attester_slashing: AttesterSlashing<E>,
) -> impl Future<Item = bool, Error = Error> { ) -> Result<bool, Error> {
let client = self.0.clone(); let client = self.0.clone();
self.url("attester_slashing") let url = self.url("attester_slashing")?;
.into_future() let response = client.json_post::<_>(url, attester_slashing).await?;
.and_then(move |url| { let success = error_for_status(response).await.map_err(Error::from)?;
client success.json().await.map_err(Error::from)
.json_post::<_>(url, attester_slashing)
.and_then(|response| error_for_status(response).map_err(Error::from))
.and_then(|mut success| success.json().map_err(Error::from))
})
} }
} }
@@ -628,11 +587,10 @@ impl<E: EthSpec> Spec<E> {
.map_err(Into::into) .map_err(Into::into)
} }
pub fn get_eth2_config(&self) -> impl Future<Item = Eth2Config, Error = Error> { pub async fn get_eth2_config(&self) -> Result<Eth2Config, Error> {
let client = self.0.clone(); let client = self.0.clone();
self.url("eth2_config") let url = self.url("eth2_config")?;
.into_future() client.json_get(url, vec![]).await
.and_then(move |url| client.json_get(url, vec![]))
} }
} }
@@ -648,11 +606,10 @@ impl<E: EthSpec> Node<E> {
.map_err(Into::into) .map_err(Into::into)
} }
pub fn get_version(&self) -> impl Future<Item = String, Error = Error> { pub async fn get_version(&self) -> Result<String, Error> {
let client = self.0.clone(); let client = self.0.clone();
self.url("version") let url = self.url("version")?;
.into_future() client.json_get(url, vec![]).await
.and_then(move |url| client.json_get(url, vec![]))
} }
} }
@@ -669,21 +626,17 @@ impl<E: EthSpec> Advanced<E> {
} }
/// Gets the core `ProtoArray` struct from the node. /// Gets the core `ProtoArray` struct from the node.
pub fn get_fork_choice(&self) -> impl Future<Item = ProtoArray, Error = Error> { pub async fn get_fork_choice(&self) -> Result<ProtoArray, Error> {
let client = self.0.clone(); let client = self.0.clone();
self.url("fork_choice") let url = self.url("fork_choice")?;
.into_future() client.json_get(url, vec![]).await
.and_then(move |url| client.json_get(url, vec![]))
} }
/// Gets the core `PersistedOperationPool` struct from the node. /// Gets the core `PersistedOperationPool` struct from the node.
pub fn get_operation_pool( pub async fn get_operation_pool(&self) -> Result<PersistedOperationPool<E>, Error> {
&self,
) -> impl Future<Item = PersistedOperationPool<E>, Error = Error> {
let client = self.0.clone(); let client = self.0.clone();
self.url("operation_pool") let url = self.url("operation_pool")?;
.into_future() client.json_get(url, vec![]).await
.and_then(move |url| client.json_get(url, vec![]))
} }
} }
@@ -700,31 +653,26 @@ impl<E: EthSpec> Consensus<E> {
} }
/// Gets a `IndividualVote` for each of the given `pubkeys`. /// Gets a `IndividualVote` for each of the given `pubkeys`.
pub fn get_individual_votes( pub async fn get_individual_votes(
&self, &self,
epoch: Epoch, epoch: Epoch,
pubkeys: Vec<PublicKeyBytes>, pubkeys: Vec<PublicKeyBytes>,
) -> impl Future<Item = IndividualVotesResponse, Error = Error> { ) -> Result<IndividualVotesResponse, Error> {
let client = self.0.clone(); let client = self.0.clone();
let req_body = IndividualVotesRequest { epoch, pubkeys }; let req_body = IndividualVotesRequest { epoch, pubkeys };
self.url("individual_votes") let url = self.url("individual_votes")?;
.into_future() let response = client.json_post::<_>(url, req_body).await?;
.and_then(move |url| client.json_post::<_>(url, req_body)) let success = error_for_status(response).await.map_err(Error::from)?;
.and_then(|response| error_for_status(response).map_err(Error::from)) success.json().await.map_err(Error::from)
.and_then(|mut success| success.json().map_err(Error::from))
} }
/// Gets a `VoteCount` for the given `epoch`. /// Gets a `VoteCount` for the given `epoch`.
pub fn get_vote_count( pub async fn get_vote_count(&self, epoch: Epoch) -> Result<IndividualVotesResponse, Error> {
&self,
epoch: Epoch,
) -> impl Future<Item = IndividualVotesResponse, Error = Error> {
let client = self.0.clone(); let client = self.0.clone();
let query_params = vec![("epoch".into(), format!("{}", epoch.as_u64()))]; let query_params = vec![("epoch".into(), format!("{}", epoch.as_u64()))];
self.url("vote_count") let url = self.url("vote_count")?;
.into_future() client.json_get(url, query_params).await
.and_then(move |url| client.json_get(url, query_params))
} }
} }