//! This module implements an optimisation to fetch blobs via JSON-RPC from the EL. //! If a blob has already been seen in the public mempool, then it is often unnecessary to wait for //! it to arrive on P2P gossip. This PR uses a new JSON-RPC method (`engine_getBlobsV1`) which //! allows the CL to load the blobs quickly from the EL's blob pool. //! //! Once the node fetches the blobs from EL, it then publishes the remaining blobs that it hasn't seen //! on P2P gossip to the network. From PeerDAS onwards, together with the increase in blob count, //! broadcasting blobs requires a much higher bandwidth, and is only done by high capacity //! supernodes. mod fetch_blobs_beacon_adapter; #[cfg(test)] mod tests; use crate::blob_verification::{GossipBlobError, GossipVerifiedBlob}; use crate::data_column_verification::{GossipDataColumnError, GossipVerifiedDataColumn}; #[cfg_attr(test, double)] use crate::fetch_blobs::fetch_blobs_beacon_adapter::FetchBlobsBeaconAdapter; use crate::kzg_utils::blobs_to_data_column_sidecars; use crate::observed_data_sidecars::DoNotObserve; use crate::{ metrics, AvailabilityProcessingStatus, BeaconChain, BeaconChainError, BeaconChainTypes, BlockError, }; use execution_layer::json_structures::{BlobAndProofV1, BlobAndProofV2}; use execution_layer::Error as ExecutionLayerError; use metrics::{inc_counter, TryExt}; #[cfg(test)] use mockall_double::double; use ssz_types::FixedVector; use state_processing::per_block_processing::deneb::kzg_commitment_to_versioned_hash; use std::collections::HashSet; use std::sync::Arc; use tracing::{debug, warn}; use types::blob_sidecar::{BlobSidecarError, FixedBlobSidecarList}; use types::data_column_sidecar::DataColumnSidecarError; use types::{ BeaconStateError, Blob, BlobSidecar, ChainSpec, ColumnIndex, EthSpec, FullPayload, Hash256, KzgProofs, SignedBeaconBlock, SignedBeaconBlockHeader, VersionedHash, }; /// Result from engine get blobs to be passed onto `DataAvailabilityChecker` and published to the /// gossip network. The blobs / data columns have not been marked as observed yet, as they may not /// be published immediately. #[derive(Debug)] pub enum EngineGetBlobsOutput { Blobs(Vec>), /// A filtered list of custody data columns to be imported into the `DataAvailabilityChecker`. CustodyColumns(Vec>), } #[derive(Debug)] pub enum FetchEngineBlobError { BeaconStateError(BeaconStateError), BeaconChainError(Box), BlobProcessingError(BlockError), BlobSidecarError(BlobSidecarError), DataColumnSidecarError(DataColumnSidecarError), ExecutionLayerMissing, InternalError(String), GossipBlob(GossipBlobError), GossipDataColumn(GossipDataColumnError), RequestFailed(ExecutionLayerError), RuntimeShutdown, TokioJoin(tokio::task::JoinError), } /// Fetches blobs from the EL mempool and processes them. It also broadcasts unseen blobs or /// data columns (PeerDAS onwards) to the network, using the supplied `publish_fn`. pub async fn fetch_and_process_engine_blobs( chain: Arc>, block_root: Hash256, block: Arc>>, custody_columns: HashSet, publish_fn: impl Fn(EngineGetBlobsOutput) + Send + 'static, ) -> Result, FetchEngineBlobError> { fetch_and_process_engine_blobs_inner( FetchBlobsBeaconAdapter::new(chain), block_root, block, custody_columns, publish_fn, ) .await } /// Internal implementation of fetch blobs, which uses `FetchBlobsBeaconAdapter` instead of /// `BeaconChain` for better testability. async fn fetch_and_process_engine_blobs_inner( chain_adapter: FetchBlobsBeaconAdapter, block_root: Hash256, block: Arc>>, custody_columns: HashSet, publish_fn: impl Fn(EngineGetBlobsOutput) + Send + 'static, ) -> Result, FetchEngineBlobError> { let versioned_hashes = if let Some(kzg_commitments) = block .message() .body() .blob_kzg_commitments() .ok() .filter(|blobs| !blobs.is_empty()) { kzg_commitments .iter() .map(kzg_commitment_to_versioned_hash) .collect::>() } else { debug!("Fetch blobs not triggered - none required"); return Ok(None); }; debug!( num_expected_blobs = versioned_hashes.len(), "Fetching blobs from the EL" ); if chain_adapter .spec() .is_peer_das_enabled_for_epoch(block.epoch()) { fetch_and_process_blobs_v2( chain_adapter, block_root, block, versioned_hashes, custody_columns, publish_fn, ) .await } else { fetch_and_process_blobs_v1( chain_adapter, block_root, block, versioned_hashes, publish_fn, ) .await } } async fn fetch_and_process_blobs_v1( chain_adapter: FetchBlobsBeaconAdapter, block_root: Hash256, block: Arc>, versioned_hashes: Vec, publish_fn: impl Fn(EngineGetBlobsOutput) + Send + Sized, ) -> Result, FetchEngineBlobError> { let num_expected_blobs = versioned_hashes.len(); metrics::observe(&metrics::BLOBS_FROM_EL_EXPECTED, num_expected_blobs as f64); debug!(num_expected_blobs, "Fetching blobs from the EL"); let response = chain_adapter .get_blobs_v1(versioned_hashes) .await .inspect_err(|_| { inc_counter(&metrics::BLOBS_FROM_EL_ERROR_TOTAL); })?; let num_fetched_blobs = response.iter().filter(|opt| opt.is_some()).count(); metrics::observe(&metrics::BLOBS_FROM_EL_RECEIVED, num_fetched_blobs as f64); if num_fetched_blobs == 0 { debug!(num_expected_blobs, "No blobs fetched from the EL"); inc_counter(&metrics::BLOBS_FROM_EL_MISS_TOTAL); return Ok(None); } else { debug!( num_expected_blobs, num_fetched_blobs, "Received blobs from the EL" ); inc_counter(&metrics::BLOBS_FROM_EL_HIT_TOTAL); } if chain_adapter.fork_choice_contains_block(&block_root) { // Avoid computing sidecars if the block has already been imported. debug!( info = "block has already been imported", "Ignoring EL blobs response" ); return Ok(None); } let (signed_block_header, kzg_commitments_proof) = block .signed_block_header_and_kzg_commitments_proof() .map_err(FetchEngineBlobError::BeaconStateError)?; let fixed_blob_sidecar_list = build_blob_sidecars( &block, response, signed_block_header, &kzg_commitments_proof, chain_adapter.spec(), )?; // Gossip verify blobs before publishing. This prevents blobs with invalid KZG proofs from // the EL making it into the data availability checker. We do not immediately add these // blobs to the observed blobs/columns cache because we want to allow blobs/columns to arrive on gossip // and be accepted (and propagated) while we are waiting to publish. Just before publishing // we will observe the blobs/columns and only proceed with publishing if they are not yet seen. let blobs_to_import_and_publish = fixed_blob_sidecar_list .into_iter() .filter_map(|opt_blob| { let blob = opt_blob.as_ref()?; match chain_adapter.verify_blob_for_gossip(blob) { Ok(verified) => Some(Ok(verified)), // Ignore already seen blobs. Err(GossipBlobError::RepeatBlob { .. }) => None, Err(e) => Some(Err(e)), } }) .collect::, _>>() .map_err(FetchEngineBlobError::GossipBlob)?; if blobs_to_import_and_publish.is_empty() { return Ok(None); } publish_fn(EngineGetBlobsOutput::Blobs( blobs_to_import_and_publish.clone(), )); let availability_processing_status = chain_adapter .process_engine_blobs( block.slot(), block_root, EngineGetBlobsOutput::Blobs(blobs_to_import_and_publish), ) .await?; Ok(Some(availability_processing_status)) } async fn fetch_and_process_blobs_v2( chain_adapter: FetchBlobsBeaconAdapter, block_root: Hash256, block: Arc>, versioned_hashes: Vec, custody_columns_indices: HashSet, publish_fn: impl Fn(EngineGetBlobsOutput) + Send + 'static, ) -> Result, FetchEngineBlobError> { let num_expected_blobs = versioned_hashes.len(); metrics::observe(&metrics::BLOBS_FROM_EL_EXPECTED, num_expected_blobs as f64); debug!(num_expected_blobs, "Fetching blobs from the EL"); let response = chain_adapter .get_blobs_v2(versioned_hashes) .await .inspect_err(|_| { inc_counter(&metrics::BLOBS_FROM_EL_ERROR_TOTAL); })?; let Some(blobs_and_proofs) = response else { debug!(num_expected_blobs, "No blobs fetched from the EL"); inc_counter(&metrics::BLOBS_FROM_EL_MISS_TOTAL); return Ok(None); }; let (blobs, proofs): (Vec<_>, Vec<_>) = blobs_and_proofs .into_iter() .map(|blob_and_proof| { let BlobAndProofV2 { blob, proofs } = blob_and_proof; (blob, proofs) }) .unzip(); let num_fetched_blobs = blobs.len(); metrics::observe(&metrics::BLOBS_FROM_EL_RECEIVED, num_fetched_blobs as f64); if num_fetched_blobs != num_expected_blobs { // This scenario is not supposed to happen if the EL is spec compliant. // It should either return all requested blobs or none, but NOT partial responses. // If we attempt to compute columns with partial blobs, we'd end up with invalid columns. warn!( num_fetched_blobs, num_expected_blobs, "The EL did not return all requested blobs" ); inc_counter(&metrics::BLOBS_FROM_EL_MISS_TOTAL); return Ok(None); } debug!(num_fetched_blobs, "All expected blobs received from the EL"); inc_counter(&metrics::BLOBS_FROM_EL_HIT_TOTAL); if chain_adapter.fork_choice_contains_block(&block_root) { // Avoid computing columns if the block has already been imported. debug!( info = "block has already been imported", "Ignoring EL blobs response" ); return Ok(None); } let chain_adapter = Arc::new(chain_adapter); let custody_columns_to_import = compute_custody_columns_to_import( &chain_adapter, block.clone(), blobs, proofs, custody_columns_indices, ) .await?; if custody_columns_to_import.is_empty() { debug!( info = "No new data columns to import", "Ignoring EL blobs response" ); return Ok(None); } publish_fn(EngineGetBlobsOutput::CustodyColumns( custody_columns_to_import.clone(), )); let availability_processing_status = chain_adapter .process_engine_blobs( block.slot(), block_root, EngineGetBlobsOutput::CustodyColumns(custody_columns_to_import), ) .await?; Ok(Some(availability_processing_status)) } /// Offload the data column computation to a blocking task to avoid holding up the async runtime. async fn compute_custody_columns_to_import( chain_adapter: &Arc>, block: Arc>>, blobs: Vec>, proofs: Vec>, custody_columns_indices: HashSet, ) -> Result>, FetchEngineBlobError> { let kzg = chain_adapter.kzg().clone(); let spec = chain_adapter.spec().clone(); let chain_adapter_cloned = chain_adapter.clone(); chain_adapter .executor() .spawn_blocking_handle( move || { let mut timer = metrics::start_timer_vec( &metrics::DATA_COLUMN_SIDECAR_COMPUTATION, &[&blobs.len().to_string()], ); let blob_refs = blobs.iter().collect::>(); let cell_proofs = proofs.into_iter().flatten().collect(); let data_columns_result = blobs_to_data_column_sidecars(&blob_refs, cell_proofs, &block, &kzg, &spec) .discard_timer_on_break(&mut timer); drop(timer); // This filtering ensures we only import and publish the custody columns. // `DataAvailabilityChecker` requires a strict match on custody columns count to // consider a block available. let custody_columns = data_columns_result .map(|mut data_columns| { data_columns.retain(|col| custody_columns_indices.contains(&col.index)); data_columns }) .map_err(FetchEngineBlobError::DataColumnSidecarError)?; // Gossip verify data columns before publishing. This prevents blobs with invalid // KZG proofs from the EL making it into the data availability checker. We do not // immediately add these blobs to the observed blobs/columns cache because we want // to allow blobs/columns to arrive on gossip and be accepted (and propagated) while // we are waiting to publish. Just before publishing we will observe the blobs/columns // and only proceed with publishing if they are not yet seen. // TODO(das): we may want to just perform kzg proof verification here, since the // `DataColumnSidecar` and inclusion proof is computed just above and is unnecessary // to verify them. let columns_to_import_and_publish = custody_columns .into_iter() .filter_map(|col| { match chain_adapter_cloned.verify_data_column_for_gossip(col) { Ok(verified) => Some(Ok(verified)), Err(e) => match e { // Ignore already seen data columns GossipDataColumnError::PriorKnown { .. } | GossipDataColumnError::PriorKnownUnpublished => None, GossipDataColumnError::BeaconChainError(_) | GossipDataColumnError::ProposalSignatureInvalid | GossipDataColumnError::UnknownValidator(_) | GossipDataColumnError::IsNotLaterThanParent { .. } | GossipDataColumnError::InvalidKzgProof(_) | GossipDataColumnError::InvalidSubnetId { .. } | GossipDataColumnError::FutureSlot { .. } | GossipDataColumnError::PastFinalizedSlot { .. } | GossipDataColumnError::PubkeyCacheTimeout | GossipDataColumnError::ProposerIndexMismatch { .. } | GossipDataColumnError::ParentUnknown { .. } | GossipDataColumnError::NotFinalizedDescendant { .. } | GossipDataColumnError::InvalidInclusionProof | GossipDataColumnError::InvalidColumnIndex(_) | GossipDataColumnError::UnexpectedDataColumn | GossipDataColumnError::InconsistentCommitmentsLength { .. } | GossipDataColumnError::InconsistentProofsLength { .. } => { Some(Err(e)) } }, } }) .collect::, _>>() .map_err(FetchEngineBlobError::GossipDataColumn)?; Ok(columns_to_import_and_publish) }, "compute_custody_columns_to_import", ) .ok_or(FetchEngineBlobError::RuntimeShutdown)? .await .map_err(FetchEngineBlobError::TokioJoin)? } fn build_blob_sidecars( block: &Arc>>, response: Vec>>, signed_block_header: SignedBeaconBlockHeader, kzg_commitments_inclusion_proof: &FixedVector, spec: &ChainSpec, ) -> Result, FetchEngineBlobError> { let epoch = block.epoch(); let mut fixed_blob_sidecar_list = FixedBlobSidecarList::default(spec.max_blobs_per_block(epoch) as usize); for (index, blob_and_proof) in response .into_iter() .enumerate() .filter_map(|(i, opt_blob)| Some((i, opt_blob?))) { match BlobSidecar::new_with_existing_proof( index, blob_and_proof.blob, block, signed_block_header.clone(), kzg_commitments_inclusion_proof, blob_and_proof.proof, ) { Ok(blob) => { if let Some(blob_mut) = fixed_blob_sidecar_list.get_mut(index) { *blob_mut = Some(Arc::new(blob)); } else { return Err(FetchEngineBlobError::InternalError(format!( "Blobs from EL contains blob with invalid index {index}" ))); } } Err(e) => { return Err(FetchEngineBlobError::BlobSidecarError(e)); } } } Ok(fixed_blob_sidecar_list) }