Shift HTTP server heavy-lifting to blocking executor (#1518)

## Issue Addressed

NA

## Proposed Changes

Shift practically all HTTP endpoint handlers to the blocking executor (some very light tasks are left on the core executor).

## Additional Info

This PR covers the `rest_api` which will soon be refactored to suit the standard API. As such, I've cut a few corners and left some existing issues open in this patch. What I have done here should leave the API in state that is not necessary *exactly* the same, but good enough for us to run validators with. Specifically, the number of blocking workers that can be spawned is unbounded and I have not implemented a queue; this will need to be fixed when we implement the standard API.
This commit is contained in:
Paul Hauner
2020-08-24 03:06:10 +00:00
parent 2bc9115a94
commit c895dc8971
22 changed files with 828 additions and 906 deletions

View File

@@ -14,6 +14,13 @@ state_processing = { path = "../../consensus/state_processing" }
bls = { path = "../../crypto/bls" }
serde = { version = "1.0.110", features = ["derive"] }
rayon = "1.3.0"
hyper = "0.13.5"
tokio = { version = "0.2.21", features = ["sync"] }
environment = { path = "../../lighthouse/environment" }
store = { path = "../../beacon_node/store" }
beacon_chain = { path = "../../beacon_node/beacon_chain" }
serde_json = "1.0.52"
serde_yaml = "0.8.11"
[target.'cfg(target_os = "linux")'.dependencies]
psutil = "3.1.0"

View File

@@ -0,0 +1,99 @@
use hyper::{Body, Response, StatusCode};
use std::error::Error as StdError;
#[derive(PartialEq, Debug, Clone)]
pub enum ApiError {
MethodNotAllowed(String),
ServerError(String),
NotImplemented(String),
BadRequest(String),
NotFound(String),
UnsupportedType(String),
ImATeapot(String), // Just in case.
ProcessingError(String), // A 202 error, for when a block/attestation cannot be processed, but still transmitted.
InvalidHeaderValue(String),
}
pub type ApiResult = Result<Response<Body>, ApiError>;
impl ApiError {
pub fn status_code(self) -> (StatusCode, String) {
match self {
ApiError::MethodNotAllowed(desc) => (StatusCode::METHOD_NOT_ALLOWED, desc),
ApiError::ServerError(desc) => (StatusCode::INTERNAL_SERVER_ERROR, desc),
ApiError::NotImplemented(desc) => (StatusCode::NOT_IMPLEMENTED, desc),
ApiError::BadRequest(desc) => (StatusCode::BAD_REQUEST, desc),
ApiError::NotFound(desc) => (StatusCode::NOT_FOUND, desc),
ApiError::UnsupportedType(desc) => (StatusCode::UNSUPPORTED_MEDIA_TYPE, desc),
ApiError::ImATeapot(desc) => (StatusCode::IM_A_TEAPOT, desc),
ApiError::ProcessingError(desc) => (StatusCode::ACCEPTED, desc),
ApiError::InvalidHeaderValue(desc) => (StatusCode::INTERNAL_SERVER_ERROR, desc),
}
}
}
impl Into<Response<Body>> for ApiError {
fn into(self) -> Response<Body> {
let (status_code, desc) = self.status_code();
Response::builder()
.status(status_code)
.header("content-type", "text/plain; charset=utf-8")
.body(Body::from(desc))
.expect("Response should always be created.")
}
}
impl From<store::Error> for ApiError {
fn from(e: store::Error) -> ApiError {
ApiError::ServerError(format!("Database error: {:?}", e))
}
}
impl From<types::BeaconStateError> for ApiError {
fn from(e: types::BeaconStateError) -> ApiError {
ApiError::ServerError(format!("BeaconState error: {:?}", e))
}
}
impl From<beacon_chain::BeaconChainError> for ApiError {
fn from(e: beacon_chain::BeaconChainError) -> ApiError {
ApiError::ServerError(format!("BeaconChainError error: {:?}", e))
}
}
impl From<state_processing::per_slot_processing::Error> for ApiError {
fn from(e: state_processing::per_slot_processing::Error) -> ApiError {
ApiError::ServerError(format!("PerSlotProcessing error: {:?}", e))
}
}
impl From<hyper::error::Error> for ApiError {
fn from(e: hyper::error::Error) -> ApiError {
ApiError::ServerError(format!("Networking error: {:?}", e))
}
}
impl From<std::io::Error> for ApiError {
fn from(e: std::io::Error) -> ApiError {
ApiError::ServerError(format!("IO error: {:?}", e))
}
}
impl From<hyper::header::InvalidHeaderValue> for ApiError {
fn from(e: hyper::header::InvalidHeaderValue) -> ApiError {
ApiError::InvalidHeaderValue(format!("Invalid CORS header value: {:?}", e))
}
}
impl StdError for ApiError {
fn cause(&self) -> Option<&dyn StdError> {
None
}
}
impl std::fmt::Display for ApiError {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
let status = self.clone().status_code();
write!(f, "{:?}: {:?}", status.0, status.1)
}
}

