diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index 87fdbf6f71..c09fd78fec 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -194,6 +194,7 @@ pub struct HeadInfo { pub genesis_time: u64, pub genesis_validators_root: Hash256, pub proposer_shuffling_decision_root: Hash256, + pub is_merge_complete: bool, } pub trait BeaconChainTypes: Send + Sync + 'static { @@ -204,6 +205,19 @@ pub trait BeaconChainTypes: Send + Sync + 'static { type EthSpec: types::EthSpec; } +/// Indicates the status of the `ExecutionLayer`. +#[derive(Debug, PartialEq)] +pub enum ExecutionLayerStatus { + /// The execution layer is synced and reachable. + Ready, + /// The execution layer is either syncing or unreachable. + NotReady, + /// The execution layer is required, but has not been enabled. This is a configuration error. + Missing, + /// The execution layer is not yet required, therefore the status is irrelevant. + NotRequired, +} + pub type BeaconForkChoice = ForkChoice< BeaconForkChoiceStore< ::EthSpec, @@ -1001,6 +1015,7 @@ impl BeaconChain { genesis_time: head.beacon_state.genesis_time(), genesis_validators_root: head.beacon_state.genesis_validators_root(), proposer_shuffling_decision_root, + is_merge_complete: is_merge_complete(&head.beacon_state), }) }) } @@ -3405,6 +3420,39 @@ impl BeaconChain { .map_err(Error::ExecutionForkChoiceUpdateFailed) } + /// Indicates the status of the execution layer. + pub async fn execution_layer_status(&self) -> Result { + let epoch = self.epoch()?; + if self.spec.merge_fork_epoch.map_or(true, |fork| epoch < fork) { + return Ok(ExecutionLayerStatus::NotRequired); + } + + if let Some(execution_layer) = &self.execution_layer { + if execution_layer.is_synced().await { + Ok(ExecutionLayerStatus::Ready) + } else { + Ok(ExecutionLayerStatus::NotReady) + } + } else { + // This branch is slightly more restrictive than what is minimally required. 
+ // + // It is possible for a node without an execution layer (EL) to follow the chain + // *after* the merge fork and *before* the terminal execution block, as long as + // that node is not required to produce blocks. + // + // However, here we say that all nodes *must* have an EL as soon as the merge fork + // happens. We do this because it's very difficult to determine that the terminal + // block has been met if we don't already have an EL. As far as we know, the + // terminal execution block might already exist and we've been rejecting it since + // we don't have an EL to verify it. + // + // I think it is very reasonable to say that the beacon chain expects all BNs to + // be paired with an EL node by the time the merge fork epoch is reached. So, we + // enforce that here. + Ok(ExecutionLayerStatus::Missing) + } + } + /// This function takes a configured weak subjectivity `Checkpoint` and the latest finalized `Checkpoint`. /// If the weak subjectivity checkpoint and finalized checkpoint share the same epoch, we compare /// roots. 
If we the weak subjectivity checkpoint is from an older epoch, we iterate back through diff --git a/beacon_node/beacon_chain/src/lib.rs b/beacon_node/beacon_chain/src/lib.rs index 19c366572b..d70ab4d477 100644 --- a/beacon_node/beacon_chain/src/lib.rs +++ b/beacon_node/beacon_chain/src/lib.rs @@ -36,7 +36,8 @@ mod validator_pubkey_cache; pub use self::beacon_chain::{ AttestationProcessingOutcome, BeaconChain, BeaconChainTypes, BeaconStore, ChainSegmentResult, - ForkChoiceError, HeadInfo, StateSkipConfig, WhenSlotSkipped, MAXIMUM_GOSSIP_CLOCK_DISPARITY, + ExecutionLayerStatus, ForkChoiceError, HeadInfo, StateSkipConfig, WhenSlotSkipped, + MAXIMUM_GOSSIP_CLOCK_DISPARITY, }; pub use self::beacon_snapshot::BeaconSnapshot; pub use self::chain_config::ChainConfig; diff --git a/beacon_node/client/src/builder.rs b/beacon_node/client/src/builder.rs index 2bb1fbe6a4..7536818db4 100644 --- a/beacon_node/client/src/builder.rs +++ b/beacon_node/client/src/builder.rs @@ -647,8 +647,55 @@ where if let Some(beacon_chain) = self.beacon_chain.as_ref() { let state_advance_context = runtime_context.service_context("state_advance".into()); - let log = state_advance_context.log().clone(); - spawn_state_advance_timer(state_advance_context.executor, beacon_chain.clone(), log); + let state_advance_log = state_advance_context.log().clone(); + spawn_state_advance_timer( + state_advance_context.executor, + beacon_chain.clone(), + state_advance_log, + ); + + if let Some(execution_layer) = beacon_chain.execution_layer.as_ref() { + let store = beacon_chain.store.clone(); + let inner_execution_layer = execution_layer.clone(); + + let head = beacon_chain + .head_info() + .map_err(|e| format!("Unable to read beacon chain head: {:?}", e))?; + + // Issue the head to the execution engine on startup. This ensures it can start + // syncing. 
+ if head.is_merge_complete { + let result = runtime_context + .executor + .runtime() + .upgrade() + .ok_or_else(|| "Cannot update engine head, shutting down".to_string())? + .block_on(async move { + BeaconChain::< + Witness, + >::update_execution_engine_forkchoice( + inner_execution_layer, + store, + head.finalized_checkpoint.root, + head.block_root, + ) + .await + }); + + // No need to exit early if setting the head fails. It will be set again if/when the + // node comes online. + if let Err(e) = result { + warn!( + log, + "Failed to update head on execution engines"; + "error" => ?e + ); + } + } + + // Spawn a routine that tracks the status of the execution engines. + execution_layer.spawn_watchdog_routine(beacon_chain.slot_clock.clone()); + } } Ok(Client { diff --git a/beacon_node/execution_layer/Cargo.toml b/beacon_node/execution_layer/Cargo.toml index dbbbfe5ccd..aeeaab67ae 100644 --- a/beacon_node/execution_layer/Cargo.toml +++ b/beacon_node/execution_layer/Cargo.toml @@ -28,3 +28,4 @@ exit-future = "0.2.0" tree_hash = { path = "../../consensus/tree_hash"} tree_hash_derive = { path = "../../consensus/tree_hash_derive"} parking_lot = "0.11.0" +slot_clock = { path = "../../common/slot_clock" } diff --git a/beacon_node/execution_layer/src/engines.rs b/beacon_node/execution_layer/src/engines.rs index 25f2dd323b..c06abd3426 100644 --- a/beacon_node/execution_layer/src/engines.rs +++ b/beacon_node/execution_layer/src/engines.rs @@ -2,32 +2,38 @@ use crate::engine_api::{EngineApi, Error as EngineApiError}; use futures::future::join_all; -use slog::{crit, error, info, warn, Logger}; +use slog::{crit, debug, error, info, warn, Logger}; use std::future::Future; use tokio::sync::RwLock; +use types::Hash256; /// Stores the remembered state of a engine. 
#[derive(Copy, Clone, PartialEq)] enum EngineState { - Online, + Synced, Offline, + Syncing, } -impl EngineState { - fn set_online(&mut self) { - *self = EngineState::Online - } +#[derive(Copy, Clone, PartialEq, Debug)] +pub struct ForkChoiceHead { + pub head_block_hash: Hash256, + pub finalized_block_hash: Hash256, +} - fn set_offline(&mut self) { - *self = EngineState::Offline - } +/// Used to enable/disable logging on some tasks. +#[derive(Copy, Clone, PartialEq)] +pub enum Logging { + Enabled, + Disabled, +} - fn is_online(&self) -> bool { - *self == EngineState::Online - } - - fn is_offline(&self) -> bool { - *self == EngineState::Offline +impl Logging { + pub fn is_enabled(&self) -> bool { + match self { + Logging::Enabled => true, + Logging::Disabled => false, + } } } @@ -53,6 +59,7 @@ impl Engine { /// manner. pub struct Engines { pub engines: Vec>, + pub latest_head: RwLock>, pub log: Logger, } @@ -63,45 +70,112 @@ pub enum EngineError { } impl Engines { + pub async fn set_latest_head(&self, latest_head: ForkChoiceHead) { + *self.latest_head.write().await = Some(latest_head); + } + + async fn send_latest_head(&self, engine: &Engine) { + let latest_head: Option = *self.latest_head.read().await; + if let Some(head) = latest_head { + info!( + self.log, + "Issuing forkchoiceUpdated"; + "head" => ?head, + "id" => &engine.id, + ); + + if let Err(e) = engine + .api + .forkchoice_updated(head.head_block_hash, head.finalized_block_hash) + .await + { + error!( + self.log, + "Failed to issue latest head to engine"; + "error" => ?e, + "id" => &engine.id, + ); + } + } else { + debug!( + self.log, + "No head, not sending to engine"; + "id" => &engine.id, + ); + } + } + + /// Returns `true` if there is at least one engine with a "synced" status. 
+ pub async fn any_synced(&self) -> bool { + for engine in &self.engines { + if *engine.state.read().await == EngineState::Synced { + return true; + } + } + false + } + /// Run the `EngineApi::upcheck` function on all nodes which are currently offline. /// /// This can be used to try and recover any offline nodes. - async fn upcheck_offline(&self) { + pub async fn upcheck_not_synced(&self, logging: Logging) { let upcheck_futures = self.engines.iter().map(|engine| async move { - let mut state = engine.state.write().await; - if state.is_offline() { + let mut state_lock = engine.state.write().await; + if *state_lock != EngineState::Synced { match engine.api.upcheck().await { Ok(()) => { - info!( - self.log, - "Execution engine online"; - "id" => &engine.id - ); - state.set_online() + if logging.is_enabled() { + info!( + self.log, + "Execution engine online"; + "id" => &engine.id + ); + } + + // Send the node our latest head. + self.send_latest_head(engine).await; + + *state_lock = EngineState::Synced + } + Err(EngineApiError::IsSyncing) => { + if logging.is_enabled() { + warn!( + self.log, + "Execution engine syncing"; + "id" => &engine.id + ) + } + + // Send the node our latest head, it may assist with syncing. 
+ self.send_latest_head(engine).await; + + *state_lock = EngineState::Syncing } Err(e) => { - warn!( - self.log, - "Execution engine offline"; - "error" => ?e, - "id" => &engine.id - ) + if logging.is_enabled() { + warn!( + self.log, + "Execution engine offline"; + "error" => ?e, + "id" => &engine.id + ) + } } } } - *state + *state_lock }); - let num_online = join_all(upcheck_futures) + let num_synced = join_all(upcheck_futures) .await .into_iter() - .filter(|state: &EngineState| state.is_online()) + .filter(|state: &EngineState| *state == EngineState::Synced) .count(); - if num_online == 0 { + if num_synced == 0 && logging.is_enabled() { crit!( self.log, - "No execution engines online"; + "No synced execution engines"; ) } } @@ -120,7 +194,7 @@ impl Engines { Ok(result) => Ok(result), Err(mut first_errors) => { // Try to recover some nodes. - self.upcheck_offline().await; + self.upcheck_not_synced(Logging::Enabled).await; // Retry the call on all nodes. match self.first_success_without_retry(func).await { Ok(result) => Ok(result), @@ -146,8 +220,8 @@ impl Engines { let mut errors = vec![]; for engine in &self.engines { - let engine_online = engine.state.read().await.is_online(); - if engine_online { + let engine_synced = *engine.state.read().await == EngineState::Synced; + if engine_synced { match func(engine).await { Ok(result) => return Ok(result), Err(error) => { @@ -157,7 +231,7 @@ impl Engines { "error" => ?error, "id" => &engine.id ); - engine.state.write().await.set_offline(); + *engine.state.write().await = EngineState::Offline; errors.push(EngineError::Api { id: engine.id.clone(), error, @@ -174,7 +248,8 @@ impl Engines { Err(errors) } - /// Runs `func` on all nodes concurrently, returning all results. + /// Runs `func` on all nodes concurrently, returning all results. Any nodes that are offline + /// will be ignored, however all synced or unsynced nodes will receive the broadcast. /// /// This function might try to run `func` twice. 
If all nodes return an error on the first time /// it runs, it will try to upcheck all offline nodes and then run the function again. @@ -195,7 +270,7 @@ impl Engines { } if any_offline { - self.upcheck_offline().await; + self.upcheck_not_synced(Logging::Enabled).await; self.broadcast_without_retry(func).await } else { first_results @@ -213,8 +288,8 @@ impl Engines { { let func = &func; let futures = self.engines.iter().map(|engine| async move { - let engine_online = engine.state.read().await.is_online(); - if engine_online { + let is_offline = *engine.state.read().await == EngineState::Offline; + if !is_offline { func(engine).await.map_err(|error| { error!( self.log, diff --git a/beacon_node/execution_layer/src/lib.rs b/beacon_node/execution_layer/src/lib.rs index ad76be882c..f5ea686779 100644 --- a/beacon_node/execution_layer/src/lib.rs +++ b/beacon_node/execution_layer/src/lib.rs @@ -5,14 +5,19 @@ //! deposit-contract functionality that the `beacon_node/eth1` crate already provides. use engine_api::{Error as ApiError, *}; -use engines::{Engine, EngineError, Engines}; +use engines::{Engine, EngineError, Engines, ForkChoiceHead, Logging}; use lru::LruCache; use sensitive_url::SensitiveUrl; -use slog::{crit, info, Logger}; +use slog::{crit, error, info, Logger}; +use slot_clock::SlotClock; use std::future::Future; use std::sync::Arc; +use std::time::Duration; use task_executor::TaskExecutor; -use tokio::sync::{Mutex, MutexGuard}; +use tokio::{ + sync::{Mutex, MutexGuard}, + time::{sleep, sleep_until, Instant}, +}; pub use engine_api::{http::HttpJsonRpc, ConsensusStatus, ExecutePayloadResponse}; pub use execute_payload_handle::ExecutePayloadHandle; @@ -92,6 +97,7 @@ impl ExecutionLayer { let inner = Inner { engines: Engines { engines, + latest_head: <_>::default(), log: log.clone(), }, terminal_total_difficulty, @@ -164,6 +170,72 @@ impl ExecutionLayer { self.executor().spawn(generate_future(self.clone()), name); } + /// Spawns a routine which attempts to keep the 
execution engines online. + pub fn spawn_watchdog_routine(&self, slot_clock: S) { + let watchdog = |el: ExecutionLayer| async move { + // Run one task immediately. + el.watchdog_task().await; + + let recurring_task = + |el: ExecutionLayer, now: Instant, duration_to_next_slot: Duration| async move { + // We run the task three times per slot. + // + // The interval between each task is 1/3rd of the slot duration. This matches nicely + // with the attestation production times (unagg. at 1/3rd, agg at 2/3rd). + // + // Each task is offset by 3/4ths of the interval. + // + // On mainnet, this means we will run tasks at: + // + // - 3s after slot start: 1s before publishing unaggregated attestations. + // - 7s after slot start: 1s before publishing aggregated attestations. + // - 11s after slot start: 1s before the next slot starts. + let interval = duration_to_next_slot / 3; + let offset = (interval / 4) * 3; + + let first_execution = duration_to_next_slot + offset; + let second_execution = first_execution + interval; + let third_execution = second_execution + interval; + + sleep_until(now + first_execution).await; + el.engines().upcheck_not_synced(Logging::Disabled).await; + + sleep_until(now + second_execution).await; + el.engines().upcheck_not_synced(Logging::Disabled).await; + + sleep_until(now + third_execution).await; + el.engines().upcheck_not_synced(Logging::Disabled).await; + }; + + // Start the loop to periodically update. + loop { + if let Some(duration) = slot_clock.duration_to_next_slot() { + let now = Instant::now(); + + // Spawn a new task rather than waiting for this to finish. This ensures that a + // slow run doesn't prevent the next run from starting. + el.spawn(|el| recurring_task(el, now, duration), "exec_watchdog_task"); + } else { + error!(el.log(), "Failed to spawn watchdog task"); + } + sleep(slot_clock.slot_duration()).await; + } + }; + + self.spawn(watchdog, "exec_watchdog"); + } + + /// Performs a single execution of the watchdog routine. 
+ async fn watchdog_task(&self) { + // Disable logging since this runs frequently and may get annoying. + self.engines().upcheck_not_synced(Logging::Disabled).await; + } + + /// Returns `true` if there is at least one synced and reachable engine. + pub async fn is_synced(&self) -> bool { + self.engines().any_synced().await + } + /// Maps to the `engine_preparePayload` JSON-RPC function. /// /// ## Fallback Behavior @@ -364,6 +436,16 @@ impl ExecutionLayer { "finalized_block_hash" => ?finalized_block_hash, "head_block_hash" => ?head_block_hash, ); + + // Update the cached version of the latest head so it can be sent to new or reconnecting + // execution nodes. + self.engines() + .set_latest_head(ForkChoiceHead { + head_block_hash, + finalized_block_hash, + }) + .await; + let broadcast_results = self .engines() .broadcast(|engine| { diff --git a/beacon_node/http_api/src/lib.rs b/beacon_node/http_api/src/lib.rs index 35a22afc4d..c22419ffae 100644 --- a/beacon_node/http_api/src/lib.rs +++ b/beacon_node/http_api/src/lib.rs @@ -20,7 +20,7 @@ use beacon_chain::{ observed_operations::ObservationOutcome, validator_monitor::{get_block_delay_ms, timestamp_now}, AttestationError as AttnError, BeaconChain, BeaconChainError, BeaconChainTypes, - WhenSlotSkipped, + ExecutionLayerStatus, WhenSlotSkipped, }; use block_id::BlockId; use eth2::types::{self as api_types, EndpointVersion, ValidatorId}; @@ -340,7 +340,7 @@ pub fn serve( } }); - // Create a `warp` filter that rejects request whilst the node is syncing. + // Create a `warp` filter that rejects requests whilst the node is syncing. let not_while_syncing_filter = warp::any() .and(network_globals.clone()) @@ -385,6 +385,28 @@ pub fn serve( ) .untuple_one(); + // Create a `warp` filter that rejects requests unless the execution layer (EL) is ready. 
+ let only_while_el_is_ready = warp::any() + .and(chain_filter.clone()) + .and_then(move |chain: Arc>| async move { + let status = chain.execution_layer_status().await.map_err(|e| { + warp_utils::reject::custom_server_error(format!( + "failed to read execution engine status: {:?}", + e + )) + })?; + match status { + ExecutionLayerStatus::Ready | ExecutionLayerStatus::NotRequired => Ok(()), + ExecutionLayerStatus::NotReady => Err(warp_utils::reject::custom_server_error( + "execution engine(s) not ready".to_string(), + )), + ExecutionLayerStatus::Missing => Err(warp_utils::reject::custom_server_error( + "no execution engines configured".to_string(), + )), + } + }) + .untuple_one(); + // Create a `warp` filter that provides access to the logger. let inner_ctx = ctx.clone(); let log_filter = warp::any().map(move || inner_ctx.log.clone()); @@ -1081,6 +1103,7 @@ pub fn serve( .and(warp::body::json()) .and(network_tx_filter.clone()) .and(log_filter.clone()) + .and(only_while_el_is_ready.clone()) .and_then( |chain: Arc>, attestations: Vec>, @@ -1378,6 +1401,7 @@ pub fn serve( .and(warp::body::json()) .and(network_tx_filter.clone()) .and(log_filter.clone()) + .and(only_while_el_is_ready.clone()) .and_then( |chain: Arc>, signatures: Vec, @@ -1807,6 +1831,7 @@ pub fn serve( })) .and(warp::path::end()) .and(not_while_syncing_filter.clone()) + .and(only_while_el_is_ready.clone()) .and(chain_filter.clone()) .and(log_filter.clone()) .and_then(|epoch: Epoch, chain: Arc>, log: Logger| { @@ -1824,6 +1849,7 @@ pub fn serve( })) .and(warp::path::end()) .and(not_while_syncing_filter.clone()) + .and(only_while_el_is_ready.clone()) .and(warp::query::()) .and(chain_filter.clone()) .and_then( @@ -1858,6 +1884,7 @@ pub fn serve( .and(warp::path::end()) .and(warp::query::()) .and(not_while_syncing_filter.clone()) + .and(only_while_el_is_ready.clone()) .and(chain_filter.clone()) .and_then( |query: api_types::ValidatorAttestationDataQuery, chain: Arc>| { @@ -1890,6 +1917,7 @@ pub fn serve( 
.and(warp::path::end()) .and(warp::query::()) .and(not_while_syncing_filter.clone()) + .and(only_while_el_is_ready.clone()) .and(chain_filter.clone()) .and_then( |query: api_types::ValidatorAggregateAttestationQuery, chain: Arc>| { @@ -1921,6 +1949,7 @@ pub fn serve( })) .and(warp::path::end()) .and(not_while_syncing_filter.clone()) + .and(only_while_el_is_ready.clone()) .and(warp::body::json()) .and(chain_filter.clone()) .and_then( @@ -1943,6 +1972,7 @@ pub fn serve( })) .and(warp::path::end()) .and(not_while_syncing_filter.clone()) + .and(only_while_el_is_ready.clone()) .and(warp::body::json()) .and(chain_filter.clone()) .and_then( @@ -1960,6 +1990,7 @@ pub fn serve( .and(warp::path::end()) .and(warp::query::()) .and(not_while_syncing_filter.clone()) + .and(only_while_el_is_ready.clone()) .and(chain_filter.clone()) .and_then( |sync_committee_data: SyncContributionData, chain: Arc>| { @@ -1982,6 +2013,7 @@ pub fn serve( .and(warp::path("aggregate_and_proofs")) .and(warp::path::end()) .and(not_while_syncing_filter.clone()) + .and(only_while_el_is_ready.clone()) .and(chain_filter.clone()) .and(warp::body::json()) .and(network_tx_filter.clone()) @@ -2082,6 +2114,7 @@ pub fn serve( .and(warp::path("contribution_and_proofs")) .and(warp::path::end()) .and(not_while_syncing_filter.clone()) + .and(only_while_el_is_ready) .and(chain_filter.clone()) .and(warp::body::json()) .and(network_tx_filter.clone())