Add Gloas data column support (#8682)

Co-Authored-By: Eitan Seri-Levi <eserilev@ucsc.edu>

Co-Authored-By: Eitan Seri- Levi <eserilev@gmail.com>
This commit is contained in:
Eitan Seri-Levi
2026-01-27 20:52:12 -08:00
committed by GitHub
parent 0f57fc9d8e
commit 9bec8df37a
44 changed files with 1507 additions and 680 deletions

View File

@@ -412,9 +412,10 @@ pub struct BeaconChain<T: BeaconChainTypes> {
/// Maintains a record of which validators have proposed blocks for each slot.
pub observed_block_producers: RwLock<ObservedBlockProducers<T::EthSpec>>,
/// Maintains a record of blob sidecars seen over the gossip network.
pub observed_blob_sidecars: RwLock<ObservedDataSidecars<BlobSidecar<T::EthSpec>>>,
pub observed_blob_sidecars: RwLock<ObservedDataSidecars<BlobSidecar<T::EthSpec>, T::EthSpec>>,
/// Maintains a record of column sidecars seen over the gossip network.
pub observed_column_sidecars: RwLock<ObservedDataSidecars<DataColumnSidecar<T::EthSpec>>>,
pub observed_column_sidecars:
RwLock<ObservedDataSidecars<DataColumnSidecar<T::EthSpec>, T::EthSpec>>,
/// Maintains a record of slashable message seen over the gossip network or RPC.
pub observed_slashable: RwLock<ObservedSlashable<T::EthSpec>>,
/// Maintains a record of which validators have submitted voluntary exits.
@@ -1130,13 +1131,18 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
.or_else(|| self.early_attester_cache.get_data_columns(block_root));
if let Some(mut all_cached_columns) = all_cached_columns_opt {
all_cached_columns.retain(|col| indices.contains(&col.index));
all_cached_columns.retain(|col| indices.contains(col.index()));
Ok(all_cached_columns)
} else {
} else if let Some(block) = self.get_blinded_block(&block_root)? {
indices
.iter()
.filter_map(|index| self.get_data_column(&block_root, index).transpose())
.filter_map(|index| {
self.get_data_column(&block_root, index, block.fork_name_unchecked())
.transpose()
})
.collect::<Result<_, _>>()
} else {
Ok(vec![])
}
}
@@ -1221,8 +1227,11 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
pub fn get_data_columns(
&self,
block_root: &Hash256,
fork_name: ForkName,
) -> Result<Option<DataColumnSidecarList<T::EthSpec>>, Error> {
self.store.get_data_columns(block_root).map_err(Error::from)
self.store
.get_data_columns(block_root, fork_name)
.map_err(Error::from)
}
/// Returns the blobs at the given root, if any.
@@ -1243,7 +1252,8 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
};
if self.spec.is_peer_das_enabled_for_epoch(block.epoch()) {
if let Some(columns) = self.store.get_data_columns(block_root)? {
let fork_name = self.spec.fork_name_at_epoch(block.epoch());
if let Some(columns) = self.store.get_data_columns(block_root, fork_name)? {
let num_required_columns = T::EthSpec::number_of_columns() / 2;
let reconstruction_possible = columns.len() >= num_required_columns;
if reconstruction_possible {
@@ -1259,7 +1269,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
Ok(None)
}
} else {
self.get_blobs(block_root).map(|b| b.blobs())
Ok(self.get_blobs(block_root)?.blobs())
}
}
@@ -1271,8 +1281,11 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
&self,
block_root: &Hash256,
column_index: &ColumnIndex,
fork_name: ForkName,
) -> Result<Option<Arc<DataColumnSidecar<T::EthSpec>>>, Error> {
Ok(self.store.get_data_column(block_root, column_index)?)
Ok(self
.store
.get_data_column(block_root, column_index, fork_name)?)
}
pub fn get_blinded_block(
@@ -3182,7 +3195,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
.cached_data_column_indexes(block_root)
.unwrap_or_default();
let new_data_columns =
data_columns_iter.filter(|b| !imported_data_columns.contains(&b.index));
data_columns_iter.filter(|b| !imported_data_columns.contains(b.index()));
for data_column in new_data_columns {
event_handler.register(EventKind::DataColumnSidecar(
@@ -3194,6 +3207,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
/// Cache the columns in the processing cache, process it, then evict it from the cache if it was
/// imported or errors.
// TODO(gloas) we need a separate code path for gloas. See TODO's below.
pub async fn process_rpc_custody_columns(
self: &Arc<Self>,
custody_columns: DataColumnSidecarList<T::EthSpec>,
@@ -3211,6 +3225,8 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
// If this block has already been imported to forkchoice it must have been available, so
// we don't need to process its columns again.
// TODO(gloas) the block will be available in fork choice for gloas. This does not indicate availability
// anymore.
if self
.canonical_head
.fork_choice_read_lock()
@@ -3222,7 +3238,14 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
// Reject RPC columns referencing unknown parents. Otherwise we allow potentially invalid data
// into the da_checker, where invalid = descendant of invalid blocks.
// Note: custody_columns should have at least one item and all items have the same parent root.
if let Some(parent_root) = custody_columns.iter().map(|c| c.block_parent_root()).next()
// TODO(gloas) ensure this check is no longer relevant post gloas
if let Some(parent_root) = custody_columns
.iter()
.filter_map(|c| match c.as_ref() {
DataColumnSidecar::Fulu(column) => Some(column.block_parent_root()),
_ => None,
})
.next()
&& !self
.canonical_head
.fork_choice_read_lock()
@@ -3542,8 +3565,12 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
publish_fn: impl FnOnce() -> Result<(), BlockError>,
) -> Result<AvailabilityProcessingStatus, BlockError> {
if let Some(slasher) = self.slasher.as_ref() {
for data_colum in &data_columns {
slasher.accept_block_header(data_colum.signed_block_header());
for data_column in &data_columns {
// TODO(gloas) different gossip checks in gloas
// https://github.com/ethereum/consensus-specs/blob/81458afc6aad6985c533785c8d2860d87a993241/specs/gloas/p2p-interface.md?plain=1#L385
if let DataColumnSidecar::Fulu(c) = data_column.as_data_column() {
slasher.accept_block_header(c.signed_block_header.clone());
}
}
}
@@ -3621,9 +3648,15 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
.put_kzg_verified_blobs(block_root, blobs)?
}
EngineGetBlobsOutput::CustodyColumns(data_columns) => {
// TODO(gloas) verify that this check is no longer relevant for gloas
self.check_data_column_sidecar_header_signature_and_slashability(
block_root,
data_columns.iter().map(|c| c.as_data_column()),
data_columns
.iter()
.filter_map(|c| match c.as_data_column() {
DataColumnSidecar::Fulu(column) => Some(column),
_ => None,
}),
)?;
self.data_availability_checker
.put_kzg_verified_custody_data_columns(block_root, data_columns)?
@@ -3642,9 +3675,13 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
block_root: Hash256,
custody_columns: DataColumnSidecarList<T::EthSpec>,
) -> Result<AvailabilityProcessingStatus, BlockError> {
// TODO(gloas) ensure that this check is no longer relevant post gloas
self.check_data_column_sidecar_header_signature_and_slashability(
block_root,
custody_columns.iter().map(|c| c.as_ref()),
custody_columns.iter().filter_map(|c| match c.as_ref() {
DataColumnSidecar::Fulu(fulu) => Some(fulu),
_ => None,
}),
)?;
// This slot value is purely informative for the consumers of
@@ -3662,7 +3699,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
fn check_data_column_sidecar_header_signature_and_slashability<'a>(
self: &Arc<Self>,
block_root: Hash256,
custody_columns: impl IntoIterator<Item = &'a DataColumnSidecar<T::EthSpec>>,
custody_columns: impl IntoIterator<Item = &'a DataColumnSidecarFulu<T::EthSpec>>,
) -> Result<(), BlockError> {
let mut slashable_cache = self.observed_slashable.write();
// Process all unique block headers - previous logic assumed all headers were identical and
@@ -7365,7 +7402,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
// Supernodes need to persist all sampled custody columns
if columns_to_custody.len() != self.spec.number_of_custody_groups as usize {
data_columns
.retain(|data_column| columns_to_custody.contains(&data_column.index));
.retain(|data_column| columns_to_custody.contains(data_column.index()));
}
debug!(
%block_root,
@@ -7378,7 +7415,11 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
}
/// Retrieves block roots (in ascending slot order) within some slot range from fork choice.
pub fn block_roots_from_fork_choice(&self, start_slot: u64, count: u64) -> Vec<Hash256> {
pub fn block_roots_from_fork_choice(
&self,
start_slot: u64,
count: u64,
) -> Vec<(Hash256, Slot)> {
let head_block_root = self.canonical_head.cached_head().head_block_root();
let fork_choice_read_lock = self.canonical_head.fork_choice_read_lock();
let block_roots_iter = fork_choice_read_lock
@@ -7389,7 +7430,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
for (root, slot) in block_roots_iter {
if slot < end_slot && slot >= start_slot {
roots.push(root);
roots.push((root, slot));
}
if slot < start_slot {
break;

View File

@@ -8,7 +8,9 @@ use crate::block_verification::{
BlockSlashInfo, get_validator_pubkey_cache, process_block_slash_info,
};
use crate::kzg_utils::{validate_blob, validate_blobs};
use crate::observed_data_sidecars::{ObservationStrategy, Observe};
use crate::observed_data_sidecars::{
Error as ObservedDataSidecarsError, ObservationStrategy, Observe,
};
use crate::{BeaconChainError, metrics};
use kzg::{Error as KzgError, Kzg, KzgCommitment};
use ssz_derive::{Decode, Encode};
@@ -451,8 +453,9 @@ pub fn validate_blob_sidecar_for_gossip<T: BeaconChainTypes, O: ObservationStrat
if chain
.observed_blob_sidecars
.read()
.proposer_is_known(&blob_sidecar)
.observation_key_is_known(&blob_sidecar)
.map_err(|e| GossipBlobError::BeaconChainError(Box::new(e.into())))?
.is_some()
{
return Err(GossipBlobError::RepeatBlob {
proposer: blob_proposer_index,
@@ -593,7 +596,10 @@ pub fn observe_gossip_blob<T: BeaconChainTypes>(
.observed_blob_sidecars
.write()
.observe_sidecar(blob_sidecar)
.map_err(|e| GossipBlobError::BeaconChainError(Box::new(e.into())))?
.map_err(|e: ObservedDataSidecarsError| {
GossipBlobError::BeaconChainError(Box::new(e.into()))
})?
.is_some()
{
return Err(GossipBlobError::RepeatBlob {
proposer: blob_sidecar.block_proposer_index(),

View File

@@ -9,7 +9,7 @@ use crate::data_availability_checker::DataAvailabilityChecker;
use crate::fork_choice_signal::ForkChoiceSignalTx;
use crate::fork_revert::{reset_fork_choice_to_finalization, revert_to_fork_boundary};
use crate::graffiti_calculator::{GraffitiCalculator, GraffitiOrigin};
use crate::kzg_utils::build_data_column_sidecars;
use crate::kzg_utils::{build_data_column_sidecars_fulu, build_data_column_sidecars_gloas};
use crate::light_client_server_cache::LightClientServerCache;
use crate::migrate::{BackgroundMigrator, MigratorConfig};
use crate::observed_data_sidecars::ObservedDataSidecars;
@@ -42,6 +42,7 @@ use std::time::Duration;
use store::{Error as StoreError, HotColdDB, ItemStore, KeyValueStoreOp};
use task_executor::{ShutdownReason, TaskExecutor};
use tracing::{debug, error, info};
use tree_hash::TreeHash;
use types::data::CustodyIndex;
use types::{
BeaconBlock, BeaconState, BlobSidecarList, ChainSpec, ColumnIndex, DataColumnSidecarList,
@@ -1213,17 +1214,30 @@ fn build_data_columns_from_blobs<E: EthSpec>(
.blob_kzg_commitments()
.cloned()
.map_err(|e| format!("Unexpected pre Deneb block: {e:?}"))?;
let kzg_commitments_inclusion_proof = beacon_block_body
.kzg_commitments_merkle_proof()
.map_err(|e| format!("Failed to compute kzg commitments merkle proof: {e:?}"))?;
build_data_column_sidecars(
kzg_commitments,
kzg_commitments_inclusion_proof,
block.signed_block_header(),
blob_cells_and_proofs_vec,
spec,
)
.map_err(|e| format!("Failed to compute weak subjectivity data_columns: {e:?}"))?
if block.fork_name_unchecked().gloas_enabled() {
build_data_column_sidecars_gloas(
kzg_commitments,
block.message().tree_hash_root(),
block.slot(),
blob_cells_and_proofs_vec,
spec,
)
.map_err(|e| format!("Failed to compute weak subjectivity data_columns: {e:?}"))?
} else {
let kzg_commitments_inclusion_proof = beacon_block_body
.kzg_commitments_merkle_proof()
.map_err(|e| format!("Failed to compute kzg commitments merkle proof: {e:?}"))?;
build_data_column_sidecars_fulu(
kzg_commitments,
kzg_commitments_inclusion_proof,
block.signed_block_header(),
blob_cells_and_proofs_vec,
spec,
)
.map_err(|e| format!("Failed to compute weak subjectivity data_columns: {e:?}"))?
}
};
Ok(data_columns)
}

View File

@@ -19,10 +19,10 @@ use std::sync::Arc;
use std::time::Duration;
use task_executor::TaskExecutor;
use tracing::{debug, error, instrument};
use types::data::{BlobIdentifier, BlobSidecar, FixedBlobSidecarList};
use types::data::{BlobIdentifier, FixedBlobSidecarList};
use types::{
BlobSidecarList, BlockImportSource, ChainSpec, DataColumnSidecar, DataColumnSidecarList, Epoch,
EthSpec, Hash256, SignedBeaconBlock, Slot,
BlobSidecar, BlobSidecarList, BlockImportSource, ChainSpec, DataColumnSidecar,
DataColumnSidecarList, Epoch, EthSpec, Hash256, SignedBeaconBlock, Slot,
};
mod error;
@@ -187,7 +187,7 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
self.availability_cache
.peek_pending_components(block_root, |components| {
components.is_some_and(|components| {
let cached_column_opt = components.get_cached_data_column(data_column.index);
let cached_column_opt = components.get_cached_data_column(*data_column.index());
cached_column_opt.is_some_and(|cached| *cached == *data_column)
})
})
@@ -877,7 +877,9 @@ mod test {
use std::time::Duration;
use store::HotColdDB;
use types::data::DataColumn;
use types::{ChainSpec, ColumnIndex, EthSpec, ForkName, MainnetEthSpec, Slot};
use types::{
ChainSpec, ColumnIndex, DataColumnSidecarFulu, EthSpec, ForkName, MainnetEthSpec, Slot,
};
type E = MainnetEthSpec;
type T = EphemeralHarnessType<E>;
@@ -932,7 +934,7 @@ mod test {
cgc_change_slot,
data_columns
.into_iter()
.filter(|d| requested_columns.contains(&d.index))
.filter(|d| requested_columns.contains(d.index()))
.collect(),
)
.expect("should put rpc custody columns");
@@ -1007,7 +1009,7 @@ mod test {
let requested_columns = &custody_columns[..10];
let gossip_columns = data_columns
.into_iter()
.filter(|d| requested_columns.contains(&d.index))
.filter(|d| requested_columns.contains(d.index()))
.map(GossipVerifiedDataColumn::<T>::__new_for_testing)
.collect::<Vec<_>>();
da_checker
@@ -1039,7 +1041,7 @@ mod test {
/// Regression test for KZG verification truncation bug (https://github.com/sigp/lighthouse/pull/7927)
#[test]
fn verify_kzg_for_rpc_blocks_should_not_truncate_data_columns() {
fn verify_kzg_for_rpc_blocks_should_not_truncate_data_columns_fulu() {
let spec = Arc::new(ForkName::Fulu.make_genesis_spec(E::default_spec()));
let mut rng = StdRng::seed_from_u64(0xDEADBEEF0BAD5EEDu64);
let da_checker = new_da_checker(spec.clone());
@@ -1065,10 +1067,17 @@ mod test {
data_columns
.into_iter()
.map(|d| {
let invalid_sidecar = DataColumnSidecar {
let invalid_sidecar = DataColumnSidecar::Fulu(DataColumnSidecarFulu {
column: DataColumn::<E>::empty(),
..d.as_ref().clone()
};
index: *d.index(),
kzg_commitments: d.kzg_commitments().clone(),
kzg_proofs: d.kzg_proofs().clone(),
signed_block_header: d.signed_block_header().unwrap().clone(),
kzg_commitments_inclusion_proof: d
.kzg_commitments_inclusion_proof()
.unwrap()
.clone(),
});
CustodyDataColumn::from_asserted_custody(Arc::new(invalid_sidecar))
})
.collect::<Vec<_>>()
@@ -1126,7 +1135,7 @@ mod test {
let custody_columns = custody_context.custody_columns_for_epoch(None, &spec);
let custody_columns = custody_columns
.iter()
.filter_map(|&col_idx| data_columns.iter().find(|d| d.index == col_idx).cloned())
.filter_map(|&col_idx| data_columns.iter().find(|d| *d.index() == col_idx).cloned())
.take(64)
.map(|d| {
KzgVerifiedCustodyDataColumn::from_asserted_custody(

View File

@@ -2,14 +2,16 @@ use crate::block_verification::{
BlockSlashInfo, get_validator_pubkey_cache, process_block_slash_info,
};
use crate::kzg_utils::{reconstruct_data_columns, validate_data_columns};
use crate::observed_data_sidecars::{ObservationStrategy, Observe};
use crate::observed_data_sidecars::{
Error as ObservedDataSidecarsError, ObservationKey, ObservationStrategy, Observe,
};
use crate::{BeaconChain, BeaconChainError, BeaconChainTypes, metrics};
use educe::Educe;
use fork_choice::ProtoBlock;
use kzg::{Error as KzgError, Kzg};
use proto_array::Block;
use slot_clock::SlotClock;
use ssz_derive::{Decode, Encode};
use ssz_derive::Encode;
use ssz_types::VariableList;
use std::iter;
use std::marker::PhantomData;
@@ -17,13 +19,14 @@ use std::sync::Arc;
use tracing::{debug, instrument};
use types::data::ColumnIndex;
use types::{
BeaconStateError, ChainSpec, DataColumnSidecar, DataColumnSubnetId, EthSpec, Hash256,
SignedBeaconBlockHeader, Slot,
BeaconStateError, ChainSpec, DataColumnSidecar, DataColumnSidecarFulu, DataColumnSubnetId,
EthSpec, Hash256, Slot,
};
/// An error occurred while validating a gossip data column.
#[derive(Debug)]
pub enum GossipDataColumnError {
InvalidVariant,
/// There was an error whilst processing the data column. It is not known if it is
/// valid or invalid.
///
@@ -64,7 +67,10 @@ pub enum GossipDataColumnError {
/// ## Peer scoring
///
/// The column is invalid or the peer is faulty.
InvalidSubnetId { received: u64, expected: u64 },
InvalidSubnetId {
received: u64,
expected: u64,
},
/// The column sidecar is from a slot that is later than the current slot (with respect to the
/// gossip clock disparity).
///
@@ -97,35 +103,40 @@ pub enum GossipDataColumnError {
/// ## Peer scoring
///
/// The column is invalid and the peer is faulty.
ProposerIndexMismatch { sidecar: usize, local: usize },
ProposerIndexMismatch {
sidecar: usize,
local: usize,
},
/// The provided columns's parent block is unknown.
///
/// ## Peer scoring
///
/// We cannot process the columns without validating its parent, the peer isn't necessarily faulty.
ParentUnknown { parent_root: Hash256 },
ParentUnknown {
parent_root: Hash256,
},
/// The column conflicts with finalization, no need to propagate.
///
/// ## Peer scoring
///
/// It's unclear if this column is valid, but it conflicts with finality and shouldn't be
/// imported.
NotFinalizedDescendant { block_parent_root: Hash256 },
NotFinalizedDescendant {
block_parent_root: Hash256,
},
/// Invalid kzg commitment inclusion proof
///
/// ## Peer scoring
///
/// The column sidecar is invalid and the peer is faulty
InvalidInclusionProof,
/// A column has already been seen for the given `(sidecar.block_root, sidecar.index)` tuple
/// over gossip or no gossip sources.
/// A column has already been seen for the given observation key and index.
///
/// ## Peer scoring
///
/// The peer isn't faulty, but we do not forward it over gossip.
PriorKnown {
proposer: u64,
slot: Slot,
observation_key: ObservationKey,
index: ColumnIndex,
},
/// A column has already been processed from non-gossip source and have not yet been seen on
@@ -160,7 +171,10 @@ pub enum GossipDataColumnError {
/// ## Peer scoring
///
/// The column sidecar is invalid and the peer is faulty
InconsistentProofsLength { cells_len: usize, proofs_len: usize },
InconsistentProofsLength {
cells_len: usize,
proofs_len: usize,
},
/// The number of KZG commitments exceeds the maximum number of blobs allowed for the fork. The
/// sidecar is invalid.
///
@@ -209,17 +223,26 @@ impl<T: BeaconChainTypes, O: ObservationStrategy> GossipVerifiedDataColumn<T, O>
subnet_id: DataColumnSubnetId,
chain: &BeaconChain<T>,
) -> Result<Self, GossipDataColumnError> {
let header = column_sidecar.signed_block_header.clone();
// We only process slashing info if the gossip verification failed
// since we do not process the data column any further in that case.
validate_data_column_sidecar_for_gossip::<T, O>(column_sidecar, subnet_id, chain).map_err(
|e| {
process_block_slash_info::<_, GossipDataColumnError>(
match column_sidecar.as_ref() {
DataColumnSidecar::Fulu(c) => {
let header = c.signed_block_header.clone();
// We only process slashing info if the gossip verification failed
// since we do not process the data column any further in that case.
validate_data_column_sidecar_for_gossip_fulu::<T, O>(
column_sidecar,
subnet_id,
chain,
BlockSlashInfo::from_early_error_data_column(header, e),
)
},
)
.map_err(|e| {
process_block_slash_info::<_, GossipDataColumnError>(
chain,
BlockSlashInfo::from_early_error_data_column(header, e),
)
})
}
// TODO(gloas) support gloas data column variant
DataColumnSidecar::Gloas(_) => Err(GossipDataColumnError::InvalidVariant),
}
}
/// Create a `GossipVerifiedDataColumn` from `DataColumnSidecar` for block production ONLY.
@@ -283,11 +306,7 @@ impl<T: BeaconChainTypes, O: ObservationStrategy> GossipVerifiedDataColumn<T, O>
}
pub fn index(&self) -> ColumnIndex {
self.data_column.data.index
}
pub fn signed_block_header(&self) -> SignedBeaconBlockHeader {
self.data_column.data.signed_block_header.clone()
*self.data_column.data.index()
}
pub fn into_inner(self) -> KzgVerifiedDataColumn<T::EthSpec> {
@@ -296,7 +315,7 @@ impl<T: BeaconChainTypes, O: ObservationStrategy> GossipVerifiedDataColumn<T, O>
}
/// Wrapper over a `DataColumnSidecar` for which we have completed kzg verification.
#[derive(Debug, Educe, Clone, Encode, Decode)]
#[derive(Debug, Educe, Clone, Encode)]
#[educe(PartialEq, Eq)]
#[ssz(struct_behaviour = "transparent")]
pub struct KzgVerifiedDataColumn<E: EthSpec> {
@@ -345,7 +364,7 @@ impl<E: EthSpec> KzgVerifiedDataColumn<E> {
}
pub fn index(&self) -> ColumnIndex {
self.data.index
*self.data.index()
}
}
@@ -353,7 +372,7 @@ pub type CustodyDataColumnList<E> =
VariableList<CustodyDataColumn<E>, <E as EthSpec>::NumberOfColumns>;
/// Data column that we must custody
#[derive(Debug, Educe, Clone, Encode, Decode)]
#[derive(Debug, Educe, Clone, Encode)]
#[educe(PartialEq, Eq, Hash(bound(E: EthSpec)))]
#[ssz(struct_behaviour = "transparent")]
pub struct CustodyDataColumn<E: EthSpec> {
@@ -378,12 +397,12 @@ impl<E: EthSpec> CustodyDataColumn<E> {
self.data.clone()
}
pub fn index(&self) -> u64 {
self.data.index
*self.data.index()
}
}
/// Data column that we must custody and has completed kzg verification
#[derive(Debug, Educe, Clone, Encode, Decode)]
#[derive(Debug, Educe, Clone, Encode)]
#[educe(PartialEq, Eq)]
#[ssz(struct_behaviour = "transparent")]
pub struct KzgVerifiedCustodyDataColumn<E: EthSpec> {
@@ -443,7 +462,7 @@ impl<E: EthSpec> KzgVerifiedCustodyDataColumn<E> {
self.data.clone()
}
pub fn index(&self) -> ColumnIndex {
self.data.index
*self.data.index()
}
}
@@ -477,12 +496,21 @@ where
Ok(())
}
#[instrument(skip_all, level = "debug")]
pub fn validate_data_column_sidecar_for_gossip<T: BeaconChainTypes, O: ObservationStrategy>(
// TODO(gloas) make sure the gloas variant uses the same span name
#[instrument(
skip_all,
name = "validate_data_column_sidecar_for_gossip",
level = "debug"
)]
pub fn validate_data_column_sidecar_for_gossip_fulu<T: BeaconChainTypes, O: ObservationStrategy>(
data_column: Arc<DataColumnSidecar<T::EthSpec>>,
subnet: DataColumnSubnetId,
chain: &BeaconChain<T>,
) -> Result<GossipVerifiedDataColumn<T, O>, GossipDataColumnError> {
let DataColumnSidecar::Fulu(data_column_fulu) = data_column.as_ref() else {
return Err(GossipDataColumnError::InvalidVariant);
};
let column_slot = data_column.slot();
verify_data_column_sidecar(&data_column, &chain.spec)?;
verify_index_matches_subnet(&data_column, subnet, &chain.spec)?;
@@ -506,10 +534,10 @@ pub fn validate_data_column_sidecar_for_gossip<T: BeaconChainTypes, O: Observati
return Err(GossipDataColumnError::PriorKnownUnpublished);
}
verify_column_inclusion_proof(&data_column)?;
let parent_block = verify_parent_block_and_finalized_descendant(data_column.clone(), chain)?;
verify_column_inclusion_proof(data_column_fulu)?;
let parent_block = verify_parent_block_and_finalized_descendant(data_column_fulu, chain)?;
verify_slot_higher_than_parent(&parent_block, column_slot)?;
verify_proposer_and_signature(&data_column, &parent_block, chain)?;
verify_proposer_and_signature(data_column_fulu, &parent_block, chain)?;
let kzg = &chain.kzg;
let kzg_verified_data_column = verify_kzg_for_data_column(data_column.clone(), kzg)
.map_err(|(_, e)| GossipDataColumnError::InvalidKzgProof(e))?;
@@ -519,7 +547,7 @@ pub fn validate_data_column_sidecar_for_gossip<T: BeaconChainTypes, O: Observati
.write()
.observe_slashable(
column_slot,
data_column.block_proposer_index(),
data_column_fulu.block_proposer_index(),
data_column.block_root(),
)
.map_err(|e| GossipDataColumnError::BeaconChainError(Box::new(e.into())))?;
@@ -540,16 +568,18 @@ fn verify_data_column_sidecar<E: EthSpec>(
data_column: &DataColumnSidecar<E>,
spec: &ChainSpec,
) -> Result<(), GossipDataColumnError> {
if data_column.index >= E::number_of_columns() as u64 {
return Err(GossipDataColumnError::InvalidColumnIndex(data_column.index));
if *data_column.index() >= E::number_of_columns() as u64 {
return Err(GossipDataColumnError::InvalidColumnIndex(
*data_column.index(),
));
}
if data_column.kzg_commitments.is_empty() {
if data_column.kzg_commitments().is_empty() {
return Err(GossipDataColumnError::UnexpectedDataColumn);
}
let cells_len = data_column.column.len();
let commitments_len = data_column.kzg_commitments.len();
let proofs_len = data_column.kzg_proofs.len();
let cells_len = data_column.column().len();
let commitments_len = data_column.kzg_commitments().len();
let proofs_len = data_column.kzg_proofs().len();
let max_blobs_per_block = spec.max_blobs_per_block(data_column.epoch()) as usize;
if commitments_len > max_blobs_per_block {
@@ -582,23 +612,24 @@ fn verify_is_unknown_sidecar<T: BeaconChainTypes>(
chain: &BeaconChain<T>,
column_sidecar: &DataColumnSidecar<T::EthSpec>,
) -> Result<(), GossipDataColumnError> {
if chain
if let Some(observation_key) = chain
.observed_column_sidecars
.read()
.proposer_is_known(column_sidecar)
.map_err(|e| GossipDataColumnError::BeaconChainError(Box::new(e.into())))?
.observation_key_is_known(column_sidecar)
.map_err(|e: ObservedDataSidecarsError| {
GossipDataColumnError::BeaconChainError(Box::new(e.into()))
})?
{
return Err(GossipDataColumnError::PriorKnown {
proposer: column_sidecar.block_proposer_index(),
slot: column_sidecar.slot(),
index: column_sidecar.index,
observation_key,
index: *column_sidecar.index(),
});
}
Ok(())
}
fn verify_column_inclusion_proof<E: EthSpec>(
data_column: &DataColumnSidecar<E>,
data_column: &DataColumnSidecarFulu<E>,
) -> Result<(), GossipDataColumnError> {
let _timer = metrics::start_timer(&metrics::DATA_COLUMN_SIDECAR_INCLUSION_PROOF_VERIFICATION);
if !data_column.verify_inclusion_proof() {
@@ -622,7 +653,7 @@ fn verify_slot_higher_than_parent(
}
fn verify_parent_block_and_finalized_descendant<T: BeaconChainTypes>(
data_column: Arc<DataColumnSidecar<T::EthSpec>>,
data_column: &DataColumnSidecarFulu<T::EthSpec>,
chain: &BeaconChain<T>,
) -> Result<ProtoBlock, GossipDataColumnError> {
let fork_choice = chain.canonical_head.fork_choice_read_lock();
@@ -646,7 +677,7 @@ fn verify_parent_block_and_finalized_descendant<T: BeaconChainTypes>(
}
fn verify_proposer_and_signature<T: BeaconChainTypes>(
data_column: &DataColumnSidecar<T::EthSpec>,
data_column: &DataColumnSidecarFulu<T::EthSpec>,
parent_block: &ProtoBlock,
chain: &BeaconChain<T>,
) -> Result<(), GossipDataColumnError> {
@@ -723,7 +754,7 @@ fn verify_index_matches_subnet<E: EthSpec>(
subnet: DataColumnSubnetId,
spec: &ChainSpec,
) -> Result<(), GossipDataColumnError> {
let expected_subnet = DataColumnSubnetId::from_column_index(data_column.index, spec);
let expected_subnet = DataColumnSubnetId::from_column_index(*data_column.index(), spec);
if expected_subnet != subnet {
return Err(GossipDataColumnError::InvalidSubnetId {
received: subnet.into(),
@@ -772,27 +803,31 @@ pub fn observe_gossip_data_column<T: BeaconChainTypes>(
data_column_sidecar: &DataColumnSidecar<T::EthSpec>,
chain: &BeaconChain<T>,
) -> Result<(), GossipDataColumnError> {
// Now the signature is valid, store the proposal so we don't accept another data column sidecar
// with the same `ColumnIndex`. It's important to double-check that the proposer still
// hasn't been observed so we don't have a race-condition when verifying two blocks
// Pre-gloas: Now the signature is valid, store the proposal so we don't accept another data column sidecar
// with the same `ColumnIndex`.
// Post-gloas: The block associated with the sidecar has already been imported into fork choice. Store the
// columns `beacon_block_root` so we don't accept another data column sidecar with the same `ColumnIndex`.
// It's important to double-check that the `Observationkey` still
// hasn't been observed so we don't have a race-condition when verifying two sidecars
// simultaneously.
//
// Note: If this DataColumnSidecar goes on to fail full verification, we do not evict it from the
// seen_cache as alternate data_column_sidecars for the same identifier can still be retrieved over
// rpc. Evicting them from this cache would allow faster propagation over gossip. So we
// allow retrieval of potentially valid blocks over rpc, but try to punish the proposer for
// allow retrieval of potentially valid sidecars over rpc, but try to punish the proposer for
// signing invalid messages. Issue for more background
// https://github.com/ethereum/consensus-specs/issues/3261
if chain
if let Some(observation_key) = chain
.observed_column_sidecars
.write()
.observe_sidecar(data_column_sidecar)
.map_err(|e| GossipDataColumnError::BeaconChainError(Box::new(e.into())))?
.map_err(|e: ObservedDataSidecarsError| {
GossipDataColumnError::BeaconChainError(Box::new(e.into()))
})?
{
return Err(GossipDataColumnError::PriorKnown {
proposer: data_column_sidecar.block_proposer_index(),
slot: data_column_sidecar.slot(),
index: data_column_sidecar.index,
observation_key,
index: *data_column_sidecar.index(),
});
}
Ok(())
@@ -801,7 +836,8 @@ pub fn observe_gossip_data_column<T: BeaconChainTypes>(
#[cfg(test)]
mod test {
use crate::data_column_verification::{
GossipDataColumnError, GossipVerifiedDataColumn, validate_data_column_sidecar_for_gossip,
GossipDataColumnError, GossipVerifiedDataColumn,
validate_data_column_sidecar_for_gossip_fulu,
};
use crate::observed_data_sidecars::Observe;
use crate::test_utils::{
@@ -810,12 +846,16 @@ mod test {
use eth2::types::BlobsBundle;
use execution_layer::test_utils::generate_blobs;
use std::sync::Arc;
use types::{DataColumnSidecar, DataColumnSubnetId, EthSpec, ForkName, MainnetEthSpec};
use types::{
DataColumnSidecar, DataColumnSidecarFulu, DataColumnSubnetId, EthSpec, ForkName,
MainnetEthSpec,
};
type E = MainnetEthSpec;
// TODO(gloas) make this generic over gloas/fulu
#[tokio::test]
async fn test_validate_data_column_sidecar_for_gossip() {
async fn test_validate_data_column_sidecar_for_gossip_fulu() {
// Setting up harness is slow, we initialise once and use it for all gossip validation tests.
let spec = ForkName::Fulu.make_genesis_spec(E::default_spec());
let harness = BeaconChainHarness::builder(E::default())
@@ -827,19 +867,20 @@ mod test {
harness.advance_slot();
let verify_fn = |column_sidecar: DataColumnSidecar<E>| {
let col_index = column_sidecar.index;
validate_data_column_sidecar_for_gossip::<_, Observe>(
let col_index = *column_sidecar.index();
validate_data_column_sidecar_for_gossip_fulu::<_, Observe>(
column_sidecar.into(),
DataColumnSubnetId::from_column_index(col_index, &harness.spec),
&harness.chain,
)
};
empty_data_column_sidecars_fails_validation(&harness, &verify_fn).await;
empty_data_column_sidecars_fails_validation_fulu(&harness, &verify_fn).await;
data_column_sidecar_commitments_exceed_max_blobs_per_block(&harness, &verify_fn).await;
}
// TODO(gloas) make this generic over gloas/fulu
#[tokio::test]
async fn test_new_for_block_publishing() {
async fn test_new_for_block_publishing_fulu() {
// Setting up harness is slow, we initialise once and use it for all gossip validation tests.
let spec = ForkName::Fulu.make_genesis_spec(E::default_spec());
let harness = BeaconChainHarness::builder(E::default())
@@ -856,11 +897,12 @@ mod test {
&harness.chain,
)
};
empty_data_column_sidecars_fails_validation(&harness, &verify_fn).await;
empty_data_column_sidecars_fails_validation_fulu(&harness, &verify_fn).await;
data_column_sidecar_commitments_exceed_max_blobs_per_block(&harness, &verify_fn).await;
}
async fn empty_data_column_sidecars_fails_validation<D>(
// TODO(gloas) make this generic over gloas/fulu
async fn empty_data_column_sidecars_fails_validation_fulu<D>(
harness: &BeaconChainHarness<EphemeralHarnessType<E>>,
verify_fn: &impl Fn(DataColumnSidecar<E>) -> Result<D, GossipDataColumnError>,
) {
@@ -873,7 +915,7 @@ mod test {
.await;
let index = 0;
let column_sidecar = DataColumnSidecar::<E> {
let column_sidecar: DataColumnSidecar<E> = DataColumnSidecar::Fulu(DataColumnSidecarFulu {
index,
column: vec![].try_into().unwrap(),
kzg_commitments: vec![].try_into().unwrap(),
@@ -884,7 +926,7 @@ mod test {
.body()
.kzg_commitments_merkle_proof()
.unwrap(),
};
});
let result = verify_fn(column_sidecar);
assert!(matches!(

View File

@@ -1,5 +1,5 @@
use crate::fetch_blobs::{EngineGetBlobsOutput, FetchEngineBlobError};
use crate::observed_block_producers::ProposalKey;
use crate::observed_data_sidecars::ObservationKey;
use crate::{AvailabilityProcessingStatus, BeaconChain, BeaconChainTypes};
use execution_layer::json_structures::{BlobAndProofV1, BlobAndProofV2};
use kzg::Kzg;
@@ -67,27 +67,25 @@ impl<T: BeaconChainTypes> FetchBlobsBeaconAdapter<T> {
.map_err(FetchEngineBlobError::RequestFailed)
}
pub(crate) fn blobs_known_for_proposal(
pub(crate) fn blobs_known_for_observation_key(
&self,
proposer: u64,
slot: Slot,
observation_key: ObservationKey,
) -> Option<HashSet<u64>> {
let proposer_key = ProposalKey::new(proposer, slot);
self.chain
.observed_blob_sidecars
.read()
.known_for_proposal(&proposer_key)
.known_for_observation_key(&observation_key)
.cloned()
}
pub(crate) fn data_column_known_for_proposal(
pub(crate) fn data_column_known_for_observation_key(
&self,
proposal_key: ProposalKey,
observation_key: ObservationKey,
) -> Option<HashSet<ColumnIndex>> {
self.chain
.observed_column_sidecars
.read()
.known_for_proposal(&proposal_key)
.known_for_observation_key(&observation_key)
.cloned()
}

View File

@@ -18,7 +18,7 @@ use crate::data_column_verification::{KzgVerifiedCustodyDataColumn, KzgVerifiedD
#[cfg_attr(test, double)]
use crate::fetch_blobs::fetch_blobs_beacon_adapter::FetchBlobsBeaconAdapter;
use crate::kzg_utils::blobs_to_data_column_sidecars;
use crate::observed_block_producers::ProposalKey;
use crate::observed_data_sidecars::ObservationKey;
use crate::validator_monitor::timestamp_now;
use crate::{
AvailabilityProcessingStatus, BeaconChain, BeaconChainError, BeaconChainTypes, BlockError,
@@ -193,9 +193,10 @@ async fn fetch_and_process_blobs_v1<T: BeaconChainTypes>(
&kzg_commitments_proof,
)?;
if let Some(observed_blobs) =
chain_adapter.blobs_known_for_proposal(block.message().proposer_index(), block.slot())
{
let observation_key =
ObservationKey::new_proposer_key(block.message().proposer_index(), block.slot());
if let Some(observed_blobs) = chain_adapter.blobs_known_for_observation_key(observation_key) {
blob_sidecar_list.retain(|blob| !observed_blobs.contains(&blob.blob_index()));
if blob_sidecar_list.is_empty() {
debug!(
@@ -380,7 +381,7 @@ async fn compute_custody_columns_to_import<T: BeaconChainTypes>(
.map(|data_columns| {
data_columns
.into_iter()
.filter(|col| custody_columns_indices.contains(&col.index))
.filter(|col| custody_columns_indices.contains(col.index()))
.map(|col| {
KzgVerifiedCustodyDataColumn::from_asserted_custody(
KzgVerifiedDataColumn::from_execution_verified(col),
@@ -391,9 +392,11 @@ async fn compute_custody_columns_to_import<T: BeaconChainTypes>(
.map_err(FetchEngineBlobError::DataColumnSidecarError)?;
// Only consider columns that are not already observed on gossip.
if let Some(observed_columns) = chain_adapter_cloned.data_column_known_for_proposal(
ProposalKey::new(block.message().proposer_index(), block.slot()),
) {
let observation_key = ObservationKey::from_block(&block, block_root, &spec);
if let Some(observed_columns) =
chain_adapter_cloned.data_column_known_for_observation_key(observation_key)
{
custody_columns.retain(|col| !observed_columns.contains(&col.index()));
if custody_columns.is_empty() {
return Ok(vec![]);

View File

@@ -156,7 +156,7 @@ mod get_blobs_v2 {
mock_fork_choice_contains_block(&mut mock_adapter, vec![]);
// All data columns already seen on gossip
mock_adapter
.expect_data_column_known_for_proposal()
.expect_data_column_known_for_observation_key()
.returning(|_| Some(hashset![0, 1, 2]));
// No blobs should be processed
mock_adapter.expect_process_engine_blobs().times(0);
@@ -193,7 +193,7 @@ mod get_blobs_v2 {
mock_get_blobs_v2_response(&mut mock_adapter, Some(blobs_and_proofs));
mock_fork_choice_contains_block(&mut mock_adapter, vec![]);
mock_adapter
.expect_data_column_known_for_proposal()
.expect_data_column_known_for_observation_key()
.returning(|_| None);
mock_adapter
.expect_cached_data_column_indexes()
@@ -332,8 +332,8 @@ mod get_blobs_v1 {
.expect_cached_blob_indexes()
.returning(|_| None);
mock_adapter
.expect_blobs_known_for_proposal()
.returning(|_, _| None);
.expect_blobs_known_for_observation_key()
.returning(|_| None);
// Returned blobs should be processed
mock_process_engine_blobs_result(
&mut mock_adapter,
@@ -427,8 +427,8 @@ mod get_blobs_v1 {
.expect_cached_blob_indexes()
.returning(|_| None);
mock_adapter
.expect_blobs_known_for_proposal()
.returning(move |_, _| Some(all_blob_indices.clone()));
.expect_blobs_known_for_observation_key()
.returning(move |_| Some(all_blob_indices.clone()));
// **WHEN**: Trigger `fetch_blobs` on the block
let custody_columns: [ColumnIndex; 3] = [0, 1, 2];
@@ -467,8 +467,8 @@ mod get_blobs_v1 {
.expect_cached_blob_indexes()
.returning(|_| None);
mock_adapter
.expect_blobs_known_for_proposal()
.returning(|_, _| None);
.expect_blobs_known_for_observation_key()
.returning(|_| None);
mock_process_engine_blobs_result(
&mut mock_adapter,
Ok(AvailabilityProcessingStatus::Imported(block_root)),

View File

@@ -61,12 +61,12 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
let unique_column_indices = historical_data_column_sidecar_list
.iter()
.map(|item| item.index)
.map(|item| *item.index())
.collect::<HashSet<_>>();
let mut slot_and_column_index_to_data_columns = historical_data_column_sidecar_list
.iter()
.map(|data_column| ((data_column.slot(), data_column.index), data_column))
.map(|data_column| ((data_column.slot(), *data_column.index()), data_column))
.collect::<HashMap<_, _>>();
let forward_blocks_iter = self
@@ -80,13 +80,14 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
let (block_root, slot) = block_iter_result
.map_err(|e| HistoricalDataColumnError::BeaconChainError(Box::new(e)))?;
let fork_name = self.spec.fork_name_at_slot::<T::EthSpec>(slot);
for column_index in unique_column_indices.clone() {
if let Some(data_column) =
slot_and_column_index_to_data_columns.remove(&(slot, column_index))
{
if self
.store
.get_data_column(&block_root, &data_column.index)?
.get_data_column(&block_root, data_column.index(), fork_name)?
.is_some()
{
continue;

View File

@@ -6,12 +6,13 @@ use rayon::prelude::*;
use ssz_types::{FixedVector, VariableList};
use std::sync::Arc;
use tracing::instrument;
use tree_hash::TreeHash;
use types::data::{Cell, DataColumn, DataColumnSidecarError};
use types::kzg_ext::KzgCommitments;
use types::{
Blob, BlobSidecar, BlobSidecarList, ChainSpec, DataColumnSidecar, DataColumnSidecarList,
EthSpec, Hash256, KzgCommitment, KzgProof, SignedBeaconBlock, SignedBeaconBlockHeader,
SignedBlindedBeaconBlock,
Blob, BlobSidecar, BlobSidecarList, ChainSpec, DataColumnSidecar, DataColumnSidecarFulu,
DataColumnSidecarGloas, DataColumnSidecarList, EthSpec, Hash256, KzgCommitment, KzgProof,
SignedBeaconBlock, SignedBeaconBlockHeader, SignedBlindedBeaconBlock, Slot,
};
/// Converts a blob ssz List object to an array to be used with the kzg
@@ -59,22 +60,22 @@ where
let mut commitments = Vec::new();
for data_column in data_column_iter {
let col_index = data_column.index;
let col_index = *data_column.index();
if data_column.column.is_empty() {
if data_column.column().is_empty() {
return Err((Some(col_index), KzgError::KzgVerificationFailed));
}
for cell in &data_column.column {
for cell in data_column.column() {
cells.push(ssz_cell_to_crypto_cell::<E>(cell).map_err(|e| (Some(col_index), e))?);
column_indices.push(col_index);
}
for &proof in &data_column.kzg_proofs {
for &proof in data_column.kzg_proofs() {
proofs.push(Bytes48::from(proof));
}
for &commitment in &data_column.kzg_commitments {
for &commitment in data_column.kzg_commitments() {
commitments.push(Bytes48::from(commitment));
}
@@ -171,7 +172,6 @@ pub fn blobs_to_data_column_sidecars<E: EthSpec>(
.body()
.blob_kzg_commitments()
.map_err(|_err| DataColumnSidecarError::PreDeneb)?;
let kzg_commitments_inclusion_proof = block.message().body().kzg_commitments_merkle_proof()?;
let signed_block_header = block.signed_block_header();
if cell_proofs.len() != blobs.len() * E::number_of_columns() {
@@ -207,14 +207,27 @@ pub fn blobs_to_data_column_sidecars<E: EthSpec>(
})
.collect::<Result<Vec<_>, KzgError>>()?;
build_data_column_sidecars(
kzg_commitments.clone(),
kzg_commitments_inclusion_proof,
signed_block_header,
blob_cells_and_proofs_vec,
spec,
)
.map_err(DataColumnSidecarError::BuildSidecarFailed)
if block.fork_name_unchecked().gloas_enabled() {
build_data_column_sidecars_gloas(
kzg_commitments.clone(),
signed_block_header.message.tree_hash_root(),
block.slot(),
blob_cells_and_proofs_vec,
spec,
)
.map_err(DataColumnSidecarError::BuildSidecarFailed)
} else {
let kzg_commitments_inclusion_proof =
block.message().body().kzg_commitments_merkle_proof()?;
build_data_column_sidecars_fulu(
kzg_commitments.clone(),
kzg_commitments_inclusion_proof,
signed_block_header,
blob_cells_and_proofs_vec,
spec,
)
.map_err(DataColumnSidecarError::BuildSidecarFailed)
}
}
pub fn compute_cells<E: EthSpec>(blobs: &[&Blob<E>], kzg: &Kzg) -> Result<Vec<KzgCell>, KzgError> {
@@ -235,13 +248,20 @@ pub fn compute_cells<E: EthSpec>(blobs: &[&Blob<E>], kzg: &Kzg) -> Result<Vec<Kz
Ok(cells_flattened)
}
pub(crate) fn build_data_column_sidecars<E: EthSpec>(
pub(crate) fn build_data_column_sidecars_fulu<E: EthSpec>(
kzg_commitments: KzgCommitments<E>,
kzg_commitments_inclusion_proof: FixedVector<Hash256, E::KzgCommitmentsInclusionProofDepth>,
signed_block_header: SignedBeaconBlockHeader,
blob_cells_and_proofs_vec: Vec<CellsAndKzgProofs>,
spec: &ChainSpec,
) -> Result<DataColumnSidecarList<E>, String> {
if spec
.fork_name_at_slot::<E>(signed_block_header.message.slot)
.gloas_enabled()
{
return Err("Attempting to construct Fulu data columns post-Gloas".to_owned());
}
let number_of_columns = E::number_of_columns();
let max_blobs_per_block = spec
.max_blobs_per_block(signed_block_header.message.slot.epoch(E::slots_per_epoch()))
@@ -283,7 +303,7 @@ pub(crate) fn build_data_column_sidecars<E: EthSpec>(
.enumerate()
.map(
|(index, (col, proofs))| -> Result<Arc<DataColumnSidecar<E>>, String> {
Ok(Arc::new(DataColumnSidecar {
Ok(Arc::new(DataColumnSidecar::Fulu(DataColumnSidecarFulu {
index: index as u64,
column: DataColumn::<E>::try_from(col)
.map_err(|e| format!("MaxBlobCommitmentsPerBlock exceeded: {e:?}"))?,
@@ -292,7 +312,7 @@ pub(crate) fn build_data_column_sidecars<E: EthSpec>(
.map_err(|e| format!("MaxBlobCommitmentsPerBlock exceeded: {e:?}"))?,
signed_block_header: signed_block_header.clone(),
kzg_commitments_inclusion_proof: kzg_commitments_inclusion_proof.clone(),
}))
})))
},
)
.collect();
@@ -300,6 +320,75 @@ pub(crate) fn build_data_column_sidecars<E: EthSpec>(
sidecars
}
pub(crate) fn build_data_column_sidecars_gloas<E: EthSpec>(
kzg_commitments: KzgCommitments<E>,
beacon_block_root: Hash256,
slot: Slot,
blob_cells_and_proofs_vec: Vec<CellsAndKzgProofs>,
spec: &ChainSpec,
) -> Result<DataColumnSidecarList<E>, String> {
if !spec.fork_name_at_slot::<E>(slot).gloas_enabled() {
return Err("Attempting to construct Gloas data columns pre-Gloas".to_owned());
}
let number_of_columns = E::number_of_columns();
let max_blobs_per_block = spec.max_blobs_per_block(slot.epoch(E::slots_per_epoch())) as usize;
let mut columns = vec![Vec::with_capacity(max_blobs_per_block); number_of_columns];
let mut column_kzg_proofs = vec![Vec::with_capacity(max_blobs_per_block); number_of_columns];
for (blob_cells, blob_cell_proofs) in blob_cells_and_proofs_vec {
// we iterate over each column, and we construct the column from "top to bottom",
// pushing on the cell and the corresponding proof at each column index. we do this for
// each blob (i.e. the outer loop).
for col in 0..number_of_columns {
let cell = blob_cells
.get(col)
.ok_or(format!("Missing blob cell at index {col}"))?;
let cell: Vec<u8> = cell.to_vec();
let cell =
Cell::<E>::try_from(cell).map_err(|e| format!("BytesPerCell exceeded: {e:?}"))?;
let proof = blob_cell_proofs
.get(col)
.ok_or(format!("Missing blob cell KZG proof at index {col}"))?;
let column = columns
.get_mut(col)
.ok_or(format!("Missing data column at index {col}"))?;
let column_proofs = column_kzg_proofs
.get_mut(col)
.ok_or(format!("Missing data column proofs at index {col}"))?;
column.push(cell);
column_proofs.push(*proof);
}
}
let sidecars: Result<Vec<Arc<DataColumnSidecar<E>>>, String> = columns
.into_iter()
.zip(column_kzg_proofs)
.enumerate()
.map(
|(index, (col, proofs))| -> Result<Arc<DataColumnSidecar<E>>, String> {
Ok(Arc::new(DataColumnSidecar::Gloas(DataColumnSidecarGloas {
index: index as u64,
column: DataColumn::<E>::try_from(col)
.map_err(|e| format!("MaxBlobCommitmentsPerBlock exceeded: {e:?}"))?,
kzg_commitments: kzg_commitments.clone(),
kzg_proofs: VariableList::try_from(proofs)
.map_err(|e| format!("MaxBlobCommitmentsPerBlock exceeded: {e:?}"))?,
beacon_block_root,
slot,
})))
},
)
.collect();
sidecars
}
// TODO(gloas) blob reconstruction will fail post gloas. We should just return `Blob`s
// instead of a `BlobSidecar`. This might require a beacon api spec change as well.
/// Reconstruct blobs from a subset of data column sidecars (requires at least 50%).
///
/// If `blob_indices_opt` is `None`, this function attempts to reconstruct all blobs associated
@@ -314,7 +403,7 @@ pub fn reconstruct_blobs<E: EthSpec>(
spec: &ChainSpec,
) -> Result<BlobSidecarList<E>, String> {
// Sort data columns by index to ensure ascending order for KZG operations
data_columns.sort_unstable_by_key(|dc| dc.index);
data_columns.sort_unstable_by_key(|dc| *dc.index());
let first_data_column = data_columns
.first()
@@ -323,7 +412,7 @@ pub fn reconstruct_blobs<E: EthSpec>(
let blob_indices: Vec<usize> = match blob_indices_opt {
Some(indices) => indices.into_iter().map(|i| i as usize).collect(),
None => {
let num_of_blobs = first_data_column.kzg_commitments.len();
let num_of_blobs = first_data_column.kzg_commitments().len();
(0..num_of_blobs).collect()
}
};
@@ -335,7 +424,7 @@ pub fn reconstruct_blobs<E: EthSpec>(
let mut cell_ids: Vec<u64> = vec![];
for data_column in &data_columns {
let cell = data_column
.column
.column()
.get(row_index)
.ok_or(format!("Missing data column at row index {row_index}"))
.and_then(|cell| {
@@ -343,7 +432,7 @@ pub fn reconstruct_blobs<E: EthSpec>(
})?;
cells.push(cell);
cell_ids.push(data_column.index);
cell_ids.push(*data_column.index());
}
let num_cells_original_blob = E::number_of_columns() / 2;
@@ -374,8 +463,13 @@ pub fn reconstruct_blobs<E: EthSpec>(
row_index,
blob,
signed_block,
first_data_column.signed_block_header.clone(),
&first_data_column.kzg_commitments_inclusion_proof,
first_data_column
.signed_block_header()
.map_err(|e| format!("{e:?}"))?
.clone(),
first_data_column
.kzg_commitments_inclusion_proof()
.map_err(|e| format!("{e:?}"))?,
kzg_proof,
)
.map(Arc::new)
@@ -395,7 +489,7 @@ pub fn reconstruct_data_columns<E: EthSpec>(
spec: &ChainSpec,
) -> Result<DataColumnSidecarList<E>, KzgError> {
// Sort data columns by index to ensure ascending order for KZG operations
data_columns.sort_unstable_by_key(|dc| dc.index);
data_columns.sort_unstable_by_key(|dc| *dc.index());
let first_data_column = data_columns
.first()
@@ -403,37 +497,47 @@ pub fn reconstruct_data_columns<E: EthSpec>(
"data_columns should have at least one element".to_string(),
))?;
let num_of_blobs = first_data_column.kzg_commitments.len();
let num_of_blobs = first_data_column.kzg_commitments().len();
let blob_cells_and_proofs_vec =
(0..num_of_blobs)
.into_par_iter()
.map(|row_index| {
let mut cells: Vec<KzgCellRef> = vec![];
let mut cell_ids: Vec<u64> = vec![];
for data_column in &data_columns {
let cell = data_column.column.get(row_index).ok_or(
KzgError::InconsistentArrayLength(format!(
"Missing data column at row index {row_index}"
)),
)?;
let blob_cells_and_proofs_vec = (0..num_of_blobs)
.into_par_iter()
.map(|row_index| {
let mut cells: Vec<KzgCellRef> = vec![];
let mut cell_ids: Vec<u64> = vec![];
for data_column in &data_columns {
let cell = data_column.column().get(row_index).ok_or(
KzgError::InconsistentArrayLength(format!(
"Missing data column at row index {row_index}"
)),
)?;
cells.push(ssz_cell_to_crypto_cell::<E>(cell)?);
cell_ids.push(data_column.index);
}
kzg.recover_cells_and_compute_kzg_proofs(&cell_ids, &cells)
})
.collect::<Result<Vec<_>, KzgError>>()?;
// Clone sidecar elements from existing data column, no need to re-compute
build_data_column_sidecars(
first_data_column.kzg_commitments.clone(),
first_data_column.kzg_commitments_inclusion_proof.clone(),
first_data_column.signed_block_header.clone(),
blob_cells_and_proofs_vec,
spec,
)
.map_err(KzgError::ReconstructFailed)
cells.push(ssz_cell_to_crypto_cell::<E>(cell)?);
cell_ids.push(*data_column.index());
}
kzg.recover_cells_and_compute_kzg_proofs(&cell_ids, &cells)
})
.collect::<Result<Vec<_>, KzgError>>()?;
match first_data_column.as_ref() {
DataColumnSidecar::Fulu(first_column) => {
// Clone sidecar elements from existing data column, no need to re-compute
build_data_column_sidecars_fulu(
first_column.kzg_commitments.clone(),
first_column.kzg_commitments_inclusion_proof.clone(),
first_column.signed_block_header.clone(),
blob_cells_and_proofs_vec,
spec,
)
.map_err(KzgError::ReconstructFailed)
}
DataColumnSidecar::Gloas(first_column) => build_data_column_sidecars_gloas(
first_column.kzg_commitments.clone(),
first_column.beacon_block_root,
first_column.slot,
blob_cells_and_proofs_vec,
spec,
)
.map_err(KzgError::ReconstructFailed),
}
}
#[cfg(test)]
@@ -455,12 +559,13 @@ mod test {
// Loading and initializing PeerDAS KZG is expensive and slow, so we group the tests together
// only load it once.
// TODO(Gloas) make this generic over fulu/gloas, or write a separate function for Gloas
#[test]
fn test_build_data_columns_sidecars() {
let spec = ForkName::Fulu.make_genesis_spec(E::default_spec());
let kzg = get_kzg();
test_build_data_columns_empty(&kzg, &spec);
test_build_data_columns(&kzg, &spec);
test_build_data_columns_fulu(&kzg, &spec);
test_reconstruct_data_columns(&kzg, &spec);
test_reconstruct_data_columns_unordered(&kzg, &spec);
test_reconstruct_blobs_from_data_columns(&kzg, &spec);
@@ -494,8 +599,10 @@ mod test {
assert!(column_sidecars.is_empty());
}
// TODO(gloas) create `test_build_data_columns_gloas` and make sure its called
// in the relevant places
#[track_caller]
fn test_build_data_columns(kzg: &Kzg, spec: &ChainSpec) {
fn test_build_data_columns_fulu(kzg: &Kzg, spec: &ChainSpec) {
// Using at least 2 blobs to make sure we're arranging the data columns correctly.
let num_of_blobs = 2;
let (signed_block, blobs, proofs) =
@@ -520,18 +627,21 @@ mod test {
assert_eq!(column_sidecars.len(), E::number_of_columns());
for (idx, col_sidecar) in column_sidecars.iter().enumerate() {
assert_eq!(col_sidecar.index, idx as u64);
assert_eq!(*col_sidecar.index(), idx as u64);
assert_eq!(col_sidecar.kzg_commitments.len(), num_of_blobs);
assert_eq!(col_sidecar.column.len(), num_of_blobs);
assert_eq!(col_sidecar.kzg_proofs.len(), num_of_blobs);
assert_eq!(col_sidecar.kzg_commitments().len(), num_of_blobs);
assert_eq!(col_sidecar.column().len(), num_of_blobs);
assert_eq!(col_sidecar.kzg_proofs().len(), num_of_blobs);
assert_eq!(col_sidecar.kzg_commitments, block_kzg_commitments);
assert_eq!(col_sidecar.kzg_commitments().clone(), block_kzg_commitments);
assert_eq!(
col_sidecar.kzg_commitments_inclusion_proof,
col_sidecar
.kzg_commitments_inclusion_proof()
.unwrap()
.clone(),
block_kzg_commitments_inclusion_proof
);
assert!(col_sidecar.verify_inclusion_proof());
assert!(col_sidecar.as_fulu().unwrap().verify_inclusion_proof());
}
}

View File

@@ -3,27 +3,36 @@
//! Only `BlobSidecar`s that have completed proposer signature verification can be added
//! to this cache to reduce DoS risks.
use crate::observed_block_producers::ProposalKey;
use std::collections::{HashMap, HashSet};
use std::marker::PhantomData;
use std::sync::Arc;
use types::{BlobSidecar, ChainSpec, DataColumnSidecar, EthSpec, Slot};
use types::{BlobSidecar, ChainSpec, DataColumnSidecar, EthSpec, Hash256, SignedBeaconBlock, Slot};
type ValidatorIndex = u64;
type BeaconBlockRoot = Hash256;
#[derive(Debug, PartialEq)]
pub enum Error {
/// The slot of the provided `ObservableDataSidecar` is prior to finalization and should not have been provided
/// to this function. This is an internal error.
FinalizedDataSidecar { slot: Slot, finalized_slot: Slot },
FinalizedDataSidecar {
slot: Slot,
finalized_slot: Slot,
},
/// The data sidecar contains an invalid index, the data sidecar is invalid.
/// Note: The invalid data should have been caught and flagged as an error much before reaching
/// here.
InvalidDataIndex(u64),
// An unexpected data sidecar variant was received
UnexpectedVariant,
}
pub trait ObservableDataSidecar {
fn slot(&self) -> Slot;
fn block_proposer_index(&self) -> u64;
fn index(&self) -> u64;
fn proposer_index(&self) -> Option<ValidatorIndex>;
fn beacon_block_root(&self) -> BeaconBlockRoot;
fn max_num_of_items(spec: &ChainSpec, slot: Slot) -> usize;
}
@@ -32,14 +41,18 @@ impl<E: EthSpec> ObservableDataSidecar for BlobSidecar<E> {
self.slot()
}
fn block_proposer_index(&self) -> u64 {
self.block_proposer_index()
}
fn index(&self) -> u64 {
self.index
}
fn proposer_index(&self) -> Option<ValidatorIndex> {
Some(self.block_proposer_index())
}
fn beacon_block_root(&self) -> BeaconBlockRoot {
self.block_root()
}
fn max_num_of_items(spec: &ChainSpec, slot: Slot) -> usize {
spec.max_blobs_per_block(slot.epoch(E::slots_per_epoch())) as usize
}
@@ -50,12 +63,16 @@ impl<E: EthSpec> ObservableDataSidecar for DataColumnSidecar<E> {
self.slot()
}
fn block_proposer_index(&self) -> u64 {
self.block_proposer_index()
fn index(&self) -> u64 {
*self.index()
}
fn index(&self) -> u64 {
self.index
fn proposer_index(&self) -> Option<ValidatorIndex> {
self.as_fulu().map(|d| d.block_proposer_index()).ok()
}
fn beacon_block_root(&self) -> BeaconBlockRoot {
self.block_root()
}
fn max_num_of_items(_spec: &ChainSpec, _slot: Slot) -> usize {
@@ -63,6 +80,58 @@ impl<E: EthSpec> ObservableDataSidecar for DataColumnSidecar<E> {
}
}
#[derive(Debug, Clone, Hash, PartialEq, Eq)]
pub enum ObservationKey {
ProposerKey((ValidatorIndex, Slot)),
BlockRootKey((BeaconBlockRoot, Slot)),
}
impl ObservationKey {
pub fn new<T: ObservableDataSidecar, E: EthSpec>(
sidecar: &T,
spec: &ChainSpec,
) -> Result<Self, Error> {
let slot = sidecar.slot();
if spec.fork_name_at_slot::<E>(slot).gloas_enabled() {
Ok(Self::new_block_root_key(sidecar.beacon_block_root(), slot))
} else if let Some(proposer_index) = sidecar.proposer_index() {
Ok(Self::new_proposer_key(proposer_index, slot))
} else {
Err(Error::UnexpectedVariant)
}
}
pub fn from_block<E: EthSpec>(
block: &SignedBeaconBlock<E>,
block_root: Hash256,
spec: &ChainSpec,
) -> Self {
let slot = block.slot();
if spec.fork_name_at_slot::<E>(slot).gloas_enabled() {
Self::new_block_root_key(block_root, slot)
} else {
Self::new_proposer_key(block.message().proposer_index(), slot)
}
}
pub fn new_proposer_key(proposer_index: ValidatorIndex, slot: Slot) -> Self {
Self::ProposerKey((proposer_index, slot))
}
pub fn new_block_root_key(beacon_block_root: BeaconBlockRoot, slot: Slot) -> Self {
Self::BlockRootKey((beacon_block_root, slot))
}
pub fn slot(&self) -> Slot {
match self {
ObservationKey::ProposerKey((_, slot)) => *slot,
ObservationKey::BlockRootKey((_, slot)) => *slot,
}
}
}
/// Maintains a cache of seen `ObservableDataSidecar`s that are received over gossip
/// and have been gossip verified.
///
@@ -71,15 +140,15 @@ impl<E: EthSpec> ObservableDataSidecar for DataColumnSidecar<E> {
///
/// Note: To prevent DoS attacks, this cache must include only items that have received some DoS resistance
/// like checking the proposer signature.
pub struct ObservedDataSidecars<T: ObservableDataSidecar> {
pub struct ObservedDataSidecars<T: ObservableDataSidecar, E: EthSpec> {
finalized_slot: Slot,
/// Stores all received data indices for a given `(ValidatorIndex, Slot)` tuple.
items: HashMap<ProposalKey, HashSet<u64>>,
/// Stores all received data indices for a given `ObservationKey`.
items: HashMap<ObservationKey, HashSet<u64>>,
spec: Arc<ChainSpec>,
_phantom: PhantomData<T>,
_phantom: PhantomData<(T, E)>,
}
impl<T: ObservableDataSidecar> ObservedDataSidecars<T> {
impl<T: ObservableDataSidecar, E: EthSpec> ObservedDataSidecars<T, E> {
/// Instantiates `Self` with `finalized_slot == 0`.
pub fn new(spec: Arc<ChainSpec>) -> Self {
Self {
@@ -90,42 +159,48 @@ impl<T: ObservableDataSidecar> ObservedDataSidecars<T> {
}
}
/// Observe the `data_sidecar` at (`data_sidecar.block_proposer_index, data_sidecar.slot`).
/// This will update `self` so future calls to it indicate that this `data_sidecar` is known.
/// Observe the `data_sidecar` at `ObservationKey`.
/// Observes the sidecar, returning `Some(key)` if it was already known, `None` if newly added.
///
/// The supplied `data_sidecar` **MUST** have completed proposer signature verification.
pub fn observe_sidecar(&mut self, data_sidecar: &T) -> Result<bool, Error> {
/// This will update `self` so future calls indicate that this `data_sidecar` is known.
pub fn observe_sidecar(&mut self, data_sidecar: &T) -> Result<Option<ObservationKey>, Error> {
self.sanitize_data_sidecar(data_sidecar)?;
let observation_key = ObservationKey::new::<T, E>(data_sidecar, &self.spec)?;
let data_indices = self
.items
.entry(ProposalKey {
slot: data_sidecar.slot(),
proposer: data_sidecar.block_proposer_index(),
})
.entry(observation_key.clone())
.or_insert_with(|| {
HashSet::with_capacity(T::max_num_of_items(&self.spec, data_sidecar.slot()))
});
let did_not_exist = data_indices.insert(data_sidecar.index());
Ok(!did_not_exist)
Ok((!did_not_exist).then_some(observation_key))
}
/// Returns `true` if the `data_sidecar` has already been observed in the cache within the prune window.
pub fn proposer_is_known(&self, data_sidecar: &T) -> Result<bool, Error> {
/// Returns `Some(key)` if the sidecar has already been observed, `None` otherwise.
pub fn observation_key_is_known(
&self,
data_sidecar: &T,
) -> Result<Option<ObservationKey>, Error> {
self.sanitize_data_sidecar(data_sidecar)?;
let observation_key = ObservationKey::new::<T, E>(data_sidecar, &self.spec)?;
let is_known = self
.items
.get(&ProposalKey {
slot: data_sidecar.slot(),
proposer: data_sidecar.block_proposer_index(),
})
.get(&observation_key)
.is_some_and(|indices| indices.contains(&data_sidecar.index()));
Ok(is_known)
Ok(is_known.then_some(observation_key))
}
pub fn known_for_proposal(&self, proposal_key: &ProposalKey) -> Option<&HashSet<u64>> {
self.items.get(proposal_key)
pub fn known_for_observation_key(
&self,
observation_key: &ObservationKey,
) -> Option<&HashSet<u64>> {
self.items.get(observation_key)
}
fn sanitize_data_sidecar(&self, data_sidecar: &T) -> Result<(), Error> {
@@ -150,7 +225,7 @@ impl<T: ObservableDataSidecar> ObservedDataSidecars<T> {
}
self.finalized_slot = finalized_slot;
self.items.retain(|k, _| k.slot > finalized_slot);
self.items.retain(|k, _| k.slot() > finalized_slot);
}
}
@@ -182,38 +257,101 @@ impl ObservationStrategy for DoNotObserve {
#[cfg(test)]
mod tests {
use super::*;
use crate::test_utils::test_spec;
use bls::Hash256;
use super::*;
use bls::{FixedBytesExtended, Signature};
use std::sync::Arc;
use types::{Epoch, MainnetEthSpec};
use types::{
BeaconBlockHeader, DataColumnSidecarFulu, DataColumnSidecarGloas, ForkName, MainnetEthSpec,
SignedBeaconBlockHeader,
};
type E = MainnetEthSpec;
fn get_blob_sidecar(slot: u64, proposer_index: u64, index: u64) -> Arc<BlobSidecar<E>> {
let mut blob_sidecar = BlobSidecar::empty();
blob_sidecar.signed_block_header.message.slot = slot.into();
blob_sidecar.signed_block_header.message.proposer_index = proposer_index;
blob_sidecar.index = index;
Arc::new(blob_sidecar)
/// Creates a Fulu DataColumnSidecar for testing.
/// Keyed by (proposer_index, slot) in the observation cache.
fn get_data_column_sidecar_fulu(
slot: u64,
proposer_index: u64,
index: u64,
) -> Arc<DataColumnSidecar<E>> {
let signed_block_header = SignedBeaconBlockHeader {
message: BeaconBlockHeader {
slot: slot.into(),
proposer_index,
parent_root: Hash256::ZERO,
state_root: Hash256::ZERO,
// Use proposer_index as a simple way to generate different block roots
body_root: Hash256::from_low_u64_be(proposer_index),
},
signature: Signature::empty(),
};
Arc::new(DataColumnSidecar::Fulu(DataColumnSidecarFulu {
index,
column: vec![].try_into().unwrap(),
kzg_commitments: vec![].try_into().unwrap(),
kzg_proofs: vec![].try_into().unwrap(),
signed_block_header,
kzg_commitments_inclusion_proof: vec![
Hash256::ZERO;
E::kzg_commitments_inclusion_proof_depth()
]
.try_into()
.unwrap(),
}))
}
/// Creates a Gloas DataColumnSidecar for testing.
/// Keyed by (beacon_block_root, slot) in the observation cache.
fn get_data_column_sidecar_gloas(
slot: u64,
beacon_block_root: Hash256,
index: u64,
) -> Arc<DataColumnSidecar<E>> {
Arc::new(DataColumnSidecar::Gloas(DataColumnSidecarGloas {
index,
column: vec![].try_into().unwrap(),
kzg_commitments: vec![].try_into().unwrap(),
kzg_proofs: vec![].try_into().unwrap(),
slot: slot.into(),
beacon_block_root,
}))
}
fn get_sidecar(
slot: u64,
key: u64,
index: u64,
fork_name: ForkName,
) -> Arc<DataColumnSidecar<E>> {
if fork_name.gloas_enabled() {
get_data_column_sidecar_gloas(slot, Hash256::from_low_u64_be(key), index)
} else {
get_data_column_sidecar_fulu(slot, key, index)
}
}
#[test]
fn pruning() {
let spec = Arc::new(test_spec::<E>());
let mut cache = ObservedDataSidecars::<BlobSidecar<E>>::new(spec);
let fork_name = spec.fork_name_at_slot::<E>(Slot::new(0));
let mut cache = ObservedDataSidecars::<DataColumnSidecar<E>, E>::new(spec.clone());
assert_eq!(cache.finalized_slot, 0, "finalized slot is zero");
assert_eq!(cache.items.len(), 0, "no slots should be present");
// Slot 0, index 0
let proposer_index_a = 420;
let sidecar_a = get_blob_sidecar(0, proposer_index_a, 0);
let key_a = 420;
let sidecar_a = get_sidecar(0, key_a, 0, fork_name);
assert_eq!(
cache.observe_sidecar(&sidecar_a),
cache
.observe_sidecar(sidecar_a.as_ref())
.map(|o| o.is_some()),
Ok(false),
"can observe proposer, indicates proposer unobserved"
"can observe sidecar, indicates sidecar unobserved"
);
/*
@@ -224,18 +362,17 @@ mod tests {
assert_eq!(
cache.items.len(),
1,
"only one (validator_index, slot) tuple should be present"
"only one observation key should be present"
);
let cached_blob_indices = cache
let observation_key =
&ObservationKey::new::<DataColumnSidecar<E>, E>(sidecar_a.as_ref(), &spec).unwrap();
let cached_indices = cache
.items
.get(&ProposalKey::new(proposer_index_a, Slot::new(0)))
.get(observation_key)
.expect("slot zero should be present");
assert_eq!(
cached_blob_indices.len(),
1,
"only one proposer should be present"
);
assert_eq!(cached_indices.len(), 1, "only one index should be present");
/*
* Check that a prune at the genesis slot does nothing.
@@ -243,17 +380,16 @@ mod tests {
cache.prune(Slot::new(0));
let observation_key =
ObservationKey::new::<DataColumnSidecar<E>, E>(sidecar_a.as_ref(), &spec).unwrap();
assert_eq!(cache.finalized_slot, 0, "finalized slot is zero");
assert_eq!(cache.items.len(), 1, "only one slot should be present");
let cached_blob_indices = cache
let cached_indices = cache
.items
.get(&ProposalKey::new(proposer_index_a, Slot::new(0)))
.get(&observation_key)
.expect("slot zero should be present");
assert_eq!(
cached_blob_indices.len(),
1,
"only one proposer should be present"
);
assert_eq!(cached_indices.len(), 1, "only one index should be present");
/*
* Check that a prune empties the cache
@@ -272,10 +408,10 @@ mod tests {
*/
// First slot of finalized epoch
let block_b = get_blob_sidecar(E::slots_per_epoch(), 419, 0);
let sidecar_b = get_sidecar(E::slots_per_epoch(), 419, 0, fork_name);
assert_eq!(
cache.observe_sidecar(&block_b),
cache.observe_sidecar(sidecar_b.as_ref()),
Err(Error::FinalizedDataSidecar {
slot: E::slots_per_epoch().into(),
finalized_slot: E::slots_per_epoch().into(),
@@ -286,34 +422,34 @@ mod tests {
assert_eq!(cache.items.len(), 0, "sidecar was not added");
/*
* Check that we _can_ insert a non-finalized block
* Check that we _can_ insert a non-finalized sidecar
*/
let three_epochs = E::slots_per_epoch() * 3;
// First slot of finalized epoch
let proposer_index_b = 421;
let block_b = get_blob_sidecar(three_epochs, proposer_index_b, 0);
let key_b = 421;
let sidecar_b = get_sidecar(three_epochs, key_b, 0, fork_name);
assert_eq!(
cache.observe_sidecar(&block_b),
cache
.observe_sidecar(sidecar_b.as_ref())
.map(|o| o.is_some()),
Ok(false),
"can insert non-finalized block"
"can insert non-finalized sidecar"
);
let observation_key =
ObservationKey::new::<DataColumnSidecar<E>, E>(sidecar_b.as_ref(), &spec).unwrap();
assert_eq!(cache.items.len(), 1, "only one slot should be present");
let cached_blob_indices = cache
let cached_indices = cache
.items
.get(&ProposalKey::new(proposer_index_b, Slot::new(three_epochs)))
.get(&observation_key)
.expect("the three epochs slot should be present");
assert_eq!(
cached_blob_indices.len(),
1,
"only one proposer should be present"
);
assert_eq!(cached_indices.len(), 1, "only one index should be present");
/*
* Check that a prune doesnt wipe later blocks
* Check that a prune doesnt wipe later sidecars
*/
let two_epochs = E::slots_per_epoch() * 2;
@@ -325,183 +461,294 @@ mod tests {
"finalized slot is updated"
);
let observation_key =
ObservationKey::new::<DataColumnSidecar<E>, E>(sidecar_b.as_ref(), &spec).unwrap();
assert_eq!(cache.items.len(), 1, "only one slot should be present");
let cached_blob_indices = cache
let cached_indices = cache
.items
.get(&ProposalKey::new(proposer_index_b, Slot::new(three_epochs)))
.get(&observation_key)
.expect("the three epochs slot should be present");
assert_eq!(
cached_blob_indices.len(),
1,
"only one proposer should be present"
);
assert_eq!(cached_indices.len(), 1, "only one index should be present");
}
#[test]
fn simple_observations() {
let spec = Arc::new(test_spec::<E>());
let mut cache = ObservedDataSidecars::<BlobSidecar<E>>::new(spec.clone());
let fork_name = spec.fork_name_at_slot::<E>(Slot::new(0));
let mut cache = ObservedDataSidecars::<DataColumnSidecar<E>, E>::new(spec.clone());
// Slot 0, index 0
let proposer_index_a = 420;
let sidecar_a = get_blob_sidecar(0, proposer_index_a, 0);
let key_a = 420;
let sidecar_a = get_sidecar(0, key_a, 0, fork_name);
assert_eq!(
cache.proposer_is_known(&sidecar_a),
cache
.observation_key_is_known(sidecar_a.as_ref())
.map(|o| o.is_some()),
Ok(false),
"no observation in empty cache"
);
assert_eq!(
cache.observe_sidecar(&sidecar_a),
cache
.observe_sidecar(sidecar_a.as_ref())
.map(|o| o.is_some()),
Ok(false),
"can observe proposer, indicates proposer unobserved"
"can observe sidecar, indicates sidecar unobserved"
);
assert_eq!(
cache.proposer_is_known(&sidecar_a),
cache
.observation_key_is_known(sidecar_a.as_ref())
.map(|o| o.is_some()),
Ok(true),
"observed block is indicated as true"
"observed sidecar is indicated as true"
);
assert_eq!(
cache.observe_sidecar(&sidecar_a),
cache
.observe_sidecar(sidecar_a.as_ref())
.map(|o| o.is_some()),
Ok(true),
"observing again indicates true"
);
assert_eq!(cache.finalized_slot, 0, "finalized slot is zero");
assert_eq!(cache.items.len(), 1, "only one slot should be present");
let cached_blob_indices = cache
let cached_indices = cache
.items
.get(&ProposalKey::new(proposer_index_a, Slot::new(0)))
.get(
&ObservationKey::new::<DataColumnSidecar<E>, E>(sidecar_a.as_ref(), &spec).unwrap(),
)
.expect("slot zero should be present");
assert_eq!(
cached_blob_indices.len(),
1,
"only one proposer should be present"
);
assert_eq!(cached_indices.len(), 1, "only one index should be present");
// Slot 1, proposer 0
// Slot 1, different key
let proposer_index_b = 421;
let sidecar_b = get_blob_sidecar(1, proposer_index_b, 0);
let key_b = 421;
let sidecar_b = get_sidecar(1, key_b, 0, fork_name);
assert_eq!(
cache.proposer_is_known(&sidecar_b),
cache
.observation_key_is_known(sidecar_b.as_ref())
.map(|o| o.is_some()),
Ok(false),
"no observation for new slot"
);
assert_eq!(
cache.observe_sidecar(&sidecar_b),
cache
.observe_sidecar(sidecar_b.as_ref())
.map(|o| o.is_some()),
Ok(false),
"can observe proposer for new slot, indicates proposer unobserved"
"can observe sidecar for new slot, indicates sidecar unobserved"
);
assert_eq!(
cache.proposer_is_known(&sidecar_b),
cache
.observation_key_is_known(sidecar_b.as_ref())
.map(|o| o.is_some()),
Ok(true),
"observed block in slot 1 is indicated as true"
"observed sidecar in slot 1 is indicated as true"
);
assert_eq!(
cache.observe_sidecar(&sidecar_b),
cache
.observe_sidecar(sidecar_b.as_ref())
.map(|o| o.is_some()),
Ok(true),
"observing slot 1 again indicates true"
);
assert_eq!(cache.finalized_slot, 0, "finalized slot is zero");
assert_eq!(cache.items.len(), 2, "two slots should be present");
let cached_blob_indices = cache
let cached_indices = cache
.items
.get(&ProposalKey::new(proposer_index_a, Slot::new(0)))
.get(
&ObservationKey::new::<DataColumnSidecar<E>, E>(sidecar_a.as_ref(), &spec).unwrap(),
)
.expect("slot zero should be present");
assert_eq!(
cached_blob_indices.len(),
cached_indices.len(),
1,
"only one proposer should be present in slot 0"
"only one index should be present in slot 0"
);
let cached_blob_indices = cache
let cached_indices = cache
.items
.get(&ProposalKey::new(proposer_index_b, Slot::new(1)))
.expect("slot zero should be present");
.get(
&ObservationKey::new::<DataColumnSidecar<E>, E>(sidecar_b.as_ref(), &spec).unwrap(),
)
.expect("slot one should be present");
assert_eq!(
cached_blob_indices.len(),
cached_indices.len(),
1,
"only one proposer should be present in slot 1"
"only one index should be present in slot 1"
);
// Slot 0, index 1
let sidecar_c = get_blob_sidecar(0, proposer_index_a, 1);
// Slot 0, index 1 (same key as sidecar_a)
let sidecar_c = get_sidecar(0, key_a, 1, fork_name);
assert_eq!(
cache.proposer_is_known(&sidecar_c),
cache
.observation_key_is_known(sidecar_c.as_ref())
.map(|o| o.is_some()),
Ok(false),
"no observation for new index"
);
assert_eq!(
cache.observe_sidecar(&sidecar_c),
cache
.observe_sidecar(sidecar_c.as_ref())
.map(|o| o.is_some()),
Ok(false),
"can observe new index, indicates sidecar unobserved for new index"
);
assert_eq!(
cache.proposer_is_known(&sidecar_c),
cache
.observation_key_is_known(sidecar_c.as_ref())
.map(|o| o.is_some()),
Ok(true),
"observed new sidecar is indicated as true"
);
assert_eq!(
cache.observe_sidecar(&sidecar_c),
cache
.observe_sidecar(sidecar_c.as_ref())
.map(|o| o.is_some()),
Ok(true),
"observing new sidecar again indicates true"
);
assert_eq!(cache.finalized_slot, 0, "finalized slot is zero");
assert_eq!(cache.items.len(), 2, "two slots should be present");
let cached_blob_indices = cache
let cached_indices = cache
.items
.get(&ProposalKey::new(proposer_index_a, Slot::new(0)))
.get(
&ObservationKey::new::<DataColumnSidecar<E>, E>(sidecar_a.as_ref(), &spec).unwrap(),
)
.expect("slot zero should be present");
assert_eq!(
cached_blob_indices.len(),
cached_indices.len(),
2,
"two blob indices should be present in slot 0"
"two indices should be present in slot 0"
);
// Create a sidecar sharing slot and proposer but with a different block root.
let mut sidecar_d: BlobSidecar<E> = BlobSidecar {
index: sidecar_c.index,
blob: sidecar_c.blob.clone(),
kzg_commitment: sidecar_c.kzg_commitment,
kzg_proof: sidecar_c.kzg_proof,
signed_block_header: sidecar_c.signed_block_header.clone(),
kzg_commitment_inclusion_proof: sidecar_c.kzg_commitment_inclusion_proof.clone(),
};
sidecar_d.signed_block_header.message.body_root = Hash256::repeat_byte(7);
// Create a sidecar with a different key at the same slot
// For Fulu: different proposer_index creates a different observation key
// For Gloas: different block_root creates a different observation key
let key_c = 422;
let sidecar_d = get_sidecar(0, key_c, 0, fork_name);
assert_eq!(
cache.proposer_is_known(&sidecar_d),
Ok(true),
"there has been an observation for this proposer index"
cache
.observation_key_is_known(sidecar_d.as_ref())
.map(|o| o.is_some()),
Ok(false),
"no observation for new key"
);
assert_eq!(
cache.observe_sidecar(&sidecar_d),
Ok(true),
"indicates sidecar proposer was observed"
cache
.observe_sidecar(sidecar_d.as_ref())
.map(|o| o.is_some()),
Ok(false),
"can observe sidecar, indicates sidecar unobserved for new key"
);
let cached_blob_indices = cache
let cached_indices = cache
.items
.get(&ProposalKey::new(proposer_index_a, Slot::new(0)))
.expect("slot zero should be present");
.get(
&ObservationKey::new::<DataColumnSidecar<E>, E>(sidecar_d.as_ref(), &spec).unwrap(),
)
.expect("sidecar_d's observation key should be present");
assert_eq!(
cached_blob_indices.len(),
2,
"two blob indices should be present in slot 0"
cached_indices.len(),
1,
"one index should be present for sidecar_d's observation key"
);
// Try adding an out of bounds index
let invalid_index = spec.max_blobs_per_block(Epoch::new(0));
let sidecar_d = get_blob_sidecar(0, proposer_index_a, invalid_index);
let invalid_index = E::number_of_columns() as u64;
let sidecar_e = get_sidecar(0, key_a, invalid_index, fork_name);
assert_eq!(
cache.observe_sidecar(&sidecar_d),
cache.observe_sidecar(sidecar_e.as_ref()),
Err(Error::InvalidDataIndex(invalid_index)),
"cannot add an index > MaxBlobsPerBlock"
"cannot add an index >= NUMBER_OF_COLUMNS"
);
}
/// Test that sidecars with the same observation key but different indices
/// are tracked correctly.
#[test]
fn multiple_indices_same_key() {
let spec = Arc::new(test_spec::<E>());
let fork_name = spec.fork_name_at_slot::<E>(Slot::new(0));
let mut cache = ObservedDataSidecars::<DataColumnSidecar<E>, E>::new(spec.clone());
let key = 420;
// Add multiple indices for the same observation key
for index in 0..5 {
let sidecar = get_sidecar(0, key, index, fork_name);
assert_eq!(
cache.observe_sidecar(sidecar.as_ref()).map(|o| o.is_some()),
Ok(false),
"index {index} should be new"
);
}
// Verify all indices are tracked under one observation key
assert_eq!(cache.items.len(), 1, "only one observation key");
let sidecar_for_key = get_sidecar(0, key, 0, fork_name);
let observation_key =
ObservationKey::new::<DataColumnSidecar<E>, E>(sidecar_for_key.as_ref(), &spec)
.unwrap();
let cached_indices = cache.items.get(&observation_key).unwrap();
assert_eq!(cached_indices.len(), 5, "five indices should be tracked");
// Re-observing should indicate they're already known
for index in 0..5 {
let sidecar = get_sidecar(0, key, index, fork_name);
assert_eq!(
cache.observe_sidecar(sidecar.as_ref()).map(|o| o.is_some()),
Ok(true),
"index {index} should already be known"
);
}
}
/// Test the known_for_observation_key method
#[test]
fn known_for_observation_key() {
let spec = Arc::new(test_spec::<E>());
let fork_name = spec.fork_name_at_slot::<E>(Slot::new(0));
let mut cache = ObservedDataSidecars::<DataColumnSidecar<E>, E>::new(spec.clone());
let key = 420;
let sidecar = get_sidecar(0, key, 0, fork_name);
let observation_key =
ObservationKey::new::<DataColumnSidecar<E>, E>(sidecar.as_ref(), &spec).unwrap();
// Before observation, should return None
assert!(cache.known_for_observation_key(&observation_key).is_none());
// After observation, should return the set of indices
cache.observe_sidecar(sidecar.as_ref()).unwrap();
let known = cache
.known_for_observation_key(&observation_key)
.expect("should be known");
assert!(known.contains(&0));
assert_eq!(known.len(), 1);
// Add more indices
let sidecar_1 = get_sidecar(0, key, 1, fork_name);
let sidecar_2 = get_sidecar(0, key, 2, fork_name);
cache.observe_sidecar(sidecar_1.as_ref()).unwrap();
cache.observe_sidecar(sidecar_2.as_ref()).unwrap();
let known = cache
.known_for_observation_key(&observation_key)
.expect("should be known");
assert!(known.contains(&0));
assert!(known.contains(&1));
assert!(known.contains(&2));
assert_eq!(known.len(), 3);
}
}

View File

@@ -3,7 +3,7 @@ use crate::block_verification_types::{AsBlock, RpcBlock};
use crate::custody_context::NodeCustodyType;
use crate::data_column_verification::CustodyDataColumn;
use crate::graffiti_calculator::GraffitiSettings;
use crate::kzg_utils::build_data_column_sidecars;
use crate::kzg_utils::{build_data_column_sidecars_fulu, build_data_column_sidecars_gloas};
use crate::observed_operations::ObservationOutcome;
pub use crate::persisted_beacon_chain::PersistedBeaconChain;
use crate::{BeaconBlockResponseWrapper, get_block_root};
@@ -2441,7 +2441,12 @@ where
// Blobs are stored as data columns from Fulu (PeerDAS)
if self.spec.is_peer_das_enabled_for_epoch(block.epoch()) {
let columns = self.chain.get_data_columns(&block_root).unwrap().unwrap();
let fork_name = self.spec.fork_name_at_epoch(block.epoch());
let columns = self
.chain
.get_data_columns(&block_root, fork_name)
.unwrap()
.unwrap();
let custody_columns = columns
.into_iter()
.map(CustodyDataColumn::from_asserted_custody)
@@ -2470,7 +2475,7 @@ where
// currently have any knowledge of the columns being custodied.
let columns = generate_data_column_sidecars_from_block(&block, &self.spec)
.into_iter()
.filter(|d| sampling_columns.contains(&d.index))
.filter(|d| sampling_columns.contains(d.index()))
.map(CustodyDataColumn::from_asserted_custody)
.collect::<Vec<_>>();
RpcBlock::new_with_custody_columns(Some(block_root), block, columns)?
@@ -3209,10 +3214,10 @@ where
let verified_columns = generate_data_column_sidecars_from_block(block, &self.spec)
.into_iter()
.filter(|c| custody_columns.contains(&c.index))
.filter(|c| custody_columns.contains(c.index()))
.map(|sidecar| {
let subnet_id =
DataColumnSubnetId::from_column_index(sidecar.index, &self.spec);
DataColumnSubnetId::from_column_index(*sidecar.index(), &self.spec);
self.chain
.verify_data_column_sidecar_for_gossip(sidecar, subnet_id)
})
@@ -3363,39 +3368,76 @@ pub fn generate_data_column_sidecars_from_block<E: EthSpec>(
.unwrap();
let signed_block_header = block.signed_block_header();
// load the precomputed column sidecar to avoid computing them for every block in the tests.
let template_data_columns = RuntimeVariableList::<DataColumnSidecar<E>>::from_ssz_bytes(
TEST_DATA_COLUMN_SIDECARS_SSZ,
E::number_of_columns(),
)
.unwrap();
// Load the precomputed column sidecar to avoid computing them for every block in the tests.
// Then repeat the cells and proofs for every blob
if block.fork_name_unchecked().gloas_enabled() {
let template_data_columns =
RuntimeVariableList::<DataColumnSidecarGloas<E>>::from_ssz_bytes(
TEST_DATA_COLUMN_SIDECARS_SSZ,
E::number_of_columns(),
)
.unwrap();
let (cells, proofs) = template_data_columns
.into_iter()
.map(|sidecar| {
let DataColumnSidecar {
column, kzg_proofs, ..
} = sidecar;
// There's only one cell per column for a single blob
let cell_bytes: Vec<u8> = column.into_iter().next().unwrap().into();
let kzg_cell = cell_bytes.try_into().unwrap();
let kzg_proof = kzg_proofs.into_iter().next().unwrap();
(kzg_cell, kzg_proof)
})
.collect::<(Vec<_>, Vec<_>)>();
let (cells, proofs) = template_data_columns
.into_iter()
.map(|sidecar| {
let DataColumnSidecarGloas {
column, kzg_proofs, ..
} = sidecar;
// There's only one cell per column for a single blob
let cell_bytes: Vec<u8> = column.into_iter().next().unwrap().into();
let kzg_cell = cell_bytes.try_into().unwrap();
let kzg_proof = kzg_proofs.into_iter().next().unwrap();
(kzg_cell, kzg_proof)
})
.collect::<(Vec<_>, Vec<_>)>();
// Repeat the cells and proofs for every blob
let blob_cells_and_proofs_vec =
vec![(cells.try_into().unwrap(), proofs.try_into().unwrap()); kzg_commitments.len()];
let blob_cells_and_proofs_vec =
vec![(cells.try_into().unwrap(), proofs.try_into().unwrap()); kzg_commitments.len()];
build_data_column_sidecars(
kzg_commitments.clone(),
kzg_commitments_inclusion_proof,
signed_block_header,
blob_cells_and_proofs_vec,
spec,
)
.unwrap()
build_data_column_sidecars_gloas(
kzg_commitments.clone(),
signed_block_header.message.tree_hash_root(),
signed_block_header.message.slot,
blob_cells_and_proofs_vec,
spec,
)
.unwrap()
} else {
// load the precomputed column sidecar to avoid computing them for every block in the tests.
let template_data_columns =
RuntimeVariableList::<DataColumnSidecarFulu<E>>::from_ssz_bytes(
TEST_DATA_COLUMN_SIDECARS_SSZ,
E::number_of_columns(),
)
.unwrap();
let (cells, proofs) = template_data_columns
.into_iter()
.map(|sidecar| {
let DataColumnSidecarFulu {
column, kzg_proofs, ..
} = sidecar;
// There's only one cell per column for a single blob
let cell_bytes: Vec<u8> = column.into_iter().next().unwrap().into();
let kzg_cell = cell_bytes.try_into().unwrap();
let kzg_proof = kzg_proofs.into_iter().next().unwrap();
(kzg_cell, kzg_proof)
})
.collect::<(Vec<_>, Vec<_>)>();
let blob_cells_and_proofs_vec =
vec![(cells.try_into().unwrap(), proofs.try_into().unwrap()); kzg_commitments.len()];
build_data_column_sidecars_fulu(
kzg_commitments.clone(),
kzg_commitments_inclusion_proof,
signed_block_header,
blob_cells_and_proofs_vec,
spec,
)
.unwrap()
}
}
pub fn generate_data_column_indices_rand_order<E: EthSpec>() -> Vec<CustodyIndex> {