Port rest_api crate to stable futures (#1118)

* Port rest_api lib to stable futures

* Reduce tokio features
This commit is contained in:
Pawan Dhananjay
2020-05-08 07:21:20 +05:30
committed by GitHub
parent 8e81ea911d
commit 7fbde447b1
10 changed files with 381 additions and 460 deletions

View File

@@ -25,7 +25,7 @@ state_processing = { path = "../../eth2/state_processing" }
types = { path = "../../eth2/types" } types = { path = "../../eth2/types" }
http = "0.2.1" http = "0.2.1"
hyper = "0.13.5" hyper = "0.13.5"
tokio = "0.2.20" tokio = { version = "0.2", features = ["sync"] }
url = "2.1.1" url = "2.1.1"
lazy_static = "1.4.0" lazy_static = "1.4.0"
eth2_config = { path = "../../eth2/utils/eth2_config" } eth2_config = { path = "../../eth2/utils/eth2_config" }

View File

@@ -1,9 +1,8 @@
use crate::helpers::*; use crate::helpers::*;
use crate::response_builder::ResponseBuilder; use crate::response_builder::ResponseBuilder;
use crate::validator::get_state_for_epoch; use crate::validator::get_state_for_epoch;
use crate::{ApiError, ApiResult, BoxFut, UrlQuery}; use crate::{ApiError, ApiResult, UrlQuery};
use beacon_chain::{BeaconChain, BeaconChainTypes, StateSkipConfig}; use beacon_chain::{BeaconChain, BeaconChainTypes, StateSkipConfig};
use futures::{Future, Stream};
use hyper::{Body, Request}; use hyper::{Body, Request};
use rest_types::{ use rest_types::{
BlockResponse, CanonicalHeadResponse, Committee, HeadBeaconBlock, StateResponse, BlockResponse, CanonicalHeadResponse, Committee, HeadBeaconBlock, StateResponse,
@@ -216,23 +215,22 @@ pub fn get_active_validators<T: BeaconChainTypes>(
/// ///
/// This method allows for a basically unbounded list of `pubkeys`, where as the `get_validators` /// This method allows for a basically unbounded list of `pubkeys`, where as the `get_validators`
/// request is limited by the max number of pubkeys you can fit in a URL. /// request is limited by the max number of pubkeys you can fit in a URL.
pub fn post_validators<T: BeaconChainTypes>( pub async fn post_validators<T: BeaconChainTypes>(
req: Request<Body>, req: Request<Body>,
beacon_chain: Arc<BeaconChain<T>>, beacon_chain: Arc<BeaconChain<T>>,
) -> BoxFut { ) -> ApiResult {
let response_builder = ResponseBuilder::new(&req); let response_builder = ResponseBuilder::new(&req);
let future = req let body = req.into_body();
.into_body() let chunks = hyper::body::to_bytes(body)
.concat2() .await
.map_err(|e| ApiError::ServerError(format!("Unable to get request body: {:?}", e))) .map_err(|e| ApiError::ServerError(format!("Unable to get request body: {:?}", e)))?;
.and_then(|chunks| { serde_json::from_slice::<ValidatorRequest>(&chunks)
serde_json::from_slice::<ValidatorRequest>(&chunks).map_err(|e| { .map_err(|e| {
ApiError::BadRequest(format!( ApiError::BadRequest(format!(
"Unable to parse JSON into ValidatorRequest: {:?}", "Unable to parse JSON into ValidatorRequest: {:?}",
e e
)) ))
})
}) })
.and_then(|bulk_request| { .and_then(|bulk_request| {
validator_responses_by_pubkey( validator_responses_by_pubkey(
@@ -241,9 +239,7 @@ pub fn post_validators<T: BeaconChainTypes>(
bulk_request.pubkeys, bulk_request.pubkeys,
) )
}) })
.and_then(|validators| response_builder?.body(&validators)); .and_then(|validators| response_builder?.body(&validators))
Box::new(future)
} }
/// Returns either the state given by `state_root_opt`, or the canonical head state if it is /// Returns either the state given by `state_root_opt`, or the canonical head state if it is
@@ -449,23 +445,23 @@ pub fn get_genesis_validators_root<T: BeaconChainTypes>(
ResponseBuilder::new(&req)?.body(&beacon_chain.head_info()?.genesis_validators_root) ResponseBuilder::new(&req)?.body(&beacon_chain.head_info()?.genesis_validators_root)
} }
pub fn proposer_slashing<T: BeaconChainTypes>( pub async fn proposer_slashing<T: BeaconChainTypes>(
req: Request<Body>, req: Request<Body>,
beacon_chain: Arc<BeaconChain<T>>, beacon_chain: Arc<BeaconChain<T>>,
) -> BoxFut { ) -> ApiResult {
let response_builder = ResponseBuilder::new(&req); let response_builder = ResponseBuilder::new(&req);
let future = req let body = req.into_body();
.into_body() let chunks = hyper::body::to_bytes(body)
.concat2() .await
.map_err(|e| ApiError::ServerError(format!("Unable to get request body: {:?}", e))) .map_err(|e| ApiError::ServerError(format!("Unable to get request body: {:?}", e)))?;
.and_then(|chunks| {
serde_json::from_slice::<ProposerSlashing>(&chunks).map_err(|e| { serde_json::from_slice::<ProposerSlashing>(&chunks)
ApiError::BadRequest(format!( .map_err(|e| {
"Unable to parse JSON into ProposerSlashing: {:?}", ApiError::BadRequest(format!(
e "Unable to parse JSON into ProposerSlashing: {:?}",
)) e
}) ))
}) })
.and_then(move |proposer_slashing| { .and_then(move |proposer_slashing| {
let spec = &beacon_chain.spec; let spec = &beacon_chain.spec;
@@ -481,33 +477,31 @@ pub fn proposer_slashing<T: BeaconChainTypes>(
)) ))
}) })
} else { } else {
Err(ApiError::BadRequest( return Err(ApiError::BadRequest(
"Cannot insert proposer slashing on node without Eth1 connection.".to_string(), "Cannot insert proposer slashing on node without Eth1 connection.".to_string(),
)) ));
} }
}) })
.and_then(|_| response_builder?.body(&true)); .and_then(|_| response_builder?.body(&true))
Box::new(future)
} }
pub fn attester_slashing<T: BeaconChainTypes>( pub async fn attester_slashing<T: BeaconChainTypes>(
req: Request<Body>, req: Request<Body>,
beacon_chain: Arc<BeaconChain<T>>, beacon_chain: Arc<BeaconChain<T>>,
) -> BoxFut { ) -> ApiResult {
let response_builder = ResponseBuilder::new(&req); let response_builder = ResponseBuilder::new(&req);
let future = req let body = req.into_body();
.into_body() let chunks = hyper::body::to_bytes(body)
.concat2() .await
.map_err(|e| ApiError::ServerError(format!("Unable to get request body: {:?}", e))) .map_err(|e| ApiError::ServerError(format!("Unable to get request body: {:?}", e)))?;
.and_then(|chunks| {
serde_json::from_slice::<AttesterSlashing<T::EthSpec>>(&chunks).map_err(|e| { serde_json::from_slice::<AttesterSlashing<T::EthSpec>>(&chunks)
ApiError::BadRequest(format!( .map_err(|e| {
"Unable to parse JSON into AttesterSlashing: {:?}", ApiError::BadRequest(format!(
e "Unable to parse JSON into AttesterSlashing: {:?}",
)) e
}) ))
}) })
.and_then(move |attester_slashing| { .and_then(move |attester_slashing| {
let spec = &beacon_chain.spec; let spec = &beacon_chain.spec;
@@ -528,7 +522,5 @@ pub fn attester_slashing<T: BeaconChainTypes>(
)) ))
} }
}) })
.and_then(|_| response_builder?.body(&true)); .and_then(|_| response_builder?.body(&true))
Box::new(future)
} }

