Use dedicated cache for HTTP API route (#9318)

- PR https://github.com/sigp/lighthouse/pull/9305 wants to store PTCs in the committee cache.

BUT the http API route wants to use the committee cache and insert historical committees (i.e. given state at epoch 1000, compute and store the committee for epoch 900).

If we want a single cache to serve both use cases we need to:
- Have entries in the committee cache that have no PTC: Makes reading PTCs from the cache not deterministic
- Compute historical PTC: A bunch of complicated code that's useless

Instead we can add a separate cache for the API, very simple one, that caches committees only. And have the one in the beacon chain compute and cache PTCs always.

### Performance impact

Slightly additional memory cost for users of the `beacon/states/committees` route. Caching is almost equivalent, except for queries of recent committees that may already exist in the beacon chain's committee cache.

### AI disclousure

This PR was written by hand 90%. Claude fixed some warp type issues


  


Co-Authored-By: dapplion <35266934+dapplion@users.noreply.github.com>
This commit is contained in:
Lion - dapplion
2026-05-18 23:12:17 -06:00
committed by GitHub
parent fd0852a8e5
commit 398efc3acc
6 changed files with 115 additions and 41 deletions

View File

@@ -36,6 +36,7 @@ use rand::SeedableRng;
use rand::rngs::{OsRng, StdRng}; use rand::rngs::{OsRng, StdRng};
use slasher::Slasher; use slasher::Slasher;
use slasher_service::SlasherService; use slasher_service::SlasherService;
use std::num::NonZeroUsize;
use std::path::{Path, PathBuf}; use std::path::{Path, PathBuf};
use std::sync::Arc; use std::sync::Arc;
use std::time::Duration; use std::time::Duration;
@@ -639,6 +640,10 @@ where
network_globals: self.network_globals.clone(), network_globals: self.network_globals.clone(),
beacon_processor_send: Some(beacon_processor_channels.beacon_processor_tx.clone()), beacon_processor_send: Some(beacon_processor_channels.beacon_processor_tx.clone()),
sse_logging_components: runtime_context.sse_logging_components.clone(), sse_logging_components: runtime_context.sse_logging_components.clone(),
historical_committee_cache: Arc::new(http_api::HistoricalCommitteeCache::new(
NonZeroUsize::new(self.http_api_config.historical_committee_cache_size)
.unwrap_or(NonZeroUsize::MIN),
)),
}); });
let exit = runtime_context.executor.exit(); let exit = runtime_context.executor.exit();

View File

