diff --git a/beacon_node/beacon_chain/src/test_utils.rs b/beacon_node/beacon_chain/src/test_utils.rs index 443cc686eb..56f578b21b 100644 --- a/beacon_node/beacon_chain/src/test_utils.rs +++ b/beacon_node/beacon_chain/src/test_utils.rs @@ -664,7 +664,7 @@ pub struct BeaconChainHarness { pub runtime: TestRuntime, pub mock_execution_layer: Option>, - pub mock_builder: Option>>, + pub mock_builder: Option>>, pub rng: Mutex, } @@ -722,6 +722,7 @@ where mock_el_url, beacon_url, self.spec.clone(), + self.chain.slot_clock.clone(), self.runtime.task_executor.clone(), ); diff --git a/beacon_node/execution_layer/src/test_utils/mock_builder.rs b/beacon_node/execution_layer/src/test_utils/mock_builder.rs index 3540909fe4..037a737164 100644 --- a/beacon_node/execution_layer/src/test_utils/mock_builder.rs +++ b/beacon_node/execution_layer/src/test_utils/mock_builder.rs @@ -10,6 +10,7 @@ use fork_choice::ForkchoiceUpdateParameters; use parking_lot::RwLock; use sensitive_url::SensitiveUrl; use slog::{debug, error, info, warn, Logger}; +use slot_clock::SlotClock; use std::collections::HashMap; use std::fmt::Debug; use std::future::Future; @@ -27,7 +28,7 @@ use types::builder_bid::{ use types::{ Address, BeaconState, ChainSpec, Epoch, EthSpec, ExecPayload, ExecutionPayload, ExecutionPayloadHeaderRefMut, ExecutionRequests, ForkName, ForkVersionedResponse, Hash256, - PublicKeyBytes, Signature, SignedBlindedBeaconBlock, SignedRoot, + ProposerPreparationData, PublicKeyBytes, Signature, SignedBlindedBeaconBlock, SignedRoot, SignedValidatorRegistrationData, Slot, Uint256, }; use types::{ExecutionBlockHash, SecretKey}; @@ -282,7 +283,7 @@ pub struct PayloadParametersCloned { } #[derive(Clone)] -pub struct MockBuilder { +pub struct MockBuilder { el: ExecutionLayer, beacon_client: BeaconNodeHttpClient, spec: Arc, @@ -290,7 +291,6 @@ pub struct MockBuilder { builder_sk: SecretKey, operations: Arc>>, invalidate_signatures: Arc>, - genesis_time: Option, /// Only returns bids for registered validators if set to true. `true` by default. validate_pubkey: bool, /// Do not apply any operations if set to `false`. @@ -303,14 +303,17 @@ pub struct MockBuilder { max_bid: bool, /// A cache that stores the proposers index for a given epoch proposers_cache: Arc>>>, + pubkey_cache: Arc>>, + slot_clock: S, log: Logger, } -impl MockBuilder { +impl MockBuilder { pub fn new_for_testing( mock_el_url: SensitiveUrl, beacon_url: SensitiveUrl, spec: Arc, + slot_clock: S, executor: TaskExecutor, ) -> (Self, (SocketAddr, impl Future)) { let file = NamedTempFile::new().unwrap(); @@ -336,6 +339,7 @@ impl MockBuilder { false, spec, None, + slot_clock, executor.log().clone(), ); let host: Ipv4Addr = Ipv4Addr::LOCALHOST; @@ -353,6 +357,7 @@ impl MockBuilder { max_bid: bool, spec: Arc, sk: Option<&[u8]>, + slot_clock: S, log: Logger, ) -> Self { let builder_sk = if let Some(sk_bytes) = sk { @@ -381,9 +386,10 @@ impl MockBuilder { invalidate_signatures: Arc::new(RwLock::new(false)), payload_id_cache: Arc::new(RwLock::new(HashMap::new())), proposers_cache: Arc::new(RwLock::new(HashMap::new())), + pubkey_cache: Arc::new(RwLock::new(HashMap::new())), apply_operations, max_bid, - genesis_time: None, + slot_clock, log, } } @@ -424,6 +430,13 @@ impl MockBuilder { "Registering validators"; "count" => registrations.len(), ); + + let mut preparation_data = Vec::with_capacity(registrations.len()); + let current_epoch = self + .slot_clock + .now() + .ok_or("Failed to get current epoch".to_string())? + .epoch(E::slots_per_epoch()); for registration in registrations { if !registration.verify_signature(&self.spec) { error!( @@ -432,12 +445,77 @@ impl MockBuilder { "error" => "invalid signature", "validator" => %registration.message.pubkey ); - return Err("invalid signature".to_string()); + continue; } + let pubkey = registration.message.pubkey.clone(); + + // First try to get the validator index from cache + let validator_index = { + let pubkey_cache = self.pubkey_cache.write(); + pubkey_cache.get(&pubkey).copied() + }; + + // Get or fetch the validator index + let validator_index = if let Some(validator_index) = validator_index { + validator_index + } else { + // Do the async fetch without holding any locks + let validator_index = self + .beacon_client + .get_beacon_states_validator_id( + StateId::Head, + &ValidatorId::PublicKey(pubkey.clone()), + ) + .await + .map_err(|e| format!("Failed to get validator index: {:?}", e))? + .ok_or("Beacon node returned 404".to_string())? + .data + .index; + + // Update cache after the fetch + // Note: Doing this to avoid locking between await points + { + let mut pubkey_cache = self.pubkey_cache.write(); + pubkey_cache.insert(pubkey.clone(), validator_index); + } + + validator_index + }; + let prep_data = ( + ProposerPreparationData { + validator_index, + fee_recipient: registration.message.fee_recipient, + }, + Some(registration.message.gas_limit), + ); + info!( + self.log, + "Proposer prep data for {} {:?}", pubkey, prep_data, + ); + preparation_data.push(( + ProposerPreparationData { + validator_index, + fee_recipient: registration.message.fee_recipient, + }, + Some(registration.message.gas_limit), + )); self.val_registration_cache .write() .insert(registration.message.pubkey, registration); } + info!( + self.log, + "Updating proposer preparation for {} validators", + preparation_data.len(), + ); + self.el + .update_proposer_preparation( + current_epoch, + preparation_data + .iter() + .map(|(data, gas_limit)| (data, gas_limit)), + ) + .await; Ok(()) } @@ -815,16 +893,7 @@ impl MockBuilder { }; let slots_since_genesis = slot.as_u64() - self.spec.genesis_slot.as_u64(); - let genesis_time = if let Some(genesis_time) = self.genesis_time { - genesis_time - } else { - self.beacon_client - .get_beacon_genesis() - .await - .map_err(|_| "couldn't get beacon genesis".to_string())? - .data - .genesis_time - }; + let genesis_time = self.slot_clock.genesis_duration().as_secs(); let timestamp = (slots_since_genesis * self.spec.seconds_per_slot) + genesis_time; let head_state: BeaconState = self @@ -926,10 +995,10 @@ impl MockBuilder { /// the requests. /// /// We should eventually move this to axum when we move everything else. -pub fn serve( +pub fn serve( listen_addr: Ipv4Addr, listen_port: u16, - builder: MockBuilder, + builder: MockBuilder, ) -> Result<(SocketAddr, impl Future), crate::test_utils::Error> { let inner_ctx = builder.clone(); let ctx_filter = warp::any().map(move || inner_ctx.clone()); @@ -938,57 +1007,57 @@ pub fn serve( .and(warp::path("v1")) .and(warp::path("builder")); - let validators = prefix - .and(warp::path("validators")) - .and(warp::body::json()) - .and(warp::path::end()) - .and(ctx_filter.clone()) - .and_then( - |registrations: Vec, - builder: MockBuilder| async move { - builder - .register_validators(registrations) - .await - .map_err(|e| warp::reject::custom(Custom(e)))?; - Ok::<_, Rejection>(warp::reply()) - }, - ) - .boxed(); - - let blinded_block = + let validators = prefix - .and(warp::path("blinded_blocks")) + .and(warp::path("validators")) .and(warp::body::json()) - .and(warp::header::header::(CONSENSUS_VERSION_HEADER)) .and(warp::path::end()) .and(ctx_filter.clone()) .and_then( - |block: SignedBlindedBeaconBlock, - fork_name: ForkName, - builder: MockBuilder| async move { - let payload = builder - .submit_blinded_block(block) + |registrations: Vec, + builder: MockBuilder| async move { + builder + .register_validators(registrations) .await .map_err(|e| warp::reject::custom(Custom(e)))?; - let resp: ForkVersionedResponse<_> = ForkVersionedResponse { - version: Some(fork_name), - metadata: Default::default(), - data: payload, - }; - - let json_payload = serde_json::to_string(&resp) - .map_err(|_| reject("coudn't serialize response"))?; - Ok::<_, warp::reject::Rejection>( - warp::http::Response::builder() - .status(200) - .body( - serde_json::to_string(&json_payload) - .map_err(|_| reject("invalid JSON"))?, - ) - .unwrap(), - ) + Ok::<_, Rejection>(warp::reply()) }, - ); + ) + .boxed(); + + let blinded_block = prefix + .and(warp::path("blinded_blocks")) + .and(warp::body::json()) + .and(warp::header::header::(CONSENSUS_VERSION_HEADER)) + .and(warp::path::end()) + .and(ctx_filter.clone()) + .and_then( + |block: SignedBlindedBeaconBlock, + fork_name: ForkName, + builder: MockBuilder| async move { + let payload = builder + .submit_blinded_block(block) + .await + .map_err(|e| warp::reject::custom(Custom(e)))?; + let resp: ForkVersionedResponse<_> = ForkVersionedResponse { + version: Some(fork_name), + metadata: Default::default(), + data: payload, + }; + + let json_payload = serde_json::to_string(&resp) + .map_err(|_| reject("coudn't serialize response"))?; + Ok::<_, warp::reject::Rejection>( + warp::http::Response::builder() + .status(200) + .body( + serde_json::to_string(&json_payload) + .map_err(|_| reject("invalid JSON"))?, + ) + .unwrap(), + ) + }, + ); let status = prefix .and(warp::path("status")) @@ -1011,7 +1080,7 @@ pub fn serve( |slot: Slot, parent_hash: ExecutionBlockHash, pubkey: PublicKeyBytes, - builder: MockBuilder| async move { + builder: MockBuilder| async move { let fork_name = builder.fork_name_at_slot(slot); let signed_bid = builder .get_header(slot, parent_hash, pubkey)