View File

@@ -1,8 +1,7 @@
use crate::helpers::*; use crate::helpers::*;
use crate::response_builder::ResponseBuilder; use crate::response_builder::ResponseBuilder;
use crate::{ApiError, ApiResult, BoxFut, UrlQuery}; use crate::{ApiError, ApiResult, UrlQuery};
use beacon_chain::{BeaconChain, BeaconChainTypes}; use beacon_chain::{BeaconChain, BeaconChainTypes};
use futures::{Future, Stream};
use hyper::{Body, Request}; use hyper::{Body, Request};
use rest_types::{IndividualVotesRequest, IndividualVotesResponse}; use rest_types::{IndividualVotesRequest, IndividualVotesResponse};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
@@ -71,23 +70,23 @@ pub fn get_vote_count<T: BeaconChainTypes>(
ResponseBuilder::new(&req)?.body(&report) ResponseBuilder::new(&req)?.body(&report)
} }
pub fn post_individual_votes<T: BeaconChainTypes>( pub async fn post_individual_votes<T: BeaconChainTypes>(
req: Request<Body>, req: Request<Body>,
beacon_chain: Arc<BeaconChain<T>>, beacon_chain: Arc<BeaconChain<T>>,
) -> BoxFut { ) -> ApiResult {
let response_builder = ResponseBuilder::new(&req); let response_builder = ResponseBuilder::new(&req);
let future = req let body = req.into_body();
.into_body() let chunks = hyper::body::to_bytes(body)
.concat2() .await
.map_err(|e| ApiError::ServerError(format!("Unable to get request body: {:?}", e))) .map_err(|e| ApiError::ServerError(format!("Unable to get request body: {:?}", e)))?;
.and_then(|chunks| {
serde_json::from_slice::<IndividualVotesRequest>(&chunks).map_err(|e| { serde_json::from_slice::<IndividualVotesRequest>(&chunks)
ApiError::BadRequest(format!( .map_err(|e| {
"Unable to parse JSON into ValidatorDutiesRequest: {:?}", ApiError::BadRequest(format!(
e "Unable to parse JSON into ValidatorDutiesRequest: {:?}",
)) e
}) ))
}) })
.and_then(move |body| { .and_then(move |body| {
let epoch = body.epoch; let epoch = body.epoch;
@@ -136,7 +135,5 @@ pub fn post_individual_votes<T: BeaconChainTypes>(
}) })
.collect::<Result<Vec<_>, _>>() .collect::<Result<Vec<_>, _>>()
}) })
.and_then(|votes| response_builder?.body_no_ssz(&votes)); .and_then(|votes| response_builder?.body_no_ssz(&votes))
Box::new(future)
} }

View File

