mirror of
https://github.com/sigp/lighthouse.git
synced 2026-03-02 16:21:42 +00:00
Fix unexpected blob error and duplicate import in fetch blobs (#7541)
Getting this error on a non-PeerDAS network:
```
May 29 13:30:13.484 ERROR Error fetching or processing blobs from EL error: BlobProcessingError(AvailabilityCheck(Unexpected("empty blobs"))), block_root: 0x98aa3927056d453614fefbc79eb1f9865666d1f119d0e8aa9e6f4d02aa9395d9
```
It appears we're passing an empty `Vec` to DA checker, because all blobs were already seen on gossip and filtered out, this causes a `AvailabilityCheckError::Unexpected("empty blobs")`.
I've added equivalent unit tests for `getBlobsV1` to cover all the scenarios we test in `getBlobsV2`. This would have caught the bug if I had added it earlier. It also caught another bug which could trigger duplicate block import.
Thanks Santito for reporting this! 🙏
This commit is contained in:
@@ -42,6 +42,7 @@ use types::{
|
||||
/// Result from engine get blobs to be passed onto `DataAvailabilityChecker` and published to the
|
||||
/// gossip network. The blobs / data columns have not been marked as observed yet, as they may not
|
||||
/// be published immediately.
|
||||
#[derive(Debug)]
|
||||
pub enum EngineGetBlobsOutput<T: BeaconChainTypes> {
|
||||
Blobs(Vec<GossipVerifiedBlob<T, DoNotObserve>>),
|
||||
/// A filtered list of custody data columns to be imported into the `DataAvailabilityChecker`.
|
||||
@@ -163,9 +164,22 @@ async fn fetch_and_process_blobs_v1<T: BeaconChainTypes>(
|
||||
inc_counter(&metrics::BLOBS_FROM_EL_MISS_TOTAL);
|
||||
return Ok(None);
|
||||
} else {
|
||||
debug!(
|
||||
num_expected_blobs,
|
||||
num_fetched_blobs, "Received blobs from the EL"
|
||||
);
|
||||
inc_counter(&metrics::BLOBS_FROM_EL_HIT_TOTAL);
|
||||
}
|
||||
|
||||
if chain_adapter.fork_choice_contains_block(&block_root) {
|
||||
// Avoid computing sidecars if the block has already been imported.
|
||||
debug!(
|
||||
info = "block has already been imported",
|
||||
"Ignoring EL blobs response"
|
||||
);
|
||||
return Ok(None);
|
||||
}
|
||||
|
||||
let (signed_block_header, kzg_commitments_proof) = block
|
||||
.signed_block_header_and_kzg_commitments_proof()
|
||||
.map_err(FetchEngineBlobError::BeaconStateError)?;
|
||||
@@ -197,13 +211,13 @@ async fn fetch_and_process_blobs_v1<T: BeaconChainTypes>(
|
||||
.collect::<Result<Vec<_>, _>>()
|
||||
.map_err(FetchEngineBlobError::GossipBlob)?;
|
||||
|
||||
if !blobs_to_import_and_publish.is_empty() {
|
||||
publish_fn(EngineGetBlobsOutput::Blobs(
|
||||
blobs_to_import_and_publish.clone(),
|
||||
));
|
||||
if blobs_to_import_and_publish.is_empty() {
|
||||
return Ok(None);
|
||||
}
|
||||
|
||||
debug!(num_fetched_blobs, "Processing engine blobs");
|
||||
publish_fn(EngineGetBlobsOutput::Blobs(
|
||||
blobs_to_import_and_publish.clone(),
|
||||
));
|
||||
|
||||
let availability_processing_status = chain_adapter
|
||||
.process_engine_blobs(
|
||||
|
||||
@@ -7,227 +7,509 @@ use crate::test_utils::{get_kzg, EphemeralHarnessType};
|
||||
use crate::AvailabilityProcessingStatus;
|
||||
use bls::Signature;
|
||||
use eth2::types::BlobsBundle;
|
||||
use execution_layer::json_structures::BlobAndProofV2;
|
||||
use execution_layer::json_structures::{BlobAndProof, BlobAndProofV1, BlobAndProofV2};
|
||||
use execution_layer::test_utils::generate_blobs;
|
||||
use maplit::hashset;
|
||||
use std::sync::{Arc, Mutex};
|
||||
use task_executor::test_utils::TestRuntime;
|
||||
use types::{
|
||||
BeaconBlockFulu, EmptyBlock, EthSpec, ForkName, Hash256, MainnetEthSpec, SignedBeaconBlock,
|
||||
SignedBeaconBlockFulu,
|
||||
BeaconBlock, BeaconBlockFulu, EmptyBlock, EthSpec, ForkName, Hash256, MainnetEthSpec,
|
||||
SignedBeaconBlock, SignedBeaconBlockFulu,
|
||||
};
|
||||
|
||||
type E = MainnetEthSpec;
|
||||
type T = EphemeralHarnessType<E>;
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn test_fetch_blobs_v2_no_blobs_in_block() {
|
||||
let mut mock_adapter = mock_beacon_adapter();
|
||||
let (publish_fn, _s) = mock_publish_fn();
|
||||
let block = SignedBeaconBlock::<E>::Fulu(SignedBeaconBlockFulu {
|
||||
message: BeaconBlockFulu::empty(mock_adapter.spec()),
|
||||
signature: Signature::empty(),
|
||||
});
|
||||
let block_root = block.canonical_root();
|
||||
mod get_blobs_v2 {
|
||||
use super::*;
|
||||
|
||||
// Expectations: engine fetch blobs should not be triggered
|
||||
mock_adapter.expect_get_blobs_v2().times(0);
|
||||
mock_adapter.expect_process_engine_blobs().times(0);
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn test_fetch_blobs_v2_no_blobs_in_block() {
|
||||
let mut mock_adapter = mock_beacon_adapter(ForkName::Fulu);
|
||||
let (publish_fn, _s) = mock_publish_fn();
|
||||
let block = SignedBeaconBlock::<E>::Fulu(SignedBeaconBlockFulu {
|
||||
message: BeaconBlockFulu::empty(mock_adapter.spec()),
|
||||
signature: Signature::empty(),
|
||||
});
|
||||
let block_root = block.canonical_root();
|
||||
|
||||
let custody_columns = hashset![0, 1, 2];
|
||||
let processing_status = fetch_and_process_engine_blobs_inner(
|
||||
mock_adapter,
|
||||
block_root,
|
||||
Arc::new(block),
|
||||
custody_columns.clone(),
|
||||
publish_fn,
|
||||
)
|
||||
.await
|
||||
.expect("fetch blobs should succeed");
|
||||
// Expectations: engine fetch blobs should not be triggered
|
||||
mock_adapter.expect_get_blobs_v2().times(0);
|
||||
mock_adapter.expect_process_engine_blobs().times(0);
|
||||
|
||||
assert_eq!(processing_status, None);
|
||||
let custody_columns = hashset![0, 1, 2];
|
||||
let processing_status = fetch_and_process_engine_blobs_inner(
|
||||
mock_adapter,
|
||||
block_root,
|
||||
Arc::new(block),
|
||||
custody_columns.clone(),
|
||||
publish_fn,
|
||||
)
|
||||
.await
|
||||
.expect("fetch blobs should succeed");
|
||||
|
||||
assert_eq!(processing_status, None);
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn test_fetch_blobs_v2_no_blobs_returned() {
|
||||
let mut mock_adapter = mock_beacon_adapter(ForkName::Fulu);
|
||||
let (publish_fn, _) = mock_publish_fn();
|
||||
let (block, _blobs_and_proofs) = create_test_block_and_blobs(&mock_adapter, 2);
|
||||
let block_root = block.canonical_root();
|
||||
|
||||
// No blobs in EL response
|
||||
mock_get_blobs_v2_response(&mut mock_adapter, None);
|
||||
|
||||
// Trigger fetch blobs on the block
|
||||
let custody_columns = hashset![0, 1, 2];
|
||||
let processing_status = fetch_and_process_engine_blobs_inner(
|
||||
mock_adapter,
|
||||
block_root,
|
||||
block,
|
||||
custody_columns.clone(),
|
||||
publish_fn,
|
||||
)
|
||||
.await
|
||||
.expect("fetch blobs should succeed");
|
||||
|
||||
assert_eq!(processing_status, None);
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn test_fetch_blobs_v2_partial_blobs_returned() {
|
||||
let mut mock_adapter = mock_beacon_adapter(ForkName::Fulu);
|
||||
let (publish_fn, publish_fn_args) = mock_publish_fn();
|
||||
let (block, mut blobs_and_proofs) = create_test_block_and_blobs(&mock_adapter, 2);
|
||||
let block_root = block.canonical_root();
|
||||
|
||||
// Missing blob in EL response
|
||||
blobs_and_proofs.pop();
|
||||
mock_get_blobs_v2_response(&mut mock_adapter, Some(blobs_and_proofs));
|
||||
// No blobs should be processed
|
||||
mock_adapter.expect_process_engine_blobs().times(0);
|
||||
|
||||
// Trigger fetch blobs on the block
|
||||
let custody_columns = hashset![0, 1, 2];
|
||||
let processing_status = fetch_and_process_engine_blobs_inner(
|
||||
mock_adapter,
|
||||
block_root,
|
||||
block,
|
||||
custody_columns.clone(),
|
||||
publish_fn,
|
||||
)
|
||||
.await
|
||||
.expect("fetch blobs should succeed");
|
||||
|
||||
assert_eq!(processing_status, None);
|
||||
assert_eq!(
|
||||
publish_fn_args.lock().unwrap().len(),
|
||||
0,
|
||||
"no columns should be published"
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn test_fetch_blobs_v2_block_imported_after_el_response() {
|
||||
let mut mock_adapter = mock_beacon_adapter(ForkName::Fulu);
|
||||
let (publish_fn, publish_fn_args) = mock_publish_fn();
|
||||
let (block, blobs_and_proofs) = create_test_block_and_blobs(&mock_adapter, 2);
|
||||
let block_root = block.canonical_root();
|
||||
|
||||
// All blobs returned, but fork choice already imported the block
|
||||
mock_get_blobs_v2_response(&mut mock_adapter, Some(blobs_and_proofs));
|
||||
mock_fork_choice_contains_block(&mut mock_adapter, vec![block.canonical_root()]);
|
||||
// No blobs should be processed
|
||||
mock_adapter.expect_process_engine_blobs().times(0);
|
||||
|
||||
// Trigger fetch blobs on the block
|
||||
let custody_columns = hashset![0, 1, 2];
|
||||
let processing_status = fetch_and_process_engine_blobs_inner(
|
||||
mock_adapter,
|
||||
block_root,
|
||||
block,
|
||||
custody_columns.clone(),
|
||||
publish_fn,
|
||||
)
|
||||
.await
|
||||
.expect("fetch blobs should succeed");
|
||||
|
||||
assert_eq!(processing_status, None);
|
||||
assert_eq!(
|
||||
publish_fn_args.lock().unwrap().len(),
|
||||
0,
|
||||
"no columns should be published"
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn test_fetch_blobs_v2_no_new_columns_to_import() {
|
||||
let mut mock_adapter = mock_beacon_adapter(ForkName::Fulu);
|
||||
let (publish_fn, publish_fn_args) = mock_publish_fn();
|
||||
let (block, blobs_and_proofs) = create_test_block_and_blobs(&mock_adapter, 2);
|
||||
let block_root = block.canonical_root();
|
||||
|
||||
// **GIVEN**:
|
||||
// All blobs returned
|
||||
mock_get_blobs_v2_response(&mut mock_adapter, Some(blobs_and_proofs));
|
||||
// block not yet imported into fork choice
|
||||
mock_fork_choice_contains_block(&mut mock_adapter, vec![]);
|
||||
// All data columns already seen on gossip
|
||||
mock_adapter
|
||||
.expect_verify_data_column_for_gossip()
|
||||
.returning(|c| {
|
||||
Err(GossipDataColumnError::PriorKnown {
|
||||
proposer: c.block_proposer_index(),
|
||||
slot: c.slot(),
|
||||
index: c.index,
|
||||
})
|
||||
});
|
||||
// No blobs should be processed
|
||||
mock_adapter.expect_process_engine_blobs().times(0);
|
||||
|
||||
// **WHEN**: Trigger `fetch_blobs` on the block
|
||||
let custody_columns = hashset![0, 1, 2];
|
||||
let processing_status = fetch_and_process_engine_blobs_inner(
|
||||
mock_adapter,
|
||||
block_root,
|
||||
block,
|
||||
custody_columns.clone(),
|
||||
publish_fn,
|
||||
)
|
||||
.await
|
||||
.expect("fetch blobs should succeed");
|
||||
|
||||
// **THEN**: Should NOT be processed and no columns should be published.
|
||||
assert_eq!(processing_status, None);
|
||||
assert_eq!(
|
||||
publish_fn_args.lock().unwrap().len(),
|
||||
0,
|
||||
"no columns should be published"
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn test_fetch_blobs_v2_success() {
|
||||
let mut mock_adapter = mock_beacon_adapter(ForkName::Fulu);
|
||||
let (publish_fn, publish_fn_args) = mock_publish_fn();
|
||||
let (block, blobs_and_proofs) = create_test_block_and_blobs(&mock_adapter, 2);
|
||||
let block_root = block.canonical_root();
|
||||
|
||||
// All blobs returned, fork choice doesn't contain block
|
||||
mock_get_blobs_v2_response(&mut mock_adapter, Some(blobs_and_proofs));
|
||||
mock_fork_choice_contains_block(&mut mock_adapter, vec![]);
|
||||
mock_adapter
|
||||
.expect_verify_data_column_for_gossip()
|
||||
.returning(|c| Ok(GossipVerifiedDataColumn::__new_for_testing(c)));
|
||||
mock_process_engine_blobs_result(
|
||||
&mut mock_adapter,
|
||||
Ok(AvailabilityProcessingStatus::Imported(block_root)),
|
||||
);
|
||||
|
||||
// Trigger fetch blobs on the block
|
||||
let custody_columns = hashset![0, 1, 2];
|
||||
let processing_status = fetch_and_process_engine_blobs_inner(
|
||||
mock_adapter,
|
||||
block_root,
|
||||
block,
|
||||
custody_columns.clone(),
|
||||
publish_fn,
|
||||
)
|
||||
.await
|
||||
.expect("fetch blobs should succeed");
|
||||
|
||||
assert_eq!(
|
||||
processing_status,
|
||||
Some(AvailabilityProcessingStatus::Imported(block_root))
|
||||
);
|
||||
|
||||
let published_columns = extract_published_blobs(publish_fn_args);
|
||||
assert!(
|
||||
matches!(
|
||||
published_columns,
|
||||
EngineGetBlobsOutput::CustodyColumns(columns) if columns.len() == custody_columns.len()
|
||||
),
|
||||
"should publish custody columns"
|
||||
);
|
||||
}
|
||||
|
||||
fn mock_get_blobs_v2_response(
|
||||
mock_adapter: &mut MockFetchBlobsBeaconAdapter<T>,
|
||||
blobs_and_proofs_opt: Option<Vec<BlobAndProof<E>>>,
|
||||
) {
|
||||
let blobs_and_proofs_v2_opt = blobs_and_proofs_opt.map(|blobs_and_proofs| {
|
||||
blobs_and_proofs
|
||||
.into_iter()
|
||||
.map(|blob_and_proof| match blob_and_proof {
|
||||
BlobAndProof::V2(inner) => inner,
|
||||
_ => panic!("BlobAndProofV2 not expected"),
|
||||
})
|
||||
.collect()
|
||||
});
|
||||
mock_adapter
|
||||
.expect_get_blobs_v2()
|
||||
.return_once(move |_| Ok(blobs_and_proofs_v2_opt));
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn test_fetch_blobs_v2_no_blobs_returned() {
|
||||
let mut mock_adapter = mock_beacon_adapter();
|
||||
let (publish_fn, _) = mock_publish_fn();
|
||||
let (block, _blobs_and_proofs) = create_test_block_and_blobs(&mock_adapter);
|
||||
let block_root = block.canonical_root();
|
||||
mod get_blobs_v1 {
|
||||
use super::*;
|
||||
use crate::blob_verification::{GossipBlobError, GossipVerifiedBlob};
|
||||
use crate::block_verification_types::AsBlock;
|
||||
|
||||
// No blobs in EL response
|
||||
mock_get_blobs_v2_response(&mut mock_adapter, None);
|
||||
const ELECTRA_FORK: ForkName = ForkName::Electra;
|
||||
|
||||
// Trigger fetch blobs on the block
|
||||
let custody_columns = hashset![0, 1, 2];
|
||||
let processing_status = fetch_and_process_engine_blobs_inner(
|
||||
mock_adapter,
|
||||
block_root,
|
||||
block,
|
||||
custody_columns.clone(),
|
||||
publish_fn,
|
||||
)
|
||||
.await
|
||||
.expect("fetch blobs should succeed");
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn test_fetch_blobs_v1_no_blobs_in_block() {
|
||||
let mut mock_adapter = mock_beacon_adapter(ELECTRA_FORK);
|
||||
let spec = mock_adapter.spec();
|
||||
let (publish_fn, _s) = mock_publish_fn();
|
||||
let block_no_blobs =
|
||||
SignedBeaconBlock::from_block(BeaconBlock::empty(spec), Signature::empty());
|
||||
let block_root = block_no_blobs.canonical_root();
|
||||
|
||||
assert_eq!(processing_status, None);
|
||||
}
|
||||
// Expectations: engine fetch blobs should not be triggered
|
||||
mock_adapter.expect_get_blobs_v1().times(0);
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn test_fetch_blobs_v2_partial_blobs_returned() {
|
||||
let mut mock_adapter = mock_beacon_adapter();
|
||||
let (publish_fn, publish_fn_args) = mock_publish_fn();
|
||||
let (block, mut blobs_and_proofs) = create_test_block_and_blobs(&mock_adapter);
|
||||
let block_root = block.canonical_root();
|
||||
// WHEN: Trigger fetch blobs on the block
|
||||
let custody_columns = hashset![0, 1, 2];
|
||||
let processing_status = fetch_and_process_engine_blobs_inner(
|
||||
mock_adapter,
|
||||
block_root,
|
||||
Arc::new(block_no_blobs),
|
||||
custody_columns,
|
||||
publish_fn,
|
||||
)
|
||||
.await
|
||||
.expect("fetch blobs should succeed");
|
||||
|
||||
// Missing blob in EL response
|
||||
blobs_and_proofs.pop();
|
||||
mock_get_blobs_v2_response(&mut mock_adapter, Some(blobs_and_proofs));
|
||||
// No blobs should be processed
|
||||
mock_adapter.expect_process_engine_blobs().times(0);
|
||||
// THEN: No blob is processed
|
||||
assert_eq!(processing_status, None);
|
||||
}
|
||||
|
||||
// Trigger fetch blobs on the block
|
||||
let custody_columns = hashset![0, 1, 2];
|
||||
let processing_status = fetch_and_process_engine_blobs_inner(
|
||||
mock_adapter,
|
||||
block_root,
|
||||
block,
|
||||
custody_columns.clone(),
|
||||
publish_fn,
|
||||
)
|
||||
.await
|
||||
.expect("fetch blobs should succeed");
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn test_fetch_blobs_v1_no_blobs_returned() {
|
||||
let mut mock_adapter = mock_beacon_adapter(ELECTRA_FORK);
|
||||
let (publish_fn, _) = mock_publish_fn();
|
||||
let (block, _blobs_and_proofs) = create_test_block_and_blobs(&mock_adapter, 2);
|
||||
let block_root = block.canonical_root();
|
||||
|
||||
assert_eq!(processing_status, None);
|
||||
assert_eq!(
|
||||
publish_fn_args.lock().unwrap().len(),
|
||||
0,
|
||||
"no columns should be published"
|
||||
);
|
||||
}
|
||||
// GIVEN: No blobs in EL response
|
||||
let expected_blob_count = block.message().body().blob_kzg_commitments().unwrap().len();
|
||||
mock_get_blobs_v1_response(&mut mock_adapter, vec![None; expected_blob_count]);
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn test_fetch_blobs_v2_block_imported_after_el_response() {
|
||||
let mut mock_adapter = mock_beacon_adapter();
|
||||
let (publish_fn, publish_fn_args) = mock_publish_fn();
|
||||
let (block, blobs_and_proofs) = create_test_block_and_blobs(&mock_adapter);
|
||||
let block_root = block.canonical_root();
|
||||
// WHEN: Trigger fetch blobs on the block
|
||||
let custody_columns = hashset![0, 1, 2];
|
||||
let processing_status = fetch_and_process_engine_blobs_inner(
|
||||
mock_adapter,
|
||||
block_root,
|
||||
block,
|
||||
custody_columns,
|
||||
publish_fn,
|
||||
)
|
||||
.await
|
||||
.expect("fetch blobs should succeed");
|
||||
|
||||
// All blobs returned, but fork choice already imported the block
|
||||
mock_get_blobs_v2_response(&mut mock_adapter, Some(blobs_and_proofs));
|
||||
mock_fork_choice_contains_block(&mut mock_adapter, vec![block.canonical_root()]);
|
||||
// No blobs should be processed
|
||||
mock_adapter.expect_process_engine_blobs().times(0);
|
||||
// THEN: No blob is processed
|
||||
assert_eq!(processing_status, None);
|
||||
}
|
||||
|
||||
// Trigger fetch blobs on the block
|
||||
let custody_columns = hashset![0, 1, 2];
|
||||
let processing_status = fetch_and_process_engine_blobs_inner(
|
||||
mock_adapter,
|
||||
block_root,
|
||||
block,
|
||||
custody_columns.clone(),
|
||||
publish_fn,
|
||||
)
|
||||
.await
|
||||
.expect("fetch blobs should succeed");
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn test_fetch_blobs_v1_partial_blobs_returned() {
|
||||
let mut mock_adapter = mock_beacon_adapter(ELECTRA_FORK);
|
||||
let (publish_fn, publish_fn_args) = mock_publish_fn();
|
||||
let blob_count = 2;
|
||||
let (block, blobs_and_proofs) = create_test_block_and_blobs(&mock_adapter, blob_count);
|
||||
let block_slot = block.slot();
|
||||
let block_root = block.canonical_root();
|
||||
|
||||
assert_eq!(processing_status, None);
|
||||
assert_eq!(
|
||||
publish_fn_args.lock().unwrap().len(),
|
||||
0,
|
||||
"no columns should be published"
|
||||
);
|
||||
}
|
||||
// GIVEN: Missing a blob in EL response (remove 1 blob from response)
|
||||
let mut blob_and_proof_opts = blobs_and_proofs.into_iter().map(Some).collect::<Vec<_>>();
|
||||
blob_and_proof_opts.first_mut().unwrap().take();
|
||||
mock_get_blobs_v1_response(&mut mock_adapter, blob_and_proof_opts);
|
||||
// AND block is not imported into fork choice
|
||||
mock_fork_choice_contains_block(&mut mock_adapter, vec![]);
|
||||
// AND all blobs returned are valid
|
||||
mock_adapter
|
||||
.expect_verify_blob_for_gossip()
|
||||
.returning(|b| Ok(GossipVerifiedBlob::__assumed_valid(b.clone())));
|
||||
// Returned blobs should be processed
|
||||
mock_process_engine_blobs_result(
|
||||
&mut mock_adapter,
|
||||
Ok(AvailabilityProcessingStatus::MissingComponents(
|
||||
block_slot, block_root,
|
||||
)),
|
||||
);
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn test_fetch_blobs_v2_no_new_columns_to_import() {
|
||||
let mut mock_adapter = mock_beacon_adapter();
|
||||
let (publish_fn, publish_fn_args) = mock_publish_fn();
|
||||
let (block, blobs_and_proofs) = create_test_block_and_blobs(&mock_adapter);
|
||||
let block_root = block.canonical_root();
|
||||
// WHEN: Trigger fetch blobs on the block
|
||||
let custody_columns = hashset![0, 1, 2];
|
||||
let processing_status = fetch_and_process_engine_blobs_inner(
|
||||
mock_adapter,
|
||||
block_root,
|
||||
block,
|
||||
custody_columns,
|
||||
publish_fn,
|
||||
)
|
||||
.await
|
||||
.expect("fetch blobs should succeed");
|
||||
|
||||
// **GIVEN**:
|
||||
// All blobs returned
|
||||
mock_get_blobs_v2_response(&mut mock_adapter, Some(blobs_and_proofs));
|
||||
// block not yet imported into fork choice
|
||||
mock_fork_choice_contains_block(&mut mock_adapter, vec![]);
|
||||
// All data columns already seen on gossip
|
||||
mock_adapter
|
||||
.expect_verify_data_column_for_gossip()
|
||||
.returning(|c| {
|
||||
Err(GossipDataColumnError::PriorKnown {
|
||||
proposer: c.block_proposer_index(),
|
||||
slot: c.slot(),
|
||||
index: c.index,
|
||||
// THEN: Returned blobs are processed and published
|
||||
assert_eq!(
|
||||
processing_status,
|
||||
Some(AvailabilityProcessingStatus::MissingComponents(
|
||||
block_slot, block_root,
|
||||
))
|
||||
);
|
||||
assert!(
|
||||
matches!(
|
||||
extract_published_blobs(publish_fn_args),
|
||||
EngineGetBlobsOutput::Blobs(blobs) if blobs.len() == blob_count - 1
|
||||
),
|
||||
"partial blob results should still be published"
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn test_fetch_blobs_v1_block_imported_after_el_response() {
|
||||
let mut mock_adapter = mock_beacon_adapter(ELECTRA_FORK);
|
||||
let (publish_fn, publish_fn_args) = mock_publish_fn();
|
||||
let (block, blobs_and_proofs) = create_test_block_and_blobs(&mock_adapter, 2);
|
||||
let block_root = block.canonical_root();
|
||||
|
||||
// GIVEN: All blobs returned, but fork choice already imported the block
|
||||
let blob_and_proof_opts = blobs_and_proofs.into_iter().map(Some).collect::<Vec<_>>();
|
||||
mock_get_blobs_v1_response(&mut mock_adapter, blob_and_proof_opts);
|
||||
mock_fork_choice_contains_block(&mut mock_adapter, vec![block.canonical_root()]);
|
||||
|
||||
// WHEN: Trigger fetch blobs on the block
|
||||
let custody_columns = hashset![0, 1, 2];
|
||||
let processing_status = fetch_and_process_engine_blobs_inner(
|
||||
mock_adapter,
|
||||
block_root,
|
||||
block,
|
||||
custody_columns,
|
||||
publish_fn,
|
||||
)
|
||||
.await
|
||||
.expect("fetch blobs should succeed");
|
||||
|
||||
// THEN: Returned blobs should NOT be processed or published.
|
||||
assert_eq!(processing_status, None);
|
||||
assert_eq!(
|
||||
publish_fn_args.lock().unwrap().len(),
|
||||
0,
|
||||
"no blobs should be published"
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn test_fetch_blobs_v1_no_new_blobs_to_import() {
|
||||
let mut mock_adapter = mock_beacon_adapter(ELECTRA_FORK);
|
||||
let (publish_fn, publish_fn_args) = mock_publish_fn();
|
||||
let (block, blobs_and_proofs) = create_test_block_and_blobs(&mock_adapter, 2);
|
||||
let block_root = block.canonical_root();
|
||||
|
||||
// **GIVEN**:
|
||||
// All blobs returned
|
||||
let blob_and_proof_opts = blobs_and_proofs.into_iter().map(Some).collect::<Vec<_>>();
|
||||
mock_get_blobs_v1_response(&mut mock_adapter, blob_and_proof_opts);
|
||||
// block not yet imported into fork choice
|
||||
mock_fork_choice_contains_block(&mut mock_adapter, vec![]);
|
||||
// All blobs already seen on gossip
|
||||
mock_adapter.expect_verify_blob_for_gossip().returning(|b| {
|
||||
Err(GossipBlobError::RepeatBlob {
|
||||
proposer: b.block_proposer_index(),
|
||||
slot: b.slot(),
|
||||
index: b.index,
|
||||
})
|
||||
});
|
||||
// No blobs should be processed
|
||||
mock_adapter.expect_process_engine_blobs().times(0);
|
||||
|
||||
// **WHEN**: Trigger `fetch_blobs` on the block
|
||||
let custody_columns = hashset![0, 1, 2];
|
||||
let processing_status = fetch_and_process_engine_blobs_inner(
|
||||
mock_adapter,
|
||||
block_root,
|
||||
block,
|
||||
custody_columns.clone(),
|
||||
publish_fn,
|
||||
)
|
||||
.await
|
||||
.expect("fetch blobs should succeed");
|
||||
// **WHEN**: Trigger `fetch_blobs` on the block
|
||||
let custody_columns = hashset![0, 1, 2];
|
||||
let processing_status = fetch_and_process_engine_blobs_inner(
|
||||
mock_adapter,
|
||||
block_root,
|
||||
block,
|
||||
custody_columns,
|
||||
publish_fn,
|
||||
)
|
||||
.await
|
||||
.expect("fetch blobs should succeed");
|
||||
|
||||
// **THEN**: Should NOT be processed and no columns should be published.
|
||||
assert_eq!(processing_status, None);
|
||||
assert_eq!(
|
||||
publish_fn_args.lock().unwrap().len(),
|
||||
0,
|
||||
"no columns should be published"
|
||||
);
|
||||
}
|
||||
// **THEN**: Should NOT be processed and no blobs should be published.
|
||||
assert_eq!(processing_status, None);
|
||||
assert_eq!(
|
||||
publish_fn_args.lock().unwrap().len(),
|
||||
0,
|
||||
"no blobs should be published"
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn test_fetch_blobs_v2_success() {
|
||||
let mut mock_adapter = mock_beacon_adapter();
|
||||
let (publish_fn, publish_fn_args) = mock_publish_fn();
|
||||
let (block, blobs_and_proofs) = create_test_block_and_blobs(&mock_adapter);
|
||||
let block_root = block.canonical_root();
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn test_fetch_blobs_v1_success() {
|
||||
let mut mock_adapter = mock_beacon_adapter(ELECTRA_FORK);
|
||||
let (publish_fn, publish_fn_args) = mock_publish_fn();
|
||||
let blob_count = 2;
|
||||
let (block, blobs_and_proofs) = create_test_block_and_blobs(&mock_adapter, blob_count);
|
||||
let block_root = block.canonical_root();
|
||||
|
||||
// All blobs returned, fork choice doesn't contain block
|
||||
mock_get_blobs_v2_response(&mut mock_adapter, Some(blobs_and_proofs));
|
||||
mock_fork_choice_contains_block(&mut mock_adapter, vec![]);
|
||||
mock_adapter
|
||||
.expect_verify_data_column_for_gossip()
|
||||
.returning(|c| Ok(GossipVerifiedDataColumn::__new_for_testing(c)));
|
||||
mock_process_engine_blobs_result(
|
||||
&mut mock_adapter,
|
||||
Ok(AvailabilityProcessingStatus::Imported(block_root)),
|
||||
);
|
||||
// All blobs returned, fork choice doesn't contain block
|
||||
let blob_and_proof_opts = blobs_and_proofs.into_iter().map(Some).collect::<Vec<_>>();
|
||||
mock_get_blobs_v1_response(&mut mock_adapter, blob_and_proof_opts);
|
||||
mock_fork_choice_contains_block(&mut mock_adapter, vec![]);
|
||||
mock_adapter
|
||||
.expect_verify_blob_for_gossip()
|
||||
.returning(|b| Ok(GossipVerifiedBlob::__assumed_valid(b.clone())));
|
||||
mock_process_engine_blobs_result(
|
||||
&mut mock_adapter,
|
||||
Ok(AvailabilityProcessingStatus::Imported(block_root)),
|
||||
);
|
||||
|
||||
// Trigger fetch blobs on the block
|
||||
let custody_columns = hashset![0, 1, 2];
|
||||
let processing_status = fetch_and_process_engine_blobs_inner(
|
||||
mock_adapter,
|
||||
block_root,
|
||||
block,
|
||||
custody_columns.clone(),
|
||||
publish_fn,
|
||||
)
|
||||
.await
|
||||
.expect("fetch blobs should succeed");
|
||||
// Trigger fetch blobs on the block
|
||||
let custody_columns = hashset![0, 1, 2];
|
||||
let processing_status = fetch_and_process_engine_blobs_inner(
|
||||
mock_adapter,
|
||||
block_root,
|
||||
block,
|
||||
custody_columns,
|
||||
publish_fn,
|
||||
)
|
||||
.await
|
||||
.expect("fetch blobs should succeed");
|
||||
|
||||
assert_eq!(
|
||||
processing_status,
|
||||
Some(AvailabilityProcessingStatus::Imported(block_root))
|
||||
);
|
||||
// THEN all fetched blobs are processed and published
|
||||
assert_eq!(
|
||||
processing_status,
|
||||
Some(AvailabilityProcessingStatus::Imported(block_root))
|
||||
);
|
||||
|
||||
let published_columns = extract_published_blobs(publish_fn_args);
|
||||
assert!(
|
||||
matches!(
|
||||
published_columns,
|
||||
EngineGetBlobsOutput::CustodyColumns(columns) if columns.len() == custody_columns.len()
|
||||
),
|
||||
"should publish custody columns"
|
||||
);
|
||||
let published_blobs = extract_published_blobs(publish_fn_args);
|
||||
assert!(
|
||||
matches!(
|
||||
published_blobs,
|
||||
EngineGetBlobsOutput::Blobs(blobs) if blobs.len() == blob_count
|
||||
),
|
||||
"should publish fetched blobs"
|
||||
);
|
||||
}
|
||||
|
||||
fn mock_get_blobs_v1_response(
|
||||
mock_adapter: &mut MockFetchBlobsBeaconAdapter<T>,
|
||||
blobs_and_proofs_opt: Vec<Option<BlobAndProof<E>>>,
|
||||
) {
|
||||
let blobs_and_proofs_v1 = blobs_and_proofs_opt
|
||||
.into_iter()
|
||||
.map(|blob_and_proof_opt| {
|
||||
blob_and_proof_opt.map(|blob_and_proof| match blob_and_proof {
|
||||
BlobAndProof::V1(inner) => inner,
|
||||
_ => panic!("BlobAndProofV1 not expected"),
|
||||
})
|
||||
})
|
||||
.collect();
|
||||
mock_adapter
|
||||
.expect_get_blobs_v1()
|
||||
.return_once(move |_| Ok(blobs_and_proofs_v1));
|
||||
}
|
||||
}
|
||||
|
||||
/// Extract the `EngineGetBlobsOutput` passed to the `publish_fn`.
|
||||
@@ -257,23 +539,14 @@ fn mock_fork_choice_contains_block(
|
||||
.returning(move |block_root| block_roots.contains(block_root));
|
||||
}
|
||||
|
||||
fn mock_get_blobs_v2_response(
|
||||
mock_adapter: &mut MockFetchBlobsBeaconAdapter<T>,
|
||||
blobs_and_proofs_opt: Option<Vec<BlobAndProofV2<E>>>,
|
||||
) {
|
||||
mock_adapter
|
||||
.expect_get_blobs_v2()
|
||||
.return_once(move |_| Ok(blobs_and_proofs_opt));
|
||||
}
|
||||
|
||||
fn create_test_block_and_blobs(
|
||||
mock_adapter: &MockFetchBlobsBeaconAdapter<T>,
|
||||
) -> (Arc<SignedBeaconBlock<E>>, Vec<BlobAndProofV2<E>>) {
|
||||
let mut block = SignedBeaconBlock::Fulu(SignedBeaconBlockFulu {
|
||||
message: BeaconBlockFulu::empty(mock_adapter.spec()),
|
||||
signature: Signature::empty(),
|
||||
});
|
||||
let (blobs_bundle, _tx) = generate_blobs::<E>(2, block.fork_name_unchecked()).unwrap();
|
||||
blob_count: usize,
|
||||
) -> (Arc<SignedBeaconBlock<E>>, Vec<BlobAndProof<E>>) {
|
||||
let mut block =
|
||||
SignedBeaconBlock::from_block(BeaconBlock::empty(mock_adapter.spec()), Signature::empty());
|
||||
let fork = block.fork_name_unchecked();
|
||||
let (blobs_bundle, _tx) = generate_blobs::<E>(blob_count, fork).unwrap();
|
||||
let BlobsBundle {
|
||||
commitments,
|
||||
proofs,
|
||||
@@ -286,16 +559,27 @@ fn create_test_block_and_blobs(
|
||||
.blob_kzg_commitments_mut()
|
||||
.unwrap() = commitments;
|
||||
|
||||
let proofs_len = proofs.len() / blobs.len();
|
||||
let blob_and_proofs: Vec<BlobAndProofV2<E>> = blobs
|
||||
.into_iter()
|
||||
.zip(proofs.chunks(proofs_len))
|
||||
.map(|(blob, proofs)| BlobAndProofV2 {
|
||||
blob,
|
||||
proofs: proofs.to_vec().into(),
|
||||
})
|
||||
.collect();
|
||||
(Arc::new(block), blob_and_proofs)
|
||||
let blobs_and_proofs = if fork.fulu_enabled() {
|
||||
let proofs_len = proofs.len() / blobs.len();
|
||||
blobs
|
||||
.into_iter()
|
||||
.zip(proofs.chunks(proofs_len))
|
||||
.map(|(blob, proofs)| {
|
||||
BlobAndProof::V2(BlobAndProofV2 {
|
||||
blob,
|
||||
proofs: proofs.to_vec().into(),
|
||||
})
|
||||
})
|
||||
.collect()
|
||||
} else {
|
||||
blobs
|
||||
.into_iter()
|
||||
.zip(proofs)
|
||||
.map(|(blob, proof)| BlobAndProof::V1(BlobAndProofV1 { blob, proof }))
|
||||
.collect()
|
||||
};
|
||||
|
||||
(Arc::new(block), blobs_and_proofs)
|
||||
}
|
||||
|
||||
#[allow(clippy::type_complexity)]
|
||||
@@ -313,9 +597,9 @@ fn mock_publish_fn() -> (
|
||||
(publish_fn, captured_args)
|
||||
}
|
||||
|
||||
fn mock_beacon_adapter() -> MockFetchBlobsBeaconAdapter<T> {
|
||||
fn mock_beacon_adapter(fork_name: ForkName) -> MockFetchBlobsBeaconAdapter<T> {
|
||||
let test_runtime = TestRuntime::default();
|
||||
let spec = Arc::new(ForkName::Fulu.make_genesis_spec(E::default_spec()));
|
||||
let spec = Arc::new(fork_name.make_genesis_spec(E::default_spec()));
|
||||
let kzg = get_kzg(&spec);
|
||||
|
||||
let mut mock_adapter = MockFetchBlobsBeaconAdapter::default();
|
||||
|
||||
@@ -161,6 +161,7 @@ pub trait ObservationStrategy {
|
||||
/// Type for messages that are observed immediately.
|
||||
pub struct Observe;
|
||||
/// Type for messages that have not been observed.
|
||||
#[derive(Debug)]
|
||||
pub struct DoNotObserve;
|
||||
|
||||
impl ObservationStrategy for Observe {
|
||||
|
||||
Reference in New Issue
Block a user