Port websockets, timer and notifier to stable futures (#1035)

* Fix lcli

* Port timer to stable futures

* Fix timer

* Port websocket_server to stable futures

* Port notifier to stable futures

* Add TODOS

* Port remote_beacon_node to stable futures
This commit is contained in:
Pawan Dhananjay
2020-04-30 11:19:05 +05:30
committed by GitHub
parent 281502396f
commit 85baec87f5
10 changed files with 1228 additions and 1376 deletions

1879
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@@ -2,13 +2,13 @@ use crate::metrics;
use beacon_chain::{BeaconChain, BeaconChainTypes}; use beacon_chain::{BeaconChain, BeaconChainTypes};
use environment::RuntimeContext; use environment::RuntimeContext;
use eth2_libp2p::NetworkGlobals; use eth2_libp2p::NetworkGlobals;
use futures::{Future, Stream}; use futures::{FutureExt, StreamExt, TryFutureExt};
use parking_lot::Mutex; use parking_lot::Mutex;
use slog::{debug, error, info, warn}; use slog::{debug, error, info, warn};
use slot_clock::SlotClock; use slot_clock::SlotClock;
use std::sync::Arc; use std::sync::Arc;
use std::time::{Duration, Instant}; use std::time::Duration;
use tokio::timer::Interval; use tokio::time::{interval_at, Instant};
use types::{EthSpec, Slot}; use types::{EthSpec, Slot};
/// Create a warning log whenever the peer count is at or below this value. /// Create a warning log whenever the peer count is at or below this value.
@@ -32,9 +32,7 @@ pub fn spawn_notifier<T: BeaconChainTypes>(
network: Arc<NetworkGlobals<T::EthSpec>>, network: Arc<NetworkGlobals<T::EthSpec>>,
milliseconds_per_slot: u64, milliseconds_per_slot: u64,
) -> Result<tokio::sync::oneshot::Sender<()>, String> { ) -> Result<tokio::sync::oneshot::Sender<()>, String> {
let log_1 = context.log.clone(); let log = context.log.clone();
let log_2 = context.log.clone();
let log_3 = context.log.clone();
let slot_duration = Duration::from_millis(milliseconds_per_slot); let slot_duration = Duration::from_millis(milliseconds_per_slot);
let duration_to_next_slot = beacon_chain let duration_to_next_slot = beacon_chain
@@ -50,31 +48,40 @@ pub fn spawn_notifier<T: BeaconChainTypes>(
let speedo = Mutex::new(Speedo::default()); let speedo = Mutex::new(Speedo::default());
let interval_future = Interval::new(start_instant, interval_duration) // Note: `interval_at` panics when interval_duration is 0
.map_err( // TODO: `Return type of closure passed to `for_each` is restricted to `Future<Output = ()>`
move |e| error!(log_1, "Slot notifier timer failed"; "error" => format!("{:?}", e)), // Hence, shifting the .then() error logs into the `for_each` closure.
) // Can be solved with `TryStreamExt::try_for_each` if `Interval` implemented `TryStream`.
.for_each(move |_| { // Check if this can be refactored better.
let log = log_2.clone(); let interval_future = interval_at(start_instant, interval_duration).for_each(|_| {
let connected_peer_count = network.connected_peers(); let connected_peer_count = network.connected_peers();
let head_info = beacon_chain.head_info() let head_info = match beacon_chain.head_info() {
.map_err(|e| error!( Ok(head) => head,
Err(e) => {
error!(
log, log,
"Failed to get beacon chain head info"; "Notifier failed to notify, Failed to get beacon chain head info";
"error" => format!("{:?}", e) "error" => format!("{:?}", e)
))?; );
return futures::future::ready(());
}
};
let head_slot = head_info.slot; let head_slot = head_info.slot;
let head_epoch = head_slot.epoch(T::EthSpec::slots_per_epoch()); let head_epoch = head_slot.epoch(T::EthSpec::slots_per_epoch());
let current_slot = beacon_chain.slot().map_err(|e| { let current_slot = match beacon_chain.slot() {
Ok(slot) => slot,
Err(e) => {
error!( error!(
log, log,
"Unable to read current slot"; "Notify failed to notify, Unable to read current slot";
"error" => format!("{:?}", e) "error" => format!("{:?}", e)
) );
})?; return futures::future::ready(());
}
};
let current_epoch = current_slot.epoch(T::EthSpec::slots_per_epoch()); let current_epoch = current_slot.epoch(T::EthSpec::slots_per_epoch());
let finalized_epoch = head_info.finalized_checkpoint.epoch; let finalized_epoch = head_info.finalized_checkpoint.epoch;
let finalized_root = head_info.finalized_checkpoint.root; let finalized_root = head_info.finalized_checkpoint.root;
@@ -83,7 +90,10 @@ pub fn spawn_notifier<T: BeaconChainTypes>(
let mut speedo = speedo.lock(); let mut speedo = speedo.lock();
speedo.observe(head_slot, Instant::now()); speedo.observe(head_slot, Instant::now());
metrics::set_gauge(&metrics::SYNC_SLOTS_PER_SECOND, speedo.slots_per_second().unwrap_or_else(|| 0_f64) as i64); metrics::set_gauge(
&metrics::SYNC_SLOTS_PER_SECOND,
speedo.slots_per_second().unwrap_or_else(|| 0_f64) as i64,
);
// The next two lines take advantage of saturating subtraction on `Slot`. // The next two lines take advantage of saturating subtraction on `Slot`.
let head_distance = current_slot - head_slot; let head_distance = current_slot - head_slot;
@@ -119,13 +129,13 @@ pub fn spawn_notifier<T: BeaconChainTypes>(
"est_time" => estimated_time_pretty(speedo.estimated_time_till_slot(current_slot)), "est_time" => estimated_time_pretty(speedo.estimated_time_till_slot(current_slot)),
); );
return Ok(()); return futures::future::ready(());
}; };
macro_rules! not_quite_synced_log { macro_rules! not_quite_synced_log {
($message: expr) => { ($message: expr) => {
info!( info!(
log_2, log,
$message; $message;
"peers" => peer_count_pretty(connected_peer_count), "peers" => peer_count_pretty(connected_peer_count),
"finalized_root" => format!("{}", finalized_root), "finalized_root" => format!("{}", finalized_root),
@@ -142,7 +152,7 @@ pub fn spawn_notifier<T: BeaconChainTypes>(
not_quite_synced_log!("Synced to current epoch") not_quite_synced_log!("Synced to current epoch")
} else { } else {
info!( info!(
log_2, log,
"Synced"; "Synced";
"peers" => peer_count_pretty(connected_peer_count), "peers" => peer_count_pretty(connected_peer_count),
"finalized_root" => format!("{}", finalized_root), "finalized_root" => format!("{}", finalized_root),
@@ -152,25 +162,15 @@ pub fn spawn_notifier<T: BeaconChainTypes>(
); );
}; };
Ok(()) futures::future::ready(())
}) });
.then(move |result| {
match result {
Ok(()) => Ok(()),
Err(e) => {
error!(
log_3,
"Notifier failed to notify";
"error" => format!("{:?}", e)
);
Ok(())
} } });
let (exit_signal, exit) = tokio::sync::oneshot::channel(); let (exit_signal, exit) = tokio::sync::oneshot::channel();
context let future = futures::future::select(interval_future, exit.map_err(|_| ()).map(|_| ()));
.executor
.spawn(interval_future.select(exit).map(|_| ()).map_err(|_| ())); // TODO: check if the runtime handle should spawn this future.
tokio::task::spawn(future);
Ok(exit_signal) Ok(exit_signal)
} }

View File

@@ -8,7 +8,7 @@ edition = "2018"
beacon_chain = { path = "../beacon_chain" } beacon_chain = { path = "../beacon_chain" }
types = { path = "../../eth2/types" } types = { path = "../../eth2/types" }
slot_clock = { path = "../../eth2/utils/slot_clock" } slot_clock = { path = "../../eth2/utils/slot_clock" }
tokio = "0.1.22" tokio = { version = "0.2", features = ["full"] }
slog = "2.5.2" slog = "2.5.2"
parking_lot = "0.10.0" parking_lot = "0.10.0"
futures = "0.1.29" futures = "0.3"

View File

@@ -3,20 +3,22 @@
//! This service allows task execution on the beacon node for various functionality. //! This service allows task execution on the beacon node for various functionality.
use beacon_chain::{BeaconChain, BeaconChainTypes}; use beacon_chain::{BeaconChain, BeaconChainTypes};
use futures::stream::StreamExt;
use futures::{future, prelude::*}; use futures::{future, prelude::*};
use slog::error;
use slot_clock::SlotClock; use slot_clock::SlotClock;
use std::sync::Arc; use std::sync::Arc;
use std::time::{Duration, Instant}; use std::time::Duration;
use tokio::runtime::TaskExecutor; use tokio::runtime::Handle;
use tokio::timer::Interval; use tokio::time::{interval_at, Instant};
/// Spawns a timer service which periodically executes tasks for the beacon chain /// Spawns a timer service which periodically executes tasks for the beacon chain
/// TODO: We might not need a `Handle` to the runtime since this function should be
/// called from the context of a runtime and we can simply spawn using task::spawn.
/// Check for issues without the Handle.
pub fn spawn<T: BeaconChainTypes>( pub fn spawn<T: BeaconChainTypes>(
executor: &TaskExecutor, handle: &Handle,
beacon_chain: Arc<BeaconChain<T>>, beacon_chain: Arc<BeaconChain<T>>,
milliseconds_per_slot: u64, milliseconds_per_slot: u64,
log: slog::Logger,
) -> Result<tokio::sync::oneshot::Sender<()>, &'static str> { ) -> Result<tokio::sync::oneshot::Sender<()>, &'static str> {
let (exit_signal, exit) = tokio::sync::oneshot::channel(); let (exit_signal, exit) = tokio::sync::oneshot::channel();
@@ -26,25 +28,15 @@ pub fn spawn<T: BeaconChainTypes>(
.duration_to_next_slot() .duration_to_next_slot()
.ok_or_else(|| "slot_notifier unable to determine time to next slot")?; .ok_or_else(|| "slot_notifier unable to determine time to next slot")?;
let timer_future = Interval::new(start_instant, Duration::from_millis(milliseconds_per_slot)) // Warning: `interval_at` panics if `milliseconds_per_slot` = 0.
.map_err(move |e| { let timer_future = interval_at(start_instant, Duration::from_millis(milliseconds_per_slot))
error!(
log,
"Beacon chain timer failed";
"error" => format!("{:?}", e)
)
})
.for_each(move |_| { .for_each(move |_| {
beacon_chain.per_slot_task(); beacon_chain.per_slot_task();
future::ok(()) future::ready(())
}); });
executor.spawn( let future = futures::future::select(timer_future, exit.map_err(|_| ()).map(|_| ()));
exit.map_err(|_| ()) handle.spawn(future);
.select(timer_future)
.map(|_| ())
.map_err(|_| ()),
);
Ok(exit_signal) Ok(exit_signal)
} }

View File

@@ -7,11 +7,11 @@ edition = "2018"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies] [dependencies]
futures = "0.1.29" futures = "0.3"
serde = "1.0.102" serde = "1.0.102"
serde_derive = "1.0.102" serde_derive = "1.0.102"
serde_json = "1.0.41" serde_json = "1.0.41"
slog = "2.5.2" slog = "2.5.2"
tokio = "0.1.22" tokio = { version = "0.2", features = ["full"] }
types = { path = "../../eth2/types" } types = { path = "../../eth2/types" }
ws = "0.9.1" ws = "0.9.1"

View File

@@ -1,9 +1,8 @@
use futures::Future; use futures::future::TryFutureExt;
use slog::{debug, error, info, warn, Logger}; use slog::{debug, error, info, warn, Logger};
use std::marker::PhantomData; use std::marker::PhantomData;
use std::net::SocketAddr; use std::net::SocketAddr;
use std::thread; use tokio::runtime::Handle;
use tokio::runtime::TaskExecutor;
use types::EthSpec; use types::EthSpec;
use ws::{Sender, WebSocket}; use ws::{Sender, WebSocket};
@@ -38,7 +37,7 @@ impl<T: EthSpec> WebSocketSender<T> {
pub fn start_server<T: EthSpec>( pub fn start_server<T: EthSpec>(
config: &Config, config: &Config,
executor: &TaskExecutor, handle: &Handle,
log: &Logger, log: &Logger,
) -> Result< ) -> Result<
( (
@@ -87,19 +86,22 @@ pub fn start_server<T: EthSpec>(
} else { } else {
info!(log_inner, "Websocket server shutdown"); info!(log_inner, "Websocket server shutdown");
} }
Ok(()) futures::future::ok(())
}) })
.map_err(|_| ()); .map_err(|_| ());
// Place a future on the executor that will shutdown the websocket server when the // Place a future on the handle that will shutdown the websocket server when the
// application exits. // application exits.
executor.spawn(exit_future); // TODO: check if we should spawn using a `Handle` or using `task::spawn`
handle.spawn(exit_future);
exit_channel exit_channel
}; };
let log_inner = log.clone(); let log_inner = log.clone();
let _handle = thread::spawn(move || match server.run() { // TODO: using tokio `spawn_blocking` instead of `thread::spawn`
// Check which is more apt.
let _handle = tokio::task::spawn_blocking(move || match server.run() {
Ok(_) => { Ok(_) => {
debug!( debug!(
log_inner, log_inner,

View File

@@ -7,10 +7,10 @@ edition = "2018"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies] [dependencies]
reqwest = "0.9" reqwest = { version = "0.10", features = ["json"] }
url = "1.2" url = "1.2"
serde = "1.0" serde = "1.0"
futures = "0.1.25" futures = "0.3.4"
types = { path = "../../../eth2/types" } types = { path = "../../../eth2/types" }
rest_types = { path = "../rest_types" } rest_types = { path = "../rest_types" }
hex = "0.3" hex = "0.3"

View File

@@ -4,11 +4,7 @@
//! Presently, this is only used for testing but it _could_ become a user-facing library. //! Presently, this is only used for testing but it _could_ become a user-facing library.
use eth2_config::Eth2Config; use eth2_config::Eth2Config;
use futures::{future, Future, IntoFuture}; use reqwest::{Client, ClientBuilder, Response, StatusCode};
use reqwest::{
r#async::{Client, ClientBuilder, Response},
StatusCode,
};
use serde::{de::DeserializeOwned, Deserialize, Serialize}; use serde::{de::DeserializeOwned, Deserialize, Serialize};
use ssz::Encode; use ssz::Encode;
use std::marker::PhantomData; use std::marker::PhantomData;
@@ -119,33 +115,33 @@ impl<E: EthSpec> HttpClient<E> {
self.url.join(path).map_err(|e| e.into()) self.url.join(path).map_err(|e| e.into())
} }
pub fn json_post<T: Serialize>( pub async fn json_post<T: Serialize>(&self, url: Url, body: T) -> Result<Response, Error> {
&self,
url: Url,
body: T,
) -> impl Future<Item = Response, Error = Error> {
self.client self.client
.post(&url.to_string()) .post(&url.to_string())
.json(&body) .json(&body)
.send() .send()
.await
.map_err(Error::from) .map_err(Error::from)
} }
pub fn json_get<T: DeserializeOwned>( pub async fn json_get<T: DeserializeOwned>(
&self, &self,
mut url: Url, mut url: Url,
query_pairs: Vec<(String, String)>, query_pairs: Vec<(String, String)>,
) -> impl Future<Item = T, Error = Error> { ) -> Result<T, Error> {
query_pairs.into_iter().for_each(|(key, param)| { query_pairs.into_iter().for_each(|(key, param)| {
url.query_pairs_mut().append_pair(&key, &param); url.query_pairs_mut().append_pair(&key, &param);
}); });
self.client let response = self
.client
.get(&url.to_string()) .get(&url.to_string())
.send() .send()
.map_err(Error::from) .await
.and_then(|response| error_for_status(response).map_err(Error::from)) .map_err(Error::from)?;
.and_then(|mut success| success.json::<T>().map_err(Error::from))
let success = error_for_status(response).await.map_err(Error::from)?;
success.json::<T>().await.map_err(Error::from)
} }
} }
@@ -153,18 +149,17 @@ impl<E: EthSpec> HttpClient<E> {
/// ///
/// Distinct from `Response::error_for_status` because it includes the body of the response as /// Distinct from `Response::error_for_status` because it includes the body of the response as
/// text. This ensures the error message from the server is not discarded. /// text. This ensures the error message from the server is not discarded.
fn error_for_status( async fn error_for_status(response: Response) -> Result<Response, Error> {
mut response: Response,
) -> Box<dyn Future<Item = Response, Error = Error> + Send> {
let status = response.status(); let status = response.status();
if status.is_success() { if status.is_success() {
Box::new(future::ok(response)) return Ok(response);
} else { } else {
Box::new(response.text().then(move |text_result| match text_result { let text_result = response.text().await;
match text_result {
Err(e) => Err(Error::ReqwestError(e)), Err(e) => Err(Error::ReqwestError(e)),
Ok(body) => Err(Error::DidNotSucceed { status, body }), Ok(body) => Err(Error::DidNotSucceed { status, body }),
})) }
} }
} }
@@ -199,94 +194,86 @@ impl<E: EthSpec> Validator<E> {
} }
/// Produces an unsigned attestation. /// Produces an unsigned attestation.
pub fn produce_attestation( pub async fn produce_attestation(
&self, &self,
slot: Slot, slot: Slot,
committee_index: CommitteeIndex, committee_index: CommitteeIndex,
) -> impl Future<Item = Attestation<E>, Error = Error> { ) -> Result<Attestation<E>, Error> {
let query_params = vec![ let query_params = vec![
("slot".into(), format!("{}", slot)), ("slot".into(), format!("{}", slot)),
("committee_index".into(), format!("{}", committee_index)), ("committee_index".into(), format!("{}", committee_index)),
]; ];
let client = self.0.clone(); let client = self.0.clone();
self.url("attestation") let url = self.url("attestation")?;
.into_future() client.json_get(url, query_params).await
.and_then(move |url| client.json_get(url, query_params))
} }
/// Produces an aggregate attestation. /// Produces an aggregate attestation.
pub fn produce_aggregate_attestation( pub async fn produce_aggregate_attestation(
&self, &self,
attestation_data: &AttestationData, attestation_data: &AttestationData,
) -> impl Future<Item = Attestation<E>, Error = Error> { ) -> Result<Attestation<E>, Error> {
let query_params = vec![( let query_params = vec![(
"attestation_data".into(), "attestation_data".into(),
as_ssz_hex_string(attestation_data), as_ssz_hex_string(attestation_data),
)]; )];
let client = self.0.clone(); let client = self.0.clone();
self.url("aggregate_attestation") let url = self.url("aggregate_attestation")?;
.into_future() client.json_get(url, query_params).await
.and_then(move |url| client.json_get(url, query_params))
} }
/// Posts a list of attestations to the beacon node, expecting it to verify it and publish it to the network. /// Posts a list of attestations to the beacon node, expecting it to verify it and publish it to the network.
pub fn publish_attestations( pub async fn publish_attestations(
&self, &self,
attestation: Vec<Attestation<E>>, attestation: Vec<Attestation<E>>,
) -> impl Future<Item = PublishStatus, Error = Error> { ) -> Result<PublishStatus, Error> {
let client = self.0.clone(); let client = self.0.clone();
self.url("attestations") let url = self.url("attestations")?;
.into_future() let response = client.json_post::<_>(url, attestation).await?;
.and_then(move |url| client.json_post::<_>(url, attestation))
.and_then(|mut response| { match response.status() {
response
.text()
.map(|text| (response, text))
.map_err(Error::from)
})
.and_then(|(response, text)| match response.status() {
StatusCode::OK => Ok(PublishStatus::Valid), StatusCode::OK => Ok(PublishStatus::Valid),
StatusCode::ACCEPTED => Ok(PublishStatus::Invalid(text)), StatusCode::ACCEPTED => Ok(PublishStatus::Invalid(
response.text().await.map_err(Error::from)?,
)),
_ => response _ => response
.error_for_status() .error_for_status()
.map_err(Error::from) .map_err(Error::from)
.map(|_| PublishStatus::Unknown), .map(|_| PublishStatus::Unknown),
}) }
} }
/// Posts a list of signed aggregates and proofs to the beacon node, expecting it to verify it and publish it to the network. /// Posts a list of signed aggregates and proofs to the beacon node, expecting it to verify it and publish it to the network.
pub fn publish_aggregate_and_proof( pub async fn publish_aggregate_and_proof(
&self, &self,
signed_aggregate_and_proofs: Vec<SignedAggregateAndProof<E>>, signed_aggregate_and_proofs: Vec<SignedAggregateAndProof<E>>,
) -> impl Future<Item = PublishStatus, Error = Error> { ) -> Result<PublishStatus, Error> {
let client = self.0.clone(); let client = self.0.clone();
self.url("aggregate_and_proofs") let url = self.url("aggregate_and_proofs")?;
.into_future() let response = client
.and_then(move |url| client.json_post::<_>(url, signed_aggregate_and_proofs)) .json_post::<_>(url, signed_aggregate_and_proofs)
.and_then(|mut response| { .await?;
response
.text() match response.status() {
.map(|text| (response, text))
.map_err(Error::from)
})
.and_then(|(response, text)| match response.status() {
StatusCode::OK => Ok(PublishStatus::Valid), StatusCode::OK => Ok(PublishStatus::Valid),
StatusCode::ACCEPTED => Ok(PublishStatus::Invalid(text)), StatusCode::ACCEPTED => Ok(PublishStatus::Invalid(
response.text().await.map_err(Error::from)?,
)),
_ => response _ => response
.error_for_status() .error_for_status()
.map_err(Error::from) .map_err(Error::from)
.map(|_| PublishStatus::Unknown), .map(|_| PublishStatus::Unknown),
}) }
} }
/// Returns the duties required of the given validator pubkeys in the given epoch. /// Returns the duties required of the given validator pubkeys in the given epoch.
pub fn get_duties( pub async fn get_duties(
&self, &self,
epoch: Epoch, epoch: Epoch,
validator_pubkeys: &[PublicKey], validator_pubkeys: &[PublicKey],
) -> impl Future<Item = Vec<ValidatorDutyBytes>, Error = Error> { ) -> Result<Vec<ValidatorDutyBytes>, Error> {
let client = self.0.clone(); let client = self.0.clone();
let bulk_request = ValidatorDutiesRequest { let bulk_request = ValidatorDutiesRequest {
@@ -297,79 +284,68 @@ impl<E: EthSpec> Validator<E> {
.collect(), .collect(),
}; };
self.url("duties") let url = self.url("duties")?;
.into_future() let response = client.json_post::<_>(url, bulk_request).await?;
.and_then(move |url| client.json_post::<_>(url, bulk_request)) let success = error_for_status(response).await.map_err(Error::from)?;
.and_then(|response| error_for_status(response).map_err(Error::from)) success.json().await.map_err(Error::from)
.and_then(|mut success| success.json().map_err(Error::from))
} }
/// Posts a block to the beacon node, expecting it to verify it and publish it to the network. /// Posts a block to the beacon node, expecting it to verify it and publish it to the network.
pub fn publish_block( pub async fn publish_block(&self, block: SignedBeaconBlock<E>) -> Result<PublishStatus, Error> {
&self,
block: SignedBeaconBlock<E>,
) -> impl Future<Item = PublishStatus, Error = Error> {
let client = self.0.clone(); let client = self.0.clone();
self.url("block") let url = self.url("block")?;
.into_future() let response = client.json_post::<_>(url, block).await?;
.and_then(move |url| client.json_post::<_>(url, block))
.and_then(|mut response| { match response.status() {
response
.text()
.map(|text| (response, text))
.map_err(Error::from)
})
.and_then(|(response, text)| match response.status() {
StatusCode::OK => Ok(PublishStatus::Valid), StatusCode::OK => Ok(PublishStatus::Valid),
StatusCode::ACCEPTED => Ok(PublishStatus::Invalid(text)), StatusCode::ACCEPTED => Ok(PublishStatus::Invalid(
response.text().await.map_err(Error::from)?,
)),
_ => response _ => response
.error_for_status() .error_for_status()
.map_err(Error::from) .map_err(Error::from)
.map(|_| PublishStatus::Unknown), .map(|_| PublishStatus::Unknown),
}) }
} }
/// Requests a new (unsigned) block from the beacon node. /// Requests a new (unsigned) block from the beacon node.
pub fn produce_block( pub async fn produce_block(
&self, &self,
slot: Slot, slot: Slot,
randao_reveal: Signature, randao_reveal: Signature,
) -> impl Future<Item = BeaconBlock<E>, Error = Error> { ) -> Result<BeaconBlock<E>, Error> {
let client = self.0.clone(); let client = self.0.clone();
self.url("block").into_future().and_then(move |url| { let url = self.url("block")?;
client.json_get::<BeaconBlock<E>>( client
.json_get::<BeaconBlock<E>>(
url, url,
vec![ vec![
("slot".into(), format!("{}", slot.as_u64())), ("slot".into(), format!("{}", slot.as_u64())),
("randao_reveal".into(), as_ssz_hex_string(&randao_reveal)), ("randao_reveal".into(), as_ssz_hex_string(&randao_reveal)),
], ],
) )
}) .await
} }
/// Subscribes a list of validators to particular slots for attestation production/publication. /// Subscribes a list of validators to particular slots for attestation production/publication.
pub fn subscribe( pub async fn subscribe(
&self, &self,
subscriptions: Vec<ValidatorSubscription>, subscriptions: Vec<ValidatorSubscription>,
) -> impl Future<Item = PublishStatus, Error = Error> { ) -> Result<PublishStatus, Error> {
let client = self.0.clone(); let client = self.0.clone();
self.url("subscribe") let url = self.url("subscribe")?;
.into_future() let response = client.json_post::<_>(url, subscriptions).await?;
.and_then(move |url| client.json_post::<_>(url, subscriptions))
.and_then(|mut response| { match response.status() {
response
.text()
.map(|text| (response, text))
.map_err(Error::from)
})
.and_then(|(response, text)| match response.status() {
StatusCode::OK => Ok(PublishStatus::Valid), StatusCode::OK => Ok(PublishStatus::Valid),
StatusCode::ACCEPTED => Ok(PublishStatus::Invalid(text)), StatusCode::ACCEPTED => Ok(PublishStatus::Invalid(
response.text().await.map_err(Error::from)?,
)),
_ => response _ => response
.error_for_status() .error_for_status()
.map_err(Error::from) .map_err(Error::from)
.map(|_| PublishStatus::Unknown), .map(|_| PublishStatus::Unknown),
}) }
} }
} }
@@ -386,120 +362,116 @@ impl<E: EthSpec> Beacon<E> {
} }
/// Returns the genesis time. /// Returns the genesis time.
pub fn get_genesis_time(&self) -> impl Future<Item = u64, Error = Error> { pub async fn get_genesis_time(&self) -> Result<u64, Error> {
let client = self.0.clone(); let client = self.0.clone();
self.url("genesis_time") let url = self.url("genesis_time")?;
.into_future() client.json_get(url, vec![]).await
.and_then(move |url| client.json_get(url, vec![]))
} }
/// Returns the genesis validators root. /// Returns the genesis validators root.
pub fn get_genesis_validators_root(&self) -> impl Future<Item = Hash256, Error = Error> { pub async fn get_genesis_validators_root(&self) -> Result<Hash256, Error> {
let client = self.0.clone(); let client = self.0.clone();
self.url("genesis_validators_root") let url = self.url("genesis_validators_root")?;
.into_future() client.json_get(url, vec![]).await
.and_then(move |url| client.json_get(url, vec![]))
} }
/// Returns the fork at the head of the beacon chain. /// Returns the fork at the head of the beacon chain.
pub fn get_fork(&self) -> impl Future<Item = Fork, Error = Error> { pub async fn get_fork(&self) -> Result<Fork, Error> {
let client = self.0.clone(); let client = self.0.clone();
self.url("fork") let url = self.url("fork")?;
.into_future() client.json_get(url, vec![]).await
.and_then(move |url| client.json_get(url, vec![]))
} }
/// Returns info about the head of the canonical beacon chain. /// Returns info about the head of the canonical beacon chain.
pub fn get_head(&self) -> impl Future<Item = CanonicalHeadResponse, Error = Error> { pub async fn get_head(&self) -> Result<CanonicalHeadResponse, Error> {
let client = self.0.clone(); let client = self.0.clone();
self.url("head") let url = self.url("head")?;
.into_future() client.json_get::<CanonicalHeadResponse>(url, vec![]).await
.and_then(move |url| client.json_get::<CanonicalHeadResponse>(url, vec![]))
} }
/// Returns the set of known beacon chain head blocks. One of these will be the canonical head. /// Returns the set of known beacon chain head blocks. One of these will be the canonical head.
pub fn get_heads(&self) -> impl Future<Item = Vec<HeadBeaconBlock>, Error = Error> { pub async fn get_heads(&self) -> Result<Vec<HeadBeaconBlock>, Error> {
let client = self.0.clone(); let client = self.0.clone();
self.url("heads") let url = self.url("heads")?;
.into_future() client.json_get(url, vec![]).await
.and_then(move |url| client.json_get(url, vec![]))
} }
/// Returns the block and block root at the given slot. /// Returns the block and block root at the given slot.
pub fn get_block_by_slot( pub async fn get_block_by_slot(
&self, &self,
slot: Slot, slot: Slot,
) -> impl Future<Item = (SignedBeaconBlock<E>, Hash256), Error = Error> { ) -> Result<(SignedBeaconBlock<E>, Hash256), Error> {
self.get_block("slot".to_string(), format!("{}", slot.as_u64())) self.get_block("slot".to_string(), format!("{}", slot.as_u64()))
.await
} }
/// Returns the block and block root at the given root. /// Returns the block and block root at the given root.
pub fn get_block_by_root( pub async fn get_block_by_root(
&self, &self,
root: Hash256, root: Hash256,
) -> impl Future<Item = (SignedBeaconBlock<E>, Hash256), Error = Error> { ) -> Result<(SignedBeaconBlock<E>, Hash256), Error> {
self.get_block("root".to_string(), root_as_string(root)) self.get_block("root".to_string(), root_as_string(root))
.await
} }
/// Returns the block and block root at the given slot. /// Returns the block and block root at the given slot.
fn get_block( async fn get_block(
&self, &self,
query_key: String, query_key: String,
query_param: String, query_param: String,
) -> impl Future<Item = (SignedBeaconBlock<E>, Hash256), Error = Error> { ) -> Result<(SignedBeaconBlock<E>, Hash256), Error> {
let client = self.0.clone(); let client = self.0.clone();
self.url("block") let url = self.url("block")?;
.into_future() client
.and_then(move |url| { .json_get::<BlockResponse<E>>(url, vec![(query_key, query_param)])
client.json_get::<BlockResponse<E>>(url, vec![(query_key, query_param)]) .await
})
.map(|response| (response.beacon_block, response.root)) .map(|response| (response.beacon_block, response.root))
} }
/// Returns the state and state root at the given slot. /// Returns the state and state root at the given slot.
pub fn get_state_by_slot( pub async fn get_state_by_slot(&self, slot: Slot) -> Result<(BeaconState<E>, Hash256), Error> {
&self,
slot: Slot,
) -> impl Future<Item = (BeaconState<E>, Hash256), Error = Error> {
self.get_state("slot".to_string(), format!("{}", slot.as_u64())) self.get_state("slot".to_string(), format!("{}", slot.as_u64()))
.await
} }
/// Returns the state and state root at the given root. /// Returns the state and state root at the given root.
pub fn get_state_by_root( pub async fn get_state_by_root(
&self, &self,
root: Hash256, root: Hash256,
) -> impl Future<Item = (BeaconState<E>, Hash256), Error = Error> { ) -> Result<(BeaconState<E>, Hash256), Error> {
self.get_state("root".to_string(), root_as_string(root)) self.get_state("root".to_string(), root_as_string(root))
.await
} }
/// Returns the root of the state at the given slot. /// Returns the root of the state at the given slot.
pub fn get_state_root(&self, slot: Slot) -> impl Future<Item = Hash256, Error = Error> { pub async fn get_state_root(&self, slot: Slot) -> Result<Hash256, Error> {
let client = self.0.clone(); let client = self.0.clone();
self.url("state_root").into_future().and_then(move |url| { let url = self.url("state_root")?;
client.json_get(url, vec![("slot".into(), format!("{}", slot.as_u64()))]) client
}) .json_get(url, vec![("slot".into(), format!("{}", slot.as_u64()))])
.await
} }
/// Returns the root of the block at the given slot. /// Returns the root of the block at the given slot.
pub fn get_block_root(&self, slot: Slot) -> impl Future<Item = Hash256, Error = Error> { pub async fn get_block_root(&self, slot: Slot) -> Result<Hash256, Error> {
let client = self.0.clone(); let client = self.0.clone();
self.url("block_root").into_future().and_then(move |url| { let url = self.url("block_root")?;
client.json_get(url, vec![("slot".into(), format!("{}", slot.as_u64()))]) client
}) .json_get(url, vec![("slot".into(), format!("{}", slot.as_u64()))])
.await
} }
/// Returns the state and state root at the given slot. /// Returns the state and state root at the given slot.
fn get_state( async fn get_state(
&self, &self,
query_key: String, query_key: String,
query_param: String, query_param: String,
) -> impl Future<Item = (BeaconState<E>, Hash256), Error = Error> { ) -> Result<(BeaconState<E>, Hash256), Error> {
let client = self.0.clone(); let client = self.0.clone();
self.url("state") let url = self.url("state")?;
.into_future() client
.and_then(move |url| { .json_get::<StateResponse<E>>(url, vec![(query_key, query_param)])
client.json_get::<StateResponse<E>>(url, vec![(query_key, query_param)]) .await
})
.map(|response| (response.beacon_state, response.root)) .map(|response| (response.beacon_state, response.root))
} }
@@ -507,11 +479,11 @@ impl<E: EthSpec> Beacon<E> {
/// ///
/// If `state_root` is `Some`, the query will use the given state instead of the default /// If `state_root` is `Some`, the query will use the given state instead of the default
/// canonical head state. /// canonical head state.
pub fn get_validators( pub async fn get_validators(
&self, &self,
validator_pubkeys: Vec<PublicKey>, validator_pubkeys: Vec<PublicKey>,
state_root: Option<Hash256>, state_root: Option<Hash256>,
) -> impl Future<Item = Vec<ValidatorResponse>, Error = Error> { ) -> Result<Vec<ValidatorResponse>, Error> {
let client = self.0.clone(); let client = self.0.clone();
let bulk_request = ValidatorRequest { let bulk_request = ValidatorRequest {
@@ -522,21 +494,20 @@ impl<E: EthSpec> Beacon<E> {
.collect(), .collect(),
}; };
self.url("validators") let url = self.url("validators")?;
.into_future() let response = client.json_post::<_>(url, bulk_request).await?;
.and_then(move |url| client.json_post::<_>(url, bulk_request)) let success = error_for_status(response).await.map_err(Error::from)?;
.and_then(|response| error_for_status(response).map_err(Error::from)) success.json().await.map_err(Error::from)
.and_then(|mut success| success.json().map_err(Error::from))
} }
/// Returns all validators. /// Returns all validators.
/// ///
/// If `state_root` is `Some`, the query will use the given state instead of the default /// If `state_root` is `Some`, the query will use the given state instead of the default
/// canonical head state. /// canonical head state.
pub fn get_all_validators( pub async fn get_all_validators(
&self, &self,
state_root: Option<Hash256>, state_root: Option<Hash256>,
) -> impl Future<Item = Vec<ValidatorResponse>, Error = Error> { ) -> Result<Vec<ValidatorResponse>, Error> {
let client = self.0.clone(); let client = self.0.clone();
let query_params = if let Some(state_root) = state_root { let query_params = if let Some(state_root) = state_root {
@@ -545,19 +516,18 @@ impl<E: EthSpec> Beacon<E> {
vec![] vec![]
}; };
self.url("validators/all") let url = self.url("validators/all")?;
.into_future() client.json_get(url, query_params).await
.and_then(move |url| client.json_get(url, query_params))
} }
/// Returns the active validators. /// Returns the active validators.
/// ///
/// If `state_root` is `Some`, the query will use the given state instead of the default /// If `state_root` is `Some`, the query will use the given state instead of the default
/// canonical head state. /// canonical head state.
pub fn get_active_validators( pub async fn get_active_validators(
&self, &self,
state_root: Option<Hash256>, state_root: Option<Hash256>,
) -> impl Future<Item = Vec<ValidatorResponse>, Error = Error> { ) -> Result<Vec<ValidatorResponse>, Error> {
let client = self.0.clone(); let client = self.0.clone();
let query_params = if let Some(state_root) = state_root { let query_params = if let Some(state_root) = state_root {
@@ -566,53 +536,42 @@ impl<E: EthSpec> Beacon<E> {
vec![] vec![]
}; };
self.url("validators/active") let url = self.url("validators/active")?;
.into_future() client.json_get(url, query_params).await
.and_then(move |url| client.json_get(url, query_params))
} }
/// Returns committees at the given epoch. /// Returns committees at the given epoch.
pub fn get_committees( pub async fn get_committees(&self, epoch: Epoch) -> Result<Vec<Committee>, Error> {
&self,
epoch: Epoch,
) -> impl Future<Item = Vec<Committee>, Error = Error> {
let client = self.0.clone(); let client = self.0.clone();
self.url("committees").into_future().and_then(move |url| { let url = self.url("committees")?;
client.json_get(url, vec![("epoch".into(), format!("{}", epoch.as_u64()))]) client
}) .json_get(url, vec![("epoch".into(), format!("{}", epoch.as_u64()))])
.await
} }
pub fn proposer_slashing( pub async fn proposer_slashing(
&self, &self,
proposer_slashing: ProposerSlashing, proposer_slashing: ProposerSlashing,
) -> impl Future<Item = bool, Error = Error> { ) -> Result<bool, Error> {
let client = self.0.clone(); let client = self.0.clone();
self.url("proposer_slashing") let url = self.url("proposer_slashing")?;
.into_future() let response = client.json_post::<_>(url, proposer_slashing).await?;
.and_then(move |url| { let success = error_for_status(response).await.map_err(Error::from)?;
client success.json().await.map_err(Error::from)
.json_post::<_>(url, proposer_slashing)
.and_then(|response| error_for_status(response).map_err(Error::from))
.and_then(|mut success| success.json().map_err(Error::from))
})
} }
pub fn attester_slashing( pub async fn attester_slashing(
&self, &self,
attester_slashing: AttesterSlashing<E>, attester_slashing: AttesterSlashing<E>,
) -> impl Future<Item = bool, Error = Error> { ) -> Result<bool, Error> {
let client = self.0.clone(); let client = self.0.clone();
self.url("attester_slashing") let url = self.url("attester_slashing")?;
.into_future() let response = client.json_post::<_>(url, attester_slashing).await?;
.and_then(move |url| { let success = error_for_status(response).await.map_err(Error::from)?;
client success.json().await.map_err(Error::from)
.json_post::<_>(url, attester_slashing)
.and_then(|response| error_for_status(response).map_err(Error::from))
.and_then(|mut success| success.json().map_err(Error::from))
})
} }
} }
@@ -628,11 +587,10 @@ impl<E: EthSpec> Spec<E> {
.map_err(Into::into) .map_err(Into::into)
} }
pub fn get_eth2_config(&self) -> impl Future<Item = Eth2Config, Error = Error> { pub async fn get_eth2_config(&self) -> Result<Eth2Config, Error> {
let client = self.0.clone(); let client = self.0.clone();
self.url("eth2_config") let url = self.url("eth2_config")?;
.into_future() client.json_get(url, vec![]).await
.and_then(move |url| client.json_get(url, vec![]))
} }
} }
@@ -648,11 +606,10 @@ impl<E: EthSpec> Node<E> {
.map_err(Into::into) .map_err(Into::into)
} }
pub fn get_version(&self) -> impl Future<Item = String, Error = Error> { pub async fn get_version(&self) -> Result<String, Error> {
let client = self.0.clone(); let client = self.0.clone();
self.url("version") let url = self.url("version")?;
.into_future() client.json_get(url, vec![]).await
.and_then(move |url| client.json_get(url, vec![]))
} }
} }
@@ -669,21 +626,17 @@ impl<E: EthSpec> Advanced<E> {
} }
/// Gets the core `ProtoArray` struct from the node. /// Gets the core `ProtoArray` struct from the node.
pub fn get_fork_choice(&self) -> impl Future<Item = ProtoArray, Error = Error> { pub async fn get_fork_choice(&self) -> Result<ProtoArray, Error> {
let client = self.0.clone(); let client = self.0.clone();
self.url("fork_choice") let url = self.url("fork_choice")?;
.into_future() client.json_get(url, vec![]).await
.and_then(move |url| client.json_get(url, vec![]))
} }
/// Gets the core `PersistedOperationPool` struct from the node. /// Gets the core `PersistedOperationPool` struct from the node.
pub fn get_operation_pool( pub async fn get_operation_pool(&self) -> Result<PersistedOperationPool<E>, Error> {
&self,
) -> impl Future<Item = PersistedOperationPool<E>, Error = Error> {
let client = self.0.clone(); let client = self.0.clone();
self.url("operation_pool") let url = self.url("operation_pool")?;
.into_future() client.json_get(url, vec![]).await
.and_then(move |url| client.json_get(url, vec![]))
} }
} }
@@ -700,31 +653,26 @@ impl<E: EthSpec> Consensus<E> {
} }
/// Gets a `IndividualVote` for each of the given `pubkeys`. /// Gets a `IndividualVote` for each of the given `pubkeys`.
pub fn get_individual_votes( pub async fn get_individual_votes(
&self, &self,
epoch: Epoch, epoch: Epoch,
pubkeys: Vec<PublicKeyBytes>, pubkeys: Vec<PublicKeyBytes>,
) -> impl Future<Item = IndividualVotesResponse, Error = Error> { ) -> Result<IndividualVotesResponse, Error> {
let client = self.0.clone(); let client = self.0.clone();
let req_body = IndividualVotesRequest { epoch, pubkeys }; let req_body = IndividualVotesRequest { epoch, pubkeys };
self.url("individual_votes") let url = self.url("individual_votes")?;
.into_future() let response = client.json_post::<_>(url, req_body).await?;
.and_then(move |url| client.json_post::<_>(url, req_body)) let success = error_for_status(response).await.map_err(Error::from)?;
.and_then(|response| error_for_status(response).map_err(Error::from)) success.json().await.map_err(Error::from)
.and_then(|mut success| success.json().map_err(Error::from))
} }
/// Gets a `VoteCount` for the given `epoch`. /// Gets a `VoteCount` for the given `epoch`.
pub fn get_vote_count( pub async fn get_vote_count(&self, epoch: Epoch) -> Result<IndividualVotesResponse, Error> {
&self,
epoch: Epoch,
) -> impl Future<Item = IndividualVotesResponse, Error = Error> {
let client = self.0.clone(); let client = self.0.clone();
let query_params = vec![("epoch".into(), format!("{}", epoch.as_u64()))]; let query_params = vec![("epoch".into(), format!("{}", epoch.as_u64()))];
self.url("vote_count") let url = self.url("vote_count")?;
.into_future() client.json_get(url, query_params).await
.and_then(move |url| client.json_get(url, query_params))
} }
} }

