Optimise pubkey cache initialisation during beacon node startup (#8451)
Instrument beacon node startup and parallelise pubkey cache initialisation.

I instrumented beacon node startup and noticed that the pubkey cache takes a long time to initialise, mostly due to decompressing all the validator pubkeys. This PR uses rayon to parallelise the decompression on initial checkpoint sync. The pubkeys are stored uncompressed, so decompression time is not a problem on subsequent restarts. On restarts we still deserialise pubkeys, but the timing is quite minimal on Sepolia so I didn't investigate further.

`validator_pubkey_cache_new` timing on Sepolia:
* before: 109.64ms
* with parallelisation: 21ms

on Hoodi:
* before: times out with Kurtosis after 120s
* with parallelisation: 12.77s to import keys

**UPDATE**: downloading the checkpoint state + genesis state takes about 2 minutes on my laptop, so it seems like the BN managed to start the HTTP server just before timing out (after the optimisation).

![image](https://github.com/user-attachments/assets/4c548c14-57dd-4b47-af9a-115b15791940)

Co-Authored-By: Jimmy Chen <jchen.tc@gmail.com>
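The parallel path boils down to a rayon fan-out over the compressed keys. Below is a minimal, self-contained sketch of that pattern; `CompressedKey`, `DecompressedKey`, and `decompress` are hypothetical stand-ins for `PublicKeyBytes`, `PublicKey`, and the BLS conversion, not the actual Lighthouse types.

```rust
use rayon::prelude::*;

// Hypothetical stand-ins for `PublicKeyBytes` / `PublicKey`.
struct CompressedKey([u8; 48]);
struct DecompressedKey;

// Stand-in for the expensive BLS point decompression.
fn decompress(_bytes: &CompressedKey) -> Result<DecompressedKey, String> {
    Ok(DecompressedKey)
}

/// Decompress all keys on rayon's thread pool. Collecting into `Result`
/// preserves the input order of the `Ok` values and bails out with an error
/// if any element fails to decompress.
fn decompress_all(
    keys: Vec<CompressedKey>,
) -> Result<Vec<(CompressedKey, DecompressedKey)>, String> {
    keys.into_par_iter()
        .map(|k| decompress(&k).map(|d| (k, d)))
        .collect()
}
```

In the diff below, only the CPU-heavy decompression is fanned out like this; the insertion into the cache (index assignment, duplicate check, staging the store op) still runs afterwards on a single thread.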
@@ -1,12 +1,14 @@
use crate::errors::BeaconChainError;
use crate::{BeaconChainTypes, BeaconStore};
use bls::PUBLIC_KEY_UNCOMPRESSED_BYTES_LEN;
use rayon::prelude::*;
use smallvec::SmallVec;
use ssz::{Decode, Encode};
use ssz_derive::{Decode, Encode};
use std::collections::HashMap;
use std::marker::PhantomData;
use store::{DBColumn, Error as StoreError, StoreItem, StoreOp};
use tracing::instrument;
use types::{BeaconState, FixedBytesExtended, Hash256, PublicKey, PublicKeyBytes};

/// Provides a mapping of `validator_index -> validator_publickey`.
@@ -28,6 +30,7 @@ impl<T: BeaconChainTypes> ValidatorPubkeyCache<T> {
    /// Create a new public key cache using the keys in `state.validators`.
    ///
    /// The new cache will be updated with the keys from `state` and immediately written to disk.
    #[instrument(name = "validator_pubkey_cache_new", skip_all)]
    pub fn new(
        state: &BeaconState<T::EthSpec>,
        store: BeaconStore<T>,
@@ -46,6 +49,7 @@ impl<T: BeaconChainTypes> ValidatorPubkeyCache<T> {
    }

    /// Load the pubkey cache from the given on-disk database.
    #[instrument(name = "validator_pubkey_cache_load_from_store", skip_all)]
    pub fn load_from_store(store: BeaconStore<T>) -> Result<Self, BeaconChainError> {
        let mut pubkeys = vec![];
        let mut indices = HashMap::new();
@@ -77,6 +81,7 @@ impl<T: BeaconChainTypes> ValidatorPubkeyCache<T> {
    /// Does not delete any keys from `self` if they don't appear in `state`.
    ///
    /// NOTE: The caller *must* commit the returned I/O batch as part of the block import process.
    #[instrument(skip_all)]
    pub fn import_new_pubkeys(
        &mut self,
        state: &BeaconState<T::EthSpec>,
@@ -106,29 +111,58 @@ impl<T: BeaconChainTypes> ValidatorPubkeyCache<T> {
        self.indices.reserve(validator_keys.len());

        let mut store_ops = Vec::with_capacity(validator_keys.len());
        for pubkey_bytes in validator_keys {
            let i = self.pubkeys.len();

            if self.indices.contains_key(&pubkey_bytes) {
                return Err(BeaconChainError::DuplicateValidatorPublicKey);
        let is_initial_import = self.pubkeys.is_empty();

        // Helper to insert a decompressed key
        let mut insert_key =
            |pubkey_bytes: PublicKeyBytes, pubkey: PublicKey| -> Result<(), BeaconChainError> {
                let i = self.pubkeys.len();

                if self.indices.contains_key(&pubkey_bytes) {
                    return Err(BeaconChainError::DuplicateValidatorPublicKey);
                }

                // Stage the new validator key for writing to disk.
                // It will be committed atomically when the block that introduced it is written to disk.
                // Notably it is NOT written while the write lock on the cache is held.
                // See: https://github.com/sigp/lighthouse/issues/2327
                store_ops.push(StoreOp::KeyValueOp(
                    DatabasePubkey::from_pubkey(&pubkey)
                        .as_kv_store_op(DatabasePubkey::key_for_index(i)),
                ));

                self.pubkeys.push(pubkey);
                self.pubkey_bytes.push(pubkey_bytes);
                self.indices.insert(pubkey_bytes, i);
                Ok(())
            };

        if is_initial_import {
            // On first startup, decompress keys in parallel for better performance
            let validator_keys_vec: Vec<PublicKeyBytes> = validator_keys.collect();

            let decompressed: Vec<(PublicKeyBytes, PublicKey)> = validator_keys_vec
                .into_par_iter()
                .map(|pubkey_bytes| {
                    let pubkey = (&pubkey_bytes)
                        .try_into()
                        .map_err(BeaconChainError::InvalidValidatorPubkeyBytes)?;
                    Ok((pubkey_bytes, pubkey))
                })
                .collect::<Result<Vec<_>, BeaconChainError>>()?;

            for (pubkey_bytes, pubkey) in decompressed {
                insert_key(pubkey_bytes, pubkey)?;
            }
        } else {
            // Sequential path for incremental updates
            for pubkey_bytes in validator_keys {
                let pubkey = (&pubkey_bytes)
                    .try_into()
                    .map_err(BeaconChainError::InvalidValidatorPubkeyBytes)?;
                insert_key(pubkey_bytes, pubkey)?;
            }

            let pubkey = (&pubkey_bytes)
                .try_into()
                .map_err(BeaconChainError::InvalidValidatorPubkeyBytes)?;

            // Stage the new validator key for writing to disk.
            // It will be committed atomically when the block that introduced it is written to disk.
            // Notably it is NOT written while the write lock on the cache is held.
            // See: https://github.com/sigp/lighthouse/issues/2327
            store_ops.push(StoreOp::KeyValueOp(
                DatabasePubkey::from_pubkey(&pubkey)
                    .as_kv_store_op(DatabasePubkey::key_for_index(i)),
            ));

            self.pubkeys.push(pubkey);
            self.pubkey_bytes.push(pubkey_bytes);
            self.indices.insert(pubkey_bytes, i);
        }

        Ok(store_ops)
@@ -324,4 +358,39 @@ mod test {
        let cache = ValidatorPubkeyCache::load_from_store(store).expect("should open cache");
        check_cache_get(&cache, &keypairs[..]);
    }

    #[test]
    fn parallel_import_maintains_order() {
        // Test that parallel decompression on first startup maintains correct order and indices
        let (state, keypairs) = get_state(100);
        let store = get_store();

        // Create cache from empty state (triggers parallel path)
        let cache: ValidatorPubkeyCache<T> =
            ValidatorPubkeyCache::new(&state, store).expect("should create cache");

        check_cache_get(&cache, &keypairs[..]);
    }

    #[test]
    fn incremental_import_maintains_order() {
        // Test that incremental imports maintain correct order (triggers sequential path)
        let store = get_store();

        // Start with 50 validators
        let (state1, keypairs1) = get_state(50);
        let mut cache =
            ValidatorPubkeyCache::new(&state1, store.clone()).expect("should create cache");
        check_cache_get(&cache, &keypairs1[..]);

        // Add 50 more validators
        let (state2, keypairs2) = get_state(100);
        let ops = cache
            .import_new_pubkeys(&state2)
            .expect("should import pubkeys");
        store.do_atomically_with_block_and_blobs_cache(ops).unwrap();

        // Verify all 100 validators are correctly indexed
        check_cache_get(&cache, &keypairs2[..]);
    }
}

@@ -42,7 +42,7 @@ use std::time::Duration;
use std::time::{SystemTime, UNIX_EPOCH};
use store::database::interface::BeaconNodeBackend;
use timer::spawn_timer;
use tracing::{debug, info, warn};
use tracing::{debug, info, instrument, warn};
use types::data_column_custody_group::compute_ordered_custody_column_indices;
use types::{
    BeaconState, BlobSidecarList, ChainSpec, EthSpec, ExecutionBlockHash, Hash256,
@@ -151,6 +151,7 @@ where

    /// Initializes the `BeaconChainBuilder`. The `build_beacon_chain` method will need to be
    /// called later in order to actually instantiate the `BeaconChain`.
    #[instrument(skip_all)]
    pub async fn beacon_chain_builder(
        mut self,
        client_genesis: ClientGenesis,
@@ -613,6 +614,7 @@ where
    ///
    /// If type inference errors are being raised, see the comment on the definition of `Self`.
    #[allow(clippy::type_complexity)]
    #[instrument(name = "build_client", skip_all)]
    pub fn build(
        mut self,
    ) -> Result<Client<Witness<TSlotClock, E, THotStore, TColdStore>>, String> {
@@ -813,6 +815,7 @@ where
    TColdStore: ItemStore<E> + 'static,
{
    /// Consumes the internal `BeaconChainBuilder`, attaching the resulting `BeaconChain` to self.
    #[instrument(skip_all)]
    pub fn build_beacon_chain(mut self) -> Result<Self, String> {
        let context = self
            .runtime_context
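The builder changes above just add `tracing`'s `#[instrument]` attribute to the startup methods, which is what produces the per-phase timings quoted in the description. As a rough, standalone illustration (not Lighthouse code, and `build_pubkey_cache` is a hypothetical placeholder): with a `tracing-subscriber` formatter configured to emit span-close events, every exit of an instrumented function is logged together with how long the span was busy and idle.

```rust
use tracing::instrument;
use tracing_subscriber::fmt::format::FmtSpan;

// Hypothetical placeholder for an expensive startup phase.
#[instrument(name = "validator_pubkey_cache_new", skip_all)]
fn build_pubkey_cache(compressed_keys: &[Vec<u8>]) {
    // ... decompress keys, populate the cache ...
    let _ = compressed_keys;
}

fn main() {
    // Log an event every time an instrumented span closes, including the
    // span's busy/idle time.
    tracing_subscriber::fmt()
        .with_span_events(FmtSpan::CLOSE)
        .init();

    build_pubkey_cache(&[]);
    // => ... validator_pubkey_cache_new: close time.busy=... time.idle=...
}
```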