add processing and processed caching to the DA checker (#4732)

* add processing and processed caching to the DA checker

* move processing cache out of critical cache

* get it compiling

* fix lints

* add docs to `AvailabilityView`

* some self review

* fix lints

* fix beacon chain tests

* cargo fmt

* make availability view easier to implement, start on testing

* move child component cache and finish test

* cargo fix

* cargo fix

* cargo fix

* fmt and lint

* make blob commitments not optional, rename some caches, add missing blobs struct

* Update beacon_node/beacon_chain/src/data_availability_checker/processing_cache.rs

Co-authored-by: ethDreamer <37123614+ethDreamer@users.noreply.github.com>

* marks review feedback and other general cleanup

* cargo fix

* improve availability view docs

* some renames

* some renames and docs

* fix should delay lookup logic

* get rid of some wrapper methods

* fix up single lookup changes

* add a couple docs

* add single blob merge method and improve process_... docs

* update some names

* lints

* fix merge

* remove blob indices from lookup creation log

* remove blob indices from lookup creation log

* delayed lookup logging improvement

* check fork choice before doing any blob processing

* remove unused dep

* Update beacon_node/beacon_chain/src/data_availability_checker/availability_view.rs

Co-authored-by: Michael Sproul <micsproul@gmail.com>

* Update beacon_node/beacon_chain/src/data_availability_checker/availability_view.rs

Co-authored-by: Michael Sproul <micsproul@gmail.com>

* Update beacon_node/beacon_chain/src/data_availability_checker/availability_view.rs

Co-authored-by: Michael Sproul <micsproul@gmail.com>

* Update beacon_node/beacon_chain/src/data_availability_checker/availability_view.rs

Co-authored-by: Michael Sproul <micsproul@gmail.com>

* Update beacon_node/network/src/sync/block_lookups/delayed_lookup.rs

Co-authored-by: Michael Sproul <micsproul@gmail.com>

* remove duplicate deps

* use gen range in random blobs geneartor

* rename processing cache fields

* require block root in rpc block construction and check block root consistency

* send peers as vec in single message

* spawn delayed lookup service from network beacon processor

* fix tests

---------

Co-authored-by: ethDreamer <37123614+ethDreamer@users.noreply.github.com>
Co-authored-by: Michael Sproul <micsproul@gmail.com>
This commit is contained in:
realbigsean
2023-10-03 09:59:33 -04:00
committed by GitHub
parent 67aeb6bf6b
commit c7ddf1f0b1
38 changed files with 1894 additions and 1190 deletions

View File

@@ -0,0 +1,566 @@
use super::child_components::ChildComponents;
use crate::blob_verification::KzgVerifiedBlob;
use crate::block_verification_types::AsBlock;
use crate::data_availability_checker::overflow_lru_cache::PendingComponents;
use crate::data_availability_checker::ProcessingComponents;
use crate::AvailabilityPendingExecutedBlock;
use kzg::KzgCommitment;
use ssz_types::FixedVector;
use std::sync::Arc;
use types::beacon_block_body::KzgCommitments;
use types::{BlobSidecar, EthSpec, SignedBeaconBlock};
/// Defines an interface for managing data availability with two key invariants:
///
/// 1. If we haven't seen a block yet, we will insert the first blob for a given (block_root, index)
/// but we won't insert subsequent blobs for the same (block_root, index) if they have a different
/// commitment.
/// 2. On block insertion, any non-matching blob commitments are evicted.
///
/// Types implementing this trait can be used for validating and managing availability
/// of blocks and blobs in a cache-like data structure.
pub trait AvailabilityView<E: EthSpec> {
/// The type representing a block in the implementation.
type BlockType: GetCommitments<E>;
/// The type representing a blob in the implementation. Must implement `Clone`.
type BlobType: Clone + GetCommitment<E>;
/// Returns an immutable reference to the cached block.
fn get_cached_block(&self) -> &Option<Self::BlockType>;
/// Returns an immutable reference to the fixed vector of cached blobs.
fn get_cached_blobs(&self) -> &FixedVector<Option<Self::BlobType>, E::MaxBlobsPerBlock>;
/// Returns a mutable reference to the cached block.
fn get_cached_block_mut(&mut self) -> &mut Option<Self::BlockType>;
/// Returns a mutable reference to the fixed vector of cached blobs.
fn get_cached_blobs_mut(
&mut self,
) -> &mut FixedVector<Option<Self::BlobType>, E::MaxBlobsPerBlock>;
/// Checks if a block exists in the cache.
///
/// Returns:
/// - `true` if a block exists.
/// - `false` otherwise.
fn block_exists(&self) -> bool {
self.get_cached_block().is_some()
}
/// Checks if a blob exists at the given index in the cache.
///
/// Returns:
/// - `true` if a blob exists at the given index.
/// - `false` otherwise.
fn blob_exists(&self, blob_index: usize) -> bool {
self.get_cached_blobs()
.get(blob_index)
.map(|b| b.is_some())
.unwrap_or(false)
}
/// Returns the number of blobs that are expected to be present. Returns `None` if we don't have a
/// block.
///
/// This corresponds to the number of commitments that are present in a block.
fn num_expected_blobs(&self) -> Option<usize> {
self.get_cached_block()
.as_ref()
.map(|b| b.get_commitments().len())
}
/// Returns the number of blobs that have been received and are stored in the cache.
fn num_received_blobs(&self) -> usize {
self.get_cached_blobs().iter().flatten().count()
}
/// Inserts a block into the cache.
fn insert_block(&mut self, block: Self::BlockType) {
*self.get_cached_block_mut() = Some(block)
}
/// Inserts a blob at a specific index in the cache.
///
/// Existing blob at the index will be replaced.
fn insert_blob_at_index(&mut self, blob_index: usize, blob: Self::BlobType) {
if let Some(b) = self.get_cached_blobs_mut().get_mut(blob_index) {
*b = Some(blob);
}
}
/// Merges a given set of blobs into the cache.
///
/// Blobs are only inserted if:
/// 1. The blob entry at the index is empty and no block exists.
/// 2. The block exists and its commitment matches the blob's commitment.
fn merge_blobs(&mut self, blobs: FixedVector<Option<Self::BlobType>, E::MaxBlobsPerBlock>) {
for (index, blob) in blobs.iter().cloned().enumerate() {
let Some(blob) = blob else { continue };
self.merge_single_blob(index, blob);
}
}
/// Merges a single blob into the cache.
///
/// Blobs are only inserted if:
/// 1. The blob entry at the index is empty and no block exists, or
/// 2. The block exists and its commitment matches the blob's commitment.
fn merge_single_blob(&mut self, index: usize, blob: Self::BlobType) {
let commitment = *blob.get_commitment();
if let Some(cached_block) = self.get_cached_block() {
let block_commitment_opt = cached_block.get_commitments().get(index).copied();
if let Some(block_commitment) = block_commitment_opt {
if block_commitment == commitment {
self.insert_blob_at_index(index, blob)
}
}
} else if !self.blob_exists(index) {
self.insert_blob_at_index(index, blob)
}
}
/// Inserts a new block and revalidates the existing blobs against it.
///
/// Blobs that don't match the new block's commitments are evicted.
fn merge_block(&mut self, block: Self::BlockType) {
self.insert_block(block);
let reinsert = std::mem::take(self.get_cached_blobs_mut());
self.merge_blobs(reinsert);
}
/// Checks if the block and all of its expected blobs are available in the cache.
///
/// Returns `true` if both the block exists and the number of received blobs matches the number
/// of expected blobs.
fn is_available(&self) -> bool {
if let Some(num_expected_blobs) = self.num_expected_blobs() {
num_expected_blobs == self.num_received_blobs()
} else {
false
}
}
}
/// Implements the `AvailabilityView` trait for a given struct.
///
/// - `$struct_name`: The name of the struct for which to implement `AvailabilityView`.
/// - `$block_type`: The type to use for `BlockType` in the `AvailabilityView` trait.
/// - `$blob_type`: The type to use for `BlobType` in the `AvailabilityView` trait.
/// - `$block_field`: The field name in the struct that holds the cached block.
/// - `$blob_field`: The field name in the struct that holds the cached blobs.
#[macro_export]
macro_rules! impl_availability_view {
($struct_name:ident, $block_type:ty, $blob_type:ty, $block_field:ident, $blob_field:ident) => {
impl<E: EthSpec> AvailabilityView<E> for $struct_name<E> {
type BlockType = $block_type;
type BlobType = $blob_type;
fn get_cached_block(&self) -> &Option<Self::BlockType> {
&self.$block_field
}
fn get_cached_blobs(
&self,
) -> &FixedVector<Option<Self::BlobType>, E::MaxBlobsPerBlock> {
&self.$blob_field
}
fn get_cached_block_mut(&mut self) -> &mut Option<Self::BlockType> {
&mut self.$block_field
}
fn get_cached_blobs_mut(
&mut self,
) -> &mut FixedVector<Option<Self::BlobType>, E::MaxBlobsPerBlock> {
&mut self.$blob_field
}
}
};
}
impl_availability_view!(
ProcessingComponents,
KzgCommitments<E>,
KzgCommitment,
block_commitments,
blob_commitments
);
impl_availability_view!(
PendingComponents,
AvailabilityPendingExecutedBlock<E>,
KzgVerifiedBlob<E>,
executed_block,
verified_blobs
);
impl_availability_view!(
ChildComponents,
Arc<SignedBeaconBlock<E>>,
Arc<BlobSidecar<E>>,
downloaded_block,
downloaded_blobs
);
pub trait GetCommitments<E: EthSpec> {
fn get_commitments(&self) -> KzgCommitments<E>;
}
pub trait GetCommitment<E: EthSpec> {
fn get_commitment(&self) -> &KzgCommitment;
}
// These implementations are required to implement `AvailabilityView` for `ProcessingView`.
impl<E: EthSpec> GetCommitments<E> for KzgCommitments<E> {
fn get_commitments(&self) -> KzgCommitments<E> {
self.clone()
}
}
impl<E: EthSpec> GetCommitment<E> for KzgCommitment {
fn get_commitment(&self) -> &KzgCommitment {
self
}
}
// These implementations are required to implement `AvailabilityView` for `PendingComponents`.
impl<E: EthSpec> GetCommitments<E> for AvailabilityPendingExecutedBlock<E> {
fn get_commitments(&self) -> KzgCommitments<E> {
self.as_block()
.message()
.body()
.blob_kzg_commitments()
.cloned()
.unwrap_or_default()
}
}
impl<E: EthSpec> GetCommitment<E> for KzgVerifiedBlob<E> {
fn get_commitment(&self) -> &KzgCommitment {
&self.as_blob().kzg_commitment
}
}
// These implementations are required to implement `AvailabilityView` for `ChildComponents`.
impl<E: EthSpec> GetCommitments<E> for Arc<SignedBeaconBlock<E>> {
fn get_commitments(&self) -> KzgCommitments<E> {
self.message()
.body()
.blob_kzg_commitments()
.ok()
.cloned()
.unwrap_or_default()
}
}
impl<E: EthSpec> GetCommitment<E> for Arc<BlobSidecar<E>> {
fn get_commitment(&self) -> &KzgCommitment {
&self.kzg_commitment
}
}
#[cfg(test)]
pub mod tests {
use super::*;
use crate::block_verification_types::BlockImportData;
use crate::eth1_finalization_cache::Eth1FinalizationData;
use crate::test_utils::{generate_rand_block_and_blobs, NumBlobs};
use crate::PayloadVerificationOutcome;
use eth2_network_config::get_trusted_setup;
use fork_choice::PayloadVerificationStatus;
use kzg::{Kzg, TrustedSetup};
use rand::rngs::StdRng;
use rand::SeedableRng;
use state_processing::ConsensusContext;
use types::test_utils::TestRandom;
use types::{BeaconState, ChainSpec, ForkName, MainnetEthSpec, Slot};
type E = MainnetEthSpec;
type Setup<E> = (
SignedBeaconBlock<E>,
FixedVector<Option<BlobSidecar<E>>, <E as EthSpec>::MaxBlobsPerBlock>,
FixedVector<Option<BlobSidecar<E>>, <E as EthSpec>::MaxBlobsPerBlock>,
);
pub fn pre_setup() -> Setup<E> {
let trusted_setup: TrustedSetup =
serde_json::from_reader(get_trusted_setup::<<E as EthSpec>::Kzg>()).unwrap();
let kzg = Kzg::new_from_trusted_setup(trusted_setup).unwrap();
let mut rng = StdRng::seed_from_u64(0xDEADBEEF0BAD5EEDu64);
let (block, blobs_vec) =
generate_rand_block_and_blobs::<E>(ForkName::Deneb, NumBlobs::Random, &kzg, &mut rng);
let mut blobs: FixedVector<_, <E as EthSpec>::MaxBlobsPerBlock> = FixedVector::default();
for blob in blobs_vec {
if let Some(b) = blobs.get_mut(blob.index as usize) {
*b = Some(blob);
}
}
let mut invalid_blobs: FixedVector<
Option<BlobSidecar<E>>,
<E as EthSpec>::MaxBlobsPerBlock,
> = FixedVector::default();
for (index, blob) in blobs.iter().enumerate() {
let mut invalid_blob_opt = blob.clone();
if let Some(invalid_blob) = invalid_blob_opt.as_mut() {
invalid_blob.kzg_commitment = KzgCommitment::random_for_test(&mut rng);
}
*invalid_blobs.get_mut(index).unwrap() = invalid_blob_opt;
}
(block, blobs, invalid_blobs)
}
type ProcessingViewSetup<E> = (
KzgCommitments<E>,
FixedVector<Option<KzgCommitment>, <E as EthSpec>::MaxBlobsPerBlock>,
FixedVector<Option<KzgCommitment>, <E as EthSpec>::MaxBlobsPerBlock>,
);
pub fn setup_processing_components(
block: SignedBeaconBlock<E>,
valid_blobs: FixedVector<Option<BlobSidecar<E>>, <E as EthSpec>::MaxBlobsPerBlock>,
invalid_blobs: FixedVector<Option<BlobSidecar<E>>, <E as EthSpec>::MaxBlobsPerBlock>,
) -> ProcessingViewSetup<E> {
let commitments = block
.message()
.body()
.blob_kzg_commitments()
.unwrap()
.clone();
let blobs = FixedVector::from(
valid_blobs
.iter()
.map(|blob_opt| blob_opt.as_ref().map(|blob| blob.kzg_commitment))
.collect::<Vec<_>>(),
);
let invalid_blobs = FixedVector::from(
invalid_blobs
.iter()
.map(|blob_opt| blob_opt.as_ref().map(|blob| blob.kzg_commitment))
.collect::<Vec<_>>(),
);
(commitments, blobs, invalid_blobs)
}
type PendingComponentsSetup<E> = (
AvailabilityPendingExecutedBlock<E>,
FixedVector<Option<KzgVerifiedBlob<E>>, <E as EthSpec>::MaxBlobsPerBlock>,
FixedVector<Option<KzgVerifiedBlob<E>>, <E as EthSpec>::MaxBlobsPerBlock>,
);
pub fn setup_pending_components(
block: SignedBeaconBlock<E>,
valid_blobs: FixedVector<Option<BlobSidecar<E>>, <E as EthSpec>::MaxBlobsPerBlock>,
invalid_blobs: FixedVector<Option<BlobSidecar<E>>, <E as EthSpec>::MaxBlobsPerBlock>,
) -> PendingComponentsSetup<E> {
let blobs = FixedVector::from(
valid_blobs
.iter()
.map(|blob_opt| {
blob_opt
.as_ref()
.map(|blob| KzgVerifiedBlob::new(blob.clone()))
})
.collect::<Vec<_>>(),
);
let invalid_blobs = FixedVector::from(
invalid_blobs
.iter()
.map(|blob_opt| {
blob_opt
.as_ref()
.map(|blob| KzgVerifiedBlob::new(blob.clone()))
})
.collect::<Vec<_>>(),
);
let dummy_parent = block.clone_as_blinded();
let block = AvailabilityPendingExecutedBlock {
block: Arc::new(block),
import_data: BlockImportData {
block_root: Default::default(),
state: BeaconState::new(0, Default::default(), &ChainSpec::minimal()),
parent_block: dummy_parent,
parent_eth1_finalization_data: Eth1FinalizationData {
eth1_data: Default::default(),
eth1_deposit_index: 0,
},
confirmed_state_roots: vec![],
consensus_context: ConsensusContext::new(Slot::new(0)),
},
payload_verification_outcome: PayloadVerificationOutcome {
payload_verification_status: PayloadVerificationStatus::Verified,
is_valid_merge_transition_block: false,
},
};
(block, blobs, invalid_blobs)
}
type ChildComponentsSetup<E> = (
Arc<SignedBeaconBlock<E>>,
FixedVector<Option<Arc<BlobSidecar<E>>>, <E as EthSpec>::MaxBlobsPerBlock>,
FixedVector<Option<Arc<BlobSidecar<E>>>, <E as EthSpec>::MaxBlobsPerBlock>,
);
pub fn setup_child_components(
block: SignedBeaconBlock<E>,
valid_blobs: FixedVector<Option<BlobSidecar<E>>, <E as EthSpec>::MaxBlobsPerBlock>,
invalid_blobs: FixedVector<Option<BlobSidecar<E>>, <E as EthSpec>::MaxBlobsPerBlock>,
) -> ChildComponentsSetup<E> {
let blobs = FixedVector::from(
valid_blobs
.into_iter()
.map(|blob_opt| blob_opt.clone().map(Arc::new))
.collect::<Vec<_>>(),
);
let invalid_blobs = FixedVector::from(
invalid_blobs
.into_iter()
.map(|blob_opt| blob_opt.clone().map(Arc::new))
.collect::<Vec<_>>(),
);
(Arc::new(block), blobs, invalid_blobs)
}
pub fn assert_cache_consistent<V: AvailabilityView<E>>(cache: V) {
if let Some(cached_block) = cache.get_cached_block() {
let cached_block_commitments = cached_block.get_commitments();
for index in 0..E::max_blobs_per_block() {
let block_commitment = cached_block_commitments.get(index).copied();
let blob_commitment_opt = cache.get_cached_blobs().get(index).unwrap();
let blob_commitment = blob_commitment_opt.as_ref().map(|b| *b.get_commitment());
assert_eq!(block_commitment, blob_commitment);
}
} else {
panic!("No cached block")
}
}
pub fn assert_empty_blob_cache<V: AvailabilityView<E>>(cache: V) {
for blob in cache.get_cached_blobs().iter() {
assert!(blob.is_none());
}
}
#[macro_export]
macro_rules! generate_tests {
($module_name:ident, $type_name:ty, $block_field:ident, $blob_field:ident, $setup_fn:ident) => {
mod $module_name {
use super::*;
use types::Hash256;
#[test]
fn valid_block_invalid_blobs_valid_blobs() {
let (block_commitments, blobs, random_blobs) = pre_setup();
let (block_commitments, blobs, random_blobs) =
$setup_fn(block_commitments, blobs, random_blobs);
let block_root = Hash256::zero();
let mut cache = <$type_name>::empty(block_root);
cache.merge_block(block_commitments);
cache.merge_blobs(random_blobs);
cache.merge_blobs(blobs);
assert_cache_consistent(cache);
}
#[test]
fn invalid_blobs_block_valid_blobs() {
let (block_commitments, blobs, random_blobs) = pre_setup();
let (block_commitments, blobs, random_blobs) =
$setup_fn(block_commitments, blobs, random_blobs);
let block_root = Hash256::zero();
let mut cache = <$type_name>::empty(block_root);
cache.merge_blobs(random_blobs);
cache.merge_block(block_commitments);
cache.merge_blobs(blobs);
assert_cache_consistent(cache);
}
#[test]
fn invalid_blobs_valid_blobs_block() {
let (block_commitments, blobs, random_blobs) = pre_setup();
let (block_commitments, blobs, random_blobs) =
$setup_fn(block_commitments, blobs, random_blobs);
let block_root = Hash256::zero();
let mut cache = <$type_name>::empty(block_root);
cache.merge_blobs(random_blobs);
cache.merge_blobs(blobs);
cache.merge_block(block_commitments);
assert_empty_blob_cache(cache);
}
#[test]
fn block_valid_blobs_invalid_blobs() {
let (block_commitments, blobs, random_blobs) = pre_setup();
let (block_commitments, blobs, random_blobs) =
$setup_fn(block_commitments, blobs, random_blobs);
let block_root = Hash256::zero();
let mut cache = <$type_name>::empty(block_root);
cache.merge_block(block_commitments);
cache.merge_blobs(blobs);
cache.merge_blobs(random_blobs);
assert_cache_consistent(cache);
}
#[test]
fn valid_blobs_block_invalid_blobs() {
let (block_commitments, blobs, random_blobs) = pre_setup();
let (block_commitments, blobs, random_blobs) =
$setup_fn(block_commitments, blobs, random_blobs);
let block_root = Hash256::zero();
let mut cache = <$type_name>::empty(block_root);
cache.merge_blobs(blobs);
cache.merge_block(block_commitments);
cache.merge_blobs(random_blobs);
assert_cache_consistent(cache);
}
#[test]
fn valid_blobs_invalid_blobs_block() {
let (block_commitments, blobs, random_blobs) = pre_setup();
let (block_commitments, blobs, random_blobs) =
$setup_fn(block_commitments, blobs, random_blobs);
let block_root = Hash256::zero();
let mut cache = <$type_name>::empty(block_root);
cache.merge_blobs(blobs);
cache.merge_blobs(random_blobs);
cache.merge_block(block_commitments);
assert_cache_consistent(cache);
}
}
};
}
generate_tests!(
processing_components_tests,
ProcessingComponents::<E>,
kzg_commitments,
processing_blobs,
setup_processing_components
);
generate_tests!(
pending_components_tests,
PendingComponents<E>,
executed_block,
verified_blobs,
setup_pending_components
);
generate_tests!(
child_component_tests,
ChildComponents::<E>,
downloaded_block,
downloaded_blobs,
setup_child_components
);
}

View File

@@ -0,0 +1,54 @@
use crate::block_verification_types::RpcBlock;
use crate::data_availability_checker::AvailabilityView;
use bls::Hash256;
use std::sync::Arc;
use types::blob_sidecar::FixedBlobSidecarList;
use types::{EthSpec, SignedBeaconBlock};
/// For requests triggered by an `UnknownBlockParent` or `UnknownBlobParent`, this struct
/// is used to cache components as they are sent to the network service. We can't use the
/// data availability cache currently because any blocks or blobs without parents
/// won't pass validation and therefore won't make it into the cache.
pub struct ChildComponents<E: EthSpec> {
pub block_root: Hash256,
pub downloaded_block: Option<Arc<SignedBeaconBlock<E>>>,
pub downloaded_blobs: FixedBlobSidecarList<E>,
}
impl<E: EthSpec> From<RpcBlock<E>> for ChildComponents<E> {
fn from(value: RpcBlock<E>) -> Self {
let (block_root, block, blobs) = value.deconstruct();
let fixed_blobs = blobs.map(|blobs| {
FixedBlobSidecarList::from(blobs.into_iter().map(Some).collect::<Vec<_>>())
});
Self::new(block_root, Some(block), fixed_blobs)
}
}
impl<E: EthSpec> ChildComponents<E> {
pub fn empty(block_root: Hash256) -> Self {
Self {
block_root,
downloaded_block: None,
downloaded_blobs: <_>::default(),
}
}
pub fn new(
block_root: Hash256,
block: Option<Arc<SignedBeaconBlock<E>>>,
blobs: Option<FixedBlobSidecarList<E>>,
) -> Self {
let mut cache = Self::empty(block_root);
if let Some(block) = block {
cache.merge_block(block);
}
if let Some(blobs) = blobs {
cache.merge_blobs(blobs);
}
cache
}
pub fn clear_blobs(&mut self) {
self.downloaded_blobs = FixedBlobSidecarList::default();
}
}

View File

@@ -30,21 +30,20 @@
use crate::beacon_chain::BeaconStore;
use crate::blob_verification::KzgVerifiedBlob;
use crate::block_verification_types::{
AsBlock, AvailabilityPendingExecutedBlock, AvailableExecutedBlock,
AsBlock, AvailabilityPendingExecutedBlock, AvailableBlock, AvailableExecutedBlock,
};
use crate::data_availability_checker::{make_available, Availability, AvailabilityCheckError};
use crate::data_availability_checker::availability_view::AvailabilityView;
use crate::data_availability_checker::{Availability, AvailabilityCheckError};
use crate::store::{DBColumn, KeyValueStore};
use crate::BeaconChainTypes;
use lru::LruCache;
use parking_lot::{Mutex, RwLock, RwLockUpgradableReadGuard, RwLockWriteGuard};
use parking_lot::{Mutex, RwLock, RwLockUpgradableReadGuard};
use ssz::{Decode, Encode};
use ssz_derive::{Decode, Encode};
use ssz_types::FixedVector;
use ssz_types::{FixedVector, VariableList};
use std::{collections::HashSet, sync::Arc};
use types::blob_sidecar::BlobIdentifier;
use types::{BlobSidecar, Epoch, EthSpec, Hash256, SignedBeaconBlock};
type MissingBlobInfo<T> = (Option<Arc<SignedBeaconBlock<T>>>, HashSet<usize>);
use types::{BlobSidecar, Epoch, EthSpec, Hash256};
/// This represents the components of a partially available block
///
@@ -52,53 +51,59 @@ type MissingBlobInfo<T> = (Option<Arc<SignedBeaconBlock<T>>>, HashSet<usize>);
/// The block has completed all verifications except the availability check.
#[derive(Encode, Decode, Clone)]
pub struct PendingComponents<T: EthSpec> {
verified_blobs: FixedVector<Option<KzgVerifiedBlob<T>>, T::MaxBlobsPerBlock>,
executed_block: Option<AvailabilityPendingExecutedBlock<T>>,
pub block_root: Hash256,
pub verified_blobs: FixedVector<Option<KzgVerifiedBlob<T>>, T::MaxBlobsPerBlock>,
pub executed_block: Option<AvailabilityPendingExecutedBlock<T>>,
}
impl<T: EthSpec> PendingComponents<T> {
pub fn new_from_blobs(blobs: &[KzgVerifiedBlob<T>]) -> Self {
let mut verified_blobs = FixedVector::<_, _>::default();
for blob in blobs {
if let Some(mut_maybe_blob) = verified_blobs.get_mut(blob.blob_index() as usize) {
*mut_maybe_blob = Some(blob.clone());
}
}
pub fn empty(block_root: Hash256) -> Self {
Self {
block_root,
verified_blobs: FixedVector::default(),
executed_block: None,
}
}
/// Verifies an `SignedBeaconBlock` against a set of KZG verified blobs.
/// This does not check whether a block *should* have blobs, these checks should have been
/// completed when producing the `AvailabilityPendingBlock`.
pub fn make_available(self) -> Result<Availability<T>, AvailabilityCheckError> {
let Self {
block_root,
verified_blobs,
executed_block: None,
}
}
executed_block,
} = self;
pub fn new_from_block(block: AvailabilityPendingExecutedBlock<T>) -> Self {
Self {
verified_blobs: <_>::default(),
executed_block: Some(block),
}
}
let Some(executed_block) = executed_block else {
return Err(AvailabilityCheckError::Unexpected);
};
let num_blobs_expected = executed_block.num_blobs_expected();
let Some(verified_blobs) = verified_blobs
.into_iter()
.cloned()
.map(|b| b.map(|b| b.to_blob()))
.take(num_blobs_expected)
.collect::<Option<Vec<_>>>()
else {
return Err(AvailabilityCheckError::Unexpected);
};
let verified_blobs = VariableList::new(verified_blobs)?;
/// Returns `true` if the cache has all blobs corresponding to the
/// kzg commitments in the block.
pub fn has_all_blobs(&self, block: &AvailabilityPendingExecutedBlock<T>) -> bool {
for i in 0..block.num_blobs_expected() {
if self
.verified_blobs
.get(i)
.map(|maybe_blob| maybe_blob.is_none())
.unwrap_or(true)
{
return false;
}
}
true
}
let AvailabilityPendingExecutedBlock {
block,
import_data,
payload_verification_outcome,
} = executed_block;
pub fn empty() -> Self {
Self {
verified_blobs: <_>::default(),
executed_block: None,
}
let available_block = AvailableBlock {
block_root,
block,
blobs: Some(verified_blobs),
};
Ok(Availability::Available(Box::new(
AvailableExecutedBlock::new(available_block, import_data, payload_verification_outcome),
)))
}
pub fn epoch(&self) -> Option<Epoch> {
@@ -116,20 +121,6 @@ impl<T: EthSpec> PendingComponents<T> {
None
})
}
pub fn get_missing_blob_info(&self) -> MissingBlobInfo<T> {
let block_opt = self
.executed_block
.as_ref()
.map(|block| block.block.clone());
let blobs = self
.verified_blobs
.iter()
.enumerate()
.filter_map(|(i, maybe_blob)| maybe_blob.as_ref().map(|_| i))
.collect::<HashSet<_>>();
(block_opt, blobs)
}
}
/// Blocks and blobs are stored in the database sequentially so that it's
@@ -216,14 +207,14 @@ impl<T: BeaconChainTypes> OverflowStore<T> {
match OverflowKey::from_ssz_bytes(&key_bytes)? {
OverflowKey::Block(_) => {
maybe_pending_components
.get_or_insert_with(PendingComponents::empty)
.get_or_insert_with(|| PendingComponents::empty(block_root))
.executed_block = Some(AvailabilityPendingExecutedBlock::from_ssz_bytes(
value_bytes.as_slice(),
)?);
}
OverflowKey::Blob(_, index) => {
*maybe_pending_components
.get_or_insert_with(PendingComponents::empty)
.get_or_insert_with(|| PendingComponents::empty(block_root))
.verified_blobs
.get_mut(index as usize)
.ok_or(AvailabilityCheckError::BlobIndexInvalid(index as u64))? =
@@ -245,23 +236,6 @@ impl<T: BeaconChainTypes> OverflowStore<T> {
Ok(disk_keys)
}
/// Load a single block from the database (ignoring blobs)
pub fn load_block(
&self,
block_root: &Hash256,
) -> Result<Option<AvailabilityPendingExecutedBlock<T::EthSpec>>, AvailabilityCheckError> {
let key = OverflowKey::from_block_root(*block_root);
self.0
.hot_db
.get_bytes(DBColumn::OverflowLRUCache.as_str(), &key.as_ssz_bytes())?
.map(|block_bytes| {
AvailabilityPendingExecutedBlock::from_ssz_bytes(block_bytes.as_slice())
})
.transpose()
.map_err(|e| e.into())
}
/// Load a single blob from the database
pub fn load_blob(
&self,
@@ -404,43 +378,6 @@ impl<T: BeaconChainTypes> OverflowLRUCache<T> {
})
}
/// Returns whether or not a block is in the cache (in memory or on disk)
pub fn has_block(&self, block_root: &Hash256) -> bool {
let read_lock = self.critical.read();
if read_lock
.in_memory
.peek(block_root)
.map_or(false, |cache| cache.executed_block.is_some())
{
true
} else if read_lock.store_keys.contains(block_root) {
drop(read_lock);
// If there's some kind of error reading from the store, we should just return false
self.overflow_store
.load_block(block_root)
.map_or(false, |maybe_block| maybe_block.is_some())
} else {
false
}
}
/// Fetch the missing blob info for a block without affecting the LRU ordering
pub fn get_missing_blob_info(&self, block_root: Hash256) -> MissingBlobInfo<T::EthSpec> {
let read_lock = self.critical.read();
if let Some(cache) = read_lock.in_memory.peek(&block_root) {
cache.get_missing_blob_info()
} else if read_lock.store_keys.contains(&block_root) {
drop(read_lock);
// return default if there's an error reading from the store
match self.overflow_store.load_pending_components(block_root) {
Ok(Some(pending_components)) => pending_components.get_missing_blob_info(),
_ => Default::default(),
}
} else {
Default::default()
}
}
/// Fetch a blob from the cache without affecting the LRU ordering
pub fn peek_blob(
&self,
@@ -460,59 +397,44 @@ impl<T: BeaconChainTypes> OverflowLRUCache<T> {
pub fn put_kzg_verified_blobs(
&self,
block_root: Hash256,
kzg_verified_blobs: &[KzgVerifiedBlob<T::EthSpec>],
kzg_verified_blobs: Vec<KzgVerifiedBlob<T::EthSpec>>,
) -> Result<Availability<T::EthSpec>, AvailabilityCheckError> {
let mut fixed_blobs = FixedVector::default();
// Initial check to ensure all provided blobs have a consistent block root.
for blob in kzg_verified_blobs {
let blob_block_root = blob.block_root();
if blob_block_root != block_root {
return Err(AvailabilityCheckError::BlockBlobRootMismatch {
return Err(AvailabilityCheckError::InconsistentBlobBlockRoots {
block_root,
blob_block_root,
});
}
if let Some(blob_opt) = fixed_blobs.get_mut(blob.blob_index() as usize) {
*blob_opt = Some(blob);
}
}
let mut write_lock = self.critical.write();
let availability = if let Some(mut pending_components) =
write_lock.pop_pending_components(block_root, &self.overflow_store)?
{
for kzg_verified_blob in kzg_verified_blobs {
let blob_index = kzg_verified_blob.blob_index() as usize;
if let Some(maybe_verified_blob) =
pending_components.verified_blobs.get_mut(blob_index)
{
*maybe_verified_blob = Some(kzg_verified_blob.clone())
} else {
return Err(AvailabilityCheckError::BlobIndexInvalid(blob_index as u64));
}
}
// Grab existing entry or create a new entry.
let mut pending_components = write_lock
.pop_pending_components(block_root, &self.overflow_store)?
.unwrap_or_else(|| PendingComponents::empty(block_root));
if let Some(executed_block) = pending_components.executed_block.take() {
self.check_block_availability_maybe_cache(
write_lock,
pending_components,
executed_block,
)?
} else {
write_lock.put_pending_components(
block_root,
pending_components,
&self.overflow_store,
)?;
Availability::MissingComponents(block_root)
}
// Merge in the blobs.
pending_components.merge_blobs(fixed_blobs);
if pending_components.is_available() {
pending_components.make_available()
} else {
// not in memory or store -> put new in memory
let new_pending_components = PendingComponents::new_from_blobs(kzg_verified_blobs);
write_lock.put_pending_components(
block_root,
new_pending_components,
pending_components,
&self.overflow_store,
)?;
Availability::MissingComponents(block_root)
};
Ok(availability)
Ok(Availability::MissingComponents(block_root))
}
}
/// Check if we have all the blobs for a block. If we do, return the Availability variant that
@@ -524,90 +446,23 @@ impl<T: BeaconChainTypes> OverflowLRUCache<T> {
let mut write_lock = self.critical.write();
let block_root = executed_block.import_data.block_root;
let availability =
match write_lock.pop_pending_components(block_root, &self.overflow_store)? {
Some(pending_components) => self.check_block_availability_maybe_cache(
write_lock,
pending_components,
executed_block,
)?,
None => {
let all_blob_ids = executed_block.get_all_blob_ids();
if all_blob_ids.is_empty() {
// no blobs for this block, we can import it
let AvailabilityPendingExecutedBlock {
block,
import_data,
payload_verification_outcome,
} = executed_block;
let available_block = make_available(block, vec![])?;
return Ok(Availability::Available(Box::new(
AvailableExecutedBlock::new(
available_block,
import_data,
payload_verification_outcome,
),
)));
}
let new_pending_components = PendingComponents::new_from_block(executed_block);
write_lock.put_pending_components(
block_root,
new_pending_components,
&self.overflow_store,
)?;
Availability::MissingComponents(block_root)
}
};
// Grab existing entry or create a new entry.
let mut pending_components = write_lock
.pop_pending_components(block_root, &self.overflow_store)?
.unwrap_or_else(|| PendingComponents::empty(block_root));
Ok(availability)
}
// Merge in the block.
pending_components.merge_block(executed_block);
/// Checks if the provided `executed_block` contains all required blobs to be considered an
/// `AvailableBlock` based on blobs that are cached.
///
/// Returns an error if there was an error when matching the block commitments against blob commitments.
///
/// Returns `Ok(Availability::Available(_))` if all blobs for the block are present in cache.
/// Returns `Ok(Availability::MissingComponents(_))` if all corresponding blobs have not been received in the cache.
fn check_block_availability_maybe_cache(
&self,
mut write_lock: RwLockWriteGuard<Critical<T>>,
mut pending_components: PendingComponents<T::EthSpec>,
executed_block: AvailabilityPendingExecutedBlock<T::EthSpec>,
) -> Result<Availability<T::EthSpec>, AvailabilityCheckError> {
if pending_components.has_all_blobs(&executed_block) {
let num_blobs_expected = executed_block.num_blobs_expected();
let AvailabilityPendingExecutedBlock {
block,
import_data,
payload_verification_outcome,
} = executed_block;
let Some(verified_blobs) = Vec::from(pending_components.verified_blobs)
.into_iter()
.take(num_blobs_expected)
.collect::<Option<Vec<_>>>()
else {
return Ok(Availability::MissingComponents(import_data.block_root));
};
let available_block = make_available(block, verified_blobs)?;
Ok(Availability::Available(Box::new(
AvailableExecutedBlock::new(
available_block,
import_data,
payload_verification_outcome,
),
)))
// Check if we have all components and entire set is consistent.
if pending_components.is_available() {
pending_components.make_available()
} else {
let block_root = executed_block.import_data.block_root;
let _ = pending_components.executed_block.insert(executed_block);
write_lock.put_pending_components(
block_root,
pending_components,
&self.overflow_store,
)?;
Ok(Availability::MissingComponents(block_root))
}
}
@@ -1224,7 +1079,7 @@ mod test {
.expect("kzg should verify");
kzg_verified_blobs.push(kzg_verified_blob);
let availability = cache
.put_kzg_verified_blobs(root, kzg_verified_blobs.as_slice())
.put_kzg_verified_blobs(root, kzg_verified_blobs.clone())
.expect("should put blob");
if blob_index == blobs_expected - 1 {
assert!(matches!(availability, Availability::Available(_)));
@@ -1252,7 +1107,7 @@ mod test {
.expect("kzg should verify");
kzg_verified_blobs.push(kzg_verified_blob);
let availability = cache
.put_kzg_verified_blobs(root, kzg_verified_blobs.as_slice())
.put_kzg_verified_blobs(root, kzg_verified_blobs.clone())
.expect("should put blob");
assert_eq!(
availability,
@@ -1397,7 +1252,7 @@ mod test {
.expect("kzg should verify");
kzg_verified_blobs.push(kzg_verified_blob);
let availability = cache
.put_kzg_verified_blobs(roots[0], kzg_verified_blobs.as_slice())
.put_kzg_verified_blobs(roots[0], kzg_verified_blobs.clone())
.expect("should put blob");
if blob_index == expected_blobs - 1 {
assert!(matches!(availability, Availability::Available(_)));
@@ -1504,7 +1359,7 @@ mod test {
"should have pending blobs"
);
let availability = cache
.put_kzg_verified_blobs(block_root, kzg_verified_blobs.as_slice())
.put_kzg_verified_blobs(block_root, kzg_verified_blobs)
.expect("should put blob");
assert!(
matches!(availability, Availability::MissingComponents(_)),
@@ -1513,7 +1368,7 @@ mod test {
);
} else {
let availability = cache
.put_kzg_verified_blobs(block_root, kzg_verified_blobs.as_slice())
.put_kzg_verified_blobs(block_root, kzg_verified_blobs)
.expect("should put blob");
let root = pending_block.block.as_block().canonical_root();
assert_eq!(
@@ -1656,7 +1511,7 @@ mod test {
"should have pending blobs"
);
let availability = cache
.put_kzg_verified_blobs(block_root, kzg_verified_blobs.as_slice())
.put_kzg_verified_blobs(block_root, kzg_verified_blobs)
.expect("should put blob");
assert!(
matches!(availability, Availability::MissingComponents(_)),
@@ -1665,7 +1520,7 @@ mod test {
);
} else {
let availability = cache
.put_kzg_verified_blobs(block_root, kzg_verified_blobs.as_slice())
.put_kzg_verified_blobs(block_root, kzg_verified_blobs)
.expect("should put blob");
let root = pending_block.block.as_block().canonical_root();
assert_eq!(
@@ -1757,7 +1612,7 @@ mod test {
.expect("kzg should verify");
kzg_verified_blobs.push(kzg_verified_blob);
let availability = recovered_cache
.put_kzg_verified_blobs(root, kzg_verified_blobs.as_slice())
.put_kzg_verified_blobs(root, kzg_verified_blobs.clone())
.expect("should put blob");
if i == additional_blobs - 1 {
assert!(matches!(availability, Availability::Available(_)))

View File

@@ -0,0 +1,74 @@
use crate::data_availability_checker::AvailabilityView;
use std::collections::hash_map::Entry;
use std::collections::HashMap;
use types::beacon_block_body::{KzgCommitmentOpts, KzgCommitments};
use types::{EthSpec, Hash256, Slot};
/// This cache is used only for gossip blocks/blobs and single block/blob lookups, to give req/resp
/// a view of what we have and what we require. This cache serves a slightly different purpose than
/// gossip caches because it allows us to process duplicate blobs that are valid in gossip.
/// See `AvailabilityView`'s trait definition.
#[derive(Default)]
pub struct ProcessingCache<E: EthSpec> {
processing_cache: HashMap<Hash256, ProcessingComponents<E>>,
}
impl<E: EthSpec> ProcessingCache<E> {
pub fn get(&self, block_root: &Hash256) -> Option<&ProcessingComponents<E>> {
self.processing_cache.get(block_root)
}
pub fn entry(&mut self, block_root: Hash256) -> Entry<'_, Hash256, ProcessingComponents<E>> {
self.processing_cache.entry(block_root)
}
pub fn remove(&mut self, block_root: &Hash256) {
self.processing_cache.remove(block_root);
}
pub fn has_block(&self, block_root: &Hash256) -> bool {
self.processing_cache
.get(block_root)
.map_or(false, |b| b.block_exists())
}
pub fn incomplete_processing_components(&self, slot: Slot) -> Vec<Hash256> {
let mut roots_missing_components = vec![];
for (&block_root, info) in self.processing_cache.iter() {
if info.slot == slot && !info.is_available() {
roots_missing_components.push(block_root);
}
}
roots_missing_components
}
}
#[derive(Debug, Clone)]
pub struct ProcessingComponents<E: EthSpec> {
slot: Slot,
/// Blobs required for a block can only be known if we have seen the block. So `Some` here
/// means we've seen it, a `None` means we haven't. The `kzg_commitments` value helps us figure
/// out whether incoming blobs actually match the block.
pub block_commitments: Option<KzgCommitments<E>>,
/// `KzgCommitments` for blobs are always known, even if we haven't seen the block. See
/// `AvailabilityView`'s trait definition for more details.
pub blob_commitments: KzgCommitmentOpts<E>,
}
impl<E: EthSpec> ProcessingComponents<E> {
pub fn new(slot: Slot) -> Self {
Self {
slot,
block_commitments: None,
blob_commitments: KzgCommitmentOpts::<E>::default(),
}
}
}
// Not safe for use outside of tests as this always required a slot.
#[cfg(test)]
impl<E: EthSpec> ProcessingComponents<E> {
pub fn empty(_block_root: Hash256) -> Self {
Self {
slot: Slot::new(0),
block_commitments: None,
blob_commitments: KzgCommitmentOpts::<E>::default(),
}
}
}