Ensure we can serve blocks and columns after head event is emitted (#9338)

See related issue: https://github.com/ethpandaops/dora/pull/713

When LH emits a `head` event the block isn't written to disk yet. Some upstream consumers may expect that after a `head` event that the block should be queryable via the beacon api. This PR falls back to fetching the block from the early attester cache if it wasn't found in the store. This should ensure that a block is always queryable immediately after a `head` event is emitted.

Additionally I noticed that when serving columns we always default to using the store. We already have `get_data_columns_checking_all_caches ` which tries the da cache, then the store and finally the early attester cache.


  


Co-Authored-By: Eitan Seri-Levi <eserilev@ucsc.edu>

Co-Authored-By: Michael Sproul <michael@sigmaprime.io>
This commit is contained in:
Eitan Seri-Levi
2026-05-24 22:09:38 -07:00
committed by GitHub
parent 4903fff430
commit dfb259171a

View File

@@ -129,6 +129,15 @@ impl BlockId {
.is_finalized_block(root, block_slot)
.map_err(warp_utils::reject::unhandled_error)?;
Ok((*root, execution_optimistic, finalized))
} else if chain.early_attester_cache.get_block(*root).is_some() {
// Fall back to the early attester cache for blocks that are in fork choice
// but haven't been written to disk yet.
let execution_optimistic = chain
.canonical_head
.fork_choice_read_lock()
.is_optimistic_or_invalid_block(root)
.unwrap_or(false);
Ok((*root, execution_optimistic, false))
} else {
Err(warp_utils::reject::custom_not_found(format!(
"beacon block with root {}",
@@ -143,9 +152,18 @@ impl BlockId {
root: &Hash256,
chain: &BeaconChain<T>,
) -> Result<Option<SignedBlindedBeaconBlock<T::EthSpec>>, warp::Rejection> {
chain
if let Some(block) = chain
.get_blinded_block(root)
.map_err(warp_utils::reject::unhandled_error)
.map_err(warp_utils::reject::unhandled_error)?
{
return Ok(Some(block));
}
// Fall back to the early attester cache for blocks that are in fork choice
// but haven't been written to disk yet.
Ok(chain
.early_attester_cache
.get_block(*root)
.map(|b| b.clone_as_blinded()))
}
/// Return the `SignedBeaconBlock` identified by `self`.
@@ -253,20 +271,20 @@ impl BlockId {
}
_ => {
let (root, execution_optimistic, finalized) = self.root(chain)?;
chain
let block_opt = chain
.get_block(&root)
.await
.map_err(warp_utils::reject::unhandled_error)
.and_then(|block_opt| {
block_opt
.map(|block| (Arc::new(block), execution_optimistic, finalized))
.map_err(warp_utils::reject::unhandled_error)?;
let block = block_opt
.map(Arc::new)
.or_else(|| chain.early_attester_cache.get_block(root))
.ok_or_else(|| {
warp_utils::reject::custom_not_found(format!(
"beacon block with root {}",
root
))
})
})
})?;
Ok((block, execution_optimistic, finalized))
}
}
}
@@ -290,16 +308,20 @@ impl BlockId {
}
let data_column_sidecars = if let Some(indices) = query.indices {
indices
.iter()
.filter_map(|index| chain.get_data_column(&root, index, fork_name).transpose())
.collect::<Result<DataColumnSidecarList<T::EthSpec>, _>>()
chain
.get_data_columns_checking_all_caches(root, &indices)
.map_err(warp_utils::reject::unhandled_error)?
} else {
chain
.early_attester_cache
.get_data_columns(root)
.map(Ok)
.unwrap_or_else(|| {
chain
.get_data_columns(&root, fork_name)
.map(|opt| opt.unwrap_or_default())
})
.map_err(warp_utils::reject::unhandled_error)?
.unwrap_or_default()
};
let fork_name = block
@@ -507,3 +529,177 @@ impl fmt::Display for BlockId {
write!(f, "{}", self.0)
}
}
#[cfg(test)]
mod tests {
use super::*;
use beacon_chain::{
PayloadVerificationStatus,
block_verification_types::{AvailableBlockData, RangeSyncBlock},
test_utils::{
BeaconChainHarness, EphemeralHarnessType, fork_name_from_env,
generate_data_column_sidecars_from_block,
},
};
use std::time::Duration;
use types::MinimalEthSpec;
type TestHarness = BeaconChainHarness<EphemeralHarnessType<MinimalEthSpec>>;
fn harness() -> TestHarness {
BeaconChainHarness::builder(MinimalEthSpec)
.default_spec()
.deterministic_keypairs(8)
.fresh_ephemeral_store()
.mock_execution_layer()
.build()
}
#[tokio::test]
async fn root_uses_early_attester_cache_for_unpersisted_block() {
let Some(fork_name) = fork_name_from_env().filter(|fork_name| fork_name.fulu_enabled())
else {
return;
};
let harness = harness();
let chain = &harness.chain;
harness.execution_block_generator().set_min_blob_count(1);
harness.advance_slot();
let (block_contents, post_state) = harness
.make_block(harness.get_current_state(), harness.get_current_slot())
.await;
let (block, _) = block_contents;
let block_root = block.canonical_root();
let block_fork_name = chain.spec.fork_name_at_epoch(block.epoch());
assert_eq!(
block_fork_name, fork_name,
"precondition: test block must be produced at {fork_name:?}"
);
assert!(
block.num_expected_blobs() > 0,
"precondition: {fork_name:?} test block must have blobs that can be converted to data columns"
);
assert!(
!chain.store.block_exists(&block_root).unwrap(),
"precondition: test block must not be persisted"
);
assert!(
chain.get_blinded_block(&block_root).unwrap().is_none(),
"precondition: test block must not be retrievable from the store"
);
assert!(
chain
.get_data_columns(&block_root, block_fork_name)
.unwrap()
.is_none(),
"precondition: test data columns must not be retrievable from the store"
);
assert!(
!chain.block_is_known_to_fork_choice(&block_root),
"precondition: test block must not be imported into fork choice yet"
);
let sampling_columns = chain.sampling_columns_for_epoch(block.epoch());
let data_columns = generate_data_column_sidecars_from_block(&block, &chain.spec)
.into_iter()
.filter(|column| sampling_columns.contains(column.index()))
.collect::<Vec<_>>();
assert!(
!data_columns.is_empty(),
"precondition: {fork_name:?} test block must produce data columns"
);
let available_block = RangeSyncBlock::new(
block.clone(),
AvailableBlockData::new_with_data_columns(data_columns),
&chain.data_availability_checker,
chain.spec.clone(),
)
.unwrap()
.into_available_block();
let current_slot = harness.get_current_slot();
let cached_head = chain.canonical_head.cached_head();
let canonical_head_proposer_index = chain
.canonical_head_proposer_index(current_slot, &cached_head)
.unwrap();
chain
.canonical_head
.fork_choice_write_lock()
.on_block(
current_slot,
block.message(),
block_root,
Duration::ZERO,
&post_state,
PayloadVerificationStatus::Verified,
canonical_head_proposer_index,
&chain.spec,
)
.unwrap();
assert!(
chain.block_is_known_to_fork_choice(&block_root),
"precondition: test block must be imported into fork choice"
);
assert!(
!chain.store.block_exists(&block_root).unwrap(),
"precondition: fork choice insertion must not persist the block"
);
let proto_block = chain
.canonical_head
.fork_choice_read_lock()
.get_block(&block_root)
.unwrap();
chain
.early_attester_cache
.add_head_block(block_root, &available_block, proto_block, &post_state)
.unwrap();
let cached_data_columns = chain
.early_attester_cache
.get_data_columns(block_root)
.expect("precondition: data columns must be cached");
assert!(
!cached_data_columns.is_empty(),
"precondition: cached data columns must be non-empty"
);
assert_eq!(
BlockId(CoreBlockId::Root(block_root)).root(chain).unwrap(),
(block_root, false, false)
);
let (blinded_block, execution_optimistic, finalized) =
BlockId(CoreBlockId::Root(block_root))
.blinded_block(chain)
.unwrap();
assert_eq!(blinded_block.canonical_root(), block_root);
assert_eq!(blinded_block.slot(), block.slot());
assert!(!execution_optimistic);
assert!(!finalized);
let (data_columns, data_columns_fork_name, execution_optimistic, finalized) =
BlockId(CoreBlockId::Root(block_root))
.get_data_columns(DataColumnIndicesQuery { indices: None }, chain)
.unwrap();
assert_eq!(data_columns, cached_data_columns);
assert_eq!(data_columns_fork_name, fork_name);
assert!(!execution_optimistic);
assert!(!finalized);
chain.early_attester_cache.clear();
assert!(
BlockId(CoreBlockId::Root(block_root)).root(chain).is_err(),
"root lookup should fail once the unpersisted block leaves the early attester cache"
);
}
}