diff --git a/Cargo.lock b/Cargo.lock index 6b0edcae92..ab75fe2aed 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2604,6 +2604,7 @@ dependencies = [ "lru", "network", "parking_lot 0.12.1", + "proto_array", "safe_arith", "sensitive_url", "serde", diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index 78e846e74a..c18f4a7374 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -1289,23 +1289,28 @@ impl BeaconChain { epoch: Epoch, head_block_root: Hash256, ) -> Result<(Vec>, Hash256, ExecutionStatus), Error> { - self.with_committee_cache(head_block_root, epoch, |committee_cache, dependent_root| { - let duties = validator_indices - .iter() - .map(|validator_index| { - let validator_index = *validator_index as usize; - committee_cache.get_attestation_duties(validator_index) - }) - .collect(); + let execution_status = self + .canonical_head + .fork_choice_read_lock() + .get_block_execution_status(&head_block_root) + .ok_or(Error::AttestationHeadNotInForkChoice(head_block_root))?; - let execution_status = self - .canonical_head - .fork_choice_read_lock() - .get_block_execution_status(&head_block_root) - .ok_or(Error::AttestationHeadNotInForkChoice(head_block_root))?; + let (duties, dependent_root) = self.with_committee_cache( + head_block_root, + epoch, + |committee_cache, dependent_root| { + let duties = validator_indices + .iter() + .map(|validator_index| { + let validator_index = *validator_index as usize; + committee_cache.get_attestation_duties(validator_index) + }) + .collect(); - Ok((duties, dependent_root, execution_status)) - }) + Ok((duties, dependent_root)) + }, + )?; + Ok((duties, dependent_root, execution_status)) } /// Returns an aggregated `Attestation`, if any, that has a matching `attestation.data`. @@ -2908,6 +2913,7 @@ impl BeaconChain { event_handler.register(EventKind::Block(SseBlock { slot, block: block_root, + execution_optimistic: payload_verification_status.is_optimistic(), })); } } @@ -4055,9 +4061,9 @@ impl BeaconChain { /// /// Returns `Ok(false)` if the block is pre-Bellatrix, or has `ExecutionStatus::Valid`. /// Returns `Ok(true)` if the block has `ExecutionStatus::Optimistic`. - pub fn is_optimistic_block( + pub fn is_optimistic_block>( &self, - block: &SignedBeaconBlock, + block: &SignedBeaconBlock, ) -> Result { // Check if the block is pre-Bellatrix. if self.slot_is_prior_to_bellatrix(block.slot()) { @@ -4081,9 +4087,9 @@ impl BeaconChain { /// /// There is a potential race condition when syncing where the block_root of `head_block` could /// be pruned from the fork choice store before being read. - pub fn is_optimistic_head_block( + pub fn is_optimistic_head_block>( &self, - head_block: &SignedBeaconBlock, + head_block: &SignedBeaconBlock, ) -> Result { // Check if the block is pre-Bellatrix. if self.slot_is_prior_to_bellatrix(head_block.slot()) { diff --git a/beacon_node/beacon_chain/src/canonical_head.rs b/beacon_node/beacon_chain/src/canonical_head.rs index a07b346c1b..aff4deeaf9 100644 --- a/beacon_node/beacon_chain/src/canonical_head.rs +++ b/beacon_node/beacon_chain/src/canonical_head.rs @@ -300,6 +300,23 @@ impl CanonicalHead { .ok_or(Error::HeadMissingFromForkChoice(head_block_root)) } + /// Returns a clone of the `CachedHead` and the execution status of the contained head block. + /// + /// This will only return `Err` in the scenario where `self.fork_choice` has advanced + /// significantly past the cached `head_snapshot`. In such a scenario it is likely prudent to + /// run `BeaconChain::recompute_head` to update the cached values. + pub fn head_and_execution_status( + &self, + ) -> Result<(CachedHead, ExecutionStatus), Error> { + let head = self.cached_head(); + let head_block_root = head.head_block_root(); + let execution_status = self + .fork_choice_read_lock() + .get_block_execution_status(&head_block_root) + .ok_or(Error::HeadMissingFromForkChoice(head_block_root))?; + Ok((head, execution_status)) + } + /// Returns a clone of `self.cached_head`. /// /// Takes a read-lock on `self.cached_head` for a short time (just long enough to clone it). @@ -713,6 +730,7 @@ impl BeaconChain { ) -> Result<(), Error> { let old_snapshot = &old_cached_head.snapshot; let new_snapshot = &new_cached_head.snapshot; + let new_head_is_optimistic = new_head_proto_block.execution_status.is_optimistic(); // Detect and potentially report any re-orgs. let reorg_distance = detect_reorg( @@ -798,6 +816,7 @@ impl BeaconChain { current_duty_dependent_root, previous_duty_dependent_root, epoch_transition: is_epoch_transition, + execution_optimistic: new_head_is_optimistic, })); } (Err(e), _) | (_, Err(e)) => { @@ -825,6 +844,7 @@ impl BeaconChain { new_head_block: new_snapshot.beacon_block_root, new_head_state: new_snapshot.beacon_state_root(), epoch: head_slot.epoch(T::EthSpec::slots_per_epoch()), + execution_optimistic: new_head_is_optimistic, })); } } @@ -841,6 +861,7 @@ impl BeaconChain { finalized_proto_block: ProtoBlock, ) -> Result<(), Error> { let new_snapshot = &new_cached_head.snapshot; + let finalized_block_is_optimistic = finalized_proto_block.execution_status.is_optimistic(); self.op_pool .prune_all(&new_snapshot.beacon_state, self.epoch()?); @@ -884,6 +905,7 @@ impl BeaconChain { // specific state root at the first slot of the finalized epoch (which // might be a skip slot). state: finalized_proto_block.state_root, + execution_optimistic: finalized_block_is_optimistic, })); } } @@ -1216,6 +1238,7 @@ fn observe_head_block_delays( let block_time_set_as_head = timestamp_now(); let head_block_root = head_block.root; let head_block_slot = head_block.slot; + let head_block_is_optimistic = head_block.execution_status.is_optimistic(); // Calculate the total delay between the start of the slot and when it was set as head. let block_delay_total = get_slot_delay_ms(block_time_set_as_head, head_block_slot, slot_clock); @@ -1308,6 +1331,7 @@ fn observe_head_block_delays( observed_delay: block_delays.observed, imported_delay: block_delays.imported, set_as_head_delay: block_delays.set_as_head, + execution_optimistic: head_block_is_optimistic, })); } } diff --git a/beacon_node/beacon_chain/src/test_utils.rs b/beacon_node/beacon_chain/src/test_utils.rs index 2adae6c166..e9dc8619ac 100644 --- a/beacon_node/beacon_chain/src/test_utils.rs +++ b/beacon_node/beacon_chain/src/test_utils.rs @@ -37,6 +37,7 @@ use state_processing::{ }; use std::borrow::Cow; use std::collections::{HashMap, HashSet}; +use std::fmt; use std::str::FromStr; use std::sync::Arc; use std::time::Duration; @@ -1778,3 +1779,10 @@ where (honest_head, faulty_head) } } + +// Junk `Debug` impl to satistfy certain trait bounds during testing. +impl fmt::Debug for BeaconChainHarness { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "BeaconChainHarness") + } +} diff --git a/beacon_node/http_api/Cargo.toml b/beacon_node/http_api/Cargo.toml index 07fb992393..5cc703aa1a 100644 --- a/beacon_node/http_api/Cargo.toml +++ b/beacon_node/http_api/Cargo.toml @@ -40,6 +40,7 @@ tree_hash = "0.4.1" sensitive_url = { path = "../../common/sensitive_url" } logging = { path = "../../common/logging" } serde_json = "1.0.58" +proto_array = { path = "../../consensus/proto_array" } [[test]] name = "bn_http_api_tests" diff --git a/beacon_node/http_api/src/attester_duties.rs b/beacon_node/http_api/src/attester_duties.rs index 35a35bcb74..6805d7104c 100644 --- a/beacon_node/http_api/src/attester_duties.rs +++ b/beacon_node/http_api/src/attester_duties.rs @@ -60,11 +60,17 @@ fn cached_attestation_duties( ) -> Result { let head_block_root = chain.canonical_head.cached_head().head_block_root(); - let (duties, dependent_root, _execution_status) = chain + let (duties, dependent_root, execution_status) = chain .validator_attestation_duties(request_indices, request_epoch, head_block_root) .map_err(warp_utils::reject::beacon_chain_error)?; - convert_to_api_response(duties, request_indices, dependent_root, chain) + convert_to_api_response( + duties, + request_indices, + dependent_root, + execution_status.is_optimistic(), + chain, + ) } /// Compute some attester duties by reading a `BeaconState` from disk, completely ignoring the @@ -76,35 +82,42 @@ fn compute_historic_attester_duties( ) -> Result { // If the head is quite old then it might still be relevant for a historical request. // - // Use the `with_head` function to read & clone in a single call to avoid race conditions. - let state_opt = chain - .with_head(|head| { - if head.beacon_state.current_epoch() <= request_epoch { - Ok(Some(( - head.beacon_state_root(), - head.beacon_state - .clone_with(CloneConfig::committee_caches_only()), - ))) - } else { - Ok(None) - } - }) - .map_err(warp_utils::reject::beacon_chain_error)?; + // Avoid holding the `cached_head` longer than necessary. + let state_opt = { + let (cached_head, execution_status) = chain + .canonical_head + .head_and_execution_status() + .map_err(warp_utils::reject::beacon_chain_error)?; + let head = &cached_head.snapshot; - let mut state = if let Some((state_root, mut state)) = state_opt { - // If we've loaded the head state it might be from a previous epoch, ensure it's in a - // suitable epoch. - ensure_state_knows_attester_duties_for_epoch( - &mut state, - state_root, - request_epoch, - &chain.spec, - )?; - state - } else { - StateId::slot(request_epoch.start_slot(T::EthSpec::slots_per_epoch())).state(chain)? + if head.beacon_state.current_epoch() <= request_epoch { + Some(( + head.beacon_state_root(), + head.beacon_state + .clone_with(CloneConfig::committee_caches_only()), + execution_status.is_optimistic(), + )) + } else { + None + } }; + let (mut state, execution_optimistic) = + if let Some((state_root, mut state, execution_optimistic)) = state_opt { + // If we've loaded the head state it might be from a previous epoch, ensure it's in a + // suitable epoch. + ensure_state_knows_attester_duties_for_epoch( + &mut state, + state_root, + request_epoch, + &chain.spec, + )?; + (state, execution_optimistic) + } else { + StateId::from_slot(request_epoch.start_slot(T::EthSpec::slots_per_epoch())) + .state(chain)? + }; + // Sanity-check the state lookup. if !(state.current_epoch() == request_epoch || state.current_epoch() + 1 == request_epoch) { return Err(warp_utils::reject::custom_server_error(format!( @@ -140,7 +153,13 @@ fn compute_historic_attester_duties( .collect::>() .map_err(warp_utils::reject::beacon_chain_error)?; - convert_to_api_response(duties, request_indices, dependent_root, chain) + convert_to_api_response( + duties, + request_indices, + dependent_root, + execution_optimistic, + chain, + ) } fn ensure_state_knows_attester_duties_for_epoch( @@ -178,6 +197,7 @@ fn convert_to_api_response( duties: Vec>, indices: &[u64], dependent_root: Hash256, + execution_optimistic: bool, chain: &BeaconChain, ) -> Result { // Protect against an inconsistent slot clock. @@ -213,6 +233,7 @@ fn convert_to_api_response( Ok(api_types::DutiesResponse { dependent_root, + execution_optimistic: Some(execution_optimistic), data, }) } diff --git a/beacon_node/http_api/src/block_id.rs b/beacon_node/http_api/src/block_id.rs index 73f50985bd..91425e2f10 100644 --- a/beacon_node/http_api/src/block_id.rs +++ b/beacon_node/http_api/src/block_id.rs @@ -1,8 +1,10 @@ -use beacon_chain::{BeaconChain, BeaconChainTypes, WhenSlotSkipped}; +use crate::{state_id::checkpoint_slot_and_execution_optimistic, ExecutionOptimistic}; +use beacon_chain::{BeaconChain, BeaconChainError, BeaconChainTypes, WhenSlotSkipped}; use eth2::types::BlockId as CoreBlockId; +use std::fmt; use std::str::FromStr; use std::sync::Arc; -use types::{BlindedPayload, Hash256, SignedBeaconBlock, Slot}; +use types::{Hash256, SignedBeaconBlock, SignedBlindedBeaconBlock, Slot}; /// Wraps `eth2::types::BlockId` and provides a simple way to obtain a block or root for a given /// `BlockId`. @@ -22,32 +24,78 @@ impl BlockId { pub fn root( &self, chain: &BeaconChain, - ) -> Result { + ) -> Result<(Hash256, ExecutionOptimistic), warp::Rejection> { match &self.0 { - CoreBlockId::Head => Ok(chain.canonical_head.cached_head().head_block_root()), - CoreBlockId::Genesis => Ok(chain.genesis_block_root), - CoreBlockId::Finalized => Ok(chain - .canonical_head - .cached_head() - .finalized_checkpoint() - .root), - CoreBlockId::Justified => Ok(chain - .canonical_head - .cached_head() - .justified_checkpoint() - .root), - CoreBlockId::Slot(slot) => chain - .block_root_at_slot(*slot, WhenSlotSkipped::None) - .map_err(warp_utils::reject::beacon_chain_error) - .and_then(|root_opt| { - root_opt.ok_or_else(|| { - warp_utils::reject::custom_not_found(format!( - "beacon block at slot {}", - slot - )) - }) - }), - CoreBlockId::Root(root) => Ok(*root), + CoreBlockId::Head => { + let (cached_head, execution_status) = chain + .canonical_head + .head_and_execution_status() + .map_err(warp_utils::reject::beacon_chain_error)?; + Ok(( + cached_head.head_block_root(), + execution_status.is_optimistic(), + )) + } + CoreBlockId::Genesis => Ok((chain.genesis_block_root, false)), + CoreBlockId::Finalized => { + let finalized_checkpoint = + chain.canonical_head.cached_head().finalized_checkpoint(); + let (_slot, execution_optimistic) = + checkpoint_slot_and_execution_optimistic(chain, finalized_checkpoint)?; + Ok((finalized_checkpoint.root, execution_optimistic)) + } + CoreBlockId::Justified => { + let justified_checkpoint = + chain.canonical_head.cached_head().justified_checkpoint(); + let (_slot, execution_optimistic) = + checkpoint_slot_and_execution_optimistic(chain, justified_checkpoint)?; + Ok((justified_checkpoint.root, execution_optimistic)) + } + CoreBlockId::Slot(slot) => { + let execution_optimistic = chain + .is_optimistic_head() + .map_err(warp_utils::reject::beacon_chain_error)?; + let root = chain + .block_root_at_slot(*slot, WhenSlotSkipped::None) + .map_err(warp_utils::reject::beacon_chain_error) + .and_then(|root_opt| { + root_opt.ok_or_else(|| { + warp_utils::reject::custom_not_found(format!( + "beacon block at slot {}", + slot + )) + }) + })?; + Ok((root, execution_optimistic)) + } + CoreBlockId::Root(root) => { + // This matches the behaviour of other consensus clients (e.g. Teku). + if root == &Hash256::zero() { + return Err(warp_utils::reject::custom_not_found(format!( + "beacon block with root {}", + root + ))); + }; + if chain + .store + .block_exists(root) + .map_err(BeaconChainError::DBError) + .map_err(warp_utils::reject::beacon_chain_error)? + { + let execution_optimistic = chain + .canonical_head + .fork_choice_read_lock() + .is_optimistic_block(root) + .map_err(BeaconChainError::ForkChoiceError) + .map_err(warp_utils::reject::beacon_chain_error)?; + Ok((*root, execution_optimistic)) + } else { + return Err(warp_utils::reject::custom_not_found(format!( + "beacon block with root {}", + root + ))); + } + } } } @@ -55,11 +103,20 @@ impl BlockId { pub fn blinded_block( &self, chain: &BeaconChain, - ) -> Result>, warp::Rejection> { + ) -> Result<(SignedBlindedBeaconBlock, ExecutionOptimistic), warp::Rejection> { match &self.0 { - CoreBlockId::Head => Ok(chain.head_beacon_block().clone_as_blinded()), + CoreBlockId::Head => { + let (cached_head, execution_status) = chain + .canonical_head + .head_and_execution_status() + .map_err(warp_utils::reject::beacon_chain_error)?; + Ok(( + cached_head.snapshot.beacon_block.clone_as_blinded(), + execution_status.is_optimistic(), + )) + } CoreBlockId::Slot(slot) => { - let root = self.root(chain)?; + let (root, execution_optimistic) = self.root(chain)?; chain .get_blinded_block(&root) .map_err(warp_utils::reject::beacon_chain_error) @@ -71,7 +128,7 @@ impl BlockId { slot ))); } - Ok(block) + Ok((block, execution_optimistic)) } None => Err(warp_utils::reject::custom_not_found(format!( "beacon block with root {}", @@ -80,8 +137,8 @@ impl BlockId { }) } _ => { - let root = self.root(chain)?; - chain + let (root, execution_optimistic) = self.root(chain)?; + let block = chain .get_blinded_block(&root) .map_err(warp_utils::reject::beacon_chain_error) .and_then(|root_opt| { @@ -91,7 +148,8 @@ impl BlockId { root )) }) - }) + })?; + Ok((block, execution_optimistic)) } } } @@ -100,11 +158,20 @@ impl BlockId { pub async fn full_block( &self, chain: &BeaconChain, - ) -> Result>, warp::Rejection> { + ) -> Result<(Arc>, ExecutionOptimistic), warp::Rejection> { match &self.0 { - CoreBlockId::Head => Ok(chain.head_beacon_block()), + CoreBlockId::Head => { + let (cached_head, execution_status) = chain + .canonical_head + .head_and_execution_status() + .map_err(warp_utils::reject::beacon_chain_error)?; + Ok(( + cached_head.snapshot.beacon_block.clone(), + execution_status.is_optimistic(), + )) + } CoreBlockId::Slot(slot) => { - let root = self.root(chain)?; + let (root, execution_optimistic) = self.root(chain)?; chain .get_block(&root) .await @@ -117,7 +184,7 @@ impl BlockId { slot ))); } - Ok(Arc::new(block)) + Ok((Arc::new(block), execution_optimistic)) } None => Err(warp_utils::reject::custom_not_found(format!( "beacon block with root {}", @@ -126,18 +193,20 @@ impl BlockId { }) } _ => { - let root = self.root(chain)?; + let (root, execution_optimistic) = self.root(chain)?; chain .get_block(&root) .await .map_err(warp_utils::reject::beacon_chain_error) .and_then(|block_opt| { - block_opt.map(Arc::new).ok_or_else(|| { - warp_utils::reject::custom_not_found(format!( - "beacon block with root {}", - root - )) - }) + block_opt + .map(|block| (Arc::new(block), execution_optimistic)) + .ok_or_else(|| { + warp_utils::reject::custom_not_found(format!( + "beacon block with root {}", + root + )) + }) }) } } @@ -151,3 +220,9 @@ impl FromStr for BlockId { CoreBlockId::from_str(s).map(Self) } } + +impl fmt::Display for BlockId { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "{}", self.0) + } +} diff --git a/beacon_node/http_api/src/lib.rs b/beacon_node/http_api/src/lib.rs index 14f260e57b..a27e5015cf 100644 --- a/beacon_node/http_api/src/lib.rs +++ b/beacon_node/http_api/src/lib.rs @@ -25,7 +25,7 @@ use beacon_chain::{ AttestationError as AttnError, BeaconChain, BeaconChainError, BeaconChainTypes, ProduceBlockVerification, WhenSlotSkipped, }; -use block_id::BlockId; +pub use block_id::BlockId; use eth2::types::{self as api_types, EndpointVersion, ValidatorId}; use lighthouse_network::{types::SyncState, EnrExt, NetworkGlobals, PeerId, PubsubMessage}; use lighthouse_version::version_with_platform; @@ -34,7 +34,7 @@ use serde::{Deserialize, Serialize}; use slog::{crit, debug, error, info, warn, Logger}; use slot_clock::SlotClock; use ssz::Encode; -use state_id::StateId; +pub use state_id::StateId; use std::borrow::Cow; use std::convert::TryInto; use std::future::Future; @@ -53,8 +53,8 @@ use types::{ SyncCommitteeMessage, SyncContributionData, }; use version::{ - add_consensus_version_header, fork_versioned_response, inconsistent_fork_rejection, - unsupported_version_rejection, V1, + add_consensus_version_header, execution_optimistic_fork_versioned_response, + fork_versioned_response, inconsistent_fork_rejection, unsupported_version_rejection, V1, V2, }; use warp::http::StatusCode; use warp::sse::Event; @@ -77,6 +77,9 @@ const SYNC_TOLERANCE_EPOCHS: u64 = 8; /// A custom type which allows for both unsecured and TLS-enabled HTTP servers. type HttpServer = (SocketAddr, Pin + Send>>); +/// Alias for readability. +pub type ExecutionOptimistic = bool; + /// Configuration used when serving the HTTP server over TLS. #[derive(PartialEq, Debug, Clone, Serialize, Deserialize)] pub struct TlsConfig { @@ -304,7 +307,7 @@ pub fn serve( .untuple_one() }; - let eth1_v1 = single_version(V1); + let eth_v1 = single_version(V1); // Create a `warp` filter that provides access to the network globals. let inner_network_globals = ctx.network_globals.clone(); @@ -413,7 +416,7 @@ pub fn serve( */ // GET beacon/genesis - let get_beacon_genesis = eth1_v1 + let get_beacon_genesis = eth_v1 .and(warp::path("beacon")) .and(warp::path("genesis")) .and(warp::path::end()) @@ -433,7 +436,7 @@ pub fn serve( * beacon/states/{state_id} */ - let beacon_states_path = eth1_v1 + let beacon_states_path = eth_v1 .and(warp::path("beacon")) .and(warp::path("states")) .and(warp::path::param::().or_else(|_| async { @@ -450,10 +453,12 @@ pub fn serve( .and(warp::path::end()) .and_then(|state_id: StateId, chain: Arc>| { blocking_json_task(move || { - state_id - .root(&chain) + let (root, execution_optimistic) = state_id.root(&chain)?; + + Ok(root) .map(api_types::RootData::from) .map(api_types::GenericResponse::from) + .map(|resp| resp.add_execution_optimistic(execution_optimistic)) }) }); @@ -463,7 +468,14 @@ pub fn serve( .and(warp::path("fork")) .and(warp::path::end()) .and_then(|state_id: StateId, chain: Arc>| { - blocking_json_task(move || state_id.fork(&chain).map(api_types::GenericResponse::from)) + blocking_json_task(move || { + let (fork, execution_optimistic) = + state_id.fork_and_execution_optimistic(&chain)?; + Ok(api_types::ExecutionOptimisticResponse { + data: fork, + execution_optimistic: Some(execution_optimistic), + }) + }) }); // GET beacon/states/{state_id}/finality_checkpoints @@ -473,15 +485,24 @@ pub fn serve( .and(warp::path::end()) .and_then(|state_id: StateId, chain: Arc>| { blocking_json_task(move || { - state_id - .map_state(&chain, |state| { - Ok(api_types::FinalityCheckpointsData { - previous_justified: state.previous_justified_checkpoint(), - current_justified: state.current_justified_checkpoint(), - finalized: state.finalized_checkpoint(), - }) - }) - .map(api_types::GenericResponse::from) + let (data, execution_optimistic) = state_id.map_state_and_execution_optimistic( + &chain, + |state, execution_optimistic| { + Ok(( + api_types::FinalityCheckpointsData { + previous_justified: state.previous_justified_checkpoint(), + current_justified: state.current_justified_checkpoint(), + finalized: state.finalized_checkpoint(), + }, + execution_optimistic, + )) + }, + )?; + + Ok(api_types::ExecutionOptimisticResponse { + data, + execution_optimistic: Some(execution_optimistic), + }) }) }); @@ -497,35 +518,45 @@ pub fn serve( query_res: Result| { blocking_json_task(move || { let query = query_res?; - state_id - .map_state(&chain, |state| { - Ok(state - .validators() - .iter() - .zip(state.balances().iter()) - .enumerate() - // filter by validator id(s) if provided - .filter(|(index, (validator, _))| { - query.id.as_ref().map_or(true, |ids| { - ids.iter().any(|id| match id { - ValidatorId::PublicKey(pubkey) => { - &validator.pubkey == pubkey - } - ValidatorId::Index(param_index) => { - *param_index == *index as u64 - } + let (data, execution_optimistic) = state_id + .map_state_and_execution_optimistic( + &chain, + |state, execution_optimistic| { + Ok(( + state + .validators() + .iter() + .zip(state.balances().iter()) + .enumerate() + // filter by validator id(s) if provided + .filter(|(index, (validator, _))| { + query.id.as_ref().map_or(true, |ids| { + ids.iter().any(|id| match id { + ValidatorId::PublicKey(pubkey) => { + &validator.pubkey == pubkey + } + ValidatorId::Index(param_index) => { + *param_index == *index as u64 + } + }) + }) }) - }) - }) - .map(|(index, (_, balance))| { - Some(api_types::ValidatorBalanceData { - index: index as u64, - balance: *balance, - }) - }) - .collect::>()) - }) - .map(api_types::GenericResponse::from) + .map(|(index, (_, balance))| { + Some(api_types::ValidatorBalanceData { + index: index as u64, + balance: *balance, + }) + }) + .collect::>(), + execution_optimistic, + )) + }, + )?; + + Ok(api_types::ExecutionOptimisticResponse { + data, + execution_optimistic: Some(execution_optimistic), + }) }) }, ); @@ -542,57 +573,67 @@ pub fn serve( query_res: Result| { blocking_json_task(move || { let query = query_res?; - state_id - .map_state(&chain, |state| { - let epoch = state.current_epoch(); - let far_future_epoch = chain.spec.far_future_epoch; + let (data, execution_optimistic) = state_id + .map_state_and_execution_optimistic( + &chain, + |state, execution_optimistic| { + let epoch = state.current_epoch(); + let far_future_epoch = chain.spec.far_future_epoch; - Ok(state - .validators() - .iter() - .zip(state.balances().iter()) - .enumerate() - // filter by validator id(s) if provided - .filter(|(index, (validator, _))| { - query.id.as_ref().map_or(true, |ids| { - ids.iter().any(|id| match id { - ValidatorId::PublicKey(pubkey) => { - &validator.pubkey == pubkey - } - ValidatorId::Index(param_index) => { - *param_index == *index as u64 + Ok(( + state + .validators() + .iter() + .zip(state.balances().iter()) + .enumerate() + // filter by validator id(s) if provided + .filter(|(index, (validator, _))| { + query.id.as_ref().map_or(true, |ids| { + ids.iter().any(|id| match id { + ValidatorId::PublicKey(pubkey) => { + &validator.pubkey == pubkey + } + ValidatorId::Index(param_index) => { + *param_index == *index as u64 + } + }) + }) + }) + // filter by status(es) if provided and map the result + .filter_map(|(index, (validator, balance))| { + let status = api_types::ValidatorStatus::from_validator( + validator, + epoch, + far_future_epoch, + ); + + let status_matches = + query.status.as_ref().map_or(true, |statuses| { + statuses.contains(&status) + || statuses.contains(&status.superstatus()) + }); + + if status_matches { + Some(api_types::ValidatorData { + index: index as u64, + balance: *balance, + status, + validator: validator.clone(), + }) + } else { + None } }) - }) - }) - // filter by status(es) if provided and map the result - .filter_map(|(index, (validator, balance))| { - let status = api_types::ValidatorStatus::from_validator( - validator, - epoch, - far_future_epoch, - ); + .collect::>(), + execution_optimistic, + )) + }, + )?; - let status_matches = - query.status.as_ref().map_or(true, |statuses| { - statuses.contains(&status) - || statuses.contains(&status.superstatus()) - }); - - if status_matches { - Some(api_types::ValidatorData { - index: index as u64, - balance: *balance, - status, - validator: validator.clone(), - }) - } else { - None - } - }) - .collect::>()) - }) - .map(api_types::GenericResponse::from) + Ok(api_types::ExecutionOptimisticResponse { + data, + execution_optimistic: Some(execution_optimistic), + }) }) }, ); @@ -610,41 +651,51 @@ pub fn serve( .and_then( |state_id: StateId, chain: Arc>, validator_id: ValidatorId| { blocking_json_task(move || { - state_id - .map_state(&chain, |state| { - let index_opt = match &validator_id { - ValidatorId::PublicKey(pubkey) => { - state.validators().iter().position(|v| v.pubkey == *pubkey) - } - ValidatorId::Index(index) => Some(*index as usize), - }; + let (data, execution_optimistic) = state_id + .map_state_and_execution_optimistic( + &chain, + |state, execution_optimistic| { + let index_opt = match &validator_id { + ValidatorId::PublicKey(pubkey) => { + state.validators().iter().position(|v| v.pubkey == *pubkey) + } + ValidatorId::Index(index) => Some(*index as usize), + }; - index_opt - .and_then(|index| { - let validator = state.validators().get(index)?; - let balance = *state.balances().get(index)?; - let epoch = state.current_epoch(); - let far_future_epoch = chain.spec.far_future_epoch; + Ok(( + index_opt + .and_then(|index| { + let validator = state.validators().get(index)?; + let balance = *state.balances().get(index)?; + let epoch = state.current_epoch(); + let far_future_epoch = chain.spec.far_future_epoch; - Some(api_types::ValidatorData { - index: index as u64, - balance, - status: api_types::ValidatorStatus::from_validator( - validator, - epoch, - far_future_epoch, - ), - validator: validator.clone(), - }) - }) - .ok_or_else(|| { - warp_utils::reject::custom_not_found(format!( - "unknown validator: {}", - validator_id - )) - }) - }) - .map(api_types::GenericResponse::from) + Some(api_types::ValidatorData { + index: index as u64, + balance, + status: api_types::ValidatorStatus::from_validator( + validator, + epoch, + far_future_epoch, + ), + validator: validator.clone(), + }) + }) + .ok_or_else(|| { + warp_utils::reject::custom_not_found(format!( + "unknown validator: {}", + validator_id + )) + })?, + execution_optimistic, + )) + }, + )?; + + Ok(api_types::ExecutionOptimisticResponse { + data, + execution_optimistic: Some(execution_optimistic), + }) }) }, ); @@ -658,86 +709,98 @@ pub fn serve( .and_then( |state_id: StateId, chain: Arc>, query: api_types::CommitteesQuery| { blocking_json_task(move || { - state_id.map_state(&chain, |state| { - let current_epoch = state.current_epoch(); - let epoch = query.epoch.unwrap_or(current_epoch); + let (data, execution_optimistic) = state_id + .map_state_and_execution_optimistic( + &chain, + |state, execution_optimistic| { + let current_epoch = state.current_epoch(); + let epoch = query.epoch.unwrap_or(current_epoch); - let committee_cache = match RelativeEpoch::from_epoch(current_epoch, epoch) - { - Ok(relative_epoch) - if state.committee_cache_is_initialized(relative_epoch) => - { - state.committee_cache(relative_epoch).map(Cow::Borrowed) - } - _ => CommitteeCache::initialized(state, epoch, &chain.spec) - .map(Cow::Owned), - } - .map_err(|e| match e { - BeaconStateError::EpochOutOfBounds => { - let max_sprp = T::EthSpec::slots_per_historical_root() as u64; - let first_subsequent_restore_point_slot = - ((epoch.start_slot(T::EthSpec::slots_per_epoch()) / max_sprp) - + 1) - * max_sprp; - if epoch < current_epoch { - warp_utils::reject::custom_bad_request(format!( - "epoch out of bounds, try state at slot {}", - first_subsequent_restore_point_slot, - )) - } else { - warp_utils::reject::custom_bad_request( - "epoch out of bounds, too far in future".into(), - ) + let committee_cache = + match RelativeEpoch::from_epoch(current_epoch, epoch) { + Ok(relative_epoch) + if state + .committee_cache_is_initialized(relative_epoch) => + { + state.committee_cache(relative_epoch).map(Cow::Borrowed) + } + _ => CommitteeCache::initialized(state, epoch, &chain.spec) + .map(Cow::Owned), + } + .map_err(|e| match e { + BeaconStateError::EpochOutOfBounds => { + let max_sprp = + T::EthSpec::slots_per_historical_root() as u64; + let first_subsequent_restore_point_slot = ((epoch + .start_slot(T::EthSpec::slots_per_epoch()) + / max_sprp) + + 1) + * max_sprp; + if epoch < current_epoch { + warp_utils::reject::custom_bad_request(format!( + "epoch out of bounds, try state at slot {}", + first_subsequent_restore_point_slot, + )) + } else { + warp_utils::reject::custom_bad_request( + "epoch out of bounds, too far in future".into(), + ) + } + } + _ => warp_utils::reject::beacon_chain_error(e.into()), + })?; + + // Use either the supplied slot or all slots in the epoch. + let slots = + query.slot.map(|slot| vec![slot]).unwrap_or_else(|| { + epoch.slot_iter(T::EthSpec::slots_per_epoch()).collect() + }); + + // Use either the supplied committee index or all available indices. + let indices = + query.index.map(|index| vec![index]).unwrap_or_else(|| { + (0..committee_cache.committees_per_slot()).collect() + }); + + let mut response = Vec::with_capacity(slots.len() * indices.len()); + + for slot in slots { + // It is not acceptable to query with a slot that is not within the + // specified epoch. + if slot.epoch(T::EthSpec::slots_per_epoch()) != epoch { + return Err(warp_utils::reject::custom_bad_request( + format!("{} is not in epoch {}", slot, epoch), + )); + } + + for &index in &indices { + let committee = committee_cache + .get_beacon_committee(slot, index) + .ok_or_else(|| { + warp_utils::reject::custom_bad_request(format!( + "committee index {} does not exist in epoch {}", + index, epoch + )) + })?; + + response.push(api_types::CommitteeData { + index, + slot, + validators: committee + .committee + .iter() + .map(|i| *i as u64) + .collect(), + }); + } } - } - _ => warp_utils::reject::beacon_chain_error(e.into()), - })?; - // Use either the supplied slot or all slots in the epoch. - let slots = query.slot.map(|slot| vec![slot]).unwrap_or_else(|| { - epoch.slot_iter(T::EthSpec::slots_per_epoch()).collect() - }); - - // Use either the supplied committee index or all available indices. - let indices = query.index.map(|index| vec![index]).unwrap_or_else(|| { - (0..committee_cache.committees_per_slot()).collect() - }); - - let mut response = Vec::with_capacity(slots.len() * indices.len()); - - for slot in slots { - // It is not acceptable to query with a slot that is not within the - // specified epoch. - if slot.epoch(T::EthSpec::slots_per_epoch()) != epoch { - return Err(warp_utils::reject::custom_bad_request(format!( - "{} is not in epoch {}", - slot, epoch - ))); - } - - for &index in &indices { - let committee = committee_cache - .get_beacon_committee(slot, index) - .ok_or_else(|| { - warp_utils::reject::custom_bad_request(format!( - "committee index {} does not exist in epoch {}", - index, epoch - )) - })?; - - response.push(api_types::CommitteeData { - index, - slot, - validators: committee - .committee - .iter() - .map(|i| *i as u64) - .collect(), - }); - } - } - - Ok(api_types::GenericResponse::from(response)) + Ok((response, execution_optimistic)) + }, + )?; + Ok(api_types::ExecutionOptimisticResponse { + data, + execution_optimistic: Some(execution_optimistic), }) }) }, @@ -754,28 +817,35 @@ pub fn serve( chain: Arc>, query: api_types::SyncCommitteesQuery| { blocking_json_task(move || { - let sync_committee = state_id.map_state(&chain, |state| { - let current_epoch = state.current_epoch(); - let epoch = query.epoch.unwrap_or(current_epoch); - state - .get_built_sync_committee(epoch, &chain.spec) - .map(|committee| committee.clone()) - .map_err(|e| match e { - BeaconStateError::SyncCommitteeNotKnown { .. } => { - warp_utils::reject::custom_bad_request(format!( + let (sync_committee, execution_optimistic) = state_id + .map_state_and_execution_optimistic( + &chain, + |state, execution_optimistic| { + let current_epoch = state.current_epoch(); + let epoch = query.epoch.unwrap_or(current_epoch); + Ok(( + state + .get_built_sync_committee(epoch, &chain.spec) + .map(|committee| committee.clone()) + .map_err(|e| match e { + BeaconStateError::SyncCommitteeNotKnown { .. } => { + warp_utils::reject::custom_bad_request(format!( "state at epoch {} has no sync committee for epoch {}", current_epoch, epoch )) - } - BeaconStateError::IncorrectStateVariant => { - warp_utils::reject::custom_bad_request(format!( - "state at epoch {} is not activated for Altair", - current_epoch, - )) - } - e => warp_utils::reject::beacon_state_error(e), - }) - })?; + } + BeaconStateError::IncorrectStateVariant => { + warp_utils::reject::custom_bad_request(format!( + "state at epoch {} is not activated for Altair", + current_epoch, + )) + } + e => warp_utils::reject::beacon_state_error(e), + })?, + execution_optimistic, + )) + }, + )?; let validators = chain .validator_indices(sync_committee.pubkeys.iter()) @@ -793,7 +863,8 @@ pub fn serve( validator_aggregates, }; - Ok(api_types::GenericResponse::from(response)) + Ok(api_types::GenericResponse::from(response) + .add_execution_optimistic(execution_optimistic)) }) }, ); @@ -805,7 +876,7 @@ pub fn serve( // things. Returning non-canonical things is hard for us since we don't already have a // mechanism for arbitrary forwards block iteration, we only support iterating forwards along // the canonical chain. - let get_beacon_headers = eth1_v1 + let get_beacon_headers = eth_v1 .and(warp::path("beacon")) .and(warp::path("headers")) .and(warp::query::()) @@ -814,15 +885,24 @@ pub fn serve( .and_then( |query: api_types::HeadersQuery, chain: Arc>| { blocking_json_task(move || { - let (root, block) = match (query.slot, query.parent_root) { + let (root, block, execution_optimistic) = match (query.slot, query.parent_root) + { // No query parameters, return the canonical head block. (None, None) => { - let block = chain.head_beacon_block(); - (block.canonical_root(), block.clone_as_blinded()) + let (cached_head, execution_status) = chain + .canonical_head + .head_and_execution_status() + .map_err(warp_utils::reject::beacon_chain_error)?; + ( + cached_head.head_block_root(), + cached_head.snapshot.beacon_block.clone_as_blinded(), + execution_status.is_optimistic(), + ) } // Only the parent root parameter, do a forwards-iterator lookup. (None, Some(parent_root)) => { - let parent = BlockId::from_root(parent_root).blinded_block(&chain)?; + let (parent, execution_optimistic) = + BlockId::from_root(parent_root).blinded_block(&chain)?; let (root, _slot) = chain .forwards_iter_block_roots(parent.slot()) .map_err(warp_utils::reject::beacon_chain_error)? @@ -841,13 +921,21 @@ pub fn serve( BlockId::from_root(root) .blinded_block(&chain) - .map(|block| (root, block))? + // Ignore this `execution_optimistic` since the first value has + // more information about the original request. + .map(|(block, _execution_optimistic)| { + (root, block, execution_optimistic) + })? } // Slot is supplied, search by slot and optionally filter by // parent root. (Some(slot), parent_root_opt) => { - let root = BlockId::from_slot(slot).root(&chain)?; - let block = BlockId::from_root(root).blinded_block(&chain)?; + let (root, execution_optimistic) = + BlockId::from_slot(slot).root(&chain)?; + // Ignore the second `execution_optimistic`, the first one is the + // most relevant since it knows that we queried by slot. + let (block, _execution_optimistic) = + BlockId::from_root(root).blinded_block(&chain)?; // If the parent root was supplied, check that it matches the block // obtained via a slot lookup. @@ -860,7 +948,7 @@ pub fn serve( } } - (root, block) + (root, block, execution_optimistic) } }; @@ -873,13 +961,14 @@ pub fn serve( }, }; - Ok(api_types::GenericResponse::from(vec![data])) + Ok(api_types::GenericResponse::from(vec![data]) + .add_execution_optimistic(execution_optimistic)) }) }, ); // GET beacon/headers/{block_id} - let get_beacon_headers_block_id = eth1_v1 + let get_beacon_headers_block_id = eth_v1 .and(warp::path("beacon")) .and(warp::path("headers")) .and(warp::path::param::().or_else(|_| async { @@ -891,8 +980,11 @@ pub fn serve( .and(chain_filter.clone()) .and_then(|block_id: BlockId, chain: Arc>| { blocking_json_task(move || { - let root = block_id.root(&chain)?; - let block = BlockId::from_root(root).blinded_block(&chain)?; + let (root, execution_optimistic) = block_id.root(&chain)?; + // Ignore the second `execution_optimistic` since the first one has more + // information about the original request. + let (block, _execution_optimistic) = + BlockId::from_root(root).blinded_block(&chain)?; let canonical = chain .block_root_at_slot(block.slot(), WhenSlotSkipped::None) @@ -908,7 +1000,10 @@ pub fn serve( }, }; - Ok(api_types::GenericResponse::from(data)) + Ok(api_types::ExecutionOptimisticResponse { + execution_optimistic: Some(execution_optimistic), + data, + }) }) }); @@ -917,7 +1012,7 @@ pub fn serve( */ // POST beacon/blocks - let post_beacon_blocks = eth1_v1 + let post_beacon_blocks = eth_v1 .and(warp::path("beacon")) .and(warp::path("blocks")) .and(warp::path::end()) @@ -1013,7 +1108,7 @@ pub fn serve( */ // POST beacon/blocks - let post_beacon_blinded_blocks = eth1_v1 + let post_beacon_blinded_blocks = eth_v1 .and(warp::path("beacon")) .and(warp::path("blinded_blocks")) .and(warp::path::end()) @@ -1115,7 +1210,7 @@ pub fn serve( )) }); - let beacon_blocks_path_v1 = eth1_v1 + let beacon_blocks_path_v1 = eth_v1 .and(warp::path("beacon")) .and(warp::path("blocks")) .and(block_id_or_err) @@ -1138,10 +1233,11 @@ pub fn serve( chain: Arc>, accept_header: Option| { async move { - let block = block_id.full_block(&chain).await?; + let (block, execution_optimistic) = block_id.full_block(&chain).await?; let fork_name = block .fork_name(&chain.spec) .map_err(inconsistent_fork_rejection)?; + match accept_header { Some(api_types::Accept::Ssz) => Response::builder() .status(200) @@ -1153,8 +1249,13 @@ pub fn serve( e )) }), - _ => fork_versioned_response(endpoint_version, fork_name, block) - .map(|res| warp::reply::json(&res).into_response()), + _ => execution_optimistic_fork_versioned_response( + endpoint_version, + fork_name, + execution_optimistic, + block, + ) + .map(|res| warp::reply::json(&res).into_response()), } .map(|resp| add_consensus_version_header(resp, fork_name)) } @@ -1168,10 +1269,12 @@ pub fn serve( .and(warp::path::end()) .and_then(|block_id: BlockId, chain: Arc>| { blocking_json_task(move || { - block_id - .root(&chain) - .map(api_types::RootData::from) - .map(api_types::GenericResponse::from) + let (block, execution_optimistic) = block_id.blinded_block(&chain)?; + + Ok(api_types::GenericResponse::from(api_types::RootData::from( + block.canonical_root(), + )) + .add_execution_optimistic(execution_optimistic)) }) }); @@ -1182,10 +1285,12 @@ pub fn serve( .and(warp::path::end()) .and_then(|block_id: BlockId, chain: Arc>| { blocking_json_task(move || { - block_id - .blinded_block(&chain) - .map(|block| block.message().body().attestations().clone()) - .map(api_types::GenericResponse::from) + let (block, execution_optimistic) = block_id.blinded_block(&chain)?; + + Ok( + api_types::GenericResponse::from(block.message().body().attestations().clone()) + .add_execution_optimistic(execution_optimistic), + ) }) }); @@ -1193,7 +1298,7 @@ pub fn serve( * beacon/pool */ - let beacon_pool_path = eth1_v1 + let beacon_pool_path = eth_v1 .and(warp::path("beacon")) .and(warp::path("pool")) .and(chain_filter.clone()); @@ -1519,7 +1624,7 @@ pub fn serve( * config */ - let config_path = eth1_v1.and(warp::path("config")); + let config_path = eth_v1.and(warp::path("config")); // GET config/fork_schedule let get_config_fork_schedule = config_path @@ -1593,7 +1698,10 @@ pub fn serve( chain: Arc>| { blocking_task(move || match accept_header { Some(api_types::Accept::Ssz) => { - let state = state_id.state(&chain)?; + // We can ignore the optimistic status for the "fork" since it's a + // specification constant that doesn't change across competing heads of the + // beacon chain. + let (state, _execution_optimistic) = state_id.state(&chain)?; let fork_name = state .fork_name(&chain.spec) .map_err(inconsistent_fork_rejection)?; @@ -1609,44 +1717,71 @@ pub fn serve( )) }) } - _ => state_id.map_state(&chain, |state| { - let fork_name = state - .fork_name(&chain.spec) - .map_err(inconsistent_fork_rejection)?; - let res = fork_versioned_response(endpoint_version, fork_name, &state)?; - Ok(add_consensus_version_header( - warp::reply::json(&res).into_response(), - fork_name, - )) - }), + _ => state_id.map_state_and_execution_optimistic( + &chain, + |state, execution_optimistic| { + let fork_name = state + .fork_name(&chain.spec) + .map_err(inconsistent_fork_rejection)?; + let res = execution_optimistic_fork_versioned_response( + endpoint_version, + fork_name, + execution_optimistic, + &state, + )?; + Ok(add_consensus_version_header( + warp::reply::json(&res).into_response(), + fork_name, + )) + }, + ), }) }, ); // GET debug/beacon/heads - let get_debug_beacon_heads = eth1_v1 + let get_debug_beacon_heads = any_version .and(warp::path("debug")) .and(warp::path("beacon")) .and(warp::path("heads")) .and(warp::path::end()) .and(chain_filter.clone()) - .and_then(|chain: Arc>| { - blocking_json_task(move || { - let heads = chain - .heads() - .into_iter() - .map(|(root, slot)| api_types::ChainHeadData { slot, root }) - .collect::>(); - Ok(api_types::GenericResponse::from(heads)) - }) - }); + .and_then( + |endpoint_version: EndpointVersion, chain: Arc>| { + blocking_json_task(move || { + let heads = chain + .heads() + .into_iter() + .map(|(root, slot)| { + let execution_optimistic = if endpoint_version == V1 { + None + } else if endpoint_version == V2 { + chain + .canonical_head + .fork_choice_read_lock() + .is_optimistic_block(&root) + .ok() + } else { + return Err(unsupported_version_rejection(endpoint_version)); + }; + Ok(api_types::ChainHeadData { + slot, + root, + execution_optimistic, + }) + }) + .collect::, warp::Rejection>>(); + Ok(api_types::GenericResponse::from(heads?)) + }) + }, + ); /* * node */ // GET node/identity - let get_node_identity = eth1_v1 + let get_node_identity = eth_v1 .and(warp::path("node")) .and(warp::path("identity")) .and(warp::path::end()) @@ -1684,7 +1819,7 @@ pub fn serve( }); // GET node/version - let get_node_version = eth1_v1 + let get_node_version = eth_v1 .and(warp::path("node")) .and(warp::path("version")) .and(warp::path::end()) @@ -1697,7 +1832,7 @@ pub fn serve( }); // GET node/syncing - let get_node_syncing = eth1_v1 + let get_node_syncing = eth_v1 .and(warp::path("node")) .and(warp::path("syncing")) .and(warp::path::end()) @@ -1726,7 +1861,7 @@ pub fn serve( ); // GET node/health - let get_node_health = eth1_v1 + let get_node_health = eth_v1 .and(warp::path("node")) .and(warp::path("health")) .and(warp::path::end()) @@ -1751,7 +1886,7 @@ pub fn serve( }); // GET node/peers/{peer_id} - let get_node_peers_by_id = eth1_v1 + let get_node_peers_by_id = eth_v1 .and(warp::path("node")) .and(warp::path("peers")) .and(warp::path::param::()) @@ -1808,7 +1943,7 @@ pub fn serve( ); // GET node/peers - let get_node_peers = eth1_v1 + let get_node_peers = eth_v1 .and(warp::path("node")) .and(warp::path("peers")) .and(warp::path::end()) @@ -1877,7 +2012,7 @@ pub fn serve( ); // GET node/peer_count - let get_node_peer_count = eth1_v1 + let get_node_peer_count = eth_v1 .and(warp::path("node")) .and(warp::path("peer_count")) .and(warp::path::end()) @@ -1918,7 +2053,7 @@ pub fn serve( */ // GET validator/duties/proposer/{epoch} - let get_validator_duties_proposer = eth1_v1 + let get_validator_duties_proposer = eth_v1 .and(warp::path("validator")) .and(warp::path("duties")) .and(warp::path("proposer")) @@ -2061,7 +2196,7 @@ pub fn serve( ); // GET validator/attestation_data?slot,committee_index - let get_validator_attestation_data = eth1_v1 + let get_validator_attestation_data = eth_v1 .and(warp::path("validator")) .and(warp::path("attestation_data")) .and(warp::path::end()) @@ -2093,7 +2228,7 @@ pub fn serve( ); // GET validator/aggregate_attestation?attestation_data_root,slot - let get_validator_aggregate_attestation = eth1_v1 + let get_validator_aggregate_attestation = eth_v1 .and(warp::path("validator")) .and(warp::path("aggregate_attestation")) .and(warp::path::end()) @@ -2125,7 +2260,7 @@ pub fn serve( ); // POST validator/duties/attester/{epoch} - let post_validator_duties_attester = eth1_v1 + let post_validator_duties_attester = eth_v1 .and(warp::path("validator")) .and(warp::path("duties")) .and(warp::path("attester")) @@ -2147,7 +2282,7 @@ pub fn serve( ); // POST validator/duties/sync - let post_validator_duties_sync = eth1_v1 + let post_validator_duties_sync = eth_v1 .and(warp::path("validator")) .and(warp::path("duties")) .and(warp::path("sync")) @@ -2169,7 +2304,7 @@ pub fn serve( ); // GET validator/sync_committee_contribution - let get_validator_sync_committee_contribution = eth1_v1 + let get_validator_sync_committee_contribution = eth_v1 .and(warp::path("validator")) .and(warp::path("sync_committee_contribution")) .and(warp::path::end()) @@ -2192,7 +2327,7 @@ pub fn serve( ); // POST validator/aggregate_and_proofs - let post_validator_aggregate_and_proofs = eth1_v1 + let post_validator_aggregate_and_proofs = eth_v1 .and(warp::path("validator")) .and(warp::path("aggregate_and_proofs")) .and(warp::path::end()) @@ -2292,7 +2427,7 @@ pub fn serve( }, ); - let post_validator_contribution_and_proofs = eth1_v1 + let post_validator_contribution_and_proofs = eth_v1 .and(warp::path("validator")) .and(warp::path("contribution_and_proofs")) .and(warp::path::end()) @@ -2319,7 +2454,7 @@ pub fn serve( ); // POST validator/beacon_committee_subscriptions - let post_validator_beacon_committee_subscriptions = eth1_v1 + let post_validator_beacon_committee_subscriptions = eth_v1 .and(warp::path("validator")) .and(warp::path("beacon_committee_subscriptions")) .and(warp::path::end()) @@ -2359,7 +2494,7 @@ pub fn serve( ); // POST validator/prepare_beacon_proposer - let post_validator_prepare_beacon_proposer = eth1_v1 + let post_validator_prepare_beacon_proposer = eth_v1 .and(warp::path("validator")) .and(warp::path("prepare_beacon_proposer")) .and(warp::path::end()) @@ -2407,7 +2542,7 @@ pub fn serve( ); // POST validator/register_validator - let post_validator_register_validator = eth1_v1 + let post_validator_register_validator = eth_v1 .and(warp::path("validator")) .and(warp::path("register_validator")) .and(warp::path::end()) @@ -2480,7 +2615,7 @@ pub fn serve( }, ); // POST validator/sync_committee_subscriptions - let post_validator_sync_committee_subscriptions = eth1_v1 + let post_validator_sync_committee_subscriptions = eth_v1 .and(warp::path("validator")) .and(warp::path("sync_committee_subscriptions")) .and(warp::path::end()) @@ -2760,7 +2895,8 @@ pub fn serve( .and(chain_filter.clone()) .and_then(|state_id: StateId, chain: Arc>| { blocking_task(move || { - let state = state_id.state(&chain)?; + // This debug endpoint provides no indication of optimistic status. + let (state, _execution_optimistic) = state_id.state(&chain)?; Response::builder() .status(200) .header("Content-Type", "application/ssz") @@ -2899,7 +3035,7 @@ pub fn serve( ))) }); - let get_events = eth1_v1 + let get_events = eth_v1 .and(warp::path("events")) .and(warp::path::end()) .and(multi_key_query::()) diff --git a/beacon_node/http_api/src/proposer_duties.rs b/beacon_node/http_api/src/proposer_duties.rs index bddae55549..13788a07b2 100644 --- a/beacon_node/http_api/src/proposer_duties.rs +++ b/beacon_node/http_api/src/proposer_duties.rs @@ -55,10 +55,16 @@ pub fn proposer_duties( .safe_add(1) .map_err(warp_utils::reject::arith_error)? { - let (proposers, dependent_root, _execution_status, _fork) = + let (proposers, dependent_root, execution_status, _fork) = compute_proposer_duties_from_head(request_epoch, chain) .map_err(warp_utils::reject::beacon_chain_error)?; - convert_to_api_response(chain, request_epoch, dependent_root, proposers) + convert_to_api_response( + chain, + request_epoch, + dependent_root, + execution_status.is_optimistic(), + proposers, + ) } else if request_epoch > current_epoch .safe_add(1) @@ -88,17 +94,18 @@ fn try_proposer_duties_from_cache( request_epoch: Epoch, chain: &BeaconChain, ) -> Result, warp::reject::Rejection> { - let (head_slot, head_block_root, head_decision_root) = { - let head = chain.canonical_head.cached_head(); - let head_block_root = head.head_block_root(); - let decision_root = head - .snapshot - .beacon_state - .proposer_shuffling_decision_root(head_block_root) - .map_err(warp_utils::reject::beacon_state_error)?; - (head.head_slot(), head_block_root, decision_root) - }; - let head_epoch = head_slot.epoch(T::EthSpec::slots_per_epoch()); + let head = chain.canonical_head.cached_head(); + let head_block = &head.snapshot.beacon_block; + let head_block_root = head.head_block_root(); + let head_decision_root = head + .snapshot + .beacon_state + .proposer_shuffling_decision_root(head_block_root) + .map_err(warp_utils::reject::beacon_state_error)?; + let head_epoch = head_block.slot().epoch(T::EthSpec::slots_per_epoch()); + let execution_optimistic = chain + .is_optimistic_head_block(head_block) + .map_err(warp_utils::reject::beacon_chain_error)?; let dependent_root = match head_epoch.cmp(&request_epoch) { // head_epoch == request_epoch @@ -120,7 +127,13 @@ fn try_proposer_duties_from_cache( .get_epoch::(dependent_root, request_epoch) .cloned() .map(|indices| { - convert_to_api_response(chain, request_epoch, dependent_root, indices.to_vec()) + convert_to_api_response( + chain, + request_epoch, + dependent_root, + execution_optimistic, + indices.to_vec(), + ) }) .transpose() } @@ -139,7 +152,7 @@ fn compute_and_cache_proposer_duties( current_epoch: Epoch, chain: &BeaconChain, ) -> Result { - let (indices, dependent_root, _execution_status, fork) = + let (indices, dependent_root, execution_status, fork) = compute_proposer_duties_from_head(current_epoch, chain) .map_err(warp_utils::reject::beacon_chain_error)?; @@ -151,7 +164,13 @@ fn compute_and_cache_proposer_duties( .map_err(BeaconChainError::from) .map_err(warp_utils::reject::beacon_chain_error)?; - convert_to_api_response(chain, current_epoch, dependent_root, indices) + convert_to_api_response( + chain, + current_epoch, + dependent_root, + execution_status.is_optimistic(), + indices, + ) } /// Compute some proposer duties by reading a `BeaconState` from disk, completely ignoring the @@ -162,31 +181,37 @@ fn compute_historic_proposer_duties( ) -> Result { // If the head is quite old then it might still be relevant for a historical request. // - // Use the `with_head` function to read & clone in a single call to avoid race conditions. - let state_opt = chain - .with_head(|head| { - if head.beacon_state.current_epoch() <= epoch { - Ok(Some(( - head.beacon_state_root(), - head.beacon_state - .clone_with(CloneConfig::committee_caches_only()), - ))) - } else { - Ok(None) - } - }) - .map_err(warp_utils::reject::beacon_chain_error)?; - - let state = if let Some((state_root, mut state)) = state_opt { - // If we've loaded the head state it might be from a previous epoch, ensure it's in a - // suitable epoch. - ensure_state_is_in_epoch(&mut state, state_root, epoch, &chain.spec) + // Avoid holding the `cached_head` longer than necessary. + let state_opt = { + let (cached_head, execution_status) = chain + .canonical_head + .head_and_execution_status() .map_err(warp_utils::reject::beacon_chain_error)?; - state - } else { - StateId::slot(epoch.start_slot(T::EthSpec::slots_per_epoch())).state(chain)? + let head = &cached_head.snapshot; + + if head.beacon_state.current_epoch() <= epoch { + Some(( + head.beacon_state_root(), + head.beacon_state + .clone_with(CloneConfig::committee_caches_only()), + execution_status.is_optimistic(), + )) + } else { + None + } }; + let (state, execution_optimistic) = + if let Some((state_root, mut state, execution_optimistic)) = state_opt { + // If we've loaded the head state it might be from a previous epoch, ensure it's in a + // suitable epoch. + ensure_state_is_in_epoch(&mut state, state_root, epoch, &chain.spec) + .map_err(warp_utils::reject::beacon_chain_error)?; + (state, execution_optimistic) + } else { + StateId::from_slot(epoch.start_slot(T::EthSpec::slots_per_epoch())).state(chain)? + }; + // Ensure the state lookup was correct. if state.current_epoch() != epoch { return Err(warp_utils::reject::custom_server_error(format!( @@ -208,7 +233,7 @@ fn compute_historic_proposer_duties( .map_err(BeaconChainError::from) .map_err(warp_utils::reject::beacon_chain_error)?; - convert_to_api_response(chain, epoch, dependent_root, indices) + convert_to_api_response(chain, epoch, dependent_root, execution_optimistic, indices) } /// Converts the internal representation of proposer duties into one that is compatible with the @@ -217,6 +242,7 @@ fn convert_to_api_response( chain: &BeaconChain, epoch: Epoch, dependent_root: Hash256, + execution_optimistic: bool, indices: Vec, ) -> Result { let index_to_pubkey_map = chain @@ -251,6 +277,7 @@ fn convert_to_api_response( } else { Ok(api_types::DutiesResponse { dependent_root, + execution_optimistic: Some(execution_optimistic), data: proposer_data, }) } diff --git a/beacon_node/http_api/src/state_id.rs b/beacon_node/http_api/src/state_id.rs index 8604c91899..af47c242d6 100644 --- a/beacon_node/http_api/src/state_id.rs +++ b/beacon_node/http_api/src/state_id.rs @@ -1,14 +1,17 @@ -use beacon_chain::{BeaconChain, BeaconChainTypes}; +use crate::ExecutionOptimistic; +use beacon_chain::{BeaconChain, BeaconChainError, BeaconChainTypes}; use eth2::types::StateId as CoreStateId; +use std::fmt; use std::str::FromStr; -use types::{BeaconState, EthSpec, Fork, Hash256, Slot}; +use types::{BeaconState, Checkpoint, EthSpec, Fork, Hash256, Slot}; /// Wraps `eth2::types::StateId` and provides common state-access functionality. E.g., reading /// states or parts of states from the database. -pub struct StateId(CoreStateId); +#[derive(Debug)] +pub struct StateId(pub CoreStateId); impl StateId { - pub fn slot(slot: Slot) -> Self { + pub fn from_slot(slot: Slot) -> Self { Self(CoreStateId::Slot(slot)) } @@ -16,54 +19,128 @@ impl StateId { pub fn root( &self, chain: &BeaconChain, - ) -> Result { - let slot = match &self.0 { - CoreStateId::Head => return Ok(chain.canonical_head.cached_head().head_state_root()), - CoreStateId::Genesis => return Ok(chain.genesis_state_root), - CoreStateId::Finalized => chain - .canonical_head - .cached_head() - .finalized_checkpoint() - .epoch - .start_slot(T::EthSpec::slots_per_epoch()), - CoreStateId::Justified => chain - .canonical_head - .cached_head() - .justified_checkpoint() - .epoch - .start_slot(T::EthSpec::slots_per_epoch()), - CoreStateId::Slot(slot) => *slot, - CoreStateId::Root(root) => return Ok(*root), + ) -> Result<(Hash256, ExecutionOptimistic), warp::Rejection> { + let (slot, execution_optimistic) = match &self.0 { + CoreStateId::Head => { + let (cached_head, execution_status) = chain + .canonical_head + .head_and_execution_status() + .map_err(warp_utils::reject::beacon_chain_error)?; + return Ok(( + cached_head.head_state_root(), + execution_status.is_optimistic(), + )); + } + CoreStateId::Genesis => return Ok((chain.genesis_state_root, false)), + CoreStateId::Finalized => { + let finalized_checkpoint = + chain.canonical_head.cached_head().finalized_checkpoint(); + checkpoint_slot_and_execution_optimistic(chain, finalized_checkpoint)? + } + CoreStateId::Justified => { + let justified_checkpoint = + chain.canonical_head.cached_head().justified_checkpoint(); + checkpoint_slot_and_execution_optimistic(chain, justified_checkpoint)? + } + CoreStateId::Slot(slot) => ( + *slot, + chain + .is_optimistic_head() + .map_err(warp_utils::reject::beacon_chain_error)?, + ), + CoreStateId::Root(root) => { + if let Some(hot_summary) = chain + .store + .load_hot_state_summary(root) + .map_err(BeaconChainError::DBError) + .map_err(warp_utils::reject::beacon_chain_error)? + { + let execution_optimistic = chain + .canonical_head + .fork_choice_read_lock() + .is_optimistic_block_no_fallback(&hot_summary.latest_block_root) + .map_err(BeaconChainError::ForkChoiceError) + .map_err(warp_utils::reject::beacon_chain_error)?; + return Ok((*root, execution_optimistic)); + } else if let Some(_cold_state_slot) = chain + .store + .load_cold_state_slot(root) + .map_err(BeaconChainError::DBError) + .map_err(warp_utils::reject::beacon_chain_error)? + { + let fork_choice = chain.canonical_head.fork_choice_read_lock(); + let finalized_root = fork_choice + .cached_fork_choice_view() + .finalized_checkpoint + .root; + let execution_optimistic = fork_choice + .is_optimistic_block_no_fallback(&finalized_root) + .map_err(BeaconChainError::ForkChoiceError) + .map_err(warp_utils::reject::beacon_chain_error)?; + return Ok((*root, execution_optimistic)); + } else { + return Err(warp_utils::reject::custom_not_found(format!( + "beacon state for state root {}", + root + ))); + } + } }; - chain + let root = chain .state_root_at_slot(slot) .map_err(warp_utils::reject::beacon_chain_error)? .ok_or_else(|| { warp_utils::reject::custom_not_found(format!("beacon state at slot {}", slot)) - }) + })?; + + Ok((root, execution_optimistic)) } /// Return the `fork` field of the state identified by `self`. + /// Also returns the `execution_optimistic` value of the state. + pub fn fork_and_execution_optimistic( + &self, + chain: &BeaconChain, + ) -> Result<(Fork, bool), warp::Rejection> { + self.map_state_and_execution_optimistic(chain, |state, execution_optimistic| { + Ok((state.fork(), execution_optimistic)) + }) + } + + /// Convenience function to compute `fork` when `execution_optimistic` isn't desired. pub fn fork( &self, chain: &BeaconChain, ) -> Result { - self.map_state(chain, |state| Ok(state.fork())) + self.fork_and_execution_optimistic(chain) + .map(|(fork, _)| fork) } /// Return the `BeaconState` identified by `self`. pub fn state( &self, chain: &BeaconChain, - ) -> Result, warp::Rejection> { - let (state_root, slot_opt) = match &self.0 { - CoreStateId::Head => return Ok(chain.head_beacon_state_cloned()), + ) -> Result<(BeaconState, ExecutionOptimistic), warp::Rejection> { + let ((state_root, execution_optimistic), slot_opt) = match &self.0 { + CoreStateId::Head => { + let (cached_head, execution_status) = chain + .canonical_head + .head_and_execution_status() + .map_err(warp_utils::reject::beacon_chain_error)?; + return Ok(( + cached_head + .snapshot + .beacon_state + .clone_with_only_committee_caches(), + execution_status.is_optimistic(), + )); + } CoreStateId::Slot(slot) => (self.root(chain)?, Some(*slot)), _ => (self.root(chain)?, None), }; - chain + let state = chain .get_state(&state_root, slot_opt) .map_err(warp_utils::reject::beacon_chain_error) .and_then(|opt| { @@ -73,13 +150,17 @@ impl StateId { state_root )) }) - }) + })?; + + Ok((state, execution_optimistic)) } + /* /// Map a function across the `BeaconState` identified by `self`. /// /// This function will avoid instantiating/copying a new state when `self` points to the head /// of the chain. + #[allow(dead_code)] pub fn map_state( &self, chain: &BeaconChain, @@ -95,6 +176,36 @@ impl StateId { _ => func(&self.state(chain)?), } } + */ + + /// Functions the same as `map_state` but additionally computes the value of + /// `execution_optimistic` of the state identified by `self`. + /// + /// This is to avoid re-instantiating `state` unnecessarily. + pub fn map_state_and_execution_optimistic( + &self, + chain: &BeaconChain, + func: F, + ) -> Result + where + F: Fn(&BeaconState, bool) -> Result, + { + let (state, execution_optimistic) = match &self.0 { + CoreStateId::Head => { + let (head, execution_status) = chain + .canonical_head + .head_and_execution_status() + .map_err(warp_utils::reject::beacon_chain_error)?; + return func( + &head.snapshot.beacon_state, + execution_status.is_optimistic(), + ); + } + _ => self.state(chain)?, + }; + + func(&state, execution_optimistic) + } } impl FromStr for StateId { @@ -104,3 +215,35 @@ impl FromStr for StateId { CoreStateId::from_str(s).map(Self) } } + +impl fmt::Display for StateId { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "{}", self.0) + } +} + +/// Returns the first slot of the checkpoint's `epoch` and the execution status of the checkpoint's +/// `root`. +pub fn checkpoint_slot_and_execution_optimistic( + chain: &BeaconChain, + checkpoint: Checkpoint, +) -> Result<(Slot, ExecutionOptimistic), warp::reject::Rejection> { + let slot = checkpoint.epoch.start_slot(T::EthSpec::slots_per_epoch()); + let fork_choice = chain.canonical_head.fork_choice_read_lock(); + let finalized_checkpoint = fork_choice.cached_fork_choice_view().finalized_checkpoint; + + // If the checkpoint is pre-finalization, just use the optimistic status of the finalized + // block. + let root = if checkpoint.epoch < finalized_checkpoint.epoch { + &finalized_checkpoint.root + } else { + &checkpoint.root + }; + + let execution_optimistic = fork_choice + .is_optimistic_block_no_fallback(root) + .map_err(BeaconChainError::ForkChoiceError) + .map_err(warp_utils::reject::beacon_chain_error)?; + + Ok((slot, execution_optimistic)) +} diff --git a/beacon_node/http_api/src/sync_committees.rs b/beacon_node/http_api/src/sync_committees.rs index 3ebc3c4ec8..54a3e075d3 100644 --- a/beacon_node/http_api/src/sync_committees.rs +++ b/beacon_node/http_api/src/sync_committees.rs @@ -22,7 +22,7 @@ use types::{ }; /// The struct that is returned to the requesting HTTP client. -type SyncDuties = api_types::GenericResponse>; +type SyncDuties = api_types::ExecutionOptimisticResponse>; /// Handles a request from the HTTP API for sync committee duties. pub fn sync_committee_duties( @@ -34,14 +34,20 @@ pub fn sync_committee_duties( altair_fork_epoch } else { // Empty response for networks with Altair disabled. - return Ok(convert_to_response(vec![])); + return Ok(convert_to_response(vec![], false)); }; + // Even when computing duties from state, any block roots pulled using the request epoch are + // still dependent on the head. So using `is_optimistic_head` is fine for both cases. + let execution_optimistic = chain + .is_optimistic_head() + .map_err(warp_utils::reject::beacon_chain_error)?; + // Try using the head's sync committees to satisfy the request. This should be sufficient for // the vast majority of requests. Rather than checking if we think the request will succeed in a // way prone to data races, we attempt the request immediately and check the error code. match chain.sync_committee_duties_from_head(request_epoch, request_indices) { - Ok(duties) => return Ok(convert_to_response(duties)), + Ok(duties) => return Ok(convert_to_response(duties, execution_optimistic)), Err(BeaconChainError::SyncDutiesError(BeaconStateError::SyncCommitteeNotKnown { .. })) @@ -60,7 +66,7 @@ pub fn sync_committee_duties( )), e => warp_utils::reject::beacon_chain_error(e), })?; - Ok(convert_to_response(duties)) + Ok(convert_to_response(duties, execution_optimistic)) } /// Slow path for duties: load a state and use it to compute the duties. @@ -117,8 +123,9 @@ fn duties_from_state_load( } } -fn convert_to_response(duties: Vec>) -> SyncDuties { +fn convert_to_response(duties: Vec>, execution_optimistic: bool) -> SyncDuties { api_types::GenericResponse::from(duties.into_iter().flatten().collect::>()) + .add_execution_optimistic(execution_optimistic) } /// Receive sync committee duties, storing them in the pools & broadcasting them. diff --git a/beacon_node/http_api/src/validator_inclusion.rs b/beacon_node/http_api/src/validator_inclusion.rs index 48dfc17ffa..917e85e649 100644 --- a/beacon_node/http_api/src/validator_inclusion.rs +++ b/beacon_node/http_api/src/validator_inclusion.rs @@ -16,7 +16,10 @@ fn end_of_epoch_state( chain: &BeaconChain, ) -> Result, warp::reject::Rejection> { let target_slot = epoch.end_slot(T::EthSpec::slots_per_epoch()); - StateId::slot(target_slot).state(chain) + // The execution status is not returned, any functions which rely upon this method might return + // optimistic information without explicitly declaring so. + let (state, _execution_status) = StateId::from_slot(target_slot).state(chain)?; + Ok(state) } /// Generate an `EpochProcessingSummary` for `state`. diff --git a/beacon_node/http_api/src/version.rs b/beacon_node/http_api/src/version.rs index 854ef0c858..87ba3a4663 100644 --- a/beacon_node/http_api/src/version.rs +++ b/beacon_node/http_api/src/version.rs @@ -1,4 +1,6 @@ -use crate::api_types::{EndpointVersion, ForkVersionedResponse}; +use crate::api_types::{ + EndpointVersion, ExecutionOptimisticForkVersionedResponse, ForkVersionedResponse, +}; use eth2::CONSENSUS_VERSION_HEADER; use serde::Serialize; use types::{ForkName, InconsistentFork}; @@ -25,6 +27,26 @@ pub fn fork_versioned_response( }) } +pub fn execution_optimistic_fork_versioned_response( + endpoint_version: EndpointVersion, + fork_name: ForkName, + execution_optimistic: bool, + data: T, +) -> Result, warp::reject::Rejection> { + let fork_name = if endpoint_version == V1 { + None + } else if endpoint_version == V2 { + Some(fork_name) + } else { + return Err(unsupported_version_rejection(endpoint_version)); + }; + Ok(ExecutionOptimisticForkVersionedResponse { + version: fork_name, + execution_optimistic: Some(execution_optimistic), + data, + }) +} + /// Add the `Eth-Consensus-Version` header to a response. pub fn add_consensus_version_header(reply: T, fork_name: ForkName) -> WithHeader { reply::with_header(reply, CONSENSUS_VERSION_HEADER, fork_name.to_string()) diff --git a/beacon_node/http_api/tests/tests.rs b/beacon_node/http_api/tests/tests.rs index b57a87dfca..37c267fd46 100644 --- a/beacon_node/http_api/tests/tests.rs +++ b/beacon_node/http_api/tests/tests.rs @@ -8,13 +8,15 @@ use environment::null_logger; use eth2::{ mixin::{RequestAccept, ResponseForkName, ResponseOptional}, reqwest::RequestBuilder, - types::*, + types::{BlockId as CoreBlockId, StateId as CoreStateId, *}, BeaconNodeHttpClient, Error, StatusCode, Timeouts, }; use futures::stream::{Stream, StreamExt}; use futures::FutureExt; +use http_api::{BlockId, StateId}; use lighthouse_network::{Enr, EnrExt, PeerId}; use network::NetworkMessage; +use proto_array::ExecutionStatus; use sensitive_url::SensitiveUrl; use slot_clock::SlotClock; use state_processing::per_slot_processing; @@ -25,8 +27,8 @@ use tokio::time::Duration; use tree_hash::TreeHash; use types::application_domain::ApplicationDomain; use types::{ - AggregateSignature, BeaconState, BitList, Domain, EthSpec, Hash256, Keypair, MainnetEthSpec, - RelativeEpoch, SelectionProof, SignedRoot, Slot, + AggregateSignature, BitList, Domain, EthSpec, ExecutionBlockHash, Hash256, Keypair, + MainnetEthSpec, RelativeEpoch, SelectionProof, SignedRoot, Slot, }; type E = MainnetEthSpec; @@ -74,6 +76,19 @@ impl ApiTester { Self::new_from_spec(spec).await } + pub async fn new_with_hard_forks(altair: bool, bellatrix: bool) -> Self { + let mut spec = E::default_spec(); + spec.shard_committee_period = 2; + // Set whether the chain has undergone each hard fork. + if altair { + spec.altair_fork_epoch = Some(Epoch::new(0)); + } + if bellatrix { + spec.bellatrix_fork_epoch = Some(Epoch::new(0)); + } + Self::new_from_spec(spec).await + } + pub async fn new_from_spec(spec: ChainSpec) -> Self { let harness = Arc::new( BeaconChainHarness::builder(MainnetEthSpec) @@ -325,99 +340,43 @@ impl ApiTester { fn interesting_state_ids(&self) -> Vec { let mut ids = vec![ - StateId::Head, - StateId::Genesis, - StateId::Finalized, - StateId::Justified, - StateId::Slot(Slot::new(0)), - StateId::Slot(Slot::new(32)), - StateId::Slot(Slot::from(SKIPPED_SLOTS[0])), - StateId::Slot(Slot::from(SKIPPED_SLOTS[1])), - StateId::Slot(Slot::from(SKIPPED_SLOTS[2])), - StateId::Slot(Slot::from(SKIPPED_SLOTS[3])), - StateId::Root(Hash256::zero()), + StateId(CoreStateId::Head), + StateId(CoreStateId::Genesis), + StateId(CoreStateId::Finalized), + StateId(CoreStateId::Justified), + StateId(CoreStateId::Slot(Slot::new(0))), + StateId(CoreStateId::Slot(Slot::new(32))), + StateId(CoreStateId::Slot(Slot::from(SKIPPED_SLOTS[0]))), + StateId(CoreStateId::Slot(Slot::from(SKIPPED_SLOTS[1]))), + StateId(CoreStateId::Slot(Slot::from(SKIPPED_SLOTS[2]))), + StateId(CoreStateId::Slot(Slot::from(SKIPPED_SLOTS[3]))), + StateId(CoreStateId::Root(Hash256::zero())), ]; - ids.push(StateId::Root( + ids.push(StateId(CoreStateId::Root( self.chain.canonical_head.cached_head().head_state_root(), - )); + ))); ids } fn interesting_block_ids(&self) -> Vec { let mut ids = vec![ - BlockId::Head, - BlockId::Genesis, - BlockId::Finalized, - BlockId::Justified, - BlockId::Slot(Slot::new(0)), - BlockId::Slot(Slot::new(32)), - BlockId::Slot(Slot::from(SKIPPED_SLOTS[0])), - BlockId::Slot(Slot::from(SKIPPED_SLOTS[1])), - BlockId::Slot(Slot::from(SKIPPED_SLOTS[2])), - BlockId::Slot(Slot::from(SKIPPED_SLOTS[3])), - BlockId::Root(Hash256::zero()), + BlockId(CoreBlockId::Head), + BlockId(CoreBlockId::Genesis), + BlockId(CoreBlockId::Finalized), + BlockId(CoreBlockId::Justified), + BlockId(CoreBlockId::Slot(Slot::new(0))), + BlockId(CoreBlockId::Slot(Slot::new(32))), + BlockId(CoreBlockId::Slot(Slot::from(SKIPPED_SLOTS[0]))), + BlockId(CoreBlockId::Slot(Slot::from(SKIPPED_SLOTS[1]))), + BlockId(CoreBlockId::Slot(Slot::from(SKIPPED_SLOTS[2]))), + BlockId(CoreBlockId::Slot(Slot::from(SKIPPED_SLOTS[3]))), + BlockId(CoreBlockId::Root(Hash256::zero())), ]; - ids.push(BlockId::Root( + ids.push(BlockId(CoreBlockId::Root( self.chain.canonical_head.cached_head().head_block_root(), - )); + ))); ids } - - fn get_state(&self, state_id: StateId) -> Option> { - match state_id { - StateId::Head => Some( - self.chain - .head_snapshot() - .beacon_state - .clone_with_only_committee_caches(), - ), - StateId::Genesis => self - .chain - .get_state(&self.chain.genesis_state_root, None) - .unwrap(), - StateId::Finalized => { - let finalized_slot = self - .chain - .canonical_head - .cached_head() - .finalized_checkpoint() - .epoch - .start_slot(E::slots_per_epoch()); - - let root = self - .chain - .state_root_at_slot(finalized_slot) - .unwrap() - .unwrap(); - - self.chain.get_state(&root, Some(finalized_slot)).unwrap() - } - StateId::Justified => { - let justified_slot = self - .chain - .canonical_head - .cached_head() - .justified_checkpoint() - .epoch - .start_slot(E::slots_per_epoch()); - - let root = self - .chain - .state_root_at_slot(justified_slot) - .unwrap() - .unwrap(); - - self.chain.get_state(&root, Some(justified_slot)).unwrap() - } - StateId::Slot(slot) => { - let root = self.chain.state_root_at_slot(slot).unwrap().unwrap(); - - self.chain.get_state(&root, Some(slot)).unwrap() - } - StateId::Root(root) => self.chain.get_state(&root, None).unwrap(), - } - } - pub async fn test_beacon_genesis(self) -> Self { let result = self.client.get_beacon_genesis().await.unwrap().data; @@ -437,39 +396,15 @@ impl ApiTester { for state_id in self.interesting_state_ids() { let result = self .client - .get_beacon_states_root(state_id) + .get_beacon_states_root(state_id.0) .await .unwrap() .map(|res| res.data.root); - let expected = match state_id { - StateId::Head => Some(self.chain.canonical_head.cached_head().head_state_root()), - StateId::Genesis => Some(self.chain.genesis_state_root), - StateId::Finalized => { - let finalized_slot = self - .chain - .canonical_head - .cached_head() - .finalized_checkpoint() - .epoch - .start_slot(E::slots_per_epoch()); - - self.chain.state_root_at_slot(finalized_slot).unwrap() - } - StateId::Justified => { - let justified_slot = self - .chain - .canonical_head - .cached_head() - .justified_checkpoint() - .epoch - .start_slot(E::slots_per_epoch()); - - self.chain.state_root_at_slot(justified_slot).unwrap() - } - StateId::Slot(slot) => self.chain.state_root_at_slot(slot).unwrap(), - StateId::Root(root) => Some(root), - }; + let expected = state_id + .root(&self.chain) + .ok() + .map(|(root, _execution_optimistic)| root); assert_eq!(result, expected, "{:?}", state_id); } @@ -481,12 +416,12 @@ impl ApiTester { for state_id in self.interesting_state_ids() { let result = self .client - .get_beacon_states_fork(state_id) + .get_beacon_states_fork(state_id.0) .await .unwrap() .map(|res| res.data); - let expected = self.get_state(state_id).map(|state| state.fork()); + let expected = state_id.fork(&self.chain).ok(); assert_eq!(result, expected, "{:?}", state_id); } @@ -498,18 +433,20 @@ impl ApiTester { for state_id in self.interesting_state_ids() { let result = self .client - .get_beacon_states_finality_checkpoints(state_id) + .get_beacon_states_finality_checkpoints(state_id.0) .await .unwrap() .map(|res| res.data); - let expected = self - .get_state(state_id) - .map(|state| FinalityCheckpointsData { - previous_justified: state.previous_justified_checkpoint(), - current_justified: state.current_justified_checkpoint(), - finalized: state.finalized_checkpoint(), - }); + let expected = + state_id + .state(&self.chain) + .ok() + .map(|(state, _execution_optimistic)| FinalityCheckpointsData { + previous_justified: state.previous_justified_checkpoint(), + current_justified: state.current_justified_checkpoint(), + finalized: state.finalized_checkpoint(), + }); assert_eq!(result, expected, "{:?}", state_id); } @@ -520,9 +457,9 @@ impl ApiTester { pub async fn test_beacon_states_validator_balances(self) -> Self { for state_id in self.interesting_state_ids() { for validator_indices in self.interesting_validator_indices() { - let state_opt = self.get_state(state_id); + let state_opt = state_id.state(&self.chain).ok(); let validators: Vec = match state_opt.as_ref() { - Some(state) => state.validators().clone().into(), + Some((state, _execution_optimistic)) => state.validators().clone().into(), None => vec![], }; let validator_index_ids = validator_indices @@ -545,7 +482,7 @@ impl ApiTester { let result_index_ids = self .client .get_beacon_states_validator_balances( - state_id, + state_id.0, Some(validator_index_ids.as_slice()), ) .await @@ -554,14 +491,14 @@ impl ApiTester { let result_pubkey_ids = self .client .get_beacon_states_validator_balances( - state_id, + state_id.0, Some(validator_pubkey_ids.as_slice()), ) .await .unwrap() .map(|res| res.data); - let expected = state_opt.map(|state| { + let expected = state_opt.map(|(state, _execution_optimistic)| { let mut validators = Vec::with_capacity(validator_indices.len()); for i in validator_indices { @@ -588,7 +525,10 @@ impl ApiTester { for state_id in self.interesting_state_ids() { for statuses in self.interesting_validator_statuses() { for validator_indices in self.interesting_validator_indices() { - let state_opt = self.get_state(state_id); + let state_opt = state_id + .state(&self.chain) + .ok() + .map(|(state, _execution_optimistic)| state); let validators: Vec = match state_opt.as_ref() { Some(state) => state.validators().clone().into(), None => vec![], @@ -613,7 +553,7 @@ impl ApiTester { let result_index_ids = self .client .get_beacon_states_validators( - state_id, + state_id.0, Some(validator_index_ids.as_slice()), None, ) @@ -624,7 +564,7 @@ impl ApiTester { let result_pubkey_ids = self .client .get_beacon_states_validators( - state_id, + state_id.0, Some(validator_pubkey_ids.as_slice()), None, ) @@ -675,7 +615,10 @@ impl ApiTester { pub async fn test_beacon_states_validator_id(self) -> Self { for state_id in self.interesting_state_ids() { - let state_opt = self.get_state(state_id); + let state_opt = state_id + .state(&self.chain) + .ok() + .map(|(state, _execution_optimistic)| state); let validators = match state_opt.as_ref() { Some(state) => state.validators().clone().into(), None => vec![], @@ -690,7 +633,7 @@ impl ApiTester { for validator_id in validator_ids { let result = self .client - .get_beacon_states_validator_id(state_id, validator_id) + .get_beacon_states_validator_id(state_id.0, validator_id) .await .unwrap() .map(|res| res.data); @@ -727,12 +670,15 @@ impl ApiTester { pub async fn test_beacon_states_committees(self) -> Self { for state_id in self.interesting_state_ids() { - let mut state_opt = self.get_state(state_id); + let mut state_opt = state_id + .state(&self.chain) + .ok() + .map(|(state, _execution_optimistic)| state); let epoch_opt = state_opt.as_ref().map(|state| state.current_epoch()); let results = self .client - .get_beacon_states_committees(state_id, None, None, epoch_opt) + .get_beacon_states_committees(state_id.0, None, None, epoch_opt) .await .unwrap() .map(|res| res.data); @@ -769,37 +715,6 @@ impl ApiTester { self } - fn get_block_root(&self, block_id: BlockId) -> Option { - match block_id { - BlockId::Head => Some(self.chain.canonical_head.cached_head().head_block_root()), - BlockId::Genesis => Some(self.chain.genesis_block_root), - BlockId::Finalized => Some( - self.chain - .canonical_head - .cached_head() - .finalized_checkpoint() - .root, - ), - BlockId::Justified => Some( - self.chain - .canonical_head - .cached_head() - .justified_checkpoint() - .root, - ), - BlockId::Slot(slot) => self - .chain - .block_root_at_slot(slot, WhenSlotSkipped::None) - .unwrap(), - BlockId::Root(root) => Some(root), - } - } - - async fn get_block(&self, block_id: BlockId) -> Option> { - let root = self.get_block_root(block_id)?; - self.chain.get_block(&root).await.unwrap() - } - pub async fn test_beacon_headers_all_slots(self) -> Self { for slot in 0..CHAIN_LENGTH { let slot = Slot::from(slot); @@ -877,14 +792,17 @@ impl ApiTester { for block_id in self.interesting_block_ids() { let result = self .client - .get_beacon_headers_block_id(block_id) + .get_beacon_headers_block_id(block_id.0) .await .unwrap() .map(|res| res.data); - let block_root_opt = self.get_block_root(block_id); + let block_root_opt = block_id + .root(&self.chain) + .ok() + .map(|(root, _execution_optimistic)| root); - if let BlockId::Slot(slot) = block_id { + if let CoreBlockId::Slot(slot) = block_id.0 { if block_root_opt.is_none() { assert!(SKIPPED_SLOTS.contains(&slot.as_u64())); } else { @@ -892,11 +810,11 @@ impl ApiTester { } } - let block_opt = if let Some(root) = block_root_opt { - self.chain.get_block(&root).await.unwrap() - } else { - None - }; + let block_opt = block_id + .full_block(&self.chain) + .await + .ok() + .map(|(block, _execution_optimistic)| block); if block_opt.is_none() && result.is_none() { continue; @@ -934,13 +852,16 @@ impl ApiTester { for block_id in self.interesting_block_ids() { let result = self .client - .get_beacon_blocks_root(block_id) + .get_beacon_blocks_root(block_id.0) .await .unwrap() .map(|res| res.data.root); - let expected = self.get_block_root(block_id); - if let BlockId::Slot(slot) = block_id { + let expected = block_id + .root(&self.chain) + .ok() + .map(|(root, _execution_optimistic)| root); + if let CoreBlockId::Slot(slot) = block_id.0 { if expected.is_none() { assert!(SKIPPED_SLOTS.contains(&slot.as_u64())); } else { @@ -982,9 +903,13 @@ impl ApiTester { pub async fn test_beacon_blocks(self) -> Self { for block_id in self.interesting_block_ids() { - let expected = self.get_block(block_id).await; + let expected = block_id + .full_block(&self.chain) + .await + .ok() + .map(|(block, _execution_optimistic)| block); - if let BlockId::Slot(slot) = block_id { + if let CoreBlockId::Slot(slot) = block_id.0 { if expected.is_none() { assert!(SKIPPED_SLOTS.contains(&slot.as_u64())); } else { @@ -993,10 +918,10 @@ impl ApiTester { } // Check the JSON endpoint. - let json_result = self.client.get_beacon_blocks(block_id).await.unwrap(); + let json_result = self.client.get_beacon_blocks(block_id.0).await.unwrap(); if let (Some(json), Some(expected)) = (&json_result, &expected) { - assert_eq!(json.data, *expected, "{:?}", block_id); + assert_eq!(&json.data, expected.as_ref(), "{:?}", block_id); assert_eq!( json.version, Some(expected.fork_name(&self.chain.spec).unwrap()) @@ -1009,23 +934,28 @@ impl ApiTester { // Check the SSZ endpoint. let ssz_result = self .client - .get_beacon_blocks_ssz(block_id, &self.chain.spec) + .get_beacon_blocks_ssz(block_id.0, &self.chain.spec) .await .unwrap(); - assert_eq!(ssz_result, expected, "{:?}", block_id); + assert_eq!( + ssz_result.as_ref(), + expected.as_ref().map(|b| b.as_ref()), + "{:?}", + block_id + ); // Check that the legacy v1 API still works but doesn't return a version field. - let v1_result = self.client.get_beacon_blocks_v1(block_id).await.unwrap(); + let v1_result = self.client.get_beacon_blocks_v1(block_id.0).await.unwrap(); if let (Some(v1_result), Some(expected)) = (&v1_result, &expected) { assert_eq!(v1_result.version, None); - assert_eq!(v1_result.data, *expected); + assert_eq!(&v1_result.data, expected.as_ref()); } else { assert_eq!(v1_result, None); assert_eq!(expected, None); } // Check that version headers are provided. - let url = self.client.get_beacon_blocks_path(block_id).unwrap(); + let url = self.client.get_beacon_blocks_path(block_id.0).unwrap(); let builders: Vec RequestBuilder> = vec![ |b| b, @@ -1060,17 +990,18 @@ impl ApiTester { for block_id in self.interesting_block_ids() { let result = self .client - .get_beacon_blocks_attestations(block_id) + .get_beacon_blocks_attestations(block_id.0) .await .unwrap() .map(|res| res.data); - let expected = self - .get_block(block_id) - .await - .map(|block| block.message().body().attestations().clone().into()); + let expected = block_id.full_block(&self.chain).await.ok().map( + |(block, _execution_optimistic)| { + block.message().body().attestations().clone().into() + }, + ); - if let BlockId::Slot(slot) = block_id { + if let CoreBlockId::Slot(slot) = block_id.0 { if expected.is_none() { assert!(SKIPPED_SLOTS.contains(&slot.as_u64())); } else { @@ -1473,9 +1404,16 @@ impl ApiTester { pub async fn test_get_debug_beacon_states(self) -> Self { for state_id in self.interesting_state_ids() { - let result_json = self.client.get_debug_beacon_states(state_id).await.unwrap(); + let result_json = self + .client + .get_debug_beacon_states(state_id.0) + .await + .unwrap(); - let mut expected = self.get_state(state_id); + let mut expected = state_id + .state(&self.chain) + .ok() + .map(|(state, _execution_optimistic)| state); expected.as_mut().map(|state| state.drop_all_caches()); if let (Some(json), Some(expected)) = (&result_json, &expected) { @@ -1492,7 +1430,7 @@ impl ApiTester { // Check SSZ API. let result_ssz = self .client - .get_debug_beacon_states_ssz(state_id, &self.chain.spec) + .get_debug_beacon_states_ssz(state_id.0, &self.chain.spec) .await .unwrap(); assert_eq!(result_ssz, expected, "{:?}", state_id); @@ -1500,7 +1438,7 @@ impl ApiTester { // Check legacy v1 API. let result_v1 = self .client - .get_debug_beacon_states_v1(state_id) + .get_debug_beacon_states_v1(state_id.0) .await .unwrap(); @@ -1513,7 +1451,10 @@ impl ApiTester { } // Check that version headers are provided. - let url = self.client.get_debug_beacon_states_path(state_id).unwrap(); + let url = self + .client + .get_debug_beacon_states_path(state_id.0) + .unwrap(); let builders: Vec RequestBuilder> = vec![|b| b, |b| b.accept(Accept::Ssz)]; @@ -1791,6 +1732,7 @@ impl ApiTester { let expected = DutiesResponse { data: expected_duties, + execution_optimistic: Some(false), dependent_root, }; @@ -2391,11 +2333,14 @@ impl ApiTester { for state_id in self.interesting_state_ids() { let result = self .client - .get_lighthouse_beacon_states_ssz(&state_id, &self.chain.spec) + .get_lighthouse_beacon_states_ssz(&state_id.0, &self.chain.spec) .await .unwrap(); - let mut expected = self.get_state(state_id); + let mut expected = state_id + .state(&self.chain) + .ok() + .map(|(state, _execution_optimistic)| state); expected.as_mut().map(|state| state.drop_all_caches()); assert_eq!(result, expected, "{:?}", state_id); @@ -2562,6 +2507,7 @@ impl ApiTester { let expected_block = EventKind::Block(SseBlock { block: block_root, slot: next_slot, + execution_optimistic: false, }); let expected_head = EventKind::Head(SseHead { @@ -2575,6 +2521,7 @@ impl ApiTester { .unwrap() .unwrap(), epoch_transition: true, + execution_optimistic: false, }); let finalized_block_root = self @@ -2593,6 +2540,7 @@ impl ApiTester { block: finalized_block_root, state: finalized_state_root, epoch: Epoch::new(3), + execution_optimistic: false, }); self.client @@ -2621,6 +2569,7 @@ impl ApiTester { new_head_block: self.reorg_block.canonical_root(), new_head_state: self.reorg_block.state_root(), epoch: self.next_block.slot().epoch(E::slots_per_epoch()), + execution_optimistic: false, }); self.client @@ -2687,6 +2636,7 @@ impl ApiTester { let expected_block = EventKind::Block(SseBlock { block: block_root, slot: next_slot, + execution_optimistic: false, }); let expected_head = EventKind::Head(SseHead { @@ -2696,6 +2646,7 @@ impl ApiTester { current_duty_dependent_root: self.chain.genesis_block_root, previous_duty_dependent_root: self.chain.genesis_block_root, epoch_transition: false, + execution_optimistic: false, }); self.client @@ -2708,6 +2659,40 @@ impl ApiTester { self } + + pub async fn test_check_optimistic_responses(&mut self) { + // Check responses are not optimistic. + let result = self + .client + .get_beacon_headers_block_id(CoreBlockId::Head) + .await + .unwrap() + .unwrap(); + + assert_eq!(result.execution_optimistic, Some(false)); + + // Change head to be optimistic. + self.chain + .canonical_head + .fork_choice_write_lock() + .proto_array_mut() + .core_proto_array_mut() + .nodes + .last_mut() + .map(|head_node| { + head_node.execution_status = ExecutionStatus::Optimistic(ExecutionBlockHash::zero()) + }); + + // Check responses are now optimistic. + let result = self + .client + .get_beacon_headers_block_id(CoreBlockId::Head) + .await + .unwrap() + .unwrap(); + + assert_eq!(result.execution_optimistic, Some(true)); + } } async fn poll_events, eth2::Error>> + Unpin, T: EthSpec>( @@ -3105,3 +3090,11 @@ async fn lighthouse_endpoints() { .test_post_lighthouse_liveness() .await; } + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn optimistic_responses() { + ApiTester::new_with_hard_forks(true, true) + .await + .test_check_optimistic_responses() + .await; +} diff --git a/beacon_node/store/src/hot_cold_store.rs b/beacon_node/store/src/hot_cold_store.rs index e66cee6fde..c4b4a64a05 100644 --- a/beacon_node/store/src/hot_cold_store.rs +++ b/beacon_node/store/src/hot_cold_store.rs @@ -1317,7 +1317,7 @@ impl, Cold: ItemStore> HotColdDB } /// Load a frozen state's slot, given its root. - fn load_cold_state_slot(&self, state_root: &Hash256) -> Result, Error> { + pub fn load_cold_state_slot(&self, state_root: &Hash256) -> Result, Error> { Ok(self .cold_db .get(state_root)? @@ -1583,7 +1583,7 @@ fn no_state_root_iter() -> Option Result>, Error> { + ) -> Result>, Error> { let mut path = self.eth_path(V1)?; path.path_segments_mut() @@ -351,7 +351,7 @@ impl BeaconNodeHttpClient { pub async fn get_beacon_states_fork( &self, state_id: StateId, - ) -> Result>, Error> { + ) -> Result>, Error> { let mut path = self.eth_path(V1)?; path.path_segments_mut() @@ -370,7 +370,7 @@ impl BeaconNodeHttpClient { pub async fn get_beacon_states_finality_checkpoints( &self, state_id: StateId, - ) -> Result>, Error> { + ) -> Result>, Error> { let mut path = self.eth_path(V1)?; path.path_segments_mut() @@ -390,7 +390,7 @@ impl BeaconNodeHttpClient { &self, state_id: StateId, ids: Option<&[ValidatorId]>, - ) -> Result>>, Error> { + ) -> Result>>, Error> { let mut path = self.eth_path(V1)?; path.path_segments_mut() @@ -420,7 +420,7 @@ impl BeaconNodeHttpClient { state_id: StateId, ids: Option<&[ValidatorId]>, statuses: Option<&[ValidatorStatus]>, - ) -> Result>>, Error> { + ) -> Result>>, Error> { let mut path = self.eth_path(V1)?; path.path_segments_mut() @@ -460,7 +460,7 @@ impl BeaconNodeHttpClient { slot: Option, index: Option, epoch: Option, - ) -> Result>>, Error> { + ) -> Result>>, Error> { let mut path = self.eth_path(V1)?; path.path_segments_mut() @@ -493,7 +493,7 @@ impl BeaconNodeHttpClient { &self, state_id: StateId, epoch: Option, - ) -> Result, Error> { + ) -> Result, Error> { let mut path = self.eth_path(V1)?; path.path_segments_mut() @@ -518,7 +518,7 @@ impl BeaconNodeHttpClient { &self, state_id: StateId, validator_id: &ValidatorId, - ) -> Result>, Error> { + ) -> Result>, Error> { let mut path = self.eth_path(V1)?; path.path_segments_mut() @@ -539,7 +539,7 @@ impl BeaconNodeHttpClient { &self, slot: Option, parent_root: Option, - ) -> Result>>, Error> { + ) -> Result>>, Error> { let mut path = self.eth_path(V1)?; path.path_segments_mut() @@ -566,7 +566,7 @@ impl BeaconNodeHttpClient { pub async fn get_beacon_headers_block_id( &self, block_id: BlockId, - ) -> Result>, Error> { + ) -> Result>, Error> { let mut path = self.eth_path(V1)?; path.path_segments_mut() @@ -635,7 +635,7 @@ impl BeaconNodeHttpClient { pub async fn get_beacon_blocks( &self, block_id: BlockId, - ) -> Result>>, Error> { + ) -> Result>>, Error> { let path = self.get_beacon_blocks_path(block_id)?; let response = match self.get_response(path, |b| b).await.optional()? { Some(res) => res, @@ -644,20 +644,31 @@ impl BeaconNodeHttpClient { // If present, use the fork provided in the headers to decode the block. Gracefully handle // missing and malformed fork names by falling back to regular deserialisation. - let (block, version) = match response.fork_name_from_header() { + let (block, version, execution_optimistic) = match response.fork_name_from_header() { Ok(Some(fork_name)) => { - map_fork_name_with!(fork_name, SignedBeaconBlock, { - let ForkVersionedResponse { version, data } = response.json().await?; - (data, version) - }) + let (data, (version, execution_optimistic)) = + map_fork_name_with!(fork_name, SignedBeaconBlock, { + let ExecutionOptimisticForkVersionedResponse { + version, + execution_optimistic, + data, + } = response.json().await?; + (data, (version, execution_optimistic)) + }); + (data, version, execution_optimistic) } Ok(None) | Err(_) => { - let ForkVersionedResponse { version, data } = response.json().await?; - (data, version) + let ExecutionOptimisticForkVersionedResponse { + version, + execution_optimistic, + data, + } = response.json().await?; + (data, version, execution_optimistic) } }; - Ok(Some(ForkVersionedResponse { + Ok(Some(ExecutionOptimisticForkVersionedResponse { version, + execution_optimistic, data: block, })) } @@ -702,7 +713,7 @@ impl BeaconNodeHttpClient { pub async fn get_beacon_blocks_root( &self, block_id: BlockId, - ) -> Result>, Error> { + ) -> Result>, Error> { let mut path = self.eth_path(V1)?; path.path_segments_mut() @@ -721,7 +732,7 @@ impl BeaconNodeHttpClient { pub async fn get_beacon_blocks_attestations( &self, block_id: BlockId, - ) -> Result>>>, Error> { + ) -> Result>>>, Error> { let mut path = self.eth_path(V1)?; path.path_segments_mut() @@ -1123,7 +1134,7 @@ impl BeaconNodeHttpClient { pub async fn get_debug_beacon_states( &self, state_id: StateId, - ) -> Result>>, Error> { + ) -> Result>>, Error> { let path = self.get_debug_beacon_states_path(state_id)?; self.get_opt(path).await } @@ -1132,7 +1143,7 @@ impl BeaconNodeHttpClient { pub async fn get_debug_beacon_states_v1( &self, state_id: StateId, - ) -> Result>>, Error> { + ) -> Result>>, Error> { let mut path = self.eth_path(V1)?; path.path_segments_mut() @@ -1160,9 +1171,24 @@ impl BeaconNodeHttpClient { .transpose() } - /// `GET debug/beacon/heads` + /// `GET v2/debug/beacon/heads` pub async fn get_debug_beacon_heads( &self, + ) -> Result>, Error> { + let mut path = self.eth_path(V2)?; + + path.path_segments_mut() + .map_err(|()| Error::InvalidUrl(self.server.clone()))? + .push("debug") + .push("beacon") + .push("heads"); + + self.get(path).await + } + + /// `GET v1/debug/beacon/heads` (LEGACY) + pub async fn get_debug_beacon_heads_v1( + &self, ) -> Result>, Error> { let mut path = self.eth_path(V1)?; @@ -1494,7 +1520,7 @@ impl BeaconNodeHttpClient { &self, epoch: Epoch, indices: &[u64], - ) -> Result>, Error> { + ) -> Result>, Error> { let mut path = self.eth_path(V1)?; path.path_segments_mut() diff --git a/common/eth2/src/types.rs b/common/eth2/src/types.rs index 8ef3582268..c78e2c6919 100644 --- a/common/eth2/src/types.rs +++ b/common/eth2/src/types.rs @@ -189,6 +189,14 @@ impl fmt::Display for StateId { #[serde(bound = "T: Serialize + serde::de::DeserializeOwned")] pub struct DutiesResponse { pub dependent_root: Hash256, + pub execution_optimistic: Option, + pub data: T, +} + +#[derive(Debug, PartialEq, Clone, Serialize, Deserialize)] +#[serde(bound = "T: Serialize + serde::de::DeserializeOwned")] +pub struct ExecutionOptimisticResponse { + pub execution_optimistic: Option, pub data: T, } @@ -204,6 +212,18 @@ impl From for GenericResponse } } +impl GenericResponse { + pub fn add_execution_optimistic( + self, + execution_optimistic: bool, + ) -> ExecutionOptimisticResponse { + ExecutionOptimisticResponse { + execution_optimistic: Some(execution_optimistic), + data: self.data, + } + } +} + #[derive(Debug, PartialEq, Clone, Serialize)] #[serde(bound = "T: Serialize")] pub struct GenericResponseRef<'a, T: Serialize> { @@ -216,6 +236,14 @@ impl<'a, T: Serialize> From<&'a T> for GenericResponseRef<'a, T> { } } +#[derive(Debug, PartialEq, Clone, Serialize, Deserialize)] +pub struct ExecutionOptimisticForkVersionedResponse { + #[serde(skip_serializing_if = "Option::is_none")] + pub version: Option, + pub execution_optimistic: Option, + pub data: T, +} + #[derive(Debug, PartialEq, Clone, Serialize, Deserialize)] pub struct ForkVersionedResponse { #[serde(skip_serializing_if = "Option::is_none")] @@ -495,6 +523,8 @@ pub struct DepositContractData { pub struct ChainHeadData { pub slot: Slot, pub root: Hash256, + #[serde(skip_serializing_if = "Option::is_none")] + pub execution_optimistic: Option, } #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] @@ -794,6 +824,7 @@ pub struct PeerCount { pub struct SseBlock { pub slot: Slot, pub block: Hash256, + pub execution_optimistic: bool, } #[derive(PartialEq, Debug, Serialize, Deserialize, Clone)] @@ -801,6 +832,7 @@ pub struct SseFinalizedCheckpoint { pub block: Hash256, pub state: Hash256, pub epoch: Epoch, + pub execution_optimistic: bool, } #[derive(PartialEq, Debug, Serialize, Deserialize, Clone)] @@ -811,6 +843,7 @@ pub struct SseHead { pub current_duty_dependent_root: Hash256, pub previous_duty_dependent_root: Hash256, pub epoch_transition: bool, + pub execution_optimistic: bool, } #[derive(PartialEq, Debug, Serialize, Deserialize, Clone)] @@ -823,6 +856,7 @@ pub struct SseChainReorg { pub new_head_block: Hash256, pub new_head_state: Hash256, pub epoch: Epoch, + pub execution_optimistic: bool, } #[derive(PartialEq, Debug, Serialize, Deserialize, Clone)] @@ -837,6 +871,7 @@ pub struct SseLateHead { pub observed_delay: Option, pub imported_delay: Option, pub set_as_head_delay: Option, + pub execution_optimistic: bool, } #[derive(PartialEq, Debug, Serialize, Clone)] diff --git a/consensus/fork_choice/src/fork_choice.rs b/consensus/fork_choice/src/fork_choice.rs index d06d52235f..984eeaada5 100644 --- a/consensus/fork_choice/src/fork_choice.rs +++ b/consensus/fork_choice/src/fork_choice.rs @@ -1175,6 +1175,12 @@ where &self.proto_array } + /// Returns a mutable reference to `proto_array`. + /// Should only be used in testing. + pub fn proto_array_mut(&mut self) -> &mut ProtoArrayForkChoice { + &mut self.proto_array + } + /// Returns a reference to the underlying `fc_store`. pub fn fc_store(&self) -> &T { &self.fc_store