diff --git a/beacon_node/beacon_chain/benches/benches.rs b/beacon_node/beacon_chain/benches/benches.rs index c09af00be6..aae627da13 100644 --- a/beacon_node/beacon_chain/benches/benches.rs +++ b/beacon_node/beacon_chain/benches/benches.rs @@ -5,16 +5,16 @@ use beacon_chain::test_utils::get_kzg; use criterion::{black_box, criterion_group, criterion_main, Criterion}; use bls::Signature; -use kzg::KzgCommitment; +use kzg::{KzgCommitment, KzgProof}; use types::{ beacon_block_body::KzgCommitments, BeaconBlock, BeaconBlockDeneb, Blob, BlobsList, ChainSpec, - EmptyBlock, EthSpec, MainnetEthSpec, SignedBeaconBlock, + EmptyBlock, EthSpec, KzgProofs, MainnetEthSpec, SignedBeaconBlock, }; fn create_test_block_and_blobs( num_of_blobs: usize, spec: &ChainSpec, -) -> (SignedBeaconBlock, BlobsList) { +) -> (SignedBeaconBlock, BlobsList, KzgProofs) { let mut block = BeaconBlock::Deneb(BeaconBlockDeneb::empty(spec)); let mut body = block.body_mut(); let blob_kzg_commitments = body.blob_kzg_commitments_mut().unwrap(); @@ -27,8 +27,9 @@ fn create_test_block_and_blobs( .map(|_| Blob::::default()) .collect::>() .into(); + let proofs = vec![KzgProof::empty(); num_of_blobs * spec.number_of_columns as usize].into(); - (signed_block, blobs) + (signed_block, blobs, proofs) } fn all_benches(c: &mut Criterion) { @@ -37,10 +38,11 @@ fn all_benches(c: &mut Criterion) { let kzg = get_kzg(&spec); for blob_count in [1, 2, 3, 6] { - let (signed_block, blobs) = create_test_block_and_blobs::(blob_count, &spec); + let (signed_block, blobs, proofs) = create_test_block_and_blobs::(blob_count, &spec); let column_sidecars = blobs_to_data_column_sidecars( &blobs.iter().collect::>(), + proofs.to_vec(), &signed_block, &kzg, &spec, diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index d9ac2fa6ea..d6475de243 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -31,6 +31,7 @@ use crate::eth1_chain::{Eth1Chain, Eth1ChainBackend}; use crate::eth1_finalization_cache::{Eth1FinalizationCache, Eth1FinalizationData}; use crate::events::ServerSentEventHandler; use crate::execution_payload::{get_execution_payload, NotifyExecutionLayer, PreparePayloadHandle}; +use crate::fetch_blobs::EngineGetBlobsOutput; use crate::fork_choice_signal::{ForkChoiceSignalRx, ForkChoiceSignalTx, ForkChoiceWaitResult}; use crate::graffiti_calculator::GraffitiCalculator; use crate::kzg_utils::reconstruct_blobs; @@ -121,7 +122,6 @@ use store::{ KeyValueStore, KeyValueStoreOp, StoreItem, StoreOp, }; use task_executor::{ShutdownReason, TaskExecutor}; -use tokio::sync::oneshot; use tokio_stream::Stream; use tracing::{debug, error, info, trace, warn}; use tree_hash::TreeHash; @@ -3137,16 +3137,11 @@ impl BeaconChain { } /// Process blobs retrieved from the EL and returns the `AvailabilityProcessingStatus`. - /// - /// `data_column_recv`: An optional receiver for `DataColumnSidecarList`. - /// If PeerDAS is enabled, this receiver will be provided and used to send - /// the `DataColumnSidecar`s once they have been successfully computed. pub async fn process_engine_blobs( self: &Arc, slot: Slot, block_root: Hash256, - blobs: FixedBlobSidecarList, - data_column_recv: Option>>, + engine_get_blobs_output: EngineGetBlobsOutput, ) -> Result { // If this block has already been imported to forkchoice it must have been available, so // we don't need to process its blobs again. @@ -3160,15 +3155,12 @@ impl BeaconChain { // process_engine_blobs is called for both pre and post PeerDAS. However, post PeerDAS // consumers don't expect the blobs event to fire erratically. - if !self - .spec - .is_peer_das_enabled_for_epoch(slot.epoch(T::EthSpec::slots_per_epoch())) - { + if let EngineGetBlobsOutput::Blobs(blobs) = &engine_get_blobs_output { self.emit_sse_blob_sidecar_events(&block_root, blobs.iter().flatten().map(Arc::as_ref)); } let r = self - .check_engine_blob_availability_and_import(slot, block_root, blobs, data_column_recv) + .check_engine_blobs_availability_and_import(slot, block_root, engine_get_blobs_output) .await; self.remove_notified(&block_root, r) } @@ -3618,20 +3610,24 @@ impl BeaconChain { .await } - async fn check_engine_blob_availability_and_import( + async fn check_engine_blobs_availability_and_import( self: &Arc, slot: Slot, block_root: Hash256, - blobs: FixedBlobSidecarList, - data_column_recv: Option>>, + engine_get_blobs_output: EngineGetBlobsOutput, ) -> Result { - self.check_blobs_for_slashability(block_root, &blobs)?; - let availability = self.data_availability_checker.put_engine_blobs( - block_root, - slot.epoch(T::EthSpec::slots_per_epoch()), - blobs, - data_column_recv, - )?; + let availability = match engine_get_blobs_output { + EngineGetBlobsOutput::Blobs(blobs) => { + self.check_blobs_for_slashability(block_root, &blobs)?; + self.data_availability_checker + .put_engine_blobs(block_root, blobs)? + } + EngineGetBlobsOutput::CustodyColumns(data_columns) => { + self.check_columns_for_slashability(block_root, &data_columns)?; + self.data_availability_checker + .put_engine_data_columns(block_root, data_columns)? + } + }; self.process_availability(slot, availability, || Ok(())) .await @@ -3645,27 +3641,7 @@ impl BeaconChain { block_root: Hash256, custody_columns: DataColumnSidecarList, ) -> Result { - // Need to scope this to ensure the lock is dropped before calling `process_availability` - // Even an explicit drop is not enough to convince the borrow checker. - { - let mut slashable_cache = self.observed_slashable.write(); - // Assumes all items in custody_columns are for the same block_root - if let Some(column) = custody_columns.first() { - let header = &column.signed_block_header; - if verify_header_signature::(self, header).is_ok() { - slashable_cache - .observe_slashable( - header.message.slot, - header.message.proposer_index, - block_root, - ) - .map_err(|e| BlockError::BeaconChainError(e.into()))?; - if let Some(slasher) = self.slasher.as_ref() { - slasher.accept_block_header(header.clone()); - } - } - } - } + self.check_columns_for_slashability(block_root, &custody_columns)?; // This slot value is purely informative for the consumers of // `AvailabilityProcessingStatus::MissingComponents` to log an error with a slot. @@ -3677,6 +3653,31 @@ impl BeaconChain { .await } + fn check_columns_for_slashability( + self: &Arc, + block_root: Hash256, + custody_columns: &DataColumnSidecarList, + ) -> Result<(), BlockError> { + let mut slashable_cache = self.observed_slashable.write(); + // Assumes all items in custody_columns are for the same block_root + if let Some(column) = custody_columns.first() { + let header = &column.signed_block_header; + if verify_header_signature::(self, header).is_ok() { + slashable_cache + .observe_slashable( + header.message.slot, + header.message.proposer_index, + block_root, + ) + .map_err(|e| BlockError::BeaconChainError(e.into()))?; + if let Some(slasher) = self.slasher.as_ref() { + slasher.accept_block_header(header.clone()); + } + } + } + Ok(()) + } + /// Imports a fully available block. Otherwise, returns `AvailabilityProcessingStatus::MissingComponents` /// /// An error is returned if the block was unable to be imported. It may be partially imported @@ -5798,15 +5799,26 @@ impl BeaconChain { let kzg_proofs = Vec::from(proofs); let kzg = self.kzg.as_ref(); - - // TODO(fulu): we no longer need blob proofs from PeerDAS and could avoid computing. - kzg_utils::validate_blobs::( - kzg, - expected_kzg_commitments, - blobs.iter().collect(), - &kzg_proofs, - ) - .map_err(BlockProductionError::KzgError)?; + if self + .spec + .is_peer_das_enabled_for_epoch(slot.epoch(T::EthSpec::slots_per_epoch())) + { + kzg_utils::validate_blobs_and_cell_proofs::( + kzg, + blobs.iter().collect(), + &kzg_proofs, + expected_kzg_commitments, + ) + .map_err(BlockProductionError::KzgError)?; + } else { + kzg_utils::validate_blobs::( + kzg, + expected_kzg_commitments, + blobs.iter().collect(), + &kzg_proofs, + ) + .map_err(BlockProductionError::KzgError)?; + } Some((kzg_proofs.into(), blobs)) } @@ -7118,27 +7130,6 @@ impl BeaconChain { ); Ok(Some(StoreOp::PutDataColumns(block_root, data_columns))) } - AvailableBlockData::DataColumnsRecv(data_column_recv) => { - // Blobs were available from the EL, in this case we wait for the data columns to be computed (blocking). - let _column_recv_timer = - metrics::start_timer(&metrics::BLOCK_PROCESSING_DATA_COLUMNS_WAIT); - // Unable to receive data columns from sender, sender is either dropped or - // failed to compute data columns from blobs. We restore fork choice here and - // return to avoid inconsistency in database. - let computed_data_columns = data_column_recv - .blocking_recv() - .map_err(|e| format!("Did not receive data columns from sender: {e:?}"))?; - debug!( - %block_root, - count = computed_data_columns.len(), - "Writing data columns to store" - ); - // TODO(das): Store only this node's custody columns - Ok(Some(StoreOp::PutDataColumns( - block_root, - computed_data_columns, - ))) - } } } } diff --git a/beacon_node/beacon_chain/src/block_verification.rs b/beacon_node/beacon_chain/src/block_verification.rs index 39bad34cd6..46ba1bc992 100644 --- a/beacon_node/beacon_chain/src/block_verification.rs +++ b/beacon_node/beacon_chain/src/block_verification.rs @@ -97,8 +97,8 @@ use tracing::{debug, error}; use types::{ data_column_sidecar::DataColumnSidecarError, BeaconBlockRef, BeaconState, BeaconStateError, BlobsList, ChainSpec, DataColumnSidecarList, Epoch, EthSpec, ExecutionBlockHash, FullPayload, - Hash256, InconsistentFork, PublicKey, PublicKeyBytes, RelativeEpoch, SignedBeaconBlock, - SignedBeaconBlockHeader, Slot, + Hash256, InconsistentFork, KzgProofs, PublicKey, PublicKeyBytes, RelativeEpoch, + SignedBeaconBlock, SignedBeaconBlockHeader, Slot, }; pub const POS_PANDA_BANNER: &str = r#" @@ -755,6 +755,7 @@ pub fn build_blob_data_column_sidecars( chain: &BeaconChain, block: &SignedBeaconBlock>, blobs: BlobsList, + kzg_cell_proofs: KzgProofs, ) -> Result, DataColumnSidecarError> { // Only attempt to build data columns if blobs is non empty to avoid skewing the metrics. if blobs.is_empty() { @@ -766,8 +767,14 @@ pub fn build_blob_data_column_sidecars( &[&blobs.len().to_string()], ); let blob_refs = blobs.iter().collect::>(); - let sidecars = blobs_to_data_column_sidecars(&blob_refs, block, &chain.kzg, &chain.spec) - .discard_timer_on_break(&mut timer)?; + let sidecars = blobs_to_data_column_sidecars( + &blob_refs, + kzg_cell_proofs.to_vec(), + block, + &chain.kzg, + &chain.spec, + ) + .discard_timer_on_break(&mut timer)?; drop(timer); Ok(sidecars) } diff --git a/beacon_node/beacon_chain/src/builder.rs b/beacon_node/beacon_chain/src/builder.rs index 6f8a0dcb7c..975be33f0b 100644 --- a/beacon_node/beacon_chain/src/builder.rs +++ b/beacon_node/beacon_chain/src/builder.rs @@ -8,7 +8,7 @@ use crate::eth1_finalization_cache::Eth1FinalizationCache; use crate::fork_choice_signal::ForkChoiceSignalTx; use crate::fork_revert::{reset_fork_choice_to_finalization, revert_to_fork_boundary}; use crate::graffiti_calculator::{GraffitiCalculator, GraffitiOrigin}; -use crate::kzg_utils::blobs_to_data_column_sidecars; +use crate::kzg_utils::build_data_column_sidecars; use crate::light_client_server_cache::LightClientServerCache; use crate::migrate::{BackgroundMigrator, MigratorConfig}; use crate::observed_data_sidecars::ObservedDataSidecars; @@ -30,6 +30,7 @@ use logging::crit; use operation_pool::{OperationPool, PersistedOperationPool}; use parking_lot::{Mutex, RwLock}; use proto_array::{DisallowedReOrgOffsets, ReOrgThreshold}; +use rayon::prelude::*; use slasher::Slasher; use slot_clock::{SlotClock, TestingSlotClock}; use state_processing::{per_slot_processing, AllCaches}; @@ -40,8 +41,8 @@ use store::{Error as StoreError, HotColdDB, ItemStore, KeyValueStoreOp}; use task_executor::{ShutdownReason, TaskExecutor}; use tracing::{debug, error, info}; use types::{ - BeaconBlock, BeaconState, BlobSidecarList, ChainSpec, Checkpoint, Epoch, EthSpec, - FixedBytesExtended, Hash256, Signature, SignedBeaconBlock, Slot, + BeaconBlock, BeaconState, BlobSidecarList, ChainSpec, Checkpoint, DataColumnSidecarList, Epoch, + EthSpec, FixedBytesExtended, Hash256, Signature, SignedBeaconBlock, Slot, }; /// An empty struct used to "witness" all the `BeaconChainTypes` traits. It has no user-facing @@ -546,15 +547,8 @@ where { // After PeerDAS recompute columns from blobs to not force the checkpointz server // into exposing another route. - let blobs = blobs - .iter() - .map(|blob_sidecar| &blob_sidecar.blob) - .collect::>(); let data_columns = - blobs_to_data_column_sidecars(&blobs, &weak_subj_block, &self.kzg, &self.spec) - .map_err(|e| { - format!("Failed to compute weak subjectivity data_columns: {e:?}") - })?; + build_data_columns_from_blobs(&weak_subj_block, &blobs, &self.kzg, &self.spec)?; // TODO(das): only persist the columns under custody store .put_data_columns(&weak_subj_block_root, data_columns) @@ -1138,6 +1132,49 @@ fn descriptive_db_error(item: &str, error: &StoreError) -> String { ) } +/// Build data columns and proofs from blobs. +fn build_data_columns_from_blobs( + block: &SignedBeaconBlock, + blobs: &BlobSidecarList, + kzg: &Kzg, + spec: &ChainSpec, +) -> Result, String> { + let blob_cells_and_proofs_vec = blobs + .into_par_iter() + .map(|blob_sidecar| { + let kzg_blob_ref = blob_sidecar + .blob + .as_ref() + .try_into() + .map_err(|e| format!("Failed to convert blob to kzg blob: {e:?}"))?; + let cells_and_proofs = kzg + .compute_cells_and_proofs(kzg_blob_ref) + .map_err(|e| format!("Failed to compute cell kzg proofs: {e:?}"))?; + Ok(cells_and_proofs) + }) + .collect::, String>>()?; + + let data_columns = { + let beacon_block_body = block.message().body(); + let kzg_commitments = beacon_block_body + .blob_kzg_commitments() + .cloned() + .map_err(|e| format!("Unexpected pre Deneb block: {e:?}"))?; + let kzg_commitments_inclusion_proof = beacon_block_body + .kzg_commitments_merkle_proof() + .map_err(|e| format!("Failed to compute kzg commitments merkle proof: {e:?}"))?; + build_data_column_sidecars( + kzg_commitments, + kzg_commitments_inclusion_proof, + block.signed_block_header(), + blob_cells_and_proofs_vec, + spec, + ) + .map_err(|e| format!("Failed to compute weak subjectivity data_columns: {e:?}"))? + }; + Ok(data_columns) +} + #[cfg(not(debug_assertions))] #[cfg(test)] mod test { diff --git a/beacon_node/beacon_chain/src/data_availability_checker.rs b/beacon_node/beacon_chain/src/data_availability_checker.rs index 2b7ae9e4d1..033b472da0 100644 --- a/beacon_node/beacon_chain/src/data_availability_checker.rs +++ b/beacon_node/beacon_chain/src/data_availability_checker.rs @@ -14,7 +14,6 @@ use std::num::NonZeroUsize; use std::sync::Arc; use std::time::Duration; use task_executor::TaskExecutor; -use tokio::sync::oneshot; use tracing::{debug, error, info_span, Instrument}; use types::blob_sidecar::{BlobIdentifier, BlobSidecar, FixedBlobSidecarList}; use types::{ @@ -226,27 +225,45 @@ impl DataAvailabilityChecker { pub fn put_engine_blobs( &self, block_root: Hash256, - block_epoch: Epoch, blobs: FixedBlobSidecarList, - data_columns_recv: Option>>, ) -> Result, AvailabilityCheckError> { - // `data_columns_recv` is always Some if block_root is post-PeerDAS - if let Some(data_columns_recv) = data_columns_recv { - self.availability_cache.put_computed_data_columns_recv( - block_root, - block_epoch, - data_columns_recv, - ) - } else { - let seen_timestamp = self - .slot_clock - .now_duration() - .ok_or(AvailabilityCheckError::SlotClockError)?; - self.availability_cache.put_kzg_verified_blobs( - block_root, - KzgVerifiedBlobList::from_verified(blobs.iter().flatten().cloned(), seen_timestamp), - ) - } + let seen_timestamp = self + .slot_clock + .now_duration() + .ok_or(AvailabilityCheckError::SlotClockError)?; + self.availability_cache.put_kzg_verified_blobs( + block_root, + KzgVerifiedBlobList::from_verified(blobs.iter().flatten().cloned(), seen_timestamp), + ) + } + + /// Put a list of data columns computed from blobs received from the EL pool into the + /// availability cache. + /// + /// This DOES NOT perform KZG proof and inclusion proof verification because + /// - The KZG proofs should have been verified by the trusted EL. + /// - The KZG commitments inclusion proof should have been constructed immediately prior to + /// calling this function so they are assumed to be valid. + /// + /// This method is used if the EL already has the blobs and returns them via the `getBlobsV2` + /// engine method. + /// More details in [fetch_blobs.rs](https://github.com/sigp/lighthouse/blob/44f8add41ea2252769bb967864af95b3c13af8ca/beacon_node/beacon_chain/src/fetch_blobs.rs). + pub fn put_engine_data_columns( + &self, + block_root: Hash256, + data_columns: DataColumnSidecarList, + ) -> Result, AvailabilityCheckError> { + let kzg_verified_custody_columns = data_columns + .into_iter() + .map(|d| { + KzgVerifiedCustodyDataColumn::from_asserted_custody( + KzgVerifiedDataColumn::from_verified(d), + ) + }) + .collect::>(); + + self.availability_cache + .put_kzg_verified_data_columns(block_root, kzg_verified_custody_columns) } /// Check if we've cached other blobs for this block. If it completes a set and we also @@ -704,9 +721,6 @@ pub enum AvailableBlockData { Blobs(BlobSidecarList), /// Block is post-PeerDAS and has more than zero blobs DataColumns(DataColumnSidecarList), - /// Block is post-PeerDAS, has more than zero blobs and we recomputed the columns from the EL's - /// mempool blobs - DataColumnsRecv(oneshot::Receiver>), } /// A fully available block that is ready to be imported into fork choice. @@ -756,7 +770,6 @@ impl AvailableBlock { AvailableBlockData::NoData => false, AvailableBlockData::Blobs(..) => true, AvailableBlockData::DataColumns(_) => false, - AvailableBlockData::DataColumnsRecv(_) => false, } } @@ -782,9 +795,6 @@ impl AvailableBlock { AvailableBlockData::DataColumns(data_columns) => { AvailableBlockData::DataColumns(data_columns.clone()) } - AvailableBlockData::DataColumnsRecv(_) => { - return Err("Can't clone DataColumnsRecv".to_owned()) - } }, blobs_available_timestamp: self.blobs_available_timestamp, spec: self.spec.clone(), diff --git a/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs b/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs index 4359d7fbdb..f5fd24483a 100644 --- a/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs +++ b/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs @@ -13,13 +13,11 @@ use parking_lot::RwLock; use std::cmp::Ordering; use std::num::NonZeroUsize; use std::sync::Arc; -use tokio::sync::oneshot; use tracing::debug; use types::blob_sidecar::BlobIdentifier; use types::{ - BlobSidecar, ChainSpec, ColumnIndex, DataColumnIdentifier, DataColumnSidecar, - DataColumnSidecarList, Epoch, EthSpec, Hash256, RuntimeFixedVector, RuntimeVariableList, - SignedBeaconBlock, + BlobSidecar, ChainSpec, ColumnIndex, DataColumnIdentifier, DataColumnSidecar, Epoch, EthSpec, + Hash256, RuntimeFixedVector, RuntimeVariableList, SignedBeaconBlock, }; /// This represents the components of a partially available block @@ -32,12 +30,6 @@ pub struct PendingComponents { pub verified_data_columns: Vec>, pub executed_block: Option>, pub reconstruction_started: bool, - /// Receiver for data columns that are computed asynchronously; - /// - /// If `data_column_recv` is `Some`, it means data column computation or reconstruction has been - /// started. This can happen either via engine blobs fetching or data column reconstruction - /// (triggered when >= 50% columns are received via gossip). - pub data_column_recv: Option>>, } impl PendingComponents { @@ -202,13 +194,8 @@ impl PendingComponents { Some(AvailableBlockData::DataColumns(data_columns)) } Ordering::Less => { - // The data_columns_recv is an infallible promise that we will receive all expected - // columns, so we consider the block available. - // We take the receiver as it can't be cloned, and make_available should never - // be called again once it returns `Some`. - self.data_column_recv - .take() - .map(AvailableBlockData::DataColumnsRecv) + // Not enough data columns received yet + None } } } else { @@ -261,7 +248,6 @@ impl PendingComponents { .max(), // TODO(das): To be fixed with https://github.com/sigp/lighthouse/pull/6850 AvailableBlockData::DataColumns(_) => None, - AvailableBlockData::DataColumnsRecv(_) => None, }; let AvailabilityPendingExecutedBlock { @@ -293,7 +279,6 @@ impl PendingComponents { verified_data_columns: vec![], executed_block: None, reconstruction_started: false, - data_column_recv: None, } } @@ -331,17 +316,11 @@ impl PendingComponents { } else { "?" }; - let data_column_recv_count = if self.data_column_recv.is_some() { - 1 - } else { - 0 - }; format!( - "block {} data_columns {}/{} data_columns_recv {}", + "block {} data_columns {}/{}", block_count, self.verified_data_columns.len(), custody_columns_count, - data_column_recv_count, ) } else { let num_expected_blobs = if let Some(block) = self.get_cached_block() { @@ -498,7 +477,6 @@ impl DataAvailabilityCheckerInner { self.state_cache.recover_pending_executed_block(block) })? { // We keep the pending components in the availability cache during block import (#5845). - // `data_column_recv` is returned as part of the available block and is no longer needed here. write_lock.put(block_root, pending_components); drop(write_lock); Ok(Availability::Available(Box::new(available_block))) @@ -551,55 +529,6 @@ impl DataAvailabilityCheckerInner { self.state_cache.recover_pending_executed_block(block) })? { // We keep the pending components in the availability cache during block import (#5845). - // `data_column_recv` is returned as part of the available block and is no longer needed here. - write_lock.put(block_root, pending_components); - drop(write_lock); - Ok(Availability::Available(Box::new(available_block))) - } else { - write_lock.put(block_root, pending_components); - Ok(Availability::MissingComponents(block_root)) - } - } - - /// The `data_column_recv` parameter is a `Receiver` for data columns that are computed - /// asynchronously. This method is used if the EL already has the blobs and returns them via the - /// `getBlobsV1` engine method. More details in [fetch_blobs.rs](https://github.com/sigp/lighthouse/blob/44f8add41ea2252769bb967864af95b3c13af8ca/beacon_node/beacon_chain/src/fetch_blobs.rs). - pub fn put_computed_data_columns_recv( - &self, - block_root: Hash256, - block_epoch: Epoch, - data_column_recv: oneshot::Receiver>, - ) -> Result, AvailabilityCheckError> { - let mut write_lock = self.critical.write(); - - // Grab existing entry or create a new entry. - let mut pending_components = write_lock - .pop_entry(&block_root) - .map(|(_, v)| v) - .unwrap_or_else(|| { - PendingComponents::empty( - block_root, - self.spec.max_blobs_per_block(block_epoch) as usize, - ) - }); - - // We have all the blobs from engine, and have started computing data columns. We store the - // receiver in `PendingComponents` for later use when importing the block. - // TODO(das): Error or log if we overwrite a prior receiver https://github.com/sigp/lighthouse/issues/6764 - pending_components.data_column_recv = Some(data_column_recv); - - debug!( - component = "data_columns_recv", - ?block_root, - status = pending_components.status_str(block_epoch, &self.spec), - "Component added to data availability checker" - ); - - if let Some(available_block) = pending_components.make_available(&self.spec, |block| { - self.state_cache.recover_pending_executed_block(block) - })? { - // We keep the pending components in the availability cache during block import (#5845). - // `data_column_recv` is returned as part of the available block and is no longer needed here. write_lock.put(block_root, pending_components); drop(write_lock); Ok(Availability::Available(Box::new(available_block))) @@ -694,7 +623,6 @@ impl DataAvailabilityCheckerInner { self.state_cache.recover_pending_executed_block(block) })? { // We keep the pending components in the availability cache during block import (#5845). - // `data_column_recv` is returned as part of the available block and is no longer needed here. write_lock.put(block_root, pending_components); drop(write_lock); Ok(Availability::Available(Box::new(available_block))) diff --git a/beacon_node/beacon_chain/src/data_column_verification.rs b/beacon_node/beacon_chain/src/data_column_verification.rs index 2f95d834b5..57efbb0a77 100644 --- a/beacon_node/beacon_chain/src/data_column_verification.rs +++ b/beacon_node/beacon_chain/src/data_column_verification.rs @@ -141,13 +141,23 @@ pub enum GossipDataColumnError { /// /// The column sidecar is invalid and the peer is faulty UnexpectedDataColumn, - /// The data column length must be equal to the number of commitments/proofs, otherwise the + /// The data column length must be equal to the number of commitments, otherwise the /// sidecar is invalid. /// /// ## Peer scoring /// /// The column sidecar is invalid and the peer is faulty - InconsistentCommitmentsOrProofLength, + InconsistentCommitmentsLength { + cells_len: usize, + commitments_len: usize, + }, + /// The data column length must be equal to the number of proofs, otherwise the + /// sidecar is invalid. + /// + /// ## Peer scoring + /// + /// The column sidecar is invalid and the peer is faulty + InconsistentProofsLength { cells_len: usize, proofs_len: usize }, } impl From for GossipDataColumnError { @@ -240,6 +250,14 @@ impl KzgVerifiedDataColumn { verify_kzg_for_data_column(data_column, kzg) } + /// Create a `KzgVerifiedDataColumn` from `data_column` that are already KZG verified. + /// + /// This should be used with caution, as used incorrectly it could result in KZG verification + /// being skipped and invalid data_columns being deemed valid. + pub fn from_verified(data_column: Arc>) -> Self { + Self { data: data_column } + } + pub fn from_batch( data_columns: Vec>>, kzg: &Kzg, @@ -473,10 +491,23 @@ fn verify_data_column_sidecar( if data_column.kzg_commitments.is_empty() { return Err(GossipDataColumnError::UnexpectedDataColumn); } - if data_column.column.len() != data_column.kzg_commitments.len() - || data_column.column.len() != data_column.kzg_proofs.len() - { - return Err(GossipDataColumnError::InconsistentCommitmentsOrProofLength); + + let cells_len = data_column.column.len(); + let commitments_len = data_column.kzg_commitments.len(); + let proofs_len = data_column.kzg_proofs.len(); + + if cells_len != commitments_len { + return Err(GossipDataColumnError::InconsistentCommitmentsLength { + cells_len, + commitments_len, + }); + } + + if cells_len != proofs_len { + return Err(GossipDataColumnError::InconsistentProofsLength { + cells_len, + proofs_len, + }); } Ok(()) diff --git a/beacon_node/beacon_chain/src/early_attester_cache.rs b/beacon_node/beacon_chain/src/early_attester_cache.rs index f4810e7b4a..5665ef3775 100644 --- a/beacon_node/beacon_chain/src/early_attester_cache.rs +++ b/beacon_node/beacon_chain/src/early_attester_cache.rs @@ -74,10 +74,6 @@ impl EarlyAttesterCache { AvailableBlockData::NoData => (None, None), AvailableBlockData::Blobs(blobs) => (Some(blobs.clone()), None), AvailableBlockData::DataColumns(data_columns) => (None, Some(data_columns.clone())), - // TODO(das): Once the columns are received, they will not be available in - // the early attester cache. If someone does a query to us via RPC we - // will get downscored. - AvailableBlockData::DataColumnsRecv(_) => (None, None), }; let item = CacheItem { diff --git a/beacon_node/beacon_chain/src/fetch_blobs.rs b/beacon_node/beacon_chain/src/fetch_blobs.rs index 3c28ac9a44..3b576da1c7 100644 --- a/beacon_node/beacon_chain/src/fetch_blobs.rs +++ b/beacon_node/beacon_chain/src/fetch_blobs.rs @@ -7,34 +7,52 @@ //! on P2P gossip to the network. From PeerDAS onwards, together with the increase in blob count, //! broadcasting blobs requires a much higher bandwidth, and is only done by high capacity //! supernodes. + use crate::blob_verification::{GossipBlobError, GossipVerifiedBlob}; use crate::kzg_utils::blobs_to_data_column_sidecars; use crate::observed_data_sidecars::DoNotObserve; -use crate::{metrics, AvailabilityProcessingStatus, BeaconChain, BeaconChainTypes, BlockError}; -use execution_layer::json_structures::BlobAndProofV1; +use crate::{ + metrics, AvailabilityProcessingStatus, BeaconChain, BeaconChainError, BeaconChainTypes, + BlockError, +}; +use execution_layer::json_structures::{BlobAndProofV1, BlobAndProofV2}; use execution_layer::Error as ExecutionLayerError; use metrics::{inc_counter, TryExt}; use ssz_types::FixedVector; use state_processing::per_block_processing::deneb::kzg_commitment_to_versioned_hash; +use std::collections::HashSet; use std::sync::Arc; -use tokio::sync::oneshot; -use tracing::{debug, error}; +use tracing::debug; use types::blob_sidecar::{BlobSidecarError, FixedBlobSidecarList}; +use types::data_column_sidecar::DataColumnSidecarError; use types::{ - BeaconStateError, BlobSidecar, ChainSpec, DataColumnSidecar, DataColumnSidecarList, EthSpec, - FullPayload, Hash256, SignedBeaconBlock, SignedBeaconBlockHeader, + BeaconStateError, Blob, BlobSidecar, ChainSpec, ColumnIndex, DataColumnSidecarList, EthSpec, + FullPayload, Hash256, KzgProofs, SignedBeaconBlock, SignedBeaconBlockHeader, VersionedHash, }; +/// Blobs or data column to be published to the gossip network. pub enum BlobsOrDataColumns { Blobs(Vec>), DataColumns(DataColumnSidecarList), } +/// Result from engine get blobs to be passed onto `DataAvailabilityChecker`. +/// +/// The blobs are retrieved from a trusted EL and columns are computed locally, therefore they are +/// considered valid without requiring extra validation. +pub enum EngineGetBlobsOutput { + Blobs(FixedBlobSidecarList), + /// A filtered list of custody data columns to be imported into the `DataAvailabilityChecker`. + CustodyColumns(DataColumnSidecarList), +} + #[derive(Debug)] pub enum FetchEngineBlobError { BeaconStateError(BeaconStateError), + BeaconChainError(BeaconChainError), BlobProcessingError(BlockError), BlobSidecarError(BlobSidecarError), + DataColumnSidecarError(DataColumnSidecarError), ExecutionLayerMissing, InternalError(String), GossipBlob(GossipBlobError), @@ -48,6 +66,7 @@ pub async fn fetch_and_process_engine_blobs( chain: Arc>, block_root: Hash256, block: Arc>>, + custody_columns: HashSet, publish_fn: impl Fn(BlobsOrDataColumns) + Send + 'static, ) -> Result, FetchEngineBlobError> { let versioned_hashes = if let Some(kzg_commitments) = block @@ -66,8 +85,34 @@ pub async fn fetch_and_process_engine_blobs( return Ok(None); }; - let num_expected_blobs = versioned_hashes.len(); + debug!( + num_expected_blobs = versioned_hashes.len(), + "Fetching blobs from the EL" + ); + if chain.spec.is_peer_das_enabled_for_epoch(block.epoch()) { + fetch_and_process_blobs_v2( + chain, + block_root, + block, + versioned_hashes, + custody_columns, + publish_fn, + ) + .await + } else { + fetch_and_process_blobs_v1(chain, block_root, block, versioned_hashes, publish_fn).await + } +} + +async fn fetch_and_process_blobs_v1( + chain: Arc>, + block_root: Hash256, + block: Arc>, + versioned_hashes: Vec, + publish_fn: impl Fn(BlobsOrDataColumns) + Send + Sized, +) -> Result, FetchEngineBlobError> { + let num_expected_blobs = versioned_hashes.len(); let execution_layer = chain .execution_layer .as_ref() @@ -76,7 +121,7 @@ pub async fn fetch_and_process_engine_blobs( metrics::observe(&metrics::BLOBS_FROM_EL_EXPECTED, num_expected_blobs as f64); debug!(num_expected_blobs, "Fetching blobs from the EL"); let response = execution_layer - .get_blobs(versioned_hashes) + .get_blobs_v1(versioned_hashes) .await .inspect_err(|_| { inc_counter(&metrics::BLOBS_FROM_EL_ERROR_TOTAL); @@ -125,59 +170,9 @@ pub async fn fetch_and_process_engine_blobs( .collect::, _>>() .map_err(FetchEngineBlobError::GossipBlob)?; - let peer_das_enabled = chain.spec.is_peer_das_enabled_for_epoch(block.epoch()); - - let data_columns_receiver_opt = if peer_das_enabled { - // Partial blobs response isn't useful for PeerDAS, so we don't bother building and publishing data columns. - if num_fetched_blobs != num_expected_blobs { - debug!( - info = "Unable to compute data columns", - num_fetched_blobs, num_expected_blobs, "Not all blobs fetched from the EL" - ); - return Ok(None); - } - - if chain - .canonical_head - .fork_choice_read_lock() - .contains_block(&block_root) - { - // Avoid computing columns if block has already been imported. - debug!( - info = "block has already been imported", - "Ignoring EL blobs response" - ); - return Ok(None); - } - - if chain - .canonical_head - .fork_choice_read_lock() - .contains_block(&block_root) - { - // Avoid computing columns if block has already been imported. - debug!( - info = "block has already been imported", - "Ignoring EL blobs response" - ); - return Ok(None); - } - - let data_columns_receiver = spawn_compute_and_publish_data_columns_task( - &chain, - block.clone(), - fixed_blob_sidecar_list.clone(), - publish_fn, - ); - - Some(data_columns_receiver) - } else { - if !blobs_to_import_and_publish.is_empty() { - publish_fn(BlobsOrDataColumns::Blobs(blobs_to_import_and_publish)); - } - - None - }; + if !blobs_to_import_and_publish.is_empty() { + publish_fn(BlobsOrDataColumns::Blobs(blobs_to_import_and_publish)); + } debug!(num_fetched_blobs, "Processing engine blobs"); @@ -185,8 +180,7 @@ pub async fn fetch_and_process_engine_blobs( .process_engine_blobs( block.slot(), block_root, - fixed_blob_sidecar_list.clone(), - data_columns_receiver_opt, + EngineGetBlobsOutput::Blobs(fixed_blob_sidecar_list.clone()), ) .await .map_err(FetchEngineBlobError::BlobProcessingError)?; @@ -194,67 +188,140 @@ pub async fn fetch_and_process_engine_blobs( Ok(Some(availability_processing_status)) } -/// Spawn a blocking task here for long computation tasks, so it doesn't block processing, and it -/// allows blobs / data columns to propagate without waiting for processing. -/// -/// An `mpsc::Sender` is then used to send the produced data columns to the `beacon_chain` for it -/// to be persisted, **after** the block is made attestable. -/// -/// The reason for doing this is to make the block available and attestable as soon as possible, -/// while maintaining the invariant that block and data columns are persisted atomically. -fn spawn_compute_and_publish_data_columns_task( +async fn fetch_and_process_blobs_v2( + chain: Arc>, + block_root: Hash256, + block: Arc>, + versioned_hashes: Vec, + custody_columns_indices: HashSet, + publish_fn: impl Fn(BlobsOrDataColumns) + Send + 'static, +) -> Result, FetchEngineBlobError> { + let num_expected_blobs = versioned_hashes.len(); + let execution_layer = chain + .execution_layer + .as_ref() + .ok_or(FetchEngineBlobError::ExecutionLayerMissing)?; + + metrics::observe(&metrics::BLOBS_FROM_EL_EXPECTED, num_expected_blobs as f64); + debug!(num_expected_blobs, "Fetching blobs from the EL"); + let response = execution_layer + .get_blobs_v2(versioned_hashes) + .await + .inspect_err(|_| { + inc_counter(&metrics::BLOBS_FROM_EL_ERROR_TOTAL); + }) + .map_err(FetchEngineBlobError::RequestFailed)?; + + let (blobs, proofs): (Vec<_>, Vec<_>) = response + .into_iter() + .filter_map(|blob_and_proof_opt| { + blob_and_proof_opt.map(|blob_and_proof| { + let BlobAndProofV2 { blob, proofs } = blob_and_proof; + (blob, proofs) + }) + }) + .unzip(); + + let num_fetched_blobs = blobs.len(); + metrics::observe(&metrics::BLOBS_FROM_EL_RECEIVED, num_fetched_blobs as f64); + + // Partial blobs response isn't useful for PeerDAS, so we don't bother building and publishing data columns. + if num_fetched_blobs != num_expected_blobs { + debug!( + info = "Unable to compute data columns", + num_fetched_blobs, num_expected_blobs, "Not all blobs fetched from the EL" + ); + inc_counter(&metrics::BLOBS_FROM_EL_MISS_TOTAL); + return Ok(None); + } else { + inc_counter(&metrics::BLOBS_FROM_EL_HIT_TOTAL); + } + + if chain + .canonical_head + .fork_choice_read_lock() + .contains_block(&block_root) + { + // Avoid computing columns if block has already been imported. + debug!( + info = "block has already been imported", + "Ignoring EL blobs response" + ); + return Ok(None); + } + + let custody_columns = compute_and_publish_data_columns( + &chain, + block.clone(), + blobs, + proofs, + custody_columns_indices, + publish_fn, + ) + .await?; + + debug!(num_fetched_blobs, "Processing engine blobs"); + + let availability_processing_status = chain + .process_engine_blobs( + block.slot(), + block_root, + EngineGetBlobsOutput::CustodyColumns(custody_columns), + ) + .await + .map_err(FetchEngineBlobError::BlobProcessingError)?; + + Ok(Some(availability_processing_status)) +} + +/// Offload the data column computation to a blocking task to avoid holding up the async runtime. +async fn compute_and_publish_data_columns( chain: &Arc>, block: Arc>>, - blobs: FixedBlobSidecarList, + blobs: Vec>, + proofs: Vec>, + custody_columns_indices: HashSet, publish_fn: impl Fn(BlobsOrDataColumns) + Send + 'static, -) -> oneshot::Receiver>>> { +) -> Result, FetchEngineBlobError> { let chain_cloned = chain.clone(); - let (data_columns_sender, data_columns_receiver) = oneshot::channel(); + chain + .spawn_blocking_handle( + move || { + let mut timer = metrics::start_timer_vec( + &metrics::DATA_COLUMN_SIDECAR_COMPUTATION, + &[&blobs.len().to_string()], + ); - chain.task_executor.spawn_blocking( - move || { - let mut timer = metrics::start_timer_vec( - &metrics::DATA_COLUMN_SIDECAR_COMPUTATION, - &[&blobs.len().to_string()], - ); - let blob_refs = blobs - .iter() - .filter_map(|b| b.as_ref().map(|b| &b.blob)) - .collect::>(); - let data_columns_result = blobs_to_data_column_sidecars( - &blob_refs, - &block, - &chain_cloned.kzg, - &chain_cloned.spec, - ) - .discard_timer_on_break(&mut timer); - drop(timer); + let blob_refs = blobs.iter().collect::>(); + let cell_proofs = proofs.into_iter().flatten().collect(); + let data_columns_result = blobs_to_data_column_sidecars( + &blob_refs, + cell_proofs, + &block, + &chain_cloned.kzg, + &chain_cloned.spec, + ) + .discard_timer_on_break(&mut timer); + drop(timer); - let all_data_columns = match data_columns_result { - Ok(d) => d, - Err(e) => { - error!( - error = ?e, - "Failed to build data column sidecars from blobs" - ); - return; - } - }; + // This filtering ensures we only import and publish the custody columns. + // `DataAvailabilityChecker` requires a strict match on custody columns count to + // consider a block available. + let custody_columns = data_columns_result + .map(|mut data_columns| { + data_columns.retain(|col| custody_columns_indices.contains(&col.index)); + data_columns + }) + .map_err(FetchEngineBlobError::DataColumnSidecarError)?; - if data_columns_sender.send(all_data_columns.clone()).is_err() { - // Data column receiver have been dropped - block may have already been imported. - // This race condition exists because gossip columns may arrive and trigger block - // import during the computation. Here we just drop the computed columns. - debug!("Failed to send computed data columns"); - return; - }; - - publish_fn(BlobsOrDataColumns::DataColumns(all_data_columns)); - }, - "compute_and_publish_data_columns", - ); - - data_columns_receiver + publish_fn(BlobsOrDataColumns::DataColumns(custody_columns.clone())); + Ok(custody_columns) + }, + "compute_and_publish_data_columns", + ) + .await + .map_err(FetchEngineBlobError::BeaconChainError) + .and_then(|r| r) } fn build_blob_sidecars( diff --git a/beacon_node/beacon_chain/src/fulu_readiness.rs b/beacon_node/beacon_chain/src/fulu_readiness.rs index 872fe58f2b..1107acad74 100644 --- a/beacon_node/beacon_chain/src/fulu_readiness.rs +++ b/beacon_node/beacon_chain/src/fulu_readiness.rs @@ -1,7 +1,7 @@ //! Provides tools for checking if a node is ready for the Fulu upgrade. use crate::{BeaconChain, BeaconChainTypes}; -use execution_layer::http::{ENGINE_GET_PAYLOAD_V4, ENGINE_NEW_PAYLOAD_V4}; +use execution_layer::http::{ENGINE_GET_PAYLOAD_V5, ENGINE_NEW_PAYLOAD_V4}; use serde::{Deserialize, Serialize}; use std::fmt; use std::time::Duration; @@ -87,12 +87,12 @@ impl BeaconChain { Ok(capabilities) => { let mut missing_methods = String::from("Required Methods Unsupported:"); let mut all_good = true; - // TODO(fulu) switch to v5 when the EL is ready - if !capabilities.get_payload_v4 { + if !capabilities.get_payload_v5 { missing_methods.push(' '); - missing_methods.push_str(ENGINE_GET_PAYLOAD_V4); + missing_methods.push_str(ENGINE_GET_PAYLOAD_V5); all_good = false; } + // TODO(fulu) switch to v5 when the EL is ready if !capabilities.new_payload_v4 { missing_methods.push(' '); missing_methods.push_str(ENGINE_NEW_PAYLOAD_V4); diff --git a/beacon_node/beacon_chain/src/historical_blocks.rs b/beacon_node/beacon_chain/src/historical_blocks.rs index ee51964910..348e6d52a6 100644 --- a/beacon_node/beacon_chain/src/historical_blocks.rs +++ b/beacon_node/beacon_chain/src/historical_blocks.rs @@ -132,7 +132,7 @@ impl BeaconChain { AvailableBlockData::Blobs(..) => { new_oldest_blob_slot = Some(block.slot()); } - AvailableBlockData::DataColumns(_) | AvailableBlockData::DataColumnsRecv(_) => { + AvailableBlockData::DataColumns(_) => { new_oldest_data_column_slot = Some(block.slot()); } } diff --git a/beacon_node/beacon_chain/src/kzg_utils.rs b/beacon_node/beacon_chain/src/kzg_utils.rs index 06cce14144..eaaa23130d 100644 --- a/beacon_node/beacon_chain/src/kzg_utils.rs +++ b/beacon_node/beacon_chain/src/kzg_utils.rs @@ -1,14 +1,15 @@ use kzg::{ - Blob as KzgBlob, Bytes48, CellRef as KzgCellRef, CellsAndKzgProofs, Error as KzgError, Kzg, + Blob as KzgBlob, Bytes48, Cell as KzgCell, CellRef as KzgCellRef, CellsAndKzgProofs, + Error as KzgError, Kzg, CELLS_PER_EXT_BLOB, }; use rayon::prelude::*; -use ssz_types::FixedVector; +use ssz_types::{FixedVector, VariableList}; use std::sync::Arc; use types::beacon_block_body::KzgCommitments; use types::data_column_sidecar::{Cell, DataColumn, DataColumnSidecarError}; use types::{ Blob, BlobSidecar, BlobSidecarList, ChainSpec, ColumnIndex, DataColumnSidecar, - DataColumnSidecarList, EthSpec, Hash256, KzgCommitment, KzgProof, KzgProofs, SignedBeaconBlock, + DataColumnSidecarList, EthSpec, Hash256, KzgCommitment, KzgProof, SignedBeaconBlock, SignedBeaconBlockHeader, SignedBlindedBeaconBlock, }; @@ -43,6 +44,33 @@ pub fn validate_blob( kzg.verify_blob_kzg_proof(&kzg_blob, kzg_commitment, kzg_proof) } +/// Validates a list of blobs along with their corresponding KZG commitments and +/// cell proofs for the extended blobs. +pub fn validate_blobs_and_cell_proofs( + kzg: &Kzg, + blobs: Vec<&Blob>, + cell_proofs: &[KzgProof], + kzg_commitments: &KzgCommitments, +) -> Result<(), KzgError> { + let cells = compute_cells::(&blobs, kzg)?; + let cell_refs = cells.iter().map(|cell| cell.as_ref()).collect::>(); + let cell_indices = (0..blobs.len()) + .flat_map(|_| 0..CELLS_PER_EXT_BLOB as u64) + .collect::>(); + + let proofs = cell_proofs + .iter() + .map(|&proof| Bytes48::from(proof)) + .collect::>(); + + let commitments = kzg_commitments + .iter() + .flat_map(|&commitment| std::iter::repeat_n(Bytes48::from(commitment), CELLS_PER_EXT_BLOB)) + .collect::>(); + + kzg.verify_cell_proof_batch(&cell_refs, &proofs, cell_indices, &commitments) +} + /// Validate a batch of `DataColumnSidecar`. pub fn validate_data_columns<'a, E: EthSpec, I>( kzg: &Kzg, @@ -148,6 +176,7 @@ pub fn verify_kzg_proof( /// Build data column sidecars from a signed beacon block and its blobs. pub fn blobs_to_data_column_sidecars( blobs: &[&Blob], + cell_proofs: Vec, block: &SignedBeaconBlock, kzg: &Kzg, spec: &ChainSpec, @@ -164,15 +193,28 @@ pub fn blobs_to_data_column_sidecars( let kzg_commitments_inclusion_proof = block.message().body().kzg_commitments_merkle_proof()?; let signed_block_header = block.signed_block_header(); + let proof_chunks = cell_proofs + .chunks_exact(spec.number_of_columns as usize) + .collect::>(); + // NOTE: assumes blob sidecars are ordered by index let blob_cells_and_proofs_vec = blobs .into_par_iter() - .map(|blob| { + .zip(proof_chunks.into_par_iter()) + .map(|(blob, proofs)| { let blob = blob .as_ref() .try_into() .expect("blob should have a guaranteed size due to FixedVector"); - kzg.compute_cells_and_proofs(blob) + + kzg.compute_cells(blob).map(|cells| { + ( + cells, + proofs + .try_into() + .expect("proof chunks should have exactly `number_of_columns` proofs"), + ) + }) }) .collect::, KzgError>>()?; @@ -186,6 +228,23 @@ pub fn blobs_to_data_column_sidecars( .map_err(DataColumnSidecarError::BuildSidecarFailed) } +pub fn compute_cells(blobs: &[&Blob], kzg: &Kzg) -> Result, KzgError> { + let cells_vec = blobs + .into_par_iter() + .map(|blob| { + let blob = blob + .as_ref() + .try_into() + .expect("blob should have a guaranteed size due to FixedVector"); + + kzg.compute_cells(blob) + }) + .collect::, KzgError>>()?; + + let cells_flattened: Vec = cells_vec.into_iter().flatten().collect(); + Ok(cells_flattened) +} + pub(crate) fn build_data_column_sidecars( kzg_commitments: KzgCommitments, kzg_commitments_inclusion_proof: FixedVector, @@ -236,7 +295,7 @@ pub(crate) fn build_data_column_sidecars( index: index as u64, column: DataColumn::::from(col), kzg_commitments: kzg_commitments.clone(), - kzg_proofs: KzgProofs::::from(proofs), + kzg_proofs: VariableList::from(proofs), signed_block_header: signed_block_header.clone(), kzg_commitments_inclusion_proof: kzg_commitments_inclusion_proof.clone(), }) @@ -300,12 +359,7 @@ pub fn reconstruct_blobs( .collect(); let blob = Blob::::new(blob_bytes).map_err(|e| format!("{e:?}"))?; - let kzg_commitment = first_data_column - .kzg_commitments - .get(row_index) - .ok_or(format!("Missing KZG commitment for blob {row_index}"))?; - let kzg_proof = compute_blob_kzg_proof::(kzg, &blob, *kzg_commitment) - .map_err(|e| format!("{e:?}"))?; + let kzg_proof = KzgProof::empty(); BlobSidecar::::new_with_existing_proof( row_index, @@ -373,14 +427,15 @@ pub fn reconstruct_data_columns( mod test { use crate::kzg_utils::{ blobs_to_data_column_sidecars, reconstruct_blobs, reconstruct_data_columns, + validate_blobs_and_cell_proofs, }; use bls::Signature; use eth2::types::BlobsBundle; use execution_layer::test_utils::generate_blobs; use kzg::{trusted_setup::get_trusted_setup, Kzg, KzgCommitment, TrustedSetup}; use types::{ - beacon_block_body::KzgCommitments, BeaconBlock, BeaconBlockDeneb, BlobsList, ChainSpec, - EmptyBlock, EthSpec, MainnetEthSpec, SignedBeaconBlock, + beacon_block_body::KzgCommitments, BeaconBlock, BeaconBlockFulu, BlobsList, ChainSpec, + EmptyBlock, EthSpec, ForkName, FullPayload, KzgProofs, MainnetEthSpec, SignedBeaconBlock, }; type E = MainnetEthSpec; @@ -389,32 +444,52 @@ mod test { // only load it once. #[test] fn test_build_data_columns_sidecars() { - let spec = E::default_spec(); + let spec = ForkName::Fulu.make_genesis_spec(E::default_spec()); let kzg = get_kzg(); test_build_data_columns_empty(&kzg, &spec); test_build_data_columns(&kzg, &spec); test_reconstruct_data_columns(&kzg, &spec); test_reconstruct_blobs_from_data_columns(&kzg, &spec); + test_verify_blob_and_cell_proofs(&kzg); + } + + #[track_caller] + fn test_verify_blob_and_cell_proofs(kzg: &Kzg) { + let (blobs_bundle, _) = generate_blobs::(3, ForkName::Fulu).unwrap(); + let BlobsBundle { + blobs, + commitments, + proofs, + } = blobs_bundle; + + let result = + validate_blobs_and_cell_proofs::(kzg, blobs.iter().collect(), &proofs, &commitments); + + assert!(result.is_ok()); } #[track_caller] fn test_build_data_columns_empty(kzg: &Kzg, spec: &ChainSpec) { let num_of_blobs = 0; - let (signed_block, blobs) = create_test_block_and_blobs::(num_of_blobs, spec); + let (signed_block, blobs, proofs) = + create_test_fulu_block_and_blobs::(num_of_blobs, spec); let blob_refs = blobs.iter().collect::>(); let column_sidecars = - blobs_to_data_column_sidecars(&blob_refs, &signed_block, kzg, spec).unwrap(); + blobs_to_data_column_sidecars(&blob_refs, proofs.to_vec(), &signed_block, kzg, spec) + .unwrap(); assert!(column_sidecars.is_empty()); } #[track_caller] fn test_build_data_columns(kzg: &Kzg, spec: &ChainSpec) { let num_of_blobs = 6; - let (signed_block, blobs) = create_test_block_and_blobs::(num_of_blobs, spec); + let (signed_block, blobs, proofs) = + create_test_fulu_block_and_blobs::(num_of_blobs, spec); let blob_refs = blobs.iter().collect::>(); let column_sidecars = - blobs_to_data_column_sidecars(&blob_refs, &signed_block, kzg, spec).unwrap(); + blobs_to_data_column_sidecars(&blob_refs, proofs.to_vec(), &signed_block, kzg, spec) + .unwrap(); let block_kzg_commitments = signed_block .message() @@ -448,10 +523,12 @@ mod test { #[track_caller] fn test_reconstruct_data_columns(kzg: &Kzg, spec: &ChainSpec) { let num_of_blobs = 6; - let (signed_block, blobs) = create_test_block_and_blobs::(num_of_blobs, spec); + let (signed_block, blobs, proofs) = + create_test_fulu_block_and_blobs::(num_of_blobs, spec); let blob_refs = blobs.iter().collect::>(); let column_sidecars = - blobs_to_data_column_sidecars(&blob_refs, &signed_block, kzg, spec).unwrap(); + blobs_to_data_column_sidecars(&blob_refs, proofs.to_vec(), &signed_block, kzg, spec) + .unwrap(); // Now reconstruct let reconstructed_columns = reconstruct_data_columns( @@ -469,10 +546,12 @@ mod test { #[track_caller] fn test_reconstruct_blobs_from_data_columns(kzg: &Kzg, spec: &ChainSpec) { let num_of_blobs = 6; - let (signed_block, blobs) = create_test_block_and_blobs::(num_of_blobs, spec); + let (signed_block, blobs, proofs) = + create_test_fulu_block_and_blobs::(num_of_blobs, spec); let blob_refs = blobs.iter().collect::>(); let column_sidecars = - blobs_to_data_column_sidecars(&blob_refs, &signed_block, kzg, spec).unwrap(); + blobs_to_data_column_sidecars(&blob_refs, proofs.to_vec(), &signed_block, kzg, spec) + .unwrap(); // Now reconstruct let signed_blinded_block = signed_block.into(); @@ -504,11 +583,15 @@ mod test { Kzg::new_from_trusted_setup_das_enabled(trusted_setup).expect("should create kzg") } - fn create_test_block_and_blobs( + fn create_test_fulu_block_and_blobs( num_of_blobs: usize, spec: &ChainSpec, - ) -> (SignedBeaconBlock, BlobsList) { - let mut block = BeaconBlock::Deneb(BeaconBlockDeneb::empty(spec)); + ) -> ( + SignedBeaconBlock>, + BlobsList, + KzgProofs, + ) { + let mut block = BeaconBlock::Fulu(BeaconBlockFulu::empty(spec)); let mut body = block.body_mut(); let blob_kzg_commitments = body.blob_kzg_commitments_mut().unwrap(); *blob_kzg_commitments = @@ -516,12 +599,12 @@ mod test { .unwrap(); let mut signed_block = SignedBeaconBlock::from_block(block, Signature::empty()); - - let (blobs_bundle, _) = generate_blobs::(num_of_blobs).unwrap(); + let fork = signed_block.fork_name_unchecked(); + let (blobs_bundle, _) = generate_blobs::(num_of_blobs, fork).unwrap(); let BlobsBundle { blobs, commitments, - proofs: _, + proofs, } = blobs_bundle; *signed_block @@ -530,6 +613,6 @@ mod test { .blob_kzg_commitments_mut() .unwrap() = commitments; - (signed_block, blobs) + (signed_block, blobs, proofs) } } diff --git a/beacon_node/beacon_chain/src/metrics.rs b/beacon_node/beacon_chain/src/metrics.rs index 871721b4d8..57012161ec 100644 --- a/beacon_node/beacon_chain/src/metrics.rs +++ b/beacon_node/beacon_chain/src/metrics.rs @@ -1663,7 +1663,7 @@ pub static BLOBS_FROM_EL_HIT_TOTAL: LazyLock> = LazyLock::new pub static BLOBS_FROM_EL_MISS_TOTAL: LazyLock> = LazyLock::new(|| { try_create_int_counter( "beacon_blobs_from_el_miss_total", - "Number of empty blob responses from the execution layer", + "Number of empty or incomplete blob responses from the execution layer", ) }); diff --git a/beacon_node/beacon_chain/src/test_utils.rs b/beacon_node/beacon_chain/src/test_utils.rs index fe78d83c03..bcab512a4b 100644 --- a/beacon_node/beacon_chain/src/test_utils.rs +++ b/beacon_node/beacon_chain/src/test_utils.rs @@ -3194,7 +3194,7 @@ pub fn generate_rand_block_and_blobs( NumBlobs::None => 0, }; let (bundle, transactions) = - execution_layer::test_utils::generate_blobs::(num_blobs).unwrap(); + execution_layer::test_utils::generate_blobs::(num_blobs, fork_name).unwrap(); payload.execution_payload.transactions = <_>::default(); for tx in Vec::from(transactions) { @@ -3214,7 +3214,7 @@ pub fn generate_rand_block_and_blobs( NumBlobs::None => 0, }; let (bundle, transactions) = - execution_layer::test_utils::generate_blobs::(num_blobs).unwrap(); + execution_layer::test_utils::generate_blobs::(num_blobs, fork_name).unwrap(); payload.execution_payload.transactions = <_>::default(); for tx in Vec::from(transactions) { payload.execution_payload.transactions.push(tx).unwrap(); @@ -3233,7 +3233,7 @@ pub fn generate_rand_block_and_blobs( NumBlobs::None => 0, }; let (bundle, transactions) = - execution_layer::test_utils::generate_blobs::(num_blobs).unwrap(); + execution_layer::test_utils::generate_blobs::(num_blobs, fork_name).unwrap(); payload.execution_payload.transactions = <_>::default(); for tx in Vec::from(transactions) { payload.execution_payload.transactions.push(tx).unwrap(); diff --git a/beacon_node/execution_layer/src/engine_api.rs b/beacon_node/execution_layer/src/engine_api.rs index aed6cdba67..4bfee223ff 100644 --- a/beacon_node/execution_layer/src/engine_api.rs +++ b/beacon_node/execution_layer/src/engine_api.rs @@ -1,10 +1,11 @@ use crate::engines::ForkchoiceState; use crate::http::{ ENGINE_FORKCHOICE_UPDATED_V1, ENGINE_FORKCHOICE_UPDATED_V2, ENGINE_FORKCHOICE_UPDATED_V3, - ENGINE_GET_BLOBS_V1, ENGINE_GET_CLIENT_VERSION_V1, ENGINE_GET_PAYLOAD_BODIES_BY_HASH_V1, - ENGINE_GET_PAYLOAD_BODIES_BY_RANGE_V1, ENGINE_GET_PAYLOAD_V1, ENGINE_GET_PAYLOAD_V2, - ENGINE_GET_PAYLOAD_V3, ENGINE_GET_PAYLOAD_V4, ENGINE_GET_PAYLOAD_V5, ENGINE_NEW_PAYLOAD_V1, - ENGINE_NEW_PAYLOAD_V2, ENGINE_NEW_PAYLOAD_V3, ENGINE_NEW_PAYLOAD_V4, ENGINE_NEW_PAYLOAD_V5, + ENGINE_GET_BLOBS_V1, ENGINE_GET_BLOBS_V2, ENGINE_GET_CLIENT_VERSION_V1, + ENGINE_GET_PAYLOAD_BODIES_BY_HASH_V1, ENGINE_GET_PAYLOAD_BODIES_BY_RANGE_V1, + ENGINE_GET_PAYLOAD_V1, ENGINE_GET_PAYLOAD_V2, ENGINE_GET_PAYLOAD_V3, ENGINE_GET_PAYLOAD_V4, + ENGINE_GET_PAYLOAD_V5, ENGINE_NEW_PAYLOAD_V1, ENGINE_NEW_PAYLOAD_V2, ENGINE_NEW_PAYLOAD_V3, + ENGINE_NEW_PAYLOAD_V4, ENGINE_NEW_PAYLOAD_V5, }; use eth2::types::{ BlobsBundle, SsePayloadAttributes, SsePayloadAttributesV1, SsePayloadAttributesV2, @@ -553,6 +554,7 @@ pub struct EngineCapabilities { pub get_payload_v5: bool, pub get_client_version_v1: bool, pub get_blobs_v1: bool, + pub get_blobs_v2: bool, } impl EngineCapabilities { @@ -609,6 +611,9 @@ impl EngineCapabilities { if self.get_blobs_v1 { response.push(ENGINE_GET_BLOBS_V1); } + if self.get_blobs_v2 { + response.push(ENGINE_GET_BLOBS_V2); + } response } diff --git a/beacon_node/execution_layer/src/engine_api/http.rs b/beacon_node/execution_layer/src/engine_api/http.rs index 747383754a..bf4c391a8d 100644 --- a/beacon_node/execution_layer/src/engine_api/http.rs +++ b/beacon_node/execution_layer/src/engine_api/http.rs @@ -61,6 +61,7 @@ pub const ENGINE_GET_CLIENT_VERSION_V1: &str = "engine_getClientVersionV1"; pub const ENGINE_GET_CLIENT_VERSION_TIMEOUT: Duration = Duration::from_secs(1); pub const ENGINE_GET_BLOBS_V1: &str = "engine_getBlobsV1"; +pub const ENGINE_GET_BLOBS_V2: &str = "engine_getBlobsV2"; pub const ENGINE_GET_BLOBS_TIMEOUT: Duration = Duration::from_secs(1); /// This error is returned during a `chainId` call by Geth. @@ -87,6 +88,7 @@ pub static LIGHTHOUSE_CAPABILITIES: &[&str] = &[ ENGINE_GET_PAYLOAD_BODIES_BY_RANGE_V1, ENGINE_GET_CLIENT_VERSION_V1, ENGINE_GET_BLOBS_V1, + ENGINE_GET_BLOBS_V2, ]; /// We opt to initialize the JsonClientVersionV1 rather than the ClientVersionV1 @@ -708,7 +710,7 @@ impl HttpJsonRpc { } } - pub async fn get_blobs( + pub async fn get_blobs_v1( &self, versioned_hashes: Vec, ) -> Result>>, Error> { @@ -722,6 +724,20 @@ impl HttpJsonRpc { .await } + pub async fn get_blobs_v2( + &self, + versioned_hashes: Vec, + ) -> Result>>, Error> { + let params = json!([versioned_hashes]); + + self.rpc_request( + ENGINE_GET_BLOBS_V2, + params, + ENGINE_GET_BLOBS_TIMEOUT * self.execution_timeout_multiplier, + ) + .await + } + pub async fn get_block_by_number( &self, query: BlockByNumberQuery<'_>, @@ -963,19 +979,6 @@ impl HttpJsonRpc { .try_into() .map_err(Error::BadResponse) } - // TODO(fulu): remove when v5 method is ready. - ForkName::Fulu => { - let response: JsonGetPayloadResponseV5 = self - .rpc_request( - ENGINE_GET_PAYLOAD_V4, - params, - ENGINE_GET_PAYLOAD_TIMEOUT * self.execution_timeout_multiplier, - ) - .await?; - JsonGetPayloadResponse::V5(response) - .try_into() - .map_err(Error::BadResponse) - } _ => Err(Error::UnsupportedForkVariant(format!( "called get_payload_v4 with {}", fork_name @@ -1148,6 +1151,7 @@ impl HttpJsonRpc { get_payload_v5: capabilities.contains(ENGINE_GET_PAYLOAD_V5), get_client_version_v1: capabilities.contains(ENGINE_GET_CLIENT_VERSION_V1), get_blobs_v1: capabilities.contains(ENGINE_GET_BLOBS_V1), + get_blobs_v2: capabilities.contains(ENGINE_GET_BLOBS_V2), }) } @@ -1320,9 +1324,8 @@ impl HttpJsonRpc { } } ForkName::Fulu => { - // TODO(fulu): switch to v5 when the EL is ready - if engine_capabilities.get_payload_v4 { - self.get_payload_v4(fork_name, payload_id).await + if engine_capabilities.get_payload_v5 { + self.get_payload_v5(fork_name, payload_id).await } else { Err(Error::RequiredMethodUnsupported("engine_getPayloadv5")) } diff --git a/beacon_node/execution_layer/src/engine_api/json_structures.rs b/beacon_node/execution_layer/src/engine_api/json_structures.rs index 96615297d8..30d30481ea 100644 --- a/beacon_node/execution_layer/src/engine_api/json_structures.rs +++ b/beacon_node/execution_layer/src/engine_api/json_structures.rs @@ -717,12 +717,23 @@ impl From> for BlobsBundle { } } +#[superstruct( + variants(V1, V2), + variant_attributes( + derive(Debug, Clone, PartialEq, Serialize, Deserialize), + serde(bound = "E: EthSpec", rename_all = "camelCase") + ) +)] #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] -#[serde(bound = "E: EthSpec", rename_all = "camelCase")] -pub struct BlobAndProofV1 { +pub struct BlobAndProof { #[serde(with = "ssz_types::serde_utils::hex_fixed_vec")] pub blob: Blob, + /// KZG proof for the blob (Deneb) + #[superstruct(only(V1))] pub proof: KzgProof, + /// KZG cell proofs for the extended blob (PeerDAS) + #[superstruct(only(V2))] + pub proofs: KzgProofs, } #[derive(Debug, PartialEq, Clone, Serialize, Deserialize)] diff --git a/beacon_node/execution_layer/src/lib.rs b/beacon_node/execution_layer/src/lib.rs index 944a8e083b..ee326f22cd 100644 --- a/beacon_node/execution_layer/src/lib.rs +++ b/beacon_node/execution_layer/src/lib.rs @@ -4,7 +4,7 @@ //! This crate only provides useful functionality for "The Merge", it does not provide any of the //! deposit-contract functionality that the `beacon_node/eth1` crate already provides. -use crate::json_structures::BlobAndProofV1; +use crate::json_structures::{BlobAndProofV1, BlobAndProofV2}; use crate::payload_cache::PayloadCache; use arc_swap::ArcSwapOption; use auth::{strip_prefix, Auth, JwtKey}; @@ -16,8 +16,8 @@ pub use engine_api::*; pub use engine_api::{http, http::deposit_methods, http::HttpJsonRpc}; use engines::{Engine, EngineError}; pub use engines::{EngineState, ForkchoiceState}; -use eth2::types::FullPayloadContents; -use eth2::types::{builder_bid::SignedBuilderBid, BlobsBundle, ForkVersionedResponse}; +use eth2::types::{builder_bid::SignedBuilderBid, ForkVersionedResponse}; +use eth2::types::{BlobsBundle, FullPayloadContents}; use ethers_core::types::Transaction as EthersTransaction; use fixed_bytes::UintExtended; use fork_choice::ForkchoiceUpdateParameters; @@ -596,13 +596,7 @@ impl ExecutionLayer { let (payload_ref, maybe_json_blobs_bundle) = payload_and_blobs; let payload = payload_ref.clone_from_ref(); - let maybe_blobs_bundle = maybe_json_blobs_bundle - .cloned() - .map(|blobs_bundle| BlobsBundle { - commitments: blobs_bundle.commitments, - proofs: blobs_bundle.proofs, - blobs: blobs_bundle.blobs, - }); + let maybe_blobs_bundle = maybe_json_blobs_bundle.cloned(); self.inner .payload_cache @@ -1846,7 +1840,7 @@ impl ExecutionLayer { } } - pub async fn get_blobs( + pub async fn get_blobs_v1( &self, query: Vec, ) -> Result>>, Error> { @@ -1854,7 +1848,24 @@ impl ExecutionLayer { if capabilities.get_blobs_v1 { self.engine() - .request(|engine| async move { engine.api.get_blobs(query).await }) + .request(|engine| async move { engine.api.get_blobs_v1(query).await }) + .await + .map_err(Box::new) + .map_err(Error::EngineError) + } else { + Err(Error::GetBlobsNotSupported) + } + } + + pub async fn get_blobs_v2( + &self, + query: Vec, + ) -> Result>>, Error> { + let capabilities = self.get_engine_capabilities(None).await?; + + if capabilities.get_blobs_v2 { + self.engine() + .request(|engine| async move { engine.api.get_blobs_v2(query).await }) .await .map_err(Box::new) .map_err(Error::EngineError) diff --git a/beacon_node/execution_layer/src/test_utils/execution_block_generator.rs b/beacon_node/execution_layer/src/test_utils/execution_block_generator.rs index 81fb9bd7b8..b057abe887 100644 --- a/beacon_node/execution_layer/src/test_utils/execution_block_generator.rs +++ b/beacon_node/execution_layer/src/test_utils/execution_block_generator.rs @@ -20,13 +20,14 @@ use tree_hash_derive::TreeHash; use types::{ Blob, ChainSpec, EthSpec, ExecutionBlockHash, ExecutionPayload, ExecutionPayloadBellatrix, ExecutionPayloadCapella, ExecutionPayloadDeneb, ExecutionPayloadElectra, ExecutionPayloadFulu, - ExecutionPayloadHeader, FixedBytesExtended, ForkName, Hash256, Transaction, Transactions, - Uint256, + ExecutionPayloadHeader, FixedBytesExtended, ForkName, Hash256, KzgProofs, Transaction, + Transactions, Uint256, }; use super::DEFAULT_TERMINAL_BLOCK; const TEST_BLOB_BUNDLE: &[u8] = include_bytes!("fixtures/mainnet/test_blobs_bundle.ssz"); +const TEST_BLOB_BUNDLE_V2: &[u8] = include_bytes!("fixtures/mainnet/test_blobs_bundle_v2.ssz"); pub const DEFAULT_GAS_LIMIT: u64 = 30_000_000; const GAS_USED: u64 = DEFAULT_GAS_LIMIT - 1; @@ -697,15 +698,13 @@ impl ExecutionBlockGenerator { }, }; - if execution_payload.fork_name().deneb_enabled() { + let fork_name = execution_payload.fork_name(); + if fork_name.deneb_enabled() { // get random number between 0 and Max Blobs let mut rng = self.rng.lock(); - let max_blobs = self - .spec - .max_blobs_per_block_by_fork(execution_payload.fork_name()) - as usize; + let max_blobs = self.spec.max_blobs_per_block_by_fork(fork_name) as usize; let num_blobs = rng.gen::() % (max_blobs + 1); - let (bundle, transactions) = generate_blobs(num_blobs)?; + let (bundle, transactions) = generate_blobs(num_blobs, fork_name)?; for tx in Vec::from(transactions) { execution_payload .transactions_mut() @@ -721,7 +720,8 @@ impl ExecutionBlockGenerator { } } -pub fn load_test_blobs_bundle() -> Result<(KzgCommitment, KzgProof, Blob), String> { +pub fn load_test_blobs_bundle_v1() -> Result<(KzgCommitment, KzgProof, Blob), String> +{ let BlobsBundle:: { commitments, proofs, @@ -745,32 +745,56 @@ pub fn load_test_blobs_bundle() -> Result<(KzgCommitment, KzgProof, )) } +pub fn load_test_blobs_bundle_v2( +) -> Result<(KzgCommitment, KzgProofs, Blob), String> { + let BlobsBundle:: { + commitments, + proofs, + blobs, + } = BlobsBundle::from_ssz_bytes(TEST_BLOB_BUNDLE_V2) + .map_err(|e| format!("Unable to decode ssz: {:?}", e))?; + + Ok(( + commitments + .first() + .cloned() + .ok_or("commitment missing in test bundle")?, + // there's only one blob in the test bundle, hence we take all the cell proofs here. + proofs, + blobs + .first() + .cloned() + .ok_or("blob missing in test bundle")?, + )) +} + pub fn generate_blobs( n_blobs: usize, + fork_name: ForkName, ) -> Result<(BlobsBundle, Transactions), String> { - let (kzg_commitment, kzg_proof, blob) = load_test_blobs_bundle::()?; + let tx = static_valid_tx::() + .map_err(|e| format!("error creating valid tx SSZ bytes: {:?}", e))?; + let transactions = vec![tx; n_blobs]; - let mut bundle = BlobsBundle::::default(); - let mut transactions = vec![]; - - for blob_index in 0..n_blobs { - let tx = static_valid_tx::() - .map_err(|e| format!("error creating valid tx SSZ bytes: {:?}", e))?; - - transactions.push(tx); - bundle - .blobs - .push(blob.clone()) - .map_err(|_| format!("blobs are full, blob index: {:?}", blob_index))?; - bundle - .commitments - .push(kzg_commitment) - .map_err(|_| format!("blobs are full, blob index: {:?}", blob_index))?; - bundle - .proofs - .push(kzg_proof) - .map_err(|_| format!("blobs are full, blob index: {:?}", blob_index))?; - } + let bundle = if fork_name.fulu_enabled() { + let (kzg_commitment, kzg_proofs, blob) = load_test_blobs_bundle_v2::()?; + BlobsBundle { + commitments: vec![kzg_commitment; n_blobs].into(), + proofs: vec![kzg_proofs.to_vec(); n_blobs] + .into_iter() + .flatten() + .collect::>() + .into(), + blobs: vec![blob; n_blobs].into(), + } + } else { + let (kzg_commitment, kzg_proof, blob) = load_test_blobs_bundle_v1::()?; + BlobsBundle { + commitments: vec![kzg_commitment; n_blobs].into(), + proofs: vec![kzg_proof; n_blobs].into(), + blobs: vec![blob; n_blobs].into(), + } + }; Ok((bundle, transactions.into())) } @@ -905,7 +929,7 @@ pub fn generate_pow_block( #[cfg(test)] mod test { use super::*; - use kzg::{trusted_setup::get_trusted_setup, TrustedSetup}; + use kzg::{trusted_setup::get_trusted_setup, Bytes48, CellRef, KzgBlobRef, TrustedSetup}; use types::{MainnetEthSpec, MinimalEthSpec}; #[test] @@ -974,20 +998,28 @@ mod test { } #[test] - fn valid_test_blobs() { + fn valid_test_blobs_bundle_v1() { assert!( - validate_blob::().is_ok(), + validate_blob_bundle_v1::().is_ok(), "Mainnet preset test blobs bundle should contain valid proofs" ); assert!( - validate_blob::().is_ok(), + validate_blob_bundle_v1::().is_ok(), "Minimal preset test blobs bundle should contain valid proofs" ); } - fn validate_blob() -> Result<(), String> { + #[test] + fn valid_test_blobs_bundle_v2() { + validate_blob_bundle_v2::() + .expect("Mainnet preset test blobs bundle v2 should contain valid proofs"); + validate_blob_bundle_v2::() + .expect("Minimal preset test blobs bundle v2 should contain valid proofs"); + } + + fn validate_blob_bundle_v1() -> Result<(), String> { let kzg = load_kzg()?; - let (kzg_commitment, kzg_proof, blob) = load_test_blobs_bundle::()?; + let (kzg_commitment, kzg_proof, blob) = load_test_blobs_bundle_v1::()?; let kzg_blob = kzg::Blob::from_bytes(blob.as_ref()) .map(Box::new) .map_err(|e| format!("Error converting blob to kzg blob: {e:?}"))?; @@ -995,6 +1027,26 @@ mod test { .map_err(|e| format!("Invalid blobs bundle: {e:?}")) } + fn validate_blob_bundle_v2() -> Result<(), String> { + let kzg = load_kzg()?; + let (kzg_commitments, kzg_proofs, cells) = + load_test_blobs_bundle_v2::().map(|(commitment, proofs, blob)| { + let kzg_blob: KzgBlobRef = blob.as_ref().try_into().unwrap(); + ( + vec![Bytes48::from(commitment); proofs.len()], + proofs.into_iter().map(|p| p.into()).collect::>(), + kzg.compute_cells(kzg_blob).unwrap(), + ) + })?; + let (cell_indices, cell_refs): (Vec, Vec) = cells + .iter() + .enumerate() + .map(|(cell_idx, cell)| (cell_idx as u64, CellRef::try_from(cell.as_ref()).unwrap())) + .unzip(); + kzg.verify_cell_proof_batch(&cell_refs, &kzg_proofs, cell_indices, &kzg_commitments) + .map_err(|e| format!("Invalid blobs bundle: {e:?}")) + } + fn load_kzg() -> Result { let trusted_setup: TrustedSetup = serde_json::from_reader(get_trusted_setup().as_slice()) diff --git a/beacon_node/execution_layer/src/test_utils/fixtures/mainnet/test_blobs_bundle_v2.ssz b/beacon_node/execution_layer/src/test_utils/fixtures/mainnet/test_blobs_bundle_v2.ssz new file mode 100644 index 0000000000..e57096c076 Binary files /dev/null and b/beacon_node/execution_layer/src/test_utils/fixtures/mainnet/test_blobs_bundle_v2.ssz differ diff --git a/beacon_node/execution_layer/src/test_utils/handle_rpc.rs b/beacon_node/execution_layer/src/test_utils/handle_rpc.rs index d727d2c159..70c21afed4 100644 --- a/beacon_node/execution_layer/src/test_utils/handle_rpc.rs +++ b/beacon_node/execution_layer/src/test_utils/handle_rpc.rs @@ -383,9 +383,8 @@ pub async fn handle_rpc( == ForkName::Fulu && (method == ENGINE_GET_PAYLOAD_V1 || method == ENGINE_GET_PAYLOAD_V2 - || method == ENGINE_GET_PAYLOAD_V3) - // TODO(fulu): Uncomment this once v5 method is ready for Fulu - // || method == ENGINE_GET_PAYLOAD_V4) + || method == ENGINE_GET_PAYLOAD_V3 + || method == ENGINE_GET_PAYLOAD_V4) { return Err(( format!("{} called after Fulu fork!", method), @@ -451,22 +450,6 @@ pub async fn handle_rpc( }) .unwrap() } - // TODO(fulu): remove this once we switch to v5 method - JsonExecutionPayload::V5(execution_payload) => { - serde_json::to_value(JsonGetPayloadResponseV5 { - execution_payload, - block_value: Uint256::from(DEFAULT_MOCK_EL_PAYLOAD_VALUE_WEI), - blobs_bundle: maybe_blobs - .ok_or(( - "No blobs returned despite V5 Payload".to_string(), - GENERIC_ERROR_CODE, - ))? - .into(), - should_override_builder: false, - execution_requests: Default::default(), - }) - .unwrap() - } _ => unreachable!(), }), ENGINE_GET_PAYLOAD_V5 => Ok(match JsonExecutionPayload::from(response) { diff --git a/beacon_node/execution_layer/src/test_utils/mock_builder.rs b/beacon_node/execution_layer/src/test_utils/mock_builder.rs index fba34121a7..87ea8642be 100644 --- a/beacon_node/execution_layer/src/test_utils/mock_builder.rs +++ b/beacon_node/execution_layer/src/test_utils/mock_builder.rs @@ -546,7 +546,7 @@ impl MockBuilder { .map_err(|_| "incorrect payload variant".to_string())? .into(), blob_kzg_commitments: maybe_blobs_bundle - .map(|b| b.commitments) + .map(|b| b.commitments.clone()) .unwrap_or_default(), value: self.get_bid_value(value), pubkey: self.builder_sk.public_key().compress(), @@ -558,7 +558,7 @@ impl MockBuilder { .map_err(|_| "incorrect payload variant".to_string())? .into(), blob_kzg_commitments: maybe_blobs_bundle - .map(|b| b.commitments) + .map(|b| b.commitments.clone()) .unwrap_or_default(), value: self.get_bid_value(value), pubkey: self.builder_sk.public_key().compress(), @@ -570,7 +570,7 @@ impl MockBuilder { .map_err(|_| "incorrect payload variant".to_string())? .into(), blob_kzg_commitments: maybe_blobs_bundle - .map(|b| b.commitments) + .map(|b| b.commitments.clone()) .unwrap_or_default(), value: self.get_bid_value(value), pubkey: self.builder_sk.public_key().compress(), diff --git a/beacon_node/execution_layer/src/test_utils/mod.rs b/beacon_node/execution_layer/src/test_utils/mod.rs index 17441a15fb..245aa71a15 100644 --- a/beacon_node/execution_layer/src/test_utils/mod.rs +++ b/beacon_node/execution_layer/src/test_utils/mod.rs @@ -58,6 +58,7 @@ pub const DEFAULT_ENGINE_CAPABILITIES: EngineCapabilities = EngineCapabilities { get_payload_v5: true, get_client_version_v1: true, get_blobs_v1: true, + get_blobs_v2: true, }; pub static DEFAULT_CLIENT_VERSION: LazyLock = diff --git a/beacon_node/http_api/src/publish_blocks.rs b/beacon_node/http_api/src/publish_blocks.rs index a5cd94536d..ab70521686 100644 --- a/beacon_node/http_api/src/publish_blocks.rs +++ b/beacon_node/http_api/src/publish_blocks.rs @@ -364,7 +364,7 @@ fn spawn_build_data_sidecar_task( } else { // Post PeerDAS: construct data columns. let gossip_verified_data_columns = - build_gossip_verified_data_columns(&chain, &block, blobs)?; + build_gossip_verified_data_columns(&chain, &block, blobs, kzg_proofs)?; Ok((vec![], gossip_verified_data_columns)) } }, @@ -383,10 +383,11 @@ fn build_gossip_verified_data_columns( chain: &BeaconChain, block: &SignedBeaconBlock>, blobs: BlobsList, + kzg_cell_proofs: KzgProofs, ) -> Result>>, Rejection> { let slot = block.slot(); let data_column_sidecars = - build_blob_data_column_sidecars(chain, block, blobs).map_err(|e| { + build_blob_data_column_sidecars(chain, block, blobs, kzg_cell_proofs).map_err(|e| { error!( error = ?e, %slot, diff --git a/beacon_node/network/src/network_beacon_processor/gossip_methods.rs b/beacon_node/network/src/network_beacon_processor/gossip_methods.rs index 733f2ca1db..d61ea58377 100644 --- a/beacon_node/network/src/network_beacon_processor/gossip_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/gossip_methods.rs @@ -825,7 +825,8 @@ impl NetworkBeaconProcessor { | GossipDataColumnError::InvalidKzgProof { .. } | GossipDataColumnError::UnexpectedDataColumn | GossipDataColumnError::InvalidColumnIndex(_) - | GossipDataColumnError::InconsistentCommitmentsOrProofLength + | GossipDataColumnError::InconsistentCommitmentsLength { .. } + | GossipDataColumnError::InconsistentProofsLength { .. } | GossipDataColumnError::NotFinalizedDescendant { .. } => { debug!( error = ?err, diff --git a/beacon_node/network/src/network_beacon_processor/mod.rs b/beacon_node/network/src/network_beacon_processor/mod.rs index cdcbe1bb8d..9a8edbfa4c 100644 --- a/beacon_node/network/src/network_beacon_processor/mod.rs +++ b/beacon_node/network/src/network_beacon_processor/mod.rs @@ -844,7 +844,6 @@ impl NetworkBeaconProcessor { publish_blobs: bool, ) { let custody_columns = self.network_globals.sampling_columns.clone(); - let is_supernode = self.network_globals.is_supernode(); let self_cloned = self.clone(); let publish_fn = move |blobs_or_data_column| { if publish_blobs { @@ -852,10 +851,7 @@ impl NetworkBeaconProcessor { BlobsOrDataColumns::Blobs(blobs) => { self_cloned.publish_blobs_gradually(blobs, block_root); } - BlobsOrDataColumns::DataColumns(mut columns) => { - if !is_supernode { - columns.retain(|col| custody_columns.contains(&col.index)); - } + BlobsOrDataColumns::DataColumns(columns) => { self_cloned.publish_data_columns_gradually(columns, block_root); } }; @@ -866,6 +862,7 @@ impl NetworkBeaconProcessor { self.chain.clone(), block_root, block.clone(), + custody_columns, publish_fn, ) .instrument(tracing::info_span!( diff --git a/consensus/types/src/data_column_sidecar.rs b/consensus/types/src/data_column_sidecar.rs index 90a914dfae..03ab6a74f8 100644 --- a/consensus/types/src/data_column_sidecar.rs +++ b/consensus/types/src/data_column_sidecar.rs @@ -1,7 +1,7 @@ use crate::beacon_block_body::{KzgCommitments, BLOB_KZG_COMMITMENTS_INDEX}; use crate::test_utils::TestRandom; use crate::BeaconStateError; -use crate::{BeaconBlockHeader, Epoch, EthSpec, Hash256, KzgProofs, SignedBeaconBlockHeader, Slot}; +use crate::{BeaconBlockHeader, Epoch, EthSpec, Hash256, SignedBeaconBlockHeader, Slot}; use bls::Signature; use derivative::Derivative; use kzg::Error as KzgError; @@ -56,7 +56,7 @@ pub struct DataColumnSidecar { pub column: DataColumn, /// All the KZG commitments and proofs associated with the block, used for verifying sample cells. pub kzg_commitments: KzgCommitments, - pub kzg_proofs: KzgProofs, + pub kzg_proofs: VariableList, pub signed_block_header: SignedBeaconBlockHeader, /// An inclusion proof, proving the inclusion of `blob_kzg_commitments` in `BeaconBlockBody`. pub kzg_commitments_inclusion_proof: FixedVector, diff --git a/consensus/types/src/eth_spec.rs b/consensus/types/src/eth_spec.rs index 0bc074072f..6f1b3e6ce6 100644 --- a/consensus/types/src/eth_spec.rs +++ b/consensus/types/src/eth_spec.rs @@ -4,8 +4,8 @@ use safe_arith::SafeArith; use serde::{Deserialize, Serialize}; use ssz_types::typenum::{ bit::B0, UInt, U0, U1, U10, U1024, U1048576, U1073741824, U1099511627776, U128, U131072, - U134217728, U16, U16777216, U17, U2, U2048, U256, U262144, U32, U4, U4096, U512, U625, U64, - U65536, U8, U8192, + U134217728, U16, U16777216, U17, U2, U2048, U256, U262144, U32, U33554432, U4, U4096, U512, + U625, U64, U65536, U8, U8192, }; use std::fmt::{self, Debug}; use std::str::FromStr; @@ -146,6 +146,11 @@ pub trait EthSpec: /// Must be set to `BytesPerFieldElement * FieldElementsPerCell`. type BytesPerCell: Unsigned + Clone + Sync + Send + Debug + PartialEq; + /// The maximum number of cell commitments per block + /// + /// FieldElementsPerExtBlob * MaxBlobCommitmentsPerBlock + type MaxCellsPerBlock: Unsigned + Clone + Sync + Send + Debug + PartialEq; + /* * New in Electra */ @@ -421,6 +426,7 @@ impl EthSpec for MainnetEthSpec { type FieldElementsPerExtBlob = U8192; type BytesPerBlob = U131072; type BytesPerCell = U2048; + type MaxCellsPerBlock = U33554432; type KzgCommitmentInclusionProofDepth = U17; type KzgCommitmentsInclusionProofDepth = U4; // inclusion of the whole list of commitments type SyncSubcommitteeSize = U128; // 512 committee size / 4 sync committee subnet count @@ -474,6 +480,7 @@ impl EthSpec for MinimalEthSpec { type MaxWithdrawalRequestsPerPayload = U2; type FieldElementsPerCell = U64; type FieldElementsPerExtBlob = U8192; + type MaxCellsPerBlock = U33554432; type BytesPerCell = U2048; type KzgCommitmentsInclusionProofDepth = U4; @@ -566,6 +573,7 @@ impl EthSpec for GnosisEthSpec { type MaxPendingDepositsPerEpoch = U16; type FieldElementsPerCell = U64; type FieldElementsPerExtBlob = U8192; + type MaxCellsPerBlock = U33554432; type BytesPerCell = U2048; type KzgCommitmentsInclusionProofDepth = U4; diff --git a/consensus/types/src/lib.rs b/consensus/types/src/lib.rs index 73a50b4ef3..1d39c89cab 100644 --- a/consensus/types/src/lib.rs +++ b/consensus/types/src/lib.rs @@ -272,7 +272,14 @@ pub type Address = fixed_bytes::Address; pub type ForkVersion = [u8; 4]; pub type BLSFieldElement = Uint256; pub type Blob = FixedVector::BytesPerBlob>; -pub type KzgProofs = VariableList::MaxBlobCommitmentsPerBlock>; +// Note on List limit: +// - Deneb to Electra: `MaxBlobCommitmentsPerBlock` +// - Fulu: `MaxCellsPerBlock` +// We choose to use a single type (with the larger value from Fulu as `N`) instead of having to +// introduce a new type for Fulu. This is to avoid messy conversions and having to add extra types +// with no gains - as `N` does not impact serialisation at all, and only affects merkleization, +// which we don't current do on `KzgProofs` anyway. +pub type KzgProofs = VariableList::MaxCellsPerBlock>; pub type VersionedHash = Hash256; pub type Hash64 = alloy_primitives::B64; diff --git a/crypto/kzg/src/lib.rs b/crypto/kzg/src/lib.rs index 2a5c6e47f5..5d752cc0a5 100644 --- a/crypto/kzg/src/lib.rs +++ b/crypto/kzg/src/lib.rs @@ -220,7 +220,7 @@ impl Kzg { .map_err(Into::into) } - /// Computes the cells and associated proofs for a given `blob` at index `index`. + /// Computes the cells and associated proofs for a given `blob`. pub fn compute_cells_and_proofs( &self, blob: KzgBlobRef<'_>, @@ -235,11 +235,14 @@ impl Kzg { Ok((cells, c_kzg_proof)) } + /// Computes the cells for a given `blob`. + pub fn compute_cells(&self, blob: KzgBlobRef<'_>) -> Result<[Cell; CELLS_PER_EXT_BLOB], Error> { + self.context() + .compute_cells(blob) + .map_err(Error::PeerDASKZG) + } + /// Verifies a batch of cell-proof-commitment triplets. - /// - /// Here, `coordinates` correspond to the (row, col) coordinate of the cell in the extended - /// blob "matrix". In the 1D extension, row corresponds to the blob index, and col corresponds - /// to the data column index. pub fn verify_cell_proof_batch( &self, cells: &[CellRef<'_>], diff --git a/scripts/local_testnet/network_params_das.yaml b/scripts/local_testnet/network_params_das.yaml index 80b4bc95c6..d47dfa6b5a 100644 --- a/scripts/local_testnet/network_params_das.yaml +++ b/scripts/local_testnet/network_params_das.yaml @@ -1,6 +1,7 @@ participants: - cl_type: lighthouse cl_image: lighthouse:local + el_image: ethpandaops/geth:engine-getblobs-v2-3676b56 cl_extra_params: - --subscribe-all-data-column-subnets - --subscribe-all-subnets @@ -10,6 +11,7 @@ participants: count: 2 - cl_type: lighthouse cl_image: lighthouse:local + el_image: ethpandaops/geth:engine-getblobs-v2-3676b56 cl_extra_params: # Note: useful for testing range sync (only produce block if node is in sync to prevent forking) - --sync-tolerance-epochs=0 @@ -19,6 +21,10 @@ network_params: electra_fork_epoch: 1 fulu_fork_epoch: 2 seconds_per_slot: 6 + max_blobs_per_block_electra: 64 + target_blobs_per_block_electra: 48 + max_blobs_per_block_fulu: 64 + target_blobs_per_block_fulu: 48 snooper_enabled: false global_log_level: debug additional_services: @@ -26,4 +32,8 @@ additional_services: - spamoor_blob - prometheus_grafana dora_params: - image: ethpandaops/dora:fulu-support \ No newline at end of file + image: ethpandaops/dora:fulu-support +spamoor_blob_params: + # Throughput of spamoor + # Defaults to 3 + throughput: 32 \ No newline at end of file