mirror of
https://github.com/sigp/lighthouse.git
synced 2026-06-15 09:48:20 +00:00
Refactor
This commit is contained in:
@@ -22,7 +22,6 @@ use crate::data_availability_checker::{
|
||||
Availability as BlockAvailability, AvailabilityCheckError, AvailableBlock, AvailableBlockData,
|
||||
DataColumnReconstructionResult,
|
||||
};
|
||||
use crate::data_availability_checker_v2::Availability as PayloadAvailability;
|
||||
use crate::data_availability_router::{
|
||||
AvailabilityOutcome, DataAvailabilityRouter, ReconstructionOutcome,
|
||||
};
|
||||
@@ -3799,7 +3798,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
|
||||
}
|
||||
}
|
||||
AvailabilityOutcome::Payload(_) => {
|
||||
return Err(BlockError::InternalError("Received a payload envelope availability outcome variant when a block variant was expected".to_string()))
|
||||
Err(BlockError::InternalError("Received a payload envelope availability outcome variant when a block variant was expected".to_string()))
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
@@ -60,7 +60,6 @@ use crate::execution_payload::{
|
||||
};
|
||||
use crate::kzg_utils::blobs_to_data_column_sidecars;
|
||||
use crate::observed_block_producers::SeenBlock;
|
||||
use crate::payload_envelope_verification::EnvelopeError;
|
||||
use crate::validator_monitor::HISTORIC_EPOCHS as VALIDATOR_MONITOR_HISTORIC_EPOCHS;
|
||||
use crate::validator_pubkey_cache::ValidatorPubkeyCache;
|
||||
use crate::{
|
||||
|
||||
@@ -994,7 +994,6 @@ where
|
||||
|
||||
let da_checker_v2 = Arc::new(
|
||||
DataAvailabilityCheckerV2::new(
|
||||
slot_clock.clone(),
|
||||
self.kzg.clone(),
|
||||
custody_context.clone(),
|
||||
self.spec.clone(),
|
||||
|
||||
@@ -1,3 +1,42 @@
|
||||
//! This module builds out the data availability cache for Gloas. When a beacon block is received
|
||||
//! over gossip/p2p we insert its payload into this cache, keyed by block root. As soon as the bid
|
||||
//! is received we can begin using it to verify data columns.
|
||||
//!
|
||||
//! When a payload envelope is received over gossip/p2p we first insert it as a pre-executed envelope. A separate
|
||||
//! thread eventually executes the payload envelope against the EL. Assuming the payload is executed succesfully
|
||||
//! the envelope is updated in the cache from `PreExecuted` -> `Executed`. Once all required custody columns
|
||||
//! have been kzg verified and the envelope has been executed we can import the envelope into fork choice and store it to disk.
|
||||
//!
|
||||
//! Note that the block must have arrived before the envelope for the envelope to pass upstream verification checks and reach this cache.
|
||||
//! However data columns can potentially arrive before the block.
|
||||
//!
|
||||
//!
|
||||
//! SignedBeaconBlock
|
||||
//! |
|
||||
//! | -> SignedExecutionPayloadBid
|
||||
//!
|
||||
//!
|
||||
//! DataColumnSidecarList
|
||||
//! |
|
||||
//! | -> Perform data column verification against `SignedExecutionPayloadBid`
|
||||
//! │ │
|
||||
//! │ ▼
|
||||
//! | -> KzgVerifiedCustodyDataColumn
|
||||
//!
|
||||
//!
|
||||
//! SignedExecutionPayloadEnvelope
|
||||
//! │
|
||||
//! | -> CachedPayloadEnvelope::PreExecution
|
||||
//! │ │
|
||||
//! │ ▼
|
||||
//! | -> AvailabilityPendingExecutedEnvelope
|
||||
//! │ │
|
||||
//! │ ▼
|
||||
//! │ -> CachedPayloadEnvelope::Executed
|
||||
//! │ │
|
||||
//! │ ▼
|
||||
//! | -> AvailableExecutedEnvelope (all columns present, payload executed against the EL, ready to import)
|
||||
|
||||
use crate::data_availability_checker::AvailabilityCheckError;
|
||||
use crate::payload_envelope_verification::{
|
||||
AvailabilityPendingExecutedEnvelope, AvailableExecutedEnvelope,
|
||||
@@ -66,7 +105,9 @@ impl<E: EthSpec> Debug for Availability<E> {
|
||||
write!(f, "MissingComponents({})", block_root)
|
||||
}
|
||||
// TODO(gloas) fix success case
|
||||
Self::Available(data) => todo!(),
|
||||
Self::Available(envelope) => {
|
||||
write!(f, "Available({:?})", envelope.import_data.block_root)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -99,7 +140,6 @@ pub struct DataAvailabilityChecker<T: BeaconChainTypes> {
|
||||
|
||||
impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
|
||||
pub fn new(
|
||||
_slot_clock: T::SlotClock,
|
||||
kzg: Arc<Kzg>,
|
||||
custody_context: Arc<CustodyContext<T::EthSpec>>,
|
||||
spec: Arc<ChainSpec>,
|
||||
@@ -174,7 +214,6 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
|
||||
})
|
||||
}
|
||||
|
||||
|
||||
/// Insert an executed payload envelope into the cache and performs an availability check
|
||||
pub fn put_executed_payload_envelope(
|
||||
&self,
|
||||
@@ -416,16 +455,19 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
|
||||
"Reconstructed columns"
|
||||
);
|
||||
|
||||
self.put_kzg_verified_custody_data_columns(*block_root, data_columns_to_import_and_publish.clone())
|
||||
.map(|availability| {
|
||||
DataColumnReconstructionResult::Success((
|
||||
availability,
|
||||
data_columns_to_import_and_publish
|
||||
.into_iter()
|
||||
.map(|d| d.clone_arc())
|
||||
.collect::<Vec<_>>(),
|
||||
))
|
||||
})
|
||||
self.put_kzg_verified_custody_data_columns(
|
||||
*block_root,
|
||||
data_columns_to_import_and_publish.clone(),
|
||||
)
|
||||
.map(|availability| {
|
||||
DataColumnReconstructionResult::Success((
|
||||
availability,
|
||||
data_columns_to_import_and_publish
|
||||
.into_iter()
|
||||
.map(|d| d.clone_arc())
|
||||
.collect::<Vec<_>>(),
|
||||
))
|
||||
})
|
||||
}
|
||||
|
||||
// ── Metrics ──
|
||||
@@ -664,7 +706,9 @@ async fn availability_cache_maintenance_service<T: BeaconChainTypes>(
|
||||
mod data_availability_checker_tests {
|
||||
use super::*;
|
||||
|
||||
use crate::block_verification::PayloadVerificationOutcome;
|
||||
use crate::data_column_verification::{KzgVerifiedCustodyDataColumn, KzgVerifiedDataColumn};
|
||||
use crate::payload_envelope_verification::EnvelopeImportData;
|
||||
use crate::test_utils::{
|
||||
NumBlobs, generate_data_column_indices_rand_order, generate_rand_block_and_data_columns,
|
||||
test_spec,
|
||||
@@ -673,12 +717,16 @@ mod data_availability_checker_tests {
|
||||
custody_context::NodeCustodyType,
|
||||
test_utils::{BeaconChainHarness, DiskHarnessType},
|
||||
};
|
||||
use fork_choice::PayloadVerificationStatus;
|
||||
use logging::create_test_tracing_subscriber;
|
||||
use rand::SeedableRng;
|
||||
use rand::rngs::StdRng;
|
||||
use store::{HotColdDB, StoreConfig, database::interface::BeaconNodeBackend};
|
||||
use tempfile::{TempDir, tempdir};
|
||||
use types::{ForkName, MinimalEthSpec, Slot};
|
||||
use types::{
|
||||
BeaconState, ExecutionPayloadEnvelope, ExecutionPayloadGloas, ExecutionRequests, ForkName,
|
||||
FullPayload, MinimalEthSpec, SignedBeaconBlock, Slot,
|
||||
};
|
||||
|
||||
type E = MinimalEthSpec;
|
||||
|
||||
@@ -732,9 +780,7 @@ mod data_availability_checker_tests {
|
||||
.build()
|
||||
}
|
||||
|
||||
async fn setup_harness_and_cache<T>(
|
||||
capacity: usize,
|
||||
) -> (
|
||||
async fn setup_harness_and_cache<T>() -> (
|
||||
BeaconChainHarness<DiskHarnessType<E>>,
|
||||
Arc<DataAvailabilityChecker<T>>,
|
||||
TempDir,
|
||||
@@ -756,17 +802,15 @@ mod data_availability_checker_tests {
|
||||
&spec,
|
||||
));
|
||||
|
||||
todo!()
|
||||
// let cache = Arc::new(
|
||||
// DataAvailabilityChecker::<T>::new(
|
||||
// harness.chain.slot_clock.clone().into(),
|
||||
// harness.chain.kzg.clone().unwrap(),
|
||||
// custody_context,
|
||||
// spec.clone(),
|
||||
// )
|
||||
// .expect("should create cache"),
|
||||
// );
|
||||
// (harness, cache, chain_db_path)
|
||||
let cache = Arc::new(
|
||||
DataAvailabilityChecker::<T>::new(
|
||||
harness.chain.kzg.clone(),
|
||||
custody_context,
|
||||
spec.clone(),
|
||||
)
|
||||
.expect("should create cache"),
|
||||
);
|
||||
(harness, cache, chain_db_path)
|
||||
}
|
||||
|
||||
fn is_gloas_enabled() -> bool {
|
||||
@@ -781,8 +825,7 @@ mod data_availability_checker_tests {
|
||||
}
|
||||
|
||||
type T = DiskHarnessType<E>;
|
||||
let capacity = 4;
|
||||
let (_harness, cache, _path) = setup_harness_and_cache::<T>(capacity).await;
|
||||
let (_harness, cache, _path) = setup_harness_and_cache::<T>().await;
|
||||
assert_eq!(cache.block_cache_size(), 0);
|
||||
}
|
||||
|
||||
@@ -793,8 +836,7 @@ mod data_availability_checker_tests {
|
||||
}
|
||||
|
||||
type T = DiskHarnessType<E>;
|
||||
let capacity = 4;
|
||||
let (harness, cache, _path) = setup_harness_and_cache::<T>(capacity).await;
|
||||
let (harness, cache, _path) = setup_harness_and_cache::<T>().await;
|
||||
|
||||
let mut rng = StdRng::seed_from_u64(0xDEADBEEF);
|
||||
let spec = harness.spec.clone();
|
||||
@@ -840,8 +882,7 @@ mod data_availability_checker_tests {
|
||||
}
|
||||
|
||||
type T = DiskHarnessType<E>;
|
||||
let capacity = 4;
|
||||
let (harness, cache, _path) = setup_harness_and_cache::<T>(capacity).await;
|
||||
let (harness, cache, _path) = setup_harness_and_cache::<T>().await;
|
||||
|
||||
let mut rng = StdRng::seed_from_u64(0xDEADBEEF);
|
||||
let spec = harness.spec.clone();
|
||||
@@ -889,8 +930,7 @@ mod data_availability_checker_tests {
|
||||
}
|
||||
|
||||
type T = DiskHarnessType<E>;
|
||||
let capacity = 4;
|
||||
let (harness, cache, _path) = setup_harness_and_cache::<T>(capacity).await;
|
||||
let (harness, cache, _path) = setup_harness_and_cache::<T>().await;
|
||||
|
||||
let mut rng = StdRng::seed_from_u64(0xDEADBEEF);
|
||||
let spec = harness.spec.clone();
|
||||
@@ -922,6 +962,319 @@ mod data_availability_checker_tests {
|
||||
assert!(matches!(result, Availability::MissingComponents(_)));
|
||||
}
|
||||
|
||||
/// Helper to create a test bid with the given block root and kzg commitments from a block.
|
||||
fn make_test_bid<E: EthSpec>(
|
||||
block: &SignedBeaconBlock<E, FullPayload<E>>,
|
||||
) -> Arc<SignedExecutionPayloadBid<E>> {
|
||||
let bid = block
|
||||
.message()
|
||||
.body()
|
||||
.signed_execution_payload_bid()
|
||||
.expect("gloas block should have bid")
|
||||
.clone();
|
||||
Arc::new(bid)
|
||||
}
|
||||
|
||||
fn make_test_signed_envelope(block_root: Hash256) -> Arc<SignedExecutionPayloadEnvelope<E>> {
|
||||
Arc::new(SignedExecutionPayloadEnvelope {
|
||||
message: ExecutionPayloadEnvelope {
|
||||
payload: ExecutionPayloadGloas::default(),
|
||||
execution_requests: ExecutionRequests::default(),
|
||||
builder_index: 0,
|
||||
beacon_block_root: block_root,
|
||||
slot: Slot::new(0),
|
||||
state_root: Hash256::ZERO,
|
||||
},
|
||||
signature: bls::Signature::infinity().expect("should create infinity sig"),
|
||||
})
|
||||
}
|
||||
|
||||
fn make_test_executed_envelope(block_root: Hash256) -> AvailabilityPendingExecutedEnvelope<E> {
|
||||
AvailabilityPendingExecutedEnvelope {
|
||||
envelope: make_test_signed_envelope(block_root),
|
||||
import_data: EnvelopeImportData {
|
||||
block_root,
|
||||
post_state: Box::new(BeaconState::new(0, Default::default(), &gloas_spec::<E>())),
|
||||
},
|
||||
payload_verification_outcome: PayloadVerificationOutcome {
|
||||
payload_verification_status: PayloadVerificationStatus::Verified,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_full_availability_flow() {
|
||||
if !is_gloas_enabled() {
|
||||
return;
|
||||
}
|
||||
|
||||
type T = DiskHarnessType<E>;
|
||||
let (harness, cache, _path) = setup_harness_and_cache::<T>().await;
|
||||
|
||||
let mut rng = StdRng::seed_from_u64(0xDEADBEEF);
|
||||
let spec = harness.spec.clone();
|
||||
|
||||
let (block, data_columns) = generate_rand_block_and_data_columns::<E>(
|
||||
ForkName::Gloas,
|
||||
NumBlobs::Number(1),
|
||||
&mut rng,
|
||||
&spec,
|
||||
);
|
||||
|
||||
let block_root = Hash256::random();
|
||||
let bid = make_test_bid(&block);
|
||||
|
||||
cache.put_bid(block_root, bid).expect("should put bid");
|
||||
assert!(matches!(
|
||||
cache.put_bid(block_root, make_test_bid(&block)),
|
||||
Ok(Availability::MissingComponents(_))
|
||||
));
|
||||
|
||||
let verified_columns: Vec<_> = data_columns
|
||||
.into_iter()
|
||||
.map(|col| {
|
||||
KzgVerifiedCustodyDataColumn::from_asserted_custody(
|
||||
KzgVerifiedDataColumn::__new_for_testing(col),
|
||||
)
|
||||
})
|
||||
.collect();
|
||||
|
||||
let result = cache
|
||||
.put_kzg_verified_custody_data_columns(block_root, verified_columns)
|
||||
.expect("should put columns");
|
||||
|
||||
assert!(matches!(result, Availability::MissingComponents(_)));
|
||||
|
||||
// Insert pre-executed envelope first
|
||||
cache
|
||||
.put_pre_executed_payload_envelope(
|
||||
make_test_signed_envelope(block_root),
|
||||
BlockImportSource::Gossip,
|
||||
)
|
||||
.expect("should put pre-executed envelope");
|
||||
|
||||
let status = cache.get_envelope_processing_status(&block_root);
|
||||
assert!(matches!(
|
||||
status,
|
||||
Some(PayloadEnvelopeProcessingStatus::NotValidated(..))
|
||||
));
|
||||
|
||||
// Upgrade to executed envelope (after EL validation)
|
||||
let executed_envelope = make_test_executed_envelope(block_root);
|
||||
let result = cache
|
||||
.put_executed_payload_envelope(executed_envelope)
|
||||
.expect("should put executed envelope");
|
||||
|
||||
assert!(
|
||||
matches!(result, Availability::Available(_)),
|
||||
"expected Available, got {:?}",
|
||||
result
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_zero_blob_bid_immediately_available() {
|
||||
if !is_gloas_enabled() {
|
||||
return;
|
||||
}
|
||||
|
||||
type T = DiskHarnessType<E>;
|
||||
let (harness, cache, _path) = setup_harness_and_cache::<T>().await;
|
||||
|
||||
let mut rng = StdRng::seed_from_u64(0xDEADBEEF);
|
||||
let spec = harness.spec.clone();
|
||||
|
||||
// Generate a block with 0 blobs — bid will have empty commitments
|
||||
let (block, _data_columns) = generate_rand_block_and_data_columns::<E>(
|
||||
ForkName::Gloas,
|
||||
NumBlobs::Number(0),
|
||||
&mut rng,
|
||||
&spec,
|
||||
);
|
||||
|
||||
let block_root = Hash256::random();
|
||||
let bid = make_test_bid(&block);
|
||||
|
||||
// Insert bid (no blobs expected)
|
||||
cache.put_bid(block_root, bid).expect("should put bid");
|
||||
|
||||
// Insert executed envelope — should become available immediately (no columns needed)
|
||||
let executed_envelope = make_test_executed_envelope(block_root);
|
||||
let result = cache
|
||||
.put_executed_payload_envelope(executed_envelope)
|
||||
.expect("should put executed envelope");
|
||||
|
||||
assert!(
|
||||
matches!(result, Availability::Available(_)),
|
||||
"zero-blob bid should be immediately available, got {:?}",
|
||||
result
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_columns_arrive_before_bid() {
|
||||
if !is_gloas_enabled() {
|
||||
return;
|
||||
}
|
||||
|
||||
type T = DiskHarnessType<E>;
|
||||
let (harness, cache, _path) = setup_harness_and_cache::<T>().await;
|
||||
|
||||
let mut rng = StdRng::seed_from_u64(0xDEADBEEF);
|
||||
let spec = harness.spec.clone();
|
||||
|
||||
let (block, data_columns) = generate_rand_block_and_data_columns::<E>(
|
||||
ForkName::Gloas,
|
||||
NumBlobs::Number(1),
|
||||
&mut rng,
|
||||
&spec,
|
||||
);
|
||||
|
||||
let block_root = Hash256::random();
|
||||
|
||||
// Columns arrive before bid
|
||||
let verified_columns: Vec<_> = data_columns
|
||||
.into_iter()
|
||||
.map(|col| {
|
||||
KzgVerifiedCustodyDataColumn::from_asserted_custody(
|
||||
KzgVerifiedDataColumn::__new_for_testing(col),
|
||||
)
|
||||
})
|
||||
.collect();
|
||||
|
||||
let result = cache
|
||||
.put_kzg_verified_custody_data_columns(block_root, verified_columns)
|
||||
.expect("should put columns");
|
||||
assert!(matches!(result, Availability::MissingComponents(_)));
|
||||
|
||||
let bid = make_test_bid(&block);
|
||||
let result = cache.put_bid(block_root, bid).expect("should put bid");
|
||||
assert!(matches!(result, Availability::MissingComponents(_)));
|
||||
|
||||
let executed_envelope = make_test_executed_envelope(block_root);
|
||||
let result = cache
|
||||
.put_executed_payload_envelope(executed_envelope)
|
||||
.expect("should put executed envelope");
|
||||
|
||||
assert!(
|
||||
matches!(result, Availability::Available(_)),
|
||||
"expected Available after all components inserted, got {:?}",
|
||||
result
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_pre_executed_envelope_not_available() {
|
||||
if !is_gloas_enabled() {
|
||||
return;
|
||||
}
|
||||
|
||||
type T = DiskHarnessType<E>;
|
||||
let (harness, cache, _path) = setup_harness_and_cache::<T>().await;
|
||||
|
||||
let mut rng = StdRng::seed_from_u64(0xDEADBEEF);
|
||||
let spec = harness.spec.clone();
|
||||
|
||||
let (block, data_columns) = generate_rand_block_and_data_columns::<E>(
|
||||
ForkName::Gloas,
|
||||
NumBlobs::Number(1),
|
||||
&mut rng,
|
||||
&spec,
|
||||
);
|
||||
|
||||
let block_root = Hash256::random();
|
||||
|
||||
// Insert bid + all columns
|
||||
cache
|
||||
.put_bid(block_root, make_test_bid(&block))
|
||||
.expect("should put bid");
|
||||
|
||||
let verified_columns: Vec<_> = data_columns
|
||||
.into_iter()
|
||||
.map(|col| {
|
||||
KzgVerifiedCustodyDataColumn::from_asserted_custody(
|
||||
KzgVerifiedDataColumn::__new_for_testing(col),
|
||||
)
|
||||
})
|
||||
.collect();
|
||||
cache
|
||||
.put_kzg_verified_custody_data_columns(block_root, verified_columns)
|
||||
.expect("should put columns");
|
||||
|
||||
// Insert pre-executed envelope (not yet validated by EL)
|
||||
cache
|
||||
.put_pre_executed_payload_envelope(
|
||||
make_test_signed_envelope(block_root),
|
||||
BlockImportSource::Gossip,
|
||||
)
|
||||
.expect("should put pre-executed envelope");
|
||||
|
||||
// Should NOT be available — envelope not executed yet
|
||||
let status = cache.get_envelope_processing_status(&block_root);
|
||||
assert!(matches!(
|
||||
status,
|
||||
Some(PayloadEnvelopeProcessingStatus::NotValidated(..))
|
||||
));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_remove_pre_executed_envelope() {
|
||||
if !is_gloas_enabled() {
|
||||
return;
|
||||
}
|
||||
|
||||
type T = DiskHarnessType<E>;
|
||||
let (_harness, cache, _path) = setup_harness_and_cache::<T>().await;
|
||||
|
||||
let block_root = Hash256::random();
|
||||
|
||||
// Insert pre-executed envelope
|
||||
cache
|
||||
.put_pre_executed_payload_envelope(
|
||||
make_test_signed_envelope(block_root),
|
||||
BlockImportSource::Gossip,
|
||||
)
|
||||
.expect("should put pre-executed envelope");
|
||||
|
||||
// Verify it's there
|
||||
assert!(cache.get_envelope_processing_status(&block_root).is_some());
|
||||
|
||||
// Remove it
|
||||
cache.remove_pre_executed_payload_envelope(&block_root);
|
||||
|
||||
// Should be gone
|
||||
let status = cache.get_envelope_processing_status(&block_root);
|
||||
assert!(status.is_none());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_remove_pre_executed_does_not_remove_executed() {
|
||||
if !is_gloas_enabled() {
|
||||
return;
|
||||
}
|
||||
|
||||
type T = DiskHarnessType<E>;
|
||||
let (_harness, cache, _path) = setup_harness_and_cache::<T>().await;
|
||||
|
||||
let block_root = Hash256::random();
|
||||
|
||||
// Insert executed envelope
|
||||
let executed_envelope = make_test_executed_envelope(block_root);
|
||||
cache
|
||||
.put_executed_payload_envelope(executed_envelope)
|
||||
.expect("should put executed envelope");
|
||||
|
||||
// Try to remove as pre-executed — should be a no-op
|
||||
cache.remove_pre_executed_payload_envelope(&block_root);
|
||||
|
||||
// Should still be there as executed
|
||||
let status = cache.get_envelope_processing_status(&block_root);
|
||||
assert!(matches!(
|
||||
status,
|
||||
Some(PayloadEnvelopeProcessingStatus::ExecutionValidated(..))
|
||||
));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_reconstruction_started_flag() {
|
||||
if !is_gloas_enabled() {
|
||||
@@ -929,8 +1282,7 @@ mod data_availability_checker_tests {
|
||||
}
|
||||
|
||||
type T = DiskHarnessType<E>;
|
||||
let capacity = 4;
|
||||
let (harness, cache, _path) = setup_harness_and_cache::<T>(capacity).await;
|
||||
let (harness, cache, _path) = setup_harness_and_cache::<T>().await;
|
||||
|
||||
let mut rng = StdRng::seed_from_u64(0xDEADBEEF);
|
||||
let spec = harness.spec.clone();
|
||||
@@ -971,8 +1323,7 @@ mod data_availability_checker_tests {
|
||||
}
|
||||
|
||||
type T = DiskHarnessType<E>;
|
||||
let capacity = 4;
|
||||
let (harness, cache, _path) = setup_harness_and_cache::<T>(capacity).await;
|
||||
let (harness, cache, _path) = setup_harness_and_cache::<T>().await;
|
||||
|
||||
let mut rng = StdRng::seed_from_u64(0xDEADBEEF);
|
||||
let spec = harness.spec.clone();
|
||||
@@ -1024,10 +1375,7 @@ mod data_availability_checker_tests {
|
||||
}
|
||||
|
||||
type T = DiskHarnessType<E>;
|
||||
let capacity = 4;
|
||||
let (_harness, cache, _path) = setup_harness_and_cache::<T>(capacity).await;
|
||||
|
||||
let block_root = Hash256::random();
|
||||
let (_harness, cache, _path) = setup_harness_and_cache::<T>().await;
|
||||
|
||||
// Run maintenance with a future cutoff epoch
|
||||
let cutoff_epoch = Epoch::new(100);
|
||||
@@ -1046,8 +1394,7 @@ mod data_availability_checker_tests {
|
||||
}
|
||||
|
||||
type T = DiskHarnessType<E>;
|
||||
let capacity = 4;
|
||||
let (harness, cache, _path) = setup_harness_and_cache::<T>(capacity).await;
|
||||
let (harness, cache, _path) = setup_harness_and_cache::<T>().await;
|
||||
|
||||
let mut rng = StdRng::seed_from_u64(0xDEADBEEF);
|
||||
let spec = harness.spec.clone();
|
||||
@@ -1084,4 +1431,190 @@ mod data_availability_checker_tests {
|
||||
assert!(peeked.is_some());
|
||||
assert_eq!(peeked.unwrap().len(), 3);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_lru_eviction() {
|
||||
if !is_gloas_enabled() {
|
||||
return;
|
||||
}
|
||||
|
||||
type T = DiskHarnessType<E>;
|
||||
let (harness, cache, _path) = setup_harness_and_cache::<T>().await;
|
||||
|
||||
let mut rng = StdRng::seed_from_u64(0xDEADBEEF);
|
||||
let spec = harness.spec.clone();
|
||||
|
||||
let (_block, data_columns) = generate_rand_block_and_data_columns::<E>(
|
||||
ForkName::Gloas,
|
||||
NumBlobs::Number(1),
|
||||
&mut rng,
|
||||
&spec,
|
||||
);
|
||||
|
||||
// LRU capacity is 32 (OVERFLOW_LRU_CAPACITY_NON_ZERO). Insert 33 entries.
|
||||
let mut roots = Vec::new();
|
||||
for _ in 0..33 {
|
||||
let block_root = Hash256::random();
|
||||
roots.push(block_root);
|
||||
let col = data_columns.first().cloned().expect("should have column");
|
||||
let verified = vec![KzgVerifiedCustodyDataColumn::from_asserted_custody(
|
||||
KzgVerifiedDataColumn::__new_for_testing(col),
|
||||
)];
|
||||
cache
|
||||
.put_kzg_verified_custody_data_columns(block_root, verified)
|
||||
.expect("should put columns");
|
||||
}
|
||||
|
||||
assert_eq!(cache.block_cache_size(), 32);
|
||||
assert!(cache.get_data_columns(roots[0]).is_none());
|
||||
assert!(cache.get_data_columns(*roots.last().unwrap()).is_some());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_maintenance_prunes_old_entries() {
|
||||
if !is_gloas_enabled() {
|
||||
return;
|
||||
}
|
||||
|
||||
type T = DiskHarnessType<E>;
|
||||
let (harness, cache, _path) = setup_harness_and_cache::<T>().await;
|
||||
|
||||
let mut rng = StdRng::seed_from_u64(0xDEADBEEF);
|
||||
let spec = harness.spec.clone();
|
||||
|
||||
let (block, data_columns) = generate_rand_block_and_data_columns::<E>(
|
||||
ForkName::Gloas,
|
||||
NumBlobs::Number(1),
|
||||
&mut rng,
|
||||
&spec,
|
||||
);
|
||||
|
||||
let block_root = Hash256::random();
|
||||
|
||||
// Insert bid (gives the entry an epoch via the bid's slot)
|
||||
cache
|
||||
.put_bid(block_root, make_test_bid(&block))
|
||||
.expect("should put bid");
|
||||
|
||||
let col = data_columns.first().cloned().expect("should have column");
|
||||
let verified = vec![KzgVerifiedCustodyDataColumn::from_asserted_custody(
|
||||
KzgVerifiedDataColumn::__new_for_testing(col),
|
||||
)];
|
||||
cache
|
||||
.put_kzg_verified_custody_data_columns(block_root, verified)
|
||||
.expect("should put columns");
|
||||
|
||||
assert_eq!(cache.block_cache_size(), 1);
|
||||
|
||||
// Maintenance with cutoff in the future should prune (bid slot=0 → epoch=0 < cutoff=100)
|
||||
cache
|
||||
.do_maintenance(Epoch::new(100))
|
||||
.expect("maintenance should succeed");
|
||||
|
||||
assert_eq!(cache.block_cache_size(), 0);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_double_reconstruction_prevented() {
|
||||
if !is_gloas_enabled() {
|
||||
return;
|
||||
}
|
||||
|
||||
type T = DiskHarnessType<E>;
|
||||
let (harness, cache, _path) = setup_harness_and_cache::<T>().await;
|
||||
|
||||
let mut rng = StdRng::seed_from_u64(0xDEADBEEF);
|
||||
let spec = harness.spec.clone();
|
||||
|
||||
let (_block, data_columns) = generate_rand_block_and_data_columns::<E>(
|
||||
ForkName::Gloas,
|
||||
NumBlobs::Number(1),
|
||||
&mut rng,
|
||||
&spec,
|
||||
);
|
||||
|
||||
let block_root = Hash256::random();
|
||||
|
||||
// Insert all columns so reconstruction threshold is met
|
||||
let verified_columns: Vec<_> = data_columns
|
||||
.into_iter()
|
||||
.map(|col| {
|
||||
KzgVerifiedCustodyDataColumn::from_asserted_custody(
|
||||
KzgVerifiedDataColumn::__new_for_testing(col),
|
||||
)
|
||||
})
|
||||
.collect();
|
||||
|
||||
cache
|
||||
.put_kzg_verified_custody_data_columns(block_root, verified_columns)
|
||||
.expect("should put columns");
|
||||
|
||||
// Manually set reconstruction_started via check_and_set
|
||||
// For fullnode, sampling == all columns, so this returns No("all sampling columns received")
|
||||
// But we can set the flag manually to test the guard
|
||||
cache
|
||||
.availability_cache
|
||||
.write()
|
||||
.get_mut(&block_root)
|
||||
.expect("should exist")
|
||||
.reconstruction_started = true;
|
||||
|
||||
let decision = cache.check_and_set_reconstruction_started(&block_root);
|
||||
assert!(
|
||||
matches!(decision, ReconstructColumnsDecision::No(reason) if reason == "already started"),
|
||||
"second reconstruction attempt should be prevented"
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_partial_columns_missing_components() {
|
||||
if !is_gloas_enabled() {
|
||||
return;
|
||||
}
|
||||
|
||||
type T = DiskHarnessType<E>;
|
||||
let (harness, cache, _path) = setup_harness_and_cache::<T>().await;
|
||||
|
||||
let mut rng = StdRng::seed_from_u64(0xDEADBEEF);
|
||||
let spec = harness.spec.clone();
|
||||
|
||||
let (block, data_columns) = generate_rand_block_and_data_columns::<E>(
|
||||
ForkName::Gloas,
|
||||
NumBlobs::Number(1),
|
||||
&mut rng,
|
||||
&spec,
|
||||
);
|
||||
|
||||
let block_root = Hash256::random();
|
||||
|
||||
// Insert bid and executed envelope
|
||||
cache
|
||||
.put_bid(block_root, make_test_bid(&block))
|
||||
.expect("should put bid");
|
||||
|
||||
let executed_envelope = make_test_executed_envelope(block_root);
|
||||
cache
|
||||
.put_executed_payload_envelope(executed_envelope)
|
||||
.expect("should put executed envelope");
|
||||
|
||||
// Insert only 1 column (need 128 for fullnode)
|
||||
let verified_columns: Vec<_> = data_columns
|
||||
.into_iter()
|
||||
.take(1)
|
||||
.map(|col| {
|
||||
KzgVerifiedCustodyDataColumn::from_asserted_custody(
|
||||
KzgVerifiedDataColumn::__new_for_testing(col),
|
||||
)
|
||||
})
|
||||
.collect();
|
||||
|
||||
let result = cache
|
||||
.put_kzg_verified_custody_data_columns(block_root, verified_columns)
|
||||
.expect("should put columns");
|
||||
|
||||
assert!(
|
||||
matches!(result, Availability::MissingComponents(_)),
|
||||
"partial columns should not trigger availability"
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -8,8 +8,8 @@ use std::sync::Arc;
|
||||
use tracing::{Span, debug, debug_span};
|
||||
use types::BlockImportSource;
|
||||
use types::{
|
||||
ChainSpec, ColumnIndex, DataColumnSidecar, Epoch, EthSpec, Hash256,
|
||||
SignedExecutionPayloadBid, SignedExecutionPayloadEnvelope,
|
||||
ChainSpec, ColumnIndex, DataColumnSidecar, Epoch, EthSpec, Hash256, SignedExecutionPayloadBid,
|
||||
SignedExecutionPayloadEnvelope,
|
||||
};
|
||||
|
||||
pub enum CachedPayloadEnvelope<E: EthSpec> {
|
||||
@@ -22,9 +22,6 @@ pub enum CachedPayloadEnvelope<E: EthSpec> {
|
||||
/// The columns are all gossip and kzg verified.
|
||||
/// The payload is considered "available" when all required columns are received.
|
||||
pub struct PendingComponents<E: EthSpec> {
|
||||
/// The block root is stored for tracing context in the span.
|
||||
#[allow(dead_code)]
|
||||
pub block_root: Hash256,
|
||||
/// The execution payload bid containing blob_kzg_commitments.
|
||||
pub bid: Option<Arc<SignedExecutionPayloadBid<E>>>,
|
||||
/// a cached pre or post executed payload envelope
|
||||
@@ -182,7 +179,6 @@ impl<E: EthSpec> PendingComponents<E> {
|
||||
let span = debug_span!(parent: None, "lh_pending_components", %block_root);
|
||||
let _guard = span.clone().entered();
|
||||
Self {
|
||||
block_root,
|
||||
bid: None,
|
||||
envelope: None,
|
||||
verified_data_columns: vec![],
|
||||
@@ -240,7 +236,6 @@ mod pending_components_tests {
|
||||
let block_root = Hash256::random();
|
||||
let components = PendingComponents::<E>::empty(block_root, spec);
|
||||
|
||||
assert_eq!(components.block_root, block_root);
|
||||
assert!(components.bid.is_none());
|
||||
assert!(components.verified_data_columns.is_empty());
|
||||
assert!(!components.reconstruction_started);
|
||||
|
||||
@@ -20,10 +20,12 @@ use crate::block_verification_types::AvailabilityPendingExecutedBlock;
|
||||
use crate::custody_context::CustodyContext;
|
||||
use crate::data_availability_checker::{
|
||||
Availability as BlockAvailability, AvailabilityCheckError, AvailableBlock,
|
||||
DataAvailabilityChecker, DataColumnReconstructionResult as BlockReconstructionResult,
|
||||
DataAvailabilityChecker, DataAvailabilityCheckerMetrics as BlockMetrics,
|
||||
DataColumnReconstructionResult as BlockReconstructionResult,
|
||||
};
|
||||
use crate::data_availability_checker_v2::{
|
||||
Availability as PayloadAvailability, DataAvailabilityChecker as DataAvailabilityCheckerV2,
|
||||
DataAvailabilityCheckerMetrics as PayloadMetrics,
|
||||
DataColumnReconstructionResult as PayloadReconstructionResult,
|
||||
};
|
||||
use crate::data_column_verification::{GossipVerifiedDataColumn, KzgVerifiedCustodyDataColumn};
|
||||
@@ -387,17 +389,44 @@ impl<T: BeaconChainTypes> DataAvailabilityRouter<T> {
|
||||
self.v1.put_gossip_verified_blobs(block_root, blobs)
|
||||
}
|
||||
|
||||
/// Direct access to v1 checker for block execution/availability checks.
|
||||
///
|
||||
/// Use this for operations that are specific to the legacy DA checker,
|
||||
// ── Metrics ──
|
||||
|
||||
pub fn metrics(&self) -> DataAvailabilityRouterMetrics {
|
||||
DataAvailabilityRouterMetrics {
|
||||
block: self.v1.metrics(),
|
||||
payload: self.v2.metrics(),
|
||||
}
|
||||
}
|
||||
|
||||
// ── Direct access ──
|
||||
|
||||
/// Direct access to the block-level DA checker (pre-Gloas).
|
||||
/// Used for block availability checks, range sync, and blob verification.
|
||||
pub fn v1(&self) -> &Arc<DataAvailabilityChecker<T>> {
|
||||
&self.v1
|
||||
}
|
||||
|
||||
/// Direct access to v2 checker for payload availability checks.
|
||||
///
|
||||
/// Use this for operations that are specific to the Gloas DA checker,
|
||||
/// Direct access to the envelope-level DA checker (Gloas).
|
||||
/// Used for payload envelope availability checks and column verification.
|
||||
pub fn v2(&self) -> &Arc<DataAvailabilityCheckerV2<T>> {
|
||||
&self.v2
|
||||
}
|
||||
}
|
||||
|
||||
pub struct DataAvailabilityRouterMetrics {
|
||||
pub block: BlockMetrics,
|
||||
pub payload: PayloadMetrics,
|
||||
}
|
||||
|
||||
pub fn start_availability_cache_maintenance_service<T: BeaconChainTypes>(
|
||||
executor: task_executor::TaskExecutor,
|
||||
chain: Arc<crate::BeaconChain<T>>,
|
||||
) {
|
||||
crate::data_availability_checker::start_availability_cache_maintenance_service(
|
||||
executor.clone(),
|
||||
chain.clone(),
|
||||
);
|
||||
crate::data_availability_checker_v2::start_availability_cache_maintenance_service(
|
||||
executor, chain,
|
||||
);
|
||||
}
|
||||
|
||||
@@ -1897,6 +1897,12 @@ pub static DATA_AVAILABILITY_OVERFLOW_MEMORY_BLOCK_CACHE_SIZE: LazyLock<Result<I
|
||||
"Number of entries in the data availability overflow block memory cache.",
|
||||
)
|
||||
});
|
||||
pub static DATA_AVAILABILITY_PAYLOAD_CACHE_SIZE: LazyLock<Result<IntGauge>> = LazyLock::new(|| {
|
||||
try_create_int_gauge(
|
||||
"data_availability_payload_cache_size",
|
||||
"Number of entries in the data availability payload envelope cache.",
|
||||
)
|
||||
});
|
||||
pub static DATA_AVAILABILITY_RECONSTRUCTION_TIME: LazyLock<Result<Histogram>> =
|
||||
LazyLock::new(|| {
|
||||
try_create_histogram(
|
||||
@@ -1999,12 +2005,11 @@ pub fn scrape_for_metrics<T: BeaconChainTypes>(beacon_chain: &BeaconChain<T>) {
|
||||
beacon_chain.store.state_cache_len(),
|
||||
);
|
||||
|
||||
// TODO(gloas) configure v2 metrics
|
||||
let da_checker_metrics = beacon_chain.data_availability_checker.v1().metrics();
|
||||
let da_checker_metrics = beacon_chain.data_availability_checker.metrics();
|
||||
|
||||
set_gauge_by_usize(
|
||||
&DATA_AVAILABILITY_OVERFLOW_MEMORY_BLOCK_CACHE_SIZE,
|
||||
da_checker_metrics.block_cache_size,
|
||||
da_checker_metrics.block.block_cache_size,
|
||||
);
|
||||
|
||||
if let Some((size, num_lookups)) = beacon_chain.pre_finalization_block_cache.metrics() {
|
||||
|
||||
@@ -173,7 +173,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
|
||||
) -> Result<AvailabilityProcessingStatus, EnvelopeError> {
|
||||
match availability {
|
||||
AvailabilityOutcome::Block(_) => {
|
||||
return Err(EnvelopeError::InternalError("Received a block availability outcome variant when a payload envelope variant was expected".to_string()))
|
||||
Err(EnvelopeError::InternalError("Received a block availability outcome variant when a payload envelope variant was expected".to_string()))
|
||||
}
|
||||
AvailabilityOutcome::Payload(availability) => match availability {
|
||||
PayloadAvailability::Available(available_envelope) => {
|
||||
|
||||
Reference in New Issue
Block a user