Persist light client updates (#5545)

* persist light client updates

* update beacon chain to serve light client updates

* resolve todos

* cache best update

* extend cache parts

* is better light client update

* resolve merge conflict

* initial api changes

* add lc update db column

* fmt

* added tests

* add sim

* Merge branch 'unstable' of https://github.com/sigp/lighthouse into persist-light-client-updates

* fix some weird issues with the simulator

* tests

* Merge branch 'unstable' of https://github.com/sigp/lighthouse into persist-light-client-updates

* test changes

* merge conflict

* testing

* started work on ef tests and some code clean up

* update tests

* linting

* no-op pre-Altair, we're still failing on Electra though

* allow for zeroed light client header

* Merge branch 'unstable' of https://github.com/sigp/lighthouse into persist-light-client-updates

* merge unstable

* remove unwraps

* remove unwraps

* Update light_client_update.rs

* merge unstable

* move functionality to helper methods

* refactor is best update fn

* refactor is best update fn

* improve organization of light client server cache logic

* fork digest calc, and only spawn as many blocks as we need for the lc update test

* fetch lc update from the cache if it exists

* fmt

* Fix beacon_chain tests

* Add debug code to update ranking_order ef test

* Fix compare code

* merge conflicts

* fix test

* Merge branch 'persist-light-client-updates' of https://github.com/eserilev/lighthouse into persist-light-client-updates

* Use blinded blocks for light client proofs

* fix ef test

* merge conflicts

* fix lc update check

* Lint

* resolve merge conflict

* Merge branch 'persist-light-client-updates' of https://github.com/eserilev/lighthouse into persist-light-client-updates

* revert basic sim

* small fix

* revert sim

* Review PR

* resolve merge conflicts

* Merge branch 'unstable' into persist-light-client-updates
Author: Eitan Seri-Levi
Date: 2024-08-09 00:36:20 -07:00
Committed by: GitHub
Parent: aad8727f52
Commit: 3913ea44c6
21 changed files with 1124 additions and 124 deletions


@@ -1351,14 +1351,27 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
) -> Result<(), Error> {
self.light_client_server_cache.recompute_and_cache_updates(
self.store.clone(),
&parent_root,
slot,
&parent_root,
&sync_aggregate,
&self.log,
&self.spec,
)
}
pub fn get_light_client_updates(
&self,
sync_committee_period: u64,
count: u64,
) -> Result<Vec<LightClientUpdate<T::EthSpec>>, Error> {
self.light_client_server_cache.get_light_client_updates(
&self.store,
sync_committee_period,
count,
&self.spec,
)
}
/// Returns the current heads of the `BeaconChain`. For the canonical head, see `Self::head`.
///
/// Returns `(block_root, block_slot)`.


@@ -1,14 +1,23 @@
use crate::errors::BeaconChainError;
use crate::{metrics, BeaconChainTypes, BeaconStore};
use parking_lot::{Mutex, RwLock};
use safe_arith::SafeArith;
use slog::{debug, Logger};
use ssz::Decode;
use ssz::Encode;
use ssz_types::FixedVector;
use std::num::NonZeroUsize;
use types::light_client_update::{FinalizedRootProofLen, FINALIZED_ROOT_INDEX};
use std::sync::Arc;
use store::DBColumn;
use store::KeyValueStore;
use types::light_client_update::{
FinalizedRootProofLen, NextSyncCommitteeProofLen, FINALIZED_ROOT_INDEX,
NEXT_SYNC_COMMITTEE_INDEX,
};
use types::non_zero_usize::new_non_zero_usize;
use types::{
BeaconBlockRef, BeaconState, ChainSpec, EthSpec, ForkName, Hash256, LightClientFinalityUpdate,
LightClientOptimisticUpdate, Slot, SyncAggregate,
LightClientOptimisticUpdate, LightClientUpdate, Slot, SyncAggregate, SyncCommittee,
};
/// A prev block cache miss requires re-generating the state of the post-parent block. Items in the
@@ -30,8 +39,10 @@ pub struct LightClientServerCache<T: BeaconChainTypes> {
latest_finality_update: RwLock<Option<LightClientFinalityUpdate<T::EthSpec>>>,
/// Tracks a single global latest optimistic update out of all imported blocks.
latest_optimistic_update: RwLock<Option<LightClientOptimisticUpdate<T::EthSpec>>>,
/// Caches the most recent light client update
latest_light_client_update: RwLock<Option<LightClientUpdate<T::EthSpec>>>,
/// Caches state proofs by block root
prev_block_cache: Mutex<lru::LruCache<Hash256, LightClientCachedData>>,
prev_block_cache: Mutex<lru::LruCache<Hash256, LightClientCachedData<T::EthSpec>>>,
}
impl<T: BeaconChainTypes> LightClientServerCache<T> {
@@ -39,13 +50,14 @@ impl<T: BeaconChainTypes> LightClientServerCache<T> {
Self {
latest_finality_update: None.into(),
latest_optimistic_update: None.into(),
latest_light_client_update: None.into(),
prev_block_cache: lru::LruCache::new(PREV_BLOCK_CACHE_SIZE).into(),
}
}
/// Compute and cache state proofs for later production of light-client messages. Does not
/// trigger block replay.
pub fn cache_state_data(
pub(crate) fn cache_state_data(
&self,
spec: &ChainSpec,
block: BeaconBlockRef<T::EthSpec>,
@@ -67,13 +79,13 @@ impl<T: BeaconChainTypes> LightClientServerCache<T> {
Ok(())
}
/// Given a block with a SyncAggregte computes better or more recent light client updates. The
/// Given a block with a SyncAggregate computes better or more recent light client updates. The
/// results are cached either on disk or in memory, to be served via p2p and the REST API
pub fn recompute_and_cache_updates(
&self,
store: BeaconStore<T>,
block_parent_root: &Hash256,
block_slot: Slot,
block_parent_root: &Hash256,
sync_aggregate: &SyncAggregate<T::EthSpec>,
log: &Logger,
chain_spec: &ChainSpec,
@@ -100,11 +112,17 @@ impl<T: BeaconChainTypes> LightClientServerCache<T> {
let attested_slot = attested_block.slot();
let maybe_finalized_block = store.get_blinded_block(&cached_parts.finalized_block_root)?;
let sync_period = block_slot
.epoch(T::EthSpec::slots_per_epoch())
.sync_committee_period(chain_spec)?;
// Spec: Full nodes SHOULD provide the LightClientOptimisticUpdate with the highest
// attested_header.beacon.slot (if multiple, highest signature_slot) as selected by fork choice
let is_latest_optimistic = match &self.latest_optimistic_update.read().clone() {
Some(latest_optimistic_update) => {
is_latest_optimistic_update(latest_optimistic_update, attested_slot, signature_slot)
latest_optimistic_update.is_latest(attested_slot, signature_slot)
}
None => true,
};
@@ -122,18 +140,17 @@ impl<T: BeaconChainTypes> LightClientServerCache<T> {
// attested_header.beacon.slot (if multiple, highest signature_slot) as selected by fork choice
let is_latest_finality = match &self.latest_finality_update.read().clone() {
Some(latest_finality_update) => {
is_latest_finality_update(latest_finality_update, attested_slot, signature_slot)
latest_finality_update.is_latest(attested_slot, signature_slot)
}
None => true,
};
if is_latest_finality & !cached_parts.finalized_block_root.is_zero() {
// Immediately after checkpoint sync the finalized block may not be available yet.
if let Some(finalized_block) =
store.get_blinded_block(&cached_parts.finalized_block_root)?
{
if let Some(finalized_block) = maybe_finalized_block.as_ref() {
*self.latest_finality_update.write() = Some(LightClientFinalityUpdate::new(
&attested_block,
&finalized_block,
finalized_block,
cached_parts.finality_branch.clone(),
sync_aggregate.clone(),
signature_slot,
@@ -148,9 +165,142 @@ impl<T: BeaconChainTypes> LightClientServerCache<T> {
}
}
let new_light_client_update = LightClientUpdate::new(
sync_aggregate,
block_slot,
cached_parts.next_sync_committee,
cached_parts.next_sync_committee_branch,
cached_parts.finality_branch,
&attested_block,
maybe_finalized_block.as_ref(),
chain_spec,
)?;
// Spec: Full nodes SHOULD provide the best derivable LightClientUpdate (according to is_better_update)
// for each sync committee period
let prev_light_client_update = match &self.latest_light_client_update.read().clone() {
Some(prev_light_client_update) => Some(prev_light_client_update.clone()),
None => self.get_light_client_update(&store, sync_period, chain_spec)?,
};
let should_persist_light_client_update =
if let Some(prev_light_client_update) = prev_light_client_update {
let prev_sync_period = prev_light_client_update
.signature_slot()
.epoch(T::EthSpec::slots_per_epoch())
.sync_committee_period(chain_spec)?;
if sync_period != prev_sync_period {
true
} else {
prev_light_client_update
.is_better_light_client_update(&new_light_client_update, chain_spec)?
}
} else {
true
};
if should_persist_light_client_update {
self.store_light_client_update(&store, sync_period, &new_light_client_update)?;
}
Ok(())
}
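
The "best derivable update" rule above defers to the spec's `is_better_update` ranking, which `is_better_light_client_update` presumably implements. A condensed sketch of that ranking (an assumption based on the consensus specs, not this crate's exact implementation; it omits the sync-committee-finality tie-breaker and the pre-supermajority participation check):

    /// Flattened view of the fields the comparison needs (hypothetical type).
    struct UpdateMeta {
        active_participants: u64, // set bits in sync_committee_bits
        committee_size: u64,
        is_relevant: bool,  // includes next_sync_committee for the signature period
        has_finality: bool, // carries a finalized header and branch
        attested_slot: u64,
        signature_slot: u64,
    }

    /// Returns true if `new` should replace `old` as the period's best update.
    fn sketch_is_better_update(new: &UpdateMeta, old: &UpdateMeta) -> bool {
        // 1. Prefer supermajority (>2/3) sync committee participation.
        let new_super = new.active_participants * 3 >= new.committee_size * 2;
        let old_super = old.active_participants * 3 >= old.committee_size * 2;
        if new_super != old_super {
            return new_super;
        }
        // 2. Prefer updates carrying a next sync committee for the right period.
        if new.is_relevant != old.is_relevant {
            return new.is_relevant;
        }
        // 3. Prefer updates that prove finality.
        if new.has_finality != old.has_finality {
            return new.has_finality;
        }
        // 4. Higher participation wins; then prefer older attested and
        //    signature slots (fewer future changes to the "best" update).
        if new.active_participants != old.active_participants {
            return new.active_participants > old.active_participants;
        }
        if new.attested_slot != old.attested_slot {
            return new.attested_slot < old.attested_slot;
        }
        new.signature_slot < old.signature_slot
    }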
fn store_light_client_update(
&self,
store: &BeaconStore<T>,
sync_committee_period: u64,
light_client_update: &LightClientUpdate<T::EthSpec>,
) -> Result<(), BeaconChainError> {
let column = DBColumn::LightClientUpdate;
store.hot_db.put_bytes(
column.into(),
&sync_committee_period.to_le_bytes(),
&light_client_update.as_ssz_bytes(),
)?;
*self.latest_light_client_update.write() = Some(light_client_update.clone());
Ok(())
}
// Used to fetch the most recently persisted "best" light client update.
// Should not be used outside the light client server, as it also caches the fetched
// light client update.
fn get_light_client_update(
&self,
store: &BeaconStore<T>,
sync_committee_period: u64,
chain_spec: &ChainSpec,
) -> Result<Option<LightClientUpdate<T::EthSpec>>, BeaconChainError> {
if let Some(latest_light_client_update) = self.latest_light_client_update.read().clone() {
let latest_lc_update_sync_committee_period = latest_light_client_update
.signature_slot()
.epoch(T::EthSpec::slots_per_epoch())
.sync_committee_period(chain_spec)?;
if latest_lc_update_sync_committee_period == sync_committee_period {
return Ok(Some(latest_light_client_update));
}
}
let column = DBColumn::LightClientUpdate;
let res = store
.hot_db
.get_bytes(column.into(), &sync_committee_period.to_le_bytes())?;
if let Some(light_client_update_bytes) = res {
let epoch = sync_committee_period
.safe_mul(chain_spec.epochs_per_sync_committee_period.into())?;
let fork_name = chain_spec.fork_name_at_epoch(epoch.into());
let light_client_update =
LightClientUpdate::from_ssz_bytes(&light_client_update_bytes, &fork_name)
.map_err(store::errors::Error::SszDecodeError)?;
*self.latest_light_client_update.write() = Some(light_client_update.clone());
return Ok(Some(light_client_update));
}
Ok(None)
}
pub fn get_light_client_updates(
&self,
store: &BeaconStore<T>,
start_period: u64,
count: u64,
chain_spec: &ChainSpec,
) -> Result<Vec<LightClientUpdate<T::EthSpec>>, BeaconChainError> {
let column = DBColumn::LightClientUpdate;
let mut light_client_updates = vec![];
for res in store
.hot_db
.iter_column_from::<Vec<u8>>(column, &start_period.to_le_bytes())
{
let (sync_committee_bytes, light_client_update_bytes) = res?;
let sync_committee_period = u64::from_ssz_bytes(&sync_committee_bytes)
.map_err(store::errors::Error::SszDecodeError)?;
let epoch = sync_committee_period
.safe_mul(chain_spec.epochs_per_sync_committee_period.into())?;
let fork_name = chain_spec.fork_name_at_epoch(epoch.into());
let light_client_update =
LightClientUpdate::from_ssz_bytes(&light_client_update_bytes, &fork_name)
.map_err(store::errors::Error::SszDecodeError)?;
if sync_committee_period >= start_period + count {
break;
}
light_client_updates.push(light_client_update);
}
Ok(light_client_updates)
}
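
A note on the key scheme used above: each update is keyed by its sync committee period as 8 little-endian bytes. SSZ also encodes `uint64` little-endian, which is why `u64::from_ssz_bytes` on the raw key recovers the period written by `to_le_bytes`. A standalone round-trip sketch (no ssz dependency, hypothetical value):

    let period: u64 = 513;
    let key = period.to_le_bytes(); // 8-byte little-endian DB key
    assert_eq!(u64::from_le_bytes(key), period); // matches the SSZ decoding above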
/// Retrieves prev block cached data from the cache. If not present, re-computes it by retrieving
/// the parent state and inserts an entry into the cache.
///
@@ -161,7 +311,7 @@ impl<T: BeaconChainTypes> LightClientServerCache<T> {
block_root: &Hash256,
block_state_root: &Hash256,
block_slot: Slot,
) -> Result<LightClientCachedData, BeaconChainError> {
) -> Result<LightClientCachedData<T::EthSpec>, BeaconChainError> {
// Attempt to get the value from the cache first.
if let Some(cached_parts) = self.prev_block_cache.lock().get(block_root) {
return Ok(cached_parts.clone());
@@ -199,52 +349,25 @@ impl<T: BeaconChainTypes> Default for LightClientServerCache<T> {
}
type FinalityBranch = FixedVector<Hash256, FinalizedRootProofLen>;
type NextSyncCommitteeBranch = FixedVector<Hash256, NextSyncCommitteeProofLen>;
#[derive(Clone)]
struct LightClientCachedData {
struct LightClientCachedData<E: EthSpec> {
finality_branch: FinalityBranch,
next_sync_committee_branch: NextSyncCommitteeBranch,
next_sync_committee: Arc<SyncCommittee<E>>,
finalized_block_root: Hash256,
}
impl LightClientCachedData {
fn from_state<E: EthSpec>(state: &mut BeaconState<E>) -> Result<Self, BeaconChainError> {
impl<E: EthSpec> LightClientCachedData<E> {
fn from_state(state: &mut BeaconState<E>) -> Result<Self, BeaconChainError> {
Ok(Self {
finality_branch: state.compute_merkle_proof(FINALIZED_ROOT_INDEX)?.into(),
next_sync_committee: state.next_sync_committee()?.clone(),
next_sync_committee_branch: state
.compute_merkle_proof(NEXT_SYNC_COMMITTEE_INDEX)?
.into(),
finalized_block_root: state.finalized_checkpoint().root,
})
}
}
// Implements spec prioritization rules:
// > Full nodes SHOULD provide the LightClientFinalityUpdate with the highest attested_header.beacon.slot (if multiple, highest signature_slot)
//
// ref: https://github.com/ethereum/consensus-specs/blob/113c58f9bf9c08867f6f5f633c4d98e0364d612a/specs/altair/light-client/full-node.md#create_light_client_finality_update
fn is_latest_finality_update<E: EthSpec>(
prev: &LightClientFinalityUpdate<E>,
attested_slot: Slot,
signature_slot: Slot,
) -> bool {
let prev_slot = prev.get_attested_header_slot();
if attested_slot > prev_slot {
true
} else {
attested_slot == prev_slot && signature_slot > *prev.signature_slot()
}
}
// Implements spec prioritization rules:
// > Full nodes SHOULD provide the LightClientOptimisticUpdate with the highest attested_header.beacon.slot (if multiple, highest signature_slot)
//
// ref: https://github.com/ethereum/consensus-specs/blob/113c58f9bf9c08867f6f5f633c4d98e0364d612a/specs/altair/light-client/full-node.md#create_light_client_optimistic_update
fn is_latest_optimistic_update<E: EthSpec>(
prev: &LightClientOptimisticUpdate<E>,
attested_slot: Slot,
signature_slot: Slot,
) -> bool {
let prev_slot = prev.get_slot();
if attested_slot > prev_slot {
true
} else {
attested_slot == prev_slot && signature_slot > *prev.signature_slot()
}
}


@@ -5,6 +5,7 @@ use beacon_chain::block_verification_types::RpcBlock;
use beacon_chain::builder::BeaconChainBuilder;
use beacon_chain::data_availability_checker::AvailableBlock;
use beacon_chain::schema_change::migrate_schema;
use beacon_chain::test_utils::RelativeSyncCommittee;
use beacon_chain::test_utils::{
mock_execution_layer_from_parts, test_spec, AttestationStrategy, BeaconChainHarness,
BlockStrategy, DiskHarnessType, KZG,
@@ -103,6 +104,256 @@ fn get_harness_generic(
harness
}
#[tokio::test]
async fn light_client_updates_test() {
let spec = test_spec::<E>();
let Some(_) = spec.altair_fork_epoch else {
// No-op prior to Altair.
return;
};
let num_final_blocks = E::slots_per_epoch() * 2;
let checkpoint_slot = Slot::new(E::slots_per_epoch() * 9);
let db_path = tempdir().unwrap();
let log = test_logger();
let seconds_per_slot = spec.seconds_per_slot;
let store = get_store_generic(
&db_path,
StoreConfig {
slots_per_restore_point: 2 * E::slots_per_epoch(),
..Default::default()
},
test_spec::<E>(),
);
let harness = get_harness(store.clone(), LOW_VALIDATOR_COUNT);
let all_validators = (0..LOW_VALIDATOR_COUNT).collect::<Vec<_>>();
let num_initial_slots = E::slots_per_epoch() * 10;
let slots: Vec<Slot> = (1..num_initial_slots).map(Slot::new).collect();
let (genesis_state, genesis_state_root) = harness.get_current_state_and_root();
harness
.add_attested_blocks_at_slots(
genesis_state.clone(),
genesis_state_root,
&slots,
&all_validators,
)
.await;
let wss_block_root = harness
.chain
.block_root_at_slot(checkpoint_slot, WhenSlotSkipped::Prev)
.unwrap()
.unwrap();
let wss_state_root = harness
.chain
.state_root_at_slot(checkpoint_slot)
.unwrap()
.unwrap();
let wss_block = harness
.chain
.store
.get_full_block(&wss_block_root)
.unwrap()
.unwrap();
let wss_blobs_opt = harness.chain.store.get_blobs(&wss_block_root).unwrap();
let wss_state = store
.get_state(&wss_state_root, Some(checkpoint_slot))
.unwrap()
.unwrap();
let kzg = spec.deneb_fork_epoch.map(|_| KZG.clone());
let mock =
mock_execution_layer_from_parts(&harness.spec, harness.runtime.task_executor.clone());
harness.advance_slot();
harness
.extend_chain(
num_final_blocks as usize,
BlockStrategy::OnCanonicalHead,
AttestationStrategy::AllValidators,
)
.await;
// Initialise a new beacon chain from the finalized checkpoint.
// The slot clock must be set to a time ahead of the checkpoint state.
let slot_clock = TestingSlotClock::new(
Slot::new(0),
Duration::from_secs(harness.chain.genesis_time),
Duration::from_secs(seconds_per_slot),
);
slot_clock.set_slot(harness.get_current_slot().as_u64());
let (shutdown_tx, _shutdown_rx) = futures::channel::mpsc::channel(1);
let beacon_chain = BeaconChainBuilder::<DiskHarnessType<E>>::new(MinimalEthSpec)
.store(store.clone())
.custom_spec(test_spec::<E>())
.task_executor(harness.chain.task_executor.clone())
.logger(log.clone())
.weak_subjectivity_state(
wss_state,
wss_block.clone(),
wss_blobs_opt.clone(),
genesis_state,
)
.unwrap()
.store_migrator_config(MigratorConfig::default().blocking())
.dummy_eth1_backend()
.expect("should build dummy backend")
.slot_clock(slot_clock)
.shutdown_sender(shutdown_tx)
.chain_config(ChainConfig::default())
.event_handler(Some(ServerSentEventHandler::new_with_capacity(
log.clone(),
1,
)))
.execution_layer(Some(mock.el))
.kzg(kzg)
.build()
.expect("should build");
let beacon_chain = Arc::new(beacon_chain);
let current_state = harness.get_current_state();
if ForkName::Electra == current_state.fork_name_unchecked() {
// TODO(electra) fix beacon state `compute_merkle_proof`
return;
}
let block_root = *current_state
.get_block_root(current_state.slot() - Slot::new(1))
.unwrap();
let contributions = harness.make_sync_contributions(
&current_state,
block_root,
current_state.slot() - Slot::new(1),
RelativeSyncCommittee::Current,
);
// generate sync aggregates
for (_, contribution_and_proof) in contributions {
let contribution = contribution_and_proof
.expect("contribution exists for committee")
.message
.contribution;
beacon_chain
.op_pool
.insert_sync_contribution(contribution.clone())
.unwrap();
beacon_chain
.op_pool
.insert_sync_contribution(contribution)
.unwrap();
}
// check that we can fetch the newly generated sync aggregate
let sync_aggregate = beacon_chain
.op_pool
.get_sync_aggregate(&current_state)
.unwrap()
.unwrap();
// cache light client data
beacon_chain
.light_client_server_cache
.recompute_and_cache_updates(
store.clone(),
current_state.slot() - Slot::new(1),
&block_root,
&sync_aggregate,
&log,
&spec,
)
.unwrap();
// calculate the sync period from the previous slot
let sync_period = (current_state.slot() - Slot::new(1))
.epoch(E::slots_per_epoch())
.sync_committee_period(&spec)
.unwrap();
// fetch a range of light client updates. right now there should only be one light client update
// in the db.
let lc_updates = beacon_chain
.get_light_client_updates(sync_period, 100)
.unwrap();
assert_eq!(lc_updates.len(), 1);
// Advance to the next sync committee period
for _i in 0..(E::slots_per_epoch() * u64::from(spec.epochs_per_sync_committee_period)) {
harness.advance_slot();
}
harness
.extend_chain(
num_final_blocks as usize,
BlockStrategy::OnCanonicalHead,
AttestationStrategy::AllValidators,
)
.await;
let current_state = harness.get_current_state();
let block_root = *current_state
.get_block_root(current_state.slot() - Slot::new(1))
.unwrap();
let contributions = harness.make_sync_contributions(
&current_state,
block_root,
current_state.slot() - Slot::new(1),
RelativeSyncCommittee::Current,
);
// generate new sync aggregates from this new state
for (_, contribution_and_proof) in contributions {
let contribution = contribution_and_proof
.expect("contribution exists for committee")
.message
.contribution;
beacon_chain
.op_pool
.insert_sync_contribution(contribution.clone())
.unwrap();
beacon_chain
.op_pool
.insert_sync_contribution(contribution)
.unwrap();
}
let sync_aggregate = beacon_chain
.op_pool
.get_sync_aggregate(&current_state)
.unwrap()
.unwrap();
// cache new light client data
beacon_chain
.light_client_server_cache
.recompute_and_cache_updates(
store.clone(),
current_state.slot() - Slot::new(1),
&block_root,
&sync_aggregate,
&log,
&spec,
)
.unwrap();
// we should now have two light client updates in the db
let lc_updates = beacon_chain
.get_light_client_updates(sync_period, 100)
.unwrap();
assert_eq!(lc_updates.len(), 2);
}
/// Tests that `store.heal_freezer_block_roots_at_split` inserts block roots between the last
/// restore point slot and the split slot.
#[tokio::test]


@@ -13,6 +13,7 @@ mod block_rewards;
mod build_block_contents;
mod builder_states;
mod database;
mod light_client;
mod metrics;
mod produce_block;
mod proposer_duties;
@@ -30,6 +31,7 @@ mod validator_inclusion;
mod validators;
mod version;
use crate::light_client::get_light_client_updates;
use crate::produce_block::{produce_blinded_block_v2, produce_block_v2, produce_block_v3};
use crate::version::fork_versioned_response;
use beacon_chain::{
@@ -44,8 +46,8 @@ use bytes::Bytes;
use directory::DEFAULT_ROOT_DIR;
use eth2::types::{
self as api_types, BroadcastValidation, EndpointVersion, ForkChoice, ForkChoiceNode,
PublishBlockRequest, ValidatorBalancesRequestBody, ValidatorId, ValidatorStatus,
ValidatorsRequestBody,
LightClientUpdatesQuery, PublishBlockRequest, ValidatorBalancesRequestBody, ValidatorId,
ValidatorStatus, ValidatorsRequestBody,
};
use eth2::{CONSENSUS_VERSION_HEADER, CONTENT_TYPE_HEADER, SSZ_CONTENT_TYPE_HEADER};
use lighthouse_network::{types::SyncState, EnrExt, NetworkGlobals, PeerId, PubsubMessage};
@@ -2484,6 +2486,25 @@ pub fn serve<T: BeaconChainTypes>(
},
);
// GET beacon/light_client/updates
let get_beacon_light_client_updates = beacon_light_client_path
.clone()
.and(task_spawner_filter.clone())
.and(warp::path("updates"))
.and(warp::path::end())
.and(warp::query::<api_types::LightClientUpdatesQuery>())
.and(warp::header::optional::<api_types::Accept>("accept"))
.then(
|chain: Arc<BeaconChain<T>>,
task_spawner: TaskSpawner<T::EthSpec>,
query: LightClientUpdatesQuery,
accept_header: Option<api_types::Accept>| {
task_spawner.blocking_response_task(Priority::P1, move || {
get_light_client_updates::<T>(chain, query, accept_header)
})
},
);
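
Client-side, the route is queried with `start_period` and `count` parameters. A hedged usage sketch via the eth2 API client method that the HTTP API tests below exercise (hypothetical values; error handling elided):

    // Returns the fork-versioned updates, or an error if the light client
    // server is disabled or the query is invalid.
    let updates = client
        .get_beacon_light_client_updates::<E>(start_period, count)
        .await?;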
/*
* beacon/rewards
*/
@@ -4640,6 +4661,10 @@ pub fn serve<T: BeaconChainTypes>(
enable(ctx.config.enable_light_client_server)
.and(get_beacon_light_client_bootstrap),
)
.uor(
enable(ctx.config.enable_light_client_server)
.and(get_beacon_light_client_updates),
)
.uor(get_lighthouse_block_packing_efficiency)
.uor(get_lighthouse_merge_readiness)
.uor(get_events)


@@ -0,0 +1,143 @@
use beacon_chain::{BeaconChain, BeaconChainTypes};
use eth2::types::{
self as api_types, ChainSpec, ForkVersionedResponse, LightClientUpdate,
LightClientUpdateResponseChunk, LightClientUpdateSszResponse, LightClientUpdatesQuery,
};
use ssz::Encode;
use std::sync::Arc;
use warp::{
hyper::{Body, Response},
reply::Reply,
Rejection,
};
use crate::version::{add_ssz_content_type_header, fork_versioned_response, V1};
const MAX_REQUEST_LIGHT_CLIENT_UPDATES: u64 = 128;
pub fn get_light_client_updates<T: BeaconChainTypes>(
chain: Arc<BeaconChain<T>>,
query: LightClientUpdatesQuery,
accept_header: Option<api_types::Accept>,
) -> Result<Response<Body>, Rejection> {
validate_light_client_updates_request(&chain, &query)?;
let light_client_updates = chain
.get_light_client_updates(query.start_period, query.count)
.map_err(|_| {
warp_utils::reject::custom_not_found("No LightClientUpdates found".to_string())
})?;
match accept_header {
Some(api_types::Accept::Ssz) => {
let response_chunks = light_client_updates
.iter()
.map(|update| map_light_client_update_to_ssz_chunk::<T>(&chain, update))
.collect::<Vec<LightClientUpdateResponseChunk>>();
let ssz_response = LightClientUpdateSszResponse {
response_chunk_len: (light_client_updates.len() as u64).to_le_bytes().to_vec(),
response_chunk: response_chunks.as_ssz_bytes(),
}
.as_ssz_bytes();
Response::builder()
.status(200)
.body(ssz_response)
.map(|res: Response<Vec<u8>>| add_ssz_content_type_header(res))
.map_err(|e| {
warp_utils::reject::custom_server_error(format!(
"failed to create response: {}",
e
))
})
}
_ => {
let fork_versioned_response = light_client_updates
.iter()
.map(|update| map_light_client_update_to_json_response::<T>(&chain, update.clone()))
.collect::<Result<Vec<ForkVersionedResponse<LightClientUpdate<T::EthSpec>>>, Rejection>>()?;
Ok(warp::reply::json(&fork_versioned_response).into_response())
}
}
}
pub fn validate_light_client_updates_request<T: BeaconChainTypes>(
chain: &BeaconChain<T>,
query: &LightClientUpdatesQuery,
) -> Result<(), Rejection> {
if query.count > MAX_REQUEST_LIGHT_CLIENT_UPDATES {
return Err(warp_utils::reject::custom_bad_request(
"Invalid count requested".to_string(),
));
}
let current_sync_period = chain
.epoch()
.map_err(|_| {
warp_utils::reject::custom_server_error("failed to get current epoch".to_string())
})?
.sync_committee_period(&chain.spec)
.map_err(|_| {
warp_utils::reject::custom_server_error(
"failed to get current sync committee period".to_string(),
)
})?;
if query.start_period > current_sync_period {
return Err(warp_utils::reject::custom_bad_request(
"Invalid sync committee period requested".to_string(),
));
}
let earliest_altair_sync_committee = chain
.spec
.altair_fork_epoch
.ok_or(warp_utils::reject::custom_server_error(
"failed to get altair fork epoch".to_string(),
))?
.sync_committee_period(&chain.spec)
.map_err(|_| {
warp_utils::reject::custom_server_error(
"failed to get earliest altair sync committee".to_string(),
)
})?;
if query.start_period < earliest_altair_sync_committee {
return Err(warp_utils::reject::custom_bad_request(
"Invalid sync committee period requested".to_string(),
));
}
Ok(())
}
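
Taken together, a request is accepted iff `count` is at most `MAX_REQUEST_LIGHT_CLIENT_UPDATES` and the start period lies between the first post-Altair sync committee period and the current one. As a standalone predicate (a sketch of the rule, not the handler's actual shape):

    fn request_is_valid(start: u64, count: u64, altair_period: u64, current_period: u64) -> bool {
        count <= 128 && (altair_period..=current_period).contains(&start)
    }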
fn map_light_client_update_to_ssz_chunk<T: BeaconChainTypes>(
chain: &BeaconChain<T>,
light_client_update: &LightClientUpdate<T::EthSpec>,
) -> LightClientUpdateResponseChunk {
let fork_name = chain
.spec
.fork_name_at_slot::<T::EthSpec>(*light_client_update.signature_slot());
let fork_digest = ChainSpec::compute_fork_digest(
chain.spec.fork_version_for_name(fork_name),
chain.genesis_validators_root,
);
LightClientUpdateResponseChunk {
context: fork_digest,
payload: light_client_update.as_ssz_bytes(),
}
}
fn map_light_client_update_to_json_response<T: BeaconChainTypes>(
chain: &BeaconChain<T>,
light_client_update: LightClientUpdate<T::EthSpec>,
) -> Result<ForkVersionedResponse<LightClientUpdate<T::EthSpec>>, Rejection> {
let fork_name = chain
.spec
.fork_name_at_slot::<T::EthSpec>(*light_client_update.signature_slot());
fork_versioned_response(V1, fork_name, light_client_update)
}


@@ -1813,6 +1813,36 @@ impl ApiTester {
self
}
pub async fn test_get_beacon_light_client_updates(self) -> Self {
let current_epoch = self.chain.epoch().unwrap();
let current_sync_committee_period = current_epoch
.sync_committee_period(&self.chain.spec)
.unwrap();
let result = match self
.client
.get_beacon_light_client_updates::<E>(current_sync_committee_period as u64, 1)
.await
{
Ok(result) => result,
Err(e) => panic!("query failed incorrectly: {e:?}"),
};
let expected = self
.chain
.light_client_server_cache
.get_light_client_updates(
&self.chain.store,
current_sync_committee_period as u64,
1,
&self.chain.spec,
)
.unwrap();
assert_eq!(result.clone().unwrap().len(), expected.len());
self
}
pub async fn test_get_beacon_light_client_bootstrap(self) -> Self {
let block_id = BlockId(CoreBlockId::Finalized);
let (block_root, _, _) = block_id.root(&self.chain).unwrap();
@@ -6171,6 +6201,18 @@ async fn node_get() {
.await;
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn get_light_client_updates() {
let config = ApiTesterConfig {
spec: ForkName::Altair.make_genesis_spec(E::default_spec()),
..<_>::default()
};
ApiTester::new_from_config(config)
.await
.test_get_beacon_light_client_updates()
.await;
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn get_light_client_bootstrap() {
let config = ApiTesterConfig {


@@ -182,7 +182,6 @@ impl<E: EthSpec> KeyValueStore<E> for LevelDB<E> {
fn iter_column_from<K: Key>(&self, column: DBColumn, from: &[u8]) -> ColumnIter<K> {
let start_key = BytesKey::from_vec(get_key_for_col(column.into(), from));
let iter = self.db.iter(self.read_options());
iter.seek(&start_key);


@@ -300,6 +300,9 @@ pub enum DBColumn {
BeaconHistoricalSummaries,
#[strum(serialize = "olc")]
OverflowLRUCache,
/// For persisting eagerly computed light client data
#[strum(serialize = "lcu")]
LightClientUpdate,
}
/// A block from the database, which might have an execution payload or not.
@@ -342,7 +345,8 @@ impl DBColumn {
| Self::BeaconStateRoots
| Self::BeaconHistoricalRoots
| Self::BeaconHistoricalSummaries
| Self::BeaconRandaoMixes => 8,
| Self::BeaconRandaoMixes
| Self::LightClientUpdate => 8,
Self::BeaconDataColumn => DATA_COLUMN_DB_KEY_SIZE,
}
}