Merge remote-tracking branch 'origin/unstable' into payload-attestation-committee-cache

This commit is contained in:
Michael Sproul
2026-05-19 15:43:12 +10:00
6 changed files with 132 additions and 85 deletions

View File

@@ -36,6 +36,7 @@ use rand::SeedableRng;
use rand::rngs::{OsRng, StdRng}; use rand::rngs::{OsRng, StdRng};
use slasher::Slasher; use slasher::Slasher;
use slasher_service::SlasherService; use slasher_service::SlasherService;
use std::num::NonZeroUsize;
use std::path::{Path, PathBuf}; use std::path::{Path, PathBuf};
use std::sync::Arc; use std::sync::Arc;
use std::time::Duration; use std::time::Duration;
@@ -639,6 +640,10 @@ where
network_globals: self.network_globals.clone(), network_globals: self.network_globals.clone(),
beacon_processor_send: Some(beacon_processor_channels.beacon_processor_tx.clone()), beacon_processor_send: Some(beacon_processor_channels.beacon_processor_tx.clone()),
sse_logging_components: runtime_context.sse_logging_components.clone(), sse_logging_components: runtime_context.sse_logging_components.clone(),
historical_committee_cache: Arc::new(http_api::HistoricalCommitteeCache::new(
NonZeroUsize::new(self.http_api_config.historical_committee_cache_size)
.unwrap_or(NonZeroUsize::MIN),
)),
}); });
let exit = runtime_context.executor.exit(); let exit = runtime_context.executor.exit();

View File

