diff --git a/beacon_node/rest_api/src/error.rs b/beacon_node/rest_api/src/error.rs index 70384dce9a..9f815a7d3b 100644 --- a/beacon_node/rest_api/src/error.rs +++ b/beacon_node/rest_api/src/error.rs @@ -10,7 +10,8 @@ pub enum ApiError { BadRequest(String), NotFound(String), UnsupportedType(String), - ImATeapot(String), // Just in case. + ImATeapot(String), // Just in case. + ProcessingError(String), // A 202 error, for when a block/attestation cannot be processed, but still transmitted. } pub type ApiResult = Result, ApiError>; @@ -25,6 +26,7 @@ impl ApiError { ApiError::NotFound(desc) => (StatusCode::NOT_FOUND, desc), ApiError::UnsupportedType(desc) => (StatusCode::UNSUPPORTED_MEDIA_TYPE, desc), ApiError::ImATeapot(desc) => (StatusCode::IM_A_TEAPOT, desc), + ApiError::ProcessingError(desc) => (StatusCode::ACCEPTED, desc), } } } @@ -34,7 +36,7 @@ impl Into> for ApiError { let status_code = self.status_code(); Response::builder() .status(status_code.0) - .header("content-type", "text/plain") + .header("content-type", "text/plain; charset=utf-8") .body(Body::from(status_code.1)) .expect("Response should always be created.") } diff --git a/beacon_node/rest_api/src/helpers.rs b/beacon_node/rest_api/src/helpers.rs index 3f76f4e257..a711246b0d 100644 --- a/beacon_node/rest_api/src/helpers.rs +++ b/beacon_node/rest_api/src/helpers.rs @@ -2,7 +2,9 @@ use crate::{ApiError, ApiResult}; use beacon_chain::{BeaconChain, BeaconChainTypes}; use bls::PublicKey; use eth2_libp2p::{PubsubMessage, Topic}; -use eth2_libp2p::{BEACON_BLOCK_TOPIC, TOPIC_ENCODING_POSTFIX, TOPIC_PREFIX}; +use eth2_libp2p::{ + BEACON_ATTESTATION_TOPIC, BEACON_BLOCK_TOPIC, TOPIC_ENCODING_POSTFIX, TOPIC_PREFIX, +}; use hex; use http::header; use hyper::{Body, Request}; @@ -12,7 +14,7 @@ use ssz::Encode; use std::sync::Arc; use store::{iter::AncestorIter, Store}; use tokio::sync::mpsc; -use types::{BeaconBlock, BeaconState, EthSpec, Hash256, RelativeEpoch, Slot}; +use types::{Attestation, BeaconBlock, BeaconState, EthSpec, Hash256, RelativeEpoch, Slot}; /// Parse a slot from a `0x` preixed string. /// @@ -227,7 +229,7 @@ pub fn publish_beacon_block_to_network( // Publish the block to the p2p network via gossipsub. if let Err(e) = chan.write().try_send(NetworkMessage::Publish { topics: vec![topic], - message: message, + message, }) { return Err(ApiError::ServerError(format!( "Unable to send new block to network: {:?}", @@ -238,6 +240,32 @@ pub fn publish_beacon_block_to_network( Ok(()) } +pub fn publish_attestation_to_network( + chan: Arc>>, + attestation: Attestation, +) -> Result<(), ApiError> { + // create the network topic to send on + let topic_string = format!( + "/{}/{}/{}", + TOPIC_PREFIX, BEACON_ATTESTATION_TOPIC, TOPIC_ENCODING_POSTFIX + ); + let topic = Topic::new(topic_string); + let message = PubsubMessage::Attestation(attestation.as_ssz_bytes()); + + // Publish the attestation to the p2p network via gossipsub. + if let Err(e) = chan.write().try_send(NetworkMessage::Publish { + topics: vec![topic], + message, + }) { + return Err(ApiError::ServerError(format!( + "Unable to send new attestation to network: {:?}", + e + ))); + } + + Ok(()) +} + #[cfg(test)] mod test { use super::*; diff --git a/beacon_node/rest_api/src/lib.rs b/beacon_node/rest_api/src/lib.rs index 35678391a3..133fc3a266 100644 --- a/beacon_node/rest_api/src/lib.rs +++ b/beacon_node/rest_api/src/lib.rs @@ -140,7 +140,7 @@ impl Service for ApiService { into_boxfut(validator::get_new_attestation::(req)) } (&Method::POST, "/beacon/validator/attestation") => { - into_boxfut(helpers::implementation_pending_response(req)) + validator::publish_attestation::(req) } (&Method::GET, "/beacon/state") => into_boxfut(beacon::get_state::(req)), @@ -164,6 +164,7 @@ impl Service for ApiService { (&Method::GET, "/spec/eth2_config") => into_boxfut(spec::get_eth2_config::(req)), (&Method::GET, "/metrics") => into_boxfut(metrics::get_prometheus::(req)), + _ => Box::new(futures::future::err(ApiError::NotFound( "Request path and/or method not found.".to_owned(), ))), diff --git a/beacon_node/rest_api/src/validator.rs b/beacon_node/rest_api/src/validator.rs index 53d9e8b8b8..60c0eed060 100644 --- a/beacon_node/rest_api/src/validator.rs +++ b/beacon_node/rest_api/src/validator.rs @@ -1,10 +1,10 @@ use crate::helpers::{ check_content_type_for_json, get_beacon_chain_from_request, get_logger_from_request, - parse_pubkey, publish_beacon_block_to_network, + parse_pubkey, publish_attestation_to_network, publish_beacon_block_to_network, }; use crate::response_builder::ResponseBuilder; use crate::{ApiError, ApiResult, BoxFut, UrlQuery}; -use beacon_chain::{BeaconChainTypes, BlockProcessingOutcome}; +use beacon_chain::{AttestationProcessingOutcome, BeaconChainTypes, BlockProcessingOutcome}; use bls::{AggregateSignature, PublicKey, Signature}; use futures::future::Future; use futures::stream::Stream; @@ -228,16 +228,16 @@ pub fn publish_beacon_block(req: Request) - publish_beacon_block_to_network::(network_chan, block) } Ok(outcome) => { - warn!(log, "Block could not be processed, but is being sent to the network anyway."; "block_slot" => slot, "outcome" => format!("{:?}", outcome)); - //TODO need to send to network and return http 202 - Err(ApiError::BadRequest(format!( - "The BeaconBlock could not be processed: {:?}", + warn!(log, "BeaconBlock could not be processed, but is being sent to the network anyway."; "outcome" => format!("{:?}", outcome)); + publish_beacon_block_to_network::(network_chan, block)?; + Err(ApiError::ProcessingError(format!( + "The BeaconBlock could not be processed, but has still been published: {:?}", outcome ))) } Err(e) => { Err(ApiError::ServerError(format!( - "Unable to process block: {:?}", + "Error while processing block: {:?}", e ))) } @@ -351,3 +351,61 @@ pub fn get_new_attestation(req: Request) -> ResponseBuilder::new(&req)?.body(&attestation) } + +/// HTTP Handler to publish an Attestation, which has been signed by a validator. +pub fn publish_attestation(req: Request) -> BoxFut { + let _ = try_future!(check_content_type_for_json(&req)); + let log = get_logger_from_request(&req); + let beacon_chain = try_future!(get_beacon_chain_from_request::(&req)); + // Get the network sending channel from the request, for later transmission + let network_chan = req + .extensions() + .get::>>>() + .expect("Should always get the network channel from the request, since we put it in there.") + .clone(); + + let response_builder = ResponseBuilder::new(&req); + + let body = req.into_body(); + trace!( + log, + "Got the request body, now going to parse it into an attesation." + ); + Box::new(body + .concat2() + .map_err(|e| ApiError::ServerError(format!("Unable to get request body: {:?}",e))) + .map(|chunk| chunk.iter().cloned().collect::>()) + .and_then(|chunks| { + serde_json::from_slice(&chunks.as_slice()).map_err(|e| { + ApiError::BadRequest(format!( + "Unable to deserialize JSON into a BeaconBlock: {:?}", + e + )) + }) + }) + .and_then(move |attestation: Attestation| { + match beacon_chain.process_attestation(attestation.clone()) { + Ok(AttestationProcessingOutcome::Processed) => { + // Block was processed, publish via gossipsub + info!(log, "Processed valid attestation from API, transmitting to network."); + publish_attestation_to_network::(network_chan, attestation) + } + Ok(outcome) => { + warn!(log, "Attestation could not be processed, but is being sent to the network anyway."; "outcome" => format!("{:?}", outcome)); + publish_attestation_to_network::(network_chan, attestation)?; + Err(ApiError::ProcessingError(format!( + "The Attestation could not be processed, but has still been published: {:?}", + outcome + ))) + } + Err(e) => { + Err(ApiError::ServerError(format!( + "Error while processing attestation: {:?}", + e + ))) + } + } + }).and_then(|_| { + response_builder?.body_no_ssz(&()) + })) +}