@@ -1,4 +1,5 @@
use crate::StateId; use crate::StateId;
use crate::caches::{HistoricalCommitteeCache, HistoricalShufflingId};
use crate::task_spawner::{Priority, TaskSpawner}; use crate::task_spawner::{Priority, TaskSpawner};
use crate::utils::ResponseFilter; use crate::utils::ResponseFilter;
use crate::validator::pubkey_to_validator_index; use crate::validator::pubkey_to_validator_index;
@@ -13,7 +14,10 @@ use eth2::types::{
}; };
use ssz::Encode; use ssz::Encode;
use std::sync::Arc; use std::sync::Arc;
use types::{AttestationShufflingId, BeaconStateError, CommitteeCache, EthSpec, RelativeEpoch}; use types::{
AttestationShufflingId, BeaconStateError, CommitteeCache, EthSpec, RelativeEpoch,
RelativeEpochError,
};
use warp::filters::BoxedFilter; use warp::filters::BoxedFilter;
use warp::http::Response; use warp::http::Response;
use warp::hyper::Body; use warp::hyper::Body;
@@ -26,6 +30,8 @@ type BeaconStatesPath<T> = BoxedFilter<(
Arc<BeaconChain<T>>, Arc<BeaconChain<T>>,
)>; )>;
type BeaconStatesCommitteesFilter = BoxedFilter<(Arc<HistoricalCommitteeCache>,)>;
// GET beacon/states/{state_id}/pending_consolidations // GET beacon/states/{state_id}/pending_consolidations
pub fn get_beacon_state_pending_consolidations<T: BeaconChainTypes>( pub fn get_beacon_state_pending_consolidations<T: BeaconChainTypes>(
beacon_states_path: BeaconStatesPath<T>, beacon_states_path: BeaconStatesPath<T>,
@@ -337,17 +343,20 @@ pub fn get_beacon_state_sync_committees<T: BeaconChainTypes>(
// GET beacon/states/{state_id}/committees?slot,index,epoch // GET beacon/states/{state_id}/committees?slot,index,epoch
pub fn get_beacon_state_committees<T: BeaconChainTypes>( pub fn get_beacon_state_committees<T: BeaconChainTypes>(
beacon_states_path: BeaconStatesPath<T>, beacon_states_path: BeaconStatesPath<T>,
beacon_states_committees_filter: BeaconStatesCommitteesFilter,
) -> ResponseFilter { ) -> ResponseFilter {
beacon_states_path beacon_states_path
.clone() .clone()
.and(warp::path("committees")) .and(warp::path("committees"))
.and(warp::query::<eth2::types::CommitteesQuery>()) .and(warp::query::<eth2::types::CommitteesQuery>())
.and(beacon_states_committees_filter)
.and(warp::path::end()) .and(warp::path::end())
.then( .then(
|state_id: StateId, |state_id: StateId,
task_spawner: TaskSpawner<T::EthSpec>, task_spawner: TaskSpawner<T::EthSpec>,
chain: Arc<BeaconChain<T>>, chain: Arc<BeaconChain<T>>,
query: eth2::types::CommitteesQuery| { query: eth2::types::CommitteesQuery,
historical_committee_cache: Arc<HistoricalCommitteeCache>| {
task_spawner.blocking_json_task(Priority::P1, move || { task_spawner.blocking_json_task(Priority::P1, move || {
let (data, execution_optimistic, finalized) = state_id let (data, execution_optimistic, finalized) = state_id
.map_state_and_execution_optimistic_and_finalized( .map_state_and_execution_optimistic_and_finalized(
@@ -364,24 +373,24 @@ pub fn get_beacon_state_committees<T: BeaconChainTypes>(
let shuffling_id = if let Ok(Some(shuffling_decision_block)) = let shuffling_id = if let Ok(Some(shuffling_decision_block)) =
chain.block_root_at_slot(decision_slot, WhenSlotSkipped::Prev) chain.block_root_at_slot(decision_slot, WhenSlotSkipped::Prev)
{ {
Some(AttestationShufflingId { Some(HistoricalShufflingId::ShufflingId(
AttestationShufflingId {
shuffling_epoch: epoch, shuffling_epoch: epoch,
shuffling_decision_block, shuffling_decision_block,
}) },
))
} else if epoch < chain.head().finalized_checkpoint().epoch {
// Use the case for finalized epochs
Some(HistoricalShufflingId::FinalizedEpoch(epoch))
} else { } else {
None None
}; };
// Attempt to read from the chain cache if there exists a // Attempt to read from the chain cache if there exists a
// shuffling_id // shuffling_id
let maybe_cached_shuffling = if let Some(shuffling_id) = let maybe_cached_shuffling =
shuffling_id.as_ref() if let Some(shuffling_id) = shuffling_id.as_ref() {
{ historical_committee_cache.get(shuffling_id)
chain
.shuffling_cache
.try_write_for(std::time::Duration::from_secs(1))
.and_then(|mut cache_write| cache_write.get(shuffling_id))
.and_then(|cache_item| cache_item.wait().ok())
} else { } else {
None None
}; };
@@ -390,7 +399,7 @@ pub fn get_beacon_state_committees<T: BeaconChainTypes>(
if let Some(shuffling) = maybe_cached_shuffling { if let Some(shuffling) = maybe_cached_shuffling {
shuffling shuffling
} else { } else {
let possibly_built_cache = match RelativeEpoch::from_epoch( let committee_cache = match RelativeEpoch::from_epoch(
current_epoch, current_epoch,
epoch, epoch,
) { ) {
@@ -401,11 +410,19 @@ pub fn get_beacon_state_committees<T: BeaconChainTypes>(
{ {
state.committee_cache(relative_epoch).cloned() state.committee_cache(relative_epoch).cloned()
} }
_ => CommitteeCache::initialized( Ok(_) | Err(RelativeEpochError::EpochTooLow { .. }) => {
CommitteeCache::initialized(
state, state,
epoch, epoch,
&chain.spec, &chain.spec,
), )
}
Err(RelativeEpochError::EpochTooHigh { .. }) => {
Err(BeaconStateError::EpochOutOfBounds)
}
Err(RelativeEpochError::ArithError(e)) => {
Err(BeaconStateError::ArithError(e))
}
} }
.map_err(|e| match e { .map_err(|e| match e {
BeaconStateError::EpochOutOfBounds => { BeaconStateError::EpochOutOfBounds => {
@@ -419,22 +436,12 @@ pub fn get_beacon_state_committees<T: BeaconChainTypes>(
), ),
})?; })?;
// Attempt to write to the beacon cache (only if the cache if let Some(shuffling_id) = shuffling_id {
// size is not the default value). historical_committee_cache
if chain.config.shuffling_cache_size .insert(shuffling_id, committee_cache.clone());
!= beacon_chain::shuffling_cache::DEFAULT_CACHE_SIZE
&& let Some(shuffling_id) = shuffling_id
&& let Some(mut cache_write) = chain
.shuffling_cache
.try_write_for(std::time::Duration::from_secs(1))
{
cache_write.insert_committee_cache(
shuffling_id,
&possibly_built_cache,
);
} }
possibly_built_cache committee_cache
}; };
// Use either the supplied slot or all slots in the epoch. // Use either the supplied slot or all slots in the epoch.

View File

@@ -0,0 +1,43 @@
use lru::LruCache;
use parking_lot::Mutex;
use std::num::NonZeroUsize;
use std::sync::Arc;
use types::{AttestationShufflingId, CommitteeCache, Epoch};
/// See `shuffling_cache::DEFAULT_CACHE_SIZE` for rationale
pub const DEFAULT_HISTORICAL_COMMITTEE_CACHE_SIZE: usize = 16;
/// Indexes the `HistoricalCommitteeCache`. We can compute committees for very old epochs, and we
/// can't retrieve the decision root cheaply from a state. For those cases we allow the cache to
/// key those committees by finalized epoch.
#[derive(Eq, Hash, PartialEq)]
pub enum HistoricalShufflingId {
FinalizedEpoch(Epoch),
ShufflingId(AttestationShufflingId),
}
/// Dedicated cache for attestation committees, used exclusively by the HTTP API.
///
/// This may contain committees for finalized and unfinalized epochs. The name is slightly
/// missleading :)
pub struct HistoricalCommitteeCache {
committees: Mutex<LruCache<HistoricalShufflingId, Arc<CommitteeCache>>>,
}
impl HistoricalCommitteeCache {
pub fn new(size: NonZeroUsize) -> Self {
Self {
committees: Mutex::new(LruCache::new(size)),
}
}
}
impl HistoricalCommitteeCache {
pub fn get(&self, id: &HistoricalShufflingId) -> Option<Arc<CommitteeCache>> {
self.committees.lock().get(id).cloned()
}
pub fn insert(&self, id: HistoricalShufflingId, cache: Arc<CommitteeCache>) {
self.committees.lock().put(id, cache);
}
}

View File

@@ -12,6 +12,7 @@ mod beacon;
mod block_id; mod block_id;
mod build_block_contents; mod build_block_contents;
mod builder_states; mod builder_states;
mod caches;
mod custody; mod custody;
mod database; mod database;
mod light_client; mod light_client;
@@ -40,6 +41,8 @@ use crate::beacon::execution_payload_envelope::{
post_beacon_execution_payload_envelope_ssz, post_beacon_execution_payload_envelope_ssz,
}; };
use crate::beacon::pool::*; use crate::beacon::pool::*;
use crate::caches::DEFAULT_HISTORICAL_COMMITTEE_CACHE_SIZE;
pub use crate::caches::HistoricalCommitteeCache;
use crate::light_client::{get_light_client_bootstrap, get_light_client_updates}; use crate::light_client::{get_light_client_bootstrap, get_light_client_updates};
use crate::utils::{AnyVersionFilter, EthV1Filter}; use crate::utils::{AnyVersionFilter, EthV1Filter};
use crate::validator::post_validator_liveness_epoch; use crate::validator::post_validator_liveness_epoch;
@@ -132,6 +135,7 @@ pub struct Context<T: BeaconChainTypes> {
pub network_globals: Option<Arc<NetworkGlobals<T::EthSpec>>>, pub network_globals: Option<Arc<NetworkGlobals<T::EthSpec>>>,
pub beacon_processor_send: Option<BeaconProcessorSend<T::EthSpec>>, pub beacon_processor_send: Option<BeaconProcessorSend<T::EthSpec>>,
pub sse_logging_components: Option<SSELoggingComponents>, pub sse_logging_components: Option<SSELoggingComponents>,
pub historical_committee_cache: Arc<HistoricalCommitteeCache>,
} }
/// Configuration for the HTTP server. /// Configuration for the HTTP server.
@@ -148,6 +152,7 @@ pub struct Config {
#[serde(with = "eth2::types::serde_status_code")] #[serde(with = "eth2::types::serde_status_code")]
pub duplicate_block_status_code: StatusCode, pub duplicate_block_status_code: StatusCode,
pub target_peers: usize, pub target_peers: usize,
pub historical_committee_cache_size: usize,
} }
impl Default for Config { impl Default for Config {
@@ -163,6 +168,7 @@ impl Default for Config {
enable_beacon_processor: true, enable_beacon_processor: true,
duplicate_block_status_code: StatusCode::ACCEPTED, duplicate_block_status_code: StatusCode::ACCEPTED,
target_peers: 100, target_peers: 100,
historical_committee_cache_size: DEFAULT_HISTORICAL_COMMITTEE_CACHE_SIZE,
} }
} }
} }
@@ -416,6 +422,11 @@ pub fn serve<T: BeaconChainTypes>(
}) })
.boxed(); .boxed();
let historical_committee_cache = ctx.historical_committee_cache.clone();
let beacon_states_committees_filter = warp::any()
.map(move || historical_committee_cache.clone())
.boxed();
// Create a `warp` filter that provides access to the network sender channel. // Create a `warp` filter that provides access to the network sender channel.
let network_tx = ctx let network_tx = ctx
.network_senders .network_senders
@@ -628,8 +639,10 @@ pub fn serve<T: BeaconChainTypes>(
states::get_beacon_state_validators_id(beacon_states_path.clone()); states::get_beacon_state_validators_id(beacon_states_path.clone());
// GET beacon/states/{state_id}/committees?slot,index,epoch // GET beacon/states/{state_id}/committees?slot,index,epoch
let get_beacon_state_committees = let get_beacon_state_committees = states::get_beacon_state_committees(
states::get_beacon_state_committees(beacon_states_path.clone()); beacon_states_path.clone(),
beacon_states_committees_filter,
);
// GET beacon/states/{state_id}/sync_committees?epoch // GET beacon/states/{state_id}/sync_committees?epoch
let get_beacon_state_sync_committees = let get_beacon_state_sync_committees =

View File

@@ -1,4 +1,4 @@
use crate::{Config, Context}; use crate::{Config, Context, caches::HistoricalCommitteeCache};
use beacon_chain::{ use beacon_chain::{
BeaconChain, BeaconChainTypes, BeaconChain, BeaconChainTypes,
custody_context::NodeCustodyType, custody_context::NodeCustodyType,
@@ -22,10 +22,10 @@ use lighthouse_network::{
}; };
use network::{NetworkReceivers, NetworkSenders}; use network::{NetworkReceivers, NetworkSenders};
use sensitive_url::SensitiveUrl; use sensitive_url::SensitiveUrl;
use std::future::Future;
use std::net::SocketAddr; use std::net::SocketAddr;
use std::sync::Arc; use std::sync::Arc;
use std::time::Duration; use std::time::Duration;
use std::{future::Future, num::NonZeroUsize};
use store::MemoryStore; use store::MemoryStore;
use task_executor::test_utils::TestRuntime; use task_executor::test_utils::TestRuntime;
use types::{ChainSpec, EthSpec}; use types::{ChainSpec, EthSpec};
@@ -293,6 +293,9 @@ pub async fn create_api_server_with_config<T: BeaconChainTypes>(
network_globals: Some(network_globals), network_globals: Some(network_globals),
beacon_processor_send: Some(beacon_processor_send), beacon_processor_send: Some(beacon_processor_send),
sse_logging_components: None, sse_logging_components: None,
historical_committee_cache: Arc::new(HistoricalCommitteeCache::new(
NonZeroUsize::new(http_config.historical_committee_cache_size).unwrap(),
)),
}); });
let (listening_socket, server) = let (listening_socket, server) =

View File

@@ -215,6 +215,9 @@ pub fn get_config<E: EthSpec>(
if let Some(cache_size) = clap_utils::parse_optional(cli_args, "shuffling-cache-size")? { if let Some(cache_size) = clap_utils::parse_optional(cli_args, "shuffling-cache-size")? {
client_config.chain.shuffling_cache_size = cache_size; client_config.chain.shuffling_cache_size = cache_size;
// Mantain backwards compatibility with users customizing `shuffling_cache_size` to tweak
// the behaviour of the HTTP API route `beacon/states/committees`
client_config.http_api.historical_committee_cache_size = cache_size;
} }
if let Some(batches) = clap_utils::parse_optional(cli_args, "blob-publication-batches")? { if let Some(batches) = clap_utils::parse_optional(cli_args, "blob-publication-batches")? {