mirror of
https://github.com/sigp/lighthouse.git
synced 2026-05-08 09:16:00 +00:00
Update engine_getBlobsV2 response type and add getBlobsV2 tests (#7505)
Update `engine_getBlobsV2` response type to `Option<Vec<BlobsAndProofV2>>`. See recent spec change [here](https://github.com/ethereum/execution-apis/pull/630). Added some tests to cover basic fetch blob scenarios.
This commit is contained in:
84
Cargo.lock
generated
84
Cargo.lock
generated
@@ -813,6 +813,8 @@ dependencies = [
|
|||||||
"maplit",
|
"maplit",
|
||||||
"merkle_proof",
|
"merkle_proof",
|
||||||
"metrics",
|
"metrics",
|
||||||
|
"mockall",
|
||||||
|
"mockall_double",
|
||||||
"once_cell",
|
"once_cell",
|
||||||
"oneshot_broadcast",
|
"oneshot_broadcast",
|
||||||
"operation_pool",
|
"operation_pool",
|
||||||
@@ -2376,6 +2378,12 @@ dependencies = [
|
|||||||
"validator_store",
|
"validator_store",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "downcast"
|
||||||
|
version = "0.11.0"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "1435fa1053d8b2fbbe9be7e97eca7f33d37b28409959813daefc1446a14247f1"
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "dtoa"
|
name = "dtoa"
|
||||||
version = "1.0.10"
|
version = "1.0.10"
|
||||||
@@ -3504,6 +3512,12 @@ dependencies = [
|
|||||||
"percent-encoding",
|
"percent-encoding",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "fragile"
|
||||||
|
version = "2.0.1"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "28dd6caf6059519a65843af8fe2a3ae298b14b80179855aeb4adc2c1934ee619"
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "fs2"
|
name = "fs2"
|
||||||
version = "0.4.3"
|
version = "0.4.3"
|
||||||
@@ -5932,6 +5946,44 @@ version = "0.3.2"
|
|||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "9366861eb2a2c436c20b12c8dbec5f798cea6b47ad99216be0282942e2c81ea0"
|
checksum = "9366861eb2a2c436c20b12c8dbec5f798cea6b47ad99216be0282942e2c81ea0"
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "mockall"
|
||||||
|
version = "0.13.1"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "39a6bfcc6c8c7eed5ee98b9c3e33adc726054389233e201c95dab2d41a3839d2"
|
||||||
|
dependencies = [
|
||||||
|
"cfg-if",
|
||||||
|
"downcast",
|
||||||
|
"fragile",
|
||||||
|
"mockall_derive",
|
||||||
|
"predicates",
|
||||||
|
"predicates-tree",
|
||||||
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "mockall_derive"
|
||||||
|
version = "0.13.1"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "25ca3004c2efe9011bd4e461bd8256445052b9615405b4f7ea43fc8ca5c20898"
|
||||||
|
dependencies = [
|
||||||
|
"cfg-if",
|
||||||
|
"proc-macro2",
|
||||||
|
"quote",
|
||||||
|
"syn 2.0.101",
|
||||||
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "mockall_double"
|
||||||
|
version = "0.3.1"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "f1ca96e5ac35256ae3e13536edd39b172b88f41615e1d7b653c8ad24524113e8"
|
||||||
|
dependencies = [
|
||||||
|
"cfg-if",
|
||||||
|
"proc-macro2",
|
||||||
|
"quote",
|
||||||
|
"syn 2.0.101",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "mockito"
|
name = "mockito"
|
||||||
version = "1.7.0"
|
version = "1.7.0"
|
||||||
@@ -6898,6 +6950,32 @@ dependencies = [
|
|||||||
"zerocopy",
|
"zerocopy",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "predicates"
|
||||||
|
version = "3.1.3"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "a5d19ee57562043d37e82899fade9a22ebab7be9cef5026b07fda9cdd4293573"
|
||||||
|
dependencies = [
|
||||||
|
"anstyle",
|
||||||
|
"predicates-core",
|
||||||
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "predicates-core"
|
||||||
|
version = "1.0.9"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "727e462b119fe9c93fd0eb1429a5f7647394014cf3c04ab2c0350eeb09095ffa"
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "predicates-tree"
|
||||||
|
version = "1.0.12"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "72dd2d6d381dfb73a193c7fca536518d7caee39fc8503f74e7dc0be0531b425c"
|
||||||
|
dependencies = [
|
||||||
|
"predicates-core",
|
||||||
|
"termtree",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "pretty_reqwest_error"
|
name = "pretty_reqwest_error"
|
||||||
version = "0.1.0"
|
version = "0.1.0"
|
||||||
@@ -8902,6 +8980,12 @@ dependencies = [
|
|||||||
"windows-sys 0.59.0",
|
"windows-sys 0.59.0",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "termtree"
|
||||||
|
version = "0.5.1"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "8f50febec83f5ee1df3015341d8bd429f2d1cc62bcba7ea2076759d315084683"
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "test_random_derive"
|
name = "test_random_derive"
|
||||||
version = "0.2.0"
|
version = "0.2.0"
|
||||||
|
|||||||
@@ -188,6 +188,8 @@ maplit = "1"
|
|||||||
merkle_proof = { path = "consensus/merkle_proof" }
|
merkle_proof = { path = "consensus/merkle_proof" }
|
||||||
metrics = { path = "common/metrics" }
|
metrics = { path = "common/metrics" }
|
||||||
milhouse = "0.5"
|
milhouse = "0.5"
|
||||||
|
mockall = "0.13"
|
||||||
|
mockall_double = "0.3"
|
||||||
mockito = "1.5.0"
|
mockito = "1.5.0"
|
||||||
monitoring_api = { path = "common/monitoring_api" }
|
monitoring_api = { path = "common/monitoring_api" }
|
||||||
network = { path = "beacon_node/network" }
|
network = { path = "beacon_node/network" }
|
||||||
|
|||||||
@@ -1,3 +1,4 @@
|
|||||||
|
|
||||||
[package]
|
[package]
|
||||||
name = "beacon_chain"
|
name = "beacon_chain"
|
||||||
version = "0.2.0"
|
version = "0.2.0"
|
||||||
@@ -69,6 +70,8 @@ types = { workspace = true }
|
|||||||
[dev-dependencies]
|
[dev-dependencies]
|
||||||
criterion = { workspace = true }
|
criterion = { workspace = true }
|
||||||
maplit = { workspace = true }
|
maplit = { workspace = true }
|
||||||
|
mockall = { workspace = true }
|
||||||
|
mockall_double = { workspace = true }
|
||||||
serde_json = { workspace = true }
|
serde_json = { workspace = true }
|
||||||
|
|
||||||
[[bench]]
|
[[bench]]
|
||||||
|
|||||||
@@ -0,0 +1,95 @@
|
|||||||
|
use crate::blob_verification::{GossipBlobError, GossipVerifiedBlob};
|
||||||
|
use crate::fetch_blobs::{EngineGetBlobsOutput, FetchEngineBlobError};
|
||||||
|
use crate::observed_data_sidecars::DoNotObserve;
|
||||||
|
use crate::{AvailabilityProcessingStatus, BeaconChain, BeaconChainTypes};
|
||||||
|
use execution_layer::json_structures::{BlobAndProofV1, BlobAndProofV2};
|
||||||
|
use kzg::Kzg;
|
||||||
|
#[cfg(test)]
|
||||||
|
use mockall::automock;
|
||||||
|
use std::sync::Arc;
|
||||||
|
use task_executor::TaskExecutor;
|
||||||
|
use types::{BlobSidecar, ChainSpec, Hash256, Slot};
|
||||||
|
|
||||||
|
/// An adapter to the `BeaconChain` functionalities to remove `BeaconChain` from direct dependency to enable testing fetch blobs logic.
|
||||||
|
pub(crate) struct FetchBlobsBeaconAdapter<T: BeaconChainTypes> {
|
||||||
|
chain: Arc<BeaconChain<T>>,
|
||||||
|
spec: Arc<ChainSpec>,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg_attr(test, automock, allow(dead_code))]
|
||||||
|
impl<T: BeaconChainTypes> FetchBlobsBeaconAdapter<T> {
|
||||||
|
pub(crate) fn new(chain: Arc<BeaconChain<T>>) -> Self {
|
||||||
|
let spec = chain.spec.clone();
|
||||||
|
Self { chain, spec }
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) fn spec(&self) -> &Arc<ChainSpec> {
|
||||||
|
&self.spec
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) fn kzg(&self) -> &Arc<Kzg> {
|
||||||
|
&self.chain.kzg
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) fn executor(&self) -> &TaskExecutor {
|
||||||
|
&self.chain.task_executor
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) async fn get_blobs_v1(
|
||||||
|
&self,
|
||||||
|
versioned_hashes: Vec<Hash256>,
|
||||||
|
) -> Result<Vec<Option<BlobAndProofV1<T::EthSpec>>>, FetchEngineBlobError> {
|
||||||
|
let execution_layer = self
|
||||||
|
.chain
|
||||||
|
.execution_layer
|
||||||
|
.as_ref()
|
||||||
|
.ok_or(FetchEngineBlobError::ExecutionLayerMissing)?;
|
||||||
|
|
||||||
|
execution_layer
|
||||||
|
.get_blobs_v1(versioned_hashes)
|
||||||
|
.await
|
||||||
|
.map_err(FetchEngineBlobError::RequestFailed)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) async fn get_blobs_v2(
|
||||||
|
&self,
|
||||||
|
versioned_hashes: Vec<Hash256>,
|
||||||
|
) -> Result<Option<Vec<BlobAndProofV2<T::EthSpec>>>, FetchEngineBlobError> {
|
||||||
|
let execution_layer = self
|
||||||
|
.chain
|
||||||
|
.execution_layer
|
||||||
|
.as_ref()
|
||||||
|
.ok_or(FetchEngineBlobError::ExecutionLayerMissing)?;
|
||||||
|
|
||||||
|
execution_layer
|
||||||
|
.get_blobs_v2(versioned_hashes)
|
||||||
|
.await
|
||||||
|
.map_err(FetchEngineBlobError::RequestFailed)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) fn verify_blob_for_gossip(
|
||||||
|
&self,
|
||||||
|
blob: &Arc<BlobSidecar<T::EthSpec>>,
|
||||||
|
) -> Result<GossipVerifiedBlob<T, DoNotObserve>, GossipBlobError> {
|
||||||
|
GossipVerifiedBlob::<T, DoNotObserve>::new(blob.clone(), blob.index, &self.chain)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) async fn process_engine_blobs(
|
||||||
|
&self,
|
||||||
|
slot: Slot,
|
||||||
|
block_root: Hash256,
|
||||||
|
blobs: EngineGetBlobsOutput<T::EthSpec>,
|
||||||
|
) -> Result<AvailabilityProcessingStatus, FetchEngineBlobError> {
|
||||||
|
self.chain
|
||||||
|
.process_engine_blobs(slot, block_root, blobs)
|
||||||
|
.await
|
||||||
|
.map_err(FetchEngineBlobError::BlobProcessingError)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) fn fork_choice_contains_block(&self, block_root: &Hash256) -> bool {
|
||||||
|
self.chain
|
||||||
|
.canonical_head
|
||||||
|
.fork_choice_read_lock()
|
||||||
|
.contains_block(block_root)
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -8,7 +8,13 @@
|
|||||||
//! broadcasting blobs requires a much higher bandwidth, and is only done by high capacity
|
//! broadcasting blobs requires a much higher bandwidth, and is only done by high capacity
|
||||||
//! supernodes.
|
//! supernodes.
|
||||||
|
|
||||||
|
mod fetch_blobs_beacon_adapter;
|
||||||
|
#[cfg(test)]
|
||||||
|
mod tests;
|
||||||
|
|
||||||
use crate::blob_verification::{GossipBlobError, GossipVerifiedBlob};
|
use crate::blob_verification::{GossipBlobError, GossipVerifiedBlob};
|
||||||
|
#[cfg_attr(test, double)]
|
||||||
|
use crate::fetch_blobs::fetch_blobs_beacon_adapter::FetchBlobsBeaconAdapter;
|
||||||
use crate::kzg_utils::blobs_to_data_column_sidecars;
|
use crate::kzg_utils::blobs_to_data_column_sidecars;
|
||||||
use crate::observed_data_sidecars::DoNotObserve;
|
use crate::observed_data_sidecars::DoNotObserve;
|
||||||
use crate::{
|
use crate::{
|
||||||
@@ -18,11 +24,13 @@ use crate::{
|
|||||||
use execution_layer::json_structures::{BlobAndProofV1, BlobAndProofV2};
|
use execution_layer::json_structures::{BlobAndProofV1, BlobAndProofV2};
|
||||||
use execution_layer::Error as ExecutionLayerError;
|
use execution_layer::Error as ExecutionLayerError;
|
||||||
use metrics::{inc_counter, TryExt};
|
use metrics::{inc_counter, TryExt};
|
||||||
|
#[cfg(test)]
|
||||||
|
use mockall_double::double;
|
||||||
use ssz_types::FixedVector;
|
use ssz_types::FixedVector;
|
||||||
use state_processing::per_block_processing::deneb::kzg_commitment_to_versioned_hash;
|
use state_processing::per_block_processing::deneb::kzg_commitment_to_versioned_hash;
|
||||||
use std::collections::HashSet;
|
use std::collections::HashSet;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use tracing::debug;
|
use tracing::{debug, warn};
|
||||||
use types::blob_sidecar::{BlobSidecarError, FixedBlobSidecarList};
|
use types::blob_sidecar::{BlobSidecarError, FixedBlobSidecarList};
|
||||||
use types::data_column_sidecar::DataColumnSidecarError;
|
use types::data_column_sidecar::DataColumnSidecarError;
|
||||||
use types::{
|
use types::{
|
||||||
@@ -58,6 +66,7 @@ pub enum FetchEngineBlobError {
|
|||||||
GossipBlob(GossipBlobError),
|
GossipBlob(GossipBlobError),
|
||||||
RequestFailed(ExecutionLayerError),
|
RequestFailed(ExecutionLayerError),
|
||||||
RuntimeShutdown,
|
RuntimeShutdown,
|
||||||
|
TokioJoin(tokio::task::JoinError),
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Fetches blobs from the EL mempool and processes them. It also broadcasts unseen blobs or
|
/// Fetches blobs from the EL mempool and processes them. It also broadcasts unseen blobs or
|
||||||
@@ -68,6 +77,25 @@ pub async fn fetch_and_process_engine_blobs<T: BeaconChainTypes>(
|
|||||||
block: Arc<SignedBeaconBlock<T::EthSpec, FullPayload<T::EthSpec>>>,
|
block: Arc<SignedBeaconBlock<T::EthSpec, FullPayload<T::EthSpec>>>,
|
||||||
custody_columns: HashSet<ColumnIndex>,
|
custody_columns: HashSet<ColumnIndex>,
|
||||||
publish_fn: impl Fn(BlobsOrDataColumns<T>) + Send + 'static,
|
publish_fn: impl Fn(BlobsOrDataColumns<T>) + Send + 'static,
|
||||||
|
) -> Result<Option<AvailabilityProcessingStatus>, FetchEngineBlobError> {
|
||||||
|
fetch_and_process_engine_blobs_inner(
|
||||||
|
FetchBlobsBeaconAdapter::new(chain),
|
||||||
|
block_root,
|
||||||
|
block,
|
||||||
|
custody_columns,
|
||||||
|
publish_fn,
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Internal implementation of fetch blobs, which uses `FetchBlobsBeaconAdapter` instead of
|
||||||
|
/// `BeaconChain` for better testability.
|
||||||
|
async fn fetch_and_process_engine_blobs_inner<T: BeaconChainTypes>(
|
||||||
|
chain_adapter: FetchBlobsBeaconAdapter<T>,
|
||||||
|
block_root: Hash256,
|
||||||
|
block: Arc<SignedBeaconBlock<T::EthSpec, FullPayload<T::EthSpec>>>,
|
||||||
|
custody_columns: HashSet<ColumnIndex>,
|
||||||
|
publish_fn: impl Fn(BlobsOrDataColumns<T>) + Send + 'static,
|
||||||
) -> Result<Option<AvailabilityProcessingStatus>, FetchEngineBlobError> {
|
) -> Result<Option<AvailabilityProcessingStatus>, FetchEngineBlobError> {
|
||||||
let versioned_hashes = if let Some(kzg_commitments) = block
|
let versioned_hashes = if let Some(kzg_commitments) = block
|
||||||
.message()
|
.message()
|
||||||
@@ -90,9 +118,12 @@ pub async fn fetch_and_process_engine_blobs<T: BeaconChainTypes>(
|
|||||||
"Fetching blobs from the EL"
|
"Fetching blobs from the EL"
|
||||||
);
|
);
|
||||||
|
|
||||||
if chain.spec.is_peer_das_enabled_for_epoch(block.epoch()) {
|
if chain_adapter
|
||||||
|
.spec()
|
||||||
|
.is_peer_das_enabled_for_epoch(block.epoch())
|
||||||
|
{
|
||||||
fetch_and_process_blobs_v2(
|
fetch_and_process_blobs_v2(
|
||||||
chain,
|
chain_adapter,
|
||||||
block_root,
|
block_root,
|
||||||
block,
|
block,
|
||||||
versioned_hashes,
|
versioned_hashes,
|
||||||
@@ -101,32 +132,33 @@ pub async fn fetch_and_process_engine_blobs<T: BeaconChainTypes>(
|
|||||||
)
|
)
|
||||||
.await
|
.await
|
||||||
} else {
|
} else {
|
||||||
fetch_and_process_blobs_v1(chain, block_root, block, versioned_hashes, publish_fn).await
|
fetch_and_process_blobs_v1(
|
||||||
|
chain_adapter,
|
||||||
|
block_root,
|
||||||
|
block,
|
||||||
|
versioned_hashes,
|
||||||
|
publish_fn,
|
||||||
|
)
|
||||||
|
.await
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn fetch_and_process_blobs_v1<T: BeaconChainTypes>(
|
async fn fetch_and_process_blobs_v1<T: BeaconChainTypes>(
|
||||||
chain: Arc<BeaconChain<T>>,
|
chain_adapter: FetchBlobsBeaconAdapter<T>,
|
||||||
block_root: Hash256,
|
block_root: Hash256,
|
||||||
block: Arc<SignedBeaconBlock<T::EthSpec>>,
|
block: Arc<SignedBeaconBlock<T::EthSpec>>,
|
||||||
versioned_hashes: Vec<VersionedHash>,
|
versioned_hashes: Vec<VersionedHash>,
|
||||||
publish_fn: impl Fn(BlobsOrDataColumns<T>) + Send + Sized,
|
publish_fn: impl Fn(BlobsOrDataColumns<T>) + Send + Sized,
|
||||||
) -> Result<Option<AvailabilityProcessingStatus>, FetchEngineBlobError> {
|
) -> Result<Option<AvailabilityProcessingStatus>, FetchEngineBlobError> {
|
||||||
let num_expected_blobs = versioned_hashes.len();
|
let num_expected_blobs = versioned_hashes.len();
|
||||||
let execution_layer = chain
|
|
||||||
.execution_layer
|
|
||||||
.as_ref()
|
|
||||||
.ok_or(FetchEngineBlobError::ExecutionLayerMissing)?;
|
|
||||||
|
|
||||||
metrics::observe(&metrics::BLOBS_FROM_EL_EXPECTED, num_expected_blobs as f64);
|
metrics::observe(&metrics::BLOBS_FROM_EL_EXPECTED, num_expected_blobs as f64);
|
||||||
debug!(num_expected_blobs, "Fetching blobs from the EL");
|
debug!(num_expected_blobs, "Fetching blobs from the EL");
|
||||||
let response = execution_layer
|
let response = chain_adapter
|
||||||
.get_blobs_v1(versioned_hashes)
|
.get_blobs_v1(versioned_hashes)
|
||||||
.await
|
.await
|
||||||
.inspect_err(|_| {
|
.inspect_err(|_| {
|
||||||
inc_counter(&metrics::BLOBS_FROM_EL_ERROR_TOTAL);
|
inc_counter(&metrics::BLOBS_FROM_EL_ERROR_TOTAL);
|
||||||
})
|
})?;
|
||||||
.map_err(FetchEngineBlobError::RequestFailed)?;
|
|
||||||
|
|
||||||
let num_fetched_blobs = response.iter().filter(|opt| opt.is_some()).count();
|
let num_fetched_blobs = response.iter().filter(|opt| opt.is_some()).count();
|
||||||
metrics::observe(&metrics::BLOBS_FROM_EL_RECEIVED, num_fetched_blobs as f64);
|
metrics::observe(&metrics::BLOBS_FROM_EL_RECEIVED, num_fetched_blobs as f64);
|
||||||
@@ -148,7 +180,7 @@ async fn fetch_and_process_blobs_v1<T: BeaconChainTypes>(
|
|||||||
response,
|
response,
|
||||||
signed_block_header,
|
signed_block_header,
|
||||||
&kzg_commitments_proof,
|
&kzg_commitments_proof,
|
||||||
&chain.spec,
|
chain_adapter.spec(),
|
||||||
)?;
|
)?;
|
||||||
|
|
||||||
// Gossip verify blobs before publishing. This prevents blobs with invalid KZG proofs from
|
// Gossip verify blobs before publishing. This prevents blobs with invalid KZG proofs from
|
||||||
@@ -160,7 +192,7 @@ async fn fetch_and_process_blobs_v1<T: BeaconChainTypes>(
|
|||||||
.iter()
|
.iter()
|
||||||
.filter_map(|opt_blob| {
|
.filter_map(|opt_blob| {
|
||||||
let blob = opt_blob.as_ref()?;
|
let blob = opt_blob.as_ref()?;
|
||||||
match GossipVerifiedBlob::<T, DoNotObserve>::new(blob.clone(), blob.index, &chain) {
|
match chain_adapter.verify_blob_for_gossip(blob) {
|
||||||
Ok(verified) => Some(Ok(verified)),
|
Ok(verified) => Some(Ok(verified)),
|
||||||
// Ignore already seen blobs.
|
// Ignore already seen blobs.
|
||||||
Err(GossipBlobError::RepeatBlob { .. }) => None,
|
Err(GossipBlobError::RepeatBlob { .. }) => None,
|
||||||
@@ -176,20 +208,19 @@ async fn fetch_and_process_blobs_v1<T: BeaconChainTypes>(
|
|||||||
|
|
||||||
debug!(num_fetched_blobs, "Processing engine blobs");
|
debug!(num_fetched_blobs, "Processing engine blobs");
|
||||||
|
|
||||||
let availability_processing_status = chain
|
let availability_processing_status = chain_adapter
|
||||||
.process_engine_blobs(
|
.process_engine_blobs(
|
||||||
block.slot(),
|
block.slot(),
|
||||||
block_root,
|
block_root,
|
||||||
EngineGetBlobsOutput::Blobs(fixed_blob_sidecar_list.clone()),
|
EngineGetBlobsOutput::Blobs(fixed_blob_sidecar_list.clone()),
|
||||||
)
|
)
|
||||||
.await
|
.await?;
|
||||||
.map_err(FetchEngineBlobError::BlobProcessingError)?;
|
|
||||||
|
|
||||||
Ok(Some(availability_processing_status))
|
Ok(Some(availability_processing_status))
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn fetch_and_process_blobs_v2<T: BeaconChainTypes>(
|
async fn fetch_and_process_blobs_v2<T: BeaconChainTypes>(
|
||||||
chain: Arc<BeaconChain<T>>,
|
chain_adapter: FetchBlobsBeaconAdapter<T>,
|
||||||
block_root: Hash256,
|
block_root: Hash256,
|
||||||
block: Arc<SignedBeaconBlock<T::EthSpec>>,
|
block: Arc<SignedBeaconBlock<T::EthSpec>>,
|
||||||
versioned_hashes: Vec<VersionedHash>,
|
versioned_hashes: Vec<VersionedHash>,
|
||||||
@@ -197,52 +228,49 @@ async fn fetch_and_process_blobs_v2<T: BeaconChainTypes>(
|
|||||||
publish_fn: impl Fn(BlobsOrDataColumns<T>) + Send + 'static,
|
publish_fn: impl Fn(BlobsOrDataColumns<T>) + Send + 'static,
|
||||||
) -> Result<Option<AvailabilityProcessingStatus>, FetchEngineBlobError> {
|
) -> Result<Option<AvailabilityProcessingStatus>, FetchEngineBlobError> {
|
||||||
let num_expected_blobs = versioned_hashes.len();
|
let num_expected_blobs = versioned_hashes.len();
|
||||||
let execution_layer = chain
|
|
||||||
.execution_layer
|
|
||||||
.as_ref()
|
|
||||||
.ok_or(FetchEngineBlobError::ExecutionLayerMissing)?;
|
|
||||||
|
|
||||||
metrics::observe(&metrics::BLOBS_FROM_EL_EXPECTED, num_expected_blobs as f64);
|
metrics::observe(&metrics::BLOBS_FROM_EL_EXPECTED, num_expected_blobs as f64);
|
||||||
debug!(num_expected_blobs, "Fetching blobs from the EL");
|
debug!(num_expected_blobs, "Fetching blobs from the EL");
|
||||||
let response = execution_layer
|
let response = chain_adapter
|
||||||
.get_blobs_v2(versioned_hashes)
|
.get_blobs_v2(versioned_hashes)
|
||||||
.await
|
.await
|
||||||
.inspect_err(|_| {
|
.inspect_err(|_| {
|
||||||
inc_counter(&metrics::BLOBS_FROM_EL_ERROR_TOTAL);
|
inc_counter(&metrics::BLOBS_FROM_EL_ERROR_TOTAL);
|
||||||
})
|
})?;
|
||||||
.map_err(FetchEngineBlobError::RequestFailed)?;
|
|
||||||
|
|
||||||
let (blobs, proofs): (Vec<_>, Vec<_>) = response
|
let Some(blobs_and_proofs) = response else {
|
||||||
|
debug!(num_expected_blobs, "No blobs fetched from the EL");
|
||||||
|
inc_counter(&metrics::BLOBS_FROM_EL_MISS_TOTAL);
|
||||||
|
return Ok(None);
|
||||||
|
};
|
||||||
|
|
||||||
|
let (blobs, proofs): (Vec<_>, Vec<_>) = blobs_and_proofs
|
||||||
.into_iter()
|
.into_iter()
|
||||||
.filter_map(|blob_and_proof_opt| {
|
.map(|blob_and_proof| {
|
||||||
blob_and_proof_opt.map(|blob_and_proof| {
|
let BlobAndProofV2 { blob, proofs } = blob_and_proof;
|
||||||
let BlobAndProofV2 { blob, proofs } = blob_and_proof;
|
(blob, proofs)
|
||||||
(blob, proofs)
|
|
||||||
})
|
|
||||||
})
|
})
|
||||||
.unzip();
|
.unzip();
|
||||||
|
|
||||||
let num_fetched_blobs = blobs.len();
|
let num_fetched_blobs = blobs.len();
|
||||||
metrics::observe(&metrics::BLOBS_FROM_EL_RECEIVED, num_fetched_blobs as f64);
|
metrics::observe(&metrics::BLOBS_FROM_EL_RECEIVED, num_fetched_blobs as f64);
|
||||||
|
|
||||||
// Partial blobs response isn't useful for PeerDAS, so we don't bother building and publishing data columns.
|
|
||||||
if num_fetched_blobs != num_expected_blobs {
|
if num_fetched_blobs != num_expected_blobs {
|
||||||
debug!(
|
// This scenario is not supposed to happen if the EL is spec compliant.
|
||||||
info = "Unable to compute data columns",
|
// It should either return all requested blobs or none, but NOT partial responses.
|
||||||
num_fetched_blobs, num_expected_blobs, "Not all blobs fetched from the EL"
|
// If we attempt to compute columns with partial blobs, we'd end up with invalid columns.
|
||||||
|
warn!(
|
||||||
|
num_fetched_blobs,
|
||||||
|
num_expected_blobs, "The EL did not return all requested blobs"
|
||||||
);
|
);
|
||||||
inc_counter(&metrics::BLOBS_FROM_EL_MISS_TOTAL);
|
inc_counter(&metrics::BLOBS_FROM_EL_MISS_TOTAL);
|
||||||
return Ok(None);
|
return Ok(None);
|
||||||
} else {
|
|
||||||
inc_counter(&metrics::BLOBS_FROM_EL_HIT_TOTAL);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if chain
|
inc_counter(&metrics::BLOBS_FROM_EL_HIT_TOTAL);
|
||||||
.canonical_head
|
|
||||||
.fork_choice_read_lock()
|
if chain_adapter.fork_choice_contains_block(&block_root) {
|
||||||
.contains_block(&block_root)
|
// Avoid computing columns if the block has already been imported.
|
||||||
{
|
|
||||||
// Avoid computing columns if block has already been imported.
|
|
||||||
debug!(
|
debug!(
|
||||||
info = "block has already been imported",
|
info = "block has already been imported",
|
||||||
"Ignoring EL blobs response"
|
"Ignoring EL blobs response"
|
||||||
@@ -251,7 +279,7 @@ async fn fetch_and_process_blobs_v2<T: BeaconChainTypes>(
|
|||||||
}
|
}
|
||||||
|
|
||||||
let custody_columns = compute_and_publish_data_columns(
|
let custody_columns = compute_and_publish_data_columns(
|
||||||
&chain,
|
&chain_adapter,
|
||||||
block.clone(),
|
block.clone(),
|
||||||
blobs,
|
blobs,
|
||||||
proofs,
|
proofs,
|
||||||
@@ -262,29 +290,30 @@ async fn fetch_and_process_blobs_v2<T: BeaconChainTypes>(
|
|||||||
|
|
||||||
debug!(num_fetched_blobs, "Processing engine blobs");
|
debug!(num_fetched_blobs, "Processing engine blobs");
|
||||||
|
|
||||||
let availability_processing_status = chain
|
let availability_processing_status = chain_adapter
|
||||||
.process_engine_blobs(
|
.process_engine_blobs(
|
||||||
block.slot(),
|
block.slot(),
|
||||||
block_root,
|
block_root,
|
||||||
EngineGetBlobsOutput::CustodyColumns(custody_columns),
|
EngineGetBlobsOutput::CustodyColumns(custody_columns),
|
||||||
)
|
)
|
||||||
.await
|
.await?;
|
||||||
.map_err(FetchEngineBlobError::BlobProcessingError)?;
|
|
||||||
|
|
||||||
Ok(Some(availability_processing_status))
|
Ok(Some(availability_processing_status))
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Offload the data column computation to a blocking task to avoid holding up the async runtime.
|
/// Offload the data column computation to a blocking task to avoid holding up the async runtime.
|
||||||
async fn compute_and_publish_data_columns<T: BeaconChainTypes>(
|
async fn compute_and_publish_data_columns<T: BeaconChainTypes>(
|
||||||
chain: &Arc<BeaconChain<T>>,
|
chain_adapter: &FetchBlobsBeaconAdapter<T>,
|
||||||
block: Arc<SignedBeaconBlock<T::EthSpec, FullPayload<T::EthSpec>>>,
|
block: Arc<SignedBeaconBlock<T::EthSpec, FullPayload<T::EthSpec>>>,
|
||||||
blobs: Vec<Blob<T::EthSpec>>,
|
blobs: Vec<Blob<T::EthSpec>>,
|
||||||
proofs: Vec<KzgProofs<T::EthSpec>>,
|
proofs: Vec<KzgProofs<T::EthSpec>>,
|
||||||
custody_columns_indices: HashSet<ColumnIndex>,
|
custody_columns_indices: HashSet<ColumnIndex>,
|
||||||
publish_fn: impl Fn(BlobsOrDataColumns<T>) + Send + 'static,
|
publish_fn: impl Fn(BlobsOrDataColumns<T>) + Send + 'static,
|
||||||
) -> Result<DataColumnSidecarList<T::EthSpec>, FetchEngineBlobError> {
|
) -> Result<DataColumnSidecarList<T::EthSpec>, FetchEngineBlobError> {
|
||||||
let chain_cloned = chain.clone();
|
let kzg = chain_adapter.kzg().clone();
|
||||||
chain
|
let spec = chain_adapter.spec().clone();
|
||||||
|
chain_adapter
|
||||||
|
.executor()
|
||||||
.spawn_blocking_handle(
|
.spawn_blocking_handle(
|
||||||
move || {
|
move || {
|
||||||
let mut timer = metrics::start_timer_vec(
|
let mut timer = metrics::start_timer_vec(
|
||||||
@@ -294,14 +323,9 @@ async fn compute_and_publish_data_columns<T: BeaconChainTypes>(
|
|||||||
|
|
||||||
let blob_refs = blobs.iter().collect::<Vec<_>>();
|
let blob_refs = blobs.iter().collect::<Vec<_>>();
|
||||||
let cell_proofs = proofs.into_iter().flatten().collect();
|
let cell_proofs = proofs.into_iter().flatten().collect();
|
||||||
let data_columns_result = blobs_to_data_column_sidecars(
|
let data_columns_result =
|
||||||
&blob_refs,
|
blobs_to_data_column_sidecars(&blob_refs, cell_proofs, &block, &kzg, &spec)
|
||||||
cell_proofs,
|
.discard_timer_on_break(&mut timer);
|
||||||
&block,
|
|
||||||
&chain_cloned.kzg,
|
|
||||||
&chain_cloned.spec,
|
|
||||||
)
|
|
||||||
.discard_timer_on_break(&mut timer);
|
|
||||||
drop(timer);
|
drop(timer);
|
||||||
|
|
||||||
// This filtering ensures we only import and publish the custody columns.
|
// This filtering ensures we only import and publish the custody columns.
|
||||||
@@ -319,9 +343,9 @@ async fn compute_and_publish_data_columns<T: BeaconChainTypes>(
|
|||||||
},
|
},
|
||||||
"compute_and_publish_data_columns",
|
"compute_and_publish_data_columns",
|
||||||
)
|
)
|
||||||
|
.ok_or(FetchEngineBlobError::RuntimeShutdown)?
|
||||||
.await
|
.await
|
||||||
.map_err(|e| FetchEngineBlobError::BeaconChainError(Box::new(e)))
|
.map_err(FetchEngineBlobError::TokioJoin)?
|
||||||
.and_then(|r| r)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
fn build_blob_sidecars<E: EthSpec>(
|
fn build_blob_sidecars<E: EthSpec>(
|
||||||
278
beacon_node/beacon_chain/src/fetch_blobs/tests.rs
Normal file
278
beacon_node/beacon_chain/src/fetch_blobs/tests.rs
Normal file
@@ -0,0 +1,278 @@
|
|||||||
|
use crate::fetch_blobs::fetch_blobs_beacon_adapter::MockFetchBlobsBeaconAdapter;
|
||||||
|
use crate::fetch_blobs::{
|
||||||
|
fetch_and_process_engine_blobs_inner, BlobsOrDataColumns, FetchEngineBlobError,
|
||||||
|
};
|
||||||
|
use crate::test_utils::{get_kzg, EphemeralHarnessType};
|
||||||
|
use crate::AvailabilityProcessingStatus;
|
||||||
|
use bls::Signature;
|
||||||
|
use eth2::types::BlobsBundle;
|
||||||
|
use execution_layer::json_structures::BlobAndProofV2;
|
||||||
|
use execution_layer::test_utils::generate_blobs;
|
||||||
|
use maplit::hashset;
|
||||||
|
use std::sync::{Arc, Mutex};
|
||||||
|
use task_executor::test_utils::TestRuntime;
|
||||||
|
use types::{
|
||||||
|
BeaconBlockFulu, EmptyBlock, EthSpec, ForkName, Hash256, MainnetEthSpec, SignedBeaconBlock,
|
||||||
|
SignedBeaconBlockFulu,
|
||||||
|
};
|
||||||
|
|
||||||
|
type E = MainnetEthSpec;
|
||||||
|
type T = EphemeralHarnessType<E>;
|
||||||
|
|
||||||
|
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||||
|
async fn test_fetch_blobs_v2_no_blobs_in_block() {
|
||||||
|
let mut mock_adapter = mock_beacon_adapter();
|
||||||
|
let (publish_fn, _s) = mock_publish_fn();
|
||||||
|
let block = SignedBeaconBlock::<E>::Fulu(SignedBeaconBlockFulu {
|
||||||
|
message: BeaconBlockFulu::empty(mock_adapter.spec()),
|
||||||
|
signature: Signature::empty(),
|
||||||
|
});
|
||||||
|
let block_root = block.canonical_root();
|
||||||
|
|
||||||
|
// Expectations: engine fetch blobs should not be triggered
|
||||||
|
mock_adapter.expect_get_blobs_v2().times(0);
|
||||||
|
mock_adapter.expect_process_engine_blobs().times(0);
|
||||||
|
|
||||||
|
let custody_columns = hashset![0, 1, 2];
|
||||||
|
let processing_status = fetch_and_process_engine_blobs_inner(
|
||||||
|
mock_adapter,
|
||||||
|
block_root,
|
||||||
|
Arc::new(block),
|
||||||
|
custody_columns.clone(),
|
||||||
|
publish_fn,
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
.expect("fetch blobs should succeed");
|
||||||
|
|
||||||
|
assert_eq!(processing_status, None);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||||
|
async fn test_fetch_blobs_v2_no_blobs_returned() {
|
||||||
|
let mut mock_adapter = mock_beacon_adapter();
|
||||||
|
let (publish_fn, _) = mock_publish_fn();
|
||||||
|
let (block, _blobs_and_proofs) = create_test_block_and_blobs(&mock_adapter);
|
||||||
|
let block_root = block.canonical_root();
|
||||||
|
|
||||||
|
// No blobs in EL response
|
||||||
|
mock_get_blobs_v2_response(&mut mock_adapter, None);
|
||||||
|
|
||||||
|
// Trigger fetch blobs on the block
|
||||||
|
let custody_columns = hashset![0, 1, 2];
|
||||||
|
let processing_status = fetch_and_process_engine_blobs_inner(
|
||||||
|
mock_adapter,
|
||||||
|
block_root,
|
||||||
|
block,
|
||||||
|
custody_columns.clone(),
|
||||||
|
publish_fn,
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
.expect("fetch blobs should succeed");
|
||||||
|
|
||||||
|
assert_eq!(processing_status, None);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||||
|
async fn test_fetch_blobs_v2_partial_blobs_returned() {
|
||||||
|
let mut mock_adapter = mock_beacon_adapter();
|
||||||
|
let (publish_fn, publish_fn_args) = mock_publish_fn();
|
||||||
|
let (block, mut blobs_and_proofs) = create_test_block_and_blobs(&mock_adapter);
|
||||||
|
let block_root = block.canonical_root();
|
||||||
|
|
||||||
|
// Missing blob in EL response
|
||||||
|
blobs_and_proofs.pop();
|
||||||
|
mock_get_blobs_v2_response(&mut mock_adapter, Some(blobs_and_proofs));
|
||||||
|
// No blobs should be processed
|
||||||
|
mock_adapter.expect_process_engine_blobs().times(0);
|
||||||
|
|
||||||
|
// Trigger fetch blobs on the block
|
||||||
|
let custody_columns = hashset![0, 1, 2];
|
||||||
|
let processing_status = fetch_and_process_engine_blobs_inner(
|
||||||
|
mock_adapter,
|
||||||
|
block_root,
|
||||||
|
block,
|
||||||
|
custody_columns.clone(),
|
||||||
|
publish_fn,
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
.expect("fetch blobs should succeed");
|
||||||
|
|
||||||
|
assert_eq!(processing_status, None);
|
||||||
|
assert_eq!(
|
||||||
|
publish_fn_args.lock().unwrap().len(),
|
||||||
|
0,
|
||||||
|
"no columns should be published"
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||||
|
async fn test_fetch_blobs_v2_block_imported_after_el_response() {
|
||||||
|
let mut mock_adapter = mock_beacon_adapter();
|
||||||
|
let (publish_fn, publish_fn_args) = mock_publish_fn();
|
||||||
|
let (block, blobs_and_proofs) = create_test_block_and_blobs(&mock_adapter);
|
||||||
|
let block_root = block.canonical_root();
|
||||||
|
|
||||||
|
// All blobs returned, but fork choice already imported the block
|
||||||
|
mock_get_blobs_v2_response(&mut mock_adapter, Some(blobs_and_proofs));
|
||||||
|
mock_fork_choice_contains_block(&mut mock_adapter, vec![block.canonical_root()]);
|
||||||
|
// No blobs should be processed
|
||||||
|
mock_adapter.expect_process_engine_blobs().times(0);
|
||||||
|
|
||||||
|
// Trigger fetch blobs on the block
|
||||||
|
let custody_columns = hashset![0, 1, 2];
|
||||||
|
let processing_status = fetch_and_process_engine_blobs_inner(
|
||||||
|
mock_adapter,
|
||||||
|
block_root,
|
||||||
|
block,
|
||||||
|
custody_columns.clone(),
|
||||||
|
publish_fn,
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
.expect("fetch blobs should succeed");
|
||||||
|
|
||||||
|
assert_eq!(processing_status, None);
|
||||||
|
assert_eq!(
|
||||||
|
publish_fn_args.lock().unwrap().len(),
|
||||||
|
0,
|
||||||
|
"no columns should be published"
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||||
|
async fn test_fetch_blobs_v2_success() {
|
||||||
|
let mut mock_adapter = mock_beacon_adapter();
|
||||||
|
let (publish_fn, publish_fn_args) = mock_publish_fn();
|
||||||
|
let (block, blobs_and_proofs) = create_test_block_and_blobs(&mock_adapter);
|
||||||
|
let block_root = block.canonical_root();
|
||||||
|
|
||||||
|
// All blobs returned, fork choice doesn't contain block
|
||||||
|
mock_get_blobs_v2_response(&mut mock_adapter, Some(blobs_and_proofs));
|
||||||
|
mock_fork_choice_contains_block(&mut mock_adapter, vec![]);
|
||||||
|
mock_process_engine_blobs_result(
|
||||||
|
&mut mock_adapter,
|
||||||
|
Ok(AvailabilityProcessingStatus::Imported(block_root)),
|
||||||
|
);
|
||||||
|
|
||||||
|
// Trigger fetch blobs on the block
|
||||||
|
let custody_columns = hashset![0, 1, 2];
|
||||||
|
let processing_status = fetch_and_process_engine_blobs_inner(
|
||||||
|
mock_adapter,
|
||||||
|
block_root,
|
||||||
|
block,
|
||||||
|
custody_columns.clone(),
|
||||||
|
publish_fn,
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
.expect("fetch blobs should succeed");
|
||||||
|
|
||||||
|
assert_eq!(
|
||||||
|
processing_status,
|
||||||
|
Some(AvailabilityProcessingStatus::Imported(block_root))
|
||||||
|
);
|
||||||
|
|
||||||
|
let published_columns = extract_published_blobs(publish_fn_args);
|
||||||
|
assert!(
|
||||||
|
matches!(
|
||||||
|
published_columns,
|
||||||
|
BlobsOrDataColumns::DataColumns (columns) if columns.len() == custody_columns.len()
|
||||||
|
),
|
||||||
|
"should publish custody columns"
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Extract the `BlobsOrDataColumns` passed to the `publish_fn`.
|
||||||
|
fn extract_published_blobs(
|
||||||
|
publish_fn_args: Arc<Mutex<Vec<BlobsOrDataColumns<T>>>>,
|
||||||
|
) -> BlobsOrDataColumns<T> {
|
||||||
|
let mut calls = publish_fn_args.lock().unwrap();
|
||||||
|
assert_eq!(calls.len(), 1);
|
||||||
|
calls.pop().unwrap()
|
||||||
|
}
|
||||||
|
|
||||||
|
fn mock_process_engine_blobs_result(
|
||||||
|
mock_adapter: &mut MockFetchBlobsBeaconAdapter<T>,
|
||||||
|
result: Result<AvailabilityProcessingStatus, FetchEngineBlobError>,
|
||||||
|
) {
|
||||||
|
mock_adapter
|
||||||
|
.expect_process_engine_blobs()
|
||||||
|
.return_once(move |_, _, _| result);
|
||||||
|
}
|
||||||
|
|
||||||
|
fn mock_fork_choice_contains_block(
|
||||||
|
mock_adapter: &mut MockFetchBlobsBeaconAdapter<T>,
|
||||||
|
block_roots: Vec<Hash256>,
|
||||||
|
) {
|
||||||
|
mock_adapter
|
||||||
|
.expect_fork_choice_contains_block()
|
||||||
|
.returning(move |block_root| block_roots.contains(block_root));
|
||||||
|
}
|
||||||
|
|
||||||
|
fn mock_get_blobs_v2_response(
|
||||||
|
mock_adapter: &mut MockFetchBlobsBeaconAdapter<T>,
|
||||||
|
blobs_and_proofs_opt: Option<Vec<BlobAndProofV2<E>>>,
|
||||||
|
) {
|
||||||
|
mock_adapter
|
||||||
|
.expect_get_blobs_v2()
|
||||||
|
.return_once(move |_| Ok(blobs_and_proofs_opt));
|
||||||
|
}
|
||||||
|
|
||||||
|
fn create_test_block_and_blobs(
|
||||||
|
mock_adapter: &MockFetchBlobsBeaconAdapter<T>,
|
||||||
|
) -> (Arc<SignedBeaconBlock<E>>, Vec<BlobAndProofV2<E>>) {
|
||||||
|
let mut block = SignedBeaconBlock::Fulu(SignedBeaconBlockFulu {
|
||||||
|
message: BeaconBlockFulu::empty(mock_adapter.spec()),
|
||||||
|
signature: Signature::empty(),
|
||||||
|
});
|
||||||
|
let (blobs_bundle, _tx) = generate_blobs::<E>(2, block.fork_name_unchecked()).unwrap();
|
||||||
|
let BlobsBundle {
|
||||||
|
commitments,
|
||||||
|
proofs,
|
||||||
|
blobs,
|
||||||
|
} = blobs_bundle;
|
||||||
|
|
||||||
|
*block
|
||||||
|
.message_mut()
|
||||||
|
.body_mut()
|
||||||
|
.blob_kzg_commitments_mut()
|
||||||
|
.unwrap() = commitments;
|
||||||
|
|
||||||
|
let proofs_len = proofs.len() / blobs.len();
|
||||||
|
let blob_and_proofs: Vec<BlobAndProofV2<E>> = blobs
|
||||||
|
.into_iter()
|
||||||
|
.zip(proofs.chunks(proofs_len))
|
||||||
|
.map(|(blob, proofs)| BlobAndProofV2 {
|
||||||
|
blob,
|
||||||
|
proofs: proofs.to_vec().into(),
|
||||||
|
})
|
||||||
|
.collect();
|
||||||
|
(Arc::new(block), blob_and_proofs)
|
||||||
|
}
|
||||||
|
|
||||||
|
#[allow(clippy::type_complexity)]
|
||||||
|
fn mock_publish_fn() -> (
|
||||||
|
impl Fn(BlobsOrDataColumns<T>) + Send + 'static,
|
||||||
|
Arc<Mutex<Vec<BlobsOrDataColumns<T>>>>,
|
||||||
|
) {
|
||||||
|
// Keep track of the arguments captured by `publish_fn`.
|
||||||
|
let captured_args = Arc::new(Mutex::new(vec![]));
|
||||||
|
let captured_args_clone = captured_args.clone();
|
||||||
|
let publish_fn = move |args| {
|
||||||
|
let mut lock = captured_args_clone.lock().unwrap();
|
||||||
|
lock.push(args);
|
||||||
|
};
|
||||||
|
(publish_fn, captured_args)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Construct a `MockFetchBlobsBeaconAdapter` with `spec`, `kzg` and
/// `executor` stubbed, using a Fulu-genesis spec so Fulu-only code paths
/// (e.g. `engine_getBlobsV2`) are exercised.
fn mock_beacon_adapter() -> MockFetchBlobsBeaconAdapter<T> {
    // NOTE(review): `test_runtime` is dropped when this function returns while
    // a clone of its `task_executor` lives on inside the mock — confirm the
    // executor stays usable after `TestRuntime` is dropped.
    let test_runtime = TestRuntime::default();
    let spec = Arc::new(ForkName::Fulu.make_genesis_spec(E::default_spec()));
    let kzg = get_kzg(&spec);

    let mut mock_adapter = MockFetchBlobsBeaconAdapter::default();
    // Stub the accessors every test needs; individual tests add their own
    // expectations for `get_blobs_v2`, fork choice, and blob processing.
    mock_adapter.expect_spec().return_const(spec.clone());
    mock_adapter.expect_kzg().return_const(kzg.clone());
    mock_adapter
        .expect_executor()
        .return_const(test_runtime.task_executor.clone());
    mock_adapter
}
|
||||||
@@ -727,7 +727,7 @@ impl HttpJsonRpc {
|
|||||||
pub async fn get_blobs_v2<E: EthSpec>(
|
pub async fn get_blobs_v2<E: EthSpec>(
|
||||||
&self,
|
&self,
|
||||||
versioned_hashes: Vec<Hash256>,
|
versioned_hashes: Vec<Hash256>,
|
||||||
) -> Result<Vec<Option<BlobAndProofV2<E>>>, Error> {
|
) -> Result<Option<Vec<BlobAndProofV2<E>>>, Error> {
|
||||||
let params = json!([versioned_hashes]);
|
let params = json!([versioned_hashes]);
|
||||||
|
|
||||||
self.rpc_request(
|
self.rpc_request(
|
||||||
|
|||||||
@@ -1864,7 +1864,7 @@ impl<E: EthSpec> ExecutionLayer<E> {
|
|||||||
pub async fn get_blobs_v2(
|
pub async fn get_blobs_v2(
|
||||||
&self,
|
&self,
|
||||||
query: Vec<Hash256>,
|
query: Vec<Hash256>,
|
||||||
) -> Result<Vec<Option<BlobAndProofV2<E>>>, Error> {
|
) -> Result<Option<Vec<BlobAndProofV2<E>>>, Error> {
|
||||||
let capabilities = self.get_engine_capabilities(None).await?;
|
let capabilities = self.get_engine_capabilities(None).await?;
|
||||||
|
|
||||||
if capabilities.get_blobs_v2 {
|
if capabilities.get_blobs_v2 {
|
||||||
|
|||||||
Reference in New Issue
Block a user