Fix unexpected blob error and duplicate import in fetch blobs (#7541)

Getting this error on a non-PeerDAS network:

```
May 29 13:30:13.484 ERROR Error fetching or processing blobs from EL    error: BlobProcessingError(AvailabilityCheck(Unexpected("empty blobs"))), block_root: 0x98aa3927056d453614fefbc79eb1f9865666d1f119d0e8aa9e6f4d02aa9395d9
```

It appears we're passing an empty `Vec` to the DA checker because all blobs were already seen on gossip and filtered out, which causes an `AvailabilityCheckError::Unexpected("empty blobs")`.

I've added equivalent unit tests for `getBlobsV1` to cover all the scenarios we test in `getBlobsV2`. This would have caught the bug if I had added it earlier. It also caught another bug which could trigger duplicate block import.

Thanks Santito for reporting this! 🙏
This commit is contained in:
Jimmy Chen
2025-06-02 11:51:09 +10:00
committed by GitHub
parent 886ceb7e25
commit 94a1446ac9
3 changed files with 515 additions and 216 deletions

View File

@@ -42,6 +42,7 @@ use types::{
/// Result from engine get blobs to be passed onto `DataAvailabilityChecker` and published to the
/// gossip network. The blobs / data columns have not been marked as observed yet, as they may not
/// be published immediately.
#[derive(Debug)]
pub enum EngineGetBlobsOutput<T: BeaconChainTypes> {
Blobs(Vec<GossipVerifiedBlob<T, DoNotObserve>>),
/// A filtered list of custody data columns to be imported into the `DataAvailabilityChecker`.
@@ -163,9 +164,22 @@ async fn fetch_and_process_blobs_v1<T: BeaconChainTypes>(
inc_counter(&metrics::BLOBS_FROM_EL_MISS_TOTAL);
return Ok(None);
} else {
debug!(
num_expected_blobs,
num_fetched_blobs, "Received blobs from the EL"
);
inc_counter(&metrics::BLOBS_FROM_EL_HIT_TOTAL);
}
if chain_adapter.fork_choice_contains_block(&block_root) {
// Avoid computing sidecars if the block has already been imported.
debug!(
info = "block has already been imported",
"Ignoring EL blobs response"
);
return Ok(None);
}
let (signed_block_header, kzg_commitments_proof) = block
.signed_block_header_and_kzg_commitments_proof()
.map_err(FetchEngineBlobError::BeaconStateError)?;
@@ -197,13 +211,13 @@ async fn fetch_and_process_blobs_v1<T: BeaconChainTypes>(
.collect::<Result<Vec<_>, _>>()
.map_err(FetchEngineBlobError::GossipBlob)?;
if !blobs_to_import_and_publish.is_empty() {
publish_fn(EngineGetBlobsOutput::Blobs(
blobs_to_import_and_publish.clone(),
));
if blobs_to_import_and_publish.is_empty() {
return Ok(None);
}
debug!(num_fetched_blobs, "Processing engine blobs");
publish_fn(EngineGetBlobsOutput::Blobs(
blobs_to_import_and_publish.clone(),
));
let availability_processing_status = chain_adapter
.process_engine_blobs(

View File

@@ -7,227 +7,509 @@ use crate::test_utils::{get_kzg, EphemeralHarnessType};
use crate::AvailabilityProcessingStatus;
use bls::Signature;
use eth2::types::BlobsBundle;
use execution_layer::json_structures::BlobAndProofV2;
use execution_layer::json_structures::{BlobAndProof, BlobAndProofV1, BlobAndProofV2};
use execution_layer::test_utils::generate_blobs;
use maplit::hashset;
use std::sync::{Arc, Mutex};
use task_executor::test_utils::TestRuntime;
use types::{
BeaconBlockFulu, EmptyBlock, EthSpec, ForkName, Hash256, MainnetEthSpec, SignedBeaconBlock,
SignedBeaconBlockFulu,
BeaconBlock, BeaconBlockFulu, EmptyBlock, EthSpec, ForkName, Hash256, MainnetEthSpec,
SignedBeaconBlock, SignedBeaconBlockFulu,
};
type E = MainnetEthSpec;
type T = EphemeralHarnessType<E>;
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_fetch_blobs_v2_no_blobs_in_block() {
let mut mock_adapter = mock_beacon_adapter();
let (publish_fn, _s) = mock_publish_fn();
let block = SignedBeaconBlock::<E>::Fulu(SignedBeaconBlockFulu {
message: BeaconBlockFulu::empty(mock_adapter.spec()),
signature: Signature::empty(),
});
let block_root = block.canonical_root();
mod get_blobs_v2 {
use super::*;
// Expectations: engine fetch blobs should not be triggered
mock_adapter.expect_get_blobs_v2().times(0);
mock_adapter.expect_process_engine_blobs().times(0);
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_fetch_blobs_v2_no_blobs_in_block() {
let mut mock_adapter = mock_beacon_adapter(ForkName::Fulu);
let (publish_fn, _s) = mock_publish_fn();
let block = SignedBeaconBlock::<E>::Fulu(SignedBeaconBlockFulu {
message: BeaconBlockFulu::empty(mock_adapter.spec()),
signature: Signature::empty(),
});
let block_root = block.canonical_root();
let custody_columns = hashset![0, 1, 2];
let processing_status = fetch_and_process_engine_blobs_inner(
mock_adapter,
block_root,
Arc::new(block),
custody_columns.clone(),
publish_fn,
)
.await
.expect("fetch blobs should succeed");
// Expectations: engine fetch blobs should not be triggered
mock_adapter.expect_get_blobs_v2().times(0);
mock_adapter.expect_process_engine_blobs().times(0);
assert_eq!(processing_status, None);
let custody_columns = hashset![0, 1, 2];
let processing_status = fetch_and_process_engine_blobs_inner(
mock_adapter,
block_root,
Arc::new(block),
custody_columns.clone(),
publish_fn,
)
.await
.expect("fetch blobs should succeed");
assert_eq!(processing_status, None);
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_fetch_blobs_v2_no_blobs_returned() {
    // GIVEN: a Fulu block that commits to 2 blobs.
    let mut mock_adapter = mock_beacon_adapter(ForkName::Fulu);
    let (publish_fn, _) = mock_publish_fn();
    let (block, _blobs_and_proofs) = create_test_block_and_blobs(&mock_adapter, 2);
    let block_root = block.canonical_root();

    // No blobs in EL response
    mock_get_blobs_v2_response(&mut mock_adapter, None);

    // Trigger fetch blobs on the block
    let custody_columns = hashset![0, 1, 2];
    let processing_status = fetch_and_process_engine_blobs_inner(
        mock_adapter,
        block_root,
        block,
        custody_columns.clone(),
        publish_fn,
    )
    .await
    .expect("fetch blobs should succeed");

    // THEN: `None` means the fetch was a no-op — nothing processed or published.
    assert_eq!(processing_status, None);
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_fetch_blobs_v2_partial_blobs_returned() {
    // GIVEN: a Fulu block committing to 2 blobs, but the EL only returns one.
    let mut mock_adapter = mock_beacon_adapter(ForkName::Fulu);
    let (publish_fn, publish_fn_args) = mock_publish_fn();
    let (block, mut blobs_and_proofs) = create_test_block_and_blobs(&mock_adapter, 2);
    let block_root = block.canonical_root();

    // Missing blob in EL response
    blobs_and_proofs.pop();
    mock_get_blobs_v2_response(&mut mock_adapter, Some(blobs_and_proofs));

    // No blobs should be processed — getBlobsV2 is all-or-nothing, so a
    // partial response must be discarded (columns can't be computed without
    // the full blob set).
    mock_adapter.expect_process_engine_blobs().times(0);

    // Trigger fetch blobs on the block
    let custody_columns = hashset![0, 1, 2];
    let processing_status = fetch_and_process_engine_blobs_inner(
        mock_adapter,
        block_root,
        block,
        custody_columns.clone(),
        publish_fn,
    )
    .await
    .expect("fetch blobs should succeed");

    // THEN: nothing processed and nothing published.
    assert_eq!(processing_status, None);
    assert_eq!(
        publish_fn_args.lock().unwrap().len(),
        0,
        "no columns should be published"
    );
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_fetch_blobs_v2_block_imported_after_el_response() {
    // GIVEN: a Fulu block with 2 blobs, all of which the EL returns.
    let mut mock_adapter = mock_beacon_adapter(ForkName::Fulu);
    let (publish_fn, publish_fn_args) = mock_publish_fn();
    let (block, blobs_and_proofs) = create_test_block_and_blobs(&mock_adapter, 2);
    let block_root = block.canonical_root();

    // All blobs returned, but fork choice already imported the block
    mock_get_blobs_v2_response(&mut mock_adapter, Some(blobs_and_proofs));
    mock_fork_choice_contains_block(&mut mock_adapter, vec![block.canonical_root()]);

    // No blobs should be processed — the fork-choice check short-circuits
    // before sidecars are computed, avoiding a duplicate import.
    mock_adapter.expect_process_engine_blobs().times(0);

    // Trigger fetch blobs on the block
    let custody_columns = hashset![0, 1, 2];
    let processing_status = fetch_and_process_engine_blobs_inner(
        mock_adapter,
        block_root,
        block,
        custody_columns.clone(),
        publish_fn,
    )
    .await
    .expect("fetch blobs should succeed");

    // THEN: nothing processed and nothing published.
    assert_eq!(processing_status, None);
    assert_eq!(
        publish_fn_args.lock().unwrap().len(),
        0,
        "no columns should be published"
    );
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_fetch_blobs_v2_no_new_columns_to_import() {
    let mut mock_adapter = mock_beacon_adapter(ForkName::Fulu);
    let (publish_fn, publish_fn_args) = mock_publish_fn();
    let (block, blobs_and_proofs) = create_test_block_and_blobs(&mock_adapter, 2);
    let block_root = block.canonical_root();

    // **GIVEN**:
    // All blobs returned
    mock_get_blobs_v2_response(&mut mock_adapter, Some(blobs_and_proofs));
    // block not yet imported into fork choice
    mock_fork_choice_contains_block(&mut mock_adapter, vec![]);
    // All data columns already seen on gossip — every column fails gossip
    // verification with `PriorKnown`, so they all get filtered out.
    mock_adapter
        .expect_verify_data_column_for_gossip()
        .returning(|c| {
            Err(GossipDataColumnError::PriorKnown {
                proposer: c.block_proposer_index(),
                slot: c.slot(),
                index: c.index,
            })
        });
    // No blobs should be processed — an empty filtered list must NOT be sent
    // to the DA checker (doing so triggers `Unexpected("empty blobs")`).
    mock_adapter.expect_process_engine_blobs().times(0);

    // **WHEN**: Trigger `fetch_blobs` on the block
    let custody_columns = hashset![0, 1, 2];
    let processing_status = fetch_and_process_engine_blobs_inner(
        mock_adapter,
        block_root,
        block,
        custody_columns.clone(),
        publish_fn,
    )
    .await
    .expect("fetch blobs should succeed");

    // **THEN**: Should NOT be processed and no columns should be published.
    assert_eq!(processing_status, None);
    assert_eq!(
        publish_fn_args.lock().unwrap().len(),
        0,
        "no columns should be published"
    );
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_fetch_blobs_v2_success() {
    // Happy path: full EL response, block not yet imported, columns pass
    // gossip verification, and the DA checker imports the block.
    let mut mock_adapter = mock_beacon_adapter(ForkName::Fulu);
    let (publish_fn, publish_fn_args) = mock_publish_fn();
    let (block, blobs_and_proofs) = create_test_block_and_blobs(&mock_adapter, 2);
    let block_root = block.canonical_root();

    // All blobs returned, fork choice doesn't contain block
    mock_get_blobs_v2_response(&mut mock_adapter, Some(blobs_and_proofs));
    mock_fork_choice_contains_block(&mut mock_adapter, vec![]);
    mock_adapter
        .expect_verify_data_column_for_gossip()
        .returning(|c| Ok(GossipVerifiedDataColumn::__new_for_testing(c)));
    mock_process_engine_blobs_result(
        &mut mock_adapter,
        Ok(AvailabilityProcessingStatus::Imported(block_root)),
    );

    // Trigger fetch blobs on the block
    let custody_columns = hashset![0, 1, 2];
    let processing_status = fetch_and_process_engine_blobs_inner(
        mock_adapter,
        block_root,
        block,
        custody_columns.clone(),
        publish_fn,
    )
    .await
    .expect("fetch blobs should succeed");

    // THEN: block imported, and one column per custody index was published.
    assert_eq!(
        processing_status,
        Some(AvailabilityProcessingStatus::Imported(block_root))
    );
    let published_columns = extract_published_blobs(publish_fn_args);
    assert!(
        matches!(
            published_columns,
            EngineGetBlobsOutput::CustodyColumns(columns) if columns.len() == custody_columns.len()
        ),
        "should publish custody columns"
    );
}
/// Stubs the mock adapter's `get_blobs_v2` call to return the given response.
///
/// Test fixtures produce fork-agnostic `BlobAndProof` values; this helper
/// unwraps each one into the `V2` variant that the `getBlobsV2` engine API
/// returns. `None` simulates the EL having no blobs for the request.
///
/// # Panics
/// Panics if any element is not `BlobAndProof::V2`, i.e. the fixtures were
/// built for a pre-Fulu fork — a bug in the test setup.
fn mock_get_blobs_v2_response(
    mock_adapter: &mut MockFetchBlobsBeaconAdapter<T>,
    blobs_and_proofs_opt: Option<Vec<BlobAndProof<E>>>,
) {
    let blobs_and_proofs_v2_opt = blobs_and_proofs_opt.map(|blobs_and_proofs| {
        blobs_and_proofs
            .into_iter()
            .map(|blob_and_proof| match blob_and_proof {
                BlobAndProof::V2(inner) => inner,
                // Fixed: the previous message ("BlobAndProofV2 not expected")
                // read backwards — V2 is exactly what we expect here.
                _ => panic!("expected BlobAndProofV2, got a different variant"),
            })
            .collect()
    });
    mock_adapter
        .expect_get_blobs_v2()
        .return_once(move |_| Ok(blobs_and_proofs_v2_opt));
}
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_fetch_blobs_v2_no_blobs_returned() {
let mut mock_adapter = mock_beacon_adapter();
let (publish_fn, _) = mock_publish_fn();
let (block, _blobs_and_proofs) = create_test_block_and_blobs(&mock_adapter);
let block_root = block.canonical_root();
mod get_blobs_v1 {
use super::*;
use crate::blob_verification::{GossipBlobError, GossipVerifiedBlob};
use crate::block_verification_types::AsBlock;
// No blobs in EL response
mock_get_blobs_v2_response(&mut mock_adapter, None);
const ELECTRA_FORK: ForkName = ForkName::Electra;
// Trigger fetch blobs on the block
let custody_columns = hashset![0, 1, 2];
let processing_status = fetch_and_process_engine_blobs_inner(
mock_adapter,
block_root,
block,
custody_columns.clone(),
publish_fn,
)
.await
.expect("fetch blobs should succeed");
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_fetch_blobs_v1_no_blobs_in_block() {
let mut mock_adapter = mock_beacon_adapter(ELECTRA_FORK);
let spec = mock_adapter.spec();
let (publish_fn, _s) = mock_publish_fn();
let block_no_blobs =
SignedBeaconBlock::from_block(BeaconBlock::empty(spec), Signature::empty());
let block_root = block_no_blobs.canonical_root();
assert_eq!(processing_status, None);
}
// Expectations: engine fetch blobs should not be triggered
mock_adapter.expect_get_blobs_v1().times(0);
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_fetch_blobs_v2_partial_blobs_returned() {
let mut mock_adapter = mock_beacon_adapter();
let (publish_fn, publish_fn_args) = mock_publish_fn();
let (block, mut blobs_and_proofs) = create_test_block_and_blobs(&mock_adapter);
let block_root = block.canonical_root();
// WHEN: Trigger fetch blobs on the block
let custody_columns = hashset![0, 1, 2];
let processing_status = fetch_and_process_engine_blobs_inner(
mock_adapter,
block_root,
Arc::new(block_no_blobs),
custody_columns,
publish_fn,
)
.await
.expect("fetch blobs should succeed");
// Missing blob in EL response
blobs_and_proofs.pop();
mock_get_blobs_v2_response(&mut mock_adapter, Some(blobs_and_proofs));
// No blobs should be processed
mock_adapter.expect_process_engine_blobs().times(0);
// THEN: No blob is processed
assert_eq!(processing_status, None);
}
// Trigger fetch blobs on the block
let custody_columns = hashset![0, 1, 2];
let processing_status = fetch_and_process_engine_blobs_inner(
mock_adapter,
block_root,
block,
custody_columns.clone(),
publish_fn,
)
.await
.expect("fetch blobs should succeed");
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_fetch_blobs_v1_no_blobs_returned() {
let mut mock_adapter = mock_beacon_adapter(ELECTRA_FORK);
let (publish_fn, _) = mock_publish_fn();
let (block, _blobs_and_proofs) = create_test_block_and_blobs(&mock_adapter, 2);
let block_root = block.canonical_root();
assert_eq!(processing_status, None);
assert_eq!(
publish_fn_args.lock().unwrap().len(),
0,
"no columns should be published"
);
}
// GIVEN: No blobs in EL response
let expected_blob_count = block.message().body().blob_kzg_commitments().unwrap().len();
mock_get_blobs_v1_response(&mut mock_adapter, vec![None; expected_blob_count]);
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_fetch_blobs_v2_block_imported_after_el_response() {
let mut mock_adapter = mock_beacon_adapter();
let (publish_fn, publish_fn_args) = mock_publish_fn();
let (block, blobs_and_proofs) = create_test_block_and_blobs(&mock_adapter);
let block_root = block.canonical_root();
// WHEN: Trigger fetch blobs on the block
let custody_columns = hashset![0, 1, 2];
let processing_status = fetch_and_process_engine_blobs_inner(
mock_adapter,
block_root,
block,
custody_columns,
publish_fn,
)
.await
.expect("fetch blobs should succeed");
// All blobs returned, but fork choice already imported the block
mock_get_blobs_v2_response(&mut mock_adapter, Some(blobs_and_proofs));
mock_fork_choice_contains_block(&mut mock_adapter, vec![block.canonical_root()]);
// No blobs should be processed
mock_adapter.expect_process_engine_blobs().times(0);
// THEN: No blob is processed
assert_eq!(processing_status, None);
}
// Trigger fetch blobs on the block
let custody_columns = hashset![0, 1, 2];
let processing_status = fetch_and_process_engine_blobs_inner(
mock_adapter,
block_root,
block,
custody_columns.clone(),
publish_fn,
)
.await
.expect("fetch blobs should succeed");
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_fetch_blobs_v1_partial_blobs_returned() {
let mut mock_adapter = mock_beacon_adapter(ELECTRA_FORK);
let (publish_fn, publish_fn_args) = mock_publish_fn();
let blob_count = 2;
let (block, blobs_and_proofs) = create_test_block_and_blobs(&mock_adapter, blob_count);
let block_slot = block.slot();
let block_root = block.canonical_root();
assert_eq!(processing_status, None);
assert_eq!(
publish_fn_args.lock().unwrap().len(),
0,
"no columns should be published"
);
}
// GIVEN: Missing a blob in EL response (remove 1 blob from response)
let mut blob_and_proof_opts = blobs_and_proofs.into_iter().map(Some).collect::<Vec<_>>();
blob_and_proof_opts.first_mut().unwrap().take();
mock_get_blobs_v1_response(&mut mock_adapter, blob_and_proof_opts);
// AND block is not imported into fork choice
mock_fork_choice_contains_block(&mut mock_adapter, vec![]);
// AND all blobs returned are valid
mock_adapter
.expect_verify_blob_for_gossip()
.returning(|b| Ok(GossipVerifiedBlob::__assumed_valid(b.clone())));
// Returned blobs should be processed
mock_process_engine_blobs_result(
&mut mock_adapter,
Ok(AvailabilityProcessingStatus::MissingComponents(
block_slot, block_root,
)),
);
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_fetch_blobs_v2_no_new_columns_to_import() {
let mut mock_adapter = mock_beacon_adapter();
let (publish_fn, publish_fn_args) = mock_publish_fn();
let (block, blobs_and_proofs) = create_test_block_and_blobs(&mock_adapter);
let block_root = block.canonical_root();
// WHEN: Trigger fetch blobs on the block
let custody_columns = hashset![0, 1, 2];
let processing_status = fetch_and_process_engine_blobs_inner(
mock_adapter,
block_root,
block,
custody_columns,
publish_fn,
)
.await
.expect("fetch blobs should succeed");
// **GIVEN**:
// All blobs returned
mock_get_blobs_v2_response(&mut mock_adapter, Some(blobs_and_proofs));
// block not yet imported into fork choice
mock_fork_choice_contains_block(&mut mock_adapter, vec![]);
// All data columns already seen on gossip
mock_adapter
.expect_verify_data_column_for_gossip()
.returning(|c| {
Err(GossipDataColumnError::PriorKnown {
proposer: c.block_proposer_index(),
slot: c.slot(),
index: c.index,
// THEN: Returned blobs are processed and published
assert_eq!(
processing_status,
Some(AvailabilityProcessingStatus::MissingComponents(
block_slot, block_root,
))
);
assert!(
matches!(
extract_published_blobs(publish_fn_args),
EngineGetBlobsOutput::Blobs(blobs) if blobs.len() == blob_count - 1
),
"partial blob results should still be published"
);
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_fetch_blobs_v1_block_imported_after_el_response() {
    // Regression test for duplicate block import: blobs arrive from the EL
    // after the block has already been imported via fork choice.
    let mut mock_adapter = mock_beacon_adapter(ELECTRA_FORK);
    let (publish_fn, publish_fn_args) = mock_publish_fn();
    let (block, blobs_and_proofs) = create_test_block_and_blobs(&mock_adapter, 2);
    let block_root = block.canonical_root();

    // GIVEN: All blobs returned, but fork choice already imported the block
    let blob_and_proof_opts = blobs_and_proofs.into_iter().map(Some).collect::<Vec<_>>();
    mock_get_blobs_v1_response(&mut mock_adapter, blob_and_proof_opts);
    mock_fork_choice_contains_block(&mut mock_adapter, vec![block.canonical_root()]);

    // WHEN: Trigger fetch blobs on the block
    let custody_columns = hashset![0, 1, 2];
    let processing_status = fetch_and_process_engine_blobs_inner(
        mock_adapter,
        block_root,
        block,
        custody_columns,
        publish_fn,
    )
    .await
    .expect("fetch blobs should succeed");

    // THEN: Returned blobs should NOT be processed or published.
    assert_eq!(processing_status, None);
    assert_eq!(
        publish_fn_args.lock().unwrap().len(),
        0,
        "no blobs should be published"
    );
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_fetch_blobs_v1_no_new_blobs_to_import() {
let mut mock_adapter = mock_beacon_adapter(ELECTRA_FORK);
let (publish_fn, publish_fn_args) = mock_publish_fn();
let (block, blobs_and_proofs) = create_test_block_and_blobs(&mock_adapter, 2);
let block_root = block.canonical_root();
// **GIVEN**:
// All blobs returned
let blob_and_proof_opts = blobs_and_proofs.into_iter().map(Some).collect::<Vec<_>>();
mock_get_blobs_v1_response(&mut mock_adapter, blob_and_proof_opts);
// block not yet imported into fork choice
mock_fork_choice_contains_block(&mut mock_adapter, vec![]);
// All blobs already seen on gossip
mock_adapter.expect_verify_blob_for_gossip().returning(|b| {
Err(GossipBlobError::RepeatBlob {
proposer: b.block_proposer_index(),
slot: b.slot(),
index: b.index,
})
});
// No blobs should be processed
mock_adapter.expect_process_engine_blobs().times(0);
// **WHEN**: Trigger `fetch_blobs` on the block
let custody_columns = hashset![0, 1, 2];
let processing_status = fetch_and_process_engine_blobs_inner(
mock_adapter,
block_root,
block,
custody_columns.clone(),
publish_fn,
)
.await
.expect("fetch blobs should succeed");
// **WHEN**: Trigger `fetch_blobs` on the block
let custody_columns = hashset![0, 1, 2];
let processing_status = fetch_and_process_engine_blobs_inner(
mock_adapter,
block_root,
block,
custody_columns,
publish_fn,
)
.await
.expect("fetch blobs should succeed");
// **THEN**: Should NOT be processed and no columns should be published.
assert_eq!(processing_status, None);
assert_eq!(
publish_fn_args.lock().unwrap().len(),
0,
"no columns should be published"
);
}
// **THEN**: Should NOT be processed and no blobs should be published.
assert_eq!(processing_status, None);
assert_eq!(
publish_fn_args.lock().unwrap().len(),
0,
"no blobs should be published"
);
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_fetch_blobs_v2_success() {
let mut mock_adapter = mock_beacon_adapter();
let (publish_fn, publish_fn_args) = mock_publish_fn();
let (block, blobs_and_proofs) = create_test_block_and_blobs(&mock_adapter);
let block_root = block.canonical_root();
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_fetch_blobs_v1_success() {
let mut mock_adapter = mock_beacon_adapter(ELECTRA_FORK);
let (publish_fn, publish_fn_args) = mock_publish_fn();
let blob_count = 2;
let (block, blobs_and_proofs) = create_test_block_and_blobs(&mock_adapter, blob_count);
let block_root = block.canonical_root();
// All blobs returned, fork choice doesn't contain block
mock_get_blobs_v2_response(&mut mock_adapter, Some(blobs_and_proofs));
mock_fork_choice_contains_block(&mut mock_adapter, vec![]);
mock_adapter
.expect_verify_data_column_for_gossip()
.returning(|c| Ok(GossipVerifiedDataColumn::__new_for_testing(c)));
mock_process_engine_blobs_result(
&mut mock_adapter,
Ok(AvailabilityProcessingStatus::Imported(block_root)),
);
// All blobs returned, fork choice doesn't contain block
let blob_and_proof_opts = blobs_and_proofs.into_iter().map(Some).collect::<Vec<_>>();
mock_get_blobs_v1_response(&mut mock_adapter, blob_and_proof_opts);
mock_fork_choice_contains_block(&mut mock_adapter, vec![]);
mock_adapter
.expect_verify_blob_for_gossip()
.returning(|b| Ok(GossipVerifiedBlob::__assumed_valid(b.clone())));
mock_process_engine_blobs_result(
&mut mock_adapter,
Ok(AvailabilityProcessingStatus::Imported(block_root)),
);
// Trigger fetch blobs on the block
let custody_columns = hashset![0, 1, 2];
let processing_status = fetch_and_process_engine_blobs_inner(
mock_adapter,
block_root,
block,
custody_columns.clone(),
publish_fn,
)
.await
.expect("fetch blobs should succeed");
// Trigger fetch blobs on the block
let custody_columns = hashset![0, 1, 2];
let processing_status = fetch_and_process_engine_blobs_inner(
mock_adapter,
block_root,
block,
custody_columns,
publish_fn,
)
.await
.expect("fetch blobs should succeed");
assert_eq!(
processing_status,
Some(AvailabilityProcessingStatus::Imported(block_root))
);
// THEN all fetched blobs are processed and published
assert_eq!(
processing_status,
Some(AvailabilityProcessingStatus::Imported(block_root))
);
let published_columns = extract_published_blobs(publish_fn_args);
assert!(
matches!(
published_columns,
EngineGetBlobsOutput::CustodyColumns(columns) if columns.len() == custody_columns.len()
),
"should publish custody columns"
);
let published_blobs = extract_published_blobs(publish_fn_args);
assert!(
matches!(
published_blobs,
EngineGetBlobsOutput::Blobs(blobs) if blobs.len() == blob_count
),
"should publish fetched blobs"
);
}
/// Stubs the mock adapter's `get_blobs_v1` call to return the given response.
///
/// Unlike `getBlobsV2`, the V1 engine API returns a per-blob `Option`, so the
/// caller supplies a `Vec<Option<_>>` where `None` entries simulate blobs the
/// EL does not have. Each present entry is unwrapped into the `V1` variant.
///
/// # Panics
/// Panics if any present element is not `BlobAndProof::V1`, i.e. the fixtures
/// were built for a Fulu-or-later fork — a bug in the test setup.
fn mock_get_blobs_v1_response(
    mock_adapter: &mut MockFetchBlobsBeaconAdapter<T>,
    blobs_and_proofs_opt: Vec<Option<BlobAndProof<E>>>,
) {
    let blobs_and_proofs_v1 = blobs_and_proofs_opt
        .into_iter()
        .map(|blob_and_proof_opt| {
            blob_and_proof_opt.map(|blob_and_proof| match blob_and_proof {
                BlobAndProof::V1(inner) => inner,
                // Fixed: the previous message ("BlobAndProofV1 not expected")
                // read backwards — V1 is exactly what we expect here.
                _ => panic!("expected BlobAndProofV1, got a different variant"),
            })
        })
        .collect();
    mock_adapter
        .expect_get_blobs_v1()
        .return_once(move |_| Ok(blobs_and_proofs_v1));
}
}
/// Extract the `EngineGetBlobsOutput` passed to the `publish_fn`.
@@ -257,23 +539,14 @@ fn mock_fork_choice_contains_block(
.returning(move |block_root| block_roots.contains(block_root));
}
fn mock_get_blobs_v2_response(
mock_adapter: &mut MockFetchBlobsBeaconAdapter<T>,
blobs_and_proofs_opt: Option<Vec<BlobAndProofV2<E>>>,
) {
mock_adapter
.expect_get_blobs_v2()
.return_once(move |_| Ok(blobs_and_proofs_opt));
}
fn create_test_block_and_blobs(
mock_adapter: &MockFetchBlobsBeaconAdapter<T>,
) -> (Arc<SignedBeaconBlock<E>>, Vec<BlobAndProofV2<E>>) {
let mut block = SignedBeaconBlock::Fulu(SignedBeaconBlockFulu {
message: BeaconBlockFulu::empty(mock_adapter.spec()),
signature: Signature::empty(),
});
let (blobs_bundle, _tx) = generate_blobs::<E>(2, block.fork_name_unchecked()).unwrap();
blob_count: usize,
) -> (Arc<SignedBeaconBlock<E>>, Vec<BlobAndProof<E>>) {
let mut block =
SignedBeaconBlock::from_block(BeaconBlock::empty(mock_adapter.spec()), Signature::empty());
let fork = block.fork_name_unchecked();
let (blobs_bundle, _tx) = generate_blobs::<E>(blob_count, fork).unwrap();
let BlobsBundle {
commitments,
proofs,
@@ -286,16 +559,27 @@ fn create_test_block_and_blobs(
.blob_kzg_commitments_mut()
.unwrap() = commitments;
let proofs_len = proofs.len() / blobs.len();
let blob_and_proofs: Vec<BlobAndProofV2<E>> = blobs
.into_iter()
.zip(proofs.chunks(proofs_len))
.map(|(blob, proofs)| BlobAndProofV2 {
blob,
proofs: proofs.to_vec().into(),
})
.collect();
(Arc::new(block), blob_and_proofs)
let blobs_and_proofs = if fork.fulu_enabled() {
let proofs_len = proofs.len() / blobs.len();
blobs
.into_iter()
.zip(proofs.chunks(proofs_len))
.map(|(blob, proofs)| {
BlobAndProof::V2(BlobAndProofV2 {
blob,
proofs: proofs.to_vec().into(),
})
})
.collect()
} else {
blobs
.into_iter()
.zip(proofs)
.map(|(blob, proof)| BlobAndProof::V1(BlobAndProofV1 { blob, proof }))
.collect()
};
(Arc::new(block), blobs_and_proofs)
}
#[allow(clippy::type_complexity)]
@@ -313,9 +597,9 @@ fn mock_publish_fn() -> (
(publish_fn, captured_args)
}
fn mock_beacon_adapter() -> MockFetchBlobsBeaconAdapter<T> {
fn mock_beacon_adapter(fork_name: ForkName) -> MockFetchBlobsBeaconAdapter<T> {
let test_runtime = TestRuntime::default();
let spec = Arc::new(ForkName::Fulu.make_genesis_spec(E::default_spec()));
let spec = Arc::new(fork_name.make_genesis_spec(E::default_spec()));
let kzg = get_kzg(&spec);
let mut mock_adapter = MockFetchBlobsBeaconAdapter::default();

View File

@@ -161,6 +161,7 @@ pub trait ObservationStrategy {
/// Type for messages that are observed immediately.
pub struct Observe;

/// Type for messages that have not been observed.
// `Debug` is derived so that wrapper types parameterised by this marker
// (e.g. `EngineGetBlobsOutput`, which holds
// `GossipVerifiedBlob<T, DoNotObserve>`) can themselves derive `Debug`.
#[derive(Debug)]
pub struct DoNotObserve;
impl ObservationStrategy for Observe {