@@ -1,4 +1,5 @@
use crate::StateId; use crate::StateId;
use crate::caches::{HistoricalCommitteeCache, HistoricalShufflingId};
use crate::task_spawner::{Priority, TaskSpawner}; use crate::task_spawner::{Priority, TaskSpawner};
use crate::utils::ResponseFilter; use crate::utils::ResponseFilter;
use crate::validator::pubkey_to_validator_index; use crate::validator::pubkey_to_validator_index;
@@ -13,7 +14,10 @@ use eth2::types::{
}; };
use ssz::Encode; use ssz::Encode;
use std::sync::Arc; use std::sync::Arc;
use types::{AttestationShufflingId, BeaconStateError, CommitteeCache, EthSpec, RelativeEpoch}; use types::{
AttestationShufflingId, BeaconStateError, CommitteeCache, EthSpec, RelativeEpoch,
RelativeEpochError,
};
use warp::filters::BoxedFilter; use warp::filters::BoxedFilter;
use warp::http::Response; use warp::http::Response;
use warp::hyper::Body; use warp::hyper::Body;
@@ -26,6 +30,8 @@ type BeaconStatesPath<T> = BoxedFilter<(
Arc<BeaconChain<T>>, Arc<BeaconChain<T>>,
)>; )>;
type BeaconStatesCommitteesFilter = BoxedFilter<(Arc<HistoricalCommitteeCache>,)>;
// GET beacon/states/{state_id}/pending_consolidations // GET beacon/states/{state_id}/pending_consolidations
pub fn get_beacon_state_pending_consolidations<T: BeaconChainTypes>( pub fn get_beacon_state_pending_consolidations<T: BeaconChainTypes>(
beacon_states_path: BeaconStatesPath<T>, beacon_states_path: BeaconStatesPath<T>,
@@ -337,17 +343,20 @@ pub fn get_beacon_state_sync_committees<T: BeaconChainTypes>(
// GET beacon/states/{state_id}/committees?slot,index,epoch // GET beacon/states/{state_id}/committees?slot,index,epoch
pub fn get_beacon_state_committees<T: BeaconChainTypes>( pub fn get_beacon_state_committees<T: BeaconChainTypes>(
beacon_states_path: BeaconStatesPath<T>, beacon_states_path: BeaconStatesPath<T>,
beacon_states_committees_filter: BeaconStatesCommitteesFilter,
) -> ResponseFilter { ) -> ResponseFilter {
beacon_states_path beacon_states_path
.clone() .clone()
.and(warp::path("committees")) .and(warp::path("committees"))
.and(warp::query::<eth2::types::CommitteesQuery>()) .and(warp::query::<eth2::types::CommitteesQuery>())
.and(beacon_states_committees_filter)
.and(warp::path::end()) .and(warp::path::end())
.then( .then(
|state_id: StateId, |state_id: StateId,
task_spawner: TaskSpawner<T::EthSpec>, task_spawner: TaskSpawner<T::EthSpec>,
chain: Arc<BeaconChain<T>>, chain: Arc<BeaconChain<T>>,
query: eth2::types::CommitteesQuery| { query: eth2::types::CommitteesQuery,
historical_committee_cache: Arc<HistoricalCommitteeCache>| {
task_spawner.blocking_json_task(Priority::P1, move || { task_spawner.blocking_json_task(Priority::P1, move || {
let (data, execution_optimistic, finalized) = state_id let (data, execution_optimistic, finalized) = state_id
.map_state_and_execution_optimistic_and_finalized( .map_state_and_execution_optimistic_and_finalized(
@@ -364,101 +373,72 @@ pub fn get_beacon_state_committees<T: BeaconChainTypes>(
let shuffling_id = if let Ok(Some(shuffling_decision_block)) = let shuffling_id = if let Ok(Some(shuffling_decision_block)) =
chain.block_root_at_slot(decision_slot, WhenSlotSkipped::Prev) chain.block_root_at_slot(decision_slot, WhenSlotSkipped::Prev)
{ {
Some(AttestationShufflingId { Some(HistoricalShufflingId::ShufflingId(
shuffling_epoch: epoch, AttestationShufflingId {
shuffling_decision_block, shuffling_epoch: epoch,
}) shuffling_decision_block,
},
))
} else if epoch < chain.head().finalized_checkpoint().epoch {
// Use the case for finalized epochs
Some(HistoricalShufflingId::FinalizedEpoch(epoch))
} else { } else {
None None
}; };
// Attempt to read from the chain cache if there exists a // Attempt to read from the chain cache if there exists a
// shuffling_id // shuffling_id
let maybe_cached_shuffling = if let Some(shuffling_id) = let maybe_cached_shuffling =
shuffling_id.as_ref() if let Some(shuffling_id) = shuffling_id.as_ref() {
{ historical_committee_cache.get(shuffling_id)
chain } else {
.shuffling_cache None
.try_write_for(std::time::Duration::from_secs(1)) };
.and_then(|mut cache_write| cache_write.get(shuffling_id))
.and_then(|cache_item| cache_item.wait().ok())
.map(|cached_shuffling| cached_shuffling.committee_cache)
} else {
None
};
let committee_cache = let committee_cache =
if let Some(shuffling) = maybe_cached_shuffling { if let Some(shuffling) = maybe_cached_shuffling {
shuffling shuffling
} else { } else {
let committee_cache = let committee_cache = match RelativeEpoch::from_epoch(
match RelativeEpoch::from_epoch(current_epoch, epoch) { current_epoch,
Ok(relative_epoch) epoch,
if state.committee_cache_is_initialized( ) {
relative_epoch, Ok(relative_epoch)
) => if state.committee_cache_is_initialized(
{ relative_epoch,
state.committee_cache(relative_epoch).cloned() ) =>
} {
_ => CommitteeCache::initialized( state.committee_cache(relative_epoch).cloned()
}
Ok(_) | Err(RelativeEpochError::EpochTooLow { .. }) => {
CommitteeCache::initialized(
state, state,
epoch, epoch,
&chain.spec, &chain.spec,
), )
} }
.map_err( Err(RelativeEpochError::EpochTooHigh { .. }) => {
|e| match e { Err(BeaconStateError::EpochOutOfBounds)
BeaconStateError::EpochOutOfBounds => { }
let max_sprp = Err(RelativeEpochError::ArithError(e)) => {
T::EthSpec::slots_per_historical_root() Err(BeaconStateError::ArithError(e))
as u64; }
let first_subsequent_restore_point_slot = }
((epoch.start_slot( .map_err(|e| match e {
T::EthSpec::slots_per_epoch(), BeaconStateError::EpochOutOfBounds => {
) / max_sprp) warp_utils::reject::custom_bad_request(format!(
+ 1) "epoch {} out of bounds for state at {}",
* max_sprp; epoch, current_epoch
if epoch < current_epoch { ))
warp_utils::reject::custom_bad_request( }
format!( _ => warp_utils::reject::unhandled_error(
"epoch out of bounds, \ BeaconChainError::from(e),
try state at slot {}", ),
first_subsequent_restore_point_slot, })?;
),
)
} else {
warp_utils::reject::custom_bad_request(
"epoch out of bounds, \
too far in future"
.into(),
)
}
}
_ => warp_utils::reject::unhandled_error(
BeaconChainError::from(e),
),
},
)?;
// Attempt to write to the beacon cache (only if the cache if let Some(shuffling_id) = shuffling_id {
// size is not the default value). historical_committee_cache
if chain.config.shuffling_cache_size .insert(shuffling_id, committee_cache.clone());
!= beacon_chain::shuffling_cache::DEFAULT_CACHE_SIZE
&& let Some(_shuffling_id) = shuffling_id
&& let Some(_cache_write) = chain
.shuffling_cache
.try_write_for(std::time::Duration::from_secs(1))
{
// TODO: Do we really need to insert into the committee
// cache? Then we need to be able to produce PTCs for
// historical epochs, or limit the range of query.epoch
// against the state_id
// Theoretically we COULD compute the PTC for historical
// epochs, but should we? If we don't we need to insert
// historical committees to the cache without PTC, so we
// have to have a type of entry that does not have a PTC
// just to support the caching in this route: I persoanlly
// hate this.
} }
committee_cache committee_cache

View File

@@ -0,0 +1,43 @@
use lru::LruCache;
use parking_lot::Mutex;
use std::num::NonZeroUsize;
use std::sync::Arc;
use types::{AttestationShufflingId, CommitteeCache, Epoch};
/// See `shuffling_cache::DEFAULT_CACHE_SIZE` for rationale
pub const DEFAULT_HISTORICAL_COMMITTEE_CACHE_SIZE: usize = 16;
/// Indexes the `HistoricalCommitteeCache`. We can compute committees for very old epochs, and we
/// can't retrieve the decision root cheaply from a state. For those cases we allow the cache to
/// key those committees by finalized epoch.
#[derive(Eq, Hash, PartialEq)]
pub enum HistoricalShufflingId {
FinalizedEpoch(Epoch),
ShufflingId(AttestationShufflingId),
}
/// Dedicated cache for attestation committees, used exclusively by the HTTP API.
///
/// This may contain committees for finalized and unfinalized epochs. The name is slightly
/// missleading :)
pub struct HistoricalCommitteeCache {
committees: Mutex<LruCache<HistoricalShufflingId, Arc<CommitteeCache>>>,
}
impl HistoricalCommitteeCache {
pub fn new(size: NonZeroUsize) -> Self {
Self {
committees: Mutex::new(LruCache::new(size)),
}
}
}
impl HistoricalCommitteeCache {
pub fn get(&self, id: &HistoricalShufflingId) -> Option<Arc<CommitteeCache>> {
self.committees.lock().get(id).cloned()
}
pub fn insert(&self, id: HistoricalShufflingId, cache: Arc<CommitteeCache>) {
self.committees.lock().put(id, cache);
}
}

View File

@@ -12,6 +12,7 @@ mod beacon;
mod block_id; mod block_id;
mod build_block_contents; mod build_block_contents;
mod builder_states; mod builder_states;
mod caches;
mod custody; mod custody;
mod database; mod database;
mod light_client; mod light_client;
@@ -40,6 +41,8 @@ use crate::beacon::execution_payload_envelope::{
post_beacon_execution_payload_envelope_ssz, post_beacon_execution_payload_envelope_ssz,
}; };
use crate::beacon::pool::*; use crate::beacon::pool::*;
use crate::caches::DEFAULT_HISTORICAL_COMMITTEE_CACHE_SIZE;
pub use crate::caches::HistoricalCommitteeCache;
use crate::light_client::{get_light_client_bootstrap, get_light_client_updates}; use crate::light_client::{get_light_client_bootstrap, get_light_client_updates};
use crate::utils::{AnyVersionFilter, EthV1Filter}; use crate::utils::{AnyVersionFilter, EthV1Filter};
use crate::validator::post_validator_liveness_epoch; use crate::validator::post_validator_liveness_epoch;
@@ -132,6 +135,7 @@ pub struct Context<T: BeaconChainTypes> {
pub network_globals: Option<Arc<NetworkGlobals<T::EthSpec>>>, pub network_globals: Option<Arc<NetworkGlobals<T::EthSpec>>>,
pub beacon_processor_send: Option<BeaconProcessorSend<T::EthSpec>>, pub beacon_processor_send: Option<BeaconProcessorSend<T::EthSpec>>,
pub sse_logging_components: Option<SSELoggingComponents>, pub sse_logging_components: Option<SSELoggingComponents>,
pub historical_committee_cache: Arc<HistoricalCommitteeCache>,
} }
/// Configuration for the HTTP server. /// Configuration for the HTTP server.
@@ -148,6 +152,7 @@ pub struct Config {
#[serde(with = "eth2::types::serde_status_code")] #[serde(with = "eth2::types::serde_status_code")]
pub duplicate_block_status_code: StatusCode, pub duplicate_block_status_code: StatusCode,
pub target_peers: usize, pub target_peers: usize,
pub historical_committee_cache_size: usize,
} }
impl Default for Config { impl Default for Config {
@@ -163,6 +168,7 @@ impl Default for Config {
enable_beacon_processor: true, enable_beacon_processor: true,
duplicate_block_status_code: StatusCode::ACCEPTED, duplicate_block_status_code: StatusCode::ACCEPTED,
target_peers: 100, target_peers: 100,
historical_committee_cache_size: DEFAULT_HISTORICAL_COMMITTEE_CACHE_SIZE,
} }
} }
} }
@@ -416,6 +422,11 @@ pub fn serve<T: BeaconChainTypes>(
}) })
.boxed(); .boxed();
let historical_committee_cache = ctx.historical_committee_cache.clone();
let beacon_states_committees_filter = warp::any()
.map(move || historical_committee_cache.clone())
.boxed();
// Create a `warp` filter that provides access to the network sender channel. // Create a `warp` filter that provides access to the network sender channel.
let network_tx = ctx let network_tx = ctx
.network_senders .network_senders
@@ -628,8 +639,10 @@ pub fn serve<T: BeaconChainTypes>(
states::get_beacon_state_validators_id(beacon_states_path.clone()); states::get_beacon_state_validators_id(beacon_states_path.clone());
// GET beacon/states/{state_id}/committees?slot,index,epoch // GET beacon/states/{state_id}/committees?slot,index,epoch
let get_beacon_state_committees = let get_beacon_state_committees = states::get_beacon_state_committees(
states::get_beacon_state_committees(beacon_states_path.clone()); beacon_states_path.clone(),
beacon_states_committees_filter,
);
// GET beacon/states/{state_id}/sync_committees?epoch // GET beacon/states/{state_id}/sync_committees?epoch
let get_beacon_state_sync_committees = let get_beacon_state_sync_committees =

View File

@@ -1,4 +1,4 @@
use crate::{Config, Context}; use crate::{Config, Context, caches::HistoricalCommitteeCache};
use beacon_chain::{ use beacon_chain::{
BeaconChain, BeaconChainTypes, BeaconChain, BeaconChainTypes,
custody_context::NodeCustodyType, custody_context::NodeCustodyType,
@@ -22,10 +22,10 @@ use lighthouse_network::{
}; };
use network::{NetworkReceivers, NetworkSenders}; use network::{NetworkReceivers, NetworkSenders};
use sensitive_url::SensitiveUrl; use sensitive_url::SensitiveUrl;
use std::future::Future;
use std::net::SocketAddr; use std::net::SocketAddr;
use std::sync::Arc; use std::sync::Arc;
use std::time::Duration; use std::time::Duration;
use std::{future::Future, num::NonZeroUsize};
use store::MemoryStore; use store::MemoryStore;
use task_executor::test_utils::TestRuntime; use task_executor::test_utils::TestRuntime;
use types::{ChainSpec, EthSpec}; use types::{ChainSpec, EthSpec};
@@ -293,6 +293,9 @@ pub async fn create_api_server_with_config<T: BeaconChainTypes>(
network_globals: Some(network_globals), network_globals: Some(network_globals),
beacon_processor_send: Some(beacon_processor_send), beacon_processor_send: Some(beacon_processor_send),
sse_logging_components: None, sse_logging_components: None,
historical_committee_cache: Arc::new(HistoricalCommitteeCache::new(
NonZeroUsize::new(http_config.historical_committee_cache_size).unwrap(),
)),
}); });
let (listening_socket, server) = let (listening_socket, server) =

View File

@@ -215,6 +215,9 @@ pub fn get_config<E: EthSpec>(
if let Some(cache_size) = clap_utils::parse_optional(cli_args, "shuffling-cache-size")? { if let Some(cache_size) = clap_utils::parse_optional(cli_args, "shuffling-cache-size")? {
client_config.chain.shuffling_cache_size = cache_size; client_config.chain.shuffling_cache_size = cache_size;
// Mantain backwards compatibility with users customizing `shuffling_cache_size` to tweak
// the behaviour of the HTTP API route `beacon/states/committees`
client_config.http_api.historical_committee_cache_size = cache_size;
} }
if let Some(batches) = clap_utils::parse_optional(cli_args, "blob-publication-batches")? { if let Some(batches) = clap_utils::parse_optional(cli_args, "blob-publication-batches")? {