diff --git a/Cargo.lock b/Cargo.lock index 4046537ef6..bc235ad2f3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1074,8 +1074,10 @@ dependencies = [ "lazy_static", "lighthouse_metrics", "lighthouse_network", + "logging", "monitoring_api", "network", + "operation_pool", "parking_lot 0.12.1", "sensitive_url", "serde", @@ -1085,6 +1087,7 @@ dependencies = [ "slasher_service", "slog", "slot_clock", + "state_processing", "store", "task_executor", "time 0.3.17", @@ -3238,6 +3241,7 @@ dependencies = [ "logging", "lru 0.7.8", "network", + "operation_pool", "parking_lot 0.12.1", "proto_array", "safe_arith", @@ -5042,6 +5046,7 @@ dependencies = [ "lru_cache", "matches", "num_cpus", + "operation_pool", "rand 0.8.5", "rlp", "slog", @@ -5377,6 +5382,7 @@ dependencies = [ "lighthouse_metrics", "maplit", "parking_lot 0.12.1", + "rand 0.8.5", "rayon", "serde", "serde_derive", diff --git a/Makefile b/Makefile index e7628733cc..458ceefc70 100644 --- a/Makefile +++ b/Makefile @@ -167,7 +167,8 @@ lint: -A clippy::from-over-into \ -A clippy::upper-case-acronyms \ -A clippy::vec-init-then-push \ - -A clippy::question-mark + -A clippy::question-mark \ + -A clippy::uninlined-format-args nightly-lint: cp .github/custom/clippy.toml . diff --git a/beacon_node/beacon_chain/src/attestation_verification.rs b/beacon_node/beacon_chain/src/attestation_verification.rs index b60ce7efe5..04f601fad9 100644 --- a/beacon_node/beacon_chain/src/attestation_verification.rs +++ b/beacon_node/beacon_chain/src/attestation_verification.rs @@ -27,6 +27,11 @@ //! ▼ //! impl VerifiedAttestation //! ``` + +// Ignore this lint for `AttestationSlashInfo` which is of comparable size to the non-error types it +// is returned alongside. +#![allow(clippy::result_large_err)] + mod batch; use crate::{ diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index beabd898de..221d380a86 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -73,7 +73,7 @@ use fork_choice::{ use futures::channel::mpsc::Sender; use itertools::process_results; use itertools::Itertools; -use operation_pool::{AttestationRef, OperationPool, PersistedOperationPool}; +use operation_pool::{AttestationRef, OperationPool, PersistedOperationPool, ReceivedPreCapella}; use parking_lot::{Mutex, RwLock}; use proto_array::{CountUnrealizedFull, DoNotReOrg, ProposerHeadError}; use safe_arith::SafeArith; @@ -1020,7 +1020,9 @@ impl BeaconChain { .ok_or(Error::ExecutionLayerMissing)? .get_payload_by_block_hash(exec_block_hash, fork) .await - .map_err(|e| Error::ExecutionLayerErrorPayloadReconstruction(exec_block_hash, e))? + .map_err(|e| { + Error::ExecutionLayerErrorPayloadReconstruction(exec_block_hash, Box::new(e)) + })? .ok_or(Error::BlockHashMissingFromExecutionLayer(exec_block_hash))?; //FIXME(sean) avoid the clone by comparing refs to headers (`as_execution_payload_header` method ?) @@ -1040,8 +1042,6 @@ impl BeaconChain { return Err(Error::InconsistentPayloadReconstructed { slot: blinded_block.slot(), exec_block_hash, - canonical_payload_root: execution_payload_header.tree_hash_root(), - reconstructed_payload_root: header_from_payload.tree_hash_root(), canonical_transactions_root: execution_payload_header.transactions_root(), reconstructed_transactions_root: header_from_payload.transactions_root(), }); @@ -2375,10 +2375,11 @@ impl BeaconChain { pub fn import_bls_to_execution_change( &self, bls_to_execution_change: SigVerifiedOp, + received_pre_capella: ReceivedPreCapella, ) -> bool { if self.eth1_chain.is_some() { self.op_pool - .insert_bls_to_execution_change(bls_to_execution_change) + .insert_bls_to_execution_change(bls_to_execution_change, received_pre_capella) } else { false } diff --git a/beacon_node/beacon_chain/src/block_verification.rs b/beacon_node/beacon_chain/src/block_verification.rs index 4cb53a639b..fb1d79e965 100644 --- a/beacon_node/beacon_chain/src/block_verification.rs +++ b/beacon_node/beacon_chain/src/block_verification.rs @@ -46,6 +46,11 @@ //! END //! //! ``` + +// Ignore this lint for `BlockSlashInfo` which is of comparable size to the non-error types it is +// returned alongside. +#![allow(clippy::result_large_err)] + use crate::blob_verification::{ validate_blob_for_gossip, AsBlock, AvailableBlock, BlobError, BlockWrapper, IntoAvailableBlock, IntoBlockWrapper, diff --git a/beacon_node/beacon_chain/src/capella_readiness.rs b/beacon_node/beacon_chain/src/capella_readiness.rs new file mode 100644 index 0000000000..b156321058 --- /dev/null +++ b/beacon_node/beacon_chain/src/capella_readiness.rs @@ -0,0 +1,135 @@ +//! Provides tools for checking if a node is ready for the Capella upgrade and following merge +//! transition. + +use crate::{BeaconChain, BeaconChainTypes}; +use execution_layer::http::{ + ENGINE_FORKCHOICE_UPDATED_V2, ENGINE_GET_PAYLOAD_V2, ENGINE_NEW_PAYLOAD_V2, +}; +use serde::{Deserialize, Serialize}; +use std::fmt; +use std::time::Duration; +use types::*; + +/// The time before the Capella fork when we will start issuing warnings about preparation. +use super::merge_readiness::SECONDS_IN_A_WEEK; +pub const CAPELLA_READINESS_PREPARATION_SECONDS: u64 = SECONDS_IN_A_WEEK * 2; +pub const ENGINE_CAPABILITIES_REFRESH_INTERVAL: u64 = 300; + +#[derive(Debug, Serialize, Deserialize)] +#[serde(rename_all = "snake_case")] +#[serde(tag = "type")] +pub enum CapellaReadiness { + /// The execution engine is capella-enabled (as far as we can tell) + Ready, + /// The EL can be reached and has the correct configuration, however it's not yet synced. + NotSynced, + /// We are connected to an execution engine which doesn't support the V2 engine api methods + V2MethodsNotSupported { error: String }, + /// The transition configuration with the EL failed, there might be a problem with + /// connectivity, authentication or a difference in configuration. + ExchangeCapabilitiesFailed { error: String }, + /// The user has not configured an execution endpoint + NoExecutionEndpoint, +} + +impl fmt::Display for CapellaReadiness { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + CapellaReadiness::Ready => { + write!(f, "This node appears ready for Capella.") + } + CapellaReadiness::ExchangeCapabilitiesFailed { error } => write!( + f, + "Could not exchange capabilities with the \ + execution endpoint: {}", + error + ), + CapellaReadiness::NotSynced => write!( + f, + "The execution endpoint is connected and configured, \ + however it is not yet synced" + ), + CapellaReadiness::NoExecutionEndpoint => write!( + f, + "The --execution-endpoint flag is not specified, this is a \ + requirement post-merge" + ), + CapellaReadiness::V2MethodsNotSupported { error } => write!( + f, + "The execution endpoint does not appear to support \ + the required engine api methods for Capella: {}", + error + ), + } + } +} + +impl BeaconChain { + /// Returns `true` if capella epoch is set and Capella fork has occurred or will + /// occur within `CAPELLA_READINESS_PREPARATION_SECONDS` + pub fn is_time_to_prepare_for_capella(&self, current_slot: Slot) -> bool { + if let Some(capella_epoch) = self.spec.capella_fork_epoch { + let capella_slot = capella_epoch.start_slot(T::EthSpec::slots_per_epoch()); + let capella_readiness_preparation_slots = + CAPELLA_READINESS_PREPARATION_SECONDS / self.spec.seconds_per_slot; + // Return `true` if Capella has happened or is within the preparation time. + current_slot + capella_readiness_preparation_slots > capella_slot + } else { + // The Capella fork epoch has not been defined yet, no need to prepare. + false + } + } + + /// Attempts to connect to the EL and confirm that it is ready for capella. + pub async fn check_capella_readiness(&self) -> CapellaReadiness { + if let Some(el) = self.execution_layer.as_ref() { + match el + .get_engine_capabilities(Some(Duration::from_secs( + ENGINE_CAPABILITIES_REFRESH_INTERVAL, + ))) + .await + { + Err(e) => { + // The EL was either unreachable or responded with an error + CapellaReadiness::ExchangeCapabilitiesFailed { + error: format!("{:?}", e), + } + } + Ok(capabilities) => { + let mut missing_methods = String::from("Required Methods Unsupported:"); + let mut all_good = true; + if !capabilities.get_payload_v2 { + missing_methods.push(' '); + missing_methods.push_str(ENGINE_GET_PAYLOAD_V2); + all_good = false; + } + if !capabilities.forkchoice_updated_v2 { + missing_methods.push(' '); + missing_methods.push_str(ENGINE_FORKCHOICE_UPDATED_V2); + all_good = false; + } + if !capabilities.new_payload_v2 { + missing_methods.push(' '); + missing_methods.push_str(ENGINE_NEW_PAYLOAD_V2); + all_good = false; + } + + if all_good { + if !el.is_synced_for_notifier().await { + // The EL is not synced. + CapellaReadiness::NotSynced + } else { + CapellaReadiness::Ready + } + } else { + CapellaReadiness::V2MethodsNotSupported { + error: missing_methods, + } + } + } + } + } else { + CapellaReadiness::NoExecutionEndpoint + } + } +} diff --git a/beacon_node/beacon_chain/src/errors.rs b/beacon_node/beacon_chain/src/errors.rs index 1f727c3e17..28e6bf7a6d 100644 --- a/beacon_node/beacon_chain/src/errors.rs +++ b/beacon_node/beacon_chain/src/errors.rs @@ -143,13 +143,11 @@ pub enum BeaconChainError { BuilderMissing, ExecutionLayerMissing, BlockVariantLacksExecutionPayload(Hash256), - ExecutionLayerErrorPayloadReconstruction(ExecutionBlockHash, execution_layer::Error), + ExecutionLayerErrorPayloadReconstruction(ExecutionBlockHash, Box), BlockHashMissingFromExecutionLayer(ExecutionBlockHash), InconsistentPayloadReconstructed { slot: Slot, exec_block_hash: ExecutionBlockHash, - canonical_payload_root: Hash256, - reconstructed_payload_root: Hash256, canonical_transactions_root: Hash256, reconstructed_transactions_root: Hash256, }, diff --git a/beacon_node/beacon_chain/src/fork_choice_signal.rs b/beacon_node/beacon_chain/src/fork_choice_signal.rs index fd92de661d..f5424d417e 100644 --- a/beacon_node/beacon_chain/src/fork_choice_signal.rs +++ b/beacon_node/beacon_chain/src/fork_choice_signal.rs @@ -43,7 +43,7 @@ impl ForkChoiceSignalTx { /// /// Return an error if the provided `slot` is strictly less than any previously provided slot. pub fn notify_fork_choice_complete(&self, slot: Slot) -> Result<(), BeaconChainError> { - let &(ref lock, ref condvar) = &*self.pair; + let (lock, condvar) = &*self.pair; let mut current_slot = lock.lock(); @@ -72,7 +72,7 @@ impl Default for ForkChoiceSignalTx { impl ForkChoiceSignalRx { pub fn wait_for_fork_choice(&self, slot: Slot, timeout: Duration) -> ForkChoiceWaitResult { - let &(ref lock, ref condvar) = &*self.pair; + let (lock, condvar) = &*self.pair; let mut current_slot = lock.lock(); diff --git a/beacon_node/beacon_chain/src/lib.rs b/beacon_node/beacon_chain/src/lib.rs index e09cbdf310..86a5fded5c 100644 --- a/beacon_node/beacon_chain/src/lib.rs +++ b/beacon_node/beacon_chain/src/lib.rs @@ -12,6 +12,7 @@ mod block_times_cache; mod block_verification; pub mod builder; pub mod canonical_head; +pub mod capella_readiness; pub mod chain_config; mod early_attester_cache; mod errors; diff --git a/beacon_node/beacon_chain/src/merge_readiness.rs b/beacon_node/beacon_chain/src/merge_readiness.rs index 4ef2102fd5..c66df39eed 100644 --- a/beacon_node/beacon_chain/src/merge_readiness.rs +++ b/beacon_node/beacon_chain/src/merge_readiness.rs @@ -8,7 +8,7 @@ use std::fmt::Write; use types::*; /// The time before the Bellatrix fork when we will start issuing warnings about preparation. -const SECONDS_IN_A_WEEK: u64 = 604800; +pub const SECONDS_IN_A_WEEK: u64 = 604800; pub const MERGE_READINESS_PREPARATION_SECONDS: u64 = SECONDS_IN_A_WEEK * 2; #[derive(Default, Debug, Serialize, Deserialize)] diff --git a/beacon_node/beacon_chain/src/schema_change.rs b/beacon_node/beacon_chain/src/schema_change.rs index 8684bafe2d..35202a3c5d 100644 --- a/beacon_node/beacon_chain/src/schema_change.rs +++ b/beacon_node/beacon_chain/src/schema_change.rs @@ -2,6 +2,7 @@ mod migration_schema_v12; mod migration_schema_v13; mod migration_schema_v14; +mod migration_schema_v15; use crate::beacon_chain::{BeaconChainTypes, ETH1_CACHE_DB_KEY}; use crate::eth1_chain::SszEth1; @@ -123,6 +124,14 @@ pub fn migrate_schema( let ops = migration_schema_v14::downgrade_from_v14::(db.clone(), log)?; db.store_schema_version_atomically(to, ops) } + (SchemaVersion(14), SchemaVersion(15)) => { + let ops = migration_schema_v15::upgrade_to_v15::(db.clone(), log)?; + db.store_schema_version_atomically(to, ops) + } + (SchemaVersion(15), SchemaVersion(14)) => { + let ops = migration_schema_v15::downgrade_from_v15::(db.clone(), log)?; + db.store_schema_version_atomically(to, ops) + } // Anything else is an error. (_, _) => Err(HotColdDBError::UnsupportedSchemaVersion { target_version: to, diff --git a/beacon_node/beacon_chain/src/schema_change/migration_schema_v15.rs b/beacon_node/beacon_chain/src/schema_change/migration_schema_v15.rs new file mode 100644 index 0000000000..f4adc2cf4d --- /dev/null +++ b/beacon_node/beacon_chain/src/schema_change/migration_schema_v15.rs @@ -0,0 +1,78 @@ +use crate::beacon_chain::{BeaconChainTypes, OP_POOL_DB_KEY}; +use operation_pool::{ + PersistedOperationPool, PersistedOperationPoolV14, PersistedOperationPoolV15, +}; +use slog::{debug, info, Logger}; +use std::sync::Arc; +use store::{Error, HotColdDB, KeyValueStoreOp, StoreItem}; + +pub fn upgrade_to_v15( + db: Arc>, + log: Logger, +) -> Result, Error> { + // Load a V14 op pool and transform it to V15. + let PersistedOperationPoolV14:: { + attestations, + sync_contributions, + attester_slashings, + proposer_slashings, + voluntary_exits, + bls_to_execution_changes, + } = if let Some(op_pool_v14) = db.get_item(&OP_POOL_DB_KEY)? { + op_pool_v14 + } else { + debug!(log, "Nothing to do, no operation pool stored"); + return Ok(vec![]); + }; + + let v15 = PersistedOperationPool::V15(PersistedOperationPoolV15 { + attestations, + sync_contributions, + attester_slashings, + proposer_slashings, + voluntary_exits, + bls_to_execution_changes, + // Initialize with empty set + capella_bls_change_broadcast_indices: <_>::default(), + }); + Ok(vec![v15.as_kv_store_op(OP_POOL_DB_KEY)]) +} + +pub fn downgrade_from_v15( + db: Arc>, + log: Logger, +) -> Result, Error> { + // Load a V15 op pool and transform it to V14. + let PersistedOperationPoolV15 { + attestations, + sync_contributions, + attester_slashings, + proposer_slashings, + voluntary_exits, + bls_to_execution_changes, + capella_bls_change_broadcast_indices, + } = if let Some(PersistedOperationPool::::V15(op_pool)) = + db.get_item(&OP_POOL_DB_KEY)? + { + op_pool + } else { + debug!(log, "Nothing to do, no operation pool stored"); + return Ok(vec![]); + }; + + info!( + log, + "Forgetting address changes for Capella broadcast"; + "count" => capella_bls_change_broadcast_indices.len(), + ); + + let v14 = PersistedOperationPoolV14 { + attestations, + sync_contributions, + attester_slashings, + proposer_slashings, + voluntary_exits, + bls_to_execution_changes, + }; + Ok(vec![v14.as_kv_store_op(OP_POOL_DB_KEY)]) +} diff --git a/beacon_node/beacon_chain/src/test_utils.rs b/beacon_node/beacon_chain/src/test_utils.rs index 37cf488413..93f42a4c56 100644 --- a/beacon_node/beacon_chain/src/test_utils.rs +++ b/beacon_node/beacon_chain/src/test_utils.rs @@ -316,6 +316,26 @@ where self } + /// Initializes the BLS withdrawal keypairs for `num_keypairs` validators to + /// the "determistic" values, regardless of wether or not the validator has + /// a BLS or execution address in the genesis deposits. + /// + /// This aligns with the withdrawal commitments used in the "interop" + /// genesis states. + pub fn deterministic_withdrawal_keypairs(self, num_keypairs: usize) -> Self { + self.withdrawal_keypairs( + types::test_utils::generate_deterministic_keypairs(num_keypairs) + .into_iter() + .map(Option::Some) + .collect(), + ) + } + + pub fn withdrawal_keypairs(mut self, withdrawal_keypairs: Vec>) -> Self { + self.withdrawal_keypairs = withdrawal_keypairs; + self + } + pub fn default_spec(self) -> Self { self.spec_or_default(None) } @@ -376,7 +396,6 @@ where .collect::>() .unwrap(); - let spec = MainnetEthSpec::default_spec(); let config = execution_layer::Config { execution_endpoints: urls, secret_files: vec![], @@ -387,7 +406,6 @@ where config, self.runtime.task_executor.clone(), self.log.clone(), - &spec, ) .unwrap(); @@ -429,6 +447,7 @@ where DEFAULT_TERMINAL_BLOCK, shanghai_time, eip4844_time, + None, Some(JwtKey::from_slice(&DEFAULT_JWT_SECRET).unwrap()), spec, None, @@ -438,7 +457,11 @@ where self } - pub fn mock_execution_layer_with_builder(mut self, beacon_url: SensitiveUrl) -> Self { + pub fn mock_execution_layer_with_builder( + mut self, + beacon_url: SensitiveUrl, + builder_threshold: Option, + ) -> Self { // Get a random unused port let port = unused_port::unused_tcp_port().unwrap(); let builder_url = SensitiveUrl::parse(format!("http://127.0.0.1:{port}").as_str()).unwrap(); @@ -455,6 +478,7 @@ where DEFAULT_TERMINAL_BLOCK, shanghai_time, eip4844_time, + builder_threshold, Some(JwtKey::from_slice(&DEFAULT_JWT_SECRET).unwrap()), spec.clone(), Some(builder_url.clone()), diff --git a/beacon_node/client/Cargo.toml b/beacon_node/client/Cargo.toml index d01f2505cc..9a49843a9f 100644 --- a/beacon_node/client/Cargo.toml +++ b/beacon_node/client/Cargo.toml @@ -6,6 +6,10 @@ edition = "2021" [dev-dependencies] serde_yaml = "0.8.13" +logging = { path = "../../common/logging" } +state_processing = { path = "../../consensus/state_processing" } +operation_pool = { path = "../operation_pool" } +tokio = "1.14.0" [dependencies] beacon_chain = { path = "../beacon_chain" } diff --git a/beacon_node/client/src/address_change_broadcast.rs b/beacon_node/client/src/address_change_broadcast.rs new file mode 100644 index 0000000000..272ee908fb --- /dev/null +++ b/beacon_node/client/src/address_change_broadcast.rs @@ -0,0 +1,322 @@ +use crate::*; +use lighthouse_network::PubsubMessage; +use network::NetworkMessage; +use slog::{debug, info, warn, Logger}; +use slot_clock::SlotClock; +use std::cmp; +use std::collections::HashSet; +use std::mem; +use std::time::Duration; +use tokio::sync::mpsc::UnboundedSender; +use tokio::time::sleep; +use types::EthSpec; + +/// The size of each chunk of addresses changes to be broadcast at the Capella +/// fork. +const BROADCAST_CHUNK_SIZE: usize = 128; +/// The delay between broadcasting each chunk. +const BROADCAST_CHUNK_DELAY: Duration = Duration::from_millis(500); + +/// If the Capella fork has already been reached, `broadcast_address_changes` is +/// called immediately. +/// +/// If the Capella fork has not been reached, waits until the start of the fork +/// epoch and then calls `broadcast_address_changes`. +pub async fn broadcast_address_changes_at_capella( + chain: &BeaconChain, + network_send: UnboundedSender>, + log: &Logger, +) { + let spec = &chain.spec; + let slot_clock = &chain.slot_clock; + + let capella_fork_slot = if let Some(epoch) = spec.capella_fork_epoch { + epoch.start_slot(T::EthSpec::slots_per_epoch()) + } else { + // Exit now if Capella is not defined. + return; + }; + + // Wait until the Capella fork epoch. + while chain.slot().map_or(true, |slot| slot < capella_fork_slot) { + match slot_clock.duration_to_slot(capella_fork_slot) { + Some(duration) => { + // Sleep until the Capella fork. + sleep(duration).await; + break; + } + None => { + // We were unable to read the slot clock wait another slot + // and then try again. + sleep(slot_clock.slot_duration()).await; + } + } + } + + // The following function will be called in two scenarios: + // + // 1. The node has been running for some time and the Capella fork has just + // been reached. + // 2. The node has just started and it is *after* the Capella fork. + broadcast_address_changes(chain, network_send, log).await +} + +/// Broadcasts any address changes that are flagged for broadcasting at the +/// Capella fork epoch. +/// +/// Address changes are published in chunks, with a delay between each chunk. +/// This helps reduce the load on the P2P network and also helps prevent us from +/// clogging our `network_send` channel and being late to publish +/// blocks, attestations, etc. +pub async fn broadcast_address_changes( + chain: &BeaconChain, + network_send: UnboundedSender>, + log: &Logger, +) { + let head = chain.head_snapshot(); + let mut changes = chain + .op_pool + .get_bls_to_execution_changes_received_pre_capella(&head.beacon_state, &chain.spec); + + while !changes.is_empty() { + // This `split_off` approach is to allow us to have owned chunks of the + // `changes` vec. The `std::slice::Chunks` method uses references and + // the `itertools` iterator that achives this isn't `Send` so it doesn't + // work well with the `sleep` at the end of the loop. + let tail = changes.split_off(cmp::min(BROADCAST_CHUNK_SIZE, changes.len())); + let chunk = mem::replace(&mut changes, tail); + + let mut published_indices = HashSet::with_capacity(BROADCAST_CHUNK_SIZE); + let mut num_ok = 0; + let mut num_err = 0; + + // Publish each individual address change. + for address_change in chunk { + let validator_index = address_change.message.validator_index; + + let pubsub_message = PubsubMessage::BlsToExecutionChange(Box::new(address_change)); + let message = NetworkMessage::Publish { + messages: vec![pubsub_message], + }; + // It seems highly unlikely that this unbounded send will fail, but + // we handle the result nontheless. + if let Err(e) = network_send.send(message) { + debug!( + log, + "Failed to publish change message"; + "error" => ?e, + "validator_index" => validator_index + ); + num_err += 1; + } else { + debug!( + log, + "Published address change message"; + "validator_index" => validator_index + ); + num_ok += 1; + published_indices.insert(validator_index); + } + } + + // Remove any published indices from the list of indices that need to be + // published. + chain + .op_pool + .register_indices_broadcasted_at_capella(&published_indices); + + info!( + log, + "Published address change messages"; + "num_published" => num_ok, + ); + + if num_err > 0 { + warn!( + log, + "Failed to publish address changes"; + "info" => "failed messages will be retried", + "num_unable_to_publish" => num_err, + ); + } + + sleep(BROADCAST_CHUNK_DELAY).await; + } + + debug!( + log, + "Address change routine complete"; + ); +} + +#[cfg(not(debug_assertions))] // Tests run too slow in debug. +#[cfg(test)] +mod tests { + use super::*; + use beacon_chain::test_utils::{BeaconChainHarness, EphemeralHarnessType}; + use operation_pool::ReceivedPreCapella; + use state_processing::{SigVerifiedOp, VerifyOperation}; + use std::collections::HashSet; + use tokio::sync::mpsc; + use types::*; + + type E = MainnetEthSpec; + + pub const VALIDATOR_COUNT: usize = BROADCAST_CHUNK_SIZE * 3; + pub const EXECUTION_ADDRESS: Address = Address::repeat_byte(42); + + struct Tester { + harness: BeaconChainHarness>, + /// Changes which should be broadcast at the Capella fork. + received_pre_capella_changes: Vec>, + /// Changes which should *not* be broadcast at the Capella fork. + not_received_pre_capella_changes: Vec>, + } + + impl Tester { + fn new() -> Self { + let altair_fork_epoch = Epoch::new(0); + let bellatrix_fork_epoch = Epoch::new(0); + let capella_fork_epoch = Epoch::new(2); + + let mut spec = E::default_spec(); + spec.altair_fork_epoch = Some(altair_fork_epoch); + spec.bellatrix_fork_epoch = Some(bellatrix_fork_epoch); + spec.capella_fork_epoch = Some(capella_fork_epoch); + + let harness = BeaconChainHarness::builder(E::default()) + .spec(spec) + .logger(logging::test_logger()) + .deterministic_keypairs(VALIDATOR_COUNT) + .deterministic_withdrawal_keypairs(VALIDATOR_COUNT) + .fresh_ephemeral_store() + .mock_execution_layer() + .build(); + + Self { + harness, + received_pre_capella_changes: <_>::default(), + not_received_pre_capella_changes: <_>::default(), + } + } + + fn produce_verified_address_change( + &self, + validator_index: u64, + ) -> SigVerifiedOp { + let change = self + .harness + .make_bls_to_execution_change(validator_index, EXECUTION_ADDRESS); + let head = self.harness.chain.head_snapshot(); + + change + .validate(&head.beacon_state, &self.harness.spec) + .unwrap() + } + + fn produce_received_pre_capella_changes(mut self, indices: Vec) -> Self { + for validator_index in indices { + self.received_pre_capella_changes + .push(self.produce_verified_address_change(validator_index)); + } + self + } + + fn produce_not_received_pre_capella_changes(mut self, indices: Vec) -> Self { + for validator_index in indices { + self.not_received_pre_capella_changes + .push(self.produce_verified_address_change(validator_index)); + } + self + } + + async fn run(self) { + let harness = self.harness; + let chain = harness.chain.clone(); + + let mut broadcast_indices = HashSet::new(); + for change in self.received_pre_capella_changes { + broadcast_indices.insert(change.as_inner().message.validator_index); + chain + .op_pool + .insert_bls_to_execution_change(change, ReceivedPreCapella::Yes); + } + + let mut non_broadcast_indices = HashSet::new(); + for change in self.not_received_pre_capella_changes { + non_broadcast_indices.insert(change.as_inner().message.validator_index); + chain + .op_pool + .insert_bls_to_execution_change(change, ReceivedPreCapella::No); + } + + harness.set_current_slot( + chain + .spec + .capella_fork_epoch + .unwrap() + .start_slot(E::slots_per_epoch()), + ); + + let (sender, mut receiver) = mpsc::unbounded_channel(); + + broadcast_address_changes_at_capella(&chain, sender, &logging::test_logger()).await; + + let mut broadcasted_changes = vec![]; + while let Some(NetworkMessage::Publish { mut messages }) = receiver.recv().await { + match messages.pop().unwrap() { + PubsubMessage::BlsToExecutionChange(change) => broadcasted_changes.push(change), + _ => panic!("unexpected message"), + } + } + + assert_eq!( + broadcasted_changes.len(), + broadcast_indices.len(), + "all expected changes should have been broadcast" + ); + + for broadcasted in &broadcasted_changes { + assert!( + !non_broadcast_indices.contains(&broadcasted.message.validator_index), + "messages not flagged for broadcast should not have been broadcast" + ); + } + + let head = chain.head_snapshot(); + assert!( + chain + .op_pool + .get_bls_to_execution_changes_received_pre_capella( + &head.beacon_state, + &chain.spec, + ) + .is_empty(), + "there shouldn't be any capella broadcast changes left in the op pool" + ); + } + } + + // Useful for generating even-numbered indices. Required since only even + // numbered genesis validators have BLS credentials. + fn even_indices(start: u64, count: usize) -> Vec { + (start..).filter(|i| i % 2 == 0).take(count).collect() + } + + #[tokio::test] + async fn one_chunk() { + Tester::new() + .produce_received_pre_capella_changes(even_indices(0, 4)) + .produce_not_received_pre_capella_changes(even_indices(10, 4)) + .run() + .await; + } + + #[tokio::test] + async fn multiple_chunks() { + Tester::new() + .produce_received_pre_capella_changes(even_indices(0, BROADCAST_CHUNK_SIZE * 3 / 2)) + .run() + .await; + } +} diff --git a/beacon_node/client/src/builder.rs b/beacon_node/client/src/builder.rs index 298155e025..23c4977b7c 100644 --- a/beacon_node/client/src/builder.rs +++ b/beacon_node/client/src/builder.rs @@ -1,3 +1,4 @@ +use crate::address_change_broadcast::broadcast_address_changes_at_capella; use crate::config::{ClientGenesis, Config as ClientConfig}; use crate::notifier::spawn_notifier; use crate::Client; @@ -154,7 +155,6 @@ where config, context.executor.clone(), context.log().clone(), - &spec, ) .map_err(|e| format!("unable to start execution layer endpoints: {:?}", e))?; Some(execution_layer) @@ -809,6 +809,25 @@ where // Spawns a routine that polls the `exchange_transition_configuration` endpoint. execution_layer.spawn_transition_configuration_poll(beacon_chain.spec.clone()); } + + // Spawn a service to publish BLS to execution changes at the Capella fork. + if let Some(network_senders) = self.network_senders { + let inner_chain = beacon_chain.clone(); + let broadcast_context = + runtime_context.service_context("addr_bcast".to_string()); + let log = broadcast_context.log().clone(); + broadcast_context.executor.spawn( + async move { + broadcast_address_changes_at_capella( + &inner_chain, + network_senders.network_send(), + &log, + ) + .await + }, + "addr_broadcast", + ); + } } start_proposer_prep_service(runtime_context.executor.clone(), beacon_chain.clone()); diff --git a/beacon_node/client/src/lib.rs b/beacon_node/client/src/lib.rs index 24df874086..b0184dc0ff 100644 --- a/beacon_node/client/src/lib.rs +++ b/beacon_node/client/src/lib.rs @@ -1,5 +1,6 @@ extern crate slog; +mod address_change_broadcast; pub mod config; mod metrics; mod notifier; diff --git a/beacon_node/client/src/notifier.rs b/beacon_node/client/src/notifier.rs index 1da7a79707..c1d830bc08 100644 --- a/beacon_node/client/src/notifier.rs +++ b/beacon_node/client/src/notifier.rs @@ -1,5 +1,6 @@ use crate::metrics; use beacon_chain::{ + capella_readiness::CapellaReadiness, merge_readiness::{MergeConfig, MergeReadiness}, BeaconChain, BeaconChainTypes, ExecutionStatus, }; @@ -313,6 +314,7 @@ pub fn spawn_notifier( eth1_logging(&beacon_chain, &log); merge_readiness_logging(current_slot, &beacon_chain, &log).await; + capella_readiness_logging(current_slot, &beacon_chain, &log).await; } }; @@ -350,12 +352,15 @@ async fn merge_readiness_logging( } if merge_completed && !has_execution_layer { - error!( - log, - "Execution endpoint required"; - "info" => "you need an execution engine to validate blocks, see: \ - https://lighthouse-book.sigmaprime.io/merge-migration.html" - ); + if !beacon_chain.is_time_to_prepare_for_capella(current_slot) { + // logging of the EE being offline is handled in `capella_readiness_logging()` + error!( + log, + "Execution endpoint required"; + "info" => "you need an execution engine to validate blocks, see: \ + https://lighthouse-book.sigmaprime.io/merge-migration.html" + ); + } return; } @@ -419,6 +424,60 @@ async fn merge_readiness_logging( } } +/// Provides some helpful logging to users to indicate if their node is ready for Capella +async fn capella_readiness_logging( + current_slot: Slot, + beacon_chain: &BeaconChain, + log: &Logger, +) { + let capella_completed = beacon_chain + .canonical_head + .cached_head() + .snapshot + .beacon_block + .message() + .body() + .execution_payload() + .map_or(false, |payload| payload.withdrawals_root().is_ok()); + + let has_execution_layer = beacon_chain.execution_layer.is_some(); + + if capella_completed && has_execution_layer + || !beacon_chain.is_time_to_prepare_for_capella(current_slot) + { + return; + } + + if capella_completed && !has_execution_layer { + error!( + log, + "Execution endpoint required"; + "info" => "you need a Capella enabled execution engine to validate blocks, see: \ + https://lighthouse-book.sigmaprime.io/merge-migration.html" + ); + return; + } + + match beacon_chain.check_capella_readiness().await { + CapellaReadiness::Ready => { + info!(log, "Ready for Capella") + } + readiness @ CapellaReadiness::ExchangeCapabilitiesFailed { error: _ } => { + error!( + log, + "Not ready for Capella"; + "info" => %readiness, + "hint" => "try updating Lighthouse and/or the execution layer", + ) + } + readiness => warn!( + log, + "Not ready for Capella"; + "info" => %readiness, + ), + } +} + fn eth1_logging(beacon_chain: &BeaconChain, log: &Logger) { let current_slot_opt = beacon_chain.slot().ok(); diff --git a/beacon_node/eth1/src/inner.rs b/beacon_node/eth1/src/inner.rs index a44b31050f..0468a02d2e 100644 --- a/beacon_node/eth1/src/inner.rs +++ b/beacon_node/eth1/src/inner.rs @@ -122,7 +122,7 @@ impl SszEth1Cache { cache: self.deposit_cache.to_deposit_cache()?, last_processed_block: self.last_processed_block, }), - endpoint: endpoint_from_config(&config, &spec) + endpoint: endpoint_from_config(&config) .map_err(|e| format!("Failed to create endpoint: {:?}", e))?, to_finalize: RwLock::new(None), // Set the remote head_block zero when creating a new instance. We only care about diff --git a/beacon_node/eth1/src/service.rs b/beacon_node/eth1/src/service.rs index 56c2411ba1..31082394ba 100644 --- a/beacon_node/eth1/src/service.rs +++ b/beacon_node/eth1/src/service.rs @@ -363,7 +363,7 @@ impl Default for Config { } } -pub fn endpoint_from_config(config: &Config, spec: &ChainSpec) -> Result { +pub fn endpoint_from_config(config: &Config) -> Result { match config.endpoint.clone() { Eth1Endpoint::Auth { endpoint, @@ -373,16 +373,11 @@ pub fn endpoint_from_config(config: &Config, spec: &ChainSpec) -> Result { let auth = Auth::new_with_path(jwt_path, jwt_id, jwt_version) .map_err(|e| format!("Failed to initialize jwt auth: {:?}", e))?; - HttpJsonRpc::new_with_auth( - endpoint, - auth, - Some(config.execution_timeout_multiplier), - spec, - ) - .map_err(|e| format!("Failed to create eth1 json rpc client: {:?}", e)) + HttpJsonRpc::new_with_auth(endpoint, auth, Some(config.execution_timeout_multiplier)) + .map_err(|e| format!("Failed to create eth1 json rpc client: {:?}", e)) } Eth1Endpoint::NoAuth(endpoint) => { - HttpJsonRpc::new(endpoint, Some(config.execution_timeout_multiplier), spec) + HttpJsonRpc::new(endpoint, Some(config.execution_timeout_multiplier)) .map_err(|e| format!("Failed to create eth1 json rpc client: {:?}", e)) } } @@ -409,7 +404,7 @@ impl Service { deposit_cache: RwLock::new(DepositUpdater::new( config.deposit_contract_deploy_block, )), - endpoint: endpoint_from_config(&config, &spec)?, + endpoint: endpoint_from_config(&config)?, to_finalize: RwLock::new(None), remote_head_block: RwLock::new(None), config: RwLock::new(config), @@ -438,7 +433,7 @@ impl Service { inner: Arc::new(Inner { block_cache: <_>::default(), deposit_cache: RwLock::new(deposit_cache), - endpoint: endpoint_from_config(&config, &spec) + endpoint: endpoint_from_config(&config) .map_err(Error::FailedToInitializeFromSnapshot)?, to_finalize: RwLock::new(None), remote_head_block: RwLock::new(None), diff --git a/beacon_node/eth1/tests/test.rs b/beacon_node/eth1/tests/test.rs index eb0d2371cb..cd680478cc 100644 --- a/beacon_node/eth1/tests/test.rs +++ b/beacon_node/eth1/tests/test.rs @@ -494,8 +494,7 @@ mod deposit_tree { let mut deposit_counts = vec![]; let client = - HttpJsonRpc::new(SensitiveUrl::parse(ð1.endpoint()).unwrap(), None, spec) - .unwrap(); + HttpJsonRpc::new(SensitiveUrl::parse(ð1.endpoint()).unwrap(), None).unwrap(); // Perform deposits to the smart contract, recording it's state along the way. for deposit in &deposits { @@ -599,12 +598,8 @@ mod http { .expect("should start eth1 environment"); let deposit_contract = ð1.deposit_contract; let web3 = eth1.web3(); - let client = HttpJsonRpc::new( - SensitiveUrl::parse(ð1.endpoint()).unwrap(), - None, - &MainnetEthSpec::default_spec(), - ) - .unwrap(); + let client = + HttpJsonRpc::new(SensitiveUrl::parse(ð1.endpoint()).unwrap(), None).unwrap(); let block_number = get_block_number(&web3).await; let logs = blocking_deposit_logs(&client, ð1, 0..block_number).await; @@ -720,8 +715,7 @@ mod fast { ) .unwrap(); let client = - HttpJsonRpc::new(SensitiveUrl::parse(ð1.endpoint()).unwrap(), None, &spec) - .unwrap(); + HttpJsonRpc::new(SensitiveUrl::parse(ð1.endpoint()).unwrap(), None).unwrap(); let n = 10; let deposits: Vec<_> = (0..n).map(|_| random_deposit_data()).collect(); for deposit in &deposits { diff --git a/beacon_node/execution_layer/src/engine_api.rs b/beacon_node/execution_layer/src/engine_api.rs index bb1f0248c4..bbd0e4fd3a 100644 --- a/beacon_node/execution_layer/src/engine_api.rs +++ b/beacon_node/execution_layer/src/engine_api.rs @@ -1,4 +1,9 @@ use crate::engines::ForkchoiceState; +use crate::http::{ + ENGINE_EXCHANGE_TRANSITION_CONFIGURATION_V1, ENGINE_FORKCHOICE_UPDATED_V1, + ENGINE_FORKCHOICE_UPDATED_V2, ENGINE_GET_PAYLOAD_V1, ENGINE_GET_PAYLOAD_V2, + ENGINE_NEW_PAYLOAD_V1, ENGINE_NEW_PAYLOAD_V2, +}; use crate::BlobTxConversionError; pub use ethers_core::types::Transaction; use ethers_core::utils::rlp::{self, Decodable, Rlp}; @@ -10,8 +15,8 @@ use std::convert::TryFrom; use strum::IntoStaticStr; use superstruct::superstruct; pub use types::{ - Address, EthSpec, ExecutionBlockHash, ExecutionPayload, ExecutionPayloadHeader, FixedVector, - ForkName, Hash256, Uint256, VariableList, Withdrawal, + Address, EthSpec, ExecutionBlockHash, ExecutionPayload, ExecutionPayloadHeader, + ExecutionPayloadRef, FixedVector, ForkName, Hash256, Uint256, VariableList, Withdrawal, }; use types::{ExecutionPayloadCapella, ExecutionPayloadEip4844, ExecutionPayloadMerge}; @@ -330,6 +335,8 @@ pub struct ProposeBlindedBlockResponse { #[superstruct( variants(Merge, Capella, Eip4844), variant_attributes(derive(Clone, Debug, PartialEq),), + map_into(ExecutionPayload), + map_ref_into(ExecutionPayloadRef), cast_error(ty = "Error", expr = "Error::IncorrectStateVariant"), partial_getter_error(ty = "Error", expr = "Error::IncorrectStateVariant") )] @@ -344,27 +351,49 @@ pub struct GetPayloadResponse { pub block_value: Uint256, } -impl GetPayloadResponse { - pub fn execution_payload(self) -> ExecutionPayload { - match self { - GetPayloadResponse::Merge(response) => { - ExecutionPayload::Merge(response.execution_payload) - } - GetPayloadResponse::Capella(response) => { - ExecutionPayload::Capella(response.execution_payload) - } - GetPayloadResponse::Eip4844(response) => { - ExecutionPayload::Eip4844(response.execution_payload) - } +impl<'a, T: EthSpec> From> for ExecutionPayloadRef<'a, T> { + fn from(response: GetPayloadResponseRef<'a, T>) -> Self { + map_get_payload_response_ref_into_execution_payload_ref!(&'a _, response, |inner, cons| { + cons(&inner.execution_payload) + }) + } +} + +impl From> for ExecutionPayload { + fn from(response: GetPayloadResponse) -> Self { + map_get_payload_response_into_execution_payload!(response, |inner, cons| { + cons(inner.execution_payload) + }) + } +} + +impl From> for (ExecutionPayload, Uint256) { + fn from(response: GetPayloadResponse) -> Self { + match response { + GetPayloadResponse::Merge(inner) => ( + ExecutionPayload::Merge(inner.execution_payload), + inner.block_value, + ), + GetPayloadResponse::Capella(inner) => ( + ExecutionPayload::Capella(inner.execution_payload), + inner.block_value, + ), + GetPayloadResponse::Eip4844(inner) => ( + ExecutionPayload::Eip4844(inner.execution_payload), + inner.block_value, + ), } } } -// This name is work in progress, it could -// change when this method is actually proposed -// but I'm writing this as it has been described +impl GetPayloadResponse { + pub fn execution_payload_ref(&self) -> ExecutionPayloadRef { + self.to_ref().into() + } +} + #[derive(Clone, Copy, Debug)] -pub struct SupportedApis { +pub struct EngineCapabilities { pub new_payload_v1: bool, pub new_payload_v2: bool, pub new_payload_v3: bool, @@ -375,3 +404,32 @@ pub struct SupportedApis { pub get_payload_v3: bool, pub exchange_transition_configuration_v1: bool, } + +impl EngineCapabilities { + pub fn to_response(&self) -> Vec<&str> { + let mut response = Vec::new(); + if self.new_payload_v1 { + response.push(ENGINE_NEW_PAYLOAD_V1); + } + if self.new_payload_v2 { + response.push(ENGINE_NEW_PAYLOAD_V2); + } + if self.forkchoice_updated_v1 { + response.push(ENGINE_FORKCHOICE_UPDATED_V1); + } + if self.forkchoice_updated_v2 { + response.push(ENGINE_FORKCHOICE_UPDATED_V2); + } + if self.get_payload_v1 { + response.push(ENGINE_GET_PAYLOAD_V1); + } + if self.get_payload_v2 { + response.push(ENGINE_GET_PAYLOAD_V2); + } + if self.exchange_transition_configuration_v1 { + response.push(ENGINE_EXCHANGE_TRANSITION_CONFIGURATION_V1); + } + + response + } +} diff --git a/beacon_node/execution_layer/src/engine_api/http.rs b/beacon_node/execution_layer/src/engine_api/http.rs index 90f8f053d9..b49ce51c63 100644 --- a/beacon_node/execution_layer/src/engine_api/http.rs +++ b/beacon_node/execution_layer/src/engine_api/http.rs @@ -7,10 +7,11 @@ use reqwest::header::CONTENT_TYPE; use sensitive_url::SensitiveUrl; use serde::de::DeserializeOwned; use serde_json::json; -use tokio::sync::RwLock; +use std::collections::HashSet; +use tokio::sync::Mutex; -use std::time::Duration; -use types::{ChainSpec, EthSpec}; +use std::time::{Duration, SystemTime}; +use types::EthSpec; pub use deposit_log::{DepositLog, Log}; pub use reqwest::Client; @@ -50,8 +51,37 @@ pub const ENGINE_EXCHANGE_TRANSITION_CONFIGURATION_V1: &str = "engine_exchangeTransitionConfigurationV1"; pub const ENGINE_EXCHANGE_TRANSITION_CONFIGURATION_V1_TIMEOUT: Duration = Duration::from_secs(1); +pub const ENGINE_EXCHANGE_CAPABILITIES: &str = "engine_exchangeCapabilities"; +pub const ENGINE_EXCHANGE_CAPABILITIES_TIMEOUT: Duration = Duration::from_secs(1); + /// This error is returned during a `chainId` call by Geth. pub const EIP155_ERROR_STR: &str = "chain not synced beyond EIP-155 replay-protection fork block"; +/// This code is returned by all clients when a method is not supported +/// (verified geth, nethermind, erigon, besu) +pub const METHOD_NOT_FOUND_CODE: i64 = -32601; + +pub static LIGHTHOUSE_CAPABILITIES: &[&str] = &[ + ENGINE_NEW_PAYLOAD_V1, + ENGINE_NEW_PAYLOAD_V2, + ENGINE_GET_PAYLOAD_V1, + ENGINE_GET_PAYLOAD_V2, + ENGINE_FORKCHOICE_UPDATED_V1, + ENGINE_FORKCHOICE_UPDATED_V2, + ENGINE_EXCHANGE_TRANSITION_CONFIGURATION_V1, +]; + +/// This is necessary because a user might run a capella-enabled version of +/// lighthouse before they update to a capella-enabled execution engine. +// TODO (mark): rip this out once we are post-capella on mainnet +pub static PRE_CAPELLA_ENGINE_CAPABILITIES: EngineCapabilities = EngineCapabilities { + new_payload_v1: true, + new_payload_v2: false, + forkchoice_updated_v1: true, + forkchoice_updated_v2: false, + get_payload_v1: true, + get_payload_v2: false, + exchange_transition_configuration_v1: true, +}; /// Contains methods to convert arbitrary bytes to an ETH2 deposit contract object. pub mod deposit_log { @@ -528,11 +558,47 @@ pub mod deposit_methods { } } +#[derive(Clone, Debug)] +pub struct CapabilitiesCacheEntry { + engine_capabilities: EngineCapabilities, + fetch_time: SystemTime, +} + +impl CapabilitiesCacheEntry { + pub fn new(engine_capabilities: EngineCapabilities) -> Self { + Self { + engine_capabilities, + fetch_time: SystemTime::now(), + } + } + + pub fn engine_capabilities(&self) -> &EngineCapabilities { + &self.engine_capabilities + } + + pub fn age(&self) -> Duration { + // duration_since() may fail because measurements taken earlier + // are not guaranteed to always be before later measurements + // due to anomalies such as the system clock being adjusted + // either forwards or backwards + // + // In such cases, we'll just say the age is zero + SystemTime::now() + .duration_since(self.fetch_time) + .unwrap_or(Duration::ZERO) + } + + /// returns `true` if the entry's age is >= age_limit + pub fn older_than(&self, age_limit: Option) -> bool { + age_limit.map_or(false, |limit| self.age() >= limit) + } +} + pub struct HttpJsonRpc { pub client: Client, pub url: SensitiveUrl, pub execution_timeout_multiplier: u32, - pub cached_supported_apis: RwLock>, + pub engine_capabilities_cache: Mutex>, auth: Option, } @@ -540,29 +606,12 @@ impl HttpJsonRpc { pub fn new( url: SensitiveUrl, execution_timeout_multiplier: Option, - spec: &ChainSpec, ) -> Result { - // FIXME: remove this `cached_supported_apis` spec hack once the `engine_getCapabilities` - // method is implemented in all execution clients: - // https://github.com/ethereum/execution-apis/issues/321 - let cached_supported_apis = RwLock::new(Some(SupportedApis { - new_payload_v1: true, - new_payload_v2: spec.capella_fork_epoch.is_some() || spec.eip4844_fork_epoch.is_some(), - new_payload_v3: spec.eip4844_fork_epoch.is_some(), - forkchoice_updated_v1: true, - forkchoice_updated_v2: spec.capella_fork_epoch.is_some() - || spec.eip4844_fork_epoch.is_some(), - get_payload_v1: true, - get_payload_v2: spec.capella_fork_epoch.is_some() || spec.eip4844_fork_epoch.is_some(), - get_payload_v3: spec.eip4844_fork_epoch.is_some(), - exchange_transition_configuration_v1: true, - })); - Ok(Self { client: Client::builder().build()?, url, execution_timeout_multiplier: execution_timeout_multiplier.unwrap_or(1), - cached_supported_apis, + engine_capabilities_cache: Mutex::new(None), auth: None, }) } @@ -571,29 +620,12 @@ impl HttpJsonRpc { url: SensitiveUrl, auth: Auth, execution_timeout_multiplier: Option, - spec: &ChainSpec, ) -> Result { - // FIXME: remove this `cached_supported_apis` spec hack once the `engine_getCapabilities` - // method is implemented in all execution clients: - // https://github.com/ethereum/execution-apis/issues/321 - let cached_supported_apis = RwLock::new(Some(SupportedApis { - new_payload_v1: true, - new_payload_v2: spec.capella_fork_epoch.is_some() || spec.eip4844_fork_epoch.is_some(), - new_payload_v3: spec.eip4844_fork_epoch.is_some(), - forkchoice_updated_v1: true, - forkchoice_updated_v2: spec.capella_fork_epoch.is_some() - || spec.eip4844_fork_epoch.is_some(), - get_payload_v1: true, - get_payload_v2: spec.capella_fork_epoch.is_some() || spec.eip4844_fork_epoch.is_some(), - get_payload_v3: spec.eip4844_fork_epoch.is_some(), - exchange_transition_configuration_v1: true, - })); - Ok(Self { client: Client::builder().build()?, url, execution_timeout_multiplier: execution_timeout_multiplier.unwrap_or(1), - cached_supported_apis, + engine_capabilities_cache: Mutex::new(None), auth: Some(auth), }) } @@ -791,7 +823,7 @@ impl HttpJsonRpc { pub async fn get_payload_v1( &self, payload_id: PayloadId, - ) -> Result, Error> { + ) -> Result, Error> { let params = json!([JsonPayloadIdRequest::from(payload_id)]); let payload_v1: JsonExecutionPayloadV1 = self @@ -802,7 +834,11 @@ impl HttpJsonRpc { ) .await?; - Ok(JsonExecutionPayload::V1(payload_v1).into()) + Ok(GetPayloadResponse::Merge(GetPayloadResponseMerge { + execution_payload: payload_v1.into(), + // Have to guess zero here as we don't know the value + block_value: Uint256::zero(), + })) } pub async fn get_payload_v2( @@ -961,37 +997,67 @@ impl HttpJsonRpc { Ok(response) } - // TODO: This is currently a stub for the `engine_getCapabilities` - // method. This stub is unused because we set cached_supported_apis - // in the constructor based on the `spec` - // Implement this once the execution clients support it - // https://github.com/ethereum/execution-apis/issues/321 - pub async fn get_capabilities(&self) -> Result { - Ok(SupportedApis { - new_payload_v1: true, - new_payload_v2: true, - new_payload_v3: true, - forkchoice_updated_v1: true, - forkchoice_updated_v2: true, - get_payload_v1: true, - get_payload_v2: true, - get_payload_v3: true, - exchange_transition_configuration_v1: true, - }) + pub async fn exchange_capabilities(&self) -> Result { + let params = json!([LIGHTHOUSE_CAPABILITIES]); + + let response: Result, _> = self + .rpc_request( + ENGINE_EXCHANGE_CAPABILITIES, + params, + ENGINE_EXCHANGE_CAPABILITIES_TIMEOUT * self.execution_timeout_multiplier, + ) + .await; + + match response { + // TODO (mark): rip this out once we are post capella on mainnet + Err(error) => match error { + Error::ServerMessage { code, message: _ } if code == METHOD_NOT_FOUND_CODE => { + Ok(PRE_CAPELLA_ENGINE_CAPABILITIES) + } + _ => Err(error), + }, + Ok(capabilities) => Ok(EngineCapabilities { + new_payload_v1: capabilities.contains(ENGINE_NEW_PAYLOAD_V1), + new_payload_v2: capabilities.contains(ENGINE_NEW_PAYLOAD_V2), + forkchoice_updated_v1: capabilities.contains(ENGINE_FORKCHOICE_UPDATED_V1), + forkchoice_updated_v2: capabilities.contains(ENGINE_FORKCHOICE_UPDATED_V2), + get_payload_v1: capabilities.contains(ENGINE_GET_PAYLOAD_V1), + get_payload_v2: capabilities.contains(ENGINE_GET_PAYLOAD_V2), + exchange_transition_configuration_v1: capabilities + .contains(ENGINE_EXCHANGE_TRANSITION_CONFIGURATION_V1), + }), + } } - pub async fn set_cached_supported_apis(&self, supported_apis: Option) { - *self.cached_supported_apis.write().await = supported_apis; + pub async fn clear_exchange_capabilties_cache(&self) { + *self.engine_capabilities_cache.lock().await = None; } - pub async fn get_cached_supported_apis(&self) -> Result { - let cached_opt = *self.cached_supported_apis.read().await; - if let Some(supported_apis) = cached_opt { - Ok(supported_apis) + /// Returns the execution engine capabilities resulting from a call to + /// engine_exchangeCapabilities. If the capabilities cache is not populated, + /// or if it is populated with a cached result of age >= `age_limit`, this + /// method will fetch the result from the execution engine and populate the + /// cache before returning it. Otherwise it will return a cached result from + /// a previous call. + /// + /// Set `age_limit` to `None` to always return the cached result + /// Set `age_limit` to `Some(Duration::ZERO)` to force fetching from EE + pub async fn get_engine_capabilities( + &self, + age_limit: Option, + ) -> Result { + let mut lock = self.engine_capabilities_cache.lock().await; + + if lock + .as_ref() + .map_or(true, |entry| entry.older_than(age_limit)) + { + let engine_capabilities = self.exchange_capabilities().await?; + *lock = Some(CapabilitiesCacheEntry::new(engine_capabilities)); + Ok(engine_capabilities) } else { - let supported_apis = self.get_capabilities().await?; - self.set_cached_supported_apis(Some(supported_apis)).await; - Ok(supported_apis) + // here entry is guaranteed to exist so unwrap() is safe + Ok(*lock.as_ref().unwrap().engine_capabilities()) } } @@ -1001,12 +1067,12 @@ impl HttpJsonRpc { &self, execution_payload: ExecutionPayload, ) -> Result { - let supported_apis = self.get_cached_supported_apis().await?; - if supported_apis.new_payload_v3 { + let engine_capabilities = self.get_engine_capabilities(None).await?; + if engine_capabilities.new_payload_v3 { self.new_payload_v3(execution_payload).await - } else if supported_apis.new_payload_v2 { + }else if engine_capabilities.new_payload_v2 { self.new_payload_v2(execution_payload).await - } else if supported_apis.new_payload_v1 { + } else if engine_capabilities.new_payload_v1 { self.new_payload_v1(execution_payload).await } else { Err(Error::RequiredMethodUnsupported("engine_newPayload")) @@ -1019,22 +1085,13 @@ impl HttpJsonRpc { &self, fork_name: ForkName, payload_id: PayloadId, - ) -> Result, Error> { - let supported_apis = self.get_cached_supported_apis().await?; - if supported_apis.get_payload_v3 { - Ok(self - .get_payload_v3(fork_name, payload_id) - .await? - .execution_payload()) - } else if supported_apis.get_payload_v2 { - // TODO: modify this method to return GetPayloadResponse instead - // of throwing away the `block_value` and returning only the - // ExecutionPayload - Ok(self - .get_payload_v2(fork_name, payload_id) - .await? - .execution_payload()) - } else if supported_apis.get_payload_v1 { + ) -> Result, Error> { + let engine_capabilities = self.get_engine_capabilities(None).await?; + if engine_capabilities.get_payload_v3 { + self.get_payload_v3(fork_name, payload_id).await + } else if engine_capabilities.get_payload_v2 { + self.get_payload_v2(fork_name, payload_id).await + } else if engine_capabilities.new_payload_v1 { self.get_payload_v1(payload_id).await } else { Err(Error::RequiredMethodUnsupported("engine_getPayload")) @@ -1048,11 +1105,11 @@ impl HttpJsonRpc { forkchoice_state: ForkchoiceState, payload_attributes: Option, ) -> Result { - let supported_apis = self.get_cached_supported_apis().await?; - if supported_apis.forkchoice_updated_v2 { + let engine_capabilities = self.get_engine_capabilities(None).await?; + if engine_capabilities.forkchoice_updated_v2 { self.forkchoice_updated_v2(forkchoice_state, payload_attributes) .await - } else if supported_apis.forkchoice_updated_v1 { + } else if engine_capabilities.forkchoice_updated_v1 { self.forkchoice_updated_v1(forkchoice_state, payload_attributes) .await } else { @@ -1080,7 +1137,6 @@ mod test { impl Tester { pub fn new(with_auth: bool) -> Self { let server = MockServer::unit_testing(); - let spec = MainnetEthSpec::default_spec(); let rpc_url = SensitiveUrl::parse(&server.url()).unwrap(); let echo_url = SensitiveUrl::parse(&format!("{}/echo", server.url())).unwrap(); @@ -1091,13 +1147,13 @@ mod test { let echo_auth = Auth::new(JwtKey::from_slice(&DEFAULT_JWT_SECRET).unwrap(), None, None); ( - Arc::new(HttpJsonRpc::new_with_auth(rpc_url, rpc_auth, None, &spec).unwrap()), - Arc::new(HttpJsonRpc::new_with_auth(echo_url, echo_auth, None, &spec).unwrap()), + Arc::new(HttpJsonRpc::new_with_auth(rpc_url, rpc_auth, None).unwrap()), + Arc::new(HttpJsonRpc::new_with_auth(echo_url, echo_auth, None).unwrap()), ) } else { ( - Arc::new(HttpJsonRpc::new(rpc_url, None, &spec).unwrap()), - Arc::new(HttpJsonRpc::new(echo_url, None, &spec).unwrap()), + Arc::new(HttpJsonRpc::new(rpc_url, None).unwrap()), + Arc::new(HttpJsonRpc::new(echo_url, None).unwrap()), ) }; @@ -1685,10 +1741,11 @@ mod test { } })], |client| async move { - let payload = client + let payload: ExecutionPayload<_> = client .get_payload_v1::(str_to_payload_id("0xa247243752eb10b4")) .await - .unwrap(); + .unwrap() + .into(); let expected = ExecutionPayload::Merge(ExecutionPayloadMerge { parent_hash: ExecutionBlockHash::from_str("0x3b8fb240d288781d4aac94d3fd16809ee413bc99294a085798a589dae51ddd4a").unwrap(), diff --git a/beacon_node/execution_layer/src/engines.rs b/beacon_node/execution_layer/src/engines.rs index 271cca26cb..5532fbb345 100644 --- a/beacon_node/execution_layer/src/engines.rs +++ b/beacon_node/execution_layer/src/engines.rs @@ -1,13 +1,15 @@ //! Provides generic behaviour for multiple execution engines, specifically fallback behaviour. use crate::engine_api::{ - Error as EngineApiError, ForkchoiceUpdatedResponse, PayloadAttributes, PayloadId, + EngineCapabilities, Error as EngineApiError, ForkchoiceUpdatedResponse, PayloadAttributes, + PayloadId, }; use crate::HttpJsonRpc; use lru::LruCache; -use slog::{debug, error, info, Logger}; +use slog::{debug, error, info, warn, Logger}; use std::future::Future; use std::sync::Arc; +use std::time::Duration; use task_executor::TaskExecutor; use tokio::sync::{watch, Mutex, RwLock}; use tokio_stream::wrappers::WatchStream; @@ -18,6 +20,7 @@ use types::ExecutionBlockHash; /// Since the size of each value is small (~100 bytes) a large number is used for safety. /// FIXME: check this assumption now that the key includes entire payload attributes which now includes withdrawals const PAYLOAD_ID_LRU_CACHE_SIZE: usize = 512; +const CACHED_ENGINE_CAPABILITIES_AGE_LIMIT: Duration = Duration::from_secs(900); // 15 minutes /// Stores the remembered state of a engine. #[derive(Copy, Clone, PartialEq, Debug, Eq, Default)] @@ -29,6 +32,14 @@ enum EngineStateInternal { AuthFailed, } +#[derive(Copy, Clone, Debug, Default, Eq, PartialEq)] +enum CapabilitiesCacheAction { + #[default] + None, + Update, + Clear, +} + /// A subset of the engine state to inform other services if the engine is online or offline. #[derive(Debug, Clone, PartialEq, Eq, Copy)] pub enum EngineState { @@ -231,7 +242,7 @@ impl Engine { /// Run the `EngineApi::upcheck` function if the node's last known state is not synced. This /// might be used to recover the node if offline. pub async fn upcheck(&self) { - let state: EngineStateInternal = match self.api.upcheck().await { + let (state, cache_action) = match self.api.upcheck().await { Ok(()) => { let mut state = self.state.write().await; if **state != EngineStateInternal::Synced { @@ -249,12 +260,12 @@ impl Engine { ); } state.update(EngineStateInternal::Synced); - **state + (**state, CapabilitiesCacheAction::Update) } Err(EngineApiError::IsSyncing) => { let mut state = self.state.write().await; state.update(EngineStateInternal::Syncing); - **state + (**state, CapabilitiesCacheAction::Update) } Err(EngineApiError::Auth(err)) => { error!( @@ -265,7 +276,7 @@ impl Engine { let mut state = self.state.write().await; state.update(EngineStateInternal::AuthFailed); - **state + (**state, CapabilitiesCacheAction::None) } Err(e) => { error!( @@ -276,10 +287,30 @@ impl Engine { let mut state = self.state.write().await; state.update(EngineStateInternal::Offline); - **state + // need to clear the engine capabilities cache if we detect the + // execution engine is offline as it is likely the engine is being + // updated to a newer version with new capabilities + (**state, CapabilitiesCacheAction::Clear) } }; + // do this after dropping state lock guard to avoid holding two locks at once + match cache_action { + CapabilitiesCacheAction::None => {} + CapabilitiesCacheAction::Update => { + if let Err(e) = self + .get_engine_capabilities(Some(CACHED_ENGINE_CAPABILITIES_AGE_LIMIT)) + .await + { + warn!(self.log, + "Error during exchange capabilities"; + "error" => ?e, + ) + } + } + CapabilitiesCacheAction::Clear => self.api.clear_exchange_capabilties_cache().await, + } + debug!( self.log, "Execution engine upcheck complete"; @@ -287,6 +318,22 @@ impl Engine { ); } + /// Returns the execution engine capabilities resulting from a call to + /// engine_exchangeCapabilities. If the capabilities cache is not populated, + /// or if it is populated with a cached result of age >= `age_limit`, this + /// method will fetch the result from the execution engine and populate the + /// cache before returning it. Otherwise it will return a cached result from + /// a previous call. + /// + /// Set `age_limit` to `None` to always return the cached result + /// Set `age_limit` to `Some(Duration::ZERO)` to force fetching from EE + pub async fn get_engine_capabilities( + &self, + age_limit: Option, + ) -> Result { + self.api.get_engine_capabilities(age_limit).await + } + /// Run `func` on the node regardless of the node's current state. /// /// ## Note diff --git a/beacon_node/execution_layer/src/lib.rs b/beacon_node/execution_layer/src/lib.rs index 5e574cf0f9..b55e9a0c0f 100644 --- a/beacon_node/execution_layer/src/lib.rs +++ b/beacon_node/execution_layer/src/lib.rs @@ -7,6 +7,7 @@ use crate::payload_cache::PayloadCache; use auth::{strip_prefix, Auth, JwtKey}; use builder_client::BuilderHttpClient; +pub use engine_api::EngineCapabilities; use engine_api::Error as ApiError; pub use engine_api::*; pub use engine_api::{http, http::deposit_methods, http::HttpJsonRpc}; @@ -123,9 +124,13 @@ impl From for Error { } pub enum BlockProposalContents> { - Payload(Payload), + Payload { + payload: Payload, + block_value: Uint256, + }, PayloadAndBlobs { payload: Payload, + block_value: Uint256, kzg_commitments: VariableList, blobs: VariableList, T::MaxBlobsPerBlock>, }, @@ -151,9 +156,13 @@ impl> BlockProposalContents &Payload { match self { - Self::Payload(payload) => payload, + Self::Payload { + payload, + block_value: _, + } => payload, Self::PayloadAndBlobs { payload, + block_value: _, kzg_commitments: _, blobs: _, } => payload, @@ -161,21 +170,43 @@ impl> BlockProposalContents Payload { match self { - Self::Payload(payload) => payload, + Self::Payload { + payload, + block_value: _, + } => payload, Self::PayloadAndBlobs { payload, + block_value: _, kzg_commitments: _, blobs: _, } => payload, } } + pub fn block_value(&self) -> &Uint256 { + match self { + Self::Payload { + payload: _, + block_value, + } => block_value, + Self::PayloadAndBlobs { + payload: _, + block_value, + kzg_commitments: _, + blobs: _, + } => block_value, + } + } pub fn default_at_fork(fork_name: ForkName) -> Result { Ok(match fork_name { ForkName::Base | ForkName::Altair | ForkName::Merge | ForkName::Capella => { - BlockProposalContents::Payload(Payload::default_at_fork(fork_name)?) + BlockProposalContents::Payload { + payload: Payload::default_at_fork(fork_name)?, + block_value: Uint256::zero(), + } } ForkName::Eip4844 => BlockProposalContents::PayloadAndBlobs { payload: Payload::default_at_fork(fork_name)?, + block_value: Uint256::zero(), blobs: VariableList::default(), kzg_commitments: VariableList::default(), }, @@ -267,12 +298,7 @@ pub struct ExecutionLayer { impl ExecutionLayer { /// Instantiate `Self` with an Execution engine specified in `Config`, using JSON-RPC via HTTP. - pub fn from_config( - config: Config, - executor: TaskExecutor, - log: Logger, - spec: &ChainSpec, - ) -> Result { + pub fn from_config(config: Config, executor: TaskExecutor, log: Logger) -> Result { let Config { execution_endpoints: urls, builder_url, @@ -327,13 +353,8 @@ impl ExecutionLayer { let engine: Engine = { let auth = Auth::new(jwt_key, jwt_id, jwt_version); debug!(log, "Loaded execution endpoint"; "endpoint" => %execution_url, "jwt_path" => ?secret_file.as_path()); - let api = HttpJsonRpc::new_with_auth( - execution_url, - auth, - execution_timeout_multiplier, - &spec, - ) - .map_err(Error::ApiError)?; + let api = HttpJsonRpc::new_with_auth(execution_url, auth, execution_timeout_multiplier) + .map_err(Error::ApiError)?; Engine::new(api, executor.clone(), &log) }; @@ -377,12 +398,12 @@ impl ExecutionLayer { &self.inner.builder } - /// Cache a full payload, keyed on the `tree_hash_root` of its `transactions` field. - fn cache_payload(&self, payload: &ExecutionPayload) -> Option> { - self.inner.payload_cache.put(payload.clone()) + /// Cache a full payload, keyed on the `tree_hash_root` of the payload + fn cache_payload(&self, payload: ExecutionPayloadRef) -> Option> { + self.inner.payload_cache.put(payload.clone_from_ref()) } - /// Attempt to retrieve a full payload from the payload cache by the `transactions_root`. + /// Attempt to retrieve a full payload from the payload cache by the payload root pub fn get_payload_by_root(&self, root: &Hash256) -> Option> { self.inner.payload_cache.pop(root) } @@ -819,17 +840,32 @@ impl ExecutionLayer { "parent_hash" => ?parent_hash, ); + let relay_value = relay.data.message.value; + let local_value = *local.block_value(); + if local_value >= relay_value { + info!( + self.log(), + "Local block is more profitable than relay block"; + "local_block_value" => %local_value, + "relay_value" => %relay_value + ); + return Ok(ProvenancedPayload::Local(local)); + } + match verify_builder_bid( &relay, parent_hash, - payload_attributes.prev_randao(), - payload_attributes.timestamp(), + payload_attributes, Some(local.payload().block_number()), self.inner.builder_profit_threshold, + current_fork, spec, ) { Ok(()) => Ok(ProvenancedPayload::Builder( - BlockProposalContents::Payload(relay.data.message.header), + BlockProposalContents::Payload { + payload: relay.data.message.header, + block_value: relay.data.message.value, + }, )), Err(reason) if !reason.payload_invalid() => { info!( @@ -873,19 +909,25 @@ impl ExecutionLayer { match verify_builder_bid( &relay, parent_hash, - payload_attributes.prev_randao(), - payload_attributes.timestamp(), + payload_attributes, None, self.inner.builder_profit_threshold, + current_fork, spec, ) { Ok(()) => Ok(ProvenancedPayload::Builder( - BlockProposalContents::Payload(relay.data.message.header), + BlockProposalContents::Payload { + payload: relay.data.message.header, + block_value: relay.data.message.value, + }, )), // If the payload is valid then use it. The local EE failed // to produce a payload so we have no alternative. Err(e) if !e.payload_invalid() => Ok(ProvenancedPayload::Builder( - BlockProposalContents::Payload(relay.data.message.header), + BlockProposalContents::Payload { + payload: relay.data.message.header, + block_value: relay.data.message.value, + }, )), Err(reason) => { metrics::inc_counter_vec( @@ -999,7 +1041,7 @@ impl ExecutionLayer { payload_attributes: &PayloadAttributes, forkchoice_update_params: ForkchoiceUpdateParameters, current_fork: ForkName, - f: fn(&ExecutionLayer, &ExecutionPayload) -> Option>, + f: fn(&ExecutionLayer, ExecutionPayloadRef) -> Option>, ) -> Result, Error> { self.engine() .request(move |engine| async move { @@ -1082,9 +1124,9 @@ impl ExecutionLayer { ); engine.api.get_payload::(current_fork, payload_id).await }; - let (blob, payload) = tokio::join!(blob_fut, payload_fut); - let payload = payload.map(|full_payload| { - if full_payload.fee_recipient() != payload_attributes.suggested_fee_recipient() { + let (blob, payload_response) = tokio::join!(blob_fut, payload_fut); + let (execution_payload, block_value) = payload_response.map(|payload_response| { + if payload_response.execution_payload_ref().fee_recipient() != payload_attributes.suggested_fee_recipient() { error!( self.log(), "Inconsistent fee recipient"; @@ -1093,28 +1135,32 @@ impl ExecutionLayer { indicate that fees are being diverted to another address. Please \ ensure that the value of suggested_fee_recipient is set correctly and \ that the Execution Engine is trusted.", - "fee_recipient" => ?full_payload.fee_recipient(), + "fee_recipient" => ?payload_response.execution_payload_ref().fee_recipient(), "suggested_fee_recipient" => ?payload_attributes.suggested_fee_recipient(), ); } - if f(self, &full_payload).is_some() { + if f(self, payload_response.execution_payload_ref()).is_some() { warn!( self.log(), "Duplicate payload cached, this might indicate redundant proposal \ attempts." ); } - full_payload.into() + payload_response.into() })?; if let Some(blob) = blob.transpose()? { // FIXME(sean) cache blobs Ok(BlockProposalContents::PayloadAndBlobs { - payload, + payload: execution_payload.into(), + block_value, blobs: blob.blobs, kzg_commitments: blob.kzgs, }) } else { - Ok(BlockProposalContents::Payload(payload)) + Ok(BlockProposalContents::Payload { + payload: execution_payload.into(), + block_value, + }) } }) .await @@ -1373,6 +1419,26 @@ impl ExecutionLayer { } } + /// Returns the execution engine capabilities resulting from a call to + /// engine_exchangeCapabilities. If the capabilities cache is not populated, + /// or if it is populated with a cached result of age >= `age_limit`, this + /// method will fetch the result from the execution engine and populate the + /// cache before returning it. Otherwise it will return a cached result from + /// a previous call. + /// + /// Set `age_limit` to `None` to always return the cached result + /// Set `age_limit` to `Some(Duration::ZERO)` to force fetching from EE + pub async fn get_engine_capabilities( + &self, + age_limit: Option, + ) -> Result { + self.engine() + .request(|engine| engine.get_engine_capabilities(age_limit)) + .await + .map_err(Box::new) + .map_err(Error::EngineError) + } + /// Used during block production to determine if the merge has been triggered. /// /// ## Specification @@ -1801,6 +1867,11 @@ enum InvalidBuilderPayload { signature: Signature, pubkey: PublicKeyBytes, }, + #[allow(dead_code)] + WithdrawalsRoot { + payload: Hash256, + expected: Hash256, + }, } impl InvalidBuilderPayload { @@ -1815,6 +1886,7 @@ impl InvalidBuilderPayload { InvalidBuilderPayload::BlockNumber { .. } => true, InvalidBuilderPayload::Fork { .. } => true, InvalidBuilderPayload::Signature { .. } => true, + InvalidBuilderPayload::WithdrawalsRoot { .. } => true, } } } @@ -1850,6 +1922,13 @@ impl fmt::Display for InvalidBuilderPayload { "invalid payload signature {} for pubkey {}", signature, pubkey ), + InvalidBuilderPayload::WithdrawalsRoot { payload, expected } => { + write!( + f, + "payload withdrawals root was {} not {}", + payload, expected + ) + } } } } @@ -1858,10 +1937,10 @@ impl fmt::Display for InvalidBuilderPayload { fn verify_builder_bid>( bid: &ForkVersionedResponse>, parent_hash: ExecutionBlockHash, - prev_randao: Hash256, - timestamp: u64, + payload_attributes: &PayloadAttributes, block_number: Option, profit_threshold: Uint256, + current_fork: ForkName, spec: &ChainSpec, ) -> Result<(), Box> { let is_signature_valid = bid.data.verify_signature(spec); @@ -1888,29 +1967,25 @@ fn verify_builder_bid>( payload: header.parent_hash(), expected: parent_hash, })) - } else if header.prev_randao() != prev_randao { + } else if header.prev_randao() != payload_attributes.prev_randao() { Err(Box::new(InvalidBuilderPayload::PrevRandao { payload: header.prev_randao(), - expected: prev_randao, + expected: payload_attributes.prev_randao(), })) - } else if header.timestamp() != timestamp { + } else if header.timestamp() != payload_attributes.timestamp() { Err(Box::new(InvalidBuilderPayload::Timestamp { payload: header.timestamp(), - expected: timestamp, + expected: payload_attributes.timestamp(), })) } else if block_number.map_or(false, |n| n != header.block_number()) { Err(Box::new(InvalidBuilderPayload::BlockNumber { payload: header.block_number(), expected: block_number, })) - } else if !matches!(bid.version, Some(ForkName::Merge)) { - // Once fork information is added to the payload, we will need to - // check that the local and relay payloads match. At this point, if - // we are requesting a payload at all, we have to assume this is - // the Bellatrix fork. + } else if bid.version != Some(current_fork) { Err(Box::new(InvalidBuilderPayload::Fork { payload: bid.version, - expected: ForkName::Merge, + expected: current_fork, })) } else if !is_signature_valid { Err(Box::new(InvalidBuilderPayload::Signature { diff --git a/beacon_node/execution_layer/src/test_utils/handle_rpc.rs b/beacon_node/execution_layer/src/test_utils/handle_rpc.rs index a0ce44450e..fb0cf66a35 100644 --- a/beacon_node/execution_layer/src/test_utils/handle_rpc.rs +++ b/beacon_node/execution_layer/src/test_utils/handle_rpc.rs @@ -1,25 +1,33 @@ use super::Context; use crate::engine_api::{http::*, *}; use crate::json_structures::*; +use crate::test_utils::DEFAULT_MOCK_EL_PAYLOAD_VALUE_WEI; use serde::de::DeserializeOwned; use serde_json::Value as JsonValue; use std::sync::Arc; use types::{EthSpec, ForkName}; +pub const GENERIC_ERROR_CODE: i64 = -1234; +pub const BAD_PARAMS_ERROR_CODE: i64 = -32602; +pub const UNKNOWN_PAYLOAD_ERROR_CODE: i64 = -38001; +pub const FORK_REQUEST_MISMATCH_ERROR_CODE: i64 = -32000; + pub async fn handle_rpc( body: JsonValue, ctx: Arc>, -) -> Result { +) -> Result { *ctx.previous_request.lock() = Some(body.clone()); let method = body .get("method") .and_then(JsonValue::as_str) - .ok_or_else(|| "missing/invalid method field".to_string())?; + .ok_or_else(|| "missing/invalid method field".to_string()) + .map_err(|s| (s, GENERIC_ERROR_CODE))?; let params = body .get("params") - .ok_or_else(|| "missing/invalid params field".to_string())?; + .ok_or_else(|| "missing/invalid params field".to_string()) + .map_err(|s| (s, GENERIC_ERROR_CODE))?; match method { ETH_SYNCING => Ok(JsonValue::Bool(false)), @@ -27,7 +35,8 @@ pub async fn handle_rpc( let tag = params .get(0) .and_then(JsonValue::as_str) - .ok_or_else(|| "missing/invalid params[0] value".to_string())?; + .ok_or_else(|| "missing/invalid params[0] value".to_string()) + .map_err(|s| (s, BAD_PARAMS_ERROR_CODE))?; match tag { "latest" => Ok(serde_json::to_value( @@ -36,7 +45,10 @@ pub async fn handle_rpc( .latest_execution_block(), ) .unwrap()), - other => Err(format!("The tag {} is not supported", other)), + other => Err(( + format!("The tag {} is not supported", other), + BAD_PARAMS_ERROR_CODE, + )), } } ETH_GET_BLOCK_BY_HASH => { @@ -47,7 +59,8 @@ pub async fn handle_rpc( .and_then(|s| { s.parse() .map_err(|e| format!("unable to parse hash: {:?}", e)) - })?; + }) + .map_err(|s| (s, BAD_PARAMS_ERROR_CODE))?; // If we have a static response set, just return that. if let Some(response) = *ctx.static_get_block_by_hash_response.lock() { @@ -57,7 +70,8 @@ pub async fn handle_rpc( let full_tx = params .get(1) .and_then(JsonValue::as_bool) - .ok_or_else(|| "missing/invalid params[1] value".to_string())?; + .ok_or_else(|| "missing/invalid params[1] value".to_string()) + .map_err(|s| (s, BAD_PARAMS_ERROR_CODE))?; if full_tx { Ok(serde_json::to_value( ctx.execution_block_generator @@ -76,15 +90,17 @@ pub async fn handle_rpc( } ENGINE_NEW_PAYLOAD_V1 | ENGINE_NEW_PAYLOAD_V2 | ENGINE_NEW_PAYLOAD_V3 => { let request = match method { - ENGINE_NEW_PAYLOAD_V1 => { - JsonExecutionPayload::V1(get_param::>(params, 0)?) - } + ENGINE_NEW_PAYLOAD_V1 => JsonExecutionPayload::V1( + get_param::>(params, 0) + .map_err(|s| (s, BAD_PARAMS_ERROR_CODE))?, + ), ENGINE_NEW_PAYLOAD_V2 => get_param::>(params, 0) .map(|jep| JsonExecutionPayload::V2(jep)) .or_else(|_| { get_param::>(params, 0) .map(|jep| JsonExecutionPayload::V1(jep)) - })?, + }) + .map_err(|s| (s, BAD_PARAMS_ERROR_CODE))?, ENGINE_NEW_PAYLOAD_V3 => get_param::>(params, 0) .map(|jep| JsonExecutionPayload::V3(jep)) .or_else(|_| { @@ -106,37 +122,46 @@ pub async fn handle_rpc( match fork { ForkName::Merge => { if matches!(request, JsonExecutionPayload::V2(_)) { - return Err(format!( - "{} called with `ExecutionPayloadV2` before capella fork!", - method + return Err(( + format!( + "{} called with `ExecutionPayloadV2` before Capella fork!", + method + ), + GENERIC_ERROR_CODE, )); } } ForkName::Capella => { if method == ENGINE_NEW_PAYLOAD_V1 { - return Err(format!("{} called after capella fork!", method)); + return Err(( + format!("{} called after Capella fork!", method), + GENERIC_ERROR_CODE, + )); } if matches!(request, JsonExecutionPayload::V1(_)) { - return Err(format!( - "{} called with `ExecutionPayloadV1` after capella fork!", - method + return Err(( + format!( + "{} called with `ExecutionPayloadV1` after Capella fork!", + method + ), + GENERIC_ERROR_CODE, )); } } ForkName::Eip4844 => { if method == ENGINE_NEW_PAYLOAD_V1 || method == ENGINE_NEW_PAYLOAD_V2 { - return Err(format!("{} called after capella fork!", method)); + return Err((format!("{} called after capella fork!", method), GENERIC_ERROR_CODE)); } if matches!(request, JsonExecutionPayload::V1(_)) { - return Err(format!( + return Err((format!( "{} called with `ExecutionPayloadV1` after eip4844 fork!", method - )); + ), GENERIC_ERROR_CODE)); } if matches!(request, JsonExecutionPayload::V2(_)) { - return Err(format!( + return Err((format!( "{} called with `ExecutionPayloadV2` after eip4844 fork!", - method + method), GENERIC_ERROR_CODE )); } } @@ -174,14 +199,20 @@ pub async fn handle_rpc( Ok(serde_json::to_value(JsonPayloadStatusV1::from(response)).unwrap()) } ENGINE_GET_PAYLOAD_V1 | ENGINE_GET_PAYLOAD_V2 | ENGINE_GET_PAYLOAD_V3 => { - let request: JsonPayloadIdRequest = get_param(params, 0)?; + let request: JsonPayloadIdRequest = + get_param(params, 0).map_err(|s| (s, BAD_PARAMS_ERROR_CODE))?; let id = request.into(); let response = ctx .execution_block_generator .write() .get_payload(&id) - .ok_or_else(|| format!("no payload for id {:?}", id))?; + .ok_or_else(|| { + ( + format!("no payload for id {:?}", id), + UNKNOWN_PAYLOAD_ERROR_CODE, + ) + })?; // validate method called correctly according to shanghai fork time if ctx @@ -191,7 +222,10 @@ pub async fn handle_rpc( == ForkName::Capella && method == ENGINE_GET_PAYLOAD_V1 { - return Err(format!("{} called after capella fork!", method)); + return Err(( + format!("{} called after Capella fork!", method), + FORK_REQUEST_MISMATCH_ERROR_CODE, + )); } // validate method called correctly according to eip4844 fork time if ctx @@ -201,7 +235,7 @@ pub async fn handle_rpc( == ForkName::Eip4844 && (method == ENGINE_GET_PAYLOAD_V1 || method == ENGINE_GET_PAYLOAD_V2) { - return Err(format!("{} called after eip4844 fork!", method)); + return Err((format!("{} called after eip4844 fork!", method), FORK_REQUEST_MISMATCH_ERROR_CODE)); } match method { @@ -212,14 +246,14 @@ pub async fn handle_rpc( JsonExecutionPayload::V1(execution_payload) => { serde_json::to_value(JsonGetPayloadResponseV1 { execution_payload, - block_value: 0.into(), + block_value: DEFAULT_MOCK_EL_PAYLOAD_VALUE_WEI.into(), }) .unwrap() } JsonExecutionPayload::V2(execution_payload) => { serde_json::to_value(JsonGetPayloadResponseV2 { execution_payload, - block_value: 0.into(), + block_value: DEFAULT_MOCK_EL_PAYLOAD_VALUE_WEI.into(), }) .unwrap() } @@ -229,61 +263,65 @@ pub async fn handle_rpc( JsonExecutionPayload::V1(execution_payload) => { serde_json::to_value(JsonGetPayloadResponseV1 { execution_payload, - block_value: 0.into(), + block_value: DEFAULT_MOCK_EL_PAYLOAD_VALUE_WEI.into() }) - .unwrap() + .unwrap() } JsonExecutionPayload::V2(execution_payload) => { serde_json::to_value(JsonGetPayloadResponseV2 { execution_payload, - block_value: 0.into(), + block_value: DEFAULT_MOCK_EL_PAYLOAD_VALUE_WEI.into() }) - .unwrap() + .unwrap() } JsonExecutionPayload::V3(execution_payload) => { serde_json::to_value(JsonGetPayloadResponseV3 { execution_payload, - block_value: 0.into(), + block_value: DEFAULT_MOCK_EL_PAYLOAD_VALUE_WEI.into() }) - .unwrap() + .unwrap() } }), _ => unreachable!(), } } ENGINE_FORKCHOICE_UPDATED_V1 | ENGINE_FORKCHOICE_UPDATED_V2 => { - let forkchoice_state: JsonForkchoiceStateV1 = get_param(params, 0)?; + let forkchoice_state: JsonForkchoiceStateV1 = + get_param(params, 0).map_err(|s| (s, BAD_PARAMS_ERROR_CODE))?; let payload_attributes = match method { ENGINE_FORKCHOICE_UPDATED_V1 => { - let jpa1: Option = get_param(params, 1)?; + let jpa1: Option = + get_param(params, 1).map_err(|s| (s, BAD_PARAMS_ERROR_CODE))?; jpa1.map(JsonPayloadAttributes::V1) } ENGINE_FORKCHOICE_UPDATED_V2 => { // we can't use `deny_unknown_fields` without breaking compatibility with some // clients that haven't updated to the latest engine_api spec. So instead we'll // need to deserialize based on timestamp - get_param::>(params, 1).and_then(|pa| { - pa.and_then(|pa| { - match ctx - .execution_block_generator - .read() - .get_fork_at_timestamp(*pa.timestamp()) - { - ForkName::Merge => { - get_param::>(params, 1) - .map(|opt| opt.map(JsonPayloadAttributes::V1)) - .transpose() + get_param::>(params, 1) + .and_then(|pa| { + pa.and_then(|pa| { + match ctx + .execution_block_generator + .read() + .get_fork_at_timestamp(*pa.timestamp()) + { + ForkName::Merge => { + get_param::>(params, 1) + .map(|opt| opt.map(JsonPayloadAttributes::V1)) + .transpose() + } + ForkName::Capella => { + get_param::>(params, 1) + .map(|opt| opt.map(JsonPayloadAttributes::V2)) + .transpose() + } + _ => unreachable!(), } - ForkName::Capella | ForkName::Eip4844 => { - get_param::>(params, 1) - .map(|opt| opt.map(JsonPayloadAttributes::V2)) - .transpose() - } - _ => unreachable!(), - } + }) + .transpose() }) - .transpose() - })? + .map_err(|s| (s, BAD_PARAMS_ERROR_CODE))? } _ => unreachable!(), }; @@ -297,20 +335,29 @@ pub async fn handle_rpc( { ForkName::Merge => { if matches!(pa, JsonPayloadAttributes::V2(_)) { - return Err(format!( - "{} called with `JsonPayloadAttributesV2` before capella fork!", - method + return Err(( + format!( + "{} called with `JsonPayloadAttributesV2` before Capella fork!", + method + ), + GENERIC_ERROR_CODE, )); } } ForkName::Capella | ForkName::Eip4844 => { if method == ENGINE_FORKCHOICE_UPDATED_V1 { - return Err(format!("{} called after capella fork!", method)); + return Err(( + format!("{} called after Capella fork!", method), + FORK_REQUEST_MISMATCH_ERROR_CODE, + )); } if matches!(pa, JsonPayloadAttributes::V1(_)) { - return Err(format!( - "{} called with `JsonPayloadAttributesV1` after capella fork!", - method + return Err(( + format!( + "{} called with `JsonPayloadAttributesV1` after Capella fork!", + method + ), + FORK_REQUEST_MISMATCH_ERROR_CODE, )); } } @@ -337,10 +384,14 @@ pub async fn handle_rpc( return Ok(serde_json::to_value(response).unwrap()); } - let mut response = ctx.execution_block_generator.write().forkchoice_updated( - forkchoice_state.into(), - payload_attributes.map(|json| json.into()), - )?; + let mut response = ctx + .execution_block_generator + .write() + .forkchoice_updated( + forkchoice_state.into(), + payload_attributes.map(|json| json.into()), + ) + .map_err(|s| (s, GENERIC_ERROR_CODE))?; if let Some(mut status) = ctx.static_forkchoice_updated_response.lock().clone() { if status.status == PayloadStatusV1Status::Valid { @@ -361,9 +412,13 @@ pub async fn handle_rpc( }; Ok(serde_json::to_value(transition_config).unwrap()) } - other => Err(format!( - "The method {} does not exist/is not available", - other + ENGINE_EXCHANGE_CAPABILITIES => { + let engine_capabilities = ctx.engine_capabilities.read(); + Ok(serde_json::to_value(engine_capabilities.to_response()).unwrap()) + } + other => Err(( + format!("The method {} does not exist/is not available", other), + METHOD_NOT_FOUND_CODE, )), } } diff --git a/beacon_node/execution_layer/src/test_utils/mock_builder.rs b/beacon_node/execution_layer/src/test_utils/mock_builder.rs index 8ce4a65564..40a0c41afa 100644 --- a/beacon_node/execution_layer/src/test_utils/mock_builder.rs +++ b/beacon_node/execution_layer/src/test_utils/mock_builder.rs @@ -1,4 +1,4 @@ -use crate::test_utils::DEFAULT_JWT_SECRET; +use crate::test_utils::{DEFAULT_BUILDER_PAYLOAD_VALUE_WEI, DEFAULT_JWT_SECRET}; use crate::{Config, ExecutionLayer, PayloadAttributes}; use async_trait::async_trait; use eth2::types::{BlockId, StateId, ValidatorId}; @@ -84,8 +84,7 @@ impl TestingBuilder { }; let el = - ExecutionLayer::from_config(config, executor.clone(), executor.log().clone(), &spec) - .unwrap(); + ExecutionLayer::from_config(config, executor.clone(), executor.log().clone()).unwrap(); // This should probably be done for all fields, we only update ones we are testing with so far. let mut context = Context::for_mainnet(); @@ -329,7 +328,7 @@ impl mev_build_rs::BlindedBlockProvider for MockBuilder { let mut message = BuilderBid { header, - value: ssz_rs::U256::default(), + value: to_ssz_rs(&Uint256::from(DEFAULT_BUILDER_PAYLOAD_VALUE_WEI))?, public_key: self.builder_sk.public_key(), }; diff --git a/beacon_node/execution_layer/src/test_utils/mock_execution_layer.rs b/beacon_node/execution_layer/src/test_utils/mock_execution_layer.rs index d061f13a6b..1a5d1fd198 100644 --- a/beacon_node/execution_layer/src/test_utils/mock_execution_layer.rs +++ b/beacon_node/execution_layer/src/test_utils/mock_execution_layer.rs @@ -29,6 +29,7 @@ impl MockExecutionLayer { DEFAULT_TERMINAL_BLOCK, None, None, + None, Some(JwtKey::from_slice(&DEFAULT_JWT_SECRET).unwrap()), spec, None, @@ -41,6 +42,7 @@ impl MockExecutionLayer { terminal_block: u64, shanghai_time: Option, eip4844_time: Option, + builder_threshold: Option, jwt_key: Option, spec: ChainSpec, builder_url: Option, @@ -69,12 +71,11 @@ impl MockExecutionLayer { builder_url, secret_files: vec![path], suggested_fee_recipient: Some(Address::repeat_byte(42)), - builder_profit_threshold: DEFAULT_BUILDER_THRESHOLD_WEI, + builder_profit_threshold: builder_threshold.unwrap_or(DEFAULT_BUILDER_THRESHOLD_WEI), ..Default::default() }; let el = - ExecutionLayer::from_config(config, executor.clone(), executor.log().clone(), &spec) - .unwrap(); + ExecutionLayer::from_config(config, executor.clone(), executor.log().clone()).unwrap(); Self { server, @@ -106,7 +107,7 @@ impl MockExecutionLayer { prev_randao, Address::repeat_byte(42), // FIXME: think about how to handle different forks / withdrawals here.. - Some(vec![]), + None, ); // Insert a proposer to ensure the fork choice updated command works. diff --git a/beacon_node/execution_layer/src/test_utils/mod.rs b/beacon_node/execution_layer/src/test_utils/mod.rs index bad02e3698..077d29575e 100644 --- a/beacon_node/execution_layer/src/test_utils/mod.rs +++ b/beacon_node/execution_layer/src/test_utils/mod.rs @@ -22,6 +22,7 @@ use tokio::{runtime, sync::oneshot}; use types::{EthSpec, ExecutionBlockHash, Uint256}; use warp::{http::StatusCode, Filter, Rejection}; +use crate::EngineCapabilities; pub use execution_block_generator::{generate_pow_block, Block, ExecutionBlockGenerator}; pub use hook::Hook; pub use mock_builder::{Context as MockBuilderContext, MockBuilder, Operation, TestingBuilder}; @@ -31,6 +32,17 @@ pub const DEFAULT_TERMINAL_DIFFICULTY: u64 = 6400; pub const DEFAULT_TERMINAL_BLOCK: u64 = 64; pub const DEFAULT_JWT_SECRET: [u8; 32] = [42; 32]; pub const DEFAULT_BUILDER_THRESHOLD_WEI: u128 = 1_000_000_000_000_000_000; +pub const DEFAULT_MOCK_EL_PAYLOAD_VALUE_WEI: u128 = 10_000_000_000_000_000; +pub const DEFAULT_BUILDER_PAYLOAD_VALUE_WEI: u128 = 20_000_000_000_000_000; +pub const DEFAULT_ENGINE_CAPABILITIES: EngineCapabilities = EngineCapabilities { + new_payload_v1: true, + new_payload_v2: true, + forkchoice_updated_v1: true, + forkchoice_updated_v2: true, + get_payload_v1: true, + get_payload_v2: true, + exchange_transition_configuration_v1: true, +}; mod execution_block_generator; mod handle_rpc; @@ -117,6 +129,7 @@ impl MockServer { hook: <_>::default(), new_payload_statuses: <_>::default(), fcu_payload_statuses: <_>::default(), + engine_capabilities: Arc::new(RwLock::new(DEFAULT_ENGINE_CAPABILITIES)), _phantom: PhantomData, }); @@ -147,6 +160,10 @@ impl MockServer { } } + pub fn set_engine_capabilities(&self, engine_capabilities: EngineCapabilities) { + *self.ctx.engine_capabilities.write() = engine_capabilities; + } + pub fn new( handle: &runtime::Handle, jwt_key: JwtKey, @@ -469,6 +486,7 @@ pub struct Context { pub new_payload_statuses: Arc>>, pub fcu_payload_statuses: Arc>>, + pub engine_capabilities: Arc>, pub _phantom: PhantomData, } @@ -620,11 +638,11 @@ pub fn serve( "jsonrpc": JSONRPC_VERSION, "result": result }), - Err(message) => json!({ + Err((message, code)) => json!({ "id": id, "jsonrpc": JSONRPC_VERSION, "error": { - "code": -1234, // Junk error code. + "code": code, "message": message } }), diff --git a/beacon_node/http_api/Cargo.toml b/beacon_node/http_api/Cargo.toml index 0dc918f425..5110a73ed7 100644 --- a/beacon_node/http_api/Cargo.toml +++ b/beacon_node/http_api/Cargo.toml @@ -36,6 +36,7 @@ tree_hash = "0.4.1" sysinfo = "0.26.5" system_health = { path = "../../common/system_health" } directory = { path = "../../common/directory" } +operation_pool = { path = "../operation_pool" } [dev-dependencies] store = { path = "../store" } diff --git a/beacon_node/http_api/src/lib.rs b/beacon_node/http_api/src/lib.rs index 45d7b10023..6f72bcf2cb 100644 --- a/beacon_node/http_api/src/lib.rs +++ b/beacon_node/http_api/src/lib.rs @@ -35,6 +35,7 @@ use eth2::types::{ use lighthouse_network::{types::SyncState, EnrExt, NetworkGlobals, PeerId, PubsubMessage}; use lighthouse_version::version_with_platform; use network::{NetworkMessage, NetworkSenders, ValidatorSubscriptionMessage}; +use operation_pool::ReceivedPreCapella; use parking_lot::RwLock; use serde::{Deserialize, Serialize}; use slog::{crit, debug, error, info, warn, Logger}; @@ -1690,8 +1691,12 @@ pub fn serve( .to_execution_address; // New to P2P *and* op pool, gossip immediately if post-Capella. - let publish = chain.current_slot_is_post_capella().unwrap_or(false); - if publish { + let received_pre_capella = if chain.current_slot_is_post_capella().unwrap_or(false) { + ReceivedPreCapella::No + } else { + ReceivedPreCapella::Yes + }; + if matches!(received_pre_capella, ReceivedPreCapella::No) { publish_pubsub_message( &network_tx, PubsubMessage::BlsToExecutionChange(Box::new( @@ -1702,14 +1707,14 @@ pub fn serve( // Import to op pool (may return `false` if there's a race). let imported = - chain.import_bls_to_execution_change(verified_address_change); + chain.import_bls_to_execution_change(verified_address_change, received_pre_capella); info!( log, "Processed BLS to execution change"; "validator_index" => validator_index, "address" => ?address, - "published" => publish, + "published" => matches!(received_pre_capella, ReceivedPreCapella::No), "imported" => imported, ); } @@ -3566,6 +3571,7 @@ pub fn serve( .or(post_beacon_pool_sync_committees.boxed()) .or(post_beacon_rewards_sync_committee.boxed()) .or(post_beacon_pool_bls_to_execution_changes.boxed()) + .or(post_beacon_rewards_sync_committee.boxed()) .or(post_validator_duties_attester.boxed()) .or(post_validator_duties_sync.boxed()) .or(post_validator_aggregate_and_proofs.boxed()) diff --git a/beacon_node/http_api/tests/fork_tests.rs b/beacon_node/http_api/tests/fork_tests.rs index e61470fe95..6144123565 100644 --- a/beacon_node/http_api/tests/fork_tests.rs +++ b/beacon_node/http_api/tests/fork_tests.rs @@ -6,6 +6,7 @@ use beacon_chain::{ }; use eth2::types::{IndexedErrorMessage, StateId, SyncSubcommittee}; use genesis::{bls_withdrawal_credentials, interop_genesis_state_with_withdrawal_credentials}; +use std::collections::HashSet; use types::{ test_utils::{generate_deterministic_keypair, generate_deterministic_keypairs}, Address, ChainSpec, Epoch, EthSpec, Hash256, MinimalEthSpec, Slot, @@ -438,6 +439,8 @@ async fn bls_to_execution_changes_update_all_around_capella_fork() { .await .unwrap(); + let expected_received_pre_capella_messages = valid_address_changes[..num_pre_capella].to_vec(); + // Conflicting changes for the same validators should all fail. let error = client .post_beacon_pool_bls_to_execution_changes(&conflicting_address_changes[..num_pre_capella]) @@ -464,6 +467,20 @@ async fn bls_to_execution_changes_update_all_around_capella_fork() { harness.extend_to_slot(capella_slot - 1).await; assert_eq!(harness.head_slot(), capella_slot - 1); + assert_eq!( + harness + .chain + .op_pool + .get_bls_to_execution_changes_received_pre_capella( + &harness.chain.head_snapshot().beacon_state, + &spec, + ) + .into_iter() + .collect::>(), + HashSet::from_iter(expected_received_pre_capella_messages.into_iter()), + "all pre-capella messages should be queued for capella broadcast" + ); + // Add Capella blocks which should be full of BLS to execution changes. for i in 0..validator_count / max_bls_to_execution_changes { let head_block_root = harness.extend_slots(1).await; diff --git a/beacon_node/http_api/tests/tests.rs b/beacon_node/http_api/tests/tests.rs index 86733cf63a..43099c7a91 100644 --- a/beacon_node/http_api/tests/tests.rs +++ b/beacon_node/http_api/tests/tests.rs @@ -11,9 +11,11 @@ use eth2::{ types::{BlockId as CoreBlockId, StateId as CoreStateId, *}, BeaconNodeHttpClient, Error, StatusCode, Timeouts, }; -use execution_layer::test_utils::Operation; use execution_layer::test_utils::TestingBuilder; use execution_layer::test_utils::DEFAULT_BUILDER_THRESHOLD_WEI; +use execution_layer::test_utils::{ + Operation, DEFAULT_BUILDER_PAYLOAD_VALUE_WEI, DEFAULT_MOCK_EL_PAYLOAD_VALUE_WEI, +}; use futures::stream::{Stream, StreamExt}; use futures::FutureExt; use http_api::{BlockId, StateId}; @@ -72,38 +74,53 @@ struct ApiTester { mock_builder: Option>>, } +struct ApiTesterConfig { + spec: ChainSpec, + builder_threshold: Option, +} + +impl Default for ApiTesterConfig { + fn default() -> Self { + let mut spec = E::default_spec(); + spec.shard_committee_period = 2; + Self { + spec, + builder_threshold: None, + } + } +} + impl ApiTester { pub async fn new() -> Self { // This allows for testing voluntary exits without building out a massive chain. - let mut spec = E::default_spec(); - spec.shard_committee_period = 2; - Self::new_from_spec(spec).await + Self::new_from_config(ApiTesterConfig::default()).await } pub async fn new_with_hard_forks(altair: bool, bellatrix: bool) -> Self { - let mut spec = E::default_spec(); - spec.shard_committee_period = 2; + let mut config = ApiTesterConfig::default(); // Set whether the chain has undergone each hard fork. if altair { - spec.altair_fork_epoch = Some(Epoch::new(0)); + config.spec.altair_fork_epoch = Some(Epoch::new(0)); } if bellatrix { - spec.bellatrix_fork_epoch = Some(Epoch::new(0)); + config.spec.bellatrix_fork_epoch = Some(Epoch::new(0)); } - Self::new_from_spec(spec).await + Self::new_from_config(config).await } - pub async fn new_from_spec(spec: ChainSpec) -> Self { + pub async fn new_from_config(config: ApiTesterConfig) -> Self { // Get a random unused port + let spec = config.spec; let port = unused_port::unused_tcp_port().unwrap(); let beacon_url = SensitiveUrl::parse(format!("http://127.0.0.1:{port}").as_str()).unwrap(); let harness = Arc::new( BeaconChainHarness::builder(MainnetEthSpec) .spec(spec.clone()) + .logger(logging::test_logger()) .deterministic_keypairs(VALIDATOR_COUNT) .fresh_ephemeral_store() - .mock_execution_layer_with_builder(beacon_url.clone()) + .mock_execution_layer_with_builder(beacon_url.clone(), config.builder_threshold) .build(), ); @@ -358,6 +375,28 @@ impl ApiTester { tester } + pub async fn new_mev_tester_no_builder_threshold() -> Self { + let mut config = ApiTesterConfig { + builder_threshold: Some(0), + spec: E::default_spec(), + }; + config.spec.altair_fork_epoch = Some(Epoch::new(0)); + config.spec.bellatrix_fork_epoch = Some(Epoch::new(0)); + let tester = Self::new_from_config(config) + .await + .test_post_validator_register_validator() + .await; + tester + .mock_builder + .as_ref() + .unwrap() + .builder + .add_operation(Operation::Value(Uint256::from( + DEFAULT_BUILDER_PAYLOAD_VALUE_WEI, + ))); + tester + } + fn skip_slots(self, count: u64) -> Self { for _ in 0..count { self.chain @@ -3278,6 +3317,117 @@ impl ApiTester { self } + pub async fn test_builder_payload_chosen_when_more_profitable(self) -> Self { + // Mutate value. + self.mock_builder + .as_ref() + .unwrap() + .builder + .add_operation(Operation::Value(Uint256::from( + DEFAULT_MOCK_EL_PAYLOAD_VALUE_WEI + 1, + ))); + + let slot = self.chain.slot().unwrap(); + let epoch = self.chain.epoch().unwrap(); + + let (_, randao_reveal) = self.get_test_randao(slot, epoch).await; + + let payload: BlindedPayload = self + .client + .get_validator_blinded_blocks::>(slot, &randao_reveal, None) + .await + .unwrap() + .data + .body() + .execution_payload() + .unwrap() + .into(); + + // The builder's payload should've been chosen, so this cache should not be populated + assert!(self + .chain + .execution_layer + .as_ref() + .unwrap() + .get_payload_by_root(&payload.tree_hash_root()) + .is_none()); + self + } + + pub async fn test_local_payload_chosen_when_equally_profitable(self) -> Self { + // Mutate value. + self.mock_builder + .as_ref() + .unwrap() + .builder + .add_operation(Operation::Value(Uint256::from( + DEFAULT_MOCK_EL_PAYLOAD_VALUE_WEI, + ))); + + let slot = self.chain.slot().unwrap(); + let epoch = self.chain.epoch().unwrap(); + + let (_, randao_reveal) = self.get_test_randao(slot, epoch).await; + + let payload: BlindedPayload = self + .client + .get_validator_blinded_blocks::>(slot, &randao_reveal, None) + .await + .unwrap() + .data + .body() + .execution_payload() + .unwrap() + .into(); + + // The local payload should've been chosen, so this cache should be populated + assert!(self + .chain + .execution_layer + .as_ref() + .unwrap() + .get_payload_by_root(&payload.tree_hash_root()) + .is_some()); + self + } + + pub async fn test_local_payload_chosen_when_more_profitable(self) -> Self { + // Mutate value. + self.mock_builder + .as_ref() + .unwrap() + .builder + .add_operation(Operation::Value(Uint256::from( + DEFAULT_MOCK_EL_PAYLOAD_VALUE_WEI - 1, + ))); + + let slot = self.chain.slot().unwrap(); + let epoch = self.chain.epoch().unwrap(); + + let (_, randao_reveal) = self.get_test_randao(slot, epoch).await; + + let payload: BlindedPayload = self + .client + .get_validator_blinded_blocks::>(slot, &randao_reveal, None) + .await + .unwrap() + .data + .body() + .execution_payload() + .unwrap() + .into(); + + // The local payload should've been chosen, so this cache should be populated + assert!(self + .chain + .execution_layer + .as_ref() + .unwrap() + .get_payload_by_root(&payload.tree_hash_root()) + .is_some()); + self + } + #[cfg(target_os = "linux")] pub async fn test_get_lighthouse_health(self) -> Self { self.client.get_lighthouse_health().await.unwrap(); @@ -3747,9 +3897,9 @@ async fn get_events() { #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn get_events_altair() { - let mut spec = E::default_spec(); - spec.altair_fork_epoch = Some(Epoch::new(0)); - ApiTester::new_from_spec(spec) + let mut config = ApiTesterConfig::default(); + config.spec.altair_fork_epoch = Some(Epoch::new(0)); + ApiTester::new_from_config(config) .await .test_get_events_altair() .await; @@ -4262,6 +4412,18 @@ async fn builder_inadequate_builder_threshold() { .await; } +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn builder_payload_chosen_by_profit() { + ApiTester::new_mev_tester_no_builder_threshold() + .await + .test_builder_payload_chosen_when_more_profitable() + .await + .test_local_payload_chosen_when_equally_profitable() + .await + .test_local_payload_chosen_when_more_profitable() + .await; +} + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn lighthouse_endpoints() { ApiTester::new() diff --git a/beacon_node/network/Cargo.toml b/beacon_node/network/Cargo.toml index b1d928eecb..5ce3311693 100644 --- a/beacon_node/network/Cargo.toml +++ b/beacon_node/network/Cargo.toml @@ -45,6 +45,7 @@ tokio-util = { version = "0.6.3", features = ["time"] } derivative = "2.2.0" delay_map = "0.1.1" ethereum-types = { version = "0.14.1", optional = true } +operation_pool = { path = "../operation_pool" } [features] deterministic_long_lived_attnets = [ "ethereum-types" ] diff --git a/beacon_node/network/src/beacon_processor/mod.rs b/beacon_node/network/src/beacon_processor/mod.rs index e92f03fb91..b08c127af3 100644 --- a/beacon_node/network/src/beacon_processor/mod.rs +++ b/beacon_node/network/src/beacon_processor/mod.rs @@ -802,6 +802,21 @@ impl std::convert::From> for WorkEvent { seen_timestamp, }, }, + ReadyWork::LightClientUpdate(QueuedLightClientUpdate { + peer_id, + message_id, + light_client_optimistic_update, + seen_timestamp, + .. + }) => Self { + drop_during_sync: true, + work: Work::UnknownLightClientOptimisticUpdate { + message_id, + peer_id, + light_client_optimistic_update, + seen_timestamp, + }, + }, } } } @@ -986,6 +1001,7 @@ impl Work { Work::UnknownBlockAggregate { .. } => UNKNOWN_BLOCK_AGGREGATE, Work::UnknownLightClientOptimisticUpdate { .. } => UNKNOWN_LIGHT_CLIENT_UPDATE, Work::GossipBlsToExecutionChange { .. } => GOSSIP_BLS_TO_EXECUTION_CHANGE, + Work::UnknownLightClientOptimisticUpdate { .. } => UNKNOWN_LIGHT_CLIENT_UPDATE, } } } @@ -1522,6 +1538,9 @@ impl BeaconProcessor { Work::BlobsByRootsRequest { .. } => { blbroots_queue.push(work, work_id, &self.log) } + Work::UnknownLightClientOptimisticUpdate { .. } => { + unknown_light_client_update_queue.push(work, work_id, &self.log) + } } } } diff --git a/beacon_node/network/src/beacon_processor/work_reprocessing_queue.rs b/beacon_node/network/src/beacon_processor/work_reprocessing_queue.rs index 4d0bdc0027..debfaa478c 100644 --- a/beacon_node/network/src/beacon_processor/work_reprocessing_queue.rs +++ b/beacon_node/network/src/beacon_processor/work_reprocessing_queue.rs @@ -31,7 +31,8 @@ use tokio::sync::mpsc::{self, Receiver, Sender}; use tokio::time::error::Error as TimeError; use tokio_util::time::delay_queue::{DelayQueue, Key as DelayKey}; use types::{ - Attestation, EthSpec, Hash256, LightClientOptimisticUpdate, SignedAggregateAndProof, SubnetId, + Attestation, EthSpec, Hash256, LightClientOptimisticUpdate, SignedAggregateAndProof, + SignedBeaconBlock, SubnetId, }; const TASK_NAME: &str = "beacon_processor_reprocess_queue"; diff --git a/beacon_node/network/src/beacon_processor/worker/gossip_methods.rs b/beacon_node/network/src/beacon_processor/worker/gossip_methods.rs index 7970a32b45..5e84cbb5f2 100644 --- a/beacon_node/network/src/beacon_processor/worker/gossip_methods.rs +++ b/beacon_node/network/src/beacon_processor/worker/gossip_methods.rs @@ -13,6 +13,7 @@ use beacon_chain::{ GossipVerifiedBlock, NotifyExecutionLayer, }; use lighthouse_network::{Client, MessageAcceptance, MessageId, PeerAction, PeerId, ReportSource}; +use operation_pool::ReceivedPreCapella; use slog::{crit, debug, error, info, trace, warn}; use slot_clock::SlotClock; use ssz::Encode; @@ -1262,7 +1263,12 @@ impl Worker { self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Accept); - self.chain.import_bls_to_execution_change(change); + // Address change messages from gossip are only processed *after* the + // Capella fork epoch. + let received_pre_capella = ReceivedPreCapella::No; + + self.chain + .import_bls_to_execution_change(change, received_pre_capella); debug!( self.log, diff --git a/beacon_node/operation_pool/Cargo.toml b/beacon_node/operation_pool/Cargo.toml index 8483233589..cc4eacde89 100644 --- a/beacon_node/operation_pool/Cargo.toml +++ b/beacon_node/operation_pool/Cargo.toml @@ -19,6 +19,7 @@ serde = "1.0.116" serde_derive = "1.0.116" store = { path = "../store" } bitvec = "1" +rand = "0.8.5" [dev-dependencies] beacon_chain = { path = "../beacon_chain" } diff --git a/beacon_node/operation_pool/src/bls_to_execution_changes.rs b/beacon_node/operation_pool/src/bls_to_execution_changes.rs index 84513d466e..c73666e145 100644 --- a/beacon_node/operation_pool/src/bls_to_execution_changes.rs +++ b/beacon_node/operation_pool/src/bls_to_execution_changes.rs @@ -1,11 +1,20 @@ use state_processing::SigVerifiedOp; -use std::collections::{hash_map::Entry, HashMap}; +use std::collections::{hash_map::Entry, HashMap, HashSet}; use std::sync::Arc; use types::{ AbstractExecPayload, BeaconState, ChainSpec, EthSpec, SignedBeaconBlock, SignedBlsToExecutionChange, }; +/// Indicates if a `BlsToExecutionChange` was received before or after the +/// Capella fork. This is used to know which messages we should broadcast at the +/// Capella fork epoch. +#[derive(Copy, Clone)] +pub enum ReceivedPreCapella { + Yes, + No, +} + /// Pool of BLS to execution changes that maintains a LIFO queue and an index by validator. /// /// Using the LIFO queue for block production disincentivises spam on P2P at the Capella fork, @@ -16,6 +25,9 @@ pub struct BlsToExecutionChanges { by_validator_index: HashMap>>, /// Last-in-first-out (LIFO) queue of verified messages. queue: Vec>>, + /// Contains a set of validator indices which need to have their changes + /// broadcast at the capella epoch. + received_pre_capella_indices: HashSet, } impl BlsToExecutionChanges { @@ -31,16 +43,18 @@ impl BlsToExecutionChanges { pub fn insert( &mut self, verified_change: SigVerifiedOp, + received_pre_capella: ReceivedPreCapella, ) -> bool { + let validator_index = verified_change.as_inner().message.validator_index; // Wrap in an `Arc` once on insert. let verified_change = Arc::new(verified_change); - match self - .by_validator_index - .entry(verified_change.as_inner().message.validator_index) - { + match self.by_validator_index.entry(validator_index) { Entry::Vacant(entry) => { self.queue.push(verified_change.clone()); entry.insert(verified_change); + if matches!(received_pre_capella, ReceivedPreCapella::Yes) { + self.received_pre_capella_indices.insert(validator_index); + } true } Entry::Occupied(_) => false, @@ -61,6 +75,24 @@ impl BlsToExecutionChanges { self.queue.iter().rev() } + /// Returns only those which are flagged for broadcasting at the Capella + /// fork. Uses FIFO ordering, although we expect this list to be shuffled by + /// the caller. + pub fn iter_received_pre_capella( + &self, + ) -> impl Iterator>> { + self.queue.iter().filter(|address_change| { + self.received_pre_capella_indices + .contains(&address_change.as_inner().message.validator_index) + }) + } + + /// Returns the set of indicies which should have their address changes + /// broadcast at the Capella fork. + pub fn iter_pre_capella_indices(&self) -> impl Iterator { + self.received_pre_capella_indices.iter() + } + /// Prune BLS to execution changes that have been applied to the state more than 1 block ago. /// /// The block check is necessary to avoid pruning too eagerly and losing the ability to include @@ -102,4 +134,14 @@ impl BlsToExecutionChanges { self.by_validator_index.remove(&validator_index); } } + + /// Removes `broadcasted` validators from the set of validators that should + /// have their BLS changes broadcast at the Capella fork boundary. + pub fn register_indices_broadcasted_at_capella(&mut self, broadcasted: &HashSet) { + self.received_pre_capella_indices = self + .received_pre_capella_indices + .difference(broadcasted) + .copied() + .collect(); + } } diff --git a/beacon_node/operation_pool/src/lib.rs b/beacon_node/operation_pool/src/lib.rs index 4643addad5..d401deb896 100644 --- a/beacon_node/operation_pool/src/lib.rs +++ b/beacon_node/operation_pool/src/lib.rs @@ -9,12 +9,13 @@ mod persistence; mod reward_cache; mod sync_aggregate_id; +pub use crate::bls_to_execution_changes::ReceivedPreCapella; pub use attestation::AttMaxCover; pub use attestation_storage::{AttestationRef, SplitAttestation}; pub use max_cover::MaxCover; pub use persistence::{ PersistedOperationPool, PersistedOperationPoolV12, PersistedOperationPoolV14, - PersistedOperationPoolV5, + PersistedOperationPoolV15, PersistedOperationPoolV5, }; pub use reward_cache::RewardCache; @@ -24,6 +25,8 @@ use crate::sync_aggregate_id::SyncAggregateId; use attester_slashing::AttesterSlashingMaxCover; use max_cover::maximum_cover; use parking_lot::{RwLock, RwLockWriteGuard}; +use rand::seq::SliceRandom; +use rand::thread_rng; use state_processing::per_block_processing::errors::AttestationValidationError; use state_processing::per_block_processing::{ get_slashable_indices_modular, verify_exit, VerifySignatures, @@ -533,10 +536,11 @@ impl OperationPool { pub fn insert_bls_to_execution_change( &self, verified_change: SigVerifiedOp, + received_pre_capella: ReceivedPreCapella, ) -> bool { self.bls_to_execution_changes .write() - .insert(verified_change) + .insert(verified_change, received_pre_capella) } /// Get a list of execution changes for inclusion in a block. @@ -562,6 +566,42 @@ impl OperationPool { ) } + /// Get a list of execution changes to be broadcast at the Capella fork. + /// + /// The list that is returned will be shuffled to help provide a fair + /// broadcast of messages. + pub fn get_bls_to_execution_changes_received_pre_capella( + &self, + state: &BeaconState, + spec: &ChainSpec, + ) -> Vec { + let mut changes = filter_limit_operations( + self.bls_to_execution_changes + .read() + .iter_received_pre_capella(), + |address_change| { + address_change.signature_is_still_valid(&state.fork()) + && state + .get_validator(address_change.as_inner().message.validator_index as usize) + .map_or(false, |validator| { + !validator.has_eth1_withdrawal_credential(spec) + }) + }, + |address_change| address_change.as_inner().clone(), + usize::max_value(), + ); + changes.shuffle(&mut thread_rng()); + changes + } + + /// Removes `broadcasted` validators from the set of validators that should + /// have their BLS changes broadcast at the Capella fork boundary. + pub fn register_indices_broadcasted_at_capella(&self, broadcasted: &HashSet) { + self.bls_to_execution_changes + .write() + .register_indices_broadcasted_at_capella(broadcasted); + } + /// Prune BLS to execution changes that have been applied to the state more than 1 block ago. pub fn prune_bls_to_execution_changes>( &self, diff --git a/beacon_node/operation_pool/src/persistence.rs b/beacon_node/operation_pool/src/persistence.rs index 4948040ae1..65354e01ac 100644 --- a/beacon_node/operation_pool/src/persistence.rs +++ b/beacon_node/operation_pool/src/persistence.rs @@ -1,6 +1,6 @@ use crate::attestation_id::AttestationId; use crate::attestation_storage::AttestationMap; -use crate::bls_to_execution_changes::BlsToExecutionChanges; +use crate::bls_to_execution_changes::{BlsToExecutionChanges, ReceivedPreCapella}; use crate::sync_aggregate_id::SyncAggregateId; use crate::OpPoolError; use crate::OperationPool; @@ -9,6 +9,8 @@ use parking_lot::RwLock; use ssz::{Decode, Encode}; use ssz_derive::{Decode, Encode}; use state_processing::SigVerifiedOp; +use std::collections::HashSet; +use std::mem; use store::{DBColumn, Error as StoreError, StoreItem}; use types::*; @@ -19,7 +21,7 @@ type PersistedSyncContributions = Vec<(SyncAggregateId, Vec { #[superstruct(only(V5))] pub attestations_v5: Vec<(AttestationId, Vec>)>, /// Attestations and their attesting indices. - #[superstruct(only(V12, V14))] + #[superstruct(only(V12, V14, V15))] pub attestations: Vec<(Attestation, Vec)>, /// Mapping from sync contribution ID to sync contributions and aggregate. pub sync_contributions: PersistedSyncContributions, @@ -41,23 +43,27 @@ pub struct PersistedOperationPool { #[superstruct(only(V5))] pub attester_slashings_v5: Vec<(AttesterSlashing, ForkVersion)>, /// Attester slashings. - #[superstruct(only(V12, V14))] + #[superstruct(only(V12, V14, V15))] pub attester_slashings: Vec, T>>, /// [DEPRECATED] Proposer slashings. #[superstruct(only(V5))] pub proposer_slashings_v5: Vec, /// Proposer slashings with fork information. - #[superstruct(only(V12, V14))] + #[superstruct(only(V12, V14, V15))] pub proposer_slashings: Vec>, /// [DEPRECATED] Voluntary exits. #[superstruct(only(V5))] pub voluntary_exits_v5: Vec, /// Voluntary exits with fork information. - #[superstruct(only(V12, V14))] + #[superstruct(only(V12, V14, V15))] pub voluntary_exits: Vec>, /// BLS to Execution Changes - #[superstruct(only(V14))] + #[superstruct(only(V14, V15))] pub bls_to_execution_changes: Vec>, + /// Validator indices with BLS to Execution Changes to be broadcast at the + /// Capella fork. + #[superstruct(only(V15))] + pub capella_bls_change_broadcast_indices: Vec, } impl PersistedOperationPool { @@ -110,18 +116,26 @@ impl PersistedOperationPool { .map(|bls_to_execution_change| (**bls_to_execution_change).clone()) .collect(); - PersistedOperationPool::V14(PersistedOperationPoolV14 { + let capella_bls_change_broadcast_indices = operation_pool + .bls_to_execution_changes + .read() + .iter_pre_capella_indices() + .copied() + .collect(); + + PersistedOperationPool::V15(PersistedOperationPoolV15 { attestations, sync_contributions, attester_slashings, proposer_slashings, voluntary_exits, bls_to_execution_changes, + capella_bls_change_broadcast_indices, }) } /// Reconstruct an `OperationPool`. - pub fn into_operation_pool(self) -> Result, OpPoolError> { + pub fn into_operation_pool(mut self) -> Result, OpPoolError> { let attester_slashings = RwLock::new(self.attester_slashings()?.iter().cloned().collect()); let proposer_slashings = RwLock::new( self.proposer_slashings()? @@ -142,33 +156,43 @@ impl PersistedOperationPool { PersistedOperationPool::V5(_) | PersistedOperationPool::V12(_) => { return Err(OpPoolError::IncorrectOpPoolVariant) } - PersistedOperationPool::V14(ref pool) => { + PersistedOperationPool::V14(_) | PersistedOperationPool::V15(_) => { let mut map = AttestationMap::default(); - for (att, attesting_indices) in pool.attestations.clone() { + for (att, attesting_indices) in self.attestations()?.clone() { map.insert(att, attesting_indices); } RwLock::new(map) } }; - let bls_to_execution_changes = match self { - PersistedOperationPool::V5(_) | PersistedOperationPool::V12(_) => { - return Err(OpPoolError::IncorrectOpPoolVariant) + let mut bls_to_execution_changes = BlsToExecutionChanges::default(); + if let Ok(persisted_changes) = self.bls_to_execution_changes_mut() { + let persisted_changes = mem::take(persisted_changes); + + let broadcast_indices = + if let Ok(indices) = self.capella_bls_change_broadcast_indices_mut() { + mem::take(indices).into_iter().collect() + } else { + HashSet::new() + }; + + for bls_to_execution_change in persisted_changes { + let received_pre_capella = if broadcast_indices + .contains(&bls_to_execution_change.as_inner().message.validator_index) + { + ReceivedPreCapella::Yes + } else { + ReceivedPreCapella::No + }; + bls_to_execution_changes.insert(bls_to_execution_change, received_pre_capella); } - PersistedOperationPool::V14(pool) => { - let mut bls_to_execution_changes = BlsToExecutionChanges::default(); - for bls_to_execution_change in pool.bls_to_execution_changes { - bls_to_execution_changes.insert(bls_to_execution_change); - } - RwLock::new(bls_to_execution_changes) - } - }; + } let op_pool = OperationPool { attestations, sync_contributions, attester_slashings, proposer_slashings, voluntary_exits, - bls_to_execution_changes, + bls_to_execution_changes: RwLock::new(bls_to_execution_changes), reward_cache: Default::default(), _phantom: Default::default(), }; @@ -204,6 +228,20 @@ impl StoreItem for PersistedOperationPoolV12 { } } +impl StoreItem for PersistedOperationPoolV14 { + fn db_column() -> DBColumn { + DBColumn::OpPool + } + + fn as_store_bytes(&self) -> Vec { + self.as_ssz_bytes() + } + + fn from_store_bytes(bytes: &[u8]) -> Result { + PersistedOperationPoolV14::from_ssz_bytes(bytes).map_err(Into::into) + } +} + /// Deserialization for `PersistedOperationPool` defaults to `PersistedOperationPool::V12`. impl StoreItem for PersistedOperationPool { fn db_column() -> DBColumn { @@ -216,8 +254,8 @@ impl StoreItem for PersistedOperationPool { fn from_store_bytes(bytes: &[u8]) -> Result { // Default deserialization to the latest variant. - PersistedOperationPoolV14::from_ssz_bytes(bytes) - .map(Self::V14) + PersistedOperationPoolV15::from_ssz_bytes(bytes) + .map(Self::V15) .map_err(Into::into) } } diff --git a/beacon_node/store/src/metadata.rs b/beacon_node/store/src/metadata.rs index bd3f480cf6..15bcaf1bb0 100644 --- a/beacon_node/store/src/metadata.rs +++ b/beacon_node/store/src/metadata.rs @@ -4,7 +4,7 @@ use ssz::{Decode, Encode}; use ssz_derive::{Decode, Encode}; use types::{Checkpoint, Hash256, Slot}; -pub const CURRENT_SCHEMA_VERSION: SchemaVersion = SchemaVersion(14); +pub const CURRENT_SCHEMA_VERSION: SchemaVersion = SchemaVersion(15); // All the keys that get stored under the `BeaconMeta` column. // diff --git a/book/src/checkpoint-sync.md b/book/src/checkpoint-sync.md index 893c545cb9..47dc03b20c 100644 --- a/book/src/checkpoint-sync.md +++ b/book/src/checkpoint-sync.md @@ -97,7 +97,7 @@ You can opt-in to reconstructing all of the historic states by providing the The database keeps track of three markers to determine the availability of historic blocks and states: -* `oldest_block_slot`: All blocks with slots less than or equal to this value are available in the +* `oldest_block_slot`: All blocks with slots greater than or equal to this value are available in the database. Additionally, the genesis block is always available. * `state_lower_limit`: All states with slots _less than or equal to_ this value are available in the database. The minimum value is 0, indicating that the genesis state is always available. diff --git a/common/compare_fields/src/lib.rs b/common/compare_fields/src/lib.rs index a0166eb500..bc2f5446ad 100644 --- a/common/compare_fields/src/lib.rs +++ b/common/compare_fields/src/lib.rs @@ -115,11 +115,7 @@ impl Comparison { let mut children = vec![]; for i in 0..std::cmp::max(a.len(), b.len()) { - children.push(FieldComparison::new( - format!("{:}", i), - &a.get(i), - &b.get(i), - )); + children.push(FieldComparison::new(format!("{i}"), &a.get(i), &b.get(i))); } Self::parent(field_name, a == b, children) @@ -164,8 +160,8 @@ impl FieldComparison { Self { field_name, equal: a == b, - a: format!("{:?}", a), - b: format!("{:?}", b), + a: format!("{a:?}"), + b: format!("{b:?}"), } } diff --git a/common/compare_fields_derive/src/lib.rs b/common/compare_fields_derive/src/lib.rs index beabc6ca9b..752c09ee05 100644 --- a/common/compare_fields_derive/src/lib.rs +++ b/common/compare_fields_derive/src/lib.rs @@ -32,7 +32,7 @@ pub fn compare_fields_derive(input: TokenStream) -> TokenStream { _ => panic!("compare_fields_derive only supports named struct fields."), }; - let field_name = format!("{:}", ident_a); + let field_name = ident_a.to_string(); let ident_b = ident_a.clone(); let quote = if is_slice(field) { diff --git a/consensus/serde_utils/src/lib.rs b/consensus/serde_utils/src/lib.rs index 75fd6009b7..92b5966c9a 100644 --- a/consensus/serde_utils/src/lib.rs +++ b/consensus/serde_utils/src/lib.rs @@ -7,7 +7,6 @@ pub mod json_str; pub mod list_of_bytes_lists; pub mod quoted_u64_vec; pub mod u256_hex_be; -pub mod u256_hex_be_opt; pub mod u32_hex; pub mod u64_hex_be; pub mod u8_hex; diff --git a/consensus/serde_utils/src/u256_hex_be_opt.rs b/consensus/serde_utils/src/u256_hex_be_opt.rs index e53cb5c695..e69de29bb2 100644 --- a/consensus/serde_utils/src/u256_hex_be_opt.rs +++ b/consensus/serde_utils/src/u256_hex_be_opt.rs @@ -1,188 +0,0 @@ -use ethereum_types::U256; - -use serde::de::{Error, Visitor}; -use serde::{de, Deserializer, Serialize, Serializer}; -use std::fmt; -use std::str::FromStr; - -pub fn serialize(num: &Option, serializer: S) -> Result -where - S: Serializer, -{ - num.serialize(serializer) -} - -pub struct U256Visitor; - -impl<'de> Visitor<'de> for U256Visitor { - type Value = Option; - - fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result { - formatter.write_str("a well formatted hex string") - } - - fn visit_some(self, deserializer: D) -> Result - where - D: Deserializer<'de>, - { - deserializer.deserialize_string(U256Visitor) - } - - fn visit_none(self) -> Result - where - E: Error, - { - Ok(None) - } - - fn visit_str(self, value: &str) -> Result - where - E: de::Error, - { - if !value.starts_with("0x") { - return Err(de::Error::custom("must start with 0x")); - } - let stripped = &value[2..]; - if stripped.is_empty() { - Err(de::Error::custom(format!( - "quantity cannot be {:?}", - stripped - ))) - } else if stripped == "0" { - Ok(Some(value.to_string())) - } else if stripped.starts_with('0') { - Err(de::Error::custom("cannot have leading zero")) - } else { - Ok(Some(value.to_string())) - } - } -} - -pub fn deserialize<'de, D>(deserializer: D) -> Result, D::Error> -where - D: Deserializer<'de>, -{ - let decoded = deserializer.deserialize_option(U256Visitor)?; - - decoded - .map(|decoded| { - U256::from_str(&decoded) - .map_err(|e| de::Error::custom(format!("Invalid U256 string: {}", e))) - }) - .transpose() -} - -#[cfg(test)] -mod test { - use ethereum_types::U256; - use serde::{Deserialize, Serialize}; - use serde_json; - - #[derive(Debug, PartialEq, Serialize, Deserialize)] - #[serde(transparent)] - struct Wrapper { - #[serde(with = "super")] - val: Option, - } - - #[test] - fn encoding() { - assert_eq!( - &serde_json::to_string(&Wrapper { - val: Some(0.into()) - }) - .unwrap(), - "\"0x0\"" - ); - assert_eq!( - &serde_json::to_string(&Wrapper { - val: Some(1.into()) - }) - .unwrap(), - "\"0x1\"" - ); - assert_eq!( - &serde_json::to_string(&Wrapper { - val: Some(256.into()) - }) - .unwrap(), - "\"0x100\"" - ); - assert_eq!( - &serde_json::to_string(&Wrapper { - val: Some(65.into()) - }) - .unwrap(), - "\"0x41\"" - ); - assert_eq!( - &serde_json::to_string(&Wrapper { - val: Some(1024.into()) - }) - .unwrap(), - "\"0x400\"" - ); - assert_eq!( - &serde_json::to_string(&Wrapper { - val: Some(U256::max_value() - 1) - }) - .unwrap(), - "\"0xfffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffe\"" - ); - assert_eq!( - &serde_json::to_string(&Wrapper { - val: Some(U256::max_value()) - }) - .unwrap(), - "\"0xffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff\"" - ); - } - - #[test] - fn decoding() { - assert_eq!( - serde_json::from_str::("\"0x0\"").unwrap(), - Wrapper { - val: Some(0.into()) - }, - ); - assert_eq!( - serde_json::from_str::("\"0x41\"").unwrap(), - Wrapper { - val: Some(65.into()) - }, - ); - assert_eq!( - serde_json::from_str::("\"0x400\"").unwrap(), - Wrapper { - val: Some(1024.into()) - }, - ); - assert_eq!( - serde_json::from_str::( - "\"0xfffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffe\"" - ) - .unwrap(), - Wrapper { - val: Some(U256::max_value() - 1) - }, - ); - assert_eq!( - serde_json::from_str::( - "\"0xffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff\"" - ) - .unwrap(), - Wrapper { - val: Some(U256::max_value()) - }, - ); - assert_eq!( - serde_json::from_str::("null").unwrap(), - Wrapper { val: None }, - ); - serde_json::from_str::("\"0x\"").unwrap_err(); - serde_json::from_str::("\"0x0400\"").unwrap_err(); - serde_json::from_str::("\"400\"").unwrap_err(); - serde_json::from_str::("\"ff\"").unwrap_err(); - } -} diff --git a/consensus/state_processing/src/per_block_processing.rs b/consensus/state_processing/src/per_block_processing.rs index c61662090e..b2d7e00072 100644 --- a/consensus/state_processing/src/per_block_processing.rs +++ b/consensus/state_processing/src/per_block_processing.rs @@ -343,10 +343,10 @@ pub fn get_new_eth1_data( /// Contains a partial set of checks from the `process_execution_payload` function: /// /// https://github.com/ethereum/consensus-specs/blob/v1.1.5/specs/merge/beacon-chain.md#process_execution_payload -pub fn partially_verify_execution_payload<'payload, T: EthSpec, Payload: AbstractExecPayload>( +pub fn partially_verify_execution_payload>( state: &BeaconState, block_slot: Slot, - payload: Payload::Ref<'payload>, + payload: Payload::Ref<'_>, spec: &ChainSpec, ) -> Result<(), BlockProcessingError> { if is_merge_transition_complete(state) { @@ -385,9 +385,9 @@ pub fn partially_verify_execution_payload<'payload, T: EthSpec, Payload: Abstrac /// Partially equivalent to the `process_execution_payload` function: /// /// https://github.com/ethereum/consensus-specs/blob/v1.1.5/specs/merge/beacon-chain.md#process_execution_payload -pub fn process_execution_payload<'payload, T: EthSpec, Payload: AbstractExecPayload>( +pub fn process_execution_payload>( state: &mut BeaconState, - payload: Payload::Ref<'payload>, + payload: Payload::Ref<'_>, spec: &ChainSpec, ) -> Result<(), BlockProcessingError> { partially_verify_execution_payload::(state, state.slot(), payload, spec)?; @@ -516,9 +516,9 @@ pub fn get_expected_withdrawals( } /// Apply withdrawals to the state. -pub fn process_withdrawals<'payload, T: EthSpec, Payload: AbstractExecPayload>( +pub fn process_withdrawals>( state: &mut BeaconState, - payload: Payload::Ref<'payload>, + payload: Payload::Ref<'_>, spec: &ChainSpec, ) -> Result<(), BlockProcessingError> { match state { diff --git a/consensus/state_processing/src/per_block_processing/process_operations.rs b/consensus/state_processing/src/per_block_processing/process_operations.rs index 48c524fd42..8a6163f29b 100644 --- a/consensus/state_processing/src/per_block_processing/process_operations.rs +++ b/consensus/state_processing/src/per_block_processing/process_operations.rs @@ -9,9 +9,9 @@ use crate::VerifySignatures; use safe_arith::SafeArith; use types::consts::altair::{PARTICIPATION_FLAG_WEIGHTS, PROPOSER_WEIGHT, WEIGHT_DENOMINATOR}; -pub fn process_operations<'a, T: EthSpec, Payload: AbstractExecPayload>( +pub fn process_operations>( state: &mut BeaconState, - block_body: BeaconBlockBodyRef<'a, T, Payload>, + block_body: BeaconBlockBodyRef, verify_signatures: VerifySignatures, ctxt: &mut ConsensusContext, spec: &ChainSpec, @@ -237,9 +237,9 @@ pub fn process_attester_slashings( } /// Wrapper function to handle calling the correct version of `process_attestations` based on /// the fork. -pub fn process_attestations<'a, T: EthSpec, Payload: AbstractExecPayload>( +pub fn process_attestations>( state: &mut BeaconState, - block_body: BeaconBlockBodyRef<'a, T, Payload>, + block_body: BeaconBlockBodyRef, verify_signatures: VerifySignatures, ctxt: &mut ConsensusContext, spec: &ChainSpec, diff --git a/consensus/types/src/bls_to_execution_change.rs b/consensus/types/src/bls_to_execution_change.rs index cb73e43f9a..b279515bd1 100644 --- a/consensus/types/src/bls_to_execution_change.rs +++ b/consensus/types/src/bls_to_execution_change.rs @@ -10,6 +10,7 @@ use tree_hash_derive::TreeHash; arbitrary::Arbitrary, Debug, PartialEq, + Eq, Hash, Clone, Serialize, diff --git a/consensus/types/src/execution_payload.rs b/consensus/types/src/execution_payload.rs index a57d411412..1721960f8b 100644 --- a/consensus/types/src/execution_payload.rs +++ b/consensus/types/src/execution_payload.rs @@ -87,6 +87,16 @@ pub struct ExecutionPayload { pub withdrawals: Withdrawals, } +impl<'a, T: EthSpec> ExecutionPayloadRef<'a, T> { + // this emulates clone on a normal reference type + pub fn clone_from_ref(&self) -> ExecutionPayload { + map_execution_payload_ref!(&'a _, self, move |payload, cons| { + cons(payload); + payload.clone().into() + }) + } +} + impl ExecutionPayload { pub fn from_ssz_bytes(bytes: &[u8], fork_name: ForkName) -> Result { match fork_name { diff --git a/consensus/types/src/signed_bls_to_execution_change.rs b/consensus/types/src/signed_bls_to_execution_change.rs index 92b79fad3f..2b17095ae7 100644 --- a/consensus/types/src/signed_bls_to_execution_change.rs +++ b/consensus/types/src/signed_bls_to_execution_change.rs @@ -10,6 +10,7 @@ use tree_hash_derive::TreeHash; arbitrary::Arbitrary, Debug, PartialEq, + Eq, Hash, Clone, Serialize, diff --git a/testing/execution_engine_integration/src/test_rig.rs b/testing/execution_engine_integration/src/test_rig.rs index 2daacb0add..fe7e51e923 100644 --- a/testing/execution_engine_integration/src/test_rig.rs +++ b/testing/execution_engine_integration/src/test_rig.rs @@ -127,7 +127,7 @@ impl TestRig { ..Default::default() }; let execution_layer = - ExecutionLayer::from_config(config, executor.clone(), log.clone(), &spec).unwrap(); + ExecutionLayer::from_config(config, executor.clone(), log.clone()).unwrap(); ExecutionPair { execution_engine, execution_layer, @@ -146,7 +146,7 @@ impl TestRig { ..Default::default() }; let execution_layer = - ExecutionLayer::from_config(config, executor, log.clone(), &spec).unwrap(); + ExecutionLayer::from_config(config, executor, log.clone()).unwrap(); ExecutionPair { execution_engine, execution_layer,