From 0136eb33b092a8ab474c5ddf3db666a542dfda0b Mon Sep 17 00:00:00 2001 From: Luke Anderson Date: Mon, 9 Sep 2019 12:54:14 +1000 Subject: [PATCH] WIP: Added POST functionality for pusblish_beacon_block. Currently doesn't compile, struggling with the borrow checker. --- beacon_node/client/src/lib.rs | 1 + beacon_node/rest_api/src/helpers.rs | 42 +++++++++++++++++++- beacon_node/rest_api/src/lib.rs | 8 +++- beacon_node/rest_api/src/validator.rs | 57 +++++++++++++++++++++++++-- 4 files changed, 103 insertions(+), 5 deletions(-) diff --git a/beacon_node/client/src/lib.rs b/beacon_node/client/src/lib.rs index afcd538b58..f26a5503c1 100644 --- a/beacon_node/client/src/lib.rs +++ b/beacon_node/client/src/lib.rs @@ -215,6 +215,7 @@ where executor, beacon_chain.clone(), network.clone(), + network_send.clone(), client_config.db_path().expect("unable to read datadir"), eth2_config.clone(), &log, diff --git a/beacon_node/rest_api/src/helpers.rs b/beacon_node/rest_api/src/helpers.rs index 76fc78750c..d6ea0397f7 100644 --- a/beacon_node/rest_api/src/helpers.rs +++ b/beacon_node/rest_api/src/helpers.rs @@ -1,11 +1,17 @@ use crate::{ApiError, ApiResult}; use beacon_chain::{BeaconChain, BeaconChainTypes}; use bls::PublicKey; +use eth2_libp2p::{PubsubMessage, Topic}; +use eth2_libp2p::{BEACON_BLOCK_TOPIC, TOPIC_ENCODING_POSTFIX, TOPIC_PREFIX}; use hex; use hyper::{Body, Request}; +use network::NetworkMessage; +use ssz::Encode; +use std::borrow::BorrowMut; use std::sync::Arc; use store::{iter::AncestorIter, Store}; -use types::{BeaconState, EthSpec, Hash256, RelativeEpoch, Slot}; +use tokio::sync::mpsc; +use types::{BeaconBlock, BeaconState, EthSpec, Hash256, RelativeEpoch, Slot}; /// Parse a slot from a `0x` preixed string. /// @@ -197,6 +203,40 @@ pub fn get_logger_from_request(req: &Request) -> slog::Logger { log.to_owned() } +pub fn publish_beacon_block_to_network( + req: &Request, + block: BeaconBlock, +) -> Result<(), ApiError> { + // Get the network service from the request + let mut network_chan = req + .extensions() + .get::>() + .expect( + "Should always get the network channel from the request, since we put it in there.", + ); + + // create the network topic to send on + let topic_string = format!( + "/{}/{}/{}", + TOPIC_PREFIX, BEACON_BLOCK_TOPIC, TOPIC_ENCODING_POSTFIX + ); + let topic = Topic::new(topic_string); + let message = PubsubMessage::Block(block.as_ssz_bytes()); + + // Publish the block to the p2p network via gossipsub. + if let Err(e) = &network_chan.try_send(NetworkMessage::Publish { + topics: vec![topic], + message: message, + }) { + return Err(ApiError::ServerError(format!( + "Unable to send new block to network: {:?}", + e + ))); + } + + Ok(()) +} + #[cfg(test)] mod test { use super::*; diff --git a/beacon_node/rest_api/src/lib.rs b/beacon_node/rest_api/src/lib.rs index 02c68c6394..c0927dde35 100644 --- a/beacon_node/rest_api/src/lib.rs +++ b/beacon_node/rest_api/src/lib.rs @@ -14,6 +14,7 @@ mod url_query; mod validator; use beacon_chain::{BeaconChain, BeaconChainTypes}; +use client_network::NetworkMessage; use client_network::Service as NetworkService; use eth2_config::Eth2Config; use hyper::rt::Future; @@ -25,6 +26,7 @@ use std::ops::Deref; use std::path::PathBuf; use std::sync::Arc; use tokio::runtime::TaskExecutor; +use tokio::sync::mpsc; use url_query::UrlQuery; pub use beacon::{BlockResponse, HeadResponse, StateResponse}; @@ -83,6 +85,7 @@ pub fn start_server( executor: &TaskExecutor, beacon_chain: Arc>, network_service: Arc>, + network_chan: mpsc::UnboundedSender, db_path: PathBuf, eth2_config: Eth2Config, log: &slog::Logger, @@ -113,6 +116,7 @@ pub fn start_server( let beacon_chain = server_bc.clone(); let db_path = db_path.clone(); let network_service = network_service.clone(); + let network_chan = network_chan.clone(); let eth2_config = eth2_config.clone(); // Create a simple handler for the router, inject our stateful objects into the request. @@ -126,6 +130,8 @@ pub fn start_server( req.extensions_mut().insert::(db_path.clone()); req.extensions_mut() .insert::>>(network_service.clone()); + req.extensions_mut() + .insert::>(network_chan.clone()); req.extensions_mut() .insert::>(eth2_config.clone()); @@ -177,7 +183,7 @@ pub fn start_server( validator::get_new_beacon_block::(req) } (&Method::POST, "/beacon/validator/block") => { - helpers::implementation_pending_response(req) + validator::publish_beacon_block::(req) } (&Method::GET, "/beacon/validator/attestation") => { validator::get_new_attestation::(req) diff --git a/beacon_node/rest_api/src/validator.rs b/beacon_node/rest_api/src/validator.rs index 49b4c04419..632aee0ac6 100644 --- a/beacon_node/rest_api/src/validator.rs +++ b/beacon_node/rest_api/src/validator.rs @@ -1,11 +1,14 @@ use super::{success_response, ApiResult}; use crate::{helpers::*, ApiError, UrlQuery}; -use beacon_chain::BeaconChainTypes; +use beacon_chain::{BeaconChainTypes, BlockProcessingOutcome}; use bls::{AggregateSignature, PublicKey, Signature}; -use hyper::{Body, Request}; +use futures::future::Future; +use futures::stream::Stream; +use hyper::{Body, Error, Request}; use serde::{Deserialize, Serialize}; +use slog::info; use types::beacon_state::EthSpec; -use types::{Attestation, BitList, Epoch, RelativeEpoch, Shard, Slot}; +use types::{Attestation, BeaconBlock, BitList, Epoch, RelativeEpoch, Shard, Slot}; #[derive(Debug, Serialize, Deserialize)] pub struct ValidatorDuty { @@ -195,6 +198,54 @@ pub fn get_new_beacon_block(req: Request) - Ok(success_response(body)) } +/// HTTP Handler to publish a BeaconBlock, which has been signed by a validator. +pub fn publish_beacon_block(req: Request) -> ApiResult { + let log = get_logger_from_request(&req); + let (beacon_chain, _head_state) = get_beacon_chain_from_request::(&req)?; + + let (_head, body) = req.into_parts(); + let block_future = body + .fold(Vec::new(), |mut acc, chunk| { + acc.extend_from_slice(&*chunk); + futures::future::ok::<_, Error>(acc) + }) + .map_err(|e| ApiError::ServerError(format!("Unable parse request body: {:?}", e))) + .and_then(|body| { + let block_result: Result, ApiError> = + serde_json::from_slice(&body.as_slice()).map_err(|e| { + ApiError::InvalidQueryParams(format!( + "Unable to deserialize JSON into a BeaconBlock: {:?}", + e + )) + }); + block_result + }); + let block = block_future.wait()?; + match beacon_chain.process_block(block.clone()) { + Ok(BlockProcessingOutcome::Processed { + block_root: block_root, + }) => { + // Block was processed, publish via gossipsub + info!(log, "Processed valid block from API"; "block_slot" => block.slot, "block_root" => format!("{}", block_root)); + publish_beacon_block_to_network::(&req, block)?; + } + Ok(outcome) => { + return Err(ApiError::InvalidQueryParams(format!( + "The BeaconBlock could not be processed: {:?}", + outcome + ))); + } + Err(e) => { + return Err(ApiError::ServerError(format!( + "Unable to process block: {:?}", + e + ))); + } + } + + Ok(success_response(Body::empty())) +} + /// HTTP Handler to produce a new Attestation from the current state, ready to be signed by a validator. pub fn get_new_attestation(req: Request) -> ApiResult { let (beacon_chain, head_state) = get_beacon_chain_from_request::(&req)?;