diff --git a/beacon_node/beacon_chain/src/data_availability_checker.rs b/beacon_node/beacon_chain/src/data_availability_checker.rs index f2af8cdf89..4c191695b2 100644 --- a/beacon_node/beacon_chain/src/data_availability_checker.rs +++ b/beacon_node/beacon_chain/src/data_availability_checker.rs @@ -6,12 +6,12 @@ use crate::block_verification::{AvailabilityPendingExecutedBlock, AvailableExecu use kzg::Error as KzgError; use kzg::Kzg; -use parking_lot::{Mutex, RwLock}; +use parking_lot::RwLock; use slot_clock::SlotClock; -use ssz_types::{Error, VariableList}; +use ssz_types::{Error, FixedVector, VariableList}; use state_processing::per_block_processing::deneb::deneb::verify_kzg_commitments_against_transactions; use std::collections::hash_map::{Entry, OccupiedEntry}; -use std::collections::{BTreeMap, HashMap}; +use std::collections::HashMap; use std::sync::Arc; use types::beacon_block_body::KzgCommitments; use types::blob_sidecar::{BlobIdentifier, BlobSidecar}; @@ -53,8 +53,7 @@ impl From for AvailabilityCheckError { /// have not been verified against blobs /// - blocks that have been fully verified and only require a data availability check pub struct DataAvailabilityChecker { - rpc_blob_cache: RwLock>>>, - gossip_availability_cache: Mutex>>, + availability_cache: RwLock>>, slot_clock: S, kzg: Option>, spec: ChainSpec, @@ -65,16 +64,20 @@ pub struct DataAvailabilityChecker { /// /// The blobs are all gossip and kzg verified. /// The block has completed all verifications except the availability check. -struct GossipAvailabilityCache { +struct ReceivedComponents { /// We use a `BTreeMap` here to maintain the order of `BlobSidecar`s based on index. - verified_blobs: BTreeMap>, + verified_blobs: FixedVector>, T::MaxBlobsPerBlock>, executed_block: Option>, } -impl GossipAvailabilityCache { +impl ReceivedComponents { fn new_from_blob(blob: KzgVerifiedBlob) -> Self { - let mut verified_blobs = BTreeMap::new(); - verified_blobs.insert(blob.blob_index(), blob); + let mut verified_blobs = FixedVector::<_, _>::default(); + // TODO: verify that we've already ensured the blob index < T::MaxBlobsPerBlock + if let Some(mut_maybe_blob) = verified_blobs.get_mut(blob.blob_index() as usize) { + *mut_maybe_blob = Some(blob); + } + Self { verified_blobs, executed_block: None, @@ -83,7 +86,7 @@ impl GossipAvailabilityCache { fn new_from_block(block: AvailabilityPendingExecutedBlock) -> Self { Self { - verified_blobs: BTreeMap::new(), + verified_blobs: <_>::default(), executed_block: Some(block), } } @@ -91,7 +94,17 @@ impl GossipAvailabilityCache { /// Returns `true` if the cache has all blobs corresponding to the /// kzg commitments in the block. fn has_all_blobs(&self, block: &AvailabilityPendingExecutedBlock) -> bool { - self.verified_blobs.len() == block.num_blobs_expected() + for i in 0..block.num_blobs_expected() { + if self + .verified_blobs + .get(i) + .map(|maybe_blob| maybe_blob.is_none()) + .unwrap_or(true) + { + return false; + } + } + true } } @@ -120,17 +133,22 @@ impl Availability { impl DataAvailabilityChecker { pub fn new(slot_clock: S, kzg: Option>, spec: ChainSpec) -> Self { Self { - rpc_blob_cache: <_>::default(), - gossip_availability_cache: <_>::default(), + availability_cache: <_>::default(), slot_clock, kzg, spec, } } - /// Get a blob from the RPC cache. + /// Get a blob from the availability cache. pub fn get_blob(&self, blob_id: &BlobIdentifier) -> Option>> { - self.rpc_blob_cache.read().get(blob_id).cloned() + self.availability_cache + .read() + .get(&blob_id.block_root)? + .verified_blobs + .get(blob_id.index as usize)? + .as_ref() + .map(|kzg_verified_blob| kzg_verified_blob.clone_blob()) } /// This first validates the KZG commitments included in the blob sidecar. @@ -152,22 +170,24 @@ impl DataAvailabilityChecker { return Err(AvailabilityCheckError::KzgNotInitialized); }; - let blob = kzg_verified_blob.clone_blob(); - - let mut blob_cache = self.gossip_availability_cache.lock(); - - // Gossip cache. - let availability = match blob_cache.entry(blob.block_root) { + let availability = match self + .availability_cache + .write() + .entry(kzg_verified_blob.block_root()) + { Entry::Occupied(mut occupied_entry) => { // All blobs reaching this cache should be gossip verified and gossip verification // should filter duplicates, as well as validate indices. - let cache = occupied_entry.get_mut(); + let received_components = occupied_entry.get_mut(); - cache + if let Some(maybe_verified_blob) = received_components .verified_blobs - .insert(kzg_verified_blob.blob_index(), kzg_verified_blob); + .get_mut(kzg_verified_blob.blob_index() as usize) + { + *maybe_verified_blob = Some(kzg_verified_blob) + } - if let Some(executed_block) = cache.executed_block.take() { + if let Some(executed_block) = received_components.executed_block.take() { self.check_block_availability_maybe_cache(occupied_entry, executed_block)? } else { Availability::PendingBlock(block_root) @@ -175,19 +195,11 @@ impl DataAvailabilityChecker { } Entry::Vacant(vacant_entry) => { let block_root = kzg_verified_blob.block_root(); - vacant_entry.insert(GossipAvailabilityCache::new_from_blob(kzg_verified_blob)); + vacant_entry.insert(ReceivedComponents::new_from_blob(kzg_verified_blob)); Availability::PendingBlock(block_root) } }; - drop(blob_cache); - - if let Some(blob_ids) = availability.get_available_blob_ids() { - self.prune_rpc_blob_cache(&blob_ids); - } else { - self.rpc_blob_cache.write().insert(blob.id(), blob.clone()); - } - Ok(availability) } @@ -197,26 +209,21 @@ impl DataAvailabilityChecker { &self, executed_block: AvailabilityPendingExecutedBlock, ) -> Result, AvailabilityCheckError> { - let mut guard = self.gossip_availability_cache.lock(); - let entry = guard.entry(executed_block.import_data.block_root); - - let availability = match entry { + let availability = match self + .availability_cache + .write() + .entry(executed_block.import_data.block_root) + { Entry::Occupied(occupied_entry) => { self.check_block_availability_maybe_cache(occupied_entry, executed_block)? } Entry::Vacant(vacant_entry) => { let all_blob_ids = executed_block.get_all_blob_ids(); - vacant_entry.insert(GossipAvailabilityCache::new_from_block(executed_block)); + vacant_entry.insert(ReceivedComponents::new_from_block(executed_block)); Availability::PendingBlobs(all_blob_ids) } }; - drop(guard); - - if let Some(blob_ids) = availability.get_available_blob_ids() { - self.prune_rpc_blob_cache(&blob_ids); - } - Ok(availability) } @@ -229,21 +236,27 @@ impl DataAvailabilityChecker { /// Returns `Ok(Availability::PendingBlobs(_))` if all corresponding blobs have not been received in the cache. fn check_block_availability_maybe_cache( &self, - mut occupied_entry: OccupiedEntry>, + mut occupied_entry: OccupiedEntry>, executed_block: AvailabilityPendingExecutedBlock, ) -> Result, AvailabilityCheckError> { if occupied_entry.get().has_all_blobs(&executed_block) { + let num_blobs_expected = executed_block.num_blobs_expected(); let AvailabilityPendingExecutedBlock { block, import_data, payload_verification_outcome, } = executed_block; - let GossipAvailabilityCache { + let ReceivedComponents { verified_blobs, executed_block: _, } = occupied_entry.remove(); - let verified_blobs = verified_blobs.into_values().collect(); + + let verified_blobs = Vec::from(verified_blobs) + .into_iter() + .take(num_blobs_expected) + .map(|maybe_blob| maybe_blob.ok_or(AvailabilityCheckError::MissingBlobs)) + .collect::, _>>()?; let available_block = self.make_available(block, verified_blobs)?; Ok(Availability::Available(Box::new( @@ -254,12 +267,17 @@ impl DataAvailabilityChecker { ), ))) } else { - let cached_entry = occupied_entry.get_mut(); + let received_components = occupied_entry.get_mut(); - let missing_blob_ids = executed_block - .get_filtered_blob_ids(|index| cached_entry.verified_blobs.get(&index).is_none()); + let missing_blob_ids = executed_block.get_filtered_blob_ids(|index| { + received_components + .verified_blobs + .get(index as usize) + .map(|maybe_blob| maybe_blob.is_none()) + .unwrap_or(true) + }); - let _ = cached_entry.executed_block.insert(executed_block); + let _ = received_components.executed_block.insert(executed_block); Ok(Availability::PendingBlobs(missing_blob_ids)) } @@ -435,13 +453,6 @@ impl DataAvailabilityChecker { self.data_availability_boundary() .map_or(false, |da_epoch| block_epoch >= da_epoch) } - - pub fn prune_rpc_blob_cache(&self, blob_ids: &[BlobIdentifier]) { - let mut guard = self.rpc_blob_cache.write(); - for id in blob_ids { - guard.remove(id); - } - } } pub enum BlobRequirements {