Improve HTTP API error messages + tweaks (#4595)

## Issue Addressed

Closes #3404 (mostly)

## Proposed Changes

- Remove all uses of Warp's `and_then` (which backtracks) in favour of `then` (which doesn't).
- Bump the priority of the `POST` method for `v2/blocks` to `P0`. Publishing a block needs to happen quickly.
- Run the new SSZ POST endpoints on the beacon processor. I think this was missed in between merging #4462 and #4504/#4479.
- Fix a minor issue in the validator registrations endpoint whereby an error from spawning the task on the beacon processor would be dropped.

## Additional Info

I've tested this manually and can confirm that we no longer get the dreaded `Unsupported endpoint version` errors for queries like:

```
$ curl -X POST -H "Content-Type: application/json" --data @block.json "http://localhost:5052/eth/v2/beacon/blocks" | jq
{
  "code": 400,
  "message": "BAD_REQUEST: WeakSubjectivityConflict",
  "stacktraces": []
}
```

```
$ curl -X POST -H "Content-Type: application/octet-stream" --data @block.json "http://localhost:5052/eth/v2/beacon/blocks" | jq
{
  "code": 400,
  "message": "BAD_REQUEST: invalid SSZ: OffsetOutOfBounds(572530811)",
  "stacktraces": []
}
```

```
$ curl "http://localhost:5052/eth/v2/validator/blocks/7067595"
{"code":400,"message":"BAD_REQUEST: invalid query: Invalid query string","stacktraces":[]}
```

However, I can still trigger it by leaving off the `Content-Type`. We can re-test this aspect with #4575.
This commit is contained in:
Michael Sproul
2023-08-14 04:06:37 +00:00
parent 912f869829
commit 249f85f1d9
2 changed files with 227 additions and 227 deletions

View File

