Only mark block lookups as pending if block is importing from gossip (#8112)

- PR https://github.com/sigp/lighthouse/pull/8045 introduced a regression in how lookup sync interacts with the da_checker.

Now, in unstable, block imports from the HTTP API also insert the block into the da_checker while the block is being execution-verified. If lookup sync finds the block in the da_checker in the `NotValidated` state, it expects a `GossipBlockProcessResult` message sometime later. That message is only sent after block import via gossip.

I confirmed in our node's logs that 4/4 cases of stuck lookups are caused by this sequence of events:
- Receive block through the HTTP API; insert it into the da_checker in `fn process_block` via `put_pre_execution_block`
- Create lookup and leave in AwaitingDownload(block in processing cache) state
- Block from HTTP API finishes importing
- Lookup is left stuck

Closes https://github.com/sigp/lighthouse/issues/8104


  - https://github.com/sigp/lighthouse/pull/8110 was my initial solution attempt but we can't send the `GossipBlockProcessResult` event from the `http_api` crate without adding new channels, which seems messy.

For a given node it's rare that a lookup is created at the same time that a block is being published. This PR solves https://github.com/sigp/lighthouse/issues/8104 by allowing lookup sync to import the block twice in that case.


Co-Authored-By: dapplion <35266934+dapplion@users.noreply.github.com>
This commit is contained in:
Lion - dapplion
2025-09-25 05:52:27 +02:00
committed by GitHub
parent 79b33214ea
commit ffa7b2b2b9
8 changed files with 63 additions and 33 deletions

View File

@@ -404,7 +404,7 @@ impl<T: BeaconChainTypes> BeaconBlockStreamer<T> {
if self.check_caches == CheckCaches::Yes {
match self.beacon_chain.get_block_process_status(&root) {
BlockProcessStatus::Unknown => None,
BlockProcessStatus::NotValidated(block)
BlockProcessStatus::NotValidated(block, _)
| BlockProcessStatus::ExecutionValidated(block) => {
metrics::inc_counter(&metrics::BEACON_REQRESP_PRE_IMPORT_CACHE_HITS);
Some(block)

View File

@@ -334,7 +334,7 @@ pub enum BlockProcessStatus<E: EthSpec> {
/// Block is not in any pre-import cache. Block may be in the data-base or in the fork-choice.
Unknown,
/// Block is currently processing but not yet validated.
NotValidated(Arc<SignedBeaconBlock<E>>),
NotValidated(Arc<SignedBeaconBlock<E>>, BlockImportSource),
/// Block is fully valid, but not yet imported. It's cached in the da_checker while awaiting
/// missing block components.
ExecutionValidated(Arc<SignedBeaconBlock<E>>),
@@ -3351,8 +3351,11 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
);
}
self.data_availability_checker
.put_pre_execution_block(block_root, unverified_block.block_cloned())?;
self.data_availability_checker.put_pre_execution_block(
block_root,
unverified_block.block_cloned(),
block_source,
)?;
// Start the Prometheus timer.
let _full_timer = metrics::start_timer(&metrics::BLOCK_PROCESSING_TIMES);

View File

@@ -21,8 +21,8 @@ use task_executor::TaskExecutor;
use tracing::{debug, error, instrument};
use types::blob_sidecar::{BlobIdentifier, BlobSidecar, FixedBlobSidecarList};
use types::{
BlobSidecarList, ChainSpec, DataColumnSidecar, DataColumnSidecarList, Epoch, EthSpec, Hash256,
SignedBeaconBlock, Slot,
BlobSidecarList, BlockImportSource, ChainSpec, DataColumnSidecar, DataColumnSidecarList, Epoch,
EthSpec, Hash256, SignedBeaconBlock, Slot,
};
mod error;
@@ -354,9 +354,10 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
&self,
block_root: Hash256,
block: Arc<SignedBeaconBlock<T::EthSpec>>,
source: BlockImportSource,
) -> Result<(), Error> {
self.availability_cache
.put_pre_execution_block(block_root, block)
.put_pre_execution_block(block_root, block, source)
}
/// Removes a pre-execution block from the cache.

View File

@@ -19,13 +19,14 @@ use tracing::{Span, debug, debug_span};
use types::beacon_block_body::KzgCommitments;
use types::blob_sidecar::BlobIdentifier;
use types::{
BlobSidecar, ChainSpec, ColumnIndex, DataColumnSidecar, DataColumnSidecarList, Epoch, EthSpec,
Hash256, RuntimeFixedVector, RuntimeVariableList, SignedBeaconBlock,
BlobSidecar, BlockImportSource, ChainSpec, ColumnIndex, DataColumnSidecar,
DataColumnSidecarList, Epoch, EthSpec, Hash256, RuntimeFixedVector, RuntimeVariableList,
SignedBeaconBlock,
};
#[derive(Clone)]
pub enum CachedBlock<E: EthSpec> {
PreExecution(Arc<SignedBeaconBlock<E>>),
PreExecution(Arc<SignedBeaconBlock<E>>, BlockImportSource),
Executed(Box<DietAvailabilityPendingExecutedBlock<E>>),
}
@@ -42,7 +43,7 @@ impl<E: EthSpec> CachedBlock<E> {
fn as_block(&self) -> &SignedBeaconBlock<E> {
match self {
CachedBlock::PreExecution(b) => b,
CachedBlock::PreExecution(b, _) => b,
CachedBlock::Executed(b) => b.as_block(),
}
}
@@ -135,9 +136,13 @@ impl<E: EthSpec> PendingComponents<E> {
/// Inserts a pre-execution block into the cache.
/// This does NOT override an existing executed block.
pub fn insert_pre_execution_block(&mut self, block: Arc<SignedBeaconBlock<E>>) {
pub fn insert_pre_execution_block(
&mut self,
block: Arc<SignedBeaconBlock<E>>,
source: BlockImportSource,
) {
if self.block.is_none() {
self.block = Some(CachedBlock::PreExecution(block))
self.block = Some(CachedBlock::PreExecution(block, source))
}
}
@@ -433,7 +438,9 @@ impl<T: BeaconChainTypes> DataAvailabilityCheckerInner<T> {
.peek(block_root)
.and_then(|pending_components| {
pending_components.block.as_ref().map(|block| match block {
CachedBlock::PreExecution(b) => BlockProcessStatus::NotValidated(b.clone()),
CachedBlock::PreExecution(b, source) => {
BlockProcessStatus::NotValidated(b.clone(), *source)
}
CachedBlock::Executed(b) => {
BlockProcessStatus::ExecutionValidated(b.block_cloned())
}
@@ -693,11 +700,12 @@ impl<T: BeaconChainTypes> DataAvailabilityCheckerInner<T> {
&self,
block_root: Hash256,
block: Arc<SignedBeaconBlock<T::EthSpec>>,
source: BlockImportSource,
) -> Result<(), AvailabilityCheckError> {
let epoch = block.epoch();
let pending_components =
self.update_or_insert_pending_components(block_root, epoch, |pending_components| {
pending_components.insert_pre_execution_block(block);
pending_components.insert_pre_execution_block(block, source);
Ok(())
})?;
@@ -718,7 +726,7 @@ impl<T: BeaconChainTypes> DataAvailabilityCheckerInner<T> {
/// This does NOT remove an existing executed block.
pub fn remove_pre_execution_block(&self, block_root: &Hash256) {
// The read lock is immediately dropped so we can safely remove the block from the cache.
if let Some(BlockProcessStatus::NotValidated(_)) = self.get_cached_block(block_root) {
if let Some(BlockProcessStatus::NotValidated(_, _)) = self.get_cached_block(block_root) {
self.critical.write().pop(block_root);
}
}
@@ -1459,9 +1467,13 @@ mod pending_components_tests {
let mut pending_component = <PendingComponents<E>>::empty(block_root, max_len);
let pre_execution_block = Arc::new(pre_execution_block);
pending_component.insert_pre_execution_block(pre_execution_block.clone());
pending_component
.insert_pre_execution_block(pre_execution_block.clone(), BlockImportSource::Gossip);
assert!(
matches!(pending_component.block, Some(CachedBlock::PreExecution(_))),
matches!(
pending_component.block,
Some(CachedBlock::PreExecution(_, _))
),
"pre execution block inserted"
);
@@ -1471,7 +1483,8 @@ mod pending_components_tests {
"executed block inserted"
);
pending_component.insert_pre_execution_block(pre_execution_block);
pending_component
.insert_pre_execution_block(pre_execution_block, BlockImportSource::Gossip);
assert!(
matches!(pending_component.block, Some(CachedBlock::Executed(_))),
"executed block should remain"

View File

@@ -219,7 +219,7 @@ impl<T: BeaconChainTypes> SingleBlockLookup<T> {
// can assert that this is the correct value of `blob_kzg_commitments_count`.
match cx.chain.get_block_process_status(&self.block_root) {
BlockProcessStatus::Unknown => None,
BlockProcessStatus::NotValidated(block)
BlockProcessStatus::NotValidated(block, _)
| BlockProcessStatus::ExecutionValidated(block) => Some(block.clone()),
}
}) {

View File

@@ -49,8 +49,8 @@ use tokio::sync::mpsc;
use tracing::{Span, debug, debug_span, error, warn};
use types::blob_sidecar::FixedBlobSidecarList;
use types::{
BlobSidecar, ColumnIndex, DataColumnSidecar, DataColumnSidecarList, EthSpec, ForkContext,
Hash256, SignedBeaconBlock, Slot,
BlobSidecar, BlockImportSource, ColumnIndex, DataColumnSidecar, DataColumnSidecarList, EthSpec,
ForkContext, Hash256, SignedBeaconBlock, Slot,
};
pub mod custody;
@@ -835,14 +835,26 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
match self.chain.get_block_process_status(&block_root) {
// Unknown block, continue request to download
BlockProcessStatus::Unknown => {}
// Block is known are currently processing, expect a future event with the result of
// processing.
BlockProcessStatus::NotValidated { .. } => {
// Lookup sync event safety: If the block is currently in the processing cache, we
// are guaranteed to receive a `SyncMessage::GossipBlockProcessResult` that will
// make progress on this lookup
return Ok(LookupRequestResult::Pending("block in processing cache"));
}
// Block is known and currently processing. Imports from gossip and HTTP API insert the
// block in the da_cache. However, HTTP API is unable to notify sync when it completes
// block import. Returning `Pending` here will result in stuck lookups if the block is
// importing from sync.
BlockProcessStatus::NotValidated(_, source) => match source {
BlockImportSource::Gossip => {
// Lookup sync event safety: If the block is currently in the processing cache, we
// are guaranteed to receive a `SyncMessage::GossipBlockProcessResult` that will
// make progress on this lookup
return Ok(LookupRequestResult::Pending("block in processing cache"));
}
BlockImportSource::Lookup
| BlockImportSource::RangeSync
| BlockImportSource::HttpApi => {
// Lookup, RangeSync or HttpApi block import don't emit the GossipBlockProcessResult
// event. If a lookup happens to be created during block import from one of
// those sources just import the block twice. Otherwise the lookup will get
// stuck. Double imports are fine, they just waste resources.
}
},
// Block is fully validated. If it's not yet imported it's waiting for missing block
// components. Consider this request completed and do nothing.
BlockProcessStatus::ExecutionValidated { .. } => {

View File

@@ -41,8 +41,8 @@ use slot_clock::{SlotClock, TestingSlotClock};
use tokio::sync::mpsc;
use tracing::info;
use types::{
BeaconState, BeaconStateBase, BlobSidecar, DataColumnSidecar, EthSpec, ForkContext, ForkName,
Hash256, MinimalEthSpec as E, SignedBeaconBlock, Slot,
BeaconState, BeaconStateBase, BlobSidecar, BlockImportSource, DataColumnSidecar, EthSpec,
ForkContext, ForkName, Hash256, MinimalEthSpec as E, SignedBeaconBlock, Slot,
data_column_sidecar::ColumnIndex,
test_utils::{SeedableRng, TestRandom, XorShiftRng},
};
@@ -1113,7 +1113,7 @@ impl TestRig {
self.harness
.chain
.data_availability_checker
.put_pre_execution_block(block.canonical_root(), block)
.put_pre_execution_block(block.canonical_root(), block, BlockImportSource::Gossip)
.unwrap();
}

View File

@@ -843,6 +843,7 @@ impl<'de, E: EthSpec, Payload: AbstractExecPayload<E>> ContextDeserialize<'de, F
}
}
#[derive(Clone, Copy)]
pub enum BlockImportSource {
Gossip,
Lookup,