mirror of
https://github.com/sigp/lighthouse.git
synced 2026-05-31 05:07:12 +00:00
initial straightforward merge changes
This commit is contained in:
@@ -13,31 +13,28 @@ mod fetch_blobs_beacon_adapter;
|
||||
mod tests;
|
||||
|
||||
use crate::blob_verification::{GossipBlobError, KzgVerifiedBlob};
|
||||
use crate::block_verification_types::AsBlock;
|
||||
use crate::data_column_verification::{KzgVerifiedCustodyDataColumn, KzgVerifiedDataColumn};
|
||||
use crate::data_column_verification::{
|
||||
KzgVerifiedCustodyDataColumn, KzgVerifiedCustodyPartialDataColumn, KzgVerifiedPartialDataColumn,
|
||||
};
|
||||
#[cfg_attr(test, double)]
|
||||
use crate::fetch_blobs::fetch_blobs_beacon_adapter::FetchBlobsBeaconAdapter;
|
||||
use crate::kzg_utils::blobs_to_data_column_sidecars;
|
||||
use crate::kzg_utils::blobs_to_partial_data_columns;
|
||||
use crate::observed_data_sidecars::ObservationKey;
|
||||
use crate::validator_monitor::timestamp_now;
|
||||
use crate::{
|
||||
AvailabilityProcessingStatus, BeaconChain, BeaconChainError, BeaconChainTypes, BlockError,
|
||||
metrics,
|
||||
};
|
||||
use execution_layer::Error as ExecutionLayerError;
|
||||
use execution_layer::json_structures::{BlobAndProofV1, BlobAndProofV2};
|
||||
use execution_layer::json_structures::{BlobAndProofV1, BlobAndProofV2, BlobAndProofV3};
|
||||
use metrics::{TryExt, inc_counter};
|
||||
#[cfg(test)]
|
||||
use mockall_double::double;
|
||||
use ssz_types::FixedVector;
|
||||
use slot_clock::timestamp_now;
|
||||
use state_processing::per_block_processing::deneb::kzg_commitment_to_versioned_hash;
|
||||
use std::sync::Arc;
|
||||
use tracing::{debug, instrument, warn};
|
||||
use types::data::{BlobSidecarError, DataColumnSidecarError};
|
||||
use types::{
|
||||
BeaconStateError, Blob, BlobSidecar, ColumnIndex, EthSpec, FullPayload, Hash256, KzgProofs,
|
||||
SignedBeaconBlock, SignedBeaconBlockHeader, VersionedHash,
|
||||
};
|
||||
use types::data::{BlobSidecarError, ColumnIndex, DataColumnSidecarError, PartialDataColumnHeader};
|
||||
use types::{BeaconStateError, BlobSidecar, EthSpec, Hash256, VersionedHash};
|
||||
|
||||
/// Result from engine get blobs to be passed onto `DataAvailabilityChecker` and published to the
|
||||
/// gossip network. The blobs / data columns have not been marked as observed yet, as they may not
|
||||
@@ -71,14 +68,14 @@ pub enum FetchEngineBlobError {
|
||||
pub async fn fetch_and_process_engine_blobs<T: BeaconChainTypes>(
|
||||
chain: Arc<BeaconChain<T>>,
|
||||
block_root: Hash256,
|
||||
block: Arc<SignedBeaconBlock<T::EthSpec, FullPayload<T::EthSpec>>>,
|
||||
header: Arc<PartialDataColumnHeader<T::EthSpec>>,
|
||||
custody_columns: &[ColumnIndex],
|
||||
publish_fn: impl Fn(EngineGetBlobsOutput<T>) + Send + 'static,
|
||||
) -> Result<Option<AvailabilityProcessingStatus>, FetchEngineBlobError> {
|
||||
fetch_and_process_engine_blobs_inner(
|
||||
FetchBlobsBeaconAdapter::new(chain),
|
||||
block_root,
|
||||
block,
|
||||
header,
|
||||
custody_columns,
|
||||
publish_fn,
|
||||
)
|
||||
@@ -90,22 +87,16 @@ pub async fn fetch_and_process_engine_blobs<T: BeaconChainTypes>(
|
||||
async fn fetch_and_process_engine_blobs_inner<T: BeaconChainTypes>(
|
||||
chain_adapter: FetchBlobsBeaconAdapter<T>,
|
||||
block_root: Hash256,
|
||||
block: Arc<SignedBeaconBlock<T::EthSpec, FullPayload<T::EthSpec>>>,
|
||||
header: Arc<PartialDataColumnHeader<T::EthSpec>>,
|
||||
custody_columns: &[ColumnIndex],
|
||||
publish_fn: impl Fn(EngineGetBlobsOutput<T>) + Send + 'static,
|
||||
) -> Result<Option<AvailabilityProcessingStatus>, FetchEngineBlobError> {
|
||||
let versioned_hashes = if let Some(kzg_commitments) = block
|
||||
.message()
|
||||
.body()
|
||||
.blob_kzg_commitments()
|
||||
.ok()
|
||||
.filter(|blobs| !blobs.is_empty())
|
||||
{
|
||||
kzg_commitments
|
||||
.iter()
|
||||
.map(kzg_commitment_to_versioned_hash)
|
||||
.collect::<Vec<_>>()
|
||||
} else {
|
||||
let versioned_hashes = header
|
||||
.kzg_commitments
|
||||
.iter()
|
||||
.map(kzg_commitment_to_versioned_hash)
|
||||
.collect::<Vec<_>>();
|
||||
if versioned_hashes.is_empty() {
|
||||
debug!("Fetch blobs not triggered - none required");
|
||||
return Ok(None);
|
||||
};
|
||||
@@ -117,12 +108,12 @@ async fn fetch_and_process_engine_blobs_inner<T: BeaconChainTypes>(
|
||||
|
||||
if chain_adapter
|
||||
.spec()
|
||||
.is_peer_das_enabled_for_epoch(block.epoch())
|
||||
.is_peer_das_enabled_for_epoch(header.slot().epoch(T::EthSpec::slots_per_epoch()))
|
||||
{
|
||||
fetch_and_process_blobs_v2(
|
||||
fetch_and_process_blobs_v2_or_v3(
|
||||
chain_adapter,
|
||||
block_root,
|
||||
block,
|
||||
header,
|
||||
versioned_hashes,
|
||||
custody_columns,
|
||||
publish_fn,
|
||||
@@ -132,7 +123,7 @@ async fn fetch_and_process_engine_blobs_inner<T: BeaconChainTypes>(
|
||||
fetch_and_process_blobs_v1(
|
||||
chain_adapter,
|
||||
block_root,
|
||||
block,
|
||||
&header,
|
||||
versioned_hashes,
|
||||
publish_fn,
|
||||
)
|
||||
@@ -144,7 +135,7 @@ async fn fetch_and_process_engine_blobs_inner<T: BeaconChainTypes>(
|
||||
async fn fetch_and_process_blobs_v1<T: BeaconChainTypes>(
|
||||
chain_adapter: FetchBlobsBeaconAdapter<T>,
|
||||
block_root: Hash256,
|
||||
block: Arc<SignedBeaconBlock<T::EthSpec>>,
|
||||
header: &PartialDataColumnHeader<T::EthSpec>,
|
||||
versioned_hashes: Vec<VersionedHash>,
|
||||
publish_fn: impl Fn(EngineGetBlobsOutput<T>) + Send + Sized,
|
||||
) -> Result<Option<AvailabilityProcessingStatus>, FetchEngineBlobError> {
|
||||
@@ -182,19 +173,12 @@ async fn fetch_and_process_blobs_v1<T: BeaconChainTypes>(
|
||||
return Ok(None);
|
||||
}
|
||||
|
||||
let (signed_block_header, kzg_commitments_proof) = block
|
||||
.signed_block_header_and_kzg_commitments_proof()
|
||||
.map_err(FetchEngineBlobError::BeaconStateError)?;
|
||||
let mut blob_sidecar_list = build_blob_sidecars(header, response)?;
|
||||
|
||||
let mut blob_sidecar_list = build_blob_sidecars(
|
||||
&block,
|
||||
response,
|
||||
signed_block_header,
|
||||
&kzg_commitments_proof,
|
||||
)?;
|
||||
|
||||
let observation_key =
|
||||
ObservationKey::new_proposer_key(block.message().proposer_index(), block.slot());
|
||||
let observation_key = ObservationKey::new_proposer_key(
|
||||
header.signed_block_header.message.proposer_index,
|
||||
header.slot(),
|
||||
);
|
||||
|
||||
if let Some(observed_blobs) = chain_adapter.blobs_known_for_observation_key(observation_key) {
|
||||
blob_sidecar_list.retain(|blob| !observed_blobs.contains(&blob.blob_index()));
|
||||
@@ -225,7 +209,7 @@ async fn fetch_and_process_blobs_v1<T: BeaconChainTypes>(
|
||||
|
||||
let availability_processing_status = chain_adapter
|
||||
.process_engine_blobs(
|
||||
block.slot(),
|
||||
header.slot(),
|
||||
block_root,
|
||||
EngineGetBlobsOutput::Blobs(blob_sidecar_list),
|
||||
)
|
||||
@@ -235,35 +219,53 @@ async fn fetch_and_process_blobs_v1<T: BeaconChainTypes>(
|
||||
}
|
||||
|
||||
#[instrument(skip_all, level = "debug")]
|
||||
async fn fetch_and_process_blobs_v2<T: BeaconChainTypes>(
|
||||
async fn fetch_and_process_blobs_v2_or_v3<T: BeaconChainTypes>(
|
||||
chain_adapter: FetchBlobsBeaconAdapter<T>,
|
||||
block_root: Hash256,
|
||||
block: Arc<SignedBeaconBlock<T::EthSpec>>,
|
||||
header: Arc<PartialDataColumnHeader<T::EthSpec>>,
|
||||
versioned_hashes: Vec<VersionedHash>,
|
||||
custody_columns_indices: &[ColumnIndex],
|
||||
publish_fn: impl Fn(EngineGetBlobsOutput<T>) + Send + 'static,
|
||||
) -> Result<Option<AvailabilityProcessingStatus>, FetchEngineBlobError> {
|
||||
let num_expected_blobs = versioned_hashes.len();
|
||||
let slot = header.slot();
|
||||
|
||||
metrics::observe(&metrics::BLOBS_FROM_EL_EXPECTED, num_expected_blobs as f64);
|
||||
debug!(num_expected_blobs, "Fetching blobs from the EL");
|
||||
|
||||
// Track request count and duration for standardized metrics
|
||||
inc_counter(&metrics::BEACON_ENGINE_GET_BLOBS_V2_REQUESTS_TOTAL);
|
||||
let _timer =
|
||||
metrics::start_timer(&metrics::BEACON_ENGINE_GET_BLOBS_V2_REQUEST_DURATION_SECONDS);
|
||||
let get_blobs_v3 = chain_adapter.supports_get_blobs_v3().await?;
|
||||
let response = if get_blobs_v3 {
|
||||
debug!(num_expected_blobs, "Fetching available blobs from the EL");
|
||||
// Track request count and duration for standardized metrics
|
||||
inc_counter(&metrics::BEACON_ENGINE_GET_BLOBS_V3_REQUESTS_TOTAL);
|
||||
let _timer =
|
||||
metrics::start_timer(&metrics::BEACON_ENGINE_GET_BLOBS_V3_REQUEST_DURATION_SECONDS);
|
||||
|
||||
let response = chain_adapter
|
||||
.get_blobs_v2(versioned_hashes)
|
||||
.await
|
||||
.inspect_err(|_| {
|
||||
inc_counter(&metrics::BLOBS_FROM_EL_ERROR_TOTAL);
|
||||
})?;
|
||||
chain_adapter
|
||||
.get_blobs_v3(versioned_hashes)
|
||||
.await
|
||||
.inspect_err(|_| {
|
||||
inc_counter(&metrics::BLOBS_FROM_EL_ERROR_TOTAL);
|
||||
})?
|
||||
} else {
|
||||
debug!(num_expected_blobs, "Fetching all blobs from the EL");
|
||||
|
||||
drop(_timer);
|
||||
// Track request count and duration for standardized metrics
|
||||
inc_counter(&metrics::BEACON_ENGINE_GET_BLOBS_V2_REQUESTS_TOTAL);
|
||||
let _timer =
|
||||
metrics::start_timer(&metrics::BEACON_ENGINE_GET_BLOBS_V2_REQUEST_DURATION_SECONDS);
|
||||
|
||||
// Track successful response
|
||||
inc_counter(&metrics::BEACON_ENGINE_GET_BLOBS_V2_RESPONSES_TOTAL);
|
||||
let response = chain_adapter
|
||||
.get_blobs_v2(versioned_hashes)
|
||||
.await
|
||||
.inspect_err(|_| {
|
||||
inc_counter(&metrics::BLOBS_FROM_EL_ERROR_TOTAL);
|
||||
})?;
|
||||
|
||||
// Track successful response
|
||||
inc_counter(&metrics::BEACON_ENGINE_GET_BLOBS_V2_RESPONSES_TOTAL);
|
||||
|
||||
response.map(|vec| vec.into_iter().map(Some).collect())
|
||||
};
|
||||
|
||||
let Some(blobs_and_proofs) = response else {
|
||||
debug!(num_expected_blobs, "No blobs fetched from the EL");
|
||||
@@ -271,32 +273,35 @@ async fn fetch_and_process_blobs_v2<T: BeaconChainTypes>(
|
||||
return Ok(None);
|
||||
};
|
||||
|
||||
let (blobs, proofs): (Vec<_>, Vec<_>) = blobs_and_proofs
|
||||
.into_iter()
|
||||
.map(|blob_and_proof| {
|
||||
let BlobAndProofV2 { blob, proofs } = blob_and_proof;
|
||||
(blob, proofs)
|
||||
})
|
||||
.unzip();
|
||||
|
||||
let num_fetched_blobs = blobs.len();
|
||||
let num_fetched_blobs = blobs_and_proofs.iter().filter(|opt| opt.is_some()).count();
|
||||
metrics::observe(&metrics::BLOBS_FROM_EL_RECEIVED, num_fetched_blobs as f64);
|
||||
|
||||
if num_fetched_blobs != num_expected_blobs {
|
||||
// This scenario is not supposed to happen if the EL is spec compliant.
|
||||
// It should either return all requested blobs or none, but NOT partial responses.
|
||||
// If we attempt to compute columns with partial blobs, we'd end up with invalid columns.
|
||||
warn!(
|
||||
num_fetched_blobs,
|
||||
num_expected_blobs, "The EL did not return all requested blobs"
|
||||
);
|
||||
inc_counter(&metrics::BLOBS_FROM_EL_MISS_TOTAL);
|
||||
return Ok(None);
|
||||
if !get_blobs_v3 {
|
||||
// This scenario is not supposed to happen if the EL is spec compliant.
|
||||
// It should either return all requested blobs or none, but NOT partial responses.
|
||||
// If we attempt to compute columns with partial blobs, we'd end up with invalid columns.
|
||||
warn!(
|
||||
num_fetched_blobs,
|
||||
num_expected_blobs, "The EL did not return all requested blobs"
|
||||
);
|
||||
inc_counter(&metrics::BLOBS_FROM_EL_MISS_TOTAL);
|
||||
return Ok(None);
|
||||
} else {
|
||||
inc_counter(&metrics::BEACON_ENGINE_GET_BLOBS_V3_PARTIAL_RESPONSES_TOTAL);
|
||||
debug!(
|
||||
num_fetched_blobs,
|
||||
num_expected_blobs, "Blobs partially received from the EL"
|
||||
);
|
||||
}
|
||||
} else {
|
||||
debug!(num_fetched_blobs, "All blobs received from the EL");
|
||||
inc_counter(&metrics::BLOBS_FROM_EL_HIT_TOTAL);
|
||||
if get_blobs_v3 {
|
||||
inc_counter(&metrics::BEACON_ENGINE_GET_BLOBS_V3_COMPLETE_RESPONSES_TOTAL);
|
||||
}
|
||||
}
|
||||
|
||||
debug!(num_fetched_blobs, "All expected blobs received from the EL");
|
||||
inc_counter(&metrics::BLOBS_FROM_EL_HIT_TOTAL);
|
||||
|
||||
if chain_adapter.fork_choice_contains_block(&block_root) {
|
||||
// Avoid computing columns if the block has already been imported.
|
||||
debug!(
|
||||
@@ -310,9 +315,8 @@ async fn fetch_and_process_blobs_v2<T: BeaconChainTypes>(
|
||||
let custody_columns_to_import = compute_custody_columns_to_import(
|
||||
&chain_adapter,
|
||||
block_root,
|
||||
block.clone(),
|
||||
blobs,
|
||||
proofs,
|
||||
&header,
|
||||
blobs_and_proofs,
|
||||
custody_columns_indices,
|
||||
)
|
||||
.await?;
|
||||
@@ -325,20 +329,49 @@ async fn fetch_and_process_blobs_v2<T: BeaconChainTypes>(
|
||||
return Ok(None);
|
||||
}
|
||||
|
||||
// Up until this point we have not observed the data columns in the gossip cache, which allows
|
||||
// them to arrive independently while this function is running. In publish_fn we will observe
|
||||
// them and then publish any columns that had not already been observed.
|
||||
publish_fn(EngineGetBlobsOutput::CustodyColumns(
|
||||
custody_columns_to_import.clone(),
|
||||
));
|
||||
let full_columns = match chain_adapter.partial_assembler() {
|
||||
Some(assembler) => {
|
||||
// Initialize the partial assembler with the columns from the engine and return any full
|
||||
// columns for publishing
|
||||
assembler
|
||||
.merge_partials(block_root, custody_columns_to_import, header)
|
||||
.ok_or_else(|| {
|
||||
FetchEngineBlobError::InternalError(
|
||||
"Failed to merge partials into assembler".to_string(),
|
||||
)
|
||||
})?
|
||||
.full_columns
|
||||
}
|
||||
None => {
|
||||
// Partial columns are disabled, so let's try to directly convert the columns we got
|
||||
// from the EL into full columns.
|
||||
custody_columns_to_import
|
||||
.into_iter()
|
||||
.filter_map(|col| col.try_into_full(&header))
|
||||
.collect()
|
||||
}
|
||||
};
|
||||
|
||||
let availability_processing_status = chain_adapter
|
||||
.process_engine_blobs(
|
||||
block.slot(),
|
||||
block_root,
|
||||
EngineGetBlobsOutput::CustodyColumns(custody_columns_to_import),
|
||||
)
|
||||
.await?;
|
||||
// Publish complete columns
|
||||
if !full_columns.is_empty() {
|
||||
publish_fn(EngineGetBlobsOutput::CustodyColumns(full_columns.clone()));
|
||||
}
|
||||
// We publish all partials at the calling site, regardless of result, as previous publishs
|
||||
// have been blocked, waiting for the results of this call
|
||||
|
||||
// Process complete columns through DA checker
|
||||
let availability_processing_status = if !full_columns.is_empty() {
|
||||
chain_adapter
|
||||
.process_engine_blobs(
|
||||
slot,
|
||||
block_root,
|
||||
EngineGetBlobsOutput::CustodyColumns(full_columns),
|
||||
)
|
||||
.await?
|
||||
} else {
|
||||
// No complete columns yet, still missing components
|
||||
AvailabilityProcessingStatus::MissingComponents(slot, block_root)
|
||||
};
|
||||
|
||||
Ok(Some(availability_processing_status))
|
||||
}
|
||||
@@ -347,28 +380,34 @@ async fn fetch_and_process_blobs_v2<T: BeaconChainTypes>(
|
||||
async fn compute_custody_columns_to_import<T: BeaconChainTypes>(
|
||||
chain_adapter: &Arc<FetchBlobsBeaconAdapter<T>>,
|
||||
block_root: Hash256,
|
||||
block: Arc<SignedBeaconBlock<T::EthSpec, FullPayload<T::EthSpec>>>,
|
||||
blobs: Vec<Blob<T::EthSpec>>,
|
||||
proofs: Vec<KzgProofs<T::EthSpec>>,
|
||||
header: &PartialDataColumnHeader<T::EthSpec>,
|
||||
blobs_and_proofs: Vec<BlobAndProofV3<T::EthSpec>>,
|
||||
custody_columns_indices: &[ColumnIndex],
|
||||
) -> Result<Vec<KzgVerifiedCustodyDataColumn<T::EthSpec>>, FetchEngineBlobError> {
|
||||
) -> Result<Vec<KzgVerifiedCustodyPartialDataColumn<T::EthSpec>>, FetchEngineBlobError> {
|
||||
let kzg = chain_adapter.kzg().clone();
|
||||
let spec = chain_adapter.spec().clone();
|
||||
let chain_adapter_cloned = chain_adapter.clone();
|
||||
let custody_columns_indices = custody_columns_indices.to_vec();
|
||||
let header = header.clone();
|
||||
chain_adapter
|
||||
.executor()
|
||||
.spawn_blocking_handle(
|
||||
move || {
|
||||
let mut timer = metrics::start_timer_vec(
|
||||
&metrics::DATA_COLUMN_SIDECAR_COMPUTATION,
|
||||
&[&blobs.len().to_string()],
|
||||
&[&blobs_and_proofs.len().to_string()],
|
||||
);
|
||||
|
||||
let blob_refs = blobs.iter().collect::<Vec<_>>();
|
||||
let cell_proofs = proofs.into_iter().flatten().collect();
|
||||
let blob_and_proof_refs = blobs_and_proofs
|
||||
.iter()
|
||||
.map(|option| {
|
||||
option
|
||||
.as_ref()
|
||||
.map(|BlobAndProofV2 { blob, proofs }| (blob, proofs.as_ref()))
|
||||
})
|
||||
.collect::<Vec<_>>();
|
||||
let data_columns_result =
|
||||
blobs_to_data_column_sidecars(&blob_refs, cell_proofs, &block, &kzg, &spec)
|
||||
blobs_to_partial_data_columns(blob_and_proof_refs, &header, &kzg, &spec)
|
||||
.discard_timer_on_break(&mut timer);
|
||||
drop(timer);
|
||||
|
||||
@@ -379,10 +418,12 @@ async fn compute_custody_columns_to_import<T: BeaconChainTypes>(
|
||||
.map(|data_columns| {
|
||||
data_columns
|
||||
.into_iter()
|
||||
.filter(|col| custody_columns_indices.contains(col.index()))
|
||||
.filter(|col| custody_columns_indices.contains(&col.index))
|
||||
.map(|col| {
|
||||
KzgVerifiedCustodyDataColumn::from_asserted_custody(
|
||||
KzgVerifiedDataColumn::from_execution_verified(col),
|
||||
KzgVerifiedCustodyPartialDataColumn::from_asserted_custody(
|
||||
KzgVerifiedPartialDataColumn::from_execution_verified(
|
||||
Arc::new(col),
|
||||
),
|
||||
)
|
||||
})
|
||||
.collect::<Vec<_>>()
|
||||
@@ -390,7 +431,8 @@ async fn compute_custody_columns_to_import<T: BeaconChainTypes>(
|
||||
.map_err(FetchEngineBlobError::DataColumnSidecarError)?;
|
||||
|
||||
// Only consider columns that are not already observed on gossip.
|
||||
let observation_key = ObservationKey::from_block(&block, block_root, &spec);
|
||||
let observation_key =
|
||||
ObservationKey::from_partial_column_header(&header, block_root, &spec);
|
||||
|
||||
if let Some(observed_columns) =
|
||||
chain_adapter_cloned.data_column_known_for_observation_key(observation_key)
|
||||
@@ -421,10 +463,8 @@ async fn compute_custody_columns_to_import<T: BeaconChainTypes>(
|
||||
}
|
||||
|
||||
fn build_blob_sidecars<E: EthSpec>(
|
||||
block: &Arc<SignedBeaconBlock<E, FullPayload<E>>>,
|
||||
header: &PartialDataColumnHeader<E>,
|
||||
response: Vec<Option<BlobAndProofV1<E>>>,
|
||||
signed_block_header: SignedBeaconBlockHeader,
|
||||
kzg_commitments_inclusion_proof: &FixedVector<Hash256, E::KzgCommitmentsInclusionProofDepth>,
|
||||
) -> Result<Vec<KzgVerifiedBlob<E>>, FetchEngineBlobError> {
|
||||
let mut sidecars = vec![];
|
||||
for (index, blob_and_proof) in response
|
||||
@@ -435,9 +475,7 @@ fn build_blob_sidecars<E: EthSpec>(
|
||||
let blob_sidecar = BlobSidecar::new_with_existing_proof(
|
||||
index,
|
||||
blob_and_proof.blob,
|
||||
block,
|
||||
signed_block_header.clone(),
|
||||
kzg_commitments_inclusion_proof,
|
||||
header.clone(),
|
||||
blob_and_proof.proof,
|
||||
)
|
||||
.map_err(FetchEngineBlobError::BlobSidecarError)?;
|
||||
|
||||
Reference in New Issue
Block a user