Import gossip data column into data availability checker (#6197)

* Import gossip data column into data availability checker
This commit is contained in:
Jimmy Chen
2024-08-02 22:42:11 +10:00
committed by GitHub
parent 0e96d4f105
commit acd3151184
8 changed files with 128 additions and 41 deletions

View File

@@ -2959,9 +2959,9 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
self: &Arc<Self>, self: &Arc<Self>,
data_columns: Vec<GossipVerifiedDataColumn<T>>, data_columns: Vec<GossipVerifiedDataColumn<T>>,
) -> Result<AvailabilityProcessingStatus, BlockError<T::EthSpec>> { ) -> Result<AvailabilityProcessingStatus, BlockError<T::EthSpec>> {
let Ok(block_root) = data_columns let Ok((slot, block_root)) = data_columns
.iter() .iter()
.map(|c| c.block_root()) .map(|c| (c.slot(), c.block_root()))
.unique() .unique()
.exactly_one() .exactly_one()
else { else {
@@ -2981,7 +2981,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
} }
let r = self let r = self
.check_gossip_data_columns_availability_and_import(data_columns) .check_gossip_data_columns_availability_and_import(slot, block_root, data_columns)
.await; .await;
self.remove_notified_custody_columns(&block_root, r) self.remove_notified_custody_columns(&block_root, r)
} }
@@ -3298,6 +3298,8 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
/// if so, otherwise caches the data column in the data availability checker. /// if so, otherwise caches the data column in the data availability checker.
async fn check_gossip_data_columns_availability_and_import( async fn check_gossip_data_columns_availability_and_import(
self: &Arc<Self>, self: &Arc<Self>,
slot: Slot,
block_root: Hash256,
data_columns: Vec<GossipVerifiedDataColumn<T>>, data_columns: Vec<GossipVerifiedDataColumn<T>>,
) -> Result<AvailabilityProcessingStatus, BlockError<T::EthSpec>> { ) -> Result<AvailabilityProcessingStatus, BlockError<T::EthSpec>> {
if let Some(slasher) = self.slasher.as_ref() { if let Some(slasher) = self.slasher.as_ref() {
@@ -3306,15 +3308,11 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
} }
} }
let Ok(slot) = data_columns.iter().map(|c| c.slot()).unique().exactly_one() else { let availability = self.data_availability_checker.put_gossip_data_columns(
return Err(BlockError::InternalError( slot,
"Columns for the same block should have matching slot".to_string(), block_root,
)); data_columns,
}; )?;
let availability = self
.data_availability_checker
.put_gossip_data_columns(data_columns)?;
self.process_availability(slot, availability).await self.process_availability(slot, availability).await
} }
@@ -3629,7 +3627,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
// If the write fails, revert fork choice to the version from disk, else we can // If the write fails, revert fork choice to the version from disk, else we can
// end up with blocks in fork choice that are missing from disk. // end up with blocks in fork choice that are missing from disk.
// See https://github.com/sigp/lighthouse/issues/2028 // See https://github.com/sigp/lighthouse/issues/2028
let (_, signed_block, blobs) = signed_block.deconstruct(); let (_, signed_block, blobs, data_columns) = signed_block.deconstruct();
let block = signed_block.message(); let block = signed_block.message();
ops.extend( ops.extend(
confirmed_state_roots confirmed_state_roots
@@ -3650,6 +3648,18 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
} }
} }
if let Some(_data_columns) = data_columns {
// TODO(das): depends on https://github.com/sigp/lighthouse/pull/6073
// if !data_columns.is_empty() {
// debug!(
// self.log, "Writing data_columns to store";
// "block_root" => %block_root,
// "count" => data_columns.len(),
// );
// ops.push(StoreOp::PutDataColumns(block_root, data_columns));
// }
}
let txn_lock = self.store.hot_db.begin_rw_transaction(); let txn_lock = self.store.hot_db.begin_rw_transaction();
if let Err(e) = self.store.do_atomically_with_block_and_blobs_cache(ops) { if let Err(e) = self.store.do_atomically_with_block_and_blobs_cache(ops) {

View File

@@ -517,7 +517,8 @@ impl<E: EthSpec> AsBlock<E> for AvailableBlock<E> {
} }
fn into_rpc_block(self) -> RpcBlock<E> { fn into_rpc_block(self) -> RpcBlock<E> {
let (block_root, block, blobs_opt) = self.deconstruct(); // TODO(das): rpc data columns to be merged from `das` branch
let (block_root, block, blobs_opt, _data_columns_opt) = self.deconstruct();
// Circumvent the constructor here, because an Available block will have already had // Circumvent the constructor here, because an Available block will have already had
// consistency checks performed. // consistency checks performed.
let inner = match blobs_opt { let inner = match blobs_opt {

View File

@@ -14,13 +14,16 @@ use std::sync::Arc;
use std::time::Duration; use std::time::Duration;
use task_executor::TaskExecutor; use task_executor::TaskExecutor;
use types::blob_sidecar::{BlobIdentifier, BlobSidecar, FixedBlobSidecarList}; use types::blob_sidecar::{BlobIdentifier, BlobSidecar, FixedBlobSidecarList};
use types::{BlobSidecarList, ChainSpec, Epoch, EthSpec, Hash256, SignedBeaconBlock}; use types::{
BlobSidecarList, ChainSpec, DataColumnSidecarList, Epoch, EthSpec, Hash256, SignedBeaconBlock,
Slot,
};
mod error; mod error;
mod overflow_lru_cache; mod overflow_lru_cache;
mod state_lru_cache; mod state_lru_cache;
use crate::data_column_verification::GossipVerifiedDataColumn; use crate::data_column_verification::{GossipVerifiedDataColumn, KzgVerifiedCustodyDataColumn};
pub use error::{Error as AvailabilityCheckError, ErrorCategory as AvailabilityCheckErrorCategory}; pub use error::{Error as AvailabilityCheckError, ErrorCategory as AvailabilityCheckErrorCategory};
use types::non_zero_usize::new_non_zero_usize; use types::non_zero_usize::new_non_zero_usize;
@@ -191,10 +194,18 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
pub fn put_gossip_data_columns( pub fn put_gossip_data_columns(
&self, &self,
_gossip_data_columns: Vec<GossipVerifiedDataColumn<T>>, slot: Slot,
block_root: Hash256,
gossip_data_columns: Vec<GossipVerifiedDataColumn<T>>,
) -> Result<Availability<T::EthSpec>, AvailabilityCheckError> { ) -> Result<Availability<T::EthSpec>, AvailabilityCheckError> {
// TODO(das) to be implemented let epoch = slot.epoch(T::EthSpec::slots_per_epoch());
Err(AvailabilityCheckError::Unexpected) let custody_columns = gossip_data_columns
.into_iter()
.map(|c| KzgVerifiedCustodyDataColumn::from_asserted_custody(c.into_inner()))
.collect::<Vec<_>>();
self.availability_cache
.put_kzg_verified_data_columns(block_root, epoch, custody_columns)
} }
/// Check if we have all the blobs for a block. Returns `Availability` which has information /// Check if we have all the blobs for a block. Returns `Availability` which has information
@@ -231,6 +242,7 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
block_root, block_root,
block, block,
blobs: None, blobs: None,
data_columns: None,
blobs_available_timestamp: None, blobs_available_timestamp: None,
})) }))
} }
@@ -251,6 +263,7 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
block_root, block_root,
block, block,
blobs: verified_blobs, blobs: verified_blobs,
data_columns: None,
blobs_available_timestamp: None, blobs_available_timestamp: None,
})) }))
} }
@@ -297,6 +310,7 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
block_root, block_root,
block, block,
blobs: None, blobs: None,
data_columns: None,
blobs_available_timestamp: None, blobs_available_timestamp: None,
})) }))
} }
@@ -312,6 +326,7 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
block_root, block_root,
block, block,
blobs: verified_blobs, blobs: verified_blobs,
data_columns: None,
blobs_available_timestamp: None, blobs_available_timestamp: None,
})) }))
} }
@@ -477,6 +492,7 @@ pub struct AvailableBlock<E: EthSpec> {
block_root: Hash256, block_root: Hash256,
block: Arc<SignedBeaconBlock<E>>, block: Arc<SignedBeaconBlock<E>>,
blobs: Option<BlobSidecarList<E>>, blobs: Option<BlobSidecarList<E>>,
data_columns: Option<DataColumnSidecarList<E>>,
/// Timestamp at which this block first became available (UNIX timestamp, time since 1970). /// Timestamp at which this block first became available (UNIX timestamp, time since 1970).
blobs_available_timestamp: Option<Duration>, blobs_available_timestamp: Option<Duration>,
} }
@@ -486,11 +502,13 @@ impl<E: EthSpec> AvailableBlock<E> {
block_root: Hash256, block_root: Hash256,
block: Arc<SignedBeaconBlock<E>>, block: Arc<SignedBeaconBlock<E>>,
blobs: Option<BlobSidecarList<E>>, blobs: Option<BlobSidecarList<E>>,
data_columns: Option<DataColumnSidecarList<E>>,
) -> Self { ) -> Self {
Self { Self {
block_root, block_root,
block, block,
blobs, blobs,
data_columns,
blobs_available_timestamp: None, blobs_available_timestamp: None,
} }
} }
@@ -510,20 +528,23 @@ impl<E: EthSpec> AvailableBlock<E> {
self.blobs_available_timestamp self.blobs_available_timestamp
} }
#[allow(clippy::type_complexity)]
pub fn deconstruct( pub fn deconstruct(
self, self,
) -> ( ) -> (
Hash256, Hash256,
Arc<SignedBeaconBlock<E>>, Arc<SignedBeaconBlock<E>>,
Option<BlobSidecarList<E>>, Option<BlobSidecarList<E>>,
Option<DataColumnSidecarList<E>>,
) { ) {
let AvailableBlock { let AvailableBlock {
block_root, block_root,
block, block,
blobs, blobs,
data_columns,
blobs_available_timestamp: _, blobs_available_timestamp: _,
} = self; } = self;
(block_root, block, blobs) (block_root, block, blobs, data_columns)
} }
} }

