diff --git a/beacon_node/rest_api/Cargo.toml b/beacon_node/rest_api/Cargo.toml index 5049ae6eba..6154ab57b3 100644 --- a/beacon_node/rest_api/Cargo.toml +++ b/beacon_node/rest_api/Cargo.toml @@ -25,7 +25,7 @@ state_processing = { path = "../../eth2/state_processing" } types = { path = "../../eth2/types" } http = "0.2.1" hyper = "0.13.5" -tokio = "0.2.20" +tokio = { version = "0.2", features = ["sync"] } url = "2.1.1" lazy_static = "1.4.0" eth2_config = { path = "../../eth2/utils/eth2_config" } diff --git a/beacon_node/rest_api/src/beacon.rs b/beacon_node/rest_api/src/beacon.rs index 0df3673eed..7d41535b30 100644 --- a/beacon_node/rest_api/src/beacon.rs +++ b/beacon_node/rest_api/src/beacon.rs @@ -1,9 +1,8 @@ use crate::helpers::*; use crate::response_builder::ResponseBuilder; use crate::validator::get_state_for_epoch; -use crate::{ApiError, ApiResult, BoxFut, UrlQuery}; +use crate::{ApiError, ApiResult, UrlQuery}; use beacon_chain::{BeaconChain, BeaconChainTypes, StateSkipConfig}; -use futures::{Future, Stream}; use hyper::{Body, Request}; use rest_types::{ BlockResponse, CanonicalHeadResponse, Committee, HeadBeaconBlock, StateResponse, @@ -216,23 +215,22 @@ pub fn get_active_validators( /// /// This method allows for a basically unbounded list of `pubkeys`, where as the `get_validators` /// request is limited by the max number of pubkeys you can fit in a URL. -pub fn post_validators( +pub async fn post_validators( req: Request, beacon_chain: Arc>, -) -> BoxFut { +) -> ApiResult { let response_builder = ResponseBuilder::new(&req); - let future = req - .into_body() - .concat2() - .map_err(|e| ApiError::ServerError(format!("Unable to get request body: {:?}", e))) - .and_then(|chunks| { - serde_json::from_slice::(&chunks).map_err(|e| { - ApiError::BadRequest(format!( - "Unable to parse JSON into ValidatorRequest: {:?}", - e - )) - }) + let body = req.into_body(); + let chunks = hyper::body::to_bytes(body) + .await + .map_err(|e| ApiError::ServerError(format!("Unable to get request body: {:?}", e)))?; + serde_json::from_slice::(&chunks) + .map_err(|e| { + ApiError::BadRequest(format!( + "Unable to parse JSON into ValidatorRequest: {:?}", + e + )) }) .and_then(|bulk_request| { validator_responses_by_pubkey( @@ -241,9 +239,7 @@ pub fn post_validators( bulk_request.pubkeys, ) }) - .and_then(|validators| response_builder?.body(&validators)); - - Box::new(future) + .and_then(|validators| response_builder?.body(&validators)) } /// Returns either the state given by `state_root_opt`, or the canonical head state if it is @@ -449,23 +445,23 @@ pub fn get_genesis_validators_root( ResponseBuilder::new(&req)?.body(&beacon_chain.head_info()?.genesis_validators_root) } -pub fn proposer_slashing( +pub async fn proposer_slashing( req: Request, beacon_chain: Arc>, -) -> BoxFut { +) -> ApiResult { let response_builder = ResponseBuilder::new(&req); - let future = req - .into_body() - .concat2() - .map_err(|e| ApiError::ServerError(format!("Unable to get request body: {:?}", e))) - .and_then(|chunks| { - serde_json::from_slice::(&chunks).map_err(|e| { - ApiError::BadRequest(format!( - "Unable to parse JSON into ProposerSlashing: {:?}", - e - )) - }) + let body = req.into_body(); + let chunks = hyper::body::to_bytes(body) + .await + .map_err(|e| ApiError::ServerError(format!("Unable to get request body: {:?}", e)))?; + + serde_json::from_slice::(&chunks) + .map_err(|e| { + ApiError::BadRequest(format!( + "Unable to parse JSON into ProposerSlashing: {:?}", + e + )) }) .and_then(move |proposer_slashing| { let spec = &beacon_chain.spec; @@ -481,33 +477,31 @@ pub fn proposer_slashing( )) }) } else { - Err(ApiError::BadRequest( + return Err(ApiError::BadRequest( "Cannot insert proposer slashing on node without Eth1 connection.".to_string(), - )) + )); } }) - .and_then(|_| response_builder?.body(&true)); - - Box::new(future) + .and_then(|_| response_builder?.body(&true)) } -pub fn attester_slashing( +pub async fn attester_slashing( req: Request, beacon_chain: Arc>, -) -> BoxFut { +) -> ApiResult { let response_builder = ResponseBuilder::new(&req); - let future = req - .into_body() - .concat2() - .map_err(|e| ApiError::ServerError(format!("Unable to get request body: {:?}", e))) - .and_then(|chunks| { - serde_json::from_slice::>(&chunks).map_err(|e| { - ApiError::BadRequest(format!( - "Unable to parse JSON into AttesterSlashing: {:?}", - e - )) - }) + let body = req.into_body(); + let chunks = hyper::body::to_bytes(body) + .await + .map_err(|e| ApiError::ServerError(format!("Unable to get request body: {:?}", e)))?; + + serde_json::from_slice::>(&chunks) + .map_err(|e| { + ApiError::BadRequest(format!( + "Unable to parse JSON into AttesterSlashing: {:?}", + e + )) }) .and_then(move |attester_slashing| { let spec = &beacon_chain.spec; @@ -528,7 +522,5 @@ pub fn attester_slashing( )) } }) - .and_then(|_| response_builder?.body(&true)); - - Box::new(future) + .and_then(|_| response_builder?.body(&true)) } diff --git a/beacon_node/rest_api/src/consensus.rs b/beacon_node/rest_api/src/consensus.rs index 64b5a5df32..a006b379f7 100644 --- a/beacon_node/rest_api/src/consensus.rs +++ b/beacon_node/rest_api/src/consensus.rs @@ -1,8 +1,7 @@ use crate::helpers::*; use crate::response_builder::ResponseBuilder; -use crate::{ApiError, ApiResult, BoxFut, UrlQuery}; +use crate::{ApiError, ApiResult, UrlQuery}; use beacon_chain::{BeaconChain, BeaconChainTypes}; -use futures::{Future, Stream}; use hyper::{Body, Request}; use rest_types::{IndividualVotesRequest, IndividualVotesResponse}; use serde::{Deserialize, Serialize}; @@ -71,23 +70,23 @@ pub fn get_vote_count( ResponseBuilder::new(&req)?.body(&report) } -pub fn post_individual_votes( +pub async fn post_individual_votes( req: Request, beacon_chain: Arc>, -) -> BoxFut { +) -> ApiResult { let response_builder = ResponseBuilder::new(&req); - let future = req - .into_body() - .concat2() - .map_err(|e| ApiError::ServerError(format!("Unable to get request body: {:?}", e))) - .and_then(|chunks| { - serde_json::from_slice::(&chunks).map_err(|e| { - ApiError::BadRequest(format!( - "Unable to parse JSON into ValidatorDutiesRequest: {:?}", - e - )) - }) + let body = req.into_body(); + let chunks = hyper::body::to_bytes(body) + .await + .map_err(|e| ApiError::ServerError(format!("Unable to get request body: {:?}", e)))?; + + serde_json::from_slice::(&chunks) + .map_err(|e| { + ApiError::BadRequest(format!( + "Unable to parse JSON into ValidatorDutiesRequest: {:?}", + e + )) }) .and_then(move |body| { let epoch = body.epoch; @@ -136,7 +135,5 @@ pub fn post_individual_votes( }) .collect::, _>>() }) - .and_then(|votes| response_builder?.body_no_ssz(&votes)); - - Box::new(future) + .and_then(|votes| response_builder?.body_no_ssz(&votes)) } diff --git a/beacon_node/rest_api/src/error.rs b/beacon_node/rest_api/src/error.rs index 913fa8bd6d..0897e62fef 100644 --- a/beacon_node/rest_api/src/error.rs +++ b/beacon_node/rest_api/src/error.rs @@ -1,4 +1,3 @@ -use crate::BoxFut; use hyper::{Body, Response, StatusCode}; use std::error::Error as StdError; @@ -42,12 +41,6 @@ impl Into> for ApiError { } } -impl Into for ApiError { - fn into(self) -> BoxFut { - Box::new(futures::future::err(self)) - } -} - impl From for ApiError { fn from(e: store::Error) -> ApiError { ApiError::ServerError(format!("Database error: {:?}", e)) diff --git a/beacon_node/rest_api/src/helpers.rs b/beacon_node/rest_api/src/helpers.rs index 171a10d246..b07bb97d58 100644 --- a/beacon_node/rest_api/src/helpers.rs +++ b/beacon_node/rest_api/src/helpers.rs @@ -229,14 +229,14 @@ pub fn implementation_pending_response(_req: Request) -> ApiResult { } pub fn publish_beacon_block_to_network( - mut chan: NetworkChannel, + chan: NetworkChannel, block: SignedBeaconBlock, ) -> Result<(), ApiError> { // send the block via SSZ encoding let messages = vec![PubsubMessage::BeaconBlock(Box::new(block))]; // Publish the block to the p2p network via gossipsub. - if let Err(e) = chan.try_send(NetworkMessage::Publish { messages }) { + if let Err(e) = chan.send(NetworkMessage::Publish { messages }) { return Err(ApiError::ServerError(format!( "Unable to send new block to network: {:?}", e diff --git a/beacon_node/rest_api/src/lib.rs b/beacon_node/rest_api/src/lib.rs index 2702f38c90..4159150acc 100644 --- a/beacon_node/rest_api/src/lib.rs +++ b/beacon_node/rest_api/src/lib.rs @@ -26,23 +26,22 @@ pub use config::ApiEncodingFormat; use error::{ApiError, ApiResult}; use eth2_config::Eth2Config; use eth2_libp2p::NetworkGlobals; -use hyper::rt::Future; +use futures::future::TryFutureExt; use hyper::server::conn::AddrStream; use hyper::service::{make_service_fn, service_fn}; -use hyper::{Body, Request, Response, Server}; +use hyper::{Body, Request, Server}; use slog::{info, warn}; use std::net::SocketAddr; use std::ops::Deref; use std::path::PathBuf; use std::sync::Arc; -use tokio::runtime::TaskExecutor; +use tokio::runtime::Handle; use tokio::sync::{mpsc, oneshot}; use url_query::UrlQuery; pub use crate::helpers::parse_pubkey_bytes; pub use config::Config; -pub type BoxFut = Box, Error = ApiError> + Send>; pub type NetworkChannel = mpsc::UnboundedSender>; pub struct NetworkInfo { @@ -54,7 +53,7 @@ pub struct NetworkInfo { #[allow(clippy::too_many_arguments)] pub fn start_server( config: &Config, - executor: &TaskExecutor, + handle: &Handle, beacon_chain: Arc>, network_info: NetworkInfo, db_path: PathBuf, @@ -75,18 +74,20 @@ pub fn start_server( let db_path = db_path.clone(); let freezer_db_path = freezer_db_path.clone(); - service_fn(move |req: Request| { - router::route( - req, - beacon_chain.clone(), - network_globals.clone(), - network_channel.clone(), - eth2_config.clone(), - log.clone(), - db_path.clone(), - freezer_db_path.clone(), - ) - }) + async move { + Ok::<_, hyper::Error>(service_fn(move |req: Request| { + router::route( + req, + beacon_chain.clone(), + network_globals.clone(), + network_channel.clone(), + eth2_config.clone(), + log.clone(), + db_path.clone(), + freezer_db_path.clone(), + ) + })) + } }); let bind_addr = (config.listen_address, config.port).into(); @@ -99,16 +100,22 @@ pub fn start_server( let actual_listen_addr = server.local_addr(); // Build a channel to kill the HTTP server. - let (exit_signal, exit) = oneshot::channel(); + let (exit_signal, exit) = oneshot::channel::<()>(); let inner_log = log.clone(); - let server_exit = exit.and_then(move |_| { - info!(inner_log, "HTTP service shutdown"); - Ok(()) - }); + let server_exit = exit + .and_then(move |_| { + info!(inner_log, "HTTP service shutdown"); + futures::future::ok(()) + }) + .map_err(|_| ()); + // Configure the `hyper` server to gracefully shutdown when the shutdown channel is triggered. let inner_log = log.clone(); let server_future = server - .with_graceful_shutdown(server_exit) + .with_graceful_shutdown(async { + // TODO: Copied from the docs. I think the await is ok here. + server_exit.await.ok(); + }) .map_err(move |e| { warn!( inner_log, @@ -123,7 +130,7 @@ pub fn start_server( "port" => actual_listen_addr.port(), ); - executor.spawn(server_future); + handle.spawn(server_future); Ok((exit_signal, actual_listen_addr)) } diff --git a/beacon_node/rest_api/src/macros.rs b/beacon_node/rest_api/src/macros.rs index e95cfb8aed..f43224e5db 100644 --- a/beacon_node/rest_api/src/macros.rs +++ b/beacon_node/rest_api/src/macros.rs @@ -2,9 +2,7 @@ macro_rules! try_future { ($expr:expr) => { match $expr { core::result::Result::Ok(val) => val, - core::result::Result::Err(err) => { - return Box::new(futures::future::err(std::convert::From::from(err))) - } + core::result::Result::Err(err) => return Err(std::convert::From::from(err)), } }; ($expr:expr,) => { diff --git a/beacon_node/rest_api/src/response_builder.rs b/beacon_node/rest_api/src/response_builder.rs index 0c8752a113..2377167915 100644 --- a/beacon_node/rest_api/src/response_builder.rs +++ b/beacon_node/rest_api/src/response_builder.rs @@ -1,6 +1,6 @@ use super::{ApiError, ApiResult}; use crate::config::ApiEncodingFormat; -use http::header; +use hyper::header; use hyper::{Body, Request, Response, StatusCode}; use serde::Serialize; use ssz::Encode; diff --git a/beacon_node/rest_api/src/router.rs b/beacon_node/rest_api/src/router.rs index 1c86e8ebc7..6c7924cce3 100644 --- a/beacon_node/rest_api/src/router.rs +++ b/beacon_node/rest_api/src/router.rs @@ -1,11 +1,10 @@ use crate::{ advanced, beacon, consensus, error::ApiError, helpers, lighthouse, metrics, network, node, - spec, validator, BoxFut, NetworkChannel, + spec, validator, NetworkChannel, }; use beacon_chain::{BeaconChain, BeaconChainTypes}; use eth2_config::Eth2Config; use eth2_libp2p::NetworkGlobals; -use futures::{Future, IntoFuture}; use hyper::{Body, Error, Method, Request, Response}; use slog::debug; use std::path::PathBuf; @@ -13,17 +12,9 @@ use std::sync::Arc; use std::time::Instant; use types::Slot; -fn into_boxfut(item: F) -> BoxFut -where - F: IntoFuture, Error = ApiError>, - F::Future: Send, -{ - Box::new(item.into_future()) -} - // Allowing more than 7 arguments. #[allow(clippy::too_many_arguments)] -pub fn route( +pub async fn route( req: Request, beacon_chain: Arc>, network_globals: Arc>, @@ -32,7 +23,7 @@ pub fn route( local_log: slog::Logger, db_path: PathBuf, freezer_db_path: PathBuf, -) -> impl Future, Error = Error> { +) -> Result, Error> { metrics::inc_counter(&metrics::REQUEST_COUNT); let timer = metrics::start_timer(&metrics::REQUEST_RESPONSE_TIME); let received_instant = Instant::now(); @@ -40,222 +31,179 @@ pub fn route( let path = req.uri().path().to_string(); let log = local_log.clone(); - let request_result: Box, Error = _> + Send> = - match (req.method(), path.as_ref()) { - // Methods for Client - (&Method::GET, "/node/version") => into_boxfut(node::get_version(req)), - (&Method::GET, "/node/syncing") => { - // inform the current slot, or set to 0 - let current_slot = beacon_chain - .head_info() - .map(|info| info.slot) - .unwrap_or_else(|_| Slot::from(0u64)); + let request_result = match (req.method(), path.as_ref()) { + // Methods for Client + (&Method::GET, "/node/version") => node::get_version(req), + (&Method::GET, "/node/syncing") => { + // inform the current slot, or set to 0 + let current_slot = beacon_chain + .head_info() + .map(|info| info.slot) + .unwrap_or_else(|_| Slot::from(0u64)); - into_boxfut(node::syncing::( - req, - network_globals, - current_slot, - )) - } + node::syncing::(req, network_globals, current_slot) + } - // Methods for Network - (&Method::GET, "/network/enr") => { - into_boxfut(network::get_enr::(req, network_globals)) - } - (&Method::GET, "/network/peer_count") => { - into_boxfut(network::get_peer_count::(req, network_globals)) - } - (&Method::GET, "/network/peer_id") => { - into_boxfut(network::get_peer_id::(req, network_globals)) - } - (&Method::GET, "/network/peers") => { - into_boxfut(network::get_peer_list::(req, network_globals)) - } - (&Method::GET, "/network/listen_port") => { - into_boxfut(network::get_listen_port::(req, network_globals)) - } - (&Method::GET, "/network/listen_addresses") => { - into_boxfut(network::get_listen_addresses::(req, network_globals)) - } + // Methods for Network + (&Method::GET, "/network/enr") => network::get_enr::(req, network_globals), + (&Method::GET, "/network/peer_count") => network::get_peer_count::(req, network_globals), + (&Method::GET, "/network/peer_id") => network::get_peer_id::(req, network_globals), + (&Method::GET, "/network/peers") => network::get_peer_list::(req, network_globals), + (&Method::GET, "/network/listen_port") => { + network::get_listen_port::(req, network_globals) + } + (&Method::GET, "/network/listen_addresses") => { + network::get_listen_addresses::(req, network_globals) + } - // Methods for Beacon Node - (&Method::GET, "/beacon/head") => into_boxfut(beacon::get_head::(req, beacon_chain)), - (&Method::GET, "/beacon/heads") => { - into_boxfut(beacon::get_heads::(req, beacon_chain)) - } - (&Method::GET, "/beacon/block") => { - into_boxfut(beacon::get_block::(req, beacon_chain)) - } - (&Method::GET, "/beacon/block_root") => { - into_boxfut(beacon::get_block_root::(req, beacon_chain)) - } - (&Method::GET, "/beacon/fork") => into_boxfut(beacon::get_fork::(req, beacon_chain)), - (&Method::GET, "/beacon/genesis_time") => { - into_boxfut(beacon::get_genesis_time::(req, beacon_chain)) - } - (&Method::GET, "/beacon/genesis_validators_root") => { - into_boxfut(beacon::get_genesis_validators_root::(req, beacon_chain)) - } - (&Method::GET, "/beacon/validators") => { - into_boxfut(beacon::get_validators::(req, beacon_chain)) - } - (&Method::POST, "/beacon/validators") => { - into_boxfut(beacon::post_validators::(req, beacon_chain)) - } - (&Method::GET, "/beacon/validators/all") => { - into_boxfut(beacon::get_all_validators::(req, beacon_chain)) - } - (&Method::GET, "/beacon/validators/active") => { - into_boxfut(beacon::get_active_validators::(req, beacon_chain)) - } - (&Method::GET, "/beacon/state") => { - into_boxfut(beacon::get_state::(req, beacon_chain)) - } - (&Method::GET, "/beacon/state_root") => { - into_boxfut(beacon::get_state_root::(req, beacon_chain)) - } - (&Method::GET, "/beacon/state/genesis") => { - into_boxfut(beacon::get_genesis_state::(req, beacon_chain)) - } - (&Method::GET, "/beacon/committees") => { - into_boxfut(beacon::get_committees::(req, beacon_chain)) - } - (&Method::POST, "/beacon/proposer_slashing") => { - into_boxfut(beacon::proposer_slashing::(req, beacon_chain)) - } - (&Method::POST, "/beacon/attester_slashing") => { - into_boxfut(beacon::attester_slashing::(req, beacon_chain)) - } + // Methods for Beacon Node + (&Method::GET, "/beacon/head") => beacon::get_head::(req, beacon_chain), + (&Method::GET, "/beacon/heads") => beacon::get_heads::(req, beacon_chain), + (&Method::GET, "/beacon/block") => beacon::get_block::(req, beacon_chain), + (&Method::GET, "/beacon/block_root") => beacon::get_block_root::(req, beacon_chain), + (&Method::GET, "/beacon/fork") => beacon::get_fork::(req, beacon_chain), + (&Method::GET, "/beacon/genesis_time") => beacon::get_genesis_time::(req, beacon_chain), + (&Method::GET, "/beacon/genesis_validators_root") => { + beacon::get_genesis_validators_root::(req, beacon_chain) + } + (&Method::GET, "/beacon/validators") => beacon::get_validators::(req, beacon_chain), + (&Method::POST, "/beacon/validators") => { + beacon::post_validators::(req, beacon_chain).await + } + (&Method::GET, "/beacon/validators/all") => { + beacon::get_all_validators::(req, beacon_chain) + } + (&Method::GET, "/beacon/validators/active") => { + beacon::get_active_validators::(req, beacon_chain) + } + (&Method::GET, "/beacon/state") => beacon::get_state::(req, beacon_chain), + (&Method::GET, "/beacon/state_root") => beacon::get_state_root::(req, beacon_chain), + (&Method::GET, "/beacon/state/genesis") => { + beacon::get_genesis_state::(req, beacon_chain) + } + (&Method::GET, "/beacon/committees") => beacon::get_committees::(req, beacon_chain), + (&Method::POST, "/beacon/proposer_slashing") => { + beacon::proposer_slashing::(req, beacon_chain).await + } + (&Method::POST, "/beacon/attester_slashing") => { + beacon::attester_slashing::(req, beacon_chain).await + } - // Methods for Validator - (&Method::POST, "/validator/duties") => { - let timer = - metrics::start_timer(&metrics::VALIDATOR_GET_DUTIES_REQUEST_RESPONSE_TIME); - let response = validator::post_validator_duties::(req, beacon_chain); - drop(timer); - into_boxfut(response) - } - (&Method::POST, "/validator/subscribe") => { - validator::post_validator_subscriptions::(req, network_channel) - } - (&Method::GET, "/validator/duties/all") => { - into_boxfut(validator::get_all_validator_duties::(req, beacon_chain)) - } - (&Method::GET, "/validator/duties/active") => into_boxfut( - validator::get_active_validator_duties::(req, beacon_chain), - ), - (&Method::GET, "/validator/block") => { - let timer = - metrics::start_timer(&metrics::VALIDATOR_GET_BLOCK_REQUEST_RESPONSE_TIME); - let response = validator::get_new_beacon_block::(req, beacon_chain, log); - drop(timer); - into_boxfut(response) - } - (&Method::POST, "/validator/block") => { - validator::publish_beacon_block::(req, beacon_chain, network_channel, log) - } - (&Method::GET, "/validator/attestation") => { - let timer = - metrics::start_timer(&metrics::VALIDATOR_GET_ATTESTATION_REQUEST_RESPONSE_TIME); - let response = validator::get_new_attestation::(req, beacon_chain); - drop(timer); - into_boxfut(response) - } - (&Method::GET, "/validator/aggregate_attestation") => { - into_boxfut(validator::get_aggregate_attestation::(req, beacon_chain)) - } - (&Method::POST, "/validator/attestations") => { - validator::publish_attestations::(req, beacon_chain, network_channel, log) - } - (&Method::POST, "/validator/aggregate_and_proofs") => { - validator::publish_aggregate_and_proofs::( - req, - beacon_chain, - network_channel, - log, - ) - } + // Methods for Validator + (&Method::POST, "/validator/duties") => { + let timer = metrics::start_timer(&metrics::VALIDATOR_GET_DUTIES_REQUEST_RESPONSE_TIME); + let response = validator::post_validator_duties::(req, beacon_chain); + drop(timer); + response.await + } + (&Method::POST, "/validator/subscribe") => { + validator::post_validator_subscriptions::(req, network_channel).await + } + (&Method::GET, "/validator/duties/all") => { + validator::get_all_validator_duties::(req, beacon_chain) + } + (&Method::GET, "/validator/duties/active") => { + validator::get_active_validator_duties::(req, beacon_chain) + } + (&Method::GET, "/validator/block") => { + let timer = metrics::start_timer(&metrics::VALIDATOR_GET_BLOCK_REQUEST_RESPONSE_TIME); + let response = validator::get_new_beacon_block::(req, beacon_chain, log); + drop(timer); + response + } + (&Method::POST, "/validator/block") => { + validator::publish_beacon_block::(req, beacon_chain, network_channel, log).await + } + (&Method::GET, "/validator/attestation") => { + let timer = + metrics::start_timer(&metrics::VALIDATOR_GET_ATTESTATION_REQUEST_RESPONSE_TIME); + let response = validator::get_new_attestation::(req, beacon_chain); + drop(timer); + response + } + (&Method::GET, "/validator/aggregate_attestation") => { + validator::get_aggregate_attestation::(req, beacon_chain) + } + (&Method::POST, "/validator/attestations") => { + validator::publish_attestations::(req, beacon_chain, network_channel, log).await + } + (&Method::POST, "/validator/aggregate_and_proofs") => { + validator::publish_aggregate_and_proofs::(req, beacon_chain, network_channel, log) + .await + } - // Methods for consensus - (&Method::GET, "/consensus/global_votes") => { - into_boxfut(consensus::get_vote_count::(req, beacon_chain)) - } - (&Method::POST, "/consensus/individual_votes") => { - consensus::post_individual_votes::(req, beacon_chain) - } + // Methods for consensus + (&Method::GET, "/consensus/global_votes") => { + consensus::get_vote_count::(req, beacon_chain) + } + (&Method::POST, "/consensus/individual_votes") => { + consensus::post_individual_votes::(req, beacon_chain).await + } - // Methods for bootstrap and checking configuration - (&Method::GET, "/spec") => into_boxfut(spec::get_spec::(req, beacon_chain)), - (&Method::GET, "/spec/slots_per_epoch") => { - into_boxfut(spec::get_slots_per_epoch::(req)) - } - (&Method::GET, "/spec/deposit_contract") => { - into_boxfut(helpers::implementation_pending_response(req)) - } - (&Method::GET, "/spec/eth2_config") => { - into_boxfut(spec::get_eth2_config::(req, eth2_config)) - } + // Methods for bootstrap and checking configuration + (&Method::GET, "/spec") => spec::get_spec::(req, beacon_chain), + (&Method::GET, "/spec/slots_per_epoch") => spec::get_slots_per_epoch::(req), + (&Method::GET, "/spec/deposit_contract") => helpers::implementation_pending_response(req), + (&Method::GET, "/spec/eth2_config") => spec::get_eth2_config::(req, eth2_config), - // Methods for advanced parameters - (&Method::GET, "/advanced/fork_choice") => { - into_boxfut(advanced::get_fork_choice::(req, beacon_chain)) - } - (&Method::GET, "/advanced/operation_pool") => { - into_boxfut(advanced::get_operation_pool::(req, beacon_chain)) - } - (&Method::GET, "/metrics") => into_boxfut(metrics::get_prometheus::( - req, - beacon_chain, - db_path, - freezer_db_path, - )), + // Methods for advanced parameters + (&Method::GET, "/advanced/fork_choice") => { + advanced::get_fork_choice::(req, beacon_chain) + } + (&Method::GET, "/advanced/operation_pool") => { + advanced::get_operation_pool::(req, beacon_chain) + } - // Lighthouse specific - (&Method::GET, "/lighthouse/syncing") => { - into_boxfut(lighthouse::syncing::(req, network_globals)) - } - (&Method::GET, "/lighthouse/peers") => { - into_boxfut(lighthouse::peers::(req, network_globals)) - } - (&Method::GET, "/lighthouse/connected_peers") => into_boxfut( - lighthouse::connected_peers::(req, network_globals), - ), - _ => Box::new(futures::future::err(ApiError::NotFound( - "Request path and/or method not found.".to_owned(), - ))), - }; + (&Method::GET, "/metrics") => { + metrics::get_prometheus::(req, beacon_chain, db_path, freezer_db_path) + } + + // Lighthouse specific + (&Method::GET, "/lighthouse/syncing") => { + lighthouse::syncing::(req, network_globals) + } + + (&Method::GET, "/lighthouse/peers") => { + lighthouse::peers::(req, network_globals) + } + + (&Method::GET, "/lighthouse/connected_peers") => { + lighthouse::connected_peers::(req, network_globals) + } + _ => Err(ApiError::NotFound( + "Request path and/or method not found.".to_owned(), + )), + }; // Map the Rust-friendly `Result` in to a http-friendly response. In effect, this ensures that // any `Err` returned from our response handlers becomes a valid http response to the client // (e.g., a response with a 404 or 500 status). - request_result.then(move |result| { - let duration = Instant::now().duration_since(received_instant); - match result { - Ok(response) => { - debug!( - local_log, - "HTTP API request successful"; - "path" => path, - "duration_ms" => duration.as_millis() - ); - metrics::inc_counter(&metrics::SUCCESS_COUNT); - metrics::stop_timer(timer); + let duration = Instant::now().duration_since(received_instant); + match request_result { + Ok(response) => { + debug!( + local_log, + "HTTP API request successful"; + "path" => path, + "duration_ms" => duration.as_millis() + ); + metrics::inc_counter(&metrics::SUCCESS_COUNT); + metrics::stop_timer(timer); - Ok(response) - } - Err(e) => { - let error_response = e.into(); - - debug!( - local_log, - "HTTP API request failure"; - "path" => path, - "duration_ms" => duration.as_millis() - ); - metrics::stop_timer(timer); - - Ok(error_response) - } + Ok(response) } - }) + Err(e) => { + let error_response = e.into(); + + debug!( + local_log, + "HTTP API request failure"; + "path" => path, + "duration_ms" => duration.as_millis() + ); + metrics::stop_timer(timer); + + Ok(error_response) + } + } } diff --git a/beacon_node/rest_api/src/validator.rs b/beacon_node/rest_api/src/validator.rs index 18bd628c0c..7656437ea7 100644 --- a/beacon_node/rest_api/src/validator.rs +++ b/beacon_node/rest_api/src/validator.rs @@ -1,13 +1,12 @@ use crate::helpers::{check_content_type_for_json, publish_beacon_block_to_network}; use crate::response_builder::ResponseBuilder; -use crate::{ApiError, ApiResult, BoxFut, NetworkChannel, UrlQuery}; +use crate::{ApiError, ApiResult, NetworkChannel, UrlQuery}; use beacon_chain::{ attestation_verification::Error as AttnError, BeaconChain, BeaconChainTypes, BlockError, StateSkipConfig, }; use bls::PublicKeyBytes; use eth2_libp2p::PubsubMessage; -use futures::{Future, Stream}; use hyper::{Body, Request}; use network::NetworkMessage; use rayon::prelude::*; @@ -23,23 +22,23 @@ use types::{ /// HTTP Handler to retrieve the duties for a set of validators during a particular epoch. This /// method allows for collecting bulk sets of validator duties without risking exceeding the max /// URL length with query pairs. -pub fn post_validator_duties( +pub async fn post_validator_duties( req: Request, beacon_chain: Arc>, -) -> BoxFut { +) -> ApiResult { let response_builder = ResponseBuilder::new(&req); - let future = req - .into_body() - .concat2() - .map_err(|e| ApiError::ServerError(format!("Unable to get request body: {:?}", e))) - .and_then(|chunks| { - serde_json::from_slice::(&chunks).map_err(|e| { - ApiError::BadRequest(format!( - "Unable to parse JSON into ValidatorDutiesRequest: {:?}", - e - )) - }) + let body = req.into_body(); + let chunks = hyper::body::to_bytes(body) + .await + .map_err(|e| ApiError::ServerError(format!("Unable to get request body: {:?}", e)))?; + + serde_json::from_slice::(&chunks) + .map_err(|e| { + ApiError::BadRequest(format!( + "Unable to parse JSON into ValidatorDutiesRequest: {:?}", + e + )) }) .and_then(|bulk_request| { return_validator_duties( @@ -48,45 +47,42 @@ pub fn post_validator_duties( bulk_request.pubkeys.into_iter().map(Into::into).collect(), ) }) - .and_then(|duties| response_builder?.body_no_ssz(&duties)); - - Box::new(future) + .and_then(|duties| response_builder?.body_no_ssz(&duties)) } /// HTTP Handler to retrieve subscriptions for a set of validators. This allows the node to /// organise peer discovery and topic subscription for known validators. -pub fn post_validator_subscriptions( +pub async fn post_validator_subscriptions( req: Request, - mut network_chan: NetworkChannel, -) -> BoxFut { + network_chan: NetworkChannel, +) -> ApiResult { try_future!(check_content_type_for_json(&req)); let response_builder = ResponseBuilder::new(&req); let body = req.into_body(); - Box::new( - body.concat2() - .map_err(|e| ApiError::ServerError(format!("Unable to get request body: {:?}", e))) - .and_then(|chunks| { - serde_json::from_slice(&chunks).map_err(|e| { - ApiError::BadRequest(format!( - "Unable to parse JSON into ValidatorSubscriptions: {:?}", + let chunks = hyper::body::to_bytes(body) + .await + .map_err(|e| ApiError::ServerError(format!("Unable to get request body: {:?}", e)))?; + + serde_json::from_slice(&chunks) + .map_err(|e| { + ApiError::BadRequest(format!( + "Unable to parse JSON into ValidatorSubscriptions: {:?}", + e + )) + }) + .and_then(move |subscriptions: Vec| { + network_chan + .send(NetworkMessage::Subscribe { subscriptions }) + .map_err(|e| { + ApiError::ServerError(format!( + "Unable to subscriptions to the network: {:?}", e )) - }) - }) - .and_then(move |subscriptions: Vec| { - network_chan - .try_send(NetworkMessage::Subscribe { subscriptions }) - .map_err(|e| { - ApiError::ServerError(format!( - "Unable to subscriptions to the network: {:?}", - e - )) - })?; - Ok(()) - }) - .and_then(|_| response_builder?.body_no_ssz(&())), - ) + })?; + Ok(()) + }) + .and_then(|_| response_builder?.body_no_ssz(&())) } /// HTTP Handler to retrieve all validator duties for the given epoch. @@ -291,24 +287,23 @@ pub fn get_new_beacon_block( } /// HTTP Handler to publish a SignedBeaconBlock, which has been signed by a validator. -pub fn publish_beacon_block( +pub async fn publish_beacon_block( req: Request, beacon_chain: Arc>, network_chan: NetworkChannel, log: Logger, -) -> BoxFut { +) -> ApiResult { try_future!(check_content_type_for_json(&req)); let response_builder = ResponseBuilder::new(&req); let body = req.into_body(); - Box::new( - body.concat2() - .map_err(|e| ApiError::ServerError(format!("Unable to get request body: {:?}", e))) - .and_then(|chunks| { - serde_json::from_slice(&chunks).map_err(|e| { + let chunks = hyper::body::to_bytes(body) + .await + .map_err(|e| ApiError::ServerError(format!("Unable to get request body: {:?}", e)))?; + + serde_json::from_slice(&chunks).map_err(|e| { ApiError::BadRequest(format!("Unable to parse JSON into SignedBeaconBlock: {:?}", e)) }) - }) .and_then(move |block: SignedBeaconBlock| { let slot = block.slot(); match beacon_chain.process_block(block.clone()) { @@ -382,7 +377,6 @@ pub fn publish_beacon_block( } }) .and_then(|_| response_builder?.body_no_ssz(&())) - ) } /// HTTP Handler to produce a new Attestation from the current state, ready to be signed by a validator. @@ -424,59 +418,56 @@ pub fn get_aggregate_attestation( } /// HTTP Handler to publish a list of Attestations, which have been signed by a number of validators. -pub fn publish_attestations( +pub async fn publish_attestations( req: Request, beacon_chain: Arc>, network_chan: NetworkChannel, log: Logger, -) -> BoxFut { +) -> ApiResult { try_future!(check_content_type_for_json(&req)); let response_builder = ResponseBuilder::new(&req); - Box::new( - req.into_body() - .concat2() - .map_err(|e| ApiError::ServerError(format!("Unable to get request body: {:?}", e))) - .map(|chunk| chunk.iter().cloned().collect::>()) - .and_then(|chunks| { - serde_json::from_slice(&chunks.as_slice()).map_err(|e| { - ApiError::BadRequest(format!( - "Unable to deserialize JSON into a list of attestations: {:?}", - e - )) + let body = req.into_body(); + let chunk = hyper::body::to_bytes(body) + .await + .map_err(|e| ApiError::ServerError(format!("Unable to get request body: {:?}", e)))?; + + let chunks = chunk.iter().cloned().collect::>(); + serde_json::from_slice(&chunks.as_slice()) + .map_err(|e| { + ApiError::BadRequest(format!( + "Unable to deserialize JSON into a list of attestations: {:?}", + e + )) + }) + // Process all of the aggregates _without_ exiting early if one fails. + .map(move |attestations: Vec>| { + attestations + .into_par_iter() + .enumerate() + .map(|(i, attestation)| { + process_unaggregated_attestation( + &beacon_chain, + network_chan.clone(), + attestation, + i, + &log, + ) }) - }) - // Process all of the aggregates _without_ exiting early if one fails. - .map(move |attestations: Vec>| { - attestations - .into_par_iter() - .enumerate() - .map(|(i, attestation)| { - process_unaggregated_attestation( - &beacon_chain, - network_chan.clone(), - attestation, - i, - &log, - ) - }) - .collect::>>() - }) - // Iterate through all the results and return on the first `Err`. - // - // Note: this will only provide info about the _first_ failure, not all failures. - .and_then(|processing_results| { - processing_results.into_iter().try_for_each(|result| result) - }) - .and_then(|_| response_builder?.body_no_ssz(&())), - ) + .collect::>>() + }) + // Iterate through all the results and return on the first `Err`. + // + // Note: this will only provide info about the _first_ failure, not all failures. + .and_then(|processing_results| processing_results.into_iter().try_for_each(|result| result)) + .and_then(|_| response_builder?.body_no_ssz(&())) } /// Processes an unaggregrated attestation that was included in a list of attestations with the /// index `i`. fn process_unaggregated_attestation( beacon_chain: &BeaconChain, - mut network_chan: NetworkChannel, + network_chan: NetworkChannel, attestation: Attestation, i: usize, log: &Logger, @@ -496,7 +487,7 @@ fn process_unaggregated_attestation( })?; // Publish the attestation to the network - if let Err(e) = network_chan.try_send(NetworkMessage::Publish { + if let Err(e) = network_chan.send(NetworkMessage::Publish { messages: vec![PubsubMessage::Attestation(Box::new(( attestation .subnet_id(&beacon_chain.spec) @@ -542,61 +533,56 @@ fn process_unaggregated_attestation( } /// HTTP Handler to publish an Attestation, which has been signed by a validator. -pub fn publish_aggregate_and_proofs( +pub async fn publish_aggregate_and_proofs( req: Request, beacon_chain: Arc>, network_chan: NetworkChannel, log: Logger, -) -> BoxFut { +) -> ApiResult { try_future!(check_content_type_for_json(&req)); let response_builder = ResponseBuilder::new(&req); - - Box::new( - req.into_body() - .concat2() - .map_err(|e| ApiError::ServerError(format!("Unable to get request body: {:?}", e))) - .map(|chunk| chunk.iter().cloned().collect::>()) - .and_then(|chunks| { - serde_json::from_slice(&chunks.as_slice()).map_err(|e| { - ApiError::BadRequest(format!( - "Unable to deserialize JSON into a list of SignedAggregateAndProof: {:?}", - e - )) - }) - }) - // Process all of the aggregates _without_ exiting early if one fails. - .map( - move |signed_aggregates: Vec>| { - signed_aggregates - .into_par_iter() - .enumerate() - .map(|(i, signed_aggregate)| { - process_aggregated_attestation( - &beacon_chain, - network_chan.clone(), - signed_aggregate, - i, - &log, - ) - }) - .collect::>>() - }, - ) - // Iterate through all the results and return on the first `Err`. - // - // Note: this will only provide info about the _first_ failure, not all failures. - .and_then(|processing_results| { - processing_results.into_iter().try_for_each(|result| result) - }) - .and_then(|_| response_builder?.body_no_ssz(&())), - ) + let body = req.into_body(); + let chunk = hyper::body::to_bytes(body) + .await + .map_err(|e| ApiError::ServerError(format!("Unable to get request body: {:?}", e)))?; + let chunks = chunk.iter().cloned().collect::>(); + serde_json::from_slice(&chunks.as_slice()) + .map_err(|e| { + ApiError::BadRequest(format!( + "Unable to deserialize JSON into a list of SignedAggregateAndProof: {:?}", + e + )) + }) + // Process all of the aggregates _without_ exiting early if one fails. + .map( + move |signed_aggregates: Vec>| { + signed_aggregates + .into_par_iter() + .enumerate() + .map(|(i, signed_aggregate)| { + process_aggregated_attestation( + &beacon_chain, + network_chan.clone(), + signed_aggregate, + i, + &log, + ) + }) + .collect::>>() + }, + ) + // Iterate through all the results and return on the first `Err`. + // + // Note: this will only provide info about the _first_ failure, not all failures. + .and_then(|processing_results| processing_results.into_iter().try_for_each(|result| result)) + .and_then(|_| response_builder?.body_no_ssz(&())) } /// Processes an aggregrated attestation that was included in a list of attestations with the index /// `i`. fn process_aggregated_attestation( beacon_chain: &BeaconChain, - mut network_chan: NetworkChannel, + network_chan: NetworkChannel, signed_aggregate: SignedAggregateAndProof, i: usize, log: &Logger, @@ -643,7 +629,7 @@ fn process_aggregated_attestation( }; // Publish the attestation to the network - if let Err(e) = network_chan.try_send(NetworkMessage::Publish { + if let Err(e) = network_chan.send(NetworkMessage::Publish { messages: vec![PubsubMessage::AggregateAndProofAttestation(Box::new( signed_aggregate, ))],