mirror of
https://github.com/sigp/lighthouse.git
synced 2026-04-16 20:39:10 +00:00
Implement /eth/v1/beacon/blobs endpoint (#8103)
* #8085 Co-Authored-By: Tan Chee Keong <tanck@sigmaprime.io> Co-Authored-By: chonghe <44791194+chong-he@users.noreply.github.com>
This commit is contained in:
@@ -299,6 +299,8 @@ pub(crate) fn build_data_column_sidecars<E: EthSpec>(
|
||||
///
|
||||
/// If `blob_indices_opt` is `None`, this function attempts to reconstruct all blobs associated
|
||||
/// with the block.
|
||||
/// This function does NOT use rayon as this is primarily used by a non critical path in HTTP API
|
||||
/// and it will be slow if the node needs to reconstruct the blobs
|
||||
pub fn reconstruct_blobs<E: EthSpec>(
|
||||
kzg: &Kzg,
|
||||
data_columns: &[Arc<DataColumnSidecar<E>>],
|
||||
@@ -320,7 +322,7 @@ pub fn reconstruct_blobs<E: EthSpec>(
|
||||
};
|
||||
|
||||
let blob_sidecars = blob_indices
|
||||
.into_par_iter()
|
||||
.into_iter()
|
||||
.map(|row_index| {
|
||||
let mut cells: Vec<KzgCellRef> = vec![];
|
||||
let mut cell_ids: Vec<u64> = vec![];
|
||||
@@ -337,16 +339,26 @@ pub fn reconstruct_blobs<E: EthSpec>(
|
||||
cell_ids.push(data_column.index);
|
||||
}
|
||||
|
||||
let (cells, _kzg_proofs) = kzg
|
||||
.recover_cells_and_compute_kzg_proofs(&cell_ids, &cells)
|
||||
.map_err(|e| format!("Failed to recover cells and compute KZG proofs: {e:?}"))?;
|
||||
let num_cells_original_blob = E::number_of_columns() / 2;
|
||||
let blob_bytes = if data_columns.len() < E::number_of_columns() {
|
||||
let (recovered_cells, _kzg_proofs) = kzg
|
||||
.recover_cells_and_compute_kzg_proofs(&cell_ids, &cells)
|
||||
.map_err(|e| {
|
||||
format!("Failed to recover cells and compute KZG proofs: {e:?}")
|
||||
})?;
|
||||
|
||||
let num_cells_original_blob = cells.len() / 2;
|
||||
let blob_bytes = cells
|
||||
.into_iter()
|
||||
.take(num_cells_original_blob)
|
||||
.flat_map(|cell| cell.into_iter())
|
||||
.collect();
|
||||
recovered_cells
|
||||
.into_iter()
|
||||
.take(num_cells_original_blob)
|
||||
.flat_map(|cell| cell.into_iter())
|
||||
.collect()
|
||||
} else {
|
||||
cells
|
||||
.into_iter()
|
||||
.take(num_cells_original_blob)
|
||||
.flat_map(|cell| (*cell).into_iter())
|
||||
.collect()
|
||||
};
|
||||
|
||||
let blob = Blob::<E>::new(blob_bytes).map_err(|e| format!("{e:?}"))?;
|
||||
let kzg_proof = KzgProof::empty();
|
||||
|
||||
@@ -412,7 +412,7 @@ where
|
||||
let blobs = if block.message().body().has_blobs() {
|
||||
debug!("Downloading finalized blobs");
|
||||
if let Some(response) = remote
|
||||
.get_blobs::<E>(BlockId::Root(block_root), None, &spec)
|
||||
.get_blob_sidecars::<E>(BlockId::Root(block_root), None, &spec)
|
||||
.await
|
||||
.map_err(|e| format!("Error fetching finalized blobs from remote: {e:?}"))?
|
||||
{
|
||||
|
||||
@@ -2,15 +2,16 @@ use crate::version::inconsistent_fork_rejection;
|
||||
use crate::{ExecutionOptimistic, state_id::checkpoint_slot_and_execution_optimistic};
|
||||
use beacon_chain::kzg_utils::reconstruct_blobs;
|
||||
use beacon_chain::{BeaconChain, BeaconChainError, BeaconChainTypes, WhenSlotSkipped};
|
||||
use eth2::types::BlobIndicesQuery;
|
||||
use eth2::types::BlockId as CoreBlockId;
|
||||
use eth2::types::DataColumnIndicesQuery;
|
||||
use eth2::types::{BlobIndicesQuery, BlobWrapper, BlobsVersionedHashesQuery};
|
||||
use std::fmt;
|
||||
use std::str::FromStr;
|
||||
use std::sync::Arc;
|
||||
use types::{
|
||||
BlobSidecarList, DataColumnSidecarList, EthSpec, FixedBytesExtended, ForkName, Hash256,
|
||||
SignedBeaconBlock, SignedBlindedBeaconBlock, Slot,
|
||||
SignedBeaconBlock, SignedBlindedBeaconBlock, Slot, UnversionedResponse,
|
||||
beacon_response::ExecutionOptimisticFinalizedMetadata,
|
||||
};
|
||||
use warp::Rejection;
|
||||
|
||||
@@ -352,6 +353,68 @@ impl BlockId {
|
||||
Ok((block, blob_sidecar_list, execution_optimistic, finalized))
|
||||
}
|
||||
|
||||
#[allow(clippy::type_complexity)]
|
||||
pub fn get_blobs_by_versioned_hashes<T: BeaconChainTypes>(
|
||||
&self,
|
||||
query: BlobsVersionedHashesQuery,
|
||||
chain: &BeaconChain<T>,
|
||||
) -> Result<
|
||||
UnversionedResponse<Vec<BlobWrapper<T::EthSpec>>, ExecutionOptimisticFinalizedMetadata>,
|
||||
warp::Rejection,
|
||||
> {
|
||||
let (root, execution_optimistic, finalized) = self.root(chain)?;
|
||||
let block = BlockId::blinded_block_by_root(&root, chain)?.ok_or_else(|| {
|
||||
warp_utils::reject::custom_not_found(format!("beacon block with root {}", root))
|
||||
})?;
|
||||
|
||||
// Error if the block is pre-Deneb and lacks blobs.
|
||||
let blob_kzg_commitments = block.message().body().blob_kzg_commitments().map_err(|_| {
|
||||
warp_utils::reject::custom_bad_request(
|
||||
"block is pre-Deneb and has no blobs".to_string(),
|
||||
)
|
||||
})?;
|
||||
|
||||
let blob_indices_opt = query.versioned_hashes.map(|versioned_hashes| {
|
||||
versioned_hashes
|
||||
.iter()
|
||||
.flat_map(|versioned_hash| {
|
||||
blob_kzg_commitments.iter().position(|commitment| {
|
||||
let computed_hash = commitment.calculate_versioned_hash();
|
||||
computed_hash == *versioned_hash
|
||||
})
|
||||
})
|
||||
.map(|index| index as u64)
|
||||
.collect::<Vec<_>>()
|
||||
});
|
||||
|
||||
let max_blobs_per_block = chain.spec.max_blobs_per_block(block.epoch()) as usize;
|
||||
let blob_sidecar_list = if !blob_kzg_commitments.is_empty() {
|
||||
if chain.spec.is_peer_das_enabled_for_epoch(block.epoch()) {
|
||||
Self::get_blobs_from_data_columns(chain, root, blob_indices_opt, &block)?
|
||||
} else {
|
||||
Self::get_blobs(chain, root, blob_indices_opt, max_blobs_per_block)?
|
||||
}
|
||||
} else {
|
||||
BlobSidecarList::new(vec![], max_blobs_per_block)
|
||||
.map_err(|e| warp_utils::reject::custom_server_error(format!("{:?}", e)))?
|
||||
};
|
||||
|
||||
let blobs = blob_sidecar_list
|
||||
.into_iter()
|
||||
.map(|sidecar| BlobWrapper::<T::EthSpec> {
|
||||
blob: sidecar.blob.clone(),
|
||||
})
|
||||
.collect();
|
||||
|
||||
Ok(UnversionedResponse {
|
||||
metadata: ExecutionOptimisticFinalizedMetadata {
|
||||
execution_optimistic: Some(execution_optimistic),
|
||||
finalized: Some(finalized),
|
||||
},
|
||||
data: blobs,
|
||||
})
|
||||
}
|
||||
|
||||
fn get_blobs<T: BeaconChainTypes>(
|
||||
chain: &BeaconChain<T>,
|
||||
root: Hash256,
|
||||
@@ -369,9 +432,9 @@ impl BlockId {
|
||||
|
||||
let blob_sidecar_list_filtered = match indices {
|
||||
Some(vec) => {
|
||||
let list: Vec<_> = blob_sidecar_list
|
||||
let list: Vec<_> = vec
|
||||
.into_iter()
|
||||
.filter(|blob_sidecar| vec.contains(&blob_sidecar.index))
|
||||
.flat_map(|index| blob_sidecar_list.get(index as usize).cloned())
|
||||
.collect();
|
||||
|
||||
BlobSidecarList::new(list, max_blobs_per_block)
|
||||
|
||||
@@ -214,6 +214,7 @@ pub fn prometheus_metrics() -> warp::filters::log::Log<impl Fn(warp::filters::lo
|
||||
equals("v1/beacon/blocks")
|
||||
.or_else(|| starts_with("v2/beacon/blocks"))
|
||||
.or_else(|| starts_with("v1/beacon/blob_sidecars"))
|
||||
.or_else(|| starts_with("v1/beacon/blobs"))
|
||||
.or_else(|| starts_with("v1/beacon/blocks/head/root"))
|
||||
.or_else(|| starts_with("v1/beacon/blinded_blocks"))
|
||||
.or_else(|| starts_with("v2/beacon/blinded_blocks"))
|
||||
@@ -1897,7 +1898,7 @@ pub fn serve<T: BeaconChainTypes>(
|
||||
*/
|
||||
|
||||
// GET beacon/blob_sidecars/{block_id}
|
||||
let get_blobs = eth_v1
|
||||
let get_blob_sidecars = eth_v1
|
||||
.and(warp::path("beacon"))
|
||||
.and(warp::path("blob_sidecars"))
|
||||
.and(block_id_or_err)
|
||||
@@ -1947,6 +1948,52 @@ pub fn serve<T: BeaconChainTypes>(
|
||||
},
|
||||
);
|
||||
|
||||
// GET beacon/blobs/{block_id}
|
||||
let get_blobs = eth_v1
|
||||
.and(warp::path("beacon"))
|
||||
.and(warp::path("blobs"))
|
||||
.and(block_id_or_err)
|
||||
.and(warp::path::end())
|
||||
.and(multi_key_query::<api_types::BlobsVersionedHashesQuery>())
|
||||
.and(task_spawner_filter.clone())
|
||||
.and(chain_filter.clone())
|
||||
.and(warp::header::optional::<api_types::Accept>("accept"))
|
||||
.then(
|
||||
|block_id: BlockId,
|
||||
version_hashes_res: Result<api_types::BlobsVersionedHashesQuery, warp::Rejection>,
|
||||
task_spawner: TaskSpawner<T::EthSpec>,
|
||||
chain: Arc<BeaconChain<T>>,
|
||||
accept_header: Option<api_types::Accept>| {
|
||||
task_spawner.blocking_response_task(Priority::P1, move || {
|
||||
let versioned_hashes = version_hashes_res?;
|
||||
let response =
|
||||
block_id.get_blobs_by_versioned_hashes(versioned_hashes, &chain)?;
|
||||
|
||||
match accept_header {
|
||||
Some(api_types::Accept::Ssz) => Response::builder()
|
||||
.status(200)
|
||||
.body(response.data.as_ssz_bytes().into())
|
||||
.map(|res: Response<Body>| add_ssz_content_type_header(res))
|
||||
.map_err(|e| {
|
||||
warp_utils::reject::custom_server_error(format!(
|
||||
"failed to create response: {}",
|
||||
e
|
||||
))
|
||||
}),
|
||||
_ => {
|
||||
let res = execution_optimistic_finalized_beacon_response(
|
||||
ResponseIncludesVersion::No,
|
||||
response.metadata.execution_optimistic.unwrap_or(false),
|
||||
response.metadata.finalized.unwrap_or(false),
|
||||
response.data,
|
||||
)?;
|
||||
Ok(warp::reply::json(&res).into_response())
|
||||
}
|
||||
}
|
||||
})
|
||||
},
|
||||
);
|
||||
|
||||
/*
|
||||
* beacon/pool
|
||||
*/
|
||||
@@ -4794,6 +4841,7 @@ pub fn serve<T: BeaconChainTypes>(
|
||||
.uor(get_beacon_block_attestations)
|
||||
.uor(get_beacon_blinded_block)
|
||||
.uor(get_beacon_block_root)
|
||||
.uor(get_blob_sidecars)
|
||||
.uor(get_blobs)
|
||||
.uor(get_beacon_pool_attestations)
|
||||
.uor(get_beacon_pool_attester_slashings)
|
||||
|
||||
@@ -90,6 +90,7 @@ struct ApiTester {
|
||||
struct ApiTesterConfig {
|
||||
spec: ChainSpec,
|
||||
retain_historic_states: bool,
|
||||
import_all_data_columns: bool,
|
||||
}
|
||||
|
||||
impl Default for ApiTesterConfig {
|
||||
@@ -99,6 +100,7 @@ impl Default for ApiTesterConfig {
|
||||
Self {
|
||||
spec,
|
||||
retain_historic_states: false,
|
||||
import_all_data_columns: false,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -137,6 +139,7 @@ impl ApiTester {
|
||||
.deterministic_withdrawal_keypairs(VALIDATOR_COUNT)
|
||||
.fresh_ephemeral_store()
|
||||
.mock_execution_layer()
|
||||
.import_all_data_columns(config.import_all_data_columns)
|
||||
.build();
|
||||
|
||||
harness
|
||||
@@ -441,10 +444,7 @@ impl ApiTester {
|
||||
}
|
||||
|
||||
pub async fn new_mev_tester_default_payload_value() -> Self {
|
||||
let mut config = ApiTesterConfig {
|
||||
retain_historic_states: false,
|
||||
spec: E::default_spec(),
|
||||
};
|
||||
let mut config = ApiTesterConfig::default();
|
||||
config.spec.altair_fork_epoch = Some(Epoch::new(0));
|
||||
config.spec.bellatrix_fork_epoch = Some(Epoch::new(0));
|
||||
let tester = Self::new_from_config(config)
|
||||
@@ -1858,7 +1858,7 @@ impl ApiTester {
|
||||
};
|
||||
let result = match self
|
||||
.client
|
||||
.get_blobs::<E>(
|
||||
.get_blob_sidecars::<E>(
|
||||
CoreBlockId::Root(block_root),
|
||||
blob_indices.as_deref(),
|
||||
&self.chain.spec,
|
||||
@@ -1879,6 +1879,77 @@ impl ApiTester {
|
||||
self
|
||||
}
|
||||
|
||||
pub async fn test_get_blobs(self, versioned_hashes: bool) -> Self {
|
||||
let block_id = BlockId(CoreBlockId::Finalized);
|
||||
let (block_root, _, _) = block_id.root(&self.chain).unwrap();
|
||||
let (block, _, _) = block_id.full_block(&self.chain).await.unwrap();
|
||||
let num_blobs = block.num_expected_blobs();
|
||||
|
||||
let versioned_hashes: Option<Vec<Hash256>> = if versioned_hashes {
|
||||
Some(
|
||||
block
|
||||
.message()
|
||||
.body()
|
||||
.blob_kzg_commitments()
|
||||
.unwrap()
|
||||
.iter()
|
||||
.map(|commitment| commitment.calculate_versioned_hash())
|
||||
.collect(),
|
||||
)
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
let result = match self
|
||||
.client
|
||||
.get_blobs::<E>(CoreBlockId::Root(block_root), versioned_hashes.as_deref())
|
||||
.await
|
||||
{
|
||||
Ok(response) => response.unwrap().into_data(),
|
||||
Err(e) => panic!("query failed incorrectly: {e:?}"),
|
||||
};
|
||||
|
||||
assert_eq!(
|
||||
result.len(),
|
||||
versioned_hashes.map_or(num_blobs, |versioned_hashes| versioned_hashes.len())
|
||||
);
|
||||
|
||||
self
|
||||
}
|
||||
|
||||
pub async fn test_get_blobs_post_fulu_full_node(self, versioned_hashes: bool) -> Self {
|
||||
let block_id = BlockId(CoreBlockId::Finalized);
|
||||
let (block_root, _, _) = block_id.root(&self.chain).unwrap();
|
||||
let (block, _, _) = block_id.full_block(&self.chain).await.unwrap();
|
||||
|
||||
let versioned_hashes: Option<Vec<Hash256>> = if versioned_hashes {
|
||||
Some(
|
||||
block
|
||||
.message()
|
||||
.body()
|
||||
.blob_kzg_commitments()
|
||||
.unwrap()
|
||||
.iter()
|
||||
.map(|commitment| commitment.calculate_versioned_hash())
|
||||
.collect(),
|
||||
)
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
match self
|
||||
.client
|
||||
.get_blobs::<E>(CoreBlockId::Root(block_root), versioned_hashes.as_deref())
|
||||
.await
|
||||
{
|
||||
Ok(result) => panic!("Full node are unable to return blobs post-Fulu: {result:?}"),
|
||||
// Post-Fulu, full nodes don't store blobs and return error 500
|
||||
Err(e) => assert_eq!(e.status().unwrap(), 500),
|
||||
};
|
||||
|
||||
self
|
||||
}
|
||||
|
||||
/// Test fetching of blob sidecars that are not available in the database due to pruning.
|
||||
///
|
||||
/// If `zero_blobs` is false, test a block with >0 blobs, which should be unavailable.
|
||||
@@ -1918,7 +1989,7 @@ impl ApiTester {
|
||||
|
||||
match self
|
||||
.client
|
||||
.get_blobs::<E>(CoreBlockId::Slot(test_slot), None, &self.chain.spec)
|
||||
.get_blob_sidecars::<E>(CoreBlockId::Slot(test_slot), None, &self.chain.spec)
|
||||
.await
|
||||
{
|
||||
Ok(result) => {
|
||||
@@ -1956,7 +2027,7 @@ impl ApiTester {
|
||||
|
||||
match self
|
||||
.client
|
||||
.get_blobs::<E>(CoreBlockId::Slot(test_slot), None, &self.chain.spec)
|
||||
.get_blob_sidecars::<E>(CoreBlockId::Slot(test_slot), None, &self.chain.spec)
|
||||
.await
|
||||
{
|
||||
Ok(result) => panic!("queries for pre-Deneb slots should fail. got: {result:?}"),
|
||||
@@ -7704,10 +7775,7 @@ async fn builder_payload_chosen_by_profit_v3() {
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn builder_works_post_capella() {
|
||||
let mut config = ApiTesterConfig {
|
||||
retain_historic_states: false,
|
||||
spec: E::default_spec(),
|
||||
};
|
||||
let mut config = ApiTesterConfig::default();
|
||||
config.spec.altair_fork_epoch = Some(Epoch::new(0));
|
||||
config.spec.bellatrix_fork_epoch = Some(Epoch::new(0));
|
||||
config.spec.capella_fork_epoch = Some(Epoch::new(0));
|
||||
@@ -7724,10 +7792,7 @@ async fn builder_works_post_capella() {
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn builder_works_post_deneb() {
|
||||
let mut config = ApiTesterConfig {
|
||||
retain_historic_states: false,
|
||||
spec: E::default_spec(),
|
||||
};
|
||||
let mut config = ApiTesterConfig::default();
|
||||
config.spec.altair_fork_epoch = Some(Epoch::new(0));
|
||||
config.spec.bellatrix_fork_epoch = Some(Epoch::new(0));
|
||||
config.spec.capella_fork_epoch = Some(Epoch::new(0));
|
||||
@@ -7745,10 +7810,7 @@ async fn builder_works_post_deneb() {
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn get_blob_sidecars() {
|
||||
let mut config = ApiTesterConfig {
|
||||
retain_historic_states: false,
|
||||
spec: E::default_spec(),
|
||||
};
|
||||
let mut config = ApiTesterConfig::default();
|
||||
config.spec.altair_fork_epoch = Some(Epoch::new(0));
|
||||
config.spec.bellatrix_fork_epoch = Some(Epoch::new(0));
|
||||
config.spec.capella_fork_epoch = Some(Epoch::new(0));
|
||||
@@ -7761,6 +7823,53 @@ async fn get_blob_sidecars() {
|
||||
.test_get_blob_sidecars(false)
|
||||
.await
|
||||
.test_get_blob_sidecars(true)
|
||||
.await
|
||||
.test_get_blobs(false)
|
||||
.await
|
||||
.test_get_blobs(true)
|
||||
.await;
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn get_blobs_post_fulu_supernode() {
|
||||
let mut config = ApiTesterConfig {
|
||||
retain_historic_states: false,
|
||||
spec: E::default_spec(),
|
||||
// For supernode, we import all data columns
|
||||
import_all_data_columns: true,
|
||||
};
|
||||
config.spec.altair_fork_epoch = Some(Epoch::new(0));
|
||||
config.spec.bellatrix_fork_epoch = Some(Epoch::new(0));
|
||||
config.spec.capella_fork_epoch = Some(Epoch::new(0));
|
||||
config.spec.deneb_fork_epoch = Some(Epoch::new(0));
|
||||
config.spec.electra_fork_epoch = Some(Epoch::new(0));
|
||||
config.spec.fulu_fork_epoch = Some(Epoch::new(0));
|
||||
|
||||
ApiTester::new_from_config(config)
|
||||
.await
|
||||
// We can call the same get_blobs function in this test
|
||||
// because the function will call get_blobs_by_versioned_hashes which handles peerDAS post-Fulu
|
||||
.test_get_blobs(false)
|
||||
.await
|
||||
.test_get_blobs(true)
|
||||
.await;
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn get_blobs_post_fulu_full_node() {
|
||||
let mut config = ApiTesterConfig::default();
|
||||
config.spec.altair_fork_epoch = Some(Epoch::new(0));
|
||||
config.spec.bellatrix_fork_epoch = Some(Epoch::new(0));
|
||||
config.spec.capella_fork_epoch = Some(Epoch::new(0));
|
||||
config.spec.deneb_fork_epoch = Some(Epoch::new(0));
|
||||
config.spec.electra_fork_epoch = Some(Epoch::new(0));
|
||||
config.spec.fulu_fork_epoch = Some(Epoch::new(0));
|
||||
|
||||
ApiTester::new_from_config(config)
|
||||
.await
|
||||
.test_get_blobs_post_fulu_full_node(false)
|
||||
.await
|
||||
.test_get_blobs_post_fulu_full_node(true)
|
||||
.await;
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user