Update is_available check to support PeerDAS. (#6076)

* Update `is_available` check to support PeerDAS.

* Merge branch 'unstable' into da-checker-das

# Conflicts:
#	beacon_node/beacon_chain/src/data_availability_checker.rs
#	beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs

* Simplify code using `map_or`

* Merge branch 'unstable' into da-checker-das

# Conflicts:
#	consensus/types/src/chain_spec.rs

* Remove `epoch` method from `PendingComponents`

* Add `put_kzg_verified_data_columns` method.
This commit is contained in:
Jimmy Chen
2024-07-22 23:23:21 +10:00
committed by GitHub
parent 06dff60992
commit bca732e19d
8 changed files with 195 additions and 23 deletions

View File

@@ -3288,9 +3288,10 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
}
}
}
let epoch = slot.epoch(T::EthSpec::slots_per_epoch());
let availability = self
.data_availability_checker
.put_rpc_blobs(block_root, blobs)?;
.put_rpc_blobs(block_root, epoch, blobs)?;
self.process_availability(slot, availability).await
}

View File

@@ -16,7 +16,9 @@ use ssz_types::VariableList;
use std::time::Duration;
use tree_hash::TreeHash;
use types::blob_sidecar::BlobIdentifier;
use types::{BeaconStateError, BlobSidecar, EthSpec, Hash256, SignedBeaconBlockHeader, Slot};
use types::{
BeaconStateError, BlobSidecar, Epoch, EthSpec, Hash256, SignedBeaconBlockHeader, Slot,
};
/// An error occurred while validating a gossip blob.
#[derive(Debug)]
@@ -223,6 +225,9 @@ impl<T: BeaconChainTypes> GossipVerifiedBlob<T> {
/// Returns the slot of the verified blob's sidecar.
pub fn slot(&self) -> Slot {
    self.blob.blob.slot()
}
/// Returns the epoch of the verified blob's sidecar, derived from its slot.
pub fn epoch(&self) -> Epoch {
    self.blob.blob.epoch()
}
/// Returns the index of the blob sidecar within its block.
pub fn index(&self) -> u64 {
    self.blob.blob.index
}

View File

@@ -94,8 +94,17 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
log: &Logger,
spec: ChainSpec,
) -> Result<Self, AvailabilityCheckError> {
let overflow_cache =
DataAvailabilityCheckerInner::new(OVERFLOW_LRU_CAPACITY, store, spec.clone())?;
// TODO(das): support supernode or custom custody requirement
let custody_subnet_count = spec.custody_requirement as usize;
let custody_column_count =
custody_subnet_count.saturating_mul(spec.data_columns_per_subnet());
let overflow_cache = DataAvailabilityCheckerInner::new(
OVERFLOW_LRU_CAPACITY,
store,
custody_column_count,
spec.clone(),
)?;
Ok(Self {
availability_cache: Arc::new(overflow_cache),
slot_clock,
@@ -143,6 +152,7 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
pub fn put_rpc_blobs(
&self,
block_root: Hash256,
epoch: Epoch,
blobs: FixedBlobSidecarList<T::EthSpec>,
) -> Result<Availability<T::EthSpec>, AvailabilityCheckError> {
let Some(kzg) = self.kzg.as_ref() else {
@@ -159,7 +169,7 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
.map_err(AvailabilityCheckError::Kzg)?;
self.availability_cache
.put_kzg_verified_blobs(block_root, verified_blobs)
.put_kzg_verified_blobs(block_root, epoch, verified_blobs)
}
/// Check if we've cached other blobs for this block. If it completes a set and we also
@@ -171,8 +181,11 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
&self,
gossip_blob: GossipVerifiedBlob<T>,
) -> Result<Availability<T::EthSpec>, AvailabilityCheckError> {
self.availability_cache
.put_kzg_verified_blobs(gossip_blob.block_root(), vec![gossip_blob.into_inner()])
self.availability_cache.put_kzg_verified_blobs(
gossip_blob.block_root(),
gossip_blob.epoch(),
vec![gossip_blob.into_inner()],
)
}
/// Check if we have all the blobs for a block. Returns `Availability` which has information

View File

@@ -10,6 +10,7 @@ pub enum Error {
blob_commitment: KzgCommitment,
block_commitment: KzgCommitment,
},
UnableToDetermineImportRequirement,
Unexpected,
SszTypes(ssz_types::Error),
MissingBlobs,
@@ -41,6 +42,7 @@ impl Error {
| Error::Unexpected
| Error::ParentStateMissing(_)
| Error::BlockReplayError(_)
| Error::UnableToDetermineImportRequirement
| Error::RebuildingStateCaches(_)
| Error::SlotClockError => ErrorCategory::Internal,
Error::Kzg(_)

View File

@@ -5,6 +5,7 @@ use crate::block_verification_types::{
AvailabilityPendingExecutedBlock, AvailableBlock, AvailableExecutedBlock,
};
use crate::data_availability_checker::{Availability, AvailabilityCheckError};
use crate::data_column_verification::KzgVerifiedCustodyDataColumn;
use crate::BeaconChainTypes;
use lru::LruCache;
use parking_lot::RwLock;
@@ -23,9 +24,15 @@ use types::{BlobSidecar, ChainSpec, Epoch, EthSpec, Hash256, SignedBeaconBlock};
/// Components of a block that are pending a data-availability decision.
///
/// Accumulates the executed block together with the blobs and (post-PeerDAS)
/// data columns received so far for `block_root`.
pub struct PendingComponents<E: EthSpec> {
    /// Root of the block these components belong to.
    pub block_root: Hash256,
    /// Blobs received so far; presumably positioned by blob index — TODO confirm at insertion site.
    pub verified_blobs: FixedVector<Option<KzgVerifiedBlob<E>>, E::MaxBlobsPerBlock>,
    /// KZG-verified custody data columns received so far; deduplicated by column index.
    pub verified_data_columns: Vec<KzgVerifiedCustodyDataColumn<E>>,
    /// The executed block, if it has been received yet.
    pub executed_block: Option<DietAvailabilityPendingExecutedBlock<E>>,
}
/// What must be present in the cache for a block to be considered available for import.
pub enum BlockImportRequirement {
    /// Pre-PeerDAS: every blob the block commits to must be present.
    AllBlobs,
    /// Post-PeerDAS: this many custody data columns must be present
    /// (unless the block has zero blobs).
    CustodyColumns(usize),
}
impl<E: EthSpec> PendingComponents<E> {
/// Returns an immutable reference to the cached block.
pub fn get_cached_block(&self) -> &Option<DietAvailabilityPendingExecutedBlock<E>> {
@@ -39,6 +46,16 @@ impl<E: EthSpec> PendingComponents<E> {
&self.verified_blobs
}
/// Returns an immutable reference to the cached data column with the given
/// index, or `None` if no such column has been received.
pub fn get_cached_data_column(
    &self,
    data_column_index: u64,
) -> Option<&KzgVerifiedCustodyDataColumn<E>> {
    for data_column in &self.verified_data_columns {
        if data_column.index() == data_column_index {
            return Some(data_column);
        }
    }
    None
}
/// Returns a mutable reference to the cached block.
pub fn get_cached_block_mut(&mut self) -> &mut Option<DietAvailabilityPendingExecutedBlock<E>> {
&mut self.executed_block
@@ -78,6 +95,20 @@ impl<E: EthSpec> PendingComponents<E> {
self.get_cached_blobs().iter().flatten().count()
}
/// Checks if a data column of a given index exists in the cache.
///
/// Returns:
/// - `true` if a data column for the given index exists.
/// - `false` otherwise.
fn data_column_exists(&self, data_column_index: u64) -> bool {
    self.verified_data_columns
        .iter()
        .any(|data_column| data_column.index() == data_column_index)
}
/// Returns the number of data columns that have been received and are stored in the cache.
///
/// `merge_data_columns` skips duplicate indices, so this counts distinct
/// column indices.
pub fn num_received_data_columns(&self) -> usize {
    self.verified_data_columns.len()
}
/// Inserts a block into the cache.
pub fn insert_block(&mut self, block: DietAvailabilityPendingExecutedBlock<E>) {
*self.get_cached_block_mut() = Some(block)
@@ -125,6 +156,18 @@ impl<E: EthSpec> PendingComponents<E> {
}
}
/// Merges a given set of data columns into the cache.
///
/// Columns whose index is already cached are skipped, so no duplicate
/// indices are ever stored.
fn merge_data_columns<I: IntoIterator<Item = KzgVerifiedCustodyDataColumn<E>>>(
    &mut self,
    kzg_verified_data_columns: I,
) {
    for data_column in kzg_verified_data_columns.into_iter() {
        let is_duplicate = self.data_column_exists(data_column.index());
        if !is_duplicate {
            self.verified_data_columns.push(data_column);
        }
    }
}
/// Inserts a new block and revalidates the existing blobs against it.
///
/// Blobs that don't match the new block's commitments are evicted.
@@ -134,15 +177,27 @@ impl<E: EthSpec> PendingComponents<E> {
self.merge_blobs(reinsert);
}
/// Checks if the block and all of its expected blobs are available in the cache.
/// Checks if the block and all of its expected blobs or custody columns (post-PeerDAS) are
/// available in the cache.
///
/// Returns `true` if both the block exists and the number of received blobs matches the number
/// of expected blobs.
pub fn is_available(&self) -> bool {
if let Some(num_expected_blobs) = self.num_expected_blobs() {
num_expected_blobs == self.num_received_blobs()
} else {
false
/// Returns `true` if both the block exists and the number of received blobs / custody columns
/// matches the number of expected blobs / custody columns.
pub fn is_available(&self, block_import_requirement: &BlockImportRequirement) -> bool {
match block_import_requirement {
BlockImportRequirement::AllBlobs => self
.num_expected_blobs()
.map_or(false, |num_expected_blobs| {
num_expected_blobs == self.num_received_blobs()
}),
BlockImportRequirement::CustodyColumns(num_expected_columns) => {
let num_received_data_columns = self.num_received_data_columns();
// No data columns when there are 0 blobs
self.num_expected_blobs()
.map_or(false, |num_expected_blobs| {
num_expected_blobs == 0
|| *num_expected_columns == num_received_data_columns
})
}
}
}
@@ -151,6 +206,7 @@ impl<E: EthSpec> PendingComponents<E> {
Self {
block_root,
verified_blobs: FixedVector::default(),
verified_data_columns: vec![],
executed_block: None,
}
}
@@ -170,6 +226,7 @@ impl<E: EthSpec> PendingComponents<E> {
let Self {
block_root,
verified_blobs,
verified_data_columns: _,
executed_block,
} = self;
@@ -222,17 +279,23 @@ pub struct DataAvailabilityCheckerInner<T: BeaconChainTypes> {
/// This cache holds a limited number of states in memory and reconstructs them
/// from disk when necessary. This is necessary until we merge tree-states
state_cache: StateLRUCache<T>,
/// The number of data columns the node is custodying.
custody_column_count: usize,
spec: ChainSpec,
}
impl<T: BeaconChainTypes> DataAvailabilityCheckerInner<T> {
pub fn new(
capacity: NonZeroUsize,
beacon_store: BeaconStore<T>,
custody_column_count: usize,
spec: ChainSpec,
) -> Result<Self, AvailabilityCheckError> {
Ok(Self {
critical: RwLock::new(LruCache::new(capacity)),
state_cache: StateLRUCache::new(beacon_store, spec),
state_cache: StateLRUCache::new(beacon_store, spec.clone()),
custody_column_count,
spec,
})
}
@@ -277,9 +340,24 @@ impl<T: BeaconChainTypes> DataAvailabilityCheckerInner<T> {
f(self.critical.read().peek(block_root))
}
/// Determines what must be present for a block at `epoch` to be importable:
/// all blobs before PeerDAS is active for that epoch, or the node's
/// configured custody column count afterwards.
fn block_import_requirement(
    &self,
    epoch: Epoch,
) -> Result<BlockImportRequirement, AvailabilityCheckError> {
    if !self.spec.is_peer_das_enabled_for_epoch(epoch) {
        return Ok(BlockImportRequirement::AllBlobs);
    }
    Ok(BlockImportRequirement::CustodyColumns(
        self.custody_column_count,
    ))
}
pub fn put_kzg_verified_blobs<I: IntoIterator<Item = KzgVerifiedBlob<T::EthSpec>>>(
&self,
block_root: Hash256,
epoch: Epoch,
kzg_verified_blobs: I,
) -> Result<Availability<T::EthSpec>, AvailabilityCheckError> {
let mut fixed_blobs = FixedVector::default();
@@ -301,7 +379,43 @@ impl<T: BeaconChainTypes> DataAvailabilityCheckerInner<T> {
// Merge in the blobs.
pending_components.merge_blobs(fixed_blobs);
if pending_components.is_available() {
let block_import_requirement = self.block_import_requirement(epoch)?;
if pending_components.is_available(&block_import_requirement) {
write_lock.put(block_root, pending_components.clone());
// No need to hold the write lock anymore
drop(write_lock);
pending_components.make_available(|diet_block| {
self.state_cache.recover_pending_executed_block(diet_block)
})
} else {
write_lock.put(block_root, pending_components);
Ok(Availability::MissingComponents(block_root))
}
}
// TODO(das): gossip and rpc code paths to be implemented.
#[allow(dead_code)]
pub fn put_kzg_verified_data_columns<
I: IntoIterator<Item = KzgVerifiedCustodyDataColumn<T::EthSpec>>,
>(
&self,
block_root: Hash256,
epoch: Epoch,
kzg_verified_data_columns: I,
) -> Result<Availability<T::EthSpec>, AvailabilityCheckError> {
let mut write_lock = self.critical.write();
// Grab existing entry or create a new entry.
let mut pending_components = write_lock
.pop_entry(&block_root)
.map(|(_, v)| v)
.unwrap_or_else(|| PendingComponents::empty(block_root));
// Merge in the data columns.
pending_components.merge_data_columns(kzg_verified_data_columns);
let block_import_requirement = self.block_import_requirement(epoch)?;
if pending_components.is_available(&block_import_requirement) {
write_lock.put(block_root, pending_components.clone());
// No need to hold the write lock anymore
drop(write_lock);
@@ -322,6 +436,7 @@ impl<T: BeaconChainTypes> DataAvailabilityCheckerInner<T> {
) -> Result<Availability<T::EthSpec>, AvailabilityCheckError> {
let mut write_lock = self.critical.write();
let block_root = executed_block.import_data.block_root;
let epoch = executed_block.block.epoch();
// register the block to get the diet block
let diet_executed_block = self
@@ -338,7 +453,8 @@ impl<T: BeaconChainTypes> DataAvailabilityCheckerInner<T> {
pending_components.merge_block(diet_executed_block);
// Check if we have all components and entire set is consistent.
if pending_components.is_available() {
let block_import_requirement = self.block_import_requirement(epoch)?;
if pending_components.is_available(&block_import_requirement) {
write_lock.put(block_root, pending_components.clone());
// No need to hold the write lock anymore
drop(write_lock);
@@ -401,6 +517,7 @@ mod test {
use types::{ExecPayload, MinimalEthSpec};
const LOW_VALIDATOR_COUNT: usize = 32;
const DEFAULT_TEST_CUSTODY_COLUMN_COUNT: usize = 8;
fn get_store_with_spec<E: EthSpec>(
db_path: &TempDir,
@@ -588,8 +705,13 @@ mod test {
let test_store = harness.chain.store.clone();
let capacity_non_zero = new_non_zero_usize(capacity);
let cache = Arc::new(
DataAvailabilityCheckerInner::<T>::new(capacity_non_zero, test_store, spec.clone())
.expect("should create cache"),
DataAvailabilityCheckerInner::<T>::new(
capacity_non_zero,
test_store,
DEFAULT_TEST_CUSTODY_COLUMN_COUNT,
spec.clone(),
)
.expect("should create cache"),
);
(harness, cache, chain_db_path)
}
@@ -603,6 +725,7 @@ mod test {
let (pending_block, blobs) = availability_pending_block(&harness).await;
let root = pending_block.import_data.block_root;
let epoch = pending_block.block.epoch();
let blobs_expected = pending_block.num_blobs_expected();
assert_eq!(
@@ -651,7 +774,7 @@ mod test {
for (blob_index, gossip_blob) in blobs.into_iter().enumerate() {
kzg_verified_blobs.push(gossip_blob.into_inner());
let availability = cache
.put_kzg_verified_blobs(root, kzg_verified_blobs.clone())
.put_kzg_verified_blobs(root, epoch, kzg_verified_blobs.clone())
.expect("should put blob");
if blob_index == blobs_expected - 1 {
assert!(matches!(availability, Availability::Available(_)));
@@ -673,11 +796,12 @@ mod test {
"should have expected number of blobs"
);
let root = pending_block.import_data.block_root;
let epoch = pending_block.block.epoch();
let mut kzg_verified_blobs = vec![];
for gossip_blob in blobs {
kzg_verified_blobs.push(gossip_blob.into_inner());
let availability = cache
.put_kzg_verified_blobs(root, kzg_verified_blobs.clone())
.put_kzg_verified_blobs(root, epoch, kzg_verified_blobs.clone())
.expect("should put blob");
assert_eq!(
availability,

View File

@@ -0,0 +1,19 @@
use derivative::Derivative;
use ssz_derive::{Decode, Encode};
use std::sync::Arc;
use types::data_column_sidecar::{ColumnIndex, DataColumnSidecar};
use types::EthSpec;
/// Data column that we must custody and has completed kzg verification
///
/// SSZ-encodes transparently as the inner `DataColumnSidecar`
/// (`struct_behaviour = "transparent"`).
#[derive(Debug, Derivative, Clone, Encode, Decode)]
#[derivative(PartialEq, Eq)]
#[ssz(struct_behaviour = "transparent")]
pub struct KzgVerifiedCustodyDataColumn<E: EthSpec> {
    // Private field: no constructor is visible in this chunk — presumably
    // construction elsewhere enforces kzg verification; confirm.
    data: Arc<DataColumnSidecar<E>>,
}
impl<E: EthSpec> KzgVerifiedCustodyDataColumn<E> {
    /// Returns the column index of the underlying sidecar.
    pub fn index(&self) -> ColumnIndex {
        self.data.index
    }
}

View File

@@ -19,6 +19,7 @@ pub mod canonical_head;
pub mod capella_readiness;
pub mod chain_config;
pub mod data_availability_checker;
mod data_column_verification;
pub mod deneb_readiness;
mod early_attester_cache;
pub mod electra_readiness;

View File

@@ -1,7 +1,7 @@
use crate::test_utils::TestRandom;
use crate::{
beacon_block_body::BLOB_KZG_COMMITMENTS_INDEX, BeaconBlockHeader, BeaconStateError, Blob,
EthSpec, FixedVector, Hash256, SignedBeaconBlockHeader, Slot, VariableList,
Epoch, EthSpec, FixedVector, Hash256, SignedBeaconBlockHeader, Slot, VariableList,
};
use crate::{KzgProofs, SignedBeaconBlock};
use bls::Signature;
@@ -160,6 +160,13 @@ impl<E: EthSpec> BlobSidecar<E> {
self.signed_block_header.message.slot
}
/// Returns the epoch corresponding to this sidecar's block slot.
pub fn epoch(&self) -> Epoch {
    let slot = self.signed_block_header.message.slot;
    slot.epoch(E::slots_per_epoch())
}
pub fn block_root(&self) -> Hash256 {
self.signed_block_header.message.tree_hash_root()
}