This commit is contained in:
Daniel Knopik
2026-04-29 13:06:07 +02:00
parent 2d3354551e
commit 7cf76ac7af
19 changed files with 59 additions and 119 deletions

View File

@@ -71,7 +71,7 @@ use crate::payload_envelope_verification::EnvelopeError;
use crate::pending_payload_cache::PendingPayloadCache;
use crate::pending_payload_cache::{
Availability as PayloadAvailability,
DataColumnReconstructionResult as DataColumnReconstructionResultV2,
DataColumnReconstructionResult as DataColumnReconstructionResultGloas,
};
use crate::pending_payload_envelopes::PendingPayloadEnvelopes;
use crate::persisted_beacon_chain::PersistedBeaconChain;
@@ -1185,14 +1185,12 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
&self,
block_root: Hash256,
indices: &[ColumnIndex],
fork_name: ForkName,
) -> Result<DataColumnSidecarList<T::EthSpec>, Error> {
let all_cached_columns_opt = if fork_name.gloas_enabled() {
self.pending_payload_cache.get_data_columns(block_root)
} else {
self.data_availability_checker.get_data_columns(block_root)
}
.or_else(|| self.early_attester_cache.get_data_columns(block_root));
let all_cached_columns_opt = self
.pending_payload_cache
.get_data_columns(block_root)
.or_else(|| self.data_availability_checker.get_data_columns(block_root))
.or_else(|| self.early_attester_cache.get_data_columns(block_root));
if let Some(mut all_cached_columns) = all_cached_columns_opt {
all_cached_columns.retain(|col| indices.contains(col.index()));
@@ -3676,7 +3674,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
.map_err(EnvelopeError::from)?;
match result {
DataColumnReconstructionResultV2::Success((
DataColumnReconstructionResultGloas::Success((
availability,
data_columns_to_publish,
)) => {
@@ -3689,8 +3687,8 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
.await
.map(|status| Some((status, data_columns_to_publish)))?)
}
DataColumnReconstructionResultV2::NotStarted(reason)
| DataColumnReconstructionResultV2::RecoveredColumnsNotImported(reason) => {
DataColumnReconstructionResultGloas::NotStarted(reason)
| DataColumnReconstructionResultGloas::RecoveredColumnsNotImported(reason) => {
metrics::inc_counter_vec(
&metrics::KZG_DATA_COLUMN_RECONSTRUCTION_INCOMPLETE_TOTAL,
&[reason],

View File

@@ -1202,7 +1202,6 @@ impl<T: BeaconChainTypes> SignatureVerifiedBlock<T> {
AvailableBlock::new(
block,
AvailableBlockData::NoData,
// TODO(gloas) shouldnt matter which da checker we pass?
&chain.data_availability_checker,
chain.spec.clone(),
)

View File

@@ -12,7 +12,7 @@ use crate::kzg_utils::{build_data_column_sidecars_fulu, build_data_column_sideca
use crate::light_client_server_cache::LightClientServerCache;
use crate::migrate::{BackgroundMigrator, MigratorConfig};
use crate::observed_data_sidecars::ObservedDataSidecars;
use crate::pending_payload_cache::PendingPayloadCache as DataAvailabilityCheckerV2;
use crate::pending_payload_cache::PendingPayloadCache;
use crate::persisted_beacon_chain::PersistedBeaconChain;
use crate::persisted_custody::load_custody_context;
use crate::shuffling_cache::{BlockShufflingIds, ShufflingCache};
@@ -987,31 +987,7 @@ where
)
};
debug!(?custody_context, "Loaded persisted custody context");
let custody_context = Arc::new(custody_context);
let da_checker_v1 = Arc::new(
DataAvailabilityChecker::new(
complete_blob_backfill,
slot_clock.clone(),
self.kzg.clone(),
custody_context.clone(),
self.spec.clone(),
enable_partial_columns,
)
.map_err(|e| format!("Error initializing DataAvailabilityCheckerV1: {:?}", e))?,
);
let da_checker_v2 = Arc::new(
DataAvailabilityCheckerV2::new(
self.kzg.clone(),
custody_context.clone(),
self.spec.clone(),
)
.map_err(|e| format!("Error initializing DataAvailabilityCheckerV2: {:?}", e))?,
);
let pending_block_cache = da_checker_v1;
let pending_payload_cache = da_checker_v2;
let beacon_chain = BeaconChain {
spec: self.spec.clone(),
@@ -1084,8 +1060,25 @@ where
slasher: self.slasher.clone(),
validator_monitor: RwLock::new(validator_monitor),
genesis_backfill_slot,
data_availability_checker: pending_block_cache,
pending_payload_cache,
data_availability_checker: Arc::new(
DataAvailabilityChecker::new(
complete_blob_backfill,
slot_clock.clone(),
self.kzg.clone(),
custody_context.clone(),
self.spec.clone(),
enable_partial_columns,
)
.map_err(|e| format!("Error initializing DataAvailabilityChecker: {:?}", e))?,
),
pending_payload_cache: Arc::new(
PendingPayloadCache::new(
self.kzg.clone(),
custody_context.clone(),
self.spec.clone(),
)
.map_err(|e| format!("Error initializing PendingPayloadCache: {:?}", e))?,
),
kzg: self.kzg.clone(),
rng: Arc::new(Mutex::new(rng)),
gossip_verified_payload_bid_cache: <_>::default(),

View File

@@ -21,7 +21,7 @@ use tracing::{debug, error, instrument};
use types::data::{BlobIdentifier, FixedBlobSidecarList, PartialDataColumn};
use types::{
BlobSidecar, BlobSidecarList, BlockImportSource, ChainSpec, DataColumnSidecar,
DataColumnSidecarList, Epoch, EthSpec, Hash256, PartialDataColumnSidecarError,
DataColumnSidecarList, Epoch, EthSpec, ForkName, Hash256, PartialDataColumnSidecarError,
PartialDataColumnSidecarRef, SignedBeaconBlock, Slot, new_non_zero_usize,
};
@@ -877,10 +877,16 @@ impl<E: EthSpec> AvailableBlock<E> {
match &block_data {
AvailableBlockData::NoData => {
if columns_required {
return Err(AvailabilityCheckError::MissingCustodyColumns);
} else if blobs_required {
return Err(AvailabilityCheckError::MissingBlobs);
// For Gloas, DA is checked for the PayloadEnvelope, not for the block.
if block.fork_name(&spec).map_err(|_| {
AvailabilityCheckError::Unexpected("Unexpected fork mismatch".to_string())
})? < ForkName::Gloas
{
if columns_required {
return Err(AvailabilityCheckError::MissingCustodyColumns);
} else if blobs_required {
return Err(AvailabilityCheckError::MissingBlobs);
}
}
}
AvailableBlockData::Blobs(blobs) => {

View File

@@ -119,14 +119,15 @@ impl<T: BeaconChainTypes> FetchBlobsBeaconAdapter<T> {
.cached_blob_indexes(block_root)
}
pub(crate) fn cached_data_column_indexes(
&self,
_slot: Slot,
block_root: &Hash256,
) -> Option<Vec<u64>> {
pub(crate) fn cached_data_column_indexes(&self, block_root: &Hash256) -> Option<Vec<u64>> {
self.chain
.data_availability_checker
.cached_data_column_indexes(block_root)
.or_else(|| {
self.chain
.pending_payload_cache
.cached_data_column_indexes(block_root)
})
}
pub(crate) async fn process_engine_blobs(

View File

@@ -445,7 +445,7 @@ async fn compute_custody_columns_to_import<T: BeaconChainTypes>(
// Only consider columns that are not already known to data availability.
if let Some(known_columns) =
chain_adapter_cloned.cached_data_column_indexes(header.slot(), &block_root)
chain_adapter_cloned.cached_data_column_indexes(&block_root)
{
custody_columns.retain(|col| !known_columns.contains(&col.index()));
if custody_columns.is_empty() {

View File

@@ -199,7 +199,7 @@ mod get_blobs_v2 {
.returning(|_| None);
mock_adapter
.expect_cached_data_column_indexes()
.returning(|_, _| None);
.returning(|_| None);
mock_process_engine_blobs_result(
&mut mock_adapter,
Ok(AvailabilityProcessingStatus::Imported(block_root)),

View File

@@ -2146,7 +2146,6 @@ pub fn scrape_for_metrics<T: BeaconChainTypes>(beacon_chain: &BeaconChain<T>) {
);
let da_checker_metrics = beacon_chain.data_availability_checker.metrics();
set_gauge_by_usize(
&DATA_AVAILABILITY_OVERFLOW_MEMORY_BLOCK_CACHE_SIZE,
da_checker_metrics.block_cache_size,

View File

@@ -56,6 +56,8 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
);
}
// TODO(gloas) insert the pre-executed envelope into some type of cache?
let _full_timer = metrics::start_timer(&metrics::ENVELOPE_PROCESSING_TIMES);
metrics::inc_counter(&metrics::ENVELOPE_PROCESSING_REQUESTS);

View File

@@ -100,47 +100,6 @@ pub struct EnvelopeProcessingSnapshot<E: EthSpec> {
pub beacon_block_root: Hash256,
}
/// A payload envelope that has gone through processing checks and execution by an EL client.
/// This envelope hasn't necessarily completed data availability checks.
///
///
/// It contains 2 variants:
/// 1. `Available`: This envelope has been executed and also contains all data to consider it
/// fully available.
/// 2. `AvailabilityPending`: This envelope hasn't received all required blobs to consider it
/// fully available.
#[allow(dead_code)]
pub enum ExecutedEnvelope<E: EthSpec> {
Available(AvailableExecutedEnvelope<E>),
AvailabilityPending(AvailabilityPendingExecutedEnvelope<E>),
}
impl<E: EthSpec> ExecutedEnvelope<E> {
pub fn new(
envelope: MaybeAvailableEnvelope<E>,
block_root: Hash256,
payload_verification_outcome: PayloadVerificationOutcome,
) -> Self {
match envelope {
MaybeAvailableEnvelope::Available(available_envelope) => {
Self::Available(AvailableExecutedEnvelope::new(
available_envelope,
block_root,
payload_verification_outcome,
))
}
MaybeAvailableEnvelope::AvailabilityPending {
block_hash: _,
envelope,
} => Self::AvailabilityPending(AvailabilityPendingExecutedEnvelope::new(
envelope,
block_root,
payload_verification_outcome,
)),
}
}
}
/// A payload ernvelope that has completed all envelope procesing checks, verification
/// by an EL client but does not have all requisite columns to get imported into
/// fork choice.
@@ -162,10 +121,6 @@ impl<E: EthSpec> AvailabilityPendingExecutedEnvelope<E> {
payload_verification_outcome,
}
}
pub fn as_envelope(&self) -> &SignedExecutionPayloadEnvelope<E> {
&self.envelope
}
}
/// A payload envelope that has completed all payload processing checks including verification

View File

@@ -80,9 +80,6 @@ use types::new_non_zero_usize;
/// eviction to prevent race conditions (#7961), so we expect this cache to be full all the time.
const OVERFLOW_LRU_CAPACITY_NON_ZERO: NonZeroUsize = new_non_zero_usize(32);
/// Represents available data for a payload - its block root and its data columns.
pub type AvailableData<E> = (Hash256, DataColumnSidecarList<E>);
/// This type is returned after adding a bid / column to the `DataAvailabilityChecker`.
///
/// Indicates if the payloads data is fully `Available` or if we need more columns.

View File

@@ -587,7 +587,6 @@ fn handle_rpc_request<E: EthSpec>(
decoded_buffer,
spec.max_request_blocks(current_fork),
)?,
fork_name: current_fork,
},
))),
SupportedProtocol::PingV1 => Ok(Some(RequestType::Ping(Ping {
@@ -1156,7 +1155,6 @@ mod tests {
spec.max_request_blocks(fork_name),
)
.unwrap(),
fork_name,
}
}

View File

@@ -12,7 +12,6 @@ use std::ops::Deref;
use std::sync::Arc;
use strum::IntoStaticStr;
use superstruct::superstruct;
use types::ForkName;
use types::data::BlobIdentifier;
use types::light_client::consts::MAX_REQUEST_LIGHT_CLIENT_UPDATES;
use types::{
@@ -563,21 +562,16 @@ impl BlobsByRootRequest {
pub struct DataColumnsByRootRequest<E: EthSpec> {
/// The list of beacon block roots and column indices being requested.
pub data_column_ids: RuntimeVariableList<DataColumnsByRootIdentifier<E>>,
pub fork_name: ForkName,
}
impl<E: EthSpec> DataColumnsByRootRequest<E> {
pub fn new(
data_column_ids: Vec<DataColumnsByRootIdentifier<E>>,
fork_name: ForkName,
max_request_blocks: usize,
) -> Result<Self, &'static str> {
let data_column_ids = RuntimeVariableList::new(data_column_ids, max_request_blocks)
.map_err(|_| "DataColumnsByRootRequest too many column IDs")?;
Ok(Self {
data_column_ids,
fork_name,
})
Ok(Self { data_column_ids })
}
pub fn max_requested(&self) -> usize {

View File

@@ -982,7 +982,6 @@ fn test_tcp_columns_by_root_chunked_rpc_for_fork(fork_name: ForkName) {
};
max_request_blocks
],
fork_name,
max_request_blocks,
)
.unwrap();
@@ -993,7 +992,6 @@ fn test_tcp_columns_by_root_chunked_rpc_for_fork(fork_name: ForkName) {
spec.max_request_blocks(fork_name),
)
.unwrap(),
fork_name,
};
assert_eq!(req, req_decoded);
let rpc_request = RequestType::DataColumnsByRoot(req);

View File

@@ -543,7 +543,6 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
match self.chain.get_data_columns_checking_all_caches(
data_column_ids_by_root.block_root,
&indices_to_retrieve,
request.fork_name,
) {
Ok(data_columns) => {
send_data_column_count += data_columns.len();

View File

@@ -172,7 +172,7 @@ impl<T: BeaconChainTypes> RequestState<T> for CustodyRequestState<T::EthSpec> {
_: usize,
cx: &mut SyncNetworkContext<T>,
) -> Result<LookupRequestResult, LookupRequestError> {
cx.custody_lookup_request(id, self.slot, self.block_root, lookup_peers)
cx.custody_lookup_request(id, self.block_root, lookup_peers)
.map_err(LookupRequestError::SendFailedNetwork)
}

View File

@@ -241,7 +241,7 @@ impl<T: BeaconChainTypes> SingleBlockLookup<T> {
);
} else if cx.chain.should_fetch_custody_columns(block_epoch) {
self.component_requests = ComponentRequests::ActiveCustodyRequest(
CustodyRequestState::new(block.slot(), self.block_root),
CustodyRequestState::new(self.block_root),
);
} else {
self.component_requests = ComponentRequests::NotNeeded("outside da window");
@@ -399,15 +399,13 @@ impl<E: EthSpec> BlobRequestState<E> {
pub struct CustodyRequestState<E: EthSpec> {
#[educe(Debug(ignore))]
pub block_root: Hash256,
pub slot: Slot,
pub state: SingleLookupRequestState<DataColumnSidecarList<E>>,
}
impl<E: EthSpec> CustodyRequestState<E> {
pub fn new(slot: Slot, block_root: Hash256) -> Self {
pub fn new(block_root: Hash256) -> Self {
Self {
block_root,
slot,
state: SingleLookupRequestState::new(),
}
}

View File

@@ -1082,7 +1082,6 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
pub fn custody_lookup_request(
&mut self,
lookup_id: SingleLookupId,
_slot: Slot,
block_root: Hash256,
lookup_peers: Arc<RwLock<HashSet<PeerId>>>,
) -> Result<LookupRequestResult, RpcRequestSendError> {
@@ -1090,6 +1089,11 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
.chain
.data_availability_checker
.cached_data_column_indexes(&block_root)
.or_else(|| {
self.chain
.pending_payload_cache
.cached_data_column_indexes(&block_root)
})
.unwrap_or_default();
let current_epoch = self.chain.epoch().map_err(|e| {

View File

@@ -26,7 +26,6 @@ impl DataColumnsByRootSingleBlockRequest {
block_root: self.block_root,
columns,
}],
fork_name,
spec.max_request_blocks(fork_name),
)
}