View File

@@ -1,5 +1,4 @@
use clap::ArgMatches; use clap::ArgMatches;
use environment::Environment;
use eth1_test_rig::DepositContract; use eth1_test_rig::DepositContract;
use futures::compat::Future01CompatExt; use futures::compat::Future01CompatExt;
use std::fs::File; use std::fs::File;
@@ -7,7 +6,7 @@ use std::io::Read;
use types::EthSpec; use types::EthSpec;
use web3::{transports::Http, Web3}; use web3::{transports::Http, Web3};
pub async fn run<T: EthSpec>(mut env: Environment<T>, matches: &ArgMatches) -> Result<(), String> { pub async fn run<T: EthSpec>(matches: &ArgMatches<'_>) -> Result<(), String> {
let confirmations = matches let confirmations = matches
.value_of("confirmations") .value_of("confirmations")
.ok_or_else(|| "Confirmations not specified")? .ok_or_else(|| "Confirmations not specified")?

View File

@@ -455,11 +455,9 @@ async fn run<T: EthSpec>(env_builder: EnvironmentBuilder<T>, matches: &ArgMatche
.unwrap_or_else(|e| error!("Failed to transition blocks: {}", e)), .unwrap_or_else(|e| error!("Failed to transition blocks: {}", e)),
("pretty-hex", Some(matches)) => run_parse_hex::<T>(matches) ("pretty-hex", Some(matches)) => run_parse_hex::<T>(matches)
.unwrap_or_else(|e| error!("Failed to pretty print hex: {}", e)), .unwrap_or_else(|e| error!("Failed to pretty print hex: {}", e)),
("deploy-deposit-contract", Some(matches)) => { ("deploy-deposit-contract", Some(matches)) => deploy_deposit_contract::run::<T>(matches)
deploy_deposit_contract::run::<T>(env, matches)
.await .await
.unwrap_or_else(|e| error!("Failed to run deploy-deposit-contract command: {}", e)) .unwrap_or_else(|e| error!("Failed to run deploy-deposit-contract command: {}", e)),
}
("refund-deposit-contract", Some(matches)) => { ("refund-deposit-contract", Some(matches)) => {
refund_deposit_contract::run::<T>(env, matches) refund_deposit_contract::run::<T>(env, matches)
.await .await