View File

@@ -0,0 +1,247 @@
use crate::{ApiError, ApiResult};
use environment::TaskExecutor;
use hyper::header;
use hyper::{Body, Request, Response, StatusCode};
use serde::Deserialize;
use serde::Serialize;
use ssz::Encode;
/// Defines the encoding for the API.
#[derive(Clone, Serialize, Deserialize, Copy)]
pub enum ApiEncodingFormat {
JSON,
YAML,
SSZ,
}
impl ApiEncodingFormat {
pub fn get_content_type(&self) -> &str {
match self {
ApiEncodingFormat::JSON => "application/json",
ApiEncodingFormat::YAML => "application/yaml",
ApiEncodingFormat::SSZ => "application/ssz",
}
}
}
impl From<&str> for ApiEncodingFormat {
fn from(f: &str) -> ApiEncodingFormat {
match f {
"application/yaml" => ApiEncodingFormat::YAML,
"application/ssz" => ApiEncodingFormat::SSZ,
_ => ApiEncodingFormat::JSON,
}
}
}
/// Provides a HTTP request handler with Lighthouse-specific functionality.
pub struct Handler<T> {
executor: TaskExecutor,
req: Request<()>,
body: Body,
ctx: T,
encoding: ApiEncodingFormat,
allow_body: bool,
}
impl<T: Clone + Send + Sync + 'static> Handler<T> {
/// Start handling a new request.
pub fn new(req: Request<Body>, ctx: T, executor: TaskExecutor) -> Result<Self, ApiError> {
let (req_parts, body) = req.into_parts();
let req = Request::from_parts(req_parts, ());
let accept_header: String = req
.headers()
.get(header::ACCEPT)
.map_or(Ok(""), |h| h.to_str())
.map_err(|e| {
ApiError::BadRequest(format!(
"The Accept header contains invalid characters: {:?}",
e
))
})
.map(String::from)?;
Ok(Self {
executor,
req,
body,
ctx,
allow_body: false,
encoding: ApiEncodingFormat::from(accept_header.as_str()),
})
}
/// The default behaviour is to return an error if any body is supplied in the request. Calling
/// this function disables that error.
pub fn allow_body(mut self) -> Self {
self.allow_body = true;
self
}
/// Return a simple static value.
///
/// Does not use the blocking executor.
pub async fn static_value<V>(self, value: V) -> Result<HandledRequest<V>, ApiError> {
// Always check and disallow a body for a static value.
let _ = Self::get_body(self.body, false).await?;
Ok(HandledRequest {
value,
encoding: self.encoding,
})
}
/// Calls `func` in-line, on the core executor.
///
/// This should only be used for very fast tasks.
pub async fn in_core_task<F, V>(self, func: F) -> Result<HandledRequest<V>, ApiError>
where
V: Send + Sync + 'static,
F: Fn(Request<Vec<u8>>, T) -> Result<V, ApiError> + Send + Sync + 'static,
{
let body = Self::get_body(self.body, self.allow_body).await?;
let (req_parts, _) = self.req.into_parts();
let req = Request::from_parts(req_parts, body);
let value = func(req, self.ctx)?;
Ok(HandledRequest {
value,
encoding: self.encoding,
})
}
/// Spawns `func` on the blocking executor.
///
/// This method is suitable for handling long-running or intensive tasks.
pub async fn in_blocking_task<F, V>(self, func: F) -> Result<HandledRequest<V>, ApiError>
where
V: Send + Sync + 'static,
F: Fn(Request<Vec<u8>>, T) -> Result<V, ApiError> + Send + Sync + 'static,
{
let ctx = self.ctx;
let body = Self::get_body(self.body, self.allow_body).await?;
let (req_parts, _) = self.req.into_parts();
let req = Request::from_parts(req_parts, body);
let value = self
.executor
.clone()
.handle
.spawn_blocking(move || func(req, ctx))
.await
.map_err(|e| {
ApiError::ServerError(format!(
"Failed to get blocking join handle: {}",
e.to_string()
))
})??;
Ok(HandledRequest {
value,
encoding: self.encoding,
})
}
/// Call `func`, then return a response that is suitable for an SSE stream.
pub async fn sse_stream<F>(self, func: F) -> ApiResult
where
F: Fn(Request<()>, T) -> Result<Body, ApiError>,
{
let body = func(self.req, self.ctx)?;
Response::builder()
.status(200)
.header("Content-Type", "text/event-stream")
.header("Connection", "Keep-Alive")
.header("Cache-Control", "no-cache")
.header("Access-Control-Allow-Origin", "*")
.body(body)
.map_err(|e| ApiError::ServerError(format!("Failed to build response: {:?}", e)))
}
/// Downloads the bytes for `body`.
async fn get_body(body: Body, allow_body: bool) -> Result<Vec<u8>, ApiError> {
let bytes = hyper::body::to_bytes(body)
.await
.map_err(|e| ApiError::ServerError(format!("Unable to get request body: {:?}", e)))?;
if !allow_body && !bytes[..].is_empty() {
Err(ApiError::BadRequest(
"The request body must be empty".to_string(),
))
} else {
Ok(bytes.into_iter().collect())
}
}
}
/// A request that has been "handled" and now a result (`value`) needs to be serialize and
/// returned.
pub struct HandledRequest<V> {
encoding: ApiEncodingFormat,
value: V,
}
impl HandledRequest<String> {
/// Simple encode a string as utf-8.
pub fn text_encoding(self) -> ApiResult {
Response::builder()
.status(StatusCode::OK)
.header("content-type", "text/plain; charset=utf-8")
.body(Body::from(self.value))
.map_err(|e| ApiError::ServerError(format!("Failed to build response: {:?}", e)))
}
}
impl<V: Serialize + Encode> HandledRequest<V> {
/// Suitable for all items which implement `serde` and `ssz`.
pub fn all_encodings(self) -> ApiResult {
match self.encoding {
ApiEncodingFormat::SSZ => Response::builder()
.status(StatusCode::OK)
.header("content-type", "application/ssz")
.body(Body::from(self.value.as_ssz_bytes()))
.map_err(|e| ApiError::ServerError(format!("Failed to build response: {:?}", e))),
_ => self.serde_encodings(),
}
}
}
impl<V: Serialize> HandledRequest<V> {
/// Suitable for items which only implement `serde`.
pub fn serde_encodings(self) -> ApiResult {
let (body, content_type) = match self.encoding {
ApiEncodingFormat::JSON => (
Body::from(serde_json::to_string(&self.value).map_err(|e| {
ApiError::ServerError(format!(
"Unable to serialize response body as JSON: {:?}",
e
))
})?),
"application/json",
),
ApiEncodingFormat::SSZ => {
return Err(ApiError::UnsupportedType(
"Response cannot be encoded as SSZ.".into(),
));
}
ApiEncodingFormat::YAML => (
Body::from(serde_yaml::to_string(&self.value).map_err(|e| {
ApiError::ServerError(format!(
"Unable to serialize response body as YAML: {:?}",
e
))
})?),
"application/yaml",
),
};
Response::builder()
.status(StatusCode::OK)
.header("content-type", content_type)
.body(body)
.map_err(|e| ApiError::ServerError(format!("Failed to build response: {:?}", e)))
}
}

View File

@@ -2,20 +2,21 @@
//!
//! This is primarily used by the validator client and the beacon node rest API.
mod api_error;
mod beacon;
mod consensus;
mod handler;
mod node;
mod validator;
pub use api_error::{ApiError, ApiResult};
pub use beacon::{
BlockResponse, CanonicalHeadResponse, Committee, HeadBeaconBlock, StateResponse,
ValidatorRequest, ValidatorResponse,
};
pub use consensus::{IndividualVote, IndividualVotesRequest, IndividualVotesResponse};
pub use handler::{ApiEncodingFormat, Handler};
pub use node::{Health, SyncingResponse, SyncingStatus};
pub use validator::{
ValidatorDutiesRequest, ValidatorDuty, ValidatorDutyBytes, ValidatorSubscription,
};
pub use consensus::{IndividualVote, IndividualVotesRequest, IndividualVotesResponse};
pub use node::{Health, SyncingResponse, SyncingStatus};