diff --git a/beacon_node/beacon_chain/src/validator_pubkey_cache.rs b/beacon_node/beacon_chain/src/validator_pubkey_cache.rs
index 39d2c2c2d7..a346a649f0 100644
--- a/beacon_node/beacon_chain/src/validator_pubkey_cache.rs
+++ b/beacon_node/beacon_chain/src/validator_pubkey_cache.rs
@@ -1,12 +1,14 @@
 use crate::errors::BeaconChainError;
 use crate::{BeaconChainTypes, BeaconStore};
 use bls::PUBLIC_KEY_UNCOMPRESSED_BYTES_LEN;
+use rayon::prelude::*;
 use smallvec::SmallVec;
 use ssz::{Decode, Encode};
 use ssz_derive::{Decode, Encode};
 use std::collections::HashMap;
 use std::marker::PhantomData;
 use store::{DBColumn, Error as StoreError, StoreItem, StoreOp};
+use tracing::instrument;
 use types::{BeaconState, FixedBytesExtended, Hash256, PublicKey, PublicKeyBytes};
 
 /// Provides a mapping of `validator_index -> validator_publickey`.
@@ -28,6 +30,7 @@ impl<T: BeaconChainTypes> ValidatorPubkeyCache<T> {
     /// Create a new public key cache using the keys in `state.validators`.
     ///
     /// The new cache will be updated with the keys from `state` and immediately written to disk.
+    #[instrument(name = "validator_pubkey_cache_new", skip_all)]
     pub fn new(
         state: &BeaconState<T::EthSpec>,
         store: BeaconStore<T>,
@@ -46,6 +49,7 @@ impl<T: BeaconChainTypes> ValidatorPubkeyCache<T> {
     }
 
     /// Load the pubkey cache from the given on-disk database.
+    #[instrument(name = "validator_pubkey_cache_load_from_store", skip_all)]
     pub fn load_from_store(store: BeaconStore<T>) -> Result<Self, BeaconChainError> {
         let mut pubkeys = vec![];
         let mut indices = HashMap::new();
@@ -77,6 +81,7 @@ impl<T: BeaconChainTypes> ValidatorPubkeyCache<T> {
     /// Does not delete any keys from `self` if they don't appear in `state`.
     ///
     /// NOTE: The caller *must* commit the returned I/O batch as part of the block import process.
+    #[instrument(skip_all)]
     pub fn import_new_pubkeys(
         &mut self,
         state: &BeaconState<T::EthSpec>,
@@ -106,29 +111,58 @@ impl<T: BeaconChainTypes> ValidatorPubkeyCache<T> {
         self.indices.reserve(validator_keys.len());
 
         let mut store_ops = Vec::with_capacity(validator_keys.len());
-        for pubkey_bytes in validator_keys {
-            let i = self.pubkeys.len();
-
-            if self.indices.contains_key(&pubkey_bytes) {
-                return Err(BeaconChainError::DuplicateValidatorPublicKey);
+        let is_initial_import = self.pubkeys.is_empty();
+
+        // Helper to insert a decompressed key
+        let mut insert_key =
+            |pubkey_bytes: PublicKeyBytes, pubkey: PublicKey| -> Result<(), BeaconChainError> {
+                let i = self.pubkeys.len();
+
+                if self.indices.contains_key(&pubkey_bytes) {
+                    return Err(BeaconChainError::DuplicateValidatorPublicKey);
+                }
+
+                // Stage the new validator key for writing to disk.
+                // It will be committed atomically when the block that introduced it is written to disk.
+                // Notably it is NOT written while the write lock on the cache is held.
+                // See: https://github.com/sigp/lighthouse/issues/2327
+                store_ops.push(StoreOp::KeyValueOp(
+                    DatabasePubkey::from_pubkey(&pubkey)
+                        .as_kv_store_op(DatabasePubkey::key_for_index(i)),
+                ));
+
+                self.pubkeys.push(pubkey);
+                self.pubkey_bytes.push(pubkey_bytes);
+                self.indices.insert(pubkey_bytes, i);
+                Ok(())
+            };
+
+        if is_initial_import {
+            // On first startup, decompress keys in parallel for better performance
+            let validator_keys_vec: Vec<PublicKeyBytes> = validator_keys.collect();
+
+            let decompressed: Vec<(PublicKeyBytes, PublicKey)> = validator_keys_vec
+                .into_par_iter()
+                .map(|pubkey_bytes| {
+                    let pubkey = (&pubkey_bytes)
+                        .try_into()
+                        .map_err(BeaconChainError::InvalidValidatorPubkeyBytes)?;
+                    Ok((pubkey_bytes, pubkey))
+                })
+                .collect::<Result<Vec<_>, BeaconChainError>>()?;
+
+            for (pubkey_bytes, pubkey) in decompressed {
+                insert_key(pubkey_bytes, pubkey)?;
+            }
+        } else {
+            // Sequential path for incremental updates
+            for pubkey_bytes in validator_keys {
+                let pubkey = (&pubkey_bytes)
+                    .try_into()
+                    .map_err(BeaconChainError::InvalidValidatorPubkeyBytes)?;
+                insert_key(pubkey_bytes, pubkey)?;
             }
-
-            let pubkey = (&pubkey_bytes)
-                .try_into()
-                .map_err(BeaconChainError::InvalidValidatorPubkeyBytes)?;
-
-            // Stage the new validator key for writing to disk.
-            // It will be committed atomically when the block that introduced it is written to disk.
-            // Notably it is NOT written while the write lock on the cache is held.
-            // See: https://github.com/sigp/lighthouse/issues/2327
-            store_ops.push(StoreOp::KeyValueOp(
-                DatabasePubkey::from_pubkey(&pubkey)
-                    .as_kv_store_op(DatabasePubkey::key_for_index(i)),
-            ));
-
-            self.pubkeys.push(pubkey);
-            self.pubkey_bytes.push(pubkey_bytes);
-            self.indices.insert(pubkey_bytes, i);
         }
 
         Ok(store_ops)
@@ -324,4 +358,39 @@ mod test {
         let cache = ValidatorPubkeyCache::load_from_store(store).expect("should open cache");
         check_cache_get(&cache, &keypairs[..]);
     }
+
+    #[test]
+    fn parallel_import_maintains_order() {
+        // Test that parallel decompression on first startup maintains correct order and indices
+        let (state, keypairs) = get_state(100);
+        let store = get_store();
+
+        // Create cache from empty state (triggers parallel path)
+        let cache: ValidatorPubkeyCache<T> =
+            ValidatorPubkeyCache::new(&state, store).expect("should create cache");
+
+        check_cache_get(&cache, &keypairs[..]);
+    }
+
+    #[test]
+    fn incremental_import_maintains_order() {
+        // Test that incremental imports maintain correct order (triggers sequential path)
+        let store = get_store();
+
+        // Start with 50 validators
+        let (state1, keypairs1) = get_state(50);
+        let mut cache =
+            ValidatorPubkeyCache::new(&state1, store.clone()).expect("should create cache");
+        check_cache_get(&cache, &keypairs1[..]);
+
+        // Add 50 more validators
+        let (state2, keypairs2) = get_state(100);
+        let ops = cache
+            .import_new_pubkeys(&state2)
+            .expect("should import pubkeys");
+        store.do_atomically_with_block_and_blobs_cache(ops).unwrap();
+
+        // Verify all 100 validators are correctly indexed
+        check_cache_get(&cache, &keypairs2[..]);
+    }
 }
diff --git a/beacon_node/client/src/builder.rs b/beacon_node/client/src/builder.rs
index bac61fc735..c48021e45d 100644
--- a/beacon_node/client/src/builder.rs
+++ b/beacon_node/client/src/builder.rs
@@ -42,7 +42,7 @@ use std::time::Duration;
 use std::time::{SystemTime, UNIX_EPOCH};
 use store::database::interface::BeaconNodeBackend;
 use timer::spawn_timer;
-use tracing::{debug, info, warn};
+use tracing::{debug, info, instrument, warn};
 use types::data_column_custody_group::compute_ordered_custody_column_indices;
 use types::{
     BeaconState, BlobSidecarList, ChainSpec, EthSpec, ExecutionBlockHash, Hash256,
@@ -151,6 +151,7 @@ where
 
     /// Initializes the `BeaconChainBuilder`. The `build_beacon_chain` method will need to be
     /// called later in order to actually instantiate the `BeaconChain`.
+    #[instrument(skip_all)]
     pub async fn beacon_chain_builder(
         mut self,
         client_genesis: ClientGenesis,
@@ -613,6 +614,7 @@ where
     ///
     /// If type inference errors are being raised, see the comment on the definition of `Self`.
     #[allow(clippy::type_complexity)]
+    #[instrument(name = "build_client", skip_all)]
     pub fn build(
         mut self,
     ) -> Result<Client<Witness<TSlotClock, TEth1Backend, E, THotStore, TColdStore>>, String> {
@@ -813,6 +815,7 @@ where
     TColdStore: ItemStore<E> + 'static,
 {
     /// Consumes the internal `BeaconChainBuilder`, attaching the resulting `BeaconChain` to self.
+    #[instrument(skip_all)]
     pub fn build_beacon_chain(mut self) -> Result<Self, String> {
         let context = self
             .runtime_context