From 7cee5d60906b624e139b0f74ff84951a3fb20d2f Mon Sep 17 00:00:00 2001 From: Jimmy Chen Date: Fri, 28 Nov 2025 15:30:49 +1100 Subject: [PATCH] Optimise pubkey cache initialisation during beacon node startup (#8451) Instrument beacon node startup and parallelise pubkey cache initialisation. I instrumented beacon node startup and noticed that pubkey cache takes a long time to initialise, mostly due to decompressing all the validator pubkeys. This PR uses rayon to parallelize the decompression on initial checkpoint sync. The pubkeys are stored uncompressed, so the decopression time is not a problem on subsequent restarts. On restarts, we still deserialize pubkeys, but the timing is quite minimal on Sepolia so I didn't investigate further. `validator_pubkey_cache_new` timing on Sepolia: * before: 109.64ms * with parallelization: 21ms on Hoodi: * before: times out with Kurtosis after 120s * with parallelization: 12.77s to import keys **UPDATE**: downloading checkpoint state + genesis state takes about 2 minutes on my laptop, so it seems like the BN managed to start the http server just before timing out (after the optimisation). image Co-Authored-By: Jimmy Chen --- .../src/validator_pubkey_cache.rs | 111 ++++++++++++++---- beacon_node/client/src/builder.rs | 5 +- 2 files changed, 94 insertions(+), 22 deletions(-) diff --git a/beacon_node/beacon_chain/src/validator_pubkey_cache.rs b/beacon_node/beacon_chain/src/validator_pubkey_cache.rs index 39d2c2c2d7..a346a649f0 100644 --- a/beacon_node/beacon_chain/src/validator_pubkey_cache.rs +++ b/beacon_node/beacon_chain/src/validator_pubkey_cache.rs @@ -1,12 +1,14 @@ use crate::errors::BeaconChainError; use crate::{BeaconChainTypes, BeaconStore}; use bls::PUBLIC_KEY_UNCOMPRESSED_BYTES_LEN; +use rayon::prelude::*; use smallvec::SmallVec; use ssz::{Decode, Encode}; use ssz_derive::{Decode, Encode}; use std::collections::HashMap; use std::marker::PhantomData; use store::{DBColumn, Error as StoreError, StoreItem, StoreOp}; +use tracing::instrument; use types::{BeaconState, FixedBytesExtended, Hash256, PublicKey, PublicKeyBytes}; /// Provides a mapping of `validator_index -> validator_publickey`. @@ -28,6 +30,7 @@ impl ValidatorPubkeyCache { /// Create a new public key cache using the keys in `state.validators`. /// /// The new cache will be updated with the keys from `state` and immediately written to disk. + #[instrument(name = "validator_pubkey_cache_new", skip_all)] pub fn new( state: &BeaconState, store: BeaconStore, @@ -46,6 +49,7 @@ impl ValidatorPubkeyCache { } /// Load the pubkey cache from the given on-disk database. + #[instrument(name = "validator_pubkey_cache_load_from_store", skip_all)] pub fn load_from_store(store: BeaconStore) -> Result { let mut pubkeys = vec![]; let mut indices = HashMap::new(); @@ -77,6 +81,7 @@ impl ValidatorPubkeyCache { /// Does not delete any keys from `self` if they don't appear in `state`. /// /// NOTE: The caller *must* commit the returned I/O batch as part of the block import process. + #[instrument(skip_all)] pub fn import_new_pubkeys( &mut self, state: &BeaconState, @@ -106,29 +111,58 @@ impl ValidatorPubkeyCache { self.indices.reserve(validator_keys.len()); let mut store_ops = Vec::with_capacity(validator_keys.len()); - for pubkey_bytes in validator_keys { - let i = self.pubkeys.len(); - if self.indices.contains_key(&pubkey_bytes) { - return Err(BeaconChainError::DuplicateValidatorPublicKey); + let is_initial_import = self.pubkeys.is_empty(); + + // Helper to insert a decompressed key + let mut insert_key = + |pubkey_bytes: PublicKeyBytes, pubkey: PublicKey| -> Result<(), BeaconChainError> { + let i = self.pubkeys.len(); + + if self.indices.contains_key(&pubkey_bytes) { + return Err(BeaconChainError::DuplicateValidatorPublicKey); + } + + // Stage the new validator key for writing to disk. + // It will be committed atomically when the block that introduced it is written to disk. + // Notably it is NOT written while the write lock on the cache is held. + // See: https://github.com/sigp/lighthouse/issues/2327 + store_ops.push(StoreOp::KeyValueOp( + DatabasePubkey::from_pubkey(&pubkey) + .as_kv_store_op(DatabasePubkey::key_for_index(i)), + )); + + self.pubkeys.push(pubkey); + self.pubkey_bytes.push(pubkey_bytes); + self.indices.insert(pubkey_bytes, i); + Ok(()) + }; + + if is_initial_import { + // On first startup, decompress keys in parallel for better performance + let validator_keys_vec: Vec = validator_keys.collect(); + + let decompressed: Vec<(PublicKeyBytes, PublicKey)> = validator_keys_vec + .into_par_iter() + .map(|pubkey_bytes| { + let pubkey = (&pubkey_bytes) + .try_into() + .map_err(BeaconChainError::InvalidValidatorPubkeyBytes)?; + Ok((pubkey_bytes, pubkey)) + }) + .collect::, BeaconChainError>>()?; + + for (pubkey_bytes, pubkey) in decompressed { + insert_key(pubkey_bytes, pubkey)?; + } + } else { + // Sequential path for incremental updates + for pubkey_bytes in validator_keys { + let pubkey = (&pubkey_bytes) + .try_into() + .map_err(BeaconChainError::InvalidValidatorPubkeyBytes)?; + insert_key(pubkey_bytes, pubkey)?; } - - let pubkey = (&pubkey_bytes) - .try_into() - .map_err(BeaconChainError::InvalidValidatorPubkeyBytes)?; - - // Stage the new validator key for writing to disk. - // It will be committed atomically when the block that introduced it is written to disk. - // Notably it is NOT written while the write lock on the cache is held. - // See: https://github.com/sigp/lighthouse/issues/2327 - store_ops.push(StoreOp::KeyValueOp( - DatabasePubkey::from_pubkey(&pubkey) - .as_kv_store_op(DatabasePubkey::key_for_index(i)), - )); - - self.pubkeys.push(pubkey); - self.pubkey_bytes.push(pubkey_bytes); - self.indices.insert(pubkey_bytes, i); } Ok(store_ops) @@ -324,4 +358,39 @@ mod test { let cache = ValidatorPubkeyCache::load_from_store(store).expect("should open cache"); check_cache_get(&cache, &keypairs[..]); } + + #[test] + fn parallel_import_maintains_order() { + // Test that parallel decompression on first startup maintains correct order and indices + let (state, keypairs) = get_state(100); + let store = get_store(); + + // Create cache from empty state (triggers parallel path) + let cache: ValidatorPubkeyCache = + ValidatorPubkeyCache::new(&state, store).expect("should create cache"); + + check_cache_get(&cache, &keypairs[..]); + } + + #[test] + fn incremental_import_maintains_order() { + // Test that incremental imports maintain correct order (triggers sequential path) + let store = get_store(); + + // Start with 50 validators + let (state1, keypairs1) = get_state(50); + let mut cache = + ValidatorPubkeyCache::new(&state1, store.clone()).expect("should create cache"); + check_cache_get(&cache, &keypairs1[..]); + + // Add 50 more validators + let (state2, keypairs2) = get_state(100); + let ops = cache + .import_new_pubkeys(&state2) + .expect("should import pubkeys"); + store.do_atomically_with_block_and_blobs_cache(ops).unwrap(); + + // Verify all 100 validators are correctly indexed + check_cache_get(&cache, &keypairs2[..]); + } } diff --git a/beacon_node/client/src/builder.rs b/beacon_node/client/src/builder.rs index bac61fc735..c48021e45d 100644 --- a/beacon_node/client/src/builder.rs +++ b/beacon_node/client/src/builder.rs @@ -42,7 +42,7 @@ use std::time::Duration; use std::time::{SystemTime, UNIX_EPOCH}; use store::database::interface::BeaconNodeBackend; use timer::spawn_timer; -use tracing::{debug, info, warn}; +use tracing::{debug, info, instrument, warn}; use types::data_column_custody_group::compute_ordered_custody_column_indices; use types::{ BeaconState, BlobSidecarList, ChainSpec, EthSpec, ExecutionBlockHash, Hash256, @@ -151,6 +151,7 @@ where /// Initializes the `BeaconChainBuilder`. The `build_beacon_chain` method will need to be /// called later in order to actually instantiate the `BeaconChain`. + #[instrument(skip_all)] pub async fn beacon_chain_builder( mut self, client_genesis: ClientGenesis, @@ -613,6 +614,7 @@ where /// /// If type inference errors are being raised, see the comment on the definition of `Self`. #[allow(clippy::type_complexity)] + #[instrument(name = "build_client", skip_all)] pub fn build( mut self, ) -> Result>, String> { @@ -813,6 +815,7 @@ where TColdStore: ItemStore + 'static, { /// Consumes the internal `BeaconChainBuilder`, attaching the resulting `BeaconChain` to self. + #[instrument(skip_all)] pub fn build_beacon_chain(mut self) -> Result { let context = self .runtime_context