Handle unknown head during attestation publishing (#5010)

* Handle unknown head during attestation publishing

* Merge remote-tracking branch 'origin/unstable' into queue-http-attestations

* Simplify task spawner

* Improve logging

* Add a test

* Improve error logging

* Merge remote-tracking branch 'origin/unstable' into queue-http-attestations

* Fix beta compiler warnings
This commit is contained in:
Michael Sproul
2024-02-15 23:24:47 +11:00
committed by GitHub
parent 49536ff103
commit f17fb291b7
7 changed files with 455 additions and 140 deletions

View File

@@ -60,11 +60,15 @@ impl<E: EthSpec> TaskSpawner<E> {
}
}
/// Executes a "blocking" (non-async) task which returns a `Response`.
pub async fn blocking_response_task<F, T>(self, priority: Priority, func: F) -> Response
/// Executes a "blocking" (non-async) task which returns an arbitrary value.
pub async fn blocking_task<F, T>(
self,
priority: Priority,
func: F,
) -> Result<T, warp::Rejection>
where
F: FnOnce() -> Result<T, warp::Rejection> + Send + Sync + 'static,
T: Reply + Send + 'static,
T: Send + 'static,
{
if let Some(beacon_processor_send) = &self.beacon_processor_send {
// Create a closure that will execute `func` and send the result to
@@ -79,22 +83,31 @@ impl<E: EthSpec> TaskSpawner<E> {
};
// Send the function to the beacon processor for execution at some arbitrary time.
let result = send_to_beacon_processor(
send_to_beacon_processor(
beacon_processor_send,
priority,
BlockingOrAsync::Blocking(Box::new(process_fn)),
rx,
)
.await
.and_then(|x| x);
convert_rejection(result).await
.and_then(|x| x)
} else {
// There is no beacon processor so spawn a task directly on the
// tokio executor.
convert_rejection(warp_utils::task::blocking_response_task(func).await).await
warp_utils::task::blocking_task(func).await
}
}
/// Executes a "blocking" (non-async) task which returns a `Response`.
pub async fn blocking_response_task<F, T>(self, priority: Priority, func: F) -> Response
where
F: FnOnce() -> Result<T, warp::Rejection> + Send + Sync + 'static,
T: Reply + Send + 'static,
{
let result = self.blocking_task(priority, func).await;
convert_rejection(result).await
}
/// Executes a "blocking" (non-async) task which returns a JSON-serializable
/// object.
pub async fn blocking_json_task<F, T>(self, priority: Priority, func: F) -> Response