@@ -1,4 +1,3 @@
use crate::BoxFut;
use hyper::{Body, Response, StatusCode}; use hyper::{Body, Response, StatusCode};
use std::error::Error as StdError; use std::error::Error as StdError;
@@ -42,12 +41,6 @@ impl Into<Response<Body>> for ApiError {
} }
} }
impl Into<BoxFut> for ApiError {
fn into(self) -> BoxFut {
Box::new(futures::future::err(self))
}
}
impl From<store::Error> for ApiError { impl From<store::Error> for ApiError {
fn from(e: store::Error) -> ApiError { fn from(e: store::Error) -> ApiError {
ApiError::ServerError(format!("Database error: {:?}", e)) ApiError::ServerError(format!("Database error: {:?}", e))

View File

@@ -229,14 +229,14 @@ pub fn implementation_pending_response(_req: Request<Body>) -> ApiResult {
} }
pub fn publish_beacon_block_to_network<T: BeaconChainTypes + 'static>( pub fn publish_beacon_block_to_network<T: BeaconChainTypes + 'static>(
mut chan: NetworkChannel<T::EthSpec>, chan: NetworkChannel<T::EthSpec>,
block: SignedBeaconBlock<T::EthSpec>, block: SignedBeaconBlock<T::EthSpec>,
) -> Result<(), ApiError> { ) -> Result<(), ApiError> {
// send the block via SSZ encoding // send the block via SSZ encoding
let messages = vec![PubsubMessage::BeaconBlock(Box::new(block))]; let messages = vec![PubsubMessage::BeaconBlock(Box::new(block))];
// Publish the block to the p2p network via gossipsub. // Publish the block to the p2p network via gossipsub.
if let Err(e) = chan.try_send(NetworkMessage::Publish { messages }) { if let Err(e) = chan.send(NetworkMessage::Publish { messages }) {
return Err(ApiError::ServerError(format!( return Err(ApiError::ServerError(format!(
"Unable to send new block to network: {:?}", "Unable to send new block to network: {:?}",
e e

View File

@@ -26,23 +26,22 @@ pub use config::ApiEncodingFormat;
use error::{ApiError, ApiResult}; use error::{ApiError, ApiResult};
use eth2_config::Eth2Config; use eth2_config::Eth2Config;
use eth2_libp2p::NetworkGlobals; use eth2_libp2p::NetworkGlobals;
use hyper::rt::Future; use futures::future::TryFutureExt;
use hyper::server::conn::AddrStream; use hyper::server::conn::AddrStream;
use hyper::service::{make_service_fn, service_fn}; use hyper::service::{make_service_fn, service_fn};
use hyper::{Body, Request, Response, Server}; use hyper::{Body, Request, Server};
use slog::{info, warn}; use slog::{info, warn};
use std::net::SocketAddr; use std::net::SocketAddr;
use std::ops::Deref; use std::ops::Deref;
use std::path::PathBuf; use std::path::PathBuf;
use std::sync::Arc; use std::sync::Arc;
use tokio::runtime::TaskExecutor; use tokio::runtime::Handle;
use tokio::sync::{mpsc, oneshot}; use tokio::sync::{mpsc, oneshot};
use url_query::UrlQuery; use url_query::UrlQuery;
pub use crate::helpers::parse_pubkey_bytes; pub use crate::helpers::parse_pubkey_bytes;
pub use config::Config; pub use config::Config;
pub type BoxFut = Box<dyn Future<Item = Response<Body>, Error = ApiError> + Send>;
pub type NetworkChannel<T> = mpsc::UnboundedSender<NetworkMessage<T>>; pub type NetworkChannel<T> = mpsc::UnboundedSender<NetworkMessage<T>>;
pub struct NetworkInfo<T: BeaconChainTypes> { pub struct NetworkInfo<T: BeaconChainTypes> {
@@ -54,7 +53,7 @@ pub struct NetworkInfo<T: BeaconChainTypes> {
#[allow(clippy::too_many_arguments)] #[allow(clippy::too_many_arguments)]
pub fn start_server<T: BeaconChainTypes>( pub fn start_server<T: BeaconChainTypes>(
config: &Config, config: &Config,
executor: &TaskExecutor, handle: &Handle,
beacon_chain: Arc<BeaconChain<T>>, beacon_chain: Arc<BeaconChain<T>>,
network_info: NetworkInfo<T>, network_info: NetworkInfo<T>,
db_path: PathBuf, db_path: PathBuf,
@@ -75,18 +74,20 @@ pub fn start_server<T: BeaconChainTypes>(
let db_path = db_path.clone(); let db_path = db_path.clone();
let freezer_db_path = freezer_db_path.clone(); let freezer_db_path = freezer_db_path.clone();
service_fn(move |req: Request<Body>| { async move {
router::route( Ok::<_, hyper::Error>(service_fn(move |req: Request<Body>| {
req, router::route(
beacon_chain.clone(), req,
network_globals.clone(), beacon_chain.clone(),
network_channel.clone(), network_globals.clone(),
eth2_config.clone(), network_channel.clone(),
log.clone(), eth2_config.clone(),
db_path.clone(), log.clone(),
freezer_db_path.clone(), db_path.clone(),
) freezer_db_path.clone(),
}) )
}))
}
}); });
let bind_addr = (config.listen_address, config.port).into(); let bind_addr = (config.listen_address, config.port).into();
@@ -99,16 +100,22 @@ pub fn start_server<T: BeaconChainTypes>(
let actual_listen_addr = server.local_addr(); let actual_listen_addr = server.local_addr();
// Build a channel to kill the HTTP server. // Build a channel to kill the HTTP server.
let (exit_signal, exit) = oneshot::channel(); let (exit_signal, exit) = oneshot::channel::<()>();
let inner_log = log.clone(); let inner_log = log.clone();
let server_exit = exit.and_then(move |_| { let server_exit = exit
info!(inner_log, "HTTP service shutdown"); .and_then(move |_| {
Ok(()) info!(inner_log, "HTTP service shutdown");
}); futures::future::ok(())
})
.map_err(|_| ());
// Configure the `hyper` server to gracefully shutdown when the shutdown channel is triggered. // Configure the `hyper` server to gracefully shutdown when the shutdown channel is triggered.
let inner_log = log.clone(); let inner_log = log.clone();
let server_future = server let server_future = server
.with_graceful_shutdown(server_exit) .with_graceful_shutdown(async {
// TODO: Copied from the docs. I think the await is ok here.
server_exit.await.ok();
})
.map_err(move |e| { .map_err(move |e| {
warn!( warn!(
inner_log, inner_log,
@@ -123,7 +130,7 @@ pub fn start_server<T: BeaconChainTypes>(
"port" => actual_listen_addr.port(), "port" => actual_listen_addr.port(),
); );
executor.spawn(server_future); handle.spawn(server_future);
Ok((exit_signal, actual_listen_addr)) Ok((exit_signal, actual_listen_addr))
} }

View File

@@ -2,9 +2,7 @@ macro_rules! try_future {
($expr:expr) => { ($expr:expr) => {
match $expr { match $expr {
core::result::Result::Ok(val) => val, core::result::Result::Ok(val) => val,
core::result::Result::Err(err) => { core::result::Result::Err(err) => return Err(std::convert::From::from(err)),
return Box::new(futures::future::err(std::convert::From::from(err)))
}
} }
}; };
($expr:expr,) => { ($expr:expr,) => {

View File

@@ -1,6 +1,6 @@
use super::{ApiError, ApiResult}; use super::{ApiError, ApiResult};
use crate::config::ApiEncodingFormat; use crate::config::ApiEncodingFormat;
use http::header; use hyper::header;
use hyper::{Body, Request, Response, StatusCode}; use hyper::{Body, Request, Response, StatusCode};
use serde::Serialize; use serde::Serialize;
use ssz::Encode; use ssz::Encode;

View File

@@ -1,11 +1,10 @@
use crate::{ use crate::{
advanced, beacon, consensus, error::ApiError, helpers, lighthouse, metrics, network, node, advanced, beacon, consensus, error::ApiError, helpers, lighthouse, metrics, network, node,
spec, validator, BoxFut, NetworkChannel, spec, validator, NetworkChannel,
}; };
use beacon_chain::{BeaconChain, BeaconChainTypes}; use beacon_chain::{BeaconChain, BeaconChainTypes};
use eth2_config::Eth2Config; use eth2_config::Eth2Config;
use eth2_libp2p::NetworkGlobals; use eth2_libp2p::NetworkGlobals;
use futures::{Future, IntoFuture};
use hyper::{Body, Error, Method, Request, Response}; use hyper::{Body, Error, Method, Request, Response};
use slog::debug; use slog::debug;
use std::path::PathBuf; use std::path::PathBuf;
@@ -13,17 +12,9 @@ use std::sync::Arc;
use std::time::Instant; use std::time::Instant;
use types::Slot; use types::Slot;
fn into_boxfut<F: IntoFuture + 'static>(item: F) -> BoxFut
where
F: IntoFuture<Item = Response<Body>, Error = ApiError>,
F::Future: Send,
{
Box::new(item.into_future())
}
// Allowing more than 7 arguments. // Allowing more than 7 arguments.
#[allow(clippy::too_many_arguments)] #[allow(clippy::too_many_arguments)]
pub fn route<T: BeaconChainTypes>( pub async fn route<T: BeaconChainTypes>(
req: Request<Body>, req: Request<Body>,
beacon_chain: Arc<BeaconChain<T>>, beacon_chain: Arc<BeaconChain<T>>,
network_globals: Arc<NetworkGlobals<T::EthSpec>>, network_globals: Arc<NetworkGlobals<T::EthSpec>>,
@@ -32,7 +23,7 @@ pub fn route<T: BeaconChainTypes>(
local_log: slog::Logger, local_log: slog::Logger,
db_path: PathBuf, db_path: PathBuf,
freezer_db_path: PathBuf, freezer_db_path: PathBuf,
) -> impl Future<Item = Response<Body>, Error = Error> { ) -> Result<Response<Body>, Error> {
metrics::inc_counter(&metrics::REQUEST_COUNT); metrics::inc_counter(&metrics::REQUEST_COUNT);
let timer = metrics::start_timer(&metrics::REQUEST_RESPONSE_TIME); let timer = metrics::start_timer(&metrics::REQUEST_RESPONSE_TIME);
let received_instant = Instant::now(); let received_instant = Instant::now();
@@ -40,222 +31,179 @@ pub fn route<T: BeaconChainTypes>(
let path = req.uri().path().to_string(); let path = req.uri().path().to_string();
let log = local_log.clone(); let log = local_log.clone();
let request_result: Box<dyn Future<Item = Response<_>, Error = _> + Send> = let request_result = match (req.method(), path.as_ref()) {
match (req.method(), path.as_ref()) { // Methods for Client
// Methods for Client (&Method::GET, "/node/version") => node::get_version(req),
(&Method::GET, "/node/version") => into_boxfut(node::get_version(req)), (&Method::GET, "/node/syncing") => {
(&Method::GET, "/node/syncing") => { // inform the current slot, or set to 0
// inform the current slot, or set to 0 let current_slot = beacon_chain
let current_slot = beacon_chain .head_info()
.head_info() .map(|info| info.slot)
.map(|info| info.slot) .unwrap_or_else(|_| Slot::from(0u64));
.unwrap_or_else(|_| Slot::from(0u64));
into_boxfut(node::syncing::<T::EthSpec>( node::syncing::<T::EthSpec>(req, network_globals, current_slot)
req, }
network_globals,
current_slot,
))
}
// Methods for Network // Methods for Network
(&Method::GET, "/network/enr") => { (&Method::GET, "/network/enr") => network::get_enr::<T>(req, network_globals),
into_boxfut(network::get_enr::<T>(req, network_globals)) (&Method::GET, "/network/peer_count") => network::get_peer_count::<T>(req, network_globals),
} (&Method::GET, "/network/peer_id") => network::get_peer_id::<T>(req, network_globals),
(&Method::GET, "/network/peer_count") => { (&Method::GET, "/network/peers") => network::get_peer_list::<T>(req, network_globals),
into_boxfut(network::get_peer_count::<T>(req, network_globals)) (&Method::GET, "/network/listen_port") => {
} network::get_listen_port::<T>(req, network_globals)
(&Method::GET, "/network/peer_id") => { }
into_boxfut(network::get_peer_id::<T>(req, network_globals)) (&Method::GET, "/network/listen_addresses") => {
} network::get_listen_addresses::<T>(req, network_globals)
(&Method::GET, "/network/peers") => { }
into_boxfut(network::get_peer_list::<T>(req, network_globals))
}
(&Method::GET, "/network/listen_port") => {
into_boxfut(network::get_listen_port::<T>(req, network_globals))
}
(&Method::GET, "/network/listen_addresses") => {
into_boxfut(network::get_listen_addresses::<T>(req, network_globals))
}
// Methods for Beacon Node // Methods for Beacon Node
(&Method::GET, "/beacon/head") => into_boxfut(beacon::get_head::<T>(req, beacon_chain)), (&Method::GET, "/beacon/head") => beacon::get_head::<T>(req, beacon_chain),
(&Method::GET, "/beacon/heads") => { (&Method::GET, "/beacon/heads") => beacon::get_heads::<T>(req, beacon_chain),
into_boxfut(beacon::get_heads::<T>(req, beacon_chain)) (&Method::GET, "/beacon/block") => beacon::get_block::<T>(req, beacon_chain),
} (&Method::GET, "/beacon/block_root") => beacon::get_block_root::<T>(req, beacon_chain),
(&Method::GET, "/beacon/block") => { (&Method::GET, "/beacon/fork") => beacon::get_fork::<T>(req, beacon_chain),
into_boxfut(beacon::get_block::<T>(req, beacon_chain)) (&Method::GET, "/beacon/genesis_time") => beacon::get_genesis_time::<T>(req, beacon_chain),
} (&Method::GET, "/beacon/genesis_validators_root") => {
(&Method::GET, "/beacon/block_root") => { beacon::get_genesis_validators_root::<T>(req, beacon_chain)
into_boxfut(beacon::get_block_root::<T>(req, beacon_chain)) }
} (&Method::GET, "/beacon/validators") => beacon::get_validators::<T>(req, beacon_chain),
(&Method::GET, "/beacon/fork") => into_boxfut(beacon::get_fork::<T>(req, beacon_chain)), (&Method::POST, "/beacon/validators") => {
(&Method::GET, "/beacon/genesis_time") => { beacon::post_validators::<T>(req, beacon_chain).await
into_boxfut(beacon::get_genesis_time::<T>(req, beacon_chain)) }
} (&Method::GET, "/beacon/validators/all") => {
(&Method::GET, "/beacon/genesis_validators_root") => { beacon::get_all_validators::<T>(req, beacon_chain)
into_boxfut(beacon::get_genesis_validators_root::<T>(req, beacon_chain)) }
} (&Method::GET, "/beacon/validators/active") => {
(&Method::GET, "/beacon/validators") => { beacon::get_active_validators::<T>(req, beacon_chain)
into_boxfut(beacon::get_validators::<T>(req, beacon_chain)) }
} (&Method::GET, "/beacon/state") => beacon::get_state::<T>(req, beacon_chain),
(&Method::POST, "/beacon/validators") => { (&Method::GET, "/beacon/state_root") => beacon::get_state_root::<T>(req, beacon_chain),
into_boxfut(beacon::post_validators::<T>(req, beacon_chain)) (&Method::GET, "/beacon/state/genesis") => {
} beacon::get_genesis_state::<T>(req, beacon_chain)
(&Method::GET, "/beacon/validators/all") => { }
into_boxfut(beacon::get_all_validators::<T>(req, beacon_chain)) (&Method::GET, "/beacon/committees") => beacon::get_committees::<T>(req, beacon_chain),
} (&Method::POST, "/beacon/proposer_slashing") => {
(&Method::GET, "/beacon/validators/active") => { beacon::proposer_slashing::<T>(req, beacon_chain).await
into_boxfut(beacon::get_active_validators::<T>(req, beacon_chain)) }
} (&Method::POST, "/beacon/attester_slashing") => {
(&Method::GET, "/beacon/state") => { beacon::attester_slashing::<T>(req, beacon_chain).await
into_boxfut(beacon::get_state::<T>(req, beacon_chain)) }
}
(&Method::GET, "/beacon/state_root") => {
into_boxfut(beacon::get_state_root::<T>(req, beacon_chain))
}
(&Method::GET, "/beacon/state/genesis") => {
into_boxfut(beacon::get_genesis_state::<T>(req, beacon_chain))
}
(&Method::GET, "/beacon/committees") => {
into_boxfut(beacon::get_committees::<T>(req, beacon_chain))
}
(&Method::POST, "/beacon/proposer_slashing") => {
into_boxfut(beacon::proposer_slashing::<T>(req, beacon_chain))
}
(&Method::POST, "/beacon/attester_slashing") => {
into_boxfut(beacon::attester_slashing::<T>(req, beacon_chain))
}
// Methods for Validator // Methods for Validator
(&Method::POST, "/validator/duties") => { (&Method::POST, "/validator/duties") => {
let timer = let timer = metrics::start_timer(&metrics::VALIDATOR_GET_DUTIES_REQUEST_RESPONSE_TIME);
metrics::start_timer(&metrics::VALIDATOR_GET_DUTIES_REQUEST_RESPONSE_TIME); let response = validator::post_validator_duties::<T>(req, beacon_chain);
let response = validator::post_validator_duties::<T>(req, beacon_chain); drop(timer);
drop(timer); response.await
into_boxfut(response) }
} (&Method::POST, "/validator/subscribe") => {
(&Method::POST, "/validator/subscribe") => { validator::post_validator_subscriptions::<T>(req, network_channel).await
validator::post_validator_subscriptions::<T>(req, network_channel) }
} (&Method::GET, "/validator/duties/all") => {
(&Method::GET, "/validator/duties/all") => { validator::get_all_validator_duties::<T>(req, beacon_chain)
into_boxfut(validator::get_all_validator_duties::<T>(req, beacon_chain)) }
} (&Method::GET, "/validator/duties/active") => {
(&Method::GET, "/validator/duties/active") => into_boxfut( validator::get_active_validator_duties::<T>(req, beacon_chain)
validator::get_active_validator_duties::<T>(req, beacon_chain), }
), (&Method::GET, "/validator/block") => {
(&Method::GET, "/validator/block") => { let timer = metrics::start_timer(&metrics::VALIDATOR_GET_BLOCK_REQUEST_RESPONSE_TIME);
let timer = let response = validator::get_new_beacon_block::<T>(req, beacon_chain, log);
metrics::start_timer(&metrics::VALIDATOR_GET_BLOCK_REQUEST_RESPONSE_TIME); drop(timer);
let response = validator::get_new_beacon_block::<T>(req, beacon_chain, log); response
drop(timer); }
into_boxfut(response) (&Method::POST, "/validator/block") => {
} validator::publish_beacon_block::<T>(req, beacon_chain, network_channel, log).await
(&Method::POST, "/validator/block") => { }
validator::publish_beacon_block::<T>(req, beacon_chain, network_channel, log) (&Method::GET, "/validator/attestation") => {
} let timer =
(&Method::GET, "/validator/attestation") => { metrics::start_timer(&metrics::VALIDATOR_GET_ATTESTATION_REQUEST_RESPONSE_TIME);
let timer = let response = validator::get_new_attestation::<T>(req, beacon_chain);
metrics::start_timer(&metrics::VALIDATOR_GET_ATTESTATION_REQUEST_RESPONSE_TIME); drop(timer);
let response = validator::get_new_attestation::<T>(req, beacon_chain); response
drop(timer); }
into_boxfut(response) (&Method::GET, "/validator/aggregate_attestation") => {
} validator::get_aggregate_attestation::<T>(req, beacon_chain)
(&Method::GET, "/validator/aggregate_attestation") => { }
into_boxfut(validator::get_aggregate_attestation::<T>(req, beacon_chain)) (&Method::POST, "/validator/attestations") => {
} validator::publish_attestations::<T>(req, beacon_chain, network_channel, log).await
(&Method::POST, "/validator/attestations") => { }
validator::publish_attestations::<T>(req, beacon_chain, network_channel, log) (&Method::POST, "/validator/aggregate_and_proofs") => {
} validator::publish_aggregate_and_proofs::<T>(req, beacon_chain, network_channel, log)
(&Method::POST, "/validator/aggregate_and_proofs") => { .await
validator::publish_aggregate_and_proofs::<T>( }
req,
beacon_chain,
network_channel,
log,
)
}
// Methods for consensus // Methods for consensus
(&Method::GET, "/consensus/global_votes") => { (&Method::GET, "/consensus/global_votes") => {
into_boxfut(consensus::get_vote_count::<T>(req, beacon_chain)) consensus::get_vote_count::<T>(req, beacon_chain)
} }
(&Method::POST, "/consensus/individual_votes") => { (&Method::POST, "/consensus/individual_votes") => {
consensus::post_individual_votes::<T>(req, beacon_chain) consensus::post_individual_votes::<T>(req, beacon_chain).await
} }
// Methods for bootstrap and checking configuration // Methods for bootstrap and checking configuration
(&Method::GET, "/spec") => into_boxfut(spec::get_spec::<T>(req, beacon_chain)), (&Method::GET, "/spec") => spec::get_spec::<T>(req, beacon_chain),
(&Method::GET, "/spec/slots_per_epoch") => { (&Method::GET, "/spec/slots_per_epoch") => spec::get_slots_per_epoch::<T>(req),
into_boxfut(spec::get_slots_per_epoch::<T>(req)) (&Method::GET, "/spec/deposit_contract") => helpers::implementation_pending_response(req),
} (&Method::GET, "/spec/eth2_config") => spec::get_eth2_config::<T>(req, eth2_config),
(&Method::GET, "/spec/deposit_contract") => {
into_boxfut(helpers::implementation_pending_response(req))
}
(&Method::GET, "/spec/eth2_config") => {
into_boxfut(spec::get_eth2_config::<T>(req, eth2_config))
}
// Methods for advanced parameters // Methods for advanced parameters
(&Method::GET, "/advanced/fork_choice") => { (&Method::GET, "/advanced/fork_choice") => {
into_boxfut(advanced::get_fork_choice::<T>(req, beacon_chain)) advanced::get_fork_choice::<T>(req, beacon_chain)
} }
(&Method::GET, "/advanced/operation_pool") => { (&Method::GET, "/advanced/operation_pool") => {
into_boxfut(advanced::get_operation_pool::<T>(req, beacon_chain)) advanced::get_operation_pool::<T>(req, beacon_chain)
} }
(&Method::GET, "/metrics") => into_boxfut(metrics::get_prometheus::<T>(
req,
beacon_chain,
db_path,
freezer_db_path,
)),
// Lighthouse specific (&Method::GET, "/metrics") => {
(&Method::GET, "/lighthouse/syncing") => { metrics::get_prometheus::<T>(req, beacon_chain, db_path, freezer_db_path)
into_boxfut(lighthouse::syncing::<T::EthSpec>(req, network_globals)) }
}
(&Method::GET, "/lighthouse/peers") => { // Lighthouse specific
into_boxfut(lighthouse::peers::<T::EthSpec>(req, network_globals)) (&Method::GET, "/lighthouse/syncing") => {
} lighthouse::syncing::<T::EthSpec>(req, network_globals)
(&Method::GET, "/lighthouse/connected_peers") => into_boxfut( }
lighthouse::connected_peers::<T::EthSpec>(req, network_globals),
), (&Method::GET, "/lighthouse/peers") => {
_ => Box::new(futures::future::err(ApiError::NotFound( lighthouse::peers::<T::EthSpec>(req, network_globals)
"Request path and/or method not found.".to_owned(), }
))),
}; (&Method::GET, "/lighthouse/connected_peers") => {
lighthouse::connected_peers::<T::EthSpec>(req, network_globals)
}
_ => Err(ApiError::NotFound(
"Request path and/or method not found.".to_owned(),
)),
};
// Map the Rust-friendly `Result` in to a http-friendly response. In effect, this ensures that // Map the Rust-friendly `Result` in to a http-friendly response. In effect, this ensures that
// any `Err` returned from our response handlers becomes a valid http response to the client // any `Err` returned from our response handlers becomes a valid http response to the client
// (e.g., a response with a 404 or 500 status). // (e.g., a response with a 404 or 500 status).
request_result.then(move |result| { let duration = Instant::now().duration_since(received_instant);
let duration = Instant::now().duration_since(received_instant); match request_result {
match result { Ok(response) => {
Ok(response) => { debug!(
debug!( local_log,
local_log, "HTTP API request successful";
"HTTP API request successful"; "path" => path,
"path" => path, "duration_ms" => duration.as_millis()
"duration_ms" => duration.as_millis() );
); metrics::inc_counter(&metrics::SUCCESS_COUNT);
metrics::inc_counter(&metrics::SUCCESS_COUNT); metrics::stop_timer(timer);
metrics::stop_timer(timer);
Ok(response) Ok(response)
}
Err(e) => {
let error_response = e.into();
debug!(
local_log,
"HTTP API request failure";
"path" => path,
"duration_ms" => duration.as_millis()
);
metrics::stop_timer(timer);
Ok(error_response)
}
} }
}) Err(e) => {
let error_response = e.into();
debug!(
local_log,
"HTTP API request failure";
"path" => path,
"duration_ms" => duration.as_millis()
);
metrics::stop_timer(timer);
Ok(error_response)
}
}
} }

View File

@@ -1,13 +1,12 @@
use crate::helpers::{check_content_type_for_json, publish_beacon_block_to_network}; use crate::helpers::{check_content_type_for_json, publish_beacon_block_to_network};
use crate::response_builder::ResponseBuilder; use crate::response_builder::ResponseBuilder;
use crate::{ApiError, ApiResult, BoxFut, NetworkChannel, UrlQuery}; use crate::{ApiError, ApiResult, NetworkChannel, UrlQuery};
use beacon_chain::{ use beacon_chain::{
attestation_verification::Error as AttnError, BeaconChain, BeaconChainTypes, BlockError, attestation_verification::Error as AttnError, BeaconChain, BeaconChainTypes, BlockError,
StateSkipConfig, StateSkipConfig,
}; };
use bls::PublicKeyBytes; use bls::PublicKeyBytes;
use eth2_libp2p::PubsubMessage; use eth2_libp2p::PubsubMessage;
use futures::{Future, Stream};
use hyper::{Body, Request}; use hyper::{Body, Request};
use network::NetworkMessage; use network::NetworkMessage;
use rayon::prelude::*; use rayon::prelude::*;
@@ -23,23 +22,23 @@ use types::{
/// HTTP Handler to retrieve the duties for a set of validators during a particular epoch. This /// HTTP Handler to retrieve the duties for a set of validators during a particular epoch. This
/// method allows for collecting bulk sets of validator duties without risking exceeding the max /// method allows for collecting bulk sets of validator duties without risking exceeding the max
/// URL length with query pairs. /// URL length with query pairs.
pub fn post_validator_duties<T: BeaconChainTypes>( pub async fn post_validator_duties<T: BeaconChainTypes>(
req: Request<Body>, req: Request<Body>,
beacon_chain: Arc<BeaconChain<T>>, beacon_chain: Arc<BeaconChain<T>>,
) -> BoxFut { ) -> ApiResult {
let response_builder = ResponseBuilder::new(&req); let response_builder = ResponseBuilder::new(&req);
let future = req let body = req.into_body();
.into_body() let chunks = hyper::body::to_bytes(body)
.concat2() .await
.map_err(|e| ApiError::ServerError(format!("Unable to get request body: {:?}", e))) .map_err(|e| ApiError::ServerError(format!("Unable to get request body: {:?}", e)))?;
.and_then(|chunks| {
serde_json::from_slice::<ValidatorDutiesRequest>(&chunks).map_err(|e| { serde_json::from_slice::<ValidatorDutiesRequest>(&chunks)
ApiError::BadRequest(format!( .map_err(|e| {
"Unable to parse JSON into ValidatorDutiesRequest: {:?}", ApiError::BadRequest(format!(
e "Unable to parse JSON into ValidatorDutiesRequest: {:?}",
)) e
}) ))
}) })
.and_then(|bulk_request| { .and_then(|bulk_request| {
return_validator_duties( return_validator_duties(
@@ -48,45 +47,42 @@ pub fn post_validator_duties<T: BeaconChainTypes>(
bulk_request.pubkeys.into_iter().map(Into::into).collect(), bulk_request.pubkeys.into_iter().map(Into::into).collect(),
) )
}) })
.and_then(|duties| response_builder?.body_no_ssz(&duties)); .and_then(|duties| response_builder?.body_no_ssz(&duties))
Box::new(future)
} }
/// HTTP Handler to retrieve subscriptions for a set of validators. This allows the node to /// HTTP Handler to retrieve subscriptions for a set of validators. This allows the node to
/// organise peer discovery and topic subscription for known validators. /// organise peer discovery and topic subscription for known validators.
pub fn post_validator_subscriptions<T: BeaconChainTypes>( pub async fn post_validator_subscriptions<T: BeaconChainTypes>(
req: Request<Body>, req: Request<Body>,
mut network_chan: NetworkChannel<T::EthSpec>, network_chan: NetworkChannel<T::EthSpec>,
) -> BoxFut { ) -> ApiResult {
try_future!(check_content_type_for_json(&req)); try_future!(check_content_type_for_json(&req));
let response_builder = ResponseBuilder::new(&req); let response_builder = ResponseBuilder::new(&req);
let body = req.into_body(); let body = req.into_body();
Box::new( let chunks = hyper::body::to_bytes(body)
body.concat2() .await
.map_err(|e| ApiError::ServerError(format!("Unable to get request body: {:?}", e))) .map_err(|e| ApiError::ServerError(format!("Unable to get request body: {:?}", e)))?;
.and_then(|chunks| {
serde_json::from_slice(&chunks).map_err(|e| { serde_json::from_slice(&chunks)
ApiError::BadRequest(format!( .map_err(|e| {
"Unable to parse JSON into ValidatorSubscriptions: {:?}", ApiError::BadRequest(format!(
"Unable to parse JSON into ValidatorSubscriptions: {:?}",
e
))
})
.and_then(move |subscriptions: Vec<ValidatorSubscription>| {
network_chan
.send(NetworkMessage::Subscribe { subscriptions })
.map_err(|e| {
ApiError::ServerError(format!(
"Unable to subscriptions to the network: {:?}",
e e
)) ))
}) })?;
}) Ok(())
.and_then(move |subscriptions: Vec<ValidatorSubscription>| { })
network_chan .and_then(|_| response_builder?.body_no_ssz(&()))
.try_send(NetworkMessage::Subscribe { subscriptions })
.map_err(|e| {
ApiError::ServerError(format!(
"Unable to subscriptions to the network: {:?}",
e
))
})?;
Ok(())
})
.and_then(|_| response_builder?.body_no_ssz(&())),
)
} }
/// HTTP Handler to retrieve all validator duties for the given epoch. /// HTTP Handler to retrieve all validator duties for the given epoch.
@@ -291,24 +287,23 @@ pub fn get_new_beacon_block<T: BeaconChainTypes>(
} }
/// HTTP Handler to publish a SignedBeaconBlock, which has been signed by a validator. /// HTTP Handler to publish a SignedBeaconBlock, which has been signed by a validator.
pub fn publish_beacon_block<T: BeaconChainTypes>( pub async fn publish_beacon_block<T: BeaconChainTypes>(
req: Request<Body>, req: Request<Body>,
beacon_chain: Arc<BeaconChain<T>>, beacon_chain: Arc<BeaconChain<T>>,
network_chan: NetworkChannel<T::EthSpec>, network_chan: NetworkChannel<T::EthSpec>,
log: Logger, log: Logger,
) -> BoxFut { ) -> ApiResult {
try_future!(check_content_type_for_json(&req)); try_future!(check_content_type_for_json(&req));
let response_builder = ResponseBuilder::new(&req); let response_builder = ResponseBuilder::new(&req);
let body = req.into_body(); let body = req.into_body();
Box::new( let chunks = hyper::body::to_bytes(body)
body.concat2() .await
.map_err(|e| ApiError::ServerError(format!("Unable to get request body: {:?}", e))) .map_err(|e| ApiError::ServerError(format!("Unable to get request body: {:?}", e)))?;
.and_then(|chunks| {
serde_json::from_slice(&chunks).map_err(|e| { serde_json::from_slice(&chunks).map_err(|e| {
ApiError::BadRequest(format!("Unable to parse JSON into SignedBeaconBlock: {:?}", e)) ApiError::BadRequest(format!("Unable to parse JSON into SignedBeaconBlock: {:?}", e))
}) })
})
.and_then(move |block: SignedBeaconBlock<T::EthSpec>| { .and_then(move |block: SignedBeaconBlock<T::EthSpec>| {
let slot = block.slot(); let slot = block.slot();
match beacon_chain.process_block(block.clone()) { match beacon_chain.process_block(block.clone()) {
@@ -382,7 +377,6 @@ pub fn publish_beacon_block<T: BeaconChainTypes>(
} }
}) })
.and_then(|_| response_builder?.body_no_ssz(&())) .and_then(|_| response_builder?.body_no_ssz(&()))
)
} }
/// HTTP Handler to produce a new Attestation from the current state, ready to be signed by a validator. /// HTTP Handler to produce a new Attestation from the current state, ready to be signed by a validator.
@@ -424,59 +418,56 @@ pub fn get_aggregate_attestation<T: BeaconChainTypes>(
} }
/// HTTP Handler to publish a list of Attestations, which have been signed by a number of validators. /// HTTP Handler to publish a list of Attestations, which have been signed by a number of validators.
pub fn publish_attestations<T: BeaconChainTypes>( pub async fn publish_attestations<T: BeaconChainTypes>(
req: Request<Body>, req: Request<Body>,
beacon_chain: Arc<BeaconChain<T>>, beacon_chain: Arc<BeaconChain<T>>,
network_chan: NetworkChannel<T::EthSpec>, network_chan: NetworkChannel<T::EthSpec>,
log: Logger, log: Logger,
) -> BoxFut { ) -> ApiResult {
try_future!(check_content_type_for_json(&req)); try_future!(check_content_type_for_json(&req));
let response_builder = ResponseBuilder::new(&req); let response_builder = ResponseBuilder::new(&req);
Box::new( let body = req.into_body();
req.into_body() let chunk = hyper::body::to_bytes(body)
.concat2() .await
.map_err(|e| ApiError::ServerError(format!("Unable to get request body: {:?}", e))) .map_err(|e| ApiError::ServerError(format!("Unable to get request body: {:?}", e)))?;
.map(|chunk| chunk.iter().cloned().collect::<Vec<u8>>())
.and_then(|chunks| { let chunks = chunk.iter().cloned().collect::<Vec<u8>>();
serde_json::from_slice(&chunks.as_slice()).map_err(|e| { serde_json::from_slice(&chunks.as_slice())
ApiError::BadRequest(format!( .map_err(|e| {
"Unable to deserialize JSON into a list of attestations: {:?}", ApiError::BadRequest(format!(
e "Unable to deserialize JSON into a list of attestations: {:?}",
)) e
))
})
// Process all of the aggregates _without_ exiting early if one fails.
.map(move |attestations: Vec<Attestation<T::EthSpec>>| {
attestations
.into_par_iter()
.enumerate()
.map(|(i, attestation)| {
process_unaggregated_attestation(
&beacon_chain,
network_chan.clone(),
attestation,
i,
&log,
)
}) })
}) .collect::<Vec<Result<_, _>>>()
// Process all of the aggregates _without_ exiting early if one fails. })
.map(move |attestations: Vec<Attestation<T::EthSpec>>| { // Iterate through all the results and return on the first `Err`.
attestations //
.into_par_iter() // Note: this will only provide info about the _first_ failure, not all failures.
.enumerate() .and_then(|processing_results| processing_results.into_iter().try_for_each(|result| result))
.map(|(i, attestation)| { .and_then(|_| response_builder?.body_no_ssz(&()))
process_unaggregated_attestation(
&beacon_chain,
network_chan.clone(),
attestation,
i,
&log,
)
})
.collect::<Vec<Result<_, _>>>()
})
// Iterate through all the results and return on the first `Err`.
//
// Note: this will only provide info about the _first_ failure, not all failures.
.and_then(|processing_results| {
processing_results.into_iter().try_for_each(|result| result)
})
.and_then(|_| response_builder?.body_no_ssz(&())),
)
} }
/// Processes an unaggregrated attestation that was included in a list of attestations with the /// Processes an unaggregrated attestation that was included in a list of attestations with the
/// index `i`. /// index `i`.
fn process_unaggregated_attestation<T: BeaconChainTypes>( fn process_unaggregated_attestation<T: BeaconChainTypes>(
beacon_chain: &BeaconChain<T>, beacon_chain: &BeaconChain<T>,
mut network_chan: NetworkChannel<T::EthSpec>, network_chan: NetworkChannel<T::EthSpec>,
attestation: Attestation<T::EthSpec>, attestation: Attestation<T::EthSpec>,
i: usize, i: usize,
log: &Logger, log: &Logger,
@@ -496,7 +487,7 @@ fn process_unaggregated_attestation<T: BeaconChainTypes>(
})?; })?;
// Publish the attestation to the network // Publish the attestation to the network
if let Err(e) = network_chan.try_send(NetworkMessage::Publish { if let Err(e) = network_chan.send(NetworkMessage::Publish {
messages: vec![PubsubMessage::Attestation(Box::new(( messages: vec![PubsubMessage::Attestation(Box::new((
attestation attestation
.subnet_id(&beacon_chain.spec) .subnet_id(&beacon_chain.spec)
@@ -542,61 +533,56 @@ fn process_unaggregated_attestation<T: BeaconChainTypes>(
} }
/// HTTP Handler to publish an Attestation, which has been signed by a validator. /// HTTP Handler to publish an Attestation, which has been signed by a validator.
pub fn publish_aggregate_and_proofs<T: BeaconChainTypes>( pub async fn publish_aggregate_and_proofs<T: BeaconChainTypes>(
req: Request<Body>, req: Request<Body>,
beacon_chain: Arc<BeaconChain<T>>, beacon_chain: Arc<BeaconChain<T>>,
network_chan: NetworkChannel<T::EthSpec>, network_chan: NetworkChannel<T::EthSpec>,
log: Logger, log: Logger,
) -> BoxFut { ) -> ApiResult {
try_future!(check_content_type_for_json(&req)); try_future!(check_content_type_for_json(&req));
let response_builder = ResponseBuilder::new(&req); let response_builder = ResponseBuilder::new(&req);
let body = req.into_body();
Box::new( let chunk = hyper::body::to_bytes(body)
req.into_body() .await
.concat2() .map_err(|e| ApiError::ServerError(format!("Unable to get request body: {:?}", e)))?;
.map_err(|e| ApiError::ServerError(format!("Unable to get request body: {:?}", e))) let chunks = chunk.iter().cloned().collect::<Vec<u8>>();
.map(|chunk| chunk.iter().cloned().collect::<Vec<u8>>()) serde_json::from_slice(&chunks.as_slice())
.and_then(|chunks| { .map_err(|e| {
serde_json::from_slice(&chunks.as_slice()).map_err(|e| { ApiError::BadRequest(format!(
ApiError::BadRequest(format!( "Unable to deserialize JSON into a list of SignedAggregateAndProof: {:?}",
"Unable to deserialize JSON into a list of SignedAggregateAndProof: {:?}", e
e ))
)) })
}) // Process all of the aggregates _without_ exiting early if one fails.
}) .map(
// Process all of the aggregates _without_ exiting early if one fails. move |signed_aggregates: Vec<SignedAggregateAndProof<T::EthSpec>>| {
.map( signed_aggregates
move |signed_aggregates: Vec<SignedAggregateAndProof<T::EthSpec>>| { .into_par_iter()
signed_aggregates .enumerate()
.into_par_iter() .map(|(i, signed_aggregate)| {
.enumerate() process_aggregated_attestation(
.map(|(i, signed_aggregate)| { &beacon_chain,
process_aggregated_attestation( network_chan.clone(),
&beacon_chain, signed_aggregate,
network_chan.clone(), i,
signed_aggregate, &log,
i, )
&log, })
) .collect::<Vec<Result<_, _>>>()
}) },
.collect::<Vec<Result<_, _>>>() )
}, // Iterate through all the results and return on the first `Err`.
) //
// Iterate through all the results and return on the first `Err`. // Note: this will only provide info about the _first_ failure, not all failures.
// .and_then(|processing_results| processing_results.into_iter().try_for_each(|result| result))
// Note: this will only provide info about the _first_ failure, not all failures. .and_then(|_| response_builder?.body_no_ssz(&()))
.and_then(|processing_results| {
processing_results.into_iter().try_for_each(|result| result)
})
.and_then(|_| response_builder?.body_no_ssz(&())),
)
} }
/// Processes an aggregrated attestation that was included in a list of attestations with the index /// Processes an aggregrated attestation that was included in a list of attestations with the index
/// `i`. /// `i`.
fn process_aggregated_attestation<T: BeaconChainTypes>( fn process_aggregated_attestation<T: BeaconChainTypes>(
beacon_chain: &BeaconChain<T>, beacon_chain: &BeaconChain<T>,
mut network_chan: NetworkChannel<T::EthSpec>, network_chan: NetworkChannel<T::EthSpec>,
signed_aggregate: SignedAggregateAndProof<T::EthSpec>, signed_aggregate: SignedAggregateAndProof<T::EthSpec>,
i: usize, i: usize,
log: &Logger, log: &Logger,
@@ -643,7 +629,7 @@ fn process_aggregated_attestation<T: BeaconChainTypes>(
}; };
// Publish the attestation to the network // Publish the attestation to the network
if let Err(e) = network_chan.try_send(NetworkMessage::Publish { if let Err(e) = network_chan.send(NetworkMessage::Publish {
messages: vec![PubsubMessage::AggregateAndProofAttestation(Box::new( messages: vec![PubsubMessage::AggregateAndProofAttestation(Box::new(
signed_aggregate, signed_aggregate,
))], ))],