View File

@@ -217,7 +217,11 @@ impl<E: EthSpec> PendingComponents<E> {
/// ///
/// WARNING: This function can potentially take a lot of time if the state needs to be /// WARNING: This function can potentially take a lot of time if the state needs to be
/// reconstructed from disk. Ensure you are not holding any write locks while calling this. /// reconstructed from disk. Ensure you are not holding any write locks while calling this.
pub fn make_available<R>(self, recover: R) -> Result<Availability<E>, AvailabilityCheckError> pub fn make_available<R>(
self,
block_import_requirement: BlockImportRequirement,
recover: R,
) -> Result<Availability<E>, AvailabilityCheckError>
where where
R: FnOnce( R: FnOnce(
DietAvailabilityPendingExecutedBlock<E>, DietAvailabilityPendingExecutedBlock<E>,
@@ -226,7 +230,7 @@ impl<E: EthSpec> PendingComponents<E> {
let Self { let Self {
block_root, block_root,
verified_blobs, verified_blobs,
verified_data_columns: _, verified_data_columns,
executed_block, executed_block,
} = self; } = self;
@@ -239,6 +243,9 @@ impl<E: EthSpec> PendingComponents<E> {
let Some(diet_executed_block) = executed_block else { let Some(diet_executed_block) = executed_block else {
return Err(AvailabilityCheckError::Unexpected); return Err(AvailabilityCheckError::Unexpected);
}; };
let (blobs, data_columns) = match block_import_requirement {
BlockImportRequirement::AllBlobs => {
let num_blobs_expected = diet_executed_block.num_blobs_expected(); let num_blobs_expected = diet_executed_block.num_blobs_expected();
let Some(verified_blobs) = verified_blobs let Some(verified_blobs) = verified_blobs
.into_iter() .into_iter()
@@ -249,7 +256,16 @@ impl<E: EthSpec> PendingComponents<E> {
else { else {
return Err(AvailabilityCheckError::Unexpected); return Err(AvailabilityCheckError::Unexpected);
}; };
let verified_blobs = VariableList::new(verified_blobs)?; (Some(VariableList::new(verified_blobs)?), None)
}
BlockImportRequirement::CustodyColumns(_) => {
let verified_data_columns = verified_data_columns
.into_iter()
.map(|d| d.into_inner())
.collect();
(None, Some(verified_data_columns))
}
};
let executed_block = recover(diet_executed_block)?; let executed_block = recover(diet_executed_block)?;
@@ -262,7 +278,8 @@ impl<E: EthSpec> PendingComponents<E> {
let available_block = AvailableBlock { let available_block = AvailableBlock {
block_root, block_root,
block, block,
blobs: Some(verified_blobs), blobs,
data_columns,
blobs_available_timestamp, blobs_available_timestamp,
}; };
Ok(Availability::Available(Box::new( Ok(Availability::Available(Box::new(
@@ -404,7 +421,7 @@ impl<T: BeaconChainTypes> DataAvailabilityCheckerInner<T> {
write_lock.put(block_root, pending_components.clone()); write_lock.put(block_root, pending_components.clone());
// No need to hold the write lock anymore // No need to hold the write lock anymore
drop(write_lock); drop(write_lock);
pending_components.make_available(|diet_block| { pending_components.make_available(block_import_requirement, |diet_block| {
self.state_cache.recover_pending_executed_block(diet_block) self.state_cache.recover_pending_executed_block(diet_block)
}) })
} else { } else {
@@ -413,7 +430,7 @@ impl<T: BeaconChainTypes> DataAvailabilityCheckerInner<T> {
} }
} }
// TODO(das): gossip and rpc code paths to be implemented. // TODO(das): rpc code paths to be implemented.
#[allow(dead_code)] #[allow(dead_code)]
pub fn put_kzg_verified_data_columns< pub fn put_kzg_verified_data_columns<
I: IntoIterator<Item = KzgVerifiedCustodyDataColumn<T::EthSpec>>, I: IntoIterator<Item = KzgVerifiedCustodyDataColumn<T::EthSpec>>,
@@ -439,7 +456,7 @@ impl<T: BeaconChainTypes> DataAvailabilityCheckerInner<T> {
write_lock.put(block_root, pending_components.clone()); write_lock.put(block_root, pending_components.clone());
// No need to hold the write lock anymore // No need to hold the write lock anymore
drop(write_lock); drop(write_lock);
pending_components.make_available(|diet_block| { pending_components.make_available(block_import_requirement, |diet_block| {
self.state_cache.recover_pending_executed_block(diet_block) self.state_cache.recover_pending_executed_block(diet_block)
}) })
} else { } else {
@@ -478,7 +495,7 @@ impl<T: BeaconChainTypes> DataAvailabilityCheckerInner<T> {
write_lock.put(block_root, pending_components.clone()); write_lock.put(block_root, pending_components.clone());
// No need to hold the write lock anymore // No need to hold the write lock anymore
drop(write_lock); drop(write_lock);
pending_components.make_available(|diet_block| { pending_components.make_available(block_import_requirement, |diet_block| {
self.state_cache.recover_pending_executed_block(diet_block) self.state_cache.recover_pending_executed_block(diet_block)
}) })
} else { } else {

View File

@@ -190,6 +190,10 @@ impl<T: BeaconChainTypes> GossipVerifiedDataColumn<T> {
pub fn signed_block_header(&self) -> SignedBeaconBlockHeader { pub fn signed_block_header(&self) -> SignedBeaconBlockHeader {
self.data_column.data.signed_block_header.clone() self.data_column.data.signed_block_header.clone()
} }
pub fn into_inner(self) -> KzgVerifiedDataColumn<T::EthSpec> {
self.data_column
}
} }
/// Wrapper over a `DataColumnSidecar` for which we have completed kzg verification. /// Wrapper over a `DataColumnSidecar` for which we have completed kzg verification.
@@ -204,6 +208,9 @@ impl<E: EthSpec> KzgVerifiedDataColumn<E> {
pub fn new(data_column: Arc<DataColumnSidecar<E>>, kzg: &Kzg) -> Result<Self, KzgError> { pub fn new(data_column: Arc<DataColumnSidecar<E>>, kzg: &Kzg) -> Result<Self, KzgError> {
verify_kzg_for_data_column(data_column, kzg) verify_kzg_for_data_column(data_column, kzg)
} }
pub fn to_data_column(self) -> Arc<DataColumnSidecar<E>> {
self.data
}
pub fn as_data_column(&self) -> &DataColumnSidecar<E> { pub fn as_data_column(&self) -> &DataColumnSidecar<E> {
&self.data &self.data
} }
@@ -226,9 +233,21 @@ pub struct KzgVerifiedCustodyDataColumn<E: EthSpec> {
} }
impl<E: EthSpec> KzgVerifiedCustodyDataColumn<E> { impl<E: EthSpec> KzgVerifiedCustodyDataColumn<E> {
/// Mark a column as custody column. Caller must ensure that our current custody requirements
/// include this column
pub fn from_asserted_custody(kzg_verified: KzgVerifiedDataColumn<E>) -> Self {
Self {
data: kzg_verified.to_data_column(),
}
}
pub fn index(&self) -> ColumnIndex { pub fn index(&self) -> ColumnIndex {
self.data.index self.data.index
} }
pub fn into_inner(self) -> Arc<DataColumnSidecar<E>> {
self.data
}
} }
/// Complete kzg verification for a `DataColumnSidecar`. /// Complete kzg verification for a `DataColumnSidecar`.

View File

@@ -22,6 +22,7 @@ pub struct CacheItem<E: EthSpec> {
*/ */
block: Arc<SignedBeaconBlock<E>>, block: Arc<SignedBeaconBlock<E>>,
blobs: Option<BlobSidecarList<E>>, blobs: Option<BlobSidecarList<E>>,
data_columns: Option<DataColumnSidecarList<E>>,
proto_block: ProtoBlock, proto_block: ProtoBlock,
} }
@@ -69,7 +70,7 @@ impl<E: EthSpec> EarlyAttesterCache<E> {
}, },
}; };
let (_, block, blobs) = block.deconstruct(); let (_, block, blobs, data_columns) = block.deconstruct();
let item = CacheItem { let item = CacheItem {
epoch, epoch,
committee_lengths, committee_lengths,
@@ -78,6 +79,7 @@ impl<E: EthSpec> EarlyAttesterCache<E> {
target, target,
block, block,
blobs, blobs,
data_columns,
proto_block, proto_block,
}; };
@@ -164,6 +166,15 @@ impl<E: EthSpec> EarlyAttesterCache<E> {
.and_then(|item| item.blobs.clone()) .and_then(|item| item.blobs.clone())
} }
/// Returns the data columns, if `block_root` matches the cached item.
pub fn get_data_columns(&self, block_root: Hash256) -> Option<DataColumnSidecarList<E>> {
self.item
.read()
.as_ref()
.filter(|item| item.beacon_block_root == block_root)
.and_then(|item| item.data_columns.clone())
}
/// Returns the proto-array block, if `block_root` matches the cached item. /// Returns the proto-array block, if `block_root` matches the cached item.
pub fn get_proto_block(&self, block_root: Hash256) -> Option<ProtoBlock> { pub fn get_proto_block(&self, block_root: Hash256) -> Option<ProtoBlock> {
self.item self.item

View File

@@ -107,7 +107,8 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
let mut signed_blocks = Vec::with_capacity(blocks_to_import.len()); let mut signed_blocks = Vec::with_capacity(blocks_to_import.len());
for available_block in blocks_to_import.into_iter().rev() { for available_block in blocks_to_import.into_iter().rev() {
let (block_root, block, maybe_blobs) = available_block.deconstruct(); let (block_root, block, maybe_blobs, maybe_data_columns) =
available_block.deconstruct();
if block_root != expected_block_root { if block_root != expected_block_root {
return Err(HistoricalBlockError::MismatchedBlockRoot { return Err(HistoricalBlockError::MismatchedBlockRoot {
@@ -127,6 +128,13 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
self.store self.store
.blobs_as_kv_store_ops(&block_root, blobs, &mut blob_batch); .blobs_as_kv_store_ops(&block_root, blobs, &mut blob_batch);
} }
// Store the data columns too
if let Some(_data_columns) = maybe_data_columns {
// TODO(das): depends on https://github.com/sigp/lighthouse/pull/6073
// new_oldest_data_column_slot = Some(block.slot());
// self.store
// .data_columns_as_kv_store_ops(&block_root, data_columns, &mut blob_batch);
}
// Store block roots, including at all skip slots in the freezer DB. // Store block roots, including at all skip slots in the freezer DB.
for slot in (block.slot().as_usize()..prev_block_slot.as_usize()).rev() { for slot in (block.slot().as_usize()..prev_block_slot.as_usize()).rev() {

View File

@@ -2542,10 +2542,10 @@ async fn weak_subjectivity_sync_test(slots: Vec<Slot>, checkpoint_slot: Slot) {
// signatures correctly. Regression test for https://github.com/sigp/lighthouse/pull/5120. // signatures correctly. Regression test for https://github.com/sigp/lighthouse/pull/5120.
let mut batch_with_invalid_first_block = available_blocks.clone(); let mut batch_with_invalid_first_block = available_blocks.clone();
batch_with_invalid_first_block[0] = { batch_with_invalid_first_block[0] = {
let (block_root, block, blobs) = available_blocks[0].clone().deconstruct(); let (block_root, block, blobs, data_columns) = available_blocks[0].clone().deconstruct();
let mut corrupt_block = (*block).clone(); let mut corrupt_block = (*block).clone();
*corrupt_block.signature_mut() = Signature::empty(); *corrupt_block.signature_mut() = Signature::empty();
AvailableBlock::__new_for_testing(block_root, Arc::new(corrupt_block), blobs) AvailableBlock::__new_for_testing(block_root, Arc::new(corrupt_block), blobs, data_columns)
}; };
// Importing the invalid batch should error. // Importing the invalid batch should error.