From 46db30416d8b85f0c8c10a26bc6bc1b39b60ccca Mon Sep 17 00:00:00 2001 From: ethDreamer <37123614+ethDreamer@users.noreply.github.com> Date: Fri, 12 May 2023 09:08:24 -0500 Subject: [PATCH] Implement Overflow LRU Cache for Pending Blobs (#4203) * All Necessary Objects Implement Encode/Decode * Major Components for LRUOverflowCache Implemented * Finish Database Code * Add Maintenance Methods * Added Maintenance Service * Persist Blobs on Shutdown / Reload on Startup * Address Clippy Complaints * Add (emum_behaviour = "tag") to ssz_derive * Convert Encode/Decode Implementations to "tag" * Started Adding Tests * Added a ton of tests * 1 character fix * Feature Guard Minimal Spec Tests * Update beacon_node/beacon_chain/src/data_availability_checker.rs Co-authored-by: realbigsean * Address Sean's Comments * Add iter_raw_keys method * Remove TODOs --------- Co-authored-by: realbigsean --- beacon_node/beacon_chain/src/beacon_chain.rs | 10 +- .../beacon_chain/src/blob_verification.rs | 4 +- .../beacon_chain/src/block_verification.rs | 11 +- beacon_node/beacon_chain/src/builder.rs | 6 +- .../src/data_availability_checker.rs | 450 ++--- .../overflow_lru_cache.rs | 1645 +++++++++++++++++ beacon_node/beacon_chain/src/errors.rs | 3 + .../src/eth1_finalization_cache.rs | 3 +- beacon_node/beacon_chain/src/metrics.rs | 2 + beacon_node/client/src/builder.rs | 5 + .../test_utils/execution_block_generator.rs | 3 +- .../beacon_processor/worker/rpc_methods.rs | 2 +- beacon_node/store/src/leveldb_store.rs | 30 + beacon_node/store/src/lib.rs | 13 + consensus/fork_choice/src/fork_choice.rs | 3 +- consensus/ssz_derive/src/lib.rs | 145 +- consensus/ssz_derive/tests/tests.rs | 15 + .../state_processing/src/consensus_context.rs | 5 +- consensus/types/src/beacon_block.rs | 21 +- consensus/types/src/beacon_state.rs | 98 +- consensus/types/src/fork_name.rs | 4 +- consensus/types/src/lib.rs | 6 +- consensus/types/src/signed_beacon_block.rs | 133 ++ 23 files changed, 2364 insertions(+), 253 
deletions(-) create mode 100644 beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index e7180cae14..ea4e3179b2 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -465,7 +465,7 @@ pub struct BeaconChain { /// Provides monitoring of a set of explicitly defined validators. pub validator_monitor: RwLock>, pub proposal_blob_cache: BlobCache, - pub data_availability_checker: DataAvailabilityChecker, + pub data_availability_checker: DataAvailabilityChecker, pub kzg: Option>, } @@ -609,6 +609,13 @@ impl BeaconChain { Ok(()) } + pub fn persist_data_availabilty_checker(&self) -> Result<(), Error> { + let _timer = metrics::start_timer(&metrics::PERSIST_DATA_AVAILABILITY_CHECKER); + self.data_availability_checker.persist_all()?; + + Ok(()) + } + /// Returns the slot _right now_ according to `self.slot_clock`. Returns `Err` if the slot is /// unavailable. /// @@ -6268,6 +6275,7 @@ impl Drop for BeaconChain { let drop = || -> Result<(), Error> { self.persist_head_and_fork_choice()?; self.persist_op_pool()?; + self.persist_data_availabilty_checker()?; self.persist_eth1_cache() }; diff --git a/beacon_node/beacon_chain/src/blob_verification.rs b/beacon_node/beacon_chain/src/blob_verification.rs index d5e5e9665a..2216764c27 100644 --- a/beacon_node/beacon_chain/src/blob_verification.rs +++ b/beacon_node/beacon_chain/src/blob_verification.rs @@ -15,6 +15,7 @@ use crate::BeaconChainError; use eth2::types::BlockContentsTuple; use kzg::Kzg; use slog::{debug, warn}; +use ssz_derive::{Decode, Encode}; use std::borrow::Cow; use types::{ BeaconBlockRef, BeaconState, BeaconStateError, BlobSidecar, BlobSidecarList, ChainSpec, @@ -398,8 +399,9 @@ fn cheap_state_advance_to_obtain_committees<'a, E: EthSpec>( /// Wrapper over a `BlobSidecar` for which we have completed kzg verification. /// i.e. 
`verify_blob_kzg_proof(blob, commitment, proof) == true`. -#[derive(Debug, Derivative, Clone)] +#[derive(Debug, Derivative, Clone, Encode, Decode)] #[derivative(PartialEq, Eq)] +#[ssz(struct_behaviour = "transparent")] pub struct KzgVerifiedBlob { blob: Arc>, } diff --git a/beacon_node/beacon_chain/src/block_verification.rs b/beacon_node/beacon_chain/src/block_verification.rs index 94316c0d30..2107fbf693 100644 --- a/beacon_node/beacon_chain/src/block_verification.rs +++ b/beacon_node/beacon_chain/src/block_verification.rs @@ -77,6 +77,7 @@ use safe_arith::ArithError; use slog::{debug, error, warn, Logger}; use slot_clock::SlotClock; use ssz::Encode; +use ssz_derive::{Decode, Encode}; use state_processing::per_block_processing::{errors::IntoWithIndex, is_merge_transition_block}; use state_processing::{ block_signature_verifier::{BlockSignatureVerifier, Error as BlockSignatureVerifierError}, @@ -95,6 +96,7 @@ use task_executor::JoinHandle; use tree_hash::TreeHash; use types::blob_sidecar::BlobIdentifier; use types::ExecPayload; +use types::{ssz_tagged_beacon_state, ssz_tagged_signed_beacon_block}; use types::{ BeaconBlockRef, BeaconState, BeaconStateError, BlindedPayload, ChainSpec, CloneConfig, Epoch, EthSpec, ExecutionBlockHash, Hash256, InconsistentFork, PublicKey, PublicKeyBytes, @@ -499,7 +501,7 @@ impl From for BlockError { } /// Stores information about verifying a payload against an execution engine. 
-#[derive(Clone)] +#[derive(Debug, PartialEq, Clone, Encode, Decode)] pub struct PayloadVerificationOutcome { pub payload_verification_status: PayloadVerificationStatus, pub is_valid_merge_transition_block: bool, @@ -718,6 +720,7 @@ impl ExecutedBlock { } } +#[derive(Debug, PartialEq)] pub struct AvailableExecutedBlock { pub block: AvailableBlock, pub import_data: BlockImportData, @@ -755,6 +758,7 @@ impl AvailableExecutedBlock { } } +#[derive(Encode, Decode, Clone)] pub struct AvailabilityPendingExecutedBlock { pub block: AvailabilityPendingBlock, pub import_data: BlockImportData, @@ -799,9 +803,14 @@ impl AvailabilityPendingExecutedBlock { } } +#[derive(Debug, PartialEq, Encode, Decode, Clone)] +// TODO (mark): investigate using an Arc / Arc +// here to make this cheaper to clone pub struct BlockImportData { pub block_root: Hash256, + #[ssz(with = "ssz_tagged_beacon_state")] pub state: BeaconState, + #[ssz(with = "ssz_tagged_signed_beacon_block")] pub parent_block: SignedBeaconBlock>, pub parent_eth1_finalization_data: Eth1FinalizationData, pub confirmed_state_roots: Vec, diff --git a/beacon_node/beacon_chain/src/builder.rs b/beacon_node/beacon_chain/src/builder.rs index a2347ede94..78f39e3581 100644 --- a/beacon_node/beacon_chain/src/builder.rs +++ b/beacon_node/beacon_chain/src/builder.rs @@ -794,7 +794,7 @@ where let beacon_chain = BeaconChain { spec: self.spec.clone(), config: self.chain_config, - store, + store: store.clone(), task_executor: self .task_executor .ok_or("Cannot build without task executor")?, @@ -864,8 +864,10 @@ where data_availability_checker: DataAvailabilityChecker::new( slot_clock, kzg.clone(), + store, self.spec, - ), + ) + .map_err(|e| format!("Error initializing DataAvailabiltyChecker: {:?}", e))?, proposal_blob_cache: BlobCache::default(), kzg, }; diff --git a/beacon_node/beacon_chain/src/data_availability_checker.rs b/beacon_node/beacon_chain/src/data_availability_checker.rs index 1b44947c08..550515009e 100644 --- 
a/beacon_node/beacon_chain/src/data_availability_checker.rs +++ b/beacon_node/beacon_chain/src/data_availability_checker.rs @@ -4,23 +4,29 @@ use crate::blob_verification::{ }; use crate::block_verification::{AvailabilityPendingExecutedBlock, AvailableExecutedBlock}; +use crate::data_availability_checker::overflow_lru_cache::OverflowLRUCache; +use crate::{BeaconChain, BeaconChainTypes, BeaconStore}; use kzg::Error as KzgError; use kzg::Kzg; -use parking_lot::RwLock; +use slog::{debug, error}; use slot_clock::SlotClock; -use ssz_types::{Error, FixedVector, VariableList}; +use ssz_types::{Error, VariableList}; use state_processing::per_block_processing::deneb::deneb::verify_kzg_commitments_against_transactions; -use std::collections::hash_map::{Entry, OccupiedEntry}; -use std::collections::HashMap; use std::sync::Arc; +use task_executor::TaskExecutor; use types::beacon_block_body::KzgCommitments; use types::blob_sidecar::{BlobIdentifier, BlobSidecar}; use types::consts::deneb::MIN_EPOCHS_FOR_BLOB_SIDECARS_REQUESTS; +use types::ssz_tagged_signed_beacon_block; use types::{ BeaconBlockRef, BlobSidecarList, ChainSpec, Epoch, EthSpec, ExecPayload, FullPayload, Hash256, SignedBeaconBlock, SignedBeaconBlockHeader, Slot, }; +mod overflow_lru_cache; + +pub const OVERFLOW_LRU_CAPACITY: usize = 1024; + #[derive(Debug)] pub enum AvailabilityCheckError { DuplicateBlob(Hash256), @@ -39,6 +45,9 @@ pub enum AvailabilityCheckError { }, Pending, IncorrectFork, + BlobIndexInvalid(u64), + StoreError(store::Error), + DecodeError(ssz::DecodeError), } impl From for AvailabilityCheckError { @@ -47,70 +56,35 @@ impl From for AvailabilityCheckError { } } +impl From for AvailabilityCheckError { + fn from(value: store::Error) -> Self { + Self::StoreError(value) + } +} + +impl From for AvailabilityCheckError { + fn from(value: ssz::DecodeError) -> Self { + Self::DecodeError(value) + } +} + /// This cache contains /// - blobs that have been gossip verified /// - commitments for blocks that have 
been gossip verified, but the commitments themselves /// have not been verified against blobs /// - blocks that have been fully verified and only require a data availability check -pub struct DataAvailabilityChecker { - availability_cache: RwLock>>, - slot_clock: S, +pub struct DataAvailabilityChecker { + availability_cache: Arc>, + slot_clock: T::SlotClock, kzg: Option>, spec: ChainSpec, } -/// Caches partially available blobs and execution verified blocks corresponding -/// to a given `block_hash` that are received over gossip. -/// -/// The blobs are all gossip and kzg verified. -/// The block has completed all verifications except the availability check. -struct ReceivedComponents { - verified_blobs: FixedVector>, T::MaxBlobsPerBlock>, - executed_block: Option>, -} - -impl ReceivedComponents { - fn new_from_blob(blob: KzgVerifiedBlob) -> Self { - let mut verified_blobs = FixedVector::<_, _>::default(); - // TODO: verify that we've already ensured the blob index < T::MaxBlobsPerBlock - if let Some(mut_maybe_blob) = verified_blobs.get_mut(blob.blob_index() as usize) { - *mut_maybe_blob = Some(blob); - } - - Self { - verified_blobs, - executed_block: None, - } - } - - fn new_from_block(block: AvailabilityPendingExecutedBlock) -> Self { - Self { - verified_blobs: <_>::default(), - executed_block: Some(block), - } - } - - /// Returns `true` if the cache has all blobs corresponding to the - /// kzg commitments in the block. - fn has_all_blobs(&self, block: &AvailabilityPendingExecutedBlock) -> bool { - for i in 0..block.num_blobs_expected() { - if self - .verified_blobs - .get(i) - .map(|maybe_blob| maybe_blob.is_none()) - .unwrap_or(true) - { - return false; - } - } - true - } -} - /// This type is returned after adding a block / blob to the `DataAvailabilityChecker`. /// /// Indicates if the block is fully `Available` or if we need blobs or blocks /// to "complete" the requirements for an `AvailableBlock`. 
+#[derive(Debug, PartialEq)] pub enum Availability { PendingBlobs(Vec), PendingBlock(Hash256), @@ -129,25 +103,28 @@ impl Availability { } } -impl DataAvailabilityChecker { - pub fn new(slot_clock: S, kzg: Option>, spec: ChainSpec) -> Self { - Self { - availability_cache: <_>::default(), +impl DataAvailabilityChecker { + pub fn new( + slot_clock: T::SlotClock, + kzg: Option>, + store: BeaconStore, + spec: ChainSpec, + ) -> Result { + let overflow_cache = OverflowLRUCache::new(OVERFLOW_LRU_CAPACITY, store)?; + Ok(Self { + availability_cache: Arc::new(overflow_cache), slot_clock, kzg, spec, - } + }) } /// Get a blob from the availability cache. - pub fn get_blob(&self, blob_id: &BlobIdentifier) -> Option>> { - self.availability_cache - .read() - .get(&blob_id.block_root)? - .verified_blobs - .get(blob_id.index as usize)? - .as_ref() - .map(|kzg_verified_blob| kzg_verified_blob.clone_blob()) + pub fn get_blob( + &self, + blob_id: &BlobIdentifier, + ) -> Result>>, AvailabilityCheckError> { + self.availability_cache.peek_blob(blob_id) } /// This first validates the KZG commitments included in the blob sidecar. @@ -158,10 +135,8 @@ impl DataAvailabilityChecker { /// This should only accept gossip verified blobs, so we should not have to worry about dupes. pub fn put_gossip_blob( &self, - gossip_blob: GossipVerifiedBlob, - ) -> Result, AvailabilityCheckError> { - let block_root = gossip_blob.block_root(); - + gossip_blob: GossipVerifiedBlob, + ) -> Result, AvailabilityCheckError> { // Verify the KZG commitments. let kzg_verified_blob = if let Some(kzg) = self.kzg.as_ref() { verify_kzg_for_blob(gossip_blob, kzg)? 
@@ -169,125 +144,26 @@ impl DataAvailabilityChecker { return Err(AvailabilityCheckError::KzgNotInitialized); }; - let availability = match self - .availability_cache - .write() - .entry(kzg_verified_blob.block_root()) - { - Entry::Occupied(mut occupied_entry) => { - // All blobs reaching this cache should be gossip verified and gossip verification - // should filter duplicates, as well as validate indices. - let received_components = occupied_entry.get_mut(); - - if let Some(maybe_verified_blob) = received_components - .verified_blobs - .get_mut(kzg_verified_blob.blob_index() as usize) - { - *maybe_verified_blob = Some(kzg_verified_blob) - } - - if let Some(executed_block) = received_components.executed_block.take() { - self.check_block_availability_maybe_cache(occupied_entry, executed_block)? - } else { - Availability::PendingBlock(block_root) - } - } - Entry::Vacant(vacant_entry) => { - let block_root = kzg_verified_blob.block_root(); - vacant_entry.insert(ReceivedComponents::new_from_blob(kzg_verified_blob)); - Availability::PendingBlock(block_root) - } - }; - - Ok(availability) + self.availability_cache + .put_kzg_verified_blob(kzg_verified_blob) } /// Check if we have all the blobs for a block. If we do, return the Availability variant that /// triggers import of the block. pub fn put_pending_executed_block( &self, - executed_block: AvailabilityPendingExecutedBlock, - ) -> Result, AvailabilityCheckError> { - let availability = match self - .availability_cache - .write() - .entry(executed_block.import_data.block_root) - { - Entry::Occupied(occupied_entry) => { - self.check_block_availability_maybe_cache(occupied_entry, executed_block)? 
- } - Entry::Vacant(vacant_entry) => { - let all_blob_ids = executed_block.get_all_blob_ids(); - vacant_entry.insert(ReceivedComponents::new_from_block(executed_block)); - Availability::PendingBlobs(all_blob_ids) - } - }; - - Ok(availability) - } - - /// Checks if the provided `executed_block` contains all required blobs to be considered an - /// `AvailableBlock` based on blobs that are cached. - /// - /// Returns an error if there was an error when matching the block commitments against blob commitments. - /// - /// Returns `Ok(Availability::Available(_))` if all blobs for the block are present in cache. - /// Returns `Ok(Availability::PendingBlobs(_))` if all corresponding blobs have not been received in the cache. - fn check_block_availability_maybe_cache( - &self, - mut occupied_entry: OccupiedEntry>, - executed_block: AvailabilityPendingExecutedBlock, - ) -> Result, AvailabilityCheckError> { - if occupied_entry.get().has_all_blobs(&executed_block) { - let num_blobs_expected = executed_block.num_blobs_expected(); - let AvailabilityPendingExecutedBlock { - block, - import_data, - payload_verification_outcome, - } = executed_block; - - let ReceivedComponents { - verified_blobs, - executed_block: _, - } = occupied_entry.remove(); - - let verified_blobs = Vec::from(verified_blobs) - .into_iter() - .take(num_blobs_expected) - .map(|maybe_blob| maybe_blob.ok_or(AvailabilityCheckError::MissingBlobs)) - .collect::, _>>()?; - - let available_block = self.make_available(block, verified_blobs)?; - Ok(Availability::Available(Box::new( - AvailableExecutedBlock::new( - available_block, - import_data, - payload_verification_outcome, - ), - ))) - } else { - let received_components = occupied_entry.get_mut(); - - let missing_blob_ids = executed_block.get_filtered_blob_ids(|index| { - received_components - .verified_blobs - .get(index as usize) - .map(|maybe_blob| maybe_blob.is_none()) - .unwrap_or(true) - }); - - let _ = 
received_components.executed_block.insert(executed_block); - - Ok(Availability::PendingBlobs(missing_blob_ids)) - } + executed_block: AvailabilityPendingExecutedBlock, + ) -> Result, AvailabilityCheckError> { + self.availability_cache + .put_pending_executed_block(executed_block) } /// Checks if a block is available, returns a `MaybeAvailableBlock` that may include the fully /// available block. pub fn check_availability( &self, - block: BlockWrapper, - ) -> Result, AvailabilityCheckError> { + block: BlockWrapper, + ) -> Result, AvailabilityCheckError> { match block { BlockWrapper::Block(block) => self.check_availability_without_blobs(block), BlockWrapper::BlockAndBlobs(block, blob_list) => { @@ -308,8 +184,8 @@ impl DataAvailabilityChecker { /// Does not access the gossip cache. pub fn try_check_availability( &self, - block: BlockWrapper, - ) -> Result, AvailabilityCheckError> { + block: BlockWrapper, + ) -> Result, AvailabilityCheckError> { match block { BlockWrapper::Block(block) => { let blob_requirements = self.get_blob_requirements(&block)?; @@ -329,13 +205,13 @@ impl DataAvailabilityChecker { /// commitments are consistent with the provided verified blob commitments. pub fn check_availability_with_blobs( &self, - block: Arc>, - blobs: KzgVerifiedBlobList, - ) -> Result, AvailabilityCheckError> { + block: Arc>, + blobs: KzgVerifiedBlobList, + ) -> Result, AvailabilityCheckError> { match self.check_availability_without_blobs(block)? { MaybeAvailableBlock::Available(block) => Ok(block), MaybeAvailableBlock::AvailabilityPending(pending_block) => { - self.make_available(pending_block, blobs) + pending_block.make_available(blobs) } } } @@ -344,8 +220,8 @@ impl DataAvailabilityChecker { /// an AvailableBlock if no blobs are required. Otherwise this will return an AvailabilityPendingBlock. 
pub fn check_availability_without_blobs( &self, - block: Arc>, - ) -> Result, AvailabilityCheckError> { + block: Arc>, + ) -> Result, AvailabilityCheckError> { let blob_requirements = self.get_blob_requirements(&block)?; let blobs = match blob_requirements { BlobRequirements::EmptyBlobs => VerifiedBlobs::EmptyBlobs, @@ -363,50 +239,18 @@ impl DataAvailabilityChecker { })) } - /// Verifies an AvailabilityPendingBlock against a set of KZG verified blobs. - /// This does not check whether a block *should* have blobs, these checks should must have been - /// completed when producing the `AvailabilityPendingBlock`. - pub fn make_available( - &self, - block: AvailabilityPendingBlock, - blobs: Vec>, - ) -> Result, AvailabilityCheckError> { - let block_kzg_commitments = block.kzg_commitments()?; - if blobs.len() != block_kzg_commitments.len() { - return Err(AvailabilityCheckError::NumBlobsMismatch { - num_kzg_commitments: block_kzg_commitments.len(), - num_blobs: blobs.len(), - }); - } - - for (block_commitment, blob) in block_kzg_commitments.iter().zip(blobs.iter()) { - if *block_commitment != blob.kzg_commitment() { - return Err(AvailabilityCheckError::KzgCommitmentMismatch { - blob_index: blob.as_blob().index, - }); - } - } - - let blobs = VariableList::new(blobs.into_iter().map(|blob| blob.to_blob()).collect())?; - - Ok(AvailableBlock { - block: block.block, - blobs: VerifiedBlobs::Available(blobs), - }) - } - /// Determines the blob requirements for a block. Answers the question: "Does this block require /// blobs?". 
fn get_blob_requirements( &self, - block: &Arc>>, + block: &Arc>>, ) -> Result { let verified_blobs = if let (Ok(block_kzg_commitments), Ok(payload)) = ( block.message().body().blob_kzg_commitments(), block.message().body().execution_payload(), ) { if let Some(transactions) = payload.transactions() { - let verified = verify_kzg_commitments_against_transactions::( + let verified = verify_kzg_commitments_against_transactions::( transactions, block_kzg_commitments, ) @@ -437,7 +281,7 @@ impl DataAvailabilityChecker { self.spec.deneb_fork_epoch.and_then(|fork_epoch| { self.slot_clock .now() - .map(|slot| slot.epoch(T::slots_per_epoch())) + .map(|slot| slot.epoch(T::EthSpec::slots_per_epoch())) .map(|current_epoch| { std::cmp::max( fork_epoch, @@ -452,6 +296,96 @@ impl DataAvailabilityChecker { self.data_availability_boundary() .map_or(false, |da_epoch| block_epoch >= da_epoch) } + + /// Persist all in memory components to disk + pub fn persist_all(&self) -> Result<(), AvailabilityCheckError> { + self.availability_cache.write_all_to_disk() + } +} + +pub fn start_availability_cache_maintenance_service( + executor: TaskExecutor, + chain: Arc>, +) { + // this cache only needs to be maintained if deneb is configured + if chain.spec.deneb_fork_epoch.is_some() { + let overflow_cache = chain.data_availability_checker.availability_cache.clone(); + executor.spawn( + async move { availability_cache_maintenance_service(chain, overflow_cache).await }, + "availability_cache_service", + ); + } else { + debug!( + chain.log, + "Deneb fork not configured, not starting availability cache maintenance service" + ); + } +} + +async fn availability_cache_maintenance_service( + chain: Arc>, + overflow_cache: Arc>, +) { + let epoch_duration = chain.slot_clock.slot_duration() * T::EthSpec::slots_per_epoch() as u32; + loop { + match chain + .slot_clock + .duration_to_next_epoch(T::EthSpec::slots_per_epoch()) + { + Some(duration) => { + // this service should run 3/4 of the way through the epoch 
+ let additional_delay = (epoch_duration * 3) / 4; + tokio::time::sleep(duration + additional_delay).await; + + let deneb_fork_epoch = match chain.spec.deneb_fork_epoch { + Some(epoch) => epoch, + None => break, // shutdown service if deneb fork epoch not set + }; + + debug!( + chain.log, + "Availability cache maintenance service firing"; + ); + + let current_epoch = match chain + .slot_clock + .now() + .map(|slot| slot.epoch(T::EthSpec::slots_per_epoch())) + { + Some(epoch) => epoch, + None => continue, // we'll have to try again next time I suppose.. + }; + + if current_epoch < deneb_fork_epoch { + // we are not in deneb yet + continue; + } + + let finalized_epoch = chain + .canonical_head + .fork_choice_read_lock() + .finalized_checkpoint() + .epoch; + // any data belonging to an epoch before this should be pruned + let cutoff_epoch = std::cmp::max( + finalized_epoch + 1, + std::cmp::max( + current_epoch.saturating_sub(*MIN_EPOCHS_FOR_BLOB_SIDECARS_REQUESTS), + deneb_fork_epoch, + ), + ); + + if let Err(e) = overflow_cache.do_maintenance(cutoff_epoch) { + error!(chain.log, "Failed to maintain availability cache"; "error" => ?e); + } + } + None => { + error!(chain.log, "Failed to read slot clock"); + // If we can't read the slot clock, just wait another slot. + tokio::time::sleep(chain.slot_clock.slot_duration()).await; + } + }; + } } pub enum BlobRequirements { @@ -493,6 +427,37 @@ impl AvailabilityPendingBlock { .blob_kzg_commitments() .map_err(|_| AvailabilityCheckError::IncorrectFork) } + + /// Verifies an AvailabilityPendingBlock against a set of KZG verified blobs. + /// This does not check whether a block *should* have blobs, these checks should must have been + /// completed when producing the `AvailabilityPendingBlock`. 
+ pub fn make_available( + self, + blobs: Vec>, + ) -> Result, AvailabilityCheckError> { + let block_kzg_commitments = self.kzg_commitments()?; + if blobs.len() != block_kzg_commitments.len() { + return Err(AvailabilityCheckError::NumBlobsMismatch { + num_kzg_commitments: block_kzg_commitments.len(), + num_blobs: blobs.len(), + }); + } + + for (block_commitment, blob) in block_kzg_commitments.iter().zip(blobs.iter()) { + if *block_commitment != blob.kzg_commitment() { + return Err(AvailabilityCheckError::KzgCommitmentMismatch { + blob_index: blob.as_blob().index, + }); + } + } + + let blobs = VariableList::new(blobs.into_iter().map(|blob| blob.to_blob()).collect())?; + + Ok(AvailableBlock { + block: self.block, + blobs: VerifiedBlobs::Available(blobs), + }) + } } #[derive(Clone, Debug, PartialEq)] @@ -576,3 +541,44 @@ impl AsBlock for AvailableBlock { } } } + +// The standard implementation of Encode for SignedBeaconBlock +// requires us to use ssz(enum_behaviour = "transparent"). This +// prevents us from implementing Decode. 
We need to use a +// custom Encode and Decode in this wrapper object that essentially +// encodes it as if it were ssz(enum_behaviour = "union") +impl ssz::Encode for AvailabilityPendingBlock { + fn is_ssz_fixed_len() -> bool { + ssz_tagged_signed_beacon_block::encode::is_ssz_fixed_len() + } + + fn ssz_append(&self, buf: &mut Vec) { + ssz_tagged_signed_beacon_block::encode::ssz_append(self.block.as_ref(), buf); + } + + fn ssz_bytes_len(&self) -> usize { + ssz_tagged_signed_beacon_block::encode::ssz_bytes_len(self.block.as_ref()) + } +} + +impl ssz::Decode for AvailabilityPendingBlock { + fn is_ssz_fixed_len() -> bool { + ssz_tagged_signed_beacon_block::decode::is_ssz_fixed_len() + } + + fn from_ssz_bytes(bytes: &[u8]) -> Result { + Ok(Self { + block: Arc::new(ssz_tagged_signed_beacon_block::decode::from_ssz_bytes( + bytes, + )?), + }) + } +} + +#[cfg(test)] +mod test { + #[test] + fn check_encode_decode_availability_pending_block() { + // todo.. (difficult to create default beacon blocks to test) + } +} diff --git a/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs b/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs new file mode 100644 index 0000000000..4ad5f57eb6 --- /dev/null +++ b/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs @@ -0,0 +1,1645 @@ +use crate::beacon_chain::BeaconStore; +use crate::blob_verification::KzgVerifiedBlob; +use crate::block_verification::{AvailabilityPendingExecutedBlock, AvailableExecutedBlock}; +use crate::data_availability_checker::{Availability, AvailabilityCheckError}; +use crate::store::{DBColumn, KeyValueStore}; +use crate::BeaconChainTypes; +use lru::LruCache; +use parking_lot::{Mutex, RwLock, RwLockUpgradableReadGuard, RwLockWriteGuard}; +use ssz::{Decode, Encode}; +use ssz_derive::{Decode, Encode}; +use ssz_types::FixedVector; +use std::{collections::HashSet, sync::Arc}; +use types::blob_sidecar::BlobIdentifier; +use types::{BlobSidecar, Epoch, 
EthSpec, Hash256}; + +/// Caches partially available blobs and execution verified blocks corresponding +/// to a given `block_hash` that are received over gossip. +/// +/// The blobs are all gossip and kzg verified. +/// The block has completed all verifications except the availability check. +#[derive(Encode, Decode, Clone)] +pub struct PendingComponents { + verified_blobs: FixedVector>, T::MaxBlobsPerBlock>, + executed_block: Option>, +} + +impl PendingComponents { + pub fn new_from_blob(blob: KzgVerifiedBlob) -> Self { + let mut verified_blobs = FixedVector::<_, _>::default(); + if let Some(mut_maybe_blob) = verified_blobs.get_mut(blob.blob_index() as usize) { + *mut_maybe_blob = Some(blob); + } + + Self { + verified_blobs, + executed_block: None, + } + } + + pub fn new_from_block(block: AvailabilityPendingExecutedBlock) -> Self { + Self { + verified_blobs: <_>::default(), + executed_block: Some(block), + } + } + + /// Returns `true` if the cache has all blobs corresponding to the + /// kzg commitments in the block. 
+ pub fn has_all_blobs(&self, block: &AvailabilityPendingExecutedBlock) -> bool { + for i in 0..block.num_blobs_expected() { + if self + .verified_blobs + .get(i) + .map(|maybe_blob| maybe_blob.is_none()) + .unwrap_or(true) + { + return false; + } + } + true + } + + pub fn empty() -> Self { + Self { + verified_blobs: <_>::default(), + executed_block: None, + } + } + + pub fn epoch(&self) -> Option { + self.executed_block + .as_ref() + .map(|pending_block| pending_block.block.as_block().epoch()) + .or_else(|| { + for maybe_blob in self.verified_blobs.iter() { + if maybe_blob.is_some() { + return maybe_blob.as_ref().map(|kzg_verified_blob| { + kzg_verified_blob.as_blob().slot.epoch(T::slots_per_epoch()) + }); + } + } + None + }) + } +} + +#[derive(Debug, PartialEq)] +enum OverflowKey { + Block(Hash256), + Blob(Hash256, u8), +} + +impl OverflowKey { + pub fn from_block_root(block_root: Hash256) -> Self { + Self::Block(block_root) + } + + pub fn from_blob_id( + blob_id: BlobIdentifier, + ) -> Result { + if blob_id.index > E::max_blobs_per_block() as u64 || blob_id.index > u8::MAX as u64 { + return Err(AvailabilityCheckError::BlobIndexInvalid(blob_id.index)); + } + Ok(Self::Blob(blob_id.block_root, blob_id.index as u8)) + } + + pub fn root(&self) -> &Hash256 { + match self { + Self::Block(root) => root, + Self::Blob(root, _) => root, + } + } +} + +/// A wrapper around BeaconStore that implements various +/// methods used for saving and retrieving blocks / blobs +/// from the store (for organization) +struct OverflowStore(BeaconStore); + +impl OverflowStore { + pub fn persist_pending_components( + &self, + block_root: Hash256, + mut pending_components: PendingComponents, + ) -> Result<(), AvailabilityCheckError> { + let col = DBColumn::OverflowLRUCache; + + if let Some(block) = pending_components.executed_block.take() { + let key = OverflowKey::from_block_root(block_root); + self.0 + .hot_db + .put_bytes(col.as_str(), &key.as_ssz_bytes(), &block.as_ssz_bytes())? 
+ } + + for blob in Vec::from(pending_components.verified_blobs) + .into_iter() + .flatten() + { + let key = OverflowKey::from_blob_id::(BlobIdentifier { + block_root, + index: blob.blob_index(), + })?; + + self.0 + .hot_db + .put_bytes(col.as_str(), &key.as_ssz_bytes(), &blob.as_ssz_bytes())? + } + + Ok(()) + } + + pub fn get_pending_components( + &self, + block_root: Hash256, + ) -> Result>, AvailabilityCheckError> { + // read everything from disk and reconstruct + let mut maybe_pending_components = None; + for res in self + .0 + .hot_db + .iter_raw_entries(DBColumn::OverflowLRUCache, block_root.as_bytes()) + { + let (key_bytes, value_bytes) = res?; + match OverflowKey::from_ssz_bytes(&key_bytes)? { + OverflowKey::Block(_) => { + maybe_pending_components + .get_or_insert_with(PendingComponents::empty) + .executed_block = Some(AvailabilityPendingExecutedBlock::from_ssz_bytes( + value_bytes.as_slice(), + )?); + } + OverflowKey::Blob(_, index) => { + *maybe_pending_components + .get_or_insert_with(PendingComponents::empty) + .verified_blobs + .get_mut(index as usize) + .ok_or(AvailabilityCheckError::BlobIndexInvalid(index as u64))? = + Some(KzgVerifiedBlob::from_ssz_bytes(value_bytes.as_slice())?); + } + } + } + + Ok(maybe_pending_components) + } + + // returns the hashes of all the blocks we have data for on disk + pub fn read_keys_on_disk(&self) -> Result, AvailabilityCheckError> { + let mut disk_keys = HashSet::new(); + for res in self.0.hot_db.iter_raw_keys(DBColumn::OverflowLRUCache, &[]) { + let key_bytes = res?; + disk_keys.insert(*OverflowKey::from_ssz_bytes(&key_bytes)?.root()); + } + Ok(disk_keys) + } + + pub fn load_blob( + &self, + blob_id: &BlobIdentifier, + ) -> Result>>, AvailabilityCheckError> { + let key = OverflowKey::from_blob_id::(blob_id.clone())?; + + self.0 + .hot_db + .get_bytes(DBColumn::OverflowLRUCache.as_str(), &key.as_ssz_bytes())? 
+            .map(|blob_bytes| Arc::>::from_ssz_bytes(blob_bytes.as_slice()))
+            .transpose()
+            .map_err(|e| e.into())
+    }
+
+    /// Deletes every key in `keys` from the `OverflowLRUCache` column.
+    ///
+    /// Stops at the first deletion that fails and returns that error.
+    pub fn delete_keys(&self, keys: &Vec) -> Result<(), AvailabilityCheckError> {
+        for key in keys {
+            self.0
+                .hot_db
+                .key_delete(DBColumn::OverflowLRUCache.as_str(), &key.as_ssz_bytes())?;
+        }
+        Ok(())
+    }
+}
+
+// This data is protected by an RwLock
+struct Critical {
+    // pending components held in memory, keyed by block root
+    pub in_memory: LruCache>,
+    // block roots recorded as having data in the overflow store
+    pub store_keys: HashSet,
+}
+
+impl Critical {
+    /// Creates an empty in-memory cache with the given `capacity` and no store keys.
+    pub fn new(capacity: usize) -> Self {
+        Self {
+            in_memory: LruCache::new(capacity),
+            store_keys: HashSet::new(),
+        }
+    }
+
+    /// Replaces `store_keys` with the set of block roots currently found on disk.
+    pub fn reload_store_keys(
+        &mut self,
+        overflow_store: &OverflowStore,
+    ) -> Result<(), AvailabilityCheckError> {
+        let disk_keys = overflow_store.read_keys_on_disk()?;
+        self.store_keys = disk_keys;
+        Ok(())
+    }
+
+    /// This only checks for the blobs in memory
+    ///
+    /// Uses `peek`, so the lookup does not go through `LruCache::get`. Returns
+    /// `BlobIndexInvalid` if the block is in memory but `blob_id.index` is outside
+    /// `verified_blobs`.
+    pub fn peek_blob(
+        &self,
+        blob_id: &BlobIdentifier,
+    ) -> Result>>, AvailabilityCheckError> {
+        if let Some(pending_components) = self.in_memory.peek(&blob_id.block_root) {
+            Ok(pending_components
+                .verified_blobs
+                .get(blob_id.index as usize)
+                .ok_or(AvailabilityCheckError::BlobIndexInvalid(blob_id.index))?
+                .as_ref()
+                .map(|blob| blob.clone_blob()))
+        } else {
+            Ok(None)
+        }
+    }
+
+    /// Puts the pending components in the LRU cache. If the cache
+    /// is at capacity, the LRU entry is written to the store first
+    pub fn put_pending_components(
+        &mut self,
+        block_root: Hash256,
+        pending_components: PendingComponents,
+        overflow_store: &OverflowStore,
+    ) -> Result<(), AvailabilityCheckError> {
+        if self.in_memory.len() == self.in_memory.cap() {
+            // cache will overflow, must write lru entry to disk
+            if let Some((lru_key, lru_value)) = self.in_memory.pop_lru() {
+                overflow_store.persist_pending_components(lru_key, lru_value)?;
+                // remember that this root now lives in the store
+                self.store_keys.insert(lru_key);
+            }
+        }
+        self.in_memory.put(block_root, pending_components);
+        Ok(())
+    }
+
+    /// Removes and returns the pending_components corresponding to
+    /// the `block_root` or `None` if it does not exist
+    ///
+    /// Memory is checked first. If the root is only recorded in `store_keys`, it is
+    /// removed from that set and the components are read back from `store`; this
+    /// function does not delete the on-disk entries.
+    pub fn pop_pending_components(
+        &mut self,
+        block_root: Hash256,
+        store: &OverflowStore,
+    ) -> Result>, AvailabilityCheckError> {
+        match self.in_memory.pop_entry(&block_root) {
+            Some((_, pending_components)) => Ok(Some(pending_components)),
+            None => {
+                // not in memory, is it in the store?
+                if self.store_keys.remove(&block_root) {
+                    store.get_pending_components(block_root)
+                } else {
+                    Ok(None)
+                }
+            }
+        }
+    }
+}
+
+/// LRU cache of pending components that overflows to an `OverflowStore`.
+pub struct OverflowLRUCache {
+    // in-memory entries plus the set of roots known to be on disk
+    critical: RwLock>,
+    // disk storage for entries that do not fit in memory
+    overflow_store: OverflowStore,
+    // taken by `write_all_to_disk`, `maintain_threshold` and `prune_disk`
+    maintenance_lock: Mutex<()>,
+    // capacity of the in-memory LRU cache
+    capacity: usize,
+}
+
+impl OverflowLRUCache {
+    /// Creates a cache with room for `capacity` in-memory entries, backed by
+    /// `beacon_store`.
+    ///
+    /// The block roots already present on disk are loaded into `store_keys`;
+    /// returns an error if reading them fails.
+    pub fn new(
+        capacity: usize,
+        beacon_store: BeaconStore,
+    ) -> Result {
+        let overflow_store = OverflowStore(beacon_store);
+        let mut critical = Critical::new(capacity);
+        critical.reload_store_keys(&overflow_store)?;
+        Ok(Self {
+            critical: RwLock::new(critical),
+            overflow_store,
+            maintenance_lock: Mutex::new(()),
+            capacity,
+        })
+    }
+
+    /// Returns the blob identified by `blob_id`, if it is cached.
+    ///
+    /// Memory is checked first. If the blob is not found there but the block root
+    /// is recorded in `store_keys`, the blob is loaded from the overflow store.
+    pub fn peek_blob(
+        &self,
+        blob_id: &BlobIdentifier,
+    ) -> Result>>, AvailabilityCheckError> {
+        let read_lock = self.critical.read();
+        if let Some(blob) = read_lock.peek_blob(blob_id)? {
+            Ok(Some(blob))
+        } else if read_lock.store_keys.contains(&blob_id.block_root) {
+            // release the read lock before reading from disk
+            drop(read_lock);
+            self.overflow_store.load_blob(blob_id)
+        } else {
+            Ok(None)
+        }
+    }
+
+    /// Adds a KZG-verified blob to the pending components of its block.
+    ///
+    /// If the block's components now include an executed block, the result of
+    /// `check_block_availability_maybe_cache` is returned. Otherwise the components
+    /// are (re-)inserted and `Availability::PendingBlock(block_root)` is returned.
+    pub fn put_kzg_verified_blob(
+        &self,
+        kzg_verified_blob: KzgVerifiedBlob,
+    ) -> Result, AvailabilityCheckError> {
+        let mut write_lock = self.critical.write();
+        let block_root = kzg_verified_blob.block_root();
+
+        let availability = if let Some(mut pending_components) =
+            write_lock.pop_pending_components(block_root, &self.overflow_store)?
+        {
+            let blob_index = kzg_verified_blob.blob_index();
+            // NOTE(review): if the index is out of range, `?` returns here after the
+            // components were popped above and they are not put back -- confirm this
+            // is intended.
+            *pending_components
+                .verified_blobs
+                .get_mut(blob_index as usize)
+                .ok_or(AvailabilityCheckError::BlobIndexInvalid(blob_index))? =
+                Some(kzg_verified_blob);
+
+            if let Some(executed_block) = pending_components.executed_block.take() {
+                self.check_block_availability_maybe_cache(
+                    write_lock,
+                    block_root,
+                    pending_components,
+                    executed_block,
+                )?
+            } else {
+                write_lock.put_pending_components(
+                    block_root,
+                    pending_components,
+                    &self.overflow_store,
+                )?;
+                Availability::PendingBlock(block_root)
+            }
+        } else {
+            // not in memory or store -> put new in memory
+            let new_pending_components = PendingComponents::new_from_blob(kzg_verified_blob);
+            write_lock.put_pending_components(
+                block_root,
+                new_pending_components,
+                &self.overflow_store,
+            )?;
+            Availability::PendingBlock(block_root)
+        };
+
+        Ok(availability)
+    }
+
+    /// Check if we have all the blobs for a block. If we do, return the Availability variant that
+    /// triggers import of the block.
+    ///
+    /// If nothing is cached for the block and it has no blob ids, it is returned
+    /// as available immediately. Otherwise the block is cached and
+    /// `Availability::PendingBlobs` is returned with all of its blob ids.
+    pub fn put_pending_executed_block(
+        &self,
+        executed_block: AvailabilityPendingExecutedBlock,
+    ) -> Result, AvailabilityCheckError> {
+        let mut write_lock = self.critical.write();
+        let block_root = executed_block.import_data.block_root;
+
+        let availability =
+            match write_lock.pop_pending_components(block_root, &self.overflow_store)? {
+                Some(pending_components) => self.check_block_availability_maybe_cache(
+                    write_lock,
+                    block_root,
+                    pending_components,
+                    executed_block,
+                )?,
+                None => {
+                    let all_blob_ids = executed_block.get_all_blob_ids();
+                    if all_blob_ids.is_empty() {
+                        // no blobs for this block, we can import it
+                        let AvailabilityPendingExecutedBlock {
+                            block,
+                            import_data,
+                            payload_verification_outcome,
+                        } = executed_block;
+                        let available_block = block.make_available(vec![])?;
+                        return Ok(Availability::Available(Box::new(
+                            AvailableExecutedBlock::new(
+                                available_block,
+                                import_data,
+                                payload_verification_outcome,
+                            ),
+                        )));
+                    }
+                    let new_pending_components = PendingComponents::new_from_block(executed_block);
+                    write_lock.put_pending_components(
+                        block_root,
+                        new_pending_components,
+                        &self.overflow_store,
+                    )?;
+                    Availability::PendingBlobs(all_blob_ids)
+                }
+            };
+
+        Ok(availability)
+    }
+
+    /// Checks if the provided `executed_block` contains all required blobs to be considered an
+    /// `AvailableBlock` based on blobs that are cached.
+    ///
+    /// Returns an error if there was an error when matching the block commitments against blob commitments.
+    ///
+    /// Returns `Ok(Availability::Available(_))` if all blobs for the block are present in cache.
+    /// Returns `Ok(Availability::PendingBlobs(_))` if all corresponding blobs have not been received in the cache.
+    fn check_block_availability_maybe_cache(
+        &self,
+        mut write_lock: RwLockWriteGuard>,
+        block_root: Hash256,
+        mut pending_components: PendingComponents,
+        executed_block: AvailabilityPendingExecutedBlock,
+    ) -> Result, AvailabilityCheckError> {
+        if pending_components.has_all_blobs(&executed_block) {
+            let num_blobs_expected = executed_block.num_blobs_expected();
+            let AvailabilityPendingExecutedBlock {
+                block,
+                import_data,
+                payload_verification_outcome,
+            } = executed_block;
+
+            // only the first `num_blobs_expected` slots are used; an empty slot among
+            // them is reported as `MissingBlobs`
+            let verified_blobs = Vec::from(pending_components.verified_blobs)
+                .into_iter()
+                .take(num_blobs_expected)
+                .map(|maybe_blob| maybe_blob.ok_or(AvailabilityCheckError::MissingBlobs))
+                .collect::, _>>()?;
+
+            let available_block = block.make_available(verified_blobs)?;
+            Ok(Availability::Available(Box::new(
+                AvailableExecutedBlock::new(
+                    available_block,
+                    import_data,
+                    payload_verification_outcome,
+                ),
+            )))
+        } else {
+            // a blob id counts as missing if its slot is empty or out of range
+            let missing_blob_ids = executed_block.get_filtered_blob_ids(|index| {
+                pending_components
+                    .verified_blobs
+                    .get(index as usize)
+                    .map(|maybe_blob| maybe_blob.is_none())
+                    .unwrap_or(true)
+            });
+
+            // put the block back into the components before caching them again
+            let _ = pending_components.executed_block.insert(executed_block);
+            write_lock.put_pending_components(
+                block_root,
+                pending_components,
+                &self.overflow_store,
+            )?;
+
+            Ok(Availability::PendingBlobs(missing_blob_ids))
+        }
+    }
+
+    /// Writes all in_memory objects to disk.
+    ///
+    /// The in-memory cache is swapped for an empty one of the same capacity and
+    /// every entry taken out is persisted and recorded in `store_keys`. Both the
+    /// maintenance lock and the critical write lock are held for the whole call.
+    pub fn write_all_to_disk(&self) -> Result<(), AvailabilityCheckError> {
+        let maintenance_lock = self.maintenance_lock.lock();
+        let mut critical_lock = self.critical.write();
+
+        let mut swap_lru = LruCache::new(self.capacity);
+        std::mem::swap(&mut swap_lru, &mut critical_lock.in_memory);
+
+        for (root, pending_components) in swap_lru.into_iter() {
+            self.overflow_store
+                .persist_pending_components(root, pending_components)?;
+            critical_lock.store_keys.insert(root);
+        }
+
+        drop(critical_lock);
+        drop(maintenance_lock);
+        Ok(())
+    }
+
+    /// Maintains the cache: shrinks the in-memory cache to three quarters of
+    /// `capacity` (see `maintain_threshold`), then prunes the disk (see
+    /// `prune_disk`), both with the given `cutoff_epoch`.
+    pub fn do_maintenance(&self, cutoff_epoch: Epoch) -> Result<(), AvailabilityCheckError> {
+        // ensure memory usage is below threshold
+        let threshold = self.capacity * 3 / 4;
+        self.maintain_threshold(threshold, cutoff_epoch)?;
+        // clean up any keys on the disk that shouldn't be there
+        self.prune_disk(cutoff_epoch)?;
+        Ok(())
+    }
+
+    /// Evicts LRU entries until at most `threshold` entries remain in memory.
+    ///
+    /// An LRU entry whose epoch is before `cutoff_epoch`, or whose epoch is
+    /// unknown, is dropped. Any other LRU entry is written to the overflow store
+    /// and then removed from memory, provided it is still the LRU entry once the
+    /// write lock has been re-acquired.
+    fn maintain_threshold(
+        &self,
+        threshold: usize,
+        cutoff_epoch: Epoch,
+    ) -> Result<(), AvailabilityCheckError> {
+        // ensure only one thread at a time can be deleting things from the disk or
+        // moving things between memory and storage
+        let maintenance_lock = self.maintenance_lock.lock();
+
+        let mut stored = self.critical.read().in_memory.len();
+        while stored > threshold {
+            let read_lock = self.critical.upgradable_read();
+            // clone the LRU entry so it can be written to disk without holding the lock
+            let lru_entry = read_lock
+                .in_memory
+                .peek_lru()
+                .map(|(key, value)| (*key, value.clone()));
+
+            let (lru_root, lru_pending_components) = match lru_entry {
+                Some((r, p)) => (r, p),
+                None => break,
+            };
+
+            if lru_pending_components
+                .epoch()
+                .map(|epoch| epoch < cutoff_epoch)
+                .unwrap_or(true)
+            {
+                // this data is no longer needed -> delete it
+                let mut write_lock = RwLockUpgradableReadGuard::upgrade(read_lock);
+                write_lock.in_memory.pop_entry(&lru_root);
+                stored = write_lock.in_memory.len();
+                continue;
+            } else {
+                drop(read_lock);
+            }
+
+            // write the lru entry to disk (we aren't holding any critical locks while we do this)
+            self.overflow_store
+                .persist_pending_components(lru_root, lru_pending_components)?;
+            // now that we've written to disk, grab the critical write lock
+            let mut write_lock = self.critical.write();
+            if let Some((new_lru_root_ref, _)) = write_lock.in_memory.peek_lru() {
+                // need to ensure the entry we just wrote to disk wasn't updated
+                // while we were writing and is still the LRU entry
+                if *new_lru_root_ref == lru_root {
+                    // it is still LRU entry -> delete it from memory & record that it's on disk
+                    write_lock.in_memory.pop_entry(&lru_root);
+                    write_lock.store_keys.insert(lru_root);
+                    stored = write_lock.in_memory.len();
+                }
+                // otherwise `stored` is unchanged and the loop retries with the new LRU entry
+            }
+            drop(write_lock);
+        }
+
+        drop(maintenance_lock);
+        Ok(())
+    }
+
+    /// Deletes on-disk entries that are no longer wanted.
+    ///
+    /// Walks every entry of the `OverflowLRUCache` column, grouping consecutive
+    /// keys that share a block root. A group is deleted if its root is not in
+    /// `store_keys`, or if its epoch is before `cutoff_epoch`.
+    fn prune_disk(&self, cutoff_epoch: Epoch) -> Result<(), AvailabilityCheckError> {
+        // ensure only one thread at a time can be deleting things from the disk or
+        // moving things between memory and storage
+        let maintenance_lock = self.maintenance_lock.lock();
+
+        // the keys collected so far for one block root
+        struct BlockData {
+            keys: Vec,
+            root: Hash256,
+            epoch: Epoch,
+        }
+
+        // NOTE(review): the closure uses both `cache` and the captured `self`; every
+        // call in this function passes `self` as `cache`, so they are the same object.
+        let delete_if_outdated = |cache: &OverflowLRUCache,
+                                  block_data: Option|
+         -> Result<(), AvailabilityCheckError> {
+            let block_data = match block_data {
+                Some(block_data) => block_data,
+                None => return Ok(()),
+            };
+            let not_in_store_keys = !cache.critical.read().store_keys.contains(&block_data.root);
+            if not_in_store_keys {
+                // these keys aren't supposed to be on disk
+                cache.overflow_store.delete_keys(&block_data.keys)?;
+            } else {
+                // check this data is still relevant
+                if block_data.epoch < cutoff_epoch {
+                    // this data is no longer needed -> delete it
+                    self.overflow_store.delete_keys(&block_data.keys)?;
+                }
+            }
+            Ok(())
+        };
+
+        let mut current_block_data: Option = None;
+        for res in self
+            .overflow_store
+            .0
+            .hot_db
+            .iter_raw_entries(DBColumn::OverflowLRUCache, &[])
+        {
+            let (key_bytes, value_bytes) = res?;
+            let overflow_key = OverflowKey::from_ssz_bytes(&key_bytes)?;
+            let current_root = *overflow_key.root();
+
+            match &mut current_block_data {
+                Some(block_data) if block_data.root == current_root => {
+                    // still dealing with the same block
+                    block_data.keys.push(overflow_key);
+                }
+                _ => {
+                    // first time encountering data for this block
+                    delete_if_outdated(self, current_block_data)?;
+                    // the epoch is taken from the first entry seen for this root
+                    let current_epoch = match &overflow_key {
+                        OverflowKey::Block(_) => {
+                            AvailabilityPendingExecutedBlock::::from_ssz_bytes(
+                                value_bytes.as_slice(),
+                            )?
+                            .block
+                            .as_block()
+                            .epoch()
+                        }
+                        OverflowKey::Blob(_, _) => {
+                            KzgVerifiedBlob::::from_ssz_bytes(value_bytes.as_slice())?
+                                .as_blob()
+                                .slot
+                                .epoch(T::EthSpec::slots_per_epoch())
+                        }
+                    };
+                    current_block_data = Some(BlockData {
+                        keys: vec![overflow_key],
+                        root: current_root,
+                        epoch: current_epoch,
+                    });
+                }
+            }
+        }
+        // can't fall off the end
+        // (the last group is only complete once the iterator is exhausted)
+        delete_if_outdated(self, current_block_data)?;
+
+        drop(maintenance_lock);
+        Ok(())
+    }
+}
+
+// Encoding of an `OverflowKey`: the SSZ bytes of the block root followed by one
+// byte, `0` for `Block` and `index + 1` for `Blob`.
+impl ssz::Encode for OverflowKey {
+    fn is_ssz_fixed_len() -> bool {
+        true
+    }
+
+    fn ssz_append(&self, buf: &mut Vec) {
+        match self {
+            OverflowKey::Block(block_hash) => {
+                block_hash.ssz_append(buf);
+                buf.push(0u8)
+            }
+            OverflowKey::Blob(block_hash, index) => {
+                block_hash.ssz_append(buf);
+                // NOTE(review): assumes `index` is below `u8::MAX`, presumably
+                // guaranteed where the key is built -- confirm.
+                buf.push(*index + 1)
+            }
+        }
+    }
+
+    fn ssz_fixed_len() -> usize {
+        ::ssz_fixed_len() + 1
+    }
+
+    fn ssz_bytes_len(&self) -> usize {
+        match self {
+            Self::Block(root) => root.ssz_bytes_len() + 1,
+            Self::Blob(root, _) => root.ssz_bytes_len() + 1,
+        }
+    }
+}
+
+// Decoding is the inverse of the encoding above: the trailing byte `0` yields a
+// `Block` key and any other value `n` yields a `Blob` key with index `n - 1`.
+impl ssz::Decode for OverflowKey {
+    fn is_ssz_fixed_len() -> bool {
+        true
+    }
+
+    fn ssz_fixed_len() -> usize {
+        ::ssz_fixed_len() + 1
+    }
+
+    fn from_ssz_bytes(bytes: &[u8]) -> Result {
+        let len = bytes.len();
+        let h256_len = ::ssz_fixed_len();
+        let expected = h256_len + 1;
+
+        // the input must be exactly one root plus the one-byte tag
+        if len != expected {
+            Err(ssz::DecodeError::InvalidByteLength { len, expected })
+        } else {
+            let root_bytes = bytes
+                .get(..h256_len)
+                .ok_or(ssz::DecodeError::OutOfBoundsByte { i: 0 })?;
+            let block_root = Hash256::from_ssz_bytes(root_bytes)?;
+            let id_byte = *bytes
+                .get(h256_len)
+                .ok_or(ssz::DecodeError::OutOfBoundsByte { i: h256_len })?;
+            match id_byte {
+                0 => Ok(OverflowKey::Block(block_root)),
+                n => Ok(OverflowKey::Blob(block_root, n - 1)),
+            }
+        }
+    }
+}
+
+// Most of the test module is only compiled with the `spec-minimal` feature.
+#[cfg(test)]
+mod test {
+    use super::*;
+    #[cfg(feature = "spec-minimal")]
+    use crate::{
+        blob_verification::{
+            validate_blob_sidecar_for_gossip, verify_kzg_for_blob, GossipVerifiedBlob,
+        },
+        block_verification::{BlockImportData, PayloadVerificationOutcome},
+        data_availability_checker::AvailabilityPendingBlock,
+        eth1_finalization_cache::Eth1FinalizationData,
+        test_utils::{BaseHarnessType, BeaconChainHarness, DiskHarnessType},
+    };
+    #[cfg(feature = "spec-minimal")]
+    use fork_choice::PayloadVerificationStatus;
+    #[cfg(feature = "spec-minimal")]
+    use logging::test_logger;
+    #[cfg(feature = "spec-minimal")]
+    use slog::{info, Logger};
+    #[cfg(feature = "spec-minimal")]
+    use state_processing::ConsensusContext;
+    #[cfg(feature = "spec-minimal")]
+    use std::collections::{BTreeMap, HashMap, VecDeque};
+    #[cfg(feature = "spec-minimal")]
+    use std::ops::AddAssign;
+    #[cfg(feature = "spec-minimal")]
+    use store::{HotColdDB, ItemStore, LevelDB, StoreConfig};
+    #[cfg(feature = "spec-minimal")]
+    use tempfile::{tempdir, TempDir};
+    #[cfg(feature = "spec-minimal")]
+    use types::beacon_state::ssz_tagged_beacon_state;
+    #[cfg(feature = "spec-minimal")]
+    use types::{ChainSpec, ExecPayload, MinimalEthSpec};
+
+    // number of validator keypairs used by the test harnesses
+    #[cfg(feature = "spec-minimal")]
+    const LOW_VALIDATOR_COUNT: usize = 32;
+
+    // open a disk-backed store with `hot_db` and `cold_db` directories under `db_path`
+    #[cfg(feature = "spec-minimal")]
+    fn get_store_with_spec(
+        db_path: &TempDir,
+        spec: ChainSpec,
+        log: Logger,
+    ) -> Arc, LevelDB>> {
+        let hot_path = db_path.path().join("hot_db");
+        let cold_path = db_path.path().join("cold_db");
+        let config = StoreConfig::default();
+
+        HotColdDB::open(
+            &hot_path,
+            &cold_path,
+            None,
+            |_, _, _| Ok(()),
+            config,
+            spec,
+            log,
+        )
+        .expect("disk store should initialize")
+    }
+
+    // get a beacon chain harness advanced to just before deneb fork
+    // (forks are scheduled at epochs 1 to 4: altair, bellatrix, capella, deneb)
+    #[cfg(feature = "spec-minimal")]
+    async fn get_deneb_chain(
+        log: Logger,
+        db_path: &TempDir,
+    ) -> BeaconChainHarness, LevelDB>> {
+        let altair_fork_epoch = Epoch::new(1);
+        let bellatrix_fork_epoch = Epoch::new(2);
+        let bellatrix_fork_slot = bellatrix_fork_epoch.start_slot(E::slots_per_epoch());
+        let capella_fork_epoch = Epoch::new(3);
+        let deneb_fork_epoch = Epoch::new(4);
+        let deneb_fork_slot = deneb_fork_epoch.start_slot(E::slots_per_epoch());
+
+        let mut spec = E::default_spec();
+        spec.altair_fork_epoch = Some(altair_fork_epoch);
+        spec.bellatrix_fork_epoch = Some(bellatrix_fork_epoch);
+        spec.capella_fork_epoch = Some(capella_fork_epoch);
+        spec.deneb_fork_epoch = Some(deneb_fork_epoch);
+
+        let chain_store = get_store_with_spec::(db_path, spec.clone(), log.clone());
+        let validators_keypairs =
+            types::test_utils::generate_deterministic_keypairs(LOW_VALIDATOR_COUNT);
+        let harness = BeaconChainHarness::builder(E::default())
+            .spec(spec.clone())
+            .logger(log.clone())
+            .keypairs(validators_keypairs)
+            .fresh_disk_store(chain_store)
+            .mock_execution_layer()
+            .build();
+
+        // go to bellatrix slot
+        harness.extend_to_slot(bellatrix_fork_slot).await;
+        let merge_head = &harness.chain.head_snapshot().beacon_block;
+        assert!(merge_head.as_merge().is_ok());
+        assert_eq!(merge_head.slot(), bellatrix_fork_slot);
+        assert!(
+            merge_head
+                .message()
+                .body()
+                .execution_payload()
+                .unwrap()
+                .is_default_with_empty_roots(),
+            "Merge head is default payload"
+        );
+        // Trigger the terminal PoW block.
+        harness
+            .execution_block_generator()
+            .move_to_terminal_block()
+            .unwrap();
+        // go right before deneb slot
+        harness.extend_to_slot(deneb_fork_slot - 1).await;
+
+        harness
+    }
+
+    // a block key and blob keys with indices 0..=3 must survive an SSZ round trip
+    #[test]
+    fn overflow_key_encode_decode_equality() {
+        type E = types::MainnetEthSpec;
+        let key_block = OverflowKey::Block(Hash256::random());
+        let key_blob_0 = OverflowKey::from_blob_id::(BlobIdentifier {
+            block_root: Hash256::random(),
+            index: 0,
+        })
+        .expect("should create overflow key 0");
+        let key_blob_1 = OverflowKey::from_blob_id::(BlobIdentifier {
+            block_root: Hash256::random(),
+            index: 1,
+        })
+        .expect("should create overflow key 1");
+        let key_blob_2 = OverflowKey::from_blob_id::(BlobIdentifier {
+            block_root: Hash256::random(),
+            index: 2,
+        })
+        .expect("should create overflow key 2");
+        let key_blob_3 = OverflowKey::from_blob_id::(BlobIdentifier {
+            block_root: Hash256::random(),
+            index: 3,
+        })
+        .expect("should create overflow key 3");
+
+        let keys = vec![key_block, key_blob_0, key_blob_1, key_blob_2, key_blob_3];
+        for key in keys {
+            let encoded = key.as_ssz_bytes();
+            let decoded = OverflowKey::from_ssz_bytes(&encoded).expect("should decode");
+            assert_eq!(key, decoded, "Encoded and decoded keys should be equal");
+        }
+    }
+
+    // the tagged beacon state encoding must round trip for the state of every fork
+    // (base, altair, merge, capella, deneb)
+    #[tokio::test]
+    #[cfg(feature = "spec-minimal")]
+    async fn ssz_tagged_beacon_state_encode_decode_equality() {
+        type E = MinimalEthSpec;
+        let altair_fork_epoch = Epoch::new(1);
+        let altair_fork_slot = altair_fork_epoch.start_slot(E::slots_per_epoch());
+        let bellatrix_fork_epoch = Epoch::new(2);
+        let merge_fork_slot = bellatrix_fork_epoch.start_slot(E::slots_per_epoch());
+        let capella_fork_epoch = Epoch::new(3);
+        let capella_fork_slot = capella_fork_epoch.start_slot(E::slots_per_epoch());
+        let deneb_fork_epoch = Epoch::new(4);
+        let deneb_fork_slot = deneb_fork_epoch.start_slot(E::slots_per_epoch());
+
+        let mut spec = E::default_spec();
+        spec.altair_fork_epoch = Some(altair_fork_epoch);
+        spec.bellatrix_fork_epoch = Some(bellatrix_fork_epoch);
+        spec.capella_fork_epoch = Some(capella_fork_epoch);
+        spec.deneb_fork_epoch = Some(deneb_fork_epoch);
+
+        let harness = BeaconChainHarness::builder(E::default())
+            .spec(spec)
+            .logger(logging::test_logger())
+            .deterministic_keypairs(LOW_VALIDATOR_COUNT)
+            .fresh_ephemeral_store()
+            .mock_execution_layer()
+            .build();
+
+        // caches are dropped from the original state before each comparison
+        let mut state = harness.get_current_state();
+        assert!(state.as_base().is_ok());
+        let encoded = ssz_tagged_beacon_state::encode::as_ssz_bytes(&state);
+        let decoded =
+            ssz_tagged_beacon_state::decode::from_ssz_bytes(&encoded).expect("should decode");
+        state.drop_all_caches().expect("should drop caches");
+        assert_eq!(state, decoded, "Encoded and decoded states should be equal");
+
+        harness.extend_to_slot(altair_fork_slot).await;
+
+        let mut state = harness.get_current_state();
+        assert!(state.as_altair().is_ok());
+        let encoded = ssz_tagged_beacon_state::encode::as_ssz_bytes(&state);
+        let decoded =
+            ssz_tagged_beacon_state::decode::from_ssz_bytes(&encoded).expect("should decode");
+        state.drop_all_caches().expect("should drop caches");
+        assert_eq!(state, decoded, "Encoded and decoded states should be equal");
+
+        harness.extend_to_slot(merge_fork_slot).await;
+
+        let mut state = harness.get_current_state();
+        assert!(state.as_merge().is_ok());
+        let encoded = ssz_tagged_beacon_state::encode::as_ssz_bytes(&state);
+        let decoded =
+            ssz_tagged_beacon_state::decode::from_ssz_bytes(&encoded).expect("should decode");
+        state.drop_all_caches().expect("should drop caches");
+        assert_eq!(state, decoded, "Encoded and decoded states should be equal");
+
+        harness.extend_to_slot(capella_fork_slot).await;
+
+        let mut state = harness.get_current_state();
+        assert!(state.as_capella().is_ok());
+        let encoded = ssz_tagged_beacon_state::encode::as_ssz_bytes(&state);
+        let decoded =
+            ssz_tagged_beacon_state::decode::from_ssz_bytes(&encoded).expect("should decode");
+        state.drop_all_caches().expect("should drop caches");
+        assert_eq!(state, decoded, "Encoded and decoded states should be equal");
+
+        harness.extend_to_slot(deneb_fork_slot).await;
+
+        let mut state = harness.get_current_state();
+        assert!(state.as_deneb().is_ok());
+        let encoded = ssz_tagged_beacon_state::encode::as_ssz_bytes(&state);
+        let decoded =
+            ssz_tagged_beacon_state::decode::from_ssz_bytes(&encoded).expect("should decode");
+        state.drop_all_caches().expect("should drop caches");
+        assert_eq!(state, decoded, "Encoded and decoded states should be equal");
+    }
+
+    // add a block at the next slot of the harness chain and return it wrapped as an
+    // `AvailabilityPendingExecutedBlock`, together with its gossip-validated blobs
+    #[cfg(feature = "spec-minimal")]
+    async fn availability_pending_block(
+        harness: &BeaconChainHarness>,
+        log: Logger,
+    ) -> (
+        AvailabilityPendingExecutedBlock,
+        Vec>,
+    )
+    where
+        E: EthSpec,
+        Hot: ItemStore,
+        Cold: ItemStore,
+    {
+        let chain = &harness.chain;
+        let head = chain.head_snapshot();
+        let parent_state = head.beacon_state.clone_with_only_committee_caches();
+
+        let target_slot = chain.slot().expect("should get slot") + 1;
+        let parent_root = head.beacon_block_root;
+        let parent_block = chain
+            .get_blinded_block(&parent_root)
+            .expect("should get block")
+            .expect("should have block");
+
+        let parent_eth1_finalization_data = Eth1FinalizationData {
+            eth1_data: parent_block.message().body().eth1_data().clone(),
+            eth1_deposit_index: 0,
+        };
+
+        let (signed_beacon_block_hash, (block, maybe_blobs), state) = harness
+            .add_block_at_slot(target_slot, parent_state)
+            .await
+            .expect("should add block");
+        let block_root = signed_beacon_block_hash.into();
+        assert_eq!(
+            block_root,
+            block.canonical_root(),
+            "block root should match"
+        );
+
+        // log kzg commitments
+        info!(log, "printing kzg commitments");
+        for comm in Vec::from(
+            block
+                .message()
+                .body()
+                .blob_kzg_commitments()
+                .expect("should be deneb fork")
+                .clone(),
+        ) {
+            info!(log, "kzg commitment"; "commitment" => ?comm);
+        }
+        info!(log, "done printing kzg commitments");
+
+        // each blob is validated with its own index passed as the subnet
+        let gossip_verified_blobs = if let Some(blobs) = maybe_blobs {
+            Vec::from(blobs)
+                .into_iter()
+                .map(|signed_blob| {
+                    let subnet = signed_blob.message.index;
+                    validate_blob_sidecar_for_gossip(signed_blob, subnet, &harness.chain)
+                        .expect("should validate blob")
+                })
+                .collect()
+        } else {
+            vec![]
+        };
+
+        let slot = block.slot();
+        let apb: AvailabilityPendingBlock = AvailabilityPendingBlock {
+            block: Arc::new(block),
+        };
+
+        let consensus_context = ConsensusContext::::new(slot);
+        let import_data: BlockImportData = BlockImportData {
+            block_root,
+            state,
+            parent_block,
+            parent_eth1_finalization_data,
+            confirmed_state_roots: vec![],
+            consensus_context,
+        };
+
+        let payload_verification_outcome = PayloadVerificationOutcome {
+            payload_verification_status: PayloadVerificationStatus::Verified,
+            is_valid_merge_transition_block: false,
+        };
+
+        let availability_pending_block = AvailabilityPendingExecutedBlock {
+            block: apb,
+            import_data,
+            payload_verification_outcome,
+        };
+
+        (availability_pending_block, gossip_verified_blobs)
+    }
+
+    // insert a block followed by its blobs, then blobs followed by their block, and
+    // check the reported availability and the in-memory cache size at each step
+    #[tokio::test]
+    #[cfg(feature = "spec-minimal")]
+    async fn overflow_cache_test_insert_components() {
+        type E = MinimalEthSpec;
+        type T = DiskHarnessType;
+        let log = test_logger();
+        let chain_db_path = tempdir().expect("should get temp dir");
+        let harness: BeaconChainHarness = get_deneb_chain(log.clone(), &chain_db_path).await;
+        let spec = harness.spec.clone();
+        let capacity = 4;
+        let db_path = tempdir().expect("should get temp dir");
+        let test_store = get_store_with_spec::(&db_path, spec.clone(), log.clone());
+        let cache = Arc::new(
+            OverflowLRUCache::::new(capacity, test_store).expect("should create cache"),
+        );
+
+        let (pending_block, blobs) = availability_pending_block(&harness, log.clone()).await;
+        let root = pending_block.import_data.block_root;
+
+        let blobs_expected = pending_block.num_blobs_expected();
+        assert_eq!(
+            blobs.len(),
+            blobs_expected,
+            "should have expected number of blobs"
+        );
+        assert!(
+            cache.critical.read().in_memory.is_empty(),
+            "cache should be empty"
+        );
+        // first scenario: the block arrives before its blobs
+        let availability = cache
+            .put_pending_executed_block(pending_block)
+            .expect("should put block");
+        if blobs_expected == 0 {
+            assert!(
+                matches!(availability, Availability::Available(_)),
+                "block doesn't have blobs, should be available"
+            );
+            assert_eq!(
+                cache.critical.read().in_memory.len(),
+                0,
+                "cache should be empty because we don't have blobs"
+            );
+        } else {
+            assert!(
+                matches!(availability, Availability::PendingBlobs(_)),
+                "should be pending blobs"
+            );
+            assert_eq!(
+                cache.critical.read().in_memory.len(),
+                1,
+                "cache should have one block"
+            );
+            assert!(
+                cache.critical.read().in_memory.peek(&root).is_some(),
+                "newly inserted block should exist in memory"
+            );
+        }
+
+        let kzg = harness
+            .chain
+            .kzg
+            .as_ref()
+            .cloned()
+            .expect("kzg should exist");
+        for (blob_index, gossip_blob) in blobs.into_iter().enumerate() {
+            let kzg_verified_blob =
+                verify_kzg_for_blob(gossip_blob, kzg.as_ref()).expect("kzg should verify");
+            let availability = cache
+                .put_kzg_verified_blob(kzg_verified_blob)
+                .expect("should put blob");
+            // only the last blob is expected to make the block available
+            if blob_index == blobs_expected - 1 {
+                assert!(matches!(availability, Availability::Available(_)));
+            } else {
+                assert!(matches!(availability, Availability::PendingBlobs(_)));
+                assert_eq!(cache.critical.read().in_memory.len(), 1);
+            }
+        }
+        assert!(
+            cache.critical.read().in_memory.is_empty(),
+            "cache should be empty now that all components available"
+        );
+
+        // second scenario: the blobs arrive before their block
+        let (pending_block, blobs) = availability_pending_block(&harness, log.clone()).await;
+        let blobs_expected = pending_block.num_blobs_expected();
+        assert_eq!(
+            blobs.len(),
+            blobs_expected,
+            "should have expected number of blobs"
+        );
+        let root = pending_block.import_data.block_root;
+        for gossip_blob in blobs {
+            let kzg_verified_blob =
+                verify_kzg_for_blob(gossip_blob, kzg.as_ref()).expect("kzg should verify");
+            let availability = cache
+                .put_kzg_verified_blob(kzg_verified_blob)
+                .expect("should put blob");
+            assert_eq!(
+                availability,
+                Availability::PendingBlock(root),
+                "should be pending block"
+            );
+            assert_eq!(cache.critical.read().in_memory.len(), 1);
+        }
+        let availability = cache
+            .put_pending_executed_block(pending_block)
+            .expect("should put block");
+        assert!(
+            matches!(availability, Availability::Available(_)),
+            "block should be available: {:?}",
+            availability
+        );
+        assert!(
+            cache.critical.read().in_memory.is_empty(),
+            "cache should be empty now that all components available"
+        );
+    }
+
+    // fill the cache past its capacity and check that the LRU entry moves to disk,
+    // that `maintain_threshold` shrinks memory, and that `prune_disk` removes the
+    // data of a block that became available
+    #[tokio::test]
+    #[cfg(feature = "spec-minimal")]
+    async fn overflow_cache_test_overflow() {
+        type E = MinimalEthSpec;
+        type T = DiskHarnessType;
+        let log = test_logger();
+        let chain_db_path = tempdir().expect("should get temp dir");
+        let harness: BeaconChainHarness = get_deneb_chain(log.clone(), &chain_db_path).await;
+        let spec = harness.spec.clone();
+        let capacity = 4;
+        let db_path = tempdir().expect("should get temp dir");
+        let test_store = get_store_with_spec::(&db_path, spec.clone(), log.clone());
+        let cache = Arc::new(
+            OverflowLRUCache::::new(capacity, test_store).expect("should create cache"),
+        );
+
+        // collect one more block (with blobs) than the cache can hold
+        let mut pending_blocks = VecDeque::new();
+        let mut pending_blobs = VecDeque::new();
+        let mut roots = VecDeque::new();
+        while pending_blobs.len() < capacity + 1 {
+            let (pending_block, blobs) = availability_pending_block(&harness, log.clone()).await;
+            if pending_block.num_blobs_expected() == 0 {
+                // we need blocks with blobs
+                continue;
+            }
+            let root = pending_block.block.block.canonical_root();
+            pending_blocks.push_back(pending_block);
+            pending_blobs.push_back(blobs);
+            roots.push_back(root);
+        }
+
+        for i in 0..capacity {
+            cache
+                .put_pending_executed_block(pending_blocks.pop_front().expect("should have block"))
+                .expect("should put block");
+            assert_eq!(cache.critical.read().in_memory.len(), i + 1);
+        }
+        for root in roots.iter().take(capacity) {
+            assert!(cache.critical.read().in_memory.peek(root).is_some());
+        }
+        assert_eq!(
+            cache.critical.read().in_memory.len(),
+            capacity,
+            "cache should be full"
+        );
+        // the first block should be the lru entry
+        assert_eq!(
+            *cache
+                .critical
+                .read()
+                .in_memory
+                .peek_lru()
+                .expect("should exist")
+                .0,
+            roots[0],
+            "first block should be lru"
+        );
+
+        // one more block than the capacity: the first block must overflow to disk
+        cache
+            .put_pending_executed_block(pending_blocks.pop_front().expect("should have block"))
+            .expect("should put block");
+        assert_eq!(
+            cache.critical.read().in_memory.len(),
+            capacity,
+            "cache should be full"
+        );
+        assert!(
+            cache.critical.read().in_memory.peek(&roots[0]).is_none(),
+            "first block should be evicted"
+        );
+        assert_eq!(
+            *cache
+                .critical
+                .read()
+                .in_memory
+                .peek_lru()
+                .expect("should exist")
+                .0,
+            roots[1],
+            "second block should be lru"
+        );
+
+        assert!(cache
+            .overflow_store
+            .get_pending_components(roots[0])
+            .expect("should exist")
+            .is_some());
+
+        let threshold = capacity * 3 / 4;
+        cache
+            .maintain_threshold(threshold, Epoch::new(0))
+            .expect("should maintain threshold");
+        assert_eq!(
+            cache.critical.read().in_memory.len(),
+            threshold,
+            "cache should have been maintained"
+        );
+
+        let store_keys = cache
+            .overflow_store
+            .read_keys_on_disk()
+            .expect("should read keys");
+        assert_eq!(store_keys.len(), 2);
+        assert!(store_keys.contains(&roots[0]));
+        assert!(store_keys.contains(&roots[1]));
+        assert!(cache.critical.read().store_keys.contains(&roots[0]));
+        assert!(cache.critical.read().store_keys.contains(&roots[1]));
+
+        let kzg = harness
+            .chain
+            .kzg
+            .as_ref()
+            .cloned()
+            .expect("kzg should exist");
+
+        let blobs_0 = pending_blobs.pop_front().expect("should have blobs");
+        let expected_blobs = blobs_0.len();
+        for (blob_index, gossip_blob) in blobs_0.into_iter().enumerate() {
+            let kzg_verified_blob =
+                verify_kzg_for_blob(gossip_blob, kzg.as_ref()).expect("kzg should verify");
+            let availability = cache
+                .put_kzg_verified_blob(kzg_verified_blob)
+                .expect("should put blob");
+            if blob_index == expected_blobs - 1 {
+                assert!(matches!(availability, Availability::Available(_)));
+            } else {
+                // the first block should be brought back into memory
+                assert!(
+                    cache.critical.read().in_memory.peek(&roots[0]).is_some(),
+                    "first block should be in memory"
+                );
+                assert!(matches!(availability, Availability::PendingBlobs(_)));
+            }
+        }
+        assert_eq!(
+            cache.critical.read().in_memory.len(),
+            threshold,
+            "cache should no longer have the first block"
+        );
+        cache.prune_disk(Epoch::new(0)).expect("should prune disk");
+        assert!(
+            cache
+                .overflow_store
+                .get_pending_components(roots[1])
+                .expect("no error")
+                .is_some(),
+            "second block should still be on disk"
+        );
+        assert!(
+            cache
+                .overflow_store
+                .get_pending_components(roots[0])
+                .expect("no error")
+                .is_none(),
+            "first block should not be on disk"
+        );
+    }
+
+    // fill the cache with blocks spanning several epochs, then run `do_maintenance`
+    // with increasing cutoff epochs and check how many entries remain
+    #[tokio::test]
+    #[cfg(feature = "spec-minimal")]
+    async fn overflow_cache_test_maintenance() {
+        type E = MinimalEthSpec;
+        type T = DiskHarnessType;
+        let log = test_logger();
+        let chain_db_path = tempdir().expect("should get temp dir");
+        let harness: BeaconChainHarness = get_deneb_chain(log.clone(), &chain_db_path).await;
+        let spec = harness.spec.clone();
+        let n_epochs = 4;
+        let capacity = E::slots_per_epoch() as usize;
+        let db_path = tempdir().expect("should get temp dir");
+        let test_store = get_store_with_spec::(&db_path, spec.clone(), log.clone());
+        let cache = Arc::new(
+            OverflowLRUCache::::new(capacity, test_store).expect("should create cache"),
+        );
+
+        let mut pending_blocks = VecDeque::new();
+        let mut pending_blobs = VecDeque::new();
+        let mut roots = VecDeque::new();
+        // number of collected blocks per epoch
+        let mut epoch_count = BTreeMap::new();
+        while pending_blobs.len() < n_epochs * capacity {
+            let (pending_block, blobs) = availability_pending_block(&harness, log.clone()).await;
+            if pending_block.num_blobs_expected() == 0 {
+                // we need blocks with blobs
+                continue;
+            }
+            let root = pending_block.block.as_block().canonical_root();
+            let epoch = pending_block
+                .block
+                .as_block()
+                .slot()
+                .epoch(E::slots_per_epoch());
+            epoch_count.entry(epoch).or_insert_with(|| 0).add_assign(1);
+
+            pending_blocks.push_back(pending_block);
+            pending_blobs.push_back(blobs);
+            roots.push_back(root);
+        }
+
+        let kzg = harness
+            .chain
+            .kzg
+            .as_ref()
+            .cloned()
+            .expect("kzg should exist");
+
+        for _ in 0..(n_epochs * capacity) {
+            let pending_block = pending_blocks.pop_front().expect("should have block");
+            let expected_blobs = pending_block.num_blobs_expected();
+            if expected_blobs > 1 {
+                // might as well add a blob too
+                let mut pending_blobs = pending_blobs.pop_front().expect("should have blobs");
+                let one_blob = pending_blobs.pop().expect("should have at least one blob");
+                let kzg_verified_blob =
+                    verify_kzg_for_blob(one_blob, kzg.as_ref()).expect("kzg should verify");
+                // generate random boolean
+                let block_first = (rand::random::() % 2) == 0;
+                if block_first {
+                    let availability = cache
+                        .put_pending_executed_block(pending_block)
+                        .expect("should put block");
+                    assert!(
+                        matches!(availability, Availability::PendingBlobs(_)),
+                        "should have pending blobs"
+                    );
+                    let availability = cache
+                        .put_kzg_verified_blob(kzg_verified_blob)
+                        .expect("should put blob");
+                    assert!(
+                        matches!(availability, Availability::PendingBlobs(_)),
+                        "availabilty should be pending blobs: {:?}",
+                        availability
+                    );
+                } else {
+                    let availability = cache
+                        .put_kzg_verified_blob(kzg_verified_blob)
+                        .expect("should put blob");
+                    let root = pending_block.block.as_block().canonical_root();
+                    assert_eq!(
+                        availability,
+                        Availability::PendingBlock(root),
+                        "should be pending block"
+                    );
+                    let availability = cache
+                        .put_pending_executed_block(pending_block)
+                        .expect("should put block");
+                    assert!(
+                        matches!(availability, Availability::PendingBlobs(_)),
+                        "should have pending blobs"
+                    );
+                }
+            } else {
+                // still need to pop front so the blob count is correct
+                pending_blobs.pop_front().expect("should have blobs");
+                let availability = cache
+                    .put_pending_executed_block(pending_block)
+                    .expect("should put block");
+                assert!(
+                    matches!(availability, Availability::PendingBlobs(_)),
+                    "should be pending blobs"
+                );
+            }
+        }
+
+        // now we should have a full cache spanning multiple epochs
+        // run the maintenance routine for increasing epochs and ensure that the cache is pruned
+        assert_eq!(
+            cache.critical.read().in_memory.len(),
+            capacity,
+            "cache memory should be full"
+        );
+        let store_keys = cache
+            .overflow_store
+            .read_keys_on_disk()
+            .expect("should read keys");
+        assert_eq!(
+            store_keys.len(),
+            capacity * (n_epochs - 1),
+            "cache disk should have the rest"
+        );
+        let mut expected_length = n_epochs * capacity;
+        for (epoch, count) in epoch_count {
+            // a cutoff of `epoch + 1` makes every block of `epoch` outdated
+            cache
+                .do_maintenance(epoch + 1)
+                .expect("should run maintenance");
+            let disk_keys = cache
+                .overflow_store
+                .read_keys_on_disk()
+                .expect("should read keys")
+                .len();
+            let mem_keys = cache.critical.read().in_memory.len();
+            expected_length -= count;
+            info!(
+                log,
+                "EPOCH: {} DISK KEYS: {} MEM KEYS: {} TOTAL: {} EXPECTED: {}",
+                epoch,
+                disk_keys,
+                mem_keys,
+                (disk_keys + mem_keys),
+                std::cmp::max(expected_length, capacity * 3 / 4),
+            );
+            assert_eq!(
+                (disk_keys + mem_keys),
+                std::cmp::max(expected_length, capacity * 3 / 4),
+                "cache should be pruned"
+            );
+        }
+    }
+
+    #[tokio::test]
+    #[cfg(feature = "spec-minimal")]
+    async fn overflow_cache_test_persist_recover() {
+        type E = MinimalEthSpec;
+        type T = DiskHarnessType;
+        let log = test_logger();
+        let chain_db_path = tempdir().expect("should get temp dir");
+        let harness: BeaconChainHarness = get_deneb_chain(log.clone(), &chain_db_path).await;
+        let spec = harness.spec.clone();
+        let n_epochs = 4;
+        let capacity = E::slots_per_epoch() as usize;
+        let db_path = tempdir().expect("should get temp dir");
+        let test_store = get_store_with_spec::(&db_path, spec.clone(), log.clone());
+        let cache = Arc::new(
+            OverflowLRUCache::::new(capacity, test_store.clone()).expect("should
create cache"), + ); + + let mut pending_blocks = VecDeque::new(); + let mut pending_blobs = VecDeque::new(); + let mut roots = VecDeque::new(); + let mut epoch_count = BTreeMap::new(); + while pending_blobs.len() < n_epochs * capacity { + let (pending_block, blobs) = availability_pending_block(&harness, log.clone()).await; + if pending_block.num_blobs_expected() == 0 { + // we need blocks with blobs + continue; + } + let root = pending_block.block.as_block().canonical_root(); + let epoch = pending_block + .block + .as_block() + .slot() + .epoch(E::slots_per_epoch()); + epoch_count.entry(epoch).or_insert_with(|| 0).add_assign(1); + + pending_blocks.push_back(pending_block); + pending_blobs.push_back(blobs); + roots.push_back(root); + } + + let kzg = harness + .chain + .kzg + .as_ref() + .cloned() + .expect("kzg should exist"); + + let mut remaining_blobs = HashMap::new(); + for _ in 0..(n_epochs * capacity) { + let pending_block = pending_blocks.pop_front().expect("should have block"); + let block_root = pending_block.block.as_block().canonical_root(); + let expected_blobs = pending_block.num_blobs_expected(); + if expected_blobs > 1 { + // might as well add a blob too + let mut pending_blobs = pending_blobs.pop_front().expect("should have blobs"); + let one_blob = pending_blobs.pop().expect("should have at least one blob"); + let kzg_verified_blob = + verify_kzg_for_blob(one_blob, kzg.as_ref()).expect("kzg should verify"); + // generate random boolean + let block_first = (rand::random::() % 2) == 0; + remaining_blobs.insert(block_root, pending_blobs); + if block_first { + let availability = cache + .put_pending_executed_block(pending_block) + .expect("should put block"); + assert!( + matches!(availability, Availability::PendingBlobs(_)), + "should have pending blobs" + ); + let availability = cache + .put_kzg_verified_blob(kzg_verified_blob) + .expect("should put blob"); + assert!( + matches!(availability, Availability::PendingBlobs(_)), + "availabilty should be 
pending blobs: {:?}", + availability + ); + } else { + let availability = cache + .put_kzg_verified_blob(kzg_verified_blob) + .expect("should put blob"); + let root = pending_block.block.as_block().canonical_root(); + assert_eq!( + availability, + Availability::PendingBlock(root), + "should be pending block" + ); + let availability = cache + .put_pending_executed_block(pending_block) + .expect("should put block"); + assert!( + matches!(availability, Availability::PendingBlobs(_)), + "should have pending blobs" + ); + } + } else { + // still need to pop front so the blob count is correct + let pending_blobs = pending_blobs.pop_front().expect("should have blobs"); + remaining_blobs.insert(block_root, pending_blobs); + let availability = cache + .put_pending_executed_block(pending_block) + .expect("should put block"); + assert!( + matches!(availability, Availability::PendingBlobs(_)), + "should be pending blobs" + ); + } + } + + // now we should have a full cache spanning multiple epochs + // cache should be at capacity + assert_eq!( + cache.critical.read().in_memory.len(), + capacity, + "cache memory should be full" + ); + // write all components to disk + cache.write_all_to_disk().expect("should write all to disk"); + // everything should be on disk now + assert_eq!( + cache + .overflow_store + .read_keys_on_disk() + .expect("should read keys") + .len(), + capacity * n_epochs, + "cache disk should have the rest" + ); + assert_eq!( + cache.critical.read().in_memory.len(), + 0, + "cache memory should be empty" + ); + assert_eq!( + cache.critical.read().store_keys.len(), + n_epochs * capacity, + "cache store should have the rest" + ); + drop(cache); + + // create a new cache with the same store + let recovered_cache = + OverflowLRUCache::::new(capacity, test_store).expect("should recover cache"); + // again, everything should be on disk + assert_eq!( + recovered_cache + .overflow_store + .read_keys_on_disk() + .expect("should read keys") + .len(), + capacity * 
n_epochs, + "cache disk should have the rest" + ); + assert_eq!( + recovered_cache.critical.read().in_memory.len(), + 0, + "cache memory should be empty" + ); + assert_eq!( + recovered_cache.critical.read().store_keys.len(), + n_epochs * capacity, + "cache store should have the rest" + ); + + // now lets insert the remaining blobs until the cache is empty + for (_, blobs) in remaining_blobs { + let additional_blobs = blobs.len(); + for (i, gossip_blob) in blobs.into_iter().enumerate() { + let kzg_verified_blob = + verify_kzg_for_blob(gossip_blob, kzg.as_ref()).expect("kzg should verify"); + let availability = recovered_cache + .put_kzg_verified_blob(kzg_verified_blob) + .expect("should put blob"); + if i == additional_blobs - 1 { + assert!(matches!(availability, Availability::Available(_))) + } else { + assert!(matches!(availability, Availability::PendingBlobs(_))); + } + } + } + } +} diff --git a/beacon_node/beacon_chain/src/errors.rs b/beacon_node/beacon_chain/src/errors.rs index 9d5485df9e..e4c4ff2517 100644 --- a/beacon_node/beacon_chain/src/errors.rs +++ b/beacon_node/beacon_chain/src/errors.rs @@ -2,6 +2,7 @@ use crate::attester_cache::Error as AttesterCacheError; use crate::beacon_block_streamer::Error as BlockStreamerError; use crate::beacon_chain::ForkChoiceError; use crate::beacon_fork_choice_store::Error as ForkChoiceStoreError; +use crate::data_availability_checker::AvailabilityCheckError; use crate::eth1_chain::Error as Eth1ChainError; use crate::historical_blocks::HistoricalBlockError; use crate::migrate::PruningError; @@ -215,6 +216,7 @@ pub enum BeaconChainError { BlsToExecutionConflictsWithPool, InconsistentFork(InconsistentFork), ProposerHeadForkChoiceError(fork_choice::Error), + AvailabilityCheckError(AvailabilityCheckError), } easy_from_to!(SlotProcessingError, BeaconChainError); @@ -240,6 +242,7 @@ easy_from_to!(HistoricalBlockError, BeaconChainError); easy_from_to!(StateAdvanceError, BeaconChainError); easy_from_to!(BlockReplayError, 
BeaconChainError); easy_from_to!(InconsistentFork, BeaconChainError); +easy_from_to!(AvailabilityCheckError, BeaconChainError); #[derive(Debug)] pub enum BlockProductionError { diff --git a/beacon_node/beacon_chain/src/eth1_finalization_cache.rs b/beacon_node/beacon_chain/src/eth1_finalization_cache.rs index 7cf805a126..17ac4e5b30 100644 --- a/beacon_node/beacon_chain/src/eth1_finalization_cache.rs +++ b/beacon_node/beacon_chain/src/eth1_finalization_cache.rs @@ -1,4 +1,5 @@ use slog::{debug, Logger}; +use ssz_derive::{Decode, Encode}; use std::cmp; use std::collections::BTreeMap; use types::{Checkpoint, Epoch, Eth1Data, Hash256 as Root}; @@ -10,7 +11,7 @@ pub const DEFAULT_ETH1_CACHE_SIZE: usize = 5; /// These fields are named the same as the corresponding fields in the `BeaconState` /// as this structure stores these values from the `BeaconState` at a `Checkpoint` -#[derive(Clone)] +#[derive(Clone, Debug, PartialEq, Encode, Decode)] pub struct Eth1FinalizationData { pub eth1_data: Eth1Data, pub eth1_deposit_index: u64, diff --git a/beacon_node/beacon_chain/src/metrics.rs b/beacon_node/beacon_chain/src/metrics.rs index 315f869514..a8fdc0abd6 100644 --- a/beacon_node/beacon_chain/src/metrics.rs +++ b/beacon_node/beacon_chain/src/metrics.rs @@ -380,6 +380,8 @@ lazy_static! 
{ try_create_histogram("beacon_persist_eth1_cache", "Time taken to persist the eth1 caches"); pub static ref PERSIST_FORK_CHOICE: Result = try_create_histogram("beacon_persist_fork_choice", "Time taken to persist the fork choice struct"); + pub static ref PERSIST_DATA_AVAILABILITY_CHECKER: Result = + try_create_histogram("beacon_persist_data_availability_checker", "Time taken to persist the data availability checker"); /* * Eth1 diff --git a/beacon_node/client/src/builder.rs b/beacon_node/client/src/builder.rs index 329f072754..c977746c7b 100644 --- a/beacon_node/client/src/builder.rs +++ b/beacon_node/client/src/builder.rs @@ -2,6 +2,7 @@ use crate::address_change_broadcast::broadcast_address_changes_at_capella; use crate::config::{ClientGenesis, Config as ClientConfig}; use crate::notifier::spawn_notifier; use crate::Client; +use beacon_chain::data_availability_checker::start_availability_cache_maintenance_service; use beacon_chain::otb_verification_service::start_otb_verification_service; use beacon_chain::proposer_prep_service::start_proposer_prep_service; use beacon_chain::schema_change::migrate_schema; @@ -828,6 +829,10 @@ where start_proposer_prep_service(runtime_context.executor.clone(), beacon_chain.clone()); start_otb_verification_service(runtime_context.executor.clone(), beacon_chain.clone()); + start_availability_cache_maintenance_service( + runtime_context.executor.clone(), + beacon_chain.clone(), + ); } Ok(Client { diff --git a/beacon_node/execution_layer/src/test_utils/execution_block_generator.rs b/beacon_node/execution_layer/src/test_utils/execution_block_generator.rs index 773c3fe9d4..5e5508b6f8 100644 --- a/beacon_node/execution_layer/src/test_utils/execution_block_generator.rs +++ b/beacon_node/execution_layer/src/test_utils/execution_block_generator.rs @@ -16,6 +16,7 @@ use std::collections::HashMap; use std::sync::Arc; use tree_hash::TreeHash; use tree_hash_derive::TreeHash; +use types::consts::deneb::BLOB_TX_TYPE; use 
types::transaction::{BlobTransaction, EcdsaSignature, SignedBlobTransaction}; use types::{ Blob, EthSpec, ExecutionBlockHash, ExecutionPayload, ExecutionPayloadCapella, @@ -684,7 +685,7 @@ impl ExecutionBlockGenerator { signature: bad_signature, }; // calculate transaction bytes - let tx_bytes = [0x05u8] + let tx_bytes = [BLOB_TX_TYPE] .into_iter() .chain(signed_blob_transaction.as_ssz_bytes().into_iter()) .collect::>(); diff --git a/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs b/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs index 5f282ecfbe..ffaf564127 100644 --- a/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs +++ b/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs @@ -230,7 +230,7 @@ impl Worker { let mut blob_list_results = HashMap::new(); for id in request.blob_ids.into_iter() { // First attempt to get the blobs from the RPC cache. - if let Some(blob) = self.chain.data_availability_checker.get_blob(&id) { + if let Ok(Some(blob)) = self.chain.data_availability_checker.get_blob(&id) { self.send_response(peer_id, Response::BlobsByRoot(Some(blob)), request_id); send_blob_count += 1; } else { diff --git a/beacon_node/store/src/leveldb_store.rs b/beacon_node/store/src/leveldb_store.rs index 86bd4ffacc..261f8c461b 100644 --- a/beacon_node/store/src/leveldb_store.rs +++ b/beacon_node/store/src/leveldb_store.rs @@ -198,6 +198,36 @@ impl KeyValueStore for LevelDB { ) } + fn iter_raw_entries(&self, column: DBColumn, prefix: &[u8]) -> RawEntryIter { + let start_key = BytesKey::from_vec(get_key_for_col(column.into(), prefix)); + + let iter = self.db.iter(self.read_options()); + iter.seek(&start_key); + + Box::new( + iter.take_while(move |(key, _)| key.key.starts_with(start_key.key.as_slice())) + .map(move |(bytes_key, value)| { + let subkey = &bytes_key.key[column.as_bytes().len()..]; + Ok((Vec::from(subkey), value)) + }), + ) + } + + fn iter_raw_keys(&self, column: DBColumn, prefix: &[u8]) -> RawKeyIter { + 
let start_key = BytesKey::from_vec(get_key_for_col(column.into(), prefix)); + + let iter = self.db.keys_iter(self.read_options()); + iter.seek(&start_key); + + Box::new( + iter.take_while(move |key| key.key.starts_with(start_key.key.as_slice())) + .map(move |bytes_key| { + let subkey = &bytes_key.key[column.as_bytes().len()..]; + Ok(Vec::from(subkey)) + }), + ) + } + /// Iterate through all keys and values in a particular column. fn iter_column_keys(&self, column: DBColumn) -> ColumnKeyIter { let start_key = diff --git a/beacon_node/store/src/lib.rs b/beacon_node/store/src/lib.rs index 47f0049fc2..cd2f2da2b9 100644 --- a/beacon_node/store/src/lib.rs +++ b/beacon_node/store/src/lib.rs @@ -49,6 +49,9 @@ pub use types::*; pub type ColumnIter<'a> = Box), Error>> + 'a>; pub type ColumnKeyIter<'a> = Box> + 'a>; +pub type RawEntryIter<'a> = Box, Vec), Error>> + 'a>; +pub type RawKeyIter<'a> = Box, Error>> + 'a>; + pub trait KeyValueStore: Sync + Send + Sized + 'static { /// Retrieve some bytes in `column` with `key`. fn get_bytes(&self, column: &str, key: &[u8]) -> Result>, Error>; @@ -88,6 +91,14 @@ pub trait KeyValueStore: Sync + Send + Sized + 'static { Box::new(std::iter::empty()) } + fn iter_raw_entries(&self, _column: DBColumn, _prefix: &[u8]) -> RawEntryIter { + Box::new(std::iter::empty()) + } + + fn iter_raw_keys(&self, _column: DBColumn, _prefix: &[u8]) -> RawKeyIter { + Box::new(std::iter::empty()) + } + /// Iterate through all keys in a particular column. fn iter_column_keys(&self, _column: DBColumn) -> ColumnKeyIter { // Default impl for non LevelDB databases @@ -227,6 +238,8 @@ pub enum DBColumn { OptimisticTransitionBlock, #[strum(serialize = "bhs")] BeaconHistoricalSummaries, + #[strum(serialize = "olc")] + OverflowLRUCache, } /// A block from the database, which might have an execution payload or not. 
diff --git a/consensus/fork_choice/src/fork_choice.rs b/consensus/fork_choice/src/fork_choice.rs index 03942751a8..b78e486d51 100644 --- a/consensus/fork_choice/src/fork_choice.rs +++ b/consensus/fork_choice/src/fork_choice.rs @@ -192,7 +192,8 @@ impl CountUnrealized { /// Indicates if a block has been verified by an execution payload. /// /// There is no variant for "invalid", since such a block should never be added to fork choice. -#[derive(Clone, Copy, Debug, PartialEq)] +#[derive(Clone, Copy, Debug, PartialEq, Encode, Decode)] +#[ssz(enum_behaviour = "tag")] pub enum PayloadVerificationStatus { /// An EL has declared the execution payload to be valid. Verified, diff --git a/consensus/ssz_derive/src/lib.rs b/consensus/ssz_derive/src/lib.rs index a5c4b7bb6d..280bdb83df 100644 --- a/consensus/ssz_derive/src/lib.rs +++ b/consensus/ssz_derive/src/lib.rs @@ -4,6 +4,7 @@ //! //! The following struct/enum attributes are available: //! +//! - `#[ssz(enum_behaviour = "tag")]`: encodes and decodes an `enum` with 0 fields per variant //! - `#[ssz(enum_behaviour = "union")]`: encodes and decodes an `enum` with a one-byte variant selector. //! - `#[ssz(enum_behaviour = "transparent")]`: allows encoding an `enum` by serializing only the //! value whilst ignoring outermost the `enum`. @@ -140,6 +141,22 @@ //! TransparentEnum::Bar(vec![42, 42]).as_ssz_bytes(), //! vec![42, 42] //! ); +//! +//! /// Represented as an SSZ "uint8" +//! #[derive(Debug, PartialEq, Encode, Decode)] +//! #[ssz(enum_behaviour = "tag")] +//! enum TagEnum { +//! Foo, +//! Bar, +//! } +//! assert_eq!( +//! TagEnum::Foo.as_ssz_bytes(), +//! vec![0] +//! ); +//! assert_eq!( +//! TagEnum::from_ssz_bytes(&[1]).unwrap(), +//! TagEnum::Bar, +//! ); //!
``` use darling::{FromDeriveInput, FromMeta}; @@ -154,8 +171,9 @@ const MAX_UNION_SELECTOR: u8 = 127; const ENUM_TRANSPARENT: &str = "transparent"; const ENUM_UNION: &str = "union"; +const ENUM_TAG: &str = "tag"; const NO_ENUM_BEHAVIOUR_ERROR: &str = "enums require an \"enum_behaviour\" attribute with \ - a \"transparent\" or \"union\" value, e.g., #[ssz(enum_behaviour = \"transparent\")]"; + a \"transparent\", \"union\", or \"tag\" value, e.g., #[ssz(enum_behaviour = \"transparent\")]"; #[derive(Debug, FromDeriveInput)] #[darling(attributes(ssz))] @@ -196,6 +214,7 @@ enum StructBehaviour { enum EnumBehaviour { Union, Transparent, + Tag, } impl<'a> Procedure<'a> { @@ -237,6 +256,10 @@ impl<'a> Procedure<'a> { data, behaviour: EnumBehaviour::Transparent, }, + Some("tag") => Procedure::Enum { + data, + behaviour: EnumBehaviour::Tag, + }, Some(other) => panic!( "{} is not a valid enum behaviour, use \"container\" or \"transparent\"", other @@ -296,6 +319,7 @@ pub fn ssz_encode_derive(input: TokenStream) -> TokenStream { Procedure::Enum { data, behaviour } => match behaviour { EnumBehaviour::Transparent => ssz_encode_derive_enum_transparent(&item, data), EnumBehaviour::Union => ssz_encode_derive_enum_union(&item, data), + EnumBehaviour::Tag => ssz_encode_derive_enum_tag(&item, data), }, } } @@ -573,6 +597,67 @@ fn ssz_encode_derive_enum_transparent( output.into() } +/// Derive `ssz::Encode` for an `enum` following the "tag" method. +/// +/// The union selector will be determined based upon the order in which the enum variants are +/// defined. E.g., the top-most variant in the enum will have a selector of `0`, the variant +/// beneath it will have a selector of `1` and so on. 
+/// +/// # Limitations +/// +/// Only supports enums where each variant has no fields +fn ssz_encode_derive_enum_tag(derive_input: &DeriveInput, enum_data: &DataEnum) -> TokenStream { + let name = &derive_input.ident; + let (impl_generics, ty_generics, where_clause) = &derive_input.generics.split_for_impl(); + + let patterns: Vec<_> = enum_data + .variants + .iter() + .map(|variant| { + let variant_name = &variant.ident; + + if !variant.fields.is_empty() { + panic!("ssz::Encode tag behaviour can only be derived for enums with no fields"); + } + + quote! { + #name::#variant_name + } + }) + .collect(); + + let union_selectors = compute_union_selectors(patterns.len()); + + let output = quote! { + impl #impl_generics ssz::Encode for #name #ty_generics #where_clause { + fn is_ssz_fixed_len() -> bool { + true + } + + fn ssz_fixed_len() -> usize { + 1 + } + + fn ssz_bytes_len(&self) -> usize { + 1 + } + + fn ssz_append(&self, buf: &mut Vec) { + match self { + #( + #patterns => { + let union_selector: u8 = #union_selectors; + debug_assert!(union_selector <= ssz::MAX_UNION_SELECTOR); + buf.push(union_selector); + }, + )* + } + } + } + }; + output.into() +} + /// Derive `ssz::Encode` for an `enum` following the "union" SSZ spec. 
/// /// The union selector will be determined based upon the order in which the enum variants are @@ -652,9 +737,10 @@ pub fn ssz_decode_derive(input: TokenStream) -> TokenStream { }, Procedure::Enum { data, behaviour } => match behaviour { EnumBehaviour::Union => ssz_decode_derive_enum_union(&item, data), + EnumBehaviour::Tag => ssz_decode_derive_enum_tag(&item, data), EnumBehaviour::Transparent => panic!( - "Decode cannot be derived for enum_behaviour \"{}\", only \"{}\" is valid.", - ENUM_TRANSPARENT, ENUM_UNION + "Decode cannot be derived for enum_behaviour \"{}\", only \"{}\" and \"{}\" is valid.", + ENUM_TRANSPARENT, ENUM_UNION, ENUM_TAG, ), }, } @@ -908,6 +994,59 @@ fn ssz_decode_derive_struct_transparent( output.into() } +/// Derive `ssz::Decode` for an `enum` following the "tag" SSZ spec. +fn ssz_decode_derive_enum_tag(derive_input: &DeriveInput, enum_data: &DataEnum) -> TokenStream { + let name = &derive_input.ident; + let (impl_generics, ty_generics, where_clause) = &derive_input.generics.split_for_impl(); + + let patterns: Vec<_> = enum_data + .variants + .iter() + .map(|variant| { + let variant_name = &variant.ident; + + if !variant.fields.is_empty() { + panic!("ssz::Decode tag behaviour can only be derived for enums with no fields"); + } + + quote! { + #name::#variant_name + } + }) + .collect(); + + let union_selectors = compute_union_selectors(patterns.len()); + + let output = quote! { + impl #impl_generics ssz::Decode for #name #ty_generics #where_clause { + fn is_ssz_fixed_len() -> bool { + true + } + + fn ssz_fixed_len() -> usize { + 1 + } + + fn from_ssz_bytes(bytes: &[u8]) -> std::result::Result { + let byte = bytes + .first() + .copied() + .ok_or(ssz::DecodeError::OutOfBoundsByte { i: 0 })?; + + match byte { + #( + #union_selectors => { + Ok(#patterns) + }, + )* + other => Err(ssz::DecodeError::UnionSelectorInvalid(other)), + } + } + } + }; + output.into() +} + /// Derive `ssz::Decode` for an `enum` following the "union" SSZ spec. 
fn ssz_decode_derive_enum_union(derive_input: &DeriveInput, enum_data: &DataEnum) -> TokenStream { let name = &derive_input.ident; diff --git a/consensus/ssz_derive/tests/tests.rs b/consensus/ssz_derive/tests/tests.rs index 040d2a3476..72192b293a 100644 --- a/consensus/ssz_derive/tests/tests.rs +++ b/consensus/ssz_derive/tests/tests.rs @@ -12,6 +12,14 @@ fn assert_encode_decode(item: &T, bytes: assert_eq!(T::from_ssz_bytes(bytes).unwrap(), *item); } +#[derive(PartialEq, Debug, Encode, Decode)] +#[ssz(enum_behaviour = "tag")] +enum TagEnum { + A, + B, + C, +} + #[derive(PartialEq, Debug, Encode, Decode)] #[ssz(enum_behaviour = "union")] enum TwoFixedUnion { @@ -120,6 +128,13 @@ fn two_variable_union() { ); } +#[test] +fn tag_enum() { + assert_encode_decode(&TagEnum::A, &[0]); + assert_encode_decode(&TagEnum::B, &[1]); + assert_encode_decode(&TagEnum::C, &[2]); +} + #[derive(PartialEq, Debug, Encode, Decode)] #[ssz(enum_behaviour = "union")] enum TwoVecUnion { diff --git a/consensus/state_processing/src/consensus_context.rs b/consensus/state_processing/src/consensus_context.rs index 37bd5fe446..78803ab4eb 100644 --- a/consensus/state_processing/src/consensus_context.rs +++ b/consensus/state_processing/src/consensus_context.rs @@ -1,5 +1,6 @@ use crate::common::get_indexed_attestation; use crate::per_block_processing::errors::{AttestationInvalid, BlockOperationError}; +use ssz_derive::{Decode, Encode}; use std::collections::{hash_map::Entry, HashMap}; use tree_hash::TreeHash; use types::{ @@ -7,7 +8,7 @@ use types::{ ChainSpec, Epoch, EthSpec, Hash256, IndexedAttestation, SignedBeaconBlock, Slot, }; -#[derive(Debug, Clone)] +#[derive(Debug, PartialEq, Clone, Encode, Decode)] pub struct ConsensusContext { /// Slot to act as an identifier/safeguard slot: Slot, @@ -16,6 +17,8 @@ pub struct ConsensusContext { /// Block root of the block at `slot`. current_block_root: Option, /// Cache of indexed attestations constructed during block processing. 
+ /// We can skip serializing / deserializing this as the cache will just be rebuilt + #[ssz(skip_serializing, skip_deserializing)] indexed_attestations: HashMap<(AttestationData, BitList), IndexedAttestation>, /// Whether `verify_kzg_commitments_against_transactions` has successfully passed. diff --git a/consensus/types/src/beacon_block.rs b/consensus/types/src/beacon_block.rs index 27f15c9ed0..090a361cd4 100644 --- a/consensus/types/src/beacon_block.rs +++ b/consensus/types/src/beacon_block.rs @@ -201,13 +201,7 @@ impl<'a, T: EthSpec, Payload: AbstractExecPayload> BeaconBlockRef<'a, T, Payl /// dictated by `self.slot()`. pub fn fork_name(&self, spec: &ChainSpec) -> Result { let fork_at_slot = spec.fork_name_at_slot::(self.slot()); - let object_fork = match self { - BeaconBlockRef::Base { .. } => ForkName::Base, - BeaconBlockRef::Altair { .. } => ForkName::Altair, - BeaconBlockRef::Merge { .. } => ForkName::Merge, - BeaconBlockRef::Capella { .. } => ForkName::Capella, - BeaconBlockRef::Deneb { .. } => ForkName::Deneb, - }; + let object_fork = self.fork_name_unchecked(); if fork_at_slot == object_fork { Ok(object_fork) @@ -219,6 +213,19 @@ impl<'a, T: EthSpec, Payload: AbstractExecPayload> BeaconBlockRef<'a, T, Payl } } + /// Returns the name of the fork pertaining to `self`. + /// + /// Does not check that the fork is consistent with the slot. + pub fn fork_name_unchecked(&self) -> ForkName { + match self { + BeaconBlockRef::Base { .. } => ForkName::Base, + BeaconBlockRef::Altair { .. } => ForkName::Altair, + BeaconBlockRef::Merge { .. } => ForkName::Merge, + BeaconBlockRef::Capella { .. } => ForkName::Capella, + BeaconBlockRef::Deneb { .. } => ForkName::Deneb, + } + } + /// Convenience accessor for the `body` as a `BeaconBlockBodyRef`. 
pub fn body(&self) -> BeaconBlockBodyRef<'a, T, Payload> { map_beacon_block_ref_into_beacon_block_body_ref!(&'a _, *self, |block, cons| cons( diff --git a/consensus/types/src/beacon_state.rs b/consensus/types/src/beacon_state.rs index d480c0fc32..58c0eed339 100644 --- a/consensus/types/src/beacon_state.rs +++ b/consensus/types/src/beacon_state.rs @@ -415,13 +415,7 @@ impl BeaconState { /// dictated by `self.slot()`. pub fn fork_name(&self, spec: &ChainSpec) -> Result { let fork_at_slot = spec.fork_name_at_epoch(self.current_epoch()); - let object_fork = match self { - BeaconState::Base { .. } => ForkName::Base, - BeaconState::Altair { .. } => ForkName::Altair, - BeaconState::Merge { .. } => ForkName::Merge, - BeaconState::Capella { .. } => ForkName::Capella, - BeaconState::Deneb { .. } => ForkName::Deneb, - }; + let object_fork = self.fork_name_unchecked(); if fork_at_slot == object_fork { Ok(object_fork) @@ -433,6 +427,19 @@ impl BeaconState { } } + /// Returns the name of the fork pertaining to `self`. + /// + /// Does not check if `self` is consistent with the fork dictated by `self.slot()`. + pub fn fork_name_unchecked(&self) -> ForkName { + match self { + BeaconState::Base { .. } => ForkName::Base, + BeaconState::Altair { .. } => ForkName::Altair, + BeaconState::Merge { .. } => ForkName::Merge, + BeaconState::Capella { .. } => ForkName::Capella, + BeaconState::Deneb { .. } => ForkName::Deneb, + } + } + /// Specialised deserialisation method that uses the `ChainSpec` as context. 
#[allow(clippy::integer_arithmetic)] pub fn from_ssz_bytes(bytes: &[u8], spec: &ChainSpec) -> Result { @@ -1870,3 +1877,80 @@ impl ForkVersionDeserialize for BeaconState { )) } } + +/// This module can be used to encode and decode a `BeaconState` the same way it +/// would be done if we had tagged the superstruct enum with +/// `#[ssz(enum_behaviour = "union")]` +/// This should _only_ be used for *some* cases to store these objects in the +/// database and _NEVER_ for encoding / decoding states sent over the network! +pub mod ssz_tagged_beacon_state { + use super::*; + pub mod encode { + use super::*; + #[allow(unused_imports)] + use ssz::*; + + pub fn is_ssz_fixed_len() -> bool { + false + } + + pub fn ssz_fixed_len() -> usize { + BYTES_PER_LENGTH_OFFSET + } + + pub fn ssz_bytes_len(state: &BeaconState) -> usize { + state + .ssz_bytes_len() + .checked_add(1) + .expect("encoded length must be less than usize::max") + } + + pub fn ssz_append(state: &BeaconState, buf: &mut Vec) { + let fork_name = state.fork_name_unchecked(); + fork_name.ssz_append(buf); + state.ssz_append(buf); + } + + pub fn as_ssz_bytes(state: &BeaconState) -> Vec { + let mut buf = vec![]; + ssz_append(state, &mut buf); + + buf + } + } + + pub mod decode { + use super::*; + #[allow(unused_imports)] + use ssz::*; + + pub fn is_ssz_fixed_len() -> bool { + false + } + + pub fn ssz_fixed_len() -> usize { + BYTES_PER_LENGTH_OFFSET + } + + pub fn from_ssz_bytes(bytes: &[u8]) -> Result, DecodeError> { + let fork_byte = bytes + .first() + .copied() + .ok_or(DecodeError::OutOfBoundsByte { i: 0 })?; + let body = bytes + .get(1..) + .ok_or(DecodeError::OutOfBoundsByte { i: 1 })?; + match ForkName::from_ssz_bytes(&[fork_byte])? 
{ + ForkName::Base => Ok(BeaconState::Base(BeaconStateBase::from_ssz_bytes(body)?)), + ForkName::Altair => Ok(BeaconState::Altair(BeaconStateAltair::from_ssz_bytes( + body, + )?)), + ForkName::Merge => Ok(BeaconState::Merge(BeaconStateMerge::from_ssz_bytes(body)?)), + ForkName::Capella => Ok(BeaconState::Capella(BeaconStateCapella::from_ssz_bytes( + body, + )?)), + ForkName::Deneb => Ok(BeaconState::Deneb(BeaconStateDeneb::from_ssz_bytes(body)?)), + } + } + } +} diff --git a/consensus/types/src/fork_name.rs b/consensus/types/src/fork_name.rs index e7c1f9628b..6d52e0abbd 100644 --- a/consensus/types/src/fork_name.rs +++ b/consensus/types/src/fork_name.rs @@ -1,12 +1,14 @@ use crate::{ChainSpec, Epoch}; use serde_derive::{Deserialize, Serialize}; +use ssz_derive::{Decode, Encode}; use std::convert::TryFrom; use std::fmt::{self, Display, Formatter}; use std::str::FromStr; -#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)] +#[derive(Debug, Clone, Copy, Decode, Encode, PartialEq, Eq, Hash, Serialize, Deserialize)] #[serde(try_from = "String")] #[serde(into = "String")] +#[ssz(enum_behaviour = "tag")] pub enum ForkName { Base, Altair, diff --git a/consensus/types/src/lib.rs b/consensus/types/src/lib.rs index 617cbcaf02..46c5c2a4ce 100644 --- a/consensus/types/src/lib.rs +++ b/consensus/types/src/lib.rs @@ -170,9 +170,9 @@ pub use crate::selection_proof::SelectionProof; pub use crate::shuffling_id::AttestationShufflingId; pub use crate::signed_aggregate_and_proof::SignedAggregateAndProof; pub use crate::signed_beacon_block::{ - SignedBeaconBlock, SignedBeaconBlockAltair, SignedBeaconBlockBase, SignedBeaconBlockCapella, - SignedBeaconBlockDeneb, SignedBeaconBlockHash, SignedBeaconBlockMerge, - SignedBlindedBeaconBlock, + ssz_tagged_signed_beacon_block, SignedBeaconBlock, SignedBeaconBlockAltair, + SignedBeaconBlockBase, SignedBeaconBlockCapella, SignedBeaconBlockDeneb, SignedBeaconBlockHash, + SignedBeaconBlockMerge, SignedBlindedBeaconBlock, }; 
pub use crate::signed_beacon_block_header::SignedBeaconBlockHeader; pub use crate::signed_blob::*; diff --git a/consensus/types/src/signed_beacon_block.rs b/consensus/types/src/signed_beacon_block.rs index 58810150c2..23a254079d 100644 --- a/consensus/types/src/signed_beacon_block.rs +++ b/consensus/types/src/signed_beacon_block.rs @@ -92,6 +92,12 @@ impl> SignedBeaconBlock self.message().fork_name(spec) } + /// Returns the name of the fork pertaining to `self`. + /// Does not check that the fork is consistent with the slot. + pub fn fork_name_unchecked(&self) -> ForkName { + self.message().fork_name_unchecked() + } + /// SSZ decode with fork variant determined by slot. pub fn from_ssz_bytes(bytes: &[u8], spec: &ChainSpec) -> Result { Self::from_ssz_bytes_with(bytes, |bytes| BeaconBlock::from_ssz_bytes(bytes, spec)) @@ -510,6 +516,99 @@ impl> ForkVersionDeserialize } } +/// This module can be used to encode and decode a `SignedBeaconBlock` the same way it +/// would be done if we had tagged the superstruct enum with +/// `#[ssz(enum_behaviour = "union")]` +/// This should _only_ be used in *some* cases when storing these objects in the database +/// and _NEVER_ for encoding / decoding blocks sent over the network!
+pub mod ssz_tagged_signed_beacon_block { + use super::*; + pub mod encode { + use super::*; + #[allow(unused_imports)] + use ssz::*; + + pub fn is_ssz_fixed_len() -> bool { + false + } + + pub fn ssz_fixed_len() -> usize { + BYTES_PER_LENGTH_OFFSET + } + + pub fn ssz_bytes_len>( + block: &SignedBeaconBlock, + ) -> usize { + block + .ssz_bytes_len() + .checked_add(1) + .expect("encoded length must be less than usize::max") + } + + pub fn ssz_append>( + block: &SignedBeaconBlock, + buf: &mut Vec, + ) { + let fork_name = block.fork_name_unchecked(); + fork_name.ssz_append(buf); + block.ssz_append(buf); + } + + pub fn as_ssz_bytes>( + block: &SignedBeaconBlock, + ) -> Vec { + let mut buf = vec![]; + ssz_append(block, &mut buf); + + buf + } + } + + pub mod decode { + use super::*; + #[allow(unused_imports)] + use ssz::*; + + pub fn is_ssz_fixed_len() -> bool { + false + } + + pub fn ssz_fixed_len() -> usize { + BYTES_PER_LENGTH_OFFSET + } + + pub fn from_ssz_bytes>( + bytes: &[u8], + ) -> Result, DecodeError> { + let fork_byte = bytes + .first() + .copied() + .ok_or(DecodeError::OutOfBoundsByte { i: 0 })?; + let body = bytes + .get(1..) + .ok_or(DecodeError::OutOfBoundsByte { i: 1 })?; + + match ForkName::from_ssz_bytes(&[fork_byte])? 
{ + ForkName::Base => Ok(SignedBeaconBlock::Base( + SignedBeaconBlockBase::from_ssz_bytes(body)?, + )), + ForkName::Altair => Ok(SignedBeaconBlock::Altair( + SignedBeaconBlockAltair::from_ssz_bytes(body)?, + )), + ForkName::Merge => Ok(SignedBeaconBlock::Merge( + SignedBeaconBlockMerge::from_ssz_bytes(body)?, + )), + ForkName::Capella => Ok(SignedBeaconBlock::Capella( + SignedBeaconBlockCapella::from_ssz_bytes(body)?, + )), + ForkName::Deneb => Ok(SignedBeaconBlock::Deneb( + SignedBeaconBlockDeneb::from_ssz_bytes(body)?, + )), + } + } + } +} + #[cfg(test)] mod test { use super::*; @@ -551,4 +650,38 @@ mod test { assert_eq!(reconstructed, block); } } + + #[test] + fn test_ssz_tagged_signed_beacon_block() { + type E = MainnetEthSpec; + + let spec = &E::default_spec(); + let sig = Signature::empty(); + let blocks = vec![ + SignedBeaconBlock::::from_block( + BeaconBlock::Base(BeaconBlockBase::empty(spec)), + sig.clone(), + ), + SignedBeaconBlock::from_block( + BeaconBlock::Altair(BeaconBlockAltair::empty(spec)), + sig.clone(), + ), + SignedBeaconBlock::from_block( + BeaconBlock::Merge(BeaconBlockMerge::empty(spec)), + sig.clone(), + ), + SignedBeaconBlock::from_block( + BeaconBlock::Capella(BeaconBlockCapella::empty(spec)), + sig.clone(), + ), + SignedBeaconBlock::from_block(BeaconBlock::Deneb(BeaconBlockDeneb::empty(spec)), sig), + ]; + + for block in blocks { + let encoded = ssz_tagged_signed_beacon_block::encode::as_ssz_bytes(&block); + let decoded = ssz_tagged_signed_beacon_block::decode::from_ssz_bytes::(&encoded) + .expect("should decode"); + assert_eq!(decoded, block); + } + } }