[Merge] Block validator duties when EL is not ready (#2672)

* Reject some HTTP endpoints when EL is not ready

* Restrict more endpoints

* Add watchdog task

* Change scheduling

* Update to new schedule

* Add "syncing" concept

* Remove RequireSynced

* Add is_merge_complete to head_info

* Cache latest_head in Engines

* Call consensus_forkchoiceUpdate on startup
This commit is contained in:
Paul Hauner
2021-10-06 21:21:21 +11:00
parent d6fda44620
commit 35350dff75
7 changed files with 338 additions and 51 deletions

View File

@@ -28,3 +28,4 @@ exit-future = "0.2.0"
tree_hash = { path = "../../consensus/tree_hash"}
tree_hash_derive = { path = "../../consensus/tree_hash_derive"}
parking_lot = "0.11.0"
slot_clock = { path = "../../common/slot_clock" }

View File

@@ -2,32 +2,38 @@
use crate::engine_api::{EngineApi, Error as EngineApiError};
use futures::future::join_all;
use slog::{crit, error, info, warn, Logger};
use slog::{crit, debug, error, info, warn, Logger};
use std::future::Future;
use tokio::sync::RwLock;
use types::Hash256;
/// Stores the remembered state of an engine.
#[derive(Copy, Clone, PartialEq)]
enum EngineState {
Online,
Synced,
Offline,
Syncing,
}
impl EngineState {
fn set_online(&mut self) {
*self = EngineState::Online
}
/// The fork choice head most recently reported to the execution layer, cached so it can be
/// (re)sent to execution engines — e.g. ones that come online after the update was broadcast.
#[derive(Copy, Clone, PartialEq, Debug)]
pub struct ForkChoiceHead {
// Hash of the current head block, as chosen by fork choice.
pub head_block_hash: Hash256,
// Hash of the most recently finalized block.
pub finalized_block_hash: Hash256,
}
fn set_offline(&mut self) {
*self = EngineState::Offline
}
/// Used to enable/disable logging on some tasks.
///
/// Passed to routines (e.g. engine upchecks) that run frequently, so callers can suppress
/// repetitive log output.
#[derive(Copy, Clone, PartialEq)]
pub enum Logging {
// Emit log messages as normal.
Enabled,
// Suppress log messages.
Disabled,
}
fn is_online(&self) -> bool {
*self == EngineState::Online
}
fn is_offline(&self) -> bool {
*self == EngineState::Offline
impl Logging {
    /// Returns `true` iff `self` is the `Enabled` variant.
    pub fn is_enabled(&self) -> bool {
        matches!(self, Logging::Enabled)
    }
}
@@ -53,6 +59,7 @@ impl<T> Engine<T> {
/// manner.
/// manner.
pub struct Engines<T> {
// The set of execution engines managed by this struct.
pub engines: Vec<Engine<T>>,
// The most recent fork choice head, cached so it can be sent to engines that (re)connect
// after the original broadcast.
pub latest_head: RwLock<Option<ForkChoiceHead>>,
// Logger for reporting engine state changes and errors.
pub log: Logger,
}
@@ -63,45 +70,112 @@ pub enum EngineError {
}
impl<T: EngineApi> Engines<T> {
/// Caches `latest_head` so it can later be forwarded to engines (see `send_latest_head`).
pub async fn set_latest_head(&self, latest_head: ForkChoiceHead) {
    let mut cached = self.latest_head.write().await;
    *cached = Some(latest_head);
}
/// Sends the cached fork choice head (if one exists) to `engine` via the
/// `forkchoice_updated` API call.
///
/// Failures are logged but not returned; if no head is cached yet, nothing is sent.
async fn send_latest_head(&self, engine: &Engine<T>) {
    // Copy the cached value out so the read-lock is released before any API call.
    let cached_head: Option<ForkChoiceHead> = *self.latest_head.read().await;
    match cached_head {
        Some(head) => {
            info!(
                self.log,
                "Issuing forkchoiceUpdated";
                "head" => ?head,
                "id" => &engine.id,
            );
            let result = engine
                .api
                .forkchoice_updated(head.head_block_hash, head.finalized_block_hash)
                .await;
            if let Err(e) = result {
                error!(
                    self.log,
                    "Failed to issue latest head to engine";
                    "error" => ?e,
                    "id" => &engine.id,
                );
            }
        }
        None => {
            debug!(
                self.log,
                "No head, not sending to engine";
                "id" => &engine.id,
            );
        }
    }
}
/// Returns `true` if at least one engine is currently in the `Synced` state.
pub async fn any_synced(&self) -> bool {
    for engine in self.engines.iter() {
        // `EngineState` is `Copy`, so dereference out of the read-guard immediately.
        let state = *engine.state.read().await;
        if matches!(state, EngineState::Synced) {
            return true;
        }
    }
    false
}
/// Run the `EngineApi::upcheck` function on all nodes which are currently offline.
///
/// This can be used to try and recover any offline nodes.
async fn upcheck_offline(&self) {
pub async fn upcheck_not_synced(&self, logging: Logging) {
let upcheck_futures = self.engines.iter().map(|engine| async move {
let mut state = engine.state.write().await;
if state.is_offline() {
let mut state_lock = engine.state.write().await;
if *state_lock != EngineState::Synced {
match engine.api.upcheck().await {
Ok(()) => {
info!(
self.log,
"Execution engine online";
"id" => &engine.id
);
state.set_online()
if logging.is_enabled() {
info!(
self.log,
"Execution engine online";
"id" => &engine.id
);
}
// Send the node our latest head.
self.send_latest_head(engine).await;
*state_lock = EngineState::Synced
}
Err(EngineApiError::IsSyncing) => {
if logging.is_enabled() {
warn!(
self.log,
"Execution engine syncing";
"id" => &engine.id
)
}
// Send the node our latest head, it may assist with syncing.
self.send_latest_head(engine).await;
*state_lock = EngineState::Syncing
}
Err(e) => {
warn!(
self.log,
"Execution engine offline";
"error" => ?e,
"id" => &engine.id
)
if logging.is_enabled() {
warn!(
self.log,
"Execution engine offline";
"error" => ?e,
"id" => &engine.id
)
}
}
}
}
*state
*state_lock
});
let num_online = join_all(upcheck_futures)
let num_synced = join_all(upcheck_futures)
.await
.into_iter()
.filter(|state: &EngineState| state.is_online())
.filter(|state: &EngineState| *state == EngineState::Synced)
.count();
if num_online == 0 {
if num_synced == 0 && logging.is_enabled() {
crit!(
self.log,
"No execution engines online";
"No synced execution engines";
)
}
}
@@ -120,7 +194,7 @@ impl<T: EngineApi> Engines<T> {
Ok(result) => Ok(result),
Err(mut first_errors) => {
// Try to recover some nodes.
self.upcheck_offline().await;
self.upcheck_not_synced(Logging::Enabled).await;
// Retry the call on all nodes.
match self.first_success_without_retry(func).await {
Ok(result) => Ok(result),
@@ -146,8 +220,8 @@ impl<T: EngineApi> Engines<T> {
let mut errors = vec![];
for engine in &self.engines {
let engine_online = engine.state.read().await.is_online();
if engine_online {
let engine_synced = *engine.state.read().await == EngineState::Synced;
if engine_synced {
match func(engine).await {
Ok(result) => return Ok(result),
Err(error) => {
@@ -157,7 +231,7 @@ impl<T: EngineApi> Engines<T> {
"error" => ?error,
"id" => &engine.id
);
engine.state.write().await.set_offline();
*engine.state.write().await = EngineState::Offline;
errors.push(EngineError::Api {
id: engine.id.clone(),
error,
@@ -174,7 +248,8 @@ impl<T: EngineApi> Engines<T> {
Err(errors)
}
/// Runs `func` on all nodes concurrently, returning all results.
/// Runs `func` on all nodes concurrently, returning all results. Any nodes that are offline
/// will be ignored, however all synced or unsynced nodes will receive the broadcast.
///
/// This function might try to run `func` twice. If all nodes return an error on the first time
/// it runs, it will try to upcheck all offline nodes and then run the function again.
@@ -195,7 +270,7 @@ impl<T: EngineApi> Engines<T> {
}
if any_offline {
self.upcheck_offline().await;
self.upcheck_not_synced(Logging::Enabled).await;
self.broadcast_without_retry(func).await
} else {
first_results
@@ -213,8 +288,8 @@ impl<T: EngineApi> Engines<T> {
{
let func = &func;
let futures = self.engines.iter().map(|engine| async move {
let engine_online = engine.state.read().await.is_online();
if engine_online {
let is_offline = *engine.state.read().await == EngineState::Offline;
if !is_offline {
func(engine).await.map_err(|error| {
error!(
self.log,

View File

@@ -5,14 +5,19 @@
//! deposit-contract functionality that the `beacon_node/eth1` crate already provides.
use engine_api::{Error as ApiError, *};
use engines::{Engine, EngineError, Engines};
use engines::{Engine, EngineError, Engines, ForkChoiceHead, Logging};
use lru::LruCache;
use sensitive_url::SensitiveUrl;
use slog::{crit, info, Logger};
use slog::{crit, error, info, Logger};
use slot_clock::SlotClock;
use std::future::Future;
use std::sync::Arc;
use std::time::Duration;
use task_executor::TaskExecutor;
use tokio::sync::{Mutex, MutexGuard};
use tokio::{
sync::{Mutex, MutexGuard},
time::{sleep, sleep_until, Instant},
};
pub use engine_api::{http::HttpJsonRpc, ConsensusStatus, ExecutePayloadResponse};
pub use execute_payload_handle::ExecutePayloadHandle;
@@ -92,6 +97,7 @@ impl ExecutionLayer {
let inner = Inner {
engines: Engines {
engines,
latest_head: <_>::default(),
log: log.clone(),
},
terminal_total_difficulty,
@@ -164,6 +170,72 @@ impl ExecutionLayer {
self.executor().spawn(generate_future(self.clone()), name);
}
/// Spawns a routine which attempts to keep the execution engines online.
/// Spawns a routine which attempts to keep the execution engines online.
pub fn spawn_watchdog_routine<S: SlotClock + 'static>(&self, slot_clock: S) {
    let watchdog = |el: ExecutionLayer| async move {
        // Run one task immediately, before the schedule below kicks in.
        el.watchdog_task().await;

        let recurring_task =
            |el: ExecutionLayer, now: Instant, duration_to_next_slot: Duration| async move {
                // We run the task three times per slot.
                //
                // The interval between each task is 1/3rd of the slot duration. This matches nicely
                // with the attestation production times (unagg. at 1/3rd, agg at 2/3rd).
                //
                // Each task is offset by 3/4ths of the interval.
                //
                // On mainnet, this means we will run tasks at:
                //
                // - 3s after slot start: 1s before publishing unaggregated attestations.
                // - 7s after slot start: 1s before publishing aggregated attestations.
                // - 11s after slot start: 1s before the next slot starts.
                let interval = duration_to_next_slot / 3;
                let offset = (interval / 4) * 3;

                // Deadlines are spaced `interval` apart, the first landing at
                // `duration_to_next_slot + offset` past `now`.
                let mut deadline = duration_to_next_slot + offset;
                for _ in 0..3 {
                    sleep_until(now + deadline).await;
                    el.engines().upcheck_not_synced(Logging::Disabled).await;
                    deadline += interval;
                }
            };

        // Start the loop to periodically update.
        loop {
            if let Some(duration) = slot_clock.duration_to_next_slot() {
                let now = Instant::now();

                // Spawn a new task rather than waiting for this one to finish. This ensures
                // that a slow run doesn't prevent the next run from starting.
                el.spawn(|el| recurring_task(el, now, duration), "exec_watchdog_task");
            } else {
                error!(el.log(), "Failed to spawn watchdog task");
            }
            sleep(slot_clock.slot_duration()).await;
        }
    };

    self.spawn(watchdog, "exec_watchdog");
}
/// Performs a single execution of the watchdog routine.
///
/// Upchecks any engines that are not currently synced.
async fn watchdog_task(&self) {
// Disable logging since this runs frequently and may get annoying.
self.engines().upcheck_not_synced(Logging::Disabled).await;
}
/// Returns `true` if there is at least one synced and reachable engine.
///
/// Delegates to `Engines::any_synced`; does not itself perform an upcheck.
pub async fn is_synced(&self) -> bool {
self.engines().any_synced().await
}
/// Maps to the `engine_preparePayload` JSON-RPC function.
///
/// ## Fallback Behavior
@@ -364,6 +436,16 @@ impl ExecutionLayer {
"finalized_block_hash" => ?finalized_block_hash,
"head_block_hash" => ?head_block_hash,
);
// Update the cached version of the latest head so it can be sent to new or reconnecting
// execution nodes.
self.engines()
.set_latest_head(ForkChoiceHead {
head_block_hash,
finalized_block_hash,
})
.await;
let broadcast_results = self
.engines()
.broadcast(|engine| {