use crate::errors::BeaconChainError; use crate::{metrics, BeaconChainTypes, BeaconStore}; use parking_lot::{Mutex, RwLock}; use safe_arith::SafeArith; use slog::{debug, Logger}; use ssz::Decode; use ssz::Encode; use ssz_types::FixedVector; use std::num::NonZeroUsize; use std::sync::Arc; use store::DBColumn; use store::KeyValueStore; use types::light_client_update::{ FinalizedRootProofLen, NextSyncCommitteeProofLen, FINALIZED_ROOT_INDEX, NEXT_SYNC_COMMITTEE_INDEX, }; use types::non_zero_usize::new_non_zero_usize; use types::{ BeaconBlockRef, BeaconState, ChainSpec, EthSpec, ForkName, Hash256, LightClientFinalityUpdate, LightClientOptimisticUpdate, LightClientUpdate, Slot, SyncAggregate, SyncCommittee, }; /// A prev block cache miss requires to re-generate the state of the post-parent block. Items in the /// prev block cache are very small 32 * (6 + 1) = 224 bytes. 32 is an arbitrary number that /// represents unlikely re-orgs, while keeping the cache very small. const PREV_BLOCK_CACHE_SIZE: NonZeroUsize = new_non_zero_usize(32); /// This cache computes light client messages ahead of time, required to satisfy p2p and API /// requests. These messages include proofs on historical states, so on-demand computation is /// expensive. /// pub struct LightClientServerCache { /// Tracks a single global latest finality update out of all imported blocks. /// /// TODO: Active discussion with @etan-status if this cache should be fork aware to return /// latest canonical (update with highest signature slot, where its attested header is part of /// the head chain) instead of global latest (update with highest signature slot, out of all /// branches). latest_finality_update: RwLock>>, /// Tracks a single global latest optimistic update out of all imported blocks. latest_optimistic_update: RwLock>>, /// Caches the most recent light client update latest_light_client_update: RwLock>>, /// Caches state proofs by block root prev_block_cache: Mutex>>, } impl LightClientServerCache { pub fn new() -> Self { Self { latest_finality_update: None.into(), latest_optimistic_update: None.into(), latest_light_client_update: None.into(), prev_block_cache: lru::LruCache::new(PREV_BLOCK_CACHE_SIZE).into(), } } /// Compute and cache state proofs for latter production of light-client messages. Does not /// trigger block replay. pub(crate) fn cache_state_data( &self, spec: &ChainSpec, block: BeaconBlockRef, block_root: Hash256, block_post_state: &mut BeaconState, ) -> Result<(), BeaconChainError> { let _timer = metrics::start_timer(&metrics::LIGHT_CLIENT_SERVER_CACHE_STATE_DATA_TIMES); // Only post-altair if spec.fork_name_at_slot::(block.slot()) == ForkName::Base { return Ok(()); } // Persist in memory cache for a descendent block let cached_data = LightClientCachedData::from_state(block_post_state)?; self.prev_block_cache.lock().put(block_root, cached_data); Ok(()) } /// Given a block with a SyncAggregate computes better or more recent light client updates. The /// results are cached either on disk or memory to be served via p2p and rest API pub fn recompute_and_cache_updates( &self, store: BeaconStore, block_slot: Slot, block_parent_root: &Hash256, sync_aggregate: &SyncAggregate, log: &Logger, chain_spec: &ChainSpec, ) -> Result<(), BeaconChainError> { let _timer = metrics::start_timer(&metrics::LIGHT_CLIENT_SERVER_CACHE_RECOMPUTE_UPDATES_TIMES); let signature_slot = block_slot; let attested_block_root = block_parent_root; let attested_block = store.get_blinded_block(attested_block_root)?.ok_or( BeaconChainError::DBInconsistent(format!( "Block not available {:?}", attested_block_root )), )?; let cached_parts = self.get_or_compute_prev_block_cache( store.clone(), attested_block_root, &attested_block.state_root(), attested_block.slot(), )?; let attested_slot = attested_block.slot(); let maybe_finalized_block = store.get_blinded_block(&cached_parts.finalized_block_root)?; let sync_period = block_slot .epoch(T::EthSpec::slots_per_epoch()) .sync_committee_period(chain_spec)?; // Spec: Full nodes SHOULD provide the LightClientOptimisticUpdate with the highest // attested_header.beacon.slot (if multiple, highest signature_slot) as selected by fork choice let is_latest_optimistic = match &self.latest_optimistic_update.read().clone() { Some(latest_optimistic_update) => { latest_optimistic_update.is_latest(attested_slot, signature_slot) } None => true, }; if is_latest_optimistic { // can create an optimistic update, that is more recent *self.latest_optimistic_update.write() = Some(LightClientOptimisticUpdate::new( &attested_block, sync_aggregate.clone(), signature_slot, chain_spec, )?); }; // Spec: Full nodes SHOULD provide the LightClientFinalityUpdate with the highest // attested_header.beacon.slot (if multiple, highest signature_slot) as selected by fork choice let is_latest_finality = match &self.latest_finality_update.read().clone() { Some(latest_finality_update) => { latest_finality_update.is_latest(attested_slot, signature_slot) } None => true, }; if is_latest_finality & !cached_parts.finalized_block_root.is_zero() { // Immediately after checkpoint sync the finalized block may not be available yet. if let Some(finalized_block) = maybe_finalized_block.as_ref() { *self.latest_finality_update.write() = Some(LightClientFinalityUpdate::new( &attested_block, finalized_block, cached_parts.finality_branch.clone(), sync_aggregate.clone(), signature_slot, chain_spec, )?); } else { debug!( log, "Finalized block not available in store for light_client server"; "finalized_block_root" => format!("{}", cached_parts.finalized_block_root), ); } } let new_light_client_update = LightClientUpdate::new( sync_aggregate, block_slot, cached_parts.next_sync_committee, cached_parts.next_sync_committee_branch, cached_parts.finality_branch, &attested_block, maybe_finalized_block.as_ref(), chain_spec, )?; // Spec: Full nodes SHOULD provide the best derivable LightClientUpdate (according to is_better_update) // for each sync committee period let prev_light_client_update = match &self.latest_light_client_update.read().clone() { Some(prev_light_client_update) => Some(prev_light_client_update.clone()), None => self.get_light_client_update(&store, sync_period, chain_spec)?, }; let should_persist_light_client_update = if let Some(prev_light_client_update) = prev_light_client_update { let prev_sync_period = prev_light_client_update .signature_slot() .epoch(T::EthSpec::slots_per_epoch()) .sync_committee_period(chain_spec)?; if sync_period != prev_sync_period { true } else { prev_light_client_update .is_better_light_client_update(&new_light_client_update, chain_spec)? } } else { true }; if should_persist_light_client_update { self.store_light_client_update(&store, sync_period, &new_light_client_update)?; } Ok(()) } fn store_light_client_update( &self, store: &BeaconStore, sync_committee_period: u64, light_client_update: &LightClientUpdate, ) -> Result<(), BeaconChainError> { let column = DBColumn::LightClientUpdate; store.hot_db.put_bytes( column.into(), &sync_committee_period.to_le_bytes(), &light_client_update.as_ssz_bytes(), )?; *self.latest_light_client_update.write() = Some(light_client_update.clone()); Ok(()) } // Used to fetch the most recently persisted "best" light client update. // Should not be used outside the light client server, as it also caches the fetched // light client update. fn get_light_client_update( &self, store: &BeaconStore, sync_committee_period: u64, chain_spec: &ChainSpec, ) -> Result>, BeaconChainError> { if let Some(latest_light_client_update) = self.latest_light_client_update.read().clone() { let latest_lc_update_sync_committee_period = latest_light_client_update .signature_slot() .epoch(T::EthSpec::slots_per_epoch()) .sync_committee_period(chain_spec)?; if latest_lc_update_sync_committee_period == sync_committee_period { return Ok(Some(latest_light_client_update)); } } let column = DBColumn::LightClientUpdate; let res = store .hot_db .get_bytes(column.into(), &sync_committee_period.to_le_bytes())?; if let Some(light_client_update_bytes) = res { let epoch = sync_committee_period .safe_mul(chain_spec.epochs_per_sync_committee_period.into())?; let fork_name = chain_spec.fork_name_at_epoch(epoch.into()); let light_client_update = LightClientUpdate::from_ssz_bytes(&light_client_update_bytes, &fork_name) .map_err(store::errors::Error::SszDecodeError)?; *self.latest_light_client_update.write() = Some(light_client_update.clone()); return Ok(Some(light_client_update)); } Ok(None) } pub fn get_light_client_updates( &self, store: &BeaconStore, start_period: u64, count: u64, chain_spec: &ChainSpec, ) -> Result>, BeaconChainError> { let column = DBColumn::LightClientUpdate; let mut light_client_updates = vec![]; for res in store .hot_db .iter_column_from::>(column, &start_period.to_le_bytes()) { let (sync_committee_bytes, light_client_update_bytes) = res?; let sync_committee_period = u64::from_ssz_bytes(&sync_committee_bytes) .map_err(store::errors::Error::SszDecodeError)?; let epoch = sync_committee_period .safe_mul(chain_spec.epochs_per_sync_committee_period.into())?; let fork_name = chain_spec.fork_name_at_epoch(epoch.into()); let light_client_update = LightClientUpdate::from_ssz_bytes(&light_client_update_bytes, &fork_name) .map_err(store::errors::Error::SszDecodeError)?; light_client_updates.push(light_client_update); if sync_committee_period >= start_period + count { break; } } Ok(light_client_updates) } /// Retrieves prev block cached data from cache. If not present re-computes by retrieving the /// parent state, and inserts an entry to the cache. /// /// In separate function since FnOnce of get_or_insert can not be fallible. fn get_or_compute_prev_block_cache( &self, store: BeaconStore, block_root: &Hash256, block_state_root: &Hash256, block_slot: Slot, ) -> Result, BeaconChainError> { // Attempt to get the value from the cache first. if let Some(cached_parts) = self.prev_block_cache.lock().get(block_root) { return Ok(cached_parts.clone()); } metrics::inc_counter(&metrics::LIGHT_CLIENT_SERVER_CACHE_PREV_BLOCK_CACHE_MISS); // Compute the value, handling potential errors. let mut state = store .get_state(block_state_root, Some(block_slot))? .ok_or_else(|| { BeaconChainError::DBInconsistent(format!("Missing state {:?}", block_state_root)) })?; let new_value = LightClientCachedData::from_state(&mut state)?; // Insert value and return owned self.prev_block_cache .lock() .put(*block_root, new_value.clone()); Ok(new_value) } pub fn get_latest_finality_update(&self) -> Option> { self.latest_finality_update.read().clone() } pub fn get_latest_optimistic_update(&self) -> Option> { self.latest_optimistic_update.read().clone() } } impl Default for LightClientServerCache { fn default() -> Self { Self::new() } } type FinalityBranch = FixedVector; type NextSyncCommitteeBranch = FixedVector; #[derive(Clone)] struct LightClientCachedData { finality_branch: FinalityBranch, next_sync_committee_branch: NextSyncCommitteeBranch, next_sync_committee: Arc>, finalized_block_root: Hash256, } impl LightClientCachedData { fn from_state(state: &mut BeaconState) -> Result { Ok(Self { finality_branch: state.compute_merkle_proof(FINALIZED_ROOT_INDEX)?.into(), next_sync_committee: state.next_sync_committee()?.clone(), next_sync_committee_branch: state .compute_merkle_proof(NEXT_SYNC_COMMITTEE_INDEX)? .into(), finalized_block_root: state.finalized_checkpoint().root, }) } }