@@ -35,6 +35,24 @@ pub struct TaskSpawner<E: EthSpec> {
beacon_processor_send: Option<BeaconProcessorSend<E>>,
}
/// Convert a warp `Rejection` into a `Response`.
///
/// This function should *always* be used to convert rejections into responses. This prevents warp
/// from trying to backtrack in strange ways. See: https://github.com/sigp/lighthouse/issues/3404
pub async fn convert_rejection<T: Reply>(res: Result<T, warp::Rejection>) -> Response {
match res {
Ok(response) => response.into_response(),
Err(e) => match warp_utils::reject::handle_rejection(e).await {
Ok(reply) => reply.into_response(),
Err(_) => warp::reply::with_status(
warp::reply::json(&"unhandled error"),
eth2::StatusCode::INTERNAL_SERVER_ERROR,
)
.into_response(),
},
}
}
impl<E: EthSpec> TaskSpawner<E> {
pub fn new(beacon_processor_send: Option<BeaconProcessorSend<E>>) -> Self {
Self {
@@ -43,11 +61,7 @@ impl<E: EthSpec> TaskSpawner<E> {
}
/// Executes a "blocking" (non-async) task which returns a `Response`.
pub async fn blocking_response_task<F, T>(
self,
priority: Priority,
func: F,
) -> Result<Response, warp::Rejection>
pub async fn blocking_response_task<F, T>(self, priority: Priority, func: F) -> Response
where
F: FnOnce() -> Result<T, warp::Rejection> + Send + Sync + 'static,
T: Reply + Send + 'static,
@@ -65,31 +79,25 @@ impl<E: EthSpec> TaskSpawner<E> {
};
// Send the function to the beacon processor for execution at some arbitrary time.
match send_to_beacon_processor(
let result = send_to_beacon_processor(
beacon_processor_send,
priority,
BlockingOrAsync::Blocking(Box::new(process_fn)),
rx,
)
.await
{
Ok(result) => result.map(Reply::into_response),
Err(error_response) => Ok(error_response),
}
.and_then(|x| x);
convert_rejection(result).await
} else {
// There is no beacon processor so spawn a task directly on the
// tokio executor.
warp_utils::task::blocking_response_task(func).await
convert_rejection(warp_utils::task::blocking_response_task(func).await).await
}
}
/// Executes a "blocking" (non-async) task which returns a JSON-serializable
/// object.
pub async fn blocking_json_task<F, T>(
self,
priority: Priority,
func: F,
) -> Result<Response, warp::Rejection>
pub async fn blocking_json_task<F, T>(self, priority: Priority, func: F) -> Response
where
F: FnOnce() -> Result<T, warp::Rejection> + Send + Sync + 'static,
T: Serialize + Send + 'static,
@@ -98,11 +106,26 @@ impl<E: EthSpec> TaskSpawner<E> {
self.blocking_response_task(priority, func).await
}
/// Executes an async task which may return a `warp::Rejection`.
/// Executes an async task which may return a `Rejection`, which will be converted to a response.
pub async fn spawn_async_with_rejection(
self,
priority: Priority,
func: impl Future<Output = Result<Response, warp::Rejection>> + Send + Sync + 'static,
) -> Response {
let result = self
.spawn_async_with_rejection_no_conversion(priority, func)
.await;
convert_rejection(result).await
}
/// Same as `spawn_async_with_rejection` but returning a result with the unhandled rejection.
///
/// If you call this function you MUST convert the rejection to a response and not let it
/// propagate into Warp's filters. See `convert_rejection`.
pub async fn spawn_async_with_rejection_no_conversion(
self,
priority: Priority,
func: impl Future<Output = Result<Response, warp::Rejection>> + Send + Sync + 'static,
) -> Result<Response, warp::Rejection> {
if let Some(beacon_processor_send) = &self.beacon_processor_send {
// Create a wrapper future that will execute `func` and send the
@@ -124,18 +147,16 @@ impl<E: EthSpec> TaskSpawner<E> {
rx,
)
.await
.unwrap_or_else(Result::Ok)
.and_then(|x| x)
} else {
// There is no beacon processor so spawn a task directly on the
// tokio executor.
tokio::task::spawn(func).await.unwrap_or_else(|e| {
let response = warp::reply::with_status(
warp::reply::json(&format!("Tokio did not execute task: {e:?}")),
eth2::StatusCode::INTERNAL_SERVER_ERROR,
)
.into_response();
Ok(response)
})
tokio::task::spawn(func)
.await
.map_err(|_| {
warp_utils::reject::custom_server_error("Tokio failed to spawn task".into())
})
.and_then(|x| x)
}
}
@@ -158,14 +179,14 @@ impl<E: EthSpec> TaskSpawner<E> {
};
// Send the function to the beacon processor for execution at some arbitrary time.
send_to_beacon_processor(
let result = send_to_beacon_processor(
beacon_processor_send,
priority,
BlockingOrAsync::Async(Box::pin(process_fn)),
rx,
)
.await
.unwrap_or_else(|error_response| error_response)
.await;
convert_rejection(result).await
} else {
// There is no beacon processor so spawn a task directly on the
// tokio executor.
@@ -182,14 +203,14 @@ impl<E: EthSpec> TaskSpawner<E> {
/// Send a task to the beacon processor and await execution.
///
/// If the task is not executed, return an `Err(response)` with an error message
/// If the task is not executed, return an `Err` with an error message
/// for the API consumer.
async fn send_to_beacon_processor<E: EthSpec, T>(
beacon_processor_send: &BeaconProcessorSend<E>,
priority: Priority,
process_fn: BlockingOrAsync,
rx: oneshot::Receiver<T>,
) -> Result<T, Response> {
) -> Result<T, warp::Rejection> {
let error_message = match beacon_processor_send.try_send(priority.work_event(process_fn)) {
Ok(()) => {
match rx.await {
@@ -205,10 +226,7 @@ async fn send_to_beacon_processor<E: EthSpec, T>(
Err(TrySendError::Closed(_)) => "The task was dropped. The server is shutting down.",
};
let error_response = warp::reply::with_status(
warp::reply::json(&error_message),
eth2::StatusCode::INTERNAL_SERVER_ERROR,
)
.into_response();
Err(error_response)
Err(warp_utils::reject::custom_server_error(
error_message.to_string(),
))
}